From 6271b5627b9e22c0953ea353d2dfe45d955aaed7 Mon Sep 17 00:00:00 2001 From: Chad Retz Date: Tue, 26 Feb 2019 13:06:16 -0600 Subject: [PATCH] Listeners for souls, readme, more tests --- README.md | 119 ++++++++++++++++++++++++++++++++++++++ gun/gun.go | 46 +++++++++++---- gun/node.go | 1 - gun/peer.go | 9 +++ gun/scoped_fetch.go | 5 +- gun/tests/context_test.go | 36 +++++++++--- gun/tests/gun_test.go | 61 +++++++++++++++++++ gun/tests/ws_test.go | 12 ++-- 8 files changed, 264 insertions(+), 25 deletions(-) create mode 100644 README.md diff --git a/README.md b/README.md new file mode 100644 index 0000000..7c0cb93 --- /dev/null +++ b/README.md @@ -0,0 +1,119 @@ +# Esgopeta [![GoDoc](https://godoc.org/github.com/cretz/esgopeta/gun?status.svg)](https://godoc.org/github.com/cretz/esgopeta/gun) + +Esgopeta is a Go implementation of the [Gun](https://github.com/amark/gun) distributed graph database. See the +[Godoc](https://godoc.org/github.com/cretz/esgopeta/gun) for API details. + +**WARNING: This is an early proof-of-concept alpha version. Many pieces are not implemented.** + +Features: + +* Client for reading and writing w/ rudimentary conflict resolution +* In-memory storage + +Not yet implemented: + +* Server +* Alternative storage methods +* SEA (i.e. encryption/auth) + +### Usage + +The package is `github.com/cretz/esgopeta/gun` which can be fetched via `go get`. To listen to database changes for a +value, use `Fetch`. The example below listens for updates on a key for a minute: + +```go +package main + +import ( + "context" + "log" + "time" + + "github.com/cretz/esgopeta/gun" +) + +func main() { + // Let's listen for a minute + ctx, cancelFn := context.WithTimeout(context.Background(), 1*time.Minute) + defer cancelFn() + // Create the Gun client connecting to common Gun server + g, err := gun.New(ctx, gun.Config{ + PeerURLs: []string{"https://gunjs.herokuapp.com/gun"}, + PeerErrorHandler: func(err *gun.ErrPeer) { log.Print(err) }, + }) + if err != nil { + log.Panic(err) + } + // Issue a fetch and get a channel for updates + fetchCh := g.Scoped(ctx, "esgopeta-example", "sample-key").Fetch(ctx) + // Log all updates and exit when context times out + log.Print("Waiting for value") + for { + select { + case <-ctx.Done(): + log.Print("Time's up") + return + case fetchResult := <-fetchCh: + if fetchResult.Err != nil { + log.Printf("Error fetching: %v", fetchResult.Err) + } else if fetchResult.ValueExists { + log.Printf("Got value: %v", fetchResult.Value) + } + } + } +} +``` + +When that's running, we can send values via a `Put`. The example below sends two updates for that key: + +```go +package main + +import ( + "context" + "log" + "time" + + "github.com/cretz/esgopeta/gun" +) + +func main() { + // Give a 1 minute timeout, but shouldn't get hit + ctx, cancelFn := context.WithTimeout(context.Background(), 1*time.Minute) + defer cancelFn() + // Create the Gun client connecting to common Gun server + g, err := gun.New(ctx, gun.Config{ + PeerURLs: []string{"https://gunjs.herokuapp.com/gun"}, + PeerErrorHandler: func(err *gun.ErrPeer) { log.Print(err) }, + }) + if err != nil { + log.Panic(err) + } + // Issue a simple put and wait for a single peer ack + putScope := g.Scoped(ctx, "esgopeta-example", "sample-key") + log.Print("Sending first value") + putCh := putScope.Put(ctx, gun.ValueString("first value")) + for { + if result := <-putCh; result.Err != nil { + log.Printf("Error putting: %v", result.Err) + } else if result.Peer != nil { + log.Printf("Got ack from %v", result.Peer) + break + } + } + // Let's send another value + log.Print("Sending second value") + putCh = putScope.Put(ctx, gun.ValueString("second value")) + for { + if result := <-putCh; result.Err != nil { + log.Printf("Error putting: %v", result.Err) + } else if result.Peer != nil { + log.Printf("Got ack from %v", result.Peer) + break + } + } +} +``` + +Note, these are just examples and you may want to control the lifetime of the channels better. See the +[Godoc](https://godoc.org/github.com/cretz/esgopeta/gun) for more information. \ No newline at end of file diff --git a/gun/gun.go b/gun/gun.go index 58d4c08..b3d06dc 100644 --- a/gun/gun.go +++ b/gun/gun.go @@ -23,6 +23,9 @@ type Gun struct { messageIDListeners map[string]chan<- *messageReceived messageIDListenersLock sync.RWMutex + + messageSoulListeners map[string]chan<- *messageReceived + messageSoulListenersLock sync.RWMutex } type Config struct { @@ -49,14 +52,15 @@ const DefaultOldestAllowedStorageValue = 7 * (60 * time.Minute) func New(ctx context.Context, config Config) (*Gun, error) { g := &Gun{ - currentPeers: make([]*Peer, len(config.PeerURLs)), - storage: config.Storage, - soulGen: config.SoulGen, - peerErrorHandler: config.PeerErrorHandler, - peerSleepOnError: config.PeerSleepOnError, - myPeerID: config.MyPeerID, - tracking: config.Tracking, - messageIDListeners: map[string]chan<- *messageReceived{}, + currentPeers: make([]*Peer, len(config.PeerURLs)), + storage: config.Storage, + soulGen: config.SoulGen, + peerErrorHandler: config.PeerErrorHandler, + peerSleepOnError: config.PeerSleepOnError, + myPeerID: config.MyPeerID, + tracking: config.Tracking, + messageIDListeners: map[string]chan<- *messageReceived{}, + messageSoulListeners: map[string]chan<- *messageReceived{}, } // Create all the peers sleepOnError := config.PeerSleepOnError @@ -216,7 +220,7 @@ func (g *Gun) onPeerMessage(ctx context.Context, msg *messageReceived) { // to determine whether we even put here instead of how we do it now. // * handle gets - // If we're tracking anything, we try to put it (may only be if exists) + // If we're tracking anything, we try to put it (may only be if exists). if g.tracking != TrackingNothing { // If we're tracking everything, we persist everything. Otherwise if we're // only tracking requested, we persist only if it already exists. @@ -237,7 +241,7 @@ func (g *Gun) onPeerMessage(ctx context.Context, msg *messageReceived) { } } - // If there is a listener for this message, use it + // If there is a listener for this message ID, use it and consider the message handled if msg.Ack != "" { g.messageIDListenersLock.RLock() l := g.messageIDListeners[msg.Ack] @@ -248,6 +252,16 @@ func (g *Gun) onPeerMessage(ctx context.Context, msg *messageReceived) { } } + // If there are listeners for any of the souls, use them but don't consider the message handled + for parentSoul := range msg.Put { + g.messageSoulListenersLock.RLock() + l := g.messageSoulListeners[parentSoul] + g.messageSoulListenersLock.RUnlock() + if l != nil { + go safeReceivedMessageSend(l, msg) + } + } + // DAM messages are either requests for our ID or setting of theirs if msg.DAM != "" { if msg.PID == "" { @@ -288,6 +302,18 @@ func (g *Gun) unregisterMessageIDListener(id string) { delete(g.messageIDListeners, id) } +func (g *Gun) registerMessageSoulListener(soul string, ch chan<- *messageReceived) { + g.messageSoulListenersLock.Lock() + defer g.messageSoulListenersLock.Unlock() + g.messageSoulListeners[soul] = ch +} + +func (g *Gun) unregisterMessageSoulListener(soul string) { + g.messageSoulListenersLock.Lock() + defer g.messageSoulListenersLock.Unlock() + delete(g.messageSoulListeners, soul) +} + func safeReceivedMessageSend(ch chan<- *messageReceived, msg *messageReceived) { // Due to the fact that we may send on a closed channel here, we ignore the panic defer func() { recover() }() diff --git a/gun/node.go b/gun/node.go index 60e675f..9d9babf 100644 --- a/gun/node.go +++ b/gun/node.go @@ -65,7 +65,6 @@ type Metadata struct { State map[string]State `json:">,omitempty"` } -// TODO: put private method to seal enum type Value interface { nodeValue() } diff --git a/gun/peer.go b/gun/peer.go index b66f095..32b53c7 100644 --- a/gun/peer.go +++ b/gun/peer.go @@ -161,9 +161,18 @@ func init() { schemeChangedURL.Scheme = "ws" return DialPeerConnWebSocket(ctx, schemeChangedURL) }, + "https": func(ctx context.Context, peerURL *url.URL) (PeerConn, error) { + schemeChangedURL := &url.URL{} + *schemeChangedURL = *peerURL + schemeChangedURL.Scheme = "wss" + return DialPeerConnWebSocket(ctx, schemeChangedURL) + }, "ws": func(ctx context.Context, peerURL *url.URL) (PeerConn, error) { return DialPeerConnWebSocket(ctx, peerURL) }, + "wss": func(ctx context.Context, peerURL *url.URL) (PeerConn, error) { + return DialPeerConnWebSocket(ctx, peerURL) + }, } } diff --git a/gun/scoped_fetch.go b/gun/scoped_fetch.go index d189175..51109d3 100644 --- a/gun/scoped_fetch.go +++ b/gun/scoped_fetch.go @@ -7,6 +7,7 @@ import ( type fetchResultListener struct { id string + parentSoul string results chan *FetchResult receivedMessages chan *messageReceived } @@ -103,10 +104,11 @@ func (s *Scoped) fetchRemote(ctx context.Context, ch chan *FetchResult) { // chan. msgCh := make(chan *messageReceived) s.fetchResultListenersLock.Lock() - s.fetchResultListeners[ch] = &fetchResultListener{req.ID, ch, msgCh} + s.fetchResultListeners[ch] = &fetchResultListener{req.ID, parentSoul, ch, msgCh} s.fetchResultListenersLock.Unlock() // Listen for responses to this get s.gun.registerMessageIDListener(req.ID, msgCh) + s.gun.registerMessageSoulListener(parentSoul, msgCh) // TODO: Also listen for any changes to the value or just for specific requests? // Handle received messages turning them to value fetches var lastSeenValue Value @@ -182,6 +184,7 @@ func (s *Scoped) FetchDone(ch <-chan *FetchResult) bool { if l != nil { // Unregister the chan s.gun.unregisterMessageIDListener(l.id) + s.gun.unregisterMessageSoulListener(l.parentSoul) // Close the message chan and the result chan close(l.receivedMessages) close(l.results) diff --git a/gun/tests/context_test.go b/gun/tests/context_test.go index a5172cc..bb02544 100644 --- a/gun/tests/context_test.go +++ b/gun/tests/context_test.go @@ -9,7 +9,9 @@ import ( "path/filepath" "runtime" "strconv" + "strings" "testing" + "time" "github.com/cretz/esgopeta/gun" "github.com/stretchr/testify/require" @@ -22,6 +24,8 @@ type testContext struct { GunJSPort int } +const defaultTestTimeout = 1 * time.Minute + func newContext(t *testing.T) (*testContext, context.CancelFunc) { return withTestContext(context.Background(), t) } @@ -36,9 +40,10 @@ func newContextWithGunJServer(t *testing.T) (*testContext, context.CancelFunc) { } const defaultGunJSPort = 8080 +const defaultRemoteGunServerURL = "https://gunjs.herokuapp.com/gun" func withTestContext(ctx context.Context, t *testing.T) (*testContext, context.CancelFunc) { - ctx, cancelFn := context.WithCancel(ctx) + ctx, cancelFn := context.WithTimeout(ctx, defaultTestTimeout) return &testContext{ Context: ctx, T: t, @@ -65,10 +70,14 @@ func (t *testContext) runJS(script string) []byte { } func (t *testContext) runJSWithGun(script string) []byte { + return t.runJSWithGunURL("http://127.0.0.1:"+strconv.Itoa(t.GunJSPort)+"/gun", script) +} + +func (t *testContext) runJSWithGunURL(url string, script string) []byte { return t.runJS(` var Gun = require('gun') const gun = Gun({ - peers: ['http://127.0.0.1:` + strconv.Itoa(t.GunJSPort) + `/gun'], + peers: ['` + url + `'], radisk: false }) ` + script) @@ -90,7 +99,7 @@ func (t *testContext) startGunJSServer() context.CancelFunc { // If we're logging, use a proxy port := t.GunJSPort if testing.Verbose() { - t.startGunWebSocketProxyLogger(port, port+1) + t.startGunWebSocketProxyLogger(port, "ws://127.0.0.1:"+strconv.Itoa(port+1)+"/gun") port++ } // Remove entire data folder first just in case @@ -108,12 +117,25 @@ func (t *testContext) startGunJSServer() context.CancelFunc { } } +func (t *testContext) prepareRemoteGunServer(origURL string) (newURL string) { + // If we're verbose, use proxy, otherwise just use orig + if !testing.Verbose() { + return origURL + } + origURL = strings.Replace(origURL, "http://", "ws://", 1) + origURL = strings.Replace(origURL, "https://", "wss://", 1) + t.startGunWebSocketProxyLogger(t.GunJSPort, origURL) + return "http://127.0.0.1:" + strconv.Itoa(t.GunJSPort) + "/gun" +} + func (t *testContext) newGunConnectedToGunJS() *gun.Gun { + return t.newGunConnectedToGunServer("http://127.0.0.1:" + strconv.Itoa(t.GunJSPort) + "/gun") +} + +func (t *testContext) newGunConnectedToGunServer(url string) *gun.Gun { config := gun.Config{ - PeerURLs: []string{"http://127.0.0.1:" + strconv.Itoa(t.GunJSPort) + "/gun"}, - PeerErrorHandler: func(errPeer *gun.ErrPeer) { - t.debugf("Got peer error: %v", errPeer) - }, + PeerURLs: []string{url}, + PeerErrorHandler: func(errPeer *gun.ErrPeer) { t.debugf("Got peer error: %v", errPeer) }, } g, err := gun.New(t, config) t.Require.NoError(err) diff --git a/gun/tests/gun_test.go b/gun/tests/gun_test.go index 6fd71d0..0b636f3 100644 --- a/gun/tests/gun_test.go +++ b/gun/tests/gun_test.go @@ -39,6 +39,33 @@ func TestGunGetSimple(t *testing.T) { ctx.Require.Equal(gun.ValueString(randStr), r.Value.(gun.ValueString)) } +func TestGunGetSimpleRemote(t *testing.T) { + // Do the above but w/ remote server + ctx, cancelFn := newContext(t) + defer cancelFn() + remoteURL := ctx.prepareRemoteGunServer(defaultRemoteGunServerURL) + randKey, randVal := "key-"+randString(30), gun.ValueString(randString(30)) + // Write w/ JS + ctx.debugf("Writing value") + ctx.runJSWithGunURL(remoteURL, ` + gun.get('esgopeta-test').get('TestGunGetSimpleRemote').get('`+randKey+`').put('`+string(randVal)+`', ack => { + if (ack.err) { + console.error(ack.err) + process.exit(1) + } + process.exit(0) + }) + `) + // Get + ctx.debugf("Reading value") + g := ctx.newGunConnectedToGunServer(remoteURL) + defer g.Close() + // Make sure we got back the same value + r := g.Scoped(ctx, "esgopeta-test", "TestGunGetSimpleRemote", randKey).FetchOne(ctx) + ctx.Require.NoError(r.Err) + ctx.Require.Equal(randVal, r.Value) +} + func TestGunPutSimple(t *testing.T) { ctx, cancelFn := newContextWithGunJServer(t) defer cancelFn() @@ -63,6 +90,40 @@ func TestGunPutSimple(t *testing.T) { ctx.Require.Equal(randStr, strings.TrimSpace(string(out))) } +func TestGunPubSubSimpleRemote(t *testing.T) { + ctx, cancelFn := newContext(t) + defer cancelFn() + remoteURL := ctx.prepareRemoteGunServer(defaultRemoteGunServerURL) + randKey, randVal := "key-"+randString(30), gun.ValueString(randString(30)) + // Start a fetcher + ctx.debugf("Starting fetcher") + fetchGun := ctx.newGunConnectedToGunServer(remoteURL) + defer fetchGun.Close() + fetchCh := fetchGun.Scoped(ctx, "esgopeta-test", "TestGunPubSubSimpleRemote", randKey).Fetch(ctx) + // Now put it from another instance + ctx.debugf("Putting data") + putGun := ctx.newGunConnectedToGunServer(remoteURL) + defer putGun.Close() + putScope := putGun.Scoped(ctx, "esgopeta-test", "TestGunPubSubSimpleRemote", randKey) + putScope.Put(ctx, randVal) + ctx.debugf("Checking fetcher") + // See that the fetch got the value + for { + select { + case <-ctx.Done(): + ctx.Require.NoError(ctx.Err()) + case result := <-fetchCh: + ctx.Require.NoError(result.Err) + if !result.ValueExists { + ctx.debugf("No value, trying again (got %v)", result) + continue + } + ctx.Require.Equal(randVal, result.Value) + return + } + } +} + /* TODO Tests to write: * test put w/ future state happens then diff --git a/gun/tests/ws_test.go b/gun/tests/ws_test.go index 7a113be..5de2e1b 100644 --- a/gun/tests/ws_test.go +++ b/gun/tests/ws_test.go @@ -11,8 +11,8 @@ import ( "github.com/gorilla/websocket" ) -func (t *testContext) startGunWebSocketProxyLogger(listenPort, targetPort int) { - fromGun, toGun := t.startGunWebSocketProxy(listenPort, targetPort) +func (t *testContext) startGunWebSocketProxyLogger(listenPort int, targetURL string) { + fromGun, toGun := t.startGunWebSocketProxy(listenPort, targetURL) time.Sleep(time.Second) go func() { for { @@ -70,14 +70,14 @@ func (t *testContext) formattedGunJSONs(msg []byte) []string { return ret } -func (t *testContext) startGunWebSocketProxy(listenPort, targetPort int) (fromTarget <-chan []byte, toTarget <-chan []byte) { +func (t *testContext) startGunWebSocketProxy(listenPort int, targetURL string) (fromTarget <-chan []byte, toTarget <-chan []byte) { fromTargetCh := make(chan []byte) toTargetCh := make(chan []byte) server := &http.Server{ Addr: "127.0.0.1:" + strconv.Itoa(listenPort), Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { t.debugf("New ws proxy connection") - err := t.handleGunWebSocketProxy(targetPort, w, r, fromTargetCh, toTargetCh) + err := t.handleGunWebSocketProxy(targetURL, w, r, fromTargetCh, toTargetCh) if _, ok := err.(*websocket.CloseError); !ok { t.debugf("Unexpected web socket close error: %v", err) } @@ -99,13 +99,13 @@ func (t *testContext) startGunWebSocketProxy(listenPort, targetPort int) (fromTa var wsDefaultUpgrader = websocket.Upgrader{} func (t *testContext) handleGunWebSocketProxy( - targetPort int, + targetURL string, w http.ResponseWriter, r *http.Request, fromOther chan<- []byte, toOther chan<- []byte, ) error { - otherConn, _, err := websocket.DefaultDialer.DialContext(t, "ws://127.0.0.1:"+strconv.Itoa(targetPort)+"/gun", nil) + otherConn, _, err := websocket.DefaultDialer.DialContext(t, targetURL, nil) if err != nil { return err }