From 6636a279e7d2010aee10f040a44e08c0a457234a Mon Sep 17 00:00:00 2001 From: ChronosX88 Date: Wed, 4 Nov 2020 22:17:01 +0400 Subject: [PATCH] Refactor node unit test code, make it passable --- config/config.go | 4 +- consensus/consensus.go | 9 +++-- node/node.go | 10 ++--- node/node_test.go | 89 +++++++++++++++++++++++++++++------------- 4 files changed, 75 insertions(+), 37 deletions(-) diff --git a/config/config.go b/config/config.go index 15e5384..9d18856 100644 --- a/config/config.go +++ b/config/config.go @@ -7,7 +7,7 @@ import ( ) type Config struct { - ListenPort string `mapstructure:"listen_port"` + ListenPort int `mapstructure:"listen_port"` ListenAddr string `mapstructure:"listen_addr"` BootstrapNodes []string `mapstructure:"bootstrap_node_multiaddr"` Rendezvous string `mapstructure:"rendezvous"` @@ -47,7 +47,7 @@ func NewConfig(configPath string) (*Config, error) { cfg := &Config{ ListenAddr: "localhost", - ListenPort: ":8000", + ListenPort: 8000, BootstrapNodes: []string{"/ip4/127.0.0.1/tcp/0"}, Rendezvous: "filecoin-p2p-oracle", Ethereum: EthereumConfig{ diff --git a/consensus/consensus.go b/consensus/consensus.go index 0236026..959ff46 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -31,6 +31,7 @@ type ConsensusData struct { mutex sync.Mutex result string test bool + onConsensusFinishCallback func(finalData string) } func NewPBFTConsensusManager(psb *pb.PubSubRouter, maxFaultNodes int) *PBFTConsensusManager { @@ -42,10 +43,11 @@ func NewPBFTConsensusManager(psb *pb.PubSubRouter, maxFaultNodes int) *PBFTConse return pcm } -func (pcm *PBFTConsensusManager) NewTestConsensus(data string, consensusID string) { +func (pcm *PBFTConsensusManager) NewTestConsensus(data string, consensusID string, onConsensusFinishCallback func(finalData string)) { //consensusID := uuid.New().String() cData := &ConsensusData{} cData.test = true + cData.onConsensusFinishCallback = onConsensusFinishCallback pcm.Consensuses[consensusID] = cData msg := models.Message{} @@ -109,6 +111,8 @@ func (pcm *PBFTConsensusManager) handleCommitMessage(message *models.Message) { } data := pcm.Consensuses[consensusID] + data.mutex.Lock() + defer data.mutex.Unlock() if data.State == consensusCommitted { logrus.Debug("consensus already finished, dropping COMMIT message") return @@ -116,13 +120,12 @@ func (pcm *PBFTConsensusManager) handleCommitMessage(message *models.Message) { logrus.Debug("received commit msg") - data.mutex.Lock() data.commitCount++ - data.mutex.Unlock() if data.commitCount > 2*pcm.maxFaultNodes+1 { data.State = consensusCommitted data.result = message.Payload["data"].(string) logrus.Debug("consensus successfully finished with result: " + data.result) + data.onConsensusFinishCallback(data.result) } } diff --git a/node/node.go b/node/node.go index 0fbeea8..7a033fa 100644 --- a/node/node.go +++ b/node/node.go @@ -49,12 +49,12 @@ func NewNode(configPath string) (*Node, error) { return node, nil } -func (n *Node) setupNode(ctx context.Context, prvKey crypto.PrivKey, pexDiscoveryUpdateTime time.Duration, maxFaultNodes int) { +func (n *Node) setupNode(ctx context.Context, prvKey crypto.PrivKey, pexDiscoveryUpdateTime time.Duration) { n.setupLibp2pHost(context.TODO(), prvKey, pexDiscoveryUpdateTime) //n.setupEthereumClient() //n.setupFilecoinClient() n.setupPubsub() - n.setupConsensusManager(maxFaultNodes) + n.setupConsensusManager(n.Config.ConsensusMaxFaultNodes) } func (n *Node) setupEthereumClient() error { @@ -84,7 +84,7 @@ func (n *Node) setupConsensusManager(maxFaultNodes int) { } func (n *Node) setupLibp2pHost(ctx context.Context, privateKey crypto.PrivKey, pexDiscoveryUpdateTime time.Duration) { - listenMultiAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%s", n.Config.ListenAddr, n.Config.ListenPort)) + listenMultiAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", n.Config.ListenAddr, n.Config.ListenPort)) if err != nil { logrus.Fatal("Failed to generate new node multiaddress:", err) } @@ -98,7 +98,7 @@ func (n *Node) setupLibp2pHost(ctx context.Context, privateKey crypto.PrivKey, p } n.Host = host - logrus.Info(fmt.Sprintf("[*] Your Multiaddress Is: /ip4/%s/tcp/%v/p2p/%s", n.Config.ListenAddr, n.Config.ListenPort, host.ID().Pretty())) + logrus.Info(fmt.Sprintf("[*] Your Multiaddress Is: /ip4/%s/tcp/%d/p2p/%s", n.Config.ListenAddr, n.Config.ListenPort, host.ID().Pretty())) var bootstrapMaddrs []multiaddr.Multiaddr for _, a := range n.Config.BootstrapNodes { @@ -178,7 +178,7 @@ func Start() error { node.GlobalCtx = ctx node.GlobalCtxCancel = ctxCancel - node.setupNode(ctx, privKey, DefaultPEXUpdateTime, node.Config.ConsensusMaxFaultNodes) + node.setupNode(ctx, privKey, DefaultPEXUpdateTime) for { select { case <-ctx.Done(): diff --git a/node/node_test.go b/node/node_test.go index 8060d0d..7c66347 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -3,6 +3,8 @@ package node import ( "context" "fmt" + "math/rand" + "sync" "testing" "time" @@ -14,44 +16,55 @@ func TestConsensus(t *testing.T) { logrus.SetLevel(logrus.DebugLevel) //log.SetAllLoggers(log.LevelDebug) + boolgen := newBoolgen() + rand.Seed(time.Now().UnixNano()) + port := rand.Intn(100) + 10000 + cfg := &config.Config{ - ListenPort: "1234", + ListenPort: port, ListenAddr: "0.0.0.0", Rendezvous: "dione", PubSub: config.PubSubConfig{ - ProtocolID: "/test/1.0", + ProtocolID: "/dione/1.0", }, + ConsensusMaxFaultNodes: 3, } - //cfg.BootstrapNodes = "/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN" + var nodes []*Node - // setup first node - node1 := newNode(cfg) + bNode := newNode(cfg) + t.Logf("Bootstrap ID: %s", bNode.Host.ID()) + cfg.BootstrapNodes = []string{bNode.Host.Addrs()[0].String() + fmt.Sprintf("/p2p/%s", bNode.Host.ID().String())} + nodes = append(nodes, bNode) - // setup second node - cfg.ListenPort = "1235" - cfg.BootstrapNodes = []string{node1.Host.Addrs()[0].String() + fmt.Sprintf("/p2p/%s", node1.Host.ID().String())} - node2 := newNode(cfg) + maxNodes := 10 - // setup third node - cfg.ListenPort = "1236" - node3 := newNode(cfg) + for i := 1; i <= maxNodes; i++ { + cfg.ListenPort += 1 + node := newNode(cfg) + nodes = append(nodes, node) + } - cfg.ListenPort = "1237" - node4 := newNode(cfg) - cfg.ListenPort = "1238" - node5 := newNode(cfg) - cfg.ListenPort = "1239" - node6 := newNode(cfg) + time.Sleep(5*time.Second) - time.Sleep(10 * time.Second) - go node2.ConsensusManager.NewTestConsensus("test", "123") - go node1.ConsensusManager.NewTestConsensus("test1", "123") - go node3.ConsensusManager.NewTestConsensus("test", "123") - go node4.ConsensusManager.NewTestConsensus("test1", "123") - go node5.ConsensusManager.NewTestConsensus("test", "123") - go node6.ConsensusManager.NewTestConsensus("test2", "123") - select {} + var wg sync.WaitGroup + + wg.Add(len(nodes)) + for _, n := range nodes { + var testData string + if boolgen.Bool() { + testData = "test" + } else { + testData = "test1" + } + n.ConsensusManager.NewTestConsensus(testData, "123", func(finalData string) { + if finalData != "test" { + t.Errorf("Expected final data %s, not %s", "test", finalData) + } + wg.Done() + }) + } + wg.Wait() } func newNode(cfg *config.Config) *Node { @@ -67,6 +80,28 @@ func newNode(cfg *config.Config) *Node { GlobalCtx: ctx, GlobalCtxCancel: ctxCancel, } - node.setupNode(ctx, privKey, 1*time.Second, 3) + node.setupNode(ctx, privKey, 1*time.Second) return node } + +type boolgen struct { + src rand.Source + cache int64 + remaining int +} + +func newBoolgen() *boolgen { + return &boolgen{src: rand.NewSource(time.Now().UnixNano())} +} + +func (b *boolgen) Bool() bool { + if b.remaining == 0 { + b.cache, b.remaining = b.src.Int63(), 63 + } + + result := b.cache&0x01 == 1 + b.cache >>= 1 + b.remaining-- + + return result +}