add: pubsub oracle topic, message exchange operations, ethereum rpc connection
This commit is contained in:
parent
e476e7f208
commit
cdae566e8c
20
.gitignore
vendored
20
.gitignore
vendored
@ -1 +1,19 @@
|
||||
/node
|
||||
# Binaries for programs and plugins
|
||||
*.exe
|
||||
*.exe~
|
||||
*.dll
|
||||
*.so
|
||||
*.dylib
|
||||
|
||||
# Test binary, built with `go test -c`
|
||||
*.test
|
||||
|
||||
# Output of the go coverage tool, specifically when used with LiteIDE
|
||||
*.out
|
||||
|
||||
# Dependency directories (remove the comment below to include it)
|
||||
vendor/
|
||||
.vscode/
|
||||
|
||||
# Environment files
|
||||
.env
|
12
Dockerfile
Normal file
12
Dockerfile
Normal file
@ -0,0 +1,12 @@
|
||||
FROM golang:1.14.6-alpine3.12
|
||||
RUN mkdir /p2p-oracle-node
|
||||
COPY . /p2p-oracle-node
|
||||
WORKDIR /p2p-oracle-node
|
||||
|
||||
RUN apk add git
|
||||
RUN apk add --update make
|
||||
RUN go mod download
|
||||
|
||||
RUN make build
|
||||
|
||||
CMD ["./main"]
|
8
Makefile
Normal file
8
Makefile
Normal file
@ -0,0 +1,8 @@
|
||||
.PHONY: build
|
||||
build:
|
||||
go build -v main.go
|
||||
|
||||
test:
|
||||
go test -v -race -timeout 30s ./ ...
|
||||
|
||||
.DEFAULT_GOAL := build
|
113
cmd/node/main.go
113
cmd/node/main.go
@ -1,113 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"sync"
|
||||
|
||||
"github.com/google/logger"
|
||||
"github.com/libp2p/go-libp2p"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
crypto "github.com/libp2p/go-libp2p-crypto"
|
||||
discovery "github.com/libp2p/go-libp2p-discovery"
|
||||
dht "github.com/libp2p/go-libp2p-kad-dht"
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
)
|
||||
|
||||
func main() {
|
||||
listenPort := flag.Int("port", 0, "Listen port number")
|
||||
listenAddr := flag.String("addr", "", "Listen address")
|
||||
verbose := flag.Bool("verbose", false, "Verbose logs")
|
||||
syslog := flag.Bool("syslog", false, "Log to system logging daemon")
|
||||
bootstrap := flag.Bool("bootstrap", false, "Start up bootstrap node")
|
||||
bootstrapAddress := flag.String("baddr", "", "Address of bootstrap node")
|
||||
rendezvousString := flag.String("rendezvous", "", "DHT rendezvous string")
|
||||
|
||||
flag.Parse()
|
||||
|
||||
defer logger.Init("node", *verbose, *syslog, ioutil.Discard).Close()
|
||||
|
||||
r := rand.Reader
|
||||
|
||||
// Creates a new RSA key pair for this host.
|
||||
prvKey, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, r)
|
||||
if err != nil {
|
||||
logger.Fatal(err)
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
listenMaddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", *listenAddr, *listenPort))
|
||||
if err != nil {
|
||||
logger.Fatal(err)
|
||||
}
|
||||
host, err := libp2p.New(
|
||||
ctx,
|
||||
libp2p.ListenAddrs(listenMaddr),
|
||||
libp2p.Identity(prvKey),
|
||||
)
|
||||
|
||||
kademliaDHT, err := dht.New(ctx, host)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if err = kademliaDHT.Bootstrap(ctx); err != nil {
|
||||
logger.Fatal(err)
|
||||
}
|
||||
|
||||
if !*bootstrap {
|
||||
var wg sync.WaitGroup
|
||||
bMaddr, err := multiaddr.NewMultiaddr(*bootstrapAddress)
|
||||
if err != nil {
|
||||
logger.Fatal(err)
|
||||
}
|
||||
peerinfo, _ := peer.AddrInfoFromP2pAddr(bMaddr)
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
if err := host.Connect(ctx, *peerinfo); err != nil {
|
||||
logger.Fatal(err)
|
||||
} else {
|
||||
logger.Info("Connection established with bootstrap node:", *peerinfo)
|
||||
}
|
||||
}()
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
logger.Info("Libp2p node is successfully started!")
|
||||
multiaddress := fmt.Sprintf("/ip4/%s/tcp/%v/p2p/%s", *listenAddr, *listenPort, host.ID().Pretty())
|
||||
logger.Infof("Your multiaddress: %s", multiaddress)
|
||||
|
||||
logger.Info("Announcing ourselves...")
|
||||
routingDiscovery := discovery.NewRoutingDiscovery(kademliaDHT)
|
||||
discovery.Advertise(ctx, routingDiscovery, *rendezvousString)
|
||||
logger.Info("Successfully announced!")
|
||||
|
||||
logger.Info("Searching for other peers...")
|
||||
peerChan, err := routingDiscovery.FindPeers(ctx, *rendezvousString)
|
||||
if err != nil {
|
||||
logger.Fatal(err)
|
||||
}
|
||||
|
||||
for peer := range peerChan {
|
||||
if peer.ID == host.ID() {
|
||||
continue
|
||||
}
|
||||
logger.Info("Found peer:", peer)
|
||||
|
||||
logger.Info("Connecting to:", peer)
|
||||
err := host.Connect(ctx, peer)
|
||||
if err != nil {
|
||||
logger.Error("Connection failed: " + err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
logger.Info("Connected to: ", peer)
|
||||
}
|
||||
|
||||
select {}
|
||||
}
|
71
config/config.go
Normal file
71
config/config.go
Normal file
@ -0,0 +1,71 @@
|
||||
package config
|
||||
|
||||
import (
|
||||
"log"
|
||||
|
||||
"github.com/spf13/viper"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
ListenPort string `toml:"listen_port"`
|
||||
ListenAddr string `toml:"listen_addr"`
|
||||
Bootstrap bool `toml:"is_bootstrap"`
|
||||
BootstrapNodeMultiaddr string `toml:"bootstrap_node_multiaddr"`
|
||||
Rendezvous string `toml:"rendezvous"`
|
||||
ProtocolID string `toml:"protocol_id"`
|
||||
SessionKey string `toml:"session_key"`
|
||||
}
|
||||
|
||||
// viperEnvVariable loads config parameters from .env file
|
||||
func viperEnvString(key string, default_value string) string {
|
||||
viper.SetConfigFile(".env")
|
||||
|
||||
err := viper.ReadInConfig()
|
||||
|
||||
if err != nil {
|
||||
log.Fatalf("Error while reading config file %s", err)
|
||||
}
|
||||
|
||||
value, ok := viper.Get(key).(string)
|
||||
|
||||
if !ok {
|
||||
return default_value
|
||||
}
|
||||
|
||||
return value
|
||||
}
|
||||
|
||||
func viperEnvBoolean(key string, default_value bool) bool {
|
||||
viper.SetConfigFile(".env")
|
||||
|
||||
err := viper.ReadInConfig()
|
||||
|
||||
if err != nil {
|
||||
log.Fatalf("Error while reading config file %s", err)
|
||||
}
|
||||
|
||||
value := viper.GetBool(key)
|
||||
|
||||
return value
|
||||
}
|
||||
|
||||
// NewConfig creates a new config based on default values or provided .env file
|
||||
func NewConfig() *Config {
|
||||
ListenPort := viperEnvString("LISTEN_PORT", ":8000")
|
||||
ListenAddr := viperEnvString("LISTEN_ADDRESS", "debug")
|
||||
Bootstrap := viperEnvBoolean("BOOTSTRAP_NODE", false)
|
||||
BootstrapNodeMultiaddr := viperEnvString("BOOTSTRAP_NODE_MULTIADDRESS", "/ip4/127.0.0.1/tcp/0")
|
||||
Rendezvous := viperEnvString("RENDEZVOUS", "filecoin-p2p-oracle")
|
||||
ProtocolID := viperEnvString("PROTOCOL_ID", "p2p-oracle")
|
||||
SessionKey := viperEnvString("SESSION_KEY", "go")
|
||||
|
||||
return &Config{
|
||||
ListenPort: ListenPort,
|
||||
ListenAddr: ListenAddr,
|
||||
Bootstrap: Bootstrap,
|
||||
BootstrapNodeMultiaddr: BootstrapNodeMultiaddr,
|
||||
Rendezvous: Rendezvous,
|
||||
ProtocolID: ProtocolID,
|
||||
SessionKey: SessionKey,
|
||||
}
|
||||
}
|
11
go.mod
11
go.mod
@ -3,11 +3,18 @@ module github.com/Secured-Finance/p2p-oracle-node
|
||||
go 1.14
|
||||
|
||||
require (
|
||||
github.com/google/logger v1.1.0
|
||||
github.com/deckarep/golang-set v1.7.1
|
||||
github.com/dgrijalva/jwt-go v3.2.0+incompatible
|
||||
github.com/ethereum/go-ethereum v1.9.5
|
||||
github.com/ipfs/go-log v1.0.4
|
||||
github.com/libp2p/go-libp2p v0.10.2
|
||||
github.com/libp2p/go-libp2p-core v0.6.1
|
||||
github.com/libp2p/go-libp2p-crypto v0.1.0
|
||||
github.com/libp2p/go-libp2p-discovery v0.5.0
|
||||
github.com/libp2p/go-libp2p-kad-dht v0.8.3
|
||||
github.com/libp2p/go-libp2p-peer v0.2.0
|
||||
github.com/libp2p/go-libp2p-peerstore v0.2.6
|
||||
github.com/libp2p/go-libp2p-pubsub v0.3.3
|
||||
github.com/multiformats/go-multiaddr v0.2.2
|
||||
github.com/renproject/mercury v0.3.14
|
||||
github.com/spf13/viper v1.7.1
|
||||
)
|
||||
|
114
handler/handler.go
Normal file
114
handler/handler.go
Normal file
@ -0,0 +1,114 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"sync"
|
||||
|
||||
mapset "github.com/deckarep/golang-set"
|
||||
"github.com/ipfs/go-log"
|
||||
peer "github.com/libp2p/go-libp2p-core/peer"
|
||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||
)
|
||||
|
||||
type Handler struct {
|
||||
pb *pubsub.PubSub
|
||||
oracleTopic string
|
||||
networkTopics mapset.Set
|
||||
peerID peer.ID
|
||||
identityMap map[peer.ID]string
|
||||
Logger *log.ZapEventLogger
|
||||
PbMutex sync.Mutex
|
||||
}
|
||||
|
||||
// TextMessage is more end-user model of regular text messages
|
||||
type TextMessage struct {
|
||||
Topic string `json:"topic"`
|
||||
Body string `json:"body"`
|
||||
From peer.ID `json:"from"`
|
||||
}
|
||||
|
||||
func NewHandler(pb *pubsub.PubSub, oracleTopic string, peerID peer.ID, networkTopics mapset.Set) *Handler {
|
||||
handler := &Handler{
|
||||
pb: pb,
|
||||
oracleTopic: oracleTopic,
|
||||
networkTopics: networkTopics,
|
||||
peerID: peerID,
|
||||
identityMap: make(map[peer.ID]string),
|
||||
Logger: log.Logger("rendezvous"),
|
||||
}
|
||||
return handler
|
||||
}
|
||||
|
||||
func (h *Handler) HandleIncomingMessage(topic string, msg pubsub.Message, handleTextMessage func(TextMessage)) {
|
||||
fromPeerID, err := peer.IDFromBytes(msg.From)
|
||||
if err != nil {
|
||||
h.Logger.Warn("Error occurred when reading message from field...")
|
||||
return
|
||||
}
|
||||
message := &BaseMessage{}
|
||||
if err = json.Unmarshal(msg.Data, message); err != nil {
|
||||
h.Logger.Warn("Error occurred during unmarshalling the base message data")
|
||||
return
|
||||
}
|
||||
if message.To != "" && message.To != h.peerID {
|
||||
return // Drop message, because it is not for us
|
||||
}
|
||||
|
||||
switch message.Flag {
|
||||
// Getting regular message
|
||||
case FlagGenericMessage:
|
||||
textMessage := TextMessage{
|
||||
Topic: topic,
|
||||
Body: message.Body,
|
||||
From: fromPeerID,
|
||||
}
|
||||
handleTextMessage(textMessage)
|
||||
// Getting topic request, answer topic response
|
||||
case FlagTopicsRequest:
|
||||
respond := &GetTopicsRespondMessage{
|
||||
BaseMessage: BaseMessage{
|
||||
Body: "",
|
||||
Flag: FlagTopicsResponse,
|
||||
To: fromPeerID,
|
||||
},
|
||||
Topics: h.GetTopics(),
|
||||
}
|
||||
sendData, err := json.Marshal(respond)
|
||||
if err != nil {
|
||||
h.Logger.Warn("Error occurred during marshalling the respond from TopicsRequest")
|
||||
return
|
||||
}
|
||||
go func() {
|
||||
h.PbMutex.Lock()
|
||||
if err = h.pb.Publish(h.oracleTopic, sendData); err != nil {
|
||||
h.Logger.Warn("Failed to send new message to pubsub topic", err)
|
||||
}
|
||||
h.PbMutex.Unlock()
|
||||
}()
|
||||
// Getting topic respond, adding topics to `networkTopics`
|
||||
case FlagTopicsResponse:
|
||||
respond := &GetTopicsRespondMessage{}
|
||||
if err = json.Unmarshal(msg.Data, respond); err != nil {
|
||||
h.Logger.Warn("Error occurred during unmarshalling the message data from TopicsResponse")
|
||||
return
|
||||
}
|
||||
for i := 0; i < len(respond.Topics); i++ {
|
||||
h.networkTopics.Add(respond.Topics[i])
|
||||
}
|
||||
// Getting identity request, answer identity response
|
||||
case FlagIdentityRequest:
|
||||
h.sendIdentityResponse(h.oracleTopic, fromPeerID)
|
||||
// Getting identity respond, mapping Multiaddress/MatrixID
|
||||
case FlagIdentityResponse:
|
||||
h.identityMap[peer.ID(fromPeerID.String())] = message.From.String()
|
||||
case FlagGreeting:
|
||||
h.Logger.Info("Greetings from " + fromPeerID.String() + " in topic " + topic)
|
||||
h.sendIdentityResponse(topic, fromPeerID)
|
||||
case FlagGreetingRespond:
|
||||
h.Logger.Info("Greeting respond from " + fromPeerID.String() + ":" + message.From.String() + " in topic " + topic)
|
||||
case FlagFarewell:
|
||||
h.Logger.Info("Greeting respond from " + fromPeerID.String() + ":" + message.From.String() + " in topic " + topic)
|
||||
default:
|
||||
h.Logger.Info("\nUnknown message type: %#x\n", message.Flag)
|
||||
}
|
||||
}
|
118
handler/messages.go
Normal file
118
handler/messages.go
Normal file
@ -0,0 +1,118 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
)
|
||||
|
||||
const (
|
||||
FlagGenericMessage int = 0x0
|
||||
FlagTopicsRequest int = 0x1
|
||||
FlagTopicsResponse int = 0x2
|
||||
FlagIdentityRequest int = 0x3
|
||||
FlagIdentityResponse int = 0x4
|
||||
FlagGreeting int = 0x5
|
||||
FlagFarewell int = 0x6
|
||||
FlagGreetingRespond int = 0x7
|
||||
)
|
||||
|
||||
// BaseMessage is the basic message format of our protocol
|
||||
type BaseMessage struct {
|
||||
Body string `json:"body"`
|
||||
To peer.ID `json:"to"`
|
||||
Flag int `json:"flag"`
|
||||
From peer.ID `json:"from"`
|
||||
}
|
||||
|
||||
// GetTopicsRespondMessage is the format of the message to answer of request for topics
|
||||
// Flag: 0x2
|
||||
type GetTopicsRespondMessage struct {
|
||||
BaseMessage
|
||||
Topics []string `json:"topics"`
|
||||
}
|
||||
|
||||
func (h *Handler) sendIdentityResponse(topic string, fromPeerID peer.ID) {
|
||||
var flag int
|
||||
if topic == h.oracleTopic {
|
||||
flag = FlagIdentityResponse
|
||||
} else {
|
||||
flag = FlagGreetingRespond
|
||||
}
|
||||
respond := &BaseMessage{
|
||||
Body: "",
|
||||
Flag: flag,
|
||||
From: "",
|
||||
To: fromPeerID,
|
||||
}
|
||||
sendData, err := json.Marshal(respond)
|
||||
if err != nil {
|
||||
h.Logger.Warn("Error occurred during marshalling the respond from IdentityRequest")
|
||||
return
|
||||
}
|
||||
go func() {
|
||||
h.PbMutex.Lock()
|
||||
if err = h.pb.Publish(topic, sendData); err != nil {
|
||||
h.Logger.Warn("Failed to send new message to pubsub topic", err)
|
||||
}
|
||||
h.PbMutex.Unlock()
|
||||
}()
|
||||
}
|
||||
|
||||
// Requests MatrixID from specific peer
|
||||
// TODO: refactor with promise
|
||||
func (h *Handler) RequestPeerIdentity(peerID peer.ID) {
|
||||
requestPeersIdentity := &BaseMessage{
|
||||
Body: "",
|
||||
To: peerID,
|
||||
Flag: FlagIdentityRequest,
|
||||
From: h.peerID,
|
||||
}
|
||||
|
||||
h.sendMessageToServiceTopic(requestPeersIdentity)
|
||||
}
|
||||
|
||||
// TODO: refactor
|
||||
func (h *Handler) SendGreetingInTopic(topic string) {
|
||||
greetingMessage := &BaseMessage{
|
||||
Body: "",
|
||||
To: "",
|
||||
Flag: FlagGreeting,
|
||||
From: h.peerID,
|
||||
}
|
||||
|
||||
h.sendMessageToTopic(topic, greetingMessage)
|
||||
}
|
||||
|
||||
// TODO: refactor
|
||||
func (h *Handler) SendFarewellInTopic(topic string) {
|
||||
farewellMessage := &BaseMessage{
|
||||
Body: "",
|
||||
To: "",
|
||||
Flag: FlagFarewell,
|
||||
From: h.peerID,
|
||||
}
|
||||
|
||||
h.sendMessageToTopic(topic, farewellMessage)
|
||||
}
|
||||
|
||||
// Sends marshaled message to the service topic
|
||||
func (h *Handler) sendMessageToServiceTopic(message *BaseMessage) {
|
||||
h.sendMessageToTopic(h.oracleTopic, message)
|
||||
}
|
||||
|
||||
func (h *Handler) sendMessageToTopic(topic string, message *BaseMessage) {
|
||||
sendData, err := json.Marshal(message)
|
||||
if err != nil {
|
||||
h.Logger.Warn("Failed to send message to topic", err)
|
||||
return
|
||||
}
|
||||
|
||||
go func() {
|
||||
h.PbMutex.Lock()
|
||||
if err = h.pb.Publish(topic, sendData); err != nil {
|
||||
h.Logger.Warn("Failed to send new message to pubsub topic", err)
|
||||
}
|
||||
h.PbMutex.Unlock()
|
||||
}()
|
||||
}
|
19
handler/peers.go
Normal file
19
handler/peers.go
Normal file
@ -0,0 +1,19 @@
|
||||
package handler
|
||||
|
||||
import peer "github.com/libp2p/go-libp2p-core/peer"
|
||||
|
||||
// Returns copy of handler's identity map ([peer.ID]=>[matrixID])
|
||||
func (h *Handler) GetIdentityMap() map[peer.ID]string {
|
||||
return h.identityMap
|
||||
}
|
||||
|
||||
// Get list of peers subscribed on specific topic
|
||||
func (h *Handler) GetPeers(topic string) []peer.ID {
|
||||
peers := h.pb.ListPeers(topic)
|
||||
return peers
|
||||
}
|
||||
|
||||
// Blacklists a peer by its id
|
||||
func (h *Handler) BlacklistPeer(pid peer.ID) {
|
||||
h.pb.BlacklistPeer(pid)
|
||||
}
|
19
handler/topics.go
Normal file
19
handler/topics.go
Normal file
@ -0,0 +1,19 @@
|
||||
package handler
|
||||
|
||||
// Get list of topics **this** node is subscribed to
|
||||
func (h *Handler) GetTopics() []string {
|
||||
topics := h.pb.GetTopics()
|
||||
return topics
|
||||
}
|
||||
|
||||
// Requesting topics from **other** peers
|
||||
func (h *Handler) RequestNetworkTopics() {
|
||||
requestTopicsMessage := &BaseMessage{
|
||||
Body: "",
|
||||
Flag: FlagTopicsRequest,
|
||||
To: "",
|
||||
From: h.peerID,
|
||||
}
|
||||
|
||||
h.sendMessageToServiceTopic(requestTopicsMessage)
|
||||
}
|
9
main.go
Normal file
9
main.go
Normal file
@ -0,0 +1,9 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"github.com/Secured-Finance/p2p-oracle-node/node"
|
||||
)
|
||||
|
||||
func main() {
|
||||
node.Start()
|
||||
}
|
28
node/flags.go
Normal file
28
node/flags.go
Normal file
@ -0,0 +1,28 @@
|
||||
package node
|
||||
|
||||
import (
|
||||
"flag"
|
||||
|
||||
"github.com/Secured-Finance/p2p-oracle-node/config"
|
||||
)
|
||||
|
||||
func (node *Node) parseFlags() {
|
||||
listenPort := flag.String("port", node.Config.ListenPort, "Listen port number")
|
||||
listenAddr := flag.String("addr", node.Config.ListenAddr, "Listen address")
|
||||
bootstrap := flag.Bool("bootstrap", node.Config.Bootstrap, "Start up bootstrap node")
|
||||
bootstrapAddress := flag.String("baddr", node.Config.BootstrapNodeMultiaddr, "Address of bootstrap node")
|
||||
rendezvousString := flag.String("rendezvous", node.Config.Rendezvous, "DHT rendezvous string")
|
||||
protocolID := flag.String("protocol-id", node.Config.ProtocolID, "PubSub protocol ID")
|
||||
|
||||
flag.Parse()
|
||||
|
||||
new_config := &config.Config{
|
||||
ListenPort: *listenPort,
|
||||
ListenAddr: *listenAddr,
|
||||
Bootstrap: *bootstrap,
|
||||
BootstrapNodeMultiaddr: *bootstrapAddress,
|
||||
Rendezvous: *rendezvousString,
|
||||
ProtocolID: *protocolID,
|
||||
}
|
||||
node.Config = new_config
|
||||
}
|
82
node/node.go
Normal file
82
node/node.go
Normal file
@ -0,0 +1,82 @@
|
||||
package node
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"fmt"
|
||||
|
||||
"github.com/Secured-Finance/p2p-oracle-node/config"
|
||||
"github.com/Secured-Finance/p2p-oracle-node/handler"
|
||||
mapset "github.com/deckarep/golang-set"
|
||||
"github.com/ipfs/go-log"
|
||||
"github.com/libp2p/go-libp2p"
|
||||
crypto "github.com/libp2p/go-libp2p-core/crypto"
|
||||
"github.com/libp2p/go-libp2p-core/host"
|
||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
)
|
||||
|
||||
type Node struct {
|
||||
Host host.Host
|
||||
PubSub *pubsub.PubSub
|
||||
GlobalCtx context.Context
|
||||
GlobalCtxCancel context.CancelFunc
|
||||
OracleTopic string
|
||||
networkTopics mapset.Set
|
||||
handler *handler.Handler
|
||||
Config *config.Config
|
||||
Logger *log.ZapEventLogger
|
||||
}
|
||||
|
||||
func NewNode() *Node {
|
||||
node := &Node{
|
||||
Config: config.NewConfig(),
|
||||
Logger: log.Logger("rendezvous"),
|
||||
networkTopics: mapset.NewSet(),
|
||||
}
|
||||
|
||||
return node
|
||||
}
|
||||
|
||||
func (node *Node) setupNode(ctx context.Context, prvKey crypto.PrivKey) {
|
||||
listenMultiAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%s", node.Config.ListenAddr, node.Config.ListenPort))
|
||||
if err != nil {
|
||||
node.Logger.Fatal("Failed to generate new node multiaddress:", err)
|
||||
}
|
||||
host, err := libp2p.New(
|
||||
ctx,
|
||||
libp2p.ListenAddrs(listenMultiAddr),
|
||||
libp2p.Identity(prvKey),
|
||||
)
|
||||
if err != nil {
|
||||
node.Logger.Fatal("Failed to set a new libp2p node:", err)
|
||||
}
|
||||
node.Host = host
|
||||
node.startPubSub(ctx, host)
|
||||
}
|
||||
|
||||
func Start() {
|
||||
node := NewNode()
|
||||
log.SetAllLoggers(log.LevelInfo)
|
||||
|
||||
err := log.SetLogLevel("rendezvous", "info")
|
||||
if err != nil {
|
||||
node.Logger.Warn("Failed to set a rendezvous log level:", err)
|
||||
}
|
||||
|
||||
node.parseFlags()
|
||||
|
||||
r := rand.Reader
|
||||
|
||||
// Creates a new RSA key pair for this host.
|
||||
prvKey, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, r)
|
||||
if err != nil {
|
||||
node.Logger.Fatal(err)
|
||||
}
|
||||
|
||||
ctx, ctxCancel := context.WithCancel(context.Background())
|
||||
node.GlobalCtx = ctx
|
||||
node.GlobalCtxCancel = ctxCancel
|
||||
|
||||
node.setupNode(ctx, prvKey)
|
||||
}
|
236
node/pubsub.go
Normal file
236
node/pubsub.go
Normal file
@ -0,0 +1,236 @@
|
||||
package node
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/Secured-Finance/p2p-oracle-node/handler"
|
||||
"github.com/libp2p/go-libp2p-core/host"
|
||||
peer "github.com/libp2p/go-libp2p-core/peer"
|
||||
discovery "github.com/libp2p/go-libp2p-discovery"
|
||||
dht "github.com/libp2p/go-libp2p-kad-dht"
|
||||
peerstore "github.com/libp2p/go-libp2p-peerstore"
|
||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
)
|
||||
|
||||
func (node *Node) readSub(subscription *pubsub.Subscription, incomingMessagesChan chan pubsub.Message) {
|
||||
ctx := node.GlobalCtx
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
msg, err := subscription.Next(context.Background())
|
||||
if err != nil {
|
||||
node.Logger.Warn("Error reading from buffer", err)
|
||||
return
|
||||
}
|
||||
|
||||
if string(msg.Data) == "" {
|
||||
return
|
||||
}
|
||||
if string(msg.Data) != "\n" {
|
||||
addr, err := peer.IDFromBytes(msg.From)
|
||||
if err != nil {
|
||||
node.Logger.Warn("Error occurred when reading message From field...", err)
|
||||
return
|
||||
}
|
||||
|
||||
// This checks if sender address of incoming message is ours. It is need because we get our messages when subscribed to the same topic.
|
||||
if addr == node.Host.ID() {
|
||||
continue
|
||||
}
|
||||
incomingMessagesChan <- *msg
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
// Subscribes to a topic and then get messages ..
|
||||
func (node *Node) newTopic(topic string) {
|
||||
ctx := node.GlobalCtx
|
||||
subscription, err := node.PubSub.Subscribe(topic)
|
||||
if err != nil {
|
||||
node.Logger.Warn("Error occurred when subscribing to topic", err)
|
||||
return
|
||||
}
|
||||
time.Sleep(3 * time.Second)
|
||||
incomingMessages := make(chan pubsub.Message)
|
||||
|
||||
go node.readSub(subscription, incomingMessages)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case msg := <-incomingMessages:
|
||||
{
|
||||
node.handler.HandleIncomingMessage(node.OracleTopic, msg, func(textMessage handler.TextMessage) {
|
||||
node.Logger.Info("%s \x1b[32m%s\x1b[0m> ", textMessage.From, textMessage.Body)
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Write messages to subscription (topic)
|
||||
// NOTE: we don't need to be subscribed to publish something
|
||||
func (node *Node) writeTopic(topic string) {
|
||||
ctx := node.GlobalCtx
|
||||
stdReader := bufio.NewReader(os.Stdin)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
node.Logger.Info("> ")
|
||||
text, err := stdReader.ReadString('\n')
|
||||
if err != nil {
|
||||
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
|
||||
node.Logger.Warn("Error reading from stdin", err)
|
||||
return
|
||||
}
|
||||
message := &handler.BaseMessage{
|
||||
Body: text,
|
||||
Flag: handler.FlagGenericMessage,
|
||||
}
|
||||
|
||||
sendData, err := json.Marshal(message)
|
||||
if err != nil {
|
||||
node.Logger.Warn("Error occurred when marshalling message object")
|
||||
continue
|
||||
}
|
||||
err = node.PubSub.Publish(topic, sendData)
|
||||
if err != nil {
|
||||
node.Logger.Warn("Error occurred when publishing", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (node *Node) getNetworkTopics() {
|
||||
// ctx := node.GlobalCtx
|
||||
node.handler.RequestNetworkTopics()
|
||||
}
|
||||
|
||||
func (node *Node) startPubSub(ctx context.Context, host host.Host) {
|
||||
pb, err := pubsub.NewGossipSub(ctx, host)
|
||||
if err != nil {
|
||||
node.Logger.Fatal("Error occurred when create PubSub", err)
|
||||
}
|
||||
|
||||
// pb, err := pubsub.NewFloodsubWithProtocols(context.Background(), host, []protocol.ID{protocol.ID(node.Config.ProtocolID)}, pubsub.WithMessageSigning(true), pubsub.WithStrictSignatureVerification(true))
|
||||
// if err != nil {
|
||||
// node.Logger.Fatal("Error occurred when create PubSub", err)
|
||||
// }
|
||||
|
||||
// Set global PubSub object
|
||||
node.PubSub = pb
|
||||
|
||||
node.handler = handler.NewHandler(pb, node.OracleTopic, host.ID(), node.networkTopics)
|
||||
|
||||
kademliaDHT, err := dht.New(ctx, host)
|
||||
if err != nil {
|
||||
node.Logger.Fatal("Failed to set a new DHT:", err)
|
||||
}
|
||||
|
||||
if err = kademliaDHT.Bootstrap(ctx); err != nil {
|
||||
node.Logger.Fatal(err)
|
||||
}
|
||||
|
||||
if !node.Config.Bootstrap {
|
||||
var wg sync.WaitGroup
|
||||
bootstrapMultiaddr, err := multiaddr.NewMultiaddr(node.Config.BootstrapNodeMultiaddr)
|
||||
if err != nil {
|
||||
node.Logger.Fatal(err)
|
||||
}
|
||||
peerinfo, _ := peer.AddrInfoFromP2pAddr(bootstrapMultiaddr)
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
if err := host.Connect(ctx, *peerinfo); err != nil {
|
||||
node.Logger.Fatal(err)
|
||||
} else {
|
||||
node.Logger.Info("Connection established with bootstrap node:", *peerinfo)
|
||||
}
|
||||
}()
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
node.Logger.Info("Announcing ourselves...")
|
||||
routingDiscovery := discovery.NewRoutingDiscovery(kademliaDHT)
|
||||
discovery.Advertise(ctx, routingDiscovery, node.Config.Rendezvous)
|
||||
node.Logger.Info("Successfully announced!")
|
||||
|
||||
// Randezvous string = service tag
|
||||
// Disvover all peers with our service (all ms devices)
|
||||
node.Logger.Info("Searching for other peers...")
|
||||
peerChan, err := routingDiscovery.FindPeers(ctx, node.Config.Rendezvous)
|
||||
if err != nil {
|
||||
node.Logger.Fatal("Failed to find new peers, exiting...", err)
|
||||
}
|
||||
|
||||
// NOTE: here we use Randezvous string as 'topic' by default .. topic != service tag
|
||||
node.OracleTopic = node.Config.Rendezvous
|
||||
subscription, err := pb.Subscribe(node.OracleTopic)
|
||||
if err != nil {
|
||||
node.Logger.Warn("Error occurred when subscribing to topic", err)
|
||||
return
|
||||
}
|
||||
|
||||
node.Logger.Info("Waiting for correct set up of PubSub...")
|
||||
time.Sleep(3 * time.Second)
|
||||
|
||||
incomingMessages := make(chan pubsub.Message)
|
||||
|
||||
go func() {
|
||||
node.writeTopic(node.OracleTopic)
|
||||
node.GlobalCtxCancel()
|
||||
}()
|
||||
go node.readSub(subscription, incomingMessages)
|
||||
go node.getNetworkTopics()
|
||||
|
||||
MainLoop:
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
break MainLoop
|
||||
case msg := <-incomingMessages:
|
||||
{
|
||||
node.handler.HandleIncomingMessage(node.OracleTopic, msg, func(textMessage handler.TextMessage) {
|
||||
node.Logger.Info("%s > \x1b[32m%s\x1b[0m", textMessage.From, textMessage.Body)
|
||||
node.Logger.Info("> ")
|
||||
})
|
||||
}
|
||||
case newPeer := <-peerChan:
|
||||
{
|
||||
node.Logger.Info("\nFound peer:", newPeer, ", add address to peerstore")
|
||||
|
||||
// Adding peer addresses to local peerstore
|
||||
host.Peerstore().AddAddr(newPeer.ID, newPeer.Addrs[0], peerstore.PermanentAddrTTL)
|
||||
// Connect to the peer
|
||||
if err := host.Connect(ctx, newPeer); err != nil {
|
||||
node.Logger.Warn("Connection failed:", err)
|
||||
}
|
||||
node.Logger.Info("Connected to:", newPeer)
|
||||
node.Logger.Info("> ")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := host.Close(); err != nil {
|
||||
node.Logger.Info("\nClosing host failed:", err)
|
||||
}
|
||||
node.Logger.Info("\nBye")
|
||||
}
|
36
rpc/ethereum.go
Normal file
36
rpc/ethereum.go
Normal file
@ -0,0 +1,36 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
type infuraClient struct {
|
||||
url string
|
||||
apiKey string
|
||||
network string
|
||||
}
|
||||
|
||||
// NewInfuraClient returns a new infuraClient.
|
||||
func NewInfuraClient(apiKey string, network string) Client {
|
||||
return &infuraClient{
|
||||
url: fmt.Sprintf("https://%s.infura.io/v3", network),
|
||||
apiKey: apiKey,
|
||||
network: network,
|
||||
}
|
||||
}
|
||||
|
||||
// HandleRequest implements the `Client` interface.
|
||||
func (infura *infuraClient) HandleRequest(r *http.Request, data []byte) (*http.Response, error) {
|
||||
apiKey := infura.apiKey
|
||||
if apiKey == "" {
|
||||
return nil, fmt.Errorf("Can't find any infura API keys")
|
||||
}
|
||||
client := http.Client{}
|
||||
req, err := http.NewRequest("POST", fmt.Sprintf("%s/%s", infura.url, apiKey), bytes.NewBuffer(data))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to construct Infura post request: %v", err)
|
||||
}
|
||||
return client.Do(req)
|
||||
}
|
35
rpc/filecoin.go
Normal file
35
rpc/filecoin.go
Normal file
@ -0,0 +1,35 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"github.com/dgrijalva/jwt-go"
|
||||
)
|
||||
|
||||
// client implements the `Client` interface.
|
||||
type lotusClient struct {
|
||||
host string
|
||||
jwt jwt.Token
|
||||
}
|
||||
|
||||
// NewClient returns a new client.
|
||||
func NewLotusClient(host string, token jwt.Token) Client {
|
||||
return &lotusClient{
|
||||
host: host,
|
||||
jwt: token,
|
||||
}
|
||||
}
|
||||
|
||||
// HandleRequest implements the `Client` interface.
|
||||
func (c *lotusClient) HandleRequest(r *http.Request, data []byte) (*http.Response, error) {
|
||||
client := http.Client{}
|
||||
req, err := http.NewRequest("POST", c.host, bytes.NewBuffer(data))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("Authentication", "Bearer "+c.jwt.Raw)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to construct lotus node rpc request %v", err)
|
||||
}
|
||||
return client.Do(req)
|
||||
}
|
7
rpc/rpc.go
Normal file
7
rpc/rpc.go
Normal file
@ -0,0 +1,7 @@
|
||||
package rpc
|
||||
|
||||
import "net/http"
|
||||
|
||||
type Client interface {
|
||||
HandleRequest(r *http.Request, data []byte) (*http.Response, error)
|
||||
}
|
70
rpcclient/ethereum.go
Normal file
70
rpcclient/ethereum.go
Normal file
@ -0,0 +1,70 @@
|
||||
package rpcclient
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"math/big"
|
||||
|
||||
"github.com/ethereum/go-ethereum"
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/ethclient"
|
||||
)
|
||||
|
||||
type ethereumClient struct {
|
||||
httpClient *ethclient.Client
|
||||
wsClient *ethclient.Client
|
||||
logger log.Logger
|
||||
}
|
||||
|
||||
type EthereumClient interface {
|
||||
Connect(context.Context, string) error
|
||||
Balance(context.Context, common.Address) (*big.Int, error)
|
||||
SubscribeOnSmartContractEvents(context.Context, string)
|
||||
}
|
||||
|
||||
func (c *ethereumClient) Connect(ctx context.Context, url string, connectionType string) error {
|
||||
client, err := ethclient.Dial(url)
|
||||
if err != nil {
|
||||
c.logger.Fatal(err)
|
||||
}
|
||||
if connectionType == "websocket" {
|
||||
c.wsClient = client
|
||||
} else {
|
||||
c.httpClient = client
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Balance returns the balance of the given ethereum address.
|
||||
func (c *ethereumClient) Balance(ctx context.Context, address common.Address) (*big.Int, error) {
|
||||
value, err := c.httpClient.BalanceAt(ctx, address, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return value, nil
|
||||
}
|
||||
|
||||
func (c *ethereumClient) SubscribeOnSmartContractEvents(ctx context.Context, address string) {
|
||||
contractAddress := common.HexToAddress(address)
|
||||
query := ethereum.FilterQuery{
|
||||
Addresses: []common.Address{contractAddress},
|
||||
}
|
||||
|
||||
logs := make(chan types.Log)
|
||||
sub, err := c.wsClient.SubscribeFilterLogs(context.Background(), query, logs)
|
||||
if err != nil {
|
||||
c.logger.Fatal(err)
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case err := <-sub.Err():
|
||||
c.logger.Fatal(err)
|
||||
case vLog := <-logs:
|
||||
fmt.Println(vLog) // pointer to event log
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user