diff --git a/gun/LICENSE b/gun/LICENSE new file mode 100644 index 0000000..78f8f2e --- /dev/null +++ b/gun/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2019 Chad Retz + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/gun/gun.go b/gun/gun.go index 01d0c72..a368179 100644 --- a/gun/gun.go +++ b/gun/gun.go @@ -13,6 +13,7 @@ type Gun struct { soulGen func() string peerErrorHandler func(*ErrPeer) peerSleepOnError time.Duration + myPeerID string messageIDPutListeners map[string]chan<- *MessageReceived messageIDPutListenersLock sync.RWMutex @@ -24,6 +25,7 @@ type Config struct { SoulGen func() string PeerErrorHandler func(*ErrPeer) PeerSleepOnError time.Duration + MyPeerID string } const DefaultPeerSleepOnError = 30 * time.Second @@ -35,6 +37,7 @@ func New(ctx context.Context, config Config) (*Gun, error) { soulGen: config.SoulGen, peerErrorHandler: config.PeerErrorHandler, peerSleepOnError: config.PeerSleepOnError, + myPeerID: config.MyPeerID, messageIDPutListeners: map[string]chan<- *MessageReceived{}, } // Create all the peers @@ -46,7 +49,7 @@ func New(ctx context.Context, config Config) (*Gun, error) { for i := 0; i < len(config.PeerURLs) && err == nil; i++ { peerURL := config.PeerURLs[i] connPeer := func() (Peer, error) { return NewPeer(ctx, peerURL) } - if g.peers[i], err = newGunPeer(connPeer, sleepOnError); err != nil { + if g.peers[i], err = newGunPeer(peerURL, connPeer, sleepOnError); err != nil { err = fmt.Errorf("Failed connecting to peer %v: %v", peerURL, err) } } @@ -66,6 +69,9 @@ func New(ctx context.Context, config Config) (*Gun, error) { if g.soulGen == nil { g.soulGen = SoulGenDefault } + if g.myPeerID == "" { + g.myPeerID = randString(9) + } // Start receiving g.startReceiving() return g, nil @@ -123,7 +129,7 @@ func (g *Gun) startReceiving() { // TDO: some kind of overall context is probably needed ctx, cancelFn := context.WithCancel(context.TODO()) defer cancelFn() - for { + for !peer.closed() { // We might not be able receive because peer is sleeping from // an error happened within or a just-before send error. if ok, msgs, err := peer.receive(ctx); !ok { @@ -135,17 +141,7 @@ func (g *Gun) startReceiving() { } else { // Go over each message and see if it needs delivering or rebroadcasting for _, msg := range msgs { - rMsg := &MessageReceived{Message: msg, peer: peer} - if msg.Ack != "" && len(msg.Put) > 0 { - g.messageIDPutListenersLock.RLock() - l := g.messageIDPutListeners[msg.Ack] - g.messageIDPutListenersLock.RUnlock() - if l != nil { - go safeReceivedMessageSend(l, rMsg) - continue - } - } - go g.onUnhandledMessage(rMsg) + g.onPeerMessage(ctx, &MessageReceived{Message: msg, peer: peer}) } } } @@ -153,10 +149,33 @@ func (g *Gun) startReceiving() { } } -func (g *Gun) onUnhandledMessage(msg *MessageReceived) { +func (g *Gun) onPeerMessage(ctx context.Context, msg *MessageReceived) { + // If there is a listener for this message, use it + if msg.Ack != "" && len(msg.Put) > 0 { + g.messageIDPutListenersLock.RLock() + l := g.messageIDPutListeners[msg.Ack] + g.messageIDPutListenersLock.RUnlock() + if l != nil { + go safeReceivedMessageSend(l, msg) + return + } + } + // DAM messages are either requests for our ID or setting of theirs + if msg.DAM != "" { + if msg.PID == "" { + // This is a request, set the PID and send it back + msg.PID = g.myPeerID + if _, err := msg.peer.send(ctx, msg.Message); err != nil { + go g.onPeerError(&ErrPeer{err, msg.peer}) + } + } else { + // This is them telling us theirs + msg.peer.id = msg.PID + } + return + } // Unhandled message means rebroadcast - // TODO: we need a timeout or global context here... - g.send(context.TODO(), msg.Message, msg.peer) + g.send(ctx, msg.Message, msg.peer) } func (g *Gun) onPeerError(err *ErrPeer) { diff --git a/gun/gun_peer.go b/gun/gun_peer.go index ec0920d..0dfca1b 100644 --- a/gun/gun_peer.go +++ b/gun/gun_peer.go @@ -7,16 +7,18 @@ import ( ) type gunPeer struct { + url string connPeer func() (Peer, error) sleepOnErr time.Duration // TODO: would be better as backoff + id string peer Peer peerBad bool // If true, don't try anything peerLock sync.Mutex } -func newGunPeer(connPeer func() (Peer, error), sleepOnErr time.Duration) (*gunPeer, error) { - p := &gunPeer{connPeer: connPeer, sleepOnErr: sleepOnErr} +func newGunPeer(url string, connPeer func() (Peer, error), sleepOnErr time.Duration) (*gunPeer, error) { + p := &gunPeer{url: url, connPeer: connPeer, sleepOnErr: sleepOnErr} var err error if p.peer, err = connPeer(); err != nil { return nil, err @@ -24,9 +26,7 @@ func newGunPeer(connPeer func() (Peer, error), sleepOnErr time.Duration) (*gunPe return p, nil } -func (g *gunPeer) ID() string { - panic("TODO") -} +func (g *gunPeer) ID() string { return g.id } func (g *gunPeer) reconnectPeer() (err error) { g.peerLock.Lock() @@ -60,9 +60,20 @@ func (g *gunPeer) markPeerErrored(p Peer) { } func (g *gunPeer) send(ctx context.Context, msg *Message, moreMsgs ...*Message) (ok bool, err error) { - if p := g.connectedPeer(); p == nil { + p := g.connectedPeer() + if p == nil { return false, nil - } else if err = p.Send(ctx, msg, moreMsgs...); err != nil { + } + // Clone them with peer "to" + updatedMsg := msg.Clone() + updatedMsg.To = g.url + updatedMoreMsgs := make([]*Message, len(moreMsgs)) + for i, moreMsg := range moreMsgs { + moreMsg := moreMsg.Clone() + moreMsg.To = g.url + updatedMoreMsgs[i] = moreMsg + } + if err = p.Send(ctx, updatedMsg, updatedMoreMsgs...); err != nil { g.markPeerErrored(p) return false, err } else { @@ -89,3 +100,9 @@ func (g *gunPeer) Close() error { g.peerBad = false return err } + +func (g *gunPeer) closed() bool { + g.peerLock.Lock() + defer g.peerLock.Unlock() + return g.peer == nil && !g.peerBad +} diff --git a/gun/message.go b/gun/message.go index 3dfbb3b..83cf56c 100644 --- a/gun/message.go +++ b/gun/message.go @@ -1,10 +1,12 @@ package gun +import "encoding/json" + type Message struct { - Ack string `json:"@,omitEmpty"` - ID string `json:"#,omitEmpty"` - To string `json:"><,omitEmpty"` - Hash string `json:"##,omitempty"` + Ack string `json:"@,omitempty"` + ID string `json:"#,omitempty"` + To string `json:"><,omitempty"` + Hash json.Number `json:"##,omitempty"` How string `json:"how,omitempty"` Get *MessageGetRequest `json:"get,omitempty"` Put map[string]*Node `json:"put,omitempty"` @@ -12,6 +14,12 @@ type Message struct { PID string `json:"pid,omitempty"` } +func (m *Message) Clone() *Message { + msg := &Message{} + *msg = *m + return msg +} + type MessageGetRequest struct { Soul string `json:"#,omitempty"` Field string `json:".,omitempty"` diff --git a/gun/node.go b/gun/node.go index 9e275e1..19b1e17 100644 --- a/gun/node.go +++ b/gun/node.go @@ -61,8 +61,8 @@ func (n *Node) UnmarshalJSON(b []byte) error { } type Metadata struct { - Soul string `json:"#"` - State map[string]int64 `json:">"` + Soul string `json:"#,omitempty"` + State map[string]int64 `json:">,omitempty"` } type Value interface { diff --git a/gun/peer.go b/gun/peer.go index 68a5f76..fb11b23 100644 --- a/gun/peer.go +++ b/gun/peer.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "net/url" + "sync" "github.com/gorilla/websocket" ) @@ -24,7 +25,15 @@ type Peer interface { } var PeerURLSchemes = map[string]func(context.Context, *url.URL) (Peer, error){ - "ws": func(ctx context.Context, peerUrl *url.URL) (Peer, error) { return NewPeerWebSocket(ctx, peerUrl) }, + "http": func(ctx context.Context, peerURL *url.URL) (Peer, error) { + schemeChangedURL := &url.URL{} + *schemeChangedURL = *peerURL + schemeChangedURL.Scheme = "ws" + return NewPeerWebSocket(ctx, schemeChangedURL) + }, + "ws": func(ctx context.Context, peerURL *url.URL) (Peer, error) { + return NewPeerWebSocket(ctx, peerURL) + }, } func NewPeer(ctx context.Context, peerURL string) (Peer, error) { @@ -39,6 +48,7 @@ func NewPeer(ctx context.Context, peerURL string) (Peer, error) { type PeerWebSocket struct { Underlying *websocket.Conn + WriteLock sync.Mutex } func NewPeerWebSocket(ctx context.Context, peerUrl *url.URL) (*PeerWebSocket, error) { @@ -46,7 +56,7 @@ func NewPeerWebSocket(ctx context.Context, peerUrl *url.URL) (*PeerWebSocket, er if err != nil { return nil, err } - return &PeerWebSocket{conn}, nil + return &PeerWebSocket{Underlying: conn}, nil } func (p *PeerWebSocket) Send(ctx context.Context, msg *Message, moreMsgs ...*Message) error { @@ -70,7 +80,11 @@ func (p *PeerWebSocket) Send(ctx context.Context, msg *Message, moreMsgs ...*Mes } // Send async so we can wait on context errCh := make(chan error, 1) - go func() { errCh <- p.Underlying.WriteJSON(toWrite) }() + go func() { + p.WriteLock.Lock() + defer p.WriteLock.Unlock() + errCh <- p.Underlying.WriteJSON(toWrite) + }() select { case err := <-errCh: return err diff --git a/gun/scoped.go b/gun/scoped.go index f58a66a..0479a3f 100644 --- a/gun/scoped.go +++ b/gun/scoped.go @@ -26,9 +26,10 @@ type messageIDListener struct { func newScoped(gun *Gun, parent *Scoped, field string) *Scoped { return &Scoped{ - gun: gun, - parent: parent, - field: field, + gun: gun, + parent: parent, + field: field, + valueChansToListeners: map[<-chan *ValueFetch]*messageIDListener{}, } } diff --git a/gun/storage.go b/gun/storage.go index d730def..c47375c 100644 --- a/gun/storage.go +++ b/gun/storage.go @@ -18,10 +18,18 @@ type StorageInMem struct { values sync.Map } +type parentSoulAndField struct{ parentSoul, field string } + func (s *StorageInMem) Get(ctx context.Context, parentSoul, field string) (*ValueWithState, error) { - panic("TODO") + v, ok := s.values.Load(parentSoulAndField{parentSoul, field}) + if !ok { + return nil, ErrStorageNotFound + } + return v.(*ValueWithState), nil } func (s *StorageInMem) Put(ctx context.Context, parentSoul, field string, val *ValueWithState) (bool, error) { - panic("TODO") + s.values.Store(parentSoulAndField{parentSoul, field}, val) + // TODO: conflict resolution state check? + return true, nil } diff --git a/gun/tests/context_test.go b/gun/tests/context_test.go index b51e196..3aa6f61 100644 --- a/gun/tests/context_test.go +++ b/gun/tests/context_test.go @@ -94,7 +94,13 @@ func (t *testContext) startGunJSServer() { } func (t *testContext) newGunConnectedToGunJS() *gun.Gun { - g, err := gun.NewFromPeerURLs(t, "http://127.0.0.1:"+strconv.Itoa(t.GunJSPort)+"/gun") + config := gun.Config{ + PeerURLs: []string{"http://127.0.0.1:" + strconv.Itoa(t.GunJSPort) + "/gun"}, + PeerErrorHandler: func(errPeer *gun.ErrPeer) { + t.debugf("Got peer error: %v", errPeer) + }, + } + g, err := gun.New(t, config) t.Require.NoError(err) return g } diff --git a/gun/tests/gun_test.go b/gun/tests/gun_test.go index 5672714..0d2d897 100644 --- a/gun/tests/gun_test.go +++ b/gun/tests/gun_test.go @@ -2,6 +2,8 @@ package tests import ( "testing" + + "github.com/cretz/esgopeta/gun" ) func TestGunGetSimple(t *testing.T) { @@ -22,7 +24,9 @@ func TestGunGetSimple(t *testing.T) { `) // Get g := ctx.newGunConnectedToGunJS() - f := g.Scoped(ctx, "esgopeta-test", "TestGunGet", "some-key").Val(ctx) + f := g.Scoped(ctx, "esgopeta-test", "TestGunGetSimple", "some-key").Val(ctx) ctx.Require.NoError(f.Err) + // Make sure we got back the same value + ctx.Require.Equal(gun.ValueString(randStr), f.Value.Value.(gun.ValueString)) } diff --git a/gun/tests/ws_test.go b/gun/tests/ws_test.go index 67615df..7a113be 100644 --- a/gun/tests/ws_test.go +++ b/gun/tests/ws_test.go @@ -22,6 +22,7 @@ func (t *testContext) startGunWebSocketProxyLogger(listenPort, targetPort int) { return } if testing.Verbose() { + t.debugf("From gun raw: %v", string(msg)) for _, s := range t.formattedGunJSONs(msg) { t.debugf("From gun: %v", s) } @@ -31,8 +32,13 @@ func (t *testContext) startGunWebSocketProxyLogger(listenPort, targetPort int) { return } if testing.Verbose() { - for _, s := range t.formattedGunJSONs(msg) { - t.debugf("To gun: %v", s) + t.debugf("To gun raw: %v", string(msg)) + if len(msg) == 0 { + t.debugf("To gun: empty message") + } else { + for _, s := range t.formattedGunJSONs(msg) { + t.debugf("To gun: %v", s) + } } } }