Skip to content

Commit d4a6f43

Browse files
authored
refactor(core/txpool): migrate tx subscription to SubscribeTransactions ethereum#28243 (#2125)
* refactor(txpool): remove wrapper type ethereum#27841 Partial backport of ethereum/go-ethereum PR ethereum#27841, limited to txpool wrapper removal. - Migrate txpool interfaces/call sites from `*txpool.Transaction` to `*types.Transaction` - Update eth/miner/contracts paths and related tests accordingly - No intended behavior change Blob sidecar validation/handling changes from upstream are not included here. * refactor(core/txpool): migrate tx subscription to SubscribeTransactions ethereum#28243 Replace the old SubscribeNewTxsEvent-style plumbing with the new SubscribeTransactions(ch, reorgs) interface across txpool, eth protocol manager, API backend, miner worker, and test helpers. Key changes: - Extend txpool/subpool tx subscription interface with a reorgs flag - Route eth tx announcement path to reorgs=false (new tx announcements only) - Route API/miner subscriptions to reorgs=true - Move subscription-scope cleanup to TxPool.Close() - Add Gas field to LazyTransaction in legacy pending view Note: LegacyPool currently cannot strictly separate newly seen and resurrected txs, so the reorgs flag is accepted for API compatibility and future blob-subpool integration.
1 parent 5bb1f03 commit d4a6f43

File tree

13 files changed

+103
-105
lines changed

13 files changed

+103
-105
lines changed

