Do massive refactor - remove unneccessary stuff, refactor Node, rewrite EtheriumClient, implement PubSubRouter

This commit is contained in:
ChronosX88 2020-10-20 22:18:36 +04:00
parent 1b71a23e73
commit a1db429717
Signed by: ChronosXYZ
GPG Key ID: 085A69A82C8C511A
13 changed files with 411 additions and 997 deletions

View File

@ -11,12 +11,21 @@ type Config struct {
BootstrapNodeMultiaddr string `mapstructure:"bootstrap_node_multiaddr"`
Rendezvous string `mapstructure:"rendezvous"`
SessionKey string `mapstructure:"session_key"`
Etherium EtheriumConfig `mapstructure:"eth"`
Etherium EtheriumConfig `mapstructure:"etherium"`
Filecoin FilecoinConfig `mapstructure:"filecoin"`
PubSub PubSubConfig `mapstructure:"pubSub"`
}
type EtheriumConfig struct {
PrivateKey string `mapstructure:"private_key"`
GatewayAddress string `mapstructure:"gateway_address"`
PrivateKey string `mapstructure:"private_key"`
OracleEmitterContractAddress string `mapstructure:"oracle_emitter_contract_address"`
AggregatorContractAddress string `mapstructure:"aggregator_contract_address"`
}
type FilecoinConfig struct {
LotusHost string `mapstructure:"lotusHost"`
LotusToken string `mapstructure:"lotusToken"`
}
type PubSubConfig struct {

View File

@ -1,198 +1 @@
package consensus
import (
"context"
"fmt"
"io/ioutil"
"time"
"github.com/ipfs/go-log"
consensus "github.com/libp2p/go-libp2p-consensus"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
raft "github.com/hashicorp/raft"
libp2praft "github.com/libp2p/go-libp2p-raft"
)
var raftTmpFolder = "raft-consensus"
var raftQuiet = true
type RaftConsensus struct {
Logger *log.ZapEventLogger
Raft *raft.Raft
Consensus *libp2praft.Consensus
State consensus.State
Transport *raft.NetworkTransport
Actor *libp2praft.Actor
}
type RaftState struct {
Value string
}
func NewRaftConsensus() *RaftConsensus {
raftConsensus := &RaftConsensus{
Logger: log.Logger("rendezvous"),
}
return raftConsensus
}
func (raftConsensus *RaftConsensus) NewState(value string) {
raftConsensus.State = &RaftState{value}
}
func (raftConsensus *RaftConsensus) NewConsensus(op consensus.Op) {
if op != nil {
raftConsensus.Consensus = libp2praft.NewOpLog(&RaftState{}, op)
} else {
raftConsensus.Consensus = libp2praft.NewConsensus(&RaftState{})
}
}
func (raftConsensus *RaftConsensus) GetConsensusState(consensus *libp2praft.Consensus) (consensus.State, error) {
var err error
raftConsensus.State, err = raftConsensus.Consensus.GetCurrentState()
if err != nil {
fmt.Println(err)
return nil, err
}
return raftConsensus.State, nil
}
func (raftConsensus *RaftConsensus) UpdateState(value string) error {
raftConsensus.NewState(value)
// CommitState() blocks until the state has been
// agreed upon by everyone
agreedState, err := raftConsensus.Consensus.CommitState(raftConsensus.State)
if err != nil {
raftConsensus.Logger.Warn("Failed to commit new state", err)
return err
}
if agreedState == nil {
fmt.Println("agreedState is nil: commited on a non-leader?")
return err
}
return nil
}
func (raftConsensus *RaftConsensus) Shutdown() {
err := raftConsensus.Raft.Shutdown().Error()
if err != nil {
raftConsensus.Logger.Fatal(err)
}
}
func (raftConsensus *RaftConsensus) WaitForLeader(r *raft.Raft) {
obsCh := make(chan raft.Observation, 1)
observer := raft.NewObserver(obsCh, false, nil)
r.RegisterObserver(observer)
defer r.DeregisterObserver(observer)
ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
ticker := time.NewTicker(time.Second / 2)
defer ticker.Stop()
for {
select {
case obs := <-obsCh:
switch obs.Data.(type) {
case raft.RaftState:
if r.Leader() != "" {
return
}
}
case <-ticker.C:
if r.Leader() != "" {
return
}
case <-ctx.Done():
raftConsensus.Logger.Fatal("timed out waiting for Leader")
}
}
}
func (raftConsensus *RaftConsensus) SetupConsensus(h host.Host, pids []peer.ID, op consensus.Op) {
raftConsensus.NewConsensus(op)
// -- Create Raft servers configuration
servers := make([]raft.Server, len(pids))
for i, pid := range pids {
servers[i] = raft.Server{
Suffrage: raft.Voter,
ID: raft.ServerID(pid.Pretty()),
Address: raft.ServerAddress(pid.Pretty()),
}
}
serverConfig := raft.Configuration{
Servers: servers,
}
// -- Create LibP2P transports Raft
transport, err := libp2praft.NewLibp2pTransport(h, 2*time.Second)
if err != nil {
raftConsensus.Logger.Fatal(err)
}
raftConsensus.Transport = transport
// -- Configuration
config := raft.DefaultConfig()
if raftQuiet {
config.LogOutput = ioutil.Discard
config.Logger = nil
}
config.LocalID = raft.ServerID(h.ID().Pretty())
// -- SnapshotStore
snapshots, err := raft.NewFileSnapshotStore(raftTmpFolder, 3, nil)
if err != nil {
raftConsensus.Logger.Fatal(err)
}
// -- Log store and stable store: we use inmem.
logStore := raft.NewInmemStore()
// -- Boostrap everything if necessary
bootstrapped, err := raft.HasExistingState(logStore, logStore, snapshots)
if err != nil {
raftConsensus.Logger.Fatal(err)
}
if !bootstrapped {
raft.BootstrapCluster(config, logStore, logStore, snapshots, transport, serverConfig)
} else {
raftConsensus.Logger.Info("Already initialized!!")
}
raft, err := raft.NewRaft(config, raftConsensus.Consensus.FSM(), logStore, logStore, snapshots, transport)
if err != nil {
raftConsensus.Logger.Fatal(err)
}
raftConsensus.Raft = raft
}
func (raftConsensus *RaftConsensus) StartConsensus(host host.Host, peers []peer.ID) {
raftConsensus.SetupConsensus(host, peers, nil)
// Create the actors using the Raft nodes
raftConsensus.Actor = libp2praft.NewActor(raftConsensus.Raft)
// Set the actors so that we can CommitState() and GetCurrentState()
raftConsensus.Consensus.SetActor(raftConsensus.Actor)
// Provide some time for leader election
time.Sleep(5 * time.Second)
}
func (raft *RaftConsensus) UpdateConsensus(value string) {
if raft.Actor.IsLeader() {
if err := raft.UpdateState(value); err != nil {
raft.Logger.Fatal("Failed to update state", err)
}
} else {
raft.WaitForLeader(raft.Raft)
}
}

View File

@ -1,124 +0,0 @@
package handler
import (
"encoding/json"
"sync"
"github.com/Secured-Finance/p2p-oracle-node/rpcclient"
mapset "github.com/deckarep/golang-set"
"github.com/ipfs/go-log"
peer "github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
)
type Handler struct {
pb *pubsub.PubSub
oracleTopic string
networkTopics mapset.Set
peerID peer.ID
identityMap map[peer.ID]string
Logger *log.ZapEventLogger
PbMutex sync.Mutex
}
// TextMessage is more end-user model of regular text messages
type EventMessage struct {
Topic string `json:"topic"`
Body *rpcclient.OracleEvent `json:"body"`
From peer.ID `json:"from"`
}
func NewHandler(pb *pubsub.PubSub, oracleTopic string, peerID peer.ID, networkTopics mapset.Set) *Handler {
handler := &Handler{
pb: pb,
oracleTopic: oracleTopic,
networkTopics: networkTopics,
peerID: peerID,
identityMap: make(map[peer.ID]string),
Logger: log.Logger("rendezvous"),
}
log.SetAllLoggers(log.LevelInfo)
return handler
}
func (h *Handler) HandleIncomingMessage(topic string, msg pubsub.Message, handleTextMessage func(EventMessage)) {
fromPeerID, err := peer.IDFromBytes(msg.From)
if err != nil {
h.Logger.Warn("Error occurred when reading message from field...")
return
}
message := &BaseMessage{}
if err = json.Unmarshal(msg.Data, message); err != nil {
h.Logger.Warn("Error occurred during unmarshalling the base message data")
return
}
if message.To != "" && message.To != h.peerID {
return // Drop message, because it is not for us
}
switch message.Flag {
// Getting regular message
case FlagGenericMessage:
eventMessage := EventMessage{
Topic: topic,
Body: message.Body,
From: fromPeerID,
}
handleTextMessage(eventMessage)
// Getting topic request, answer topic response
case FlagTopicsRequest:
respond := &GetTopicsRespondMessage{
BaseMessage: BaseMessage{
Body: &rpcclient.OracleEvent{},
Flag: FlagTopicsResponse,
To: fromPeerID,
},
Topics: h.GetTopics(),
}
sendData, err := json.Marshal(respond)
if err != nil {
h.Logger.Warn("Error occurred during marshalling the respond from TopicsRequest")
return
}
go func() {
h.PbMutex.Lock()
if err = h.pb.Publish(h.oracleTopic, sendData); err != nil {
h.Logger.Warn("Failed to send new message to pubsub topic", err)
}
h.PbMutex.Unlock()
}()
// Getting topic respond, adding topics to `networkTopics`
case FlagTopicsResponse:
respond := &GetTopicsRespondMessage{}
if err = json.Unmarshal(msg.Data, respond); err != nil {
h.Logger.Warn("Error occurred during unmarshalling the message data from TopicsResponse")
return
}
for i := 0; i < len(respond.Topics); i++ {
h.networkTopics.Add(respond.Topics[i])
}
// Getting identity request, answer identity response
case FlagIdentityRequest:
h.sendIdentityResponse(h.oracleTopic, fromPeerID)
// Getting identity respond, mapping Multiaddress/MatrixID
case FlagIdentityResponse:
h.identityMap[peer.ID(fromPeerID.String())] = message.From.String()
case FlagGreeting:
h.Logger.Info("Greetings from " + fromPeerID.String() + " in topic " + topic)
h.sendIdentityResponse(topic, fromPeerID)
case FlagGreetingRespond:
h.Logger.Info("Greeting respond from " + fromPeerID.String() + ":" + message.From.String() + " in topic " + topic)
case FlagFarewell:
h.Logger.Info("Greeting respond from " + fromPeerID.String() + ":" + message.From.String() + " in topic " + topic)
case FlagEventMessage:
eventMessage := EventMessage{
Topic: topic,
Body: message.Body,
From: fromPeerID,
}
handleTextMessage(eventMessage)
default:
h.Logger.Info("\nUnknown message type: %#x\n", message.Flag)
}
}

View File

@ -1,120 +0,0 @@
package handler
import (
"encoding/json"
"github.com/Secured-Finance/p2p-oracle-node/rpcclient"
"github.com/libp2p/go-libp2p-core/peer"
)
const (
FlagGenericMessage int = 0x0
FlagTopicsRequest int = 0x1
FlagTopicsResponse int = 0x2
FlagIdentityRequest int = 0x3
FlagIdentityResponse int = 0x4
FlagGreeting int = 0x5
FlagFarewell int = 0x6
FlagGreetingRespond int = 0x7
FlagEventMessage int = 0x8
)
// BaseMessage is the basic message format of our protocol
type BaseMessage struct {
Body *rpcclient.OracleEvent `json:"body"`
To peer.ID `json:"to"`
Flag int `json:"flag"`
From peer.ID `json:"from"`
}
// GetTopicsRespondMessage is the format of the message to answer of request for topics
// Flag: 0x2
type GetTopicsRespondMessage struct {
BaseMessage
Topics []string `json:"topics"`
}
func (h *Handler) sendIdentityResponse(topic string, fromPeerID peer.ID) {
var flag int
if topic == h.oracleTopic {
flag = FlagIdentityResponse
} else {
flag = FlagGreetingRespond
}
respond := &BaseMessage{
Body: &rpcclient.OracleEvent{},
Flag: flag,
From: "",
To: fromPeerID,
}
sendData, err := json.Marshal(respond)
if err != nil {
h.Logger.Warn("Error occurred during marshalling the respond from IdentityRequest")
return
}
go func() {
h.PbMutex.Lock()
if err = h.pb.Publish(topic, sendData); err != nil {
h.Logger.Warn("Failed to send new message to pubsub topic", err)
}
h.PbMutex.Unlock()
}()
}
// Requests MatrixID from specific peer
// TODO: refactor with promise
func (h *Handler) RequestPeerIdentity(peerID peer.ID) {
requestPeersIdentity := &BaseMessage{
Body: &rpcclient.OracleEvent{},
To: peerID,
Flag: FlagIdentityRequest,
From: h.peerID,
}
h.SendMessageToServiceTopic(requestPeersIdentity)
}
// TODO: refactor
func (h *Handler) SendGreetingInTopic(topic string) {
greetingMessage := &BaseMessage{
Body: &rpcclient.OracleEvent{},
To: "",
Flag: FlagGreeting,
From: h.peerID,
}
h.SendMessageToTopic(topic, greetingMessage)
}
// TODO: refactor
func (h *Handler) SendFarewellInTopic(topic string) {
farewellMessage := &BaseMessage{
Body: &rpcclient.OracleEvent{},
To: "",
Flag: FlagFarewell,
From: h.peerID,
}
h.SendMessageToTopic(topic, farewellMessage)
}
// Sends marshaled message to the service topic
func (h *Handler) SendMessageToServiceTopic(message *BaseMessage) {
h.SendMessageToTopic(h.oracleTopic, message)
}
func (h *Handler) SendMessageToTopic(topic string, message *BaseMessage) {
sendData, err := json.Marshal(message)
if err != nil {
h.Logger.Warn("Failed to send message to topic", err)
return
}
go func() {
h.PbMutex.Lock()
if err = h.pb.Publish(topic, sendData); err != nil {
h.Logger.Warn("Failed to send new message to pubsub topic", err)
}
h.PbMutex.Unlock()
}()
}

View File

@ -1,19 +0,0 @@
package handler
import peer "github.com/libp2p/go-libp2p-core/peer"
// Returns copy of handler's identity map ([peer.ID]=>[matrixID])
func (h *Handler) GetIdentityMap() map[peer.ID]string {
return h.identityMap
}
// Get list of peers subscribed on specific topic
func (h *Handler) GetPeers(topic string) []peer.ID {
peers := h.pb.ListPeers(topic)
return peers
}
// Blacklists a peer by its id
func (h *Handler) BlacklistPeer(pid peer.ID) {
h.pb.BlacklistPeer(pid)
}

View File

@ -1,21 +0,0 @@
package handler
import "github.com/Secured-Finance/p2p-oracle-node/rpcclient"
// Get list of topics **this** node is subscribed to
func (h *Handler) GetTopics() []string {
topics := h.pb.GetTopics()
return topics
}
// Requesting topics from **other** peers
func (h *Handler) RequestNetworkTopics() {
requestTopicsMessage := &BaseMessage{
Body: &rpcclient.OracleEvent{},
Flag: FlagTopicsRequest,
To: "",
From: h.peerID,
}
h.SendMessageToServiceTopic(requestTopicsMessage)
}

7
models/message.go Normal file
View File

@ -0,0 +1,7 @@
package models
type Message struct {
Type string `json:"type"`
Payload map[string]interface{} `json:"payload"`
From string `json:"-"`
}

7
node/handler.go Normal file
View File

@ -0,0 +1,7 @@
package node
import "github.com/Secured-Finance/p2p-oracle-node/models"
type Handler interface {
HandleMessage(message *models.Message)
}

View File

@ -5,17 +5,19 @@ import (
"crypto/rand"
"flag"
"fmt"
"sync"
"github.com/Secured-Finance/p2p-oracle-node/config"
consensus "github.com/Secured-Finance/p2p-oracle-node/consensus"
"github.com/Secured-Finance/p2p-oracle-node/handler"
"github.com/Secured-Finance/p2p-oracle-node/rpc"
"github.com/Secured-Finance/p2p-oracle-node/rpcclient"
mapset "github.com/deckarep/golang-set"
"github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p"
crypto "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/host"
peer "github.com/libp2p/go-libp2p-core/peer"
discovery "github.com/libp2p/go-libp2p-discovery"
dht "github.com/libp2p/go-libp2p-kad-dht"
peerstore "github.com/libp2p/go-libp2p-peerstore"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/multiformats/go-multiaddr"
)
@ -26,11 +28,8 @@ type Node struct {
GlobalCtx context.Context
GlobalCtxCancel context.CancelFunc
OracleTopic string
networkTopics mapset.Set
handler *handler.Handler
Config *config.Config
Logger *log.ZapEventLogger
Consensus *consensus.RaftConsensus
Lotus *rpc.LotusClient
Ethereum *rpcclient.EthereumClient
}
@ -41,20 +40,19 @@ func NewNode(configPath string) (*Node, error) {
return nil, err
}
node := &Node{
OracleTopic: "p2p_oracle",
Config: cfg,
Logger: log.Logger("node"),
networkTopics: mapset.NewSet(),
OracleTopic: "p2p_oracle",
Config: cfg,
Logger: log.Logger("node"),
}
log.SetAllLoggers(log.LevelInfo)
return node, nil
}
func (node *Node) setupNode(ctx context.Context, prvKey crypto.PrivKey) {
listenMultiAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%s", node.Config.ListenAddr, node.Config.ListenPort))
func (n *Node) setupNode(ctx context.Context, prvKey crypto.PrivKey) {
listenMultiAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%s", n.Config.ListenAddr, n.Config.ListenPort))
if err != nil {
node.Logger.Fatal("Failed to generate new node multiaddress:", err)
n.Logger.Fatal("Failed to generate new node multiaddress:", err)
}
host, err := libp2p.New(
ctx,
@ -62,10 +60,90 @@ func (node *Node) setupNode(ctx context.Context, prvKey crypto.PrivKey) {
libp2p.Identity(prvKey),
)
if err != nil {
node.Logger.Fatal("Failed to set a new libp2p node:", err)
n.Logger.Fatal("Failed to set a new libp2p node:", err)
}
node.Host = host
node.startPubSub(ctx, host)
n.Host = host
n.bootstrapLibp2pHost(context.TODO())
n.setupEtheriumClient()
n.setupFilecoinClient()
//n.startPubSub(ctx, host)
}
func (n *Node) setupEtheriumClient() error {
ethereum := rpcclient.NewEthereumClient()
n.Ethereum = ethereum
return ethereum.Initialize(context.Background(),
n.Config.Etherium.GatewayAddress,
n.Config.Etherium.PrivateKey,
n.Config.Etherium.OracleEmitterContractAddress,
n.Config.Etherium.AggregatorContractAddress,
)
}
func (n *Node) setupFilecoinClient() {
lotus := rpc.NewLotusClient(n.Config.Filecoin.LotusHost, n.Config.Filecoin.LotusToken)
n.Lotus = lotus
}
func (n *Node) bootstrapLibp2pHost(ctx context.Context) {
kademliaDHT, err := dht.New(context.Background(), n.Host)
if err != nil {
n.Logger.Fatal("Failed to create new DHT instance: ", err)
}
if err = kademliaDHT.Bootstrap(context.Background()); err != nil {
n.Logger.Fatal(err)
}
if !n.Config.Bootstrap {
var wg sync.WaitGroup
bootstrapMultiaddr, err := multiaddr.NewMultiaddr(n.Config.BootstrapNodeMultiaddr)
if err != nil {
n.Logger.Fatal(err)
}
peerinfo, _ := peer.AddrInfoFromP2pAddr(bootstrapMultiaddr)
wg.Add(1)
go func() {
defer wg.Done()
if err := n.Host.Connect(context.Background(), *peerinfo); err != nil {
n.Logger.Fatal(err)
}
n.Logger.Info("Connection established with bootstrap node:", *peerinfo)
}()
wg.Wait()
}
n.Logger.Info("Announcing ourselves...")
routingDiscovery := discovery.NewRoutingDiscovery(kademliaDHT)
discovery.Advertise(context.Background(), routingDiscovery, n.Config.Rendezvous)
n.Logger.Info("Successfully announced!")
// Randezvous string = service tag
// Disvover all peers with our service (all ms devices)
n.Logger.Info("Searching for other peers...")
peerChan, err := routingDiscovery.FindPeers(context.Background(), n.Config.Rendezvous)
if err != nil {
n.Logger.Fatal("Failed to find new peers, exiting...", err)
}
go func() {
MainLoop:
for {
select {
case <-ctx.Done():
break MainLoop
case newPeer := <-peerChan:
{
n.Logger.Info("Found peer:", newPeer, ", put it to the peerstore")
n.Host.Peerstore().AddAddr(newPeer.ID, newPeer.Addrs[0], peerstore.PermanentAddrTTL)
// Connect to the peer
if err := n.Host.Connect(ctx, newPeer); err != nil {
n.Logger.Warn("Connection failed: ", err)
}
n.Logger.Info("Connected to: ", newPeer)
}
}
}
}()
}
func Start() error {
@ -87,10 +165,7 @@ func Start() error {
log.Logger("node").Panic(err)
}
r := rand.Reader
// Creates a new RSA key pair for this host.
prvKey, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, r)
privKey, err := generatePrivateKey()
if err != nil {
node.Logger.Fatal(err)
}
@ -99,6 +174,16 @@ func Start() error {
node.GlobalCtx = ctx
node.GlobalCtxCancel = ctxCancel
node.setupNode(ctx, prvKey)
node.setupNode(ctx, privKey)
return nil
}
func generatePrivateKey() (crypto.PrivKey, error) {
r := rand.Reader
// Creates a new RSA key pair for this host.
prvKey, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, r)
if err != nil {
return nil, err
}
return prvKey, nil
}

View File

@ -1,292 +0,0 @@
package node
import (
"context"
"encoding/json"
"io/ioutil"
"sync"
"time"
consensus "github.com/Secured-Finance/p2p-oracle-node/consensus"
"github.com/Secured-Finance/p2p-oracle-node/handler"
"github.com/Secured-Finance/p2p-oracle-node/rpc"
"github.com/Secured-Finance/p2p-oracle-node/rpcclient"
"github.com/filecoin-project/go-address"
lotusTypes "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/libp2p/go-libp2p-core/host"
peer "github.com/libp2p/go-libp2p-core/peer"
discovery "github.com/libp2p/go-libp2p-discovery"
dht "github.com/libp2p/go-libp2p-kad-dht"
peerstore "github.com/libp2p/go-libp2p-peerstore"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/multiformats/go-multiaddr"
)
var (
LotusHost = ""
LotusJWT = ""
EthWsUrl = "wss://ropsten.infura.io/ws/v3/b9faa807bb814588bfdb3d6e94a37737"
EthUrl = "https://ropsten.infura.io/v3/b9faa807bb814588bfdb3d6e94a37737"
)
type LotusMessage struct {
Version int64
To address.Address
From address.Address
Nonce uint64
Value lotusTypes.BigInt
GasPrice lotusTypes.BigInt
GasLimit int64
Method abi.MethodNum
Params []byte
}
func (node *Node) readSub(subscription *pubsub.Subscription, incomingMessagesChan chan pubsub.Message) {
ctx := node.GlobalCtx
for {
select {
case <-ctx.Done():
return
default:
}
msg, err := subscription.Next(context.Background())
if err != nil {
node.Logger.Warn("Error reading from buffer", err)
return
}
if string(msg.Data) == "" {
return
}
if string(msg.Data) != "\n" {
addr, err := peer.IDFromBytes(msg.From)
if err != nil {
node.Logger.Warn("Error occurred when reading message From field...", err)
return
}
// This checks if sender address of incoming message is ours. It is need because we get our messages when subscribed to the same topic.
if addr == node.Host.ID() {
continue
}
incomingMessagesChan <- *msg
}
}
}
// Subscribes to a topic and then get messages ..
func (node *Node) newTopic(topic string) {
ctx := node.GlobalCtx
subscription, err := node.PubSub.Subscribe(topic)
if err != nil {
node.Logger.Warn("Error occurred when subscribing to topic", err)
return
}
time.Sleep(3 * time.Second)
incomingMessages := make(chan pubsub.Message)
go node.readSub(subscription, incomingMessages)
for {
select {
case <-ctx.Done():
return
case msg := <-incomingMessages:
{
node.handler.HandleIncomingMessage(node.OracleTopic, msg, func(textMessage handler.EventMessage) {
node.Logger.Info("%s \x1b[32m%s\x1b[0m> ", textMessage.From, textMessage.Body)
})
}
}
}
}
// Write messages to subscription (topic)
// NOTE: we don't need to be subscribed to publish something
func (node *Node) writeTopic(topic string) {
ctx := node.GlobalCtx
// stdReader := bufio.NewReader(os.Stdin)
for {
select {
case <-ctx.Done():
return
default:
}
node.Logger.Info("> ")
message := &handler.BaseMessage{
Body: &rpcclient.OracleEvent{},
Flag: handler.FlagGenericMessage,
}
sendData, err := json.Marshal(message)
if err != nil {
node.Logger.Warn("Error occurred when marshalling message object")
continue
}
err = node.PubSub.Publish(topic, sendData)
if err != nil {
node.Logger.Warn("Error occurred when publishing", err)
return
}
}
}
func (node *Node) getNetworkTopics() {
// ctx := node.GlobalCtx
node.handler.RequestNetworkTopics()
}
func (node *Node) startPubSub(ctx context.Context, host host.Host) {
pb, err := pubsub.NewGossipSub(ctx, host)
if err != nil {
node.Logger.Fatal("Error occurred when create PubSub", err)
}
// pb, err := pubsub.NewFloodsubWithProtocols(context.Background(), host, []protocol.ID{protocol.ID(node.Config.ProtocolID)}, pubsub.WithMessageSigning(true), pubsub.WithStrictSignatureVerification(true))
// if err != nil {
// node.Logger.Fatal("Error occurred when create PubSub", err)
// }
// Set global PubSub object
node.PubSub = pb
node.handler = handler.NewHandler(pb, node.OracleTopic, host.ID(), node.networkTopics)
kademliaDHT, err := dht.New(ctx, host)
if err != nil {
node.Logger.Fatal("Failed to set a new DHT:", err)
}
if err = kademliaDHT.Bootstrap(ctx); err != nil {
node.Logger.Fatal(err)
}
if !node.Config.Bootstrap {
var wg sync.WaitGroup
bootstrapMultiaddr, err := multiaddr.NewMultiaddr(node.Config.BootstrapNodeMultiaddr)
if err != nil {
node.Logger.Fatal(err)
}
peerinfo, _ := peer.AddrInfoFromP2pAddr(bootstrapMultiaddr)
wg.Add(1)
go func() {
defer wg.Done()
if err := host.Connect(ctx, *peerinfo); err != nil {
node.Logger.Fatal(err)
} else {
node.Logger.Info("Connection established with bootstrap node:", *peerinfo)
}
}()
wg.Wait()
}
node.Logger.Info("Announcing ourselves...")
routingDiscovery := discovery.NewRoutingDiscovery(kademliaDHT)
discovery.Advertise(ctx, routingDiscovery, node.Config.Rendezvous)
node.Logger.Info("Successfully announced!")
// Randezvous string = service tag
// Disvover all peers with our service (all ms devices)
node.Logger.Info("Searching for other peers...")
peerChan, err := routingDiscovery.FindPeers(ctx, node.Config.Rendezvous)
if err != nil {
node.Logger.Fatal("Failed to find new peers, exiting...", err)
}
// NOTE: here we use Randezvous string as 'topic' by default .. topic != service tag
node.OracleTopic = node.Config.Rendezvous
subscription, err := pb.Subscribe(node.OracleTopic)
if err != nil {
node.Logger.Warn("Error occurred when subscribing to topic", err)
return
}
node.Logger.Info("Waiting for correct set up of PubSub...")
time.Sleep(3 * time.Second)
peers := node.Host.Peerstore().Peers()
consensus := consensus.NewRaftConsensus()
node.Consensus = consensus
node.Consensus.StartConsensus(node.Host, peers)
ethereum := rpcclient.NewEthereumClient()
node.Ethereum = ethereum
ethereum.Connect(ctx, EthUrl, "rpc")
ethereum.Connect(ctx, EthWsUrl, "websocket")
lotus := rpc.NewLotusClient(LotusHost, LotusJWT)
node.Lotus = lotus
incomingEvents := make(chan rpcclient.OracleEvent)
incomingMessages := make(chan pubsub.Message)
go func() {
node.writeTopic(node.OracleTopic)
node.GlobalCtxCancel()
}()
go node.readSub(subscription, incomingMessages)
go ethereum.SubscribeOnOracleEvents(ctx, "0x89d3A6151a9E608c51FF70E0F7f78a109949c2c1", incomingEvents)
go node.getNetworkTopics()
MainLoop:
for {
select {
case <-ctx.Done():
break MainLoop
case msg := <-incomingMessages:
{
node.handler.HandleIncomingMessage(node.OracleTopic, msg, func(textMessage handler.EventMessage) {
node.Logger.Info("%s > \x1b[32m%s\x1b[0m", textMessage.From, textMessage.Body)
node.Logger.Info("> ")
response, err := node.Lotus.GetMessage(textMessage.Body.RequestType)
if err != nil {
node.Logger.Warn("Failed to get transaction data from lotus node")
}
defer response.Body.Close()
body, err := ioutil.ReadAll(response.Body)
if err != nil {
node.Logger.Warn("Failed to read lotus response")
}
var lotusMessage = new(LotusMessage)
if err := json.Unmarshal(body, &lotusMessage); err != nil {
node.Logger.Warn("Failed to unmarshal to get message request")
}
node.Consensus.UpdateConsensus(lotusMessage.Value.String())
})
}
case newPeer := <-peerChan:
{
node.Logger.Info("\nFound peer:", newPeer, ", add address to peerstore")
// Adding peer addresses to local peerstore
host.Peerstore().AddAddr(newPeer.ID, newPeer.Addrs[0], peerstore.PermanentAddrTTL)
// Connect to the peer
if err := host.Connect(ctx, newPeer); err != nil {
node.Logger.Warn("Connection failed:", err)
}
node.Logger.Info("Connected to:", newPeer)
node.Logger.Info("> ")
}
case event := <-incomingEvents:
{
message := &handler.BaseMessage{
Body: &event,
Flag: handler.FlagEventMessage,
From: node.Host.ID(),
To: "",
}
node.handler.SendMessageToServiceTopic(message)
}
}
}
if err := host.Close(); err != nil {
node.Logger.Info("\nClosing host failed:", err)
}
node.Logger.Info("\nBye")
}

107
node/pubsub_router.go Normal file
View File

@ -0,0 +1,107 @@
package node
import (
"context"
"encoding/json"
"github.com/Secured-Finance/p2p-oracle-node/models"
"github.com/ipfs/go-log"
peer "github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
)
type PubSubRouter struct {
node *Node
pubsub *pubsub.PubSub
logger *log.ZapEventLogger
context context.Context
contextCancel context.CancelFunc
handlers map[string][]Handler
}
func NewPubSubRouter(n *Node) *PubSubRouter {
ctx, ctxCancel := context.WithCancel(context.Background())
psr := &PubSubRouter{
node: n,
logger: log.Logger("PubSubRouter"),
context: ctx,
contextCancel: ctxCancel,
}
pb, err := pubsub.NewGossipSub(
context.Background(),
psr.node.Host, pubsub.WithMessageSigning(true),
pubsub.WithStrictSignatureVerification(true),
)
if err != nil {
psr.logger.Fatal("Error occurred when create PubSub", err)
}
n.OracleTopic = n.Config.Rendezvous
subscription, err := pb.Subscribe(n.OracleTopic)
if err != nil {
psr.logger.Fatal("Error occurred when subscribing to service topic", err)
}
psr.pubsub = pb
go func() {
for {
select {
case <-psr.context.Done():
return
default:
{
msg, err := subscription.Next(psr.context)
if err != nil {
psr.logger.Warn("Failed to receive pubsub message: ", err.Error())
}
psr.handleMessage(msg)
}
}
}
}()
return psr
}
func (psr *PubSubRouter) handleMessage(p *pubsub.Message) {
senderPeerID, err := peer.IDFromBytes(p.From)
if err != nil {
psr.logger.Warn("Unable to decode sender peer ID! " + err.Error())
return
}
// We can receive our own messages when sending to the topic. So we should drop them.
if senderPeerID == psr.node.Host.ID() {
return
}
var message models.Message
err = json.Unmarshal(p.Data, &message)
if err != nil {
psr.logger.Warn("Unable to decode message data! " + err.Error())
return
}
message.From = senderPeerID.String()
handlers, ok := psr.handlers[message.Type]
if !ok {
psr.logger.Warn("Dropping message " + message.Type + " because we don't have any handlers!")
return
}
for _, v := range handlers {
go v.HandleMessage(&message)
}
}
func (psr *PubSubRouter) Hook(messageType string, handler Handler) {
handlers, ok := psr.handlers[messageType]
if !ok {
emptyArray := []Handler{}
psr.handlers[messageType] = emptyArray
handlers = emptyArray
}
psr.handlers[messageType] = append(handlers, handler)
}
func (psr *PubSubRouter) Shutdown() {
psr.contextCancel()
}

View File

@ -2,29 +2,24 @@ package rpcclient
import (
"context"
"crypto/ecdsa"
"io/ioutil"
"math/big"
"strings"
"github.com/Secured-Finance/p2p-oracle-node/contracts/aggregator"
"github.com/Secured-Finance/p2p-oracle-node/contracts/oracleemitter"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/accounts/keystore"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/event"
"github.com/ipfs/go-log"
)
type EthereumClient struct {
HttpClient *ethclient.Client
WsClient *ethclient.Client
Logger *log.ZapEventLogger
client *ethclient.Client
Logger *log.ZapEventLogger
authTransactor *bind.TransactOpts
oracleEmitter *oracleemitter.SmartcontractsSession
aggregator *aggregator.SmartcontractsSession
}
type OracleEvent struct {
@ -35,13 +30,10 @@ type OracleEvent struct {
}
type Ethereum interface {
Connect(context.Context, string) error
Initialize(ctx context.Context, url, connectionType, privateKey, oracleEmitterContractAddress, aggregatorContractAddress string) error
Balance(context.Context, string) (*big.Int, error)
SubscribeOnSmartContractEvents(context.Context, string)
GenerateAddressFromPrivateKey(string) string
SendTransaction(string, string, int64) string
createKeyStore(string) string
importKeyStore(string, string) string
SubmitRequestAnswer(reqID *big.Int, data string, callbackAddress common.Address, callbackMethodID [4]byte) error
}
func NewEthereumClient() *EthereumClient {
@ -53,203 +45,183 @@ func NewEthereumClient() *EthereumClient {
return ethereumClient
}
func (c *EthereumClient) Connect(ctx context.Context, url string, connectionType string) {
func (c *EthereumClient) Initialize(ctx context.Context, url, privateKey, oracleEmitterContractAddress, aggregatorContractAddress string) error {
client, err := ethclient.Dial(url)
if err != nil {
c.Logger.Fatal(err)
return err
}
if connectionType == "websocket" {
c.WsClient = client
} else {
c.HttpClient = client
c.client = client
ecdsaKey, err := crypto.HexToECDSA(privateKey)
if err != nil {
return err
}
authTransactor := bind.NewKeyedTransactor(ecdsaKey)
c.authTransactor = authTransactor
oracleEmitter, err := oracleemitter.NewSmartcontracts(common.HexToAddress(oracleEmitterContractAddress), client)
if err != nil {
return err
}
aggregatorPlainSC, err := aggregator.NewSmartcontracts(common.HexToAddress(aggregatorContractAddress), client)
if err != nil {
return err
}
c.oracleEmitter = &oracleemitter.SmartcontractsSession{
Contract: oracleEmitter,
CallOpts: bind.CallOpts{
Pending: true,
From: authTransactor.From,
Context: context.Background(),
},
TransactOpts: bind.TransactOpts{
From: authTransactor.From,
Signer: authTransactor.Signer,
GasLimit: 0, // 0 automatically estimates gas limit
GasPrice: nil, // nil automatically suggests gas price
Context: context.Background(),
},
}
c.aggregator = &aggregator.SmartcontractsSession{
Contract: aggregatorPlainSC,
CallOpts: bind.CallOpts{
Pending: true,
From: authTransactor.From,
Context: context.Background(),
},
TransactOpts: bind.TransactOpts{
From: authTransactor.From,
Signer: authTransactor.Signer,
GasLimit: 0, // 0 automatically estimates gas limit
GasPrice: nil, // nil automatically suggests gas price
Context: context.Background(),
},
}
return nil
}
// Balance returns the balance of the given ethereum address.
func (c *EthereumClient) Balance(ctx context.Context, address string) (*big.Int, error) {
ethereumAddress := common.HexToAddress(address)
value, err := c.HttpClient.BalanceAt(ctx, ethereumAddress, nil)
// // Balance returns the balance of the given ethereum address.
// func (c *EthereumClient) Balance(ctx context.Context, address string) (*big.Int, error) {
// ethereumAddress := common.HexToAddress(address)
// value, err := c.HttpClient.BalanceAt(ctx, ethereumAddress, nil)
// if err != nil {
// return nil, err
// }
// return value, nil
// }
// func (c *EthereumClient) SendTransaction(ctx context.Context, private_key, to string, amount int64) string {
// privateKey, err := crypto.HexToECDSA(private_key)
// if err != nil {
// c.Logger.Fatal("Failed to parse private key", err)
// }
// publicKey := privateKey.Public()
// publicKeyECDSA, ok := publicKey.(*ecdsa.PublicKey)
// if !ok {
// c.Logger.Fatal("Cannot assert type: publicKey is not of type *ecdsa.PublicKey", err)
// }
// fromAddress := crypto.PubkeyToAddress(*publicKeyECDSA)
// nonce, err := c.HttpClient.PendingNonceAt(ctx, fromAddress)
// if err != nil {
// c.Logger.Fatal("Failed to generate wallet nonce value", err)
// }
// value := big.NewInt(amount)
// gasLimit := uint64(21000) // in units
// gasPrice, err := c.HttpClient.SuggestGasPrice(ctx)
// if err != nil {
// c.Logger.Fatal("Failed to suggest new gas price", err)
// }
// toAddress := common.HexToAddress(to)
// var data []byte
// tx := types.NewTransaction(nonce, toAddress, value, gasLimit, gasPrice, data)
// chainID, err := c.HttpClient.NetworkID(ctx)
// if err != nil {
// c.Logger.Fatal("Failed to get network ID", err)
// }
// signedTx, err := types.SignTx(tx, types.NewEIP155Signer(chainID), privateKey)
// if err != nil {
// c.Logger.Fatal("Failed to sign transaction", err)
// }
// err = c.HttpClient.SendTransaction(ctx, signedTx)
// if err != nil {
// c.Logger.Fatal("Failed to send signed transaction", err)
// }
// TxHash := signedTx.Hash().Hex()
// c.Logger.Info("Transaction sent: %s", TxHash)
// return TxHash
// }
// func (c *EthereumClient) GenerateAddressFromPrivateKey(private_key string) string {
// privateKey, err := crypto.HexToECDSA(private_key)
// if err != nil {
// c.Logger.Fatal("Failed to generate private key", err)
// }
// publicKey := privateKey.Public()
// publicKeyECDSA, ok := publicKey.(*ecdsa.PublicKey)
// if !ok {
// c.Logger.Fatal("cannot assert type: publicKey is not of type *ecdsa.PublicKey")
// }
// publicKeyBytes := crypto.FromECDSAPub(publicKeyECDSA)
// c.Logger.Info(hexutil.Encode(publicKeyBytes)[4:])
// address := crypto.PubkeyToAddress(*publicKeyECDSA).Hex()
// return address
// }
func (c *EthereumClient) SubscribeOnOracleEvents(incomingEventsChan chan *oracleemitter.SmartcontractsNewOracleRequest) (event.Subscription, error) {
requestsFilter := c.oracleEmitter.Contract.SmartcontractsFilterer
subscription, err := requestsFilter.WatchNewOracleRequest(&bind.WatchOpts{
Start: nil, //last block
Context: nil,
}, incomingEventsChan)
if err != nil {
return nil, err
}
return value, nil
return subscription, err
}
func (c *EthereumClient) SendTransaction(ctx context.Context, private_key, to string, amount int64) string {
privateKey, err := crypto.HexToECDSA(private_key)
func (c *EthereumClient) SubmitRequestAnswer(reqID *big.Int, data string, callbackAddress common.Address, callbackMethodID [4]byte) error {
// privateKey, err := crypto.HexToECDSA(private_key)
// if err != nil {
// c.Logger.Fatal("Failed to generate private key", err)
// }
// publicKey := privateKey.Public()
// publicKeyECDSA, ok := publicKey.(*ecdsa.PublicKey)
// if !ok {
// c.Logger.Fatal("cannot assert type: publicKey is not of type *ecdsa.PublicKey")
// }
// publicKeyBytes := crypto.FromECDSAPub(publicKeyECDSA)
// c.Logger.Info(hexutil.Encode(publicKeyBytes)[4:])
// fromAddress := crypto.PubkeyToAddress(*publicKeyECDSA)
// nonce, err := c.HttpClient.PendingNonceAt(ctx, fromAddress)
// if err != nil {
// c.Logger.Fatal(err)
// }
// gasPrice, err := c.HttpClient.SuggestGasPrice(ctx)
// if err != nil {
// c.Logger.Fatal(err)
// }
_, err := c.aggregator.CollectData(reqID, data, callbackAddress, callbackMethodID)
if err != nil {
c.Logger.Fatal("Failed to parse private key", err)
return err
}
publicKey := privateKey.Public()
publicKeyECDSA, ok := publicKey.(*ecdsa.PublicKey)
if !ok {
c.Logger.Fatal("Cannot assert type: publicKey is not of type *ecdsa.PublicKey", err)
}
fromAddress := crypto.PubkeyToAddress(*publicKeyECDSA)
nonce, err := c.HttpClient.PendingNonceAt(ctx, fromAddress)
if err != nil {
c.Logger.Fatal("Failed to generate wallet nonce value", err)
}
value := big.NewInt(amount)
gasLimit := uint64(21000) // in units
gasPrice, err := c.HttpClient.SuggestGasPrice(ctx)
if err != nil {
c.Logger.Fatal("Failed to suggest new gas price", err)
}
toAddress := common.HexToAddress(to)
var data []byte
tx := types.NewTransaction(nonce, toAddress, value, gasLimit, gasPrice, data)
chainID, err := c.HttpClient.NetworkID(ctx)
if err != nil {
c.Logger.Fatal("Failed to get network ID", err)
}
signedTx, err := types.SignTx(tx, types.NewEIP155Signer(chainID), privateKey)
if err != nil {
c.Logger.Fatal("Failed to sign transaction", err)
}
err = c.HttpClient.SendTransaction(ctx, signedTx)
if err != nil {
c.Logger.Fatal("Failed to send signed transaction", err)
}
TxHash := signedTx.Hash().Hex()
c.Logger.Info("Transaction sent: %s", TxHash)
return TxHash
}
func (c *EthereumClient) GenerateAddressFromPrivateKey(private_key string) string {
privateKey, err := crypto.HexToECDSA(private_key)
if err != nil {
c.Logger.Fatal("Failed to generate private key", err)
}
publicKey := privateKey.Public()
publicKeyECDSA, ok := publicKey.(*ecdsa.PublicKey)
if !ok {
c.Logger.Fatal("cannot assert type: publicKey is not of type *ecdsa.PublicKey")
}
publicKeyBytes := crypto.FromECDSAPub(publicKeyECDSA)
c.Logger.Info(hexutil.Encode(publicKeyBytes)[4:])
address := crypto.PubkeyToAddress(*publicKeyECDSA).Hex()
return address
}
func (c *EthereumClient) SubscribeOnOracleEvents(ctx context.Context, address string, incomingEventsChan chan OracleEvent) {
contractAddress := common.HexToAddress(address)
query := ethereum.FilterQuery{
Addresses: []common.Address{contractAddress},
}
contractAbi, err := abi.JSON(strings.NewReader(string(oracleemitter.SmartcontractsABI)))
if err != nil {
c.Logger.Fatal(err)
}
logs := make(chan types.Log)
sub, err := c.WsClient.SubscribeFilterLogs(ctx, query, logs)
if err != nil {
c.Logger.Fatal(err)
}
for {
select {
case err := <-sub.Err():
c.Logger.Fatal(err)
case vLog := <-logs:
var event OracleEvent
err := contractAbi.Unpack(&event, "NewOracleRequest", vLog.Data)
if err != nil {
c.Logger.Fatal(err)
}
c.Logger.Info(event)
incomingEventsChan <- event
}
}
}
func (c *EthereumClient) createKeyStore(password string) string {
ks := keystore.NewKeyStore("./wallets", keystore.StandardScryptN, keystore.StandardScryptP)
account, err := ks.NewAccount(password)
if err != nil {
c.Logger.Fatal("Failed to create new keystore", err)
}
return account.Address.Hex()
}
func (c *EthereumClient) importKeyStore(filePath string, password string) string {
ks := keystore.NewKeyStore("./wallets", keystore.StandardScryptN, keystore.StandardScryptP)
jsonBytes, err := ioutil.ReadFile(filePath)
if err != nil {
c.Logger.Fatal("Failed to read keystore file", err)
}
account, err := ks.Import(jsonBytes, password, password)
if err != nil {
c.Logger.Fatal("Failed to import keystore", err)
}
return account.Address.Hex()
}
func (c *EthereumClient) SendConsensusValues(ctx context.Context, private_key string, smartContractAddress string, reqID *big.Int, data string, callbackAddress common.Address, callbackMethodID [4]byte) string {
privateKey, err := crypto.HexToECDSA(private_key)
if err != nil {
c.Logger.Fatal("Failed to generate private key", err)
}
publicKey := privateKey.Public()
publicKeyECDSA, ok := publicKey.(*ecdsa.PublicKey)
if !ok {
c.Logger.Fatal("cannot assert type: publicKey is not of type *ecdsa.PublicKey")
}
publicKeyBytes := crypto.FromECDSAPub(publicKeyECDSA)
c.Logger.Info(hexutil.Encode(publicKeyBytes)[4:])
fromAddress := crypto.PubkeyToAddress(*publicKeyECDSA)
nonce, err := c.HttpClient.PendingNonceAt(ctx, fromAddress)
if err != nil {
c.Logger.Fatal(err)
}
gasPrice, err := c.HttpClient.SuggestGasPrice(ctx)
if err != nil {
c.Logger.Fatal(err)
}
auth := bind.NewKeyedTransactor(privateKey)
auth.Nonce = big.NewInt(int64(nonce))
auth.Value = big.NewInt(0) // in wei
auth.GasLimit = uint64(300000) // in units
auth.GasPrice = gasPrice
address := common.HexToAddress(smartContractAddress)
contract, err := aggregator.NewSmartcontracts(address, c.HttpClient)
if err != nil {
c.Logger.Fatal(err)
}
tx, err := contract.CollectData(auth, reqID, data, callbackAddress, callbackMethodID)
if err != nil {
c.Logger.Fatal(err)
}
txHash := tx.Hash().Hex()
return txHash
return nil
}