@@ -168,7 +168,6 @@ public class QueryChangeStreamAction {
* @return a {@link ProcessContinuation#stop()} if a record timestamp could not be claimed or if
* the partition processing has finished
*/
@SuppressWarnings("nullness")
@VisibleForTesting
public ProcessContinuation run(
PartitionMetadata partition,
@@ -179,10 +178,9 @@ public ProcessContinuation run(
final String token = partition.getPartitionToken();
final Timestamp startTimestamp = tracker.currentRestriction().getFrom();
final Timestamp endTimestamp = partition.getEndTimestamp();
final boolean readToEndTimestamp = !endTimestamp.equals(MAX_INCLUSIVE_END_AT);
final Timestamp changeStreamQueryEndTimestamp =
endTimestamp.equals(MAX_INCLUSIVE_END_AT)
? getNextReadChangeStreamEndTimestamp()
: endTimestamp;
readToEndTimestamp ? endTimestamp : getNextReadChangeStreamEndTimestamp();

// TODO: Potentially we can avoid this fetch by enriching the runningAt timestamp when
// ReadChangeStreamPartitionDoFn#processElement is called
@@ -198,6 +196,7 @@ public ProcessContinuation run(
RestrictionInterrupter<Timestamp> interrupter =
RestrictionInterrupter.withSoftTimeout(RESTRICTION_TRACKER_TIMEOUT);

boolean stopAfterQuerySucceeds = readToEndTimestamp;
try (ChangeStreamResultSet resultSet =
changeStreamDao.changeStreamQuery(
token, startTimestamp, changeStreamQueryEndTimestamp, partition.getHeartbeatMillis())) {
@@ -250,6 +249,9 @@ public ProcessContinuation run(
tracker,
interrupter,
watermarkEstimator);
// The PartitionEndRecord indicates that there are no more records expected
// for this partition.
stopAfterQuerySucceeds = true;
} else if (record instanceof PartitionEventRecord) {
maybeContinuation =
partitionEventRecordAction.run(
@@ -272,27 +274,23 @@ public ProcessContinuation run(
}
}
}
bundleFinalizer.afterBundleCommit(
Instant.now().plus(BUNDLE_FINALIZER_TIMEOUT),
updateWatermarkCallback(token, watermarkEstimator));

} catch (SpannerException e) {
/*
If there is a split when a partition is supposed to be finished, the residual will try
to perform a change stream query for an out of range interval. We ignore this error
here, and the residual should be able to claim the end of the timestamp range, finishing
the partition.
*/
if (isTimestampOutOfRange(e)) {
LOG.info(
"[{}] query change stream is out of range for {} to {}, finishing stream.",
token,
startTimestamp,
endTimestamp,
e);
} else {
if (!isTimestampOutOfRange(e)) {
throw e;
}
LOG.info(
"[{}] query change stream is out of range for {} to {}, finishing stream.",
token,
startTimestamp,
endTimestamp,
e);
stopAfterQuerySucceeds = true;
} catch (Exception e) {
LOG.error(
"[{}] query change stream had exception processing range {} to {}.",
@@ -303,13 +301,28 @@ public ProcessContinuation run(
throw e;
}

LOG.debug("[{}] change stream completed successfully", token);
if (tracker.tryClaim(endTimestamp)) {
LOG.debug("[{}] Finishing partition", token);
partitionMetadataDao.updateToFinished(token);
metrics.decActivePartitionReadCounter();
LOG.info("[{}] After attempting to finish the partition", token);
LOG.debug(
"[{}] change stream completed successfully up to {}", token, changeStreamQueryEndTimestamp);
Timestamp claimTimestamp =
stopAfterQuerySucceeds ? endTimestamp : changeStreamQueryEndTimestamp;
if (!tracker.tryClaim(claimTimestamp)) {
return ProcessContinuation.stop();
}
bundleFinalizer.afterBundleCommit(
Instant.now().plus(BUNDLE_FINALIZER_TIMEOUT),
updateWatermarkCallback(token, watermarkEstimator));

if (!stopAfterQuerySucceeds) {
LOG.debug("[{}] Rescheduling partition to resume reading", token);
return ProcessContinuation.resume();
}

LOG.debug("[{}] Finishing partition", token);
// TODO: This should be performed after the commit succeeds. Since bundle finalizers are not
// guaranteed to be called, this needs to be performed in a subsequent fused stage.
partitionMetadataDao.updateToFinished(token);
metrics.decActivePartitionReadCounter();
LOG.info("[{}] After attempting to finish the partition", token);
return ProcessContinuation.stop();
}
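
Taken together, the reworked `run` method follows a claim-then-decide protocol: claim the end of the queried window on the restriction tracker, then stop if the partition is done or resume to read the next window. A minimal sketch of that decision, assuming Beam's `RestrictionTracker` and `ProcessContinuation` semantics (names and structure here are illustrative, not the PR's code):

```java
import com.google.cloud.Timestamp;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

public class ContinuationSketch {
  // Illustrative sketch: stopAfterQuerySucceeds is true when the partition end
  // was observed, i.e. a bounded end timestamp, a PartitionEndRecord, or an
  // out-of-range query against an already-finished partition.
  ProcessContinuation decide(
      RestrictionTracker<TimestampRange, Timestamp> tracker,
      boolean stopAfterQuerySucceeds,
      Timestamp endTimestamp,
      Timestamp changeStreamQueryEndTimestamp) {
    Timestamp claim = stopAfterQuerySucceeds ? endTimestamp : changeStreamQueryEndTimestamp;
    if (!tracker.tryClaim(claim)) {
      // A split already took the rest of the range; the residual will finish it.
      return ProcessContinuation.stop();
    }
    return stopAfterQuerySucceeds
        ? ProcessContinuation.stop() // partition fully read
        : ProcessContinuation.resume(); // reschedule to read the next window
  }
}
```

Claiming before scheduling the bundle finalizer also means the watermark callback is only registered for windows that were actually committed to the restriction.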

@@ -339,8 +352,8 @@ private boolean isTimestampOutOfRange(SpannerException e) {
}
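
The body of `isTimestampOutOfRange` is collapsed in this diff. A plausible shape, assuming it inspects the Spanner error code (a hypothetical sketch, not the actual implementation):

```java
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.SpannerException;

// Hypothetical sketch, assuming Spanner surfaces a change stream query outside
// the partition's valid timestamp range as OUT_OF_RANGE.
private boolean isTimestampOutOfRange(SpannerException e) {
  return e.getErrorCode() == ErrorCode.OUT_OF_RANGE;
}
```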

// Return (now + 2 mins) as the end timestamp for reading change streams. This is only used if
// users want to run the connector forever. This approach works because Google Dataflow
// checkpoints every 5s or 5MB output provided and the change stream query has deadline for 1 min.
// users want to run the connector forever. If the end timestamp is reached, we will resume
// processing from that timestamp on a subsequent DoFn execution.
private Timestamp getNextReadChangeStreamEndTimestamp() {
final Timestamp current = Timestamp.now();
return Timestamp.ofTimeSecondsAndNanos(current.getSeconds() + 2 * 60, current.getNanos());
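
To illustrate the resulting read pattern for an unbounded ("forever") run: each DoFn execution queries roughly a two-minute window and the next execution picks up from the claimed timestamp. An illustrative sketch only; in reality the Beam runner drives this loop through checkpointing and residual restrictions:

```java
import com.google.cloud.Timestamp;

// Illustrative only: repeated resumes advance the restriction in ~2-minute
// windows. User code never writes this loop; the runner reschedules the DoFn.
Timestamp start = Timestamp.now();
for (int i = 0; i < 3; i++) {
  Timestamp windowEnd =
      Timestamp.ofTimeSecondsAndNanos(start.getSeconds() + 2 * 60, start.getNanos());
  // run(...) queries [start, windowEnd), claims windowEnd, returns resume().
  start = windowEnd; // the next execution resumes from the claimed timestamp
}
```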