From 850dfcb72193d59d5d13c252e3e6613588400a35 Mon Sep 17 00:00:00 2001 From: Ljubisa Gacevic Date: Fri, 6 Mar 2026 12:32:53 +0100 Subject: [PATCH 1/9] fix(pushsync,pusher): prevent silent chunk loss on shallow receipts --- pkg/pusher/pusher.go | 5 ++- pkg/pushsync/pushsync.go | 42 +++++++++++++++---- pkg/pushsync/pushsync_test.go | 78 ++++++++++++++++++++++++++++------- 3 files changed, 101 insertions(+), 24 deletions(-) diff --git a/pkg/pusher/pusher.go b/pkg/pusher/pusher.go index 9c4073f1fcc..560cee6fad0 100644 --- a/pkg/pusher/pusher.go +++ b/pkg/pusher/pusher.go @@ -280,7 +280,10 @@ func (s *Service) pushDeferred(ctx context.Context, logger log.Logger, op *Op) ( if s.shallowReceipt(op.identityAddress) { return true, err } - if err := s.storer.Report(ctx, op.Chunk, storage.ChunkSynced); err != nil { + // Retry budget exhausted: no peer in the correct neighborhood stored the + // chunk. Report CouldNotSync rather than Synced to avoid falsely marking + // the chunk as delivered. + if err := s.storer.Report(ctx, op.Chunk, storage.ChunkCouldNotSync); err != nil { loggerV1.Error(err, "pusher: failed to report sync status") return true, err } diff --git a/pkg/pushsync/pushsync.go b/pkg/pushsync/pushsync.go index 05c4aef3d41..8f7d80f85ac 100644 --- a/pkg/pushsync/pushsync.go +++ b/pkg/pushsync/pushsync.go @@ -299,6 +299,13 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) switch receipt, err := ps.pushToClosest(ctx, chunk, false); { case errors.Is(err, topology.ErrWantSelf): + // Only store if we are actually within our neighborhood. If the chunk + // is outside our AOR we would store it in a low bin and unreserve() + // would evict it almost immediately, leaving the chunk nowhere on the + // network while the origin believes it was delivered. + if swarm.Proximity(ps.address.Bytes(), chunkAddress.Bytes()) < rad { + return ErrOutOfDepthStoring + } stored, reason = true, "want self" return store(ctx) case err == nil: @@ -361,10 +368,11 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo ps.metrics.TotalRequests.Inc() var ( - sentErrorsLeft = 1 - preemptiveTicker <-chan time.Time - inflight int - parallelForwards = maxMultiplexForwards + sentErrorsLeft = 1 + preemptiveTicker <-chan time.Time + inflight int + parallelForwards = maxMultiplexForwards + shallowReceiptResult *pb.Receipt ) if origin { @@ -419,11 +427,19 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo if skip.PruneExpiresAfter(idAddress, overDraftRefresh) == 0 { // no overdraft peers, we have depleted ALL peers if inflight == 0 { if ps.fullNode { + // If a peer already has the chunk (even at wrong depth), don't + // store locally — the chunk is closer to its neighbourhood than us. + if shallowReceiptResult != nil { + return shallowReceiptResult, ErrShallowReceipt + } if cac.Valid(ch) { go ps.unwrap(ch) } return nil, topology.ErrWantSelf } + if shallowReceiptResult != nil { + return shallowReceiptResult, ErrShallowReceipt + } ps.logger.Debug("no peers left", "chunk_address", ch.Address(), "error", err) return nil, err } @@ -486,12 +502,17 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo return result.receipt, nil } - switch err := ps.checkReceipt(result.receipt); { - case err == nil: + if err := ps.checkReceipt(result.receipt); err == nil { return result.receipt, nil - case errors.Is(err, ErrShallowReceipt): - ps.errSkip.Add(idAddress, result.peer, skiplistDur) - return result.receipt, err + } else if errors.Is(err, ErrShallowReceipt) { + // Treat shallow receipt like any other failure: exhaust the full + // error budget and wait for any other inflight parallel pushes + // (e.g. multiplex forwards) before giving up. Only return + // ErrShallowReceipt once the entire budget is spent. + shallowReceiptResult = result.receipt + result.err = err + } else { + result.err = err } } @@ -505,6 +526,9 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo } } + if shallowReceiptResult != nil { + return shallowReceiptResult, ErrShallowReceipt + } return nil, ErrNoPush } diff --git a/pkg/pushsync/pushsync_test.go b/pkg/pushsync/pushsync_test.go index 90e6d654a5b..a689781ded8 100644 --- a/pkg/pushsync/pushsync_test.go +++ b/pkg/pushsync/pushsync_test.go @@ -208,29 +208,48 @@ func TestSocListener(t *testing.T) { waitOnRecordAndTest(t, closestPeer, recorder, sch2.Address(), nil) } -// TestShallowReceipt forces the peer to send back a shallow receipt to a pushsync request. In return, the origin node returns the error along with the received receipt. +// TestShallowReceipt verifies that when a storer node stores a chunk legitimately +// within its own AOR but the origin node has a stricter radius, the origin +// correctly identifies and returns ErrShallowReceipt together with the receipt. func TestShallowReceipt(t *testing.T) { t.Parallel() - // chunk data to upload - chunk := testingc.FixtureChunk("7000") - var highPO uint8 = 31 + key, err := crypto.GenerateSecp256k1Key() + if err != nil { + t.Fatal(err) + } - // create a pivot node and a mocked closest node - pivotNode := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000") // base is 0000 - closestPeer := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000") // binary 0110 -> po 1 + signer := crypto.NewDefaultSigner(key) - // peer is the node responding to the chunk receipt message - // mock should return ErrWantSelf since there's no one to forward to - psPeer, _ := createPushSyncNodeWithRadius(t, closestPeer, defaultPrices, nil, nil, defaultSigner(chunk), highPO, 0, mock.WithClosestPeerErr(topology.ErrWantSelf)) + pubKey, err := signer.PublicKey() + if err != nil { + t.Fatal(err) + } + + closestPeer, err := crypto.NewOverlayAddress(*pubKey, 1, blockHash.Bytes()) + if err != nil { + t.Fatal(err) + } + + // Storer stores within its own AOR (proximity == storerRadius → just qualifies). + // The origin has a higher radius, so it considers the receipt too shallow. + storerRadius := 3 + chunkProximity := 3 + pivotRadius := 7 + pivotTolerance := uint8(0) + + pivotNode := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000") + + chunk := testingc.GenerateValidRandomChunkAt(t, closestPeer, chunkProximity) + + // storer: proximity == storerRadius → within AOR → stores and sends receipt + psPeer, _ := createPushSyncNodeWithRadius(t, closestPeer, defaultPrices, nil, nil, signer, uint8(storerRadius), 0, mock.WithClosestPeerErr(topology.ErrWantSelf)) recorder := streamtest.New(streamtest.WithProtocols(psPeer.Protocol()), streamtest.WithBaseAddr(pivotNode)) - // pivot node needs the streamer since the chunk is intercepted by - // the chunk worker, then gets sent by opening a new stream - psPivot, _ := createPushSyncNodeWithRadius(t, pivotNode, defaultPrices, recorder, nil, defaultSigner(chunk), highPO, 0, mock.WithClosestPeer(closestPeer)) + // pivot: stricter radius → origin considers the receipt shallow + psPivot, _ := createPushSyncNodeWithRadius(t, pivotNode, defaultPrices, recorder, nil, nil, uint8(pivotRadius), pivotTolerance, mock.WithClosestPeer(closestPeer)) - // Trigger the sending of chunk to the closest node receipt, err := psPivot.PushChunkToClosest(context.Background(), chunk) if !errors.Is(err, pushsync.ErrShallowReceipt) { t.Fatalf("got %v, want %v", err, pushsync.ErrShallowReceipt) @@ -247,6 +266,37 @@ func TestShallowReceipt(t *testing.T) { waitOnRecordAndTest(t, closestPeer, recorder, chunk.Address(), nil) } +// TestOutOfDepthStoring verifies that when a storer is forced (ErrWantSelf) but +// the chunk is outside its AOR, it refuses to store and returns an error rather +// than storing a chunk it will immediately evict. +func TestOutOfDepthStoring(t *testing.T) { + t.Parallel() + + chunk := testingc.FixtureChunk("7000") + + var highPO uint8 = 31 + + // Storer address has very low proximity to the chunk; its radius is highPO. + // It has no closer peers (ErrWantSelf) but MUST refuse to store because + // the chunk is far outside its AOR. + pivotNode := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000") + closestPeer := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000") + + psPeer, _ := createPushSyncNodeWithRadius(t, closestPeer, defaultPrices, nil, nil, defaultSigner(chunk), highPO, 0, mock.WithClosestPeerErr(topology.ErrWantSelf)) + + recorder := streamtest.New(streamtest.WithProtocols(psPeer.Protocol()), streamtest.WithBaseAddr(pivotNode)) + + psPivot, _ := createPushSyncNodeWithRadius(t, pivotNode, defaultPrices, recorder, nil, defaultSigner(chunk), highPO, 0, mock.WithClosestPeer(closestPeer)) + + _, err := psPivot.PushChunkToClosest(context.Background(), chunk) + + // The storer correctly refused to store, so the origin exhausted its peers + // without any shallow receipt. No ErrShallowReceipt should be returned. + if errors.Is(err, pushsync.ErrShallowReceipt) { + t.Fatal("got ErrShallowReceipt, but storer should have refused to store out-of-depth chunk") + } +} + // TestShallowReceiptTolerance sends back a shallow receipt but because of the tolerance level, the origin node accepts the receipts. func TestShallowReceiptTolerance(t *testing.T) { t.Parallel() From 0fab07cd28136acb4e2706f4cccd98178e1561be Mon Sep 17 00:00:00 2001 From: Ljubisa Gacevic Date: Mon, 9 Mar 2026 12:31:22 +0100 Subject: [PATCH 2/9] fix(pushsync): modify TestShallowReceipt --- pkg/pushsync/pushsync_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/pushsync/pushsync_test.go b/pkg/pushsync/pushsync_test.go index a689781ded8..1b9fe325cd9 100644 --- a/pkg/pushsync/pushsync_test.go +++ b/pkg/pushsync/pushsync_test.go @@ -231,18 +231,18 @@ func TestShallowReceipt(t *testing.T) { t.Fatal(err) } - // Storer stores within its own AOR (proximity == storerRadius → just qualifies). - // The origin has a higher radius, so it considers the receipt too shallow. - storerRadius := 3 - chunkProximity := 3 - pivotRadius := 7 + // Storer stores within its own AOR (proximity > storerRadius → qualifies). + // The origin has a much higher radius, so it always considers the receipt shallow. + storerRadius := 1 + chunkProximity := 0 + pivotRadius := 31 pivotTolerance := uint8(0) pivotNode := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000") chunk := testingc.GenerateValidRandomChunkAt(t, closestPeer, chunkProximity) - // storer: proximity == storerRadius → within AOR → stores and sends receipt + // storer: proximity > storerRadius → within AOR → stores and sends receipt psPeer, _ := createPushSyncNodeWithRadius(t, closestPeer, defaultPrices, nil, nil, signer, uint8(storerRadius), 0, mock.WithClosestPeerErr(topology.ErrWantSelf)) recorder := streamtest.New(streamtest.WithProtocols(psPeer.Protocol()), streamtest.WithBaseAddr(pivotNode)) From 9089e44d3a483b57b7a51e263c5f19b92e5cbc0f Mon Sep 17 00:00:00 2001 From: Ljubisa Gacevic Date: Mon, 9 Mar 2026 12:36:36 +0100 Subject: [PATCH 3/9] fix(pushsync): add OutOfDepthStoring counter metric --- pkg/pushsync/metrics.go | 9 ++++++++- pkg/pushsync/pushsync.go | 1 + 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/pkg/pushsync/metrics.go b/pkg/pushsync/metrics.go index 758a6f072a5..1adc5958a11 100644 --- a/pkg/pushsync/metrics.go +++ b/pkg/pushsync/metrics.go @@ -28,7 +28,8 @@ type metrics struct { ReceiptDepth *prometheus.CounterVec ShallowReceiptDepth *prometheus.CounterVec ShallowReceipt prometheus.Counter - OverdraftRefresh prometheus.Counter + OverdraftRefresh prometheus.Counter + OutOfDepthStoring prometheus.Counter } func newMetrics() metrics { @@ -153,6 +154,12 @@ func newMetrics() metrics { Name: "overdraft_refresh", Help: "Total number of times peers were skipped due to overdraft, requiring a wait to refresh balance.", }), + OutOfDepthStoring: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "out_of_depth_storing", + Help: "Total number of times a chunk was refused because it was outside the neighborhood (ErrWantSelf with proximity < radius).", + }), } } diff --git a/pkg/pushsync/pushsync.go b/pkg/pushsync/pushsync.go index 8f7d80f85ac..c4f041e7085 100644 --- a/pkg/pushsync/pushsync.go +++ b/pkg/pushsync/pushsync.go @@ -304,6 +304,7 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) // would evict it almost immediately, leaving the chunk nowhere on the // network while the origin believes it was delivered. if swarm.Proximity(ps.address.Bytes(), chunkAddress.Bytes()) < rad { + ps.metrics.OutOfDepthStoring.Inc() return ErrOutOfDepthStoring } stored, reason = true, "want self" From 68669c3c4c8bca996a9decabe5a7c09108d8b498 Mon Sep 17 00:00:00 2001 From: Ljubisa Gacevic Date: Mon, 9 Mar 2026 12:48:21 +0100 Subject: [PATCH 4/9] chore: gofmt --- pkg/file/splitter/internal/job.go | 4 ++-- pkg/pushsync/metrics.go | 4 ++-- pkg/pushsync/pushsync.go | 8 ++++---- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/pkg/file/splitter/internal/job.go b/pkg/file/splitter/internal/job.go index db34e5764c1..f33c134bcc0 100644 --- a/pkg/file/splitter/internal/job.go +++ b/pkg/file/splitter/internal/job.go @@ -198,7 +198,7 @@ func (s *SimpleSplitterJob) hashUnfinished() error { // F F // F F F // -// F F F F S +// # F F F F S // // The result will be: // @@ -206,7 +206,7 @@ func (s *SimpleSplitterJob) hashUnfinished() error { // F F // F F F // -// F F F F +// # F F F F // // After which the SS will be hashed to obtain the final root hash func (s *SimpleSplitterJob) moveDanglingChunk() error { diff --git a/pkg/pushsync/metrics.go b/pkg/pushsync/metrics.go index 1adc5958a11..b21bedffa84 100644 --- a/pkg/pushsync/metrics.go +++ b/pkg/pushsync/metrics.go @@ -28,8 +28,8 @@ type metrics struct { ReceiptDepth *prometheus.CounterVec ShallowReceiptDepth *prometheus.CounterVec ShallowReceipt prometheus.Counter - OverdraftRefresh prometheus.Counter - OutOfDepthStoring prometheus.Counter + OverdraftRefresh prometheus.Counter + OutOfDepthStoring prometheus.Counter } func newMetrics() metrics { diff --git a/pkg/pushsync/pushsync.go b/pkg/pushsync/pushsync.go index c4f041e7085..575b73b0878 100644 --- a/pkg/pushsync/pushsync.go +++ b/pkg/pushsync/pushsync.go @@ -369,10 +369,10 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo ps.metrics.TotalRequests.Inc() var ( - sentErrorsLeft = 1 - preemptiveTicker <-chan time.Time - inflight int - parallelForwards = maxMultiplexForwards + sentErrorsLeft = 1 + preemptiveTicker <-chan time.Time + inflight int + parallelForwards = maxMultiplexForwards shallowReceiptResult *pb.Receipt ) From 68b9c47199cf3a40cd27bf265e38f4c64912628b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ljubi=C5=A1a=20Ga=C4=8Devi=C4=87?= <35105035+gacevicljubisa@users.noreply.github.com> Date: Mon, 9 Mar 2026 15:19:59 +0100 Subject: [PATCH 5/9] fix: revert unrelated changes Updated comments to correct formatting and clarity. --- pkg/file/splitter/internal/job.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/file/splitter/internal/job.go b/pkg/file/splitter/internal/job.go index f33c134bcc0..db34e5764c1 100644 --- a/pkg/file/splitter/internal/job.go +++ b/pkg/file/splitter/internal/job.go @@ -198,7 +198,7 @@ func (s *SimpleSplitterJob) hashUnfinished() error { // F F // F F F // -// # F F F F S +// F F F F S // // The result will be: // @@ -206,7 +206,7 @@ func (s *SimpleSplitterJob) hashUnfinished() error { // F F // F F F // -// # F F F F +// F F F F // // After which the SS will be hashed to obtain the final root hash func (s *SimpleSplitterJob) moveDanglingChunk() error { From acae9c8f5eaf694ecf9257f47a27cd7c40feb80d Mon Sep 17 00:00:00 2001 From: Ljubisa Gacevic Date: Tue, 10 Mar 2026 18:47:59 +0100 Subject: [PATCH 6/9] fix(pushsync): move shallow receipt check below unwrap block --- pkg/pushsync/pushsync.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/pushsync/pushsync.go b/pkg/pushsync/pushsync.go index 575b73b0878..7e50e89cb1b 100644 --- a/pkg/pushsync/pushsync.go +++ b/pkg/pushsync/pushsync.go @@ -428,14 +428,14 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo if skip.PruneExpiresAfter(idAddress, overDraftRefresh) == 0 { // no overdraft peers, we have depleted ALL peers if inflight == 0 { if ps.fullNode { + if cac.Valid(ch) { + go ps.unwrap(ch) + } // If a peer already has the chunk (even at wrong depth), don't // store locally — the chunk is closer to its neighbourhood than us. if shallowReceiptResult != nil { return shallowReceiptResult, ErrShallowReceipt } - if cac.Valid(ch) { - go ps.unwrap(ch) - } return nil, topology.ErrWantSelf } if shallowReceiptResult != nil { From 03057b1b32a6914b3a087b5acdf84285d1a20c3d Mon Sep 17 00:00:00 2001 From: Ljubisa Gacevic Date: Tue, 10 Mar 2026 18:57:01 +0100 Subject: [PATCH 7/9] test(pushsync): assert specific ErrWantSelf in TestOutOfDepthStoring --- pkg/pushsync/pushsync_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/pushsync/pushsync_test.go b/pkg/pushsync/pushsync_test.go index 1b9fe325cd9..7bef63d85c7 100644 --- a/pkg/pushsync/pushsync_test.go +++ b/pkg/pushsync/pushsync_test.go @@ -291,9 +291,9 @@ func TestOutOfDepthStoring(t *testing.T) { _, err := psPivot.PushChunkToClosest(context.Background(), chunk) // The storer correctly refused to store, so the origin exhausted its peers - // without any shallow receipt. No ErrShallowReceipt should be returned. - if errors.Is(err, pushsync.ErrShallowReceipt) { - t.Fatal("got ErrShallowReceipt, but storer should have refused to store out-of-depth chunk") + // and falls back to ErrWantSelf (full node with no remaining peers). + if !errors.Is(err, topology.ErrWantSelf) { + t.Fatalf("got %v, want %v", err, topology.ErrWantSelf) } } From 68658057151b518c6a20922e790312ae886a7249 Mon Sep 17 00:00:00 2001 From: Ljubisa Gacevic Date: Wed, 29 Apr 2026 10:41:00 +0200 Subject: [PATCH 8/9] fix(pushsync,pusher): complete CouldNotSync wiring and review fixes --- pkg/pusher/metrics.go | 19 ++-- pkg/pusher/pusher.go | 16 ++-- pkg/pushsync/pushsync.go | 103 ++++++++++++---------- pkg/storer/internal/upload/uploadstore.go | 3 + 4 files changed, 82 insertions(+), 59 deletions(-) diff --git a/pkg/pusher/metrics.go b/pkg/pusher/metrics.go index 0a92951b42b..30c0233e2fa 100644 --- a/pkg/pusher/metrics.go +++ b/pkg/pusher/metrics.go @@ -10,12 +10,13 @@ import ( ) type metrics struct { - TotalToPush prometheus.Counter - TotalSynced prometheus.Counter - TotalErrors prometheus.Counter - MarkAndSweepTime prometheus.Histogram - SyncTime prometheus.Histogram - ErrorTime prometheus.Histogram + TotalToPush prometheus.Counter + TotalSynced prometheus.Counter + TotalCouldNotSync prometheus.Counter + TotalErrors prometheus.Counter + MarkAndSweepTime prometheus.Histogram + SyncTime prometheus.Histogram + ErrorTime prometheus.Histogram } func newMetrics() metrics { @@ -34,6 +35,12 @@ func newMetrics() metrics { Name: "total_synced", Help: "Total chunks synced successfully with valid receipts.", }), + TotalCouldNotSync: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "total_could_not_sync", + Help: "Total chunks abandoned after exhausting retries with no valid receipt (shallow receipt or no peer in the correct neighbourhood).", + }), TotalErrors: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: m.Namespace, Subsystem: subsystem, diff --git a/pkg/pusher/pusher.go b/pkg/pusher/pusher.go index 560cee6fad0..9f369176ffe 100644 --- a/pkg/pusher/pusher.go +++ b/pkg/pusher/pusher.go @@ -276,13 +276,13 @@ func (s *Service) pushDeferred(ctx context.Context, logger log.Logger, op *Op) ( loggerV1.Error(err, "pusher: failed reporting chunk") return true, err } + s.attempts.delete(op.identityAddress) case errors.Is(err, pushsync.ErrShallowReceipt): if s.shallowReceipt(op.identityAddress) { return true, err } - // Retry budget exhausted: no peer in the correct neighborhood stored the - // chunk. Report CouldNotSync rather than Synced to avoid falsely marking - // the chunk as delivered. + // budget exhausted; report CouldNotSync so the chunk isn't marked delivered + s.metrics.TotalCouldNotSync.Inc() if err := s.storer.Report(ctx, op.Chunk, storage.ChunkCouldNotSync); err != nil { loggerV1.Error(err, "pusher: failed to report sync status") return true, err @@ -292,6 +292,7 @@ func (s *Service) pushDeferred(ctx context.Context, logger log.Logger, op *Op) ( loggerV1.Error(err, "pusher: failed to report sync status") return true, err } + s.attempts.delete(op.identityAddress) default: loggerV1.Error(err, "pusher: failed PushChunkToClosest") return true, err @@ -333,13 +334,16 @@ func (s *Service) pushDirect(ctx context.Context, logger log.Logger, op *Op) err if err != nil { loggerV1.Error(err, "pusher: failed to store chunk") } + s.attempts.delete(op.identityAddress) case errors.Is(err, pushsync.ErrShallowReceipt): if s.shallowReceipt(op.identityAddress) { return err } - // out of attempts for retry, swallow error - err = nil - case err != nil: + // budget exhausted; propagate err instead of falsely reporting success + s.metrics.TotalCouldNotSync.Inc() + case err == nil: + s.attempts.delete(op.identityAddress) + default: loggerV1.Error(err, "pusher: failed PushChunkToClosest") } diff --git a/pkg/pushsync/pushsync.go b/pkg/pushsync/pushsync.go index 7e50e89cb1b..7c009dc3b29 100644 --- a/pkg/pushsync/pushsync.go +++ b/pkg/pushsync/pushsync.go @@ -299,10 +299,10 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) switch receipt, err := ps.pushToClosest(ctx, chunk, false); { case errors.Is(err, topology.ErrWantSelf): - // Only store if we are actually within our neighborhood. If the chunk - // is outside our AOR we would store it in a low bin and unreserve() - // would evict it almost immediately, leaving the chunk nowhere on the - // network while the origin believes it was delivered. + // Storing out-of-AOR puts the chunk in a low bin where unreserve() + // will evict it shortly after the origin sees a success receipt. + // rad is StorageRadius (= reserve.Radius), the doubling-aware lower + // bound; CommittedDepth would reject sister neighbourhoods. if swarm.Proximity(ps.address.Bytes(), chunkAddress.Bytes()) < rad { ps.metrics.OutOfDepthStoring.Inc() return ErrOutOfDepthStoring @@ -374,6 +374,7 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo inflight int parallelForwards = maxMultiplexForwards shallowReceiptResult *pb.Receipt + shallowReceiptPO uint8 ) if origin { @@ -424,41 +425,45 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo // For non-origin peers, if the chunk is not within depth, they may store the chunk if they are the closest peer to the chunk. fullSkip := append(skip.ChunkPeers(idAddress), ps.errSkip.ChunkPeers(idAddress)...) peer, err := ps.closestPeer(ch.Address(), origin, fullSkip) + + // ErrWantSelf on a forwarder can mean closer peers exist but are + // overdraft-skipped; wait for refresh before falling back to self. + if errors.Is(err, topology.ErrNotFound) || errors.Is(err, topology.ErrWantSelf) { + if skip.PruneExpiresAfter(idAddress, overDraftRefresh) > 0 { + ps.metrics.OverdraftRefresh.Inc() + if ps.overDraftRefreshLimiter.Allow() { + ps.logger.Debug("sleeping to refresh overdraft balance") + } + + select { + case <-time.After(overDraftRefresh): + retry() + continue + case <-ctx.Done(): + return nil, ctx.Err() + } + } + } + if errors.Is(err, topology.ErrNotFound) { - if skip.PruneExpiresAfter(idAddress, overDraftRefresh) == 0 { // no overdraft peers, we have depleted ALL peers - if inflight == 0 { - if ps.fullNode { - if cac.Valid(ch) { - go ps.unwrap(ch) - } - // If a peer already has the chunk (even at wrong depth), don't - // store locally — the chunk is closer to its neighbourhood than us. - if shallowReceiptResult != nil { - return shallowReceiptResult, ErrShallowReceipt - } - return nil, topology.ErrWantSelf + if inflight == 0 { + if ps.fullNode { + if cac.Valid(ch) { + go ps.unwrap(ch) } + // prefer a shallow peer over self-store when one exists if shallowReceiptResult != nil { return shallowReceiptResult, ErrShallowReceipt } - ps.logger.Debug("no peers left", "chunk_address", ch.Address(), "error", err) - return nil, err + return nil, topology.ErrWantSelf } - continue // there is still an inflight request, wait for it's result - } - - ps.metrics.OverdraftRefresh.Inc() - if ps.overDraftRefreshLimiter.Allow() { - ps.logger.Debug("sleeping to refresh overdraft balance") - } - - select { - case <-time.After(overDraftRefresh): - retry() - continue - case <-ctx.Done(): - return nil, ctx.Err() + if shallowReceiptResult != nil { + return shallowReceiptResult, ErrShallowReceipt + } + ps.logger.Debug("no peers left", "chunk_address", ch.Address(), "error", err) + return nil, err } + continue // there is still an inflight request, wait for it's result } if err != nil { @@ -503,16 +508,18 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo return result.receipt, nil } - if err := ps.checkReceipt(result.receipt); err == nil { + // Cache the best (highest-PO) shallow receipt and exhaust the + // budget; surface ErrShallowReceipt only when nothing better lands. + switch po, err := ps.checkReceipt(result.receipt); { + case err == nil: return result.receipt, nil - } else if errors.Is(err, ErrShallowReceipt) { - // Treat shallow receipt like any other failure: exhaust the full - // error budget and wait for any other inflight parallel pushes - // (e.g. multiplex forwards) before giving up. Only return - // ErrShallowReceipt once the entire budget is spent. - shallowReceiptResult = result.receipt - result.err = err - } else { + case errors.Is(err, ErrShallowReceipt): + if shallowReceiptResult == nil || po > shallowReceiptPO { + shallowReceiptResult = result.receipt + shallowReceiptPO = po + } + fallthrough + default: result.err = err } } @@ -589,24 +596,26 @@ func (ps *PushSync) push(parentCtx context.Context, resultChan chan<- receiptRes err = action.Apply() } -func (ps *PushSync) checkReceipt(receipt *pb.Receipt) error { +// checkReceipt validates the receipt and returns the storer-to-chunk PO so +// callers can rank shallow receipts; PO is zero on signature errors. +func (ps *PushSync) checkReceipt(receipt *pb.Receipt) (uint8, error) { addr := swarm.NewAddress(receipt.Address) publicKey, err := crypto.Recover(receipt.Signature, addr.Bytes()) if err != nil { - return fmt.Errorf("pushsync: receipt recover: %w", err) + return 0, fmt.Errorf("pushsync: receipt recover: %w", err) } peer, err := crypto.NewOverlayAddress(*publicKey, ps.networkID, receipt.Nonce) if err != nil { - return fmt.Errorf("pushsync: receipt storer address: %w", err) + return 0, fmt.Errorf("pushsync: receipt storer address: %w", err) } po := swarm.Proximity(addr.Bytes(), peer.Bytes()) r, err := ps.radius() if err != nil { - return fmt.Errorf("pushsync: storage radius: %w", err) + return po, fmt.Errorf("pushsync: storage radius: %w", err) } var tolerance uint8 @@ -618,13 +627,13 @@ func (ps *PushSync) checkReceipt(receipt *pb.Receipt) error { ps.metrics.ShallowReceiptDepth.WithLabelValues(strconv.Itoa(int(po))).Inc() ps.metrics.ShallowReceipt.Inc() ps.logger.Debug("shallow receipt", "chunk_address", addr, "peer_address", peer, "proximity_order", po, "peer_radius", receipt.StorageRadius, "self_radius", r) - return ErrShallowReceipt + return po, ErrShallowReceipt } ps.metrics.ReceiptDepth.WithLabelValues(strconv.Itoa(int(po))).Inc() ps.logger.Debug("chunk pushed", "chunk_address", addr, "peer_address", peer, "proximity_order", po) - return nil + return po, nil } func (ps *PushSync) pushChunkToPeer(ctx context.Context, peer swarm.Address, ch swarm.Chunk) (receipt *pb.Receipt, err error) { diff --git a/pkg/storer/internal/upload/uploadstore.go b/pkg/storer/internal/upload/uploadstore.go index 51e99fa16d3..38d53d97460 100644 --- a/pkg/storer/internal/upload/uploadstore.go +++ b/pkg/storer/internal/upload/uploadstore.go @@ -611,6 +611,9 @@ func Report(ctx context.Context, st transaction.Store, chunk swarm.Chunk, state ti.Synced++ case storage.ChunkSynced: ti.Synced++ + case storage.ChunkCouldNotSync: + // no Synced bump: failure is observable via Split-Synced and the + // pusher's total_could_not_sync metric } err = indexStore.Put(ti) From 1e13f0a5ea32658ad8cae7a3961c3bf5a94a2ec6 Mon Sep 17 00:00:00 2001 From: Ljubisa Gacevic Date: Thu, 30 Apr 2026 21:57:34 +0200 Subject: [PATCH 9/9] fix(pushsync): strict shallow receipt check, no tolerance slop --- pkg/node/node.go | 3 +- pkg/pushsync/pushsync.go | 58 ++++++++---------- pkg/pushsync/pushsync_integration_test.go | 2 - pkg/pushsync/pushsync_test.go | 75 ++--------------------- 4 files changed, 31 insertions(+), 107 deletions(-) diff --git a/pkg/node/node.go b/pkg/node/node.go index 35b78d4dc05..d0f6d050cc2 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -291,7 +291,6 @@ func NewBee( if o.ReserveCapacityDoubling < 0 || o.ReserveCapacityDoubling > maxAllowedDoubling { return nil, fmt.Errorf("config reserve capacity doubling has to be between default: 0 and maximum: %d", maxAllowedDoubling) } - shallowReceiptTolerance := maxAllowedDoubling - o.ReserveCapacityDoubling reserveCapacity := (1 << o.ReserveCapacityDoubling) * storer.DefaultReserveCapacity @@ -1047,7 +1046,7 @@ func NewBee( } } - pushSyncProtocol := pushsync.New(swarmAddress, networkID, nonce, p2ps, localStore, waitNetworkRFunc, kad, o.FullNodeMode && !o.BootnodeMode, pssService.TryUnwrap, gsocService.Handle, validStamp, logger, acc, pricer, signer, tracer, detector, uint8(shallowReceiptTolerance)) + pushSyncProtocol := pushsync.New(swarmAddress, networkID, nonce, p2ps, localStore, waitNetworkRFunc, kad, o.FullNodeMode && !o.BootnodeMode, pssService.TryUnwrap, gsocService.Handle, validStamp, logger, acc, pricer, signer, tracer, detector) b.pushSyncCloser = pushSyncProtocol // set the pushSyncer in the PSS diff --git a/pkg/pushsync/pushsync.go b/pkg/pushsync/pushsync.go index 7c009dc3b29..8e4610bb764 100644 --- a/pkg/pushsync/pushsync.go +++ b/pkg/pushsync/pushsync.go @@ -79,27 +79,25 @@ type Storer interface { } type PushSync struct { - address swarm.Address - networkID uint64 - radius func() (uint8, error) - nonce []byte - streamer p2p.StreamerDisconnecter - store Storer - topologyDriver topology.Driver - unwrap func(swarm.Chunk) - gsocHandler func(*soc.SOC) - logger log.Logger - accounting accounting.Interface - pricer pricer.Interface - metrics metrics - tracer *tracing.Tracer - validStamp postage.ValidStampFn - signer crypto.Signer - fullNode bool - errSkip *skippeers.List - stabilizer stabilization.Subscriber - - shallowReceiptTolerance uint8 + address swarm.Address + networkID uint64 + radius func() (uint8, error) + nonce []byte + streamer p2p.StreamerDisconnecter + store Storer + topologyDriver topology.Driver + unwrap func(swarm.Chunk) + gsocHandler func(*soc.SOC) + logger log.Logger + accounting accounting.Interface + pricer pricer.Interface + metrics metrics + tracer *tracing.Tracer + validStamp postage.ValidStampFn + signer crypto.Signer + fullNode bool + errSkip *skippeers.List + stabilizer stabilization.Subscriber overDraftRefreshLimiter *rate.Limiter } @@ -128,7 +126,6 @@ func New( signer crypto.Signer, tracer *tracing.Tracer, stabilizer stabilization.Subscriber, - shallowReceiptTolerance uint8, ) *PushSync { ps := &PushSync{ address: address, @@ -149,7 +146,6 @@ func New( signer: signer, errSkip: skippeers.NewList(time.Minute), stabilizer: stabilizer, - shallowReceiptTolerance: shallowReceiptTolerance, overDraftRefreshLimiter: rate.NewLimiter(rate.Every(time.Second), 1), } @@ -299,10 +295,8 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) switch receipt, err := ps.pushToClosest(ctx, chunk, false); { case errors.Is(err, topology.ErrWantSelf): - // Storing out-of-AOR puts the chunk in a low bin where unreserve() - // will evict it shortly after the origin sees a success receipt. - // rad is StorageRadius (= reserve.Radius), the doubling-aware lower - // bound; CommittedDepth would reject sister neighbourhoods. + // Out-of-AOR chunks are unreachable via retrieval even when not + // evicted; let the origin try the next peer instead. if swarm.Proximity(ps.address.Bytes(), chunkAddress.Bytes()) < rad { ps.metrics.OutOfDepthStoring.Inc() return ErrOutOfDepthStoring @@ -597,7 +591,8 @@ func (ps *PushSync) push(parentCtx context.Context, resultChan chan<- receiptRes } // checkReceipt validates the receipt and returns the storer-to-chunk PO so -// callers can rank shallow receipts; PO is zero on signature errors. +// callers can rank shallow receipts; PO is zero on signature errors. Strict +// po >= rad: a chunk is not synced until it lands within the AOR. func (ps *PushSync) checkReceipt(receipt *pb.Receipt) (uint8, error) { addr := swarm.NewAddress(receipt.Address) @@ -618,12 +613,7 @@ func (ps *PushSync) checkReceipt(receipt *pb.Receipt) (uint8, error) { return po, fmt.Errorf("pushsync: storage radius: %w", err) } - var tolerance uint8 - if r >= ps.shallowReceiptTolerance { // check for underflow of uint8 - tolerance = r - ps.shallowReceiptTolerance - } - - if po < tolerance || uint32(po) < receipt.StorageRadius { + if po < r || uint32(po) < receipt.StorageRadius { ps.metrics.ShallowReceiptDepth.WithLabelValues(strconv.Itoa(int(po))).Inc() ps.metrics.ShallowReceipt.Inc() ps.logger.Debug("shallow receipt", "chunk_address", addr, "peer_address", peer, "proximity_order", po, "peer_radius", receipt.StorageRadius, "self_radius", r) diff --git a/pkg/pushsync/pushsync_integration_test.go b/pkg/pushsync/pushsync_integration_test.go index e9b2fa30f11..001033111b7 100644 --- a/pkg/pushsync/pushsync_integration_test.go +++ b/pkg/pushsync/pushsync_integration_test.go @@ -70,7 +70,6 @@ func TestPushSyncIntegration(t *testing.T) { serverSigner, nil, stabilizationmock.NewSubscriber(true), - 0, ) t.Cleanup(func() { serverPushSync.Close() }) @@ -106,7 +105,6 @@ func TestPushSyncIntegration(t *testing.T) { clientSigner, nil, stabilizationmock.NewSubscriber(true), - 0, ) t.Cleanup(func() { clientPushSync.Close() }) diff --git a/pkg/pushsync/pushsync_test.go b/pkg/pushsync/pushsync_test.go index b4e47356092..09a4b6c329f 100644 --- a/pkg/pushsync/pushsync_test.go +++ b/pkg/pushsync/pushsync_test.go @@ -237,19 +237,18 @@ func TestShallowReceipt(t *testing.T) { storerRadius := 1 chunkProximity := 0 pivotRadius := 31 - pivotTolerance := uint8(0) pivotNode := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000") chunk := testingc.GenerateValidRandomChunkAt(t, closestPeer, chunkProximity) // storer: proximity > storerRadius → within AOR → stores and sends receipt - psPeer, _ := createPushSyncNodeWithRadius(t, closestPeer, defaultPrices, nil, nil, signer, uint8(storerRadius), 0, mock.WithClosestPeerErr(topology.ErrWantSelf)) + psPeer, _ := createPushSyncNodeWithRadius(t, closestPeer, defaultPrices, nil, nil, signer, uint8(storerRadius), mock.WithClosestPeerErr(topology.ErrWantSelf)) recorder := streamtest.New(streamtest.WithProtocols(psPeer.Protocol()), streamtest.WithBaseAddr(pivotNode)) // pivot: stricter radius → origin considers the receipt shallow - psPivot, _ := createPushSyncNodeWithRadius(t, pivotNode, defaultPrices, recorder, nil, nil, uint8(pivotRadius), pivotTolerance, mock.WithClosestPeer(closestPeer)) + psPivot, _ := createPushSyncNodeWithRadius(t, pivotNode, defaultPrices, recorder, nil, nil, uint8(pivotRadius), mock.WithClosestPeer(closestPeer)) receipt, err := psPivot.PushChunkToClosest(context.Background(), chunk) if !errors.Is(err, pushsync.ErrShallowReceipt) { @@ -283,11 +282,11 @@ func TestOutOfDepthStoring(t *testing.T) { pivotNode := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000") closestPeer := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000") - psPeer, _ := createPushSyncNodeWithRadius(t, closestPeer, defaultPrices, nil, nil, defaultSigner(chunk), highPO, 0, mock.WithClosestPeerErr(topology.ErrWantSelf)) + psPeer, _ := createPushSyncNodeWithRadius(t, closestPeer, defaultPrices, nil, nil, defaultSigner(chunk), highPO, mock.WithClosestPeerErr(topology.ErrWantSelf)) recorder := streamtest.New(streamtest.WithProtocols(psPeer.Protocol()), streamtest.WithBaseAddr(pivotNode)) - psPivot, _ := createPushSyncNodeWithRadius(t, pivotNode, defaultPrices, recorder, nil, defaultSigner(chunk), highPO, 0, mock.WithClosestPeer(closestPeer)) + psPivot, _ := createPushSyncNodeWithRadius(t, pivotNode, defaultPrices, recorder, nil, defaultSigner(chunk), highPO, mock.WithClosestPeer(closestPeer)) _, err := psPivot.PushChunkToClosest(context.Background(), chunk) @@ -298,67 +297,6 @@ func TestOutOfDepthStoring(t *testing.T) { } } -// TestShallowReceiptTolerance sends back a shallow receipt but because of the tolerance level, the origin node accepts the receipts. -func TestShallowReceiptTolerance(t *testing.T) { - t.Parallel() - - key, err := crypto.GenerateSecp256k1Key() - if err != nil { - t.Fatal(err) - } - - signer := crypto.NewDefaultSigner(key) - - pubKey, err := signer.PublicKey() - if err != nil { - t.Fatal(err) - } - - closestPeer, err := crypto.NewOverlayAddress(*pubKey, 1, blockHash.Bytes()) - if err != nil { - t.Fatal(err) - } - - storerRadius := 2 - chunkProximity := 2 - - pivotRadius := 4 - pivotTolerance := uint8(2) - - // create a pivot node and a mocked closest node - pivotNode := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000") - - chunk := testingc.GenerateValidRandomChunkAt(t, closestPeer, chunkProximity) - - // peer is the node responding to the chunk receipt message - // mock should return ErrWantSelf since there's no one to forward to - psPeer, _ := createPushSyncNodeWithRadius(t, closestPeer, defaultPrices, nil, nil, signer, uint8(storerRadius), 0, mock.WithClosestPeerErr(topology.ErrWantSelf)) - - recorder := streamtest.New(streamtest.WithProtocols(psPeer.Protocol()), streamtest.WithBaseAddr(pivotNode)) - - // pivot node needs the streamer since the chunk is intercepted by - // the chunk worker, then gets sent by opening a new stream - psPivot, _ := createPushSyncNodeWithRadius(t, pivotNode, defaultPrices, recorder, nil, nil, uint8(pivotRadius), pivotTolerance, mock.WithClosestPeer(closestPeer)) - - // Trigger the sending of chunk to the closest node - receipt, err := psPivot.PushChunkToClosest(context.Background(), chunk) - if !chunk.Address().Equal(receipt.Address) { - t.Fatal("invalid receipt") - } - if err != nil { - t.Fatalf("got %v, want %v", err, nil) - } - if got := swarm.Proximity(receipt.Address.Bytes(), closestPeer.Bytes()); got < uint8(chunkProximity) { - t.Fatalf("got %v, want at least %v", got, chunkProximity) - } - - // this intercepts the outgoing delivery message - waitOnRecordAndTest(t, closestPeer, recorder, chunk.Address(), chunk.Data()) - - // this intercepts the incoming receipt message - waitOnRecordAndTest(t, closestPeer, recorder, chunk.Address(), nil) -} - // TestPushChunkToClosest tests the sending of chunk to closest peer from the origination source perspective. // it also checks whether the tags are incremented properly if they are present func TestPushChunkToClosest(t *testing.T) { @@ -1060,7 +998,6 @@ func createPushSyncNodeWithRadius( unwrap func(swarm.Chunk), signer crypto.Signer, radius uint8, - shallowReceiptTolerance uint8, mockOpts ...mock.Option, ) (*pushsync.PushSync, *testStorer) { t.Helper() @@ -1083,7 +1020,7 @@ func createPushSyncNodeWithRadius( radiusFunc := func() (uint8, error) { return radius, nil } - ps := pushsync.New(addr, 1, blockHash.Bytes(), recorderDisconnecter, storer, radiusFunc, mockTopology, true, unwrap, func(*soc.SOC) {}, validStamp, log.Noop, accountingmock.NewAccounting(), mockPricer, signer, nil, stabilmock.NewSubscriber(true), shallowReceiptTolerance) + ps := pushsync.New(addr, 1, blockHash.Bytes(), recorderDisconnecter, storer, radiusFunc, mockTopology, true, unwrap, func(*soc.SOC) {}, validStamp, log.Noop, accountingmock.NewAccounting(), mockPricer, signer, nil, stabilmock.NewSubscriber(true)) t.Cleanup(func() { ps.Close() }) return ps, storer @@ -1124,7 +1061,7 @@ func createPushSyncNodeWithAccounting( radiusFunc := func() (uint8, error) { return 0, nil } - ps := pushsync.New(addr, 1, blockHash.Bytes(), recorderDisconnecter, storer, radiusFunc, mockTopology, true, unwrap, gsocListener, validStamp, logger, acct, mockPricer, signer, nil, stabilmock.NewSubscriber(true), 0) + ps := pushsync.New(addr, 1, blockHash.Bytes(), recorderDisconnecter, storer, radiusFunc, mockTopology, true, unwrap, gsocListener, validStamp, logger, acct, mockPricer, signer, nil, stabilmock.NewSubscriber(true)) t.Cleanup(func() { ps.Close() }) return ps, storer