diff --git a/.gitignore b/.gitignore index 1725504..6c43d47 100644 --- a/.gitignore +++ b/.gitignore @@ -16,4 +16,6 @@ # Environment files .env -eth-contracts/node_modules \ No newline at end of file +eth-contracts/node_modules +/dione +/.dione-config.toml \ No newline at end of file diff --git a/README.md b/README.md index 5ee6f1b..9a75b42 100644 --- a/README.md +++ b/README.md @@ -67,7 +67,7 @@ Governance mechanism would take place on [snapshot.page](https://snapshot.page/# [Bach Adylbekov](https://github.com/bahadylbekov) -We welcome every contributions big and small! Take a look at the [community contributing notes](). Please make sure to check the [issues](https://github.com/Secured-Finance/p2p-oracle-node/issues). Search the closed ones before reporting things, and help us with the open ones. +We welcome every contributions big and small! Take a look at the [community contributing notes](). Please make sure to check the [issues](https://github.com/Secured-Finance/dione/issues). Search the closed ones before reporting things, and help us with the open ones. # License diff --git a/cmd/dione/dione.go b/cmd/dione/dione.go index 092087b..4045330 100644 --- a/cmd/dione/dione.go +++ b/cmd/dione/dione.go @@ -1,7 +1,7 @@ package main import ( - "github.com/Secured-Finance/p2p-oracle-node/node" + "github.com/Secured-Finance/dione/node" "github.com/ipfs/go-log" ) diff --git a/config/config.go b/config/config.go index 77fa1a6..1b8c9b9 100644 --- a/config/config.go +++ b/config/config.go @@ -10,7 +10,6 @@ type Config struct { Bootstrap bool `mapstructure:"is_bootstrap"` BootstrapNodeMultiaddr string `mapstructure:"bootstrap_node_multiaddr"` Rendezvous string `mapstructure:"rendezvous"` - SessionKey string `mapstructure:"session_key"` Ethereum EthereumConfig `mapstructure:"ethereum"` Filecoin FilecoinConfig `mapstructure:"filecoin"` PubSub PubSubConfig `mapstructure:"pubSub"` @@ -43,7 +42,6 @@ func NewConfig(configPath string) (*Config, error) { Ethereum: EthereumConfig{ PrivateKey: "", }, - SessionKey: "go", PubSub: PubSubConfig{ ProtocolID: "p2p-oracle", }, diff --git a/consensus/consensus.go b/consensus/consensus.go index 4bbf829..4faeded 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -1 +1,118 @@ package consensus + +import ( + "sync" + + "github.com/Secured-Finance/dione/models" + "github.com/Secured-Finance/dione/pb" + "github.com/google/uuid" + "github.com/ipfs/go-log" +) + +type ConsensusState int + +const ( + consensusPrePrepared ConsensusState = 0x0 + consensusPrepared ConsensusState = 0x1 + consensusCommitted ConsensusState = 0x2 + + testValidData = "test" +) + +type PBFTConsensusManager struct { + psb *pb.PubSubRouter + logger *log.ZapEventLogger + Consensuses map[string]*ConsensusData + maxFaultNodes int +} + +type ConsensusData struct { + preparedCount int + commitCount int + State ConsensusState + mutex sync.Mutex + test bool +} + +func NewPBFTConsensusManager(psb *pb.PubSubRouter, maxFaultNodes int) *PBFTConsensusManager { + pcm := &PBFTConsensusManager{} + pcm.logger = log.Logger("PBFTConsensusManager") + pcm.Consensuses = make(map[string]*ConsensusData) + pcm.psb = psb + pcm.psb.Hook("prepared", pcm.handlePreparedMessage) + pcm.psb.Hook("commit", pcm.handleCommitMessage) + return pcm +} + +func (pcm *PBFTConsensusManager) NewTestConsensus(data string) { + consensusID := uuid.New().String() + cData := &ConsensusData{} + cData.test = true + pcm.Consensuses[consensusID] = cData + + msg := models.Message{} + msg.Type = "prepared" + msg.Payload = make(map[string]interface{}) + msg.Payload["consensusID"] = consensusID + msg.Payload["data"] = data + pcm.psb.BroadcastToServiceTopic(&msg) + + cData.State = consensusPrePrepared + pcm.logger.Debug("started new consensus: " + consensusID) +} + +func (pcm *PBFTConsensusManager) handlePreparedMessage(message *models.Message) { + // TODO add check on view of the message + consensusID := message.Payload["consensusID"].(string) + if _, ok := pcm.Consensuses[consensusID]; !ok { + pcm.logger.Warn("Unknown consensus ID: " + consensusID) + return + } + data := pcm.Consensuses[consensusID] + + // validate payload data + if data.test { + rData := message.Payload["data"].(string) + if rData != testValidData { + pcm.logger.Error("Incorrect data was received! Ignoring this message, because it was sent from fault node!") + return + } + } else { + // TODO + } + + data.mutex.Lock() + data.preparedCount++ + data.mutex.Unlock() + + if data.preparedCount > 2*pcm.maxFaultNodes+1 { + msg := models.Message{} + msg.Type = "commit" + msg.Payload["consensusID"] = consensusID + err := pcm.psb.BroadcastToServiceTopic(&msg) + if err != nil { + pcm.logger.Warn("Unable to send COMMIT message: " + err.Error()) + return + } + data.State = consensusPrepared + } +} + +func (pcm *PBFTConsensusManager) handleCommitMessage(message *models.Message) { + // TODO add check on view of the message + consensusID := message.Payload["consensusID"].(string) + if _, ok := pcm.Consensuses[consensusID]; !ok { + pcm.logger.Warn("Unknown consensus ID: " + consensusID) + return + } + data := pcm.Consensuses[consensusID] + data.mutex.Lock() + data.commitCount++ + data.mutex.Unlock() + + if data.commitCount > 2*pcm.maxFaultNodes+1 { + pcm.logger.Debug("consensus successfully finished") + pcm.logger.Debug("") + data.State = consensusPrepared + } +} diff --git a/go.mod b/go.mod index 92832ac..0cf5148 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module github.com/Secured-Finance/p2p-oracle-node +module github.com/Secured-Finance/dione go 1.14 @@ -11,6 +11,7 @@ require ( github.com/filecoin-project/lotus v0.4.2 github.com/filecoin-project/specs-actors v0.8.1-0.20200723200253-a3c01bc62f99 github.com/gballet/go-libpcsclite v0.0.0-20191108122812-4678299bea08 // indirect + github.com/google/uuid v1.1.1 github.com/hashicorp/raft v1.1.2 github.com/ipfs/go-log v1.0.4 github.com/karalabe/usb v0.0.0-20191104083709-911d15fe12a9 // indirect diff --git a/node/handler.go b/node/handler.go deleted file mode 100644 index 3b3876f..0000000 --- a/node/handler.go +++ /dev/null @@ -1,7 +0,0 @@ -package node - -import "github.com/Secured-Finance/p2p-oracle-node/models" - -type Handler interface { - HandleMessage(message *models.Message) -} diff --git a/node/node.go b/node/node.go index 67bb450..cf4effd 100644 --- a/node/node.go +++ b/node/node.go @@ -6,10 +6,13 @@ import ( "flag" "fmt" "sync" + "time" - "github.com/Secured-Finance/p2p-oracle-node/config" - "github.com/Secured-Finance/p2p-oracle-node/rpc" - "github.com/Secured-Finance/p2p-oracle-node/rpcclient" + "github.com/Secured-Finance/dione/config" + "github.com/Secured-Finance/dione/consensus" + "github.com/Secured-Finance/dione/pb" + "github.com/Secured-Finance/dione/rpc" + "github.com/Secured-Finance/dione/rpcclient" "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p" crypto "github.com/libp2p/go-libp2p-core/crypto" @@ -18,20 +21,20 @@ import ( discovery "github.com/libp2p/go-libp2p-discovery" dht "github.com/libp2p/go-libp2p-kad-dht" peerstore "github.com/libp2p/go-libp2p-peerstore" - pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/multiformats/go-multiaddr" ) type Node struct { - Host host.Host - PubSub *pubsub.PubSub - GlobalCtx context.Context - GlobalCtxCancel context.CancelFunc - OracleTopic string - Config *config.Config - Logger *log.ZapEventLogger - Lotus *rpc.LotusClient - Ethereum *rpcclient.EthereumClient + Host host.Host + PubSubRouter *pb.PubSubRouter + GlobalCtx context.Context + GlobalCtxCancel context.CancelFunc + OracleTopic string + Config *config.Config + Logger *log.ZapEventLogger + Lotus *rpc.LotusClient + Ethereum *rpcclient.EthereumClient + ConsensusManager *consensus.PBFTConsensusManager } func NewNode(configPath string) (*Node, error) { @@ -40,33 +43,20 @@ func NewNode(configPath string) (*Node, error) { return nil, err } node := &Node{ - OracleTopic: "p2p_oracle", + OracleTopic: "dione", Config: cfg, Logger: log.Logger("node"), } - log.SetAllLoggers(log.LevelInfo) return node, nil } func (n *Node) setupNode(ctx context.Context, prvKey crypto.PrivKey) { - listenMultiAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%s", n.Config.ListenAddr, n.Config.ListenPort)) - if err != nil { - n.Logger.Fatal("Failed to generate new node multiaddress:", err) - } - host, err := libp2p.New( - ctx, - libp2p.ListenAddrs(listenMultiAddr), - libp2p.Identity(prvKey), - ) - if err != nil { - n.Logger.Fatal("Failed to set a new libp2p node:", err) - } - n.Host = host - n.bootstrapLibp2pHost(context.TODO()) - n.setupEthereumClient() - n.setupFilecoinClient() - //n.startPubSub(ctx, host) + n.setupLibp2pHost(context.TODO(), prvKey) + //n.setupEthereumClient() + //n.setupFilecoinClient() + n.setupPubsub() + n.setupConsensusManager() } func (n *Node) setupEthereumClient() error { @@ -85,7 +75,33 @@ func (n *Node) setupFilecoinClient() { n.Lotus = lotus } -func (n *Node) bootstrapLibp2pHost(ctx context.Context) { +func (n *Node) setupPubsub() { + n.PubSubRouter = pb.NewPubSubRouter(n.Host, n.OracleTopic) + // wait for setting up pubsub + time.Sleep(3 * time.Second) +} + +func (n *Node) setupConsensusManager() { + n.ConsensusManager = consensus.NewPBFTConsensusManager(n.PubSubRouter, 2) +} + +func (n *Node) setupLibp2pHost(ctx context.Context, privateKey crypto.PrivKey) { + listenMultiAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%s", n.Config.ListenAddr, n.Config.ListenPort)) + if err != nil { + n.Logger.Fatal("Failed to generate new node multiaddress:", err) + } + host, err := libp2p.New( + ctx, + libp2p.ListenAddrs(listenMultiAddr), + libp2p.Identity(privateKey), + ) + if err != nil { + n.Logger.Fatal("Failed to set a new libp2p node:", err) + } + n.Host = host + + n.Logger.Info(fmt.Sprintf("[*] Your Multiaddress Is: /ip4/%s/tcp/%v/p2p/%s\n", n.Config.ListenAddr, n.Config.ListenPort, host.ID().Pretty())) + kademliaDHT, err := dht.New(context.Background(), n.Host) if err != nil { n.Logger.Fatal("Failed to create new DHT instance: ", err) @@ -119,7 +135,7 @@ func (n *Node) bootstrapLibp2pHost(ctx context.Context) { n.Logger.Info("Successfully announced!") // Randezvous string = service tag - // Disvover all peers with our service (all ms devices) + // Discover all peers with our service n.Logger.Info("Searching for other peers...") peerChan, err := routingDiscovery.FindPeers(context.Background(), n.Config.Rendezvous) if err != nil { @@ -133,6 +149,12 @@ func (n *Node) bootstrapLibp2pHost(ctx context.Context) { break MainLoop case newPeer := <-peerChan: { + if len(newPeer.Addrs) == 0 { + continue + } + if newPeer.ID.String() == n.Host.ID().String() { + continue + } n.Logger.Info("Found peer:", newPeer, ", put it to the peerstore") n.Host.Peerstore().AddAddr(newPeer.ID, newPeer.Addrs[0], peerstore.PermanentAddrTTL) // Connect to the peer @@ -175,7 +197,12 @@ func Start() error { node.GlobalCtxCancel = ctxCancel node.setupNode(ctx, privKey) - return nil + for { + select { + case <-ctx.Done(): + return nil + } + } } func generatePrivateKey() (crypto.PrivKey, error) { diff --git a/node/node_test.go b/node/node_test.go new file mode 100644 index 0000000..b7bd7a6 --- /dev/null +++ b/node/node_test.go @@ -0,0 +1,90 @@ +package node + +import ( + "context" + "fmt" + "testing" + + "github.com/Secured-Finance/dione/config" + "github.com/Secured-Finance/dione/consensus" + "github.com/ipfs/go-log" +) + +func TestConsensus(t *testing.T) { + var logger = log.Logger("test") + log.SetAllLoggers(log.LevelDebug) + + cfg := &config.Config{ + ListenPort: "1234", + ListenAddr: "127.0.0.1", + Bootstrap: true, + Rendezvous: "dione", + PubSub: config.PubSubConfig{ + ProtocolID: "/test/1.0", + }, + } + + // setup first node + privKey, err := generatePrivateKey() + if err != nil { + t.Error(err) + } + ctx, ctxCancel := context.WithCancel(context.Background()) + node1 := &Node{ + OracleTopic: "dione", + Config: cfg, + GlobalCtx: ctx, + GlobalCtxCancel: ctxCancel, + Logger: log.Logger("node"), + } + node1.setupNode(ctx, privKey) + + // setup second node + privKey, err = generatePrivateKey() + if err != nil { + t.Error(err) + } + ctx, ctxCancel = context.WithCancel(context.Background()) + cfg.ListenPort = "1235" + cfg.Bootstrap = false + cfg.BootstrapNodeMultiaddr = node1.Host.Addrs()[0].String() + fmt.Sprintf("/p2p/%s", node1.Host.ID().String()) + node2 := &Node{ + OracleTopic: "dione", + Config: cfg, + GlobalCtx: ctx, + GlobalCtxCancel: ctxCancel, + Logger: log.Logger("node"), + } + node2.setupNode(ctx, privKey) + + // setup third node + privKey, err = generatePrivateKey() + if err != nil { + t.Error(err) + } + ctx, ctxCancel = context.WithCancel(context.Background()) + cfg.ListenPort = "1236" + node3 := &Node{ + OracleTopic: "dione", + Config: cfg, + GlobalCtx: ctx, + GlobalCtxCancel: ctxCancel, + Logger: log.Logger("node"), + } + node3.setupNode(ctx, privKey) + + logger.Debug(node3.Host.Peerstore().Peers()) + + node2.ConsensusManager.NewTestConsensus("test") + node1.ConsensusManager.NewTestConsensus("test1") + node3.ConsensusManager.NewTestConsensus("test") + var last consensus.ConsensusState = -1 + for { + for _, v := range node1.ConsensusManager.Consensuses { + if v.State != last { + last = v.State + t.Log("new state: " + fmt.Sprint(v.State)) + } + } + } +} diff --git a/pb/handler.go b/pb/handler.go new file mode 100644 index 0000000..33a7357 --- /dev/null +++ b/pb/handler.go @@ -0,0 +1,5 @@ +package pb + +import "github.com/Secured-Finance/dione/models" + +type Handler func(message *models.Message) diff --git a/node/pubsub_router.go b/pb/pubsub_router.go similarity index 76% rename from node/pubsub_router.go rename to pb/pubsub_router.go index 0f18223..5b5e17f 100644 --- a/node/pubsub_router.go +++ b/pb/pubsub_router.go @@ -1,45 +1,48 @@ -package node +package pb import ( "context" "encoding/json" - "github.com/Secured-Finance/p2p-oracle-node/models" + "github.com/Secured-Finance/dione/models" "github.com/ipfs/go-log" + host "github.com/libp2p/go-libp2p-core/host" peer "github.com/libp2p/go-libp2p-core/peer" pubsub "github.com/libp2p/go-libp2p-pubsub" ) type PubSubRouter struct { - node *Node + node host.Host pubsub *pubsub.PubSub logger *log.ZapEventLogger context context.Context contextCancel context.CancelFunc handlers map[string][]Handler + oracleTopic string } -func NewPubSubRouter(n *Node) *PubSubRouter { +func NewPubSubRouter(h host.Host, oracleTopic string) *PubSubRouter { ctx, ctxCancel := context.WithCancel(context.Background()) psr := &PubSubRouter{ - node: n, + node: h, logger: log.Logger("PubSubRouter"), context: ctx, contextCancel: ctxCancel, + handlers: make(map[string][]Handler), } pb, err := pubsub.NewGossipSub( context.Background(), - psr.node.Host, pubsub.WithMessageSigning(true), + psr.node, pubsub.WithMessageSigning(true), pubsub.WithStrictSignatureVerification(true), ) if err != nil { psr.logger.Fatal("Error occurred when create PubSub", err) } - n.OracleTopic = n.Config.Rendezvous - subscription, err := pb.Subscribe(n.OracleTopic) + psr.oracleTopic = oracleTopic + subscription, err := pb.Subscribe(oracleTopic) if err != nil { psr.logger.Fatal("Error occurred when subscribing to service topic", err) } @@ -72,7 +75,7 @@ func (psr *PubSubRouter) handleMessage(p *pubsub.Message) { return } // We can receive our own messages when sending to the topic. So we should drop them. - if senderPeerID == psr.node.Host.ID() { + if senderPeerID == psr.node.ID() { return } var message models.Message @@ -88,7 +91,7 @@ func (psr *PubSubRouter) handleMessage(p *pubsub.Message) { return } for _, v := range handlers { - go v.HandleMessage(&message) + go v(&message) } } @@ -102,6 +105,15 @@ func (psr *PubSubRouter) Hook(messageType string, handler Handler) { psr.handlers[messageType] = append(handlers, handler) } +func (psr *PubSubRouter) BroadcastToServiceTopic(msg *models.Message) error { + data, err := json.Marshal(msg) + if err != nil { + return err + } + err = psr.pubsub.Publish(psr.oracleTopic, data) + return err +} + func (psr *PubSubRouter) Shutdown() { psr.contextCancel() } diff --git a/rpcclient/ethereum.go b/rpcclient/ethereum.go index 5e05599..ccaa1af 100644 --- a/rpcclient/ethereum.go +++ b/rpcclient/ethereum.go @@ -4,8 +4,8 @@ import ( "context" "math/big" - "github.com/Secured-Finance/p2p-oracle-node/contracts/aggregator" - "github.com/Secured-Finance/p2p-oracle-node/contracts/oracleemitter" + "github.com/Secured-Finance/dione/contracts/aggregator" + "github.com/Secured-Finance/dione/contracts/oracleemitter" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto"