From 5881074dd2312135e4529406f69871c01ba260c6 Mon Sep 17 00:00:00 2001 From: Yunfeng Zhou Date: Tue, 9 Jun 2026 16:17:37 +0800 Subject: [PATCH 1/3] [flink] Support stream read Chain Table --- docs/docs/primary-key-table/chain-table.md | 47 +- .../paimon/table/ChainGroupReadTable.java | 7 +- .../table/ChainTableFileStoreTable.java | 133 ++ .../paimon/table/ChainTableStreamScan.java | 391 +++++ .../paimon/table/FileStoreTableFactory.java | 2 +- .../source/ContinuousFileSplitEnumerator.java | 28 + .../paimon/flink/FlinkChainTableITCase.java | 1319 +++++++++++++++++ 7 files changed, 1923 insertions(+), 4 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/table/ChainTableFileStoreTable.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/table/ChainTableStreamScan.java diff --git a/docs/docs/primary-key-table/chain-table.md b/docs/docs/primary-key-table/chain-table.md index a861fb12e6bd..01a6ae1648bf 100644 --- a/docs/docs/primary-key-table/chain-table.md +++ b/docs/docs/primary-key-table/chain-table.md @@ -129,7 +129,6 @@ ALTER TABLE `default`.`t$branch_delta` SET ( Notice that: - Chain table is only supported for primary key table, which means you should define `bucket` and `bucket-key` for the table. - Chain table should ensure that the schema of each branch is consistent. -- Both Spark and Flink batch read/write are supported. Flink streaming read/write is not supported. - Chain compact is not supported for now, and it will be supported later. - Deletion vector is not supported for chain table. @@ -191,6 +190,52 @@ you will get the following result: +---+----+-----+ ``` +## Streaming Read + +Chain tables support Flink streaming read. A streaming read job operates in two phases: + +1. **Full load phase**: Produces a full result by reading the latest snapshot partition (per group) + and delta partitions that come after it. For each partition group, only the most recent snapshot + partition is included — older snapshot partitions are considered outdated and excluded. +2. **Incremental phase**: Continuously reads new commits from the delta branch as they arrive. + +### Write-Side Requirements + +Streaming read assumes the chain table follows the standard write pattern described at the top of +this page: + +- **Snapshot branch** receives periodic full data (e.g., a daily ODS binlog dump job writes via + `INSERT OVERWRITE t$branch_snapshot`). Each snapshot partition represents a complete view of the + data at that point in time. +- **Delta branch** receives incremental changes between snapshots (e.g., a batch job writes the + current day's new/updated records via `INSERT INTO t$branch_delta`). Each delta partition + contains only the changes for that period. The delta branch must have a + [changelog producer](./changelog-producer) configured (e.g., `'changelog-producer' = 'input'`) + for streaming read to work. + +The streaming read relies on this pattern to produce correct results. After the full load phase, +only new delta branch commits are picked up — writes to the snapshot branch do not trigger +streaming output. To incorporate a new snapshot, restart the streaming job. + +### Usage + +```sql +SET 'execution.runtime-mode' = 'streaming'; + +INSERT INTO downstream_sink SELECT * FROM default.t; +``` + +### Limitations + +- The incremental phase only monitors the **delta branch**. Writes to the snapshot branch are + not detected until the streaming job is restarted. +- The chain-table-aware streaming scan only supports the default startup mode (`latest-full`). + When the user specifies an explicit starting position — such as `scan.snapshot-id`, + `scan.timestamp-millis`, `scan.mode = 'latest'`, or `consumer-id` with existing progress — + an `UnsupportedOperationException` is thrown. To use standard streaming read without chain + table logic, read from a specific branch table (e.g., `t$branch_delta`) instead of the main + table. + ## Group Partition In real-world scenarios, a table often has multiple partition dimensions. For example, data may be diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java index 75ba5f4097f3..0237828e6a58 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java @@ -520,8 +520,11 @@ public TableRead withIOManager(IOManager ioManager) { @Override public RecordReader createReader(Split split) throws IOException { - checkArgument(split instanceof ChainSplit); - return fallbackRead.createReader(split); + if (split instanceof ChainSplit || split instanceof DataSplit) { + return fallbackRead.createReader(split); + } + throw new IllegalArgumentException( + "Unsupported split type for chain table read: " + split.getClass().getName()); } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ChainTableFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ChainTableFileStoreTable.java new file mode 100644 index 000000000000..350247949faf --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/ChainTableFileStoreTable.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.CoreOptions.StartupMode; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.source.StreamDataTableScan; + +import java.util.Map; + +/** + * Chain-table-aware extension of {@link FallbackReadFileStoreTable}. Inherits the batch read + * behavior (partition-level fallback between the current branch and {@link ChainGroupReadTable}), + * and additionally overrides {@link #newStreamScan()} to return a chain-aware {@link + * ChainTableStreamScan} that performs a partition-level full load followed by incremental + * delta-only streaming. + */ +public class ChainTableFileStoreTable extends FallbackReadFileStoreTable { + + public ChainTableFileStoreTable(FileStoreTable wrapped, FileStoreTable other) { + super(wrapped, other, true); + } + + @Override + public StreamDataTableScan newStreamScan() { + CoreOptions coreOptions = wrapped.coreOptions(); + + StartupMode effectiveMode = coreOptions.startupMode(); + boolean hasConsumerProgress = + coreOptions.consumerId() != null && !coreOptions.consumerIgnoreProgress(); + if (effectiveMode != StartupMode.LATEST_FULL || hasConsumerProgress) { + String reason = + describeUnsupportedMode(coreOptions, effectiveMode, hasConsumerProgress); + throw new UnsupportedOperationException( + "Chain table streaming read does not support startup mode '" + + reason + + "'. " + + "Chain table streaming only supports the default 'latest-full' mode, which first " + + "produces a partition-level full result and then continuously reads incremental " + + "data from the delta branch.\n" + + "Suggestions:\n" + + " - To use chain table streaming: remove the explicit scan mode/position settings " + + "so that the default 'latest-full' mode is used.\n" + + " - To use standard streaming read without chain table logic: read from a " + + "specific branch table (e.g., 't$branch_delta') instead of the main table."); + } + + // Inherited other() returns the ChainGroupReadTable directly. + ChainGroupReadTable chainGroupReadTable = (ChainGroupReadTable) other(); + + return new ChainTableStreamScan(chainGroupReadTable); + } + + private static String describeUnsupportedMode( + CoreOptions coreOptions, StartupMode effectiveMode, boolean hasConsumerProgress) { + if (hasConsumerProgress) { + return "consumer-id with existing progress"; + } + switch (effectiveMode) { + case LATEST: + return "scan.mode=latest"; + case FROM_SNAPSHOT: + if (coreOptions.scanSnapshotId() != null) { + return "scan.snapshot-id=" + coreOptions.scanSnapshotId(); + } + if (coreOptions.scanTagName() != null) { + return "scan.tag-name=" + coreOptions.scanTagName(); + } + if (coreOptions.scanWatermark() != null) { + return "scan.watermark=" + coreOptions.scanWatermark(); + } + return "from-snapshot"; + case FROM_TIMESTAMP: + if (coreOptions.scanTimestampMills() != null) { + return "scan.timestamp-millis=" + coreOptions.scanTimestampMills(); + } + if (coreOptions.scanTimestamp() != null) { + return "scan.timestamp=" + coreOptions.scanTimestamp(); + } + return "from-timestamp"; + default: + return effectiveMode.name().toLowerCase().replace('_', '-'); + } + } + + @Override + public FileStoreTable copy(Map dynamicOptions) { + return new ChainTableFileStoreTable( + wrapped.copy(dynamicOptions), other().copy(rewriteOtherOptions(dynamicOptions))); + } + + @Override + public FileStoreTable copy(TableSchema newTableSchema) { + return new ChainTableFileStoreTable( + wrapped.copy(newTableSchema), + other().copy(newTableSchema.copy(rewriteOtherOptions(newTableSchema.options())))); + } + + @Override + public FileStoreTable copyWithoutTimeTravel(Map dynamicOptions) { + return new ChainTableFileStoreTable( + wrapped.copyWithoutTimeTravel(dynamicOptions), + other().copyWithoutTimeTravel(rewriteOtherOptions(dynamicOptions))); + } + + @Override + public FileStoreTable copyWithLatestSchema() { + return new ChainTableFileStoreTable( + wrapped.copyWithLatestSchema(), other().copyWithLatestSchema()); + } + + @Override + public FileStoreTable switchToBranch(String branchName) { + return new ChainTableFileStoreTable(switchWrappedToBranch(branchName), other()); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ChainTableStreamScan.java b/paimon-core/src/main/java/org/apache/paimon/table/ChainTableStreamScan.java new file mode 100644 index 000000000000..5f4ae499a55a --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/ChainTableStreamScan.java @@ -0,0 +1,391 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.codegen.CodeGenUtils; +import org.apache.paimon.codegen.RecordComparator; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.manifest.PartitionEntry; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.table.source.ChainSplit; +import org.apache.paimon.table.source.DataFilePlan; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.DataTableScan; +import org.apache.paimon.table.source.DataTableStreamScan; +import org.apache.paimon.table.source.InnerTableScan; +import org.apache.paimon.table.source.SnapshotNotExistPlan; +import org.apache.paimon.table.source.Split; +import org.apache.paimon.table.source.StreamDataTableScan; +import org.apache.paimon.table.source.TableScan; +import org.apache.paimon.table.source.snapshot.StartingContext; +import org.apache.paimon.utils.ChainPartitionProjector; +import org.apache.paimon.utils.ChainTableUtils; +import org.apache.paimon.utils.SnapshotManager; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * Streaming scan for chain tables with a two-phase design: + * + *
    + *
  • Phase 1 (Starting): Outputs the latest snapshot partition (per group) and delta + * partitions that come after it. Older snapshot partitions are excluded as they are + * considered outdated. Each primary key appears exactly once under its natural partition. + * Unlike batch full scan, anchor-based chain merging is intentionally skipped to keep Phase 1 + * lightweight — this avoids split explosion in long-running jobs with many partitions. + *
  • Phase 2 (Incremental): Stream new snapshots from the delta branch only, picking up + * from where Phase 1 left off. + *
+ * + *

Checkpoint state is a single {@code Long} — the delta branch's next snapshot id. On stateful + * restart, Phase 1 is skipped and incremental streaming resumes from the checkpointed position. On + * stateless restart (null state), a fresh starting scan is performed. + */ +public class ChainTableStreamScan implements StreamDataTableScan { + + private static final Logger LOG = LoggerFactory.getLogger(ChainTableStreamScan.class); + + private final ChainGroupReadTable chainGroupReadTable; + + /** Phase 1: batch scan used to access snapshot branch data via {@code mainScan}. */ + private final ChainGroupReadTable.ChainTableBatchScan batchScan; + + /** Phase 2: delta-only stream scan. */ + private final DataTableStreamScan deltaStreamScan; + + /** Projector for splitting full partition into group and chain parts. */ + private final ChainPartitionProjector partitionProjector; + + /** Comparator for chain partition keys only. */ + private final RecordComparator chainPartitionComparator; + + /** + * Checkpoint state: the next delta snapshot id to read. Null before Phase 1 completes; non-null + * once Phase 1 is done or after a stateful restore. + */ + @Nullable private Long nextDeltaSnapshotId; + + /** Whether the starting plan (Phase 1) has been completed. */ + private boolean startingDone = false; + + /** Predicates and shard for applying to local scans created in {@link #planStarting()}. */ + private final List predicates = new ArrayList<>(); + + private int shardIndex = -1; + + private int shardCount = -1; + + public ChainTableStreamScan(ChainGroupReadTable chainGroupReadTable) { + this.chainGroupReadTable = chainGroupReadTable; + this.batchScan = + new ChainGroupReadTable.ChainTableBatchScan( + chainGroupReadTable.schema(), chainGroupReadTable); + this.deltaStreamScan = (DataTableStreamScan) chainGroupReadTable.other().newStreamScan(); + + // Initialize partition projector and chain comparator using the established pattern + // from ChainTableBatchScan. + List chainKeys = + ChainTableUtils.chainPartitionKeys( + chainGroupReadTable.coreOptions(), + chainGroupReadTable.schema().partitionKeys()); + this.partitionProjector = + new ChainPartitionProjector( + chainGroupReadTable.schema().logicalPartitionType(), chainKeys.size()); + this.chainPartitionComparator = + CodeGenUtils.newRecordComparator( + partitionProjector.chainPartitionType().getFieldTypes()); + } + + @Override + public StartingContext startingContext() { + if (!startingDone) { + return StartingContext.EMPTY; + } + return deltaStreamScan.startingContext(); + } + + @Override + public TableScan.Plan plan() { + if (!startingDone) { + return planStarting(); + } + TableScan.Plan plan = deltaStreamScan.plan(); + // Never return SnapshotNotExistPlan — it would cause the Flink enumerator to + // set stopTriggerScan=true and permanently stop polling for new data. + if (plan instanceof SnapshotNotExistPlan) { + return new DataFilePlan<>(Collections.emptyList()); + } + return plan; + } + + /** + * Starting plan: outputs the latest snapshot partition (per group) and delta partitions that + * come after it. Older snapshot partitions are excluded. Each primary key appears exactly once + * under its natural partition. + * + *

Unlike batch full scan, anchor-based chain merging is not performed. This keeps Phase 1 + * lightweight for long-running jobs. + */ + private TableScan.Plan planStarting() { + FileStoreTable deltaTable = chainGroupReadTable.other(); + String deltaBranch = deltaTable.coreOptions().branch(); + String snapshotBranch = chainGroupReadTable.wrapped.coreOptions().branch(); + + Long latestId = captureDeltaPosition(deltaTable); + + // 1. Read delta branch data at the pinned snapshot, grouped by partition. + Map> deltaSplitsByPartition; + if (latestId != null) { + FileStoreTable pinnedDelta = + deltaTable.copy( + Collections.singletonMap("scan.snapshot-id", String.valueOf(latestId))); + DataTableScan pinnedDeltaScan = pinnedDelta.newScan(); + applyPredicatesAndShard(pinnedDeltaScan); + deltaSplitsByPartition = groupByPartition(pinnedDeltaScan); + } else { + deltaSplitsByPartition = Collections.emptyMap(); + } + + // 2. Read all snapshot branch data, grouped by partition. + // Reuse batchScan.mainScan which has predicates/shard already applied. + Map> snapshotSplitsByPartition = + chainGroupReadTable.wrapped.snapshotManager().latestSnapshotId() != null + ? groupByPartition(batchScan.mainScan) + : Collections.emptyMap(); + + // 3. Find the latest snapshot partition per group (based on chain partition keys). + // Only output the latest snapshot partition and delta partitions after it. + Map latestChainPartitionPerGroup = new HashMap<>(); + for (BinaryRow partition : snapshotSplitsByPartition.keySet()) { + Object groupKey = toGroupKey(partition); + BinaryRow existingLatest = latestChainPartitionPerGroup.get(groupKey); + if (existingLatest == null + || chainPartitionComparator.compare( + partitionProjector.extractChainPartition(partition), + partitionProjector.extractChainPartition(existingLatest)) + > 0) { + latestChainPartitionPerGroup.put(groupKey, partition); + } + } + + // 4. Build ChainSplits: + // - For snapshot partitions: only include if chain key == latest for that group. + // - For delta partitions: include if (a) chain key > latest for that group, or + // (b) no snapshot exists for that group. + List allSplits = new ArrayList<>(); + + for (Map.Entry> entry : snapshotSplitsByPartition.entrySet()) { + BinaryRow partition = entry.getKey(); + Object groupKey = toGroupKey(partition); + BinaryRow latestPartition = latestChainPartitionPerGroup.get(groupKey); + if (chainPartitionComparator.compare( + partitionProjector.extractChainPartition(partition), + partitionProjector.extractChainPartition(latestPartition)) + == 0) { + for (DataSplit ds : entry.getValue()) { + allSplits.add(dataSplitToChainSplit(ds, snapshotBranch)); + } + } + } + + for (Map.Entry> entry : deltaSplitsByPartition.entrySet()) { + BinaryRow partition = entry.getKey(); + Object groupKey = toGroupKey(partition); + BinaryRow latestPartition = latestChainPartitionPerGroup.get(groupKey); + // Include delta partition if: + // - No snapshot exists for this group, OR + // - Chain key > latest snapshot chain key + if (latestPartition == null + || chainPartitionComparator.compare( + partitionProjector.extractChainPartition(partition), + partitionProjector.extractChainPartition(latestPartition)) + > 0) { + for (DataSplit ds : entry.getValue()) { + allSplits.add(dataSplitToChainSplit(ds, deltaBranch)); + } + } + } + + LOG.info( + "ChainTableStreamScan.planStarting [snapshot={}, delta={}]: " + + "{} delta partitions, {} snapshot partitions, " + + "{} latest snapshot groups, {} total splits", + snapshotBranch, + deltaBranch, + deltaSplitsByPartition.size(), + snapshotSplitsByPartition.size(), + latestChainPartitionPerGroup.size(), + allSplits.size()); + + startingDone = true; + return new DataFilePlan<>(allSplits); + } + + /** + * Captures the delta branch's latest snapshot id and positions the Phase 2 stream scan to start + * from the next snapshot. This makes the Phase 1 / Phase 2 boundary deterministic: Phase 1 + * reads delta data pinned at the returned snapshot id, Phase 2 starts from the snapshot after. + * + * @return the latest delta snapshot id, or {@code null} if the delta branch has no snapshots + */ + @Nullable + private Long captureDeltaPosition(FileStoreTable deltaTable) { + SnapshotManager deltaSnapshotManager = deltaTable.snapshotManager(); + Long latestId = deltaSnapshotManager.latestSnapshotId(); + nextDeltaSnapshotId = latestId != null ? latestId + 1 : Snapshot.FIRST_SNAPSHOT_ID; + LOG.info( + "ChainTableStreamScan: pinned delta branch '{}' at snapshot {}, " + + "nextDeltaSnapshotId={}", + deltaTable.coreOptions().branch(), + latestId, + nextDeltaSnapshotId); + deltaStreamScan.restore(nextDeltaSnapshotId); + return latestId; + } + + /** Plans a scan and groups the resulting splits by partition. */ + private static Map> groupByPartition(DataTableScan scan) { + Map> grouped = new LinkedHashMap<>(); + for (Split s : scan.plan().splits()) { + DataSplit ds = (DataSplit) s; + grouped.computeIfAbsent(ds.partition(), k -> new ArrayList<>()).add(ds); + } + return grouped; + } + + /** + * Extracts a stable group key from a full partition row. When there is no group partition (all + * fields are chain keys), returns a shared singleton to avoid zero-field {@link BinaryRow} + * instances that may have inconsistent {@code hashCode}/{@code equals} across different + * partitions. + */ + private Object toGroupKey(BinaryRow fullPartition) { + if (!partitionProjector.hasGroupPartition()) { + return Collections.emptyList(); + } + return partitionProjector.extractGroupPartition(fullPartition); + } + + /** + * Converts a {@link DataSplit} to a {@link ChainSplit} where all files belong to the given + * branch. The partition value is preserved as-is (no rewriting). + */ + private static ChainSplit dataSplitToChainSplit(DataSplit dataSplit, String branch) { + HashMap fileBranchMapping = new HashMap<>(); + HashMap fileBucketPathMapping = new HashMap<>(); + for (DataFileMeta file : dataSplit.dataFiles()) { + fileBranchMapping.put(file.fileName(), branch); + fileBucketPathMapping.put(file.fileName(), dataSplit.bucketPath()); + } + return new ChainSplit( + dataSplit.partition(), + dataSplit.dataFiles(), + fileBranchMapping, + fileBucketPathMapping); + } + + @Override + public InnerTableScan withFilter(Predicate predicate) { + predicates.add(predicate); + batchScan.withFilter(predicate); + deltaStreamScan.withFilter(predicate); + return this; + } + + @Override + public DataTableScan withShard(int indexOfThisSubtask, int numberOfParallelSubtasks) { + shardIndex = indexOfThisSubtask; + shardCount = numberOfParallelSubtasks; + batchScan.withShard(indexOfThisSubtask, numberOfParallelSubtasks); + deltaStreamScan.withShard(indexOfThisSubtask, numberOfParallelSubtasks); + return this; + } + + /** + * Applies all previously set predicates and shard to a newly created scan. Used for the pinned + * delta scan in {@link #planStarting()}. + */ + private void applyPredicatesAndShard(DataTableScan scan) { + for (Predicate p : predicates) { + scan.withFilter(p); + } + if (shardIndex >= 0) { + scan.withShard(shardIndex, shardCount); + } + } + + @Nullable + @Override + public Long checkpoint() { + return nextDeltaSnapshotId; + } + + @Nullable + @Override + public Long watermark() { + if (!startingDone) { + return null; + } + return deltaStreamScan.watermark(); + } + + @Override + public void restore(@Nullable Long nextSnapshotId) { + this.nextDeltaSnapshotId = nextSnapshotId; + if (nextSnapshotId != null) { + startingDone = true; + deltaStreamScan.restore(nextSnapshotId); + } + } + + @Override + public void restore(@Nullable Long nextSnapshotId, boolean scanAllSnapshot) { + if (scanAllSnapshot) { + startingDone = false; + this.nextDeltaSnapshotId = nextSnapshotId; + // No need to call deltaStreamScan.restore() here — Phase 1 will re-run and + // captureDeltaPosition() will re-position the delta stream scan. + } else { + restore(nextSnapshotId); + } + } + + @Override + public void notifyCheckpointComplete(@Nullable Long nextSnapshot) { + deltaStreamScan.notifyCheckpointComplete(nextSnapshot); + } + + @Override + public List listPartitionEntries() { + throw new UnsupportedOperationException( + "List Partition Entries is not supported in Chain Table Stream Scan."); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java index e4a865679910..53be2803ce70 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java @@ -181,7 +181,7 @@ public static FileStoreTable createChainTable( catalogEnvironment); FileStoreTable chainGroupFileStoreTable = new ChainGroupReadTable(snapshotTable, deltaTable); - return new FallbackReadFileStoreTable(table, chainGroupFileStoreTable, true); + return new ChainTableFileStoreTable(table, chainGroupFileStoreTable); } private static FileStoreTable createOtherBranchTable( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java index 77aeeb85081d..6212038072ca 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java @@ -25,6 +25,7 @@ import org.apache.paimon.postpone.PostponeBucketFileStoreWrite; import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.sink.ChannelComputer; +import org.apache.paimon.table.source.ChainSplit; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.EndOfScanException; import org.apache.paimon.table.source.IncrementalSplit; @@ -328,6 +329,8 @@ protected synchronized void assignSplits() { protected int assignSuggestedTask(FileStoreSourceSplit split) { if (split.split() instanceof DataSplit) { return assignSuggestedTask((DataSplit) split.split()); + } else if (split.split() instanceof ChainSplit) { + return assignSuggestedTask((ChainSplit) split.split()); } else { return assignSuggestedTask((IncrementalSplit) split.split()); } @@ -364,6 +367,31 @@ protected int assignSuggestedTask(IncrementalSplit split) { } } + protected int assignSuggestedTask(ChainSplit split) { + int parallelism = context.currentParallelism(); + // Extract bucket id from the bucket path stored in fileBucketPathMapping. + // The bucket path ends with "bucket-{id}". + int bucketId = 0; + if (!split.fileBucketPathMapping().isEmpty()) { + String bucketPath = split.fileBucketPathMapping().values().iterator().next(); + int lastSlash = bucketPath.lastIndexOf('/'); + if (lastSlash >= 0) { + String bucketDir = bucketPath.substring(lastSlash + 1); + if (bucketDir.startsWith("bucket-")) { + try { + bucketId = Integer.parseInt(bucketDir.substring("bucket-".length())); + } catch (NumberFormatException ignored) { + } + } + } + } + if (shuffleBucketWithPartition) { + return ChannelComputer.select(split.logicalPartition(), bucketId, parallelism); + } else { + return ChannelComputer.select(bucketId, parallelism); + } + } + protected SplitAssigner createSplitAssigner(boolean unordered) { return unordered ? new FIFOSplitAssigner(Collections.emptyList()) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkChainTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkChainTableITCase.java index 8ac39d14308a..734e2f5d3d86 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkChainTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkChainTableITCase.java @@ -18,14 +18,36 @@ package org.apache.paimon.flink; +import org.apache.paimon.flink.sink.FlinkSinkBuilder; +import org.apache.paimon.table.ChainTableStreamScan; +import org.apache.paimon.table.FileStoreTable; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.ExternalizedCheckpointRetention; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableResult; import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; import org.apache.flink.util.CloseableIterator; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import java.lang.reflect.Field; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** IT cases for chain table using Flink SQL. */ public class FlinkChainTableITCase extends CatalogITCaseBase { @@ -688,4 +710,1301 @@ public void testChainTableWithGroupPartition() throws Exception { .containsExactlyInAnyOrder( "+I[2, 2, 1-1, CN, 20250811]", "+I[4, 1, 1, CN, 20250811]"); } + + /** Write Row data (with RowKind) to a specific branch using DataStream API. */ + private void writeChangelogToBranch(String db, String tableName, String branch, Row... rows) + throws Exception { + FileStoreTable table = paimonTable(tableName + "$branch_" + branch); + + StreamExecutionEnvironment env = + streamExecutionEnvironmentBuilder() + .streamingMode() + .checkpointIntervalMs(100) + .parallelism(1) + .build(); + + DataStream stream = env.fromCollection(Arrays.asList(rows)); + + new FlinkSinkBuilder(table) + .forRow( + stream, + DataTypes.ROW( + DataTypes.FIELD("k", DataTypes.BIGINT()), + DataTypes.FIELD("seq", DataTypes.BIGINT()), + DataTypes.FIELD("v", DataTypes.STRING()), + DataTypes.FIELD("dt", DataTypes.STRING()))) + .build(); + env.execute(); + } + + /** + * Collect n rows from a streaming iterator with a timeout. If no data arrives within + * timeoutSeconds, the iterator is closed and an AssertionError is thrown. This is necessary + * because it.next() blocks indefinitely when no data is available, and JUnit @Timeout cannot + * interrupt it. + */ + private List collectRows(CloseableIterator it, int n, int timeoutSeconds) + throws Exception { + List result = new ArrayList<>(); + for (int i = 0; i < n; i++) { + CompletableFuture future = + CompletableFuture.supplyAsync(() -> it.next().toString()); + try { + result.add(future.get(timeoutSeconds, TimeUnit.SECONDS)); + } catch (java.util.concurrent.TimeoutException e) { + future.cancel(true); + it.close(); + throw new AssertionError( + "Streaming read blocked for " + + timeoutSeconds + + "s after collecting " + + result.size() + + "/" + + n + + " rows. Collected so far: " + + result); + } + } + return result; + } + + /** Default collectRows with 30s timeout. */ + private List collectRows(CloseableIterator it, int n) throws Exception { + return collectRows(it, n, 30); + } + + /** + * Tests the streaming read lifecycle for a chain table with changelog-producer=input. + * + *

Verifies: initial full read from delta-only → delta incremental visible with changelog + * records (-U/+U) → snapshot OVERWRITE has no effect → more delta visible → stateless restart + * reads chain-merged state. + */ + @Test + @Timeout(120) + public void testStreamingReadChainTableLifecycleWithInputChangelog() throws Exception { + // Create chain table with changelog-producer=input + sql( + "CREATE TABLE chain_life_cl (" + + " k BIGINT, seq BIGINT, v STRING, dt STRING" + + ") PARTITIONED BY (dt) WITH (" + + " 'primary-key' = 'dt,k'," + + " 'bucket-key' = 'k'," + + " 'bucket' = '2'," + + " 'sequence.field' = 'seq'," + + " 'merge-engine' = 'deduplicate'," + + " 'changelog-producer' = 'input'," + + " 'chain-table.enabled' = 'true'," + + " 'partition.timestamp-pattern' = '$dt'," + + " 'partition.timestamp-formatter' = 'yyyyMMdd'," + + " 'continuous.discovery-interval' = '1ms'" + + ")"); + + String db = tEnv.getCurrentDatabase(); + sql("CALL sys.create_branch('%s.chain_life_cl', 'snapshot')", db); + sql("CALL sys.create_branch('%s.chain_life_cl', 'delta')", db); + for (String tbl : + new String[] { + "chain_life_cl", "chain_life_cl$branch_snapshot", "chain_life_cl$branch_delta" + }) { + sql( + "ALTER TABLE `%s` SET (" + + " 'scan.fallback-snapshot-branch' = 'snapshot'," + + " 'scan.fallback-delta-branch' = 'delta')", + tbl); + } + + // === Phase 1: Delta-only initial data (all inserts) === + sql( + "INSERT INTO `chain_life_cl$branch_delta` PARTITION (dt = '20250808')" + + " VALUES (1, 1, 'base_1'), (2, 1, 'base_2'), (3, 1, 'base_3')," + + " (4, 1, 'base_4'), (5, 1, 'base_5')"); + + CloseableIterator it = sEnv.executeSql("SELECT * FROM chain_life_cl").collect(); + + List phase1 = collectRows(it, 5); + assertThat(phase1) + .containsExactlyInAnyOrder( + "+I[1, 1, base_1, 20250808]", + "+I[2, 1, base_2, 20250808]", + "+I[3, 1, base_3, 20250808]", + "+I[4, 1, base_4, 20250808]", + "+I[5, 1, base_5, 20250808]"); + + // === Phase 2: Write changelog data (with -U/+U for update) via DataStream API === + writeChangelogToBranch( + db, + "chain_life_cl", + "delta", + Row.ofKind(RowKind.UPDATE_BEFORE, 3L, 1L, "base_3", "20250809"), + Row.ofKind(RowKind.UPDATE_AFTER, 3L, 2L, "upd_3", "20250809"), + Row.ofKind(RowKind.INSERT, 6L, 1L, "new_6", "20250809"), + Row.ofKind(RowKind.INSERT, 7L, 1L, "new_7", "20250809")); + + Thread.sleep(2000); + List phase2 = collectRows(it, 4); + // changelog-producer=input: explicit -U/+U for updates + assertThat(phase2) + .containsExactlyInAnyOrder( + "-U[3, 1, base_3, 20250809]", + "+U[3, 2, upd_3, 20250809]", + "+I[6, 1, new_6, 20250809]", + "+I[7, 1, new_7, 20250809]"); + + // === Phase 3: Snapshot OVERWRITE should have NO effect === + sql( + "INSERT OVERWRITE `chain_life_cl$branch_snapshot` PARTITION (dt = '20250808')" + + " VALUES (1, 1, 'base_1'), (2, 1, 'base_2'), (3, 1, 'base_3')," + + " (4, 1, 'base_4'), (5, 1, 'base_5')"); + + Thread.sleep(2000); + + // Write delta AFTER snapshot — this proves snapshot writes don't trigger output. + // If snapshot writes were detected, we'd see duplicate or unexpected rows. + writeChangelogToBranch( + db, + "chain_life_cl", + "delta", + Row.ofKind(RowKind.INSERT, 100L, 1L, "phase3_probe", "20250810")); + + Thread.sleep(2000); + List phase3 = collectRows(it, 1); + assertThat(phase3) + .as("Only delta write should produce output, snapshot OVERWRITE should be ignored") + .containsExactlyInAnyOrder("+I[100, 1, phase3_probe, 20250810]"); + + // === Phase 4: Write more delta via DataStream API === + writeChangelogToBranch( + db, + "chain_life_cl", + "delta", + Row.ofKind(RowKind.INSERT, 8L, 1L, "new_8", "20250810"), + Row.ofKind(RowKind.INSERT, 9L, 1L, "new_9", "20250810")); + + Thread.sleep(2000); + List phase4 = collectRows(it, 2); + assertThat(phase4) + .containsExactlyInAnyOrder( + "+I[8, 1, new_8, 20250810]", "+I[9, 1, new_9, 20250810]"); + + // Terminate first streaming job + it.close(); + + // === Phase 5: Stateless restart === + CloseableIterator it2 = sEnv.executeSql("SELECT * FROM chain_life_cl").collect(); + + // Phase 5 starting (matching batch semantics): + // - snapshot@20250808: k=1-5 (snapshot wins, delta@20250808 skipped since same partition + // exists in snapshot; same values here since OVERWRITE wrote identical base data) + // - delta@20250809: changelog records (+U for update, +I for inserts) + // - delta@20250810: k=8,9,100 (delta-only, no snapshot for this partition) + // Total: 11 unique rows (PK=(dt,k) makes each (dt,k) pair distinct). + List restart = collectRows(it2, 11); + assertThat(restart) + .containsExactlyInAnyOrder( + "+I[1, 1, base_1, 20250808]", + "+I[2, 1, base_2, 20250808]", + "+I[3, 1, base_3, 20250808]", + "+U[3, 2, upd_3, 20250809]", + "+I[4, 1, base_4, 20250808]", + "+I[5, 1, base_5, 20250808]", + "+I[6, 1, new_6, 20250809]", + "+I[7, 1, new_7, 20250809]", + "+I[8, 1, new_8, 20250810]", + "+I[9, 1, new_9, 20250810]", + "+I[100, 1, phase3_probe, 20250810]"); + + // Continue writing delta + writeChangelogToBranch( + db, + "chain_life_cl", + "delta", + Row.ofKind(RowKind.INSERT, 10L, 1L, "new_10", "20250811"), + Row.ofKind(RowKind.INSERT, 11L, 1L, "new_11", "20250811")); + + Thread.sleep(2000); + List phase5b = collectRows(it2, 2); + assertThat(phase5b) + .containsExactlyInAnyOrder( + "+I[10, 1, new_10, 20250811]", "+I[11, 1, new_11, 20250811]"); + + it2.close(); + } + + /** + * Tests stateful restart of a chain table streaming read job using Flink checkpoint/restore. + * + *

Phase 1: Write initial delta data, start streaming job, verify read. Phase 2: Trigger a + * checkpoint (saves enumerator state including nextDeltaSnapshotId), then cancel the job. Phase + * 3: Write new delta data while the job is down. Phase 4: Restart from checkpoint — the + * restored scan should skip doFullLoad() and only read Phase 3's new data. Phase 5: Verify + * incremental streaming continues to work after restore. + */ + @Test + @Timeout(180) + public void testStreamingReadChainTableStatefulRestart() throws Exception { + // Create chain table (source) + sql( + "CREATE TABLE chain_restart (" + + " k BIGINT, seq BIGINT, v STRING, dt STRING" + + ") PARTITIONED BY (dt) WITH (" + + " 'primary-key' = 'dt,k'," + + " 'bucket-key' = 'k'," + + " 'bucket' = '2'," + + " 'sequence.field' = 'seq'," + + " 'merge-engine' = 'deduplicate'," + + " 'changelog-producer' = 'input'," + + " 'chain-table.enabled' = 'true'," + + " 'partition.timestamp-pattern' = '$dt'," + + " 'partition.timestamp-formatter' = 'yyyyMMdd'," + + " 'continuous.discovery-interval' = '1ms'" + + ")"); + + // Create a Paimon PK sink table. The Paimon sink supports upsert + // (primary key), so the planner won't need ChangelogNormalize. + // Paimon sink does NOT implement CheckpointedFunction (it uses operator + // state for in-flight files, committed during checkpoint complete), so no + // buffer leakage on checkpoint recovery — unlike CollectSinkFunction. + sql( + "CREATE TABLE chain_restart_sink (" + + " k BIGINT, seq BIGINT, v STRING, dt STRING," + + " PRIMARY KEY (dt, k) NOT ENFORCED" + + ") PARTITIONED BY (dt) WITH (" + + " 'bucket' = '2'," + + " 'merge-engine' = 'deduplicate'," + + " 'sequence.field' = 'seq'" + + ")"); + + String db = tEnv.getCurrentDatabase(); + sql("CALL sys.create_branch('%s.chain_restart', 'snapshot')", db); + sql("CALL sys.create_branch('%s.chain_restart', 'delta')", db); + for (String tbl : + new String[] { + "chain_restart", "chain_restart$branch_snapshot", "chain_restart$branch_delta" + }) { + sql( + "ALTER TABLE `%s` SET (" + + " 'scan.fallback-snapshot-branch' = 'snapshot'," + + " 'scan.fallback-delta-branch' = 'delta')", + tbl); + } + + // Configure checkpoint for stateful restart + org.apache.flink.configuration.Configuration config = sEnv.getConfig().getConfiguration(); + config.setString("state.checkpoints.dir", "file://" + path + "/checkpoints"); + config.set( + CheckpointingOptions.EXTERNALIZED_CHECKPOINT_RETENTION, + ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION); + config.removeKey("execution.checkpointing.interval"); + + // Same SQL for both phases → operator graph matches → state recovery works + String streamSql = "INSERT INTO chain_restart_sink SELECT * FROM chain_restart"; + + // T4: Write snapshot data BEFORE starting streaming, so the starting phase + // exercises the snapshot+delta merge path (not just delta-only). + sql( + "INSERT INTO `chain_restart$branch_snapshot` PARTITION (dt = '20250807')" + + " VALUES (10, 1, 'snap_10'), (11, 1, 'snap_11')"); + + // === Phase 1: Write initial delta data and start streaming INSERT INTO === + sql( + "INSERT INTO `chain_restart$branch_delta` PARTITION (dt = '20250808')" + + " VALUES (1, 1, 'base_1'), (2, 1, 'base_2'), (3, 1, 'base_3')"); + + TableResult tableResult = sEnv.executeSql(streamSql); + //noinspection OptionalGetWithoutIsPresent + JobClient jobClient = tableResult.getJobClient().get(); + + // Wait for data to flow to the sink + Thread.sleep(5000); + + // === Phase 2: Trigger checkpoint and cancel job === + // The checkpoint commits data to the sink table. + Thread.sleep(3000); + String checkpointPath = triggerCheckpoint(jobClient); + + java.io.File cpFile = + new java.io.File(checkpointPath.replace("file:/", "/").replace("file://", "/")); + if (!cpFile.exists()) { + cpFile = cpFile.getParentFile(); + } + assertThat(cpFile.exists()).as("Checkpoint directory should exist: " + cpFile).isTrue(); + + Thread.sleep(2000); + jobClient.cancel().get(); + + // Verify Phase 1 data via batch read from the sink table. + // Starting should include both snapshot-only (dt=20250807) and delta (dt=20250808). + List phase1 = + sql("SELECT * FROM chain_restart_sink").stream() + .map(Row::toString) + .collect(java.util.stream.Collectors.toList()); + System.err.println("[TEST] Phase 1 sink rows: " + phase1); + assertThat(phase1) + .as("Phase 1: starting includes snapshot-only and delta partitions") + .containsExactlyInAnyOrder( + "+I[1, 1, base_1, 20250808]", + "+I[2, 1, base_2, 20250808]", + "+I[3, 1, base_3, 20250808]", + "+I[10, 1, snap_10, 20250807]", + "+I[11, 1, snap_11, 20250807]"); + + // === Phase 3: Write new delta data while job is stopped === + sql( + "INSERT INTO `chain_restart$branch_delta` PARTITION (dt = '20250809')" + + " VALUES (4, 1, 'new_4'), (5, 1, 'new_5')"); + + // === Phase 4: Restart from checkpoint === + sEnv.getConfig() + .getConfiguration() + .setString("execution.state-recovery.path", checkpointPath); + + TableResult tableResult2 = sEnv.executeSql(streamSql); + //noinspection OptionalGetWithoutIsPresent + JobClient jobClient2 = tableResult2.getJobClient().get(); + + // Wait for restored scan to produce and commit data + Thread.sleep(5000); + + // Trigger checkpoint to commit Phase 4 data + Thread.sleep(3000); + triggerCheckpoint(jobClient2); + Thread.sleep(2000); + + // Read all records from the sink table + List phase4 = + sql("SELECT * FROM chain_restart_sink").stream() + .map(Row::toString) + .collect(java.util.stream.Collectors.toList()); + System.err.println("[TEST] Phase 4 sink rows (" + phase4.size() + "): " + phase4); + + // Verify new data is present + assertThat(phase4) + .as("Stateful restart: sink should contain new delta data") + .contains("+I[4, 1, new_4, 20250809]", "+I[5, 1, new_5, 20250809]"); + + // Verify total count: should be 7 (5 Phase 1 + 2 Phase 4), not more (duplicates) + assertThat(phase4.size()) + .as("Should have exactly 7 records (no duplicates from state recovery)") + .isEqualTo(7); + + // === Phase 5: Verify incremental streaming continues after restore === + sql( + "INSERT INTO `chain_restart$branch_delta` PARTITION (dt = '20250810')" + + " VALUES (6, 1, 'new_6')"); + + Thread.sleep(5000); + triggerCheckpoint(jobClient2); + Thread.sleep(2000); + + List phase5 = + sql("SELECT * FROM chain_restart_sink").stream() + .map(Row::toString) + .collect(java.util.stream.Collectors.toList()); + System.err.println("[TEST] Phase 5 sink rows (" + phase5.size() + "): " + phase5); + assertThat(phase5) + .as("Incremental streaming should continue after restore") + .contains("+I[6, 1, new_6, 20250810]"); + + jobClient2.cancel().get(); + + // Clean up state-recovery config for other tests + sEnv.getConfig().getConfiguration().removeKey("execution.state-recovery.path"); + } + + /** + * T1: Tests streaming read with snapshot+delta overlap in the starting phase. Verifies that + * doFullLoad() correctly merges snapshot-only, delta-only, and overlapping partitions. + */ + @Test + @Timeout(120) + public void testStreamingReadWithSnapshotDeltaOverlap() throws Exception { + sql( + "CREATE TABLE chain_overlap (" + + " k BIGINT, seq BIGINT, v STRING, dt STRING" + + ") PARTITIONED BY (dt) WITH (" + + " 'primary-key' = 'dt,k'," + + " 'bucket-key' = 'k'," + + " 'bucket' = '2'," + + " 'sequence.field' = 'seq'," + + " 'merge-engine' = 'deduplicate'," + + " 'changelog-producer' = 'input'," + + " 'chain-table.enabled' = 'true'," + + " 'partition.timestamp-pattern' = '$dt'," + + " 'partition.timestamp-formatter' = 'yyyyMMdd'," + + " 'continuous.discovery-interval' = '1ms'" + + ")"); + + String db = tEnv.getCurrentDatabase(); + sql("CALL sys.create_branch('%s.chain_overlap', 'snapshot')", db); + sql("CALL sys.create_branch('%s.chain_overlap', 'delta')", db); + for (String tbl : + new String[] { + "chain_overlap", "chain_overlap$branch_snapshot", "chain_overlap$branch_delta" + }) { + sql( + "ALTER TABLE `%s` SET (" + + " 'scan.fallback-snapshot-branch' = 'snapshot'," + + " 'scan.fallback-delta-branch' = 'delta')", + tbl); + } + + // Write snapshot data: dt=20250807 (snapshot-only) and dt=20250808 (overlapping) + sql( + "INSERT INTO `chain_overlap$branch_snapshot` PARTITION (dt = '20250807')" + + " VALUES (6, 1, 'snap_6'), (7, 1, 'snap_7'), (8, 1, 'snap_8')"); + sql( + "INSERT INTO `chain_overlap$branch_snapshot` PARTITION (dt = '20250808')" + + " VALUES (1, 1, 'snap_1'), (2, 1, 'snap_2'), (3, 1, 'snap_3')"); + + // Write delta data: dt=20250808 (overlapping) and dt=20250809 (delta-only) + sql( + "INSERT INTO `chain_overlap$branch_delta` PARTITION (dt = '20250808')" + + " VALUES (1, 2, 'delta_1'), (2, 2, 'delta_2')," + + " (4, 1, 'delta_4'), (5, 1, 'delta_5')"); + sql( + "INSERT INTO `chain_overlap$branch_delta` PARTITION (dt = '20250809')" + + " VALUES (10, 1, 'new_10'), (11, 1, 'new_11')"); + + // Start streaming read + CloseableIterator it = sEnv.executeSql("SELECT * FROM chain_overlap").collect(); + + // Starting behavior (new: only output latest snapshot partition and partitions after it): + // - Latest snapshot partition: dt=20250808 (k=1,2,3 from snapshot, delta at this partition + // is skipped because snapshot wins for overlapping) + // - Delta-only partition after latest snapshot: dt=20250809 (k=10,11) + // - dt=20250807 is NOT output because it's before the latest snapshot partition + List startingRows = collectRows(it, 5); + assertThat(startingRows) + .as("Starting: latest snapshot partition and delta partitions after it") + .containsExactlyInAnyOrder( + "+I[1, 1, snap_1, 20250808]", + "+I[2, 1, snap_2, 20250808]", + "+I[3, 1, snap_3, 20250808]", + "+I[10, 1, new_10, 20250809]", + "+I[11, 1, new_11, 20250809]"); + + // Incremental: write new delta and verify it streams through + writeChangelogToBranch( + db, + "chain_overlap", + "delta", + Row.ofKind(RowKind.INSERT, 20L, 1L, "incr_20", "20250810")); + + Thread.sleep(2000); + List incr = collectRows(it, 1); + assertThat(incr) + .as("Incremental: new delta data should stream through") + .containsExactlyInAnyOrder("+I[20, 1, incr_20, 20250810]"); + + it.close(); + } + + /** + * T2: Tests that non-default startup modes throw an error for chain table streaming read. When + * scan.mode=latest is specified, an {@link UnsupportedOperationException} is thrown with a + * helpful message. + */ + @Test + @Timeout(60) + public void testStreamingReadRejectsNonDefaultStartup() throws Exception { + sql( + "CREATE TABLE chain_bypass (" + + " k BIGINT, seq BIGINT, v STRING, dt STRING" + + ") PARTITIONED BY (dt) WITH (" + + " 'primary-key' = 'dt,k'," + + " 'bucket-key' = 'k'," + + " 'bucket' = '2'," + + " 'sequence.field' = 'seq'," + + " 'merge-engine' = 'deduplicate'," + + " 'changelog-producer' = 'input'," + + " 'chain-table.enabled' = 'true'," + + " 'partition.timestamp-pattern' = '$dt'," + + " 'partition.timestamp-formatter' = 'yyyyMMdd'," + + " 'continuous.discovery-interval' = '1ms'" + + ")"); + + String db = tEnv.getCurrentDatabase(); + sql("CALL sys.create_branch('%s.chain_bypass', 'snapshot')", db); + sql("CALL sys.create_branch('%s.chain_bypass', 'delta')", db); + for (String tbl : + new String[] { + "chain_bypass", "chain_bypass$branch_snapshot", "chain_bypass$branch_delta" + }) { + sql( + "ALTER TABLE `%s` SET (" + + " 'scan.fallback-snapshot-branch' = 'snapshot'," + + " 'scan.fallback-delta-branch' = 'delta')", + tbl); + } + + // Write data to main table (so snapshots exist for copy() to resolve) + sql( + "INSERT INTO `chain_bypass$branch_delta` PARTITION (dt = '20250808')" + + " VALUES (1, 1, 'v1'), (2, 1, 'v2')"); + + // Default mode: chain-table-aware scan (ChainTableStreamScan) + FileStoreTable table = paimonTable("chain_bypass"); + assertThat(table.newStreamScan()) + .as("Default startup mode should use ChainTableStreamScan") + .isInstanceOf(ChainTableStreamScan.class); + + // scan.mode=latest: should throw UnsupportedOperationException + FileStoreTable tableLatest = + table.copy(java.util.Collections.singletonMap("scan.mode", "latest")); + assertThatThrownBy(tableLatest::newStreamScan) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining("scan.mode=latest") + .hasMessageContaining("Chain table streaming read does not support") + .hasMessageContaining("t$branch_delta"); + } + + /** + * T3: Tests that streaming read works with changelog-producer=none (the default). + * + *

Without a changelog producer, the incremental phase reads data files (delta manifest) + * rather than changelog files, producing only +I records. This is the same behavior as standard + * {@code DataTableStreamScan} with {@code changelog-producer=none}. + */ + @Test + @Timeout(60) + public void testStreamingReadWithNoChangelogProducer() throws Exception { + // Create chain table WITHOUT changelog-producer (defaults to none) + sql( + "CREATE TABLE chain_no_cl (" + + " k BIGINT, seq BIGINT, v STRING, dt STRING" + + ") PARTITIONED BY (dt) WITH (" + + " 'primary-key' = 'dt,k'," + + " 'bucket-key' = 'k'," + + " 'bucket' = '2'," + + " 'sequence.field' = 'seq'," + + " 'merge-engine' = 'deduplicate'," + + " 'chain-table.enabled' = 'true'," + + " 'partition.timestamp-pattern' = '$dt'," + + " 'partition.timestamp-formatter' = 'yyyyMMdd'," + + " 'continuous.discovery-interval' = '1ms'" + + ")"); + + String db = tEnv.getCurrentDatabase(); + sql("CALL sys.create_branch('%s.chain_no_cl', 'snapshot')", db); + sql("CALL sys.create_branch('%s.chain_no_cl', 'delta')", db); + for (String tbl : + new String[] { + "chain_no_cl", "chain_no_cl$branch_snapshot", "chain_no_cl$branch_delta" + }) { + sql( + "ALTER TABLE `%s` SET (" + + " 'scan.fallback-snapshot-branch' = 'snapshot'," + + " 'scan.fallback-delta-branch' = 'delta')", + tbl); + } + + // Phase 1: Insert initial data into delta branch + sql( + "INSERT INTO `chain_no_cl$branch_delta` PARTITION (dt = '20250808')" + + " VALUES (1, 1, 'v1'), (2, 1, 'v2')"); + + // Start streaming read and collect Phase 1 results + try (CloseableIterator it = + sEnv.executeSql("SELECT k, v, dt FROM chain_no_cl").collect()) { + List phase1 = collectRows(it, 2); + assertThat(phase1) + .containsExactlyInAnyOrder("+I[1, v1, 20250808]", "+I[2, v2, 20250808]"); + + // Phase 2: Insert more data into delta branch (incremental) + sql( + "INSERT INTO `chain_no_cl$branch_delta` PARTITION (dt = '20250809')" + + " VALUES (3, 1, 'v3')"); + + List phase2 = collectRows(it, 1); + assertThat(phase2).containsExactly("+I[3, v3, 20250809]"); + } + } + + /** + * T6: Tests streaming read with group partitions (chain-partition-keys). Verifies that + * streaming works correctly when the table has a group dimension (e.g., region) and each group + * maintains its own independent chain. + */ + @Test + @Timeout(120) + public void testStreamingReadWithGroupPartition() throws Exception { + sql( + "CREATE TABLE chain_stream_group (" + + " k BIGINT, seq BIGINT, v STRING, region STRING, dt STRING" + + ") PARTITIONED BY (region, dt) WITH (" + + " 'primary-key' = 'region,dt,k'," + + " 'bucket-key' = 'k'," + + " 'bucket' = '2'," + + " 'sequence.field' = 'seq'," + + " 'merge-engine' = 'deduplicate'," + + " 'changelog-producer' = 'input'," + + " 'chain-table.enabled' = 'true'," + + " 'partition.timestamp-pattern' = '$dt'," + + " 'partition.timestamp-formatter' = 'yyyyMMdd'," + + " 'chain-table.chain-partition-keys' = 'dt'," + + " 'continuous.discovery-interval' = '1ms'" + + ")"); + + String db = tEnv.getCurrentDatabase(); + sql("CALL sys.create_branch('%s.chain_stream_group', 'snapshot')", db); + sql("CALL sys.create_branch('%s.chain_stream_group', 'delta')", db); + for (String tbl : + new String[] { + "chain_stream_group", + "chain_stream_group$branch_snapshot", + "chain_stream_group$branch_delta" + }) { + sql( + "ALTER TABLE `%s` SET (" + + " 'scan.fallback-snapshot-branch' = 'snapshot'," + + " 'scan.fallback-delta-branch' = 'delta')", + tbl); + } + + // Write initial delta data for two regions + sql( + "INSERT INTO `chain_stream_group$branch_delta`" + + " PARTITION (region = 'CN', dt = '20250808')" + + " VALUES (1, 1, 'cn_1'), (2, 1, 'cn_2')"); + sql( + "INSERT INTO `chain_stream_group$branch_delta`" + + " PARTITION (region = 'US', dt = '20250808')" + + " VALUES (11, 1, 'us_11'), (12, 1, 'us_12')"); + + // Start streaming read + CloseableIterator it = sEnv.executeSql("SELECT * FROM chain_stream_group").collect(); + + // Starting: both regions, delta-only + List startingRows = collectRows(it, 4); + assertThat(startingRows) + .as("Starting: delta-only data for both regions") + .containsExactlyInAnyOrder( + "+I[1, 1, cn_1, CN, 20250808]", + "+I[2, 1, cn_2, CN, 20250808]", + "+I[11, 1, us_11, US, 20250808]", + "+I[12, 1, us_12, US, 20250808]"); + + // Incremental: write new delta for CN only + sql( + "INSERT INTO `chain_stream_group$branch_delta`" + + " PARTITION (region = 'CN', dt = '20250809')" + + " VALUES (3, 1, 'cn_3')"); + + Thread.sleep(2000); + List incr = collectRows(it, 1); + assertThat(incr) + .as("Incremental: new CN delta should stream through") + .containsExactlyInAnyOrder("+I[3, 1, cn_3, CN, 20250809]"); + + // Incremental: write new delta for US + sql( + "INSERT INTO `chain_stream_group$branch_delta`" + + " PARTITION (region = 'US', dt = '20250809')" + + " VALUES (13, 1, 'us_13')"); + + Thread.sleep(2000); + List incr2 = collectRows(it, 1); + assertThat(incr2) + .as("Incremental: new US delta should stream through") + .containsExactlyInAnyOrder("+I[13, 1, us_13, US, 20250809]"); + + it.close(); + } + + /** + * Triggers a checkpoint on the given job and returns the checkpoint path. Waits until all tasks + * are RUNNING before triggering. + */ + private String triggerCheckpoint(JobClient jobClient) throws Exception { + Field field = jobClient.getClass().getDeclaredField("miniCluster"); + field.setAccessible(true); + MiniCluster miniCluster = (MiniCluster) field.get(jobClient); + JobID jobID = jobClient.getJobID(); + + // Wait for all tasks to be RUNNING + AtomicBoolean allRunning = new AtomicBoolean(false); + while (!allRunning.get()) { + allRunning.set(true); + Thread.sleep(1000); + miniCluster + .getExecutionGraph(jobID) + .thenAccept( + eg -> + eg.getAllExecutionVertices() + .forEach( + v -> { + if (v.getExecutionState() + != ExecutionState.RUNNING) { + allRunning.set(false); + } + })) + .get(); + } + + return miniCluster.triggerCheckpoint(jobID).get(); + } + + // ========================================================================= + // Additional coverage tests + // ========================================================================= + + /** Tests restore(id, scanAll=true): resets starting state but preserves delta position. */ + @Test + @Timeout(60) + public void testRestoreScanAll() throws Exception { + sql( + "CREATE TABLE chain_restore_all (" + + " k BIGINT, seq BIGINT, v STRING, dt STRING" + + ") PARTITIONED BY (dt) WITH (" + + " 'primary-key' = 'dt,k'," + + " 'bucket-key' = 'k'," + + " 'bucket' = '2'," + + " 'sequence.field' = 'seq'," + + " 'merge-engine' = 'deduplicate'," + + " 'changelog-producer' = 'input'," + + " 'chain-table.enabled' = 'true'," + + " 'partition.timestamp-pattern' = '$dt'," + + " 'partition.timestamp-formatter' = 'yyyyMMdd'" + + ")"); + + String db = tEnv.getCurrentDatabase(); + sql("CALL sys.create_branch('%s.chain_restore_all', 'snapshot')", db); + sql("CALL sys.create_branch('%s.chain_restore_all', 'delta')", db); + for (String tbl : + new String[] { + "chain_restore_all", + "chain_restore_all$branch_snapshot", + "chain_restore_all$branch_delta" + }) { + sql( + "ALTER TABLE `%s` SET (" + + " 'scan.fallback-snapshot-branch' = 'snapshot'," + + " 'scan.fallback-delta-branch' = 'delta')", + tbl); + } + + sql( + "INSERT INTO `chain_restore_all$branch_delta` PARTITION (dt = '20250808')" + + " VALUES (1, 1, 'v1'), (2, 1, 'v2')"); + + FileStoreTable table = paimonTable("chain_restore_all"); + ChainTableStreamScan scan = (ChainTableStreamScan) table.newStreamScan(); + + // Phase 1: starting + org.apache.paimon.table.source.TableScan.Plan plan1 = scan.plan(); + assertThat(plan1.splits()).as("Phase 1 should produce splits").isNotEmpty(); + Long checkpoint = scan.checkpoint(); + assertThat(checkpoint).as("Checkpoint should be non-null after Phase 1").isNotNull(); + + // Phase 2: no new data → empty plan + org.apache.paimon.table.source.TableScan.Plan plan2 = scan.plan(); + assertThat(plan2.splits()).as("Phase 2 with no new data should be empty").isEmpty(); + + // restore(id, scanAll=true): should reset to starting, preserve delta position + scan.restore(checkpoint, true); + assertThat(scan.checkpoint()) + .as("Checkpoint should be preserved after restore(id, true)") + .isEqualTo(checkpoint); + + // Starting should run again + org.apache.paimon.table.source.TableScan.Plan plan3 = scan.plan(); + assertThat(plan3.splits()) + .as("Starting should produce splits again after restore(id, true)") + .isNotEmpty(); + + // Delta position should be the same (no new commits) + assertThat(scan.checkpoint()).as("Checkpoint should remain the same").isEqualTo(checkpoint); + } + + /** Tests restore(null, true): resets to fresh starting with no delta position. */ + @Test + @Timeout(60) + public void testRestoreNullScanAll() throws Exception { + sql( + "CREATE TABLE chain_restore_null (" + + " k BIGINT, seq BIGINT, v STRING, dt STRING" + + ") PARTITIONED BY (dt) WITH (" + + " 'primary-key' = 'dt,k'," + + " 'bucket-key' = 'k'," + + " 'bucket' = '2'," + + " 'sequence.field' = 'seq'," + + " 'merge-engine' = 'deduplicate'," + + " 'changelog-producer' = 'input'," + + " 'chain-table.enabled' = 'true'," + + " 'partition.timestamp-pattern' = '$dt'," + + " 'partition.timestamp-formatter' = 'yyyyMMdd'" + + ")"); + + String db = tEnv.getCurrentDatabase(); + sql("CALL sys.create_branch('%s.chain_restore_null', 'snapshot')", db); + sql("CALL sys.create_branch('%s.chain_restore_null', 'delta')", db); + for (String tbl : + new String[] { + "chain_restore_null", + "chain_restore_null$branch_snapshot", + "chain_restore_null$branch_delta" + }) { + sql( + "ALTER TABLE `%s` SET (" + + " 'scan.fallback-snapshot-branch' = 'snapshot'," + + " 'scan.fallback-delta-branch' = 'delta')", + tbl); + } + + sql( + "INSERT INTO `chain_restore_null$branch_delta` PARTITION (dt = '20250808')" + + " VALUES (1, 1, 'v1')"); + + FileStoreTable table = paimonTable("chain_restore_null"); + ChainTableStreamScan scan = (ChainTableStreamScan) table.newStreamScan(); + + // Phase 1: starting + scan.plan(); + assertThat(scan.checkpoint()).isNotNull(); + + // restore(null, scanAll=true): fresh start, no delta position + scan.restore(null, true); + assertThat(scan.checkpoint()) + .as("Checkpoint should be null after restore(null, true)") + .isNull(); + + // Starting should run again + org.apache.paimon.table.source.TableScan.Plan plan = scan.plan(); + assertThat(plan.splits()).as("Starting should produce splits").isNotEmpty(); + assertThat(scan.checkpoint()).as("Checkpoint should be set after new starting").isNotNull(); + } + + /** Tests streaming read with WHERE clause (partition predicate forwarding). */ + @Test + @Timeout(120) + public void testStreamingReadWithFilter() throws Exception { + sql( + "CREATE TABLE chain_filter (" + + " k BIGINT, seq BIGINT, v STRING, dt STRING" + + ") PARTITIONED BY (dt) WITH (" + + " 'primary-key' = 'dt,k'," + + " 'bucket-key' = 'k'," + + " 'bucket' = '2'," + + " 'sequence.field' = 'seq'," + + " 'merge-engine' = 'deduplicate'," + + " 'changelog-producer' = 'input'," + + " 'chain-table.enabled' = 'true'," + + " 'partition.timestamp-pattern' = '$dt'," + + " 'partition.timestamp-formatter' = 'yyyyMMdd'," + + " 'continuous.discovery-interval' = '1ms'" + + ")"); + + String db = tEnv.getCurrentDatabase(); + sql("CALL sys.create_branch('%s.chain_filter', 'snapshot')", db); + sql("CALL sys.create_branch('%s.chain_filter', 'delta')", db); + for (String tbl : + new String[] { + "chain_filter", "chain_filter$branch_snapshot", "chain_filter$branch_delta" + }) { + sql( + "ALTER TABLE `%s` SET (" + + " 'scan.fallback-snapshot-branch' = 'snapshot'," + + " 'scan.fallback-delta-branch' = 'delta')", + tbl); + } + + // Write snapshot data for dt=20250807 and dt=20250808 + sql( + "INSERT INTO `chain_filter$branch_snapshot` PARTITION (dt = '20250807')" + + " VALUES (1, 1, 'snap_1')"); + sql( + "INSERT INTO `chain_filter$branch_delta` PARTITION (dt = '20250808')" + + " VALUES (2, 1, 'delta_2'), (3, 1, 'delta_3')"); + sql( + "INSERT INTO `chain_filter$branch_delta` PARTITION (dt = '20250809')" + + " VALUES (4, 1, 'delta_4')"); + + // Streaming read with WHERE clause — only dt=20250808 + CloseableIterator it = + sEnv.executeSql("SELECT * FROM chain_filter WHERE dt = '20250808'").collect(); + + // Starting should only return dt=20250808 data (delta-only partition) + List startingRows = collectRows(it, 2); + assertThat(startingRows) + .as("Starting with WHERE dt=20250808 should only return that partition") + .containsExactlyInAnyOrder( + "+I[2, 1, delta_2, 20250808]", "+I[3, 1, delta_3, 20250808]"); + + // Incremental: write to dt=20250808 (should stream through) + writeChangelogToBranch( + db, + "chain_filter", + "delta", + Row.ofKind(RowKind.INSERT, 5L, 1L, "incr_5", "20250808")); + + Thread.sleep(2000); + List incr = collectRows(it, 1); + assertThat(incr) + .as("Incremental write to dt=20250808 should stream through") + .containsExactlyInAnyOrder("+I[5, 1, incr_5, 20250808]"); + + it.close(); + } + + /** Tests starting when delta branch is empty (only snapshot data). */ + @Test + @Timeout(120) + public void testStreamingReadEmptyDelta() throws Exception { + sql( + "CREATE TABLE chain_empty_delta (" + + " k BIGINT, seq BIGINT, v STRING, dt STRING" + + ") PARTITIONED BY (dt) WITH (" + + " 'primary-key' = 'dt,k'," + + " 'bucket-key' = 'k'," + + " 'bucket' = '2'," + + " 'sequence.field' = 'seq'," + + " 'merge-engine' = 'deduplicate'," + + " 'changelog-producer' = 'input'," + + " 'chain-table.enabled' = 'true'," + + " 'partition.timestamp-pattern' = '$dt'," + + " 'partition.timestamp-formatter' = 'yyyyMMdd'," + + " 'continuous.discovery-interval' = '1ms'" + + ")"); + + String db = tEnv.getCurrentDatabase(); + sql("CALL sys.create_branch('%s.chain_empty_delta', 'snapshot')", db); + sql("CALL sys.create_branch('%s.chain_empty_delta', 'delta')", db); + for (String tbl : + new String[] { + "chain_empty_delta", + "chain_empty_delta$branch_snapshot", + "chain_empty_delta$branch_delta" + }) { + sql( + "ALTER TABLE `%s` SET (" + + " 'scan.fallback-snapshot-branch' = 'snapshot'," + + " 'scan.fallback-delta-branch' = 'delta')", + tbl); + } + + // Write ONLY to snapshot branch, delta stays empty + sql( + "INSERT INTO `chain_empty_delta$branch_snapshot` PARTITION (dt = '20250807')" + + " VALUES (1, 1, 'snap_1'), (2, 1, 'snap_2')"); + + CloseableIterator it = sEnv.executeSql("SELECT * FROM chain_empty_delta").collect(); + + List startingRows = collectRows(it, 2); + assertThat(startingRows) + .as("Starting with empty delta should return only snapshot data") + .containsExactlyInAnyOrder( + "+I[1, 1, snap_1, 20250807]", "+I[2, 1, snap_2, 20250807]"); + + // Incremental: write to delta and verify it streams through + writeChangelogToBranch( + db, + "chain_empty_delta", + "delta", + Row.ofKind(RowKind.INSERT, 3L, 1L, "new_3", "20250808")); + + Thread.sleep(2000); + List incr = collectRows(it, 1); + assertThat(incr) + .as("First delta write should stream through after snapshot-only starting") + .containsExactlyInAnyOrder("+I[3, 1, new_3, 20250808]"); + + it.close(); + } + + /** Tests starting when snapshot branch is empty (only delta data). */ + @Test + @Timeout(120) + public void testStreamingReadEmptySnapshot() throws Exception { + sql( + "CREATE TABLE chain_empty_snap (" + + " k BIGINT, seq BIGINT, v STRING, dt STRING" + + ") PARTITIONED BY (dt) WITH (" + + " 'primary-key' = 'dt,k'," + + " 'bucket-key' = 'k'," + + " 'bucket' = '2'," + + " 'sequence.field' = 'seq'," + + " 'merge-engine' = 'deduplicate'," + + " 'changelog-producer' = 'input'," + + " 'chain-table.enabled' = 'true'," + + " 'partition.timestamp-pattern' = '$dt'," + + " 'partition.timestamp-formatter' = 'yyyyMMdd'," + + " 'continuous.discovery-interval' = '1ms'" + + ")"); + + String db = tEnv.getCurrentDatabase(); + sql("CALL sys.create_branch('%s.chain_empty_snap', 'snapshot')", db); + sql("CALL sys.create_branch('%s.chain_empty_snap', 'delta')", db); + for (String tbl : + new String[] { + "chain_empty_snap", + "chain_empty_snap$branch_snapshot", + "chain_empty_snap$branch_delta" + }) { + sql( + "ALTER TABLE `%s` SET (" + + " 'scan.fallback-snapshot-branch' = 'snapshot'," + + " 'scan.fallback-delta-branch' = 'delta')", + tbl); + } + + // Write ONLY to delta branch, snapshot stays empty + sql( + "INSERT INTO `chain_empty_snap$branch_delta` PARTITION (dt = '20250808')" + + " VALUES (1, 1, 'delta_1'), (2, 1, 'delta_2')"); + sql( + "INSERT INTO `chain_empty_snap$branch_delta` PARTITION (dt = '20250809')" + + " VALUES (3, 1, 'delta_3')"); + + CloseableIterator it = sEnv.executeSql("SELECT * FROM chain_empty_snap").collect(); + + List startingRows = collectRows(it, 3); + assertThat(startingRows) + .as("Starting with empty snapshot should return only delta data") + .containsExactlyInAnyOrder( + "+I[1, 1, delta_1, 20250808]", + "+I[2, 1, delta_2, 20250808]", + "+I[3, 1, delta_3, 20250809]"); + + it.close(); + } + + /** Tests that withShard() is correctly forwarded to both batch scan and delta stream scan. */ + @Test + @Timeout(60) + public void testWithShardForwarding() throws Exception { + sql( + "CREATE TABLE chain_shard (" + + " k BIGINT, seq BIGINT, v STRING, dt STRING" + + ") PARTITIONED BY (dt) WITH (" + + " 'primary-key' = 'dt,k'," + + " 'bucket-key' = 'k'," + + " 'bucket' = '2'," + + " 'sequence.field' = 'seq'," + + " 'merge-engine' = 'deduplicate'," + + " 'changelog-producer' = 'input'," + + " 'chain-table.enabled' = 'true'," + + " 'partition.timestamp-pattern' = '$dt'," + + " 'partition.timestamp-formatter' = 'yyyyMMdd'" + + ")"); + + String db = tEnv.getCurrentDatabase(); + sql("CALL sys.create_branch('%s.chain_shard', 'snapshot')", db); + sql("CALL sys.create_branch('%s.chain_shard', 'delta')", db); + for (String tbl : + new String[] { + "chain_shard", "chain_shard$branch_snapshot", "chain_shard$branch_delta" + }) { + sql( + "ALTER TABLE `%s` SET (" + + " 'scan.fallback-snapshot-branch' = 'snapshot'," + + " 'scan.fallback-delta-branch' = 'delta')", + tbl); + } + + sql( + "INSERT INTO `chain_shard$branch_delta` PARTITION (dt = '20250808')" + + " VALUES (1, 1, 'v1'), (2, 1, 'v2')"); + + FileStoreTable table = paimonTable("chain_shard"); + + // Shard 0 of 2: should get a subset of data + ChainTableStreamScan scan0 = (ChainTableStreamScan) table.newStreamScan(); + scan0.withShard(0, 2); + org.apache.paimon.table.source.TableScan.Plan plan0 = scan0.plan(); + + // Shard 1 of 2: should get the other subset + ChainTableStreamScan scan1 = (ChainTableStreamScan) table.newStreamScan(); + scan1.withShard(1, 2); + org.apache.paimon.table.source.TableScan.Plan plan1 = scan1.plan(); + + // Together both shards should produce non-empty results + // (exact split depends on bucket hashing, but total should cover all data) + int totalSplits = plan0.splits().size() + plan1.splits().size(); + assertThat(totalSplits).as("Both shards together should produce splits").isGreaterThan(0); + } + + /** Tests streaming read when both snapshot and delta branches are empty. */ + @Test + @Timeout(60) + public void testStreamingReadBothBranchesEmpty() throws Exception { + sql( + "CREATE TABLE chain_both_empty (" + + " k BIGINT, seq BIGINT, v STRING, dt STRING" + + ") PARTITIONED BY (dt) WITH (" + + " 'primary-key' = 'dt,k'," + + " 'bucket-key' = 'k'," + + " 'bucket' = '2'," + + " 'sequence.field' = 'seq'," + + " 'merge-engine' = 'deduplicate'," + + " 'changelog-producer' = 'input'," + + " 'chain-table.enabled' = 'true'," + + " 'partition.timestamp-pattern' = '$dt'," + + " 'partition.timestamp-formatter' = 'yyyyMMdd'," + + " 'continuous.discovery-interval' = '1ms'" + + ")"); + + String db = tEnv.getCurrentDatabase(); + sql("CALL sys.create_branch('%s.chain_both_empty', 'snapshot')", db); + sql("CALL sys.create_branch('%s.chain_both_empty', 'delta')", db); + for (String tbl : + new String[] { + "chain_both_empty", + "chain_both_empty$branch_snapshot", + "chain_both_empty$branch_delta" + }) { + sql( + "ALTER TABLE `%s` SET (" + + " 'scan.fallback-snapshot-branch' = 'snapshot'," + + " 'scan.fallback-delta-branch' = 'delta')", + tbl); + } + + // Both branches are empty — Phase 1 should produce no splits + FileStoreTable table = paimonTable("chain_both_empty"); + ChainTableStreamScan scan = (ChainTableStreamScan) table.newStreamScan(); + org.apache.paimon.table.source.TableScan.Plan plan1 = scan.plan(); + assertThat(plan1.splits()).as("Phase 1 with both branches empty should be empty").isEmpty(); + + // Phase 2: write new delta data and verify it streams through + sql( + "INSERT INTO `chain_both_empty$branch_delta` PARTITION (dt = '20250808')" + + " VALUES (1, 1, 'v1'), (2, 1, 'v2')"); + + org.apache.paimon.table.source.TableScan.Plan plan2 = scan.plan(); + assertThat(plan2.splits()).as("Phase 2 should pick up new delta data").isNotEmpty(); + } + + /** Tests that delta OVERWRITE in Phase 2 does not crash the scan. */ + @Test + @Timeout(60) + public void testStreamingReadDeltaOverwriteInPhase2() throws Exception { + sql( + "CREATE TABLE chain_overwrite_p2 (" + + " k BIGINT, seq BIGINT, v STRING, dt STRING" + + ") PARTITIONED BY (dt) WITH (" + + " 'primary-key' = 'dt,k'," + + " 'bucket-key' = 'k'," + + " 'bucket' = '2'," + + " 'sequence.field' = 'seq'," + + " 'merge-engine' = 'deduplicate'," + + " 'changelog-producer' = 'input'," + + " 'chain-table.enabled' = 'true'," + + " 'partition.timestamp-pattern' = '$dt'," + + " 'partition.timestamp-formatter' = 'yyyyMMdd'," + + " 'continuous.discovery-interval' = '1ms'" + + ")"); + + String db = tEnv.getCurrentDatabase(); + sql("CALL sys.create_branch('%s.chain_overwrite_p2', 'snapshot')", db); + sql("CALL sys.create_branch('%s.chain_overwrite_p2', 'delta')", db); + for (String tbl : + new String[] { + "chain_overwrite_p2", + "chain_overwrite_p2$branch_snapshot", + "chain_overwrite_p2$branch_delta" + }) { + sql( + "ALTER TABLE `%s` SET (" + + " 'scan.fallback-snapshot-branch' = 'snapshot'," + + " 'scan.fallback-delta-branch' = 'delta')", + tbl); + } + + // Initial delta data + sql( + "INSERT INTO `chain_overwrite_p2$branch_delta` PARTITION (dt = '20250808')" + + " VALUES (1, 1, 'v1'), (2, 1, 'v2')"); + + FileStoreTable table = paimonTable("chain_overwrite_p2"); + ChainTableStreamScan scan = (ChainTableStreamScan) table.newStreamScan(); + + // Phase 1: read initial delta data + org.apache.paimon.table.source.TableScan.Plan plan1 = scan.plan(); + assertThat(plan1.splits()).as("Phase 1 should produce splits").isNotEmpty(); + + // Phase 2: OVERWRITE the same partition on delta branch. + // This creates a snapshot with OVERWRITE kind. The scan should handle it + // gracefully — either producing data or empty plans, but never crashing. + sql( + "INSERT OVERWRITE `chain_overwrite_p2$branch_delta` PARTITION (dt = '20250808')" + + " VALUES (1, 2, 'new_v1'), (3, 1, 'v3')"); + + // Verify scan.plan() does not throw after OVERWRITE + for (int i = 0; i < 3; i++) { + org.apache.paimon.table.source.TableScan.Plan planN = scan.plan(); + assertThat(planN).as("plan() should not return null after OVERWRITE").isNotNull(); + } + } + + /** Tests restore(null) re-runs Phase 1 with current data state. */ + @Test + @Timeout(60) + public void testStreamingReadRestoreAfterNewData() throws Exception { + sql( + "CREATE TABLE chain_restore_newdata (" + + " k BIGINT, seq BIGINT, v STRING, dt STRING" + + ") PARTITIONED BY (dt) WITH (" + + " 'primary-key' = 'dt,k'," + + " 'bucket-key' = 'k'," + + " 'bucket' = '2'," + + " 'sequence.field' = 'seq'," + + " 'merge-engine' = 'deduplicate'," + + " 'changelog-producer' = 'input'," + + " 'chain-table.enabled' = 'true'," + + " 'partition.timestamp-pattern' = '$dt'," + + " 'partition.timestamp-formatter' = 'yyyyMMdd'" + + ")"); + + String db = tEnv.getCurrentDatabase(); + sql("CALL sys.create_branch('%s.chain_restore_newdata', 'snapshot')", db); + sql("CALL sys.create_branch('%s.chain_restore_newdata', 'delta')", db); + for (String tbl : + new String[] { + "chain_restore_newdata", + "chain_restore_newdata$branch_snapshot", + "chain_restore_newdata$branch_delta" + }) { + sql( + "ALTER TABLE `%s` SET (" + + " 'scan.fallback-snapshot-branch' = 'snapshot'," + + " 'scan.fallback-delta-branch' = 'delta')", + tbl); + } + + // Write initial snapshot + delta data + sql( + "INSERT INTO `chain_restore_newdata$branch_snapshot` PARTITION (dt = '20250807')" + + " VALUES (1, 1, 'snap_1')"); + sql( + "INSERT INTO `chain_restore_newdata$branch_delta` PARTITION (dt = '20250808')" + + " VALUES (2, 1, 'delta_2')"); + + FileStoreTable table = paimonTable("chain_restore_newdata"); + ChainTableStreamScan scan = (ChainTableStreamScan) table.newStreamScan(); + + // Phase 1: snapshot at dt=20250807 (latest), delta at dt=20250808 + org.apache.paimon.table.source.TableScan.Plan plan1 = scan.plan(); + int phase1Size = plan1.splits().size(); + assertThat(phase1Size).as("Phase 1 should produce splits").isGreaterThan(0); + + // Add more data to both branches + sql( + "INSERT INTO `chain_restore_newdata$branch_snapshot` PARTITION (dt = '20250809')" + + " VALUES (3, 1, 'snap_3')"); + sql( + "INSERT INTO `chain_restore_newdata$branch_delta` PARTITION (dt = '20250810')" + + " VALUES (4, 1, 'delta_4')"); + + // restore(null) resets to fresh starting — Phase 1 should re-run with new data. + // After adding snapshot dt=20250809 and delta dt=20250810: + // - Latest snapshot: dt=20250809 (dt=20250807 excluded as older) + // - Delta dt=20250808 excluded (older than latest snapshot dt=20250809) + // - Delta dt=20250810 included (newer than dt=20250809) + scan.restore(null); + org.apache.paimon.table.source.TableScan.Plan plan2 = scan.plan(); + assertThat(plan2.splits()) + .as("Restore(null) should re-run Phase 1 with current data") + .isNotEmpty(); + } } From 6d67d97c20fc8f092881fa3b8374a59986929f31 Mon Sep 17 00:00:00 2001 From: Yunfeng Zhou Date: Thu, 18 Jun 2026 19:32:17 +0800 Subject: [PATCH 2/3] Fix comments --- docs/docs/primary-key-table/chain-table.md | 6 + .../paimon/table/ChainGroupReadTable.java | 13 +- .../paimon/table/ChainTableStreamScan.java | 137 +++--- .../paimon/table/source/ChainSplit.java | 18 + .../source/ContinuousFileSplitEnumerator.java | 6 +- .../paimon/flink/FlinkChainTableITCase.java | 415 +++++++++++------- 6 files changed, 382 insertions(+), 213 deletions(-) diff --git a/docs/docs/primary-key-table/chain-table.md b/docs/docs/primary-key-table/chain-table.md index 01a6ae1648bf..16b58db78cbe 100644 --- a/docs/docs/primary-key-table/chain-table.md +++ b/docs/docs/primary-key-table/chain-table.md @@ -235,6 +235,12 @@ INSERT INTO downstream_sink SELECT * FROM default.t; an `UnsupportedOperationException` is thrown. To use standard streaming read without chain table logic, read from a specific branch table (e.g., `t$branch_delta`) instead of the main table. +- Partition filters are not supported in chain table streaming reads. Specifying a partition + filter — either via a `WHERE` clause on partition columns or the `scan.partitions` table + option — throws an `UnsupportedOperationException`. This is because the chain table streaming + scan determines which partitions to read based on the chain-merge logic across snapshot and + delta branches, and applying a partition filter would interfere with this logic. To read a + specific partition, use batch mode instead. ## Group Partition diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java index 0237828e6a58..d6e03416b9c7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java @@ -239,18 +239,7 @@ public Plan plan() { PredicateBuilder builder = new PredicateBuilder(tableSchema.logicalPartitionType()); for (Split split : mainScan.plan().splits()) { DataSplit dataSplit = (DataSplit) split; - HashMap fileBucketPathMapping = new HashMap<>(); - HashMap fileBranchMapping = new HashMap<>(); - for (DataFileMeta file : dataSplit.dataFiles()) { - fileBucketPathMapping.put(file.fileName(), ((DataSplit) split).bucketPath()); - fileBranchMapping.put(file.fileName(), options.scanFallbackSnapshotBranch()); - } - splits.add( - new ChainSplit( - dataSplit.partition(), - dataSplit.dataFiles(), - fileBranchMapping, - fileBucketPathMapping)); + splits.add(ChainSplit.from(dataSplit, options.scanFallbackSnapshotBranch())); } Set snapshotPartitions = diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ChainTableStreamScan.java b/paimon-core/src/main/java/org/apache/paimon/table/ChainTableStreamScan.java index 5f4ae499a55a..ab3c3a693e74 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ChainTableStreamScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ChainTableStreamScan.java @@ -22,8 +22,9 @@ import org.apache.paimon.codegen.CodeGenUtils; import org.apache.paimon.codegen.RecordComparator; import org.apache.paimon.data.BinaryRow; -import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.manifest.PartitionEntry; +import org.apache.paimon.partition.PartitionPredicate; +import org.apache.paimon.predicate.PartitionPredicateVisitor; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.table.source.ChainSplit; import org.apache.paimon.table.source.DataFilePlan; @@ -87,6 +88,9 @@ public class ChainTableStreamScan implements StreamDataTableScan { /** Comparator for chain partition keys only. */ private final RecordComparator chainPartitionComparator; + /** Partition keys of the table, used to reject partition filters in streaming mode. */ + private final List partitionKeys; + /** * Checkpoint state: the next delta snapshot id to read. Null before Phase 1 completes; non-null * once Phase 1 is done or after a stateful restore. @@ -122,6 +126,7 @@ public ChainTableStreamScan(ChainGroupReadTable chainGroupReadTable) { this.chainPartitionComparator = CodeGenUtils.newRecordComparator( partitionProjector.chainPartitionType().getFieldTypes()); + this.partitionKeys = chainGroupReadTable.schema().partitionKeys(); } @Override @@ -174,45 +179,48 @@ private TableScan.Plan planStarting() { deltaSplitsByPartition = Collections.emptyMap(); } - // 2. Read all snapshot branch data, grouped by partition. - // Reuse batchScan.mainScan which has predicates/shard already applied. - Map> snapshotSplitsByPartition = - chainGroupReadTable.wrapped.snapshotManager().latestSnapshotId() != null - ? groupByPartition(batchScan.mainScan) - : Collections.emptyMap(); - - // 3. Find the latest snapshot partition per group (based on chain partition keys). - // Only output the latest snapshot partition and delta partitions after it. + // 2. List snapshot partitions (lightweight — partition metadata only, no file I/O). + // Find the latest chain partition per group, then scan only those partitions for files. + // This avoids reading file manifests for hundreds of historical partitions that will be + // discarded (only the latest per group is kept). Map latestChainPartitionPerGroup = new HashMap<>(); - for (BinaryRow partition : snapshotSplitsByPartition.keySet()) { - Object groupKey = toGroupKey(partition); - BinaryRow existingLatest = latestChainPartitionPerGroup.get(groupKey); - if (existingLatest == null - || chainPartitionComparator.compare( - partitionProjector.extractChainPartition(partition), - partitionProjector.extractChainPartition(existingLatest)) - > 0) { - latestChainPartitionPerGroup.put(groupKey, partition); + if (chainGroupReadTable.wrapped.snapshotManager().latestSnapshotId() != null) { + DataTableScan partitionListingScan = chainGroupReadTable.wrapped.newScan(); + applyPredicatesAndShard(partitionListingScan); + for (BinaryRow partition : partitionListingScan.listPartitions()) { + Object groupKey = toGroupKey(partition); + BinaryRow existingLatest = latestChainPartitionPerGroup.get(groupKey); + if (existingLatest == null + || chainPartitionComparator.compare( + partitionProjector.extractChainPartition(partition), + partitionProjector.extractChainPartition(existingLatest)) + > 0) { + latestChainPartitionPerGroup.put(groupKey, partition); + } } } + // 3. Scan file splits for latest snapshot partitions only. + List latestPartitions = new ArrayList<>(latestChainPartitionPerGroup.values()); + Map> snapshotSplitsByPartition; + if (!latestPartitions.isEmpty()) { + DataTableScan snapshotScan = chainGroupReadTable.wrapped.newScan(); + snapshotScan.withPartitionFilter(latestPartitions); + applyPredicatesAndShard(snapshotScan); + snapshotSplitsByPartition = groupByPartition(snapshotScan); + } else { + snapshotSplitsByPartition = Collections.emptyMap(); + } + // 4. Build ChainSplits: - // - For snapshot partitions: only include if chain key == latest for that group. - // - For delta partitions: include if (a) chain key > latest for that group, or + // - Snapshot partitions are already filtered to latest per group. + // - Delta partitions: include if (a) chain key > latest for that group, or // (b) no snapshot exists for that group. List allSplits = new ArrayList<>(); for (Map.Entry> entry : snapshotSplitsByPartition.entrySet()) { - BinaryRow partition = entry.getKey(); - Object groupKey = toGroupKey(partition); - BinaryRow latestPartition = latestChainPartitionPerGroup.get(groupKey); - if (chainPartitionComparator.compare( - partitionProjector.extractChainPartition(partition), - partitionProjector.extractChainPartition(latestPartition)) - == 0) { - for (DataSplit ds : entry.getValue()) { - allSplits.add(dataSplitToChainSplit(ds, snapshotBranch)); - } + for (DataSplit ds : entry.getValue()) { + allSplits.add(ChainSplit.from(ds, snapshotBranch)); } } @@ -229,7 +237,7 @@ private TableScan.Plan planStarting() { partitionProjector.extractChainPartition(latestPartition)) > 0) { for (DataSplit ds : entry.getValue()) { - allSplits.add(dataSplitToChainSplit(ds, deltaBranch)); + allSplits.add(ChainSplit.from(ds, deltaBranch)); } } } @@ -294,32 +302,56 @@ private Object toGroupKey(BinaryRow fullPartition) { return partitionProjector.extractGroupPartition(fullPartition); } - /** - * Converts a {@link DataSplit} to a {@link ChainSplit} where all files belong to the given - * branch. The partition value is preserved as-is (no rewriting). - */ - private static ChainSplit dataSplitToChainSplit(DataSplit dataSplit, String branch) { - HashMap fileBranchMapping = new HashMap<>(); - HashMap fileBucketPathMapping = new HashMap<>(); - for (DataFileMeta file : dataSplit.dataFiles()) { - fileBranchMapping.put(file.fileName(), branch); - fileBucketPathMapping.put(file.fileName(), dataSplit.bucketPath()); - } - return new ChainSplit( - dataSplit.partition(), - dataSplit.dataFiles(), - fileBranchMapping, - fileBucketPathMapping); - } - @Override public InnerTableScan withFilter(Predicate predicate) { + if (predicate == null) { + return this; + } + if (!partitionKeys.isEmpty() + && predicate.visit(new PartitionPredicateVisitor(partitionKeys))) { + throw new UnsupportedOperationException( + "Partition filter is not supported in chain table streaming read. " + + "The chain table streaming scan determines which partitions to read " + + "based on the chain-merge logic across snapshot and delta branches. " + + "Applying a partition filter would interfere with this logic. " + + "If you need to read a specific partition, use batch mode instead."); + } predicates.add(predicate); batchScan.withFilter(predicate); deltaStreamScan.withFilter(predicate); return this; } + @Override + public InnerTableScan withPartitionFilter(Map partitionSpec) { + throw new UnsupportedOperationException( + "Partition filter is not supported in chain table streaming read."); + } + + @Override + public InnerTableScan withPartitionFilter(List partitions) { + throw new UnsupportedOperationException( + "Partition filter is not supported in chain table streaming read."); + } + + @Override + public InnerTableScan withPartitionFilter(PartitionPredicate partitionPredicate) { + if (partitionPredicate != null) { + throw new UnsupportedOperationException( + "Partition filter is not supported in chain table streaming read."); + } + return this; + } + + @Override + public InnerTableScan withPartitionFilter(Predicate predicate) { + if (predicate != null) { + throw new UnsupportedOperationException( + "Partition filter is not supported in chain table streaming read."); + } + return this; + } + @Override public DataTableScan withShard(int indexOfThisSubtask, int numberOfParallelSubtasks) { shardIndex = indexOfThisSubtask; @@ -345,6 +377,9 @@ private void applyPredicatesAndShard(DataTableScan scan) { @Nullable @Override public Long checkpoint() { + if (startingDone) { + return deltaStreamScan.checkpoint(); + } return nextDeltaSnapshotId; } @@ -363,6 +398,8 @@ public void restore(@Nullable Long nextSnapshotId) { if (nextSnapshotId != null) { startingDone = true; deltaStreamScan.restore(nextSnapshotId); + } else { + startingDone = false; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ChainSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ChainSplit.java index dfa364f96a94..6b3512c615df 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/ChainSplit.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ChainSplit.java @@ -79,6 +79,24 @@ public Map fileBucketPathMapping() { return fileBucketPathMapping; } + /** + * Creates a {@link ChainSplit} from a {@link DataSplit} where all data files belong to the same + * branch. + */ + public static ChainSplit from(DataSplit dataSplit, String branch) { + HashMap fileBranchMapping = new HashMap<>(); + HashMap fileBucketPathMapping = new HashMap<>(); + for (DataFileMeta file : dataSplit.dataFiles()) { + fileBranchMapping.put(file.fileName(), branch); + fileBucketPathMapping.put(file.fileName(), dataSplit.bucketPath()); + } + return new ChainSplit( + dataSplit.partition(), + dataSplit.dataFiles(), + fileBranchMapping, + fileBucketPathMapping); + } + @Override public long rowCount() { long sum = 0; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java index 6212038072ca..a8c77aab7fd6 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java @@ -380,7 +380,11 @@ protected int assignSuggestedTask(ChainSplit split) { if (bucketDir.startsWith("bucket-")) { try { bucketId = Integer.parseInt(bucketDir.substring("bucket-".length())); - } catch (NumberFormatException ignored) { + } catch (NumberFormatException e) { + LOG.warn( + "Failed to parse bucket id from path '{}', falling back to 0.", + bucketPath, + e); } } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkChainTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkChainTableITCase.java index 734e2f5d3d86..978124320dd9 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkChainTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkChainTableITCase.java @@ -21,6 +21,7 @@ import org.apache.paimon.flink.sink.FlinkSinkBuilder; import org.apache.paimon.table.ChainTableStreamScan; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.BlockingIterator; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.CheckpointingOptions; @@ -42,9 +43,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -743,34 +744,64 @@ private void writeChangelogToBranch(String db, String tableName, String branch, * because it.next() blocks indefinitely when no data is available, and JUnit @Timeout cannot * interrupt it. */ - private List collectRows(CloseableIterator it, int n, int timeoutSeconds) - throws Exception { - List result = new ArrayList<>(); - for (int i = 0; i < n; i++) { - CompletableFuture future = - CompletableFuture.supplyAsync(() -> it.next().toString()); - try { - result.add(future.get(timeoutSeconds, TimeUnit.SECONDS)); - } catch (java.util.concurrent.TimeoutException e) { - future.cancel(true); - it.close(); - throw new AssertionError( - "Streaming read blocked for " - + timeoutSeconds - + "s after collecting " - + result.size() - + "/" - + n - + " rows. Collected so far: " - + result); + /** + * Collects {@code n} rows from a streaming iterator using the project-standard {@link + * BlockingIterator}. + */ + private List collectRows(CloseableIterator it, int n) throws Exception { + return BlockingIterator.of(it).collect(n, 30, TimeUnit.SECONDS).stream() + .map(Row::toString) + .collect(Collectors.toList()); + } + + /** + * Polls the given table until it contains at least {@code minRows} rows. Used instead of + * fixed-duration Thread.sleep to avoid flaky tests on slow CI. + */ + private void waitForRowCount(String tableName, int minRows) throws Exception { + long deadline = System.currentTimeMillis() + 60_000; + int count = 0; + while (System.currentTimeMillis() < deadline) { + List rows = sql("SELECT * FROM " + tableName); + count = rows.size(); + if (count >= minRows) { + return; } + Thread.sleep(1000); } - return result; + throw new AssertionError( + "Timed out waiting for " + minRows + " rows in " + tableName + ", got " + count); } - /** Default collectRows with 30s timeout. */ - private List collectRows(CloseableIterator it, int n) throws Exception { - return collectRows(it, n, 30); + /** Polls until all tasks of the given job are in RUNNING state. */ + private void waitForJobRunning(JobClient jobClient) throws Exception { + Field field = jobClient.getClass().getDeclaredField("miniCluster"); + field.setAccessible(true); + MiniCluster miniCluster = (MiniCluster) field.get(jobClient); + JobID jobID = jobClient.getJobID(); + + long deadline = System.currentTimeMillis() + 60_000; + while (System.currentTimeMillis() < deadline) { + AtomicBoolean allRunning = new AtomicBoolean(true); + miniCluster + .getExecutionGraph(jobID) + .thenAccept( + eg -> + eg.getAllExecutionVertices() + .forEach( + v -> { + if (v.getExecutionState() + != ExecutionState.RUNNING) { + allRunning.set(false); + } + })) + .get(); + if (allRunning.get()) { + return; + } + Thread.sleep(1000); + } + throw new AssertionError("Timed out waiting for job " + jobID + " to reach RUNNING state"); } /** @@ -841,7 +872,6 @@ public void testStreamingReadChainTableLifecycleWithInputChangelog() throws Exce Row.ofKind(RowKind.INSERT, 6L, 1L, "new_6", "20250809"), Row.ofKind(RowKind.INSERT, 7L, 1L, "new_7", "20250809")); - Thread.sleep(2000); List phase2 = collectRows(it, 4); // changelog-producer=input: explicit -U/+U for updates assertThat(phase2) @@ -857,8 +887,6 @@ public void testStreamingReadChainTableLifecycleWithInputChangelog() throws Exce + " VALUES (1, 1, 'base_1'), (2, 1, 'base_2'), (3, 1, 'base_3')," + " (4, 1, 'base_4'), (5, 1, 'base_5')"); - Thread.sleep(2000); - // Write delta AFTER snapshot — this proves snapshot writes don't trigger output. // If snapshot writes were detected, we'd see duplicate or unexpected rows. writeChangelogToBranch( @@ -867,7 +895,6 @@ public void testStreamingReadChainTableLifecycleWithInputChangelog() throws Exce "delta", Row.ofKind(RowKind.INSERT, 100L, 1L, "phase3_probe", "20250810")); - Thread.sleep(2000); List phase3 = collectRows(it, 1); assertThat(phase3) .as("Only delta write should produce output, snapshot OVERWRITE should be ignored") @@ -881,7 +908,6 @@ public void testStreamingReadChainTableLifecycleWithInputChangelog() throws Exce Row.ofKind(RowKind.INSERT, 8L, 1L, "new_8", "20250810"), Row.ofKind(RowKind.INSERT, 9L, 1L, "new_9", "20250810")); - Thread.sleep(2000); List phase4 = collectRows(it, 2); assertThat(phase4) .containsExactlyInAnyOrder( @@ -922,7 +948,6 @@ public void testStreamingReadChainTableLifecycleWithInputChangelog() throws Exce Row.ofKind(RowKind.INSERT, 10L, 1L, "new_10", "20250811"), Row.ofKind(RowKind.INSERT, 11L, 1L, "new_11", "20250811")); - Thread.sleep(2000); List phase5b = collectRows(it2, 2); assertThat(phase5b) .containsExactlyInAnyOrder( @@ -934,11 +959,11 @@ public void testStreamingReadChainTableLifecycleWithInputChangelog() throws Exce /** * Tests stateful restart of a chain table streaming read job using Flink checkpoint/restore. * - *

Phase 1: Write initial delta data, start streaming job, verify read. Phase 2: Trigger a - * checkpoint (saves enumerator state including nextDeltaSnapshotId), then cancel the job. Phase - * 3: Write new delta data while the job is down. Phase 4: Restart from checkpoint — the - * restored scan should skip doFullLoad() and only read Phase 3's new data. Phase 5: Verify - * incremental streaming continues to work after restore. + *

Phase 1: Write initial delta data, start streaming job. Phase 2: Write incremental delta, + * let Phase 2 consume it, then checkpoint and cancel. Phase 3: Write new delta data while the + * job is down. Phase 4: Restart from checkpoint — the restored scan must NOT re-read the + * already-consumed delta (verifies checkpoint() returns the advanced cursor, not the stale + * Phase 1 boundary). Phase 5: Verify incremental streaming continues after restore. */ @Test @Timeout(180) @@ -995,7 +1020,10 @@ public void testStreamingReadChainTableStatefulRestart() throws Exception { config.set( CheckpointingOptions.EXTERNALIZED_CHECKPOINT_RETENTION, ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION); - config.removeKey("execution.checkpointing.interval"); + // Enable auto-checkpointing (1s) so Phase 2 data is committed before we take the + // savepoint. This ensures the enumerator's delta cursor has advanced past + // delta@20250809, which is the scenario the checkpoint() regression would break. + config.setString("execution.checkpointing.interval", "1000"); // Same SQL for both phases → operator graph matches → state recovery works String streamSql = "INSERT INTO chain_restart_sink SELECT * FROM chain_restart"; @@ -1015,12 +1043,24 @@ public void testStreamingReadChainTableStatefulRestart() throws Exception { //noinspection OptionalGetWithoutIsPresent JobClient jobClient = tableResult.getJobClient().get(); - // Wait for data to flow to the sink - Thread.sleep(5000); + // Wait for streaming job to be fully running before writing Phase 2 data. + waitForJobRunning(jobClient); - // === Phase 2: Trigger checkpoint and cancel job === - // The checkpoint commits data to the sink table. - Thread.sleep(3000); + // === Phase 2: Write incremental delta, let Phase 2 consume it, THEN checkpoint === + // This exercises the checkpoint() regression: if checkpoint() returns the stale + // Phase 1 boundary instead of the advanced delta cursor, restore would re-read + // delta@20250809 and produce duplicates. + sql( + "INSERT INTO `chain_restart$branch_delta` PARTITION (dt = '20250809')" + + " VALUES (4, 1, 'new_4'), (5, 1, 'new_5')"); + + // Wait for auto-checkpoint to commit Phase 2 data to the sink. This proves the + // enumerator's scan has consumed delta@20250809 and its checkpoint() returned the + // advanced cursor — the exact scenario the regression would break. + waitForRowCount("chain_restart_sink", 7); + + // Create a savepoint for restart. The enumerator state now includes the advanced + // delta cursor (past delta@20250809). String checkpointPath = triggerCheckpoint(jobClient); java.io.File cpFile = @@ -1030,31 +1070,37 @@ public void testStreamingReadChainTableStatefulRestart() throws Exception { } assertThat(cpFile.exists()).as("Checkpoint directory should exist: " + cpFile).isTrue(); - Thread.sleep(2000); - jobClient.cancel().get(); - - // Verify Phase 1 data via batch read from the sink table. - // Starting should include both snapshot-only (dt=20250807) and delta (dt=20250808). - List phase1 = + // Verify Phase 1+2 data (committed by checkpoint). + List phase1and2 = sql("SELECT * FROM chain_restart_sink").stream() .map(Row::toString) - .collect(java.util.stream.Collectors.toList()); - System.err.println("[TEST] Phase 1 sink rows: " + phase1); - assertThat(phase1) - .as("Phase 1: starting includes snapshot-only and delta partitions") + .collect(Collectors.toList()); + assertThat(phase1and2) + .as("Phase 1+2: sink has snapshot, delta@20250808, and delta@20250809") .containsExactlyInAnyOrder( "+I[1, 1, base_1, 20250808]", "+I[2, 1, base_2, 20250808]", "+I[3, 1, base_3, 20250808]", "+I[10, 1, snap_10, 20250807]", - "+I[11, 1, snap_11, 20250807]"); + "+I[11, 1, snap_11, 20250807]", + "+I[4, 1, new_4, 20250809]", + "+I[5, 1, new_5, 20250809]"); + + jobClient.cancel().get(); + + // Disable auto-checkpointing before restart to avoid conflicts between + // auto-checkpoints and manual triggerCheckpoint() on the new job. + config.removeKey("execution.checkpointing.interval"); // === Phase 3: Write new delta data while job is stopped === sql( - "INSERT INTO `chain_restart$branch_delta` PARTITION (dt = '20250809')" - + " VALUES (4, 1, 'new_4'), (5, 1, 'new_5')"); + "INSERT INTO `chain_restart$branch_delta` PARTITION (dt = '20250810')" + + " VALUES (6, 1, 'new_6'), (7, 1, 'new_7')"); // === Phase 4: Restart from checkpoint === + // The restored scan should NOT re-read delta@20250809 (already consumed before + // checkpoint). If checkpoint() returned the stale Phase 1 boundary, delta@20250809 + // would be re-read and produce duplicates. sEnv.getConfig() .getConfiguration() .setString("execution.state-recovery.path", checkpointPath); @@ -1063,48 +1109,46 @@ public void testStreamingReadChainTableStatefulRestart() throws Exception { //noinspection OptionalGetWithoutIsPresent JobClient jobClient2 = tableResult2.getJobClient().get(); - // Wait for restored scan to produce and commit data - Thread.sleep(5000); - - // Trigger checkpoint to commit Phase 4 data - Thread.sleep(3000); + // Trigger checkpoint to commit Phase 4 data (restored scan output + Phase 3 delta). + waitForJobRunning(jobClient2); triggerCheckpoint(jobClient2); - Thread.sleep(2000); + + // Poll sink until checkpoint commits all 9 rows (7 from Phase 1+2 + 2 from Phase 3). + waitForRowCount("chain_restart_sink", 9); // Read all records from the sink table List phase4 = sql("SELECT * FROM chain_restart_sink").stream() .map(Row::toString) - .collect(java.util.stream.Collectors.toList()); - System.err.println("[TEST] Phase 4 sink rows (" + phase4.size() + "): " + phase4); + .collect(Collectors.toList()); // Verify new data is present assertThat(phase4) .as("Stateful restart: sink should contain new delta data") - .contains("+I[4, 1, new_4, 20250809]", "+I[5, 1, new_5, 20250809]"); + .contains("+I[6, 1, new_6, 20250810]", "+I[7, 1, new_7, 20250810]"); - // Verify total count: should be 7 (5 Phase 1 + 2 Phase 4), not more (duplicates) + // Verify total count: 7 from Phase 1+2 + 2 from Phase 3 = 9, no duplicates assertThat(phase4.size()) - .as("Should have exactly 7 records (no duplicates from state recovery)") - .isEqualTo(7); + .as("Should have exactly 9 records (no duplicates from state recovery)") + .isEqualTo(9); // === Phase 5: Verify incremental streaming continues after restore === sql( - "INSERT INTO `chain_restart$branch_delta` PARTITION (dt = '20250810')" - + " VALUES (6, 1, 'new_6')"); + "INSERT INTO `chain_restart$branch_delta` PARTITION (dt = '20250811')" + + " VALUES (8, 1, 'new_8')"); - Thread.sleep(5000); triggerCheckpoint(jobClient2); - Thread.sleep(2000); + + // Poll sink until checkpoint commits the new incremental row. + waitForRowCount("chain_restart_sink", 10); List phase5 = sql("SELECT * FROM chain_restart_sink").stream() .map(Row::toString) - .collect(java.util.stream.Collectors.toList()); - System.err.println("[TEST] Phase 5 sink rows (" + phase5.size() + "): " + phase5); + .collect(Collectors.toList()); assertThat(phase5) .as("Incremental streaming should continue after restore") - .contains("+I[6, 1, new_6, 20250810]"); + .contains("+I[8, 1, new_8, 20250811]"); jobClient2.cancel().get(); @@ -1191,7 +1235,6 @@ public void testStreamingReadWithSnapshotDeltaOverlap() throws Exception { "delta", Row.ofKind(RowKind.INSERT, 20L, 1L, "incr_20", "20250810")); - Thread.sleep(2000); List incr = collectRows(it, 1); assertThat(incr) .as("Incremental: new delta data should stream through") @@ -1391,7 +1434,6 @@ public void testStreamingReadWithGroupPartition() throws Exception { + " PARTITION (region = 'CN', dt = '20250809')" + " VALUES (3, 1, 'cn_3')"); - Thread.sleep(2000); List incr = collectRows(it, 1); assertThat(incr) .as("Incremental: new CN delta should stream through") @@ -1403,7 +1445,6 @@ public void testStreamingReadWithGroupPartition() throws Exception { + " PARTITION (region = 'US', dt = '20250809')" + " VALUES (13, 1, 'us_13')"); - Thread.sleep(2000); List incr2 = collectRows(it, 1); assertThat(incr2) .as("Incremental: new US delta should stream through") @@ -1575,78 +1616,6 @@ public void testRestoreNullScanAll() throws Exception { assertThat(scan.checkpoint()).as("Checkpoint should be set after new starting").isNotNull(); } - /** Tests streaming read with WHERE clause (partition predicate forwarding). */ - @Test - @Timeout(120) - public void testStreamingReadWithFilter() throws Exception { - sql( - "CREATE TABLE chain_filter (" - + " k BIGINT, seq BIGINT, v STRING, dt STRING" - + ") PARTITIONED BY (dt) WITH (" - + " 'primary-key' = 'dt,k'," - + " 'bucket-key' = 'k'," - + " 'bucket' = '2'," - + " 'sequence.field' = 'seq'," - + " 'merge-engine' = 'deduplicate'," - + " 'changelog-producer' = 'input'," - + " 'chain-table.enabled' = 'true'," - + " 'partition.timestamp-pattern' = '$dt'," - + " 'partition.timestamp-formatter' = 'yyyyMMdd'," - + " 'continuous.discovery-interval' = '1ms'" - + ")"); - - String db = tEnv.getCurrentDatabase(); - sql("CALL sys.create_branch('%s.chain_filter', 'snapshot')", db); - sql("CALL sys.create_branch('%s.chain_filter', 'delta')", db); - for (String tbl : - new String[] { - "chain_filter", "chain_filter$branch_snapshot", "chain_filter$branch_delta" - }) { - sql( - "ALTER TABLE `%s` SET (" - + " 'scan.fallback-snapshot-branch' = 'snapshot'," - + " 'scan.fallback-delta-branch' = 'delta')", - tbl); - } - - // Write snapshot data for dt=20250807 and dt=20250808 - sql( - "INSERT INTO `chain_filter$branch_snapshot` PARTITION (dt = '20250807')" - + " VALUES (1, 1, 'snap_1')"); - sql( - "INSERT INTO `chain_filter$branch_delta` PARTITION (dt = '20250808')" - + " VALUES (2, 1, 'delta_2'), (3, 1, 'delta_3')"); - sql( - "INSERT INTO `chain_filter$branch_delta` PARTITION (dt = '20250809')" - + " VALUES (4, 1, 'delta_4')"); - - // Streaming read with WHERE clause — only dt=20250808 - CloseableIterator it = - sEnv.executeSql("SELECT * FROM chain_filter WHERE dt = '20250808'").collect(); - - // Starting should only return dt=20250808 data (delta-only partition) - List startingRows = collectRows(it, 2); - assertThat(startingRows) - .as("Starting with WHERE dt=20250808 should only return that partition") - .containsExactlyInAnyOrder( - "+I[2, 1, delta_2, 20250808]", "+I[3, 1, delta_3, 20250808]"); - - // Incremental: write to dt=20250808 (should stream through) - writeChangelogToBranch( - db, - "chain_filter", - "delta", - Row.ofKind(RowKind.INSERT, 5L, 1L, "incr_5", "20250808")); - - Thread.sleep(2000); - List incr = collectRows(it, 1); - assertThat(incr) - .as("Incremental write to dt=20250808 should stream through") - .containsExactlyInAnyOrder("+I[5, 1, incr_5, 20250808]"); - - it.close(); - } - /** Tests starting when delta branch is empty (only snapshot data). */ @Test @Timeout(120) @@ -1703,7 +1672,6 @@ public void testStreamingReadEmptyDelta() throws Exception { "delta", Row.ofKind(RowKind.INSERT, 3L, 1L, "new_3", "20250808")); - Thread.sleep(2000); List incr = collectRows(it, 1); assertThat(incr) .as("First delta write should stream through after snapshot-only starting") @@ -2007,4 +1975,151 @@ public void testStreamingReadRestoreAfterNewData() throws Exception { .as("Restore(null) should re-run Phase 1 with current data") .isNotEmpty(); } + + /** + * T5: Tests that chain table streaming read rejects partition filters via {@code withFilter}. + * + *

Partition filters interfere with the chain table Phase 1 logic (which determines the + * latest snapshot partition per group). This test verifies that a partition-only predicate is + * rejected with an UnsupportedOperationException. + */ + @Test + @Timeout(60) + public void testStreamingReadRejectsPartitionFilter() throws Exception { + createChainTable("chain_pf_partition"); + setupChainTableBranches("chain_pf_partition"); + + sql( + "INSERT INTO `chain_pf_partition$branch_delta` PARTITION (dt = '20250808')" + + " VALUES (1, 1, 'v1')"); + + FileStoreTable table = paimonTable("chain_pf_partition"); + ChainTableStreamScan scan = (ChainTableStreamScan) table.newStreamScan(); + + // dt is the 4th field (index 3) in the schema: t1(0), t2(1), t3(2), dt(3) + org.apache.paimon.predicate.PredicateBuilder builder = + new org.apache.paimon.predicate.PredicateBuilder(table.rowType()); + + // Partition-only filter should be rejected + org.apache.paimon.predicate.Predicate partitionFilter = + builder.equal(3, org.apache.paimon.data.BinaryString.fromString("20250808")); + assertThatThrownBy(() -> scan.withFilter(partitionFilter)) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining("Partition filter is not supported"); + } + + /** + * T6: Tests that non-partition filters work end-to-end in chain table streaming reads. + * + *

Verifies: (1) {@code withFilter} on a data column is accepted at the scan API level, (2) + * streaming {@code SELECT ... WHERE v = 'hello'} filters out non-matching rows, (3) the filter + * continues to apply to incrementally written data. + */ + @Test + @Timeout(120) + public void testStreamingReadWithNonPartitionFilter() throws Exception { + sql( + "CREATE TABLE chain_data_filter (" + + " k BIGINT, seq BIGINT, v STRING, dt STRING" + + ") PARTITIONED BY (dt) WITH (" + + " 'primary-key' = 'dt,k'," + + " 'bucket-key' = 'k'," + + " 'bucket' = '2'," + + " 'sequence.field' = 'seq'," + + " 'merge-engine' = 'deduplicate'," + + " 'changelog-producer' = 'input'," + + " 'chain-table.enabled' = 'true'," + + " 'partition.timestamp-pattern' = '$dt'," + + " 'partition.timestamp-formatter' = 'yyyyMMdd'," + + " 'continuous.discovery-interval' = '1ms'" + + ")"); + + String db = tEnv.getCurrentDatabase(); + sql("CALL sys.create_branch('%s.chain_data_filter', 'snapshot')", db); + sql("CALL sys.create_branch('%s.chain_data_filter', 'delta')", db); + for (String tbl : + new String[] { + "chain_data_filter", + "chain_data_filter$branch_snapshot", + "chain_data_filter$branch_delta" + }) { + sql( + "ALTER TABLE `%s` SET (" + + " 'scan.fallback-snapshot-branch' = 'snapshot'," + + " 'scan.fallback-delta-branch' = 'delta')", + tbl); + } + + // Write initial delta data with mixed values of v + sql( + "INSERT INTO `chain_data_filter$branch_delta` PARTITION (dt = '20250808')" + + " VALUES (1, 1, 'hello'), (2, 1, 'world'), (3, 1, 'hello'), (4, 1, 'foo')"); + + // Streaming read with WHERE on data column v — should only return v='hello' rows + CloseableIterator it = + sEnv.executeSql("SELECT * FROM chain_data_filter WHERE v = 'hello'").collect(); + + List startingRows = collectRows(it, 2); + assertThat(startingRows) + .as("Starting with WHERE v='hello' should only return matching rows") + .containsExactlyInAnyOrder( + "+I[1, 1, hello, 20250808]", "+I[3, 1, hello, 20250808]"); + + // Incremental: write more data with mixed v values + writeChangelogToBranch( + db, + "chain_data_filter", + "delta", + Row.ofKind(RowKind.INSERT, 5L, 1L, "hello", "20250809"), + Row.ofKind(RowKind.INSERT, 6L, 1L, "bar", "20250809")); + + List incrRows = collectRows(it, 1); + assertThat(incrRows) + .as("Incremental: only v='hello' row should stream through") + .containsExactlyInAnyOrder("+I[5, 1, hello, 20250809]"); + + it.close(); + } + + /** + * T7: Tests that chain table streaming read rejects partition filters via {@code + * withPartitionFilter}. + * + *

The {@code withPartitionFilter} API (used for {@code scan.partitions} table option) should + * also be rejected. + */ + @Test + @Timeout(60) + public void testStreamingReadRejectsWithPartitionFilter() throws Exception { + createChainTable("chain_pf_api"); + setupChainTableBranches("chain_pf_api"); + + sql( + "INSERT INTO `chain_pf_api$branch_delta` PARTITION (dt = '20250808')" + + " VALUES (1, 1, 'v1')"); + + FileStoreTable table = paimonTable("chain_pf_api"); + ChainTableStreamScan scan = (ChainTableStreamScan) table.newStreamScan(); + + // withPartitionFilter(Map) should be rejected + assertThatThrownBy( + () -> + scan.withPartitionFilter( + java.util.Collections.singletonMap("dt", "20250808"))) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining("Partition filter is not supported"); + + // withPartitionFilter(PartitionPredicate) should be rejected + org.apache.paimon.predicate.PredicateBuilder ppBuilder = + new org.apache.paimon.predicate.PredicateBuilder( + table.schema().logicalPartitionType()); + org.apache.paimon.partition.PartitionPredicate pp = + org.apache.paimon.partition.PartitionPredicate.fromPredicate( + table.schema().logicalPartitionType(), + ppBuilder.equal( + 0, org.apache.paimon.data.BinaryString.fromString("20250808"))); + assertThatThrownBy(() -> scan.withPartitionFilter(pp)) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining("Partition filter is not supported"); + } } From 0417a172ae326301cddb9ff179666cbfcb46a40d Mon Sep 17 00:00:00 2001 From: Yunfeng Zhou Date: Sun, 21 Jun 2026 13:39:00 +0800 Subject: [PATCH 3/3] Fix comments --- .../paimon/table/ChainTableStreamScan.java | 1 - .../flink/source/FlinkSourceBuilder.java | 6 + .../paimon/flink/FlinkChainTableITCase.java | 139 +++++++++++++++--- 3 files changed, 122 insertions(+), 24 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ChainTableStreamScan.java b/paimon-core/src/main/java/org/apache/paimon/table/ChainTableStreamScan.java index ab3c3a693e74..7378744cf5a4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ChainTableStreamScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ChainTableStreamScan.java @@ -186,7 +186,6 @@ private TableScan.Plan planStarting() { Map latestChainPartitionPerGroup = new HashMap<>(); if (chainGroupReadTable.wrapped.snapshotManager().latestSnapshotId() != null) { DataTableScan partitionListingScan = chainGroupReadTable.wrapped.newScan(); - applyPredicatesAndShard(partitionListingScan); for (BinaryRow partition : partitionListingScan.listPartitions()) { Object groupKey = toGroupKey(partition); BinaryRow existingLatest = latestChainPartitionPerGroup.get(groupKey); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java index 3e96dec1ea50..2e9c8ae494c2 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java @@ -326,6 +326,12 @@ public DataStream build() { TableScanUtils.streamingReadingValidate(table); if (conf.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_ENABLED)) { + if (conf.get(CoreOptions.CHAIN_TABLE_ENABLED)) { + throw new UnsupportedOperationException( + "Chain table streaming is not compatible with checkpoint-align mode. " + + "Please disable 'source.checkpoint-align.enabled' when reading " + + "a chain table in streaming mode."); + } return buildAlignedContinuousFileSource(); } else if (conf.contains(CoreOptions.CONSUMER_ID) && conf.get(CoreOptions.CONSUMER_CONSISTENCY_MODE) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkChainTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkChainTableITCase.java index 978124320dd9..0f05e3b00c97 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkChainTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkChainTableITCase.java @@ -18,9 +18,17 @@ package org.apache.paimon.flink; +import org.apache.paimon.CoreOptions; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.BinaryString; import org.apache.paimon.flink.sink.FlinkSinkBuilder; +import org.apache.paimon.partition.PartitionPredicate; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.table.ChainTableStreamScan; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.source.DataTableScan; +import org.apache.paimon.table.source.TableScan; import org.apache.paimon.utils.BlockingIterator; import org.apache.flink.api.common.JobID; @@ -1533,13 +1541,13 @@ public void testRestoreScanAll() throws Exception { ChainTableStreamScan scan = (ChainTableStreamScan) table.newStreamScan(); // Phase 1: starting - org.apache.paimon.table.source.TableScan.Plan plan1 = scan.plan(); + TableScan.Plan plan1 = scan.plan(); assertThat(plan1.splits()).as("Phase 1 should produce splits").isNotEmpty(); Long checkpoint = scan.checkpoint(); assertThat(checkpoint).as("Checkpoint should be non-null after Phase 1").isNotNull(); // Phase 2: no new data → empty plan - org.apache.paimon.table.source.TableScan.Plan plan2 = scan.plan(); + TableScan.Plan plan2 = scan.plan(); assertThat(plan2.splits()).as("Phase 2 with no new data should be empty").isEmpty(); // restore(id, scanAll=true): should reset to starting, preserve delta position @@ -1549,7 +1557,7 @@ public void testRestoreScanAll() throws Exception { .isEqualTo(checkpoint); // Starting should run again - org.apache.paimon.table.source.TableScan.Plan plan3 = scan.plan(); + TableScan.Plan plan3 = scan.plan(); assertThat(plan3.splits()) .as("Starting should produce splits again after restore(id, true)") .isNotEmpty(); @@ -1611,7 +1619,7 @@ public void testRestoreNullScanAll() throws Exception { .isNull(); // Starting should run again - org.apache.paimon.table.source.TableScan.Plan plan = scan.plan(); + TableScan.Plan plan = scan.plan(); assertThat(plan.splits()).as("Starting should produce splits").isNotEmpty(); assertThat(scan.checkpoint()).as("Checkpoint should be set after new starting").isNotNull(); } @@ -1779,12 +1787,12 @@ public void testWithShardForwarding() throws Exception { // Shard 0 of 2: should get a subset of data ChainTableStreamScan scan0 = (ChainTableStreamScan) table.newStreamScan(); scan0.withShard(0, 2); - org.apache.paimon.table.source.TableScan.Plan plan0 = scan0.plan(); + TableScan.Plan plan0 = scan0.plan(); // Shard 1 of 2: should get the other subset ChainTableStreamScan scan1 = (ChainTableStreamScan) table.newStreamScan(); scan1.withShard(1, 2); - org.apache.paimon.table.source.TableScan.Plan plan1 = scan1.plan(); + TableScan.Plan plan1 = scan1.plan(); // Together both shards should produce non-empty results // (exact split depends on bucket hashing, but total should cover all data) @@ -1831,7 +1839,7 @@ public void testStreamingReadBothBranchesEmpty() throws Exception { // Both branches are empty — Phase 1 should produce no splits FileStoreTable table = paimonTable("chain_both_empty"); ChainTableStreamScan scan = (ChainTableStreamScan) table.newStreamScan(); - org.apache.paimon.table.source.TableScan.Plan plan1 = scan.plan(); + TableScan.Plan plan1 = scan.plan(); assertThat(plan1.splits()).as("Phase 1 with both branches empty should be empty").isEmpty(); // Phase 2: write new delta data and verify it streams through @@ -1839,7 +1847,7 @@ public void testStreamingReadBothBranchesEmpty() throws Exception { "INSERT INTO `chain_both_empty$branch_delta` PARTITION (dt = '20250808')" + " VALUES (1, 1, 'v1'), (2, 1, 'v2')"); - org.apache.paimon.table.source.TableScan.Plan plan2 = scan.plan(); + TableScan.Plan plan2 = scan.plan(); assertThat(plan2.splits()).as("Phase 2 should pick up new delta data").isNotEmpty(); } @@ -1888,7 +1896,7 @@ public void testStreamingReadDeltaOverwriteInPhase2() throws Exception { ChainTableStreamScan scan = (ChainTableStreamScan) table.newStreamScan(); // Phase 1: read initial delta data - org.apache.paimon.table.source.TableScan.Plan plan1 = scan.plan(); + TableScan.Plan plan1 = scan.plan(); assertThat(plan1.splits()).as("Phase 1 should produce splits").isNotEmpty(); // Phase 2: OVERWRITE the same partition on delta branch. @@ -1900,7 +1908,7 @@ public void testStreamingReadDeltaOverwriteInPhase2() throws Exception { // Verify scan.plan() does not throw after OVERWRITE for (int i = 0; i < 3; i++) { - org.apache.paimon.table.source.TableScan.Plan planN = scan.plan(); + TableScan.Plan planN = scan.plan(); assertThat(planN).as("plan() should not return null after OVERWRITE").isNotNull(); } } @@ -1952,7 +1960,7 @@ public void testStreamingReadRestoreAfterNewData() throws Exception { ChainTableStreamScan scan = (ChainTableStreamScan) table.newStreamScan(); // Phase 1: snapshot at dt=20250807 (latest), delta at dt=20250808 - org.apache.paimon.table.source.TableScan.Plan plan1 = scan.plan(); + TableScan.Plan plan1 = scan.plan(); int phase1Size = plan1.splits().size(); assertThat(phase1Size).as("Phase 1 should produce splits").isGreaterThan(0); @@ -1970,7 +1978,7 @@ public void testStreamingReadRestoreAfterNewData() throws Exception { // - Delta dt=20250808 excluded (older than latest snapshot dt=20250809) // - Delta dt=20250810 included (newer than dt=20250809) scan.restore(null); - org.apache.paimon.table.source.TableScan.Plan plan2 = scan.plan(); + TableScan.Plan plan2 = scan.plan(); assertThat(plan2.splits()) .as("Restore(null) should re-run Phase 1 with current data") .isNotEmpty(); @@ -1997,12 +2005,10 @@ public void testStreamingReadRejectsPartitionFilter() throws Exception { ChainTableStreamScan scan = (ChainTableStreamScan) table.newStreamScan(); // dt is the 4th field (index 3) in the schema: t1(0), t2(1), t3(2), dt(3) - org.apache.paimon.predicate.PredicateBuilder builder = - new org.apache.paimon.predicate.PredicateBuilder(table.rowType()); + PredicateBuilder builder = new PredicateBuilder(table.rowType()); // Partition-only filter should be rejected - org.apache.paimon.predicate.Predicate partitionFilter = - builder.equal(3, org.apache.paimon.data.BinaryString.fromString("20250808")); + Predicate partitionFilter = builder.equal(3, BinaryString.fromString("20250808")); assertThatThrownBy(() -> scan.withFilter(partitionFilter)) .isInstanceOf(UnsupportedOperationException.class) .hasMessageContaining("Partition filter is not supported"); @@ -2110,16 +2116,103 @@ public void testStreamingReadRejectsWithPartitionFilter() throws Exception { .hasMessageContaining("Partition filter is not supported"); // withPartitionFilter(PartitionPredicate) should be rejected - org.apache.paimon.predicate.PredicateBuilder ppBuilder = - new org.apache.paimon.predicate.PredicateBuilder( - table.schema().logicalPartitionType()); - org.apache.paimon.partition.PartitionPredicate pp = - org.apache.paimon.partition.PartitionPredicate.fromPredicate( + PredicateBuilder ppBuilder = new PredicateBuilder(table.schema().logicalPartitionType()); + PartitionPredicate pp = + PartitionPredicate.fromPredicate( table.schema().logicalPartitionType(), - ppBuilder.equal( - 0, org.apache.paimon.data.BinaryString.fromString("20250808"))); + ppBuilder.equal(0, BinaryString.fromString("20250808"))); assertThatThrownBy(() -> scan.withPartitionFilter(pp)) .isInstanceOf(UnsupportedOperationException.class) .hasMessageContaining("Partition filter is not supported"); } + + /** + * Tests that chain table streaming rejects checkpoint-align mode at job construction time, not + * at runtime. ChainSplit has no snapshotId and cannot participate in snapshot-aligned + * checkpoint grouping. + */ + @Test + public void testStreamingReadRejectsCheckpointAlign() throws Exception { + createChainTable("chain_align"); + setupChainTableBranches("chain_align"); + + sql( + "INSERT INTO `chain_align$branch_delta` PARTITION (dt = '20250808')" + + " VALUES (1, 1, 'v1')"); + + // Setting checkpoint-align.enabled on a chain table streaming read should throw + // at job construction time, not at runtime when ChainSplits are encountered. + assertThatThrownBy( + () -> + sEnv.executeSql( + "SELECT * FROM chain_align " + + "/*+ OPTIONS('source.checkpoint-align.enabled' = 'true') */")) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining( + "Chain table streaming is not compatible with checkpoint-align"); + } + + /** + * Tests that primary-key predicates do NOT affect partition discovery in chain table streaming + * Phase 1. This is the scenario from JingsongLi's review comment: + * + *

"if the latest snapshot partition no longer has k=1 but an older delta partition still + * does, SELECT ... WHERE k=1 can make this listing miss the latest snapshot partition and then + * include the old delta row, even though that partition should be considered outdated." + * + *

The test creates: snapshot@20250808 with t1=1,2; snapshot@20250809 with t1=3,4 (no t1=1); + * delta@20250808 with t1=1. Then filters on t1=1 (a primary key field). Partition discovery + * must still see both snapshot partitions so the chain boundary is correct. + */ + @Test + public void testStreamingReadPKFilterDoesNotAffectPartitionDiscovery() throws Exception { + createChainTable("chain_pk_filter"); + setupChainTableBranches("chain_pk_filter"); + + // Snapshot@20250808: has t1=1 and t1=2 + sql( + "INSERT INTO `chain_pk_filter$branch_snapshot` PARTITION (dt = '20250808')" + + " VALUES (1, 1, 'v1'), (2, 1, 'v2')"); + // Snapshot@20250809: has t1=3 and t1=4 (NO t1=1) + sql( + "INSERT INTO `chain_pk_filter$branch_snapshot` PARTITION (dt = '20250809')" + + " VALUES (3, 1, 'v3'), (4, 1, 'v4')"); + + // Delta@20250808: has t1=1 + sql( + "INSERT INTO `chain_pk_filter$branch_delta` PARTITION (dt = '20250808')" + + " VALUES (1, 2, 'delta_v1')"); + + // --- Part 1: Verify listPartitions() with a PK predicate --- + FileStoreTable mainTable = paimonTable("chain_pk_filter"); + FileStoreTable snapshotTable = + mainTable.copy( + java.util.Collections.singletonMap(CoreOptions.BRANCH.key(), "snapshot")); + + DataTableScan scan = snapshotTable.newScan(); + PredicateBuilder builder = new PredicateBuilder(snapshotTable.rowType()); + // t1 is field index 0, part of primary key (dt, t1). + // Only snapshot@20250808 has t1=1. + Predicate t1Equals1 = builder.equal(0, 1L); + scan.withFilter(t1Equals1); + + // listPartitions() must return BOTH snapshot partitions even though only + // 20250808 contains t1=1. If it returned only 20250808, the chain boundary + // would be wrong and stale data could be included. + List partitions = scan.listPartitions(); + assertThat(partitions) + .as( + "listPartitions() must return all snapshot partitions even with a PK filter. " + + "If only dt=20250808 is returned, the chain boundary is wrong.") + .hasSize(2); + + // --- Part 2: Verify filtered batch SELECT returns correct data --- + // Chain-merged batch view: snapshot@20250808(t1=1,2), snapshot@20250809(t1=3,4). + // WHERE t1 = 1 should return only the snapshot row (t1=1, dt=20250808). + List filtered = collectResult("SELECT * FROM chain_pk_filter WHERE t1 = 1"); + assertThat(filtered) + .as("WHERE t1=1 should find the snapshot row at dt=20250808") + .hasSize(1) + .containsExactly("+I[1, 1, v1, 20250808]"); + } }