fix(v2): make segment upload hedging limiter respect context #4252

Merged · 1 commit · Jun 24, 2025
72 changes: 36 additions & 36 deletions pkg/experiment/ingester/segment.go
@@ -22,6 +22,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/thanos-io/objstore"
"golang.org/x/exp/maps"
"golang.org/x/time/rate"

profilev1 "github.com/grafana/pyroscope/api/gen/proto/go/google/v1"
metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
@@ -51,9 +52,9 @@ type segmentsWriter struct
ctx context.Context
cancel context.CancelFunc

metrics *segmentMetrics
headMetrics *memdb.HeadMetrics
retryLimiter *retry.RateLimiter
metrics *segmentMetrics
headMetrics *memdb.HeadMetrics
hedgedUploadLimiter *rate.Limiter
}

type shard struct {
@@ -137,7 +138,7 @@ func newSegmentWriter(l log.Logger, metrics *segmentMetrics, hm *memdb.HeadMetri
shards: make(map[shardKey]*shard),
metastore: metastoreClient,
}
sw.retryLimiter = retry.NewRateLimiter(sw.config.UploadHedgeRateMax, int(sw.config.UploadHedgeRateBurst))
sw.hedgedUploadLimiter = rate.NewLimiter(rate.Limit(sw.config.UploadHedgeRateMax), int(sw.config.UploadHedgeRateBurst))
sw.ctx, sw.cancel = context.WithCancel(context.Background())
flushWorkers := runtime.GOMAXPROCS(-1)
if config.FlushConcurrency > 0 {
@@ -605,39 +606,38 @@ func (sw *segmentsWriter) uploadBlock(ctx context.Context, blockData []byte, met
// if the request is not completed within a certain time, we trigger
// a second upload attempt. Upload errors are retried explicitly and
// are included into the call duration.
uploadWithRetry := func(ctx context.Context, hedge bool) (any, error) {
retryConfig := backoff.Config{
MinBackoff: sw.config.UploadMinBackoff,
MaxBackoff: sw.config.UploadMaxBackoff,
MaxRetries: sw.config.UploadMaxRetries,
}
var attemptErr error
if hedge {
// Hedged requests are not retried.
retryConfig.MaxRetries = 1
attemptStart := time.Now()
defer func() {
sw.metrics.segmentHedgedUploadDuration.
WithLabelValues(statusLabelValue(attemptErr)).
Observe(time.Since(attemptStart).Seconds())
}()
}
// Retry on all errors.
retries := backoff.New(ctx, retryConfig)
for retries.Ongoing() && ctx.Err() == nil {
if attemptErr = sw.bucket.Upload(ctx, path, bytes.NewReader(blockData)); attemptErr == nil {
break
}
retries.Wait()
}
return nil, attemptErr
}

hedgedUpload := retry.Hedged[any]{
Call: uploadWithRetry,
Trigger: time.After(sw.config.UploadHedgeAfter),
Throttler: sw.retryLimiter,
FailFast: false,
Trigger: time.After(sw.config.UploadHedgeAfter),
Call: func(ctx context.Context, hedge bool) (any, error) {
retryConfig := backoff.Config{
MinBackoff: sw.config.UploadMinBackoff,
MaxBackoff: sw.config.UploadMaxBackoff,
MaxRetries: sw.config.UploadMaxRetries,
}
var attemptErr error
if hedge {
if limitErr := sw.hedgedUploadLimiter.Wait(ctx); limitErr != nil {
return nil, limitErr
}
// Hedged requests are not retried.
retryConfig.MaxRetries = 0
attemptStart := time.Now()
defer func() {
sw.metrics.segmentHedgedUploadDuration.
WithLabelValues(statusLabelValue(attemptErr)).
Observe(time.Since(attemptStart).Seconds())
}()
}
// Retry on all errors.
retries := backoff.New(ctx, retryConfig)
for retries.Ongoing() {
if attemptErr = sw.bucket.Upload(ctx, path, bytes.NewReader(blockData)); attemptErr == nil {
break
}
retries.Wait()
}
return nil, attemptErr
},
}

if _, err = hedgedUpload.Do(ctx); err != nil {
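For reference, the behavioural change above reduces to gating the hedged attempt on a context-aware limiter: (*rate.Limiter).Wait blocks only as long as the caller's context allows, so a cancelled upload no longer waits for a hedge token. A standalone sketch of the pattern (hypothetical upload helper, simplified control flow; not the pyroscope code itself):

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

// upload stands in for the real object-store upload.
func upload(ctx context.Context, data []byte) error {
	select {
	case <-time.After(100 * time.Millisecond):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// hedgedUpload starts a second attempt if the first is still running after
// hedgeAfter, but only if the limiter grants a token. Wait honours ctx, so a
// cancelled caller never blocks on the limiter.
func hedgedUpload(ctx context.Context, limiter *rate.Limiter, hedgeAfter time.Duration, data []byte) error {
	primary := make(chan error, 1)
	go func() { primary <- upload(ctx, data) }()

	select {
	case err := <-primary:
		return err // primary finished before the hedge fired
	case <-time.After(hedgeAfter):
	}

	if err := limiter.Wait(ctx); err != nil {
		// No hedge allowed (rate limited or ctx done): fall back to the primary attempt.
		return <-primary
	}

	hedge := make(chan error, 1)
	go func() { hedge <- upload(ctx, data) }()

	// First result wins.
	select {
	case err := <-primary:
		return err
	case err := <-hedge:
		return err
	}
}

func main() {
	limiter := rate.NewLimiter(rate.Limit(2), 10) // at most 2 hedged attempts/s, burst of 10
	err := hedgedUpload(context.Background(), limiter, 20*time.Millisecond, []byte("block"))
	fmt.Println("upload result:", err)
}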
108 changes: 106 additions & 2 deletions pkg/experiment/ingester/segment_test.go
@@ -3,6 +3,8 @@ package ingester
import (
"bytes"
"context"
"errors"
"flag"
"fmt"
"io"
"math/rand"
@@ -14,21 +16,21 @@ import (
"testing"
"time"

"github.com/grafana/pyroscope/pkg/experiment/block/metadata"

gprofile "github.com/google/pprof/profile"
"github.com/grafana/dskit/flagext"
model2 "github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"

profilev1 "github.com/grafana/pyroscope/api/gen/proto/go/google/v1"
ingesterv1 "github.com/grafana/pyroscope/api/gen/proto/go/ingester/v1"
"github.com/grafana/pyroscope/api/gen/proto/go/ingester/v1/ingesterv1connect"
metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
"github.com/grafana/pyroscope/pkg/experiment/block"
"github.com/grafana/pyroscope/pkg/experiment/block/metadata"
"github.com/grafana/pyroscope/pkg/experiment/ingester/memdb"
testutil2 "github.com/grafana/pyroscope/pkg/experiment/ingester/memdb/testutil"
"github.com/grafana/pyroscope/pkg/experiment/metastore"
@@ -1004,3 +1006,105 @@ func hasUnsymbolizedLabel(t *testing.T, block *metastorev1.BlockMeta) bool {
}
return false
}

type mockBucket struct {
*memory.InMemBucket
uploads atomic.Int64
}

func (m *mockBucket) Upload(ctx context.Context, _ string, _ io.Reader) error {
m.uploads.Add(1)
<-ctx.Done()
return ctx.Err()
}

func TestUploadBlock_HedgedUploadLimiter(t *testing.T) {
t.Run("disabled", func(t *testing.T) {
t.Parallel()

bucket := &mockBucket{InMemBucket: memory.NewInMemBucket()}
logger := test.NewTestingLogger(t)

var config Config
config.RegisterFlags(flag.NewFlagSet("test", flag.PanicOnError))
config.UploadTimeout = time.Millisecond * 250
config.UploadHedgeAfter = time.Millisecond
config.UploadHedgeRateMax = 0
config.UploadHedgeRateBurst = 0
config.UploadMaxRetries = 0

sw := &segmentsWriter{
config: config,
logger: logger,
bucket: bucket,
hedgedUploadLimiter: rate.NewLimiter(rate.Limit(config.UploadHedgeRateMax), int(config.UploadHedgeRateBurst)),
metrics: newSegmentMetrics(nil),
}

err := sw.uploadBlock(context.Background(), nil, new(metastorev1.BlockMeta), new(segment))
require.ErrorIs(t, err, context.DeadlineExceeded)
require.Equal(t, int64(1), bucket.uploads.Load())
})

t.Run("available", func(t *testing.T) {
t.Parallel()

bucket := &mockBucket{InMemBucket: memory.NewInMemBucket()}
logger := test.NewTestingLogger(t)

var config Config
config.RegisterFlags(flag.NewFlagSet("test", flag.PanicOnError))
config.UploadTimeout = time.Millisecond * 250
config.UploadHedgeAfter = time.Millisecond
config.UploadHedgeRateMax = 10
config.UploadHedgeRateBurst = 10
config.UploadMaxRetries = 0

sw := &segmentsWriter{
config: config,
logger: logger,
bucket: bucket,
hedgedUploadLimiter: rate.NewLimiter(rate.Limit(config.UploadHedgeRateMax), int(config.UploadHedgeRateBurst)),
metrics: newSegmentMetrics(nil),
}

// To avoid flakiness: there are no guarantees that the
// hedged request is triggered before the upload timeout
// expiration.
hedgedRequestTriggered := func() bool {
bucket.uploads.Store(0)
err := sw.uploadBlock(context.Background(), nil, new(metastorev1.BlockMeta), new(segment))
return errors.Is(err, context.DeadlineExceeded) && int64(2) == bucket.uploads.Load()
}

require.Eventually(t, hedgedRequestTriggered, time.Second*10, time.Millisecond*50)
})

t.Run("exhausted", func(t *testing.T) {
t.Parallel()

bucket := &mockBucket{InMemBucket: memory.NewInMemBucket()}
logger := test.NewTestingLogger(t)

var config Config
config.RegisterFlags(flag.NewFlagSet("test", flag.PanicOnError))
config.UploadTimeout = time.Millisecond * 250
config.UploadHedgeAfter = time.Millisecond
config.UploadHedgeRateMax = 0.1
config.UploadHedgeRateBurst = 10
config.UploadMaxRetries = 0

sw := &segmentsWriter{
config: config,
logger: logger,
bucket: bucket,
hedgedUploadLimiter: rate.NewLimiter(rate.Limit(config.UploadHedgeRateMax), int(config.UploadHedgeRateBurst)),
metrics: newSegmentMetrics(nil),
}

require.True(t, sw.hedgedUploadLimiter.ReserveN(time.Now(), int(config.UploadHedgeRateBurst)).OK())
err := sw.uploadBlock(context.Background(), nil, new(metastorev1.BlockMeta), new(segment))
require.ErrorIs(t, err, context.DeadlineExceeded)
require.Equal(t, int64(1), bucket.uploads.Load())
})
}
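The three cases above hinge on documented golang.org/x/time/rate semantics rather than anything pyroscope-specific. An illustrative snippet (not part of the test suite) of the distinction the tests rely on:

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
	defer cancel()

	// "disabled": limit 0 with burst 0 grants no tokens at all, so Wait fails
	// immediately and a hedged attempt gated on it is never sent.
	zero := rate.NewLimiter(0, 0)
	fmt.Println(zero.Wait(ctx)) // error: exceeds the limiter's burst

	// "available": a positive limit with burst available returns at once, so
	// the hedged attempt goes through.
	open := rate.NewLimiter(10, 10)
	fmt.Println(open.Wait(ctx)) // <nil>

	// "exhausted": the burst is drained up front, the next token is ~10s away,
	// and Wait gives up because that exceeds the context deadline.
	drained := rate.NewLimiter(0.1, 10)
	drained.ReserveN(time.Now(), 10)
	fmt.Println(drained.Wait(ctx)) // error: would exceed the context deadline
}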
2 changes: 1 addition & 1 deletion pkg/experiment/ingester/service.go
@@ -75,7 +75,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.UploadMinBackoff, prefix+".upload-retry-min-period", 50*time.Millisecond, "Minimum delay when backing off.")
f.DurationVar(&cfg.UploadMaxBackoff, prefix+".upload-retry-max-period", defaultSegmentDuration, "Maximum delay when backing off.")
f.DurationVar(&cfg.UploadHedgeAfter, prefix+".upload-hedge-after", defaultSegmentDuration, "Time after which to hedge the upload request.")
f.Float64Var(&cfg.UploadHedgeRateMax, prefix+".upload-hedge-rate-max", defaultHedgedRequestMaxRate, "Maximum number of hedged requests per second.")
f.Float64Var(&cfg.UploadHedgeRateMax, prefix+".upload-hedge-rate-max", defaultHedgedRequestMaxRate, "Maximum number of hedged requests per second. 0 disables rate limiting.")
f.UintVar(&cfg.UploadHedgeRateBurst, prefix+".upload-hedge-rate-burst", defaultHedgedRequestBurst, "Maximum number of hedged requests in a burst.")
f.BoolVar(&cfg.MetadataDLQEnabled, prefix+".metadata-dlq-enabled", true, "Enables dead letter queue (DLQ) for metadata. If the metadata update fails, it will be stored and updated asynchronously.")
f.DurationVar(&cfg.MetadataUpdateTimeout, prefix+".metadata-update-timeout", 2*time.Second, "Timeout for metadata update requests.")
15 changes: 1 addition & 14 deletions pkg/util/retry/hedged.go
@@ -26,13 +26,6 @@ type Hedged[T any] struct {
// - the result received first is returned, regardless of anything.
// - if Call fails before the trigger fires, it won't be retried.
FailFast bool

// Throttler executes call retries. Optional.
Throttler
}

type Throttler interface {
Run(func())
}

type Call[T any] func(ctx context.Context, isRetry bool) (T, error)
@@ -76,13 +69,7 @@ func (s Hedged[T]) Do(ctx context.Context) (T, error) {
case <-attemptCtx.Done():
// Call has returned, or caller cancelled the request.
case <-s.Trigger:
if s.Throttler != nil {
s.Throttler.Run(func() {
attempt(true)
})
} else {
attempt(true)
}
attempt(true)
}

wg.Wait()
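With the Throttler hook removed, throttling is now the caller's responsibility inside Call, which is exactly what segment.go does above. A possible usage sketch of the slimmed-down helper (fetch and hedgedFetch are hypothetical; the field names and import path follow the diff and repository layout):

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"

	"github.com/grafana/pyroscope/pkg/util/retry"
)

// fetch is a hypothetical operation worth hedging.
func fetch(ctx context.Context) (string, error) {
	select {
	case <-time.After(50 * time.Millisecond):
		return "ok", nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// hedgedFetch throttles the hedged attempt itself, since Hedged no longer
// carries a Throttler.
func hedgedFetch(ctx context.Context, limiter *rate.Limiter) (string, error) {
	h := retry.Hedged[string]{
		Trigger:  time.After(10 * time.Millisecond),
		FailFast: false,
		Call: func(ctx context.Context, hedge bool) (string, error) {
			if hedge {
				// Wait honours ctx: a cancelled caller is not left
				// blocking on a hedge token.
				if err := limiter.Wait(ctx); err != nil {
					return "", err
				}
			}
			return fetch(ctx)
		},
	}
	return h.Do(ctx)
}

func main() {
	out, err := hedgedFetch(context.Background(), rate.NewLimiter(1, 1))
	fmt.Println(out, err)
}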
48 changes: 0 additions & 48 deletions pkg/util/retry/hedged_test.go
@@ -3,7 +3,6 @@ package retry
import (
"context"
"errors"
"sync/atomic"
"testing"
"time"
)
@@ -155,50 +154,3 @@ func Test_Hedging(t *testing.T) {
})
}
}

func Test_Hedging_Limiter(t *testing.T) {
type testCase struct {
description string
runner Throttler
maxAttempts int64
}

const attempts = 5
testCases := []testCase{
{
description: "zero limit disables retries",
runner: NewLimiter(0),
maxAttempts: attempts,
},
{
description: "number of attempts does not exceed the limit",
runner: NewLimiter(2),
maxAttempts: attempts + 2,
},
}

for _, c := range testCases {
c := c
t.Run(c.description, func(t *testing.T) {
t.Parallel()

var n int64
attempt := Hedged[int]{
Throttler: NewLimiter(0),
Call: func(context.Context, bool) (int, error) {
atomic.AddInt64(&n, 1)
<-time.After(time.Millisecond)
return 0, nil
},
}

for i := 0; i < 5; i++ {
_, _ = attempt.Do(context.Background())
}

if n > c.maxAttempts {
t.Fatal("number of attempts exceeded")
}
})
}
}
38 changes: 0 additions & 38 deletions pkg/util/retry/limiter.go

This file was deleted.
