Skip to content

Commit 493493d

Browse files
authored
Add nativeHistogram IngestionRate limit (#6794)
* Add native histograms ingestion rate limit
  Signed-off-by: Paurush Garg <[email protected]>
* Adding Tests and Updating Docs
  Signed-off-by: Paurush Garg <[email protected]>
* Resolving failed testcase
  Signed-off-by: Paurush Garg <[email protected]>
* Resolving comments
  Signed-off-by: Paurush Garg <[email protected]>
* Resolving comments
  Signed-off-by: Paurush Garg <[email protected]>
* Updating doc
  Signed-off-by: Paurush Garg <[email protected]>
* Changing NativeHistograms default ingestion limits
  Signed-off-by: Paurush Garg <[email protected]>
* Discard only NH Samples for NH Rate Limiter
  Signed-off-by: Paurush Garg <[email protected]>
* Adding more test coverage
  Signed-off-by: Paurush Garg <[email protected]>
---------
Signed-off-by: Paurush Garg <[email protected]>
1 parent f9fc11c commit 493493d

File tree

8 files changed

+336
-64
lines changed

8 files changed

+336
-64
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
* [ENHANCEMENT] Distributor: Add min/max schema validation for NativeHistograms. #6766
4141
* [ENHANCEMENT] Ingester: Handle runtime errors in query path #6769
4242
* [ENHANCEMENT] Compactor: Support metadata caching bucket for Cleaner. Can be enabled via `-compactor.cleaner-caching-bucket-enabled` flag. #6778
43+
* [ENHANCEMENT] Distributor: Add ingestion rate limit for Native Histograms. #6794
4344
* [ENHANCEMENT] Compactor, Store Gateway: Introduce user scanner strategy and user index. #6780
4445
* [ENHANCEMENT] Querier: Support chunks cache for parquet queryable. #6805
4546
* [ENHANCEMENT] Parquet Storage: Add some metrics for parquet blocks and converter. #6809 #6821

docs/configuration/config-file-reference.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3476,6 +3476,11 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
34763476
# CLI flag: -distributor.ingestion-rate-limit
34773477
[ingestion_rate: <float> | default = 25000]
34783478
3479+
# Per-user native histogram ingestion rate limit in samples per second. Disabled
3480+
# by default.
3481+
# CLI flag: -distributor.native-histogram-ingestion-rate-limit
3482+
[native_histogram_ingestion_rate: <float> | default = 1.7976931348623157e+308]
3483+
34793484
# Whether the ingestion rate limit should be applied individually to each
34803485
# distributor instance (local), or evenly shared across the cluster (global).
34813486
# CLI flag: -distributor.ingestion-rate-limit-strategy
@@ -3485,6 +3490,10 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
34853490
# CLI flag: -distributor.ingestion-burst-size
34863491
[ingestion_burst_size: <int> | default = 50000]
34873492
3493+
# Per-user allowed native histogram ingestion burst size (in number of samples).
3494+
# CLI flag: -distributor.native-histogram-ingestion-burst-size
3495+
[native_histogram_ingestion_burst_size: <int> | default = 0]
3496+
34883497
# Flag to enable, for all users, handling of samples with external labels
34893498
# identifying replicas in an HA Prometheus setup.
34903499
# CLI flag: -distributor.ha-tracker.enable-for-all-users

pkg/distributor/distributor.go

Lines changed: 40 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,8 @@ type Distributor struct {
9595
HATracker *ha.HATracker
9696

9797
// Per-user rate limiter.
98-
ingestionRateLimiter *limiter.RateLimiter
98+
ingestionRateLimiter *limiter.RateLimiter
99+
nativeHistogramIngestionRateLimiter *limiter.RateLimiter
99100

100101
// Manager for subservices (HA Tracker, distributor ring and client pool)
101102
subservices *services.Manager
@@ -267,11 +268,13 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
267268
// it's an internal dependency and can't join the distributors ring, we skip rate
268269
// limiting.
269270
var ingestionRateStrategy limiter.RateLimiterStrategy
271+
var nativeHistogramIngestionRateStrategy limiter.RateLimiterStrategy
270272
var distributorsLifeCycler *ring.Lifecycler
271273
var distributorsRing *ring.Ring
272274

273275
if !canJoinDistributorsRing {
274276
ingestionRateStrategy = newInfiniteIngestionRateStrategy()
277+
nativeHistogramIngestionRateStrategy = newInfiniteIngestionRateStrategy()
275278
} else if limits.IngestionRateStrategy() == validation.GlobalIngestionRateStrategy {
276279
distributorsLifeCycler, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, "distributor", ringKey, true, true, log, prometheus.WrapRegistererWithPrefix("cortex_", reg))
277280
if err != nil {
@@ -285,21 +288,24 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
285288
subservices = append(subservices, distributorsLifeCycler, distributorsRing)
286289

287290
ingestionRateStrategy = newGlobalIngestionRateStrategy(limits, distributorsLifeCycler)
291+
nativeHistogramIngestionRateStrategy = newGlobalNativeHistogramIngestionRateStrategy(limits, distributorsLifeCycler)
288292
} else {
289293
ingestionRateStrategy = newLocalIngestionRateStrategy(limits)
294+
nativeHistogramIngestionRateStrategy = newLocalNativeHistogramIngestionRateStrategy(limits)
290295
}
291296

292297
d := &Distributor{
293-
cfg: cfg,
294-
log: log,
295-
ingestersRing: ingestersRing,
296-
ingesterPool: NewPool(cfg.PoolConfig, ingestersRing, cfg.IngesterClientFactory, log),
297-
distributorsLifeCycler: distributorsLifeCycler,
298-
distributorsRing: distributorsRing,
299-
limits: limits,
300-
ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second),
301-
HATracker: haTracker,
302-
ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),
298+
cfg: cfg,
299+
log: log,
300+
ingestersRing: ingestersRing,
301+
ingesterPool: NewPool(cfg.PoolConfig, ingestersRing, cfg.IngesterClientFactory, log),
302+
distributorsLifeCycler: distributorsLifeCycler,
303+
distributorsRing: distributorsRing,
304+
limits: limits,
305+
ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second),
306+
nativeHistogramIngestionRateLimiter: limiter.NewRateLimiter(nativeHistogramIngestionRateStrategy, 10*time.Second),
307+
HATracker: haTracker,
308+
ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),
303309

