2021-05-26 21:06:46 +00:00
package sync
import (
"bytes"
"context"
2021-07-19 20:19:06 +00:00
"encoding/hex"
2021-06-02 19:45:55 +00:00
"errors"
2021-05-26 21:06:46 +00:00
"fmt"
"strings"
"sync"
2021-07-11 23:23:00 +00:00
"github.com/fxamacker/cbor/v2"
2021-06-11 11:40:32 +00:00
"github.com/asaskevich/EventBus"
"github.com/Secured-Finance/dione/blockchain/utils"
2021-06-04 21:18:06 +00:00
"github.com/Secured-Finance/dione/blockchain"
2021-06-03 21:21:14 +00:00
"github.com/Secured-Finance/dione/pubsub"
2021-06-02 19:45:55 +00:00
"github.com/Secured-Finance/dione/consensus/policy"
2021-05-26 21:06:46 +00:00
"github.com/wealdtech/go-merkletree/keccak256"
"github.com/wealdtech/go-merkletree"
"github.com/sirupsen/logrus"
"github.com/Secured-Finance/dione/node/wire"
"github.com/libp2p/go-libp2p-core/peer"
types2 "github.com/Secured-Finance/dione/blockchain/types"
"github.com/Secured-Finance/dione/blockchain/pool"
gorpc "github.com/libp2p/go-libp2p-gorpc"
)
2021-06-04 21:18:06 +00:00
// SyncManager is the type returned by NewSyncManager. It declares no methods,
// so any value satisfies it.
type SyncManager interface{}
2021-05-26 21:06:46 +00:00
type syncManager struct {
2021-06-04 21:18:06 +00:00
blockpool * blockchain . BlockChain
2021-06-02 19:45:55 +00:00
mempool * pool . Mempool
2021-05-26 21:06:46 +00:00
wg sync . WaitGroup
ctx context . Context
ctxCancelFunc context . CancelFunc
initialSyncCompleted bool
bootstrapPeer peer . ID
rpcClient * gorpc . Client
2021-06-03 21:21:14 +00:00
psb * pubsub . PubSubRouter
2021-06-11 11:40:32 +00:00
bus EventBus . Bus
2021-05-26 21:06:46 +00:00
}
2021-07-19 20:19:06 +00:00
func NewSyncManager ( bus EventBus . Bus , bc * blockchain . BlockChain , mp * pool . Mempool , p2pRPCClient * gorpc . Client , bootstrapPeer peer . ID , psb * pubsub . PubSubRouter ) SyncManager {
2021-05-26 21:06:46 +00:00
ctx , cancelFunc := context . WithCancel ( context . Background ( ) )
2021-06-03 21:21:14 +00:00
sm := & syncManager {
2021-06-11 11:40:32 +00:00
bus : bus ,
2021-07-19 20:19:06 +00:00
blockpool : bc ,
2021-06-02 19:48:56 +00:00
mempool : mp ,
2021-05-26 21:06:46 +00:00
ctx : ctx ,
ctxCancelFunc : cancelFunc ,
initialSyncCompleted : false ,
bootstrapPeer : bootstrapPeer ,
rpcClient : p2pRPCClient ,
2021-06-03 21:21:14 +00:00
psb : psb ,
2021-05-26 21:06:46 +00:00
}
2021-06-03 21:21:14 +00:00
2021-07-11 23:23:00 +00:00
psb . Hook ( pubsub . NewTxMessageType , sm . onNewTransaction )
2021-07-19 20:19:06 +00:00
psb . Hook ( pubsub . NewBlockMessageType , sm . onNewBlock )
2021-06-03 21:21:14 +00:00
2021-06-11 11:40:32 +00:00
go func ( ) {
if err := sm . initialSync ( ) ; err != nil {
logrus . Error ( err )
}
} ( )
2021-06-03 21:21:14 +00:00
return sm
2021-05-26 21:06:46 +00:00
}
2021-06-11 11:40:32 +00:00
// initialSync runs the block pool sync followed by the mempool sync. When both
// succeed it marks the initial sync as completed and publishes the
// "sync:initialSyncCompleted" event on the bus.
//
// The first failing phase aborts the sync; its error is returned wrapped with
// the name of the phase.
func (sm *syncManager) initialSync() error {
	if err := sm.doInitialBlockPoolSync(); err != nil {
		return fmt.Errorf("initial blockpool sync: %w", err)
	}
	if err := sm.doInitialMempoolSync(); err != nil {
		return fmt.Errorf("initial mempool sync: %w", err)
	}

	// Record completion so the guard at the top of doInitialBlockPoolSync
	// takes effect; previously the flag was never set.
	sm.initialSyncCompleted = true

	sm.bus.Publish("sync:initialSyncCompleted")
	return nil
}
2021-06-02 19:45:55 +00:00
func ( sm * syncManager ) doInitialBlockPoolSync ( ) error {
2021-05-26 21:06:46 +00:00
if sm . initialSyncCompleted {
return nil
}
2021-06-02 19:45:55 +00:00
ourLastHeight , err := sm . blockpool . GetLatestBlockHeight ( )
2021-06-04 21:18:06 +00:00
if err == blockchain . ErrLatestHeightNil {
2021-05-26 21:06:46 +00:00
gBlock := types2 . GenesisBlock ( )
2021-06-02 19:45:55 +00:00
err = sm . blockpool . StoreBlock ( gBlock ) // commit genesis block
2021-05-26 21:06:46 +00:00
if err != nil {
return err
}
}
2021-07-11 00:32:58 +00:00
if sm . bootstrapPeer == "" {
return nil // FIXME
}
2021-05-26 21:06:46 +00:00
var reply wire . LastBlockHeightReply
err = sm . rpcClient . Call ( sm . bootstrapPeer , "NetworkService" , "LastBlockHeight" , nil , & reply )
if err != nil {
return err
}
if reply . Height > ourLastHeight {
heightCount := reply . Height - ourLastHeight
var from uint64
to := ourLastHeight
var receivedBlocks [ ] types2 . Block
for heightCount > 0 {
from = to + 1
var addedVal uint64
2021-06-02 19:45:55 +00:00
if heightCount < policy . MaxBlockCountForRetrieving {
2021-05-26 21:06:46 +00:00
addedVal = heightCount
} else {
2021-06-02 19:45:55 +00:00
addedVal = policy . MaxBlockCountForRetrieving
2021-05-26 21:06:46 +00:00
}
heightCount -= addedVal
to += addedVal
2021-06-02 19:45:55 +00:00
var getBlocksReply wire . GetRangeOfBlocksReply
arg := wire . GetRangeOfBlocksArg { From : from , To : to }
err = sm . rpcClient . Call ( sm . bootstrapPeer , "NetworkService" , "GetRangeOfBlocks" , arg , & getBlocksReply )
2021-05-26 21:06:46 +00:00
if err != nil {
return err
}
receivedBlocks = append ( receivedBlocks , getBlocksReply . Blocks ... )
if len ( getBlocksReply . FailedBlockHeights ) != 0 {
logrus . Warnf ( "remote node is unable to retrieve block heights: %s" , strings . Trim ( strings . Join ( strings . Fields ( fmt . Sprint ( getBlocksReply . FailedBlockHeights ) ) , ", " ) , "[]" ) )
// FIXME we definitely need to handle it, because in that case our chain isn't complete!
}
}
for _ , b := range receivedBlocks {
err := sm . processReceivedBlock ( b ) // it should process the block synchronously
if err != nil {
logrus . Warnf ( "unable to process block %d: %s" , b . Header . Height , err . Error ( ) )
continue
}
}
} else {
// FIXME probably we need to pick up better peer for syncing, because chain of current peer can be out-of-date as well
}
return nil
}
2021-06-02 19:45:55 +00:00
func ( sm * syncManager ) doInitialMempoolSync ( ) error {
2021-07-11 00:32:58 +00:00
if sm . bootstrapPeer == "" {
return nil // FIXME
}
2021-06-02 19:45:55 +00:00
var reply wire . InvMessage
err := sm . rpcClient . Call ( sm . bootstrapPeer , "NetworkService" , "Mempool" , nil , & reply )
if err != nil {
return err
}
var txsToRetrieve [ ] [ ] byte
for _ , v := range reply . Inventory {
_ , err = sm . mempool . GetTransaction ( v . Hash )
if errors . Is ( err , pool . ErrTxNotFound ) {
txsToRetrieve = append ( txsToRetrieve , v . Hash )
}
}
for {
var txHashes [ ] [ ] byte
if len ( txsToRetrieve ) == 0 {
break
}
if len ( txsToRetrieve ) > policy . MaxTransactionCountForRetrieving {
txHashes = txsToRetrieve [ : policy . MaxTransactionCountForRetrieving ]
txsToRetrieve = txsToRetrieve [ policy . MaxTransactionCountForRetrieving : ]
} else {
txHashes = txsToRetrieve
}
getMempoolTxArg := wire . GetMempoolTxsArg {
Items : txHashes ,
}
var getMempoolTxReply wire . GetMempoolTxsReply
err := sm . rpcClient . Call ( sm . bootstrapPeer , "NetworkService" , "GetMempoolTxs" , getMempoolTxArg , & getMempoolTxReply )
if err != nil {
return err
}
for _ , v := range getMempoolTxReply . Transactions {
err := sm . mempool . StoreTx ( & v )
if err != nil {
logrus . Warnf ( err . Error ( ) )
}
}
2021-06-11 11:40:32 +00:00
// TODO handle not found transactions
2021-06-02 19:45:55 +00:00
}
return nil
}
2021-05-26 21:06:46 +00:00
func ( sm * syncManager ) processReceivedBlock ( block types2 . Block ) error {
// validate block
2021-06-02 19:45:55 +00:00
previousBlockHeader , err := sm . blockpool . FetchBlockHeaderByHeight ( block . Header . Height - 1 )
2021-05-26 21:06:46 +00:00
if err != nil {
return fmt . Errorf ( "failed to retrieve previous block %d" , block . Header . Height - 1 )
}
if bytes . Compare ( block . Header . LastHash , previousBlockHeader . Hash ) != 0 {
return fmt . Errorf ( "block header has invalid last block hash" )
}
verified , err := merkletree . VerifyProofUsing ( previousBlockHeader . Hash , false , block . Header . LastHashProof , [ ] [ ] byte { block . Header . Hash } , keccak256 . New ( ) )
if err != nil {
2021-07-19 20:19:06 +00:00
return fmt . Errorf ( "failed to verify last block hash merkle proof: %w" , err )
2021-05-26 21:06:46 +00:00
}
if ! verified {
return fmt . Errorf ( "merkle hash of current block doesn't contain hash of previous block" )
}
// check if hashes of block transactions are present in the block hash merkle tree
for _ , tx := range block . Data { // FIXME we need to do something with rejected txs
2021-06-11 11:40:32 +00:00
if err := utils . VerifyTx ( block . Header , tx ) ; err != nil {
return err
2021-05-26 21:29:37 +00:00
}
2021-05-26 21:06:46 +00:00
}
2021-06-02 19:45:55 +00:00
err = sm . blockpool . StoreBlock ( & block )
2021-05-26 21:06:46 +00:00
if err != nil {
return fmt . Errorf ( "failed to store block in blockpool: %s" , err . Error ( ) )
}
return nil
}
2021-07-11 23:23:00 +00:00
func ( sm * syncManager ) onNewTransaction ( message * pubsub . PubSubMessage ) {
var tx types2 . Transaction
err := cbor . Unmarshal ( message . Payload , & tx )
if err != nil {
logrus . Errorf ( "failed to convert payload to transaction: %s" , err . Error ( ) )
2021-06-03 21:21:14 +00:00
return
}
2021-07-19 20:19:06 +00:00
// TODO add more checks on tx
2021-06-03 21:21:14 +00:00
if ! tx . ValidateHash ( ) {
2021-07-19 20:19:06 +00:00
logrus . WithField ( "txHash" , hex . EncodeToString ( tx . Hash ) ) . Warn ( "failed to validate transaction hash, rejecting it" )
2021-06-03 21:21:14 +00:00
return
2021-07-19 20:19:06 +00:00
}
2021-06-03 21:21:14 +00:00
2021-07-11 23:23:00 +00:00
err = sm . mempool . StoreTx ( & tx )
2021-06-03 21:21:14 +00:00
if err != nil {
logrus . Warnf ( "failed to store incoming transaction in mempool: %s" , err . Error ( ) )
}
}
2021-07-19 20:19:06 +00:00
// onNewBlock is the pubsub handler registered for
// pubsub.NewBlockMessageType. It CBOR-decodes the payload into a block and
// stores it in the block pool, logging any decode or store failure.
//
// NOTE(review): the block is stored without the checks that
// processReceivedBlock applies — confirm StoreBlock validates on its own.
func (sm *syncManager) onNewBlock(message *pubsub.PubSubMessage) {
	received := new(types2.Block)
	if decodeErr := cbor.Unmarshal(message.Payload, received); decodeErr != nil {
		logrus.WithField("err", decodeErr.Error()).Error("failed to unmarshal payload of NewBlock message")
		return
	}

	if storeErr := sm.blockpool.StoreBlock(received); storeErr != nil {
		logrus.WithField("err", storeErr.Error()).Error("failed to store block from NewBlock message")
	}
}