go-gun/gun/storage.go

133 lines
4.1 KiB
Go
Raw Permalink Normal View History

2019-02-20 20:54:46 +00:00
package gun
2019-02-22 06:46:19 +00:00
import (
"context"
"errors"
2019-02-26 21:59:44 +00:00
"fmt"
2019-02-22 06:46:19 +00:00
"sync"
"time"
2019-02-22 06:46:19 +00:00
)
2019-02-26 21:59:44 +00:00
// ErrStorageNotFound is returned by Storage.Get and sometimes Storage.Put when
// the field doesn't exist.
2019-02-22 06:46:19 +00:00
var ErrStorageNotFound = errors.New("Not found")
2019-02-26 21:59:44 +00:00
// Storage is the interface that storage adapters must implement.
2019-02-20 20:54:46 +00:00
type Storage interface {
2019-02-26 21:59:44 +00:00
// Get obtains the value (which can be nil) and state from storage for the
// given field. If the field does not exist, this errors with
// ErrStorageNotFound.
2019-02-25 05:14:26 +00:00
Get(ctx context.Context, parentSoul, field string) (Value, State, error)
2019-02-26 21:59:44 +00:00
// Put sets the value (which can be nil) and state in storage for the given
// field if the conflict resolution says it should (see ConflictResolve). It
// also returns the conflict resolution. If onlyIfExists is true and the
// field does not exist, this errors with ErrStorageNotFound. Otherwise, if
// the resulting resolution is an immediate update, it is done. If the
// resulting resolution is deferred for the future, it is scheduled for then
// but is not even attempted if context is completed or storage is closed.
Put(ctx context.Context, parentSoul, field string, val Value, state State, onlyIfExists bool) (ConflictResolution, error)
2019-02-26 21:59:44 +00:00
// Close closes this storage and disallows future gets or puts.
Close() error
2019-02-20 20:54:46 +00:00
}
type storageInMem struct {
values map[parentSoulAndField]*valueWithState
valueLock sync.RWMutex
2019-02-26 21:59:44 +00:00
closed bool // Do not mutate outside of valueLock
purgeCancelFn context.CancelFunc
2019-02-22 06:46:19 +00:00
}
type parentSoulAndField struct{ parentSoul, field string }
2019-02-25 05:14:26 +00:00
type valueWithState struct {
val Value
state State
}
2019-02-26 21:59:44 +00:00
// NewStorageInMem creates an in-memory storage that automatically purges
// values that are older than the given oldestAllowed. If oldestAllowed is 0,
// it keeps all values forever.
func NewStorageInMem(oldestAllowed time.Duration) Storage {
s := &storageInMem{values: map[parentSoulAndField]*valueWithState{}}
// Start the purger
if oldestAllowed > 0 {
var ctx context.Context
ctx, s.purgeCancelFn = context.WithCancel(context.Background())
go func() {
tick := time.NewTicker(5 * time.Second)
defer tick.Stop()
for {
select {
case <-ctx.Done():
return
case t := <-tick.C:
oldestStateAllowed := StateFromTime(t.Add(-oldestAllowed))
s.valueLock.Lock()
s.valueLock.Unlock()
for k, v := range s.values {
if v.state < oldestStateAllowed {
delete(s.values, k)
}
}
}
}
}()
}
return s
}
func (s *storageInMem) Get(ctx context.Context, parentSoul, field string) (Value, State, error) {
s.valueLock.RLock()
defer s.valueLock.RUnlock()
2019-02-26 21:59:44 +00:00
if s.closed {
return nil, 0, fmt.Errorf("Storage closed")
} else if vs := s.values[parentSoulAndField{parentSoul, field}]; vs == nil {
2019-02-25 05:14:26 +00:00
return nil, 0, ErrStorageNotFound
} else {
return vs.val, vs.state, nil
}
2019-02-22 06:46:19 +00:00
}
func (s *storageInMem) Put(
ctx context.Context, parentSoul, field string, val Value, state State, onlyIfExists bool,
) (confRes ConflictResolution, err error) {
s.valueLock.Lock()
defer s.valueLock.Unlock()
key, newVs := parentSoulAndField{parentSoul, field}, &valueWithState{val, state}
sysState := StateNow()
2019-02-26 21:59:44 +00:00
if s.closed {
return 0, fmt.Errorf("Storage closed")
} else if existingVs := s.values[key]; existingVs == nil && onlyIfExists {
return 0, ErrStorageNotFound
} else if existingVs == nil {
confRes = ConflictResolutionNeverSeenUpdate
} else {
confRes = ConflictResolve(existingVs.val, existingVs.state, val, state, sysState)
}
if confRes == ConflictResolutionTooFutureDeferred {
// Schedule for 100ms past when it's deferred to
time.AfterFunc(time.Duration(state-sysState)*time.Millisecond+100, func() {
2019-02-26 21:59:44 +00:00
s.valueLock.RLock()
closed := s.closed
s.valueLock.RUnlock()
// TODO: what to do w/ error?
2019-02-26 21:59:44 +00:00
if !closed && ctx.Err() == nil {
s.Put(ctx, parentSoul, field, val, state, onlyIfExists)
}
})
} else if confRes.IsImmediateUpdate() {
s.values[key] = newVs
}
return
2019-02-20 20:54:46 +00:00
}
2019-02-22 21:40:55 +00:00
func (s *storageInMem) Close() error {
if s.purgeCancelFn != nil {
s.purgeCancelFn()
}
2019-02-26 21:59:44 +00:00
s.valueLock.Lock()
defer s.valueLock.Unlock()
s.closed = true
return nil
2019-02-22 21:40:55 +00:00
}