diff --git a/blockchain/blockchain.go b/blockchain/blockchain.go index 28b6846..092b4ff 100644 --- a/blockchain/blockchain.go +++ b/blockchain/blockchain.go @@ -4,6 +4,7 @@ import ( "encoding/binary" "encoding/hex" "errors" + "os" "github.com/Secured-Finance/dione/blockchain/utils" @@ -41,12 +42,21 @@ func NewBlockChain(path string) (*BlockChain, error) { return nil, err } + err = env.SetMaxDBs(1) + if err != nil { + return nil, err + } err = env.SetMapSize(100 * 1024 * 1024 * 1024) // 100 GB if err != nil { return nil, err } - err = env.Open(path, 0, 0664) + err = os.MkdirAll(path, 0755) + if err != nil { + return nil, err + } + + err = env.Open(path, 0, 0755) if err != nil { return nil, err } @@ -111,7 +121,7 @@ func (bp *BlockChain) StoreBlock(block *types2.Block) error { } // update index "height -> block hash" - var heightBytes []byte + heightBytes := make([]byte, 8) binary.LittleEndian.PutUint64(heightBytes, block.Header.Height) err = bp.heightIndex.PutBytes(heightBytes, block.Header.Hash) if err != nil { diff --git a/blockchain/sync/sync_mgr.go b/blockchain/sync/sync_mgr.go index 08fca6e..88d07a5 100644 --- a/blockchain/sync/sync_mgr.go +++ b/blockchain/sync/sync_mgr.go @@ -99,14 +99,15 @@ func (sm *syncManager) doInitialBlockPoolSync() error { } } + if sm.bootstrapPeer == "" { + return nil // FIXME + } + var reply wire.LastBlockHeightReply err = sm.rpcClient.Call(sm.bootstrapPeer, "NetworkService", "LastBlockHeight", nil, &reply) if err != nil { return err } - if reply.Error != nil { - return reply.Error - } if reply.Height > ourLastHeight { heightCount := reply.Height - ourLastHeight @@ -150,6 +151,10 @@ func (sm *syncManager) doInitialBlockPoolSync() error { } func (sm *syncManager) doInitialMempoolSync() error { + if sm.bootstrapPeer == "" { + return nil // FIXME + } + var reply wire.InvMessage err := sm.rpcClient.Call(sm.bootstrapPeer, "NetworkService", "Mempool", nil, &reply) if err != nil { @@ -187,9 +192,6 @@ func (sm *syncManager) doInitialMempoolSync() error { if err != nil { return err } - if getMempoolTxReply.Error != nil { - return getMempoolTxReply.Error - } for _, v := range getMempoolTxReply.Transactions { err := sm.mempool.StoreTx(&v) if err != nil { diff --git a/blockchain/utils/index.go b/blockchain/utils/index.go index ff3b074..24a7896 100644 --- a/blockchain/utils/index.go +++ b/blockchain/utils/index.go @@ -32,7 +32,7 @@ func NewIndex(name string, dbEnv *lmdb.Env, db lmdb.DBI) *Index { func (i *Index) PutUint64(key []byte, value uint64) error { return i.dbEnv.Update(func(txn *lmdb.Txn) error { - var data []byte + data := make([]byte, 8) binary.LittleEndian.PutUint64(data, value) return txn.Put(i.db, i.constructIndexKey(key), data, 0) }) diff --git a/consensus/consensus.go b/consensus/consensus.go index 240beca..e7d6af8 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -32,6 +32,10 @@ import ( "github.com/Secured-Finance/dione/pubsub" ) +var ( + ErrNoAcceptedBlocks = errors.New("there is no accepted blocks") +) + type StateStatus uint8 const ( @@ -52,7 +56,7 @@ type PBFTConsensusManager struct { validator *ConsensusValidator ethereumClient *ethclient.EthereumClient miner *Miner - blockPool pool.BlockPool + blockPool *pool.BlockPool blockchain blockchain.BlockChain state *State } @@ -66,7 +70,16 @@ type State struct { ready chan bool } -func NewPBFTConsensusManager(bus EventBus.Bus, psb *pubsub.PubSubRouter, minApprovals int, privKey crypto.PrivKey, ethereumClient *ethclient.EthereumClient, miner *Miner, bc *blockchain.BlockChain) *PBFTConsensusManager { +func NewPBFTConsensusManager( + bus EventBus.Bus, + psb *pubsub.PubSubRouter, + minApprovals int, + privKey crypto.PrivKey, + ethereumClient *ethclient.EthereumClient, + miner *Miner, + bc *blockchain.BlockChain, + bp *pool.BlockPool, +) *PBFTConsensusManager { pcm := &PBFTConsensusManager{} pcm.psb = psb pcm.miner = miner @@ -80,6 +93,7 @@ func NewPBFTConsensusManager(bus EventBus.Bus, psb *pubsub.PubSubRouter, minAppr status: StateStatusUnknown, } pcm.bus = bus + pcm.blockPool = bp pcm.psb.Hook(pubsub.PrePrepareMessageType, pcm.handlePrePrepare, types.PrePrepareMessage{}) pcm.psb.Hook(pubsub.PrepareMessageType, pcm.handlePrepare, types.PrepareMessage{}) pcm.psb.Hook(pubsub.CommitMessageType, pcm.handleCommit, types.CommitMessage{}) @@ -237,7 +251,11 @@ func (pcm *PBFTConsensusManager) NewDrandRound(from phony.Actor, res client.Resu defer pcm.state.mutex.Unlock() block, err := pcm.commitAcceptedBlocks() if err != nil { - logrus.Errorf("Failed to select the block in consensus round %d: %s", pcm.state.blockHeight, err.Error()) + if errors.Is(err, ErrNoAcceptedBlocks) { + logrus.Warnf("No accepted blocks for consensus round %d", pcm.state.blockHeight) + } else { + logrus.Errorf("Failed to select the block in consensus round %d: %s", pcm.state.blockHeight, err.Error()) + } return } @@ -245,7 +263,7 @@ func (pcm *PBFTConsensusManager) NewDrandRound(from phony.Actor, res client.Resu // then post dione tasks to target chains (currently, only Ethereum) if block.Header.Proposer == pcm.miner.address { for _, v := range block.Data { - var task *types2.DioneTask + var task types2.DioneTask err := cbor.Unmarshal(v.Data, &task) if err != nil { logrus.Errorf("Failed to unmarshal transaction %x payload: %s", v.Hash, err.Error()) @@ -291,7 +309,7 @@ func (pcm *PBFTConsensusManager) NewDrandRound(from phony.Actor, res client.Resu func (pcm *PBFTConsensusManager) commitAcceptedBlocks() (*types3.Block, error) { blocks := pcm.blockPool.GetAllAcceptedBlocks() if blocks == nil { - return nil, errors.New("there is no accepted blocks") + return nil, ErrNoAcceptedBlocks } var maxStake *big.Int var selectedBlock *types3.Block diff --git a/node/network_service.go b/node/network_service.go index 5b44b9a..ff3fc04 100644 --- a/node/network_service.go +++ b/node/network_service.go @@ -18,39 +18,36 @@ import ( ) type NetworkService struct { - blockpool *blockchain.BlockChain - mempool *pool.Mempool - rpcClient *gorpc.Client + blockchain *blockchain.BlockChain + mempool *pool.Mempool + rpcClient *gorpc.Client } -func NewNetworkService(bp *blockchain.BlockChain) *NetworkService { +func NewNetworkService(bc *blockchain.BlockChain, mp *pool.Mempool) *NetworkService { return &NetworkService{ - blockpool: bp, + blockchain: bc, + mempool: mp, } } -func (s *NetworkService) LastBlockHeight(ctx context.Context, arg struct{}, reply *wire.LastBlockHeightReply) { - height, err := s.blockpool.GetLatestBlockHeight() +func (s *NetworkService) LastBlockHeight(ctx context.Context, arg struct{}, reply *wire.LastBlockHeightReply) error { + height, err := s.blockchain.GetLatestBlockHeight() if err != nil { - reply.Error = err - return + return err } reply.Height = height + return nil } -func (s *NetworkService) GetRangeOfBlocks(ctx context.Context, arg wire.GetRangeOfBlocksArg, reply *wire.GetRangeOfBlocksReply) { +func (s *NetworkService) GetRangeOfBlocks(ctx context.Context, arg wire.GetRangeOfBlocksArg, reply *wire.GetRangeOfBlocksReply) error { if arg.From > arg.To { - errText := "incorrect arguments: from > to" - reply.Error = &errText - return + return fmt.Errorf("incorrect arguments: from > to") } if arg.To-arg.From > policy.MaxBlockCountForRetrieving { - errText := "incorrect arguments: count of block for retrieving is exceeded the limit" - reply.Error = &errText - return + return fmt.Errorf("incorrect arguments: count of block for retrieving is exceeded the limit") } for i := arg.From; i <= arg.To; i++ { - block, err := s.blockpool.FetchBlockByHeight(i) + block, err := s.blockchain.FetchBlockByHeight(i) if err != nil { logrus.Warnf("failed to retrieve block from blockpool with height %d", i) reply.FailedBlockHeights = append(reply.FailedBlockHeights, i) @@ -58,9 +55,10 @@ func (s *NetworkService) GetRangeOfBlocks(ctx context.Context, arg wire.GetRange } reply.Blocks = append(reply.Blocks, *block) } + return nil } -func (s *NetworkService) Mempool(ctx context.Context, arg struct{}, reply *wire.InvMessage) { +func (s *NetworkService) Mempool(ctx context.Context, arg struct{}, reply *wire.InvMessage) error { txs := s.mempool.GetAllTransactions() // extract hashes of txs @@ -70,14 +68,15 @@ func (s *NetworkService) Mempool(ctx context.Context, arg struct{}, reply *wire. Hash: v.Hash, }) } + + return nil } -func (s *NetworkService) GetMempoolTxs(ctx context.Context, arg wire.GetMempoolTxsArg, reply *wire.GetMempoolTxsReply) { +func (s *NetworkService) GetMempoolTxs(ctx context.Context, arg wire.GetMempoolTxsArg, reply *wire.GetMempoolTxsReply) error { if len(arg.Items) > policy.MaxTransactionCountForRetrieving { pid, _ := gorpc.GetRequestSender(ctx) logrus.Warnf("Max tx count limit exceeded for GetMempoolTxs request of node %s", pid) - reply.Error = fmt.Errorf("max tx count limit exceeded") - return + return fmt.Errorf("max tx count limit exceeded") } for _, v := range arg.Items { @@ -86,10 +85,11 @@ func (s *NetworkService) GetMempoolTxs(ctx context.Context, arg wire.GetMempoolT if errors.Is(err, pool.ErrTxNotFound) { reply.NotFoundTxs = append(reply.NotFoundTxs, v) } else { - reply.Error = err - return + return err } } reply.Transactions = append(reply.Transactions, *tx) } + + return nil } diff --git a/node/node.go b/node/node.go index 400e39d..932b981 100644 --- a/node/node.go +++ b/node/node.go @@ -9,6 +9,8 @@ import ( "os" "time" + "github.com/Secured-Finance/dione/blockchain" + "github.com/multiformats/go-multiaddr" "github.com/asaskevich/EventBus" @@ -17,8 +19,6 @@ import ( "github.com/Secured-Finance/dione/types" - "github.com/Secured-Finance/dione/blockchain" - types2 "github.com/Secured-Finance/dione/blockchain/types" gorpc "github.com/libp2p/go-libp2p-gorpc" @@ -66,8 +66,9 @@ type Node struct { Miner *consensus.Miner Beacon beacon.BeaconNetworks DisputeManager *consensus.DisputeManager - BlockPool *blockchain.BlockChain + BlockPool *pool.BlockPool MemPool *pool.Mempool + BlockChain *blockchain.BlockChain SyncManager sync.SyncManager NetworkService *NetworkService NetworkRPCHost *gorpc.Server @@ -138,7 +139,7 @@ func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTim if err != nil { logrus.Fatalf("Failed to initialize blockpool: %s", err.Error()) } - n.BlockPool = bc + n.BlockChain = bc logrus.Info("Block pool database has been successfully initialized!") // initialize mempool @@ -149,7 +150,14 @@ func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTim n.MemPool = mp logrus.Info("Mempool has been successfully initialized!") - ns := provideNetworkService(bc) + bp, err := provideBlockPool(mp) + if err != nil { + logrus.Fatalf("Failed to initialize blockpool: %s", err.Error()) + } + n.BlockPool = bp + logrus.Info("Blockpool has been successfully initialized!") + + ns := provideNetworkService(bc, mp) n.NetworkService = ns rpcHost := provideNetworkRPCHost(lhost) err = rpcHost.Register(ns) @@ -182,7 +190,7 @@ func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTim logrus.Info("Mining subsystem has been initialized!") // initialize consensus subsystem - consensusManager := provideConsensusManager(bus, psb, miner, bc, ethClient, prvKey, n.Config.ConsensusMinApprovals) + consensusManager := provideConsensusManager(bus, psb, miner, bc, ethClient, prvKey, n.Config.ConsensusMinApprovals, bp) n.ConsensusManager = consensusManager logrus.Info("Consensus subsystem has been initialized!") diff --git a/node/node_dep_providers.go b/node/node_dep_providers.go index 6ddc7e1..8f43b78 100644 --- a/node/node_dep_providers.go +++ b/node/node_dep_providers.go @@ -105,8 +105,26 @@ func providePubsubRouter(lhost host.Host, config *config.Config) *pubsub.PubSubR return pubsub.NewPubSubRouter(lhost, config.PubSub.ServiceTopicName, config.IsBootstrap) } -func provideConsensusManager(bus EventBus.Bus, psb *pubsub.PubSubRouter, miner *consensus.Miner, bc *blockchain.BlockChain, ethClient *ethclient.EthereumClient, privateKey crypto.PrivKey, minApprovals int) *consensus.PBFTConsensusManager { - return consensus.NewPBFTConsensusManager(bus, psb, minApprovals, privateKey, ethClient, miner, bc) +func provideConsensusManager( + bus EventBus.Bus, + psb *pubsub.PubSubRouter, + miner *consensus.Miner, + bc *blockchain.BlockChain, + ethClient *ethclient.EthereumClient, + privateKey crypto.PrivKey, + minApprovals int, + bp *pool.BlockPool, +) *consensus.PBFTConsensusManager { + return consensus.NewPBFTConsensusManager( + bus, + psb, + minApprovals, + privateKey, + ethClient, + miner, + bc, + bp, + ) } func provideLibp2pHost(config *config.Config, privateKey crypto.PrivKey) (host.Host, error) { @@ -181,6 +199,10 @@ func provideP2PRPCClient(h host.Host) *gorpc.Client { return gorpc.NewClient(h, DioneProtocolID) } -func provideNetworkService(bp *blockchain.BlockChain) *NetworkService { - return NewNetworkService(bp) +func provideNetworkService(bp *blockchain.BlockChain, mp *pool.Mempool) *NetworkService { + return NewNetworkService(bp, mp) +} + +func provideBlockPool(mp *pool.Mempool) (*pool.BlockPool, error) { + return pool.NewBlockPool(mp) } diff --git a/node/wire/get_mempool_tx.go b/node/wire/get_mempool_tx.go index da364e5..b9881fb 100644 --- a/node/wire/get_mempool_tx.go +++ b/node/wire/get_mempool_tx.go @@ -9,5 +9,4 @@ type GetMempoolTxsArg struct { type GetMempoolTxsReply struct { Transactions []types.Transaction NotFoundTxs [][]byte - Error error } diff --git a/node/wire/get_range_of_blocks.go b/node/wire/get_range_of_blocks.go index 5ecc770..2c8321d 100644 --- a/node/wire/get_range_of_blocks.go +++ b/node/wire/get_range_of_blocks.go @@ -10,5 +10,4 @@ type GetRangeOfBlocksArg struct { type GetRangeOfBlocksReply struct { Blocks []types.Block FailedBlockHeights []uint64 // list of block heights the node was unable to retrieve - Error *string } diff --git a/node/wire/last_block_height.go b/node/wire/last_block_height.go index c7ceaac..eb3f2e3 100644 --- a/node/wire/last_block_height.go +++ b/node/wire/last_block_height.go @@ -2,5 +2,4 @@ package wire type LastBlockHeightReply struct { Height uint64 - Error error } diff --git a/pubsub/pubsub_router.go b/pubsub/pubsub_router.go index e76cae8..1c7e97a 100644 --- a/pubsub/pubsub_router.go +++ b/pubsub/pubsub_router.go @@ -34,6 +34,7 @@ func NewPubSubRouter(h host.Host, oracleTopic string, isBootstrap bool) *PubSubR context: ctx, contextCancel: ctxCancel, handlers: make(map[PubSubMessageType][]Handler), + typeMapping: map[PubSubMessageType]interface{}{}, } var pbOptions []pubsub.Option