diff --git a/gun/gun.go b/gun/gun.go index a85a0d7..3c97a53 100644 --- a/gun/gun.go +++ b/gun/gun.go @@ -39,6 +39,7 @@ const ( ) const DefaultPeerSleepOnError = 30 * time.Second +const DefaultOldestAllowedStorageValue = 7 * (60 * time.Minute) func New(ctx context.Context, config Config) (*Gun, error) { g := &Gun{ @@ -75,7 +76,7 @@ func New(ctx context.Context, config Config) (*Gun, error) { } // Set defaults if g.storage == nil { - g.storage = &StorageInMem{} + g.storage = NewStorageInMem(DefaultOldestAllowedStorageValue) } if g.soulGen == nil { g.soulGen = DefaultSoulGen @@ -103,6 +104,9 @@ func (g *Gun) Close() error { errs = append(errs, err) } } + if err := g.storage.Close(); err != nil { + errs = append(errs, err) + } if len(errs) == 0 { return nil } else if len(errs) == 1 { @@ -165,6 +169,17 @@ func (g *Gun) startReceiving() { } func (g *Gun) onPeerMessage(ctx context.Context, msg *MessageReceived) { + // If we're tracking everything, persist all puts here. + if g.tracking == TrackingEverything { + for parentSoul, node := range msg.Put { + for field, value := range node.Values { + if state, ok := node.Metadata.State[field]; ok { + // TODO: warn on error or something + g.storage.Put(ctx, parentSoul, field, value, state, false) + } + } + } + } // If there is a listener for this message, use it if msg.Ack != "" { g.messageIDListenersLock.RLock() diff --git a/gun/node.go b/gun/node.go index e7e8b21..60e675f 100644 --- a/gun/node.go +++ b/gun/node.go @@ -122,9 +122,3 @@ func (ValueRelation) nodeValue() {} func (n ValueRelation) MarshalJSON() ([]byte, error) { return json.Marshal(map[string]string{"#": string(n)}) } - -// type ValueWithState struct { -// Value Value -// // This is 0 for top-level values -// State int64 -// } diff --git a/gun/scoped_fetch.go b/gun/scoped_fetch.go index 2f5d0e3..90f7275 100644 --- a/gun/scoped_fetch.go +++ b/gun/scoped_fetch.go @@ -89,8 +89,10 @@ func (s *Scoped) fetchRemote(ctx context.Context, ch chan *FetchResult) { s.fetchResultListenersLock.Unlock() // Listen for responses to this get s.gun.registerMessageIDListener(req.ID, msgCh) - // TODO: only for children: s.gun.RegisterValueIDListener(s.id, msgCh) + // TODO: Also listen for any changes to the value or just for specific requests? // Handle received messages turning them to value fetches + var lastSeenValue Value + var lastSeenState State go func() { for { select { @@ -106,12 +108,28 @@ func (s *Scoped) fetchRemote(ctx context.Context, ch chan *FetchResult) { // We asked for a single field, should only get that field or it doesn't exist if msg.Err != "" { r.Err = fmt.Errorf("Remote error: %v", msg.Err) - } else if n := msg.Put[parentSoul]; n != nil && n.Values[s.field] != nil { - r.Value, r.State, r.ValueExists = n.Values[s.field], n.State[s.field], true + } else if n := msg.Put[parentSoul]; n != nil { + if newVal, ok := n.Values[s.field]; ok { + newState := n.State[s.field] + // Dedupe the value + if lastSeenValue == newVal && lastSeenState == newState { + continue + } + // If we're storing only what we requested (we do "everything" at a higher level), do it here + // and only send result if it was an update. Otherwise only do it if we would have done one. + confRes := ConflictResolutionNeverSeenUpdate + if s.gun.tracking == TrackingRequested { + confRes, r.Err = s.gun.storage.Put(ctx, parentSoul, s.field, newVal, newState, false) + } else if lastSeenState > 0 { + confRes = ConflictResolve(lastSeenValue, lastSeenState, newVal, newState, StateNow()) + } + // If there are no errors and it was an update, update the last seen and set the response vals + if r.Err == nil && confRes.IsImmediateUpdate() { + lastSeenValue, lastSeenState = newVal, newState + r.Value, r.State, r.ValueExists = newVal, newState, true + } + } } - // TODO: conflict resolution and defer - // TODO: dedupe - // TODO: store and cache safeFetchResultSend(ch, r) } } diff --git a/gun/scoped_put.go b/gun/scoped_put.go index 8fad62e..2b3749d 100644 --- a/gun/scoped_put.go +++ b/gun/scoped_put.go @@ -87,14 +87,10 @@ func (s *Scoped) Put(ctx context.Context, val Value, opts ...PutOption) <-chan * // Also store locally and set the cached soul // TODO: Should I not store until the very end just in case it errors halfway // though? There are no standard cases where it should fail. - if ok, err := s.gun.storage.Put(ctx, prevParentSoul, parent.field, ValueRelation(parentCachedSoul), currState); err != nil { + if _, err := s.gun.storage.Put(ctx, prevParentSoul, parent.field, ValueRelation(parentCachedSoul), currState, false); err != nil { ch <- &PutResult{Err: err} close(ch) return ch - } else if !ok { - ch <- &PutResult{Err: fmt.Errorf("Unexpected deferred local store")} - close(ch) - return ch } else if !parent.setCachedSoul(ValueRelation(parentCachedSoul)) { ch <- &PutResult{Err: fmt.Errorf("Concurrent cached soul set")} close(ch) @@ -104,14 +100,10 @@ func (s *Scoped) Put(ctx context.Context, val Value, opts ...PutOption) <-chan * prevParentSoul = parentCachedSoul } // Now that we've setup all the parents, we can do this store locally - if ok, err := s.gun.storage.Put(ctx, prevParentSoul, s.field, val, currState); err != nil { + if _, err := s.gun.storage.Put(ctx, prevParentSoul, s.field, val, currState, false); err != nil { ch <- &PutResult{Err: err} close(ch) return ch - } else if !ok { - ch <- &PutResult{Err: fmt.Errorf("Unexpected deferred local store")} - close(ch) - return ch } // We need an ack for local store and stop if local only ch <- &PutResult{} diff --git a/gun/state.go b/gun/state.go index 96d50ad..f793a1d 100644 --- a/gun/state.go +++ b/gun/state.go @@ -1,7 +1,8 @@ package gun import ( - "sync/atomic" + "bytes" + "encoding/json" "time" ) @@ -9,42 +10,40 @@ type State uint64 func StateNow() State { return State(timeNowUnixMs()) } -// timeFromUnixMs returns zero'd time if ms is 0 -func timeFromUnixMs(ms int64) time.Time { - if ms == 0 { - return time.Time{} - } - return time.Unix(0, ms*int64(time.Millisecond)) +func StateFromTime(t time.Time) State { return State(timeToUnixMs(t)) } + +type ConflictResolution int + +const ( + ConflictResolutionNeverSeenUpdate ConflictResolution = iota + ConflictResolutionTooFutureDeferred + ConflictResolutionOlderHistorical + ConflictResolutionNewerUpdate + ConflictResolutionSameKeep + ConflictResolutionSameUpdate +) + +func (c ConflictResolution) IsImmediateUpdate() bool { + return c == ConflictResolutionNeverSeenUpdate || c == ConflictResolutionNewerUpdate || c == ConflictResolutionSameUpdate } -// timeToUnixMs returns 0 if t.IsZero -func timeToUnixMs(t time.Time) int64 { - if t.IsZero() { - return 0 - } - return t.UnixNano() / int64(time.Millisecond) -} - -func timeNowUnixMs() int64 { - return timeToUnixMs(time.Now()) -} - -var lastNano int64 - -// uniqueNano is 0 if ms is first time seen, otherwise a unique num in combination with ms -func timeNowUniqueUnix() (ms int64, uniqueNum int64) { - now := time.Now() - newNano := now.UnixNano() - for { - prevLastNano := lastNano - if prevLastNano < newNano && atomic.CompareAndSwapInt64(&lastNano, prevLastNano, newNano) { - ms = newNano / int64(time.Millisecond) - // If was same ms as seen before, set uniqueNum to the nano part - if prevLastNano/int64(time.Millisecond) == ms { - uniqueNum = newNano%int64(time.Millisecond) + 1 - } - return - } - newNano = prevLastNano + 1 +func ConflictResolve(existingVal Value, existingState State, newVal Value, newState State, sysState State) ConflictResolution { + // Existing gunjs impl serializes to JSON first to do lexical comparisons, so we will too + if sysState < newState { + return ConflictResolutionTooFutureDeferred + } else if newState < existingState { + return ConflictResolutionOlderHistorical + } else if existingState < newState { + return ConflictResolutionNewerUpdate + } else if existingVal == newVal { + return ConflictResolutionSameKeep + } else if existingJSON, err := json.Marshal(existingVal); err != nil { + panic(err) + } else if newJSON, err := json.Marshal(newVal); err != nil { + panic(err) + } else if bytes.Compare(existingJSON, newJSON) < 0 { + return ConflictResolutionSameUpdate + } else { + return ConflictResolutionSameKeep } } diff --git a/gun/storage.go b/gun/storage.go index a98ede4..71c2937 100644 --- a/gun/storage.go +++ b/gun/storage.go @@ -4,19 +4,21 @@ import ( "context" "errors" "sync" + "time" ) var ErrStorageNotFound = errors.New("Not found") type Storage interface { Get(ctx context.Context, parentSoul, field string) (Value, State, error) - // If bool is false, it's deferred - Put(ctx context.Context, parentSoul, field string, val Value, state State) (bool, error) - Tracking(ctx context.Context, parentSoul, field string) (bool, error) + Put(ctx context.Context, parentSoul, field string, val Value, state State, onlyIfExists bool) (ConflictResolution, error) + Close() error } -type StorageInMem struct { - values sync.Map +type storageInMem struct { + values map[parentSoulAndField]*valueWithState + valueLock sync.RWMutex + purgeCancelFn context.CancelFunc } type parentSoulAndField struct{ parentSoul, field string } @@ -26,22 +28,75 @@ type valueWithState struct { state State } -func (s *StorageInMem) Get(ctx context.Context, parentSoul, field string) (Value, State, error) { - v, ok := s.values.Load(parentSoulAndField{parentSoul, field}) - if !ok { - return nil, 0, ErrStorageNotFound +func NewStorageInMem(oldestAllowed time.Duration) Storage { + s := &storageInMem{values: map[parentSoulAndField]*valueWithState{}} + // Start the purger + if oldestAllowed > 0 { + var ctx context.Context + ctx, s.purgeCancelFn = context.WithCancel(context.Background()) + go func() { + tick := time.NewTicker(5 * time.Second) + defer tick.Stop() + for { + select { + case <-ctx.Done(): + return + case t := <-tick.C: + oldestStateAllowed := StateFromTime(t.Add(-oldestAllowed)) + s.valueLock.Lock() + s.valueLock.Unlock() + for k, v := range s.values { + if v.state < oldestStateAllowed { + delete(s.values, k) + } + } + } + } + }() } - vs := v.(*valueWithState) - return vs.val, vs.state, nil + return s } -func (s *StorageInMem) Put(ctx context.Context, parentSoul, field string, val Value, state State) (bool, error) { - s.values.Store(parentSoulAndField{parentSoul, field}, &valueWithState{val, state}) - // TODO: conflict resolution state check? - return true, nil +func (s *storageInMem) Get(ctx context.Context, parentSoul, field string) (Value, State, error) { + s.valueLock.RLock() + defer s.valueLock.RUnlock() + if vs := s.values[parentSoulAndField{parentSoul, field}]; vs == nil { + return nil, 0, ErrStorageNotFound + } else { + return vs.val, vs.state, nil + } } -func (s *StorageInMem) Tracking(ctx context.Context, parentSoul, field string) (bool, error) { - _, ok := s.values.Load(parentSoulAndField{parentSoul, field}) - return ok, nil +func (s *storageInMem) Put( + ctx context.Context, parentSoul, field string, val Value, state State, onlyIfExists bool, +) (confRes ConflictResolution, err error) { + s.valueLock.Lock() + defer s.valueLock.Unlock() + key, newVs := parentSoulAndField{parentSoul, field}, &valueWithState{val, state} + sysState := StateNow() + if existingVs := s.values[key]; existingVs == nil && onlyIfExists { + return 0, ErrStorageNotFound + } else if existingVs == nil { + confRes = ConflictResolutionNeverSeenUpdate + } else { + confRes = ConflictResolve(existingVs.val, existingVs.state, val, state, sysState) + } + if confRes == ConflictResolutionTooFutureDeferred { + // Schedule for 100ms past when it's deferred to + time.AfterFunc(time.Duration(state-sysState)*time.Millisecond+100, func() { + // TODO: should I check whether closed? + // TODO: what to do w/ error? + s.Put(ctx, parentSoul, field, val, state, onlyIfExists) + }) + } else if confRes.IsImmediateUpdate() { + s.values[key] = newVs + } + return +} + +func (s *storageInMem) Close() error { + if s.purgeCancelFn != nil { + s.purgeCancelFn() + } + return nil } diff --git a/gun/tests/gun_test.go b/gun/tests/gun_test.go index 6546750..6fd71d0 100644 --- a/gun/tests/gun_test.go +++ b/gun/tests/gun_test.go @@ -9,8 +9,10 @@ import ( func TestGunGetSimple(t *testing.T) { // Run the server, put in one call, get in another, then check - ctx, cancelFn := newContextWithGunJServer(t) + ctx, cancelFn := newContext(t) defer cancelFn() + serverCancelFn := ctx.startGunJSServer() + defer serverCancelFn() randStr := randString(30) // Write w/ JS ctx.runJSWithGun(` @@ -29,11 +31,12 @@ func TestGunGetSimple(t *testing.T) { r := g.Scoped(ctx, "esgopeta-test", "TestGunGetSimple", "some-key").FetchOne(ctx) ctx.Require.NoError(r.Err) ctx.Require.Equal(gun.ValueString(randStr), r.Value.(gun.ValueString)) - // // Do it again TODO: make sure there are no network calls, it's all from mem - // ctx.debugf("Asking for key again") - // f = g.Scoped(ctx, "esgopeta-test", "TestGunGetSimple", "some-key").FetchOne(ctx) - // ctx.Require.NoError(f.Err) - // ctx.Require.Equal(gun.ValueString(randStr), f.Value.(gun.ValueString)) + // Do it again with the JS server closed since it should fetch from memory + serverCancelFn() + ctx.debugf("Asking for key again") + r = g.Scoped(ctx, "esgopeta-test", "TestGunGetSimple", "some-key").FetchOne(ctx) + ctx.Require.NoError(r.Err) + ctx.Require.Equal(gun.ValueString(randStr), r.Value.(gun.ValueString)) } func TestGunPutSimple(t *testing.T) { @@ -59,3 +62,12 @@ func TestGunPutSimple(t *testing.T) { `) ctx.Require.Equal(randStr, strings.TrimSpace(string(out))) } + +/* +TODO Tests to write: +* test put w/ future state happens then +* test put w/ old state is discarded +* test put w/ new state is persisted +* test put w/ same state but greater is persisted +* test put w/ same state but less is discarded +*/ diff --git a/gun/util.go b/gun/util.go index a12b277..fbc6d1c 100644 --- a/gun/util.go +++ b/gun/util.go @@ -2,6 +2,8 @@ package gun import ( "crypto/rand" + "sync/atomic" + "time" ) const randChars = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" @@ -17,3 +19,43 @@ func randString(n int) (s string) { } return s } + +// timeFromUnixMs returns zero'd time if ms is 0 +func timeFromUnixMs(ms int64) time.Time { + if ms == 0 { + return time.Time{} + } + return time.Unix(0, ms*int64(time.Millisecond)) +} + +// timeToUnixMs returns 0 if t.IsZero +func timeToUnixMs(t time.Time) int64 { + if t.IsZero() { + return 0 + } + return t.UnixNano() / int64(time.Millisecond) +} + +func timeNowUnixMs() int64 { + return timeToUnixMs(time.Now()) +} + +var lastNano int64 + +// uniqueNano is 0 if ms is first time seen, otherwise a unique num in combination with ms +func timeNowUniqueUnix() (ms int64, uniqueNum int64) { + now := time.Now() + newNano := now.UnixNano() + for { + prevLastNano := lastNano + if prevLastNano < newNano && atomic.CompareAndSwapInt64(&lastNano, prevLastNano, newNano) { + ms = newNano / int64(time.Millisecond) + // If was same ms as seen before, set uniqueNum to the nano part + if prevLastNano/int64(time.Millisecond) == ms { + uniqueNum = newNano%int64(time.Millisecond) + 1 + } + return + } + newNano = prevLastNano + 1 + } +}