From e8b220455e1d31c09ee6d04003b0369f38ce38c5 Mon Sep 17 00:00:00 2001 From: ChronosX88 Date: Thu, 29 Jul 2021 00:48:01 +0300 Subject: [PATCH] Fix sync/race issues in consensus, refactor accepted blocks sorting --- consensus/consensus.go | 82 ++++++++++++++++++++++++++++++------------ node/node.go | 3 +- 2 files changed, 61 insertions(+), 24 deletions(-) diff --git a/consensus/consensus.go b/consensus/consensus.go index 64a12e8..b099469 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -5,6 +5,8 @@ import ( "errors" "fmt" "math/big" + "sort" + "time" drand2 "github.com/Secured-Finance/dione/beacon/drand" @@ -99,6 +101,7 @@ func (pcm *PBFTConsensusManager) propose(blk *types3.Block) error { if err != nil { return err } + time.Sleep(1 * time.Second) // wait until all nodes will commit previous blocks pcm.psb.BroadcastToServiceTopic(prePrepareMsg) pcm.consensusRoundPool.AddConsensusInfo(blk) logrus.WithField("blockHash", fmt.Sprintf("%x", blk.Header.Hash)).Debugf("Entered into PREPREPARED state") @@ -178,8 +181,8 @@ func (pcm *PBFTConsensusManager) handlePrepare(message *pubsub.PubSubMessage) { if _, err := pcm.consensusRoundPool.GetConsensusInfo(cmsg.Blockhash); errors.Is(err, cache.ErrNotFound) { encodedHash := hex.EncodeToString(cmsg.Blockhash) - logrus.WithField("blockHash", encodedHash).Warn("received PREPARE for unknown block") - waitingCh := make(chan bool) + logrus.WithField("blockHash", encodedHash).Warn("Received PREPARE before PREPREPARED state, waiting...") + waitingCh := make(chan bool, 1) if _, ok := pcm.stateChangeChannels[encodedHash]; !ok { pcm.stateChangeChannels[encodedHash] = map[State][]chan bool{} } @@ -207,6 +210,14 @@ func (pcm *PBFTConsensusManager) handlePrepare(message *pubsub.PubSubMessage) { }).Debug("Received PREPARE message") if len(pcm.msgLog.Get(types.ConsensusMessageTypePrepare, cmsg.Blockhash)) >= pcm.minApprovals-1 { + ci, err := pcm.consensusRoundPool.GetConsensusInfo(cmsg.Blockhash) + if err != nil { + logrus.Fatalf("This should never happen: %s", err.Error()) + } + if ci.State >= StateStatusPrepared { + return + } + commitMsg, err := NewMessage(cmsg, types.ConsensusMessageTypeCommit, pcm.privKey) if err != nil { logrus.Errorf("failed to create commit message: %v", err) @@ -247,15 +258,14 @@ func (pcm *PBFTConsensusManager) handleCommit(message *pubsub.PubSubMessage) { ci, err := pcm.consensusRoundPool.GetConsensusInfo(cmsg.Blockhash) - if errors.Is(err, cache.ErrNotFound) { - logrus.WithField("blockHash", hex.EncodeToString(cmsg.Blockhash)).Warnf("received COMMIT for unknown block") - return - } + encodedHash := hex.EncodeToString(cmsg.Blockhash) - if ci.State < StateStatusPrepared { - encodedHash := hex.EncodeToString(cmsg.Blockhash) - logrus.WithField("blockHash", encodedHash).Warnf("incorrect state of block consensus") - waitingCh := make(chan bool) + if errors.Is(err, cache.ErrNotFound) || ci.State < StateStatusPrepared { + logrus.WithFields(logrus.Fields{ + "blockHash": encodedHash, + "from": cmsg.From, + }).Warnf("Received COMMIT message before PREPARED state, waiting...") + waitingCh := make(chan bool, 1) if _, ok := pcm.stateChangeChannels[encodedHash]; !ok { pcm.stateChangeChannels[encodedHash] = map[State][]chan bool{} } @@ -283,6 +293,14 @@ func (pcm *PBFTConsensusManager) handleCommit(message *pubsub.PubSubMessage) { }).Debug("Received COMMIT message") if len(pcm.msgLog.Get(types.ConsensusMessageTypeCommit, cmsg.Blockhash)) >= pcm.minApprovals { + ci, err := pcm.consensusRoundPool.GetConsensusInfo(cmsg.Blockhash) + if err != nil { + logrus.Fatalf("This should never happen: %s", err.Error()) + } + if ci.State == StateStatusCommited { + return + } + logrus.WithField("blockHash", fmt.Sprintf("%x", cmsg.Blockhash)).Debugf("Entered into COMMIT state") pcm.consensusRoundPool.UpdateConsensusState(cmsg.Blockhash, StateStatusCommited) } @@ -395,24 +413,44 @@ func (pcm *PBFTConsensusManager) commitAcceptedBlocks() (*types3.Block, error) { if blocks == nil { return nil, ErrNoAcceptedBlocks } - var maxStake *big.Int - var maxWinCount int64 = -1 var selectedBlock *types3.Block - for _, v := range blocks { - stake, err := pcm.ethereumClient.GetMinerStake(v.Block.Header.ProposerEth) + + sort.Slice(blocks, func(i, j int) bool { + iStake, err := pcm.ethereumClient.GetMinerStake(blocks[i].Block.Header.ProposerEth) if err != nil { - return nil, err + logrus.Error(err) + return false } - if maxStake != nil && maxWinCount != -1 { - if stake.Cmp(maxStake) == -1 || v.Block.Header.ElectionProof.WinCount < maxWinCount { - continue - } + jStake, err := pcm.ethereumClient.GetMinerStake(blocks[j].Block.Header.ProposerEth) + if err != nil { + logrus.Error(err) + return false } - maxStake = stake - maxWinCount = v.Block.Header.ElectionProof.WinCount - selectedBlock = v.Block + + if iStake.Cmp(jStake) == -1 { + return false + } + if iStake.Cmp(jStake) == 1 { + return true + } + + return blocks[i].Block.Header.ElectionProof.WinCount > blocks[i].Block.Header.ElectionProof.WinCount + }) + + selectedBlock = blocks[0].Block + + stake, err := pcm.ethereumClient.GetMinerStake(selectedBlock.Header.ProposerEth) + if err != nil { + logrus.Error(err) + return nil, err } + + logrus.WithFields(logrus.Fields{ + "winCount": selectedBlock.Header.ElectionProof.WinCount, + "proposerStake": stake.String(), + }).Debug("Selected the block with maximal win count and proposer's stake.") + logrus.WithFields(logrus.Fields{ "hash": hex.EncodeToString(selectedBlock.Header.Hash), "height": selectedBlock.Header.Height, diff --git a/node/node.go b/node/node.go index b1b847b..ec72329 100644 --- a/node/node.go +++ b/node/node.go @@ -52,7 +52,6 @@ func runNode( pubSubRouter *pubsub.PubSubRouter, disputeManager *consensus.DisputeManager, db *drand2.DrandBeacon, - bc *blockchain.BlockChain, ) { lc.Append(fx.Hook{ OnStart: func(ctx context.Context) error { @@ -221,6 +220,6 @@ func Start() { configureMiner, runNode, ), - //fx.NopLogger, + fx.NopLogger, ).Run() }