From 7f85330953dc61c60200c03e8cdb177508eef548 Mon Sep 17 00:00:00 2001 From: ChronosX88 Date: Fri, 14 May 2021 23:32:39 +0300 Subject: [PATCH] Implement basic mempool --- consensus/policy/policy.go | 5 ++ pool/mempool.go | 93 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 98 insertions(+) create mode 100644 consensus/policy/policy.go create mode 100644 pool/mempool.go diff --git a/consensus/policy/policy.go b/consensus/policy/policy.go new file mode 100644 index 0000000..28e96f9 --- /dev/null +++ b/consensus/policy/policy.go @@ -0,0 +1,5 @@ +package policy + +const ( + BlockMaxTransactionCount = 100 +) diff --git a/pool/mempool.go b/pool/mempool.go new file mode 100644 index 0000000..a5c8f47 --- /dev/null +++ b/pool/mempool.go @@ -0,0 +1,93 @@ +package pool + +import ( + "encoding/hex" + "sort" + "sync" + "time" + + "github.com/Secured-Finance/dione/consensus/policy" + + "github.com/Secured-Finance/dione/types" + + "github.com/Secured-Finance/dione/cache" +) + +const ( + DefaultTxTTL = 10 * time.Minute + DefaultTxPrefix = "tx_" +) + +type Mempool struct { + m sync.RWMutex + cache cache.Cache + txDescriptors []string // list of txs in cache +} + +func NewMempool(c cache.Cache) (*Mempool, error) { + mp := &Mempool{ + cache: c, + } + + var txDesc []string + err := c.Get("tx_list", &txDesc) + if err != nil || err != cache.ErrNilValue { + return nil, err + } + mp.txDescriptors = txDesc + + return mp, nil +} + +func (mp *Mempool) StoreTx(tx *types.Transaction) error { + mp.m.Lock() + defer mp.m.Unlock() + + hashStr := hex.EncodeToString(tx.Hash) + err := mp.cache.StoreWithTTL(DefaultTxPrefix+hashStr, tx, DefaultTxTTL) + mp.txDescriptors = append(mp.txDescriptors, hashStr) + mp.cache.Store("tx_list", mp.txDescriptors) // update tx list in cache + return err +} + +func (mp *Mempool) GetTxsForNewBlock() []*types.Transaction { + mp.m.Lock() + defer mp.m.Unlock() + + var txForBlock []*types.Transaction + var allTxs []*types.Transaction + + for i, v := range mp.txDescriptors { + var tx types.Transaction + err := mp.cache.Get(DefaultTxPrefix+v, &tx) + if err != nil { + if err == cache.ErrNilValue { + // descriptor is broken + // delete it and update list + mp.txDescriptors = removeItemFromStringSlice(mp.txDescriptors, i) + mp.cache.Store("tx_list", mp.txDescriptors) // update tx list in cache + } + continue + } + allTxs = append(allTxs, &tx) + } + sort.Slice(allTxs, func(i, j int) bool { + return allTxs[i].Timestamp.Before(allTxs[j].Timestamp) + }) + + for i := 0; i < policy.BlockMaxTransactionCount; i++ { + if len(allTxs) == 0 { + break + } + tx := allTxs[0] // get oldest tx + allTxs = allTxs[1:] // pop tx + txForBlock = append(txForBlock, tx) + } + + return txForBlock +} + +func removeItemFromStringSlice(s []string, i int) []string { + s[len(s)-1], s[i] = s[i], s[len(s)-1] + return s[:len(s)-1] +}