Implement indexes for blockpool database and auto maintaining of it
This commit is contained in:
parent
6d50f37a12
commit
dde32e11dc
@ -1,6 +1,7 @@
|
|||||||
package pool
|
package pool
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/binary"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"errors"
|
"errors"
|
||||||
|
|
||||||
@ -11,18 +12,22 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
DefaultBlockPrefix = "block_"
|
DefaultBlockDataPrefix = "blockdata_"
|
||||||
DefaultBlockHeaderPrefix = "header_"
|
DefaultBlockHeaderPrefix = "header_"
|
||||||
LatestBlockKey = "latest_block"
|
DefaultMetadataIndexName = "metadata"
|
||||||
|
LatestBlockHeightKey = "latest_block_height"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
ErrBlockNotFound = errors.New("block isn't found")
|
ErrBlockNotFound = errors.New("block isn't found")
|
||||||
|
ErrLatestHeightNil = errors.New("latest block height is nil")
|
||||||
)
|
)
|
||||||
|
|
||||||
type BlockPool struct {
|
type BlockPool struct {
|
||||||
dbEnv *lmdb.Env
|
dbEnv *lmdb.Env
|
||||||
db lmdb.DBI
|
db lmdb.DBI
|
||||||
|
metadataIndex *Index
|
||||||
|
heightIndex *Index
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewBlockPool(path string) (*BlockPool, error) {
|
func NewBlockPool(path string) (*BlockPool, error) {
|
||||||
@ -57,34 +62,33 @@ func NewBlockPool(path string) (*BlockPool, error) {
|
|||||||
|
|
||||||
pool.db = dbi
|
pool.db = dbi
|
||||||
|
|
||||||
|
// create index instances
|
||||||
|
metadataIndex := NewIndex(DefaultMetadataIndexName, env, dbi)
|
||||||
|
heightIndex := NewIndex("height", env, dbi)
|
||||||
|
pool.metadataIndex = metadataIndex
|
||||||
|
pool.heightIndex = heightIndex
|
||||||
|
|
||||||
return pool, nil
|
return pool, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bp *BlockPool) SetLatestBlock(hash []byte) error {
|
func (bp *BlockPool) SetLatestBlockHeight(height uint64) error {
|
||||||
return bp.dbEnv.Update(func(txn *lmdb.Txn) error {
|
return bp.metadataIndex.PutUint64([]byte(LatestBlockHeightKey), height)
|
||||||
return txn.Put(bp.db, []byte(LatestBlockKey), hash, 0)
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bp *BlockPool) GetLatestBlock() ([]byte, error) {
|
func (bp *BlockPool) GetLatestBlockHeight() (uint64, error) {
|
||||||
var hash []byte
|
height, err := bp.metadataIndex.GetUint64([]byte(LatestBlockHeightKey))
|
||||||
err := bp.dbEnv.View(func(txn *lmdb.Txn) error {
|
if err != nil {
|
||||||
data, err := txn.Get(bp.db, []byte(LatestBlockKey))
|
if err == ErrIndexKeyNotFound {
|
||||||
if err != nil {
|
return 0, ErrLatestHeightNil
|
||||||
if lmdb.IsNotFound(err) {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
hash = data
|
return 0, err
|
||||||
return nil
|
}
|
||||||
})
|
return height, nil
|
||||||
return hash, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bp *BlockPool) StoreBlock(block *types2.Block) error {
|
func (bp *BlockPool) StoreBlock(block *types2.Block) error {
|
||||||
return bp.dbEnv.Update(func(txn *lmdb.Txn) error {
|
err := bp.dbEnv.Update(func(txn *lmdb.Txn) error {
|
||||||
data, err := cbor.Marshal(block)
|
data, err := cbor.Marshal(block.Data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -93,19 +97,50 @@ func (bp *BlockPool) StoreBlock(block *types2.Block) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
blockHash := hex.EncodeToString(block.Header.Hash)
|
blockHash := hex.EncodeToString(block.Header.Hash)
|
||||||
err = txn.Put(bp.db, []byte(DefaultBlockPrefix+blockHash), data, 0)
|
err = txn.Put(bp.db, []byte(DefaultBlockDataPrefix+blockHash), data, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
err = txn.Put(bp.db, []byte(DefaultBlockHeaderPrefix+blockHash), headerData, 0) // store header separately for easy fetching
|
err = txn.Put(bp.db, []byte(DefaultBlockHeaderPrefix+blockHash), headerData, 0) // store header separately for easy fetching
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// update index "height -> block hash"
|
||||||
|
var heightBytes []byte
|
||||||
|
binary.LittleEndian.PutUint64(heightBytes, block.Header.Height)
|
||||||
|
err = bp.heightIndex.PutBytes(heightBytes, block.Header.Hash)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// update latest block height
|
||||||
|
height, err := bp.GetLatestBlockHeight()
|
||||||
|
if err != nil && err != ErrLatestHeightNil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err == ErrLatestHeightNil {
|
||||||
|
if err = bp.SetLatestBlockHeight(block.Header.Height); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if block.Header.Height > height {
|
||||||
|
if err = bp.SetLatestBlockHeight(block.Header.Height); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bp *BlockPool) HasBlock(blockHash string) (bool, error) {
|
func (bp *BlockPool) HasBlock(blockHash []byte) (bool, error) {
|
||||||
var blockExists bool
|
var blockExists bool
|
||||||
err := bp.dbEnv.View(func(txn *lmdb.Txn) error {
|
err := bp.dbEnv.View(func(txn *lmdb.Txn) error {
|
||||||
_, err := txn.Get(bp.db, []byte(DefaultBlockPrefix+blockHash)) // try to fetch block header
|
h := hex.EncodeToString(blockHash)
|
||||||
|
_, err := txn.Get(bp.db, []byte(DefaultBlockHeaderPrefix+h)) // try to fetch block header
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if lmdb.IsNotFound(err) {
|
if lmdb.IsNotFound(err) {
|
||||||
blockExists = false
|
blockExists = false
|
||||||
@ -122,29 +157,31 @@ func (bp *BlockPool) HasBlock(blockHash string) (bool, error) {
|
|||||||
return blockExists, nil
|
return blockExists, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bp *BlockPool) FetchBlock(blockHash string) (*types2.Block, error) {
|
func (bp *BlockPool) FetchBlockData(blockHash []byte) ([]*types2.Transaction, error) {
|
||||||
var block types2.Block
|
var data []*types2.Transaction
|
||||||
err := bp.dbEnv.View(func(txn *lmdb.Txn) error {
|
err := bp.dbEnv.View(func(txn *lmdb.Txn) error {
|
||||||
data, err := txn.Get(bp.db, []byte(DefaultBlockPrefix+blockHash))
|
h := hex.EncodeToString(blockHash)
|
||||||
|
blockData, err := txn.Get(bp.db, []byte(DefaultBlockDataPrefix+h))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if lmdb.IsNotFound(err) {
|
if lmdb.IsNotFound(err) {
|
||||||
return ErrBlockNotFound
|
return ErrBlockNotFound
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
err = cbor.Unmarshal(data, &block)
|
err = cbor.Unmarshal(blockData, data)
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &block, nil
|
return data, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bp *BlockPool) FetchBlockHeader(blockHash string) (*types2.BlockHeader, error) {
|
func (bp *BlockPool) FetchBlockHeader(blockHash []byte) (*types2.BlockHeader, error) {
|
||||||
var blockHeader types2.BlockHeader
|
var blockHeader types2.BlockHeader
|
||||||
err := bp.dbEnv.View(func(txn *lmdb.Txn) error {
|
err := bp.dbEnv.View(func(txn *lmdb.Txn) error {
|
||||||
data, err := txn.Get(bp.db, []byte(DefaultBlockHeaderPrefix+blockHash))
|
h := hex.EncodeToString(blockHash)
|
||||||
|
data, err := txn.Get(bp.db, []byte(DefaultBlockHeaderPrefix+h))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if lmdb.IsNotFound(err) {
|
if lmdb.IsNotFound(err) {
|
||||||
return ErrBlockNotFound
|
return ErrBlockNotFound
|
||||||
@ -159,3 +196,36 @@ func (bp *BlockPool) FetchBlockHeader(blockHash string) (*types2.BlockHeader, er
|
|||||||
}
|
}
|
||||||
return &blockHeader, nil
|
return &blockHeader, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (bp *BlockPool) FetchBlock(blockHash []byte) (*types2.Block, error) {
|
||||||
|
var block types2.Block
|
||||||
|
header, err := bp.FetchBlockHeader(blockHash)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
block.Header = header
|
||||||
|
|
||||||
|
data, err := bp.FetchBlockData(blockHash)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
block.Data = data
|
||||||
|
|
||||||
|
return &block, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bp *BlockPool) FetchBlockByHeight(height uint64) (*types2.Block, error) {
|
||||||
|
var heightBytes []byte
|
||||||
|
binary.LittleEndian.PutUint64(heightBytes, height)
|
||||||
|
blockHash, err := bp.heightIndex.GetBytes(heightBytes)
|
||||||
|
if err != nil {
|
||||||
|
if err == ErrIndexKeyNotFound {
|
||||||
|
return nil, ErrBlockNotFound
|
||||||
|
}
|
||||||
|
}
|
||||||
|
block, err := bp.FetchBlock(blockHash)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return block, nil
|
||||||
|
}
|
||||||
|
96
blockchain/pool/index.go
Normal file
96
blockchain/pool/index.go
Normal file
@ -0,0 +1,96 @@
|
|||||||
|
package pool
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/binary"
|
||||||
|
"encoding/hex"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/ledgerwatch/lmdb-go/lmdb"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
DefaultIndexPrefix = "indexes/"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
ErrIndexKeyNotFound = fmt.Errorf("key is not found in the index")
|
||||||
|
)
|
||||||
|
|
||||||
|
type Index struct {
|
||||||
|
name string
|
||||||
|
dbEnv *lmdb.Env
|
||||||
|
db lmdb.DBI
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewIndex(name string, dbEnv *lmdb.Env, db lmdb.DBI) *Index {
|
||||||
|
return &Index{
|
||||||
|
name: name,
|
||||||
|
db: db,
|
||||||
|
dbEnv: dbEnv,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *Index) PutUint64(key []byte, value uint64) error {
|
||||||
|
return i.dbEnv.Update(func(txn *lmdb.Txn) error {
|
||||||
|
var data []byte
|
||||||
|
binary.LittleEndian.PutUint64(data, value)
|
||||||
|
return txn.Put(i.db, i.constructIndexKey(key), data, 0)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *Index) GetUint64(key []byte) (uint64, error) {
|
||||||
|
var num uint64
|
||||||
|
err := i.dbEnv.View(func(txn *lmdb.Txn) error {
|
||||||
|
data, err := txn.Get(i.db, i.constructIndexKey(key))
|
||||||
|
if err != nil {
|
||||||
|
if lmdb.IsNotFound(err) {
|
||||||
|
return ErrIndexKeyNotFound
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
num = binary.LittleEndian.Uint64(data)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return num, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *Index) PutBytes(key []byte, value []byte) error {
|
||||||
|
return i.dbEnv.Update(func(txn *lmdb.Txn) error {
|
||||||
|
return txn.Put(i.db, i.constructIndexKey(key), value, 0)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *Index) GetBytes(key []byte) ([]byte, error) {
|
||||||
|
var data []byte
|
||||||
|
err := i.dbEnv.View(func(txn *lmdb.Txn) error {
|
||||||
|
valueData, err := txn.Get(i.db, i.constructIndexKey(key))
|
||||||
|
if err != nil {
|
||||||
|
if lmdb.IsNotFound(err) {
|
||||||
|
return ErrIndexKeyNotFound
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
data = valueData
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return data, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *Index) Delete(key []byte) error {
|
||||||
|
return i.dbEnv.Update(func(txn *lmdb.Txn) error {
|
||||||
|
return txn.Del(i.db, i.constructIndexKey(key), nil)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *Index) constructIndexKey(key []byte) []byte {
|
||||||
|
k := hex.EncodeToString(key)
|
||||||
|
return []byte(fmt.Sprintf("%s/%s/%s", DefaultIndexPrefix, i.name, k))
|
||||||
|
}
|
@ -17,7 +17,7 @@ type Block struct {
|
|||||||
|
|
||||||
type BlockHeader struct {
|
type BlockHeader struct {
|
||||||
Timestamp int64
|
Timestamp int64
|
||||||
SeqNum uint64
|
Height uint64
|
||||||
Hash []byte
|
Hash []byte
|
||||||
LastHash []byte
|
LastHash []byte
|
||||||
Proposer peer.ID
|
Proposer peer.ID
|
||||||
@ -28,7 +28,7 @@ func GenesisBlock() *Block {
|
|||||||
return &Block{
|
return &Block{
|
||||||
Header: &BlockHeader{
|
Header: &BlockHeader{
|
||||||
Timestamp: 1620845070,
|
Timestamp: 1620845070,
|
||||||
SeqNum: 0,
|
Height: 0,
|
||||||
Hash: []byte("DIMICANDUM"),
|
Hash: []byte("DIMICANDUM"),
|
||||||
},
|
},
|
||||||
Data: []*Transaction{},
|
Data: []*Transaction{},
|
||||||
@ -66,7 +66,7 @@ func CreateBlock(lastBlockHeader *BlockHeader, txs []*Transaction, wallet *walle
|
|||||||
block := &Block{
|
block := &Block{
|
||||||
Header: &BlockHeader{
|
Header: &BlockHeader{
|
||||||
Timestamp: timestamp,
|
Timestamp: timestamp,
|
||||||
SeqNum: lastBlockHeader.SeqNum + 1,
|
Height: lastBlockHeader.Height + 1,
|
||||||
Proposer: proposer,
|
Proposer: proposer,
|
||||||
Signature: s.Data,
|
Signature: s.Data,
|
||||||
Hash: blockHash,
|
Hash: blockHash,
|
||||||
|
Loading…
Reference in New Issue
Block a user