Refactor pubsub package again, make it more flexible

This commit is contained in:
ChronosX88 2021-06-04 00:15:32 +03:00
parent 0695d23866
commit 025bb9a6d1
Signed by: ChronosXYZ
GPG Key ID: 085A69A82C8C511A
4 changed files with 57 additions and 30 deletions

View File

@ -1,11 +1,10 @@
package consensus package consensus
import ( import (
"fmt"
"math/big" "math/big"
"sync" "sync"
"github.com/fxamacker/cbor/v2"
"github.com/Secured-Finance/dione/cache" "github.com/Secured-Finance/dione/cache"
"github.com/Secured-Finance/dione/consensus/types" "github.com/Secured-Finance/dione/consensus/types"
@ -47,9 +46,9 @@ func NewPBFTConsensusManager(psb *pubsub.PubSubRouter, minApprovals int, privKey
pcm.ethereumClient = ethereumClient pcm.ethereumClient = ethereumClient
pcm.cache = evc pcm.cache = evc
pcm.consensusMap = map[string]*Consensus{} pcm.consensusMap = map[string]*Consensus{}
pcm.psb.Hook(pubsub.PrePrepareMessageType, pcm.handlePrePrepare) pcm.psb.Hook(pubsub.PrePrepareMessageType, pcm.handlePrePrepare, types2.DioneTask{})
pcm.psb.Hook(pubsub.PrepareMessageType, pcm.handlePrepare) pcm.psb.Hook(pubsub.PrepareMessageType, pcm.handlePrepare, types2.DioneTask{})
pcm.psb.Hook(pubsub.CommitMessageType, pcm.handleCommit) pcm.psb.Hook(pubsub.CommitMessageType, pcm.handleCommit, types2.DioneTask{})
return pcm return pcm
} }
@ -64,7 +63,7 @@ func (pcm *PBFTConsensusManager) Propose(task types2.DioneTask) error {
return nil return nil
} }
func (pcm *PBFTConsensusManager) handlePrePrepare(message *pubsub.PubSubMessage) { func (pcm *PBFTConsensusManager) handlePrePrepare(message *pubsub.GenericMessage) {
cmsg, err := unmarshalPayload(message) cmsg, err := unmarshalPayload(message)
if err != nil { if err != nil {
return return
@ -87,6 +86,7 @@ func (pcm *PBFTConsensusManager) handlePrePrepare(message *pubsub.PubSubMessage)
prepareMsg, err := NewMessage(message, pubsub.PrepareMessageType) prepareMsg, err := NewMessage(message, pubsub.PrepareMessageType)
if err != nil { if err != nil {
logrus.Errorf("failed to create prepare message: %v", err) logrus.Errorf("failed to create prepare message: %v", err)
return
} }
pcm.createConsensusInfo(&cmsg.Task, false) pcm.createConsensusInfo(&cmsg.Task, false)
@ -94,7 +94,7 @@ func (pcm *PBFTConsensusManager) handlePrePrepare(message *pubsub.PubSubMessage)
pcm.psb.BroadcastToServiceTopic(&prepareMsg) pcm.psb.BroadcastToServiceTopic(&prepareMsg)
} }
func (pcm *PBFTConsensusManager) handlePrepare(message *pubsub.PubSubMessage) { func (pcm *PBFTConsensusManager) handlePrepare(message *pubsub.GenericMessage) {
cmsg, err := unmarshalPayload(message) cmsg, err := unmarshalPayload(message)
if err != nil { if err != nil {
return return
@ -112,7 +112,7 @@ func (pcm *PBFTConsensusManager) handlePrepare(message *pubsub.PubSubMessage) {
pcm.msgLog.AddMessage(cmsg) pcm.msgLog.AddMessage(cmsg)
if len(pcm.msgLog.Get(types.MessageTypePrepare, cmsg.Task.ConsensusID)) >= pcm.minApprovals { if len(pcm.msgLog.Get(types.MessageTypePrepare, cmsg.Task.ConsensusID)) >= pcm.minApprovals {
commitMsg, err := NewMessage(message, types.MessageTypeCommit) commitMsg, err := NewMessage(message, pubsub.CommitMessageType)
if err != nil { if err != nil {
logrus.Errorf("failed to create commit message: %w", err) logrus.Errorf("failed to create commit message: %w", err)
} }
@ -120,7 +120,7 @@ func (pcm *PBFTConsensusManager) handlePrepare(message *pubsub.PubSubMessage) {
} }
} }
func (pcm *PBFTConsensusManager) handleCommit(message *pubsub.PubSubMessage) { func (pcm *PBFTConsensusManager) handleCommit(message *pubsub.GenericMessage) {
cmsg, err := unmarshalPayload(message) cmsg, err := unmarshalPayload(message)
if err != nil { if err != nil {
return return
@ -184,12 +184,10 @@ func (pcm *PBFTConsensusManager) GetConsensusInfo(consensusID string) *Consensus
return c return c
} }
func unmarshalPayload(msg *pubsub.PubSubMessage) (types.ConsensusMessage, error) { func unmarshalPayload(msg *pubsub.GenericMessage) (types.ConsensusMessage, error) {
var task types2.DioneTask task, ok := msg.Payload.(types2.DioneTask)
err := cbor.Unmarshal(msg.Payload, &task) if !ok {
if err != nil { return types.ConsensusMessage{}, fmt.Errorf("cannot convert payload to DioneTask")
logrus.Debug(err)
return types.ConsensusMessage{}, err
} }
var consensusMessageType types.MessageType var consensusMessageType types.MessageType
switch msg.Type { switch msg.Type {

View File

@ -4,6 +4,10 @@ import (
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"github.com/fxamacker/cbor/v2"
"github.com/Secured-Finance/dione/pubsub"
types2 "github.com/Secured-Finance/dione/consensus/types" types2 "github.com/Secured-Finance/dione/consensus/types"
"github.com/mitchellh/hashstructure/v2" "github.com/mitchellh/hashstructure/v2"
@ -107,17 +111,17 @@ func VerifyTaskSignature(task types.DioneTask) error {
return nil return nil
} }
func NewMessage(msg *types2.Message, typ types2.MessageType) (types2.Message, error) { func NewMessage(msg *pubsub.GenericMessage, typ pubsub.PubSubMessageType) (pubsub.GenericMessage, error) {
var newMsg types2.Message var newMsg pubsub.GenericMessage
newMsg.Type = typ newMsg.Type = typ
newCMsg := msg.Payload newCMsg := msg.Payload
newMsg.Payload = newCMsg newMsg.Payload = newCMsg
return newMsg, nil return newMsg, nil
} }
func CreatePrePrepareWithTaskSignature(task *types.DioneTask, privateKey []byte) (*types2.Message, error) { func CreatePrePrepareWithTaskSignature(task *types.DioneTask, privateKey []byte) (*pubsub.GenericMessage, error) {
var message types2.Message var message pubsub.GenericMessage
message.Type = types2.MessageTypePrePrepare message.Type = pubsub.PrePrepareMessageType
cHash, err := hashstructure.Hash(task, hashstructure.FormatV2, nil) cHash, err := hashstructure.Hash(task, hashstructure.FormatV2, nil)
if err != nil { if err != nil {
@ -128,6 +132,10 @@ func CreatePrePrepareWithTaskSignature(task *types.DioneTask, privateKey []byte)
return nil, err return nil, err
} }
task.Signature = signature.Data task.Signature = signature.Data
message.Payload = types2.ConsensusMessage{Task: *task} data, err := cbor.Marshal(types2.ConsensusMessage{Task: *task})
if err != nil {
return nil, err
}
message.Payload = data
return &message, nil return &message, nil
} }

View File

@ -13,8 +13,13 @@ const (
NewBlockMessageType NewBlockMessageType
) )
type PubSubMessage struct { type GenericMessage struct {
Type PubSubMessageType Type PubSubMessageType
From peer.ID `cbor:"-"` From peer.ID `cbor:"-"`
Payload interface{}
}
type PubSubMessage struct {
Type PubSubMessageType
Payload []byte Payload []byte
} }

View File

@ -21,9 +21,10 @@ type PubSubRouter struct {
handlers map[PubSubMessageType][]Handler handlers map[PubSubMessageType][]Handler
oracleTopicName string oracleTopicName string
oracleTopic *pubsub.Topic oracleTopic *pubsub.Topic
typeMapping map[PubSubMessageType]interface{} // message type -> sample
} }
type Handler func(message *PubSubMessage) type Handler func(message *GenericMessage)
func NewPubSubRouter(h host.Host, oracleTopic string, isBootstrap bool) *PubSubRouter { func NewPubSubRouter(h host.Host, oracleTopic string, isBootstrap bool) *PubSubRouter {
ctx, ctxCancel := context.WithCancel(context.Background()) ctx, ctxCancel := context.WithCancel(context.Background())
@ -102,16 +103,30 @@ func (psr *PubSubRouter) handleMessage(p *pubsub.Message) {
if senderPeerID == psr.node.ID() { if senderPeerID == psr.node.ID() {
return return
} }
var message PubSubMessage var genericMessage PubSubMessage
err = cbor.Unmarshal(p.Data, &message) var message GenericMessage
err = cbor.Unmarshal(p.Data, &genericMessage)
if err != nil { if err != nil {
logrus.Warn("Unable to decode message data! " + err.Error()) logrus.Warn("Unable to decode pubsub message data! " + err.Error())
return
}
sampleMsg, ok := psr.typeMapping[genericMessage.Type]
if !ok {
logrus.Warnf("Unknown message type %d: we have no clue how to decode it", genericMessage.Type)
return
}
destMsg := sampleMsg
err = cbor.Unmarshal(genericMessage.Payload, &destMsg)
if err != nil {
logrus.Warn("Unable to decode pubsub message data! " + err.Error())
return return
} }
message.From = senderPeerID message.From = senderPeerID
handlers, ok := psr.handlers[message.Type] message.Type = genericMessage.Type
message.Payload = destMsg
handlers, ok := psr.handlers[genericMessage.Type]
if !ok { if !ok {
logrus.Warn("Dropping message " + string(message.Type) + " because we don't have any handlers!") logrus.Warn("Dropping pubsub message " + string(genericMessage.Type) + " because we don't have any handlers!")
return return
} }
for _, v := range handlers { for _, v := range handlers {
@ -119,15 +134,16 @@ func (psr *PubSubRouter) handleMessage(p *pubsub.Message) {
} }
} }
func (psr *PubSubRouter) Hook(messageType PubSubMessageType, handler Handler) { func (psr *PubSubRouter) Hook(messageType PubSubMessageType, handler Handler, sample interface{}) {
_, ok := psr.handlers[messageType] _, ok := psr.handlers[messageType]
if !ok { if !ok {
psr.handlers[messageType] = []Handler{} psr.handlers[messageType] = []Handler{}
} }
psr.handlers[messageType] = append(psr.handlers[messageType], handler) psr.handlers[messageType] = append(psr.handlers[messageType], handler)
psr.typeMapping[messageType] = sample
} }
func (psr *PubSubRouter) BroadcastToServiceTopic(msg *PubSubMessage) error { func (psr *PubSubRouter) BroadcastToServiceTopic(msg *GenericMessage) error {
data, err := cbor.Marshal(msg) data, err := cbor.Marshal(msg)
if err != nil { if err != nil {
return err return err