From 7e3fca5de59884f1ee43faa37e97be35aa894a25 Mon Sep 17 00:00:00 2001 From: ChronosX88 Date: Thu, 13 May 2021 14:49:38 +0300 Subject: [PATCH] Refactor cache package to be more general purpose component --- cache/cache.go | 11 ++++++ cache/event_cache_interface.go | 9 ----- cache/event_log_cache.go | 50 --------------------------- cache/event_redis_cache.go | 58 -------------------------------- cache/inmemory_cache.go | 50 +++++++++++++++++++++++++++ cache/redis_cache.go | 58 ++++++++++++++++++++++++++++++++ consensus/consensus.go | 6 ++-- consensus/consensus_validator.go | 19 ++++++----- go.mod | 2 +- go.sum | 4 --- node/node.go | 18 +++++----- 11 files changed, 143 insertions(+), 142 deletions(-) create mode 100644 cache/cache.go delete mode 100644 cache/event_cache_interface.go delete mode 100644 cache/event_log_cache.go delete mode 100644 cache/event_redis_cache.go create mode 100644 cache/inmemory_cache.go create mode 100644 cache/redis_cache.go diff --git a/cache/cache.go b/cache/cache.go new file mode 100644 index 0000000..0783f34 --- /dev/null +++ b/cache/cache.go @@ -0,0 +1,11 @@ +package cache + +import "errors" + +var ErrNilValue = errors.New("value is empty") + +type Cache interface { + Store(key string, value interface{}) error + Get(key string, value interface{}) error + Delete(key string) +} diff --git a/cache/event_cache_interface.go b/cache/event_cache_interface.go deleted file mode 100644 index e9f5f39..0000000 --- a/cache/event_cache_interface.go +++ /dev/null @@ -1,9 +0,0 @@ -package cache - -import "github.com/Secured-Finance/dione/contracts/dioneOracle" - -type EventCache interface { - Store(key string, event interface{}) error - GetOracleRequestEvent(key string) (*dioneOracle.DioneOracleNewOracleRequest, error) - Delete(key string) -} diff --git a/cache/event_log_cache.go b/cache/event_log_cache.go deleted file mode 100644 index 4a5ec61..0000000 --- a/cache/event_log_cache.go +++ /dev/null @@ -1,50 +0,0 @@ -package cache - -import ( - "github.com/Secured-Finance/dione/contracts/dioneOracle" - "github.com/VictoriaMetrics/fastcache" - "github.com/fxamacker/cbor/v2" -) - -const ( - // in megabytes - DefaultEventLogCacheCapacity = 32000000 -) - -type EventLogCache struct { - cache *fastcache.Cache -} - -func NewEventLogCache() *EventLogCache { - return &EventLogCache{ - cache: fastcache.New(DefaultEventLogCacheCapacity), - } -} - -func (elc *EventLogCache) Store(key string, event interface{}) error { - mRes, err := cbor.Marshal(event) - if err != nil { - return err - } - - elc.cache.SetBig([]byte(key), mRes) - - return nil -} - -func (elc *EventLogCache) GetOracleRequestEvent(key string) (*dioneOracle.DioneOracleNewOracleRequest, error) { - var mData []byte - mData = elc.cache.GetBig(mData, []byte(key)) - - var event *dioneOracle.DioneOracleNewOracleRequest - err := cbor.Unmarshal(mData, &event) - if err != nil { - return nil, err - } - - return event, nil -} - -func (elc *EventLogCache) Delete(key string) { - elc.cache.Del([]byte(key)) -} diff --git a/cache/event_redis_cache.go b/cache/event_redis_cache.go deleted file mode 100644 index f259ddf..0000000 --- a/cache/event_redis_cache.go +++ /dev/null @@ -1,58 +0,0 @@ -package cache - -import ( - "context" - - "github.com/Secured-Finance/dione/config" - "github.com/Secured-Finance/dione/contracts/dioneOracle" - "github.com/fxamacker/cbor/v2" - "github.com/go-redis/redis/v8" -) - -type EventRedisCache struct { - Client *redis.Client - ctx context.Context -} - -func NewEventRedisCache(config *config.Config) *EventRedisCache { - client := redis.NewClient(&redis.Options{ - Addr: config.Redis.Addr, - Password: config.Redis.Password, - DB: config.Redis.DB, - }) - - return &EventRedisCache{ - Client: client, - ctx: context.Background(), - } -} - -func (erc *EventRedisCache) Store(key string, event interface{}) error { - mRes, err := cbor.Marshal(event) - if err != nil { - return err - } - - erc.Client.Set(erc.ctx, key, mRes, 0) - - return nil -} - -func (erc *EventRedisCache) GetOracleRequestEvent(key string) (*dioneOracle.DioneOracleNewOracleRequest, error) { - mData, err := erc.Client.Get(erc.ctx, key).Bytes() - if err != nil { - return nil, err - } - - var event *dioneOracle.DioneOracleNewOracleRequest - err = cbor.Unmarshal(mData, &event) - if err != nil { - return nil, err - } - - return event, nil -} - -func (erc *EventRedisCache) Delete(key string) { - erc.Client.Del(erc.ctx, key) -} diff --git a/cache/inmemory_cache.go b/cache/inmemory_cache.go new file mode 100644 index 0000000..80b84fd --- /dev/null +++ b/cache/inmemory_cache.go @@ -0,0 +1,50 @@ +package cache + +import ( + "github.com/VictoriaMetrics/fastcache" + "github.com/fxamacker/cbor/v2" +) + +const ( + // DefaultInMemoryCacheCapacity is maximal in-memory cache size in bytes + DefaultInMemoryCacheCapacity = 32000000 +) + +type InMemoryCache struct { + cache *fastcache.Cache +} + +func NewInMemoryCache() *InMemoryCache { + return &InMemoryCache{ + cache: fastcache.New(DefaultInMemoryCacheCapacity), + } +} + +func (imc *InMemoryCache) Store(key string, value interface{}) error { + mRes, err := cbor.Marshal(value) + if err != nil { + return err + } + + imc.cache.SetBig([]byte(key), mRes) + + return nil +} + +func (imc *InMemoryCache) Get(key string, v interface{}) error { + data := make([]byte, 0) + imc.cache.GetBig(data, []byte(key)) + if len(data) == 0 { + return ErrNilValue + } + err := cbor.Unmarshal(data, v) + if err != nil { + return err + } + + return nil +} + +func (imc *InMemoryCache) Delete(key string) { + imc.cache.Del([]byte(key)) +} diff --git a/cache/redis_cache.go b/cache/redis_cache.go new file mode 100644 index 0000000..70b9f98 --- /dev/null +++ b/cache/redis_cache.go @@ -0,0 +1,58 @@ +package cache + +import ( + "context" + + "github.com/Secured-Finance/dione/config" + "github.com/fxamacker/cbor/v2" + "github.com/go-redis/redis/v8" +) + +type RedisCache struct { + Client *redis.Client + ctx context.Context +} + +func NewRedisCache(config *config.Config) *RedisCache { + client := redis.NewClient(&redis.Options{ + Addr: config.Redis.Addr, + Password: config.Redis.Password, + DB: config.Redis.DB, + }) + + return &RedisCache{ + Client: client, + ctx: context.Background(), + } +} + +func (rc *RedisCache) Store(key string, value interface{}) error { + mRes, err := cbor.Marshal(value) + if err != nil { + return err + } + + rc.Client.Set(rc.ctx, key, mRes, 0) + + return nil +} + +func (rc *RedisCache) Get(key string, value interface{}) error { + data, err := rc.Client.Get(rc.ctx, key).Bytes() + if err != nil { + if err == redis.Nil { + return ErrNilValue + } + return err + } + err = cbor.Unmarshal(data, &value) + if err != nil { + return err + } + + return nil +} + +func (rc *RedisCache) Delete(key string) { + rc.Client.Del(rc.ctx, key) +} diff --git a/consensus/consensus.go b/consensus/consensus.go index 4d94982..d63e0f6 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -24,7 +24,7 @@ type PBFTConsensusManager struct { consensusMap map[string]*Consensus ethereumClient *ethclient.EthereumClient miner *Miner - eventCache cache.EventCache + cache cache.Cache } type Consensus struct { @@ -34,7 +34,7 @@ type Consensus struct { Task *types2.DioneTask } -func NewPBFTConsensusManager(psb *pubsub.PubSubRouter, minApprovals int, privKey []byte, ethereumClient *ethclient.EthereumClient, miner *Miner, evc cache.EventCache) *PBFTConsensusManager { +func NewPBFTConsensusManager(psb *pubsub.PubSubRouter, minApprovals int, privKey []byte, ethereumClient *ethclient.EthereumClient, miner *Miner, evc cache.Cache) *PBFTConsensusManager { pcm := &PBFTConsensusManager{} pcm.psb = psb pcm.miner = miner @@ -43,7 +43,7 @@ func NewPBFTConsensusManager(psb *pubsub.PubSubRouter, minApprovals int, privKey pcm.minApprovals = minApprovals pcm.privKey = privKey pcm.ethereumClient = ethereumClient - pcm.eventCache = evc + pcm.cache = evc pcm.consensusMap = map[string]*Consensus{} pcm.psb.Hook(types.MessageTypePrePrepare, pcm.handlePrePrepare) pcm.psb.Hook(types.MessageTypePrepare, pcm.handlePrepare) diff --git a/consensus/consensus_validator.go b/consensus/consensus_validator.go index 6ca65cd..4c37be8 100644 --- a/consensus/consensus_validator.go +++ b/consensus/consensus_validator.go @@ -4,6 +4,7 @@ import ( "github.com/Secured-Finance/dione/cache" types2 "github.com/Secured-Finance/dione/consensus/types" "github.com/Secured-Finance/dione/consensus/validation" + "github.com/Secured-Finance/dione/contracts/dioneOracle" "github.com/Secured-Finance/dione/types" "github.com/ethereum/go-ethereum/common" "github.com/filecoin-project/go-state-types/crypto" @@ -12,14 +13,14 @@ import ( type ConsensusValidator struct { validationFuncMap map[types2.MessageType]func(msg types2.Message) bool - eventCache cache.EventCache + cache cache.Cache miner *Miner } -func NewConsensusValidator(ec cache.EventCache, miner *Miner) *ConsensusValidator { +func NewConsensusValidator(ec cache.Cache, miner *Miner) *ConsensusValidator { cv := &ConsensusValidator{ - eventCache: ec, - miner: miner, + cache: ec, + miner: miner, } cv.validationFuncMap = map[types2.MessageType]func(msg types2.Message) bool{ @@ -35,17 +36,19 @@ func NewConsensusValidator(ec cache.EventCache, miner *Miner) *ConsensusValidato } ///////////////////////////////// - // === verify if request exists in event log cache === - requestEvent, err := cv.eventCache.GetOracleRequestEvent("request_" + consensusMsg.Task.RequestID) + // === verify if request exists in cache === + var requestEvent *dioneOracle.DioneOracleNewOracleRequest + err = cv.cache.Get("request_"+consensusMsg.Task.RequestID, &requestEvent) if err != nil { - logrus.Errorf("the incoming request task event doesn't exist in the EVC, or is broken: %v", err) + logrus.Errorf("the request doesn't exist in the cache or has been failed to decode: %v", err) return false } + if requestEvent.OriginChain != consensusMsg.Task.OriginChain || requestEvent.RequestType != consensusMsg.Task.RequestType || requestEvent.RequestParams != consensusMsg.Task.RequestParams { - logrus.Errorf("the incoming task and cached request event don't match!") + logrus.Errorf("the incoming task and cached request requestEvent don't match!") return false } ///////////////////////////////// diff --git a/go.mod b/go.mod index 4c33a81..a677663 100644 --- a/go.mod +++ b/go.mod @@ -55,7 +55,7 @@ require ( github.com/stretchr/testify v1.7.0 github.com/tyler-smith/go-bip39 v1.1.0 // indirect github.com/valyala/fasthttp v1.17.0 - github.com/wealdtech/go-merkletree v1.0.1-0.20190605192610-2bb163c2ea2a // indirect + github.com/wealdtech/go-merkletree v1.0.1-0.20190605192610-2bb163c2ea2a github.com/whyrusleeping/cbor-gen v0.0.0-20210219115102-f37d292932f2 github.com/xorcare/golden v0.6.1-0.20191112154924-b87f686d7542 go.uber.org/zap v1.16.0 diff --git a/go.sum b/go.sum index aaf874f..b86294b 100644 --- a/go.sum +++ b/go.sum @@ -1013,8 +1013,6 @@ github.com/libp2p/go-libp2p-core v0.6.0/go.mod h1:txwbVEhHEXikXn9gfC7/UDDw7rkxuX github.com/libp2p/go-libp2p-core v0.6.1/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8= github.com/libp2p/go-libp2p-core v0.7.0 h1:4a0TMjrWNTZlNvcqxZmrMRDi/NQWrhwO2pkTuLSQ/IQ= github.com/libp2p/go-libp2p-core v0.7.0/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8= -github.com/libp2p/go-libp2p-core v0.8.5 h1:aEgbIcPGsKy6zYcC+5AJivYFedhYa4sW7mIpWpUaLKw= -github.com/libp2p/go-libp2p-core v0.8.5/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8= github.com/libp2p/go-libp2p-crypto v0.0.1/go.mod h1:yJkNyDmO341d5wwXxDUGO0LykUVT72ImHNUqh5D/dBE= github.com/libp2p/go-libp2p-crypto v0.0.2/go.mod h1:eETI5OUfBnvARGOHrJz2eWNyTUxEGZnBxMcbUjfIj4I= github.com/libp2p/go-libp2p-crypto v0.1.0 h1:k9MFy+o2zGDNGsaoZl0MA3iZ75qXxr9OOoAZF+sD5OQ= @@ -1709,8 +1707,6 @@ github.com/warpfork/go-wish v0.0.0-20180510122957-5ad1f5abf436/go.mod h1:x6AKhvS github.com/warpfork/go-wish v0.0.0-20190328234359-8b3e70f8e830/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw= github.com/warpfork/go-wish v0.0.0-20200122115046-b9ea61034e4a h1:G++j5e0OC488te356JvdhaM8YS6nMsjLAYF7JxCv07w= github.com/warpfork/go-wish v0.0.0-20200122115046-b9ea61034e4a/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw= -github.com/wealdtech/go-merkletree v1.0.0 h1:DsF1xMzj5rK3pSQM6mPv8jlyJyHXhFxpnA2bwEjMMBY= -github.com/wealdtech/go-merkletree v1.0.0/go.mod h1:cdil512d/8ZC7Kx3bfrDvGMQXB25NTKbsm0rFrmDax4= github.com/wealdtech/go-merkletree v1.0.1-0.20190605192610-2bb163c2ea2a h1:MwXxGlHLoTCM3/5nlvGJqSWhcmWtGuc6RBtUsTKJwvU= github.com/wealdtech/go-merkletree v1.0.1-0.20190605192610-2bb163c2ea2a/go.mod h1:Q/vZYhXjtE/oLDRqlGLWwRrNKZJAwidHlVfwIkKEH2w= github.com/weaveworks/common v0.0.0-20200512154658-384f10054ec5 h1:EYxr08r8x6r/5fLEAMMkida1BVgxVXE4LfZv/XV+znU= diff --git a/node/node.go b/node/node.go index f429497..94ecbe2 100644 --- a/node/node.go +++ b/node/node.go @@ -64,7 +64,7 @@ type Node struct { Miner *consensus.Miner Beacon beacon.BeaconNetworks Wallet *wallet.LocalWallet - EventCache cache.EventCache + Cache cache.Cache DisputeManager *consensus.DisputeManager } @@ -130,7 +130,7 @@ func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTim // initialize event log cache subsystem eventCache := provideEventCache(config) - n.EventCache = eventCache + n.Cache = eventCache logrus.Info("Event cache subsystem has initialized!") // initialize consensus subsystem @@ -225,7 +225,7 @@ func (n *Node) subscribeOnEthContractsAsync(ctx context.Context) { select { case event := <-eventChan: { - err := n.EventCache.Store("request_"+event.ReqID.String(), event) + err := n.Cache.Store("request_"+event.ReqID.String(), event) if err != nil { logrus.Errorf("Failed to store new request event to event log cache: %v", err) } @@ -255,15 +255,15 @@ func (n *Node) subscribeOnEthContractsAsync(ctx context.Context) { }() } -func provideEventCache(config *config.Config) cache.EventCache { - var backend cache.EventCache +func provideEventCache(config *config.Config) cache.Cache { + var backend cache.Cache switch config.CacheType { case "in-memory": - backend = cache.NewEventLogCache() + backend = cache.NewInMemoryCache() case "redis": - backend = cache.NewEventRedisCache(config) + backend = cache.NewRedisCache(config) default: - backend = cache.NewEventLogCache() + backend = cache.NewInMemoryCache() } return backend } @@ -332,7 +332,7 @@ func providePubsubRouter(lhost host.Host, config *config.Config) *pubsub2.PubSub return pubsub2.NewPubSubRouter(lhost, config.PubSub.ServiceTopicName, config.IsBootstrap) } -func provideConsensusManager(psb *pubsub2.PubSubRouter, miner *consensus.Miner, ethClient *ethclient.EthereumClient, privateKey []byte, minApprovals int, evc cache.EventCache) *consensus.PBFTConsensusManager { +func provideConsensusManager(psb *pubsub2.PubSubRouter, miner *consensus.Miner, ethClient *ethclient.EthereumClient, privateKey []byte, minApprovals int, evc cache.Cache) *consensus.PBFTConsensusManager { return consensus.NewPBFTConsensusManager(psb, minApprovals, privateKey, ethClient, miner, evc) }