-
Notifications
You must be signed in to change notification settings - Fork 211
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Kafka consumer record offset to event metadata #5331
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for this contribution @jcrean !
I have one comment about the tests.
Also, the DCO check is failing. Please correct your commit to meet the requirement. Your email addresses are different. Probably the easiest fix is to modify your commit with an additional sign-off using the expected email address.
Commit sha: [8b524c7](https://github.com/opensearch-project/data-prepper/pull/5331/commits/8b524c72182d705dc2049dfc4ae19c21cde4565e), Author: Josh Crean, Committer: Josh Crean; Expected "Josh Crean [[email protected]](mailto:[email protected])", but got "Josh Crean [[email protected]](mailto:[email protected])".
https://github.com/opensearch-project/data-prepper/pull/5331/checks?check_run_id=35608605368
try { | ||
consumer.onPartitionsAssigned(List.of(new TopicPartition(topic, testPartition))); | ||
consumer.consumeRecords(); | ||
} catch (Exception e){} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you expecting these to fail? If so, you can wrap these with assertThrows
.
e.g.
assertThrows(Exception.class, () -> consumer.consumeRecords());
If you are not expecting this, why suppress it? Should the test fail? I tend to think to let the exception bubble up.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, not expecting this to throw. I agree with you, it would be better to let the exception bubble out. I was just basing this off of the other tests in that file, which all swallow exceptions from consumeRecords()
. I assumed that there was some reason for that, so I followed suit. Since that method throws Exception
, I've carried that forward on my test, so exceptions will no longer be swallowed.
Signed-off-by: Jan Høydahl <[email protected]> Signed-off-by: Josh Crean <[email protected]>
…from Kafka. Signed-off-by: Josh Crean <[email protected]> Signed-off-by: Josh Crean <[email protected]>
Signed-off-by: Josh Crean <[email protected]> Signed-off-by: Josh Crean <[email protected]>
be6b750
to
4e06b8d
Compare
Syncing with upstream changes Signed-off-by: Josh Crean <[email protected]>
Description
This change adds an additional field named
kafka_offset
to the event metadata attributes when using the Kafka plugin. This provides the specific offset within the partition from which the record was consumed.Issues Resolved
Resolves #5164
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.