Skip to content

Commit 8c053a3

Browse files
authored
Add distributor inflight client request limit (#6376)
Signed-off-by: Anna Tran <[email protected]>
1 parent 006cab3 commit 8c053a3

File tree

4 files changed

+80
-16
lines changed

4 files changed

+80
-16
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
* [ENHANCEMENT] Ingester: Introduce a new experimental feature for caching expanded postings on the ingester. #6296
3939
* [ENHANCEMENT] Querier/Ruler: Expose `store_gateway_consistency_check_max_attempts` for max retries when querying store gateway in consistency check. #6276
4040
* [ENHANCEMENT] StoreGateway: Add new `cortex_bucket_store_chunk_pool_inuse_bytes` metric to track the usage in chunk pool. #6310
41+
* [ENHANCEMENT] Distributor: Add new `cortex_distributor_inflight_client_requests` metric to track number of ingester client inflight requests. #6358
4142
* [BUGFIX] Runtime-config: Handle absolute file paths when working directory is not / #6224
4243
* [BUGFIX] Ruler: Allow rule evaluation to complete during shutdown. #6326
4344
* [BUGFIX] Ring: update ring with new ip address when instance is lost, rejoins, but heartbeat is disabled #6271

docs/configuration/config-file-reference.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2679,6 +2679,12 @@ instance_limits:
26792679
# CLI flag: -distributor.instance-limits.max-inflight-push-requests
26802680
[max_inflight_push_requests: <int> | default = 0]
26812681
2682+
# Max inflight ingester client requests that this distributor can handle. This
2683+
# limit is per-distributor, not per-tenant. Additional requests will be
2684+
# rejected. 0 = unlimited.
2685+
# CLI flag: -distributor.instance-limits.max-inflight-client-requests
2686+
[max_inflight_client_requests: <int> | default = 0]
2687+
26822688
otlp:
26832689
# If true, all resource attributes are converted to labels.
26842690
# CLI flag: -distributor.otlp.convert-all-attributes

pkg/distributor/distributor.go

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ var (
5555
// Distributor instance limits errors.
5656
errTooManyInflightPushRequests = errors.New("too many inflight push requests in distributor")
5757
errMaxSamplesPushRateLimitReached = errors.New("distributor's samples push rate limit reached")
58+
errTooManyInflightClientRequests = errors.New("too many inflight ingester client requests in distributor")
5859
)
5960

6061
const (
@@ -104,8 +105,9 @@ type Distributor struct {
104105

105106
activeUsers *util.ActiveUsersCleanupService
106107

107-
ingestionRate *util_math.EwmaRate
108-
inflightPushRequests atomic.Int64
108+
ingestionRate *util_math.EwmaRate
109+
inflightPushRequests atomic.Int64
110+
inflightClientRequests atomic.Int64
109111

110112
// Metrics
111113
queryDuration *instrument.HistogramCollector
@@ -171,8 +173,9 @@ type Config struct {
171173
}
172174

173175
type InstanceLimits struct {
174-
MaxIngestionRate float64 `yaml:"max_ingestion_rate"`
175-
MaxInflightPushRequests int `yaml:"max_inflight_push_requests"`
176+
MaxIngestionRate float64 `yaml:"max_ingestion_rate"`
177+
MaxInflightPushRequests int `yaml:"max_inflight_push_requests"`
178+
MaxInflightClientRequests int `yaml:"max_inflight_client_requests"`
176179
}
177180

178181
type OTLPConfig struct {
@@ -198,6 +201,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
198201

199202
f.Float64Var(&cfg.InstanceLimits.MaxIngestionRate, "distributor.instance-limits.max-ingestion-rate", 0, "Max ingestion rate (samples/sec) that this distributor will accept. This limit is per-distributor, not per-tenant. Additional push requests will be rejected. Current ingestion rate is computed as exponentially weighted moving average, updated every second. 0 = unlimited.")
200203
f.IntVar(&cfg.InstanceLimits.MaxInflightPushRequests, "distributor.instance-limits.max-inflight-push-requests", 0, "Max inflight push requests that this distributor can handle. This limit is per-distributor, not per-tenant. Additional requests will be rejected. 0 = unlimited.")
204+
f.IntVar(&cfg.InstanceLimits.MaxInflightClientRequests, "distributor.instance-limits.max-inflight-client-requests", 0, "Max inflight ingester client requests that this distributor can handle. This limit is per-distributor, not per-tenant. Additional requests will be rejected. 0 = unlimited.")
201205

202206
f.BoolVar(&cfg.OTLPConfig.ConvertAllAttributes, "distributor.otlp.convert-all-attributes", false, "If true, all resource attributes are converted to labels.")
203207
f.BoolVar(&cfg.OTLPConfig.DisableTargetInfo, "distributor.otlp.disable-target-info", false, "If true, a target_info metric is not ingested. (refer to: https://github.com/prometheus/OpenMetrics/blob/main/specification/OpenMetrics.md#supporting-target-metadata-in-both-push-based-and-pull-based-systems)")
@@ -374,6 +378,11 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
374378
Help: instanceLimitsMetricHelp,
375379
ConstLabels: map[string]string{limitLabel: "max_inflight_push_requests"},
376380
}).Set(float64(cfg.InstanceLimits.MaxInflightPushRequests))
381+
promauto.With(reg).NewGauge(prometheus.GaugeOpts{
382+
Name: instanceLimitsMetric,
383+
Help: instanceLimitsMetricHelp,
384+
ConstLabels: map[string]string{limitLabel: "max_inflight_client_requests"},
385+
}).Set(float64(cfg.InstanceLimits.MaxInflightClientRequests))
377386
promauto.With(reg).NewGauge(prometheus.GaugeOpts{
378387
Name: instanceLimitsMetric,
379388
Help: instanceLimitsMetricHelp,
@@ -386,6 +395,13 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
386395
}, func() float64 {
387396
return float64(d.inflightPushRequests.Load())
388397
})
398+
399+
promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{
400+
Name: "cortex_distributor_inflight_client_requests",
401+
Help: "Current number of inflight client requests in distributor.",
402+
}, func() float64 {
403+
return float64(d.inflightClientRequests.Load())
404+
})
389405
promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{
390406
Name: "cortex_distributor_ingestion_rate_samples_per_second",
391407
Help: "Current ingestion rate in samples/sec that distributor is using to limit access.",
@@ -661,6 +677,12 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
661677
}
662678
}
663679