304310
queryDuration: instrument.NewHistogramCollector(promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
305311
Namespace: "cortex",
@@ -754,7 +760,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
754760
}
755761

756762
// A WriteRequest can only contain series or metadata but not both. This might change in the future.
757-
seriesKeys, validatedTimeseries, validatedFloatSamples, validatedHistogramSamples, validatedExemplars, firstPartialErr, err := d.prepareSeriesKeys(ctx, req, userID, limits, removeReplica)
763+
seriesKeys, nhSeriesKeys, validatedTimeseries, nhValidatedTimeseries, validatedFloatSamples, validatedHistogramSamples, validatedExemplars, firstPartialErr, err := d.prepareSeriesKeys(ctx, req, userID, limits, removeReplica)
758764
if err != nil {
759765
return nil, err
760766
}
@@ -765,6 +771,15 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
765771
d.receivedExemplars.WithLabelValues(userID).Add(float64(validatedExemplars))
766772
d.receivedMetadata.WithLabelValues(userID).Add(float64(len(validatedMetadata)))
767773

774+
if !d.nativeHistogramIngestionRateLimiter.AllowN(now, userID, validatedHistogramSamples) {
775+
level.Warn(d.log).Log("msg", "native histogram ingestion rate limit (%v) exceeded while adding %d native histogram samples", d.nativeHistogramIngestionRateLimiter.Limit(now, userID), validatedHistogramSamples)
776+
d.validateMetrics.DiscardedSamples.WithLabelValues(validation.NativeHistogramRateLimited, userID).Add(float64(validatedHistogramSamples))
777+
validatedHistogramSamples = 0
778+
} else {
779+
seriesKeys = append(seriesKeys, nhSeriesKeys...)
780+
validatedTimeseries = append(validatedTimeseries, nhValidatedTimeseries...)
781+
}
782+
768783
if len(seriesKeys) == 0 && len(metadataKeys) == 0 {
769784
// Ensure the request slice is reused if there's no series or metadata passing the validation.
770785
cortexpb.ReuseSlice(req.Timeseries)
@@ -936,14 +951,16 @@ type samplesLabelSetEntry struct {
936951
labels labels.Labels
937952
}
938953

939-
func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.WriteRequest, userID string, limits *validation.Limits, removeReplica bool) ([]uint32, []cortexpb.PreallocTimeseries, int, int, int, error, error) {
954+
func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.WriteRequest, userID string, limits *validation.Limits, removeReplica bool) ([]uint32, []uint32, []cortexpb.PreallocTimeseries, []cortexpb.PreallocTimeseries, int, int, int, error, error) {
940955
pSpan, _ := opentracing.StartSpanFromContext(ctx, "prepareSeriesKeys")
941956
defer pSpan.Finish()
942957

943958
// For each timeseries or samples, we compute a hash to distribute across ingesters;
944-
// check each sample/metadata and discard if outside limits.
959+
// check each sample/metadata and discard if outside limits.
945960
validatedTimeseries := make([]cortexpb.PreallocTimeseries, 0, len(req.Timeseries))
961+
nhValidatedTimeseries := make([]cortexpb.PreallocTimeseries, 0, len(req.Timeseries))
946962
seriesKeys := make([]uint32, 0, len(req.Timeseries))
963+
nhSeriesKeys := make([]uint32, 0, len(req.Timeseries))
947964
validatedFloatSamples := 0
948965
validatedHistogramSamples := 0
949966
validatedExemplars := 0
@@ -1051,7 +1068,7 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
10511068
// label and dropped labels (if any)
10521069
key, err := d.tokenForLabels(userID, ts.Labels)
10531070
if err != nil {
1054-
return nil, nil, 0, 0, 0, nil, err
1071+
return nil, nil, nil, nil, 0, 0, 0, nil, err
10551072
}
10561073
validatedSeries, validationErr := d.validateSeries(ts, userID, skipLabelNameValidation, limits)
10571074

@@ -1086,8 +1103,13 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
10861103
}
10871104
}
10881105

