diff --git a/README.md b/README.md index 7c0cb93..8f36ee2 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ Esgopeta is a Go implementation of the [Gun](https://github.com/amark/gun) distributed graph database. See the [Godoc](https://godoc.org/github.com/cretz/esgopeta/gun) for API details. -**WARNING: This is an early proof-of-concept alpha version. Many pieces are not implemented.** +**WARNING: This is an early proof-of-concept alpha version. Many pieces are not implemented or don't work.** Features: diff --git a/gun/doc.go b/gun/doc.go new file mode 100644 index 0000000..8c820b0 --- /dev/null +++ b/gun/doc.go @@ -0,0 +1,12 @@ +/* +Package gun is an implementation of the Gun distributed graph database in Go. +See https://gun.eco for more information on the Gun distributed graph database. + +For common use, create a Gun instance via New, use Scoped to arrive at the +desired field, then use Fetch to get/listen to values and/or Put to write +values. A simple example is in the README at https://github.com/cretz/esgopeta. + +WARNING: This is an early proof-of-concept alpha version. Many pieces are not +implemented or don't work. +*/ +package gun diff --git a/gun/gun.go b/gun/gun.go index b3d06dc..3166b29 100644 --- a/gun/gun.go +++ b/gun/gun.go @@ -7,8 +7,11 @@ import ( "time" ) +// Gun is the main client/server instance for the database. type Gun struct { - // Never mutated, always overwritten + // currentPeers is the up-to-date peer list. Do not access this without + // using the associated lock. This slice is never mutated, only replaced. + // There are methods to get and change the list. currentPeers []*Peer currentPeersLock sync.RWMutex @@ -28,28 +31,67 @@ type Gun struct { messageSoulListenersLock sync.RWMutex } +// Config is the configuration for a Gun instance. type Config struct { - PeerURLs []string - Servers []Server - Storage Storage - SoulGen func() string + // PeerURLs is the initial set of peer URLs to connect to. This can be empty + // to not connect to any peers. + PeerURLs []string + // Servers is the set of local servers to listen for new peers on. This can + // be empty to not run any server. + Servers []Server + // Storage is the backend storage to locally store data. If not present, a + // NewStorageInMem is created with a value expiration of + // DefaultOldestAllowedStorageValue. + Storage Storage + // SoulGen is a generator for new soul values. If not present, + // DefaultSoulGen is used. + SoulGen func() string + // PeerErrorHandler is called on every error from or concerning a peer. Not + // required. PeerErrorHandler func(*ErrPeer) + // PeerSleepOnError is the amount of time we will consider a reconnectable + // peer "bad" on error before attempting reconnect. If 0 (i.e. not set), + // DefaultPeerSleepOnError is used. PeerSleepOnError time.Duration - MyPeerID string - Tracking Tracking + // MyPeerID is the identifier given for this peer to whoever asks. If empty + // it is set to a random string + MyPeerID string + // Tracking is how seen values are updated when they are seen on the wire. + // When set to TrackingNothing, no seen values will be updated. When set to + // TrackingRequested (the default), only values that are already in storage + // will be updated when seen. When set to TrackingEverything, all values + // seen on the wire will be stored. + Tracking Tracking } +// Tracking is what to do when a value update is seen on the wire. type Tracking int const ( + // TrackingRequested means any value we have already stored will be updated + // when seen. TrackingRequested Tracking = iota + // TrackingNothing means no values will be updated. TrackingNothing + // TrackingEverything means every value update is stored. TrackingEverything ) +// DefaultPeerSleepOnError is the default amount of time that an errored +// reconnectable peer will wait until trying to reconnect. const DefaultPeerSleepOnError = 30 * time.Second + +// DefaultOldestAllowedStorageValue is the default NewStorageInMem expiration. const DefaultOldestAllowedStorageValue = 7 * (60 * time.Minute) +// New creates a new Gun instance for the given config. The given context is +// used for all peer connection reconnects now/later, all server servings, and +// peer message handling. Therefore it should be kept available at least until +// Close. Callers must still call Close on complete, the completion of the +// context does not automatically do it. +// +// This returns nil with an error on any initial peer connection failure (and +// cleans up any other peers temporarily connected to). func New(ctx context.Context, config Config) (*Gun, error) { g := &Gun{ currentPeers: make([]*Peer, len(config.PeerURLs)), @@ -62,16 +104,25 @@ func New(ctx context.Context, config Config) (*Gun, error) { messageIDListeners: map[string]chan<- *messageReceived{}, messageSoulListeners: map[string]chan<- *messageReceived{}, } - // Create all the peers - sleepOnError := config.PeerSleepOnError - if sleepOnError == 0 { - sleepOnError = DefaultPeerSleepOnError + // Set config defaults + if g.storage == nil { + g.storage = NewStorageInMem(DefaultOldestAllowedStorageValue) } + if g.soulGen == nil { + g.soulGen = DefaultSoulGen + } + if g.peerSleepOnError == 0 { + g.peerSleepOnError = DefaultPeerSleepOnError + } + if g.myPeerID == "" { + g.myPeerID = randString(9) + } + // Create all the peers var err error for i := 0; i < len(config.PeerURLs) && err == nil; i++ { peerURL := config.PeerURLs[i] newConn := func() (PeerConn, error) { return NewPeerConn(ctx, peerURL) } - if g.currentPeers[i], err = newPeer(peerURL, newConn, sleepOnError); err != nil { + if g.currentPeers[i], err = newPeer(peerURL, newConn, g.peerSleepOnError); err != nil { err = fmt.Errorf("Failed connecting to peer %v: %v", peerURL, err) } } @@ -84,33 +135,27 @@ func New(ctx context.Context, config Config) (*Gun, error) { } return nil, err } - // Set defaults - if g.storage == nil { - g.storage = NewStorageInMem(DefaultOldestAllowedStorageValue) - } - if g.soulGen == nil { - g.soulGen = DefaultSoulGen - } - if g.myPeerID == "" { - g.myPeerID = randString(9) - } // Start receiving from peers for _, peer := range g.currentPeers { - go g.startReceiving(peer) + go g.startReceiving(ctx, peer) } // Start all the servers - go g.startServers(config.Servers) + go g.startServers(ctx, config.Servers) return g, nil } -func (g *Gun) Scoped(ctx context.Context, key string, children ...string) *Scoped { - s := newScoped(g, nil, key) +// Scoped returns a scoped instance to the given field and children for reading +// and writing. This is the equivalent of calling the Gun JS API "get" function +// (sans callback) for each field/child. +func (g *Gun) Scoped(ctx context.Context, field string, children ...string) *Scoped { + s := newScoped(g, nil, field) if len(children) > 0 { s = s.Scoped(ctx, children[0], children[1:]...) } return s } +// Close stops all peers and servers and closes the underlying storage. func (g *Gun) Close() error { var errs []error for _, p := range g.peers() { @@ -188,11 +233,10 @@ func (g *Gun) send(ctx context.Context, msg *Message, ignorePeer *Peer) <-chan * return ch } -func (g *Gun) startReceiving(peer *Peer) { - // TDO: some kind of overall context is probably needed - ctx, cancelFn := context.WithCancel(context.TODO()) +func (g *Gun) startReceiving(ctx context.Context, peer *Peer) { + ctx, cancelFn := context.WithCancel(ctx) defer cancelFn() - for !peer.Closed() { + for !peer.Closed() && ctx.Err() == nil { // We might not be able receive because peer is sleeping from // an error happened within or a just-before send error. if ok, msgs, err := peer.receive(ctx); !ok { diff --git a/gun/message.go b/gun/message.go index 053c4c7..73060aa 100644 --- a/gun/message.go +++ b/gun/message.go @@ -2,6 +2,7 @@ package gun import "encoding/json" +// Message is the JSON-encodable message that Gun peers send to each other. type Message struct { Ack string `json:"@,omitempty"` ID string `json:"#,omitempty"` @@ -16,6 +17,7 @@ type Message struct { Err string `json:"err,omitempty"` } +// MessageGetRequest is the format for Message.Get. type MessageGetRequest struct { Soul string `json:"#,omitempty"` Field string `json:".,omitempty"` @@ -23,7 +25,10 @@ type MessageGetRequest struct { type messageReceived struct { *Message - - peer *Peer + peer *Peer + // storedPuts are the souls and their fields that have been stored by + // another part of the code. This is useful if the main instance stores + // something it sees, there's no need for the message listener to do so as + // well. storedPuts map[string][]string } diff --git a/gun/node.go b/gun/node.go index 9d9babf..1e74d8b 100644 --- a/gun/node.go +++ b/gun/node.go @@ -7,6 +7,10 @@ import ( "strconv" ) +// DefaultSoulGen is the default soul generator. It uses the current time in +// MS as the first part of the string. However if that MS was already used in +// this process, the a guaranteed-process-unique-nano-level time is added to +// the first part. The second part is a random string. func DefaultSoulGen() string { ms, uniqueNum := timeNowUniqueUnix() s := strconv.FormatInt(ms, 36) @@ -16,11 +20,16 @@ func DefaultSoulGen() string { return s + randString(12) } +// Node is a JSON-encodable representation of a Gun node which is a set of +// scalar values by name and metadata about those values. type Node struct { + // Metadata is the metadata for this node. Metadata + // Values is the set of values (including null) keyed by the field name. Values map[string]Value } +// MarshalJSON implements encoding/json.Marshaler for Node. func (n *Node) MarshalJSON() ([]byte, error) { // Just put it all in a map and then encode it toEnc := make(map[string]interface{}, len(n.Values)+1) @@ -31,6 +40,7 @@ func (n *Node) MarshalJSON() ([]byte, error) { return json.Marshal(toEnc) } +// UnmarshalJSON implements encoding/json.Unmarshaler for Node. func (n *Node) UnmarshalJSON(b []byte) error { dec := json.NewDecoder(bytes.NewReader(b)) dec.UseNumber() @@ -60,15 +70,24 @@ func (n *Node) UnmarshalJSON(b []byte) error { } } +// Metadata is the soul of a node and the state of its values. type Metadata struct { - Soul string `json:"#,omitempty"` + // Soul is the unique identifier of this node. + Soul string `json:"#,omitempty"` + // State is the conflict state value for each node field. State map[string]State `json:">,omitempty"` } +// Value is the common interface implemented by all possible Gun values. The +// possible values of Value are nil and instances of ValueNumber, ValueString, +// ValueBool, and ValueRelation. They can all be marshaled into JSON. type Value interface { nodeValue() } +// ValueDecodeJSON decodes a single Value from JSON. For the given JSON decoder +// and last read token, this decodes a Value. The decoder needs to have ran +// UseNumber. Unrecognized values are errors. func ValueDecodeJSON(token json.Token, dec *json.Decoder) (Value, error) { switch token := token.(type) { case nil: @@ -102,22 +121,39 @@ func ValueDecodeJSON(token json.Token, dec *json.Decoder) (Value, error) { } } +// ValueString is a representation of a string Value. type ValueString string -func (ValueString) nodeValue() {} +func (ValueString) nodeValue() {} +func (v ValueString) String() string { return string(v) } +// ValueNumber is a representation of a number Value. It is typed as a string +// similar to how encoding/json.Number works since it can overflow numeric +// types. type ValueNumber string -func (ValueNumber) nodeValue() {} +func (ValueNumber) nodeValue() {} +func (v ValueNumber) String() string { return string(v) } +// Float64 returns the number as a float64. +func (v ValueNumber) Float64() (float64, error) { return strconv.ParseFloat(string(v), 64) } + +// Int64 returns the number as an int64. +func (v ValueNumber) Int64() (int64, error) { return strconv.ParseInt(string(v), 10, 64) } + +// ValueBool is a representation of a bool Value. type ValueBool bool func (ValueBool) nodeValue() {} +// ValueRelation is a representation of a relation Value. The value is the soul +// of the linked node. It has a custom JSON encoding. type ValueRelation string -func (ValueRelation) nodeValue() {} +func (ValueRelation) nodeValue() {} +func (v ValueRelation) String() string { return string(v) } -func (n ValueRelation) MarshalJSON() ([]byte, error) { - return json.Marshal(map[string]string{"#": string(n)}) +// MarshalJSON implements encoding/json.Marshaler fr ValueRelation. +func (v ValueRelation) MarshalJSON() ([]byte, error) { + return json.Marshal(map[string]string{"#": string(v)}) } diff --git a/gun/peer.go b/gun/peer.go index 32b53c7..d35f677 100644 --- a/gun/peer.go +++ b/gun/peer.go @@ -8,13 +8,19 @@ import ( "time" ) +// ErrPeer is an error specific to a peer. type ErrPeer struct { - Err error + // Err is the error. + Err error + // Peer is the peer the error relates to. Peer *Peer } func (e *ErrPeer) Error() string { return fmt.Sprintf("Error on peer %v: %v", e.Peer, e.Err) } +// Peer is a known peer to Gun. It has a single connection. Some peers are +// "reconnectable" which means due to failure, they may be in a "bad" state +// awaiting reconnection. type Peer struct { name string newConn func() (PeerConn, error) @@ -35,8 +41,15 @@ func newPeer(name string, newConn func() (PeerConn, error), sleepOnErr time.Dura return p, nil } +// ID is the identifier of the peer as given by the peer. It is empty if the +// peer wasn't asked for or didn't give an ID. func (p *Peer) ID() string { return p.id } +// Name is the name of the peer which is usually the URL. +func (p *Peer) Name() string { return p.name } + +// String is a string representation of the peer including whether it's +// connected. func (p *Peer) String() string { id := "" if p.id != "" { @@ -69,7 +82,8 @@ func (p *Peer) reconnect() (err error) { return } -// Can be nil peer if currently bad or closed +// Conn is the underlying PeerConn. This can be nil if the peer is currently +// "bad" or closed. func (p *Peer) Conn() PeerConn { p.connLock.Lock() defer p.connLock.Unlock() @@ -110,9 +124,8 @@ func (p *Peer) send(ctx context.Context, msg *Message, moreMsgs ...*Message) (ok if err = conn.Send(ctx, updatedMsg, updatedMoreMsgs...); err != nil { p.markConnErrored(conn) return false, err - } else { - return true, nil } + return true, nil } func (p *Peer) receive(ctx context.Context) (ok bool, msgs []*Message, err error) { @@ -126,6 +139,7 @@ func (p *Peer) receive(ctx context.Context) (ok bool, msgs []*Message, err error } } +// Close closes the peer and the connection is connected. func (p *Peer) Close() error { p.connLock.Lock() defer p.connLock.Unlock() @@ -138,19 +152,30 @@ func (p *Peer) Close() error { return err } +// Closed is whether the peer is closed. func (p *Peer) Closed() bool { p.connLock.Lock() defer p.connLock.Unlock() return p.connCurrent == nil && !p.connBad } +// PeerConn is a single peer connection. type PeerConn interface { + // Send sends the given message (and maybe others) to the peer. The context + // governs just this send. Send(ctx context.Context, msg *Message, moreMsgs ...*Message) error + // Receive waits for the next message (or set of messages if sent at once) + // from a peer. The context can be used to control a timeout. Receive(ctx context.Context) ([]*Message, error) + // RemoteURL is the URL this peer is connected via. RemoteURL() string + // Close closes this connection. Close() error } +// PeerURLSchemes is the map that maps URL schemes to factory functions to +// create the connection. Currently "http" and "https" simply defer to "ws" and +// "wss" respectively. "ws" and "wss" use DialPeerConnWebSocket. var PeerURLSchemes map[string]func(context.Context, *url.URL) (PeerConn, error) func init() { @@ -176,6 +201,7 @@ func init() { } } +// NewPeerConn connects to a peer for the given URL. func NewPeerConn(ctx context.Context, peerURL string) (PeerConn, error) { if parsedURL, err := url.Parse(peerURL); err != nil { return nil, err diff --git a/gun/scoped.go b/gun/scoped.go index 3720cbd..8e82cbd 100644 --- a/gun/scoped.go +++ b/gun/scoped.go @@ -7,6 +7,7 @@ import ( "sync" ) +// Scoped is a contextual, namespaced field to read or write. type Scoped struct { gun *Gun @@ -32,10 +33,21 @@ func newScoped(gun *Gun, parent *Scoped, field string) *Scoped { } } +// ErrNotObject occurs when a put or a fetch is attempted as a child of an +// existing, non-relation value. var ErrNotObject = errors.New("Scoped value not an object") + +// ErrLookupOnTopLevel occurs when a put or remote fetch is attempted on +// a top-level field. var ErrLookupOnTopLevel = errors.New("Cannot do put/lookup on top level") -// Empty string if doesn't exist, ErrNotObject if self or parent not an object +// Soul returns the current soul of this value relation. It returns a cached +// value if called before. Otherwise, it does a FetchOne to get the value +// and return its soul if a relation. If any parent is not a relation or this +// value exists and is not a relation, ErrNotObject is returned. If this field +// doesn't exist yet, an empty string is returned with no error. Otherwise, +// the soul of the relation is returned. The context can be used to timeout the +// fetch. func (s *Scoped) Soul(ctx context.Context) (string, error) { if cachedSoul := s.cachedSoul(); cachedSoul != "" { return cachedSoul, nil @@ -68,8 +80,11 @@ func (s *Scoped) setCachedSoul(val ValueRelation) bool { return true } -func (s *Scoped) Scoped(ctx context.Context, key string, children ...string) *Scoped { - ret := newScoped(s.gun, s, key) +// Scoped returns a scoped instance to the given field and children for reading +// and writing. This is the equivalent of calling the Gun JS API "get" function +// (sans callback) for each field/child. +func (s *Scoped) Scoped(ctx context.Context, field string, children ...string) *Scoped { + ret := newScoped(s.gun, s, field) for _, child := range children { ret = newScoped(s.gun, ret, child) } diff --git a/gun/scoped_fetch.go b/gun/scoped_fetch.go index 51109d3..c589e41 100644 --- a/gun/scoped_fetch.go +++ b/gun/scoped_fetch.go @@ -12,18 +12,39 @@ type fetchResultListener struct { receivedMessages chan *messageReceived } +// FetchResult is a result of a fetch. type FetchResult struct { - // This can be a context error on cancelation - Err error + // Err is any error on fetch, local or remote. This can be a context error + // if the fetch's context completes. This is nil on successful fetch. + // + // This may be ErrLookupOnTopLevel for a remote fetch of a top-level field. + // This may be ErrNotObject if the field is a child of a non-relation value. + Err error + // Field is the name of the field that was fetched. It is a convenience + // value for the scope's field this was originally called on. Field string - // Nil if the value doesn't exist, exists and is nil, or there's an error - Value Value - State State // This can be 0 for errors or top-level value relations + // Value is the fetched value. This will be nil if Err is not nil. This will + // also be nil if a peer said the value does not exist. This will also be + // nil if the value exists but is nil. Use ValueExists to distinguish + // between the last two cases. + Value Value + // State is the conflict state of the value. It may be 0 for errors. It may + // also be 0 if this is a top-level field. + State State + // ValueExists is true if there is no error and the fetched value, nil or + // not, does exist. ValueExists bool - // Nil when local and sometimes on error + // Peer is the peer this result is for. This is nil for results from local + // storage. This may be nil on error. Peer *Peer } +// FetchOne fetches a single result, trying local first. It is a shortcut for +// calling FetchOneLocal and if it doesn't exist falling back to FetchOneRemote. +// This should not be called on a top-level field. The context can be used to +// timeout the wait. +// +// This is the equivalent of the Gun JS API "once" function. func (s *Scoped) FetchOne(ctx context.Context) *FetchResult { // Try local before remote if r := s.FetchOneLocal(ctx); r.Err != nil || r.ValueExists { @@ -32,6 +53,8 @@ func (s *Scoped) FetchOne(ctx context.Context) *FetchResult { return s.FetchOneRemote(ctx) } +// FetchOneLocal gets a local value from storage. For top-level fields, it +// simply returns the field name as a relation. func (s *Scoped) FetchOneLocal(ctx context.Context) *FetchResult { // If there is no parent, this is just the relation if s.parent == nil { @@ -50,6 +73,10 @@ func (s *Scoped) FetchOneLocal(ctx context.Context) *FetchResult { return r } +// FetchOneRemote fetches a single result from the first peer that responds. It +// will not look in storage first. This will error if called for a top-level +// field. This is a shortcut for calling FetchRemote, waiting for a single +// value, then calling FetchDone. The context can be used to timeout the wait. func (s *Scoped) FetchOneRemote(ctx context.Context) *FetchResult { if s.parent == nil { return &FetchResult{Err: ErrLookupOnTopLevel, Field: s.field} @@ -59,6 +86,15 @@ func (s *Scoped) FetchOneRemote(ctx context.Context) *FetchResult { return <-ch } +// Fetch fetches and listens for updates on the field and sends them to the +// resulting channel. This will error if called for a top-level field. The +// resulting channel is closed on Gun close, context completion, or when +// FetchDone is called with it. Users should ensure one of the three happen in a +// reasonable timeframe to stop listening and prevent leaks. This is a shortcut +// for FetchOneLocal (but doesn't send to channel if doesn't exist) followed by +// FetchRemote. +// +// This is the equivalent of the Gun JS API "on" function. func (s *Scoped) Fetch(ctx context.Context) <-chan *FetchResult { ch := make(chan *FetchResult, 1) if s.parent == nil { @@ -73,11 +109,15 @@ func (s *Scoped) Fetch(ctx context.Context) <-chan *FetchResult { return ch } +// FetchRemote fetches and listens for updates on a field only from peers, not +// via storage. This will error if called for a top-level field. The resulting +// channel is closed on Gun close, context completion, or when FetchDone is +// called with it. Users should ensure one of the three happen in a reasonable +// timeframe to stop listening and prevent leaks. func (s *Scoped) FetchRemote(ctx context.Context) <-chan *FetchResult { ch := make(chan *FetchResult, 1) if s.parent == nil { ch <- &FetchResult{Err: ErrLookupOnTopLevel, Field: s.field} - close(ch) } else { go s.fetchRemote(ctx, ch) } @@ -176,6 +216,9 @@ func (s *Scoped) fetchRemote(ctx context.Context, ch chan *FetchResult) { }() } +// FetchDone is called with a channel returned from Fetch or FetchRemote to stop +// listening and close the channel. It returns true if it actually stopped +// listening or false if it wasn't listening. func (s *Scoped) FetchDone(ch <-chan *FetchResult) bool { s.fetchResultListenersLock.Lock() l := s.fetchResultListeners[ch] diff --git a/gun/scoped_put.go b/gun/scoped_put.go index 125415a..f64dbe2 100644 --- a/gun/scoped_put.go +++ b/gun/scoped_put.go @@ -11,22 +11,53 @@ type putResultListener struct { receivedMessages chan *messageReceived } +// PutResult is either an acknowledgement or an error for a put. type PutResult struct { + // Err is any error on put, local or remote. This can be a context error + // if the put's context completes. This is nil on successful put. + // + // This may be ErrLookupOnTopLevel for a remote fetch of a top-level field. + // This may be ErrNotObject if the field is a child of a non-relation value. Err error - // Nil on error or local put success + // Field is the name of the field that was put. It is a convenience value + // for the scope's field this was originally called on. + Field string + // Peer is the peer this result is for. This is nil for results from local + // storage. This may be nil on error. Peer *Peer } -type PutOption interface{} +// PutOption is the base interface for all options that can be passed to Put. +type PutOption interface { + putOption() +} type putOptionStoreLocalOnly struct{} -func PutOptionStoreLocalOnly() PutOption { return putOptionStoreLocalOnly{} } +func (putOptionStoreLocalOnly) putOption() {} + +// PutOptionStoreLocalOnly makes Put only store locally and then be done. +var PutOptionStoreLocalOnly PutOption = putOptionStoreLocalOnly{} type putOptionFailWithoutParent struct{} -func PutOptionFailWithoutParent() PutOption { return putOptionFailWithoutParent{} } +func (putOptionFailWithoutParent) putOption() {} +// PutOptionFailWithoutParent makes Put fail if it would need to lazily create +// parent relations. +var PutOptionFailWithoutParent PutOption = putOptionFailWithoutParent{} + +// Put puts a value on the field in local storage. It also sends the put to all +// peers unless the PutOptionStoreLocalOnly option is present. Each +// acknowledgement or error will be sent to the resulting channel. Unless the +// PutOptionFailWithoutParent option is present, this will lazily create all +// parent relations that do not already exist. This will error if called for a +// top-level field. The resulting channel is closed on Gun close, context +// completion, or when PutDone is called with it. Users should ensure one of the +// three happen in a reasonable timeframe to stop listening for acks and prevent +// leaks. +// +// This is the equivalent of the Gun JS API "put" function. func (s *Scoped) Put(ctx context.Context, val Value, opts ...PutOption) <-chan *PutResult { // Collect the options storeLocalOnly := false @@ -47,19 +78,16 @@ func (s *Scoped) Put(ctx context.Context, val Value, opts ...PutOption) <-chan * } if len(parents) == 0 { ch <- &PutResult{Err: ErrLookupOnTopLevel} - close(ch) return ch } // Ask for the soul on the last parent. What this will do is trigger // lazy soul fetch up the chain. Then we can go through and find who doesn't have a // cached soul, create one, and store locally. if soul, err := parents[len(parents)-1].Soul(ctx); err != nil { - ch <- &PutResult{Err: ErrLookupOnTopLevel} - close(ch) + ch <- &PutResult{Err: ErrLookupOnTopLevel, Field: s.field} return ch } else if soul == "" && failWithoutParent { - ch <- &PutResult{Err: fmt.Errorf("Parent not present but required")} - close(ch) + ch <- &PutResult{Err: fmt.Errorf("Parent not present but required"), Field: s.field} return ch } // Now for every parent that doesn't have a cached soul we create one and @@ -88,12 +116,10 @@ func (s *Scoped) Put(ctx context.Context, val Value, opts ...PutOption) <-chan * // TODO: Should I not store until the very end just in case it errors halfway // though? There are no standard cases where it should fail. if _, err := s.gun.storage.Put(ctx, prevParentSoul, parent.field, ValueRelation(parentCachedSoul), currState, false); err != nil { - ch <- &PutResult{Err: err} - close(ch) + ch <- &PutResult{Err: err, Field: s.field} return ch } else if !parent.setCachedSoul(ValueRelation(parentCachedSoul)) { - ch <- &PutResult{Err: fmt.Errorf("Concurrent cached soul set")} - close(ch) + ch <- &PutResult{Err: fmt.Errorf("Concurrent cached soul set"), Field: s.field} return ch } } @@ -101,14 +127,12 @@ func (s *Scoped) Put(ctx context.Context, val Value, opts ...PutOption) <-chan * } // Now that we've setup all the parents, we can do this store locally if _, err := s.gun.storage.Put(ctx, prevParentSoul, s.field, val, currState, false); err != nil { - ch <- &PutResult{Err: err} - close(ch) + ch <- &PutResult{Err: err, Field: s.field} return ch } // We need an ack for local store and stop if local only - ch <- &PutResult{} + ch <- &PutResult{Field: s.field} if storeLocalOnly { - close(ch) return ch } // Now, we begin the remote storing @@ -130,14 +154,14 @@ func (s *Scoped) Put(ctx context.Context, val Value, opts ...PutOption) <-chan * for { select { case <-ctx.Done(): - ch <- &PutResult{Err: ctx.Err()} + ch <- &PutResult{Err: ctx.Err(), Field: s.field} s.PutDone(ch) return case msg, ok := <-msgCh: if !ok { return } - r := &PutResult{Peer: msg.peer} + r := &PutResult{Field: s.field, Peer: msg.peer} if msg.Err != "" { r.Err = fmt.Errorf("Remote error: %v", msg.Err) } else if msg.OK != 1 { @@ -151,14 +175,18 @@ func (s *Scoped) Put(ctx context.Context, val Value, opts ...PutOption) <-chan * go func() { for peerErr := range s.gun.send(ctx, req, nil) { safePutResultSend(ch, &PutResult{ - Err: peerErr.Err, - Peer: peerErr.Peer, + Err: peerErr.Err, + Field: s.field, + Peer: peerErr.Peer, }) } }() return ch } +// PutDone is called with a channel returned from Put to stop +// listening for acks and close the channel. It returns true if it actually +// stopped listening or false if it wasn't listening. func (s *Scoped) PutDone(ch <-chan *PutResult) bool { s.putResultListenersLock.Lock() l := s.putResultListeners[ch] diff --git a/gun/server.go b/gun/server.go index 4d1cec1..80264f8 100644 --- a/gun/server.go +++ b/gun/server.go @@ -4,14 +4,19 @@ import ( "context" ) +// Server is the interface implemented by servers. type Server interface { - Serve() error // Hangs forever + // Serve is called by Gun to start the server. It should not return until + // an error occurs or Close is called. + Serve() error + // Accept is called to wait for the next peer connection or if an error + // occurs while trying. Accept() (PeerConn, error) + // Close is called by Gun to stop and close this server. Close() error } -func (g *Gun) startServers(servers []Server) { - ctx := context.Background() +func (g *Gun) startServers(ctx context.Context, servers []Server) { ctx, g.serversCancelFn = context.WithCancel(ctx) for _, server := range servers { // TODO: log error? diff --git a/gun/state.go b/gun/state.go index f793a1d..8fae131 100644 --- a/gun/state.go +++ b/gun/state.go @@ -6,27 +6,57 @@ import ( "time" ) -type State uint64 +// State represents the conflict state of this value. It is usually the Unix +// time in milliseconds. +type State float64 // TODO: what if larger? -func StateNow() State { return State(timeNowUnixMs()) } +// StateNow is the current machine state (i.e. current Unix time in ms). +func StateNow() State { + // TODO: Should I use timeNowUniqueUnix or otherwise set decimal spots to disambiguate? + return State(timeNowUnixMs()) +} +// StateFromTime converts a time to a State (i.e. converts to Unix ms). func StateFromTime(t time.Time) State { return State(timeToUnixMs(t)) } +// ConflictResolution is how to handle two values for the same field. type ConflictResolution int const ( + // ConflictResolutionNeverSeenUpdate occurs when there is no existing value. + // It means an update should always occur. ConflictResolutionNeverSeenUpdate ConflictResolution = iota + // ConflictResolutionTooFutureDeferred occurs when the update is after our + // current machine state. It means the update should be deferred. ConflictResolutionTooFutureDeferred + // ConflictResolutionOlderHistorical occurs when the update happened before + // the existing value's last update. It means it can be noted, but the + // update should be discarded. ConflictResolutionOlderHistorical + // ConflictResolutionNewerUpdate occurs when the update happened after last + // update but is not beyond ur current machine state. It means the update + // should overwrite. ConflictResolutionNewerUpdate + // ConflictResolutionSameKeep occurs when the update happened at the same + // time and it is lexically not the one chosen. It means the update should + // be discarded. ConflictResolutionSameKeep + // ConflictResolutionSameUpdate occurs when the update happened at the same + // time and it is lexically the one chosen. It means the update should + // overwrite. ConflictResolutionSameUpdate ) +// IsImmediateUpdate returns true for ConflictResolutionNeverSeenUpdate, +// ConflictResolutionNewerUpdate, and ConflictResolutionSameUpdate func (c ConflictResolution) IsImmediateUpdate() bool { return c == ConflictResolutionNeverSeenUpdate || c == ConflictResolutionNewerUpdate || c == ConflictResolutionSameUpdate } +// ConflictResolve checks the existing val/state, new val/state, and the current +// machine state to choose what to do with the update. Note, the existing val +// should always exist meaning it will never return +// ConflictResolutionNeverSeenUpdate. func ConflictResolve(existingVal Value, existingState State, newVal Value, newState State, sysState State) ConflictResolution { // Existing gunjs impl serializes to JSON first to do lexical comparisons, so we will too if sysState < newState { diff --git a/gun/storage.go b/gun/storage.go index 71c2937..189846f 100644 --- a/gun/storage.go +++ b/gun/storage.go @@ -3,21 +3,37 @@ package gun import ( "context" "errors" + "fmt" "sync" "time" ) +// ErrStorageNotFound is returned by Storage.Get and sometimes Storage.Put when +// the field doesn't exist. var ErrStorageNotFound = errors.New("Not found") +// Storage is the interface that storage adapters must implement. type Storage interface { + // Get obtains the value (which can be nil) and state from storage for the + // given field. If the field does not exist, this errors with + // ErrStorageNotFound. Get(ctx context.Context, parentSoul, field string) (Value, State, error) + // Put sets the value (which can be nil) and state in storage for the given + // field if the conflict resolution says it should (see ConflictResolve). It + // also returns the conflict resolution. If onlyIfExists is true and the + // field does not exist, this errors with ErrStorageNotFound. Otherwise, if + // the resulting resolution is an immediate update, it is done. If the + // resulting resolution is deferred for the future, it is scheduled for then + // but is not even attempted if context is completed or storage is closed. Put(ctx context.Context, parentSoul, field string, val Value, state State, onlyIfExists bool) (ConflictResolution, error) + // Close closes this storage and disallows future gets or puts. Close() error } type storageInMem struct { values map[parentSoulAndField]*valueWithState valueLock sync.RWMutex + closed bool // Do not mutate outside of valueLock purgeCancelFn context.CancelFunc } @@ -28,6 +44,9 @@ type valueWithState struct { state State } +// NewStorageInMem creates an in-memory storage that automatically purges +// values that are older than the given oldestAllowed. If oldestAllowed is 0, +// it keeps all values forever. func NewStorageInMem(oldestAllowed time.Duration) Storage { s := &storageInMem{values: map[parentSoulAndField]*valueWithState{}} // Start the purger @@ -60,7 +79,9 @@ func NewStorageInMem(oldestAllowed time.Duration) Storage { func (s *storageInMem) Get(ctx context.Context, parentSoul, field string) (Value, State, error) { s.valueLock.RLock() defer s.valueLock.RUnlock() - if vs := s.values[parentSoulAndField{parentSoul, field}]; vs == nil { + if s.closed { + return nil, 0, fmt.Errorf("Storage closed") + } else if vs := s.values[parentSoulAndField{parentSoul, field}]; vs == nil { return nil, 0, ErrStorageNotFound } else { return vs.val, vs.state, nil @@ -74,7 +95,9 @@ func (s *storageInMem) Put( defer s.valueLock.Unlock() key, newVs := parentSoulAndField{parentSoul, field}, &valueWithState{val, state} sysState := StateNow() - if existingVs := s.values[key]; existingVs == nil && onlyIfExists { + if s.closed { + return 0, fmt.Errorf("Storage closed") + } else if existingVs := s.values[key]; existingVs == nil && onlyIfExists { return 0, ErrStorageNotFound } else if existingVs == nil { confRes = ConflictResolutionNeverSeenUpdate @@ -84,9 +107,13 @@ func (s *storageInMem) Put( if confRes == ConflictResolutionTooFutureDeferred { // Schedule for 100ms past when it's deferred to time.AfterFunc(time.Duration(state-sysState)*time.Millisecond+100, func() { - // TODO: should I check whether closed? + s.valueLock.RLock() + closed := s.closed + s.valueLock.RUnlock() // TODO: what to do w/ error? - s.Put(ctx, parentSoul, field, val, state, onlyIfExists) + if !closed && ctx.Err() == nil { + s.Put(ctx, parentSoul, field, val, state, onlyIfExists) + } }) } else if confRes.IsImmediateUpdate() { s.values[key] = newVs @@ -98,5 +125,8 @@ func (s *storageInMem) Close() error { if s.purgeCancelFn != nil { s.purgeCancelFn() } + s.valueLock.Lock() + defer s.valueLock.Unlock() + s.closed = true return nil } diff --git a/gun/tests/gun_test.go b/gun/tests/gun_test.go index 0b636f3..a9b9f8b 100644 --- a/gun/tests/gun_test.go +++ b/gun/tests/gun_test.go @@ -16,7 +16,7 @@ func TestGunGetSimple(t *testing.T) { randStr := randString(30) // Write w/ JS ctx.runJSWithGun(` - gun.get('esgopeta-test').get('TestGunGetSimple').get('some-key').put('` + randStr + `', ack => { + gun.get('esgopeta-test').get('TestGunGetSimple').get('some-field').put('` + randStr + `', ack => { if (ack.err) { console.error(ack.err) process.exit(1) @@ -28,13 +28,13 @@ func TestGunGetSimple(t *testing.T) { g := ctx.newGunConnectedToGunJS() defer g.Close() // Make sure we got back the same value - r := g.Scoped(ctx, "esgopeta-test", "TestGunGetSimple", "some-key").FetchOne(ctx) + r := g.Scoped(ctx, "esgopeta-test", "TestGunGetSimple", "some-field").FetchOne(ctx) ctx.Require.NoError(r.Err) ctx.Require.Equal(gun.ValueString(randStr), r.Value.(gun.ValueString)) // Do it again with the JS server closed since it should fetch from memory serverCancelFn() - ctx.debugf("Asking for key again") - r = g.Scoped(ctx, "esgopeta-test", "TestGunGetSimple", "some-key").FetchOne(ctx) + ctx.debugf("Asking for field again") + r = g.Scoped(ctx, "esgopeta-test", "TestGunGetSimple", "some-field").FetchOne(ctx) ctx.Require.NoError(r.Err) ctx.Require.Equal(gun.ValueString(randStr), r.Value.(gun.ValueString)) } @@ -44,11 +44,11 @@ func TestGunGetSimpleRemote(t *testing.T) { ctx, cancelFn := newContext(t) defer cancelFn() remoteURL := ctx.prepareRemoteGunServer(defaultRemoteGunServerURL) - randKey, randVal := "key-"+randString(30), gun.ValueString(randString(30)) + randField, randVal := "field-"+randString(30), gun.ValueString(randString(30)) // Write w/ JS ctx.debugf("Writing value") ctx.runJSWithGunURL(remoteURL, ` - gun.get('esgopeta-test').get('TestGunGetSimpleRemote').get('`+randKey+`').put('`+string(randVal)+`', ack => { + gun.get('esgopeta-test').get('TestGunGetSimpleRemote').get('`+randField+`').put('`+string(randVal)+`', ack => { if (ack.err) { console.error(ack.err) process.exit(1) @@ -61,7 +61,7 @@ func TestGunGetSimpleRemote(t *testing.T) { g := ctx.newGunConnectedToGunServer(remoteURL) defer g.Close() // Make sure we got back the same value - r := g.Scoped(ctx, "esgopeta-test", "TestGunGetSimpleRemote", randKey).FetchOne(ctx) + r := g.Scoped(ctx, "esgopeta-test", "TestGunGetSimpleRemote", randField).FetchOne(ctx) ctx.Require.NoError(r.Err) ctx.Require.Equal(randVal, r.Value) } @@ -74,7 +74,7 @@ func TestGunPutSimple(t *testing.T) { g := ctx.newGunConnectedToGunJS() defer g.Close() // Just wait for two acks (one local, one remote) - ch := g.Scoped(ctx, "esgopeta-test", "TestGunPutSimple", "some-key").Put(ctx, gun.ValueString(randStr)) + ch := g.Scoped(ctx, "esgopeta-test", "TestGunPutSimple", "some-field").Put(ctx, gun.ValueString(randStr)) // TODO: test local is null peer and remote is non-null r := <-ch ctx.Require.NoError(r.Err) @@ -82,7 +82,7 @@ func TestGunPutSimple(t *testing.T) { ctx.Require.NoError(r.Err) // Get from JS out := ctx.runJSWithGun(` - gun.get('esgopeta-test').get('TestGunPutSimple').get('some-key').once(data => { + gun.get('esgopeta-test').get('TestGunPutSimple').get('some-field').once(data => { console.log(data) process.exit(0) }) @@ -94,17 +94,17 @@ func TestGunPubSubSimpleRemote(t *testing.T) { ctx, cancelFn := newContext(t) defer cancelFn() remoteURL := ctx.prepareRemoteGunServer(defaultRemoteGunServerURL) - randKey, randVal := "key-"+randString(30), gun.ValueString(randString(30)) + randField, randVal := "field-"+randString(30), gun.ValueString(randString(30)) // Start a fetcher ctx.debugf("Starting fetcher") fetchGun := ctx.newGunConnectedToGunServer(remoteURL) defer fetchGun.Close() - fetchCh := fetchGun.Scoped(ctx, "esgopeta-test", "TestGunPubSubSimpleRemote", randKey).Fetch(ctx) + fetchCh := fetchGun.Scoped(ctx, "esgopeta-test", "TestGunPubSubSimpleRemote", randField).Fetch(ctx) // Now put it from another instance ctx.debugf("Putting data") putGun := ctx.newGunConnectedToGunServer(remoteURL) defer putGun.Close() - putScope := putGun.Scoped(ctx, "esgopeta-test", "TestGunPubSubSimpleRemote", randKey) + putScope := putGun.Scoped(ctx, "esgopeta-test", "TestGunPubSubSimpleRemote", randField) putScope.Put(ctx, randVal) ctx.debugf("Checking fetcher") // See that the fetch got the value diff --git a/gun/tests/js_test.go b/gun/tests/js_test.go index bc327b8..278946d 100644 --- a/gun/tests/js_test.go +++ b/gun/tests/js_test.go @@ -17,7 +17,7 @@ func TestGunJS(t *testing.T) { defer cancelFn() randStr := randString(30) ctx.runJSWithGun(` - gun.get('esgopeta-test').get('TestGunJS').get('some-key').put('` + randStr + `', ack => { + gun.get('esgopeta-test').get('TestGunJS').get('some-field').put('` + randStr + `', ack => { if (ack.err) { console.error(ack.err) process.exit(1) @@ -26,7 +26,7 @@ func TestGunJS(t *testing.T) { }) `) out := ctx.runJSWithGun(` - gun.get('esgopeta-test').get('TestGunJS').get('some-key').once(data => { + gun.get('esgopeta-test').get('TestGunJS').get('some-field').once(data => { console.log(data) process.exit(0) }) diff --git a/gun/websocket.go b/gun/websocket.go index 92f4a45..f675130 100644 --- a/gun/websocket.go +++ b/gun/websocket.go @@ -19,7 +19,9 @@ type serverWebSocket struct { serveErrCh chan error } +// NewServerWebSocket creates a new Server for the given HTTP server and given upgrader. func NewServerWebSocket(server *http.Server, upgrader *websocket.Upgrader) Server { + // TODO: wait, what if they already have a server and want to control serve, close, and handler? if upgrader == nil { upgrader = &websocket.Upgrader{} } @@ -64,23 +66,27 @@ func (s *serverWebSocket) Close() error { return s.server.Close() } +// PeerConnWebSocket implements PeerConn for a websocket connection. type PeerConnWebSocket struct { Underlying *websocket.Conn WriteLock sync.Mutex } -func DialPeerConnWebSocket(ctx context.Context, peerUrl *url.URL) (*PeerConnWebSocket, error) { - conn, _, err := websocket.DefaultDialer.DialContext(ctx, peerUrl.String(), nil) +// DialPeerConnWebSocket opens a peer websocket connection. +func DialPeerConnWebSocket(ctx context.Context, peerURL *url.URL) (*PeerConnWebSocket, error) { + conn, _, err := websocket.DefaultDialer.DialContext(ctx, peerURL.String(), nil) if err != nil { return nil, err } return NewPeerConnWebSocket(conn), nil } +// NewPeerConnWebSocket wraps an existing websocket connection. func NewPeerConnWebSocket(underlying *websocket.Conn) *PeerConnWebSocket { return &PeerConnWebSocket{Underlying: underlying} } +// Send implements PeerConn.Send. func (p *PeerConnWebSocket) Send(ctx context.Context, msg *Message, moreMsgs ...*Message) error { // If there are more, send all as an array of JSON strings, otherwise just the msg var toWrite interface{} @@ -115,6 +121,7 @@ func (p *PeerConnWebSocket) Send(ctx context.Context, msg *Message, moreMsgs ... } } +// Receive implements PeerConn.Receive. func (p *PeerConnWebSocket) Receive(ctx context.Context) ([]*Message, error) { bytsCh := make(chan []byte, 1) errCh := make(chan error, 1) @@ -153,10 +160,12 @@ func (p *PeerConnWebSocket) Receive(ctx context.Context) ([]*Message, error) { } } +// RemoteURL implements PeerConn.RemoteURL. func (p *PeerConnWebSocket) RemoteURL() string { return fmt.Sprintf("http://%v", p.Underlying.RemoteAddr()) } +// Close implements PeerConn.Close. func (p *PeerConnWebSocket) Close() error { return p.Underlying.Close() }