From 90d1e6d2ada443f85b2b1c8e8ef71f4ba7f99905 Mon Sep 17 00:00:00 2001 From: ChronosX88 Date: Fri, 30 Apr 2021 23:09:55 +0300 Subject: [PATCH] Fix unnecessary rebroadcast of received consensus message --- consensus/consensus.go | 22 +++++----------------- 1 file changed, 5 insertions(+), 17 deletions(-) diff --git a/consensus/consensus.go b/consensus/consensus.go index 43ca535..4d94982 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -67,6 +67,7 @@ func (pcm *PBFTConsensusManager) handlePrePrepare(message *types.Message) { return } if pcm.msgLog.Exists(*message) { + logrus.Debugf("received existing pre_prepare msg, dropping...") return } if !pcm.validator.Valid(*message) { @@ -75,15 +76,10 @@ func (pcm *PBFTConsensusManager) handlePrePrepare(message *types.Message) { } pcm.msgLog.AddMessage(*message) - err := pcm.psb.BroadcastToServiceTopic(message) - if err != nil { - logrus.Errorf(err.Error()) - return - } prepareMsg, err := NewMessage(message, types.MessageTypePrepare) if err != nil { - logrus.Errorf("failed to create prepare message: %w", err) + logrus.Errorf("failed to create prepare message: %v", err) } pcm.createConsensusInfo(&message.Payload.Task, false) @@ -93,6 +89,7 @@ func (pcm *PBFTConsensusManager) handlePrePrepare(message *types.Message) { func (pcm *PBFTConsensusManager) handlePrepare(message *types.Message) { if pcm.msgLog.Exists(*message) { + logrus.Debugf("received existing prepare msg, dropping...") return } if !pcm.validator.Valid(*message) { @@ -101,11 +98,6 @@ func (pcm *PBFTConsensusManager) handlePrepare(message *types.Message) { } pcm.msgLog.AddMessage(*message) - err := pcm.psb.BroadcastToServiceTopic(message) - if err != nil { - logrus.Errorf(err.Error()) - return - } if len(pcm.msgLog.GetMessagesByTypeAndConsensusID(types.MessageTypePrepare, message.Payload.Task.ConsensusID)) >= pcm.minApprovals { commitMsg, err := NewMessage(message, types.MessageTypeCommit) @@ -118,6 +110,7 @@ func (pcm *PBFTConsensusManager) handlePrepare(message *types.Message) { func (pcm *PBFTConsensusManager) handleCommit(message *types.Message) { if pcm.msgLog.Exists(*message) { + logrus.Debugf("received existing commit msg, dropping...") return } if !pcm.validator.Valid(*message) { @@ -126,11 +119,6 @@ func (pcm *PBFTConsensusManager) handleCommit(message *types.Message) { } pcm.msgLog.AddMessage(*message) - err := pcm.psb.BroadcastToServiceTopic(message) - if err != nil { - logrus.Errorf(err.Error()) - return - } consensusMsg := message.Payload if len(pcm.msgLog.GetMessagesByTypeAndConsensusID(types.MessageTypeCommit, message.Payload.Task.ConsensusID)) >= pcm.minApprovals { @@ -151,7 +139,7 @@ func (pcm *PBFTConsensusManager) handleCommit(message *types.Message) { logrus.Errorf("Failed to parse request ID: %v", consensusMsg.Task.RequestID) } - err = pcm.ethereumClient.SubmitRequestAnswer(reqID, consensusMsg.Task.Payload) + err := pcm.ethereumClient.SubmitRequestAnswer(reqID, consensusMsg.Task.Payload) if err != nil { logrus.Errorf("Failed to submit on-chain result: %v", err) }