Fix cbor marshalling/unmarshalling pubsub message (invalid UTF8 string), partly fix gossipsub (sadly, bootstrap node still doesn't see other messages in network)

This commit is contained in:
ChronosX88 2020-12-04 22:30:03 +04:00
parent e3ce7f4175
commit 2278277f76
Signed by: ChronosXYZ
GPG Key ID: 085A69A82C8C511A
7 changed files with 39 additions and 36 deletions

View File

@ -1,6 +1,7 @@
package node package cache
import ( import (
oracleEmitter "github.com/Secured-Finance/dione/contracts/oracleemitter"
"github.com/VictoriaMetrics/fastcache" "github.com/VictoriaMetrics/fastcache"
"github.com/fxamacker/cbor/v2" "github.com/fxamacker/cbor/v2"
) )
@ -31,11 +32,11 @@ func (elc *EventLogCache) Store(key string, event interface{}) error {
return nil return nil
} }
func (elc *EventLogCache) Get(key string) (interface{}, error) { func (elc *EventLogCache) GetOracleRequestEvent(key string) (*oracleEmitter.OracleEmitterNewOracleRequest, error) {
var mData []byte var mData []byte
elc.cache.GetBig(mData, []byte(key)) mData = elc.cache.GetBig(mData, []byte(key))
var event interface{} var event *oracleEmitter.OracleEmitterNewOracleRequest
err := cbor.Unmarshal(mData, &event) err := cbor.Unmarshal(mData, &event)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -4,7 +4,7 @@ import (
"math/big" "math/big"
"sync" "sync"
"github.com/Secured-Finance/dione/node" "github.com/Secured-Finance/dione/cache"
oracleEmitter "github.com/Secured-Finance/dione/contracts/oracleemitter" oracleEmitter "github.com/Secured-Finance/dione/contracts/oracleemitter"
@ -36,7 +36,7 @@ type ConsensusData struct {
alreadySubmitted bool alreadySubmitted bool
} }
func NewPBFTConsensusManager(psb *pubsub.PubSubRouter, minApprovals int, privKey []byte, ethereumClient *ethclient.EthereumClient, miner *Miner, evc *node.EventLogCache) *PBFTConsensusManager { func NewPBFTConsensusManager(psb *pubsub.PubSubRouter, minApprovals int, privKey []byte, ethereumClient *ethclient.EthereumClient, miner *Miner, evc *cache.EventLogCache) *PBFTConsensusManager {
pcm := &PBFTConsensusManager{} pcm := &PBFTConsensusManager{}
pcm.psb = psb pcm.psb = psb
pcm.miner = miner pcm.miner = miner
@ -60,8 +60,8 @@ func (pcm *PBFTConsensusManager) Propose(consensusID string, task types2.DioneTa
consensusID, consensusID,
task, task,
requestEvent.RequestID.String(), requestEvent.RequestID.String(),
requestEvent.CallbackAddress.Hex(), requestEvent.CallbackAddress.Bytes(),
string(requestEvent.CallbackMethodID[:]), requestEvent.CallbackMethodID[:],
pcm.privKey, pcm.privKey,
) )
if err != nil { if err != nil {
@ -151,7 +151,7 @@ func (pcm *PBFTConsensusManager) handleCommit(message *types.Message) {
if !ok { if !ok {
logrus.Errorf("Failed to parse big int: %v", consensusMsg.RequestID) logrus.Errorf("Failed to parse big int: %v", consensusMsg.RequestID)
} }
callbackAddress := common.HexToAddress(consensusMsg.CallbackAddress) callbackAddress := common.BytesToAddress(consensusMsg.CallbackAddress)
err := pcm.ethereumClient.SubmitRequestAnswer(reqID, string(consensusMsg.Task.Payload), callbackAddress) err := pcm.ethereumClient.SubmitRequestAnswer(reqID, string(consensusMsg.Task.Payload), callbackAddress)
if err != nil { if err != nil {
logrus.Errorf("Failed to submit on-chain result: %w", err) logrus.Errorf("Failed to submit on-chain result: %w", err)

View File

@ -1,13 +1,12 @@
package consensus package consensus
import ( import (
"bytes"
"fmt" "fmt"
"github.com/Secured-Finance/dione/cache"
"github.com/Secured-Finance/dione/consensus/validation" "github.com/Secured-Finance/dione/consensus/validation"
oracleEmitter "github.com/Secured-Finance/dione/contracts/oracleemitter"
"github.com/Secured-Finance/dione/node"
"github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/go-state-types/crypto"
types2 "github.com/Secured-Finance/dione/consensus/types" types2 "github.com/Secured-Finance/dione/consensus/types"
@ -21,10 +20,10 @@ import (
type PrePreparePool struct { type PrePreparePool struct {
prePrepareMsgs map[string][]*types2.Message prePrepareMsgs map[string][]*types2.Message
miner *Miner miner *Miner
eventLogCache *node.EventLogCache eventLogCache *cache.EventLogCache
} }
func NewPrePreparePool(miner *Miner, evc *node.EventLogCache) *PrePreparePool { func NewPrePreparePool(miner *Miner, evc *cache.EventLogCache) *PrePreparePool {
return &PrePreparePool{ return &PrePreparePool{
prePrepareMsgs: map[string][]*types2.Message{}, prePrepareMsgs: map[string][]*types2.Message{},
miner: miner, miner: miner,
@ -32,7 +31,7 @@ func NewPrePreparePool(miner *Miner, evc *node.EventLogCache) *PrePreparePool {
} }
} }
func (pp *PrePreparePool) CreatePrePrepare(consensusID string, task types.DioneTask, requestID, callbackAddress, callbackMethodID string, privateKey []byte) (*types2.Message, error) { func (pp *PrePreparePool) CreatePrePrepare(consensusID string, task types.DioneTask, requestID string, callbackAddress, callbackMethodID, privateKey []byte) (*types2.Message, error) {
var message types2.Message var message types2.Message
message.Type = types2.MessageTypePrePrepare message.Type = types2.MessageTypePrePrepare
var consensusMsg types2.ConsensusMessage var consensusMsg types2.ConsensusMessage
@ -78,14 +77,13 @@ func (ppp *PrePreparePool) IsValidPrePrepare(prePrepare *types2.Message) bool {
///////////////////////////////// /////////////////////////////////
// === verify if request exists in event log cache === // === verify if request exists in event log cache ===
requestEventPlain, err := ppp.eventLogCache.Get("request_" + consensusMsg.RequestID) requestEvent, err := ppp.eventLogCache.GetOracleRequestEvent("request_" + consensusMsg.RequestID)
if err != nil { if err != nil {
logrus.Errorf("the incoming request task event doesn't exist in the EVC, or is broken: %v", err) logrus.Errorf("the incoming request task event doesn't exist in the EVC, or is broken: %v", err)
return false return false
} }
requestEvent := requestEventPlain.(*oracleEmitter.OracleEmitterNewOracleRequest) if bytes.Compare(requestEvent.CallbackAddress.Bytes(), consensusMsg.CallbackAddress) != 0 ||
if requestEvent.CallbackAddress.String() != consensusMsg.CallbackAddress || bytes.Compare(requestEvent.CallbackMethodID[:], consensusMsg.CallbackMethodID) != 0 ||
string(requestEvent.CallbackMethodID[:]) != consensusMsg.CallbackMethodID ||
requestEvent.OriginChain != consensusMsg.Task.OriginChain || requestEvent.OriginChain != consensusMsg.Task.OriginChain ||
requestEvent.RequestType != consensusMsg.Task.RequestType || requestEvent.RequestType != consensusMsg.Task.RequestType ||
requestEvent.RequestParams != consensusMsg.Task.RequestParams { requestEvent.RequestParams != consensusMsg.Task.RequestParams {

View File

@ -16,17 +16,16 @@ const (
) )
type ConsensusMessage struct { type ConsensusMessage struct {
_ struct{} `cbor:",toarray" hash:"-"`
ConsensusID string ConsensusID string
Signature []byte `hash:"-"` Signature []byte `hash:"-"`
RequestID string RequestID string
CallbackAddress string CallbackAddress []byte
CallbackMethodID string CallbackMethodID []byte
Task types.DioneTask Task types.DioneTask
} }
type Message struct { type Message struct {
Type MessageType `cbor:"1,keyasint"` Type MessageType
Payload ConsensusMessage `cbor:"2,keyasint"` Payload ConsensusMessage
From peer.ID `cbor:"-"` From peer.ID `cbor:"-"`
} }

View File

@ -9,6 +9,8 @@ import (
"os" "os"
"time" "time"
"github.com/Secured-Finance/dione/cache"
pubsub "github.com/libp2p/go-libp2p-pubsub" pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/Secured-Finance/dione/drand" "github.com/Secured-Finance/dione/drand"
@ -62,7 +64,7 @@ type Node struct {
Miner *consensus.Miner Miner *consensus.Miner
Beacon beacon.BeaconNetworks Beacon beacon.BeaconNetworks
Wallet *wallet.LocalWallet Wallet *wallet.LocalWallet
EventLogCache *EventLogCache EventLogCache *cache.EventLogCache
} }
func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTime time.Duration) (*Node, error) { func NewNode(config *config.Config, prvKey crypto.PrivKey, pexDiscoveryUpdateTime time.Duration) (*Node, error) {
@ -219,8 +221,8 @@ func (n *Node) subscribeOnEthContractsAsync(ctx context.Context) {
}() }()
} }
func provideEventLogCache() *EventLogCache { func provideEventLogCache() *cache.EventLogCache {
return NewEventLogCache() return cache.NewEventLogCache()
} }
func provideMiner(peerID peer.ID, ethAddress common.Address, beacon beacon.BeaconNetworks, ethClient *ethclient.EthereumClient, privateKey []byte) *consensus.Miner { func provideMiner(peerID peer.ID, ethAddress common.Address, beacon beacon.BeaconNetworks, ethClient *ethclient.EthereumClient, privateKey []byte) *consensus.Miner {
@ -287,7 +289,7 @@ func providePubsubRouter(lhost host.Host, config *config.Config) *pubsub2.PubSub
return pubsub2.NewPubSubRouter(lhost, config.PubSub.ServiceTopicName) return pubsub2.NewPubSubRouter(lhost, config.PubSub.ServiceTopicName)
} }
func provideConsensusManager(psb *pubsub2.PubSubRouter, miner *consensus.Miner, ethClient *ethclient.EthereumClient, privateKey []byte, minApprovals int, evc *EventLogCache) *consensus.PBFTConsensusManager { func provideConsensusManager(psb *pubsub2.PubSubRouter, miner *consensus.Miner, ethClient *ethclient.EthereumClient, privateKey []byte, minApprovals int, evc *cache.EventLogCache) *consensus.PBFTConsensusManager {
return consensus.NewPBFTConsensusManager(psb, minApprovals, privateKey, ethClient, miner, evc) return consensus.NewPBFTConsensusManager(psb, minApprovals, privateKey, ethClient, miner, evc)
} }

View File

@ -20,7 +20,8 @@ type PubSubRouter struct {
contextCancel context.CancelFunc contextCancel context.CancelFunc
serviceSubscription *pubsub.Subscription serviceSubscription *pubsub.Subscription
handlers map[types.MessageType][]Handler handlers map[types.MessageType][]Handler
oracleTopic string oracleTopicName string
oracleTopic *pubsub.Topic
} }
func NewPubSubRouter(h host.Host, oracleTopic string) *PubSubRouter { func NewPubSubRouter(h host.Host, oracleTopic string) *PubSubRouter {
@ -33,16 +34,18 @@ func NewPubSubRouter(h host.Host, oracleTopic string) *PubSubRouter {
handlers: make(map[types.MessageType][]Handler), handlers: make(map[types.MessageType][]Handler),
} }
pb, err := pubsub.NewFloodSub( pb, err := pubsub.NewGossipSub(
context.TODO(), context.TODO(),
psr.node, //pubsub.WithMessageSigning(true), psr.node,
//pubsub.WithMessageSigning(true),
//pubsub.WithStrictSignatureVerification(true), //pubsub.WithStrictSignatureVerification(true),
pubsub.WithPeerExchange(true),
) )
if err != nil { if err != nil {
logrus.Fatal("Error occurred when create PubSub", err) logrus.Fatal("Error occurred when create PubSub", err)
} }
psr.oracleTopic = oracleTopic psr.oracleTopicName = oracleTopic
topic, err := pb.Join(oracleTopic) topic, err := pb.Join(oracleTopic)
if err != nil { if err != nil {
logrus.Fatal("Error occurred when subscribing to service topic", err) logrus.Fatal("Error occurred when subscribing to service topic", err)
@ -51,6 +54,7 @@ func NewPubSubRouter(h host.Host, oracleTopic string) *PubSubRouter {
subscription, err := topic.Subscribe() subscription, err := topic.Subscribe()
psr.serviceSubscription = subscription psr.serviceSubscription = subscription
psr.Pubsub = pb psr.Pubsub = pb
psr.oracleTopic = topic
go func() { go func() {
for { for {
@ -112,7 +116,7 @@ func (psr *PubSubRouter) BroadcastToServiceTopic(msg *types.Message) error {
if err != nil { if err != nil {
return err return err
} }
err = psr.Pubsub.Publish(psr.oracleTopic, data) err = psr.oracleTopic.Publish(context.TODO(), data)
return err return err
} }

View File

@ -19,7 +19,6 @@ func (e DrandRound) String() string {
// DioneTask represents the values of task computation // DioneTask represents the values of task computation
type DioneTask struct { type DioneTask struct {
_ struct{} `cbor:",toarray" hash:"-"`
OriginChain uint8 OriginChain uint8
RequestType string RequestType string
RequestParams string RequestParams string