-
Notifications
You must be signed in to change notification settings - Fork 3.7k
fix(limits): Read the consumer group and topic from the ingest-limits config #17831
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
648566c
to
bce467e
Compare
💻 Deploy preview available: |
Looks good but I think you also need to remove this function: // MetadataTopic returns the metadata topic name for the given topic.
func MetadataTopic(topic string) string {
return topic + ".metadata"
} and update its usage in different places like the stream generator. |
if c.IngestLimits.Enabled { | ||
if err := c.IngestLimits.Validate(); err != nil { | ||
errs = append(errs, errors.Wrap(err, "CONFIG ERROR: invalid ingest_limits config")) | ||
} | ||
if err := c.IngestLimitsFrontend.Validate(); err != nil { | ||
errs = append(errs, errors.Wrap(err, "CONFIG ERROR: invalid ingest_limits_frontend config")) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed this to conditional to fix the conflict with make validate-example-configs
looking for a valid ingest limits section.
@@ -107,7 +107,7 @@ func (s *Generator) sendStreamsToKafka(ctx context.Context, streams []distributo | |||
Key: []byte(tenant), | |||
Value: b, | |||
Partition: partitionID, | |||
Topic: limits.MetadataTopic(s.cfg.Kafka.Topic), | |||
Topic: fmt.Sprintf("%s.metadata", s.cfg.Kafka.Topic), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this read s.cfg.IngestLimits.Topic
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s.cfg
is not loki's config, but you are partially right we configure the topic for the generator via cli args, so this should be s.cfg.Kafka.Topic
f.StringVar( | ||
&cfg.Topic, | ||
"ingest-limits.topic", | ||
"", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should set a default here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I kept this as we do in kafka_config.topic
to make this an explicit user choice.
What this PR does / why we need it:
This pull request enables a separate configuration of the topic and consumer-group for the ingest-limits service. This enables to separate consumers from other components like partition-ingesters.
Which issue(s) this PR fixes:
Fixes #
Special notes for your reviewer:
Checklist
CONTRIBUTING.md
guide (required)feat
PRs are unlikely to be accepted unless a case can be made for the feature actually being a bug fix to existing behavior.docs/sources/setup/upgrade/_index.md
deprecated-config.yaml
anddeleted-config.yaml
files respectively in thetools/deprecated-config-checker
directory. Example PR