commitLakeTableSnapshot writes stale snapshot path after dropTable, leaving the tiering pipeline permanently stuck #3483

Description

@binary-signal

Search before asking

  • I searched in the issues and found nothing similar.

Fluss version

main (development)

Please describe the bug 🐞

When a table is dropped while a lake-tiering commit is in flight (or queued on the tiering client), the coordinator can:

  1. Resurrect the dropped tableId's /laketable znode at /fluss/tabletservers/tables/<oldId>/laketable, because commitLakeTableSnapshot performs no check against the table's lifecycle state.

  2. Leak the old tableId into the new snapshot pointer: after the table is recreated with a new tableId, /fluss/tabletservers/tables/<newId>/laketable can contain a tiered_offsets URI pointing at the old <table>-<oldId>/metadata/<uuid>.offsets file. Because the cleanup pipeline removed that file (or it was never re-stored after the recreate), every subsequent getLakeSnapshot for the new tableId fails with FileNotFoundException, and the TieringSourceEnumerator retries forever.

The bug is silent: writes and reads of the live table continue to work, but the lake-tiering service is permanently broken for that table until the cluster state is manually repaired.

2026-06-13 19:53:37,373 WARN  org.apache.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator [] - Fail to generate Tiering splits for table cdstream_db.timeseries.
org.apache.flink.util.FlinkRuntimeException: Failed to get table snapshot for table cdstream_db.timeseries
	at org.apache.fluss.flink.tiering.source.split.TieringSplitGenerator.generateTableSplits(TieringSplitGenerator.java:78) ~[fluss-flink-2.2-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
	at org.apache.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator.generateTieringSplits(TieringSourceEnumerator.java:453) ~[fluss-flink-2.2-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
	at org.apache.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator.generateAndAssignSplits(TieringSourceEnumerator.java:363) ~[fluss-flink-2.2-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
	at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$3(ExecutorNotifier.java:131) ~[flink-dist-2.2.1.jar:2.2.1]
	at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40) ~[flink-dist-2.2.1.jar:2.2.1]
	at org.apache.flink.util.MdcUtils.lambda$wrapRunnable$1(MdcUtils.java:70) ~[flink-dist-2.2.1.jar:2.2.1]
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
	at java.base/java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) ~[?:?]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
	at java.base/java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.util.concurrent.ExecutionException: org.apache.fluss.exception.UnknownServerException: org.apache.fluss.exception.FlussRuntimeException: Failed to get lake table snapshot for table: cdstream_db.timeseries, table id: 62
	at org.apache.fluss.server.RpcServiceBase.lambda$getLakeSnapshot$2(RpcServiceBase.java:541)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.io.FileNotFoundException: No such file or directory: s3://fluss/remote-data/lake/cdstream_db/timeseries-58/metadata/cf19d6cc-4a98-4ce0-b518-67dd182fbbe6.offsets
	at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3866)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3688)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.extractOrFetchSimpleFileStatus(S3AFileSystem.java:5401)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:1465)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:1441)
	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:976)
	at org.apache.fluss.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:72)
	at org.apache.fluss.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:39)
	at org.apache.fluss.fs.PluginFileSystemWrapper$ClassLoaderFixingFileSystem.open(PluginFileSystemWrapper.java:95)
	at org.apache.fluss.server.zk.data.lake.LakeTable.toLakeTableSnapshot(LakeTable.java:203)
	at org.apache.fluss.server.zk.data.lake.LakeTable.toLakeTableSnapshot(LakeTable.java:198)
	at org.apache.fluss.server.zk.data.lake.LakeTable.getOrReadLatestTableSnapshot(LakeTable.java:142)
	at org.apache.fluss.server.zk.ZooKeeperClient.getLakeTableSnapshot(ZooKeeperClient.java:1383)
	at org.apache.fluss.server.RpcServiceBase.lambda$getLakeSnapshot$2(RpcServiceBase.java:513)
	... 3 more

	at java.base/java.util.concurrent.CompletableFuture.reportGet(Unknown Source) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.get(Unknown Source) ~[?:?]
	at org.apache.fluss.flink.tiering.source.split.TieringSplitGenerator.generateTableSplits(TieringSplitGenerator.java:67) ~[fluss-flink-2.2-1.0-SNAPSHOT.jar:1.0-SNAPSHOT]
	... 11 more
Caused by: org.apache.fluss.exception.UnknownServerException: org.apache.fluss.exception.FlussRuntimeException: Failed to get lake table snapshot for table: cdstream_db.timeseries, table id: 62
	at org.apache.fluss.server.RpcServiceBase.lambda$getLakeSnapshot$2(RpcServiceBase.java:541)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.io.FileNotFoundException: No such file or directory: s3://fluss/remote-data/lake/cdstream_db/timeseries-58/metadata/cf19d6cc-4a98-4ce0-b518-67dd182fbbe6.offsets
	at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3866)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3688)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.extractOrFetchSimpleFileStatus(S3AFileSystem.java:5401)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:1465)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:1441)
	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:976)
	at org.apache.fluss.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:72)
	at org.apache.fluss.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:39)
	at org.apache.fluss.fs.PluginFileSystemWrapper$ClassLoaderFixingFileSystem.open(PluginFileSystemWrapper.java:95)
	at org.apache.fluss.server.zk.data.lake.LakeTable.toLakeTableSnapshot(LakeTable.java:203)
	at org.apache.fluss.server.zk.data.lake.LakeTable.toLakeTableSnapshot(LakeTable.java:198)
	at org.apache.fluss.server.zk.data.lake.LakeTable.getOrReadLatestTableSnapshot(LakeTable.java:142)
	at org.apache.fluss.server.zk.ZooKeeperClient.getLakeTableSnapshot(ZooKeeperClient.java:1383)
	at org.apache.fluss.server.RpcServiceBase.lambda$getLakeSnapshot$2(RpcServiceBase.java:513)
	... 3 more

How to reproduce

  1. Start Fluss with datalake.format=paimon and remote.data.dir=s3://…, and create a
    primary-key table db.t with table.datalake.enabled=true.
  2. Write enough data to trigger at least one successful tier-snapshot for db.t. Confirm that
    s3://…/lake/db/t-<id1>/metadata/<uuid>.offsets exists and that
    /fluss/tabletservers/tables/<id1>/laketable contains a lake_snapshots entry pointing at it.
  3. dropTable db.t and immediately createTable db.t again. (In our case this is the
    default behaviour of a data-load utility that calls dropTable + createTable per run.)
  4. Observe the post-drop ZK state:
    • /fluss/tabletservers/tables/<id1> still exists with [laketable] as its only child
      (the rest of the znode tree was cleaned, but the /laketable child was re-created by a
      late commit).
    • /fluss/tabletservers/tables/<id2>/laketable contains tiered_offsets referencing
      lake/db/t-<id1>/metadata/<uuid>.offsets (an old-tableId path).
  5. Observe the TieringSource enumerator:
    WARN  TieringSourceEnumerator - Fail to generate Tiering splits for table db.t.
    Caused by: FlussRuntimeException: Failed to get lake table snapshot for table: db.t,
        table id: <id2>
    Caused by: FileNotFoundException:
        s3://…/lake/db/t-<id1>/metadata/<uuid>.offsets
    
    currentFailedTableEpochs: {<id2>=N} grows unbounded.

Observations

ZK state in a running cluster after dev-loop drop+recreate cycles:

/fluss/tabletservers/tables                       [58, 62, 63, 64, 65]
/fluss/tabletservers/tables/58                    [laketable]    ← orphan
/fluss/tabletservers/tables/62                    [buckets, laketable]
/fluss/tabletservers/tables/62/laketable          (see below)

/fluss/tabletservers/tables/62/laketable content:

{
  "version": 2,
  "lake_snapshots": [
    {
      "snapshot_id": 1,
      "tiered_offsets": "s3://fluss/remote-data/lake/cdstream_db/timeseries-58/metadata/cf19d6cc-4a98-4ce0-b518-67dd182fbbe6.offsets"
    }
  ]
}

S3:

  • lake/cdstream_db/timeseries-58/ directory key remains; the offsets file is gone.
  • lake/cdstream_db/timeseries-62/ does not exist.
  • Sibling tables timeseries_norm-63/, correlations-64/ are healthy (file present, ids match).

timeseries_norm and correlations are unaffected because their previous incarnations
did not complete a tier-snapshot before being dropped; only the table whose previous incarnation
had a successful tier-snapshot is caught in the race.
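A stale pointer such as the one in /fluss/tabletservers/tables/62/laketable can be detected mechanically before repairing a cluster. The following is a minimal sketch, not part of Fluss: StaleLakePointerCheck is a hypothetical helper that takes the raw znode content, assumes the <table>-<tableId>/metadata/<uuid>.offsets layout seen in this issue, and uses a regular expression instead of a JSON parser so it has no dependencies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical diagnostic (not Fluss code). Reports every tiered_offsets URI in a
 * laketable znode whose "<table>-<id>/metadata/" directory names a different table id
 * than the znode that holds it. The path layout is an assumption taken from this issue.
 */
final class StaleLakePointerCheck {
    // Captures the URI in: "tiered_offsets": "<uri>"
    private static final Pattern TIERED_OFFSETS =
            Pattern.compile("\"tiered_offsets\"\\s*:\\s*\"([^\"]+)\"");
    // Captures the numeric id in: .../<table>-<id>/metadata/<file>.offsets
    private static final Pattern TABLE_ID_SEGMENT =
            Pattern.compile("-(\\d+)/metadata/[^/]+\\.offsets$");

    private StaleLakePointerCheck() {}

    /** Returns the tiered_offsets URIs that do not belong to {@code ownerTableId}. */
    static List<String> findStalePointers(long ownerTableId, String laketableJson) {
        List<String> stale = new ArrayList<>();
        Matcher offsets = TIERED_OFFSETS.matcher(laketableJson);
        while (offsets.find()) {
            String uri = offsets.group(1);
            Matcher id = TABLE_ID_SEGMENT.matcher(uri);
            // URIs without a recognisable "-<id>/metadata/" segment are reported too.
            if (!id.find() || Long.parseLong(id.group(1)) != ownerTableId) {
                stale.add(uri);
            }
        }
        return stale;
    }
}
```

Given the table 62 content shown above with owner id 62, the check reports the timeseries-58 URI; with owner id 58 it reports nothing.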

Root cause analysis

Drop flow (CoordinatorEventProcessor.processDropTable,
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java:861):

private void processDropTable(DropTableEvent dropTableEvent) {
    ...
    coordinatorContext.queueTableDeletion(...);
    tableManager.onDeleteTable(tableId);
    if (dropTableEvent.isAutoPartitionTable()) { ... }
    if (dropTableEvent.isDataLakeEnabled()) {
        lakeTableTieringManager.removeLakeTable(tableId);     // ← in-memory only
    }
    ...
}

LakeTableTieringManager.removeLakeTable
(fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java:324)
is purely in-memory:

public void removeLakeTable(long tableId) {
    inWriteLock(lock, () -> {
        tablePaths.remove(tableId);
        tableLakeFreshness.remove(tableId);
        lastTieringResult.remove(tableId);
        ...
    });
}

The actual ZK cleanup of /fluss/tabletservers/tables/<id> (including any /laketable
child) is performed by deleteTableAssignment(tableId), which is called later from completeDeleteTable
(fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java:336+).

Meanwhile, commitLakeTableSnapshot
(fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java:2033)
calls LakeTableHelper.registerLakeTableSnapshotV2(tableId, lakeSnapshotMetadata, ...):

zkClient.upsertLakeTable(tableId, lakeTable, optPreviousTable.isPresent());

This unconditionally upserts. There is no check that:

  • the tableId is still present in coordinatorContext,
  • the table hasn't been queued for deletion,
  • the -<id> segment of lakeSnapshotMetadata.tieredOffsetsFilePath matches the commit's
    tableId argument.

So any commit landing in the window between processDropTable and the eventual recursive
delete of /fluss/tabletservers/tables/<oldId> will:

  • recreate the /laketable child under the old id (since upsertLakeTable will create parent
    znodes as needed for LakeTableZNode.path(tableId) writes), or
  • write the stale path into the new id's /laketable if the commit's tableId is the new id
    but the metadata it carries was prepared under the old id.
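The resurrection path can be reproduced in a toy model. This is not Fluss code: LakeZnodeRaceModel is a hypothetical in-memory znode tree whose upsert creates missing parents, as upsertLakeTable is described above, and it models the interleaving in which the late commit is applied after deleteTableAssignment has already removed the subtree.

```java
import java.util.TreeMap;

/** Hypothetical in-memory model of the drop/commit interleaving (not Fluss code). */
final class LakeZnodeRaceModel {
    // Znode path -> data. Parent znodes are stored as their own entries.
    final TreeMap<String, String> znodes = new TreeMap<>();

    /** Upsert that creates missing parent znodes, mirroring the described upsertLakeTable. */
    void upsertCreatingParents(String path, String data) {
        int slash = path.indexOf('/', 1);
        while (slash > 0) {
            znodes.putIfAbsent(path.substring(0, slash), "");
            slash = path.indexOf('/', slash + 1);
        }
        znodes.put(path, data);
    }

    /** Recursive delete, standing in for deleteTableAssignment(tableId). */
    void deleteRecursive(String path) {
        znodes.remove(path);
        // Keys under "path/" sort in [path + "/", path + "0") because '0' follows '/'.
        znodes.subMap(path + "/", path + "0").clear();
    }

    static LakeZnodeRaceModel dropThenLateCommit(long oldId) {
        String table = "/fluss/tabletservers/tables/" + oldId;
        LakeZnodeRaceModel zk = new LakeZnodeRaceModel();
        zk.upsertCreatingParents(table + "/buckets", "");
        zk.upsertCreatingParents(table + "/laketable", "snapshot 1");
        // processDropTable: only in-memory tiering state is cleared, the tree is untouched.
        // completeDeleteTable -> deleteTableAssignment: the whole subtree is removed.
        zk.deleteRecursive(table);
        // A commit already in flight for oldId is applied with no lifecycle check.
        zk.upsertCreatingParents(table + "/laketable", "snapshot 2");
        return zk;
    }
}
```

In this model, /fluss/tabletservers/tables/58 ends up with laketable as its only child, which is the orphan shape listed under Observations.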

Suggested fixes

Any one of these would close the race; ideally, apply all three:

  1. Reject commits for non-existent tableIds.
    In CoordinatorEventProcessor.processCommitLakeTableSnapshotEvent, before calling
    registerLakeTableSnapshotV2, look up coordinatorContext.getTableInfoById(tableId) and
    refuse the commit (return an ApiError) if the table has been dropped or queued for deletion.

  2. Validate path-tableId consistency.
    In LakeTableHelper.registerLakeTableSnapshotV2 (or earlier at the RPC boundary), parse the
    incoming tieredOffsetsFilePath and assert that the directory segment matches
    FlussPaths.remoteLakeTableSnapshotOffsetPath(remoteDataDir, tablePath, tableId)'s
    expected layout. Reject mismatches with a clear error.

  3. Wipe the lake znode synchronously in processDropTable.
    In processDropTable, before issuing the asynchronous tablet-bucket cleanup, delete
    LakeTableZNode.path(tableId) synchronously (and the parent
    /fluss/tabletservers/tables/<id> if safe to do at that point). That removes the resurrection
    target and makes any late commit fail cleanly.
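Fix (1) can be sketched as a guard evaluated before registerLakeTableSnapshotV2. CommitGuard and its methods are hypothetical stand-ins for the coordinatorContext lookups, not Fluss APIs. The sketch assumes the check and the subsequent ZK write run in the same serialized step as processDropTable; if they do not, the guard only narrows the window rather than closing it.

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

/** Hypothetical guard for fix (1); names do not correspond to real Fluss classes. */
final class CommitGuard {
    private final Set<Long> liveTableIds = new HashSet<>();
    private final Set<Long> queuedForDeletion = new HashSet<>();

    void onCreateTable(long tableId) {
        liveTableIds.add(tableId);
    }

    /** Mirrors processDropTable: the id is marked long before its znodes are deleted. */
    void onDropTable(long tableId) {
        queuedForDeletion.add(tableId);
    }

    /** Mirrors completeDeleteTable: the id disappears entirely. */
    void onDeleteCompleted(long tableId) {
        liveTableIds.remove(tableId);
        queuedForDeletion.remove(tableId);
    }

    /** Returns an error message if the commit must be refused, or empty if it may proceed. */
    Optional<String> validateCommit(long tableId) {
        if (!liveTableIds.contains(tableId)) {
            return Optional.of("Table id " + tableId + " does not exist.");
        }
        if (queuedForDeletion.contains(tableId)) {
            return Optional.of("Table id " + tableId + " is queued for deletion.");
        }
        return Optional.empty();
    }
}
```

A refused commit would then be turned into an ApiError for the tiering client instead of reaching upsertLakeTable.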

(1) alone is enough to close the user-visible failure mode. (2) defends in depth against any
client that constructs a malformed commit. (3) is the cleanest fix but interacts with the
existing two-phase delete pipeline.
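Fix (2) could look like the following hypothetical validator. It assumes the layout <remoteDataDir>/lake/<database>/<table>-<tableId>/metadata/<uuid>.offsets, inferred from the paths in this issue (which suggest remote.data.dir=s3://fluss/remote-data in the reporter's cluster). Production code should derive the expected location from FlussPaths.remoteLakeTableSnapshotOffsetPath rather than rebuilding it by hand as done here.

```java
/** Hypothetical path check for fix (2); the layout is an assumption, not a Fluss contract. */
final class OffsetsPathValidator {
    private OffsetsPathValidator() {}

    static String expectedMetadataDir(
            String remoteDataDir, String database, String table, long tableId) {
        String base = remoteDataDir.endsWith("/")
                ? remoteDataDir.substring(0, remoteDataDir.length() - 1)
                : remoteDataDir;
        return base + "/lake/" + database + "/" + table + "-" + tableId + "/metadata/";
    }

    /** Throws IllegalArgumentException unless the file sits directly in the expected dir. */
    static void validate(
            String tieredOffsetsFilePath,
            String remoteDataDir,
            String database,
            String table,
            long tableId) {
        String expectedDir = expectedMetadataDir(remoteDataDir, database, table, tableId);
        String fileName = tieredOffsetsFilePath.startsWith(expectedDir)
                ? tieredOffsetsFilePath.substring(expectedDir.length())
                : "";
        // Reject foreign directories, nested paths, and files without the .offsets suffix.
        if (fileName.isEmpty() || fileName.contains("/") || !fileName.endsWith(".offsets")) {
            throw new IllegalArgumentException(
                    "Offsets file " + tieredOffsetsFilePath + " does not belong to table id "
                            + tableId + "; expected a file directly under " + expectedDir);
        }
    }
}
```

With this check, the timeseries-58 path from the stack trace is accepted for table id 58 and rejected for table id 62.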

Workaround for affected clusters

Stop the tiering job, then run the following in zkCli.sh:

delete /fluss/tabletservers/tables/<newId>/laketable
delete /fluss/tabletservers/tables/<oldId>/laketable
delete /fluss/tabletservers/tables/<oldId>

Restart the tiering job. It will re-prepare a fresh offsets file for the new tableId.

Relevant source pointers

  • fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java:861-889 (processDropTable)
  • fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java:2000-2052 (commitLakeTableSnapshot event handler)
  • fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java:830-900 (prepareLakeTable / commitLakeTableSnapshot RPCs)
  • fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java:336-360 (dropTable / completeDeleteTable)
  • fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java:324-345 (removeLakeTable)
  • fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableHelper.java:92-125 (registerLakeTableSnapshotV2)
  • fluss-server/src/main/java/org/apache/fluss/server/zk/data/lake/LakeTableHelper.java:183-203 (storeLakeTableOffsetsFile)
  • fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java:489-493 (deleteTableAssignment)
  • fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java:438-444 (TableIdsZNode.path())
  • fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java:665-678 (LakeTableZNode)

Solution

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!
