Skip to content

Commit be1d61b

Browse files
committed
Adding Tests and Updating Docs
Signed-off-by: Paurush Garg <[email protected]>
1 parent e7d86bc commit be1d61b

File tree

4 files changed

+182
-68
lines changed

4 files changed

+182
-68
lines changed

docs/configuration/config-file-reference.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3427,6 +3427,10 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
34273427
# CLI flag: -distributor.ingestion-rate-limit
34283428
[ingestion_rate: <float> | default = 25000]
34293429
3430+
# Per-user ingestion rate limit for native histogram samples, in samples per second.
3431+
# CLI flag: -distributor.native-histograms-ingestion-rate-limit
3432+
[native_histograms_ingestion_rate: <float> | default = 25000]
3433+
34303434
# Whether the ingestion rate limit should be applied individually to each
34313435
# distributor instance (local), or evenly shared across the cluster (global).
34323436
# CLI flag: -distributor.ingestion-rate-limit-strategy
@@ -3436,6 +3440,10 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
34363440
# CLI flag: -distributor.ingestion-burst-size
34373441
[ingestion_burst_size: <int> | default = 50000]
34383442
3443+
# Per-user allowed ingestion burst size for native histogram samples (in number of samples).
3444+
# CLI flag: -distributor.native-histograms-ingestion-burst-size
3445+
[native_histograms_ingestion_burst_size: <int> | default = 50000]
3446+
34393447
# Flag to enable, for all users, handling of samples with external labels
34403448
# identifying replicas in an HA Prometheus setup.
34413449
# CLI flag: -distributor.ha-tracker.enable-for-all-users

pkg/distributor/distributor.go

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -781,29 +781,25 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
781781
totalSamples := validatedFloatSamples + validatedHistogramSamples
782782
totalN := totalSamples + validatedExemplars + len(validatedMetadata)
783783

784-
if !d.nativeHistogramsIngestionRateLimiter.AllowN(now, userID, validatedHistogramSamples) {
784+
nhRateLimited := !d.nativeHistogramsIngestionRateLimiter.AllowN(now, userID, validatedHistogramSamples)
785+
rateLimited := !d.ingestionRateLimiter.AllowN(now, userID, totalN)
786+
787+
if nhRateLimited || rateLimited {
785788
// Ensure the request slice is reused if the request is rate limited.
786789
cortexpb.ReuseSlice(req.Timeseries)
787790

788791
d.validateMetrics.DiscardedSamples.WithLabelValues(validation.NativeHistogramsRateLimited, userID).Add(float64(totalSamples))
789792
d.validateMetrics.DiscardedExemplars.WithLabelValues(validation.NativeHistogramsRateLimited, userID).Add(float64(validatedExemplars))
790793
d.validateMetrics.DiscardedMetadata.WithLabelValues(validation.NativeHistogramsRateLimited, userID).Add(float64(len(validatedMetadata)))
791-
// Return a 429 here to tell the client it is going too fast.
792-
// Client may discard the data or slow down and re-send.
793-
// Prometheus v2.26 added a remote-write option 'retry_on_http_429'.
794-
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "nativeHistograms ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.nativeHistogramsIngestionRateLimiter.Limit(now, userID), totalSamples, len(validatedMetadata))
795794
}
796795

797-
if !d.ingestionRateLimiter.AllowN(now, userID, totalN) {
798-
// Ensure the request slice is reused if the request is rate limited.
799-
cortexpb.ReuseSlice(req.Timeseries)
800-
801-
d.validateMetrics.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(totalSamples))
802-
d.validateMetrics.DiscardedExemplars.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedExemplars))
803-
d.validateMetrics.DiscardedMetadata.WithLabelValues(validation.RateLimited, userID).Add(float64(len(validatedMetadata)))
804-
// Return a 429 here to tell the client it is going too fast.
805-
// Client may discard the data or slow down and re-send.
806-
// Prometheus v2.26 added a remote-write option 'retry_on_http_429'.
796+
// Return a 429 here to tell the client it is going too fast.
797+
// Client may discard the data or slow down and re-send.
798+
// Prometheus v2.26 added a remote-write option 'retry_on_http_429'.
799+
if nhRateLimited {
800+
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "nativeHistograms ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.nativeHistogramsIngestionRateLimiter.Limit(now, userID), totalSamples, len(validatedMetadata))
801+
}
802+
if rateLimited {
807803
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.ingestionRateLimiter.Limit(now, userID), totalSamples, len(validatedMetadata))
808804
}
809805

