Skip to content

Add nativeHistogram IngestionRate limit #6794

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
* [ENHANCEMENT] Distributor: Add min/max schema validation for NativeHistograms. #6766
* [ENHANCEMENT] Ingester: Handle runtime errors in query path #6769
* [ENHANCEMENT] Compactor: Support metadata caching bucket for Cleaner. Can be enabled via `-compactor.cleaner-caching-bucket-enabled` flag. #6778
* [ENHANCEMENT] Distributor: Add ingestion rate limit for Native Histograms. #6794
* [ENHANCEMENT] Compactor, Store Gateway: Introduce user scanner strategy and user index. #6780
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
* [BUGFIX] Ingester: Fix labelset data race condition. #6573
Expand Down
10 changes: 10 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3427,6 +3427,11 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -distributor.ingestion-rate-limit
[ingestion_rate: <float> | default = 25000]

# Per-user nativeHistograms ingestion rate limit in samples per second. 0 to
# disable the limit
# CLI flag: -distributor.native-histograms-ingestion-rate-limit
[native_histograms_ingestion_rate: <float> | default = 0]

# Whether the ingestion rate limit should be applied individually to each
# distributor instance (local), or evenly shared across the cluster (global).
# CLI flag: -distributor.ingestion-rate-limit-strategy
Expand All @@ -3436,6 +3441,11 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -distributor.ingestion-burst-size
[ingestion_burst_size: <int> | default = 50000]

# Per-user allowed nativeHistograms ingestion burst size (in number of samples).
# 0 to disable the limit
# CLI flag: -distributor.native-histograms-ingestion-burst-size
[native_histograms_ingestion_burst_size: <int> | default = 0]

# Flag to enable, for all users, handling of samples with external labels
# identifying replicas in an HA Prometheus setup.
# CLI flag: -distributor.ha-tracker.enable-for-all-users
Expand Down
52 changes: 37 additions & 15 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ type Distributor struct {
HATracker *ha.HATracker

// Per-user rate limiter.
ingestionRateLimiter *limiter.RateLimiter
ingestionRateLimiter *limiter.RateLimiter
nativeHistogramsIngestionRateLimiter *limiter.RateLimiter

// Manager for subservices (HA Tracker, distributor ring and client pool)
subservices *services.Manager
Expand Down Expand Up @@ -267,11 +268,13 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
// it's an internal dependency and can't join the distributors ring, we skip rate
// limiting.
var ingestionRateStrategy limiter.RateLimiterStrategy
var nativeHistogramsIngestionRateStrategy limiter.RateLimiterStrategy
var distributorsLifeCycler *ring.Lifecycler
var distributorsRing *ring.Ring

if !canJoinDistributorsRing {
ingestionRateStrategy = newInfiniteIngestionRateStrategy()
nativeHistogramsIngestionRateStrategy = newInfiniteIngestionRateStrategy()
} else if limits.IngestionRateStrategy() == validation.GlobalIngestionRateStrategy {
distributorsLifeCycler, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, "distributor", ringKey, true, true, log, prometheus.WrapRegistererWithPrefix("cortex_", reg))
if err != nil {
Expand All @@ -285,21 +288,24 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
subservices = append(subservices, distributorsLifeCycler, distributorsRing)

ingestionRateStrategy = newGlobalIngestionRateStrategy(limits, distributorsLifeCycler)
nativeHistogramsIngestionRateStrategy = newGlobalNativeHistogramsIngestionRateStrategy(limits, distributorsLifeCycler)
} else {
ingestionRateStrategy = newLocalIngestionRateStrategy(limits)
nativeHistogramsIngestionRateStrategy = newLocalNativeHistogramsIngestionRateStrategy(limits)
}

d := &Distributor{
cfg: cfg,
log: log,
ingestersRing: ingestersRing,
ingesterPool: NewPool(cfg.PoolConfig, ingestersRing, cfg.IngesterClientFactory, log),
distributorsLifeCycler: distributorsLifeCycler,
distributorsRing: distributorsRing,
limits: limits,
ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second),
HATracker: haTracker,
ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),
cfg: cfg,
log: log,
ingestersRing: ingestersRing,
ingesterPool: NewPool(cfg.PoolConfig, ingestersRing, cfg.IngesterClientFactory, log),
distributorsLifeCycler: distributorsLifeCycler,
distributorsRing: distributorsRing,
limits: limits,
ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second),
nativeHistogramsIngestionRateLimiter: limiter.NewRateLimiter(nativeHistogramsIngestionRateStrategy, 10*time.Second),
HATracker: haTracker,
ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),

