FINALLY fix consensus desync issue!!! WOOHOOOOOOOO !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

This commit is contained in:
ChronosX88 2021-07-30 02:04:27 +03:00
parent 445fbf44f0
commit 65afa056bb
Signed by: ChronosXYZ
GPG Key ID: 085A69A82C8C511A
7 changed files with 180 additions and 226 deletions

View File

@ -14,8 +14,6 @@ import (
"github.com/fxamacker/cbor/v2" "github.com/fxamacker/cbor/v2"
"github.com/Secured-Finance/dione/cache"
"github.com/asaskevich/EventBus" "github.com/asaskevich/EventBus"
"github.com/Secured-Finance/dione/blockchain" "github.com/Secured-Finance/dione/blockchain"
@ -42,13 +40,11 @@ var (
type PBFTConsensusManager struct { type PBFTConsensusManager struct {
bus EventBus.Bus bus EventBus.Bus
psb *pubsub.PubSubRouter psb *pubsub.PubSubRouter
minApprovals int // FIXME
privKey crypto.PrivKey privKey crypto.PrivKey
msgLog *ConsensusMessageLog
validator *ConsensusValidator validator *ConsensusValidator
ethereumClient *ethclient.EthereumClient ethereumClient *ethclient.EthereumClient
miner *blockchain.Miner miner *blockchain.Miner
consensusRoundPool *ConsensusRoundPool consensusRoundPool *ConsensusStatePool
mempool *pool.Mempool mempool *pool.Mempool
blockchain *blockchain.BlockChain blockchain *blockchain.BlockChain
address peer.ID address peer.ID
@ -58,12 +54,11 @@ type PBFTConsensusManager struct {
func NewPBFTConsensusManager( func NewPBFTConsensusManager(
bus EventBus.Bus, bus EventBus.Bus,
psb *pubsub.PubSubRouter, psb *pubsub.PubSubRouter,
minApprovals int,
privKey crypto.PrivKey, privKey crypto.PrivKey,
ethereumClient *ethclient.EthereumClient, ethereumClient *ethclient.EthereumClient,
miner *blockchain.Miner, miner *blockchain.Miner,
bc *blockchain.BlockChain, bc *blockchain.BlockChain,
bp *ConsensusRoundPool, bp *ConsensusStatePool,
db *drand2.DrandBeacon, db *drand2.DrandBeacon,
mempool *pool.Mempool, mempool *pool.Mempool,
address peer.ID, address peer.ID,
@ -72,8 +67,6 @@ func NewPBFTConsensusManager(
psb: psb, psb: psb,
miner: miner, miner: miner,
validator: NewConsensusValidator(miner, bc, db), validator: NewConsensusValidator(miner, bc, db),
msgLog: NewConsensusMessageLog(),
minApprovals: minApprovals,
privKey: privKey, privKey: privKey,
ethereumClient: ethereumClient, ethereumClient: ethereumClient,
bus: bus, bus: bus,
@ -84,26 +77,85 @@ func NewPBFTConsensusManager(
stateChangeChannels: map[string]map[State][]chan bool{}, stateChangeChannels: map[string]map[State][]chan bool{},
} }
return pcm
}
func (pcm *PBFTConsensusManager) Run() {
pcm.psb.Hook(pubsub.PrePrepareMessageType, pcm.handlePrePrepare) pcm.psb.Hook(pubsub.PrePrepareMessageType, pcm.handlePrePrepare)
pcm.psb.Hook(pubsub.PrepareMessageType, pcm.handlePrepare) pcm.psb.Hook(pubsub.PrepareMessageType, pcm.handlePrepare)
pcm.psb.Hook(pubsub.CommitMessageType, pcm.handleCommit) pcm.psb.Hook(pubsub.CommitMessageType, pcm.handleCommit)
pcm.bus.SubscribeAsync("beacon:newEntry", func(entry types2.BeaconEntry) { pcm.bus.SubscribeAsync("beacon:newEntry", func(entry types2.BeaconEntry) {
pcm.onNewBeaconEntry(entry) pcm.onNewBeaconEntry(entry)
}, true) }, true)
pcm.bus.SubscribeAsync("consensus:newState", func(block *types3.Block, newStateNumber int) {
newState := State(newStateNumber) // hacky, because reflection panics if we pass int to a handler which has type-alias for int
consensusMessageType := types.ConsensusMessageTypeUnknown
switch newState {
case StateStatusPrePrepared:
{
logrus.WithField("blockHash", fmt.Sprintf("%x", block.Header.Hash)).Debugf("Entered into PREPREPARED state")
if *block.Header.Proposer == pcm.address {
return
}
consensusMessageType = types.ConsensusMessageTypePrepare
break
}
case StateStatusPrepared:
{
consensusMessageType = types.ConsensusMessageTypeCommit
logrus.WithField("blockHash", fmt.Sprintf("%x", block.Header.Hash)).Debugf("Entered into PREPARED state")
break
}
case StateStatusCommited:
{
logrus.WithField("blockHash", fmt.Sprintf("%x", block.Header.Hash)).Debugf("Entered into COMMITTED state")
break
}
}
if consensusMessageType == types.ConsensusMessageTypeUnknown {
return
}
message, err := NewMessage(&types.ConsensusMessage{
Type: consensusMessageType,
Blockhash: block.Header.Hash,
}, pcm.privKey)
if err != nil {
logrus.Errorf("Failed to create consensus message: %v", err)
return
}
if err = pcm.psb.BroadcastToServiceTopic(message); err != nil {
logrus.Errorf("Failed to send consensus message: %s", err.Error())
return
}
}, true)
return pcm
} }
func (pcm *PBFTConsensusManager) propose(blk *types3.Block) error { func (pcm *PBFTConsensusManager) propose(blk *types3.Block) error {
prePrepareMsg, err := NewMessage(types.ConsensusMessage{Block: blk}, types.ConsensusMessageTypePrePrepare, pcm.privKey) cmsg := &types.ConsensusMessage{
Type: StateStatusPrePrepared,
Block: blk,
Blockhash: blk.Header.Hash,
From: pcm.address,
}
prePrepareMsg, err := NewMessage(cmsg, pcm.privKey)
if err != nil { if err != nil {
return err return err
} }
time.Sleep(1 * time.Second) // wait until all nodes will commit previous blocks time.Sleep(1 * time.Second) // wait until all nodes will commit previous blocks
pcm.psb.BroadcastToServiceTopic(prePrepareMsg)
pcm.consensusRoundPool.AddConsensusInfo(blk) if err = pcm.consensusRoundPool.InsertMessageIntoLog(cmsg); err != nil {
return err
}
if err = pcm.psb.BroadcastToServiceTopic(prePrepareMsg); err != nil {
return err
}
logrus.WithField("blockHash", fmt.Sprintf("%x", blk.Header.Hash)).Debugf("Entered into PREPREPARED state") logrus.WithField("blockHash", fmt.Sprintf("%x", blk.Header.Hash)).Debugf("Entered into PREPREPARED state")
return nil return nil
} }
@ -120,48 +172,28 @@ func (pcm *PBFTConsensusManager) handlePrePrepare(message *pubsub.PubSubMessage)
return return
} }
cmsg := types.ConsensusMessage{ cmsg := &types.ConsensusMessage{
Type: types.ConsensusMessageTypePrePrepare, Type: types.ConsensusMessageTypePrePrepare,
From: message.From, From: message.From,
Block: prePrepare.Block, Block: prePrepare.Block,
Blockhash: prePrepare.Block.Header.Hash, Blockhash: prePrepare.Block.Header.Hash,
} }
if pcm.msgLog.Exists(cmsg) {
logrus.Tracef("received existing pre_prepare msg for block %x", cmsg.Block.Header.Hash)
return
}
if !pcm.validator.Valid(cmsg) { if !pcm.validator.Valid(cmsg) {
logrus.Warnf("received invalid pre_prepare msg for block %x", cmsg.Block.Header.Hash) logrus.WithField("blockHash", hex.EncodeToString(cmsg.Block.Header.Hash)).Warn("Received invalid PREPREPARE for block")
return
}
err = pcm.consensusRoundPool.InsertMessageIntoLog(cmsg)
if err != nil {
logrus.WithField("err", err.Error()).Warn("Failed to add PREPARE message to log")
return return
} }
pcm.msgLog.AddMessage(cmsg)
logrus.WithFields(logrus.Fields{ logrus.WithFields(logrus.Fields{
"blockHash": fmt.Sprintf("%x", cmsg.Block.Header.Hash), "blockHash": fmt.Sprintf("%x", cmsg.Block.Header.Hash),
"from": message.From.String(), "from": message.From.String(),
}).Debug("Received PREPREPARE message") }).Debug("Received PREPREPARE message")
pcm.consensusRoundPool.AddConsensusInfo(cmsg.Block)
encodedHash := hex.EncodeToString(cmsg.Blockhash)
if m, ok := pcm.stateChangeChannels[encodedHash]; ok {
if channels, ok := m[StateStatusPrePrepared]; ok {
for _, v := range channels {
v <- true
close(v)
delete(pcm.stateChangeChannels, encodedHash)
}
}
}
prepareMsg, err := NewMessage(cmsg, types.ConsensusMessageTypePrepare, pcm.privKey)
if err != nil {
logrus.Errorf("failed to create prepare message: %v", err)
return
}
logrus.WithField("blockHash", fmt.Sprintf("%x", prePrepare.Block.Header.Hash)).Debugf("Entered into PREPREPARED state")
pcm.psb.BroadcastToServiceTopic(prepareMsg)
} }
func (pcm *PBFTConsensusManager) handlePrepare(message *pubsub.PubSubMessage) { func (pcm *PBFTConsensusManager) handlePrepare(message *pubsub.PubSubMessage) {
@ -172,73 +204,28 @@ func (pcm *PBFTConsensusManager) handlePrepare(message *pubsub.PubSubMessage) {
return return
} }
cmsg := types.ConsensusMessage{ cmsg := &types.ConsensusMessage{
Type: types.ConsensusMessageTypePrepare, Type: types.ConsensusMessageTypePrepare,
From: message.From, From: message.From,
Blockhash: prepare.Blockhash, Blockhash: prepare.Blockhash,
Signature: prepare.Signature, Signature: prepare.Signature,
} }
if _, err := pcm.consensusRoundPool.GetConsensusInfo(cmsg.Blockhash); errors.Is(err, cache.ErrNotFound) {
encodedHash := hex.EncodeToString(cmsg.Blockhash)
logrus.WithField("blockHash", encodedHash).Warn("Received PREPARE before PREPREPARED state, waiting...")
waitingCh := make(chan bool, 1)
if _, ok := pcm.stateChangeChannels[encodedHash]; !ok {
pcm.stateChangeChannels[encodedHash] = map[State][]chan bool{}
}
pcm.stateChangeChannels[encodedHash][StateStatusPrePrepared] = append(pcm.stateChangeChannels[encodedHash][StateStatusPrePrepared], waitingCh)
result := <-waitingCh
if !result {
return
}
}
if pcm.msgLog.Exists(cmsg) {
logrus.Tracef("received existing prepare msg for block %x", cmsg.Blockhash)
return
}
if !pcm.validator.Valid(cmsg) { if !pcm.validator.Valid(cmsg) {
logrus.Warnf("received invalid prepare msg for block %x", cmsg.Blockhash) logrus.Warnf("received invalid prepare msg for block %x", cmsg.Blockhash)
return return
} }
pcm.msgLog.AddMessage(cmsg) err = pcm.consensusRoundPool.InsertMessageIntoLog(cmsg)
if err != nil {
logrus.WithField("err", err.Error()).Warn("Failed to add PREPARE message to log")
return
}
logrus.WithFields(logrus.Fields{ logrus.WithFields(logrus.Fields{
"blockHash": fmt.Sprintf("%x", cmsg.Blockhash), "blockHash": fmt.Sprintf("%x", cmsg.Blockhash),
"from": message.From.String(), "from": message.From.String(),
}).Debug("Received PREPARE message") }).Debug("Received PREPARE message")
if len(pcm.msgLog.Get(types.ConsensusMessageTypePrepare, cmsg.Blockhash)) >= pcm.minApprovals-1 {
ci, err := pcm.consensusRoundPool.GetConsensusInfo(cmsg.Blockhash)
if err != nil {
logrus.Fatalf("This should never happen: %s", err.Error())
}
if ci.State >= StateStatusPrepared {
return
}
commitMsg, err := NewMessage(cmsg, types.ConsensusMessageTypeCommit, pcm.privKey)
if err != nil {
logrus.Errorf("failed to create commit message: %v", err)
return
}
pcm.psb.BroadcastToServiceTopic(commitMsg)
logrus.WithField("blockHash", fmt.Sprintf("%x", cmsg.Blockhash)).Debugf("Entered into PREPARED state")
pcm.consensusRoundPool.UpdateConsensusState(cmsg.Blockhash, StateStatusPrepared)
// pull watchers
encodedHash := hex.EncodeToString(cmsg.Blockhash)
if m, ok := pcm.stateChangeChannels[encodedHash]; ok {
if channels, ok := m[StateStatusPrepared]; ok {
for _, v := range channels {
v <- true
close(v)
delete(pcm.stateChangeChannels, encodedHash)
}
}
}
}
} }
func (pcm *PBFTConsensusManager) handleCommit(message *pubsub.PubSubMessage) { func (pcm *PBFTConsensusManager) handleCommit(message *pubsub.PubSubMessage) {
@ -249,61 +236,28 @@ func (pcm *PBFTConsensusManager) handleCommit(message *pubsub.PubSubMessage) {
return return
} }
cmsg := types.ConsensusMessage{ cmsg := &types.ConsensusMessage{
Type: types.ConsensusMessageTypeCommit, Type: types.ConsensusMessageTypeCommit,
From: message.From, From: message.From,
Blockhash: commit.Blockhash, Blockhash: commit.Blockhash,
Signature: commit.Signature, Signature: commit.Signature,
} }
ci, err := pcm.consensusRoundPool.GetConsensusInfo(cmsg.Blockhash)
encodedHash := hex.EncodeToString(cmsg.Blockhash)
if errors.Is(err, cache.ErrNotFound) || ci.State < StateStatusPrepared {
logrus.WithFields(logrus.Fields{
"blockHash": encodedHash,
"from": cmsg.From,
}).Warnf("Received COMMIT message before PREPARED state, waiting...")
waitingCh := make(chan bool, 1)
if _, ok := pcm.stateChangeChannels[encodedHash]; !ok {
pcm.stateChangeChannels[encodedHash] = map[State][]chan bool{}
}
pcm.stateChangeChannels[encodedHash][StateStatusPrepared] = append(pcm.stateChangeChannels[encodedHash][StateStatusPrepared], waitingCh)
result := <-waitingCh
if !result {
return
}
}
if pcm.msgLog.Exists(cmsg) {
logrus.Tracef("received existing commit msg for block %x", cmsg.Blockhash)
return
}
if !pcm.validator.Valid(cmsg) { if !pcm.validator.Valid(cmsg) {
logrus.Warnf("received invalid commit msg for block %x", cmsg.Blockhash) logrus.Warnf("received invalid commit msg for block %x", cmsg.Blockhash)
return return
} }
pcm.msgLog.AddMessage(cmsg) err = pcm.consensusRoundPool.InsertMessageIntoLog(cmsg)
if err != nil {
logrus.WithField("err", err.Error()).Warn("Failed to add COMMIT message to log")
return
}
logrus.WithFields(logrus.Fields{ logrus.WithFields(logrus.Fields{
"blockHash": fmt.Sprintf("%x", cmsg.Blockhash), "blockHash": fmt.Sprintf("%x", cmsg.Blockhash),
"from": message.From.String(), "from": message.From.String(),
}).Debug("Received COMMIT message") }).Debug("Received COMMIT message")
if len(pcm.msgLog.Get(types.ConsensusMessageTypeCommit, cmsg.Blockhash)) >= pcm.minApprovals {
ci, err := pcm.consensusRoundPool.GetConsensusInfo(cmsg.Blockhash)
if err != nil {
logrus.Fatalf("This should never happen: %s", err.Error())
}
if ci.State == StateStatusCommited {
return
}
logrus.WithField("blockHash", fmt.Sprintf("%x", cmsg.Blockhash)).Debugf("Entered into COMMIT state")
pcm.consensusRoundPool.UpdateConsensusState(cmsg.Blockhash, StateStatusCommited)
}
} }
func (pcm *PBFTConsensusManager) onNewBeaconEntry(entry types2.BeaconEntry) { func (pcm *PBFTConsensusManager) onNewBeaconEntry(entry types2.BeaconEntry) {

View File

@ -1,10 +1,11 @@
package consensus package consensus
import ( import (
"bytes"
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"time" "sync"
types2 "github.com/Secured-Finance/dione/consensus/types"
"github.com/Secured-Finance/dione/blockchain/pool" "github.com/Secured-Finance/dione/blockchain/pool"
@ -13,7 +14,6 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/Secured-Finance/dione/blockchain/types" "github.com/Secured-Finance/dione/blockchain/types"
"github.com/Secured-Finance/dione/cache"
) )
type State uint8 type State uint8
@ -26,94 +26,101 @@ const (
StateStatusCommited StateStatusCommited
) )
// ConsensusRoundPool is pool for blocks that isn't not validated or committed yet // ConsensusStatePool is pool for blocks that isn't not validated or committed yet
type ConsensusRoundPool struct { type ConsensusStatePool struct {
mempool *pool.Mempool mempool *pool.Mempool
consensusInfoStorage cache.Cache consensusInfoMap map[string]*ConsensusInfo
mapMutex sync.Mutex
bus EventBus.Bus bus EventBus.Bus
minApprovals int // FIXME
} }
func NewConsensusRoundPool(mp *pool.Mempool, bus EventBus.Bus) (*ConsensusRoundPool, error) { func NewConsensusRoundPool(mp *pool.Mempool, bus EventBus.Bus, minApprovals int) (*ConsensusStatePool, error) {
bp := &ConsensusRoundPool{ bp := &ConsensusStatePool{
consensusInfoStorage: cache.NewInMemoryCache(), consensusInfoMap: map[string]*ConsensusInfo{},
mempool: mp, mempool: mp,
bus: bus, bus: bus,
minApprovals: minApprovals,
} }
return bp, nil return bp, nil
} }
type ConsensusInfo struct { type ConsensusInfo struct {
Blockhash []byte
Block *types.Block Block *types.Block
State State State State
MessageLog *ConsensusMessageLog
} }
func (crp *ConsensusRoundPool) AddConsensusInfo(block *types.Block) error { func (crp *ConsensusStatePool) InsertMessageIntoLog(cmsg *types2.ConsensusMessage) error {
encodedHash := hex.EncodeToString(block.Header.Hash) crp.mapMutex.Lock()
defer crp.mapMutex.Unlock()
consensusInfo, ok := crp.consensusInfoMap[hex.EncodeToString(cmsg.Blockhash)]
if !ok {
consensusInfo = &ConsensusInfo{
Block: cmsg.Block,
Blockhash: cmsg.Blockhash,
State: StateStatusUnknown,
MessageLog: NewConsensusMessageLog(),
}
crp.consensusInfoMap[hex.EncodeToString(cmsg.Blockhash)] = consensusInfo
}
added := consensusInfo.MessageLog.AddMessage(cmsg)
if !added {
return fmt.Errorf("consensus message already exists in message log")
}
crp.maybeUpdateConsensusState(consensusInfo, cmsg)
if crp.consensusInfoStorage.Exists(encodedHash) {
return nil return nil
} }
err := crp.consensusInfoStorage.StoreWithTTL(encodedHash, &ConsensusInfo{ func (crp *ConsensusStatePool) maybeUpdateConsensusState(ci *ConsensusInfo, cmsg *types2.ConsensusMessage) {
Block: block, if ci.State == StateStatusUnknown && cmsg.Type == types2.ConsensusMessageTypePrePrepare && cmsg.Block != nil {
State: StateStatusPrePrepared, ci.Block = cmsg.Block
}, 10*time.Minute) logrus.WithField("hash", fmt.Sprintf("%x", cmsg.Block.Header.Hash)).Debug("New block discovered")
if err != nil { ci.State = StateStatusPrePrepared
return err crp.bus.Publish("consensus:newState", ci.Block, StateStatusPrePrepared)
}
logrus.WithField("hash", fmt.Sprintf("%x", block.Header.Hash)).Debug("New block discovered")
crp.bus.Publish("blockpool:knownBlockAdded", block)
return nil
} }
func (crp *ConsensusRoundPool) UpdateConsensusState(blockhash []byte, newState State) error { if len(ci.MessageLog.Get(types2.ConsensusMessageTypePrepare, cmsg.Blockhash)) >= crp.minApprovals-1 && ci.State == StateStatusPrePrepared { // FIXME approval across 2f nodes
encodedHash := hex.EncodeToString(blockhash) ci.State = StateStatusPrepared
crp.bus.Publish("consensus:newState", ci.Block, StateStatusPrepared)
var consensusInfo ConsensusInfo
err := crp.consensusInfoStorage.Get(encodedHash, &consensusInfo)
if err != nil {
return err
} }
if newState < consensusInfo.State { if len(ci.MessageLog.Get(types2.ConsensusMessageTypeCommit, cmsg.Blockhash)) >= crp.minApprovals && ci.State == StateStatusPrepared { // FIXME approval across 2f+1 nodes
return fmt.Errorf("attempt to set incorrect state") ci.State = StateStatusCommited
crp.bus.Publish("consensus:newState", ci.Block, StateStatusCommited)
} }
consensusInfo.State = newState
crp.bus.Publish("blockpool:newConsensusState", blockhash, newState)
return crp.consensusInfoStorage.StoreWithTTL(encodedHash, &consensusInfo, 10*time.Minute)
}
func (crp *ConsensusRoundPool) GetConsensusInfo(blockhash []byte) (*ConsensusInfo, error) {
var consensusInfo ConsensusInfo
err := crp.consensusInfoStorage.Get(hex.EncodeToString(blockhash), &consensusInfo)
return &consensusInfo, err
} }
// Prune cleans known blocks list. It is called when new consensus round starts. // Prune cleans known blocks list. It is called when new consensus round starts.
func (crp *ConsensusRoundPool) Prune() { func (crp *ConsensusStatePool) Prune() {
for k := range crp.consensusInfoStorage.Items() { for k := range crp.consensusInfoMap {
crp.consensusInfoStorage.Delete(k) delete(crp.consensusInfoMap, k)
} }
crp.bus.Publish("blockpool:pruned") crp.bus.Publish("blockpool:pruned")
} }
func (crp *ConsensusRoundPool) GetAllBlocksWithCommit() []*ConsensusInfo { func (crp *ConsensusStatePool) GetAllBlocksWithCommit() []*ConsensusInfo {
crp.mapMutex.Lock()
defer crp.mapMutex.Unlock()
var consensusInfos []*ConsensusInfo var consensusInfos []*ConsensusInfo
for _, v := range crp.consensusInfoStorage.Items() { for _, v := range crp.consensusInfoMap {
ci := v.(*ConsensusInfo) if v.State == StateStatusCommited {
if ci.State == StateStatusCommited { consensusInfos = append(consensusInfos, v)
consensusInfos = append(consensusInfos, ci)
} }
} }
return consensusInfos return consensusInfos
} }
func containsTx(s []*types.Transaction, e *types.Transaction) bool { //func containsTx(s []*types.Transaction, e *types.Transaction) bool {
for _, a := range s { // for _, a := range s {
if bytes.Equal(a.Hash, e.Hash) { // if bytes.Equal(a.Hash, e.Hash) {
return true // return true
} // }
} // }
return false // return false
} //}

View File

@ -12,7 +12,7 @@ import (
) )
type ConsensusValidator struct { type ConsensusValidator struct {
validationFuncMap map[types2.ConsensusMessageType]func(msg types2.ConsensusMessage) bool validationFuncMap map[types2.ConsensusMessageType]func(msg *types2.ConsensusMessage) bool
miner *blockchain.Miner miner *blockchain.Miner
beacon *drand2.DrandBeacon beacon *drand2.DrandBeacon
blockchain *blockchain.BlockChain blockchain *blockchain.BlockChain
@ -25,8 +25,8 @@ func NewConsensusValidator(miner *blockchain.Miner, bc *blockchain.BlockChain, d
beacon: db, beacon: db,
} }
cv.validationFuncMap = map[types2.ConsensusMessageType]func(msg types2.ConsensusMessage) bool{ cv.validationFuncMap = map[types2.ConsensusMessageType]func(msg *types2.ConsensusMessage) bool{
types2.ConsensusMessageTypePrePrepare: func(msg types2.ConsensusMessage) bool { types2.ConsensusMessageTypePrePrepare: func(msg *types2.ConsensusMessage) bool {
if err := cv.blockchain.ValidateBlock(msg.Block); err != nil { if err := cv.blockchain.ValidateBlock(msg.Block); err != nil {
logrus.WithFields(logrus.Fields{ logrus.WithFields(logrus.Fields{
"blockHash": hex.EncodeToString(msg.Block.Header.Hash), "blockHash": hex.EncodeToString(msg.Block.Header.Hash),
@ -43,11 +43,11 @@ func NewConsensusValidator(miner *blockchain.Miner, bc *blockchain.BlockChain, d
return cv return cv
} }
func (cv *ConsensusValidator) Valid(msg types2.ConsensusMessage) bool { func (cv *ConsensusValidator) Valid(msg *types2.ConsensusMessage) bool {
return cv.validationFuncMap[msg.Type](msg) return cv.validationFuncMap[msg.Type](msg)
} }
func checkSignatureForBlockhash(msg types2.ConsensusMessage) bool { func checkSignatureForBlockhash(msg *types2.ConsensusMessage) bool {
pubKey, err := msg.From.ExtractPublicKey() pubKey, err := msg.From.ExtractPublicKey()
if err != nil { if err != nil {
// TODO logging // TODO logging

View File

@ -9,23 +9,21 @@ import (
type ConsensusMessageLog struct { type ConsensusMessageLog struct {
messages mapset.Set messages mapset.Set
maxLogSize int
} }
func NewConsensusMessageLog() *ConsensusMessageLog { func NewConsensusMessageLog() *ConsensusMessageLog {
msgLog := &ConsensusMessageLog{ msgLog := &ConsensusMessageLog{
messages: mapset.NewSet(), messages: mapset.NewSet(),
maxLogSize: 0, // TODO
} }
return msgLog return msgLog
} }
func (ml *ConsensusMessageLog) AddMessage(msg types2.ConsensusMessage) { func (ml *ConsensusMessageLog) AddMessage(msg *types2.ConsensusMessage) bool {
ml.messages.Add(msg) return ml.messages.Add(msg)
} }
func (ml *ConsensusMessageLog) Exists(msg types2.ConsensusMessage) bool { func (ml *ConsensusMessageLog) Exists(msg *types2.ConsensusMessage) bool {
return ml.messages.Contains(msg) return ml.messages.Contains(msg)
} }
@ -33,7 +31,7 @@ func (ml *ConsensusMessageLog) Get(typ types2.ConsensusMessageType, blockhash []
var result []*types2.ConsensusMessage var result []*types2.ConsensusMessage
for v := range ml.messages.Iter() { for v := range ml.messages.Iter() {
msg := v.(types2.ConsensusMessage) msg := v.(*types2.ConsensusMessage)
if msg.Block != nil { if msg.Block != nil {
} }
@ -45,7 +43,7 @@ func (ml *ConsensusMessageLog) Get(typ types2.ConsensusMessageType, blockhash []
msgBlockHash = msg.Blockhash msgBlockHash = msg.Blockhash
} }
if bytes.Compare(msgBlockHash, blockhash) == 0 { if bytes.Compare(msgBlockHash, blockhash) == 0 {
result = append(result, &msg) result = append(result, msg)
} }
} }
} }

View File

@ -12,9 +12,9 @@ import (
"github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/crypto"
) )
func NewMessage(cmsg types2.ConsensusMessage, typ types2.ConsensusMessageType, privKey crypto.PrivKey) (*pubsub.PubSubMessage, error) { func NewMessage(cmsg *types2.ConsensusMessage, privKey crypto.PrivKey) (*pubsub.PubSubMessage, error) {
var message pubsub.PubSubMessage var message pubsub.PubSubMessage
switch typ { switch cmsg.Type {
case types2.ConsensusMessageTypePrePrepare: case types2.ConsensusMessageTypePrePrepare:
{ {
message.Type = pubsub.PrePrepareMessageType message.Type = pubsub.PrePrepareMessageType
@ -36,7 +36,7 @@ func NewMessage(cmsg types2.ConsensusMessage, typ types2.ConsensusMessageType, p
return nil, fmt.Errorf("failed to create signature: %v", err) return nil, fmt.Errorf("failed to create signature: %v", err)
} }
pm := types2.PrepareMessage{ pm := types2.PrepareMessage{
Blockhash: cmsg.Block.Header.Hash, Blockhash: cmsg.Blockhash,
Signature: signature, Signature: signature,
} }
data, err := cbor.Marshal(pm) data, err := cbor.Marshal(pm)
@ -49,14 +49,14 @@ func NewMessage(cmsg types2.ConsensusMessage, typ types2.ConsensusMessageType, p
case types2.ConsensusMessageTypeCommit: case types2.ConsensusMessageTypeCommit:
{ {
message.Type = pubsub.CommitMessageType message.Type = pubsub.CommitMessageType
pm := types2.CommitMessage{
Blockhash: cmsg.Blockhash,
}
signature, err := privKey.Sign(cmsg.Blockhash) signature, err := privKey.Sign(cmsg.Blockhash)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create signature: %v", err) return nil, fmt.Errorf("failed to create signature: %v", err)
} }
pm.Signature = signature pm := types2.CommitMessage{
Blockhash: cmsg.Blockhash,
Signature: signature,
}
data, err := cbor.Marshal(pm) data, err := cbor.Marshal(pm)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to convert message to map: %s", err.Error()) return nil, fmt.Errorf("failed to convert message to map: %s", err.Error())

View File

@ -77,9 +77,6 @@ func runNode(
// Run blockchain sync manager // Run blockchain sync manager
syncManager.Run() syncManager.Run()
// Run consensus manager
consensusManager.Run()
// Run dispute manager // Run dispute manager
disputeManager.Run(context.TODO()) disputeManager.Run(context.TODO())

View File

@ -125,21 +125,19 @@ func providePubsubRouter(lhost host.Host, config *config.Config) *pubsub.PubSubR
func provideConsensusManager( func provideConsensusManager(
h host.Host, h host.Host,
cfg *config.Config,
bus EventBus.Bus, bus EventBus.Bus,
psb *pubsub.PubSubRouter, psb *pubsub.PubSubRouter,
miner *blockchain.Miner, miner *blockchain.Miner,
bc *blockchain.BlockChain, bc *blockchain.BlockChain,
ethClient *ethclient.EthereumClient, ethClient *ethclient.EthereumClient,
privateKey crypto.PrivKey, privateKey crypto.PrivKey,
bp *consensus.ConsensusRoundPool, bp *consensus.ConsensusStatePool,
db *drand2.DrandBeacon, db *drand2.DrandBeacon,
mp *pool.Mempool, mp *pool.Mempool,
) *consensus.PBFTConsensusManager { ) *consensus.PBFTConsensusManager {
c := consensus.NewPBFTConsensusManager( c := consensus.NewPBFTConsensusManager(
bus, bus,
psb, psb,
cfg.ConsensusMinApprovals,
privateKey, privateKey,
ethClient, ethClient,
miner, miner,
@ -265,12 +263,12 @@ func provideNetworkService(bp *blockchain.BlockChain, mp *pool.Mempool) *Network
return ns return ns
} }
func provideBlockPool(mp *pool.Mempool, bus EventBus.Bus) *consensus.ConsensusRoundPool { func provideBlockPool(mp *pool.Mempool, bus EventBus.Bus, config *config.Config) *consensus.ConsensusStatePool {
bp, err := consensus.NewConsensusRoundPool(mp, bus) bp, err := consensus.NewConsensusRoundPool(mp, bus, config.ConsensusMinApprovals)
if err != nil { if err != nil {
logrus.Fatalf("Failed to initialize blockpool: %s", err.Error()) logrus.Fatalf("Failed to initialize blockpool: %s", err.Error())
} }
logrus.Info("Blockpool has been successfully initialized!") logrus.Info("Consensus state pool has been successfully initialized!")
return bp return bp
} }