Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions flink-filesystems/flink-gs-fs-hadoop/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ under the License.
<!-- in the GCS file system documentation. -->
<fs.gs.sdk.version>2.29.1</fs.gs.sdk.version>
<fs.gs.connector.version>hadoop3-2.2.18</fs.gs.connector.version>
<fs.gs.cloud.nio.version>0.128.7</fs.gs.cloud.nio.version>
<!-- Set this to the highest version of grpc artifacts from gcs-connector and google-cloud-storage -->
<fs.gs.grpc.version>1.59.1</fs.gs.grpc.version>
</properties>
Expand Down Expand Up @@ -149,6 +150,13 @@ under the License.
</exclusions>
</dependency>

<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-nio</artifactId>
<version>${fs.gs.cloud.nio.version}</version>
<scope>test</scope>
</dependency>

</dependencies>

<dependencyManagement>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.fs.gs.storage;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.fs.gs.utils.BlobUtils;
import org.apache.flink.util.Preconditions;
Expand Down Expand Up @@ -60,8 +61,7 @@ public GSBlobStorage.WriteChannel writeBlob(GSBlobIdentifier blobIdentifier) {
LOGGER.trace("Creating writable blob for identifier {}", blobIdentifier);
Preconditions.checkNotNull(blobIdentifier);

BlobInfo blobInfo = BlobInfo.newBuilder(blobIdentifier.getBlobId()).build();
com.google.cloud.WriteChannel writeChannel = storage.writer(blobInfo);
com.google.cloud.WriteChannel writeChannel = createWriteChannel(blobIdentifier);
return new WriteChannel(blobIdentifier, writeChannel);
}

Expand All @@ -75,12 +75,23 @@ public GSBlobStorage.WriteChannel writeBlob(
Preconditions.checkNotNull(blobIdentifier);
Preconditions.checkArgument(chunkSize.getBytes() > 0);

BlobInfo blobInfo = BlobInfo.newBuilder(blobIdentifier.getBlobId()).build();
com.google.cloud.WriteChannel writeChannel = storage.writer(blobInfo);
com.google.cloud.WriteChannel writeChannel = createWriteChannel(blobIdentifier);
writeChannel.setChunkSize((int) chunkSize.getBytes());
return new WriteChannel(blobIdentifier, writeChannel);
}

/**
* Creates a write channel for the given blob identifier with appropriate preconditions.
*
* @param blobIdentifier The blob identifier to create the write channel for
* @return The write channel with appropriate write options
*/
private com.google.cloud.WriteChannel createWriteChannel(GSBlobIdentifier blobIdentifier) {
BlobInfo existingBlob = storage.get(blobIdentifier.bucketName, blobIdentifier.objectName);
BlobInfo blobInfo = BlobInfo.newBuilder(blobIdentifier.getBlobId()).build();
return storage.writer(blobInfo, getBlobWriteOption(existingBlob));
}

