From c1cb7a72f73dd3e0cd9d2276f2053db66ac32c6c Mon Sep 17 00:00:00 2001 From: ChronosX88 Date: Tue, 24 Aug 2021 22:36:42 +0300 Subject: [PATCH] Refactor cache subsystem for modularity --- blockchain/pool/mempool.go | 17 ++++-- blockchain/types/block.go | 2 +- blockchain/types/transaction.go | 2 +- blockchain/utils/verification.go | 4 +- cache/cache.go | 2 +- cache/cache_manager.go | 6 ++ cache/inmemory/cache.go | 71 +++++++++++++++++++++ cache/inmemory/cache_manager.go | 24 ++++++++ cache/inmemory_cache.go | 69 --------------------- cache/redis/cache.go | 102 +++++++++++++++++++++++++++++++ cache/redis/cache_manager.go | 34 +++++++++++ cache/redis_cache.go | 98 ----------------------------- config/config.go | 5 ++ consensus/dispute_manager.go | 6 +- node/node.go | 1 + node/node_dep_providers.go | 19 +++--- 16 files changed, 274 insertions(+), 188 deletions(-) create mode 100644 cache/cache_manager.go create mode 100644 cache/inmemory/cache.go create mode 100644 cache/inmemory/cache_manager.go delete mode 100644 cache/inmemory_cache.go create mode 100644 cache/redis/cache.go create mode 100644 cache/redis/cache_manager.go delete mode 100644 cache/redis_cache.go diff --git a/blockchain/pool/mempool.go b/blockchain/pool/mempool.go index 0a64d55..bdf21d6 100644 --- a/blockchain/pool/mempool.go +++ b/blockchain/pool/mempool.go @@ -31,12 +31,14 @@ type Mempool struct { bus EventBus.Bus } -func NewMempool(bus EventBus.Bus) (*Mempool, error) { +func NewMempool(bus EventBus.Bus, cm cache.CacheManager) (*Mempool, error) { mp := &Mempool{ - cache: cache.NewInMemoryCache(), // here we need to use separate cache + cache: cm.Cache("mempool"), bus: bus, } + logrus.Info("Mempool has been successfully initialized!") + return mp, nil } @@ -83,9 +85,14 @@ func (mp *Mempool) GetTransactionsForNewBlock() []*types2.Transaction { func (mp *Mempool) GetAllTransactions() []*types2.Transaction { var allTxs []*types2.Transaction - for _, v := range mp.cache.Items() { - tx := v.(*types2.Transaction) - allTxs = append(allTxs, tx) + for _, v := range mp.cache.Keys() { + var tx types2.Transaction + err := mp.cache.Get(v, &tx) + if err != nil { + logrus.Error(err) + continue + } + allTxs = append(allTxs, &tx) } return allTxs } diff --git a/blockchain/types/block.go b/blockchain/types/block.go index ce99ec2..f85b3e2 100644 --- a/blockchain/types/block.go +++ b/blockchain/types/block.go @@ -87,7 +87,7 @@ func CreateBlock(lastBlockHeader *BlockHeader, txs []*Transaction, minerEth comm if err != nil { return nil, err } - tx.MerkleProof = mp + tx.MerkleProof = *mp } block := &Block{ diff --git a/blockchain/types/transaction.go b/blockchain/types/transaction.go index 96481bf..bce6ce4 100644 --- a/blockchain/types/transaction.go +++ b/blockchain/types/transaction.go @@ -13,7 +13,7 @@ import ( type Transaction struct { Hash []byte - MerkleProof *merkletree.Proof // sets when transaction is added to block + MerkleProof merkletree.Proof // sets when transaction is added to block Timestamp time.Time Data []byte } diff --git a/blockchain/utils/verification.go b/blockchain/utils/verification.go index 21e9742..8af44c4 100644 --- a/blockchain/utils/verification.go +++ b/blockchain/utils/verification.go @@ -9,10 +9,10 @@ import ( ) func VerifyTx(blockHeader *types.BlockHeader, tx *types.Transaction) error { - if tx.MerkleProof == nil { + if tx.MerkleProof.Hashes == nil { return fmt.Errorf("block transaction doesn't have merkle proof") } - txProofVerified, err := merkletree.VerifyProofUsing(tx.Hash, true, tx.MerkleProof, [][]byte{blockHeader.Hash}, keccak256.New()) + txProofVerified, err := merkletree.VerifyProofUsing(tx.Hash, true, &tx.MerkleProof, [][]byte{blockHeader.Hash}, keccak256.New()) if err != nil { return fmt.Errorf("failed to verify tx hash merkle proof: %s", err.Error()) } diff --git a/cache/cache.go b/cache/cache.go index 96f37fe..0d8aa49 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -12,6 +12,6 @@ type Cache interface { StoreWithTTL(key string, value interface{}, ttl time.Duration) error Get(key string, value interface{}) error Delete(key string) - Items() map[string]interface{} + Keys() []string Exists(key string) bool } diff --git a/cache/cache_manager.go b/cache/cache_manager.go new file mode 100644 index 0000000..c94906d --- /dev/null +++ b/cache/cache_manager.go @@ -0,0 +1,6 @@ +package cache + +type CacheManager interface { + // Cache returns kv cache with specific name + Cache(name string) Cache +} diff --git a/cache/inmemory/cache.go b/cache/inmemory/cache.go new file mode 100644 index 0000000..e67aeae --- /dev/null +++ b/cache/inmemory/cache.go @@ -0,0 +1,71 @@ +package inmemory + +import ( + "fmt" + "reflect" + "time" + + "github.com/Secured-Finance/dione/cache" + + cache2 "github.com/patrickmn/go-cache" +) + +const ( + DefaultCacheExpiration = 5 * time.Minute + DefaultGCInterval = 10 * time.Minute +) + +type Cache struct { + cache *cache2.Cache +} + +func NewCache() *Cache { + return &Cache{ + cache: cache2.New(DefaultCacheExpiration, DefaultGCInterval), + } +} + +func (imc *Cache) Store(key string, value interface{}) error { + imc.cache.Set(key, value, cache2.NoExpiration) + + return nil +} + +func (imc *Cache) StoreWithTTL(key string, value interface{}, ttl time.Duration) error { + imc.cache.Set(key, value, ttl) + return nil +} + +func (imc *Cache) Get(key string, value interface{}) error { + v, exists := imc.cache.Get(key) + if !exists { + return cache.ErrNotFound + } + reflectedValue := reflect.ValueOf(value) + if reflectedValue.Kind() != reflect.Ptr { + return fmt.Errorf("value isn't a pointer") + } + if reflectedValue.IsNil() { + reflectedValue.Set(reflect.New(reflectedValue.Type().Elem())) + } + reflectedValue.Elem().Set(reflect.ValueOf(v).Elem()) + + return nil +} + +func (imc *Cache) Delete(key string) { + imc.cache.Delete(key) +} + +func (imc *Cache) Keys() []string { + var keys []string + for k := range imc.cache.Items() { + keys = append(keys, k) + } + return keys +} + +func (imc *Cache) Exists(key string) bool { + _, exists := imc.cache.Get(key) + return exists +} diff --git a/cache/inmemory/cache_manager.go b/cache/inmemory/cache_manager.go new file mode 100644 index 0000000..4f0c388 --- /dev/null +++ b/cache/inmemory/cache_manager.go @@ -0,0 +1,24 @@ +package inmemory + +import "github.com/Secured-Finance/dione/cache" + +type CacheManager struct { + caches map[string]*Cache +} + +func NewCacheManager() cache.CacheManager { + return &CacheManager{ + caches: map[string]*Cache{}, + } +} + +func (cm *CacheManager) Cache(name string) cache.Cache { + var c *Cache + if v, ok := cm.caches[name]; !ok { + c = NewCache() + cm.caches[name] = c + } else { + c = v + } + return c +} diff --git a/cache/inmemory_cache.go b/cache/inmemory_cache.go deleted file mode 100644 index 0acef75..0000000 --- a/cache/inmemory_cache.go +++ /dev/null @@ -1,69 +0,0 @@ -package cache - -import ( - "fmt" - "reflect" - "time" - - "github.com/patrickmn/go-cache" -) - -const ( - DefaultCacheExpiration = 5 * time.Minute - DefaultGCInterval = 10 * time.Minute -) - -type InMemoryCache struct { - cache *cache.Cache -} - -func NewInMemoryCache() Cache { - return &InMemoryCache{ - cache: cache.New(DefaultCacheExpiration, DefaultGCInterval), - } -} - -func (imc *InMemoryCache) Store(key string, value interface{}) error { - imc.cache.Set(key, value, cache.NoExpiration) - - return nil -} - -func (imc *InMemoryCache) StoreWithTTL(key string, value interface{}, ttl time.Duration) error { - imc.cache.Set(key, value, ttl) - return nil -} - -func (imc *InMemoryCache) Get(key string, value interface{}) error { - v, exists := imc.cache.Get(key) - if !exists { - return ErrNotFound - } - reflectedValue := reflect.ValueOf(value) - if reflectedValue.Kind() != reflect.Ptr { - return fmt.Errorf("value isn't a pointer") - } - if reflectedValue.IsNil() { - reflectedValue.Set(reflect.New(reflectedValue.Type().Elem())) - } - reflectedValue.Elem().Set(reflect.ValueOf(v).Elem()) - - return nil -} - -func (imc *InMemoryCache) Delete(key string) { - imc.cache.Delete(key) -} - -func (imc *InMemoryCache) Items() map[string]interface{} { - m := make(map[string]interface{}) - for k, v := range imc.cache.Items() { - m[k] = v.Object - } - return m -} - -func (imc *InMemoryCache) Exists(key string) bool { - _, exists := imc.cache.Get(key) - return exists -} diff --git a/cache/redis/cache.go b/cache/redis/cache.go new file mode 100644 index 0000000..3c4e6a9 --- /dev/null +++ b/cache/redis/cache.go @@ -0,0 +1,102 @@ +package redis + +import ( + "bytes" + "context" + "encoding/gob" + "errors" + "time" + + "github.com/sirupsen/logrus" + + "github.com/Secured-Finance/dione/cache" + + "github.com/go-redis/redis/v8" +) + +type Cache struct { + redisClient *redis.Client + ctx context.Context + name string +} + +func NewCache(redisClient *redis.Client, name string) *Cache { + return &Cache{ + redisClient: redisClient, + ctx: context.Background(), + name: name, + } +} + +func (rc *Cache) Store(key string, value interface{}) error { + data, err := gobMarshal(value) + if err != nil { + return err + } + + rc.redisClient.Set(rc.ctx, rc.name+"/"+key, data, 0) + + return nil +} + +func (rc *Cache) StoreWithTTL(key string, value interface{}, ttl time.Duration) error { + data, err := gobMarshal(value) + if err != nil { + return err + } + rc.redisClient.Set(rc.ctx, rc.name+"/"+key, data, ttl) + + return nil +} + +func gobMarshal(val interface{}) ([]byte, error) { + buf := bytes.NewBuffer(nil) + enc := gob.NewEncoder(buf) + err := enc.Encode(val) + if err != nil { + return nil, err + } + return buf.Bytes(), nil +} + +func gobUnmarshal(data []byte, val interface{}) error { + buf := bytes.NewBuffer(data) + dec := gob.NewDecoder(buf) + return dec.Decode(&val) +} + +func (rc *Cache) Get(key string, value interface{}) error { + data, err := rc.redisClient.Get(rc.ctx, rc.name+"/"+key).Bytes() + if err != nil { + if errors.Is(err, redis.Nil) { + return cache.ErrNotFound + } + return err + } + + return gobUnmarshal(data, &value) +} + +func (rc *Cache) Delete(key string) { + rc.redisClient.Del(rc.ctx, rc.name+"/"+key) +} + +func (rc *Cache) Keys() []string { + res := rc.redisClient.Keys(rc.ctx, rc.name+"/"+"*") + if res.Err() != nil { + logrus.Error(res.Err()) + return nil + } + return res.Val() +} + +func (rc *Cache) Exists(key string) bool { + res := rc.redisClient.Exists(rc.ctx, rc.name+"/"+key) + if res.Err() != nil { + return false + } + if res.Val() == 0 { + return false + } + return true +} diff --git a/cache/redis/cache_manager.go b/cache/redis/cache_manager.go new file mode 100644 index 0000000..e509e4b --- /dev/null +++ b/cache/redis/cache_manager.go @@ -0,0 +1,34 @@ +package redis + +import ( + "github.com/Secured-Finance/dione/cache" + "github.com/Secured-Finance/dione/config" + "github.com/go-redis/redis/v8" +) + +type CacheManager struct { + redisClient *redis.Client + caches map[string]*Cache +} + +func NewCacheManager(cfg *config.Config) cache.CacheManager { + redisClient := redis.NewClient(&redis.Options{ + Addr: cfg.Redis.Addr, + Password: cfg.Redis.Password, + DB: cfg.Redis.DB, + }) + return &CacheManager{ + redisClient: redisClient, + } +} + +func (cm *CacheManager) Cache(name string) cache.Cache { + var c *Cache + if v, ok := cm.caches[name]; !ok { + c = NewCache(cm.redisClient, name) + cm.caches[name] = c + } else { + c = v + } + return c +} diff --git a/cache/redis_cache.go b/cache/redis_cache.go deleted file mode 100644 index 9eacdab..0000000 --- a/cache/redis_cache.go +++ /dev/null @@ -1,98 +0,0 @@ -package cache - -import ( - "bytes" - "context" - "encoding/gob" - "errors" - "time" - - "github.com/Secured-Finance/dione/config" - "github.com/go-redis/redis/v8" -) - -type RedisCache struct { - Client *redis.Client - ctx context.Context -} - -func NewRedisCache(config *config.Config) *RedisCache { - client := redis.NewClient(&redis.Options{ - Addr: config.Redis.Addr, - Password: config.Redis.Password, - DB: config.Redis.DB, - }) - - return &RedisCache{ - Client: client, - ctx: context.Background(), - } -} - -func (rc *RedisCache) Store(key string, value interface{}) error { - data, err := gobMarshal(value) - if err != nil { - return err - } - - rc.Client.Set(rc.ctx, key, data, 0) - - return nil -} - -func (rc *RedisCache) StoreWithTTL(key string, value interface{}, ttl time.Duration) error { - data, err := gobMarshal(value) - if err != nil { - return err - } - rc.Client.Set(rc.ctx, key, data, ttl) - - return nil -} - -func gobMarshal(val interface{}) ([]byte, error) { - buf := bytes.NewBuffer(nil) - enc := gob.NewEncoder(buf) - err := enc.Encode(val) - if err != nil { - return nil, err - } - return buf.Bytes(), nil -} - -func gobUnmarshal(data []byte, val interface{}) error { - buf := bytes.NewBuffer(data) - dec := gob.NewDecoder(buf) - return dec.Decode(&val) -} - -func (rc *RedisCache) Get(key string, value interface{}) error { - data, err := rc.Client.Get(rc.ctx, key).Bytes() - if err != nil { - if errors.Is(err, redis.Nil) { - return ErrNotFound - } - return err - } - - return gobUnmarshal(data, &value) -} - -func (rc *RedisCache) Delete(key string) { - rc.Client.Del(rc.ctx, key) -} - -func (rc *RedisCache) Items() map[string]interface{} { - return nil // TODO -} - -func (rc *RedisCache) Exists(key string) bool { - res := rc.Client.Exists(context.TODO(), key) - if res.Err() != nil { - return false - } - if res.Val() == 0 { - return false - } - return true -} diff --git a/config/config.go b/config/config.go index 337a6b7..c07550d 100644 --- a/config/config.go +++ b/config/config.go @@ -6,6 +6,11 @@ import ( "github.com/spf13/viper" ) +const ( + CacheTypeInMemory = "in-memory" + CacheTypeRedis = "redis" +) + type Config struct { ListenPort int `mapstructure:"listen_port"` ListenAddr string `mapstructure:"listen_addr"` diff --git a/consensus/dispute_manager.go b/consensus/dispute_manager.go index 1664c14..cfbd6c9 100644 --- a/consensus/dispute_manager.go +++ b/consensus/dispute_manager.go @@ -65,7 +65,7 @@ type Submission struct { Checked bool } -func NewDisputeManager(bus EventBus.Bus, ethClient *ethclient.EthereumClient, bc *blockchain.BlockChain, cfg *config.Config) (*DisputeManager, error) { +func NewDisputeManager(bus EventBus.Bus, ethClient *ethclient.EthereumClient, bc *blockchain.BlockChain, cfg *config.Config, cm cache.CacheManager) (*DisputeManager, error) { ctx := context.TODO() submissionChan, submSubscription, err := ethClient.SubscribeOnNewSubmissions(ctx) @@ -86,10 +86,10 @@ func NewDisputeManager(bus EventBus.Bus, ethClient *ethclient.EthereumClient, bc blockchain: bc, submissionChan: submissionChan, submissionEthSubscription: submSubscription, - submissionCache: cache.NewInMemoryCache(), // FIXME + submissionCache: cm.Cache("submissions"), disputesChan: disputesChan, disputeEthSubscription: dispSubscription, - disputeCache: cache.NewInMemoryCache(), // FIXME + disputeCache: cm.Cache("disputes"), } return dm, nil diff --git a/node/node.go b/node/node.go index 76a5727..e6eb22a 100644 --- a/node/node.go +++ b/node/node.go @@ -208,6 +208,7 @@ func Start() { provideDirectRPCClient, provideConsensusManager, consensus.NewDisputeManager, + provideCacheManager, ), fx.Invoke( configureLogger, diff --git a/node/node_dep_providers.go b/node/node_dep_providers.go index 1c0d083..c51a490 100644 --- a/node/node_dep_providers.go +++ b/node/node_dep_providers.go @@ -11,6 +11,9 @@ import ( "path/filepath" "runtime" + "github.com/Secured-Finance/dione/cache/inmemory" + "github.com/Secured-Finance/dione/cache/redis" + types2 "github.com/Secured-Finance/dione/blockchain/types" "github.com/Secured-Finance/dione/rpc" @@ -55,15 +58,15 @@ const ( DioneProtocolID = protocol.ID("/dione/1.0") ) -func provideCache(config *config.Config) cache.Cache { - var backend cache.Cache - switch config.CacheType { - case "in-memory": - backend = cache.NewInMemoryCache() - case "redis": - backend = cache.NewRedisCache(config) +func provideCacheManager(cfg *config.Config) cache.CacheManager { + var backend cache.CacheManager + switch cfg.CacheType { + case config.CacheTypeInMemory: + backend = inmemory.NewCacheManager() + case config.CacheTypeRedis: + backend = redis.NewCacheManager(cfg) default: - backend = cache.NewInMemoryCache() + backend = inmemory.NewCacheManager() } return backend }