package consensus import ( "sync" "github.com/Secured-Finance/dione/models" "github.com/Secured-Finance/dione/pb" "github.com/sirupsen/logrus" ) type ConsensusState int const ( consensusPrePrepared ConsensusState = 0x0 consensusPrepared ConsensusState = 0x1 consensusCommitted ConsensusState = 0x2 testValidData = "test" ) type PBFTConsensusManager struct { psb *pb.PubSubRouter Consensuses map[string]*ConsensusData maxFaultNodes int } type ConsensusData struct { preparedCount int commitCount int State ConsensusState mutex sync.Mutex result string test bool } func NewPBFTConsensusManager(psb *pb.PubSubRouter, maxFaultNodes int) *PBFTConsensusManager { pcm := &PBFTConsensusManager{} pcm.Consensuses = make(map[string]*ConsensusData) pcm.psb = psb pcm.psb.Hook("prepared", pcm.handlePreparedMessage) pcm.psb.Hook("commit", pcm.handleCommitMessage) return pcm } func (pcm *PBFTConsensusManager) NewTestConsensus(data string, consensusID string) { //consensusID := uuid.New().String() cData := &ConsensusData{} cData.test = true pcm.Consensuses[consensusID] = cData msg := models.Message{} msg.Type = "prepared" msg.Payload = make(map[string]interface{}) msg.Payload["consensusID"] = consensusID msg.Payload["data"] = data pcm.psb.BroadcastToServiceTopic(&msg) cData.State = consensusPrePrepared logrus.Debug("started new consensus: " + consensusID) } func (pcm *PBFTConsensusManager) handlePreparedMessage(message *models.Message) { // TODO add check on view of the message consensusID := message.Payload["consensusID"].(string) if _, ok := pcm.Consensuses[consensusID]; !ok { logrus.Warn("Unknown consensus ID: " + consensusID) return } logrus.Debug("received prepared msg") data := pcm.Consensuses[consensusID] // validate payload data if data.test { rData := message.Payload["data"].(string) if rData != testValidData { logrus.Error("Incorrect data was received! Ignoring this message, because it was sent from fault node!") return } } else { // TODO } data.mutex.Lock() data.preparedCount++ data.mutex.Unlock() if data.preparedCount > 2*pcm.maxFaultNodes+1 { msg := models.Message{} msg.Payload = make(map[string]interface{}) msg.Type = "commit" msg.Payload["consensusID"] = consensusID msg.Payload["data"] = message.Payload["data"] err := pcm.psb.BroadcastToServiceTopic(&msg) if err != nil { logrus.Warn("Unable to send COMMIT message: " + err.Error()) return } data.State = consensusPrepared } } func (pcm *PBFTConsensusManager) handleCommitMessage(message *models.Message) { // TODO add check on view of the message // TODO add validation of data to this stage consensusID := message.Payload["consensusID"].(string) if _, ok := pcm.Consensuses[consensusID]; !ok { logrus.Warn("Unknown consensus ID: " + consensusID) return } data := pcm.Consensuses[consensusID] if data.State == consensusCommitted { logrus.Debug("consensus already finished, dropping COMMIT message") return } logrus.Debug("received commit msg") data.mutex.Lock() data.commitCount++ data.mutex.Unlock() if data.commitCount > 2*pcm.maxFaultNodes+1 { data.State = consensusCommitted data.result = message.Payload["data"].(string) logrus.Debug("consensus successfully finished with result: " + data.result) } }