
fix(limits): Read the consumer group and topic from the ingest-limits config #17831


Merged · 10 commits · Jun 2, 2025
9 changes: 9 additions & 0 deletions docs/sources/shared/configuration.md
@@ -1069,6 +1069,15 @@ ingest_limits:
# CLI flag: -ingest-limits.lifecycler.ID
[id: <string> | default = "<hostname>"]

# The consumer group for the Kafka topic used to read stream metadata records.
# CLI flag: -ingest-limits.consumer-group
[consumer_group: <string> | default = "ingest-limits"]

# The topic for the Kafka topic used to read and write stream metadata
# records.
# CLI flag: -ingest-limits.topic
[topic: <string> | default = ""]

ingest_limits_frontend:
client_config:
# Configures client gRPC connections to limits service.
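For reference, the two new settings slot into an `ingest_limits` block alongside the options already documented; a minimal sketch, with the topic name and other values purely illustrative:

```yaml
ingest_limits:
  enabled: true
  # Consumers of the stream metadata topic join this group.
  consumer_group: "ingest-limits"
  # Must be set when ingest limits are enabled; validation rejects an empty topic.
  topic: "loki.metadata"
```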
18 changes: 18 additions & 0 deletions pkg/limits/config.go
@@ -17,6 +17,7 @@ const (
DefaultBucketSize = 1 * time.Minute
DefaultEvictInterval = 30 * time.Minute
DefaultNumPartitions = 64
DefaultConsumerGroup = "ingest-limits"
)

// Config represents the configuration for the ingest limits service.
@@ -49,6 +50,8 @@ type Config struct {
// LifecyclerConfig is the config to build a ring lifecycler.
LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty"`
KafkaConfig kafka.Config `yaml:"-"`
ConsumerGroup string `yaml:"consumer_group"`
Topic string `yaml:"topic"`

// Deprecated.
WindowSize time.Duration `yaml:"window_size" doc:"hidden|deprecated"`
@@ -93,6 +96,18 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
DefaultNumPartitions,
"The number of partitions for the Kafka topic used to read and write stream metadata. It is fixed, not a maximum.",
)
f.StringVar(
&cfg.ConsumerGroup,
"ingest-limits.consumer-group",
DefaultConsumerGroup,
"The consumer group for the Kafka topic used to read stream metadata records.",
)
f.StringVar(
&cfg.Topic,
"ingest-limits.topic",
"",
"The topic for the Kafka topic used to read and write stream metadata records.",
)
}

func (cfg *Config) Validate() error {
@@ -114,5 +129,8 @@ func (cfg *Config) Validate() error {
if cfg.NumPartitions <= 0 {
return errors.New("num-partitions must be greater than 0")
}
if cfg.Topic == "" {
return errors.New("topic must be set")
}
return nil
}
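The flag registration and validation added above follow a small, self-contained pattern; the snippet below is a standalone miniature (not the Loki package itself, and the struct is reduced to just the two new fields) showing how an unset -ingest-limits.topic surfaces as an error:

```go
package main

import (
	"errors"
	"flag"
	"fmt"
)

// config mirrors only the two new fields, not the full ingest-limits Config.
type config struct {
	ConsumerGroup string
	Topic         string
}

func (c *config) registerFlags(f *flag.FlagSet) {
	f.StringVar(&c.ConsumerGroup, "ingest-limits.consumer-group", "ingest-limits",
		"The consumer group for the Kafka topic used to read stream metadata records.")
	f.StringVar(&c.Topic, "ingest-limits.topic", "",
		"The topic for the Kafka topic used to read and write stream metadata records.")
}

func (c *config) validate() error {
	if c.Topic == "" {
		return errors.New("topic must be set")
	}
	return nil
}

func main() {
	var cfg config
	fs := flag.NewFlagSet("loki", flag.ContinueOnError)
	cfg.registerFlags(fs)
	_ = fs.Parse(nil)           // no -ingest-limits.topic supplied
	fmt.Println(cfg.validate()) // prints: topic must be set
}
```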
6 changes: 2 additions & 4 deletions pkg/limits/frontend/ring.go
@@ -83,11 +83,9 @@ func (g *ringGatherer) exceedsLimits(ctx context.Context, req *proto.ExceedsLimi
// Make a copy of the streams from the request. We will prune this slice
// each time we receive the responses from a zone.
streams := make([]*proto.StreamMetadata, 0, len(req.Streams))
-for _, stream := range req.Streams {
-	streams = append(streams, stream)
-}
+streams = append(streams, req.Streams...)
// Query each zone as ordered in zonesToQuery. If a zone answers all
-// streams, the request is satisifed and there is no need to query
+// streams, the request is satisfied and there is no need to query
// subsequent zones. If a zone answers just a subset of streams
// (i.e. the instance that is consuming a partition is unavailable or the
// partition that owns one or more streams does not have a consumer)
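The simplification works because append accepts a source slice expanded with the ... operator, copying every element in one call; a quick standalone illustration of the idiom:

```go
package main

import "fmt"

func main() {
	src := []string{"a", "b", "c"}

	// Loop form: one append call per element.
	dst1 := make([]string, 0, len(src))
	for _, s := range src {
		dst1 = append(dst1, s)
	}

	// Variadic form: equivalent result in a single call.
	dst2 := make([]string, 0, len(src))
	dst2 = append(dst2, src...)

	fmt.Println(dst1, dst2) // [a b c] [a b c]
}
```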
11 changes: 4 additions & 7 deletions pkg/limits/service.go
@@ -27,9 +27,6 @@ const (
// Ring
RingKey = "ingest-limits"
RingName = "ingest-limits"
-
-// Kafka
-consumerGroup = "ingest-limits"
)

// MetadataTopic returns the metadata topic name for the given topic.
@@ -154,13 +151,13 @@ func New(cfg Config, lims Limits, logger log.Logger, reg prometheus.Registerer)

// Create a copy of the config to modify the topic
kCfg := cfg.KafkaConfig
-kCfg.Topic = MetadataTopic(kCfg.Topic)
+kCfg.Topic = cfg.Topic
kCfg.AutoCreateTopicEnabled = true
kCfg.AutoCreateTopicDefaultPartitions = cfg.NumPartitions

offsetManager, err := partition.NewKafkaOffsetManager(
kCfg,
-"ingest-limits",
+cfg.ConsumerGroup,
logger,
prometheus.NewRegistry(),
)
@@ -176,8 +173,8 @@ func New(cfg Config, lims Limits, logger log.Logger, reg prometheus.Registerer)
)

s.clientReader, err = client.NewReaderClient("ingest-limits-reader", kCfg, logger, reg,
-kgo.ConsumerGroup(consumerGroup),
-kgo.ConsumeTopics(kCfg.Topic),
+kgo.ConsumerGroup(cfg.ConsumerGroup),
+kgo.ConsumeTopics(cfg.Topic),
kgo.Balancers(kgo.CooperativeStickyBalancer()),
kgo.ConsumeResetOffset(kgo.NewOffset().AfterMilli(s.clock.Now().Add(-s.cfg.ActiveWindow).UnixMilli())),
kgo.DisableAutoCommit(),
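The reader client above is built from standard franz-go options; a minimal standalone sketch of the same wiring, where the broker address, group, topic, and one-hour window are placeholder values rather than Loki's real configuration:

```go
package main

import (
	"fmt"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	activeWindow := time.Hour // placeholder for cfg.ActiveWindow
	client, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // placeholder broker
		kgo.ConsumerGroup("ingest-limits"),
		kgo.ConsumeTopics("loki.metadata"),
		kgo.Balancers(kgo.CooperativeStickyBalancer()),
		// Start consuming from records newer than the active window.
		kgo.ConsumeResetOffset(kgo.NewOffset().AfterMilli(
			time.Now().Add(-activeWindow).UnixMilli())),
		kgo.DisableAutoCommit(),
	)
	if err != nil {
		panic(err)
	}
	defer client.Close()
	fmt.Println("reader configured")
}
```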
5 changes: 3 additions & 2 deletions pkg/limits/service_test.go
@@ -457,10 +457,11 @@ func TestIngestLimits_ExceedsLimits_Concurrent(t *testing.T) {
func TestNew(t *testing.T) {
cfg := Config{
KafkaConfig: kafka.Config{
-Topic: "test-topic",
WriteTimeout: 10 * time.Second,
},
-ActiveWindow: time.Hour,
+ConsumerGroup: "test-consumer-group",
+Topic: "test-topic.metadata",
+ActiveWindow: time.Hour,
LifecyclerConfig: ring.LifecyclerConfig{
RingConfig: ring.Config{
KVStore: kv.Config{
12 changes: 7 additions & 5 deletions pkg/loki/loki.go
@@ -289,11 +289,13 @@ func (c *Config) Validate() error {
if err := c.LimitsConfig.Validate(); err != nil {
errs = append(errs, errors.Wrap(err, "CONFIG ERROR: invalid limits_config config"))
}
-if err := c.IngestLimits.Validate(); err != nil {
-	errs = append(errs, errors.Wrap(err, "CONFIG ERROR: invalid ingest_limits config"))
-}
-if err := c.IngestLimitsFrontend.Validate(); err != nil {
-	errs = append(errs, errors.Wrap(err, "CONFIG ERROR: invalid ingest_limits_frontend config"))
+if c.IngestLimits.Enabled {
+	if err := c.IngestLimits.Validate(); err != nil {
+		errs = append(errs, errors.Wrap(err, "CONFIG ERROR: invalid ingest_limits config"))
+	}
+	if err := c.IngestLimitsFrontend.Validate(); err != nil {
+		errs = append(errs, errors.Wrap(err, "CONFIG ERROR: invalid ingest_limits_frontend config"))
+	}
}
if err := c.IngestLimitsFrontendClient.Validate(); err != nil {
errs = append(errs, errors.Wrap(err, "CONFIG ERROR: invalid ingest_limits_frontend_client config"))
6 changes: 3 additions & 3 deletions tools/dev/kafka/docker-compose.yaml
@@ -13,7 +13,7 @@ services:
- ./provisioning:/etc/grafana/provisioning/
- ./data/grafana/:/var/lib/grafana/
extra_hosts:
-- 'host.docker.internal:host-gateway'
+- "host.docker.internal:host-gateway"
kafka-ui:
image: provectuslabs/kafka-ui:latest
ports:
@@ -25,7 +25,7 @@
depends_on:
- broker
extra_hosts:
-- 'host.docker.internal:host-gateway'
+- "host.docker.internal:host-gateway"
broker:
image: apache/kafka:latest
hostname: broker
@@ -49,5 +49,5 @@
KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
generator:
-image: ctovena/log-generator:latest
+image: theperiklis/log-generator:latest
command: -url http://host.docker.internal:3100/loki/api/v1/push
5 changes: 5 additions & 0 deletions tools/dev/kafka/loki-local-config.debug.yaml
@@ -20,6 +20,10 @@ common:

kafka_config:
topic: "loki"
reader_config:
address: localhost:9092
writer_config:
address: localhost:9092

querier:
query_partition_ingesters: true
@@ -28,6 +32,7 @@ ingest_limits:
enabled: true
active_window: 1h
num_partitions: 1
topic: "loki.metadata"
lifecycler:
ring:
kvstore:
2 changes: 1 addition & 1 deletion tools/stream-generator/generator/kafka.go
@@ -107,7 +107,7 @@ func (s *Generator) sendStreamsToKafka(ctx context.Context, streams []distributo
Key: []byte(tenant),
Value: b,
Partition: partitionID,
-Topic: limits.MetadataTopic(s.cfg.Kafka.Topic),
+Topic: fmt.Sprintf("%s.metadata", s.cfg.Kafka.Topic),
}
// Send to Kafka
produceResults := s.writer.ProduceSync(ctx, []*kgo.Record{metadataRecord})
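The fmt.Sprintf call above pins down the naming scheme that limits.MetadataTopic encapsulated: the stream metadata topic is the data topic with a .metadata suffix. A minimal sketch of that derivation:

```go
package main

import "fmt"

// metadataTopic derives the stream metadata topic name from the data
// topic, mirroring the "%s.metadata" pattern used by the generator.
func metadataTopic(topic string) string {
	return fmt.Sprintf("%s.metadata", topic)
}

func main() {
	fmt.Println(metadataTopic("loki")) // loki.metadata
}
```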