mirror of
https://github.com/ChronosX88/go-gun.git
synced 2024-11-09 12:41:00 +00:00
Successful put
This commit is contained in:
parent
578e8917ae
commit
5fa024d1e3
50
gun/gun.go
50
gun/gun.go
@ -16,8 +16,8 @@ type Gun struct {
|
|||||||
myPeerID string
|
myPeerID string
|
||||||
tracking Tracking
|
tracking Tracking
|
||||||
|
|
||||||
messageIDPutListeners map[string]chan<- *MessageReceived
|
messageIDListeners map[string]chan<- *MessageReceived
|
||||||
messageIDPutListenersLock sync.RWMutex
|
messageIDListenersLock sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
@ -42,14 +42,14 @@ const DefaultPeerSleepOnError = 30 * time.Second
|
|||||||
|
|
||||||
func New(ctx context.Context, config Config) (*Gun, error) {
|
func New(ctx context.Context, config Config) (*Gun, error) {
|
||||||
g := &Gun{
|
g := &Gun{
|
||||||
peers: make([]*gunPeer, len(config.PeerURLs)),
|
peers: make([]*gunPeer, len(config.PeerURLs)),
|
||||||
storage: config.Storage,
|
storage: config.Storage,
|
||||||
soulGen: config.SoulGen,
|
soulGen: config.SoulGen,
|
||||||
peerErrorHandler: config.PeerErrorHandler,
|
peerErrorHandler: config.PeerErrorHandler,
|
||||||
peerSleepOnError: config.PeerSleepOnError,
|
peerSleepOnError: config.PeerSleepOnError,
|
||||||
myPeerID: config.MyPeerID,
|
myPeerID: config.MyPeerID,
|
||||||
tracking: config.Tracking,
|
tracking: config.Tracking,
|
||||||
messageIDPutListeners: map[string]chan<- *MessageReceived{},
|
messageIDListeners: map[string]chan<- *MessageReceived{},
|
||||||
}
|
}
|
||||||
// Create all the peers
|
// Create all the peers
|
||||||
sleepOnError := config.PeerSleepOnError
|
sleepOnError := config.PeerSleepOnError
|
||||||
@ -78,7 +78,7 @@ func New(ctx context.Context, config Config) (*Gun, error) {
|
|||||||
g.storage = &StorageInMem{}
|
g.storage = &StorageInMem{}
|
||||||
}
|
}
|
||||||
if g.soulGen == nil {
|
if g.soulGen == nil {
|
||||||
g.soulGen = SoulGenDefault
|
g.soulGen = DefaultSoulGen
|
||||||
}
|
}
|
||||||
if g.myPeerID == "" {
|
if g.myPeerID == "" {
|
||||||
g.myPeerID = randString(9)
|
g.myPeerID = randString(9)
|
||||||
@ -162,10 +162,10 @@ func (g *Gun) startReceiving() {
|
|||||||
|
|
||||||
func (g *Gun) onPeerMessage(ctx context.Context, msg *MessageReceived) {
|
func (g *Gun) onPeerMessage(ctx context.Context, msg *MessageReceived) {
|
||||||
// If there is a listener for this message, use it
|
// If there is a listener for this message, use it
|
||||||
if msg.Ack != "" && len(msg.Put) > 0 {
|
if msg.Ack != "" {
|
||||||
g.messageIDPutListenersLock.RLock()
|
g.messageIDListenersLock.RLock()
|
||||||
l := g.messageIDPutListeners[msg.Ack]
|
l := g.messageIDListeners[msg.Ack]
|
||||||
g.messageIDPutListenersLock.RUnlock()
|
g.messageIDListenersLock.RUnlock()
|
||||||
if l != nil {
|
if l != nil {
|
||||||
go safeReceivedMessageSend(l, msg)
|
go safeReceivedMessageSend(l, msg)
|
||||||
return
|
return
|
||||||
@ -195,22 +195,18 @@ func (g *Gun) onPeerError(err *ErrPeer) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *Gun) RegisterMessageIDPutListener(id string, ch chan<- *MessageReceived) {
|
func (g *Gun) RegisterMessageIDListener(id string, ch chan<- *MessageReceived) {
|
||||||
g.messageIDPutListenersLock.Lock()
|
g.messageIDListenersLock.Lock()
|
||||||
defer g.messageIDPutListenersLock.Unlock()
|
defer g.messageIDListenersLock.Unlock()
|
||||||
g.messageIDPutListeners[id] = ch
|
g.messageIDListeners[id] = ch
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *Gun) UnregisterMessageIDPutListener(id string) {
|
func (g *Gun) UnregisterMessageIDListener(id string) {
|
||||||
g.messageIDPutListenersLock.Lock()
|
g.messageIDListenersLock.Lock()
|
||||||
defer g.messageIDPutListenersLock.Unlock()
|
defer g.messageIDListenersLock.Unlock()
|
||||||
delete(g.messageIDPutListeners, id)
|
delete(g.messageIDListeners, id)
|
||||||
}
|
}
|
||||||
|
|
||||||
// func (g *Gun) RegisterValueIDPutListener(id string, ch chan<- *ReceivedMessage) {
|
|
||||||
// panic("TODO")
|
|
||||||
// }
|
|
||||||
|
|
||||||
func (g *Gun) Scoped(ctx context.Context, key string, children ...string) *Scoped {
|
func (g *Gun) Scoped(ctx context.Context, key string, children ...string) *Scoped {
|
||||||
s := newScoped(g, nil, key)
|
s := newScoped(g, nil, key)
|
||||||
if len(children) > 0 {
|
if len(children) > 0 {
|
||||||
|
@ -12,6 +12,8 @@ type Message struct {
|
|||||||
Put map[string]*Node `json:"put,omitempty"`
|
Put map[string]*Node `json:"put,omitempty"`
|
||||||
DAM string `json:"dam,omitempty"`
|
DAM string `json:"dam,omitempty"`
|
||||||
PID string `json:"pid,omitempty"`
|
PID string `json:"pid,omitempty"`
|
||||||
|
OK int `json:"ok,omitempty"`
|
||||||
|
Err string `json:"err,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Message) Clone() *Message {
|
func (m *Message) Clone() *Message {
|
||||||
|
@ -7,7 +7,7 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
)
|
)
|
||||||
|
|
||||||
var SoulGenDefault = func() string {
|
func DefaultSoulGen() string {
|
||||||
ms, uniqueNum := TimeNowUniqueUnix()
|
ms, uniqueNum := TimeNowUniqueUnix()
|
||||||
s := strconv.FormatInt(ms, 36)
|
s := strconv.FormatInt(ms, 36)
|
||||||
if uniqueNum > 0 {
|
if uniqueNum > 0 {
|
||||||
@ -65,6 +65,7 @@ type Metadata struct {
|
|||||||
State map[string]int64 `json:">,omitempty"`
|
State map[string]int64 `json:">,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: put private methd to seal enum
|
||||||
type Value interface {
|
type Value interface {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
24
gun/peer.go
24
gun/peer.go
@ -24,16 +24,20 @@ type Peer interface {
|
|||||||
Close() error
|
Close() error
|
||||||
}
|
}
|
||||||
|
|
||||||
var PeerURLSchemes = map[string]func(context.Context, *url.URL) (Peer, error){
|
var PeerURLSchemes map[string]func(context.Context, *url.URL) (Peer, error)
|
||||||
"http": func(ctx context.Context, peerURL *url.URL) (Peer, error) {
|
|
||||||
schemeChangedURL := &url.URL{}
|
func init() {
|
||||||
*schemeChangedURL = *peerURL
|
PeerURLSchemes = map[string]func(context.Context, *url.URL) (Peer, error){
|
||||||
schemeChangedURL.Scheme = "ws"
|
"http": func(ctx context.Context, peerURL *url.URL) (Peer, error) {
|
||||||
return NewPeerWebSocket(ctx, schemeChangedURL)
|
schemeChangedURL := &url.URL{}
|
||||||
},
|
*schemeChangedURL = *peerURL
|
||||||
"ws": func(ctx context.Context, peerURL *url.URL) (Peer, error) {
|
schemeChangedURL.Scheme = "ws"
|
||||||
return NewPeerWebSocket(ctx, peerURL)
|
return NewPeerWebSocket(ctx, schemeChangedURL)
|
||||||
},
|
},
|
||||||
|
"ws": func(ctx context.Context, peerURL *url.URL) (Peer, error) {
|
||||||
|
return NewPeerWebSocket(ctx, peerURL)
|
||||||
|
},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPeer(ctx context.Context, peerURL string) (Peer, error) {
|
func NewPeer(ctx context.Context, peerURL string) (Peer, error) {
|
||||||
|
202
gun/scoped.go
202
gun/scoped.go
@ -3,6 +3,7 @@ package gun
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -14,200 +15,57 @@ type Scoped struct {
|
|||||||
cachedParentSoul string
|
cachedParentSoul string
|
||||||
cachedParentSoulLock sync.RWMutex
|
cachedParentSoulLock sync.RWMutex
|
||||||
|
|
||||||
resultChansToListeners map[<-chan *Result]*messageIDListener
|
fetchResultListeners map[<-chan *FetchResult]*fetchResultListener
|
||||||
resultChansToListenersLock sync.Mutex
|
fetchResultListenersLock sync.Mutex
|
||||||
}
|
|
||||||
|
|
||||||
type messageIDListener struct {
|
putResultListeners map[<-chan *PutResult]*putResultListener
|
||||||
id string
|
putResultListenersLock sync.Mutex
|
||||||
results chan *Result
|
|
||||||
receivedMessages chan *MessageReceived
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func newScoped(gun *Gun, parent *Scoped, field string) *Scoped {
|
func newScoped(gun *Gun, parent *Scoped, field string) *Scoped {
|
||||||
return &Scoped{
|
return &Scoped{
|
||||||
gun: gun,
|
gun: gun,
|
||||||
parent: parent,
|
parent: parent,
|
||||||
field: field,
|
field: field,
|
||||||
resultChansToListeners: map[<-chan *Result]*messageIDListener{},
|
fetchResultListeners: map[<-chan *FetchResult]*fetchResultListener{},
|
||||||
|
putResultListeners: map[<-chan *PutResult]*putResultListener{},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type Result struct {
|
|
||||||
// This can be a context error on cancelation
|
|
||||||
Err error
|
|
||||||
Field string
|
|
||||||
// Nil if the value doesn't exist, exists and is nil, or there's an error
|
|
||||||
Value Value
|
|
||||||
State int64 // This can be 0 for errors or top-level value relations
|
|
||||||
ValueExists bool
|
|
||||||
// Nil when local and sometimes on error
|
|
||||||
peer *gunPeer
|
|
||||||
}
|
|
||||||
|
|
||||||
var ErrNotObject = errors.New("Scoped value not an object")
|
var ErrNotObject = errors.New("Scoped value not an object")
|
||||||
var ErrLookupOnTopLevel = errors.New("Cannot do lookup on top level")
|
var ErrLookupOnTopLevel = errors.New("Cannot do put/lookup on top level")
|
||||||
|
|
||||||
// Empty string if doesn't exist, ErrNotObject if self or parent not an object
|
// Empty string if doesn't exist, ErrNotObject if self or parent not an object
|
||||||
func (s *Scoped) Soul(ctx context.Context) (string, error) {
|
func (s *Scoped) Soul(ctx context.Context) (string, error) {
|
||||||
s.cachedParentSoulLock.RLock()
|
if cachedSoul := s.cachedSoul(); cachedSoul != "" {
|
||||||
cachedParentSoul := s.cachedParentSoul
|
return cachedSoul, nil
|
||||||
s.cachedParentSoulLock.RUnlock()
|
} else if r := s.FetchOne(ctx); r.Err != nil {
|
||||||
if cachedParentSoul != "" {
|
|
||||||
return cachedParentSoul, nil
|
|
||||||
} else if r := s.Val(ctx); r.Err != nil {
|
|
||||||
return "", r.Err
|
return "", r.Err
|
||||||
} else if !r.ValueExists {
|
} else if !r.ValueExists {
|
||||||
return "", nil
|
return "", nil
|
||||||
} else if rel, ok := r.Value.(ValueRelation); !ok {
|
} else if rel, ok := r.Value.(ValueRelation); !ok {
|
||||||
return "", ErrNotObject
|
return "", ErrNotObject
|
||||||
|
} else if !s.setCachedSoul(rel) {
|
||||||
|
return "", fmt.Errorf("Concurrent soul cache set")
|
||||||
} else {
|
} else {
|
||||||
s.cachedParentSoulLock.Lock()
|
|
||||||
s.cachedParentSoul = string(rel)
|
|
||||||
s.cachedParentSoulLock.Unlock()
|
|
||||||
return string(rel), nil
|
return string(rel), nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Scoped) Put(ctx context.Context, val Value) <-chan *Result {
|
func (s *Scoped) cachedSoul() string {
|
||||||
panic("TODO")
|
s.cachedParentSoulLock.RLock()
|
||||||
|
defer s.cachedParentSoulLock.RUnlock()
|
||||||
|
return s.cachedParentSoul
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Scoped) Val(ctx context.Context) *Result {
|
func (s *Scoped) setCachedSoul(val ValueRelation) bool {
|
||||||
// Try local before remote
|
s.cachedParentSoulLock.Lock()
|
||||||
if r := s.ValLocal(ctx); r.Err != nil || r.ValueExists {
|
defer s.cachedParentSoulLock.Unlock()
|
||||||
return r
|
if s.cachedParentSoul != "" {
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
return s.ValRemote(ctx)
|
s.cachedParentSoul = string(val)
|
||||||
}
|
return true
|
||||||
|
|
||||||
func (s *Scoped) ValLocal(ctx context.Context) *Result {
|
|
||||||
// If there is no parent, this is just the relation
|
|
||||||
if s.parent == nil {
|
|
||||||
return &Result{Field: s.field, Value: ValueRelation(s.field), ValueExists: true}
|
|
||||||
}
|
|
||||||
r := &Result{Field: s.field}
|
|
||||||
// Need parent soul for lookup
|
|
||||||
var parentSoul string
|
|
||||||
if parentSoul, r.Err = s.parent.Soul(ctx); r.Err == nil {
|
|
||||||
var vs *ValueWithState
|
|
||||||
if vs, r.Err = s.gun.storage.Get(ctx, parentSoul, s.field); r.Err == ErrStorageNotFound {
|
|
||||||
r.Err = nil
|
|
||||||
} else if r.Err == nil {
|
|
||||||
r.Value, r.State, r.ValueExists = vs.Value, vs.State, true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return r
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Scoped) ValRemote(ctx context.Context) *Result {
|
|
||||||
if s.parent == nil {
|
|
||||||
return &Result{Err: ErrLookupOnTopLevel, Field: s.field}
|
|
||||||
}
|
|
||||||
ch := s.OnRemote(ctx)
|
|
||||||
defer s.Off(ch)
|
|
||||||
return <-ch
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Scoped) On(ctx context.Context) <-chan *Result {
|
|
||||||
ch := make(chan *Result, 1)
|
|
||||||
if s.parent == nil {
|
|
||||||
ch <- &Result{Err: ErrLookupOnTopLevel, Field: s.field}
|
|
||||||
close(ch)
|
|
||||||
} else {
|
|
||||||
if r := s.ValLocal(ctx); r.Err != nil || r.ValueExists {
|
|
||||||
ch <- r
|
|
||||||
}
|
|
||||||
go s.onRemote(ctx, ch)
|
|
||||||
}
|
|
||||||
return ch
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Scoped) OnRemote(ctx context.Context) <-chan *Result {
|
|
||||||
ch := make(chan *Result, 1)
|
|
||||||
if s.parent == nil {
|
|
||||||
ch <- &Result{Err: ErrLookupOnTopLevel, Field: s.field}
|
|
||||||
close(ch)
|
|
||||||
} else {
|
|
||||||
go s.onRemote(ctx, ch)
|
|
||||||
}
|
|
||||||
return ch
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Scoped) onRemote(ctx context.Context, ch chan *Result) {
|
|
||||||
if s.parent == nil {
|
|
||||||
panic("No parent")
|
|
||||||
}
|
|
||||||
// We have to get the parent soul first
|
|
||||||
parentSoul, err := s.parent.Soul(ctx)
|
|
||||||
if err != nil {
|
|
||||||
ch <- &Result{Err: ErrLookupOnTopLevel, Field: s.field}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// Create get request
|
|
||||||
req := &Message{
|
|
||||||
ID: randString(9),
|
|
||||||
Get: &MessageGetRequest{Soul: parentSoul, Field: s.field},
|
|
||||||
}
|
|
||||||
// Make a chan to listen for received messages and link it to
|
|
||||||
// the given one so we can turn it "off". Off will close this
|
|
||||||
// chan.
|
|
||||||
msgCh := make(chan *MessageReceived)
|
|
||||||
s.resultChansToListenersLock.Lock()
|
|
||||||
s.resultChansToListeners[ch] = &messageIDListener{req.ID, ch, msgCh}
|
|
||||||
s.resultChansToListenersLock.Unlock()
|
|
||||||
// Listen for responses to this get
|
|
||||||
s.gun.RegisterMessageIDPutListener(req.ID, msgCh)
|
|
||||||
// TODO: only for children: s.gun.RegisterValueIDPutListener(s.id, msgCh)
|
|
||||||
// Handle received messages turning them to value fetches
|
|
||||||
go func() {
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
ch <- &Result{Err: ctx.Err(), Field: s.field}
|
|
||||||
s.Off(ch)
|
|
||||||
return
|
|
||||||
case msg, ok := <-msgCh:
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
r := &Result{Field: s.field, peer: msg.peer}
|
|
||||||
// We asked for a single field, should only get that field or it doesn't exist
|
|
||||||
if n := msg.Put[parentSoul]; n != nil && n.Values[s.field] != nil {
|
|
||||||
r.Value, r.State, r.ValueExists = n.Values[s.field], n.State[s.field], true
|
|
||||||
}
|
|
||||||
// TODO: conflict resolution and defer
|
|
||||||
// TODO: dedupe
|
|
||||||
// TODO: store and cache
|
|
||||||
safeResultSend(ch, r)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
// Send async, sending back errors
|
|
||||||
go func() {
|
|
||||||
for peerErr := range s.gun.Send(ctx, req) {
|
|
||||||
safeResultSend(ch, &Result{
|
|
||||||
Err: peerErr.Err,
|
|
||||||
Field: s.field,
|
|
||||||
peer: peerErr.peer,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Scoped) Off(ch <-chan *Result) bool {
|
|
||||||
s.resultChansToListenersLock.Lock()
|
|
||||||
l := s.resultChansToListeners[ch]
|
|
||||||
delete(s.resultChansToListeners, ch)
|
|
||||||
s.resultChansToListenersLock.Unlock()
|
|
||||||
if l != nil {
|
|
||||||
// Unregister the chan
|
|
||||||
s.gun.UnregisterMessageIDPutListener(l.id)
|
|
||||||
// Close the message chan and the result chan
|
|
||||||
close(l.receivedMessages)
|
|
||||||
close(l.results)
|
|
||||||
}
|
|
||||||
return l != nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Scoped) Scoped(ctx context.Context, key string, children ...string) *Scoped {
|
func (s *Scoped) Scoped(ctx context.Context, key string, children ...string) *Scoped {
|
||||||
@ -217,9 +75,3 @@ func (s *Scoped) Scoped(ctx context.Context, key string, children ...string) *Sc
|
|||||||
}
|
}
|
||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
|
|
||||||
func safeResultSend(ch chan<- *Result, r *Result) {
|
|
||||||
// Due to the fact that we may send on a closed channel here, we ignore the panic
|
|
||||||
defer func() { recover() }()
|
|
||||||
ch <- r
|
|
||||||
}
|
|
||||||
|
169
gun/scoped_fetch.go
Normal file
169
gun/scoped_fetch.go
Normal file
@ -0,0 +1,169 @@
|
|||||||
|
package gun
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (s *Scoped) FetchOne(ctx context.Context) *FetchResult {
|
||||||
|
// Try local before remote
|
||||||
|
if r := s.FetchOneLocal(ctx); r.Err != nil || r.ValueExists {
|
||||||
|
return r
|
||||||
|
}
|
||||||
|
return s.FetchOneRemote(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Scoped) FetchOneLocal(ctx context.Context) *FetchResult {
|
||||||
|
// If there is no parent, this is just the relation
|
||||||
|
if s.parent == nil {
|
||||||
|
return &FetchResult{Field: s.field, Value: ValueRelation(s.field), ValueExists: true}
|
||||||
|
}
|
||||||
|
r := &FetchResult{Field: s.field}
|
||||||
|
// Need parent soul for lookup
|
||||||
|
var parentSoul string
|
||||||
|
if parentSoul, r.Err = s.parent.Soul(ctx); r.Err == nil {
|
||||||
|
var vs *ValueWithState
|
||||||
|
if vs, r.Err = s.gun.storage.Get(ctx, parentSoul, s.field); r.Err == ErrStorageNotFound {
|
||||||
|
r.Err = nil
|
||||||
|
} else if r.Err == nil {
|
||||||
|
r.Value, r.State, r.ValueExists = vs.Value, vs.State, true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return r
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Scoped) FetchOneRemote(ctx context.Context) *FetchResult {
|
||||||
|
if s.parent == nil {
|
||||||
|
return &FetchResult{Err: ErrLookupOnTopLevel, Field: s.field}
|
||||||
|
}
|
||||||
|
ch := s.FetchRemote(ctx)
|
||||||
|
defer s.FetchDone(ch)
|
||||||
|
return <-ch
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Scoped) Fetch(ctx context.Context) <-chan *FetchResult {
|
||||||
|
ch := make(chan *FetchResult, 1)
|
||||||
|
if s.parent == nil {
|
||||||
|
ch <- &FetchResult{Err: ErrLookupOnTopLevel, Field: s.field}
|
||||||
|
close(ch)
|
||||||
|
} else {
|
||||||
|
if r := s.FetchOneLocal(ctx); r.Err != nil || r.ValueExists {
|
||||||
|
ch <- r
|
||||||
|
}
|
||||||
|
go s.fetchRemote(ctx, ch)
|
||||||
|
}
|
||||||
|
return ch
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Scoped) FetchRemote(ctx context.Context) <-chan *FetchResult {
|
||||||
|
ch := make(chan *FetchResult, 1)
|
||||||
|
if s.parent == nil {
|
||||||
|
ch <- &FetchResult{Err: ErrLookupOnTopLevel, Field: s.field}
|
||||||
|
close(ch)
|
||||||
|
} else {
|
||||||
|
go s.fetchRemote(ctx, ch)
|
||||||
|
}
|
||||||
|
return ch
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Scoped) fetchRemote(ctx context.Context, ch chan *FetchResult) {
|
||||||
|
if s.parent == nil {
|
||||||
|
panic("No parent")
|
||||||
|
}
|
||||||
|
// We have to get the parent soul first
|
||||||
|
parentSoul, err := s.parent.Soul(ctx)
|
||||||
|
if err != nil {
|
||||||
|
ch <- &FetchResult{Err: ErrLookupOnTopLevel, Field: s.field}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Create get request
|
||||||
|
req := &Message{
|
||||||
|
ID: randString(9),
|
||||||
|
Get: &MessageGetRequest{Soul: parentSoul, Field: s.field},
|
||||||
|
}
|
||||||
|
// Make a chan to listen for received messages and link it to
|
||||||
|
// the given one so we can turn it "off". Off will close this
|
||||||
|
// chan.
|
||||||
|
msgCh := make(chan *MessageReceived)
|
||||||
|
s.fetchResultListenersLock.Lock()
|
||||||
|
s.fetchResultListeners[ch] = &fetchResultListener{req.ID, ch, msgCh}
|
||||||
|
s.fetchResultListenersLock.Unlock()
|
||||||
|
// Listen for responses to this get
|
||||||
|
s.gun.RegisterMessageIDListener(req.ID, msgCh)
|
||||||
|
// TODO: only for children: s.gun.RegisterValueIDListener(s.id, msgCh)
|
||||||
|
// Handle received messages turning them to value fetches
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
ch <- &FetchResult{Err: ctx.Err(), Field: s.field}
|
||||||
|
s.FetchDone(ch)
|
||||||
|
return
|
||||||
|
case msg, ok := <-msgCh:
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
r := &FetchResult{Field: s.field, peer: msg.peer}
|
||||||
|
// We asked for a single field, should only get that field or it doesn't exist
|
||||||
|
if msg.Err != "" {
|
||||||
|
r.Err = fmt.Errorf("Remote error: %v", msg.Err)
|
||||||
|
} else if n := msg.Put[parentSoul]; n != nil && n.Values[s.field] != nil {
|
||||||
|
r.Value, r.State, r.ValueExists = n.Values[s.field], n.State[s.field], true
|
||||||
|
}
|
||||||
|
// TODO: conflict resolution and defer
|
||||||
|
// TODO: dedupe
|
||||||
|
// TODO: store and cache
|
||||||
|
safeFetchResultSend(ch, r)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
// Send async, sending back errors
|
||||||
|
go func() {
|
||||||
|
for peerErr := range s.gun.Send(ctx, req) {
|
||||||
|
safeFetchResultSend(ch, &FetchResult{
|
||||||
|
Err: peerErr.Err,
|
||||||
|
Field: s.field,
|
||||||
|
peer: peerErr.peer,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Scoped) FetchDone(ch <-chan *FetchResult) bool {
|
||||||
|
s.fetchResultListenersLock.Lock()
|
||||||
|
l := s.fetchResultListeners[ch]
|
||||||
|
delete(s.fetchResultListeners, ch)
|
||||||
|
s.fetchResultListenersLock.Unlock()
|
||||||
|
if l != nil {
|
||||||
|
// Unregister the chan
|
||||||
|
s.gun.UnregisterMessageIDListener(l.id)
|
||||||
|
// Close the message chan and the result chan
|
||||||
|
close(l.receivedMessages)
|
||||||
|
close(l.results)
|
||||||
|
}
|
||||||
|
return l != nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func safeFetchResultSend(ch chan<- *FetchResult, r *FetchResult) {
|
||||||
|
// Due to the fact that we may send on a closed channel here, we ignore the panic
|
||||||
|
defer func() { recover() }()
|
||||||
|
ch <- r
|
||||||
|
}
|
||||||
|
|
||||||
|
type fetchResultListener struct {
|
||||||
|
id string
|
||||||
|
results chan *FetchResult
|
||||||
|
receivedMessages chan *MessageReceived
|
||||||
|
}
|
||||||
|
|
||||||
|
type FetchResult struct {
|
||||||
|
// This can be a context error on cancelation
|
||||||
|
Err error
|
||||||
|
Field string
|
||||||
|
// Nil if the value doesn't exist, exists and is nil, or there's an error
|
||||||
|
Value Value
|
||||||
|
State int64 // This can be 0 for errors or top-level value relations
|
||||||
|
ValueExists bool
|
||||||
|
// Nil when local and sometimes on error
|
||||||
|
peer *gunPeer
|
||||||
|
}
|
191
gun/scoped_put.go
Normal file
191
gun/scoped_put.go
Normal file
@ -0,0 +1,191 @@
|
|||||||
|
package gun
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
)
|
||||||
|
|
||||||
|
type putResultListener struct {
|
||||||
|
id string
|
||||||
|
results chan *PutResult
|
||||||
|
receivedMessages chan *MessageReceived
|
||||||
|
}
|
||||||
|
|
||||||
|
type PutResult struct {
|
||||||
|
Err error
|
||||||
|
|
||||||
|
peer *gunPeer
|
||||||
|
}
|
||||||
|
|
||||||
|
type PutOption interface{}
|
||||||
|
|
||||||
|
type putOptionStoreLocalOnly struct{}
|
||||||
|
|
||||||
|
func PutOptionStoreLocalOnly() PutOption { return putOptionStoreLocalOnly{} }
|
||||||
|
|
||||||
|
type putOptionFailWithoutParent struct{}
|
||||||
|
|
||||||
|
func PutOptionFailWithoutParent() PutOption { return putOptionFailWithoutParent{} }
|
||||||
|
|
||||||
|
func (s *Scoped) Put(ctx context.Context, val Value, opts ...PutOption) <-chan *PutResult {
|
||||||
|
// Collect the options
|
||||||
|
storeLocalOnly := false
|
||||||
|
failWithoutParent := false
|
||||||
|
for _, opt := range opts {
|
||||||
|
switch opt.(type) {
|
||||||
|
case putOptionStoreLocalOnly:
|
||||||
|
storeLocalOnly = true
|
||||||
|
case putOptionFailWithoutParent:
|
||||||
|
failWithoutParent = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ch := make(chan *PutResult, 1)
|
||||||
|
// Get all the parents
|
||||||
|
parents := []*Scoped{}
|
||||||
|
for next := s.parent; next != nil; next = next.parent {
|
||||||
|
parents = append([]*Scoped{next}, parents...)
|
||||||
|
}
|
||||||
|
if len(parents) == 0 {
|
||||||
|
ch <- &PutResult{Err: ErrLookupOnTopLevel}
|
||||||
|
close(ch)
|
||||||
|
return ch
|
||||||
|
}
|
||||||
|
// Ask for the soul on the last parent. What this will do is trigger
|
||||||
|
// lazy soul fetch up the chain. Then we can go through and find who doesn't have a
|
||||||
|
// cached soul, create one, and store locally.
|
||||||
|
if soul, err := parents[len(parents)-1].Soul(ctx); err != nil {
|
||||||
|
ch <- &PutResult{Err: ErrLookupOnTopLevel}
|
||||||
|
close(ch)
|
||||||
|
return ch
|
||||||
|
} else if soul == "" && failWithoutParent {
|
||||||
|
ch <- &PutResult{Err: fmt.Errorf("Parent not present but required")}
|
||||||
|
close(ch)
|
||||||
|
return ch
|
||||||
|
}
|
||||||
|
// Now for every parent that doesn't have a cached soul we create one and
|
||||||
|
// put as part of the message. We accept fetching the cache this way is a bit
|
||||||
|
// racy.
|
||||||
|
req := &Message{
|
||||||
|
ID: randString(9),
|
||||||
|
Put: make(map[string]*Node),
|
||||||
|
}
|
||||||
|
// We know that the first has a soul
|
||||||
|
prevParentSoul := parents[0].cachedSoul()
|
||||||
|
currState := TimeNowUnixMs()
|
||||||
|
for _, parent := range parents[1:] {
|
||||||
|
parentCachedSoul := parent.cachedSoul()
|
||||||
|
if parentCachedSoul == "" {
|
||||||
|
// Create the soul and make it as part of the next put
|
||||||
|
parentCachedSoul = s.gun.soulGen()
|
||||||
|
req.Put[prevParentSoul] = &Node{
|
||||||
|
Metadata: Metadata{
|
||||||
|
Soul: prevParentSoul,
|
||||||
|
State: map[string]int64{parent.field: currState},
|
||||||
|
},
|
||||||
|
Values: map[string]Value{parent.field: ValueRelation(parentCachedSoul)},
|
||||||
|
}
|
||||||
|
// Also store locally and set the cached soul
|
||||||
|
withState := &ValueWithState{ValueRelation(parentCachedSoul), currState}
|
||||||
|
// TODO: Should I not store until the very end just in case it errors halfway
|
||||||
|
// though? There are no standard cases where it should fail.
|
||||||
|
if ok, err := s.gun.storage.Put(ctx, prevParentSoul, parent.field, withState); err != nil {
|
||||||
|
ch <- &PutResult{Err: err}
|
||||||
|
close(ch)
|
||||||
|
return ch
|
||||||
|
} else if !ok {
|
||||||
|
ch <- &PutResult{Err: fmt.Errorf("Unexpected deferred local store")}
|
||||||
|
close(ch)
|
||||||
|
return ch
|
||||||
|
} else if !parent.setCachedSoul(ValueRelation(parentCachedSoul)) {
|
||||||
|
ch <- &PutResult{Err: fmt.Errorf("Concurrent cached soul set")}
|
||||||
|
close(ch)
|
||||||
|
return ch
|
||||||
|
}
|
||||||
|
}
|
||||||
|
prevParentSoul = parentCachedSoul
|
||||||
|
}
|
||||||
|
// Now that we've setup all the parents, we can do this store locally.
|
||||||
|
withState := &ValueWithState{val, currState}
|
||||||
|
if ok, err := s.gun.storage.Put(ctx, prevParentSoul, s.field, withState); err != nil {
|
||||||
|
ch <- &PutResult{Err: err}
|
||||||
|
close(ch)
|
||||||
|
return ch
|
||||||
|
} else if !ok {
|
||||||
|
ch <- &PutResult{Err: fmt.Errorf("Unexpected deferred local store")}
|
||||||
|
close(ch)
|
||||||
|
return ch
|
||||||
|
}
|
||||||
|
// We need an ack for local store and stop if local only
|
||||||
|
ch <- &PutResult{}
|
||||||
|
if storeLocalOnly {
|
||||||
|
close(ch)
|
||||||
|
return ch
|
||||||
|
}
|
||||||
|
// Now, we begin the remote storing
|
||||||
|
req.Put[prevParentSoul] = &Node{
|
||||||
|
Metadata: Metadata{
|
||||||
|
Soul: prevParentSoul,
|
||||||
|
State: map[string]int64{s.field: currState},
|
||||||
|
},
|
||||||
|
Values: map[string]Value{s.field: val},
|
||||||
|
}
|
||||||
|
// Make a msg chan and register it to listen for acks
|
||||||
|
msgCh := make(chan *MessageReceived)
|
||||||
|
s.putResultListenersLock.Lock()
|
||||||
|
s.putResultListeners[ch] = &putResultListener{req.ID, ch, msgCh}
|
||||||
|
s.putResultListenersLock.Unlock()
|
||||||
|
s.gun.RegisterMessageIDListener(req.ID, msgCh)
|
||||||
|
// Start message listener
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
ch <- &PutResult{Err: ctx.Err()}
|
||||||
|
s.PutDone(ch)
|
||||||
|
return
|
||||||
|
case msg, ok := <-msgCh:
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
r := &PutResult{peer: msg.peer}
|
||||||
|
if msg.Err != "" {
|
||||||
|
r.Err = fmt.Errorf("Remote error: %v", msg.Err)
|
||||||
|
} else if msg.OK != 1 {
|
||||||
|
r.Err = fmt.Errorf("Unexpected remote ok value of %v", msg.OK)
|
||||||
|
}
|
||||||
|
safePutResultSend(ch, r)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
// Send async, sending back errors
|
||||||
|
go func() {
|
||||||
|
for peerErr := range s.gun.Send(ctx, req) {
|
||||||
|
safePutResultSend(ch, &PutResult{
|
||||||
|
Err: peerErr.Err,
|
||||||
|
peer: peerErr.peer,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return ch
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Scoped) PutDone(ch <-chan *PutResult) bool {
|
||||||
|
s.putResultListenersLock.Lock()
|
||||||
|
l := s.putResultListeners[ch]
|
||||||
|
delete(s.putResultListeners, ch)
|
||||||
|
s.putResultListenersLock.Unlock()
|
||||||
|
if l != nil {
|
||||||
|
// Unregister the chan
|
||||||
|
s.gun.UnregisterMessageIDListener(l.id)
|
||||||
|
// Close the message chan and the result chan
|
||||||
|
close(l.receivedMessages)
|
||||||
|
close(l.results)
|
||||||
|
}
|
||||||
|
return l != nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func safePutResultSend(ch chan<- *PutResult, r *PutResult) {
|
||||||
|
// Due to the fact that we may send on a closed channel here, we ignore the panic
|
||||||
|
defer func() { recover() }()
|
||||||
|
ch <- r
|
||||||
|
}
|
@ -10,6 +10,7 @@ var ErrStorageNotFound = errors.New("Not found")
|
|||||||
|
|
||||||
type Storage interface {
|
type Storage interface {
|
||||||
Get(ctx context.Context, parentSoul, field string) (*ValueWithState, error)
|
Get(ctx context.Context, parentSoul, field string) (*ValueWithState, error)
|
||||||
|
// If bool is false, it's deferred
|
||||||
Put(ctx context.Context, parentSoul, field string, val *ValueWithState) (bool, error)
|
Put(ctx context.Context, parentSoul, field string, val *ValueWithState) (bool, error)
|
||||||
Tracking(ctx context.Context, parentSoul, field string) (bool, error)
|
Tracking(ctx context.Context, parentSoul, field string) (bool, error)
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package tests
|
package tests
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/cretz/esgopeta/gun"
|
"github.com/cretz/esgopeta/gun"
|
||||||
@ -26,13 +27,37 @@ func TestGunGetSimple(t *testing.T) {
|
|||||||
g := ctx.newGunConnectedToGunJS()
|
g := ctx.newGunConnectedToGunJS()
|
||||||
defer g.Close()
|
defer g.Close()
|
||||||
// Make sure we got back the same value
|
// Make sure we got back the same value
|
||||||
f := g.Scoped(ctx, "esgopeta-test", "TestGunGetSimple", "some-key").Val(ctx)
|
r := g.Scoped(ctx, "esgopeta-test", "TestGunGetSimple", "some-key").FetchOne(ctx)
|
||||||
ctx.Require.NoError(f.Err)
|
ctx.Require.NoError(r.Err)
|
||||||
ctx.Require.Equal(gun.ValueString(randStr), f.Value.(gun.ValueString))
|
ctx.Require.Equal(gun.ValueString(randStr), r.Value.(gun.ValueString))
|
||||||
// // Do it again TODO: make sure there are no network calls, it's all from mem
|
// // Do it again TODO: make sure there are no network calls, it's all from mem
|
||||||
// ctx.debugf("Asking for key again")
|
// ctx.debugf("Asking for key again")
|
||||||
// f = g.Scoped(ctx, "esgopeta-test", "TestGunGetSimple", "some-key").Val(ctx)
|
// f = g.Scoped(ctx, "esgopeta-test", "TestGunGetSimple", "some-key").FetchOne(ctx)
|
||||||
// ctx.Require.NoError(f.Err)
|
// ctx.Require.NoError(f.Err)
|
||||||
// ctx.Require.Equal(gun.ValueString(randStr), f.Value.Value.(gun.ValueString))
|
// ctx.Require.Equal(gun.ValueString(randStr), f.Value.(gun.ValueString))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGunPutSimple(t *testing.T) {
|
||||||
|
ctx, cancelFn := newContext(t)
|
||||||
|
defer cancelFn()
|
||||||
|
ctx.startGunJSServer()
|
||||||
|
randStr := randString(30)
|
||||||
|
// Put
|
||||||
|
g := ctx.newGunConnectedToGunJS()
|
||||||
|
defer g.Close()
|
||||||
|
// Just wait for two acks (one local, one remote)
|
||||||
|
ch := g.Scoped(ctx, "esgopeta-test", "TestGunPutSimple", "some-key").Put(ctx, gun.ValueString(randStr))
|
||||||
|
// TODO: test local is null peer and remote is non-null
|
||||||
|
r := <-ch
|
||||||
|
ctx.Require.NoError(r.Err)
|
||||||
|
r = <-ch
|
||||||
|
ctx.Require.NoError(r.Err)
|
||||||
|
// Get from JS
|
||||||
|
out := ctx.runJSWithGun(`
|
||||||
|
gun.get('esgopeta-test').get('TestGunPutSimple').get('some-key').once(data => {
|
||||||
|
console.log(data)
|
||||||
|
process.exit(0)
|
||||||
|
})
|
||||||
|
`)
|
||||||
|
ctx.Require.Equal(randStr, strings.TrimSpace(string(out)))
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user