From 6a81e3c119889c501c0d6419a7aeee8e2a36869f Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Tue, 30 Jun 2026 13:54:59 +0100 Subject: [PATCH] [server] Set table_id on prepareLakeTableSnapshot error response --- .../FlussTableLakeSnapshotCommitterTest.java | 36 +++++++++++++++++++ .../coordinator/CoordinatorService.java | 6 ++++ 2 files changed, 42 insertions(+) diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java index 51bd3bc1da..875190f4d7 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java @@ -18,8 +18,10 @@ package org.apache.fluss.flink.tiering.committer; import org.apache.fluss.client.metadata.LakeSnapshot; +import org.apache.fluss.exception.ApiException; import org.apache.fluss.exception.LakeTableSnapshotNotExistException; import org.apache.fluss.flink.utils.FlinkTestBase; +import org.apache.fluss.fs.FsPath; import org.apache.fluss.lake.committer.LakeCommitResult; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TablePath; @@ -33,9 +35,11 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -207,6 +211,38 @@ void testCompatibilityWithOldCommitter(boolean isPartitioned) throws Exception { assertThat(lakeTableSnapshot.getBucketLogEndOffset()).isEqualTo(logEndOffsets); } + @Test + void testPrepareLakeSnapshotSurfacesServerError() throws Exception { + TablePath tablePath = TablePath.of("fluss", "test_prepare_error"); + Tuple2> tableIdAndPartitions = createTable(tablePath, false); + long tableId = tableIdAndPartitions.f0; + Map offsets = mockLogEndOffsets(tableId, tableIdAndPartitions.f1); + + String offsetsPath = + flussTableLakeSnapshotCommitter.prepareLakeSnapshot(tableId, tablePath, offsets); + flussTableLakeSnapshotCommitter.commit( + tableId, + 1L, + offsetsPath, + null, + Collections.emptyMap(), + Collections.emptyMap(), + null); + + // delete that file so the server's merge-read fails on the next prepare + FsPath offsetsFile = new FsPath(offsetsPath); + offsetsFile.getFileSystem().delete(offsetsFile, false); + + assertThatThrownBy( + () -> + flussTableLakeSnapshotCommitter.prepareLakeSnapshot( + tableId, tablePath, offsets)) + .isInstanceOf(IOException.class) + .hasMessageContaining("Fail to prepare commit table lake snapshot") + .rootCause() + .isInstanceOf(ApiException.class); + } + private Map mockLogEndOffsets(long tableId, Collection partitionsIds) { Map logEndOffsets = new HashMap<>(); for (int bucket = 0; bucket < 3; bucket++) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java index 0c9d3c729a..0772f5e3db 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java @@ -1050,7 +1050,13 @@ public CompletableFuture prepareLakeTableSnaps pbPrepareLakeTableRespForTable.setLakeTableOffsetsPath( fsPath.toString()); } catch (Exception e) { + long tableId = bucketOffsets.getTableId(); + LOG.warn( + "Failed to prepare lake table snapshot for table {}.", + tableId, + e); Errors error = ApiError.fromThrowable(e).error(); + pbPrepareLakeTableRespForTable.setTableId(tableId); pbPrepareLakeTableRespForTable.setError( error.code(), error.message()); }