1089-
seriesKeys = append(seriesKeys, key)
1090-
validatedTimeseries = append(validatedTimeseries, validatedSeries)
1106+
if len(ts.Histograms) > 0 {
1107+
nhSeriesKeys = append(nhSeriesKeys, key)
1108+
nhValidatedTimeseries = append(nhValidatedTimeseries, validatedSeries)
1109+
} else {
1110+
seriesKeys = append(seriesKeys, key)
1111+
validatedTimeseries = append(validatedTimeseries, validatedSeries)
1112+
}
10911113
validatedFloatSamples += len(ts.Samples)
10921114
validatedHistogramSamples += len(ts.Histograms)
10931115
validatedExemplars += len(ts.Exemplars)
@@ -1103,7 +1125,7 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
11031125
}
11041126
}
11051127

1106-
return seriesKeys, validatedTimeseries, validatedFloatSamples, validatedHistogramSamples, validatedExemplars, firstPartialErr, nil
1128+
return seriesKeys, nhSeriesKeys, validatedTimeseries, nhValidatedTimeseries, validatedFloatSamples, validatedHistogramSamples, validatedExemplars, firstPartialErr, nil
11071129
}
11081130

11091131
func sortLabelsIfNeeded(labels []cortexpb.LabelAdapter) {

pkg/distributor/distributor_test.go

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -681,6 +681,147 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
681681
}
682682
}
683683