pkg/distributor/distributor_test.go

Lines changed: 161 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -597,17 +597,21 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
597597

598598
ctx := user.InjectOrgID(context.Background(), "user")
599599
tests := map[string]struct {
600-
distributors int
601-
ingestionRateStrategy string
602-
ingestionRate float64
603-
ingestionBurstSize int
604-
pushes []testPush
600+
distributors int
601+
ingestionRateStrategy string
602+
ingestionRate float64
603+
ingestionBurstSize int
604+
nativeHistogramsIngestionRate float64
605+
nativeHistogramsIngestionBurstSize int
606+
pushes []testPush
605607
}{
606608
"local strategy: limit should be set to each distributor": {
607-
distributors: 2,
608-
ingestionRateStrategy: validation.LocalIngestionRateStrategy,
609-
ingestionRate: 10,
610-
ingestionBurstSize: 10,
609+
distributors: 2,
610+
ingestionRateStrategy: validation.LocalIngestionRateStrategy,
611+
ingestionRate: 10,
612+
ingestionBurstSize: 10,
613+
nativeHistogramsIngestionRate: 10,
614+
nativeHistogramsIngestionBurstSize: 10,
611615
pushes: []testPush{
612616
{samples: 4, expectedError: nil},
613617
{metadata: 1, expectedError: nil},
@@ -618,10 +622,12 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
618622
},
619623
},
620624
"global strategy: limit should be evenly shared across distributors": {
621-
distributors: 2,
622-
ingestionRateStrategy: validation.GlobalIngestionRateStrategy,
623-
ingestionRate: 10,
624-
ingestionBurstSize: 5,
625+
distributors: 2,
626+
ingestionRateStrategy: validation.GlobalIngestionRateStrategy,
627+
ingestionRate: 10,
628+
ingestionBurstSize: 5,
629+
nativeHistogramsIngestionRate: 10,
630+
nativeHistogramsIngestionBurstSize: 5,
625631
pushes: []testPush{
626632
{samples: 2, expectedError: nil},
627633
{samples: 1, expectedError: nil},
@@ -632,10 +638,12 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
632638
},
633639
},
634640
"global strategy: burst should set to each distributor": {
635-
distributors: 2,
636-
ingestionRateStrategy: validation.GlobalIngestionRateStrategy,
637-
ingestionRate: 10,
638-
ingestionBurstSize: 20,
641+
distributors: 2,
642+
ingestionRateStrategy: validation.GlobalIngestionRateStrategy,
643+
ingestionRate: 10,
644+
ingestionBurstSize: 20,
645+
nativeHistogramsIngestionRate: 10,
646+
nativeHistogramsIngestionBurstSize: 20,
639647
pushes: []testPush{
640648
{samples: 10, expectedError: nil},
641649
{samples: 5, expectedError: nil},
@@ -650,46 +658,148 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
650658
for testName, testData := range tests {
651659
testData := testData
652660

653-
for _, enableHistogram := range []bool{false, true} {
654-
enableHistogram := enableHistogram
655-
t.Run(fmt.Sprintf("%s, histogram=%s", testName, strconv.FormatBool(enableHistogram)), func(t *testing.T) {
656-
t.Parallel()
657-
limits := &validation.Limits{}
658-
flagext.DefaultValues(limits)
659-
limits.IngestionRateStrategy = testData.ingestionRateStrategy
660-
limits.IngestionRate = testData.ingestionRate
661-
limits.IngestionBurstSize = testData.ingestionBurstSize
661+
t.Run(testName, func(t *testing.T) {
662+
t.Parallel()
663+
limits := &validation.Limits{}
664+
flagext.DefaultValues(limits)
665+
limits.IngestionRateStrategy = testData.ingestionRateStrategy
666+
limits.IngestionRate = testData.ingestionRate
667+
limits.IngestionBurstSize = testData.ingestionBurstSize
668+
limits.NativeHistogramsIngestionRate = testData.nativeHistogramsIngestionRate
669+
limits.NativeHistogramsIngestionBurstSize = testData.nativeHistogramsIngestionBurstSize
670+
671+
// Start all expected distributors
672+
distributors, _, _, _ := prepare(t, prepConfig{
673+
numIngesters: 3,
674+
happyIngesters: 3,
675+
numDistributors: testData.distributors,
676+
shardByAllLabels: true,
677+
limits: limits,
678+
})
662679

663-
// Start all expected distributors
664-
distributors, _, _, _ := prepare(t, prepConfig{
665-
numIngesters: 3,
666-
happyIngesters: 3,
667-
numDistributors: testData.distributors,
668-
shardByAllLabels: true,
669-
limits: limits,
670-
})
680+
// Push samples in multiple requests to the first distributor
681+
for _, push := range testData.pushes {
682+
var request = makeWriteRequest(0, push.samples, push.metadata, 0)
671683

672-
// Push samples in multiple requests to the first distributor
673-
for _, push := range testData.pushes {
674-
var request *cortexpb.WriteRequest
675-
if !enableHistogram {
676-
request = makeWriteRequest(0, push.samples, push.metadata, 0)
677-
} else {
678-
request = makeWriteRequest(0, 0, push.metadata, push.samples)
679-
}
680-
response, err := distributors[0].Push(ctx, request)
684+
response, err := distributors[0].Push(ctx, request)
681685

682-
if push.expectedError == nil {
683-
assert.Equal(t, emptyResponse, response)
684-
assert.Nil(t, err)
685-
} else {
686-
assert.Nil(t, response)
687-
assert.Equal(t, push.expectedError, err)
688-
}
686+
if push.expectedError == nil {
687+
assert.Equal(t, emptyResponse, response)
688+
assert.Nil(t, err)
689+
} else {
690+
assert.Nil(t, response)
691+
assert.Equal(t, push.expectedError, err)
689692
}
693+
}
694+
})
695+
}
696+
}
697+
698+
func TestDistributor_PushIngestionRateLimiter_Histograms(t *testing.T) {
699+
t.Parallel()
700+
type testPush struct {
701+
samples int
702+
metadata int
703+
expectedError error
704+
}
705+
706+
ctx := user.InjectOrgID(context.Background(), "user")
707+
tests := map[string]struct {
708+
distributors int
709+
ingestionRateStrategy string
710+
ingestionRate float64
711+
ingestionBurstSize int
712+
nativeHistogramsIngestionRate float64
713+
nativeHistogramsIngestionBurstSize int
714+
pushes []testPush
715+
}{
716+
"local strategy: limit should be set to each distributor: histograms": {
717+
distributors: 2,
718+
ingestionRateStrategy: validation.LocalIngestionRateStrategy,
719+
ingestionRate: 10,
720+
ingestionBurstSize: 10,
721+
nativeHistogramsIngestionRate: 5,
722+
nativeHistogramsIngestionBurstSize: 5,
723+
pushes: []testPush{
724+
{samples: 2, expectedError: nil},
725+
{metadata: 1, expectedError: nil},
726+
{samples: 4, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "nativeHistograms ingestion rate limit (5) exceeded while adding 4 samples and 0 metadata")},
727+
{samples: 2, metadata: 1, expectedError: nil},
728+
{samples: 3, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "nativeHistograms ingestion rate limit (5) exceeded while adding 3 samples and 0 metadata")},
729+
{metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (10) exceeded while adding 0 samples and 1 metadata")},
730+
},
731+
},
732+
"global strategy: limit should be evenly shared across distributors: histograms": {
733+
distributors: 2,
734+
ingestionRateStrategy: validation.GlobalIngestionRateStrategy,
735+
ingestionRate: 10,
736+
ingestionBurstSize: 5,
737+
nativeHistogramsIngestionRate: 6,
738+
nativeHistogramsIngestionBurstSize: 3,
739+
pushes: []testPush{
740+
{samples: 2, expectedError: nil},
741+
{samples: 2, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "nativeHistograms ingestion rate limit (3) exceeded while adding 2 samples and 1 metadata")},
742+
{samples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 1 samples and 0 metadata")},
743+
{samples: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "nativeHistograms ingestion rate limit (3) exceeded while adding 1 samples and 0 metadata")},
744+
},
745+
},
746+
"global strategy: burst should set to each distributor: histograms": {
747+
distributors: 2,
748+
ingestionRateStrategy: validation.GlobalIngestionRateStrategy,
749+
ingestionRate: 10,
750+
ingestionBurstSize: 20,
751+
nativeHistogramsIngestionRate: 6,
752+
nativeHistogramsIngestionBurstSize: 10,
753+
pushes: []testPush{
754+
{samples: 3, expectedError: nil},
755+
{samples: 1, expectedError: nil},
756+
{samples: 7, metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "nativeHistograms ingestion rate limit (3) exceeded while adding 7 samples and 1 metadata")},
757+
{samples: 5, expectedError: nil},
758+
{samples: 3, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "nativeHistograms ingestion rate limit (3) exceeded while adding 3 samples and 0 metadata")},
759+
{metadata: 12, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 0 samples and 12 metadata")},
760+
},
761+
},
762+
}
763+
764+
for testName, testData := range tests {
765+
testData := testData
766+
767+
t.Run(testName, func(t *testing.T) {
768+
t.Parallel()
769+
limits := &validation.Limits{}
770+
flagext.DefaultValues(limits)
771+
limits.IngestionRateStrategy = testData.ingestionRateStrategy
772+
limits.IngestionRate = testData.ingestionRate
773+
limits.IngestionBurstSize = testData.ingestionBurstSize
774+
limits.NativeHistogramsIngestionRate = testData.nativeHistogramsIngestionRate
775+
limits.NativeHistogramsIngestionBurstSize = testData.nativeHistogramsIngestionBurstSize
776+
777+
// Start all expected distributors
778+
distributors, _, _, _ := prepare(t, prepConfig{
779+
numIngesters: 3,
780+
happyIngesters: 3,
781+
numDistributors: testData.distributors,
782+
shardByAllLabels: true,
783+
limits: limits,
690784
})
691-
}
785+
786+
// Push samples in multiple requests to the first distributor
787+
for _, push := range testData.pushes {
788+
var request = makeWriteRequest(0, 0, push.metadata, push.samples)
789+
790+
response, err := distributors[0].Push(ctx, request)
791+
792+
if push.expectedError == nil {
793+
assert.Equal(t, emptyResponse, response)
794+
assert.Nil(t, err)
795+
} else {
796+
assert.Nil(t, response)
797+
assert.Equal(t, push.expectedError, err)
798+
}
799+
}
800+
})
692801
}
802+
693803
}
694804

695805
func TestPush_QuorumError(t *testing.T) {

pkg/util/validation/limits.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -242,10 +242,10 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
242242

243243
f.IntVar(&l.IngestionTenantShardSize, "distributor.ingestion-tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used. Must be set both on ingesters and distributors. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant.")
244244
f.Float64Var(&l.IngestionRate, "distributor.ingestion-rate-limit", 25000, "Per-user ingestion rate limit in samples per second.")
245-
f.Float64Var(&l.NativeHistogramsIngestionRate, "distributor.native-histograms-ingestion-rate-limit", 2500, "Per-user nativeHistograms ingestion rate limit in samples per second.")
245+
f.Float64Var(&l.NativeHistogramsIngestionRate, "distributor.native-histograms-ingestion-rate-limit", 25000, "Per-user nativeHistograms ingestion rate limit in samples per second.")
246246
f.StringVar(&l.IngestionRateStrategy, "distributor.ingestion-rate-limit-strategy", "local", "Whether the ingestion rate limit should be applied individually to each distributor instance (local), or evenly shared across the cluster (global).")
247247
f.IntVar(&l.IngestionBurstSize, "distributor.ingestion-burst-size", 50000, "Per-user allowed ingestion burst size (in number of samples).")
248-
f.IntVar(&l.NativeHistogramsIngestionBurstSize, "distributor.native-histograms-ingestion-burst-size", 5000, "Per-user allowed nativeHistograms ingestion burst size (in number of samples).")
248+
f.IntVar(&l.NativeHistogramsIngestionBurstSize, "distributor.native-histograms-ingestion-burst-size", 50000, "Per-user allowed nativeHistograms ingestion burst size (in number of samples).")
249249
f.BoolVar(&l.AcceptHASamples, "distributor.ha-tracker.enable-for-all-users", false, "Flag to enable, for all users, handling of samples with external labels identifying replicas in an HA Prometheus setup.")
250250
f.BoolVar(&l.AcceptMixedHASamples, "experimental.distributor.ha-tracker.mixed-ha-samples", false, "[Experimental] Flag to enable handling of samples with mixed external labels identifying replicas in an HA Prometheus setup. Supported only if -distributor.ha-tracker.enable-for-all-users is true.")
251251
f.StringVar(&l.HAClusterLabel, "distributor.ha-tracker.cluster", "cluster", "Prometheus label to look for in samples to identify a Prometheus HA cluster.")

0 commit comments

Comments
 (0)