From e55c19a38d7ac75f1fbf48b9d4f4a759f67e347a Mon Sep 17 00:00:00 2001 From: ChronosX88 Date: Fri, 27 Nov 2020 22:47:58 +0400 Subject: [PATCH] Change node initialization process: implement manual dependency injection to prevent NPE issues --- cmd/dione/dione.go | 6 +- config/config.go | 3 +- go.mod | 1 + go.sum | 7 + node/beacon.go | 24 --- node/ethereum.go | 41 ----- node/node.go | 367 +++++++++++++++++++++++++++------------------ 7 files changed, 230 insertions(+), 219 deletions(-) delete mode 100644 node/beacon.go delete mode 100644 node/ethereum.go diff --git a/cmd/dione/dione.go b/cmd/dione/dione.go index 80bd689..c875620 100644 --- a/cmd/dione/dione.go +++ b/cmd/dione/dione.go @@ -2,12 +2,8 @@ package main import ( "github.com/Secured-Finance/dione/node" - "github.com/sirupsen/logrus" ) func main() { - err := node.Start() - if err != nil { - logrus.Panic(err) - } + node.Start() } diff --git a/config/config.go b/config/config.go index ddb7c3a..9268820 100644 --- a/config/config.go +++ b/config/config.go @@ -33,7 +33,8 @@ type FilecoinConfig struct { } type PubSubConfig struct { - ProtocolID string `mapstructure:"protocolID"` + ProtocolID string `mapstructure:"protocolID"` + ServiceTopicName string `mapstructure:"serviceTopicName"` } type StoreConfig struct { diff --git a/go.mod b/go.mod index 3c196c6..61c2332 100644 --- a/go.mod +++ b/go.mod @@ -67,6 +67,7 @@ require ( github.com/whyrusleeping/cbor-gen v0.0.0-20200826160007-0b9f6c5fb163 github.com/wsddn/go-ecdh v0.0.0-20161211032359-48726bab9208 // indirect github.com/xorcare/golden v0.6.1-0.20191112154924-b87f686d7542 + go.uber.org/fx v1.13.1 // indirect go.uber.org/zap v1.15.0 golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect golang.org/x/tools v0.0.0-20200827010519-17fd2f27a9e3 // indirect diff --git a/go.sum b/go.sum index 7355ee2..9d3fa2f 100644 --- a/go.sum +++ b/go.sum @@ -1135,6 +1135,11 @@ go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.5.1/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +go.uber.org/dig v1.10.0 h1:yLmDDj9/zuDjv3gz8GQGviXMs9TfysIUMUilCpgzUJY= +go.uber.org/dig v1.10.0/go.mod h1:X34SnWGr8Fyla9zQNO2GSO2D+TIuqB14OS8JhYocIyw= +go.uber.org/fx v1.13.1 h1:CFNTr1oin5OJ0VCZ8EycL3wzF29Jz2g0xe55RFsf2a4= +go.uber.org/fx v1.13.1/go.mod h1:bREWhavnedxpJeTq9pQT53BbvwhUv7TcpsOqcH4a+3w= +go.uber.org/goleak v0.10.0/go.mod h1:VCZuO8V8mFPlL0F5J5GK1rtHV3DrFcQ1R8ryq7FK0aI= go.uber.org/goleak v1.0.0 h1:qsup4IcBdlmsnGfqyLl4Ntn3C2XCCuKAE7DwHpScyUo= go.uber.org/goleak v1.0.0/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= @@ -1350,8 +1355,10 @@ golang.org/x/tools v0.0.0-20190912185636-87d9f09c5d89/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191030062658-86caa796c7ab/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191114200427-caa0b0f7d508/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191130070609-6e064ea0cf2d/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191216052735-49a3e744a425/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= diff --git a/node/beacon.go b/node/beacon.go deleted file mode 100644 index 978f7dd..0000000 --- a/node/beacon.go +++ /dev/null @@ -1,24 +0,0 @@ -package node - -import ( - "fmt" - - "github.com/Secured-Finance/dione/types" - - "github.com/Secured-Finance/dione/beacon" - "github.com/Secured-Finance/dione/config" - "github.com/Secured-Finance/dione/drand" -) - -// NewBeaconClient creates a new beacon chain client -func (n *Node) NewBeaconClient() (beacon.BeaconNetworks, error) { - networks := beacon.BeaconNetworks{} - bc, err := drand.NewDrandBeacon(config.ChainGenesis, config.TaskEpochInterval, n.PubSubRouter.Pubsub) - if err != nil { - return nil, fmt.Errorf("creating drand beacon: %w", err) - } - networks = append(networks, beacon.BeaconNetwork{Start: types.DrandRound(config.ChainGenesis), Beacon: bc}) - // NOTE: currently we use only one network - - return networks, nil -} diff --git a/node/ethereum.go b/node/ethereum.go deleted file mode 100644 index 277586c..0000000 --- a/node/ethereum.go +++ /dev/null @@ -1,41 +0,0 @@ -package node - -import ( - "context" - - "github.com/sirupsen/logrus" -) - -func (n *Node) subscribeOnEthContracts(ctx context.Context) { - eventChan, subscription, err := n.Ethereum.SubscribeOnOracleEvents(ctx) - if err != nil { - logrus.Fatal("Couldn't subscribe on ethereum contracts, exiting... ", err) - } - - go func() { - EventLoop: - for { - select { - case event := <-eventChan: - { - task, err := n.Miner.MineTask(ctx, event) - if err != nil { - logrus.Fatal("Failed to mine task, exiting... ", err) - } - if task == nil { - continue - } - logrus.Infof("Started new consensus round with ID: %s", event.RequestID.String()) - err = n.ConsensusManager.Propose(event.RequestID.String(), *task, event.RequestID, event.CallbackAddress) - if err != nil { - logrus.Errorf("Failed to propose task: %w", err) - } - } - case <-ctx.Done(): - break EventLoop - case <-subscription.Err(): - logrus.Fatal("Error with ethereum subscription, exiting... ", err) - } - } - }() -} diff --git a/node/node.go b/node/node.go index 553a6bb..e8b3c0c 100644 --- a/node/node.go +++ b/node/node.go @@ -9,6 +9,15 @@ import ( "os" "time" + pubsub "github.com/libp2p/go-libp2p-pubsub" + + "github.com/Secured-Finance/dione/drand" + + "github.com/ethereum/go-ethereum/common" + "github.com/libp2p/go-libp2p-core/peer" + + "github.com/libp2p/go-libp2p-core/discovery" + "github.com/Secured-Finance/dione/rpc" rtypes "github.com/Secured-Finance/dione/rpc/types" @@ -29,7 +38,7 @@ import ( "github.com/Secured-Finance/dione/config" "github.com/Secured-Finance/dione/consensus" "github.com/Secured-Finance/dione/ethclient" - "github.com/Secured-Finance/dione/pubsub" + pubsub2 "github.com/Secured-Finance/dione/pubsub" "github.com/libp2p/go-libp2p" crypto "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/host" @@ -43,10 +52,10 @@ const ( type Node struct { Host host.Host - PubSubRouter *pubsub.PubSubRouter + PeerDiscovery discovery.Discovery + PubSubRouter *pubsub2.PubSubRouter GlobalCtx context.Context GlobalCtxCancel context.CancelFunc - OracleTopic string Config *config.Config Ethereum *ethclient.EthereumClient ConsensusManager *consensus.PBFTConsensusManager @@ -55,168 +64,87 @@ type Node struct { Wallet *wallet.LocalWallet } -func NewNode(configPath string) (*Node, error) { - cfg, err := config.NewConfig(configPath) - if err != nil { - return nil, err - } - node := &Node{ - OracleTopic: "dione", - Config: cfg, +func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTime time.Duration) (*Node, error) { + n := &Node{ + Config: config, } - return node, nil -} - -func (n *Node) setupNode(ctx context.Context, prvKey crypto.PrivKey, pexDiscoveryUpdateTime time.Duration) { - n.setupLibp2pHost(context.TODO(), prvKey, pexDiscoveryUpdateTime) - err := n.setupEthereumClient() + lhost, pDiscovery, err := provideLibp2pNode(n.Config, prvKey, pexDiscoveryUpdateTime) if err != nil { logrus.Fatal(err) } + n.Host = lhost + n.PeerDiscovery = pDiscovery + + ethClient, err := provideEthereumClient(n.Config) + if err != nil { + logrus.Fatal(err) + } + n.Ethereum = ethClient err = n.setupRPCClients() if err != nil { logrus.Fatal(err) } - n.setupPubsub() + psb := providePubsubRouter(lhost, n.Config) + n.PubSubRouter = psb + rawPrivKey, err := prvKey.Raw() if err != nil { logrus.Fatal(err) } - err = n.setupBeacon() - if err != nil { - logrus.Fatal(err) - } - err = n.setupMiner(rawPrivKey) - if err != nil { - logrus.Fatal(err) - } - err = n.setupConsensusManager(rawPrivKey, n.Config.ConsensusMinApprovals) - if err != nil { - logrus.Fatalf("Failed to setup consensus manager: %w", err) - } - err = n.setupWallet(rawPrivKey) - if err != nil { - logrus.Fatal(err) - } - n.subscribeOnEthContracts(ctx) -} -func (n *Node) setupMiner(privKey []byte) error { - n.Miner = consensus.NewMiner(n.Host.ID(), *n.Ethereum.GetEthAddress(), n.Beacon, n.Ethereum, privKey) - return nil -} - -func (n *Node) setupBeacon() error { - beacon, err := n.NewBeaconClient() + beacon, err := provideBeacon(psb.Pubsub) if err != nil { - return xerrors.Errorf("failed to setup beacon: %w", err) + logrus.Fatal(err) } n.Beacon = beacon - return nil -} -func (n *Node) setupWallet(privKey []byte) error { - // TODO make persistent keystore - kstore := wallet.NewMemKeyStore() - keyInfo := types.KeyInfo{ - Type: types.KTEd25519, - PrivateKey: privKey, - } + miner := provideMiner(n.Host.ID(), *n.Ethereum.GetEthAddress(), n.Beacon, n.Ethereum, rawPrivKey) + n.Miner = miner - kstore.Put(wallet.KNamePrefix+n.Host.ID().String(), keyInfo) - w, err := wallet.NewWallet(kstore) + cManager := provideConsensusManager(psb, miner, ethClient, rawPrivKey, n.Config.ConsensusMinApprovals) + n.ConsensusManager = cManager + + wallet, err := provideWallet(n.Host.ID(), rawPrivKey) if err != nil { - return xerrors.Errorf("failed to setup wallet: %w", err) + logrus.Fatal(err) } - n.Wallet = w - return nil + n.Wallet = wallet + + return n, nil } -func (n *Node) setupEthereumClient() error { - ethereum := ethclient.NewEthereumClient() - n.Ethereum = ethereum - return ethereum.Initialize(context.Background(), - n.Config.Ethereum.GatewayAddress, - n.Config.Ethereum.PrivateKey, - n.Config.Ethereum.OracleEmitterContractAddress, - n.Config.Ethereum.AggregatorContractAddress, - n.Config.Ethereum.DioneStakingContractAddress, - ) -} +func (n *Node) Run(ctx context.Context) error { + n.runLibp2pAsync(ctx) + n.subscribeOnEthContractsAsync(ctx) -func (n *Node) setupRPCClients() error { - fc := filecoin.NewLotusClient() - rpc.RegisterRPC(rtypes.RPCTypeFilecoin, map[string]func(string) (string, error){ - "getTransaction": fc.GetTransaction, - }) - - sl := solana2.NewSolanaClient() - rpc.RegisterRPC(rtypes.RPCTypeSolana, map[string]func(string) (string, error){ - "getTransaction": sl.GetTransaction, - }) - - return nil -} - -func (n *Node) setupPubsub() { - n.PubSubRouter = pubsub.NewPubSubRouter(n.Host, n.OracleTopic) - // wait for setting up pubsub - //time.Sleep(3 * time.Second) -} - -func (n *Node) setupConsensusManager(privateKey []byte, minApprovals int) error { - n.ConsensusManager = consensus.NewPBFTConsensusManager(n.PubSubRouter, minApprovals, privateKey, n.Ethereum, n.Miner) - return nil -} - -func (n *Node) setupLibp2pHost(ctx context.Context, privateKey crypto.PrivKey, pexDiscoveryUpdateTime time.Duration) { - listenMultiAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", n.Config.ListenAddr, n.Config.ListenPort)) - if err != nil { - logrus.Fatal("Failed to generate new node multiaddress:", err) - } - host, err := libp2p.New( - ctx, - libp2p.ListenAddrs(listenMultiAddr), - libp2p.Identity(privateKey), - ) - if err != nil { - logrus.Fatal("Failed to set a new libp2p node:", err) - } - n.Host = host - - logrus.Info(fmt.Sprintf("[*] Your Multiaddress Is: /ip4/%s/tcp/%d/p2p/%s", n.Config.ListenAddr, n.Config.ListenPort, host.ID().Pretty())) - - var bootstrapMaddrs []multiaddr.Multiaddr - for _, a := range n.Config.BootstrapNodes { - maddr, err := multiaddr.NewMultiaddr(a) - if err != nil { - logrus.Fatalf("Invalid multiaddress of bootstrap node: %s", err.Error()) + for { + select { + case <-ctx.Done(): + return nil } - bootstrapMaddrs = append(bootstrapMaddrs, maddr) - } - if n.Config.IsBootstrap { - bootstrapMaddrs = nil - } - discovery, err := pex.NewPEXDiscovery(host, bootstrapMaddrs, pexDiscoveryUpdateTime) - if err != nil { - logrus.Fatal("Can't set up PEX discovery protocol, exiting... ", err) } + return nil +} + +func (n *Node) runLibp2pAsync(ctx context.Context) error { + logrus.Info(fmt.Sprintf("[*] Your Multiaddress Is: /ip4/%s/tcp/%d/p2p/%s", n.Config.ListenAddr, n.Config.ListenPort, n.Host.ID().Pretty())) + logrus.Info("Announcing ourselves...") - _, err = discovery.Advertise(context.TODO(), n.Config.Rendezvous) + _, err := n.PeerDiscovery.Advertise(context.TODO(), n.Config.Rendezvous) if err != nil { - logrus.Fatalf("Failed to announce this node to the network: %s", err.Error()) + return xerrors.Errorf("failed to announce this node to the network: %v", err) } logrus.Info("Successfully announced!") // Discover unbounded count of peers logrus.Info("Searching for other peers...") - peerChan, err := discovery.FindPeers(context.TODO(), n.Config.Rendezvous) + peerChan, err := n.PeerDiscovery.FindPeers(context.TODO(), n.Config.Rendezvous) if err != nil { - logrus.Fatal("Failed to find new peers, exiting...", err) + return xerrors.Errorf("failed to find new peers: %v", err) } go func() { MainLoop: @@ -242,30 +170,161 @@ func (n *Node) setupLibp2pHost(ctx context.Context, privateKey crypto.PrivKey, p } } }() + return nil } -func Start() error { +func (n *Node) subscribeOnEthContractsAsync(ctx context.Context) { + eventChan, subscription, err := n.Ethereum.SubscribeOnOracleEvents(ctx) + if err != nil { + logrus.Fatal("Couldn't subscribe on ethereum contracts, exiting... ", err) + } + + go func() { + EventLoop: + for { + select { + case event := <-eventChan: + { + task, err := n.Miner.MineTask(ctx, event) + if err != nil { + logrus.Fatal("Failed to mine task, exiting... ", err) + } + if task == nil { + continue + } + logrus.Infof("Started new consensus round with ID: %s", event.RequestID.String()) + err = n.ConsensusManager.Propose(event.RequestID.String(), *task, event.RequestID, event.CallbackAddress) + if err != nil { + logrus.Errorf("Failed to propose task: %w", err) + } + } + case <-ctx.Done(): + break EventLoop + case <-subscription.Err(): + logrus.Fatal("Error with ethereum subscription, exiting... ", err) + } + } + }() +} + +func provideMiner(peerID peer.ID, ethAddress common.Address, beacon beacon.BeaconNetworks, ethClient *ethclient.EthereumClient, privateKey []byte) *consensus.Miner { + return consensus.NewMiner(peerID, ethAddress, beacon, ethClient, privateKey) +} + +func provideBeacon(ps *pubsub.PubSub) (beacon.BeaconNetworks, error) { + networks := beacon.BeaconNetworks{} + bc, err := drand.NewDrandBeacon(config.ChainGenesis, config.TaskEpochInterval, ps) + if err != nil { + return nil, fmt.Errorf("failed to setup drand beacon: %w", err) + } + networks = append(networks, beacon.BeaconNetwork{Start: types.DrandRound(config.ChainGenesis), Beacon: bc}) + // NOTE: currently we use only one network + return networks, nil +} + +func provideWallet(peerID peer.ID, privKey []byte) (*wallet.LocalWallet, error) { + // TODO make persistent keystore + kstore := wallet.NewMemKeyStore() + keyInfo := types.KeyInfo{ + Type: types.KTEd25519, + PrivateKey: privKey, + } + + kstore.Put(wallet.KNamePrefix+peerID.String(), keyInfo) + w, err := wallet.NewWallet(kstore) + if err != nil { + return nil, xerrors.Errorf("failed to setup wallet: %w", err) + } + return w, nil +} + +func provideEthereumClient(config *config.Config) (*ethclient.EthereumClient, error) { + ethereum := ethclient.NewEthereumClient() + err := ethereum.Initialize(context.Background(), + config.Ethereum.GatewayAddress, + config.Ethereum.PrivateKey, + config.Ethereum.OracleEmitterContractAddress, + config.Ethereum.AggregatorContractAddress, + config.Ethereum.DioneStakingContractAddress, + ) + if err != nil { + return nil, xerrors.Errorf("failed to initialize ethereum client: %v", err) + } + return ethereum, nil +} + +func (n *Node) setupRPCClients() error { + fc := filecoin.NewLotusClient() + rpc.RegisterRPC(rtypes.RPCTypeFilecoin, map[string]func(string) (string, error){ + "getTransaction": fc.GetTransaction, + }) + + sl := solana2.NewSolanaClient() + rpc.RegisterRPC(rtypes.RPCTypeSolana, map[string]func(string) (string, error){ + "getTransaction": sl.GetTransaction, + }) + + return nil +} + +func providePubsubRouter(lhost host.Host, config *config.Config) *pubsub2.PubSubRouter { + return pubsub2.NewPubSubRouter(lhost, config.PubSub.ServiceTopicName) +} + +func provideConsensusManager(psb *pubsub2.PubSubRouter, miner *consensus.Miner, ethClient *ethclient.EthereumClient, privateKey []byte, minApprovals int) *consensus.PBFTConsensusManager { + return consensus.NewPBFTConsensusManager(psb, minApprovals, privateKey, ethClient, miner) +} + +func provideLibp2pNode(config *config.Config, privateKey crypto.PrivKey, pexDiscoveryUpdateTime time.Duration) (host.Host, *pex.PEXDiscovery, error) { + listenMultiAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", config.ListenAddr, config.ListenPort)) + if err != nil { + return nil, nil, xerrors.Errorf("failed to parse multiaddress: %v", err) + } + host, err := libp2p.New( + context.TODO(), + libp2p.ListenAddrs(listenMultiAddr), + libp2p.Identity(privateKey), + ) + if err != nil { + return nil, nil, xerrors.Errorf("failed to setup libp2p host: %v", err) + } + + var bootstrapMaddrs []multiaddr.Multiaddr + for _, a := range config.BootstrapNodes { + maddr, err := multiaddr.NewMultiaddr(a) + if err != nil { + return nil, nil, xerrors.Errorf("invalid multiaddress of bootstrap node: %v", err) + } + bootstrapMaddrs = append(bootstrapMaddrs, maddr) + } + + if config.IsBootstrap { + bootstrapMaddrs = nil + } + discovery, err := pex.NewPEXDiscovery(host, bootstrapMaddrs, pexDiscoveryUpdateTime) + if err != nil { + return nil, nil, xerrors.Errorf("failed to setup pex discovery: %v", err) + } + + return host, discovery, nil +} + +func Start() { configPath := flag.String("config", "", "Path to config") verbose := flag.Bool("verbose", false, "Verbose logging") flag.Parse() if *configPath == "" { - return fmt.Errorf("no config path provided") - } - - node, err := NewNode(*configPath) - if *verbose { - logrus.SetLevel(logrus.DebugLevel) - } else { - logrus.SetLevel(logrus.DebugLevel) + logrus.Fatal("no config path provided") } + cfg, err := config.NewConfig(*configPath) if err != nil { - logrus.Panic(err) + logrus.Fatalf("failed to load config: %v", err) } var privateKey crypto.PrivKey - if node.Config.IsBootstrap { + if cfg.IsBootstrap { if _, err := os.Stat(".bootstrap_privkey"); os.IsNotExist(err) { privateKey, err = generatePrivateKey() if err != nil { @@ -281,18 +340,30 @@ func Start() error { } } else { privateKey, err = generatePrivateKey() + if err != nil { + logrus.Fatal(err) + } } - ctx, ctxCancel := context.WithCancel(context.Background()) - node.GlobalCtx = ctx - node.GlobalCtxCancel = ctxCancel + node, err := NewNode(cfg, privateKey, DefaultPEXUpdateTime) + if err != nil { + logrus.Fatal(err) + } - node.setupNode(ctx, privateKey, DefaultPEXUpdateTime) - for { - select { - case <-ctx.Done(): - return nil - } + // log + if *verbose { + logrus.SetLevel(logrus.DebugLevel) + } else { + logrus.SetLevel(logrus.DebugLevel) + } + + //ctx, ctxCancel := context.WithCancel(context.Background()) + //node.GlobalCtx = ctx + //node.GlobalCtxCancel = ctxCancel + + err = node.Run(context.TODO()) + if err != nil { + logrus.Fatal(err) } }