From 2278277f7659da025bbfb12a688dd47b318d6e15 Mon Sep 17 00:00:00 2001 From: ChronosX88 Date: Fri, 4 Dec 2020 22:30:03 +0400 Subject: [PATCH] Fix cbor marshalling/unmarshalling pubsub message (invalid UTF8 string), partly fix gossipsub (sadly, bootstrap node still doesn't see other messages in network) --- {node => cache}/event_log_cache.go | 9 +++++---- consensus/consensus.go | 10 +++++----- consensus/pre_prepare_pool.go | 20 +++++++++----------- consensus/types/message.go | 11 +++++------ node/node.go | 10 ++++++---- pubsub/pubsub_router.go | 14 +++++++++----- types/task.go | 1 - 7 files changed, 39 insertions(+), 36 deletions(-) rename {node => cache}/event_log_cache.go (70%) diff --git a/node/event_log_cache.go b/cache/event_log_cache.go similarity index 70% rename from node/event_log_cache.go rename to cache/event_log_cache.go index e4bf50d..f83e190 100644 --- a/node/event_log_cache.go +++ b/cache/event_log_cache.go @@ -1,6 +1,7 @@ -package node +package cache import ( + oracleEmitter "github.com/Secured-Finance/dione/contracts/oracleemitter" "github.com/VictoriaMetrics/fastcache" "github.com/fxamacker/cbor/v2" ) @@ -31,11 +32,11 @@ func (elc *EventLogCache) Store(key string, event interface{}) error { return nil } -func (elc *EventLogCache) Get(key string) (interface{}, error) { +func (elc *EventLogCache) GetOracleRequestEvent(key string) (*oracleEmitter.OracleEmitterNewOracleRequest, error) { var mData []byte - elc.cache.GetBig(mData, []byte(key)) + mData = elc.cache.GetBig(mData, []byte(key)) - var event interface{} + var event *oracleEmitter.OracleEmitterNewOracleRequest err := cbor.Unmarshal(mData, &event) if err != nil { return nil, err diff --git a/consensus/consensus.go b/consensus/consensus.go index a86af73..e4b9cb2 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -4,7 +4,7 @@ import ( "math/big" "sync" - "github.com/Secured-Finance/dione/node" + "github.com/Secured-Finance/dione/cache" oracleEmitter "github.com/Secured-Finance/dione/contracts/oracleemitter" @@ -36,7 +36,7 @@ type ConsensusData struct { alreadySubmitted bool } -func NewPBFTConsensusManager(psb *pubsub.PubSubRouter, minApprovals int, privKey []byte, ethereumClient *ethclient.EthereumClient, miner *Miner, evc *node.EventLogCache) *PBFTConsensusManager { +func NewPBFTConsensusManager(psb *pubsub.PubSubRouter, minApprovals int, privKey []byte, ethereumClient *ethclient.EthereumClient, miner *Miner, evc *cache.EventLogCache) *PBFTConsensusManager { pcm := &PBFTConsensusManager{} pcm.psb = psb pcm.miner = miner @@ -60,8 +60,8 @@ func (pcm *PBFTConsensusManager) Propose(consensusID string, task types2.DioneTa consensusID, task, requestEvent.RequestID.String(), - requestEvent.CallbackAddress.Hex(), - string(requestEvent.CallbackMethodID[:]), + requestEvent.CallbackAddress.Bytes(), + requestEvent.CallbackMethodID[:], pcm.privKey, ) if err != nil { @@ -151,7 +151,7 @@ func (pcm *PBFTConsensusManager) handleCommit(message *types.Message) { if !ok { logrus.Errorf("Failed to parse big int: %v", consensusMsg.RequestID) } - callbackAddress := common.HexToAddress(consensusMsg.CallbackAddress) + callbackAddress := common.BytesToAddress(consensusMsg.CallbackAddress) err := pcm.ethereumClient.SubmitRequestAnswer(reqID, string(consensusMsg.Task.Payload), callbackAddress) if err != nil { logrus.Errorf("Failed to submit on-chain result: %w", err) diff --git a/consensus/pre_prepare_pool.go b/consensus/pre_prepare_pool.go index 34f90e8..911d1ed 100644 --- a/consensus/pre_prepare_pool.go +++ b/consensus/pre_prepare_pool.go @@ -1,13 +1,12 @@ package consensus import ( + "bytes" "fmt" + "github.com/Secured-Finance/dione/cache" + "github.com/Secured-Finance/dione/consensus/validation" - oracleEmitter "github.com/Secured-Finance/dione/contracts/oracleemitter" - - "github.com/Secured-Finance/dione/node" - "github.com/filecoin-project/go-state-types/crypto" types2 "github.com/Secured-Finance/dione/consensus/types" @@ -21,10 +20,10 @@ import ( type PrePreparePool struct { prePrepareMsgs map[string][]*types2.Message miner *Miner - eventLogCache *node.EventLogCache + eventLogCache *cache.EventLogCache } -func NewPrePreparePool(miner *Miner, evc *node.EventLogCache) *PrePreparePool { +func NewPrePreparePool(miner *Miner, evc *cache.EventLogCache) *PrePreparePool { return &PrePreparePool{ prePrepareMsgs: map[string][]*types2.Message{}, miner: miner, @@ -32,7 +31,7 @@ func NewPrePreparePool(miner *Miner, evc *node.EventLogCache) *PrePreparePool { } } -func (pp *PrePreparePool) CreatePrePrepare(consensusID string, task types.DioneTask, requestID, callbackAddress, callbackMethodID string, privateKey []byte) (*types2.Message, error) { +func (pp *PrePreparePool) CreatePrePrepare(consensusID string, task types.DioneTask, requestID string, callbackAddress, callbackMethodID, privateKey []byte) (*types2.Message, error) { var message types2.Message message.Type = types2.MessageTypePrePrepare var consensusMsg types2.ConsensusMessage @@ -78,14 +77,13 @@ func (ppp *PrePreparePool) IsValidPrePrepare(prePrepare *types2.Message) bool { ///////////////////////////////// // === verify if request exists in event log cache === - requestEventPlain, err := ppp.eventLogCache.Get("request_" + consensusMsg.RequestID) + requestEvent, err := ppp.eventLogCache.GetOracleRequestEvent("request_" + consensusMsg.RequestID) if err != nil { logrus.Errorf("the incoming request task event doesn't exist in the EVC, or is broken: %v", err) return false } - requestEvent := requestEventPlain.(*oracleEmitter.OracleEmitterNewOracleRequest) - if requestEvent.CallbackAddress.String() != consensusMsg.CallbackAddress || - string(requestEvent.CallbackMethodID[:]) != consensusMsg.CallbackMethodID || + if bytes.Compare(requestEvent.CallbackAddress.Bytes(), consensusMsg.CallbackAddress) != 0 || + bytes.Compare(requestEvent.CallbackMethodID[:], consensusMsg.CallbackMethodID) != 0 || requestEvent.OriginChain != consensusMsg.Task.OriginChain || requestEvent.RequestType != consensusMsg.Task.RequestType || requestEvent.RequestParams != consensusMsg.Task.RequestParams { diff --git a/consensus/types/message.go b/consensus/types/message.go index 23e1691..7b6284a 100644 --- a/consensus/types/message.go +++ b/consensus/types/message.go @@ -16,17 +16,16 @@ const ( ) type ConsensusMessage struct { - _ struct{} `cbor:",toarray" hash:"-"` ConsensusID string Signature []byte `hash:"-"` RequestID string - CallbackAddress string - CallbackMethodID string + CallbackAddress []byte + CallbackMethodID []byte Task types.DioneTask } type Message struct { - Type MessageType `cbor:"1,keyasint"` - Payload ConsensusMessage `cbor:"2,keyasint"` - From peer.ID `cbor:"-"` + Type MessageType + Payload ConsensusMessage + From peer.ID `cbor:"-"` } diff --git a/node/node.go b/node/node.go index cdf7a20..0eb3e86 100644 --- a/node/node.go +++ b/node/node.go @@ -9,6 +9,8 @@ import ( "os" "time" + "github.com/Secured-Finance/dione/cache" + pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/Secured-Finance/dione/drand" @@ -62,7 +64,7 @@ type Node struct { Miner *consensus.Miner Beacon beacon.BeaconNetworks Wallet *wallet.LocalWallet - EventLogCache *EventLogCache + EventLogCache *cache.EventLogCache } func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTime time.Duration) (*Node, error) { @@ -219,8 +221,8 @@ func (n *Node) subscribeOnEthContractsAsync(ctx context.Context) { }() } -func provideEventLogCache() *EventLogCache { - return NewEventLogCache() +func provideEventLogCache() *cache.EventLogCache { + return cache.NewEventLogCache() } func provideMiner(peerID peer.ID, ethAddress common.Address, beacon beacon.BeaconNetworks, ethClient *ethclient.EthereumClient, privateKey []byte) *consensus.Miner { @@ -287,7 +289,7 @@ func providePubsubRouter(lhost host.Host, config *config.Config) *pubsub2.PubSub return pubsub2.NewPubSubRouter(lhost, config.PubSub.ServiceTopicName) } -func provideConsensusManager(psb *pubsub2.PubSubRouter, miner *consensus.Miner, ethClient *ethclient.EthereumClient, privateKey []byte, minApprovals int, evc *EventLogCache) *consensus.PBFTConsensusManager { +func provideConsensusManager(psb *pubsub2.PubSubRouter, miner *consensus.Miner, ethClient *ethclient.EthereumClient, privateKey []byte, minApprovals int, evc *cache.EventLogCache) *consensus.PBFTConsensusManager { return consensus.NewPBFTConsensusManager(psb, minApprovals, privateKey, ethClient, miner, evc) } diff --git a/pubsub/pubsub_router.go b/pubsub/pubsub_router.go index 90790aa..103da37 100644 --- a/pubsub/pubsub_router.go +++ b/pubsub/pubsub_router.go @@ -20,7 +20,8 @@ type PubSubRouter struct { contextCancel context.CancelFunc serviceSubscription *pubsub.Subscription handlers map[types.MessageType][]Handler - oracleTopic string + oracleTopicName string + oracleTopic *pubsub.Topic } func NewPubSubRouter(h host.Host, oracleTopic string) *PubSubRouter { @@ -33,16 +34,18 @@ func NewPubSubRouter(h host.Host, oracleTopic string) *PubSubRouter { handlers: make(map[types.MessageType][]Handler), } - pb, err := pubsub.NewFloodSub( + pb, err := pubsub.NewGossipSub( context.TODO(), - psr.node, //pubsub.WithMessageSigning(true), + psr.node, + //pubsub.WithMessageSigning(true), //pubsub.WithStrictSignatureVerification(true), + pubsub.WithPeerExchange(true), ) if err != nil { logrus.Fatal("Error occurred when create PubSub", err) } - psr.oracleTopic = oracleTopic + psr.oracleTopicName = oracleTopic topic, err := pb.Join(oracleTopic) if err != nil { logrus.Fatal("Error occurred when subscribing to service topic", err) @@ -51,6 +54,7 @@ func NewPubSubRouter(h host.Host, oracleTopic string) *PubSubRouter { subscription, err := topic.Subscribe() psr.serviceSubscription = subscription psr.Pubsub = pb + psr.oracleTopic = topic go func() { for { @@ -112,7 +116,7 @@ func (psr *PubSubRouter) BroadcastToServiceTopic(msg *types.Message) error { if err != nil { return err } - err = psr.Pubsub.Publish(psr.oracleTopic, data) + err = psr.oracleTopic.Publish(context.TODO(), data) return err } diff --git a/types/task.go b/types/task.go index 6d3dc7a..3db7f56 100644 --- a/types/task.go +++ b/types/task.go @@ -19,7 +19,6 @@ func (e DrandRound) String() string { // DioneTask represents the values of task computation type DioneTask struct { - _ struct{} `cbor:",toarray" hash:"-"` OriginChain uint8 RequestType string RequestParams string