Fix ordering of pubsub subsystem initialization

This commit is contained in:
ChronosX88 2020-12-07 19:22:47 +04:00
parent 2278277f76
commit 56381b08bf
Signed by: ChronosXYZ
GPG Key ID: 085A69A82C8C511A
2 changed files with 60 additions and 21 deletions

View File

@ -9,6 +9,8 @@ import (
"os" "os"
"time" "time"
pex "github.com/Secured-Finance/go-libp2p-pex"
"github.com/Secured-Finance/dione/cache" "github.com/Secured-Finance/dione/cache"
pubsub "github.com/libp2p/go-libp2p-pubsub" pubsub "github.com/libp2p/go-libp2p-pubsub"
@ -35,8 +37,6 @@ import (
"github.com/Secured-Finance/dione/beacon" "github.com/Secured-Finance/dione/beacon"
pex "github.com/Secured-Finance/go-libp2p-pex"
"github.com/Secured-Finance/dione/config" "github.com/Secured-Finance/dione/config"
"github.com/Secured-Finance/dione/consensus" "github.com/Secured-Finance/dione/consensus"
"github.com/Secured-Finance/dione/ethclient" "github.com/Secured-Finance/dione/ethclient"
@ -72,47 +72,63 @@ func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTim
Config: config, Config: config,
} }
lhost, pDiscovery, err := provideLibp2pNode(n.Config, prvKey, pexDiscoveryUpdateTime) // initialize libp2p host
lhost, err := provideLibp2pHost(n.Config, prvKey, pexDiscoveryUpdateTime)
if err != nil { if err != nil {
logrus.Fatal(err) logrus.Fatal(err)
} }
n.Host = lhost n.Host = lhost
n.PeerDiscovery = pDiscovery
// initialize ethereum client
ethClient, err := provideEthereumClient(n.Config) ethClient, err := provideEthereumClient(n.Config)
if err != nil { if err != nil {
logrus.Fatal(err) logrus.Fatal(err)
} }
n.Ethereum = ethClient n.Ethereum = ethClient
// initialize blockchain rpc clients
err = n.setupRPCClients() err = n.setupRPCClients()
if err != nil { if err != nil {
logrus.Fatal(err) logrus.Fatal(err)
} }
// initialize pubsub subsystem
psb := providePubsubRouter(lhost, n.Config) psb := providePubsubRouter(lhost, n.Config)
n.PubSubRouter = psb n.PubSubRouter = psb
// initialize peer discovery
peerDiscovery, err := providePeerDiscovery(n.Config, lhost, pexDiscoveryUpdateTime)
if err != nil {
logrus.Fatal(err)
}
n.PeerDiscovery = peerDiscovery
// get private key of libp2p host
rawPrivKey, err := prvKey.Raw() rawPrivKey, err := prvKey.Raw()
if err != nil { if err != nil {
logrus.Fatal(err) logrus.Fatal(err)
} }
beacon, err := provideBeacon(psb.Pubsub) // initialize random beacon network subsystem
randomBeaconNetwork, err := provideBeacon(psb.Pubsub)
if err != nil { if err != nil {
logrus.Fatal(err) logrus.Fatal(err)
} }
n.Beacon = beacon n.Beacon = randomBeaconNetwork
// initialize mining subsystem
miner := provideMiner(n.Host.ID(), *n.Ethereum.GetEthAddress(), n.Beacon, n.Ethereum, rawPrivKey) miner := provideMiner(n.Host.ID(), *n.Ethereum.GetEthAddress(), n.Beacon, n.Ethereum, rawPrivKey)
n.Miner = miner n.Miner = miner
// initialize event log cache subsystem
eventLogCache := provideEventLogCache() eventLogCache := provideEventLogCache()
n.EventLogCache = eventLogCache n.EventLogCache = eventLogCache
// initialize consensus subsystem
cManager := provideConsensusManager(psb, miner, ethClient, rawPrivKey, n.Config.ConsensusMinApprovals, eventLogCache) cManager := provideConsensusManager(psb, miner, ethClient, rawPrivKey, n.Config.ConsensusMinApprovals, eventLogCache)
n.ConsensusManager = cManager n.ConsensusManager = cManager
// initialize internal eth wallet
wallet, err := provideWallet(n.Host.ID(), rawPrivKey) wallet, err := provideWallet(n.Host.ID(), rawPrivKey)
if err != nil { if err != nil {
logrus.Fatal(err) logrus.Fatal(err)
@ -286,17 +302,17 @@ func (n *Node) setupRPCClients() error {
} }
func providePubsubRouter(lhost host.Host, config *config.Config) *pubsub2.PubSubRouter { func providePubsubRouter(lhost host.Host, config *config.Config) *pubsub2.PubSubRouter {
return pubsub2.NewPubSubRouter(lhost, config.PubSub.ServiceTopicName) return pubsub2.NewPubSubRouter(lhost, config.PubSub.ServiceTopicName, config.IsBootstrap)
} }
func provideConsensusManager(psb *pubsub2.PubSubRouter, miner *consensus.Miner, ethClient *ethclient.EthereumClient, privateKey []byte, minApprovals int, evc *cache.EventLogCache) *consensus.PBFTConsensusManager { func provideConsensusManager(psb *pubsub2.PubSubRouter, miner *consensus.Miner, ethClient *ethclient.EthereumClient, privateKey []byte, minApprovals int, evc *cache.EventLogCache) *consensus.PBFTConsensusManager {
return consensus.NewPBFTConsensusManager(psb, minApprovals, privateKey, ethClient, miner, evc) return consensus.NewPBFTConsensusManager(psb, minApprovals, privateKey, ethClient, miner, evc)
} }
func provideLibp2pNode(config *config.Config, privateKey crypto.PrivKey, pexDiscoveryUpdateTime time.Duration) (host.Host, *pex.PEXDiscovery, error) { func provideLibp2pHost(config *config.Config, privateKey crypto.PrivKey, pexDiscoveryUpdateTime time.Duration) (host.Host, error) {
listenMultiAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", config.ListenAddr, config.ListenPort)) listenMultiAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", config.ListenAddr, config.ListenPort))
if err != nil { if err != nil {
return nil, nil, xerrors.Errorf("failed to parse multiaddress: %v", err) return nil, xerrors.Errorf("failed to parse multiaddress: %v", err)
} }
host, err := libp2p.New( host, err := libp2p.New(
context.TODO(), context.TODO(),
@ -304,14 +320,18 @@ func provideLibp2pNode(config *config.Config, privateKey crypto.PrivKey, pexDisc
libp2p.Identity(privateKey), libp2p.Identity(privateKey),
) )
if err != nil { if err != nil {
return nil, nil, xerrors.Errorf("failed to setup libp2p host: %v", err) return nil, xerrors.Errorf("failed to setup libp2p host: %v", err)
} }
return host, nil
}
func providePeerDiscovery(config *config.Config, h host.Host, pexDiscoveryUpdateTime time.Duration) (discovery.Discovery, error) {
var bootstrapMaddrs []multiaddr.Multiaddr var bootstrapMaddrs []multiaddr.Multiaddr
for _, a := range config.BootstrapNodes { for _, a := range config.BootstrapNodes {
maddr, err := multiaddr.NewMultiaddr(a) maddr, err := multiaddr.NewMultiaddr(a)
if err != nil { if err != nil {
return nil, nil, xerrors.Errorf("invalid multiaddress of bootstrap node: %v", err) return nil, xerrors.Errorf("invalid multiaddress of bootstrap node: %v", err)
} }
bootstrapMaddrs = append(bootstrapMaddrs, maddr) bootstrapMaddrs = append(bootstrapMaddrs, maddr)
} }
@ -319,12 +339,13 @@ func provideLibp2pNode(config *config.Config, privateKey crypto.PrivKey, pexDisc
if config.IsBootstrap { if config.IsBootstrap {
bootstrapMaddrs = nil bootstrapMaddrs = nil
} }
discovery, err := pex.NewPEXDiscovery(host, bootstrapMaddrs, pexDiscoveryUpdateTime)
pexDiscovery, err := pex.NewPEXDiscovery(h, bootstrapMaddrs, pexDiscoveryUpdateTime)
if err != nil { if err != nil {
return nil, nil, xerrors.Errorf("failed to setup pex discovery: %v", err) return nil, xerrors.Errorf("failed to setup pex pexDiscovery: %v", err)
} }
return host, discovery, nil return pexDiscovery, nil
} }
func Start() { func Start() {
@ -375,6 +396,8 @@ func Start() {
logrus.SetLevel(logrus.DebugLevel) logrus.SetLevel(logrus.DebugLevel)
} }
//log.SetDebugLogging()
//ctx, ctxCancel := context.WithCancel(context.Background()) //ctx, ctxCancel := context.WithCancel(context.Background())
//node.GlobalCtx = ctx //node.GlobalCtx = ctx
//node.GlobalCtxCancel = ctxCancel //node.GlobalCtxCancel = ctxCancel

View File

@ -2,6 +2,7 @@ package pubsub
import ( import (
"context" "context"
"time"
"github.com/fxamacker/cbor/v2" "github.com/fxamacker/cbor/v2"
@ -24,7 +25,7 @@ type PubSubRouter struct {
oracleTopic *pubsub.Topic oracleTopic *pubsub.Topic
} }
func NewPubSubRouter(h host.Host, oracleTopic string) *PubSubRouter { func NewPubSubRouter(h host.Host, oracleTopic string, isBootstrap bool) *PubSubRouter {
ctx, ctxCancel := context.WithCancel(context.Background()) ctx, ctxCancel := context.WithCancel(context.Background())
psr := &PubSubRouter{ psr := &PubSubRouter{
@ -34,21 +35,36 @@ func NewPubSubRouter(h host.Host, oracleTopic string) *PubSubRouter {
handlers: make(map[types.MessageType][]Handler), handlers: make(map[types.MessageType][]Handler),
} }
var pbOptions []pubsub.Option
if isBootstrap {
// turn off the mesh in bootstrappers -- only do gossip and PX
pubsub.GossipSubD = 0
pubsub.GossipSubDscore = 0
pubsub.GossipSubDlo = 0
pubsub.GossipSubDhi = 0
pubsub.GossipSubDout = 0
pubsub.GossipSubDlazy = 64
pubsub.GossipSubGossipFactor = 0.25
pubsub.GossipSubPruneBackoff = 5 * time.Minute
// turn on PX
pbOptions = append(pbOptions, pubsub.WithPeerExchange(true))
}
pb, err := pubsub.NewGossipSub( pb, err := pubsub.NewGossipSub(
context.TODO(), context.TODO(),
psr.node, psr.node,
//pubsub.WithMessageSigning(true), pbOptions...,
//pubsub.WithStrictSignatureVerification(true),
pubsub.WithPeerExchange(true),
) )
if err != nil { if err != nil {
logrus.Fatal("Error occurred when create PubSub", err) logrus.Fatalf("Error occurred when initializing PubSub subsystem: %v", err)
} }
psr.oracleTopicName = oracleTopic psr.oracleTopicName = oracleTopic
topic, err := pb.Join(oracleTopic) topic, err := pb.Join(oracleTopic)
if err != nil { if err != nil {
logrus.Fatal("Error occurred when subscribing to service topic", err) logrus.Fatalf("Error occurred when subscribing to service topic: %v", err)
} }
subscription, err := topic.Subscribe() subscription, err := topic.Subscribe()
@ -65,7 +81,7 @@ func NewPubSubRouter(h host.Host, oracleTopic string) *PubSubRouter {
{ {
msg, err := subscription.Next(psr.context) msg, err := subscription.Next(psr.context)
if err != nil { if err != nil {
logrus.Warn("Failed to receive pubsub message: ", err.Error()) logrus.Warnf("Failed to receive pubsub message: %v", err)
} }
psr.handleMessage(msg) psr.handleMessage(msg)
} }