From d179ffcd76c8d67f9a9bf9a0b387498427a26df2 Mon Sep 17 00:00:00 2001 From: ChronosX88 Date: Tue, 27 Jul 2021 00:18:16 +0300 Subject: [PATCH] Implement consensus state change watching to fix stage race conditions --- blockchain/blockchain.go | 4 - blockchain/miner.go | 69 +++++++--- blockchain/pool/blockpool.go | 101 --------------- blockchain/sync/sync_mgr.go | 11 +- cache/cache.go | 1 + cache/inmemory_cache.go | 5 + cache/redis_cache.go | 11 ++ config/win_config.go | 2 +- consensus/consensus.go | 205 +++++++++++++++--------------- consensus/consensus_round_pool.go | 119 +++++++++++++++++ node/node.go | 9 +- node/node_dep_providers.go | 30 +++-- types/electionproof.go | 2 +- 13 files changed, 322 insertions(+), 247 deletions(-) delete mode 100644 blockchain/pool/blockpool.go create mode 100644 consensus/consensus_round_pool.go diff --git a/blockchain/blockchain.go b/blockchain/blockchain.go index a634a5a..86dbc3d 100644 --- a/blockchain/blockchain.go +++ b/blockchain/blockchain.go @@ -181,10 +181,6 @@ func (bc *BlockChain) StoreBlock(block *types2.Block) error { if err = bc.setLatestBlockHeight(block.Header.Height); err != nil { return err } - } else if block.Header.Height > height { - if err = bc.setLatestBlockHeight(block.Header.Height); err != nil { - return err - } bc.bus.Publish("blockchain:latestBlockHeightUpdated", block) } bc.bus.Publish("blockchain:blockCommitted", block) diff --git a/blockchain/miner.go b/blockchain/miner.go index 46b63bb..3f5c97d 100644 --- a/blockchain/miner.go +++ b/blockchain/miner.go @@ -4,7 +4,10 @@ import ( "errors" "fmt" "math/big" - "sync" + + "github.com/libp2p/go-libp2p-core/host" + + "github.com/asaskevich/EventBus" "github.com/Secured-Finance/dione/beacon" "github.com/Secured-Finance/dione/types" @@ -27,34 +30,54 @@ var ( ) type Miner struct { - address peer.ID - ethAddress common.Address - mutex sync.Mutex - ethClient *ethclient.EthereumClient - minerStake *big.Int - networkStake *big.Int - privateKey crypto.PrivKey - mempool *pool.Mempool + bus EventBus.Bus + address peer.ID + ethClient *ethclient.EthereumClient + minerStake *big.Int + networkStake *big.Int + privateKey crypto.PrivKey + mempool *pool.Mempool + latestBlockHeader *types2.BlockHeader + blockchain *BlockChain } func NewMiner( - address peer.ID, - ethAddress common.Address, + h host.Host, ethClient *ethclient.EthereumClient, privateKey crypto.PrivKey, mempool *pool.Mempool, + bus EventBus.Bus, ) *Miner { - return &Miner{ - address: address, - ethAddress: ethAddress, + m := &Miner{ + address: h.ID(), ethClient: ethClient, privateKey: privateKey, mempool: mempool, + bus: bus, } + + return m +} + +func (m *Miner) SetBlockchainInstance(b *BlockChain) { + m.blockchain = b + + m.bus.SubscribeAsync("blockchain:latestBlockHeightUpdated", func(block *types2.Block) { + m.latestBlockHeader = block.Header + }, true) + + height, _ := m.blockchain.GetLatestBlockHeight() + header, err := m.blockchain.FetchBlockHeaderByHeight(height) + if err != nil { + logrus.WithField("err", err.Error()).Fatal("Failed to initialize miner subsystem") + } + m.latestBlockHeader = header + + logrus.Info("Mining subsystem has been initialized!") } func (m *Miner) UpdateCurrentStakeInfo() error { - mStake, err := m.ethClient.GetMinerStake(m.ethAddress) + mStake, err := m.ethClient.GetMinerStake(*m.ethClient.GetEthAddress()) if err != nil { logrus.Warn("Can't get miner stake", err) @@ -92,15 +115,19 @@ func (m *Miner) GetStakeInfo(miner common.Address) (*big.Int, *big.Int, error) { return mStake, nStake, nil } -func (m *Miner) MineBlock(randomness []byte, randomnessRound uint64, lastBlockHeader *types2.BlockHeader) (*types2.Block, error) { - logrus.WithField("height", lastBlockHeader.Height+1).Debug("Trying to mine new block...") +func (m *Miner) MineBlock(randomness []byte, randomnessRound uint64) (*types2.Block, error) { + if m.latestBlockHeader == nil { + return nil, fmt.Errorf("latest block header is null") + } + + logrus.WithField("height", m.latestBlockHeader.Height+1).Debug("Trying to mine new block...") if err := m.UpdateCurrentStakeInfo(); err != nil { return nil, fmt.Errorf("failed to update miner stake: %w", err) } winner, err := isRoundWinner( - lastBlockHeader.Height+1, + m.latestBlockHeader.Height+1, m.address, randomness, randomnessRound, @@ -113,16 +140,18 @@ func (m *Miner) MineBlock(randomness []byte, randomnessRound uint64, lastBlockHe } if winner == nil { - logrus.WithField("height", lastBlockHeader.Height+1).Debug("Block is not mined because we are not leader in consensus round") + logrus.WithField("height", m.latestBlockHeader.Height+1).Debug("Block is not mined because we are not leader in consensus round") return nil, nil } + logrus.WithField("height", m.latestBlockHeader.Height+1).Infof("We have been elected in the current consensus round") + txs := m.mempool.GetTransactionsForNewBlock() if txs == nil { return nil, ErrNoTxForBlock // skip new consensus round because there is no transaction for processing } - newBlock, err := types2.CreateBlock(lastBlockHeader, txs, m.ethAddress, m.privateKey, winner) + newBlock, err := types2.CreateBlock(m.latestBlockHeader, txs, *m.ethClient.GetEthAddress(), m.privateKey, winner) if err != nil { return nil, fmt.Errorf("failed to create new block: %w", err) } diff --git a/blockchain/pool/blockpool.go b/blockchain/pool/blockpool.go deleted file mode 100644 index e8882da..0000000 --- a/blockchain/pool/blockpool.go +++ /dev/null @@ -1,101 +0,0 @@ -package pool - -import ( - "bytes" - "encoding/hex" - "fmt" - "time" - - "github.com/asaskevich/EventBus" - - "github.com/sirupsen/logrus" - - "github.com/Secured-Finance/dione/blockchain/types" - "github.com/Secured-Finance/dione/cache" -) - -// BlockPool is pool for blocks that isn't not validated or committed yet -type BlockPool struct { - mempool *Mempool - knownBlocks cache.Cache - acceptedBlocks cache.Cache - bus EventBus.Bus -} - -func NewBlockPool(mp *Mempool, bus EventBus.Bus) (*BlockPool, error) { - bp := &BlockPool{ - acceptedBlocks: cache.NewInMemoryCache(), // here we need to use separate cache - knownBlocks: cache.NewInMemoryCache(), - mempool: mp, - bus: bus, - } - - return bp, nil -} - -func (bp *BlockPool) AddBlock(block *types.Block) error { - err := bp.knownBlocks.StoreWithTTL(hex.EncodeToString(block.Header.Hash), block, 10*time.Minute) - if err != nil { - return err - } - logrus.WithField("hash", fmt.Sprintf("%x", block.Header.Hash)).Debug("New block discovered") - bp.bus.Publish("blockpool:knownBlockAdded", block) - return nil -} - -func (bp *BlockPool) GetBlock(blockhash []byte) (*types.Block, error) { - var block types.Block - err := bp.knownBlocks.Get(hex.EncodeToString(blockhash), &block) - return &block, err -} - -// PruneBlocks cleans known blocks list. It is called when new consensus round starts. -func (bp *BlockPool) PruneBlocks() { - for k := range bp.knownBlocks.Items() { - bp.knownBlocks.Delete(k) - } - bp.bus.Publish("blockpool:pruned") -} - -func (bp *BlockPool) AddAcceptedBlock(block *types.Block) error { - err := bp.acceptedBlocks.Store(hex.EncodeToString(block.Header.Hash), block) - if err != nil { - return err - } - bp.bus.Publish("blockpool:acceptedBlockAdded", block) - return nil -} - -func (bp *BlockPool) GetAllAcceptedBlocks() []*types.Block { - var blocks []*types.Block - for _, v := range bp.acceptedBlocks.Items() { - blocks = append(blocks, v.(*types.Block)) - } - return blocks -} - -// PruneAcceptedBlocks cleans accepted blocks list. It is called when new consensus round starts. -func (bp *BlockPool) PruneAcceptedBlocks(committedBlock *types.Block) { - for k, v := range bp.acceptedBlocks.Items() { - block := v.(*types.Block) - for _, v := range block.Data { - if !containsTx(committedBlock.Data, v) { - v.MerkleProof = nil - err := bp.mempool.StoreTx(v) // return transactions back to mempool - if err != nil { - logrus.Error(err) - } - } - } - bp.acceptedBlocks.Delete(k) - } -} - -func containsTx(s []*types.Transaction, e *types.Transaction) bool { - for _, a := range s { - if bytes.Equal(a.Hash, e.Hash) { - return true - } - } - return false -} diff --git a/blockchain/sync/sync_mgr.go b/blockchain/sync/sync_mgr.go index da9ef4e..cab9b7f 100644 --- a/blockchain/sync/sync_mgr.go +++ b/blockchain/sync/sync_mgr.go @@ -98,21 +98,14 @@ func (sm *syncManager) doInitialBlockPoolSync() error { return nil } - ourLastHeight, err := sm.blockpool.GetLatestBlockHeight() - if err == blockchain.ErrLatestHeightNil { - gBlock := types2.GenesisBlock() - err = sm.blockpool.StoreBlock(gBlock) // commit genesis block - if err != nil { - return err - } - } + ourLastHeight, _ := sm.blockpool.GetLatestBlockHeight() if sm.bootstrapPeer == "" { return nil // FIXME } var reply wire.LastBlockHeightReply - err = sm.rpcClient.Call(sm.bootstrapPeer, "NetworkService", "LastBlockHeight", nil, &reply) + err := sm.rpcClient.Call(sm.bootstrapPeer, "NetworkService", "LastBlockHeight", nil, &reply) if err != nil { return err } diff --git a/cache/cache.go b/cache/cache.go index 5af78e3..96f37fe 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -13,4 +13,5 @@ type Cache interface { Get(key string, value interface{}) error Delete(key string) Items() map[string]interface{} + Exists(key string) bool } diff --git a/cache/inmemory_cache.go b/cache/inmemory_cache.go index 902180a..0acef75 100644 --- a/cache/inmemory_cache.go +++ b/cache/inmemory_cache.go @@ -62,3 +62,8 @@ func (imc *InMemoryCache) Items() map[string]interface{} { } return m } + +func (imc *InMemoryCache) Exists(key string) bool { + _, exists := imc.cache.Get(key) + return exists +} diff --git a/cache/redis_cache.go b/cache/redis_cache.go index 3f7ace3..9eacdab 100644 --- a/cache/redis_cache.go +++ b/cache/redis_cache.go @@ -85,3 +85,14 @@ func (rc *RedisCache) Delete(key string) { func (rc *RedisCache) Items() map[string]interface{} { return nil // TODO } + +func (rc *RedisCache) Exists(key string) bool { + res := rc.Client.Exists(context.TODO(), key) + if res.Err() != nil { + return false + } + if res.Val() == 0 { + return false + } + return true +} diff --git a/config/win_config.go b/config/win_config.go index 1cf9295..10a76c0 100644 --- a/config/win_config.go +++ b/config/win_config.go @@ -2,4 +2,4 @@ package config import "math/big" -var ExpectedLeadersPerEpoch = big.NewInt(1) +var ExpectedLeadersPerEpoch = big.NewInt(2) diff --git a/consensus/consensus.go b/consensus/consensus.go index 0e2909f..64a12e8 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "math/big" - "sync" drand2 "github.com/Secured-Finance/dione/beacon/drand" @@ -38,39 +37,20 @@ var ( ErrNoAcceptedBlocks = errors.New("there is no accepted blocks") ) -type StateStatus uint8 - -const ( - StateStatusUnknown = iota - - StateStatusPrePrepared - StateStatusPrepared - StateStatusCommited -) - type PBFTConsensusManager struct { - bus EventBus.Bus - psb *pubsub.PubSubRouter - minApprovals int // FIXME - privKey crypto.PrivKey - msgLog *ConsensusMessageLog - validator *ConsensusValidator - ethereumClient *ethclient.EthereumClient - miner *blockchain.Miner - blockPool *pool.BlockPool - mempool *pool.Mempool - blockchain *blockchain.BlockChain - state *State - address peer.ID -} - -type State struct { - mutex sync.Mutex - drandRound uint64 - randomness []byte - blockHeight uint64 - status StateStatus - ready bool + bus EventBus.Bus + psb *pubsub.PubSubRouter + minApprovals int // FIXME + privKey crypto.PrivKey + msgLog *ConsensusMessageLog + validator *ConsensusValidator + ethereumClient *ethclient.EthereumClient + miner *blockchain.Miner + consensusRoundPool *ConsensusRoundPool + mempool *pool.Mempool + blockchain *blockchain.BlockChain + address peer.ID + stateChangeChannels map[string]map[State][]chan bool } func NewPBFTConsensusManager( @@ -81,28 +61,25 @@ func NewPBFTConsensusManager( ethereumClient *ethclient.EthereumClient, miner *blockchain.Miner, bc *blockchain.BlockChain, - bp *pool.BlockPool, + bp *ConsensusRoundPool, db *drand2.DrandBeacon, mempool *pool.Mempool, address peer.ID, ) *PBFTConsensusManager { pcm := &PBFTConsensusManager{ - psb: psb, - miner: miner, - validator: NewConsensusValidator(miner, bc, db), - msgLog: NewConsensusMessageLog(), - minApprovals: minApprovals, - privKey: privKey, - ethereumClient: ethereumClient, - state: &State{ - ready: false, - status: StateStatusUnknown, - }, - bus: bus, - blockPool: bp, - mempool: mempool, - blockchain: bc, - address: address, + psb: psb, + miner: miner, + validator: NewConsensusValidator(miner, bc, db), + msgLog: NewConsensusMessageLog(), + minApprovals: minApprovals, + privKey: privKey, + ethereumClient: ethereumClient, + bus: bus, + consensusRoundPool: bp, + mempool: mempool, + blockchain: bc, + address: address, + stateChangeChannels: map[string]map[State][]chan bool{}, } return pcm @@ -115,8 +92,6 @@ func (pcm *PBFTConsensusManager) Run() { pcm.bus.SubscribeAsync("beacon:newEntry", func(entry types2.BeaconEntry) { pcm.onNewBeaconEntry(entry) }, true) - height, _ := pcm.blockchain.GetLatestBlockHeight() - pcm.state.blockHeight = height + 1 } func (pcm *PBFTConsensusManager) propose(blk *types3.Block) error { @@ -125,15 +100,12 @@ func (pcm *PBFTConsensusManager) propose(blk *types3.Block) error { return err } pcm.psb.BroadcastToServiceTopic(prePrepareMsg) - pcm.blockPool.AddBlock(blk) + pcm.consensusRoundPool.AddConsensusInfo(blk) logrus.WithField("blockHash", fmt.Sprintf("%x", blk.Header.Hash)).Debugf("Entered into PREPREPARED state") - pcm.state.status = StateStatusPrePrepared return nil } func (pcm *PBFTConsensusManager) handlePrePrepare(message *pubsub.PubSubMessage) { - //pcm.state.mutex.Lock() - //defer pcm.state.mutex.Unlock() var prePrepare types.PrePrepareMessage err := cbor.Unmarshal(message.Payload, &prePrepare) if err != nil { @@ -166,7 +138,18 @@ func (pcm *PBFTConsensusManager) handlePrePrepare(message *pubsub.PubSubMessage) "blockHash": fmt.Sprintf("%x", cmsg.Block.Header.Hash), "from": message.From.String(), }).Debug("Received PREPREPARE message") - pcm.blockPool.AddBlock(cmsg.Block) + pcm.consensusRoundPool.AddConsensusInfo(cmsg.Block) + + encodedHash := hex.EncodeToString(cmsg.Blockhash) + if m, ok := pcm.stateChangeChannels[encodedHash]; ok { + if channels, ok := m[StateStatusPrePrepared]; ok { + for _, v := range channels { + v <- true + close(v) + delete(pcm.stateChangeChannels, encodedHash) + } + } + } prepareMsg, err := NewMessage(cmsg, types.ConsensusMessageTypePrepare, pcm.privKey) if err != nil { @@ -176,12 +159,9 @@ func (pcm *PBFTConsensusManager) handlePrePrepare(message *pubsub.PubSubMessage) logrus.WithField("blockHash", fmt.Sprintf("%x", prePrepare.Block.Header.Hash)).Debugf("Entered into PREPREPARED state") pcm.psb.BroadcastToServiceTopic(prepareMsg) - pcm.state.status = StateStatusPrePrepared } func (pcm *PBFTConsensusManager) handlePrepare(message *pubsub.PubSubMessage) { - //pcm.state.mutex.Lock() - //defer pcm.state.mutex.Unlock() var prepare types.PrepareMessage err := cbor.Unmarshal(message.Payload, &prepare) if err != nil { @@ -196,9 +176,18 @@ func (pcm *PBFTConsensusManager) handlePrepare(message *pubsub.PubSubMessage) { Signature: prepare.Signature, } - if _, err := pcm.blockPool.GetBlock(cmsg.Blockhash); errors.Is(err, cache.ErrNotFound) { - logrus.WithField("blockHash", hex.EncodeToString(cmsg.Blockhash)).Warnf("received unknown block %x", cmsg.Blockhash) - return + if _, err := pcm.consensusRoundPool.GetConsensusInfo(cmsg.Blockhash); errors.Is(err, cache.ErrNotFound) { + encodedHash := hex.EncodeToString(cmsg.Blockhash) + logrus.WithField("blockHash", encodedHash).Warn("received PREPARE for unknown block") + waitingCh := make(chan bool) + if _, ok := pcm.stateChangeChannels[encodedHash]; !ok { + pcm.stateChangeChannels[encodedHash] = map[State][]chan bool{} + } + pcm.stateChangeChannels[encodedHash][StateStatusPrePrepared] = append(pcm.stateChangeChannels[encodedHash][StateStatusPrePrepared], waitingCh) + result := <-waitingCh + if !result { + return + } } if pcm.msgLog.Exists(cmsg) { @@ -225,13 +214,23 @@ func (pcm *PBFTConsensusManager) handlePrepare(message *pubsub.PubSubMessage) { } pcm.psb.BroadcastToServiceTopic(commitMsg) logrus.WithField("blockHash", fmt.Sprintf("%x", cmsg.Blockhash)).Debugf("Entered into PREPARED state") - pcm.state.status = StateStatusPrepared + pcm.consensusRoundPool.UpdateConsensusState(cmsg.Blockhash, StateStatusPrepared) + + // pull watchers + encodedHash := hex.EncodeToString(cmsg.Blockhash) + if m, ok := pcm.stateChangeChannels[encodedHash]; ok { + if channels, ok := m[StateStatusPrepared]; ok { + for _, v := range channels { + v <- true + close(v) + delete(pcm.stateChangeChannels, encodedHash) + } + } + } } } func (pcm *PBFTConsensusManager) handleCommit(message *pubsub.PubSubMessage) { - //pcm.state.mutex.Lock() - //defer pcm.state.mutex.Unlock() var commit types.CommitMessage err := cbor.Unmarshal(message.Payload, &commit) if err != nil { @@ -246,11 +245,27 @@ func (pcm *PBFTConsensusManager) handleCommit(message *pubsub.PubSubMessage) { Signature: commit.Signature, } - if _, err := pcm.blockPool.GetBlock(cmsg.Blockhash); errors.Is(err, cache.ErrNotFound) { - logrus.Warnf("received unknown block %x", cmsg.Blockhash) + ci, err := pcm.consensusRoundPool.GetConsensusInfo(cmsg.Blockhash) + + if errors.Is(err, cache.ErrNotFound) { + logrus.WithField("blockHash", hex.EncodeToString(cmsg.Blockhash)).Warnf("received COMMIT for unknown block") return } + if ci.State < StateStatusPrepared { + encodedHash := hex.EncodeToString(cmsg.Blockhash) + logrus.WithField("blockHash", encodedHash).Warnf("incorrect state of block consensus") + waitingCh := make(chan bool) + if _, ok := pcm.stateChangeChannels[encodedHash]; !ok { + pcm.stateChangeChannels[encodedHash] = map[State][]chan bool{} + } + pcm.stateChangeChannels[encodedHash][StateStatusPrepared] = append(pcm.stateChangeChannels[encodedHash][StateStatusPrepared], waitingCh) + result := <-waitingCh + if !result { + return + } + } + if pcm.msgLog.Exists(cmsg) { logrus.Tracef("received existing commit msg for block %x", cmsg.Blockhash) return @@ -268,27 +283,22 @@ func (pcm *PBFTConsensusManager) handleCommit(message *pubsub.PubSubMessage) { }).Debug("Received COMMIT message") if len(pcm.msgLog.Get(types.ConsensusMessageTypeCommit, cmsg.Blockhash)) >= pcm.minApprovals { - block, err := pcm.blockPool.GetBlock(cmsg.Blockhash) - if err != nil { - logrus.Error(err) - return - } - pcm.blockPool.AddAcceptedBlock(block) logrus.WithField("blockHash", fmt.Sprintf("%x", cmsg.Blockhash)).Debugf("Entered into COMMIT state") - pcm.state.status = StateStatusCommited + pcm.consensusRoundPool.UpdateConsensusState(cmsg.Blockhash, StateStatusCommited) } } func (pcm *PBFTConsensusManager) onNewBeaconEntry(entry types2.BeaconEntry) { block, err := pcm.commitAcceptedBlocks() + height, _ := pcm.blockchain.GetLatestBlockHeight() if err != nil { if errors.Is(err, ErrNoAcceptedBlocks) { logrus.WithFields(logrus.Fields{ - "height": pcm.state.blockHeight, + "height": height + 1, }).Infof("No accepted blocks in the current consensus round") } else { logrus.WithFields(logrus.Fields{ - "height": pcm.state.blockHeight, + "height": height + 1, "err": err.Error(), }).Errorf("Failed to select the block in the current consensus round") return @@ -312,26 +322,20 @@ func (pcm *PBFTConsensusManager) onNewBeaconEntry(entry types2.BeaconEntry) { if block.Header.Proposer.String() == pcm.address.String() { pcm.submitTasksFromBlock(block) } - - pcm.state.blockHeight = pcm.state.blockHeight + 1 } - // get latest block - height, err := pcm.blockchain.GetLatestBlockHeight() - if err != nil { - logrus.Error(err) - return - } - blockHeader, err := pcm.blockchain.FetchBlockHeaderByHeight(height) - if err != nil { - logrus.Error(err) - return + for k, v := range pcm.stateChangeChannels { + for k1, j := range v { + for _, ch := range j { + ch <- true + close(ch) + } + delete(v, k1) + } + delete(pcm.stateChangeChannels, k) } - pcm.state.drandRound = entry.Round - pcm.state.randomness = entry.Data - - minedBlock, err := pcm.miner.MineBlock(entry.Data, entry.Round, blockHeader) + minedBlock, err := pcm.miner.MineBlock(entry.Data, entry.Round) if err != nil { if errors.Is(err, blockchain.ErrNoTxForBlock) { logrus.Info("Sealing skipped, no transactions in mempool") @@ -343,7 +347,6 @@ func (pcm *PBFTConsensusManager) onNewBeaconEntry(entry types2.BeaconEntry) { // if we are round winner if minedBlock != nil { - logrus.WithField("height", pcm.state.blockHeight).Infof("We have been elected in the current consensus round") err = pcm.propose(minedBlock) if err != nil { logrus.Errorf("Failed to propose the block: %s", err.Error()) @@ -388,32 +391,34 @@ func (pcm *PBFTConsensusManager) submitTasksFromBlock(block *types3.Block) { } func (pcm *PBFTConsensusManager) commitAcceptedBlocks() (*types3.Block, error) { - blocks := pcm.blockPool.GetAllAcceptedBlocks() + blocks := pcm.consensusRoundPool.GetAllBlocksWithCommit() if blocks == nil { return nil, ErrNoAcceptedBlocks } var maxStake *big.Int + var maxWinCount int64 = -1 var selectedBlock *types3.Block for _, v := range blocks { - stake, err := pcm.ethereumClient.GetMinerStake(v.Header.ProposerEth) + stake, err := pcm.ethereumClient.GetMinerStake(v.Block.Header.ProposerEth) if err != nil { return nil, err } - if maxStake != nil { - if stake.Cmp(maxStake) == -1 { + if maxStake != nil && maxWinCount != -1 { + if stake.Cmp(maxStake) == -1 || v.Block.Header.ElectionProof.WinCount < maxWinCount { continue } } maxStake = stake - selectedBlock = v + maxWinCount = v.Block.Header.ElectionProof.WinCount + selectedBlock = v.Block } logrus.WithFields(logrus.Fields{ "hash": hex.EncodeToString(selectedBlock.Header.Hash), "height": selectedBlock.Header.Height, "miner": selectedBlock.Header.Proposer.String(), }).Info("Committed new block") - pcm.blockPool.PruneAcceptedBlocks(selectedBlock) + pcm.consensusRoundPool.Prune() for _, v := range selectedBlock.Data { err := pcm.mempool.DeleteTx(v.Hash) if err != nil { diff --git a/consensus/consensus_round_pool.go b/consensus/consensus_round_pool.go new file mode 100644 index 0000000..902e066 --- /dev/null +++ b/consensus/consensus_round_pool.go @@ -0,0 +1,119 @@ +package consensus + +import ( + "bytes" + "encoding/hex" + "fmt" + "time" + + "github.com/Secured-Finance/dione/blockchain/pool" + + "github.com/asaskevich/EventBus" + + "github.com/sirupsen/logrus" + + "github.com/Secured-Finance/dione/blockchain/types" + "github.com/Secured-Finance/dione/cache" +) + +type State uint8 + +const ( + StateStatusUnknown = iota + + StateStatusPrePrepared + StateStatusPrepared + StateStatusCommited +) + +// ConsensusRoundPool is pool for blocks that isn't not validated or committed yet +type ConsensusRoundPool struct { + mempool *pool.Mempool + consensusInfoStorage cache.Cache + bus EventBus.Bus +} + +func NewConsensusRoundPool(mp *pool.Mempool, bus EventBus.Bus) (*ConsensusRoundPool, error) { + bp := &ConsensusRoundPool{ + consensusInfoStorage: cache.NewInMemoryCache(), + mempool: mp, + bus: bus, + } + + return bp, nil +} + +type ConsensusInfo struct { + Block *types.Block + State State +} + +func (crp *ConsensusRoundPool) AddConsensusInfo(block *types.Block) error { + encodedHash := hex.EncodeToString(block.Header.Hash) + + if crp.consensusInfoStorage.Exists(encodedHash) { + return nil + } + + err := crp.consensusInfoStorage.StoreWithTTL(encodedHash, &ConsensusInfo{ + Block: block, + State: StateStatusPrePrepared, + }, 10*time.Minute) + if err != nil { + return err + } + logrus.WithField("hash", fmt.Sprintf("%x", block.Header.Hash)).Debug("New block discovered") + crp.bus.Publish("blockpool:knownBlockAdded", block) + return nil +} + +func (crp *ConsensusRoundPool) UpdateConsensusState(blockhash []byte, newState State) error { + encodedHash := hex.EncodeToString(blockhash) + + var consensusInfo ConsensusInfo + err := crp.consensusInfoStorage.Get(encodedHash, &consensusInfo) + if err != nil { + return err + } + + if newState < consensusInfo.State { + return fmt.Errorf("attempt to set incorrect state") + } + consensusInfo.State = newState + crp.bus.Publish("blockpool:newConsensusState", blockhash, newState) + return crp.consensusInfoStorage.StoreWithTTL(encodedHash, &consensusInfo, 10*time.Minute) +} + +func (crp *ConsensusRoundPool) GetConsensusInfo(blockhash []byte) (*ConsensusInfo, error) { + var consensusInfo ConsensusInfo + err := crp.consensusInfoStorage.Get(hex.EncodeToString(blockhash), &consensusInfo) + return &consensusInfo, err +} + +// Prune cleans known blocks list. It is called when new consensus round starts. +func (crp *ConsensusRoundPool) Prune() { + for k := range crp.consensusInfoStorage.Items() { + crp.consensusInfoStorage.Delete(k) + } + crp.bus.Publish("blockpool:pruned") +} + +func (crp *ConsensusRoundPool) GetAllBlocksWithCommit() []*ConsensusInfo { + var consensusInfos []*ConsensusInfo + for _, v := range crp.consensusInfoStorage.Items() { + ci := v.(*ConsensusInfo) + if ci.State == StateStatusCommited { + consensusInfos = append(consensusInfos, ci) + } + } + return consensusInfos +} + +func containsTx(s []*types.Transaction, e *types.Transaction) bool { + for _, a := range s { + if bytes.Equal(a.Hash, e.Hash) { + return true + } + } + return false +} diff --git a/node/node.go b/node/node.go index 615c86b..b1b847b 100644 --- a/node/node.go +++ b/node/node.go @@ -4,6 +4,8 @@ import ( "context" "time" + "github.com/Secured-Finance/dione/blockchain" + drand2 "github.com/Secured-Finance/dione/beacon/drand" "github.com/Secured-Finance/dione/pubsub" @@ -50,6 +52,7 @@ func runNode( pubSubRouter *pubsub.PubSubRouter, disputeManager *consensus.DisputeManager, db *drand2.DrandBeacon, + bc *blockchain.BlockChain, ) { lc.Append(fx.Hook{ OnStart: func(ctx context.Context) error { @@ -200,7 +203,7 @@ func Start() { providePeerDiscovery, provideDrandBeacon, provideMempool, - provideMiner, + blockchain.NewMiner, provideBlockChain, provideBlockPool, provideSyncManager, @@ -214,8 +217,10 @@ func Start() { configureLogger, configureDirectRPC, configureForeignBlockchainRPC, + initializeBlockchain, + configureMiner, runNode, ), - fx.NopLogger, + //fx.NopLogger, ).Run() } diff --git a/node/node_dep_providers.go b/node/node_dep_providers.go index 779ffe7..93f9b0e 100644 --- a/node/node_dep_providers.go +++ b/node/node_dep_providers.go @@ -10,6 +10,8 @@ import ( "path" "runtime" + types2 "github.com/Secured-Finance/dione/blockchain/types" + "github.com/Secured-Finance/dione/rpc" "github.com/Secured-Finance/dione/rpc/filecoin" solana2 "github.com/Secured-Finance/dione/rpc/solana" @@ -76,12 +78,6 @@ func provideDisputeManager(ethClient *ethclient.EthereumClient, pcm *consensus.P return dm } -func provideMiner(h host.Host, ethClient *ethclient.EthereumClient, privateKey crypto.PrivKey, mempool *pool.Mempool) *blockchain.Miner { - miner := blockchain.NewMiner(h.ID(), *ethClient.GetEthAddress(), ethClient, privateKey, mempool) - logrus.Info("Mining subsystem has been initialized!") - return miner -} - func provideDrandBeacon(ps *pubsub.PubSubRouter, bus EventBus.Bus) *drand2.DrandBeacon { db, err := drand2.NewDrandBeacon(ps.Pubsub, bus) if err != nil { @@ -135,7 +131,7 @@ func provideConsensusManager( bc *blockchain.BlockChain, ethClient *ethclient.EthereumClient, privateKey crypto.PrivKey, - bp *pool.BlockPool, + bp *consensus.ConsensusRoundPool, db *drand2.DrandBeacon, mp *pool.Mempool, ) *consensus.PBFTConsensusManager { @@ -268,8 +264,8 @@ func provideNetworkService(bp *blockchain.BlockChain, mp *pool.Mempool) *Network return ns } -func provideBlockPool(mp *pool.Mempool, bus EventBus.Bus) *pool.BlockPool { - bp, err := pool.NewBlockPool(mp, bus) +func provideBlockPool(mp *pool.Mempool, bus EventBus.Bus) *consensus.ConsensusRoundPool { + bp, err := consensus.NewConsensusRoundPool(mp, bus) if err != nil { logrus.Fatalf("Failed to initialize blockpool: %s", err.Error()) } @@ -392,3 +388,19 @@ func configureForeignBlockchainRPC() { logrus.Info("Foreign Blockchain RPC clients has been successfully configured!") } + +func configureMiner(m *blockchain.Miner, b *blockchain.BlockChain) { + m.SetBlockchainInstance(b) +} + +func initializeBlockchain(bc *blockchain.BlockChain) { + _, err := bc.GetLatestBlockHeight() + if err == blockchain.ErrLatestHeightNil { + gBlock := types2.GenesisBlock() + err = bc.StoreBlock(gBlock) // commit genesis block + if err != nil { + logrus.Fatal(err) + } + logrus.Info("Committed genesis block") + } +} diff --git a/types/electionproof.go b/types/electionproof.go index 5350dde..e95b2eb 100644 --- a/types/electionproof.go +++ b/types/electionproof.go @@ -106,7 +106,7 @@ func lambda(power, totalPower *big.Int) *big.Int { return lam } -var MaxWinCount = 3 * config.ExpectedLeadersPerEpoch.Int64() +var MaxWinCount = 10 * config.ExpectedLeadersPerEpoch.Int64() type poiss struct { lam *big.Int