diff --git a/beacon/beacon.go b/beacon/beacon.go index e1a1503..9773573 100644 --- a/beacon/beacon.go +++ b/beacon/beacon.go @@ -34,8 +34,9 @@ type BeaconNetwork struct { // valid for a specific chain epoch. Also to verify beacon entries that have // been posted on chain. type BeaconAPI interface { - Entry(context.Context, uint64) <-chan BeaconResult + Entry(context.Context, uint64) (types.BeaconEntry, error) VerifyEntry(types.BeaconEntry, types.BeaconEntry) error + NewEntries() <-chan types.BeaconEntry LatestBeaconRound() uint64 } diff --git a/beacon/drand/drand.go b/beacon/drand/drand.go index 1c4b493..a51aff5 100644 --- a/beacon/drand/drand.go +++ b/beacon/drand/drand.go @@ -8,8 +8,6 @@ import ( "github.com/Arceliar/phony" - "github.com/Secured-Finance/dione/consensus" - "github.com/Secured-Finance/dione/beacon" "github.com/drand/drand/chain" "github.com/drand/drand/client" @@ -38,13 +36,13 @@ type DrandBeacon struct { DrandClient client.Client PublicKey kyber.Point drandResultChannel <-chan client.Result + beaconEntryChannel chan types.BeaconEntry cacheLock sync.Mutex localCache map[uint64]types.BeaconEntry latestDrandRound uint64 - consensusManager *consensus.PBFTConsensusManager } -func NewDrandBeacon(ps *pubsub.PubSub, pcm *consensus.PBFTConsensusManager) (*DrandBeacon, error) { +func NewDrandBeacon(ps *pubsub.PubSub) (*DrandBeacon, error) { cfg := config.NewDrandConfig() drandChain, err := chain.InfoFromJSON(bytes.NewReader([]byte(cfg.ChainInfo))) @@ -83,14 +81,14 @@ func NewDrandBeacon(ps *pubsub.PubSub, pcm *consensus.PBFTConsensusManager) (*Dr } db := &DrandBeacon{ - DrandClient: drandClient, - localCache: make(map[uint64]types.BeaconEntry), - consensusManager: pcm, + DrandClient: drandClient, + localCache: make(map[uint64]types.BeaconEntry), } db.PublicKey = drandChain.PublicKey db.drandResultChannel = db.DrandClient.Watch(context.TODO()) + db.beaconEntryChannel = make(chan types.BeaconEntry) err = db.getLatestDrandResult() if err != nil { return nil, err @@ -106,7 +104,7 @@ func (db *DrandBeacon) getLatestDrandResult() error { log.Errorf("failed to get latest drand round: %v", err) return err } - db.cacheValue(newBeaconResultFromDrandResult(latestDround)) + db.cacheValue(newBeaconEntryFromDrandResult(latestDround)) db.updateLatestDrandRound(latestDround.Round()) return nil } @@ -120,43 +118,30 @@ func (db *DrandBeacon) loop(ctx context.Context) { } case res := <-db.drandResultChannel: { - db.cacheValue(newBeaconResultFromDrandResult(res)) + db.cacheValue(newBeaconEntryFromDrandResult(res)) db.updateLatestDrandRound(res.Round()) - db.consensusManager.NewDrandRound(db, res) + db.newEntry(res) } } } } -func (db *DrandBeacon) Entry(ctx context.Context, round uint64) <-chan beacon.BeaconResult { - out := make(chan beacon.BeaconResult, 1) +func (db *DrandBeacon) Entry(ctx context.Context, round uint64) (types.BeaconEntry, error) { if round != 0 { be := db.getCachedValue(round) if be != nil { - out <- beacon.BeaconResult{Entry: *be} - close(out) - return out + return *be, nil } } - go func() { - start := lib.Clock.Now() - log.Infof("start fetching randomness: round %v", round) - resp, err := db.DrandClient.Get(ctx, round) - - var br beacon.BeaconResult - if err != nil { - br.Err = fmt.Errorf("drand failed Get request: %w", err) - } else { - br.Entry.Round = resp.Round() - br.Entry.Data = resp.Signature() - } - log.Infof("done fetching randomness: round %v, took %v", round, lib.Clock.Since(start)) - out <- br - close(out) - }() - - return out + start := lib.Clock.Now() + log.Infof("start fetching randomness: round %v", round) + resp, err := db.DrandClient.Get(ctx, round) + if err != nil { + return types.BeaconEntry{}, fmt.Errorf("drand failed Get request: %w", err) + } + log.Infof("done fetching randomness: round %v, took %v", round, lib.Clock.Since(start)) + return newBeaconEntryFromDrandResult(resp), nil } func (db *DrandBeacon) cacheValue(res types.BeaconEntry) { db.cacheLock.Lock() @@ -201,7 +186,17 @@ func (db *DrandBeacon) LatestBeaconRound() uint64 { return db.latestDrandRound } -func newBeaconResultFromDrandResult(res client.Result) types.BeaconEntry { +func (db *DrandBeacon) newEntry(res client.Result) { + db.Act(nil, func() { + db.beaconEntryChannel <- types.NewBeaconEntry(res.Round(), res.Randomness(), map[string]interface{}{"signature": res.Signature()}) + }) +} + +func (db *DrandBeacon) NewEntries() <-chan types.BeaconEntry { + return db.beaconEntryChannel +} + +func newBeaconEntryFromDrandResult(res client.Result) types.BeaconEntry { return types.NewBeaconEntry(res.Round(), res.Randomness(), map[string]interface{}{"signature": res.Signature()}) } diff --git a/blockchain/blockchain.go b/blockchain/blockchain.go index 092b4ff..a46c39b 100644 --- a/blockchain/blockchain.go +++ b/blockchain/blockchain.go @@ -180,12 +180,13 @@ func (bp *BlockChain) FetchBlockData(blockHash []byte) ([]*types2.Transaction, e } return err } - err = cbor.Unmarshal(blockData, data) + err = cbor.Unmarshal(blockData, &data) return err }) if err != nil { return nil, err } + return data, nil } @@ -227,7 +228,7 @@ func (bp *BlockChain) FetchBlock(blockHash []byte) (*types2.Block, error) { } func (bp *BlockChain) FetchBlockByHeight(height uint64) (*types2.Block, error) { - var heightBytes []byte + var heightBytes = make([]byte, 8) binary.LittleEndian.PutUint64(heightBytes, height) blockHash, err := bp.heightIndex.GetBytes(heightBytes) if err != nil { @@ -243,7 +244,7 @@ func (bp *BlockChain) FetchBlockByHeight(height uint64) (*types2.Block, error) { } func (bp *BlockChain) FetchBlockHeaderByHeight(height uint64) (*types2.BlockHeader, error) { - var heightBytes []byte + var heightBytes = make([]byte, 8) binary.LittleEndian.PutUint64(heightBytes, height) blockHash, err := bp.heightIndex.GetBytes(heightBytes) if err != nil { diff --git a/blockchain/pool/blockpool.go b/blockchain/pool/blockpool.go index 48c3296..ae49e52 100644 --- a/blockchain/pool/blockpool.go +++ b/blockchain/pool/blockpool.go @@ -1,8 +1,11 @@ package pool import ( + "bytes" "encoding/hex" + "github.com/sirupsen/logrus" + "github.com/Secured-Finance/dione/blockchain/types" "github.com/Secured-Finance/dione/cache" ) @@ -29,8 +32,9 @@ func (bp *BlockPool) AddBlock(block *types.Block) error { } func (bp *BlockPool) GetBlock(blockhash []byte) (*types.Block, error) { - var block *types.Block - return block, bp.knownBlocks.Get(hex.EncodeToString(blockhash), &block) + var block types.Block + err := bp.knownBlocks.Get(hex.EncodeToString(blockhash), &block) + return &block, err } // PruneBlocks cleans known blocks list. It is called when new consensus round starts. @@ -53,13 +57,27 @@ func (bp *BlockPool) GetAllAcceptedBlocks() []*types.Block { } // PruneAcceptedBlocks cleans accepted blocks list. It is called when new consensus round starts. -func (bp *BlockPool) PruneAcceptedBlocks() { +func (bp *BlockPool) PruneAcceptedBlocks(committedBlock *types.Block) { for k, v := range bp.acceptedBlocks.Items() { block := v.(*types.Block) for _, v := range block.Data { - v.MerkleProof = nil - bp.mempool.StoreTx(v) // return transactions back to mempool + if !containsTx(committedBlock.Data, v) { + v.MerkleProof = nil + err := bp.mempool.StoreTx(v) // return transactions back to mempool + if err != nil { + logrus.Error(err) + } + } } bp.acceptedBlocks.Delete(k) } } + +func containsTx(s []*types.Transaction, e *types.Transaction) bool { + for _, a := range s { + if bytes.Equal(a.Hash, e.Hash) { + return true + } + } + return false +} diff --git a/blockchain/pool/mempool.go b/blockchain/pool/mempool.go index 22fec10..c94a99c 100644 --- a/blockchain/pool/mempool.go +++ b/blockchain/pool/mempool.go @@ -6,6 +6,8 @@ import ( "sort" "time" + "github.com/sirupsen/logrus" + types2 "github.com/Secured-Finance/dione/blockchain/types" "github.com/Secured-Finance/dione/consensus/policy" @@ -37,9 +39,16 @@ func NewMempool() (*Mempool, error) { func (mp *Mempool) StoreTx(tx *types2.Transaction) error { hashStr := hex.EncodeToString(tx.Hash) err := mp.cache.StoreWithTTL(DefaultTxPrefix+hashStr, tx, DefaultTxTTL) + logrus.Infof("Submitted new transaction in mempool with hash %x", tx.Hash) return err } +func (mp *Mempool) DeleteTx(txHash []byte) { + hashStr := hex.EncodeToString(txHash) + mp.cache.Delete(DefaultTxPrefix + hashStr) + logrus.Debugf("Deleted transaction from mempool %x", txHash) +} + func (mp *Mempool) GetTransactionsForNewBlock() []*types2.Transaction { var txForBlock []*types2.Transaction allTxs := mp.GetAllTransactions() @@ -63,8 +72,8 @@ func (mp *Mempool) GetAllTransactions() []*types2.Transaction { var allTxs []*types2.Transaction for _, v := range mp.cache.Items() { - tx := v.(types2.Transaction) - allTxs = append(allTxs, &tx) + tx := v.(*types2.Transaction) + allTxs = append(allTxs, tx) } return allTxs } diff --git a/blockchain/sync/sync_mgr.go b/blockchain/sync/sync_mgr.go index 88d07a5..70ce135 100644 --- a/blockchain/sync/sync_mgr.go +++ b/blockchain/sync/sync_mgr.go @@ -8,6 +8,8 @@ import ( "strings" "sync" + "github.com/fxamacker/cbor/v2" + "github.com/asaskevich/EventBus" "github.com/Secured-Finance/dione/blockchain/utils" @@ -63,7 +65,7 @@ func NewSyncManager(bus EventBus.Bus, bp *blockchain.BlockChain, mp *pool.Mempoo psb: psb, } - psb.Hook(pubsub.NewTxMessageType, sm.onNewTransaction, types2.Transaction{}) + psb.Hook(pubsub.NewTxMessageType, sm.onNewTransaction) go func() { if err := sm.initialSync(); err != nil { @@ -236,10 +238,11 @@ func (sm *syncManager) processReceivedBlock(block types2.Block) error { return nil } -func (sm *syncManager) onNewTransaction(message *pubsub.GenericMessage) { - tx, ok := message.Payload.(types2.Transaction) - if !ok { - logrus.Warn("failed to convert payload to Transaction") +func (sm *syncManager) onNewTransaction(message *pubsub.PubSubMessage) { + var tx types2.Transaction + err := cbor.Unmarshal(message.Payload, &tx) + if err != nil { + logrus.Errorf("failed to convert payload to transaction: %s", err.Error()) return } @@ -248,7 +251,7 @@ func (sm *syncManager) onNewTransaction(message *pubsub.GenericMessage) { return } // TODO add more checks on tx - err := sm.mempool.StoreTx(&tx) + err = sm.mempool.StoreTx(&tx) if err != nil { logrus.Warnf("failed to store incoming transaction in mempool: %s", err.Error()) } diff --git a/blockchain/types/block.go b/blockchain/types/block.go index ff0877e..262599d 100644 --- a/blockchain/types/block.go +++ b/blockchain/types/block.go @@ -26,7 +26,7 @@ type BlockHeader struct { Hash []byte LastHash []byte LastHashProof *merkletree.Proof - Proposer peer.ID + Proposer *peer.ID ProposerEth common.Address Signature []byte BeaconEntry types.BeaconEntry @@ -78,11 +78,19 @@ func CreateBlock(lastBlockHeader *BlockHeader, txs []*Transaction, minerEth comm return nil, err } + for _, tx := range txs { + mp, err := tree.GenerateProof(tx.Hash, 0) + if err != nil { + return nil, err + } + tx.MerkleProof = mp + } + block := &Block{ Header: &BlockHeader{ Timestamp: timestamp, Height: lastBlockHeader.Height + 1, - Proposer: proposer, + Proposer: &proposer, ProposerEth: minerEth, Signature: s, Hash: blockHash, diff --git a/blockchain/types/transaction.go b/blockchain/types/transaction.go index f892c66..a3b8ebb 100644 --- a/blockchain/types/transaction.go +++ b/blockchain/types/transaction.go @@ -30,6 +30,7 @@ func CreateTransaction(data []byte) *Transaction { } func (tx *Transaction) ValidateHash() bool { - h := crypto.Keccak256([]byte(fmt.Sprintf("%d_%s", tx.Timestamp.Unix(), tx.Hash))) + encodedData := hex.EncodeToString(tx.Data) + h := crypto.Keccak256([]byte(fmt.Sprintf("%d_%s", tx.Timestamp.Unix(), encodedData))) return bytes.Equal(h, tx.Hash) } diff --git a/blockchain/utils/verification.go b/blockchain/utils/verification.go index bd5f0ba..964c32d 100644 --- a/blockchain/utils/verification.go +++ b/blockchain/utils/verification.go @@ -10,7 +10,7 @@ import ( func VerifyTx(blockHeader *types.BlockHeader, tx *types.Transaction) error { if tx.MerkleProof == nil { - return fmt.Errorf("block transaction hasn't merkle proof") + return fmt.Errorf("block transaction doesn't have merkle proof") } txProofVerified, err := merkletree.VerifyProofUsing(tx.Hash, false, tx.MerkleProof, [][]byte{blockHeader.Hash}, keccak256.New()) if err != nil { diff --git a/cache/inmemory_cache.go b/cache/inmemory_cache.go index fd4333f..902180a 100644 --- a/cache/inmemory_cache.go +++ b/cache/inmemory_cache.go @@ -1,6 +1,8 @@ package cache import ( + "fmt" + "reflect" "time" "github.com/patrickmn/go-cache" @@ -37,7 +39,14 @@ func (imc *InMemoryCache) Get(key string, value interface{}) error { if !exists { return ErrNotFound } - value = v + reflectedValue := reflect.ValueOf(value) + if reflectedValue.Kind() != reflect.Ptr { + return fmt.Errorf("value isn't a pointer") + } + if reflectedValue.IsNil() { + reflectedValue.Set(reflect.New(reflectedValue.Type().Elem())) + } + reflectedValue.Elem().Set(reflect.ValueOf(v).Elem()) return nil } diff --git a/consensus/consensus.go b/consensus/consensus.go index e7d6af8..86cbb02 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -5,6 +5,8 @@ import ( "math/big" "sync" + "github.com/Secured-Finance/dione/beacon" + "github.com/fxamacker/cbor/v2" "github.com/Secured-Finance/dione/cache" @@ -13,8 +15,6 @@ import ( "github.com/Secured-Finance/dione/blockchain" - "github.com/drand/drand/client" - "github.com/Arceliar/phony" types3 "github.com/Secured-Finance/dione/blockchain/types" @@ -57,7 +57,8 @@ type PBFTConsensusManager struct { ethereumClient *ethclient.EthereumClient miner *Miner blockPool *pool.BlockPool - blockchain blockchain.BlockChain + mempool *pool.Mempool + blockchain *blockchain.BlockChain state *State } @@ -67,7 +68,7 @@ type State struct { randomness []byte blockHeight uint64 status StateStatus - ready chan bool + ready bool } func NewPBFTConsensusManager( @@ -79,33 +80,47 @@ func NewPBFTConsensusManager( miner *Miner, bc *blockchain.BlockChain, bp *pool.BlockPool, + b beacon.BeaconNetwork, + mempool *pool.Mempool, ) *PBFTConsensusManager { pcm := &PBFTConsensusManager{} pcm.psb = psb pcm.miner = miner - pcm.validator = NewConsensusValidator(miner, bc) + pcm.validator = NewConsensusValidator(miner, bc, b) pcm.msgLog = NewConsensusMessageLog() pcm.minApprovals = minApprovals pcm.privKey = privKey pcm.ethereumClient = ethereumClient pcm.state = &State{ - ready: make(chan bool, 1), + ready: false, status: StateStatusUnknown, } pcm.bus = bus pcm.blockPool = bp - pcm.psb.Hook(pubsub.PrePrepareMessageType, pcm.handlePrePrepare, types.PrePrepareMessage{}) - pcm.psb.Hook(pubsub.PrepareMessageType, pcm.handlePrepare, types.PrepareMessage{}) - pcm.psb.Hook(pubsub.CommitMessageType, pcm.handleCommit, types.CommitMessage{}) - bus.SubscribeOnce("sync:initialSyncCompleted", func() { - pcm.state.ready <- true - }) + pcm.mempool = mempool + pcm.blockchain = bc + pcm.psb.Hook(pubsub.PrePrepareMessageType, pcm.handlePrePrepare) + pcm.psb.Hook(pubsub.PrepareMessageType, pcm.handlePrepare) + pcm.psb.Hook(pubsub.CommitMessageType, pcm.handleCommit) + //bus.SubscribeOnce("sync:initialSyncCompleted", func() { + // pcm.state.ready = true + //}) + height, _ := pcm.blockchain.GetLatestBlockHeight() + pcm.state.blockHeight = height + 1 + go func() { + for { + select { + case e := <-b.Beacon.NewEntries(): + { + pcm.NewDrandRound(nil, e) + } + } + } + }() return pcm } func (pcm *PBFTConsensusManager) propose(blk *types3.Block) error { - pcm.state.mutex.Lock() - defer pcm.state.mutex.Unlock() prePrepareMsg, err := NewMessage(types.ConsensusMessage{Block: blk}, types.ConsensusMessageTypePrePrepare, pcm.privKey) if err != nil { return err @@ -116,33 +131,33 @@ func (pcm *PBFTConsensusManager) propose(blk *types3.Block) error { return nil } -func (pcm *PBFTConsensusManager) handlePrePrepare(message *pubsub.GenericMessage) { +func (pcm *PBFTConsensusManager) handlePrePrepare(message *pubsub.PubSubMessage) { pcm.state.mutex.Lock() defer pcm.state.mutex.Unlock() - prePrepare, ok := message.Payload.(types.PrePrepareMessage) - if !ok { - logrus.Warn("failed to convert payload to PrePrepare message") + var prePrepare types.PrePrepareMessage + err := cbor.Unmarshal(message.Payload, &prePrepare) + if err != nil { + logrus.Errorf("failed to convert payload to PrePrepare message: %s", err.Error()) return } - if prePrepare.Block.Header.Proposer == pcm.miner.address { + if *prePrepare.Block.Header.Proposer == pcm.miner.address { return } cmsg := types.ConsensusMessage{ - Type: types.ConsensusMessageTypePrePrepare, - From: message.From, - Block: prePrepare.Block, + Type: types.ConsensusMessageTypePrePrepare, + From: message.From, + Block: prePrepare.Block, + Blockhash: prePrepare.Block.Header.Hash, } - <-pcm.state.ready - if pcm.msgLog.Exists(cmsg) { - logrus.Debugf("received existing pre_prepare msg, dropping...") + logrus.Tracef("received existing pre_prepare msg for block %x", cmsg.Block.Header.Hash) return } if !pcm.validator.Valid(cmsg, map[string]interface{}{"randomness": pcm.state.randomness}) { - logrus.Warn("received invalid pre_prepare msg, dropping...") + logrus.Warnf("received invalid pre_prepare msg for block %x", cmsg.Block.Header.Hash) return } @@ -159,12 +174,13 @@ func (pcm *PBFTConsensusManager) handlePrePrepare(message *pubsub.GenericMessage pcm.state.status = StateStatusPrePrepared } -func (pcm *PBFTConsensusManager) handlePrepare(message *pubsub.GenericMessage) { +func (pcm *PBFTConsensusManager) handlePrepare(message *pubsub.PubSubMessage) { pcm.state.mutex.Lock() defer pcm.state.mutex.Unlock() - prepare, ok := message.Payload.(types.PrepareMessage) - if !ok { - logrus.Warn("failed to convert payload to Prepare message") + var prepare types.PrepareMessage + err := cbor.Unmarshal(message.Payload, &prepare) + if err != nil { + logrus.Errorf("failed to convert payload to Prepare message: %s", err.Error()) return } @@ -176,17 +192,17 @@ func (pcm *PBFTConsensusManager) handlePrepare(message *pubsub.GenericMessage) { } if _, err := pcm.blockPool.GetBlock(cmsg.Blockhash); errors.Is(err, cache.ErrNotFound) { - logrus.Debugf("received unknown block") + logrus.Warnf("received unknown block %x", cmsg.Blockhash) return } if pcm.msgLog.Exists(cmsg) { - logrus.Debugf("received existing prepare msg, dropping...") + logrus.Tracef("received existing prepare msg for block %x", cmsg.Blockhash) return } if !pcm.validator.Valid(cmsg, nil) { - logrus.Warn("received invalid prepare msg, dropping...") + logrus.Warnf("received invalid prepare msg for block %x", cmsg.Blockhash) return } @@ -196,18 +212,20 @@ func (pcm *PBFTConsensusManager) handlePrepare(message *pubsub.GenericMessage) { commitMsg, err := NewMessage(cmsg, types.ConsensusMessageTypeCommit, pcm.privKey) if err != nil { logrus.Errorf("failed to create commit message: %v", err) + return } pcm.psb.BroadcastToServiceTopic(commitMsg) pcm.state.status = StateStatusPrepared } } -func (pcm *PBFTConsensusManager) handleCommit(message *pubsub.GenericMessage) { +func (pcm *PBFTConsensusManager) handleCommit(message *pubsub.PubSubMessage) { pcm.state.mutex.Lock() defer pcm.state.mutex.Unlock() - commit, ok := message.Payload.(types.CommitMessage) - if !ok { - logrus.Warn("failed to convert payload to Prepare message") + var commit types.CommitMessage + err := cbor.Unmarshal(message.Payload, &commit) + if err != nil { + logrus.Errorf("failed to convert payload to Commit message: %s", err.Error()) return } @@ -215,20 +233,20 @@ func (pcm *PBFTConsensusManager) handleCommit(message *pubsub.GenericMessage) { Type: types.ConsensusMessageTypeCommit, From: message.From, Blockhash: commit.Blockhash, - Signature: commit.Signature, // TODO check the signature + Signature: commit.Signature, } if _, err := pcm.blockPool.GetBlock(cmsg.Blockhash); errors.Is(err, cache.ErrNotFound) { - logrus.Debugf("received unknown block") + logrus.Warnf("received unknown block %x", cmsg.Blockhash) return } if pcm.msgLog.Exists(cmsg) { - logrus.Debugf("received existing commit msg, dropping...") + logrus.Tracef("received existing commit msg for block %x", cmsg.Blockhash) return } if !pcm.validator.Valid(cmsg, nil) { - logrus.Warn("received invalid commit msg, dropping...") + logrus.Warnf("received invalid commit msg for block %x", cmsg.Blockhash) return } @@ -237,7 +255,7 @@ func (pcm *PBFTConsensusManager) handleCommit(message *pubsub.GenericMessage) { if len(pcm.msgLog.Get(types.ConsensusMessageTypeCommit, cmsg.Blockhash)) >= pcm.minApprovals { block, err := pcm.blockPool.GetBlock(cmsg.Blockhash) if err != nil { - logrus.Debug(err) + logrus.Error(err) return } pcm.blockPool.AddAcceptedBlock(block) @@ -245,58 +263,87 @@ func (pcm *PBFTConsensusManager) handleCommit(message *pubsub.GenericMessage) { } } -func (pcm *PBFTConsensusManager) NewDrandRound(from phony.Actor, res client.Result) { +func (pcm *PBFTConsensusManager) NewDrandRound(from phony.Actor, entry types2.BeaconEntry) { pcm.Act(from, func() { pcm.state.mutex.Lock() defer pcm.state.mutex.Unlock() block, err := pcm.commitAcceptedBlocks() if err != nil { if errors.Is(err, ErrNoAcceptedBlocks) { - logrus.Warnf("No accepted blocks for consensus round %d", pcm.state.blockHeight) + logrus.Infof("No accepted blocks for consensus round %d", pcm.state.blockHeight) } else { logrus.Errorf("Failed to select the block in consensus round %d: %s", pcm.state.blockHeight, err.Error()) - } - return - } - - // if we are miner for this block - // then post dione tasks to target chains (currently, only Ethereum) - if block.Header.Proposer == pcm.miner.address { - for _, v := range block.Data { - var task types2.DioneTask - err := cbor.Unmarshal(v.Data, &task) - if err != nil { - logrus.Errorf("Failed to unmarshal transaction %x payload: %s", v.Hash, err.Error()) - continue // FIXME - } - reqIDNumber, ok := big.NewInt(0).SetString(task.RequestID, 10) - if !ok { - logrus.Errorf("Failed to parse request id number in task of tx %x", v.Hash) - continue // FIXME - } - - err = pcm.ethereumClient.SubmitRequestAnswer(reqIDNumber, task.Payload) - if err != nil { - logrus.Errorf("Failed to submit task in tx %x: %s", v.Hash, err.Error()) - continue // FIXME - } + return } } - pcm.state.ready <- true + if block != nil { + // broadcast new block + var newBlockMessage pubsub.PubSubMessage + newBlockMessage.Type = pubsub.NewBlockMessageType + blockSerialized, err := cbor.Marshal(block) + if err != nil { + logrus.Errorf("Failed to serialize block %x for broadcasting!", block.Header.Hash) + } else { + newBlockMessage.Payload = blockSerialized + pcm.psb.BroadcastToServiceTopic(&newBlockMessage) + } - minedBlock, err := pcm.miner.MineBlock(res.Randomness(), block.Header) + // if we are miner for this block + // then post dione tasks to target chains (currently, only Ethereum) + if *block.Header.Proposer == pcm.miner.address { + for _, v := range block.Data { + var task types2.DioneTask + err := cbor.Unmarshal(v.Data, &task) + if err != nil { + logrus.Errorf("Failed to unmarshal transaction %x payload: %s", v.Hash, err.Error()) + continue // FIXME + } + reqIDNumber, ok := big.NewInt(0).SetString(task.RequestID, 10) + if !ok { + logrus.Errorf("Failed to parse request id number in task of tx %x", v.Hash) + continue // FIXME + } + + err = pcm.ethereumClient.SubmitRequestAnswer(reqIDNumber, task.Payload) + if err != nil { + logrus.Errorf("Failed to submit task in tx %x: %s", v.Hash, err.Error()) + continue // FIXME + } + } + } + + pcm.state.blockHeight = pcm.state.blockHeight + 1 + } + + // get latest block + height, err := pcm.blockchain.GetLatestBlockHeight() if err != nil { - logrus.Errorf("Failed to mine the block: %s", err.Error()) + logrus.Error(err) + return + } + blockHeader, err := pcm.blockchain.FetchBlockHeaderByHeight(height) + if err != nil { + logrus.Error(err) return } - pcm.state.drandRound = res.Round() - pcm.state.randomness = res.Randomness() - pcm.state.blockHeight = pcm.state.blockHeight + 1 + pcm.state.drandRound = entry.Round + pcm.state.randomness = entry.Data + + minedBlock, err := pcm.miner.MineBlock(entry.Data, entry.Round, blockHeader) + if err != nil { + if errors.Is(err, ErrNoTxForBlock) { + logrus.Info("Skipping consensus round, because we don't have transactions in mempool for including into block") + } else { + logrus.Errorf("Failed to mine the block: %s", err.Error()) + } + return + } // if we are round winner if minedBlock != nil { + logrus.Infof("We are elected in consensus round %d", pcm.state.blockHeight) err = pcm.propose(minedBlock) if err != nil { logrus.Errorf("Failed to propose the block: %s", err.Error()) @@ -327,7 +374,10 @@ func (pcm *PBFTConsensusManager) commitAcceptedBlocks() (*types3.Block, error) { maxStake = stake selectedBlock = v } - logrus.Debugf("Selected block of miner %s", selectedBlock.Header.ProposerEth.Hex()) - pcm.blockPool.PruneAcceptedBlocks() + logrus.Infof("Committed block %x with height %d of miner %s", selectedBlock.Header.Hash, selectedBlock.Header.Height, selectedBlock.Header.Proposer.String()) + pcm.blockPool.PruneAcceptedBlocks(selectedBlock) + for _, v := range selectedBlock.Data { + pcm.mempool.DeleteTx(v.Hash) + } return selectedBlock, pcm.blockchain.StoreBlock(selectedBlock) } diff --git a/consensus/consensus_validator.go b/consensus/consensus_validator.go index 571cc36..90145e7 100644 --- a/consensus/consensus_validator.go +++ b/consensus/consensus_validator.go @@ -2,9 +2,12 @@ package consensus import ( "bytes" + "context" "fmt" "sync" + "github.com/Secured-Finance/dione/beacon" + types3 "github.com/Secured-Finance/dione/blockchain/types" "github.com/Secured-Finance/dione/blockchain" @@ -22,13 +25,15 @@ import ( type ConsensusValidator struct { validationFuncMap map[types2.ConsensusMessageType]func(msg types2.ConsensusMessage, metadata map[string]interface{}) bool miner *Miner + beacon beacon.BeaconNetwork blockchain *blockchain.BlockChain } -func NewConsensusValidator(miner *Miner, bc *blockchain.BlockChain) *ConsensusValidator { +func NewConsensusValidator(miner *Miner, bc *blockchain.BlockChain, b beacon.BeaconNetwork) *ConsensusValidator { cv := &ConsensusValidator{ miner: miner, blockchain: bc, + beacon: b, } cv.validationFuncMap = map[types2.ConsensusMessageType]func(msg types2.ConsensusMessage, metadata map[string]interface{}) bool{ @@ -64,7 +69,7 @@ func NewConsensusValidator(miner *Miner, bc *blockchain.BlockChain) *ConsensusVa return false } if bytes.Compare(msg.Block.Header.LastHash, previousBlockHeader.Hash) != 0 { - logrus.Error("block header has invalid last block hash") + logrus.Errorf("block header has invalid last block hash (expected: %x, actual %x)", previousBlockHeader.Hash, msg.Block.Header.LastHash) return false } @@ -101,8 +106,13 @@ func NewConsensusValidator(miner *Miner, bc *blockchain.BlockChain) *ConsensusVa return false } + res, err := b.Beacon.Entry(context.TODO(), msg.Block.Header.ElectionProof.RandomnessRound) + if err != nil { + logrus.Error(err) + return false + } eproofRandomness, err := DrawRandomness( - metadata["randomness"].([]byte), + res.Data, crypto.DomainSeparationTag_ElectionProofProduction, msg.Block.Header.Height, proposerBuf, @@ -111,7 +121,7 @@ func NewConsensusValidator(miner *Miner, bc *blockchain.BlockChain) *ConsensusVa logrus.Errorf("failed to draw ElectionProof randomness: %s", err.Error()) return false } - err = VerifyVRF(msg.Block.Header.Proposer, eproofRandomness, msg.Block.Header.ElectionProof.VRFProof) + err = VerifyVRF(*msg.Block.Header.Proposer, eproofRandomness, msg.Block.Header.ElectionProof.VRFProof) if err != nil { logrus.Errorf("failed to verify election proof vrf: %v", err) return false diff --git a/consensus/miner.go b/consensus/miner.go index a4527a3..5b6891f 100644 --- a/consensus/miner.go +++ b/consensus/miner.go @@ -19,6 +19,10 @@ import ( "github.com/sirupsen/logrus" ) +var ( + ErrNoTxForBlock = fmt.Errorf("no transactions for including into block") +) + type Miner struct { address peer.ID ethAddress common.Address @@ -85,7 +89,7 @@ func (m *Miner) GetStakeInfo(miner common.Address) (*big.Int, *big.Int, error) { return mStake, nStake, nil } -func (m *Miner) MineBlock(randomness []byte, lastBlockHeader *types2.BlockHeader) (*types2.Block, error) { +func (m *Miner) MineBlock(randomness []byte, randomnessRound uint64, lastBlockHeader *types2.BlockHeader) (*types2.Block, error) { logrus.Debug("attempting to mine the block at epoch: ", lastBlockHeader.Height+1) if err := m.UpdateCurrentStakeInfo(); err != nil { @@ -96,6 +100,7 @@ func (m *Miner) MineBlock(randomness []byte, lastBlockHeader *types2.BlockHeader lastBlockHeader.Height+1, m.address, randomness, + randomnessRound, m.minerStake, m.networkStake, m.privateKey, @@ -110,7 +115,7 @@ func (m *Miner) MineBlock(randomness []byte, lastBlockHeader *types2.BlockHeader txs := m.mempool.GetTransactionsForNewBlock() if txs == nil { - return nil, fmt.Errorf("there is no txes for processing") // skip new consensus round because there is no transaction for processing + return nil, ErrNoTxForBlock // skip new consensus round because there is no transaction for processing } newBlock, err := types2.CreateBlock(lastBlockHeader, txs, m.ethAddress, m.privateKey, winner) diff --git a/consensus/utils.go b/consensus/utils.go index 490e992..7d62f77 100644 --- a/consensus/utils.go +++ b/consensus/utils.go @@ -5,6 +5,8 @@ import ( "fmt" "math/big" + "github.com/fxamacker/cbor/v2" + "github.com/Secured-Finance/dione/pubsub" types2 "github.com/Secured-Finance/dione/consensus/types" @@ -30,7 +32,7 @@ func VerifyVRF(worker peer.ID, vrfBase, vrfproof []byte) error { if err != nil { return err } - ok, err := pk.Verify(vrfproof, vrfBase) + ok, err := pk.Verify(vrfBase, vrfproof) if err != nil { return err } @@ -42,7 +44,7 @@ func VerifyVRF(worker peer.ID, vrfBase, vrfproof []byte) error { } func IsRoundWinner(round uint64, - worker peer.ID, randomness []byte, minerStake, networkStake *big.Int, privKey crypto.PrivKey) (*types.ElectionProof, error) { + worker peer.ID, randomness []byte, randomnessRound uint64, minerStake, networkStake *big.Int, privKey crypto.PrivKey) (*types.ElectionProof, error) { buf, err := worker.MarshalBinary() if err != nil { @@ -59,7 +61,7 @@ func IsRoundWinner(round uint64, return nil, fmt.Errorf("failed to compute VRF: %w", err) } - ep := &types.ElectionProof{VRFProof: vrfout} + ep := &types.ElectionProof{VRFProof: vrfout, RandomnessRound: randomnessRound} j := ep.ComputeWinCount(minerStake, networkStake) ep.WinCount = j if j < 1 { @@ -90,29 +92,38 @@ func DrawRandomness(rbase []byte, pers crypto2.DomainSeparationTag, round uint64 return h.Sum(nil), nil } -func NewMessage(cmsg types2.ConsensusMessage, typ types2.ConsensusMessageType, privKey crypto.PrivKey) (*pubsub.GenericMessage, error) { - var message pubsub.GenericMessage +func NewMessage(cmsg types2.ConsensusMessage, typ types2.ConsensusMessageType, privKey crypto.PrivKey) (*pubsub.PubSubMessage, error) { + var message pubsub.PubSubMessage switch typ { case types2.ConsensusMessageTypePrePrepare: { message.Type = pubsub.PrePrepareMessageType - message.Payload = types2.PrePrepareMessage{ + msg := types2.PrePrepareMessage{ Block: cmsg.Block, } + data, err := cbor.Marshal(msg) + if err != nil { + return nil, fmt.Errorf("failed to convert message to map: %s", err.Error()) + } + message.Payload = data break } case types2.ConsensusMessageTypePrepare: { message.Type = pubsub.PrepareMessageType - pm := types2.PrepareMessage{ - Blockhash: cmsg.Blockhash, - } signature, err := privKey.Sign(cmsg.Blockhash) if err != nil { return nil, fmt.Errorf("failed to create signature: %v", err) } - pm.Signature = signature - message.Payload = pm + pm := types2.PrepareMessage{ + Blockhash: cmsg.Block.Header.Hash, + Signature: signature, + } + data, err := cbor.Marshal(pm) + if err != nil { + return nil, fmt.Errorf("failed to convert message to map: %s", err.Error()) + } + message.Payload = data break } case types2.ConsensusMessageTypeCommit: @@ -126,7 +137,11 @@ func NewMessage(cmsg types2.ConsensusMessage, typ types2.ConsensusMessageType, p return nil, fmt.Errorf("failed to create signature: %v", err) } pm.Signature = signature - message.Payload = pm + data, err := cbor.Marshal(pm) + if err != nil { + return nil, fmt.Errorf("failed to convert message to map: %s", err.Error()) + } + message.Payload = data break } } diff --git a/node/node.go b/node/node.go index 932b981..212b4c2 100644 --- a/node/node.go +++ b/node/node.go @@ -7,6 +7,8 @@ import ( "fmt" "io/ioutil" "os" + "path" + "runtime" "time" "github.com/Secured-Finance/dione/blockchain" @@ -64,7 +66,7 @@ type Node struct { Ethereum *ethclient.EthereumClient ConsensusManager *consensus.PBFTConsensusManager Miner *consensus.Miner - Beacon beacon.BeaconNetworks + Beacon beacon.BeaconNetwork DisputeManager *consensus.DisputeManager BlockPool *pool.BlockPool MemPool *pool.Mempool @@ -189,19 +191,19 @@ func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTim n.Miner = miner logrus.Info("Mining subsystem has been initialized!") - // initialize consensus subsystem - consensusManager := provideConsensusManager(bus, psb, miner, bc, ethClient, prvKey, n.Config.ConsensusMinApprovals, bp) - n.ConsensusManager = consensusManager - logrus.Info("Consensus subsystem has been initialized!") - // initialize random beacon network subsystem - randomBeaconNetwork, err := provideBeacon(psb.Pubsub, consensusManager) + randomBeaconNetwork, err := provideBeacon(psb.Pubsub) if err != nil { logrus.Fatal(err) } n.Beacon = randomBeaconNetwork logrus.Info("Random beacon subsystem has been initialized!") + // initialize consensus subsystem + consensusManager := provideConsensusManager(bus, psb, miner, bc, ethClient, prvKey, n.Config.ConsensusMinApprovals, bp, randomBeaconNetwork, mp) + n.ConsensusManager = consensusManager + logrus.Info("Consensus subsystem has been initialized!") + // initialize dispute subsystem disputeManager, err := provideDisputeManager(context.TODO(), ethClient, consensusManager, config, bc) if err != nil { @@ -344,6 +346,14 @@ func (n *Node) setupRPCClients() error { } func Start() { + logrus.SetReportCaller(true) + logrus.SetFormatter(&logrus.TextFormatter{ + CallerPrettyfier: func(f *runtime.Frame) (string, string) { + filename := path.Base(f.File) + return "", fmt.Sprintf("%s:%d:", filename, f.Line) + }, + }) + configPath := flag.String("config", "", "Path to config") verbose := flag.Bool("verbose", false, "Verbose logging") flag.Parse() diff --git a/node/node_dep_providers.go b/node/node_dep_providers.go index 8f43b78..124b97d 100644 --- a/node/node_dep_providers.go +++ b/node/node_dep_providers.go @@ -64,15 +64,13 @@ func provideMiner(peerID peer.ID, ethAddress common.Address, ethClient *ethclien return consensus.NewMiner(peerID, ethAddress, ethClient, privateKey, mempool) } -func provideBeacon(ps *pubsub2.PubSub, pcm *consensus.PBFTConsensusManager) (beacon.BeaconNetworks, error) { - networks := beacon.BeaconNetworks{} - bc, err := drand2.NewDrandBeacon(ps, pcm) +func provideBeacon(ps *pubsub2.PubSub) (beacon.BeaconNetwork, error) { + bc, err := drand2.NewDrandBeacon(ps) if err != nil { - return nil, fmt.Errorf("failed to setup drand beacon: %w", err) + return beacon.BeaconNetwork{}, fmt.Errorf("failed to setup drand beacon: %w", err) } - networks = append(networks, beacon.BeaconNetwork{Start: config.DrandChainGenesisTime, Beacon: bc}) // NOTE: currently we use only one network - return networks, nil + return beacon.BeaconNetwork{Start: config.DrandChainGenesisTime, Beacon: bc}, nil } // FIXME: do we really need this? @@ -114,6 +112,8 @@ func provideConsensusManager( privateKey crypto.PrivKey, minApprovals int, bp *pool.BlockPool, + b beacon.BeaconNetwork, + mp *pool.Mempool, ) *consensus.PBFTConsensusManager { return consensus.NewPBFTConsensusManager( bus, @@ -124,6 +124,8 @@ func provideConsensusManager( miner, bc, bp, + b, + mp, ) } diff --git a/pubsub/message.go b/pubsub/message.go index 4b803f3..5537d51 100644 --- a/pubsub/message.go +++ b/pubsub/message.go @@ -13,13 +13,8 @@ const ( NewBlockMessageType ) -type GenericMessage struct { - Type PubSubMessageType - From peer.ID `cbor:"-"` - Payload interface{} -} - type PubSubMessage struct { Type PubSubMessageType + From peer.ID `cbor:"-"` Payload []byte } diff --git a/pubsub/pubsub_router.go b/pubsub/pubsub_router.go index 1c7e97a..4653583 100644 --- a/pubsub/pubsub_router.go +++ b/pubsub/pubsub_router.go @@ -21,10 +21,9 @@ type PubSubRouter struct { handlers map[PubSubMessageType][]Handler oracleTopicName string oracleTopic *pubsub.Topic - typeMapping map[PubSubMessageType]interface{} // message type -> sample } -type Handler func(message *GenericMessage) +type Handler func(message *PubSubMessage) func NewPubSubRouter(h host.Host, oracleTopic string, isBootstrap bool) *PubSubRouter { ctx, ctxCancel := context.WithCancel(context.Background()) @@ -34,7 +33,6 @@ func NewPubSubRouter(h host.Host, oracleTopic string, isBootstrap bool) *PubSubR context: ctx, contextCancel: ctxCancel, handlers: make(map[PubSubMessageType][]Handler), - typeMapping: map[PubSubMessageType]interface{}{}, } var pbOptions []pubsub.Option @@ -104,30 +102,16 @@ func (psr *PubSubRouter) handleMessage(p *pubsub.Message) { if senderPeerID == psr.node.ID() { return } - var genericMessage PubSubMessage - var message GenericMessage - err = cbor.Unmarshal(p.Data, &genericMessage) - if err != nil { - logrus.Warn("Unable to decode pubsub message data! " + err.Error()) - return - } - sampleMsg, ok := psr.typeMapping[genericMessage.Type] - if !ok { - logrus.Warnf("Unknown message type %d: we have no clue how to decode it", genericMessage.Type) - return - } - destMsg := sampleMsg - err = cbor.Unmarshal(genericMessage.Payload, &destMsg) + var message PubSubMessage + err = cbor.Unmarshal(p.Data, &message) if err != nil { logrus.Warn("Unable to decode pubsub message data! " + err.Error()) return } message.From = senderPeerID - message.Type = genericMessage.Type - message.Payload = destMsg - handlers, ok := psr.handlers[genericMessage.Type] + handlers, ok := psr.handlers[message.Type] if !ok { - logrus.Warn("Dropping pubsub message " + string(genericMessage.Type) + " because we don't have any handlers!") + logrus.Warnf("Dropping pubsub message with type %d because we don't have any handlers!", message.Type) return } for _, v := range handlers { @@ -135,16 +119,15 @@ func (psr *PubSubRouter) handleMessage(p *pubsub.Message) { } } -func (psr *PubSubRouter) Hook(messageType PubSubMessageType, handler Handler, sample interface{}) { +func (psr *PubSubRouter) Hook(messageType PubSubMessageType, handler Handler) { _, ok := psr.handlers[messageType] if !ok { psr.handlers[messageType] = []Handler{} } psr.handlers[messageType] = append(psr.handlers[messageType], handler) - psr.typeMapping[messageType] = sample } -func (psr *PubSubRouter) BroadcastToServiceTopic(msg *GenericMessage) error { +func (psr *PubSubRouter) BroadcastToServiceTopic(msg *PubSubMessage) error { data, err := cbor.Marshal(msg) if err != nil { return err diff --git a/rpc/filecoin/filecoin.go b/rpc/filecoin/filecoin.go index ace13b1..a9ec9e5 100644 --- a/rpc/filecoin/filecoin.go +++ b/rpc/filecoin/filecoin.go @@ -114,6 +114,6 @@ func (c *LotusClient) HandleRequest(method string, params []interface{}) ([]byte return nil, err } bodyBytes := resp.Body() - logrus.Debugf("Filecoin RPC reply: %v", string(bodyBytes)) + logrus.Tracef("Filecoin RPC reply: %v", string(bodyBytes)) return bodyBytes, nil } diff --git a/types/electionproof.go b/types/electionproof.go index 62a7d7a..5350dde 100644 --- a/types/electionproof.go +++ b/types/electionproof.go @@ -8,8 +8,9 @@ import ( ) type ElectionProof struct { - WinCount int64 - VRFProof []byte + WinCount int64 + VRFProof []byte + RandomnessRound uint64 } const precision = 256