From 56381b08bf5858292b671e7c5067c51caeae6477 Mon Sep 17 00:00:00 2001 From: ChronosX88 Date: Mon, 7 Dec 2020 19:22:47 +0400 Subject: [PATCH] Fix ordering of pubsub subsystem initialization --- node/node.go | 51 ++++++++++++++++++++++++++++++----------- pubsub/pubsub_router.go | 30 ++++++++++++++++++------ 2 files changed, 60 insertions(+), 21 deletions(-) diff --git a/node/node.go b/node/node.go index 0eb3e86..1d48989 100644 --- a/node/node.go +++ b/node/node.go @@ -9,6 +9,8 @@ import ( "os" "time" + pex "github.com/Secured-Finance/go-libp2p-pex" + "github.com/Secured-Finance/dione/cache" pubsub "github.com/libp2p/go-libp2p-pubsub" @@ -35,8 +37,6 @@ import ( "github.com/Secured-Finance/dione/beacon" - pex "github.com/Secured-Finance/go-libp2p-pex" - "github.com/Secured-Finance/dione/config" "github.com/Secured-Finance/dione/consensus" "github.com/Secured-Finance/dione/ethclient" @@ -72,47 +72,63 @@ func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTim Config: config, } - lhost, pDiscovery, err := provideLibp2pNode(n.Config, prvKey, pexDiscoveryUpdateTime) + // initialize libp2p host + lhost, err := provideLibp2pHost(n.Config, prvKey, pexDiscoveryUpdateTime) if err != nil { logrus.Fatal(err) } n.Host = lhost - n.PeerDiscovery = pDiscovery + // initialize ethereum client ethClient, err := provideEthereumClient(n.Config) if err != nil { logrus.Fatal(err) } n.Ethereum = ethClient + // initialize blockchain rpc clients err = n.setupRPCClients() if err != nil { logrus.Fatal(err) } + // initialize pubsub subsystem psb := providePubsubRouter(lhost, n.Config) n.PubSubRouter = psb + // initialize peer discovery + peerDiscovery, err := providePeerDiscovery(n.Config, lhost, pexDiscoveryUpdateTime) + if err != nil { + logrus.Fatal(err) + } + n.PeerDiscovery = peerDiscovery + + // get private key of libp2p host rawPrivKey, err := prvKey.Raw() if err != nil { logrus.Fatal(err) } - beacon, err := provideBeacon(psb.Pubsub) + // initialize random beacon network subsystem + randomBeaconNetwork, err := provideBeacon(psb.Pubsub) if err != nil { logrus.Fatal(err) } - n.Beacon = beacon + n.Beacon = randomBeaconNetwork + // initialize mining subsystem miner := provideMiner(n.Host.ID(), *n.Ethereum.GetEthAddress(), n.Beacon, n.Ethereum, rawPrivKey) n.Miner = miner + // initialize event log cache subsystem eventLogCache := provideEventLogCache() n.EventLogCache = eventLogCache + // initialize consensus subsystem cManager := provideConsensusManager(psb, miner, ethClient, rawPrivKey, n.Config.ConsensusMinApprovals, eventLogCache) n.ConsensusManager = cManager + // initialize internal eth wallet wallet, err := provideWallet(n.Host.ID(), rawPrivKey) if err != nil { logrus.Fatal(err) @@ -286,17 +302,17 @@ func (n *Node) setupRPCClients() error { } func providePubsubRouter(lhost host.Host, config *config.Config) *pubsub2.PubSubRouter { - return pubsub2.NewPubSubRouter(lhost, config.PubSub.ServiceTopicName) + return pubsub2.NewPubSubRouter(lhost, config.PubSub.ServiceTopicName, config.IsBootstrap) } func provideConsensusManager(psb *pubsub2.PubSubRouter, miner *consensus.Miner, ethClient *ethclient.EthereumClient, privateKey []byte, minApprovals int, evc *cache.EventLogCache) *consensus.PBFTConsensusManager { return consensus.NewPBFTConsensusManager(psb, minApprovals, privateKey, ethClient, miner, evc) } -func provideLibp2pNode(config *config.Config, privateKey crypto.PrivKey, pexDiscoveryUpdateTime time.Duration) (host.Host, *pex.PEXDiscovery, error) { +func provideLibp2pHost(config *config.Config, privateKey crypto.PrivKey, pexDiscoveryUpdateTime time.Duration) (host.Host, error) { listenMultiAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", config.ListenAddr, config.ListenPort)) if err != nil { - return nil, nil, xerrors.Errorf("failed to parse multiaddress: %v", err) + return nil, xerrors.Errorf("failed to parse multiaddress: %v", err) } host, err := libp2p.New( context.TODO(), @@ -304,14 +320,18 @@ func provideLibp2pNode(config *config.Config, privateKey crypto.PrivKey, pexDisc libp2p.Identity(privateKey), ) if err != nil { - return nil, nil, xerrors.Errorf("failed to setup libp2p host: %v", err) + return nil, xerrors.Errorf("failed to setup libp2p host: %v", err) } + return host, nil +} + +func providePeerDiscovery(config *config.Config, h host.Host, pexDiscoveryUpdateTime time.Duration) (discovery.Discovery, error) { var bootstrapMaddrs []multiaddr.Multiaddr for _, a := range config.BootstrapNodes { maddr, err := multiaddr.NewMultiaddr(a) if err != nil { - return nil, nil, xerrors.Errorf("invalid multiaddress of bootstrap node: %v", err) + return nil, xerrors.Errorf("invalid multiaddress of bootstrap node: %v", err) } bootstrapMaddrs = append(bootstrapMaddrs, maddr) } @@ -319,12 +339,13 @@ func provideLibp2pNode(config *config.Config, privateKey crypto.PrivKey, pexDisc if config.IsBootstrap { bootstrapMaddrs = nil } - discovery, err := pex.NewPEXDiscovery(host, bootstrapMaddrs, pexDiscoveryUpdateTime) + + pexDiscovery, err := pex.NewPEXDiscovery(h, bootstrapMaddrs, pexDiscoveryUpdateTime) if err != nil { - return nil, nil, xerrors.Errorf("failed to setup pex discovery: %v", err) + return nil, xerrors.Errorf("failed to setup pex pexDiscovery: %v", err) } - return host, discovery, nil + return pexDiscovery, nil } func Start() { @@ -375,6 +396,8 @@ func Start() { logrus.SetLevel(logrus.DebugLevel) } + //log.SetDebugLogging() + //ctx, ctxCancel := context.WithCancel(context.Background()) //node.GlobalCtx = ctx //node.GlobalCtxCancel = ctxCancel diff --git a/pubsub/pubsub_router.go b/pubsub/pubsub_router.go index 103da37..3d1e7f3 100644 --- a/pubsub/pubsub_router.go +++ b/pubsub/pubsub_router.go @@ -2,6 +2,7 @@ package pubsub import ( "context" + "time" "github.com/fxamacker/cbor/v2" @@ -24,7 +25,7 @@ type PubSubRouter struct { oracleTopic *pubsub.Topic } -func NewPubSubRouter(h host.Host, oracleTopic string) *PubSubRouter { +func NewPubSubRouter(h host.Host, oracleTopic string, isBootstrap bool) *PubSubRouter { ctx, ctxCancel := context.WithCancel(context.Background()) psr := &PubSubRouter{ @@ -34,21 +35,36 @@ func NewPubSubRouter(h host.Host, oracleTopic string) *PubSubRouter { handlers: make(map[types.MessageType][]Handler), } + var pbOptions []pubsub.Option + + if isBootstrap { + // turn off the mesh in bootstrappers -- only do gossip and PX + pubsub.GossipSubD = 0 + pubsub.GossipSubDscore = 0 + pubsub.GossipSubDlo = 0 + pubsub.GossipSubDhi = 0 + pubsub.GossipSubDout = 0 + pubsub.GossipSubDlazy = 64 + pubsub.GossipSubGossipFactor = 0.25 + pubsub.GossipSubPruneBackoff = 5 * time.Minute + // turn on PX + pbOptions = append(pbOptions, pubsub.WithPeerExchange(true)) + } + pb, err := pubsub.NewGossipSub( context.TODO(), psr.node, - //pubsub.WithMessageSigning(true), - //pubsub.WithStrictSignatureVerification(true), - pubsub.WithPeerExchange(true), + pbOptions..., ) + if err != nil { - logrus.Fatal("Error occurred when create PubSub", err) + logrus.Fatalf("Error occurred when initializing PubSub subsystem: %v", err) } psr.oracleTopicName = oracleTopic topic, err := pb.Join(oracleTopic) if err != nil { - logrus.Fatal("Error occurred when subscribing to service topic", err) + logrus.Fatalf("Error occurred when subscribing to service topic: %v", err) } subscription, err := topic.Subscribe() @@ -65,7 +81,7 @@ func NewPubSubRouter(h host.Host, oracleTopic string) *PubSubRouter { { msg, err := subscription.Next(psr.context) if err != nil { - logrus.Warn("Failed to receive pubsub message: ", err.Error()) + logrus.Warnf("Failed to receive pubsub message: %v", err) } psr.handleMessage(msg) }