Listeners for souls, readme, more tests

This commit is contained in:
Chad Retz 2019-02-26 13:06:16 -06:00
parent 1d987a5e79
commit 6271b5627b
8 changed files with 264 additions and 25 deletions

119
README.md Normal file
View File

@ -0,0 +1,119 @@
# Esgopeta [![GoDoc](https://godoc.org/github.com/cretz/esgopeta/gun?status.svg)](https://godoc.org/github.com/cretz/esgopeta/gun)
Esgopeta is a Go implementation of the [Gun](https://github.com/amark/gun) distributed graph database. See the
[Godoc](https://godoc.org/github.com/cretz/esgopeta/gun) for API details.
**WARNING: This is an early proof-of-concept alpha version. Many pieces are not implemented.**
Features:
* Client for reading and writing w/ rudimentary conflict resolution
* In-memory storage
Not yet implemented:
* Server
* Alternative storage methods
* SEA (i.e. encryption/auth)
### Usage
The package is `github.com/cretz/esgopeta/gun` which can be fetched via `go get`. To listen to database changes for a
value, use `Fetch`. The example below listens for updates on a key for a minute:
```go
package main
import (
"context"
"log"
"time"
"github.com/cretz/esgopeta/gun"
)
func main() {
// Let's listen for a minute
ctx, cancelFn := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancelFn()
// Create the Gun client connecting to common Gun server
g, err := gun.New(ctx, gun.Config{
PeerURLs: []string{"https://gunjs.herokuapp.com/gun"},
PeerErrorHandler: func(err *gun.ErrPeer) { log.Print(err) },
})
if err != nil {
log.Panic(err)
}
// Issue a fetch and get a channel for updates
fetchCh := g.Scoped(ctx, "esgopeta-example", "sample-key").Fetch(ctx)
// Log all updates and exit when context times out
log.Print("Waiting for value")
for {
select {
case <-ctx.Done():
log.Print("Time's up")
return
case fetchResult := <-fetchCh:
if fetchResult.Err != nil {
log.Printf("Error fetching: %v", fetchResult.Err)
} else if fetchResult.ValueExists {
log.Printf("Got value: %v", fetchResult.Value)
}
}
}
}
```
When that's running, we can send values via a `Put`. The example below sends two updates for that key:
```go
package main
import (
"context"
"log"
"time"
"github.com/cretz/esgopeta/gun"
)
func main() {
// Give a 1 minute timeout, but shouldn't get hit
ctx, cancelFn := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancelFn()
// Create the Gun client connecting to common Gun server
g, err := gun.New(ctx, gun.Config{
PeerURLs: []string{"https://gunjs.herokuapp.com/gun"},
PeerErrorHandler: func(err *gun.ErrPeer) { log.Print(err) },
})
if err != nil {
log.Panic(err)
}
// Issue a simple put and wait for a single peer ack
putScope := g.Scoped(ctx, "esgopeta-example", "sample-key")
log.Print("Sending first value")
putCh := putScope.Put(ctx, gun.ValueString("first value"))
for {
if result := <-putCh; result.Err != nil {
log.Printf("Error putting: %v", result.Err)
} else if result.Peer != nil {
log.Printf("Got ack from %v", result.Peer)
break
}
}
// Let's send another value
log.Print("Sending second value")
putCh = putScope.Put(ctx, gun.ValueString("second value"))
for {
if result := <-putCh; result.Err != nil {
log.Printf("Error putting: %v", result.Err)
} else if result.Peer != nil {
log.Printf("Got ack from %v", result.Peer)
break
}
}
}
```
Note, these are just examples and you may want to control the lifetime of the channels better. See the
[Godoc](https://godoc.org/github.com/cretz/esgopeta/gun) for more information.

View File

@ -23,6 +23,9 @@ type Gun struct {
messageIDListeners map[string]chan<- *messageReceived
messageIDListenersLock sync.RWMutex
messageSoulListeners map[string]chan<- *messageReceived
messageSoulListenersLock sync.RWMutex
}
type Config struct {
@ -49,14 +52,15 @@ const DefaultOldestAllowedStorageValue = 7 * (60 * time.Minute)
func New(ctx context.Context, config Config) (*Gun, error) {
g := &Gun{
currentPeers: make([]*Peer, len(config.PeerURLs)),
storage: config.Storage,
soulGen: config.SoulGen,
peerErrorHandler: config.PeerErrorHandler,
peerSleepOnError: config.PeerSleepOnError,
myPeerID: config.MyPeerID,
tracking: config.Tracking,
messageIDListeners: map[string]chan<- *messageReceived{},
currentPeers: make([]*Peer, len(config.PeerURLs)),
storage: config.Storage,
soulGen: config.SoulGen,
peerErrorHandler: config.PeerErrorHandler,
peerSleepOnError: config.PeerSleepOnError,
myPeerID: config.MyPeerID,
tracking: config.Tracking,
messageIDListeners: map[string]chan<- *messageReceived{},
messageSoulListeners: map[string]chan<- *messageReceived{},
}
// Create all the peers
sleepOnError := config.PeerSleepOnError
@ -216,7 +220,7 @@ func (g *Gun) onPeerMessage(ctx context.Context, msg *messageReceived) {
// to determine whether we even put here instead of how we do it now.
// * handle gets
// If we're tracking anything, we try to put it (may only be if exists)
// If we're tracking anything, we try to put it (may only be if exists).
if g.tracking != TrackingNothing {
// If we're tracking everything, we persist everything. Otherwise if we're
// only tracking requested, we persist only if it already exists.
@ -237,7 +241,7 @@ func (g *Gun) onPeerMessage(ctx context.Context, msg *messageReceived) {
}
}
// If there is a listener for this message, use it
// If there is a listener for this message ID, use it and consider the message handled
if msg.Ack != "" {
g.messageIDListenersLock.RLock()
l := g.messageIDListeners[msg.Ack]
@ -248,6 +252,16 @@ func (g *Gun) onPeerMessage(ctx context.Context, msg *messageReceived) {
}
}
// If there are listeners for any of the souls, use them but don't consider the message handled
for parentSoul := range msg.Put {
g.messageSoulListenersLock.RLock()
l := g.messageSoulListeners[parentSoul]
g.messageSoulListenersLock.RUnlock()
if l != nil {
go safeReceivedMessageSend(l, msg)
}
}
// DAM messages are either requests for our ID or setting of theirs
if msg.DAM != "" {
if msg.PID == "" {
@ -288,6 +302,18 @@ func (g *Gun) unregisterMessageIDListener(id string) {
delete(g.messageIDListeners, id)
}
func (g *Gun) registerMessageSoulListener(soul string, ch chan<- *messageReceived) {
g.messageSoulListenersLock.Lock()
defer g.messageSoulListenersLock.Unlock()
g.messageSoulListeners[soul] = ch
}
func (g *Gun) unregisterMessageSoulListener(soul string) {
g.messageSoulListenersLock.Lock()
defer g.messageSoulListenersLock.Unlock()
delete(g.messageSoulListeners, soul)
}
func safeReceivedMessageSend(ch chan<- *messageReceived, msg *messageReceived) {
// Due to the fact that we may send on a closed channel here, we ignore the panic
defer func() { recover() }()

View File

@ -65,7 +65,6 @@ type Metadata struct {
State map[string]State `json:">,omitempty"`
}
// TODO: put private method to seal enum
type Value interface {
nodeValue()
}

View File

@ -161,9 +161,18 @@ func init() {
schemeChangedURL.Scheme = "ws"
return DialPeerConnWebSocket(ctx, schemeChangedURL)
},
"https": func(ctx context.Context, peerURL *url.URL) (PeerConn, error) {
schemeChangedURL := &url.URL{}
*schemeChangedURL = *peerURL
schemeChangedURL.Scheme = "wss"
return DialPeerConnWebSocket(ctx, schemeChangedURL)
},
"ws": func(ctx context.Context, peerURL *url.URL) (PeerConn, error) {
return DialPeerConnWebSocket(ctx, peerURL)
},
"wss": func(ctx context.Context, peerURL *url.URL) (PeerConn, error) {
return DialPeerConnWebSocket(ctx, peerURL)
},
}
}

View File

@ -7,6 +7,7 @@ import (
type fetchResultListener struct {
id string
parentSoul string
results chan *FetchResult
receivedMessages chan *messageReceived
}
@ -103,10 +104,11 @@ func (s *Scoped) fetchRemote(ctx context.Context, ch chan *FetchResult) {
// chan.
msgCh := make(chan *messageReceived)
s.fetchResultListenersLock.Lock()
s.fetchResultListeners[ch] = &fetchResultListener{req.ID, ch, msgCh}
s.fetchResultListeners[ch] = &fetchResultListener{req.ID, parentSoul, ch, msgCh}
s.fetchResultListenersLock.Unlock()
// Listen for responses to this get
s.gun.registerMessageIDListener(req.ID, msgCh)
s.gun.registerMessageSoulListener(parentSoul, msgCh)
// TODO: Also listen for any changes to the value or just for specific requests?
// Handle received messages turning them to value fetches
var lastSeenValue Value
@ -182,6 +184,7 @@ func (s *Scoped) FetchDone(ch <-chan *FetchResult) bool {
if l != nil {
// Unregister the chan
s.gun.unregisterMessageIDListener(l.id)
s.gun.unregisterMessageSoulListener(l.parentSoul)
// Close the message chan and the result chan
close(l.receivedMessages)
close(l.results)

View File

@ -9,7 +9,9 @@ import (
"path/filepath"
"runtime"
"strconv"
"strings"
"testing"
"time"
"github.com/cretz/esgopeta/gun"
"github.com/stretchr/testify/require"
@ -22,6 +24,8 @@ type testContext struct {
GunJSPort int
}
const defaultTestTimeout = 1 * time.Minute
func newContext(t *testing.T) (*testContext, context.CancelFunc) {
return withTestContext(context.Background(), t)
}
@ -36,9 +40,10 @@ func newContextWithGunJServer(t *testing.T) (*testContext, context.CancelFunc) {
}
const defaultGunJSPort = 8080
const defaultRemoteGunServerURL = "https://gunjs.herokuapp.com/gun"
func withTestContext(ctx context.Context, t *testing.T) (*testContext, context.CancelFunc) {
ctx, cancelFn := context.WithCancel(ctx)
ctx, cancelFn := context.WithTimeout(ctx, defaultTestTimeout)
return &testContext{
Context: ctx,
T: t,
@ -65,10 +70,14 @@ func (t *testContext) runJS(script string) []byte {
}
func (t *testContext) runJSWithGun(script string) []byte {
return t.runJSWithGunURL("http://127.0.0.1:"+strconv.Itoa(t.GunJSPort)+"/gun", script)
}
func (t *testContext) runJSWithGunURL(url string, script string) []byte {
return t.runJS(`
var Gun = require('gun')
const gun = Gun({
peers: ['http://127.0.0.1:` + strconv.Itoa(t.GunJSPort) + `/gun'],
peers: ['` + url + `'],
radisk: false
})
` + script)
@ -90,7 +99,7 @@ func (t *testContext) startGunJSServer() context.CancelFunc {
// If we're logging, use a proxy
port := t.GunJSPort
if testing.Verbose() {
t.startGunWebSocketProxyLogger(port, port+1)
t.startGunWebSocketProxyLogger(port, "ws://127.0.0.1:"+strconv.Itoa(port+1)+"/gun")
port++
}
// Remove entire data folder first just in case
@ -108,12 +117,25 @@ func (t *testContext) startGunJSServer() context.CancelFunc {
}
}
func (t *testContext) prepareRemoteGunServer(origURL string) (newURL string) {
// If we're verbose, use proxy, otherwise just use orig
if !testing.Verbose() {
return origURL
}
origURL = strings.Replace(origURL, "http://", "ws://", 1)
origURL = strings.Replace(origURL, "https://", "wss://", 1)
t.startGunWebSocketProxyLogger(t.GunJSPort, origURL)
return "http://127.0.0.1:" + strconv.Itoa(t.GunJSPort) + "/gun"
}
func (t *testContext) newGunConnectedToGunJS() *gun.Gun {
return t.newGunConnectedToGunServer("http://127.0.0.1:" + strconv.Itoa(t.GunJSPort) + "/gun")
}
func (t *testContext) newGunConnectedToGunServer(url string) *gun.Gun {
config := gun.Config{
PeerURLs: []string{"http://127.0.0.1:" + strconv.Itoa(t.GunJSPort) + "/gun"},
PeerErrorHandler: func(errPeer *gun.ErrPeer) {
t.debugf("Got peer error: %v", errPeer)
},
PeerURLs: []string{url},
PeerErrorHandler: func(errPeer *gun.ErrPeer) { t.debugf("Got peer error: %v", errPeer) },
}
g, err := gun.New(t, config)
t.Require.NoError(err)

View File

@ -39,6 +39,33 @@ func TestGunGetSimple(t *testing.T) {
ctx.Require.Equal(gun.ValueString(randStr), r.Value.(gun.ValueString))
}
func TestGunGetSimpleRemote(t *testing.T) {
// Do the above but w/ remote server
ctx, cancelFn := newContext(t)
defer cancelFn()
remoteURL := ctx.prepareRemoteGunServer(defaultRemoteGunServerURL)
randKey, randVal := "key-"+randString(30), gun.ValueString(randString(30))
// Write w/ JS
ctx.debugf("Writing value")
ctx.runJSWithGunURL(remoteURL, `
gun.get('esgopeta-test').get('TestGunGetSimpleRemote').get('`+randKey+`').put('`+string(randVal)+`', ack => {
if (ack.err) {
console.error(ack.err)
process.exit(1)
}
process.exit(0)
})
`)
// Get
ctx.debugf("Reading value")
g := ctx.newGunConnectedToGunServer(remoteURL)
defer g.Close()
// Make sure we got back the same value
r := g.Scoped(ctx, "esgopeta-test", "TestGunGetSimpleRemote", randKey).FetchOne(ctx)
ctx.Require.NoError(r.Err)
ctx.Require.Equal(randVal, r.Value)
}
func TestGunPutSimple(t *testing.T) {
ctx, cancelFn := newContextWithGunJServer(t)
defer cancelFn()
@ -63,6 +90,40 @@ func TestGunPutSimple(t *testing.T) {
ctx.Require.Equal(randStr, strings.TrimSpace(string(out)))
}
func TestGunPubSubSimpleRemote(t *testing.T) {
ctx, cancelFn := newContext(t)
defer cancelFn()
remoteURL := ctx.prepareRemoteGunServer(defaultRemoteGunServerURL)
randKey, randVal := "key-"+randString(30), gun.ValueString(randString(30))
// Start a fetcher
ctx.debugf("Starting fetcher")
fetchGun := ctx.newGunConnectedToGunServer(remoteURL)
defer fetchGun.Close()
fetchCh := fetchGun.Scoped(ctx, "esgopeta-test", "TestGunPubSubSimpleRemote", randKey).Fetch(ctx)
// Now put it from another instance
ctx.debugf("Putting data")
putGun := ctx.newGunConnectedToGunServer(remoteURL)
defer putGun.Close()
putScope := putGun.Scoped(ctx, "esgopeta-test", "TestGunPubSubSimpleRemote", randKey)
putScope.Put(ctx, randVal)
ctx.debugf("Checking fetcher")
// See that the fetch got the value
for {
select {
case <-ctx.Done():
ctx.Require.NoError(ctx.Err())
case result := <-fetchCh:
ctx.Require.NoError(result.Err)
if !result.ValueExists {
ctx.debugf("No value, trying again (got %v)", result)
continue
}
ctx.Require.Equal(randVal, result.Value)
return
}
}
}
/*
TODO Tests to write:
* test put w/ future state happens then

View File

@ -11,8 +11,8 @@ import (
"github.com/gorilla/websocket"
)
func (t *testContext) startGunWebSocketProxyLogger(listenPort, targetPort int) {
fromGun, toGun := t.startGunWebSocketProxy(listenPort, targetPort)
func (t *testContext) startGunWebSocketProxyLogger(listenPort int, targetURL string) {
fromGun, toGun := t.startGunWebSocketProxy(listenPort, targetURL)
time.Sleep(time.Second)
go func() {
for {
@ -70,14 +70,14 @@ func (t *testContext) formattedGunJSONs(msg []byte) []string {
return ret
}
func (t *testContext) startGunWebSocketProxy(listenPort, targetPort int) (fromTarget <-chan []byte, toTarget <-chan []byte) {
func (t *testContext) startGunWebSocketProxy(listenPort int, targetURL string) (fromTarget <-chan []byte, toTarget <-chan []byte) {
fromTargetCh := make(chan []byte)
toTargetCh := make(chan []byte)
server := &http.Server{
Addr: "127.0.0.1:" + strconv.Itoa(listenPort),
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
t.debugf("New ws proxy connection")
err := t.handleGunWebSocketProxy(targetPort, w, r, fromTargetCh, toTargetCh)
err := t.handleGunWebSocketProxy(targetURL, w, r, fromTargetCh, toTargetCh)
if _, ok := err.(*websocket.CloseError); !ok {
t.debugf("Unexpected web socket close error: %v", err)
}
@ -99,13 +99,13 @@ func (t *testContext) startGunWebSocketProxy(listenPort, targetPort int) (fromTa
var wsDefaultUpgrader = websocket.Upgrader{}
func (t *testContext) handleGunWebSocketProxy(
targetPort int,
targetURL string,
w http.ResponseWriter,
r *http.Request,
fromOther chan<- []byte,
toOther chan<- []byte,
) error {
otherConn, _, err := websocket.DefaultDialer.DialContext(t, "ws://127.0.0.1:"+strconv.Itoa(targetPort)+"/gun", nil)
otherConn, _, err := websocket.DefaultDialer.DialContext(t, targetURL, nil)
if err != nil {
return err
}