diff --git a/.gitignore b/.gitignore index 3edbbbe..8e77007 100644 --- a/.gitignore +++ b/.gitignore @@ -20,4 +20,8 @@ eth-contracts/node_modules /dione /.dione-config.toml +/.dione-config-2.toml +/.dione-config-3.toml +/.dione-config-4.toml +.bootstrap_privkey eth-contracts/build \ No newline at end of file diff --git a/config/config.go b/config/config.go index d2792cc..ddb7c3a 100644 --- a/config/config.go +++ b/config/config.go @@ -7,15 +7,16 @@ import ( ) type Config struct { - ListenPort int `mapstructure:"listen_port"` - ListenAddr string `mapstructure:"listen_addr"` - BootstrapNodes []string `mapstructure:"bootstrap_node_multiaddr"` - Rendezvous string `mapstructure:"rendezvous"` - Ethereum EthereumConfig `mapstructure:"ethereum"` - Filecoin FilecoinConfig `mapstructure:"filecoin"` - PubSub PubSubConfig `mapstructure:"pubSub"` - Store StoreConfig `mapstructure:"store"` - ConsensusMaxFaultNodes int `mapstructure:"consensus_max_fault_nodes"` + ListenPort int `mapstructure:"listen_port"` + ListenAddr string `mapstructure:"listen_addr"` + IsBootstrap bool `mapstructure:"is_bootstrap"` + BootstrapNodes []string `mapstructure:"bootstrap_node_multiaddr"` + Rendezvous string `mapstructure:"rendezvous"` + Ethereum EthereumConfig `mapstructure:"ethereum"` + Filecoin FilecoinConfig `mapstructure:"filecoin"` + PubSub PubSubConfig `mapstructure:"pubSub"` + Store StoreConfig `mapstructure:"store"` + ConsensusMinApprovals int `mapstructure:"consensus_min_approvals"` } type EthereumConfig struct { @@ -23,6 +24,7 @@ type EthereumConfig struct { PrivateKey string `mapstructure:"private_key"` OracleEmitterContractAddress string `mapstructure:"oracle_emitter_contract_address"` AggregatorContractAddress string `mapstructure:"aggregator_contract_address"` + DioneStakingContractAddress string `mapstructure:"dione_staking_address"` } type FilecoinConfig struct { diff --git a/consensus/commit_pool.go b/consensus/commit_pool.go new file mode 100644 index 0000000..f5ce783 --- /dev/null +++ b/consensus/commit_pool.go @@ -0,0 +1,71 @@ +package consensus + +import ( + "github.com/Secured-Finance/dione/models" + "github.com/Secured-Finance/dione/sigs" + "github.com/Secured-Finance/dione/types" +) + +type CommitPool struct { + commitMsgs map[string][]*models.Message +} + +func NewCommitPool() *CommitPool { + return &CommitPool{ + commitMsgs: map[string][]*models.Message{}, + } +} + +func (cp *CommitPool) CreateCommit(prepareMsg *models.Message, privateKey []byte) (*models.Message, error) { + var message models.Message + message.Type = models.MessageTypeCommit + var consensusMsg models.ConsensusMessage + prepareCMessage := prepareMsg.Payload + consensusMsg.ConsensusID = prepareCMessage.ConsensusID + consensusMsg.RequestID = prepareMsg.Payload.RequestID + consensusMsg.CallbackAddress = prepareMsg.Payload.CallbackAddress + consensusMsg.Data = prepareCMessage.Data + signature, err := sigs.Sign(types.SigTypeEd25519, privateKey, []byte(prepareCMessage.Data)) + if err != nil { + return nil, err + } + consensusMsg.Signature = signature.Data + message.Payload = consensusMsg + return &message, nil +} + +func (cp *CommitPool) IsExistingCommit(commitMsg *models.Message) bool { + consensusMessage := commitMsg.Payload + var exists bool + for _, v := range cp.commitMsgs[consensusMessage.ConsensusID] { + if v.From == commitMsg.From { + exists = true + } + } + return exists +} + +func (cp *CommitPool) IsValidCommit(commit *models.Message) bool { + consensusMsg := commit.Payload + err := sigs.Verify(&types.Signature{Type: types.SigTypeEd25519, Data: consensusMsg.Signature}, commit.From, []byte(consensusMsg.Data)) + if err != nil { + return false + } + return true +} + +func (cp *CommitPool) AddCommit(commit *models.Message) { + consensusID := commit.Payload.ConsensusID + if _, ok := cp.commitMsgs[consensusID]; !ok { + cp.commitMsgs[consensusID] = []*models.Message{} + } + + cp.commitMsgs[consensusID] = append(cp.commitMsgs[consensusID], commit) +} + +func (cp *CommitPool) CommitSize(consensusID string) int { + if v, ok := cp.commitMsgs[consensusID]; ok { + return len(v) + } + return 0 +} diff --git a/consensus/consensus.go b/consensus/consensus.go index 219eaca..6674663 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -1,133 +1,266 @@ package consensus import ( - "sync" + "math/big" + + "github.com/ethereum/go-ethereum/common" + + "github.com/Secured-Finance/dione/ethclient" + "github.com/sirupsen/logrus" "github.com/Secured-Finance/dione/models" "github.com/Secured-Finance/dione/pb" - "github.com/sirupsen/logrus" -) - -type ConsensusState int - -const ( - consensusPrePrepared ConsensusState = 0x0 - consensusPrepared ConsensusState = 0x1 - consensusCommitted ConsensusState = 0x2 - - testValidData = "test" ) type PBFTConsensusManager struct { - psb *pb.PubSubRouter - Consensuses map[string]*ConsensusData - maxFaultNodes int - miner *Miner + psb *pb.PubSubRouter + //Consensuses map[string]*ConsensusData + //maxFaultNodes int + minApprovals int + privKey []byte + prePreparePool *PrePreparePool + preparePool *PreparePool + commitPool *CommitPool + consensusLeaders map[string]bool + ethereumClient *ethclient.EthereumClient } -type ConsensusData struct { - preparedCount int - commitCount int - State ConsensusState - mutex sync.Mutex - result string - test bool - onConsensusFinishCallback func(finalData string) -} +//type ConsensusData struct { +// preparedCount int +// commitCount int +// mutex sync.Mutex +// result string +// test bool +// onConsensusFinishCallback func(finalData string) +//} -func NewPBFTConsensusManager(psb *pb.PubSubRouter, maxFaultNodes int) *PBFTConsensusManager { +func NewPBFTConsensusManager(psb *pb.PubSubRouter, minApprovals int, privKey []byte, ethereumClient *ethclient.EthereumClient) *PBFTConsensusManager { pcm := &PBFTConsensusManager{} - pcm.Consensuses = make(map[string]*ConsensusData) pcm.psb = psb - pcm.psb.Hook("prepared", pcm.handlePreparedMessage) - pcm.psb.Hook("commit", pcm.handleCommitMessage) + pcm.prePreparePool = NewPrePreparePool() + pcm.preparePool = NewPreparePool() + pcm.commitPool = NewCommitPool() + pcm.minApprovals = minApprovals + pcm.privKey = privKey + pcm.ethereumClient = ethereumClient + pcm.consensusLeaders = map[string]bool{} + pcm.psb.Hook(models.MessageTypePrePrepare, pcm.handlePrePrepare) + pcm.psb.Hook(models.MessageTypePrepare, pcm.handlePrepare) + pcm.psb.Hook(models.MessageTypeCommit, pcm.handleCommit) return pcm } -func (pcm *PBFTConsensusManager) NewTestConsensus(data string, consensusID string, onConsensusFinishCallback func(finalData string)) { - //consensusID := uuid.New().String() - cData := &ConsensusData{} - cData.test = true - cData.onConsensusFinishCallback = onConsensusFinishCallback - pcm.Consensuses[consensusID] = cData +//func (pcm *PBFTConsensusManager) NewTestConsensus(data string, consensusID string, onConsensusFinishCallback func(finalData string)) { +// //consensusID := uuid.New().String() +// cData := &ConsensusData{} +// cData.test = true +// cData.onConsensusFinishCallback = onConsensusFinishCallback +// pcm.Consensuses[consensusID] = cData +// +// // here we will create DioneTask +// +// msg := models.Message{} +// msg.Type = "prepared" +// msg.Payload = make(map[string]interface{}) +// msg.Payload["consensusID"] = consensusID +// msg.Payload["data"] = data +// sign, err := sigs.Sign(types.SigTypeEd25519, pcm.privKey, []byte(data)) +// if err != nil { +// logrus.Warnf("failed to sign data: %w", err) +// return +// } +// msg.Payload["signature"] = string(sign.Data) +// pcm.psb.BroadcastToServiceTopic(&msg) +// +// cData.State = ConsensusPrePrepared +// logrus.Debug("started new consensus: " + consensusID) +//} +// +//func (pcm *PBFTConsensusManager) handlePrePrepareMessage(sender peer.ID, message *models.Message) { +// consensusID := message.Payload["consensusID"].(string) +// if _, ok := pcm.Consensuses[consensusID]; !ok { +// logrus.Warn("Unknown consensus ID,: " + consensusID + ", creating consensusInfo") +// pcm.Consensuses[consensusID] = &ConsensusData{ +// State: ConsensusPrePrepared, +// onConsensusFinishCallback: func(finalData string) {}, +// } +// } +// data := pcm.Consensuses[consensusID] +// logrus.Debug("received pre_prepare msg") +//} +// +//func (pcm *PBFTConsensusManager) handlePreparedMessage(sender peer.ID, message *models.Message) { +// // TODO add check on view of the message +// consensusID := message.Payload["consensusID"].(string) +// if _, ok := pcm.Consensuses[consensusID]; !ok { +// logrus.Warn("Unknown consensus ID,: " + consensusID + ", creating consensusInfo") +// pcm.Consensuses[consensusID] = &ConsensusData{ +// State: ConsensusPrePrepared, +// onConsensusFinishCallback: func(finalData string) {}, +// } +// } +// data := pcm.Consensuses[consensusID] +// logrus.Debug("received prepared msg") +// //data := pcm.Consensuses[consensusID] +// +// // TODO +// // here we can validate miner which produced this task, is he winner, and so on +// // validation steps: +// // 1. validate sender eligibility to mine (check if it has minimal stake) +// // 2. validate sender wincount +// // 3. validate randomness +// // 4. validate vrf +// // 5. validate payload signature +// // 6. validate transaction (get from rpc client and compare with received) +// +// signStr := message.Payload["signature"].(string) +// signRaw := []byte(signStr) +// err := sigs.Verify(&types.Signature{Data: signRaw, Type: types.SigTypeEd25519}, sender, message.Payload["data"].([]byte)) +// if err != nil { +// logrus.Warn("failed to verify data signature") +// return +// } +// +// data.mutex.Lock() +// defer data.mutex.Unlock() +// data.preparedCount++ +// +// if data.preparedCount > 2*pcm.maxFaultNodes+1 { +// msg := models.Message{} +// msg.Payload = make(map[string]interface{}) +// msg.Type = "commit" +// msg.Payload["consensusID"] = consensusID +// msg.Payload["data"] = message.Payload["data"] +// sign, err := sigs.Sign(types.SigTypeEd25519, pcm.privKey, message.Payload["data"].([]byte)) +// if err != nil { +// logrus.Warnf("failed to sign data: %w", err) +// return +// } +// msg.Payload["signature"] = string(sign.Data) +// err = pcm.psb.BroadcastToServiceTopic(&msg) +// if err != nil { +// logrus.Warn("Unable to send COMMIT message: " + err.Error()) +// return +// } +// data.State = ConsensusPrepared +// } +//} +// +//func (pcm *PBFTConsensusManager) handleCommitMessage(sender peer.ID, message *models.Message) { +// // TODO add check on view of the message +// // TODO add validation of data by hash to this stage +// consensusID := message.Payload["consensusID"].(string) +// if _, ok := pcm.Consensuses[consensusID]; !ok { +// logrus.Warn("Unknown consensus ID: " + consensusID) +// return +// } +// data := pcm.Consensuses[consensusID] +// +// data.mutex.Lock() +// defer data.mutex.Unlock() +// if data.State == ConsensusCommitted { +// logrus.Debug("consensus already finished, dropping COMMIT message") +// return +// } +// +// logrus.Debug("received commit msg") +// +// signStr := message.Payload["signature"].(string) +// signRaw := []byte(signStr) +// err := sigs.Verify(&types.Signature{Data: signRaw, Type: types.SigTypeEd25519}, sender, message.Payload["data"].([]byte)) +// if err != nil { +// logrus.Warn("failed to verify data signature") +// return +// } +// +// data.commitCount++ +// +// if data.commitCount > 2*pcm.maxFaultNodes+1 { +// data.State = ConsensusCommitted +// data.result = message.Payload["data"].(string) +// logrus.Debug("consensus successfully finished with result: " + data.result) +// data.onConsensusFinishCallback(data.result) +// } +//} - // here we will create DioneTask - - msg := models.Message{} - msg.Type = "prepared" - msg.Payload = make(map[string]interface{}) - msg.Payload["consensusID"] = consensusID - msg.Payload["data"] = data - pcm.psb.BroadcastToServiceTopic(&msg) - - cData.State = consensusPrePrepared - logrus.Debug("started new consensus: " + consensusID) +func (pcm *PBFTConsensusManager) Propose(consensusID, data string, requestID *big.Int, callbackAddress common.Address) error { + pcm.consensusLeaders[consensusID] = true + reqIDRaw := requestID.String() + callbackAddressHex := callbackAddress.Hex() + prePrepareMsg, err := pcm.prePreparePool.CreatePrePrepare(consensusID, data, reqIDRaw, callbackAddressHex, pcm.privKey) + if err != nil { + return err + } + pcm.psb.BroadcastToServiceTopic(prePrepareMsg) + return nil } -func (pcm *PBFTConsensusManager) handlePreparedMessage(message *models.Message) { - // TODO add check on view of the message - consensusID := message.Payload["consensusID"].(string) - if _, ok := pcm.Consensuses[consensusID]; !ok { - logrus.Warn("Unknown consensus ID: " + consensusID) +func (pcm *PBFTConsensusManager) handlePrePrepare(message *models.Message) { + if pcm.prePreparePool.IsExistingPrePrepare(message) { + logrus.Debug("received existing pre_prepare msg, dropping...") + return + } + if !pcm.prePreparePool.IsValidPrePrepare(message) { + logrus.Debug("received invalid pre_prepare msg, dropping...") return } - logrus.Debug("received prepared msg") - data := pcm.Consensuses[consensusID] - // TODO - // here we can validate miner which produced this task, is he winner, and so on - // validation steps: - // 1. validate sender eligibility to mine (check if it has minimal stake) - // 2. validate sender wincount - // 3. validate randomness - // 4. validate vrf - // 5. validate payload signature - // 6. validate transaction (get from rpc client and compare with received) + pcm.psb.BroadcastToServiceTopic(message) + prepareMsg, err := pcm.preparePool.CreatePrepare(message, pcm.privKey) + if err != nil { + logrus.Errorf("failed to create prepare message: %w", err) + } + pcm.psb.BroadcastToServiceTopic(prepareMsg) +} - data.mutex.Lock() - data.preparedCount++ - data.mutex.Unlock() +func (pcm *PBFTConsensusManager) handlePrepare(message *models.Message) { + if pcm.preparePool.IsExistingPrepare(message) { + logrus.Debug("received existing prepare msg, dropping...") + return + } + if !pcm.preparePool.IsValidPrepare(message) { + logrus.Debug("received invalid prepare msg, dropping...") + return + } - if data.preparedCount > 2*pcm.maxFaultNodes+1 { - msg := models.Message{} - msg.Payload = make(map[string]interface{}) - msg.Type = "commit" - msg.Payload["consensusID"] = consensusID - msg.Payload["data"] = message.Payload["data"] - err := pcm.psb.BroadcastToServiceTopic(&msg) + pcm.preparePool.AddPrepare(message) + pcm.psb.BroadcastToServiceTopic(message) + + if pcm.preparePool.PrepareSize(message.Payload.ConsensusID) >= pcm.minApprovals { + commitMsg, err := pcm.commitPool.CreateCommit(message, pcm.privKey) if err != nil { - logrus.Warn("Unable to send COMMIT message: " + err.Error()) - return + logrus.Errorf("failed to create commit message: %w", err) } - data.State = consensusPrepared + pcm.psb.BroadcastToServiceTopic(commitMsg) } } -func (pcm *PBFTConsensusManager) handleCommitMessage(message *models.Message) { - // TODO add check on view of the message - // TODO add validation of data by hash to this stage - consensusID := message.Payload["consensusID"].(string) - if _, ok := pcm.Consensuses[consensusID]; !ok { - logrus.Warn("Unknown consensus ID: " + consensusID) +func (pcm *PBFTConsensusManager) handleCommit(message *models.Message) { + if pcm.commitPool.IsExistingCommit(message) { + logrus.Debug("received existing commit msg, dropping...") return } - data := pcm.Consensuses[consensusID] - - data.mutex.Lock() - defer data.mutex.Unlock() - if data.State == consensusCommitted { - logrus.Debug("consensus already finished, dropping COMMIT message") + if !pcm.commitPool.IsValidCommit(message) { + logrus.Debug("received invalid commit msg, dropping...") return } - logrus.Debug("received commit msg") + pcm.commitPool.AddCommit(message) + pcm.psb.BroadcastToServiceTopic(message) - data.commitCount++ - - if data.commitCount > 2*pcm.maxFaultNodes+1 { - data.State = consensusCommitted - data.result = message.Payload["data"].(string) - logrus.Debug("consensus successfully finished with result: " + data.result) - data.onConsensusFinishCallback(data.result) + consensusMsg := message.Payload + if pcm.commitPool.CommitSize(consensusMsg.ConsensusID) >= pcm.minApprovals { + if pcm.consensusLeaders[consensusMsg.ConsensusID] { + logrus.Infof("Submitting on-chain result for consensus ID: %s", consensusMsg.ConsensusID) + reqID, ok := new(big.Int).SetString(consensusMsg.RequestID, 10) + if !ok { + logrus.Errorf("Failed to parse big int: %v", consensusMsg.RequestID) + } + callbackAddress := common.HexToAddress(consensusMsg.CallbackAddress) + err := pcm.ethereumClient.SubmitRequestAnswer(reqID, consensusMsg.Data, callbackAddress) + if err != nil { + logrus.Errorf("Failed to submit on-chain result: %w", err) + } + } } } diff --git a/consensus/miner.go b/consensus/miner.go index 612d1bc..8198b1d 100644 --- a/consensus/miner.go +++ b/consensus/miner.go @@ -2,10 +2,14 @@ package consensus import ( "context" + "encoding/json" "sync" - "github.com/Secured-Finance/dione/beacon" + solana2 "github.com/Secured-Finance/dione/rpc/solana" + "github.com/Secured-Finance/dione/beacon" + oracleEmitter "github.com/Secured-Finance/dione/contracts/oracleemitter" + solTypes "github.com/Secured-Finance/dione/rpc/solana/types" "github.com/libp2p/go-libp2p-core/peer" "github.com/Secured-Finance/dione/ethclient" @@ -23,6 +27,7 @@ type Miner struct { mutex sync.Mutex beacon beacon.BeaconNetworks ethClient *ethclient.EthereumClient + solanaClient *solana2.SolanaClient minerStake types.BigInt networkStake types.BigInt } @@ -33,13 +38,15 @@ func NewMiner( api WalletAPI, beacon beacon.BeaconNetworks, ethClient *ethclient.EthereumClient, + solanaClient *solana2.SolanaClient, ) *Miner { return &Miner{ - address: address, - ethAddress: ethAddress, - api: api, - beacon: beacon, - ethClient: ethClient, + address: address, + ethAddress: ethAddress, + api: api, + beacon: beacon, + ethClient: ethClient, + solanaClient: solanaClient, } } @@ -68,7 +75,7 @@ func (m *Miner) UpdateCurrentStakeInfo() error { return nil } -func (m *Miner) MineTask(ctx context.Context, payload []byte) (*types.DioneTask, error) { +func (m *Miner) MineTask(ctx context.Context, event *oracleEmitter.OracleEmitterNewOracleRequest, sign SignFunc) (*types.DioneTask, error) { bvals, err := beacon.BeaconEntriesForTask(ctx, m.beacon) if err != nil { return nil, xerrors.Errorf("failed to get beacon entries: %w", err) @@ -94,14 +101,31 @@ func (m *Miner) MineTask(ctx context.Context, payload []byte) (*types.DioneTask, if winner == nil { return nil, nil } + + res, err := m.solanaClient.GetTransaction(event.RequestParams) + if err != nil { + return nil, xerrors.Errorf("Couldn't get solana request: %w", err) + } + response := res.Body() + var txRes solTypes.TxResponse + if err = json.Unmarshal(response, &txRes); err != nil { + return nil, xerrors.Errorf("Couldn't unmarshal solana response: %w", err) + } + blockHash := txRes.Result.Transaction.Message.RecentBlockhash + signature, err := sign(ctx, m.address, response) + if err != nil { + return nil, xerrors.Errorf("Couldn't sign solana response: %w", err) + } + return &types.DioneTask{ Miner: m.address, Ticket: ticket, ElectionProof: winner, BeaconEntries: bvals, - Payload: payload, - // TODO: signature - DrandRound: types.DrandRound(rbase.Round), + Payload: response, + BlockHash: blockHash, + Signature: signature, + DrandRound: types.DrandRound(rbase.Round), }, nil } diff --git a/consensus/pre_prepare_pool.go b/consensus/pre_prepare_pool.go new file mode 100644 index 0000000..3bd125e --- /dev/null +++ b/consensus/pre_prepare_pool.go @@ -0,0 +1,66 @@ +package consensus + +import ( + "github.com/Secured-Finance/dione/models" + "github.com/Secured-Finance/dione/sigs" + "github.com/Secured-Finance/dione/types" + "github.com/sirupsen/logrus" +) + +type PrePreparePool struct { + prePrepareMsgs map[string][]*models.Message +} + +func NewPrePreparePool() *PrePreparePool { + return &PrePreparePool{ + prePrepareMsgs: map[string][]*models.Message{}, + } +} + +func (pp *PrePreparePool) CreatePrePrepare(consensusID, data string, requestID string, callbackAddress string, privateKey []byte) (*models.Message, error) { + var message models.Message + message.Type = models.MessageTypePrePrepare + var consensusMsg models.ConsensusMessage + consensusMsg.ConsensusID = consensusID + consensusMsg.RequestID = requestID + consensusMsg.CallbackAddress = callbackAddress + consensusMsg.Data = data + signature, err := sigs.Sign(types.SigTypeEd25519, privateKey, []byte(data)) + if err != nil { + return nil, err + } + consensusMsg.Signature = signature.Data + message.Payload = consensusMsg + return &message, nil +} + +func (ppp *PrePreparePool) IsExistingPrePrepare(prepareMsg *models.Message) bool { + consensusMessage := prepareMsg.Payload + var exists bool + for _, v := range ppp.prePrepareMsgs[consensusMessage.ConsensusID] { + if v.From == prepareMsg.From { + exists = true + } + } + return exists +} + +func (ppp *PrePreparePool) IsValidPrePrepare(prePrepare *models.Message) bool { + // TODO here we need to do validation of tx itself + consensusMsg := prePrepare.Payload + err := sigs.Verify(&types.Signature{Type: types.SigTypeEd25519, Data: consensusMsg.Signature}, prePrepare.From, []byte(consensusMsg.Data)) + if err != nil { + logrus.Errorf("unable to verify signature: %v", err) + return false + } + return true +} + +func (ppp *PrePreparePool) AddPrePrepare(prePrepare *models.Message) { + consensusID := prePrepare.Payload.ConsensusID + if _, ok := ppp.prePrepareMsgs[consensusID]; !ok { + ppp.prePrepareMsgs[consensusID] = []*models.Message{} + } + + ppp.prePrepareMsgs[consensusID] = append(ppp.prePrepareMsgs[consensusID], prePrepare) +} diff --git a/consensus/prepare_pool.go b/consensus/prepare_pool.go new file mode 100644 index 0000000..c395c4c --- /dev/null +++ b/consensus/prepare_pool.go @@ -0,0 +1,72 @@ +package consensus + +import ( + "github.com/Secured-Finance/dione/models" + "github.com/Secured-Finance/dione/sigs" + "github.com/Secured-Finance/dione/types" +) + +type PreparePool struct { + prepareMsgs map[string][]*models.Message + privateKey []byte +} + +func NewPreparePool() *PreparePool { + return &PreparePool{ + prepareMsgs: map[string][]*models.Message{}, + } +} + +func (pp *PreparePool) CreatePrepare(prePrepareMsg *models.Message, privateKey []byte) (*models.Message, error) { + var message models.Message + message.Type = models.MessageTypePrepare + var consensusMsg models.ConsensusMessage + prepareCMessage := prePrepareMsg.Payload + consensusMsg.ConsensusID = prepareCMessage.ConsensusID + consensusMsg.RequestID = prePrepareMsg.Payload.RequestID + consensusMsg.CallbackAddress = prePrepareMsg.Payload.CallbackAddress + consensusMsg.Data = prepareCMessage.Data + signature, err := sigs.Sign(types.SigTypeEd25519, privateKey, []byte(prepareCMessage.Data)) + if err != nil { + return nil, err + } + consensusMsg.Signature = signature.Data + message.Payload = consensusMsg + return &message, nil +} + +func (pp *PreparePool) IsExistingPrepare(prepareMsg *models.Message) bool { + consensusMessage := prepareMsg.Payload + var exists bool + for _, v := range pp.prepareMsgs[consensusMessage.ConsensusID] { + if v.From == prepareMsg.From { + exists = true + } + } + return exists +} + +func (pp *PreparePool) IsValidPrepare(prepare *models.Message) bool { + consensusMsg := prepare.Payload + err := sigs.Verify(&types.Signature{Type: types.SigTypeEd25519, Data: consensusMsg.Signature}, prepare.From, []byte(consensusMsg.Data)) + if err != nil { + return false + } + return true +} + +func (pp *PreparePool) AddPrepare(prepare *models.Message) { + consensusID := prepare.Payload.ConsensusID + if _, ok := pp.prepareMsgs[consensusID]; !ok { + pp.prepareMsgs[consensusID] = []*models.Message{} + } + + pp.prepareMsgs[consensusID] = append(pp.prepareMsgs[consensusID], prepare) +} + +func (pp *PreparePool) PrepareSize(consensusID string) int { + if v, ok := pp.prepareMsgs[consensusID]; ok { + return len(v) + } + return 0 +} diff --git a/contracts/aggregator/Aggregator.go b/contracts/aggregator/Aggregator.go index eb270e9..7156d31 100644 --- a/contracts/aggregator/Aggregator.go +++ b/contracts/aggregator/Aggregator.go @@ -27,7 +27,7 @@ var ( ) // AggregatorABI is the input ABI used to generate the binding from. -const AggregatorABI = "[{\"anonymous\":false,\"inputs\":[{\"indexed\":true,\"internalType\":\"address\",\"name\":\"previousOwner\",\"type\":\"address\"},{\"indexed\":true,\"internalType\":\"address\",\"name\":\"newOwner\",\"type\":\"address\"}],\"name\":\"OwnershipTransferred\",\"type\":\"event\"},{\"inputs\":[],\"name\":\"dioneStaking\",\"outputs\":[{\"internalType\":\"contractIDioneStaking\",\"name\":\"\",\"type\":\"address\"}],\"stateMutability\":\"view\",\"type\":\"function\"},{\"inputs\":[],\"name\":\"owner\",\"outputs\":[{\"internalType\":\"address\",\"name\":\"\",\"type\":\"address\"}],\"stateMutability\":\"view\",\"type\":\"function\"},{\"inputs\":[],\"name\":\"renounceOwnership\",\"outputs\":[],\"stateMutability\":\"nonpayable\",\"type\":\"function\"},{\"inputs\":[{\"internalType\":\"address\",\"name\":\"newOwner\",\"type\":\"address\"}],\"name\":\"transferOwnership\",\"outputs\":[],\"stateMutability\":\"nonpayable\",\"type\":\"function\"},{\"inputs\":[{\"internalType\":\"contractIDioneStaking\",\"name\":\"_dioneStaking\",\"type\":\"address\"}],\"name\":\"setDioneStaking\",\"outputs\":[],\"stateMutability\":\"nonpayable\",\"type\":\"function\"},{\"inputs\":[{\"internalType\":\"uint256\",\"name\":\"reqID\",\"type\":\"uint256\"},{\"internalType\":\"string\",\"name\":\"data\",\"type\":\"string\"},{\"internalType\":\"address\",\"name\":\"callbackAddress\",\"type\":\"address\"},{\"internalType\":\"bytes4\",\"name\":\"callbackMethodID\",\"type\":\"bytes4\"}],\"name\":\"collectData\",\"outputs\":[],\"stateMutability\":\"nonpayable\",\"type\":\"function\"}]" +const AggregatorABI = "[{\"anonymous\":false,\"inputs\":[{\"indexed\":true,\"internalType\":\"address\",\"name\":\"previousOwner\",\"type\":\"address\"},{\"indexed\":true,\"internalType\":\"address\",\"name\":\"newOwner\",\"type\":\"address\"}],\"name\":\"OwnershipTransferred\",\"type\":\"event\"},{\"inputs\":[],\"name\":\"dioneStaking\",\"outputs\":[{\"internalType\":\"contractIDioneStaking\",\"name\":\"\",\"type\":\"address\"}],\"stateMutability\":\"view\",\"type\":\"function\"},{\"inputs\":[],\"name\":\"owner\",\"outputs\":[{\"internalType\":\"address\",\"name\":\"\",\"type\":\"address\"}],\"stateMutability\":\"view\",\"type\":\"function\"},{\"inputs\":[],\"name\":\"renounceOwnership\",\"outputs\":[],\"stateMutability\":\"nonpayable\",\"type\":\"function\"},{\"inputs\":[{\"internalType\":\"address\",\"name\":\"newOwner\",\"type\":\"address\"}],\"name\":\"transferOwnership\",\"outputs\":[],\"stateMutability\":\"nonpayable\",\"type\":\"function\"},{\"inputs\":[{\"internalType\":\"contractIDioneStaking\",\"name\":\"_dioneStaking\",\"type\":\"address\"}],\"name\":\"setDioneStaking\",\"outputs\":[],\"stateMutability\":\"nonpayable\",\"type\":\"function\"},{\"inputs\":[{\"internalType\":\"uint256\",\"name\":\"reqID\",\"type\":\"uint256\"},{\"internalType\":\"string\",\"name\":\"data\",\"type\":\"string\"},{\"internalType\":\"contractIMediator\",\"name\":\"callbackAddress\",\"type\":\"address\"}],\"name\":\"collectData\",\"outputs\":[],\"stateMutability\":\"nonpayable\",\"type\":\"function\"}]" // Aggregator is an auto generated Go binding around an Ethereum contract. type Aggregator struct { @@ -223,25 +223,25 @@ func (_Aggregator *AggregatorCallerSession) Owner() (common.Address, error) { return _Aggregator.Contract.Owner(&_Aggregator.CallOpts) } -// CollectData is a paid mutator transaction binding the contract method 0x05187092. +// CollectData is a paid mutator transaction binding the contract method 0xce45d837. // -// Solidity: function collectData(uint256 reqID, string data, address callbackAddress, bytes4 callbackMethodID) returns() -func (_Aggregator *AggregatorTransactor) CollectData(opts *bind.TransactOpts, reqID *big.Int, data string, callbackAddress common.Address, callbackMethodID [4]byte) (*types.Transaction, error) { - return _Aggregator.contract.Transact(opts, "collectData", reqID, data, callbackAddress, callbackMethodID) +// Solidity: function collectData(uint256 reqID, string data, address callbackAddress) returns() +func (_Aggregator *AggregatorTransactor) CollectData(opts *bind.TransactOpts, reqID *big.Int, data string, callbackAddress common.Address) (*types.Transaction, error) { + return _Aggregator.contract.Transact(opts, "collectData", reqID, data, callbackAddress) } -// CollectData is a paid mutator transaction binding the contract method 0x05187092. +// CollectData is a paid mutator transaction binding the contract method 0xce45d837. // -// Solidity: function collectData(uint256 reqID, string data, address callbackAddress, bytes4 callbackMethodID) returns() -func (_Aggregator *AggregatorSession) CollectData(reqID *big.Int, data string, callbackAddress common.Address, callbackMethodID [4]byte) (*types.Transaction, error) { - return _Aggregator.Contract.CollectData(&_Aggregator.TransactOpts, reqID, data, callbackAddress, callbackMethodID) +// Solidity: function collectData(uint256 reqID, string data, address callbackAddress) returns() +func (_Aggregator *AggregatorSession) CollectData(reqID *big.Int, data string, callbackAddress common.Address) (*types.Transaction, error) { + return _Aggregator.Contract.CollectData(&_Aggregator.TransactOpts, reqID, data, callbackAddress) } -// CollectData is a paid mutator transaction binding the contract method 0x05187092. +// CollectData is a paid mutator transaction binding the contract method 0xce45d837. // -// Solidity: function collectData(uint256 reqID, string data, address callbackAddress, bytes4 callbackMethodID) returns() -func (_Aggregator *AggregatorTransactorSession) CollectData(reqID *big.Int, data string, callbackAddress common.Address, callbackMethodID [4]byte) (*types.Transaction, error) { - return _Aggregator.Contract.CollectData(&_Aggregator.TransactOpts, reqID, data, callbackAddress, callbackMethodID) +// Solidity: function collectData(uint256 reqID, string data, address callbackAddress) returns() +func (_Aggregator *AggregatorTransactorSession) CollectData(reqID *big.Int, data string, callbackAddress common.Address) (*types.Transaction, error) { + return _Aggregator.Contract.CollectData(&_Aggregator.TransactOpts, reqID, data, callbackAddress) } // RenounceOwnership is a paid mutator transaction binding the contract method 0x715018a6. diff --git a/eth-contracts/contracts/Aggregator.sol b/eth-contracts/contracts/Aggregator.sol index 164f2d3..f4c1b43 100644 --- a/eth-contracts/contracts/Aggregator.sol +++ b/eth-contracts/contracts/Aggregator.sol @@ -10,6 +10,10 @@ interface IDioneStaking { function isLegitMiner(address _minerAddr) external returns (bool); } +interface IMediator { + function _receiveDataCallback(uint256 reqID, string memory data) external; +} + contract Aggregator is Ownable, ReentrancyGuard { IDioneStaking public dioneStaking; @@ -18,9 +22,7 @@ contract Aggregator is Ownable, ReentrancyGuard { dioneStaking = _dioneStaking; } - function collectData(uint256 reqID, string memory data, address callbackAddress, bytes4 callbackMethodID) public nonReentrant { - require(dioneStaking.isLegitMiner(msg.sender)); - (bool success,) = callbackAddress.call(abi.encode(callbackMethodID, reqID, data)); - require(success); + function collectData(uint256 reqID, string memory data, IMediator callbackAddress) public nonReentrant { + callbackAddress._receiveDataCallback(reqID, data); } } diff --git a/eth-contracts/contracts/Mediator.sol b/eth-contracts/contracts/Mediator.sol index 2363306..fcbac98 100644 --- a/eth-contracts/contracts/Mediator.sol +++ b/eth-contracts/contracts/Mediator.sol @@ -18,7 +18,7 @@ contract Mediator is Ownable { aggregator = _aggregator; } - function request(uint8 memory originChain, uint8 memory requestType, string memory requestParams) public returns (uint256) { + function request(string memory originChain, string memory requestType, string memory requestParams) public returns (uint256) { return oracleEmitter.requestOracles(originChain, requestType, requestParams, address(this), bytes4(keccak256("_receiveDataCallback(uint256, string)"))); } diff --git a/eth-contracts/contracts/OracleEmitter.sol b/eth-contracts/contracts/OracleEmitter.sol index f0df37a..b0a3b14 100644 --- a/eth-contracts/contracts/OracleEmitter.sol +++ b/eth-contracts/contracts/OracleEmitter.sol @@ -5,15 +5,15 @@ contract OracleEmitter { uint256 requestCounter; event NewOracleRequest( - uint8 originChain, - uint8 requestType, + string originChain, + string requestType, string requestParams, address callbackAddress, bytes4 callbackMethodID, uint256 requestID ); - function requestOracles(uint8 memory originChain, uint8 memory requestType, string memory requestParams, address callbackAddress, bytes4 callbackMethodID) public returns (uint256) { + function requestOracles(string memory originChain, string memory requestType, string memory requestParams, address callbackAddress, bytes4 callbackMethodID) public returns (uint256) { requestCounter++; emit NewOracleRequest(originChain, requestType, requestParams, callbackAddress, callbackMethodID, requestCounter); return requestCounter; diff --git a/ethclient/ethereum.go b/ethclient/ethereum.go index 682a71d..a9773b1 100644 --- a/ethclient/ethereum.go +++ b/ethclient/ethereum.go @@ -5,6 +5,7 @@ import ( "math/big" "github.com/Secured-Finance/dione/contracts/aggregator" + "github.com/Secured-Finance/dione/contracts/dioneStaking" stakingContract "github.com/Secured-Finance/dione/contracts/dioneStaking" oracleEmitter "github.com/Secured-Finance/dione/contracts/oracleemitter" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -44,7 +45,7 @@ func NewEthereumClient() *EthereumClient { return ethereumClient } -func (c *EthereumClient) Initialize(ctx context.Context, url, privateKey, oracleEmitterContractAddress, aggregatorContractAddress string) error { +func (c *EthereumClient) Initialize(ctx context.Context, url, privateKey, oracleEmitterContractAddress, aggregatorContractAddress, dioneStakingAddress string) error { client, err := ethclient.Dial(url) if err != nil { return err @@ -66,6 +67,10 @@ func (c *EthereumClient) Initialize(ctx context.Context, url, privateKey, oracle if err != nil { return err } + stakingContract, err := dioneStaking.NewDioneStaking(common.HexToAddress(dioneStakingAddress), client) + if err != nil { + return err + } c.oracleEmitter = &oracleEmitter.OracleEmitterSession{ Contract: emitter, CallOpts: bind.CallOpts{ @@ -76,8 +81,8 @@ func (c *EthereumClient) Initialize(ctx context.Context, url, privateKey, oracle TransactOpts: bind.TransactOpts{ From: authTransactor.From, Signer: authTransactor.Signer, - GasLimit: 0, // 0 automatically estimates gas limit - GasPrice: nil, // nil automatically suggests gas price + GasLimit: 200000, // 0 automatically estimates gas limit + GasPrice: nil, // nil automatically suggests gas price Context: context.Background(), }, } @@ -91,8 +96,23 @@ func (c *EthereumClient) Initialize(ctx context.Context, url, privateKey, oracle TransactOpts: bind.TransactOpts{ From: authTransactor.From, Signer: authTransactor.Signer, - GasLimit: 0, // 0 automatically estimates gas limit - GasPrice: nil, // nil automatically suggests gas price + GasLimit: 200000, // 0 automatically estimates gas limit + GasPrice: nil, // nil automatically suggests gas price + Context: context.Background(), + }, + } + c.dioneStaking = &dioneStaking.DioneStakingSession{ + Contract: stakingContract, + CallOpts: bind.CallOpts{ + Pending: true, + From: authTransactor.From, + Context: context.Background(), + }, + TransactOpts: bind.TransactOpts{ + From: authTransactor.From, + Signer: authTransactor.Signer, + GasLimit: 200000, // 0 automatically estimates gas limit + GasPrice: big.NewInt(1860127603), // nil automatically suggests gas price Context: context.Background(), }, } @@ -103,12 +123,12 @@ func (c *EthereumClient) GetEthAddress() *common.Address { return c.ethAddress } -func (c *EthereumClient) SubscribeOnOracleEvents() (chan *oracleEmitter.OracleEmitterNewOracleRequest, event.Subscription, error) { +func (c *EthereumClient) SubscribeOnOracleEvents(ctx context.Context) (chan *oracleEmitter.OracleEmitterNewOracleRequest, event.Subscription, error) { resChan := make(chan *oracleEmitter.OracleEmitterNewOracleRequest) requestsFilter := c.oracleEmitter.Contract.OracleEmitterFilterer subscription, err := requestsFilter.WatchNewOracleRequest(&bind.WatchOpts{ Start: nil, //last block - Context: nil, + Context: ctx, }, resChan) if err != nil { return nil, nil, err @@ -116,7 +136,7 @@ func (c *EthereumClient) SubscribeOnOracleEvents() (chan *oracleEmitter.OracleEm return resChan, subscription, err } -func (c *EthereumClient) SubmitRequestAnswer(reqID *big.Int, data string, callbackAddress common.Address, callbackMethodID [4]byte) error { +func (c *EthereumClient) SubmitRequestAnswer(reqID *big.Int, data string, callbackAddress common.Address) error { // privateKey, err := crypto.HexToECDSA(private_key) // if err != nil { // c.Logger.Fatal("Failed to generate private key", err) @@ -142,7 +162,7 @@ func (c *EthereumClient) SubmitRequestAnswer(reqID *big.Int, data string, callba // c.Logger.Fatal(err) // } - _, err := c.aggregator.CollectData(reqID, data, callbackAddress, callbackMethodID) + _, err := c.aggregator.CollectData(reqID, data, callbackAddress) if err != nil { return err } diff --git a/go.mod b/go.mod index 0471c94..c647c51 100644 --- a/go.mod +++ b/go.mod @@ -41,6 +41,7 @@ require ( github.com/mattn/go-colorable v0.1.6 // indirect github.com/mattn/go-sqlite3 v1.9.0 github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 + github.com/mitchellh/mapstructure v1.3.3 github.com/multiformats/go-multiaddr v0.3.1 github.com/olekukonko/tablewriter v0.0.4 // indirect github.com/onsi/ginkgo v1.14.0 // indirect diff --git a/go.sum b/go.sum index 838f95f..27c71b2 100644 --- a/go.sum +++ b/go.sum @@ -743,6 +743,8 @@ github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0Qu github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= +github.com/mitchellh/mapstructure v1.3.3 h1:SzB1nHZ2Xi+17FP0zVQBHIZqvwRN9408fJO8h+eeNA8= +github.com/mitchellh/mapstructure v1.3.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= diff --git a/models/message.go b/models/message.go index 1bd3d66..1a8af34 100644 --- a/models/message.go +++ b/models/message.go @@ -1,7 +1,29 @@ package models -type Message struct { - Type string `json:"type"` - Payload map[string]interface{} `json:"payload"` - From string `json:"-"` +import ( + "github.com/libp2p/go-libp2p-core/peer" +) + +type MessageType uint8 + +const ( + MessageTypeUnknown = MessageType(iota) + + MessageTypePrePrepare + MessageTypePrepare + MessageTypeCommit +) + +type ConsensusMessage struct { + ConsensusID string + Signature []byte + RequestID string + CallbackAddress string + Data string +} + +type Message struct { + Type MessageType `json:"type"` + Payload ConsensusMessage `json:"payload"` + From peer.ID `json:"-"` } diff --git a/node/ethereum.go b/node/ethereum.go new file mode 100644 index 0000000..13d3797 --- /dev/null +++ b/node/ethereum.go @@ -0,0 +1,56 @@ +package node + +import ( + "context" + + "github.com/sirupsen/logrus" +) + +func (n *Node) subscribeOnEthContracts(ctx context.Context) { + eventChan, subscription, err := n.Ethereum.SubscribeOnOracleEvents(ctx) + if err != nil { + logrus.Fatal("Can't subscribe on ethereum contracts, exiting... ", err) + } + + go func() { + EventLoop: + for { + select { + case event := <-eventChan: + { + task, err := n.Miner.MineTask(ctx, event, n.Wallet.WalletSign) + if err != nil { + logrus.Fatal("Error with mining algorithm, exiting... ", err) + } + if task == nil { + continue + } + logrus.Info("BlockHash for Solana transaction: ", task.BlockHash) + logrus.Info("Started new consensus round with ID: ", task.BlockHash) + //n.ConsensusManager.NewTestConsensus(string(task.BlockHash), task.BlockHash, func(finalData string) { + // if finalData != string(task.BlockHash) { + // logrus.Warnf("Expected final data to be %s, not %s", task.BlockHash, finalData) + // return + // } + // logrus.Info("Consensus ID: ", task.BlockHash, " was successfull") + // logrus.Info("Submitting on-chain result: ", task.BlockHash, "for consensus ID: ", task.BlockHash) + // if task.Miner == n.Host.ID() { + // if err := n.Ethereum.SubmitRequestAnswer(event.RequestID, task.BlockHash, event.CallbackAddress, event.CallbackMethodID); err != nil { + // logrus.Warn("Can't submit request to ethereum chain: ", err) + // } + // } + //}) + + err = n.ConsensusManager.Propose(task.BlockHash, task.BlockHash, event.RequestID, event.CallbackAddress) + if err != nil { + logrus.Errorf("Failed to propose task: %w", err) + } + } + case <-ctx.Done(): + break EventLoop + case <-subscription.Err(): + logrus.Fatal("Error with ethereum subscription, exiting... ", err) + } + } + }() +} diff --git a/node/node.go b/node/node.go index 2d08b6a..f9738f4 100644 --- a/node/node.go +++ b/node/node.go @@ -5,9 +5,14 @@ import ( "crypto/rand" "flag" "fmt" + "io/ioutil" + "os" "time" - "github.com/Secured-Finance/dione/solana" + solana2 "github.com/Secured-Finance/dione/rpc/solana" + + "github.com/Secured-Finance/dione/rpc/filecoin" + "github.com/Secured-Finance/dione/types" "github.com/Secured-Finance/dione/wallet" @@ -22,7 +27,6 @@ import ( "github.com/Secured-Finance/dione/consensus" "github.com/Secured-Finance/dione/ethclient" "github.com/Secured-Finance/dione/pb" - "github.com/Secured-Finance/dione/rpc" "github.com/libp2p/go-libp2p" crypto "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/host" @@ -31,7 +35,7 @@ import ( ) const ( - DefaultPEXUpdateTime = 1 * time.Minute + DefaultPEXUpdateTime = 6 * time.Second ) type Node struct { @@ -41,9 +45,9 @@ type Node struct { GlobalCtxCancel context.CancelFunc OracleTopic string Config *config.Config - Lotus *rpc.LotusClient + Lotus *filecoin.LotusClient Ethereum *ethclient.EthereumClient - Solana *solana.SolanaClient + Solana *solana2.SolanaClient ConsensusManager *consensus.PBFTConsensusManager Miner *consensus.Miner Beacon beacon.BeaconNetworks @@ -70,8 +74,12 @@ func (n *Node) setupNode(ctx context.Context, prvKey crypto.PrivKey, pexDiscover if err != nil { logrus.Fatal(err) } + n.setupSolanaClient() n.setupPubsub() - n.setupConsensusManager(n.Config.ConsensusMaxFaultNodes) + err = n.setupConsensusManager(prvKey, n.Config.ConsensusMinApprovals) + if err != nil { + logrus.Fatalf("Failed to setup consensus manager: %w", err) + } err = n.setupBeacon() if err != nil { logrus.Fatal(err) @@ -84,10 +92,11 @@ func (n *Node) setupNode(ctx context.Context, prvKey crypto.PrivKey, pexDiscover if err != nil { logrus.Fatal(err) } + n.subscribeOnEthContracts(ctx) } func (n *Node) setupMiner() error { - n.Miner = consensus.NewMiner(n.Host.ID(), *n.Ethereum.GetEthAddress(), n.Wallet, n.Beacon, n.Ethereum) + n.Miner = consensus.NewMiner(n.Host.ID(), *n.Ethereum.GetEthAddress(), n.Wallet, n.Beacon, n.Ethereum, n.Solana) return nil } @@ -129,16 +138,17 @@ func (n *Node) setupEthereumClient() error { n.Config.Ethereum.PrivateKey, n.Config.Ethereum.OracleEmitterContractAddress, n.Config.Ethereum.AggregatorContractAddress, + n.Config.Ethereum.DioneStakingContractAddress, ) } func (n *Node) setupFilecoinClient() { - lotus := rpc.NewLotusClient(n.Config.Filecoin.LotusHost, n.Config.Filecoin.LotusToken) + lotus := filecoin.NewLotusClient(n.Config.Filecoin.LotusHost, n.Config.Filecoin.LotusToken) n.Lotus = lotus } func (n *Node) setupSolanaClient() { - solana := solana.NewSolanaClient() + solana := solana2.NewSolanaClient() n.Solana = solana } @@ -148,8 +158,13 @@ func (n *Node) setupPubsub() { //time.Sleep(3 * time.Second) } -func (n *Node) setupConsensusManager(maxFaultNodes int) { - n.ConsensusManager = consensus.NewPBFTConsensusManager(n.PubSubRouter, maxFaultNodes) +func (n *Node) setupConsensusManager(privateKey crypto.PrivKey, minApprovals int) error { + pkeyRaw, err := privateKey.Raw() + if err != nil { + return err + } + n.ConsensusManager = consensus.NewPBFTConsensusManager(n.PubSubRouter, minApprovals, pkeyRaw, n.Ethereum) + return nil } func (n *Node) setupLibp2pHost(ctx context.Context, privateKey crypto.PrivKey, pexDiscoveryUpdateTime time.Duration) { @@ -177,7 +192,9 @@ func (n *Node) setupLibp2pHost(ctx context.Context, privateKey crypto.PrivKey, p } bootstrapMaddrs = append(bootstrapMaddrs, maddr) } - + if n.Config.IsBootstrap { + bootstrapMaddrs = nil + } discovery, err := pex.NewPEXDiscovery(host, bootstrapMaddrs, pexDiscoveryUpdateTime) if err != nil { logrus.Fatal("Can't set up PEX discovery protocol, exiting... ", err) @@ -235,22 +252,37 @@ func Start() error { if *verbose { logrus.SetLevel(logrus.DebugLevel) } else { - logrus.SetLevel(logrus.InfoLevel) + logrus.SetLevel(logrus.DebugLevel) } if err != nil { logrus.Panic(err) } - privKey, err := generatePrivateKey() - if err != nil { - logrus.Fatal(err) + var privateKey crypto.PrivKey + + if node.Config.IsBootstrap { + if _, err := os.Stat(".bootstrap_privkey"); os.IsNotExist(err) { + privateKey, err = generatePrivateKey() + if err != nil { + logrus.Fatal(err) + } + + f, _ := os.Create(".bootstrap_privkey") + r, _ := privateKey.Raw() + f.Write(r) + } else { + pkey, _ := ioutil.ReadFile(".bootstrap_privkey") + privateKey, _ = crypto.UnmarshalEd25519PrivateKey(pkey) + } + } else { + privateKey, err = generatePrivateKey() } ctx, ctxCancel := context.WithCancel(context.Background()) node.GlobalCtx = ctx node.GlobalCtxCancel = ctxCancel - node.setupNode(ctx, privKey, DefaultPEXUpdateTime) + node.setupNode(ctx, privateKey, DefaultPEXUpdateTime) for { select { case <-ctx.Done(): diff --git a/node/node_test.go b/node/node_test.go index 7c66347..7a71854 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -27,7 +27,7 @@ func TestConsensus(t *testing.T) { PubSub: config.PubSubConfig{ ProtocolID: "/dione/1.0", }, - ConsensusMaxFaultNodes: 3, + ConsensusMinApprovals: 3, } var nodes []*Node @@ -45,7 +45,7 @@ func TestConsensus(t *testing.T) { nodes = append(nodes, node) } - time.Sleep(5*time.Second) + time.Sleep(5 * time.Second) var wg sync.WaitGroup diff --git a/pb/pubsub_router.go b/pb/pubsub_router.go index 2367807..d29491f 100644 --- a/pb/pubsub_router.go +++ b/pb/pubsub_router.go @@ -16,7 +16,7 @@ type PubSubRouter struct { Pubsub *pubsub.PubSub context context.Context contextCancel context.CancelFunc - handlers map[string][]Handler + handlers map[models.MessageType][]Handler oracleTopic string } @@ -27,10 +27,10 @@ func NewPubSubRouter(h host.Host, oracleTopic string) *PubSubRouter { node: h, context: ctx, contextCancel: ctxCancel, - handlers: make(map[string][]Handler), + handlers: make(map[models.MessageType][]Handler), } - pb, err := pubsub.NewGossipSub( + pb, err := pubsub.NewFloodSub( context.TODO(), psr.node, //pubsub.WithMessageSigning(true), //pubsub.WithStrictSignatureVerification(true), @@ -74,7 +74,6 @@ func (psr *PubSubRouter) handleMessage(p *pubsub.Message) { } // We can receive our own messages when sending to the topic. So we should drop them. if senderPeerID == psr.node.ID() { - logrus.Debug("Drop message because it came from the current node - a bug (or feature) in the pubsub system") return } var message models.Message @@ -83,10 +82,10 @@ func (psr *PubSubRouter) handleMessage(p *pubsub.Message) { logrus.Warn("Unable to decode message data! " + err.Error()) return } - message.From = senderPeerID.String() + message.From = senderPeerID handlers, ok := psr.handlers[message.Type] if !ok { - logrus.Warn("Dropping message " + message.Type + " because we don't have any handlers!") + logrus.Warn("Dropping message " + string(message.Type) + " because we don't have any handlers!") return } for _, v := range handlers { @@ -94,7 +93,7 @@ func (psr *PubSubRouter) handleMessage(p *pubsub.Message) { } } -func (psr *PubSubRouter) Hook(messageType string, handler Handler) { +func (psr *PubSubRouter) Hook(messageType models.MessageType, handler Handler) { handlers, ok := psr.handlers[messageType] if !ok { emptyArray := []Handler{} diff --git a/rpc/ethereum/ethereum.go b/rpc/ethereum/ethereum.go new file mode 100644 index 0000000..399bb9f --- /dev/null +++ b/rpc/ethereum/ethereum.go @@ -0,0 +1,35 @@ +package ethereum + +import ( + "context" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethclient" +) + +type EthereumRPCClient struct { + client *ethclient.Client +} + +func NewEthereumRPCClient(url string) (*EthereumRPCClient, error) { + client, err := ethclient.Dial(url) + if err != nil { + return nil, err + } + return &EthereumRPCClient{ + client: client, + }, nil +} + +func (erc *EthereumRPCClient) GetTransaction(txHash string) ([]byte, error) { + txHHash := common.HexToHash(txHash) + tx, _, err := erc.client.TransactionByHash(context.TODO(), txHHash) + if err != nil { + return nil, err + } + txRaw, err := tx.MarshalJSON() + if err != nil { + return nil, err + } + return txRaw, nil +} diff --git a/rpc/filecoin.go b/rpc/filecoin/filecoin.go similarity index 73% rename from rpc/filecoin.go rename to rpc/filecoin/filecoin.go index 49da63e..833a80d 100644 --- a/rpc/filecoin.go +++ b/rpc/filecoin/filecoin.go @@ -1,9 +1,11 @@ -package rpc +package filecoin import ( "encoding/json" "fmt" + "github.com/Secured-Finance/dione/rpc/types" + "github.com/Secured-Finance/dione/lib" "github.com/sirupsen/logrus" "github.com/valyala/fasthttp" @@ -11,29 +13,29 @@ import ( var filecoinURL = "https://filecoin.infura.io/" -// client implements the `Client` interface. type LotusClient struct { host string projectID string projectSecret string + httpClient *fasthttp.Client } -// NewClient returns a new client. func NewLotusClient(pID, secret string) *LotusClient { return &LotusClient{ host: filecoinURL, projectID: pID, projectSecret: secret, + httpClient: &fasthttp.Client{}, } } -func (c *LotusClient) GetMessage(txHash string) (*fasthttp.Response, error) { +func (c *LotusClient) GetTransaction(txHash string) ([]byte, error) { req := fasthttp.AcquireRequest() req.SetRequestURI(c.host) req.Header.SetMethod("POST") req.Header.SetContentType("application/json") req.Header.Set("Authorization", "Basic "+lib.BasicAuth(c.projectID, c.projectSecret)) - requestBody := NewRequestBody("Filecoin.ChainGetMessage") + requestBody := types.NewRPCRequestBody("Filecoin.ChainGetMessage") requestBody.Params = append(requestBody.Params, txHash) body, err := json.Marshal(requestBody) if err != nil { @@ -41,12 +43,11 @@ func (c *LotusClient) GetMessage(txHash string) (*fasthttp.Response, error) { } req.AppendBody(body) resp := fasthttp.AcquireResponse() - client := &fasthttp.Client{} - if err = client.Do(req, resp); err != nil { + if err = c.httpClient.Do(req, resp); err != nil { logrus.Warn("Failed to construct filecoin node rpc request", err) return nil, err } bodyBytes := resp.Body() - logrus.Info(string(bodyBytes)) - return resp, nil + logrus.Debug(string(bodyBytes)) + return bodyBytes, nil } diff --git a/rpc/rpc.go b/rpc/rpc.go index ff273b0..717b7b4 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -1,24 +1,5 @@ package rpc -import "net/http" - -type RequestBody struct { - Jsonrpc string `json:"jsonrpc"` - Method string `json:"method"` - Params []interface{} `json:"params"` - ID int `json:"id"` -} - -func NewRequestBody(method string) *RequestBody { - var i []interface{} - return &RequestBody{ - Jsonrpc: "2.0", - Method: method, - Params: i, - ID: 0, - } -} - -type Client interface { - HandleRequest(r *http.Request, data []byte) (*http.Response, error) +type RPCClient interface { + GetTransaction(txHash string) ([]byte, error) } diff --git a/solana/client.go b/rpc/solana/solana.go similarity index 81% rename from solana/client.go rename to rpc/solana/solana.go index 6bb9c87..cf9b0c1 100644 --- a/solana/client.go +++ b/rpc/solana/solana.go @@ -5,8 +5,9 @@ import ( "fmt" "log" - "github.com/Secured-Finance/dione/rpc" - "github.com/Secured-Finance/dione/solana/types" + "github.com/Secured-Finance/dione/rpc/types" + + stypes "github.com/Secured-Finance/dione/rpc/solana/types" ws "github.com/dgrr/fastws" "github.com/shengdoushi/base58" "github.com/sirupsen/logrus" @@ -38,13 +39,13 @@ func NewSolanaClient() *SolanaClient { } } -func (c *SolanaClient) GetTransaction(txHash string) (*fasthttp.Response, error) { +func (c *SolanaClient) GetTransaction(txHash string) ([]byte, error) { req := fasthttp.AcquireRequest() req.SetRequestURI(c.url) req.Header.SetMethod("POST") req.Header.SetContentType("application/json") - requestBody := rpc.NewRequestBody("getConfirmedTransaction") - requestBody.Params = append(requestBody.Params, txHash, "base58") + requestBody := types.NewRPCRequestBody("getConfirmedTransaction") + requestBody.Params = append(requestBody.Params, txHash, "json") body, err := json.Marshal(requestBody) if err != nil { return nil, fmt.Errorf("Failed to marshal request body %v", err) @@ -58,17 +59,17 @@ func (c *SolanaClient) GetTransaction(txHash string) (*fasthttp.Response, error) } bodyBytes := resp.Body() logrus.Info(string(bodyBytes)) - return resp, nil + return bodyBytes, nil } -func (c *SolanaClient) subsctibeOnProgram(programID string) { +func (c *SolanaClient) subscribeOnProgram(programID string) { conn, err := ws.Dial(c.ws) if err != nil { log.Fatalln("Can't establish connection with Solana websocket: ", err) } defer conn.Close() - requestBody := rpc.NewRequestBody("programSubscribe") + requestBody := types.NewRPCRequestBody("programSubscribe") requestBody.Params = append(requestBody.Params, programID) p := NewSubParam("jsonParsed") requestBody.Params = append(requestBody.Params, p) @@ -86,7 +87,7 @@ func (c *SolanaClient) subsctibeOnProgram(programID string) { logrus.Info("Subscription ID to drop websocket connection:", subscriptionID) var msg []byte - var parsedSub *types.Subscription + var parsedSub *stypes.Subscription for { _, msg, err = conn.ReadMessage(msg[:0]) if err != nil { diff --git a/solana/types/program.go b/rpc/solana/types/program.go similarity index 100% rename from solana/types/program.go rename to rpc/solana/types/program.go diff --git a/solana/types/subscription.go b/rpc/solana/types/subscription.go similarity index 100% rename from solana/types/subscription.go rename to rpc/solana/types/subscription.go diff --git a/rpc/solana/types/transaction.go b/rpc/solana/types/transaction.go new file mode 100644 index 0000000..d64c77b --- /dev/null +++ b/rpc/solana/types/transaction.go @@ -0,0 +1,45 @@ +package types + +type TxResponse struct { + Jsonrpc string `json:"jsonrpc"` + Result TxResult `json:"result"` + ID int `json:"id"` +} + +type TxStatus struct { + Ok interface{} `json:"Ok"` +} +type TxMeta struct { + Err interface{} `json:"err"` + Fee int `json:"fee"` + InnerInstructions []interface{} `json:"innerInstructions"` + LogMessages []interface{} `json:"logMessages"` + PostBalances []interface{} `json:"postBalances"` + PreBalances []interface{} `json:"preBalances"` + Status TxStatus `json:"status"` +} +type TxHeader struct { + NumReadonlySignedAccounts int `json:"numReadonlySignedAccounts"` + NumReadonlyUnsignedAccounts int `json:"numReadonlyUnsignedAccounts"` + NumRequiredSignatures int `json:"numRequiredSignatures"` +} +type TxInstructions struct { + Accounts []int `json:"accounts"` + Data string `json:"data"` + ProgramIDIndex int `json:"programIdIndex"` +} +type Message struct { + AccountKeys []string `json:"accountKeys"` + Header TxHeader `json:"header"` + Instructions []TxInstructions `json:"instructions"` + RecentBlockhash string `json:"recentBlockhash"` +} +type Transaction struct { + Message Message `json:"message"` + Signatures []string `json:"signatures"` +} +type TxResult struct { + Meta TxMeta `json:"meta"` + Slot int `json:"slot"` + Transaction Transaction `json:"transaction"` +} diff --git a/solana/types/error.go b/rpc/types/rpc_error.go similarity index 100% rename from solana/types/error.go rename to rpc/types/rpc_error.go diff --git a/rpc/types/rpc_request_body.go b/rpc/types/rpc_request_body.go new file mode 100644 index 0000000..5fd06f6 --- /dev/null +++ b/rpc/types/rpc_request_body.go @@ -0,0 +1,18 @@ +package types + +type RPCRequestBody struct { + Jsonrpc string `json:"jsonrpc"` + Method string `json:"method"` + Params []interface{} `json:"params"` + ID int `json:"id"` +} + +func NewRPCRequestBody(method string) *RPCRequestBody { + var i []interface{} + return &RPCRequestBody{ + Jsonrpc: "2.0", + Method: method, + Params: i, + ID: 0, + } +} diff --git a/solana/types/response.go b/rpc/types/rpc_response_body.go similarity index 82% rename from solana/types/response.go rename to rpc/types/rpc_response_body.go index 849f8ab..5496273 100644 --- a/solana/types/response.go +++ b/rpc/types/rpc_response_body.go @@ -1,6 +1,6 @@ package types -type SolanaResponse struct { +type RPCResponseBody struct { Jsonrpc string `json:"jsonrpc"` Result []byte `json:"result"` Error Error `json:"error"` diff --git a/sigs/ed25519/ed25519.go b/sigs/ed25519/ed25519.go index 0c12f3e..d4eeb84 100644 --- a/sigs/ed25519/ed25519.go +++ b/sigs/ed25519/ed25519.go @@ -22,13 +22,13 @@ func (ed25519Signer) GenPrivate() ([]byte, error) { } func (ed25519Signer) ToPublic(priv []byte) ([]byte, error) { - privKey := ed25519.NewKeyFromSeed(priv) + var privKey ed25519.PrivateKey = priv pubKey := privKey.Public().(ed25519.PublicKey) return pubKey, nil } func (ed25519Signer) Sign(p []byte, msg []byte) ([]byte, error) { - privKey := ed25519.NewKeyFromSeed(p) + var privKey ed25519.PrivateKey = p return ed25519.Sign(privKey, msg), nil } diff --git a/types/signature.go b/types/signature.go index f40c437..c6ea62a 100644 --- a/types/signature.go +++ b/types/signature.go @@ -17,6 +17,17 @@ const ( SigTypeEd25519 = SigType(iota) ) +func (t SigType) Name() (string, error) { + switch t { + case SigTypeUnknown: + return "unknown", nil + case SigTypeEd25519: + return "ed25519", nil + default: + return "", fmt.Errorf("invalid signature type: %d", t) + } +} + const SignatureMaxLength = 200 type Signature struct { diff --git a/types/task.go b/types/task.go index f419219..53b3a28 100644 --- a/types/task.go +++ b/types/task.go @@ -36,6 +36,31 @@ type DioneTask struct { Signature *Signature DrandRound DrandRound Payload []byte + BlockHash string +} + +func NewDioneTask( + t TaskType, + miner peer.ID, + ticket *Ticket, + electionProof *ElectionProof, + beacon []BeaconEntry, + sig *Signature, + drand DrandRound, + payload []byte, + blockHash string, +) *DioneTask { + return &DioneTask{ + Type: t, + Miner: miner, + Ticket: ticket, + ElectionProof: electionProof, + BeaconEntries: beacon, + Signature: sig, + DrandRound: drand, + Payload: payload, + BlockHash: blockHash, + } } var tasksPerEpoch = NewInt(config.TasksPerEpoch) diff --git a/wallet/wallet.go b/wallet/wallet.go index 7e0d78c..33f6ce7 100644 --- a/wallet/wallet.go +++ b/wallet/wallet.go @@ -7,6 +7,7 @@ import ( "github.com/libp2p/go-libp2p-core/peer" "github.com/Secured-Finance/dione/sigs" + _ "github.com/Secured-Finance/dione/sigs/ed25519" // enable ed25519 signatures "github.com/Secured-Finance/dione/types" "github.com/filecoin-project/go-address" "github.com/sirupsen/logrus" @@ -107,11 +108,13 @@ func (w *LocalWallet) tryFind(addr peer.ID) (types.KeyInfo, error) { if err != nil { return types.KeyInfo{}, err } + logrus.Info("tAddress: ", tAddress) ki, err = w.keystore.Get(KNamePrefix + tAddress) if err != nil { return types.KeyInfo{}, err } + logrus.Info("ki from tryFind: ", ki) // We found it with the testnet prefix // Add this KeyInfo with the mainnet prefix address string