From b9047797cc813a22775fbe321d4231206e847d07 Mon Sep 17 00:00:00 2001 From: ChronosX88 Date: Tue, 24 Aug 2021 22:41:40 +0300 Subject: [PATCH] Rename some consensus subsystem components, clean up init code --- beacon/drand/drand.go | 19 +-- blockchain/sync/sync_mgr.go | 24 ++- .../{consensus.go => consensus_handler.go} | 48 +++--- ...sus_round_pool.go => consensus_manager.go} | 19 +-- consensus/dispute_manager.go | 1 - consensus/fx_module.go | 11 ++ node/network_service.go | 6 +- node/node.go | 22 +-- node/node_dep_providers.go | 143 ++++-------------- pubsub/pubsub_router.go | 46 +++--- 10 files changed, 138 insertions(+), 201 deletions(-) rename consensus/{consensus.go => consensus_handler.go} (90%) rename consensus/{consensus_round_pool.go => consensus_manager.go} (83%) create mode 100644 consensus/fx_module.go diff --git a/beacon/drand/drand.go b/beacon/drand/drand.go index 00e1ef6..dc6452a 100644 --- a/beacon/drand/drand.go +++ b/beacon/drand/drand.go @@ -6,9 +6,8 @@ import ( "fmt" "sync" - "github.com/asaskevich/EventBus" - "github.com/Secured-Finance/dione/beacon" + "github.com/asaskevich/EventBus" "github.com/drand/drand/chain" "github.com/drand/drand/client" httpClient "github.com/drand/drand/client/http" @@ -27,10 +26,6 @@ import ( types "github.com/Secured-Finance/dione/types" ) -var log = logrus.WithFields(logrus.Fields{ - "subsystem": "drand", -}) - type DrandBeacon struct { DrandClient client.Client PublicKey kyber.Point @@ -70,13 +65,11 @@ func NewDrandBeacon(ps *pubsub.PubSub, bus EventBus.Bus) (*DrandBeacon, error) { if ps != nil { opts = append(opts, libp2pClient.WithPubsub(ps)) - } else { - log.Info("Initiated drand with PubSub") } drandClient, err := client.Wrap(clients, opts...) if err != nil { - return nil, fmt.Errorf("Couldn't create Drand clients") + logrus.Fatal(fmt.Errorf("cannot create drand client: %w", err)) } db := &DrandBeacon{ @@ -86,6 +79,8 @@ func NewDrandBeacon(ps *pubsub.PubSub, bus EventBus.Bus) (*DrandBeacon, error) { PublicKey: drandChain.PublicKey, } + logrus.Info("DRAND beacon subsystem has been initialized!") + return db, nil } @@ -103,7 +98,7 @@ func (db *DrandBeacon) Run(ctx context.Context) error { func (db *DrandBeacon) getLatestDrandResult() error { latestDround, err := db.DrandClient.Get(context.TODO(), 0) if err != nil { - log.Errorf("failed to get latest drand round: %v", err) + logrus.Errorf("failed to get latest drand round: %v", err) return err } db.cacheValue(newBeaconEntryFromDrandResult(latestDround)) @@ -138,12 +133,12 @@ func (db *DrandBeacon) Entry(ctx context.Context, round uint64) (types.BeaconEnt } start := lib.Clock.Now() - log.Infof("start fetching randomness: round %v", round) + logrus.Infof("start fetching randomness: round %v", round) resp, err := db.DrandClient.Get(ctx, round) if err != nil { return types.BeaconEntry{}, fmt.Errorf("drand failed Get request: %w", err) } - log.Infof("done fetching randomness: round %v, took %v", round, lib.Clock.Since(start)) + logrus.Infof("done fetching randomness: round %v, took %v", round, lib.Clock.Since(start)) return newBeaconEntryFromDrandResult(resp), nil } func (db *DrandBeacon) cacheValue(res types.BeaconEntry) { diff --git a/blockchain/sync/sync_mgr.go b/blockchain/sync/sync_mgr.go index ce9f51e..d07a231 100644 --- a/blockchain/sync/sync_mgr.go +++ b/blockchain/sync/sync_mgr.go @@ -9,6 +9,8 @@ import ( "strings" "sync" + "github.com/multiformats/go-multiaddr" + "github.com/fxamacker/cbor/v2" "github.com/asaskevich/EventBus" @@ -54,8 +56,26 @@ type syncManager struct { bus EventBus.Bus } -func NewSyncManager(bus EventBus.Bus, bc *blockchain.BlockChain, mp *pool.Mempool, p2pRPCClient *gorpc.Client, bootstrapPeer peer.ID, psb *pubsub.PubSubRouter) SyncManager { +func NewSyncManager( + bus EventBus.Bus, + bc *blockchain.BlockChain, + mp *pool.Mempool, + p2pRPCClient *gorpc.Client, + bootstrapAddresses []multiaddr.Multiaddr, + psb *pubsub.PubSubRouter, +) SyncManager { ctx, cancelFunc := context.WithCancel(context.Background()) + + bootstrapPeer := peer.ID("") + + if bootstrapAddresses != nil { + addr, err := peer.AddrInfoFromP2pAddr(bootstrapAddresses[0]) // FIXME + if err != nil { + logrus.Fatal(err) + } + bootstrapPeer = addr.ID + } + sm := &syncManager{ bus: bus, blockpool: bc, @@ -68,6 +88,8 @@ func NewSyncManager(bus EventBus.Bus, bc *blockchain.BlockChain, mp *pool.Mempoo psb: psb, } + logrus.Info("Blockchain sync subsystem has been successfully initialized!") + return sm } diff --git a/consensus/consensus.go b/consensus/consensus_handler.go similarity index 90% rename from consensus/consensus.go rename to consensus/consensus_handler.go index 1490604..66d975e 100644 --- a/consensus/consensus.go +++ b/consensus/consensus_handler.go @@ -8,6 +8,8 @@ import ( "sort" "time" + "github.com/libp2p/go-libp2p-core/host" + drand2 "github.com/Secured-Finance/dione/beacon/drand" "github.com/libp2p/go-libp2p-core/peer" @@ -37,43 +39,43 @@ var ( ErrNoAcceptedBlocks = errors.New("there is no accepted blocks") ) -type PBFTConsensusManager struct { +type ConsensusHandler struct { bus EventBus.Bus psb *pubsub.PubSubRouter privKey crypto.PrivKey validator *ConsensusValidator ethereumClient *ethclient.EthereumClient miner *blockchain.Miner - consensusRoundPool *ConsensusStatePool + consensus *ConsensusManager mempool *pool.Mempool blockchain *blockchain.BlockChain address peer.ID stateChangeChannels map[string]map[State][]chan bool } -func NewPBFTConsensusManager( +func NewConsensusHandler( bus EventBus.Bus, psb *pubsub.PubSubRouter, privKey crypto.PrivKey, ethereumClient *ethclient.EthereumClient, miner *blockchain.Miner, bc *blockchain.BlockChain, - bp *ConsensusStatePool, + bp *ConsensusManager, db *drand2.DrandBeacon, mempool *pool.Mempool, - address peer.ID, -) *PBFTConsensusManager { - pcm := &PBFTConsensusManager{ + h host.Host, +) *ConsensusHandler { + pcm := &ConsensusHandler{ psb: psb, miner: miner, validator: NewConsensusValidator(miner, bc, db), privKey: privKey, ethereumClient: ethereumClient, bus: bus, - consensusRoundPool: bp, + consensus: bp, mempool: mempool, blockchain: bc, - address: address, + address: h.ID(), stateChangeChannels: map[string]map[State][]chan bool{}, } @@ -131,10 +133,12 @@ func NewPBFTConsensusManager( } }, true) + logrus.Info("Consensus handler has been initialized!") + return pcm } -func (pcm *PBFTConsensusManager) propose(blk *types3.Block) error { +func (pcm *ConsensusHandler) propose(blk *types3.Block) error { cmsg := &types.ConsensusMessage{ Type: StateStatusPrePrepared, Block: blk, @@ -148,7 +152,7 @@ func (pcm *PBFTConsensusManager) propose(blk *types3.Block) error { time.Sleep(1 * time.Second) // wait until all nodes will commit previous blocks - if err = pcm.consensusRoundPool.InsertMessageIntoLog(cmsg); err != nil { + if err = pcm.consensus.InsertMessageIntoLog(cmsg); err != nil { return err } @@ -160,7 +164,7 @@ func (pcm *PBFTConsensusManager) propose(blk *types3.Block) error { return nil } -func (pcm *PBFTConsensusManager) handlePrePrepare(message *pubsub.PubSubMessage) { +func (pcm *ConsensusHandler) handlePrePrepare(message *pubsub.PubSubMessage) { var prePrepare types.PrePrepareMessage err := cbor.Unmarshal(message.Payload, &prePrepare) if err != nil { @@ -184,7 +188,7 @@ func (pcm *PBFTConsensusManager) handlePrePrepare(message *pubsub.PubSubMessage) return } - err = pcm.consensusRoundPool.InsertMessageIntoLog(cmsg) + err = pcm.consensus.InsertMessageIntoLog(cmsg) if err != nil { logrus.WithField("err", err.Error()).Warn("Failed to add PREPARE message to log") return @@ -196,7 +200,7 @@ func (pcm *PBFTConsensusManager) handlePrePrepare(message *pubsub.PubSubMessage) }).Debug("Received PREPREPARE message") } -func (pcm *PBFTConsensusManager) handlePrepare(message *pubsub.PubSubMessage) { +func (pcm *ConsensusHandler) handlePrepare(message *pubsub.PubSubMessage) { var prepare types.PrepareMessage err := cbor.Unmarshal(message.Payload, &prepare) if err != nil { @@ -216,7 +220,7 @@ func (pcm *PBFTConsensusManager) handlePrepare(message *pubsub.PubSubMessage) { return } - err = pcm.consensusRoundPool.InsertMessageIntoLog(cmsg) + err = pcm.consensus.InsertMessageIntoLog(cmsg) if err != nil { logrus.WithField("err", err.Error()).Warn("Failed to add PREPARE message to log") return @@ -228,7 +232,7 @@ func (pcm *PBFTConsensusManager) handlePrepare(message *pubsub.PubSubMessage) { }).Debug("Received PREPARE message") } -func (pcm *PBFTConsensusManager) handleCommit(message *pubsub.PubSubMessage) { +func (pcm *ConsensusHandler) handleCommit(message *pubsub.PubSubMessage) { var commit types.CommitMessage err := cbor.Unmarshal(message.Payload, &commit) if err != nil { @@ -248,7 +252,7 @@ func (pcm *PBFTConsensusManager) handleCommit(message *pubsub.PubSubMessage) { return } - err = pcm.consensusRoundPool.InsertMessageIntoLog(cmsg) + err = pcm.consensus.InsertMessageIntoLog(cmsg) if err != nil { logrus.WithField("err", err.Error()).Warn("Failed to add COMMIT message to log") return @@ -260,7 +264,7 @@ func (pcm *PBFTConsensusManager) handleCommit(message *pubsub.PubSubMessage) { }).Debug("Received COMMIT message") } -func (pcm *PBFTConsensusManager) onNewBeaconEntry(entry types2.BeaconEntry) { +func (pcm *ConsensusHandler) onNewBeaconEntry(entry types2.BeaconEntry) { block, err := pcm.commitAcceptedBlocks() height, _ := pcm.blockchain.GetLatestBlockHeight() if err != nil { @@ -327,7 +331,7 @@ func (pcm *PBFTConsensusManager) onNewBeaconEntry(entry types2.BeaconEntry) { } } -func (pcm *PBFTConsensusManager) submitTasksFromBlock(block *types3.Block) { +func (pcm *ConsensusHandler) submitTasksFromBlock(block *types3.Block) { for _, tx := range block.Data { var task types2.DioneTask err := cbor.Unmarshal(tx.Data, &task) @@ -362,8 +366,8 @@ func (pcm *PBFTConsensusManager) submitTasksFromBlock(block *types3.Block) { } } -func (pcm *PBFTConsensusManager) commitAcceptedBlocks() (*types3.Block, error) { - blocks := pcm.consensusRoundPool.GetAllBlocksWithCommit() +func (pcm *ConsensusHandler) commitAcceptedBlocks() (*types3.Block, error) { + blocks := pcm.consensus.GetAllBlocksWithCommit() if blocks == nil { return nil, ErrNoAcceptedBlocks } @@ -409,7 +413,7 @@ func (pcm *PBFTConsensusManager) commitAcceptedBlocks() (*types3.Block, error) { "height": selectedBlock.Header.Height, "miner": selectedBlock.Header.Proposer.String(), }).Info("Committed new block") - pcm.consensusRoundPool.Prune() + pcm.consensus.Prune() for _, v := range selectedBlock.Data { err := pcm.mempool.DeleteTx(v.Hash) if err != nil { diff --git a/consensus/consensus_round_pool.go b/consensus/consensus_manager.go similarity index 83% rename from consensus/consensus_round_pool.go rename to consensus/consensus_manager.go index 54e172c..4500fd6 100644 --- a/consensus/consensus_round_pool.go +++ b/consensus/consensus_manager.go @@ -5,6 +5,8 @@ import ( "fmt" "sync" + "github.com/Secured-Finance/dione/config" + types2 "github.com/Secured-Finance/dione/consensus/types" "github.com/Secured-Finance/dione/blockchain/pool" @@ -26,8 +28,7 @@ const ( StateStatusCommited ) -// ConsensusStatePool is pool for blocks that isn't not validated or committed yet -type ConsensusStatePool struct { +type ConsensusManager struct { mempool *pool.Mempool consensusInfoMap map[string]*ConsensusInfo mapMutex sync.Mutex @@ -35,12 +36,12 @@ type ConsensusStatePool struct { minApprovals int // FIXME } -func NewConsensusRoundPool(mp *pool.Mempool, bus EventBus.Bus, minApprovals int) (*ConsensusStatePool, error) { - bp := &ConsensusStatePool{ +func NewConsensusManager(mp *pool.Mempool, bus EventBus.Bus, cfg *config.Config) (*ConsensusManager, error) { + bp := &ConsensusManager{ consensusInfoMap: map[string]*ConsensusInfo{}, mempool: mp, bus: bus, - minApprovals: minApprovals, + minApprovals: cfg.ConsensusMinApprovals, } return bp, nil @@ -53,7 +54,7 @@ type ConsensusInfo struct { MessageLog *ConsensusMessageLog } -func (crp *ConsensusStatePool) InsertMessageIntoLog(cmsg *types2.ConsensusMessage) error { +func (crp *ConsensusManager) InsertMessageIntoLog(cmsg *types2.ConsensusMessage) error { crp.mapMutex.Lock() defer crp.mapMutex.Unlock() consensusInfo, ok := crp.consensusInfoMap[hex.EncodeToString(cmsg.Blockhash)] @@ -77,7 +78,7 @@ func (crp *ConsensusStatePool) InsertMessageIntoLog(cmsg *types2.ConsensusMessag return nil } -func (crp *ConsensusStatePool) maybeUpdateConsensusState(ci *ConsensusInfo, cmsg *types2.ConsensusMessage) { +func (crp *ConsensusManager) maybeUpdateConsensusState(ci *ConsensusInfo, cmsg *types2.ConsensusMessage) { if ci.State == StateStatusUnknown && cmsg.Type == types2.ConsensusMessageTypePrePrepare && cmsg.Block != nil { ci.Block = cmsg.Block logrus.WithField("hash", fmt.Sprintf("%x", cmsg.Block.Header.Hash)).Debug("New block discovered") @@ -97,14 +98,14 @@ func (crp *ConsensusStatePool) maybeUpdateConsensusState(ci *ConsensusInfo, cmsg } // Prune cleans known blocks list. It is called when new consensus round starts. -func (crp *ConsensusStatePool) Prune() { +func (crp *ConsensusManager) Prune() { for k := range crp.consensusInfoMap { delete(crp.consensusInfoMap, k) } crp.bus.Publish("blockpool:pruned") } -func (crp *ConsensusStatePool) GetAllBlocksWithCommit() []*ConsensusInfo { +func (crp *ConsensusManager) GetAllBlocksWithCommit() []*ConsensusInfo { crp.mapMutex.Lock() defer crp.mapMutex.Unlock() var consensusInfos []*ConsensusInfo diff --git a/consensus/dispute_manager.go b/consensus/dispute_manager.go index cfbd6c9..9bdfc38 100644 --- a/consensus/dispute_manager.go +++ b/consensus/dispute_manager.go @@ -35,7 +35,6 @@ type DisputeManager struct { ctx context.Context bus EventBus.Bus ethClient *ethclient.EthereumClient - pcm *PBFTConsensusManager voteWindow time.Duration blockchain *blockchain.BlockChain diff --git a/consensus/fx_module.go b/consensus/fx_module.go new file mode 100644 index 0000000..1274bb6 --- /dev/null +++ b/consensus/fx_module.go @@ -0,0 +1,11 @@ +package consensus + +import "go.uber.org/fx" + +var Module = fx.Options( + fx.Provide( + NewConsensusManager, + NewConsensusHandler, + NewDisputeManager, + ), +) diff --git a/node/network_service.go b/node/network_service.go index ff3fc04..fde697c 100644 --- a/node/network_service.go +++ b/node/network_service.go @@ -24,10 +24,14 @@ type NetworkService struct { } func NewNetworkService(bc *blockchain.BlockChain, mp *pool.Mempool) *NetworkService { - return &NetworkService{ + ns := &NetworkService{ blockchain: bc, mempool: mp, } + + logrus.Info("Direct RPC has been successfully initialized!") + + return ns } func (s *NetworkService) LastBlockHeight(ctx context.Context, arg struct{}, reply *wire.LastBlockHeightReply) error { diff --git a/node/node.go b/node/node.go index e6eb22a..9834942 100644 --- a/node/node.go +++ b/node/node.go @@ -4,6 +4,8 @@ import ( "context" "time" + "github.com/asaskevich/EventBus" + "github.com/Secured-Finance/dione/blockchain" drand2 "github.com/Secured-Finance/dione/beacon/drand" @@ -48,7 +50,7 @@ func runNode( h host.Host, mp *pool.Mempool, syncManager sync.SyncManager, - consensusManager *consensus.PBFTConsensusManager, + consensusManager *consensus.ConsensusHandler, pubSubRouter *pubsub.PubSubRouter, disputeManager *consensus.DisputeManager, db *drand2.DrandBeacon, @@ -188,28 +190,28 @@ func subscribeOnEthContractsAsync(ctx context.Context, ethClient *ethclient.Ethe func Start() { fx.New( fx.Provide( - provideEventBus, provideAppFlags, provideConfig, + provideCacheManager, providePrivateKey, provideLibp2pHost, provideEthereumClient, + providePubsub, providePubsubRouter, provideBootstrapAddrs, providePeerDiscovery, - provideDrandBeacon, - provideMempool, + drand2.NewDrandBeacon, + pool.NewMempool, blockchain.NewMiner, provideBlockChain, - provideBlockPool, - provideSyncManager, + sync.NewSyncManager, provideNetworkRPCHost, - provideNetworkService, + NewNetworkService, provideDirectRPCClient, - provideConsensusManager, - consensus.NewDisputeManager, - provideCacheManager, + func() EventBus.Bus { return EventBus.New() }, ), + consensus.Module, + fx.Invoke( configureLogger, configureDirectRPC, diff --git a/node/node_dep_providers.go b/node/node_dep_providers.go index c51a490..133f7d1 100644 --- a/node/node_dep_providers.go +++ b/node/node_dep_providers.go @@ -12,6 +12,7 @@ import ( "runtime" "github.com/Secured-Finance/dione/cache/inmemory" + "github.com/Secured-Finance/dione/cache/redis" types2 "github.com/Secured-Finance/dione/blockchain/types" @@ -33,25 +34,17 @@ import ( gorpc "github.com/libp2p/go-libp2p-gorpc" - "github.com/Secured-Finance/dione/blockchain/sync" - - "github.com/Secured-Finance/dione/blockchain/pool" - "github.com/Secured-Finance/dione/cache" "github.com/Secured-Finance/dione/config" - "github.com/Secured-Finance/dione/consensus" "github.com/Secured-Finance/dione/ethclient" "github.com/Secured-Finance/dione/pubsub" - "github.com/Secured-Finance/dione/types" - "github.com/Secured-Finance/dione/wallet" pex "github.com/Secured-Finance/go-libp2p-pex" "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/discovery" "github.com/libp2p/go-libp2p-core/host" - "github.com/libp2p/go-libp2p-core/peer" + pubsub2 "github.com/libp2p/go-libp2p-pubsub" "github.com/multiformats/go-multiaddr" - "golang.org/x/xerrors" ) const ( @@ -71,31 +64,22 @@ func provideCacheManager(cfg *config.Config) cache.CacheManager { return backend } -func provideDrandBeacon(ps *pubsub.PubSubRouter, bus EventBus.Bus) *drand2.DrandBeacon { - db, err := drand2.NewDrandBeacon(ps.Pubsub, bus) - if err != nil { - logrus.Fatalf("Failed to setup drand beacon: %s", err) - } - logrus.Info("DRAND beacon subsystem has been initialized!") - return db -} - // FIXME: do we really need this? -func provideWallet(peerID peer.ID, privKey []byte) (*wallet.LocalWallet, error) { - // TODO make persistent keystore - kstore := wallet.NewMemKeyStore() - keyInfo := types.KeyInfo{ - Type: types.KTEd25519, - PrivateKey: privKey, - } - - kstore.Put(wallet.KNamePrefix+peerID.String(), keyInfo) - w, err := wallet.NewWallet(kstore) - if err != nil { - return nil, xerrors.Errorf("failed to setup wallet: %w", err) - } - return w, nil -} +//func provideWallet(peerID peer.ID, privKey []byte) (*wallet.LocalWallet, error) { +// // TODO make persistent keystore +// kstore := wallet.NewMemKeyStore() +// keyInfo := types.KeyInfo{ +// Type: types.KTEd25519, +// PrivateKey: privKey, +// } +// +// kstore.Put(wallet.KNamePrefix+peerID.String(), keyInfo) +// w, err := wallet.NewWallet(kstore) +// if err != nil { +// return nil, xerrors.Errorf("failed to setup wallet: %w", err) +// } +// return w, nil +//} func provideEthereumClient(config *config.Config) *ethclient.EthereumClient { ethereum := ethclient.NewEthereumClient() @@ -109,38 +93,17 @@ func provideEthereumClient(config *config.Config) *ethclient.EthereumClient { return ethereum } -func providePubsubRouter(lhost host.Host, config *config.Config) *pubsub.PubSubRouter { - psb := pubsub.NewPubSubRouter(lhost, config.PubSub.ServiceTopicName, config.IsBootstrap) - logrus.Info("PubSub subsystem has been initialized!") - return psb +func providePubsub(h host.Host) (*pubsub2.PubSub, error) { + return pubsub2.NewFloodSub( + context.TODO(), + h, + ) } -func provideConsensusManager( - h host.Host, - bus EventBus.Bus, - psb *pubsub.PubSubRouter, - miner *blockchain.Miner, - bc *blockchain.BlockChain, - ethClient *ethclient.EthereumClient, - privateKey crypto.PrivKey, - bp *consensus.ConsensusStatePool, - db *drand2.DrandBeacon, - mp *pool.Mempool, -) *consensus.PBFTConsensusManager { - c := consensus.NewPBFTConsensusManager( - bus, - psb, - privateKey, - ethClient, - miner, - bc, - bp, - db, - mp, - h.ID(), - ) - logrus.Info("Consensus subsystem has been initialized!") - return c +func providePubsubRouter(h host.Host, ps *pubsub2.PubSub, config *config.Config) *pubsub.PubSubRouter { + psb := pubsub.NewPubSubRouter(h, ps, config.PubSub.ServiceTopicName, config.IsBootstrap) + logrus.Info("PubSub subsystem has been initialized!") + return psb } func provideLibp2pHost(config *config.Config, privateKey crypto.PrivKey) host.Host { @@ -210,64 +173,10 @@ func provideBlockChain(config *config.Config, bus EventBus.Bus, miner *blockchai return bc } -func provideMempool(bus EventBus.Bus) *pool.Mempool { - mp, err := pool.NewMempool(bus) - if err != nil { - logrus.Fatalf("Failed to initialize mempool: %s", err.Error()) - } - - logrus.Info("Mempool has been successfully initialized!") - - return mp -} - -func provideSyncManager( - bus EventBus.Bus, - bp *blockchain.BlockChain, - mp *pool.Mempool, - c *gorpc.Client, - bootstrapAddresses []multiaddr.Multiaddr, - psb *pubsub.PubSubRouter, -) sync.SyncManager { - bootstrapPeerID := peer.ID("") - - if bootstrapAddresses != nil { - addr, err := peer.AddrInfoFromP2pAddr(bootstrapAddresses[0]) // FIXME - if err != nil { - logrus.Fatal(err) - } - bootstrapPeerID = addr.ID - } - - sm := sync.NewSyncManager(bus, bp, mp, c, bootstrapPeerID, psb) - logrus.Info("Blockchain sync subsystem has been successfully initialized!") - - return sm -} - func provideDirectRPCClient(h host.Host) *gorpc.Client { return gorpc.NewClient(h, DioneProtocolID) } -func provideNetworkService(bp *blockchain.BlockChain, mp *pool.Mempool) *NetworkService { - ns := NewNetworkService(bp, mp) - logrus.Info("Direct RPC has been successfully initialized!") - return ns -} - -func provideBlockPool(mp *pool.Mempool, bus EventBus.Bus, config *config.Config) *consensus.ConsensusStatePool { - bp, err := consensus.NewConsensusRoundPool(mp, bus, config.ConsensusMinApprovals) - if err != nil { - logrus.Fatalf("Failed to initialize blockpool: %s", err.Error()) - } - logrus.Info("Consensus state pool has been successfully initialized!") - return bp -} - -func provideEventBus() EventBus.Bus { - return EventBus.New() -} - func provideAppFlags() *AppFlags { var flags AppFlags diff --git a/pubsub/pubsub_router.go b/pubsub/pubsub_router.go index 2ff7fda..da87201 100644 --- a/pubsub/pubsub_router.go +++ b/pubsub/pubsub_router.go @@ -24,7 +24,7 @@ type PubSubRouter struct { type Handler func(message *PubSubMessage) -func NewPubSubRouter(h host.Host, oracleTopic string, isBootstrap bool) *PubSubRouter { +func NewPubSubRouter(h host.Host, ps *pubsub.PubSub, oracleTopic string, isBootstrap bool) *PubSubRouter { ctx, ctxCancel := context.WithCancel(context.Background()) psr := &PubSubRouter{ @@ -35,40 +35,30 @@ func NewPubSubRouter(h host.Host, oracleTopic string, isBootstrap bool) *PubSubR oracleTopicName: oracleTopic, } - var pbOptions []pubsub.Option + //var pbOptions []pubsub.Option + // + //if isBootstrap { + // // turn off the mesh in bootstrappers -- only do gossip and PX + // pubsub.GossipSubD = 0 + // pubsub.GossipSubDscore = 0 + // pubsub.GossipSubDlo = 0 + // pubsub.GossipSubDhi = 0 + // pubsub.GossipSubDout = 0 + // pubsub.GossipSubDlazy = 64 + // pubsub.GossipSubGossipFactor = 0.25 + // pubsub.GossipSubPruneBackoff = 5 * time.Minute + // // turn on PX + // pbOptions = append(pbOptions, pubsub.WithPeerExchange(true)) + //} - if isBootstrap { - // turn off the mesh in bootstrappers -- only do gossip and PX - //pubsub.GossipSubD = 0 - //pubsub.GossipSubDscore = 0 - //pubsub.GossipSubDlo = 0 - //pubsub.GossipSubDhi = 0 - //pubsub.GossipSubDout = 0 - //pubsub.GossipSubDlazy = 64 - //pubsub.GossipSubGossipFactor = 0.25 - //pubsub.GossipSubPruneBackoff = 5 * time.Minute - // turn on PX - //pbOptions = append(pbOptions, pubsub.WithPeerExchange(true)) - } - - pb, err := pubsub.NewFloodSub( - context.TODO(), - psr.node, - pbOptions..., - ) - - if err != nil { - logrus.Fatalf("Error occurred when initializing PubSub subsystem: %v", err) - } - - topic, err := pb.Join(oracleTopic) + topic, err := ps.Join(oracleTopic) if err != nil { logrus.Fatalf("Error occurred when subscribing to service topic: %v", err) } subscription, err := topic.Subscribe() psr.serviceSubscription = subscription - psr.Pubsub = pb + psr.Pubsub = ps psr.oracleTopic = topic return psr