From 578e8917ae518958bba505d9258c663ed734fd9d Mon Sep 17 00:00:00 2001 From: Chad Retz Date: Fri, 22 Feb 2019 15:40:55 -0600 Subject: [PATCH] Some rename and prep for puts --- gun/gun.go | 11 ++++ gun/scoped.go | 117 +++++++++++++++++++++++------------------- gun/storage.go | 7 ++- gun/tests/gun_test.go | 10 +++- 4 files changed, 89 insertions(+), 56 deletions(-) diff --git a/gun/gun.go b/gun/gun.go index a368179..eeb8547 100644 --- a/gun/gun.go +++ b/gun/gun.go @@ -14,6 +14,7 @@ type Gun struct { peerErrorHandler func(*ErrPeer) peerSleepOnError time.Duration myPeerID string + tracking Tracking messageIDPutListeners map[string]chan<- *MessageReceived messageIDPutListenersLock sync.RWMutex @@ -26,8 +27,17 @@ type Config struct { PeerErrorHandler func(*ErrPeer) PeerSleepOnError time.Duration MyPeerID string + Tracking Tracking } +type Tracking int + +const ( + TrackingRequested Tracking = iota + TrackingNothing + TrackingEverything +) + const DefaultPeerSleepOnError = 30 * time.Second func New(ctx context.Context, config Config) (*Gun, error) { @@ -38,6 +48,7 @@ func New(ctx context.Context, config Config) (*Gun, error) { peerErrorHandler: config.PeerErrorHandler, peerSleepOnError: config.PeerSleepOnError, myPeerID: config.MyPeerID, + tracking: config.Tracking, messageIDPutListeners: map[string]chan<- *MessageReceived{}, } // Create all the peers diff --git a/gun/scoped.go b/gun/scoped.go index 0479a3f..5f9829c 100644 --- a/gun/scoped.go +++ b/gun/scoped.go @@ -14,31 +14,33 @@ type Scoped struct { cachedParentSoul string cachedParentSoulLock sync.RWMutex - valueChansToListeners map[<-chan *ValueFetch]*messageIDListener - valueChansToListenersLock sync.Mutex + resultChansToListeners map[<-chan *Result]*messageIDListener + resultChansToListenersLock sync.Mutex } type messageIDListener struct { id string - values chan *ValueFetch + results chan *Result receivedMessages chan *MessageReceived } func newScoped(gun *Gun, parent *Scoped, field string) *Scoped { return &Scoped{ - gun: gun, - parent: parent, - field: field, - valueChansToListeners: map[<-chan *ValueFetch]*messageIDListener{}, + gun: gun, + parent: parent, + field: field, + resultChansToListeners: map[<-chan *Result]*messageIDListener{}, } } -type ValueFetch struct { +type Result struct { // This can be a context error on cancelation Err error Field string - // Nil if the value doesn't exist or there's an error - Value *ValueWithState + // Nil if the value doesn't exist, exists and is nil, or there's an error + Value Value + State int64 // This can be 0 for errors or top-level value relations + ValueExists bool // Nil when local and sometimes on error peer *gunPeer } @@ -53,11 +55,11 @@ func (s *Scoped) Soul(ctx context.Context) (string, error) { s.cachedParentSoulLock.RUnlock() if cachedParentSoul != "" { return cachedParentSoul, nil - } else if v := s.Val(ctx); v.Err != nil { - return "", v.Err - } else if v.Value == nil { + } else if r := s.Val(ctx); r.Err != nil { + return "", r.Err + } else if !r.ValueExists { return "", nil - } else if rel, ok := v.Value.Value.(ValueRelation); !ok { + } else if rel, ok := r.Value.(ValueRelation); !ok { return "", ErrNotObject } else { s.cachedParentSoulLock.Lock() @@ -67,70 +69,79 @@ func (s *Scoped) Soul(ctx context.Context) (string, error) { } } -func (s *Scoped) Val(ctx context.Context) *ValueFetch { +func (s *Scoped) Put(ctx context.Context, val Value) <-chan *Result { + panic("TODO") +} + +func (s *Scoped) Val(ctx context.Context) *Result { // Try local before remote - if v := s.ValLocal(ctx); v != nil { - return v + if r := s.ValLocal(ctx); r.Err != nil || r.ValueExists { + return r } return s.ValRemote(ctx) } -func (s *Scoped) ValLocal(ctx context.Context) *ValueFetch { +func (s *Scoped) ValLocal(ctx context.Context) *Result { // If there is no parent, this is just the relation if s.parent == nil { - return &ValueFetch{Field: s.field, Value: &ValueWithState{Value: ValueRelation(s.field)}} + return &Result{Field: s.field, Value: ValueRelation(s.field), ValueExists: true} } - v := &ValueFetch{Field: s.field} + r := &Result{Field: s.field} // Need parent soul for lookup var parentSoul string - if parentSoul, v.Err = s.parent.Soul(ctx); v.Err == nil { - if v.Value, v.Err = s.gun.storage.Get(ctx, parentSoul, s.field); v.Err == ErrStorageNotFound { - return nil + if parentSoul, r.Err = s.parent.Soul(ctx); r.Err == nil { + var vs *ValueWithState + if vs, r.Err = s.gun.storage.Get(ctx, parentSoul, s.field); r.Err == ErrStorageNotFound { + r.Err = nil + } else if r.Err == nil { + r.Value, r.State, r.ValueExists = vs.Value, vs.State, true } } - return v + return r } -func (s *Scoped) ValRemote(ctx context.Context) *ValueFetch { +func (s *Scoped) ValRemote(ctx context.Context) *Result { if s.parent == nil { - return &ValueFetch{Err: ErrLookupOnTopLevel, Field: s.field} + return &Result{Err: ErrLookupOnTopLevel, Field: s.field} } ch := s.OnRemote(ctx) defer s.Off(ch) return <-ch } -func (s *Scoped) On(ctx context.Context) <-chan *ValueFetch { - ch := make(chan *ValueFetch, 1) +func (s *Scoped) On(ctx context.Context) <-chan *Result { + ch := make(chan *Result, 1) if s.parent == nil { - ch <- &ValueFetch{Err: ErrLookupOnTopLevel, Field: s.field} + ch <- &Result{Err: ErrLookupOnTopLevel, Field: s.field} + close(ch) } else { - if v := s.ValLocal(ctx); v != nil { - ch <- v + if r := s.ValLocal(ctx); r.Err != nil || r.ValueExists { + ch <- r } go s.onRemote(ctx, ch) } return ch } -func (s *Scoped) OnRemote(ctx context.Context) <-chan *ValueFetch { - ch := make(chan *ValueFetch, 1) +func (s *Scoped) OnRemote(ctx context.Context) <-chan *Result { + ch := make(chan *Result, 1) if s.parent == nil { - ch <- &ValueFetch{Err: ErrLookupOnTopLevel, Field: s.field} + ch <- &Result{Err: ErrLookupOnTopLevel, Field: s.field} + close(ch) } else { go s.onRemote(ctx, ch) } return ch } -func (s *Scoped) onRemote(ctx context.Context, ch chan *ValueFetch) { +func (s *Scoped) onRemote(ctx context.Context, ch chan *Result) { if s.parent == nil { panic("No parent") } // We have to get the parent soul first parentSoul, err := s.parent.Soul(ctx) if err != nil { - ch <- &ValueFetch{Err: ErrLookupOnTopLevel, Field: s.field} + ch <- &Result{Err: ErrLookupOnTopLevel, Field: s.field} return } // Create get request @@ -142,9 +153,9 @@ func (s *Scoped) onRemote(ctx context.Context, ch chan *ValueFetch) { // the given one so we can turn it "off". Off will close this // chan. msgCh := make(chan *MessageReceived) - s.valueChansToListenersLock.Lock() - s.valueChansToListeners[ch] = &messageIDListener{req.ID, ch, msgCh} - s.valueChansToListenersLock.Unlock() + s.resultChansToListenersLock.Lock() + s.resultChansToListeners[ch] = &messageIDListener{req.ID, ch, msgCh} + s.resultChansToListenersLock.Unlock() // Listen for responses to this get s.gun.RegisterMessageIDPutListener(req.ID, msgCh) // TODO: only for children: s.gun.RegisterValueIDPutListener(s.id, msgCh) @@ -153,29 +164,29 @@ func (s *Scoped) onRemote(ctx context.Context, ch chan *ValueFetch) { for { select { case <-ctx.Done(): - ch <- &ValueFetch{Err: ctx.Err(), Field: s.field} + ch <- &Result{Err: ctx.Err(), Field: s.field} s.Off(ch) return case msg, ok := <-msgCh: if !ok { return } - f := &ValueFetch{Field: s.field, peer: msg.peer} + r := &Result{Field: s.field, peer: msg.peer} // We asked for a single field, should only get that field or it doesn't exist if n := msg.Put[parentSoul]; n != nil && n.Values[s.field] != nil { - f.Value = &ValueWithState{n.Values[s.field], n.State[s.field]} + r.Value, r.State, r.ValueExists = n.Values[s.field], n.State[s.field], true } // TODO: conflict resolution and defer // TODO: dedupe // TODO: store and cache - safeValueFetchSend(ch, f) + safeResultSend(ch, r) } } }() // Send async, sending back errors go func() { for peerErr := range s.gun.Send(ctx, req) { - safeValueFetchSend(ch, &ValueFetch{ + safeResultSend(ch, &Result{ Err: peerErr.Err, Field: s.field, peer: peerErr.peer, @@ -184,17 +195,17 @@ func (s *Scoped) onRemote(ctx context.Context, ch chan *ValueFetch) { }() } -func (s *Scoped) Off(ch <-chan *ValueFetch) bool { - s.valueChansToListenersLock.Lock() - l := s.valueChansToListeners[ch] - delete(s.valueChansToListeners, ch) - s.valueChansToListenersLock.Unlock() +func (s *Scoped) Off(ch <-chan *Result) bool { + s.resultChansToListenersLock.Lock() + l := s.resultChansToListeners[ch] + delete(s.resultChansToListeners, ch) + s.resultChansToListenersLock.Unlock() if l != nil { // Unregister the chan s.gun.UnregisterMessageIDPutListener(l.id) - // Close the message chan and the value chan + // Close the message chan and the result chan close(l.receivedMessages) - close(l.values) + close(l.results) } return l != nil } @@ -207,8 +218,8 @@ func (s *Scoped) Scoped(ctx context.Context, key string, children ...string) *Sc return ret } -func safeValueFetchSend(ch chan<- *ValueFetch, f *ValueFetch) { +func safeResultSend(ch chan<- *Result, r *Result) { // Due to the fact that we may send on a closed channel here, we ignore the panic defer func() { recover() }() - ch <- f + ch <- r } diff --git a/gun/storage.go b/gun/storage.go index c47375c..96ff852 100644 --- a/gun/storage.go +++ b/gun/storage.go @@ -11,7 +11,7 @@ var ErrStorageNotFound = errors.New("Not found") type Storage interface { Get(ctx context.Context, parentSoul, field string) (*ValueWithState, error) Put(ctx context.Context, parentSoul, field string, val *ValueWithState) (bool, error) - // Tracking(ctx context.Context, id string) (bool, error) + Tracking(ctx context.Context, parentSoul, field string) (bool, error) } type StorageInMem struct { @@ -33,3 +33,8 @@ func (s *StorageInMem) Put(ctx context.Context, parentSoul, field string, val *V // TODO: conflict resolution state check? return true, nil } + +func (s *StorageInMem) Tracking(ctx context.Context, parentSoul, field string) (bool, error) { + _, ok := s.values.Load(parentSoulAndField{parentSoul, field}) + return ok, nil +} diff --git a/gun/tests/gun_test.go b/gun/tests/gun_test.go index 0d2d897..9cd6ff5 100644 --- a/gun/tests/gun_test.go +++ b/gun/tests/gun_test.go @@ -24,9 +24,15 @@ func TestGunGetSimple(t *testing.T) { `) // Get g := ctx.newGunConnectedToGunJS() + defer g.Close() + // Make sure we got back the same value f := g.Scoped(ctx, "esgopeta-test", "TestGunGetSimple", "some-key").Val(ctx) ctx.Require.NoError(f.Err) - // Make sure we got back the same value - ctx.Require.Equal(gun.ValueString(randStr), f.Value.Value.(gun.ValueString)) + ctx.Require.Equal(gun.ValueString(randStr), f.Value.(gun.ValueString)) + // // Do it again TODO: make sure there are no network calls, it's all from mem + // ctx.debugf("Asking for key again") + // f = g.Scoped(ctx, "esgopeta-test", "TestGunGetSimple", "some-key").Val(ctx) + // ctx.Require.NoError(f.Err) + // ctx.Require.Equal(gun.ValueString(randStr), f.Value.Value.(gun.ValueString)) }