diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java index 3176abd9f247..c8aeb85f1fef 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java @@ -178,11 +178,6 @@ public ProcessContinuation run( BundleFinalizer bundleFinalizer) { final String token = partition.getPartitionToken(); final Timestamp startTimestamp = tracker.currentRestriction().getFrom(); - final Timestamp endTimestamp = partition.getEndTimestamp(); - final Timestamp changeStreamQueryEndTimestamp = - endTimestamp.equals(MAX_INCLUSIVE_END_AT) - ? getNextReadChangeStreamEndTimestamp() - : endTimestamp; // TODO: Potentially we can avoid this fetch, by enriching the runningAt timestamp when the // ReadChangeStreamPartitionDoFn#processElement is called @@ -197,6 +192,12 @@ public ProcessContinuation run( // Interrupter with soft timeout to commit the work if any records have been processed. RestrictionInterrupter interrupter = RestrictionInterrupter.withSoftTimeout(RESTRICTION_TRACKER_TIMEOUT); + + final Timestamp endTimestamp = partition.getEndTimestamp(); + final Timestamp changeStreamQueryEndTimestamp = + endTimestamp.equals(MAX_INCLUSIVE_END_AT) + ? getNextReadChangeStreamEndTimestamp() + : endTimestamp; try (ChangeStreamResultSet resultSet = changeStreamDao.changeStreamQuery(