@Override
public void createBlob(GSBlobIdentifier blobIdentifier) {
LOGGER.trace("Creating empty blob {}", blobIdentifier);
Expand Down Expand Up @@ -153,11 +164,74 @@ public void compose(
for (GSBlobIdentifier blobIdentifier : sourceBlobIdentifiers) {
builder.addSource(blobIdentifier.objectName);
}
BlobInfo existingTargetBlob =
storage.get(targetBlobIdentifier.bucketName, targetBlobIdentifier.objectName);
Storage.BlobTargetOption precondition = getBlobTargetOption(existingTargetBlob);
Storage.ComposeRequest request = builder.setTargetOptions(precondition).build();

Storage.ComposeRequest request = builder.build();
storage.compose(request);
}

/**
* Generic helper to create blob options with appropriate preconditions. This ensures that the
* operations become idempotent or atomic, allowing the GCS client to safely retry the 503
* errors.
*
* <p>For a target object that does not yet exist, sets the DoesNotExist precondition. This will
* cause the request to fail if the object is created before the request runs.
*
* <p>If the destination already exists, sets a generation-match precondition. This will cause
* the request to fail if the existing object's generation changes before the request runs.
*
* @param blobInfo The blob info to create the option for, or null if the blob does not exist
* @param doesNotExistSupplier Supplier for the doesNotExist option
* @param generationMatchFunction Function to create generationMatch option from generation
* number
* @param <T> The type of the blob option (BlobTargetOption or BlobWriteOption)
* @return The appropriate option for the blob
*/
@VisibleForTesting
<T> T getBlobOption(
BlobInfo blobInfo,
java.util.function.Supplier<T> doesNotExistSupplier,
java.util.function.Function<Long, T> generationMatchFunction) {
if (blobInfo == null) {
return doesNotExistSupplier.get();
} else {
return generationMatchFunction.apply(blobInfo.getGeneration());
}
}

/**
* Creates the appropriate BlobTargetOption for the given blob info.
*
* @param blobInfo The blob info to create the target option for, or null if the blob does not
* exist
* @return The appropriate target option for the blob
*/
@VisibleForTesting
Storage.BlobTargetOption getBlobTargetOption(BlobInfo blobInfo) {
return getBlobOption(
blobInfo,
Storage.BlobTargetOption::doesNotExist,
Storage.BlobTargetOption::generationMatch);
}

/**
* Creates the appropriate BlobWriteOption for the given blob info.
*
* @param blobInfo The blob info to create the write option for, or null if the blob does not
* exist
* @return The appropriate write option for the blob
*/
@VisibleForTesting
Storage.BlobWriteOption getBlobWriteOption(BlobInfo blobInfo) {
return getBlobOption(
blobInfo,
Storage.BlobWriteOption::doesNotExist,
Storage.BlobWriteOption::generationMatch);
}

@Override
public List<Boolean> delete(Iterable<GSBlobIdentifier> blobIdentifiers) {
LOGGER.trace("Deleting blobs {}", blobIdentifiers);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.fs.gs.storage;

import org.apache.flink.configuration.MemorySize;

import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.contrib.nio.testing.LocalStorageHelper;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.io.IOException;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;

/** Test {@link GSBlobStorageImpl}. */
class GSBlobStorageImplTest {

private static final String TEST_BUCKET = "test-bucket";
private GSBlobStorageImpl blobStorage;
private Storage storage;

@BeforeEach
void setUp() {
storage = LocalStorageHelper.getOptions().getService();
blobStorage = new GSBlobStorageImpl(storage);
}

@ParameterizedTest(name = "{0} with null BlobInfo")
@MethodSource("provideOptionTypes")
void testGetBlobOptionWithNullBlobInfo(
String optionType,
Supplier<Object> doesNotExistSupplier,
Function<Long, Object> generationMatchFunction,
Object expectedDoesNotExist) {
// When blob doesn't exist (null), should return doesNotExist option
Object result =
blobStorage.getBlobOption(null, doesNotExistSupplier, generationMatchFunction);

assertThat(result).isNotNull();
assertThat(result).isEqualTo(expectedDoesNotExist);
}

@ParameterizedTest(name = "{0} with existing BlobInfo")
@MethodSource("provideOptionTypes")
void testGetBlobOptionWithExistingBlobInfo(
String optionType,
Supplier<Object> doesNotExistSupplier,
Function<Long, Object> generationMatchFunction,
Object expectedDoesNotExist) {
// Create a BlobInfo with a generation number
Long generation = 12345L;
BlobId blobId = BlobId.of("test-bucket", "test-object", generation);
BlobInfo blobInfo = BlobInfo.newBuilder(blobId).build();

// When blob exists, should return generationMatch option
Object result =
blobStorage.getBlobOption(blobInfo, doesNotExistSupplier, generationMatchFunction);

assertThat(result).isNotNull();
assertThat(result.toString()).contains(generation.toString());
}

@ParameterizedTest(name = "{0} with zero generation")
@MethodSource("provideOptionTypes")
void testGetBlobOptionWithZeroGeneration(
String optionType,
Supplier<Object> doesNotExistSupplier,
Function<Long, Object> generationMatchFunction,
Object expectedDoesNotExist) {
// Test edge case: blob with generation 0
BlobId blobId = BlobId.of("test-bucket", "test-object", 0L);
BlobInfo blobInfo = BlobInfo.newBuilder(blobId).build();

Object result =
blobStorage.getBlobOption(blobInfo, doesNotExistSupplier, generationMatchFunction);

assertThat(result).isNotNull();
assertThat(result.toString()).contains("0");
}

@Test
void testWriteBlob() throws IOException {
// Note: LocalStorageHelper doesn't properly track blob generation numbers,
// so we can only test writing to new blobs in this test
GSBlobIdentifier blobIdentifier = new GSBlobIdentifier(TEST_BUCKET, "test-blob");

// Write to the blob
GSBlobStorage.WriteChannel writeChannel = blobStorage.writeBlob(blobIdentifier);

assertThat(writeChannel).isNotNull();

// Write some data
byte[] data = "test data".getBytes();
int written = writeChannel.write(data, 0, data.length);
assertThat(written).isEqualTo(data.length);

writeChannel.close();

// Verify the blob was created
Blob blob = storage.get(blobIdentifier.getBlobId());
assertThat(blob).isNotNull();
assertThat(blob.getContent()).isEqualTo(data);
}

@Test
void testWriteBlobWithChunkSize() throws IOException {
GSBlobIdentifier blobIdentifier = new GSBlobIdentifier(TEST_BUCKET, "chunked-blob");
MemorySize chunkSize = MemorySize.parse("2b");

GSBlobStorage.WriteChannel writeChannel = blobStorage.writeBlob(blobIdentifier, chunkSize);

assertThat(writeChannel).isNotNull();

byte[] data = "test data with chunks".getBytes();
int written = writeChannel.write(data, 0, data.length);
assertThat(written).isEqualTo(data.length);

writeChannel.close();

// Verify the blob was created
Blob blob = storage.get(blobIdentifier.getBlobId());
assertThat(blob).isNotNull();
assertThat(blob.getContent()).isEqualTo(data);
}

@Test
void testComposeMultipleBlobs() {
// Note: LocalStorageHelper doesn't fully implement the compose operation,
// so we can only verify that the method completes without error

// Create source blobs with content
GSBlobIdentifier source1 = new GSBlobIdentifier(TEST_BUCKET, "source1");
GSBlobIdentifier source2 = new GSBlobIdentifier(TEST_BUCKET, "source2");
GSBlobIdentifier source3 = new GSBlobIdentifier(TEST_BUCKET, "source3");

byte[] data1 = "part1".getBytes();
byte[] data2 = "part2".getBytes();
byte[] data3 = "part3".getBytes();

storage.create(BlobInfo.newBuilder(source1.getBlobId()).build(), data1);
storage.create(BlobInfo.newBuilder(source2.getBlobId()).build(), data2);
storage.create(BlobInfo.newBuilder(source3.getBlobId()).build(), data3);

// Verify source blobs exist
assertThat(storage.get(source1.getBlobId())).isNotNull();
assertThat(storage.get(source2.getBlobId())).isNotNull();
assertThat(storage.get(source3.getBlobId())).isNotNull();

// Compose into target blob - this should complete without throwing an exception
GSBlobIdentifier target = new GSBlobIdentifier(TEST_BUCKET, "composed-target");
blobStorage.compose(java.util.Arrays.asList(source1, source2, source3), target);

// Verify the method completed (no exception thrown)
// The actual composition result cannot be verified with LocalStorageHelper
}

@Test
void testComposeSingleBlob() {
// Note: LocalStorageHelper doesn't fully implement the compose operation,
// so we can only verify that the method completes without error

// Create a single source blob
GSBlobIdentifier source = new GSBlobIdentifier(TEST_BUCKET, "single-source");
byte[] data = "single blob content".getBytes();
storage.create(BlobInfo.newBuilder(source.getBlobId()).build(), data);

// Verify source blob exists
assertThat(storage.get(source.getBlobId())).isNotNull();

// Compose into target (effectively a copy) - should complete without throwing
GSBlobIdentifier target = new GSBlobIdentifier(TEST_BUCKET, "single-target");
blobStorage.compose(java.util.Collections.singletonList(source), target);

// Verify the method completed (no exception thrown)
// The actual composition result cannot be verified with LocalStorageHelper
}

private static Stream<Arguments> provideOptionTypes() {
return Stream.of(
Arguments.of(
"BlobTargetOption",
(Supplier<Object>) Storage.BlobTargetOption::doesNotExist,
(Function<Long, Object>) Storage.BlobTargetOption::generationMatch,
Storage.BlobTargetOption.doesNotExist()),
Arguments.of(
"BlobWriteOption",
(Supplier<Object>) Storage.BlobWriteOption::doesNotExist,
(Function<Long, Object>) Storage.BlobWriteOption::generationMatch,
Storage.BlobWriteOption.doesNotExist()));
}
}