Make blockchain finally working!

This commit is contained in:
ChronosX88 2021-07-12 02:23:00 +03:00
parent 6c18edd471
commit d7a1e87939
Signed by: ChronosXYZ
GPG Key ID: 085A69A82C8C511A
20 changed files with 313 additions and 197 deletions

View File

@ -34,8 +34,9 @@ type BeaconNetwork struct {
// valid for a specific chain epoch. Also to verify beacon entries that have // valid for a specific chain epoch. Also to verify beacon entries that have
// been posted on chain. // been posted on chain.
type BeaconAPI interface { type BeaconAPI interface {
Entry(context.Context, uint64) <-chan BeaconResult Entry(context.Context, uint64) (types.BeaconEntry, error)
VerifyEntry(types.BeaconEntry, types.BeaconEntry) error VerifyEntry(types.BeaconEntry, types.BeaconEntry) error
NewEntries() <-chan types.BeaconEntry
LatestBeaconRound() uint64 LatestBeaconRound() uint64
} }

View File

@ -8,8 +8,6 @@ import (
"github.com/Arceliar/phony" "github.com/Arceliar/phony"
"github.com/Secured-Finance/dione/consensus"
"github.com/Secured-Finance/dione/beacon" "github.com/Secured-Finance/dione/beacon"
"github.com/drand/drand/chain" "github.com/drand/drand/chain"
"github.com/drand/drand/client" "github.com/drand/drand/client"
@ -38,13 +36,13 @@ type DrandBeacon struct {
DrandClient client.Client DrandClient client.Client
PublicKey kyber.Point PublicKey kyber.Point
drandResultChannel <-chan client.Result drandResultChannel <-chan client.Result
beaconEntryChannel chan types.BeaconEntry
cacheLock sync.Mutex cacheLock sync.Mutex
localCache map[uint64]types.BeaconEntry localCache map[uint64]types.BeaconEntry
latestDrandRound uint64 latestDrandRound uint64
consensusManager *consensus.PBFTConsensusManager
} }
func NewDrandBeacon(ps *pubsub.PubSub, pcm *consensus.PBFTConsensusManager) (*DrandBeacon, error) { func NewDrandBeacon(ps *pubsub.PubSub) (*DrandBeacon, error) {
cfg := config.NewDrandConfig() cfg := config.NewDrandConfig()
drandChain, err := chain.InfoFromJSON(bytes.NewReader([]byte(cfg.ChainInfo))) drandChain, err := chain.InfoFromJSON(bytes.NewReader([]byte(cfg.ChainInfo)))
@ -85,12 +83,12 @@ func NewDrandBeacon(ps *pubsub.PubSub, pcm *consensus.PBFTConsensusManager) (*Dr
db := &DrandBeacon{ db := &DrandBeacon{
DrandClient: drandClient, DrandClient: drandClient,
localCache: make(map[uint64]types.BeaconEntry), localCache: make(map[uint64]types.BeaconEntry),
consensusManager: pcm,
} }
db.PublicKey = drandChain.PublicKey db.PublicKey = drandChain.PublicKey
db.drandResultChannel = db.DrandClient.Watch(context.TODO()) db.drandResultChannel = db.DrandClient.Watch(context.TODO())
db.beaconEntryChannel = make(chan types.BeaconEntry)
err = db.getLatestDrandResult() err = db.getLatestDrandResult()
if err != nil { if err != nil {
return nil, err return nil, err
@ -106,7 +104,7 @@ func (db *DrandBeacon) getLatestDrandResult() error {
log.Errorf("failed to get latest drand round: %v", err) log.Errorf("failed to get latest drand round: %v", err)
return err return err
} }
db.cacheValue(newBeaconResultFromDrandResult(latestDround)) db.cacheValue(newBeaconEntryFromDrandResult(latestDround))
db.updateLatestDrandRound(latestDround.Round()) db.updateLatestDrandRound(latestDround.Round())
return nil return nil
} }
@ -120,43 +118,30 @@ func (db *DrandBeacon) loop(ctx context.Context) {
} }
case res := <-db.drandResultChannel: case res := <-db.drandResultChannel:
{ {
db.cacheValue(newBeaconResultFromDrandResult(res)) db.cacheValue(newBeaconEntryFromDrandResult(res))
db.updateLatestDrandRound(res.Round()) db.updateLatestDrandRound(res.Round())
db.consensusManager.NewDrandRound(db, res) db.newEntry(res)
} }
} }
} }
} }
func (db *DrandBeacon) Entry(ctx context.Context, round uint64) <-chan beacon.BeaconResult { func (db *DrandBeacon) Entry(ctx context.Context, round uint64) (types.BeaconEntry, error) {
out := make(chan beacon.BeaconResult, 1)
if round != 0 { if round != 0 {
be := db.getCachedValue(round) be := db.getCachedValue(round)
if be != nil { if be != nil {
out <- beacon.BeaconResult{Entry: *be} return *be, nil
close(out)
return out
} }
} }
go func() {
start := lib.Clock.Now() start := lib.Clock.Now()
log.Infof("start fetching randomness: round %v", round) log.Infof("start fetching randomness: round %v", round)
resp, err := db.DrandClient.Get(ctx, round) resp, err := db.DrandClient.Get(ctx, round)
var br beacon.BeaconResult
if err != nil { if err != nil {
br.Err = fmt.Errorf("drand failed Get request: %w", err) return types.BeaconEntry{}, fmt.Errorf("drand failed Get request: %w", err)
} else {
br.Entry.Round = resp.Round()
br.Entry.Data = resp.Signature()
} }
log.Infof("done fetching randomness: round %v, took %v", round, lib.Clock.Since(start)) log.Infof("done fetching randomness: round %v, took %v", round, lib.Clock.Since(start))
out <- br return newBeaconEntryFromDrandResult(resp), nil
close(out)
}()
return out
} }
func (db *DrandBeacon) cacheValue(res types.BeaconEntry) { func (db *DrandBeacon) cacheValue(res types.BeaconEntry) {
db.cacheLock.Lock() db.cacheLock.Lock()
@ -201,7 +186,17 @@ func (db *DrandBeacon) LatestBeaconRound() uint64 {
return db.latestDrandRound return db.latestDrandRound
} }
func newBeaconResultFromDrandResult(res client.Result) types.BeaconEntry { func (db *DrandBeacon) newEntry(res client.Result) {
db.Act(nil, func() {
db.beaconEntryChannel <- types.NewBeaconEntry(res.Round(), res.Randomness(), map[string]interface{}{"signature": res.Signature()})
})
}
func (db *DrandBeacon) NewEntries() <-chan types.BeaconEntry {
return db.beaconEntryChannel
}
func newBeaconEntryFromDrandResult(res client.Result) types.BeaconEntry {
return types.NewBeaconEntry(res.Round(), res.Randomness(), map[string]interface{}{"signature": res.Signature()}) return types.NewBeaconEntry(res.Round(), res.Randomness(), map[string]interface{}{"signature": res.Signature()})
} }

View File

@ -180,12 +180,13 @@ func (bp *BlockChain) FetchBlockData(blockHash []byte) ([]*types2.Transaction, e
} }
return err return err
} }
err = cbor.Unmarshal(blockData, data) err = cbor.Unmarshal(blockData, &data)
return err return err
}) })
if err != nil { if err != nil {
return nil, err return nil, err
} }
return data, nil return data, nil
} }
@ -227,7 +228,7 @@ func (bp *BlockChain) FetchBlock(blockHash []byte) (*types2.Block, error) {
} }
func (bp *BlockChain) FetchBlockByHeight(height uint64) (*types2.Block, error) { func (bp *BlockChain) FetchBlockByHeight(height uint64) (*types2.Block, error) {
var heightBytes []byte var heightBytes = make([]byte, 8)
binary.LittleEndian.PutUint64(heightBytes, height) binary.LittleEndian.PutUint64(heightBytes, height)
blockHash, err := bp.heightIndex.GetBytes(heightBytes) blockHash, err := bp.heightIndex.GetBytes(heightBytes)
if err != nil { if err != nil {
@ -243,7 +244,7 @@ func (bp *BlockChain) FetchBlockByHeight(height uint64) (*types2.Block, error) {
} }
func (bp *BlockChain) FetchBlockHeaderByHeight(height uint64) (*types2.BlockHeader, error) { func (bp *BlockChain) FetchBlockHeaderByHeight(height uint64) (*types2.BlockHeader, error) {
var heightBytes []byte var heightBytes = make([]byte, 8)
binary.LittleEndian.PutUint64(heightBytes, height) binary.LittleEndian.PutUint64(heightBytes, height)
blockHash, err := bp.heightIndex.GetBytes(heightBytes) blockHash, err := bp.heightIndex.GetBytes(heightBytes)
if err != nil { if err != nil {

View File

@ -1,8 +1,11 @@
package pool package pool
import ( import (
"bytes"
"encoding/hex" "encoding/hex"
"github.com/sirupsen/logrus"
"github.com/Secured-Finance/dione/blockchain/types" "github.com/Secured-Finance/dione/blockchain/types"
"github.com/Secured-Finance/dione/cache" "github.com/Secured-Finance/dione/cache"
) )
@ -29,8 +32,9 @@ func (bp *BlockPool) AddBlock(block *types.Block) error {
} }
func (bp *BlockPool) GetBlock(blockhash []byte) (*types.Block, error) { func (bp *BlockPool) GetBlock(blockhash []byte) (*types.Block, error) {
var block *types.Block var block types.Block
return block, bp.knownBlocks.Get(hex.EncodeToString(blockhash), &block) err := bp.knownBlocks.Get(hex.EncodeToString(blockhash), &block)
return &block, err
} }
// PruneBlocks cleans known blocks list. It is called when new consensus round starts. // PruneBlocks cleans known blocks list. It is called when new consensus round starts.
@ -53,13 +57,27 @@ func (bp *BlockPool) GetAllAcceptedBlocks() []*types.Block {
} }
// PruneAcceptedBlocks cleans accepted blocks list. It is called when new consensus round starts. // PruneAcceptedBlocks cleans accepted blocks list. It is called when new consensus round starts.
func (bp *BlockPool) PruneAcceptedBlocks() { func (bp *BlockPool) PruneAcceptedBlocks(committedBlock *types.Block) {
for k, v := range bp.acceptedBlocks.Items() { for k, v := range bp.acceptedBlocks.Items() {
block := v.(*types.Block) block := v.(*types.Block)
for _, v := range block.Data { for _, v := range block.Data {
if !containsTx(committedBlock.Data, v) {
v.MerkleProof = nil v.MerkleProof = nil
bp.mempool.StoreTx(v) // return transactions back to mempool err := bp.mempool.StoreTx(v) // return transactions back to mempool
if err != nil {
logrus.Error(err)
}
}
} }
bp.acceptedBlocks.Delete(k) bp.acceptedBlocks.Delete(k)
} }
} }
func containsTx(s []*types.Transaction, e *types.Transaction) bool {
for _, a := range s {
if bytes.Equal(a.Hash, e.Hash) {
return true
}
}
return false
}

View File

@ -6,6 +6,8 @@ import (
"sort" "sort"
"time" "time"
"github.com/sirupsen/logrus"
types2 "github.com/Secured-Finance/dione/blockchain/types" types2 "github.com/Secured-Finance/dione/blockchain/types"
"github.com/Secured-Finance/dione/consensus/policy" "github.com/Secured-Finance/dione/consensus/policy"
@ -37,9 +39,16 @@ func NewMempool() (*Mempool, error) {
func (mp *Mempool) StoreTx(tx *types2.Transaction) error { func (mp *Mempool) StoreTx(tx *types2.Transaction) error {
hashStr := hex.EncodeToString(tx.Hash) hashStr := hex.EncodeToString(tx.Hash)
err := mp.cache.StoreWithTTL(DefaultTxPrefix+hashStr, tx, DefaultTxTTL) err := mp.cache.StoreWithTTL(DefaultTxPrefix+hashStr, tx, DefaultTxTTL)
logrus.Infof("Submitted new transaction in mempool with hash %x", tx.Hash)
return err return err
} }
func (mp *Mempool) DeleteTx(txHash []byte) {
hashStr := hex.EncodeToString(txHash)
mp.cache.Delete(DefaultTxPrefix + hashStr)
logrus.Debugf("Deleted transaction from mempool %x", txHash)
}
func (mp *Mempool) GetTransactionsForNewBlock() []*types2.Transaction { func (mp *Mempool) GetTransactionsForNewBlock() []*types2.Transaction {
var txForBlock []*types2.Transaction var txForBlock []*types2.Transaction
allTxs := mp.GetAllTransactions() allTxs := mp.GetAllTransactions()
@ -63,8 +72,8 @@ func (mp *Mempool) GetAllTransactions() []*types2.Transaction {
var allTxs []*types2.Transaction var allTxs []*types2.Transaction
for _, v := range mp.cache.Items() { for _, v := range mp.cache.Items() {
tx := v.(types2.Transaction) tx := v.(*types2.Transaction)
allTxs = append(allTxs, &tx) allTxs = append(allTxs, tx)
} }
return allTxs return allTxs
} }

View File

@ -8,6 +8,8 @@ import (
"strings" "strings"
"sync" "sync"
"github.com/fxamacker/cbor/v2"
"github.com/asaskevich/EventBus" "github.com/asaskevich/EventBus"
"github.com/Secured-Finance/dione/blockchain/utils" "github.com/Secured-Finance/dione/blockchain/utils"
@ -63,7 +65,7 @@ func NewSyncManager(bus EventBus.Bus, bp *blockchain.BlockChain, mp *pool.Mempoo
psb: psb, psb: psb,
} }
psb.Hook(pubsub.NewTxMessageType, sm.onNewTransaction, types2.Transaction{}) psb.Hook(pubsub.NewTxMessageType, sm.onNewTransaction)
go func() { go func() {
if err := sm.initialSync(); err != nil { if err := sm.initialSync(); err != nil {
@ -236,10 +238,11 @@ func (sm *syncManager) processReceivedBlock(block types2.Block) error {
return nil return nil
} }
func (sm *syncManager) onNewTransaction(message *pubsub.GenericMessage) { func (sm *syncManager) onNewTransaction(message *pubsub.PubSubMessage) {
tx, ok := message.Payload.(types2.Transaction) var tx types2.Transaction
if !ok { err := cbor.Unmarshal(message.Payload, &tx)
logrus.Warn("failed to convert payload to Transaction") if err != nil {
logrus.Errorf("failed to convert payload to transaction: %s", err.Error())
return return
} }
@ -248,7 +251,7 @@ func (sm *syncManager) onNewTransaction(message *pubsub.GenericMessage) {
return return
} // TODO add more checks on tx } // TODO add more checks on tx
err := sm.mempool.StoreTx(&tx) err = sm.mempool.StoreTx(&tx)
if err != nil { if err != nil {
logrus.Warnf("failed to store incoming transaction in mempool: %s", err.Error()) logrus.Warnf("failed to store incoming transaction in mempool: %s", err.Error())
} }

View File

@ -26,7 +26,7 @@ type BlockHeader struct {
Hash []byte Hash []byte
LastHash []byte LastHash []byte
LastHashProof *merkletree.Proof LastHashProof *merkletree.Proof
Proposer peer.ID Proposer *peer.ID
ProposerEth common.Address ProposerEth common.Address
Signature []byte Signature []byte
BeaconEntry types.BeaconEntry BeaconEntry types.BeaconEntry
@ -78,11 +78,19 @@ func CreateBlock(lastBlockHeader *BlockHeader, txs []*Transaction, minerEth comm
return nil, err return nil, err
} }
for _, tx := range txs {
mp, err := tree.GenerateProof(tx.Hash, 0)
if err != nil {
return nil, err
}
tx.MerkleProof = mp
}
block := &Block{ block := &Block{
Header: &BlockHeader{ Header: &BlockHeader{
Timestamp: timestamp, Timestamp: timestamp,
Height: lastBlockHeader.Height + 1, Height: lastBlockHeader.Height + 1,
Proposer: proposer, Proposer: &proposer,
ProposerEth: minerEth, ProposerEth: minerEth,
Signature: s, Signature: s,
Hash: blockHash, Hash: blockHash,

View File

@ -30,6 +30,7 @@ func CreateTransaction(data []byte) *Transaction {
} }
func (tx *Transaction) ValidateHash() bool { func (tx *Transaction) ValidateHash() bool {
h := crypto.Keccak256([]byte(fmt.Sprintf("%d_%s", tx.Timestamp.Unix(), tx.Hash))) encodedData := hex.EncodeToString(tx.Data)
h := crypto.Keccak256([]byte(fmt.Sprintf("%d_%s", tx.Timestamp.Unix(), encodedData)))
return bytes.Equal(h, tx.Hash) return bytes.Equal(h, tx.Hash)
} }

View File

@ -10,7 +10,7 @@ import (
func VerifyTx(blockHeader *types.BlockHeader, tx *types.Transaction) error { func VerifyTx(blockHeader *types.BlockHeader, tx *types.Transaction) error {
if tx.MerkleProof == nil { if tx.MerkleProof == nil {
return fmt.Errorf("block transaction hasn't merkle proof") return fmt.Errorf("block transaction doesn't have merkle proof")
} }
txProofVerified, err := merkletree.VerifyProofUsing(tx.Hash, false, tx.MerkleProof, [][]byte{blockHeader.Hash}, keccak256.New()) txProofVerified, err := merkletree.VerifyProofUsing(tx.Hash, false, tx.MerkleProof, [][]byte{blockHeader.Hash}, keccak256.New())
if err != nil { if err != nil {

View File

@ -1,6 +1,8 @@
package cache package cache
import ( import (
"fmt"
"reflect"
"time" "time"
"github.com/patrickmn/go-cache" "github.com/patrickmn/go-cache"
@ -37,7 +39,14 @@ func (imc *InMemoryCache) Get(key string, value interface{}) error {
if !exists { if !exists {
return ErrNotFound return ErrNotFound
} }
value = v reflectedValue := reflect.ValueOf(value)
if reflectedValue.Kind() != reflect.Ptr {
return fmt.Errorf("value isn't a pointer")
}
if reflectedValue.IsNil() {
reflectedValue.Set(reflect.New(reflectedValue.Type().Elem()))
}
reflectedValue.Elem().Set(reflect.ValueOf(v).Elem())
return nil return nil
} }

View File

@ -5,6 +5,8 @@ import (
"math/big" "math/big"
"sync" "sync"
"github.com/Secured-Finance/dione/beacon"
"github.com/fxamacker/cbor/v2" "github.com/fxamacker/cbor/v2"
"github.com/Secured-Finance/dione/cache" "github.com/Secured-Finance/dione/cache"
@ -13,8 +15,6 @@ import (
"github.com/Secured-Finance/dione/blockchain" "github.com/Secured-Finance/dione/blockchain"
"github.com/drand/drand/client"
"github.com/Arceliar/phony" "github.com/Arceliar/phony"
types3 "github.com/Secured-Finance/dione/blockchain/types" types3 "github.com/Secured-Finance/dione/blockchain/types"
@ -57,7 +57,8 @@ type PBFTConsensusManager struct {
ethereumClient *ethclient.EthereumClient ethereumClient *ethclient.EthereumClient
miner *Miner miner *Miner
blockPool *pool.BlockPool blockPool *pool.BlockPool
blockchain blockchain.BlockChain mempool *pool.Mempool
blockchain *blockchain.BlockChain
state *State state *State
} }
@ -67,7 +68,7 @@ type State struct {
randomness []byte randomness []byte
blockHeight uint64 blockHeight uint64
status StateStatus status StateStatus
ready chan bool ready bool
} }
func NewPBFTConsensusManager( func NewPBFTConsensusManager(
@ -79,33 +80,47 @@ func NewPBFTConsensusManager(
miner *Miner, miner *Miner,
bc *blockchain.BlockChain, bc *blockchain.BlockChain,
bp *pool.BlockPool, bp *pool.BlockPool,
b beacon.BeaconNetwork,
mempool *pool.Mempool,
) *PBFTConsensusManager { ) *PBFTConsensusManager {
pcm := &PBFTConsensusManager{} pcm := &PBFTConsensusManager{}
pcm.psb = psb pcm.psb = psb
pcm.miner = miner pcm.miner = miner
pcm.validator = NewConsensusValidator(miner, bc) pcm.validator = NewConsensusValidator(miner, bc, b)
pcm.msgLog = NewConsensusMessageLog() pcm.msgLog = NewConsensusMessageLog()
pcm.minApprovals = minApprovals pcm.minApprovals = minApprovals
pcm.privKey = privKey pcm.privKey = privKey
pcm.ethereumClient = ethereumClient pcm.ethereumClient = ethereumClient
pcm.state = &State{ pcm.state = &State{
ready: make(chan bool, 1), ready: false,
status: StateStatusUnknown, status: StateStatusUnknown,
} }
pcm.bus = bus pcm.bus = bus
pcm.blockPool = bp pcm.blockPool = bp
pcm.psb.Hook(pubsub.PrePrepareMessageType, pcm.handlePrePrepare, types.PrePrepareMessage{}) pcm.mempool = mempool
pcm.psb.Hook(pubsub.PrepareMessageType, pcm.handlePrepare, types.PrepareMessage{}) pcm.blockchain = bc
pcm.psb.Hook(pubsub.CommitMessageType, pcm.handleCommit, types.CommitMessage{}) pcm.psb.Hook(pubsub.PrePrepareMessageType, pcm.handlePrePrepare)
bus.SubscribeOnce("sync:initialSyncCompleted", func() { pcm.psb.Hook(pubsub.PrepareMessageType, pcm.handlePrepare)
pcm.state.ready <- true pcm.psb.Hook(pubsub.CommitMessageType, pcm.handleCommit)
}) //bus.SubscribeOnce("sync:initialSyncCompleted", func() {
// pcm.state.ready = true
//})
height, _ := pcm.blockchain.GetLatestBlockHeight()
pcm.state.blockHeight = height + 1
go func() {
for {
select {
case e := <-b.Beacon.NewEntries():
{
pcm.NewDrandRound(nil, e)
}
}
}
}()
return pcm return pcm
} }
func (pcm *PBFTConsensusManager) propose(blk *types3.Block) error { func (pcm *PBFTConsensusManager) propose(blk *types3.Block) error {
pcm.state.mutex.Lock()
defer pcm.state.mutex.Unlock()
prePrepareMsg, err := NewMessage(types.ConsensusMessage{Block: blk}, types.ConsensusMessageTypePrePrepare, pcm.privKey) prePrepareMsg, err := NewMessage(types.ConsensusMessage{Block: blk}, types.ConsensusMessageTypePrePrepare, pcm.privKey)
if err != nil { if err != nil {
return err return err
@ -116,16 +131,17 @@ func (pcm *PBFTConsensusManager) propose(blk *types3.Block) error {
return nil return nil
} }
func (pcm *PBFTConsensusManager) handlePrePrepare(message *pubsub.GenericMessage) { func (pcm *PBFTConsensusManager) handlePrePrepare(message *pubsub.PubSubMessage) {
pcm.state.mutex.Lock() pcm.state.mutex.Lock()
defer pcm.state.mutex.Unlock() defer pcm.state.mutex.Unlock()
prePrepare, ok := message.Payload.(types.PrePrepareMessage) var prePrepare types.PrePrepareMessage
if !ok { err := cbor.Unmarshal(message.Payload, &prePrepare)
logrus.Warn("failed to convert payload to PrePrepare message") if err != nil {
logrus.Errorf("failed to convert payload to PrePrepare message: %s", err.Error())
return return
} }
if prePrepare.Block.Header.Proposer == pcm.miner.address { if *prePrepare.Block.Header.Proposer == pcm.miner.address {
return return
} }
@ -133,16 +149,15 @@ func (pcm *PBFTConsensusManager) handlePrePrepare(message *pubsub.GenericMessage
Type: types.ConsensusMessageTypePrePrepare, Type: types.ConsensusMessageTypePrePrepare,
From: message.From, From: message.From,
Block: prePrepare.Block, Block: prePrepare.Block,
Blockhash: prePrepare.Block.Header.Hash,
} }
<-pcm.state.ready
if pcm.msgLog.Exists(cmsg) { if pcm.msgLog.Exists(cmsg) {
logrus.Debugf("received existing pre_prepare msg, dropping...") logrus.Tracef("received existing pre_prepare msg for block %x", cmsg.Block.Header.Hash)
return return
} }
if !pcm.validator.Valid(cmsg, map[string]interface{}{"randomness": pcm.state.randomness}) { if !pcm.validator.Valid(cmsg, map[string]interface{}{"randomness": pcm.state.randomness}) {
logrus.Warn("received invalid pre_prepare msg, dropping...") logrus.Warnf("received invalid pre_prepare msg for block %x", cmsg.Block.Header.Hash)
return return
} }
@ -159,12 +174,13 @@ func (pcm *PBFTConsensusManager) handlePrePrepare(message *pubsub.GenericMessage
pcm.state.status = StateStatusPrePrepared pcm.state.status = StateStatusPrePrepared
} }
func (pcm *PBFTConsensusManager) handlePrepare(message *pubsub.GenericMessage) { func (pcm *PBFTConsensusManager) handlePrepare(message *pubsub.PubSubMessage) {
pcm.state.mutex.Lock() pcm.state.mutex.Lock()
defer pcm.state.mutex.Unlock() defer pcm.state.mutex.Unlock()
prepare, ok := message.Payload.(types.PrepareMessage) var prepare types.PrepareMessage
if !ok { err := cbor.Unmarshal(message.Payload, &prepare)
logrus.Warn("failed to convert payload to Prepare message") if err != nil {
logrus.Errorf("failed to convert payload to Prepare message: %s", err.Error())
return return
} }
@ -176,17 +192,17 @@ func (pcm *PBFTConsensusManager) handlePrepare(message *pubsub.GenericMessage) {
} }
if _, err := pcm.blockPool.GetBlock(cmsg.Blockhash); errors.Is(err, cache.ErrNotFound) { if _, err := pcm.blockPool.GetBlock(cmsg.Blockhash); errors.Is(err, cache.ErrNotFound) {
logrus.Debugf("received unknown block") logrus.Warnf("received unknown block %x", cmsg.Blockhash)
return return
} }
if pcm.msgLog.Exists(cmsg) { if pcm.msgLog.Exists(cmsg) {
logrus.Debugf("received existing prepare msg, dropping...") logrus.Tracef("received existing prepare msg for block %x", cmsg.Blockhash)
return return
} }
if !pcm.validator.Valid(cmsg, nil) { if !pcm.validator.Valid(cmsg, nil) {
logrus.Warn("received invalid prepare msg, dropping...") logrus.Warnf("received invalid prepare msg for block %x", cmsg.Blockhash)
return return
} }
@ -196,18 +212,20 @@ func (pcm *PBFTConsensusManager) handlePrepare(message *pubsub.GenericMessage) {
commitMsg, err := NewMessage(cmsg, types.ConsensusMessageTypeCommit, pcm.privKey) commitMsg, err := NewMessage(cmsg, types.ConsensusMessageTypeCommit, pcm.privKey)
if err != nil { if err != nil {
logrus.Errorf("failed to create commit message: %v", err) logrus.Errorf("failed to create commit message: %v", err)
return
} }
pcm.psb.BroadcastToServiceTopic(commitMsg) pcm.psb.BroadcastToServiceTopic(commitMsg)
pcm.state.status = StateStatusPrepared pcm.state.status = StateStatusPrepared
} }
} }
func (pcm *PBFTConsensusManager) handleCommit(message *pubsub.GenericMessage) { func (pcm *PBFTConsensusManager) handleCommit(message *pubsub.PubSubMessage) {
pcm.state.mutex.Lock() pcm.state.mutex.Lock()
defer pcm.state.mutex.Unlock() defer pcm.state.mutex.Unlock()
commit, ok := message.Payload.(types.CommitMessage) var commit types.CommitMessage
if !ok { err := cbor.Unmarshal(message.Payload, &commit)
logrus.Warn("failed to convert payload to Prepare message") if err != nil {
logrus.Errorf("failed to convert payload to Commit message: %s", err.Error())
return return
} }
@ -215,20 +233,20 @@ func (pcm *PBFTConsensusManager) handleCommit(message *pubsub.GenericMessage) {
Type: types.ConsensusMessageTypeCommit, Type: types.ConsensusMessageTypeCommit,
From: message.From, From: message.From,
Blockhash: commit.Blockhash, Blockhash: commit.Blockhash,
Signature: commit.Signature, // TODO check the signature Signature: commit.Signature,
} }
if _, err := pcm.blockPool.GetBlock(cmsg.Blockhash); errors.Is(err, cache.ErrNotFound) { if _, err := pcm.blockPool.GetBlock(cmsg.Blockhash); errors.Is(err, cache.ErrNotFound) {
logrus.Debugf("received unknown block") logrus.Warnf("received unknown block %x", cmsg.Blockhash)
return return
} }
if pcm.msgLog.Exists(cmsg) { if pcm.msgLog.Exists(cmsg) {
logrus.Debugf("received existing commit msg, dropping...") logrus.Tracef("received existing commit msg for block %x", cmsg.Blockhash)
return return
} }
if !pcm.validator.Valid(cmsg, nil) { if !pcm.validator.Valid(cmsg, nil) {
logrus.Warn("received invalid commit msg, dropping...") logrus.Warnf("received invalid commit msg for block %x", cmsg.Blockhash)
return return
} }
@ -237,7 +255,7 @@ func (pcm *PBFTConsensusManager) handleCommit(message *pubsub.GenericMessage) {
if len(pcm.msgLog.Get(types.ConsensusMessageTypeCommit, cmsg.Blockhash)) >= pcm.minApprovals { if len(pcm.msgLog.Get(types.ConsensusMessageTypeCommit, cmsg.Blockhash)) >= pcm.minApprovals {
block, err := pcm.blockPool.GetBlock(cmsg.Blockhash) block, err := pcm.blockPool.GetBlock(cmsg.Blockhash)
if err != nil { if err != nil {
logrus.Debug(err) logrus.Error(err)
return return
} }
pcm.blockPool.AddAcceptedBlock(block) pcm.blockPool.AddAcceptedBlock(block)
@ -245,23 +263,35 @@ func (pcm *PBFTConsensusManager) handleCommit(message *pubsub.GenericMessage) {
} }
} }
func (pcm *PBFTConsensusManager) NewDrandRound(from phony.Actor, res client.Result) { func (pcm *PBFTConsensusManager) NewDrandRound(from phony.Actor, entry types2.BeaconEntry) {
pcm.Act(from, func() { pcm.Act(from, func() {
pcm.state.mutex.Lock() pcm.state.mutex.Lock()
defer pcm.state.mutex.Unlock() defer pcm.state.mutex.Unlock()
block, err := pcm.commitAcceptedBlocks() block, err := pcm.commitAcceptedBlocks()
if err != nil { if err != nil {
if errors.Is(err, ErrNoAcceptedBlocks) { if errors.Is(err, ErrNoAcceptedBlocks) {
logrus.Warnf("No accepted blocks for consensus round %d", pcm.state.blockHeight) logrus.Infof("No accepted blocks for consensus round %d", pcm.state.blockHeight)
} else { } else {
logrus.Errorf("Failed to select the block in consensus round %d: %s", pcm.state.blockHeight, err.Error()) logrus.Errorf("Failed to select the block in consensus round %d: %s", pcm.state.blockHeight, err.Error())
}
return return
} }
}
if block != nil {
// broadcast new block
var newBlockMessage pubsub.PubSubMessage
newBlockMessage.Type = pubsub.NewBlockMessageType
blockSerialized, err := cbor.Marshal(block)
if err != nil {
logrus.Errorf("Failed to serialize block %x for broadcasting!", block.Header.Hash)
} else {
newBlockMessage.Payload = blockSerialized
pcm.psb.BroadcastToServiceTopic(&newBlockMessage)
}
// if we are miner for this block // if we are miner for this block
// then post dione tasks to target chains (currently, only Ethereum) // then post dione tasks to target chains (currently, only Ethereum)
if block.Header.Proposer == pcm.miner.address { if *block.Header.Proposer == pcm.miner.address {
for _, v := range block.Data { for _, v := range block.Data {
var task types2.DioneTask var task types2.DioneTask
err := cbor.Unmarshal(v.Data, &task) err := cbor.Unmarshal(v.Data, &task)
@ -283,20 +313,37 @@ func (pcm *PBFTConsensusManager) NewDrandRound(from phony.Actor, res client.Resu
} }
} }
pcm.state.ready <- true pcm.state.blockHeight = pcm.state.blockHeight + 1
}
minedBlock, err := pcm.miner.MineBlock(res.Randomness(), block.Header) // get latest block
height, err := pcm.blockchain.GetLatestBlockHeight()
if err != nil { if err != nil {
logrus.Errorf("Failed to mine the block: %s", err.Error()) logrus.Error(err)
return
}
blockHeader, err := pcm.blockchain.FetchBlockHeaderByHeight(height)
if err != nil {
logrus.Error(err)
return return
} }
pcm.state.drandRound = res.Round() pcm.state.drandRound = entry.Round
pcm.state.randomness = res.Randomness() pcm.state.randomness = entry.Data
pcm.state.blockHeight = pcm.state.blockHeight + 1
minedBlock, err := pcm.miner.MineBlock(entry.Data, entry.Round, blockHeader)
if err != nil {
if errors.Is(err, ErrNoTxForBlock) {
logrus.Info("Skipping consensus round, because we don't have transactions in mempool for including into block")
} else {
logrus.Errorf("Failed to mine the block: %s", err.Error())
}
return
}
// if we are round winner // if we are round winner
if minedBlock != nil { if minedBlock != nil {
logrus.Infof("We are elected in consensus round %d", pcm.state.blockHeight)
err = pcm.propose(minedBlock) err = pcm.propose(minedBlock)
if err != nil { if err != nil {
logrus.Errorf("Failed to propose the block: %s", err.Error()) logrus.Errorf("Failed to propose the block: %s", err.Error())
@ -327,7 +374,10 @@ func (pcm *PBFTConsensusManager) commitAcceptedBlocks() (*types3.Block, error) {
maxStake = stake maxStake = stake
selectedBlock = v selectedBlock = v
} }
logrus.Debugf("Selected block of miner %s", selectedBlock.Header.ProposerEth.Hex()) logrus.Infof("Committed block %x with height %d of miner %s", selectedBlock.Header.Hash, selectedBlock.Header.Height, selectedBlock.Header.Proposer.String())
pcm.blockPool.PruneAcceptedBlocks() pcm.blockPool.PruneAcceptedBlocks(selectedBlock)
for _, v := range selectedBlock.Data {
pcm.mempool.DeleteTx(v.Hash)
}
return selectedBlock, pcm.blockchain.StoreBlock(selectedBlock) return selectedBlock, pcm.blockchain.StoreBlock(selectedBlock)
} }

View File

@ -2,9 +2,12 @@ package consensus
import ( import (
"bytes" "bytes"
"context"
"fmt" "fmt"
"sync" "sync"
"github.com/Secured-Finance/dione/beacon"
types3 "github.com/Secured-Finance/dione/blockchain/types" types3 "github.com/Secured-Finance/dione/blockchain/types"
"github.com/Secured-Finance/dione/blockchain" "github.com/Secured-Finance/dione/blockchain"
@ -22,13 +25,15 @@ import (
type ConsensusValidator struct { type ConsensusValidator struct {
validationFuncMap map[types2.ConsensusMessageType]func(msg types2.ConsensusMessage, metadata map[string]interface{}) bool validationFuncMap map[types2.ConsensusMessageType]func(msg types2.ConsensusMessage, metadata map[string]interface{}) bool
miner *Miner miner *Miner
beacon beacon.BeaconNetwork
blockchain *blockchain.BlockChain blockchain *blockchain.BlockChain
} }
func NewConsensusValidator(miner *Miner, bc *blockchain.BlockChain) *ConsensusValidator { func NewConsensusValidator(miner *Miner, bc *blockchain.BlockChain, b beacon.BeaconNetwork) *ConsensusValidator {
cv := &ConsensusValidator{ cv := &ConsensusValidator{
miner: miner, miner: miner,
blockchain: bc, blockchain: bc,
beacon: b,
} }
cv.validationFuncMap = map[types2.ConsensusMessageType]func(msg types2.ConsensusMessage, metadata map[string]interface{}) bool{ cv.validationFuncMap = map[types2.ConsensusMessageType]func(msg types2.ConsensusMessage, metadata map[string]interface{}) bool{
@ -64,7 +69,7 @@ func NewConsensusValidator(miner *Miner, bc *blockchain.BlockChain) *ConsensusVa
return false return false
} }
if bytes.Compare(msg.Block.Header.LastHash, previousBlockHeader.Hash) != 0 { if bytes.Compare(msg.Block.Header.LastHash, previousBlockHeader.Hash) != 0 {
logrus.Error("block header has invalid last block hash") logrus.Errorf("block header has invalid last block hash (expected: %x, actual %x)", previousBlockHeader.Hash, msg.Block.Header.LastHash)
return false return false
} }
@ -101,8 +106,13 @@ func NewConsensusValidator(miner *Miner, bc *blockchain.BlockChain) *ConsensusVa
return false return false
} }
res, err := b.Beacon.Entry(context.TODO(), msg.Block.Header.ElectionProof.RandomnessRound)
if err != nil {
logrus.Error(err)
return false
}
eproofRandomness, err := DrawRandomness( eproofRandomness, err := DrawRandomness(
metadata["randomness"].([]byte), res.Data,
crypto.DomainSeparationTag_ElectionProofProduction, crypto.DomainSeparationTag_ElectionProofProduction,
msg.Block.Header.Height, msg.Block.Header.Height,
proposerBuf, proposerBuf,
@ -111,7 +121,7 @@ func NewConsensusValidator(miner *Miner, bc *blockchain.BlockChain) *ConsensusVa
logrus.Errorf("failed to draw ElectionProof randomness: %s", err.Error()) logrus.Errorf("failed to draw ElectionProof randomness: %s", err.Error())
return false return false
} }
err = VerifyVRF(msg.Block.Header.Proposer, eproofRandomness, msg.Block.Header.ElectionProof.VRFProof) err = VerifyVRF(*msg.Block.Header.Proposer, eproofRandomness, msg.Block.Header.ElectionProof.VRFProof)
if err != nil { if err != nil {
logrus.Errorf("failed to verify election proof vrf: %v", err) logrus.Errorf("failed to verify election proof vrf: %v", err)
return false return false

View File

@ -19,6 +19,10 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
var (
ErrNoTxForBlock = fmt.Errorf("no transactions for including into block")
)
type Miner struct { type Miner struct {
address peer.ID address peer.ID
ethAddress common.Address ethAddress common.Address
@ -85,7 +89,7 @@ func (m *Miner) GetStakeInfo(miner common.Address) (*big.Int, *big.Int, error) {
return mStake, nStake, nil return mStake, nStake, nil
} }
func (m *Miner) MineBlock(randomness []byte, lastBlockHeader *types2.BlockHeader) (*types2.Block, error) { func (m *Miner) MineBlock(randomness []byte, randomnessRound uint64, lastBlockHeader *types2.BlockHeader) (*types2.Block, error) {
logrus.Debug("attempting to mine the block at epoch: ", lastBlockHeader.Height+1) logrus.Debug("attempting to mine the block at epoch: ", lastBlockHeader.Height+1)
if err := m.UpdateCurrentStakeInfo(); err != nil { if err := m.UpdateCurrentStakeInfo(); err != nil {
@ -96,6 +100,7 @@ func (m *Miner) MineBlock(randomness []byte, lastBlockHeader *types2.BlockHeader
lastBlockHeader.Height+1, lastBlockHeader.Height+1,
m.address, m.address,
randomness, randomness,
randomnessRound,
m.minerStake, m.minerStake,
m.networkStake, m.networkStake,
m.privateKey, m.privateKey,
@ -110,7 +115,7 @@ func (m *Miner) MineBlock(randomness []byte, lastBlockHeader *types2.BlockHeader
txs := m.mempool.GetTransactionsForNewBlock() txs := m.mempool.GetTransactionsForNewBlock()
if txs == nil { if txs == nil {
return nil, fmt.Errorf("there is no txes for processing") // skip new consensus round because there is no transaction for processing return nil, ErrNoTxForBlock // skip new consensus round because there is no transaction for processing
} }
newBlock, err := types2.CreateBlock(lastBlockHeader, txs, m.ethAddress, m.privateKey, winner) newBlock, err := types2.CreateBlock(lastBlockHeader, txs, m.ethAddress, m.privateKey, winner)

View File

@ -5,6 +5,8 @@ import (
"fmt" "fmt"
"math/big" "math/big"
"github.com/fxamacker/cbor/v2"
"github.com/Secured-Finance/dione/pubsub" "github.com/Secured-Finance/dione/pubsub"
types2 "github.com/Secured-Finance/dione/consensus/types" types2 "github.com/Secured-Finance/dione/consensus/types"
@ -30,7 +32,7 @@ func VerifyVRF(worker peer.ID, vrfBase, vrfproof []byte) error {
if err != nil { if err != nil {
return err return err
} }
ok, err := pk.Verify(vrfproof, vrfBase) ok, err := pk.Verify(vrfBase, vrfproof)
if err != nil { if err != nil {
return err return err
} }
@ -42,7 +44,7 @@ func VerifyVRF(worker peer.ID, vrfBase, vrfproof []byte) error {
} }
func IsRoundWinner(round uint64, func IsRoundWinner(round uint64,
worker peer.ID, randomness []byte, minerStake, networkStake *big.Int, privKey crypto.PrivKey) (*types.ElectionProof, error) { worker peer.ID, randomness []byte, randomnessRound uint64, minerStake, networkStake *big.Int, privKey crypto.PrivKey) (*types.ElectionProof, error) {
buf, err := worker.MarshalBinary() buf, err := worker.MarshalBinary()
if err != nil { if err != nil {
@ -59,7 +61,7 @@ func IsRoundWinner(round uint64,
return nil, fmt.Errorf("failed to compute VRF: %w", err) return nil, fmt.Errorf("failed to compute VRF: %w", err)
} }
ep := &types.ElectionProof{VRFProof: vrfout} ep := &types.ElectionProof{VRFProof: vrfout, RandomnessRound: randomnessRound}
j := ep.ComputeWinCount(minerStake, networkStake) j := ep.ComputeWinCount(minerStake, networkStake)
ep.WinCount = j ep.WinCount = j
if j < 1 { if j < 1 {
@ -90,29 +92,38 @@ func DrawRandomness(rbase []byte, pers crypto2.DomainSeparationTag, round uint64
return h.Sum(nil), nil return h.Sum(nil), nil
} }
func NewMessage(cmsg types2.ConsensusMessage, typ types2.ConsensusMessageType, privKey crypto.PrivKey) (*pubsub.GenericMessage, error) { func NewMessage(cmsg types2.ConsensusMessage, typ types2.ConsensusMessageType, privKey crypto.PrivKey) (*pubsub.PubSubMessage, error) {
var message pubsub.GenericMessage var message pubsub.PubSubMessage
switch typ { switch typ {
case types2.ConsensusMessageTypePrePrepare: case types2.ConsensusMessageTypePrePrepare:
{ {
message.Type = pubsub.PrePrepareMessageType message.Type = pubsub.PrePrepareMessageType
message.Payload = types2.PrePrepareMessage{ msg := types2.PrePrepareMessage{
Block: cmsg.Block, Block: cmsg.Block,
} }
data, err := cbor.Marshal(msg)
if err != nil {
return nil, fmt.Errorf("failed to convert message to map: %s", err.Error())
}
message.Payload = data
break break
} }
case types2.ConsensusMessageTypePrepare: case types2.ConsensusMessageTypePrepare:
{ {
message.Type = pubsub.PrepareMessageType message.Type = pubsub.PrepareMessageType
pm := types2.PrepareMessage{
Blockhash: cmsg.Blockhash,
}
signature, err := privKey.Sign(cmsg.Blockhash) signature, err := privKey.Sign(cmsg.Blockhash)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create signature: %v", err) return nil, fmt.Errorf("failed to create signature: %v", err)
} }
pm.Signature = signature pm := types2.PrepareMessage{
message.Payload = pm Blockhash: cmsg.Block.Header.Hash,
Signature: signature,
}
data, err := cbor.Marshal(pm)
if err != nil {
return nil, fmt.Errorf("failed to convert message to map: %s", err.Error())
}
message.Payload = data
break break
} }
case types2.ConsensusMessageTypeCommit: case types2.ConsensusMessageTypeCommit:
@ -126,7 +137,11 @@ func NewMessage(cmsg types2.ConsensusMessage, typ types2.ConsensusMessageType, p
return nil, fmt.Errorf("failed to create signature: %v", err) return nil, fmt.Errorf("failed to create signature: %v", err)
} }
pm.Signature = signature pm.Signature = signature
message.Payload = pm data, err := cbor.Marshal(pm)
if err != nil {
return nil, fmt.Errorf("failed to convert message to map: %s", err.Error())
}
message.Payload = data
break break
} }
} }

View File

@ -7,6 +7,8 @@ import (
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"os" "os"
"path"
"runtime"
"time" "time"
"github.com/Secured-Finance/dione/blockchain" "github.com/Secured-Finance/dione/blockchain"
@ -64,7 +66,7 @@ type Node struct {
Ethereum *ethclient.EthereumClient Ethereum *ethclient.EthereumClient
ConsensusManager *consensus.PBFTConsensusManager ConsensusManager *consensus.PBFTConsensusManager
Miner *consensus.Miner Miner *consensus.Miner
Beacon beacon.BeaconNetworks Beacon beacon.BeaconNetwork
DisputeManager *consensus.DisputeManager DisputeManager *consensus.DisputeManager
BlockPool *pool.BlockPool BlockPool *pool.BlockPool
MemPool *pool.Mempool MemPool *pool.Mempool
@ -189,19 +191,19 @@ func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTim
n.Miner = miner n.Miner = miner
logrus.Info("Mining subsystem has been initialized!") logrus.Info("Mining subsystem has been initialized!")
// initialize consensus subsystem
consensusManager := provideConsensusManager(bus, psb, miner, bc, ethClient, prvKey, n.Config.ConsensusMinApprovals, bp)
n.ConsensusManager = consensusManager
logrus.Info("Consensus subsystem has been initialized!")
// initialize random beacon network subsystem // initialize random beacon network subsystem
randomBeaconNetwork, err := provideBeacon(psb.Pubsub, consensusManager) randomBeaconNetwork, err := provideBeacon(psb.Pubsub)
if err != nil { if err != nil {
logrus.Fatal(err) logrus.Fatal(err)
} }
n.Beacon = randomBeaconNetwork n.Beacon = randomBeaconNetwork
logrus.Info("Random beacon subsystem has been initialized!") logrus.Info("Random beacon subsystem has been initialized!")
// initialize consensus subsystem
consensusManager := provideConsensusManager(bus, psb, miner, bc, ethClient, prvKey, n.Config.ConsensusMinApprovals, bp, randomBeaconNetwork, mp)
n.ConsensusManager = consensusManager
logrus.Info("Consensus subsystem has been initialized!")
// initialize dispute subsystem // initialize dispute subsystem
disputeManager, err := provideDisputeManager(context.TODO(), ethClient, consensusManager, config, bc) disputeManager, err := provideDisputeManager(context.TODO(), ethClient, consensusManager, config, bc)
if err != nil { if err != nil {
@ -344,6 +346,14 @@ func (n *Node) setupRPCClients() error {
} }
func Start() { func Start() {
logrus.SetReportCaller(true)
logrus.SetFormatter(&logrus.TextFormatter{
CallerPrettyfier: func(f *runtime.Frame) (string, string) {
filename := path.Base(f.File)
return "", fmt.Sprintf("%s:%d:", filename, f.Line)
},
})
configPath := flag.String("config", "", "Path to config") configPath := flag.String("config", "", "Path to config")
verbose := flag.Bool("verbose", false, "Verbose logging") verbose := flag.Bool("verbose", false, "Verbose logging")
flag.Parse() flag.Parse()

View File

@ -64,15 +64,13 @@ func provideMiner(peerID peer.ID, ethAddress common.Address, ethClient *ethclien
return consensus.NewMiner(peerID, ethAddress, ethClient, privateKey, mempool) return consensus.NewMiner(peerID, ethAddress, ethClient, privateKey, mempool)
} }
func provideBeacon(ps *pubsub2.PubSub, pcm *consensus.PBFTConsensusManager) (beacon.BeaconNetworks, error) { func provideBeacon(ps *pubsub2.PubSub) (beacon.BeaconNetwork, error) {
networks := beacon.BeaconNetworks{} bc, err := drand2.NewDrandBeacon(ps)
bc, err := drand2.NewDrandBeacon(ps, pcm)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to setup drand beacon: %w", err) return beacon.BeaconNetwork{}, fmt.Errorf("failed to setup drand beacon: %w", err)
} }
networks = append(networks, beacon.BeaconNetwork{Start: config.DrandChainGenesisTime, Beacon: bc})
// NOTE: currently we use only one network // NOTE: currently we use only one network
return networks, nil return beacon.BeaconNetwork{Start: config.DrandChainGenesisTime, Beacon: bc}, nil
} }
// FIXME: do we really need this? // FIXME: do we really need this?
@ -114,6 +112,8 @@ func provideConsensusManager(
privateKey crypto.PrivKey, privateKey crypto.PrivKey,
minApprovals int, minApprovals int,
bp *pool.BlockPool, bp *pool.BlockPool,
b beacon.BeaconNetwork,
mp *pool.Mempool,
) *consensus.PBFTConsensusManager { ) *consensus.PBFTConsensusManager {
return consensus.NewPBFTConsensusManager( return consensus.NewPBFTConsensusManager(
bus, bus,
@ -124,6 +124,8 @@ func provideConsensusManager(
miner, miner,
bc, bc,
bp, bp,
b,
mp,
) )
} }

View File

@ -13,13 +13,8 @@ const (
NewBlockMessageType NewBlockMessageType
) )
type GenericMessage struct {
Type PubSubMessageType
From peer.ID `cbor:"-"`
Payload interface{}
}
type PubSubMessage struct { type PubSubMessage struct {
Type PubSubMessageType Type PubSubMessageType
From peer.ID `cbor:"-"`
Payload []byte Payload []byte
} }

View File

@ -21,10 +21,9 @@ type PubSubRouter struct {
handlers map[PubSubMessageType][]Handler handlers map[PubSubMessageType][]Handler
oracleTopicName string oracleTopicName string
oracleTopic *pubsub.Topic oracleTopic *pubsub.Topic
typeMapping map[PubSubMessageType]interface{} // message type -> sample
} }
type Handler func(message *GenericMessage) type Handler func(message *PubSubMessage)
func NewPubSubRouter(h host.Host, oracleTopic string, isBootstrap bool) *PubSubRouter { func NewPubSubRouter(h host.Host, oracleTopic string, isBootstrap bool) *PubSubRouter {
ctx, ctxCancel := context.WithCancel(context.Background()) ctx, ctxCancel := context.WithCancel(context.Background())
@ -34,7 +33,6 @@ func NewPubSubRouter(h host.Host, oracleTopic string, isBootstrap bool) *PubSubR
context: ctx, context: ctx,
contextCancel: ctxCancel, contextCancel: ctxCancel,
handlers: make(map[PubSubMessageType][]Handler), handlers: make(map[PubSubMessageType][]Handler),
typeMapping: map[PubSubMessageType]interface{}{},
} }
var pbOptions []pubsub.Option var pbOptions []pubsub.Option
@ -104,30 +102,16 @@ func (psr *PubSubRouter) handleMessage(p *pubsub.Message) {
if senderPeerID == psr.node.ID() { if senderPeerID == psr.node.ID() {
return return
} }
var genericMessage PubSubMessage var message PubSubMessage
var message GenericMessage err = cbor.Unmarshal(p.Data, &message)
err = cbor.Unmarshal(p.Data, &genericMessage)
if err != nil {
logrus.Warn("Unable to decode pubsub message data! " + err.Error())
return
}
sampleMsg, ok := psr.typeMapping[genericMessage.Type]
if !ok {
logrus.Warnf("Unknown message type %d: we have no clue how to decode it", genericMessage.Type)
return
}
destMsg := sampleMsg
err = cbor.Unmarshal(genericMessage.Payload, &destMsg)
if err != nil { if err != nil {
logrus.Warn("Unable to decode pubsub message data! " + err.Error()) logrus.Warn("Unable to decode pubsub message data! " + err.Error())
return return
} }
message.From = senderPeerID message.From = senderPeerID
message.Type = genericMessage.Type handlers, ok := psr.handlers[message.Type]
message.Payload = destMsg
handlers, ok := psr.handlers[genericMessage.Type]
if !ok { if !ok {
logrus.Warn("Dropping pubsub message " + string(genericMessage.Type) + " because we don't have any handlers!") logrus.Warnf("Dropping pubsub message with type %d because we don't have any handlers!", message.Type)
return return
} }
for _, v := range handlers { for _, v := range handlers {
@ -135,16 +119,15 @@ func (psr *PubSubRouter) handleMessage(p *pubsub.Message) {
} }
} }
func (psr *PubSubRouter) Hook(messageType PubSubMessageType, handler Handler, sample interface{}) { func (psr *PubSubRouter) Hook(messageType PubSubMessageType, handler Handler) {
_, ok := psr.handlers[messageType] _, ok := psr.handlers[messageType]
if !ok { if !ok {
psr.handlers[messageType] = []Handler{} psr.handlers[messageType] = []Handler{}
} }
psr.handlers[messageType] = append(psr.handlers[messageType], handler) psr.handlers[messageType] = append(psr.handlers[messageType], handler)
psr.typeMapping[messageType] = sample
} }
func (psr *PubSubRouter) BroadcastToServiceTopic(msg *GenericMessage) error { func (psr *PubSubRouter) BroadcastToServiceTopic(msg *PubSubMessage) error {
data, err := cbor.Marshal(msg) data, err := cbor.Marshal(msg)
if err != nil { if err != nil {
return err return err

View File

@ -114,6 +114,6 @@ func (c *LotusClient) HandleRequest(method string, params []interface{}) ([]byte
return nil, err return nil, err
} }
bodyBytes := resp.Body() bodyBytes := resp.Body()
logrus.Debugf("Filecoin RPC reply: %v", string(bodyBytes)) logrus.Tracef("Filecoin RPC reply: %v", string(bodyBytes))
return bodyBytes, nil return bodyBytes, nil
} }

View File

@ -10,6 +10,7 @@ import (
type ElectionProof struct { type ElectionProof struct {
WinCount int64 WinCount int64
VRFProof []byte VRFProof []byte
RandomnessRound uint64
} }
const precision = 256 const precision = 256