From 027f0607032617ba847ed72a29c8191ae185d545 Mon Sep 17 00:00:00 2001 From: ChronosX88 Date: Tue, 1 Dec 2020 01:38:54 +0400 Subject: [PATCH] Implement basic event log cache --- go.mod | 1 + go.sum | 3 +++ node/event_log_cache.go | 49 +++++++++++++++++++++++++++++++++++++++++ node/node.go | 13 +++++++++++ 4 files changed, 66 insertions(+) create mode 100644 node/event_log_cache.go diff --git a/go.mod b/go.mod index 61c2332..c8faf63 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.14 require ( github.com/Secured-Finance/go-libp2p-pex v1.0.1 + github.com/VictoriaMetrics/fastcache v1.5.7 github.com/allegro/bigcache v1.2.1 // indirect github.com/aristanetworks/goarista v0.0.0-20200224203130-895b4c57c44d // indirect github.com/asaskevich/govalidator v0.0.0-20200907205600-7a23bdc65eef // indirect diff --git a/go.sum b/go.sum index 9d3fa2f..319f1b6 100644 --- a/go.sum +++ b/go.sum @@ -29,6 +29,8 @@ github.com/Secured-Finance/go-libp2p-pex v1.0.1/go.mod h1:Q4llSmPACDBpWvqHT/nU95 github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= github.com/Shopify/sarama v1.23.1/go.mod h1:XLH1GYJnLVE0XCr6KdJGVJRTwY30moWNJ4sERjXX6fs= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= +github.com/VictoriaMetrics/fastcache v1.5.7 h1:4y6y0G8PRzszQUYIQHHssv/jgPHAb5qQuuDNdCbyAgw= +github.com/VictoriaMetrics/fastcache v1.5.7/go.mod h1:ptDBkNMQI4RtmVo8VS/XwRY6RoTu1dAWCbrk+6WsEM8= github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c= @@ -39,6 +41,7 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= +github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM= github.com/allegro/bigcache v1.2.1 h1:hg1sY1raCwic3Vnsvje6TT7/pnZba83LeFck5NrFKSc= github.com/allegro/bigcache v1.2.1/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM= github.com/andybalholm/brotli v1.0.0 h1:7UCwP93aiSfvWpapti8g88vVVGp2qqtGyePsSuDafo4= diff --git a/node/event_log_cache.go b/node/event_log_cache.go new file mode 100644 index 0000000..e4bf50d --- /dev/null +++ b/node/event_log_cache.go @@ -0,0 +1,49 @@ +package node + +import ( + "github.com/VictoriaMetrics/fastcache" + "github.com/fxamacker/cbor/v2" +) + +const ( + // in megabytes + DefaultEventLogCacheCapacity = 32000000 +) + +type EventLogCache struct { + cache *fastcache.Cache +} + +func NewEventLogCache() *EventLogCache { + return &EventLogCache{ + cache: fastcache.New(DefaultEventLogCacheCapacity), + } +} + +func (elc *EventLogCache) Store(key string, event interface{}) error { + mRes, err := cbor.Marshal(event) + if err != nil { + return err + } + + elc.cache.SetBig([]byte(key), mRes) + + return nil +} + +func (elc *EventLogCache) Get(key string) (interface{}, error) { + var mData []byte + elc.cache.GetBig(mData, []byte(key)) + + var event interface{} + err := cbor.Unmarshal(mData, &event) + if err != nil { + return nil, err + } + + return event, nil +} + +func (elc *EventLogCache) Delete(key string) { + elc.cache.Del([]byte(key)) +} diff --git a/node/node.go b/node/node.go index e8b3c0c..672406d 100644 --- a/node/node.go +++ b/node/node.go @@ -62,6 +62,7 @@ type Node struct { Miner *consensus.Miner Beacon beacon.BeaconNetworks Wallet *wallet.LocalWallet + EventLogCache *EventLogCache } func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTime time.Duration) (*Node, error) { @@ -113,6 +114,9 @@ func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTim } n.Wallet = wallet + eventLogCache := provideEventLogCache() + n.EventLogCache = eventLogCache + return n, nil } @@ -185,6 +189,11 @@ func (n *Node) subscribeOnEthContractsAsync(ctx context.Context) { select { case event := <-eventChan: { + err := n.EventLogCache.Store("request_"+event.RequestID.String(), event) + if err != nil { + logrus.Errorf("Failed to store new request event to event log cache: %v", err) + } + task, err := n.Miner.MineTask(ctx, event) if err != nil { logrus.Fatal("Failed to mine task, exiting... ", err) @@ -207,6 +216,10 @@ func (n *Node) subscribeOnEthContractsAsync(ctx context.Context) { }() } +func provideEventLogCache() *EventLogCache { + return NewEventLogCache() +} + func provideMiner(peerID peer.ID, ethAddress common.Address, beacon beacon.BeaconNetworks, ethClient *ethclient.EthereumClient, privateKey []byte) *consensus.Miner { return consensus.NewMiner(peerID, ethAddress, beacon, ethClient, privateKey) }