From 5e0c7f02fa887176912590d2c1a33b105110f299 Mon Sep 17 00:00:00 2001 From: ChronosX88 Date: Thu, 22 Jul 2021 00:56:58 +0300 Subject: [PATCH] Integrate Uber's Fx DI framework, refactor node init code massively --- beacon/drand/drand.go | 22 +- blockchain/blockchain.go | 18 +- blockchain/sync/sync_mgr.go | 14 +- config/config.go | 1 + consensus/consensus.go | 17 +- consensus/consensus_validator.go | 9 +- consensus/dispute_manager.go | 44 ++-- go.mod | 2 +- go.sum | 5 + node/flags.go | 6 + node/node.go | 393 +++++++------------------------ node/node_dep_providers.go | 290 ++++++++++++++++++----- pubsub/pubsub_router.go | 18 +- 13 files changed, 426 insertions(+), 413 deletions(-) create mode 100644 node/flags.go diff --git a/beacon/drand/drand.go b/beacon/drand/drand.go index b555ed6..00e1ef6 100644 --- a/beacon/drand/drand.go +++ b/beacon/drand/drand.go @@ -83,20 +83,23 @@ func NewDrandBeacon(ps *pubsub.PubSub, bus EventBus.Bus) (*DrandBeacon, error) { DrandClient: drandClient, localCache: make(map[uint64]types.BeaconEntry), bus: bus, + PublicKey: drandChain.PublicKey, } - db.PublicKey = drandChain.PublicKey - - db.drandResultChannel = db.DrandClient.Watch(context.TODO()) - err = db.getLatestDrandResult() - if err != nil { - return nil, err - } - go db.loop(context.TODO()) - return db, nil } +func (db *DrandBeacon) Run(ctx context.Context) error { + db.drandResultChannel = db.DrandClient.Watch(ctx) + err := db.getLatestDrandResult() + if err != nil { + return err + } + go db.loop(ctx) + + return nil +} + func (db *DrandBeacon) getLatestDrandResult() error { latestDround, err := db.DrandClient.Get(context.TODO(), 0) if err != nil { @@ -113,6 +116,7 @@ func (db *DrandBeacon) loop(ctx context.Context) { select { case <-ctx.Done(): { + logrus.Debug("Stopping watching new DRAND entries...") return } case res := <-db.drandResultChannel: diff --git a/blockchain/blockchain.go b/blockchain/blockchain.go index bcd4a4a..a634a5a 100644 --- a/blockchain/blockchain.go +++ b/blockchain/blockchain.go @@ -10,6 +10,8 @@ import ( "os" "sync" + drand2 "github.com/Secured-Finance/dione/beacon/drand" + "github.com/Secured-Finance/dione/beacon" "github.com/Secured-Finance/dione/consensus/validation" @@ -47,16 +49,16 @@ type BlockChain struct { metadataIndex *utils.Index heightIndex *utils.Index - bus EventBus.Bus - miner *Miner - b beacon.BeaconAPI + bus EventBus.Bus + miner *Miner + drandBeacon *drand2.DrandBeacon } -func NewBlockChain(path string, bus EventBus.Bus, miner *Miner, b beacon.BeaconAPI) (*BlockChain, error) { +func NewBlockChain(path string, bus EventBus.Bus, miner *Miner, db *drand2.DrandBeacon) (*BlockChain, error) { chain := &BlockChain{ - bus: bus, - miner: miner, - b: b, + bus: bus, + miner: miner, + drandBeacon: db, } // configure lmdb env @@ -357,7 +359,7 @@ func (bc *BlockChain) ValidateBlock(block *types2.Block) error { return err } - res, err := bc.b.Entry(context.TODO(), block.Header.ElectionProof.RandomnessRound) + res, err := bc.drandBeacon.Entry(context.TODO(), block.Header.ElectionProof.RandomnessRound) if err != nil { return err } diff --git a/blockchain/sync/sync_mgr.go b/blockchain/sync/sync_mgr.go index 43d89c3..da9ef4e 100644 --- a/blockchain/sync/sync_mgr.go +++ b/blockchain/sync/sync_mgr.go @@ -37,7 +37,9 @@ import ( gorpc "github.com/libp2p/go-libp2p-gorpc" ) -type SyncManager interface{} +type SyncManager interface { + Run() +} type syncManager struct { blockpool *blockchain.BlockChain @@ -66,16 +68,18 @@ func NewSyncManager(bus EventBus.Bus, bc *blockchain.BlockChain, mp *pool.Mempoo psb: psb, } - psb.Hook(pubsub.NewTxMessageType, sm.onNewTransaction) - psb.Hook(pubsub.NewBlockMessageType, sm.onNewBlock) + return sm +} + +func (sm *syncManager) Run() { + sm.psb.Hook(pubsub.NewTxMessageType, sm.onNewTransaction) + sm.psb.Hook(pubsub.NewBlockMessageType, sm.onNewBlock) go func() { if err := sm.initialSync(); err != nil { logrus.Error(err) } }() - - return sm } func (sm *syncManager) initialSync() error { diff --git a/config/config.go b/config/config.go index 8f51f52..337a6b7 100644 --- a/config/config.go +++ b/config/config.go @@ -20,6 +20,7 @@ type Config struct { Redis RedisConfig `mapstructure:"redis"` CacheType string `mapstructure:"cache_type"` Blockchain BlockchainConfig `mapstructure:"blockchain"` + PrivateKeyPath string `mapstructure:"private_key_path"` } type EthereumConfig struct { diff --git a/consensus/consensus.go b/consensus/consensus.go index b6df23a..0e2909f 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -7,9 +7,9 @@ import ( "math/big" "sync" - "github.com/libp2p/go-libp2p-core/peer" + drand2 "github.com/Secured-Finance/dione/beacon/drand" - "github.com/Secured-Finance/dione/beacon" + "github.com/libp2p/go-libp2p-core/peer" "github.com/fxamacker/cbor/v2" @@ -82,14 +82,14 @@ func NewPBFTConsensusManager( miner *blockchain.Miner, bc *blockchain.BlockChain, bp *pool.BlockPool, - b beacon.BeaconNetwork, + db *drand2.DrandBeacon, mempool *pool.Mempool, address peer.ID, ) *PBFTConsensusManager { pcm := &PBFTConsensusManager{ psb: psb, miner: miner, - validator: NewConsensusValidator(miner, bc, b), + validator: NewConsensusValidator(miner, bc, db), msgLog: NewConsensusMessageLog(), minApprovals: minApprovals, privKey: privKey, @@ -105,15 +105,18 @@ func NewPBFTConsensusManager( address: address, } + return pcm +} + +func (pcm *PBFTConsensusManager) Run() { pcm.psb.Hook(pubsub.PrePrepareMessageType, pcm.handlePrePrepare) pcm.psb.Hook(pubsub.PrepareMessageType, pcm.handlePrepare) pcm.psb.Hook(pubsub.CommitMessageType, pcm.handleCommit) - bus.SubscribeAsync("beacon:newEntry", func(entry types2.BeaconEntry) { + pcm.bus.SubscribeAsync("beacon:newEntry", func(entry types2.BeaconEntry) { pcm.onNewBeaconEntry(entry) }, true) height, _ := pcm.blockchain.GetLatestBlockHeight() pcm.state.blockHeight = height + 1 - return pcm } func (pcm *PBFTConsensusManager) propose(blk *types3.Block) error { @@ -194,7 +197,7 @@ func (pcm *PBFTConsensusManager) handlePrepare(message *pubsub.PubSubMessage) { } if _, err := pcm.blockPool.GetBlock(cmsg.Blockhash); errors.Is(err, cache.ErrNotFound) { - logrus.WithField("blockHash", cmsg.Blockhash).Warnf("received unknown block %x", cmsg.Blockhash) + logrus.WithField("blockHash", hex.EncodeToString(cmsg.Blockhash)).Warnf("received unknown block %x", cmsg.Blockhash) return } diff --git a/consensus/consensus_validator.go b/consensus/consensus_validator.go index a0cf8da..7bf49cb 100644 --- a/consensus/consensus_validator.go +++ b/consensus/consensus_validator.go @@ -3,7 +3,8 @@ package consensus import ( "encoding/hex" - "github.com/Secured-Finance/dione/beacon" + drand2 "github.com/Secured-Finance/dione/beacon/drand" + "github.com/sirupsen/logrus" "github.com/Secured-Finance/dione/blockchain" @@ -13,15 +14,15 @@ import ( type ConsensusValidator struct { validationFuncMap map[types2.ConsensusMessageType]func(msg types2.ConsensusMessage) bool miner *blockchain.Miner - beacon beacon.BeaconNetwork + beacon *drand2.DrandBeacon blockchain *blockchain.BlockChain } -func NewConsensusValidator(miner *blockchain.Miner, bc *blockchain.BlockChain, b beacon.BeaconNetwork) *ConsensusValidator { +func NewConsensusValidator(miner *blockchain.Miner, bc *blockchain.BlockChain, db *drand2.DrandBeacon) *ConsensusValidator { cv := &ConsensusValidator{ miner: miner, blockchain: bc, - beacon: b, + beacon: db, } cv.validationFuncMap = map[types2.ConsensusMessageType]func(msg types2.ConsensusMessage) bool{ diff --git a/consensus/dispute_manager.go b/consensus/dispute_manager.go index 672ca9c..28e3ba6 100644 --- a/consensus/dispute_manager.go +++ b/consensus/dispute_manager.go @@ -5,6 +5,8 @@ import ( "encoding/hex" "time" + "github.com/ethereum/go-ethereum/event" + types2 "github.com/Secured-Finance/dione/blockchain/types" "github.com/Secured-Finance/dione/types" @@ -28,51 +30,63 @@ type DisputeManager struct { disputeMap map[string]*dioneDispute.DioneDisputeNewDispute voteWindow time.Duration blockchain *blockchain.BlockChain + + submissionChan chan *dioneOracle.DioneOracleSubmittedOracleRequest + submissionSubscription event.Subscription + + disputesChan chan *dioneDispute.DioneDisputeNewDispute + disputesSubscription event.Subscription } func NewDisputeManager(ctx context.Context, ethClient *ethclient.EthereumClient, pcm *PBFTConsensusManager, voteWindow int, bc *blockchain.BlockChain) (*DisputeManager, error) { - newSubmittionsChan, submSubscription, err := ethClient.SubscribeOnNewSubmittions(ctx) + submissionChan, submSubscription, err := ethClient.SubscribeOnNewSubmittions(ctx) if err != nil { return nil, err } - newDisputesChan, dispSubscription, err := ethClient.SubscribeOnNewDisputes(ctx) + disputesChan, dispSubscription, err := ethClient.SubscribeOnNewDisputes(ctx) if err != nil { return nil, err } dm := &DisputeManager{ - ethClient: ethClient, - pcm: pcm, - ctx: ctx, - submissionMap: map[string]*dioneOracle.DioneOracleSubmittedOracleRequest{}, - disputeMap: map[string]*dioneDispute.DioneDisputeNewDispute{}, - voteWindow: time.Duration(voteWindow) * time.Second, - blockchain: bc, + ethClient: ethClient, + pcm: pcm, + ctx: ctx, + submissionMap: map[string]*dioneOracle.DioneOracleSubmittedOracleRequest{}, + disputeMap: map[string]*dioneDispute.DioneDisputeNewDispute{}, + voteWindow: time.Duration(voteWindow) * time.Second, + blockchain: bc, + submissionChan: submissionChan, + submissionSubscription: submSubscription, + disputesChan: disputesChan, + disputesSubscription: dispSubscription, } + return dm, nil +} + +func (dm *DisputeManager) Run(ctx context.Context) { go func() { for { select { case <-ctx.Done(): { - submSubscription.Unsubscribe() - dispSubscription.Unsubscribe() + dm.disputesSubscription.Unsubscribe() + dm.disputesSubscription.Unsubscribe() return } - case s := <-newSubmittionsChan: + case s := <-dm.submissionChan: { dm.onNewSubmission(s) } - case d := <-newDisputesChan: + case d := <-dm.disputesChan: { dm.onNewDispute(d) } } } }() - - return dm, nil } func (dm *DisputeManager) onNewSubmission(submission *dioneOracle.DioneOracleSubmittedOracleRequest) { diff --git a/go.mod b/go.mod index 1b53929..4b954af 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,6 @@ require ( github.com/ethereum/go-ethereum v1.9.25 github.com/filecoin-project/go-address v0.0.5 github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 - github.com/filecoin-project/go-state-types v0.1.0 github.com/filecoin-project/lotus v1.6.0 github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5 // indirect github.com/fxamacker/cbor/v2 v2.3.0 @@ -57,6 +56,7 @@ require ( github.com/valyala/fasthttp v1.17.0 github.com/wealdtech/go-merkletree v1.0.1-0.20190605192610-2bb163c2ea2a github.com/whyrusleeping/cbor-gen v0.0.0-20210219115102-f37d292932f2 + go.uber.org/fx v1.13.1 go.uber.org/multierr v1.7.0 // indirect go.uber.org/zap v1.17.0 golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97 diff --git a/go.sum b/go.sum index 57fe94b..bf32893 100644 --- a/go.sum +++ b/go.sum @@ -1820,8 +1820,12 @@ go.uber.org/atomic v1.5.1/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/dig v1.10.0 h1:yLmDDj9/zuDjv3gz8GQGviXMs9TfysIUMUilCpgzUJY= go.uber.org/dig v1.10.0/go.mod h1:X34SnWGr8Fyla9zQNO2GSO2D+TIuqB14OS8JhYocIyw= go.uber.org/fx v1.9.0/go.mod h1:mFdUyAUuJ3w4jAckiKSKbldsxy1ojpAMJ+dVZg5Y0Aw= +go.uber.org/fx v1.13.1 h1:CFNTr1oin5OJ0VCZ8EycL3wzF29Jz2g0xe55RFsf2a4= +go.uber.org/fx v1.13.1/go.mod h1:bREWhavnedxpJeTq9pQT53BbvwhUv7TcpsOqcH4a+3w= +go.uber.org/goleak v0.10.0/go.mod h1:VCZuO8V8mFPlL0F5J5GK1rtHV3DrFcQ1R8ryq7FK0aI= go.uber.org/goleak v1.0.0 h1:qsup4IcBdlmsnGfqyLl4Ntn3C2XCCuKAE7DwHpScyUo= go.uber.org/goleak v1.0.0/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= @@ -2129,6 +2133,7 @@ golang.org/x/tools v0.0.0-20191030062658-86caa796c7ab/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191113191852-77e3bb0ad9e7/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191114200427-caa0b0f7d508/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191115202509-3a792d9c32b2/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= diff --git a/node/flags.go b/node/flags.go new file mode 100644 index 0000000..dd4a760 --- /dev/null +++ b/node/flags.go @@ -0,0 +1,6 @@ +package node + +type AppFlags struct { + ConfigPath string + Verbose bool +} diff --git a/node/node.go b/node/node.go index 786ba34..615c86b 100644 --- a/node/node.go +++ b/node/node.go @@ -2,20 +2,17 @@ package node import ( "context" - "crypto/rand" - "flag" - "fmt" - "io/ioutil" - "os" - "path" - "runtime" "time" - "github.com/Secured-Finance/dione/blockchain" + drand2 "github.com/Secured-Finance/dione/beacon/drand" - "github.com/multiformats/go-multiaddr" + "github.com/Secured-Finance/dione/pubsub" - "github.com/asaskevich/EventBus" + "github.com/Secured-Finance/dione/consensus" + + "github.com/Secured-Finance/dione/blockchain/sync" + + "go.uber.org/fx" "github.com/fxamacker/cbor/v2" @@ -23,31 +20,16 @@ import ( types2 "github.com/Secured-Finance/dione/blockchain/types" - gorpc "github.com/libp2p/go-libp2p-gorpc" - "github.com/Secured-Finance/dione/blockchain/pool" - "github.com/Secured-Finance/dione/blockchain/sync" - - "github.com/Secured-Finance/dione/consensus" - pubsub2 "github.com/Secured-Finance/dione/pubsub" - "github.com/libp2p/go-libp2p-core/discovery" "github.com/Secured-Finance/dione/rpc" - rtypes "github.com/Secured-Finance/dione/rpc/types" - - solana2 "github.com/Secured-Finance/dione/rpc/solana" - - "github.com/Secured-Finance/dione/rpc/filecoin" "golang.org/x/xerrors" - "github.com/Secured-Finance/dione/beacon" - "github.com/Secured-Finance/dione/config" "github.com/Secured-Finance/dione/ethclient" - "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/host" "github.com/sirupsen/logrus" ) @@ -56,192 +38,61 @@ const ( DefaultPEXUpdateTime = 6 * time.Second ) -type Node struct { - Host host.Host - PeerDiscovery discovery.Discovery - PubSubRouter *pubsub2.PubSubRouter - GlobalCtx context.Context - GlobalCtxCancel context.CancelFunc - Config *config.Config - Ethereum *ethclient.EthereumClient - ConsensusManager *consensus.PBFTConsensusManager - Miner *blockchain.Miner - Beacon beacon.BeaconNetwork - DisputeManager *consensus.DisputeManager - BlockPool *pool.BlockPool - MemPool *pool.Mempool - BlockChain *blockchain.BlockChain - SyncManager sync.SyncManager - NetworkService *NetworkService - NetworkRPCHost *gorpc.Server - Bus EventBus.Bus - //Cache cache.Cache - //Wallet *wallet.LocalWallet -} +func runNode( + lc fx.Lifecycle, + cfg *config.Config, + disco discovery.Discovery, + ethClient *ethclient.EthereumClient, + h host.Host, + mp *pool.Mempool, + syncManager sync.SyncManager, + consensusManager *consensus.PBFTConsensusManager, + pubSubRouter *pubsub.PubSubRouter, + disputeManager *consensus.DisputeManager, + db *drand2.DrandBeacon, +) { + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + err := runLibp2pAsync(context.TODO(), h, cfg, disco) + if err != nil { + return err + } -func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTime time.Duration) (*Node, error) { - n := &Node{ - Config: config, - } + err = db.Run(context.TODO()) + if err != nil { + return err + } - bus := EventBus.New() - n.Bus = bus + // Run pubsub router + pubSubRouter.Run() - // initialize libp2p host - lhost, err := provideLibp2pHost(n.Config, prvKey) - if err != nil { - logrus.Fatal(err) - } - n.Host = lhost - logrus.WithField( - "multiaddress", - fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s", - n.Config.ListenAddr, - n.Config.ListenPort, - n.Host.ID().Pretty(), - )).Info("Libp2p host has been initialized!") + // Subscribe on new requests event channel from Ethereum + err = subscribeOnEthContractsAsync(context.TODO(), ethClient, mp) + if err != nil { + return err + } - // initialize ethereum client - ethClient, err := provideEthereumClient(n.Config) - if err != nil { - logrus.WithField("err", err.Error()).Fatal("Failed to initialize Ethereum client") - } - n.Ethereum = ethClient - //goland:noinspection ALL - logrus.WithField("ethAddress", ethClient.GetEthAddress().Hex()).Info("Ethereum client has been initialized!") + // Run blockchain sync manager + syncManager.Run() - // initialize blockchain rpc clients - err = n.setupRPCClients() - if err != nil { - logrus.Fatal(err) - } - logrus.Info("Foreign Blockchain RPC clients has been successfully configured!") + // Run consensus manager + consensusManager.Run() - // initialize pubsub subsystem - psb := providePubsubRouter(lhost, n.Config) - n.PubSubRouter = psb - logrus.Info("PubSub subsystem has been initialized!") + // Run dispute manager + disputeManager.Run(context.TODO()) - // get list of bootstrap multiaddresses - baddrs, err := provideBootstrapAddrs(n.Config) - if err != nil { - logrus.Fatal(err) - } - - // initialize peer discovery - peerDiscovery, err := providePeerDiscovery(baddrs, lhost, pexDiscoveryUpdateTime) - if err != nil { - logrus.Fatal(err) - } - n.PeerDiscovery = peerDiscovery - logrus.Info("Peer discovery subsystem has been initialized!") - - // initialize random beacon network subsystem - randomBeaconNetwork, err := provideBeacon(psb.Pubsub, bus) - if err != nil { - logrus.Fatal(err) - } - n.Beacon = randomBeaconNetwork - logrus.Info("Random beacon subsystem has been initialized!") - - // == initialize blockchain modules - - // initialize mempool - mp, err := provideMemPool(bus) - if err != nil { - logrus.Fatalf("Failed to initialize mempool: %s", err.Error()) - } - n.MemPool = mp - logrus.Info("Mempool has been successfully initialized!") - - // initialize mining subsystem - miner := provideMiner(n.Host.ID(), *n.Ethereum.GetEthAddress(), n.Ethereum, prvKey, mp) - n.Miner = miner - logrus.Info("Mining subsystem has been initialized!") - - // initialize blockpool database - bc, err := provideBlockChain(n.Config, bus, miner, randomBeaconNetwork.Beacon) - if err != nil { - logrus.Fatalf("Failed to initialize blockpool: %s", err.Error()) - } - n.BlockChain = bc - logrus.Info("Block pool database has been successfully initialized!") - - bp, err := provideBlockPool(mp, bus) - if err != nil { - logrus.Fatalf("Failed to initialize blockpool: %s", err.Error()) - } - n.BlockPool = bp - logrus.Info("Blockpool has been successfully initialized!") - - ns := provideNetworkService(bc, mp) - n.NetworkService = ns - rpcHost := provideNetworkRPCHost(lhost) - err = rpcHost.Register(ns) - if err != nil { - logrus.Fatal(err) - } - logrus.Info("Direct RPC has been successfully initialized!") - - // initialize libp2p-gorpc client - r := provideP2PRPCClient(lhost) - - // initialize sync manager - - var baddr multiaddr.Multiaddr - if len(baddrs) == 0 { - baddr = nil - } else { - baddr = baddrs[0] - } - sm, err := provideSyncManager(bus, bc, mp, r, baddr, psb) // FIXME here we just pick up first bootstrap in list - if err != nil { - logrus.Fatal(err) - } - n.SyncManager = sm - logrus.Info("Blockchain sync subsystem has been successfully initialized!") - - // initialize consensus subsystem - consensusManager := provideConsensusManager(bus, psb, miner, bc, ethClient, prvKey, n.Config.ConsensusMinApprovals, bp, randomBeaconNetwork, mp, n.Host.ID()) - n.ConsensusManager = consensusManager - logrus.Info("Consensus subsystem has been initialized!") - - // initialize dispute subsystem - disputeManager, err := provideDisputeManager(context.TODO(), ethClient, consensusManager, config, bc) - if err != nil { - logrus.Fatal(err) - } - n.DisputeManager = disputeManager - logrus.Info("Dispute subsystem has been initialized!") - - // initialize internal eth wallet - //w, err := provideWallet(n.Host.ID(), rawPrivKey) - //if err != nil { - // logrus.Fatal(err) - //} - //n.Wallet = w - - return n, nil -} - -func (n *Node) Run(ctx context.Context) error { - err := n.runLibp2pAsync(ctx) - if err != nil { - return err - } - n.subscribeOnEthContractsAsync(ctx) - - for { - select { - case <-ctx.Done(): return nil - } - } + }, + OnStop: func(ctx context.Context) error { + // TODO + return nil + }, + }) } -func (n *Node) runLibp2pAsync(ctx context.Context) error { +func runLibp2pAsync(ctx context.Context, h host.Host, cfg *config.Config, disco discovery.Discovery) error { logrus.Info("Announcing ourselves...") - _, err := n.PeerDiscovery.Advertise(context.TODO(), n.Config.Rendezvous) + _, err := disco.Advertise(context.TODO(), cfg.Rendezvous) if err != nil { return xerrors.Errorf("failed to announce this node to the network: %v", err) } @@ -249,7 +100,7 @@ func (n *Node) runLibp2pAsync(ctx context.Context) error { // Discover unbounded count of peers logrus.Info("Searching for other peers...") - peerChan, err := n.PeerDiscovery.FindPeers(context.TODO(), n.Config.Rendezvous) + peerChan, err := disco.FindPeers(context.TODO(), cfg.Rendezvous) if err != nil { return xerrors.Errorf("failed to find new peers: %v", err) } @@ -264,12 +115,12 @@ func (n *Node) runLibp2pAsync(ctx context.Context) error { if len(newPeer.Addrs) == 0 { continue } - if newPeer.ID.String() == n.Host.ID().String() { + if newPeer.ID.String() == h.ID().String() { continue } logrus.WithField("peer", newPeer.ID).Info("Discovered new peer, connecting...") // Connect to the peer - if err := n.Host.Connect(ctx, newPeer); err != nil { + if err := h.Connect(ctx, newPeer); err != nil { logrus.WithFields(logrus.Fields{ "peer": newPeer.ID, "err": err.Error(), @@ -283,10 +134,10 @@ func (n *Node) runLibp2pAsync(ctx context.Context) error { return nil } -func (n *Node) subscribeOnEthContractsAsync(ctx context.Context) { - eventChan, subscription, err := n.Ethereum.SubscribeOnOracleEvents(ctx) +func subscribeOnEthContractsAsync(ctx context.Context, ethClient *ethclient.EthereumClient, mp *pool.Mempool) error { + eventChan, subscription, err := ethClient.SubscribeOnOracleEvents(ctx) if err != nil { - logrus.Fatal("Couldn't subscribe on ethereum contracts, exiting... ", err) + return err } go func() { @@ -318,7 +169,7 @@ func (n *Node) subscribeOnEthContractsAsync(ctx context.Context) { continue } tx := types2.CreateTransaction(data) - err = n.MemPool.StoreTx(tx) + err = mp.StoreTx(tx) if err != nil { logrus.Errorf("Failed to store tx in mempool: %s", err.Error()) continue @@ -326,107 +177,45 @@ func (n *Node) subscribeOnEthContractsAsync(ctx context.Context) { } case <-ctx.Done(): break EventLoop - case <-subscription.Err(): - logrus.Fatal("Error with ethereum subscription, exiting... ", err) + case err := <-subscription.Err(): + logrus.Fatalf("Error has occurred in subscription to Ethereum event channel: %s", err.Error()) } } }() -} - -func (n *Node) setupRPCClients() error { - fc := filecoin.NewLotusClient() - rpc.RegisterRPC(rtypes.RPCTypeFilecoin, map[string]func(string) ([]byte, error){ - "getTransaction": fc.GetTransaction, - "getBlock": fc.GetBlock, - }) - - sl := solana2.NewSolanaClient() - rpc.RegisterRPC(rtypes.RPCTypeSolana, map[string]func(string) ([]byte, error){ - "getTransaction": sl.GetTransaction, - }) return nil } func Start() { - logrus.SetReportCaller(true) - logrus.SetFormatter(&logrus.TextFormatter{ - FullTimestamp: true, - CallerPrettyfier: func(f *runtime.Frame) (string, string) { - filename := path.Base(f.File) - return "", fmt.Sprintf("%s:%d:", filename, f.Line) - }, - }) - - configPath := flag.String("config", "", "Path to config") - verbose := flag.Bool("verbose", false, "Verbose logging") - flag.Parse() - - if *configPath == "" { - logrus.Fatal("no config path provided") - } - cfg, err := config.NewConfig(*configPath) - if err != nil { - logrus.Fatalf("failed to load config: %v", err) - } - - var privateKey crypto.PrivKey - - if cfg.IsBootstrap { - // FIXME just a little hack - if _, err := os.Stat(".bootstrap_privkey"); os.IsNotExist(err) { - privateKey, err = generatePrivateKey() - if err != nil { - logrus.Fatal(err) - } - - f, _ := os.Create(".bootstrap_privkey") - r, _ := privateKey.Raw() - _, err = f.Write(r) - if err != nil { - logrus.Fatal(err) - } - } else { - pkey, _ := ioutil.ReadFile(".bootstrap_privkey") - privateKey, _ = crypto.UnmarshalEd25519PrivateKey(pkey) - } - } else { - privateKey, err = generatePrivateKey() - if err != nil { - logrus.Fatal(err) - } - } - - node, err := NewNode(cfg, privateKey, DefaultPEXUpdateTime) - if err != nil { - logrus.Fatal(err) - } - - // log - if *verbose { - logrus.SetLevel(logrus.DebugLevel) - } else { - logrus.SetLevel(logrus.DebugLevel) - } - - //log.SetDebugLogging() - - //ctx, ctxCancel := context.WithCancel(context.Background()) - //node.GlobalCtx = ctx - //node.GlobalCtxCancel = ctxCancel - - err = node.Run(context.TODO()) - if err != nil { - logrus.Fatal(err) - } -} - -func generatePrivateKey() (crypto.PrivKey, error) { - r := rand.Reader - // Creates a new RSA key pair for this host. - prvKey, _, err := crypto.GenerateKeyPairWithReader(crypto.Ed25519, 2048, r) - if err != nil { - return nil, err - } - return prvKey, nil + fx.New( + fx.Provide( + provideEventBus, + provideAppFlags, + provideConfig, + providePrivateKey, + provideLibp2pHost, + provideEthereumClient, + providePubsubRouter, + provideBootstrapAddrs, + providePeerDiscovery, + provideDrandBeacon, + provideMempool, + provideMiner, + provideBlockChain, + provideBlockPool, + provideSyncManager, + provideNetworkRPCHost, + provideNetworkService, + provideDirectRPCClient, + provideConsensusManager, + provideDisputeManager, + ), + fx.Invoke( + configureLogger, + configureDirectRPC, + configureForeignBlockchainRPC, + runNode, + ), + fx.NopLogger, + ).Run() } diff --git a/node/node_dep_providers.go b/node/node_dep_providers.go index e221b28..779ffe7 100644 --- a/node/node_dep_providers.go +++ b/node/node_dep_providers.go @@ -2,8 +2,20 @@ package node import ( "context" + "crypto/rand" + "flag" "fmt" - "time" + "io/ioutil" + "os" + "path" + "runtime" + + "github.com/Secured-Finance/dione/rpc" + "github.com/Secured-Finance/dione/rpc/filecoin" + solana2 "github.com/Secured-Finance/dione/rpc/solana" + rtypes "github.com/Secured-Finance/dione/rpc/types" + + "github.com/sirupsen/logrus" "github.com/asaskevich/EventBus" @@ -19,7 +31,6 @@ import ( "github.com/Secured-Finance/dione/blockchain/pool" - "github.com/Secured-Finance/dione/beacon" "github.com/Secured-Finance/dione/cache" "github.com/Secured-Finance/dione/config" "github.com/Secured-Finance/dione/consensus" @@ -28,13 +39,11 @@ import ( "github.com/Secured-Finance/dione/types" "github.com/Secured-Finance/dione/wallet" pex "github.com/Secured-Finance/go-libp2p-pex" - "github.com/ethereum/go-ethereum/common" "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/discovery" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" - pubsub2 "github.com/libp2p/go-libp2p-pubsub" "github.com/multiformats/go-multiaddr" "golang.org/x/xerrors" ) @@ -56,21 +65,30 @@ func provideCache(config *config.Config) cache.Cache { return backend } -func provideDisputeManager(ctx context.Context, ethClient *ethclient.EthereumClient, pcm *consensus.PBFTConsensusManager, cfg *config.Config, bc *blockchain.BlockChain) (*consensus.DisputeManager, error) { - return consensus.NewDisputeManager(ctx, ethClient, pcm, cfg.Ethereum.DisputeVoteWindow, bc) -} - -func provideMiner(peerID peer.ID, ethAddress common.Address, ethClient *ethclient.EthereumClient, privateKey crypto.PrivKey, mempool *pool.Mempool) *blockchain.Miner { - return blockchain.NewMiner(peerID, ethAddress, ethClient, privateKey, mempool) -} - -func provideBeacon(ps *pubsub2.PubSub, bus EventBus.Bus) (beacon.BeaconNetwork, error) { - bc, err := drand2.NewDrandBeacon(ps, bus) +func provideDisputeManager(ethClient *ethclient.EthereumClient, pcm *consensus.PBFTConsensusManager, cfg *config.Config, bc *blockchain.BlockChain) *consensus.DisputeManager { + dm, err := consensus.NewDisputeManager(context.TODO(), ethClient, pcm, cfg.Ethereum.DisputeVoteWindow, bc) if err != nil { - return beacon.BeaconNetwork{}, fmt.Errorf("failed to setup drand beacon: %w", err) + logrus.Fatal(err) } - // NOTE: currently we use only one network - return beacon.BeaconNetwork{Start: config.DrandChainGenesisTime, Beacon: bc}, nil + + logrus.Info("Dispute subsystem has been initialized!") + + return dm +} + +func provideMiner(h host.Host, ethClient *ethclient.EthereumClient, privateKey crypto.PrivKey, mempool *pool.Mempool) *blockchain.Miner { + miner := blockchain.NewMiner(h.ID(), *ethClient.GetEthAddress(), ethClient, privateKey, mempool) + logrus.Info("Mining subsystem has been initialized!") + return miner +} + +func provideDrandBeacon(ps *pubsub.PubSubRouter, bus EventBus.Bus) *drand2.DrandBeacon { + db, err := drand2.NewDrandBeacon(ps.Pubsub, bus) + if err != nil { + logrus.Fatalf("Failed to setup drand beacon: %s", err) + } + logrus.Info("DRAND beacon subsystem has been initialized!") + return db } // FIXME: do we really need this? @@ -90,123 +108,287 @@ func provideWallet(peerID peer.ID, privKey []byte) (*wallet.LocalWallet, error) return w, nil } -func provideEthereumClient(config *config.Config) (*ethclient.EthereumClient, error) { +func provideEthereumClient(config *config.Config) *ethclient.EthereumClient { ethereum := ethclient.NewEthereumClient() err := ethereum.Initialize(&config.Ethereum) if err != nil { - return nil, err + logrus.Fatal(err) } - return ethereum, nil + + logrus.WithField("ethAddress", ethereum.GetEthAddress().Hex()).Info("Ethereum client has been initialized!") + + return ethereum } func providePubsubRouter(lhost host.Host, config *config.Config) *pubsub.PubSubRouter { - return pubsub.NewPubSubRouter(lhost, config.PubSub.ServiceTopicName, config.IsBootstrap) + psb := pubsub.NewPubSubRouter(lhost, config.PubSub.ServiceTopicName, config.IsBootstrap) + logrus.Info("PubSub subsystem has been initialized!") + return psb } func provideConsensusManager( + h host.Host, + cfg *config.Config, bus EventBus.Bus, psb *pubsub.PubSubRouter, miner *blockchain.Miner, bc *blockchain.BlockChain, ethClient *ethclient.EthereumClient, privateKey crypto.PrivKey, - minApprovals int, bp *pool.BlockPool, - b beacon.BeaconNetwork, + db *drand2.DrandBeacon, mp *pool.Mempool, - address peer.ID, ) *consensus.PBFTConsensusManager { - return consensus.NewPBFTConsensusManager( + c := consensus.NewPBFTConsensusManager( bus, psb, - minApprovals, + cfg.ConsensusMinApprovals, privateKey, ethClient, miner, bc, bp, - b, + db, mp, - address, + h.ID(), ) + logrus.Info("Consensus subsystem has been initialized!") + return c } -func provideLibp2pHost(config *config.Config, privateKey crypto.PrivKey) (host.Host, error) { +func provideLibp2pHost(config *config.Config, privateKey crypto.PrivKey) host.Host { listenMultiAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", config.ListenAddr, config.ListenPort)) if err != nil { - return nil, xerrors.Errorf("failed to parse multiaddress: %v", err) + logrus.Fatalf("Failed to parse multiaddress: %s", err.Error()) } - host, err := libp2p.New( + libp2pHost, err := libp2p.New( context.TODO(), libp2p.ListenAddrs(listenMultiAddr), libp2p.Identity(privateKey), ) if err != nil { - return nil, xerrors.Errorf("failed to setup libp2p host: %v", err) + logrus.Fatal(err) } - return host, nil + logrus.WithField( + "multiaddress", + fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s", + config.ListenAddr, + config.ListenPort, + libp2pHost.ID().Pretty(), + )).Info("Libp2p host has been initialized!") + + return libp2pHost } func provideNetworkRPCHost(h host.Host) *gorpc.Server { return gorpc.NewServer(h, DioneProtocolID) } -func provideBootstrapAddrs(c *config.Config) ([]multiaddr.Multiaddr, error) { +func provideBootstrapAddrs(c *config.Config) []multiaddr.Multiaddr { if c.IsBootstrap { - return nil, nil + return nil } var bootstrapMaddrs []multiaddr.Multiaddr for _, a := range c.BootstrapNodes { maddr, err := multiaddr.NewMultiaddr(a) if err != nil { - return nil, xerrors.Errorf("invalid multiaddress of bootstrap node: %v", err) + logrus.Fatalf("Invalid multiaddress of bootstrap node: %v", err) } bootstrapMaddrs = append(bootstrapMaddrs, maddr) } - return bootstrapMaddrs, nil + return bootstrapMaddrs } -func providePeerDiscovery(baddrs []multiaddr.Multiaddr, h host.Host, pexDiscoveryUpdateTime time.Duration) (discovery.Discovery, error) { - pexDiscovery, err := pex.NewPEXDiscovery(h, baddrs, pexDiscoveryUpdateTime) +func providePeerDiscovery(baddrs []multiaddr.Multiaddr, h host.Host) discovery.Discovery { + pexDiscovery, err := pex.NewPEXDiscovery(h, baddrs, DefaultPEXUpdateTime) if err != nil { - return nil, xerrors.Errorf("failed to setup pex pexDiscovery: %v", err) + logrus.Fatalf("Failed to setup libp2p PEX discovery: %s", err.Error()) } - return pexDiscovery, nil + logrus.Info("Peer discovery subsystem has been initialized!") + + return pexDiscovery } -func provideBlockChain(config *config.Config, bus EventBus.Bus, miner *blockchain.Miner, b beacon.BeaconAPI) (*blockchain.BlockChain, error) { - return blockchain.NewBlockChain(config.Blockchain.DatabasePath, bus, miner, b) +func provideBlockChain(config *config.Config, bus EventBus.Bus, miner *blockchain.Miner, db *drand2.DrandBeacon) *blockchain.BlockChain { + bc, err := blockchain.NewBlockChain(config.Blockchain.DatabasePath, bus, miner, db) + if err != nil { + logrus.Fatalf("Failed to initialize blockchain storage: %s", err.Error()) + } + logrus.Info("Blockchain storage has been successfully initialized!") + + return bc } -func provideMemPool(bus EventBus.Bus) (*pool.Mempool, error) { - return pool.NewMempool(bus) +func provideMempool(bus EventBus.Bus) *pool.Mempool { + mp, err := pool.NewMempool(bus) + if err != nil { + logrus.Fatalf("Failed to initialize mempool: %s", err.Error()) + } + + logrus.Info("Mempool has been successfully initialized!") + + return mp } -func provideSyncManager(bus EventBus.Bus, bp *blockchain.BlockChain, mp *pool.Mempool, r *gorpc.Client, bootstrap multiaddr.Multiaddr, psb *pubsub.PubSubRouter) (sync.SyncManager, error) { +func provideSyncManager( + bus EventBus.Bus, + bp *blockchain.BlockChain, + mp *pool.Mempool, + c *gorpc.Client, + bootstrapAddresses []multiaddr.Multiaddr, + psb *pubsub.PubSubRouter, +) sync.SyncManager { bootstrapPeerID := peer.ID("") - if bootstrap != nil { - addr, err := peer.AddrInfoFromP2pAddr(bootstrap) + + if bootstrapAddresses != nil { + addr, err := peer.AddrInfoFromP2pAddr(bootstrapAddresses[0]) // FIXME if err != nil { - return nil, err + logrus.Fatal(err) } bootstrapPeerID = addr.ID } - return sync.NewSyncManager(bus, bp, mp, r, bootstrapPeerID, psb), nil + sm := sync.NewSyncManager(bus, bp, mp, c, bootstrapPeerID, psb) + logrus.Info("Blockchain sync subsystem has been successfully initialized!") + + return sm } -func provideP2PRPCClient(h host.Host) *gorpc.Client { +func provideDirectRPCClient(h host.Host) *gorpc.Client { return gorpc.NewClient(h, DioneProtocolID) } func provideNetworkService(bp *blockchain.BlockChain, mp *pool.Mempool) *NetworkService { - return NewNetworkService(bp, mp) + ns := NewNetworkService(bp, mp) + logrus.Info("Direct RPC has been successfully initialized!") + return ns } -func provideBlockPool(mp *pool.Mempool, bus EventBus.Bus) (*pool.BlockPool, error) { - return pool.NewBlockPool(mp, bus) +func provideBlockPool(mp *pool.Mempool, bus EventBus.Bus) *pool.BlockPool { + bp, err := pool.NewBlockPool(mp, bus) + if err != nil { + logrus.Fatalf("Failed to initialize blockpool: %s", err.Error()) + } + logrus.Info("Blockpool has been successfully initialized!") + return bp +} + +func provideEventBus() EventBus.Bus { + return EventBus.New() +} + +func provideAppFlags() *AppFlags { + var flags AppFlags + + flag.StringVar(&flags.ConfigPath, "config", "", "Path to config") + flag.BoolVar(&flags.Verbose, "verbose", false, "Verbose logging") + + flag.Parse() + + return &flags +} + +func provideConfig(flags *AppFlags) *config.Config { + if flags.ConfigPath == "" { + logrus.Fatal("no config path provided") + + } + + cfg, err := config.NewConfig(flags.ConfigPath) + if err != nil { + logrus.Fatalf("failed to load config: %v", err) + } + + return cfg +} + +func providePrivateKey(cfg *config.Config) crypto.PrivKey { + var privateKey crypto.PrivKey + + if _, err := os.Stat(cfg.PrivateKeyPath); os.IsNotExist(err) { + privateKey, err = generatePrivateKey() + if err != nil { + logrus.Fatal(err) + } + + f, err := os.Create(cfg.PrivateKeyPath) + if err != nil { + logrus.Fatalf("Cannot create private key file: %s, ", err) + } + + r, err := privateKey.Raw() + if err != nil { + logrus.Fatal(err) + } + + _, err = f.Write(r) + if err != nil { + logrus.Fatal(err) + } + } else { + pkey, err := ioutil.ReadFile(cfg.PrivateKeyPath) + if err != nil { + logrus.Fatal(err) + } + + privateKey, err = crypto.UnmarshalEd25519PrivateKey(pkey) + if err != nil { + logrus.Fatal(err) + } + } + + return privateKey +} + +func generatePrivateKey() (crypto.PrivKey, error) { + r := rand.Reader + // Creates a new RSA key pair for this host. + prvKey, _, err := crypto.GenerateKeyPairWithReader(crypto.Ed25519, 2048, r) + if err != nil { + return nil, err + } + return prvKey, nil +} + +func configureDirectRPC(rpcServer *gorpc.Server, ns *NetworkService) { + err := rpcServer.Register(ns) + if err != nil { + logrus.Fatal(err) + } +} + +func configureLogger(flags *AppFlags) { + logrus.SetReportCaller(true) + logrus.SetFormatter(&logrus.TextFormatter{ + FullTimestamp: true, + CallerPrettyfier: func(f *runtime.Frame) (string, string) { + filename := path.Base(f.File) + return "", fmt.Sprintf("%s:%d:", filename, f.Line) + }, + }) + + if flags.Verbose { + logrus.SetLevel(logrus.DebugLevel) + } else { + logrus.SetLevel(logrus.InfoLevel) + } +} + +func configureForeignBlockchainRPC() { + fc := filecoin.NewLotusClient() + rpc.RegisterRPC(rtypes.RPCTypeFilecoin, map[string]func(string) ([]byte, error){ + "getTransaction": fc.GetTransaction, + "getBlock": fc.GetBlock, + }) + + sl := solana2.NewSolanaClient() + rpc.RegisterRPC(rtypes.RPCTypeSolana, map[string]func(string) ([]byte, error){ + "getTransaction": sl.GetTransaction, + }) + + logrus.Info("Foreign Blockchain RPC clients has been successfully configured!") } diff --git a/pubsub/pubsub_router.go b/pubsub/pubsub_router.go index 4653583..0d547ad 100644 --- a/pubsub/pubsub_router.go +++ b/pubsub/pubsub_router.go @@ -29,10 +29,11 @@ func NewPubSubRouter(h host.Host, oracleTopic string, isBootstrap bool) *PubSubR ctx, ctxCancel := context.WithCancel(context.Background()) psr := &PubSubRouter{ - node: h, - context: ctx, - contextCancel: ctxCancel, - handlers: make(map[PubSubMessageType][]Handler), + node: h, + context: ctx, + contextCancel: ctxCancel, + handlers: make(map[PubSubMessageType][]Handler), + oracleTopicName: oracleTopic, } var pbOptions []pubsub.Option @@ -61,7 +62,6 @@ func NewPubSubRouter(h host.Host, oracleTopic string, isBootstrap bool) *PubSubR logrus.Fatalf("Error occurred when initializing PubSub subsystem: %v", err) } - psr.oracleTopicName = oracleTopic topic, err := pb.Join(oracleTopic) if err != nil { logrus.Fatalf("Error occurred when subscribing to service topic: %v", err) @@ -72,6 +72,10 @@ func NewPubSubRouter(h host.Host, oracleTopic string, isBootstrap bool) *PubSubR psr.Pubsub = pb psr.oracleTopic = topic + return psr +} + +func (psr *PubSubRouter) Run() { go func() { for { select { @@ -79,7 +83,7 @@ func NewPubSubRouter(h host.Host, oracleTopic string, isBootstrap bool) *PubSubR return default: { - msg, err := subscription.Next(psr.context) + msg, err := psr.serviceSubscription.Next(psr.context) if err != nil { logrus.Warnf("Failed to receive pubsub message: %v", err) } @@ -88,8 +92,6 @@ func NewPubSubRouter(h host.Host, oracleTopic string, isBootstrap bool) *PubSubR } } }() - - return psr } func (psr *PubSubRouter) handleMessage(p *pubsub.Message) {