dione/consensus/consensus.go

138 lines
3.8 KiB
Go

package consensus
import (
"sync"
"github.com/Secured-Finance/dione/models"
"github.com/Secured-Finance/dione/pb"
"github.com/sirupsen/logrus"
)
type ConsensusState int
const (
consensusPrePrepared ConsensusState = 0x0
consensusPrepared ConsensusState = 0x1
consensusCommitted ConsensusState = 0x2
testValidData = "test"
)
type PBFTConsensusManager struct {
psb *pb.PubSubRouter
Consensuses map[string]*ConsensusData
maxFaultNodes int
miner *Miner
}
type ConsensusData struct {
preparedCount int
commitCount int
State ConsensusState
mutex sync.Mutex
result string
test bool
onConsensusFinishCallback func(finalData string)
}
func NewPBFTConsensusManager(psb *pb.PubSubRouter, maxFaultNodes int) *PBFTConsensusManager {
pcm := &PBFTConsensusManager{}
pcm.Consensuses = make(map[string]*ConsensusData)
pcm.psb = psb
pcm.psb.Hook("prepared", pcm.handlePreparedMessage)
pcm.psb.Hook("commit", pcm.handleCommitMessage)
return pcm
}
func (pcm *PBFTConsensusManager) NewTestConsensus(data string, consensusID string, onConsensusFinishCallback func(finalData string)) {
//consensusID := uuid.New().String()
cData := &ConsensusData{}
cData.test = true
cData.onConsensusFinishCallback = onConsensusFinishCallback
pcm.Consensuses[consensusID] = cData
// here we will create DioneTask
msg := models.Message{}
msg.Type = "prepared"
msg.Payload = make(map[string]interface{})
msg.Payload["consensusID"] = consensusID
msg.Payload["data"] = data
pcm.psb.BroadcastToServiceTopic(&msg)
cData.State = consensusPrePrepared
logrus.Debug("started new consensus: " + consensusID)
}
func (pcm *PBFTConsensusManager) handlePreparedMessage(message *models.Message) {
// TODO add check on view of the message
consensusID := message.Payload["consensusID"].(string)
if _, ok := pcm.Consensuses[consensusID]; !ok {
logrus.Warn("Unknown consensus ID: " + consensusID)
return
}
logrus.Debug("received prepared msg")
data := pcm.Consensuses[consensusID]
//// validate payload data
//if data.test {
// rData := message.Payload["data"].(string)
// if rData != testValidData {
// logrus.Error("Incorrect data was received! Ignoring this message, because it was sent from fault node!")
// return
// }
//} else {
// // TODO
//}
// here we can validate miner which produced this task, is he winner, and so on
// we must to reconstruct transaction here for validating task itself
data.mutex.Lock()
data.preparedCount++
data.mutex.Unlock()
if data.preparedCount > 2*pcm.maxFaultNodes+1 {
msg := models.Message{}
msg.Payload = make(map[string]interface{})
msg.Type = "commit"
msg.Payload["consensusID"] = consensusID
msg.Payload["data"] = message.Payload["data"]
err := pcm.psb.BroadcastToServiceTopic(&msg)
if err != nil {
logrus.Warn("Unable to send COMMIT message: " + err.Error())
return
}
data.State = consensusPrepared
}
}
func (pcm *PBFTConsensusManager) handleCommitMessage(message *models.Message) {
// TODO add check on view of the message
// TODO add validation of data by hash to this stage
consensusID := message.Payload["consensusID"].(string)
if _, ok := pcm.Consensuses[consensusID]; !ok {
logrus.Warn("Unknown consensus ID: " + consensusID)
return
}
data := pcm.Consensuses[consensusID]
data.mutex.Lock()
defer data.mutex.Unlock()
if data.State == consensusCommitted {
logrus.Debug("consensus already finished, dropping COMMIT message")
return
}
logrus.Debug("received commit msg")
data.commitCount++
if data.commitCount > 2*pcm.maxFaultNodes+1 {
data.State = consensusCommitted
data.result = message.Payload["data"].(string)
logrus.Debug("consensus successfully finished with result: " + data.result)
data.onConsensusFinishCallback(data.result)
}
}