From 656a7f2b2946bc294cccfe8ae70f9fb8b5354367 Mon Sep 17 00:00:00 2001 From: ChronosX88 Date: Tue, 24 Aug 2021 18:18:38 +0300 Subject: [PATCH] Add states for submission/dispute to make dispute manager more persistent --- consensus/dispute_manager.go | 160 ++++++++++++++++++++++++----------- ethclient/ethereum.go | 2 +- node/node.go | 2 +- node/node_dep_providers.go | 11 --- 4 files changed, 113 insertions(+), 62 deletions(-) diff --git a/consensus/dispute_manager.go b/consensus/dispute_manager.go index 28e3ba6..1664c14 100644 --- a/consensus/dispute_manager.go +++ b/consensus/dispute_manager.go @@ -3,8 +3,17 @@ package consensus import ( "context" "encoding/hex" + "math/big" "time" + "github.com/asaskevich/EventBus" + + "github.com/ethereum/go-ethereum/common" + + "github.com/Secured-Finance/dione/config" + + "github.com/Secured-Finance/dione/cache" + "github.com/ethereum/go-ethereum/event" types2 "github.com/Secured-Finance/dione/blockchain/types" @@ -23,23 +32,43 @@ import ( ) type DisputeManager struct { - ctx context.Context - ethClient *ethclient.EthereumClient - pcm *PBFTConsensusManager - submissionMap map[string]*dioneOracle.DioneOracleSubmittedOracleRequest - disputeMap map[string]*dioneDispute.DioneDisputeNewDispute - voteWindow time.Duration - blockchain *blockchain.BlockChain + ctx context.Context + bus EventBus.Bus + ethClient *ethclient.EthereumClient + pcm *PBFTConsensusManager + voteWindow time.Duration + blockchain *blockchain.BlockChain - submissionChan chan *dioneOracle.DioneOracleSubmittedOracleRequest - submissionSubscription event.Subscription + submissionChan chan *dioneOracle.DioneOracleSubmittedOracleRequest + submissionEthSubscription event.Subscription + submissionCache cache.Cache - disputesChan chan *dioneDispute.DioneDisputeNewDispute - disputesSubscription event.Subscription + disputesChan chan *dioneDispute.DioneDisputeNewDispute + disputeEthSubscription event.Subscription + disputeCache cache.Cache } -func NewDisputeManager(ctx context.Context, ethClient *ethclient.EthereumClient, pcm *PBFTConsensusManager, voteWindow int, bc *blockchain.BlockChain) (*DisputeManager, error) { - submissionChan, submSubscription, err := ethClient.SubscribeOnNewSubmittions(ctx) +type Dispute struct { + Dhash [32]byte + RequestID *big.Int + Miner common.Address + DisputeInitiator common.Address + Timestamp int64 + Voted bool + Finished bool // if we are dispute initiator +} + +type Submission struct { + ReqID *big.Int + Data []byte + Timestamp int64 + Checked bool +} + +func NewDisputeManager(bus EventBus.Bus, ethClient *ethclient.EthereumClient, bc *blockchain.BlockChain, cfg *config.Config) (*DisputeManager, error) { + ctx := context.TODO() + + submissionChan, submSubscription, err := ethClient.SubscribeOnNewSubmissions(ctx) if err != nil { return nil, err } @@ -50,17 +79,17 @@ func NewDisputeManager(ctx context.Context, ethClient *ethclient.EthereumClient, } dm := &DisputeManager{ - ethClient: ethClient, - pcm: pcm, - ctx: ctx, - submissionMap: map[string]*dioneOracle.DioneOracleSubmittedOracleRequest{}, - disputeMap: map[string]*dioneDispute.DioneDisputeNewDispute{}, - voteWindow: time.Duration(voteWindow) * time.Second, - blockchain: bc, - submissionChan: submissionChan, - submissionSubscription: submSubscription, - disputesChan: disputesChan, - disputesSubscription: dispSubscription, + ethClient: ethClient, + ctx: ctx, + bus: bus, + voteWindow: time.Duration(cfg.Ethereum.DisputeVoteWindow) * time.Second, + blockchain: bc, + submissionChan: submissionChan, + submissionEthSubscription: submSubscription, + submissionCache: cache.NewInMemoryCache(), // FIXME + disputesChan: disputesChan, + disputeEthSubscription: dispSubscription, + disputeCache: cache.NewInMemoryCache(), // FIXME } return dm, nil @@ -72,8 +101,8 @@ func (dm *DisputeManager) Run(ctx context.Context) { select { case <-ctx.Done(): { - dm.disputesSubscription.Unsubscribe() - dm.disputesSubscription.Unsubscribe() + dm.submissionEthSubscription.Unsubscribe() + dm.disputeEthSubscription.Unsubscribe() return } case s := <-dm.submissionChan: @@ -90,6 +119,10 @@ func (dm *DisputeManager) Run(ctx context.Context) { } func (dm *DisputeManager) onNewSubmission(submission *dioneOracle.DioneOracleSubmittedOracleRequest) { + s := wrapSubmission(submission) + s.Timestamp = time.Now().Unix() + dm.submissionCache.Store(submission.ReqID.String(), s) + // find a block that contains the dione task with specified request id task, block, err := dm.findTaskAndBlockWithRequestID(submission.ReqID.String()) if err != nil { @@ -97,15 +130,13 @@ func (dm *DisputeManager) onNewSubmission(submission *dioneOracle.DioneOracleSub return } - dm.submissionMap[submission.ReqID.String()] = submission - - submHashBytes := sha3.Sum256(submission.Data) + submHashBytes := sha3.Sum256(s.Data) localHashBytes := sha3.Sum256(task.Payload) submHash := hex.EncodeToString(submHashBytes[:]) localHash := hex.EncodeToString(localHashBytes[:]) if submHash != localHash { - logrus.Debugf("submission of request id %s isn't valid - beginning dispute", submission.ReqID) - err := dm.ethClient.BeginDispute(block.Header.ProposerEth, submission.ReqID) + logrus.Debugf("submission of request id %s isn't valid - beginning dispute", s.ReqID) + err := dm.ethClient.BeginDispute(block.Header.ProposerEth, s.ReqID) if err != nil { logrus.Errorf(err.Error()) return @@ -118,23 +149,30 @@ func (dm *DisputeManager) onNewSubmission(submission *dioneOracle.DioneOracleSub return case <-disputeFinishTimer.C: { - d, ok := dm.disputeMap[submission.ReqID.String()] - if !ok { - logrus.Error("cannot finish dispute: it doesn't exist in manager's dispute map!") - return - } - err := dm.ethClient.FinishDispute(d.Dhash) + var d Dispute + err := dm.disputeCache.Get(s.ReqID.String(), &d) if err != nil { logrus.Errorf(err.Error()) return } + err = dm.ethClient.FinishDispute(d.Dhash) + if err != nil { + logrus.Errorf(err.Error()) + disputeFinishTimer.Stop() + return + } disputeFinishTimer.Stop() + + d.Finished = true + dm.disputeCache.Store(d.RequestID.String(), d) return } } } }() } + s.Checked = true + dm.submissionCache.Store(s.ReqID.String(), s) } func (dm *DisputeManager) findTaskAndBlockWithRequestID(requestID string) (*types.DioneTask, *types2.Block, error) { @@ -167,39 +205,63 @@ func (dm *DisputeManager) findTaskAndBlockWithRequestID(requestID string) (*type } func (dm *DisputeManager) onNewDispute(dispute *dioneDispute.DioneDisputeNewDispute) { - task, _, err := dm.findTaskAndBlockWithRequestID(dispute.RequestID.String()) + d := wrapDispute(dispute) + d.Timestamp = time.Now().Unix() + dm.disputeCache.Store(d.RequestID.String(), d) + + task, _, err := dm.findTaskAndBlockWithRequestID(d.RequestID.String()) if err != nil { logrus.Error(err) return } - subm, ok := dm.submissionMap[dispute.RequestID.String()] - if !ok { - logrus.Warn("desired submission isn't found in map") + var s Submission + err = dm.submissionCache.Get(d.RequestID.String(), &s) + if err != nil { + logrus.Warnf("submission of request id %s isn't found in cache", d.RequestID.String()) return } - dm.disputeMap[dispute.RequestID.String()] = dispute - if dispute.DisputeInitiator.Hex() == dm.ethClient.GetEthAddress().Hex() { + d.Voted = true + dm.disputeCache.Store(d.RequestID.String(), d) return } - submHashBytes := sha3.Sum256(subm.Data) + submHashBytes := sha3.Sum256(s.Data) localHashBytes := sha3.Sum256(task.Payload) submHash := hex.EncodeToString(submHashBytes[:]) localHash := hex.EncodeToString(localHashBytes[:]) if submHash == localHash { - err := dm.ethClient.VoteDispute(dispute.Dhash, false) + err := dm.ethClient.VoteDispute(d.Dhash, false) + if err != nil { + logrus.Errorf(err.Error()) + return + } + } else { + err = dm.ethClient.VoteDispute(d.Dhash, true) if err != nil { logrus.Errorf(err.Error()) return } } - err = dm.ethClient.VoteDispute(dispute.Dhash, true) - if err != nil { - logrus.Errorf(err.Error()) - return + d.Voted = true + dm.disputeCache.Store(dispute.RequestID.String(), d) +} + +func wrapDispute(d *dioneDispute.DioneDisputeNewDispute) *Dispute { + return &Dispute{ + Dhash: d.Dhash, + RequestID: d.RequestID, + Miner: d.Miner, + DisputeInitiator: d.DisputeInitiator, + } +} + +func wrapSubmission(s *dioneOracle.DioneOracleSubmittedOracleRequest) *Submission { + return &Submission{ + ReqID: s.ReqID, + Data: s.Data, } } diff --git a/ethclient/ethereum.go b/ethclient/ethereum.go index f5dc312..2b0dc9d 100644 --- a/ethclient/ethereum.go +++ b/ethclient/ethereum.go @@ -226,7 +226,7 @@ func (c *EthereumClient) SubscribeOnNewDisputes(ctx context.Context) (chan *dion return resChan, subscription, err } -func (c *EthereumClient) SubscribeOnNewSubmittions(ctx context.Context) (chan *dioneOracle.DioneOracleSubmittedOracleRequest, event.Subscription, error) { +func (c *EthereumClient) SubscribeOnNewSubmissions(ctx context.Context) (chan *dioneOracle.DioneOracleSubmittedOracleRequest, event.Subscription, error) { resChan := make(chan *dioneOracle.DioneOracleSubmittedOracleRequest) requestsFilter := c.dioneOracle.Contract.DioneOracleFilterer subscription, err := requestsFilter.WatchSubmittedOracleRequest(&bind.WatchOpts{ diff --git a/node/node.go b/node/node.go index 8526b2a..76a5727 100644 --- a/node/node.go +++ b/node/node.go @@ -207,7 +207,7 @@ func Start() { provideNetworkService, provideDirectRPCClient, provideConsensusManager, - provideDisputeManager, + consensus.NewDisputeManager, ), fx.Invoke( configureLogger, diff --git a/node/node_dep_providers.go b/node/node_dep_providers.go index 73ce43c..1c0d083 100644 --- a/node/node_dep_providers.go +++ b/node/node_dep_providers.go @@ -68,17 +68,6 @@ func provideCache(config *config.Config) cache.Cache { return backend } -func provideDisputeManager(ethClient *ethclient.EthereumClient, pcm *consensus.PBFTConsensusManager, cfg *config.Config, bc *blockchain.BlockChain) *consensus.DisputeManager { - dm, err := consensus.NewDisputeManager(context.TODO(), ethClient, pcm, cfg.Ethereum.DisputeVoteWindow, bc) - if err != nil { - logrus.Fatal(err) - } - - logrus.Info("Dispute subsystem has been initialized!") - - return dm -} - func provideDrandBeacon(ps *pubsub.PubSubRouter, bus EventBus.Bus) *drand2.DrandBeacon { db, err := drand2.NewDrandBeacon(ps.Pubsub, bus) if err != nil {