From 809c5f2a23cd36dd54dcf050a58095384704a4bc Mon Sep 17 00:00:00 2001 From: ChronosX88 Date: Wed, 2 Jun 2021 22:48:56 +0300 Subject: [PATCH] Fix mempool init in SyncManager --- blockchain/sync/sync_mgr.go | 3 ++- node/node.go | 33 ++++++++++++++++++++++----------- node/node_dep_providers.go | 8 ++++---- 3 files changed, 28 insertions(+), 16 deletions(-) diff --git a/blockchain/sync/sync_mgr.go b/blockchain/sync/sync_mgr.go index 7063415..3b6773d 100644 --- a/blockchain/sync/sync_mgr.go +++ b/blockchain/sync/sync_mgr.go @@ -43,10 +43,11 @@ type syncManager struct { rpcClient *gorpc.Client } -func NewSyncManager(bp *pool.BlockPool, p2pRPCClient *gorpc.Client, bootstrapPeer peer.ID) SyncManager { +func NewSyncManager(bp *pool.BlockPool, mp *pool.Mempool, p2pRPCClient *gorpc.Client, bootstrapPeer peer.ID) SyncManager { ctx, cancelFunc := context.WithCancel(context.Background()) return &syncManager{ blockpool: bp, + mempool: mp, ctx: ctx, ctxCancelFunc: cancelFunc, initialSyncCompleted: false, diff --git a/node/node.go b/node/node.go index 108ccd4..8f1ae21 100644 --- a/node/node.go +++ b/node/node.go @@ -9,6 +9,10 @@ import ( "os" "time" + types2 "github.com/Secured-Finance/dione/blockchain/types" + + "github.com/fxamacker/cbor/v2" + gorpc "github.com/libp2p/go-libp2p-gorpc" "github.com/Secured-Finance/dione/blockchain/pool" @@ -143,7 +147,7 @@ func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTim logrus.Info("Block pool database has been successfully initialized!") // initialize mempool - mp, err := provideMemPool(c) + mp, err := provideMemPool() if err != nil { logrus.Fatalf("Failed to initialize mempool: %s", err.Error()) } @@ -163,7 +167,7 @@ func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTim r := provideP2PRPCClient(lhost) // initialize sync manager - sm, err := provideSyncManager(bp, r, baddrs[0]) // FIXME here we just pick up first bootstrap in list + sm, err := provideSyncManager(bp, mp, r, baddrs[0]) // FIXME here we just pick up first bootstrap in list if err != nil { logrus.Fatal(err) } @@ -268,12 +272,7 @@ func (n *Node) subscribeOnEthContractsAsync(ctx context.Context) { select { case event := <-eventChan: { - err := n.Cache.Store("request_"+event.ReqID.String(), event) - if err != nil { - logrus.Errorf("Failed to store new request event to event log cache: %v", err) - } - - logrus.Info("Let's wait a little so that all nodes have time to receive the request and cache it") + logrus.Info("Let's wait a little so that all nodes have time to receive the request") time.Sleep(5 * time.Second) task, err := n.Miner.MineTask(context.TODO(), event) @@ -281,13 +280,25 @@ func (n *Node) subscribeOnEthContractsAsync(ctx context.Context) { logrus.Errorf("Failed to mine task: %v", err) } if task == nil { + logrus.Warnf("Task is nil!") continue } - logrus.Infof("Proposed new Dione task with ID: %s", event.ReqID.String()) - err = n.ConsensusManager.Propose(*task) + payload, err := cbor.Marshal(task) if err != nil { - logrus.Errorf("Failed to propose task: %v", err) + logrus.Errorf("Failed to marshal request event") + continue } + tx := types2.CreateTransaction(payload) + err = n.MemPool.StoreTx(tx) + if err != nil { + logrus.Errorf("Failed to store tx in mempool: %s", err.Error()) + continue + } + //logrus.Infof("Proposed new Dione task with ID: %s", event.ReqID.String()) + //err = n.ConsensusManager.Propose(*task) + //if err != nil { + // logrus.Errorf("Failed to propose task: %v", err) + //} } case <-ctx.Done(): break EventLoop diff --git a/node/node_dep_providers.go b/node/node_dep_providers.go index e34b60b..f52b55c 100644 --- a/node/node_dep_providers.go +++ b/node/node_dep_providers.go @@ -155,16 +155,16 @@ func provideBlockPool(config *config.Config) (*pool.BlockPool, error) { return pool.NewBlockPool(config.Blockchain.DatabasePath) } -func provideMemPool(c cache.Cache) (*pool.Mempool, error) { - return pool.NewMempool(c) +func provideMemPool() (*pool.Mempool, error) { + return pool.NewMempool() } -func provideSyncManager(bp *pool.BlockPool, r *gorpc.Client, bootstrap multiaddr.Multiaddr) (sync.SyncManager, error) { +func provideSyncManager(bp *pool.BlockPool, mp *pool.Mempool, r *gorpc.Client, bootstrap multiaddr.Multiaddr) (sync.SyncManager, error) { addr, err := peer.AddrInfoFromP2pAddr(bootstrap) if err != nil { return nil, err } - return sync.NewSyncManager(bp, r, addr.ID), nil + return sync.NewSyncManager(bp, mp, r, addr.ID), nil } func provideP2PRPCClient(h host.Host) *gorpc.Client {