go-gun/gun/gun.go

366 lines
11 KiB
Go
Raw Permalink Normal View History

2019-02-20 20:54:46 +00:00
package gun
import (
"context"
"fmt"
2019-02-22 06:46:19 +00:00
"sync"
2019-02-22 18:40:02 +00:00
"time"
2019-02-20 20:54:46 +00:00
)
2019-02-26 21:59:44 +00:00
// Gun is the main client/server instance for the database.
2019-02-20 20:54:46 +00:00
type Gun struct {
2019-02-26 21:59:44 +00:00
// currentPeers is the up-to-date peer list. Do not access this without
// using the associated lock. This slice is never mutated, only replaced.
// There are methods to get and change the list.
currentPeers []*Peer
currentPeersLock sync.RWMutex
2019-02-22 09:23:14 +00:00
storage Storage
soulGen func() string
peerErrorHandler func(*ErrPeer)
2019-02-22 18:40:02 +00:00
peerSleepOnError time.Duration
myPeerID string
2019-02-22 21:40:55 +00:00
tracking Tracking
2019-02-22 06:46:19 +00:00
serversCancelFn context.CancelFunc
messageIDListeners map[string]chan<- *messageReceived
2019-02-25 04:23:15 +00:00
messageIDListenersLock sync.RWMutex
messageSoulListeners map[string]chan<- *messageReceived
messageSoulListenersLock sync.RWMutex
2019-02-20 20:54:46 +00:00
}
2019-02-26 21:59:44 +00:00
// Config is the configuration for a Gun instance.
2019-02-20 20:54:46 +00:00
type Config struct {
2019-02-26 21:59:44 +00:00
// PeerURLs is the initial set of peer URLs to connect to. This can be empty
// to not connect to any peers.
PeerURLs []string
// Servers is the set of local servers to listen for new peers on. This can
// be empty to not run any server.
Servers []Server
// Storage is the backend storage to locally store data. If not present, a
// NewStorageInMem is created with a value expiration of
// DefaultOldestAllowedStorageValue.
Storage Storage
// SoulGen is a generator for new soul values. If not present,
// DefaultSoulGen is used.
SoulGen func() string
// PeerErrorHandler is called on every error from or concerning a peer. Not
// required.
2019-02-22 09:23:14 +00:00
PeerErrorHandler func(*ErrPeer)
2019-02-26 21:59:44 +00:00
// PeerSleepOnError is the amount of time we will consider a reconnectable
// peer "bad" on error before attempting reconnect. If 0 (i.e. not set),
// DefaultPeerSleepOnError is used.
2019-02-22 18:40:02 +00:00
PeerSleepOnError time.Duration
2019-02-26 21:59:44 +00:00
// MyPeerID is the identifier given for this peer to whoever asks. If empty
// it is set to a random string
MyPeerID string
// Tracking is how seen values are updated when they are seen on the wire.
// When set to TrackingNothing, no seen values will be updated. When set to
// TrackingRequested (the default), only values that are already in storage
// will be updated when seen. When set to TrackingEverything, all values
// seen on the wire will be stored.
Tracking Tracking
2019-02-20 20:54:46 +00:00
}
2019-02-26 21:59:44 +00:00
// Tracking is what to do when a value update is seen on the wire.
2019-02-22 21:40:55 +00:00
type Tracking int
const (
2019-02-26 21:59:44 +00:00
// TrackingRequested means any value we have already stored will be updated
// when seen.
2019-02-22 21:40:55 +00:00
TrackingRequested Tracking = iota
2019-02-26 21:59:44 +00:00
// TrackingNothing means no values will be updated.
2019-02-22 21:40:55 +00:00
TrackingNothing
2019-02-26 21:59:44 +00:00
// TrackingEverything means every value update is stored.
2019-02-22 21:40:55 +00:00
TrackingEverything
)
2019-02-26 21:59:44 +00:00
// DefaultPeerSleepOnError is the default amount of time that an errored
// reconnectable peer will wait until trying to reconnect.
2019-02-22 18:40:02 +00:00
const DefaultPeerSleepOnError = 30 * time.Second
2019-02-26 21:59:44 +00:00
// DefaultOldestAllowedStorageValue is the default NewStorageInMem expiration.
const DefaultOldestAllowedStorageValue = 7 * (60 * time.Minute)
2019-02-22 18:40:02 +00:00
2019-02-26 21:59:44 +00:00
// New creates a new Gun instance for the given config. The given context is
// used for all peer connection reconnects now/later, all server servings, and
// peer message handling. Therefore it should be kept available at least until
// Close. Callers must still call Close on complete, the completion of the
// context does not automatically do it.
//
// This returns nil with an error on any initial peer connection failure (and
// cleans up any other peers temporarily connected to).
2019-02-22 18:40:02 +00:00
func New(ctx context.Context, config Config) (*Gun, error) {
2019-02-20 20:54:46 +00:00
g := &Gun{
currentPeers: make([]*Peer, len(config.PeerURLs)),
storage: config.Storage,
soulGen: config.SoulGen,
peerErrorHandler: config.PeerErrorHandler,
peerSleepOnError: config.PeerSleepOnError,
myPeerID: config.MyPeerID,
tracking: config.Tracking,
messageIDListeners: map[string]chan<- *messageReceived{},
messageSoulListeners: map[string]chan<- *messageReceived{},
2019-02-20 20:54:46 +00:00
}
2019-02-26 21:59:44 +00:00
// Set config defaults
if g.storage == nil {
g.storage = NewStorageInMem(DefaultOldestAllowedStorageValue)
}
if g.soulGen == nil {
g.soulGen = DefaultSoulGen
}
if g.peerSleepOnError == 0 {
g.peerSleepOnError = DefaultPeerSleepOnError
}
if g.myPeerID == "" {
g.myPeerID = randString(9)
2019-02-20 20:54:46 +00:00
}
2019-02-26 21:59:44 +00:00
// Create all the peers
2019-02-22 18:40:02 +00:00
var err error
for i := 0; i < len(config.PeerURLs) && err == nil; i++ {
peerURL := config.PeerURLs[i]
2019-02-25 05:14:26 +00:00
newConn := func() (PeerConn, error) { return NewPeerConn(ctx, peerURL) }
2019-02-26 21:59:44 +00:00
if g.currentPeers[i], err = newPeer(peerURL, newConn, g.peerSleepOnError); err != nil {
2019-02-22 18:40:02 +00:00
err = fmt.Errorf("Failed connecting to peer %v: %v", peerURL, err)
2019-02-20 20:54:46 +00:00
}
}
2019-02-22 18:40:02 +00:00
// If there was an error, we need to close what we did create
2019-02-20 20:54:46 +00:00
if err != nil {
for _, peer := range g.currentPeers {
2019-02-22 06:46:19 +00:00
if peer != nil {
peer.Close()
}
2019-02-20 20:54:46 +00:00
}
2019-02-22 18:40:02 +00:00
return nil, err
2019-02-20 20:54:46 +00:00
}
// Start receiving from peers
for _, peer := range g.currentPeers {
2019-02-26 21:59:44 +00:00
go g.startReceiving(ctx, peer)
}
// Start all the servers
2019-02-26 21:59:44 +00:00
go g.startServers(ctx, config.Servers)
2019-02-22 18:40:02 +00:00
return g, nil
2019-02-20 20:54:46 +00:00
}
2019-02-26 21:59:44 +00:00
// Scoped returns a scoped instance to the given field and children for reading
// and writing. This is the equivalent of calling the Gun JS API "get" function
// (sans callback) for each field/child.
func (g *Gun) Scoped(ctx context.Context, field string, children ...string) *Scoped {
s := newScoped(g, nil, field)
2019-02-25 05:14:26 +00:00
if len(children) > 0 {
s = s.Scoped(ctx, children[0], children[1:]...)
}
return s
}
2019-02-26 21:59:44 +00:00
// Close stops all peers and servers and closes the underlying storage.
2019-02-22 09:23:14 +00:00
func (g *Gun) Close() error {
var errs []error
for _, p := range g.peers() {
2019-02-22 09:23:14 +00:00
if err := p.Close(); err != nil {
errs = append(errs, err)
}
}
g.serversCancelFn()
if err := g.storage.Close(); err != nil {
errs = append(errs, err)
}
2019-02-22 09:23:14 +00:00
if len(errs) == 0 {
return nil
} else if len(errs) == 1 {
return errs[0]
} else {
return fmt.Errorf("Multiple errors: %v", errs)
}
2019-02-22 06:46:19 +00:00
}
func (g *Gun) peers() []*Peer {
g.currentPeersLock.RLock()
defer g.currentPeersLock.RUnlock()
return g.currentPeers
}
func (g *Gun) addPeer(p *Peer) {
g.currentPeersLock.Lock()
defer g.currentPeersLock.Unlock()
prev := g.currentPeers
g.currentPeers = make([]*Peer, len(prev)+1)
copy(g.currentPeers, prev)
g.currentPeers[len(prev)] = p
}
func (g *Gun) removePeer(p *Peer) {
g.currentPeersLock.Lock()
defer g.currentPeersLock.Unlock()
prev := g.currentPeers
g.currentPeers = make([]*Peer, 0, len(prev))
for _, peer := range prev {
if peer != p {
g.currentPeers = append(g.currentPeers, peer)
}
}
}
2019-02-25 05:14:26 +00:00
func (g *Gun) send(ctx context.Context, msg *Message, ignorePeer *Peer) <-chan *ErrPeer {
peers := g.peers()
ch := make(chan *ErrPeer, len(peers))
2019-02-22 06:46:19 +00:00
// Everything async
go func() {
defer close(ch)
var wg sync.WaitGroup
for _, peer := range peers {
2019-02-22 09:23:14 +00:00
if peer == ignorePeer {
continue
}
2019-02-22 06:46:19 +00:00
wg.Add(1)
2019-02-25 05:14:26 +00:00
go func(peer *Peer) {
2019-02-22 06:46:19 +00:00
defer wg.Done()
2019-02-22 18:40:02 +00:00
// Just do nothing if the peer is bad and we couldn't send
if _, err := peer.send(ctx, msg); err != nil {
if !peer.reconnectSupported() {
g.removePeer(peer)
}
2019-02-22 09:23:14 +00:00
peerErr := &ErrPeer{err, peer}
go g.onPeerError(peerErr)
ch <- peerErr
2019-02-22 06:46:19 +00:00
}
}(peer)
}
wg.Wait()
}()
return ch
}
2019-02-26 21:59:44 +00:00
func (g *Gun) startReceiving(ctx context.Context, peer *Peer) {
ctx, cancelFn := context.WithCancel(ctx)
defer cancelFn()
2019-02-26 21:59:44 +00:00
for !peer.Closed() && ctx.Err() == nil {
// We might not be able receive because peer is sleeping from
// an error happened within or a just-before send error.
if ok, msgs, err := peer.receive(ctx); !ok {
if err != nil {
go g.onPeerError(&ErrPeer{err, peer})
2019-02-22 06:46:19 +00:00
}
// If can reconnect, sleep at least the err duration, otherwise remove
if peer.reconnectSupported() {
time.Sleep(g.peerSleepOnError)
} else {
g.removePeer(peer)
}
} else {
// Go over each message and see if it needs delivering or rebroadcasting
for _, msg := range msgs {
g.onPeerMessage(ctx, &messageReceived{Message: msg, peer: peer})
}
}
2019-02-22 06:46:19 +00:00
}
}
func (g *Gun) onPeerMessage(ctx context.Context, msg *messageReceived) {
2019-02-26 07:10:13 +00:00
// TODO:
// * if message-acks are not considered part of a store-all server's storage, then use msg.Ack
// to determine whether we even put here instead of how we do it now.
// * handle gets
// If we're tracking anything, we try to put it (may only be if exists).
2019-02-26 07:10:13 +00:00
if g.tracking != TrackingNothing {
// If we're tracking everything, we persist everything. Otherwise if we're
// only tracking requested, we persist only if it already exists.
putOnlyIfExists := g.tracking == TrackingRequested
for parentSoul, node := range msg.Put {
for field, value := range node.Values {
if state, ok := node.Metadata.State[field]; ok {
2019-02-26 07:10:13 +00:00
// TODO: warn on other error or something
_, err := g.storage.Put(ctx, parentSoul, field, value, state, putOnlyIfExists)
if err == nil {
if msg.storedPuts == nil {
msg.storedPuts = map[string][]string{}
}
msg.storedPuts[parentSoul] = append(msg.storedPuts[parentSoul], field)
}
}
}
}
}
2019-02-26 07:10:13 +00:00
// If there is a listener for this message ID, use it and consider the message handled
2019-02-25 04:23:15 +00:00
if msg.Ack != "" {
g.messageIDListenersLock.RLock()
l := g.messageIDListeners[msg.Ack]
g.messageIDListenersLock.RUnlock()
if l != nil {
go safeReceivedMessageSend(l, msg)
return
}
}
2019-02-26 07:10:13 +00:00
// If there are listeners for any of the souls, use them but don't consider the message handled
for parentSoul := range msg.Put {
g.messageSoulListenersLock.RLock()
l := g.messageSoulListeners[parentSoul]
g.messageSoulListenersLock.RUnlock()
if l != nil {
go safeReceivedMessageSend(l, msg)
}
}
// DAM messages are either requests for our ID or setting of theirs
if msg.DAM != "" {
if msg.PID == "" {
// This is a request, set the PID and send it back
msg.PID = g.myPeerID
if _, err := msg.peer.send(ctx, msg.Message); err != nil {
go g.onPeerError(&ErrPeer{err, msg.peer})
if !msg.peer.reconnectSupported() {
g.removePeer(msg.peer)
}
}
} else {
// This is them telling us theirs
msg.peer.id = msg.PID
}
return
}
2019-02-26 07:10:13 +00:00
2019-02-22 09:23:14 +00:00
// Unhandled message means rebroadcast
g.send(ctx, msg.Message, msg.peer)
2019-02-22 06:46:19 +00:00
}
2019-02-22 09:23:14 +00:00
func (g *Gun) onPeerError(err *ErrPeer) {
if g.peerErrorHandler != nil {
g.peerErrorHandler(err)
}
2019-02-22 06:46:19 +00:00
}
func (g *Gun) registerMessageIDListener(id string, ch chan<- *messageReceived) {
2019-02-25 04:23:15 +00:00
g.messageIDListenersLock.Lock()
defer g.messageIDListenersLock.Unlock()
g.messageIDListeners[id] = ch
2019-02-22 06:46:19 +00:00
}
2019-02-25 05:14:26 +00:00
func (g *Gun) unregisterMessageIDListener(id string) {
2019-02-25 04:23:15 +00:00
g.messageIDListenersLock.Lock()
defer g.messageIDListenersLock.Unlock()
delete(g.messageIDListeners, id)
2019-02-22 06:46:19 +00:00
}
func (g *Gun) registerMessageSoulListener(soul string, ch chan<- *messageReceived) {
g.messageSoulListenersLock.Lock()
defer g.messageSoulListenersLock.Unlock()
g.messageSoulListeners[soul] = ch
}
func (g *Gun) unregisterMessageSoulListener(soul string) {
g.messageSoulListenersLock.Lock()
defer g.messageSoulListenersLock.Unlock()
delete(g.messageSoulListeners, soul)
}
func safeReceivedMessageSend(ch chan<- *messageReceived, msg *messageReceived) {
2019-02-22 06:46:19 +00:00
// Due to the fact that we may send on a closed channel here, we ignore the panic
defer func() { recover() }()
ch <- msg
2019-02-20 20:54:46 +00:00
}