diff --git a/gun/gun.go b/gun/gun.go index 75f19f3..f2cad9e 100644 --- a/gun/gun.go +++ b/gun/gun.go @@ -7,18 +7,20 @@ import ( ) type Gun struct { - peers []Peer - storage Storage - soulGen func() string + peers []Peer + storage Storage + soulGen func() string + peerErrorHandler func(*ErrPeer) - messageIDPutListeners map[string]chan<- *ReceivedMessage + messageIDPutListeners map[string]chan<- *MessageReceived messageIDPutListenersLock sync.RWMutex } type Config struct { - Peers []Peer - Storage Storage - SoulGen func() string + Peers []Peer + Storage Storage + SoulGen func() string + PeerErrorHandler func(*ErrPeer) } func New(config Config) *Gun { @@ -26,7 +28,8 @@ func New(config Config) *Gun { peers: make([]Peer, len(config.Peers)), storage: config.Storage, soulGen: config.SoulGen, - messageIDPutListeners: map[string]chan<- *ReceivedMessage{}, + peerErrorHandler: config.PeerErrorHandler, + messageIDPutListeners: map[string]chan<- *MessageReceived{}, } // Copy over peers copy(g.peers, config.Peers) @@ -62,45 +65,43 @@ func NewFromPeerURLs(ctx context.Context, peerURLs ...string) (g *Gun, err error return New(c), nil } -type Message struct { - Ack string `json:"@,omitEmpty"` - ID string `json:"#,omitEmpty"` - Sender string `json:"><,omitEmpty"` - Hash string `json:"##,omitempty"` - How string `json:"how,omitempty"` - Get *MessageGetRequest `json:"get,omitempty"` - Put map[string]*Node `json:"put,omitempty"` - DAM string `json:"dam,omitempty"` - PID string `json:"pid,omitempty"` +func (g *Gun) Close() error { + var errs []error + for _, p := range g.peers { + if err := p.Close(); err != nil { + errs = append(errs, err) + } + } + if len(errs) == 0 { + return nil + } else if len(errs) == 1 { + return errs[0] + } else { + return fmt.Errorf("Multiple errors: %v", errs) + } } -type MessageGetRequest struct { - ID string `json:"#,omitempty"` - Field string `json:".,omitempty"` +func (g *Gun) Send(ctx context.Context, msg *Message) <-chan *ErrPeer { + return g.send(ctx, msg, nil) } -type ReceivedMessage struct { - *Message - Peer Peer -} - -type PeerError struct { - Err error - Peer Peer -} - -func (g *Gun) Send(ctx context.Context, msg *Message) <-chan *PeerError { - ch := make(chan *PeerError, len(g.peers)) +func (g *Gun) send(ctx context.Context, msg *Message, ignorePeer Peer) <-chan *ErrPeer { + ch := make(chan *ErrPeer, len(g.peers)) // Everything async go func() { defer close(ch) var wg sync.WaitGroup for _, peer := range g.peers { + if peer == ignorePeer { + continue + } wg.Add(1) go func(peer Peer) { defer wg.Done() if err := peer.Send(ctx, msg); err != nil { - ch <- &PeerError{err, peer} + peerErr := &ErrPeer{err, peer} + go g.onPeerError(peerErr) + ch <- peerErr } }(peer) } @@ -113,37 +114,40 @@ func (g *Gun) startReceiving() { for _, peer := range g.peers { go func(peer Peer) { for msgOrErr := range peer.Receive() { - // TODO: what to do with error? if msgOrErr.Err != nil { - g.onPeerReceiveError(&PeerError{msgOrErr.Err, peer}) + go g.onPeerError(&ErrPeer{msgOrErr.Err, peer}) continue } // See if a listener is around to handle it instead of rebroadcasting - msg := &ReceivedMessage{Message: msgOrErr.Message, Peer: peer} + msg := &MessageReceived{Message: msgOrErr.Message, Peer: peer} if msg.Ack != "" && len(msg.Put) > 0 { g.messageIDPutListenersLock.RLock() l := g.messageIDPutListeners[msg.Ack] g.messageIDPutListenersLock.RUnlock() if l != nil { - safeReceivedMessageSend(l, msg) + go safeReceivedMessageSend(l, msg) continue } } - g.onUnhandledMessage(msg) + go g.onUnhandledMessage(msg) } }(peer) } } -func (g *Gun) onUnhandledMessage(msg *ReceivedMessage) { - +func (g *Gun) onUnhandledMessage(msg *MessageReceived) { + // Unhandled message means rebroadcast + // TODO: we need a timeout or global context here... + g.send(context.TODO(), msg.Message, msg.Peer) } -func (g *Gun) onPeerReceiveError(err *PeerError) { - +func (g *Gun) onPeerError(err *ErrPeer) { + if g.peerErrorHandler != nil { + g.peerErrorHandler(err) + } } -func (g *Gun) RegisterMessageIDPutListener(id string, ch chan<- *ReceivedMessage) { +func (g *Gun) RegisterMessageIDPutListener(id string, ch chan<- *MessageReceived) { g.messageIDPutListenersLock.Lock() defer g.messageIDPutListenersLock.Unlock() g.messageIDPutListeners[id] = ch @@ -160,14 +164,14 @@ func (g *Gun) UnregisterMessageIDPutListener(id string) { // } func (g *Gun) Scoped(ctx context.Context, key string, children ...string) *Scoped { - s := newScoped(g, "", key) + s := newScoped(g, nil, key) if len(children) > 0 { s = s.Scoped(ctx, children[0], children[1:]...) } return s } -func safeReceivedMessageSend(ch chan<- *ReceivedMessage, msg *ReceivedMessage) { +func safeReceivedMessageSend(ch chan<- *MessageReceived, msg *MessageReceived) { // Due to the fact that we may send on a closed channel here, we ignore the panic defer func() { recover() }() ch <- msg diff --git a/gun/message.go b/gun/message.go new file mode 100644 index 0000000..f9624f7 --- /dev/null +++ b/gun/message.go @@ -0,0 +1,23 @@ +package gun + +type Message struct { + Ack string `json:"@,omitEmpty"` + ID string `json:"#,omitEmpty"` + To string `json:"><,omitEmpty"` + Hash string `json:"##,omitempty"` + How string `json:"how,omitempty"` + Get *MessageGetRequest `json:"get,omitempty"` + Put map[string]*Node `json:"put,omitempty"` + DAM string `json:"dam,omitempty"` + PID string `json:"pid,omitempty"` +} + +type MessageGetRequest struct { + Soul string `json:"#,omitempty"` + Field string `json:".,omitempty"` +} + +type MessageReceived struct { + *Message + Peer Peer +} diff --git a/gun/node.go b/gun/node.go index 7f548ac..9e275e1 100644 --- a/gun/node.go +++ b/gun/node.go @@ -17,14 +17,14 @@ var SoulGenDefault = func() string { } type Node struct { - NodeMetadata + Metadata Values map[string]Value } func (n *Node) MarshalJSON() ([]byte, error) { // Just put it all in a map and then encode it toEnc := make(map[string]interface{}, len(n.Values)+1) - toEnc["_"] = &n.NodeMetadata + toEnc["_"] = &n.Metadata for k, v := range n.Values { toEnc[k] = v } @@ -49,7 +49,7 @@ func (n *Node) UnmarshalJSON(b []byte) error { } else if keyStr, ok := key.(string); !ok { return fmt.Errorf("Unrecognized token %v", key) } else if keyStr == "_" { - if err = dec.Decode(&n.NodeMetadata); err != nil { + if err = dec.Decode(&n.Metadata); err != nil { return fmt.Errorf("Failed unmarshaling metadata: %v", err) } } else if val, err := dec.Token(); err != nil { @@ -60,8 +60,8 @@ func (n *Node) UnmarshalJSON(b []byte) error { } } -type NodeMetadata struct { - ID string `json:"#"` +type Metadata struct { + Soul string `json:"#"` State map[string]int64 `json:">"` } @@ -110,7 +110,8 @@ func (n ValueRelation) MarshalJSON() ([]byte, error) { return json.Marshal(map[string]string{"#": string(n)}) } -type StatefulValue struct { +type ValueWithState struct { Value Value + // This is 0 for top-level values State int64 } diff --git a/gun/peer.go b/gun/peer.go index 9e4c27b..5704b56 100644 --- a/gun/peer.go +++ b/gun/peer.go @@ -8,6 +8,13 @@ import ( "github.com/gorilla/websocket" ) +type ErrPeer struct { + Err error + Peer Peer +} + +func (e *ErrPeer) Error() string { return fmt.Sprintf("Error on peer %v: %v", e.Peer, e.Err) } + type Peer interface { Send(ctx context.Context, msg *Message) error Receive() <-chan *MessageOrError diff --git a/gun/scoped.go b/gun/scoped.go index 198597c..9d12814 100644 --- a/gun/scoped.go +++ b/gun/scoped.go @@ -2,13 +2,17 @@ package gun import ( "context" + "errors" "sync" ) type Scoped struct { - gun *Gun - parentID string - field string + gun *Gun + + parent *Scoped + field string + cachedParentSoul string + cachedParentSoulLock sync.RWMutex valueChansToListeners map[<-chan *ValueFetch]*messageIDListener valueChansToListenersLock sync.Mutex @@ -17,14 +21,14 @@ type Scoped struct { type messageIDListener struct { id string values chan *ValueFetch - receivedMessages chan *ReceivedMessage + receivedMessages chan *MessageReceived } -func newScoped(gun *Gun, parentID string, field string) *Scoped { +func newScoped(gun *Gun, parent *Scoped, field string) *Scoped { return &Scoped{ - gun: gun, - parentID: parentID, - field: field, + gun: gun, + parent: parent, + field: field, } } @@ -32,16 +36,34 @@ type ValueFetch struct { // This can be a context error on cancelation Err error Field string - // Nil if there is an error - Value *StatefulValue + // Nil if the value doesn't exist or there's an error + Value *ValueWithState // Nil when local and sometimes on error Peer Peer } -type Ack struct { - Err error - Ok bool - Peer Peer +var ErrNotObject = errors.New("Scoped value not an object") +var ErrLookupOnTopLevel = errors.New("Cannot do lookup on top level") + +// Empty string if doesn't exist, ErrNotObject if self or parent not an object +func (s *Scoped) Soul(ctx context.Context) (string, error) { + s.cachedParentSoulLock.RLock() + cachedParentSoul := s.cachedParentSoul + s.cachedParentSoulLock.RUnlock() + if cachedParentSoul != "" { + return cachedParentSoul, nil + } else if v := s.Val(ctx); v.Err != nil { + return "", v.Err + } else if v.Value == nil { + return "", nil + } else if rel, ok := v.Value.Value.(ValueRelation); !ok { + return "", ErrNotObject + } else { + s.cachedParentSoulLock.Lock() + s.cachedParentSoul = string(rel) + s.cachedParentSoulLock.Unlock() + return string(rel), nil + } } func (s *Scoped) Val(ctx context.Context) *ValueFetch { @@ -53,14 +75,25 @@ func (s *Scoped) Val(ctx context.Context) *ValueFetch { } func (s *Scoped) ValLocal(ctx context.Context) *ValueFetch { - var v ValueFetch - if v.Value, v.Err = s.gun.storage.Get(ctx, s.parentID, s.field); v.Err == ErrStorageNotFound { - return nil + // If there is no parent, this is just the relation + if s.parent == nil { + return &ValueFetch{Field: s.field, Value: &ValueWithState{Value: ValueRelation(s.field)}} } - return &v + v := &ValueFetch{Field: s.field} + // Need parent soul for lookup + var parentSoul string + if parentSoul, v.Err = s.parent.Soul(ctx); v.Err == nil { + if v.Value, v.Err = s.gun.storage.Get(ctx, parentSoul, s.field); v.Err == ErrStorageNotFound { + return nil + } + } + return v } func (s *Scoped) ValRemote(ctx context.Context) *ValueFetch { + if s.parent == nil { + return &ValueFetch{Err: ErrLookupOnTopLevel, Field: s.field} + } ch := s.OnRemote(ctx) defer s.Off(ch) return <-ch @@ -68,29 +101,46 @@ func (s *Scoped) ValRemote(ctx context.Context) *ValueFetch { func (s *Scoped) On(ctx context.Context) <-chan *ValueFetch { ch := make(chan *ValueFetch, 1) - if v := s.ValLocal(ctx); v != nil { - ch <- v + if s.parent == nil { + ch <- &ValueFetch{Err: ErrLookupOnTopLevel, Field: s.field} + } else { + if v := s.ValLocal(ctx); v != nil { + ch <- v + } + go s.onRemote(ctx, ch) } - go s.onRemote(ctx, ch) return ch } func (s *Scoped) OnRemote(ctx context.Context) <-chan *ValueFetch { ch := make(chan *ValueFetch, 1) - go s.onRemote(ctx, ch) + if s.parent == nil { + ch <- &ValueFetch{Err: ErrLookupOnTopLevel, Field: s.field} + } else { + go s.onRemote(ctx, ch) + } return ch } func (s *Scoped) onRemote(ctx context.Context, ch chan *ValueFetch) { + if s.parent == nil { + panic("No parent") + } + // We have to get the parent soul first + parentSoul, err := s.parent.Soul(ctx) + if err != nil { + ch <- &ValueFetch{Err: ErrLookupOnTopLevel, Field: s.field} + return + } // Create get request req := &Message{ ID: randString(9), - Get: &MessageGetRequest{ID: s.parentID, Field: s.field}, + Get: &MessageGetRequest{Soul: parentSoul, Field: s.field}, } // Make a chan to listen for received messages and link it to // the given one so we can turn it "off". Off will close this // chan. - msgCh := make(chan *ReceivedMessage) + msgCh := make(chan *MessageReceived) s.valueChansToListenersLock.Lock() s.valueChansToListeners[ch] = &messageIDListener{req.ID, ch, msgCh} s.valueChansToListenersLock.Unlock() @@ -109,17 +159,15 @@ func (s *Scoped) onRemote(ctx context.Context, ch chan *ValueFetch) { if !ok { return } - // We asked for a single field, should only get that field - if n := msg.Put[s.parentID]; n != nil && n.Values[s.field] != nil { - // TODO: conflict resolution - // TODO: dedupe - // TODO: store and cache - safeValueFetchSend(ch, &ValueFetch{ - Field: s.field, - Value: &StatefulValue{n.Values[s.field], n.State[s.field]}, - Peer: msg.Peer, - }) + f := &ValueFetch{Field: s.field, Peer: msg.Peer} + // We asked for a single field, should only get that field or it doesn't exist + if n := msg.Put[parentSoul]; n != nil && n.Values[s.field] != nil { + f.Value = &ValueWithState{n.Values[s.field], n.State[s.field]} } + // TODO: conflict resolution and defer + // TODO: dedupe + // TODO: store and cache + safeValueFetchSend(ch, f) } } }() @@ -151,7 +199,11 @@ func (s *Scoped) Off(ch <-chan *ValueFetch) bool { } func (s *Scoped) Scoped(ctx context.Context, key string, children ...string) *Scoped { - panic("TODO") + ret := newScoped(s.gun, s, key) + for _, child := range children { + ret = newScoped(s.gun, ret, child) + } + return ret } func safeValueFetchSend(ch chan<- *ValueFetch, f *ValueFetch) { diff --git a/gun/storage.go b/gun/storage.go index e4c87e1..d730def 100644 --- a/gun/storage.go +++ b/gun/storage.go @@ -9,8 +9,8 @@ import ( var ErrStorageNotFound = errors.New("Not found") type Storage interface { - Get(ctx context.Context, parentID, field string) (*StatefulValue, error) - Put(ctx context.Context, parentID, field string, val *StatefulValue) (bool, error) + Get(ctx context.Context, parentSoul, field string) (*ValueWithState, error) + Put(ctx context.Context, parentSoul, field string, val *ValueWithState) (bool, error) // Tracking(ctx context.Context, id string) (bool, error) } @@ -18,10 +18,10 @@ type StorageInMem struct { values sync.Map } -func (s *StorageInMem) Get(ctx context.Context, parentID, field string) (*StatefulValue, error) { +func (s *StorageInMem) Get(ctx context.Context, parentSoul, field string) (*ValueWithState, error) { panic("TODO") } -func (s *StorageInMem) Put(ctx context.Context, parentID, field string, val *StatefulValue) (bool, error) { +func (s *StorageInMem) Put(ctx context.Context, parentSoul, field string, val *ValueWithState) (bool, error) { panic("TODO") } diff --git a/gun/tests/context_test.go b/gun/tests/context_test.go index a588982..55f36c0 100644 --- a/gun/tests/context_test.go +++ b/gun/tests/context_test.go @@ -64,7 +64,7 @@ func (t *testContext) startJS(script string) (*bytes.Buffer, *exec.Cmd, context. func (t *testContext) startGunServer(port int) { // Remove entire data folder first - t.Require.NoError(os.RemoveAll("rodata-server")) + t.Require.NoError(os.RemoveAll("radata-server")) t.startJS(` var Gun = require('gun') const server = require('http').createServer().listen(` + strconv.Itoa(port) + `) diff --git a/gun/tests/gun_test.go b/gun/tests/gun_test.go index 70c9aae..09fe0dd 100644 --- a/gun/tests/gun_test.go +++ b/gun/tests/gun_test.go @@ -1,11 +1,6 @@ package tests -import ( - "testing" - - "github.com/cretz/esgopeta/gun" -) - +/* func TestGunGo(t *testing.T) { // Run the server, put in one call, get in another, then check ctx, cancelFn := newContext(t) @@ -46,3 +41,4 @@ func TestGunGo(t *testing.T) { ctx.Require.NoError(f.Err) } +*/ diff --git a/gun/util.go b/gun/util.go index cdcc1d9..a12b277 100644 --- a/gun/util.go +++ b/gun/util.go @@ -1,6 +1,8 @@ package gun -import "crypto/rand" +import ( + "crypto/rand" +) const randChars = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"