Skip to content

Commit

Permalink
eth: check blob transaction validity on the peer goroutine when recei…
Browse files Browse the repository at this point in the history
…ved (#31219)

This ensures that if we receive a blob transaction announcement where we cannot
link the tx to the sidecar commitments, we will drop the sending peer. This check
is added in the protocol handler for the PooledTransactions message.

Tests for this have also been added in the cross-client "eth" protocol test suite.

---------

Co-authored-by: Felix Lange <[email protected]>
  • Loading branch information
jwasinger and fjl authored Mar 1, 2025
1 parent ebc3232 commit d2bbde2
Show file tree
Hide file tree
Showing 5 changed files with 234 additions and 12 deletions.
5 changes: 5 additions & 0 deletions cmd/devp2p/internal/ethtest/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,16 @@ func (c *Conn) Write(proto Proto, code uint64, msg any) error {
return err
}

var errDisc error = fmt.Errorf("disconnect")

// ReadEth reads an Eth sub-protocol wire message.
func (c *Conn) ReadEth() (any, error) {
c.SetReadDeadline(time.Now().Add(timeout))
for {
code, data, _, err := c.Conn.Read()
if code == discMsg {
return nil, errDisc
}
if err != nil {
return nil, err
}
Expand Down
197 changes: 197 additions & 0 deletions cmd/devp2p/internal/ethtest/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@
package ethtest

import (
"context"
"crypto/rand"
"fmt"
"reflect"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/misc/eip4844"
Expand Down Expand Up @@ -79,6 +83,8 @@ func (s *Suite) EthTests() []utesting.Test {
{Name: "InvalidTxs", Fn: s.TestInvalidTxs},
{Name: "NewPooledTxs", Fn: s.TestNewPooledTxs},
{Name: "BlobViolations", Fn: s.TestBlobViolations},
{Name: "TestBlobTxWithoutSidecar", Fn: s.TestBlobTxWithoutSidecar},
{Name: "TestBlobTxWithMismatchedSidecar", Fn: s.TestBlobTxWithMismatchedSidecar},
}
}

Expand Down Expand Up @@ -825,3 +831,194 @@ func (s *Suite) TestBlobViolations(t *utesting.T) {
conn.Close()
}
}

// mangleSidecar returns a copy of the given blob transaction where the sidecar
// data has been modified to produce a different commitment hash.
func mangleSidecar(tx *types.Transaction) *types.Transaction {
sidecar := tx.BlobTxSidecar()
copy := types.BlobTxSidecar{
Blobs: append([]kzg4844.Blob{}, sidecar.Blobs...),
Commitments: append([]kzg4844.Commitment{}, sidecar.Commitments...),
Proofs: append([]kzg4844.Proof{}, sidecar.Proofs...),
}
// zero the first commitment to alter the sidecar hash
copy.Commitments[0] = kzg4844.Commitment{}
return tx.WithBlobTxSidecar(&copy)
}

func (s *Suite) TestBlobTxWithoutSidecar(t *utesting.T) {
t.Log(`This test checks that a blob transaction first advertised/transmitted without blobs will result in the sending peer being disconnected, and the full transaction should be successfully retrieved from another peer.`)
tx := s.makeBlobTxs(1, 2, 42)[0]
badTx := tx.WithoutBlobTxSidecar()
s.testBadBlobTx(t, tx, badTx)
}

func (s *Suite) TestBlobTxWithMismatchedSidecar(t *utesting.T) {
t.Log(`This test checks that a blob transaction first advertised/transmitted without blobs, whose commitment don't correspond to the blob_versioned_hashes in the transaction, will result in the sending peer being disconnected, and the full transaction should be successfully retrieved from another peer.`)
tx := s.makeBlobTxs(1, 2, 43)[0]
badTx := mangleSidecar(tx)
s.testBadBlobTx(t, tx, badTx)
}

// readUntil reads eth protocol messages until a message of the target type is
// received. It returns an error if there is a disconnect, or if the context
// is cancelled before a message of the desired type can be read.
func readUntil[T any](ctx context.Context, conn *Conn) (*T, error) {
for {
select {
case <-ctx.Done():
return nil, context.Canceled
default:
}
received, err := conn.ReadEth()
if err != nil {
if err == errDisc {
return nil, errDisc
}
continue
}

switch res := received.(type) {
case *T:
return res, nil
}
}
}

// readUntilDisconnect reads eth protocol messages until the peer disconnects.
// It returns whether the peer disconnects in the next 100ms.
func readUntilDisconnect(conn *Conn) (disconnected bool) {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
_, err := readUntil[struct{}](ctx, conn)
return err == errDisc
}

func (s *Suite) testBadBlobTx(t *utesting.T, tx *types.Transaction, badTx *types.Transaction) {
stage1, stage2, stage3 := new(sync.WaitGroup), new(sync.WaitGroup), new(sync.WaitGroup)
stage1.Add(1)
stage2.Add(1)
stage3.Add(1)

errc := make(chan error)

badPeer := func() {
// announce the correct hash from the bad peer.
// when the transaction is first requested before transmitting it from the bad peer,
// trigger step 2: connection and announcement by good peers

conn, err := s.dial()
if err != nil {
errc <- fmt.Errorf("dial fail: %v", err)
return
}
defer conn.Close()

if err := conn.peer(s.chain, nil); err != nil {
errc <- fmt.Errorf("bad peer: peering failed: %v", err)
return
}

ann := eth.NewPooledTransactionHashesPacket{
Types: []byte{types.BlobTxType},
Sizes: []uint32{uint32(badTx.Size())},
Hashes: []common.Hash{badTx.Hash()},
}

if err := conn.Write(ethProto, eth.NewPooledTransactionHashesMsg, ann); err != nil {
errc <- fmt.Errorf("sending announcement failed: %v", err)
return
}

req, err := readUntil[eth.GetPooledTransactionsPacket](context.Background(), conn)
if err != nil {
errc <- fmt.Errorf("failed to read GetPooledTransactions message: %v", err)
return
}

stage1.Done()
stage2.Wait()

// the good peer is connected, and has announced the tx.
// proceed to send the incorrect one from the bad peer.

resp := eth.PooledTransactionsPacket{RequestId: req.RequestId, PooledTransactionsResponse: eth.PooledTransactionsResponse(types.Transactions{badTx})}
if err := conn.Write(ethProto, eth.PooledTransactionsMsg, resp); err != nil {
errc <- fmt.Errorf("writing pooled tx response failed: %v", err)
return
}
if !readUntilDisconnect(conn) {
errc <- fmt.Errorf("expected bad peer to be disconnected")
return
}
stage3.Done()
}

goodPeer := func() {
stage1.Wait()

conn, err := s.dial()
if err != nil {
errc <- fmt.Errorf("dial fail: %v", err)
return
}
defer conn.Close()

if err := conn.peer(s.chain, nil); err != nil {
errc <- fmt.Errorf("peering failed: %v", err)
return
}

ann := eth.NewPooledTransactionHashesPacket{
Types: []byte{types.BlobTxType},
Sizes: []uint32{uint32(tx.Size())},
Hashes: []common.Hash{tx.Hash()},
}

if err := conn.Write(ethProto, eth.NewPooledTransactionHashesMsg, ann); err != nil {
errc <- fmt.Errorf("sending announcement failed: %v", err)
return
}

// wait until the bad peer has transmitted the incorrect transaction
stage2.Done()
stage3.Wait()

// the bad peer has transmitted the bad tx, and been disconnected.
// transmit the same tx but with correct sidecar from the good peer.

var req *eth.GetPooledTransactionsPacket
req, err = readUntil[eth.GetPooledTransactionsPacket](context.Background(), conn)
if err != nil {
errc <- fmt.Errorf("reading pooled tx request failed: %v", err)
return
}

if req.GetPooledTransactionsRequest[0] != tx.Hash() {
errc <- fmt.Errorf("requested unknown tx hash")
return
}

resp := eth.PooledTransactionsPacket{RequestId: req.RequestId, PooledTransactionsResponse: eth.PooledTransactionsResponse(types.Transactions{tx})}
if err := conn.Write(ethProto, eth.PooledTransactionsMsg, resp); err != nil {
errc <- fmt.Errorf("writing pooled tx response failed: %v", err)
return
}
if readUntilDisconnect(conn) {
errc <- fmt.Errorf("unexpected disconnect")
return
}
close(errc)
}

if err := s.engine.sendForkchoiceUpdated(); err != nil {
t.Fatalf("send fcu failed: %v", err)
}

go goodPeer()
go badPeer()
err := <-errc
if err != nil {
t.Fatalf("%v", err)
}
}
14 changes: 2 additions & 12 deletions core/txpool/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package txpool

import (
"crypto/sha256"
"errors"
"fmt"
"math/big"
Expand Down Expand Up @@ -170,20 +169,11 @@ func validateBlobSidecar(hashes []common.Hash, sidecar *types.BlobTxSidecar) err
if len(sidecar.Blobs) != len(hashes) {
return fmt.Errorf("invalid number of %d blobs compared to %d blob hashes", len(sidecar.Blobs), len(hashes))
}
if len(sidecar.Commitments) != len(hashes) {
return fmt.Errorf("invalid number of %d blob commitments compared to %d blob hashes", len(sidecar.Commitments), len(hashes))
}
if len(sidecar.Proofs) != len(hashes) {
return fmt.Errorf("invalid number of %d blob proofs compared to %d blob hashes", len(sidecar.Proofs), len(hashes))
}
// Blob quantities match up, validate that the provers match with the
// transaction hash before getting to the cryptography
hasher := sha256.New()
for i, vhash := range hashes {
computed := kzg4844.CalcBlobHashV1(hasher, &sidecar.Commitments[i])
if vhash != computed {
return fmt.Errorf("blob %d: computed hash %#x mismatches transaction one %#x", i, computed, vhash)
}
if err := sidecar.ValidateBlobCommitmentHashes(hashes); err != nil {
return err
}
// Blob commitments match with the hashes in the transaction, verify the
// blobs themselves via KZG
Expand Down
17 changes: 17 additions & 0 deletions core/types/tx_blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package types
import (
"bytes"
"crypto/sha256"
"fmt"
"math/big"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -85,6 +86,22 @@ func (sc *BlobTxSidecar) encodedSize() uint64 {
return rlp.ListSize(blobs) + rlp.ListSize(commitments) + rlp.ListSize(proofs)
}

// ValidateBlobCommitmentHashes checks whether the given hashes correspond to the
// commitments in the sidecar
func (sc *BlobTxSidecar) ValidateBlobCommitmentHashes(hashes []common.Hash) error {
if len(sc.Commitments) != len(hashes) {
return fmt.Errorf("invalid number of %d blob commitments compared to %d blob hashes", len(sc.Commitments), len(hashes))
}
hasher := sha256.New()
for i, vhash := range hashes {
computed := kzg4844.CalcBlobHashV1(hasher, &sc.Commitments[i])
if vhash != computed {
return fmt.Errorf("blob %d: computed hash %#x mismatches transaction one %#x", i, computed, vhash)
}
}
return nil
}

// blobTxWithBlobs is used for encoding of transactions when blobs are present.
type blobTxWithBlobs struct {
BlobTx *BlobTx
Expand Down
13 changes: 13 additions & 0 deletions eth/handler_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,19 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
return h.txFetcher.Enqueue(peer.ID(), *packet, false)

case *eth.PooledTransactionsResponse:
// If we receive any blob transactions missing sidecars, or with
// sidecars that don't correspond to the versioned hashes reported
// in the header, disconnect from the sending peer.
for _, tx := range *packet {
if tx.Type() == types.BlobTxType {
if tx.BlobTxSidecar() == nil {
return errors.New("received sidecar-less blob transaction")
}
if err := tx.BlobTxSidecar().ValidateBlobCommitmentHashes(tx.BlobHashes()); err != nil {
return err
}
}
}
return h.txFetcher.Enqueue(peer.ID(), *packet, true)

default:
Expand Down

0 comments on commit d2bbde2

Please sign in to comment.