From 1d987a5e797a206fb20c9c0cbb259ec458366e6e Mon Sep 17 00:00:00 2001 From: Chad Retz Date: Tue, 26 Feb 2019 01:10:13 -0600 Subject: [PATCH] Do eager storage of puts --- gun/gun.go | 25 +++++++++++++++++++++---- gun/message.go | 4 ++-- gun/scoped_fetch.go | 12 +++++++++++- 3 files changed, 34 insertions(+), 7 deletions(-) diff --git a/gun/gun.go b/gun/gun.go index fa25b0b..58d4c08 100644 --- a/gun/gun.go +++ b/gun/gun.go @@ -211,17 +211,32 @@ func (g *Gun) startReceiving(peer *Peer) { } func (g *Gun) onPeerMessage(ctx context.Context, msg *messageReceived) { - // If we're tracking everything, persist all puts here. - if g.tracking == TrackingEverything { + // TODO: + // * if message-acks are not considered part of a store-all server's storage, then use msg.Ack + // to determine whether we even put here instead of how we do it now. + // * handle gets + + // If we're tracking anything, we try to put it (may only be if exists) + if g.tracking != TrackingNothing { + // If we're tracking everything, we persist everything. Otherwise if we're + // only tracking requested, we persist only if it already exists. + putOnlyIfExists := g.tracking == TrackingRequested for parentSoul, node := range msg.Put { for field, value := range node.Values { if state, ok := node.Metadata.State[field]; ok { - // TODO: warn on error or something - g.storage.Put(ctx, parentSoul, field, value, state, false) + // TODO: warn on other error or something + _, err := g.storage.Put(ctx, parentSoul, field, value, state, putOnlyIfExists) + if err == nil { + if msg.storedPuts == nil { + msg.storedPuts = map[string][]string{} + } + msg.storedPuts[parentSoul] = append(msg.storedPuts[parentSoul], field) + } } } } } + // If there is a listener for this message, use it if msg.Ack != "" { g.messageIDListenersLock.RLock() @@ -232,6 +247,7 @@ func (g *Gun) onPeerMessage(ctx context.Context, msg *messageReceived) { return } } + // DAM messages are either requests for our ID or setting of theirs if msg.DAM != "" { if msg.PID == "" { @@ -249,6 +265,7 @@ func (g *Gun) onPeerMessage(ctx context.Context, msg *messageReceived) { } return } + // Unhandled message means rebroadcast g.send(ctx, msg.Message, msg.peer) } diff --git a/gun/message.go b/gun/message.go index f1af035..053c4c7 100644 --- a/gun/message.go +++ b/gun/message.go @@ -24,6 +24,6 @@ type MessageGetRequest struct { type messageReceived struct { *Message - peer *Peer - stored bool + peer *Peer + storedPuts map[string][]string } diff --git a/gun/scoped_fetch.go b/gun/scoped_fetch.go index b1071de..d189175 100644 --- a/gun/scoped_fetch.go +++ b/gun/scoped_fetch.go @@ -137,7 +137,17 @@ func (s *Scoped) fetchRemote(ctx context.Context, ch chan *FetchResult) { // and only send result if it was an update. Otherwise only do it if we would have done one. confRes := ConflictResolutionNeverSeenUpdate if s.gun.tracking == TrackingRequested { - confRes, r.Err = s.gun.storage.Put(ctx, parentSoul, s.field, newVal, newState, false) + // Wait, wait, we may have already stored this + alreadyStored := false + for _, storedField := range msg.storedPuts[parentSoul] { + if storedField == s.field { + alreadyStored = true + break + } + } + if !alreadyStored { + confRes, r.Err = s.gun.storage.Put(ctx, parentSoul, s.field, newVal, newState, false) + } } else if lastSeenState > 0 { confRes = ConflictResolve(lastSeenValue, lastSeenState, newVal, newState, StateNow()) }