From e7d86bcea39cc2584054160346e25be285c28830 Mon Sep 17 00:00:00 2001 From: Paurush Garg Date: Thu, 5 Jun 2025 07:15:52 -0700 Subject: [PATCH 1/9] Add native histograms ingestion rate limit Signed-off-by: Paurush Garg --- pkg/distributor/distributor.go | 42 +++++++++---- pkg/distributor/distributor_test.go | 11 ++++ pkg/distributor/ingestion_rate_strategy.go | 46 ++++++++++++++ .../ingestion_rate_strategy_test.go | 62 ++++++++++++------- pkg/util/validation/limits.go | 60 +++++++++++------- pkg/util/validation/validate.go | 3 +- 6 files changed, 168 insertions(+), 56 deletions(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 6843bc27fcc..767a25877ff 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -95,7 +95,8 @@ type Distributor struct { HATracker *ha.HATracker // Per-user rate limiter. - ingestionRateLimiter *limiter.RateLimiter + ingestionRateLimiter *limiter.RateLimiter + nativeHistogramsIngestionRateLimiter *limiter.RateLimiter // Manager for subservices (HA Tracker, distributor ring and client pool) subservices *services.Manager @@ -267,11 +268,13 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove // it's an internal dependency and can't join the distributors ring, we skip rate // limiting. 
var ingestionRateStrategy limiter.RateLimiterStrategy + var nativeHistogramsIngestionRateStrategy limiter.RateLimiterStrategy var distributorsLifeCycler *ring.Lifecycler var distributorsRing *ring.Ring if !canJoinDistributorsRing { ingestionRateStrategy = newInfiniteIngestionRateStrategy() + nativeHistogramsIngestionRateStrategy = newInfiniteIngestionRateStrategy() } else if limits.IngestionRateStrategy() == validation.GlobalIngestionRateStrategy { distributorsLifeCycler, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, "distributor", ringKey, true, true, log, prometheus.WrapRegistererWithPrefix("cortex_", reg)) if err != nil { @@ -285,21 +288,24 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove subservices = append(subservices, distributorsLifeCycler, distributorsRing) ingestionRateStrategy = newGlobalIngestionRateStrategy(limits, distributorsLifeCycler) + nativeHistogramsIngestionRateStrategy = newGlobalNativeHistogramsIngestionRateStrategy(limits, distributorsLifeCycler) } else { ingestionRateStrategy = newLocalIngestionRateStrategy(limits) + nativeHistogramsIngestionRateStrategy = newLocalNativeHistogramsIngestionRateStrategy(limits) } d := &Distributor{ - cfg: cfg, - log: log, - ingestersRing: ingestersRing, - ingesterPool: NewPool(cfg.PoolConfig, ingestersRing, cfg.IngesterClientFactory, log), - distributorsLifeCycler: distributorsLifeCycler, - distributorsRing: distributorsRing, - limits: limits, - ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second), - HATracker: haTracker, - ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval), + cfg: cfg, + log: log, + ingestersRing: ingestersRing, + ingesterPool: NewPool(cfg.PoolConfig, ingestersRing, cfg.IngesterClientFactory, log), + distributorsLifeCycler: distributorsLifeCycler, + distributorsRing: distributorsRing, + limits: limits, + ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 
10*time.Second), + nativeHistogramsIngestionRateLimiter: limiter.NewRateLimiter(nativeHistogramsIngestionRateStrategy, 10*time.Second), + HATracker: haTracker, + ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval), queryDuration: instrument.NewHistogramCollector(promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Namespace: "cortex", @@ -774,6 +780,20 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co totalSamples := validatedFloatSamples + validatedHistogramSamples totalN := totalSamples + validatedExemplars + len(validatedMetadata) + + if !d.nativeHistogramsIngestionRateLimiter.AllowN(now, userID, validatedHistogramSamples) { + // Ensure the request slice is reused if the request is rate limited. + cortexpb.ReuseSlice(req.Timeseries) + + d.validateMetrics.DiscardedSamples.WithLabelValues(validation.NativeHistogramsRateLimited, userID).Add(float64(totalSamples)) + d.validateMetrics.DiscardedExemplars.WithLabelValues(validation.NativeHistogramsRateLimited, userID).Add(float64(validatedExemplars)) + d.validateMetrics.DiscardedMetadata.WithLabelValues(validation.NativeHistogramsRateLimited, userID).Add(float64(len(validatedMetadata))) + // Return a 429 here to tell the client it is going too fast. + // Client may discard the data or slow down and re-send. + // Prometheus v2.26 added a remote-write option 'retry_on_http_429'. + return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "nativeHistograms ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.nativeHistogramsIngestionRateLimiter.Limit(now, userID), totalSamples, len(validatedMetadata)) + } + if !d.ingestionRateLimiter.AllowN(now, userID, totalN) { // Ensure the request slice is reused if the request is rate limited. 
cortexpb.ReuseSlice(req.Timeseries) diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 2d3ceb62c1d..f1bc2b5b701 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -351,6 +351,15 @@ func TestDistributor_Push(t *testing.T) { cortex_distributor_received_samples_total{type="histogram",user="userDistributorPush"} 25 `, }, + "A push not exceeding burst size but exceeding nativeHistograms burst size should fail, histograms": { + numIngesters: 3, + happyIngesters: 3, + samples: samplesIn{num: 15, startTimestampMs: 123456789000}, + histogramSamples: true, + metadata: 5, + expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "nativeHistograms ingestion rate limit (10) exceeded while adding 15 samples and 5 metadata"), + metricNames: []string{lastSeenTimestamp, distributorReceivedSamples}, + }, } { for _, useStreamPush := range []bool{false, true} { for _, shardByAllLabels := range []bool{true, false} { @@ -364,6 +373,8 @@ func TestDistributor_Push(t *testing.T) { flagext.DefaultValues(limits) limits.IngestionRate = 20 limits.IngestionBurstSize = 20 + limits.NativeHistogramsIngestionRate = 10 + limits.NativeHistogramsIngestionBurstSize = 10 ds, _, regs, _ := prepare(t, prepConfig{ numIngesters: tc.numIngesters, diff --git a/pkg/distributor/ingestion_rate_strategy.go b/pkg/distributor/ingestion_rate_strategy.go index cc3e5dd2402..09629011af6 100644 --- a/pkg/distributor/ingestion_rate_strategy.go +++ b/pkg/distributor/ingestion_rate_strategy.go @@ -72,3 +72,49 @@ func (s *infiniteStrategy) Burst(tenantID string) int { // Burst is ignored when limit = rate.Inf return 0 } + +type localStrategyNativeHistograms struct { + limits *validation.Overrides +} + +func newLocalNativeHistogramsIngestionRateStrategy(limits *validation.Overrides) limiter.RateLimiterStrategy { + return &localStrategyNativeHistograms{ + limits: limits, + } +} + +func (s *localStrategyNativeHistograms) Limit(tenantID 
string) float64 { + return s.limits.NativeHistogramsIngestionRate(tenantID) +} + +func (s *localStrategyNativeHistograms) Burst(tenantID string) int { + return s.limits.NativeHistogramsIngestionBurstSize(tenantID) +} + +type globalStrategyNativeHistograms struct { + limits *validation.Overrides + ring ReadLifecycler +} + +func newGlobalNativeHistogramsIngestionRateStrategy(limits *validation.Overrides, ring ReadLifecycler) limiter.RateLimiterStrategy { + return &globalStrategyNativeHistograms{ + limits: limits, + ring: ring, + } +} + +func (s *globalStrategyNativeHistograms) Limit(tenantID string) float64 { + numDistributors := s.ring.HealthyInstancesCount() + + if numDistributors == 0 { + return s.limits.NativeHistogramsIngestionRate(tenantID) + } + + return s.limits.NativeHistogramsIngestionRate(tenantID) / float64(numDistributors) +} + +func (s *globalStrategyNativeHistograms) Burst(tenantID string) int { + // The meaning of burst doesn't change for the global strategy, in order + // to keep it easier to understand for users / operators. 
+ return s.limits.NativeHistogramsIngestionBurstSize(tenantID) +} diff --git a/pkg/distributor/ingestion_rate_strategy_test.go b/pkg/distributor/ingestion_rate_strategy_test.go index 2820c2fb59d..baf47d7d796 100644 --- a/pkg/distributor/ingestion_rate_strategy_test.go +++ b/pkg/distributor/ingestion_rate_strategy_test.go @@ -15,44 +15,58 @@ import ( func TestIngestionRateStrategy(t *testing.T) { t.Parallel() tests := map[string]struct { - limits validation.Limits - ring ReadLifecycler - expectedLimit float64 - expectedBurst int + limits validation.Limits + ring ReadLifecycler + expectedLimit float64 + expectedBurst int + expectedNativeHistogramsLimit float64 + expectedNativeHistogramsBurst int }{ "local rate limiter should just return configured limits": { limits: validation.Limits{ - IngestionRateStrategy: validation.LocalIngestionRateStrategy, - IngestionRate: float64(1000), - IngestionBurstSize: 10000, + IngestionRateStrategy: validation.LocalIngestionRateStrategy, + IngestionRate: float64(1000), + IngestionBurstSize: 10000, + NativeHistogramsIngestionRate: float64(100), + NativeHistogramsIngestionBurstSize: 100, }, - ring: nil, - expectedLimit: float64(1000), - expectedBurst: 10000, + ring: nil, + expectedLimit: float64(1000), + expectedBurst: 10000, + expectedNativeHistogramsLimit: float64(100), + expectedNativeHistogramsBurst: 100, }, "global rate limiter should share the limit across the number of distributors": { limits: validation.Limits{ - IngestionRateStrategy: validation.GlobalIngestionRateStrategy, - IngestionRate: float64(1000), - IngestionBurstSize: 10000, + IngestionRateStrategy: validation.GlobalIngestionRateStrategy, + IngestionRate: float64(1000), + IngestionBurstSize: 10000, + NativeHistogramsIngestionRate: float64(100), + NativeHistogramsIngestionBurstSize: 100, }, ring: func() ReadLifecycler { ring := newReadLifecyclerMock() ring.On("HealthyInstancesCount").Return(2) return ring }(), - expectedLimit: float64(500), - expectedBurst: 10000, + 
expectedLimit: float64(500), + expectedBurst: 10000, + expectedNativeHistogramsLimit: float64(50), + expectedNativeHistogramsBurst: 100, }, "infinite rate limiter should return unlimited settings": { limits: validation.Limits{ - IngestionRateStrategy: "infinite", - IngestionRate: float64(1000), - IngestionBurstSize: 10000, + IngestionRateStrategy: "infinite", + IngestionRate: float64(1000), + IngestionBurstSize: 10000, + NativeHistogramsIngestionRate: float64(100), + NativeHistogramsIngestionBurstSize: 100, }, - ring: nil, - expectedLimit: float64(rate.Inf), - expectedBurst: 0, + ring: nil, + expectedLimit: float64(rate.Inf), + expectedBurst: 0, + expectedNativeHistogramsLimit: float64(rate.Inf), + expectedNativeHistogramsBurst: 0, }, } @@ -62,6 +76,7 @@ func TestIngestionRateStrategy(t *testing.T) { t.Run(testName, func(t *testing.T) { t.Parallel() var strategy limiter.RateLimiterStrategy + var nativeHistogramsStrategy limiter.RateLimiterStrategy // Init limits overrides overrides, err := validation.NewOverrides(testData.limits, nil) @@ -71,16 +86,21 @@ func TestIngestionRateStrategy(t *testing.T) { switch testData.limits.IngestionRateStrategy { case validation.LocalIngestionRateStrategy: strategy = newLocalIngestionRateStrategy(overrides) + nativeHistogramsStrategy = newLocalNativeHistogramsIngestionRateStrategy(overrides) case validation.GlobalIngestionRateStrategy: strategy = newGlobalIngestionRateStrategy(overrides, testData.ring) + nativeHistogramsStrategy = newGlobalNativeHistogramsIngestionRateStrategy(overrides, testData.ring) case "infinite": strategy = newInfiniteIngestionRateStrategy() + nativeHistogramsStrategy = newInfiniteIngestionRateStrategy() default: require.Fail(t, "Unknown strategy") } assert.Equal(t, strategy.Limit("test"), testData.expectedLimit) assert.Equal(t, strategy.Burst("test"), testData.expectedBurst) + assert.Equal(t, nativeHistogramsStrategy.Limit("test"), testData.expectedNativeHistogramsLimit) + assert.Equal(t, 
nativeHistogramsStrategy.Burst("test"), testData.expectedNativeHistogramsBurst) }) } } diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 6419dc6ba89..dff59afb1b4 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -123,29 +123,31 @@ type LimitsPerLabelSet struct { // limits via flags, or per-user limits via yaml config. type Limits struct { // Distributor enforced limits. - IngestionRate float64 `yaml:"ingestion_rate" json:"ingestion_rate"` - IngestionRateStrategy string `yaml:"ingestion_rate_strategy" json:"ingestion_rate_strategy"` - IngestionBurstSize int `yaml:"ingestion_burst_size" json:"ingestion_burst_size"` - AcceptHASamples bool `yaml:"accept_ha_samples" json:"accept_ha_samples"` - AcceptMixedHASamples bool `yaml:"accept_mixed_ha_samples" json:"accept_mixed_ha_samples"` - HAClusterLabel string `yaml:"ha_cluster_label" json:"ha_cluster_label"` - HAReplicaLabel string `yaml:"ha_replica_label" json:"ha_replica_label"` - HAMaxClusters int `yaml:"ha_max_clusters" json:"ha_max_clusters"` - DropLabels flagext.StringSlice `yaml:"drop_labels" json:"drop_labels"` - MaxLabelNameLength int `yaml:"max_label_name_length" json:"max_label_name_length"` - MaxLabelValueLength int `yaml:"max_label_value_length" json:"max_label_value_length"` - MaxLabelNamesPerSeries int `yaml:"max_label_names_per_series" json:"max_label_names_per_series"` - MaxLabelsSizeBytes int `yaml:"max_labels_size_bytes" json:"max_labels_size_bytes"` - MaxMetadataLength int `yaml:"max_metadata_length" json:"max_metadata_length"` - RejectOldSamples bool `yaml:"reject_old_samples" json:"reject_old_samples"` - RejectOldSamplesMaxAge model.Duration `yaml:"reject_old_samples_max_age" json:"reject_old_samples_max_age"` - CreationGracePeriod model.Duration `yaml:"creation_grace_period" json:"creation_grace_period"` - EnforceMetadataMetricName bool `yaml:"enforce_metadata_metric_name" json:"enforce_metadata_metric_name"` - EnforceMetricName bool 
`yaml:"enforce_metric_name" json:"enforce_metric_name"` - IngestionTenantShardSize int `yaml:"ingestion_tenant_shard_size" json:"ingestion_tenant_shard_size"` - MetricRelabelConfigs []*relabel.Config `yaml:"metric_relabel_configs,omitempty" json:"metric_relabel_configs,omitempty" doc:"nocli|description=List of metric relabel configurations. Note that in most situations, it is more effective to use metrics relabeling directly in the Prometheus server, e.g. remote_write.write_relabel_configs."` - MaxNativeHistogramBuckets int `yaml:"max_native_histogram_buckets" json:"max_native_histogram_buckets"` - PromoteResourceAttributes []string `yaml:"promote_resource_attributes" json:"promote_resource_attributes"` + IngestionRate float64 `yaml:"ingestion_rate" json:"ingestion_rate"` + NativeHistogramsIngestionRate float64 `yaml:"native_histograms_ingestion_rate" json:"native_histograms_ingestion_rate"` + IngestionRateStrategy string `yaml:"ingestion_rate_strategy" json:"ingestion_rate_strategy"` + IngestionBurstSize int `yaml:"ingestion_burst_size" json:"ingestion_burst_size"` + NativeHistogramsIngestionBurstSize int `yaml:"native_histograms_ingestion_burst_size" json:"native_histograms_ingestion_burst_size"` + AcceptHASamples bool `yaml:"accept_ha_samples" json:"accept_ha_samples"` + AcceptMixedHASamples bool `yaml:"accept_mixed_ha_samples" json:"accept_mixed_ha_samples"` + HAClusterLabel string `yaml:"ha_cluster_label" json:"ha_cluster_label"` + HAReplicaLabel string `yaml:"ha_replica_label" json:"ha_replica_label"` + HAMaxClusters int `yaml:"ha_max_clusters" json:"ha_max_clusters"` + DropLabels flagext.StringSlice `yaml:"drop_labels" json:"drop_labels"` + MaxLabelNameLength int `yaml:"max_label_name_length" json:"max_label_name_length"` + MaxLabelValueLength int `yaml:"max_label_value_length" json:"max_label_value_length"` + MaxLabelNamesPerSeries int `yaml:"max_label_names_per_series" json:"max_label_names_per_series"` + MaxLabelsSizeBytes int 
`yaml:"max_labels_size_bytes" json:"max_labels_size_bytes"` + MaxMetadataLength int `yaml:"max_metadata_length" json:"max_metadata_length"` + RejectOldSamples bool `yaml:"reject_old_samples" json:"reject_old_samples"` + RejectOldSamplesMaxAge model.Duration `yaml:"reject_old_samples_max_age" json:"reject_old_samples_max_age"` + CreationGracePeriod model.Duration `yaml:"creation_grace_period" json:"creation_grace_period"` + EnforceMetadataMetricName bool `yaml:"enforce_metadata_metric_name" json:"enforce_metadata_metric_name"` + EnforceMetricName bool `yaml:"enforce_metric_name" json:"enforce_metric_name"` + IngestionTenantShardSize int `yaml:"ingestion_tenant_shard_size" json:"ingestion_tenant_shard_size"` + MetricRelabelConfigs []*relabel.Config `yaml:"metric_relabel_configs,omitempty" json:"metric_relabel_configs,omitempty" doc:"nocli|description=List of metric relabel configurations. Note that in most situations, it is more effective to use metrics relabeling directly in the Prometheus server, e.g. remote_write.write_relabel_configs."` + MaxNativeHistogramBuckets int `yaml:"max_native_histogram_buckets" json:"max_native_histogram_buckets"` + PromoteResourceAttributes []string `yaml:"promote_resource_attributes" json:"promote_resource_attributes"` // Ingester enforced limits. // Series @@ -240,8 +242,10 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.IngestionTenantShardSize, "distributor.ingestion-tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used. Must be set both on ingesters and distributors. 
When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant.") f.Float64Var(&l.IngestionRate, "distributor.ingestion-rate-limit", 25000, "Per-user ingestion rate limit in samples per second.") + f.Float64Var(&l.NativeHistogramsIngestionRate, "distributor.native-histograms-ingestion-rate-limit", 2500, "Per-user nativeHistograms ingestion rate limit in samples per second.") f.StringVar(&l.IngestionRateStrategy, "distributor.ingestion-rate-limit-strategy", "local", "Whether the ingestion rate limit should be applied individually to each distributor instance (local), or evenly shared across the cluster (global).") f.IntVar(&l.IngestionBurstSize, "distributor.ingestion-burst-size", 50000, "Per-user allowed ingestion burst size (in number of samples).") + f.IntVar(&l.NativeHistogramsIngestionBurstSize, "distributor.native-histograms-ingestion-burst-size", 5000, "Per-user allowed nativeHistograms ingestion burst size (in number of samples).") f.BoolVar(&l.AcceptHASamples, "distributor.ha-tracker.enable-for-all-users", false, "Flag to enable, for all users, handling of samples with external labels identifying replicas in an HA Prometheus setup.") f.BoolVar(&l.AcceptMixedHASamples, "experimental.distributor.ha-tracker.mixed-ha-samples", false, "[Experimental] Flag to enable handling of samples with mixed external labels identifying replicas in an HA Prometheus setup. Supported only if -distributor.ha-tracker.enable-for-all-users is true.") f.StringVar(&l.HAClusterLabel, "distributor.ha-tracker.cluster", "cluster", "Prometheus label to look for in samples to identify a Prometheus HA cluster.") @@ -577,6 +581,11 @@ func (o *Overrides) IngestionRate(userID string) float64 { return o.GetOverridesForUser(userID).IngestionRate } +// NativeHistogramsIngestionRate returns the limit on the native histograms ingestion rate (samples per second). 
+func (o *Overrides) NativeHistogramsIngestionRate(userID string) float64 { + return o.GetOverridesForUser(userID).NativeHistogramsIngestionRate +} + // IngestionRateStrategy returns whether the ingestion rate limit should be individually applied // to each distributor instance (local) or evenly shared across the cluster (global). func (o *Overrides) IngestionRateStrategy() string { @@ -589,6 +598,11 @@ func (o *Overrides) IngestionBurstSize(userID string) int { return o.GetOverridesForUser(userID).IngestionBurstSize } +// NativeHistogramsIngestionBurstSize returns the burst size for the native histograms ingestion rate. +func (o *Overrides) NativeHistogramsIngestionBurstSize(userID string) int { + return o.GetOverridesForUser(userID).NativeHistogramsIngestionBurstSize +} + // AcceptHASamples returns whether the distributor should track and accept samples from HA replicas for this user. func (o *Overrides) AcceptHASamples(userID string) bool { return o.GetOverridesForUser(userID).AcceptHASamples diff --git a/pkg/util/validation/validate.go b/pkg/util/validation/validate.go index 557c8bdb41b..7c8b3ee82df 100644 --- a/pkg/util/validation/validate.go +++ b/pkg/util/validation/validate.go @@ -61,7 +61,8 @@ const ( // RateLimited is one of the values for the reason to discard samples. // Declared here to avoid duplication in ingester and distributor. - RateLimited = "rate_limited" + RateLimited = "rate_limited" + NativeHistogramsRateLimited = "native_histograms_rate_limited" // Too many HA clusters is one of the reasons for discarding samples. 
TooManyHAClusters = "too_many_ha_clusters" From be1d61bc809a445a6c8619a1ece931c9d18b1f77 Mon Sep 17 00:00:00 2001 From: Paurush Garg Date: Thu, 5 Jun 2025 21:09:34 -0700 Subject: [PATCH 2/9] Adding Tests and Updating Docs Signed-off-by: Paurush Garg --- docs/configuration/config-file-reference.md | 8 + pkg/distributor/distributor.go | 26 +-- pkg/distributor/distributor_test.go | 212 +++++++++++++++----- pkg/util/validation/limits.go | 4 +- 4 files changed, 182 insertions(+), 68 deletions(-) diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 4460ae63021..5a1dce36cdb 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3427,6 +3427,10 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s # CLI flag: -distributor.ingestion-rate-limit [ingestion_rate: | default = 25000] +# Per-user nativeHistograms ingestion rate limit in samples per second. +# CLI flag: -distributor.native-histograms-ingestion-rate-limit +[native_histograms_ingestion_rate: | default = 25000] + # Whether the ingestion rate limit should be applied individually to each # distributor instance (local), or evenly shared across the cluster (global). # CLI flag: -distributor.ingestion-rate-limit-strategy @@ -3436,6 +3440,10 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s # CLI flag: -distributor.ingestion-burst-size [ingestion_burst_size: | default = 50000] +# Per-user allowed nativeHistograms ingestion burst size (in number of samples). +# CLI flag: -distributor.native-histograms-ingestion-burst-size +[native_histograms_ingestion_burst_size: | default = 50000] + # Flag to enable, for all users, handling of samples with external labels # identifying replicas in an HA Prometheus setup. 
# CLI flag: -distributor.ha-tracker.enable-for-all-users diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 767a25877ff..d35a545b1d6 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -781,29 +781,25 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co totalSamples := validatedFloatSamples + validatedHistogramSamples totalN := totalSamples + validatedExemplars + len(validatedMetadata) - if !d.nativeHistogramsIngestionRateLimiter.AllowN(now, userID, validatedHistogramSamples) { + nhRateLimited := !d.nativeHistogramsIngestionRateLimiter.AllowN(now, userID, validatedHistogramSamples) + rateLimited := !d.ingestionRateLimiter.AllowN(now, userID, totalN) + + if nhRateLimited || rateLimited { // Ensure the request slice is reused if the request is rate limited. cortexpb.ReuseSlice(req.Timeseries) d.validateMetrics.DiscardedSamples.WithLabelValues(validation.NativeHistogramsRateLimited, userID).Add(float64(totalSamples)) d.validateMetrics.DiscardedExemplars.WithLabelValues(validation.NativeHistogramsRateLimited, userID).Add(float64(validatedExemplars)) d.validateMetrics.DiscardedMetadata.WithLabelValues(validation.NativeHistogramsRateLimited, userID).Add(float64(len(validatedMetadata))) - // Return a 429 here to tell the client it is going too fast. - // Client may discard the data or slow down and re-send. - // Prometheus v2.26 added a remote-write option 'retry_on_http_429'. - return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "nativeHistograms ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.nativeHistogramsIngestionRateLimiter.Limit(now, userID), totalSamples, len(validatedMetadata)) } - if !d.ingestionRateLimiter.AllowN(now, userID, totalN) { - // Ensure the request slice is reused if the request is rate limited. 
- cortexpb.ReuseSlice(req.Timeseries) - - d.validateMetrics.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(totalSamples)) - d.validateMetrics.DiscardedExemplars.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedExemplars)) - d.validateMetrics.DiscardedMetadata.WithLabelValues(validation.RateLimited, userID).Add(float64(len(validatedMetadata))) - // Return a 429 here to tell the client it is going too fast. - // Client may discard the data or slow down and re-send. - // Prometheus v2.26 added a remote-write option 'retry_on_http_429'. + // Return a 429 here to tell the client it is going too fast. + // Client may discard the data or slow down and re-send. + // Prometheus v2.26 added a remote-write option 'retry_on_http_429'. + if nhRateLimited { + return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "nativeHistograms ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.nativeHistogramsIngestionRateLimiter.Limit(now, userID), totalSamples, len(validatedMetadata)) + } + if rateLimited { return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.ingestionRateLimiter.Limit(now, userID), totalSamples, len(validatedMetadata)) } diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index f1bc2b5b701..ddf2bf1b940 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -597,17 +597,21 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "user") tests := map[string]struct { - distributors int - ingestionRateStrategy string - ingestionRate float64 - ingestionBurstSize int - pushes []testPush + distributors int + ingestionRateStrategy string + ingestionRate float64 + ingestionBurstSize int + nativeHistogramsIngestionRate float64 + nativeHistogramsIngestionBurstSize int + pushes []testPush }{ "local 
strategy: limit should be set to each distributor": { - distributors: 2, - ingestionRateStrategy: validation.LocalIngestionRateStrategy, - ingestionRate: 10, - ingestionBurstSize: 10, + distributors: 2, + ingestionRateStrategy: validation.LocalIngestionRateStrategy, + ingestionRate: 10, + ingestionBurstSize: 10, + nativeHistogramsIngestionRate: 10, + nativeHistogramsIngestionBurstSize: 10, pushes: []testPush{ {samples: 4, expectedError: nil}, {metadata: 1, expectedError: nil}, @@ -618,10 +622,12 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) { }, }, "global strategy: limit should be evenly shared across distributors": { - distributors: 2, - ingestionRateStrategy: validation.GlobalIngestionRateStrategy, - ingestionRate: 10, - ingestionBurstSize: 5, + distributors: 2, + ingestionRateStrategy: validation.GlobalIngestionRateStrategy, + ingestionRate: 10, + ingestionBurstSize: 5, + nativeHistogramsIngestionRate: 10, + nativeHistogramsIngestionBurstSize: 5, pushes: []testPush{ {samples: 2, expectedError: nil}, {samples: 1, expectedError: nil}, @@ -632,10 +638,12 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) { }, }, "global strategy: burst should set to each distributor": { - distributors: 2, - ingestionRateStrategy: validation.GlobalIngestionRateStrategy, - ingestionRate: 10, - ingestionBurstSize: 20, + distributors: 2, + ingestionRateStrategy: validation.GlobalIngestionRateStrategy, + ingestionRate: 10, + ingestionBurstSize: 20, + nativeHistogramsIngestionRate: 10, + nativeHistogramsIngestionBurstSize: 20, pushes: []testPush{ {samples: 10, expectedError: nil}, {samples: 5, expectedError: nil}, @@ -650,46 +658,148 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) { for testName, testData := range tests { testData := testData - for _, enableHistogram := range []bool{false, true} { - enableHistogram := enableHistogram - t.Run(fmt.Sprintf("%s, histogram=%s", testName, strconv.FormatBool(enableHistogram)), func(t *testing.T) { - 
t.Parallel() - limits := &validation.Limits{} - flagext.DefaultValues(limits) - limits.IngestionRateStrategy = testData.ingestionRateStrategy - limits.IngestionRate = testData.ingestionRate - limits.IngestionBurstSize = testData.ingestionBurstSize + t.Run(testName, func(t *testing.T) { + t.Parallel() + limits := &validation.Limits{} + flagext.DefaultValues(limits) + limits.IngestionRateStrategy = testData.ingestionRateStrategy + limits.IngestionRate = testData.ingestionRate + limits.IngestionBurstSize = testData.ingestionBurstSize + limits.NativeHistogramsIngestionRate = testData.nativeHistogramsIngestionRate + limits.NativeHistogramsIngestionBurstSize = testData.nativeHistogramsIngestionBurstSize + + // Start all expected distributors + distributors, _, _, _ := prepare(t, prepConfig{ + numIngesters: 3, + happyIngesters: 3, + numDistributors: testData.distributors, + shardByAllLabels: true, + limits: limits, + }) - // Start all expected distributors - distributors, _, _, _ := prepare(t, prepConfig{ - numIngesters: 3, - happyIngesters: 3, - numDistributors: testData.distributors, - shardByAllLabels: true, - limits: limits, - }) + // Push samples in multiple requests to the first distributor + for _, push := range testData.pushes { + var request = makeWriteRequest(0, push.samples, push.metadata, 0) - // Push samples in multiple requests to the first distributor - for _, push := range testData.pushes { - var request *cortexpb.WriteRequest - if !enableHistogram { - request = makeWriteRequest(0, push.samples, push.metadata, 0) - } else { - request = makeWriteRequest(0, 0, push.metadata, push.samples) - } - response, err := distributors[0].Push(ctx, request) + response, err := distributors[0].Push(ctx, request) - if push.expectedError == nil { - assert.Equal(t, emptyResponse, response) - assert.Nil(t, err) - } else { - assert.Nil(t, response) - assert.Equal(t, push.expectedError, err) - } + if push.expectedError == nil { + assert.Equal(t, emptyResponse, response) + 
assert.Nil(t, err) + } else { + assert.Nil(t, response) + assert.Equal(t, push.expectedError, err) } + } + }) + } +} + +func TestDistributor_PushIngestionRateLimiter_Histograms(t *testing.T) { + t.Parallel() + type testPush struct { + samples int + metadata int + expectedError error + } + + ctx := user.InjectOrgID(context.Background(), "user") + tests := map[string]struct { + distributors int + ingestionRateStrategy string + ingestionRate float64 + ingestionBurstSize int + nativeHistogramsIngestionRate float64 + nativeHistogramsIngestionBurstSize int + pushes []testPush + }{ + "local strategy: limit should be set to each distributor: histograms": { + distributors: 2, + ingestionRateStrategy: validation.LocalIngestionRateStrategy, + ingestionRate: 10, + ingestionBurstSize: 10, + nativeHistogramsIngestionRate: 5, + nativeHistogramsIngestionBurstSize: 5, + pushes: []testPush{ + {samples: 2, expectedError: nil}, + {metadata: 1, expectedError: nil}, + {samples: 4, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "nativeHistograms ingestion rate limit (5) exceeded while adding 4 samples and 0 metadata")}, + {samples: 2, metadata: 1, expectedError: nil}, + {samples: 3, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "nativeHistograms ingestion rate limit (5) exceeded while adding 3 samples and 0 metadata")}, + {metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (10) exceeded while adding 0 samples and 1 metadata")}, + }, + }, + "global strategy: limit should be evenly shared across distributors: histograms": { + distributors: 2, + ingestionRateStrategy: validation.GlobalIngestionRateStrategy, + ingestionRate: 10, + ingestionBurstSize: 5, + nativeHistogramsIngestionRate: 6, + nativeHistogramsIngestionBurstSize: 3, + pushes: []testPush{ + {samples: 2, expectedError: nil}, + {samples: 2, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "nativeHistograms ingestion rate limit (3) exceeded 
while adding 2 samples and 1 metadata")}, + {samples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 1 samples and 0 metadata")}, + {samples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "nativeHistograms ingestion rate limit (3) exceeded while adding 1 samples and 0 metadata")}, + }, + }, + "global strategy: burst should set to each distributor: histograms": { + distributors: 2, + ingestionRateStrategy: validation.GlobalIngestionRateStrategy, + ingestionRate: 10, + ingestionBurstSize: 20, + nativeHistogramsIngestionRate: 6, + nativeHistogramsIngestionBurstSize: 10, + pushes: []testPush{ + {samples: 3, expectedError: nil}, + {samples: 1, expectedError: nil}, + {samples: 7, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "nativeHistograms ingestion rate limit (3) exceeded while adding 7 samples and 1 metadata")}, + {samples: 5, expectedError: nil}, + {samples: 3, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "nativeHistograms ingestion rate limit (3) exceeded while adding 3 samples and 0 metadata")}, + {metadata: 12, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 0 samples and 12 metadata")}, + }, + }, + } + + for testName, testData := range tests { + testData := testData + + t.Run(testName, func(t *testing.T) { + t.Parallel() + limits := &validation.Limits{} + flagext.DefaultValues(limits) + limits.IngestionRateStrategy = testData.ingestionRateStrategy + limits.IngestionRate = testData.ingestionRate + limits.IngestionBurstSize = testData.ingestionBurstSize + limits.NativeHistogramsIngestionRate = testData.nativeHistogramsIngestionRate + limits.NativeHistogramsIngestionBurstSize = testData.nativeHistogramsIngestionBurstSize + + // Start all expected distributors + distributors, _, _, _ := prepare(t, prepConfig{ + numIngesters: 3, + happyIngesters: 3, + numDistributors: testData.distributors, 
+ shardByAllLabels: true, + limits: limits, }) - } + + // Push samples in multiple requests to the first distributor + for _, push := range testData.pushes { + var request = makeWriteRequest(0, 0, push.metadata, push.samples) + + response, err := distributors[0].Push(ctx, request) + + if push.expectedError == nil { + assert.Equal(t, emptyResponse, response) + assert.Nil(t, err) + } else { + assert.Nil(t, response) + assert.Equal(t, push.expectedError, err) + } + } + }) } + } func TestPush_QuorumError(t *testing.T) { diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index dff59afb1b4..3d5a9f39df3 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -242,10 +242,10 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.IngestionTenantShardSize, "distributor.ingestion-tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used. Must be set both on ingesters and distributors. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant.") f.Float64Var(&l.IngestionRate, "distributor.ingestion-rate-limit", 25000, "Per-user ingestion rate limit in samples per second.") - f.Float64Var(&l.NativeHistogramsIngestionRate, "distributor.native-histograms-ingestion-rate-limit", 2500, "Per-user nativeHistograms ingestion rate limit in samples per second.") + f.Float64Var(&l.NativeHistogramsIngestionRate, "distributor.native-histograms-ingestion-rate-limit", 25000, "Per-user nativeHistograms ingestion rate limit in samples per second.") f.StringVar(&l.IngestionRateStrategy, "distributor.ingestion-rate-limit-strategy", "local", "Whether the ingestion rate limit should be applied individually to each distributor instance (local), or evenly shared across the cluster (global).") f.IntVar(&l.IngestionBurstSize, "distributor.ingestion-burst-size", 50000, "Per-user allowed ingestion burst size (in number of samples).") - 
f.IntVar(&l.NativeHistogramsIngestionBurstSize, "distributor.native-histograms-ingestion-burst-size", 5000, "Per-user allowed nativeHistograms ingestion burst size (in number of samples).") + f.IntVar(&l.NativeHistogramsIngestionBurstSize, "distributor.native-histograms-ingestion-burst-size", 50000, "Per-user allowed nativeHistograms ingestion burst size (in number of samples).") f.BoolVar(&l.AcceptHASamples, "distributor.ha-tracker.enable-for-all-users", false, "Flag to enable, for all users, handling of samples with external labels identifying replicas in an HA Prometheus setup.") f.BoolVar(&l.AcceptMixedHASamples, "experimental.distributor.ha-tracker.mixed-ha-samples", false, "[Experimental] Flag to enable handling of samples with mixed external labels identifying replicas in an HA Prometheus setup. Supported only if -distributor.ha-tracker.enable-for-all-users is true.") f.StringVar(&l.HAClusterLabel, "distributor.ha-tracker.cluster", "cluster", "Prometheus label to look for in samples to identify a Prometheus HA cluster.") From 0b4cb0d020e1868d1f8eabb040044ae272ae27a3 Mon Sep 17 00:00:00 2001 From: Paurush Garg Date: Thu, 5 Jun 2025 22:46:28 -0700 Subject: [PATCH 3/9] Resolving failed testcase Signed-off-by: Paurush Garg --- CHANGELOG.md | 1 + pkg/distributor/distributor_test.go | 1 + 2 files changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index f78f1b445c3..a44768944d6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ # Changelog ## master / unreleased +* [ENHANCEMENT] Distributor: Add ingestion rate limit for Native Histograms. #6794 * [CHANGE] Ingester: Remove EnableNativeHistograms config flag and instead gate keep through new per-tenant limit at ingestion. #6718 * [CHANGE] StoreGateway/Alertmanager: Add default 5s connection timeout on client. #6603 * [CHANGE] Validate a tenantID when to use a single tenant resolver. 
#6727 diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index ddf2bf1b940..8f9def19343 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -809,6 +809,7 @@ func TestPush_QuorumError(t *testing.T) { flagext.DefaultValues(&limits) limits.IngestionRate = math.MaxFloat64 + limits.NativeHistogramsIngestionRate = math.MaxFloat64 dists, ingesters, _, r := prepare(t, prepConfig{ numDistributors: 1, From 822cb4d18500ac8ed97aae24ac3ec2e320ccdab9 Mon Sep 17 00:00:00 2001 From: Paurush Garg Date: Mon, 9 Jun 2025 18:21:15 -0700 Subject: [PATCH 4/9] Resolving comments Signed-off-by: Paurush Garg --- CHANGELOG.md | 2 +- pkg/distributor/distributor.go | 17 ++++++++++------- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a44768944d6..d1f51b35df8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,6 @@ # Changelog ## master / unreleased -* [ENHANCEMENT] Distributor: Add ingestion rate limit for Native Histograms. #6794 * [CHANGE] Ingester: Remove EnableNativeHistograms config flag and instead gate keep through new per-tenant limit at ingestion. #6718 * [CHANGE] StoreGateway/Alertmanager: Add default 5s connection timeout on client. #6603 * [CHANGE] Validate a tenantID when to use a single tenant resolver. #6727 @@ -38,6 +37,7 @@ * [ENHANCEMENT] Distributor: Add min/max schema validation for NativeHistograms. #6766 * [ENHANCEMENT] Ingester: Handle runtime errors in query path #6769 * [ENHANCEMENT] Compactor: Support metadata caching bucket for Cleaner. Can be enabled via `-compactor.cleaner-caching-bucket-enabled` flag. #6778 +* [ENHANCEMENT] Distributor: Add ingestion rate limit for Native Histograms. #6794 * [ENHANCEMENT] Compactor, Store Gateway: Introduce user scanner strategy and user index. 
#6780 * [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517 * [BUGFIX] Ingester: Fix labelset data race condition. #6573 diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index d35a545b1d6..e9f8587826d 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -784,22 +784,25 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co nhRateLimited := !d.nativeHistogramsIngestionRateLimiter.AllowN(now, userID, validatedHistogramSamples) rateLimited := !d.ingestionRateLimiter.AllowN(now, userID, totalN) - if nhRateLimited || rateLimited { + // Return a 429 here to tell the client it is going too fast. + // Client may discard the data or slow down and re-send. + // Prometheus v2.26 added a remote-write option 'retry_on_http_429'. + if nhRateLimited { // Ensure the request slice is reused if the request is rate limited. cortexpb.ReuseSlice(req.Timeseries) - d.validateMetrics.DiscardedSamples.WithLabelValues(validation.NativeHistogramsRateLimited, userID).Add(float64(totalSamples)) d.validateMetrics.DiscardedExemplars.WithLabelValues(validation.NativeHistogramsRateLimited, userID).Add(float64(validatedExemplars)) d.validateMetrics.DiscardedMetadata.WithLabelValues(validation.NativeHistogramsRateLimited, userID).Add(float64(len(validatedMetadata))) - } - // Return a 429 here to tell the client it is going too fast. - // Client may discard the data or slow down and re-send. - // Prometheus v2.26 added a remote-write option 'retry_on_http_429'. - if nhRateLimited { return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "nativeHistograms ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.nativeHistogramsIngestionRateLimiter.Limit(now, userID), totalSamples, len(validatedMetadata)) } if rateLimited { + // Ensure the request slice is reused if the request is rate limited. 
+ cortexpb.ReuseSlice(req.Timeseries) + d.validateMetrics.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(totalSamples)) + d.validateMetrics.DiscardedExemplars.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedExemplars)) + d.validateMetrics.DiscardedMetadata.WithLabelValues(validation.RateLimited, userID).Add(float64(len(validatedMetadata))) + return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.ingestionRateLimiter.Limit(now, userID), totalSamples, len(validatedMetadata)) } From f2e4c1f9396aa8ac516ce6c1fd4ea41f3ed01504 Mon Sep 17 00:00:00 2001 From: Paurush Garg Date: Wed, 11 Jun 2025 20:19:34 -0700 Subject: [PATCH 5/9] Resolving comments Signed-off-by: Paurush Garg --- pkg/distributor/distributor.go | 5 +- pkg/distributor/distributor_test.go | 108 ++++++++++++++-------------- pkg/util/validation/limits.go | 4 +- 3 files changed, 58 insertions(+), 59 deletions(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index e9f8587826d..a5139fb9636 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -781,7 +781,10 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co totalSamples := validatedFloatSamples + validatedHistogramSamples totalN := totalSamples + validatedExemplars + len(validatedMetadata) - nhRateLimited := !d.nativeHistogramsIngestionRateLimiter.AllowN(now, userID, validatedHistogramSamples) + nhRateLimited := false + if limits.NativeHistogramsIngestionRate > 0 && limits.NativeHistogramsIngestionBurstSize > 0 { + nhRateLimited = !d.nativeHistogramsIngestionRateLimiter.AllowN(now, userID, validatedHistogramSamples) + } rateLimited := !d.ingestionRateLimiter.AllowN(now, userID, totalN) // Return a 429 here to tell the client it is going too fast. 
diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 8f9def19343..bbfc781fcf4 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -597,21 +597,17 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "user") tests := map[string]struct { - distributors int - ingestionRateStrategy string - ingestionRate float64 - ingestionBurstSize int - nativeHistogramsIngestionRate float64 - nativeHistogramsIngestionBurstSize int - pushes []testPush + distributors int + ingestionRateStrategy string + ingestionRate float64 + ingestionBurstSize int + pushes []testPush }{ "local strategy: limit should be set to each distributor": { - distributors: 2, - ingestionRateStrategy: validation.LocalIngestionRateStrategy, - ingestionRate: 10, - ingestionBurstSize: 10, - nativeHistogramsIngestionRate: 10, - nativeHistogramsIngestionBurstSize: 10, + distributors: 2, + ingestionRateStrategy: validation.LocalIngestionRateStrategy, + ingestionRate: 10, + ingestionBurstSize: 10, pushes: []testPush{ {samples: 4, expectedError: nil}, {metadata: 1, expectedError: nil}, @@ -622,12 +618,10 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) { }, }, "global strategy: limit should be evenly shared across distributors": { - distributors: 2, - ingestionRateStrategy: validation.GlobalIngestionRateStrategy, - ingestionRate: 10, - ingestionBurstSize: 5, - nativeHistogramsIngestionRate: 10, - nativeHistogramsIngestionBurstSize: 5, + distributors: 2, + ingestionRateStrategy: validation.GlobalIngestionRateStrategy, + ingestionRate: 10, + ingestionBurstSize: 5, pushes: []testPush{ {samples: 2, expectedError: nil}, {samples: 1, expectedError: nil}, @@ -638,12 +632,10 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) { }, }, "global strategy: burst should set to each distributor": { - distributors: 2, - ingestionRateStrategy: 
validation.GlobalIngestionRateStrategy, - ingestionRate: 10, - ingestionBurstSize: 20, - nativeHistogramsIngestionRate: 10, - nativeHistogramsIngestionBurstSize: 20, + distributors: 2, + ingestionRateStrategy: validation.GlobalIngestionRateStrategy, + ingestionRate: 10, + ingestionBurstSize: 20, pushes: []testPush{ {samples: 10, expectedError: nil}, {samples: 5, expectedError: nil}, @@ -658,40 +650,45 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) { for testName, testData := range tests { testData := testData - t.Run(testName, func(t *testing.T) { - t.Parallel() - limits := &validation.Limits{} - flagext.DefaultValues(limits) - limits.IngestionRateStrategy = testData.ingestionRateStrategy - limits.IngestionRate = testData.ingestionRate - limits.IngestionBurstSize = testData.ingestionBurstSize - limits.NativeHistogramsIngestionRate = testData.nativeHistogramsIngestionRate - limits.NativeHistogramsIngestionBurstSize = testData.nativeHistogramsIngestionBurstSize - - // Start all expected distributors - distributors, _, _, _ := prepare(t, prepConfig{ - numIngesters: 3, - happyIngesters: 3, - numDistributors: testData.distributors, - shardByAllLabels: true, - limits: limits, - }) + for _, enableHistogram := range []bool{false, true} { + enableHistogram := enableHistogram + t.Run(fmt.Sprintf("%s, histogram=%s", testName, strconv.FormatBool(enableHistogram)), func(t *testing.T) { + t.Parallel() + limits := &validation.Limits{} + flagext.DefaultValues(limits) + limits.IngestionRateStrategy = testData.ingestionRateStrategy + limits.IngestionRate = testData.ingestionRate + limits.IngestionBurstSize = testData.ingestionBurstSize - // Push samples in multiple requests to the first distributor - for _, push := range testData.pushes { - var request = makeWriteRequest(0, push.samples, push.metadata, 0) + // Start all expected distributors + distributors, _, _, _ := prepare(t, prepConfig{ + numIngesters: 3, + happyIngesters: 3, + numDistributors: 
testData.distributors, + shardByAllLabels: true, + limits: limits, + }) - response, err := distributors[0].Push(ctx, request) + // Push samples in multiple requests to the first distributor + for _, push := range testData.pushes { + var request *cortexpb.WriteRequest + if !enableHistogram { + request = makeWriteRequest(0, push.samples, push.metadata, 0) + } else { + request = makeWriteRequest(0, 0, push.metadata, push.samples) + } + response, err := distributors[0].Push(ctx, request) - if push.expectedError == nil { - assert.Equal(t, emptyResponse, response) - assert.Nil(t, err) - } else { - assert.Nil(t, response) - assert.Equal(t, push.expectedError, err) + if push.expectedError == nil { + assert.Equal(t, emptyResponse, response) + assert.Nil(t, err) + } else { + assert.Nil(t, response) + assert.Equal(t, push.expectedError, err) + } } - } - }) + }) + } } } @@ -809,7 +806,6 @@ func TestPush_QuorumError(t *testing.T) { flagext.DefaultValues(&limits) limits.IngestionRate = math.MaxFloat64 - limits.NativeHistogramsIngestionRate = math.MaxFloat64 dists, ingesters, _, r := prepare(t, prepConfig{ numDistributors: 1, diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 3d5a9f39df3..b31618ea292 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -242,10 +242,10 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.IngestionTenantShardSize, "distributor.ingestion-tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used. Must be set both on ingesters and distributors. 
When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant.") f.Float64Var(&l.IngestionRate, "distributor.ingestion-rate-limit", 25000, "Per-user ingestion rate limit in samples per second.") - f.Float64Var(&l.NativeHistogramsIngestionRate, "distributor.native-histograms-ingestion-rate-limit", 25000, "Per-user nativeHistograms ingestion rate limit in samples per second.") + f.Float64Var(&l.NativeHistogramsIngestionRate, "distributor.native-histograms-ingestion-rate-limit", 0, "Per-user nativeHistograms ingestion rate limit in samples per second. 0 to disable the limit") f.StringVar(&l.IngestionRateStrategy, "distributor.ingestion-rate-limit-strategy", "local", "Whether the ingestion rate limit should be applied individually to each distributor instance (local), or evenly shared across the cluster (global).") f.IntVar(&l.IngestionBurstSize, "distributor.ingestion-burst-size", 50000, "Per-user allowed ingestion burst size (in number of samples).") - f.IntVar(&l.NativeHistogramsIngestionBurstSize, "distributor.native-histograms-ingestion-burst-size", 50000, "Per-user allowed nativeHistograms ingestion burst size (in number of samples).") + f.IntVar(&l.NativeHistogramsIngestionBurstSize, "distributor.native-histograms-ingestion-burst-size", 0, "Per-user allowed nativeHistograms ingestion burst size (in number of samples). 0 to disable the limit") f.BoolVar(&l.AcceptHASamples, "distributor.ha-tracker.enable-for-all-users", false, "Flag to enable, for all users, handling of samples with external labels identifying replicas in an HA Prometheus setup.") f.BoolVar(&l.AcceptMixedHASamples, "experimental.distributor.ha-tracker.mixed-ha-samples", false, "[Experimental] Flag to enable handling of samples with mixed external labels identifying replicas in an HA Prometheus setup. 
Supported only if -distributor.ha-tracker.enable-for-all-users is true.") f.StringVar(&l.HAClusterLabel, "distributor.ha-tracker.cluster", "cluster", "Prometheus label to look for in samples to identify a Prometheus HA cluster.") From 55896c87bd29633aa9fd4597aaf8822e906a40c7 Mon Sep 17 00:00:00 2001 From: Paurush Garg Date: Wed, 11 Jun 2025 20:35:49 -0700 Subject: [PATCH 6/9] Updating doc Signed-off-by: Paurush Garg --- docs/configuration/config-file-reference.md | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 5a1dce36cdb..7431d053787 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3427,9 +3427,10 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s # CLI flag: -distributor.ingestion-rate-limit [ingestion_rate: | default = 25000] -# Per-user nativeHistograms ingestion rate limit in samples per second. +# Per-user nativeHistograms ingestion rate limit in samples per second. 0 to +# disable the limit # CLI flag: -distributor.native-histograms-ingestion-rate-limit -[native_histograms_ingestion_rate: | default = 25000] +[native_histograms_ingestion_rate: | default = 0] # Whether the ingestion rate limit should be applied individually to each # distributor instance (local), or evenly shared across the cluster (global). @@ -3441,8 +3442,9 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s [ingestion_burst_size: | default = 50000] # Per-user allowed nativeHistograms ingestion burst size (in number of samples). 
+# 0 to disable the limit # CLI flag: -distributor.native-histograms-ingestion-burst-size -[native_histograms_ingestion_burst_size: | default = 50000] +[native_histograms_ingestion_burst_size: | default = 0] # Flag to enable, for all users, handling of samples with external labels # identifying replicas in an HA Prometheus setup. From ff78e593bca6efcf7347e8ec253f699373d7b01d Mon Sep 17 00:00:00 2001 From: Paurush Garg Date: Tue, 17 Jun 2025 12:58:11 -0700 Subject: [PATCH 7/9] Changing NativeHistograms default ingestion limits Signed-off-by: Paurush Garg --- docs/configuration/config-file-reference.md | 9 ++++----- pkg/distributor/distributor.go | 5 +++-- pkg/distributor/distributor_test.go | 16 ++++++++-------- pkg/util/validation/limits.go | 4 ++-- 4 files changed, 17 insertions(+), 17 deletions(-) diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 7431d053787..720505498d2 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3427,10 +3427,10 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s # CLI flag: -distributor.ingestion-rate-limit [ingestion_rate: | default = 25000] -# Per-user nativeHistograms ingestion rate limit in samples per second. 0 to -# disable the limit +# Per-user native histograms ingestion rate limit in samples per second. +# Disabled by default # CLI flag: -distributor.native-histograms-ingestion-rate-limit -[native_histograms_ingestion_rate: | default = 0] +[native_histograms_ingestion_rate: | default = 1.7976931348623157e+308] # Whether the ingestion rate limit should be applied individually to each # distributor instance (local), or evenly shared across the cluster (global). 
@@ -3441,8 +3441,7 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s # CLI flag: -distributor.ingestion-burst-size [ingestion_burst_size: | default = 50000] -# Per-user allowed nativeHistograms ingestion burst size (in number of samples). -# 0 to disable the limit +# Per-user allowed native histograms ingestion burst size (in number of samples) # CLI flag: -distributor.native-histograms-ingestion-burst-size [native_histograms_ingestion_burst_size: | default = 0] diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index a5139fb9636..b798a27dcb3 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -5,6 +5,7 @@ import ( "flag" "fmt" "io" + "math" "net/http" "sort" "strings" @@ -782,7 +783,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co totalN := totalSamples + validatedExemplars + len(validatedMetadata) nhRateLimited := false - if limits.NativeHistogramsIngestionRate > 0 && limits.NativeHistogramsIngestionBurstSize > 0 { + if limits.NativeHistogramsIngestionRate != math.MaxFloat64 { nhRateLimited = !d.nativeHistogramsIngestionRateLimiter.AllowN(now, userID, validatedHistogramSamples) } rateLimited := !d.ingestionRateLimiter.AllowN(now, userID, totalN) @@ -797,7 +798,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co d.validateMetrics.DiscardedExemplars.WithLabelValues(validation.NativeHistogramsRateLimited, userID).Add(float64(validatedExemplars)) d.validateMetrics.DiscardedMetadata.WithLabelValues(validation.NativeHistogramsRateLimited, userID).Add(float64(len(validatedMetadata))) - return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "nativeHistograms ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.nativeHistogramsIngestionRateLimiter.Limit(now, userID), totalSamples, len(validatedMetadata)) + return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "native histograms 
ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.nativeHistogramsIngestionRateLimiter.Limit(now, userID), totalSamples, len(validatedMetadata)) } if rateLimited { // Ensure the request slice is reused if the request is rate limited. diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index bbfc781fcf4..30d91212a10 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -351,13 +351,13 @@ func TestDistributor_Push(t *testing.T) { cortex_distributor_received_samples_total{type="histogram",user="userDistributorPush"} 25 `, }, - "A push not exceeding burst size but exceeding nativeHistograms burst size should fail, histograms": { + "A push not exceeding burst size but exceeding native histograms burst size should fail, histograms": { numIngesters: 3, happyIngesters: 3, samples: samplesIn{num: 15, startTimestampMs: 123456789000}, histogramSamples: true, metadata: 5, - expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "nativeHistograms ingestion rate limit (10) exceeded while adding 15 samples and 5 metadata"), + expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histograms ingestion rate limit (10) exceeded while adding 15 samples and 5 metadata"), metricNames: []string{lastSeenTimestamp, distributorReceivedSamples}, }, } { @@ -720,9 +720,9 @@ func TestDistributor_PushIngestionRateLimiter_Histograms(t *testing.T) { pushes: []testPush{ {samples: 2, expectedError: nil}, {metadata: 1, expectedError: nil}, - {samples: 4, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "nativeHistograms ingestion rate limit (5) exceeded while adding 4 samples and 0 metadata")}, + {samples: 4, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histograms ingestion rate limit (5) exceeded while adding 4 samples and 0 metadata")}, {samples: 2, metadata: 1, expectedError: nil}, - {samples: 3, expectedError: 
httpgrpc.Errorf(http.StatusTooManyRequests, "nativeHistograms ingestion rate limit (5) exceeded while adding 3 samples and 0 metadata")}, + {samples: 3, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histograms ingestion rate limit (5) exceeded while adding 3 samples and 0 metadata")}, {metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (10) exceeded while adding 0 samples and 1 metadata")}, }, }, @@ -735,9 +735,9 @@ func TestDistributor_PushIngestionRateLimiter_Histograms(t *testing.T) { nativeHistogramsIngestionBurstSize: 3, pushes: []testPush{ {samples: 2, expectedError: nil}, - {samples: 2, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "nativeHistograms ingestion rate limit (3) exceeded while adding 2 samples and 1 metadata")}, + {samples: 2, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histograms ingestion rate limit (3) exceeded while adding 2 samples and 1 metadata")}, {samples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 1 samples and 0 metadata")}, - {samples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "nativeHistograms ingestion rate limit (3) exceeded while adding 1 samples and 0 metadata")}, + {samples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histograms ingestion rate limit (3) exceeded while adding 1 samples and 0 metadata")}, }, }, "global strategy: burst should set to each distributor: histograms": { @@ -750,9 +750,9 @@ func TestDistributor_PushIngestionRateLimiter_Histograms(t *testing.T) { pushes: []testPush{ {samples: 3, expectedError: nil}, {samples: 1, expectedError: nil}, - {samples: 7, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "nativeHistograms ingestion rate limit (3) exceeded while adding 7 samples and 1 metadata")}, + {samples: 7, metadata: 1, expectedError: 
httpgrpc.Errorf(http.StatusTooManyRequests, "native histograms ingestion rate limit (3) exceeded while adding 7 samples and 1 metadata")}, {samples: 5, expectedError: nil}, - {samples: 3, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "nativeHistograms ingestion rate limit (3) exceeded while adding 3 samples and 0 metadata")}, + {samples: 3, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histograms ingestion rate limit (3) exceeded while adding 3 samples and 0 metadata")}, {metadata: 12, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 0 samples and 12 metadata")}, }, }, diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index b31618ea292..87a6a2b7b98 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -242,10 +242,10 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.IngestionTenantShardSize, "distributor.ingestion-tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used. Must be set both on ingesters and distributors. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant.") f.Float64Var(&l.IngestionRate, "distributor.ingestion-rate-limit", 25000, "Per-user ingestion rate limit in samples per second.") - f.Float64Var(&l.NativeHistogramsIngestionRate, "distributor.native-histograms-ingestion-rate-limit", 0, "Per-user nativeHistograms ingestion rate limit in samples per second. 0 to disable the limit") + f.Float64Var(&l.NativeHistogramsIngestionRate, "distributor.native-histograms-ingestion-rate-limit", math.MaxFloat64, "Per-user native histograms ingestion rate limit in samples per second. 
Disabled by default") f.StringVar(&l.IngestionRateStrategy, "distributor.ingestion-rate-limit-strategy", "local", "Whether the ingestion rate limit should be applied individually to each distributor instance (local), or evenly shared across the cluster (global).") f.IntVar(&l.IngestionBurstSize, "distributor.ingestion-burst-size", 50000, "Per-user allowed ingestion burst size (in number of samples).") - f.IntVar(&l.NativeHistogramsIngestionBurstSize, "distributor.native-histograms-ingestion-burst-size", 0, "Per-user allowed nativeHistograms ingestion burst size (in number of samples). 0 to disable the limit") + f.IntVar(&l.NativeHistogramsIngestionBurstSize, "distributor.native-histograms-ingestion-burst-size", 0, "Per-user allowed native histograms ingestion burst size (in number of samples)") f.BoolVar(&l.AcceptHASamples, "distributor.ha-tracker.enable-for-all-users", false, "Flag to enable, for all users, handling of samples with external labels identifying replicas in an HA Prometheus setup.") f.BoolVar(&l.AcceptMixedHASamples, "experimental.distributor.ha-tracker.mixed-ha-samples", false, "[Experimental] Flag to enable handling of samples with mixed external labels identifying replicas in an HA Prometheus setup. 
Supported only if -distributor.ha-tracker.enable-for-all-users is true.") f.StringVar(&l.HAClusterLabel, "distributor.ha-tracker.cluster", "cluster", "Prometheus label to look for in samples to identify a Prometheus HA cluster.") From ab7a85c4eb72e1ba73c8e169b8633735090b14f8 Mon Sep 17 00:00:00 2001 From: Paurush Garg Date: Thu, 19 Jun 2025 18:25:07 -0700 Subject: [PATCH 8/9] Discard only NH Samples for NH Rate Limiter Signed-off-by: Paurush Garg --- pkg/distributor/distributor.go | 55 ++++++++++++++------------ pkg/distributor/distributor_test.go | 61 ++++++++--------------------- pkg/util/validation/validate.go | 2 +- 3 files changed, 47 insertions(+), 71 deletions(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index b798a27dcb3..6b92003b12d 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -761,7 +761,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co } // A WriteRequest can only contain series or metadata but not both. This might change in the future. 
- seriesKeys, validatedTimeseries, validatedFloatSamples, validatedHistogramSamples, validatedExemplars, firstPartialErr, err := d.prepareSeriesKeys(ctx, req, userID, limits, removeReplica) + seriesKeys, nhSeriesKeys, validatedTimeseries, nhValidatedTimeseries, validatedFloatSamples, validatedHistogramSamples, validatedExemplars, firstPartialErr, err := d.prepareSeriesKeys(ctx, req, userID, limits, removeReplica) if err != nil { return nil, err } @@ -772,6 +772,18 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co d.receivedExemplars.WithLabelValues(userID).Add(float64(validatedExemplars)) d.receivedMetadata.WithLabelValues(userID).Add(float64(len(validatedMetadata))) + nhRateLimited := false + if limits.NativeHistogramsIngestionRate != math.MaxFloat64 { + nhRateLimited = !d.nativeHistogramsIngestionRateLimiter.AllowN(time.Now(), userID, validatedHistogramSamples) + } + + if nhRateLimited { + d.validateMetrics.DiscardedSamples.WithLabelValues(validation.NativeHistogramsRateLimited, userID).Add(float64(validatedHistogramSamples)) + } else { + seriesKeys = append(seriesKeys, nhSeriesKeys...) + validatedTimeseries = append(validatedTimeseries, nhValidatedTimeseries...) + } + if len(seriesKeys) == 0 && len(metadataKeys) == 0 { // Ensure the request slice is reused if there's no series or metadata passing the validation. cortexpb.ReuseSlice(req.Timeseries) @@ -781,32 +793,16 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co totalSamples := validatedFloatSamples + validatedHistogramSamples totalN := totalSamples + validatedExemplars + len(validatedMetadata) - - nhRateLimited := false - if limits.NativeHistogramsIngestionRate != math.MaxFloat64 { - nhRateLimited = !d.nativeHistogramsIngestionRateLimiter.AllowN(now, userID, validatedHistogramSamples) - } - rateLimited := !d.ingestionRateLimiter.AllowN(now, userID, totalN) - - // Return a 429 here to tell the client it is going too fast. 
- // Client may discard the data or slow down and re-send. - // Prometheus v2.26 added a remote-write option 'retry_on_http_429'. - if nhRateLimited { + if !d.ingestionRateLimiter.AllowN(now, userID, totalN) { // Ensure the request slice is reused if the request is rate limited. cortexpb.ReuseSlice(req.Timeseries) - d.validateMetrics.DiscardedSamples.WithLabelValues(validation.NativeHistogramsRateLimited, userID).Add(float64(totalSamples)) - d.validateMetrics.DiscardedExemplars.WithLabelValues(validation.NativeHistogramsRateLimited, userID).Add(float64(validatedExemplars)) - d.validateMetrics.DiscardedMetadata.WithLabelValues(validation.NativeHistogramsRateLimited, userID).Add(float64(len(validatedMetadata))) - return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "native histograms ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.nativeHistogramsIngestionRateLimiter.Limit(now, userID), totalSamples, len(validatedMetadata)) - } - if rateLimited { - // Ensure the request slice is reused if the request is rate limited. - cortexpb.ReuseSlice(req.Timeseries) d.validateMetrics.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(totalSamples)) d.validateMetrics.DiscardedExemplars.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedExemplars)) d.validateMetrics.DiscardedMetadata.WithLabelValues(validation.RateLimited, userID).Add(float64(len(validatedMetadata))) - + // Return a 429 here to tell the client it is going too fast. + // Client may discard the data or slow down and re-send. + // Prometheus v2.26 added a remote-write option 'retry_on_http_429'. 
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.ingestionRateLimiter.Limit(now, userID), totalSamples, len(validatedMetadata)) } @@ -959,14 +955,16 @@ type samplesLabelSetEntry struct { labels labels.Labels } -func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.WriteRequest, userID string, limits *validation.Limits, removeReplica bool) ([]uint32, []cortexpb.PreallocTimeseries, int, int, int, error, error) { +func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.WriteRequest, userID string, limits *validation.Limits, removeReplica bool) ([]uint32, []uint32, []cortexpb.PreallocTimeseries, []cortexpb.PreallocTimeseries, int, int, int, error, error) { pSpan, _ := opentracing.StartSpanFromContext(ctx, "prepareSeriesKeys") defer pSpan.Finish() // For each timeseries or samples, we compute a hash to distribute across ingesters; // check each sample/metadata and discard if outside limits. 
validatedTimeseries := make([]cortexpb.PreallocTimeseries, 0, len(req.Timeseries)) + nhValidatedTimeseries := make([]cortexpb.PreallocTimeseries, 0, len(req.Timeseries)) seriesKeys := make([]uint32, 0, len(req.Timeseries)) + nhSeriesKeys := make([]uint32, 0, len(req.Timeseries)) validatedFloatSamples := 0 validatedHistogramSamples := 0 validatedExemplars := 0 @@ -1074,7 +1072,7 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write // label and dropped labels (if any) key, err := d.tokenForLabels(userID, ts.Labels) if err != nil { - return nil, nil, 0, 0, 0, nil, err + return nil, nil, nil, nil, 0, 0, 0, nil, err } validatedSeries, validationErr := d.validateSeries(ts, userID, skipLabelNameValidation, limits) @@ -1109,8 +1107,13 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write } } - seriesKeys = append(seriesKeys, key) - validatedTimeseries = append(validatedTimeseries, validatedSeries) + if len(ts.Histograms) > 0 { + nhSeriesKeys = append(nhSeriesKeys, key) + nhValidatedTimeseries = append(nhValidatedTimeseries, validatedSeries) + } else { + seriesKeys = append(seriesKeys, key) + validatedTimeseries = append(validatedTimeseries, validatedSeries) + } validatedFloatSamples += len(ts.Samples) validatedHistogramSamples += len(ts.Histograms) validatedExemplars += len(ts.Exemplars) @@ -1126,7 +1129,7 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write } } - return seriesKeys, validatedTimeseries, validatedFloatSamples, validatedHistogramSamples, validatedExemplars, firstPartialErr, nil + return seriesKeys, nhSeriesKeys, validatedTimeseries, nhValidatedTimeseries, validatedFloatSamples, validatedHistogramSamples, validatedExemplars, firstPartialErr, nil } func sortLabelsIfNeeded(labels []cortexpb.LabelAdapter) { diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 30d91212a10..965555d7306 100644 --- a/pkg/distributor/distributor_test.go 
+++ b/pkg/distributor/distributor_test.go @@ -351,15 +351,6 @@ func TestDistributor_Push(t *testing.T) { cortex_distributor_received_samples_total{type="histogram",user="userDistributorPush"} 25 `, }, - "A push not exceeding burst size but exceeding native histograms burst size should fail, histograms": { - numIngesters: 3, - happyIngesters: 3, - samples: samplesIn{num: 15, startTimestampMs: 123456789000}, - histogramSamples: true, - metadata: 5, - expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histograms ingestion rate limit (10) exceeded while adding 15 samples and 5 metadata"), - metricNames: []string{lastSeenTimestamp, distributorReceivedSamples}, - }, } { for _, useStreamPush := range []bool{false, true} { for _, shardByAllLabels := range []bool{true, false} { @@ -373,8 +364,6 @@ func TestDistributor_Push(t *testing.T) { flagext.DefaultValues(limits) limits.IngestionRate = 20 limits.IngestionBurstSize = 20 - limits.NativeHistogramsIngestionRate = 10 - limits.NativeHistogramsIngestionBurstSize = 10 ds, _, regs, _ := prepare(t, prepConfig{ numIngesters: tc.numIngesters, @@ -695,9 +684,10 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) { func TestDistributor_PushIngestionRateLimiter_Histograms(t *testing.T) { t.Parallel() type testPush struct { - samples int - metadata int - expectedError error + samples int + metadata int + expectedError error + expectedNHDiscardedSampleMetricValue int } ctx := user.InjectOrgID(context.Background(), "user") @@ -713,47 +703,29 @@ func TestDistributor_PushIngestionRateLimiter_Histograms(t *testing.T) { "local strategy: limit should be set to each distributor: histograms": { distributors: 2, ingestionRateStrategy: validation.LocalIngestionRateStrategy, - ingestionRate: 10, - ingestionBurstSize: 10, + ingestionRate: 13, + ingestionBurstSize: 13, nativeHistogramsIngestionRate: 5, nativeHistogramsIngestionBurstSize: 5, pushes: []testPush{ - {samples: 2, expectedError: nil}, - {metadata: 1, 
expectedError: nil}, - {samples: 4, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histograms ingestion rate limit (5) exceeded while adding 4 samples and 0 metadata")}, - {samples: 2, metadata: 1, expectedError: nil}, - {samples: 3, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histograms ingestion rate limit (5) exceeded while adding 3 samples and 0 metadata")}, - {metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (10) exceeded while adding 0 samples and 1 metadata")}, - }, - }, - "global strategy: limit should be evenly shared across distributors: histograms": { - distributors: 2, - ingestionRateStrategy: validation.GlobalIngestionRateStrategy, - ingestionRate: 10, - ingestionBurstSize: 5, - nativeHistogramsIngestionRate: 6, - nativeHistogramsIngestionBurstSize: 3, - pushes: []testPush{ - {samples: 2, expectedError: nil}, - {samples: 2, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histograms ingestion rate limit (3) exceeded while adding 2 samples and 1 metadata")}, - {samples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 1 samples and 0 metadata")}, - {samples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histograms ingestion rate limit (3) exceeded while adding 1 samples and 0 metadata")}, + {samples: 2, expectedError: nil, expectedNHDiscardedSampleMetricValue: 0}, + {samples: 4, expectedError: nil, expectedNHDiscardedSampleMetricValue: 4}, + {samples: 2, metadata: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 4}, + {samples: 2, metadata: 8, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (13) exceeded while adding 2 samples and 8 metadata"), expectedNHDiscardedSampleMetricValue: 6}, }, }, "global strategy: burst should set to each distributor: histograms": { distributors: 2, ingestionRateStrategy: 
validation.GlobalIngestionRateStrategy, - ingestionRate: 10, + ingestionRate: 20, ingestionBurstSize: 20, nativeHistogramsIngestionRate: 6, - nativeHistogramsIngestionBurstSize: 10, + nativeHistogramsIngestionBurstSize: 3, pushes: []testPush{ - {samples: 3, expectedError: nil}, - {samples: 1, expectedError: nil}, - {samples: 7, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histograms ingestion rate limit (3) exceeded while adding 7 samples and 1 metadata")}, - {samples: 5, expectedError: nil}, - {samples: 3, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "native histograms ingestion rate limit (3) exceeded while adding 3 samples and 0 metadata")}, - {metadata: 12, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 0 samples and 12 metadata")}, + {samples: 2, expectedError: nil, expectedNHDiscardedSampleMetricValue: 0}, + {samples: 4, expectedError: nil, expectedNHDiscardedSampleMetricValue: 4}, + {samples: 1, metadata: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 4}, + {samples: 2, metadata: 15, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (10) exceeded while adding 2 samples and 15 metadata"), expectedNHDiscardedSampleMetricValue: 6}, }, }, } @@ -793,6 +765,7 @@ func TestDistributor_PushIngestionRateLimiter_Histograms(t *testing.T) { assert.Nil(t, response) assert.Equal(t, push.expectedError, err) } + assert.Equal(t, float64(push.expectedNHDiscardedSampleMetricValue), testutil.ToFloat64(distributors[0].validateMetrics.DiscardedSamples.WithLabelValues(validation.NativeHistogramsRateLimited, "user"))) } }) } diff --git a/pkg/util/validation/validate.go b/pkg/util/validation/validate.go index 7c8b3ee82df..71edea6a3db 100644 --- a/pkg/util/validation/validate.go +++ b/pkg/util/validation/validate.go @@ -62,7 +62,7 @@ const ( // RateLimited is one of the values for the reason to discard samples. 
// Declared here to avoid duplication in ingester and distributor. RateLimited = "rate_limited" - NativeHistogramsRateLimited = "natve_histograms_rate_limited" + NativeHistogramsRateLimited = "native_histograms_rate_limited" // Too many HA clusters is one of the reasons for discarding samples. TooManyHAClusters = "too_many_ha_clusters" From a1130442204f165288509ba838792c930e8c9e0e Mon Sep 17 00:00:00 2001 From: Paurush Garg Date: Mon, 23 Jun 2025 21:02:52 -0700 Subject: [PATCH 9/9] Adding more test coverage Signed-off-by: Paurush Garg --- docs/configuration/config-file-reference.md | 14 +-- pkg/distributor/distributor.go | 48 ++++---- pkg/distributor/distributor_test.go | 116 +++++++++++++----- pkg/distributor/ingestion_rate_strategy.go | 34 ++--- .../ingestion_rate_strategy_test.go | 52 +++++--- pkg/util/validation/limits.go | 66 +++++----- pkg/util/validation/validate.go | 4 +- 7 files changed, 199 insertions(+), 135 deletions(-) diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 720505498d2..5d870fc4e1e 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3427,10 +3427,10 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s # CLI flag: -distributor.ingestion-rate-limit [ingestion_rate: | default = 25000] -# Per-user native histograms ingestion rate limit in samples per second. -# Disabled by default -# CLI flag: -distributor.native-histograms-ingestion-rate-limit -[native_histograms_ingestion_rate: | default = 1.7976931348623157e+308] +# Per-user native histogram ingestion rate limit in samples per second. Disabled +# by default +# CLI flag: -distributor.native-histogram-ingestion-rate-limit +[native_histogram_ingestion_rate: | default = 1.7976931348623157e+308] # Whether the ingestion rate limit should be applied individually to each # distributor instance (local), or evenly shared across the cluster (global). 
@@ -3441,9 +3441,9 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s # CLI flag: -distributor.ingestion-burst-size [ingestion_burst_size: | default = 50000] -# Per-user allowed native histograms ingestion burst size (in number of samples) -# CLI flag: -distributor.native-histograms-ingestion-burst-size -[native_histograms_ingestion_burst_size: | default = 0] +# Per-user allowed native histogram ingestion burst size (in number of samples) +# CLI flag: -distributor.native-histogram-ingestion-burst-size +[native_histogram_ingestion_burst_size: | default = 0] # Flag to enable, for all users, handling of samples with external labels # identifying replicas in an HA Prometheus setup. diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 6b92003b12d..ae33124cc12 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -5,7 +5,6 @@ import ( "flag" "fmt" "io" - "math" "net/http" "sort" "strings" @@ -96,8 +95,8 @@ type Distributor struct { HATracker *ha.HATracker // Per-user rate limiter. - ingestionRateLimiter *limiter.RateLimiter - nativeHistogramsIngestionRateLimiter *limiter.RateLimiter + ingestionRateLimiter *limiter.RateLimiter + nativeHistogramIngestionRateLimiter *limiter.RateLimiter // Manager for subservices (HA Tracker, distributor ring and client pool) subservices *services.Manager @@ -269,13 +268,13 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove // it's an internal dependency and can't join the distributors ring, we skip rate // limiting. 
var ingestionRateStrategy limiter.RateLimiterStrategy - var nativeHistogramsIngestionRateStrategy limiter.RateLimiterStrategy + var nativeHistogramIngestionRateStrategy limiter.RateLimiterStrategy var distributorsLifeCycler *ring.Lifecycler var distributorsRing *ring.Ring if !canJoinDistributorsRing { ingestionRateStrategy = newInfiniteIngestionRateStrategy() - nativeHistogramsIngestionRateStrategy = newInfiniteIngestionRateStrategy() + nativeHistogramIngestionRateStrategy = newInfiniteIngestionRateStrategy() } else if limits.IngestionRateStrategy() == validation.GlobalIngestionRateStrategy { distributorsLifeCycler, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, "distributor", ringKey, true, true, log, prometheus.WrapRegistererWithPrefix("cortex_", reg)) if err != nil { @@ -289,24 +288,24 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove subservices = append(subservices, distributorsLifeCycler, distributorsRing) ingestionRateStrategy = newGlobalIngestionRateStrategy(limits, distributorsLifeCycler) - nativeHistogramsIngestionRateStrategy = newGlobalNativeHistogramsIngestionRateStrategy(limits, distributorsLifeCycler) + nativeHistogramIngestionRateStrategy = newGlobalNativeHistogramIngestionRateStrategy(limits, distributorsLifeCycler) } else { ingestionRateStrategy = newLocalIngestionRateStrategy(limits) - nativeHistogramsIngestionRateStrategy = newLocalNativeHistogramsIngestionRateStrategy(limits) + nativeHistogramIngestionRateStrategy = newLocalNativeHistogramIngestionRateStrategy(limits) } d := &Distributor{ - cfg: cfg, - log: log, - ingestersRing: ingestersRing, - ingesterPool: NewPool(cfg.PoolConfig, ingestersRing, cfg.IngesterClientFactory, log), - distributorsLifeCycler: distributorsLifeCycler, - distributorsRing: distributorsRing, - limits: limits, - ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second), - nativeHistogramsIngestionRateLimiter: 
limiter.NewRateLimiter(nativeHistogramsIngestionRateStrategy, 10*time.Second), - HATracker: haTracker, - ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval), + cfg: cfg, + log: log, + ingestersRing: ingestersRing, + ingesterPool: NewPool(cfg.PoolConfig, ingestersRing, cfg.IngesterClientFactory, log), + distributorsLifeCycler: distributorsLifeCycler, + distributorsRing: distributorsRing, + limits: limits, + ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second), + nativeHistogramIngestionRateLimiter: limiter.NewRateLimiter(nativeHistogramIngestionRateStrategy, 10*time.Second), + HATracker: haTracker, + ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval), queryDuration: instrument.NewHistogramCollector(promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Namespace: "cortex", @@ -772,13 +771,10 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co d.receivedExemplars.WithLabelValues(userID).Add(float64(validatedExemplars)) d.receivedMetadata.WithLabelValues(userID).Add(float64(len(validatedMetadata))) - nhRateLimited := false - if limits.NativeHistogramsIngestionRate != math.MaxFloat64 { - nhRateLimited = !d.nativeHistogramsIngestionRateLimiter.AllowN(time.Now(), userID, validatedHistogramSamples) - } - - if nhRateLimited { - d.validateMetrics.DiscardedSamples.WithLabelValues(validation.NativeHistogramsRateLimited, userID).Add(float64(validatedHistogramSamples)) + if !d.nativeHistogramIngestionRateLimiter.AllowN(now, userID, validatedHistogramSamples) { + level.Warn(d.log).Log("msg", "native histogram ingestion rate limit (%v) exceeded while adding %d native histogram samples", d.nativeHistogramIngestionRateLimiter.Limit(now, userID), validatedHistogramSamples) + d.validateMetrics.DiscardedSamples.WithLabelValues(validation.NativeHistogramRateLimited, userID).Add(float64(validatedHistogramSamples)) + validatedHistogramSamples = 0 } else { seriesKeys = 
append(seriesKeys, nhSeriesKeys...) validatedTimeseries = append(validatedTimeseries, nhValidatedTimeseries...) @@ -960,7 +956,7 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write defer pSpan.Finish() // For each timeseries or samples, we compute a hash to distribute across ingesters; - // check each sample/metadata and discard if outside limits. + // check each sample/metadata and discard if outside limits. validatedTimeseries := make([]cortexpb.PreallocTimeseries, 0, len(req.Timeseries)) nhValidatedTimeseries := make([]cortexpb.PreallocTimeseries, 0, len(req.Timeseries)) seriesKeys := make([]uint32, 0, len(req.Timeseries)) diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 965555d7306..710464b08d0 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -685,6 +685,7 @@ func TestDistributor_PushIngestionRateLimiter_Histograms(t *testing.T) { t.Parallel() type testPush struct { samples int + nhSamples int metadata int expectedError error expectedNHDiscardedSampleMetricValue int @@ -692,40 +693,89 @@ func TestDistributor_PushIngestionRateLimiter_Histograms(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "user") tests := map[string]struct { - distributors int - ingestionRateStrategy string - ingestionRate float64 - ingestionBurstSize int - nativeHistogramsIngestionRate float64 - nativeHistogramsIngestionBurstSize int - pushes []testPush + distributors int + ingestionRateStrategy string + ingestionRate float64 + ingestionBurstSize int + nativeHistogramIngestionRateStrategy string + nativeHistogramIngestionRate float64 + nativeHistogramIngestionBurstSize int + pushes []testPush }{ - "local strategy: limit should be set to each distributor: histograms": { - distributors: 2, - ingestionRateStrategy: validation.LocalIngestionRateStrategy, - ingestionRate: 13, - ingestionBurstSize: 13, - nativeHistogramsIngestionRate: 5, - 
nativeHistogramsIngestionBurstSize: 5, + "local strategy: native histograms limit should be set to each distributor": { + distributors: 2, + ingestionRateStrategy: validation.LocalIngestionRateStrategy, + ingestionRate: 20, + ingestionBurstSize: 20, + nativeHistogramIngestionRate: 10, + nativeHistogramIngestionBurstSize: 10, pushes: []testPush{ - {samples: 2, expectedError: nil, expectedNHDiscardedSampleMetricValue: 0}, - {samples: 4, expectedError: nil, expectedNHDiscardedSampleMetricValue: 4}, - {samples: 2, metadata: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 4}, - {samples: 2, metadata: 8, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (13) exceeded while adding 2 samples and 8 metadata"), expectedNHDiscardedSampleMetricValue: 6}, + {nhSamples: 4, expectedError: nil}, + {nhSamples: 6, expectedError: nil}, + {nhSamples: 6, expectedError: nil, expectedNHDiscardedSampleMetricValue: 6}, + {nhSamples: 4, metadata: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 10}, + {nhSamples: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 11}, + {nhSamples: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 12}, }, }, - "global strategy: burst should set to each distributor: histograms": { - distributors: 2, - ingestionRateStrategy: validation.GlobalIngestionRateStrategy, - ingestionRate: 20, - ingestionBurstSize: 20, - nativeHistogramsIngestionRate: 6, - nativeHistogramsIngestionBurstSize: 3, + "global strategy: native histograms limit should be evenly shared across distributors": { + distributors: 2, + ingestionRateStrategy: validation.GlobalIngestionRateStrategy, + ingestionRate: 20, + ingestionBurstSize: 10, + nativeHistogramIngestionRate: 10, + nativeHistogramIngestionBurstSize: 5, pushes: []testPush{ - {samples: 2, expectedError: nil, expectedNHDiscardedSampleMetricValue: 0}, - {samples: 4, expectedError: nil, expectedNHDiscardedSampleMetricValue: 4}, - {samples: 1, metadata: 1, 
expectedError: nil, expectedNHDiscardedSampleMetricValue: 4}, - {samples: 2, metadata: 15, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (10) exceeded while adding 2 samples and 15 metadata"), expectedNHDiscardedSampleMetricValue: 6}, + {nhSamples: 2, expectedError: nil}, + {nhSamples: 1, expectedError: nil}, + {nhSamples: 3, metadata: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 3}, + {nhSamples: 2, expectedError: nil, expectedNHDiscardedSampleMetricValue: 3}, + {nhSamples: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 4}, + {nhSamples: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 5}, + }, + }, + "global strategy: native histograms burst should set to each distributor": { + distributors: 2, + ingestionRateStrategy: validation.GlobalIngestionRateStrategy, + ingestionRate: 20, + ingestionBurstSize: 40, + nativeHistogramIngestionRate: 10, + nativeHistogramIngestionBurstSize: 20, + pushes: []testPush{ + {nhSamples: 10, expectedError: nil}, + {nhSamples: 5, expectedError: nil}, + {nhSamples: 6, metadata: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 6}, + {nhSamples: 5, expectedError: nil, expectedNHDiscardedSampleMetricValue: 6}, + {nhSamples: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 7}, + {nhSamples: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 8}, + }, + }, + "local strategy: If NH samples hit NH rate limit, other samples should succeed when under rate limit": { + distributors: 2, + ingestionRateStrategy: validation.LocalIngestionRateStrategy, + ingestionRate: 20, + ingestionBurstSize: 20, + nativeHistogramIngestionRate: 5, + nativeHistogramIngestionBurstSize: 5, + pushes: []testPush{ + {samples: 5, nhSamples: 4, expectedError: nil}, + {samples: 6, nhSamples: 2, expectedError: nil, expectedNHDiscardedSampleMetricValue: 2}, + {samples: 4, metadata: 1, nhSamples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion 
rate limit (20) exceeded while adding 5 samples and 1 metadata"), expectedNHDiscardedSampleMetricValue: 2}, + {metadata: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 2}, + }, + }, + "global strategy: If NH samples hit NH rate limit, other samples should succeed when under rate limit": { + distributors: 2, + ingestionRateStrategy: validation.GlobalIngestionRateStrategy, + ingestionRate: 20, + ingestionBurstSize: 10, + nativeHistogramIngestionRate: 10, + nativeHistogramIngestionBurstSize: 5, + pushes: []testPush{ + {samples: 3, nhSamples: 2, expectedError: nil}, + {samples: 3, nhSamples: 4, expectedError: nil, expectedNHDiscardedSampleMetricValue: 4}, + {samples: 1, metadata: 1, nhSamples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (10) exceeded while adding 2 samples and 1 metadata"), expectedNHDiscardedSampleMetricValue: 4}, + {metadata: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 4}, }, }, } @@ -740,8 +790,8 @@ func TestDistributor_PushIngestionRateLimiter_Histograms(t *testing.T) { limits.IngestionRateStrategy = testData.ingestionRateStrategy limits.IngestionRate = testData.ingestionRate limits.IngestionBurstSize = testData.ingestionBurstSize - limits.NativeHistogramsIngestionRate = testData.nativeHistogramsIngestionRate - limits.NativeHistogramsIngestionBurstSize = testData.nativeHistogramsIngestionBurstSize + limits.NativeHistogramIngestionRate = testData.nativeHistogramIngestionRate + limits.NativeHistogramIngestionBurstSize = testData.nativeHistogramIngestionBurstSize // Start all expected distributors distributors, _, _, _ := prepare(t, prepConfig{ @@ -754,7 +804,7 @@ func TestDistributor_PushIngestionRateLimiter_Histograms(t *testing.T) { // Push samples in multiple requests to the first distributor for _, push := range testData.pushes { - var request = makeWriteRequest(0, 0, push.metadata, push.samples) + var request = makeWriteRequest(0, push.samples, push.metadata, push.nhSamples) 
response, err := distributors[0].Push(ctx, request) @@ -765,7 +815,7 @@ func TestDistributor_PushIngestionRateLimiter_Histograms(t *testing.T) { assert.Nil(t, response) assert.Equal(t, push.expectedError, err) } - assert.Equal(t, float64(push.expectedNHDiscardedSampleMetricValue), testutil.ToFloat64(distributors[0].validateMetrics.DiscardedSamples.WithLabelValues(validation.NativeHistogramsRateLimited, "user"))) + assert.Equal(t, float64(push.expectedNHDiscardedSampleMetricValue), testutil.ToFloat64(distributors[0].validateMetrics.DiscardedSamples.WithLabelValues(validation.NativeHistogramRateLimited, "user"))) } }) } diff --git a/pkg/distributor/ingestion_rate_strategy.go b/pkg/distributor/ingestion_rate_strategy.go index 09629011af6..10957a2395f 100644 --- a/pkg/distributor/ingestion_rate_strategy.go +++ b/pkg/distributor/ingestion_rate_strategy.go @@ -45,7 +45,7 @@ func newGlobalIngestionRateStrategy(limits *validation.Overrides, ring ReadLifec func (s *globalStrategy) Limit(tenantID string) float64 { numDistributors := s.ring.HealthyInstancesCount() - if numDistributors == 0 { + if numDistributors == 0 || s.limits.IngestionRate(tenantID) == float64(rate.Inf) { return s.limits.IngestionRate(tenantID) } @@ -73,48 +73,48 @@ func (s *infiniteStrategy) Burst(tenantID string) int { return 0 } -type localStrategyNativeHistograms struct { +type localStrategyNativeHistogram struct { limits *validation.Overrides } -func newLocalNativeHistogramsIngestionRateStrategy(limits *validation.Overrides) limiter.RateLimiterStrategy { - return &localStrategyNativeHistograms{ +func newLocalNativeHistogramIngestionRateStrategy(limits *validation.Overrides) limiter.RateLimiterStrategy { + return &localStrategyNativeHistogram{ limits: limits, } } -func (s *localStrategyNativeHistograms) Limit(tenantID string) float64 { - return s.limits.NativeHistogramsIngestionRate(tenantID) +func (s *localStrategyNativeHistogram) Limit(tenantID string) float64 { + return 
s.limits.NativeHistogramIngestionRate(tenantID) } -func (s *localStrategyNativeHistograms) Burst(tenantID string) int { - return s.limits.NativeHistogramsIngestionBurstSize(tenantID) +func (s *localStrategyNativeHistogram) Burst(tenantID string) int { + return s.limits.NativeHistogramIngestionBurstSize(tenantID) } -type globalStrategyNativeHistograms struct { +type globalStrategyNativeHistogram struct { limits *validation.Overrides ring ReadLifecycler } -func newGlobalNativeHistogramsIngestionRateStrategy(limits *validation.Overrides, ring ReadLifecycler) limiter.RateLimiterStrategy { - return &globalStrategyNativeHistograms{ +func newGlobalNativeHistogramIngestionRateStrategy(limits *validation.Overrides, ring ReadLifecycler) limiter.RateLimiterStrategy { + return &globalStrategyNativeHistogram{ limits: limits, ring: ring, } } -func (s *globalStrategyNativeHistograms) Limit(tenantID string) float64 { +func (s *globalStrategyNativeHistogram) Limit(tenantID string) float64 { numDistributors := s.ring.HealthyInstancesCount() - if numDistributors == 0 { - return s.limits.NativeHistogramsIngestionRate(tenantID) + if numDistributors == 0 || s.limits.NativeHistogramIngestionRate(tenantID) == float64(rate.Inf) { + return s.limits.NativeHistogramIngestionRate(tenantID) } - return s.limits.NativeHistogramsIngestionRate(tenantID) / float64(numDistributors) + return s.limits.NativeHistogramIngestionRate(tenantID) / float64(numDistributors) } -func (s *globalStrategyNativeHistograms) Burst(tenantID string) int { +func (s *globalStrategyNativeHistogram) Burst(tenantID string) int { // The meaning of burst doesn't change for the global strategy, in order // to keep it easier to understand for users / operators. 
- return s.limits.NativeHistogramsIngestionBurstSize(tenantID) + return s.limits.NativeHistogramIngestionBurstSize(tenantID) } diff --git a/pkg/distributor/ingestion_rate_strategy_test.go b/pkg/distributor/ingestion_rate_strategy_test.go index baf47d7d796..6fdfc901d32 100644 --- a/pkg/distributor/ingestion_rate_strategy_test.go +++ b/pkg/distributor/ingestion_rate_strategy_test.go @@ -24,11 +24,11 @@ func TestIngestionRateStrategy(t *testing.T) { }{ "local rate limiter should just return configured limits": { limits: validation.Limits{ - IngestionRateStrategy: validation.LocalIngestionRateStrategy, - IngestionRate: float64(1000), - IngestionBurstSize: 10000, - NativeHistogramsIngestionRate: float64(100), - NativeHistogramsIngestionBurstSize: 100, + IngestionRateStrategy: validation.LocalIngestionRateStrategy, + IngestionRate: float64(1000), + IngestionBurstSize: 10000, + NativeHistogramIngestionRate: float64(100), + NativeHistogramIngestionBurstSize: 100, }, ring: nil, expectedLimit: float64(1000), @@ -38,11 +38,11 @@ func TestIngestionRateStrategy(t *testing.T) { }, "global rate limiter should share the limit across the number of distributors": { limits: validation.Limits{ - IngestionRateStrategy: validation.GlobalIngestionRateStrategy, - IngestionRate: float64(1000), - IngestionBurstSize: 10000, - NativeHistogramsIngestionRate: float64(100), - NativeHistogramsIngestionBurstSize: 100, + IngestionRateStrategy: validation.GlobalIngestionRateStrategy, + IngestionRate: float64(1000), + IngestionBurstSize: 10000, + NativeHistogramIngestionRate: float64(100), + NativeHistogramIngestionBurstSize: 100, }, ring: func() ReadLifecycler { ring := newReadLifecyclerMock() @@ -54,13 +54,31 @@ func TestIngestionRateStrategy(t *testing.T) { expectedNativeHistogramsLimit: float64(50), expectedNativeHistogramsBurst: 100, }, + "global rate limiter should handle the special case of inf ingestion rate": { + limits: validation.Limits{ + IngestionRateStrategy: 
validation.GlobalIngestionRateStrategy, + IngestionRate: float64(rate.Inf), + IngestionBurstSize: 0, + NativeHistogramIngestionRate: float64(rate.Inf), + NativeHistogramIngestionBurstSize: 0, + }, + ring: func() ReadLifecycler { + ring := newReadLifecyclerMock() + ring.On("HealthyInstancesCount").Return(2) + return ring + }(), + expectedLimit: float64(rate.Inf), + expectedBurst: 0, + expectedNativeHistogramsLimit: float64(rate.Inf), + expectedNativeHistogramsBurst: 0, + }, "infinite rate limiter should return unlimited settings": { limits: validation.Limits{ - IngestionRateStrategy: "infinite", - IngestionRate: float64(1000), - IngestionBurstSize: 10000, - NativeHistogramsIngestionRate: float64(100), - NativeHistogramsIngestionBurstSize: 100, + IngestionRateStrategy: "infinite", + IngestionRate: float64(1000), + IngestionBurstSize: 10000, + NativeHistogramIngestionRate: float64(100), + NativeHistogramIngestionBurstSize: 100, }, ring: nil, expectedLimit: float64(rate.Inf), @@ -86,10 +104,10 @@ func TestIngestionRateStrategy(t *testing.T) { switch testData.limits.IngestionRateStrategy { case validation.LocalIngestionRateStrategy: strategy = newLocalIngestionRateStrategy(overrides) - nativeHistogramsStrategy = newLocalNativeHistogramsIngestionRateStrategy(overrides) + nativeHistogramsStrategy = newLocalNativeHistogramIngestionRateStrategy(overrides) case validation.GlobalIngestionRateStrategy: strategy = newGlobalIngestionRateStrategy(overrides, testData.ring) - nativeHistogramsStrategy = newGlobalNativeHistogramsIngestionRateStrategy(overrides, testData.ring) + nativeHistogramsStrategy = newGlobalNativeHistogramIngestionRateStrategy(overrides, testData.ring) case "infinite": strategy = newInfiniteIngestionRateStrategy() nativeHistogramsStrategy = newInfiniteIngestionRateStrategy() diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 87a6a2b7b98..a960d5207bb 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ 
-123,31 +123,31 @@ type LimitsPerLabelSet struct { // limits via flags, or per-user limits via yaml config. type Limits struct { // Distributor enforced limits. - IngestionRate float64 `yaml:"ingestion_rate" json:"ingestion_rate"` - NativeHistogramsIngestionRate float64 `yaml:"native_histograms_ingestion_rate" json:"native_histograms_ingestion_rate"` - IngestionRateStrategy string `yaml:"ingestion_rate_strategy" json:"ingestion_rate_strategy"` - IngestionBurstSize int `yaml:"ingestion_burst_size" json:"ingestion_burst_size"` - NativeHistogramsIngestionBurstSize int `yaml:"native_histograms_ingestion_burst_size" json:"native_histograms_ingestion_burst_size"` - AcceptHASamples bool `yaml:"accept_ha_samples" json:"accept_ha_samples"` - AcceptMixedHASamples bool `yaml:"accept_mixed_ha_samples" json:"accept_mixed_ha_samples"` - HAClusterLabel string `yaml:"ha_cluster_label" json:"ha_cluster_label"` - HAReplicaLabel string `yaml:"ha_replica_label" json:"ha_replica_label"` - HAMaxClusters int `yaml:"ha_max_clusters" json:"ha_max_clusters"` - DropLabels flagext.StringSlice `yaml:"drop_labels" json:"drop_labels"` - MaxLabelNameLength int `yaml:"max_label_name_length" json:"max_label_name_length"` - MaxLabelValueLength int `yaml:"max_label_value_length" json:"max_label_value_length"` - MaxLabelNamesPerSeries int `yaml:"max_label_names_per_series" json:"max_label_names_per_series"` - MaxLabelsSizeBytes int `yaml:"max_labels_size_bytes" json:"max_labels_size_bytes"` - MaxMetadataLength int `yaml:"max_metadata_length" json:"max_metadata_length"` - RejectOldSamples bool `yaml:"reject_old_samples" json:"reject_old_samples"` - RejectOldSamplesMaxAge model.Duration `yaml:"reject_old_samples_max_age" json:"reject_old_samples_max_age"` - CreationGracePeriod model.Duration `yaml:"creation_grace_period" json:"creation_grace_period"` - EnforceMetadataMetricName bool `yaml:"enforce_metadata_metric_name" json:"enforce_metadata_metric_name"` - EnforceMetricName bool 
`yaml:"enforce_metric_name" json:"enforce_metric_name"` - IngestionTenantShardSize int `yaml:"ingestion_tenant_shard_size" json:"ingestion_tenant_shard_size"` - MetricRelabelConfigs []*relabel.Config `yaml:"metric_relabel_configs,omitempty" json:"metric_relabel_configs,omitempty" doc:"nocli|description=List of metric relabel configurations. Note that in most situations, it is more effective to use metrics relabeling directly in the Prometheus server, e.g. remote_write.write_relabel_configs."` - MaxNativeHistogramBuckets int `yaml:"max_native_histogram_buckets" json:"max_native_histogram_buckets"` - PromoteResourceAttributes []string `yaml:"promote_resource_attributes" json:"promote_resource_attributes"` + IngestionRate float64 `yaml:"ingestion_rate" json:"ingestion_rate"` + NativeHistogramIngestionRate float64 `yaml:"native_histogram_ingestion_rate" json:"native_histogram_ingestion_rate"` + IngestionRateStrategy string `yaml:"ingestion_rate_strategy" json:"ingestion_rate_strategy"` + IngestionBurstSize int `yaml:"ingestion_burst_size" json:"ingestion_burst_size"` + NativeHistogramIngestionBurstSize int `yaml:"native_histogram_ingestion_burst_size" json:"native_histogram_ingestion_burst_size"` + AcceptHASamples bool `yaml:"accept_ha_samples" json:"accept_ha_samples"` + AcceptMixedHASamples bool `yaml:"accept_mixed_ha_samples" json:"accept_mixed_ha_samples"` + HAClusterLabel string `yaml:"ha_cluster_label" json:"ha_cluster_label"` + HAReplicaLabel string `yaml:"ha_replica_label" json:"ha_replica_label"` + HAMaxClusters int `yaml:"ha_max_clusters" json:"ha_max_clusters"` + DropLabels flagext.StringSlice `yaml:"drop_labels" json:"drop_labels"` + MaxLabelNameLength int `yaml:"max_label_name_length" json:"max_label_name_length"` + MaxLabelValueLength int `yaml:"max_label_value_length" json:"max_label_value_length"` + MaxLabelNamesPerSeries int `yaml:"max_label_names_per_series" json:"max_label_names_per_series"` + MaxLabelsSizeBytes int `yaml:"max_labels_size_bytes" 
json:"max_labels_size_bytes"` + MaxMetadataLength int `yaml:"max_metadata_length" json:"max_metadata_length"` + RejectOldSamples bool `yaml:"reject_old_samples" json:"reject_old_samples"` + RejectOldSamplesMaxAge model.Duration `yaml:"reject_old_samples_max_age" json:"reject_old_samples_max_age"` + CreationGracePeriod model.Duration `yaml:"creation_grace_period" json:"creation_grace_period"` + EnforceMetadataMetricName bool `yaml:"enforce_metadata_metric_name" json:"enforce_metadata_metric_name"` + EnforceMetricName bool `yaml:"enforce_metric_name" json:"enforce_metric_name"` + IngestionTenantShardSize int `yaml:"ingestion_tenant_shard_size" json:"ingestion_tenant_shard_size"` + MetricRelabelConfigs []*relabel.Config `yaml:"metric_relabel_configs,omitempty" json:"metric_relabel_configs,omitempty" doc:"nocli|description=List of metric relabel configurations. Note that in most situations, it is more effective to use metrics relabeling directly in the Prometheus server, e.g. remote_write.write_relabel_configs."` + MaxNativeHistogramBuckets int `yaml:"max_native_histogram_buckets" json:"max_native_histogram_buckets"` + PromoteResourceAttributes []string `yaml:"promote_resource_attributes" json:"promote_resource_attributes"` // Ingester enforced limits. // Series @@ -242,10 +242,10 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.IngestionTenantShardSize, "distributor.ingestion-tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used. Must be set both on ingesters and distributors. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant.") f.Float64Var(&l.IngestionRate, "distributor.ingestion-rate-limit", 25000, "Per-user ingestion rate limit in samples per second.") - f.Float64Var(&l.NativeHistogramsIngestionRate, "distributor.native-histograms-ingestion-rate-limit", math.MaxFloat64, "Per-user native histograms ingestion rate limit in samples per second. 
Disabled by default") + f.Float64Var(&l.NativeHistogramIngestionRate, "distributor.native-histogram-ingestion-rate-limit", float64(rate.Inf), "Per-user native histogram ingestion rate limit in samples per second. Disabled by default") f.StringVar(&l.IngestionRateStrategy, "distributor.ingestion-rate-limit-strategy", "local", "Whether the ingestion rate limit should be applied individually to each distributor instance (local), or evenly shared across the cluster (global).") f.IntVar(&l.IngestionBurstSize, "distributor.ingestion-burst-size", 50000, "Per-user allowed ingestion burst size (in number of samples).") - f.IntVar(&l.NativeHistogramsIngestionBurstSize, "distributor.native-histograms-ingestion-burst-size", 0, "Per-user allowed native histograms ingestion burst size (in number of samples)") + f.IntVar(&l.NativeHistogramIngestionBurstSize, "distributor.native-histogram-ingestion-burst-size", 0, "Per-user allowed native histogram ingestion burst size (in number of samples)") f.BoolVar(&l.AcceptHASamples, "distributor.ha-tracker.enable-for-all-users", false, "Flag to enable, for all users, handling of samples with external labels identifying replicas in an HA Prometheus setup.") f.BoolVar(&l.AcceptMixedHASamples, "experimental.distributor.ha-tracker.mixed-ha-samples", false, "[Experimental] Flag to enable handling of samples with mixed external labels identifying replicas in an HA Prometheus setup. Supported only if -distributor.ha-tracker.enable-for-all-users is true.") f.StringVar(&l.HAClusterLabel, "distributor.ha-tracker.cluster", "cluster", "Prometheus label to look for in samples to identify a Prometheus HA cluster.") @@ -581,9 +581,9 @@ func (o *Overrides) IngestionRate(userID string) float64 { return o.GetOverridesForUser(userID).IngestionRate } -// NativeHistogramsIngestionRate returns the limit on ingester rate (samples per second). 
-func (o *Overrides) NativeHistogramsIngestionRate(userID string) float64 { - return o.GetOverridesForUser(userID).NativeHistogramsIngestionRate +// NativeHistogramIngestionRate returns the limit on ingester rate (samples per second). +func (o *Overrides) NativeHistogramIngestionRate(userID string) float64 { + return o.GetOverridesForUser(userID).NativeHistogramIngestionRate } // IngestionRateStrategy returns whether the ingestion rate limit should be individually applied @@ -598,9 +598,9 @@ func (o *Overrides) IngestionBurstSize(userID string) int { return o.GetOverridesForUser(userID).IngestionBurstSize } -// NativeHistogramsIngestionBurstSize returns the burst size for ingestion rate. -func (o *Overrides) NativeHistogramsIngestionBurstSize(userID string) int { - return o.GetOverridesForUser(userID).NativeHistogramsIngestionBurstSize +// NativeHistogramIngestionBurstSize returns the burst size for ingestion rate. +func (o *Overrides) NativeHistogramIngestionBurstSize(userID string) int { + return o.GetOverridesForUser(userID).NativeHistogramIngestionBurstSize } // AcceptHASamples returns whether the distributor should track and accept samples from HA replicas for this user. diff --git a/pkg/util/validation/validate.go b/pkg/util/validation/validate.go index 71edea6a3db..1d339ca62be 100644 --- a/pkg/util/validation/validate.go +++ b/pkg/util/validation/validate.go @@ -61,8 +61,8 @@ const ( // RateLimited is one of the values for the reason to discard samples. // Declared here to avoid duplication in ingester and distributor. - RateLimited = "rate_limited" - NativeHistogramsRateLimited = "native_histograms_rate_limited" + RateLimited = "rate_limited" + NativeHistogramRateLimited = "native_histogram_rate_limited" // Too many HA clusters is one of the reasons for discarding samples. TooManyHAClusters = "too_many_ha_clusters"