Commit e33587b

Add sample size limit validation for native histograms samples

Signed-off-by: Paurush Garg <[email protected]>
1 parent 493493d commit e33587b

6 files changed: 56 additions & 26 deletions

CHANGELOG.md

Lines changed: 2 additions & 1 deletion

@@ -47,9 +47,10 @@
 * [ENHANCEMENT] Compactor: Optimize cleaner run time. #6815
 * [ENHANCEMENT] Parquet Storage: Allow percentage based dynamic shard size for Parquet Converter. #6817
 * [ENHANCEMENT] Query Frontend: Enhance the performance of the JSON codec. #6816
+* [ENHANCEMENT] Compactor: Emit partition metrics separate from cleaner job. #6827
 * [ENHANCEMENT] Metadata Cache: Support inmemory and multi level cache backend. #6829
 * [ENHANCEMENT] Store Gateway: Allow to ignore syncing blocks older than certain time using `ignore_blocks_before`. #6830
-* [ENHANCEMENT] Compactor: Emit partition metrics separate from cleaner job. #6827
+* [ENHANCEMENT] Distributor: Add native histograms max sample size bytes limit validation. #6834
 * [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
 * [BUGFIX] Ingester: Fix labelset data race condition. #6573
 * [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576

docs/configuration/config-file-reference.md

Lines changed: 4 additions & 0 deletions

@@ -3542,6 +3542,10 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
 # CLI flag: -validation.max-labels-size-bytes
 [max_labels_size_bytes: <int> | default = 0]

+# Maximum size in bytes of a native histogram sample. 0 to disable the limit.
+# CLI flag: -validation.max-native-histogram-sample-size-bytes
+[max_native_histogram_sample_size_bytes: <int> | default = 0]
+
 # Maximum length accepted for metric metadata. Metadata refers to Metric Name,
 # HELP and UNIT.
 # CLI flag: -validation.max-metadata-length
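For orientation, the new option behaves like any other limits_config field: it can be set globally (or via the matching -validation.max-native-histogram-sample-size-bytes CLI flag registered in this commit) and overridden per tenant through runtime overrides. A minimal sketch, assuming a standard runtime-overrides file; the tenant name "tenant-a" and the 512-byte value are illustrative only:

    # Hypothetical runtime overrides snippet; tenant name and value are made up.
    overrides:
      tenant-a:
        # Reject native histogram samples whose encoded size exceeds 512 bytes.
        max_native_histogram_sample_size_bytes: 512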

pkg/util/validation/errors.go

Lines changed: 20 additions & 0 deletions

@@ -262,6 +262,26 @@ func (e *nativeHistogramSchemaInvalidError) Error() string {
 	return fmt.Sprintf("invalid native histogram schema %d for metric: %.200q. supported schema from %d to %d", e.receivedSchema, formatLabelSet(e.series), histogram.ExponentialSchemaMin, histogram.ExponentialSchemaMax)
 }

+// nativeHistogramSampleSizeBytesExceededError is a ValidationError implementation for samples with native histogram
+// exceeding the sample size bytes limit
+type nativeHistogramSampleSizeBytesExceededError struct {
+	nhSampleSizeBytes int
+	series            []cortexpb.LabelAdapter
+	limit             int
+}
+
+func newNativeHistogramSampleSizeBytesExceededError(series []cortexpb.LabelAdapter, nhSampleSizeBytes int, limit int) ValidationError {
+	return &nativeHistogramSampleSizeBytesExceededError{
+		nhSampleSizeBytes: nhSampleSizeBytes,
+		series:            series,
+		limit:             limit,
+	}
+}
+
+func (e *nativeHistogramSampleSizeBytesExceededError) Error() string {
+	return fmt.Sprintf("native histogram sample size bytes exceeded for metric (actual: %d, limit: %d) metric: %.200q", e.nhSampleSizeBytes, e.limit, formatLabelSet(e.series))
+}
+
 // formatLabelSet formats label adapters as a metric name with labels, while preserving
 // label order, and keeping duplicates. If there are multiple "__name__" labels, only
 // first one is used as metric name, other ones will be included as regular labels.

pkg/util/validation/limits.go

Lines changed: 6 additions & 18 deletions

@@ -124,10 +124,8 @@ type LimitsPerLabelSet struct {
 type Limits struct {
 	// Distributor enforced limits.
 	IngestionRate float64 `yaml:"ingestion_rate" json:"ingestion_rate"`
-	NativeHistogramIngestionRate float64 `yaml:"native_histogram_ingestion_rate" json:"native_histogram_ingestion_rate"`
 	IngestionRateStrategy string `yaml:"ingestion_rate_strategy" json:"ingestion_rate_strategy"`
 	IngestionBurstSize int `yaml:"ingestion_burst_size" json:"ingestion_burst_size"`
-	NativeHistogramIngestionBurstSize int `yaml:"native_histogram_ingestion_burst_size" json:"native_histogram_ingestion_burst_size"`
 	AcceptHASamples bool `yaml:"accept_ha_samples" json:"accept_ha_samples"`
 	AcceptMixedHASamples bool `yaml:"accept_mixed_ha_samples" json:"accept_mixed_ha_samples"`
 	HAClusterLabel string `yaml:"ha_cluster_label" json:"ha_cluster_label"`

@@ -138,6 +136,7 @@ type Limits struct {
 	MaxLabelValueLength int `yaml:"max_label_value_length" json:"max_label_value_length"`
 	MaxLabelNamesPerSeries int `yaml:"max_label_names_per_series" json:"max_label_names_per_series"`
 	MaxLabelsSizeBytes int `yaml:"max_labels_size_bytes" json:"max_labels_size_bytes"`
+	MaxNativeHistogramSampleSizeBytes int `yaml:"max_native_histogram_sample_size_bytes" json:"max_native_histogram_sample_size_bytes"`
 	MaxMetadataLength int `yaml:"max_metadata_length" json:"max_metadata_length"`
 	RejectOldSamples bool `yaml:"reject_old_samples" json:"reject_old_samples"`
 	RejectOldSamplesMaxAge model.Duration `yaml:"reject_old_samples_max_age" json:"reject_old_samples_max_age"`

@@ -209,8 +208,8 @@ type Limits struct {
 	CompactorPartitionSeriesCount int64 `yaml:"compactor_partition_series_count" json:"compactor_partition_series_count"`

 	// Parquet converter
-	ParquetConverterEnabled bool    `yaml:"parquet_converter_enabled" json:"parquet_converter_enabled" doc:"hidden"`
-	ParquetConverterTenantShardSize float64 `yaml:"parquet_converter_tenant_shard_size" json:"parquet_converter_tenant_shard_size" doc:"hidden"`
+	ParquetConverterEnabled bool `yaml:"parquet_converter_enabled" json:"parquet_converter_enabled" doc:"hidden"`
+	ParquetConverterTenantShardSize int `yaml:"parquet_converter_tenant_shard_size" json:"parquet_converter_tenant_shard_size" doc:"hidden"`

 	// This config doesn't have a CLI flag registered here because they're registered in
 	// their own original config struct.

@@ -242,10 +241,8 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {

 	f.IntVar(&l.IngestionTenantShardSize, "distributor.ingestion-tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used. Must be set both on ingesters and distributors. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant.")
 	f.Float64Var(&l.IngestionRate, "distributor.ingestion-rate-limit", 25000, "Per-user ingestion rate limit in samples per second.")
-	f.Float64Var(&l.NativeHistogramIngestionRate, "distributor.native-histogram-ingestion-rate-limit", float64(rate.Inf), "Per-user native histogram ingestion rate limit in samples per second. Disabled by default")
 	f.StringVar(&l.IngestionRateStrategy, "distributor.ingestion-rate-limit-strategy", "local", "Whether the ingestion rate limit should be applied individually to each distributor instance (local), or evenly shared across the cluster (global).")
 	f.IntVar(&l.IngestionBurstSize, "distributor.ingestion-burst-size", 50000, "Per-user allowed ingestion burst size (in number of samples).")
-	f.IntVar(&l.NativeHistogramIngestionBurstSize, "distributor.native-histogram-ingestion-burst-size", 0, "Per-user allowed native histogram ingestion burst size (in number of samples)")
 	f.BoolVar(&l.AcceptHASamples, "distributor.ha-tracker.enable-for-all-users", false, "Flag to enable, for all users, handling of samples with external labels identifying replicas in an HA Prometheus setup.")
 	f.BoolVar(&l.AcceptMixedHASamples, "experimental.distributor.ha-tracker.mixed-ha-samples", false, "[Experimental] Flag to enable handling of samples with mixed external labels identifying replicas in an HA Prometheus setup. Supported only if -distributor.ha-tracker.enable-for-all-users is true.")
 	f.StringVar(&l.HAClusterLabel, "distributor.ha-tracker.cluster", "cluster", "Prometheus label to look for in samples to identify a Prometheus HA cluster.")

@@ -257,6 +254,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
 	f.IntVar(&l.MaxLabelValueLength, "validation.max-length-label-value", 2048, "Maximum length accepted for label value. This setting also applies to the metric name")
 	f.IntVar(&l.MaxLabelNamesPerSeries, "validation.max-label-names-per-series", 30, "Maximum number of label names per series.")
 	f.IntVar(&l.MaxLabelsSizeBytes, "validation.max-labels-size-bytes", 0, "Maximum combined size in bytes of all labels and label values accepted for a series. 0 to disable the limit.")
+	f.IntVar(&l.MaxNativeHistogramSampleSizeBytes, "validation.max-native-histogram-sample-size-bytes", 0, "Maximum size in bytes of a native histogram sample. 0 to disable the limit.")
 	f.IntVar(&l.MaxMetadataLength, "validation.max-metadata-length", 1024, "Maximum length accepted for metric metadata. Metadata refers to Metric Name, HELP and UNIT.")
 	f.BoolVar(&l.RejectOldSamples, "validation.reject-old-samples", false, "Reject old samples.")
 	_ = l.RejectOldSamplesMaxAge.Set("14d")

@@ -309,7 +307,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
 	f.Int64Var(&l.CompactorPartitionIndexSizeBytes, "compactor.partition-index-size-bytes", 68719476736, "Index size limit in bytes for each compaction partition. 0 means no limit")
 	f.Int64Var(&l.CompactorPartitionSeriesCount, "compactor.partition-series-count", 0, "Time series count limit for each compaction partition. 0 means no limit")

-	f.Float64Var(&l.ParquetConverterTenantShardSize, "parquet-converter.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used by the parquet converter. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant. If the value is < 1 and > 0 the shard size will be a percentage of the total parquet converters.")
+	f.IntVar(&l.ParquetConverterTenantShardSize, "parquet-converter.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used by the parquet converter. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant.")
 	f.BoolVar(&l.ParquetConverterEnabled, "parquet-converter.enabled", false, "If set, enables the Parquet converter to create the parquet files.")

 	// Store-gateway.

@@ -581,11 +579,6 @@ func (o *Overrides) IngestionRate(userID string) float64 {
 	return o.GetOverridesForUser(userID).IngestionRate
 }

-// NativeHistogramIngestionRate returns the limit on ingester rate (samples per second).
-func (o *Overrides) NativeHistogramIngestionRate(userID string) float64 {
-	return o.GetOverridesForUser(userID).NativeHistogramIngestionRate
-}
-
 // IngestionRateStrategy returns whether the ingestion rate limit should be individually applied
 // to each distributor instance (local) or evenly shared across the cluster (global).
 func (o *Overrides) IngestionRateStrategy() string {

@@ -598,11 +591,6 @@ func (o *Overrides) IngestionBurstSize(userID string) int {
 	return o.GetOverridesForUser(userID).IngestionBurstSize
 }

-// NativeHistogramIngestionBurstSize returns the burst size for ingestion rate.
-func (o *Overrides) NativeHistogramIngestionBurstSize(userID string) int {
-	return o.GetOverridesForUser(userID).NativeHistogramIngestionBurstSize
-}
-
 // AcceptHASamples returns whether the distributor should track and accept samples from HA replicas for this user.
 func (o *Overrides) AcceptHASamples(userID string) bool {
 	return o.GetOverridesForUser(userID).AcceptHASamples

@@ -856,7 +844,7 @@ func (o *Overrides) CompactorTenantShardSize(userID string) float64 {
 }

 // ParquetConverterTenantShardSize returns shard size (number of converters) used by this tenant when using shuffle-sharding strategy.
-func (o *Overrides) ParquetConverterTenantShardSize(userID string) float64 {
+func (o *Overrides) ParquetConverterTenantShardSize(userID string) int {
 	return o.GetOverridesForUser(userID).ParquetConverterTenantShardSize
 }

pkg/util/validation/validate.go

Lines changed: 7 additions & 0 deletions

@@ -58,6 +58,7 @@ const (
 	// Native Histogram specific validation reasons
 	nativeHistogramBucketCountLimitExceeded = "native_histogram_buckets_exceeded"
 	nativeHistogramInvalidSchema            = "native_histogram_invalid_schema"
+	nativeHistogramSampleSizeBytesExceeded  = "native_histogram_sample_size_bytes_exceeded"

 	// RateLimited is one of the values for the reason to discard samples.
 	// Declared here to avoid duplication in ingester and distributor.

@@ -340,6 +341,12 @@ func ValidateMetadata(validateMetrics *ValidateMetrics, cfg *Limits, userID stri

 func ValidateNativeHistogram(validateMetrics *ValidateMetrics, limits *Limits, userID string, ls []cortexpb.LabelAdapter, histogramSample cortexpb.Histogram) (cortexpb.Histogram, error) {

+	// sample size validation for native histogram
+	if limits.MaxNativeHistogramSampleSizeBytes > 0 && histogramSample.Size() > limits.MaxNativeHistogramSampleSizeBytes {
+		validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramSampleSizeBytesExceeded, userID).Inc()
+		return cortexpb.Histogram{}, newNativeHistogramSampleSizeBytesExceededError(ls, histogramSample.Size(), limits.MaxNativeHistogramSampleSizeBytes)
+	}
+
 	// schema validation for native histogram
 	if histogramSample.Schema < histogram.ExponentialSchemaMin || histogramSample.Schema > histogram.ExponentialSchemaMax {
 		validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramInvalidSchema, userID).Inc()
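Operationally, samples rejected by this check are counted under the new discard reason. As a rough sketch for watching it (assuming the DiscardedSamples counter is exported as cortex_discarded_samples_total with user and reason labels, which is how Cortex typically registers it), per-tenant rejections could be tracked with a query like:

    # The reason label value comes from this change; the metric and label names are assumptions.
    sum by (user) (rate(cortex_discarded_samples_total{reason="native_histogram_sample_size_bytes_exceeded"}[5m]))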

pkg/util/validation/validate_test.go

Lines changed: 17 additions & 7 deletions

@@ -349,15 +349,17 @@ func TestValidateNativeHistogram(t *testing.T) {
 	belowMinRangeSchemaHistogram.Schema = -5
 	exceedMaxRangeSchemaFloatHistogram := tsdbutil.GenerateTestFloatHistogram(0)
 	exceedMaxRangeSchemaFloatHistogram.Schema = 20
+	exceedMaxSampleSizeBytesLimitFloatHistogram := tsdbutil.GenerateTestFloatHistogram(100)

 	for _, tc := range []struct {
-		name string
-		bucketLimit int
-		resolutionReduced bool
-		histogram cortexpb.Histogram
-		expectedHistogram cortexpb.Histogram
-		expectedErr error
-		discardedSampleLabelValue string
+		name string
+		bucketLimit int
+		resolutionReduced bool
+		histogram cortexpb.Histogram
+		expectedHistogram cortexpb.Histogram
+		expectedErr error
+		discardedSampleLabelValue string
+		maxNativeHistogramSampleSizeBytesLimit int
 	}{
 		{
 			name: "no limit, histogram",

@@ -455,12 +457,20 @@ func TestValidateNativeHistogram(t *testing.T) {
 			expectedErr: newNativeHistogramSchemaInvalidError(lbls, int(exceedMaxRangeSchemaFloatHistogram.Schema)),
 			discardedSampleLabelValue: nativeHistogramInvalidSchema,
 		},
+		{
+			name: "exceed max sample size bytes limit",
+			histogram: cortexpb.FloatHistogramToHistogramProto(0, exceedMaxSampleSizeBytesLimitFloatHistogram.Copy()),
+			expectedErr: newNativeHistogramSampleSizeBytesExceededError(lbls, 126, 100),
+			discardedSampleLabelValue: nativeHistogramSampleSizeBytesExceeded,
+			maxNativeHistogramSampleSizeBytesLimit: 100,
+		},
 	} {
 		t.Run(tc.name, func(t *testing.T) {
 			reg := prometheus.NewRegistry()
 			validateMetrics := NewValidateMetrics(reg)
 			limits := new(Limits)
 			limits.MaxNativeHistogramBuckets = tc.bucketLimit
+			limits.MaxNativeHistogramSampleSizeBytes = tc.maxNativeHistogramSampleSizeBytesLimit
 			actualHistogram, actualErr := ValidateNativeHistogram(validateMetrics, limits, userID, lbls, tc.histogram)
 			if tc.expectedErr != nil {
 				require.Equal(t, tc.expectedErr, actualErr)
