From 5aadbab60065f8e7f548a041ebf9a3cb60c80c27 Mon Sep 17 00:00:00 2001 From: ChronosX88 Date: Sun, 11 Jul 2021 03:08:03 +0300 Subject: [PATCH] Refactor dispute manager to new blockchain architecture --- blockchain/{blockpool.go => blockchain.go} | 0 consensus/dispute_manager.go | 213 ++++++++++++--------- node/node.go | 13 +- node/node_dep_providers.go | 17 +- 4 files changed, 146 insertions(+), 97 deletions(-) rename blockchain/{blockpool.go => blockchain.go} (100%) diff --git a/blockchain/blockpool.go b/blockchain/blockchain.go similarity index 100% rename from blockchain/blockpool.go rename to blockchain/blockchain.go diff --git a/consensus/dispute_manager.go b/consensus/dispute_manager.go index 70bfb7b..672ca9c 100644 --- a/consensus/dispute_manager.go +++ b/consensus/dispute_manager.go @@ -2,8 +2,19 @@ package consensus import ( "context" + "encoding/hex" "time" + types2 "github.com/Secured-Finance/dione/blockchain/types" + + "github.com/Secured-Finance/dione/types" + "github.com/fxamacker/cbor/v2" + + "github.com/sirupsen/logrus" + "golang.org/x/crypto/sha3" + + "github.com/Secured-Finance/dione/blockchain" + "github.com/Secured-Finance/dione/contracts/dioneDispute" "github.com/Secured-Finance/dione/contracts/dioneOracle" "github.com/Secured-Finance/dione/ethclient" @@ -16,9 +27,10 @@ type DisputeManager struct { submissionMap map[string]*dioneOracle.DioneOracleSubmittedOracleRequest disputeMap map[string]*dioneDispute.DioneDisputeNewDispute voteWindow time.Duration + blockchain *blockchain.BlockChain } -func NewDisputeManager(ctx context.Context, ethClient *ethclient.EthereumClient, pcm *PBFTConsensusManager, voteWindow int) (*DisputeManager, error) { +func NewDisputeManager(ctx context.Context, ethClient *ethclient.EthereumClient, pcm *PBFTConsensusManager, voteWindow int, bc *blockchain.BlockChain) (*DisputeManager, error) { newSubmittionsChan, submSubscription, err := ethClient.SubscribeOnNewSubmittions(ctx) if err != nil { return nil, err @@ -36,6 +48,7 @@ func NewDisputeManager(ctx context.Context, ethClient *ethclient.EthereumClient, submissionMap: map[string]*dioneOracle.DioneOracleSubmittedOracleRequest{}, disputeMap: map[string]*dioneDispute.DioneDisputeNewDispute{}, voteWindow: time.Duration(voteWindow) * time.Second, + blockchain: bc, } go func() { @@ -62,95 +75,117 @@ func NewDisputeManager(ctx context.Context, ethClient *ethclient.EthereumClient, return dm, nil } -func (dm *DisputeManager) onNewSubmission(submittion *dioneOracle.DioneOracleSubmittedOracleRequest) { - //c := dm.pcm.GetConsensusInfo(submittion.ReqID.String()) - //if c == nil { - // // todo: warn - // return - //} - // - //dm.submissionMap[submittion.ReqID.String()] = submittion - // - //submHashBytes := sha3.Sum256(submittion.Data) - //localHashBytes := sha3.Sum256(c.Task.Payload) - //submHash := hex.EncodeToString(submHashBytes[:]) - //localHash := hex.EncodeToString(localHashBytes[:]) - //if submHash != localHash { - // logrus.Debugf("submission of request id %s isn't valid - beginning dispute", c.Task.RequestID) - // addr := common.HexToAddress(c.Task.MinerEth) - // reqID, ok := big.NewInt(0).SetString(c.Task.RequestID, 10) - // if !ok { - // logrus.Errorf("cannot parse request id: %s", c.Task.RequestID) - // return - // } - // err := dm.ethClient.BeginDispute(addr, reqID) - // if err != nil { - // logrus.Errorf(err.Error()) - // return - // } - // disputeFinishTimer := time.NewTimer(dm.voteWindow) - // go func() { - // for { - // select { - // case <-dm.ctx.Done(): - // return - // case <-disputeFinishTimer.C: - // { - // d, ok := dm.disputeMap[reqID.String()] - // if !ok { - // logrus.Error("cannot finish dispute: it doesn't exist in manager's dispute map!") - // return - // } - // err := dm.ethClient.FinishDispute(d.Dhash) - // if err != nil { - // logrus.Errorf(err.Error()) - // return - // } - // disputeFinishTimer.Stop() - // return - // } - // } - // } - // }() - //} - // TODO refactor due to new architecture with blockchain +func (dm *DisputeManager) onNewSubmission(submission *dioneOracle.DioneOracleSubmittedOracleRequest) { + // find a block that contains the dione task with specified request id + task, block, err := dm.findTaskAndBlockWithRequestID(submission.ReqID.String()) + if err != nil { + logrus.Error(err) + return + } + + dm.submissionMap[submission.ReqID.String()] = submission + + submHashBytes := sha3.Sum256(submission.Data) + localHashBytes := sha3.Sum256(task.Payload) + submHash := hex.EncodeToString(submHashBytes[:]) + localHash := hex.EncodeToString(localHashBytes[:]) + if submHash != localHash { + logrus.Debugf("submission of request id %s isn't valid - beginning dispute", submission.ReqID) + err := dm.ethClient.BeginDispute(block.Header.ProposerEth, submission.ReqID) + if err != nil { + logrus.Errorf(err.Error()) + return + } + disputeFinishTimer := time.NewTimer(dm.voteWindow) + go func() { + for { + select { + case <-dm.ctx.Done(): + return + case <-disputeFinishTimer.C: + { + d, ok := dm.disputeMap[submission.ReqID.String()] + if !ok { + logrus.Error("cannot finish dispute: it doesn't exist in manager's dispute map!") + return + } + err := dm.ethClient.FinishDispute(d.Dhash) + if err != nil { + logrus.Errorf(err.Error()) + return + } + disputeFinishTimer.Stop() + return + } + } + } + }() + } +} + +func (dm *DisputeManager) findTaskAndBlockWithRequestID(requestID string) (*types.DioneTask, *types2.Block, error) { + height, err := dm.blockchain.GetLatestBlockHeight() + if err != nil { + return nil, nil, err + } + + for { + block, err := dm.blockchain.FetchBlockByHeight(height) + if err != nil { + return nil, nil, err + } + + for _, v := range block.Data { + var task types.DioneTask + err := cbor.Unmarshal(v.Data, &task) + if err != nil { + logrus.Error(err) + continue + } + + if task.RequestID == requestID { + return &task, block, nil + } + } + + height-- + } } func (dm *DisputeManager) onNewDispute(dispute *dioneDispute.DioneDisputeNewDispute) { - //c := dm.pcm.GetConsensusInfo(dispute.RequestID.String()) - //if c == nil { - // // todo: warn - // return - //} - // - //subm, ok := dm.submissionMap[dispute.RequestID.String()] - //if !ok { - // // todo: warn - // return - //} - // - //dm.disputeMap[dispute.RequestID.String()] = dispute - // - //if dispute.DisputeInitiator.Hex() == dm.ethClient.GetEthAddress().Hex() { - // return - //} - // - //submHashBytes := sha3.Sum256(subm.Data) - //localHashBytes := sha3.Sum256(c.Task.Payload) - //submHash := hex.EncodeToString(submHashBytes[:]) - //localHash := hex.EncodeToString(localHashBytes[:]) - //if submHash == localHash { - // err := dm.ethClient.VoteDispute(dispute.Dhash, false) - // if err != nil { - // logrus.Errorf(err.Error()) - // return - // } - //} - // - //err := dm.ethClient.VoteDispute(dispute.Dhash, true) - //if err != nil { - // logrus.Errorf(err.Error()) - // return - //} - // TODO refactor due to new architecture with blockchain + task, _, err := dm.findTaskAndBlockWithRequestID(dispute.RequestID.String()) + if err != nil { + logrus.Error(err) + return + } + + subm, ok := dm.submissionMap[dispute.RequestID.String()] + if !ok { + logrus.Warn("desired submission isn't found in map") + return + } + + dm.disputeMap[dispute.RequestID.String()] = dispute + + if dispute.DisputeInitiator.Hex() == dm.ethClient.GetEthAddress().Hex() { + return + } + + submHashBytes := sha3.Sum256(subm.Data) + localHashBytes := sha3.Sum256(task.Payload) + submHash := hex.EncodeToString(submHashBytes[:]) + localHash := hex.EncodeToString(localHashBytes[:]) + if submHash == localHash { + err := dm.ethClient.VoteDispute(dispute.Dhash, false) + if err != nil { + logrus.Errorf(err.Error()) + return + } + } + + err = dm.ethClient.VoteDispute(dispute.Dhash, true) + if err != nil { + logrus.Errorf(err.Error()) + return + } } diff --git a/node/node.go b/node/node.go index a3bae20..400e39d 100644 --- a/node/node.go +++ b/node/node.go @@ -9,6 +9,8 @@ import ( "os" "time" + "github.com/multiformats/go-multiaddr" + "github.com/asaskevich/EventBus" "github.com/fxamacker/cbor/v2" @@ -160,7 +162,14 @@ func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTim r := provideP2PRPCClient(lhost) // initialize sync manager - sm, err := provideSyncManager(bus, bc, mp, r, baddrs[0], psb) // FIXME here we just pick up first bootstrap in list + + var baddr multiaddr.Multiaddr + if len(baddrs) == 0 { + baddr = nil + } else { + baddr = baddrs[0] + } + sm, err := provideSyncManager(bus, bc, mp, r, baddr, psb) // FIXME here we just pick up first bootstrap in list if err != nil { logrus.Fatal(err) } @@ -186,7 +195,7 @@ func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTim logrus.Info("Random beacon subsystem has been initialized!") // initialize dispute subsystem - disputeManager, err := provideDisputeManager(context.TODO(), ethClient, consensusManager, config) + disputeManager, err := provideDisputeManager(context.TODO(), ethClient, consensusManager, config, bc) if err != nil { logrus.Fatal(err) } diff --git a/node/node_dep_providers.go b/node/node_dep_providers.go index 3ec1836..6ddc7e1 100644 --- a/node/node_dep_providers.go +++ b/node/node_dep_providers.go @@ -56,8 +56,8 @@ func provideCache(config *config.Config) cache.Cache { return backend } -func provideDisputeManager(ctx context.Context, ethClient *ethclient.EthereumClient, pcm *consensus.PBFTConsensusManager, cfg *config.Config) (*consensus.DisputeManager, error) { - return consensus.NewDisputeManager(ctx, ethClient, pcm, cfg.Ethereum.DisputeVoteWindow) +func provideDisputeManager(ctx context.Context, ethClient *ethclient.EthereumClient, pcm *consensus.PBFTConsensusManager, cfg *config.Config, bc *blockchain.BlockChain) (*consensus.DisputeManager, error) { + return consensus.NewDisputeManager(ctx, ethClient, pcm, cfg.Ethereum.DisputeVoteWindow, bc) } func provideMiner(peerID peer.ID, ethAddress common.Address, ethClient *ethclient.EthereumClient, privateKey crypto.PrivKey, mempool *pool.Mempool) *consensus.Miner { @@ -165,11 +165,16 @@ func provideMemPool() (*pool.Mempool, error) { } func provideSyncManager(bus EventBus.Bus, bp *blockchain.BlockChain, mp *pool.Mempool, r *gorpc.Client, bootstrap multiaddr.Multiaddr, psb *pubsub.PubSubRouter) (sync.SyncManager, error) { - addr, err := peer.AddrInfoFromP2pAddr(bootstrap) - if err != nil { - return nil, err + bootstrapPeerID := peer.ID("") + if bootstrap != nil { + addr, err := peer.AddrInfoFromP2pAddr(bootstrap) + if err != nil { + return nil, err + } + bootstrapPeerID = addr.ID } - return sync.NewSyncManager(bus, bp, mp, r, addr.ID, psb), nil + + return sync.NewSyncManager(bus, bp, mp, r, bootstrapPeerID, psb), nil } func provideP2PRPCClient(h host.Host) *gorpc.Client {