package consensus import ( "errors" "math/big" "sync" "github.com/asaskevich/EventBus" "github.com/Secured-Finance/dione/blockchain" "github.com/drand/drand/client" "github.com/Arceliar/phony" types3 "github.com/Secured-Finance/dione/blockchain/types" "github.com/libp2p/go-libp2p-core/crypto" "github.com/Secured-Finance/dione/blockchain/pool" "github.com/Secured-Finance/dione/consensus/types" "github.com/Secured-Finance/dione/ethclient" "github.com/sirupsen/logrus" "github.com/Secured-Finance/dione/pubsub" ) type StateStatus uint8 const ( StateStatusUnknown = iota StateStatusPrePrepared StateStatusPrepared StateStatusCommited ) type PBFTConsensusManager struct { phony.Inbox bus EventBus.Bus psb *pubsub.PubSubRouter minApprovals int // FIXME privKey crypto.PrivKey msgLog *ConsensusMessageLog validator *ConsensusValidator ethereumClient *ethclient.EthereumClient miner *Miner blockPool pool.BlockPool blockchain blockchain.BlockChain state *State } type State struct { mutex sync.Mutex drandRound uint64 randomness []byte blockHeight uint64 status StateStatus ready chan bool } func NewPBFTConsensusManager(bus EventBus.Bus, psb *pubsub.PubSubRouter, minApprovals int, privKey crypto.PrivKey, ethereumClient *ethclient.EthereumClient, miner *Miner, bc *blockchain.BlockChain) *PBFTConsensusManager { pcm := &PBFTConsensusManager{} pcm.psb = psb pcm.miner = miner pcm.validator = NewConsensusValidator(miner, bc) pcm.msgLog = NewConsensusMessageLog() pcm.minApprovals = minApprovals pcm.privKey = privKey pcm.ethereumClient = ethereumClient pcm.state = &State{ ready: make(chan bool, 1), status: StateStatusUnknown, } pcm.bus = bus pcm.psb.Hook(pubsub.PrePrepareMessageType, pcm.handlePrePrepare, types.PrePrepareMessage{}) pcm.psb.Hook(pubsub.PrepareMessageType, pcm.handlePrepare, types.PrepareMessage{}) pcm.psb.Hook(pubsub.CommitMessageType, pcm.handleCommit, types.CommitMessage{}) bus.SubscribeOnce("sync:initialSyncCompleted", func() { pcm.state.ready <- true }) return pcm } func (pcm *PBFTConsensusManager) propose(blk *types3.Block) error { pcm.state.mutex.Lock() defer pcm.state.mutex.Unlock() prePrepareMsg, err := NewMessage(types.ConsensusMessage{Block: blk}, types.ConsensusMessageTypePrePrepare, pcm.privKey) if err != nil { return err } pcm.psb.BroadcastToServiceTopic(prePrepareMsg) pcm.state.status = StateStatusPrePrepared return nil } func (pcm *PBFTConsensusManager) handlePrePrepare(message *pubsub.GenericMessage) { pcm.state.mutex.Lock() defer pcm.state.mutex.Unlock() prePrepare, ok := message.Payload.(types.PrePrepareMessage) if !ok { logrus.Warn("failed to convert payload to PrePrepare message") return } if prePrepare.Block.Header.Proposer == pcm.miner.address { return } cmsg := types.ConsensusMessage{ Type: types.ConsensusMessageTypePrePrepare, From: message.From, Block: prePrepare.Block, } <-pcm.state.ready if pcm.msgLog.Exists(cmsg) { logrus.Debugf("received existing pre_prepare msg, dropping...") return } if !pcm.validator.Valid(cmsg, map[string]interface{}{"randomness": pcm.state.randomness}) { logrus.Warn("received invalid pre_prepare msg, dropping...") return } pcm.msgLog.AddMessage(cmsg) pcm.blockPool.AddBlock(cmsg.Block) prepareMsg, err := NewMessage(cmsg, types.ConsensusMessageTypePrepare, pcm.privKey) if err != nil { logrus.Errorf("failed to create prepare message: %v", err) return } pcm.psb.BroadcastToServiceTopic(prepareMsg) pcm.state.status = StateStatusPrePrepared } func (pcm *PBFTConsensusManager) handlePrepare(message *pubsub.GenericMessage) { pcm.state.mutex.Lock() defer pcm.state.mutex.Unlock() prepare, ok := message.Payload.(types.PrepareMessage) if !ok { logrus.Warn("failed to convert payload to Prepare message") return } cmsg := types.ConsensusMessage{ Type: types.ConsensusMessageTypePrepare, From: message.From, Blockhash: prepare.Blockhash, Signature: prepare.Signature, // TODO check the signature } pk, _ := message.From.ExtractPublicKey() ok, err := pk.Verify(cmsg.Blockhash, cmsg.Signature) if err != nil { logrus.Warnf("Failed to verify PREPARE message signature: %s", err.Error()) return } if !ok { logrus.Errorf("Signature of PREPARE message of peer %s isn't valid!", cmsg.From) return } if pcm.msgLog.Exists(cmsg) { logrus.Debugf("received existing prepare msg, dropping...") return } if !pcm.validator.Valid(cmsg, nil) { logrus.Warn("received invalid prepare msg, dropping...") return } pcm.msgLog.AddMessage(cmsg) if len(pcm.msgLog.Get(types.ConsensusMessageTypePrepare, cmsg.Blockhash)) >= pcm.minApprovals { commitMsg, err := NewMessage(cmsg, types.ConsensusMessageTypeCommit, pcm.privKey) if err != nil { logrus.Errorf("failed to create commit message: %v", err) } pcm.psb.BroadcastToServiceTopic(commitMsg) pcm.state.status = StateStatusPrepared } } func (pcm *PBFTConsensusManager) handleCommit(message *pubsub.GenericMessage) { pcm.state.mutex.Lock() defer pcm.state.mutex.Unlock() commit, ok := message.Payload.(types.CommitMessage) if !ok { logrus.Warn("failed to convert payload to Prepare message") return } cmsg := types.ConsensusMessage{ Type: types.ConsensusMessageTypeCommit, From: message.From, Blockhash: commit.Blockhash, Signature: commit.Signature, // TODO check the signature } pk, _ := message.From.ExtractPublicKey() ok, err := pk.Verify(cmsg.Blockhash, cmsg.Signature) if err != nil { logrus.Warnf("Failed to verify COMMIT message signature: %s", err.Error()) return } if !ok { logrus.Errorf("Signature of COMMIT message of peer %s isn't valid!", cmsg.From) return } if pcm.msgLog.Exists(cmsg) { logrus.Debugf("received existing commit msg, dropping...") return } if !pcm.validator.Valid(cmsg, nil) { logrus.Warn("received invalid commit msg, dropping...") return } pcm.msgLog.AddMessage(cmsg) if len(pcm.msgLog.Get(types.ConsensusMessageTypeCommit, cmsg.Blockhash)) >= pcm.minApprovals { block, err := pcm.blockPool.GetBlock(cmsg.Blockhash) if err != nil { logrus.Debug(err) return } pcm.blockPool.AddAcceptedBlock(block) pcm.state.status = StateStatusCommited } } func (pcm *PBFTConsensusManager) NewDrandRound(from phony.Actor, res client.Result) { pcm.Act(from, func() { pcm.state.mutex.Lock() defer pcm.state.mutex.Unlock() block, err := pcm.commitAcceptedBlocks() if err != nil { logrus.Errorf("Failed to select the block in consensus round %d: %s", pcm.state.blockHeight, err.Error()) return } minedBlock, err := pcm.miner.MineBlock(res.Randomness(), block.Header) if err != nil { logrus.Errorf("Failed to mine the block: %s", err.Error()) return } pcm.state.drandRound = res.Round() pcm.state.randomness = res.Randomness() pcm.state.blockHeight = pcm.state.blockHeight + 1 // if we are round winner if minedBlock != nil { err = pcm.propose(minedBlock) if err != nil { logrus.Errorf("Failed to propose the block: %s", err.Error()) return } } }) } func (pcm *PBFTConsensusManager) commitAcceptedBlocks() (*types3.Block, error) { blocks := pcm.blockPool.GetAllAcceptedBlocks() if blocks == nil { return nil, errors.New("there is no accepted blocks") } var maxStake *big.Int var selectedBlock *types3.Block for _, v := range blocks { stake, err := pcm.ethereumClient.GetMinerStake(v.Header.ProposerEth) if err != nil { return nil, err } if maxStake != nil { if stake.Cmp(maxStake) == -1 { continue } } maxStake = stake selectedBlock = v } logrus.Debugf("Selected block of miner %s", selectedBlock.Header.ProposerEth.Hex()) pcm.blockPool.PruneAcceptedBlocks() return selectedBlock, pcm.blockchain.StoreBlock(selectedBlock) }