From 025bb9a6d11b4aba15c5cb256e2a8089016b11cb Mon Sep 17 00:00:00 2001 From: ChronosX88 Date: Fri, 4 Jun 2021 00:15:32 +0300 Subject: [PATCH] Refactor pubsub package again, make it more flexible --- consensus/consensus.go | 28 +++++++++++++--------------- consensus/utils.go | 20 ++++++++++++++------ pubsub/message.go | 7 ++++++- pubsub/pubsub_router.go | 32 ++++++++++++++++++++++++-------- 4 files changed, 57 insertions(+), 30 deletions(-) diff --git a/consensus/consensus.go b/consensus/consensus.go index 8079a5f..1aff9d1 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -1,11 +1,10 @@ package consensus import ( + "fmt" "math/big" "sync" - "github.com/fxamacker/cbor/v2" - "github.com/Secured-Finance/dione/cache" "github.com/Secured-Finance/dione/consensus/types" @@ -47,9 +46,9 @@ func NewPBFTConsensusManager(psb *pubsub.PubSubRouter, minApprovals int, privKey pcm.ethereumClient = ethereumClient pcm.cache = evc pcm.consensusMap = map[string]*Consensus{} - pcm.psb.Hook(pubsub.PrePrepareMessageType, pcm.handlePrePrepare) - pcm.psb.Hook(pubsub.PrepareMessageType, pcm.handlePrepare) - pcm.psb.Hook(pubsub.CommitMessageType, pcm.handleCommit) + pcm.psb.Hook(pubsub.PrePrepareMessageType, pcm.handlePrePrepare, types2.DioneTask{}) + pcm.psb.Hook(pubsub.PrepareMessageType, pcm.handlePrepare, types2.DioneTask{}) + pcm.psb.Hook(pubsub.CommitMessageType, pcm.handleCommit, types2.DioneTask{}) return pcm } @@ -64,7 +63,7 @@ func (pcm *PBFTConsensusManager) Propose(task types2.DioneTask) error { return nil } -func (pcm *PBFTConsensusManager) handlePrePrepare(message *pubsub.PubSubMessage) { +func (pcm *PBFTConsensusManager) handlePrePrepare(message *pubsub.GenericMessage) { cmsg, err := unmarshalPayload(message) if err != nil { return @@ -87,6 +86,7 @@ func (pcm *PBFTConsensusManager) handlePrePrepare(message *pubsub.PubSubMessage) prepareMsg, err := NewMessage(message, pubsub.PrepareMessageType) if err != nil { logrus.Errorf("failed to create prepare message: %v", err) + return } pcm.createConsensusInfo(&cmsg.Task, false) @@ -94,7 +94,7 @@ func (pcm *PBFTConsensusManager) handlePrePrepare(message *pubsub.PubSubMessage) pcm.psb.BroadcastToServiceTopic(&prepareMsg) } -func (pcm *PBFTConsensusManager) handlePrepare(message *pubsub.PubSubMessage) { +func (pcm *PBFTConsensusManager) handlePrepare(message *pubsub.GenericMessage) { cmsg, err := unmarshalPayload(message) if err != nil { return @@ -112,7 +112,7 @@ func (pcm *PBFTConsensusManager) handlePrepare(message *pubsub.PubSubMessage) { pcm.msgLog.AddMessage(cmsg) if len(pcm.msgLog.Get(types.MessageTypePrepare, cmsg.Task.ConsensusID)) >= pcm.minApprovals { - commitMsg, err := NewMessage(message, types.MessageTypeCommit) + commitMsg, err := NewMessage(message, pubsub.CommitMessageType) if err != nil { logrus.Errorf("failed to create commit message: %w", err) } @@ -120,7 +120,7 @@ func (pcm *PBFTConsensusManager) handlePrepare(message *pubsub.PubSubMessage) { } } -func (pcm *PBFTConsensusManager) handleCommit(message *pubsub.PubSubMessage) { +func (pcm *PBFTConsensusManager) handleCommit(message *pubsub.GenericMessage) { cmsg, err := unmarshalPayload(message) if err != nil { return @@ -184,12 +184,10 @@ func (pcm *PBFTConsensusManager) GetConsensusInfo(consensusID string) *Consensus return c } -func unmarshalPayload(msg *pubsub.PubSubMessage) (types.ConsensusMessage, error) { - var task types2.DioneTask - err := cbor.Unmarshal(msg.Payload, &task) - if err != nil { - logrus.Debug(err) - return types.ConsensusMessage{}, err +func unmarshalPayload(msg *pubsub.GenericMessage) (types.ConsensusMessage, error) { + task, ok := msg.Payload.(types2.DioneTask) + if !ok { + return types.ConsensusMessage{}, fmt.Errorf("cannot convert payload to DioneTask") } var consensusMessageType types.MessageType switch msg.Type { diff --git a/consensus/utils.go b/consensus/utils.go index d1813a0..a94a1df 100644 --- a/consensus/utils.go +++ b/consensus/utils.go @@ -4,6 +4,10 @@ import ( "encoding/binary" "fmt" + "github.com/fxamacker/cbor/v2" + + "github.com/Secured-Finance/dione/pubsub" + types2 "github.com/Secured-Finance/dione/consensus/types" "github.com/mitchellh/hashstructure/v2" @@ -107,17 +111,17 @@ func VerifyTaskSignature(task types.DioneTask) error { return nil } -func NewMessage(msg *types2.Message, typ types2.MessageType) (types2.Message, error) { - var newMsg types2.Message +func NewMessage(msg *pubsub.GenericMessage, typ pubsub.PubSubMessageType) (pubsub.GenericMessage, error) { + var newMsg pubsub.GenericMessage newMsg.Type = typ newCMsg := msg.Payload newMsg.Payload = newCMsg return newMsg, nil } -func CreatePrePrepareWithTaskSignature(task *types.DioneTask, privateKey []byte) (*types2.Message, error) { - var message types2.Message - message.Type = types2.MessageTypePrePrepare +func CreatePrePrepareWithTaskSignature(task *types.DioneTask, privateKey []byte) (*pubsub.GenericMessage, error) { + var message pubsub.GenericMessage + message.Type = pubsub.PrePrepareMessageType cHash, err := hashstructure.Hash(task, hashstructure.FormatV2, nil) if err != nil { @@ -128,6 +132,10 @@ func CreatePrePrepareWithTaskSignature(task *types.DioneTask, privateKey []byte) return nil, err } task.Signature = signature.Data - message.Payload = types2.ConsensusMessage{Task: *task} + data, err := cbor.Marshal(types2.ConsensusMessage{Task: *task}) + if err != nil { + return nil, err + } + message.Payload = data return &message, nil } diff --git a/pubsub/message.go b/pubsub/message.go index 5537d51..4b803f3 100644 --- a/pubsub/message.go +++ b/pubsub/message.go @@ -13,8 +13,13 @@ const ( NewBlockMessageType ) -type PubSubMessage struct { +type GenericMessage struct { Type PubSubMessageType From peer.ID `cbor:"-"` + Payload interface{} +} + +type PubSubMessage struct { + Type PubSubMessageType Payload []byte } diff --git a/pubsub/pubsub_router.go b/pubsub/pubsub_router.go index bd39edb..e76cae8 100644 --- a/pubsub/pubsub_router.go +++ b/pubsub/pubsub_router.go @@ -21,9 +21,10 @@ type PubSubRouter struct { handlers map[PubSubMessageType][]Handler oracleTopicName string oracleTopic *pubsub.Topic + typeMapping map[PubSubMessageType]interface{} // message type -> sample } -type Handler func(message *PubSubMessage) +type Handler func(message *GenericMessage) func NewPubSubRouter(h host.Host, oracleTopic string, isBootstrap bool) *PubSubRouter { ctx, ctxCancel := context.WithCancel(context.Background()) @@ -102,16 +103,30 @@ func (psr *PubSubRouter) handleMessage(p *pubsub.Message) { if senderPeerID == psr.node.ID() { return } - var message PubSubMessage - err = cbor.Unmarshal(p.Data, &message) + var genericMessage PubSubMessage + var message GenericMessage + err = cbor.Unmarshal(p.Data, &genericMessage) if err != nil { - logrus.Warn("Unable to decode message data! " + err.Error()) + logrus.Warn("Unable to decode pubsub message data! " + err.Error()) + return + } + sampleMsg, ok := psr.typeMapping[genericMessage.Type] + if !ok { + logrus.Warnf("Unknown message type %d: we have no clue how to decode it", genericMessage.Type) + return + } + destMsg := sampleMsg + err = cbor.Unmarshal(genericMessage.Payload, &destMsg) + if err != nil { + logrus.Warn("Unable to decode pubsub message data! " + err.Error()) return } message.From = senderPeerID - handlers, ok := psr.handlers[message.Type] + message.Type = genericMessage.Type + message.Payload = destMsg + handlers, ok := psr.handlers[genericMessage.Type] if !ok { - logrus.Warn("Dropping message " + string(message.Type) + " because we don't have any handlers!") + logrus.Warn("Dropping pubsub message " + string(genericMessage.Type) + " because we don't have any handlers!") return } for _, v := range handlers { @@ -119,15 +134,16 @@ func (psr *PubSubRouter) handleMessage(p *pubsub.Message) { } } -func (psr *PubSubRouter) Hook(messageType PubSubMessageType, handler Handler) { +func (psr *PubSubRouter) Hook(messageType PubSubMessageType, handler Handler, sample interface{}) { _, ok := psr.handlers[messageType] if !ok { psr.handlers[messageType] = []Handler{} } psr.handlers[messageType] = append(psr.handlers[messageType], handler) + psr.typeMapping[messageType] = sample } -func (psr *PubSubRouter) BroadcastToServiceTopic(msg *PubSubMessage) error { +func (psr *PubSubRouter) BroadcastToServiceTopic(msg *GenericMessage) error { data, err := cbor.Marshal(msg) if err != nil { return err