680+
// Only reject requests at this stage, so that the distributor can finish sending the current batch request to all ingesters
681+
// even if the number of inflight client requests exceeds MaxInflightClientRequests during `doBatch`.
682+
if d.cfg.InstanceLimits.MaxInflightClientRequests > 0 && d.inflightClientRequests.Load() > int64(d.cfg.InstanceLimits.MaxInflightClientRequests) {
683+
return nil, errTooManyInflightClientRequests
684+
}
685+
664686
removeReplica := false
665687
// Cache user limit with overrides so we spend less CPU doing locking. See issue #4904
666688
limits := d.limits.GetOverridesForUser(userID)
@@ -1023,6 +1045,9 @@ func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, time
10231045
req.Metadata = metadata
10241046
req.Source = source
10251047

1048+
d.inflightClientRequests.Inc()
1049+
defer d.inflightClientRequests.Dec()
1050+
10261051
_, err = c.PushPreAlloc(ctx, req)
10271052

10281053
// We should not reuse the req in case of errors:

pkg/distributor/distributor_test.go

Lines changed: 44 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -786,13 +786,15 @@ func TestDistributor_PushInstanceLimits(t *testing.T) {
786786

787787
ctx := user.InjectOrgID(context.Background(), "user")
788788
tests := map[string]struct {
789-
preInflight int
790-
preRateSamples int // initial rate before first push
791-
pushes []testPush // rate is recomputed after each push
789+
preInflight int
790+
preInflightClient int
791+
preRateSamples int // initial rate before first push
792+
pushes []testPush // rate is recomputed after each push
792793

793794
// limits
794-
inflightLimit int
795-
ingestionRateLimit float64
795+
inflightLimit int
796+
inflightClientLimit int
797+
ingestionRateLimit float64
796798

797799
metricNames []string
798800
expectedMetrics string
@@ -809,6 +811,7 @@ func TestDistributor_PushInstanceLimits(t *testing.T) {
809811
expectedMetrics: `
810812
# HELP cortex_distributor_instance_limits Instance limits used by this distributor.
811813
# TYPE cortex_distributor_instance_limits gauge
814+
cortex_distributor_instance_limits{limit="max_inflight_client_requests"} 0
812815
cortex_distributor_instance_limits{limit="max_inflight_push_requests"} 0
813816
cortex_distributor_instance_limits{limit="max_ingestion_rate"} 0
814817
`,
@@ -828,6 +831,7 @@ func TestDistributor_PushInstanceLimits(t *testing.T) {
828831
829832
# HELP cortex_distributor_instance_limits Instance limits used by this distributor.
830833
# TYPE cortex_distributor_instance_limits gauge
834+
cortex_distributor_instance_limits{limit="max_inflight_client_requests"} 0
831835
cortex_distributor_instance_limits{limit="max_inflight_push_requests"} 101
832836
cortex_distributor_instance_limits{limit="max_ingestion_rate"} 0
833837
`,
@@ -839,6 +843,29 @@ func TestDistributor_PushInstanceLimits(t *testing.T) {
839843
{samples: 100, expectedError: errTooManyInflightPushRequests},
840844
},
841845
},
846+
"below inflight client limit": {
847+
preInflightClient: 90,
848+
inflightClientLimit: 101,
849+
pushes: []testPush{
850+
{samples: 100, expectedError: nil},
851+
},
852+
853+
metricNames: []string{instanceLimitsMetric},
854+
expectedMetrics: `
855+
# HELP cortex_distributor_instance_limits Instance limits used by this distributor.
856+
# TYPE cortex_distributor_instance_limits gauge
857+
cortex_distributor_instance_limits{limit="max_inflight_client_requests"} 101
858+
cortex_distributor_instance_limits{limit="max_inflight_push_requests"} 0
859+
cortex_distributor_instance_limits{limit="max_ingestion_rate"} 0
860+
`,
861+
},
862+
"hits inflight client limit": {
863+
preInflightClient: 103,
864+
inflightClientLimit: 101,
865+
pushes: []testPush{
866+
{samples: 100, expectedError: errTooManyInflightClientRequests},
867+
},
868+
},
842869
"below ingestion rate limit": {
843870
preRateSamples: 500,
844871
ingestionRateLimit: 1000,
@@ -855,6 +882,7 @@ func TestDistributor_PushInstanceLimits(t *testing.T) {
855882
856883
# HELP cortex_distributor_instance_limits Instance limits used by this distributor.
857884
# TYPE cortex_distributor_instance_limits gauge
885+
cortex_distributor_instance_limits{limit="max_inflight_client_requests"} 0
858886
cortex_distributor_instance_limits{limit="max_inflight_push_requests"} 0
859887
cortex_distributor_instance_limits{limit="max_ingestion_rate"} 1000
860888
`,
@@ -894,17 +922,19 @@ func TestDistributor_PushInstanceLimits(t *testing.T) {
894922

895923
// Start all expected distributors
896924
distributors, _, regs, _ := prepare(t, prepConfig{
897-
numIngesters: 3,
898-
happyIngesters: 3,
899-
numDistributors: 1,
900-
shardByAllLabels: true,
901-
limits: limits,
902-
maxInflightRequests: testData.inflightLimit,
903-
maxIngestionRate: testData.ingestionRateLimit,
925+
numIngesters: 3,
926+
happyIngesters: 3,
927+
numDistributors: 1,
928+
shardByAllLabels: true,
929+
limits: limits,
930+
maxInflightRequests: testData.inflightLimit,
931+
maxInflightClientRequests: testData.inflightClientLimit,
932+
maxIngestionRate: testData.ingestionRateLimit,
904933
})
905934

906935
d := distributors[0]
907936
d.inflightPushRequests.Add(int64(testData.preInflight))
937+
d.inflightClientRequests.Add(int64(testData.preInflightClient))
908938
d.ingestionRate.Add(int64(testData.preRateSamples))
909939

910940
d.ingestionRate.Tick()
@@ -2790,6 +2820,7 @@ type prepConfig struct {
27902820
numDistributors int
27912821
skipLabelNameValidation bool
27922822
maxInflightRequests int
2823+
maxInflightClientRequests int
27932824
maxIngestionRate float64
27942825
replicationFactor int
27952826
enableTracker bool
@@ -2907,6 +2938,7 @@ func prepare(tb testing.TB, cfg prepConfig) ([]*Distributor, []*mockIngester, []
29072938
distributorCfg.DistributorRing.InstanceAddr = "127.0.0.1"
29082939
distributorCfg.SkipLabelNameValidation = cfg.skipLabelNameValidation
29092940
distributorCfg.InstanceLimits.MaxInflightPushRequests = cfg.maxInflightRequests
2941+
distributorCfg.InstanceLimits.MaxInflightClientRequests = cfg.maxInflightClientRequests
29102942
distributorCfg.InstanceLimits.MaxIngestionRate = cfg.maxIngestionRate
29112943

29122944
if cfg.shuffleShardEnabled {

0 commit comments

Comments
 (0)