From 38a3aa87bae37b0808670bf24f755d9c23b12f84 Mon Sep 17 00:00:00 2001 From: Devin Smith Date: Fri, 15 Nov 2024 00:35:15 -0800 Subject: [PATCH] fix!: ensure Iceberg layouts own the SeekableChannelsProvider (#6371) This greatly improves the efficiency of Iceberg reading. Previously, it was creating a `SeekableChannelsProvider` per URI, and now only one is created per layout (/ Table). To aid in this update, objects that were previously created on-demand in `IcebergBaseLayout` are now created once upon construction. To enable this, it was noted that only the URI scheme is relevant for discrimination, and not actually the full URI to the data files. Thus, we can use the URI scheme as provided via `org.apache.iceberg.Table#location` to do any up-front loading. The various interfaces that take a URI have been updated to take a URI scheme instead. While this change could technically have been made in a non-breaking fashion by delegating existing URI methods to URI scheme methods, the existence of the URI methods encourages the wrong mental model and is easy to misuse, so they have been removed. One of the `ParquetTableLocationKey` constructors has been deprecated, marked for removal. A more appropriate constructor has been added. BREAKING CHANGE: - `SeekableChannelsProviderLoader.fromServiceLoader` has been removed, replaced with `SeekableChannelsProviderLoader.load`. - `DataInstructionsProviderLoader.fromServiceLoader` has been removed, replaced with `DataInstructionsProviderLoader.load`. - `SeekableChannelsProviderPlugin` methods have been changed, now use a `String` for the URI scheme instead of a `URI`. - `DataInstructionsProviderPlugin.createInstructions` method has been changed, now uses a `String` for the URI scheme instead of a `URI`. - `IcebergTableParquetLocationKey` has added a new `SeekableChannelsProvider` parameter. 
--- .../SeekableChannelsProviderLoader.java | 36 +++--- .../SeekableChannelsProviderPlugin.java | 10 +- .../util/S3InstructionsProviderPlugin.java | 10 +- .../DataInstructionsProviderLoader.java | 51 +++++---- .../DataInstructionsProviderPlugin.java | 7 +- .../iceberg/layout/IcebergBaseLayout.java | 104 +++++++++--------- .../IcebergTableParquetLocationKey.java | 7 +- .../deephaven/parquet/table/ParquetTools.java | 4 +- .../DeephavenNestedPartitionLayout.java | 9 +- .../layout/ParquetFlatPartitionedLayout.java | 8 +- .../ParquetKeyValuePartitionedLayout.java | 24 ++-- .../layout/ParquetMetadataFileLayout.java | 7 +- .../location/ParquetTableLocationKey.java | 31 +++++- .../s3/GCSSeekableChannelProviderPlugin.java | 10 +- .../s3/S3SeekableChannelProviderPlugin.java | 10 +- .../s3/S3SeekableChannelSimpleTestBase.java | 8 +- .../testlib/S3SeekableChannelTestSetup.java | 8 +- ...TrackedSeekableChannelsProviderPlugin.java | 12 +- 18 files changed, 195 insertions(+), 161 deletions(-) diff --git a/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProviderLoader.java b/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProviderLoader.java index 98ab4d8c583..ee2508ce2ca 100644 --- a/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProviderLoader.java +++ b/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProviderLoader.java @@ -6,7 +6,6 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import java.net.URI; import java.util.ArrayList; import java.util.List; import java.util.ServiceLoader; @@ -19,11 +18,22 @@ public final class SeekableChannelsProviderLoader { private static volatile SeekableChannelsProviderLoader instance; + /** + * Get the static {@link SeekableChannelsProviderLoader} instance that is loaded with + * {@link SeekableChannelsProviderPlugin} provided via {@link ServiceLoader#load(Class)}. 
+ * + * @return The {@link SeekableChannelsProviderLoader} instance. + */ public static SeekableChannelsProviderLoader getInstance() { - if (instance == null) { - instance = new SeekableChannelsProviderLoader(); + SeekableChannelsProviderLoader localInstance; + if ((localInstance = instance) == null) { + synchronized (SeekableChannelsProviderLoader.class) { + if ((localInstance = instance) == null) { + instance = localInstance = new SeekableChannelsProviderLoader(); + } + } } - return instance; + return localInstance; } private final List providers; @@ -37,21 +47,19 @@ private SeekableChannelsProviderLoader() { } /** - * Create a new {@link SeekableChannelsProvider} compatible for reading from and writing to the given URI, using the - * plugins loaded by the {@link ServiceLoader}. For example, for a "S3" URI, we will create a - * {@link SeekableChannelsProvider} which can read files from S3. + * Create a new {@link SeekableChannelsProvider} compatible for reading from and writing to the given URI scheme. + * For example, for a "S3" URI, we will create a {@link SeekableChannelsProvider} which can read files from S3. * - * @param uri The URI + * @param uriScheme The URI scheme * @param specialInstructions An optional object to pass special instructions to the provider. - * @return A {@link SeekableChannelsProvider} for the given URI. + * @return A {@link SeekableChannelsProvider} for the given URI scheme. 
*/ - public SeekableChannelsProvider fromServiceLoader(@NotNull final URI uri, - @Nullable final Object specialInstructions) { + public SeekableChannelsProvider load(@NotNull final String uriScheme, @Nullable final Object specialInstructions) { for (final SeekableChannelsProviderPlugin plugin : providers) { - if (plugin.isCompatible(uri, specialInstructions)) { - return plugin.createProvider(uri, specialInstructions); + if (plugin.isCompatible(uriScheme, specialInstructions)) { + return plugin.createProvider(uriScheme, specialInstructions); } } - throw new UnsupportedOperationException("No plugin found for uri: " + uri); + throw new UnsupportedOperationException("No plugin found for uri scheme: " + uriScheme); } } diff --git a/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProviderPlugin.java b/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProviderPlugin.java index 4ae6ae6d76b..e0f21741cb8 100644 --- a/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProviderPlugin.java +++ b/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProviderPlugin.java @@ -6,20 +6,18 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import java.net.URI; - /** * A plugin interface for providing {@link SeekableChannelsProvider} implementations for different URI schemes, e.g. S3. * Check out {@link SeekableChannelsProviderLoader} for more details. */ public interface SeekableChannelsProviderPlugin { /** - * Check if this plugin is compatible with the given URI and config object. + * Check if this plugin is compatible with the given URI scheme and config object. */ - boolean isCompatible(@NotNull URI uri, @Nullable Object config); + boolean isCompatible(@NotNull String uriScheme, @Nullable Object config); /** - * Create a {@link SeekableChannelsProvider} for the given URI and config object. 
+ * Create a {@link SeekableChannelsProvider} for the given URI scheme and config object. */ - SeekableChannelsProvider createProvider(@NotNull URI uri, @Nullable Object object); + SeekableChannelsProvider createProvider(@NotNull String uriScheme, @Nullable Object object); } diff --git a/extensions/iceberg/s3/src/main/java/io/deephaven/iceberg/util/S3InstructionsProviderPlugin.java b/extensions/iceberg/s3/src/main/java/io/deephaven/iceberg/util/S3InstructionsProviderPlugin.java index bd72daff66c..6e8b1d1bf1f 100644 --- a/extensions/iceberg/s3/src/main/java/io/deephaven/iceberg/util/S3InstructionsProviderPlugin.java +++ b/extensions/iceberg/s3/src/main/java/io/deephaven/iceberg/util/S3InstructionsProviderPlugin.java @@ -12,7 +12,6 @@ import org.apache.iceberg.aws.s3.S3FileIOProperties; import org.jetbrains.annotations.NotNull; -import java.net.URI; import java.util.Map; /** @@ -25,7 +24,8 @@ @SuppressWarnings("unused") public final class S3InstructionsProviderPlugin implements DataInstructionsProviderPlugin { @Override - public S3Instructions createInstructions(@NotNull final URI uri, @NotNull final Map properties) { + public S3Instructions createInstructions(@NotNull final String uriScheme, + @NotNull final Map properties) { final S3Instructions s3Instructions = DeephavenAwsClientFactory.getInstructions(properties).orElse(null); if (s3Instructions != null) { return s3Instructions; @@ -33,9 +33,9 @@ public S3Instructions createInstructions(@NotNull final URI uri, @NotNull final // If the URI scheme is "s3","s3a","s3n" or if the properties contain one of these specific keys, we can // create a useful S3Instructions object. 
- if (uri.getScheme().equals("s3") - || uri.getScheme().equals("s3a") - || uri.getScheme().equals("s3n") + if (uriScheme.equals("s3") + || uriScheme.equals("s3a") + || uriScheme.equals("s3n") || properties.containsKey(AwsClientProperties.CLIENT_REGION) || properties.containsKey(S3FileIOProperties.ACCESS_KEY_ID) || properties.containsKey(S3FileIOProperties.SECRET_ACCESS_KEY) diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/internal/DataInstructionsProviderLoader.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/internal/DataInstructionsProviderLoader.java index 4ae28e0e044..b2c7cefcbf3 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/internal/DataInstructionsProviderLoader.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/internal/DataInstructionsProviderLoader.java @@ -5,12 +5,15 @@ import org.jetbrains.annotations.NotNull; -import java.net.URI; -import java.util.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.ServiceLoader; /** * A service loader class for loading {@link DataInstructionsProviderPlugin} implementations at runtime which provide - * {@link DataInstructionsProviderLoader} implementations for different URI paths. + * {@link DataInstructionsProviderLoader} implementations for different URI schemes. */ public final class DataInstructionsProviderLoader { /** @@ -21,30 +24,33 @@ public final class DataInstructionsProviderLoader { /** * Ensure that the {@link DataInstructionsProviderPlugin plugins} are loaded exactly once. 
*/ - private static void ensureProviders() { - if (cachedProviders == null) { + private static List ensureProviders() { + List localProviders; + if ((localProviders = cachedProviders) == null) { synchronized (DataInstructionsProviderLoader.class) { - if (cachedProviders == null) { - cachedProviders = new ArrayList<>(); + if ((localProviders = cachedProviders) == null) { + localProviders = new ArrayList<>(); // Load the plugins for (final DataInstructionsProviderPlugin plugin : ServiceLoader .load(DataInstructionsProviderPlugin.class)) { - cachedProviders.add(plugin); + localProviders.add(plugin); } + cachedProviders = localProviders; } } } + return localProviders; } /** - * Get a {@link DataInstructionsProviderLoader} instance for the given property collection. + * Create a {@link DataInstructionsProviderLoader} instance for the given property collection with a static list of + * {@link DataInstructionsProviderPlugin} provided via {@link ServiceLoader#load(Class)}. * * @param properties The property collection. * @return A {@link DataInstructionsProviderLoader} instance. */ public static DataInstructionsProviderLoader create(final Map properties) { - ensureProviders(); - return new DataInstructionsProviderLoader(properties); + return new DataInstructionsProviderLoader(properties, ensureProviders()); } /** @@ -62,27 +68,28 @@ public static DataInstructionsProviderLoader create(final Map pr * * @param properties The property collection. */ - private DataInstructionsProviderLoader(final Map properties) { - this.properties = properties; - providers = cachedProviders; + private DataInstructionsProviderLoader( + final Map properties, + final List providers) { + this.properties = Objects.requireNonNull(properties); + this.providers = Objects.requireNonNull(providers); } /** - * Create a new data instructions object compatible with reading from and writing to the given URI, using the - * plugins loaded by the {@link ServiceLoader}. 
For example, for a "S3" URI, we will create an - * {@code S3Instructions} object which can read files from S3. + * Create a new data instructions object compatible with reading from and writing to the given URI scheme. For + * example, an "S3" URI scheme will create an {@code S3Instructions} object which can read files from S3. * - * @param uri The URI - * @return A data instructions object for the given URI or null if one cannot be found + * @param uriScheme The URI scheme + * @return A data instructions object for the given URI scheme or null if one cannot be found */ - public Object fromServiceLoader(@NotNull final URI uri) { + public Object load(@NotNull final String uriScheme) { for (final DataInstructionsProviderPlugin plugin : providers) { - final Object pluginInstructions = plugin.createInstructions(uri, properties); + final Object pluginInstructions = plugin.createInstructions(uriScheme, properties); if (pluginInstructions != null) { return pluginInstructions; } } - // No plugin found for this URI and property collection. + // No plugin found for this URI scheme and property collection. return null; } } diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/internal/DataInstructionsProviderPlugin.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/internal/DataInstructionsProviderPlugin.java index b0e48cc9166..632312d6605 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/internal/DataInstructionsProviderPlugin.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/internal/DataInstructionsProviderPlugin.java @@ -5,16 +5,15 @@ import org.jetbrains.annotations.NotNull; -import java.net.URI; import java.util.Map; /** * A plugin interface for providing {@link DataInstructionsProviderPlugin} implementations for different property - * collections and URI values. Check out {@link DataInstructionsProviderLoader} for more details. + * collections and URI schemes. 
Check out {@link DataInstructionsProviderLoader} for more details. */ public interface DataInstructionsProviderPlugin { /** - * Create a data instructions object for the given URI. + * Create a data instructions object for the given URI scheme. */ - Object createInstructions(@NotNull URI uri, @NotNull final Map properties); + Object createInstructions(@NotNull String uriScheme, @NotNull Map properties); } diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java index 970ad673581..647e7da12a0 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java @@ -14,6 +14,8 @@ import io.deephaven.iceberg.util.IcebergTableAdapter; import io.deephaven.parquet.table.ParquetInstructions; import io.deephaven.iceberg.internal.DataInstructionsProviderLoader; +import io.deephaven.util.channel.SeekableChannelsProvider; +import io.deephaven.util.channel.SeekableChannelsProviderLoader; import org.apache.iceberg.*; import org.apache.iceberg.io.FileIO; import org.jetbrains.annotations.NotNull; @@ -38,65 +40,37 @@ public abstract class IcebergBaseLayout implements TableLocationKeyFinder cache; + private final Map cache; /** - * The {@link Snapshot} from which to discover data files. + * The {@link ParquetInstructions} object that will be used to read any Parquet data files in this table. */ - Snapshot snapshot; + private final ParquetInstructions parquetInstructions; /** - * The data instructions provider for creating instructions from URI and user-supplied properties. + * The {@link SeekableChannelsProvider} object that will be used for {@link IcebergTableParquetLocationKey} + * creation. 
*/ - final DataInstructionsProviderLoader dataInstructionsProvider; + private final SeekableChannelsProvider channelsProvider; /** - * The {@link ParquetInstructions} object that will be used to read any Parquet data files in this table. Only - * accessed while synchronized on {@code this}. + * The {@link Snapshot} from which to discover data files. */ - ParquetInstructions parquetInstructions; + Snapshot snapshot; protected IcebergTableLocationKey locationKey( final org.apache.iceberg.FileFormat format, final URI fileUri, @Nullable final Map> partitions) { - if (format == org.apache.iceberg.FileFormat.PARQUET) { - if (parquetInstructions == null) { - // Start with user-supplied instructions (if provided). - final ParquetInstructions.Builder builder = new ParquetInstructions.Builder(); - - // Add the table definition. - builder.setTableDefinition(tableDef); - - // Add any column rename mappings. - if (!instructions.columnRenames().isEmpty()) { - for (Map.Entry entry : instructions.columnRenames().entrySet()) { - builder.addColumnNameMapping(entry.getKey(), entry.getValue()); - } - } - - // Add the data instructions if provided as part of the IcebergReadInstructions. - if (instructions.dataInstructions().isPresent()) { - builder.setSpecialInstructions(instructions.dataInstructions().get()); - } else { - // Attempt to create data instructions from the properties collection and URI. 
- final Object dataInstructions = dataInstructionsProvider.fromServiceLoader(fileUri); - if (dataInstructions != null) { - builder.setSpecialInstructions(dataInstructions); - } - } - - parquetInstructions = builder.build(); - } - return new IcebergTableParquetLocationKey(fileUri, 0, partitions, parquetInstructions); + return new IcebergTableParquetLocationKey(fileUri, 0, partitions, parquetInstructions, channelsProvider); } throw new UnsupportedOperationException(String.format("%s:%d - an unsupported file format %s for URI '%s'", tableAdapter, snapshot.snapshotId(), format, fileUri)); @@ -112,23 +86,46 @@ public IcebergBaseLayout( @NotNull final DataInstructionsProviderLoader dataInstructionsProvider) { this.tableAdapter = tableAdapter; this.snapshot = tableAdapter.getSnapshot(instructions); - this.instructions = instructions; - this.dataInstructionsProvider = dataInstructionsProvider; this.tableDef = tableAdapter.definition(instructions); - + this.uriScheme = locationUri(tableAdapter.icebergTable()).getScheme(); + // Add the data instructions if provided as part of the IcebergReadInstructions, or else attempt to create + // data instructions from the properties collection and URI scheme. + final Object specialInstructions = instructions.dataInstructions() + .orElseGet(() -> dataInstructionsProvider.load(uriScheme)); + { + // Start with user-supplied instructions (if provided). + final ParquetInstructions.Builder builder = new ParquetInstructions.Builder(); + + // Add the table definition. + builder.setTableDefinition(tableDef); + + // Add any column rename mappings. 
+ if (!instructions.columnRenames().isEmpty()) { + for (Map.Entry entry : instructions.columnRenames().entrySet()) { + builder.addColumnNameMapping(entry.getKey(), entry.getValue()); + } + } + if (specialInstructions != null) { + builder.setSpecialInstructions(specialInstructions); + } + this.parquetInstructions = builder.build(); + } + this.channelsProvider = SeekableChannelsProviderLoader.getInstance().load(uriScheme, specialInstructions); this.cache = new HashMap<>(); } abstract IcebergTableLocationKey keyFromDataFile(DataFile df, URI fileUri); - @NotNull - private URI dataFileUri(@NotNull DataFile df) { - String path = df.path().toString(); - final FileIO fileIO = tableAdapter.icebergTable().io(); - if (fileIO instanceof RelativeFileIO) { - path = ((RelativeFileIO) fileIO).absoluteLocation(path); - } - return FileUtils.convertToURI(path, false); + private static String path(String path, FileIO io) { + return io instanceof RelativeFileIO ? ((RelativeFileIO) io).absoluteLocation(path) : path; + } + + private static URI locationUri(Table table) { + return FileUtils.convertToURI(path(table.location(), table.io()), true); + } + + private static URI dataFileUri(Table table, DataFile dataFile) { + return FileUtils.convertToURI(path(dataFile.path().toString(), table.io()), false); } @Override @@ -149,7 +146,12 @@ public synchronized void findKeys(@NotNull final Consumer reader = ManifestFiles.read(manifestFile, table.io())) { for (DataFile df : reader) { - final URI fileUri = dataFileUri(df); + final URI fileUri = dataFileUri(table, df); + if (!uriScheme.equals(fileUri.getScheme())) { + throw new TableDataException(String.format( + "%s:%d - multiple URI schemes are not currently supported. 
uriScheme=%s, fileUri=%s", + table, snapshot.snapshotId(), uriScheme, fileUri)); + } final IcebergTableLocationKey locationKey = cache.computeIfAbsent(fileUri, uri -> keyFromDataFile(df, fileUri)); if (locationKey != null) { diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/location/IcebergTableParquetLocationKey.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/location/IcebergTableParquetLocationKey.java index e356d0ecb92..52ae74b0e8f 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/location/IcebergTableParquetLocationKey.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/location/IcebergTableParquetLocationKey.java @@ -6,6 +6,7 @@ import io.deephaven.engine.table.impl.locations.TableLocationKey; import io.deephaven.parquet.table.ParquetInstructions; import io.deephaven.parquet.table.location.ParquetTableLocationKey; +import io.deephaven.util.channel.SeekableChannelsProvider; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -29,13 +30,15 @@ public class IcebergTableParquetLocationKey extends ParquetTableLocationKey impl * parameter is {@code null}, the location will be a member of no partitions. 
An ordered copy of the map will * be made, so the calling code is free to mutate the map after this call * @param readInstructions the instructions for customizations while reading + * @param channelsProvider the provider for reading the file */ public IcebergTableParquetLocationKey( @NotNull final URI fileUri, final int order, @Nullable final Map> partitions, - @NotNull final ParquetInstructions readInstructions) { - super(fileUri, order, partitions, readInstructions); + @NotNull final ParquetInstructions readInstructions, + @NotNull final SeekableChannelsProvider channelsProvider) { + super(fileUri, order, partitions, channelsProvider); this.readInstructions = readInstructions; } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java index 0a4821629f7..a03294a28ed 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java @@ -575,7 +575,7 @@ private static void writeTablesImpl( } // Assuming all destination URIs have the same scheme, and will use the same channels provider instance final SeekableChannelsProvider channelsProvider = SeekableChannelsProviderLoader.getInstance() - .fromServiceLoader(destinations[0], writeInstructions.getSpecialInstructions()); + .load(destinations[0].getScheme(), writeInstructions.getSpecialInstructions()); final ParquetMetadataFileWriter metadataFileWriter; if (writeInstructions.generateMetadataFiles()) { @@ -958,7 +958,7 @@ private static Table readPartitionedTableDirectory( // Check if the directory has a metadata file final URI metadataFileURI = tableRootDirectory.resolve(METADATA_FILE_NAME); final SeekableChannelsProvider channelsProvider = - SeekableChannelsProviderLoader.getInstance().fromServiceLoader(tableRootDirectory, + 
SeekableChannelsProviderLoader.getInstance().load(tableRootDirectory.getScheme(), readInstructions.getSpecialInstructions()); if (channelsProvider.exists(metadataFileURI)) { return readPartitionedTableWithMetadata(metadataFileURI, readInstructions, channelsProvider); diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/DeephavenNestedPartitionLayout.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/DeephavenNestedPartitionLayout.java index 830e3e719cf..31f367763bf 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/DeephavenNestedPartitionLayout.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/DeephavenNestedPartitionLayout.java @@ -3,6 +3,7 @@ // package io.deephaven.parquet.table.layout; +import io.deephaven.base.FileUtils; import io.deephaven.engine.table.impl.locations.TableDataException; import io.deephaven.engine.table.impl.locations.impl.TableLocationKeyFinder; import io.deephaven.engine.table.impl.locations.local.URITableLocationKey; @@ -53,16 +54,16 @@ public static DeephavenNestedPartitionLayout forParquet @NotNull final String columnPartitionKey, @Nullable final Predicate internalPartitionValueFilter, @NotNull final ParquetInstructions readInstructions) { - final SeekableChannelsProvider channelsProvider = - SeekableChannelsProviderLoader.getInstance().fromServiceLoader(convertToURI(tableRootDirectory, true), - readInstructions.getSpecialInstructions()); + // noinspection resource + final SeekableChannelsProvider channelsProvider = SeekableChannelsProviderLoader.getInstance() + .load(FileUtils.FILE_URI_SCHEME, readInstructions.getSpecialInstructions()); return new DeephavenNestedPartitionLayout<>(tableRootDirectory, tableName, columnPartitionKey, internalPartitionValueFilter) { @Override protected ParquetTableLocationKey makeKey(@NotNull Path tableLeafDirectory, @NotNull Map> partitions) { final URI fileURI = 
convertToURI(tableLeafDirectory.resolve(PARQUET_FILE_NAME), false); - return new ParquetTableLocationKey(fileURI, 0, partitions, readInstructions, channelsProvider); + return new ParquetTableLocationKey(fileURI, 0, partitions, channelsProvider); } }; } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetFlatPartitionedLayout.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetFlatPartitionedLayout.java index 71f0068383b..cd0ec0add51 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetFlatPartitionedLayout.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetFlatPartitionedLayout.java @@ -31,7 +31,6 @@ public final class ParquetFlatPartitionedLayout implements TableLocationKeyFinde private final URI tableRootDirectory; private final Map cache; - private final ParquetInstructions readInstructions; private final SeekableChannelsProvider channelsProvider; /** @@ -42,9 +41,8 @@ public ParquetFlatPartitionedLayout(@NotNull final URI tableRootDirectoryURI, @NotNull final ParquetInstructions readInstructions) { this.tableRootDirectory = tableRootDirectoryURI; this.cache = Collections.synchronizedMap(new HashMap<>()); - this.readInstructions = readInstructions; - this.channelsProvider = SeekableChannelsProviderLoader.getInstance().fromServiceLoader(tableRootDirectory, - readInstructions.getSpecialInstructions()); + this.channelsProvider = SeekableChannelsProviderLoader.getInstance() + .load(tableRootDirectory.getScheme(), readInstructions.getSpecialInstructions()); } public String toString() { @@ -80,6 +78,6 @@ public void findKeys(@NotNull final Consumer locationKe } private ParquetTableLocationKey locationKey(@NotNull final URI uri) { - return new ParquetTableLocationKey(uri, 0, null, readInstructions, channelsProvider); + return new ParquetTableLocationKey(uri, 0, null, channelsProvider); } } diff --git 
a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetKeyValuePartitionedLayout.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetKeyValuePartitionedLayout.java index c2b10421600..310fc015161 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetKeyValuePartitionedLayout.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetKeyValuePartitionedLayout.java @@ -65,22 +65,20 @@ public static ParquetKeyValuePartitionedLayout create( @NotNull final ParquetInstructions readInstructions, @Nullable SeekableChannelsProvider channelsProvider) { if (channelsProvider == null) { - channelsProvider = SeekableChannelsProviderLoader.getInstance().fromServiceLoader(tableRootDirectory, - readInstructions.getSpecialInstructions()); + // noinspection resource + channelsProvider = SeekableChannelsProviderLoader.getInstance() + .load(tableRootDirectory.getScheme(), readInstructions.getSpecialInstructions()); } - return new ParquetKeyValuePartitionedLayout(tableRootDirectory, tableDefinition, readInstructions, - channelsProvider); + return new ParquetKeyValuePartitionedLayout(tableRootDirectory, tableDefinition, channelsProvider); } private ParquetKeyValuePartitionedLayout( @NotNull final URI tableRootDirectory, @NotNull final TableDefinition tableDefinition, - @NotNull final ParquetInstructions readInstructions, @NotNull final SeekableChannelsProvider channelsProvider) { super(tableRootDirectory, () -> new LocationTableBuilderDefinition(tableDefinition), - (uri, partitions) -> new ParquetTableLocationKey(uri, 0, partitions, readInstructions, - channelsProvider), + (uri, partitions) -> new ParquetTableLocationKey(uri, 0, partitions, channelsProvider), Math.toIntExact(tableDefinition.getColumnStream().filter(ColumnDefinition::isPartitioning).count())); this.channelsProvider = channelsProvider; } @@ -102,22 +100,20 @@ public static 
ParquetKeyValuePartitionedLayout create( @NotNull final ParquetInstructions readInstructions, @Nullable SeekableChannelsProvider channelsProvider) { if (channelsProvider == null) { - channelsProvider = SeekableChannelsProviderLoader.getInstance().fromServiceLoader(tableRootDirectory, - readInstructions.getSpecialInstructions()); + // noinspection resource + channelsProvider = SeekableChannelsProviderLoader.getInstance() + .load(tableRootDirectory.getScheme(), readInstructions.getSpecialInstructions()); } - return new ParquetKeyValuePartitionedLayout(tableRootDirectory, maxPartitioningLevels, readInstructions, - channelsProvider); + return new ParquetKeyValuePartitionedLayout(tableRootDirectory, maxPartitioningLevels, channelsProvider); } private ParquetKeyValuePartitionedLayout( @NotNull final URI tableRootDirectory, final int maxPartitioningLevels, - @NotNull final ParquetInstructions readInstructions, @NotNull final SeekableChannelsProvider channelsProvider) { super(tableRootDirectory, () -> new LocationTableBuilderCsv(tableRootDirectory), - (uri, partitions) -> new ParquetTableLocationKey(uri, 0, partitions, readInstructions, - channelsProvider), + (uri, partitions) -> new ParquetTableLocationKey(uri, 0, partitions, channelsProvider), maxPartitioningLevels); this.channelsProvider = channelsProvider; } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetMetadataFileLayout.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetMetadataFileLayout.java index 84546de519e..09700f17280 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetMetadataFileLayout.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetMetadataFileLayout.java @@ -96,8 +96,9 @@ public static ParquetMetadataFileLayout create( final URI metadataFileURI = isMetadataFile ? 
source : directory.resolve(METADATA_FILE_NAME); final URI commonMetadataFileURI = isCommonMetadataFile ? source : directory.resolve(COMMON_METADATA_FILE_NAME); if (channelsProvider == null) { - channelsProvider = SeekableChannelsProviderLoader.getInstance().fromServiceLoader(source, - inputInstructions.getSpecialInstructions()); + // noinspection resource + channelsProvider = SeekableChannelsProviderLoader.getInstance() + .load(source.getScheme(), inputInstructions.getSpecialInstructions()); } return new ParquetMetadataFileLayout(directory, metadataFileURI, commonMetadataFileURI, inputInstructions, channelsProvider); @@ -232,7 +233,7 @@ private ParquetMetadataFileLayout( } final URI partitionFileURI = resolve(tableRootDirectory, relativePathString); final ParquetTableLocationKey tlk = new ParquetTableLocationKey(partitionFileURI, - partitionOrder.getAndIncrement(), partitions, inputInstructions, channelsProvider); + partitionOrder.getAndIncrement(), partitions, channelsProvider); tlk.setFileReader(metadataFileReader); tlk.setMetadata(getParquetMetadataForFile(relativePathString, metadataFileMetadata)); tlk.setRowGroupIndices(rowGroupIndices); diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocationKey.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocationKey.java index 1681c0fc339..685d973cf35 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocationKey.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocationKey.java @@ -21,6 +21,7 @@ import java.net.URI; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.stream.IntStream; import static io.deephaven.parquet.base.ParquetUtils.PARQUET_FILE_EXTENSION; @@ -54,9 +55,8 @@ public class ParquetTableLocationKey extends URITableLocationKey { public ParquetTableLocationKey(@NotNull final URI 
parquetFileUri, final int order, @Nullable final Map> partitions, @NotNull final ParquetInstructions readInstructions) { - this(parquetFileUri, order, partitions, readInstructions, - SeekableChannelsProviderLoader.getInstance().fromServiceLoader(parquetFileUri, - readInstructions.getSpecialInstructions())); + this(parquetFileUri, order, partitions, SeekableChannelsProviderLoader.getInstance() + .load(parquetFileUri.getScheme(), readInstructions.getSpecialInstructions())); } /** @@ -67,15 +67,36 @@ public ParquetTableLocationKey(@NotNull final URI parquetFileUri, final int orde * @param partitions The table partitions enclosing the table location keyed by {@code this}. Note that if this * parameter is {@code null}, the location will be a member of no partitions. An ordered copy of the map will * be made, so the calling code is free to mutate the map after this call - * @param readInstructions the instructions for customizations while reading + * @param readInstructions the instructions for customizations while reading, unused * @param channelsProvider the provider for reading the file + * @deprecated the {@code readInstructions} parameter is unused, please + * use {@link #ParquetTableLocationKey(URI, int, Map, SeekableChannelsProvider)} */ + @Deprecated(forRemoval = true) public ParquetTableLocationKey(@NotNull final URI parquetFileUri, final int order, @Nullable final Map> partitions, @NotNull final ParquetInstructions readInstructions, @NotNull final SeekableChannelsProvider channelsProvider) { + this(parquetFileUri, order, partitions, channelsProvider); + } + + /** + * Construct a new ParquetTableLocationKey for the supplied {@code parquetFileUri} and {@code partitions}. + * + * @param parquetFileUri The parquet file that backs the keyed location. Will be adjusted to an absolute path. + * @param order Explicit ordering index, taking precedence over other fields + * @param partitions The table partitions enclosing the table location keyed by {@code this}.
Note that if this + * parameter is {@code null}, the location will be a member of no partitions. An ordered copy of the map will + * be made, so the calling code is free to mutate the map after this call + * @param channelsProvider the provider for reading the file + */ + public ParquetTableLocationKey( + @NotNull final URI parquetFileUri, + final int order, + @Nullable final Map> partitions, + @NotNull final SeekableChannelsProvider channelsProvider) { super(validateParquetFile(parquetFileUri), order, partitions); - this.channelsProvider = channelsProvider; + this.channelsProvider = Objects.requireNonNull(channelsProvider); } private static URI validateParquetFile(@NotNull final URI parquetFileUri) { diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/GCSSeekableChannelProviderPlugin.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/GCSSeekableChannelProviderPlugin.java index 6c3ca91ac8e..bdd954c6e08 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/GCSSeekableChannelProviderPlugin.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/GCSSeekableChannelProviderPlugin.java @@ -25,14 +25,14 @@ public final class GCSSeekableChannelProviderPlugin implements SeekableChannelsP S3Instructions.builder().endpointOverride(DEFAULT_ENDPOINT_OVERRIDE).build(); @Override - public boolean isCompatible(@NotNull final URI uri, @Nullable final Object config) { - return GCS_URI_SCHEME.equals(uri.getScheme()); + public boolean isCompatible(@NotNull final String uriScheme, @Nullable final Object config) { + return GCS_URI_SCHEME.equals(uriScheme); } @Override - public SeekableChannelsProvider createProvider(@NotNull final URI uri, @Nullable final Object config) { - if (!isCompatible(uri, config)) { - throw new IllegalArgumentException("Arguments not compatible, provided uri " + uri); + public SeekableChannelsProvider createProvider(@NotNull final String uriScheme, @Nullable final Object config) { + if (!isCompatible(uriScheme, 
config)) { + throw new IllegalArgumentException("Arguments not compatible, provided uri scheme " + uriScheme); } return new GCSSeekableChannelProvider(s3Instructions(config)); } diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProviderPlugin.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProviderPlugin.java index 619e31b3824..c7c7eac5ad7 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProviderPlugin.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProviderPlugin.java @@ -20,14 +20,14 @@ public final class S3SeekableChannelProviderPlugin implements SeekableChannelsPr static final String S3_URI_SCHEME = "s3"; @Override - public boolean isCompatible(@NotNull final URI uri, @Nullable final Object config) { - return S3_URI_SCHEME.equals(uri.getScheme()); + public boolean isCompatible(@NotNull final String uriScheme, @Nullable final Object config) { + return S3_URI_SCHEME.equals(uriScheme); } @Override - public SeekableChannelsProvider createProvider(@NotNull final URI uri, @Nullable final Object config) { - if (!isCompatible(uri, config)) { - throw new IllegalArgumentException("Arguments not compatible, provided uri " + uri); + public SeekableChannelsProvider createProvider(@NotNull final String uriScheme, @Nullable final Object config) { + if (!isCompatible(uriScheme, config)) { + throw new IllegalArgumentException("Arguments not compatible, provided uri scheme " + uriScheme); } if (config != null && !(config instanceof S3Instructions)) { throw new IllegalArgumentException("Only S3Instructions are valid when reading files from S3, provided " + diff --git a/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelSimpleTestBase.java b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelSimpleTestBase.java index a0cf78b0f3a..aebd6ec5fdf 100644 --- 
a/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelSimpleTestBase.java +++ b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelSimpleTestBase.java @@ -46,7 +46,7 @@ void readSimpleFiles() final URI uri = uri("empty.txt"); final ByteBuffer buffer = ByteBuffer.allocate(1); try ( - final SeekableChannelsProvider providerImpl = providerImpl(uri); + final SeekableChannelsProvider providerImpl = providerImpl(); final SeekableChannelsProvider provider = CachedChannelProvider.create(providerImpl, 32); final SeekableChannelContext context = provider.makeContext(); final SeekableByteChannel readChannel = provider.getReadChannel(context, uri)) { @@ -56,7 +56,7 @@ void readSimpleFiles() { final URI uri = uri("hello/world.txt"); try ( - final SeekableChannelsProvider providerImpl = providerImpl(uri); + final SeekableChannelsProvider providerImpl = providerImpl(); final SeekableChannelsProvider provider = CachedChannelProvider.create(providerImpl, 32); final SeekableChannelContext context = provider.makeContext(); final SeekableByteChannel readChannel = provider.getReadChannel(context, uri)) { @@ -78,7 +78,7 @@ public int read() { final URI uri = uri("32MiB.bin"); final ByteBuffer buffer = ByteBuffer.allocate(1); try ( - final SeekableChannelsProvider providerImpl = providerImpl(uri); + final SeekableChannelsProvider providerImpl = providerImpl(); final SeekableChannelsProvider provider = CachedChannelProvider.create(providerImpl, 32); final SeekableChannelContext context = provider.makeContext(); final SeekableByteChannel readChannel = provider.getReadChannel(context, uri)) { @@ -97,7 +97,7 @@ void readWriteTest() throws IOException { final String content = "Hello, world!"; final byte[] contentBytes = content.getBytes(StandardCharsets.UTF_8); try ( - final SeekableChannelsProvider providerImpl = providerImpl(uri); + final SeekableChannelsProvider providerImpl = providerImpl(); final SeekableChannelsProvider provider = 
CachedChannelProvider.create(providerImpl, 32); final CompletableOutputStream outputStream = provider.getOutputStream(uri, 0)) { final int numBytes = 36 * 1024 * 1024; // 36 Mib -> Three 10-MiB parts + One 6-MiB part diff --git a/extensions/s3/src/test/java/io/deephaven/extensions/s3/testlib/S3SeekableChannelTestSetup.java b/extensions/s3/src/test/java/io/deephaven/extensions/s3/testlib/S3SeekableChannelTestSetup.java index 6cc9fe05fa8..9f4826476df 100644 --- a/extensions/s3/src/test/java/io/deephaven/extensions/s3/testlib/S3SeekableChannelTestSetup.java +++ b/extensions/s3/src/test/java/io/deephaven/extensions/s3/testlib/S3SeekableChannelTestSetup.java @@ -29,6 +29,8 @@ public abstract class S3SeekableChannelTestSetup { + protected static final String SCHEME = "s3"; + protected ExecutorService executor; protected S3AsyncClient asyncClient; protected String bucket; @@ -59,7 +61,7 @@ protected void uploadDirectory(final Path directory, final String prefix) } protected final URI uri(String key) { - return URI.create(String.format("s3://%s/%s", bucket, key)); + return URI.create(String.format("%s://%s/%s", SCHEME, bucket, key)); } protected final void putObject(String key, AsyncRequestBody body) @@ -68,10 +70,10 @@ protected final void putObject(String key, AsyncRequestBody body) TimeUnit.SECONDS); } - protected final SeekableChannelsProvider providerImpl(URI uri) { + protected final SeekableChannelsProvider providerImpl() { final S3SeekableChannelProviderPlugin plugin = new S3SeekableChannelProviderPlugin(); final S3Instructions instructions = s3Instructions(S3Instructions.builder()).build(); - return plugin.createProvider(uri, instructions); + return plugin.createProvider(SCHEME, instructions); } protected static ByteBuffer readAll(ReadableByteChannel channel, int maxBytes) throws IOException { diff --git a/extensions/trackedfile/src/main/java/io/deephaven/extensions/trackedfile/TrackedSeekableChannelsProviderPlugin.java 
b/extensions/trackedfile/src/main/java/io/deephaven/extensions/trackedfile/TrackedSeekableChannelsProviderPlugin.java index 32ec7a3564a..1fba21ba4c6 100644 --- a/extensions/trackedfile/src/main/java/io/deephaven/extensions/trackedfile/TrackedSeekableChannelsProviderPlugin.java +++ b/extensions/trackedfile/src/main/java/io/deephaven/extensions/trackedfile/TrackedSeekableChannelsProviderPlugin.java @@ -10,8 +10,6 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import java.net.URI; - import static io.deephaven.base.FileUtils.FILE_URI_SCHEME; /** @@ -21,14 +19,14 @@ public final class TrackedSeekableChannelsProviderPlugin implements SeekableChannelsProviderPlugin { @Override - public boolean isCompatible(@NotNull final URI uri, @Nullable final Object object) { - return FILE_URI_SCHEME.equals(uri.getScheme()); + public boolean isCompatible(@NotNull final String uriScheme, @Nullable final Object object) { + return FILE_URI_SCHEME.equals(uriScheme); } @Override - public SeekableChannelsProvider createProvider(@NotNull final URI uri, @Nullable final Object object) { - if (!isCompatible(uri, object)) { - throw new IllegalArgumentException("Arguments not compatible, provided uri " + uri); + public SeekableChannelsProvider createProvider(@NotNull final String uriScheme, @Nullable final Object object) { + if (!isCompatible(uriScheme, object)) { + throw new IllegalArgumentException("Arguments not compatible, provided uri scheme " + uriScheme); } if (object != null) { throw new IllegalArgumentException("Arguments not compatible, provided non null object");