Implement PBFT Consensus manager and rename the current module

This commit is contained in:
ChronosX88 2020-10-21 23:54:40 +04:00
parent 9fa99ab8ef
commit 5f1f11d099
Signed by: ChronosXYZ
GPG Key ID: 085A69A82C8C511A
12 changed files with 305 additions and 60 deletions

2
.gitignore vendored
View File

@ -17,3 +17,5 @@
# Environment files # Environment files
.env .env
eth-contracts/node_modules eth-contracts/node_modules
/dione
/.dione-config.toml

View File

@ -67,7 +67,7 @@ Governance mechanism would take place on [snapshot.page](https://snapshot.page/#
[Bach Adylbekov](https://github.com/bahadylbekov) [Bach Adylbekov](https://github.com/bahadylbekov)
We welcome every contributions big and small! Take a look at the [community contributing notes](). Please make sure to check the [issues](https://github.com/Secured-Finance/p2p-oracle-node/issues). Search the closed ones before reporting things, and help us with the open ones. We welcome every contributions big and small! Take a look at the [community contributing notes](). Please make sure to check the [issues](https://github.com/Secured-Finance/dione/issues). Search the closed ones before reporting things, and help us with the open ones.
# License # License

View File

@ -1,7 +1,7 @@
package main package main
import ( import (
"github.com/Secured-Finance/p2p-oracle-node/node" "github.com/Secured-Finance/dione/node"
"github.com/ipfs/go-log" "github.com/ipfs/go-log"
) )

View File

@ -10,7 +10,6 @@ type Config struct {
Bootstrap bool `mapstructure:"is_bootstrap"` Bootstrap bool `mapstructure:"is_bootstrap"`
BootstrapNodeMultiaddr string `mapstructure:"bootstrap_node_multiaddr"` BootstrapNodeMultiaddr string `mapstructure:"bootstrap_node_multiaddr"`
Rendezvous string `mapstructure:"rendezvous"` Rendezvous string `mapstructure:"rendezvous"`
SessionKey string `mapstructure:"session_key"`
Ethereum EthereumConfig `mapstructure:"ethereum"` Ethereum EthereumConfig `mapstructure:"ethereum"`
Filecoin FilecoinConfig `mapstructure:"filecoin"` Filecoin FilecoinConfig `mapstructure:"filecoin"`
PubSub PubSubConfig `mapstructure:"pubSub"` PubSub PubSubConfig `mapstructure:"pubSub"`
@ -43,7 +42,6 @@ func NewConfig(configPath string) (*Config, error) {
Ethereum: EthereumConfig{ Ethereum: EthereumConfig{
PrivateKey: "", PrivateKey: "",
}, },
SessionKey: "go",
PubSub: PubSubConfig{ PubSub: PubSubConfig{
ProtocolID: "p2p-oracle", ProtocolID: "p2p-oracle",
}, },

View File

@ -1 +1,118 @@
package consensus package consensus
import (
"sync"
"github.com/Secured-Finance/dione/models"
"github.com/Secured-Finance/dione/pb"
"github.com/google/uuid"
"github.com/ipfs/go-log"
)
type ConsensusState int
const (
consensusPrePrepared ConsensusState = 0x0
consensusPrepared ConsensusState = 0x1
consensusCommitted ConsensusState = 0x2
testValidData = "test"
)
type PBFTConsensusManager struct {
psb *pb.PubSubRouter
logger *log.ZapEventLogger
Consensuses map[string]*ConsensusData
maxFaultNodes int
}
type ConsensusData struct {
preparedCount int
commitCount int
State ConsensusState
mutex sync.Mutex
test bool
}
func NewPBFTConsensusManager(psb *pb.PubSubRouter, maxFaultNodes int) *PBFTConsensusManager {
pcm := &PBFTConsensusManager{}
pcm.logger = log.Logger("PBFTConsensusManager")
pcm.Consensuses = make(map[string]*ConsensusData)
pcm.psb = psb
pcm.psb.Hook("prepared", pcm.handlePreparedMessage)
pcm.psb.Hook("commit", pcm.handleCommitMessage)
return pcm
}
func (pcm *PBFTConsensusManager) NewTestConsensus(data string) {
consensusID := uuid.New().String()
cData := &ConsensusData{}
cData.test = true
pcm.Consensuses[consensusID] = cData
msg := models.Message{}
msg.Type = "prepared"
msg.Payload = make(map[string]interface{})
msg.Payload["consensusID"] = consensusID
msg.Payload["data"] = data
pcm.psb.BroadcastToServiceTopic(&msg)
cData.State = consensusPrePrepared
pcm.logger.Debug("started new consensus: " + consensusID)
}
func (pcm *PBFTConsensusManager) handlePreparedMessage(message *models.Message) {
// TODO add check on view of the message
consensusID := message.Payload["consensusID"].(string)
if _, ok := pcm.Consensuses[consensusID]; !ok {
pcm.logger.Warn("Unknown consensus ID: " + consensusID)
return
}
data := pcm.Consensuses[consensusID]
// validate payload data
if data.test {
rData := message.Payload["data"].(string)
if rData != testValidData {
pcm.logger.Error("Incorrect data was received! Ignoring this message, because it was sent from fault node!")
return
}
} else {
// TODO
}
data.mutex.Lock()
data.preparedCount++
data.mutex.Unlock()
if data.preparedCount > 2*pcm.maxFaultNodes+1 {
msg := models.Message{}
msg.Type = "commit"
msg.Payload["consensusID"] = consensusID
err := pcm.psb.BroadcastToServiceTopic(&msg)
if err != nil {
pcm.logger.Warn("Unable to send COMMIT message: " + err.Error())
return
}
data.State = consensusPrepared
}
}
func (pcm *PBFTConsensusManager) handleCommitMessage(message *models.Message) {
// TODO add check on view of the message
consensusID := message.Payload["consensusID"].(string)
if _, ok := pcm.Consensuses[consensusID]; !ok {
pcm.logger.Warn("Unknown consensus ID: " + consensusID)
return
}
data := pcm.Consensuses[consensusID]
data.mutex.Lock()
data.commitCount++
data.mutex.Unlock()
if data.commitCount > 2*pcm.maxFaultNodes+1 {
pcm.logger.Debug("consensus successfully finished")
pcm.logger.Debug("")
data.State = consensusPrepared
}
}

3
go.mod
View File

@ -1,4 +1,4 @@
module github.com/Secured-Finance/p2p-oracle-node module github.com/Secured-Finance/dione
go 1.14 go 1.14
@ -11,6 +11,7 @@ require (
github.com/filecoin-project/lotus v0.4.2 github.com/filecoin-project/lotus v0.4.2
github.com/filecoin-project/specs-actors v0.8.1-0.20200723200253-a3c01bc62f99 github.com/filecoin-project/specs-actors v0.8.1-0.20200723200253-a3c01bc62f99
github.com/gballet/go-libpcsclite v0.0.0-20191108122812-4678299bea08 // indirect github.com/gballet/go-libpcsclite v0.0.0-20191108122812-4678299bea08 // indirect
github.com/google/uuid v1.1.1
github.com/hashicorp/raft v1.1.2 github.com/hashicorp/raft v1.1.2
github.com/ipfs/go-log v1.0.4 github.com/ipfs/go-log v1.0.4
github.com/karalabe/usb v0.0.0-20191104083709-911d15fe12a9 // indirect github.com/karalabe/usb v0.0.0-20191104083709-911d15fe12a9 // indirect

View File

@ -1,7 +0,0 @@
package node
import "github.com/Secured-Finance/p2p-oracle-node/models"
type Handler interface {
HandleMessage(message *models.Message)
}

View File

@ -6,10 +6,13 @@ import (
"flag" "flag"
"fmt" "fmt"
"sync" "sync"
"time"
"github.com/Secured-Finance/p2p-oracle-node/config" "github.com/Secured-Finance/dione/config"
"github.com/Secured-Finance/p2p-oracle-node/rpc" "github.com/Secured-Finance/dione/consensus"
"github.com/Secured-Finance/p2p-oracle-node/rpcclient" "github.com/Secured-Finance/dione/pb"
"github.com/Secured-Finance/dione/rpc"
"github.com/Secured-Finance/dione/rpcclient"
"github.com/ipfs/go-log" "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p"
crypto "github.com/libp2p/go-libp2p-core/crypto" crypto "github.com/libp2p/go-libp2p-core/crypto"
@ -18,20 +21,20 @@ import (
discovery "github.com/libp2p/go-libp2p-discovery" discovery "github.com/libp2p/go-libp2p-discovery"
dht "github.com/libp2p/go-libp2p-kad-dht" dht "github.com/libp2p/go-libp2p-kad-dht"
peerstore "github.com/libp2p/go-libp2p-peerstore" peerstore "github.com/libp2p/go-libp2p-peerstore"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multiaddr"
) )
type Node struct { type Node struct {
Host host.Host Host host.Host
PubSub *pubsub.PubSub PubSubRouter *pb.PubSubRouter
GlobalCtx context.Context GlobalCtx context.Context
GlobalCtxCancel context.CancelFunc GlobalCtxCancel context.CancelFunc
OracleTopic string OracleTopic string
Config *config.Config Config *config.Config
Logger *log.ZapEventLogger Logger *log.ZapEventLogger
Lotus *rpc.LotusClient Lotus *rpc.LotusClient
Ethereum *rpcclient.EthereumClient Ethereum *rpcclient.EthereumClient
ConsensusManager *consensus.PBFTConsensusManager
} }
func NewNode(configPath string) (*Node, error) { func NewNode(configPath string) (*Node, error) {
@ -40,33 +43,20 @@ func NewNode(configPath string) (*Node, error) {
return nil, err return nil, err
} }
node := &Node{ node := &Node{
OracleTopic: "p2p_oracle", OracleTopic: "dione",
Config: cfg, Config: cfg,
Logger: log.Logger("node"), Logger: log.Logger("node"),
} }
log.SetAllLoggers(log.LevelInfo)
return node, nil return node, nil
} }
func (n *Node) setupNode(ctx context.Context, prvKey crypto.PrivKey) { func (n *Node) setupNode(ctx context.Context, prvKey crypto.PrivKey) {
listenMultiAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%s", n.Config.ListenAddr, n.Config.ListenPort)) n.setupLibp2pHost(context.TODO(), prvKey)
if err != nil { //n.setupEthereumClient()
n.Logger.Fatal("Failed to generate new node multiaddress:", err) //n.setupFilecoinClient()
} n.setupPubsub()
host, err := libp2p.New( n.setupConsensusManager()
ctx,
libp2p.ListenAddrs(listenMultiAddr),
libp2p.Identity(prvKey),
)
if err != nil {
n.Logger.Fatal("Failed to set a new libp2p node:", err)
}
n.Host = host
n.bootstrapLibp2pHost(context.TODO())
n.setupEthereumClient()
n.setupFilecoinClient()
//n.startPubSub(ctx, host)
} }
func (n *Node) setupEthereumClient() error { func (n *Node) setupEthereumClient() error {
@ -85,7 +75,33 @@ func (n *Node) setupFilecoinClient() {
n.Lotus = lotus n.Lotus = lotus
} }
func (n *Node) bootstrapLibp2pHost(ctx context.Context) { func (n *Node) setupPubsub() {
n.PubSubRouter = pb.NewPubSubRouter(n.Host, n.OracleTopic)
// wait for setting up pubsub
time.Sleep(3 * time.Second)
}
func (n *Node) setupConsensusManager() {
n.ConsensusManager = consensus.NewPBFTConsensusManager(n.PubSubRouter, 2)
}
func (n *Node) setupLibp2pHost(ctx context.Context, privateKey crypto.PrivKey) {
listenMultiAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%s", n.Config.ListenAddr, n.Config.ListenPort))
if err != nil {
n.Logger.Fatal("Failed to generate new node multiaddress:", err)
}
host, err := libp2p.New(
ctx,
libp2p.ListenAddrs(listenMultiAddr),
libp2p.Identity(privateKey),
)
if err != nil {
n.Logger.Fatal("Failed to set a new libp2p node:", err)
}
n.Host = host
n.Logger.Info(fmt.Sprintf("[*] Your Multiaddress Is: /ip4/%s/tcp/%v/p2p/%s\n", n.Config.ListenAddr, n.Config.ListenPort, host.ID().Pretty()))
kademliaDHT, err := dht.New(context.Background(), n.Host) kademliaDHT, err := dht.New(context.Background(), n.Host)
if err != nil { if err != nil {
n.Logger.Fatal("Failed to create new DHT instance: ", err) n.Logger.Fatal("Failed to create new DHT instance: ", err)
@ -119,7 +135,7 @@ func (n *Node) bootstrapLibp2pHost(ctx context.Context) {
n.Logger.Info("Successfully announced!") n.Logger.Info("Successfully announced!")
// Randezvous string = service tag // Randezvous string = service tag
// Disvover all peers with our service (all ms devices) // Discover all peers with our service
n.Logger.Info("Searching for other peers...") n.Logger.Info("Searching for other peers...")
peerChan, err := routingDiscovery.FindPeers(context.Background(), n.Config.Rendezvous) peerChan, err := routingDiscovery.FindPeers(context.Background(), n.Config.Rendezvous)
if err != nil { if err != nil {
@ -133,6 +149,12 @@ func (n *Node) bootstrapLibp2pHost(ctx context.Context) {
break MainLoop break MainLoop
case newPeer := <-peerChan: case newPeer := <-peerChan:
{ {
if len(newPeer.Addrs) == 0 {
continue
}
if newPeer.ID.String() == n.Host.ID().String() {
continue
}
n.Logger.Info("Found peer:", newPeer, ", put it to the peerstore") n.Logger.Info("Found peer:", newPeer, ", put it to the peerstore")
n.Host.Peerstore().AddAddr(newPeer.ID, newPeer.Addrs[0], peerstore.PermanentAddrTTL) n.Host.Peerstore().AddAddr(newPeer.ID, newPeer.Addrs[0], peerstore.PermanentAddrTTL)
// Connect to the peer // Connect to the peer
@ -175,7 +197,12 @@ func Start() error {
node.GlobalCtxCancel = ctxCancel node.GlobalCtxCancel = ctxCancel
node.setupNode(ctx, privKey) node.setupNode(ctx, privKey)
return nil for {
select {
case <-ctx.Done():
return nil
}
}
} }
func generatePrivateKey() (crypto.PrivKey, error) { func generatePrivateKey() (crypto.PrivKey, error) {

90
node/node_test.go Normal file
View File

@ -0,0 +1,90 @@
package node
import (
"context"
"fmt"
"testing"
"github.com/Secured-Finance/dione/config"
"github.com/Secured-Finance/dione/consensus"
"github.com/ipfs/go-log"
)
func TestConsensus(t *testing.T) {
var logger = log.Logger("test")
log.SetAllLoggers(log.LevelDebug)
cfg := &config.Config{
ListenPort: "1234",
ListenAddr: "127.0.0.1",
Bootstrap: true,
Rendezvous: "dione",
PubSub: config.PubSubConfig{
ProtocolID: "/test/1.0",
},
}
// setup first node
privKey, err := generatePrivateKey()
if err != nil {
t.Error(err)
}
ctx, ctxCancel := context.WithCancel(context.Background())
node1 := &Node{
OracleTopic: "dione",
Config: cfg,
GlobalCtx: ctx,
GlobalCtxCancel: ctxCancel,
Logger: log.Logger("node"),
}
node1.setupNode(ctx, privKey)
// setup second node
privKey, err = generatePrivateKey()
if err != nil {
t.Error(err)
}
ctx, ctxCancel = context.WithCancel(context.Background())
cfg.ListenPort = "1235"
cfg.Bootstrap = false
cfg.BootstrapNodeMultiaddr = node1.Host.Addrs()[0].String() + fmt.Sprintf("/p2p/%s", node1.Host.ID().String())
node2 := &Node{
OracleTopic: "dione",
Config: cfg,
GlobalCtx: ctx,
GlobalCtxCancel: ctxCancel,
Logger: log.Logger("node"),
}
node2.setupNode(ctx, privKey)
// setup third node
privKey, err = generatePrivateKey()
if err != nil {
t.Error(err)
}
ctx, ctxCancel = context.WithCancel(context.Background())
cfg.ListenPort = "1236"
node3 := &Node{
OracleTopic: "dione",
Config: cfg,
GlobalCtx: ctx,
GlobalCtxCancel: ctxCancel,
Logger: log.Logger("node"),
}
node3.setupNode(ctx, privKey)
logger.Debug(node3.Host.Peerstore().Peers())
node2.ConsensusManager.NewTestConsensus("test")
node1.ConsensusManager.NewTestConsensus("test1")
node3.ConsensusManager.NewTestConsensus("test")
var last consensus.ConsensusState = -1
for {
for _, v := range node1.ConsensusManager.Consensuses {
if v.State != last {
last = v.State
t.Log("new state: " + fmt.Sprint(v.State))
}
}
}
}

5
pb/handler.go Normal file
View File

@ -0,0 +1,5 @@
package pb
import "github.com/Secured-Finance/dione/models"
type Handler func(message *models.Message)

View File

@ -1,45 +1,48 @@
package node package pb
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"github.com/Secured-Finance/p2p-oracle-node/models" "github.com/Secured-Finance/dione/models"
"github.com/ipfs/go-log" "github.com/ipfs/go-log"
host "github.com/libp2p/go-libp2p-core/host"
peer "github.com/libp2p/go-libp2p-core/peer" peer "github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub" pubsub "github.com/libp2p/go-libp2p-pubsub"
) )
type PubSubRouter struct { type PubSubRouter struct {
node *Node node host.Host
pubsub *pubsub.PubSub pubsub *pubsub.PubSub
logger *log.ZapEventLogger logger *log.ZapEventLogger
context context.Context context context.Context
contextCancel context.CancelFunc contextCancel context.CancelFunc
handlers map[string][]Handler handlers map[string][]Handler
oracleTopic string
} }
func NewPubSubRouter(n *Node) *PubSubRouter { func NewPubSubRouter(h host.Host, oracleTopic string) *PubSubRouter {
ctx, ctxCancel := context.WithCancel(context.Background()) ctx, ctxCancel := context.WithCancel(context.Background())
psr := &PubSubRouter{ psr := &PubSubRouter{
node: n, node: h,
logger: log.Logger("PubSubRouter"), logger: log.Logger("PubSubRouter"),
context: ctx, context: ctx,
contextCancel: ctxCancel, contextCancel: ctxCancel,
handlers: make(map[string][]Handler),
} }
pb, err := pubsub.NewGossipSub( pb, err := pubsub.NewGossipSub(
context.Background(), context.Background(),
psr.node.Host, pubsub.WithMessageSigning(true), psr.node, pubsub.WithMessageSigning(true),
pubsub.WithStrictSignatureVerification(true), pubsub.WithStrictSignatureVerification(true),
) )
if err != nil { if err != nil {
psr.logger.Fatal("Error occurred when create PubSub", err) psr.logger.Fatal("Error occurred when create PubSub", err)
} }
n.OracleTopic = n.Config.Rendezvous psr.oracleTopic = oracleTopic
subscription, err := pb.Subscribe(n.OracleTopic) subscription, err := pb.Subscribe(oracleTopic)
if err != nil { if err != nil {
psr.logger.Fatal("Error occurred when subscribing to service topic", err) psr.logger.Fatal("Error occurred when subscribing to service topic", err)
} }
@ -72,7 +75,7 @@ func (psr *PubSubRouter) handleMessage(p *pubsub.Message) {
return return
} }
// We can receive our own messages when sending to the topic. So we should drop them. // We can receive our own messages when sending to the topic. So we should drop them.
if senderPeerID == psr.node.Host.ID() { if senderPeerID == psr.node.ID() {
return return
} }
var message models.Message var message models.Message
@ -88,7 +91,7 @@ func (psr *PubSubRouter) handleMessage(p *pubsub.Message) {
return return
} }
for _, v := range handlers { for _, v := range handlers {
go v.HandleMessage(&message) go v(&message)
} }
} }
@ -102,6 +105,15 @@ func (psr *PubSubRouter) Hook(messageType string, handler Handler) {
psr.handlers[messageType] = append(handlers, handler) psr.handlers[messageType] = append(handlers, handler)
} }
func (psr *PubSubRouter) BroadcastToServiceTopic(msg *models.Message) error {
data, err := json.Marshal(msg)
if err != nil {
return err
}
err = psr.pubsub.Publish(psr.oracleTopic, data)
return err
}
func (psr *PubSubRouter) Shutdown() { func (psr *PubSubRouter) Shutdown() {
psr.contextCancel() psr.contextCancel()
} }

View File

@ -4,8 +4,8 @@ import (
"context" "context"
"math/big" "math/big"
"github.com/Secured-Finance/p2p-oracle-node/contracts/aggregator" "github.com/Secured-Finance/dione/contracts/aggregator"
"github.com/Secured-Finance/p2p-oracle-node/contracts/oracleemitter" "github.com/Secured-Finance/dione/contracts/oracleemitter"
"github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"