316 lines
9.8 KiB
Go
316 lines
9.8 KiB
Go
package consensus
|
|
|
|
import (
|
|
"math/big"
|
|
"sync"
|
|
|
|
"github.com/Secured-Finance/dione/sigs"
|
|
|
|
"github.com/Secured-Finance/dione/consensus/types"
|
|
|
|
"github.com/ethereum/go-ethereum/common"
|
|
|
|
"github.com/Secured-Finance/dione/ethclient"
|
|
"github.com/sirupsen/logrus"
|
|
|
|
"github.com/Secured-Finance/dione/pubsub"
|
|
types2 "github.com/Secured-Finance/dione/types"
|
|
)
|
|
|
|
type PBFTConsensusManager struct {
|
|
psb *pubsub.PubSubRouter
|
|
minApprovals int
|
|
privKey []byte
|
|
prePreparePool *PrePreparePool
|
|
preparePool *PreparePool
|
|
commitPool *CommitPool
|
|
consensusInfo map[string]*ConsensusData
|
|
ethereumClient *ethclient.EthereumClient
|
|
}
|
|
|
|
type ConsensusData struct {
|
|
// preparedCount int
|
|
// commitCount int
|
|
mutex sync.Mutex
|
|
alreadySubmitted bool
|
|
// result string
|
|
// test bool
|
|
// onConsensusFinishCallback func(finalData string)
|
|
}
|
|
|
|
func NewPBFTConsensusManager(psb *pubsub.PubSubRouter, minApprovals int, privKey []byte, ethereumClient *ethclient.EthereumClient) *PBFTConsensusManager {
|
|
pcm := &PBFTConsensusManager{}
|
|
pcm.psb = psb
|
|
pcm.prePreparePool = NewPrePreparePool()
|
|
pcm.preparePool = NewPreparePool()
|
|
pcm.commitPool = NewCommitPool()
|
|
pcm.minApprovals = minApprovals
|
|
pcm.privKey = privKey
|
|
pcm.ethereumClient = ethereumClient
|
|
pcm.consensusInfo = map[string]*ConsensusData{}
|
|
pcm.psb.Hook(types.MessageTypePrePrepare, pcm.handlePrePrepare)
|
|
pcm.psb.Hook(types.MessageTypePrepare, pcm.handlePrepare)
|
|
pcm.psb.Hook(types.MessageTypeCommit, pcm.handleCommit)
|
|
return pcm
|
|
}
|
|
|
|
//func (pcm *PBFTConsensusManager) NewTestConsensus(data string, consensusID string, onConsensusFinishCallback func(finalData string)) {
|
|
// //consensusID := uuid.New().String()
|
|
// cData := &ConsensusData{}
|
|
// cData.test = true
|
|
// cData.onConsensusFinishCallback = onConsensusFinishCallback
|
|
// pcm.Consensuses[consensusID] = cData
|
|
//
|
|
// // here we will create DioneTask
|
|
//
|
|
// msg := models.Message{}
|
|
// msg.Type = "prepared"
|
|
// msg.Payload = make(map[string]interface{})
|
|
// msg.Payload["consensusID"] = consensusID
|
|
// msg.Payload["data"] = data
|
|
// sign, err := sigs.Sign(types.SigTypeEd25519, pcm.privKey, []byte(data))
|
|
// if err != nil {
|
|
// logrus.Warnf("failed to sign data: %w", err)
|
|
// return
|
|
// }
|
|
// msg.Payload["signature"] = string(sign.Data)
|
|
// pcm.psb.BroadcastToServiceTopic(&msg)
|
|
//
|
|
// cData.State = ConsensusPrePrepared
|
|
// logrus.Debug("started new consensus: " + consensusID)
|
|
//}
|
|
//
|
|
//func (pcm *PBFTConsensusManager) handlePrePrepareMessage(sender peer.ID, message *models.Message) {
|
|
// consensusID := message.Payload["consensusID"].(string)
|
|
// if _, ok := pcm.Consensuses[consensusID]; !ok {
|
|
// logrus.Warn("Unknown consensus ID,: " + consensusID + ", creating consensusInfo")
|
|
// pcm.Consensuses[consensusID] = &ConsensusData{
|
|
// State: ConsensusPrePrepared,
|
|
// onConsensusFinishCallback: func(finalData string) {},
|
|
// }
|
|
// }
|
|
// data := pcm.Consensuses[consensusID]
|
|
// logrus.Debug("received pre_prepare msg")
|
|
//}
|
|
//
|
|
//func (pcm *PBFTConsensusManager) handlePreparedMessage(sender peer.ID, message *models.Message) {
|
|
// // TODO add check on view of the message
|
|
// consensusID := message.Payload["consensusID"].(string)
|
|
// if _, ok := pcm.Consensuses[consensusID]; !ok {
|
|
// logrus.Warn("Unknown consensus ID,: " + consensusID + ", creating consensusInfo")
|
|
// pcm.Consensuses[consensusID] = &ConsensusData{
|
|
// State: ConsensusPrePrepared,
|
|
// onConsensusFinishCallback: func(finalData string) {},
|
|
// }
|
|
// }
|
|
// data := pcm.Consensuses[consensusID]
|
|
// logrus.Debug("received prepared msg")
|
|
// //data := pcm.Consensuses[consensusID]
|
|
//
|
|
// // TODO
|
|
// // here we can validate miner which produced this task, is he winner, and so on
|
|
// // validation steps:
|
|
// // 1. validate sender eligibility to mine (check if it has minimal stake)
|
|
// // 2. validate sender wincount
|
|
// // 3. validate randomness
|
|
// // 4. validate vrf
|
|
// // 5. validate payload signature
|
|
// // 6. validate transaction (get from rpc client and compare with received)
|
|
//
|
|
// signStr := message.Payload["signature"].(string)
|
|
// signRaw := []byte(signStr)
|
|
// err := sigs.Verify(&types.Signature{Data: signRaw, Type: types.SigTypeEd25519}, sender, message.Payload["data"].([]byte))
|
|
// if err != nil {
|
|
// logrus.Warn("failed to verify data signature")
|
|
// return
|
|
// }
|
|
//
|
|
// data.mutex.Lock()
|
|
// defer data.mutex.Unlock()
|
|
// data.preparedCount++
|
|
//
|
|
// if data.preparedCount > 2*pcm.maxFaultNodes+1 {
|
|
// msg := models.Message{}
|
|
// msg.Payload = make(map[string]interface{})
|
|
// msg.Type = "commit"
|
|
// msg.Payload["consensusID"] = consensusID
|
|
// msg.Payload["data"] = message.Payload["data"]
|
|
// sign, err := sigs.Sign(types.SigTypeEd25519, pcm.privKey, message.Payload["data"].([]byte))
|
|
// if err != nil {
|
|
// logrus.Warnf("failed to sign data: %w", err)
|
|
// return
|
|
// }
|
|
// msg.Payload["signature"] = string(sign.Data)
|
|
// err = pcm.psb.BroadcastToServiceTopic(&msg)
|
|
// if err != nil {
|
|
// logrus.Warn("Unable to send COMMIT message: " + err.Error())
|
|
// return
|
|
// }
|
|
// data.State = ConsensusPrepared
|
|
// }
|
|
//}
|
|
//
|
|
//func (pcm *PBFTConsensusManager) handleCommitMessage(sender peer.ID, message *models.Message) {
|
|
// // TODO add check on view of the message
|
|
// // TODO add validation of data by hash to this stage
|
|
// consensusID := message.Payload["consensusID"].(string)
|
|
// if _, ok := pcm.Consensuses[consensusID]; !ok {
|
|
// logrus.Warn("Unknown consensus ID: " + consensusID)
|
|
// return
|
|
// }
|
|
// data := pcm.Consensuses[consensusID]
|
|
//
|
|
// data.mutex.Lock()
|
|
// defer data.mutex.Unlock()
|
|
// if data.State == ConsensusCommitted {
|
|
// logrus.Debug("consensus already finished, dropping COMMIT message")
|
|
// return
|
|
// }
|
|
//
|
|
// logrus.Debug("received commit msg")
|
|
//
|
|
// signStr := message.Payload["signature"].(string)
|
|
// signRaw := []byte(signStr)
|
|
// err := sigs.Verify(&types.Signature{Data: signRaw, Type: types.SigTypeEd25519}, sender, message.Payload["data"].([]byte))
|
|
// if err != nil {
|
|
// logrus.Warn("failed to verify data signature")
|
|
// return
|
|
// }
|
|
//
|
|
// data.commitCount++
|
|
//
|
|
// if data.commitCount > 2*pcm.maxFaultNodes+1 {
|
|
// data.State = ConsensusCommitted
|
|
// data.result = message.Payload["data"].(string)
|
|
// logrus.Debug("consensus successfully finished with result: " + data.result)
|
|
// data.onConsensusFinishCallback(data.result)
|
|
// }
|
|
//}
|
|
|
|
func (pcm *PBFTConsensusManager) Propose(consensusID, data string, requestID *big.Int, callbackAddress common.Address) error {
|
|
pcm.consensusInfo[consensusID] = &ConsensusData{}
|
|
reqIDRaw := requestID.String()
|
|
callbackAddressHex := callbackAddress.Hex()
|
|
prePrepareMsg, err := pcm.prePreparePool.CreatePrePrepare(consensusID, data, reqIDRaw, callbackAddressHex, pcm.privKey)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
pcm.psb.BroadcastToServiceTopic(prePrepareMsg)
|
|
return nil
|
|
}
|
|
|
|
func (pcm *PBFTConsensusManager) handlePrePrepare(message *types.Message) {
|
|
if pcm.prePreparePool.IsExistingPrePrepare(message) {
|
|
logrus.Debug("received existing pre_prepare msg, dropping...")
|
|
return
|
|
}
|
|
if !pcm.prePreparePool.IsValidPrePrepare(message) {
|
|
logrus.Debug("received invalid pre_prepare msg, dropping...")
|
|
return
|
|
}
|
|
|
|
pcm.prePreparePool.AddPrePrepare(message)
|
|
|
|
err := pcm.resignMessage(message)
|
|
if err != nil {
|
|
logrus.Errorf(err.Error())
|
|
return
|
|
}
|
|
err = pcm.psb.BroadcastToServiceTopic(message)
|
|
if err != nil {
|
|
logrus.Errorf(err.Error())
|
|
return
|
|
}
|
|
|
|
prepareMsg, err := pcm.preparePool.CreatePrepare(message, pcm.privKey)
|
|
if err != nil {
|
|
logrus.Errorf("failed to create prepare message: %w", err)
|
|
}
|
|
pcm.psb.BroadcastToServiceTopic(prepareMsg)
|
|
}
|
|
|
|
func (pcm *PBFTConsensusManager) handlePrepare(message *types.Message) {
|
|
if pcm.preparePool.IsExistingPrepare(message) {
|
|
logrus.Debug("received existing prepare msg, dropping...")
|
|
return
|
|
}
|
|
if !pcm.preparePool.IsValidPrepare(message) {
|
|
logrus.Debug("received invalid prepare msg, dropping...")
|
|
return
|
|
}
|
|
|
|
pcm.preparePool.AddPrepare(message)
|
|
err := pcm.resignMessage(message)
|
|
if err != nil {
|
|
logrus.Errorf(err.Error())
|
|
return
|
|
}
|
|
err = pcm.psb.BroadcastToServiceTopic(message)
|
|
if err != nil {
|
|
logrus.Errorf(err.Error())
|
|
return
|
|
}
|
|
|
|
if pcm.preparePool.PrepareSize(message.Payload.ConsensusID) >= pcm.minApprovals {
|
|
commitMsg, err := pcm.commitPool.CreateCommit(message, pcm.privKey)
|
|
if err != nil {
|
|
logrus.Errorf("failed to create commit message: %w", err)
|
|
}
|
|
pcm.psb.BroadcastToServiceTopic(commitMsg)
|
|
}
|
|
}
|
|
|
|
func (pcm *PBFTConsensusManager) handleCommit(message *types.Message) {
|
|
if pcm.commitPool.IsExistingCommit(message) {
|
|
logrus.Debug("received existing commit msg, dropping...")
|
|
return
|
|
}
|
|
if !pcm.commitPool.IsValidCommit(message) {
|
|
logrus.Debug("received invalid commit msg, dropping...")
|
|
return
|
|
}
|
|
|
|
pcm.commitPool.AddCommit(message)
|
|
err := pcm.resignMessage(message)
|
|
if err != nil {
|
|
logrus.Errorf(err.Error())
|
|
return
|
|
}
|
|
err = pcm.psb.BroadcastToServiceTopic(message)
|
|
if err != nil {
|
|
logrus.Errorf(err.Error())
|
|
return
|
|
}
|
|
|
|
consensusMsg := message.Payload
|
|
if pcm.commitPool.CommitSize(consensusMsg.ConsensusID) >= pcm.minApprovals {
|
|
if info, ok := pcm.consensusInfo[consensusMsg.ConsensusID]; ok {
|
|
info.mutex.Lock()
|
|
defer info.mutex.Unlock()
|
|
if info.alreadySubmitted {
|
|
return
|
|
}
|
|
logrus.Infof("Submitting on-chain result for consensus ID: %s", consensusMsg.ConsensusID)
|
|
reqID, ok := new(big.Int).SetString(consensusMsg.RequestID, 10)
|
|
if !ok {
|
|
logrus.Errorf("Failed to parse big int: %v", consensusMsg.RequestID)
|
|
}
|
|
callbackAddress := common.HexToAddress(consensusMsg.CallbackAddress)
|
|
err := pcm.ethereumClient.SubmitRequestAnswer(reqID, consensusMsg.Data, callbackAddress)
|
|
if err != nil {
|
|
logrus.Errorf("Failed to submit on-chain result: %w", err)
|
|
}
|
|
info.alreadySubmitted = true
|
|
}
|
|
}
|
|
}
|
|
|
|
func (pcm *PBFTConsensusManager) resignMessage(msg *types.Message) error {
|
|
sig, err := sigs.Sign(types2.SigTypeEd25519, pcm.privKey, []byte(msg.Payload.Data))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
msg.Payload.Signature = sig.Data
|
|
return nil
|
|
}
|