diff --git a/pkg/experiment/ingester/segment.go b/pkg/experiment/ingester/segment.go
index a2a4b0eb0e..2d91daa47d 100644
--- a/pkg/experiment/ingester/segment.go
+++ b/pkg/experiment/ingester/segment.go
@@ -22,6 +22,7 @@ import (
 	"github.com/opentracing/opentracing-go"
 	"github.com/thanos-io/objstore"
 	"golang.org/x/exp/maps"
+	"golang.org/x/time/rate"
 
 	profilev1 "github.com/grafana/pyroscope/api/gen/proto/go/google/v1"
 	metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
@@ -51,9 +52,9 @@ type segmentsWriter struct {
 	ctx    context.Context
 	cancel context.CancelFunc
 
-	metrics      *segmentMetrics
-	headMetrics  *memdb.HeadMetrics
-	retryLimiter *retry.RateLimiter
+	metrics             *segmentMetrics
+	headMetrics         *memdb.HeadMetrics
+	hedgedUploadLimiter *rate.Limiter
 }
 
 type shard struct {
@@ -137,7 +138,7 @@ func newSegmentWriter(l log.Logger, metrics *segmentMetrics, hm *memdb.HeadMetri
 		shards:    make(map[shardKey]*shard),
 		metastore: metastoreClient,
 	}
-	sw.retryLimiter = retry.NewRateLimiter(sw.config.UploadHedgeRateMax, int(sw.config.UploadHedgeRateBurst))
+	sw.hedgedUploadLimiter = rate.NewLimiter(rate.Limit(sw.config.UploadHedgeRateMax), int(sw.config.UploadHedgeRateBurst))
 	sw.ctx, sw.cancel = context.WithCancel(context.Background())
 	flushWorkers := runtime.GOMAXPROCS(-1)
 	if config.FlushConcurrency > 0 {
@@ -605,39 +606,38 @@ func (sw *segmentsWriter) uploadBlock(ctx context.Context, blockData []byte, met
 	// if the request is not completed within a certain time, we trigger
 	// a second upload attempt. Upload errors are retried explicitly and
 	// are included into the call duration.
-	uploadWithRetry := func(ctx context.Context, hedge bool) (any, error) {
-		retryConfig := backoff.Config{
-			MinBackoff: sw.config.UploadMinBackoff,
-			MaxBackoff: sw.config.UploadMaxBackoff,
-			MaxRetries: sw.config.UploadMaxRetries,
-		}
-		var attemptErr error
-		if hedge {
-			// Hedged requests are not retried.
-			retryConfig.MaxRetries = 1
-			attemptStart := time.Now()
-			defer func() {
-				sw.metrics.segmentHedgedUploadDuration.
-					WithLabelValues(statusLabelValue(attemptErr)).
-					Observe(time.Since(attemptStart).Seconds())
-			}()
-		}
-		// Retry on all errors.
-		retries := backoff.New(ctx, retryConfig)
-		for retries.Ongoing() && ctx.Err() == nil {
-			if attemptErr = sw.bucket.Upload(ctx, path, bytes.NewReader(blockData)); attemptErr == nil {
-				break
-			}
-			retries.Wait()
-		}
-		return nil, attemptErr
-	}
 	hedgedUpload := retry.Hedged[any]{
-		Call:      uploadWithRetry,
-		Trigger:   time.After(sw.config.UploadHedgeAfter),
-		Throttler: sw.retryLimiter,
-		FailFast:  false,
+		Trigger: time.After(sw.config.UploadHedgeAfter),
+		Call: func(ctx context.Context, hedge bool) (any, error) {
+			retryConfig := backoff.Config{
+				MinBackoff: sw.config.UploadMinBackoff,
+				MaxBackoff: sw.config.UploadMaxBackoff,
+				MaxRetries: sw.config.UploadMaxRetries,
+			}
+			var attemptErr error
+			if hedge {
+				if limitErr := sw.hedgedUploadLimiter.Wait(ctx); limitErr != nil {
+					return nil, limitErr
+				}
+				// Hedged requests are not retried.
+				retryConfig.MaxRetries = 0
+				attemptStart := time.Now()
+				defer func() {
+					sw.metrics.segmentHedgedUploadDuration.
+						WithLabelValues(statusLabelValue(attemptErr)).
+						Observe(time.Since(attemptStart).Seconds())
+				}()
+			}
+			// Retry on all errors.
+			retries := backoff.New(ctx, retryConfig)
+			for retries.Ongoing() {
+				if attemptErr = sw.bucket.Upload(ctx, path, bytes.NewReader(blockData)); attemptErr == nil {
+					break
+				}
+				retries.Wait()
+			}
+			return nil, attemptErr
+		},
 	}
 	if _, err = hedgedUpload.Do(ctx); err != nil {
diff --git a/pkg/experiment/ingester/segment_test.go b/pkg/experiment/ingester/segment_test.go
index 145a945235..ee4cb3bf6b 100644
--- a/pkg/experiment/ingester/segment_test.go
+++ b/pkg/experiment/ingester/segment_test.go
@@ -3,6 +3,8 @@ package ingester
 import (
 	"bytes"
 	"context"
+	"errors"
+	"flag"
 	"fmt"
 	"io"
 	"math/rand"
@@ -14,14 +16,13 @@ import (
 	"testing"
 	"time"
 
-	"github.com/grafana/pyroscope/pkg/experiment/block/metadata"
-
 	gprofile "github.com/google/pprof/profile"
 	"github.com/grafana/dskit/flagext"
 	model2 "github.com/prometheus/common/model"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
+	"golang.org/x/time/rate"
 
 	profilev1 "github.com/grafana/pyroscope/api/gen/proto/go/google/v1"
 	ingesterv1 "github.com/grafana/pyroscope/api/gen/proto/go/ingester/v1"
@@ -29,6 +30,7 @@ import (
 	metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
 	typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
 	"github.com/grafana/pyroscope/pkg/experiment/block"
+	"github.com/grafana/pyroscope/pkg/experiment/block/metadata"
 	"github.com/grafana/pyroscope/pkg/experiment/ingester/memdb"
 	testutil2 "github.com/grafana/pyroscope/pkg/experiment/ingester/memdb/testutil"
 	"github.com/grafana/pyroscope/pkg/experiment/metastore"
@@ -1004,3 +1006,105 @@ func hasUnsymbolizedLabel(t *testing.T, block *metastorev1.BlockMeta) bool {
 	}
 	return false
 }
+
+type mockBucket struct {
+	*memory.InMemBucket
+	uploads atomic.Int64
+}
+
+func (m *mockBucket) Upload(ctx context.Context, _ string, _ io.Reader) error {
+	m.uploads.Add(1)
+	<-ctx.Done()
+	return ctx.Err()
+}
+
+func TestUploadBlock_HedgedUploadLimiter(t *testing.T) {
+	t.Run("disabled", func(t *testing.T) {
+		t.Parallel()
+
+		bucket := &mockBucket{InMemBucket: memory.NewInMemBucket()}
+		logger := test.NewTestingLogger(t)
+
+		var config Config
+		config.RegisterFlags(flag.NewFlagSet("test", flag.PanicOnError))
+		config.UploadTimeout = time.Millisecond * 250
+		config.UploadHedgeAfter = time.Millisecond
+		config.UploadHedgeRateMax = 0
+		config.UploadHedgeRateBurst = 0
+		config.UploadMaxRetries = 0
+
+		sw := &segmentsWriter{
+			config:              config,
+			logger:              logger,
+			bucket:              bucket,
+			hedgedUploadLimiter: rate.NewLimiter(rate.Limit(config.UploadHedgeRateMax), int(config.UploadHedgeRateBurst)),
+			metrics:             newSegmentMetrics(nil),
+		}
+
+		err := sw.uploadBlock(context.Background(), nil, new(metastorev1.BlockMeta), new(segment))
+		require.ErrorIs(t, err, context.DeadlineExceeded)
+		require.Equal(t, int64(1), bucket.uploads.Load())
+	})
+
+	t.Run("available", func(t *testing.T) {
+		t.Parallel()
+
+		bucket := &mockBucket{InMemBucket: memory.NewInMemBucket()}
+		logger := test.NewTestingLogger(t)
+
+		var config Config
+		config.RegisterFlags(flag.NewFlagSet("test", flag.PanicOnError))
+		config.UploadTimeout = time.Millisecond * 250
+		config.UploadHedgeAfter = time.Millisecond
+		config.UploadHedgeRateMax = 10
+		config.UploadHedgeRateBurst = 10
+		config.UploadMaxRetries = 0
+
+		sw := &segmentsWriter{
+			config:              config,
+			logger:              logger,
+			bucket:              bucket,
+			hedgedUploadLimiter: rate.NewLimiter(rate.Limit(config.UploadHedgeRateMax), int(config.UploadHedgeRateBurst)),
+			metrics:             newSegmentMetrics(nil),
+		}
+
+		// To avoid flakiness: there are no guarantees that the
+		// hedged request is triggered before the upload timeout
+		// expiration.
+		hedgedRequestTriggered := func() bool {
+			bucket.uploads.Store(0)
+			err := sw.uploadBlock(context.Background(), nil, new(metastorev1.BlockMeta), new(segment))
+			return errors.Is(err, context.DeadlineExceeded) && int64(2) == bucket.uploads.Load()
+		}
+
+		require.Eventually(t, hedgedRequestTriggered, time.Second*10, time.Millisecond*50)
+	})
+
+	t.Run("exhausted", func(t *testing.T) {
+		t.Parallel()
+
+		bucket := &mockBucket{InMemBucket: memory.NewInMemBucket()}
+		logger := test.NewTestingLogger(t)
+
+		var config Config
+		config.RegisterFlags(flag.NewFlagSet("test", flag.PanicOnError))
+		config.UploadTimeout = time.Millisecond * 250
+		config.UploadHedgeAfter = time.Millisecond
+		config.UploadHedgeRateMax = 0.1
+		config.UploadHedgeRateBurst = 10
+		config.UploadMaxRetries = 0
+
+		sw := &segmentsWriter{
+			config:              config,
+			logger:              logger,
+			bucket:              bucket,
+			hedgedUploadLimiter: rate.NewLimiter(rate.Limit(config.UploadHedgeRateMax), int(config.UploadHedgeRateBurst)),
+			metrics:             newSegmentMetrics(nil),
+		}
+
+		require.True(t, sw.hedgedUploadLimiter.ReserveN(time.Now(), int(config.UploadHedgeRateBurst)).OK())
+		err := sw.uploadBlock(context.Background(), nil, new(metastorev1.BlockMeta), new(segment))
+		require.ErrorIs(t, err, context.DeadlineExceeded)
+		require.Equal(t, int64(1), bucket.uploads.Load())
+	})
+}
diff --git a/pkg/experiment/ingester/service.go b/pkg/experiment/ingester/service.go
index 0e5db05b44..183ac4f131 100644
--- a/pkg/experiment/ingester/service.go
+++ b/pkg/experiment/ingester/service.go
@@ -75,7 +75,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.DurationVar(&cfg.UploadMinBackoff, prefix+".upload-retry-min-period", 50*time.Millisecond, "Minimum delay when backing off.")
 	f.DurationVar(&cfg.UploadMaxBackoff, prefix+".upload-retry-max-period", defaultSegmentDuration, "Maximum delay when backing off.")
 	f.DurationVar(&cfg.UploadHedgeAfter, prefix+".upload-hedge-after", defaultSegmentDuration, "Time after which to hedge the upload request.")
-	f.Float64Var(&cfg.UploadHedgeRateMax, prefix+".upload-hedge-rate-max", defaultHedgedRequestMaxRate, "Maximum number of hedged requests per second.")
+	f.Float64Var(&cfg.UploadHedgeRateMax, prefix+".upload-hedge-rate-max", defaultHedgedRequestMaxRate, "Maximum number of hedged requests per second. 0 disables rate limiting.")
 	f.UintVar(&cfg.UploadHedgeRateBurst, prefix+".upload-hedge-rate-burst", defaultHedgedRequestBurst, "Maximum number of hedged requests in a burst.")
 	f.BoolVar(&cfg.MetadataDLQEnabled, prefix+".metadata-dlq-enabled", true, "Enables dead letter queue (DLQ) for metadata. If the metadata update fails, it will be stored and updated asynchronously.")
 	f.DurationVar(&cfg.MetadataUpdateTimeout, prefix+".metadata-update-timeout", 2*time.Second, "Timeout for metadata update requests.")
diff --git a/pkg/util/retry/hedged.go b/pkg/util/retry/hedged.go
index 5acf12e6eb..957a4aa49c 100644
--- a/pkg/util/retry/hedged.go
+++ b/pkg/util/retry/hedged.go
@@ -26,13 +26,6 @@ type Hedged[T any] struct {
 	// - the result received first is returned, regardless of anything.
 	// - if Call fails before the trigger fires, it won't be retried.
 	FailFast bool
-
-	// Throttler executes call retries. Optional.
-	Throttler
-}
-
-type Throttler interface {
-	Run(func())
 }
 
 type Call[T any] func(ctx context.Context, isRetry bool) (T, error)
@@ -76,13 +69,7 @@ func (s Hedged[T]) Do(ctx context.Context) (T, error) {
 	case <-attemptCtx.Done():
 		// Call has returned, or caller cancelled the request.
 	case <-s.Trigger:
-		if s.Throttler != nil {
-			s.Throttler.Run(func() {
-				attempt(true)
-			})
-		} else {
-			attempt(true)
-		}
+		attempt(true)
 	}
 
 	wg.Wait()
diff --git a/pkg/util/retry/hedged_test.go b/pkg/util/retry/hedged_test.go
index 6476dc17ae..e17c515edf 100644
--- a/pkg/util/retry/hedged_test.go
+++ b/pkg/util/retry/hedged_test.go
@@ -3,7 +3,6 @@ package retry
 import (
 	"context"
 	"errors"
-	"sync/atomic"
 	"testing"
 	"time"
 )
@@ -155,50 +154,3 @@ func Test_Hedging(t *testing.T) {
 		})
 	}
 }
-
-func Test_Hedging_Limiter(t *testing.T) {
-	type testCase struct {
-		description string
-		runner      Throttler
-		maxAttempts int64
-	}
-
-	const attempts = 5
-	testCases := []testCase{
-		{
-			description: "zero limit disables retries",
-			runner:      NewLimiter(0),
-			maxAttempts: attempts,
-		},
-		{
-			description: "number of attempts does not exceed the limit",
-			runner:      NewLimiter(2),
-			maxAttempts: attempts + 2,
-		},
-	}
-
-	for _, c := range testCases {
-		c := c
-		t.Run(c.description, func(t *testing.T) {
-			t.Parallel()
-
-			var n int64
-			attempt := Hedged[int]{
-				Throttler: NewLimiter(0),
-				Call: func(context.Context, bool) (int, error) {
-					atomic.AddInt64(&n, 1)
-					<-time.After(time.Millisecond)
-					return 0, nil
-				},
-			}
-
-			for i := 0; i < 5; i++ {
-				_, _ = attempt.Do(context.Background())
-			}
-
-			if n > c.maxAttempts {
-				t.Fatal("number of attempts exceeded")
-			}
-		})
-	}
-}
diff --git a/pkg/util/retry/limiter.go b/pkg/util/retry/limiter.go
deleted file mode 100644
index 22fabc5fa7..0000000000
--- a/pkg/util/retry/limiter.go
+++ /dev/null
@@ -1,38 +0,0 @@
-package retry
-
-import (
-	"context"
-	"sync/atomic"
-
-	"golang.org/x/time/rate"
-)
-
-// RateLimiter implements Throttler using golang.org/x/time/rate
-// with the given rate (ops per second) and burst size.
-type RateLimiter struct {
-	limiter *rate.Limiter
-}
-
-func NewRateLimiter(ratePerSec float64, burst int) *RateLimiter {
-	return &RateLimiter{limiter: rate.NewLimiter(rate.Limit(ratePerSec), burst)}
-}
-
-func (r *RateLimiter) Run(f func()) {
-	_ = r.limiter.Wait(context.Background())
-	f()
-}
-
-// Limiter limits the number of tasks executed.
-// Once the limit is reached, no more runs will be done.
-type Limiter struct {
-	runs  int64
-	limit int64
-}
-
-func NewLimiter(n int64) *Limiter { return &Limiter{limit: n} }
-
-func (l *Limiter) Run(f func()) {
-	if atomic.AddInt64(&l.runs, 1) <= l.limit {
-		f()
-	}
-}
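
For context: the three subtests above ("disabled", "available", "exhausted") map directly onto golang.org/x/time/rate semantics. The following standalone sketch (not part of the patch; the variable names are illustrative and appear nowhere in the changed code) shows the limiter behaviour the new hedgedUploadLimiter relies on.

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// "disabled": rate 0 and burst 0. Wait fails immediately, because a single
	// event always exceeds a zero burst, so a hedged attempt guarded by this
	// limiter is skipped rather than blocked.
	disabled := rate.NewLimiter(rate.Limit(0), 0)
	fmt.Println(disabled.Wait(context.Background()))

	// "available": 10 hedged requests per second with a burst of 10. The
	// limiter starts with a full burst, so Wait returns nil immediately and
	// the hedged attempt proceeds.
	available := rate.NewLimiter(rate.Limit(10), 10)
	fmt.Println(available.Wait(context.Background()))

	// "exhausted": the whole burst is reserved up front (as the test does with
	// ReserveN), and at 0.1 tokens per second the next token is ~10s away.
	// Wait returns an error without blocking, because the token cannot be
	// obtained before the context deadline, so the hedged attempt is skipped
	// while the primary upload keeps running.
	exhausted := rate.NewLimiter(rate.Limit(0.1), 10)
	exhausted.ReserveN(time.Now(), 10)
	ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
	defer cancel()
	fmt.Println(exhausted.Wait(ctx))
}

Unlike the removed retry.RateLimiter, whose Run method waited on context.Background() and could therefore delay a hedged attempt indefinitely, rate.Limiter.Wait is bound to the upload context: if a token cannot be obtained before the deadline, the hedged attempt fails fast and only the primary upload continues.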