53 changes: 52 additions & 1 deletion docs/docs/primary-key-table/chain-table.md
@@ -129,7 +129,6 @@ ALTER TABLE `default`.`t$branch_delta` SET (
Notice that:
- Chain tables are only supported for primary key tables, so you must define `bucket` and `bucket-key` for the table.
- The schema must be consistent across all branches of a chain table.
- Both Spark and Flink batch read/write are supported. Flink streaming read/write is not supported.
- Chain compact is not supported yet; it will be supported later.
- Deletion vectors are not supported for chain tables.

@@ -191,6 +190,58 @@ you will get the following result:
+---+----+-----+
```

## Streaming Read

Chain tables support Flink streaming read. A streaming read job operates in two phases:

1. **Full load phase**: Produces a full result by reading, for each partition group, the most
recent snapshot partition and the delta partitions that come after it. Older snapshot
partitions in the same group are considered outdated and are excluded.
2. **Incremental phase**: Continuously reads new commits from the delta branch as they arrive.
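
The partition selection in the full load phase can be sketched in plain Java. This is an illustration only, not the Paimon implementation: the `Part` class, the `planFullLoad` method, and the use of a sortable date string as the chain partition value are all hypothetical, and "after" is assumed to mean strictly greater than the snapshot's value.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FullLoadPlanSketch {

    /** Hypothetical partition descriptor; not a Paimon class. */
    public static final class Part {
        public final String group;     // value of the group partition column(s)
        public final String date;      // sortable chain partition value, e.g. "20250103"
        public final boolean snapshot; // true: snapshot branch, false: delta branch

        public Part(String group, String date, boolean snapshot) {
            this.group = group;
            this.date = date;
            this.snapshot = snapshot;
        }

        @Override
        public String toString() {
            return group + "/" + date + (snapshot ? "[snapshot]" : "[delta]");
        }
    }

    /** Keeps, per group, the latest snapshot partition and the deltas strictly after it. */
    public static List<Part> planFullLoad(List<Part> parts) {
        // Pass 1: find the most recent snapshot value in each group.
        Map<String, String> latestSnapshot = new HashMap<>();
        for (Part p : parts) {
            if (p.snapshot) {
                latestSnapshot.merge(p.group, p.date, (a, b) -> a.compareTo(b) >= 0 ? a : b);
            }
        }
        // Pass 2: drop outdated snapshots and deltas already covered by the snapshot.
        List<Part> selected = new ArrayList<>();
        for (Part p : parts) {
            String cutoff = latestSnapshot.get(p.group);
            boolean keep = p.snapshot
                    ? p.date.equals(cutoff)
                    : cutoff == null || p.date.compareTo(cutoff) > 0;
            if (keep) {
                selected.add(p);
            }
        }
        selected.sort(
                Comparator.comparing((Part p) -> p.group).thenComparing((Part p) -> p.date));
        return selected;
    }

    public static void main(String[] args) {
        List<Part> parts = Arrays.asList(
                new Part("A", "20250101", true),
                new Part("A", "20250103", true),
                new Part("A", "20250102", false),
                new Part("A", "20250104", false),
                new Part("B", "20250102", false));
        System.out.println(planFullLoad(parts));
    }
}
```

In this sketch, group `A` keeps its `20250103` snapshot and the `20250104` delta, while group `B`, which has no snapshot, keeps all of its deltas. How the real scan treats a group without any snapshot partition is not stated on this page, so that branch is an assumption.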

### Write-Side Requirements

Streaming read assumes the chain table follows the standard write pattern described at the top of
this page:

- **Snapshot branch** receives periodic full data (e.g., a daily ODS binlog dump job writes via
`INSERT OVERWRITE t$branch_snapshot`). Each snapshot partition represents a complete view of the
data at that point in time.
- **Delta branch** receives incremental changes between snapshots (e.g., a batch job writes the
current day's new/updated records via `INSERT INTO t$branch_delta`). Each delta partition
contains only the changes for that period. The delta branch must have a
[changelog producer](./changelog-producer) configured (e.g., `'changelog-producer' = 'input'`)
for streaming read to work.

The streaming read relies on this pattern to produce correct results. After the full load phase,
only new delta branch commits are picked up — writes to the snapshot branch do not trigger
streaming output. To incorporate a new snapshot, restart the streaming job.

### Usage

```sql
SET 'execution.runtime-mode' = 'streaming';

INSERT INTO downstream_sink SELECT * FROM default.t;
```

### Limitations

- The incremental phase only monitors the **delta branch**. Writes to the snapshot branch are
not detected until the streaming job is restarted.
- The chain-table-aware streaming scan only supports the default startup mode (`latest-full`).
Specifying an explicit starting position, such as `scan.snapshot-id`, `scan.tag-name`,
`scan.timestamp-millis`, `scan.mode = 'latest'`, or a `consumer-id` whose progress is not
ignored, throws an `UnsupportedOperationException`. To use standard streaming read without chain
table logic, read from a specific branch table (e.g., `t$branch_delta`) instead of the main
table.
- Partition filters are not supported in chain table streaming reads. Specifying a partition
filter — either via a `WHERE` clause on partition columns or the `scan.partitions` table
option — throws an `UnsupportedOperationException`. This is because the chain table streaming
scan determines which partitions to read based on the chain-merge logic across snapshot and
delta branches, and applying a partition filter would interfere with this logic. To read a
specific partition, use batch mode instead.
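
To check a job's options against these limitations before submitting it, a pre-flight validation along the following lines may help. It is a simplified sketch over a plain `Map` and does not reproduce how Paimon resolves the effective startup mode. The class and method names are hypothetical, and the option keys `consumer.ignore-progress` and the `default` value of `scan.mode` are assumptions about the option set rather than something this page states.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class ChainStreamingOptionCheck {

    // Options that pin an explicit starting position (assumed list, see lead-in).
    private static final List<String> POSITION_KEYS = Arrays.asList(
            "scan.snapshot-id", "scan.tag-name", "scan.watermark",
            "scan.timestamp-millis", "scan.timestamp");

    /** Returns a description of the first unsupported setting, or empty if none is found. */
    public static Optional<String> findUnsupported(Map<String, String> options) {
        boolean ignoreProgress =
                Boolean.parseBoolean(options.getOrDefault("consumer.ignore-progress", "false"));
        if (options.containsKey("consumer-id") && !ignoreProgress) {
            return Optional.of("consumer-id with existing progress");
        }
        for (String key : POSITION_KEYS) {
            if (options.containsKey(key)) {
                return Optional.of(key + "=" + options.get(key));
            }
        }
        // Only the default startup mode is accepted.
        String mode = options.getOrDefault("scan.mode", "default");
        if (!mode.equals("default") && !mode.equals("latest-full")) {
            return Optional.of("scan.mode=" + mode);
        }
        // Partition filters passed as a table option are rejected as well.
        if (options.containsKey("scan.partitions")) {
            return Optional.of("scan.partitions=" + options.get("scan.partitions"));
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("scan.mode", "latest");
        System.out.println(findUnsupported(options).orElse("ok"));
    }
}
```

A `WHERE` clause on partition columns cannot be detected from table options alone, so this check covers only the option-based cases.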

## Group Partition

In real-world scenarios, a table often has multiple partition dimensions. For example, data may be
@@ -239,18 +239,7 @@ public Plan plan() {
             PredicateBuilder builder = new PredicateBuilder(tableSchema.logicalPartitionType());
             for (Split split : mainScan.plan().splits()) {
                 DataSplit dataSplit = (DataSplit) split;
-                HashMap<String, String> fileBucketPathMapping = new HashMap<>();
-                HashMap<String, String> fileBranchMapping = new HashMap<>();
-                for (DataFileMeta file : dataSplit.dataFiles()) {
-                    fileBucketPathMapping.put(file.fileName(), ((DataSplit) split).bucketPath());
-                    fileBranchMapping.put(file.fileName(), options.scanFallbackSnapshotBranch());
-                }
-                splits.add(
-                        new ChainSplit(
-                                dataSplit.partition(),
-                                dataSplit.dataFiles(),
-                                fileBranchMapping,
-                                fileBucketPathMapping));
+                splits.add(ChainSplit.from(dataSplit, options.scanFallbackSnapshotBranch()));
             }

             Set<BinaryRow> snapshotPartitions =
@@ -520,8 +509,11 @@ public TableRead withIOManager(IOManager ioManager) {

         @Override
         public RecordReader<InternalRow> createReader(Split split) throws IOException {
-            checkArgument(split instanceof ChainSplit);
-            return fallbackRead.createReader(split);
+            if (split instanceof ChainSplit || split instanceof DataSplit) {
+                return fallbackRead.createReader(split);
+            }
+            throw new IllegalArgumentException(
+                    "Unsupported split type for chain table read: " + split.getClass().getName());
         }
     }
 }
@@ -0,0 +1,133 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.paimon.table;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.StartupMode;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.source.StreamDataTableScan;

import java.util.Map;

/**
 * Chain-table-aware extension of {@link FallbackReadFileStoreTable}. Inherits the batch read
 * behavior (partition-level fallback between the current branch and {@link ChainGroupReadTable}),
 * and additionally overrides {@link #newStreamScan()} to return a chain-aware {@link
 * ChainTableStreamScan} that performs a partition-level full load followed by incremental
 * delta-only streaming.
 */
public class ChainTableFileStoreTable extends FallbackReadFileStoreTable {

    public ChainTableFileStoreTable(FileStoreTable wrapped, FileStoreTable other) {
        super(wrapped, other, true);
    }

    @Override
    public StreamDataTableScan newStreamScan() {
        CoreOptions coreOptions = wrapped.coreOptions();

        StartupMode effectiveMode = coreOptions.startupMode();
        boolean hasConsumerProgress =
                coreOptions.consumerId() != null && !coreOptions.consumerIgnoreProgress();
        if (effectiveMode != StartupMode.LATEST_FULL || hasConsumerProgress) {
            String reason =
                    describeUnsupportedMode(coreOptions, effectiveMode, hasConsumerProgress);
            throw new UnsupportedOperationException(
                    "Chain table streaming read does not support startup mode '"
                            + reason
                            + "'. "
                            + "Chain table streaming only supports the default 'latest-full' mode, which first "
                            + "produces a partition-level full result and then continuously reads incremental "
                            + "data from the delta branch.\n"
                            + "Suggestions:\n"
                            + " - To use chain table streaming: remove the explicit scan mode/position settings "
                            + "so that the default 'latest-full' mode is used.\n"
                            + " - To use standard streaming read without chain table logic: read from a "
                            + "specific branch table (e.g., 't$branch_delta') instead of the main table.");
        }

        // Inherited other() returns the ChainGroupReadTable directly.
        ChainGroupReadTable chainGroupReadTable = (ChainGroupReadTable) other();

        return new ChainTableStreamScan(chainGroupReadTable);
    }

    private static String describeUnsupportedMode(
            CoreOptions coreOptions, StartupMode effectiveMode, boolean hasConsumerProgress) {
        if (hasConsumerProgress) {
            return "consumer-id with existing progress";
        }
        switch (effectiveMode) {
            case LATEST:
                return "scan.mode=latest";
            case FROM_SNAPSHOT:
                if (coreOptions.scanSnapshotId() != null) {
                    return "scan.snapshot-id=" + coreOptions.scanSnapshotId();
                }
                if (coreOptions.scanTagName() != null) {
                    return "scan.tag-name=" + coreOptions.scanTagName();
                }
                if (coreOptions.scanWatermark() != null) {
                    return "scan.watermark=" + coreOptions.scanWatermark();
                }
                return "from-snapshot";
            case FROM_TIMESTAMP:
                if (coreOptions.scanTimestampMills() != null) {
                    return "scan.timestamp-millis=" + coreOptions.scanTimestampMills();
                }
                if (coreOptions.scanTimestamp() != null) {
                    return "scan.timestamp=" + coreOptions.scanTimestamp();
                }
                return "from-timestamp";
            default:
                return effectiveMode.name().toLowerCase().replace('_', '-');
        }
    }

    @Override
    public FileStoreTable copy(Map<String, String> dynamicOptions) {
        return new ChainTableFileStoreTable(
                wrapped.copy(dynamicOptions), other().copy(rewriteOtherOptions(dynamicOptions)));
    }

    @Override
    public FileStoreTable copy(TableSchema newTableSchema) {
        return new ChainTableFileStoreTable(
                wrapped.copy(newTableSchema),
                other().copy(newTableSchema.copy(rewriteOtherOptions(newTableSchema.options()))));
    }

    @Override
    public FileStoreTable copyWithoutTimeTravel(Map<String, String> dynamicOptions) {
        return new ChainTableFileStoreTable(
                wrapped.copyWithoutTimeTravel(dynamicOptions),
                other().copyWithoutTimeTravel(rewriteOtherOptions(dynamicOptions)));
    }

    @Override
    public FileStoreTable copyWithLatestSchema() {
        return new ChainTableFileStoreTable(
                wrapped.copyWithLatestSchema(), other().copyWithLatestSchema());
    }

    @Override
    public FileStoreTable switchToBranch(String branchName) {
        return new ChainTableFileStoreTable(switchWrappedToBranch(branchName), other());
    }
}