Fix sync/race issues in consensus, refactor accepted blocks sorting

This commit is contained in:
ChronosX88 2021-07-29 00:48:01 +03:00
parent cd671e6165
commit e8b220455e
Signed by: ChronosXYZ
GPG Key ID: 085A69A82C8C511A
2 changed files with 61 additions and 24 deletions

View File

@ -5,6 +5,8 @@ import (
"errors" "errors"
"fmt" "fmt"
"math/big" "math/big"
"sort"
"time"
drand2 "github.com/Secured-Finance/dione/beacon/drand" drand2 "github.com/Secured-Finance/dione/beacon/drand"
@ -99,6 +101,7 @@ func (pcm *PBFTConsensusManager) propose(blk *types3.Block) error {
if err != nil { if err != nil {
return err return err
} }
time.Sleep(1 * time.Second) // wait until all nodes will commit previous blocks
pcm.psb.BroadcastToServiceTopic(prePrepareMsg) pcm.psb.BroadcastToServiceTopic(prePrepareMsg)
pcm.consensusRoundPool.AddConsensusInfo(blk) pcm.consensusRoundPool.AddConsensusInfo(blk)
logrus.WithField("blockHash", fmt.Sprintf("%x", blk.Header.Hash)).Debugf("Entered into PREPREPARED state") logrus.WithField("blockHash", fmt.Sprintf("%x", blk.Header.Hash)).Debugf("Entered into PREPREPARED state")
@ -178,8 +181,8 @@ func (pcm *PBFTConsensusManager) handlePrepare(message *pubsub.PubSubMessage) {
if _, err := pcm.consensusRoundPool.GetConsensusInfo(cmsg.Blockhash); errors.Is(err, cache.ErrNotFound) { if _, err := pcm.consensusRoundPool.GetConsensusInfo(cmsg.Blockhash); errors.Is(err, cache.ErrNotFound) {
encodedHash := hex.EncodeToString(cmsg.Blockhash) encodedHash := hex.EncodeToString(cmsg.Blockhash)
logrus.WithField("blockHash", encodedHash).Warn("received PREPARE for unknown block") logrus.WithField("blockHash", encodedHash).Warn("Received PREPARE before PREPREPARED state, waiting...")
waitingCh := make(chan bool) waitingCh := make(chan bool, 1)
if _, ok := pcm.stateChangeChannels[encodedHash]; !ok { if _, ok := pcm.stateChangeChannels[encodedHash]; !ok {
pcm.stateChangeChannels[encodedHash] = map[State][]chan bool{} pcm.stateChangeChannels[encodedHash] = map[State][]chan bool{}
} }
@ -207,6 +210,14 @@ func (pcm *PBFTConsensusManager) handlePrepare(message *pubsub.PubSubMessage) {
}).Debug("Received PREPARE message") }).Debug("Received PREPARE message")
if len(pcm.msgLog.Get(types.ConsensusMessageTypePrepare, cmsg.Blockhash)) >= pcm.minApprovals-1 { if len(pcm.msgLog.Get(types.ConsensusMessageTypePrepare, cmsg.Blockhash)) >= pcm.minApprovals-1 {
ci, err := pcm.consensusRoundPool.GetConsensusInfo(cmsg.Blockhash)
if err != nil {
logrus.Fatalf("This should never happen: %s", err.Error())
}
if ci.State >= StateStatusPrepared {
return
}
commitMsg, err := NewMessage(cmsg, types.ConsensusMessageTypeCommit, pcm.privKey) commitMsg, err := NewMessage(cmsg, types.ConsensusMessageTypeCommit, pcm.privKey)
if err != nil { if err != nil {
logrus.Errorf("failed to create commit message: %v", err) logrus.Errorf("failed to create commit message: %v", err)
@ -247,15 +258,14 @@ func (pcm *PBFTConsensusManager) handleCommit(message *pubsub.PubSubMessage) {
ci, err := pcm.consensusRoundPool.GetConsensusInfo(cmsg.Blockhash) ci, err := pcm.consensusRoundPool.GetConsensusInfo(cmsg.Blockhash)
if errors.Is(err, cache.ErrNotFound) {
logrus.WithField("blockHash", hex.EncodeToString(cmsg.Blockhash)).Warnf("received COMMIT for unknown block")
return
}
if ci.State < StateStatusPrepared {
encodedHash := hex.EncodeToString(cmsg.Blockhash) encodedHash := hex.EncodeToString(cmsg.Blockhash)
logrus.WithField("blockHash", encodedHash).Warnf("incorrect state of block consensus")
waitingCh := make(chan bool) if errors.Is(err, cache.ErrNotFound) || ci.State < StateStatusPrepared {
logrus.WithFields(logrus.Fields{
"blockHash": encodedHash,
"from": cmsg.From,
}).Warnf("Received COMMIT message before PREPARED state, waiting...")
waitingCh := make(chan bool, 1)
if _, ok := pcm.stateChangeChannels[encodedHash]; !ok { if _, ok := pcm.stateChangeChannels[encodedHash]; !ok {
pcm.stateChangeChannels[encodedHash] = map[State][]chan bool{} pcm.stateChangeChannels[encodedHash] = map[State][]chan bool{}
} }
@ -283,6 +293,14 @@ func (pcm *PBFTConsensusManager) handleCommit(message *pubsub.PubSubMessage) {
}).Debug("Received COMMIT message") }).Debug("Received COMMIT message")
if len(pcm.msgLog.Get(types.ConsensusMessageTypeCommit, cmsg.Blockhash)) >= pcm.minApprovals { if len(pcm.msgLog.Get(types.ConsensusMessageTypeCommit, cmsg.Blockhash)) >= pcm.minApprovals {
ci, err := pcm.consensusRoundPool.GetConsensusInfo(cmsg.Blockhash)
if err != nil {
logrus.Fatalf("This should never happen: %s", err.Error())
}
if ci.State == StateStatusCommited {
return
}
logrus.WithField("blockHash", fmt.Sprintf("%x", cmsg.Blockhash)).Debugf("Entered into COMMIT state") logrus.WithField("blockHash", fmt.Sprintf("%x", cmsg.Blockhash)).Debugf("Entered into COMMIT state")
pcm.consensusRoundPool.UpdateConsensusState(cmsg.Blockhash, StateStatusCommited) pcm.consensusRoundPool.UpdateConsensusState(cmsg.Blockhash, StateStatusCommited)
} }
@ -395,24 +413,44 @@ func (pcm *PBFTConsensusManager) commitAcceptedBlocks() (*types3.Block, error) {
if blocks == nil { if blocks == nil {
return nil, ErrNoAcceptedBlocks return nil, ErrNoAcceptedBlocks
} }
var maxStake *big.Int
var maxWinCount int64 = -1
var selectedBlock *types3.Block var selectedBlock *types3.Block
for _, v := range blocks {
stake, err := pcm.ethereumClient.GetMinerStake(v.Block.Header.ProposerEth) sort.Slice(blocks, func(i, j int) bool {
iStake, err := pcm.ethereumClient.GetMinerStake(blocks[i].Block.Header.ProposerEth)
if err != nil { if err != nil {
logrus.Error(err)
return false
}
jStake, err := pcm.ethereumClient.GetMinerStake(blocks[j].Block.Header.ProposerEth)
if err != nil {
logrus.Error(err)
return false
}
if iStake.Cmp(jStake) == -1 {
return false
}
if iStake.Cmp(jStake) == 1 {
return true
}
return blocks[i].Block.Header.ElectionProof.WinCount > blocks[i].Block.Header.ElectionProof.WinCount
})
selectedBlock = blocks[0].Block
stake, err := pcm.ethereumClient.GetMinerStake(selectedBlock.Header.ProposerEth)
if err != nil {
logrus.Error(err)
return nil, err return nil, err
} }
if maxStake != nil && maxWinCount != -1 { logrus.WithFields(logrus.Fields{
if stake.Cmp(maxStake) == -1 || v.Block.Header.ElectionProof.WinCount < maxWinCount { "winCount": selectedBlock.Header.ElectionProof.WinCount,
continue "proposerStake": stake.String(),
} }).Debug("Selected the block with maximal win count and proposer's stake.")
}
maxStake = stake
maxWinCount = v.Block.Header.ElectionProof.WinCount
selectedBlock = v.Block
}
logrus.WithFields(logrus.Fields{ logrus.WithFields(logrus.Fields{
"hash": hex.EncodeToString(selectedBlock.Header.Hash), "hash": hex.EncodeToString(selectedBlock.Header.Hash),
"height": selectedBlock.Header.Height, "height": selectedBlock.Header.Height,

View File

@ -52,7 +52,6 @@ func runNode(
pubSubRouter *pubsub.PubSubRouter, pubSubRouter *pubsub.PubSubRouter,
disputeManager *consensus.DisputeManager, disputeManager *consensus.DisputeManager,
db *drand2.DrandBeacon, db *drand2.DrandBeacon,
bc *blockchain.BlockChain,
) { ) {
lc.Append(fx.Hook{ lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error { OnStart: func(ctx context.Context) error {
@ -221,6 +220,6 @@ func Start() {
configureMiner, configureMiner,
runNode, runNode,
), ),
//fx.NopLogger, fx.NopLogger,
).Run() ).Run()
} }