diff --git a/consensus/consensus.go b/consensus/consensus.go index 2b81c38..0236026 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -29,6 +29,7 @@ type ConsensusData struct { commitCount int State ConsensusState mutex sync.Mutex + result string test bool } @@ -85,8 +86,10 @@ func (pcm *PBFTConsensusManager) handlePreparedMessage(message *models.Message) if data.preparedCount > 2*pcm.maxFaultNodes+1 { msg := models.Message{} + msg.Payload = make(map[string]interface{}) msg.Type = "commit" msg.Payload["consensusID"] = consensusID + msg.Payload["data"] = message.Payload["data"] err := pcm.psb.BroadcastToServiceTopic(&msg) if err != nil { logrus.Warn("Unable to send COMMIT message: " + err.Error()) @@ -98,18 +101,28 @@ func (pcm *PBFTConsensusManager) handlePreparedMessage(message *models.Message) func (pcm *PBFTConsensusManager) handleCommitMessage(message *models.Message) { // TODO add check on view of the message + // TODO add validation of data to this stage consensusID := message.Payload["consensusID"].(string) if _, ok := pcm.Consensuses[consensusID]; !ok { logrus.Warn("Unknown consensus ID: " + consensusID) return } data := pcm.Consensuses[consensusID] + + if data.State == consensusCommitted { + logrus.Debug("consensus already finished, dropping COMMIT message") + return + } + + logrus.Debug("received commit msg") + data.mutex.Lock() data.commitCount++ data.mutex.Unlock() if data.commitCount > 2*pcm.maxFaultNodes+1 { - logrus.Debug("consensus successfully finished") data.State = consensusCommitted + data.result = message.Payload["data"].(string) + logrus.Debug("consensus successfully finished with result: " + data.result) } }