Change logger library to logrus

This commit is contained in:
ChronosX88 2020-10-22 00:36:05 +04:00
parent 5f1f11d099
commit 14240186bf
Signed by: ChronosXYZ
GPG Key ID: 085A69A82C8C511A
8 changed files with 115 additions and 797 deletions

View File

@ -2,12 +2,12 @@ package main
import ( import (
"github.com/Secured-Finance/dione/node" "github.com/Secured-Finance/dione/node"
"github.com/ipfs/go-log" "github.com/sirupsen/logrus"
) )
func main() { func main() {
err := node.Start() err := node.Start()
if err != nil { if err != nil {
log.Logger("node").Panic(err) logrus.Panic(err)
} }
} }

View File

@ -6,7 +6,7 @@ import (
"github.com/Secured-Finance/dione/models" "github.com/Secured-Finance/dione/models"
"github.com/Secured-Finance/dione/pb" "github.com/Secured-Finance/dione/pb"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/ipfs/go-log" "github.com/sirupsen/logrus"
) )
type ConsensusState int type ConsensusState int
@ -21,7 +21,6 @@ const (
type PBFTConsensusManager struct { type PBFTConsensusManager struct {
psb *pb.PubSubRouter psb *pb.PubSubRouter
logger *log.ZapEventLogger
Consensuses map[string]*ConsensusData Consensuses map[string]*ConsensusData
maxFaultNodes int maxFaultNodes int
} }
@ -36,7 +35,6 @@ type ConsensusData struct {
func NewPBFTConsensusManager(psb *pb.PubSubRouter, maxFaultNodes int) *PBFTConsensusManager { func NewPBFTConsensusManager(psb *pb.PubSubRouter, maxFaultNodes int) *PBFTConsensusManager {
pcm := &PBFTConsensusManager{} pcm := &PBFTConsensusManager{}
pcm.logger = log.Logger("PBFTConsensusManager")
pcm.Consensuses = make(map[string]*ConsensusData) pcm.Consensuses = make(map[string]*ConsensusData)
pcm.psb = psb pcm.psb = psb
pcm.psb.Hook("prepared", pcm.handlePreparedMessage) pcm.psb.Hook("prepared", pcm.handlePreparedMessage)
@ -58,14 +56,14 @@ func (pcm *PBFTConsensusManager) NewTestConsensus(data string) {
pcm.psb.BroadcastToServiceTopic(&msg) pcm.psb.BroadcastToServiceTopic(&msg)
cData.State = consensusPrePrepared cData.State = consensusPrePrepared
pcm.logger.Debug("started new consensus: " + consensusID) logrus.Debug("started new consensus: " + consensusID)
} }
func (pcm *PBFTConsensusManager) handlePreparedMessage(message *models.Message) { func (pcm *PBFTConsensusManager) handlePreparedMessage(message *models.Message) {
// TODO add check on view of the message // TODO add check on view of the message
consensusID := message.Payload["consensusID"].(string) consensusID := message.Payload["consensusID"].(string)
if _, ok := pcm.Consensuses[consensusID]; !ok { if _, ok := pcm.Consensuses[consensusID]; !ok {
pcm.logger.Warn("Unknown consensus ID: " + consensusID) logrus.Warn("Unknown consensus ID: " + consensusID)
return return
} }
data := pcm.Consensuses[consensusID] data := pcm.Consensuses[consensusID]
@ -74,7 +72,7 @@ func (pcm *PBFTConsensusManager) handlePreparedMessage(message *models.Message)
if data.test { if data.test {
rData := message.Payload["data"].(string) rData := message.Payload["data"].(string)
if rData != testValidData { if rData != testValidData {
pcm.logger.Error("Incorrect data was received! Ignoring this message, because it was sent from fault node!") logrus.Error("Incorrect data was received! Ignoring this message, because it was sent from fault node!")
return return
} }
} else { } else {
@ -91,7 +89,7 @@ func (pcm *PBFTConsensusManager) handlePreparedMessage(message *models.Message)
msg.Payload["consensusID"] = consensusID msg.Payload["consensusID"] = consensusID
err := pcm.psb.BroadcastToServiceTopic(&msg) err := pcm.psb.BroadcastToServiceTopic(&msg)
if err != nil { if err != nil {
pcm.logger.Warn("Unable to send COMMIT message: " + err.Error()) logrus.Warn("Unable to send COMMIT message: " + err.Error())
return return
} }
data.State = consensusPrepared data.State = consensusPrepared
@ -102,7 +100,7 @@ func (pcm *PBFTConsensusManager) handleCommitMessage(message *models.Message) {
// TODO add check on view of the message // TODO add check on view of the message
consensusID := message.Payload["consensusID"].(string) consensusID := message.Payload["consensusID"].(string)
if _, ok := pcm.Consensuses[consensusID]; !ok { if _, ok := pcm.Consensuses[consensusID]; !ok {
pcm.logger.Warn("Unknown consensus ID: " + consensusID) logrus.Warn("Unknown consensus ID: " + consensusID)
return return
} }
data := pcm.Consensuses[consensusID] data := pcm.Consensuses[consensusID]
@ -111,8 +109,7 @@ func (pcm *PBFTConsensusManager) handleCommitMessage(message *models.Message) {
data.mutex.Unlock() data.mutex.Unlock()
if data.commitCount > 2*pcm.maxFaultNodes+1 { if data.commitCount > 2*pcm.maxFaultNodes+1 {
pcm.logger.Debug("consensus successfully finished") logrus.Debug("consensus successfully finished")
pcm.logger.Debug("")
data.State = consensusPrepared data.State = consensusPrepared
} }
} }

45
go.mod
View File

@ -3,38 +3,51 @@ module github.com/Secured-Finance/dione
go 1.14 go 1.14
require ( require (
github.com/deckarep/golang-set v1.7.1 github.com/allegro/bigcache v1.2.1 // indirect
github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/aristanetworks/goarista v0.0.0-20200224203130-895b4c57c44d // indirect
github.com/edsrzf/mmap-go v1.0.0 // indirect github.com/cespare/cp v1.1.1 // indirect
github.com/deckarep/golang-set v1.7.1 // indirect
github.com/elastic/gosigar v0.10.5 // indirect
github.com/ethereum/go-ethereum v1.9.5 github.com/ethereum/go-ethereum v1.9.5
github.com/filecoin-project/go-address v0.0.2-0.20200504173055-8b6f2fb2b3ef github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5 // indirect
github.com/filecoin-project/lotus v0.4.2
github.com/filecoin-project/specs-actors v0.8.1-0.20200723200253-a3c01bc62f99
github.com/gballet/go-libpcsclite v0.0.0-20191108122812-4678299bea08 // indirect github.com/gballet/go-libpcsclite v0.0.0-20191108122812-4678299bea08 // indirect
github.com/go-kit/kit v0.10.0 // indirect
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/golang/protobuf v1.4.2 // indirect
github.com/google/uuid v1.1.1 github.com/google/uuid v1.1.1
github.com/hashicorp/raft v1.1.2 github.com/gopherjs/gopherjs v0.0.0-20190812055157-5d271430af9f // indirect
github.com/ipfs/go-log v1.0.4 github.com/ipfs/go-log/v2 v2.1.2-0.20200626104915-0016c0b4b3e4 // indirect
github.com/karalabe/usb v0.0.0-20191104083709-911d15fe12a9 // indirect github.com/karalabe/usb v0.0.0-20191104083709-911d15fe12a9 // indirect
github.com/libp2p/go-libp2p v0.10.2 github.com/libp2p/go-libp2p v0.10.2
github.com/libp2p/go-libp2p-consensus v0.0.1
github.com/libp2p/go-libp2p-core v0.6.1 github.com/libp2p/go-libp2p-core v0.6.1
github.com/libp2p/go-libp2p-discovery v0.5.0 github.com/libp2p/go-libp2p-discovery v0.5.0
github.com/libp2p/go-libp2p-kad-dht v0.8.3 github.com/libp2p/go-libp2p-kad-dht v0.8.3
github.com/libp2p/go-libp2p-peer v0.2.0
github.com/libp2p/go-libp2p-peerstore v0.2.6 github.com/libp2p/go-libp2p-peerstore v0.2.6
github.com/libp2p/go-libp2p-pubsub v0.3.3 github.com/libp2p/go-libp2p-pubsub v0.3.3
github.com/libp2p/go-libp2p-raft v0.1.5 github.com/mattn/go-colorable v0.1.4 // indirect
github.com/mattn/go-isatty v0.0.12 // indirect
github.com/multiformats/go-multiaddr v0.2.2 github.com/multiformats/go-multiaddr v0.2.2
github.com/olekukonko/tablewriter v0.0.4 // indirect github.com/olekukonko/tablewriter v0.0.4 // indirect
github.com/pborman/uuid v1.2.0 // indirect
github.com/renproject/hyperdrive v1.2.0
github.com/rjeczalik/notify v0.9.2 // indirect github.com/rjeczalik/notify v0.9.2 // indirect
github.com/rs/cors v1.7.0 // indirect github.com/rs/cors v1.7.0 // indirect
github.com/sirupsen/logrus v1.6.0
github.com/smartystreets/assertions v1.0.1 // indirect
github.com/spf13/viper v1.7.1 github.com/spf13/viper v1.7.1
github.com/status-im/keycard-go v0.0.0-20200402102358-957c09536969 // indirect github.com/status-im/keycard-go v0.0.0-20200402102358-957c09536969 // indirect
github.com/steakknife/bloomfilter v0.0.0-20180922174646-6819c0d2a570 // indirect
github.com/steakknife/hamming v0.0.0-20180906055917-c99c65617cd3 // indirect
github.com/tyler-smith/go-bip39 v1.0.2 // indirect github.com/tyler-smith/go-bip39 v1.0.2 // indirect
github.com/wsddn/go-ecdh v0.0.0-20161211032359-48726bab9208 // indirect github.com/wsddn/go-ecdh v0.0.0-20161211032359-48726bab9208 // indirect
golang.org/x/crypto v0.0.0-20200604202706-70a84ac30bf9 golang.org/x/crypto v0.0.0-20200604202706-70a84ac30bf9 // indirect
golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect
golang.org/x/net v0.0.0-20200602114024-627f9648deb9 // indirect
golang.org/x/sys v0.0.0-20200602225109-6fdc65e7d980 // indirect
golang.org/x/tools v0.0.0-20200331025713-a30bf2db82d4 // indirect
google.golang.org/grpc v1.29.1 // indirect
google.golang.org/protobuf v1.24.0 // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
gopkg.in/olebedev/go-duktape.v3 v3.0.0-20200619000410-60c24ae608a6 // indirect
gopkg.in/urfave/cli.v1 v1.20.0 // indirect
gopkg.in/yaml.v2 v2.2.8 // indirect
honnef.co/go/tools v0.0.1-2020.1.3 // indirect
) )
replace github.com/libp2p/go-libp2p-raft v0.1.5 => github.com/ItalyPaleAle/go-libp2p-raft v0.1.6-0.20200703060436-0b37aa16095e

758
go.sum

File diff suppressed because it is too large Load Diff

View File

@ -13,7 +13,6 @@ import (
"github.com/Secured-Finance/dione/pb" "github.com/Secured-Finance/dione/pb"
"github.com/Secured-Finance/dione/rpc" "github.com/Secured-Finance/dione/rpc"
"github.com/Secured-Finance/dione/rpcclient" "github.com/Secured-Finance/dione/rpcclient"
"github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p"
crypto "github.com/libp2p/go-libp2p-core/crypto" crypto "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
@ -22,6 +21,7 @@ import (
dht "github.com/libp2p/go-libp2p-kad-dht" dht "github.com/libp2p/go-libp2p-kad-dht"
peerstore "github.com/libp2p/go-libp2p-peerstore" peerstore "github.com/libp2p/go-libp2p-peerstore"
"github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multiaddr"
"github.com/sirupsen/logrus"
) )
type Node struct { type Node struct {
@ -31,7 +31,6 @@ type Node struct {
GlobalCtxCancel context.CancelFunc GlobalCtxCancel context.CancelFunc
OracleTopic string OracleTopic string
Config *config.Config Config *config.Config
Logger *log.ZapEventLogger
Lotus *rpc.LotusClient Lotus *rpc.LotusClient
Ethereum *rpcclient.EthereumClient Ethereum *rpcclient.EthereumClient
ConsensusManager *consensus.PBFTConsensusManager ConsensusManager *consensus.PBFTConsensusManager
@ -45,7 +44,6 @@ func NewNode(configPath string) (*Node, error) {
node := &Node{ node := &Node{
OracleTopic: "dione", OracleTopic: "dione",
Config: cfg, Config: cfg,
Logger: log.Logger("node"),
} }
return node, nil return node, nil
@ -88,7 +86,7 @@ func (n *Node) setupConsensusManager() {
func (n *Node) setupLibp2pHost(ctx context.Context, privateKey crypto.PrivKey) { func (n *Node) setupLibp2pHost(ctx context.Context, privateKey crypto.PrivKey) {
listenMultiAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%s", n.Config.ListenAddr, n.Config.ListenPort)) listenMultiAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%s", n.Config.ListenAddr, n.Config.ListenPort))
if err != nil { if err != nil {
n.Logger.Fatal("Failed to generate new node multiaddress:", err) logrus.Fatal("Failed to generate new node multiaddress:", err)
} }
host, err := libp2p.New( host, err := libp2p.New(
ctx, ctx,
@ -96,50 +94,50 @@ func (n *Node) setupLibp2pHost(ctx context.Context, privateKey crypto.PrivKey) {
libp2p.Identity(privateKey), libp2p.Identity(privateKey),
) )
if err != nil { if err != nil {
n.Logger.Fatal("Failed to set a new libp2p node:", err) logrus.Fatal("Failed to set a new libp2p node:", err)
} }
n.Host = host n.Host = host
n.Logger.Info(fmt.Sprintf("[*] Your Multiaddress Is: /ip4/%s/tcp/%v/p2p/%s\n", n.Config.ListenAddr, n.Config.ListenPort, host.ID().Pretty())) logrus.Info(fmt.Sprintf("[*] Your Multiaddress Is: /ip4/%s/tcp/%v/p2p/%s\n", n.Config.ListenAddr, n.Config.ListenPort, host.ID().Pretty()))
kademliaDHT, err := dht.New(context.Background(), n.Host) kademliaDHT, err := dht.New(context.Background(), n.Host)
if err != nil { if err != nil {
n.Logger.Fatal("Failed to create new DHT instance: ", err) logrus.Fatal("Failed to create new DHT instance: ", err)
} }
if err = kademliaDHT.Bootstrap(context.Background()); err != nil { if err = kademliaDHT.Bootstrap(context.Background()); err != nil {
n.Logger.Fatal(err) logrus.Fatal(err)
} }
if !n.Config.Bootstrap { if !n.Config.Bootstrap {
var wg sync.WaitGroup var wg sync.WaitGroup
bootstrapMultiaddr, err := multiaddr.NewMultiaddr(n.Config.BootstrapNodeMultiaddr) bootstrapMultiaddr, err := multiaddr.NewMultiaddr(n.Config.BootstrapNodeMultiaddr)
if err != nil { if err != nil {
n.Logger.Fatal(err) logrus.Fatal(err)
} }
peerinfo, _ := peer.AddrInfoFromP2pAddr(bootstrapMultiaddr) peerinfo, _ := peer.AddrInfoFromP2pAddr(bootstrapMultiaddr)
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
if err := n.Host.Connect(context.Background(), *peerinfo); err != nil { if err := n.Host.Connect(context.Background(), *peerinfo); err != nil {
n.Logger.Fatal(err) logrus.Fatal(err)
} }
n.Logger.Info("Connection established with bootstrap node:", *peerinfo) logrus.Info("Connection established with bootstrap node:", *peerinfo)
}() }()
wg.Wait() wg.Wait()
} }
n.Logger.Info("Announcing ourselves...") logrus.Info("Announcing ourselves...")
routingDiscovery := discovery.NewRoutingDiscovery(kademliaDHT) routingDiscovery := discovery.NewRoutingDiscovery(kademliaDHT)
discovery.Advertise(context.Background(), routingDiscovery, n.Config.Rendezvous) discovery.Advertise(context.Background(), routingDiscovery, n.Config.Rendezvous)
n.Logger.Info("Successfully announced!") logrus.Info("Successfully announced!")
// Randezvous string = service tag // Randezvous string = service tag
// Discover all peers with our service // Discover all peers with our service
n.Logger.Info("Searching for other peers...") logrus.Info("Searching for other peers...")
peerChan, err := routingDiscovery.FindPeers(context.Background(), n.Config.Rendezvous) peerChan, err := routingDiscovery.FindPeers(context.Background(), n.Config.Rendezvous)
if err != nil { if err != nil {
n.Logger.Fatal("Failed to find new peers, exiting...", err) logrus.Fatal("Failed to find new peers, exiting...", err)
} }
go func() { go func() {
MainLoop: MainLoop:
@ -155,13 +153,13 @@ func (n *Node) setupLibp2pHost(ctx context.Context, privateKey crypto.PrivKey) {
if newPeer.ID.String() == n.Host.ID().String() { if newPeer.ID.String() == n.Host.ID().String() {
continue continue
} }
n.Logger.Info("Found peer:", newPeer, ", put it to the peerstore") logrus.Info("Found peer:", newPeer, ", put it to the peerstore")
n.Host.Peerstore().AddAddr(newPeer.ID, newPeer.Addrs[0], peerstore.PermanentAddrTTL) n.Host.Peerstore().AddAddr(newPeer.ID, newPeer.Addrs[0], peerstore.PermanentAddrTTL)
// Connect to the peer // Connect to the peer
if err := n.Host.Connect(ctx, newPeer); err != nil { if err := n.Host.Connect(ctx, newPeer); err != nil {
n.Logger.Warn("Connection failed: ", err) logrus.Warn("Connection failed: ", err)
} }
n.Logger.Info("Connected to: ", newPeer) logrus.Info("Connected to: ", newPeer)
} }
} }
} }
@ -179,17 +177,17 @@ func Start() error {
node, err := NewNode(*configPath) node, err := NewNode(*configPath)
if *verbose { if *verbose {
log.SetAllLoggers(log.LevelDebug) logrus.SetLevel(logrus.DebugLevel)
} else { } else {
log.SetAllLoggers(log.LevelInfo) logrus.SetLevel(logrus.InfoLevel)
} }
if err != nil { if err != nil {
log.Logger("node").Panic(err) logrus.Panic(err)
} }
privKey, err := generatePrivateKey() privKey, err := generatePrivateKey()
if err != nil { if err != nil {
node.Logger.Fatal(err) logrus.Fatal(err)
} }
ctx, ctxCancel := context.WithCancel(context.Background()) ctx, ctxCancel := context.WithCancel(context.Background())

View File

@ -7,12 +7,11 @@ import (
"github.com/Secured-Finance/dione/config" "github.com/Secured-Finance/dione/config"
"github.com/Secured-Finance/dione/consensus" "github.com/Secured-Finance/dione/consensus"
"github.com/ipfs/go-log" "github.com/sirupsen/logrus"
) )
func TestConsensus(t *testing.T) { func TestConsensus(t *testing.T) {
var logger = log.Logger("test") logrus.SetLevel(logrus.DebugLevel)
log.SetAllLoggers(log.LevelDebug)
cfg := &config.Config{ cfg := &config.Config{
ListenPort: "1234", ListenPort: "1234",
@ -35,7 +34,6 @@ func TestConsensus(t *testing.T) {
Config: cfg, Config: cfg,
GlobalCtx: ctx, GlobalCtx: ctx,
GlobalCtxCancel: ctxCancel, GlobalCtxCancel: ctxCancel,
Logger: log.Logger("node"),
} }
node1.setupNode(ctx, privKey) node1.setupNode(ctx, privKey)
@ -47,13 +45,13 @@ func TestConsensus(t *testing.T) {
ctx, ctxCancel = context.WithCancel(context.Background()) ctx, ctxCancel = context.WithCancel(context.Background())
cfg.ListenPort = "1235" cfg.ListenPort = "1235"
cfg.Bootstrap = false cfg.Bootstrap = false
cfg.BootstrapNodeMultiaddr = node1.Host.Addrs()[0].String() + fmt.Sprintf("/p2p/%s", node1.Host.ID().String()) //cfg.BootstrapNodeMultiaddr = node1.Host.Addrs()[0].String() + fmt.Sprintf("/p2p/%s", node1.Host.ID().String())
cfg.BootstrapNodeMultiaddr = "/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN"
node2 := &Node{ node2 := &Node{
OracleTopic: "dione", OracleTopic: "dione",
Config: cfg, Config: cfg,
GlobalCtx: ctx, GlobalCtx: ctx,
GlobalCtxCancel: ctxCancel, GlobalCtxCancel: ctxCancel,
Logger: log.Logger("node"),
} }
node2.setupNode(ctx, privKey) node2.setupNode(ctx, privKey)
@ -69,12 +67,9 @@ func TestConsensus(t *testing.T) {
Config: cfg, Config: cfg,
GlobalCtx: ctx, GlobalCtx: ctx,
GlobalCtxCancel: ctxCancel, GlobalCtxCancel: ctxCancel,
Logger: log.Logger("node"),
} }
node3.setupNode(ctx, privKey) node3.setupNode(ctx, privKey)
logger.Debug(node3.Host.Peerstore().Peers())
node2.ConsensusManager.NewTestConsensus("test") node2.ConsensusManager.NewTestConsensus("test")
node1.ConsensusManager.NewTestConsensus("test1") node1.ConsensusManager.NewTestConsensus("test1")
node3.ConsensusManager.NewTestConsensus("test") node3.ConsensusManager.NewTestConsensus("test")

View File

@ -5,16 +5,16 @@ import (
"encoding/json" "encoding/json"
"github.com/Secured-Finance/dione/models" "github.com/Secured-Finance/dione/models"
"github.com/ipfs/go-log"
host "github.com/libp2p/go-libp2p-core/host" host "github.com/libp2p/go-libp2p-core/host"
peer "github.com/libp2p/go-libp2p-core/peer" peer "github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
pubsub "github.com/libp2p/go-libp2p-pubsub" pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/sirupsen/logrus"
) )
type PubSubRouter struct { type PubSubRouter struct {
node host.Host node host.Host
pubsub *pubsub.PubSub pubsub *pubsub.PubSub
logger *log.ZapEventLogger
context context.Context context context.Context
contextCancel context.CancelFunc contextCancel context.CancelFunc
handlers map[string][]Handler handlers map[string][]Handler
@ -26,25 +26,24 @@ func NewPubSubRouter(h host.Host, oracleTopic string) *PubSubRouter {
psr := &PubSubRouter{ psr := &PubSubRouter{
node: h, node: h,
logger: log.Logger("PubSubRouter"),
context: ctx, context: ctx,
contextCancel: ctxCancel, contextCancel: ctxCancel,
handlers: make(map[string][]Handler), handlers: make(map[string][]Handler),
} }
pb, err := pubsub.NewGossipSub( pb, err := pubsub.NewFloodsubWithProtocols(
context.Background(), context.TODO(),
psr.node, pubsub.WithMessageSigning(true), psr.node, []protocol.ID{"/dione/1.0.0"}, //pubsub.WithMessageSigning(true),
pubsub.WithStrictSignatureVerification(true), //pubsub.WithStrictSignatureVerification(true),
) )
if err != nil { if err != nil {
psr.logger.Fatal("Error occurred when create PubSub", err) logrus.Fatal("Error occurred when create PubSub", err)
} }
psr.oracleTopic = oracleTopic psr.oracleTopic = oracleTopic
subscription, err := pb.Subscribe(oracleTopic) subscription, err := pb.Subscribe(oracleTopic)
if err != nil { if err != nil {
psr.logger.Fatal("Error occurred when subscribing to service topic", err) logrus.Fatal("Error occurred when subscribing to service topic", err)
} }
psr.pubsub = pb psr.pubsub = pb
@ -57,7 +56,7 @@ func NewPubSubRouter(h host.Host, oracleTopic string) *PubSubRouter {
{ {
msg, err := subscription.Next(psr.context) msg, err := subscription.Next(psr.context)
if err != nil { if err != nil {
psr.logger.Warn("Failed to receive pubsub message: ", err.Error()) logrus.Warn("Failed to receive pubsub message: ", err.Error())
} }
psr.handleMessage(msg) psr.handleMessage(msg)
} }
@ -71,23 +70,24 @@ func NewPubSubRouter(h host.Host, oracleTopic string) *PubSubRouter {
func (psr *PubSubRouter) handleMessage(p *pubsub.Message) { func (psr *PubSubRouter) handleMessage(p *pubsub.Message) {
senderPeerID, err := peer.IDFromBytes(p.From) senderPeerID, err := peer.IDFromBytes(p.From)
if err != nil { if err != nil {
psr.logger.Warn("Unable to decode sender peer ID! " + err.Error()) logrus.Warn("Unable to decode sender peer ID! " + err.Error())
return return
} }
// We can receive our own messages when sending to the topic. So we should drop them. // We can receive our own messages when sending to the topic. So we should drop them.
if senderPeerID == psr.node.ID() { if senderPeerID == psr.node.ID() {
logrus.Debug("Drop message because it came from the current node - a bug (or feature) in the pubsub system")
return return
} }
var message models.Message var message models.Message
err = json.Unmarshal(p.Data, &message) err = json.Unmarshal(p.Data, &message)
if err != nil { if err != nil {
psr.logger.Warn("Unable to decode message data! " + err.Error()) logrus.Warn("Unable to decode message data! " + err.Error())
return return
} }
message.From = senderPeerID.String() message.From = senderPeerID.String()
handlers, ok := psr.handlers[message.Type] handlers, ok := psr.handlers[message.Type]
if !ok { if !ok {
psr.logger.Warn("Dropping message " + message.Type + " because we don't have any handlers!") logrus.Warn("Dropping message " + message.Type + " because we don't have any handlers!")
return return
} }
for _, v := range handlers { for _, v := range handlers {

View File

@ -11,12 +11,10 @@ import (
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
"github.com/ipfs/go-log"
) )
type EthereumClient struct { type EthereumClient struct {
client *ethclient.Client client *ethclient.Client
Logger *log.ZapEventLogger
authTransactor *bind.TransactOpts authTransactor *bind.TransactOpts
oracleEmitter *oracleemitter.SmartcontractsSession oracleEmitter *oracleemitter.SmartcontractsSession
aggregator *aggregator.SmartcontractsSession aggregator *aggregator.SmartcontractsSession
@ -37,10 +35,7 @@ type Ethereum interface {
} }
func NewEthereumClient() *EthereumClient { func NewEthereumClient() *EthereumClient {
ethereumClient := &EthereumClient{ ethereumClient := &EthereumClient{}
Logger: log.Logger("rendezvous"),
}
log.SetAllLoggers(log.LevelInfo)
return ethereumClient return ethereumClient
} }