Fix small bugs/mistakes/typos

This commit is contained in:
ChronosX88 2021-07-11 03:32:58 +03:00
parent 5aadbab600
commit 2845a04704
Signed by: ChronosXYZ
GPG Key ID: 085A69A82C8C511A
11 changed files with 108 additions and 50 deletions

View File

@ -4,6 +4,7 @@ import (
"encoding/binary"
"encoding/hex"
"errors"
"os"
"github.com/Secured-Finance/dione/blockchain/utils"
@ -41,12 +42,21 @@ func NewBlockChain(path string) (*BlockChain, error) {
return nil, err
}
err = env.SetMaxDBs(1)
if err != nil {
return nil, err
}
err = env.SetMapSize(100 * 1024 * 1024 * 1024) // 100 GB
if err != nil {
return nil, err
}
err = env.Open(path, 0, 0664)
err = os.MkdirAll(path, 0755)
if err != nil {
return nil, err
}
err = env.Open(path, 0, 0755)
if err != nil {
return nil, err
}
@ -111,7 +121,7 @@ func (bp *BlockChain) StoreBlock(block *types2.Block) error {
}
// update index "height -> block hash"
var heightBytes []byte
heightBytes := make([]byte, 8)
binary.LittleEndian.PutUint64(heightBytes, block.Header.Height)
err = bp.heightIndex.PutBytes(heightBytes, block.Header.Hash)
if err != nil {

View File

@ -99,14 +99,15 @@ func (sm *syncManager) doInitialBlockPoolSync() error {
}
}
if sm.bootstrapPeer == "" {
return nil // FIXME
}
var reply wire.LastBlockHeightReply
err = sm.rpcClient.Call(sm.bootstrapPeer, "NetworkService", "LastBlockHeight", nil, &reply)
if err != nil {
return err
}
if reply.Error != nil {
return reply.Error
}
if reply.Height > ourLastHeight {
heightCount := reply.Height - ourLastHeight
@ -150,6 +151,10 @@ func (sm *syncManager) doInitialBlockPoolSync() error {
}
func (sm *syncManager) doInitialMempoolSync() error {
if sm.bootstrapPeer == "" {
return nil // FIXME
}
var reply wire.InvMessage
err := sm.rpcClient.Call(sm.bootstrapPeer, "NetworkService", "Mempool", nil, &reply)
if err != nil {
@ -187,9 +192,6 @@ func (sm *syncManager) doInitialMempoolSync() error {
if err != nil {
return err
}
if getMempoolTxReply.Error != nil {
return getMempoolTxReply.Error
}
for _, v := range getMempoolTxReply.Transactions {
err := sm.mempool.StoreTx(&v)
if err != nil {

View File

@ -32,7 +32,7 @@ func NewIndex(name string, dbEnv *lmdb.Env, db lmdb.DBI) *Index {
func (i *Index) PutUint64(key []byte, value uint64) error {
return i.dbEnv.Update(func(txn *lmdb.Txn) error {
var data []byte
data := make([]byte, 8)
binary.LittleEndian.PutUint64(data, value)
return txn.Put(i.db, i.constructIndexKey(key), data, 0)
})

View File

@ -32,6 +32,10 @@ import (
"github.com/Secured-Finance/dione/pubsub"
)
var (
ErrNoAcceptedBlocks = errors.New("there is no accepted blocks")
)
type StateStatus uint8
const (
@ -52,7 +56,7 @@ type PBFTConsensusManager struct {
validator *ConsensusValidator
ethereumClient *ethclient.EthereumClient
miner *Miner
blockPool pool.BlockPool
blockPool *pool.BlockPool
blockchain blockchain.BlockChain
state *State
}
@ -66,7 +70,16 @@ type State struct {
ready chan bool
}
func NewPBFTConsensusManager(bus EventBus.Bus, psb *pubsub.PubSubRouter, minApprovals int, privKey crypto.PrivKey, ethereumClient *ethclient.EthereumClient, miner *Miner, bc *blockchain.BlockChain) *PBFTConsensusManager {
func NewPBFTConsensusManager(
bus EventBus.Bus,
psb *pubsub.PubSubRouter,
minApprovals int,
privKey crypto.PrivKey,
ethereumClient *ethclient.EthereumClient,
miner *Miner,
bc *blockchain.BlockChain,
bp *pool.BlockPool,
) *PBFTConsensusManager {
pcm := &PBFTConsensusManager{}
pcm.psb = psb
pcm.miner = miner
@ -80,6 +93,7 @@ func NewPBFTConsensusManager(bus EventBus.Bus, psb *pubsub.PubSubRouter, minAppr
status: StateStatusUnknown,
}
pcm.bus = bus
pcm.blockPool = bp
pcm.psb.Hook(pubsub.PrePrepareMessageType, pcm.handlePrePrepare, types.PrePrepareMessage{})
pcm.psb.Hook(pubsub.PrepareMessageType, pcm.handlePrepare, types.PrepareMessage{})
pcm.psb.Hook(pubsub.CommitMessageType, pcm.handleCommit, types.CommitMessage{})
@ -237,7 +251,11 @@ func (pcm *PBFTConsensusManager) NewDrandRound(from phony.Actor, res client.Resu
defer pcm.state.mutex.Unlock()
block, err := pcm.commitAcceptedBlocks()
if err != nil {
logrus.Errorf("Failed to select the block in consensus round %d: %s", pcm.state.blockHeight, err.Error())
if errors.Is(err, ErrNoAcceptedBlocks) {
logrus.Warnf("No accepted blocks for consensus round %d", pcm.state.blockHeight)
} else {
logrus.Errorf("Failed to select the block in consensus round %d: %s", pcm.state.blockHeight, err.Error())
}
return
}
@ -245,7 +263,7 @@ func (pcm *PBFTConsensusManager) NewDrandRound(from phony.Actor, res client.Resu
// then post dione tasks to target chains (currently, only Ethereum)
if block.Header.Proposer == pcm.miner.address {
for _, v := range block.Data {
var task *types2.DioneTask
var task types2.DioneTask
err := cbor.Unmarshal(v.Data, &task)
if err != nil {
logrus.Errorf("Failed to unmarshal transaction %x payload: %s", v.Hash, err.Error())
@ -291,7 +309,7 @@ func (pcm *PBFTConsensusManager) NewDrandRound(from phony.Actor, res client.Resu
func (pcm *PBFTConsensusManager) commitAcceptedBlocks() (*types3.Block, error) {
blocks := pcm.blockPool.GetAllAcceptedBlocks()
if blocks == nil {
return nil, errors.New("there is no accepted blocks")
return nil, ErrNoAcceptedBlocks
}
var maxStake *big.Int
var selectedBlock *types3.Block

View File

@ -18,39 +18,36 @@ import (
)
type NetworkService struct {
blockpool *blockchain.BlockChain
mempool *pool.Mempool
rpcClient *gorpc.Client
blockchain *blockchain.BlockChain
mempool *pool.Mempool
rpcClient *gorpc.Client
}
func NewNetworkService(bp *blockchain.BlockChain) *NetworkService {
func NewNetworkService(bc *blockchain.BlockChain, mp *pool.Mempool) *NetworkService {
return &NetworkService{
blockpool: bp,
blockchain: bc,
mempool: mp,
}
}
func (s *NetworkService) LastBlockHeight(ctx context.Context, arg struct{}, reply *wire.LastBlockHeightReply) {
height, err := s.blockpool.GetLatestBlockHeight()
func (s *NetworkService) LastBlockHeight(ctx context.Context, arg struct{}, reply *wire.LastBlockHeightReply) error {
height, err := s.blockchain.GetLatestBlockHeight()
if err != nil {
reply.Error = err
return
return err
}
reply.Height = height
return nil
}
func (s *NetworkService) GetRangeOfBlocks(ctx context.Context, arg wire.GetRangeOfBlocksArg, reply *wire.GetRangeOfBlocksReply) {
func (s *NetworkService) GetRangeOfBlocks(ctx context.Context, arg wire.GetRangeOfBlocksArg, reply *wire.GetRangeOfBlocksReply) error {
if arg.From > arg.To {
errText := "incorrect arguments: from > to"
reply.Error = &errText
return
return fmt.Errorf("incorrect arguments: from > to")
}
if arg.To-arg.From > policy.MaxBlockCountForRetrieving {
errText := "incorrect arguments: count of block for retrieving is exceeded the limit"
reply.Error = &errText
return
return fmt.Errorf("incorrect arguments: count of block for retrieving is exceeded the limit")
}
for i := arg.From; i <= arg.To; i++ {
block, err := s.blockpool.FetchBlockByHeight(i)
block, err := s.blockchain.FetchBlockByHeight(i)
if err != nil {
logrus.Warnf("failed to retrieve block from blockpool with height %d", i)
reply.FailedBlockHeights = append(reply.FailedBlockHeights, i)
@ -58,9 +55,10 @@ func (s *NetworkService) GetRangeOfBlocks(ctx context.Context, arg wire.GetRange
}
reply.Blocks = append(reply.Blocks, *block)
}
return nil
}
func (s *NetworkService) Mempool(ctx context.Context, arg struct{}, reply *wire.InvMessage) {
func (s *NetworkService) Mempool(ctx context.Context, arg struct{}, reply *wire.InvMessage) error {
txs := s.mempool.GetAllTransactions()
// extract hashes of txs
@ -70,14 +68,15 @@ func (s *NetworkService) Mempool(ctx context.Context, arg struct{}, reply *wire.
Hash: v.Hash,
})
}
return nil
}
func (s *NetworkService) GetMempoolTxs(ctx context.Context, arg wire.GetMempoolTxsArg, reply *wire.GetMempoolTxsReply) {
func (s *NetworkService) GetMempoolTxs(ctx context.Context, arg wire.GetMempoolTxsArg, reply *wire.GetMempoolTxsReply) error {
if len(arg.Items) > policy.MaxTransactionCountForRetrieving {
pid, _ := gorpc.GetRequestSender(ctx)
logrus.Warnf("Max tx count limit exceeded for GetMempoolTxs request of node %s", pid)
reply.Error = fmt.Errorf("max tx count limit exceeded")
return
return fmt.Errorf("max tx count limit exceeded")
}
for _, v := range arg.Items {
@ -86,10 +85,11 @@ func (s *NetworkService) GetMempoolTxs(ctx context.Context, arg wire.GetMempoolT
if errors.Is(err, pool.ErrTxNotFound) {
reply.NotFoundTxs = append(reply.NotFoundTxs, v)
} else {
reply.Error = err
return
return err
}
}
reply.Transactions = append(reply.Transactions, *tx)
}
return nil
}

View File

@ -9,6 +9,8 @@ import (
"os"
"time"
"github.com/Secured-Finance/dione/blockchain"
"github.com/multiformats/go-multiaddr"
"github.com/asaskevich/EventBus"
@ -17,8 +19,6 @@ import (
"github.com/Secured-Finance/dione/types"
"github.com/Secured-Finance/dione/blockchain"
types2 "github.com/Secured-Finance/dione/blockchain/types"
gorpc "github.com/libp2p/go-libp2p-gorpc"
@ -66,8 +66,9 @@ type Node struct {
Miner *consensus.Miner
Beacon beacon.BeaconNetworks
DisputeManager *consensus.DisputeManager
BlockPool *blockchain.BlockChain
BlockPool *pool.BlockPool
MemPool *pool.Mempool
BlockChain *blockchain.BlockChain
SyncManager sync.SyncManager
NetworkService *NetworkService
NetworkRPCHost *gorpc.Server
@ -138,7 +139,7 @@ func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTim
if err != nil {
logrus.Fatalf("Failed to initialize blockpool: %s", err.Error())
}
n.BlockPool = bc
n.BlockChain = bc
logrus.Info("Block pool database has been successfully initialized!")
// initialize mempool
@ -149,7 +150,14 @@ func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTim
n.MemPool = mp
logrus.Info("Mempool has been successfully initialized!")
ns := provideNetworkService(bc)
bp, err := provideBlockPool(mp)
if err != nil {
logrus.Fatalf("Failed to initialize blockpool: %s", err.Error())
}
n.BlockPool = bp
logrus.Info("Blockpool has been successfully initialized!")
ns := provideNetworkService(bc, mp)
n.NetworkService = ns
rpcHost := provideNetworkRPCHost(lhost)
err = rpcHost.Register(ns)
@ -182,7 +190,7 @@ func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTim
logrus.Info("Mining subsystem has been initialized!")
// initialize consensus subsystem
consensusManager := provideConsensusManager(bus, psb, miner, bc, ethClient, prvKey, n.Config.ConsensusMinApprovals)
consensusManager := provideConsensusManager(bus, psb, miner, bc, ethClient, prvKey, n.Config.ConsensusMinApprovals, bp)
n.ConsensusManager = consensusManager
logrus.Info("Consensus subsystem has been initialized!")

View File

@ -105,8 +105,26 @@ func providePubsubRouter(lhost host.Host, config *config.Config) *pubsub.PubSubR
return pubsub.NewPubSubRouter(lhost, config.PubSub.ServiceTopicName, config.IsBootstrap)
}
func provideConsensusManager(bus EventBus.Bus, psb *pubsub.PubSubRouter, miner *consensus.Miner, bc *blockchain.BlockChain, ethClient *ethclient.EthereumClient, privateKey crypto.PrivKey, minApprovals int) *consensus.PBFTConsensusManager {
return consensus.NewPBFTConsensusManager(bus, psb, minApprovals, privateKey, ethClient, miner, bc)
func provideConsensusManager(
bus EventBus.Bus,
psb *pubsub.PubSubRouter,
miner *consensus.Miner,
bc *blockchain.BlockChain,
ethClient *ethclient.EthereumClient,
privateKey crypto.PrivKey,
minApprovals int,
bp *pool.BlockPool,
) *consensus.PBFTConsensusManager {
return consensus.NewPBFTConsensusManager(
bus,
psb,
minApprovals,
privateKey,
ethClient,
miner,
bc,
bp,
)
}
func provideLibp2pHost(config *config.Config, privateKey crypto.PrivKey) (host.Host, error) {
@ -181,6 +199,10 @@ func provideP2PRPCClient(h host.Host) *gorpc.Client {
return gorpc.NewClient(h, DioneProtocolID)
}
func provideNetworkService(bp *blockchain.BlockChain) *NetworkService {
return NewNetworkService(bp)
func provideNetworkService(bp *blockchain.BlockChain, mp *pool.Mempool) *NetworkService {
return NewNetworkService(bp, mp)
}
func provideBlockPool(mp *pool.Mempool) (*pool.BlockPool, error) {
return pool.NewBlockPool(mp)
}

View File

@ -9,5 +9,4 @@ type GetMempoolTxsArg struct {
type GetMempoolTxsReply struct {
Transactions []types.Transaction
NotFoundTxs [][]byte
Error error
}

View File

@ -10,5 +10,4 @@ type GetRangeOfBlocksArg struct {
type GetRangeOfBlocksReply struct {
Blocks []types.Block
FailedBlockHeights []uint64 // list of block heights the node was unable to retrieve
Error *string
}

View File

@ -2,5 +2,4 @@ package wire
type LastBlockHeightReply struct {
Height uint64
Error error
}

View File

@ -34,6 +34,7 @@ func NewPubSubRouter(h host.Host, oracleTopic string, isBootstrap bool) *PubSubR
context: ctx,
contextCancel: ctxCancel,
handlers: make(map[PubSubMessageType][]Handler),
typeMapping: map[PubSubMessageType]interface{}{},
}
var pbOptions []pubsub.Option