LICENSE and initial successful get test

This commit is contained in:
Chad Retz 2019-02-22 13:51:50 -06:00
parent 8a023d2017
commit 28da432b03
11 changed files with 145 additions and 41 deletions

21
gun/LICENSE Normal file
View File

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2019 Chad Retz
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@ -13,6 +13,7 @@ type Gun struct {
soulGen func() string
peerErrorHandler func(*ErrPeer)
peerSleepOnError time.Duration
myPeerID string
messageIDPutListeners map[string]chan<- *MessageReceived
messageIDPutListenersLock sync.RWMutex
@ -24,6 +25,7 @@ type Config struct {
SoulGen func() string
PeerErrorHandler func(*ErrPeer)
PeerSleepOnError time.Duration
MyPeerID string
}
const DefaultPeerSleepOnError = 30 * time.Second
@ -35,6 +37,7 @@ func New(ctx context.Context, config Config) (*Gun, error) {
soulGen: config.SoulGen,
peerErrorHandler: config.PeerErrorHandler,
peerSleepOnError: config.PeerSleepOnError,
myPeerID: config.MyPeerID,
messageIDPutListeners: map[string]chan<- *MessageReceived{},
}
// Create all the peers
@ -46,7 +49,7 @@ func New(ctx context.Context, config Config) (*Gun, error) {
for i := 0; i < len(config.PeerURLs) && err == nil; i++ {
peerURL := config.PeerURLs[i]
connPeer := func() (Peer, error) { return NewPeer(ctx, peerURL) }
if g.peers[i], err = newGunPeer(connPeer, sleepOnError); err != nil {
if g.peers[i], err = newGunPeer(peerURL, connPeer, sleepOnError); err != nil {
err = fmt.Errorf("Failed connecting to peer %v: %v", peerURL, err)
}
}
@ -66,6 +69,9 @@ func New(ctx context.Context, config Config) (*Gun, error) {
if g.soulGen == nil {
g.soulGen = SoulGenDefault
}
if g.myPeerID == "" {
g.myPeerID = randString(9)
}
// Start receiving
g.startReceiving()
return g, nil
@ -123,7 +129,7 @@ func (g *Gun) startReceiving() {
// TDO: some kind of overall context is probably needed
ctx, cancelFn := context.WithCancel(context.TODO())
defer cancelFn()
for {
for !peer.closed() {
// We might not be able receive because peer is sleeping from
// an error happened within or a just-before send error.
if ok, msgs, err := peer.receive(ctx); !ok {
@ -135,17 +141,7 @@ func (g *Gun) startReceiving() {
} else {
// Go over each message and see if it needs delivering or rebroadcasting
for _, msg := range msgs {
rMsg := &MessageReceived{Message: msg, peer: peer}
if msg.Ack != "" && len(msg.Put) > 0 {
g.messageIDPutListenersLock.RLock()
l := g.messageIDPutListeners[msg.Ack]
g.messageIDPutListenersLock.RUnlock()
if l != nil {
go safeReceivedMessageSend(l, rMsg)
continue
}
}
go g.onUnhandledMessage(rMsg)
g.onPeerMessage(ctx, &MessageReceived{Message: msg, peer: peer})
}
}
}
@ -153,10 +149,33 @@ func (g *Gun) startReceiving() {
}
}
func (g *Gun) onUnhandledMessage(msg *MessageReceived) {
func (g *Gun) onPeerMessage(ctx context.Context, msg *MessageReceived) {
// If there is a listener for this message, use it
if msg.Ack != "" && len(msg.Put) > 0 {
g.messageIDPutListenersLock.RLock()
l := g.messageIDPutListeners[msg.Ack]
g.messageIDPutListenersLock.RUnlock()
if l != nil {
go safeReceivedMessageSend(l, msg)
return
}
}
// DAM messages are either requests for our ID or setting of theirs
if msg.DAM != "" {
if msg.PID == "" {
// This is a request, set the PID and send it back
msg.PID = g.myPeerID
if _, err := msg.peer.send(ctx, msg.Message); err != nil {
go g.onPeerError(&ErrPeer{err, msg.peer})
}
} else {
// This is them telling us theirs
msg.peer.id = msg.PID
}
return
}
// Unhandled message means rebroadcast
// TODO: we need a timeout or global context here...
g.send(context.TODO(), msg.Message, msg.peer)
g.send(ctx, msg.Message, msg.peer)
}
func (g *Gun) onPeerError(err *ErrPeer) {

View File

@ -7,16 +7,18 @@ import (
)
type gunPeer struct {
url string
connPeer func() (Peer, error)
sleepOnErr time.Duration // TODO: would be better as backoff
id string
peer Peer
peerBad bool // If true, don't try anything
peerLock sync.Mutex
}
func newGunPeer(connPeer func() (Peer, error), sleepOnErr time.Duration) (*gunPeer, error) {
p := &gunPeer{connPeer: connPeer, sleepOnErr: sleepOnErr}
func newGunPeer(url string, connPeer func() (Peer, error), sleepOnErr time.Duration) (*gunPeer, error) {
p := &gunPeer{url: url, connPeer: connPeer, sleepOnErr: sleepOnErr}
var err error
if p.peer, err = connPeer(); err != nil {
return nil, err
@ -24,9 +26,7 @@ func newGunPeer(connPeer func() (Peer, error), sleepOnErr time.Duration) (*gunPe
return p, nil
}
func (g *gunPeer) ID() string {
panic("TODO")
}
func (g *gunPeer) ID() string { return g.id }
func (g *gunPeer) reconnectPeer() (err error) {
g.peerLock.Lock()
@ -60,9 +60,20 @@ func (g *gunPeer) markPeerErrored(p Peer) {
}
func (g *gunPeer) send(ctx context.Context, msg *Message, moreMsgs ...*Message) (ok bool, err error) {
if p := g.connectedPeer(); p == nil {
p := g.connectedPeer()
if p == nil {
return false, nil
} else if err = p.Send(ctx, msg, moreMsgs...); err != nil {
}
// Clone them with peer "to"
updatedMsg := msg.Clone()
updatedMsg.To = g.url
updatedMoreMsgs := make([]*Message, len(moreMsgs))
for i, moreMsg := range moreMsgs {
moreMsg := moreMsg.Clone()
moreMsg.To = g.url
updatedMoreMsgs[i] = moreMsg
}
if err = p.Send(ctx, updatedMsg, updatedMoreMsgs...); err != nil {
g.markPeerErrored(p)
return false, err
} else {
@ -89,3 +100,9 @@ func (g *gunPeer) Close() error {
g.peerBad = false
return err
}
func (g *gunPeer) closed() bool {
g.peerLock.Lock()
defer g.peerLock.Unlock()
return g.peer == nil && !g.peerBad
}

View File

@ -1,10 +1,12 @@
package gun
import "encoding/json"
type Message struct {
Ack string `json:"@,omitEmpty"`
ID string `json:"#,omitEmpty"`
To string `json:"><,omitEmpty"`
Hash string `json:"##,omitempty"`
Ack string `json:"@,omitempty"`
ID string `json:"#,omitempty"`
To string `json:"><,omitempty"`
Hash json.Number `json:"##,omitempty"`
How string `json:"how,omitempty"`
Get *MessageGetRequest `json:"get,omitempty"`
Put map[string]*Node `json:"put,omitempty"`
@ -12,6 +14,12 @@ type Message struct {
PID string `json:"pid,omitempty"`
}
func (m *Message) Clone() *Message {
msg := &Message{}
*msg = *m
return msg
}
type MessageGetRequest struct {
Soul string `json:"#,omitempty"`
Field string `json:".,omitempty"`

View File

@ -61,8 +61,8 @@ func (n *Node) UnmarshalJSON(b []byte) error {
}
type Metadata struct {
Soul string `json:"#"`
State map[string]int64 `json:">"`
Soul string `json:"#,omitempty"`
State map[string]int64 `json:">,omitempty"`
}
type Value interface {

View File

@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"net/url"
"sync"
"github.com/gorilla/websocket"
)
@ -24,7 +25,15 @@ type Peer interface {
}
var PeerURLSchemes = map[string]func(context.Context, *url.URL) (Peer, error){
"ws": func(ctx context.Context, peerUrl *url.URL) (Peer, error) { return NewPeerWebSocket(ctx, peerUrl) },
"http": func(ctx context.Context, peerURL *url.URL) (Peer, error) {
schemeChangedURL := &url.URL{}
*schemeChangedURL = *peerURL
schemeChangedURL.Scheme = "ws"
return NewPeerWebSocket(ctx, schemeChangedURL)
},
"ws": func(ctx context.Context, peerURL *url.URL) (Peer, error) {
return NewPeerWebSocket(ctx, peerURL)
},
}
func NewPeer(ctx context.Context, peerURL string) (Peer, error) {
@ -39,6 +48,7 @@ func NewPeer(ctx context.Context, peerURL string) (Peer, error) {
type PeerWebSocket struct {
Underlying *websocket.Conn
WriteLock sync.Mutex
}
func NewPeerWebSocket(ctx context.Context, peerUrl *url.URL) (*PeerWebSocket, error) {
@ -46,7 +56,7 @@ func NewPeerWebSocket(ctx context.Context, peerUrl *url.URL) (*PeerWebSocket, er
if err != nil {
return nil, err
}
return &PeerWebSocket{conn}, nil
return &PeerWebSocket{Underlying: conn}, nil
}
func (p *PeerWebSocket) Send(ctx context.Context, msg *Message, moreMsgs ...*Message) error {
@ -70,7 +80,11 @@ func (p *PeerWebSocket) Send(ctx context.Context, msg *Message, moreMsgs ...*Mes
}
// Send async so we can wait on context
errCh := make(chan error, 1)
go func() { errCh <- p.Underlying.WriteJSON(toWrite) }()
go func() {
p.WriteLock.Lock()
defer p.WriteLock.Unlock()
errCh <- p.Underlying.WriteJSON(toWrite)
}()
select {
case err := <-errCh:
return err

View File

@ -29,6 +29,7 @@ func newScoped(gun *Gun, parent *Scoped, field string) *Scoped {
gun: gun,
parent: parent,
field: field,
valueChansToListeners: map[<-chan *ValueFetch]*messageIDListener{},
}
}

View File

@ -18,10 +18,18 @@ type StorageInMem struct {
values sync.Map
}
type parentSoulAndField struct{ parentSoul, field string }
func (s *StorageInMem) Get(ctx context.Context, parentSoul, field string) (*ValueWithState, error) {
panic("TODO")
v, ok := s.values.Load(parentSoulAndField{parentSoul, field})
if !ok {
return nil, ErrStorageNotFound
}
return v.(*ValueWithState), nil
}
func (s *StorageInMem) Put(ctx context.Context, parentSoul, field string, val *ValueWithState) (bool, error) {
panic("TODO")
s.values.Store(parentSoulAndField{parentSoul, field}, val)
// TODO: conflict resolution state check?
return true, nil
}

View File

@ -94,7 +94,13 @@ func (t *testContext) startGunJSServer() {
}
func (t *testContext) newGunConnectedToGunJS() *gun.Gun {
g, err := gun.NewFromPeerURLs(t, "http://127.0.0.1:"+strconv.Itoa(t.GunJSPort)+"/gun")
config := gun.Config{
PeerURLs: []string{"http://127.0.0.1:" + strconv.Itoa(t.GunJSPort) + "/gun"},
PeerErrorHandler: func(errPeer *gun.ErrPeer) {
t.debugf("Got peer error: %v", errPeer)
},
}
g, err := gun.New(t, config)
t.Require.NoError(err)
return g
}

View File

@ -2,6 +2,8 @@ package tests
import (
"testing"
"github.com/cretz/esgopeta/gun"
)
func TestGunGetSimple(t *testing.T) {
@ -22,7 +24,9 @@ func TestGunGetSimple(t *testing.T) {
`)
// Get
g := ctx.newGunConnectedToGunJS()
f := g.Scoped(ctx, "esgopeta-test", "TestGunGet", "some-key").Val(ctx)
f := g.Scoped(ctx, "esgopeta-test", "TestGunGetSimple", "some-key").Val(ctx)
ctx.Require.NoError(f.Err)
// Make sure we got back the same value
ctx.Require.Equal(gun.ValueString(randStr), f.Value.Value.(gun.ValueString))
}

View File

@ -22,6 +22,7 @@ func (t *testContext) startGunWebSocketProxyLogger(listenPort, targetPort int) {
return
}
if testing.Verbose() {
t.debugf("From gun raw: %v", string(msg))
for _, s := range t.formattedGunJSONs(msg) {
t.debugf("From gun: %v", s)
}
@ -31,12 +32,17 @@ func (t *testContext) startGunWebSocketProxyLogger(listenPort, targetPort int) {
return
}
if testing.Verbose() {
t.debugf("To gun raw: %v", string(msg))
if len(msg) == 0 {
t.debugf("To gun: empty message")
} else {
for _, s := range t.formattedGunJSONs(msg) {
t.debugf("To gun: %v", s)
}
}
}
}
}
}()
}