dione/node/node.go
2021-08-25 22:10:56 +03:00

227 lines
5.4 KiB
Go

package node
import (
"context"
"time"
"github.com/asaskevich/EventBus"
"github.com/Secured-Finance/dione/blockchain"
drand2 "github.com/Secured-Finance/dione/beacon/drand"
"github.com/Secured-Finance/dione/pubsub"
"github.com/Secured-Finance/dione/consensus"
"github.com/Secured-Finance/dione/blockchain/sync"
"go.uber.org/fx"
"github.com/fxamacker/cbor/v2"
"github.com/Secured-Finance/dione/types"
types2 "github.com/Secured-Finance/dione/blockchain/types"
"github.com/Secured-Finance/dione/blockchain/pool"
"github.com/libp2p/go-libp2p-core/discovery"
"github.com/Secured-Finance/dione/rpc"
"golang.org/x/xerrors"
"github.com/Secured-Finance/dione/config"
"github.com/Secured-Finance/dione/ethclient"
"github.com/libp2p/go-libp2p-core/host"
"github.com/sirupsen/logrus"
)
const (
DefaultPEXUpdateTime = 6 * time.Second
)
func runNode(
lc fx.Lifecycle,
cfg *config.Config,
disco discovery.Discovery,
ethClient ethclient.EthereumSideAPI,
h host.Host,
mp *pool.Mempool,
syncManager sync.SyncManager,
consensusManager *consensus.ConsensusHandler,
pubSubRouter *pubsub.PubSubRouter,
disputeManager *consensus.DisputeManager,
db *drand2.DrandBeacon,
) {
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
err := runLibp2pAsync(context.TODO(), h, cfg, disco)
if err != nil {
return err
}
err = db.Run(context.TODO())
if err != nil {
return err
}
// Run pubsub router
pubSubRouter.Run()
// Subscribe on new requests event channel from Ethereum
err = subscribeOnEthContractsAsync(context.TODO(), ethClient, mp)
if err != nil {
return err
}
// Run blockchain sync manager
syncManager.Run()
// Run dispute manager
disputeManager.Run(context.TODO())
return nil
},
OnStop: func(ctx context.Context) error {
// TODO
return nil
},
})
}
func runLibp2pAsync(ctx context.Context, h host.Host, cfg *config.Config, disco discovery.Discovery) error {
logrus.Info("Announcing ourselves...")
_, err := disco.Advertise(context.TODO(), cfg.Rendezvous)
if err != nil {
return xerrors.Errorf("failed to announce this node to the network: %v", err)
}
logrus.Info("Successfully announced!")
// Discover unbounded count of peers
logrus.Info("Searching for other peers...")
peerChan, err := disco.FindPeers(context.TODO(), cfg.Rendezvous)
if err != nil {
return xerrors.Errorf("failed to find new peers: %v", err)
}
go func() {
MainLoop:
for {
select {
case <-ctx.Done():
break MainLoop
case newPeer := <-peerChan:
{
if len(newPeer.Addrs) == 0 {
continue
}
if newPeer.ID.String() == h.ID().String() {
continue
}
logrus.WithField("peer", newPeer.ID).Info("Discovered new peer, connecting...")
// Connect to the peer
if err := h.Connect(ctx, newPeer); err != nil {
logrus.WithFields(logrus.Fields{
"peer": newPeer.ID,
"err": err.Error(),
}).Warn("Connection with newly discovered peer has been failed")
}
logrus.WithField("peer", newPeer.ID).Info("Connected to newly discovered peer")
}
}
}
}()
return nil
}
func subscribeOnEthContractsAsync(ctx context.Context, ethClient ethclient.EthereumSideAPI, mp *pool.Mempool) error {
eventChan, subscription, err := ethClient.SubscribeOnOracleEvents(ctx)
if err != nil {
return err
}
go func() {
EventLoop:
for {
select {
case event := <-eventChan:
{
rpcMethod := rpc.GetRPCMethod(event.OriginChain, event.RequestType)
if rpcMethod == nil {
logrus.Errorf("Invalid RPC method name/type %d/%s for oracle request %s", event.OriginChain, event.RequestType, event.ReqID.String())
continue
}
res, err := rpcMethod(event.RequestParams)
if err != nil {
logrus.Errorf("Failed to invoke RPC method for oracle request %s: %s", event.ReqID.String(), err.Error())
continue
}
task := &types.DioneTask{
OriginChain: event.OriginChain,
RequestType: event.RequestType,
RequestParams: event.RequestParams,
Payload: res,
RequestID: event.ReqID.String(),
}
data, err := cbor.Marshal(task)
if err != nil {
logrus.Errorf("Failed to marshal RPC response for oracle request %s: %s", event.ReqID.String(), err.Error())
continue
}
tx := types2.CreateTransaction(data)
err = mp.StoreTx(tx)
if err != nil {
logrus.Errorf("Failed to store tx in mempool: %s", err.Error())
continue
}
}
case <-ctx.Done():
break EventLoop
case err := <-subscription.Err():
logrus.Fatalf("Error has occurred in subscription to Ethereum event channel: %s", err.Error())
}
}
}()
return nil
}
func Start() {
fx.New(
fx.Provide(
provideAppFlags,
provideConfig,
provideCacheManager,
providePrivateKey,
provideLibp2pHost,
ethclient.NewEthereumClient,
providePubsub,
providePubsubRouter,
provideBootstrapAddrs,
providePeerDiscovery,
drand2.NewDrandBeacon,
pool.NewMempool,
provideBlockchainDatabase,
blockchain.NewMiner,
blockchain.NewBlockChain,
sync.NewSyncManager,
provideNetworkRPCHost,
NewNetworkService,
provideDirectRPCClient,
func() EventBus.Bus { return EventBus.New() },
),
consensus.Module,
fx.Invoke(
configureLogger,
configureDirectRPC,
configureForeignBlockchainRPC,
initializeBlockchain,
configureMiner,
runNode,
),
fx.NopLogger,
).Run()
}