Make cbor message encoding for consensus message model, slightly refactor pubsub part

This commit is contained in:
ChronosX88 2020-11-18 23:53:52 +04:00
parent 354d0dd833
commit d38bc81dcf
Signed by: ChronosXYZ
GPG Key ID: 085A69A82C8C511A
12 changed files with 124 additions and 79 deletions

View File

@ -1,25 +1,25 @@
package consensus
import (
"github.com/Secured-Finance/dione/models"
types2 "github.com/Secured-Finance/dione/consensus/types"
"github.com/Secured-Finance/dione/sigs"
"github.com/Secured-Finance/dione/types"
)
type CommitPool struct {
commitMsgs map[string][]*models.Message
commitMsgs map[string][]*types2.Message
}
func NewCommitPool() *CommitPool {
return &CommitPool{
commitMsgs: map[string][]*models.Message{},
commitMsgs: map[string][]*types2.Message{},
}
}
func (cp *CommitPool) CreateCommit(prepareMsg *models.Message, privateKey []byte) (*models.Message, error) {
var message models.Message
message.Type = models.MessageTypeCommit
var consensusMsg models.ConsensusMessage
func (cp *CommitPool) CreateCommit(prepareMsg *types2.Message, privateKey []byte) (*types2.Message, error) {
var message types2.Message
message.Type = types2.MessageTypeCommit
var consensusMsg types2.ConsensusMessage
prepareCMessage := prepareMsg.Payload
consensusMsg.ConsensusID = prepareCMessage.ConsensusID
consensusMsg.RequestID = prepareMsg.Payload.RequestID
@ -34,7 +34,7 @@ func (cp *CommitPool) CreateCommit(prepareMsg *models.Message, privateKey []byte
return &message, nil
}
func (cp *CommitPool) IsExistingCommit(commitMsg *models.Message) bool {
func (cp *CommitPool) IsExistingCommit(commitMsg *types2.Message) bool {
consensusMessage := commitMsg.Payload
var exists bool
for _, v := range cp.commitMsgs[consensusMessage.ConsensusID] {
@ -45,7 +45,7 @@ func (cp *CommitPool) IsExistingCommit(commitMsg *models.Message) bool {
return exists
}
func (cp *CommitPool) IsValidCommit(commit *models.Message) bool {
func (cp *CommitPool) IsValidCommit(commit *types2.Message) bool {
consensusMsg := commit.Payload
err := sigs.Verify(&types.Signature{Type: types.SigTypeEd25519, Data: consensusMsg.Signature}, commit.From, []byte(consensusMsg.Data))
if err != nil {
@ -54,10 +54,10 @@ func (cp *CommitPool) IsValidCommit(commit *models.Message) bool {
return true
}
func (cp *CommitPool) AddCommit(commit *models.Message) {
func (cp *CommitPool) AddCommit(commit *types2.Message) {
consensusID := commit.Payload.ConsensusID
if _, ok := cp.commitMsgs[consensusID]; !ok {
cp.commitMsgs[consensusID] = []*models.Message{}
cp.commitMsgs[consensusID] = []*types2.Message{}
}
cp.commitMsgs[consensusID] = append(cp.commitMsgs[consensusID], commit)

View File

@ -3,17 +3,18 @@ package consensus
import (
"math/big"
"github.com/Secured-Finance/dione/consensus/types"
"github.com/ethereum/go-ethereum/common"
"github.com/Secured-Finance/dione/ethclient"
"github.com/sirupsen/logrus"
"github.com/Secured-Finance/dione/models"
"github.com/Secured-Finance/dione/pb"
"github.com/Secured-Finance/dione/pubsub"
)
type PBFTConsensusManager struct {
psb *pb.PubSubRouter
psb *pubsub.PubSubRouter
//Consensuses map[string]*ConsensusData
//maxFaultNodes int
minApprovals int
@ -34,7 +35,7 @@ type PBFTConsensusManager struct {
// onConsensusFinishCallback func(finalData string)
//}
func NewPBFTConsensusManager(psb *pb.PubSubRouter, minApprovals int, privKey []byte, ethereumClient *ethclient.EthereumClient) *PBFTConsensusManager {
func NewPBFTConsensusManager(psb *pubsub.PubSubRouter, minApprovals int, privKey []byte, ethereumClient *ethclient.EthereumClient) *PBFTConsensusManager {
pcm := &PBFTConsensusManager{}
pcm.psb = psb
pcm.prePreparePool = NewPrePreparePool()
@ -44,9 +45,9 @@ func NewPBFTConsensusManager(psb *pb.PubSubRouter, minApprovals int, privKey []b
pcm.privKey = privKey
pcm.ethereumClient = ethereumClient
pcm.consensusLeaders = map[string]bool{}
pcm.psb.Hook(models.MessageTypePrePrepare, pcm.handlePrePrepare)
pcm.psb.Hook(models.MessageTypePrepare, pcm.handlePrepare)
pcm.psb.Hook(models.MessageTypeCommit, pcm.handleCommit)
pcm.psb.Hook(types.MessageTypePrePrepare, pcm.handlePrePrepare)
pcm.psb.Hook(types.MessageTypePrepare, pcm.handlePrepare)
pcm.psb.Hook(types.MessageTypeCommit, pcm.handleCommit)
return pcm
}
@ -195,7 +196,7 @@ func (pcm *PBFTConsensusManager) Propose(consensusID, data string, requestID *bi
return nil
}
func (pcm *PBFTConsensusManager) handlePrePrepare(message *models.Message) {
func (pcm *PBFTConsensusManager) handlePrePrepare(message *types.Message) {
if pcm.prePreparePool.IsExistingPrePrepare(message) {
logrus.Debug("received existing pre_prepare msg, dropping...")
return
@ -213,7 +214,7 @@ func (pcm *PBFTConsensusManager) handlePrePrepare(message *models.Message) {
pcm.psb.BroadcastToServiceTopic(prepareMsg)
}
func (pcm *PBFTConsensusManager) handlePrepare(message *models.Message) {
func (pcm *PBFTConsensusManager) handlePrepare(message *types.Message) {
if pcm.preparePool.IsExistingPrepare(message) {
logrus.Debug("received existing prepare msg, dropping...")
return
@ -235,7 +236,7 @@ func (pcm *PBFTConsensusManager) handlePrepare(message *models.Message) {
}
}
func (pcm *PBFTConsensusManager) handleCommit(message *models.Message) {
func (pcm *PBFTConsensusManager) handleCommit(message *types.Message) {
if pcm.commitPool.IsExistingCommit(message) {
logrus.Debug("received existing commit msg, dropping...")
return

View File

@ -1,26 +1,26 @@
package consensus
import (
"github.com/Secured-Finance/dione/models"
types2 "github.com/Secured-Finance/dione/consensus/types"
"github.com/Secured-Finance/dione/sigs"
"github.com/Secured-Finance/dione/types"
"github.com/sirupsen/logrus"
)
type PrePreparePool struct {
prePrepareMsgs map[string][]*models.Message
prePrepareMsgs map[string][]*types2.Message
}
func NewPrePreparePool() *PrePreparePool {
return &PrePreparePool{
prePrepareMsgs: map[string][]*models.Message{},
prePrepareMsgs: map[string][]*types2.Message{},
}
}
func (pp *PrePreparePool) CreatePrePrepare(consensusID, data string, requestID string, callbackAddress string, privateKey []byte) (*models.Message, error) {
var message models.Message
message.Type = models.MessageTypePrePrepare
var consensusMsg models.ConsensusMessage
func (pp *PrePreparePool) CreatePrePrepare(consensusID, data string, requestID string, callbackAddress string, privateKey []byte) (*types2.Message, error) {
var message types2.Message
message.Type = types2.MessageTypePrePrepare
var consensusMsg types2.ConsensusMessage
consensusMsg.ConsensusID = consensusID
consensusMsg.RequestID = requestID
consensusMsg.CallbackAddress = callbackAddress
@ -34,7 +34,7 @@ func (pp *PrePreparePool) CreatePrePrepare(consensusID, data string, requestID s
return &message, nil
}
func (ppp *PrePreparePool) IsExistingPrePrepare(prepareMsg *models.Message) bool {
func (ppp *PrePreparePool) IsExistingPrePrepare(prepareMsg *types2.Message) bool {
consensusMessage := prepareMsg.Payload
var exists bool
for _, v := range ppp.prePrepareMsgs[consensusMessage.ConsensusID] {
@ -45,7 +45,7 @@ func (ppp *PrePreparePool) IsExistingPrePrepare(prepareMsg *models.Message) bool
return exists
}
func (ppp *PrePreparePool) IsValidPrePrepare(prePrepare *models.Message) bool {
func (ppp *PrePreparePool) IsValidPrePrepare(prePrepare *types2.Message) bool {
// TODO here we need to do validation of tx itself
consensusMsg := prePrepare.Payload
err := sigs.Verify(&types.Signature{Type: types.SigTypeEd25519, Data: consensusMsg.Signature}, prePrepare.From, []byte(consensusMsg.Data))
@ -56,10 +56,10 @@ func (ppp *PrePreparePool) IsValidPrePrepare(prePrepare *models.Message) bool {
return true
}
func (ppp *PrePreparePool) AddPrePrepare(prePrepare *models.Message) {
func (ppp *PrePreparePool) AddPrePrepare(prePrepare *types2.Message) {
consensusID := prePrepare.Payload.ConsensusID
if _, ok := ppp.prePrepareMsgs[consensusID]; !ok {
ppp.prePrepareMsgs[consensusID] = []*models.Message{}
ppp.prePrepareMsgs[consensusID] = []*types2.Message{}
}
ppp.prePrepareMsgs[consensusID] = append(ppp.prePrepareMsgs[consensusID], prePrepare)

View File

@ -1,26 +1,26 @@
package consensus
import (
"github.com/Secured-Finance/dione/models"
types2 "github.com/Secured-Finance/dione/consensus/types"
"github.com/Secured-Finance/dione/sigs"
"github.com/Secured-Finance/dione/types"
)
type PreparePool struct {
prepareMsgs map[string][]*models.Message
prepareMsgs map[string][]*types2.Message
privateKey []byte
}
func NewPreparePool() *PreparePool {
return &PreparePool{
prepareMsgs: map[string][]*models.Message{},
prepareMsgs: map[string][]*types2.Message{},
}
}
func (pp *PreparePool) CreatePrepare(prePrepareMsg *models.Message, privateKey []byte) (*models.Message, error) {
var message models.Message
message.Type = models.MessageTypePrepare
var consensusMsg models.ConsensusMessage
func (pp *PreparePool) CreatePrepare(prePrepareMsg *types2.Message, privateKey []byte) (*types2.Message, error) {
var message types2.Message
message.Type = types2.MessageTypePrepare
var consensusMsg types2.ConsensusMessage
prepareCMessage := prePrepareMsg.Payload
consensusMsg.ConsensusID = prepareCMessage.ConsensusID
consensusMsg.RequestID = prePrepareMsg.Payload.RequestID
@ -35,7 +35,7 @@ func (pp *PreparePool) CreatePrepare(prePrepareMsg *models.Message, privateKey [
return &message, nil
}
func (pp *PreparePool) IsExistingPrepare(prepareMsg *models.Message) bool {
func (pp *PreparePool) IsExistingPrepare(prepareMsg *types2.Message) bool {
consensusMessage := prepareMsg.Payload
var exists bool
for _, v := range pp.prepareMsgs[consensusMessage.ConsensusID] {
@ -46,7 +46,7 @@ func (pp *PreparePool) IsExistingPrepare(prepareMsg *models.Message) bool {
return exists
}
func (pp *PreparePool) IsValidPrepare(prepare *models.Message) bool {
func (pp *PreparePool) IsValidPrepare(prepare *types2.Message) bool {
consensusMsg := prepare.Payload
err := sigs.Verify(&types.Signature{Type: types.SigTypeEd25519, Data: consensusMsg.Signature}, prepare.From, []byte(consensusMsg.Data))
if err != nil {
@ -55,10 +55,10 @@ func (pp *PreparePool) IsValidPrepare(prepare *models.Message) bool {
return true
}
func (pp *PreparePool) AddPrepare(prepare *models.Message) {
func (pp *PreparePool) AddPrepare(prepare *types2.Message) {
consensusID := prepare.Payload.ConsensusID
if _, ok := pp.prepareMsgs[consensusID]; !ok {
pp.prepareMsgs[consensusID] = []*models.Message{}
pp.prepareMsgs[consensusID] = []*types2.Message{}
}
pp.prepareMsgs[consensusID] = append(pp.prepareMsgs[consensusID], prepare)

View File

@ -1,4 +1,4 @@
package models
package types
import (
"github.com/libp2p/go-libp2p-core/peer"
@ -15,6 +15,7 @@ const (
)
type ConsensusMessage struct {
_ struct{} `cbor:",toarray"`
ConsensusID string
Signature []byte
RequestID string
@ -23,7 +24,7 @@ type ConsensusMessage struct {
}
type Message struct {
Type MessageType `json:"type"`
Payload ConsensusMessage `json:"payload"`
From peer.ID `json:"-"`
Type MessageType `cbor:"1,keyasint"`
Payload ConsensusMessage `cbor:"2,keyasint"`
From peer.ID `cbor:"-"`
}

8
go.mod
View File

@ -18,12 +18,13 @@ require (
github.com/filecoin-project/go-address v0.0.4
github.com/filecoin-project/go-state-types v0.0.0-20201021025442-0ac4de847f4f
github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5 // indirect
github.com/fxamacker/cbor/v2 v2.2.0
github.com/gballet/go-libpcsclite v0.0.0-20191108122812-4678299bea08 // indirect
github.com/go-ozzo/ozzo-validation v3.6.0+incompatible
github.com/gobwas/ws v1.0.4 // indirect
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/golang/snappy v0.0.2-0.20190904063534-ff6b7dc882cf // indirect
github.com/google/gopacket v1.1.18 // indirect
github.com/google/logger v1.1.0
github.com/gopherjs/gopherjs v0.0.0-20190812055157-5d271430af9f // indirect
github.com/ipfs/go-datastore v0.4.5 // indirect
github.com/ipfs/go-ds-badger2 v0.1.1-0.20200708190120-187fc06f714e // indirect
@ -41,11 +42,11 @@ require (
github.com/mattn/go-colorable v0.1.6 // indirect
github.com/mattn/go-sqlite3 v1.9.0
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1
github.com/mitchellh/mapstructure v1.3.3
github.com/mitchellh/mapstructure v1.3.3 // indirect
github.com/multiformats/go-multiaddr v0.3.1
github.com/olekukonko/tablewriter v0.0.4 // indirect
github.com/onsi/ginkgo v1.14.0 // indirect
github.com/polydawn/refmt v0.0.0-20190809202753-05966cbd336a // indirect
github.com/polydawn/refmt v0.0.0-20190809202753-05966cbd336a
github.com/raulk/clock v1.1.0
github.com/rjeczalik/notify v0.9.2 // indirect
github.com/rs/cors v1.7.0 // indirect
@ -71,4 +72,5 @@ require (
gopkg.in/olebedev/go-duktape.v3 v3.0.0-20200619000410-60c24ae608a6 // indirect
gopkg.in/urfave/cli.v1 v1.20.0 // indirect
honnef.co/go/tools v0.0.1-2020.1.3 // indirect
nhooyr.io/websocket v1.8.6 // indirect
)

41
go.sum
View File

@ -208,10 +208,17 @@ github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/fxamacker/cbor v1.5.1 h1:XjQWBgdmQyqimslUh5r4tUGmoqzHmBFQOImkWGi2awg=
github.com/fxamacker/cbor/v2 v2.2.0 h1:6eXqdDDe588rSYAi1HfZKbx6YYQO4mxQ9eC6xYpU/JQ=
github.com/fxamacker/cbor/v2 v2.2.0/go.mod h1:TA1xS00nchWmaBnEIxPSE5oHLuJBAVvqrtAnWBwBCVo=
github.com/garyburd/redigo v1.6.0/go.mod h1:NR3MbYisc3/PwhQ00EMzDiPmrwpPxAn5GI05/YaO1SY=
github.com/gballet/go-libpcsclite v0.0.0-20191108122812-4678299bea08 h1:f6D9Hr8xV8uYKlyuj8XIruxlh9WjVjdh1gIicAS7ays=
github.com/gballet/go-libpcsclite v0.0.0-20191108122812-4678299bea08/go.mod h1:x7DCsMOv1taUwEWCzT4cmDeAkigA5/QCwUodaVOe8Ww=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE=
github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI=
github.com/gin-gonic/gin v1.6.3 h1:ahKqKTFpO5KTPHxWZjEdPScmYaGtLo8Y4DMHoEsnp14=
github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M=
github.com/go-check/check v0.0.0-20180628173108-788fd7840127/go.mod h1:9ES+weclKsC9YodN5RgxqK/VD9HM9JsCSh7rNhMZE98=
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
@ -224,11 +231,26 @@ github.com/go-logfmt/logfmt v0.5.0 h1:TrB8swr/68K7m9CcGut2g3UOihhbcbiMAYiuTXdEih
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-ozzo/ozzo-validation v3.6.0+incompatible h1:msy24VGS42fKO9K1vLz82/GeYW1cILu7Nuuj1N3BBkE=
github.com/go-ozzo/ozzo-validation v3.6.0+incompatible/go.mod h1:gsEKFIVnabGBt6mXmxK0MoFy+cZoTJY6mu5Ll3LVLBU=
github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A=
github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q=
github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8=
github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD876Lmtgy7VtROAbHHXk8no=
github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA=
github.com/go-playground/validator/v10 v10.2.0 h1:KgJ0snyC2R9VXYN2rneOtQcw5aHQB1Vv0sFl1UcHBOY=
github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GOhaH6EGOAJShg8Id5JGkI=
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs=
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee h1:s+21KNqlpePfkah2I+gwHF8xmJWRjooY+5248k6m4A0=
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo=
github.com/gobwas/pool v0.2.0 h1:QEmUOlnSjWtnpRGHF3SauEiOsy82Cup83Vf2LcMlnc8=
github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM=
github.com/gobwas/ws v1.0.4 h1:5eXU1CZhpQdq5kXbKb+sECH5Ia5KiO6CYzIzdlVx6Bs=
github.com/gobwas/ws v1.0.4/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM=
github.com/godbus/dbus v0.0.0-20190402143921-271e53dc4968/go.mod h1:/YcGZj5zSblfDWMMoOzV4fas9FZnQYTkDnsGvmh2Grw=
github.com/gogo/googleapis v0.0.0-20180223154316-0cd9801be74a/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s=
github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s=
@ -258,6 +280,7 @@ github.com/golang/protobuf v1.3.0/go.mod h1:Qd/q+1AKNOZr9uGQzbzCmRO6sUih6GTPZv6a
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
@ -286,8 +309,6 @@ github.com/google/gopacket v1.1.17 h1:rMrlX2ZY2UbvT+sdz3+6J+pp2z+msCq9MxTU6ymxbB
github.com/google/gopacket v1.1.17/go.mod h1:UdDNZ1OO62aGYVnPhxT1U6aI7ukYtA/kB8vaU0diBUM=
github.com/google/gopacket v1.1.18 h1:lum7VRA9kdlvBi7/v2p7/zcbkduHaCH/SVVyurs7OpY=
github.com/google/gopacket v1.1.18/go.mod h1:UdDNZ1OO62aGYVnPhxT1U6aI7ukYtA/kB8vaU0diBUM=
github.com/google/logger v1.1.0 h1:saB74Etb4EAJNH3z74CVbCKk75hld/8T0CsXKetWCwM=
github.com/google/logger v1.1.0/go.mod h1:w7O8nrRr0xufejBlQMI83MXqRusvREoJdaAxV+CoAB4=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
@ -456,6 +477,7 @@ github.com/jsimonetti/rtnetlink v0.0.0-20200117123717-f846d4f6c1f4/go.mod h1:WGu
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.8/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.9 h1:9yzud/Ht36ygwatGx56VwCZtlI/2AD15T1X2sjSuGns=
github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jtolds/gls v4.2.1+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
@ -475,6 +497,7 @@ github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvW
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4=
github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.10.4/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.10.7 h1:7rix8v8GpI3ZBb0nSozFRgbtXKv+hOe+qfEpZqybrAg=
github.com/klauspost/compress v1.10.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
@ -493,6 +516,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y=
github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII=
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.7.0 h1:h93mCPfUSkaul3Ka/VG8uZdmW1uMHDGxzu0NWHuJmHY=
github.com/lib/pq v1.7.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
@ -746,8 +771,10 @@ github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh
github.com/mitchellh/mapstructure v1.3.3 h1:SzB1nHZ2Xi+17FP0zVQBHIZqvwRN9408fJO8h+eeNA8=
github.com/mitchellh/mapstructure v1.3.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/mr-tron/base58 v1.1.0/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8=
github.com/mr-tron/base58 v1.1.1/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8=
@ -1025,7 +1052,11 @@ github.com/uber/jaeger-client-go v2.23.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMW
github.com/uber/jaeger-lib v1.5.1-0.20181102163054-1fc5c315e03c/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw=
github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
github.com/urfave/cli v1.22.1 h1:+mkCCcOFKPnCmVYVcURKps1Xe+3zP90gSYGNfRkjoIY=
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
@ -1063,6 +1094,8 @@ github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee/go.mod h1:
github.com/wsddn/go-ecdh v0.0.0-20161211032359-48726bab9208 h1:1cngl9mPEoITZG8s8cVcUy5CeIBYhEESkOB7m6Gmkrk=
github.com/wsddn/go-ecdh v0.0.0-20161211032359-48726bab9208/go.mod h1:IotVbo4F+mw0EzQ08zFqg7pK3FebNXpaMsRy2RT+Ees=
github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE=
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
@ -1206,8 +1239,6 @@ golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/
golang.org/x/net v0.0.0-20200602114024-627f9648deb9 h1:pNX+40auqi2JqRfOP1akLGtYcn15TUbkhwuCO3foqqM=
golang.org/x/net v0.0.0-20200602114024-627f9648deb9/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20200707034311-ab3426394381 h1:VXak5I6aEWmAXeQjA+QSZzlgNrpq9mjcfDemuexIKsU=
golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201016165138-7b1cca2348c0 h1:5kGOVHlq0euqwzgTC9Vu15p6fV1Wi0ArVi8da2urnVg=
golang.org/x/net v0.0.0-20201016165138-7b1cca2348c0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
@ -1441,6 +1472,8 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3 h1:sXmLre5bzIR6ypkjXCDI3jHPssRhc8KD/Ome589sc3U=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
nhooyr.io/websocket v1.8.6 h1:s+C3xAMLwGmlI31Nyn/eAehUlZPwfYZu2JXM621Q5/k=
nhooyr.io/websocket v1.8.6/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0/go.mod h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU=

View File

@ -26,7 +26,7 @@ import (
"github.com/Secured-Finance/dione/config"
"github.com/Secured-Finance/dione/consensus"
"github.com/Secured-Finance/dione/ethclient"
"github.com/Secured-Finance/dione/pb"
"github.com/Secured-Finance/dione/pubsub"
"github.com/libp2p/go-libp2p"
crypto "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/host"
@ -40,7 +40,7 @@ const (
type Node struct {
Host host.Host
PubSubRouter *pb.PubSubRouter
PubSubRouter *pubsub.PubSubRouter
GlobalCtx context.Context
GlobalCtxCancel context.CancelFunc
OracleTopic string
@ -153,7 +153,7 @@ func (n *Node) setupSolanaClient() {
}
func (n *Node) setupPubsub() {
n.PubSubRouter = pb.NewPubSubRouter(n.Host, n.OracleTopic)
n.PubSubRouter = pubsub.NewPubSubRouter(n.Host, n.OracleTopic)
// wait for setting up pubsub
//time.Sleep(3 * time.Second)
}

View File

@ -1,5 +0,0 @@
package pb
import "github.com/Secured-Finance/dione/models"
type Handler func(message *models.Message)

7
pubsub/handler.go Normal file
View File

@ -0,0 +1,7 @@
package pubsub
import (
"github.com/Secured-Finance/dione/consensus/types"
)
type Handler func(message *types.Message)

View File

@ -1,10 +1,12 @@
package pb
package pubsub
import (
"context"
"encoding/json"
"github.com/Secured-Finance/dione/models"
"github.com/fxamacker/cbor/v2"
"github.com/Secured-Finance/dione/consensus/types"
host "github.com/libp2p/go-libp2p-core/host"
peer "github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
@ -12,12 +14,13 @@ import (
)
type PubSubRouter struct {
node host.Host
Pubsub *pubsub.PubSub
context context.Context
contextCancel context.CancelFunc
handlers map[models.MessageType][]Handler
oracleTopic string
node host.Host
Pubsub *pubsub.PubSub
context context.Context
contextCancel context.CancelFunc
serviceSubscription *pubsub.Subscription
handlers map[types.MessageType][]Handler
oracleTopic string
}
func NewPubSubRouter(h host.Host, oracleTopic string) *PubSubRouter {
@ -27,7 +30,7 @@ func NewPubSubRouter(h host.Host, oracleTopic string) *PubSubRouter {
node: h,
context: ctx,
contextCancel: ctxCancel,
handlers: make(map[models.MessageType][]Handler),
handlers: make(map[types.MessageType][]Handler),
}
pb, err := pubsub.NewFloodSub(
@ -40,10 +43,13 @@ func NewPubSubRouter(h host.Host, oracleTopic string) *PubSubRouter {
}
psr.oracleTopic = oracleTopic
subscription, err := pb.Subscribe(oracleTopic)
topic, err := pb.Join(oracleTopic)
if err != nil {
logrus.Fatal("Error occurred when subscribing to service topic", err)
}
subscription, err := topic.Subscribe()
psr.serviceSubscription = subscription
psr.Pubsub = pb
go func() {
@ -76,8 +82,8 @@ func (psr *PubSubRouter) handleMessage(p *pubsub.Message) {
if senderPeerID == psr.node.ID() {
return
}
var message models.Message
err = json.Unmarshal(p.Data, &message)
var message types.Message
err = cbor.Unmarshal(p.Data, &message)
if err != nil {
logrus.Warn("Unable to decode message data! " + err.Error())
return
@ -93,7 +99,7 @@ func (psr *PubSubRouter) handleMessage(p *pubsub.Message) {
}
}
func (psr *PubSubRouter) Hook(messageType models.MessageType, handler Handler) {
func (psr *PubSubRouter) Hook(messageType types.MessageType, handler Handler) {
handlers, ok := psr.handlers[messageType]
if !ok {
emptyArray := []Handler{}
@ -103,8 +109,8 @@ func (psr *PubSubRouter) Hook(messageType models.MessageType, handler Handler) {
psr.handlers[messageType] = append(handlers, handler)
}
func (psr *PubSubRouter) BroadcastToServiceTopic(msg *models.Message) error {
data, err := json.Marshal(msg)
func (psr *PubSubRouter) BroadcastToServiceTopic(msg *types.Message) error {
data, err := cbor.Marshal(msg)
if err != nil {
return err
}

View File

@ -1,4 +1,4 @@
package models
package types
import (
"github.com/filecoin-project/go-address"