From 1c2ddef162481aebd1d9e18d7649ad79d6363602 Mon Sep 17 00:00:00 2001 From: sbackend Date: Fri, 17 Apr 2026 11:21:06 +0200 Subject: [PATCH 1/4] fix: add metrics for investigation --- pkg/pushsync/metrics.go | 20 ++++++++++++++++++++ pkg/pushsync/pushsync.go | 27 ++++++++++++++++++++++----- 2 files changed, 42 insertions(+), 5 deletions(-) diff --git a/pkg/pushsync/metrics.go b/pkg/pushsync/metrics.go index 758a6f072a5..323aee5de98 100644 --- a/pkg/pushsync/metrics.go +++ b/pkg/pushsync/metrics.go @@ -29,6 +29,8 @@ type metrics struct { ShallowReceiptDepth *prometheus.CounterVec ShallowReceipt prometheus.Counter OverdraftRefresh prometheus.Counter + WantSelf *prometheus.CounterVec + StoreReason *prometheus.CounterVec } func newMetrics() metrics { @@ -153,6 +155,24 @@ func newMetrics() metrics { Name: "overdraft_refresh", Help: "Total number of times peers were skipped due to overdraft, requiring a wait to refresh balance.", }), + WantSelf: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "want_self_total", + Help: "Total number of times pushsync concluded that the local node should store the chunk.", + }, + []string{"cause", "origin"}, + ), + StoreReason: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "store_reason_total", + Help: "Total number of times chunks were stored locally by pushsync, partitioned by reason.", + }, + []string{"reason"}, + ), } } diff --git a/pkg/pushsync/pushsync.go b/pkg/pushsync/pushsync.go index 05c4aef3d41..7a90b8b0b5b 100644 --- a/pkg/pushsync/pushsync.go +++ b/pkg/pushsync/pushsync.go @@ -56,6 +56,13 @@ const ( maxPushErrors = 32 ) +const ( + wantSelfCauseClosestIsSelf = "closest_is_self" + wantSelfCauseNoPeerLeft = "no_peer_left" + storeReasonWithinAOR = "within_aor" + storeReasonWantSelf = "want_self" +) + var ( ErrNoPush = errors.New("could not push chunk") ErrOutOfDepthStoring = errors.New("storing outside of the neighborhood") @@ -293,13 +300,15 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) } if ps.topologyDriver.IsReachable() && swarm.Proximity(ps.address.Bytes(), chunkAddress.Bytes()) >= rad { - stored, reason = true, "is within AOR" + stored, reason = true, storeReasonWithinAOR + ps.metrics.StoreReason.WithLabelValues(reason).Inc() return store(ctx) } switch receipt, err := ps.pushToClosest(ctx, chunk, false); { case errors.Is(err, topology.ErrWantSelf): - stored, reason = true, "want self" + stored, reason = true, storeReasonWantSelf + ps.metrics.StoreReason.WithLabelValues(reason).Inc() return store(ctx) case err == nil: ps.metrics.Forwarder.Inc() @@ -422,6 +431,7 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo if cac.Valid(ch) { go ps.unwrap(ch) } + ps.incWantSelf(wantSelfCauseNoPeerLeft, origin) return nil, topology.ErrWantSelf } ps.logger.Debug("no peers left", "chunk_address", ch.Address(), "error", err) @@ -513,16 +523,23 @@ func (ps *PushSync) closestPeer(chunkAddress swarm.Address, origin bool, skipLis peer, err := ps.topologyDriver.ClosestPeer(chunkAddress, includeSelf, topology.Select{Reachable: true, Healthy: true}, skipList...) if errors.Is(err, topology.ErrNotFound) { - peer, err := ps.topologyDriver.ClosestPeer(chunkAddress, includeSelf, topology.Select{Reachable: true}, skipList...) + peer, err = ps.topologyDriver.ClosestPeer(chunkAddress, includeSelf, topology.Select{Reachable: true}, skipList...) if errors.Is(err, topology.ErrNotFound) { - return ps.topologyDriver.ClosestPeer(chunkAddress, includeSelf, topology.Select{}, skipList...) + peer, err = ps.topologyDriver.ClosestPeer(chunkAddress, includeSelf, topology.Select{}, skipList...) } - return peer, err + } + + if errors.Is(err, topology.ErrWantSelf) { + ps.incWantSelf(wantSelfCauseClosestIsSelf, origin) } return peer, err } +func (ps *PushSync) incWantSelf(cause string, origin bool) { + ps.metrics.WantSelf.WithLabelValues(cause, strconv.FormatBool(origin)).Inc() +} + func (ps *PushSync) push(parentCtx context.Context, resultChan chan<- receiptResult, peer swarm.Address, ch swarm.Chunk, action accounting.Action) { // here we use a background timeout context because we do not want another push attempt to cancel this one ctx, cancel := context.WithTimeout(context.Background(), defaultTTL) From f809d00600eb5f9fda8f86acab4dd1a864a4d437 Mon Sep 17 00:00:00 2001 From: sbackend Date: Fri, 17 Apr 2026 12:00:16 +0200 Subject: [PATCH 2/4] fix: update contract for light-testnet --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 6885b1805d0..cdd45e7693e 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/ethereum/go-ethereum v1.16.9 github.com/ethersphere/batch-archive v0.0.6 github.com/ethersphere/go-price-oracle-abi v0.6.9 - github.com/ethersphere/go-storage-incentives-abi v0.9.4 + github.com/ethersphere/go-storage-incentives-abi v0.9.3-rc4 github.com/ethersphere/go-sw3-abi v0.6.9 github.com/ethersphere/langos v1.0.0 github.com/go-playground/validator/v10 v10.19.0 diff --git a/go.sum b/go.sum index d0fc255af2a..4b7ae9ac480 100644 --- a/go.sum +++ b/go.sum @@ -254,8 +254,8 @@ github.com/ethersphere/batch-archive v0.0.6 h1:Nt9mundj8CXT42qgGdq1sqKIVOk4KkKC4 github.com/ethersphere/batch-archive v0.0.6/go.mod h1:41BPb192NoK9CYjNB8BAE1J2MtiI/5aq0Wtas5O7A7Q= github.com/ethersphere/go-price-oracle-abi v0.6.9 h1:bseen6he3PZv5GHOm+KD6s4awaFmVSD9LFx+HpB6rCU= github.com/ethersphere/go-price-oracle-abi v0.6.9/go.mod h1:sI/Qj4/zJ23/b1enzwMMv0/hLTpPNVNacEwCWjo6yBk= -github.com/ethersphere/go-storage-incentives-abi v0.9.4 h1:mSIWXQXg5OQmH10QvXMV5w0vbSibFMaRlBL37gPLTM0= -github.com/ethersphere/go-storage-incentives-abi v0.9.4/go.mod h1:SXvJVtM4sEsaSKD0jc1ClpDLw8ErPoROZDme4Wrc/Nc= +github.com/ethersphere/go-storage-incentives-abi v0.9.3-rc4 h1:YK9FpiQz29ctU5V46CuwMt+4X5Xn8FTBwy6E2v/ix8s= +github.com/ethersphere/go-storage-incentives-abi v0.9.3-rc4/go.mod h1:SXvJVtM4sEsaSKD0jc1ClpDLw8ErPoROZDme4Wrc/Nc= github.com/ethersphere/go-sw3-abi v0.6.9 h1:TnWLnYkWE5UvC17mQBdUmdkzhPhO8GcqvWy4wvd1QJQ= github.com/ethersphere/go-sw3-abi v0.6.9/go.mod h1:BmpsvJ8idQZdYEtWnvxA8POYQ8Rl/NhyCdF0zLMOOJU= github.com/ethersphere/langos v1.0.0 h1:NBtNKzXTTRSue95uOlzPN4py7Aofs0xWPzyj4AI1Vcc= From 3e8e3d66e47061d0e095b093fdd665bbbfd31ef5 Mon Sep 17 00:00:00 2001 From: sbackend Date: Fri, 17 Apr 2026 21:52:57 +0200 Subject: [PATCH 3/4] fix: dirty fix for unmarshal --- pkg/bigint/bigint.go | 27 +++++++++++++++++++-------- pkg/bigint/bigint_test.go | 29 +++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 8 deletions(-) diff --git a/pkg/bigint/bigint.go b/pkg/bigint/bigint.go index ce03a464600..c83cfffed2d 100644 --- a/pkg/bigint/bigint.go +++ b/pkg/bigint/bigint.go @@ -23,18 +23,25 @@ func (i *BigInt) MarshalJSON() ([]byte, error) { } func (i *BigInt) UnmarshalJSON(b []byte) error { - var val string - err := json.Unmarshal(b, &val) - if err != nil { - return err - } - if i.Int == nil { i.Int = new(big.Int) } - i.SetString(val, 10) + var val string + if err := json.Unmarshal(b, &val); err == nil { + if _, ok := i.SetString(val, 10); !ok { + return fmt.Errorf("bigint: invalid decimal string %q", val) + } + return nil + } + var num json.Number + if err := json.Unmarshal(b, &num); err != nil { + return err + } + if _, ok := i.SetString(num.String(), 10); !ok { + return fmt.Errorf("bigint: invalid json number %q", num.String()) + } return nil } @@ -58,5 +65,9 @@ func (i *BigInt) UnmarshalBinary(data []byte) error { return fmt.Errorf("bigint: UnmarshalBinary called with empty data") } i.Int = new(big.Int) - return i.GobDecode(data) + if err := i.GobDecode(data); err != nil { + // fallback: try JSON + return i.UnmarshalJSON(data) + } + return nil } diff --git a/pkg/bigint/bigint_test.go b/pkg/bigint/bigint_test.go index 1b6cdf824e3..8d63a50497b 100644 --- a/pkg/bigint/bigint_test.go +++ b/pkg/bigint/bigint_test.go @@ -117,3 +117,32 @@ func TestMarshaling(t *testing.T) { t.Error("Wrongly marshaled data") } } + +func TestUnmarshalJSONAcceptsStringAndNumber(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + input string + want string + }{ + {name: "string", input: `"123456789"`, want: "123456789"}, + {name: "number", input: `123456789`, want: "123456789"}, + {name: "negative number", input: `-42`, want: "-42"}, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + var got bigint.BigInt + if err := got.UnmarshalJSON([]byte(tc.input)); err != nil { + t.Fatalf("UnmarshalJSON: %v", err) + } + + if got.String() != tc.want { + t.Fatalf("got %s, want %s", got.String(), tc.want) + } + }) + } +} From 83773d77d05ea9b947b6ee3cd4f0284cdd593fb5 Mon Sep 17 00:00:00 2001 From: sbackend Date: Mon, 20 Apr 2026 15:30:55 +0200 Subject: [PATCH 4/4] fix: add metrics to define anomaly node_wants_itself vs normal node_wants_itself --- go.mod | 2 +- go.sum | 4 ++-- pkg/pushsync/metrics.go | 7 +++++++ pkg/pushsync/pushsync.go | 3 +++ 4 files changed, 13 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index cdd45e7693e..6885b1805d0 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/ethereum/go-ethereum v1.16.9 github.com/ethersphere/batch-archive v0.0.6 github.com/ethersphere/go-price-oracle-abi v0.6.9 - github.com/ethersphere/go-storage-incentives-abi v0.9.3-rc4 + github.com/ethersphere/go-storage-incentives-abi v0.9.4 github.com/ethersphere/go-sw3-abi v0.6.9 github.com/ethersphere/langos v1.0.0 github.com/go-playground/validator/v10 v10.19.0 diff --git a/go.sum b/go.sum index 4b7ae9ac480..d0fc255af2a 100644 --- a/go.sum +++ b/go.sum @@ -254,8 +254,8 @@ github.com/ethersphere/batch-archive v0.0.6 h1:Nt9mundj8CXT42qgGdq1sqKIVOk4KkKC4 github.com/ethersphere/batch-archive v0.0.6/go.mod h1:41BPb192NoK9CYjNB8BAE1J2MtiI/5aq0Wtas5O7A7Q= github.com/ethersphere/go-price-oracle-abi v0.6.9 h1:bseen6he3PZv5GHOm+KD6s4awaFmVSD9LFx+HpB6rCU= github.com/ethersphere/go-price-oracle-abi v0.6.9/go.mod h1:sI/Qj4/zJ23/b1enzwMMv0/hLTpPNVNacEwCWjo6yBk= -github.com/ethersphere/go-storage-incentives-abi v0.9.3-rc4 h1:YK9FpiQz29ctU5V46CuwMt+4X5Xn8FTBwy6E2v/ix8s= -github.com/ethersphere/go-storage-incentives-abi v0.9.3-rc4/go.mod h1:SXvJVtM4sEsaSKD0jc1ClpDLw8ErPoROZDme4Wrc/Nc= +github.com/ethersphere/go-storage-incentives-abi v0.9.4 h1:mSIWXQXg5OQmH10QvXMV5w0vbSibFMaRlBL37gPLTM0= +github.com/ethersphere/go-storage-incentives-abi v0.9.4/go.mod h1:SXvJVtM4sEsaSKD0jc1ClpDLw8ErPoROZDme4Wrc/Nc= github.com/ethersphere/go-sw3-abi v0.6.9 h1:TnWLnYkWE5UvC17mQBdUmdkzhPhO8GcqvWy4wvd1QJQ= github.com/ethersphere/go-sw3-abi v0.6.9/go.mod h1:BmpsvJ8idQZdYEtWnvxA8POYQ8Rl/NhyCdF0zLMOOJU= github.com/ethersphere/langos v1.0.0 h1:NBtNKzXTTRSue95uOlzPN4py7Aofs0xWPzyj4AI1Vcc= diff --git a/pkg/pushsync/metrics.go b/pkg/pushsync/metrics.go index 323aee5de98..d8fba31a29d 100644 --- a/pkg/pushsync/metrics.go +++ b/pkg/pushsync/metrics.go @@ -31,6 +31,7 @@ type metrics struct { OverdraftRefresh prometheus.Counter WantSelf *prometheus.CounterVec StoreReason *prometheus.CounterVec + WantSelfOutOfDepth prometheus.Counter } func newMetrics() metrics { @@ -173,6 +174,12 @@ func newMetrics() metrics { }, []string{"reason"}, ), + WantSelfOutOfDepth: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "want_self_out_of_depth_total", + Help: "Total number of times a chunk was stored via ErrWantSelf with proximity strictly below the storage radius.", + }), } } diff --git a/pkg/pushsync/pushsync.go b/pkg/pushsync/pushsync.go index 7a90b8b0b5b..dd05f3a1552 100644 --- a/pkg/pushsync/pushsync.go +++ b/pkg/pushsync/pushsync.go @@ -309,6 +309,9 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) case errors.Is(err, topology.ErrWantSelf): stored, reason = true, storeReasonWantSelf ps.metrics.StoreReason.WithLabelValues(reason).Inc() + if swarm.Proximity(ps.address.Bytes(), chunkAddress.Bytes()) < rad { + ps.metrics.WantSelfOutOfDepth.Inc() + } return store(ctx) case err == nil: ps.metrics.Forwarder.Inc()