Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
package org.apache.fluss.flink.tiering.committer;

import org.apache.fluss.client.metadata.LakeSnapshot;
import org.apache.fluss.exception.ApiException;
import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
import org.apache.fluss.flink.utils.FlinkTestBase;
import org.apache.fluss.fs.FsPath;
import org.apache.fluss.lake.committer.LakeCommitResult;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TablePath;
Expand All @@ -33,9 +35,11 @@

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -207,6 +211,38 @@ void testCompatibilityWithOldCommitter(boolean isPartitioned) throws Exception {
assertThat(lakeTableSnapshot.getBucketLogEndOffset()).isEqualTo(logEndOffsets);
}

@Test
void testPrepareLakeSnapshotSurfacesServerError() throws Exception {
TablePath tablePath = TablePath.of("fluss", "test_prepare_error");
Tuple2<Long, Collection<Long>> tableIdAndPartitions = createTable(tablePath, false);
long tableId = tableIdAndPartitions.f0;
Map<TableBucket, Long> offsets = mockLogEndOffsets(tableId, tableIdAndPartitions.f1);

String offsetsPath =
flussTableLakeSnapshotCommitter.prepareLakeSnapshot(tableId, tablePath, offsets);
flussTableLakeSnapshotCommitter.commit(
tableId,
1L,
offsetsPath,
null,
Collections.emptyMap(),
Collections.emptyMap(),
null);

// delete that file so the server's merge-read fails on the next prepare
FsPath offsetsFile = new FsPath(offsetsPath);
offsetsFile.getFileSystem().delete(offsetsFile, false);

assertThatThrownBy(
() ->
flussTableLakeSnapshotCommitter.prepareLakeSnapshot(
tableId, tablePath, offsets))
.isInstanceOf(IOException.class)
.hasMessageContaining("Fail to prepare commit table lake snapshot")
.rootCause()
.isInstanceOf(ApiException.class);
}

private Map<TableBucket, Long> mockLogEndOffsets(long tableId, Collection<Long> partitionsIds) {
Map<TableBucket, Long> logEndOffsets = new HashMap<>();
for (int bucket = 0; bucket < 3; bucket++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1050,7 +1050,13 @@ public CompletableFuture<PrepareLakeTableSnapshotResponse> prepareLakeTableSnaps
pbPrepareLakeTableRespForTable.setLakeTableOffsetsPath(
fsPath.toString());
} catch (Exception e) {
long tableId = bucketOffsets.getTableId();
LOG.warn(
"Failed to prepare lake table snapshot for table {}.",
tableId,
e);
Errors error = ApiError.fromThrowable(e).error();
pbPrepareLakeTableRespForTable.setTableId(tableId);
pbPrepareLakeTableRespForTable.setError(
error.code(), error.message());
}
Expand Down
Loading