-
Notifications
You must be signed in to change notification settings - Fork 215
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Track last scan time from before scan starts instead of based on last Modified of objects #4493
Conversation
…Modified of objects Signed-off-by: Taylor Gray <[email protected]>
@@ -260,10 +261,13 @@ void getNextPartition_supplier_with_scheduling_options_returns_expected_Partitio | |||
assertThat(globalStateMap.containsKey(SCAN_COUNT), equalTo(true)); | |||
assertThat(globalStateMap.get(SCAN_COUNT), equalTo(1)); | |||
assertThat(globalStateMap.containsKey(firstBucket), equalTo(true)); | |||
assertThat(globalStateMap.get(firstBucket), equalTo(mostRecentFirstScan.toString())); | |||
assertThat(Instant.parse((CharSequence) globalStateMap.get(firstBucket)).toEpochMilli(), lessThanOrEqualTo(mostRecentFirstScan.toEpochMilli())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Java's Instant
is Comparable
, so you should be able to avoid the .toEpochMilli()
to make this somewhat simpler.
final List<PartitionIdentifier> allPartitionIdentifiers = new ArrayList<>(); | ||
ListObjectsV2Response listObjectsV2Response = null; | ||
do { | ||
listObjectsV2Response = s3Client.listObjectsV2(listObjectsV2Request.fetchOwner(true).continuationToken(Objects.nonNull(listObjectsV2Response) ? listObjectsV2Response.nextContinuationToken() : null).build()); | ||
allPartitionIdentifiers.addAll(listObjectsV2Response.contents().stream() | ||
.filter(s3Object -> isLastModifiedTimeAfterMostRecentScanForBucket(bucket, s3Object, globalStateMap)) | ||
.filter(s3Object -> isLastModifiedTimeAfterMostRecentScanForBucket(previousScanTime, s3Object)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we also filter out items with a Last-Modified
which is greater than updatedScanTime
?
As I understand, you will start this scan at time T. You may read an object from time T+1. Then, when you scan again at T+2, you are still using T to filter. Thus, you may re-read the T+1 item.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Aren't the items with Last-modified greater than updatedScanTime ones we also want to find?
Re-reading is not an issue, and just means there will be an extra get call to the source coordination store. Before this PR, we were basing this filter on the objects we read in the scan, but there can be a race condition where we read some objects with a timestamp that is earlier than another that is missed in the scan due to a race condition. This implementation guarantees we won't hit this, at the cost of doing some extra lookups to the coordination store.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. Because we store the exact key in the coordination store, we won't re-read the actual S3 objects, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes that's right. The main purpose for tracking the last modified here and filtering out is to just reduce lookups to the coordination store
Signed-off-by: Taylor Gray <[email protected]>
Description
This change makes it so that the filtering done between scans for last modified objects, which is used to reduce lookups to the coordination store, is tracked based on the time before the listObjects call to scan for each bucket is made. This will make it impossible to miss items that are uploaded in the middle or end of a scan
Issues Resolved
Previously attempted to fix with #4124
Original issue: #4123
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.