684+
func TestDistributor_PushIngestionRateLimiter_Histograms(t *testing.T) {
685+
t.Parallel()
686+
type testPush struct {
687+
samples int
688+
nhSamples int
689+
metadata int
690+
expectedError error
691+
expectedNHDiscardedSampleMetricValue int
692+
}
693+
694+
ctx := user.InjectOrgID(context.Background(), "user")
695+
tests := map[string]struct {
696+
distributors int
697+
ingestionRateStrategy string
698+
ingestionRate float64
699+
ingestionBurstSize int
700+
nativeHistogramIngestionRateStrategy string
701+
nativeHistogramIngestionRate float64
702+
nativeHistogramIngestionBurstSize int
703+
pushes []testPush
704+
}{
705+
"local strategy: native histograms limit should be set to each distributor": {
706+
distributors: 2,
707+
ingestionRateStrategy: validation.LocalIngestionRateStrategy,
708+
ingestionRate: 20,
709+
ingestionBurstSize: 20,
710+
nativeHistogramIngestionRate: 10,
711+
nativeHistogramIngestionBurstSize: 10,
712+
pushes: []testPush{
713+
{nhSamples: 4, expectedError: nil},
714+
{nhSamples: 6, expectedError: nil},
715+
{nhSamples: 6, expectedError: nil, expectedNHDiscardedSampleMetricValue: 6},
716+
{nhSamples: 4, metadata: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 10},
717+
{nhSamples: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 11},
718+
{nhSamples: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 12},
719+
},
720+
},
721+
"global strategy: native histograms limit should be evenly shared across distributors": {
722+
distributors: 2,
723+
ingestionRateStrategy: validation.GlobalIngestionRateStrategy,
724+
ingestionRate: 20,
725+
ingestionBurstSize: 10,
726+
nativeHistogramIngestionRate: 10,
727+
nativeHistogramIngestionBurstSize: 5,
728+
pushes: []testPush{
729+
{nhSamples: 2, expectedError: nil},
730+
{nhSamples: 1, expectedError: nil},
731+
{nhSamples: 3, metadata: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 3},
732+
{nhSamples: 2, expectedError: nil, expectedNHDiscardedSampleMetricValue: 3},
733+
{nhSamples: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 4},
734+
{nhSamples: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 5},
735+
},
736+
},
737+
"global strategy: native histograms burst should set to each distributor": {
738+
distributors: 2,
739+
ingestionRateStrategy: validation.GlobalIngestionRateStrategy,
740+
ingestionRate: 20,
741+
ingestionBurstSize: 40,
742+
nativeHistogramIngestionRate: 10,
743+
nativeHistogramIngestionBurstSize: 20,
744+
pushes: []testPush{
745+
{nhSamples: 10, expectedError: nil},
746+
{nhSamples: 5, expectedError: nil},
747+
{nhSamples: 6, metadata: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 6},
748+
{nhSamples: 5, expectedError: nil, expectedNHDiscardedSampleMetricValue: 6},
749+
{nhSamples: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 7},
750+
{nhSamples: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 8},
751+
},
752+
},
753+
"local strategy: If NH samples hit NH rate limit, other samples should succeed when under rate limit": {
754+
distributors: 2,
755+
ingestionRateStrategy: validation.LocalIngestionRateStrategy,
756+
ingestionRate: 20,
757+
ingestionBurstSize: 20,
758+
nativeHistogramIngestionRate: 5,
759+
nativeHistogramIngestionBurstSize: 5,
760+
pushes: []testPush{
761+
{samples: 5, nhSamples: 4, expectedError: nil},
762+
{samples: 6, nhSamples: 2, expectedError: nil, expectedNHDiscardedSampleMetricValue: 2},
763+
{samples: 4, metadata: 1, nhSamples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (20) exceeded while adding 5 samples and 1 metadata"), expectedNHDiscardedSampleMetricValue: 2},
764+
{metadata: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 2},
765+
},
766+
},
767+
"global strategy: If NH samples hit NH rate limit, other samples should succeed when under rate limit": {
768+
distributors: 2,
769+
ingestionRateStrategy: validation.GlobalIngestionRateStrategy,
770+
ingestionRate: 20,
771+
ingestionBurstSize: 10,
772+
nativeHistogramIngestionRate: 10,
773+
nativeHistogramIngestionBurstSize: 5,
774+
pushes: []testPush{
775+
{samples: 3, nhSamples: 2, expectedError: nil},
776+
{samples: 3, nhSamples: 4, expectedError: nil, expectedNHDiscardedSampleMetricValue: 4},
777+
{samples: 1, metadata: 1, nhSamples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (10) exceeded while adding 2 samples and 1 metadata"), expectedNHDiscardedSampleMetricValue: 4},
778+
{metadata: 1, expectedError: nil, expectedNHDiscardedSampleMetricValue: 4},
779+
},
780+
},
781+
}
782+
783+
for testName, testData := range tests {
784+
testData := testData
785+
786+
t.Run(testName, func(t *testing.T) {
787+
t.Parallel()
788+
limits := &validation.Limits{}
789+
flagext.DefaultValues(limits)
790+
limits.IngestionRateStrategy = testData.ingestionRateStrategy
791+
limits.IngestionRate = testData.ingestionRate
792+
limits.IngestionBurstSize = testData.ingestionBurstSize
793+
limits.NativeHistogramIngestionRate = testData.nativeHistogramIngestionRate
794+
limits.NativeHistogramIngestionBurstSize = testData.nativeHistogramIngestionBurstSize
795+
796+
// Start all expected distributors
797+
distributors, _, _, _ := prepare(t, prepConfig{
798+
numIngesters: 3,
799+
happyIngesters: 3,
800+
numDistributors: testData.distributors,
801+
shardByAllLabels: true,
802+
limits: limits,
803+
})
804+
805+
// Push samples in multiple requests to the first distributor
806+
for _, push := range testData.pushes {
807+
var request = makeWriteRequest(0, push.samples, push.metadata, push.nhSamples)
808+
809+
response, err := distributors[0].Push(ctx, request)
810+
811+
if push.expectedError == nil {
812+
assert.Equal(t, emptyResponse, response)
813+
assert.Nil(t, err)
814+
} else {
815+
assert.Nil(t, response)
816+
assert.Equal(t, push.expectedError, err)
817+
}
818+
assert.Equal(t, float64(push.expectedNHDiscardedSampleMetricValue), testutil.ToFloat64(distributors[0].validateMetrics.DiscardedSamples.WithLabelValues(validation.NativeHistogramRateLimited, "user")))
819+
}
820+
})
821+
}
822+
823+
}
824+
684825
func TestPush_QuorumError(t *testing.T) {
685826
t.Parallel()
686827

pkg/distributor/ingestion_rate_strategy.go

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func newGlobalIngestionRateStrategy(limits *validation.Overrides, ring ReadLifec
4545
func (s *globalStrategy) Limit(tenantID string) float64 {
4646
numDistributors := s.ring.HealthyInstancesCount()
4747

48-
if numDistributors == 0 {
48+
if numDistributors == 0 || s.limits.IngestionRate(tenantID) == float64(rate.Inf) {
4949
return s.limits.IngestionRate(tenantID)
5050
}
5151

@@ -72,3 +72,49 @@ func (s *infiniteStrategy) Burst(tenantID string) int {
7272
// Burst is ignored when limit = rate.Inf
7373
return 0
7474
}
75+
76+
type localStrategyNativeHistogram struct {
77+
limits *validation.Overrides
78+
}
79+
80+
func newLocalNativeHistogramIngestionRateStrategy(limits *validation.Overrides) limiter.RateLimiterStrategy {
81+
return &localStrategyNativeHistogram{
82+
limits: limits,
83+
}
84+
}
85+
86+
func (s *localStrategyNativeHistogram) Limit(tenantID string) float64 {
87+
return s.limits.NativeHistogramIngestionRate(tenantID)
88+
}
89+
90+
func (s *localStrategyNativeHistogram) Burst(tenantID string) int {
91+
return s.limits.NativeHistogramIngestionBurstSize(tenantID)
92+
}
93+
94+
type globalStrategyNativeHistogram struct {
95+
limits *validation.Overrides
96+
ring ReadLifecycler
97+
}
98+
99+
func newGlobalNativeHistogramIngestionRateStrategy(limits *validation.Overrides, ring ReadLifecycler) limiter.RateLimiterStrategy {
100+
return &globalStrategyNativeHistogram{
101+
limits: limits,
102+
ring: ring,
103+
}
104+
}
105+
106+
func (s *globalStrategyNativeHistogram) Limit(tenantID string) float64 {
107+
numDistributors := s.ring.HealthyInstancesCount()
108+
109+
if numDistributors == 0 || s.limits.NativeHistogramIngestionRate(tenantID) == float64(rate.Inf) {
110+
return s.limits.NativeHistogramIngestionRate(tenantID)
111+
}
112+
113+
return s.limits.NativeHistogramIngestionRate(tenantID) / float64(numDistributors)
114+
}
115+
116+
func (s *globalStrategyNativeHistogram) Burst(tenantID string) int {
117+
// The meaning of burst doesn't change for the global strategy, in order
118+
// to keep it easier to understand for users / operators.
119+
return s.limits.NativeHistogramIngestionBurstSize(tenantID)
120+
}

0 commit comments

Comments
 (0)