Skip to content
58 changes: 43 additions & 15 deletions mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"context"
"errors"
"fmt"
"github.com/cosmos/evm/mempool/txpool/locals"
"sync"
"time"

ethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/holiman/uint256"
Expand Down Expand Up @@ -46,9 +48,10 @@ type (
vmKeeper VMKeeperI

/** Mempools **/
txPool *txpool.TxPool
legacyTxPool *legacypool.LegacyPool
cosmosPool sdkmempool.ExtMempool
txPool *txpool.TxPool
legacyTxPool *legacypool.LegacyPool
localTxTracker *locals.TxTracker
cosmosPool sdkmempool.ExtMempool

/** Utils **/
logger log.Logger
Expand Down Expand Up @@ -127,6 +130,7 @@ func NewExperimentalEVMMempool(getCtxCallback func(height int64, prove bool) (sd
// from queued into pending, noting their readiness to be executed.
legacyPool.BroadcastTxFn = func(txs []*ethtypes.Transaction) error {
logger.Debug("broadcasting EVM transactions", "tx_count", len(txs))
fmt.Println(clientCtx)
return broadcastEVMTransactions(clientCtx, txConfig, txs)
}
}
Expand All @@ -143,6 +147,21 @@ func NewExperimentalEVMMempool(getCtxCallback func(height int64, prove bool) (sd
panic("tx pool should contain only legacypool")
}

var localTxTracker *locals.TxTracker

if !legacyConfig.NoLocals {
rejournal := legacyConfig.Rejournal
if rejournal < time.Second {
logger.Debug("Sanitizing invalid txpool journal time", "provided", rejournal, "updated", time.Second)
rejournal = time.Second
}
localTxTracker = locals.New(legacyConfig.Journal, rejournal, blockchain.Config(), txPool)
err := localTxTracker.Start()
if err != nil {
return nil
}
}

// Create Cosmos Mempool from configuration
cosmosPoolConfig := config.CosmosPoolConfig
if cosmosPoolConfig == nil {
Expand Down Expand Up @@ -174,18 +193,19 @@ func NewExperimentalEVMMempool(getCtxCallback func(height int64, prove bool) (sd
cosmosPool = sdkmempool.NewPriorityMempool(*cosmosPoolConfig)

evmMempool := &ExperimentalEVMMempool{
vmKeeper: vmKeeper,
txPool: txPool,
legacyTxPool: txPool.Subpools[0].(*legacypool.LegacyPool),
cosmosPool: cosmosPool,
logger: logger,
txConfig: txConfig,
blockchain: blockchain,
bondDenom: bondDenom,
evmDenom: evmDenom,
blockGasLimit: config.BlockGasLimit,
minTip: config.MinTip,
anteHandler: config.AnteHandler,
vmKeeper: vmKeeper,
txPool: txPool,
legacyTxPool: txPool.Subpools[0].(*legacypool.LegacyPool),
localTxTracker: localTxTracker,
cosmosPool: cosmosPool,
logger: logger,
txConfig: txConfig,
blockchain: blockchain,
bondDenom: bondDenom,
evmDenom: evmDenom,
blockGasLimit: config.BlockGasLimit,
minTip: config.MinTip,
anteHandler: config.AnteHandler,
}

vmKeeper.SetEvmMempool(evmMempool)
Expand Down Expand Up @@ -303,6 +323,10 @@ func (m *ExperimentalEVMMempool) Remove(tx sdk.Tx) error {
m.mtx.Lock()
defer m.mtx.Unlock()

if m.blockchain.latestCtx.BlockHeight() == 0 {
return nil
}

m.logger.Debug("removing transaction from mempool")

msg, err := m.getEVMMessage(tx)
Expand Down Expand Up @@ -419,6 +443,10 @@ func (m *ExperimentalEVMMempool) Close() error {
errs = append(errs, fmt.Errorf("failed to close txpool: %w", err))
}

if err := m.localTxTracker.Stop(); err != nil {
errs = append(errs, fmt.Errorf("failed to close localTxTracker: %w", err))
}

return errors.Join(errs...)
}

Expand Down
46 changes: 46 additions & 0 deletions mempool/txpool/locals/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2025 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package locals

import (
"errors"

"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/txpool/legacypool"
)

// IsTemporaryReject determines whether the given error indicates a temporary
// reason to reject a transaction from being included in the txpool. The result
// may change if the txpool's state changes later.
func IsTemporaryReject(err error) bool {
switch {
case errors.Is(err, legacypool.ErrOutOfOrderTxFromDelegated):
return true
case errors.Is(err, txpool.ErrInflightTxLimitReached):
return true
case errors.Is(err, legacypool.ErrAuthorityReserved):
return true
case errors.Is(err, txpool.ErrUnderpriced):
return true
case errors.Is(err, legacypool.ErrTxPoolOverflow):
return true
case errors.Is(err, legacypool.ErrFutureReplacePending):
return true
default:
return false
}
}
186 changes: 186 additions & 0 deletions mempool/txpool/locals/journal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package locals

import (
"errors"
"io"
"io/fs"
"os"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
)

// errNoActiveJournal is returned if a transaction is attempted to be inserted
// into the journal, but no such file is currently open.
var errNoActiveJournal = errors.New("no active journal")

// devNull is a WriteCloser that just discards anything written into it. Its
// goal is to allow the transaction journal to write into a fake journal when
// loading transactions on startup without printing warnings due to no file
// being read for write.
type devNull struct{}

func (*devNull) Write(p []byte) (n int, err error) { return len(p), nil }
func (*devNull) Close() error { return nil }

// journal is a rotating log of transactions with the aim of storing locally
// created transactions to allow non-executed ones to survive node restarts.
type journal struct {
path string // Filesystem path to store the transactions at
writer io.WriteCloser // Output stream to write new transactions into
}

// newTxJournal creates a new transaction journal to
func newTxJournal(path string) *journal {
return &journal{
path: path,
}
}

// load parses a transaction journal dump from disk, loading its contents into
// the specified pool.
func (journal *journal) load(add func([]*types.Transaction) []error) error {
// Open the journal for loading any past transactions
input, err := os.Open(journal.path)
if errors.Is(err, fs.ErrNotExist) {
// Skip the parsing if the journal file doesn't exist at all
return nil
}
if err != nil {
return err
}
defer input.Close()

// Temporarily discard any journal additions (don't double add on load)
journal.writer = new(devNull)
defer func() { journal.writer = nil }()

// Inject all transactions from the journal into the pool
stream := rlp.NewStream(input, 0)
total, dropped := 0, 0

// Create a method to load a limited batch of transactions and bump the
// appropriate progress counters. Then use this method to load all the
// journaled transactions in small-ish batches.
loadBatch := func(txs types.Transactions) {
for _, err := range add(txs) {
if err != nil {
log.Debug("Failed to add journaled transaction", "err", err)
dropped++
}
}
}
var (
failure error
batch types.Transactions
)
for {
// Parse the next transaction and terminate on error
tx := new(types.Transaction)
if err = stream.Decode(tx); err != nil {
if err != io.EOF {
failure = err
}
if batch.Len() > 0 {
loadBatch(batch)
}
break
}
// New transaction parsed, queue up for later, import if threshold is reached
total++

if batch = append(batch, tx); batch.Len() > 1024 {
loadBatch(batch)
batch = batch[:0]
}
}
log.Info("Loaded local transaction journal", "transactions", total, "dropped", dropped)

return failure
}

// insert adds the specified transaction to the local disk journal.
func (journal *journal) insert(tx *types.Transaction) error {
if journal.writer == nil {
return errNoActiveJournal
}
if err := rlp.Encode(journal.writer, tx); err != nil {
return err
}
return nil
}

// rotate regenerates the transaction journal based on the current contents of
// the transaction pool.
func (journal *journal) rotate(all map[common.Address]types.Transactions) error {
// Close the current journal (if any is open)
if journal.writer != nil {
if err := journal.writer.Close(); err != nil {
return err
}
journal.writer = nil
}
// Generate a new journal with the contents of the current pool
replacement, err := os.OpenFile(journal.path+".new", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
return err
}
journaled := 0
for _, txs := range all {
for _, tx := range txs {
if err = rlp.Encode(replacement, tx); err != nil {
replacement.Close()

Check warning

Code scanning / CodeQL

Writable file handle closed without error handling Warning

File handle may be writable as a result of data flow from a
call to OpenFile
and closing it may result in data loss upon failure, which is not handled explicitly.

Copilot Autofix

AI 1 day ago

To fix the problem, we must ensure that any error returned by replacement.Close() is neither ignored nor silently discarded. Specifically, in the event that a write (rlp.Encode) fails inside the loop, the code should close the file and capture any error from Close(). If Close() also fails, both errors should be reported—ideally by combining them. The canonical way in Go is to chain or wrap the Close error with the initial error (if both occur). This can be done with Go's errors.Join (Go 1.20+) or, for older Go, by concatenating the messages or using a helper function. Since the import errors is already present, and Go 1.20+'s errors.Join is standard, I'll use that.

The fix consists of:

  • Not just calling replacement.Close() without checking, but capturing its error.
  • If both rlp.Encode and replacement.Close error, return errors.Join(encodeErr, closeErr).
  • If only one errors, return that error.
  • Changing only the relevant region of the file in the shown snippet (mempool/txpool/locals/journal.go).

Suggested changeset 1
mempool/txpool/locals/journal.go

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/mempool/txpool/locals/journal.go b/mempool/txpool/locals/journal.go
--- a/mempool/txpool/locals/journal.go
+++ b/mempool/txpool/locals/journal.go
@@ -147,7 +147,10 @@
 	for _, txs := range all {
 		for _, tx := range txs {
 			if err = rlp.Encode(replacement, tx); err != nil {
-				replacement.Close()
+				closeErr := replacement.Close()
+				if closeErr != nil {
+					return errors.Join(err, closeErr)
+				}
 				return err
 			}
 		}
EOF
@@ -147,7 +147,10 @@
for _, txs := range all {
for _, tx := range txs {
if err = rlp.Encode(replacement, tx); err != nil {
replacement.Close()
closeErr := replacement.Close()
if closeErr != nil {
return errors.Join(err, closeErr)
}
return err
}
}
Copilot is powered by AI and may make mistakes. Always verify output.
return err
}
}
journaled += len(txs)
}
Comment on lines +147 to +155

Check warning

Code scanning / CodeQL

Iteration over map Warning

Iteration over map may be a possible source of non-determinism
replacement.Close()

Check warning

Code scanning / CodeQL

Writable file handle closed without error handling Warning

File handle may be writable as a result of data flow from a
call to OpenFile
and closing it may result in data loss upon failure, which is not handled explicitly.

Copilot Autofix

AI 1 day ago

To fix the problem, always check the error result returned by replacement.Close() (line 156) and handle it as you do for other file operations in the same function. The best way, patterned after the rest of the function, is to assign the error returned from replacement.Close() to err, and if it is non-nil, return it as the function error immediately. This ensures that any issue closing the file is surfaced and handled, avoiding silent data loss.

Changes needed:

  • In mempool/txpool/locals/journal.go, in the rotate method, replace the call at line 156 from replacement.Close() to:
    if err = replacement.Close(); err != nil {
        return err
    }

No new imports or method definitions are necessary, as this is the pattern for error handling already used in this function.

Suggested changeset 1
mempool/txpool/locals/journal.go

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/mempool/txpool/locals/journal.go b/mempool/txpool/locals/journal.go
--- a/mempool/txpool/locals/journal.go
+++ b/mempool/txpool/locals/journal.go
@@ -153,7 +153,9 @@
 		}
 		journaled += len(txs)
 	}
-	replacement.Close()
+	if err = replacement.Close(); err != nil {
+		return err
+	}
 
 	// Replace the live journal with the newly generated one
 	if err = os.Rename(journal.path+".new", journal.path); err != nil {
EOF
@@ -153,7 +153,9 @@
}
journaled += len(txs)
}
replacement.Close()
if err = replacement.Close(); err != nil {
return err
}

// Replace the live journal with the newly generated one
if err = os.Rename(journal.path+".new", journal.path); err != nil {
Copilot is powered by AI and may make mistakes. Always verify output.

// Replace the live journal with the newly generated one
if err = os.Rename(journal.path+".new", journal.path); err != nil {
return err
}
sink, err := os.OpenFile(journal.path, os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
return err
}
journal.writer = sink

logger := log.Info
if len(all) == 0 {
logger = log.Debug
}
logger("Regenerated local transaction journal", "transactions", journaled, "accounts", len(all))

return nil
}

// close flushes the transaction journal contents to disk and closes the file.
func (journal *journal) close() error {
var err error

if journal.writer != nil {
err = journal.writer.Close()
journal.writer = nil
}
return err
}
Loading
Loading