Refactor node unit test code so that the tests pass

This commit is contained in:
ChronosX88 2020-11-04 22:17:01 +04:00
parent 3f252de3f8
commit 6636a279e7
Signed by: ChronosXYZ
GPG Key ID: 085A69A82C8C511A
4 changed files with 75 additions and 37 deletions

View File

@ -7,7 +7,7 @@ import (
) )
type Config struct { type Config struct {
ListenPort string `mapstructure:"listen_port"` ListenPort int `mapstructure:"listen_port"`
ListenAddr string `mapstructure:"listen_addr"` ListenAddr string `mapstructure:"listen_addr"`
BootstrapNodes []string `mapstructure:"bootstrap_node_multiaddr"` BootstrapNodes []string `mapstructure:"bootstrap_node_multiaddr"`
Rendezvous string `mapstructure:"rendezvous"` Rendezvous string `mapstructure:"rendezvous"`
@ -47,7 +47,7 @@ func NewConfig(configPath string) (*Config, error) {
cfg := &Config{ cfg := &Config{
ListenAddr: "localhost", ListenAddr: "localhost",
ListenPort: ":8000", ListenPort: 8000,
BootstrapNodes: []string{"/ip4/127.0.0.1/tcp/0"}, BootstrapNodes: []string{"/ip4/127.0.0.1/tcp/0"},
Rendezvous: "filecoin-p2p-oracle", Rendezvous: "filecoin-p2p-oracle",
Ethereum: EthereumConfig{ Ethereum: EthereumConfig{

View File

@ -31,6 +31,7 @@ type ConsensusData struct {
mutex sync.Mutex mutex sync.Mutex
result string result string
test bool test bool
onConsensusFinishCallback func(finalData string)
} }
func NewPBFTConsensusManager(psb *pb.PubSubRouter, maxFaultNodes int) *PBFTConsensusManager { func NewPBFTConsensusManager(psb *pb.PubSubRouter, maxFaultNodes int) *PBFTConsensusManager {
@ -42,10 +43,11 @@ func NewPBFTConsensusManager(psb *pb.PubSubRouter, maxFaultNodes int) *PBFTConse
return pcm return pcm
} }
func (pcm *PBFTConsensusManager) NewTestConsensus(data string, consensusID string) { func (pcm *PBFTConsensusManager) NewTestConsensus(data string, consensusID string, onConsensusFinishCallback func(finalData string)) {
//consensusID := uuid.New().String() //consensusID := uuid.New().String()
cData := &ConsensusData{} cData := &ConsensusData{}
cData.test = true cData.test = true
cData.onConsensusFinishCallback = onConsensusFinishCallback
pcm.Consensuses[consensusID] = cData pcm.Consensuses[consensusID] = cData
msg := models.Message{} msg := models.Message{}
@ -109,6 +111,8 @@ func (pcm *PBFTConsensusManager) handleCommitMessage(message *models.Message) {
} }
data := pcm.Consensuses[consensusID] data := pcm.Consensuses[consensusID]
data.mutex.Lock()
defer data.mutex.Unlock()
if data.State == consensusCommitted { if data.State == consensusCommitted {
logrus.Debug("consensus already finished, dropping COMMIT message") logrus.Debug("consensus already finished, dropping COMMIT message")
return return
@ -116,13 +120,12 @@ func (pcm *PBFTConsensusManager) handleCommitMessage(message *models.Message) {
logrus.Debug("received commit msg") logrus.Debug("received commit msg")
data.mutex.Lock()
data.commitCount++ data.commitCount++
data.mutex.Unlock()
if data.commitCount > 2*pcm.maxFaultNodes+1 { if data.commitCount > 2*pcm.maxFaultNodes+1 {
data.State = consensusCommitted data.State = consensusCommitted
data.result = message.Payload["data"].(string) data.result = message.Payload["data"].(string)
logrus.Debug("consensus successfully finished with result: " + data.result) logrus.Debug("consensus successfully finished with result: " + data.result)
data.onConsensusFinishCallback(data.result)
} }
} }

View File

@ -49,12 +49,12 @@ func NewNode(configPath string) (*Node, error) {
return node, nil return node, nil
} }
func (n *Node) setupNode(ctx context.Context, prvKey crypto.PrivKey, pexDiscoveryUpdateTime time.Duration, maxFaultNodes int) { func (n *Node) setupNode(ctx context.Context, prvKey crypto.PrivKey, pexDiscoveryUpdateTime time.Duration) {
n.setupLibp2pHost(context.TODO(), prvKey, pexDiscoveryUpdateTime) n.setupLibp2pHost(context.TODO(), prvKey, pexDiscoveryUpdateTime)
//n.setupEthereumClient() //n.setupEthereumClient()
//n.setupFilecoinClient() //n.setupFilecoinClient()
n.setupPubsub() n.setupPubsub()
n.setupConsensusManager(maxFaultNodes) n.setupConsensusManager(n.Config.ConsensusMaxFaultNodes)
} }
func (n *Node) setupEthereumClient() error { func (n *Node) setupEthereumClient() error {
@ -84,7 +84,7 @@ func (n *Node) setupConsensusManager(maxFaultNodes int) {
} }
func (n *Node) setupLibp2pHost(ctx context.Context, privateKey crypto.PrivKey, pexDiscoveryUpdateTime time.Duration) { func (n *Node) setupLibp2pHost(ctx context.Context, privateKey crypto.PrivKey, pexDiscoveryUpdateTime time.Duration) {
listenMultiAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%s", n.Config.ListenAddr, n.Config.ListenPort)) listenMultiAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", n.Config.ListenAddr, n.Config.ListenPort))
if err != nil { if err != nil {
logrus.Fatal("Failed to generate new node multiaddress:", err) logrus.Fatal("Failed to generate new node multiaddress:", err)
} }
@ -98,7 +98,7 @@ func (n *Node) setupLibp2pHost(ctx context.Context, privateKey crypto.PrivKey, p
} }
n.Host = host n.Host = host
logrus.Info(fmt.Sprintf("[*] Your Multiaddress Is: /ip4/%s/tcp/%v/p2p/%s", n.Config.ListenAddr, n.Config.ListenPort, host.ID().Pretty())) logrus.Info(fmt.Sprintf("[*] Your Multiaddress Is: /ip4/%s/tcp/%d/p2p/%s", n.Config.ListenAddr, n.Config.ListenPort, host.ID().Pretty()))
var bootstrapMaddrs []multiaddr.Multiaddr var bootstrapMaddrs []multiaddr.Multiaddr
for _, a := range n.Config.BootstrapNodes { for _, a := range n.Config.BootstrapNodes {
@ -178,7 +178,7 @@ func Start() error {
node.GlobalCtx = ctx node.GlobalCtx = ctx
node.GlobalCtxCancel = ctxCancel node.GlobalCtxCancel = ctxCancel
node.setupNode(ctx, privKey, DefaultPEXUpdateTime, node.Config.ConsensusMaxFaultNodes) node.setupNode(ctx, privKey, DefaultPEXUpdateTime)
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():

View File

@ -3,6 +3,8 @@ package node
import ( import (
"context" "context"
"fmt" "fmt"
"math/rand"
"sync"
"testing" "testing"
"time" "time"
@ -14,44 +16,55 @@ func TestConsensus(t *testing.T) {
logrus.SetLevel(logrus.DebugLevel) logrus.SetLevel(logrus.DebugLevel)
//log.SetAllLoggers(log.LevelDebug) //log.SetAllLoggers(log.LevelDebug)
boolgen := newBoolgen()
rand.Seed(time.Now().UnixNano())
port := rand.Intn(100) + 10000
cfg := &config.Config{ cfg := &config.Config{
ListenPort: "1234", ListenPort: port,
ListenAddr: "0.0.0.0", ListenAddr: "0.0.0.0",
Rendezvous: "dione", Rendezvous: "dione",
PubSub: config.PubSubConfig{ PubSub: config.PubSubConfig{
ProtocolID: "/test/1.0", ProtocolID: "/dione/1.0",
}, },
ConsensusMaxFaultNodes: 3,
} }
//cfg.BootstrapNodes = "/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN" var nodes []*Node
// setup first node bNode := newNode(cfg)
node1 := newNode(cfg) t.Logf("Bootstrap ID: %s", bNode.Host.ID())
cfg.BootstrapNodes = []string{bNode.Host.Addrs()[0].String() + fmt.Sprintf("/p2p/%s", bNode.Host.ID().String())}
nodes = append(nodes, bNode)
// setup second node maxNodes := 10
cfg.ListenPort = "1235"
cfg.BootstrapNodes = []string{node1.Host.Addrs()[0].String() + fmt.Sprintf("/p2p/%s", node1.Host.ID().String())}
node2 := newNode(cfg)
// setup third node for i := 1; i <= maxNodes; i++ {
cfg.ListenPort = "1236" cfg.ListenPort += 1
node3 := newNode(cfg) node := newNode(cfg)
nodes = append(nodes, node)
}
cfg.ListenPort = "1237" time.Sleep(5*time.Second)
node4 := newNode(cfg)
cfg.ListenPort = "1238"
node5 := newNode(cfg)
cfg.ListenPort = "1239"
node6 := newNode(cfg)
time.Sleep(10 * time.Second) var wg sync.WaitGroup
go node2.ConsensusManager.NewTestConsensus("test", "123")
go node1.ConsensusManager.NewTestConsensus("test1", "123") wg.Add(len(nodes))
go node3.ConsensusManager.NewTestConsensus("test", "123") for _, n := range nodes {
go node4.ConsensusManager.NewTestConsensus("test1", "123") var testData string
go node5.ConsensusManager.NewTestConsensus("test", "123") if boolgen.Bool() {
go node6.ConsensusManager.NewTestConsensus("test2", "123") testData = "test"
select {} } else {
testData = "test1"
}
n.ConsensusManager.NewTestConsensus(testData, "123", func(finalData string) {
if finalData != "test" {
t.Errorf("Expected final data %s, not %s", "test", finalData)
}
wg.Done()
})
}
wg.Wait()
} }
func newNode(cfg *config.Config) *Node { func newNode(cfg *config.Config) *Node {
@ -67,6 +80,28 @@ func newNode(cfg *config.Config) *Node {
GlobalCtx: ctx, GlobalCtx: ctx,
GlobalCtxCancel: ctxCancel, GlobalCtxCancel: ctxCancel,
} }
node.setupNode(ctx, privKey, 1*time.Second, 3) node.setupNode(ctx, privKey, 1*time.Second)
return node return node
} }
type boolgen struct {
src rand.Source
cache int64
remaining int
}
func newBoolgen() *boolgen {
return &boolgen{src: rand.NewSource(time.Now().UnixNano())}
}
func (b *boolgen) Bool() bool {
if b.remaining == 0 {
b.cache, b.remaining = b.src.Int63(), 63
}
result := b.cache&0x01 == 1
b.cache >>= 1
b.remaining--
return result
}