Rename some consensus subsystem components, clean up init code

This commit is contained in:
ChronosX88 2021-08-24 22:41:40 +03:00
parent c1cb7a72f7
commit b9047797cc
Signed by: ChronosXYZ
GPG Key ID: 085A69A82C8C511A
10 changed files with 138 additions and 201 deletions

View File

@ -6,9 +6,8 @@ import (
"fmt"
"sync"
"github.com/asaskevich/EventBus"
"github.com/Secured-Finance/dione/beacon"
"github.com/asaskevich/EventBus"
"github.com/drand/drand/chain"
"github.com/drand/drand/client"
httpClient "github.com/drand/drand/client/http"
@ -27,10 +26,6 @@ import (
types "github.com/Secured-Finance/dione/types"
)
var log = logrus.WithFields(logrus.Fields{
"subsystem": "drand",
})
type DrandBeacon struct {
DrandClient client.Client
PublicKey kyber.Point
@ -70,13 +65,11 @@ func NewDrandBeacon(ps *pubsub.PubSub, bus EventBus.Bus) (*DrandBeacon, error) {
if ps != nil {
opts = append(opts, libp2pClient.WithPubsub(ps))
} else {
log.Info("Initiated drand with PubSub")
}
drandClient, err := client.Wrap(clients, opts...)
if err != nil {
return nil, fmt.Errorf("Couldn't create Drand clients")
logrus.Fatal(fmt.Errorf("cannot create drand client: %w", err))
}
db := &DrandBeacon{
@ -86,6 +79,8 @@ func NewDrandBeacon(ps *pubsub.PubSub, bus EventBus.Bus) (*DrandBeacon, error) {
PublicKey: drandChain.PublicKey,
}
logrus.Info("DRAND beacon subsystem has been initialized!")
return db, nil
}
@ -103,7 +98,7 @@ func (db *DrandBeacon) Run(ctx context.Context) error {
func (db *DrandBeacon) getLatestDrandResult() error {
latestDround, err := db.DrandClient.Get(context.TODO(), 0)
if err != nil {
log.Errorf("failed to get latest drand round: %v", err)
logrus.Errorf("failed to get latest drand round: %v", err)
return err
}
db.cacheValue(newBeaconEntryFromDrandResult(latestDround))
@ -138,12 +133,12 @@ func (db *DrandBeacon) Entry(ctx context.Context, round uint64) (types.BeaconEnt
}
start := lib.Clock.Now()
log.Infof("start fetching randomness: round %v", round)
logrus.Infof("start fetching randomness: round %v", round)
resp, err := db.DrandClient.Get(ctx, round)
if err != nil {
return types.BeaconEntry{}, fmt.Errorf("drand failed Get request: %w", err)
}
log.Infof("done fetching randomness: round %v, took %v", round, lib.Clock.Since(start))
logrus.Infof("done fetching randomness: round %v, took %v", round, lib.Clock.Since(start))
return newBeaconEntryFromDrandResult(resp), nil
}
func (db *DrandBeacon) cacheValue(res types.BeaconEntry) {

View File

@ -9,6 +9,8 @@ import (
"strings"
"sync"
"github.com/multiformats/go-multiaddr"
"github.com/fxamacker/cbor/v2"
"github.com/asaskevich/EventBus"
@ -54,8 +56,26 @@ type syncManager struct {
bus EventBus.Bus
}
func NewSyncManager(bus EventBus.Bus, bc *blockchain.BlockChain, mp *pool.Mempool, p2pRPCClient *gorpc.Client, bootstrapPeer peer.ID, psb *pubsub.PubSubRouter) SyncManager {
func NewSyncManager(
bus EventBus.Bus,
bc *blockchain.BlockChain,
mp *pool.Mempool,
p2pRPCClient *gorpc.Client,
bootstrapAddresses []multiaddr.Multiaddr,
psb *pubsub.PubSubRouter,
) SyncManager {
ctx, cancelFunc := context.WithCancel(context.Background())
bootstrapPeer := peer.ID("")
if bootstrapAddresses != nil {
addr, err := peer.AddrInfoFromP2pAddr(bootstrapAddresses[0]) // FIXME
if err != nil {
logrus.Fatal(err)
}
bootstrapPeer = addr.ID
}
sm := &syncManager{
bus: bus,
blockpool: bc,
@ -68,6 +88,8 @@ func NewSyncManager(bus EventBus.Bus, bc *blockchain.BlockChain, mp *pool.Mempoo
psb: psb,
}
logrus.Info("Blockchain sync subsystem has been successfully initialized!")
return sm
}

View File

@ -8,6 +8,8 @@ import (
"sort"
"time"
"github.com/libp2p/go-libp2p-core/host"
drand2 "github.com/Secured-Finance/dione/beacon/drand"
"github.com/libp2p/go-libp2p-core/peer"
@ -37,43 +39,43 @@ var (
ErrNoAcceptedBlocks = errors.New("there is no accepted blocks")
)
type PBFTConsensusManager struct {
type ConsensusHandler struct {
bus EventBus.Bus
psb *pubsub.PubSubRouter
privKey crypto.PrivKey
validator *ConsensusValidator
ethereumClient *ethclient.EthereumClient
miner *blockchain.Miner
consensusRoundPool *ConsensusStatePool
consensus *ConsensusManager
mempool *pool.Mempool
blockchain *blockchain.BlockChain
address peer.ID
stateChangeChannels map[string]map[State][]chan bool
}
func NewPBFTConsensusManager(
func NewConsensusHandler(
bus EventBus.Bus,
psb *pubsub.PubSubRouter,
privKey crypto.PrivKey,
ethereumClient *ethclient.EthereumClient,
miner *blockchain.Miner,
bc *blockchain.BlockChain,
bp *ConsensusStatePool,
bp *ConsensusManager,
db *drand2.DrandBeacon,
mempool *pool.Mempool,
address peer.ID,
) *PBFTConsensusManager {
pcm := &PBFTConsensusManager{
h host.Host,
) *ConsensusHandler {
pcm := &ConsensusHandler{
psb: psb,
miner: miner,
validator: NewConsensusValidator(miner, bc, db),
privKey: privKey,
ethereumClient: ethereumClient,
bus: bus,
consensusRoundPool: bp,
consensus: bp,
mempool: mempool,
blockchain: bc,
address: address,
address: h.ID(),
stateChangeChannels: map[string]map[State][]chan bool{},
}
@ -131,10 +133,12 @@ func NewPBFTConsensusManager(
}
}, true)
logrus.Info("Consensus handler has been initialized!")
return pcm
}
func (pcm *PBFTConsensusManager) propose(blk *types3.Block) error {
func (pcm *ConsensusHandler) propose(blk *types3.Block) error {
cmsg := &types.ConsensusMessage{
Type: StateStatusPrePrepared,
Block: blk,
@ -148,7 +152,7 @@ func (pcm *PBFTConsensusManager) propose(blk *types3.Block) error {
time.Sleep(1 * time.Second) // wait until all nodes will commit previous blocks
if err = pcm.consensusRoundPool.InsertMessageIntoLog(cmsg); err != nil {
if err = pcm.consensus.InsertMessageIntoLog(cmsg); err != nil {
return err
}
@ -160,7 +164,7 @@ func (pcm *PBFTConsensusManager) propose(blk *types3.Block) error {
return nil
}
func (pcm *PBFTConsensusManager) handlePrePrepare(message *pubsub.PubSubMessage) {
func (pcm *ConsensusHandler) handlePrePrepare(message *pubsub.PubSubMessage) {
var prePrepare types.PrePrepareMessage
err := cbor.Unmarshal(message.Payload, &prePrepare)
if err != nil {
@ -184,7 +188,7 @@ func (pcm *PBFTConsensusManager) handlePrePrepare(message *pubsub.PubSubMessage)
return
}
err = pcm.consensusRoundPool.InsertMessageIntoLog(cmsg)
err = pcm.consensus.InsertMessageIntoLog(cmsg)
if err != nil {
logrus.WithField("err", err.Error()).Warn("Failed to add PREPARE message to log")
return
@ -196,7 +200,7 @@ func (pcm *PBFTConsensusManager) handlePrePrepare(message *pubsub.PubSubMessage)
}).Debug("Received PREPREPARE message")
}
func (pcm *PBFTConsensusManager) handlePrepare(message *pubsub.PubSubMessage) {
func (pcm *ConsensusHandler) handlePrepare(message *pubsub.PubSubMessage) {
var prepare types.PrepareMessage
err := cbor.Unmarshal(message.Payload, &prepare)
if err != nil {
@ -216,7 +220,7 @@ func (pcm *PBFTConsensusManager) handlePrepare(message *pubsub.PubSubMessage) {
return
}
err = pcm.consensusRoundPool.InsertMessageIntoLog(cmsg)
err = pcm.consensus.InsertMessageIntoLog(cmsg)
if err != nil {
logrus.WithField("err", err.Error()).Warn("Failed to add PREPARE message to log")
return
@ -228,7 +232,7 @@ func (pcm *PBFTConsensusManager) handlePrepare(message *pubsub.PubSubMessage) {
}).Debug("Received PREPARE message")
}
func (pcm *PBFTConsensusManager) handleCommit(message *pubsub.PubSubMessage) {
func (pcm *ConsensusHandler) handleCommit(message *pubsub.PubSubMessage) {
var commit types.CommitMessage
err := cbor.Unmarshal(message.Payload, &commit)
if err != nil {
@ -248,7 +252,7 @@ func (pcm *PBFTConsensusManager) handleCommit(message *pubsub.PubSubMessage) {
return
}
err = pcm.consensusRoundPool.InsertMessageIntoLog(cmsg)
err = pcm.consensus.InsertMessageIntoLog(cmsg)
if err != nil {
logrus.WithField("err", err.Error()).Warn("Failed to add COMMIT message to log")
return
@ -260,7 +264,7 @@ func (pcm *PBFTConsensusManager) handleCommit(message *pubsub.PubSubMessage) {
}).Debug("Received COMMIT message")
}
func (pcm *PBFTConsensusManager) onNewBeaconEntry(entry types2.BeaconEntry) {
func (pcm *ConsensusHandler) onNewBeaconEntry(entry types2.BeaconEntry) {
block, err := pcm.commitAcceptedBlocks()
height, _ := pcm.blockchain.GetLatestBlockHeight()
if err != nil {
@ -327,7 +331,7 @@ func (pcm *PBFTConsensusManager) onNewBeaconEntry(entry types2.BeaconEntry) {
}
}
func (pcm *PBFTConsensusManager) submitTasksFromBlock(block *types3.Block) {
func (pcm *ConsensusHandler) submitTasksFromBlock(block *types3.Block) {
for _, tx := range block.Data {
var task types2.DioneTask
err := cbor.Unmarshal(tx.Data, &task)
@ -362,8 +366,8 @@ func (pcm *PBFTConsensusManager) submitTasksFromBlock(block *types3.Block) {
}
}
func (pcm *PBFTConsensusManager) commitAcceptedBlocks() (*types3.Block, error) {
blocks := pcm.consensusRoundPool.GetAllBlocksWithCommit()
func (pcm *ConsensusHandler) commitAcceptedBlocks() (*types3.Block, error) {
blocks := pcm.consensus.GetAllBlocksWithCommit()
if blocks == nil {
return nil, ErrNoAcceptedBlocks
}
@ -409,7 +413,7 @@ func (pcm *PBFTConsensusManager) commitAcceptedBlocks() (*types3.Block, error) {
"height": selectedBlock.Header.Height,
"miner": selectedBlock.Header.Proposer.String(),
}).Info("Committed new block")
pcm.consensusRoundPool.Prune()
pcm.consensus.Prune()
for _, v := range selectedBlock.Data {
err := pcm.mempool.DeleteTx(v.Hash)
if err != nil {

View File

@ -5,6 +5,8 @@ import (
"fmt"
"sync"
"github.com/Secured-Finance/dione/config"
types2 "github.com/Secured-Finance/dione/consensus/types"
"github.com/Secured-Finance/dione/blockchain/pool"
@ -26,8 +28,7 @@ const (
StateStatusCommited
)
// ConsensusStatePool is pool for blocks that isn't not validated or committed yet
type ConsensusStatePool struct {
type ConsensusManager struct {
mempool *pool.Mempool
consensusInfoMap map[string]*ConsensusInfo
mapMutex sync.Mutex
@ -35,12 +36,12 @@ type ConsensusStatePool struct {
minApprovals int // FIXME
}
func NewConsensusRoundPool(mp *pool.Mempool, bus EventBus.Bus, minApprovals int) (*ConsensusStatePool, error) {
bp := &ConsensusStatePool{
func NewConsensusManager(mp *pool.Mempool, bus EventBus.Bus, cfg *config.Config) (*ConsensusManager, error) {
bp := &ConsensusManager{
consensusInfoMap: map[string]*ConsensusInfo{},
mempool: mp,
bus: bus,
minApprovals: minApprovals,
minApprovals: cfg.ConsensusMinApprovals,
}
return bp, nil
@ -53,7 +54,7 @@ type ConsensusInfo struct {
MessageLog *ConsensusMessageLog
}
func (crp *ConsensusStatePool) InsertMessageIntoLog(cmsg *types2.ConsensusMessage) error {
func (crp *ConsensusManager) InsertMessageIntoLog(cmsg *types2.ConsensusMessage) error {
crp.mapMutex.Lock()
defer crp.mapMutex.Unlock()
consensusInfo, ok := crp.consensusInfoMap[hex.EncodeToString(cmsg.Blockhash)]
@ -77,7 +78,7 @@ func (crp *ConsensusStatePool) InsertMessageIntoLog(cmsg *types2.ConsensusMessag
return nil
}
func (crp *ConsensusStatePool) maybeUpdateConsensusState(ci *ConsensusInfo, cmsg *types2.ConsensusMessage) {
func (crp *ConsensusManager) maybeUpdateConsensusState(ci *ConsensusInfo, cmsg *types2.ConsensusMessage) {
if ci.State == StateStatusUnknown && cmsg.Type == types2.ConsensusMessageTypePrePrepare && cmsg.Block != nil {
ci.Block = cmsg.Block
logrus.WithField("hash", fmt.Sprintf("%x", cmsg.Block.Header.Hash)).Debug("New block discovered")
@ -97,14 +98,14 @@ func (crp *ConsensusStatePool) maybeUpdateConsensusState(ci *ConsensusInfo, cmsg
}
// Prune cleans known blocks list. It is called when new consensus round starts.
func (crp *ConsensusStatePool) Prune() {
func (crp *ConsensusManager) Prune() {
for k := range crp.consensusInfoMap {
delete(crp.consensusInfoMap, k)
}
crp.bus.Publish("blockpool:pruned")
}
func (crp *ConsensusStatePool) GetAllBlocksWithCommit() []*ConsensusInfo {
func (crp *ConsensusManager) GetAllBlocksWithCommit() []*ConsensusInfo {
crp.mapMutex.Lock()
defer crp.mapMutex.Unlock()
var consensusInfos []*ConsensusInfo

View File

@ -35,7 +35,6 @@ type DisputeManager struct {
ctx context.Context
bus EventBus.Bus
ethClient *ethclient.EthereumClient
pcm *PBFTConsensusManager
voteWindow time.Duration
blockchain *blockchain.BlockChain

11
consensus/fx_module.go Normal file
View File

@ -0,0 +1,11 @@
package consensus
import "go.uber.org/fx"
var Module = fx.Options(
fx.Provide(
NewConsensusManager,
NewConsensusHandler,
NewDisputeManager,
),
)

View File

@ -24,10 +24,14 @@ type NetworkService struct {
}
func NewNetworkService(bc *blockchain.BlockChain, mp *pool.Mempool) *NetworkService {
return &NetworkService{
ns := &NetworkService{
blockchain: bc,
mempool: mp,
}
logrus.Info("Direct RPC has been successfully initialized!")
return ns
}
func (s *NetworkService) LastBlockHeight(ctx context.Context, arg struct{}, reply *wire.LastBlockHeightReply) error {

View File

@ -4,6 +4,8 @@ import (
"context"
"time"
"github.com/asaskevich/EventBus"
"github.com/Secured-Finance/dione/blockchain"
drand2 "github.com/Secured-Finance/dione/beacon/drand"
@ -48,7 +50,7 @@ func runNode(
h host.Host,
mp *pool.Mempool,
syncManager sync.SyncManager,
consensusManager *consensus.PBFTConsensusManager,
consensusManager *consensus.ConsensusHandler,
pubSubRouter *pubsub.PubSubRouter,
disputeManager *consensus.DisputeManager,
db *drand2.DrandBeacon,
@ -188,28 +190,28 @@ func subscribeOnEthContractsAsync(ctx context.Context, ethClient *ethclient.Ethe
func Start() {
fx.New(
fx.Provide(
provideEventBus,
provideAppFlags,
provideConfig,
provideCacheManager,
providePrivateKey,
provideLibp2pHost,
provideEthereumClient,
providePubsub,
providePubsubRouter,
provideBootstrapAddrs,
providePeerDiscovery,
provideDrandBeacon,
provideMempool,
drand2.NewDrandBeacon,
pool.NewMempool,
blockchain.NewMiner,
provideBlockChain,
provideBlockPool,
provideSyncManager,
sync.NewSyncManager,
provideNetworkRPCHost,
provideNetworkService,
NewNetworkService,
provideDirectRPCClient,
provideConsensusManager,
consensus.NewDisputeManager,
provideCacheManager,
func() EventBus.Bus { return EventBus.New() },
),
consensus.Module,
fx.Invoke(
configureLogger,
configureDirectRPC,

View File

@ -12,6 +12,7 @@ import (
"runtime"
"github.com/Secured-Finance/dione/cache/inmemory"
"github.com/Secured-Finance/dione/cache/redis"
types2 "github.com/Secured-Finance/dione/blockchain/types"
@ -33,25 +34,17 @@ import (
gorpc "github.com/libp2p/go-libp2p-gorpc"
"github.com/Secured-Finance/dione/blockchain/sync"
"github.com/Secured-Finance/dione/blockchain/pool"
"github.com/Secured-Finance/dione/cache"
"github.com/Secured-Finance/dione/config"
"github.com/Secured-Finance/dione/consensus"
"github.com/Secured-Finance/dione/ethclient"
"github.com/Secured-Finance/dione/pubsub"
"github.com/Secured-Finance/dione/types"
"github.com/Secured-Finance/dione/wallet"
pex "github.com/Secured-Finance/go-libp2p-pex"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/discovery"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
pubsub2 "github.com/libp2p/go-libp2p-pubsub"
"github.com/multiformats/go-multiaddr"
"golang.org/x/xerrors"
)
const (
@ -71,31 +64,22 @@ func provideCacheManager(cfg *config.Config) cache.CacheManager {
return backend
}
func provideDrandBeacon(ps *pubsub.PubSubRouter, bus EventBus.Bus) *drand2.DrandBeacon {
db, err := drand2.NewDrandBeacon(ps.Pubsub, bus)
if err != nil {
logrus.Fatalf("Failed to setup drand beacon: %s", err)
}
logrus.Info("DRAND beacon subsystem has been initialized!")
return db
}
// FIXME: do we really need this?
func provideWallet(peerID peer.ID, privKey []byte) (*wallet.LocalWallet, error) {
// TODO make persistent keystore
kstore := wallet.NewMemKeyStore()
keyInfo := types.KeyInfo{
Type: types.KTEd25519,
PrivateKey: privKey,
}
kstore.Put(wallet.KNamePrefix+peerID.String(), keyInfo)
w, err := wallet.NewWallet(kstore)
if err != nil {
return nil, xerrors.Errorf("failed to setup wallet: %w", err)
}
return w, nil
}
//func provideWallet(peerID peer.ID, privKey []byte) (*wallet.LocalWallet, error) {
// // TODO make persistent keystore
// kstore := wallet.NewMemKeyStore()
// keyInfo := types.KeyInfo{
// Type: types.KTEd25519,
// PrivateKey: privKey,
// }
//
// kstore.Put(wallet.KNamePrefix+peerID.String(), keyInfo)
// w, err := wallet.NewWallet(kstore)
// if err != nil {
// return nil, xerrors.Errorf("failed to setup wallet: %w", err)
// }
// return w, nil
//}
func provideEthereumClient(config *config.Config) *ethclient.EthereumClient {
ethereum := ethclient.NewEthereumClient()
@ -109,38 +93,17 @@ func provideEthereumClient(config *config.Config) *ethclient.EthereumClient {
return ethereum
}
func providePubsubRouter(lhost host.Host, config *config.Config) *pubsub.PubSubRouter {
psb := pubsub.NewPubSubRouter(lhost, config.PubSub.ServiceTopicName, config.IsBootstrap)
logrus.Info("PubSub subsystem has been initialized!")
return psb
func providePubsub(h host.Host) (*pubsub2.PubSub, error) {
return pubsub2.NewFloodSub(
context.TODO(),
h,
)
}
func provideConsensusManager(
h host.Host,
bus EventBus.Bus,
psb *pubsub.PubSubRouter,
miner *blockchain.Miner,
bc *blockchain.BlockChain,
ethClient *ethclient.EthereumClient,
privateKey crypto.PrivKey,
bp *consensus.ConsensusStatePool,
db *drand2.DrandBeacon,
mp *pool.Mempool,
) *consensus.PBFTConsensusManager {
c := consensus.NewPBFTConsensusManager(
bus,
psb,
privateKey,
ethClient,
miner,
bc,
bp,
db,
mp,
h.ID(),
)
logrus.Info("Consensus subsystem has been initialized!")
return c
func providePubsubRouter(h host.Host, ps *pubsub2.PubSub, config *config.Config) *pubsub.PubSubRouter {
psb := pubsub.NewPubSubRouter(h, ps, config.PubSub.ServiceTopicName, config.IsBootstrap)
logrus.Info("PubSub subsystem has been initialized!")
return psb
}
func provideLibp2pHost(config *config.Config, privateKey crypto.PrivKey) host.Host {
@ -210,64 +173,10 @@ func provideBlockChain(config *config.Config, bus EventBus.Bus, miner *blockchai
return bc
}
func provideMempool(bus EventBus.Bus) *pool.Mempool {
mp, err := pool.NewMempool(bus)
if err != nil {
logrus.Fatalf("Failed to initialize mempool: %s", err.Error())
}
logrus.Info("Mempool has been successfully initialized!")
return mp
}
func provideSyncManager(
bus EventBus.Bus,
bp *blockchain.BlockChain,
mp *pool.Mempool,
c *gorpc.Client,
bootstrapAddresses []multiaddr.Multiaddr,
psb *pubsub.PubSubRouter,
) sync.SyncManager {
bootstrapPeerID := peer.ID("")
if bootstrapAddresses != nil {
addr, err := peer.AddrInfoFromP2pAddr(bootstrapAddresses[0]) // FIXME
if err != nil {
logrus.Fatal(err)
}
bootstrapPeerID = addr.ID
}
sm := sync.NewSyncManager(bus, bp, mp, c, bootstrapPeerID, psb)
logrus.Info("Blockchain sync subsystem has been successfully initialized!")
return sm
}
func provideDirectRPCClient(h host.Host) *gorpc.Client {
return gorpc.NewClient(h, DioneProtocolID)
}
func provideNetworkService(bp *blockchain.BlockChain, mp *pool.Mempool) *NetworkService {
ns := NewNetworkService(bp, mp)
logrus.Info("Direct RPC has been successfully initialized!")
return ns
}
func provideBlockPool(mp *pool.Mempool, bus EventBus.Bus, config *config.Config) *consensus.ConsensusStatePool {
bp, err := consensus.NewConsensusRoundPool(mp, bus, config.ConsensusMinApprovals)
if err != nil {
logrus.Fatalf("Failed to initialize blockpool: %s", err.Error())
}
logrus.Info("Consensus state pool has been successfully initialized!")
return bp
}
func provideEventBus() EventBus.Bus {
return EventBus.New()
}
func provideAppFlags() *AppFlags {
var flags AppFlags

View File

@ -24,7 +24,7 @@ type PubSubRouter struct {
type Handler func(message *PubSubMessage)
func NewPubSubRouter(h host.Host, oracleTopic string, isBootstrap bool) *PubSubRouter {
func NewPubSubRouter(h host.Host, ps *pubsub.PubSub, oracleTopic string, isBootstrap bool) *PubSubRouter {
ctx, ctxCancel := context.WithCancel(context.Background())
psr := &PubSubRouter{
@ -35,40 +35,30 @@ func NewPubSubRouter(h host.Host, oracleTopic string, isBootstrap bool) *PubSubR
oracleTopicName: oracleTopic,
}
var pbOptions []pubsub.Option
//var pbOptions []pubsub.Option
//
//if isBootstrap {
// // turn off the mesh in bootstrappers -- only do gossip and PX
// pubsub.GossipSubD = 0
// pubsub.GossipSubDscore = 0
// pubsub.GossipSubDlo = 0
// pubsub.GossipSubDhi = 0
// pubsub.GossipSubDout = 0
// pubsub.GossipSubDlazy = 64
// pubsub.GossipSubGossipFactor = 0.25
// pubsub.GossipSubPruneBackoff = 5 * time.Minute
// // turn on PX
// pbOptions = append(pbOptions, pubsub.WithPeerExchange(true))
//}
if isBootstrap {
// turn off the mesh in bootstrappers -- only do gossip and PX
//pubsub.GossipSubD = 0
//pubsub.GossipSubDscore = 0
//pubsub.GossipSubDlo = 0
//pubsub.GossipSubDhi = 0
//pubsub.GossipSubDout = 0
//pubsub.GossipSubDlazy = 64
//pubsub.GossipSubGossipFactor = 0.25
//pubsub.GossipSubPruneBackoff = 5 * time.Minute
// turn on PX
//pbOptions = append(pbOptions, pubsub.WithPeerExchange(true))
}
pb, err := pubsub.NewFloodSub(
context.TODO(),
psr.node,
pbOptions...,
)
if err != nil {
logrus.Fatalf("Error occurred when initializing PubSub subsystem: %v", err)
}
topic, err := pb.Join(oracleTopic)
topic, err := ps.Join(oracleTopic)
if err != nil {
logrus.Fatalf("Error occurred when subscribing to service topic: %v", err)
}
subscription, err := topic.Subscribe()
psr.serviceSubscription = subscription
psr.Pubsub = pb
psr.Pubsub = ps
psr.oracleTopic = topic
return psr