Skip to content

Commit 7501da7

Browse files
committed
fix(v2): make segment upload hedging limiter respect context
1 parent 3b51386 commit 7501da7

File tree

6 files changed

+117
-139
lines changed

6 files changed

+117
-139
lines changed

pkg/experiment/ingester/segment.go

Lines changed: 36 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/opentracing/opentracing-go"
2323
"github.com/thanos-io/objstore"
2424
"golang.org/x/exp/maps"
25+
"golang.org/x/time/rate"
2526

2627
profilev1 "github.com/grafana/pyroscope/api/gen/proto/go/google/v1"
2728
metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
@@ -51,9 +52,9 @@ type segmentsWriter struct {
5152
ctx context.Context
5253
cancel context.CancelFunc
5354

54-
metrics *segmentMetrics
55-
headMetrics *memdb.HeadMetrics
56-
retryLimiter *retry.RateLimiter
55+
metrics *segmentMetrics
56+
headMetrics *memdb.HeadMetrics
57+
hedgedUploadLimiter *rate.Limiter
5758
}
5859

5960
type shard struct {
@@ -137,7 +138,7 @@ func newSegmentWriter(l log.Logger, metrics *segmentMetrics, hm *memdb.HeadMetri
137138
shards: make(map[shardKey]*shard),
138139
metastore: metastoreClient,
139140
}
140-
sw.retryLimiter = retry.NewRateLimiter(sw.config.UploadHedgeRateMax, int(sw.config.UploadHedgeRateBurst))
141+
sw.hedgedUploadLimiter = rate.NewLimiter(rate.Limit(sw.config.UploadHedgeRateMax), int(sw.config.UploadHedgeRateBurst))
141142
sw.ctx, sw.cancel = context.WithCancel(context.Background())
142143
flushWorkers := runtime.GOMAXPROCS(-1)
143144
if config.FlushConcurrency > 0 {
@@ -605,39 +606,38 @@ func (sw *segmentsWriter) uploadBlock(ctx context.Context, blockData []byte, met
605606
// if the request is not completed within a certain time, we trigger
606607
// a second upload attempt. Upload errors are retried explicitly and
607608
// are included in the call duration.
608-
uploadWithRetry := func(ctx context.Context, hedge bool) (any, error) {
609-
retryConfig := backoff.Config{
610-
MinBackoff: sw.config.UploadMinBackoff,
611-
MaxBackoff: sw.config.UploadMaxBackoff,
612-
MaxRetries: sw.config.UploadMaxRetries,
613-
}
614-
var attemptErr error
615-
if hedge {
616-
// Hedged requests are not retried.
617-
retryConfig.MaxRetries = 1
618-
attemptStart := time.Now()
619-
defer func() {
620-
sw.metrics.segmentHedgedUploadDuration.
621-
WithLabelValues(statusLabelValue(attemptErr)).
622-
Observe(time.Since(attemptStart).Seconds())
623-
}()
624-
}
625-
// Retry on all errors.
626-
retries := backoff.New(ctx, retryConfig)
627-
for retries.Ongoing() && ctx.Err() == nil {
628-
if attemptErr = sw.bucket.Upload(ctx, path, bytes.NewReader(blockData)); attemptErr == nil {
629-
break
630-
}
631-
retries.Wait()
632-
}
633-
return nil, attemptErr
634-
}
635-
636609
hedgedUpload := retry.Hedged[any]{
637-
Call: uploadWithRetry,
638-
Trigger: time.After(sw.config.UploadHedgeAfter),
639-
Throttler: sw.retryLimiter,
640-
FailFast: false,
610+
Trigger: time.After(sw.config.UploadHedgeAfter),
611+
Call: func(ctx context.Context, hedge bool) (any, error) {
612+
retryConfig := backoff.Config{
613+
MinBackoff: sw.config.UploadMinBackoff,
614+
MaxBackoff: sw.config.UploadMaxBackoff,
615+
MaxRetries: sw.config.UploadMaxRetries,
616+
}
617+
var attemptErr error
618+
if hedge {
619+
if limitErr := sw.hedgedUploadLimiter.Wait(ctx); limitErr != nil {
620+
return nil, limitErr
621+
}
622+
// Hedged requests are not retried.
623+
retryConfig.MaxRetries = 0
624+
attemptStart := time.Now()
625+
defer func() {
626+
sw.metrics.segmentHedgedUploadDuration.
627+
WithLabelValues(statusLabelValue(attemptErr)).
628+
Observe(time.Since(attemptStart).Seconds())
629+
}()
630+
}
631+
// Retry on all errors.
632+
retries := backoff.New(ctx, retryConfig)
633+
for retries.Ongoing() {
634+
if attemptErr = sw.bucket.Upload(ctx, path, bytes.NewReader(blockData)); attemptErr == nil {
635+
break
636+
}
637+
retries.Wait()
638+
}
639+
return nil, attemptErr
640+
},
641641
}
642642

643643
if _, err = hedgedUpload.Do(ctx); err != nil {

pkg/experiment/ingester/segment_test.go

Lines changed: 79 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package ingester
33
import (
44
"bytes"
55
"context"
6+
"errors"
7+
"flag"
68
"fmt"
79
"io"
810
"math/rand"
@@ -14,21 +16,21 @@ import (
1416
"testing"
1517
"time"
1618

17-
"github.com/grafana/pyroscope/pkg/experiment/block/metadata"
18-
1919
gprofile "github.com/google/pprof/profile"
2020
"github.com/grafana/dskit/flagext"
2121
model2 "github.com/prometheus/common/model"
2222
"github.com/stretchr/testify/assert"
2323
"github.com/stretchr/testify/mock"
2424
"github.com/stretchr/testify/require"
25+
"golang.org/x/time/rate"
2526

2627
profilev1 "github.com/grafana/pyroscope/api/gen/proto/go/google/v1"
2728
ingesterv1 "github.com/grafana/pyroscope/api/gen/proto/go/ingester/v1"
2829
"github.com/grafana/pyroscope/api/gen/proto/go/ingester/v1/ingesterv1connect"
2930
metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
3031
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
3132
"github.com/grafana/pyroscope/pkg/experiment/block"
33+
"github.com/grafana/pyroscope/pkg/experiment/block/metadata"
3234
"github.com/grafana/pyroscope/pkg/experiment/ingester/memdb"
3335
testutil2 "github.com/grafana/pyroscope/pkg/experiment/ingester/memdb/testutil"
3436
"github.com/grafana/pyroscope/pkg/experiment/metastore"
@@ -1004,3 +1006,78 @@ func hasUnsymbolizedLabel(t *testing.T, block *metastorev1.BlockMeta) bool {
10041006
}
10051007
return false
10061008
}
1009+
1010+
type mockBucket struct {
1011+
*memory.InMemBucket
1012+
uploads atomic.Int64
1013+
}
1014+
1015+
func (m *mockBucket) Upload(ctx context.Context, _ string, _ io.Reader) error {
1016+
m.uploads.Add(1)
1017+
<-ctx.Done()
1018+
return ctx.Err()
1019+
}
1020+
1021+
func TestUploadBlock_HedgedUploadLimiter(t *testing.T) {
1022+
t.Run("available", func(t *testing.T) {
1023+
t.Parallel()
1024+
1025+
bucket := &mockBucket{InMemBucket: memory.NewInMemBucket()}
1026+
logger := test.NewTestingLogger(t)
1027+
1028+
var config Config
1029+
config.RegisterFlags(flag.NewFlagSet("test", flag.PanicOnError))
1030+
config.UploadTimeout = time.Millisecond * 250
1031+
config.UploadHedgeAfter = time.Millisecond
1032+
config.UploadHedgeRateMax = 10
1033+
config.UploadHedgeRateBurst = 10
1034+
config.UploadMaxRetries = 0
1035+
1036+
sw := &segmentsWriter{
1037+
config: config,
1038+
logger: logger,
1039+
bucket: bucket,
1040+
hedgedUploadLimiter: rate.NewLimiter(rate.Limit(config.UploadHedgeRateMax), int(config.UploadHedgeRateBurst)),
1041+
metrics: newSegmentMetrics(nil),
1042+
}
1043+
1044+
// To avoid flakiness: there are no guarantees that the
1045+
// hedged request is triggered before the upload timeout
1046+
// expiration.
1047+
hedgedRequestTriggered := func() bool {
1048+
bucket.uploads.Store(0)
1049+
err := sw.uploadBlock(context.Background(), nil, new(metastorev1.BlockMeta), new(segment))
1050+
return errors.Is(err, context.DeadlineExceeded) && int64(2) == bucket.uploads.Load()
1051+
}
1052+
1053+
require.Eventually(t, hedgedRequestTriggered, time.Second*10, time.Millisecond*50)
1054+
})
1055+
1056+
t.Run("exhausted", func(t *testing.T) {
1057+
t.Parallel()
1058+
1059+
bucket := &mockBucket{InMemBucket: memory.NewInMemBucket()}
1060+
logger := test.NewTestingLogger(t)
1061+
1062+
var config Config
1063+
config.RegisterFlags(flag.NewFlagSet("test", flag.PanicOnError))
1064+
config.UploadTimeout = time.Millisecond * 250
1065+
config.UploadHedgeAfter = time.Millisecond
1066+
config.UploadHedgeRateMax = 0.1
1067+
config.UploadHedgeRateBurst = 10
1068+
config.UploadMaxRetries = 0
1069+
1070+
sw := &segmentsWriter{
1071+
config: config,
1072+
logger: logger,
1073+
bucket: bucket,
1074+
hedgedUploadLimiter: rate.NewLimiter(rate.Limit(config.UploadHedgeRateMax), int(config.UploadHedgeRateBurst)),
1075+
metrics: newSegmentMetrics(nil),
1076+
}
1077+
1078+
require.True(t, sw.hedgedUploadLimiter.ReserveN(time.Now(), int(config.UploadHedgeRateBurst)).OK())
1079+
err := sw.uploadBlock(context.Background(), nil, new(metastorev1.BlockMeta), new(segment))
1080+
require.ErrorIs(t, err, context.DeadlineExceeded)
1081+
require.Equal(t, int64(1), bucket.uploads.Load())
1082+
})
1083+
}

pkg/experiment/ingester/service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
7575
f.DurationVar(&cfg.UploadMinBackoff, prefix+".upload-retry-min-period", 50*time.Millisecond, "Minimum delay when backing off.")
7676
f.DurationVar(&cfg.UploadMaxBackoff, prefix+".upload-retry-max-period", defaultSegmentDuration, "Maximum delay when backing off.")
7777
f.DurationVar(&cfg.UploadHedgeAfter, prefix+".upload-hedge-after", defaultSegmentDuration, "Time after which to hedge the upload request.")
78-
f.Float64Var(&cfg.UploadHedgeRateMax, prefix+".upload-hedge-rate-max", defaultHedgedRequestMaxRate, "Maximum number of hedged requests per second.")
78+
f.Float64Var(&cfg.UploadHedgeRateMax, prefix+".upload-hedge-rate-max", defaultHedgedRequestMaxRate, "Maximum number of hedged requests per second. 0 disables rate limiting.")
7979
f.UintVar(&cfg.UploadHedgeRateBurst, prefix+".upload-hedge-rate-burst", defaultHedgedRequestBurst, "Maximum number of hedged requests in a burst.")
8080
f.BoolVar(&cfg.MetadataDLQEnabled, prefix+".metadata-dlq-enabled", true, "Enables dead letter queue (DLQ) for metadata. If the metadata update fails, it will be stored and updated asynchronously.")
8181
f.DurationVar(&cfg.MetadataUpdateTimeout, prefix+".metadata-update-timeout", 2*time.Second, "Timeout for metadata update requests.")

pkg/util/retry/hedged.go

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,6 @@ type Hedged[T any] struct {
2626
// - the result received first is returned, regardless of anything.
2727
// - if Call fails before the trigger fires, it won't be retried.
2828
FailFast bool
29-
30-
// Throttler executes call retries. Optional.
31-
Throttler
32-
}
33-
34-
type Throttler interface {
35-
Run(func())
3629
}
3730

3831
type Call[T any] func(ctx context.Context, isRetry bool) (T, error)
@@ -76,13 +69,7 @@ func (s Hedged[T]) Do(ctx context.Context) (T, error) {
7669
case <-attemptCtx.Done():
7770
// Call has returned, or caller cancelled the request.
7871
case <-s.Trigger:
79-
if s.Throttler != nil {
80-
s.Throttler.Run(func() {
81-
attempt(true)
82-
})
83-
} else {
84-
attempt(true)
85-
}
72+
attempt(true)
8673
}
8774

8875
wg.Wait()

pkg/util/retry/hedged_test.go

Lines changed: 0 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package retry
33
import (
44
"context"
55
"errors"
6-
"sync/atomic"
76
"testing"
87
"time"
98
)
@@ -155,50 +154,3 @@ func Test_Hedging(t *testing.T) {
155154
})
156155
}
157156
}
158-
159-
func Test_Hedging_Limiter(t *testing.T) {
160-
type testCase struct {
161-
description string
162-
runner Throttler
163-
maxAttempts int64
164-
}
165-
166-
const attempts = 5
167-
testCases := []testCase{
168-
{
169-
description: "zero limit disables retries",
170-
runner: NewLimiter(0),
171-
maxAttempts: attempts,
172-
},
173-
{
174-
description: "number of attempts does not exceed the limit",
175-
runner: NewLimiter(2),
176-
maxAttempts: attempts + 2,
177-
},
178-
}
179-
180-
for _, c := range testCases {
181-
c := c
182-
t.Run(c.description, func(t *testing.T) {
183-
t.Parallel()
184-
185-
var n int64
186-
attempt := Hedged[int]{
187-
Throttler: NewLimiter(0),
188-
Call: func(context.Context, bool) (int, error) {
189-
atomic.AddInt64(&n, 1)
190-
<-time.After(time.Millisecond)
191-
return 0, nil
192-
},
193-
}
194-
195-
for i := 0; i < 5; i++ {
196-
_, _ = attempt.Do(context.Background())
197-
}
198-
199-
if n > c.maxAttempts {
200-
t.Fatal("number of attempts exceeded")
201-
}
202-
})
203-
}
204-
}

pkg/util/retry/limiter.go

Lines changed: 0 additions & 38 deletions
This file was deleted.

0 commit comments

Comments
 (0)