From a0e3944d8c5afd679161eb6a09c1987e5ee95d9c Mon Sep 17 00:00:00 2001 From: Chad Retz Date: Wed, 20 Feb 2019 14:54:46 -0600 Subject: [PATCH] Initial commit --- .gitignore | 4 ++ gun/gun.go | 69 +++++++++++++++++++++ gun/node.go | 32 ++++++++++ gun/peer.go | 28 +++++++++ gun/scoped.go | 40 +++++++++++++ gun/storage.go | 7 +++ gun/tests/context_test.go | 70 ++++++++++++++++++++++ gun/tests/js_test.go | 47 +++++++++++++++ gun/tests/package.json | 8 +++ gun/tests/util_test.go | 35 +++++++++++ gun/tests/ws_test.go | 123 ++++++++++++++++++++++++++++++++++++++ gun/time.go | 46 ++++++++++++++ gun/util.go | 17 ++++++ 13 files changed, 526 insertions(+) create mode 100644 .gitignore create mode 100644 gun/gun.go create mode 100644 gun/node.go create mode 100644 gun/peer.go create mode 100644 gun/scoped.go create mode 100644 gun/storage.go create mode 100644 gun/tests/context_test.go create mode 100644 gun/tests/js_test.go create mode 100644 gun/tests/package.json create mode 100644 gun/tests/util_test.go create mode 100644 gun/tests/ws_test.go create mode 100644 gun/time.go create mode 100644 gun/util.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6dbe14b --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +/gun/tests/node_modules +/gun/tests/ossl +/gun/tests/radata-server +/gun/tests/package-lock.json diff --git a/gun/gun.go b/gun/gun.go new file mode 100644 index 0000000..e92fd15 --- /dev/null +++ b/gun/gun.go @@ -0,0 +1,69 @@ +package gun + +import ( + "context" + "fmt" + "net/url" +) + +type Gun struct { + peers []Peer + storage Storage + soulGen func() Soul +} + +type Config struct { + Peers []Peer + Storage Storage + SoulGen func() Soul +} + +func New(config Config) *Gun { + g := &Gun{ + peers: make([]Peer, len(config.Peers)), + storage: config.Storage, + soulGen: config.SoulGen, + } + // Copy over peers + copy(g.peers, config.Peers) + // Set defaults + if g.storage == nil { + g.storage = &StorageInMem{} + } + if g.soulGen == nil { + g.soulGen = SoulGenDefault + } + return g +} + +// To note: Fails on even one peer failure (otherwise, do this yourself). May connect to +// some peers temporarily until first failure, but closes them all on failure +func NewFromPeerURLs(ctx context.Context, peerURLs ...string) (g *Gun, err error) { + c := Config{Peers: make([]Peer, len(peerURLs))} + for i := 0; i < len(peerURLs) && err == nil; i++ { + if parsedURL, err := url.Parse(peerURLs[i]); err != nil { + err = fmt.Errorf("Failed parsing peer URL %v: %v", peerURLs[i], err) + } else if peerNew := PeerURLSchemes[parsedURL.Scheme]; peerNew == nil { + err = fmt.Errorf("Unknown peer URL scheme for %v", peerURLs[i]) + } else if c.Peers[i], err = peerNew(ctx, parsedURL); err != nil { + err = fmt.Errorf("Failed connecting to peer %v: %v", peerURLs[i], err) + } + } + if err != nil { + for _, peer := range c.Peers { + peer.Close() + } + return + } + return New(c), nil +} + +type Message struct { + Ack string `json:"@,omitEmpty"` + ID string `json:"#,omitEmpty"` + Sender string `json:"><,omitEmpty"` + Hash string `json:"##,omitempty"` + OK *int `json:"ok,omitempty"` + How string `json:"how,omitempty"` + // TODO: "get", "put", "dam" +} diff --git a/gun/node.go b/gun/node.go new file mode 100644 index 0000000..a414035 --- /dev/null +++ b/gun/node.go @@ -0,0 +1,32 @@ +package gun + +import "strconv" + +var SoulGenDefault = func() Soul { + ms, uniqueNum := TimeNowUniqueUnix() + s := strconv.FormatInt(ms, 36) + if uniqueNum > 0 { + s += strconv.FormatInt(uniqueNum, 36) + } + return Soul(s + randString(12)) +} + +type Node struct { + NodeMetadata + Values map[string]NodeValue +} + +type NodeMetadata struct { + Soul Soul + HAMState map[string]uint64 +} + +type Soul string + +type NodeValue interface { +} + +type NodeString string +type NodeNumber string +type NodeBool bool +type NodeRelation Soul diff --git a/gun/peer.go b/gun/peer.go new file mode 100644 index 0000000..2a1ddf2 --- /dev/null +++ b/gun/peer.go @@ -0,0 +1,28 @@ +package gun + +import ( + "context" + "net/url" + + "github.com/gorilla/websocket" +) + +type Peer interface { + Close() error +} + +var PeerURLSchemes = map[string]func(context.Context, *url.URL) (Peer, error){ + "ws": func(ctx context.Context, peerUrl *url.URL) (Peer, error) { return NewPeerWebSocket(ctx, peerUrl) }, +} + +type PeerWebSocket struct { + *websocket.Conn +} + +func NewPeerWebSocket(ctx context.Context, peerUrl *url.URL) (*PeerWebSocket, error) { + conn, _, err := websocket.DefaultDialer.DialContext(ctx, peerUrl.String(), nil) + if err != nil { + return nil, err + } + return &PeerWebSocket{conn}, nil +} diff --git a/gun/scoped.go b/gun/scoped.go new file mode 100644 index 0000000..d1e8189 --- /dev/null +++ b/gun/scoped.go @@ -0,0 +1,40 @@ +package gun + +// type Scoped interface { +// Path() []string +// // Shortcut for last Path() entry or empty string +// Key() string +// Scoped(...string) Scoped +// Up(count int) Scoped +// // Shortcut for Up(1) +// Parent() Scoped +// // Shortcut for Up(-1) +// Root() Scoped + +// Val(context.Context) *ValueFetch +// Watch(context.Context) <-chan *ValueFetch +// WatchChildren(context.Context) <-chan *ValueFetch +// Put(context.Context, Value) <-chan *Ack +// Add(context.Context, Value) <-chan *Ack +// } + +type Scoped struct { + gun *Gun + path []string +} + +type ValueFetch struct { + Err error + Key string + Value Value + Peer Peer +} + +type Value interface { +} + +type Ack struct { + Err error + Ok bool + Peer Peer +} diff --git a/gun/storage.go b/gun/storage.go new file mode 100644 index 0000000..0d4ee7a --- /dev/null +++ b/gun/storage.go @@ -0,0 +1,7 @@ +package gun + +type Storage interface { +} + +type StorageInMem struct { +} diff --git a/gun/tests/context_test.go b/gun/tests/context_test.go new file mode 100644 index 0000000..e982a21 --- /dev/null +++ b/gun/tests/context_test.go @@ -0,0 +1,70 @@ +package tests + +import ( + "bytes" + "context" + "log" + "os/exec" + "path/filepath" + "runtime" + "strconv" + "testing" + + "github.com/stretchr/testify/require" +) + +type testContext struct { + context.Context + *testing.T + Require *require.Assertions +} + +func newContext(t *testing.T) (*testContext, context.CancelFunc) { + return withTestContext(context.Background(), t) +} + +func withTestContext(ctx context.Context, t *testing.T) (*testContext, context.CancelFunc) { + ctx, cancelFn := context.WithCancel(ctx) + return &testContext{ + Context: ctx, + T: t, + Require: require.New(t), + }, cancelFn +} + +func (t *testContext) debugf(format string, args ...interface{}) { + if testing.Verbose() { + log.Printf(format, args...) + } +} + +func (t *testContext) runJS(script string) []byte { + cmd := exec.CommandContext(t, "node") + _, currFile, _, _ := runtime.Caller(0) + cmd.Dir = filepath.Dir(currFile) + cmd.Stdin = bytes.NewReader([]byte(script)) + out, err := cmd.CombinedOutput() + out = removeGunJSWelcome(out) + t.Require.NoErrorf(err, "JS failure, output:\n%v", string(out)) + return out +} + +func (t *testContext) startJS(script string) (*bytes.Buffer, *exec.Cmd, context.CancelFunc) { + cmdCtx, cancelFn := context.WithCancel(t) + cmd := exec.CommandContext(cmdCtx, "node") + _, currFile, _, _ := runtime.Caller(0) + cmd.Dir = filepath.Dir(currFile) + cmd.Stdin = bytes.NewReader([]byte(script)) + var buf bytes.Buffer + cmd.Stdout, cmd.Stderr = &buf, &buf + t.Require.NoError(cmd.Start()) + return &buf, cmd, cancelFn +} + +func (t *testContext) startGunServer(port int) { + t.startJS(` + var Gun = require('gun') + const server = require('http').createServer().listen(` + strconv.Itoa(port) + `) + const gun = Gun({web: server, file: 'radata-server'}) + `) +} diff --git a/gun/tests/js_test.go b/gun/tests/js_test.go new file mode 100644 index 0000000..e993828 --- /dev/null +++ b/gun/tests/js_test.go @@ -0,0 +1,47 @@ +package tests + +import ( + "strings" + "testing" +) + +func TestSimpleJS(t *testing.T) { + ctx, cancelFn := newContext(t) + defer cancelFn() + ctx.Require.Equal("yay 3\n", string(ctx.runJS("console.log('yay', 1 + 2)"))) +} + +func TestGunJS(t *testing.T) { + // Run the server, put in one call, get in another, then check + ctx, cancelFn := newContext(t) + defer cancelFn() + ctx.startGunServer(8080) + ctx.startGunWebSocketProxyLogger(8081, 8080) + randStr := randString(30) + ctx.runJS(` + var Gun = require('gun') + const gun = Gun({ + peers: ['http://127.0.0.1:8081/gun'], + radisk: false + }) + gun.get('esgopeta-test').get('TestGunJS').get('some-key').put('` + randStr + `', ack => { + if (ack.err) { + console.error(ack.err) + process.exit(1) + } + process.exit(0) + }) + `) + out := ctx.runJS(` + var Gun = require('gun') + const gun = Gun({ + peers: ['http://127.0.0.1:8081/gun'], + radisk: false + }) + gun.get('esgopeta-test').get('TestGunJS').get('some-key').once(data => { + console.log(data) + process.exit(0) + }) + `) + ctx.Require.Equal(randStr, strings.TrimSpace(string(out))) +} diff --git a/gun/tests/package.json b/gun/tests/package.json new file mode 100644 index 0000000..870fa15 --- /dev/null +++ b/gun/tests/package.json @@ -0,0 +1,8 @@ +{ + "name": "esgopeta-tests", + "version": "0.1.0", + "private": true, + "dependencies": { + "gun": "^0.9.9999991" + } +} diff --git a/gun/tests/util_test.go b/gun/tests/util_test.go new file mode 100644 index 0000000..3339874 --- /dev/null +++ b/gun/tests/util_test.go @@ -0,0 +1,35 @@ +package tests + +import ( + "bytes" + "crypto/rand" +) + +const randChars = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" + +func randString(n int) (s string) { + // We accept that a multiple of 64 is %'d on 62 potentially favoring 0 or 1 more, but we don't care + byts := make([]byte, n) + if _, err := rand.Read(byts); err != nil { + panic(err) + } + for _, byt := range byts { + s += string(randChars[int(byt)%len(randChars)]) + } + return s +} + +func removeGunJSWelcome(b []byte) []byte { + if bytes.Index(b, []byte("Hello wonderful person!")) == 0 { + b = b[bytes.IndexByte(b, '\n')+1:] + } + return b +} + +func skipGunJSWelcome(buf *bytes.Buffer) { + if bytes.Index(buf.Bytes(), []byte("Hello wonderful person!")) == 0 { + if _, err := buf.ReadBytes('\n'); err != nil { + panic(err) + } + } +} diff --git a/gun/tests/ws_test.go b/gun/tests/ws_test.go new file mode 100644 index 0000000..42d9d71 --- /dev/null +++ b/gun/tests/ws_test.go @@ -0,0 +1,123 @@ +package tests + +import ( + "log" + "net/http" + "strconv" + "time" + + "github.com/gorilla/websocket" +) + +func (t *testContext) startGunWebSocketProxyLogger(listenPort, targetPort int) { + fromGun, toGun := t.startGunWebSocketProxy(listenPort, targetPort) + time.Sleep(time.Second) + go func() { + for { + select { + case msg, ok := <-fromGun: + if !ok { + return + } + t.debugf("From gun: %v", string(msg)) + case msg, ok := <-toGun: + if !ok { + return + } + t.debugf("To gun: %v", string(msg)) + } + } + }() +} + +func (t *testContext) startGunWebSocketProxy(listenPort, targetPort int) (fromTarget <-chan []byte, toTarget <-chan []byte) { + fromTargetCh := make(chan []byte) + toTargetCh := make(chan []byte) + server := &http.Server{ + Addr: "127.0.0.1:" + strconv.Itoa(listenPort), + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + t.debugf("New ws proxy connection") + err := t.handleGunWebSocketProxy(targetPort, w, r, fromTargetCh, toTargetCh) + if _, ok := err.(*websocket.CloseError); !ok { + t.debugf("Unexpected web socket close error: %v", err) + } + }), + } + serverErrCh := make(chan error, 1) + go func() { serverErrCh <- server.ListenAndServe() }() + go func() { + defer server.Close() + select { + case <-t.Done(): + case err := <-serverErrCh: + log.Printf("Server error: %v", err) + } + }() + return fromTargetCh, toTargetCh +} + +var wsDefaultUpgrader = websocket.Upgrader{} + +func (t *testContext) handleGunWebSocketProxy( + targetPort int, + w http.ResponseWriter, + r *http.Request, + fromOther chan<- []byte, + toOther chan<- []byte, +) error { + otherConn, _, err := websocket.DefaultDialer.DialContext(t, "ws://127.0.0.1:"+strconv.Itoa(targetPort)+"/gun", nil) + if err != nil { + return err + } + defer otherConn.Close() + // Upgrade + c, err := wsDefaultUpgrader.Upgrade(w, r, nil) + if err != nil { + return err + } + defer c.Close() + type readMsg struct { + messageType int + p []byte + err error + } + + readCh := make(chan *readMsg) + go func() { + for { + msg := new(readMsg) + msg.messageType, msg.p, msg.err = c.ReadMessage() + readCh <- msg + } + }() + otherReadCh := make(chan *readMsg) + go func() { + for { + msg := new(readMsg) + msg.messageType, msg.p, msg.err = otherConn.ReadMessage() + otherReadCh <- msg + } + }() + for { + select { + case msg := <-readCh: + if msg.err != nil { + return msg.err + } + toOther <- msg.p + if err := otherConn.WriteMessage(msg.messageType, msg.p); err != nil { + return err + } + case otherMsg := <-otherReadCh: + if otherMsg.err != nil { + return otherMsg.err + } + fromOther <- otherMsg.p + if err := c.WriteMessage(otherMsg.messageType, otherMsg.p); err != nil { + return err + } + case <-t.Done(): + return t.Err() + } + } +} diff --git a/gun/time.go b/gun/time.go new file mode 100644 index 0000000..11f205e --- /dev/null +++ b/gun/time.go @@ -0,0 +1,46 @@ +package gun + +import ( + "sync/atomic" + "time" +) + +// TimeFromUnixMs returns zero'd time if ms is 0 +func TimeFromUnixMs(ms int64) time.Time { + if ms == 0 { + return time.Time{} + } + return time.Unix(0, ms*int64(time.Millisecond)) +} + +// TimeToUnixMs returns 0 if t.IsZero +func TimeToUnixMs(t time.Time) int64 { + if t.IsZero() { + return 0 + } + return t.UnixNano() / int64(time.Millisecond) +} + +func TimeNowUnixMs() int64 { + return TimeToUnixMs(time.Now()) +} + +var lastNano int64 + +// uniqueNano is 0 if ms is first time seen, otherwise a unique num in combination with ms +func TimeNowUniqueUnix() (ms int64, uniqueNum int64) { + now := time.Now() + newNano := now.UnixNano() + for { + prevLastNano := lastNano + if prevLastNano < newNano && atomic.CompareAndSwapInt64(&lastNano, prevLastNano, newNano) { + ms = newNano / int64(time.Millisecond) + // If was same ms as seen before, set uniqueNum to the nano part + if prevLastNano/int64(time.Millisecond) == ms { + uniqueNum = newNano%int64(time.Millisecond) + 1 + } + return + } + newNano = prevLastNano + 1 + } +} diff --git a/gun/util.go b/gun/util.go new file mode 100644 index 0000000..cdcc1d9 --- /dev/null +++ b/gun/util.go @@ -0,0 +1,17 @@ +package gun + +import "crypto/rand" + +const randChars = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" + +func randString(n int) (s string) { + // We accept that a multiple of 64 is %'d on 62 potentially favoring 0 or 1 more, but we don't care + byts := make([]byte, n) + if _, err := rand.Read(byts); err != nil { + panic(err) + } + for _, byt := range byts { + s += string(randChars[int(byt)%len(randChars)]) + } + return s +}