From ea9ceaeda9461d771c060144011d4c6d53f686e0 Mon Sep 17 00:00:00 2001 From: ChronosX88 Date: Fri, 11 Jun 2021 14:40:32 +0300 Subject: [PATCH] Implement block validation at PREPREPARE stage --- blockchain/pool/blockpool.go | 12 +- blockchain/sync/sync_mgr.go | 41 ++-- blockchain/types/block.go | 3 +- blockchain/utils/verification.go | 27 +++ consensus/consensus.go | 27 ++- consensus/consensus_validator.go | 266 ++++++++++++++-------- consensus/miner.go | 8 +- consensus/utils.go | 18 +- consensus/validation/filecoin/filecoin.go | 46 ++-- consensus/validation/payload_validator.go | 8 +- consensus/validation/solana/solana.go | 17 ++ consensus/validation/utils.go | 24 ++ go.mod | 1 + go.sum | 2 + node/node.go | 16 +- node/node_dep_providers.go | 10 +- 16 files changed, 355 insertions(+), 171 deletions(-) create mode 100644 blockchain/utils/verification.go create mode 100644 consensus/validation/solana/solana.go create mode 100644 consensus/validation/utils.go diff --git a/blockchain/pool/blockpool.go b/blockchain/pool/blockpool.go index 5727464..48c3296 100644 --- a/blockchain/pool/blockpool.go +++ b/blockchain/pool/blockpool.go @@ -9,13 +9,16 @@ import ( // BlockPool is pool for blocks that isn't not validated or committed yet type BlockPool struct { + mempool *Mempool knownBlocks cache.Cache acceptedBlocks cache.Cache } -func NewBlockPool() (*BlockPool, error) { +func NewBlockPool(mp *Mempool) (*BlockPool, error) { bp := &BlockPool{ acceptedBlocks: cache.NewInMemoryCache(), // here we need to use separate cache + knownBlocks: cache.NewInMemoryCache(), + mempool: mp, } return bp, nil @@ -51,7 +54,12 @@ func (bp *BlockPool) GetAllAcceptedBlocks() []*types.Block { // PruneAcceptedBlocks cleans accepted blocks list. It is called when new consensus round starts. func (bp *BlockPool) PruneAcceptedBlocks() { - for k := range bp.acceptedBlocks.Items() { + for k, v := range bp.acceptedBlocks.Items() { + block := v.(*types.Block) + for _, v := range block.Data { + v.MerkleProof = nil + bp.mempool.StoreTx(v) // return transactions back to mempool + } bp.acceptedBlocks.Delete(k) } } diff --git a/blockchain/sync/sync_mgr.go b/blockchain/sync/sync_mgr.go index 8e88855..08fca6e 100644 --- a/blockchain/sync/sync_mgr.go +++ b/blockchain/sync/sync_mgr.go @@ -8,6 +8,10 @@ import ( "strings" "sync" + "github.com/asaskevich/EventBus" + + "github.com/Secured-Finance/dione/blockchain/utils" + "github.com/Secured-Finance/dione/blockchain" "github.com/Secured-Finance/dione/pubsub" @@ -42,11 +46,13 @@ type syncManager struct { bootstrapPeer peer.ID rpcClient *gorpc.Client psb *pubsub.PubSubRouter + bus EventBus.Bus } -func NewSyncManager(bp *blockchain.BlockChain, mp *pool.Mempool, p2pRPCClient *gorpc.Client, bootstrapPeer peer.ID, psb *pubsub.PubSubRouter) SyncManager { +func NewSyncManager(bus EventBus.Bus, bp *blockchain.BlockChain, mp *pool.Mempool, p2pRPCClient *gorpc.Client, bootstrapPeer peer.ID, psb *pubsub.PubSubRouter) SyncManager { ctx, cancelFunc := context.WithCancel(context.Background()) sm := &syncManager{ + bus: bus, blockpool: bp, mempool: mp, ctx: ctx, @@ -59,9 +65,26 @@ func NewSyncManager(bp *blockchain.BlockChain, mp *pool.Mempool, p2pRPCClient *g psb.Hook(pubsub.NewTxMessageType, sm.onNewTransaction, types2.Transaction{}) + go func() { + if err := sm.initialSync(); err != nil { + logrus.Error(err) + } + }() + return sm } +func (sm *syncManager) initialSync() error { + if err := sm.doInitialBlockPoolSync(); err != nil { + return err + } + if err := sm.doInitialMempoolSync(); err != nil { + return err + } + sm.bus.Publish("sync:initialSyncCompleted") + return nil +} + func (sm *syncManager) doInitialBlockPoolSync() error { if sm.initialSyncCompleted { return nil @@ -173,7 +196,7 @@ func (sm *syncManager) doInitialMempoolSync() error { logrus.Warnf(err.Error()) } } - // FIXME handle not found transactions + // TODO handle not found transactions } return nil @@ -198,18 +221,8 @@ func (sm *syncManager) processReceivedBlock(block types2.Block) error { // check if hashes of block transactions are present in the block hash merkle tree for _, tx := range block.Data { // FIXME we need to do something with rejected txs - if tx.MerkleProof == nil { - return fmt.Errorf("block transaction hasn't merkle proof") - } - txProofVerified, err := merkletree.VerifyProofUsing(tx.Hash, false, tx.MerkleProof, [][]byte{block.Header.Hash}, keccak256.New()) - if err != nil { - return fmt.Errorf("failed to verify tx hash merkle proof: %s", err.Error()) - } - if !txProofVerified { - return fmt.Errorf("transaction doesn't present in block hash merkle tree") - } - if !tx.ValidateHash() { - return fmt.Errorf("transaction hash is invalid") + if err := utils.VerifyTx(block.Header, tx); err != nil { + return err } } diff --git a/blockchain/types/block.go b/blockchain/types/block.go index 09af79d..ff0877e 100644 --- a/blockchain/types/block.go +++ b/blockchain/types/block.go @@ -44,7 +44,7 @@ func GenesisBlock() *Block { } } -func CreateBlock(lastBlockHeader *BlockHeader, txs []*Transaction, minerEth common.Address, privateKey crypto.PrivKey) (*Block, error) { +func CreateBlock(lastBlockHeader *BlockHeader, txs []*Transaction, minerEth common.Address, privateKey crypto.PrivKey, eproof *types.ElectionProof) (*Block, error) { timestamp := time.Now().Unix() // extract hashes from transactions @@ -88,6 +88,7 @@ func CreateBlock(lastBlockHeader *BlockHeader, txs []*Transaction, minerEth comm Hash: blockHash, LastHash: lastBlockHeader.Hash, LastHashProof: lastHashProof, + ElectionProof: eproof, }, Data: txs, } diff --git a/blockchain/utils/verification.go b/blockchain/utils/verification.go new file mode 100644 index 0000000..bd5f0ba --- /dev/null +++ b/blockchain/utils/verification.go @@ -0,0 +1,27 @@ +package utils + +import ( + "fmt" + + "github.com/Secured-Finance/dione/blockchain/types" + "github.com/wealdtech/go-merkletree" + "github.com/wealdtech/go-merkletree/keccak256" +) + +func VerifyTx(blockHeader *types.BlockHeader, tx *types.Transaction) error { + if tx.MerkleProof == nil { + return fmt.Errorf("block transaction hasn't merkle proof") + } + txProofVerified, err := merkletree.VerifyProofUsing(tx.Hash, false, tx.MerkleProof, [][]byte{blockHeader.Hash}, keccak256.New()) + if err != nil { + return fmt.Errorf("failed to verify tx hash merkle proof: %s", err.Error()) + } + if !txProofVerified { + return fmt.Errorf("transaction doesn't present in block hash merkle tree") + } + if !tx.ValidateHash() { + return fmt.Errorf("transaction hash is invalid") + } + + return nil +} diff --git a/consensus/consensus.go b/consensus/consensus.go index 9139890..861ca22 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -5,6 +5,8 @@ import ( "math/big" "sync" + "github.com/asaskevich/EventBus" + "github.com/Secured-Finance/dione/blockchain" "github.com/drand/drand/client" @@ -37,6 +39,7 @@ const ( type PBFTConsensusManager struct { phony.Inbox + bus EventBus.Bus psb *pubsub.PubSubRouter minApprovals int // FIXME privKey crypto.PrivKey @@ -55,21 +58,29 @@ type State struct { randomness []byte blockHeight uint64 status StateStatus + ready chan bool } -func NewPBFTConsensusManager(psb *pubsub.PubSubRouter, minApprovals int, privKey crypto.PrivKey, ethereumClient *ethclient.EthereumClient, miner *Miner) *PBFTConsensusManager { +func NewPBFTConsensusManager(bus EventBus.Bus, psb *pubsub.PubSubRouter, minApprovals int, privKey crypto.PrivKey, ethereumClient *ethclient.EthereumClient, miner *Miner, bc *blockchain.BlockChain) *PBFTConsensusManager { pcm := &PBFTConsensusManager{} pcm.psb = psb pcm.miner = miner - pcm.validator = NewConsensusValidator(miner) + pcm.validator = NewConsensusValidator(miner, bc) pcm.msgLog = NewConsensusMessageLog() pcm.minApprovals = minApprovals pcm.privKey = privKey pcm.ethereumClient = ethereumClient - pcm.state = &State{} + pcm.state = &State{ + ready: make(chan bool, 1), + status: StateStatusUnknown, + } + pcm.bus = bus pcm.psb.Hook(pubsub.PrePrepareMessageType, pcm.handlePrePrepare, types.PrePrepareMessage{}) pcm.psb.Hook(pubsub.PrepareMessageType, pcm.handlePrepare, types.PrepareMessage{}) pcm.psb.Hook(pubsub.CommitMessageType, pcm.handleCommit, types.CommitMessage{}) + bus.SubscribeOnce("sync:initialSyncCompleted", func() { + pcm.state.ready <- true + }) return pcm } @@ -104,11 +115,13 @@ func (pcm *PBFTConsensusManager) handlePrePrepare(message *pubsub.GenericMessage Block: prePrepare.Block, } + <-pcm.state.ready + if pcm.msgLog.Exists(cmsg) { logrus.Debugf("received existing pre_prepare msg, dropping...") return } - if !pcm.validator.Valid(cmsg) { + if !pcm.validator.Valid(cmsg, map[string]interface{}{"randomness": pcm.state.randomness}) { logrus.Warn("received invalid pre_prepare msg, dropping...") return } @@ -158,7 +171,7 @@ func (pcm *PBFTConsensusManager) handlePrepare(message *pubsub.GenericMessage) { logrus.Debugf("received existing prepare msg, dropping...") return } - if !pcm.validator.Valid(cmsg) { + if !pcm.validator.Valid(cmsg, nil) { logrus.Warn("received invalid prepare msg, dropping...") return } @@ -207,7 +220,7 @@ func (pcm *PBFTConsensusManager) handleCommit(message *pubsub.GenericMessage) { logrus.Debugf("received existing commit msg, dropping...") return } - if !pcm.validator.Valid(cmsg) { + if !pcm.validator.Valid(cmsg, nil) { logrus.Warn("received invalid commit msg, dropping...") return } @@ -235,7 +248,7 @@ func (pcm *PBFTConsensusManager) NewDrandRound(from phony.Actor, res client.Resu return } - minedBlock, err := pcm.miner.MineBlock(res.Randomness(), res.Round(), block.Header) + minedBlock, err := pcm.miner.MineBlock(res.Randomness(), block.Header) if err != nil { logrus.Errorf("Failed to mine the block: %s", err.Error()) return diff --git a/consensus/consensus_validator.go b/consensus/consensus_validator.go index 60aab94..1c43a24 100644 --- a/consensus/consensus_validator.go +++ b/consensus/consensus_validator.go @@ -1,114 +1,180 @@ package consensus import ( + "bytes" + "fmt" + "sync" + + types3 "github.com/Secured-Finance/dione/blockchain/types" + + "github.com/Secured-Finance/dione/blockchain" + "github.com/Secured-Finance/dione/blockchain/utils" types2 "github.com/Secured-Finance/dione/consensus/types" + "github.com/Secured-Finance/dione/consensus/validation" + "github.com/Secured-Finance/dione/types" + "github.com/filecoin-project/go-state-types/crypto" + "github.com/fxamacker/cbor/v2" + "github.com/sirupsen/logrus" + "github.com/wealdtech/go-merkletree" + "github.com/wealdtech/go-merkletree/keccak256" ) type ConsensusValidator struct { - validationFuncMap map[types2.ConsensusMessageType]func(msg types2.ConsensusMessage) bool + validationFuncMap map[types2.ConsensusMessageType]func(msg types2.ConsensusMessage, metadata map[string]interface{}) bool miner *Miner + blockchain *blockchain.BlockChain } -func NewConsensusValidator(miner *Miner) *ConsensusValidator { +func NewConsensusValidator(miner *Miner, bc *blockchain.BlockChain) *ConsensusValidator { cv := &ConsensusValidator{ - miner: miner, + miner: miner, + blockchain: bc, } - cv.validationFuncMap = map[types2.ConsensusMessageType]func(msg types2.ConsensusMessage) bool{ + cv.validationFuncMap = map[types2.ConsensusMessageType]func(msg types2.ConsensusMessage, metadata map[string]interface{}) bool{ // FIXME it all - //types2.ConsensusMessageTypePrePrepare: func(msg types2.PrePrepareMessage) bool { - // // TODO here we need to do validation of block itself - // - // // === verify task signature === - // err := VerifyTaskSignature(msg.Task) - // if err != nil { - // logrus.Errorf("unable to verify signature: %v", err) - // return false - // } - // ///////////////////////////////// - // - // // === verify if request exists in cache === - // var requestEvent *dioneOracle.DioneOracleNewOracleRequest - // err = cv.cache.Get("request_"+msg.Task.RequestID, &requestEvent) - // if err != nil { - // logrus.Errorf("the request doesn't exist in the cache or has been failed to decode: %v", err) - // return false - // } - // - // if requestEvent.OriginChain != msg.Task.OriginChain || - // requestEvent.RequestType != msg.Task.RequestType || - // requestEvent.RequestParams != msg.Task.RequestParams { - // - // logrus.Errorf("the incoming task and cached request requestEvent don't match!") - // return false - // } - // ///////////////////////////////// - // - // // === verify election proof wincount preliminarily === - // if msg.Task.ElectionProof.WinCount < 1 { - // logrus.Error("miner isn't a winner!") - // return false - // } - // ///////////////////////////////// - // - // // === verify miner's eligibility to propose this task === - // err = cv.miner.IsMinerEligibleToProposeBlock(common.HexToAddress(msg.Task.MinerEth)) - // if err != nil { - // logrus.Errorf("miner is not eligible to propose task: %v", err) - // return false - // } - // ///////////////////////////////// - // - // // === verify election proof vrf === - // minerAddressMarshalled, err := msg.Task.Miner.MarshalBinary() - // if err != nil { - // logrus.Errorf("failed to marshal miner address: %v", err) - // return false - // } - // electionProofRandomness, err := DrawRandomness( - // msg.Task.BeaconEntries[1].Data, - // crypto.DomainSeparationTag_ElectionProofProduction, - // msg.Task.DrandRound, - // minerAddressMarshalled, - // ) - // if err != nil { - // logrus.Errorf("failed to draw electionProofRandomness: %v", err) - // return false - // } - // err = VerifyVRF(msg.Task.Miner, electionProofRandomness, msg.Task.ElectionProof.VRFProof) - // if err != nil { - // logrus.Errorf("failed to verify election proof vrf: %v", err) - // } - // ////////////////////////////////////// - // - // // === compute wincount locally and verify values === - // mStake, nStake, err := cv.miner.GetStakeInfo(common.HexToAddress(msg.Task.MinerEth)) - // if err != nil { - // logrus.Errorf("failed to get miner stake: %v", err) - // return false - // } - // actualWinCount := msg.Task.ElectionProof.ComputeWinCount(*mStake, *nStake) - // if msg.Task.ElectionProof.WinCount != actualWinCount { - // logrus.Errorf("locally computed wincount isn't matching received value!", err) - // return false - // } - // ////////////////////////////////////// - // - // // === validate payload by specific-chain checks === - // if validationFunc := validation.GetValidationMethod(msg.Task.OriginChain, msg.Task.RequestType); validationFunc != nil { - // err := validationFunc(msg.Task.Payload) - // if err != nil { - // logrus.Errorf("payload validation has failed: %v", err) - // return false - // } - // } else { - // logrus.Debugf("Origin chain [%v]/request type[%v] doesn't have any payload validation!", msg.Task.OriginChain, msg.Task.RequestType) - // } - // ///////////////////////////////// - // - // return true - //}, - types2.ConsensusMessageTypePrepare: func(msg types2.ConsensusMessage) bool { + types2.ConsensusMessageTypePrePrepare: func(msg types2.ConsensusMessage, metadata map[string]interface{}) bool { + // === verify block signature === + pubkey, err := msg.Block.Header.Proposer.ExtractPublicKey() + if err != nil { + logrus.Errorf("unable to extract public key from block proposer's peer id: %s", err.Error()) + return false + } + + ok, err := pubkey.Verify(msg.Block.Header.Hash, msg.Block.Header.Signature) + if err != nil { + logrus.Errorf("failed to verify block signature: %s", err.Error()) + return false + } + if !ok { + logrus.Errorf("signature of block %x is invalid", msg.Block.Header.Hash) + return false + } + ///////////////////////////////// + + // === check last hash merkle proof === + latestHeight, err := cv.blockchain.GetLatestBlockHeight() + if err != nil { + logrus.Error(err) + return false + } + previousBlockHeader, err := cv.blockchain.FetchBlockHeaderByHeight(latestHeight) + if err != nil { + logrus.Error(err) + return false + } + if bytes.Compare(msg.Block.Header.LastHash, previousBlockHeader.Hash) != 0 { + logrus.Error("block header has invalid last block hash") + return false + } + + verified, err := merkletree.VerifyProofUsing(previousBlockHeader.Hash, false, msg.Block.Header.LastHashProof, [][]byte{msg.Block.Header.Hash}, keccak256.New()) + if err != nil { + logrus.Error("failed to verify last block hash merkle proof: %s", err.Error()) + return false + } + if !verified { + logrus.Error("merkle hash of current block doesn't contain hash of previous block: %s", err.Error()) + return false + } + ///////////////////////////////// + + // === verify election proof wincount preliminarily === + if msg.Block.Header.ElectionProof.WinCount < 1 { + logrus.Error("miner isn't a winner!") + return false + } + ///////////////////////////////// + + // === verify miner's eligibility to propose this task === + err = cv.miner.IsMinerEligibleToProposeBlock(msg.Block.Header.ProposerEth) + if err != nil { + logrus.Errorf("miner is not eligible to propose block: %v", err) + return false + } + ///////////////////////////////// + + // === verify election proof vrf === + proposerBuf, err := msg.Block.Header.Proposer.MarshalBinary() + if err != nil { + logrus.Error(err) + return false + } + + eproofRandomness, err := DrawRandomness( + metadata["randomness"].([]byte), + crypto.DomainSeparationTag_ElectionProofProduction, + msg.Block.Header.Height, + proposerBuf, + ) + if err != nil { + logrus.Errorf("failed to draw ElectionProof randomness: %s", err.Error()) + return false + } + err = VerifyVRF(msg.Block.Header.Proposer, eproofRandomness, msg.Block.Header.ElectionProof.VRFProof) + if err != nil { + logrus.Errorf("failed to verify election proof vrf: %v", err) + return false + } + ////////////////////////////////////// + + // === compute wincount locally and verify values === + mStake, nStake, err := cv.miner.GetStakeInfo(msg.Block.Header.ProposerEth) + if err != nil { + logrus.Errorf("failed to get miner stake: %v", err) + return false + } + actualWinCount := msg.Block.Header.ElectionProof.ComputeWinCount(mStake, nStake) + if msg.Block.Header.ElectionProof.WinCount != actualWinCount { + logrus.Errorf("locally computed wincount of block %x isn't matching received value!", msg.Block.Header.Hash) + return false + } + ////////////////////////////////////// + + // === validate block transactions === + result := make(chan error) + var wg sync.WaitGroup + for _, v := range msg.Block.Data { + wg.Add(1) + go func(v *types3.Transaction, c chan error) { + if err := utils.VerifyTx(msg.Block.Header, v); err != nil { + c <- fmt.Errorf("failed to verify tx: %w", err) + return + } + + var task types.DioneTask + err = cbor.Unmarshal(v.Data, &task) + if err != nil { + c <- fmt.Errorf("failed to unmarshal transaction payload: %w", err) + return + } + + if validationFunc := validation.GetValidationMethod(task.OriginChain, task.RequestType); validationFunc != nil { + if err := validationFunc(&task); err != nil { + c <- fmt.Errorf("payload validation has been failed: %w", err) + return + } + } else { + logrus.Debugf("Origin chain [%v]/request type[%v] doesn't have any payload validation!", task.OriginChain, task.RequestType) + } + wg.Done() + }(v, result) + } + go func() { + wg.Wait() + close(result) + }() + for err := range result { + if err != nil { + logrus.Error(err) + return false + } + } + ///////////////////////////////// + + return true + }, + types2.ConsensusMessageTypePrepare: func(msg types2.ConsensusMessage, metadata map[string]interface{}) bool { pubKey, err := msg.From.ExtractPublicKey() if err != nil { // TODO logging @@ -121,7 +187,7 @@ func NewConsensusValidator(miner *Miner) *ConsensusValidator { } return ok }, - types2.ConsensusMessageTypeCommit: func(msg types2.ConsensusMessage) bool { + types2.ConsensusMessageTypeCommit: func(msg types2.ConsensusMessage, metadata map[string]interface{}) bool { pubKey, err := msg.From.ExtractPublicKey() if err != nil { // TODO logging @@ -139,6 +205,6 @@ func NewConsensusValidator(miner *Miner) *ConsensusValidator { return cv } -func (cv *ConsensusValidator) Valid(msg types2.ConsensusMessage) bool { - return cv.validationFuncMap[msg.Type](msg) +func (cv *ConsensusValidator) Valid(msg types2.ConsensusMessage, metadata map[string]interface{}) bool { + return cv.validationFuncMap[msg.Type](msg, metadata) } diff --git a/consensus/miner.go b/consensus/miner.go index b007e81..42a7317 100644 --- a/consensus/miner.go +++ b/consensus/miner.go @@ -85,15 +85,15 @@ func (m *Miner) GetStakeInfo(miner common.Address) (*big.Int, *big.Int, error) { return mStake, nStake, nil } -func (m *Miner) MineBlock(randomness []byte, drandRound uint64, lastBlockHeader *types2.BlockHeader) (*types2.Block, error) { - logrus.Debug("attempting to mine the block at epoch: ", drandRound) +func (m *Miner) MineBlock(randomness []byte, lastBlockHeader *types2.BlockHeader) (*types2.Block, error) { + logrus.Debug("attempting to mine the block at epoch: ", lastBlockHeader.Height+1) if err := m.UpdateCurrentStakeInfo(); err != nil { return nil, fmt.Errorf("failed to update miner stake: %w", err) } winner, err := IsRoundWinner( - drandRound, + lastBlockHeader.Height+1, m.address, randomness, m.minerStake, @@ -113,7 +113,7 @@ func (m *Miner) MineBlock(randomness []byte, drandRound uint64, lastBlockHeader return nil, fmt.Errorf("there is no txes for processing") // skip new consensus round because there is no transaction for processing } - newBlock, err := types2.CreateBlock(lastBlockHeader, txs, m.ethAddress, m.privateKey) + newBlock, err := types2.CreateBlock(lastBlockHeader, txs, m.ethAddress, m.privateKey, winner) if err != nil { return nil, fmt.Errorf("failed to create new block: %w", err) } diff --git a/consensus/utils.go b/consensus/utils.go index 76463ed..490e992 100644 --- a/consensus/utils.go +++ b/consensus/utils.go @@ -9,7 +9,6 @@ import ( types2 "github.com/Secured-Finance/dione/consensus/types" - "github.com/Secured-Finance/dione/sigs" "github.com/minio/blake2b-simd" "github.com/libp2p/go-libp2p-core/peer" @@ -27,9 +26,16 @@ func ComputeVRF(privKey crypto.PrivKey, sigInput []byte) ([]byte, error) { } func VerifyVRF(worker peer.ID, vrfBase, vrfproof []byte) error { - err := sigs.Verify(&types.Signature{Type: types.SigTypeEd25519, Data: vrfproof}, []byte(worker), vrfBase) + pk, err := worker.ExtractPublicKey() if err != nil { - return xerrors.Errorf("vrf was invalid: %w", err) + return err + } + ok, err := pk.Verify(vrfproof, vrfBase) + if err != nil { + return err + } + if !ok { + return fmt.Errorf("vrf was invalid") } return nil @@ -40,17 +46,17 @@ func IsRoundWinner(round uint64, buf, err := worker.MarshalBinary() if err != nil { - return nil, xerrors.Errorf("failed to marshal address: %w", err) + return nil, fmt.Errorf("failed to marshal address: %w", err) } electionRand, err := DrawRandomness(randomness, crypto2.DomainSeparationTag_ElectionProofProduction, round, buf) if err != nil { - return nil, xerrors.Errorf("failed to draw randomness: %w", err) + return nil, fmt.Errorf("failed to draw randomness: %w", err) } vrfout, err := ComputeVRF(privKey, electionRand) if err != nil { - return nil, xerrors.Errorf("failed to compute VRF: %w", err) + return nil, fmt.Errorf("failed to compute VRF: %w", err) } ep := &types.ElectionProof{VRFProof: vrfout} diff --git a/consensus/validation/filecoin/filecoin.go b/consensus/validation/filecoin/filecoin.go index 076f43d..fb5c867 100644 --- a/consensus/validation/filecoin/filecoin.go +++ b/consensus/validation/filecoin/filecoin.go @@ -1,39 +1,35 @@ package filecoin import ( - "bytes" + "github.com/Secured-Finance/dione/types" "github.com/Secured-Finance/dione/consensus/validation" rtypes "github.com/Secured-Finance/dione/rpc/types" - - ftypes "github.com/Secured-Finance/dione/rpc/filecoin/types" - "github.com/Secured-Finance/dione/sigs" - "github.com/sirupsen/logrus" - "golang.org/x/xerrors" ) -func ValidateGetTransaction(payload []byte) error { - var msg ftypes.SignedMessage - if err := msg.UnmarshalCBOR(bytes.NewReader(payload)); err != nil { - if err := msg.Message.UnmarshalCBOR(bytes.NewReader(payload)); err != nil { - return xerrors.Errorf("cannot unmarshal payload: %s", err.Error()) - } - } - - if msg.Type == ftypes.MessageTypeSecp256k1 { - if err := sigs.Verify(&msg.Signature, msg.Message.From.Bytes(), msg.Message.Cid().Bytes()); err != nil { - logrus.Errorf("Couldn't verify transaction %v", err) - return xerrors.Errorf("Couldn't verify transaction: %v") - } - return nil - } else { - // TODO: BLS Signature verification - return nil - } +func ValidateGetTransaction(task *types.DioneTask) error { + //var msg ftypes.SignedMessage + //if err := msg.UnmarshalCBOR(bytes.NewReader(payload)); err != nil { + // if err := msg.Message.UnmarshalCBOR(bytes.NewReader(payload)); err != nil { + // return xerrors.Errorf("cannot unmarshal payload: %s", err.Error()) + // } + //} + // + //if msg.Type == ftypes.MessageTypeSecp256k1 { + // if err := sigs.Verify(&msg.Signature, msg.Message.From.Bytes(), msg.Message.Cid().Bytes()); err != nil { + // logrus.Errorf("Couldn't verify transaction %v", err) + // return xerrors.Errorf("Couldn't verify transaction: %v") + // } + // return nil + //} else { + // // TODO: BLS Signature verification + // return nil + //} + return validation.VerifyExactMatching(task) } func init() { - validation.RegisterValidation(rtypes.RPCTypeFilecoin, map[string]func([]byte) error{ + validation.RegisterValidation(rtypes.RPCTypeFilecoin, map[string]func(*types.DioneTask) error{ "getTransaction": ValidateGetTransaction, }) } diff --git a/consensus/validation/payload_validator.go b/consensus/validation/payload_validator.go index 0eed568..145d35c 100644 --- a/consensus/validation/payload_validator.go +++ b/consensus/validation/payload_validator.go @@ -1,12 +1,14 @@ package validation -var validations = map[uint8]map[string]func([]byte) error{} // rpcType -> {rpcMethodName -> actual func var} +import "github.com/Secured-Finance/dione/types" -func RegisterValidation(typ uint8, methods map[string]func([]byte) error) { +var validations = map[uint8]map[string]func(*types.DioneTask) error{} // rpcType -> {rpcMethodName -> actual func var} + +func RegisterValidation(typ uint8, methods map[string]func(*types.DioneTask) error) { validations[typ] = methods } -func GetValidationMethod(typ uint8, methodName string) func([]byte) error { +func GetValidationMethod(typ uint8, methodName string) func(*types.DioneTask) error { rpcMethods, ok := validations[typ] if !ok { return nil diff --git a/consensus/validation/solana/solana.go b/consensus/validation/solana/solana.go new file mode 100644 index 0000000..96592a0 --- /dev/null +++ b/consensus/validation/solana/solana.go @@ -0,0 +1,17 @@ +package solana + +import ( + "github.com/Secured-Finance/dione/consensus/validation" + rtypes "github.com/Secured-Finance/dione/rpc/types" + "github.com/Secured-Finance/dione/types" +) + +func ValidateGetTransaction(task *types.DioneTask) error { + return validation.VerifyExactMatching(task) +} + +func init() { + validation.RegisterValidation(rtypes.RPCTypeSolana, map[string]func(*types.DioneTask) error{ + "getTransaction": ValidateGetTransaction, + }) +} diff --git a/consensus/validation/utils.go b/consensus/validation/utils.go new file mode 100644 index 0000000..44181fc --- /dev/null +++ b/consensus/validation/utils.go @@ -0,0 +1,24 @@ +package validation + +import ( + "bytes" + "fmt" + + "github.com/Secured-Finance/dione/rpc" + "github.com/Secured-Finance/dione/types" +) + +func VerifyExactMatching(task *types.DioneTask) error { + rpcMethod := rpc.GetRPCMethod(task.OriginChain, task.RequestType) + if rpcMethod == nil { + return fmt.Errorf("invalid RPC method") + } + res, err := rpcMethod(task.RequestParams) + if err != nil { + return fmt.Errorf("failed to invoke RPC method: %w", err) + } + if bytes.Compare(res, task.Payload) != 0 { + return fmt.Errorf("actual rpc response doesn't match with task's payload") + } + return nil +} diff --git a/go.mod b/go.mod index 567e402..2a52f26 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/Secured-Finance/go-libp2p-pex v1.1.0 github.com/Secured-Finance/golang-set v1.8.0 github.com/aristanetworks/goarista v0.0.0-20210308203447-b196d8410f1d // indirect + github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef // indirect github.com/asaskevich/govalidator v0.0.0-20200907205600-7a23bdc65eef // indirect github.com/cespare/cp v1.1.1 // indirect github.com/deckarep/golang-set v1.7.1 // indirect diff --git a/go.sum b/go.sum index 3481542..3907208 100644 --- a/go.sum +++ b/go.sum @@ -109,6 +109,8 @@ github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmV github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= github.com/aryann/difflib v0.0.0-20170710044230-e206f873d14a/go.mod h1:DAHtR1m6lCRdSC2Tm3DSWRPvIPr6xNKyeHdqDQSQT+A= +github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef h1:2JGTg6JapxP9/R33ZaagQtAM4EkkSYnIAlOG5EI8gkM= +github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef/go.mod h1:JS7hed4L1fj0hXcyEejnW57/7LCetXggd+vwrRnYeII= github.com/asaskevich/govalidator v0.0.0-20200907205600-7a23bdc65eef h1:46PFijGLmAjMPwCCCo7Jf0W6f9slllCkkv7vyc1yOSg= github.com/asaskevich/govalidator v0.0.0-20200907205600-7a23bdc65eef/go.mod h1:WaHUgvxTVq04UNunO+XhnAqY/wQc+bxr74GqbsZ/Jqw= github.com/aws/aws-lambda-go v1.13.3/go.mod h1:4UKl9IzQMoD+QF79YdCuzCwp8VbmG4VAQwij/eHl5CU= diff --git a/node/node.go b/node/node.go index 3ab3af8..a3bae20 100644 --- a/node/node.go +++ b/node/node.go @@ -9,6 +9,8 @@ import ( "os" "time" + "github.com/asaskevich/EventBus" + "github.com/fxamacker/cbor/v2" "github.com/Secured-Finance/dione/types" @@ -67,6 +69,7 @@ type Node struct { SyncManager sync.SyncManager NetworkService *NetworkService NetworkRPCHost *gorpc.Server + Bus EventBus.Bus //Cache cache.Cache //Wallet *wallet.LocalWallet } @@ -76,6 +79,9 @@ func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTim Config: config, } + bus := EventBus.New() + n.Bus = bus + // initialize libp2p host lhost, err := provideLibp2pHost(n.Config, prvKey) if err != nil { @@ -126,11 +132,11 @@ func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTim // == initialize blockchain modules // initialize blockpool database - bp, err := provideBlockChain(n.Config) + bc, err := provideBlockChain(n.Config) if err != nil { logrus.Fatalf("Failed to initialize blockpool: %s", err.Error()) } - n.BlockPool = bp + n.BlockPool = bc logrus.Info("Block pool database has been successfully initialized!") // initialize mempool @@ -141,7 +147,7 @@ func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTim n.MemPool = mp logrus.Info("Mempool has been successfully initialized!") - ns := provideNetworkService(bp) + ns := provideNetworkService(bc) n.NetworkService = ns rpcHost := provideNetworkRPCHost(lhost) err = rpcHost.Register(ns) @@ -154,7 +160,7 @@ func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTim r := provideP2PRPCClient(lhost) // initialize sync manager - sm, err := provideSyncManager(bp, mp, r, baddrs[0], psb) // FIXME here we just pick up first bootstrap in list + sm, err := provideSyncManager(bus, bc, mp, r, baddrs[0], psb) // FIXME here we just pick up first bootstrap in list if err != nil { logrus.Fatal(err) } @@ -167,7 +173,7 @@ func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTim logrus.Info("Mining subsystem has been initialized!") // initialize consensus subsystem - consensusManager := provideConsensusManager(psb, miner, ethClient, prvKey, n.Config.ConsensusMinApprovals) + consensusManager := provideConsensusManager(bus, psb, miner, bc, ethClient, prvKey, n.Config.ConsensusMinApprovals) n.ConsensusManager = consensusManager logrus.Info("Consensus subsystem has been initialized!") diff --git a/node/node_dep_providers.go b/node/node_dep_providers.go index 212658d..3ec1836 100644 --- a/node/node_dep_providers.go +++ b/node/node_dep_providers.go @@ -5,6 +5,8 @@ import ( "fmt" "time" + "github.com/asaskevich/EventBus" + "github.com/Secured-Finance/dione/blockchain" drand2 "github.com/Secured-Finance/dione/beacon/drand" @@ -103,8 +105,8 @@ func providePubsubRouter(lhost host.Host, config *config.Config) *pubsub.PubSubR return pubsub.NewPubSubRouter(lhost, config.PubSub.ServiceTopicName, config.IsBootstrap) } -func provideConsensusManager(psb *pubsub.PubSubRouter, miner *consensus.Miner, ethClient *ethclient.EthereumClient, privateKey crypto.PrivKey, minApprovals int) *consensus.PBFTConsensusManager { - return consensus.NewPBFTConsensusManager(psb, minApprovals, privateKey, ethClient, miner) +func provideConsensusManager(bus EventBus.Bus, psb *pubsub.PubSubRouter, miner *consensus.Miner, bc *blockchain.BlockChain, ethClient *ethclient.EthereumClient, privateKey crypto.PrivKey, minApprovals int) *consensus.PBFTConsensusManager { + return consensus.NewPBFTConsensusManager(bus, psb, minApprovals, privateKey, ethClient, miner, bc) } func provideLibp2pHost(config *config.Config, privateKey crypto.PrivKey) (host.Host, error) { @@ -162,12 +164,12 @@ func provideMemPool() (*pool.Mempool, error) { return pool.NewMempool() } -func provideSyncManager(bp *blockchain.BlockChain, mp *pool.Mempool, r *gorpc.Client, bootstrap multiaddr.Multiaddr, psb *pubsub.PubSubRouter) (sync.SyncManager, error) { +func provideSyncManager(bus EventBus.Bus, bp *blockchain.BlockChain, mp *pool.Mempool, r *gorpc.Client, bootstrap multiaddr.Multiaddr, psb *pubsub.PubSubRouter) (sync.SyncManager, error) { addr, err := peer.AddrInfoFromP2pAddr(bootstrap) if err != nil { return nil, err } - return sync.NewSyncManager(bp, mp, r, addr.ID, psb), nil + return sync.NewSyncManager(bus, bp, mp, r, addr.ID, psb), nil } func provideP2PRPCClient(h host.Host) *gorpc.Client {