From 5fa024d1e38ebee9d69d3607d53fd94cc903f93a Mon Sep 17 00:00:00 2001 From: Chad Retz Date: Sun, 24 Feb 2019 22:23:15 -0600 Subject: [PATCH] Successful put --- gun/gun.go | 50 +++++------ gun/message.go | 2 + gun/node.go | 3 +- gun/peer.go | 24 ++--- gun/scoped.go | 202 ++++++------------------------------------ gun/scoped_fetch.go | 169 +++++++++++++++++++++++++++++++++++ gun/scoped_put.go | 191 +++++++++++++++++++++++++++++++++++++++ gun/storage.go | 1 + gun/tests/gun_test.go | 37 ++++++-- 9 files changed, 460 insertions(+), 219 deletions(-) create mode 100644 gun/scoped_fetch.go create mode 100644 gun/scoped_put.go diff --git a/gun/gun.go b/gun/gun.go index eeb8547..ed5c27a 100644 --- a/gun/gun.go +++ b/gun/gun.go @@ -16,8 +16,8 @@ type Gun struct { myPeerID string tracking Tracking - messageIDPutListeners map[string]chan<- *MessageReceived - messageIDPutListenersLock sync.RWMutex + messageIDListeners map[string]chan<- *MessageReceived + messageIDListenersLock sync.RWMutex } type Config struct { @@ -42,14 +42,14 @@ const DefaultPeerSleepOnError = 30 * time.Second func New(ctx context.Context, config Config) (*Gun, error) { g := &Gun{ - peers: make([]*gunPeer, len(config.PeerURLs)), - storage: config.Storage, - soulGen: config.SoulGen, - peerErrorHandler: config.PeerErrorHandler, - peerSleepOnError: config.PeerSleepOnError, - myPeerID: config.MyPeerID, - tracking: config.Tracking, - messageIDPutListeners: map[string]chan<- *MessageReceived{}, + peers: make([]*gunPeer, len(config.PeerURLs)), + storage: config.Storage, + soulGen: config.SoulGen, + peerErrorHandler: config.PeerErrorHandler, + peerSleepOnError: config.PeerSleepOnError, + myPeerID: config.MyPeerID, + tracking: config.Tracking, + messageIDListeners: map[string]chan<- *MessageReceived{}, } // Create all the peers sleepOnError := config.PeerSleepOnError @@ -78,7 +78,7 @@ func New(ctx context.Context, config Config) (*Gun, error) { g.storage = &StorageInMem{} } if g.soulGen == nil { - g.soulGen = SoulGenDefault + g.soulGen = DefaultSoulGen } if g.myPeerID == "" { g.myPeerID = randString(9) @@ -162,10 +162,10 @@ func (g *Gun) startReceiving() { func (g *Gun) onPeerMessage(ctx context.Context, msg *MessageReceived) { // If there is a listener for this message, use it - if msg.Ack != "" && len(msg.Put) > 0 { - g.messageIDPutListenersLock.RLock() - l := g.messageIDPutListeners[msg.Ack] - g.messageIDPutListenersLock.RUnlock() + if msg.Ack != "" { + g.messageIDListenersLock.RLock() + l := g.messageIDListeners[msg.Ack] + g.messageIDListenersLock.RUnlock() if l != nil { go safeReceivedMessageSend(l, msg) return @@ -195,22 +195,18 @@ func (g *Gun) onPeerError(err *ErrPeer) { } } -func (g *Gun) RegisterMessageIDPutListener(id string, ch chan<- *MessageReceived) { - g.messageIDPutListenersLock.Lock() - defer g.messageIDPutListenersLock.Unlock() - g.messageIDPutListeners[id] = ch +func (g *Gun) RegisterMessageIDListener(id string, ch chan<- *MessageReceived) { + g.messageIDListenersLock.Lock() + defer g.messageIDListenersLock.Unlock() + g.messageIDListeners[id] = ch } -func (g *Gun) UnregisterMessageIDPutListener(id string) { - g.messageIDPutListenersLock.Lock() - defer g.messageIDPutListenersLock.Unlock() - delete(g.messageIDPutListeners, id) +func (g *Gun) UnregisterMessageIDListener(id string) { + g.messageIDListenersLock.Lock() + defer g.messageIDListenersLock.Unlock() + delete(g.messageIDListeners, id) } -// func (g *Gun) RegisterValueIDPutListener(id string, ch chan<- *ReceivedMessage) { -// panic("TODO") -// } - func (g *Gun) Scoped(ctx context.Context, key string, children ...string) *Scoped { s := newScoped(g, nil, key) if len(children) > 0 { diff --git a/gun/message.go b/gun/message.go index 83cf56c..596f4a1 100644 --- a/gun/message.go +++ b/gun/message.go @@ -12,6 +12,8 @@ type Message struct { Put map[string]*Node `json:"put,omitempty"` DAM string `json:"dam,omitempty"` PID string `json:"pid,omitempty"` + OK int `json:"ok,omitempty"` + Err string `json:"err,omitempty"` } func (m *Message) Clone() *Message { diff --git a/gun/node.go b/gun/node.go index 19b1e17..f86a356 100644 --- a/gun/node.go +++ b/gun/node.go @@ -7,7 +7,7 @@ import ( "strconv" ) -var SoulGenDefault = func() string { +func DefaultSoulGen() string { ms, uniqueNum := TimeNowUniqueUnix() s := strconv.FormatInt(ms, 36) if uniqueNum > 0 { @@ -65,6 +65,7 @@ type Metadata struct { State map[string]int64 `json:">,omitempty"` } +// TODO: put private methd to seal enum type Value interface { } diff --git a/gun/peer.go b/gun/peer.go index fb11b23..4fd0ab1 100644 --- a/gun/peer.go +++ b/gun/peer.go @@ -24,16 +24,20 @@ type Peer interface { Close() error } -var PeerURLSchemes = map[string]func(context.Context, *url.URL) (Peer, error){ - "http": func(ctx context.Context, peerURL *url.URL) (Peer, error) { - schemeChangedURL := &url.URL{} - *schemeChangedURL = *peerURL - schemeChangedURL.Scheme = "ws" - return NewPeerWebSocket(ctx, schemeChangedURL) - }, - "ws": func(ctx context.Context, peerURL *url.URL) (Peer, error) { - return NewPeerWebSocket(ctx, peerURL) - }, +var PeerURLSchemes map[string]func(context.Context, *url.URL) (Peer, error) + +func init() { + PeerURLSchemes = map[string]func(context.Context, *url.URL) (Peer, error){ + "http": func(ctx context.Context, peerURL *url.URL) (Peer, error) { + schemeChangedURL := &url.URL{} + *schemeChangedURL = *peerURL + schemeChangedURL.Scheme = "ws" + return NewPeerWebSocket(ctx, schemeChangedURL) + }, + "ws": func(ctx context.Context, peerURL *url.URL) (Peer, error) { + return NewPeerWebSocket(ctx, peerURL) + }, + } } func NewPeer(ctx context.Context, peerURL string) (Peer, error) { diff --git a/gun/scoped.go b/gun/scoped.go index 5f9829c..3720cbd 100644 --- a/gun/scoped.go +++ b/gun/scoped.go @@ -3,6 +3,7 @@ package gun import ( "context" "errors" + "fmt" "sync" ) @@ -14,200 +15,57 @@ type Scoped struct { cachedParentSoul string cachedParentSoulLock sync.RWMutex - resultChansToListeners map[<-chan *Result]*messageIDListener - resultChansToListenersLock sync.Mutex -} + fetchResultListeners map[<-chan *FetchResult]*fetchResultListener + fetchResultListenersLock sync.Mutex -type messageIDListener struct { - id string - results chan *Result - receivedMessages chan *MessageReceived + putResultListeners map[<-chan *PutResult]*putResultListener + putResultListenersLock sync.Mutex } func newScoped(gun *Gun, parent *Scoped, field string) *Scoped { return &Scoped{ - gun: gun, - parent: parent, - field: field, - resultChansToListeners: map[<-chan *Result]*messageIDListener{}, + gun: gun, + parent: parent, + field: field, + fetchResultListeners: map[<-chan *FetchResult]*fetchResultListener{}, + putResultListeners: map[<-chan *PutResult]*putResultListener{}, } } -type Result struct { - // This can be a context error on cancelation - Err error - Field string - // Nil if the value doesn't exist, exists and is nil, or there's an error - Value Value - State int64 // This can be 0 for errors or top-level value relations - ValueExists bool - // Nil when local and sometimes on error - peer *gunPeer -} - var ErrNotObject = errors.New("Scoped value not an object") -var ErrLookupOnTopLevel = errors.New("Cannot do lookup on top level") +var ErrLookupOnTopLevel = errors.New("Cannot do put/lookup on top level") // Empty string if doesn't exist, ErrNotObject if self or parent not an object func (s *Scoped) Soul(ctx context.Context) (string, error) { - s.cachedParentSoulLock.RLock() - cachedParentSoul := s.cachedParentSoul - s.cachedParentSoulLock.RUnlock() - if cachedParentSoul != "" { - return cachedParentSoul, nil - } else if r := s.Val(ctx); r.Err != nil { + if cachedSoul := s.cachedSoul(); cachedSoul != "" { + return cachedSoul, nil + } else if r := s.FetchOne(ctx); r.Err != nil { return "", r.Err } else if !r.ValueExists { return "", nil } else if rel, ok := r.Value.(ValueRelation); !ok { return "", ErrNotObject + } else if !s.setCachedSoul(rel) { + return "", fmt.Errorf("Concurrent soul cache set") } else { - s.cachedParentSoulLock.Lock() - s.cachedParentSoul = string(rel) - s.cachedParentSoulLock.Unlock() return string(rel), nil } } -func (s *Scoped) Put(ctx context.Context, val Value) <-chan *Result { - panic("TODO") +func (s *Scoped) cachedSoul() string { + s.cachedParentSoulLock.RLock() + defer s.cachedParentSoulLock.RUnlock() + return s.cachedParentSoul } -func (s *Scoped) Val(ctx context.Context) *Result { - // Try local before remote - if r := s.ValLocal(ctx); r.Err != nil || r.ValueExists { - return r +func (s *Scoped) setCachedSoul(val ValueRelation) bool { + s.cachedParentSoulLock.Lock() + defer s.cachedParentSoulLock.Unlock() + if s.cachedParentSoul != "" { + return false } - return s.ValRemote(ctx) -} - -func (s *Scoped) ValLocal(ctx context.Context) *Result { - // If there is no parent, this is just the relation - if s.parent == nil { - return &Result{Field: s.field, Value: ValueRelation(s.field), ValueExists: true} - } - r := &Result{Field: s.field} - // Need parent soul for lookup - var parentSoul string - if parentSoul, r.Err = s.parent.Soul(ctx); r.Err == nil { - var vs *ValueWithState - if vs, r.Err = s.gun.storage.Get(ctx, parentSoul, s.field); r.Err == ErrStorageNotFound { - r.Err = nil - } else if r.Err == nil { - r.Value, r.State, r.ValueExists = vs.Value, vs.State, true - } - } - return r -} - -func (s *Scoped) ValRemote(ctx context.Context) *Result { - if s.parent == nil { - return &Result{Err: ErrLookupOnTopLevel, Field: s.field} - } - ch := s.OnRemote(ctx) - defer s.Off(ch) - return <-ch -} - -func (s *Scoped) On(ctx context.Context) <-chan *Result { - ch := make(chan *Result, 1) - if s.parent == nil { - ch <- &Result{Err: ErrLookupOnTopLevel, Field: s.field} - close(ch) - } else { - if r := s.ValLocal(ctx); r.Err != nil || r.ValueExists { - ch <- r - } - go s.onRemote(ctx, ch) - } - return ch -} - -func (s *Scoped) OnRemote(ctx context.Context) <-chan *Result { - ch := make(chan *Result, 1) - if s.parent == nil { - ch <- &Result{Err: ErrLookupOnTopLevel, Field: s.field} - close(ch) - } else { - go s.onRemote(ctx, ch) - } - return ch -} - -func (s *Scoped) onRemote(ctx context.Context, ch chan *Result) { - if s.parent == nil { - panic("No parent") - } - // We have to get the parent soul first - parentSoul, err := s.parent.Soul(ctx) - if err != nil { - ch <- &Result{Err: ErrLookupOnTopLevel, Field: s.field} - return - } - // Create get request - req := &Message{ - ID: randString(9), - Get: &MessageGetRequest{Soul: parentSoul, Field: s.field}, - } - // Make a chan to listen for received messages and link it to - // the given one so we can turn it "off". Off will close this - // chan. - msgCh := make(chan *MessageReceived) - s.resultChansToListenersLock.Lock() - s.resultChansToListeners[ch] = &messageIDListener{req.ID, ch, msgCh} - s.resultChansToListenersLock.Unlock() - // Listen for responses to this get - s.gun.RegisterMessageIDPutListener(req.ID, msgCh) - // TODO: only for children: s.gun.RegisterValueIDPutListener(s.id, msgCh) - // Handle received messages turning them to value fetches - go func() { - for { - select { - case <-ctx.Done(): - ch <- &Result{Err: ctx.Err(), Field: s.field} - s.Off(ch) - return - case msg, ok := <-msgCh: - if !ok { - return - } - r := &Result{Field: s.field, peer: msg.peer} - // We asked for a single field, should only get that field or it doesn't exist - if n := msg.Put[parentSoul]; n != nil && n.Values[s.field] != nil { - r.Value, r.State, r.ValueExists = n.Values[s.field], n.State[s.field], true - } - // TODO: conflict resolution and defer - // TODO: dedupe - // TODO: store and cache - safeResultSend(ch, r) - } - } - }() - // Send async, sending back errors - go func() { - for peerErr := range s.gun.Send(ctx, req) { - safeResultSend(ch, &Result{ - Err: peerErr.Err, - Field: s.field, - peer: peerErr.peer, - }) - } - }() -} - -func (s *Scoped) Off(ch <-chan *Result) bool { - s.resultChansToListenersLock.Lock() - l := s.resultChansToListeners[ch] - delete(s.resultChansToListeners, ch) - s.resultChansToListenersLock.Unlock() - if l != nil { - // Unregister the chan - s.gun.UnregisterMessageIDPutListener(l.id) - // Close the message chan and the result chan - close(l.receivedMessages) - close(l.results) - } - return l != nil + s.cachedParentSoul = string(val) + return true } func (s *Scoped) Scoped(ctx context.Context, key string, children ...string) *Scoped { @@ -217,9 +75,3 @@ func (s *Scoped) Scoped(ctx context.Context, key string, children ...string) *Sc } return ret } - -func safeResultSend(ch chan<- *Result, r *Result) { - // Due to the fact that we may send on a closed channel here, we ignore the panic - defer func() { recover() }() - ch <- r -} diff --git a/gun/scoped_fetch.go b/gun/scoped_fetch.go new file mode 100644 index 0000000..7486965 --- /dev/null +++ b/gun/scoped_fetch.go @@ -0,0 +1,169 @@ +package gun + +import ( + "context" + "fmt" +) + +func (s *Scoped) FetchOne(ctx context.Context) *FetchResult { + // Try local before remote + if r := s.FetchOneLocal(ctx); r.Err != nil || r.ValueExists { + return r + } + return s.FetchOneRemote(ctx) +} + +func (s *Scoped) FetchOneLocal(ctx context.Context) *FetchResult { + // If there is no parent, this is just the relation + if s.parent == nil { + return &FetchResult{Field: s.field, Value: ValueRelation(s.field), ValueExists: true} + } + r := &FetchResult{Field: s.field} + // Need parent soul for lookup + var parentSoul string + if parentSoul, r.Err = s.parent.Soul(ctx); r.Err == nil { + var vs *ValueWithState + if vs, r.Err = s.gun.storage.Get(ctx, parentSoul, s.field); r.Err == ErrStorageNotFound { + r.Err = nil + } else if r.Err == nil { + r.Value, r.State, r.ValueExists = vs.Value, vs.State, true + } + } + return r +} + +func (s *Scoped) FetchOneRemote(ctx context.Context) *FetchResult { + if s.parent == nil { + return &FetchResult{Err: ErrLookupOnTopLevel, Field: s.field} + } + ch := s.FetchRemote(ctx) + defer s.FetchDone(ch) + return <-ch +} + +func (s *Scoped) Fetch(ctx context.Context) <-chan *FetchResult { + ch := make(chan *FetchResult, 1) + if s.parent == nil { + ch <- &FetchResult{Err: ErrLookupOnTopLevel, Field: s.field} + close(ch) + } else { + if r := s.FetchOneLocal(ctx); r.Err != nil || r.ValueExists { + ch <- r + } + go s.fetchRemote(ctx, ch) + } + return ch +} + +func (s *Scoped) FetchRemote(ctx context.Context) <-chan *FetchResult { + ch := make(chan *FetchResult, 1) + if s.parent == nil { + ch <- &FetchResult{Err: ErrLookupOnTopLevel, Field: s.field} + close(ch) + } else { + go s.fetchRemote(ctx, ch) + } + return ch +} + +func (s *Scoped) fetchRemote(ctx context.Context, ch chan *FetchResult) { + if s.parent == nil { + panic("No parent") + } + // We have to get the parent soul first + parentSoul, err := s.parent.Soul(ctx) + if err != nil { + ch <- &FetchResult{Err: ErrLookupOnTopLevel, Field: s.field} + return + } + // Create get request + req := &Message{ + ID: randString(9), + Get: &MessageGetRequest{Soul: parentSoul, Field: s.field}, + } + // Make a chan to listen for received messages and link it to + // the given one so we can turn it "off". Off will close this + // chan. + msgCh := make(chan *MessageReceived) + s.fetchResultListenersLock.Lock() + s.fetchResultListeners[ch] = &fetchResultListener{req.ID, ch, msgCh} + s.fetchResultListenersLock.Unlock() + // Listen for responses to this get + s.gun.RegisterMessageIDListener(req.ID, msgCh) + // TODO: only for children: s.gun.RegisterValueIDListener(s.id, msgCh) + // Handle received messages turning them to value fetches + go func() { + for { + select { + case <-ctx.Done(): + ch <- &FetchResult{Err: ctx.Err(), Field: s.field} + s.FetchDone(ch) + return + case msg, ok := <-msgCh: + if !ok { + return + } + r := &FetchResult{Field: s.field, peer: msg.peer} + // We asked for a single field, should only get that field or it doesn't exist + if msg.Err != "" { + r.Err = fmt.Errorf("Remote error: %v", msg.Err) + } else if n := msg.Put[parentSoul]; n != nil && n.Values[s.field] != nil { + r.Value, r.State, r.ValueExists = n.Values[s.field], n.State[s.field], true + } + // TODO: conflict resolution and defer + // TODO: dedupe + // TODO: store and cache + safeFetchResultSend(ch, r) + } + } + }() + // Send async, sending back errors + go func() { + for peerErr := range s.gun.Send(ctx, req) { + safeFetchResultSend(ch, &FetchResult{ + Err: peerErr.Err, + Field: s.field, + peer: peerErr.peer, + }) + } + }() +} + +func (s *Scoped) FetchDone(ch <-chan *FetchResult) bool { + s.fetchResultListenersLock.Lock() + l := s.fetchResultListeners[ch] + delete(s.fetchResultListeners, ch) + s.fetchResultListenersLock.Unlock() + if l != nil { + // Unregister the chan + s.gun.UnregisterMessageIDListener(l.id) + // Close the message chan and the result chan + close(l.receivedMessages) + close(l.results) + } + return l != nil +} + +func safeFetchResultSend(ch chan<- *FetchResult, r *FetchResult) { + // Due to the fact that we may send on a closed channel here, we ignore the panic + defer func() { recover() }() + ch <- r +} + +type fetchResultListener struct { + id string + results chan *FetchResult + receivedMessages chan *MessageReceived +} + +type FetchResult struct { + // This can be a context error on cancelation + Err error + Field string + // Nil if the value doesn't exist, exists and is nil, or there's an error + Value Value + State int64 // This can be 0 for errors or top-level value relations + ValueExists bool + // Nil when local and sometimes on error + peer *gunPeer +} diff --git a/gun/scoped_put.go b/gun/scoped_put.go new file mode 100644 index 0000000..4273235 --- /dev/null +++ b/gun/scoped_put.go @@ -0,0 +1,191 @@ +package gun + +import ( + "context" + "fmt" +) + +type putResultListener struct { + id string + results chan *PutResult + receivedMessages chan *MessageReceived +} + +type PutResult struct { + Err error + + peer *gunPeer +} + +type PutOption interface{} + +type putOptionStoreLocalOnly struct{} + +func PutOptionStoreLocalOnly() PutOption { return putOptionStoreLocalOnly{} } + +type putOptionFailWithoutParent struct{} + +func PutOptionFailWithoutParent() PutOption { return putOptionFailWithoutParent{} } + +func (s *Scoped) Put(ctx context.Context, val Value, opts ...PutOption) <-chan *PutResult { + // Collect the options + storeLocalOnly := false + failWithoutParent := false + for _, opt := range opts { + switch opt.(type) { + case putOptionStoreLocalOnly: + storeLocalOnly = true + case putOptionFailWithoutParent: + failWithoutParent = true + } + } + ch := make(chan *PutResult, 1) + // Get all the parents + parents := []*Scoped{} + for next := s.parent; next != nil; next = next.parent { + parents = append([]*Scoped{next}, parents...) + } + if len(parents) == 0 { + ch <- &PutResult{Err: ErrLookupOnTopLevel} + close(ch) + return ch + } + // Ask for the soul on the last parent. What this will do is trigger + // lazy soul fetch up the chain. Then we can go through and find who doesn't have a + // cached soul, create one, and store locally. + if soul, err := parents[len(parents)-1].Soul(ctx); err != nil { + ch <- &PutResult{Err: ErrLookupOnTopLevel} + close(ch) + return ch + } else if soul == "" && failWithoutParent { + ch <- &PutResult{Err: fmt.Errorf("Parent not present but required")} + close(ch) + return ch + } + // Now for every parent that doesn't have a cached soul we create one and + // put as part of the message. We accept fetching the cache this way is a bit + // racy. + req := &Message{ + ID: randString(9), + Put: make(map[string]*Node), + } + // We know that the first has a soul + prevParentSoul := parents[0].cachedSoul() + currState := TimeNowUnixMs() + for _, parent := range parents[1:] { + parentCachedSoul := parent.cachedSoul() + if parentCachedSoul == "" { + // Create the soul and make it as part of the next put + parentCachedSoul = s.gun.soulGen() + req.Put[prevParentSoul] = &Node{ + Metadata: Metadata{ + Soul: prevParentSoul, + State: map[string]int64{parent.field: currState}, + }, + Values: map[string]Value{parent.field: ValueRelation(parentCachedSoul)}, + } + // Also store locally and set the cached soul + withState := &ValueWithState{ValueRelation(parentCachedSoul), currState} + // TODO: Should I not store until the very end just in case it errors halfway + // though? There are no standard cases where it should fail. + if ok, err := s.gun.storage.Put(ctx, prevParentSoul, parent.field, withState); err != nil { + ch <- &PutResult{Err: err} + close(ch) + return ch + } else if !ok { + ch <- &PutResult{Err: fmt.Errorf("Unexpected deferred local store")} + close(ch) + return ch + } else if !parent.setCachedSoul(ValueRelation(parentCachedSoul)) { + ch <- &PutResult{Err: fmt.Errorf("Concurrent cached soul set")} + close(ch) + return ch + } + } + prevParentSoul = parentCachedSoul + } + // Now that we've setup all the parents, we can do this store locally. + withState := &ValueWithState{val, currState} + if ok, err := s.gun.storage.Put(ctx, prevParentSoul, s.field, withState); err != nil { + ch <- &PutResult{Err: err} + close(ch) + return ch + } else if !ok { + ch <- &PutResult{Err: fmt.Errorf("Unexpected deferred local store")} + close(ch) + return ch + } + // We need an ack for local store and stop if local only + ch <- &PutResult{} + if storeLocalOnly { + close(ch) + return ch + } + // Now, we begin the remote storing + req.Put[prevParentSoul] = &Node{ + Metadata: Metadata{ + Soul: prevParentSoul, + State: map[string]int64{s.field: currState}, + }, + Values: map[string]Value{s.field: val}, + } + // Make a msg chan and register it to listen for acks + msgCh := make(chan *MessageReceived) + s.putResultListenersLock.Lock() + s.putResultListeners[ch] = &putResultListener{req.ID, ch, msgCh} + s.putResultListenersLock.Unlock() + s.gun.RegisterMessageIDListener(req.ID, msgCh) + // Start message listener + go func() { + for { + select { + case <-ctx.Done(): + ch <- &PutResult{Err: ctx.Err()} + s.PutDone(ch) + return + case msg, ok := <-msgCh: + if !ok { + return + } + r := &PutResult{peer: msg.peer} + if msg.Err != "" { + r.Err = fmt.Errorf("Remote error: %v", msg.Err) + } else if msg.OK != 1 { + r.Err = fmt.Errorf("Unexpected remote ok value of %v", msg.OK) + } + safePutResultSend(ch, r) + } + } + }() + // Send async, sending back errors + go func() { + for peerErr := range s.gun.Send(ctx, req) { + safePutResultSend(ch, &PutResult{ + Err: peerErr.Err, + peer: peerErr.peer, + }) + } + }() + return ch +} + +func (s *Scoped) PutDone(ch <-chan *PutResult) bool { + s.putResultListenersLock.Lock() + l := s.putResultListeners[ch] + delete(s.putResultListeners, ch) + s.putResultListenersLock.Unlock() + if l != nil { + // Unregister the chan + s.gun.UnregisterMessageIDListener(l.id) + // Close the message chan and the result chan + close(l.receivedMessages) + close(l.results) + } + return l != nil +} + +func safePutResultSend(ch chan<- *PutResult, r *PutResult) { + // Due to the fact that we may send on a closed channel here, we ignore the panic + defer func() { recover() }() + ch <- r +} diff --git a/gun/storage.go b/gun/storage.go index 96ff852..425aa20 100644 --- a/gun/storage.go +++ b/gun/storage.go @@ -10,6 +10,7 @@ var ErrStorageNotFound = errors.New("Not found") type Storage interface { Get(ctx context.Context, parentSoul, field string) (*ValueWithState, error) + // If bool is false, it's deferred Put(ctx context.Context, parentSoul, field string, val *ValueWithState) (bool, error) Tracking(ctx context.Context, parentSoul, field string) (bool, error) } diff --git a/gun/tests/gun_test.go b/gun/tests/gun_test.go index 9cd6ff5..621adfe 100644 --- a/gun/tests/gun_test.go +++ b/gun/tests/gun_test.go @@ -1,6 +1,7 @@ package tests import ( + "strings" "testing" "github.com/cretz/esgopeta/gun" @@ -26,13 +27,37 @@ func TestGunGetSimple(t *testing.T) { g := ctx.newGunConnectedToGunJS() defer g.Close() // Make sure we got back the same value - f := g.Scoped(ctx, "esgopeta-test", "TestGunGetSimple", "some-key").Val(ctx) - ctx.Require.NoError(f.Err) - ctx.Require.Equal(gun.ValueString(randStr), f.Value.(gun.ValueString)) + r := g.Scoped(ctx, "esgopeta-test", "TestGunGetSimple", "some-key").FetchOne(ctx) + ctx.Require.NoError(r.Err) + ctx.Require.Equal(gun.ValueString(randStr), r.Value.(gun.ValueString)) // // Do it again TODO: make sure there are no network calls, it's all from mem // ctx.debugf("Asking for key again") - // f = g.Scoped(ctx, "esgopeta-test", "TestGunGetSimple", "some-key").Val(ctx) + // f = g.Scoped(ctx, "esgopeta-test", "TestGunGetSimple", "some-key").FetchOne(ctx) // ctx.Require.NoError(f.Err) - // ctx.Require.Equal(gun.ValueString(randStr), f.Value.Value.(gun.ValueString)) - + // ctx.Require.Equal(gun.ValueString(randStr), f.Value.(gun.ValueString)) +} + +func TestGunPutSimple(t *testing.T) { + ctx, cancelFn := newContext(t) + defer cancelFn() + ctx.startGunJSServer() + randStr := randString(30) + // Put + g := ctx.newGunConnectedToGunJS() + defer g.Close() + // Just wait for two acks (one local, one remote) + ch := g.Scoped(ctx, "esgopeta-test", "TestGunPutSimple", "some-key").Put(ctx, gun.ValueString(randStr)) + // TODO: test local is null peer and remote is non-null + r := <-ch + ctx.Require.NoError(r.Err) + r = <-ch + ctx.Require.NoError(r.Err) + // Get from JS + out := ctx.runJSWithGun(` + gun.get('esgopeta-test').get('TestGunPutSimple').get('some-key').once(data => { + console.log(data) + process.exit(0) + }) + `) + ctx.Require.Equal(randStr, strings.TrimSpace(string(out))) }