go-gun/gun/peer.go

214 lines
5.9 KiB
Go
Raw Permalink Normal View History

2019-02-20 20:54:46 +00:00
package gun
import (
"context"
2019-02-22 06:46:19 +00:00
"fmt"
2019-02-20 20:54:46 +00:00
"net/url"
"sync"
2019-02-25 05:14:26 +00:00
"time"
2019-02-20 20:54:46 +00:00
)
2019-02-26 21:59:44 +00:00
// ErrPeer is an error specific to a peer.
2019-02-22 09:23:14 +00:00
type ErrPeer struct {
2019-02-26 21:59:44 +00:00
// Err is the error.
Err error
// Peer is the peer the error relates to.
2019-02-25 05:14:26 +00:00
Peer *Peer
2019-02-22 09:23:14 +00:00
}
2019-02-25 05:14:26 +00:00
// Error implements the error interface. The message contains the peer
// (formatted with %v) followed by the underlying error.
func (e *ErrPeer) Error() string {
	return fmt.Sprintf("Error on peer %v: %v", e.Peer, e.Err)
}

// Unwrap returns the underlying error so that errors.Is and errors.As can see
// through an ErrPeer to its cause.
func (e *ErrPeer) Unwrap() error { return e.Err }
2019-02-22 09:23:14 +00:00
2019-02-26 21:59:44 +00:00
// Peer is a known peer to Gun. It has a single connection. Some peers are
// "reconnectable" which means due to failure, they may be in a "bad" state
// awaiting reconnection.
2019-02-25 05:14:26 +00:00
type Peer struct {
name string
2019-02-25 05:14:26 +00:00
newConn func() (PeerConn, error)
sleepOnErr time.Duration // TODO: would be better as backoff
id string
connCurrent PeerConn
connBad bool // If true, don't try anything
connLock sync.Mutex
}
func newPeer(name string, newConn func() (PeerConn, error), sleepOnErr time.Duration) (*Peer, error) {
p := &Peer{name: name, newConn: newConn, sleepOnErr: sleepOnErr}
2019-02-25 05:14:26 +00:00
var err error
if p.connCurrent, err = newConn(); err != nil {
return nil, err
}
return p, nil
}
2019-02-26 21:59:44 +00:00
// ID is the identifier of the peer as given by the peer. It is empty if the
// peer wasn't asked for or didn't give an ID.
2019-02-25 05:14:26 +00:00
func (p *Peer) ID() string { return p.id }
2019-02-26 21:59:44 +00:00
// Name returns the peer's name, which is usually its URL.
func (p *Peer) Name() string {
	return p.name
}
// String is a string representation of the peer including whether it's
// connected.
2019-02-25 05:14:26 +00:00
func (p *Peer) String() string {
id := ""
if p.id != "" {
id = "(id: " + p.id + ")"
}
connStatus := "disconnected"
if conn := p.Conn(); conn != nil {
connStatus = "connected to " + conn.RemoteURL()
2019-02-25 05:14:26 +00:00
}
return fmt.Sprintf("Peer%v %v (%v)", id, p.name, connStatus)
}
func (p *Peer) reconnectSupported() bool {
return p.sleepOnErr > 0
2019-02-25 05:14:26 +00:00
}
func (p *Peer) reconnect() (err error) {
if !p.reconnectSupported() {
return fmt.Errorf("Reconnect not supported")
}
2019-02-25 05:14:26 +00:00
p.connLock.Lock()
defer p.connLock.Unlock()
if p.connCurrent == nil && p.connBad {
p.connBad = false
if p.connCurrent, err = p.newConn(); err != nil {
p.connBad = true
time.AfterFunc(p.sleepOnErr, func() { p.reconnect() })
}
}
return
}
2019-02-26 21:59:44 +00:00
// Conn is the underlying PeerConn. This can be nil if the peer is currently
// "bad" or closed.
2019-02-25 05:14:26 +00:00
func (p *Peer) Conn() PeerConn {
p.connLock.Lock()
defer p.connLock.Unlock()
return p.connCurrent
}
func (p *Peer) markConnErrored(conn PeerConn) {
if !p.reconnectSupported() {
p.Close()
return
}
2019-02-25 05:14:26 +00:00
p.connLock.Lock()
defer p.connLock.Unlock()
if conn == p.connCurrent {
p.connCurrent = nil
p.connBad = true
conn.Close()
time.AfterFunc(p.sleepOnErr, func() { p.reconnect() })
}
}
func (p *Peer) send(ctx context.Context, msg *Message, moreMsgs ...*Message) (ok bool, err error) {
conn := p.Conn()
if conn == nil {
return false, nil
}
// Clone them with peer "to"
updatedMsg := &Message{}
*updatedMsg = *msg
updatedMsg.To = conn.RemoteURL()
2019-02-25 05:14:26 +00:00
updatedMoreMsgs := make([]*Message, len(moreMsgs))
for i, moreMsg := range moreMsgs {
updatedMoreMsg := &Message{}
*updatedMoreMsg = *moreMsg
updatedMoreMsg.To = conn.RemoteURL()
2019-02-25 05:14:26 +00:00
updatedMoreMsgs[i] = updatedMoreMsg
}
if err = conn.Send(ctx, updatedMsg, updatedMoreMsgs...); err != nil {
p.markConnErrored(conn)
return false, err
}
2019-02-26 21:59:44 +00:00
return true, nil
2019-02-25 05:14:26 +00:00
}
// receive waits for the next batch of messages on the peer's current
// connection. ok is false with a nil error when the peer currently has no
// connection. If the connection's Receive fails, the connection is marked as
// errored and the error is returned without any messages.
func (p *Peer) receive(ctx context.Context) (ok bool, msgs []*Message, err error) {
	conn := p.Conn()
	if conn == nil {
		return false, nil, nil
	}
	received, recvErr := conn.Receive(ctx)
	if recvErr != nil {
		p.markConnErrored(conn)
		return false, nil, recvErr
	}
	return true, received, nil
}
2019-02-26 21:59:44 +00:00
// Close closes the peer and the connection is connected.
2019-02-25 05:14:26 +00:00
func (p *Peer) Close() error {
p.connLock.Lock()
defer p.connLock.Unlock()
var err error
if p.connCurrent != nil {
err = p.connCurrent.Close()
p.connCurrent = nil
}
p.connBad = false
return err
}
2019-02-26 21:59:44 +00:00
// Closed is whether the peer is closed.
2019-02-25 05:14:26 +00:00
func (p *Peer) Closed() bool {
p.connLock.Lock()
defer p.connLock.Unlock()
return p.connCurrent == nil && !p.connBad
}
2019-02-26 21:59:44 +00:00
// PeerConn is a single peer connection.
2019-02-25 05:14:26 +00:00
type PeerConn interface {
2019-02-26 21:59:44 +00:00
// Send sends the given message (and maybe others) to the peer. The context
// governs just this send.
2019-02-22 18:40:02 +00:00
Send(ctx context.Context, msg *Message, moreMsgs ...*Message) error
2019-02-26 21:59:44 +00:00
// Receive waits for the next message (or set of messages if sent at once)
// from a peer. The context can be used to control a timeout.
2019-02-22 18:40:02 +00:00
Receive(ctx context.Context) ([]*Message, error)
2019-02-26 21:59:44 +00:00
// RemoteURL is the URL this peer is connected via.
RemoteURL() string
2019-02-26 21:59:44 +00:00
// Close closes this connection.
2019-02-20 20:54:46 +00:00
Close() error
}
2019-02-26 21:59:44 +00:00
// PeerURLSchemes is the map that maps URL schemes to factory functions to
// create the connection. Currently "http" and "https" simply defer to "ws" and
// "wss" respectively. "ws" and "wss" use DialPeerConnWebSocket.
2019-02-25 05:14:26 +00:00
var PeerURLSchemes map[string]func(context.Context, *url.URL) (PeerConn, error)
2019-02-25 04:23:15 +00:00
func init() {
2019-02-25 05:14:26 +00:00
PeerURLSchemes = map[string]func(context.Context, *url.URL) (PeerConn, error){
"http": func(ctx context.Context, peerURL *url.URL) (PeerConn, error) {
2019-02-25 04:23:15 +00:00
schemeChangedURL := &url.URL{}
*schemeChangedURL = *peerURL
schemeChangedURL.Scheme = "ws"
return DialPeerConnWebSocket(ctx, schemeChangedURL)
2019-02-25 04:23:15 +00:00
},
"https": func(ctx context.Context, peerURL *url.URL) (PeerConn, error) {
schemeChangedURL := &url.URL{}
*schemeChangedURL = *peerURL
schemeChangedURL.Scheme = "wss"
return DialPeerConnWebSocket(ctx, schemeChangedURL)
},
2019-02-25 05:14:26 +00:00
"ws": func(ctx context.Context, peerURL *url.URL) (PeerConn, error) {
return DialPeerConnWebSocket(ctx, peerURL)
2019-02-25 04:23:15 +00:00
},
"wss": func(ctx context.Context, peerURL *url.URL) (PeerConn, error) {
return DialPeerConnWebSocket(ctx, peerURL)
},
2019-02-25 04:23:15 +00:00
}
2019-02-20 20:54:46 +00:00
}
2019-02-26 21:59:44 +00:00
// NewPeerConn connects to a peer for the given URL.
2019-02-25 05:14:26 +00:00
func NewPeerConn(ctx context.Context, peerURL string) (PeerConn, error) {
2019-02-22 06:46:19 +00:00
if parsedURL, err := url.Parse(peerURL); err != nil {
return nil, err
} else if peerNew := PeerURLSchemes[parsedURL.Scheme]; peerNew == nil {
return nil, fmt.Errorf("Unknown peer URL scheme %v", parsedURL.Scheme)
} else {
return peerNew(ctx, parsedURL)
}
}