From d38bc81dcf8c8b17d6a7197d138c4efc318b1385 Mon Sep 17 00:00:00 2001 From: ChronosX88 Date: Wed, 18 Nov 2020 23:53:52 +0400 Subject: [PATCH] Make cbor message encoding for consensus message model, slightly refactor pubsub part --- consensus/commit_pool.go | 22 +++++----- consensus/consensus.go | 21 +++++----- consensus/pre_prepare_pool.go | 22 +++++----- consensus/prepare_pool.go | 22 +++++----- {models => consensus/types}/message.go | 9 ++-- go.mod | 8 ++-- go.sum | 41 +++++++++++++++++-- node/node.go | 6 +-- pb/handler.go | 5 --- pubsub/handler.go | 7 ++++ {pb => pubsub}/pubsub_router.go | 38 +++++++++-------- .../filecoin/types}/filecoin-message.go | 2 +- 12 files changed, 124 insertions(+), 79 deletions(-) rename {models => consensus/types}/message.go (66%) delete mode 100644 pb/handler.go create mode 100644 pubsub/handler.go rename {pb => pubsub}/pubsub_router.go (73%) rename {models => rpc/filecoin/types}/filecoin-message.go (96%) diff --git a/consensus/commit_pool.go b/consensus/commit_pool.go index f5ce783..e910785 100644 --- a/consensus/commit_pool.go +++ b/consensus/commit_pool.go @@ -1,25 +1,25 @@ package consensus import ( - "github.com/Secured-Finance/dione/models" + types2 "github.com/Secured-Finance/dione/consensus/types" "github.com/Secured-Finance/dione/sigs" "github.com/Secured-Finance/dione/types" ) type CommitPool struct { - commitMsgs map[string][]*models.Message + commitMsgs map[string][]*types2.Message } func NewCommitPool() *CommitPool { return &CommitPool{ - commitMsgs: map[string][]*models.Message{}, + commitMsgs: map[string][]*types2.Message{}, } } -func (cp *CommitPool) CreateCommit(prepareMsg *models.Message, privateKey []byte) (*models.Message, error) { - var message models.Message - message.Type = models.MessageTypeCommit - var consensusMsg models.ConsensusMessage +func (cp *CommitPool) CreateCommit(prepareMsg *types2.Message, privateKey []byte) (*types2.Message, error) { + var message types2.Message + message.Type = types2.MessageTypeCommit + var consensusMsg types2.ConsensusMessage prepareCMessage := prepareMsg.Payload consensusMsg.ConsensusID = prepareCMessage.ConsensusID consensusMsg.RequestID = prepareMsg.Payload.RequestID @@ -34,7 +34,7 @@ func (cp *CommitPool) CreateCommit(prepareMsg *models.Message, privateKey []byte return &message, nil } -func (cp *CommitPool) IsExistingCommit(commitMsg *models.Message) bool { +func (cp *CommitPool) IsExistingCommit(commitMsg *types2.Message) bool { consensusMessage := commitMsg.Payload var exists bool for _, v := range cp.commitMsgs[consensusMessage.ConsensusID] { @@ -45,7 +45,7 @@ func (cp *CommitPool) IsExistingCommit(commitMsg *models.Message) bool { return exists } -func (cp *CommitPool) IsValidCommit(commit *models.Message) bool { +func (cp *CommitPool) IsValidCommit(commit *types2.Message) bool { consensusMsg := commit.Payload err := sigs.Verify(&types.Signature{Type: types.SigTypeEd25519, Data: consensusMsg.Signature}, commit.From, []byte(consensusMsg.Data)) if err != nil { @@ -54,10 +54,10 @@ func (cp *CommitPool) IsValidCommit(commit *models.Message) bool { return true } -func (cp *CommitPool) AddCommit(commit *models.Message) { +func (cp *CommitPool) AddCommit(commit *types2.Message) { consensusID := commit.Payload.ConsensusID if _, ok := cp.commitMsgs[consensusID]; !ok { - cp.commitMsgs[consensusID] = []*models.Message{} + cp.commitMsgs[consensusID] = []*types2.Message{} } cp.commitMsgs[consensusID] = append(cp.commitMsgs[consensusID], commit) diff --git a/consensus/consensus.go b/consensus/consensus.go index 6674663..82a5a54 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -3,17 +3,18 @@ package consensus import ( "math/big" + "github.com/Secured-Finance/dione/consensus/types" + "github.com/ethereum/go-ethereum/common" "github.com/Secured-Finance/dione/ethclient" "github.com/sirupsen/logrus" - "github.com/Secured-Finance/dione/models" - "github.com/Secured-Finance/dione/pb" + "github.com/Secured-Finance/dione/pubsub" ) type PBFTConsensusManager struct { - psb *pb.PubSubRouter + psb *pubsub.PubSubRouter //Consensuses map[string]*ConsensusData //maxFaultNodes int minApprovals int @@ -34,7 +35,7 @@ type PBFTConsensusManager struct { // onConsensusFinishCallback func(finalData string) //} -func NewPBFTConsensusManager(psb *pb.PubSubRouter, minApprovals int, privKey []byte, ethereumClient *ethclient.EthereumClient) *PBFTConsensusManager { +func NewPBFTConsensusManager(psb *pubsub.PubSubRouter, minApprovals int, privKey []byte, ethereumClient *ethclient.EthereumClient) *PBFTConsensusManager { pcm := &PBFTConsensusManager{} pcm.psb = psb pcm.prePreparePool = NewPrePreparePool() @@ -44,9 +45,9 @@ func NewPBFTConsensusManager(psb *pb.PubSubRouter, minApprovals int, privKey []b pcm.privKey = privKey pcm.ethereumClient = ethereumClient pcm.consensusLeaders = map[string]bool{} - pcm.psb.Hook(models.MessageTypePrePrepare, pcm.handlePrePrepare) - pcm.psb.Hook(models.MessageTypePrepare, pcm.handlePrepare) - pcm.psb.Hook(models.MessageTypeCommit, pcm.handleCommit) + pcm.psb.Hook(types.MessageTypePrePrepare, pcm.handlePrePrepare) + pcm.psb.Hook(types.MessageTypePrepare, pcm.handlePrepare) + pcm.psb.Hook(types.MessageTypeCommit, pcm.handleCommit) return pcm } @@ -195,7 +196,7 @@ func (pcm *PBFTConsensusManager) Propose(consensusID, data string, requestID *bi return nil } -func (pcm *PBFTConsensusManager) handlePrePrepare(message *models.Message) { +func (pcm *PBFTConsensusManager) handlePrePrepare(message *types.Message) { if pcm.prePreparePool.IsExistingPrePrepare(message) { logrus.Debug("received existing pre_prepare msg, dropping...") return @@ -213,7 +214,7 @@ func (pcm *PBFTConsensusManager) handlePrePrepare(message *models.Message) { pcm.psb.BroadcastToServiceTopic(prepareMsg) } -func (pcm *PBFTConsensusManager) handlePrepare(message *models.Message) { +func (pcm *PBFTConsensusManager) handlePrepare(message *types.Message) { if pcm.preparePool.IsExistingPrepare(message) { logrus.Debug("received existing prepare msg, dropping...") return @@ -235,7 +236,7 @@ func (pcm *PBFTConsensusManager) handlePrepare(message *models.Message) { } } -func (pcm *PBFTConsensusManager) handleCommit(message *models.Message) { +func (pcm *PBFTConsensusManager) handleCommit(message *types.Message) { if pcm.commitPool.IsExistingCommit(message) { logrus.Debug("received existing commit msg, dropping...") return diff --git a/consensus/pre_prepare_pool.go b/consensus/pre_prepare_pool.go index 3bd125e..fc00000 100644 --- a/consensus/pre_prepare_pool.go +++ b/consensus/pre_prepare_pool.go @@ -1,26 +1,26 @@ package consensus import ( - "github.com/Secured-Finance/dione/models" + types2 "github.com/Secured-Finance/dione/consensus/types" "github.com/Secured-Finance/dione/sigs" "github.com/Secured-Finance/dione/types" "github.com/sirupsen/logrus" ) type PrePreparePool struct { - prePrepareMsgs map[string][]*models.Message + prePrepareMsgs map[string][]*types2.Message } func NewPrePreparePool() *PrePreparePool { return &PrePreparePool{ - prePrepareMsgs: map[string][]*models.Message{}, + prePrepareMsgs: map[string][]*types2.Message{}, } } -func (pp *PrePreparePool) CreatePrePrepare(consensusID, data string, requestID string, callbackAddress string, privateKey []byte) (*models.Message, error) { - var message models.Message - message.Type = models.MessageTypePrePrepare - var consensusMsg models.ConsensusMessage +func (pp *PrePreparePool) CreatePrePrepare(consensusID, data string, requestID string, callbackAddress string, privateKey []byte) (*types2.Message, error) { + var message types2.Message + message.Type = types2.MessageTypePrePrepare + var consensusMsg types2.ConsensusMessage consensusMsg.ConsensusID = consensusID consensusMsg.RequestID = requestID consensusMsg.CallbackAddress = callbackAddress @@ -34,7 +34,7 @@ func (pp *PrePreparePool) CreatePrePrepare(consensusID, data string, requestID s return &message, nil } -func (ppp *PrePreparePool) IsExistingPrePrepare(prepareMsg *models.Message) bool { +func (ppp *PrePreparePool) IsExistingPrePrepare(prepareMsg *types2.Message) bool { consensusMessage := prepareMsg.Payload var exists bool for _, v := range ppp.prePrepareMsgs[consensusMessage.ConsensusID] { @@ -45,7 +45,7 @@ func (ppp *PrePreparePool) IsExistingPrePrepare(prepareMsg *models.Message) bool return exists } -func (ppp *PrePreparePool) IsValidPrePrepare(prePrepare *models.Message) bool { +func (ppp *PrePreparePool) IsValidPrePrepare(prePrepare *types2.Message) bool { // TODO here we need to do validation of tx itself consensusMsg := prePrepare.Payload err := sigs.Verify(&types.Signature{Type: types.SigTypeEd25519, Data: consensusMsg.Signature}, prePrepare.From, []byte(consensusMsg.Data)) @@ -56,10 +56,10 @@ func (ppp *PrePreparePool) IsValidPrePrepare(prePrepare *models.Message) bool { return true } -func (ppp *PrePreparePool) AddPrePrepare(prePrepare *models.Message) { +func (ppp *PrePreparePool) AddPrePrepare(prePrepare *types2.Message) { consensusID := prePrepare.Payload.ConsensusID if _, ok := ppp.prePrepareMsgs[consensusID]; !ok { - ppp.prePrepareMsgs[consensusID] = []*models.Message{} + ppp.prePrepareMsgs[consensusID] = []*types2.Message{} } ppp.prePrepareMsgs[consensusID] = append(ppp.prePrepareMsgs[consensusID], prePrepare) diff --git a/consensus/prepare_pool.go b/consensus/prepare_pool.go index c395c4c..8580c15 100644 --- a/consensus/prepare_pool.go +++ b/consensus/prepare_pool.go @@ -1,26 +1,26 @@ package consensus import ( - "github.com/Secured-Finance/dione/models" + types2 "github.com/Secured-Finance/dione/consensus/types" "github.com/Secured-Finance/dione/sigs" "github.com/Secured-Finance/dione/types" ) type PreparePool struct { - prepareMsgs map[string][]*models.Message + prepareMsgs map[string][]*types2.Message privateKey []byte } func NewPreparePool() *PreparePool { return &PreparePool{ - prepareMsgs: map[string][]*models.Message{}, + prepareMsgs: map[string][]*types2.Message{}, } } -func (pp *PreparePool) CreatePrepare(prePrepareMsg *models.Message, privateKey []byte) (*models.Message, error) { - var message models.Message - message.Type = models.MessageTypePrepare - var consensusMsg models.ConsensusMessage +func (pp *PreparePool) CreatePrepare(prePrepareMsg *types2.Message, privateKey []byte) (*types2.Message, error) { + var message types2.Message + message.Type = types2.MessageTypePrepare + var consensusMsg types2.ConsensusMessage prepareCMessage := prePrepareMsg.Payload consensusMsg.ConsensusID = prepareCMessage.ConsensusID consensusMsg.RequestID = prePrepareMsg.Payload.RequestID @@ -35,7 +35,7 @@ func (pp *PreparePool) CreatePrepare(prePrepareMsg *models.Message, privateKey [ return &message, nil } -func (pp *PreparePool) IsExistingPrepare(prepareMsg *models.Message) bool { +func (pp *PreparePool) IsExistingPrepare(prepareMsg *types2.Message) bool { consensusMessage := prepareMsg.Payload var exists bool for _, v := range pp.prepareMsgs[consensusMessage.ConsensusID] { @@ -46,7 +46,7 @@ func (pp *PreparePool) IsExistingPrepare(prepareMsg *models.Message) bool { return exists } -func (pp *PreparePool) IsValidPrepare(prepare *models.Message) bool { +func (pp *PreparePool) IsValidPrepare(prepare *types2.Message) bool { consensusMsg := prepare.Payload err := sigs.Verify(&types.Signature{Type: types.SigTypeEd25519, Data: consensusMsg.Signature}, prepare.From, []byte(consensusMsg.Data)) if err != nil { @@ -55,10 +55,10 @@ func (pp *PreparePool) IsValidPrepare(prepare *models.Message) bool { return true } -func (pp *PreparePool) AddPrepare(prepare *models.Message) { +func (pp *PreparePool) AddPrepare(prepare *types2.Message) { consensusID := prepare.Payload.ConsensusID if _, ok := pp.prepareMsgs[consensusID]; !ok { - pp.prepareMsgs[consensusID] = []*models.Message{} + pp.prepareMsgs[consensusID] = []*types2.Message{} } pp.prepareMsgs[consensusID] = append(pp.prepareMsgs[consensusID], prepare) diff --git a/models/message.go b/consensus/types/message.go similarity index 66% rename from models/message.go rename to consensus/types/message.go index 1a8af34..b2ea217 100644 --- a/models/message.go +++ b/consensus/types/message.go @@ -1,4 +1,4 @@ -package models +package types import ( "github.com/libp2p/go-libp2p-core/peer" @@ -15,6 +15,7 @@ const ( ) type ConsensusMessage struct { + _ struct{} `cbor:",toarray"` ConsensusID string Signature []byte RequestID string @@ -23,7 +24,7 @@ type ConsensusMessage struct { } type Message struct { - Type MessageType `json:"type"` - Payload ConsensusMessage `json:"payload"` - From peer.ID `json:"-"` + Type MessageType `cbor:"1,keyasint"` + Payload ConsensusMessage `cbor:"2,keyasint"` + From peer.ID `cbor:"-"` } diff --git a/go.mod b/go.mod index c647c51..18f1a4f 100644 --- a/go.mod +++ b/go.mod @@ -18,12 +18,13 @@ require ( github.com/filecoin-project/go-address v0.0.4 github.com/filecoin-project/go-state-types v0.0.0-20201021025442-0ac4de847f4f github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5 // indirect + github.com/fxamacker/cbor/v2 v2.2.0 github.com/gballet/go-libpcsclite v0.0.0-20191108122812-4678299bea08 // indirect github.com/go-ozzo/ozzo-validation v3.6.0+incompatible + github.com/gobwas/ws v1.0.4 // indirect github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect github.com/golang/snappy v0.0.2-0.20190904063534-ff6b7dc882cf // indirect github.com/google/gopacket v1.1.18 // indirect - github.com/google/logger v1.1.0 github.com/gopherjs/gopherjs v0.0.0-20190812055157-5d271430af9f // indirect github.com/ipfs/go-datastore v0.4.5 // indirect github.com/ipfs/go-ds-badger2 v0.1.1-0.20200708190120-187fc06f714e // indirect @@ -41,11 +42,11 @@ require ( github.com/mattn/go-colorable v0.1.6 // indirect github.com/mattn/go-sqlite3 v1.9.0 github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 - github.com/mitchellh/mapstructure v1.3.3 + github.com/mitchellh/mapstructure v1.3.3 // indirect github.com/multiformats/go-multiaddr v0.3.1 github.com/olekukonko/tablewriter v0.0.4 // indirect github.com/onsi/ginkgo v1.14.0 // indirect - github.com/polydawn/refmt v0.0.0-20190809202753-05966cbd336a // indirect + github.com/polydawn/refmt v0.0.0-20190809202753-05966cbd336a github.com/raulk/clock v1.1.0 github.com/rjeczalik/notify v0.9.2 // indirect github.com/rs/cors v1.7.0 // indirect @@ -71,4 +72,5 @@ require ( gopkg.in/olebedev/go-duktape.v3 v3.0.0-20200619000410-60c24ae608a6 // indirect gopkg.in/urfave/cli.v1 v1.20.0 // indirect honnef.co/go/tools v0.0.1-2020.1.3 // indirect + nhooyr.io/websocket v1.8.6 // indirect ) diff --git a/go.sum b/go.sum index 27c71b2..72719e3 100644 --- a/go.sum +++ b/go.sum @@ -208,10 +208,17 @@ github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/fxamacker/cbor v1.5.1 h1:XjQWBgdmQyqimslUh5r4tUGmoqzHmBFQOImkWGi2awg= +github.com/fxamacker/cbor/v2 v2.2.0 h1:6eXqdDDe588rSYAi1HfZKbx6YYQO4mxQ9eC6xYpU/JQ= +github.com/fxamacker/cbor/v2 v2.2.0/go.mod h1:TA1xS00nchWmaBnEIxPSE5oHLuJBAVvqrtAnWBwBCVo= github.com/garyburd/redigo v1.6.0/go.mod h1:NR3MbYisc3/PwhQ00EMzDiPmrwpPxAn5GI05/YaO1SY= github.com/gballet/go-libpcsclite v0.0.0-20191108122812-4678299bea08 h1:f6D9Hr8xV8uYKlyuj8XIruxlh9WjVjdh1gIicAS7ays= github.com/gballet/go-libpcsclite v0.0.0-20191108122812-4678299bea08/go.mod h1:x7DCsMOv1taUwEWCzT4cmDeAkigA5/QCwUodaVOe8Ww= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= +github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= +github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= +github.com/gin-gonic/gin v1.6.3 h1:ahKqKTFpO5KTPHxWZjEdPScmYaGtLo8Y4DMHoEsnp14= +github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M= github.com/go-check/check v0.0.0-20180628173108-788fd7840127/go.mod h1:9ES+weclKsC9YodN5RgxqK/VD9HM9JsCSh7rNhMZE98= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= @@ -224,11 +231,26 @@ github.com/go-logfmt/logfmt v0.5.0 h1:TrB8swr/68K7m9CcGut2g3UOihhbcbiMAYiuTXdEih github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-ozzo/ozzo-validation v3.6.0+incompatible h1:msy24VGS42fKO9K1vLz82/GeYW1cILu7Nuuj1N3BBkE= github.com/go-ozzo/ozzo-validation v3.6.0+incompatible/go.mod h1:gsEKFIVnabGBt6mXmxK0MoFy+cZoTJY6mu5Ll3LVLBU= +github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A= +github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= +github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q= +github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8= +github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD876Lmtgy7VtROAbHHXk8no= +github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA= +github.com/go-playground/validator/v10 v10.2.0 h1:KgJ0snyC2R9VXYN2rneOtQcw5aHQB1Vv0sFl1UcHBOY= +github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GOhaH6EGOAJShg8Id5JGkI= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs= github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee h1:s+21KNqlpePfkah2I+gwHF8xmJWRjooY+5248k6m4A0= +github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo= +github.com/gobwas/pool v0.2.0 h1:QEmUOlnSjWtnpRGHF3SauEiOsy82Cup83Vf2LcMlnc8= +github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= +github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM= +github.com/gobwas/ws v1.0.4 h1:5eXU1CZhpQdq5kXbKb+sECH5Ia5KiO6CYzIzdlVx6Bs= +github.com/gobwas/ws v1.0.4/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM= github.com/godbus/dbus v0.0.0-20190402143921-271e53dc4968/go.mod h1:/YcGZj5zSblfDWMMoOzV4fas9FZnQYTkDnsGvmh2Grw= github.com/gogo/googleapis v0.0.0-20180223154316-0cd9801be74a/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s= github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s= @@ -258,6 +280,7 @@ github.com/golang/protobuf v1.3.0/go.mod h1:Qd/q+1AKNOZr9uGQzbzCmRO6sUih6GTPZv6a github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= +github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= @@ -286,8 +309,6 @@ github.com/google/gopacket v1.1.17 h1:rMrlX2ZY2UbvT+sdz3+6J+pp2z+msCq9MxTU6ymxbB github.com/google/gopacket v1.1.17/go.mod h1:UdDNZ1OO62aGYVnPhxT1U6aI7ukYtA/kB8vaU0diBUM= github.com/google/gopacket v1.1.18 h1:lum7VRA9kdlvBi7/v2p7/zcbkduHaCH/SVVyurs7OpY= github.com/google/gopacket v1.1.18/go.mod h1:UdDNZ1OO62aGYVnPhxT1U6aI7ukYtA/kB8vaU0diBUM= -github.com/google/logger v1.1.0 h1:saB74Etb4EAJNH3z74CVbCKk75hld/8T0CsXKetWCwM= -github.com/google/logger v1.1.0/go.mod h1:w7O8nrRr0xufejBlQMI83MXqRusvREoJdaAxV+CoAB4= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= @@ -456,6 +477,7 @@ github.com/jsimonetti/rtnetlink v0.0.0-20200117123717-f846d4f6c1f4/go.mod h1:WGu github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.8/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= +github.com/json-iterator/go v1.1.9 h1:9yzud/Ht36ygwatGx56VwCZtlI/2AD15T1X2sjSuGns= github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jtolds/gls v4.2.1+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= @@ -475,6 +497,7 @@ github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvW github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4= +github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.10.4/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.10.7 h1:7rix8v8GpI3ZBb0nSozFRgbtXKv+hOe+qfEpZqybrAg= github.com/klauspost/compress v1.10.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= @@ -493,6 +516,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y= +github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.7.0 h1:h93mCPfUSkaul3Ka/VG8uZdmW1uMHDGxzu0NWHuJmHY= github.com/lib/pq v1.7.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= @@ -746,8 +771,10 @@ github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh github.com/mitchellh/mapstructure v1.3.3 h1:SzB1nHZ2Xi+17FP0zVQBHIZqvwRN9408fJO8h+eeNA8= github.com/mitchellh/mapstructure v1.3.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/mr-tron/base58 v1.1.0/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8= github.com/mr-tron/base58 v1.1.1/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8= @@ -1025,7 +1052,11 @@ github.com/uber/jaeger-client-go v2.23.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMW github.com/uber/jaeger-lib v1.5.1-0.20181102163054-1fc5c315e03c/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U= github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw= github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U= +github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo= +github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= +github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs= +github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= github.com/urfave/cli v1.22.1 h1:+mkCCcOFKPnCmVYVcURKps1Xe+3zP90gSYGNfRkjoIY= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= @@ -1063,6 +1094,8 @@ github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee/go.mod h1: github.com/wsddn/go-ecdh v0.0.0-20161211032359-48726bab9208 h1:1cngl9mPEoITZG8s8cVcUy5CeIBYhEESkOB7m6Gmkrk= github.com/wsddn/go-ecdh v0.0.0-20161211032359-48726bab9208/go.mod h1:IotVbo4F+mw0EzQ08zFqg7pK3FebNXpaMsRy2RT+Ees= github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE= +github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= +github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= @@ -1206,8 +1239,6 @@ golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/ golang.org/x/net v0.0.0-20200602114024-627f9648deb9 h1:pNX+40auqi2JqRfOP1akLGtYcn15TUbkhwuCO3foqqM= golang.org/x/net v0.0.0-20200602114024-627f9648deb9/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= -golang.org/x/net v0.0.0-20200707034311-ab3426394381 h1:VXak5I6aEWmAXeQjA+QSZzlgNrpq9mjcfDemuexIKsU= -golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201016165138-7b1cca2348c0 h1:5kGOVHlq0euqwzgTC9Vu15p6fV1Wi0ArVi8da2urnVg= golang.org/x/net v0.0.0-20201016165138-7b1cca2348c0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -1441,6 +1472,8 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= honnef.co/go/tools v0.0.1-2020.1.3 h1:sXmLre5bzIR6ypkjXCDI3jHPssRhc8KD/Ome589sc3U= honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= +nhooyr.io/websocket v1.8.6 h1:s+C3xAMLwGmlI31Nyn/eAehUlZPwfYZu2JXM621Q5/k= +nhooyr.io/websocket v1.8.6/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o= sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0/go.mod h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU= diff --git a/node/node.go b/node/node.go index f9738f4..74a0911 100644 --- a/node/node.go +++ b/node/node.go @@ -26,7 +26,7 @@ import ( "github.com/Secured-Finance/dione/config" "github.com/Secured-Finance/dione/consensus" "github.com/Secured-Finance/dione/ethclient" - "github.com/Secured-Finance/dione/pb" + "github.com/Secured-Finance/dione/pubsub" "github.com/libp2p/go-libp2p" crypto "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/host" @@ -40,7 +40,7 @@ const ( type Node struct { Host host.Host - PubSubRouter *pb.PubSubRouter + PubSubRouter *pubsub.PubSubRouter GlobalCtx context.Context GlobalCtxCancel context.CancelFunc OracleTopic string @@ -153,7 +153,7 @@ func (n *Node) setupSolanaClient() { } func (n *Node) setupPubsub() { - n.PubSubRouter = pb.NewPubSubRouter(n.Host, n.OracleTopic) + n.PubSubRouter = pubsub.NewPubSubRouter(n.Host, n.OracleTopic) // wait for setting up pubsub //time.Sleep(3 * time.Second) } diff --git a/pb/handler.go b/pb/handler.go deleted file mode 100644 index 33a7357..0000000 --- a/pb/handler.go +++ /dev/null @@ -1,5 +0,0 @@ -package pb - -import "github.com/Secured-Finance/dione/models" - -type Handler func(message *models.Message) diff --git a/pubsub/handler.go b/pubsub/handler.go new file mode 100644 index 0000000..d53ef51 --- /dev/null +++ b/pubsub/handler.go @@ -0,0 +1,7 @@ +package pubsub + +import ( + "github.com/Secured-Finance/dione/consensus/types" +) + +type Handler func(message *types.Message) diff --git a/pb/pubsub_router.go b/pubsub/pubsub_router.go similarity index 73% rename from pb/pubsub_router.go rename to pubsub/pubsub_router.go index d29491f..be4d33a 100644 --- a/pb/pubsub_router.go +++ b/pubsub/pubsub_router.go @@ -1,10 +1,12 @@ -package pb +package pubsub import ( "context" - "encoding/json" - "github.com/Secured-Finance/dione/models" + "github.com/fxamacker/cbor/v2" + + "github.com/Secured-Finance/dione/consensus/types" + host "github.com/libp2p/go-libp2p-core/host" peer "github.com/libp2p/go-libp2p-core/peer" pubsub "github.com/libp2p/go-libp2p-pubsub" @@ -12,12 +14,13 @@ import ( ) type PubSubRouter struct { - node host.Host - Pubsub *pubsub.PubSub - context context.Context - contextCancel context.CancelFunc - handlers map[models.MessageType][]Handler - oracleTopic string + node host.Host + Pubsub *pubsub.PubSub + context context.Context + contextCancel context.CancelFunc + serviceSubscription *pubsub.Subscription + handlers map[types.MessageType][]Handler + oracleTopic string } func NewPubSubRouter(h host.Host, oracleTopic string) *PubSubRouter { @@ -27,7 +30,7 @@ func NewPubSubRouter(h host.Host, oracleTopic string) *PubSubRouter { node: h, context: ctx, contextCancel: ctxCancel, - handlers: make(map[models.MessageType][]Handler), + handlers: make(map[types.MessageType][]Handler), } pb, err := pubsub.NewFloodSub( @@ -40,10 +43,13 @@ func NewPubSubRouter(h host.Host, oracleTopic string) *PubSubRouter { } psr.oracleTopic = oracleTopic - subscription, err := pb.Subscribe(oracleTopic) + topic, err := pb.Join(oracleTopic) if err != nil { logrus.Fatal("Error occurred when subscribing to service topic", err) } + + subscription, err := topic.Subscribe() + psr.serviceSubscription = subscription psr.Pubsub = pb go func() { @@ -76,8 +82,8 @@ func (psr *PubSubRouter) handleMessage(p *pubsub.Message) { if senderPeerID == psr.node.ID() { return } - var message models.Message - err = json.Unmarshal(p.Data, &message) + var message types.Message + err = cbor.Unmarshal(p.Data, &message) if err != nil { logrus.Warn("Unable to decode message data! " + err.Error()) return @@ -93,7 +99,7 @@ func (psr *PubSubRouter) handleMessage(p *pubsub.Message) { } } -func (psr *PubSubRouter) Hook(messageType models.MessageType, handler Handler) { +func (psr *PubSubRouter) Hook(messageType types.MessageType, handler Handler) { handlers, ok := psr.handlers[messageType] if !ok { emptyArray := []Handler{} @@ -103,8 +109,8 @@ func (psr *PubSubRouter) Hook(messageType models.MessageType, handler Handler) { psr.handlers[messageType] = append(handlers, handler) } -func (psr *PubSubRouter) BroadcastToServiceTopic(msg *models.Message) error { - data, err := json.Marshal(msg) +func (psr *PubSubRouter) BroadcastToServiceTopic(msg *types.Message) error { + data, err := cbor.Marshal(msg) if err != nil { return err } diff --git a/models/filecoin-message.go b/rpc/filecoin/types/filecoin-message.go similarity index 96% rename from models/filecoin-message.go rename to rpc/filecoin/types/filecoin-message.go index ae07dcc..5b8e748 100644 --- a/models/filecoin-message.go +++ b/rpc/filecoin/types/filecoin-message.go @@ -1,4 +1,4 @@ -package models +package types import ( "github.com/filecoin-project/go-address"