Skip to content

Commit

Permalink
feat: Use DH S3Instructions to build Iceberg AWS clients (#6113)
Browse files Browse the repository at this point in the history
This provides a way for users who are responsible for providing AWS / S3
credentials to specify them in a way where Deephaven can own the S3 client
building logic for the Iceberg Catalog in addition to our own data
access layer.

Note, this does _not_ deprecate `DataInstructionsProviderPlugin`, as
there may be cases where the user is _not_ responsible for providing
these credentials, and it is instead provided via the catalog after
catalog authorization. See #6191
  • Loading branch information
devinrsmith authored Oct 29, 2024
1 parent 640ab8f commit 59f226d
Show file tree
Hide file tree
Showing 24 changed files with 840 additions and 353 deletions.
38 changes: 30 additions & 8 deletions extensions/iceberg/s3/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ evaluationDependsOn Docker.registryProject('minio')
description 'Iceberg: Support to read iceberg catalogs.'

dependencies {
implementation project(':extensions-iceberg')
api project(':extensions-iceberg')

// Bring in the AWS / S3 extensions
api platform(libs.iceberg.bom)
Expand All @@ -25,12 +25,19 @@ dependencies {
implementation libs.awssdk.s3
implementation libs.awssdk.crt.client
runtimeOnly libs.awssdk.sts
runtimeOnly libs.awssdk.glue
implementation libs.awssdk.glue

// We don't want to explicitly pull in dependencies for dynamodb (org.apache.iceberg.aws.dynamodb.DynamoDbCatalog),
// but we need to be able to compile against it to implement AwsClientFactory
compileOnly libs.awssdk.dynamodb

// We don't want to explicitly pull in dependencies for KMS (there doesn't seem to be anything in Iceberg that
// actually calls it?), but we need to be able to compile against it to implement AwsClientFactory
compileOnly libs.awssdk.kms

compileOnly libs.autoservice
annotationProcessor libs.autoservice.compiler

testImplementation libs.junit4
testImplementation project(':engine-test-utils')

testImplementation libs.testcontainers
Expand All @@ -44,12 +51,27 @@ dependencies {
testRuntimeOnly project(':test-configs')
testRuntimeOnly project(':log-to-slf4j')
testRuntimeOnly libs.slf4j.simple

testImplementation platform(libs.junit.bom)
testImplementation libs.junit.jupiter
testRuntimeOnly libs.junit.jupiter.engine
testRuntimeOnly libs.junit.platform.launcher
}

TestTools.addEngineOutOfBandTest(project)
test {
useJUnitPlatform {
excludeTags("testcontainers")
}
}

tasks.register('testOutOfBand', Test) {
useJUnitPlatform {
includeTags("testcontainers")
}

testOutOfBand.dependsOn Docker.registryTask(project, 'localstack')
testOutOfBand.systemProperty 'testcontainers.localstack.image', Docker.localImageName('localstack')
dependsOn Docker.registryTask(project, 'localstack')
systemProperty 'testcontainers.localstack.image', Docker.localImageName('localstack')

testOutOfBand.dependsOn Docker.registryTask(project, 'minio')
testOutOfBand.systemProperty 'testcontainers.minio.image', Docker.localImageName('minio')
dependsOn Docker.registryTask(project, 'minio')
systemProperty 'testcontainers.minio.image', Docker.localImageName('minio')
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.extensions.s3;

import org.apache.iceberg.aws.AwsClientFactory;
import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.aws.s3.S3FileIOAwsClientFactory;
import org.apache.iceberg.aws.s3.S3FileIOProperties;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.glue.GlueClient;
import software.amazon.awssdk.services.kms.KmsClient;
import software.amazon.awssdk.services.s3.S3Client;

import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/**
 * An {@link AwsClientFactory} and {@link S3FileIOAwsClientFactory} implementation that assumes ownership of AWS client
 * creation as configured via {@link S3Instructions}.
 *
 * <p>
 * The factory is identified by class name inside a string-valued properties map, so the {@link S3Instructions} cannot
 * be carried in the properties directly. Instead, {@link #addToProperties(S3Instructions, Map)} stores them in a static
 * registry under a random UUID and writes that UUID into the properties; {@link #initialize(Map)} uses the UUID to look
 * the instructions up again.
 */
public final class DeephavenAwsClientFactory implements AwsClientFactory, S3FileIOAwsClientFactory {

    private static final String UUID_KEY = DeephavenAwsClientFactory.class.getName() + ".__uuid";

    /**
     * Registry of instructions keyed by the UUID written into the catalog properties. Entries are added by
     * {@link #addToProperties(S3Instructions, Map)} and removed by the {@link Runnable} it returns.
     */
    private static final Map<String, S3Instructions> S3_INSTRUCTIONS_MAP = new ConcurrentHashMap<>();

    /**
     * Adds {@link DeephavenAwsClientFactory} to {@code propertiesOut} with the keys
     * {@value AwsProperties#CLIENT_FACTORY} and {@value S3FileIOProperties#CLIENT_FACTORY}; it is an error if either of
     * these properties is already set. All keys are checked before any of them is written, so {@code propertiesOut}
     * is left unmodified when an {@link IllegalArgumentException} is thrown. After the corresponding
     * {@link org.apache.iceberg.catalog.Catalog} is no longer in use, the caller should invoke the returned
     * {@link Runnable} to clean up.
     *
     * @param instructions the instructions
     * @param propertiesOut the properties
     * @return the runnable to be invoked for cleanup once the Catalog is no longer in use; invoking it more than once
     *         has no additional effect
     * @throws NullPointerException if {@code instructions} or {@code propertiesOut} is {@code null}
     * @throws IllegalArgumentException if any of the keys this method writes is already present in
     *         {@code propertiesOut}
     */
    public static Runnable addToProperties(S3Instructions instructions, Map<String, String> propertiesOut) {
        Objects.requireNonNull(instructions, "instructions");
        Objects.requireNonNull(propertiesOut, "propertiesOut");
        // Validate every key up front so that a collision on a later key does not leave earlier keys behind in the
        // caller's map.
        throwIfPresent(propertiesOut, AwsProperties.CLIENT_FACTORY);
        throwIfPresent(propertiesOut, S3FileIOProperties.CLIENT_FACTORY);
        throwIfPresent(propertiesOut, UUID_KEY);
        final String factoryClassName = DeephavenAwsClientFactory.class.getName();
        final String uuid = UUID.randomUUID().toString();
        putOrThrow(propertiesOut, AwsProperties.CLIENT_FACTORY, factoryClassName);
        putOrThrow(propertiesOut, S3FileIOProperties.CLIENT_FACTORY, factoryClassName);
        putOrThrow(propertiesOut, UUID_KEY, uuid);
        S3_INSTRUCTIONS_MAP.put(uuid, instructions);
        return () -> S3_INSTRUCTIONS_MAP.remove(uuid);
    }

    /**
     * Get the {@link S3Instructions} as set in the corresponding {@link #addToProperties(S3Instructions, Map)} if the
     * properties were built with that. If the properties were built with {@link #addToProperties(S3Instructions, Map)},
     * but the {@link Runnable} was already invoked for cleanup, an {@link IllegalStateException} will be thrown.
     *
     * @param properties the properties
     * @return the instructions, or empty if {@code properties} was not built with
     *         {@link #addToProperties(S3Instructions, Map)}
     * @throws IllegalStateException if the instructions registered for {@code properties} were already cleaned up
     */
    public static Optional<S3Instructions> getInstructions(Map<String, String> properties) {
        final String uuid = properties.get(UUID_KEY);
        if (uuid == null) {
            return Optional.empty();
        }
        final S3Instructions instructions = S3_INSTRUCTIONS_MAP.get(uuid);
        if (instructions == null) {
            throw new IllegalStateException(
                    "This S3Instructions were already cleaned up; please ensure that the returned Runnable from addToProperties is not invoked until the Catalog is no longer in use.");
        }
        return Optional.of(instructions);
    }

    /** Throws if {@code key} is already mapped to a non-null value in {@code map}; never modifies {@code map}. */
    private static <K, V> void throwIfPresent(Map<K, V> map, K key) {
        if (map.get(key) != null) {
            throw new IllegalArgumentException(String.format("Key '%s' already exists in map", key));
        }
    }

    /** Puts {@code key -> value} into {@code map}, throwing if {@code key} is already mapped to a non-null value. */
    private static <K, V> void putOrThrow(Map<K, V> map, K key, V value) {
        if (map.putIfAbsent(key, value) != null) {
            throw new IllegalArgumentException(String.format("Key '%s' already exists in map", key));
        }
    }

    private S3Instructions instructions;

    /**
     * Constructs the factory with {@link S3Instructions#DEFAULT}; the real instructions are supplied by
     * {@link #initialize(Map)}.
     */
    public DeephavenAwsClientFactory() {
        // This follows the pattern established by other Iceberg classes that have an initialize method; they have a
        // default value that is set in construction, with the expectation that they are properly constructed in the
        // initialize call. While those implementations likely could be stricter and implemented defensively (throwing
        // an exception if any other methods are called before initialize), that does not seem to be the pattern in use.
        // We do not _expect_ the default instructions as set here to ever be used, but we are choosing to follow the
        // "established convention" in the rare case there is some caller misusing this in a way that does not affect
        // the correctness of the end Catalog.
        this.instructions = S3Instructions.DEFAULT;
    }

    /**
     * Resolves the {@link S3Instructions} previously registered via {@link #addToProperties(S3Instructions, Map)}.
     *
     * @param properties the catalog properties
     * @throws IllegalArgumentException if {@code properties} was not built with
     *         {@link #addToProperties(S3Instructions, Map)}
     * @throws IllegalStateException if the registered instructions were already cleaned up
     */
    @Override
    public void initialize(Map<String, String> properties) {
        this.instructions = getInstructions(properties).orElseThrow(() -> new IllegalArgumentException(
                "DeephavenAwsClientFactory was setup improperly; it must be configured with DeephavenAwsClientFactory.addToProperties"));
    }

    @Override
    public S3Client s3() {
        // Iceberg calls this from org.apache.iceberg.aws.s3.S3FileIO which multiple Catalog implementations use. This
        // implementation is backed by the same configuration primitives that our own async S3 client uses. It is well
        // tested and provides parity between how Iceberg S3 and Deephaven S3 clients are initialized.
        return S3ClientFactory.getSyncClient(instructions);
    }

    @Override
    public GlueClient glue() {
        // Iceberg calls this from org.apache.iceberg.aws.glue.GlueCatalog, and it's possible that other
        // custom Catalog implementations could make use out of this interface. This implementation has been manually
        // tested and confirmed to work in simple cases.
        return GlueClient.builder()
                .applyMutation(b -> S3ClientFactory.applyAllSharedSync(b, instructions))
                .build();
    }

    @Override
    public KmsClient kms() {
        // Iceberg does not call this method. It is likely part of the interface to support advanced authorization that
        // enterprise users may need. It's likely in those scenarios that the user is owning the full Catalog creation
        // as well, with their own custom AwsClientFactory, so it's not clear if this implementation will be of value.
        // That said, it is easy to build and follows the same pattern as the other clients, so it is provided in a
        // "best-effort" basis without further testing.
        return KmsClient.builder()
                .applyMutation(b -> S3ClientFactory.applyAllSharedSync(b, instructions))
                .build();
    }

    @Override
    public DynamoDbClient dynamo() {
        // Iceberg calls this from org.apache.iceberg.aws.dynamodb.DynamoDbCatalog, and it's possible that other
        // custom Catalog implementations could make use out of this interface. This implementation is easy to build
        // and follows the same pattern as the other clients, so it is provided in a "best-effort" basis without further
        // testing.
        return DynamoDbClient.builder()
                .applyMutation(b -> S3ClientFactory.applyAllSharedSync(b, instructions))
                .build();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@
package io.deephaven.iceberg.util;

import com.google.common.base.Strings;
import io.deephaven.extensions.s3.DeephavenAwsClientFactory;
import io.deephaven.extensions.s3.S3Instructions;
import io.deephaven.util.reference.CleanupReferenceProcessor;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.aws.AwsClientProperties;
import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.aws.HttpClientProperties;
import org.apache.iceberg.aws.glue.GlueCatalog;
import org.apache.iceberg.aws.s3.S3FileIOProperties;
import org.apache.iceberg.rest.RESTCatalog;
Expand All @@ -20,8 +25,7 @@
* Tools for accessing tables in the Iceberg table format.
*/
@SuppressWarnings("unused")
public class IcebergToolsS3 extends IcebergTools {
private static final String S3_FILE_IO_CLASS = "org.apache.iceberg.aws.s3.S3FileIO";
public final class IcebergToolsS3 {

/**
* Create an Iceberg catalog adapter for a REST catalog backed by S3 storage. If {@code null} is provided for a
Expand Down Expand Up @@ -104,4 +108,52 @@ public static IcebergCatalogAdapter createGlue(

return new IcebergCatalogAdapter(catalog, properties);
}

/**
 * Create an Iceberg catalog adapter.
 *
 * <p>
 * This is the preferred way to configure an Iceberg catalog adapter when the caller is responsible for providing
 * AWS / S3 connectivity details; specifically, this allows for the parity of construction logic between
 * Iceberg-managed and Deephaven-managed AWS clients. For advanced use-cases, users are encouraged to use
 * {@link S3Instructions#profileName() profiles} which allows a rich degree of configurability. The
 * {@code instructions} will automatically be used as special instructions if
 * {@link IcebergReadInstructions#dataInstructions()} is not explicitly set. The caller is still responsible for
 * providing any other properties necessary to configure their {@link org.apache.iceberg.catalog.Catalog}
 * implementation.
 *
 * <p>
 * In cases where the caller prefers to use Iceberg's AWS properties (found amongst {@link AwsProperties},
 * {@link S3FileIOProperties}, and {@link HttpClientProperties}), they should use
 * {@link IcebergTools#createAdapter(String, Map, Map) IcebergTools} directly. In this case, parity will be limited
 * to what {@link S3InstructionsProviderPlugin} is able to infer; in advanced cases, it's possible that there will
 * be a difference in construction logic between the Iceberg-managed and Deephaven-managed AWS clients which
 * manifests itself as being able to browse {@link org.apache.iceberg.catalog.Catalog} metadata, but not retrieve
 * {@link org.apache.iceberg.Table} data.
 *
 * <p>
 * Note: this method does not explicitly set, nor impose, that {@link org.apache.iceberg.aws.s3.S3FileIO} be used.
 * It's possible that a {@link org.apache.iceberg.catalog.Catalog} implementation depends on an AWS client for
 * purposes unrelated to storing the warehouse data via S3.
 *
 * <p>
 * The supplied {@code properties} map is copied and is not modified by this method. If the adapter cannot be
 * created, the {@link DeephavenAwsClientFactory} registration made for {@code instructions} is released before the
 * failure is propagated.
 *
 * @param name the name of the catalog; if omitted, the catalog URI will be used to generate a name
 * @param properties a map containing the Iceberg catalog properties to use
 * @param hadoopConfig a map containing Hadoop configuration properties to use
 * @param instructions the s3 instructions
 * @return the Iceberg catalog adapter
 */
public static IcebergCatalogAdapter createAdapter(
        @Nullable final String name,
        @NotNull final Map<String, String> properties,
        @NotNull final Map<String, String> hadoopConfig,
        @NotNull final S3Instructions instructions) {
    final Map<String, String> newProperties = new HashMap<>(properties);
    final Runnable cleanup = DeephavenAwsClientFactory.addToProperties(instructions, newProperties);
    final IcebergCatalogAdapter adapter;
    try {
        adapter = IcebergTools.createAdapter(name, newProperties, hadoopConfig);
    } catch (RuntimeException | Error e) {
        // No Catalog was handed back, so nothing would ever trigger the phantom-reference cleanup below; release the
        // registered instructions now rather than leaving them registered indefinitely.
        cleanup.run();
        throw e;
    }
    // When the Catalog becomes phantom reachable, we can invoke the DeephavenAwsClientFactory cleanup.
    // Note: it would be incorrect to register the cleanup against the adapter since the Catalog can outlive the
    // adapter (and the DeephavenAwsClientFactory properties are needed by the Catalog).
    CleanupReferenceProcessor.getDefault().registerPhantom(adapter.catalog(), cleanup);
    return adapter;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import com.google.auto.service.AutoService;
import io.deephaven.extensions.s3.Credentials;
import io.deephaven.extensions.s3.DeephavenAwsClientFactory;
import io.deephaven.extensions.s3.S3Instructions;
import io.deephaven.iceberg.internal.DataInstructionsProviderPlugin;
import org.apache.iceberg.aws.AwsClientProperties;
Expand All @@ -15,13 +16,21 @@
import java.util.Map;

/**
* {@link io.deephaven.iceberg.internal.DataInstructionsProviderPlugin} implementation used for reading files from S3.
* {@link DataInstructionsProviderPlugin} implementation for producing a {@link S3Instructions}. The produced
* instructions will be from {@link DeephavenAwsClientFactory#getInstructions(Map)} if present, and otherwise will make
* a best-effort attempt to create an equivalent instructions based on properties from {@link AwsClientProperties} and
* {@link S3FileIOProperties}.
*/
@AutoService(io.deephaven.iceberg.internal.DataInstructionsProviderPlugin.class)
@AutoService(DataInstructionsProviderPlugin.class)
@SuppressWarnings("unused")
public final class S3InstructionsProviderPlugin implements DataInstructionsProviderPlugin {
@Override
public Object createInstructions(@NotNull final URI uri, @NotNull final Map<String, String> properties) {
public S3Instructions createInstructions(@NotNull final URI uri, @NotNull final Map<String, String> properties) {
final S3Instructions s3Instructions = DeephavenAwsClientFactory.getInstructions(properties).orElse(null);
if (s3Instructions != null) {
return s3Instructions;
}

// If the URI scheme is "s3","s3a","s3n" or if the properties contain one of these specific keys, we can
// create a useful S3Instructions object.
if (uri.getScheme().equals("s3")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@

import io.deephaven.extensions.s3.S3Instructions.Builder;
import io.deephaven.extensions.s3.testlib.SingletonContainers.LocalStack;
import org.junit.BeforeClass;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import software.amazon.awssdk.services.s3.S3AsyncClient;

import java.util.Map;
Expand All @@ -15,9 +16,11 @@
import static org.apache.iceberg.aws.s3.S3FileIOProperties.ENDPOINT;
import static org.apache.iceberg.aws.s3.S3FileIOProperties.SECRET_ACCESS_KEY;

@Tag("testcontainers")
@Deprecated
public class IcebergLocalStackTest extends IcebergToolsTest {

@BeforeClass
@BeforeAll
public static void initContainer() {
// ensure container is started so container startup time isn't associated with a specific test
LocalStack.init();
Expand All @@ -34,7 +37,7 @@ public S3AsyncClient s3AsyncClient() {
}

@Override
public Map<String, String> s3Properties() {
public Map<String, String> properties() {
return Map.of(
ENDPOINT, LocalStack.s3Endpoint(),
CLIENT_REGION, LocalStack.region(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
import io.deephaven.extensions.s3.testlib.SingletonContainers.MinIO;
import io.deephaven.stats.util.OSUtil;
import org.junit.jupiter.api.Assumptions;
import org.junit.BeforeClass;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import software.amazon.awssdk.services.s3.S3AsyncClient;

import java.util.Map;
Expand All @@ -17,9 +18,11 @@
import static org.apache.iceberg.aws.s3.S3FileIOProperties.ENDPOINT;
import static org.apache.iceberg.aws.s3.S3FileIOProperties.SECRET_ACCESS_KEY;

@Tag("testcontainers")
@Deprecated
public class IcebergMinIOTest extends IcebergToolsTest {

@BeforeClass
@BeforeAll
public static void initContainer() {
// TODO(deephaven-core#5116): MinIO testcontainers does not work on OS X
Assumptions.assumeFalse(OSUtil.runningMacOS(), "OSUtil.runningMacOS()");
Expand All @@ -38,12 +41,11 @@ public S3AsyncClient s3AsyncClient() {
}

@Override
public Map<String, String> s3Properties() {
public Map<String, String> properties() {
return Map.of(
ENDPOINT, MinIO.s3Endpoint(),
CLIENT_REGION, MinIO.region(),
ACCESS_KEY_ID, MinIO.accessKey(),
SECRET_ACCESS_KEY, MinIO.secretAccessKey());
}

}
Loading

0 comments on commit 59f226d

Please sign in to comment.