Tracking, conflict resolution, and dedupe

This commit is contained in:
Chad Retz 2019-02-25 14:09:47 -06:00
parent 5120875087
commit c79ea0e04c
8 changed files with 210 additions and 83 deletions

View File

@ -39,6 +39,7 @@ const (
)
const DefaultPeerSleepOnError = 30 * time.Second
const DefaultOldestAllowedStorageValue = 7 * (60 * time.Minute)
func New(ctx context.Context, config Config) (*Gun, error) {
g := &Gun{
@ -75,7 +76,7 @@ func New(ctx context.Context, config Config) (*Gun, error) {
}
// Set defaults
if g.storage == nil {
g.storage = &StorageInMem{}
g.storage = NewStorageInMem(DefaultOldestAllowedStorageValue)
}
if g.soulGen == nil {
g.soulGen = DefaultSoulGen
@ -103,6 +104,9 @@ func (g *Gun) Close() error {
errs = append(errs, err)
}
}
if err := g.storage.Close(); err != nil {
errs = append(errs, err)
}
if len(errs) == 0 {
return nil
} else if len(errs) == 1 {
@ -165,6 +169,17 @@ func (g *Gun) startReceiving() {
}
func (g *Gun) onPeerMessage(ctx context.Context, msg *MessageReceived) {
// If we're tracking everything, persist all puts here.
if g.tracking == TrackingEverything {
for parentSoul, node := range msg.Put {
for field, value := range node.Values {
if state, ok := node.Metadata.State[field]; ok {
// TODO: warn on error or something
g.storage.Put(ctx, parentSoul, field, value, state, false)
}
}
}
}
// If there is a listener for this message, use it
if msg.Ack != "" {
g.messageIDListenersLock.RLock()

View File

@ -122,9 +122,3 @@ func (ValueRelation) nodeValue() {}
func (n ValueRelation) MarshalJSON() ([]byte, error) {
return json.Marshal(map[string]string{"#": string(n)})
}
// type ValueWithState struct {
// Value Value
// // This is 0 for top-level values
// State int64
// }

View File

@ -89,8 +89,10 @@ func (s *Scoped) fetchRemote(ctx context.Context, ch chan *FetchResult) {
s.fetchResultListenersLock.Unlock()
// Listen for responses to this get
s.gun.registerMessageIDListener(req.ID, msgCh)
// TODO: only for children: s.gun.RegisterValueIDListener(s.id, msgCh)
// TODO: Also listen for any changes to the value or just for specific requests?
// Handle received messages turning them to value fetches
var lastSeenValue Value
var lastSeenState State
go func() {
for {
select {
@ -106,12 +108,28 @@ func (s *Scoped) fetchRemote(ctx context.Context, ch chan *FetchResult) {
// We asked for a single field, should only get that field or it doesn't exist
if msg.Err != "" {
r.Err = fmt.Errorf("Remote error: %v", msg.Err)
} else if n := msg.Put[parentSoul]; n != nil && n.Values[s.field] != nil {
r.Value, r.State, r.ValueExists = n.Values[s.field], n.State[s.field], true
} else if n := msg.Put[parentSoul]; n != nil {
if newVal, ok := n.Values[s.field]; ok {
newState := n.State[s.field]
// Dedupe the value
if lastSeenValue == newVal && lastSeenState == newState {
continue
}
// If we're storing only what we requested (we do "everything" at a higher level), do it here
// and only send result if it was an update. Otherwise only do it if we would have done one.
confRes := ConflictResolutionNeverSeenUpdate
if s.gun.tracking == TrackingRequested {
confRes, r.Err = s.gun.storage.Put(ctx, parentSoul, s.field, newVal, newState, false)
} else if lastSeenState > 0 {
confRes = ConflictResolve(lastSeenValue, lastSeenState, newVal, newState, StateNow())
}
// If there are no errors and it was an update, update the last seen and set the response vals
if r.Err == nil && confRes.IsImmediateUpdate() {
lastSeenValue, lastSeenState = newVal, newState
r.Value, r.State, r.ValueExists = newVal, newState, true
}
}
}
// TODO: conflict resolution and defer
// TODO: dedupe
// TODO: store and cache
safeFetchResultSend(ch, r)
}
}

View File

@ -87,14 +87,10 @@ func (s *Scoped) Put(ctx context.Context, val Value, opts ...PutOption) <-chan *
// Also store locally and set the cached soul
// TODO: Should I not store until the very end just in case it errors halfway
// though? There are no standard cases where it should fail.
if ok, err := s.gun.storage.Put(ctx, prevParentSoul, parent.field, ValueRelation(parentCachedSoul), currState); err != nil {
if _, err := s.gun.storage.Put(ctx, prevParentSoul, parent.field, ValueRelation(parentCachedSoul), currState, false); err != nil {
ch <- &PutResult{Err: err}
close(ch)
return ch
} else if !ok {
ch <- &PutResult{Err: fmt.Errorf("Unexpected deferred local store")}
close(ch)
return ch
} else if !parent.setCachedSoul(ValueRelation(parentCachedSoul)) {
ch <- &PutResult{Err: fmt.Errorf("Concurrent cached soul set")}
close(ch)
@ -104,14 +100,10 @@ func (s *Scoped) Put(ctx context.Context, val Value, opts ...PutOption) <-chan *
prevParentSoul = parentCachedSoul
}
// Now that we've setup all the parents, we can do this store locally
if ok, err := s.gun.storage.Put(ctx, prevParentSoul, s.field, val, currState); err != nil {
if _, err := s.gun.storage.Put(ctx, prevParentSoul, s.field, val, currState, false); err != nil {
ch <- &PutResult{Err: err}
close(ch)
return ch
} else if !ok {
ch <- &PutResult{Err: fmt.Errorf("Unexpected deferred local store")}
close(ch)
return ch
}
// We need an ack for local store and stop if local only
ch <- &PutResult{}

View File

@ -1,7 +1,8 @@
package gun
import (
"sync/atomic"
"bytes"
"encoding/json"
"time"
)
@ -9,42 +10,40 @@ type State uint64
func StateNow() State { return State(timeNowUnixMs()) }
// timeFromUnixMs returns zero'd time if ms is 0
func timeFromUnixMs(ms int64) time.Time {
if ms == 0 {
return time.Time{}
}
return time.Unix(0, ms*int64(time.Millisecond))
func StateFromTime(t time.Time) State { return State(timeToUnixMs(t)) }
type ConflictResolution int
const (
ConflictResolutionNeverSeenUpdate ConflictResolution = iota
ConflictResolutionTooFutureDeferred
ConflictResolutionOlderHistorical
ConflictResolutionNewerUpdate
ConflictResolutionSameKeep
ConflictResolutionSameUpdate
)
func (c ConflictResolution) IsImmediateUpdate() bool {
return c == ConflictResolutionNeverSeenUpdate || c == ConflictResolutionNewerUpdate || c == ConflictResolutionSameUpdate
}
// timeToUnixMs returns 0 if t.IsZero
func timeToUnixMs(t time.Time) int64 {
if t.IsZero() {
return 0
}
return t.UnixNano() / int64(time.Millisecond)
}
func timeNowUnixMs() int64 {
return timeToUnixMs(time.Now())
}
var lastNano int64
// uniqueNano is 0 if ms is first time seen, otherwise a unique num in combination with ms
func timeNowUniqueUnix() (ms int64, uniqueNum int64) {
now := time.Now()
newNano := now.UnixNano()
for {
prevLastNano := lastNano
if prevLastNano < newNano && atomic.CompareAndSwapInt64(&lastNano, prevLastNano, newNano) {
ms = newNano / int64(time.Millisecond)
// If was same ms as seen before, set uniqueNum to the nano part
if prevLastNano/int64(time.Millisecond) == ms {
uniqueNum = newNano%int64(time.Millisecond) + 1
}
return
}
newNano = prevLastNano + 1
func ConflictResolve(existingVal Value, existingState State, newVal Value, newState State, sysState State) ConflictResolution {
// Existing gunjs impl serializes to JSON first to do lexical comparisons, so we will too
if sysState < newState {
return ConflictResolutionTooFutureDeferred
} else if newState < existingState {
return ConflictResolutionOlderHistorical
} else if existingState < newState {
return ConflictResolutionNewerUpdate
} else if existingVal == newVal {
return ConflictResolutionSameKeep
} else if existingJSON, err := json.Marshal(existingVal); err != nil {
panic(err)
} else if newJSON, err := json.Marshal(newVal); err != nil {
panic(err)
} else if bytes.Compare(existingJSON, newJSON) < 0 {
return ConflictResolutionSameUpdate
} else {
return ConflictResolutionSameKeep
}
}

View File

@ -4,19 +4,21 @@ import (
"context"
"errors"
"sync"
"time"
)
var ErrStorageNotFound = errors.New("Not found")
type Storage interface {
Get(ctx context.Context, parentSoul, field string) (Value, State, error)
// If bool is false, it's deferred
Put(ctx context.Context, parentSoul, field string, val Value, state State) (bool, error)
Tracking(ctx context.Context, parentSoul, field string) (bool, error)
Put(ctx context.Context, parentSoul, field string, val Value, state State, onlyIfExists bool) (ConflictResolution, error)
Close() error
}
type StorageInMem struct {
values sync.Map
type storageInMem struct {
values map[parentSoulAndField]*valueWithState
valueLock sync.RWMutex
purgeCancelFn context.CancelFunc
}
type parentSoulAndField struct{ parentSoul, field string }
@ -26,22 +28,75 @@ type valueWithState struct {
state State
}
func (s *StorageInMem) Get(ctx context.Context, parentSoul, field string) (Value, State, error) {
v, ok := s.values.Load(parentSoulAndField{parentSoul, field})
if !ok {
return nil, 0, ErrStorageNotFound
func NewStorageInMem(oldestAllowed time.Duration) Storage {
s := &storageInMem{values: map[parentSoulAndField]*valueWithState{}}
// Start the purger
if oldestAllowed > 0 {
var ctx context.Context
ctx, s.purgeCancelFn = context.WithCancel(context.Background())
go func() {
tick := time.NewTicker(5 * time.Second)
defer tick.Stop()
for {
select {
case <-ctx.Done():
return
case t := <-tick.C:
oldestStateAllowed := StateFromTime(t.Add(-oldestAllowed))
s.valueLock.Lock()
s.valueLock.Unlock()
for k, v := range s.values {
if v.state < oldestStateAllowed {
delete(s.values, k)
}
vs := v.(*valueWithState)
}
}
}
}()
}
return s
}
func (s *storageInMem) Get(ctx context.Context, parentSoul, field string) (Value, State, error) {
s.valueLock.RLock()
defer s.valueLock.RUnlock()
if vs := s.values[parentSoulAndField{parentSoul, field}]; vs == nil {
return nil, 0, ErrStorageNotFound
} else {
return vs.val, vs.state, nil
}
}
func (s *StorageInMem) Put(ctx context.Context, parentSoul, field string, val Value, state State) (bool, error) {
s.values.Store(parentSoulAndField{parentSoul, field}, &valueWithState{val, state})
// TODO: conflict resolution state check?
return true, nil
func (s *storageInMem) Put(
ctx context.Context, parentSoul, field string, val Value, state State, onlyIfExists bool,
) (confRes ConflictResolution, err error) {
s.valueLock.Lock()
defer s.valueLock.Unlock()
key, newVs := parentSoulAndField{parentSoul, field}, &valueWithState{val, state}
sysState := StateNow()
if existingVs := s.values[key]; existingVs == nil && onlyIfExists {
return 0, ErrStorageNotFound
} else if existingVs == nil {
confRes = ConflictResolutionNeverSeenUpdate
} else {
confRes = ConflictResolve(existingVs.val, existingVs.state, val, state, sysState)
}
if confRes == ConflictResolutionTooFutureDeferred {
// Schedule for 100ms past when it's deferred to
time.AfterFunc(time.Duration(state-sysState)*time.Millisecond+100, func() {
// TODO: should I check whether closed?
// TODO: what to do w/ error?
s.Put(ctx, parentSoul, field, val, state, onlyIfExists)
})
} else if confRes.IsImmediateUpdate() {
s.values[key] = newVs
}
return
}
func (s *StorageInMem) Tracking(ctx context.Context, parentSoul, field string) (bool, error) {
_, ok := s.values.Load(parentSoulAndField{parentSoul, field})
return ok, nil
func (s *storageInMem) Close() error {
if s.purgeCancelFn != nil {
s.purgeCancelFn()
}
return nil
}

View File

@ -9,8 +9,10 @@ import (
func TestGunGetSimple(t *testing.T) {
// Run the server, put in one call, get in another, then check
ctx, cancelFn := newContextWithGunJServer(t)
ctx, cancelFn := newContext(t)
defer cancelFn()
serverCancelFn := ctx.startGunJSServer()
defer serverCancelFn()
randStr := randString(30)
// Write w/ JS
ctx.runJSWithGun(`
@ -29,11 +31,12 @@ func TestGunGetSimple(t *testing.T) {
r := g.Scoped(ctx, "esgopeta-test", "TestGunGetSimple", "some-key").FetchOne(ctx)
ctx.Require.NoError(r.Err)
ctx.Require.Equal(gun.ValueString(randStr), r.Value.(gun.ValueString))
// // Do it again TODO: make sure there are no network calls, it's all from mem
// ctx.debugf("Asking for key again")
// f = g.Scoped(ctx, "esgopeta-test", "TestGunGetSimple", "some-key").FetchOne(ctx)
// ctx.Require.NoError(f.Err)
// ctx.Require.Equal(gun.ValueString(randStr), f.Value.(gun.ValueString))
// Do it again with the JS server closed since it should fetch from memory
serverCancelFn()
ctx.debugf("Asking for key again")
r = g.Scoped(ctx, "esgopeta-test", "TestGunGetSimple", "some-key").FetchOne(ctx)
ctx.Require.NoError(r.Err)
ctx.Require.Equal(gun.ValueString(randStr), r.Value.(gun.ValueString))
}
func TestGunPutSimple(t *testing.T) {
@ -59,3 +62,12 @@ func TestGunPutSimple(t *testing.T) {
`)
ctx.Require.Equal(randStr, strings.TrimSpace(string(out)))
}
/*
TODO Tests to write:
* test put w/ future state happens then
* test put w/ old state is discarded
* test put w/ new state is persisted
* test put w/ same state but greater is persisted
* test put w/ same state but less is discarded
*/

View File

@ -2,6 +2,8 @@ package gun
import (
"crypto/rand"
"sync/atomic"
"time"
)
const randChars = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
@ -17,3 +19,43 @@ func randString(n int) (s string) {
}
return s
}
// timeFromUnixMs returns zero'd time if ms is 0
func timeFromUnixMs(ms int64) time.Time {
if ms == 0 {
return time.Time{}
}
return time.Unix(0, ms*int64(time.Millisecond))
}
// timeToUnixMs returns 0 if t.IsZero
func timeToUnixMs(t time.Time) int64 {
if t.IsZero() {
return 0
}
return t.UnixNano() / int64(time.Millisecond)
}
func timeNowUnixMs() int64 {
return timeToUnixMs(time.Now())
}
var lastNano int64
// uniqueNano is 0 if ms is first time seen, otherwise a unique num in combination with ms
func timeNowUniqueUnix() (ms int64, uniqueNum int64) {
now := time.Now()
newNano := now.UnixNano()
for {
prevLastNano := lastNano
if prevLastNano < newNano && atomic.CompareAndSwapInt64(&lastNano, prevLastNano, newNano) {
ms = newNano / int64(time.Millisecond)
// If was same ms as seen before, set uniqueNum to the nano part
if prevLastNano/int64(time.Millisecond) == ms {
uniqueNum = newNano%int64(time.Millisecond) + 1
}
return
}
newNano = prevLastNano + 1
}
}