Implement event bus for cross-communication between Dione components, overhaul logging part

This commit is contained in:
ChronosX88 2021-07-15 23:51:22 +03:00
parent d7a1e87939
commit 13951d5c32
Signed by: ChronosXYZ
GPG Key ID: 085A69A82C8C511A
10 changed files with 200 additions and 154 deletions

View File

@ -36,7 +36,6 @@ type BeaconNetwork struct {
type BeaconAPI interface { type BeaconAPI interface {
Entry(context.Context, uint64) (types.BeaconEntry, error) Entry(context.Context, uint64) (types.BeaconEntry, error)
VerifyEntry(types.BeaconEntry, types.BeaconEntry) error VerifyEntry(types.BeaconEntry, types.BeaconEntry) error
NewEntries() <-chan types.BeaconEntry
LatestBeaconRound() uint64 LatestBeaconRound() uint64
} }

View File

@ -6,7 +6,7 @@ import (
"fmt" "fmt"
"sync" "sync"
"github.com/Arceliar/phony" "github.com/asaskevich/EventBus"
"github.com/Secured-Finance/dione/beacon" "github.com/Secured-Finance/dione/beacon"
"github.com/drand/drand/chain" "github.com/drand/drand/chain"
@ -32,17 +32,16 @@ var log = logrus.WithFields(logrus.Fields{
}) })
type DrandBeacon struct { type DrandBeacon struct {
phony.Inbox
DrandClient client.Client DrandClient client.Client
PublicKey kyber.Point PublicKey kyber.Point
drandResultChannel <-chan client.Result drandResultChannel <-chan client.Result
beaconEntryChannel chan types.BeaconEntry
cacheLock sync.Mutex cacheLock sync.Mutex
localCache map[uint64]types.BeaconEntry localCache map[uint64]types.BeaconEntry
latestDrandRound uint64 latestDrandRound uint64
bus EventBus.Bus
} }
func NewDrandBeacon(ps *pubsub.PubSub) (*DrandBeacon, error) { func NewDrandBeacon(ps *pubsub.PubSub, bus EventBus.Bus) (*DrandBeacon, error) {
cfg := config.NewDrandConfig() cfg := config.NewDrandConfig()
drandChain, err := chain.InfoFromJSON(bytes.NewReader([]byte(cfg.ChainInfo))) drandChain, err := chain.InfoFromJSON(bytes.NewReader([]byte(cfg.ChainInfo)))
@ -83,12 +82,12 @@ func NewDrandBeacon(ps *pubsub.PubSub) (*DrandBeacon, error) {
db := &DrandBeacon{ db := &DrandBeacon{
DrandClient: drandClient, DrandClient: drandClient,
localCache: make(map[uint64]types.BeaconEntry), localCache: make(map[uint64]types.BeaconEntry),
bus: bus,
} }
db.PublicKey = drandChain.PublicKey db.PublicKey = drandChain.PublicKey
db.drandResultChannel = db.DrandClient.Watch(context.TODO()) db.drandResultChannel = db.DrandClient.Watch(context.TODO())
db.beaconEntryChannel = make(chan types.BeaconEntry)
err = db.getLatestDrandResult() err = db.getLatestDrandResult()
if err != nil { if err != nil {
return nil, err return nil, err
@ -120,7 +119,7 @@ func (db *DrandBeacon) loop(ctx context.Context) {
{ {
db.cacheValue(newBeaconEntryFromDrandResult(res)) db.cacheValue(newBeaconEntryFromDrandResult(res))
db.updateLatestDrandRound(res.Round()) db.updateLatestDrandRound(res.Round())
db.newEntry(res) db.bus.Publish("beacon:newEntry", types.NewBeaconEntry(res.Round(), res.Randomness(), map[string]interface{}{"signature": res.Signature()}))
} }
} }
} }
@ -186,16 +185,6 @@ func (db *DrandBeacon) LatestBeaconRound() uint64 {
return db.latestDrandRound return db.latestDrandRound
} }
func (db *DrandBeacon) newEntry(res client.Result) {
db.Act(nil, func() {
db.beaconEntryChannel <- types.NewBeaconEntry(res.Round(), res.Randomness(), map[string]interface{}{"signature": res.Signature()})
})
}
func (db *DrandBeacon) NewEntries() <-chan types.BeaconEntry {
return db.beaconEntryChannel
}
func newBeaconEntryFromDrandResult(res client.Result) types.BeaconEntry { func newBeaconEntryFromDrandResult(res client.Result) types.BeaconEntry {
return types.NewBeaconEntry(res.Round(), res.Randomness(), map[string]interface{}{"signature": res.Signature()}) return types.NewBeaconEntry(res.Round(), res.Randomness(), map[string]interface{}{"signature": res.Signature()})
} }

View File

@ -6,6 +6,8 @@ import (
"errors" "errors"
"os" "os"
"github.com/asaskevich/EventBus"
"github.com/Secured-Finance/dione/blockchain/utils" "github.com/Secured-Finance/dione/blockchain/utils"
types2 "github.com/Secured-Finance/dione/blockchain/types" types2 "github.com/Secured-Finance/dione/blockchain/types"
@ -31,10 +33,13 @@ type BlockChain struct {
db lmdb.DBI db lmdb.DBI
metadataIndex *utils.Index metadataIndex *utils.Index
heightIndex *utils.Index heightIndex *utils.Index
bus EventBus.Bus
} }
func NewBlockChain(path string) (*BlockChain, error) { func NewBlockChain(path string, bus EventBus.Bus) (*BlockChain, error) {
chain := &BlockChain{} chain := &BlockChain{
bus: bus,
}
// configure lmdb env // configure lmdb env
env, err := lmdb.NewEnv() env, err := lmdb.NewEnv()
@ -84,7 +89,11 @@ func NewBlockChain(path string) (*BlockChain, error) {
} }
func (bp *BlockChain) setLatestBlockHeight(height uint64) error { func (bp *BlockChain) setLatestBlockHeight(height uint64) error {
return bp.metadataIndex.PutUint64([]byte(LatestBlockHeightKey), height) err := bp.metadataIndex.PutUint64([]byte(LatestBlockHeightKey), height)
if err != nil {
return err
}
return nil
} }
func (bp *BlockChain) GetLatestBlockHeight() (uint64, error) { func (bp *BlockChain) GetLatestBlockHeight() (uint64, error) {
@ -134,17 +143,17 @@ func (bp *BlockChain) StoreBlock(block *types2.Block) error {
return err return err
} }
if err == ErrLatestHeightNil { if err == ErrLatestHeightNil || block.Header.Height > height {
if err = bp.setLatestBlockHeight(block.Header.Height); err != nil { if err = bp.setLatestBlockHeight(block.Header.Height); err != nil {
return err return err
} }
} else { } else if block.Header.Height > height {
if block.Header.Height > height { if err = bp.setLatestBlockHeight(block.Header.Height); err != nil {
if err = bp.setLatestBlockHeight(block.Header.Height); err != nil { return err
return err
}
} }
bp.bus.Publish("blockchain:latestBlockHeightUpdated", block)
} }
bp.bus.Publish("blockchain:blockCommitted", block)
return nil return nil
} }

View File

@ -3,6 +3,9 @@ package pool
import ( import (
"bytes" "bytes"
"encoding/hex" "encoding/hex"
"time"
"github.com/asaskevich/EventBus"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -15,20 +18,27 @@ type BlockPool struct {
mempool *Mempool mempool *Mempool
knownBlocks cache.Cache knownBlocks cache.Cache
acceptedBlocks cache.Cache acceptedBlocks cache.Cache
bus EventBus.Bus
} }
func NewBlockPool(mp *Mempool) (*BlockPool, error) { func NewBlockPool(mp *Mempool, bus EventBus.Bus) (*BlockPool, error) {
bp := &BlockPool{ bp := &BlockPool{
acceptedBlocks: cache.NewInMemoryCache(), // here we need to use separate cache acceptedBlocks: cache.NewInMemoryCache(), // here we need to use separate cache
knownBlocks: cache.NewInMemoryCache(), knownBlocks: cache.NewInMemoryCache(),
mempool: mp, mempool: mp,
bus: bus,
} }
return bp, nil return bp, nil
} }
func (bp *BlockPool) AddBlock(block *types.Block) error { func (bp *BlockPool) AddBlock(block *types.Block) error {
return bp.knownBlocks.Store(hex.EncodeToString(block.Header.Hash), block) err := bp.knownBlocks.StoreWithTTL(hex.EncodeToString(block.Header.Hash), block, 10*time.Minute)
if err != nil {
return err
}
bp.bus.Publish("blockpool:knownBlockAdded", block)
return nil
} }
func (bp *BlockPool) GetBlock(blockhash []byte) (*types.Block, error) { func (bp *BlockPool) GetBlock(blockhash []byte) (*types.Block, error) {
@ -42,10 +52,16 @@ func (bp *BlockPool) PruneBlocks() {
for k := range bp.knownBlocks.Items() { for k := range bp.knownBlocks.Items() {
bp.knownBlocks.Delete(k) bp.knownBlocks.Delete(k)
} }
bp.bus.Publish("blockpool:pruned")
} }
func (bp *BlockPool) AddAcceptedBlock(block *types.Block) error { func (bp *BlockPool) AddAcceptedBlock(block *types.Block) error {
return bp.acceptedBlocks.Store(hex.EncodeToString(block.Header.Hash), block) err := bp.acceptedBlocks.Store(hex.EncodeToString(block.Header.Hash), block)
if err != nil {
return err
}
bp.bus.Publish("blockpool:acceptedBlockAdded", block)
return nil
} }
func (bp *BlockPool) GetAllAcceptedBlocks() []*types.Block { func (bp *BlockPool) GetAllAcceptedBlocks() []*types.Block {

View File

@ -6,6 +6,8 @@ import (
"sort" "sort"
"time" "time"
"github.com/asaskevich/EventBus"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
types2 "github.com/Secured-Finance/dione/blockchain/types" types2 "github.com/Secured-Finance/dione/blockchain/types"
@ -26,11 +28,13 @@ var (
type Mempool struct { type Mempool struct {
cache cache.Cache cache cache.Cache
bus EventBus.Bus
} }
func NewMempool() (*Mempool, error) { func NewMempool(bus EventBus.Bus) (*Mempool, error) {
mp := &Mempool{ mp := &Mempool{
cache: cache.NewInMemoryCache(), // here we need to use separate cache cache: cache.NewInMemoryCache(), // here we need to use separate cache
bus: bus,
} }
return mp, nil return mp, nil
@ -40,13 +44,21 @@ func (mp *Mempool) StoreTx(tx *types2.Transaction) error {
hashStr := hex.EncodeToString(tx.Hash) hashStr := hex.EncodeToString(tx.Hash)
err := mp.cache.StoreWithTTL(DefaultTxPrefix+hashStr, tx, DefaultTxTTL) err := mp.cache.StoreWithTTL(DefaultTxPrefix+hashStr, tx, DefaultTxTTL)
logrus.Infof("Submitted new transaction in mempool with hash %x", tx.Hash) logrus.Infof("Submitted new transaction in mempool with hash %x", tx.Hash)
mp.bus.Publish("mempool:transactionAdded", tx)
return err return err
} }
func (mp *Mempool) DeleteTx(txHash []byte) { func (mp *Mempool) DeleteTx(txHash []byte) error {
hashStr := hex.EncodeToString(txHash) hashStr := hex.EncodeToString(txHash)
var tx types2.Transaction
err := mp.cache.Get(DefaultTxPrefix+hashStr, &tx)
if err != nil {
return err
}
mp.cache.Delete(DefaultTxPrefix + hashStr) mp.cache.Delete(DefaultTxPrefix + hashStr)
logrus.Debugf("Deleted transaction from mempool %x", txHash) logrus.Debugf("Deleted transaction from mempool %x", txHash)
mp.bus.Publish("mempool:transactionRemoved", tx)
return nil
} }
func (mp *Mempool) GetTransactionsForNewBlock() []*types2.Transaction { func (mp *Mempool) GetTransactionsForNewBlock() []*types2.Transaction {

View File

@ -1,6 +1,7 @@
package consensus package consensus
import ( import (
"encoding/hex"
"errors" "errors"
"math/big" "math/big"
"sync" "sync"
@ -15,8 +16,6 @@ import (
"github.com/Secured-Finance/dione/blockchain" "github.com/Secured-Finance/dione/blockchain"
"github.com/Arceliar/phony"
types3 "github.com/Secured-Finance/dione/blockchain/types" types3 "github.com/Secured-Finance/dione/blockchain/types"
"github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/crypto"
@ -47,7 +46,6 @@ const (
) )
type PBFTConsensusManager struct { type PBFTConsensusManager struct {
phony.Inbox
bus EventBus.Bus bus EventBus.Bus
psb *pubsub.PubSubRouter psb *pubsub.PubSubRouter
minApprovals int // FIXME minApprovals int // FIXME
@ -102,21 +100,11 @@ func NewPBFTConsensusManager(
pcm.psb.Hook(pubsub.PrePrepareMessageType, pcm.handlePrePrepare) pcm.psb.Hook(pubsub.PrePrepareMessageType, pcm.handlePrePrepare)
pcm.psb.Hook(pubsub.PrepareMessageType, pcm.handlePrepare) pcm.psb.Hook(pubsub.PrepareMessageType, pcm.handlePrepare)
pcm.psb.Hook(pubsub.CommitMessageType, pcm.handleCommit) pcm.psb.Hook(pubsub.CommitMessageType, pcm.handleCommit)
//bus.SubscribeOnce("sync:initialSyncCompleted", func() { bus.SubscribeAsync("beacon:newEntry", func(entry types2.BeaconEntry) {
// pcm.state.ready = true pcm.onNewBeaconEntry(entry)
//}) }, true)
height, _ := pcm.blockchain.GetLatestBlockHeight() height, _ := pcm.blockchain.GetLatestBlockHeight()
pcm.state.blockHeight = height + 1 pcm.state.blockHeight = height + 1
go func() {
for {
select {
case e := <-b.Beacon.NewEntries():
{
pcm.NewDrandRound(nil, e)
}
}
}
}()
return pcm return pcm
} }
@ -263,94 +251,105 @@ func (pcm *PBFTConsensusManager) handleCommit(message *pubsub.PubSubMessage) {
} }
} }
func (pcm *PBFTConsensusManager) NewDrandRound(from phony.Actor, entry types2.BeaconEntry) { func (pcm *PBFTConsensusManager) onNewBeaconEntry(entry types2.BeaconEntry) {
pcm.Act(from, func() { block, err := pcm.commitAcceptedBlocks()
pcm.state.mutex.Lock() if err != nil {
defer pcm.state.mutex.Unlock() if errors.Is(err, ErrNoAcceptedBlocks) {
block, err := pcm.commitAcceptedBlocks() logrus.WithFields(logrus.Fields{
"round": pcm.state.blockHeight,
}).Infof("No accepted blocks in the current consensus round")
} else {
logrus.Errorf("Failed to select the block in consensus round %d: %s", pcm.state.blockHeight, err.Error())
return
}
}
if block != nil {
// broadcast new block
var newBlockMessage pubsub.PubSubMessage
newBlockMessage.Type = pubsub.NewBlockMessageType
blockSerialized, err := cbor.Marshal(block)
if err != nil { if err != nil {
if errors.Is(err, ErrNoAcceptedBlocks) { logrus.Errorf("Failed to serialize block %x for broadcasting!", block.Header.Hash)
logrus.Infof("No accepted blocks for consensus round %d", pcm.state.blockHeight) } else {
} else { newBlockMessage.Payload = blockSerialized
logrus.Errorf("Failed to select the block in consensus round %d: %s", pcm.state.blockHeight, err.Error()) pcm.psb.BroadcastToServiceTopic(&newBlockMessage)
return
}
} }
if block != nil { // if we are miner of this block
// broadcast new block // then post dione tasks to target chains (currently, only Ethereum)
var newBlockMessage pubsub.PubSubMessage if block.Header.Proposer.String() == pcm.miner.address.String() {
newBlockMessage.Type = pubsub.NewBlockMessageType for _, tx := range block.Data {
blockSerialized, err := cbor.Marshal(block) var task types2.DioneTask
if err != nil { err := cbor.Unmarshal(tx.Data, &task)
logrus.Errorf("Failed to serialize block %x for broadcasting!", block.Header.Hash) if err != nil {
} else { logrus.WithFields(logrus.Fields{
newBlockMessage.Payload = blockSerialized "err": err.Error(),
pcm.psb.BroadcastToServiceTopic(&newBlockMessage) "txHash": hex.EncodeToString(tx.Hash),
} }).Error("Failed to unmarshal transaction payload")
continue // FIXME
// if we are miner for this block }
// then post dione tasks to target chains (currently, only Ethereum) reqIDNumber, ok := big.NewInt(0).SetString(task.RequestID, 10)
if *block.Header.Proposer == pcm.miner.address { if !ok {
for _, v := range block.Data { logrus.WithFields(logrus.Fields{
var task types2.DioneTask "txHash": hex.EncodeToString(tx.Hash),
err := cbor.Unmarshal(v.Data, &task) }).Error("Failed to parse request id number in Dione task")
if err != nil { continue // FIXME
logrus.Errorf("Failed to unmarshal transaction %x payload: %s", v.Hash, err.Error())
continue // FIXME
}
reqIDNumber, ok := big.NewInt(0).SetString(task.RequestID, 10)
if !ok {
logrus.Errorf("Failed to parse request id number in task of tx %x", v.Hash)
continue // FIXME
}
err = pcm.ethereumClient.SubmitRequestAnswer(reqIDNumber, task.Payload)
if err != nil {
logrus.Errorf("Failed to submit task in tx %x: %s", v.Hash, err.Error())
continue // FIXME
}
} }
}
pcm.state.blockHeight = pcm.state.blockHeight + 1 err = pcm.ethereumClient.SubmitRequestAnswer(reqIDNumber, task.Payload)
} if err != nil {
logrus.WithFields(logrus.Fields{
// get latest block "err": err.Error(),
height, err := pcm.blockchain.GetLatestBlockHeight() "txHash": hex.EncodeToString(tx.Hash),
if err != nil { "reqID": reqIDNumber.String(),
logrus.Error(err) }).Error("Failed to submit task to ETH chain")
return continue // FIXME
} }
blockHeader, err := pcm.blockchain.FetchBlockHeaderByHeight(height) logrus.WithFields(logrus.Fields{
if err != nil { "txHash": hex.EncodeToString(tx.Hash),
logrus.Error(err) "reqID": reqIDNumber.String(),
return }).Debug("Dione task has been sucessfully submitted to ETH chain (DioneOracle contract)")
}
pcm.state.drandRound = entry.Round
pcm.state.randomness = entry.Data
minedBlock, err := pcm.miner.MineBlock(entry.Data, entry.Round, blockHeader)
if err != nil {
if errors.Is(err, ErrNoTxForBlock) {
logrus.Info("Skipping consensus round, because we don't have transactions in mempool for including into block")
} else {
logrus.Errorf("Failed to mine the block: %s", err.Error())
}
return
}
// if we are round winner
if minedBlock != nil {
logrus.Infof("We are elected in consensus round %d", pcm.state.blockHeight)
err = pcm.propose(minedBlock)
if err != nil {
logrus.Errorf("Failed to propose the block: %s", err.Error())
return
} }
} }
})
pcm.state.blockHeight = pcm.state.blockHeight + 1
}
// get latest block
height, err := pcm.blockchain.GetLatestBlockHeight()
if err != nil {
logrus.Error(err)
return
}
blockHeader, err := pcm.blockchain.FetchBlockHeaderByHeight(height)
if err != nil {
logrus.Error(err)
return
}
pcm.state.drandRound = entry.Round
pcm.state.randomness = entry.Data
minedBlock, err := pcm.miner.MineBlock(entry.Data, entry.Round, blockHeader)
if err != nil {
if errors.Is(err, ErrNoTxForBlock) {
logrus.Info("Sealing skipped, no transactions in mempool")
} else {
logrus.Errorf("Failed to mine the block: %s", err.Error())
}
return
}
// if we are round winner
if minedBlock != nil {
logrus.WithField("round", pcm.state.blockHeight).Infof("We are elected in consensus round")
err = pcm.propose(minedBlock)
if err != nil {
logrus.Errorf("Failed to propose the block: %s", err.Error())
return
}
}
} }
func (pcm *PBFTConsensusManager) commitAcceptedBlocks() (*types3.Block, error) { func (pcm *PBFTConsensusManager) commitAcceptedBlocks() (*types3.Block, error) {
@ -374,10 +373,21 @@ func (pcm *PBFTConsensusManager) commitAcceptedBlocks() (*types3.Block, error) {
maxStake = stake maxStake = stake
selectedBlock = v selectedBlock = v
} }
logrus.Infof("Committed block %x with height %d of miner %s", selectedBlock.Header.Hash, selectedBlock.Header.Height, selectedBlock.Header.Proposer.String()) logrus.WithFields(logrus.Fields{
"hash": hex.EncodeToString(selectedBlock.Header.Hash),
"height": selectedBlock.Header.Height,
"miner": selectedBlock.Header.Proposer.String(),
}).Info("Committed new block")
pcm.blockPool.PruneAcceptedBlocks(selectedBlock) pcm.blockPool.PruneAcceptedBlocks(selectedBlock)
for _, v := range selectedBlock.Data { for _, v := range selectedBlock.Data {
pcm.mempool.DeleteTx(v.Hash) err := pcm.mempool.DeleteTx(v.Hash)
if err != nil {
logrus.WithFields(logrus.Fields{
"err": err.Error(),
"tx": hex.EncodeToString(v.Hash),
}).Errorf("Failed to delete committed tx from mempool")
continue
}
} }
return selectedBlock, pcm.blockchain.StoreBlock(selectedBlock) return selectedBlock, pcm.blockchain.StoreBlock(selectedBlock)
} }

View File

@ -166,7 +166,10 @@ func NewConsensusValidator(miner *Miner, bc *blockchain.BlockChain, b beacon.Bea
return return
} }
} else { } else {
logrus.Debugf("Origin chain [%v]/request type[%v] doesn't have any payload validation!", task.OriginChain, task.RequestType) logrus.WithFields(logrus.Fields{
"originChain": task.OriginChain,
"requestType": task.RequestType,
}).Debug("This origin chain/request type doesn't have any payload validation!")
} }
}(v, result) }(v, result)
} }

View File

@ -90,7 +90,7 @@ func (m *Miner) GetStakeInfo(miner common.Address) (*big.Int, *big.Int, error) {
} }
func (m *Miner) MineBlock(randomness []byte, randomnessRound uint64, lastBlockHeader *types2.BlockHeader) (*types2.Block, error) { func (m *Miner) MineBlock(randomness []byte, randomnessRound uint64, lastBlockHeader *types2.BlockHeader) (*types2.Block, error) {
logrus.Debug("attempting to mine the block at epoch: ", lastBlockHeader.Height+1) logrus.WithField("height", lastBlockHeader.Height+1).Debug("Trying to mine new block...")
if err := m.UpdateCurrentStakeInfo(); err != nil { if err := m.UpdateCurrentStakeInfo(); err != nil {
return nil, fmt.Errorf("failed to update miner stake: %w", err) return nil, fmt.Errorf("failed to update miner stake: %w", err)

View File

@ -93,7 +93,13 @@ func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTim
logrus.Fatal(err) logrus.Fatal(err)
} }
n.Host = lhost n.Host = lhost
logrus.Info("Libp2p host has been successfully initialized!") logrus.WithField(
"multiaddress",
fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s",
n.Config.ListenAddr,
n.Config.ListenPort,
n.Host.ID().Pretty(),
)).Info("Libp2p host has been initialized!")
// initialize ethereum client // initialize ethereum client
ethClient, err := provideEthereumClient(n.Config) ethClient, err := provideEthereumClient(n.Config)
@ -101,14 +107,14 @@ func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTim
logrus.Fatal(err) logrus.Fatal(err)
} }
n.Ethereum = ethClient n.Ethereum = ethClient
logrus.Info("Ethereum client has been successfully initialized!") logrus.WithField("ethAddress", ethClient.GetEthAddress().Hex()).Info("Ethereum client has been initialized!")
// initialize blockchain rpc clients // initialize blockchain rpc clients
err = n.setupRPCClients() err = n.setupRPCClients()
if err != nil { if err != nil {
logrus.Fatal(err) logrus.Fatal(err)
} }
logrus.Info("RPC clients has been successfully configured!") logrus.Info("Foreign Blockchain RPC clients has been successfully configured!")
// initialize pubsub subsystem // initialize pubsub subsystem
psb := providePubsubRouter(lhost, n.Config) psb := providePubsubRouter(lhost, n.Config)
@ -137,7 +143,7 @@ func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTim
// == initialize blockchain modules // == initialize blockchain modules
// initialize blockpool database // initialize blockpool database
bc, err := provideBlockChain(n.Config) bc, err := provideBlockChain(n.Config, bus)
if err != nil { if err != nil {
logrus.Fatalf("Failed to initialize blockpool: %s", err.Error()) logrus.Fatalf("Failed to initialize blockpool: %s", err.Error())
} }
@ -145,14 +151,14 @@ func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTim
logrus.Info("Block pool database has been successfully initialized!") logrus.Info("Block pool database has been successfully initialized!")
// initialize mempool // initialize mempool
mp, err := provideMemPool() mp, err := provideMemPool(bus)
if err != nil { if err != nil {
logrus.Fatalf("Failed to initialize mempool: %s", err.Error()) logrus.Fatalf("Failed to initialize mempool: %s", err.Error())
} }
n.MemPool = mp n.MemPool = mp
logrus.Info("Mempool has been successfully initialized!") logrus.Info("Mempool has been successfully initialized!")
bp, err := provideBlockPool(mp) bp, err := provideBlockPool(mp, bus)
if err != nil { if err != nil {
logrus.Fatalf("Failed to initialize blockpool: %s", err.Error()) logrus.Fatalf("Failed to initialize blockpool: %s", err.Error())
} }
@ -166,7 +172,7 @@ func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTim
if err != nil { if err != nil {
logrus.Fatal(err) logrus.Fatal(err)
} }
logrus.Info("Node p2p RPC network service has been successfully initialized!") logrus.Info("Direct RPC has been successfully initialized!")
// initialize libp2p-gorpc client // initialize libp2p-gorpc client
r := provideP2PRPCClient(lhost) r := provideP2PRPCClient(lhost)
@ -184,7 +190,7 @@ func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTim
logrus.Fatal(err) logrus.Fatal(err)
} }
n.SyncManager = sm n.SyncManager = sm
logrus.Info("Blockchain synchronization subsystem has been successfully initialized!") logrus.Info("Blockchain sync subsystem has been successfully initialized!")
// initialize mining subsystem // initialize mining subsystem
miner := provideMiner(n.Host.ID(), *n.Ethereum.GetEthAddress(), n.Ethereum, prvKey, mp) miner := provideMiner(n.Host.ID(), *n.Ethereum.GetEthAddress(), n.Ethereum, prvKey, mp)
@ -192,7 +198,7 @@ func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTim
logrus.Info("Mining subsystem has been initialized!") logrus.Info("Mining subsystem has been initialized!")
// initialize random beacon network subsystem // initialize random beacon network subsystem
randomBeaconNetwork, err := provideBeacon(psb.Pubsub) randomBeaconNetwork, err := provideBeacon(psb.Pubsub, bus)
if err != nil { if err != nil {
logrus.Fatal(err) logrus.Fatal(err)
} }
@ -238,8 +244,6 @@ func (n *Node) Run(ctx context.Context) error {
} }
func (n *Node) runLibp2pAsync(ctx context.Context) error { func (n *Node) runLibp2pAsync(ctx context.Context) error {
logrus.Info(fmt.Sprintf("[*] Your Multiaddress Is: /ip4/%s/tcp/%d/p2p/%s", n.Config.ListenAddr, n.Config.ListenPort, n.Host.ID().Pretty()))
logrus.Info("Announcing ourselves...") logrus.Info("Announcing ourselves...")
_, err := n.PeerDiscovery.Advertise(context.TODO(), n.Config.Rendezvous) _, err := n.PeerDiscovery.Advertise(context.TODO(), n.Config.Rendezvous)
if err != nil { if err != nil {
@ -267,12 +271,15 @@ func (n *Node) runLibp2pAsync(ctx context.Context) error {
if newPeer.ID.String() == n.Host.ID().String() { if newPeer.ID.String() == n.Host.ID().String() {
continue continue
} }
logrus.Infof("Found peer: %s", newPeer) logrus.WithField("peer", newPeer.ID).Info("Discovered new peer, connecting...")
// Connect to the peer // Connect to the peer
if err := n.Host.Connect(ctx, newPeer); err != nil { if err := n.Host.Connect(ctx, newPeer); err != nil {
logrus.Warn("Connection failed: ", err) logrus.WithFields(logrus.Fields{
"peer": newPeer.ID,
"err": err.Error(),
}).Warn("Connection with newly discovered peer has been failed")
} }
logrus.Info("Connected to newly discovered peer: ", newPeer) logrus.WithField("peer", newPeer.ID).Info("Connected to newly discovered peer")
} }
} }
} }
@ -348,6 +355,7 @@ func (n *Node) setupRPCClients() error {
func Start() { func Start() {
logrus.SetReportCaller(true) logrus.SetReportCaller(true)
logrus.SetFormatter(&logrus.TextFormatter{ logrus.SetFormatter(&logrus.TextFormatter{
FullTimestamp: true,
CallerPrettyfier: func(f *runtime.Frame) (string, string) { CallerPrettyfier: func(f *runtime.Frame) (string, string) {
filename := path.Base(f.File) filename := path.Base(f.File)
return "", fmt.Sprintf("%s:%d:", filename, f.Line) return "", fmt.Sprintf("%s:%d:", filename, f.Line)

View File

@ -64,8 +64,8 @@ func provideMiner(peerID peer.ID, ethAddress common.Address, ethClient *ethclien
return consensus.NewMiner(peerID, ethAddress, ethClient, privateKey, mempool) return consensus.NewMiner(peerID, ethAddress, ethClient, privateKey, mempool)
} }
func provideBeacon(ps *pubsub2.PubSub) (beacon.BeaconNetwork, error) { func provideBeacon(ps *pubsub2.PubSub, bus EventBus.Bus) (beacon.BeaconNetwork, error) {
bc, err := drand2.NewDrandBeacon(ps) bc, err := drand2.NewDrandBeacon(ps, bus)
if err != nil { if err != nil {
return beacon.BeaconNetwork{}, fmt.Errorf("failed to setup drand beacon: %w", err) return beacon.BeaconNetwork{}, fmt.Errorf("failed to setup drand beacon: %w", err)
} }
@ -176,12 +176,12 @@ func providePeerDiscovery(baddrs []multiaddr.Multiaddr, h host.Host, pexDiscover
return pexDiscovery, nil return pexDiscovery, nil
} }
func provideBlockChain(config *config.Config) (*blockchain.BlockChain, error) { func provideBlockChain(config *config.Config, bus EventBus.Bus) (*blockchain.BlockChain, error) {
return blockchain.NewBlockChain(config.Blockchain.DatabasePath) return blockchain.NewBlockChain(config.Blockchain.DatabasePath, bus)
} }
func provideMemPool() (*pool.Mempool, error) { func provideMemPool(bus EventBus.Bus) (*pool.Mempool, error) {
return pool.NewMempool() return pool.NewMempool(bus)
} }
func provideSyncManager(bus EventBus.Bus, bp *blockchain.BlockChain, mp *pool.Mempool, r *gorpc.Client, bootstrap multiaddr.Multiaddr, psb *pubsub.PubSubRouter) (sync.SyncManager, error) { func provideSyncManager(bus EventBus.Bus, bp *blockchain.BlockChain, mp *pool.Mempool, r *gorpc.Client, bootstrap multiaddr.Multiaddr, psb *pubsub.PubSubRouter) (sync.SyncManager, error) {
@ -205,6 +205,6 @@ func provideNetworkService(bp *blockchain.BlockChain, mp *pool.Mempool) *Network
return NewNetworkService(bp, mp) return NewNetworkService(bp, mp)
} }
func provideBlockPool(mp *pool.Mempool) (*pool.BlockPool, error) { func provideBlockPool(mp *pool.Mempool, bus EventBus.Bus) (*pool.BlockPool, error) {
return pool.NewBlockPool(mp) return pool.NewBlockPool(mp, bus)
} }