Implement request event validation from cache

This commit is contained in:
ChronosX88 2020-12-02 17:42:02 +04:00
parent 027f060703
commit 221cf45dad
Signed by: ChronosXYZ
GPG Key ID: 085A69A82C8C511A
4 changed files with 62 additions and 23 deletions

View File

@ -4,6 +4,10 @@ import (
"math/big" "math/big"
"sync" "sync"
"github.com/Secured-Finance/dione/node"
oracleEmitter "github.com/Secured-Finance/dione/contracts/oracleemitter"
"github.com/Secured-Finance/dione/consensus/types" "github.com/Secured-Finance/dione/consensus/types"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -32,11 +36,11 @@ type ConsensusData struct {
alreadySubmitted bool alreadySubmitted bool
} }
func NewPBFTConsensusManager(psb *pubsub.PubSubRouter, minApprovals int, privKey []byte, ethereumClient *ethclient.EthereumClient, miner *Miner) *PBFTConsensusManager { func NewPBFTConsensusManager(psb *pubsub.PubSubRouter, minApprovals int, privKey []byte, ethereumClient *ethclient.EthereumClient, miner *Miner, evc *node.EventLogCache) *PBFTConsensusManager {
pcm := &PBFTConsensusManager{} pcm := &PBFTConsensusManager{}
pcm.psb = psb pcm.psb = psb
pcm.miner = miner pcm.miner = miner
pcm.prePreparePool = NewPrePreparePool(miner) pcm.prePreparePool = NewPrePreparePool(miner, evc)
pcm.preparePool = NewPreparePool() pcm.preparePool = NewPreparePool()
pcm.commitPool = NewCommitPool() pcm.commitPool = NewCommitPool()
pcm.minApprovals = minApprovals pcm.minApprovals = minApprovals
@ -49,11 +53,17 @@ func NewPBFTConsensusManager(psb *pubsub.PubSubRouter, minApprovals int, privKey
return pcm return pcm
} }
func (pcm *PBFTConsensusManager) Propose(consensusID string, task types2.DioneTask, requestID *big.Int, callbackAddress common.Address) error { func (pcm *PBFTConsensusManager) Propose(consensusID string, task types2.DioneTask, requestEvent *oracleEmitter.OracleEmitterNewOracleRequest) error {
pcm.consensusInfo[consensusID] = &ConsensusData{} pcm.consensusInfo[consensusID] = &ConsensusData{}
reqIDRaw := requestID.String()
callbackAddressHex := callbackAddress.Hex() prePrepareMsg, err := pcm.prePreparePool.CreatePrePrepare(
prePrepareMsg, err := pcm.prePreparePool.CreatePrePrepare(consensusID, task, reqIDRaw, callbackAddressHex, pcm.privKey) consensusID,
task,
requestEvent.RequestID.String(),
requestEvent.CallbackAddress.Hex(),
string(requestEvent.CallbackMethodID[:]),
pcm.privKey,
)
if err != nil { if err != nil {
return err return err
} }

View File

@ -3,6 +3,10 @@ package consensus
import ( import (
"fmt" "fmt"
oracleEmitter "github.com/Secured-Finance/dione/contracts/oracleemitter"
"github.com/Secured-Finance/dione/node"
"github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/go-state-types/crypto"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -17,22 +21,25 @@ import (
type PrePreparePool struct { type PrePreparePool struct {
prePrepareMsgs map[string][]*types2.Message prePrepareMsgs map[string][]*types2.Message
miner *Miner miner *Miner
eventLogCache *node.EventLogCache
} }
func NewPrePreparePool(miner *Miner) *PrePreparePool { func NewPrePreparePool(miner *Miner, evc *node.EventLogCache) *PrePreparePool {
return &PrePreparePool{ return &PrePreparePool{
prePrepareMsgs: map[string][]*types2.Message{}, prePrepareMsgs: map[string][]*types2.Message{},
miner: miner, miner: miner,
eventLogCache: evc,
} }
} }
func (pp *PrePreparePool) CreatePrePrepare(consensusID string, task types.DioneTask, requestID string, callbackAddress string, privateKey []byte) (*types2.Message, error) { func (pp *PrePreparePool) CreatePrePrepare(consensusID string, task types.DioneTask, requestID, callbackAddress, callbackMethodID string, privateKey []byte) (*types2.Message, error) {
var message types2.Message var message types2.Message
message.Type = types2.MessageTypePrePrepare message.Type = types2.MessageTypePrePrepare
var consensusMsg types2.ConsensusMessage var consensusMsg types2.ConsensusMessage
consensusMsg.ConsensusID = consensusID consensusMsg.ConsensusID = consensusID
consensusMsg.RequestID = requestID consensusMsg.RequestID = requestID
consensusMsg.CallbackAddress = callbackAddress consensusMsg.CallbackAddress = callbackAddress
consensusMsg.CallbackMethodID = callbackMethodID
consensusMsg.Task = task consensusMsg.Task = task
cHash, err := hashstructure.Hash(consensusMsg, hashstructure.FormatV2, nil) cHash, err := hashstructure.Hash(consensusMsg, hashstructure.FormatV2, nil)
if err != nil { if err != nil {
@ -70,6 +77,24 @@ func (ppp *PrePreparePool) IsValidPrePrepare(prePrepare *types2.Message) bool {
} }
///////////////////////////////// /////////////////////////////////
// === verify if request exists in event log cache ===
requestEventPlain, err := ppp.eventLogCache.Get("request_" + consensusMsg.RequestID)
if err != nil {
logrus.Errorf("the incoming request task event doesn't exist in the EVC, or is broken: %v", err)
return false
}
requestEvent := requestEventPlain.(*oracleEmitter.OracleEmitterNewOracleRequest)
if requestEvent.CallbackAddress.String() != consensusMsg.CallbackAddress ||
string(requestEvent.CallbackMethodID[:]) != consensusMsg.CallbackMethodID ||
requestEvent.OriginChain != consensusMsg.Task.OriginChain ||
requestEvent.RequestType != consensusMsg.Task.RequestType ||
requestEvent.RequestParams != consensusMsg.Task.RequestParams {
logrus.Errorf("the incoming task and cached request event don't match!")
return false
}
/////////////////////////////////
// === verify election proof wincount preliminarily === // === verify election proof wincount preliminarily ===
if consensusMsg.Task.ElectionProof.WinCount < 1 { if consensusMsg.Task.ElectionProof.WinCount < 1 {
logrus.Error("miner isn't a winner!") logrus.Error("miner isn't a winner!")

View File

@ -21,6 +21,7 @@ type ConsensusMessage struct {
Signature []byte `hash:"-"` Signature []byte `hash:"-"`
RequestID string RequestID string
CallbackAddress string CallbackAddress string
CallbackMethodID string
Task types.DioneTask Task types.DioneTask
} }

View File

@ -105,7 +105,10 @@ func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTim
miner := provideMiner(n.Host.ID(), *n.Ethereum.GetEthAddress(), n.Beacon, n.Ethereum, rawPrivKey) miner := provideMiner(n.Host.ID(), *n.Ethereum.GetEthAddress(), n.Beacon, n.Ethereum, rawPrivKey)
n.Miner = miner n.Miner = miner
cManager := provideConsensusManager(psb, miner, ethClient, rawPrivKey, n.Config.ConsensusMinApprovals) eventLogCache := provideEventLogCache()
n.EventLogCache = eventLogCache
cManager := provideConsensusManager(psb, miner, ethClient, rawPrivKey, n.Config.ConsensusMinApprovals, eventLogCache)
n.ConsensusManager = cManager n.ConsensusManager = cManager
wallet, err := provideWallet(n.Host.ID(), rawPrivKey) wallet, err := provideWallet(n.Host.ID(), rawPrivKey)
@ -114,9 +117,6 @@ func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTim
} }
n.Wallet = wallet n.Wallet = wallet
eventLogCache := provideEventLogCache()
n.EventLogCache = eventLogCache
return n, nil return n, nil
} }
@ -194,15 +194,18 @@ func (n *Node) subscribeOnEthContractsAsync(ctx context.Context) {
logrus.Errorf("Failed to store new request event to event log cache: %v", err) logrus.Errorf("Failed to store new request event to event log cache: %v", err)
} }
task, err := n.Miner.MineTask(ctx, event) logrus.Info("Let's wait a little so that all nodes have time to receive the request and cache it")
time.Sleep(5 * time.Second)
task, err := n.Miner.MineTask(context.TODO(), event)
if err != nil { if err != nil {
logrus.Fatal("Failed to mine task, exiting... ", err) logrus.Fatal("Failed to mine task, exiting... ", err)
} }
if task == nil { if task == nil {
continue continue
} }
logrus.Infof("Started new consensus round with ID: %s", event.RequestID.String()) logrus.Infof("Proposed new Dione task with ID: %s", event.RequestID.String())
err = n.ConsensusManager.Propose(event.RequestID.String(), *task, event.RequestID, event.CallbackAddress) err = n.ConsensusManager.Propose(event.RequestID.String(), *task, event)
if err != nil { if err != nil {
logrus.Errorf("Failed to propose task: %w", err) logrus.Errorf("Failed to propose task: %w", err)
} }
@ -284,8 +287,8 @@ func providePubsubRouter(lhost host.Host, config *config.Config) *pubsub2.PubSub
return pubsub2.NewPubSubRouter(lhost, config.PubSub.ServiceTopicName) return pubsub2.NewPubSubRouter(lhost, config.PubSub.ServiceTopicName)
} }
func provideConsensusManager(psb *pubsub2.PubSubRouter, miner *consensus.Miner, ethClient *ethclient.EthereumClient, privateKey []byte, minApprovals int) *consensus.PBFTConsensusManager { func provideConsensusManager(psb *pubsub2.PubSubRouter, miner *consensus.Miner, ethClient *ethclient.EthereumClient, privateKey []byte, minApprovals int, evc *EventLogCache) *consensus.PBFTConsensusManager {
return consensus.NewPBFTConsensusManager(psb, minApprovals, privateKey, ethClient, miner) return consensus.NewPBFTConsensusManager(psb, minApprovals, privateKey, ethClient, miner, evc)
} }
func provideLibp2pNode(config *config.Config, privateKey crypto.PrivKey, pexDiscoveryUpdateTime time.Duration) (host.Host, *pex.PEXDiscovery, error) { func provideLibp2pNode(config *config.Config, privateKey crypto.PrivKey, pexDiscoveryUpdateTime time.Duration) (host.Host, *pex.PEXDiscovery, error) {