diff --git a/gun/gun.go b/gun/gun.go index ed5c27a..a85a0d7 100644 --- a/gun/gun.go +++ b/gun/gun.go @@ -8,7 +8,7 @@ import ( ) type Gun struct { - peers []*gunPeer + peers []*Peer storage Storage soulGen func() string peerErrorHandler func(*ErrPeer) @@ -42,7 +42,7 @@ const DefaultPeerSleepOnError = 30 * time.Second func New(ctx context.Context, config Config) (*Gun, error) { g := &Gun{ - peers: make([]*gunPeer, len(config.PeerURLs)), + peers: make([]*Peer, len(config.PeerURLs)), storage: config.Storage, soulGen: config.SoulGen, peerErrorHandler: config.PeerErrorHandler, @@ -59,8 +59,8 @@ func New(ctx context.Context, config Config) (*Gun, error) { var err error for i := 0; i < len(config.PeerURLs) && err == nil; i++ { peerURL := config.PeerURLs[i] - connPeer := func() (Peer, error) { return NewPeer(ctx, peerURL) } - if g.peers[i], err = newGunPeer(peerURL, connPeer, sleepOnError); err != nil { + newConn := func() (PeerConn, error) { return NewPeerConn(ctx, peerURL) } + if g.peers[i], err = newPeer(peerURL, newConn, sleepOnError); err != nil { err = fmt.Errorf("Failed connecting to peer %v: %v", peerURL, err) } } @@ -88,6 +88,14 @@ func New(ctx context.Context, config Config) (*Gun, error) { return g, nil } +func (g *Gun) Scoped(ctx context.Context, key string, children ...string) *Scoped { + s := newScoped(g, nil, key) + if len(children) > 0 { + s = s.Scoped(ctx, children[0], children[1:]...) + } + return s +} + func (g *Gun) Close() error { var errs []error for _, p := range g.peers { @@ -104,11 +112,7 @@ func (g *Gun) Close() error { } } -func (g *Gun) Send(ctx context.Context, msg *Message) <-chan *ErrPeer { - return g.send(ctx, msg, nil) -} - -func (g *Gun) send(ctx context.Context, msg *Message, ignorePeer *gunPeer) <-chan *ErrPeer { +func (g *Gun) send(ctx context.Context, msg *Message, ignorePeer *Peer) <-chan *ErrPeer { ch := make(chan *ErrPeer, len(g.peers)) // Everything async go func() { @@ -119,7 +123,7 @@ func (g *Gun) send(ctx context.Context, msg *Message, ignorePeer *gunPeer) <-cha continue } wg.Add(1) - go func(peer *gunPeer) { + go func(peer *Peer) { defer wg.Done() // Just do nothing if the peer is bad and we couldn't send if _, err := peer.send(ctx, msg); err != nil { @@ -136,11 +140,11 @@ func (g *Gun) send(ctx context.Context, msg *Message, ignorePeer *gunPeer) <-cha func (g *Gun) startReceiving() { for _, peer := range g.peers { - go func(peer *gunPeer) { + go func(peer *Peer) { // TDO: some kind of overall context is probably needed ctx, cancelFn := context.WithCancel(context.TODO()) defer cancelFn() - for !peer.closed() { + for !peer.Closed() { // We might not be able receive because peer is sleeping from // an error happened within or a just-before send error. if ok, msgs, err := peer.receive(ctx); !ok { @@ -152,7 +156,7 @@ func (g *Gun) startReceiving() { } else { // Go over each message and see if it needs delivering or rebroadcasting for _, msg := range msgs { - g.onPeerMessage(ctx, &MessageReceived{Message: msg, peer: peer}) + g.onPeerMessage(ctx, &MessageReceived{Message: msg, Peer: peer}) } } } @@ -176,17 +180,17 @@ func (g *Gun) onPeerMessage(ctx context.Context, msg *MessageReceived) { if msg.PID == "" { // This is a request, set the PID and send it back msg.PID = g.myPeerID - if _, err := msg.peer.send(ctx, msg.Message); err != nil { - go g.onPeerError(&ErrPeer{err, msg.peer}) + if _, err := msg.Peer.send(ctx, msg.Message); err != nil { + go g.onPeerError(&ErrPeer{err, msg.Peer}) } } else { // This is them telling us theirs - msg.peer.id = msg.PID + msg.Peer.id = msg.PID } return } // Unhandled message means rebroadcast - g.send(ctx, msg.Message, msg.peer) + g.send(ctx, msg.Message, msg.Peer) } func (g *Gun) onPeerError(err *ErrPeer) { @@ -195,26 +199,18 @@ func (g *Gun) onPeerError(err *ErrPeer) { } } -func (g *Gun) RegisterMessageIDListener(id string, ch chan<- *MessageReceived) { +func (g *Gun) registerMessageIDListener(id string, ch chan<- *MessageReceived) { g.messageIDListenersLock.Lock() defer g.messageIDListenersLock.Unlock() g.messageIDListeners[id] = ch } -func (g *Gun) UnregisterMessageIDListener(id string) { +func (g *Gun) unregisterMessageIDListener(id string) { g.messageIDListenersLock.Lock() defer g.messageIDListenersLock.Unlock() delete(g.messageIDListeners, id) } -func (g *Gun) Scoped(ctx context.Context, key string, children ...string) *Scoped { - s := newScoped(g, nil, key) - if len(children) > 0 { - s = s.Scoped(ctx, children[0], children[1:]...) - } - return s -} - func safeReceivedMessageSend(ch chan<- *MessageReceived, msg *MessageReceived) { // Due to the fact that we may send on a closed channel here, we ignore the panic defer func() { recover() }() diff --git a/gun/gun_peer.go b/gun/gun_peer.go deleted file mode 100644 index 3a3725d..0000000 --- a/gun/gun_peer.go +++ /dev/null @@ -1,111 +0,0 @@ -package gun - -import ( - "context" - "sync" - "time" -) - -type gunPeer struct { - url string - connPeer func() (Peer, error) - sleepOnErr time.Duration // TODO: would be better as backoff - id string - - peer Peer - peerBad bool // If true, don't try anything - peerLock sync.Mutex -} - -func newGunPeer(url string, connPeer func() (Peer, error), sleepOnErr time.Duration) (*gunPeer, error) { - p := &gunPeer{url: url, connPeer: connPeer, sleepOnErr: sleepOnErr} - var err error - if p.peer, err = connPeer(); err != nil { - return nil, err - } - return p, nil -} - -func (g *gunPeer) ID() string { return g.id } - -func (g *gunPeer) reconnectPeer() (err error) { - g.peerLock.Lock() - defer g.peerLock.Unlock() - if g.peer == nil && g.peerBad { - g.peerBad = false - if g.peer, err = g.connPeer(); err != nil { - g.peerBad = true - time.AfterFunc(g.sleepOnErr, func() { g.reconnectPeer() }) - } - } - return -} - -// Can be nil peer if currently bad -func (g *gunPeer) connectedPeer() Peer { - g.peerLock.Lock() - defer g.peerLock.Unlock() - return g.peer -} - -func (g *gunPeer) markPeerErrored(p Peer) { - g.peerLock.Lock() - defer g.peerLock.Unlock() - if p == g.peer { - g.peer = nil - g.peerBad = true - p.Close() - time.AfterFunc(g.sleepOnErr, func() { g.reconnectPeer() }) - } -} - -func (g *gunPeer) send(ctx context.Context, msg *Message, moreMsgs ...*Message) (ok bool, err error) { - p := g.connectedPeer() - if p == nil { - return false, nil - } - // Clone them with peer "to" - updatedMsg := msg.Clone() - updatedMsg.To = g.url - updatedMoreMsgs := make([]*Message, len(moreMsgs)) - for i, moreMsg := range moreMsgs { - moreMsg := moreMsg.Clone() - moreMsg.To = g.url - updatedMoreMsgs[i] = moreMsg - } - if err = p.Send(ctx, updatedMsg, updatedMoreMsgs...); err != nil { - g.markPeerErrored(p) - return false, err - } else { - return true, nil - } -} - -func (g *gunPeer) receive(ctx context.Context) (ok bool, msgs []*Message, err error) { - if p := g.connectedPeer(); p == nil { - return false, nil, nil - } else if msgs, err = p.Receive(ctx); err != nil { - g.markPeerErrored(p) - return false, nil, err - } else { - return true, msgs, nil - } -} - -func (g *gunPeer) Close() error { - g.peerLock.Lock() - defer g.peerLock.Unlock() - var err error - if g.peer != nil { - err = g.peer.Close() - g.peer = nil - } - g.peerBad = false - return err -} - -func (g *gunPeer) closed() bool { - g.peerLock.Lock() - defer g.peerLock.Unlock() - return g.peer == nil && !g.peerBad -} diff --git a/gun/message.go b/gun/message.go index 596f4a1..508f1f8 100644 --- a/gun/message.go +++ b/gun/message.go @@ -16,12 +16,6 @@ type Message struct { Err string `json:"err,omitempty"` } -func (m *Message) Clone() *Message { - msg := &Message{} - *msg = *m - return msg -} - type MessageGetRequest struct { Soul string `json:"#,omitempty"` Field string `json:".,omitempty"` @@ -29,5 +23,5 @@ type MessageGetRequest struct { type MessageReceived struct { *Message - peer *gunPeer + Peer *Peer } diff --git a/gun/node.go b/gun/node.go index f86a356..e7e8b21 100644 --- a/gun/node.go +++ b/gun/node.go @@ -8,7 +8,7 @@ import ( ) func DefaultSoulGen() string { - ms, uniqueNum := TimeNowUniqueUnix() + ms, uniqueNum := timeNowUniqueUnix() s := strconv.FormatInt(ms, 36) if uniqueNum > 0 { s += strconv.FormatInt(uniqueNum, 36) @@ -54,7 +54,7 @@ func (n *Node) UnmarshalJSON(b []byte) error { } } else if val, err := dec.Token(); err != nil { return err - } else if n.Values[keyStr], err = DecodeJSONValue(val, dec); err != nil { + } else if n.Values[keyStr], err = ValueDecodeJSON(val, dec); err != nil { return err } } @@ -62,14 +62,15 @@ func (n *Node) UnmarshalJSON(b []byte) error { type Metadata struct { Soul string `json:"#,omitempty"` - State map[string]int64 `json:">,omitempty"` + State map[string]State `json:">,omitempty"` } -// TODO: put private methd to seal enum +// TODO: put private method to seal enum type Value interface { + nodeValue() } -func DecodeJSONValue(token json.Token, dec *json.Decoder) (Value, error) { +func ValueDecodeJSON(token json.Token, dec *json.Decoder) (Value, error) { switch token := token.(type) { case nil: return nil, nil @@ -103,16 +104,27 @@ func DecodeJSONValue(token json.Token, dec *json.Decoder) (Value, error) { } type ValueString string + +func (ValueString) nodeValue() {} + type ValueNumber string + +func (ValueNumber) nodeValue() {} + type ValueBool bool + +func (ValueBool) nodeValue() {} + type ValueRelation string +func (ValueRelation) nodeValue() {} + func (n ValueRelation) MarshalJSON() ([]byte, error) { return json.Marshal(map[string]string{"#": string(n)}) } -type ValueWithState struct { - Value Value - // This is 0 for top-level values - State int64 -} +// type ValueWithState struct { +// Value Value +// // This is 0 for top-level values +// State int64 +// } diff --git a/gun/peer.go b/gun/peer.go index 4fd0ab1..892af82 100644 --- a/gun/peer.go +++ b/gun/peer.go @@ -6,41 +6,160 @@ import ( "fmt" "net/url" "sync" + "time" "github.com/gorilla/websocket" ) type ErrPeer struct { Err error - peer *gunPeer + Peer *Peer } -func (e *ErrPeer) Error() string { return fmt.Sprintf("Error on peer %v: %v", e.peer, e.Err) } +func (e *ErrPeer) Error() string { return fmt.Sprintf("Error on peer %v: %v", e.Peer, e.Err) } -type Peer interface { +type Peer struct { + url string + newConn func() (PeerConn, error) + sleepOnErr time.Duration // TODO: would be better as backoff + id string + + connCurrent PeerConn + connBad bool // If true, don't try anything + connLock sync.Mutex +} + +func newPeer(url string, newConn func() (PeerConn, error), sleepOnErr time.Duration) (*Peer, error) { + p := &Peer{url: url, newConn: newConn, sleepOnErr: sleepOnErr} + var err error + if p.connCurrent, err = newConn(); err != nil { + return nil, err + } + return p, nil +} + +func (p *Peer) ID() string { return p.id } + +func (p *Peer) String() string { + id := "" + if p.id != "" { + id = "(id: " + p.id + ")" + } + connStatus := "connected" + if p.Conn() == nil { + connStatus = "disconnected" + } + return fmt.Sprintf("Peer%v %v (%v)", id, p.url, connStatus) +} + +func (p *Peer) reconnect() (err error) { + p.connLock.Lock() + defer p.connLock.Unlock() + if p.connCurrent == nil && p.connBad { + p.connBad = false + if p.connCurrent, err = p.newConn(); err != nil { + p.connBad = true + time.AfterFunc(p.sleepOnErr, func() { p.reconnect() }) + } + } + return +} + +// Can be nil peer if currently bad or closed +func (p *Peer) Conn() PeerConn { + p.connLock.Lock() + defer p.connLock.Unlock() + return p.connCurrent +} + +func (p *Peer) markConnErrored(conn PeerConn) { + p.connLock.Lock() + defer p.connLock.Unlock() + if conn == p.connCurrent { + p.connCurrent = nil + p.connBad = true + conn.Close() + time.AfterFunc(p.sleepOnErr, func() { p.reconnect() }) + } +} + +func (p *Peer) send(ctx context.Context, msg *Message, moreMsgs ...*Message) (ok bool, err error) { + conn := p.Conn() + if conn == nil { + return false, nil + } + // Clone them with peer "to" + updatedMsg := &Message{} + *updatedMsg = *msg + updatedMsg.To = p.url + updatedMoreMsgs := make([]*Message, len(moreMsgs)) + for i, moreMsg := range moreMsgs { + updatedMoreMsg := &Message{} + *updatedMoreMsg = *moreMsg + updatedMoreMsg.To = p.url + updatedMoreMsgs[i] = updatedMoreMsg + } + if err = conn.Send(ctx, updatedMsg, updatedMoreMsgs...); err != nil { + p.markConnErrored(conn) + return false, err + } else { + return true, nil + } +} + +func (p *Peer) receive(ctx context.Context) (ok bool, msgs []*Message, err error) { + if conn := p.Conn(); conn == nil { + return false, nil, nil + } else if msgs, err = conn.Receive(ctx); err != nil { + p.markConnErrored(conn) + return false, nil, err + } else { + return true, msgs, nil + } +} + +func (p *Peer) Close() error { + p.connLock.Lock() + defer p.connLock.Unlock() + var err error + if p.connCurrent != nil { + err = p.connCurrent.Close() + p.connCurrent = nil + } + p.connBad = false + return err +} + +func (p *Peer) Closed() bool { + p.connLock.Lock() + defer p.connLock.Unlock() + return p.connCurrent == nil && !p.connBad +} + +type PeerConn interface { Send(ctx context.Context, msg *Message, moreMsgs ...*Message) error // Chan is closed on first err, when context is closed, or when peer is closed Receive(ctx context.Context) ([]*Message, error) Close() error } -var PeerURLSchemes map[string]func(context.Context, *url.URL) (Peer, error) +var PeerURLSchemes map[string]func(context.Context, *url.URL) (PeerConn, error) func init() { - PeerURLSchemes = map[string]func(context.Context, *url.URL) (Peer, error){ - "http": func(ctx context.Context, peerURL *url.URL) (Peer, error) { + PeerURLSchemes = map[string]func(context.Context, *url.URL) (PeerConn, error){ + "http": func(ctx context.Context, peerURL *url.URL) (PeerConn, error) { schemeChangedURL := &url.URL{} *schemeChangedURL = *peerURL schemeChangedURL.Scheme = "ws" - return NewPeerWebSocket(ctx, schemeChangedURL) + return NewPeerConnWebSocket(ctx, schemeChangedURL) }, - "ws": func(ctx context.Context, peerURL *url.URL) (Peer, error) { - return NewPeerWebSocket(ctx, peerURL) + "ws": func(ctx context.Context, peerURL *url.URL) (PeerConn, error) { + return NewPeerConnWebSocket(ctx, peerURL) }, } } -func NewPeer(ctx context.Context, peerURL string) (Peer, error) { +func NewPeerConn(ctx context.Context, peerURL string) (PeerConn, error) { if parsedURL, err := url.Parse(peerURL); err != nil { return nil, err } else if peerNew := PeerURLSchemes[parsedURL.Scheme]; peerNew == nil { @@ -50,20 +169,20 @@ func NewPeer(ctx context.Context, peerURL string) (Peer, error) { } } -type PeerWebSocket struct { +type PeerConnWebSocket struct { Underlying *websocket.Conn WriteLock sync.Mutex } -func NewPeerWebSocket(ctx context.Context, peerUrl *url.URL) (*PeerWebSocket, error) { +func NewPeerConnWebSocket(ctx context.Context, peerUrl *url.URL) (*PeerConnWebSocket, error) { conn, _, err := websocket.DefaultDialer.DialContext(ctx, peerUrl.String(), nil) if err != nil { return nil, err } - return &PeerWebSocket{Underlying: conn}, nil + return &PeerConnWebSocket{Underlying: conn}, nil } -func (p *PeerWebSocket) Send(ctx context.Context, msg *Message, moreMsgs ...*Message) error { +func (p *PeerConnWebSocket) Send(ctx context.Context, msg *Message, moreMsgs ...*Message) error { // If there are more, send all as an array of JSON strings, otherwise just the msg var toWrite interface{} if len(moreMsgs) == 0 { @@ -97,7 +216,7 @@ func (p *PeerWebSocket) Send(ctx context.Context, msg *Message, moreMsgs ...*Mes } } -func (p *PeerWebSocket) Receive(ctx context.Context) ([]*Message, error) { +func (p *PeerConnWebSocket) Receive(ctx context.Context) ([]*Message, error) { bytsCh := make(chan []byte, 1) errCh := make(chan error, 1) go func() { @@ -135,6 +254,6 @@ func (p *PeerWebSocket) Receive(ctx context.Context) ([]*Message, error) { } } -func (p *PeerWebSocket) Close() error { +func (p *PeerConnWebSocket) Close() error { return p.Underlying.Close() } diff --git a/gun/scoped_fetch.go b/gun/scoped_fetch.go index 7486965..2f5d0e3 100644 --- a/gun/scoped_fetch.go +++ b/gun/scoped_fetch.go @@ -22,11 +22,10 @@ func (s *Scoped) FetchOneLocal(ctx context.Context) *FetchResult { // Need parent soul for lookup var parentSoul string if parentSoul, r.Err = s.parent.Soul(ctx); r.Err == nil { - var vs *ValueWithState - if vs, r.Err = s.gun.storage.Get(ctx, parentSoul, s.field); r.Err == ErrStorageNotFound { + if r.Value, r.State, r.Err = s.gun.storage.Get(ctx, parentSoul, s.field); r.Err == ErrStorageNotFound { r.Err = nil } else if r.Err == nil { - r.Value, r.State, r.ValueExists = vs.Value, vs.State, true + r.ValueExists = true } } return r @@ -89,7 +88,7 @@ func (s *Scoped) fetchRemote(ctx context.Context, ch chan *FetchResult) { s.fetchResultListeners[ch] = &fetchResultListener{req.ID, ch, msgCh} s.fetchResultListenersLock.Unlock() // Listen for responses to this get - s.gun.RegisterMessageIDListener(req.ID, msgCh) + s.gun.registerMessageIDListener(req.ID, msgCh) // TODO: only for children: s.gun.RegisterValueIDListener(s.id, msgCh) // Handle received messages turning them to value fetches go func() { @@ -103,7 +102,7 @@ func (s *Scoped) fetchRemote(ctx context.Context, ch chan *FetchResult) { if !ok { return } - r := &FetchResult{Field: s.field, peer: msg.peer} + r := &FetchResult{Field: s.field, Peer: msg.Peer} // We asked for a single field, should only get that field or it doesn't exist if msg.Err != "" { r.Err = fmt.Errorf("Remote error: %v", msg.Err) @@ -119,11 +118,11 @@ func (s *Scoped) fetchRemote(ctx context.Context, ch chan *FetchResult) { }() // Send async, sending back errors go func() { - for peerErr := range s.gun.Send(ctx, req) { + for peerErr := range s.gun.send(ctx, req, nil) { safeFetchResultSend(ch, &FetchResult{ Err: peerErr.Err, Field: s.field, - peer: peerErr.peer, + Peer: peerErr.Peer, }) } }() @@ -136,7 +135,7 @@ func (s *Scoped) FetchDone(ch <-chan *FetchResult) bool { s.fetchResultListenersLock.Unlock() if l != nil { // Unregister the chan - s.gun.UnregisterMessageIDListener(l.id) + s.gun.unregisterMessageIDListener(l.id) // Close the message chan and the result chan close(l.receivedMessages) close(l.results) @@ -162,8 +161,8 @@ type FetchResult struct { Field string // Nil if the value doesn't exist, exists and is nil, or there's an error Value Value - State int64 // This can be 0 for errors or top-level value relations + State State // This can be 0 for errors or top-level value relations ValueExists bool // Nil when local and sometimes on error - peer *gunPeer + Peer *Peer } diff --git a/gun/scoped_put.go b/gun/scoped_put.go index 4273235..8fad62e 100644 --- a/gun/scoped_put.go +++ b/gun/scoped_put.go @@ -13,8 +13,8 @@ type putResultListener struct { type PutResult struct { Err error - - peer *gunPeer + // Nil on error or local put success + Peer *Peer } type PutOption interface{} @@ -71,7 +71,7 @@ func (s *Scoped) Put(ctx context.Context, val Value, opts ...PutOption) <-chan * } // We know that the first has a soul prevParentSoul := parents[0].cachedSoul() - currState := TimeNowUnixMs() + currState := StateNow() for _, parent := range parents[1:] { parentCachedSoul := parent.cachedSoul() if parentCachedSoul == "" { @@ -80,15 +80,14 @@ func (s *Scoped) Put(ctx context.Context, val Value, opts ...PutOption) <-chan * req.Put[prevParentSoul] = &Node{ Metadata: Metadata{ Soul: prevParentSoul, - State: map[string]int64{parent.field: currState}, + State: map[string]State{parent.field: currState}, }, Values: map[string]Value{parent.field: ValueRelation(parentCachedSoul)}, } // Also store locally and set the cached soul - withState := &ValueWithState{ValueRelation(parentCachedSoul), currState} // TODO: Should I not store until the very end just in case it errors halfway // though? There are no standard cases where it should fail. - if ok, err := s.gun.storage.Put(ctx, prevParentSoul, parent.field, withState); err != nil { + if ok, err := s.gun.storage.Put(ctx, prevParentSoul, parent.field, ValueRelation(parentCachedSoul), currState); err != nil { ch <- &PutResult{Err: err} close(ch) return ch @@ -104,9 +103,8 @@ func (s *Scoped) Put(ctx context.Context, val Value, opts ...PutOption) <-chan * } prevParentSoul = parentCachedSoul } - // Now that we've setup all the parents, we can do this store locally. - withState := &ValueWithState{val, currState} - if ok, err := s.gun.storage.Put(ctx, prevParentSoul, s.field, withState); err != nil { + // Now that we've setup all the parents, we can do this store locally + if ok, err := s.gun.storage.Put(ctx, prevParentSoul, s.field, val, currState); err != nil { ch <- &PutResult{Err: err} close(ch) return ch @@ -125,7 +123,7 @@ func (s *Scoped) Put(ctx context.Context, val Value, opts ...PutOption) <-chan * req.Put[prevParentSoul] = &Node{ Metadata: Metadata{ Soul: prevParentSoul, - State: map[string]int64{s.field: currState}, + State: map[string]State{s.field: currState}, }, Values: map[string]Value{s.field: val}, } @@ -134,7 +132,7 @@ func (s *Scoped) Put(ctx context.Context, val Value, opts ...PutOption) <-chan * s.putResultListenersLock.Lock() s.putResultListeners[ch] = &putResultListener{req.ID, ch, msgCh} s.putResultListenersLock.Unlock() - s.gun.RegisterMessageIDListener(req.ID, msgCh) + s.gun.registerMessageIDListener(req.ID, msgCh) // Start message listener go func() { for { @@ -147,7 +145,7 @@ func (s *Scoped) Put(ctx context.Context, val Value, opts ...PutOption) <-chan * if !ok { return } - r := &PutResult{peer: msg.peer} + r := &PutResult{Peer: msg.Peer} if msg.Err != "" { r.Err = fmt.Errorf("Remote error: %v", msg.Err) } else if msg.OK != 1 { @@ -159,10 +157,10 @@ func (s *Scoped) Put(ctx context.Context, val Value, opts ...PutOption) <-chan * }() // Send async, sending back errors go func() { - for peerErr := range s.gun.Send(ctx, req) { + for peerErr := range s.gun.send(ctx, req, nil) { safePutResultSend(ch, &PutResult{ Err: peerErr.Err, - peer: peerErr.peer, + Peer: peerErr.Peer, }) } }() @@ -176,7 +174,7 @@ func (s *Scoped) PutDone(ch <-chan *PutResult) bool { s.putResultListenersLock.Unlock() if l != nil { // Unregister the chan - s.gun.UnregisterMessageIDListener(l.id) + s.gun.unregisterMessageIDListener(l.id) // Close the message chan and the result chan close(l.receivedMessages) close(l.results) diff --git a/gun/time.go b/gun/state.go similarity index 68% rename from gun/time.go rename to gun/state.go index 11f205e..96d50ad 100644 --- a/gun/time.go +++ b/gun/state.go @@ -5,30 +5,34 @@ import ( "time" ) -// TimeFromUnixMs returns zero'd time if ms is 0 -func TimeFromUnixMs(ms int64) time.Time { +type State uint64 + +func StateNow() State { return State(timeNowUnixMs()) } + +// timeFromUnixMs returns zero'd time if ms is 0 +func timeFromUnixMs(ms int64) time.Time { if ms == 0 { return time.Time{} } return time.Unix(0, ms*int64(time.Millisecond)) } -// TimeToUnixMs returns 0 if t.IsZero -func TimeToUnixMs(t time.Time) int64 { +// timeToUnixMs returns 0 if t.IsZero +func timeToUnixMs(t time.Time) int64 { if t.IsZero() { return 0 } return t.UnixNano() / int64(time.Millisecond) } -func TimeNowUnixMs() int64 { - return TimeToUnixMs(time.Now()) +func timeNowUnixMs() int64 { + return timeToUnixMs(time.Now()) } var lastNano int64 // uniqueNano is 0 if ms is first time seen, otherwise a unique num in combination with ms -func TimeNowUniqueUnix() (ms int64, uniqueNum int64) { +func timeNowUniqueUnix() (ms int64, uniqueNum int64) { now := time.Now() newNano := now.UnixNano() for { diff --git a/gun/storage.go b/gun/storage.go index 425aa20..a98ede4 100644 --- a/gun/storage.go +++ b/gun/storage.go @@ -9,9 +9,9 @@ import ( var ErrStorageNotFound = errors.New("Not found") type Storage interface { - Get(ctx context.Context, parentSoul, field string) (*ValueWithState, error) + Get(ctx context.Context, parentSoul, field string) (Value, State, error) // If bool is false, it's deferred - Put(ctx context.Context, parentSoul, field string, val *ValueWithState) (bool, error) + Put(ctx context.Context, parentSoul, field string, val Value, state State) (bool, error) Tracking(ctx context.Context, parentSoul, field string) (bool, error) } @@ -21,16 +21,22 @@ type StorageInMem struct { type parentSoulAndField struct{ parentSoul, field string } -func (s *StorageInMem) Get(ctx context.Context, parentSoul, field string) (*ValueWithState, error) { - v, ok := s.values.Load(parentSoulAndField{parentSoul, field}) - if !ok { - return nil, ErrStorageNotFound - } - return v.(*ValueWithState), nil +type valueWithState struct { + val Value + state State } -func (s *StorageInMem) Put(ctx context.Context, parentSoul, field string, val *ValueWithState) (bool, error) { - s.values.Store(parentSoulAndField{parentSoul, field}, val) +func (s *StorageInMem) Get(ctx context.Context, parentSoul, field string) (Value, State, error) { + v, ok := s.values.Load(parentSoulAndField{parentSoul, field}) + if !ok { + return nil, 0, ErrStorageNotFound + } + vs := v.(*valueWithState) + return vs.val, vs.state, nil +} + +func (s *StorageInMem) Put(ctx context.Context, parentSoul, field string, val Value, state State) (bool, error) { + s.values.Store(parentSoulAndField{parentSoul, field}, &valueWithState{val, state}) // TODO: conflict resolution state check? return true, nil } diff --git a/gun/tests/context_test.go b/gun/tests/context_test.go index 3aa6f61..a5172cc 100644 --- a/gun/tests/context_test.go +++ b/gun/tests/context_test.go @@ -26,6 +26,15 @@ func newContext(t *testing.T) (*testContext, context.CancelFunc) { return withTestContext(context.Background(), t) } +func newContextWithGunJServer(t *testing.T) (*testContext, context.CancelFunc) { + ctx, cancelFn := newContext(t) + serverCancelFn := ctx.startGunJSServer() + return ctx, func() { + serverCancelFn() + cancelFn() + } +} + const defaultGunJSPort = 8080 func withTestContext(ctx context.Context, t *testing.T) (*testContext, context.CancelFunc) { @@ -77,20 +86,26 @@ func (t *testContext) startJS(script string) (*bytes.Buffer, *exec.Cmd, context. return &buf, cmd, cancelFn } -func (t *testContext) startGunJSServer() { +func (t *testContext) startGunJSServer() context.CancelFunc { // If we're logging, use a proxy port := t.GunJSPort if testing.Verbose() { t.startGunWebSocketProxyLogger(port, port+1) port++ } - // Remove entire data folder first + // Remove entire data folder first just in case t.Require.NoError(os.RemoveAll("radata-server")) - t.startJS(` + _, cmd, cancelFn := t.startJS(` var Gun = require('gun') const server = require('http').createServer().listen(` + strconv.Itoa(port) + `) const gun = Gun({web: server, file: 'radata-server'}) `) + return func() { + cancelFn() + cmd.Wait() + // Remove the data folder at the end + os.RemoveAll("radata-server") + } } func (t *testContext) newGunConnectedToGunJS() *gun.Gun { diff --git a/gun/tests/gun_test.go b/gun/tests/gun_test.go index 621adfe..6546750 100644 --- a/gun/tests/gun_test.go +++ b/gun/tests/gun_test.go @@ -9,9 +9,8 @@ import ( func TestGunGetSimple(t *testing.T) { // Run the server, put in one call, get in another, then check - ctx, cancelFn := newContext(t) + ctx, cancelFn := newContextWithGunJServer(t) defer cancelFn() - ctx.startGunJSServer() randStr := randString(30) // Write w/ JS ctx.runJSWithGun(` @@ -38,9 +37,8 @@ func TestGunGetSimple(t *testing.T) { } func TestGunPutSimple(t *testing.T) { - ctx, cancelFn := newContext(t) + ctx, cancelFn := newContextWithGunJServer(t) defer cancelFn() - ctx.startGunJSServer() randStr := randString(30) // Put g := ctx.newGunConnectedToGunJS() diff --git a/gun/tests/js_test.go b/gun/tests/js_test.go index 93789b2..bc327b8 100644 --- a/gun/tests/js_test.go +++ b/gun/tests/js_test.go @@ -13,9 +13,8 @@ func TestSimpleJS(t *testing.T) { func TestGunJS(t *testing.T) { // Run the server, put in one call, get in another, then check - ctx, cancelFn := newContext(t) + ctx, cancelFn := newContextWithGunJServer(t) defer cancelFn() - ctx.startGunJSServer() randStr := randString(30) ctx.runJSWithGun(` gun.get('esgopeta-test').get('TestGunJS').get('some-key').put('` + randStr + `', ack => {