diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java index 3176abd9f247..c5fbd5723d16 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java @@ -19,6 +19,7 @@ import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.MAX_INCLUSIVE_END_AT; +import com.google.api.client.util.Sleeper; import com.google.cloud.Timestamp; import com.google.cloud.spanner.ErrorCode; import com.google.cloud.spanner.SpannerException; @@ -194,6 +195,12 @@ public ProcessContinuation run( new IllegalStateException( "Partition " + token + " not found in metadata table")); + try { + Sleeper.DEFAULT.sleep(120 * 1000L); // Artificial delay to simulate long processing + } catch (InterruptedException e) { + throw new RuntimeException("Sleep interrupted", e); + } + // Interrupter with soft timeout to commit the work if any records have been processed. RestrictionInterrupter interrupter = RestrictionInterrupter.withSoftTimeout(RESTRICTION_TRACKER_TIMEOUT); @@ -343,6 +350,6 @@ private boolean isTimestampOutOfRange(SpannerException e) { // checkpoints every 5s or 5MB output provided and the change stream query has deadline for 1 min. private Timestamp getNextReadChangeStreamEndTimestamp() { final Timestamp current = Timestamp.now(); - return Timestamp.ofTimeSecondsAndNanos(current.getSeconds() + 2 * 60, current.getNanos()); + return Timestamp.ofTimeSecondsAndNanos(current.getSeconds() + 90, current.getNanos()); } }