Fix unnecessary rebroadcast of received consensus message

This commit is contained in:
ChronosX88 2021-04-30 23:09:55 +03:00
parent ae9c21baaf
commit 90d1e6d2ad
Signed by: ChronosXYZ
GPG Key ID: 085A69A82C8C511A

View File

@ -67,6 +67,7 @@ func (pcm *PBFTConsensusManager) handlePrePrepare(message *types.Message) {
return return
} }
if pcm.msgLog.Exists(*message) { if pcm.msgLog.Exists(*message) {
logrus.Debugf("received existing pre_prepare msg, dropping...")
return return
} }
if !pcm.validator.Valid(*message) { if !pcm.validator.Valid(*message) {
@ -75,15 +76,10 @@ func (pcm *PBFTConsensusManager) handlePrePrepare(message *types.Message) {
} }
pcm.msgLog.AddMessage(*message) pcm.msgLog.AddMessage(*message)
err := pcm.psb.BroadcastToServiceTopic(message)
if err != nil {
logrus.Errorf(err.Error())
return
}
prepareMsg, err := NewMessage(message, types.MessageTypePrepare) prepareMsg, err := NewMessage(message, types.MessageTypePrepare)
if err != nil { if err != nil {
logrus.Errorf("failed to create prepare message: %w", err) logrus.Errorf("failed to create prepare message: %v", err)
} }
pcm.createConsensusInfo(&message.Payload.Task, false) pcm.createConsensusInfo(&message.Payload.Task, false)
@ -93,6 +89,7 @@ func (pcm *PBFTConsensusManager) handlePrePrepare(message *types.Message) {
func (pcm *PBFTConsensusManager) handlePrepare(message *types.Message) { func (pcm *PBFTConsensusManager) handlePrepare(message *types.Message) {
if pcm.msgLog.Exists(*message) { if pcm.msgLog.Exists(*message) {
logrus.Debugf("received existing prepare msg, dropping...")
return return
} }
if !pcm.validator.Valid(*message) { if !pcm.validator.Valid(*message) {
@ -101,11 +98,6 @@ func (pcm *PBFTConsensusManager) handlePrepare(message *types.Message) {
} }
pcm.msgLog.AddMessage(*message) pcm.msgLog.AddMessage(*message)
err := pcm.psb.BroadcastToServiceTopic(message)
if err != nil {
logrus.Errorf(err.Error())
return
}
if len(pcm.msgLog.GetMessagesByTypeAndConsensusID(types.MessageTypePrepare, message.Payload.Task.ConsensusID)) >= pcm.minApprovals { if len(pcm.msgLog.GetMessagesByTypeAndConsensusID(types.MessageTypePrepare, message.Payload.Task.ConsensusID)) >= pcm.minApprovals {
commitMsg, err := NewMessage(message, types.MessageTypeCommit) commitMsg, err := NewMessage(message, types.MessageTypeCommit)
@ -118,6 +110,7 @@ func (pcm *PBFTConsensusManager) handlePrepare(message *types.Message) {
func (pcm *PBFTConsensusManager) handleCommit(message *types.Message) { func (pcm *PBFTConsensusManager) handleCommit(message *types.Message) {
if pcm.msgLog.Exists(*message) { if pcm.msgLog.Exists(*message) {
logrus.Debugf("received existing commit msg, dropping...")
return return
} }
if !pcm.validator.Valid(*message) { if !pcm.validator.Valid(*message) {
@ -126,11 +119,6 @@ func (pcm *PBFTConsensusManager) handleCommit(message *types.Message) {
} }
pcm.msgLog.AddMessage(*message) pcm.msgLog.AddMessage(*message)
err := pcm.psb.BroadcastToServiceTopic(message)
if err != nil {
logrus.Errorf(err.Error())
return
}
consensusMsg := message.Payload consensusMsg := message.Payload
if len(pcm.msgLog.GetMessagesByTypeAndConsensusID(types.MessageTypeCommit, message.Payload.Task.ConsensusID)) >= pcm.minApprovals { if len(pcm.msgLog.GetMessagesByTypeAndConsensusID(types.MessageTypeCommit, message.Payload.Task.ConsensusID)) >= pcm.minApprovals {
@ -151,7 +139,7 @@ func (pcm *PBFTConsensusManager) handleCommit(message *types.Message) {
logrus.Errorf("Failed to parse request ID: %v", consensusMsg.Task.RequestID) logrus.Errorf("Failed to parse request ID: %v", consensusMsg.Task.RequestID)
} }
err = pcm.ethereumClient.SubmitRequestAnswer(reqID, consensusMsg.Task.Payload) err := pcm.ethereumClient.SubmitRequestAnswer(reqID, consensusMsg.Task.Payload)
if err != nil { if err != nil {
logrus.Errorf("Failed to submit on-chain result: %v", err) logrus.Errorf("Failed to submit on-chain result: %v", err)
} }