Change node initialization process: implement manual dependency injection to prevent NPE issues

This commit is contained in:
ChronosX88 2020-11-27 22:47:58 +04:00
parent 003f49174b
commit e55c19a38d
Signed by: ChronosXYZ
GPG Key ID: 085A69A82C8C511A
7 changed files with 230 additions and 219 deletions

View File

@ -2,12 +2,8 @@ package main
import (
"github.com/Secured-Finance/dione/node"
"github.com/sirupsen/logrus"
)
func main() {
err := node.Start()
if err != nil {
logrus.Panic(err)
}
node.Start()
}

View File

@ -33,7 +33,8 @@ type FilecoinConfig struct {
}
type PubSubConfig struct {
ProtocolID string `mapstructure:"protocolID"`
ProtocolID string `mapstructure:"protocolID"`
ServiceTopicName string `mapstructure:"serviceTopicName"`
}
type StoreConfig struct {

1
go.mod
View File

@ -67,6 +67,7 @@ require (
github.com/whyrusleeping/cbor-gen v0.0.0-20200826160007-0b9f6c5fb163
github.com/wsddn/go-ecdh v0.0.0-20161211032359-48726bab9208 // indirect
github.com/xorcare/golden v0.6.1-0.20191112154924-b87f686d7542
go.uber.org/fx v1.13.1 // indirect
go.uber.org/zap v1.15.0
golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect
golang.org/x/tools v0.0.0-20200827010519-17fd2f27a9e3 // indirect

7
go.sum
View File

@ -1135,6 +1135,11 @@ go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.5.1/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/dig v1.10.0 h1:yLmDDj9/zuDjv3gz8GQGviXMs9TfysIUMUilCpgzUJY=
go.uber.org/dig v1.10.0/go.mod h1:X34SnWGr8Fyla9zQNO2GSO2D+TIuqB14OS8JhYocIyw=
go.uber.org/fx v1.13.1 h1:CFNTr1oin5OJ0VCZ8EycL3wzF29Jz2g0xe55RFsf2a4=
go.uber.org/fx v1.13.1/go.mod h1:bREWhavnedxpJeTq9pQT53BbvwhUv7TcpsOqcH4a+3w=
go.uber.org/goleak v0.10.0/go.mod h1:VCZuO8V8mFPlL0F5J5GK1rtHV3DrFcQ1R8ryq7FK0aI=
go.uber.org/goleak v1.0.0 h1:qsup4IcBdlmsnGfqyLl4Ntn3C2XCCuKAE7DwHpScyUo=
go.uber.org/goleak v1.0.0/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
@ -1350,8 +1355,10 @@ golang.org/x/tools v0.0.0-20190912185636-87d9f09c5d89/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191030062658-86caa796c7ab/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191114200427-caa0b0f7d508/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191130070609-6e064ea0cf2d/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191216052735-49a3e744a425/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=

View File

@ -1,24 +0,0 @@
package node
import (
"fmt"
"github.com/Secured-Finance/dione/types"
"github.com/Secured-Finance/dione/beacon"
"github.com/Secured-Finance/dione/config"
"github.com/Secured-Finance/dione/drand"
)
// NewBeaconClient creates a new beacon chain client
func (n *Node) NewBeaconClient() (beacon.BeaconNetworks, error) {
networks := beacon.BeaconNetworks{}
bc, err := drand.NewDrandBeacon(config.ChainGenesis, config.TaskEpochInterval, n.PubSubRouter.Pubsub)
if err != nil {
return nil, fmt.Errorf("creating drand beacon: %w", err)
}
networks = append(networks, beacon.BeaconNetwork{Start: types.DrandRound(config.ChainGenesis), Beacon: bc})
// NOTE: currently we use only one network
return networks, nil
}

View File

@ -1,41 +0,0 @@
package node
import (
"context"
"github.com/sirupsen/logrus"
)
func (n *Node) subscribeOnEthContracts(ctx context.Context) {
eventChan, subscription, err := n.Ethereum.SubscribeOnOracleEvents(ctx)
if err != nil {
logrus.Fatal("Couldn't subscribe on ethereum contracts, exiting... ", err)
}
go func() {
EventLoop:
for {
select {
case event := <-eventChan:
{
task, err := n.Miner.MineTask(ctx, event)
if err != nil {
logrus.Fatal("Failed to mine task, exiting... ", err)
}
if task == nil {
continue
}
logrus.Infof("Started new consensus round with ID: %s", event.RequestID.String())
err = n.ConsensusManager.Propose(event.RequestID.String(), *task, event.RequestID, event.CallbackAddress)
if err != nil {
logrus.Errorf("Failed to propose task: %w", err)
}
}
case <-ctx.Done():
break EventLoop
case <-subscription.Err():
logrus.Fatal("Error with ethereum subscription, exiting... ", err)
}
}
}()
}

View File

@ -9,6 +9,15 @@ import (
"os"
"time"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/Secured-Finance/dione/drand"
"github.com/ethereum/go-ethereum/common"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/discovery"
"github.com/Secured-Finance/dione/rpc"
rtypes "github.com/Secured-Finance/dione/rpc/types"
@ -29,7 +38,7 @@ import (
"github.com/Secured-Finance/dione/config"
"github.com/Secured-Finance/dione/consensus"
"github.com/Secured-Finance/dione/ethclient"
"github.com/Secured-Finance/dione/pubsub"
pubsub2 "github.com/Secured-Finance/dione/pubsub"
"github.com/libp2p/go-libp2p"
crypto "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/host"
@ -43,10 +52,10 @@ const (
type Node struct {
Host host.Host
PubSubRouter *pubsub.PubSubRouter
PeerDiscovery discovery.Discovery
PubSubRouter *pubsub2.PubSubRouter
GlobalCtx context.Context
GlobalCtxCancel context.CancelFunc
OracleTopic string
Config *config.Config
Ethereum *ethclient.EthereumClient
ConsensusManager *consensus.PBFTConsensusManager
@ -55,168 +64,87 @@ type Node struct {
Wallet *wallet.LocalWallet
}
func NewNode(configPath string) (*Node, error) {
cfg, err := config.NewConfig(configPath)
if err != nil {
return nil, err
}
node := &Node{
OracleTopic: "dione",
Config: cfg,
func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTime time.Duration) (*Node, error) {
n := &Node{
Config: config,
}
return node, nil
}
func (n *Node) setupNode(ctx context.Context, prvKey crypto.PrivKey, pexDiscoveryUpdateTime time.Duration) {
n.setupLibp2pHost(context.TODO(), prvKey, pexDiscoveryUpdateTime)
err := n.setupEthereumClient()
lhost, pDiscovery, err := provideLibp2pNode(n.Config, prvKey, pexDiscoveryUpdateTime)
if err != nil {
logrus.Fatal(err)
}
n.Host = lhost
n.PeerDiscovery = pDiscovery
ethClient, err := provideEthereumClient(n.Config)
if err != nil {
logrus.Fatal(err)
}
n.Ethereum = ethClient
err = n.setupRPCClients()
if err != nil {
logrus.Fatal(err)
}
n.setupPubsub()
psb := providePubsubRouter(lhost, n.Config)
n.PubSubRouter = psb
rawPrivKey, err := prvKey.Raw()
if err != nil {
logrus.Fatal(err)
}
err = n.setupBeacon()
if err != nil {
logrus.Fatal(err)
}
err = n.setupMiner(rawPrivKey)
if err != nil {
logrus.Fatal(err)
}
err = n.setupConsensusManager(rawPrivKey, n.Config.ConsensusMinApprovals)
if err != nil {
logrus.Fatalf("Failed to setup consensus manager: %w", err)
}
err = n.setupWallet(rawPrivKey)
if err != nil {
logrus.Fatal(err)
}
n.subscribeOnEthContracts(ctx)
}
func (n *Node) setupMiner(privKey []byte) error {
n.Miner = consensus.NewMiner(n.Host.ID(), *n.Ethereum.GetEthAddress(), n.Beacon, n.Ethereum, privKey)
return nil
}
func (n *Node) setupBeacon() error {
beacon, err := n.NewBeaconClient()
beacon, err := provideBeacon(psb.Pubsub)
if err != nil {
return xerrors.Errorf("failed to setup beacon: %w", err)
logrus.Fatal(err)
}
n.Beacon = beacon
return nil
}
func (n *Node) setupWallet(privKey []byte) error {
// TODO make persistent keystore
kstore := wallet.NewMemKeyStore()
keyInfo := types.KeyInfo{
Type: types.KTEd25519,
PrivateKey: privKey,
}
miner := provideMiner(n.Host.ID(), *n.Ethereum.GetEthAddress(), n.Beacon, n.Ethereum, rawPrivKey)
n.Miner = miner
kstore.Put(wallet.KNamePrefix+n.Host.ID().String(), keyInfo)
w, err := wallet.NewWallet(kstore)
cManager := provideConsensusManager(psb, miner, ethClient, rawPrivKey, n.Config.ConsensusMinApprovals)
n.ConsensusManager = cManager
wallet, err := provideWallet(n.Host.ID(), rawPrivKey)
if err != nil {
return xerrors.Errorf("failed to setup wallet: %w", err)
logrus.Fatal(err)
}
n.Wallet = w
return nil
n.Wallet = wallet
return n, nil
}
func (n *Node) setupEthereumClient() error {
ethereum := ethclient.NewEthereumClient()
n.Ethereum = ethereum
return ethereum.Initialize(context.Background(),
n.Config.Ethereum.GatewayAddress,
n.Config.Ethereum.PrivateKey,
n.Config.Ethereum.OracleEmitterContractAddress,
n.Config.Ethereum.AggregatorContractAddress,
n.Config.Ethereum.DioneStakingContractAddress,
)
}
func (n *Node) Run(ctx context.Context) error {
n.runLibp2pAsync(ctx)
n.subscribeOnEthContractsAsync(ctx)
func (n *Node) setupRPCClients() error {
fc := filecoin.NewLotusClient()
rpc.RegisterRPC(rtypes.RPCTypeFilecoin, map[string]func(string) (string, error){
"getTransaction": fc.GetTransaction,
})
sl := solana2.NewSolanaClient()
rpc.RegisterRPC(rtypes.RPCTypeSolana, map[string]func(string) (string, error){
"getTransaction": sl.GetTransaction,
})
return nil
}
func (n *Node) setupPubsub() {
n.PubSubRouter = pubsub.NewPubSubRouter(n.Host, n.OracleTopic)
// wait for setting up pubsub
//time.Sleep(3 * time.Second)
}
func (n *Node) setupConsensusManager(privateKey []byte, minApprovals int) error {
n.ConsensusManager = consensus.NewPBFTConsensusManager(n.PubSubRouter, minApprovals, privateKey, n.Ethereum, n.Miner)
return nil
}
func (n *Node) setupLibp2pHost(ctx context.Context, privateKey crypto.PrivKey, pexDiscoveryUpdateTime time.Duration) {
listenMultiAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", n.Config.ListenAddr, n.Config.ListenPort))
if err != nil {
logrus.Fatal("Failed to generate new node multiaddress:", err)
}
host, err := libp2p.New(
ctx,
libp2p.ListenAddrs(listenMultiAddr),
libp2p.Identity(privateKey),
)
if err != nil {
logrus.Fatal("Failed to set a new libp2p node:", err)
}
n.Host = host
logrus.Info(fmt.Sprintf("[*] Your Multiaddress Is: /ip4/%s/tcp/%d/p2p/%s", n.Config.ListenAddr, n.Config.ListenPort, host.ID().Pretty()))
var bootstrapMaddrs []multiaddr.Multiaddr
for _, a := range n.Config.BootstrapNodes {
maddr, err := multiaddr.NewMultiaddr(a)
if err != nil {
logrus.Fatalf("Invalid multiaddress of bootstrap node: %s", err.Error())
for {
select {
case <-ctx.Done():
return nil
}
bootstrapMaddrs = append(bootstrapMaddrs, maddr)
}
if n.Config.IsBootstrap {
bootstrapMaddrs = nil
}
discovery, err := pex.NewPEXDiscovery(host, bootstrapMaddrs, pexDiscoveryUpdateTime)
if err != nil {
logrus.Fatal("Can't set up PEX discovery protocol, exiting... ", err)
}
return nil
}
func (n *Node) runLibp2pAsync(ctx context.Context) error {
logrus.Info(fmt.Sprintf("[*] Your Multiaddress Is: /ip4/%s/tcp/%d/p2p/%s", n.Config.ListenAddr, n.Config.ListenPort, n.Host.ID().Pretty()))
logrus.Info("Announcing ourselves...")
_, err = discovery.Advertise(context.TODO(), n.Config.Rendezvous)
_, err := n.PeerDiscovery.Advertise(context.TODO(), n.Config.Rendezvous)
if err != nil {
logrus.Fatalf("Failed to announce this node to the network: %s", err.Error())
return xerrors.Errorf("failed to announce this node to the network: %v", err)
}
logrus.Info("Successfully announced!")
// Discover unbounded count of peers
logrus.Info("Searching for other peers...")
peerChan, err := discovery.FindPeers(context.TODO(), n.Config.Rendezvous)
peerChan, err := n.PeerDiscovery.FindPeers(context.TODO(), n.Config.Rendezvous)
if err != nil {
logrus.Fatal("Failed to find new peers, exiting...", err)
return xerrors.Errorf("failed to find new peers: %v", err)
}
go func() {
MainLoop:
@ -242,30 +170,161 @@ func (n *Node) setupLibp2pHost(ctx context.Context, privateKey crypto.PrivKey, p
}
}
}()
return nil
}
func Start() error {
func (n *Node) subscribeOnEthContractsAsync(ctx context.Context) {
eventChan, subscription, err := n.Ethereum.SubscribeOnOracleEvents(ctx)
if err != nil {
logrus.Fatal("Couldn't subscribe on ethereum contracts, exiting... ", err)
}
go func() {
EventLoop:
for {
select {
case event := <-eventChan:
{
task, err := n.Miner.MineTask(ctx, event)
if err != nil {
logrus.Fatal("Failed to mine task, exiting... ", err)
}
if task == nil {
continue
}
logrus.Infof("Started new consensus round with ID: %s", event.RequestID.String())
err = n.ConsensusManager.Propose(event.RequestID.String(), *task, event.RequestID, event.CallbackAddress)
if err != nil {
logrus.Errorf("Failed to propose task: %w", err)
}
}
case <-ctx.Done():
break EventLoop
case <-subscription.Err():
logrus.Fatal("Error with ethereum subscription, exiting... ", err)
}
}
}()
}
func provideMiner(peerID peer.ID, ethAddress common.Address, beacon beacon.BeaconNetworks, ethClient *ethclient.EthereumClient, privateKey []byte) *consensus.Miner {
return consensus.NewMiner(peerID, ethAddress, beacon, ethClient, privateKey)
}
func provideBeacon(ps *pubsub.PubSub) (beacon.BeaconNetworks, error) {
networks := beacon.BeaconNetworks{}
bc, err := drand.NewDrandBeacon(config.ChainGenesis, config.TaskEpochInterval, ps)
if err != nil {
return nil, fmt.Errorf("failed to setup drand beacon: %w", err)
}
networks = append(networks, beacon.BeaconNetwork{Start: types.DrandRound(config.ChainGenesis), Beacon: bc})
// NOTE: currently we use only one network
return networks, nil
}
func provideWallet(peerID peer.ID, privKey []byte) (*wallet.LocalWallet, error) {
// TODO make persistent keystore
kstore := wallet.NewMemKeyStore()
keyInfo := types.KeyInfo{
Type: types.KTEd25519,
PrivateKey: privKey,
}
kstore.Put(wallet.KNamePrefix+peerID.String(), keyInfo)
w, err := wallet.NewWallet(kstore)
if err != nil {
return nil, xerrors.Errorf("failed to setup wallet: %w", err)
}
return w, nil
}
func provideEthereumClient(config *config.Config) (*ethclient.EthereumClient, error) {
ethereum := ethclient.NewEthereumClient()
err := ethereum.Initialize(context.Background(),
config.Ethereum.GatewayAddress,
config.Ethereum.PrivateKey,
config.Ethereum.OracleEmitterContractAddress,
config.Ethereum.AggregatorContractAddress,
config.Ethereum.DioneStakingContractAddress,
)
if err != nil {
return nil, xerrors.Errorf("failed to initialize ethereum client: %v", err)
}
return ethereum, nil
}
func (n *Node) setupRPCClients() error {
fc := filecoin.NewLotusClient()
rpc.RegisterRPC(rtypes.RPCTypeFilecoin, map[string]func(string) (string, error){
"getTransaction": fc.GetTransaction,
})
sl := solana2.NewSolanaClient()
rpc.RegisterRPC(rtypes.RPCTypeSolana, map[string]func(string) (string, error){
"getTransaction": sl.GetTransaction,
})
return nil
}
func providePubsubRouter(lhost host.Host, config *config.Config) *pubsub2.PubSubRouter {
return pubsub2.NewPubSubRouter(lhost, config.PubSub.ServiceTopicName)
}
func provideConsensusManager(psb *pubsub2.PubSubRouter, miner *consensus.Miner, ethClient *ethclient.EthereumClient, privateKey []byte, minApprovals int) *consensus.PBFTConsensusManager {
return consensus.NewPBFTConsensusManager(psb, minApprovals, privateKey, ethClient, miner)
}
func provideLibp2pNode(config *config.Config, privateKey crypto.PrivKey, pexDiscoveryUpdateTime time.Duration) (host.Host, *pex.PEXDiscovery, error) {
listenMultiAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", config.ListenAddr, config.ListenPort))
if err != nil {
return nil, nil, xerrors.Errorf("failed to parse multiaddress: %v", err)
}
host, err := libp2p.New(
context.TODO(),
libp2p.ListenAddrs(listenMultiAddr),
libp2p.Identity(privateKey),
)
if err != nil {
return nil, nil, xerrors.Errorf("failed to setup libp2p host: %v", err)
}
var bootstrapMaddrs []multiaddr.Multiaddr
for _, a := range config.BootstrapNodes {
maddr, err := multiaddr.NewMultiaddr(a)
if err != nil {
return nil, nil, xerrors.Errorf("invalid multiaddress of bootstrap node: %v", err)
}
bootstrapMaddrs = append(bootstrapMaddrs, maddr)
}
if config.IsBootstrap {
bootstrapMaddrs = nil
}
discovery, err := pex.NewPEXDiscovery(host, bootstrapMaddrs, pexDiscoveryUpdateTime)
if err != nil {
return nil, nil, xerrors.Errorf("failed to setup pex discovery: %v", err)
}
return host, discovery, nil
}
func Start() {
configPath := flag.String("config", "", "Path to config")
verbose := flag.Bool("verbose", false, "Verbose logging")
flag.Parse()
if *configPath == "" {
return fmt.Errorf("no config path provided")
}
node, err := NewNode(*configPath)
if *verbose {
logrus.SetLevel(logrus.DebugLevel)
} else {
logrus.SetLevel(logrus.DebugLevel)
logrus.Fatal("no config path provided")
}
cfg, err := config.NewConfig(*configPath)
if err != nil {
logrus.Panic(err)
logrus.Fatalf("failed to load config: %v", err)
}
var privateKey crypto.PrivKey
if node.Config.IsBootstrap {
if cfg.IsBootstrap {
if _, err := os.Stat(".bootstrap_privkey"); os.IsNotExist(err) {
privateKey, err = generatePrivateKey()
if err != nil {
@ -281,18 +340,30 @@ func Start() error {
}
} else {
privateKey, err = generatePrivateKey()
if err != nil {
logrus.Fatal(err)
}
}
ctx, ctxCancel := context.WithCancel(context.Background())
node.GlobalCtx = ctx
node.GlobalCtxCancel = ctxCancel
node, err := NewNode(cfg, privateKey, DefaultPEXUpdateTime)
if err != nil {
logrus.Fatal(err)
}
node.setupNode(ctx, privateKey, DefaultPEXUpdateTime)
for {
select {
case <-ctx.Done():
return nil
}
// log
if *verbose {
logrus.SetLevel(logrus.DebugLevel)
} else {
logrus.SetLevel(logrus.DebugLevel)
}
//ctx, ctxCancel := context.WithCancel(context.Background())
//node.GlobalCtx = ctx
//node.GlobalCtxCancel = ctxCancel
err = node.Run(context.TODO())
if err != nil {
logrus.Fatal(err)
}
}