From 462269db5111b947a2ae38ffce11107661fe2e9f Mon Sep 17 00:00:00 2001 From: Jiang Zhu Date: Fri, 9 Jan 2026 20:42:50 +0000 Subject: [PATCH 1/2] Call setWaitForMinSessionsDuration in creating SpannerOptions. --- .../beam/sdk/io/gcp/spanner/SpannerAccessor.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java index 96ce735cad4a..9acd859705fe 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java @@ -64,6 +64,9 @@ public class SpannerAccessor implements AutoCloseable { /** Instance ID to use when connecting to an experimental host. */ public static final String EXPERIMENTAL_HOST_INSTANCE_ID = "default"; + /** Duration to wait for minimum sessions to be available. */ + private static final Duration WAIT_FOR_MIN_SESSIONS = Duration.ofMinutes(10); + // Only create one SpannerAccessor for each different SpannerConfig. private static final ConcurrentHashMap spannerAccessors = new ConcurrentHashMap<>(); @@ -111,7 +114,12 @@ public static SpannerAccessor getOrCreate(SpannerConfig spannerConfig) { @VisibleForTesting static SpannerOptions buildSpannerOptions(SpannerConfig spannerConfig) { - SpannerOptions.Builder builder = SpannerOptions.newBuilder(); + SpannerOptions.Builder builder = SpannerOptions.newBuilder() + .setSessionPoolOption( + SessionPoolOptions.newBuilder() + .setWaitForMinSessionsDuration(WAIT_FOR_MIN_SESSIONS) + .build()) + .build(); Set retryableCodes = new HashSet<>(); if (spannerConfig.getRetryableCodes() != null) { From 59f9317a30eef3b4186336b47c5abc5eb8dc50f6 Mon Sep 17 00:00:00 2001 From: Jiang Zhu Date: Sat, 17 Jan 2026 00:37:24 +0000 Subject: [PATCH 2/2] test --- .../beam/sdk/io/gcp/spanner/SpannerAccessor.java | 10 +--------- .../changestreams/action/QueryChangeStreamAction.java | 9 ++++++++- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java index 9acd859705fe..96ce735cad4a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java @@ -64,9 +64,6 @@ public class SpannerAccessor implements AutoCloseable { /** Instance ID to use when connecting to an experimental host. */ public static final String EXPERIMENTAL_HOST_INSTANCE_ID = "default"; - /** Duration to wait for minimum sessions to be available. */ - private static final Duration WAIT_FOR_MIN_SESSIONS = Duration.ofMinutes(10); - // Only create one SpannerAccessor for each different SpannerConfig. private static final ConcurrentHashMap spannerAccessors = new ConcurrentHashMap<>(); @@ -114,12 +111,7 @@ public static SpannerAccessor getOrCreate(SpannerConfig spannerConfig) { @VisibleForTesting static SpannerOptions buildSpannerOptions(SpannerConfig spannerConfig) { - SpannerOptions.Builder builder = SpannerOptions.newBuilder() - .setSessionPoolOption( - SessionPoolOptions.newBuilder() - .setWaitForMinSessionsDuration(WAIT_FOR_MIN_SESSIONS) - .build()) - .build(); + SpannerOptions.Builder builder = SpannerOptions.newBuilder(); Set retryableCodes = new HashSet<>(); if (spannerConfig.getRetryableCodes() != null) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java index 3176abd9f247..c5fbd5723d16 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java @@ -19,6 +19,7 @@ import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.MAX_INCLUSIVE_END_AT; +import com.google.api.client.util.Sleeper; import com.google.cloud.Timestamp; import com.google.cloud.spanner.ErrorCode; import com.google.cloud.spanner.SpannerException; @@ -194,6 +195,12 @@ public ProcessContinuation run( new IllegalStateException( "Partition " + token + " not found in metadata table")); + try { + Sleeper.DEFAULT.sleep(120 * 1000L); // Artificial delay to simulate long processing + } catch (InterruptedException e) { + throw new RuntimeException("Sleep interrupted", e); + } + // Interrupter with soft timeout to commit the work if any records have been processed. RestrictionInterrupter interrupter = RestrictionInterrupter.withSoftTimeout(RESTRICTION_TRACKER_TIMEOUT); @@ -343,6 +350,6 @@ private boolean isTimestampOutOfRange(SpannerException e) { // checkpoints every 5s or 5MB output provided and the change stream query has deadline for 1 min. private Timestamp getNextReadChangeStreamEndTimestamp() { final Timestamp current = Timestamp.now(); - return Timestamp.ofTimeSecondsAndNanos(current.getSeconds() + 2 * 60, current.getNanos()); + return Timestamp.ofTimeSecondsAndNanos(current.getSeconds() + 90, current.getNanos()); } }