QueryChangeStreamAction.java
@@ -303,14 +303,24 @@ public ProcessContinuation run(
throw e;
}

LOG.debug("[{}] change stream completed successfully", token);
if (tracker.tryClaim(endTimestamp)) {
LOG.debug(
"[{}] change stream completed successfully up to {}", token, changeStreamQueryEndTimestamp);
if (!tracker.tryClaim(changeStreamQueryEndTimestamp)) {
@tianz101 commented on Jan 16, 2026

Sam, "An unbounded end timestamp is no longer allowed for the change stream query rpc. However if we reach the end timestamp we should be careful to only advance the tracker to this timestamp and not the possibly unbounded end timestamp of the range." Just a caution to confirm with you:

  1. The dataflow pipeline can still be configured with bounded endTs and unbounded endTs
  2. But no matter how the pipeline endTs is configured, here dataflow always break down to use endTs=now()+2m to query spanner change stream.
  3. We should only claim now()+2m if the query is successful.
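
To make point 2 concrete, here is a minimal sketch of the capping I have in mind; the min-style derivation is my assumption and not shown in this hunk, only the names (`endTimestamp`, `changeStreamQueryEndTimestamp`, `getNextReadChangeStreamEndTimestamp`) come from this PR:

```java
// Sketch only; the min-style capping is assumed, the names come from this PR.
private Timestamp computeChangeStreamQueryEndTimestamp(Timestamp endTimestamp) {
  // now() + 2 minutes, per getNextReadChangeStreamEndTimestamp() in this PR.
  final Timestamp nextWindowEnd = getNextReadChangeStreamEndTimestamp();
  // Query up to whichever comes first: the configured range end or the capped
  // window. With an unbounded range (MAX_INCLUSIVE_END_AT) this always picks
  // the capped window, which matches point 2 above.
  return endTimestamp.compareTo(nextWindowEnd) <= 0 ? endTimestamp : nextWindowEnd;
}
```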

return ProcessContinuation.stop();
}

if (changeStreamQueryEndTimestamp.equals(endTimestamp)) {


Should this be >=, or just ==, or does it make no difference?

Contributor

This seems incorrect for the partition-terminate case: changeStreamQueryEndTimestamp will not be equal to endTimestamp for a terminated partition, so the partition will never be marked finished.
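
A small runnable illustration of that failure mode, under the assumption that MAX_INCLUSIVE_END_AT is Timestamp.MAX_VALUE and that the query end is capped at now() + 2 minutes as sketched above; this also answers the earlier >= vs == question:

```java
import com.google.cloud.Timestamp;

public class TerminatedPartitionDemo {
  public static void main(String[] args) {
    // Assumption: an unbounded range end, standing in for MAX_INCLUSIVE_END_AT.
    Timestamp endTimestamp = Timestamp.MAX_VALUE;
    // The capped query end used for each attempt: now() + 2 minutes.
    Timestamp now = Timestamp.now();
    Timestamp changeStreamQueryEndTimestamp =
        Timestamp.ofTimeSecondsAndNanos(now.getSeconds() + 2 * 60, now.getNanos());
    // The finish condition from this PR never fires for an unbounded range,
    // even when the partition itself has already terminated:
    System.out.println(changeStreamQueryEndTimestamp.equals(endTimestamp)); // false
    // Switching to >= would not help, since the capped end never exceeds the
    // range end; the partition is rescheduled forever instead of finished.
    System.out.println(changeStreamQueryEndTimestamp.compareTo(endTimestamp) >= 0); // false
  }
}
```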

LOG.debug("[{}] Finishing partition", token);
// TODO: This should be performed after the commit succeeds. Since bundle finalizers are not
// guaranteed to be called, this needs to be performed in a subsequent fused stage.
partitionMetadataDao.updateToFinished(token);
metrics.decActivePartitionReadCounter();
LOG.info("[{}] After attempting to finish the partition", token);
return ProcessContinuation.stop();
}
return ProcessContinuation.stop();

LOG.info("[{}] Rescheduling partition where query completed due to not being finished", token);
return ProcessContinuation.resume();
}

private BundleFinalizer.Callback updateWatermarkCallback(
@@ -339,8 +349,8 @@ private boolean isTimestampOutOfRange(SpannerException e) {
}

// Return (now + 2 mins) as the end timestamp for reading change streams. This is only used if
// users want to run the connector forever. This approach works because Google Dataflow
// checkpoints every 5s or 5MB output provided and the change stream query has deadline for 1 min.
// users want to run the connector forever. If the end timestamp is reached, we will resume
// processing from that timestamp on a subsequent DoFn execution.
private Timestamp getNextReadChangeStreamEndTimestamp() {
final Timestamp current = Timestamp.now();
return Timestamp.ofTimeSecondsAndNanos(current.getSeconds() + 2 * 60, current.getNanos());
QueryChangeStreamActionTest.java
@@ -17,8 +17,10 @@
*/
package org.apache.beam.sdk.io.gcp.spanner.changestreams.action;

import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.MAX_INCLUSIVE_END_AT;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State.SCHEDULED;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
@@ -55,6 +57,7 @@
import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;

public class QueryChangeStreamActionTest {
private static final String PARTITION_TOKEN = "partitionToken";
@@ -613,6 +616,53 @@ public void testQueryChangeStreamWithStreamFinished() {
assertEquals(ProcessContinuation.stop(), result);
verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP);
verify(partitionMetadataDao).updateToFinished(PARTITION_TOKEN);
verify(metrics).decActivePartitionReadCounter();

verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any());
verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any());
verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any(), any());
verify(partitionStartRecordAction, never()).run(any(), any(), any(), any(), any());
verify(partitionEndRecordAction, never()).run(any(), any(), any(), any(), any());
verify(partitionEventRecordAction, never()).run(any(), any(), any(), any(), any());
}

@Test
public void testQueryChangeStreamFinishedWithResume() {
partition =
PartitionMetadata.newBuilder()
.setPartitionToken(PARTITION_TOKEN)
.setParentTokens(Sets.newHashSet("parentToken"))
.setStartTimestamp(PARTITION_START_TIMESTAMP)
.setEndTimestamp(MAX_INCLUSIVE_END_AT)
.setHeartbeatMillis(PARTITION_HEARTBEAT_MILLIS)
.setState(SCHEDULED)
.setWatermark(WATERMARK_TIMESTAMP)
.setScheduledAt(Timestamp.now())
.build();
when(partitionMetadataMapper.from(any())).thenReturn(partition);

final ChangeStreamResultSet changeStreamResultSet = mock(ChangeStreamResultSet.class);
final ArgumentCaptor<Timestamp> timestampCaptor = ArgumentCaptor.forClass(Timestamp.class);
when(changeStreamDao.changeStreamQuery(
eq(PARTITION_TOKEN),
eq(PARTITION_START_TIMESTAMP),
timestampCaptor.capture(),
eq(PARTITION_HEARTBEAT_MILLIS)))
.thenReturn(changeStreamResultSet);
when(changeStreamResultSet.next()).thenReturn(false);
when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true);

final ProcessContinuation result =
action.run(
partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);
assertEquals(ProcessContinuation.resume(), result);
assertNotEquals(MAX_INCLUSIVE_END_AT, timestampCaptor.getValue());

verify(restrictionTracker).tryClaim(timestampCaptor.getValue());
verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP);
verify(partitionMetadataDao, never()).updateToFinished(PARTITION_TOKEN);
verify(metrics, never()).decActivePartitionReadCounter();

verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any());
verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any());