From 318031999895e6292b44bc0a5a0b817477465e3e Mon Sep 17 00:00:00 2001 From: Chad Retz Date: Fri, 22 Feb 2019 00:46:19 -0600 Subject: [PATCH] More work on API --- gun/gun.go | 143 ++++++++++++++++++++++++++----- gun/node.go | 110 +++++++++++++++++++++--- gun/peer.go | 26 ++++++ gun/scoped.go | 171 ++++++++++++++++++++++++++++++++------ gun/storage.go | 20 +++++ gun/tests/context_test.go | 3 + gun/tests/gun_test.go | 48 +++++++++++ gun/tests/ws_test.go | 38 ++++++++- 8 files changed, 500 insertions(+), 59 deletions(-) create mode 100644 gun/tests/gun_test.go diff --git a/gun/gun.go b/gun/gun.go index e92fd15..75f19f3 100644 --- a/gun/gun.go +++ b/gun/gun.go @@ -3,26 +3,30 @@ package gun import ( "context" "fmt" - "net/url" + "sync" ) type Gun struct { peers []Peer storage Storage - soulGen func() Soul + soulGen func() string + + messageIDPutListeners map[string]chan<- *ReceivedMessage + messageIDPutListenersLock sync.RWMutex } type Config struct { Peers []Peer Storage Storage - SoulGen func() Soul + SoulGen func() string } func New(config Config) *Gun { g := &Gun{ - peers: make([]Peer, len(config.Peers)), - storage: config.Storage, - soulGen: config.SoulGen, + peers: make([]Peer, len(config.Peers)), + storage: config.Storage, + soulGen: config.SoulGen, + messageIDPutListeners: map[string]chan<- *ReceivedMessage{}, } // Copy over peers copy(g.peers, config.Peers) @@ -33,6 +37,8 @@ func New(config Config) *Gun { if g.soulGen == nil { g.soulGen = SoulGenDefault } + // Start receiving + g.startReceiving() return g } @@ -41,17 +47,15 @@ func New(config Config) *Gun { func NewFromPeerURLs(ctx context.Context, peerURLs ...string) (g *Gun, err error) { c := Config{Peers: make([]Peer, len(peerURLs))} for i := 0; i < len(peerURLs) && err == nil; i++ { - if parsedURL, err := url.Parse(peerURLs[i]); err != nil { - err = fmt.Errorf("Failed parsing peer URL %v: %v", peerURLs[i], err) - } else if peerNew := PeerURLSchemes[parsedURL.Scheme]; peerNew == nil { - err = fmt.Errorf("Unknown peer URL scheme for %v", peerURLs[i]) - } else if c.Peers[i], err = peerNew(ctx, parsedURL); err != nil { + if c.Peers[i], err = NewPeer(ctx, peerURLs[i]); err != nil { err = fmt.Errorf("Failed connecting to peer %v: %v", peerURLs[i], err) } } if err != nil { for _, peer := range c.Peers { - peer.Close() + if peer != nil { + peer.Close() + } } return } @@ -59,11 +63,112 @@ func NewFromPeerURLs(ctx context.Context, peerURLs ...string) (g *Gun, err error } type Message struct { - Ack string `json:"@,omitEmpty"` - ID string `json:"#,omitEmpty"` - Sender string `json:"><,omitEmpty"` - Hash string `json:"##,omitempty"` - OK *int `json:"ok,omitempty"` - How string `json:"how,omitempty"` - // TODO: "get", "put", "dam" + Ack string `json:"@,omitEmpty"` + ID string `json:"#,omitEmpty"` + Sender string `json:"><,omitEmpty"` + Hash string `json:"##,omitempty"` + How string `json:"how,omitempty"` + Get *MessageGetRequest `json:"get,omitempty"` + Put map[string]*Node `json:"put,omitempty"` + DAM string `json:"dam,omitempty"` + PID string `json:"pid,omitempty"` +} + +type MessageGetRequest struct { + ID string `json:"#,omitempty"` + Field string `json:".,omitempty"` +} + +type ReceivedMessage struct { + *Message + Peer Peer +} + +type PeerError struct { + Err error + Peer Peer +} + +func (g *Gun) Send(ctx context.Context, msg *Message) <-chan *PeerError { + ch := make(chan *PeerError, len(g.peers)) + // Everything async + go func() { + defer close(ch) + var wg sync.WaitGroup + for _, peer := range g.peers { + wg.Add(1) + go func(peer Peer) { + defer wg.Done() + if err := peer.Send(ctx, msg); err != nil { + ch <- &PeerError{err, peer} + } + }(peer) + } + wg.Wait() + }() + return ch +} + +func (g *Gun) startReceiving() { + for _, peer := range g.peers { + go func(peer Peer) { + for msgOrErr := range peer.Receive() { + // TODO: what to do with error? + if msgOrErr.Err != nil { + g.onPeerReceiveError(&PeerError{msgOrErr.Err, peer}) + continue + } + // See if a listener is around to handle it instead of rebroadcasting + msg := &ReceivedMessage{Message: msgOrErr.Message, Peer: peer} + if msg.Ack != "" && len(msg.Put) > 0 { + g.messageIDPutListenersLock.RLock() + l := g.messageIDPutListeners[msg.Ack] + g.messageIDPutListenersLock.RUnlock() + if l != nil { + safeReceivedMessageSend(l, msg) + continue + } + } + g.onUnhandledMessage(msg) + } + }(peer) + } +} + +func (g *Gun) onUnhandledMessage(msg *ReceivedMessage) { + +} + +func (g *Gun) onPeerReceiveError(err *PeerError) { + +} + +func (g *Gun) RegisterMessageIDPutListener(id string, ch chan<- *ReceivedMessage) { + g.messageIDPutListenersLock.Lock() + defer g.messageIDPutListenersLock.Unlock() + g.messageIDPutListeners[id] = ch +} + +func (g *Gun) UnregisterMessageIDPutListener(id string) { + g.messageIDPutListenersLock.Lock() + defer g.messageIDPutListenersLock.Unlock() + delete(g.messageIDPutListeners, id) +} + +// func (g *Gun) RegisterValueIDPutListener(id string, ch chan<- *ReceivedMessage) { +// panic("TODO") +// } + +func (g *Gun) Scoped(ctx context.Context, key string, children ...string) *Scoped { + s := newScoped(g, "", key) + if len(children) > 0 { + s = s.Scoped(ctx, children[0], children[1:]...) + } + return s +} + +func safeReceivedMessageSend(ch chan<- *ReceivedMessage, msg *ReceivedMessage) { + // Due to the fact that we may send on a closed channel here, we ignore the panic + defer func() { recover() }() + ch <- msg } diff --git a/gun/node.go b/gun/node.go index a414035..7f548ac 100644 --- a/gun/node.go +++ b/gun/node.go @@ -1,32 +1,116 @@ package gun -import "strconv" +import ( + "bytes" + "encoding/json" + "fmt" + "strconv" +) -var SoulGenDefault = func() Soul { +var SoulGenDefault = func() string { ms, uniqueNum := TimeNowUniqueUnix() s := strconv.FormatInt(ms, 36) if uniqueNum > 0 { s += strconv.FormatInt(uniqueNum, 36) } - return Soul(s + randString(12)) + return s + randString(12) } type Node struct { NodeMetadata - Values map[string]NodeValue + Values map[string]Value +} + +func (n *Node) MarshalJSON() ([]byte, error) { + // Just put it all in a map and then encode it + toEnc := make(map[string]interface{}, len(n.Values)+1) + toEnc["_"] = &n.NodeMetadata + for k, v := range n.Values { + toEnc[k] = v + } + return json.Marshal(toEnc) +} + +func (n *Node) UnmarshalJSON(b []byte) error { + dec := json.NewDecoder(bytes.NewReader(b)) + dec.UseNumber() + // We'll just go from start brace to end brace + if t, err := dec.Token(); err != nil { + return err + } else if t != json.Delim('{') { + return fmt.Errorf("Unexpected token %v", t) + } + n.Values = map[string]Value{} + for { + if key, err := dec.Token(); err != nil { + return err + } else if key == json.Delim('}') { + return nil + } else if keyStr, ok := key.(string); !ok { + return fmt.Errorf("Unrecognized token %v", key) + } else if keyStr == "_" { + if err = dec.Decode(&n.NodeMetadata); err != nil { + return fmt.Errorf("Failed unmarshaling metadata: %v", err) + } + } else if val, err := dec.Token(); err != nil { + return err + } else if n.Values[keyStr], err = DecodeJSONValue(val, dec); err != nil { + return err + } + } } type NodeMetadata struct { - Soul Soul - HAMState map[string]uint64 + ID string `json:"#"` + State map[string]int64 `json:">"` } -type Soul string - -type NodeValue interface { +type Value interface { } -type NodeString string -type NodeNumber string -type NodeBool bool -type NodeRelation Soul +func DecodeJSONValue(token json.Token, dec *json.Decoder) (Value, error) { + switch token := token.(type) { + case nil: + return nil, nil + case json.Number: + return ValueNumber(token), nil + case string: + return ValueString(token), nil + case bool: + return ValueBool(token), nil + case json.Delim: + if token != json.Delim('{') { + return nil, fmt.Errorf("Unrecognized token %v", token) + } else if relKey, err := dec.Token(); err != nil { + return nil, err + } else if relKey != "#" { + return nil, fmt.Errorf("Unrecognized token %v", relKey) + } else if relVal, err := dec.Token(); err != nil { + return nil, err + } else if relValStr, ok := relVal.(string); !ok { + return nil, fmt.Errorf("Unrecognized token %v", relVal) + } else if endTok, err := dec.Token(); err != nil { + return nil, err + } else if endTok != json.Delim('}') { + return nil, fmt.Errorf("Unrecognized token %v", endTok) + } else { + return ValueRelation(relValStr), nil + } + default: + return nil, fmt.Errorf("Unrecognized token %v", token) + } +} + +type ValueString string +type ValueNumber string +type ValueBool bool +type ValueRelation string + +func (n ValueRelation) MarshalJSON() ([]byte, error) { + return json.Marshal(map[string]string{"#": string(n)}) +} + +type StatefulValue struct { + Value Value + State int64 +} diff --git a/gun/peer.go b/gun/peer.go index 2a1ddf2..9e4c27b 100644 --- a/gun/peer.go +++ b/gun/peer.go @@ -2,19 +2,37 @@ package gun import ( "context" + "fmt" "net/url" "github.com/gorilla/websocket" ) type Peer interface { + Send(ctx context.Context, msg *Message) error + Receive() <-chan *MessageOrError Close() error } +type MessageOrError struct { + Message *Message + Err error +} + var PeerURLSchemes = map[string]func(context.Context, *url.URL) (Peer, error){ "ws": func(ctx context.Context, peerUrl *url.URL) (Peer, error) { return NewPeerWebSocket(ctx, peerUrl) }, } +func NewPeer(ctx context.Context, peerURL string) (Peer, error) { + if parsedURL, err := url.Parse(peerURL); err != nil { + return nil, err + } else if peerNew := PeerURLSchemes[parsedURL.Scheme]; peerNew == nil { + return nil, fmt.Errorf("Unknown peer URL scheme %v", parsedURL.Scheme) + } else { + return peerNew(ctx, parsedURL) + } +} + type PeerWebSocket struct { *websocket.Conn } @@ -26,3 +44,11 @@ func NewPeerWebSocket(ctx context.Context, peerUrl *url.URL) (*PeerWebSocket, er } return &PeerWebSocket{conn}, nil } + +func (p *PeerWebSocket) Send(ctx context.Context, msg *Message) error { + panic("TODO") +} + +func (p *PeerWebSocket) Receive() <-chan *MessageOrError { + panic("TODO") +} diff --git a/gun/scoped.go b/gun/scoped.go index d1e8189..198597c 100644 --- a/gun/scoped.go +++ b/gun/scoped.go @@ -1,36 +1,41 @@ package gun -// type Scoped interface { -// Path() []string -// // Shortcut for last Path() entry or empty string -// Key() string -// Scoped(...string) Scoped -// Up(count int) Scoped -// // Shortcut for Up(1) -// Parent() Scoped -// // Shortcut for Up(-1) -// Root() Scoped - -// Val(context.Context) *ValueFetch -// Watch(context.Context) <-chan *ValueFetch -// WatchChildren(context.Context) <-chan *ValueFetch -// Put(context.Context, Value) <-chan *Ack -// Add(context.Context, Value) <-chan *Ack -// } +import ( + "context" + "sync" +) type Scoped struct { - gun *Gun - path []string + gun *Gun + parentID string + field string + + valueChansToListeners map[<-chan *ValueFetch]*messageIDListener + valueChansToListenersLock sync.Mutex +} + +type messageIDListener struct { + id string + values chan *ValueFetch + receivedMessages chan *ReceivedMessage +} + +func newScoped(gun *Gun, parentID string, field string) *Scoped { + return &Scoped{ + gun: gun, + parentID: parentID, + field: field, + } } type ValueFetch struct { + // This can be a context error on cancelation Err error - Key string - Value Value - Peer Peer -} - -type Value interface { + Field string + // Nil if there is an error + Value *StatefulValue + // Nil when local and sometimes on error + Peer Peer } type Ack struct { @@ -38,3 +43,119 @@ type Ack struct { Ok bool Peer Peer } + +func (s *Scoped) Val(ctx context.Context) *ValueFetch { + // Try local before remote + if v := s.ValLocal(ctx); v != nil { + return v + } + return s.ValRemote(ctx) +} + +func (s *Scoped) ValLocal(ctx context.Context) *ValueFetch { + var v ValueFetch + if v.Value, v.Err = s.gun.storage.Get(ctx, s.parentID, s.field); v.Err == ErrStorageNotFound { + return nil + } + return &v +} + +func (s *Scoped) ValRemote(ctx context.Context) *ValueFetch { + ch := s.OnRemote(ctx) + defer s.Off(ch) + return <-ch +} + +func (s *Scoped) On(ctx context.Context) <-chan *ValueFetch { + ch := make(chan *ValueFetch, 1) + if v := s.ValLocal(ctx); v != nil { + ch <- v + } + go s.onRemote(ctx, ch) + return ch +} + +func (s *Scoped) OnRemote(ctx context.Context) <-chan *ValueFetch { + ch := make(chan *ValueFetch, 1) + go s.onRemote(ctx, ch) + return ch +} + +func (s *Scoped) onRemote(ctx context.Context, ch chan *ValueFetch) { + // Create get request + req := &Message{ + ID: randString(9), + Get: &MessageGetRequest{ID: s.parentID, Field: s.field}, + } + // Make a chan to listen for received messages and link it to + // the given one so we can turn it "off". Off will close this + // chan. + msgCh := make(chan *ReceivedMessage) + s.valueChansToListenersLock.Lock() + s.valueChansToListeners[ch] = &messageIDListener{req.ID, ch, msgCh} + s.valueChansToListenersLock.Unlock() + // Listen for responses to this get + s.gun.RegisterMessageIDPutListener(req.ID, msgCh) + // TODO: only for children: s.gun.RegisterValueIDPutListener(s.id, msgCh) + // Handle received messages turning them to value fetches + go func() { + for { + select { + case <-ctx.Done(): + ch <- &ValueFetch{Err: ctx.Err(), Field: s.field} + s.Off(ch) + return + case msg, ok := <-msgCh: + if !ok { + return + } + // We asked for a single field, should only get that field + if n := msg.Put[s.parentID]; n != nil && n.Values[s.field] != nil { + // TODO: conflict resolution + // TODO: dedupe + // TODO: store and cache + safeValueFetchSend(ch, &ValueFetch{ + Field: s.field, + Value: &StatefulValue{n.Values[s.field], n.State[s.field]}, + Peer: msg.Peer, + }) + } + } + } + }() + // Send async, sending back errors + go func() { + for peerErr := range s.gun.Send(ctx, req) { + safeValueFetchSend(ch, &ValueFetch{ + Err: peerErr.Err, + Field: s.field, + Peer: peerErr.Peer, + }) + } + }() +} + +func (s *Scoped) Off(ch <-chan *ValueFetch) bool { + s.valueChansToListenersLock.Lock() + l := s.valueChansToListeners[ch] + delete(s.valueChansToListeners, ch) + s.valueChansToListenersLock.Unlock() + if l != nil { + // Unregister the chan + s.gun.UnregisterMessageIDPutListener(l.id) + // Close the message chan and the value chan + close(l.receivedMessages) + close(l.values) + } + return l != nil +} + +func (s *Scoped) Scoped(ctx context.Context, key string, children ...string) *Scoped { + panic("TODO") +} + +func safeValueFetchSend(ch chan<- *ValueFetch, f *ValueFetch) { + // Due to the fact that we may send on a closed channel here, we ignore the panic + defer func() { recover() }() + ch <- f +} diff --git a/gun/storage.go b/gun/storage.go index 0d4ee7a..e4c87e1 100644 --- a/gun/storage.go +++ b/gun/storage.go @@ -1,7 +1,27 @@ package gun +import ( + "context" + "errors" + "sync" +) + +var ErrStorageNotFound = errors.New("Not found") + type Storage interface { + Get(ctx context.Context, parentID, field string) (*StatefulValue, error) + Put(ctx context.Context, parentID, field string, val *StatefulValue) (bool, error) + // Tracking(ctx context.Context, id string) (bool, error) } type StorageInMem struct { + values sync.Map +} + +func (s *StorageInMem) Get(ctx context.Context, parentID, field string) (*StatefulValue, error) { + panic("TODO") +} + +func (s *StorageInMem) Put(ctx context.Context, parentID, field string, val *StatefulValue) (bool, error) { + panic("TODO") } diff --git a/gun/tests/context_test.go b/gun/tests/context_test.go index e982a21..a588982 100644 --- a/gun/tests/context_test.go +++ b/gun/tests/context_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "log" + "os" "os/exec" "path/filepath" "runtime" @@ -62,6 +63,8 @@ func (t *testContext) startJS(script string) (*bytes.Buffer, *exec.Cmd, context. } func (t *testContext) startGunServer(port int) { + // Remove entire data folder first + t.Require.NoError(os.RemoveAll("rodata-server")) t.startJS(` var Gun = require('gun') const server = require('http').createServer().listen(` + strconv.Itoa(port) + `) diff --git a/gun/tests/gun_test.go b/gun/tests/gun_test.go new file mode 100644 index 0000000..70c9aae --- /dev/null +++ b/gun/tests/gun_test.go @@ -0,0 +1,48 @@ +package tests + +import ( + "testing" + + "github.com/cretz/esgopeta/gun" +) + +func TestGunGo(t *testing.T) { + // Run the server, put in one call, get in another, then check + ctx, cancelFn := newContext(t) + defer cancelFn() + ctx.startGunServer(8080) + ctx.startGunWebSocketProxyLogger(8081, 8080) + randStr := randString(30) + ctx.runJS(` + var Gun = require('gun') + const gun = Gun({ + peers: ['http://127.0.0.1:8081/gun'], + radisk: false + }) + gun.get('esgopeta-test').get('TestGunJS').get('some-key').put('` + randStr + `', ack => { + if (ack.err) { + console.error(ack.err) + process.exit(1) + } + process.exit(0) + }) + `) + // out := ctx.runJS(` + // var Gun = require('gun') + // const gun = Gun({ + // peers: ['http://127.0.0.1:8081/gun'], + // radisk: false + // }) + // gun.get('esgopeta-test').get('TestGunJS').get('some-key').once(data => { + // console.log(data) + // process.exit(0) + // }) + // `) + // ctx.Require.Equal(randStr, strings.TrimSpace(string(out))) + + g, err := gun.NewFromPeerURLs(ctx, "http://127.0.0.1:8081/gun") + ctx.Require.NoError(err) + f := g.Scoped("esgopeta-test", "TestGunJS", "some-key").Val(ctx) + ctx.Require.NoError(f.Err) + +} diff --git a/gun/tests/ws_test.go b/gun/tests/ws_test.go index 42d9d71..67615df 100644 --- a/gun/tests/ws_test.go +++ b/gun/tests/ws_test.go @@ -1,9 +1,11 @@ package tests import ( + "encoding/json" "log" "net/http" "strconv" + "testing" "time" "github.com/gorilla/websocket" @@ -19,17 +21,49 @@ func (t *testContext) startGunWebSocketProxyLogger(listenPort, targetPort int) { if !ok { return } - t.debugf("From gun: %v", string(msg)) + if testing.Verbose() { + for _, s := range t.formattedGunJSONs(msg) { + t.debugf("From gun: %v", s) + } + } case msg, ok := <-toGun: if !ok { return } - t.debugf("To gun: %v", string(msg)) + if testing.Verbose() { + for _, s := range t.formattedGunJSONs(msg) { + t.debugf("To gun: %v", s) + } + } } } }() } +func (t *testContext) formattedGunJSONs(msg []byte) []string { + var objs []interface{} + if msg[0] == '[' { + arr := []string{} + t.Require.NoError(json.Unmarshal(msg, &arr)) + for _, v := range arr { + var obj interface{} + t.Require.NoError(json.Unmarshal([]byte(v), &obj)) + objs = append(objs, obj) + } + } else { + var obj interface{} + t.Require.NoError(json.Unmarshal(msg, &obj)) + objs = append(objs, obj) + } + ret := make([]string, len(objs)) + for i, obj := range objs { + b, err := json.MarshalIndent(obj, "", " ") + t.Require.NoError(err) + ret[i] = string(b) + } + return ret +} + func (t *testContext) startGunWebSocketProxy(listenPort, targetPort int) (fromTarget <-chan []byte, toTarget <-chan []byte) { fromTargetCh := make(chan []byte) toTargetCh := make(chan []byte)