diff --git a/gun/gun.go b/gun/gun.go index f2cad9e..01d0c72 100644 --- a/gun/gun.go +++ b/gun/gun.go @@ -4,35 +4,61 @@ import ( "context" "fmt" "sync" + "time" ) type Gun struct { - peers []Peer + peers []*gunPeer storage Storage soulGen func() string peerErrorHandler func(*ErrPeer) + peerSleepOnError time.Duration messageIDPutListeners map[string]chan<- *MessageReceived messageIDPutListenersLock sync.RWMutex } type Config struct { - Peers []Peer + PeerURLs []string Storage Storage SoulGen func() string PeerErrorHandler func(*ErrPeer) + PeerSleepOnError time.Duration } -func New(config Config) *Gun { +const DefaultPeerSleepOnError = 30 * time.Second + +func New(ctx context.Context, config Config) (*Gun, error) { g := &Gun{ - peers: make([]Peer, len(config.Peers)), + peers: make([]*gunPeer, len(config.PeerURLs)), storage: config.Storage, soulGen: config.SoulGen, peerErrorHandler: config.PeerErrorHandler, + peerSleepOnError: config.PeerSleepOnError, messageIDPutListeners: map[string]chan<- *MessageReceived{}, } - // Copy over peers - copy(g.peers, config.Peers) + // Create all the peers + sleepOnError := config.PeerSleepOnError + if sleepOnError == 0 { + sleepOnError = DefaultPeerSleepOnError + } + var err error + for i := 0; i < len(config.PeerURLs) && err == nil; i++ { + peerURL := config.PeerURLs[i] + connPeer := func() (Peer, error) { return NewPeer(ctx, peerURL) } + if g.peers[i], err = newGunPeer(connPeer, sleepOnError); err != nil { + err = fmt.Errorf("Failed connecting to peer %v: %v", peerURL, err) + } + } + // If there was an error, we need to close what we did create + if err != nil { + for _, peer := range g.peers { + if peer != nil { + peer.Close() + } + } + return nil, err + } // Set defaults if g.storage == nil { g.storage = &StorageInMem{} @@ -42,27 +68,7 @@ func New(config Config) *Gun { } // Start receiving g.startReceiving() - return g -} - -// To note: Fails on even one peer failure (otherwise, do this yourself). May connect to -// some peers temporarily until first failure, but closes them all on failure -func NewFromPeerURLs(ctx context.Context, peerURLs ...string) (g *Gun, err error) { - c := Config{Peers: make([]Peer, len(peerURLs))} - for i := 0; i < len(peerURLs) && err == nil; i++ { - if c.Peers[i], err = NewPeer(ctx, peerURLs[i]); err != nil { - err = fmt.Errorf("Failed connecting to peer %v: %v", peerURLs[i], err) - } - } - if err != nil { - for _, peer := range c.Peers { - if peer != nil { - peer.Close() - } - } - return - } - return New(c), nil + return g, nil } func (g *Gun) Close() error { @@ -85,7 +91,7 @@ func (g *Gun) Send(ctx context.Context, msg *Message) <-chan *ErrPeer { return g.send(ctx, msg, nil) } -func (g *Gun) send(ctx context.Context, msg *Message, ignorePeer Peer) <-chan *ErrPeer { +func (g *Gun) send(ctx context.Context, msg *Message, ignorePeer *gunPeer) <-chan *ErrPeer { ch := make(chan *ErrPeer, len(g.peers)) // Everything async go func() { @@ -96,9 +102,10 @@ func (g *Gun) send(ctx context.Context, msg *Message, ignorePeer Peer) <-chan *E continue } wg.Add(1) - go func(peer Peer) { + go func(peer *gunPeer) { defer wg.Done() - if err := peer.Send(ctx, msg); err != nil { + // Just do nothing if the peer is bad and we couldn't send + if _, err := peer.send(ctx, msg); err != nil { peerErr := &ErrPeer{err, peer} go g.onPeerError(peerErr) ch <- peerErr @@ -112,24 +119,35 @@ func (g *Gun) send(ctx context.Context, msg *Message, ignorePeer Peer) <-chan *E func (g *Gun) startReceiving() { for _, peer := range g.peers { - go func(peer Peer) { - for msgOrErr := range peer.Receive() { - if msgOrErr.Err != nil { - go g.onPeerError(&ErrPeer{msgOrErr.Err, peer}) - continue - } - // See if a listener is around to handle it instead of rebroadcasting - msg := &MessageReceived{Message: msgOrErr.Message, Peer: peer} - if msg.Ack != "" && len(msg.Put) > 0 { - g.messageIDPutListenersLock.RLock() - l := g.messageIDPutListeners[msg.Ack] - g.messageIDPutListenersLock.RUnlock() - if l != nil { - go safeReceivedMessageSend(l, msg) - continue + go func(peer *gunPeer) { + // TDO: some kind of overall context is probably needed + ctx, cancelFn := context.WithCancel(context.TODO()) + defer cancelFn() + for { + // We might not be able receive because peer is sleeping from + // an error happened within or a just-before send error. + if ok, msgs, err := peer.receive(ctx); !ok { + if err != nil { + go g.onPeerError(&ErrPeer{err, peer}) + } + // Always sleep at least the err duration + time.Sleep(g.peerSleepOnError) + } else { + // Go over each message and see if it needs delivering or rebroadcasting + for _, msg := range msgs { + rMsg := &MessageReceived{Message: msg, peer: peer} + if msg.Ack != "" && len(msg.Put) > 0 { + g.messageIDPutListenersLock.RLock() + l := g.messageIDPutListeners[msg.Ack] + g.messageIDPutListenersLock.RUnlock() + if l != nil { + go safeReceivedMessageSend(l, rMsg) + continue + } + } + go g.onUnhandledMessage(rMsg) } } - go g.onUnhandledMessage(msg) } }(peer) } @@ -138,7 +156,7 @@ func (g *Gun) startReceiving() { func (g *Gun) onUnhandledMessage(msg *MessageReceived) { // Unhandled message means rebroadcast // TODO: we need a timeout or global context here... - g.send(context.TODO(), msg.Message, msg.Peer) + g.send(context.TODO(), msg.Message, msg.peer) } func (g *Gun) onPeerError(err *ErrPeer) { diff --git a/gun/gun_peer.go b/gun/gun_peer.go new file mode 100644 index 0000000..ec0920d --- /dev/null +++ b/gun/gun_peer.go @@ -0,0 +1,91 @@ +package gun + +import ( + "context" + "sync" + "time" +) + +type gunPeer struct { + connPeer func() (Peer, error) + sleepOnErr time.Duration // TODO: would be better as backoff + + peer Peer + peerBad bool // If true, don't try anything + peerLock sync.Mutex +} + +func newGunPeer(connPeer func() (Peer, error), sleepOnErr time.Duration) (*gunPeer, error) { + p := &gunPeer{connPeer: connPeer, sleepOnErr: sleepOnErr} + var err error + if p.peer, err = connPeer(); err != nil { + return nil, err + } + return p, nil +} + +func (g *gunPeer) ID() string { + panic("TODO") +} + +func (g *gunPeer) reconnectPeer() (err error) { + g.peerLock.Lock() + defer g.peerLock.Unlock() + if g.peer == nil && g.peerBad { + g.peerBad = false + if g.peer, err = g.connPeer(); err != nil { + g.peerBad = true + time.AfterFunc(g.sleepOnErr, func() { g.reconnectPeer() }) + } + } + return +} + +// Can be nil peer if currently bad +func (g *gunPeer) connectedPeer() Peer { + g.peerLock.Lock() + defer g.peerLock.Unlock() + return g.peer +} + +func (g *gunPeer) markPeerErrored(p Peer) { + g.peerLock.Lock() + defer g.peerLock.Unlock() + if p == g.peer { + g.peer = nil + g.peerBad = true + p.Close() + time.AfterFunc(g.sleepOnErr, func() { g.reconnectPeer() }) + } +} + +func (g *gunPeer) send(ctx context.Context, msg *Message, moreMsgs ...*Message) (ok bool, err error) { + if p := g.connectedPeer(); p == nil { + return false, nil + } else if err = p.Send(ctx, msg, moreMsgs...); err != nil { + g.markPeerErrored(p) + return false, err + } else { + return true, nil + } +} + +func (g *gunPeer) receive(ctx context.Context) (ok bool, msgs []*Message, err error) { + if p := g.connectedPeer(); p == nil { + return false, nil, nil + } else if msgs, err = p.Receive(ctx); err != nil { + g.markPeerErrored(p) + return false, nil, err + } else { + return true, msgs, nil + } +} + +func (g *gunPeer) Close() error { + g.peerLock.Lock() + defer g.peerLock.Unlock() + err := g.peer.Close() + g.peer = nil + g.peerBad = false + return err +} diff --git a/gun/message.go b/gun/message.go index f9624f7..3dfbb3b 100644 --- a/gun/message.go +++ b/gun/message.go @@ -19,5 +19,5 @@ type MessageGetRequest struct { type MessageReceived struct { *Message - Peer Peer + peer *gunPeer } diff --git a/gun/peer.go b/gun/peer.go index 5704b56..68a5f76 100644 --- a/gun/peer.go +++ b/gun/peer.go @@ -2,6 +2,7 @@ package gun import ( "context" + "encoding/json" "fmt" "net/url" @@ -10,22 +11,18 @@ import ( type ErrPeer struct { Err error - Peer Peer + peer *gunPeer } -func (e *ErrPeer) Error() string { return fmt.Sprintf("Error on peer %v: %v", e.Peer, e.Err) } +func (e *ErrPeer) Error() string { return fmt.Sprintf("Error on peer %v: %v", e.peer, e.Err) } type Peer interface { - Send(ctx context.Context, msg *Message) error - Receive() <-chan *MessageOrError + Send(ctx context.Context, msg *Message, moreMsgs ...*Message) error + // Chan is closed on first err, when context is closed, or when peer is closed + Receive(ctx context.Context) ([]*Message, error) Close() error } -type MessageOrError struct { - Message *Message - Err error -} - var PeerURLSchemes = map[string]func(context.Context, *url.URL) (Peer, error){ "ws": func(ctx context.Context, peerUrl *url.URL) (Peer, error) { return NewPeerWebSocket(ctx, peerUrl) }, } @@ -41,7 +38,7 @@ func NewPeer(ctx context.Context, peerURL string) (Peer, error) { } type PeerWebSocket struct { - *websocket.Conn + Underlying *websocket.Conn } func NewPeerWebSocket(ctx context.Context, peerUrl *url.URL) (*PeerWebSocket, error) { @@ -52,10 +49,74 @@ func NewPeerWebSocket(ctx context.Context, peerUrl *url.URL) (*PeerWebSocket, er return &PeerWebSocket{conn}, nil } -func (p *PeerWebSocket) Send(ctx context.Context, msg *Message) error { - panic("TODO") +func (p *PeerWebSocket) Send(ctx context.Context, msg *Message, moreMsgs ...*Message) error { + // If there are more, send all as an array of JSON strings, otherwise just the msg + var toWrite interface{} + if len(moreMsgs) == 0 { + toWrite = msg + } else { + b, err := json.Marshal(msg) + if err != nil { + return err + } + msgs := []string{string(b)} + for _, nextMsg := range moreMsgs { + if b, err = json.Marshal(nextMsg); err != nil { + return err + } + msgs = append(msgs, string(b)) + } + toWrite = msgs + } + // Send async so we can wait on context + errCh := make(chan error, 1) + go func() { errCh <- p.Underlying.WriteJSON(toWrite) }() + select { + case err := <-errCh: + return err + case <-ctx.Done(): + return ctx.Err() + } } -func (p *PeerWebSocket) Receive() <-chan *MessageOrError { - panic("TODO") +func (p *PeerWebSocket) Receive(ctx context.Context) ([]*Message, error) { + bytsCh := make(chan []byte, 1) + errCh := make(chan error, 1) + go func() { + if _, b, err := p.Underlying.ReadMessage(); err != nil { + errCh <- err + } else { + bytsCh <- b + } + }() + select { + case err := <-errCh: + return nil, err + case <-ctx.Done(): + return nil, ctx.Err() + case byts := <-bytsCh: + // If it's a JSON array, it means it's an array of JSON strings, otherwise it's one message + if byts[0] != '[' { + var msg Message + if err := json.Unmarshal(byts, &msg); err != nil { + return nil, err + } + return []*Message{&msg}, nil + } + var jsonStrs []string + if err := json.Unmarshal(byts, &jsonStrs); err != nil { + return nil, err + } + msgs := make([]*Message, len(jsonStrs)) + for i, jsonStr := range jsonStrs { + if err := json.Unmarshal([]byte(jsonStr), &(msgs[i])); err != nil { + return nil, err + } + } + return msgs, nil + } +} + +func (p *PeerWebSocket) Close() error { + return p.Underlying.Close() } diff --git a/gun/scoped.go b/gun/scoped.go index 9d12814..f58a66a 100644 --- a/gun/scoped.go +++ b/gun/scoped.go @@ -39,7 +39,7 @@ type ValueFetch struct { // Nil if the value doesn't exist or there's an error Value *ValueWithState // Nil when local and sometimes on error - Peer Peer + peer *gunPeer } var ErrNotObject = errors.New("Scoped value not an object") @@ -159,7 +159,7 @@ func (s *Scoped) onRemote(ctx context.Context, ch chan *ValueFetch) { if !ok { return } - f := &ValueFetch{Field: s.field, Peer: msg.Peer} + f := &ValueFetch{Field: s.field, peer: msg.peer} // We asked for a single field, should only get that field or it doesn't exist if n := msg.Put[parentSoul]; n != nil && n.Values[s.field] != nil { f.Value = &ValueWithState{n.Values[s.field], n.State[s.field]} @@ -177,7 +177,7 @@ func (s *Scoped) onRemote(ctx context.Context, ch chan *ValueFetch) { safeValueFetchSend(ch, &ValueFetch{ Err: peerErr.Err, Field: s.field, - Peer: peerErr.Peer, + peer: peerErr.peer, }) } }() diff --git a/gun/tests/context_test.go b/gun/tests/context_test.go index 55f36c0..b51e196 100644 --- a/gun/tests/context_test.go +++ b/gun/tests/context_test.go @@ -11,25 +11,30 @@ import ( "strconv" "testing" + "github.com/cretz/esgopeta/gun" "github.com/stretchr/testify/require" ) type testContext struct { context.Context *testing.T - Require *require.Assertions + Require *require.Assertions + GunJSPort int } func newContext(t *testing.T) (*testContext, context.CancelFunc) { return withTestContext(context.Background(), t) } +const defaultGunJSPort = 8080 + func withTestContext(ctx context.Context, t *testing.T) (*testContext, context.CancelFunc) { ctx, cancelFn := context.WithCancel(ctx) return &testContext{ - Context: ctx, - T: t, - Require: require.New(t), + Context: ctx, + T: t, + Require: require.New(t), + GunJSPort: defaultGunJSPort, }, cancelFn } @@ -50,6 +55,16 @@ func (t *testContext) runJS(script string) []byte { return out } +func (t *testContext) runJSWithGun(script string) []byte { + return t.runJS(` + var Gun = require('gun') + const gun = Gun({ + peers: ['http://127.0.0.1:` + strconv.Itoa(t.GunJSPort) + `/gun'], + radisk: false + }) + ` + script) +} + func (t *testContext) startJS(script string) (*bytes.Buffer, *exec.Cmd, context.CancelFunc) { cmdCtx, cancelFn := context.WithCancel(t) cmd := exec.CommandContext(cmdCtx, "node") @@ -62,7 +77,13 @@ func (t *testContext) startJS(script string) (*bytes.Buffer, *exec.Cmd, context. return &buf, cmd, cancelFn } -func (t *testContext) startGunServer(port int) { +func (t *testContext) startGunJSServer() { + // If we're logging, use a proxy + port := t.GunJSPort + if testing.Verbose() { + t.startGunWebSocketProxyLogger(port, port+1) + port++ + } // Remove entire data folder first t.Require.NoError(os.RemoveAll("radata-server")) t.startJS(` @@ -71,3 +92,9 @@ func (t *testContext) startGunServer(port int) { const gun = Gun({web: server, file: 'radata-server'}) `) } + +func (t *testContext) newGunConnectedToGunJS() *gun.Gun { + g, err := gun.NewFromPeerURLs(t, "http://127.0.0.1:"+strconv.Itoa(t.GunJSPort)+"/gun") + t.Require.NoError(err) + return g +} diff --git a/gun/tests/gun_test.go b/gun/tests/gun_test.go index 09fe0dd..5672714 100644 --- a/gun/tests/gun_test.go +++ b/gun/tests/gun_test.go @@ -1,20 +1,18 @@ package tests -/* -func TestGunGo(t *testing.T) { +import ( + "testing" +) + +func TestGunGetSimple(t *testing.T) { // Run the server, put in one call, get in another, then check ctx, cancelFn := newContext(t) defer cancelFn() - ctx.startGunServer(8080) - ctx.startGunWebSocketProxyLogger(8081, 8080) + ctx.startGunJSServer() randStr := randString(30) - ctx.runJS(` - var Gun = require('gun') - const gun = Gun({ - peers: ['http://127.0.0.1:8081/gun'], - radisk: false - }) - gun.get('esgopeta-test').get('TestGunJS').get('some-key').put('` + randStr + `', ack => { + // Write w/ JS + ctx.runJSWithGun(` + gun.get('esgopeta-test').get('TestGunGetSimple').get('some-key').put('` + randStr + `', ack => { if (ack.err) { console.error(ack.err) process.exit(1) @@ -22,23 +20,9 @@ func TestGunGo(t *testing.T) { process.exit(0) }) `) - // out := ctx.runJS(` - // var Gun = require('gun') - // const gun = Gun({ - // peers: ['http://127.0.0.1:8081/gun'], - // radisk: false - // }) - // gun.get('esgopeta-test').get('TestGunJS').get('some-key').once(data => { - // console.log(data) - // process.exit(0) - // }) - // `) - // ctx.Require.Equal(randStr, strings.TrimSpace(string(out))) - - g, err := gun.NewFromPeerURLs(ctx, "http://127.0.0.1:8081/gun") - ctx.Require.NoError(err) - f := g.Scoped("esgopeta-test", "TestGunJS", "some-key").Val(ctx) + // Get + g := ctx.newGunConnectedToGunJS() + f := g.Scoped(ctx, "esgopeta-test", "TestGunGet", "some-key").Val(ctx) ctx.Require.NoError(f.Err) } -*/ diff --git a/gun/tests/js_test.go b/gun/tests/js_test.go index e993828..93789b2 100644 --- a/gun/tests/js_test.go +++ b/gun/tests/js_test.go @@ -15,15 +15,9 @@ func TestGunJS(t *testing.T) { // Run the server, put in one call, get in another, then check ctx, cancelFn := newContext(t) defer cancelFn() - ctx.startGunServer(8080) - ctx.startGunWebSocketProxyLogger(8081, 8080) + ctx.startGunJSServer() randStr := randString(30) - ctx.runJS(` - var Gun = require('gun') - const gun = Gun({ - peers: ['http://127.0.0.1:8081/gun'], - radisk: false - }) + ctx.runJSWithGun(` gun.get('esgopeta-test').get('TestGunJS').get('some-key').put('` + randStr + `', ack => { if (ack.err) { console.error(ack.err) @@ -32,12 +26,7 @@ func TestGunJS(t *testing.T) { process.exit(0) }) `) - out := ctx.runJS(` - var Gun = require('gun') - const gun = Gun({ - peers: ['http://127.0.0.1:8081/gun'], - radisk: false - }) + out := ctx.runJSWithGun(` gun.get('esgopeta-test').get('TestGunJS').get('some-key').once(data => { console.log(data) process.exit(0)