Refactor cache package to be more general purpose component
This commit is contained in:
parent
0bc8371f84
commit
7e3fca5de5
11
cache/cache.go
vendored
Normal file
11
cache/cache.go
vendored
Normal file
@ -0,0 +1,11 @@
|
||||
package cache
|
||||
|
||||
import "errors"
|
||||
|
||||
var ErrNilValue = errors.New("value is empty")
|
||||
|
||||
type Cache interface {
|
||||
Store(key string, value interface{}) error
|
||||
Get(key string, value interface{}) error
|
||||
Delete(key string)
|
||||
}
|
9
cache/event_cache_interface.go
vendored
9
cache/event_cache_interface.go
vendored
@ -1,9 +0,0 @@
|
||||
package cache
|
||||
|
||||
import "github.com/Secured-Finance/dione/contracts/dioneOracle"
|
||||
|
||||
type EventCache interface {
|
||||
Store(key string, event interface{}) error
|
||||
GetOracleRequestEvent(key string) (*dioneOracle.DioneOracleNewOracleRequest, error)
|
||||
Delete(key string)
|
||||
}
|
50
cache/event_log_cache.go
vendored
50
cache/event_log_cache.go
vendored
@ -1,50 +0,0 @@
|
||||
package cache
|
||||
|
||||
import (
|
||||
"github.com/Secured-Finance/dione/contracts/dioneOracle"
|
||||
"github.com/VictoriaMetrics/fastcache"
|
||||
"github.com/fxamacker/cbor/v2"
|
||||
)
|
||||
|
||||
const (
|
||||
// in megabytes
|
||||
DefaultEventLogCacheCapacity = 32000000
|
||||
)
|
||||
|
||||
type EventLogCache struct {
|
||||
cache *fastcache.Cache
|
||||
}
|
||||
|
||||
func NewEventLogCache() *EventLogCache {
|
||||
return &EventLogCache{
|
||||
cache: fastcache.New(DefaultEventLogCacheCapacity),
|
||||
}
|
||||
}
|
||||
|
||||
func (elc *EventLogCache) Store(key string, event interface{}) error {
|
||||
mRes, err := cbor.Marshal(event)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
elc.cache.SetBig([]byte(key), mRes)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (elc *EventLogCache) GetOracleRequestEvent(key string) (*dioneOracle.DioneOracleNewOracleRequest, error) {
|
||||
var mData []byte
|
||||
mData = elc.cache.GetBig(mData, []byte(key))
|
||||
|
||||
var event *dioneOracle.DioneOracleNewOracleRequest
|
||||
err := cbor.Unmarshal(mData, &event)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return event, nil
|
||||
}
|
||||
|
||||
func (elc *EventLogCache) Delete(key string) {
|
||||
elc.cache.Del([]byte(key))
|
||||
}
|
58
cache/event_redis_cache.go
vendored
58
cache/event_redis_cache.go
vendored
@ -1,58 +0,0 @@
|
||||
package cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/Secured-Finance/dione/config"
|
||||
"github.com/Secured-Finance/dione/contracts/dioneOracle"
|
||||
"github.com/fxamacker/cbor/v2"
|
||||
"github.com/go-redis/redis/v8"
|
||||
)
|
||||
|
||||
type EventRedisCache struct {
|
||||
Client *redis.Client
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
func NewEventRedisCache(config *config.Config) *EventRedisCache {
|
||||
client := redis.NewClient(&redis.Options{
|
||||
Addr: config.Redis.Addr,
|
||||
Password: config.Redis.Password,
|
||||
DB: config.Redis.DB,
|
||||
})
|
||||
|
||||
return &EventRedisCache{
|
||||
Client: client,
|
||||
ctx: context.Background(),
|
||||
}
|
||||
}
|
||||
|
||||
func (erc *EventRedisCache) Store(key string, event interface{}) error {
|
||||
mRes, err := cbor.Marshal(event)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
erc.Client.Set(erc.ctx, key, mRes, 0)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (erc *EventRedisCache) GetOracleRequestEvent(key string) (*dioneOracle.DioneOracleNewOracleRequest, error) {
|
||||
mData, err := erc.Client.Get(erc.ctx, key).Bytes()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var event *dioneOracle.DioneOracleNewOracleRequest
|
||||
err = cbor.Unmarshal(mData, &event)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return event, nil
|
||||
}
|
||||
|
||||
func (erc *EventRedisCache) Delete(key string) {
|
||||
erc.Client.Del(erc.ctx, key)
|
||||
}
|
50
cache/inmemory_cache.go
vendored
Normal file
50
cache/inmemory_cache.go
vendored
Normal file
@ -0,0 +1,50 @@
|
||||
package cache
|
||||
|
||||
import (
|
||||
"github.com/VictoriaMetrics/fastcache"
|
||||
"github.com/fxamacker/cbor/v2"
|
||||
)
|
||||
|
||||
const (
|
||||
// DefaultInMemoryCacheCapacity is maximal in-memory cache size in bytes
|
||||
DefaultInMemoryCacheCapacity = 32000000
|
||||
)
|
||||
|
||||
type InMemoryCache struct {
|
||||
cache *fastcache.Cache
|
||||
}
|
||||
|
||||
func NewInMemoryCache() *InMemoryCache {
|
||||
return &InMemoryCache{
|
||||
cache: fastcache.New(DefaultInMemoryCacheCapacity),
|
||||
}
|
||||
}
|
||||
|
||||
func (imc *InMemoryCache) Store(key string, value interface{}) error {
|
||||
mRes, err := cbor.Marshal(value)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
imc.cache.SetBig([]byte(key), mRes)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (imc *InMemoryCache) Get(key string, v interface{}) error {
|
||||
data := make([]byte, 0)
|
||||
imc.cache.GetBig(data, []byte(key))
|
||||
if len(data) == 0 {
|
||||
return ErrNilValue
|
||||
}
|
||||
err := cbor.Unmarshal(data, v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (imc *InMemoryCache) Delete(key string) {
|
||||
imc.cache.Del([]byte(key))
|
||||
}
|
58
cache/redis_cache.go
vendored
Normal file
58
cache/redis_cache.go
vendored
Normal file
@ -0,0 +1,58 @@
|
||||
package cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/Secured-Finance/dione/config"
|
||||
"github.com/fxamacker/cbor/v2"
|
||||
"github.com/go-redis/redis/v8"
|
||||
)
|
||||
|
||||
type RedisCache struct {
|
||||
Client *redis.Client
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
func NewRedisCache(config *config.Config) *RedisCache {
|
||||
client := redis.NewClient(&redis.Options{
|
||||
Addr: config.Redis.Addr,
|
||||
Password: config.Redis.Password,
|
||||
DB: config.Redis.DB,
|
||||
})
|
||||
|
||||
return &RedisCache{
|
||||
Client: client,
|
||||
ctx: context.Background(),
|
||||
}
|
||||
}
|
||||
|
||||
func (rc *RedisCache) Store(key string, value interface{}) error {
|
||||
mRes, err := cbor.Marshal(value)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rc.Client.Set(rc.ctx, key, mRes, 0)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rc *RedisCache) Get(key string, value interface{}) error {
|
||||
data, err := rc.Client.Get(rc.ctx, key).Bytes()
|
||||
if err != nil {
|
||||
if err == redis.Nil {
|
||||
return ErrNilValue
|
||||
}
|
||||
return err
|
||||
}
|
||||
err = cbor.Unmarshal(data, &value)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rc *RedisCache) Delete(key string) {
|
||||
rc.Client.Del(rc.ctx, key)
|
||||
}
|
@ -24,7 +24,7 @@ type PBFTConsensusManager struct {
|
||||
consensusMap map[string]*Consensus
|
||||
ethereumClient *ethclient.EthereumClient
|
||||
miner *Miner
|
||||
eventCache cache.EventCache
|
||||
cache cache.Cache
|
||||
}
|
||||
|
||||
type Consensus struct {
|
||||
@ -34,7 +34,7 @@ type Consensus struct {
|
||||
Task *types2.DioneTask
|
||||
}
|
||||
|
||||
func NewPBFTConsensusManager(psb *pubsub.PubSubRouter, minApprovals int, privKey []byte, ethereumClient *ethclient.EthereumClient, miner *Miner, evc cache.EventCache) *PBFTConsensusManager {
|
||||
func NewPBFTConsensusManager(psb *pubsub.PubSubRouter, minApprovals int, privKey []byte, ethereumClient *ethclient.EthereumClient, miner *Miner, evc cache.Cache) *PBFTConsensusManager {
|
||||
pcm := &PBFTConsensusManager{}
|
||||
pcm.psb = psb
|
||||
pcm.miner = miner
|
||||
@ -43,7 +43,7 @@ func NewPBFTConsensusManager(psb *pubsub.PubSubRouter, minApprovals int, privKey
|
||||
pcm.minApprovals = minApprovals
|
||||
pcm.privKey = privKey
|
||||
pcm.ethereumClient = ethereumClient
|
||||
pcm.eventCache = evc
|
||||
pcm.cache = evc
|
||||
pcm.consensusMap = map[string]*Consensus{}
|
||||
pcm.psb.Hook(types.MessageTypePrePrepare, pcm.handlePrePrepare)
|
||||
pcm.psb.Hook(types.MessageTypePrepare, pcm.handlePrepare)
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"github.com/Secured-Finance/dione/cache"
|
||||
types2 "github.com/Secured-Finance/dione/consensus/types"
|
||||
"github.com/Secured-Finance/dione/consensus/validation"
|
||||
"github.com/Secured-Finance/dione/contracts/dioneOracle"
|
||||
"github.com/Secured-Finance/dione/types"
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/filecoin-project/go-state-types/crypto"
|
||||
@ -12,14 +13,14 @@ import (
|
||||
|
||||
type ConsensusValidator struct {
|
||||
validationFuncMap map[types2.MessageType]func(msg types2.Message) bool
|
||||
eventCache cache.EventCache
|
||||
cache cache.Cache
|
||||
miner *Miner
|
||||
}
|
||||
|
||||
func NewConsensusValidator(ec cache.EventCache, miner *Miner) *ConsensusValidator {
|
||||
func NewConsensusValidator(ec cache.Cache, miner *Miner) *ConsensusValidator {
|
||||
cv := &ConsensusValidator{
|
||||
eventCache: ec,
|
||||
miner: miner,
|
||||
cache: ec,
|
||||
miner: miner,
|
||||
}
|
||||
|
||||
cv.validationFuncMap = map[types2.MessageType]func(msg types2.Message) bool{
|
||||
@ -35,17 +36,19 @@ func NewConsensusValidator(ec cache.EventCache, miner *Miner) *ConsensusValidato
|
||||
}
|
||||
/////////////////////////////////
|
||||
|
||||
// === verify if request exists in event log cache ===
|
||||
requestEvent, err := cv.eventCache.GetOracleRequestEvent("request_" + consensusMsg.Task.RequestID)
|
||||
// === verify if request exists in cache ===
|
||||
var requestEvent *dioneOracle.DioneOracleNewOracleRequest
|
||||
err = cv.cache.Get("request_"+consensusMsg.Task.RequestID, &requestEvent)
|
||||
if err != nil {
|
||||
logrus.Errorf("the incoming request task event doesn't exist in the EVC, or is broken: %v", err)
|
||||
logrus.Errorf("the request doesn't exist in the cache or has been failed to decode: %v", err)
|
||||
return false
|
||||
}
|
||||
|
||||
if requestEvent.OriginChain != consensusMsg.Task.OriginChain ||
|
||||
requestEvent.RequestType != consensusMsg.Task.RequestType ||
|
||||
requestEvent.RequestParams != consensusMsg.Task.RequestParams {
|
||||
|
||||
logrus.Errorf("the incoming task and cached request event don't match!")
|
||||
logrus.Errorf("the incoming task and cached request requestEvent don't match!")
|
||||
return false
|
||||
}
|
||||
/////////////////////////////////
|
||||
|
2
go.mod
2
go.mod
@ -55,7 +55,7 @@ require (
|
||||
github.com/stretchr/testify v1.7.0
|
||||
github.com/tyler-smith/go-bip39 v1.1.0 // indirect
|
||||
github.com/valyala/fasthttp v1.17.0
|
||||
github.com/wealdtech/go-merkletree v1.0.1-0.20190605192610-2bb163c2ea2a // indirect
|
||||
github.com/wealdtech/go-merkletree v1.0.1-0.20190605192610-2bb163c2ea2a
|
||||
github.com/whyrusleeping/cbor-gen v0.0.0-20210219115102-f37d292932f2
|
||||
github.com/xorcare/golden v0.6.1-0.20191112154924-b87f686d7542
|
||||
go.uber.org/zap v1.16.0
|
||||
|
4
go.sum
4
go.sum
@ -1013,8 +1013,6 @@ github.com/libp2p/go-libp2p-core v0.6.0/go.mod h1:txwbVEhHEXikXn9gfC7/UDDw7rkxuX
|
||||
github.com/libp2p/go-libp2p-core v0.6.1/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8=
|
||||
github.com/libp2p/go-libp2p-core v0.7.0 h1:4a0TMjrWNTZlNvcqxZmrMRDi/NQWrhwO2pkTuLSQ/IQ=
|
||||
github.com/libp2p/go-libp2p-core v0.7.0/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8=
|
||||
github.com/libp2p/go-libp2p-core v0.8.5 h1:aEgbIcPGsKy6zYcC+5AJivYFedhYa4sW7mIpWpUaLKw=
|
||||
github.com/libp2p/go-libp2p-core v0.8.5/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8=
|
||||
github.com/libp2p/go-libp2p-crypto v0.0.1/go.mod h1:yJkNyDmO341d5wwXxDUGO0LykUVT72ImHNUqh5D/dBE=
|
||||
github.com/libp2p/go-libp2p-crypto v0.0.2/go.mod h1:eETI5OUfBnvARGOHrJz2eWNyTUxEGZnBxMcbUjfIj4I=
|
||||
github.com/libp2p/go-libp2p-crypto v0.1.0 h1:k9MFy+o2zGDNGsaoZl0MA3iZ75qXxr9OOoAZF+sD5OQ=
|
||||
@ -1709,8 +1707,6 @@ github.com/warpfork/go-wish v0.0.0-20180510122957-5ad1f5abf436/go.mod h1:x6AKhvS
|
||||
github.com/warpfork/go-wish v0.0.0-20190328234359-8b3e70f8e830/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw=
|
||||
github.com/warpfork/go-wish v0.0.0-20200122115046-b9ea61034e4a h1:G++j5e0OC488te356JvdhaM8YS6nMsjLAYF7JxCv07w=
|
||||
github.com/warpfork/go-wish v0.0.0-20200122115046-b9ea61034e4a/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw=
|
||||
github.com/wealdtech/go-merkletree v1.0.0 h1:DsF1xMzj5rK3pSQM6mPv8jlyJyHXhFxpnA2bwEjMMBY=
|
||||
github.com/wealdtech/go-merkletree v1.0.0/go.mod h1:cdil512d/8ZC7Kx3bfrDvGMQXB25NTKbsm0rFrmDax4=
|
||||
github.com/wealdtech/go-merkletree v1.0.1-0.20190605192610-2bb163c2ea2a h1:MwXxGlHLoTCM3/5nlvGJqSWhcmWtGuc6RBtUsTKJwvU=
|
||||
github.com/wealdtech/go-merkletree v1.0.1-0.20190605192610-2bb163c2ea2a/go.mod h1:Q/vZYhXjtE/oLDRqlGLWwRrNKZJAwidHlVfwIkKEH2w=
|
||||
github.com/weaveworks/common v0.0.0-20200512154658-384f10054ec5 h1:EYxr08r8x6r/5fLEAMMkida1BVgxVXE4LfZv/XV+znU=
|
||||
|
18
node/node.go
18
node/node.go
@ -64,7 +64,7 @@ type Node struct {
|
||||
Miner *consensus.Miner
|
||||
Beacon beacon.BeaconNetworks
|
||||
Wallet *wallet.LocalWallet
|
||||
EventCache cache.EventCache
|
||||
Cache cache.Cache
|
||||
DisputeManager *consensus.DisputeManager
|
||||
}
|
||||
|
||||
@ -130,7 +130,7 @@ func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTim
|
||||
|
||||
// initialize event log cache subsystem
|
||||
eventCache := provideEventCache(config)
|
||||
n.EventCache = eventCache
|
||||
n.Cache = eventCache
|
||||
logrus.Info("Event cache subsystem has initialized!")
|
||||
|
||||
// initialize consensus subsystem
|
||||
@ -225,7 +225,7 @@ func (n *Node) subscribeOnEthContractsAsync(ctx context.Context) {
|
||||
select {
|
||||
case event := <-eventChan:
|
||||
{
|
||||
err := n.EventCache.Store("request_"+event.ReqID.String(), event)
|
||||
err := n.Cache.Store("request_"+event.ReqID.String(), event)
|
||||
if err != nil {
|
||||
logrus.Errorf("Failed to store new request event to event log cache: %v", err)
|
||||
}
|
||||
@ -255,15 +255,15 @@ func (n *Node) subscribeOnEthContractsAsync(ctx context.Context) {
|
||||
}()
|
||||
}
|
||||
|
||||
func provideEventCache(config *config.Config) cache.EventCache {
|
||||
var backend cache.EventCache
|
||||
func provideEventCache(config *config.Config) cache.Cache {
|
||||
var backend cache.Cache
|
||||
switch config.CacheType {
|
||||
case "in-memory":
|
||||
backend = cache.NewEventLogCache()
|
||||
backend = cache.NewInMemoryCache()
|
||||
case "redis":
|
||||
backend = cache.NewEventRedisCache(config)
|
||||
backend = cache.NewRedisCache(config)
|
||||
default:
|
||||
backend = cache.NewEventLogCache()
|
||||
backend = cache.NewInMemoryCache()
|
||||
}
|
||||
return backend
|
||||
}
|
||||
@ -332,7 +332,7 @@ func providePubsubRouter(lhost host.Host, config *config.Config) *pubsub2.PubSub
|
||||
return pubsub2.NewPubSubRouter(lhost, config.PubSub.ServiceTopicName, config.IsBootstrap)
|
||||
}
|
||||
|
||||
func provideConsensusManager(psb *pubsub2.PubSubRouter, miner *consensus.Miner, ethClient *ethclient.EthereumClient, privateKey []byte, minApprovals int, evc cache.EventCache) *consensus.PBFTConsensusManager {
|
||||
func provideConsensusManager(psb *pubsub2.PubSubRouter, miner *consensus.Miner, ethClient *ethclient.EthereumClient, privateKey []byte, minApprovals int, evc cache.Cache) *consensus.PBFTConsensusManager {
|
||||
return consensus.NewPBFTConsensusManager(psb, minApprovals, privateKey, ethClient, miner, evc)
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user