301 lines
7.8 KiB
Go
301 lines
7.8 KiB
Go
package sync
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/hex"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/multiformats/go-multiaddr"
|
|
|
|
"github.com/fxamacker/cbor/v2"
|
|
|
|
"github.com/asaskevich/EventBus"
|
|
|
|
"github.com/Secured-Finance/dione/blockchain/utils"
|
|
|
|
"github.com/Secured-Finance/dione/blockchain"
|
|
|
|
"github.com/Secured-Finance/dione/pubsub"
|
|
|
|
"github.com/Secured-Finance/dione/consensus/policy"
|
|
|
|
"github.com/wealdtech/go-merkletree/keccak256"
|
|
|
|
"github.com/wealdtech/go-merkletree"
|
|
|
|
"github.com/sirupsen/logrus"
|
|
|
|
"github.com/Secured-Finance/dione/node/wire"
|
|
|
|
"github.com/libp2p/go-libp2p-core/peer"
|
|
|
|
types2 "github.com/Secured-Finance/dione/blockchain/types"
|
|
|
|
"github.com/Secured-Finance/dione/blockchain/pool"
|
|
gorpc "github.com/libp2p/go-libp2p-gorpc"
|
|
)
|
|
|
|
type SyncManager interface {
|
|
Run()
|
|
}
|
|
|
|
type syncManager struct {
|
|
blockpool *blockchain.BlockChain
|
|
mempool *pool.Mempool
|
|
wg sync.WaitGroup
|
|
ctx context.Context
|
|
ctxCancelFunc context.CancelFunc
|
|
initialSyncCompleted bool
|
|
bootstrapPeer peer.ID
|
|
rpcClient *gorpc.Client
|
|
psb *pubsub.PubSubRouter
|
|
bus EventBus.Bus
|
|
}
|
|
|
|
func NewSyncManager(
|
|
bus EventBus.Bus,
|
|
bc *blockchain.BlockChain,
|
|
mp *pool.Mempool,
|
|
p2pRPCClient *gorpc.Client,
|
|
bootstrapAddresses []multiaddr.Multiaddr,
|
|
psb *pubsub.PubSubRouter,
|
|
) SyncManager {
|
|
ctx, cancelFunc := context.WithCancel(context.Background())
|
|
|
|
bootstrapPeer := peer.ID("")
|
|
|
|
if bootstrapAddresses != nil {
|
|
addr, err := peer.AddrInfoFromP2pAddr(bootstrapAddresses[0]) // FIXME
|
|
if err != nil {
|
|
logrus.Fatal(err)
|
|
}
|
|
bootstrapPeer = addr.ID
|
|
}
|
|
|
|
sm := &syncManager{
|
|
bus: bus,
|
|
blockpool: bc,
|
|
mempool: mp,
|
|
ctx: ctx,
|
|
ctxCancelFunc: cancelFunc,
|
|
initialSyncCompleted: false,
|
|
bootstrapPeer: bootstrapPeer,
|
|
rpcClient: p2pRPCClient,
|
|
psb: psb,
|
|
}
|
|
|
|
logrus.Info("Blockchain sync subsystem has been successfully initialized!")
|
|
|
|
return sm
|
|
}
|
|
|
|
func (sm *syncManager) Run() {
|
|
sm.psb.Hook(pubsub.NewTxMessageType, sm.onNewTransaction)
|
|
sm.psb.Hook(pubsub.NewBlockMessageType, sm.onNewBlock)
|
|
|
|
go func() {
|
|
if err := sm.initialSync(); err != nil {
|
|
logrus.Error(err)
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (sm *syncManager) initialSync() error {
|
|
if err := sm.doInitialBlockPoolSync(); err != nil {
|
|
return err
|
|
}
|
|
if err := sm.doInitialMempoolSync(); err != nil {
|
|
return err
|
|
}
|
|
sm.bus.Publish("sync:initialSyncCompleted")
|
|
return nil
|
|
}
|
|
|
|
func (sm *syncManager) doInitialBlockPoolSync() error {
|
|
if sm.initialSyncCompleted {
|
|
return nil
|
|
}
|
|
|
|
ourLastHeight, _ := sm.blockpool.GetLatestBlockHeight()
|
|
|
|
if sm.bootstrapPeer == "" {
|
|
return nil // FIXME
|
|
}
|
|
|
|
var reply wire.LastBlockHeightReply
|
|
err := sm.rpcClient.Call(sm.bootstrapPeer, "NetworkService", "LastBlockHeight", nil, &reply)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if reply.Height > ourLastHeight {
|
|
heightCount := reply.Height - ourLastHeight
|
|
var from uint64
|
|
to := ourLastHeight
|
|
var receivedBlocks []types2.Block
|
|
for heightCount > 0 {
|
|
from = to + 1
|
|
var addedVal uint64
|
|
if heightCount < policy.MaxBlockCountForRetrieving {
|
|
addedVal = heightCount
|
|
} else {
|
|
addedVal = policy.MaxBlockCountForRetrieving
|
|
}
|
|
heightCount -= addedVal
|
|
to += addedVal
|
|
var getBlocksReply wire.GetRangeOfBlocksReply
|
|
arg := wire.GetRangeOfBlocksArg{From: from, To: to}
|
|
err = sm.rpcClient.Call(sm.bootstrapPeer, "NetworkService", "GetRangeOfBlocks", arg, &getBlocksReply)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
receivedBlocks = append(receivedBlocks, getBlocksReply.Blocks...)
|
|
if len(getBlocksReply.FailedBlockHeights) != 0 {
|
|
logrus.Warnf("remote node is unable to retrieve block heights: %s", strings.Trim(strings.Join(strings.Fields(fmt.Sprint(getBlocksReply.FailedBlockHeights)), ", "), "[]"))
|
|
// FIXME we definitely need to handle it, because in that case our chain isn't complete!
|
|
}
|
|
}
|
|
for _, b := range receivedBlocks {
|
|
err := sm.processReceivedBlock(b) // it should process the block synchronously
|
|
if err != nil {
|
|
logrus.Warnf("unable to process block %d: %s", b.Header.Height, err.Error())
|
|
continue
|
|
}
|
|
}
|
|
} else {
|
|
// FIXME probably we need to pick up better peer for syncing, because chain of current peer can be out-of-date as well
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (sm *syncManager) doInitialMempoolSync() error {
|
|
if sm.bootstrapPeer == "" {
|
|
return nil // FIXME
|
|
}
|
|
|
|
var reply wire.InvMessage
|
|
err := sm.rpcClient.Call(sm.bootstrapPeer, "NetworkService", "Mempool", nil, &reply)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var txsToRetrieve [][]byte
|
|
|
|
for _, v := range reply.Inventory {
|
|
_, err = sm.mempool.GetTransaction(v.Hash)
|
|
if errors.Is(err, pool.ErrTxNotFound) {
|
|
txsToRetrieve = append(txsToRetrieve, v.Hash)
|
|
}
|
|
}
|
|
|
|
for {
|
|
var txHashes [][]byte
|
|
|
|
if len(txsToRetrieve) == 0 {
|
|
break
|
|
}
|
|
|
|
if len(txsToRetrieve) > policy.MaxTransactionCountForRetrieving {
|
|
txHashes = txsToRetrieve[:policy.MaxTransactionCountForRetrieving]
|
|
txsToRetrieve = txsToRetrieve[policy.MaxTransactionCountForRetrieving:]
|
|
} else {
|
|
txHashes = txsToRetrieve
|
|
txsToRetrieve = nil
|
|
}
|
|
|
|
getMempoolTxArg := wire.GetMempoolTxsArg{
|
|
Items: txHashes,
|
|
}
|
|
var getMempoolTxReply wire.GetMempoolTxsReply
|
|
err := sm.rpcClient.Call(sm.bootstrapPeer, "NetworkService", "GetMempoolTxs", getMempoolTxArg, &getMempoolTxReply)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, v := range getMempoolTxReply.Transactions {
|
|
err := sm.mempool.StoreTx(&v)
|
|
if err != nil {
|
|
logrus.Warnf(err.Error())
|
|
}
|
|
}
|
|
// TODO handle not found transactions
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (sm *syncManager) processReceivedBlock(block types2.Block) error {
|
|
// validate block
|
|
previousBlockHeader, err := sm.blockpool.FetchBlockHeaderByHeight(block.Header.Height - 1)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to retrieve previous block %d", block.Header.Height-1)
|
|
}
|
|
if bytes.Compare(block.Header.LastHash, previousBlockHeader.Hash) != 0 {
|
|
return fmt.Errorf("block header has invalid last block hash")
|
|
}
|
|
verified, err := merkletree.VerifyProofUsing(previousBlockHeader.Hash, false, block.Header.LastHashProof, [][]byte{block.Header.Hash}, keccak256.New())
|
|
if err != nil {
|
|
return fmt.Errorf("failed to verify last block hash merkle proof: %w", err)
|
|
}
|
|
if !verified {
|
|
return fmt.Errorf("merkle hash of current block doesn't contain hash of previous block")
|
|
}
|
|
|
|
// check if hashes of block transactions are present in the block hash merkle tree
|
|
for _, tx := range block.Data { // FIXME we need to do something with rejected txs
|
|
if err := utils.VerifyTx(block.Header, tx); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
err = sm.blockpool.StoreBlock(&block)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to store block in blockpool: %s", err.Error())
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (sm *syncManager) onNewTransaction(message *pubsub.PubSubMessage) {
|
|
var tx types2.Transaction
|
|
err := cbor.Unmarshal(message.Payload, &tx)
|
|
if err != nil {
|
|
logrus.Errorf("failed to convert payload to transaction: %s", err.Error())
|
|
return
|
|
}
|
|
|
|
// TODO add more checks on tx
|
|
if !tx.ValidateHash() {
|
|
logrus.WithField("txHash", hex.EncodeToString(tx.Hash)).Warn("failed to validate transaction hash, rejecting it")
|
|
return
|
|
}
|
|
|
|
err = sm.mempool.StoreTx(&tx)
|
|
if err != nil {
|
|
logrus.Warnf("failed to store incoming transaction in mempool: %s", err.Error())
|
|
}
|
|
}
|
|
|
|
func (sm *syncManager) onNewBlock(message *pubsub.PubSubMessage) {
|
|
var block types2.Block
|
|
|
|
err := cbor.Unmarshal(message.Payload, &block)
|
|
if err != nil {
|
|
logrus.WithField("err", err.Error()).Error("failed to unmarshal payload of NewBlock message")
|
|
return
|
|
}
|
|
|
|
err = sm.blockpool.StoreBlock(&block)
|
|
if err != nil {
|
|
logrus.WithFields(logrus.Fields{
|
|
"err": err.Error(),
|
|
"blockHash": fmt.Sprintf("%x", block.Header.Hash),
|
|
}).Error("failed to store block from NewBlock message")
|
|
return
|
|
}
|
|
}
|