diff --git a/blockchain/sync/sync_mgr.go b/blockchain/sync/sync_mgr.go index 3b6773d..a33f752 100644 --- a/blockchain/sync/sync_mgr.go +++ b/blockchain/sync/sync_mgr.go @@ -8,6 +8,8 @@ import ( "strings" "sync" + "github.com/Secured-Finance/dione/pubsub" + "github.com/Secured-Finance/dione/consensus/policy" "github.com/wealdtech/go-merkletree/keccak256" @@ -28,7 +30,6 @@ import ( type SyncManager interface { Start() - Stop() } @@ -41,11 +42,12 @@ type syncManager struct { initialSyncCompleted bool bootstrapPeer peer.ID rpcClient *gorpc.Client + psb *pubsub.PubSubRouter } -func NewSyncManager(bp *pool.BlockPool, mp *pool.Mempool, p2pRPCClient *gorpc.Client, bootstrapPeer peer.ID) SyncManager { +func NewSyncManager(bp *pool.BlockPool, mp *pool.Mempool, p2pRPCClient *gorpc.Client, bootstrapPeer peer.ID, psb *pubsub.PubSubRouter) SyncManager { ctx, cancelFunc := context.WithCancel(context.Background()) - return &syncManager{ + sm := &syncManager{ blockpool: bp, mempool: mp, ctx: ctx, @@ -53,7 +55,12 @@ func NewSyncManager(bp *pool.BlockPool, mp *pool.Mempool, p2pRPCClient *gorpc.Cl initialSyncCompleted: false, bootstrapPeer: bootstrapPeer, rpcClient: p2pRPCClient, + psb: psb, } + + psb.Hook(pubsub.NewTxMessageType, sm.onNewTransaction, types2.Transaction{}) + + return sm } func (sm *syncManager) Start() { @@ -179,6 +186,9 @@ func (sm *syncManager) doInitialMempoolSync() error { if err != nil { return err } + if getMempoolTxReply.Error != nil { + return getMempoolTxReply.Error + } for _, v := range getMempoolTxReply.Transactions { err := sm.mempool.StoreTx(&v) if err != nil { @@ -235,3 +245,21 @@ func (sm *syncManager) processReceivedBlock(block types2.Block) error { func (sm *syncManager) syncLoop() { } + +func (sm *syncManager) onNewTransaction(message *pubsub.GenericMessage) { + tx, ok := message.Payload.(types2.Transaction) + if !ok { + logrus.Warn("failed to convert payload to Transaction") + return + } + + if !tx.ValidateHash() { + logrus.Warn("failed to validate tx hash, rejecting it") + return + } // TODO add more checks on tx + + err := sm.mempool.StoreTx(&tx) + if err != nil { + logrus.Warnf("failed to store incoming transaction in mempool: %s", err.Error()) + } +}