More work with scoping

This commit is contained in:
Chad Retz 2019-02-22 03:23:14 -06:00
parent 3180319998
commit 22189e051d
9 changed files with 184 additions and 99 deletions

View File

@ -7,18 +7,20 @@ import (
) )
type Gun struct { type Gun struct {
peers []Peer peers []Peer
storage Storage storage Storage
soulGen func() string soulGen func() string
peerErrorHandler func(*ErrPeer)
messageIDPutListeners map[string]chan<- *ReceivedMessage messageIDPutListeners map[string]chan<- *MessageReceived
messageIDPutListenersLock sync.RWMutex messageIDPutListenersLock sync.RWMutex
} }
type Config struct { type Config struct {
Peers []Peer Peers []Peer
Storage Storage Storage Storage
SoulGen func() string SoulGen func() string
PeerErrorHandler func(*ErrPeer)
} }
func New(config Config) *Gun { func New(config Config) *Gun {
@ -26,7 +28,8 @@ func New(config Config) *Gun {
peers: make([]Peer, len(config.Peers)), peers: make([]Peer, len(config.Peers)),
storage: config.Storage, storage: config.Storage,
soulGen: config.SoulGen, soulGen: config.SoulGen,
messageIDPutListeners: map[string]chan<- *ReceivedMessage{}, peerErrorHandler: config.PeerErrorHandler,
messageIDPutListeners: map[string]chan<- *MessageReceived{},
} }
// Copy over peers // Copy over peers
copy(g.peers, config.Peers) copy(g.peers, config.Peers)
@ -62,45 +65,43 @@ func NewFromPeerURLs(ctx context.Context, peerURLs ...string) (g *Gun, err error
return New(c), nil return New(c), nil
} }
type Message struct { func (g *Gun) Close() error {
Ack string `json:"@,omitEmpty"` var errs []error
ID string `json:"#,omitEmpty"` for _, p := range g.peers {
Sender string `json:"><,omitEmpty"` if err := p.Close(); err != nil {
Hash string `json:"##,omitempty"` errs = append(errs, err)
How string `json:"how,omitempty"` }
Get *MessageGetRequest `json:"get,omitempty"` }
Put map[string]*Node `json:"put,omitempty"` if len(errs) == 0 {
DAM string `json:"dam,omitempty"` return nil
PID string `json:"pid,omitempty"` } else if len(errs) == 1 {
return errs[0]
} else {
return fmt.Errorf("Multiple errors: %v", errs)
}
} }
type MessageGetRequest struct { func (g *Gun) Send(ctx context.Context, msg *Message) <-chan *ErrPeer {
ID string `json:"#,omitempty"` return g.send(ctx, msg, nil)
Field string `json:".,omitempty"`
} }
type ReceivedMessage struct { func (g *Gun) send(ctx context.Context, msg *Message, ignorePeer Peer) <-chan *ErrPeer {
*Message ch := make(chan *ErrPeer, len(g.peers))
Peer Peer
}
type PeerError struct {
Err error
Peer Peer
}
func (g *Gun) Send(ctx context.Context, msg *Message) <-chan *PeerError {
ch := make(chan *PeerError, len(g.peers))
// Everything async // Everything async
go func() { go func() {
defer close(ch) defer close(ch)
var wg sync.WaitGroup var wg sync.WaitGroup
for _, peer := range g.peers { for _, peer := range g.peers {
if peer == ignorePeer {
continue
}
wg.Add(1) wg.Add(1)
go func(peer Peer) { go func(peer Peer) {
defer wg.Done() defer wg.Done()
if err := peer.Send(ctx, msg); err != nil { if err := peer.Send(ctx, msg); err != nil {
ch <- &PeerError{err, peer} peerErr := &ErrPeer{err, peer}
go g.onPeerError(peerErr)
ch <- peerErr
} }
}(peer) }(peer)
} }
@ -113,37 +114,40 @@ func (g *Gun) startReceiving() {
for _, peer := range g.peers { for _, peer := range g.peers {
go func(peer Peer) { go func(peer Peer) {
for msgOrErr := range peer.Receive() { for msgOrErr := range peer.Receive() {
// TODO: what to do with error?
if msgOrErr.Err != nil { if msgOrErr.Err != nil {
g.onPeerReceiveError(&PeerError{msgOrErr.Err, peer}) go g.onPeerError(&ErrPeer{msgOrErr.Err, peer})
continue continue
} }
// See if a listener is around to handle it instead of rebroadcasting // See if a listener is around to handle it instead of rebroadcasting
msg := &ReceivedMessage{Message: msgOrErr.Message, Peer: peer} msg := &MessageReceived{Message: msgOrErr.Message, Peer: peer}
if msg.Ack != "" && len(msg.Put) > 0 { if msg.Ack != "" && len(msg.Put) > 0 {
g.messageIDPutListenersLock.RLock() g.messageIDPutListenersLock.RLock()
l := g.messageIDPutListeners[msg.Ack] l := g.messageIDPutListeners[msg.Ack]
g.messageIDPutListenersLock.RUnlock() g.messageIDPutListenersLock.RUnlock()
if l != nil { if l != nil {
safeReceivedMessageSend(l, msg) go safeReceivedMessageSend(l, msg)
continue continue
} }
} }
g.onUnhandledMessage(msg) go g.onUnhandledMessage(msg)
} }
}(peer) }(peer)
} }
} }
func (g *Gun) onUnhandledMessage(msg *ReceivedMessage) { func (g *Gun) onUnhandledMessage(msg *MessageReceived) {
// Unhandled message means rebroadcast
// TODO: we need a timeout or global context here...
g.send(context.TODO(), msg.Message, msg.Peer)
} }
func (g *Gun) onPeerReceiveError(err *PeerError) { func (g *Gun) onPeerError(err *ErrPeer) {
if g.peerErrorHandler != nil {
g.peerErrorHandler(err)
}
} }
func (g *Gun) RegisterMessageIDPutListener(id string, ch chan<- *ReceivedMessage) { func (g *Gun) RegisterMessageIDPutListener(id string, ch chan<- *MessageReceived) {
g.messageIDPutListenersLock.Lock() g.messageIDPutListenersLock.Lock()
defer g.messageIDPutListenersLock.Unlock() defer g.messageIDPutListenersLock.Unlock()
g.messageIDPutListeners[id] = ch g.messageIDPutListeners[id] = ch
@ -160,14 +164,14 @@ func (g *Gun) UnregisterMessageIDPutListener(id string) {
// } // }
func (g *Gun) Scoped(ctx context.Context, key string, children ...string) *Scoped { func (g *Gun) Scoped(ctx context.Context, key string, children ...string) *Scoped {
s := newScoped(g, "", key) s := newScoped(g, nil, key)
if len(children) > 0 { if len(children) > 0 {
s = s.Scoped(ctx, children[0], children[1:]...) s = s.Scoped(ctx, children[0], children[1:]...)
} }
return s return s
} }
func safeReceivedMessageSend(ch chan<- *ReceivedMessage, msg *ReceivedMessage) { func safeReceivedMessageSend(ch chan<- *MessageReceived, msg *MessageReceived) {
// Due to the fact that we may send on a closed channel here, we ignore the panic // Due to the fact that we may send on a closed channel here, we ignore the panic
defer func() { recover() }() defer func() { recover() }()
ch <- msg ch <- msg

23
gun/message.go Normal file
View File

@ -0,0 +1,23 @@
package gun
type Message struct {
Ack string `json:"@,omitEmpty"`
ID string `json:"#,omitEmpty"`
To string `json:"><,omitEmpty"`
Hash string `json:"##,omitempty"`
How string `json:"how,omitempty"`
Get *MessageGetRequest `json:"get,omitempty"`
Put map[string]*Node `json:"put,omitempty"`
DAM string `json:"dam,omitempty"`
PID string `json:"pid,omitempty"`
}
type MessageGetRequest struct {
Soul string `json:"#,omitempty"`
Field string `json:".,omitempty"`
}
type MessageReceived struct {
*Message
Peer Peer
}

View File

@ -17,14 +17,14 @@ var SoulGenDefault = func() string {
} }
type Node struct { type Node struct {
NodeMetadata Metadata
Values map[string]Value Values map[string]Value
} }
func (n *Node) MarshalJSON() ([]byte, error) { func (n *Node) MarshalJSON() ([]byte, error) {
// Just put it all in a map and then encode it // Just put it all in a map and then encode it
toEnc := make(map[string]interface{}, len(n.Values)+1) toEnc := make(map[string]interface{}, len(n.Values)+1)
toEnc["_"] = &n.NodeMetadata toEnc["_"] = &n.Metadata
for k, v := range n.Values { for k, v := range n.Values {
toEnc[k] = v toEnc[k] = v
} }
@ -49,7 +49,7 @@ func (n *Node) UnmarshalJSON(b []byte) error {
} else if keyStr, ok := key.(string); !ok { } else if keyStr, ok := key.(string); !ok {
return fmt.Errorf("Unrecognized token %v", key) return fmt.Errorf("Unrecognized token %v", key)
} else if keyStr == "_" { } else if keyStr == "_" {
if err = dec.Decode(&n.NodeMetadata); err != nil { if err = dec.Decode(&n.Metadata); err != nil {
return fmt.Errorf("Failed unmarshaling metadata: %v", err) return fmt.Errorf("Failed unmarshaling metadata: %v", err)
} }
} else if val, err := dec.Token(); err != nil { } else if val, err := dec.Token(); err != nil {
@ -60,8 +60,8 @@ func (n *Node) UnmarshalJSON(b []byte) error {
} }
} }
type NodeMetadata struct { type Metadata struct {
ID string `json:"#"` Soul string `json:"#"`
State map[string]int64 `json:">"` State map[string]int64 `json:">"`
} }
@ -110,7 +110,8 @@ func (n ValueRelation) MarshalJSON() ([]byte, error) {
return json.Marshal(map[string]string{"#": string(n)}) return json.Marshal(map[string]string{"#": string(n)})
} }
type StatefulValue struct { type ValueWithState struct {
Value Value Value Value
// This is 0 for top-level values
State int64 State int64
} }

View File

@ -8,6 +8,13 @@ import (
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
) )
type ErrPeer struct {
Err error
Peer Peer
}
func (e *ErrPeer) Error() string { return fmt.Sprintf("Error on peer %v: %v", e.Peer, e.Err) }
type Peer interface { type Peer interface {
Send(ctx context.Context, msg *Message) error Send(ctx context.Context, msg *Message) error
Receive() <-chan *MessageOrError Receive() <-chan *MessageOrError

View File

@ -2,13 +2,17 @@ package gun
import ( import (
"context" "context"
"errors"
"sync" "sync"
) )
type Scoped struct { type Scoped struct {
gun *Gun gun *Gun
parentID string
field string parent *Scoped
field string
cachedParentSoul string
cachedParentSoulLock sync.RWMutex
valueChansToListeners map[<-chan *ValueFetch]*messageIDListener valueChansToListeners map[<-chan *ValueFetch]*messageIDListener
valueChansToListenersLock sync.Mutex valueChansToListenersLock sync.Mutex
@ -17,14 +21,14 @@ type Scoped struct {
type messageIDListener struct { type messageIDListener struct {
id string id string
values chan *ValueFetch values chan *ValueFetch
receivedMessages chan *ReceivedMessage receivedMessages chan *MessageReceived
} }
func newScoped(gun *Gun, parentID string, field string) *Scoped { func newScoped(gun *Gun, parent *Scoped, field string) *Scoped {
return &Scoped{ return &Scoped{
gun: gun, gun: gun,
parentID: parentID, parent: parent,
field: field, field: field,
} }
} }
@ -32,16 +36,34 @@ type ValueFetch struct {
// This can be a context error on cancelation // This can be a context error on cancelation
Err error Err error
Field string Field string
// Nil if there is an error // Nil if the value doesn't exist or there's an error
Value *StatefulValue Value *ValueWithState
// Nil when local and sometimes on error // Nil when local and sometimes on error
Peer Peer Peer Peer
} }
type Ack struct { var ErrNotObject = errors.New("Scoped value not an object")
Err error var ErrLookupOnTopLevel = errors.New("Cannot do lookup on top level")
Ok bool
Peer Peer // Empty string if doesn't exist, ErrNotObject if self or parent not an object
func (s *Scoped) Soul(ctx context.Context) (string, error) {
s.cachedParentSoulLock.RLock()
cachedParentSoul := s.cachedParentSoul
s.cachedParentSoulLock.RUnlock()
if cachedParentSoul != "" {
return cachedParentSoul, nil
} else if v := s.Val(ctx); v.Err != nil {
return "", v.Err
} else if v.Value == nil {
return "", nil
} else if rel, ok := v.Value.Value.(ValueRelation); !ok {
return "", ErrNotObject
} else {
s.cachedParentSoulLock.Lock()
s.cachedParentSoul = string(rel)
s.cachedParentSoulLock.Unlock()
return string(rel), nil
}
} }
func (s *Scoped) Val(ctx context.Context) *ValueFetch { func (s *Scoped) Val(ctx context.Context) *ValueFetch {
@ -53,14 +75,25 @@ func (s *Scoped) Val(ctx context.Context) *ValueFetch {
} }
func (s *Scoped) ValLocal(ctx context.Context) *ValueFetch { func (s *Scoped) ValLocal(ctx context.Context) *ValueFetch {
var v ValueFetch // If there is no parent, this is just the relation
if v.Value, v.Err = s.gun.storage.Get(ctx, s.parentID, s.field); v.Err == ErrStorageNotFound { if s.parent == nil {
return nil return &ValueFetch{Field: s.field, Value: &ValueWithState{Value: ValueRelation(s.field)}}
} }
return &v v := &ValueFetch{Field: s.field}
// Need parent soul for lookup
var parentSoul string
if parentSoul, v.Err = s.parent.Soul(ctx); v.Err == nil {
if v.Value, v.Err = s.gun.storage.Get(ctx, parentSoul, s.field); v.Err == ErrStorageNotFound {
return nil
}
}
return v
} }
func (s *Scoped) ValRemote(ctx context.Context) *ValueFetch { func (s *Scoped) ValRemote(ctx context.Context) *ValueFetch {
if s.parent == nil {
return &ValueFetch{Err: ErrLookupOnTopLevel, Field: s.field}
}
ch := s.OnRemote(ctx) ch := s.OnRemote(ctx)
defer s.Off(ch) defer s.Off(ch)
return <-ch return <-ch
@ -68,29 +101,46 @@ func (s *Scoped) ValRemote(ctx context.Context) *ValueFetch {
func (s *Scoped) On(ctx context.Context) <-chan *ValueFetch { func (s *Scoped) On(ctx context.Context) <-chan *ValueFetch {
ch := make(chan *ValueFetch, 1) ch := make(chan *ValueFetch, 1)
if v := s.ValLocal(ctx); v != nil { if s.parent == nil {
ch <- v ch <- &ValueFetch{Err: ErrLookupOnTopLevel, Field: s.field}
} else {
if v := s.ValLocal(ctx); v != nil {
ch <- v
}
go s.onRemote(ctx, ch)
} }
go s.onRemote(ctx, ch)
return ch return ch
} }
func (s *Scoped) OnRemote(ctx context.Context) <-chan *ValueFetch { func (s *Scoped) OnRemote(ctx context.Context) <-chan *ValueFetch {
ch := make(chan *ValueFetch, 1) ch := make(chan *ValueFetch, 1)
go s.onRemote(ctx, ch) if s.parent == nil {
ch <- &ValueFetch{Err: ErrLookupOnTopLevel, Field: s.field}
} else {
go s.onRemote(ctx, ch)
}
return ch return ch
} }
func (s *Scoped) onRemote(ctx context.Context, ch chan *ValueFetch) { func (s *Scoped) onRemote(ctx context.Context, ch chan *ValueFetch) {
if s.parent == nil {
panic("No parent")
}
// We have to get the parent soul first
parentSoul, err := s.parent.Soul(ctx)
if err != nil {
ch <- &ValueFetch{Err: ErrLookupOnTopLevel, Field: s.field}
return
}
// Create get request // Create get request
req := &Message{ req := &Message{
ID: randString(9), ID: randString(9),
Get: &MessageGetRequest{ID: s.parentID, Field: s.field}, Get: &MessageGetRequest{Soul: parentSoul, Field: s.field},
} }
// Make a chan to listen for received messages and link it to // Make a chan to listen for received messages and link it to
// the given one so we can turn it "off". Off will close this // the given one so we can turn it "off". Off will close this
// chan. // chan.
msgCh := make(chan *ReceivedMessage) msgCh := make(chan *MessageReceived)
s.valueChansToListenersLock.Lock() s.valueChansToListenersLock.Lock()
s.valueChansToListeners[ch] = &messageIDListener{req.ID, ch, msgCh} s.valueChansToListeners[ch] = &messageIDListener{req.ID, ch, msgCh}
s.valueChansToListenersLock.Unlock() s.valueChansToListenersLock.Unlock()
@ -109,17 +159,15 @@ func (s *Scoped) onRemote(ctx context.Context, ch chan *ValueFetch) {
if !ok { if !ok {
return return
} }
// We asked for a single field, should only get that field f := &ValueFetch{Field: s.field, Peer: msg.Peer}
if n := msg.Put[s.parentID]; n != nil && n.Values[s.field] != nil { // We asked for a single field, should only get that field or it doesn't exist
// TODO: conflict resolution if n := msg.Put[parentSoul]; n != nil && n.Values[s.field] != nil {
// TODO: dedupe f.Value = &ValueWithState{n.Values[s.field], n.State[s.field]}
// TODO: store and cache
safeValueFetchSend(ch, &ValueFetch{
Field: s.field,
Value: &StatefulValue{n.Values[s.field], n.State[s.field]},
Peer: msg.Peer,
})
} }
// TODO: conflict resolution and defer
// TODO: dedupe
// TODO: store and cache
safeValueFetchSend(ch, f)
} }
} }
}() }()
@ -151,7 +199,11 @@ func (s *Scoped) Off(ch <-chan *ValueFetch) bool {
} }
func (s *Scoped) Scoped(ctx context.Context, key string, children ...string) *Scoped { func (s *Scoped) Scoped(ctx context.Context, key string, children ...string) *Scoped {
panic("TODO") ret := newScoped(s.gun, s, key)
for _, child := range children {
ret = newScoped(s.gun, ret, child)
}
return ret
} }
func safeValueFetchSend(ch chan<- *ValueFetch, f *ValueFetch) { func safeValueFetchSend(ch chan<- *ValueFetch, f *ValueFetch) {

View File

@ -9,8 +9,8 @@ import (
var ErrStorageNotFound = errors.New("Not found") var ErrStorageNotFound = errors.New("Not found")
type Storage interface { type Storage interface {
Get(ctx context.Context, parentID, field string) (*StatefulValue, error) Get(ctx context.Context, parentSoul, field string) (*ValueWithState, error)
Put(ctx context.Context, parentID, field string, val *StatefulValue) (bool, error) Put(ctx context.Context, parentSoul, field string, val *ValueWithState) (bool, error)
// Tracking(ctx context.Context, id string) (bool, error) // Tracking(ctx context.Context, id string) (bool, error)
} }
@ -18,10 +18,10 @@ type StorageInMem struct {
values sync.Map values sync.Map
} }
func (s *StorageInMem) Get(ctx context.Context, parentID, field string) (*StatefulValue, error) { func (s *StorageInMem) Get(ctx context.Context, parentSoul, field string) (*ValueWithState, error) {
panic("TODO") panic("TODO")
} }
func (s *StorageInMem) Put(ctx context.Context, parentID, field string, val *StatefulValue) (bool, error) { func (s *StorageInMem) Put(ctx context.Context, parentSoul, field string, val *ValueWithState) (bool, error) {
panic("TODO") panic("TODO")
} }

View File

@ -64,7 +64,7 @@ func (t *testContext) startJS(script string) (*bytes.Buffer, *exec.Cmd, context.
func (t *testContext) startGunServer(port int) { func (t *testContext) startGunServer(port int) {
// Remove entire data folder first // Remove entire data folder first
t.Require.NoError(os.RemoveAll("rodata-server")) t.Require.NoError(os.RemoveAll("radata-server"))
t.startJS(` t.startJS(`
var Gun = require('gun') var Gun = require('gun')
const server = require('http').createServer().listen(` + strconv.Itoa(port) + `) const server = require('http').createServer().listen(` + strconv.Itoa(port) + `)

View File

@ -1,11 +1,6 @@
package tests package tests
import ( /*
"testing"
"github.com/cretz/esgopeta/gun"
)
func TestGunGo(t *testing.T) { func TestGunGo(t *testing.T) {
// Run the server, put in one call, get in another, then check // Run the server, put in one call, get in another, then check
ctx, cancelFn := newContext(t) ctx, cancelFn := newContext(t)
@ -46,3 +41,4 @@ func TestGunGo(t *testing.T) {
ctx.Require.NoError(f.Err) ctx.Require.NoError(f.Err)
} }
*/

View File

@ -1,6 +1,8 @@
package gun package gun
import "crypto/rand" import (
"crypto/rand"
)
const randChars = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" const randChars = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"