From a6cf757fcf05756848093e4b84f8d7973da1567e Mon Sep 17 00:00:00 2001 From: ChronosX88 Date: Mon, 19 Jul 2021 23:19:06 +0300 Subject: [PATCH] Implement handler for NewBlock message in SyncManager, refactor block validation architecture --- beacon/domain_separation_tag.go | 8 + beacon/utils.go | 29 ++++ beacon/vrf.go | 28 ++++ blockchain/blockchain.go | 235 ++++++++++++++++++++++++----- {consensus => blockchain}/miner.go | 36 ++++- blockchain/pool/mempool.go | 4 +- blockchain/sync/sync_mgr.go | 29 +++- blockchain/types/block.go | 12 +- blockchain/utils/verification.go | 2 +- consensus/consensus.go | 133 ++++++++-------- consensus/consensus_validator.go | 222 ++++----------------------- consensus/utils.go | 80 ---------- node/node.go | 52 +++---- node/node_dep_providers.go | 14 +- 14 files changed, 468 insertions(+), 416 deletions(-) create mode 100644 beacon/domain_separation_tag.go create mode 100644 beacon/utils.go create mode 100644 beacon/vrf.go rename {consensus => blockchain}/miner.go (73%) diff --git a/beacon/domain_separation_tag.go b/beacon/domain_separation_tag.go new file mode 100644 index 0000000..d2196d6 --- /dev/null +++ b/beacon/domain_separation_tag.go @@ -0,0 +1,8 @@ +package beacon + +// RandomnessType specifies a type of randomness. +type RandomnessType int64 + +const ( + RandomnessTypeElectionProofProduction RandomnessType = 1 + iota +) diff --git a/beacon/utils.go b/beacon/utils.go new file mode 100644 index 0000000..3f47e4d --- /dev/null +++ b/beacon/utils.go @@ -0,0 +1,29 @@ +package beacon + +import ( + "encoding/binary" + + "github.com/minio/blake2b-simd" + "golang.org/x/xerrors" +) + +func DrawRandomness(rbase []byte, randomnessType RandomnessType, round uint64, entropy []byte) ([]byte, error) { + h := blake2b.New256() + if err := binary.Write(h, binary.BigEndian, int64(randomnessType)); err != nil { + return nil, xerrors.Errorf("deriving randomness: %v", err) + } + VRFDigest := blake2b.Sum256(rbase) + _, err := h.Write(VRFDigest[:]) + if err != nil { + return nil, xerrors.Errorf("hashing VRFDigest: %w", err) + } + if err := binary.Write(h, binary.BigEndian, round); err != nil { + return nil, xerrors.Errorf("deriving randomness: %v", err) + } + _, err = h.Write(entropy) + if err != nil { + return nil, xerrors.Errorf("hashing entropy: %v", err) + } + + return h.Sum(nil), nil +} diff --git a/beacon/vrf.go b/beacon/vrf.go new file mode 100644 index 0000000..5e1b7a0 --- /dev/null +++ b/beacon/vrf.go @@ -0,0 +1,28 @@ +package beacon + +import ( + "fmt" + + "github.com/libp2p/go-libp2p-core/crypto" + "github.com/libp2p/go-libp2p-core/peer" +) + +func ComputeVRF(privKey crypto.PrivKey, sigInput []byte) ([]byte, error) { + return privKey.Sign(sigInput) +} + +func VerifyVRF(worker peer.ID, vrfBase, vrfproof []byte) error { + pk, err := worker.ExtractPublicKey() + if err != nil { + return err + } + ok, err := pk.Verify(vrfBase, vrfproof) + if err != nil { + return err + } + if !ok { + return fmt.Errorf("vrf was invalid") + } + + return nil +} diff --git a/blockchain/blockchain.go b/blockchain/blockchain.go index 6d5874c..e92f01c 100644 --- a/blockchain/blockchain.go +++ b/blockchain/blockchain.go @@ -1,10 +1,22 @@ package blockchain import ( + "bytes" + "context" "encoding/binary" "encoding/hex" "errors" + "fmt" "os" + "sync" + + "github.com/Secured-Finance/dione/beacon" + + "github.com/Secured-Finance/dione/consensus/validation" + "github.com/Secured-Finance/dione/types" + "github.com/sirupsen/logrus" + "github.com/wealdtech/go-merkletree" + "github.com/wealdtech/go-merkletree/keccak256" "github.com/asaskevich/EventBus" @@ -29,16 +41,22 @@ var ( ) type BlockChain struct { + // db-related dbEnv *lmdb.Env db lmdb.DBI metadataIndex *utils.Index heightIndex *utils.Index - bus EventBus.Bus + + bus EventBus.Bus + miner *Miner + b beacon.BeaconAPI } -func NewBlockChain(path string, bus EventBus.Bus) (*BlockChain, error) { +func NewBlockChain(path string, bus EventBus.Bus, miner *Miner, b beacon.BeaconAPI) (*BlockChain, error) { chain := &BlockChain{ - bus: bus, + bus: bus, + miner: miner, + b: b, } // configure lmdb env @@ -88,16 +106,16 @@ func NewBlockChain(path string, bus EventBus.Bus) (*BlockChain, error) { return chain, nil } -func (bp *BlockChain) setLatestBlockHeight(height uint64) error { - err := bp.metadataIndex.PutUint64([]byte(LatestBlockHeightKey), height) +func (bc *BlockChain) setLatestBlockHeight(height uint64) error { + err := bc.metadataIndex.PutUint64([]byte(LatestBlockHeightKey), height) if err != nil { return err } return nil } -func (bp *BlockChain) GetLatestBlockHeight() (uint64, error) { - height, err := bp.metadataIndex.GetUint64([]byte(LatestBlockHeightKey)) +func (bc *BlockChain) GetLatestBlockHeight() (uint64, error) { + height, err := bc.metadataIndex.GetUint64([]byte(LatestBlockHeightKey)) if err != nil { if err == utils.ErrIndexKeyNotFound { return 0, ErrLatestHeightNil @@ -107,8 +125,20 @@ func (bp *BlockChain) GetLatestBlockHeight() (uint64, error) { return height, nil } -func (bp *BlockChain) StoreBlock(block *types2.Block) error { - err := bp.dbEnv.Update(func(txn *lmdb.Txn) error { +func (bc *BlockChain) StoreBlock(block *types2.Block) error { + if exists, err := bc.HasBlock(block.Header.Hash); err != nil { + return err + } else if exists { + //return fmt.Errorf("block already exists in blockchain") + return nil + } + + err := bc.ValidateBlock(block) + if err != nil { + return fmt.Errorf("failed to store block: %w", err) + } + + err = bc.dbEnv.Update(func(txn *lmdb.Txn) error { data, err := cbor.Marshal(block.Data) if err != nil { return err @@ -118,11 +148,11 @@ func (bp *BlockChain) StoreBlock(block *types2.Block) error { return err } blockHash := hex.EncodeToString(block.Header.Hash) - err = txn.Put(bp.db, []byte(DefaultBlockDataPrefix+blockHash), data, 0) + err = txn.Put(bc.db, []byte(DefaultBlockDataPrefix+blockHash), data, 0) if err != nil { return err } - err = txn.Put(bp.db, []byte(DefaultBlockHeaderPrefix+blockHash), headerData, 0) // store header separately for easy fetching + err = txn.Put(bc.db, []byte(DefaultBlockHeaderPrefix+blockHash), headerData, 0) // store header separately for easy fetching return err }) if err != nil { @@ -132,36 +162,36 @@ func (bp *BlockChain) StoreBlock(block *types2.Block) error { // update index "height -> block hash" heightBytes := make([]byte, 8) binary.LittleEndian.PutUint64(heightBytes, block.Header.Height) - err = bp.heightIndex.PutBytes(heightBytes, block.Header.Hash) + err = bc.heightIndex.PutBytes(heightBytes, block.Header.Hash) if err != nil { return err } // update latest block height - height, err := bp.GetLatestBlockHeight() + height, err := bc.GetLatestBlockHeight() if err != nil && err != ErrLatestHeightNil { return err } if err == ErrLatestHeightNil || block.Header.Height > height { - if err = bp.setLatestBlockHeight(block.Header.Height); err != nil { + if err = bc.setLatestBlockHeight(block.Header.Height); err != nil { return err } } else if block.Header.Height > height { - if err = bp.setLatestBlockHeight(block.Header.Height); err != nil { + if err = bc.setLatestBlockHeight(block.Header.Height); err != nil { return err } - bp.bus.Publish("blockchain:latestBlockHeightUpdated", block) + bc.bus.Publish("blockchain:latestBlockHeightUpdated", block) } - bp.bus.Publish("blockchain:blockCommitted", block) + bc.bus.Publish("blockchain:blockCommitted", block) return nil } -func (bp *BlockChain) HasBlock(blockHash []byte) (bool, error) { +func (bc *BlockChain) HasBlock(blockHash []byte) (bool, error) { var blockExists bool - err := bp.dbEnv.View(func(txn *lmdb.Txn) error { + err := bc.dbEnv.View(func(txn *lmdb.Txn) error { h := hex.EncodeToString(blockHash) - _, err := txn.Get(bp.db, []byte(DefaultBlockHeaderPrefix+h)) // try to fetch block header + _, err := txn.Get(bc.db, []byte(DefaultBlockHeaderPrefix+h)) // try to fetch block header if err != nil { if lmdb.IsNotFound(err) { blockExists = false @@ -178,11 +208,11 @@ func (bp *BlockChain) HasBlock(blockHash []byte) (bool, error) { return blockExists, nil } -func (bp *BlockChain) FetchBlockData(blockHash []byte) ([]*types2.Transaction, error) { +func (bc *BlockChain) FetchBlockData(blockHash []byte) ([]*types2.Transaction, error) { var data []*types2.Transaction - err := bp.dbEnv.View(func(txn *lmdb.Txn) error { + err := bc.dbEnv.View(func(txn *lmdb.Txn) error { h := hex.EncodeToString(blockHash) - blockData, err := txn.Get(bp.db, []byte(DefaultBlockDataPrefix+h)) + blockData, err := txn.Get(bc.db, []byte(DefaultBlockDataPrefix+h)) if err != nil { if lmdb.IsNotFound(err) { return ErrBlockNotFound @@ -199,11 +229,11 @@ func (bp *BlockChain) FetchBlockData(blockHash []byte) ([]*types2.Transaction, e return data, nil } -func (bp *BlockChain) FetchBlockHeader(blockHash []byte) (*types2.BlockHeader, error) { +func (bc *BlockChain) FetchBlockHeader(blockHash []byte) (*types2.BlockHeader, error) { var blockHeader types2.BlockHeader - err := bp.dbEnv.View(func(txn *lmdb.Txn) error { + err := bc.dbEnv.View(func(txn *lmdb.Txn) error { h := hex.EncodeToString(blockHash) - data, err := txn.Get(bp.db, []byte(DefaultBlockHeaderPrefix+h)) + data, err := txn.Get(bc.db, []byte(DefaultBlockHeaderPrefix+h)) if err != nil { if lmdb.IsNotFound(err) { return ErrBlockNotFound @@ -219,15 +249,15 @@ func (bp *BlockChain) FetchBlockHeader(blockHash []byte) (*types2.BlockHeader, e return &blockHeader, nil } -func (bp *BlockChain) FetchBlock(blockHash []byte) (*types2.Block, error) { +func (bc *BlockChain) FetchBlock(blockHash []byte) (*types2.Block, error) { var block types2.Block - header, err := bp.FetchBlockHeader(blockHash) + header, err := bc.FetchBlockHeader(blockHash) if err != nil { return nil, err } block.Header = header - data, err := bp.FetchBlockData(blockHash) + data, err := bc.FetchBlockData(blockHash) if err != nil { return nil, err } @@ -236,34 +266,169 @@ func (bp *BlockChain) FetchBlock(blockHash []byte) (*types2.Block, error) { return &block, nil } -func (bp *BlockChain) FetchBlockByHeight(height uint64) (*types2.Block, error) { +func (bc *BlockChain) FetchBlockByHeight(height uint64) (*types2.Block, error) { var heightBytes = make([]byte, 8) binary.LittleEndian.PutUint64(heightBytes, height) - blockHash, err := bp.heightIndex.GetBytes(heightBytes) + blockHash, err := bc.heightIndex.GetBytes(heightBytes) if err != nil { if err == utils.ErrIndexKeyNotFound { return nil, ErrBlockNotFound } } - block, err := bp.FetchBlock(blockHash) + block, err := bc.FetchBlock(blockHash) if err != nil { return nil, err } return block, nil } -func (bp *BlockChain) FetchBlockHeaderByHeight(height uint64) (*types2.BlockHeader, error) { +func (bc *BlockChain) FetchBlockHeaderByHeight(height uint64) (*types2.BlockHeader, error) { var heightBytes = make([]byte, 8) binary.LittleEndian.PutUint64(heightBytes, height) - blockHash, err := bp.heightIndex.GetBytes(heightBytes) + blockHash, err := bc.heightIndex.GetBytes(heightBytes) if err != nil { if err == utils.ErrIndexKeyNotFound { return nil, ErrBlockNotFound } } - blockHeader, err := bp.FetchBlockHeader(blockHash) + blockHeader, err := bc.FetchBlockHeader(blockHash) if err != nil { return nil, err } return blockHeader, nil } + +func (bc *BlockChain) ValidateBlock(block *types2.Block) error { + // === verify block signature === + pubkey, err := block.Header.Proposer.ExtractPublicKey() + if err != nil { + return fmt.Errorf("unable to extract public key from block proposer's peer id: %w", err) + } + + ok, err := pubkey.Verify(block.Header.Hash, block.Header.Signature) + if err != nil { + return fmt.Errorf("failed to verify block signature: %w", err) + } + if !ok { + return fmt.Errorf("signature of block %x is invalid", block.Header.Hash) + } + ///////////////////////////////// + + // === check last hash merkle proof === + latestHeight, err := bc.GetLatestBlockHeight() + if err != nil { + return err + } + previousBlockHeader, err := bc.FetchBlockHeaderByHeight(latestHeight) + if err != nil { + return err + } + if !bytes.Equal(block.Header.LastHash, previousBlockHeader.Hash) { + return fmt.Errorf("block header has invalid last block hash (expected: %x, actual %x)", previousBlockHeader.Hash, block.Header.LastHash) + } + + verified, err := merkletree.VerifyProofUsing(previousBlockHeader.Hash, true, block.Header.LastHashProof, [][]byte{block.Header.Hash}, keccak256.New()) + if err != nil { + return fmt.Errorf("failed to verify last block hash merkle proof: %w", err) + } + if !verified { + return fmt.Errorf("merkle hash of block doesn't contain hash of previous block") + } + ///////////////////////////////// + + // === verify election proof wincount preliminarily === + if block.Header.ElectionProof.WinCount < 1 { + return fmt.Errorf("block proposer %s is not a winner", block.Header.Proposer.String()) + } + ///////////////////////////////// + + // === verify miner's eligibility to propose this task === + err = bc.miner.IsMinerEligibleToProposeBlock(block.Header.ProposerEth) + if err != nil { + return fmt.Errorf("block proposer is not eligible to propose block: %w", err) + } + ///////////////////////////////// + + // === verify election proof vrf === + proposerBuf, err := block.Header.Proposer.MarshalBinary() + if err != nil { + return err + } + + res, err := bc.b.Entry(context.TODO(), block.Header.ElectionProof.RandomnessRound) + if err != nil { + return err + } + eproofRandomness, err := beacon.DrawRandomness( + res.Data, + beacon.RandomnessTypeElectionProofProduction, + block.Header.Height, + proposerBuf, + ) + if err != nil { + return fmt.Errorf("failed to draw ElectionProof randomness: %w", err) + } + + err = beacon.VerifyVRF(*block.Header.Proposer, eproofRandomness, block.Header.ElectionProof.VRFProof) + if err != nil { + return fmt.Errorf("failed to verify election proof vrf: %w", err) + } + ////////////////////////////////////// + + // === compute wincount locally and verify values === + mStake, nStake, err := bc.miner.GetStakeInfo(block.Header.ProposerEth) + if err != nil { + return fmt.Errorf("failed to get miner stake: %w", err) + } + + actualWinCount := block.Header.ElectionProof.ComputeWinCount(mStake, nStake) + if block.Header.ElectionProof.WinCount != actualWinCount { + return fmt.Errorf("locally computed wincount of block is not matching to the received value") + } + ////////////////////////////////////// + + // === validate block transactions === + result := make(chan error) + var wg sync.WaitGroup + for _, v := range block.Data { + wg.Add(1) + go func(v *types2.Transaction, c chan error) { + defer wg.Done() + if err := utils.VerifyTx(block.Header, v); err != nil { + c <- fmt.Errorf("failed to verify tx: %w", err) + return + } + + var task types.DioneTask + err = cbor.Unmarshal(v.Data, &task) + if err != nil { + c <- fmt.Errorf("failed to unmarshal transaction payload: %w", err) + return + } + + if validationFunc := validation.GetValidationMethod(task.OriginChain, task.RequestType); validationFunc != nil { + if err := validationFunc(&task); err != nil { + c <- fmt.Errorf("payload validation has been failed: %w", err) + return + } + } else { + logrus.WithFields(logrus.Fields{ + "originChain": task.OriginChain, + "requestType": task.RequestType, + }).Debug("This origin chain/request type doesn't have any payload validation!") + } + }(v, result) + } + go func() { + wg.Wait() + close(result) + }() + for err := range result { + if err != nil { + return err + } + } + ///////////////////////////////// + + return nil +} diff --git a/consensus/miner.go b/blockchain/miner.go similarity index 73% rename from consensus/miner.go rename to blockchain/miner.go index ebbb982..46b63bb 100644 --- a/consensus/miner.go +++ b/blockchain/miner.go @@ -1,4 +1,4 @@ -package consensus +package blockchain import ( "errors" @@ -6,6 +6,9 @@ import ( "math/big" "sync" + "github.com/Secured-Finance/dione/beacon" + "github.com/Secured-Finance/dione/types" + "github.com/Secured-Finance/dione/blockchain/pool" "github.com/libp2p/go-libp2p-core/crypto" @@ -96,7 +99,7 @@ func (m *Miner) MineBlock(randomness []byte, randomnessRound uint64, lastBlockHe return nil, fmt.Errorf("failed to update miner stake: %w", err) } - winner, err := IsRoundWinner( + winner, err := isRoundWinner( lastBlockHeader.Height+1, m.address, randomness, @@ -110,6 +113,7 @@ func (m *Miner) MineBlock(randomness []byte, randomnessRound uint64, lastBlockHe } if winner == nil { + logrus.WithField("height", lastBlockHeader.Height+1).Debug("Block is not mined because we are not leader in consensus round") return nil, nil } @@ -136,3 +140,31 @@ func (m *Miner) IsMinerEligibleToProposeBlock(ethAddress common.Address) error { } return nil } + +func isRoundWinner(round uint64, + worker peer.ID, randomness []byte, randomnessRound uint64, minerStake, networkStake *big.Int, privKey crypto.PrivKey) (*types.ElectionProof, error) { + + buf, err := worker.MarshalBinary() + if err != nil { + return nil, fmt.Errorf("failed to marshal address: %w", err) + } + + electionRand, err := beacon.DrawRandomness(randomness, beacon.RandomnessTypeElectionProofProduction, round, buf) + if err != nil { + return nil, fmt.Errorf("failed to draw randomness: %w", err) + } + + vrfout, err := beacon.ComputeVRF(privKey, electionRand) + if err != nil { + return nil, fmt.Errorf("failed to compute VRF: %w", err) + } + + ep := &types.ElectionProof{VRFProof: vrfout, RandomnessRound: randomnessRound} + j := ep.ComputeWinCount(minerStake, networkStake) + ep.WinCount = j + if j < 1 { + return nil, nil + } + + return ep, nil +} diff --git a/blockchain/pool/mempool.go b/blockchain/pool/mempool.go index ce1753d..0a64d55 100644 --- a/blockchain/pool/mempool.go +++ b/blockchain/pool/mempool.go @@ -43,7 +43,7 @@ func NewMempool(bus EventBus.Bus) (*Mempool, error) { func (mp *Mempool) StoreTx(tx *types2.Transaction) error { hashStr := hex.EncodeToString(tx.Hash) err := mp.cache.StoreWithTTL(DefaultTxPrefix+hashStr, tx, DefaultTxTTL) - logrus.Infof("Submitted new transaction in mempool with hash %x", tx.Hash) + logrus.WithField("txHash", hex.EncodeToString(tx.Hash)).Info("Submitted new transaction in mempool") mp.bus.Publish("mempool:transactionAdded", tx) return err } @@ -56,7 +56,7 @@ func (mp *Mempool) DeleteTx(txHash []byte) error { return err } mp.cache.Delete(DefaultTxPrefix + hashStr) - logrus.Debugf("Deleted transaction from mempool %x", txHash) + logrus.WithField("txHash", hex.EncodeToString(txHash)).Debugf("Deleted transaction from mempool") mp.bus.Publish("mempool:transactionRemoved", tx) return nil } diff --git a/blockchain/sync/sync_mgr.go b/blockchain/sync/sync_mgr.go index 70ce135..f18a73f 100644 --- a/blockchain/sync/sync_mgr.go +++ b/blockchain/sync/sync_mgr.go @@ -3,6 +3,7 @@ package sync import ( "bytes" "context" + "encoding/hex" "errors" "fmt" "strings" @@ -51,11 +52,11 @@ type syncManager struct { bus EventBus.Bus } -func NewSyncManager(bus EventBus.Bus, bp *blockchain.BlockChain, mp *pool.Mempool, p2pRPCClient *gorpc.Client, bootstrapPeer peer.ID, psb *pubsub.PubSubRouter) SyncManager { +func NewSyncManager(bus EventBus.Bus, bc *blockchain.BlockChain, mp *pool.Mempool, p2pRPCClient *gorpc.Client, bootstrapPeer peer.ID, psb *pubsub.PubSubRouter) SyncManager { ctx, cancelFunc := context.WithCancel(context.Background()) sm := &syncManager{ bus: bus, - blockpool: bp, + blockpool: bc, mempool: mp, ctx: ctx, ctxCancelFunc: cancelFunc, @@ -66,6 +67,7 @@ func NewSyncManager(bus EventBus.Bus, bp *blockchain.BlockChain, mp *pool.Mempoo } psb.Hook(pubsub.NewTxMessageType, sm.onNewTransaction) + psb.Hook(pubsub.NewBlockMessageType, sm.onNewBlock) go func() { if err := sm.initialSync(); err != nil { @@ -217,7 +219,7 @@ func (sm *syncManager) processReceivedBlock(block types2.Block) error { } verified, err := merkletree.VerifyProofUsing(previousBlockHeader.Hash, false, block.Header.LastHashProof, [][]byte{block.Header.Hash}, keccak256.New()) if err != nil { - return fmt.Errorf("failed to verify last block hash merkle proof: %s", err.Error()) + return fmt.Errorf("failed to verify last block hash merkle proof: %w", err) } if !verified { return fmt.Errorf("merkle hash of current block doesn't contain hash of previous block") @@ -246,13 +248,30 @@ func (sm *syncManager) onNewTransaction(message *pubsub.PubSubMessage) { return } + // TODO add more checks on tx if !tx.ValidateHash() { - logrus.Warn("failed to validate tx hash, rejecting it") + logrus.WithField("txHash", hex.EncodeToString(tx.Hash)).Warn("failed to validate transaction hash, rejecting it") return - } // TODO add more checks on tx + } err = sm.mempool.StoreTx(&tx) if err != nil { logrus.Warnf("failed to store incoming transaction in mempool: %s", err.Error()) } } + +func (sm *syncManager) onNewBlock(message *pubsub.PubSubMessage) { + var block types2.Block + + err := cbor.Unmarshal(message.Payload, &block) + if err != nil { + logrus.WithField("err", err.Error()).Error("failed to unmarshal payload of NewBlock message") + return + } + + err = sm.blockpool.StoreBlock(&block) + if err != nil { + logrus.WithField("err", err.Error()).Error("failed to store block from NewBlock message") + return + } +} diff --git a/blockchain/types/block.go b/blockchain/types/block.go index 262599d..bb0d6ad 100644 --- a/blockchain/types/block.go +++ b/blockchain/types/block.go @@ -1,6 +1,7 @@ package types import ( + "encoding/binary" "time" "github.com/Secured-Finance/dione/types" @@ -48,13 +49,16 @@ func CreateBlock(lastBlockHeader *BlockHeader, txs []*Transaction, minerEth comm timestamp := time.Now().Unix() // extract hashes from transactions - var txHashes [][]byte + var merkleHashes [][]byte for _, tx := range txs { - txHashes = append(txHashes, tx.Hash) + merkleHashes = append(merkleHashes, tx.Hash) } - txHashes = append(txHashes, lastBlockHeader.Hash) + merkleHashes = append(merkleHashes, lastBlockHeader.Hash) + timestampBytes := make([]byte, 8) + binary.LittleEndian.PutUint64(timestampBytes, uint64(timestamp)) + merkleHashes = append(merkleHashes, timestampBytes) - tree, err := merkletree.NewUsing(txHashes, keccak256.New(), false) + tree, err := merkletree.NewUsing(merkleHashes, keccak256.New(), true) if err != nil { return nil, err } diff --git a/blockchain/utils/verification.go b/blockchain/utils/verification.go index 964c32d..21e9742 100644 --- a/blockchain/utils/verification.go +++ b/blockchain/utils/verification.go @@ -12,7 +12,7 @@ func VerifyTx(blockHeader *types.BlockHeader, tx *types.Transaction) error { if tx.MerkleProof == nil { return fmt.Errorf("block transaction doesn't have merkle proof") } - txProofVerified, err := merkletree.VerifyProofUsing(tx.Hash, false, tx.MerkleProof, [][]byte{blockHeader.Hash}, keccak256.New()) + txProofVerified, err := merkletree.VerifyProofUsing(tx.Hash, true, tx.MerkleProof, [][]byte{blockHeader.Hash}, keccak256.New()) if err != nil { return fmt.Errorf("failed to verify tx hash merkle proof: %s", err.Error()) } diff --git a/consensus/consensus.go b/consensus/consensus.go index 69a4a9d..1eeac6f 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -6,6 +6,8 @@ import ( "math/big" "sync" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/Secured-Finance/dione/beacon" "github.com/fxamacker/cbor/v2" @@ -53,11 +55,12 @@ type PBFTConsensusManager struct { msgLog *ConsensusMessageLog validator *ConsensusValidator ethereumClient *ethclient.EthereumClient - miner *Miner + miner *blockchain.Miner blockPool *pool.BlockPool mempool *pool.Mempool blockchain *blockchain.BlockChain state *State + address peer.ID } type State struct { @@ -75,28 +78,32 @@ func NewPBFTConsensusManager( minApprovals int, privKey crypto.PrivKey, ethereumClient *ethclient.EthereumClient, - miner *Miner, + miner *blockchain.Miner, bc *blockchain.BlockChain, bp *pool.BlockPool, b beacon.BeaconNetwork, mempool *pool.Mempool, + address peer.ID, ) *PBFTConsensusManager { - pcm := &PBFTConsensusManager{} - pcm.psb = psb - pcm.miner = miner - pcm.validator = NewConsensusValidator(miner, bc, b) - pcm.msgLog = NewConsensusMessageLog() - pcm.minApprovals = minApprovals - pcm.privKey = privKey - pcm.ethereumClient = ethereumClient - pcm.state = &State{ - ready: false, - status: StateStatusUnknown, + pcm := &PBFTConsensusManager{ + psb: psb, + miner: miner, + validator: NewConsensusValidator(miner, bc, b), + msgLog: NewConsensusMessageLog(), + minApprovals: minApprovals, + privKey: privKey, + ethereumClient: ethereumClient, + state: &State{ + ready: false, + status: StateStatusUnknown, + }, + bus: bus, + blockPool: bp, + mempool: mempool, + blockchain: bc, + address: address, } - pcm.bus = bus - pcm.blockPool = bp - pcm.mempool = mempool - pcm.blockchain = bc + pcm.psb.Hook(pubsub.PrePrepareMessageType, pcm.handlePrePrepare) pcm.psb.Hook(pubsub.PrepareMessageType, pcm.handlePrepare) pcm.psb.Hook(pubsub.CommitMessageType, pcm.handleCommit) @@ -120,8 +127,8 @@ func (pcm *PBFTConsensusManager) propose(blk *types3.Block) error { } func (pcm *PBFTConsensusManager) handlePrePrepare(message *pubsub.PubSubMessage) { - pcm.state.mutex.Lock() - defer pcm.state.mutex.Unlock() + //pcm.state.mutex.Lock() + //defer pcm.state.mutex.Unlock() var prePrepare types.PrePrepareMessage err := cbor.Unmarshal(message.Payload, &prePrepare) if err != nil { @@ -129,7 +136,7 @@ func (pcm *PBFTConsensusManager) handlePrePrepare(message *pubsub.PubSubMessage) return } - if *prePrepare.Block.Header.Proposer == pcm.miner.address { + if *prePrepare.Block.Header.Proposer == pcm.address { return } @@ -144,7 +151,7 @@ func (pcm *PBFTConsensusManager) handlePrePrepare(message *pubsub.PubSubMessage) logrus.Tracef("received existing pre_prepare msg for block %x", cmsg.Block.Header.Hash) return } - if !pcm.validator.Valid(cmsg, map[string]interface{}{"randomness": pcm.state.randomness}) { + if !pcm.validator.Valid(cmsg) { logrus.Warnf("received invalid pre_prepare msg for block %x", cmsg.Block.Header.Hash) return } @@ -163,8 +170,8 @@ func (pcm *PBFTConsensusManager) handlePrePrepare(message *pubsub.PubSubMessage) } func (pcm *PBFTConsensusManager) handlePrepare(message *pubsub.PubSubMessage) { - pcm.state.mutex.Lock() - defer pcm.state.mutex.Unlock() + //pcm.state.mutex.Lock() + //defer pcm.state.mutex.Unlock() var prepare types.PrepareMessage err := cbor.Unmarshal(message.Payload, &prepare) if err != nil { @@ -189,7 +196,7 @@ func (pcm *PBFTConsensusManager) handlePrepare(message *pubsub.PubSubMessage) { return } - if !pcm.validator.Valid(cmsg, nil) { + if !pcm.validator.Valid(cmsg) { logrus.Warnf("received invalid prepare msg for block %x", cmsg.Blockhash) return } @@ -208,8 +215,8 @@ func (pcm *PBFTConsensusManager) handlePrepare(message *pubsub.PubSubMessage) { } func (pcm *PBFTConsensusManager) handleCommit(message *pubsub.PubSubMessage) { - pcm.state.mutex.Lock() - defer pcm.state.mutex.Unlock() + //pcm.state.mutex.Lock() + //defer pcm.state.mutex.Unlock() var commit types.CommitMessage err := cbor.Unmarshal(message.Payload, &commit) if err != nil { @@ -233,7 +240,7 @@ func (pcm *PBFTConsensusManager) handleCommit(message *pubsub.PubSubMessage) { logrus.Tracef("received existing commit msg for block %x", cmsg.Blockhash) return } - if !pcm.validator.Valid(cmsg, nil) { + if !pcm.validator.Valid(cmsg) { logrus.Warnf("received invalid commit msg for block %x", cmsg.Blockhash) return } @@ -281,39 +288,8 @@ func (pcm *PBFTConsensusManager) onNewBeaconEntry(entry types2.BeaconEntry) { // if we are miner of this block // then post dione tasks to target chains (currently, only Ethereum) - if block.Header.Proposer.String() == pcm.miner.address.String() { - for _, tx := range block.Data { - var task types2.DioneTask - err := cbor.Unmarshal(tx.Data, &task) - if err != nil { - logrus.WithFields(logrus.Fields{ - "err": err.Error(), - "txHash": hex.EncodeToString(tx.Hash), - }).Error("Failed to unmarshal transaction payload") - continue // FIXME - } - reqIDNumber, ok := big.NewInt(0).SetString(task.RequestID, 10) - if !ok { - logrus.WithFields(logrus.Fields{ - "txHash": hex.EncodeToString(tx.Hash), - }).Error("Failed to parse request id number in Dione task") - continue // FIXME - } - - err = pcm.ethereumClient.SubmitRequestAnswer(reqIDNumber, task.Payload) - if err != nil { - logrus.WithFields(logrus.Fields{ - "err": err.Error(), - "txHash": hex.EncodeToString(tx.Hash), - "reqID": reqIDNumber.String(), - }).Error("Failed to submit task to ETH chain") - continue // FIXME - } - logrus.WithFields(logrus.Fields{ - "txHash": hex.EncodeToString(tx.Hash), - "reqID": reqIDNumber.String(), - }).Debug("Dione task has been sucessfully submitted to ETH chain (DioneOracle contract)") - } + if block.Header.Proposer.String() == pcm.address.String() { + pcm.submitTasksFromBlock(block) } pcm.state.blockHeight = pcm.state.blockHeight + 1 @@ -336,7 +312,7 @@ func (pcm *PBFTConsensusManager) onNewBeaconEntry(entry types2.BeaconEntry) { minedBlock, err := pcm.miner.MineBlock(entry.Data, entry.Round, blockHeader) if err != nil { - if errors.Is(err, ErrNoTxForBlock) { + if errors.Is(err, blockchain.ErrNoTxForBlock) { logrus.Info("Sealing skipped, no transactions in mempool") } else { logrus.Errorf("Failed to mine the block: %s", err.Error()) @@ -355,6 +331,41 @@ func (pcm *PBFTConsensusManager) onNewBeaconEntry(entry types2.BeaconEntry) { } } +func (pcm *PBFTConsensusManager) submitTasksFromBlock(block *types3.Block) { + for _, tx := range block.Data { + var task types2.DioneTask + err := cbor.Unmarshal(tx.Data, &task) + if err != nil { + logrus.WithFields(logrus.Fields{ + "err": err.Error(), + "txHash": hex.EncodeToString(tx.Hash), + }).Error("Failed to unmarshal transaction payload") + continue // FIXME + } + reqIDNumber, ok := big.NewInt(0).SetString(task.RequestID, 10) + if !ok { + logrus.WithFields(logrus.Fields{ + "txHash": hex.EncodeToString(tx.Hash), + }).Error("Failed to parse request id number in Dione task") + continue // FIXME + } + + err = pcm.ethereumClient.SubmitRequestAnswer(reqIDNumber, task.Payload) + if err != nil { + logrus.WithFields(logrus.Fields{ + "err": err.Error(), + "txHash": hex.EncodeToString(tx.Hash), + "reqID": reqIDNumber.String(), + }).Error("Failed to submit task to ETH chain") + continue // FIXME + } + logrus.WithFields(logrus.Fields{ + "txHash": hex.EncodeToString(tx.Hash), + "reqID": reqIDNumber.String(), + }).Debug("Dione task has been sucessfully submitted to ETH chain (DioneOracle contract)") + } +} + func (pcm *PBFTConsensusManager) commitAcceptedBlocks() (*types3.Block, error) { blocks := pcm.blockPool.GetAllAcceptedBlocks() if blocks == nil { diff --git a/consensus/consensus_validator.go b/consensus/consensus_validator.go index 42bf214..a0cf8da 100644 --- a/consensus/consensus_validator.go +++ b/consensus/consensus_validator.go @@ -1,223 +1,61 @@ package consensus import ( - "bytes" - "context" - "fmt" - "sync" + "encoding/hex" "github.com/Secured-Finance/dione/beacon" - - types3 "github.com/Secured-Finance/dione/blockchain/types" + "github.com/sirupsen/logrus" "github.com/Secured-Finance/dione/blockchain" - "github.com/Secured-Finance/dione/blockchain/utils" types2 "github.com/Secured-Finance/dione/consensus/types" - "github.com/Secured-Finance/dione/consensus/validation" - "github.com/Secured-Finance/dione/types" - "github.com/filecoin-project/go-state-types/crypto" - "github.com/fxamacker/cbor/v2" - "github.com/sirupsen/logrus" - "github.com/wealdtech/go-merkletree" - "github.com/wealdtech/go-merkletree/keccak256" ) type ConsensusValidator struct { - validationFuncMap map[types2.ConsensusMessageType]func(msg types2.ConsensusMessage, metadata map[string]interface{}) bool - miner *Miner + validationFuncMap map[types2.ConsensusMessageType]func(msg types2.ConsensusMessage) bool + miner *blockchain.Miner beacon beacon.BeaconNetwork blockchain *blockchain.BlockChain } -func NewConsensusValidator(miner *Miner, bc *blockchain.BlockChain, b beacon.BeaconNetwork) *ConsensusValidator { +func NewConsensusValidator(miner *blockchain.Miner, bc *blockchain.BlockChain, b beacon.BeaconNetwork) *ConsensusValidator { cv := &ConsensusValidator{ miner: miner, blockchain: bc, beacon: b, } - cv.validationFuncMap = map[types2.ConsensusMessageType]func(msg types2.ConsensusMessage, metadata map[string]interface{}) bool{ - // FIXME it all - types2.ConsensusMessageTypePrePrepare: func(msg types2.ConsensusMessage, metadata map[string]interface{}) bool { - // === verify block signature === - pubkey, err := msg.Block.Header.Proposer.ExtractPublicKey() - if err != nil { - logrus.Errorf("unable to extract public key from block proposer's peer id: %s", err.Error()) + cv.validationFuncMap = map[types2.ConsensusMessageType]func(msg types2.ConsensusMessage) bool{ + types2.ConsensusMessageTypePrePrepare: func(msg types2.ConsensusMessage) bool { + if err := cv.blockchain.ValidateBlock(msg.Block); err != nil { + logrus.WithFields(logrus.Fields{ + "blockHash": hex.EncodeToString(msg.Block.Header.Hash), + "err": err.Error(), + }).Error("failed to validate block from PrePrepare message") return false } - - ok, err := pubkey.Verify(msg.Block.Header.Hash, msg.Block.Header.Signature) - if err != nil { - logrus.Errorf("failed to verify block signature: %s", err.Error()) - return false - } - if !ok { - logrus.Errorf("signature of block %x is invalid", msg.Block.Header.Hash) - return false - } - ///////////////////////////////// - - // === check last hash merkle proof === - latestHeight, err := cv.blockchain.GetLatestBlockHeight() - if err != nil { - logrus.Error(err) - return false - } - previousBlockHeader, err := cv.blockchain.FetchBlockHeaderByHeight(latestHeight) - if err != nil { - logrus.Error(err) - return false - } - if bytes.Compare(msg.Block.Header.LastHash, previousBlockHeader.Hash) != 0 { - logrus.Errorf("block header has invalid last block hash (expected: %x, actual %x)", previousBlockHeader.Hash, msg.Block.Header.LastHash) - return false - } - - verified, err := merkletree.VerifyProofUsing(previousBlockHeader.Hash, false, msg.Block.Header.LastHashProof, [][]byte{msg.Block.Header.Hash}, keccak256.New()) - if err != nil { - logrus.Error("failed to verify last block hash merkle proof: %s", err.Error()) - return false - } - if !verified { - logrus.Error("merkle hash of current block doesn't contain hash of previous block: %s", err.Error()) - return false - } - ///////////////////////////////// - - // === verify election proof wincount preliminarily === - if msg.Block.Header.ElectionProof.WinCount < 1 { - logrus.Error("miner isn't a winner!") - return false - } - ///////////////////////////////// - - // === verify miner's eligibility to propose this task === - err = cv.miner.IsMinerEligibleToProposeBlock(msg.Block.Header.ProposerEth) - if err != nil { - logrus.Errorf("miner is not eligible to propose block: %v", err) - return false - } - ///////////////////////////////// - - // === verify election proof vrf === - proposerBuf, err := msg.Block.Header.Proposer.MarshalBinary() - if err != nil { - logrus.Error(err) - return false - } - - res, err := b.Beacon.Entry(context.TODO(), msg.Block.Header.ElectionProof.RandomnessRound) - if err != nil { - logrus.Error(err) - return false - } - eproofRandomness, err := DrawRandomness( - res.Data, - crypto.DomainSeparationTag_ElectionProofProduction, - msg.Block.Header.Height, - proposerBuf, - ) - if err != nil { - logrus.Errorf("failed to draw ElectionProof randomness: %s", err.Error()) - return false - } - err = VerifyVRF(*msg.Block.Header.Proposer, eproofRandomness, msg.Block.Header.ElectionProof.VRFProof) - if err != nil { - logrus.Errorf("failed to verify election proof vrf: %v", err) - return false - } - ////////////////////////////////////// - - // === compute wincount locally and verify values === - mStake, nStake, err := cv.miner.GetStakeInfo(msg.Block.Header.ProposerEth) - if err != nil { - logrus.Errorf("failed to get miner stake: %v", err) - return false - } - actualWinCount := msg.Block.Header.ElectionProof.ComputeWinCount(mStake, nStake) - if msg.Block.Header.ElectionProof.WinCount != actualWinCount { - logrus.Errorf("locally computed wincount of block %x isn't matching received value!", msg.Block.Header.Hash) - return false - } - ////////////////////////////////////// - - // === validate block transactions === - result := make(chan error) - var wg sync.WaitGroup - for _, v := range msg.Block.Data { - wg.Add(1) - go func(v *types3.Transaction, c chan error) { - defer wg.Done() - if err := utils.VerifyTx(msg.Block.Header, v); err != nil { - c <- fmt.Errorf("failed to verify tx: %w", err) - return - } - - var task types.DioneTask - err = cbor.Unmarshal(v.Data, &task) - if err != nil { - c <- fmt.Errorf("failed to unmarshal transaction payload: %w", err) - return - } - - if validationFunc := validation.GetValidationMethod(task.OriginChain, task.RequestType); validationFunc != nil { - if err := validationFunc(&task); err != nil { - c <- fmt.Errorf("payload validation has been failed: %w", err) - return - } - } else { - logrus.WithFields(logrus.Fields{ - "originChain": task.OriginChain, - "requestType": task.RequestType, - }).Debug("This origin chain/request type doesn't have any payload validation!") - } - }(v, result) - } - go func() { - wg.Wait() - close(result) - }() - for err := range result { - if err != nil { - logrus.Error(err) - return false - } - } - ///////////////////////////////// - return true }, - types2.ConsensusMessageTypePrepare: func(msg types2.ConsensusMessage, metadata map[string]interface{}) bool { - pubKey, err := msg.From.ExtractPublicKey() - if err != nil { - // TODO logging - return false - } - ok, err := pubKey.Verify(msg.Blockhash, msg.Signature) - if err != nil { - // TODO logging - return false - } - return ok - }, - types2.ConsensusMessageTypeCommit: func(msg types2.ConsensusMessage, metadata map[string]interface{}) bool { - pubKey, err := msg.From.ExtractPublicKey() - if err != nil { - // TODO logging - return false - } - ok, err := pubKey.Verify(msg.Blockhash, msg.Signature) - if err != nil { - // TODO logging - return false - } - return ok - }, + types2.ConsensusMessageTypePrepare: checkSignatureForBlockhash, + types2.ConsensusMessageTypeCommit: checkSignatureForBlockhash, } return cv } -func (cv *ConsensusValidator) Valid(msg types2.ConsensusMessage, metadata map[string]interface{}) bool { - return cv.validationFuncMap[msg.Type](msg, metadata) +func (cv *ConsensusValidator) Valid(msg types2.ConsensusMessage) bool { + return cv.validationFuncMap[msg.Type](msg) +} + +func checkSignatureForBlockhash(msg types2.ConsensusMessage) bool { + pubKey, err := msg.From.ExtractPublicKey() + if err != nil { + // TODO logging + return false + } + ok, err := pubKey.Verify(msg.Blockhash, msg.Signature) + if err != nil { + // TODO logging + return false + } + return ok } diff --git a/consensus/utils.go b/consensus/utils.go index 7d62f77..37e7e85 100644 --- a/consensus/utils.go +++ b/consensus/utils.go @@ -1,9 +1,7 @@ package consensus import ( - "encoding/binary" "fmt" - "math/big" "github.com/fxamacker/cbor/v2" @@ -11,87 +9,9 @@ import ( types2 "github.com/Secured-Finance/dione/consensus/types" - "github.com/minio/blake2b-simd" - - "github.com/libp2p/go-libp2p-core/peer" - - "github.com/Secured-Finance/dione/types" - crypto2 "github.com/filecoin-project/go-state-types/crypto" "github.com/libp2p/go-libp2p-core/crypto" - "golang.org/x/xerrors" ) -type SignFunc func(peer.ID, []byte) (*types.Signature, error) - -func ComputeVRF(privKey crypto.PrivKey, sigInput []byte) ([]byte, error) { - return privKey.Sign(sigInput) -} - -func VerifyVRF(worker peer.ID, vrfBase, vrfproof []byte) error { - pk, err := worker.ExtractPublicKey() - if err != nil { - return err - } - ok, err := pk.Verify(vrfBase, vrfproof) - if err != nil { - return err - } - if !ok { - return fmt.Errorf("vrf was invalid") - } - - return nil -} - -func IsRoundWinner(round uint64, - worker peer.ID, randomness []byte, randomnessRound uint64, minerStake, networkStake *big.Int, privKey crypto.PrivKey) (*types.ElectionProof, error) { - - buf, err := worker.MarshalBinary() - if err != nil { - return nil, fmt.Errorf("failed to marshal address: %w", err) - } - - electionRand, err := DrawRandomness(randomness, crypto2.DomainSeparationTag_ElectionProofProduction, round, buf) - if err != nil { - return nil, fmt.Errorf("failed to draw randomness: %w", err) - } - - vrfout, err := ComputeVRF(privKey, electionRand) - if err != nil { - return nil, fmt.Errorf("failed to compute VRF: %w", err) - } - - ep := &types.ElectionProof{VRFProof: vrfout, RandomnessRound: randomnessRound} - j := ep.ComputeWinCount(minerStake, networkStake) - ep.WinCount = j - if j < 1 { - return nil, nil - } - - return ep, nil -} - -func DrawRandomness(rbase []byte, pers crypto2.DomainSeparationTag, round uint64, entropy []byte) ([]byte, error) { - h := blake2b.New256() - if err := binary.Write(h, binary.BigEndian, int64(pers)); err != nil { - return nil, xerrors.Errorf("deriving randomness: %v", err) - } - VRFDigest := blake2b.Sum256(rbase) - _, err := h.Write(VRFDigest[:]) - if err != nil { - return nil, xerrors.Errorf("hashing VRFDigest: %w", err) - } - if err := binary.Write(h, binary.BigEndian, round); err != nil { - return nil, xerrors.Errorf("deriving randomness: %v", err) - } - _, err = h.Write(entropy) - if err != nil { - return nil, xerrors.Errorf("hashing entropy: %v", err) - } - - return h.Sum(nil), nil -} - func NewMessage(cmsg types2.ConsensusMessage, typ types2.ConsensusMessageType, privKey crypto.PrivKey) (*pubsub.PubSubMessage, error) { var message pubsub.PubSubMessage switch typ { diff --git a/node/node.go b/node/node.go index 747053d..786ba34 100644 --- a/node/node.go +++ b/node/node.go @@ -65,7 +65,7 @@ type Node struct { Config *config.Config Ethereum *ethclient.EthereumClient ConsensusManager *consensus.PBFTConsensusManager - Miner *consensus.Miner + Miner *blockchain.Miner Beacon beacon.BeaconNetwork DisputeManager *consensus.DisputeManager BlockPool *pool.BlockPool @@ -104,9 +104,10 @@ func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTim // initialize ethereum client ethClient, err := provideEthereumClient(n.Config) if err != nil { - logrus.Fatal(err) + logrus.WithField("err", err.Error()).Fatal("Failed to initialize Ethereum client") } n.Ethereum = ethClient + //goland:noinspection ALL logrus.WithField("ethAddress", ethClient.GetEthAddress().Hex()).Info("Ethereum client has been initialized!") // initialize blockchain rpc clients @@ -135,21 +136,16 @@ func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTim n.PeerDiscovery = peerDiscovery logrus.Info("Peer discovery subsystem has been initialized!") - // initialize event log cache subsystem - //c := provideCache(config) - //n.Cache = c - //logrus.Info("Event cache subsystem has initialized!") + // initialize random beacon network subsystem + randomBeaconNetwork, err := provideBeacon(psb.Pubsub, bus) + if err != nil { + logrus.Fatal(err) + } + n.Beacon = randomBeaconNetwork + logrus.Info("Random beacon subsystem has been initialized!") // == initialize blockchain modules - // initialize blockpool database - bc, err := provideBlockChain(n.Config, bus) - if err != nil { - logrus.Fatalf("Failed to initialize blockpool: %s", err.Error()) - } - n.BlockChain = bc - logrus.Info("Block pool database has been successfully initialized!") - // initialize mempool mp, err := provideMemPool(bus) if err != nil { @@ -158,6 +154,19 @@ func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTim n.MemPool = mp logrus.Info("Mempool has been successfully initialized!") + // initialize mining subsystem + miner := provideMiner(n.Host.ID(), *n.Ethereum.GetEthAddress(), n.Ethereum, prvKey, mp) + n.Miner = miner + logrus.Info("Mining subsystem has been initialized!") + + // initialize blockpool database + bc, err := provideBlockChain(n.Config, bus, miner, randomBeaconNetwork.Beacon) + if err != nil { + logrus.Fatalf("Failed to initialize blockpool: %s", err.Error()) + } + n.BlockChain = bc + logrus.Info("Block pool database has been successfully initialized!") + bp, err := provideBlockPool(mp, bus) if err != nil { logrus.Fatalf("Failed to initialize blockpool: %s", err.Error()) @@ -192,21 +201,8 @@ func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTim n.SyncManager = sm logrus.Info("Blockchain sync subsystem has been successfully initialized!") - // initialize mining subsystem - miner := provideMiner(n.Host.ID(), *n.Ethereum.GetEthAddress(), n.Ethereum, prvKey, mp) - n.Miner = miner - logrus.Info("Mining subsystem has been initialized!") - - // initialize random beacon network subsystem - randomBeaconNetwork, err := provideBeacon(psb.Pubsub, bus) - if err != nil { - logrus.Fatal(err) - } - n.Beacon = randomBeaconNetwork - logrus.Info("Random beacon subsystem has been initialized!") - // initialize consensus subsystem - consensusManager := provideConsensusManager(bus, psb, miner, bc, ethClient, prvKey, n.Config.ConsensusMinApprovals, bp, randomBeaconNetwork, mp) + consensusManager := provideConsensusManager(bus, psb, miner, bc, ethClient, prvKey, n.Config.ConsensusMinApprovals, bp, randomBeaconNetwork, mp, n.Host.ID()) n.ConsensusManager = consensusManager logrus.Info("Consensus subsystem has been initialized!") diff --git a/node/node_dep_providers.go b/node/node_dep_providers.go index 9a066c7..e221b28 100644 --- a/node/node_dep_providers.go +++ b/node/node_dep_providers.go @@ -60,8 +60,8 @@ func provideDisputeManager(ctx context.Context, ethClient *ethclient.EthereumCli return consensus.NewDisputeManager(ctx, ethClient, pcm, cfg.Ethereum.DisputeVoteWindow, bc) } -func provideMiner(peerID peer.ID, ethAddress common.Address, ethClient *ethclient.EthereumClient, privateKey crypto.PrivKey, mempool *pool.Mempool) *consensus.Miner { - return consensus.NewMiner(peerID, ethAddress, ethClient, privateKey, mempool) +func provideMiner(peerID peer.ID, ethAddress common.Address, ethClient *ethclient.EthereumClient, privateKey crypto.PrivKey, mempool *pool.Mempool) *blockchain.Miner { + return blockchain.NewMiner(peerID, ethAddress, ethClient, privateKey, mempool) } func provideBeacon(ps *pubsub2.PubSub, bus EventBus.Bus) (beacon.BeaconNetwork, error) { @@ -94,7 +94,7 @@ func provideEthereumClient(config *config.Config) (*ethclient.EthereumClient, er ethereum := ethclient.NewEthereumClient() err := ethereum.Initialize(&config.Ethereum) if err != nil { - return nil, xerrors.Errorf("failed to initialize ethereum client: %v", err) + return nil, err } return ethereum, nil } @@ -106,7 +106,7 @@ func providePubsubRouter(lhost host.Host, config *config.Config) *pubsub.PubSubR func provideConsensusManager( bus EventBus.Bus, psb *pubsub.PubSubRouter, - miner *consensus.Miner, + miner *blockchain.Miner, bc *blockchain.BlockChain, ethClient *ethclient.EthereumClient, privateKey crypto.PrivKey, @@ -114,6 +114,7 @@ func provideConsensusManager( bp *pool.BlockPool, b beacon.BeaconNetwork, mp *pool.Mempool, + address peer.ID, ) *consensus.PBFTConsensusManager { return consensus.NewPBFTConsensusManager( bus, @@ -126,6 +127,7 @@ func provideConsensusManager( bp, b, mp, + address, ) } @@ -176,8 +178,8 @@ func providePeerDiscovery(baddrs []multiaddr.Multiaddr, h host.Host, pexDiscover return pexDiscovery, nil } -func provideBlockChain(config *config.Config, bus EventBus.Bus) (*blockchain.BlockChain, error) { - return blockchain.NewBlockChain(config.Blockchain.DatabasePath, bus) +func provideBlockChain(config *config.Config, bus EventBus.Bus, miner *blockchain.Miner, b beacon.BeaconAPI) (*blockchain.BlockChain, error) { + return blockchain.NewBlockChain(config.Blockchain.DatabasePath, bus, miner, b) } func provideMemPool(bus EventBus.Bus) (*pool.Mempool, error) {