Fix mempool init in SyncManager

This commit is contained in:
ChronosX88 2021-06-02 22:48:56 +03:00
parent 2afe85853f
commit 809c5f2a23
Signed by: ChronosXYZ
GPG Key ID: 085A69A82C8C511A
3 changed files with 28 additions and 16 deletions

View File

@ -43,10 +43,11 @@ type syncManager struct {
rpcClient *gorpc.Client rpcClient *gorpc.Client
} }
func NewSyncManager(bp *pool.BlockPool, p2pRPCClient *gorpc.Client, bootstrapPeer peer.ID) SyncManager { func NewSyncManager(bp *pool.BlockPool, mp *pool.Mempool, p2pRPCClient *gorpc.Client, bootstrapPeer peer.ID) SyncManager {
ctx, cancelFunc := context.WithCancel(context.Background()) ctx, cancelFunc := context.WithCancel(context.Background())
return &syncManager{ return &syncManager{
blockpool: bp, blockpool: bp,
mempool: mp,
ctx: ctx, ctx: ctx,
ctxCancelFunc: cancelFunc, ctxCancelFunc: cancelFunc,
initialSyncCompleted: false, initialSyncCompleted: false,

View File

@ -9,6 +9,10 @@ import (
"os" "os"
"time" "time"
types2 "github.com/Secured-Finance/dione/blockchain/types"
"github.com/fxamacker/cbor/v2"
gorpc "github.com/libp2p/go-libp2p-gorpc" gorpc "github.com/libp2p/go-libp2p-gorpc"
"github.com/Secured-Finance/dione/blockchain/pool" "github.com/Secured-Finance/dione/blockchain/pool"
@ -143,7 +147,7 @@ func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTim
logrus.Info("Block pool database has been successfully initialized!") logrus.Info("Block pool database has been successfully initialized!")
// initialize mempool // initialize mempool
mp, err := provideMemPool(c) mp, err := provideMemPool()
if err != nil { if err != nil {
logrus.Fatalf("Failed to initialize mempool: %s", err.Error()) logrus.Fatalf("Failed to initialize mempool: %s", err.Error())
} }
@ -163,7 +167,7 @@ func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTim
r := provideP2PRPCClient(lhost) r := provideP2PRPCClient(lhost)
// initialize sync manager // initialize sync manager
sm, err := provideSyncManager(bp, r, baddrs[0]) // FIXME here we just pick up first bootstrap in list sm, err := provideSyncManager(bp, mp, r, baddrs[0]) // FIXME here we just pick up first bootstrap in list
if err != nil { if err != nil {
logrus.Fatal(err) logrus.Fatal(err)
} }
@ -268,12 +272,7 @@ func (n *Node) subscribeOnEthContractsAsync(ctx context.Context) {
select { select {
case event := <-eventChan: case event := <-eventChan:
{ {
err := n.Cache.Store("request_"+event.ReqID.String(), event) logrus.Info("Let's wait a little so that all nodes have time to receive the request")
if err != nil {
logrus.Errorf("Failed to store new request event to event log cache: %v", err)
}
logrus.Info("Let's wait a little so that all nodes have time to receive the request and cache it")
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
task, err := n.Miner.MineTask(context.TODO(), event) task, err := n.Miner.MineTask(context.TODO(), event)
@ -281,13 +280,25 @@ func (n *Node) subscribeOnEthContractsAsync(ctx context.Context) {
logrus.Errorf("Failed to mine task: %v", err) logrus.Errorf("Failed to mine task: %v", err)
} }
if task == nil { if task == nil {
logrus.Warnf("Task is nil!")
continue continue
} }
logrus.Infof("Proposed new Dione task with ID: %s", event.ReqID.String()) payload, err := cbor.Marshal(task)
err = n.ConsensusManager.Propose(*task)
if err != nil { if err != nil {
logrus.Errorf("Failed to propose task: %v", err) logrus.Errorf("Failed to marshal request event")
continue
} }
tx := types2.CreateTransaction(payload)
err = n.MemPool.StoreTx(tx)
if err != nil {
logrus.Errorf("Failed to store tx in mempool: %s", err.Error())
continue
}
//logrus.Infof("Proposed new Dione task with ID: %s", event.ReqID.String())
//err = n.ConsensusManager.Propose(*task)
//if err != nil {
// logrus.Errorf("Failed to propose task: %v", err)
//}
} }
case <-ctx.Done(): case <-ctx.Done():
break EventLoop break EventLoop

View File

@ -155,16 +155,16 @@ func provideBlockPool(config *config.Config) (*pool.BlockPool, error) {
return pool.NewBlockPool(config.Blockchain.DatabasePath) return pool.NewBlockPool(config.Blockchain.DatabasePath)
} }
func provideMemPool(c cache.Cache) (*pool.Mempool, error) { func provideMemPool() (*pool.Mempool, error) {
return pool.NewMempool(c) return pool.NewMempool()
} }
func provideSyncManager(bp *pool.BlockPool, r *gorpc.Client, bootstrap multiaddr.Multiaddr) (sync.SyncManager, error) { func provideSyncManager(bp *pool.BlockPool, mp *pool.Mempool, r *gorpc.Client, bootstrap multiaddr.Multiaddr) (sync.SyncManager, error) {
addr, err := peer.AddrInfoFromP2pAddr(bootstrap) addr, err := peer.AddrInfoFromP2pAddr(bootstrap)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return sync.NewSyncManager(bp, r, addr.ID), nil return sync.NewSyncManager(bp, mp, r, addr.ID), nil
} }
func provideP2PRPCClient(h host.Host) *gorpc.Client { func provideP2PRPCClient(h host.Host) *gorpc.Client {