diff --git a/consensus/consensus.go b/consensus/consensus.go index b099469..e47c808 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -14,8 +14,6 @@ import ( "github.com/fxamacker/cbor/v2" - "github.com/Secured-Finance/dione/cache" - "github.com/asaskevich/EventBus" "github.com/Secured-Finance/dione/blockchain" @@ -42,13 +40,11 @@ var ( type PBFTConsensusManager struct { bus EventBus.Bus psb *pubsub.PubSubRouter - minApprovals int // FIXME privKey crypto.PrivKey - msgLog *ConsensusMessageLog validator *ConsensusValidator ethereumClient *ethclient.EthereumClient miner *blockchain.Miner - consensusRoundPool *ConsensusRoundPool + consensusRoundPool *ConsensusStatePool mempool *pool.Mempool blockchain *blockchain.BlockChain address peer.ID @@ -58,12 +54,11 @@ type PBFTConsensusManager struct { func NewPBFTConsensusManager( bus EventBus.Bus, psb *pubsub.PubSubRouter, - minApprovals int, privKey crypto.PrivKey, ethereumClient *ethclient.EthereumClient, miner *blockchain.Miner, bc *blockchain.BlockChain, - bp *ConsensusRoundPool, + bp *ConsensusStatePool, db *drand2.DrandBeacon, mempool *pool.Mempool, address peer.ID, @@ -72,8 +67,6 @@ func NewPBFTConsensusManager( psb: psb, miner: miner, validator: NewConsensusValidator(miner, bc, db), - msgLog: NewConsensusMessageLog(), - minApprovals: minApprovals, privKey: privKey, ethereumClient: ethereumClient, bus: bus, @@ -84,26 +77,85 @@ func NewPBFTConsensusManager( stateChangeChannels: map[string]map[State][]chan bool{}, } - return pcm -} - -func (pcm *PBFTConsensusManager) Run() { pcm.psb.Hook(pubsub.PrePrepareMessageType, pcm.handlePrePrepare) pcm.psb.Hook(pubsub.PrepareMessageType, pcm.handlePrepare) pcm.psb.Hook(pubsub.CommitMessageType, pcm.handleCommit) + pcm.bus.SubscribeAsync("beacon:newEntry", func(entry types2.BeaconEntry) { pcm.onNewBeaconEntry(entry) }, true) + + pcm.bus.SubscribeAsync("consensus:newState", func(block *types3.Block, newStateNumber int) { + newState := State(newStateNumber) // hacky, because reflection panics if we pass int to a handler which has type-alias for int + + consensusMessageType := types.ConsensusMessageTypeUnknown + + switch newState { + case StateStatusPrePrepared: + { + logrus.WithField("blockHash", fmt.Sprintf("%x", block.Header.Hash)).Debugf("Entered into PREPREPARED state") + if *block.Header.Proposer == pcm.address { + return + } + consensusMessageType = types.ConsensusMessageTypePrepare + break + } + case StateStatusPrepared: + { + consensusMessageType = types.ConsensusMessageTypeCommit + logrus.WithField("blockHash", fmt.Sprintf("%x", block.Header.Hash)).Debugf("Entered into PREPARED state") + break + } + case StateStatusCommited: + { + logrus.WithField("blockHash", fmt.Sprintf("%x", block.Header.Hash)).Debugf("Entered into COMMITTED state") + break + } + } + + if consensusMessageType == types.ConsensusMessageTypeUnknown { + return + } + + message, err := NewMessage(&types.ConsensusMessage{ + Type: consensusMessageType, + Blockhash: block.Header.Hash, + }, pcm.privKey) + if err != nil { + logrus.Errorf("Failed to create consensus message: %v", err) + return + } + if err = pcm.psb.BroadcastToServiceTopic(message); err != nil { + logrus.Errorf("Failed to send consensus message: %s", err.Error()) + return + } + }, true) + + return pcm } func (pcm *PBFTConsensusManager) propose(blk *types3.Block) error { - prePrepareMsg, err := NewMessage(types.ConsensusMessage{Block: blk}, types.ConsensusMessageTypePrePrepare, pcm.privKey) + cmsg := &types.ConsensusMessage{ + Type: StateStatusPrePrepared, + Block: blk, + Blockhash: blk.Header.Hash, + From: pcm.address, + } + prePrepareMsg, err := NewMessage(cmsg, pcm.privKey) if err != nil { return err } + time.Sleep(1 * time.Second) // wait until all nodes will commit previous blocks - pcm.psb.BroadcastToServiceTopic(prePrepareMsg) - pcm.consensusRoundPool.AddConsensusInfo(blk) + + if err = pcm.consensusRoundPool.InsertMessageIntoLog(cmsg); err != nil { + return err + } + + if err = pcm.psb.BroadcastToServiceTopic(prePrepareMsg); err != nil { + return err + } + logrus.WithField("blockHash", fmt.Sprintf("%x", blk.Header.Hash)).Debugf("Entered into PREPREPARED state") return nil } @@ -120,48 +172,28 @@ func (pcm *PBFTConsensusManager) handlePrePrepare(message *pubsub.PubSubMessage) return } - cmsg := types.ConsensusMessage{ + cmsg := &types.ConsensusMessage{ Type: types.ConsensusMessageTypePrePrepare, From: message.From, Block: prePrepare.Block, Blockhash: prePrepare.Block.Header.Hash, } - if pcm.msgLog.Exists(cmsg) { - logrus.Tracef("received existing pre_prepare msg for block %x", cmsg.Block.Header.Hash) - return - } if !pcm.validator.Valid(cmsg) { - logrus.Warnf("received invalid pre_prepare msg for block %x", cmsg.Block.Header.Hash) + logrus.WithField("blockHash", hex.EncodeToString(cmsg.Block.Header.Hash)).Warn("Received invalid PREPREPARE for block") + return + } + + err = pcm.consensusRoundPool.InsertMessageIntoLog(cmsg) + if err != nil { + logrus.WithField("err", err.Error()).Warn("Failed to add PREPARE message to log") return } - pcm.msgLog.AddMessage(cmsg) logrus.WithFields(logrus.Fields{ "blockHash": fmt.Sprintf("%x", cmsg.Block.Header.Hash), "from": message.From.String(), }).Debug("Received PREPREPARE message") - pcm.consensusRoundPool.AddConsensusInfo(cmsg.Block) - - encodedHash := hex.EncodeToString(cmsg.Blockhash) - if m, ok := pcm.stateChangeChannels[encodedHash]; ok { - if channels, ok := m[StateStatusPrePrepared]; ok { - for _, v := range channels { - v <- true - close(v) - delete(pcm.stateChangeChannels, encodedHash) - } - } - } - - prepareMsg, err := NewMessage(cmsg, types.ConsensusMessageTypePrepare, pcm.privKey) - if err != nil { - logrus.Errorf("failed to create prepare message: %v", err) - return - } - - logrus.WithField("blockHash", fmt.Sprintf("%x", prePrepare.Block.Header.Hash)).Debugf("Entered into PREPREPARED state") - pcm.psb.BroadcastToServiceTopic(prepareMsg) } func (pcm *PBFTConsensusManager) handlePrepare(message *pubsub.PubSubMessage) { @@ -172,73 +204,28 @@ func (pcm *PBFTConsensusManager) handlePrepare(message *pubsub.PubSubMessage) { return } - cmsg := types.ConsensusMessage{ + cmsg := &types.ConsensusMessage{ Type: types.ConsensusMessageTypePrepare, From: message.From, Blockhash: prepare.Blockhash, Signature: prepare.Signature, } - if _, err := pcm.consensusRoundPool.GetConsensusInfo(cmsg.Blockhash); errors.Is(err, cache.ErrNotFound) { - encodedHash := hex.EncodeToString(cmsg.Blockhash) - logrus.WithField("blockHash", encodedHash).Warn("Received PREPARE before PREPREPARED state, waiting...") - waitingCh := make(chan bool, 1) - if _, ok := pcm.stateChangeChannels[encodedHash]; !ok { - pcm.stateChangeChannels[encodedHash] = map[State][]chan bool{} - } - pcm.stateChangeChannels[encodedHash][StateStatusPrePrepared] = append(pcm.stateChangeChannels[encodedHash][StateStatusPrePrepared], waitingCh) - result := <-waitingCh - if !result { - return - } - } - - if pcm.msgLog.Exists(cmsg) { - logrus.Tracef("received existing prepare msg for block %x", cmsg.Blockhash) - return - } - if !pcm.validator.Valid(cmsg) { logrus.Warnf("received invalid prepare msg for block %x", cmsg.Blockhash) return } - pcm.msgLog.AddMessage(cmsg) + err = pcm.consensusRoundPool.InsertMessageIntoLog(cmsg) + if err != nil { + logrus.WithField("err", err.Error()).Warn("Failed to add PREPARE message to log") + return + } + logrus.WithFields(logrus.Fields{ "blockHash": fmt.Sprintf("%x", cmsg.Blockhash), "from": message.From.String(), }).Debug("Received PREPARE message") - - if len(pcm.msgLog.Get(types.ConsensusMessageTypePrepare, cmsg.Blockhash)) >= pcm.minApprovals-1 { - ci, err := pcm.consensusRoundPool.GetConsensusInfo(cmsg.Blockhash) - if err != nil { - logrus.Fatalf("This should never happen: %s", err.Error()) - } - if ci.State >= StateStatusPrepared { - return - } - - commitMsg, err := NewMessage(cmsg, types.ConsensusMessageTypeCommit, pcm.privKey) - if err != nil { - logrus.Errorf("failed to create commit message: %v", err) - return - } - pcm.psb.BroadcastToServiceTopic(commitMsg) - logrus.WithField("blockHash", fmt.Sprintf("%x", cmsg.Blockhash)).Debugf("Entered into PREPARED state") - pcm.consensusRoundPool.UpdateConsensusState(cmsg.Blockhash, StateStatusPrepared) - - // pull watchers - encodedHash := hex.EncodeToString(cmsg.Blockhash) - if m, ok := pcm.stateChangeChannels[encodedHash]; ok { - if channels, ok := m[StateStatusPrepared]; ok { - for _, v := range channels { - v <- true - close(v) - delete(pcm.stateChangeChannels, encodedHash) - } - } - } - } } func (pcm *PBFTConsensusManager) handleCommit(message *pubsub.PubSubMessage) { @@ -249,61 +236,28 @@ func (pcm *PBFTConsensusManager) handleCommit(message *pubsub.PubSubMessage) { return } - cmsg := types.ConsensusMessage{ + cmsg := &types.ConsensusMessage{ Type: types.ConsensusMessageTypeCommit, From: message.From, Blockhash: commit.Blockhash, Signature: commit.Signature, } - ci, err := pcm.consensusRoundPool.GetConsensusInfo(cmsg.Blockhash) - - encodedHash := hex.EncodeToString(cmsg.Blockhash) - - if errors.Is(err, cache.ErrNotFound) || ci.State < StateStatusPrepared { - logrus.WithFields(logrus.Fields{ - "blockHash": encodedHash, - "from": cmsg.From, - }).Warnf("Received COMMIT message before PREPARED state, waiting...") - waitingCh := make(chan bool, 1) - if _, ok := pcm.stateChangeChannels[encodedHash]; !ok { - pcm.stateChangeChannels[encodedHash] = map[State][]chan bool{} - } - pcm.stateChangeChannels[encodedHash][StateStatusPrepared] = append(pcm.stateChangeChannels[encodedHash][StateStatusPrepared], waitingCh) - result := <-waitingCh - if !result { - return - } - } - - if pcm.msgLog.Exists(cmsg) { - logrus.Tracef("received existing commit msg for block %x", cmsg.Blockhash) - return - } if !pcm.validator.Valid(cmsg) { logrus.Warnf("received invalid commit msg for block %x", cmsg.Blockhash) return } - pcm.msgLog.AddMessage(cmsg) + err = pcm.consensusRoundPool.InsertMessageIntoLog(cmsg) + if err != nil { + logrus.WithField("err", err.Error()).Warn("Failed to add COMMIT message to log") + return + } logrus.WithFields(logrus.Fields{ "blockHash": fmt.Sprintf("%x", cmsg.Blockhash), "from": message.From.String(), }).Debug("Received COMMIT message") - - if len(pcm.msgLog.Get(types.ConsensusMessageTypeCommit, cmsg.Blockhash)) >= pcm.minApprovals { - ci, err := pcm.consensusRoundPool.GetConsensusInfo(cmsg.Blockhash) - if err != nil { - logrus.Fatalf("This should never happen: %s", err.Error()) - } - if ci.State == StateStatusCommited { - return - } - - logrus.WithField("blockHash", fmt.Sprintf("%x", cmsg.Blockhash)).Debugf("Entered into COMMIT state") - pcm.consensusRoundPool.UpdateConsensusState(cmsg.Blockhash, StateStatusCommited) - } } func (pcm *PBFTConsensusManager) onNewBeaconEntry(entry types2.BeaconEntry) { diff --git a/consensus/consensus_round_pool.go b/consensus/consensus_round_pool.go index 902e066..54e172c 100644 --- a/consensus/consensus_round_pool.go +++ b/consensus/consensus_round_pool.go @@ -1,10 +1,11 @@ package consensus import ( - "bytes" "encoding/hex" "fmt" - "time" + "sync" + + types2 "github.com/Secured-Finance/dione/consensus/types" "github.com/Secured-Finance/dione/blockchain/pool" @@ -13,7 +14,6 @@ import ( "github.com/sirupsen/logrus" "github.com/Secured-Finance/dione/blockchain/types" - "github.com/Secured-Finance/dione/cache" ) type State uint8 @@ -26,94 +26,101 @@ const ( StateStatusCommited ) -// ConsensusRoundPool is pool for blocks that isn't not validated or committed yet -type ConsensusRoundPool struct { - mempool *pool.Mempool - consensusInfoStorage cache.Cache - bus EventBus.Bus +// ConsensusStatePool is pool for blocks that isn't not validated or committed yet +type ConsensusStatePool struct { + mempool *pool.Mempool + consensusInfoMap map[string]*ConsensusInfo + mapMutex sync.Mutex + bus EventBus.Bus + minApprovals int // FIXME } -func NewConsensusRoundPool(mp *pool.Mempool, bus EventBus.Bus) (*ConsensusRoundPool, error) { - bp := &ConsensusRoundPool{ - consensusInfoStorage: cache.NewInMemoryCache(), - mempool: mp, - bus: bus, +func NewConsensusRoundPool(mp *pool.Mempool, bus EventBus.Bus, minApprovals int) (*ConsensusStatePool, error) { + bp := &ConsensusStatePool{ + consensusInfoMap: map[string]*ConsensusInfo{}, + mempool: mp, + bus: bus, + minApprovals: minApprovals, } return bp, nil } type ConsensusInfo struct { - Block *types.Block - State State + Blockhash []byte + Block *types.Block + State State + MessageLog *ConsensusMessageLog } -func (crp *ConsensusRoundPool) AddConsensusInfo(block *types.Block) error { - encodedHash := hex.EncodeToString(block.Header.Hash) - - if crp.consensusInfoStorage.Exists(encodedHash) { - return nil +func (crp *ConsensusStatePool) InsertMessageIntoLog(cmsg *types2.ConsensusMessage) error { + crp.mapMutex.Lock() + defer crp.mapMutex.Unlock() + consensusInfo, ok := crp.consensusInfoMap[hex.EncodeToString(cmsg.Blockhash)] + if !ok { + consensusInfo = &ConsensusInfo{ + Block: cmsg.Block, + Blockhash: cmsg.Blockhash, + State: StateStatusUnknown, + MessageLog: NewConsensusMessageLog(), + } + crp.consensusInfoMap[hex.EncodeToString(cmsg.Blockhash)] = consensusInfo } - err := crp.consensusInfoStorage.StoreWithTTL(encodedHash, &ConsensusInfo{ - Block: block, - State: StateStatusPrePrepared, - }, 10*time.Minute) - if err != nil { - return err + added := consensusInfo.MessageLog.AddMessage(cmsg) + if !added { + return fmt.Errorf("consensus message already exists in message log") } - logrus.WithField("hash", fmt.Sprintf("%x", block.Header.Hash)).Debug("New block discovered") - crp.bus.Publish("blockpool:knownBlockAdded", block) + + crp.maybeUpdateConsensusState(consensusInfo, cmsg) + return nil } -func (crp *ConsensusRoundPool) UpdateConsensusState(blockhash []byte, newState State) error { - encodedHash := hex.EncodeToString(blockhash) - - var consensusInfo ConsensusInfo - err := crp.consensusInfoStorage.Get(encodedHash, &consensusInfo) - if err != nil { - return err +func (crp *ConsensusStatePool) maybeUpdateConsensusState(ci *ConsensusInfo, cmsg *types2.ConsensusMessage) { + if ci.State == StateStatusUnknown && cmsg.Type == types2.ConsensusMessageTypePrePrepare && cmsg.Block != nil { + ci.Block = cmsg.Block + logrus.WithField("hash", fmt.Sprintf("%x", cmsg.Block.Header.Hash)).Debug("New block discovered") + ci.State = StateStatusPrePrepared + crp.bus.Publish("consensus:newState", ci.Block, StateStatusPrePrepared) } - if newState < consensusInfo.State { - return fmt.Errorf("attempt to set incorrect state") + if len(ci.MessageLog.Get(types2.ConsensusMessageTypePrepare, cmsg.Blockhash)) >= crp.minApprovals-1 && ci.State == StateStatusPrePrepared { // FIXME approval across 2f nodes + ci.State = StateStatusPrepared + crp.bus.Publish("consensus:newState", ci.Block, StateStatusPrepared) } - consensusInfo.State = newState - crp.bus.Publish("blockpool:newConsensusState", blockhash, newState) - return crp.consensusInfoStorage.StoreWithTTL(encodedHash, &consensusInfo, 10*time.Minute) -} -func (crp *ConsensusRoundPool) GetConsensusInfo(blockhash []byte) (*ConsensusInfo, error) { - var consensusInfo ConsensusInfo - err := crp.consensusInfoStorage.Get(hex.EncodeToString(blockhash), &consensusInfo) - return &consensusInfo, err + if len(ci.MessageLog.Get(types2.ConsensusMessageTypeCommit, cmsg.Blockhash)) >= crp.minApprovals && ci.State == StateStatusPrepared { // FIXME approval across 2f+1 nodes + ci.State = StateStatusCommited + crp.bus.Publish("consensus:newState", ci.Block, StateStatusCommited) + } } // Prune cleans known blocks list. It is called when new consensus round starts. -func (crp *ConsensusRoundPool) Prune() { - for k := range crp.consensusInfoStorage.Items() { - crp.consensusInfoStorage.Delete(k) +func (crp *ConsensusStatePool) Prune() { + for k := range crp.consensusInfoMap { + delete(crp.consensusInfoMap, k) } crp.bus.Publish("blockpool:pruned") } -func (crp *ConsensusRoundPool) GetAllBlocksWithCommit() []*ConsensusInfo { +func (crp *ConsensusStatePool) GetAllBlocksWithCommit() []*ConsensusInfo { + crp.mapMutex.Lock() + defer crp.mapMutex.Unlock() var consensusInfos []*ConsensusInfo - for _, v := range crp.consensusInfoStorage.Items() { - ci := v.(*ConsensusInfo) - if ci.State == StateStatusCommited { - consensusInfos = append(consensusInfos, ci) + for _, v := range crp.consensusInfoMap { + if v.State == StateStatusCommited { + consensusInfos = append(consensusInfos, v) } } return consensusInfos } -func containsTx(s []*types.Transaction, e *types.Transaction) bool { - for _, a := range s { - if bytes.Equal(a.Hash, e.Hash) { - return true - } - } - return false -} +//func containsTx(s []*types.Transaction, e *types.Transaction) bool { +// for _, a := range s { +// if bytes.Equal(a.Hash, e.Hash) { +// return true +// } +// } +// return false +//} diff --git a/consensus/consensus_validator.go b/consensus/consensus_validator.go index 7bf49cb..24e84fc 100644 --- a/consensus/consensus_validator.go +++ b/consensus/consensus_validator.go @@ -12,7 +12,7 @@ import ( ) type ConsensusValidator struct { - validationFuncMap map[types2.ConsensusMessageType]func(msg types2.ConsensusMessage) bool + validationFuncMap map[types2.ConsensusMessageType]func(msg *types2.ConsensusMessage) bool miner *blockchain.Miner beacon *drand2.DrandBeacon blockchain *blockchain.BlockChain @@ -25,8 +25,8 @@ func NewConsensusValidator(miner *blockchain.Miner, bc *blockchain.BlockChain, d beacon: db, } - cv.validationFuncMap = map[types2.ConsensusMessageType]func(msg types2.ConsensusMessage) bool{ - types2.ConsensusMessageTypePrePrepare: func(msg types2.ConsensusMessage) bool { + cv.validationFuncMap = map[types2.ConsensusMessageType]func(msg *types2.ConsensusMessage) bool{ + types2.ConsensusMessageTypePrePrepare: func(msg *types2.ConsensusMessage) bool { if err := cv.blockchain.ValidateBlock(msg.Block); err != nil { logrus.WithFields(logrus.Fields{ "blockHash": hex.EncodeToString(msg.Block.Header.Hash), @@ -43,11 +43,11 @@ func NewConsensusValidator(miner *blockchain.Miner, bc *blockchain.BlockChain, d return cv } -func (cv *ConsensusValidator) Valid(msg types2.ConsensusMessage) bool { +func (cv *ConsensusValidator) Valid(msg *types2.ConsensusMessage) bool { return cv.validationFuncMap[msg.Type](msg) } -func checkSignatureForBlockhash(msg types2.ConsensusMessage) bool { +func checkSignatureForBlockhash(msg *types2.ConsensusMessage) bool { pubKey, err := msg.From.ExtractPublicKey() if err != nil { // TODO logging diff --git a/consensus/msg_log.go b/consensus/msg_log.go index 77a57aa..3276a0f 100644 --- a/consensus/msg_log.go +++ b/consensus/msg_log.go @@ -8,24 +8,22 @@ import ( ) type ConsensusMessageLog struct { - messages mapset.Set - maxLogSize int + messages mapset.Set } func NewConsensusMessageLog() *ConsensusMessageLog { msgLog := &ConsensusMessageLog{ - messages: mapset.NewSet(), - maxLogSize: 0, // TODO + messages: mapset.NewSet(), } return msgLog } -func (ml *ConsensusMessageLog) AddMessage(msg types2.ConsensusMessage) { - ml.messages.Add(msg) +func (ml *ConsensusMessageLog) AddMessage(msg *types2.ConsensusMessage) bool { + return ml.messages.Add(msg) } -func (ml *ConsensusMessageLog) Exists(msg types2.ConsensusMessage) bool { +func (ml *ConsensusMessageLog) Exists(msg *types2.ConsensusMessage) bool { return ml.messages.Contains(msg) } @@ -33,7 +31,7 @@ func (ml *ConsensusMessageLog) Get(typ types2.ConsensusMessageType, blockhash [] var result []*types2.ConsensusMessage for v := range ml.messages.Iter() { - msg := v.(types2.ConsensusMessage) + msg := v.(*types2.ConsensusMessage) if msg.Block != nil { } @@ -45,7 +43,7 @@ func (ml *ConsensusMessageLog) Get(typ types2.ConsensusMessageType, blockhash [] msgBlockHash = msg.Blockhash } if bytes.Compare(msgBlockHash, blockhash) == 0 { - result = append(result, &msg) + result = append(result, msg) } } } diff --git a/consensus/utils.go b/consensus/utils.go index 37e7e85..88a1ded 100644 --- a/consensus/utils.go +++ b/consensus/utils.go @@ -12,9 +12,9 @@ import ( "github.com/libp2p/go-libp2p-core/crypto" ) -func NewMessage(cmsg types2.ConsensusMessage, typ types2.ConsensusMessageType, privKey crypto.PrivKey) (*pubsub.PubSubMessage, error) { +func NewMessage(cmsg *types2.ConsensusMessage, privKey crypto.PrivKey) (*pubsub.PubSubMessage, error) { var message pubsub.PubSubMessage - switch typ { + switch cmsg.Type { case types2.ConsensusMessageTypePrePrepare: { message.Type = pubsub.PrePrepareMessageType @@ -36,7 +36,7 @@ func NewMessage(cmsg types2.ConsensusMessage, typ types2.ConsensusMessageType, p return nil, fmt.Errorf("failed to create signature: %v", err) } pm := types2.PrepareMessage{ - Blockhash: cmsg.Block.Header.Hash, + Blockhash: cmsg.Blockhash, Signature: signature, } data, err := cbor.Marshal(pm) @@ -49,14 +49,14 @@ func NewMessage(cmsg types2.ConsensusMessage, typ types2.ConsensusMessageType, p case types2.ConsensusMessageTypeCommit: { message.Type = pubsub.CommitMessageType - pm := types2.CommitMessage{ - Blockhash: cmsg.Blockhash, - } signature, err := privKey.Sign(cmsg.Blockhash) if err != nil { return nil, fmt.Errorf("failed to create signature: %v", err) } - pm.Signature = signature + pm := types2.CommitMessage{ + Blockhash: cmsg.Blockhash, + Signature: signature, + } data, err := cbor.Marshal(pm) if err != nil { return nil, fmt.Errorf("failed to convert message to map: %s", err.Error()) diff --git a/node/node.go b/node/node.go index ec72329..8526b2a 100644 --- a/node/node.go +++ b/node/node.go @@ -77,9 +77,6 @@ func runNode( // Run blockchain sync manager syncManager.Run() - // Run consensus manager - consensusManager.Run() - // Run dispute manager disputeManager.Run(context.TODO()) diff --git a/node/node_dep_providers.go b/node/node_dep_providers.go index 25616be..73ce43c 100644 --- a/node/node_dep_providers.go +++ b/node/node_dep_providers.go @@ -125,21 +125,19 @@ func providePubsubRouter(lhost host.Host, config *config.Config) *pubsub.PubSubR func provideConsensusManager( h host.Host, - cfg *config.Config, bus EventBus.Bus, psb *pubsub.PubSubRouter, miner *blockchain.Miner, bc *blockchain.BlockChain, ethClient *ethclient.EthereumClient, privateKey crypto.PrivKey, - bp *consensus.ConsensusRoundPool, + bp *consensus.ConsensusStatePool, db *drand2.DrandBeacon, mp *pool.Mempool, ) *consensus.PBFTConsensusManager { c := consensus.NewPBFTConsensusManager( bus, psb, - cfg.ConsensusMinApprovals, privateKey, ethClient, miner, @@ -265,12 +263,12 @@ func provideNetworkService(bp *blockchain.BlockChain, mp *pool.Mempool) *Network return ns } -func provideBlockPool(mp *pool.Mempool, bus EventBus.Bus) *consensus.ConsensusRoundPool { - bp, err := consensus.NewConsensusRoundPool(mp, bus) +func provideBlockPool(mp *pool.Mempool, bus EventBus.Bus, config *config.Config) *consensus.ConsensusStatePool { + bp, err := consensus.NewConsensusRoundPool(mp, bus, config.ConsensusMinApprovals) if err != nil { logrus.Fatalf("Failed to initialize blockpool: %s", err.Error()) } - logrus.Info("Blockpool has been successfully initialized!") + logrus.Info("Consensus state pool has been successfully initialized!") return bp }