Add states for submission/dispute to make dispute manager more persistent

This commit is contained in:
ChronosX88 2021-08-24 18:18:38 +03:00
parent 6e657cd20c
commit 656a7f2b29
Signed by: ChronosXYZ
GPG Key ID: 085A69A82C8C511A
4 changed files with 113 additions and 62 deletions

View File

@ -3,8 +3,17 @@ package consensus
import ( import (
"context" "context"
"encoding/hex" "encoding/hex"
"math/big"
"time" "time"
"github.com/asaskevich/EventBus"
"github.com/ethereum/go-ethereum/common"
"github.com/Secured-Finance/dione/config"
"github.com/Secured-Finance/dione/cache"
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
types2 "github.com/Secured-Finance/dione/blockchain/types" types2 "github.com/Secured-Finance/dione/blockchain/types"
@ -24,22 +33,42 @@ import (
type DisputeManager struct { type DisputeManager struct {
ctx context.Context ctx context.Context
bus EventBus.Bus
ethClient *ethclient.EthereumClient ethClient *ethclient.EthereumClient
pcm *PBFTConsensusManager pcm *PBFTConsensusManager
submissionMap map[string]*dioneOracle.DioneOracleSubmittedOracleRequest
disputeMap map[string]*dioneDispute.DioneDisputeNewDispute
voteWindow time.Duration voteWindow time.Duration
blockchain *blockchain.BlockChain blockchain *blockchain.BlockChain
submissionChan chan *dioneOracle.DioneOracleSubmittedOracleRequest submissionChan chan *dioneOracle.DioneOracleSubmittedOracleRequest
submissionSubscription event.Subscription submissionEthSubscription event.Subscription
submissionCache cache.Cache
disputesChan chan *dioneDispute.DioneDisputeNewDispute disputesChan chan *dioneDispute.DioneDisputeNewDispute
disputesSubscription event.Subscription disputeEthSubscription event.Subscription
disputeCache cache.Cache
} }
func NewDisputeManager(ctx context.Context, ethClient *ethclient.EthereumClient, pcm *PBFTConsensusManager, voteWindow int, bc *blockchain.BlockChain) (*DisputeManager, error) { type Dispute struct {
submissionChan, submSubscription, err := ethClient.SubscribeOnNewSubmittions(ctx) Dhash [32]byte
RequestID *big.Int
Miner common.Address
DisputeInitiator common.Address
Timestamp int64
Voted bool
Finished bool // if we are dispute initiator
}
type Submission struct {
ReqID *big.Int
Data []byte
Timestamp int64
Checked bool
}
func NewDisputeManager(bus EventBus.Bus, ethClient *ethclient.EthereumClient, bc *blockchain.BlockChain, cfg *config.Config) (*DisputeManager, error) {
ctx := context.TODO()
submissionChan, submSubscription, err := ethClient.SubscribeOnNewSubmissions(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -51,16 +80,16 @@ func NewDisputeManager(ctx context.Context, ethClient *ethclient.EthereumClient,
dm := &DisputeManager{ dm := &DisputeManager{
ethClient: ethClient, ethClient: ethClient,
pcm: pcm,
ctx: ctx, ctx: ctx,
submissionMap: map[string]*dioneOracle.DioneOracleSubmittedOracleRequest{}, bus: bus,
disputeMap: map[string]*dioneDispute.DioneDisputeNewDispute{}, voteWindow: time.Duration(cfg.Ethereum.DisputeVoteWindow) * time.Second,
voteWindow: time.Duration(voteWindow) * time.Second,
blockchain: bc, blockchain: bc,
submissionChan: submissionChan, submissionChan: submissionChan,
submissionSubscription: submSubscription, submissionEthSubscription: submSubscription,
submissionCache: cache.NewInMemoryCache(), // FIXME
disputesChan: disputesChan, disputesChan: disputesChan,
disputesSubscription: dispSubscription, disputeEthSubscription: dispSubscription,
disputeCache: cache.NewInMemoryCache(), // FIXME
} }
return dm, nil return dm, nil
@ -72,8 +101,8 @@ func (dm *DisputeManager) Run(ctx context.Context) {
select { select {
case <-ctx.Done(): case <-ctx.Done():
{ {
dm.disputesSubscription.Unsubscribe() dm.submissionEthSubscription.Unsubscribe()
dm.disputesSubscription.Unsubscribe() dm.disputeEthSubscription.Unsubscribe()
return return
} }
case s := <-dm.submissionChan: case s := <-dm.submissionChan:
@ -90,6 +119,10 @@ func (dm *DisputeManager) Run(ctx context.Context) {
} }
func (dm *DisputeManager) onNewSubmission(submission *dioneOracle.DioneOracleSubmittedOracleRequest) { func (dm *DisputeManager) onNewSubmission(submission *dioneOracle.DioneOracleSubmittedOracleRequest) {
s := wrapSubmission(submission)
s.Timestamp = time.Now().Unix()
dm.submissionCache.Store(submission.ReqID.String(), s)
// find a block that contains the dione task with specified request id // find a block that contains the dione task with specified request id
task, block, err := dm.findTaskAndBlockWithRequestID(submission.ReqID.String()) task, block, err := dm.findTaskAndBlockWithRequestID(submission.ReqID.String())
if err != nil { if err != nil {
@ -97,15 +130,13 @@ func (dm *DisputeManager) onNewSubmission(submission *dioneOracle.DioneOracleSub
return return
} }
dm.submissionMap[submission.ReqID.String()] = submission submHashBytes := sha3.Sum256(s.Data)
submHashBytes := sha3.Sum256(submission.Data)
localHashBytes := sha3.Sum256(task.Payload) localHashBytes := sha3.Sum256(task.Payload)
submHash := hex.EncodeToString(submHashBytes[:]) submHash := hex.EncodeToString(submHashBytes[:])
localHash := hex.EncodeToString(localHashBytes[:]) localHash := hex.EncodeToString(localHashBytes[:])
if submHash != localHash { if submHash != localHash {
logrus.Debugf("submission of request id %s isn't valid - beginning dispute", submission.ReqID) logrus.Debugf("submission of request id %s isn't valid - beginning dispute", s.ReqID)
err := dm.ethClient.BeginDispute(block.Header.ProposerEth, submission.ReqID) err := dm.ethClient.BeginDispute(block.Header.ProposerEth, s.ReqID)
if err != nil { if err != nil {
logrus.Errorf(err.Error()) logrus.Errorf(err.Error())
return return
@ -118,23 +149,30 @@ func (dm *DisputeManager) onNewSubmission(submission *dioneOracle.DioneOracleSub
return return
case <-disputeFinishTimer.C: case <-disputeFinishTimer.C:
{ {
d, ok := dm.disputeMap[submission.ReqID.String()] var d Dispute
if !ok { err := dm.disputeCache.Get(s.ReqID.String(), &d)
logrus.Error("cannot finish dispute: it doesn't exist in manager's dispute map!")
return
}
err := dm.ethClient.FinishDispute(d.Dhash)
if err != nil { if err != nil {
logrus.Errorf(err.Error()) logrus.Errorf(err.Error())
return return
} }
err = dm.ethClient.FinishDispute(d.Dhash)
if err != nil {
logrus.Errorf(err.Error())
disputeFinishTimer.Stop() disputeFinishTimer.Stop()
return return
} }
disputeFinishTimer.Stop()
d.Finished = true
dm.disputeCache.Store(d.RequestID.String(), d)
return
}
} }
} }
}() }()
} }
s.Checked = true
dm.submissionCache.Store(s.ReqID.String(), s)
} }
func (dm *DisputeManager) findTaskAndBlockWithRequestID(requestID string) (*types.DioneTask, *types2.Block, error) { func (dm *DisputeManager) findTaskAndBlockWithRequestID(requestID string) (*types.DioneTask, *types2.Block, error) {
@ -167,39 +205,63 @@ func (dm *DisputeManager) findTaskAndBlockWithRequestID(requestID string) (*type
} }
func (dm *DisputeManager) onNewDispute(dispute *dioneDispute.DioneDisputeNewDispute) { func (dm *DisputeManager) onNewDispute(dispute *dioneDispute.DioneDisputeNewDispute) {
task, _, err := dm.findTaskAndBlockWithRequestID(dispute.RequestID.String()) d := wrapDispute(dispute)
d.Timestamp = time.Now().Unix()
dm.disputeCache.Store(d.RequestID.String(), d)
task, _, err := dm.findTaskAndBlockWithRequestID(d.RequestID.String())
if err != nil { if err != nil {
logrus.Error(err) logrus.Error(err)
return return
} }
subm, ok := dm.submissionMap[dispute.RequestID.String()] var s Submission
if !ok { err = dm.submissionCache.Get(d.RequestID.String(), &s)
logrus.Warn("desired submission isn't found in map") if err != nil {
logrus.Warnf("submission of request id %s isn't found in cache", d.RequestID.String())
return return
} }
dm.disputeMap[dispute.RequestID.String()] = dispute
if dispute.DisputeInitiator.Hex() == dm.ethClient.GetEthAddress().Hex() { if dispute.DisputeInitiator.Hex() == dm.ethClient.GetEthAddress().Hex() {
d.Voted = true
dm.disputeCache.Store(d.RequestID.String(), d)
return return
} }
submHashBytes := sha3.Sum256(subm.Data) submHashBytes := sha3.Sum256(s.Data)
localHashBytes := sha3.Sum256(task.Payload) localHashBytes := sha3.Sum256(task.Payload)
submHash := hex.EncodeToString(submHashBytes[:]) submHash := hex.EncodeToString(submHashBytes[:])
localHash := hex.EncodeToString(localHashBytes[:]) localHash := hex.EncodeToString(localHashBytes[:])
if submHash == localHash { if submHash == localHash {
err := dm.ethClient.VoteDispute(dispute.Dhash, false) err := dm.ethClient.VoteDispute(d.Dhash, false)
if err != nil {
logrus.Errorf(err.Error())
return
}
} else {
err = dm.ethClient.VoteDispute(d.Dhash, true)
if err != nil { if err != nil {
logrus.Errorf(err.Error()) logrus.Errorf(err.Error())
return return
} }
} }
err = dm.ethClient.VoteDispute(dispute.Dhash, true) d.Voted = true
if err != nil { dm.disputeCache.Store(dispute.RequestID.String(), d)
logrus.Errorf(err.Error()) }
return
func wrapDispute(d *dioneDispute.DioneDisputeNewDispute) *Dispute {
return &Dispute{
Dhash: d.Dhash,
RequestID: d.RequestID,
Miner: d.Miner,
DisputeInitiator: d.DisputeInitiator,
}
}
func wrapSubmission(s *dioneOracle.DioneOracleSubmittedOracleRequest) *Submission {
return &Submission{
ReqID: s.ReqID,
Data: s.Data,
} }
} }

View File

@ -226,7 +226,7 @@ func (c *EthereumClient) SubscribeOnNewDisputes(ctx context.Context) (chan *dion
return resChan, subscription, err return resChan, subscription, err
} }
func (c *EthereumClient) SubscribeOnNewSubmittions(ctx context.Context) (chan *dioneOracle.DioneOracleSubmittedOracleRequest, event.Subscription, error) { func (c *EthereumClient) SubscribeOnNewSubmissions(ctx context.Context) (chan *dioneOracle.DioneOracleSubmittedOracleRequest, event.Subscription, error) {
resChan := make(chan *dioneOracle.DioneOracleSubmittedOracleRequest) resChan := make(chan *dioneOracle.DioneOracleSubmittedOracleRequest)
requestsFilter := c.dioneOracle.Contract.DioneOracleFilterer requestsFilter := c.dioneOracle.Contract.DioneOracleFilterer
subscription, err := requestsFilter.WatchSubmittedOracleRequest(&bind.WatchOpts{ subscription, err := requestsFilter.WatchSubmittedOracleRequest(&bind.WatchOpts{

View File

@ -207,7 +207,7 @@ func Start() {
provideNetworkService, provideNetworkService,
provideDirectRPCClient, provideDirectRPCClient,
provideConsensusManager, provideConsensusManager,
provideDisputeManager, consensus.NewDisputeManager,
), ),
fx.Invoke( fx.Invoke(
configureLogger, configureLogger,

View File

@ -68,17 +68,6 @@ func provideCache(config *config.Config) cache.Cache {
return backend return backend
} }
func provideDisputeManager(ethClient *ethclient.EthereumClient, pcm *consensus.PBFTConsensusManager, cfg *config.Config, bc *blockchain.BlockChain) *consensus.DisputeManager {
dm, err := consensus.NewDisputeManager(context.TODO(), ethClient, pcm, cfg.Ethereum.DisputeVoteWindow, bc)
if err != nil {
logrus.Fatal(err)
}
logrus.Info("Dispute subsystem has been initialized!")
return dm
}
func provideDrandBeacon(ps *pubsub.PubSubRouter, bus EventBus.Bus) *drand2.DrandBeacon { func provideDrandBeacon(ps *pubsub.PubSubRouter, bus EventBus.Bus) *drand2.DrandBeacon {
db, err := drand2.NewDrandBeacon(ps.Pubsub, bus) db, err := drand2.NewDrandBeacon(ps.Pubsub, bus)
if err != nil { if err != nil {