Some rename and prep for puts

This commit is contained in:
Chad Retz 2019-02-22 15:40:55 -06:00
parent a1c1ff95ff
commit 578e8917ae
4 changed files with 89 additions and 56 deletions

View File

@ -14,6 +14,7 @@ type Gun struct {
peerErrorHandler func(*ErrPeer) peerErrorHandler func(*ErrPeer)
peerSleepOnError time.Duration peerSleepOnError time.Duration
myPeerID string myPeerID string
tracking Tracking
messageIDPutListeners map[string]chan<- *MessageReceived messageIDPutListeners map[string]chan<- *MessageReceived
messageIDPutListenersLock sync.RWMutex messageIDPutListenersLock sync.RWMutex
@ -26,8 +27,17 @@ type Config struct {
PeerErrorHandler func(*ErrPeer) PeerErrorHandler func(*ErrPeer)
PeerSleepOnError time.Duration PeerSleepOnError time.Duration
MyPeerID string MyPeerID string
Tracking Tracking
} }
type Tracking int
const (
TrackingRequested Tracking = iota
TrackingNothing
TrackingEverything
)
const DefaultPeerSleepOnError = 30 * time.Second const DefaultPeerSleepOnError = 30 * time.Second
func New(ctx context.Context, config Config) (*Gun, error) { func New(ctx context.Context, config Config) (*Gun, error) {
@ -38,6 +48,7 @@ func New(ctx context.Context, config Config) (*Gun, error) {
peerErrorHandler: config.PeerErrorHandler, peerErrorHandler: config.PeerErrorHandler,
peerSleepOnError: config.PeerSleepOnError, peerSleepOnError: config.PeerSleepOnError,
myPeerID: config.MyPeerID, myPeerID: config.MyPeerID,
tracking: config.Tracking,
messageIDPutListeners: map[string]chan<- *MessageReceived{}, messageIDPutListeners: map[string]chan<- *MessageReceived{},
} }
// Create all the peers // Create all the peers

View File

@ -14,13 +14,13 @@ type Scoped struct {
cachedParentSoul string cachedParentSoul string
cachedParentSoulLock sync.RWMutex cachedParentSoulLock sync.RWMutex
valueChansToListeners map[<-chan *ValueFetch]*messageIDListener resultChansToListeners map[<-chan *Result]*messageIDListener
valueChansToListenersLock sync.Mutex resultChansToListenersLock sync.Mutex
} }
type messageIDListener struct { type messageIDListener struct {
id string id string
values chan *ValueFetch results chan *Result
receivedMessages chan *MessageReceived receivedMessages chan *MessageReceived
} }
@ -29,16 +29,18 @@ func newScoped(gun *Gun, parent *Scoped, field string) *Scoped {
gun: gun, gun: gun,
parent: parent, parent: parent,
field: field, field: field,
valueChansToListeners: map[<-chan *ValueFetch]*messageIDListener{}, resultChansToListeners: map[<-chan *Result]*messageIDListener{},
} }
} }
type ValueFetch struct { type Result struct {
// This can be a context error on cancelation // This can be a context error on cancelation
Err error Err error
Field string Field string
// Nil if the value doesn't exist or there's an error // Nil if the value doesn't exist, exists and is nil, or there's an error
Value *ValueWithState Value Value
State int64 // This can be 0 for errors or top-level value relations
ValueExists bool
// Nil when local and sometimes on error // Nil when local and sometimes on error
peer *gunPeer peer *gunPeer
} }
@ -53,11 +55,11 @@ func (s *Scoped) Soul(ctx context.Context) (string, error) {
s.cachedParentSoulLock.RUnlock() s.cachedParentSoulLock.RUnlock()
if cachedParentSoul != "" { if cachedParentSoul != "" {
return cachedParentSoul, nil return cachedParentSoul, nil
} else if v := s.Val(ctx); v.Err != nil { } else if r := s.Val(ctx); r.Err != nil {
return "", v.Err return "", r.Err
} else if v.Value == nil { } else if !r.ValueExists {
return "", nil return "", nil
} else if rel, ok := v.Value.Value.(ValueRelation); !ok { } else if rel, ok := r.Value.(ValueRelation); !ok {
return "", ErrNotObject return "", ErrNotObject
} else { } else {
s.cachedParentSoulLock.Lock() s.cachedParentSoulLock.Lock()
@ -67,70 +69,79 @@ func (s *Scoped) Soul(ctx context.Context) (string, error) {
} }
} }
func (s *Scoped) Val(ctx context.Context) *ValueFetch { func (s *Scoped) Put(ctx context.Context, val Value) <-chan *Result {
panic("TODO")
}
func (s *Scoped) Val(ctx context.Context) *Result {
// Try local before remote // Try local before remote
if v := s.ValLocal(ctx); v != nil { if r := s.ValLocal(ctx); r.Err != nil || r.ValueExists {
return v return r
} }
return s.ValRemote(ctx) return s.ValRemote(ctx)
} }
func (s *Scoped) ValLocal(ctx context.Context) *ValueFetch { func (s *Scoped) ValLocal(ctx context.Context) *Result {
// If there is no parent, this is just the relation // If there is no parent, this is just the relation
if s.parent == nil { if s.parent == nil {
return &ValueFetch{Field: s.field, Value: &ValueWithState{Value: ValueRelation(s.field)}} return &Result{Field: s.field, Value: ValueRelation(s.field), ValueExists: true}
} }
v := &ValueFetch{Field: s.field} r := &Result{Field: s.field}
// Need parent soul for lookup // Need parent soul for lookup
var parentSoul string var parentSoul string
if parentSoul, v.Err = s.parent.Soul(ctx); v.Err == nil { if parentSoul, r.Err = s.parent.Soul(ctx); r.Err == nil {
if v.Value, v.Err = s.gun.storage.Get(ctx, parentSoul, s.field); v.Err == ErrStorageNotFound { var vs *ValueWithState
return nil if vs, r.Err = s.gun.storage.Get(ctx, parentSoul, s.field); r.Err == ErrStorageNotFound {
r.Err = nil
} else if r.Err == nil {
r.Value, r.State, r.ValueExists = vs.Value, vs.State, true
} }
} }
return v return r
} }
func (s *Scoped) ValRemote(ctx context.Context) *ValueFetch { func (s *Scoped) ValRemote(ctx context.Context) *Result {
if s.parent == nil { if s.parent == nil {
return &ValueFetch{Err: ErrLookupOnTopLevel, Field: s.field} return &Result{Err: ErrLookupOnTopLevel, Field: s.field}
} }
ch := s.OnRemote(ctx) ch := s.OnRemote(ctx)
defer s.Off(ch) defer s.Off(ch)
return <-ch return <-ch
} }
func (s *Scoped) On(ctx context.Context) <-chan *ValueFetch { func (s *Scoped) On(ctx context.Context) <-chan *Result {
ch := make(chan *ValueFetch, 1) ch := make(chan *Result, 1)
if s.parent == nil { if s.parent == nil {
ch <- &ValueFetch{Err: ErrLookupOnTopLevel, Field: s.field} ch <- &Result{Err: ErrLookupOnTopLevel, Field: s.field}
close(ch)
} else { } else {
if v := s.ValLocal(ctx); v != nil { if r := s.ValLocal(ctx); r.Err != nil || r.ValueExists {
ch <- v ch <- r
} }
go s.onRemote(ctx, ch) go s.onRemote(ctx, ch)
} }
return ch return ch
} }
func (s *Scoped) OnRemote(ctx context.Context) <-chan *ValueFetch { func (s *Scoped) OnRemote(ctx context.Context) <-chan *Result {
ch := make(chan *ValueFetch, 1) ch := make(chan *Result, 1)
if s.parent == nil { if s.parent == nil {
ch <- &ValueFetch{Err: ErrLookupOnTopLevel, Field: s.field} ch <- &Result{Err: ErrLookupOnTopLevel, Field: s.field}
close(ch)
} else { } else {
go s.onRemote(ctx, ch) go s.onRemote(ctx, ch)
} }
return ch return ch
} }
func (s *Scoped) onRemote(ctx context.Context, ch chan *ValueFetch) { func (s *Scoped) onRemote(ctx context.Context, ch chan *Result) {
if s.parent == nil { if s.parent == nil {
panic("No parent") panic("No parent")
} }
// We have to get the parent soul first // We have to get the parent soul first
parentSoul, err := s.parent.Soul(ctx) parentSoul, err := s.parent.Soul(ctx)
if err != nil { if err != nil {
ch <- &ValueFetch{Err: ErrLookupOnTopLevel, Field: s.field} ch <- &Result{Err: ErrLookupOnTopLevel, Field: s.field}
return return
} }
// Create get request // Create get request
@ -142,9 +153,9 @@ func (s *Scoped) onRemote(ctx context.Context, ch chan *ValueFetch) {
// the given one so we can turn it "off". Off will close this // the given one so we can turn it "off". Off will close this
// chan. // chan.
msgCh := make(chan *MessageReceived) msgCh := make(chan *MessageReceived)
s.valueChansToListenersLock.Lock() s.resultChansToListenersLock.Lock()
s.valueChansToListeners[ch] = &messageIDListener{req.ID, ch, msgCh} s.resultChansToListeners[ch] = &messageIDListener{req.ID, ch, msgCh}
s.valueChansToListenersLock.Unlock() s.resultChansToListenersLock.Unlock()
// Listen for responses to this get // Listen for responses to this get
s.gun.RegisterMessageIDPutListener(req.ID, msgCh) s.gun.RegisterMessageIDPutListener(req.ID, msgCh)
// TODO: only for children: s.gun.RegisterValueIDPutListener(s.id, msgCh) // TODO: only for children: s.gun.RegisterValueIDPutListener(s.id, msgCh)
@ -153,29 +164,29 @@ func (s *Scoped) onRemote(ctx context.Context, ch chan *ValueFetch) {
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
ch <- &ValueFetch{Err: ctx.Err(), Field: s.field} ch <- &Result{Err: ctx.Err(), Field: s.field}
s.Off(ch) s.Off(ch)
return return
case msg, ok := <-msgCh: case msg, ok := <-msgCh:
if !ok { if !ok {
return return
} }
f := &ValueFetch{Field: s.field, peer: msg.peer} r := &Result{Field: s.field, peer: msg.peer}
// We asked for a single field, should only get that field or it doesn't exist // We asked for a single field, should only get that field or it doesn't exist
if n := msg.Put[parentSoul]; n != nil && n.Values[s.field] != nil { if n := msg.Put[parentSoul]; n != nil && n.Values[s.field] != nil {
f.Value = &ValueWithState{n.Values[s.field], n.State[s.field]} r.Value, r.State, r.ValueExists = n.Values[s.field], n.State[s.field], true
} }
// TODO: conflict resolution and defer // TODO: conflict resolution and defer
// TODO: dedupe // TODO: dedupe
// TODO: store and cache // TODO: store and cache
safeValueFetchSend(ch, f) safeResultSend(ch, r)
} }
} }
}() }()
// Send async, sending back errors // Send async, sending back errors
go func() { go func() {
for peerErr := range s.gun.Send(ctx, req) { for peerErr := range s.gun.Send(ctx, req) {
safeValueFetchSend(ch, &ValueFetch{ safeResultSend(ch, &Result{
Err: peerErr.Err, Err: peerErr.Err,
Field: s.field, Field: s.field,
peer: peerErr.peer, peer: peerErr.peer,
@ -184,17 +195,17 @@ func (s *Scoped) onRemote(ctx context.Context, ch chan *ValueFetch) {
}() }()
} }
func (s *Scoped) Off(ch <-chan *ValueFetch) bool { func (s *Scoped) Off(ch <-chan *Result) bool {
s.valueChansToListenersLock.Lock() s.resultChansToListenersLock.Lock()
l := s.valueChansToListeners[ch] l := s.resultChansToListeners[ch]
delete(s.valueChansToListeners, ch) delete(s.resultChansToListeners, ch)
s.valueChansToListenersLock.Unlock() s.resultChansToListenersLock.Unlock()
if l != nil { if l != nil {
// Unregister the chan // Unregister the chan
s.gun.UnregisterMessageIDPutListener(l.id) s.gun.UnregisterMessageIDPutListener(l.id)
// Close the message chan and the value chan // Close the message chan and the result chan
close(l.receivedMessages) close(l.receivedMessages)
close(l.values) close(l.results)
} }
return l != nil return l != nil
} }
@ -207,8 +218,8 @@ func (s *Scoped) Scoped(ctx context.Context, key string, children ...string) *Sc
return ret return ret
} }
func safeValueFetchSend(ch chan<- *ValueFetch, f *ValueFetch) { func safeResultSend(ch chan<- *Result, r *Result) {
// Due to the fact that we may send on a closed channel here, we ignore the panic // Due to the fact that we may send on a closed channel here, we ignore the panic
defer func() { recover() }() defer func() { recover() }()
ch <- f ch <- r
} }

View File

@ -11,7 +11,7 @@ var ErrStorageNotFound = errors.New("Not found")
type Storage interface { type Storage interface {
Get(ctx context.Context, parentSoul, field string) (*ValueWithState, error) Get(ctx context.Context, parentSoul, field string) (*ValueWithState, error)
Put(ctx context.Context, parentSoul, field string, val *ValueWithState) (bool, error) Put(ctx context.Context, parentSoul, field string, val *ValueWithState) (bool, error)
// Tracking(ctx context.Context, id string) (bool, error) Tracking(ctx context.Context, parentSoul, field string) (bool, error)
} }
type StorageInMem struct { type StorageInMem struct {
@ -33,3 +33,8 @@ func (s *StorageInMem) Put(ctx context.Context, parentSoul, field string, val *V
// TODO: conflict resolution state check? // TODO: conflict resolution state check?
return true, nil return true, nil
} }
func (s *StorageInMem) Tracking(ctx context.Context, parentSoul, field string) (bool, error) {
_, ok := s.values.Load(parentSoulAndField{parentSoul, field})
return ok, nil
}

View File

@ -24,9 +24,15 @@ func TestGunGetSimple(t *testing.T) {
`) `)
// Get // Get
g := ctx.newGunConnectedToGunJS() g := ctx.newGunConnectedToGunJS()
defer g.Close()
// Make sure we got back the same value
f := g.Scoped(ctx, "esgopeta-test", "TestGunGetSimple", "some-key").Val(ctx) f := g.Scoped(ctx, "esgopeta-test", "TestGunGetSimple", "some-key").Val(ctx)
ctx.Require.NoError(f.Err) ctx.Require.NoError(f.Err)
// Make sure we got back the same value ctx.Require.Equal(gun.ValueString(randStr), f.Value.(gun.ValueString))
ctx.Require.Equal(gun.ValueString(randStr), f.Value.Value.(gun.ValueString)) // // Do it again TODO: make sure there are no network calls, it's all from mem
// ctx.debugf("Asking for key again")
// f = g.Scoped(ctx, "esgopeta-test", "TestGunGetSimple", "some-key").Val(ctx)
// ctx.Require.NoError(f.Err)
// ctx.Require.Equal(gun.ValueString(randStr), f.Value.Value.(gun.ValueString))
} }