Integrate Uber's Fx DI framework, refactor node init code massively

This commit is contained in:
ChronosX88 2021-07-22 00:56:58 +03:00
parent 550d69fb26
commit 5e0c7f02fa
Signed by: ChronosXYZ
GPG Key ID: 085A69A82C8C511A
13 changed files with 426 additions and 413 deletions

View File

@ -83,20 +83,23 @@ func NewDrandBeacon(ps *pubsub.PubSub, bus EventBus.Bus) (*DrandBeacon, error) {
DrandClient: drandClient,
localCache: make(map[uint64]types.BeaconEntry),
bus: bus,
PublicKey: drandChain.PublicKey,
}
db.PublicKey = drandChain.PublicKey
db.drandResultChannel = db.DrandClient.Watch(context.TODO())
err = db.getLatestDrandResult()
if err != nil {
return nil, err
}
go db.loop(context.TODO())
return db, nil
}
func (db *DrandBeacon) Run(ctx context.Context) error {
db.drandResultChannel = db.DrandClient.Watch(ctx)
err := db.getLatestDrandResult()
if err != nil {
return err
}
go db.loop(ctx)
return nil
}
func (db *DrandBeacon) getLatestDrandResult() error {
latestDround, err := db.DrandClient.Get(context.TODO(), 0)
if err != nil {
@ -113,6 +116,7 @@ func (db *DrandBeacon) loop(ctx context.Context) {
select {
case <-ctx.Done():
{
logrus.Debug("Stopping watching new DRAND entries...")
return
}
case res := <-db.drandResultChannel:

View File

@ -10,6 +10,8 @@ import (
"os"
"sync"
drand2 "github.com/Secured-Finance/dione/beacon/drand"
"github.com/Secured-Finance/dione/beacon"
"github.com/Secured-Finance/dione/consensus/validation"
@ -49,14 +51,14 @@ type BlockChain struct {
bus EventBus.Bus
miner *Miner
b beacon.BeaconAPI
drandBeacon *drand2.DrandBeacon
}
func NewBlockChain(path string, bus EventBus.Bus, miner *Miner, b beacon.BeaconAPI) (*BlockChain, error) {
func NewBlockChain(path string, bus EventBus.Bus, miner *Miner, db *drand2.DrandBeacon) (*BlockChain, error) {
chain := &BlockChain{
bus: bus,
miner: miner,
b: b,
drandBeacon: db,
}
// configure lmdb env
@ -357,7 +359,7 @@ func (bc *BlockChain) ValidateBlock(block *types2.Block) error {
return err
}
res, err := bc.b.Entry(context.TODO(), block.Header.ElectionProof.RandomnessRound)
res, err := bc.drandBeacon.Entry(context.TODO(), block.Header.ElectionProof.RandomnessRound)
if err != nil {
return err
}

View File

@ -37,7 +37,9 @@ import (
gorpc "github.com/libp2p/go-libp2p-gorpc"
)
type SyncManager interface{}
type SyncManager interface {
Run()
}
type syncManager struct {
blockpool *blockchain.BlockChain
@ -66,16 +68,18 @@ func NewSyncManager(bus EventBus.Bus, bc *blockchain.BlockChain, mp *pool.Mempoo
psb: psb,
}
psb.Hook(pubsub.NewTxMessageType, sm.onNewTransaction)
psb.Hook(pubsub.NewBlockMessageType, sm.onNewBlock)
return sm
}
func (sm *syncManager) Run() {
sm.psb.Hook(pubsub.NewTxMessageType, sm.onNewTransaction)
sm.psb.Hook(pubsub.NewBlockMessageType, sm.onNewBlock)
go func() {
if err := sm.initialSync(); err != nil {
logrus.Error(err)
}
}()
return sm
}
func (sm *syncManager) initialSync() error {

View File

@ -20,6 +20,7 @@ type Config struct {
Redis RedisConfig `mapstructure:"redis"`
CacheType string `mapstructure:"cache_type"`
Blockchain BlockchainConfig `mapstructure:"blockchain"`
PrivateKeyPath string `mapstructure:"private_key_path"`
}
type EthereumConfig struct {

View File

@ -7,9 +7,9 @@ import (
"math/big"
"sync"
"github.com/libp2p/go-libp2p-core/peer"
drand2 "github.com/Secured-Finance/dione/beacon/drand"
"github.com/Secured-Finance/dione/beacon"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/fxamacker/cbor/v2"
@ -82,14 +82,14 @@ func NewPBFTConsensusManager(
miner *blockchain.Miner,
bc *blockchain.BlockChain,
bp *pool.BlockPool,
b beacon.BeaconNetwork,
db *drand2.DrandBeacon,
mempool *pool.Mempool,
address peer.ID,
) *PBFTConsensusManager {
pcm := &PBFTConsensusManager{
psb: psb,
miner: miner,
validator: NewConsensusValidator(miner, bc, b),
validator: NewConsensusValidator(miner, bc, db),
msgLog: NewConsensusMessageLog(),
minApprovals: minApprovals,
privKey: privKey,
@ -105,15 +105,18 @@ func NewPBFTConsensusManager(
address: address,
}
return pcm
}
func (pcm *PBFTConsensusManager) Run() {
pcm.psb.Hook(pubsub.PrePrepareMessageType, pcm.handlePrePrepare)
pcm.psb.Hook(pubsub.PrepareMessageType, pcm.handlePrepare)
pcm.psb.Hook(pubsub.CommitMessageType, pcm.handleCommit)
bus.SubscribeAsync("beacon:newEntry", func(entry types2.BeaconEntry) {
pcm.bus.SubscribeAsync("beacon:newEntry", func(entry types2.BeaconEntry) {
pcm.onNewBeaconEntry(entry)
}, true)
height, _ := pcm.blockchain.GetLatestBlockHeight()
pcm.state.blockHeight = height + 1
return pcm
}
func (pcm *PBFTConsensusManager) propose(blk *types3.Block) error {
@ -194,7 +197,7 @@ func (pcm *PBFTConsensusManager) handlePrepare(message *pubsub.PubSubMessage) {
}
if _, err := pcm.blockPool.GetBlock(cmsg.Blockhash); errors.Is(err, cache.ErrNotFound) {
logrus.WithField("blockHash", cmsg.Blockhash).Warnf("received unknown block %x", cmsg.Blockhash)
logrus.WithField("blockHash", hex.EncodeToString(cmsg.Blockhash)).Warnf("received unknown block %x", cmsg.Blockhash)
return
}

View File

@ -3,7 +3,8 @@ package consensus
import (
"encoding/hex"
"github.com/Secured-Finance/dione/beacon"
drand2 "github.com/Secured-Finance/dione/beacon/drand"
"github.com/sirupsen/logrus"
"github.com/Secured-Finance/dione/blockchain"
@ -13,15 +14,15 @@ import (
type ConsensusValidator struct {
validationFuncMap map[types2.ConsensusMessageType]func(msg types2.ConsensusMessage) bool
miner *blockchain.Miner
beacon beacon.BeaconNetwork
beacon *drand2.DrandBeacon
blockchain *blockchain.BlockChain
}
func NewConsensusValidator(miner *blockchain.Miner, bc *blockchain.BlockChain, b beacon.BeaconNetwork) *ConsensusValidator {
func NewConsensusValidator(miner *blockchain.Miner, bc *blockchain.BlockChain, db *drand2.DrandBeacon) *ConsensusValidator {
cv := &ConsensusValidator{
miner: miner,
blockchain: bc,
beacon: b,
beacon: db,
}
cv.validationFuncMap = map[types2.ConsensusMessageType]func(msg types2.ConsensusMessage) bool{

View File

@ -5,6 +5,8 @@ import (
"encoding/hex"
"time"
"github.com/ethereum/go-ethereum/event"
types2 "github.com/Secured-Finance/dione/blockchain/types"
"github.com/Secured-Finance/dione/types"
@ -28,15 +30,21 @@ type DisputeManager struct {
disputeMap map[string]*dioneDispute.DioneDisputeNewDispute
voteWindow time.Duration
blockchain *blockchain.BlockChain
submissionChan chan *dioneOracle.DioneOracleSubmittedOracleRequest
submissionSubscription event.Subscription
disputesChan chan *dioneDispute.DioneDisputeNewDispute
disputesSubscription event.Subscription
}
func NewDisputeManager(ctx context.Context, ethClient *ethclient.EthereumClient, pcm *PBFTConsensusManager, voteWindow int, bc *blockchain.BlockChain) (*DisputeManager, error) {
newSubmittionsChan, submSubscription, err := ethClient.SubscribeOnNewSubmittions(ctx)
submissionChan, submSubscription, err := ethClient.SubscribeOnNewSubmittions(ctx)
if err != nil {
return nil, err
}
newDisputesChan, dispSubscription, err := ethClient.SubscribeOnNewDisputes(ctx)
disputesChan, dispSubscription, err := ethClient.SubscribeOnNewDisputes(ctx)
if err != nil {
return nil, err
}
@ -49,30 +57,36 @@ func NewDisputeManager(ctx context.Context, ethClient *ethclient.EthereumClient,
disputeMap: map[string]*dioneDispute.DioneDisputeNewDispute{},
voteWindow: time.Duration(voteWindow) * time.Second,
blockchain: bc,
submissionChan: submissionChan,
submissionSubscription: submSubscription,
disputesChan: disputesChan,
disputesSubscription: dispSubscription,
}
return dm, nil
}
func (dm *DisputeManager) Run(ctx context.Context) {
go func() {
for {
select {
case <-ctx.Done():
{
submSubscription.Unsubscribe()
dispSubscription.Unsubscribe()
dm.disputesSubscription.Unsubscribe()
dm.disputesSubscription.Unsubscribe()
return
}
case s := <-newSubmittionsChan:
case s := <-dm.submissionChan:
{
dm.onNewSubmission(s)
}
case d := <-newDisputesChan:
case d := <-dm.disputesChan:
{
dm.onNewDispute(d)
}
}
}
}()
return dm, nil
}
func (dm *DisputeManager) onNewSubmission(submission *dioneOracle.DioneOracleSubmittedOracleRequest) {

2
go.mod
View File

@ -16,7 +16,6 @@ require (
github.com/ethereum/go-ethereum v1.9.25
github.com/filecoin-project/go-address v0.0.5
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03
github.com/filecoin-project/go-state-types v0.1.0
github.com/filecoin-project/lotus v1.6.0
github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5 // indirect
github.com/fxamacker/cbor/v2 v2.3.0
@ -57,6 +56,7 @@ require (
github.com/valyala/fasthttp v1.17.0
github.com/wealdtech/go-merkletree v1.0.1-0.20190605192610-2bb163c2ea2a
github.com/whyrusleeping/cbor-gen v0.0.0-20210219115102-f37d292932f2
go.uber.org/fx v1.13.1
go.uber.org/multierr v1.7.0 // indirect
go.uber.org/zap v1.17.0
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97

5
go.sum
View File

@ -1820,8 +1820,12 @@ go.uber.org/atomic v1.5.1/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/dig v1.10.0 h1:yLmDDj9/zuDjv3gz8GQGviXMs9TfysIUMUilCpgzUJY=
go.uber.org/dig v1.10.0/go.mod h1:X34SnWGr8Fyla9zQNO2GSO2D+TIuqB14OS8JhYocIyw=
go.uber.org/fx v1.9.0/go.mod h1:mFdUyAUuJ3w4jAckiKSKbldsxy1ojpAMJ+dVZg5Y0Aw=
go.uber.org/fx v1.13.1 h1:CFNTr1oin5OJ0VCZ8EycL3wzF29Jz2g0xe55RFsf2a4=
go.uber.org/fx v1.13.1/go.mod h1:bREWhavnedxpJeTq9pQT53BbvwhUv7TcpsOqcH4a+3w=
go.uber.org/goleak v0.10.0/go.mod h1:VCZuO8V8mFPlL0F5J5GK1rtHV3DrFcQ1R8ryq7FK0aI=
go.uber.org/goleak v1.0.0 h1:qsup4IcBdlmsnGfqyLl4Ntn3C2XCCuKAE7DwHpScyUo=
go.uber.org/goleak v1.0.0/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
@ -2129,6 +2133,7 @@ golang.org/x/tools v0.0.0-20191030062658-86caa796c7ab/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191113191852-77e3bb0ad9e7/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191114200427-caa0b0f7d508/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191115202509-3a792d9c32b2/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=

6
node/flags.go Normal file
View File

@ -0,0 +1,6 @@
package node
type AppFlags struct {
ConfigPath string
Verbose bool
}

View File

@ -2,20 +2,17 @@ package node
import (
"context"
"crypto/rand"
"flag"
"fmt"
"io/ioutil"
"os"
"path"
"runtime"
"time"
"github.com/Secured-Finance/dione/blockchain"
drand2 "github.com/Secured-Finance/dione/beacon/drand"
"github.com/multiformats/go-multiaddr"
"github.com/Secured-Finance/dione/pubsub"
"github.com/asaskevich/EventBus"
"github.com/Secured-Finance/dione/consensus"
"github.com/Secured-Finance/dione/blockchain/sync"
"go.uber.org/fx"
"github.com/fxamacker/cbor/v2"
@ -23,31 +20,16 @@ import (
types2 "github.com/Secured-Finance/dione/blockchain/types"
gorpc "github.com/libp2p/go-libp2p-gorpc"
"github.com/Secured-Finance/dione/blockchain/pool"
"github.com/Secured-Finance/dione/blockchain/sync"
"github.com/Secured-Finance/dione/consensus"
pubsub2 "github.com/Secured-Finance/dione/pubsub"
"github.com/libp2p/go-libp2p-core/discovery"
"github.com/Secured-Finance/dione/rpc"
rtypes "github.com/Secured-Finance/dione/rpc/types"
solana2 "github.com/Secured-Finance/dione/rpc/solana"
"github.com/Secured-Finance/dione/rpc/filecoin"
"golang.org/x/xerrors"
"github.com/Secured-Finance/dione/beacon"
"github.com/Secured-Finance/dione/config"
"github.com/Secured-Finance/dione/ethclient"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/host"
"github.com/sirupsen/logrus"
)
@ -56,192 +38,61 @@ const (
DefaultPEXUpdateTime = 6 * time.Second
)
type Node struct {
Host host.Host
PeerDiscovery discovery.Discovery
PubSubRouter *pubsub2.PubSubRouter
GlobalCtx context.Context
GlobalCtxCancel context.CancelFunc
Config *config.Config
Ethereum *ethclient.EthereumClient
ConsensusManager *consensus.PBFTConsensusManager
Miner *blockchain.Miner
Beacon beacon.BeaconNetwork
DisputeManager *consensus.DisputeManager
BlockPool *pool.BlockPool
MemPool *pool.Mempool
BlockChain *blockchain.BlockChain
SyncManager sync.SyncManager
NetworkService *NetworkService
NetworkRPCHost *gorpc.Server
Bus EventBus.Bus
//Cache cache.Cache
//Wallet *wallet.LocalWallet
}
func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTime time.Duration) (*Node, error) {
n := &Node{
Config: config,
}
bus := EventBus.New()
n.Bus = bus
// initialize libp2p host
lhost, err := provideLibp2pHost(n.Config, prvKey)
if err != nil {
logrus.Fatal(err)
}
n.Host = lhost
logrus.WithField(
"multiaddress",
fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s",
n.Config.ListenAddr,
n.Config.ListenPort,
n.Host.ID().Pretty(),
)).Info("Libp2p host has been initialized!")
// initialize ethereum client
ethClient, err := provideEthereumClient(n.Config)
if err != nil {
logrus.WithField("err", err.Error()).Fatal("Failed to initialize Ethereum client")
}
n.Ethereum = ethClient
//goland:noinspection ALL
logrus.WithField("ethAddress", ethClient.GetEthAddress().Hex()).Info("Ethereum client has been initialized!")
// initialize blockchain rpc clients
err = n.setupRPCClients()
if err != nil {
logrus.Fatal(err)
}
logrus.Info("Foreign Blockchain RPC clients has been successfully configured!")
// initialize pubsub subsystem
psb := providePubsubRouter(lhost, n.Config)
n.PubSubRouter = psb
logrus.Info("PubSub subsystem has been initialized!")
// get list of bootstrap multiaddresses
baddrs, err := provideBootstrapAddrs(n.Config)
if err != nil {
logrus.Fatal(err)
}
// initialize peer discovery
peerDiscovery, err := providePeerDiscovery(baddrs, lhost, pexDiscoveryUpdateTime)
if err != nil {
logrus.Fatal(err)
}
n.PeerDiscovery = peerDiscovery
logrus.Info("Peer discovery subsystem has been initialized!")
// initialize random beacon network subsystem
randomBeaconNetwork, err := provideBeacon(psb.Pubsub, bus)
if err != nil {
logrus.Fatal(err)
}
n.Beacon = randomBeaconNetwork
logrus.Info("Random beacon subsystem has been initialized!")
// == initialize blockchain modules
// initialize mempool
mp, err := provideMemPool(bus)
if err != nil {
logrus.Fatalf("Failed to initialize mempool: %s", err.Error())
}
n.MemPool = mp
logrus.Info("Mempool has been successfully initialized!")
// initialize mining subsystem
miner := provideMiner(n.Host.ID(), *n.Ethereum.GetEthAddress(), n.Ethereum, prvKey, mp)
n.Miner = miner
logrus.Info("Mining subsystem has been initialized!")
// initialize blockpool database
bc, err := provideBlockChain(n.Config, bus, miner, randomBeaconNetwork.Beacon)
if err != nil {
logrus.Fatalf("Failed to initialize blockpool: %s", err.Error())
}
n.BlockChain = bc
logrus.Info("Block pool database has been successfully initialized!")
bp, err := provideBlockPool(mp, bus)
if err != nil {
logrus.Fatalf("Failed to initialize blockpool: %s", err.Error())
}
n.BlockPool = bp
logrus.Info("Blockpool has been successfully initialized!")
ns := provideNetworkService(bc, mp)
n.NetworkService = ns
rpcHost := provideNetworkRPCHost(lhost)
err = rpcHost.Register(ns)
if err != nil {
logrus.Fatal(err)
}
logrus.Info("Direct RPC has been successfully initialized!")
// initialize libp2p-gorpc client
r := provideP2PRPCClient(lhost)
// initialize sync manager
var baddr multiaddr.Multiaddr
if len(baddrs) == 0 {
baddr = nil
} else {
baddr = baddrs[0]
}
sm, err := provideSyncManager(bus, bc, mp, r, baddr, psb) // FIXME here we just pick up first bootstrap in list
if err != nil {
logrus.Fatal(err)
}
n.SyncManager = sm
logrus.Info("Blockchain sync subsystem has been successfully initialized!")
// initialize consensus subsystem
consensusManager := provideConsensusManager(bus, psb, miner, bc, ethClient, prvKey, n.Config.ConsensusMinApprovals, bp, randomBeaconNetwork, mp, n.Host.ID())
n.ConsensusManager = consensusManager
logrus.Info("Consensus subsystem has been initialized!")
// initialize dispute subsystem
disputeManager, err := provideDisputeManager(context.TODO(), ethClient, consensusManager, config, bc)
if err != nil {
logrus.Fatal(err)
}
n.DisputeManager = disputeManager
logrus.Info("Dispute subsystem has been initialized!")
// initialize internal eth wallet
//w, err := provideWallet(n.Host.ID(), rawPrivKey)
//if err != nil {
// logrus.Fatal(err)
//}
//n.Wallet = w
return n, nil
}
func (n *Node) Run(ctx context.Context) error {
err := n.runLibp2pAsync(ctx)
func runNode(
lc fx.Lifecycle,
cfg *config.Config,
disco discovery.Discovery,
ethClient *ethclient.EthereumClient,
h host.Host,
mp *pool.Mempool,
syncManager sync.SyncManager,
consensusManager *consensus.PBFTConsensusManager,
pubSubRouter *pubsub.PubSubRouter,
disputeManager *consensus.DisputeManager,
db *drand2.DrandBeacon,
) {
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
err := runLibp2pAsync(context.TODO(), h, cfg, disco)
if err != nil {
return err
}
n.subscribeOnEthContractsAsync(ctx)
for {
select {
case <-ctx.Done():
err = db.Run(context.TODO())
if err != nil {
return err
}
// Run pubsub router
pubSubRouter.Run()
// Subscribe on new requests event channel from Ethereum
err = subscribeOnEthContractsAsync(context.TODO(), ethClient, mp)
if err != nil {
return err
}
// Run blockchain sync manager
syncManager.Run()
// Run consensus manager
consensusManager.Run()
// Run dispute manager
disputeManager.Run(context.TODO())
return nil
}
}
},
OnStop: func(ctx context.Context) error {
// TODO
return nil
},
})
}
func (n *Node) runLibp2pAsync(ctx context.Context) error {
func runLibp2pAsync(ctx context.Context, h host.Host, cfg *config.Config, disco discovery.Discovery) error {
logrus.Info("Announcing ourselves...")
_, err := n.PeerDiscovery.Advertise(context.TODO(), n.Config.Rendezvous)
_, err := disco.Advertise(context.TODO(), cfg.Rendezvous)
if err != nil {
return xerrors.Errorf("failed to announce this node to the network: %v", err)
}
@ -249,7 +100,7 @@ func (n *Node) runLibp2pAsync(ctx context.Context) error {
// Discover unbounded count of peers
logrus.Info("Searching for other peers...")
peerChan, err := n.PeerDiscovery.FindPeers(context.TODO(), n.Config.Rendezvous)
peerChan, err := disco.FindPeers(context.TODO(), cfg.Rendezvous)
if err != nil {
return xerrors.Errorf("failed to find new peers: %v", err)
}
@ -264,12 +115,12 @@ func (n *Node) runLibp2pAsync(ctx context.Context) error {
if len(newPeer.Addrs) == 0 {
continue
}
if newPeer.ID.String() == n.Host.ID().String() {
if newPeer.ID.String() == h.ID().String() {
continue
}
logrus.WithField("peer", newPeer.ID).Info("Discovered new peer, connecting...")
// Connect to the peer
if err := n.Host.Connect(ctx, newPeer); err != nil {
if err := h.Connect(ctx, newPeer); err != nil {
logrus.WithFields(logrus.Fields{
"peer": newPeer.ID,
"err": err.Error(),
@ -283,10 +134,10 @@ func (n *Node) runLibp2pAsync(ctx context.Context) error {
return nil
}
func (n *Node) subscribeOnEthContractsAsync(ctx context.Context) {
eventChan, subscription, err := n.Ethereum.SubscribeOnOracleEvents(ctx)
func subscribeOnEthContractsAsync(ctx context.Context, ethClient *ethclient.EthereumClient, mp *pool.Mempool) error {
eventChan, subscription, err := ethClient.SubscribeOnOracleEvents(ctx)
if err != nil {
logrus.Fatal("Couldn't subscribe on ethereum contracts, exiting... ", err)
return err
}
go func() {
@ -318,7 +169,7 @@ func (n *Node) subscribeOnEthContractsAsync(ctx context.Context) {
continue
}
tx := types2.CreateTransaction(data)
err = n.MemPool.StoreTx(tx)
err = mp.StoreTx(tx)
if err != nil {
logrus.Errorf("Failed to store tx in mempool: %s", err.Error())
continue
@ -326,107 +177,45 @@ func (n *Node) subscribeOnEthContractsAsync(ctx context.Context) {
}
case <-ctx.Done():
break EventLoop
case <-subscription.Err():
logrus.Fatal("Error with ethereum subscription, exiting... ", err)
case err := <-subscription.Err():
logrus.Fatalf("Error has occurred in subscription to Ethereum event channel: %s", err.Error())
}
}
}()
}
func (n *Node) setupRPCClients() error {
fc := filecoin.NewLotusClient()
rpc.RegisterRPC(rtypes.RPCTypeFilecoin, map[string]func(string) ([]byte, error){
"getTransaction": fc.GetTransaction,
"getBlock": fc.GetBlock,
})
sl := solana2.NewSolanaClient()
rpc.RegisterRPC(rtypes.RPCTypeSolana, map[string]func(string) ([]byte, error){
"getTransaction": sl.GetTransaction,
})
return nil
}
func Start() {
logrus.SetReportCaller(true)
logrus.SetFormatter(&logrus.TextFormatter{
FullTimestamp: true,
CallerPrettyfier: func(f *runtime.Frame) (string, string) {
filename := path.Base(f.File)
return "", fmt.Sprintf("%s:%d:", filename, f.Line)
},
})
configPath := flag.String("config", "", "Path to config")
verbose := flag.Bool("verbose", false, "Verbose logging")
flag.Parse()
if *configPath == "" {
logrus.Fatal("no config path provided")
}
cfg, err := config.NewConfig(*configPath)
if err != nil {
logrus.Fatalf("failed to load config: %v", err)
}
var privateKey crypto.PrivKey
if cfg.IsBootstrap {
// FIXME just a little hack
if _, err := os.Stat(".bootstrap_privkey"); os.IsNotExist(err) {
privateKey, err = generatePrivateKey()
if err != nil {
logrus.Fatal(err)
}
f, _ := os.Create(".bootstrap_privkey")
r, _ := privateKey.Raw()
_, err = f.Write(r)
if err != nil {
logrus.Fatal(err)
}
} else {
pkey, _ := ioutil.ReadFile(".bootstrap_privkey")
privateKey, _ = crypto.UnmarshalEd25519PrivateKey(pkey)
}
} else {
privateKey, err = generatePrivateKey()
if err != nil {
logrus.Fatal(err)
}
}
node, err := NewNode(cfg, privateKey, DefaultPEXUpdateTime)
if err != nil {
logrus.Fatal(err)
}
// log
if *verbose {
logrus.SetLevel(logrus.DebugLevel)
} else {
logrus.SetLevel(logrus.DebugLevel)
}
//log.SetDebugLogging()
//ctx, ctxCancel := context.WithCancel(context.Background())
//node.GlobalCtx = ctx
//node.GlobalCtxCancel = ctxCancel
err = node.Run(context.TODO())
if err != nil {
logrus.Fatal(err)
}
}
func generatePrivateKey() (crypto.PrivKey, error) {
r := rand.Reader
// Creates a new RSA key pair for this host.
prvKey, _, err := crypto.GenerateKeyPairWithReader(crypto.Ed25519, 2048, r)
if err != nil {
return nil, err
}
return prvKey, nil
fx.New(
fx.Provide(
provideEventBus,
provideAppFlags,
provideConfig,
providePrivateKey,
provideLibp2pHost,
provideEthereumClient,
providePubsubRouter,
provideBootstrapAddrs,
providePeerDiscovery,
provideDrandBeacon,
provideMempool,
provideMiner,
provideBlockChain,
provideBlockPool,
provideSyncManager,
provideNetworkRPCHost,
provideNetworkService,
provideDirectRPCClient,
provideConsensusManager,
provideDisputeManager,
),
fx.Invoke(
configureLogger,
configureDirectRPC,
configureForeignBlockchainRPC,
runNode,
),
fx.NopLogger,
).Run()
}

View File

@ -2,8 +2,20 @@ package node
import (
"context"
"crypto/rand"
"flag"
"fmt"
"time"
"io/ioutil"
"os"
"path"
"runtime"
"github.com/Secured-Finance/dione/rpc"
"github.com/Secured-Finance/dione/rpc/filecoin"
solana2 "github.com/Secured-Finance/dione/rpc/solana"
rtypes "github.com/Secured-Finance/dione/rpc/types"
"github.com/sirupsen/logrus"
"github.com/asaskevich/EventBus"
@ -19,7 +31,6 @@ import (
"github.com/Secured-Finance/dione/blockchain/pool"
"github.com/Secured-Finance/dione/beacon"
"github.com/Secured-Finance/dione/cache"
"github.com/Secured-Finance/dione/config"
"github.com/Secured-Finance/dione/consensus"
@ -28,13 +39,11 @@ import (
"github.com/Secured-Finance/dione/types"
"github.com/Secured-Finance/dione/wallet"
pex "github.com/Secured-Finance/go-libp2p-pex"
"github.com/ethereum/go-ethereum/common"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/discovery"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
pubsub2 "github.com/libp2p/go-libp2p-pubsub"
"github.com/multiformats/go-multiaddr"
"golang.org/x/xerrors"
)
@ -56,21 +65,30 @@ func provideCache(config *config.Config) cache.Cache {
return backend
}
func provideDisputeManager(ctx context.Context, ethClient *ethclient.EthereumClient, pcm *consensus.PBFTConsensusManager, cfg *config.Config, bc *blockchain.BlockChain) (*consensus.DisputeManager, error) {
return consensus.NewDisputeManager(ctx, ethClient, pcm, cfg.Ethereum.DisputeVoteWindow, bc)
}
func provideMiner(peerID peer.ID, ethAddress common.Address, ethClient *ethclient.EthereumClient, privateKey crypto.PrivKey, mempool *pool.Mempool) *blockchain.Miner {
return blockchain.NewMiner(peerID, ethAddress, ethClient, privateKey, mempool)
}
func provideBeacon(ps *pubsub2.PubSub, bus EventBus.Bus) (beacon.BeaconNetwork, error) {
bc, err := drand2.NewDrandBeacon(ps, bus)
func provideDisputeManager(ethClient *ethclient.EthereumClient, pcm *consensus.PBFTConsensusManager, cfg *config.Config, bc *blockchain.BlockChain) *consensus.DisputeManager {
dm, err := consensus.NewDisputeManager(context.TODO(), ethClient, pcm, cfg.Ethereum.DisputeVoteWindow, bc)
if err != nil {
return beacon.BeaconNetwork{}, fmt.Errorf("failed to setup drand beacon: %w", err)
logrus.Fatal(err)
}
// NOTE: currently we use only one network
return beacon.BeaconNetwork{Start: config.DrandChainGenesisTime, Beacon: bc}, nil
logrus.Info("Dispute subsystem has been initialized!")
return dm
}
func provideMiner(h host.Host, ethClient *ethclient.EthereumClient, privateKey crypto.PrivKey, mempool *pool.Mempool) *blockchain.Miner {
miner := blockchain.NewMiner(h.ID(), *ethClient.GetEthAddress(), ethClient, privateKey, mempool)
logrus.Info("Mining subsystem has been initialized!")
return miner
}
func provideDrandBeacon(ps *pubsub.PubSubRouter, bus EventBus.Bus) *drand2.DrandBeacon {
db, err := drand2.NewDrandBeacon(ps.Pubsub, bus)
if err != nil {
logrus.Fatalf("Failed to setup drand beacon: %s", err)
}
logrus.Info("DRAND beacon subsystem has been initialized!")
return db
}
// FIXME: do we really need this?
@ -90,123 +108,287 @@ func provideWallet(peerID peer.ID, privKey []byte) (*wallet.LocalWallet, error)
return w, nil
}
func provideEthereumClient(config *config.Config) (*ethclient.EthereumClient, error) {
func provideEthereumClient(config *config.Config) *ethclient.EthereumClient {
ethereum := ethclient.NewEthereumClient()
err := ethereum.Initialize(&config.Ethereum)
if err != nil {
return nil, err
logrus.Fatal(err)
}
return ethereum, nil
logrus.WithField("ethAddress", ethereum.GetEthAddress().Hex()).Info("Ethereum client has been initialized!")
return ethereum
}
func providePubsubRouter(lhost host.Host, config *config.Config) *pubsub.PubSubRouter {
return pubsub.NewPubSubRouter(lhost, config.PubSub.ServiceTopicName, config.IsBootstrap)
psb := pubsub.NewPubSubRouter(lhost, config.PubSub.ServiceTopicName, config.IsBootstrap)
logrus.Info("PubSub subsystem has been initialized!")
return psb
}
func provideConsensusManager(
h host.Host,
cfg *config.Config,
bus EventBus.Bus,
psb *pubsub.PubSubRouter,
miner *blockchain.Miner,
bc *blockchain.BlockChain,
ethClient *ethclient.EthereumClient,
privateKey crypto.PrivKey,
minApprovals int,
bp *pool.BlockPool,
b beacon.BeaconNetwork,
db *drand2.DrandBeacon,
mp *pool.Mempool,
address peer.ID,
) *consensus.PBFTConsensusManager {
return consensus.NewPBFTConsensusManager(
c := consensus.NewPBFTConsensusManager(
bus,
psb,
minApprovals,
cfg.ConsensusMinApprovals,
privateKey,
ethClient,
miner,
bc,
bp,
b,
db,
mp,
address,
h.ID(),
)
logrus.Info("Consensus subsystem has been initialized!")
return c
}
func provideLibp2pHost(config *config.Config, privateKey crypto.PrivKey) (host.Host, error) {
func provideLibp2pHost(config *config.Config, privateKey crypto.PrivKey) host.Host {
listenMultiAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", config.ListenAddr, config.ListenPort))
if err != nil {
return nil, xerrors.Errorf("failed to parse multiaddress: %v", err)
logrus.Fatalf("Failed to parse multiaddress: %s", err.Error())
}
host, err := libp2p.New(
libp2pHost, err := libp2p.New(
context.TODO(),
libp2p.ListenAddrs(listenMultiAddr),
libp2p.Identity(privateKey),
)
if err != nil {
return nil, xerrors.Errorf("failed to setup libp2p host: %v", err)
logrus.Fatal(err)
}
return host, nil
logrus.WithField(
"multiaddress",
fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s",
config.ListenAddr,
config.ListenPort,
libp2pHost.ID().Pretty(),
)).Info("Libp2p host has been initialized!")
return libp2pHost
}
func provideNetworkRPCHost(h host.Host) *gorpc.Server {
return gorpc.NewServer(h, DioneProtocolID)
}
func provideBootstrapAddrs(c *config.Config) ([]multiaddr.Multiaddr, error) {
func provideBootstrapAddrs(c *config.Config) []multiaddr.Multiaddr {
if c.IsBootstrap {
return nil, nil
return nil
}
var bootstrapMaddrs []multiaddr.Multiaddr
for _, a := range c.BootstrapNodes {
maddr, err := multiaddr.NewMultiaddr(a)
if err != nil {
return nil, xerrors.Errorf("invalid multiaddress of bootstrap node: %v", err)
logrus.Fatalf("Invalid multiaddress of bootstrap node: %v", err)
}
bootstrapMaddrs = append(bootstrapMaddrs, maddr)
}
return bootstrapMaddrs, nil
return bootstrapMaddrs
}
func providePeerDiscovery(baddrs []multiaddr.Multiaddr, h host.Host, pexDiscoveryUpdateTime time.Duration) (discovery.Discovery, error) {
pexDiscovery, err := pex.NewPEXDiscovery(h, baddrs, pexDiscoveryUpdateTime)
func providePeerDiscovery(baddrs []multiaddr.Multiaddr, h host.Host) discovery.Discovery {
pexDiscovery, err := pex.NewPEXDiscovery(h, baddrs, DefaultPEXUpdateTime)
if err != nil {
return nil, xerrors.Errorf("failed to setup pex pexDiscovery: %v", err)
logrus.Fatalf("Failed to setup libp2p PEX discovery: %s", err.Error())
}
return pexDiscovery, nil
logrus.Info("Peer discovery subsystem has been initialized!")
return pexDiscovery
}
func provideBlockChain(config *config.Config, bus EventBus.Bus, miner *blockchain.Miner, b beacon.BeaconAPI) (*blockchain.BlockChain, error) {
return blockchain.NewBlockChain(config.Blockchain.DatabasePath, bus, miner, b)
}
func provideMemPool(bus EventBus.Bus) (*pool.Mempool, error) {
return pool.NewMempool(bus)
}
func provideSyncManager(bus EventBus.Bus, bp *blockchain.BlockChain, mp *pool.Mempool, r *gorpc.Client, bootstrap multiaddr.Multiaddr, psb *pubsub.PubSubRouter) (sync.SyncManager, error) {
bootstrapPeerID := peer.ID("")
if bootstrap != nil {
addr, err := peer.AddrInfoFromP2pAddr(bootstrap)
func provideBlockChain(config *config.Config, bus EventBus.Bus, miner *blockchain.Miner, db *drand2.DrandBeacon) *blockchain.BlockChain {
bc, err := blockchain.NewBlockChain(config.Blockchain.DatabasePath, bus, miner, db)
if err != nil {
return nil, err
logrus.Fatalf("Failed to initialize blockchain storage: %s", err.Error())
}
logrus.Info("Blockchain storage has been successfully initialized!")
return bc
}
func provideMempool(bus EventBus.Bus) *pool.Mempool {
mp, err := pool.NewMempool(bus)
if err != nil {
logrus.Fatalf("Failed to initialize mempool: %s", err.Error())
}
logrus.Info("Mempool has been successfully initialized!")
return mp
}
func provideSyncManager(
bus EventBus.Bus,
bp *blockchain.BlockChain,
mp *pool.Mempool,
c *gorpc.Client,
bootstrapAddresses []multiaddr.Multiaddr,
psb *pubsub.PubSubRouter,
) sync.SyncManager {
bootstrapPeerID := peer.ID("")
if bootstrapAddresses != nil {
addr, err := peer.AddrInfoFromP2pAddr(bootstrapAddresses[0]) // FIXME
if err != nil {
logrus.Fatal(err)
}
bootstrapPeerID = addr.ID
}
return sync.NewSyncManager(bus, bp, mp, r, bootstrapPeerID, psb), nil
sm := sync.NewSyncManager(bus, bp, mp, c, bootstrapPeerID, psb)
logrus.Info("Blockchain sync subsystem has been successfully initialized!")
return sm
}
func provideP2PRPCClient(h host.Host) *gorpc.Client {
func provideDirectRPCClient(h host.Host) *gorpc.Client {
return gorpc.NewClient(h, DioneProtocolID)
}
func provideNetworkService(bp *blockchain.BlockChain, mp *pool.Mempool) *NetworkService {
return NewNetworkService(bp, mp)
ns := NewNetworkService(bp, mp)
logrus.Info("Direct RPC has been successfully initialized!")
return ns
}
func provideBlockPool(mp *pool.Mempool, bus EventBus.Bus) (*pool.BlockPool, error) {
return pool.NewBlockPool(mp, bus)
func provideBlockPool(mp *pool.Mempool, bus EventBus.Bus) *pool.BlockPool {
bp, err := pool.NewBlockPool(mp, bus)
if err != nil {
logrus.Fatalf("Failed to initialize blockpool: %s", err.Error())
}
logrus.Info("Blockpool has been successfully initialized!")
return bp
}
func provideEventBus() EventBus.Bus {
return EventBus.New()
}
func provideAppFlags() *AppFlags {
var flags AppFlags
flag.StringVar(&flags.ConfigPath, "config", "", "Path to config")
flag.BoolVar(&flags.Verbose, "verbose", false, "Verbose logging")
flag.Parse()
return &flags
}
func provideConfig(flags *AppFlags) *config.Config {
if flags.ConfigPath == "" {
logrus.Fatal("no config path provided")
}
cfg, err := config.NewConfig(flags.ConfigPath)
if err != nil {
logrus.Fatalf("failed to load config: %v", err)
}
return cfg
}
func providePrivateKey(cfg *config.Config) crypto.PrivKey {
var privateKey crypto.PrivKey
if _, err := os.Stat(cfg.PrivateKeyPath); os.IsNotExist(err) {
privateKey, err = generatePrivateKey()
if err != nil {
logrus.Fatal(err)
}
f, err := os.Create(cfg.PrivateKeyPath)
if err != nil {
logrus.Fatalf("Cannot create private key file: %s, ", err)
}
r, err := privateKey.Raw()
if err != nil {
logrus.Fatal(err)
}
_, err = f.Write(r)
if err != nil {
logrus.Fatal(err)
}
} else {
pkey, err := ioutil.ReadFile(cfg.PrivateKeyPath)
if err != nil {
logrus.Fatal(err)
}
privateKey, err = crypto.UnmarshalEd25519PrivateKey(pkey)
if err != nil {
logrus.Fatal(err)
}
}
return privateKey
}
func generatePrivateKey() (crypto.PrivKey, error) {
r := rand.Reader
// Creates a new RSA key pair for this host.
prvKey, _, err := crypto.GenerateKeyPairWithReader(crypto.Ed25519, 2048, r)
if err != nil {
return nil, err
}
return prvKey, nil
}
func configureDirectRPC(rpcServer *gorpc.Server, ns *NetworkService) {
err := rpcServer.Register(ns)
if err != nil {
logrus.Fatal(err)
}
}
func configureLogger(flags *AppFlags) {
logrus.SetReportCaller(true)
logrus.SetFormatter(&logrus.TextFormatter{
FullTimestamp: true,
CallerPrettyfier: func(f *runtime.Frame) (string, string) {
filename := path.Base(f.File)
return "", fmt.Sprintf("%s:%d:", filename, f.Line)
},
})
if flags.Verbose {
logrus.SetLevel(logrus.DebugLevel)
} else {
logrus.SetLevel(logrus.InfoLevel)
}
}
func configureForeignBlockchainRPC() {
fc := filecoin.NewLotusClient()
rpc.RegisterRPC(rtypes.RPCTypeFilecoin, map[string]func(string) ([]byte, error){
"getTransaction": fc.GetTransaction,
"getBlock": fc.GetBlock,
})
sl := solana2.NewSolanaClient()
rpc.RegisterRPC(rtypes.RPCTypeSolana, map[string]func(string) ([]byte, error){
"getTransaction": sl.GetTransaction,
})
logrus.Info("Foreign Blockchain RPC clients has been successfully configured!")
}

View File

@ -33,6 +33,7 @@ func NewPubSubRouter(h host.Host, oracleTopic string, isBootstrap bool) *PubSubR
context: ctx,
contextCancel: ctxCancel,
handlers: make(map[PubSubMessageType][]Handler),
oracleTopicName: oracleTopic,
}
var pbOptions []pubsub.Option
@ -61,7 +62,6 @@ func NewPubSubRouter(h host.Host, oracleTopic string, isBootstrap bool) *PubSubR
logrus.Fatalf("Error occurred when initializing PubSub subsystem: %v", err)
}
psr.oracleTopicName = oracleTopic
topic, err := pb.Join(oracleTopic)
if err != nil {
logrus.Fatalf("Error occurred when subscribing to service topic: %v", err)
@ -72,6 +72,10 @@ func NewPubSubRouter(h host.Host, oracleTopic string, isBootstrap bool) *PubSubR
psr.Pubsub = pb
psr.oracleTopic = topic
return psr
}
func (psr *PubSubRouter) Run() {
go func() {
for {
select {
@ -79,7 +83,7 @@ func NewPubSubRouter(h host.Host, oracleTopic string, isBootstrap bool) *PubSubR
return
default:
{
msg, err := subscription.Next(psr.context)
msg, err := psr.serviceSubscription.Next(psr.context)
if err != nil {
logrus.Warnf("Failed to receive pubsub message: %v", err)
}
@ -88,8 +92,6 @@ func NewPubSubRouter(h host.Host, oracleTopic string, isBootstrap bool) *PubSubR
}
}
}()
return psr
}
func (psr *PubSubRouter) handleMessage(p *pubsub.Message) {