Refactor pubsub package

This commit is contained in:
ChronosX88 2021-06-03 00:19:52 +03:00
parent 809c5f2a23
commit 0695d23866
Signed by: ChronosXYZ
GPG Key ID: 085A69A82C8C511A
9 changed files with 145 additions and 88 deletions

View File

@ -4,6 +4,8 @@ import (
"math/big" "math/big"
"sync" "sync"
"github.com/fxamacker/cbor/v2"
"github.com/Secured-Finance/dione/cache" "github.com/Secured-Finance/dione/cache"
"github.com/Secured-Finance/dione/consensus/types" "github.com/Secured-Finance/dione/consensus/types"
@ -45,9 +47,9 @@ func NewPBFTConsensusManager(psb *pubsub.PubSubRouter, minApprovals int, privKey
pcm.ethereumClient = ethereumClient pcm.ethereumClient = ethereumClient
pcm.cache = evc pcm.cache = evc
pcm.consensusMap = map[string]*Consensus{} pcm.consensusMap = map[string]*Consensus{}
pcm.psb.Hook(types.MessageTypePrePrepare, pcm.handlePrePrepare) pcm.psb.Hook(pubsub.PrePrepareMessageType, pcm.handlePrePrepare)
pcm.psb.Hook(types.MessageTypePrepare, pcm.handlePrepare) pcm.psb.Hook(pubsub.PrepareMessageType, pcm.handlePrepare)
pcm.psb.Hook(types.MessageTypeCommit, pcm.handleCommit) pcm.psb.Hook(pubsub.CommitMessageType, pcm.handleCommit)
return pcm return pcm
} }
@ -62,44 +64,54 @@ func (pcm *PBFTConsensusManager) Propose(task types2.DioneTask) error {
return nil return nil
} }
func (pcm *PBFTConsensusManager) handlePrePrepare(message *types.Message) { func (pcm *PBFTConsensusManager) handlePrePrepare(message *pubsub.PubSubMessage) {
if message.Payload.Task.Miner == pcm.miner.address { cmsg, err := unmarshalPayload(message)
if err != nil {
return return
} }
if pcm.msgLog.Exists(*message) {
if cmsg.Task.Miner == pcm.miner.address {
return
}
if pcm.msgLog.Exists(cmsg) {
logrus.Debugf("received existing pre_prepare msg, dropping...") logrus.Debugf("received existing pre_prepare msg, dropping...")
return return
} }
if !pcm.validator.Valid(*message) { if !pcm.validator.Valid(cmsg) {
logrus.Warn("received invalid pre_prepare msg, dropping...") logrus.Warn("received invalid pre_prepare msg, dropping...")
return return
} }
pcm.msgLog.AddMessage(*message) pcm.msgLog.AddMessage(cmsg)
prepareMsg, err := NewMessage(message, types.MessageTypePrepare) prepareMsg, err := NewMessage(message, pubsub.PrepareMessageType)
if err != nil { if err != nil {
logrus.Errorf("failed to create prepare message: %v", err) logrus.Errorf("failed to create prepare message: %v", err)
} }
pcm.createConsensusInfo(&message.Payload.Task, false) pcm.createConsensusInfo(&cmsg.Task, false)
pcm.psb.BroadcastToServiceTopic(&prepareMsg) pcm.psb.BroadcastToServiceTopic(&prepareMsg)
} }
func (pcm *PBFTConsensusManager) handlePrepare(message *types.Message) { func (pcm *PBFTConsensusManager) handlePrepare(message *pubsub.PubSubMessage) {
if pcm.msgLog.Exists(*message) { cmsg, err := unmarshalPayload(message)
if err != nil {
return
}
if pcm.msgLog.Exists(cmsg) {
logrus.Debugf("received existing prepare msg, dropping...") logrus.Debugf("received existing prepare msg, dropping...")
return return
} }
if !pcm.validator.Valid(*message) { if !pcm.validator.Valid(cmsg) {
logrus.Warn("received invalid prepare msg, dropping...") logrus.Warn("received invalid prepare msg, dropping...")
return return
} }
pcm.msgLog.AddMessage(*message) pcm.msgLog.AddMessage(cmsg)
if len(pcm.msgLog.GetMessagesByTypeAndConsensusID(types.MessageTypePrepare, message.Payload.Task.ConsensusID)) >= pcm.minApprovals { if len(pcm.msgLog.Get(types.MessageTypePrepare, cmsg.Task.ConsensusID)) >= pcm.minApprovals {
commitMsg, err := NewMessage(message, types.MessageTypeCommit) commitMsg, err := NewMessage(message, types.MessageTypeCommit)
if err != nil { if err != nil {
logrus.Errorf("failed to create commit message: %w", err) logrus.Errorf("failed to create commit message: %w", err)
@ -108,21 +120,25 @@ func (pcm *PBFTConsensusManager) handlePrepare(message *types.Message) {
} }
} }
func (pcm *PBFTConsensusManager) handleCommit(message *types.Message) { func (pcm *PBFTConsensusManager) handleCommit(message *pubsub.PubSubMessage) {
if pcm.msgLog.Exists(*message) { cmsg, err := unmarshalPayload(message)
if err != nil {
return
}
if pcm.msgLog.Exists(cmsg) {
logrus.Debugf("received existing commit msg, dropping...") logrus.Debugf("received existing commit msg, dropping...")
return return
} }
if !pcm.validator.Valid(*message) { if !pcm.validator.Valid(cmsg) {
logrus.Warn("received invalid commit msg, dropping...") logrus.Warn("received invalid commit msg, dropping...")
return return
} }
pcm.msgLog.AddMessage(*message) pcm.msgLog.AddMessage(cmsg)
consensusMsg := message.Payload if len(pcm.msgLog.Get(types.MessageTypeCommit, cmsg.Task.ConsensusID)) >= pcm.minApprovals {
if len(pcm.msgLog.GetMessagesByTypeAndConsensusID(types.MessageTypeCommit, message.Payload.Task.ConsensusID)) >= pcm.minApprovals { info := pcm.GetConsensusInfo(cmsg.Task.ConsensusID)
info := pcm.GetConsensusInfo(consensusMsg.Task.ConsensusID)
if info == nil { if info == nil {
logrus.Debugf("consensus doesn't exist in our consensus map - skipping...") logrus.Debugf("consensus doesn't exist in our consensus map - skipping...")
return return
@ -133,13 +149,13 @@ func (pcm *PBFTConsensusManager) handleCommit(message *types.Message) {
return return
} }
if info.IsCurrentMinerLeader { if info.IsCurrentMinerLeader {
logrus.Infof("Submitting on-chain result for consensus ID: %s", consensusMsg.Task.ConsensusID) logrus.Infof("Submitting on-chain result for consensus ID: %s", cmsg.Task.ConsensusID)
reqID, ok := new(big.Int).SetString(consensusMsg.Task.RequestID, 10) reqID, ok := new(big.Int).SetString(cmsg.Task.RequestID, 10)
if !ok { if !ok {
logrus.Errorf("Failed to parse request ID: %v", consensusMsg.Task.RequestID) logrus.Errorf("Failed to parse request ID: %v", cmsg.Task.RequestID)
} }
err := pcm.ethereumClient.SubmitRequestAnswer(reqID, consensusMsg.Task.Payload) err := pcm.ethereumClient.SubmitRequestAnswer(reqID, cmsg.Task.Payload)
if err != nil { if err != nil {
logrus.Errorf("Failed to submit on-chain result: %v", err) logrus.Errorf("Failed to submit on-chain result: %v", err)
} }
@ -167,3 +183,36 @@ func (pcm *PBFTConsensusManager) GetConsensusInfo(consensusID string) *Consensus
return c return c
} }
func unmarshalPayload(msg *pubsub.PubSubMessage) (types.ConsensusMessage, error) {
var task types2.DioneTask
err := cbor.Unmarshal(msg.Payload, &task)
if err != nil {
logrus.Debug(err)
return types.ConsensusMessage{}, err
}
var consensusMessageType types.MessageType
switch msg.Type {
case pubsub.PrePrepareMessageType:
{
consensusMessageType = types.MessageTypePrePrepare
break
}
case pubsub.PrepareMessageType:
{
consensusMessageType = types.MessageTypePrepare
break
}
case pubsub.CommitMessageType:
{
consensusMessageType = types.MessageTypeCommit
break
}
}
cmsg := types.ConsensusMessage{
Type: consensusMessageType,
From: msg.From,
Task: task,
}
return cmsg, nil
}

View File

@ -12,7 +12,7 @@ import (
) )
type ConsensusValidator struct { type ConsensusValidator struct {
validationFuncMap map[types2.MessageType]func(msg types2.Message) bool validationFuncMap map[types2.MessageType]func(msg types2.ConsensusMessage) bool
cache cache.Cache cache cache.Cache
miner *Miner miner *Miner
} }
@ -23,13 +23,12 @@ func NewConsensusValidator(ec cache.Cache, miner *Miner) *ConsensusValidator {
miner: miner, miner: miner,
} }
cv.validationFuncMap = map[types2.MessageType]func(msg types2.Message) bool{ cv.validationFuncMap = map[types2.MessageType]func(msg types2.ConsensusMessage) bool{
types2.MessageTypePrePrepare: func(msg types2.Message) bool { types2.MessageTypePrePrepare: func(msg types2.ConsensusMessage) bool {
// TODO here we need to do validation of tx itself // TODO here we need to do validation of tx itself
consensusMsg := msg.Payload
// === verify task signature === // === verify task signature ===
err := VerifyTaskSignature(consensusMsg.Task) err := VerifyTaskSignature(msg.Task)
if err != nil { if err != nil {
logrus.Errorf("unable to verify signature: %v", err) logrus.Errorf("unable to verify signature: %v", err)
return false return false
@ -38,15 +37,15 @@ func NewConsensusValidator(ec cache.Cache, miner *Miner) *ConsensusValidator {
// === verify if request exists in cache === // === verify if request exists in cache ===
var requestEvent *dioneOracle.DioneOracleNewOracleRequest var requestEvent *dioneOracle.DioneOracleNewOracleRequest
err = cv.cache.Get("request_"+consensusMsg.Task.RequestID, &requestEvent) err = cv.cache.Get("request_"+msg.Task.RequestID, &requestEvent)
if err != nil { if err != nil {
logrus.Errorf("the request doesn't exist in the cache or has been failed to decode: %v", err) logrus.Errorf("the request doesn't exist in the cache or has been failed to decode: %v", err)
return false return false
} }
if requestEvent.OriginChain != consensusMsg.Task.OriginChain || if requestEvent.OriginChain != msg.Task.OriginChain ||
requestEvent.RequestType != consensusMsg.Task.RequestType || requestEvent.RequestType != msg.Task.RequestType ||
requestEvent.RequestParams != consensusMsg.Task.RequestParams { requestEvent.RequestParams != msg.Task.RequestParams {
logrus.Errorf("the incoming task and cached request requestEvent don't match!") logrus.Errorf("the incoming task and cached request requestEvent don't match!")
return false return false
@ -54,14 +53,14 @@ func NewConsensusValidator(ec cache.Cache, miner *Miner) *ConsensusValidator {
///////////////////////////////// /////////////////////////////////
// === verify election proof wincount preliminarily === // === verify election proof wincount preliminarily ===
if consensusMsg.Task.ElectionProof.WinCount < 1 { if msg.Task.ElectionProof.WinCount < 1 {
logrus.Error("miner isn't a winner!") logrus.Error("miner isn't a winner!")
return false return false
} }
///////////////////////////////// /////////////////////////////////
// === verify miner's eligibility to propose this task === // === verify miner's eligibility to propose this task ===
err = cv.miner.IsMinerEligibleToProposeTask(common.HexToAddress(consensusMsg.Task.MinerEth)) err = cv.miner.IsMinerEligibleToProposeTask(common.HexToAddress(msg.Task.MinerEth))
if err != nil { if err != nil {
logrus.Errorf("miner is not eligible to propose task: %v", err) logrus.Errorf("miner is not eligible to propose task: %v", err)
return false return false
@ -69,22 +68,22 @@ func NewConsensusValidator(ec cache.Cache, miner *Miner) *ConsensusValidator {
///////////////////////////////// /////////////////////////////////
// === verify election proof vrf === // === verify election proof vrf ===
minerAddressMarshalled, err := consensusMsg.Task.Miner.MarshalBinary() minerAddressMarshalled, err := msg.Task.Miner.MarshalBinary()
if err != nil { if err != nil {
logrus.Errorf("failed to marshal miner address: %v", err) logrus.Errorf("failed to marshal miner address: %v", err)
return false return false
} }
electionProofRandomness, err := DrawRandomness( electionProofRandomness, err := DrawRandomness(
consensusMsg.Task.BeaconEntries[1].Data, msg.Task.BeaconEntries[1].Data,
crypto.DomainSeparationTag_ElectionProofProduction, crypto.DomainSeparationTag_ElectionProofProduction,
consensusMsg.Task.DrandRound, msg.Task.DrandRound,
minerAddressMarshalled, minerAddressMarshalled,
) )
if err != nil { if err != nil {
logrus.Errorf("failed to draw electionProofRandomness: %v", err) logrus.Errorf("failed to draw electionProofRandomness: %v", err)
return false return false
} }
err = VerifyVRF(consensusMsg.Task.Miner, electionProofRandomness, consensusMsg.Task.ElectionProof.VRFProof) err = VerifyVRF(msg.Task.Miner, electionProofRandomness, msg.Task.ElectionProof.VRFProof)
if err != nil { if err != nil {
logrus.Errorf("failed to verify election proof vrf: %v", err) logrus.Errorf("failed to verify election proof vrf: %v", err)
} }
@ -92,9 +91,9 @@ func NewConsensusValidator(ec cache.Cache, miner *Miner) *ConsensusValidator {
// === verify ticket vrf === // === verify ticket vrf ===
ticketRandomness, err := DrawRandomness( ticketRandomness, err := DrawRandomness(
consensusMsg.Task.BeaconEntries[1].Data, msg.Task.BeaconEntries[1].Data,
crypto.DomainSeparationTag_TicketProduction, crypto.DomainSeparationTag_TicketProduction,
consensusMsg.Task.DrandRound-types.TicketRandomnessLookback, msg.Task.DrandRound-types.TicketRandomnessLookback,
minerAddressMarshalled, minerAddressMarshalled,
) )
if err != nil { if err != nil {
@ -102,48 +101,48 @@ func NewConsensusValidator(ec cache.Cache, miner *Miner) *ConsensusValidator {
return false return false
} }
err = VerifyVRF(consensusMsg.Task.Miner, ticketRandomness, consensusMsg.Task.Ticket.VRFProof) err = VerifyVRF(msg.Task.Miner, ticketRandomness, msg.Task.Ticket.VRFProof)
if err != nil { if err != nil {
logrus.Errorf("failed to verify ticket vrf: %v", err) logrus.Errorf("failed to verify ticket vrf: %v", err)
} }
////////////////////////////////////// //////////////////////////////////////
// === compute wincount locally and verify values === // === compute wincount locally and verify values ===
mStake, nStake, err := cv.miner.GetStakeInfo(common.HexToAddress(consensusMsg.Task.MinerEth)) mStake, nStake, err := cv.miner.GetStakeInfo(common.HexToAddress(msg.Task.MinerEth))
if err != nil { if err != nil {
logrus.Errorf("failed to get miner stake: %v", err) logrus.Errorf("failed to get miner stake: %v", err)
return false return false
} }
actualWinCount := consensusMsg.Task.ElectionProof.ComputeWinCount(*mStake, *nStake) actualWinCount := msg.Task.ElectionProof.ComputeWinCount(*mStake, *nStake)
if consensusMsg.Task.ElectionProof.WinCount != actualWinCount { if msg.Task.ElectionProof.WinCount != actualWinCount {
logrus.Errorf("locally computed wincount isn't matching received value!", err) logrus.Errorf("locally computed wincount isn't matching received value!", err)
return false return false
} }
////////////////////////////////////// //////////////////////////////////////
// === validate payload by specific-chain checks === // === validate payload by specific-chain checks ===
if validationFunc := validation.GetValidationMethod(consensusMsg.Task.OriginChain, consensusMsg.Task.RequestType); validationFunc != nil { if validationFunc := validation.GetValidationMethod(msg.Task.OriginChain, msg.Task.RequestType); validationFunc != nil {
err := validationFunc(consensusMsg.Task.Payload) err := validationFunc(msg.Task.Payload)
if err != nil { if err != nil {
logrus.Errorf("payload validation has failed: %v", err) logrus.Errorf("payload validation has failed: %v", err)
return false return false
} }
} else { } else {
logrus.Debugf("Origin chain [%v]/request type[%v] doesn't have any payload validation!", consensusMsg.Task.OriginChain, consensusMsg.Task.RequestType) logrus.Debugf("Origin chain [%v]/request type[%v] doesn't have any payload validation!", msg.Task.OriginChain, msg.Task.RequestType)
} }
///////////////////////////////// /////////////////////////////////
return true return true
}, },
types2.MessageTypePrepare: func(msg types2.Message) bool { types2.MessageTypePrepare: func(msg types2.ConsensusMessage) bool {
err := VerifyTaskSignature(msg.Payload.Task) err := VerifyTaskSignature(msg.Task)
if err != nil { if err != nil {
return false return false
} }
return true return true
}, },
types2.MessageTypeCommit: func(msg types2.Message) bool { types2.MessageTypeCommit: func(msg types2.ConsensusMessage) bool {
err := VerifyTaskSignature(msg.Payload.Task) err := VerifyTaskSignature(msg.Task)
if err != nil { if err != nil {
return false return false
} }
@ -154,6 +153,6 @@ func NewConsensusValidator(ec cache.Cache, miner *Miner) *ConsensusValidator {
return cv return cv
} }
func (cv *ConsensusValidator) Valid(msg types2.Message) bool { func (cv *ConsensusValidator) Valid(msg types2.ConsensusMessage) bool {
return cv.validationFuncMap[msg.Type](msg) return cv.validationFuncMap[msg.Type](msg)
} }

View File

@ -8,7 +8,7 @@ import (
type MessageLog struct { type MessageLog struct {
messages mapset.Set messages mapset.Set
maxLogSize int maxLogSize int
validationFuncMap map[types2.MessageType]func(message types2.Message) validationFuncMap map[types2.MessageType]func(message types2.ConsensusMessage)
} }
func NewMessageLog() *MessageLog { func NewMessageLog() *MessageLog {
@ -20,21 +20,21 @@ func NewMessageLog() *MessageLog {
return msgLog return msgLog
} }
func (ml *MessageLog) AddMessage(msg types2.Message) { func (ml *MessageLog) AddMessage(msg types2.ConsensusMessage) {
ml.messages.Add(msg) ml.messages.Add(msg)
} }
func (ml *MessageLog) Exists(msg types2.Message) bool { func (ml *MessageLog) Exists(msg types2.ConsensusMessage) bool {
return ml.messages.Contains(msg) return ml.messages.Contains(msg)
} }
func (ml *MessageLog) GetMessagesByTypeAndConsensusID(typ types2.MessageType, consensusID string) []types2.Message { func (ml *MessageLog) Get(typ types2.MessageType, consensusID string) []*types2.ConsensusMessage {
var result []types2.Message var result []*types2.ConsensusMessage
for v := range ml.messages.Iter() { for v := range ml.messages.Iter() {
msg := v.(types2.Message) msg := v.(types2.ConsensusMessage)
if msg.Type == typ && msg.Payload.Task.ConsensusID == consensusID { if msg.Type == typ && msg.Task.ConsensusID == consensusID {
result = append(result, msg) result = append(result, &msg)
} }
} }

View File

@ -17,10 +17,6 @@ const (
type ConsensusMessage struct { type ConsensusMessage struct {
Task types.DioneTask Task types.DioneTask
} From peer.ID
Type MessageType
type Message struct {
Type MessageType
Payload ConsensusMessage
From peer.ID `cbor:"-"`
} }

View File

@ -167,7 +167,7 @@ func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTim
r := provideP2PRPCClient(lhost) r := provideP2PRPCClient(lhost)
// initialize sync manager // initialize sync manager
sm, err := provideSyncManager(bp, mp, r, baddrs[0]) // FIXME here we just pick up first bootstrap in list sm, err := provideSyncManager(bp, mp, r, baddrs[0], psb) // FIXME here we just pick up first bootstrap in list
if err != nil { if err != nil {
logrus.Fatal(err) logrus.Fatal(err)
} }

View File

@ -159,12 +159,12 @@ func provideMemPool() (*pool.Mempool, error) {
return pool.NewMempool() return pool.NewMempool()
} }
func provideSyncManager(bp *pool.BlockPool, mp *pool.Mempool, r *gorpc.Client, bootstrap multiaddr.Multiaddr) (sync.SyncManager, error) { func provideSyncManager(bp *pool.BlockPool, mp *pool.Mempool, r *gorpc.Client, bootstrap multiaddr.Multiaddr, psb *pubsub.PubSubRouter) (sync.SyncManager, error) {
addr, err := peer.AddrInfoFromP2pAddr(bootstrap) addr, err := peer.AddrInfoFromP2pAddr(bootstrap)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return sync.NewSyncManager(bp, mp, r, addr.ID), nil return sync.NewSyncManager(bp, mp, r, addr.ID, psb), nil
} }
func provideP2PRPCClient(h host.Host) *gorpc.Client { func provideP2PRPCClient(h host.Host) *gorpc.Client {

View File

@ -1,7 +0,0 @@
package pubsub
import (
"github.com/Secured-Finance/dione/consensus/types"
)
type Handler func(message *types.Message)

20
pubsub/message.go Normal file
View File

@ -0,0 +1,20 @@
package pubsub
import "github.com/libp2p/go-libp2p-core/peer"
type PubSubMessageType int
const (
UnknownMessageType = iota
PrePrepareMessageType
PrepareMessageType
CommitMessageType
NewTxMessageType
NewBlockMessageType
)
type PubSubMessage struct {
Type PubSubMessageType
From peer.ID `cbor:"-"`
Payload []byte
}

View File

@ -6,10 +6,8 @@ import (
"github.com/fxamacker/cbor/v2" "github.com/fxamacker/cbor/v2"
"github.com/Secured-Finance/dione/consensus/types" "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
host "github.com/libp2p/go-libp2p-core/host"
peer "github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub" pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
@ -20,11 +18,13 @@ type PubSubRouter struct {
context context.Context context context.Context
contextCancel context.CancelFunc contextCancel context.CancelFunc
serviceSubscription *pubsub.Subscription serviceSubscription *pubsub.Subscription
handlers map[types.MessageType][]Handler handlers map[PubSubMessageType][]Handler
oracleTopicName string oracleTopicName string
oracleTopic *pubsub.Topic oracleTopic *pubsub.Topic
} }
type Handler func(message *PubSubMessage)
func NewPubSubRouter(h host.Host, oracleTopic string, isBootstrap bool) *PubSubRouter { func NewPubSubRouter(h host.Host, oracleTopic string, isBootstrap bool) *PubSubRouter {
ctx, ctxCancel := context.WithCancel(context.Background()) ctx, ctxCancel := context.WithCancel(context.Background())
@ -32,7 +32,7 @@ func NewPubSubRouter(h host.Host, oracleTopic string, isBootstrap bool) *PubSubR
node: h, node: h,
context: ctx, context: ctx,
contextCancel: ctxCancel, contextCancel: ctxCancel,
handlers: make(map[types.MessageType][]Handler), handlers: make(map[PubSubMessageType][]Handler),
} }
var pbOptions []pubsub.Option var pbOptions []pubsub.Option
@ -102,7 +102,7 @@ func (psr *PubSubRouter) handleMessage(p *pubsub.Message) {
if senderPeerID == psr.node.ID() { if senderPeerID == psr.node.ID() {
return return
} }
var message types.Message var message PubSubMessage
err = cbor.Unmarshal(p.Data, &message) err = cbor.Unmarshal(p.Data, &message)
if err != nil { if err != nil {
logrus.Warn("Unable to decode message data! " + err.Error()) logrus.Warn("Unable to decode message data! " + err.Error())
@ -119,7 +119,7 @@ func (psr *PubSubRouter) handleMessage(p *pubsub.Message) {
} }
} }
func (psr *PubSubRouter) Hook(messageType types.MessageType, handler Handler) { func (psr *PubSubRouter) Hook(messageType PubSubMessageType, handler Handler) {
_, ok := psr.handlers[messageType] _, ok := psr.handlers[messageType]
if !ok { if !ok {
psr.handlers[messageType] = []Handler{} psr.handlers[messageType] = []Handler{}
@ -127,7 +127,7 @@ func (psr *PubSubRouter) Hook(messageType types.MessageType, handler Handler) {
psr.handlers[messageType] = append(psr.handlers[messageType], handler) psr.handlers[messageType] = append(psr.handlers[messageType], handler)
} }
func (psr *PubSubRouter) BroadcastToServiceTopic(msg *types.Message) error { func (psr *PubSubRouter) BroadcastToServiceTopic(msg *PubSubMessage) error {
data, err := cbor.Marshal(msg) data, err := cbor.Marshal(msg)
if err != nil { if err != nil {
return err return err