From f074007c75271b06f89384c4fb43e75786b36778 Mon Sep 17 00:00:00 2001 From: ChronosX88 Date: Wed, 25 Aug 2021 00:02:37 +0300 Subject: [PATCH] Refactor blockchain database for modularity --- blockchain/blockchain.go | 225 ++--------------- blockchain/database/database.go | 25 ++ blockchain/database/lmdb/database.go | 239 +++++++++++++++++++ blockchain/{utils => database/lmdb}/index.go | 2 +- blockchain/database/memory/database.go | 90 +++++++ config/config.go | 10 +- node/node.go | 3 +- node/node_dep_providers.go | 45 ++-- 8 files changed, 413 insertions(+), 226 deletions(-) create mode 100644 blockchain/database/database.go create mode 100644 blockchain/database/lmdb/database.go rename blockchain/{utils => database/lmdb}/index.go (99%) create mode 100644 blockchain/database/memory/database.go diff --git a/blockchain/blockchain.go b/blockchain/blockchain.go index 86dbc3d..1974fb6 100644 --- a/blockchain/blockchain.go +++ b/blockchain/blockchain.go @@ -3,13 +3,11 @@ package blockchain import ( "bytes" "context" - "encoding/binary" - "encoding/hex" - "errors" "fmt" - "os" "sync" + "github.com/Secured-Finance/dione/blockchain/database" + drand2 "github.com/Secured-Finance/dione/beacon/drand" "github.com/Secured-Finance/dione/beacon" @@ -26,105 +24,30 @@ import ( types2 "github.com/Secured-Finance/dione/blockchain/types" "github.com/fxamacker/cbor/v2" - - "github.com/ledgerwatch/lmdb-go/lmdb" -) - -const ( - DefaultBlockDataPrefix = "blockdata_" - DefaultBlockHeaderPrefix = "header_" - DefaultMetadataIndexName = "metadata" - LatestBlockHeightKey = "latest_block_height" -) - -var ( - ErrBlockNotFound = errors.New("block isn't found") - ErrLatestHeightNil = errors.New("latest block height is nil") ) type BlockChain struct { - // db-related - dbEnv *lmdb.Env - db lmdb.DBI - metadataIndex *utils.Index - heightIndex *utils.Index - + db database.Database bus EventBus.Bus miner *Miner drandBeacon *drand2.DrandBeacon } -func NewBlockChain(path string, bus EventBus.Bus, miner *Miner, db *drand2.DrandBeacon) (*BlockChain, error) { +func NewBlockChain(db database.Database, bus EventBus.Bus, miner *Miner, drand *drand2.DrandBeacon) (*BlockChain, error) { chain := &BlockChain{ + db: db, bus: bus, miner: miner, - drandBeacon: db, + drandBeacon: drand, } - // configure lmdb env - env, err := lmdb.NewEnv() - if err != nil { - return nil, err - } - - err = env.SetMaxDBs(1) - if err != nil { - return nil, err - } - err = env.SetMapSize(100 * 1024 * 1024 * 1024) // 100 GB - if err != nil { - return nil, err - } - - err = os.MkdirAll(path, 0755) - if err != nil { - return nil, err - } - - err = env.Open(path, 0, 0755) - if err != nil { - return nil, err - } - - chain.dbEnv = env - - var dbi lmdb.DBI - err = env.Update(func(txn *lmdb.Txn) error { - dbi, err = txn.OpenDBI("blocks", lmdb.Create) - return err - }) - if err != nil { - return nil, err - } - - chain.db = dbi - - // create index instances - metadataIndex := utils.NewIndex(DefaultMetadataIndexName, env, dbi) - heightIndex := utils.NewIndex("height", env, dbi) - chain.metadataIndex = metadataIndex - chain.heightIndex = heightIndex + logrus.Info("Blockchain has been successfully initialized!") return chain, nil } -func (bc *BlockChain) setLatestBlockHeight(height uint64) error { - err := bc.metadataIndex.PutUint64([]byte(LatestBlockHeightKey), height) - if err != nil { - return err - } - return nil -} - func (bc *BlockChain) GetLatestBlockHeight() (uint64, error) { - height, err := bc.metadataIndex.GetUint64([]byte(LatestBlockHeightKey)) - if err != nil { - if err == utils.ErrIndexKeyNotFound { - return 0, ErrLatestHeightNil - } - return 0, err - } - return height, nil + return bc.db.GetLatestBlockHeight() } func (bc *BlockChain) StoreBlock(block *types2.Block) error { @@ -142,43 +65,18 @@ func (bc *BlockChain) StoreBlock(block *types2.Block) error { } } - err := bc.dbEnv.Update(func(txn *lmdb.Txn) error { - data, err := cbor.Marshal(block.Data) - if err != nil { - return err - } - headerData, err := cbor.Marshal(block.Header) - if err != nil { - return err - } - blockHash := hex.EncodeToString(block.Header.Hash) - err = txn.Put(bc.db, []byte(DefaultBlockDataPrefix+blockHash), data, 0) - if err != nil { - return err - } - err = txn.Put(bc.db, []byte(DefaultBlockHeaderPrefix+blockHash), headerData, 0) // store header separately for easy fetching - return err - }) - if err != nil { - return err - } - - // update index "height -> block hash" - heightBytes := make([]byte, 8) - binary.LittleEndian.PutUint64(heightBytes, block.Header.Height) - err = bc.heightIndex.PutBytes(heightBytes, block.Header.Hash) - if err != nil { + if err := bc.db.StoreBlock(block); err != nil { return err } // update latest block height height, err := bc.GetLatestBlockHeight() - if err != nil && err != ErrLatestHeightNil { + if err != nil && err != database.ErrLatestHeightNil { return err } - if err == ErrLatestHeightNil || block.Header.Height > height { - if err = bc.setLatestBlockHeight(block.Header.Height); err != nil { + if err == database.ErrLatestHeightNil || block.Header.Height > height { + if err = bc.db.SetLatestBlockHeight(block.Header.Height); err != nil { return err } bc.bus.Publish("blockchain:latestBlockHeightUpdated", block) @@ -188,114 +86,27 @@ func (bc *BlockChain) StoreBlock(block *types2.Block) error { } func (bc *BlockChain) HasBlock(blockHash []byte) (bool, error) { - var blockExists bool - err := bc.dbEnv.View(func(txn *lmdb.Txn) error { - h := hex.EncodeToString(blockHash) - _, err := txn.Get(bc.db, []byte(DefaultBlockHeaderPrefix+h)) // try to fetch block header - if err != nil { - if lmdb.IsNotFound(err) { - blockExists = false - return nil - } - return err - } - blockExists = true - return nil - }) - if err != nil { - return false, err - } - return blockExists, nil + return bc.db.HasBlock(blockHash) } func (bc *BlockChain) FetchBlockData(blockHash []byte) ([]*types2.Transaction, error) { - var data []*types2.Transaction - err := bc.dbEnv.View(func(txn *lmdb.Txn) error { - h := hex.EncodeToString(blockHash) - blockData, err := txn.Get(bc.db, []byte(DefaultBlockDataPrefix+h)) - if err != nil { - if lmdb.IsNotFound(err) { - return ErrBlockNotFound - } - return err - } - err = cbor.Unmarshal(blockData, &data) - return err - }) - if err != nil { - return nil, err - } - - return data, nil + return bc.db.FetchBlockData(blockHash) } func (bc *BlockChain) FetchBlockHeader(blockHash []byte) (*types2.BlockHeader, error) { - var blockHeader types2.BlockHeader - err := bc.dbEnv.View(func(txn *lmdb.Txn) error { - h := hex.EncodeToString(blockHash) - data, err := txn.Get(bc.db, []byte(DefaultBlockHeaderPrefix+h)) - if err != nil { - if lmdb.IsNotFound(err) { - return ErrBlockNotFound - } - return err - } - err = cbor.Unmarshal(data, &blockHeader) - return err - }) - if err != nil { - return nil, err - } - return &blockHeader, nil + return bc.db.FetchBlockHeader(blockHash) } func (bc *BlockChain) FetchBlock(blockHash []byte) (*types2.Block, error) { - var block types2.Block - header, err := bc.FetchBlockHeader(blockHash) - if err != nil { - return nil, err - } - block.Header = header - - data, err := bc.FetchBlockData(blockHash) - if err != nil { - return nil, err - } - block.Data = data - - return &block, nil + return bc.db.FetchBlock(blockHash) } func (bc *BlockChain) FetchBlockByHeight(height uint64) (*types2.Block, error) { - var heightBytes = make([]byte, 8) - binary.LittleEndian.PutUint64(heightBytes, height) - blockHash, err := bc.heightIndex.GetBytes(heightBytes) - if err != nil { - if err == utils.ErrIndexKeyNotFound { - return nil, ErrBlockNotFound - } - } - block, err := bc.FetchBlock(blockHash) - if err != nil { - return nil, err - } - return block, nil + return bc.db.FetchBlockByHeight(height) } func (bc *BlockChain) FetchBlockHeaderByHeight(height uint64) (*types2.BlockHeader, error) { - var heightBytes = make([]byte, 8) - binary.LittleEndian.PutUint64(heightBytes, height) - blockHash, err := bc.heightIndex.GetBytes(heightBytes) - if err != nil { - if err == utils.ErrIndexKeyNotFound { - return nil, ErrBlockNotFound - } - } - blockHeader, err := bc.FetchBlockHeader(blockHash) - if err != nil { - return nil, err - } - return blockHeader, nil + return bc.db.FetchBlockHeaderByHeight(height) } func (bc *BlockChain) ValidateBlock(block *types2.Block) error { diff --git a/blockchain/database/database.go b/blockchain/database/database.go new file mode 100644 index 0000000..919c1d4 --- /dev/null +++ b/blockchain/database/database.go @@ -0,0 +1,25 @@ +package database + +import ( + "errors" + + "github.com/Secured-Finance/dione/blockchain/types" + types2 "github.com/Secured-Finance/dione/blockchain/types" +) + +var ( + ErrBlockNotFound = errors.New("block isn't found") + ErrLatestHeightNil = errors.New("latest block height is nil") +) + +type Database interface { + StoreBlock(block *types.Block) error + HasBlock(blockhash []byte) (bool, error) + FetchBlockData(blockHash []byte) ([]*types2.Transaction, error) + FetchBlockHeader(blockHash []byte) (*types2.BlockHeader, error) + FetchBlock(blockHash []byte) (*types2.Block, error) + FetchBlockByHeight(height uint64) (*types2.Block, error) + FetchBlockHeaderByHeight(height uint64) (*types2.BlockHeader, error) + GetLatestBlockHeight() (uint64, error) + SetLatestBlockHeight(height uint64) error +} diff --git a/blockchain/database/lmdb/database.go b/blockchain/database/lmdb/database.go new file mode 100644 index 0000000..490040e --- /dev/null +++ b/blockchain/database/lmdb/database.go @@ -0,0 +1,239 @@ +package lmdb + +import ( + "encoding/binary" + "encoding/hex" + "os" + + "github.com/Secured-Finance/dione/blockchain/database" + types2 "github.com/Secured-Finance/dione/blockchain/types" + "github.com/fxamacker/cbor/v2" + "github.com/ledgerwatch/lmdb-go/lmdb" +) + +const ( + DefaultBlockDataPrefix = "blockdata_" + DefaultBlockHeaderPrefix = "header_" + DefaultMetadataIndexName = "metadata" + LatestBlockHeightKey = "latest_block_height" +) + +type Database struct { + dbEnv *lmdb.Env + db lmdb.DBI + metadataIndex *Index + heightIndex *Index +} + +func NewDatabase(path string) (*Database, error) { + db := &Database{} + + // configure lmdb env + env, err := lmdb.NewEnv() + if err != nil { + return nil, err + } + + err = env.SetMaxDBs(1) + if err != nil { + return nil, err + } + err = env.SetMapSize(100 * 1024 * 1024 * 1024) // 100 GB + if err != nil { + return nil, err + } + + err = os.MkdirAll(path, 0755) + if err != nil { + return nil, err + } + + err = env.Open(path, 0, 0755) + if err != nil { + return nil, err + } + + db.dbEnv = env + + var dbi lmdb.DBI + err = env.Update(func(txn *lmdb.Txn) error { + dbi, err = txn.OpenDBI("blocks", lmdb.Create) + return err + }) + if err != nil { + return nil, err + } + + db.db = dbi + + // create index instances + metadataIndex := NewIndex(DefaultMetadataIndexName, env, dbi) + heightIndex := NewIndex("height", env, dbi) + db.metadataIndex = metadataIndex + db.heightIndex = heightIndex + + return db, nil +} + +func (d *Database) StoreBlock(block *types2.Block) error { + err := d.dbEnv.Update(func(txn *lmdb.Txn) error { + data, err := cbor.Marshal(block.Data) + if err != nil { + return err + } + headerData, err := cbor.Marshal(block.Header) + if err != nil { + return err + } + blockHash := hex.EncodeToString(block.Header.Hash) + err = txn.Put(d.db, []byte(DefaultBlockDataPrefix+blockHash), data, 0) + if err != nil { + return err + } + err = txn.Put(d.db, []byte(DefaultBlockHeaderPrefix+blockHash), headerData, 0) // store header separately for easy fetching + return err + }) + if err != nil { + return err + } + + // update index "height -> block hash" + heightBytes := make([]byte, 8) + binary.LittleEndian.PutUint64(heightBytes, block.Header.Height) + err = d.heightIndex.PutBytes(heightBytes, block.Header.Hash) + if err != nil { + return err + } + + return nil +} + +func (d *Database) HasBlock(blockHash []byte) (bool, error) { + var blockExists bool + err := d.dbEnv.View(func(txn *lmdb.Txn) error { + h := hex.EncodeToString(blockHash) + _, err := txn.Get(d.db, []byte(DefaultBlockHeaderPrefix+h)) // try to fetch block header + if err != nil { + if lmdb.IsNotFound(err) { + blockExists = false + return nil + } + return err + } + blockExists = true + return nil + }) + if err != nil { + return false, err + } + return blockExists, nil +} + +func (d *Database) FetchBlockData(blockHash []byte) ([]*types2.Transaction, error) { + var data []*types2.Transaction + err := d.dbEnv.View(func(txn *lmdb.Txn) error { + h := hex.EncodeToString(blockHash) + blockData, err := txn.Get(d.db, []byte(DefaultBlockDataPrefix+h)) + if err != nil { + if lmdb.IsNotFound(err) { + return database.ErrBlockNotFound + } + return err + } + err = cbor.Unmarshal(blockData, &data) + return err + }) + if err != nil { + return nil, err + } + + return data, nil +} + +func (d *Database) FetchBlockHeader(blockHash []byte) (*types2.BlockHeader, error) { + var blockHeader types2.BlockHeader + err := d.dbEnv.View(func(txn *lmdb.Txn) error { + h := hex.EncodeToString(blockHash) + data, err := txn.Get(d.db, []byte(DefaultBlockHeaderPrefix+h)) + if err != nil { + if lmdb.IsNotFound(err) { + return database.ErrBlockNotFound + } + return err + } + err = cbor.Unmarshal(data, &blockHeader) + return err + }) + if err != nil { + return nil, err + } + return &blockHeader, nil +} + +func (d *Database) FetchBlock(blockHash []byte) (*types2.Block, error) { + var block types2.Block + header, err := d.FetchBlockHeader(blockHash) + if err != nil { + return nil, err + } + block.Header = header + + data, err := d.FetchBlockData(blockHash) + if err != nil { + return nil, err + } + block.Data = data + + return &block, nil +} + +func (d *Database) FetchBlockByHeight(height uint64) (*types2.Block, error) { + var heightBytes = make([]byte, 8) + binary.LittleEndian.PutUint64(heightBytes, height) + blockHash, err := d.heightIndex.GetBytes(heightBytes) + if err != nil { + if err == ErrIndexKeyNotFound { + return nil, database.ErrBlockNotFound + } + } + block, err := d.FetchBlock(blockHash) + if err != nil { + return nil, err + } + return block, nil +} + +func (d *Database) FetchBlockHeaderByHeight(height uint64) (*types2.BlockHeader, error) { + var heightBytes = make([]byte, 8) + binary.LittleEndian.PutUint64(heightBytes, height) + blockHash, err := d.heightIndex.GetBytes(heightBytes) + if err != nil { + if err == ErrIndexKeyNotFound { + return nil, database.ErrBlockNotFound + } + } + blockHeader, err := d.FetchBlockHeader(blockHash) + if err != nil { + return nil, err + } + return blockHeader, nil +} + +func (d *Database) GetLatestBlockHeight() (uint64, error) { + height, err := d.metadataIndex.GetUint64([]byte(LatestBlockHeightKey)) + if err != nil { + if err == ErrIndexKeyNotFound { + return 0, database.ErrLatestHeightNil + } + return 0, err + } + return height, nil +} + +func (d *Database) SetLatestBlockHeight(height uint64) error { + err := d.metadataIndex.PutUint64([]byte(LatestBlockHeightKey), height) + if err != nil { + return err + } + return nil +} diff --git a/blockchain/utils/index.go b/blockchain/database/lmdb/index.go similarity index 99% rename from blockchain/utils/index.go rename to blockchain/database/lmdb/index.go index 24a7896..886a012 100644 --- a/blockchain/utils/index.go +++ b/blockchain/database/lmdb/index.go @@ -1,4 +1,4 @@ -package utils +package lmdb import ( "encoding/binary" diff --git a/blockchain/database/memory/database.go b/blockchain/database/memory/database.go new file mode 100644 index 0000000..86f7449 --- /dev/null +++ b/blockchain/database/memory/database.go @@ -0,0 +1,90 @@ +package memory + +import ( + "encoding/hex" + "fmt" + + "github.com/Secured-Finance/dione/blockchain/database" + + types2 "github.com/Secured-Finance/dione/blockchain/types" + "github.com/patrickmn/go-cache" +) + +const ( + LatestBlockHeightKey = "latest_block_height" +) + +type Database struct { + db *cache.Cache +} + +func NewDatabase() *Database { + return &Database{ + db: cache.New(cache.NoExpiration, 0), + } +} + +func (d *Database) StoreBlock(block *types2.Block) error { + h := hex.EncodeToString(block.Header.Hash) + d.db.SetDefault(h, block) + d.db.SetDefault(fmt.Sprintf("height/%d", block.Header.Height), block) + return nil +} + +func (d *Database) HasBlock(blockHash []byte) (bool, error) { + _, ok := d.db.Get(hex.EncodeToString(blockHash)) + return ok, nil +} + +func (d *Database) FetchBlockData(blockHash []byte) ([]*types2.Transaction, error) { + b, err := d.FetchBlock(blockHash) + if err != nil { + return nil, err + } + return b.Data, nil +} + +func (d *Database) FetchBlockHeader(blockHash []byte) (*types2.BlockHeader, error) { + b, err := d.FetchBlock(blockHash) + if err != nil { + return nil, err + } + return b.Header, nil +} + +func (d *Database) FetchBlock(blockHash []byte) (*types2.Block, error) { + b, ok := d.db.Get(hex.EncodeToString(blockHash)) + if !ok { + return nil, database.ErrBlockNotFound + } + return b.(*types2.Block), nil +} + +func (d *Database) FetchBlockByHeight(height uint64) (*types2.Block, error) { + b, ok := d.db.Get(fmt.Sprintf("height/%d", height)) + if !ok { + return nil, database.ErrBlockNotFound + } + return b.(*types2.Block), nil +} + +func (d *Database) FetchBlockHeaderByHeight(height uint64) (*types2.BlockHeader, error) { + b, ok := d.db.Get(fmt.Sprintf("height/%d", height)) + if !ok { + return nil, database.ErrBlockNotFound + } + return b.(*types2.Block).Header, nil +} + +func (d *Database) GetLatestBlockHeight() (uint64, error) { + height, ok := d.db.Get(LatestBlockHeightKey) + if !ok { + return 0, database.ErrLatestHeightNil + } + return height.(uint64), nil +} + +func (d *Database) SetLatestBlockHeight(height uint64) error { + d.db.SetDefault(LatestBlockHeightKey, height) + return nil +} diff --git a/config/config.go b/config/config.go index c07550d..5921e84 100644 --- a/config/config.go +++ b/config/config.go @@ -7,8 +7,11 @@ import ( ) const ( - CacheTypeInMemory = "in-memory" + CacheTypeInMemory = "memory" CacheTypeRedis = "redis" + + BlockchainDatabaseInMemory = "memory" + BlockChainDatabaseLMDB = "lmdb" ) type Config struct { @@ -60,7 +63,10 @@ type RedisConfig struct { } type BlockchainConfig struct { - DatabasePath string `mapstructure:"database_path"` + DatabaseType string `mapstructure:"database_type"` + LMDB struct { + DatabasePath string `mapstructure:"database_path"` + } `mapstructure:"lmdb"` } // NewConfig creates a new config based on default values or provided .env file diff --git a/node/node.go b/node/node.go index 9834942..5e94590 100644 --- a/node/node.go +++ b/node/node.go @@ -202,8 +202,9 @@ func Start() { providePeerDiscovery, drand2.NewDrandBeacon, pool.NewMempool, + provideBlockchainDatabase, blockchain.NewMiner, - provideBlockChain, + blockchain.NewBlockChain, sync.NewSyncManager, provideNetworkRPCHost, NewNetworkService, diff --git a/node/node_dep_providers.go b/node/node_dep_providers.go index 133f7d1..19debc6 100644 --- a/node/node_dep_providers.go +++ b/node/node_dep_providers.go @@ -11,6 +11,12 @@ import ( "path/filepath" "runtime" + "github.com/Secured-Finance/dione/blockchain/database/memory" + + "github.com/Secured-Finance/dione/blockchain/database/lmdb" + + "github.com/Secured-Finance/dione/blockchain/database" + "github.com/Secured-Finance/dione/cache/inmemory" "github.com/Secured-Finance/dione/cache/redis" @@ -24,12 +30,8 @@ import ( "github.com/sirupsen/logrus" - "github.com/asaskevich/EventBus" - "github.com/Secured-Finance/dione/blockchain" - drand2 "github.com/Secured-Finance/dione/beacon/drand" - "github.com/libp2p/go-libp2p-core/protocol" gorpc "github.com/libp2p/go-libp2p-gorpc" @@ -64,6 +66,29 @@ func provideCacheManager(cfg *config.Config) cache.CacheManager { return backend } +func provideBlockchainDatabase(cfg *config.Config) (database.Database, error) { + var db database.Database + switch cfg.Blockchain.DatabaseType { + case config.BlockchainDatabaseInMemory: + db = memory.NewDatabase() + case config.BlockChainDatabaseLMDB: + { + if cfg.Blockchain.LMDB.DatabasePath == "" { + return nil, fmt.Errorf("database path for lmdb database is empty") + } + l, err := lmdb.NewDatabase(cfg.Blockchain.LMDB.DatabasePath) + if err != nil { + return nil, err + } + db = l + } + default: + db = memory.NewDatabase() + } + + return db, nil +} + // FIXME: do we really need this? //func provideWallet(peerID peer.ID, privKey []byte) (*wallet.LocalWallet, error) { // // TODO make persistent keystore @@ -163,16 +188,6 @@ func providePeerDiscovery(baddrs []multiaddr.Multiaddr, h host.Host) discovery.D return pexDiscovery } -func provideBlockChain(config *config.Config, bus EventBus.Bus, miner *blockchain.Miner, db *drand2.DrandBeacon) *blockchain.BlockChain { - bc, err := blockchain.NewBlockChain(config.Blockchain.DatabasePath, bus, miner, db) - if err != nil { - logrus.Fatalf("Failed to initialize blockchain storage: %s", err.Error()) - } - logrus.Info("Blockchain storage has been successfully initialized!") - - return bc -} - func provideDirectRPCClient(h host.Host) *gorpc.Client { return gorpc.NewClient(h, DioneProtocolID) } @@ -303,7 +318,7 @@ func configureMiner(m *blockchain.Miner, b *blockchain.BlockChain) { func initializeBlockchain(bc *blockchain.BlockChain) { _, err := bc.GetLatestBlockHeight() - if err == blockchain.ErrLatestHeightNil { + if err == database.ErrLatestHeightNil { gBlock := types2.GenesisBlock() err = bc.StoreBlock(gBlock) // commit genesis block if err != nil {