MAINT: add bytes metrics into opensearch source #3646
Signed-off-by: George Chen <[email protected]>
@@ -191,11 +195,14 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op
searchWithSearchAfterResults.getDocuments().stream().map(Record::new).forEach(record -> {
try {
final long documentBytes = objectMapper.writeValueAsBytes(record.getData().getJsonNode()).length;
Serializing each record to bytes will add a performance hit.
Also, the measured size may differ somewhat from the input, since we are looking at the Event here rather than the actual JSON document.
The JsonNode in the event is essentially the SearchResults/hit/source JsonNode. That document unit is the same for both bytesReceived and bytesProcessed, so users can compare the two. I am open to an alternative unit.
I see that we set it to the document _source here (line 296 in b80b565):
.withData(hit.source())
That may or may not be the best metric, but I think it can work.
I do not have a better solution for the performance hit yet, since the SDK client returns ObjectNode...
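One possible mitigation, offered only as a sketch: count the serialized bytes as they are written instead of materializing a byte[] per document. This still pays the serialization cost and only avoids the intermediate allocation. The class and method names below (CountingSketch, CountingOutputStream, utf8Size) are hypothetical and not part of this PR; the example uses a plain String and the JDK only, on the assumption that the same stream could be handed to Jackson's ObjectMapper.writeValue(OutputStream, Object).

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;

public class CountingSketch {

    /** Hypothetical sink that discards its input and only counts the bytes written to it. */
    static final class CountingOutputStream extends OutputStream {
        private long count;

        @Override
        public void write(int b) {
            count++;
        }

        @Override
        public void write(byte[] b, int off, int len) {
            count += len;
        }

        long getCount() {
            return count;
        }
    }

    /** Returns the UTF-8 encoded size of the given text without keeping the encoded bytes. */
    static long utf8Size(String json) {
        CountingOutputStream out = new CountingOutputStream();
        try (Writer writer = new OutputStreamWriter(out, StandardCharsets.UTF_8)) {
            writer.write(json);
        } catch (IOException e) {
            // The counting sink never throws; rethrow unchecked to keep the signature simple.
            throw new UncheckedIOException(e);
        }
        return out.getCount();
    }

    public static void main(String[] args) {
        System.out.println(utf8Size("{\"a\":1}"));
    }
}
```

Whether this is measurably cheaper than writeValueAsBytes for typical document sizes would need benchmarking.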
@@ -381,7 +419,9 @@ void run_with_getNextPartition_with_valid_existing_point_in_time_does_not_create
verify(sourceCoordinator, times(0)).saveProgressStateForPartition(eq(partitionKey), eq(openSearchIndexProgressState));
verify(sourceCoordinator, times(0)).updatePartitionForAcknowledgmentWait(anyString(), any(Duration.class));

verify(bytesReceivedSummary, times(3)).record(0L);
This doesn't verify the behavior sufficiently. I think we should mock ObjectMapper to return a byte[] of sizes that we can check against, e.g.
int expectedDataSize1 = 10;
int expectedDataSize2 = 20;
...
when(objectMapper.writeValueAsBytes(testData1)).thenReturn(new byte[expectedDataSize1]);
...
verify(bytesReceivedSummary).record(expectedDataSize1);
verify(bytesReceivedSummary).record(expectedDataSize2);
...
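The idea in this suggestion can be illustrated without Mockito, as a rough sketch: stub the serializer so each test document maps to a byte[] of a known length, then check that exactly those lengths were recorded. Everything here (BytesMetricSketch, RecordingSummary, recordDocumentSizes) is a hypothetical stand-in, not the actual worker, ObjectMapper, or metric summary from this PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class BytesMetricSketch {

    /** Hypothetical stand-in for the metric summary; remembers every recorded value. */
    static final class RecordingSummary {
        final List<Long> recorded = new ArrayList<>();

        void record(long amount) {
            recorded.add(amount);
        }
    }

    /** Hypothetical worker step: serialize each document and record its size in bytes. */
    static void recordDocumentSizes(List<String> documents,
                                    Function<String, byte[]> serializer,
                                    RecordingSummary summary) {
        for (String document : documents) {
            summary.record(serializer.apply(document).length);
        }
    }

    public static void main(String[] args) {
        // Stubbed serializer: each test document yields a byte[] of a known size.
        Map<String, byte[]> stubbed = Map.of(
                "testData1", new byte[10],
                "testData2", new byte[20]);

        RecordingSummary summary = new RecordingSummary();
        recordDocumentSizes(List.of("testData1", "testData2"), stubbed::get, summary);

        System.out.println(summary.recorded); // prints [10, 20]
    }
}
```

Using distinct, non-zero sizes means the assertion fails if the metric is recorded with the wrong value, which times(3) against record(0L) cannot detect.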
Signed-off-by: George Chen <[email protected]>
Description
This PR adds bytesReceived and bytesProcessed metrics to the OpenSearch source.
Issues Resolved
Resolves #[Issue number to be closed when this PR is merged]
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.