Merge pull request #15 from Secured-Finance/refactor/fx
Integrate Uber's Fx DI framework
This commit is contained in:
commit
6d73e1103d
@ -83,20 +83,23 @@ func NewDrandBeacon(ps *pubsub.PubSub, bus EventBus.Bus) (*DrandBeacon, error) {
|
||||
DrandClient: drandClient,
|
||||
localCache: make(map[uint64]types.BeaconEntry),
|
||||
bus: bus,
|
||||
PublicKey: drandChain.PublicKey,
|
||||
}
|
||||
|
||||
db.PublicKey = drandChain.PublicKey
|
||||
|
||||
db.drandResultChannel = db.DrandClient.Watch(context.TODO())
|
||||
err = db.getLatestDrandResult()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
go db.loop(context.TODO())
|
||||
|
||||
return db, nil
|
||||
}
|
||||
|
||||
func (db *DrandBeacon) Run(ctx context.Context) error {
|
||||
db.drandResultChannel = db.DrandClient.Watch(ctx)
|
||||
err := db.getLatestDrandResult()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
go db.loop(ctx)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *DrandBeacon) getLatestDrandResult() error {
|
||||
latestDround, err := db.DrandClient.Get(context.TODO(), 0)
|
||||
if err != nil {
|
||||
@ -113,6 +116,7 @@ func (db *DrandBeacon) loop(ctx context.Context) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
{
|
||||
logrus.Debug("Stopping watching new DRAND entries...")
|
||||
return
|
||||
}
|
||||
case res := <-db.drandResultChannel:
|
||||
|
@ -10,6 +10,8 @@ import (
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
drand2 "github.com/Secured-Finance/dione/beacon/drand"
|
||||
|
||||
"github.com/Secured-Finance/dione/beacon"
|
||||
|
||||
"github.com/Secured-Finance/dione/consensus/validation"
|
||||
@ -47,16 +49,16 @@ type BlockChain struct {
|
||||
metadataIndex *utils.Index
|
||||
heightIndex *utils.Index
|
||||
|
||||
bus EventBus.Bus
|
||||
miner *Miner
|
||||
b beacon.BeaconAPI
|
||||
bus EventBus.Bus
|
||||
miner *Miner
|
||||
drandBeacon *drand2.DrandBeacon
|
||||
}
|
||||
|
||||
func NewBlockChain(path string, bus EventBus.Bus, miner *Miner, b beacon.BeaconAPI) (*BlockChain, error) {
|
||||
func NewBlockChain(path string, bus EventBus.Bus, miner *Miner, db *drand2.DrandBeacon) (*BlockChain, error) {
|
||||
chain := &BlockChain{
|
||||
bus: bus,
|
||||
miner: miner,
|
||||
b: b,
|
||||
bus: bus,
|
||||
miner: miner,
|
||||
drandBeacon: db,
|
||||
}
|
||||
|
||||
// configure lmdb env
|
||||
@ -357,7 +359,7 @@ func (bc *BlockChain) ValidateBlock(block *types2.Block) error {
|
||||
return err
|
||||
}
|
||||
|
||||
res, err := bc.b.Entry(context.TODO(), block.Header.ElectionProof.RandomnessRound)
|
||||
res, err := bc.drandBeacon.Entry(context.TODO(), block.Header.ElectionProof.RandomnessRound)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -37,7 +37,9 @@ import (
|
||||
gorpc "github.com/libp2p/go-libp2p-gorpc"
|
||||
)
|
||||
|
||||
type SyncManager interface{}
|
||||
type SyncManager interface {
|
||||
Run()
|
||||
}
|
||||
|
||||
type syncManager struct {
|
||||
blockpool *blockchain.BlockChain
|
||||
@ -66,16 +68,18 @@ func NewSyncManager(bus EventBus.Bus, bc *blockchain.BlockChain, mp *pool.Mempoo
|
||||
psb: psb,
|
||||
}
|
||||
|
||||
psb.Hook(pubsub.NewTxMessageType, sm.onNewTransaction)
|
||||
psb.Hook(pubsub.NewBlockMessageType, sm.onNewBlock)
|
||||
return sm
|
||||
}
|
||||
|
||||
func (sm *syncManager) Run() {
|
||||
sm.psb.Hook(pubsub.NewTxMessageType, sm.onNewTransaction)
|
||||
sm.psb.Hook(pubsub.NewBlockMessageType, sm.onNewBlock)
|
||||
|
||||
go func() {
|
||||
if err := sm.initialSync(); err != nil {
|
||||
logrus.Error(err)
|
||||
}
|
||||
}()
|
||||
|
||||
return sm
|
||||
}
|
||||
|
||||
func (sm *syncManager) initialSync() error {
|
||||
|
@ -20,6 +20,7 @@ type Config struct {
|
||||
Redis RedisConfig `mapstructure:"redis"`
|
||||
CacheType string `mapstructure:"cache_type"`
|
||||
Blockchain BlockchainConfig `mapstructure:"blockchain"`
|
||||
PrivateKeyPath string `mapstructure:"private_key_path"`
|
||||
}
|
||||
|
||||
type EthereumConfig struct {
|
||||
|
@ -7,9 +7,9 @@ import (
|
||||
"math/big"
|
||||
"sync"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
drand2 "github.com/Secured-Finance/dione/beacon/drand"
|
||||
|
||||
"github.com/Secured-Finance/dione/beacon"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
|
||||
"github.com/fxamacker/cbor/v2"
|
||||
|
||||
@ -82,14 +82,14 @@ func NewPBFTConsensusManager(
|
||||
miner *blockchain.Miner,
|
||||
bc *blockchain.BlockChain,
|
||||
bp *pool.BlockPool,
|
||||
b beacon.BeaconNetwork,
|
||||
db *drand2.DrandBeacon,
|
||||
mempool *pool.Mempool,
|
||||
address peer.ID,
|
||||
) *PBFTConsensusManager {
|
||||
pcm := &PBFTConsensusManager{
|
||||
psb: psb,
|
||||
miner: miner,
|
||||
validator: NewConsensusValidator(miner, bc, b),
|
||||
validator: NewConsensusValidator(miner, bc, db),
|
||||
msgLog: NewConsensusMessageLog(),
|
||||
minApprovals: minApprovals,
|
||||
privKey: privKey,
|
||||
@ -105,15 +105,18 @@ func NewPBFTConsensusManager(
|
||||
address: address,
|
||||
}
|
||||
|
||||
return pcm
|
||||
}
|
||||
|
||||
func (pcm *PBFTConsensusManager) Run() {
|
||||
pcm.psb.Hook(pubsub.PrePrepareMessageType, pcm.handlePrePrepare)
|
||||
pcm.psb.Hook(pubsub.PrepareMessageType, pcm.handlePrepare)
|
||||
pcm.psb.Hook(pubsub.CommitMessageType, pcm.handleCommit)
|
||||
bus.SubscribeAsync("beacon:newEntry", func(entry types2.BeaconEntry) {
|
||||
pcm.bus.SubscribeAsync("beacon:newEntry", func(entry types2.BeaconEntry) {
|
||||
pcm.onNewBeaconEntry(entry)
|
||||
}, true)
|
||||
height, _ := pcm.blockchain.GetLatestBlockHeight()
|
||||
pcm.state.blockHeight = height + 1
|
||||
return pcm
|
||||
}
|
||||
|
||||
func (pcm *PBFTConsensusManager) propose(blk *types3.Block) error {
|
||||
@ -194,7 +197,7 @@ func (pcm *PBFTConsensusManager) handlePrepare(message *pubsub.PubSubMessage) {
|
||||
}
|
||||
|
||||
if _, err := pcm.blockPool.GetBlock(cmsg.Blockhash); errors.Is(err, cache.ErrNotFound) {
|
||||
logrus.WithField("blockHash", cmsg.Blockhash).Warnf("received unknown block %x", cmsg.Blockhash)
|
||||
logrus.WithField("blockHash", hex.EncodeToString(cmsg.Blockhash)).Warnf("received unknown block %x", cmsg.Blockhash)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -3,7 +3,8 @@ package consensus
|
||||
import (
|
||||
"encoding/hex"
|
||||
|
||||
"github.com/Secured-Finance/dione/beacon"
|
||||
drand2 "github.com/Secured-Finance/dione/beacon/drand"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/Secured-Finance/dione/blockchain"
|
||||
@ -13,15 +14,15 @@ import (
|
||||
type ConsensusValidator struct {
|
||||
validationFuncMap map[types2.ConsensusMessageType]func(msg types2.ConsensusMessage) bool
|
||||
miner *blockchain.Miner
|
||||
beacon beacon.BeaconNetwork
|
||||
beacon *drand2.DrandBeacon
|
||||
blockchain *blockchain.BlockChain
|
||||
}
|
||||
|
||||
func NewConsensusValidator(miner *blockchain.Miner, bc *blockchain.BlockChain, b beacon.BeaconNetwork) *ConsensusValidator {
|
||||
func NewConsensusValidator(miner *blockchain.Miner, bc *blockchain.BlockChain, db *drand2.DrandBeacon) *ConsensusValidator {
|
||||
cv := &ConsensusValidator{
|
||||
miner: miner,
|
||||
blockchain: bc,
|
||||
beacon: b,
|
||||
beacon: db,
|
||||
}
|
||||
|
||||
cv.validationFuncMap = map[types2.ConsensusMessageType]func(msg types2.ConsensusMessage) bool{
|
||||
|
@ -5,6 +5,8 @@ import (
|
||||
"encoding/hex"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/event"
|
||||
|
||||
types2 "github.com/Secured-Finance/dione/blockchain/types"
|
||||
|
||||
"github.com/Secured-Finance/dione/types"
|
||||
@ -28,51 +30,63 @@ type DisputeManager struct {
|
||||
disputeMap map[string]*dioneDispute.DioneDisputeNewDispute
|
||||
voteWindow time.Duration
|
||||
blockchain *blockchain.BlockChain
|
||||
|
||||
submissionChan chan *dioneOracle.DioneOracleSubmittedOracleRequest
|
||||
submissionSubscription event.Subscription
|
||||
|
||||
disputesChan chan *dioneDispute.DioneDisputeNewDispute
|
||||
disputesSubscription event.Subscription
|
||||
}
|
||||
|
||||
func NewDisputeManager(ctx context.Context, ethClient *ethclient.EthereumClient, pcm *PBFTConsensusManager, voteWindow int, bc *blockchain.BlockChain) (*DisputeManager, error) {
|
||||
newSubmittionsChan, submSubscription, err := ethClient.SubscribeOnNewSubmittions(ctx)
|
||||
submissionChan, submSubscription, err := ethClient.SubscribeOnNewSubmittions(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
newDisputesChan, dispSubscription, err := ethClient.SubscribeOnNewDisputes(ctx)
|
||||
disputesChan, dispSubscription, err := ethClient.SubscribeOnNewDisputes(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
dm := &DisputeManager{
|
||||
ethClient: ethClient,
|
||||
pcm: pcm,
|
||||
ctx: ctx,
|
||||
submissionMap: map[string]*dioneOracle.DioneOracleSubmittedOracleRequest{},
|
||||
disputeMap: map[string]*dioneDispute.DioneDisputeNewDispute{},
|
||||
voteWindow: time.Duration(voteWindow) * time.Second,
|
||||
blockchain: bc,
|
||||
ethClient: ethClient,
|
||||
pcm: pcm,
|
||||
ctx: ctx,
|
||||
submissionMap: map[string]*dioneOracle.DioneOracleSubmittedOracleRequest{},
|
||||
disputeMap: map[string]*dioneDispute.DioneDisputeNewDispute{},
|
||||
voteWindow: time.Duration(voteWindow) * time.Second,
|
||||
blockchain: bc,
|
||||
submissionChan: submissionChan,
|
||||
submissionSubscription: submSubscription,
|
||||
disputesChan: disputesChan,
|
||||
disputesSubscription: dispSubscription,
|
||||
}
|
||||
|
||||
return dm, nil
|
||||
}
|
||||
|
||||
func (dm *DisputeManager) Run(ctx context.Context) {
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
{
|
||||
submSubscription.Unsubscribe()
|
||||
dispSubscription.Unsubscribe()
|
||||
dm.disputesSubscription.Unsubscribe()
|
||||
dm.disputesSubscription.Unsubscribe()
|
||||
return
|
||||
}
|
||||
case s := <-newSubmittionsChan:
|
||||
case s := <-dm.submissionChan:
|
||||
{
|
||||
dm.onNewSubmission(s)
|
||||
}
|
||||
case d := <-newDisputesChan:
|
||||
case d := <-dm.disputesChan:
|
||||
{
|
||||
dm.onNewDispute(d)
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return dm, nil
|
||||
}
|
||||
|
||||
func (dm *DisputeManager) onNewSubmission(submission *dioneOracle.DioneOracleSubmittedOracleRequest) {
|
||||
|
2
go.mod
2
go.mod
@ -16,7 +16,6 @@ require (
|
||||
github.com/ethereum/go-ethereum v1.9.25
|
||||
github.com/filecoin-project/go-address v0.0.5
|
||||
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03
|
||||
github.com/filecoin-project/go-state-types v0.1.0
|
||||
github.com/filecoin-project/lotus v1.6.0
|
||||
github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5 // indirect
|
||||
github.com/fxamacker/cbor/v2 v2.3.0
|
||||
@ -57,6 +56,7 @@ require (
|
||||
github.com/valyala/fasthttp v1.17.0
|
||||
github.com/wealdtech/go-merkletree v1.0.1-0.20190605192610-2bb163c2ea2a
|
||||
github.com/whyrusleeping/cbor-gen v0.0.0-20210219115102-f37d292932f2
|
||||
go.uber.org/fx v1.13.1
|
||||
go.uber.org/multierr v1.7.0 // indirect
|
||||
go.uber.org/zap v1.17.0
|
||||
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97
|
||||
|
5
go.sum
5
go.sum
@ -1820,8 +1820,12 @@ go.uber.org/atomic v1.5.1/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
|
||||
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
|
||||
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
|
||||
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
|
||||
go.uber.org/dig v1.10.0 h1:yLmDDj9/zuDjv3gz8GQGviXMs9TfysIUMUilCpgzUJY=
|
||||
go.uber.org/dig v1.10.0/go.mod h1:X34SnWGr8Fyla9zQNO2GSO2D+TIuqB14OS8JhYocIyw=
|
||||
go.uber.org/fx v1.9.0/go.mod h1:mFdUyAUuJ3w4jAckiKSKbldsxy1ojpAMJ+dVZg5Y0Aw=
|
||||
go.uber.org/fx v1.13.1 h1:CFNTr1oin5OJ0VCZ8EycL3wzF29Jz2g0xe55RFsf2a4=
|
||||
go.uber.org/fx v1.13.1/go.mod h1:bREWhavnedxpJeTq9pQT53BbvwhUv7TcpsOqcH4a+3w=
|
||||
go.uber.org/goleak v0.10.0/go.mod h1:VCZuO8V8mFPlL0F5J5GK1rtHV3DrFcQ1R8ryq7FK0aI=
|
||||
go.uber.org/goleak v1.0.0 h1:qsup4IcBdlmsnGfqyLl4Ntn3C2XCCuKAE7DwHpScyUo=
|
||||
go.uber.org/goleak v1.0.0/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
|
||||
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
|
||||
@ -2129,6 +2133,7 @@ golang.org/x/tools v0.0.0-20191030062658-86caa796c7ab/go.mod h1:b+2E5dAYhXwXZwtn
|
||||
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/tools v0.0.0-20191113191852-77e3bb0ad9e7/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/tools v0.0.0-20191114200427-caa0b0f7d508/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/tools v0.0.0-20191115202509-3a792d9c32b2/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
|
6
node/flags.go
Normal file
6
node/flags.go
Normal file
@ -0,0 +1,6 @@
|
||||
package node
|
||||
|
||||
type AppFlags struct {
|
||||
ConfigPath string
|
||||
Verbose bool
|
||||
}
|
393
node/node.go
393
node/node.go
@ -2,20 +2,17 @@ package node
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"runtime"
|
||||
"time"
|
||||
|
||||
"github.com/Secured-Finance/dione/blockchain"
|
||||
drand2 "github.com/Secured-Finance/dione/beacon/drand"
|
||||
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
"github.com/Secured-Finance/dione/pubsub"
|
||||
|
||||
"github.com/asaskevich/EventBus"
|
||||
"github.com/Secured-Finance/dione/consensus"
|
||||
|
||||
"github.com/Secured-Finance/dione/blockchain/sync"
|
||||
|
||||
"go.uber.org/fx"
|
||||
|
||||
"github.com/fxamacker/cbor/v2"
|
||||
|
||||
@ -23,31 +20,16 @@ import (
|
||||
|
||||
types2 "github.com/Secured-Finance/dione/blockchain/types"
|
||||
|
||||
gorpc "github.com/libp2p/go-libp2p-gorpc"
|
||||
|
||||
"github.com/Secured-Finance/dione/blockchain/pool"
|
||||
|
||||
"github.com/Secured-Finance/dione/blockchain/sync"
|
||||
|
||||
"github.com/Secured-Finance/dione/consensus"
|
||||
pubsub2 "github.com/Secured-Finance/dione/pubsub"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/discovery"
|
||||
|
||||
"github.com/Secured-Finance/dione/rpc"
|
||||
rtypes "github.com/Secured-Finance/dione/rpc/types"
|
||||
|
||||
solana2 "github.com/Secured-Finance/dione/rpc/solana"
|
||||
|
||||
"github.com/Secured-Finance/dione/rpc/filecoin"
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/Secured-Finance/dione/beacon"
|
||||
|
||||
"github.com/Secured-Finance/dione/config"
|
||||
"github.com/Secured-Finance/dione/ethclient"
|
||||
"github.com/libp2p/go-libp2p-core/crypto"
|
||||
"github.com/libp2p/go-libp2p-core/host"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
@ -56,192 +38,61 @@ const (
|
||||
DefaultPEXUpdateTime = 6 * time.Second
|
||||
)
|
||||
|
||||
type Node struct {
|
||||
Host host.Host
|
||||
PeerDiscovery discovery.Discovery
|
||||
PubSubRouter *pubsub2.PubSubRouter
|
||||
GlobalCtx context.Context
|
||||
GlobalCtxCancel context.CancelFunc
|
||||
Config *config.Config
|
||||
Ethereum *ethclient.EthereumClient
|
||||
ConsensusManager *consensus.PBFTConsensusManager
|
||||
Miner *blockchain.Miner
|
||||
Beacon beacon.BeaconNetwork
|
||||
DisputeManager *consensus.DisputeManager
|
||||
BlockPool *pool.BlockPool
|
||||
MemPool *pool.Mempool
|
||||
BlockChain *blockchain.BlockChain
|
||||
SyncManager sync.SyncManager
|
||||
NetworkService *NetworkService
|
||||
NetworkRPCHost *gorpc.Server
|
||||
Bus EventBus.Bus
|
||||
//Cache cache.Cache
|
||||
//Wallet *wallet.LocalWallet
|
||||
}
|
||||
func runNode(
|
||||
lc fx.Lifecycle,
|
||||
cfg *config.Config,
|
||||
disco discovery.Discovery,
|
||||
ethClient *ethclient.EthereumClient,
|
||||
h host.Host,
|
||||
mp *pool.Mempool,
|
||||
syncManager sync.SyncManager,
|
||||
consensusManager *consensus.PBFTConsensusManager,
|
||||
pubSubRouter *pubsub.PubSubRouter,
|
||||
disputeManager *consensus.DisputeManager,
|
||||
db *drand2.DrandBeacon,
|
||||
) {
|
||||
lc.Append(fx.Hook{
|
||||
OnStart: func(ctx context.Context) error {
|
||||
err := runLibp2pAsync(context.TODO(), h, cfg, disco)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTime time.Duration) (*Node, error) {
|
||||
n := &Node{
|
||||
Config: config,
|
||||
}
|
||||
err = db.Run(context.TODO())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
bus := EventBus.New()
|
||||
n.Bus = bus
|
||||
// Run pubsub router
|
||||
pubSubRouter.Run()
|
||||
|
||||
// initialize libp2p host
|
||||
lhost, err := provideLibp2pHost(n.Config, prvKey)
|
||||
if err != nil {
|
||||
logrus.Fatal(err)
|
||||
}
|
||||
n.Host = lhost
|
||||
logrus.WithField(
|
||||
"multiaddress",
|
||||
fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s",
|
||||
n.Config.ListenAddr,
|
||||
n.Config.ListenPort,
|
||||
n.Host.ID().Pretty(),
|
||||
)).Info("Libp2p host has been initialized!")
|
||||
// Subscribe on new requests event channel from Ethereum
|
||||
err = subscribeOnEthContractsAsync(context.TODO(), ethClient, mp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// initialize ethereum client
|
||||
ethClient, err := provideEthereumClient(n.Config)
|
||||
if err != nil {
|
||||
logrus.WithField("err", err.Error()).Fatal("Failed to initialize Ethereum client")
|
||||
}
|
||||
n.Ethereum = ethClient
|
||||
//goland:noinspection ALL
|
||||
logrus.WithField("ethAddress", ethClient.GetEthAddress().Hex()).Info("Ethereum client has been initialized!")
|
||||
// Run blockchain sync manager
|
||||
syncManager.Run()
|
||||
|
||||
// initialize blockchain rpc clients
|
||||
err = n.setupRPCClients()
|
||||
if err != nil {
|
||||
logrus.Fatal(err)
|
||||
}
|
||||
logrus.Info("Foreign Blockchain RPC clients has been successfully configured!")
|
||||
// Run consensus manager
|
||||
consensusManager.Run()
|
||||
|
||||
// initialize pubsub subsystem
|
||||
psb := providePubsubRouter(lhost, n.Config)
|
||||
n.PubSubRouter = psb
|
||||
logrus.Info("PubSub subsystem has been initialized!")
|
||||
// Run dispute manager
|
||||
disputeManager.Run(context.TODO())
|
||||
|
||||
// get list of bootstrap multiaddresses
|
||||
baddrs, err := provideBootstrapAddrs(n.Config)
|
||||
if err != nil {
|
||||
logrus.Fatal(err)
|
||||
}
|
||||
|
||||
// initialize peer discovery
|
||||
peerDiscovery, err := providePeerDiscovery(baddrs, lhost, pexDiscoveryUpdateTime)
|
||||
if err != nil {
|
||||
logrus.Fatal(err)
|
||||
}
|
||||
n.PeerDiscovery = peerDiscovery
|
||||
logrus.Info("Peer discovery subsystem has been initialized!")
|
||||
|
||||
// initialize random beacon network subsystem
|
||||
randomBeaconNetwork, err := provideBeacon(psb.Pubsub, bus)
|
||||
if err != nil {
|
||||
logrus.Fatal(err)
|
||||
}
|
||||
n.Beacon = randomBeaconNetwork
|
||||
logrus.Info("Random beacon subsystem has been initialized!")
|
||||
|
||||
// == initialize blockchain modules
|
||||
|
||||
// initialize mempool
|
||||
mp, err := provideMemPool(bus)
|
||||
if err != nil {
|
||||
logrus.Fatalf("Failed to initialize mempool: %s", err.Error())
|
||||
}
|
||||
n.MemPool = mp
|
||||
logrus.Info("Mempool has been successfully initialized!")
|
||||
|
||||
// initialize mining subsystem
|
||||
miner := provideMiner(n.Host.ID(), *n.Ethereum.GetEthAddress(), n.Ethereum, prvKey, mp)
|
||||
n.Miner = miner
|
||||
logrus.Info("Mining subsystem has been initialized!")
|
||||
|
||||
// initialize blockpool database
|
||||
bc, err := provideBlockChain(n.Config, bus, miner, randomBeaconNetwork.Beacon)
|
||||
if err != nil {
|
||||
logrus.Fatalf("Failed to initialize blockpool: %s", err.Error())
|
||||
}
|
||||
n.BlockChain = bc
|
||||
logrus.Info("Block pool database has been successfully initialized!")
|
||||
|
||||
bp, err := provideBlockPool(mp, bus)
|
||||
if err != nil {
|
||||
logrus.Fatalf("Failed to initialize blockpool: %s", err.Error())
|
||||
}
|
||||
n.BlockPool = bp
|
||||
logrus.Info("Blockpool has been successfully initialized!")
|
||||
|
||||
ns := provideNetworkService(bc, mp)
|
||||
n.NetworkService = ns
|
||||
rpcHost := provideNetworkRPCHost(lhost)
|
||||
err = rpcHost.Register(ns)
|
||||
if err != nil {
|
||||
logrus.Fatal(err)
|
||||
}
|
||||
logrus.Info("Direct RPC has been successfully initialized!")
|
||||
|
||||
// initialize libp2p-gorpc client
|
||||
r := provideP2PRPCClient(lhost)
|
||||
|
||||
// initialize sync manager
|
||||
|
||||
var baddr multiaddr.Multiaddr
|
||||
if len(baddrs) == 0 {
|
||||
baddr = nil
|
||||
} else {
|
||||
baddr = baddrs[0]
|
||||
}
|
||||
sm, err := provideSyncManager(bus, bc, mp, r, baddr, psb) // FIXME here we just pick up first bootstrap in list
|
||||
if err != nil {
|
||||
logrus.Fatal(err)
|
||||
}
|
||||
n.SyncManager = sm
|
||||
logrus.Info("Blockchain sync subsystem has been successfully initialized!")
|
||||
|
||||
// initialize consensus subsystem
|
||||
consensusManager := provideConsensusManager(bus, psb, miner, bc, ethClient, prvKey, n.Config.ConsensusMinApprovals, bp, randomBeaconNetwork, mp, n.Host.ID())
|
||||
n.ConsensusManager = consensusManager
|
||||
logrus.Info("Consensus subsystem has been initialized!")
|
||||
|
||||
// initialize dispute subsystem
|
||||
disputeManager, err := provideDisputeManager(context.TODO(), ethClient, consensusManager, config, bc)
|
||||
if err != nil {
|
||||
logrus.Fatal(err)
|
||||
}
|
||||
n.DisputeManager = disputeManager
|
||||
logrus.Info("Dispute subsystem has been initialized!")
|
||||
|
||||
// initialize internal eth wallet
|
||||
//w, err := provideWallet(n.Host.ID(), rawPrivKey)
|
||||
//if err != nil {
|
||||
// logrus.Fatal(err)
|
||||
//}
|
||||
//n.Wallet = w
|
||||
|
||||
return n, nil
|
||||
}
|
||||
|
||||
func (n *Node) Run(ctx context.Context) error {
|
||||
err := n.runLibp2pAsync(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
n.subscribeOnEthContractsAsync(ctx)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
}
|
||||
}
|
||||
},
|
||||
OnStop: func(ctx context.Context) error {
|
||||
// TODO
|
||||
return nil
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
func (n *Node) runLibp2pAsync(ctx context.Context) error {
|
||||
func runLibp2pAsync(ctx context.Context, h host.Host, cfg *config.Config, disco discovery.Discovery) error {
|
||||
logrus.Info("Announcing ourselves...")
|
||||
_, err := n.PeerDiscovery.Advertise(context.TODO(), n.Config.Rendezvous)
|
||||
_, err := disco.Advertise(context.TODO(), cfg.Rendezvous)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to announce this node to the network: %v", err)
|
||||
}
|
||||
@ -249,7 +100,7 @@ func (n *Node) runLibp2pAsync(ctx context.Context) error {
|
||||
|
||||
// Discover unbounded count of peers
|
||||
logrus.Info("Searching for other peers...")
|
||||
peerChan, err := n.PeerDiscovery.FindPeers(context.TODO(), n.Config.Rendezvous)
|
||||
peerChan, err := disco.FindPeers(context.TODO(), cfg.Rendezvous)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to find new peers: %v", err)
|
||||
}
|
||||
@ -264,12 +115,12 @@ func (n *Node) runLibp2pAsync(ctx context.Context) error {
|
||||
if len(newPeer.Addrs) == 0 {
|
||||
continue
|
||||
}
|
||||
if newPeer.ID.String() == n.Host.ID().String() {
|
||||
if newPeer.ID.String() == h.ID().String() {
|
||||
continue
|
||||
}
|
||||
logrus.WithField("peer", newPeer.ID).Info("Discovered new peer, connecting...")
|
||||
// Connect to the peer
|
||||
if err := n.Host.Connect(ctx, newPeer); err != nil {
|
||||
if err := h.Connect(ctx, newPeer); err != nil {
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"peer": newPeer.ID,
|
||||
"err": err.Error(),
|
||||
@ -283,10 +134,10 @@ func (n *Node) runLibp2pAsync(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *Node) subscribeOnEthContractsAsync(ctx context.Context) {
|
||||
eventChan, subscription, err := n.Ethereum.SubscribeOnOracleEvents(ctx)
|
||||
func subscribeOnEthContractsAsync(ctx context.Context, ethClient *ethclient.EthereumClient, mp *pool.Mempool) error {
|
||||
eventChan, subscription, err := ethClient.SubscribeOnOracleEvents(ctx)
|
||||
if err != nil {
|
||||
logrus.Fatal("Couldn't subscribe on ethereum contracts, exiting... ", err)
|
||||
return err
|
||||
}
|
||||
|
||||
go func() {
|
||||
@ -318,7 +169,7 @@ func (n *Node) subscribeOnEthContractsAsync(ctx context.Context) {
|
||||
continue
|
||||
}
|
||||
tx := types2.CreateTransaction(data)
|
||||
err = n.MemPool.StoreTx(tx)
|
||||
err = mp.StoreTx(tx)
|
||||
if err != nil {
|
||||
logrus.Errorf("Failed to store tx in mempool: %s", err.Error())
|
||||
continue
|
||||
@ -326,107 +177,45 @@ func (n *Node) subscribeOnEthContractsAsync(ctx context.Context) {
|
||||
}
|
||||
case <-ctx.Done():
|
||||
break EventLoop
|
||||
case <-subscription.Err():
|
||||
logrus.Fatal("Error with ethereum subscription, exiting... ", err)
|
||||
case err := <-subscription.Err():
|
||||
logrus.Fatalf("Error has occurred in subscription to Ethereum event channel: %s", err.Error())
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (n *Node) setupRPCClients() error {
|
||||
fc := filecoin.NewLotusClient()
|
||||
rpc.RegisterRPC(rtypes.RPCTypeFilecoin, map[string]func(string) ([]byte, error){
|
||||
"getTransaction": fc.GetTransaction,
|
||||
"getBlock": fc.GetBlock,
|
||||
})
|
||||
|
||||
sl := solana2.NewSolanaClient()
|
||||
rpc.RegisterRPC(rtypes.RPCTypeSolana, map[string]func(string) ([]byte, error){
|
||||
"getTransaction": sl.GetTransaction,
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func Start() {
|
||||
logrus.SetReportCaller(true)
|
||||
logrus.SetFormatter(&logrus.TextFormatter{
|
||||
FullTimestamp: true,
|
||||
CallerPrettyfier: func(f *runtime.Frame) (string, string) {
|
||||
filename := path.Base(f.File)
|
||||
return "", fmt.Sprintf("%s:%d:", filename, f.Line)
|
||||
},
|
||||
})
|
||||
|
||||
configPath := flag.String("config", "", "Path to config")
|
||||
verbose := flag.Bool("verbose", false, "Verbose logging")
|
||||
flag.Parse()
|
||||
|
||||
if *configPath == "" {
|
||||
logrus.Fatal("no config path provided")
|
||||
}
|
||||
cfg, err := config.NewConfig(*configPath)
|
||||
if err != nil {
|
||||
logrus.Fatalf("failed to load config: %v", err)
|
||||
}
|
||||
|
||||
var privateKey crypto.PrivKey
|
||||
|
||||
if cfg.IsBootstrap {
|
||||
// FIXME just a little hack
|
||||
if _, err := os.Stat(".bootstrap_privkey"); os.IsNotExist(err) {
|
||||
privateKey, err = generatePrivateKey()
|
||||
if err != nil {
|
||||
logrus.Fatal(err)
|
||||
}
|
||||
|
||||
f, _ := os.Create(".bootstrap_privkey")
|
||||
r, _ := privateKey.Raw()
|
||||
_, err = f.Write(r)
|
||||
if err != nil {
|
||||
logrus.Fatal(err)
|
||||
}
|
||||
} else {
|
||||
pkey, _ := ioutil.ReadFile(".bootstrap_privkey")
|
||||
privateKey, _ = crypto.UnmarshalEd25519PrivateKey(pkey)
|
||||
}
|
||||
} else {
|
||||
privateKey, err = generatePrivateKey()
|
||||
if err != nil {
|
||||
logrus.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
node, err := NewNode(cfg, privateKey, DefaultPEXUpdateTime)
|
||||
if err != nil {
|
||||
logrus.Fatal(err)
|
||||
}
|
||||
|
||||
// log
|
||||
if *verbose {
|
||||
logrus.SetLevel(logrus.DebugLevel)
|
||||
} else {
|
||||
logrus.SetLevel(logrus.DebugLevel)
|
||||
}
|
||||
|
||||
//log.SetDebugLogging()
|
||||
|
||||
//ctx, ctxCancel := context.WithCancel(context.Background())
|
||||
//node.GlobalCtx = ctx
|
||||
//node.GlobalCtxCancel = ctxCancel
|
||||
|
||||
err = node.Run(context.TODO())
|
||||
if err != nil {
|
||||
logrus.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func generatePrivateKey() (crypto.PrivKey, error) {
|
||||
r := rand.Reader
|
||||
// Creates a new RSA key pair for this host.
|
||||
prvKey, _, err := crypto.GenerateKeyPairWithReader(crypto.Ed25519, 2048, r)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return prvKey, nil
|
||||
fx.New(
|
||||
fx.Provide(
|
||||
provideEventBus,
|
||||
provideAppFlags,
|
||||
provideConfig,
|
||||
providePrivateKey,
|
||||
provideLibp2pHost,
|
||||
provideEthereumClient,
|
||||
providePubsubRouter,
|
||||
provideBootstrapAddrs,
|
||||
providePeerDiscovery,
|
||||
provideDrandBeacon,
|
||||
provideMempool,
|
||||
provideMiner,
|
||||
provideBlockChain,
|
||||
provideBlockPool,
|
||||
provideSyncManager,
|
||||
provideNetworkRPCHost,
|
||||
provideNetworkService,
|
||||
provideDirectRPCClient,
|
||||
provideConsensusManager,
|
||||
provideDisputeManager,
|
||||
),
|
||||
fx.Invoke(
|
||||
configureLogger,
|
||||
configureDirectRPC,
|
||||
configureForeignBlockchainRPC,
|
||||
runNode,
|
||||
),
|
||||
fx.NopLogger,
|
||||
).Run()
|
||||
}
|
||||
|
@ -2,8 +2,20 @@ package node
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"flag"
|
||||
"fmt"
|
||||
"time"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"runtime"
|
||||
|
||||
"github.com/Secured-Finance/dione/rpc"
|
||||
"github.com/Secured-Finance/dione/rpc/filecoin"
|
||||
solana2 "github.com/Secured-Finance/dione/rpc/solana"
|
||||
rtypes "github.com/Secured-Finance/dione/rpc/types"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/asaskevich/EventBus"
|
||||
|
||||
@ -19,7 +31,6 @@ import (
|
||||
|
||||
"github.com/Secured-Finance/dione/blockchain/pool"
|
||||
|
||||
"github.com/Secured-Finance/dione/beacon"
|
||||
"github.com/Secured-Finance/dione/cache"
|
||||
"github.com/Secured-Finance/dione/config"
|
||||
"github.com/Secured-Finance/dione/consensus"
|
||||
@ -28,13 +39,11 @@ import (
|
||||
"github.com/Secured-Finance/dione/types"
|
||||
"github.com/Secured-Finance/dione/wallet"
|
||||
pex "github.com/Secured-Finance/go-libp2p-pex"
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/libp2p/go-libp2p"
|
||||
"github.com/libp2p/go-libp2p-core/crypto"
|
||||
"github.com/libp2p/go-libp2p-core/discovery"
|
||||
"github.com/libp2p/go-libp2p-core/host"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
pubsub2 "github.com/libp2p/go-libp2p-pubsub"
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
"golang.org/x/xerrors"
|
||||
)
|
||||
@ -56,21 +65,30 @@ func provideCache(config *config.Config) cache.Cache {
|
||||
return backend
|
||||
}
|
||||
|
||||
func provideDisputeManager(ctx context.Context, ethClient *ethclient.EthereumClient, pcm *consensus.PBFTConsensusManager, cfg *config.Config, bc *blockchain.BlockChain) (*consensus.DisputeManager, error) {
|
||||
return consensus.NewDisputeManager(ctx, ethClient, pcm, cfg.Ethereum.DisputeVoteWindow, bc)
|
||||
}
|
||||
|
||||
func provideMiner(peerID peer.ID, ethAddress common.Address, ethClient *ethclient.EthereumClient, privateKey crypto.PrivKey, mempool *pool.Mempool) *blockchain.Miner {
|
||||
return blockchain.NewMiner(peerID, ethAddress, ethClient, privateKey, mempool)
|
||||
}
|
||||
|
||||
func provideBeacon(ps *pubsub2.PubSub, bus EventBus.Bus) (beacon.BeaconNetwork, error) {
|
||||
bc, err := drand2.NewDrandBeacon(ps, bus)
|
||||
func provideDisputeManager(ethClient *ethclient.EthereumClient, pcm *consensus.PBFTConsensusManager, cfg *config.Config, bc *blockchain.BlockChain) *consensus.DisputeManager {
|
||||
dm, err := consensus.NewDisputeManager(context.TODO(), ethClient, pcm, cfg.Ethereum.DisputeVoteWindow, bc)
|
||||
if err != nil {
|
||||
return beacon.BeaconNetwork{}, fmt.Errorf("failed to setup drand beacon: %w", err)
|
||||
logrus.Fatal(err)
|
||||
}
|
||||
// NOTE: currently we use only one network
|
||||
return beacon.BeaconNetwork{Start: config.DrandChainGenesisTime, Beacon: bc}, nil
|
||||
|
||||
logrus.Info("Dispute subsystem has been initialized!")
|
||||
|
||||
return dm
|
||||
}
|
||||
|
||||
func provideMiner(h host.Host, ethClient *ethclient.EthereumClient, privateKey crypto.PrivKey, mempool *pool.Mempool) *blockchain.Miner {
|
||||
miner := blockchain.NewMiner(h.ID(), *ethClient.GetEthAddress(), ethClient, privateKey, mempool)
|
||||
logrus.Info("Mining subsystem has been initialized!")
|
||||
return miner
|
||||
}
|
||||
|
||||
func provideDrandBeacon(ps *pubsub.PubSubRouter, bus EventBus.Bus) *drand2.DrandBeacon {
|
||||
db, err := drand2.NewDrandBeacon(ps.Pubsub, bus)
|
||||
if err != nil {
|
||||
logrus.Fatalf("Failed to setup drand beacon: %s", err)
|
||||
}
|
||||
logrus.Info("DRAND beacon subsystem has been initialized!")
|
||||
return db
|
||||
}
|
||||
|
||||
// FIXME: do we really need this?
|
||||
@ -90,123 +108,287 @@ func provideWallet(peerID peer.ID, privKey []byte) (*wallet.LocalWallet, error)
|
||||
return w, nil
|
||||
}
|
||||
|
||||
func provideEthereumClient(config *config.Config) (*ethclient.EthereumClient, error) {
|
||||
func provideEthereumClient(config *config.Config) *ethclient.EthereumClient {
|
||||
ethereum := ethclient.NewEthereumClient()
|
||||
err := ethereum.Initialize(&config.Ethereum)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
logrus.Fatal(err)
|
||||
}
|
||||
return ethereum, nil
|
||||
|
||||
logrus.WithField("ethAddress", ethereum.GetEthAddress().Hex()).Info("Ethereum client has been initialized!")
|
||||
|
||||
return ethereum
|
||||
}
|
||||
|
||||
func providePubsubRouter(lhost host.Host, config *config.Config) *pubsub.PubSubRouter {
|
||||
return pubsub.NewPubSubRouter(lhost, config.PubSub.ServiceTopicName, config.IsBootstrap)
|
||||
psb := pubsub.NewPubSubRouter(lhost, config.PubSub.ServiceTopicName, config.IsBootstrap)
|
||||
logrus.Info("PubSub subsystem has been initialized!")
|
||||
return psb
|
||||
}
|
||||
|
||||
func provideConsensusManager(
|
||||
h host.Host,
|
||||
cfg *config.Config,
|
||||
bus EventBus.Bus,
|
||||
psb *pubsub.PubSubRouter,
|
||||
miner *blockchain.Miner,
|
||||
bc *blockchain.BlockChain,
|
||||
ethClient *ethclient.EthereumClient,
|
||||
privateKey crypto.PrivKey,
|
||||
minApprovals int,
|
||||
bp *pool.BlockPool,
|
||||
b beacon.BeaconNetwork,
|
||||
db *drand2.DrandBeacon,
|
||||
mp *pool.Mempool,
|
||||
address peer.ID,
|
||||
) *consensus.PBFTConsensusManager {
|
||||
return consensus.NewPBFTConsensusManager(
|
||||
c := consensus.NewPBFTConsensusManager(
|
||||
bus,
|
||||
psb,
|
||||
minApprovals,
|
||||
cfg.ConsensusMinApprovals,
|
||||
privateKey,
|
||||
ethClient,
|
||||
miner,
|
||||
bc,
|
||||
bp,
|
||||
b,
|
||||
db,
|
||||
mp,
|
||||
address,
|
||||
h.ID(),
|
||||
)
|
||||
logrus.Info("Consensus subsystem has been initialized!")
|
||||
return c
|
||||
}
|
||||
|
||||
func provideLibp2pHost(config *config.Config, privateKey crypto.PrivKey) (host.Host, error) {
|
||||
func provideLibp2pHost(config *config.Config, privateKey crypto.PrivKey) host.Host {
|
||||
listenMultiAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", config.ListenAddr, config.ListenPort))
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("failed to parse multiaddress: %v", err)
|
||||
logrus.Fatalf("Failed to parse multiaddress: %s", err.Error())
|
||||
}
|
||||
host, err := libp2p.New(
|
||||
libp2pHost, err := libp2p.New(
|
||||
context.TODO(),
|
||||
libp2p.ListenAddrs(listenMultiAddr),
|
||||
libp2p.Identity(privateKey),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("failed to setup libp2p host: %v", err)
|
||||
logrus.Fatal(err)
|
||||
}
|
||||
|
||||
return host, nil
|
||||
logrus.WithField(
|
||||
"multiaddress",
|
||||
fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s",
|
||||
config.ListenAddr,
|
||||
config.ListenPort,
|
||||
libp2pHost.ID().Pretty(),
|
||||
)).Info("Libp2p host has been initialized!")
|
||||
|
||||
return libp2pHost
|
||||
}
|
||||
|
||||
func provideNetworkRPCHost(h host.Host) *gorpc.Server {
|
||||
return gorpc.NewServer(h, DioneProtocolID)
|
||||
}
|
||||
|
||||
func provideBootstrapAddrs(c *config.Config) ([]multiaddr.Multiaddr, error) {
|
||||
func provideBootstrapAddrs(c *config.Config) []multiaddr.Multiaddr {
|
||||
if c.IsBootstrap {
|
||||
return nil, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
var bootstrapMaddrs []multiaddr.Multiaddr
|
||||
for _, a := range c.BootstrapNodes {
|
||||
maddr, err := multiaddr.NewMultiaddr(a)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("invalid multiaddress of bootstrap node: %v", err)
|
||||
logrus.Fatalf("Invalid multiaddress of bootstrap node: %v", err)
|
||||
}
|
||||
bootstrapMaddrs = append(bootstrapMaddrs, maddr)
|
||||
}
|
||||
|
||||
return bootstrapMaddrs, nil
|
||||
return bootstrapMaddrs
|
||||
}
|
||||
|
||||
func providePeerDiscovery(baddrs []multiaddr.Multiaddr, h host.Host, pexDiscoveryUpdateTime time.Duration) (discovery.Discovery, error) {
|
||||
pexDiscovery, err := pex.NewPEXDiscovery(h, baddrs, pexDiscoveryUpdateTime)
|
||||
func providePeerDiscovery(baddrs []multiaddr.Multiaddr, h host.Host) discovery.Discovery {
|
||||
pexDiscovery, err := pex.NewPEXDiscovery(h, baddrs, DefaultPEXUpdateTime)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("failed to setup pex pexDiscovery: %v", err)
|
||||
logrus.Fatalf("Failed to setup libp2p PEX discovery: %s", err.Error())
|
||||
}
|
||||
|
||||
return pexDiscovery, nil
|
||||
logrus.Info("Peer discovery subsystem has been initialized!")
|
||||
|
||||
return pexDiscovery
|
||||
}
|
||||
|
||||
func provideBlockChain(config *config.Config, bus EventBus.Bus, miner *blockchain.Miner, b beacon.BeaconAPI) (*blockchain.BlockChain, error) {
|
||||
return blockchain.NewBlockChain(config.Blockchain.DatabasePath, bus, miner, b)
|
||||
func provideBlockChain(config *config.Config, bus EventBus.Bus, miner *blockchain.Miner, db *drand2.DrandBeacon) *blockchain.BlockChain {
|
||||
bc, err := blockchain.NewBlockChain(config.Blockchain.DatabasePath, bus, miner, db)
|
||||
if err != nil {
|
||||
logrus.Fatalf("Failed to initialize blockchain storage: %s", err.Error())
|
||||
}
|
||||
logrus.Info("Blockchain storage has been successfully initialized!")
|
||||
|
||||
return bc
|
||||
}
|
||||
|
||||
func provideMemPool(bus EventBus.Bus) (*pool.Mempool, error) {
|
||||
return pool.NewMempool(bus)
|
||||
func provideMempool(bus EventBus.Bus) *pool.Mempool {
|
||||
mp, err := pool.NewMempool(bus)
|
||||
if err != nil {
|
||||
logrus.Fatalf("Failed to initialize mempool: %s", err.Error())
|
||||
}
|
||||
|
||||
logrus.Info("Mempool has been successfully initialized!")
|
||||
|
||||
return mp
|
||||
}
|
||||
|
||||
func provideSyncManager(bus EventBus.Bus, bp *blockchain.BlockChain, mp *pool.Mempool, r *gorpc.Client, bootstrap multiaddr.Multiaddr, psb *pubsub.PubSubRouter) (sync.SyncManager, error) {
|
||||
func provideSyncManager(
|
||||
bus EventBus.Bus,
|
||||
bp *blockchain.BlockChain,
|
||||
mp *pool.Mempool,
|
||||
c *gorpc.Client,
|
||||
bootstrapAddresses []multiaddr.Multiaddr,
|
||||
psb *pubsub.PubSubRouter,
|
||||
) sync.SyncManager {
|
||||
bootstrapPeerID := peer.ID("")
|
||||
if bootstrap != nil {
|
||||
addr, err := peer.AddrInfoFromP2pAddr(bootstrap)
|
||||
|
||||
if bootstrapAddresses != nil {
|
||||
addr, err := peer.AddrInfoFromP2pAddr(bootstrapAddresses[0]) // FIXME
|
||||
if err != nil {
|
||||
return nil, err
|
||||
logrus.Fatal(err)
|
||||
}
|
||||
bootstrapPeerID = addr.ID
|
||||
}
|
||||
|
||||
return sync.NewSyncManager(bus, bp, mp, r, bootstrapPeerID, psb), nil
|
||||
sm := sync.NewSyncManager(bus, bp, mp, c, bootstrapPeerID, psb)
|
||||
logrus.Info("Blockchain sync subsystem has been successfully initialized!")
|
||||
|
||||
return sm
|
||||
}
|
||||
|
||||
func provideP2PRPCClient(h host.Host) *gorpc.Client {
|
||||
func provideDirectRPCClient(h host.Host) *gorpc.Client {
|
||||
return gorpc.NewClient(h, DioneProtocolID)
|
||||
}
|
||||
|
||||
func provideNetworkService(bp *blockchain.BlockChain, mp *pool.Mempool) *NetworkService {
|
||||
return NewNetworkService(bp, mp)
|
||||
ns := NewNetworkService(bp, mp)
|
||||
logrus.Info("Direct RPC has been successfully initialized!")
|
||||
return ns
|
||||
}
|
||||
|
||||
func provideBlockPool(mp *pool.Mempool, bus EventBus.Bus) (*pool.BlockPool, error) {
|
||||
return pool.NewBlockPool(mp, bus)
|
||||
func provideBlockPool(mp *pool.Mempool, bus EventBus.Bus) *pool.BlockPool {
|
||||
bp, err := pool.NewBlockPool(mp, bus)
|
||||
if err != nil {
|
||||
logrus.Fatalf("Failed to initialize blockpool: %s", err.Error())
|
||||
}
|
||||
logrus.Info("Blockpool has been successfully initialized!")
|
||||
return bp
|
||||
}
|
||||
|
||||
func provideEventBus() EventBus.Bus {
|
||||
return EventBus.New()
|
||||
}
|
||||
|
||||
func provideAppFlags() *AppFlags {
|
||||
var flags AppFlags
|
||||
|
||||
flag.StringVar(&flags.ConfigPath, "config", "", "Path to config")
|
||||
flag.BoolVar(&flags.Verbose, "verbose", false, "Verbose logging")
|
||||
|
||||
flag.Parse()
|
||||
|
||||
return &flags
|
||||
}
|
||||
|
||||
func provideConfig(flags *AppFlags) *config.Config {
|
||||
if flags.ConfigPath == "" {
|
||||
logrus.Fatal("no config path provided")
|
||||
|
||||
}
|
||||
|
||||
cfg, err := config.NewConfig(flags.ConfigPath)
|
||||
if err != nil {
|
||||
logrus.Fatalf("failed to load config: %v", err)
|
||||
}
|
||||
|
||||
return cfg
|
||||
}
|
||||
|
||||
func providePrivateKey(cfg *config.Config) crypto.PrivKey {
|
||||
var privateKey crypto.PrivKey
|
||||
|
||||
if _, err := os.Stat(cfg.PrivateKeyPath); os.IsNotExist(err) {
|
||||
privateKey, err = generatePrivateKey()
|
||||
if err != nil {
|
||||
logrus.Fatal(err)
|
||||
}
|
||||
|
||||
f, err := os.Create(cfg.PrivateKeyPath)
|
||||
if err != nil {
|
||||
logrus.Fatalf("Cannot create private key file: %s, ", err)
|
||||
}
|
||||
|
||||
r, err := privateKey.Raw()
|
||||
if err != nil {
|
||||
logrus.Fatal(err)
|
||||
}
|
||||
|
||||
_, err = f.Write(r)
|
||||
if err != nil {
|
||||
logrus.Fatal(err)
|
||||
}
|
||||
} else {
|
||||
pkey, err := ioutil.ReadFile(cfg.PrivateKeyPath)
|
||||
if err != nil {
|
||||
logrus.Fatal(err)
|
||||
}
|
||||
|
||||
privateKey, err = crypto.UnmarshalEd25519PrivateKey(pkey)
|
||||
if err != nil {
|
||||
logrus.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
return privateKey
|
||||
}
|
||||
|
||||
func generatePrivateKey() (crypto.PrivKey, error) {
|
||||
r := rand.Reader
|
||||
// Creates a new RSA key pair for this host.
|
||||
prvKey, _, err := crypto.GenerateKeyPairWithReader(crypto.Ed25519, 2048, r)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return prvKey, nil
|
||||
}
|
||||
|
||||
func configureDirectRPC(rpcServer *gorpc.Server, ns *NetworkService) {
|
||||
err := rpcServer.Register(ns)
|
||||
if err != nil {
|
||||
logrus.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func configureLogger(flags *AppFlags) {
|
||||
logrus.SetReportCaller(true)
|
||||
logrus.SetFormatter(&logrus.TextFormatter{
|
||||
FullTimestamp: true,
|
||||
CallerPrettyfier: func(f *runtime.Frame) (string, string) {
|
||||
filename := path.Base(f.File)
|
||||
return "", fmt.Sprintf("%s:%d:", filename, f.Line)
|
||||
},
|
||||
})
|
||||
|
||||
if flags.Verbose {
|
||||
logrus.SetLevel(logrus.DebugLevel)
|
||||
} else {
|
||||
logrus.SetLevel(logrus.InfoLevel)
|
||||
}
|
||||
}
|
||||
|
||||
func configureForeignBlockchainRPC() {
|
||||
fc := filecoin.NewLotusClient()
|
||||
rpc.RegisterRPC(rtypes.RPCTypeFilecoin, map[string]func(string) ([]byte, error){
|
||||
"getTransaction": fc.GetTransaction,
|
||||
"getBlock": fc.GetBlock,
|
||||
})
|
||||
|
||||
sl := solana2.NewSolanaClient()
|
||||
rpc.RegisterRPC(rtypes.RPCTypeSolana, map[string]func(string) ([]byte, error){
|
||||
"getTransaction": sl.GetTransaction,
|
||||
})
|
||||
|
||||
logrus.Info("Foreign Blockchain RPC clients has been successfully configured!")
|
||||
}
|
||||
|
@ -29,10 +29,11 @@ func NewPubSubRouter(h host.Host, oracleTopic string, isBootstrap bool) *PubSubR
|
||||
ctx, ctxCancel := context.WithCancel(context.Background())
|
||||
|
||||
psr := &PubSubRouter{
|
||||
node: h,
|
||||
context: ctx,
|
||||
contextCancel: ctxCancel,
|
||||
handlers: make(map[PubSubMessageType][]Handler),
|
||||
node: h,
|
||||
context: ctx,
|
||||
contextCancel: ctxCancel,
|
||||
handlers: make(map[PubSubMessageType][]Handler),
|
||||
oracleTopicName: oracleTopic,
|
||||
}
|
||||
|
||||
var pbOptions []pubsub.Option
|
||||
@ -61,7 +62,6 @@ func NewPubSubRouter(h host.Host, oracleTopic string, isBootstrap bool) *PubSubR
|
||||
logrus.Fatalf("Error occurred when initializing PubSub subsystem: %v", err)
|
||||
}
|
||||
|
||||
psr.oracleTopicName = oracleTopic
|
||||
topic, err := pb.Join(oracleTopic)
|
||||
if err != nil {
|
||||
logrus.Fatalf("Error occurred when subscribing to service topic: %v", err)
|
||||
@ -72,6 +72,10 @@ func NewPubSubRouter(h host.Host, oracleTopic string, isBootstrap bool) *PubSubR
|
||||
psr.Pubsub = pb
|
||||
psr.oracleTopic = topic
|
||||
|
||||
return psr
|
||||
}
|
||||
|
||||
func (psr *PubSubRouter) Run() {
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
@ -79,7 +83,7 @@ func NewPubSubRouter(h host.Host, oracleTopic string, isBootstrap bool) *PubSubR
|
||||
return
|
||||
default:
|
||||
{
|
||||
msg, err := subscription.Next(psr.context)
|
||||
msg, err := psr.serviceSubscription.Next(psr.context)
|
||||
if err != nil {
|
||||
logrus.Warnf("Failed to receive pubsub message: %v", err)
|
||||
}
|
||||
@ -88,8 +92,6 @@ func NewPubSubRouter(h host.Host, oracleTopic string, isBootstrap bool) *PubSubR
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return psr
|
||||
}
|
||||
|
||||
func (psr *PubSubRouter) handleMessage(p *pubsub.Message) {
|
||||
|
Loading…
Reference in New Issue
Block a user