contracts/utils.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ func CreateTransactionSign(chainConfig *params.ChainConfig, pool *txpool.TxPool,
9393
return err
9494
}
9595
// Add tx signed to local tx pool.
96-
err = pool.Add([]*txpool.Transaction{{Tx: txSigned}}, true, true)[0]
96+
err = pool.Add([]*types.Transaction{txSigned}, true, true)[0]
9797
if err != nil {
9898
log.Error("Fail to add tx sign to local pool.", "error", err, "number", block.NumberU64(), "hash", block.Hash().Hex(), "from", account.Address, "nonce", nonce)
9999
return err
@@ -121,7 +121,7 @@ func CreateTransactionSign(chainConfig *params.ChainConfig, pool *txpool.TxPool,
121121
return err
122122
}
123123
// Add tx signed to local tx pool.
124-
err = pool.Add([]*txpool.Transaction{{Tx: txSigned}}, true, true)[0]
124+
err = pool.Add([]*types.Transaction{txSigned}, true, true)[0]
125125
if err != nil {
126126
log.Error("Fail to add tx secret to local pool.", "error", err, "number", block.NumberU64(), "hash", block.Hash().Hex(), "from", account.Address, "nonce", nonce)
127127
return err
@@ -150,7 +150,7 @@ func CreateTransactionSign(chainConfig *params.ChainConfig, pool *txpool.TxPool,
150150
return err
151151
}
152152
// Add tx to pool.
153-
err = pool.Add([]*txpool.Transaction{{Tx: txSigned}}, true, true)[0]
153+
err = pool.Add([]*types.Transaction{txSigned}, true, true)[0]
154154
if err != nil {
155155
log.Error("Fail to add tx opening to local pool.", "error", err, "number", block.NumberU64(), "hash", block.Hash().Hex(), "from", account.Address, "nonce", nonce)
156156
return err

core/txpool/legacypool/legacypool.go

Lines changed: 22 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,6 @@ type LegacyPool struct {
251251
chain BlockChain
252252
gasTip atomic.Pointer[big.Int]
253253
txFeed event.Feed
254-
scope event.SubscriptionScope
255254
signer types.Signer
256255
mu sync.RWMutex
257256

@@ -447,9 +446,6 @@ func (pool *LegacyPool) loop() {
447446

448447
// Close terminates the transaction pool.
449448
func (pool *LegacyPool) Close() error {
450-
// Unsubscribe all subscriptions registered from txpool
451-
pool.scope.Close()
452-
453449
// Terminate the pool reorger and return
454450
close(pool.reorgShutdownCh)
455451
pool.wg.Wait()
@@ -468,10 +464,14 @@ func (pool *LegacyPool) Reset(oldHead, newHead *types.Header) {
468464
<-wait
469465
}
470466

471-
// SubscribeTransactions registers a subscription of NewTxsEvent and
472-
// starts sending event to the given channel.
473-
func (pool *LegacyPool) SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription {
474-
return pool.scope.Track(pool.txFeed.Subscribe(ch))
467+
// SubscribeTransactions registers a subscription for new transaction events,
468+
// supporting feeding only newly seen or also resurrected transactions.
469+
func (pool *LegacyPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription {
470+
// The legacy pool has a very messed up internal shuffling, so it's kind of
471+
// hard to separate newly discovered transactions from resurrected ones. This
472+
// is because the new txs are added to the queue, resurrected ones too and
473+
// reorgs run lazily, so separating the two would need a marker.
474+
return pool.txFeed.Subscribe(ch)
475475
}
476476

477477
// SetGasTip updates the minimum gas tip required by the transaction pool for a
@@ -607,10 +607,11 @@ func (pool *LegacyPool) Pending(enforceTips bool) map[common.Address][]*txpool.L
607607
lazies[i] = &txpool.LazyTransaction{
608608
Pool: pool,
609609
Hash: txs[i].Hash(),
610-
Tx: &txpool.Transaction{Tx: txs[i]},
610+
Tx: txs[i],
611611
Time: txs[i].Time(),
612612
GasFeeCap: txs[i].GasFeeCap(),
613613
GasTipCap: txs[i].GasTipCap(),
614+
Gas: txs[i].Gas(),
614615
}
615616
}
616617
pending[addr] = lazies
@@ -1099,26 +1100,13 @@ func (pool *LegacyPool) promoteSpecialTx(addr common.Address, tx *types.Transact
10991100
return true, nil
11001101
}
11011102

1102-
// Add enqueues a batch of transactions into the pool if they are valid. Depending
1103-
// on the local flag, full pricing constraints will or will not be applied.
1104-
//
1105-
// If sync is set, the method will block until all internal maintenance related
1106-
// to the add is finished. Only use this during tests for determinism!
1107-
func (pool *LegacyPool) Add(txs []*txpool.Transaction, local bool, sync bool) []error {
1108-
unwrapped := make([]*types.Transaction, len(txs))
1109-
for i, tx := range txs {
1110-
unwrapped[i] = tx.Tx
1111-
}
1112-
return pool.addTxs(unwrapped, local, sync)
1113-
}
1114-
11151103
// AddLocals enqueues a batch of transactions into the pool if they are valid, marking the
11161104
// senders as local ones, ensuring they go around the local pricing constraints.
11171105
//
11181106
// This method is used to add transactions from the RPC API and performs synchronous pool
11191107
// reorganization and event propagation.
11201108
func (pool *LegacyPool) addLocals(txs []*types.Transaction) []error {
1121-
return pool.addTxs(txs, !pool.config.NoLocals, true)
1109+
return pool.Add(txs, !pool.config.NoLocals, true)
11221110
}
11231111

11241112
// AddLocal enqueues a single local transaction into the pool if it is valid. This is
@@ -1134,7 +1122,7 @@ func (pool *LegacyPool) addLocal(tx *types.Transaction) error {
11341122
// This method is used to add transactions from the p2p network and does not wait for pool
11351123
// reorganization and internal event propagation.
11361124
func (pool *LegacyPool) AddRemotes(txs []*types.Transaction) []error {
1137-
return pool.addTxs(txs, false, false)
1125+
return pool.Add(txs, false, false)
11381126
}
11391127

11401128
// addRemote enqueues a single transaction into the pool if it is valid. This is a convenience
@@ -1146,16 +1134,20 @@ func (pool *LegacyPool) addRemote(tx *types.Transaction) error {
11461134

11471135
// AddRemotesSync is like AddRemotes, but waits for pool reorganization. Tests use this method.
11481136
func (pool *LegacyPool) addRemotesSync(txs []*types.Transaction) []error {
1149-
return pool.addTxs(txs, false, true)
1137+
return pool.Add(txs, false, true)
11501138
}
11511139

11521140
// This is like AddRemotes with a single transaction, but waits for pool reorganization. Tests use this method.
11531141
func (pool *LegacyPool) addRemoteSync(tx *types.Transaction) error {
1154-
return pool.addTxs([]*types.Transaction{tx}, false, true)[0]
1142+
return pool.Add([]*types.Transaction{tx}, false, true)[0]
11551143
}
11561144

1157-
// addTxs attempts to queue a batch of transactions if they are valid.
1158-
func (pool *LegacyPool) addTxs(txs []*types.Transaction, local, sync bool) []error {
1145+
// Add enqueues a batch of transactions into the pool if they are valid. Depending
1146+
// on the local flag, full pricing constraints will or will not be applied.
1147+
//
1148+
// If sync is set, the method will block until all internal maintenance related
1149+
// to the add is finished. Only use this during tests for determinism!
1150+
func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error {
11591151
// Filter out known ones without obtaining the pool lock or recovering signatures
11601152
var (
11611153
errs = make([]error, len(txs))
@@ -1241,12 +1233,12 @@ func (pool *LegacyPool) Status(hash common.Hash) txpool.TxStatus {
12411233
}
12421234

12431235
// Get returns a transaction if it is contained in the pool and nil otherwise.
1244-
func (pool *LegacyPool) Get(hash common.Hash) *txpool.Transaction {
1236+
func (pool *LegacyPool) Get(hash common.Hash) *types.Transaction {
12451237
tx := pool.get(hash)
12461238
if tx == nil {
12471239
return nil
12481240
}
1249-
return &txpool.Transaction{Tx: tx}
1241+
return tx
12501242
}
12511243

12521244
// get returns a transaction if it is contained in the pool and nil otherwise.

core/txpool/subpool.go

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,34 +26,38 @@ import (
2626
"github.com/XinFinOrg/XDPoSChain/event"
2727
)
2828

29-
// Transaction is a helper struct to group together a canonical transaction with
30-
// satellite data items that are needed by the pool but are not part of the chain.
31-
type Transaction struct {
32-
Tx *types.Transaction // Canonical transaction
33-
}
34-
3529
// LazyTransaction contains a small subset of the transaction properties that is
3630
// enough for the miner and other APIs to handle large batches of transactions;
3731
// and supports pulling up the entire transaction when really needed.
3832
type LazyTransaction struct {
39-
Pool SubPool // Transaction subpool to pull the real transaction up
40-
Hash common.Hash // Transaction hash to pull up if needed
41-
Tx *Transaction // Transaction if already resolved
33+
Pool LazyResolver // Transaction resolver to pull the real transaction up
34+
Hash common.Hash // Transaction hash to pull up if needed
35+
Tx *types.Transaction // Transaction if already resolved
4236

4337
Time time.Time // Time when the transaction was first seen
4438
GasFeeCap *big.Int // Maximum fee per gas the transaction may consume
4539
GasTipCap *big.Int // Maximum miner tip per gas the transaction can pay
40+
41+
Gas uint64 // Amount of gas required by the transaction
4642
}
4743

4844
// Resolve retrieves the full transaction belonging to a lazy handle if it is still
4945
// maintained by the transaction pool.
50-
func (ltx *LazyTransaction) Resolve() *Transaction {
46+
func (ltx *LazyTransaction) Resolve() *types.Transaction {
5147
if ltx.Tx == nil {
5248
ltx.Tx = ltx.Pool.Get(ltx.Hash)
5349
}
5450
return ltx.Tx
5551
}
5652

53+
// LazyResolver is a minimal interface needed for a transaction pool to satisfy
54+
// resolving lazy transactions. It's mostly a helper to avoid the entire sub-
55+
// pool being injected into the lazy transaction.
56+
type LazyResolver interface {
57+
// Get returns a transaction if it is contained in the pool, or nil otherwise.
58+
Get(hash common.Hash) *types.Transaction
59+
}
60+
5761
// SubPool represents a specialized transaction pool that lives on its own (e.g.
5862
// blob pool). Since independent of how many specialized pools we have, they do
5963
// need to be updated in lockstep and assemble into one coherent view for block
@@ -90,19 +94,21 @@ type SubPool interface {
9094
Has(hash common.Hash) bool
9195

9296
// Get returns a transaction if it is contained in the pool, or nil otherwise.
93-
Get(hash common.Hash) *Transaction
97+
Get(hash common.Hash) *types.Transaction
9498

9599
// Add enqueues a batch of transactions into the pool if they are valid. Due
96100
// to the large transaction churn, add may postpone fully integrating the tx
97101
// to a later point to batch multiple ones together.
98-
Add(txs []*Transaction, local bool, sync bool) []error
102+
Add(txs []*types.Transaction, local bool, sync bool) []error
99103

100104
// Pending retrieves all currently processable transactions, grouped by origin
101105
// account and sorted by nonce.
102106
Pending(enforceTips bool) map[common.Address][]*LazyTransaction
103107

104-
// SubscribeTransactions subscribes to new transaction events.
105-
SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription
108+
// SubscribeTransactions subscribes to new transaction events. The subscriber
109+
// can decide whether to receive notifications only for newly seen transactions
110+
// or also for reorged out ones.
111+
SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription
106112

107113
// Nonce returns the next nonce of an account, with all transactions executable
108114
// by the pool already applied on top.

core/txpool/txpool.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,9 @@ func (p *TxPool) Close() error {
100100
errs = append(errs, err)
101101
}
102102
}
103+
// Unsubscribe anyone still listening for tx events
104+
p.subs.Close()
105+
103106
if len(errs) > 0 {
104107
return fmt.Errorf("subpool close errors: %v", errs)
105108
}
@@ -190,7 +193,7 @@ func (p *TxPool) Has(hash common.Hash) bool {
190193
}
191194

192195
// Get returns a transaction if it is contained in the pool, or nil otherwise.
193-
func (p *TxPool) Get(hash common.Hash) *Transaction {
196+
func (p *TxPool) Get(hash common.Hash) *types.Transaction {
194197
for _, subpool := range p.subpools {
195198
if tx := subpool.Get(hash); tx != nil {
196199
return tx
@@ -202,14 +205,14 @@ func (p *TxPool) Get(hash common.Hash) *Transaction {
202205
// Add enqueues a batch of transactions into the pool if they are valid. Due
203206
// to the large transaction churn, add may postpone fully integrating the tx
204207
// to a later point to batch multiple ones together.
205-
func (p *TxPool) Add(txs []*Transaction, local bool, sync bool) []error {
208+
func (p *TxPool) Add(txs []*types.Transaction, local bool, sync bool) []error {
206209
// Split the input transactions between the subpools. It shouldn't really
207210
// happen that we receive merged batches, but better graceful than strange
208211
// errors.
209212
//
210213
// We also need to track how the transactions were split across the subpools,
211214
// so we can piece back the returned errors into the original order.
212-
txsets := make([][]*Transaction, len(p.subpools))
215+
txsets := make([][]*types.Transaction, len(p.subpools))
213216
splits := make([]int, len(txs))
214217

215218
for i, tx := range txs {
@@ -218,7 +221,7 @@ func (p *TxPool) Add(txs []*Transaction, local bool, sync bool) []error {
218221

219222
// Try to find a subpool that accepts the transaction
220223
for j, subpool := range p.subpools {
221-
if subpool.Filter(tx.Tx) {
224+
if subpool.Filter(tx) {
222225
txsets[j] = append(txsets[j], tx)
223226
splits[i] = j
224227
break
@@ -255,12 +258,12 @@ func (p *TxPool) Pending(enforceTips bool) map[common.Address][]*LazyTransaction
255258
return txs
256259
}
257260

258-
// SubscribeNewTxsEvent registers a subscription of NewTxsEvent and starts sending
259-
// events to the given channel.
260-
func (p *TxPool) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
261+
// SubscribeTransactions registers a subscription for new transaction events,
262+
// supporting feeding only newly seen or also resurrected transactions.
263+
func (p *TxPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription {
261264
subs := make([]event.Subscription, len(p.subpools))
262265
for i, subpool := range p.subpools {
263-
subs[i] = subpool.SubscribeTransactions(ch)
266+
subs[i] = subpool.SubscribeTransactions(ch, reorgs)
264267
}
265268
return p.subs.Track(event.JoinSubscriptions(subs...))
266269
}

eth/api_backend.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ func (b *EthAPIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscri
301301
}
302302

303303
func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error {
304-
return b.eth.txPool.Add([]*txpool.Transaction{{Tx: signedTx}}, true, false)[0]
304+
return b.eth.txPool.Add([]*types.Transaction{signedTx}, true, false)[0]
305305
}
306306

307307
func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) {
@@ -310,7 +310,7 @@ func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) {
310310
for _, batch := range pending {
311311
for _, lazy := range batch {
312312
if tx := lazy.Resolve(); tx != nil {
313-
txs = append(txs, tx.Tx)
313+
txs = append(txs, tx)
314314
}
315315
}
316316
}
@@ -319,7 +319,7 @@ func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) {
319319

320320
func (b *EthAPIBackend) GetPoolTransaction(hash common.Hash) *types.Transaction {
321321
if tx := b.eth.txPool.Get(hash); tx != nil {
322-
return tx.Tx
322+
return tx
323323
}
324324
return nil
325325
}
@@ -350,7 +350,7 @@ func (b *EthAPIBackend) TxPool() *txpool.TxPool {
350350
}
351351

352352
func (b *EthAPIBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
353-
return b.eth.txPool.SubscribeNewTxsEvent(ch)
353+
return b.eth.txPool.SubscribeTransactions(ch, true)
354354
}
355355

356356
func (b *EthAPIBackend) Downloader() *downloader.Downloader {

eth/handler.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ import (
3131
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS"
3232
"github.com/XinFinOrg/XDPoSChain/consensus/misc"
3333
"github.com/XinFinOrg/XDPoSChain/core"
34-
"github.com/XinFinOrg/XDPoSChain/core/txpool"
3534
"github.com/XinFinOrg/XDPoSChain/core/types"
3635
"github.com/XinFinOrg/XDPoSChain/eth/bft"
3736
"github.com/XinFinOrg/XDPoSChain/eth/downloader"
@@ -282,9 +281,9 @@ func (pm *ProtocolManager) removePeer(id string) {
282281
func (pm *ProtocolManager) Start(maxPeers int) {
283282
pm.maxPeers = maxPeers
284283

285-
// broadcast transactions
284+
// broadcast and announce transactions (only new ones, not resurrected ones)
286285
pm.txsCh = make(chan core.NewTxsEvent, txChanSize)
287-
pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
286+
pm.txsSub = pm.txpool.SubscribeTransactions(pm.txsCh, false)
288287
pm.orderTxCh = make(chan core.OrderTxPreEvent, txChanSize)
289288
if pm.orderpool != nil {
290289
pm.orderTxSub = pm.orderpool.SubscribeTxPreEvent(pm.orderTxCh)
@@ -780,11 +779,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
780779
pm.knownTxs.Add(tx.Hash(), struct{}{})
781780
}
782781
}
783-
warped := make([]*txpool.Transaction, len(txs))
784-
for i := range txs {
785-
warped[i] = &txpool.Transaction{Tx: txs[i]}
786-
}
787-
pm.txpool.Add(warped, false, false)
782+
pm.txpool.Add(txs, false, false)
788783

789784
case msg.Code == OrderTxMsg:
790785
// Transactions arrived, make sure we have a valid and fresh chain to handle them

0 commit comments

Comments
 (0)