diff --git a/beacon/beacon.go b/beacon/beacon.go index 9773573..fa4e57b 100644 --- a/beacon/beacon.go +++ b/beacon/beacon.go @@ -36,7 +36,6 @@ type BeaconNetwork struct { type BeaconAPI interface { Entry(context.Context, uint64) (types.BeaconEntry, error) VerifyEntry(types.BeaconEntry, types.BeaconEntry) error - NewEntries() <-chan types.BeaconEntry LatestBeaconRound() uint64 } diff --git a/beacon/drand/drand.go b/beacon/drand/drand.go index a51aff5..b555ed6 100644 --- a/beacon/drand/drand.go +++ b/beacon/drand/drand.go @@ -6,7 +6,7 @@ import ( "fmt" "sync" - "github.com/Arceliar/phony" + "github.com/asaskevich/EventBus" "github.com/Secured-Finance/dione/beacon" "github.com/drand/drand/chain" @@ -32,17 +32,16 @@ var log = logrus.WithFields(logrus.Fields{ }) type DrandBeacon struct { - phony.Inbox DrandClient client.Client PublicKey kyber.Point drandResultChannel <-chan client.Result - beaconEntryChannel chan types.BeaconEntry cacheLock sync.Mutex localCache map[uint64]types.BeaconEntry latestDrandRound uint64 + bus EventBus.Bus } -func NewDrandBeacon(ps *pubsub.PubSub) (*DrandBeacon, error) { +func NewDrandBeacon(ps *pubsub.PubSub, bus EventBus.Bus) (*DrandBeacon, error) { cfg := config.NewDrandConfig() drandChain, err := chain.InfoFromJSON(bytes.NewReader([]byte(cfg.ChainInfo))) @@ -83,12 +82,12 @@ func NewDrandBeacon(ps *pubsub.PubSub) (*DrandBeacon, error) { db := &DrandBeacon{ DrandClient: drandClient, localCache: make(map[uint64]types.BeaconEntry), + bus: bus, } db.PublicKey = drandChain.PublicKey db.drandResultChannel = db.DrandClient.Watch(context.TODO()) - db.beaconEntryChannel = make(chan types.BeaconEntry) err = db.getLatestDrandResult() if err != nil { return nil, err @@ -120,7 +119,7 @@ func (db *DrandBeacon) loop(ctx context.Context) { { db.cacheValue(newBeaconEntryFromDrandResult(res)) db.updateLatestDrandRound(res.Round()) - db.newEntry(res) + db.bus.Publish("beacon:newEntry", types.NewBeaconEntry(res.Round(), res.Randomness(), map[string]interface{}{"signature": res.Signature()})) } } } @@ -186,16 +185,6 @@ func (db *DrandBeacon) LatestBeaconRound() uint64 { return db.latestDrandRound } -func (db *DrandBeacon) newEntry(res client.Result) { - db.Act(nil, func() { - db.beaconEntryChannel <- types.NewBeaconEntry(res.Round(), res.Randomness(), map[string]interface{}{"signature": res.Signature()}) - }) -} - -func (db *DrandBeacon) NewEntries() <-chan types.BeaconEntry { - return db.beaconEntryChannel -} - func newBeaconEntryFromDrandResult(res client.Result) types.BeaconEntry { return types.NewBeaconEntry(res.Round(), res.Randomness(), map[string]interface{}{"signature": res.Signature()}) } diff --git a/blockchain/blockchain.go b/blockchain/blockchain.go index a46c39b..6d5874c 100644 --- a/blockchain/blockchain.go +++ b/blockchain/blockchain.go @@ -6,6 +6,8 @@ import ( "errors" "os" + "github.com/asaskevich/EventBus" + "github.com/Secured-Finance/dione/blockchain/utils" types2 "github.com/Secured-Finance/dione/blockchain/types" @@ -31,10 +33,13 @@ type BlockChain struct { db lmdb.DBI metadataIndex *utils.Index heightIndex *utils.Index + bus EventBus.Bus } -func NewBlockChain(path string) (*BlockChain, error) { - chain := &BlockChain{} +func NewBlockChain(path string, bus EventBus.Bus) (*BlockChain, error) { + chain := &BlockChain{ + bus: bus, + } // configure lmdb env env, err := lmdb.NewEnv() @@ -84,7 +89,11 @@ func NewBlockChain(path string) (*BlockChain, error) { } func (bp *BlockChain) setLatestBlockHeight(height uint64) error { - return bp.metadataIndex.PutUint64([]byte(LatestBlockHeightKey), height) + err := bp.metadataIndex.PutUint64([]byte(LatestBlockHeightKey), height) + if err != nil { + return err + } + return nil } func (bp *BlockChain) GetLatestBlockHeight() (uint64, error) { @@ -134,17 +143,17 @@ func (bp *BlockChain) StoreBlock(block *types2.Block) error { return err } - if err == ErrLatestHeightNil { + if err == ErrLatestHeightNil || block.Header.Height > height { if err = bp.setLatestBlockHeight(block.Header.Height); err != nil { return err } - } else { - if block.Header.Height > height { - if err = bp.setLatestBlockHeight(block.Header.Height); err != nil { - return err - } + } else if block.Header.Height > height { + if err = bp.setLatestBlockHeight(block.Header.Height); err != nil { + return err } + bp.bus.Publish("blockchain:latestBlockHeightUpdated", block) } + bp.bus.Publish("blockchain:blockCommitted", block) return nil } diff --git a/blockchain/pool/blockpool.go b/blockchain/pool/blockpool.go index ae49e52..23246ce 100644 --- a/blockchain/pool/blockpool.go +++ b/blockchain/pool/blockpool.go @@ -3,6 +3,9 @@ package pool import ( "bytes" "encoding/hex" + "time" + + "github.com/asaskevich/EventBus" "github.com/sirupsen/logrus" @@ -15,20 +18,27 @@ type BlockPool struct { mempool *Mempool knownBlocks cache.Cache acceptedBlocks cache.Cache + bus EventBus.Bus } -func NewBlockPool(mp *Mempool) (*BlockPool, error) { +func NewBlockPool(mp *Mempool, bus EventBus.Bus) (*BlockPool, error) { bp := &BlockPool{ acceptedBlocks: cache.NewInMemoryCache(), // here we need to use separate cache knownBlocks: cache.NewInMemoryCache(), mempool: mp, + bus: bus, } return bp, nil } func (bp *BlockPool) AddBlock(block *types.Block) error { - return bp.knownBlocks.Store(hex.EncodeToString(block.Header.Hash), block) + err := bp.knownBlocks.StoreWithTTL(hex.EncodeToString(block.Header.Hash), block, 10*time.Minute) + if err != nil { + return err + } + bp.bus.Publish("blockpool:knownBlockAdded", block) + return nil } func (bp *BlockPool) GetBlock(blockhash []byte) (*types.Block, error) { @@ -42,10 +52,16 @@ func (bp *BlockPool) PruneBlocks() { for k := range bp.knownBlocks.Items() { bp.knownBlocks.Delete(k) } + bp.bus.Publish("blockpool:pruned") } func (bp *BlockPool) AddAcceptedBlock(block *types.Block) error { - return bp.acceptedBlocks.Store(hex.EncodeToString(block.Header.Hash), block) + err := bp.acceptedBlocks.Store(hex.EncodeToString(block.Header.Hash), block) + if err != nil { + return err + } + bp.bus.Publish("blockpool:acceptedBlockAdded", block) + return nil } func (bp *BlockPool) GetAllAcceptedBlocks() []*types.Block { diff --git a/blockchain/pool/mempool.go b/blockchain/pool/mempool.go index c94a99c..ce1753d 100644 --- a/blockchain/pool/mempool.go +++ b/blockchain/pool/mempool.go @@ -6,6 +6,8 @@ import ( "sort" "time" + "github.com/asaskevich/EventBus" + "github.com/sirupsen/logrus" types2 "github.com/Secured-Finance/dione/blockchain/types" @@ -26,11 +28,13 @@ var ( type Mempool struct { cache cache.Cache + bus EventBus.Bus } -func NewMempool() (*Mempool, error) { +func NewMempool(bus EventBus.Bus) (*Mempool, error) { mp := &Mempool{ cache: cache.NewInMemoryCache(), // here we need to use separate cache + bus: bus, } return mp, nil @@ -40,13 +44,21 @@ func (mp *Mempool) StoreTx(tx *types2.Transaction) error { hashStr := hex.EncodeToString(tx.Hash) err := mp.cache.StoreWithTTL(DefaultTxPrefix+hashStr, tx, DefaultTxTTL) logrus.Infof("Submitted new transaction in mempool with hash %x", tx.Hash) + mp.bus.Publish("mempool:transactionAdded", tx) return err } -func (mp *Mempool) DeleteTx(txHash []byte) { +func (mp *Mempool) DeleteTx(txHash []byte) error { hashStr := hex.EncodeToString(txHash) + var tx types2.Transaction + err := mp.cache.Get(DefaultTxPrefix+hashStr, &tx) + if err != nil { + return err + } mp.cache.Delete(DefaultTxPrefix + hashStr) logrus.Debugf("Deleted transaction from mempool %x", txHash) + mp.bus.Publish("mempool:transactionRemoved", tx) + return nil } func (mp *Mempool) GetTransactionsForNewBlock() []*types2.Transaction { diff --git a/consensus/consensus.go b/consensus/consensus.go index 86cbb02..a1c9b5b 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -1,6 +1,7 @@ package consensus import ( + "encoding/hex" "errors" "math/big" "sync" @@ -15,8 +16,6 @@ import ( "github.com/Secured-Finance/dione/blockchain" - "github.com/Arceliar/phony" - types3 "github.com/Secured-Finance/dione/blockchain/types" "github.com/libp2p/go-libp2p-core/crypto" @@ -47,7 +46,6 @@ const ( ) type PBFTConsensusManager struct { - phony.Inbox bus EventBus.Bus psb *pubsub.PubSubRouter minApprovals int // FIXME @@ -102,21 +100,11 @@ func NewPBFTConsensusManager( pcm.psb.Hook(pubsub.PrePrepareMessageType, pcm.handlePrePrepare) pcm.psb.Hook(pubsub.PrepareMessageType, pcm.handlePrepare) pcm.psb.Hook(pubsub.CommitMessageType, pcm.handleCommit) - //bus.SubscribeOnce("sync:initialSyncCompleted", func() { - // pcm.state.ready = true - //}) + bus.SubscribeAsync("beacon:newEntry", func(entry types2.BeaconEntry) { + pcm.onNewBeaconEntry(entry) + }, true) height, _ := pcm.blockchain.GetLatestBlockHeight() pcm.state.blockHeight = height + 1 - go func() { - for { - select { - case e := <-b.Beacon.NewEntries(): - { - pcm.NewDrandRound(nil, e) - } - } - } - }() return pcm } @@ -263,94 +251,105 @@ func (pcm *PBFTConsensusManager) handleCommit(message *pubsub.PubSubMessage) { } } -func (pcm *PBFTConsensusManager) NewDrandRound(from phony.Actor, entry types2.BeaconEntry) { - pcm.Act(from, func() { - pcm.state.mutex.Lock() - defer pcm.state.mutex.Unlock() - block, err := pcm.commitAcceptedBlocks() +func (pcm *PBFTConsensusManager) onNewBeaconEntry(entry types2.BeaconEntry) { + block, err := pcm.commitAcceptedBlocks() + if err != nil { + if errors.Is(err, ErrNoAcceptedBlocks) { + logrus.WithFields(logrus.Fields{ + "round": pcm.state.blockHeight, + }).Infof("No accepted blocks in the current consensus round") + } else { + logrus.Errorf("Failed to select the block in consensus round %d: %s", pcm.state.blockHeight, err.Error()) + return + } + } + + if block != nil { + // broadcast new block + var newBlockMessage pubsub.PubSubMessage + newBlockMessage.Type = pubsub.NewBlockMessageType + blockSerialized, err := cbor.Marshal(block) if err != nil { - if errors.Is(err, ErrNoAcceptedBlocks) { - logrus.Infof("No accepted blocks for consensus round %d", pcm.state.blockHeight) - } else { - logrus.Errorf("Failed to select the block in consensus round %d: %s", pcm.state.blockHeight, err.Error()) - return - } + logrus.Errorf("Failed to serialize block %x for broadcasting!", block.Header.Hash) + } else { + newBlockMessage.Payload = blockSerialized + pcm.psb.BroadcastToServiceTopic(&newBlockMessage) } - if block != nil { - // broadcast new block - var newBlockMessage pubsub.PubSubMessage - newBlockMessage.Type = pubsub.NewBlockMessageType - blockSerialized, err := cbor.Marshal(block) - if err != nil { - logrus.Errorf("Failed to serialize block %x for broadcasting!", block.Header.Hash) - } else { - newBlockMessage.Payload = blockSerialized - pcm.psb.BroadcastToServiceTopic(&newBlockMessage) - } - - // if we are miner for this block - // then post dione tasks to target chains (currently, only Ethereum) - if *block.Header.Proposer == pcm.miner.address { - for _, v := range block.Data { - var task types2.DioneTask - err := cbor.Unmarshal(v.Data, &task) - if err != nil { - logrus.Errorf("Failed to unmarshal transaction %x payload: %s", v.Hash, err.Error()) - continue // FIXME - } - reqIDNumber, ok := big.NewInt(0).SetString(task.RequestID, 10) - if !ok { - logrus.Errorf("Failed to parse request id number in task of tx %x", v.Hash) - continue // FIXME - } - - err = pcm.ethereumClient.SubmitRequestAnswer(reqIDNumber, task.Payload) - if err != nil { - logrus.Errorf("Failed to submit task in tx %x: %s", v.Hash, err.Error()) - continue // FIXME - } + // if we are miner of this block + // then post dione tasks to target chains (currently, only Ethereum) + if block.Header.Proposer.String() == pcm.miner.address.String() { + for _, tx := range block.Data { + var task types2.DioneTask + err := cbor.Unmarshal(tx.Data, &task) + if err != nil { + logrus.WithFields(logrus.Fields{ + "err": err.Error(), + "txHash": hex.EncodeToString(tx.Hash), + }).Error("Failed to unmarshal transaction payload") + continue // FIXME + } + reqIDNumber, ok := big.NewInt(0).SetString(task.RequestID, 10) + if !ok { + logrus.WithFields(logrus.Fields{ + "txHash": hex.EncodeToString(tx.Hash), + }).Error("Failed to parse request id number in Dione task") + continue // FIXME } - } - pcm.state.blockHeight = pcm.state.blockHeight + 1 - } - - // get latest block - height, err := pcm.blockchain.GetLatestBlockHeight() - if err != nil { - logrus.Error(err) - return - } - blockHeader, err := pcm.blockchain.FetchBlockHeaderByHeight(height) - if err != nil { - logrus.Error(err) - return - } - - pcm.state.drandRound = entry.Round - pcm.state.randomness = entry.Data - - minedBlock, err := pcm.miner.MineBlock(entry.Data, entry.Round, blockHeader) - if err != nil { - if errors.Is(err, ErrNoTxForBlock) { - logrus.Info("Skipping consensus round, because we don't have transactions in mempool for including into block") - } else { - logrus.Errorf("Failed to mine the block: %s", err.Error()) - } - return - } - - // if we are round winner - if minedBlock != nil { - logrus.Infof("We are elected in consensus round %d", pcm.state.blockHeight) - err = pcm.propose(minedBlock) - if err != nil { - logrus.Errorf("Failed to propose the block: %s", err.Error()) - return + err = pcm.ethereumClient.SubmitRequestAnswer(reqIDNumber, task.Payload) + if err != nil { + logrus.WithFields(logrus.Fields{ + "err": err.Error(), + "txHash": hex.EncodeToString(tx.Hash), + "reqID": reqIDNumber.String(), + }).Error("Failed to submit task to ETH chain") + continue // FIXME + } + logrus.WithFields(logrus.Fields{ + "txHash": hex.EncodeToString(tx.Hash), + "reqID": reqIDNumber.String(), + }).Debug("Dione task has been sucessfully submitted to ETH chain (DioneOracle contract)") } } - }) + + pcm.state.blockHeight = pcm.state.blockHeight + 1 + } + + // get latest block + height, err := pcm.blockchain.GetLatestBlockHeight() + if err != nil { + logrus.Error(err) + return + } + blockHeader, err := pcm.blockchain.FetchBlockHeaderByHeight(height) + if err != nil { + logrus.Error(err) + return + } + + pcm.state.drandRound = entry.Round + pcm.state.randomness = entry.Data + + minedBlock, err := pcm.miner.MineBlock(entry.Data, entry.Round, blockHeader) + if err != nil { + if errors.Is(err, ErrNoTxForBlock) { + logrus.Info("Sealing skipped, no transactions in mempool") + } else { + logrus.Errorf("Failed to mine the block: %s", err.Error()) + } + return + } + + // if we are round winner + if minedBlock != nil { + logrus.WithField("round", pcm.state.blockHeight).Infof("We are elected in consensus round") + err = pcm.propose(minedBlock) + if err != nil { + logrus.Errorf("Failed to propose the block: %s", err.Error()) + return + } + } } func (pcm *PBFTConsensusManager) commitAcceptedBlocks() (*types3.Block, error) { @@ -374,10 +373,21 @@ func (pcm *PBFTConsensusManager) commitAcceptedBlocks() (*types3.Block, error) { maxStake = stake selectedBlock = v } - logrus.Infof("Committed block %x with height %d of miner %s", selectedBlock.Header.Hash, selectedBlock.Header.Height, selectedBlock.Header.Proposer.String()) + logrus.WithFields(logrus.Fields{ + "hash": hex.EncodeToString(selectedBlock.Header.Hash), + "height": selectedBlock.Header.Height, + "miner": selectedBlock.Header.Proposer.String(), + }).Info("Committed new block") pcm.blockPool.PruneAcceptedBlocks(selectedBlock) for _, v := range selectedBlock.Data { - pcm.mempool.DeleteTx(v.Hash) + err := pcm.mempool.DeleteTx(v.Hash) + if err != nil { + logrus.WithFields(logrus.Fields{ + "err": err.Error(), + "tx": hex.EncodeToString(v.Hash), + }).Errorf("Failed to delete committed tx from mempool") + continue + } } return selectedBlock, pcm.blockchain.StoreBlock(selectedBlock) } diff --git a/consensus/consensus_validator.go b/consensus/consensus_validator.go index 90145e7..42bf214 100644 --- a/consensus/consensus_validator.go +++ b/consensus/consensus_validator.go @@ -166,7 +166,10 @@ func NewConsensusValidator(miner *Miner, bc *blockchain.BlockChain, b beacon.Bea return } } else { - logrus.Debugf("Origin chain [%v]/request type[%v] doesn't have any payload validation!", task.OriginChain, task.RequestType) + logrus.WithFields(logrus.Fields{ + "originChain": task.OriginChain, + "requestType": task.RequestType, + }).Debug("This origin chain/request type doesn't have any payload validation!") } }(v, result) } diff --git a/consensus/miner.go b/consensus/miner.go index 5b6891f..ebbb982 100644 --- a/consensus/miner.go +++ b/consensus/miner.go @@ -90,7 +90,7 @@ func (m *Miner) GetStakeInfo(miner common.Address) (*big.Int, *big.Int, error) { } func (m *Miner) MineBlock(randomness []byte, randomnessRound uint64, lastBlockHeader *types2.BlockHeader) (*types2.Block, error) { - logrus.Debug("attempting to mine the block at epoch: ", lastBlockHeader.Height+1) + logrus.WithField("height", lastBlockHeader.Height+1).Debug("Trying to mine new block...") if err := m.UpdateCurrentStakeInfo(); err != nil { return nil, fmt.Errorf("failed to update miner stake: %w", err) diff --git a/node/node.go b/node/node.go index 212b4c2..747053d 100644 --- a/node/node.go +++ b/node/node.go @@ -93,7 +93,13 @@ func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTim logrus.Fatal(err) } n.Host = lhost - logrus.Info("Libp2p host has been successfully initialized!") + logrus.WithField( + "multiaddress", + fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s", + n.Config.ListenAddr, + n.Config.ListenPort, + n.Host.ID().Pretty(), + )).Info("Libp2p host has been initialized!") // initialize ethereum client ethClient, err := provideEthereumClient(n.Config) @@ -101,14 +107,14 @@ func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTim logrus.Fatal(err) } n.Ethereum = ethClient - logrus.Info("Ethereum client has been successfully initialized!") + logrus.WithField("ethAddress", ethClient.GetEthAddress().Hex()).Info("Ethereum client has been initialized!") // initialize blockchain rpc clients err = n.setupRPCClients() if err != nil { logrus.Fatal(err) } - logrus.Info("RPC clients has been successfully configured!") + logrus.Info("Foreign Blockchain RPC clients has been successfully configured!") // initialize pubsub subsystem psb := providePubsubRouter(lhost, n.Config) @@ -137,7 +143,7 @@ func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTim // == initialize blockchain modules // initialize blockpool database - bc, err := provideBlockChain(n.Config) + bc, err := provideBlockChain(n.Config, bus) if err != nil { logrus.Fatalf("Failed to initialize blockpool: %s", err.Error()) } @@ -145,14 +151,14 @@ func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTim logrus.Info("Block pool database has been successfully initialized!") // initialize mempool - mp, err := provideMemPool() + mp, err := provideMemPool(bus) if err != nil { logrus.Fatalf("Failed to initialize mempool: %s", err.Error()) } n.MemPool = mp logrus.Info("Mempool has been successfully initialized!") - bp, err := provideBlockPool(mp) + bp, err := provideBlockPool(mp, bus) if err != nil { logrus.Fatalf("Failed to initialize blockpool: %s", err.Error()) } @@ -166,7 +172,7 @@ func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTim if err != nil { logrus.Fatal(err) } - logrus.Info("Node p2p RPC network service has been successfully initialized!") + logrus.Info("Direct RPC has been successfully initialized!") // initialize libp2p-gorpc client r := provideP2PRPCClient(lhost) @@ -184,7 +190,7 @@ func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTim logrus.Fatal(err) } n.SyncManager = sm - logrus.Info("Blockchain synchronization subsystem has been successfully initialized!") + logrus.Info("Blockchain sync subsystem has been successfully initialized!") // initialize mining subsystem miner := provideMiner(n.Host.ID(), *n.Ethereum.GetEthAddress(), n.Ethereum, prvKey, mp) @@ -192,7 +198,7 @@ func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTim logrus.Info("Mining subsystem has been initialized!") // initialize random beacon network subsystem - randomBeaconNetwork, err := provideBeacon(psb.Pubsub) + randomBeaconNetwork, err := provideBeacon(psb.Pubsub, bus) if err != nil { logrus.Fatal(err) } @@ -238,8 +244,6 @@ func (n *Node) Run(ctx context.Context) error { } func (n *Node) runLibp2pAsync(ctx context.Context) error { - logrus.Info(fmt.Sprintf("[*] Your Multiaddress Is: /ip4/%s/tcp/%d/p2p/%s", n.Config.ListenAddr, n.Config.ListenPort, n.Host.ID().Pretty())) - logrus.Info("Announcing ourselves...") _, err := n.PeerDiscovery.Advertise(context.TODO(), n.Config.Rendezvous) if err != nil { @@ -267,12 +271,15 @@ func (n *Node) runLibp2pAsync(ctx context.Context) error { if newPeer.ID.String() == n.Host.ID().String() { continue } - logrus.Infof("Found peer: %s", newPeer) + logrus.WithField("peer", newPeer.ID).Info("Discovered new peer, connecting...") // Connect to the peer if err := n.Host.Connect(ctx, newPeer); err != nil { - logrus.Warn("Connection failed: ", err) + logrus.WithFields(logrus.Fields{ + "peer": newPeer.ID, + "err": err.Error(), + }).Warn("Connection with newly discovered peer has been failed") } - logrus.Info("Connected to newly discovered peer: ", newPeer) + logrus.WithField("peer", newPeer.ID).Info("Connected to newly discovered peer") } } } @@ -348,6 +355,7 @@ func (n *Node) setupRPCClients() error { func Start() { logrus.SetReportCaller(true) logrus.SetFormatter(&logrus.TextFormatter{ + FullTimestamp: true, CallerPrettyfier: func(f *runtime.Frame) (string, string) { filename := path.Base(f.File) return "", fmt.Sprintf("%s:%d:", filename, f.Line) diff --git a/node/node_dep_providers.go b/node/node_dep_providers.go index 124b97d..9a066c7 100644 --- a/node/node_dep_providers.go +++ b/node/node_dep_providers.go @@ -64,8 +64,8 @@ func provideMiner(peerID peer.ID, ethAddress common.Address, ethClient *ethclien return consensus.NewMiner(peerID, ethAddress, ethClient, privateKey, mempool) } -func provideBeacon(ps *pubsub2.PubSub) (beacon.BeaconNetwork, error) { - bc, err := drand2.NewDrandBeacon(ps) +func provideBeacon(ps *pubsub2.PubSub, bus EventBus.Bus) (beacon.BeaconNetwork, error) { + bc, err := drand2.NewDrandBeacon(ps, bus) if err != nil { return beacon.BeaconNetwork{}, fmt.Errorf("failed to setup drand beacon: %w", err) } @@ -176,12 +176,12 @@ func providePeerDiscovery(baddrs []multiaddr.Multiaddr, h host.Host, pexDiscover return pexDiscovery, nil } -func provideBlockChain(config *config.Config) (*blockchain.BlockChain, error) { - return blockchain.NewBlockChain(config.Blockchain.DatabasePath) +func provideBlockChain(config *config.Config, bus EventBus.Bus) (*blockchain.BlockChain, error) { + return blockchain.NewBlockChain(config.Blockchain.DatabasePath, bus) } -func provideMemPool() (*pool.Mempool, error) { - return pool.NewMempool() +func provideMemPool(bus EventBus.Bus) (*pool.Mempool, error) { + return pool.NewMempool(bus) } func provideSyncManager(bus EventBus.Bus, bp *blockchain.BlockChain, mp *pool.Mempool, r *gorpc.Client, bootstrap multiaddr.Multiaddr, psb *pubsub.PubSubRouter) (sync.SyncManager, error) { @@ -205,6 +205,6 @@ func provideNetworkService(bp *blockchain.BlockChain, mp *pool.Mempool) *Network return NewNetworkService(bp, mp) } -func provideBlockPool(mp *pool.Mempool) (*pool.BlockPool, error) { - return pool.NewBlockPool(mp) +func provideBlockPool(mp *pool.Mempool, bus EventBus.Bus) (*pool.BlockPool, error) { + return pool.NewBlockPool(mp, bus) }