queryDuration: instrument.NewHistogramCollector(promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Namespace: "cortex",
Expand Down Expand Up @@ -774,16 +780,32 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co

totalSamples := validatedFloatSamples + validatedHistogramSamples
totalN := totalSamples + validatedExemplars + len(validatedMetadata)
if !d.ingestionRateLimiter.AllowN(now, userID, totalN) {

nhRateLimited := false
if limits.NativeHistogramsIngestionRate > 0 && limits.NativeHistogramsIngestionBurstSize > 0 {
nhRateLimited = !d.nativeHistogramsIngestionRateLimiter.AllowN(now, userID, validatedHistogramSamples)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm i am not sure this is how was imagining it. I think the limiter could handle this 0 as disabled.
Looking at the rate documentation, 0 is a valid value meaning all will be blocked. MaxFloat is disabling it. We might can add the default as maxFloat and see if it will work
https://github.com/cortexproject/cortex/blob/master/vendor/golang.org/x/time/rate/rate.go#L40

@yeya24 opinion?

Copy link
Contributor

@yeya24 yeya24 Jun 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good catch.
If ingestion rate is set to 0 then we just don't run d.nativeHistogramsIngestionRateLimiter.AllowN. Or as Daniel said we use a very big value.

Let's set default to max float.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I have updated now.

rateLimited := !d.ingestionRateLimiter.AllowN(now, userID, totalN)

// Return a 429 here to tell the client it is going too fast.
// Client may discard the data or slow down and re-send.
// Prometheus v2.26 added a remote-write option 'retry_on_http_429'.
if nhRateLimited {
// Ensure the request slice is reused if the request is rate limited.
cortexpb.ReuseSlice(req.Timeseries)
d.validateMetrics.DiscardedSamples.WithLabelValues(validation.NativeHistogramsRateLimited, userID).Add(float64(totalSamples))
d.validateMetrics.DiscardedExemplars.WithLabelValues(validation.NativeHistogramsRateLimited, userID).Add(float64(validatedExemplars))
d.validateMetrics.DiscardedMetadata.WithLabelValues(validation.NativeHistogramsRateLimited, userID).Add(float64(len(validatedMetadata)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we always returning NativeHistogramsRateLimited? Can't this be trigger only by rateLimited?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks very much.
I needed to set label value validation.RateLimited in case it is rateLimited due to IngestionRate limit, and set label value validation.NativeHistogramsRateLimited in case it is nhRateLimited due to nativeHistogramsIngestionRate limit.
Updated now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we need to drop all samples, exemplars and metadata if native histograms are rate limited?
The default ingestion rate drop everything today because it passes all to the rate limiter. But NH limiter we only check native histograms so it should only throttle native histograms.

I think it doesn't make sense for this limit to impact the existing ingestion rate limit if NH limit is set very small but there is still big room for the default ingestion rate

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1. we can just block NH

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is still the same. no? Ben suggested we drop only validatedHistogramSamples and dont fail the request right away

Copy link
Contributor

@harry671003 harry671003 Jun 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a use case for ingesting partial samples? I feel it's simpler to drop everything.

Also, we don't do partial ingestion for float samples. For example, in a remote write request with 10K samples, even if ingesting 9K samples is within the rate limit, we still reject the entire request.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For me this seems a different type of rejection. For the ones we have today is a global limiter which if we want to accept until the limit we need to "choose" what goes through and what is rejected. This seems more complicated.

For a new limit as specific for nh we can just limit nh samples at all even if we could accept part of it, but allow the rest to go.

Copy link
Contributor Author

@PaurushGarg PaurushGarg Jun 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. Updated now.
I want to confirm on below two points:

Even when the NH samples are dropped by the NHRateLimiter, this PR does not reduce the totalN value (by removing the NH Samples count that were dropped by NHRateLimiter).
This is to keep the existing rate limiter behaviour untouched.
So, basically, where NH Samples were discarded by the NHRateLimiter, the existing rateLimiter would still rateLimit on the basis of total samples received in the request, regardless of whether they were already discarded by NHRateLimiter or not. And similarly, the dropped NH samples (dropped by NH Rate Limiter) would still exhaust the tokens of current RateLimiter.

Incase of NHRateLimiting, this CR doesn't return any error message or 429 to Client. It only publishes the discarded samples metric with label value exclusive for NHRateLimiter.


return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "nativeHistograms ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.nativeHistogramsIngestionRateLimiter.Limit(now, userID), totalSamples, len(validatedMetadata))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's not use camel case for nativeHistograms. How about

native histogram ingestion rate limit (%v) exceeded while adding %d native histogram samples`

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I have updated now.

}
if rateLimited {
// Ensure the request slice is reused if the request is rate limited.
cortexpb.ReuseSlice(req.Timeseries)
d.validateMetrics.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(totalSamples))
d.validateMetrics.DiscardedExemplars.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedExemplars))
d.validateMetrics.DiscardedMetadata.WithLabelValues(validation.RateLimited, userID).Add(float64(len(validatedMetadata)))
// Return a 429 here to tell the client it is going too fast.
// Client may discard the data or slow down and re-send.
// Prometheus v2.26 added a remote-write option 'retry_on_http_429'.

return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.ingestionRateLimiter.Limit(now, userID), totalSamples, len(validatedMetadata))
}

Expand Down
118 changes: 118 additions & 0 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,15 @@ func TestDistributor_Push(t *testing.T) {
cortex_distributor_received_samples_total{type="histogram",user="userDistributorPush"} 25
`,
},
"A push not exceeding burst size but exceeding nativeHistograms burst size should fail, histograms": {
numIngesters: 3,
happyIngesters: 3,
samples: samplesIn{num: 15, startTimestampMs: 123456789000},
histogramSamples: true,
metadata: 5,
expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "nativeHistograms ingestion rate limit (10) exceeded while adding 15 samples and 5 metadata"),
metricNames: []string{lastSeenTimestamp, distributorReceivedSamples},
},
} {
for _, useStreamPush := range []bool{false, true} {
for _, shardByAllLabels := range []bool{true, false} {
Expand All @@ -364,6 +373,8 @@ func TestDistributor_Push(t *testing.T) {
flagext.DefaultValues(limits)
limits.IngestionRate = 20
limits.IngestionBurstSize = 20
limits.NativeHistogramsIngestionRate = 10
limits.NativeHistogramsIngestionBurstSize = 10

ds, _, regs, _ := prepare(t, prepConfig{
numIngesters: tc.numIngesters,
Expand Down Expand Up @@ -681,6 +692,113 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
}
}

func TestDistributor_PushIngestionRateLimiter_Histograms(t *testing.T) {
t.Parallel()
type testPush struct {
samples int
metadata int
expectedError error
}

ctx := user.InjectOrgID(context.Background(), "user")
tests := map[string]struct {
distributors int
ingestionRateStrategy string
ingestionRate float64
ingestionBurstSize int
nativeHistogramsIngestionRate float64
nativeHistogramsIngestionBurstSize int
pushes []testPush
}{
"local strategy: limit should be set to each distributor: histograms": {
distributors: 2,
ingestionRateStrategy: validation.LocalIngestionRateStrategy,
ingestionRate: 10,
ingestionBurstSize: 10,
nativeHistogramsIngestionRate: 5,
nativeHistogramsIngestionBurstSize: 5,
pushes: []testPush{
{samples: 2, expectedError: nil},
{metadata: 1, expectedError: nil},
{samples: 4, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "nativeHistograms ingestion rate limit (5) exceeded while adding 4 samples and 0 metadata")},
{samples: 2, metadata: 1, expectedError: nil},
{samples: 3, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "nativeHistograms ingestion rate limit (5) exceeded while adding 3 samples and 0 metadata")},
{metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (10) exceeded while adding 0 samples and 1 metadata")},
},
},
"global strategy: limit should be evenly shared across distributors: histograms": {
distributors: 2,
ingestionRateStrategy: validation.GlobalIngestionRateStrategy,
ingestionRate: 10,
ingestionBurstSize: 5,
nativeHistogramsIngestionRate: 6,
nativeHistogramsIngestionBurstSize: 3,
pushes: []testPush{
{samples: 2, expectedError: nil},
{samples: 2, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "nativeHistograms ingestion rate limit (3) exceeded while adding 2 samples and 1 metadata")},
{samples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 1 samples and 0 metadata")},
{samples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "nativeHistograms ingestion rate limit (3) exceeded while adding 1 samples and 0 metadata")},
},
},
"global strategy: burst should set to each distributor: histograms": {
distributors: 2,
ingestionRateStrategy: validation.GlobalIngestionRateStrategy,
ingestionRate: 10,
ingestionBurstSize: 20,
nativeHistogramsIngestionRate: 6,
nativeHistogramsIngestionBurstSize: 10,
pushes: []testPush{
{samples: 3, expectedError: nil},
{samples: 1, expectedError: nil},
{samples: 7, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "nativeHistograms ingestion rate limit (3) exceeded while adding 7 samples and 1 metadata")},
{samples: 5, expectedError: nil},
{samples: 3, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "nativeHistograms ingestion rate limit (3) exceeded while adding 3 samples and 0 metadata")},
{metadata: 12, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 0 samples and 12 metadata")},
},
},
}

for testName, testData := range tests {
testData := testData

t.Run(testName, func(t *testing.T) {
t.Parallel()
limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.IngestionRateStrategy = testData.ingestionRateStrategy
limits.IngestionRate = testData.ingestionRate
limits.IngestionBurstSize = testData.ingestionBurstSize
limits.NativeHistogramsIngestionRate = testData.nativeHistogramsIngestionRate
limits.NativeHistogramsIngestionBurstSize = testData.nativeHistogramsIngestionBurstSize

// Start all expected distributors
distributors, _, _, _ := prepare(t, prepConfig{
numIngesters: 3,
happyIngesters: 3,
numDistributors: testData.distributors,
shardByAllLabels: true,
limits: limits,
})

// Push samples in multiple requests to the first distributor
for _, push := range testData.pushes {
var request = makeWriteRequest(0, 0, push.metadata, push.samples)

response, err := distributors[0].Push(ctx, request)

if push.expectedError == nil {
assert.Equal(t, emptyResponse, response)
assert.Nil(t, err)
} else {
assert.Nil(t, response)
assert.Equal(t, push.expectedError, err)
}
}
})
}

}

func TestPush_QuorumError(t *testing.T) {
t.Parallel()

Expand Down
46 changes: 46 additions & 0 deletions pkg/distributor/ingestion_rate_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,49 @@ func (s *infiniteStrategy) Burst(tenantID string) int {
// Burst is ignored when limit = rate.Inf
return 0
}

type localStrategyNativeHistograms struct {
limits *validation.Overrides
}

func newLocalNativeHistogramsIngestionRateStrategy(limits *validation.Overrides) limiter.RateLimiterStrategy {
return &localStrategyNativeHistograms{
limits: limits,
}
}

func (s *localStrategyNativeHistograms) Limit(tenantID string) float64 {
return s.limits.NativeHistogramsIngestionRate(tenantID)
}

func (s *localStrategyNativeHistograms) Burst(tenantID string) int {
return s.limits.NativeHistogramsIngestionBurstSize(tenantID)
}

type globalStrategyNativeHistograms struct {
limits *validation.Overrides
ring ReadLifecycler
}

func newGlobalNativeHistogramsIngestionRateStrategy(limits *validation.Overrides, ring ReadLifecycler) limiter.RateLimiterStrategy {
return &globalStrategyNativeHistograms{
limits: limits,
ring: ring,
}
}

func (s *globalStrategyNativeHistograms) Limit(tenantID string) float64 {
numDistributors := s.ring.HealthyInstancesCount()

if numDistributors == 0 {
return s.limits.NativeHistogramsIngestionRate(tenantID)
}

return s.limits.NativeHistogramsIngestionRate(tenantID) / float64(numDistributors)
}

func (s *globalStrategyNativeHistograms) Burst(tenantID string) int {
// The meaning of burst doesn't change for the global strategy, in order
// to keep it easier to understand for users / operators.
return s.limits.NativeHistogramsIngestionBurstSize(tenantID)
}
Loading
Loading