diff --git a/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProviderLoader.java b/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProviderLoader.java index 98ab4d8c583..ee2508ce2ca 100644 --- a/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProviderLoader.java +++ b/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProviderLoader.java @@ -6,7 +6,6 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import java.net.URI; import java.util.ArrayList; import java.util.List; import java.util.ServiceLoader; @@ -19,11 +18,22 @@ public final class SeekableChannelsProviderLoader { private static volatile SeekableChannelsProviderLoader instance; + /** + * Get a static a {@link SeekableChannelsProviderLoader} instance that is loading with + * {@link SeekableChannelsProviderPlugin} provided via {@link ServiceLoader#load(Class)}. + * + * @return The {@link SeekableChannelsProviderLoader} instance. + */ public static SeekableChannelsProviderLoader getInstance() { - if (instance == null) { - instance = new SeekableChannelsProviderLoader(); + SeekableChannelsProviderLoader localInstance; + if ((localInstance = instance) == null) { + synchronized (SeekableChannelsProviderLoader.class) { + if ((localInstance = instance) == null) { + instance = localInstance = new SeekableChannelsProviderLoader(); + } + } } - return instance; + return localInstance; } private final List providers; @@ -37,21 +47,19 @@ private SeekableChannelsProviderLoader() { } /** - * Create a new {@link SeekableChannelsProvider} compatible for reading from and writing to the given URI, using the - * plugins loaded by the {@link ServiceLoader}. For example, for a "S3" URI, we will create a - * {@link SeekableChannelsProvider} which can read files from S3. + * Create a new {@link SeekableChannelsProvider} compatible for reading from and writing to the given URI scheme. + * For example, for a "S3" URI, we will create a {@link SeekableChannelsProvider} which can read files from S3. * - * @param uri The URI + * @param uriScheme The URI scheme * @param specialInstructions An optional object to pass special instructions to the provider. - * @return A {@link SeekableChannelsProvider} for the given URI. + * @return A {@link SeekableChannelsProvider} for the given URI scheme. */ - public SeekableChannelsProvider fromServiceLoader(@NotNull final URI uri, - @Nullable final Object specialInstructions) { + public SeekableChannelsProvider load(@NotNull final String uriScheme, @Nullable final Object specialInstructions) { for (final SeekableChannelsProviderPlugin plugin : providers) { - if (plugin.isCompatible(uri, specialInstructions)) { - return plugin.createProvider(uri, specialInstructions); + if (plugin.isCompatible(uriScheme, specialInstructions)) { + return plugin.createProvider(uriScheme, specialInstructions); } } - throw new UnsupportedOperationException("No plugin found for uri: " + uri); + throw new UnsupportedOperationException("No plugin found for uri scheme: " + uriScheme); } } diff --git a/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProviderPlugin.java b/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProviderPlugin.java index 4ae6ae6d76b..e0f21741cb8 100644 --- a/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProviderPlugin.java +++ b/Util/channel/src/main/java/io/deephaven/util/channel/SeekableChannelsProviderPlugin.java @@ -6,20 +6,18 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import java.net.URI; - /** * A plugin interface for providing {@link SeekableChannelsProvider} implementations for different URI schemes, e.g. S3. * Check out {@link SeekableChannelsProviderLoader} for more details. */ public interface SeekableChannelsProviderPlugin { /** - * Check if this plugin is compatible with the given URI and config object. + * Check if this plugin is compatible with the given URI scheme and config object. */ - boolean isCompatible(@NotNull URI uri, @Nullable Object config); + boolean isCompatible(@NotNull String uriScheme, @Nullable Object config); /** - * Create a {@link SeekableChannelsProvider} for the given URI and config object. + * Create a {@link SeekableChannelsProvider} for the given URI scheme and config object. */ - SeekableChannelsProvider createProvider(@NotNull URI uri, @Nullable Object object); + SeekableChannelsProvider createProvider(@NotNull String uriScheme, @Nullable Object object); } diff --git a/extensions/iceberg/s3/src/main/java/io/deephaven/iceberg/util/S3InstructionsProviderPlugin.java b/extensions/iceberg/s3/src/main/java/io/deephaven/iceberg/util/S3InstructionsProviderPlugin.java index bd72daff66c..6e8b1d1bf1f 100644 --- a/extensions/iceberg/s3/src/main/java/io/deephaven/iceberg/util/S3InstructionsProviderPlugin.java +++ b/extensions/iceberg/s3/src/main/java/io/deephaven/iceberg/util/S3InstructionsProviderPlugin.java @@ -12,7 +12,6 @@ import org.apache.iceberg.aws.s3.S3FileIOProperties; import org.jetbrains.annotations.NotNull; -import java.net.URI; import java.util.Map; /** @@ -25,7 +24,8 @@ @SuppressWarnings("unused") public final class S3InstructionsProviderPlugin implements DataInstructionsProviderPlugin { @Override - public S3Instructions createInstructions(@NotNull final URI uri, @NotNull final Map properties) { + public S3Instructions createInstructions(@NotNull final String uriScheme, + @NotNull final Map properties) { final S3Instructions s3Instructions = DeephavenAwsClientFactory.getInstructions(properties).orElse(null); if (s3Instructions != null) { return s3Instructions; @@ -33,9 +33,9 @@ public S3Instructions createInstructions(@NotNull final URI uri, @NotNull final // If the URI scheme is "s3","s3a","s3n" or if the properties contain one of these specific keys, we can // create a useful S3Instructions object. - if (uri.getScheme().equals("s3") - || uri.getScheme().equals("s3a") - || uri.getScheme().equals("s3n") + if (uriScheme.equals("s3") + || uriScheme.equals("s3a") + || uriScheme.equals("s3n") || properties.containsKey(AwsClientProperties.CLIENT_REGION) || properties.containsKey(S3FileIOProperties.ACCESS_KEY_ID) || properties.containsKey(S3FileIOProperties.SECRET_ACCESS_KEY) diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/internal/DataInstructionsProviderLoader.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/internal/DataInstructionsProviderLoader.java index 4ae28e0e044..b2c7cefcbf3 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/internal/DataInstructionsProviderLoader.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/internal/DataInstructionsProviderLoader.java @@ -5,12 +5,15 @@ import org.jetbrains.annotations.NotNull; -import java.net.URI; -import java.util.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.ServiceLoader; /** * A service loader class for loading {@link DataInstructionsProviderPlugin} implementations at runtime which provide - * {@link DataInstructionsProviderLoader} implementations for different URI paths. + * {@link DataInstructionsProviderLoader} implementations for different URI schemes. */ public final class DataInstructionsProviderLoader { /** @@ -21,30 +24,33 @@ public final class DataInstructionsProviderLoader { /** * Ensure that the {@link DataInstructionsProviderPlugin plugins} are loaded exactly once. */ - private static void ensureProviders() { - if (cachedProviders == null) { + private static List ensureProviders() { + List localProviders; + if ((localProviders = cachedProviders) == null) { synchronized (DataInstructionsProviderLoader.class) { - if (cachedProviders == null) { - cachedProviders = new ArrayList<>(); + if ((localProviders = cachedProviders) == null) { + localProviders = new ArrayList<>(); // Load the plugins for (final DataInstructionsProviderPlugin plugin : ServiceLoader .load(DataInstructionsProviderPlugin.class)) { - cachedProviders.add(plugin); + localProviders.add(plugin); } + cachedProviders = localProviders; } } } + return localProviders; } /** - * Get a {@link DataInstructionsProviderLoader} instance for the given property collection. + * Create a {@link DataInstructionsProviderLoader} instance for the given property collection with a static list of + * {@link DataInstructionsProviderPlugin} provided via {@link ServiceLoader#load(Class)}. * * @param properties The property collection. * @return A {@link DataInstructionsProviderLoader} instance. */ public static DataInstructionsProviderLoader create(final Map properties) { - ensureProviders(); - return new DataInstructionsProviderLoader(properties); + return new DataInstructionsProviderLoader(properties, ensureProviders()); } /** @@ -62,27 +68,28 @@ public static DataInstructionsProviderLoader create(final Map pr * * @param properties The property collection. */ - private DataInstructionsProviderLoader(final Map properties) { - this.properties = properties; - providers = cachedProviders; + private DataInstructionsProviderLoader( + final Map properties, + final List providers) { + this.properties = Objects.requireNonNull(properties); + this.providers = Objects.requireNonNull(providers); } /** - * Create a new data instructions object compatible with reading from and writing to the given URI, using the - * plugins loaded by the {@link ServiceLoader}. For example, for a "S3" URI, we will create an - * {@code S3Instructions} object which can read files from S3. + * Create a new data instructions object compatible with reading from and writing to the given URI scheme. For + * example, for an "S3" URI scheme will create an {@code S3Instructions} object which can read files from S3. * - * @param uri The URI - * @return A data instructions object for the given URI or null if one cannot be found + * @param uriScheme The URI scheme + * @return A data instructions object for the given URI scheme or null if one cannot be found */ - public Object fromServiceLoader(@NotNull final URI uri) { + public Object load(@NotNull final String uriScheme) { for (final DataInstructionsProviderPlugin plugin : providers) { - final Object pluginInstructions = plugin.createInstructions(uri, properties); + final Object pluginInstructions = plugin.createInstructions(uriScheme, properties); if (pluginInstructions != null) { return pluginInstructions; } } - // No plugin found for this URI and property collection. + // No plugin found for this URI scheme and property collection. return null; } } diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/internal/DataInstructionsProviderPlugin.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/internal/DataInstructionsProviderPlugin.java index b0e48cc9166..632312d6605 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/internal/DataInstructionsProviderPlugin.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/internal/DataInstructionsProviderPlugin.java @@ -5,16 +5,15 @@ import org.jetbrains.annotations.NotNull; -import java.net.URI; import java.util.Map; /** * A plugin interface for providing {@link DataInstructionsProviderPlugin} implementations for different property - * collections and URI values. Check out {@link DataInstructionsProviderLoader} for more details. + * collections and URI schemes. Check out {@link DataInstructionsProviderLoader} for more details. */ public interface DataInstructionsProviderPlugin { /** - * Create a data instructions object for the given URI. + * Create a data instructions object for the given URI scheme. */ - Object createInstructions(@NotNull URI uri, @NotNull final Map properties); + Object createInstructions(@NotNull String uriScheme, @NotNull Map properties); } diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java index 970ad673581..647e7da12a0 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java @@ -14,6 +14,8 @@ import io.deephaven.iceberg.util.IcebergTableAdapter; import io.deephaven.parquet.table.ParquetInstructions; import io.deephaven.iceberg.internal.DataInstructionsProviderLoader; +import io.deephaven.util.channel.SeekableChannelsProvider; +import io.deephaven.util.channel.SeekableChannelsProviderLoader; import org.apache.iceberg.*; import org.apache.iceberg.io.FileIO; import org.jetbrains.annotations.NotNull; @@ -38,65 +40,37 @@ public abstract class IcebergBaseLayout implements TableLocationKeyFinder cache; + private final Map cache; /** - * The {@link Snapshot} from which to discover data files. + * The {@link ParquetInstructions} object that will be used to read any Parquet data files in this table. */ - Snapshot snapshot; + private final ParquetInstructions parquetInstructions; /** - * The data instructions provider for creating instructions from URI and user-supplied properties. + * The {@link SeekableChannelsProvider} object that will be used for {@link IcebergTableParquetLocationKey} + * creation. */ - final DataInstructionsProviderLoader dataInstructionsProvider; + private final SeekableChannelsProvider channelsProvider; /** - * The {@link ParquetInstructions} object that will be used to read any Parquet data files in this table. Only - * accessed while synchronized on {@code this}. + * The {@link Snapshot} from which to discover data files. */ - ParquetInstructions parquetInstructions; + Snapshot snapshot; protected IcebergTableLocationKey locationKey( final org.apache.iceberg.FileFormat format, final URI fileUri, @Nullable final Map> partitions) { - if (format == org.apache.iceberg.FileFormat.PARQUET) { - if (parquetInstructions == null) { - // Start with user-supplied instructions (if provided). - final ParquetInstructions.Builder builder = new ParquetInstructions.Builder(); - - // Add the table definition. - builder.setTableDefinition(tableDef); - - // Add any column rename mappings. - if (!instructions.columnRenames().isEmpty()) { - for (Map.Entry entry : instructions.columnRenames().entrySet()) { - builder.addColumnNameMapping(entry.getKey(), entry.getValue()); - } - } - - // Add the data instructions if provided as part of the IcebergReadInstructions. - if (instructions.dataInstructions().isPresent()) { - builder.setSpecialInstructions(instructions.dataInstructions().get()); - } else { - // Attempt to create data instructions from the properties collection and URI. - final Object dataInstructions = dataInstructionsProvider.fromServiceLoader(fileUri); - if (dataInstructions != null) { - builder.setSpecialInstructions(dataInstructions); - } - } - - parquetInstructions = builder.build(); - } - return new IcebergTableParquetLocationKey(fileUri, 0, partitions, parquetInstructions); + return new IcebergTableParquetLocationKey(fileUri, 0, partitions, parquetInstructions, channelsProvider); } throw new UnsupportedOperationException(String.format("%s:%d - an unsupported file format %s for URI '%s'", tableAdapter, snapshot.snapshotId(), format, fileUri)); @@ -112,23 +86,46 @@ public IcebergBaseLayout( @NotNull final DataInstructionsProviderLoader dataInstructionsProvider) { this.tableAdapter = tableAdapter; this.snapshot = tableAdapter.getSnapshot(instructions); - this.instructions = instructions; - this.dataInstructionsProvider = dataInstructionsProvider; this.tableDef = tableAdapter.definition(instructions); - + this.uriScheme = locationUri(tableAdapter.icebergTable()).getScheme(); + // Add the data instructions if provided as part of the IcebergReadInstructions, or else attempt to create + // data instructions from the properties collection and URI scheme. + final Object specialInstructions = instructions.dataInstructions() + .orElseGet(() -> dataInstructionsProvider.load(uriScheme)); + { + // Start with user-supplied instructions (if provided). + final ParquetInstructions.Builder builder = new ParquetInstructions.Builder(); + + // Add the table definition. + builder.setTableDefinition(tableDef); + + // Add any column rename mappings. + if (!instructions.columnRenames().isEmpty()) { + for (Map.Entry entry : instructions.columnRenames().entrySet()) { + builder.addColumnNameMapping(entry.getKey(), entry.getValue()); + } + } + if (specialInstructions != null) { + builder.setSpecialInstructions(specialInstructions); + } + this.parquetInstructions = builder.build(); + } + this.channelsProvider = SeekableChannelsProviderLoader.getInstance().load(uriScheme, specialInstructions); this.cache = new HashMap<>(); } abstract IcebergTableLocationKey keyFromDataFile(DataFile df, URI fileUri); - @NotNull - private URI dataFileUri(@NotNull DataFile df) { - String path = df.path().toString(); - final FileIO fileIO = tableAdapter.icebergTable().io(); - if (fileIO instanceof RelativeFileIO) { - path = ((RelativeFileIO) fileIO).absoluteLocation(path); - } - return FileUtils.convertToURI(path, false); + private static String path(String path, FileIO io) { + return io instanceof RelativeFileIO ? ((RelativeFileIO) io).absoluteLocation(path) : path; + } + + private static URI locationUri(Table table) { + return FileUtils.convertToURI(path(table.location(), table.io()), true); + } + + private static URI dataFileUri(Table table, DataFile dataFile) { + return FileUtils.convertToURI(path(dataFile.path().toString(), table.io()), false); } @Override @@ -149,7 +146,12 @@ public synchronized void findKeys(@NotNull final Consumer reader = ManifestFiles.read(manifestFile, table.io())) { for (DataFile df : reader) { - final URI fileUri = dataFileUri(df); + final URI fileUri = dataFileUri(table, df); + if (!uriScheme.equals(fileUri.getScheme())) { + throw new TableDataException(String.format( + "%s:%d - multiple URI schemes are not currently supported. uriScheme=%s, fileUri=%s", + table, snapshot.snapshotId(), uriScheme, fileUri)); + } final IcebergTableLocationKey locationKey = cache.computeIfAbsent(fileUri, uri -> keyFromDataFile(df, fileUri)); if (locationKey != null) { diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/location/IcebergTableParquetLocationKey.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/location/IcebergTableParquetLocationKey.java index e356d0ecb92..52ae74b0e8f 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/location/IcebergTableParquetLocationKey.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/location/IcebergTableParquetLocationKey.java @@ -6,6 +6,7 @@ import io.deephaven.engine.table.impl.locations.TableLocationKey; import io.deephaven.parquet.table.ParquetInstructions; import io.deephaven.parquet.table.location.ParquetTableLocationKey; +import io.deephaven.util.channel.SeekableChannelsProvider; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -29,13 +30,15 @@ public class IcebergTableParquetLocationKey extends ParquetTableLocationKey impl * parameter is {@code null}, the location will be a member of no partitions. An ordered copy of the map will * be made, so the calling code is free to mutate the map after this call * @param readInstructions the instructions for customizations while reading + * @param channelsProvider the provider for reading the file */ public IcebergTableParquetLocationKey( @NotNull final URI fileUri, final int order, @Nullable final Map> partitions, - @NotNull final ParquetInstructions readInstructions) { - super(fileUri, order, partitions, readInstructions); + @NotNull final ParquetInstructions readInstructions, + @NotNull final SeekableChannelsProvider channelsProvider) { + super(fileUri, order, partitions, channelsProvider); this.readInstructions = readInstructions; } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java index 0a4821629f7..a03294a28ed 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java @@ -575,7 +575,7 @@ private static void writeTablesImpl( } // Assuming all destination URIs have the same scheme, and will use the same channels provider instance final SeekableChannelsProvider channelsProvider = SeekableChannelsProviderLoader.getInstance() - .fromServiceLoader(destinations[0], writeInstructions.getSpecialInstructions()); + .load(destinations[0].getScheme(), writeInstructions.getSpecialInstructions()); final ParquetMetadataFileWriter metadataFileWriter; if (writeInstructions.generateMetadataFiles()) { @@ -958,7 +958,7 @@ private static Table readPartitionedTableDirectory( // Check if the directory has a metadata file final URI metadataFileURI = tableRootDirectory.resolve(METADATA_FILE_NAME); final SeekableChannelsProvider channelsProvider = - SeekableChannelsProviderLoader.getInstance().fromServiceLoader(tableRootDirectory, + SeekableChannelsProviderLoader.getInstance().load(tableRootDirectory.getScheme(), readInstructions.getSpecialInstructions()); if (channelsProvider.exists(metadataFileURI)) { return readPartitionedTableWithMetadata(metadataFileURI, readInstructions, channelsProvider); diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/DeephavenNestedPartitionLayout.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/DeephavenNestedPartitionLayout.java index 830e3e719cf..31f367763bf 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/DeephavenNestedPartitionLayout.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/DeephavenNestedPartitionLayout.java @@ -3,6 +3,7 @@ // package io.deephaven.parquet.table.layout; +import io.deephaven.base.FileUtils; import io.deephaven.engine.table.impl.locations.TableDataException; import io.deephaven.engine.table.impl.locations.impl.TableLocationKeyFinder; import io.deephaven.engine.table.impl.locations.local.URITableLocationKey; @@ -53,16 +54,16 @@ public static DeephavenNestedPartitionLayout forParquet @NotNull final String columnPartitionKey, @Nullable final Predicate internalPartitionValueFilter, @NotNull final ParquetInstructions readInstructions) { - final SeekableChannelsProvider channelsProvider = - SeekableChannelsProviderLoader.getInstance().fromServiceLoader(convertToURI(tableRootDirectory, true), - readInstructions.getSpecialInstructions()); + // noinspection resource + final SeekableChannelsProvider channelsProvider = SeekableChannelsProviderLoader.getInstance() + .load(FileUtils.FILE_URI_SCHEME, readInstructions.getSpecialInstructions()); return new DeephavenNestedPartitionLayout<>(tableRootDirectory, tableName, columnPartitionKey, internalPartitionValueFilter) { @Override protected ParquetTableLocationKey makeKey(@NotNull Path tableLeafDirectory, @NotNull Map> partitions) { final URI fileURI = convertToURI(tableLeafDirectory.resolve(PARQUET_FILE_NAME), false); - return new ParquetTableLocationKey(fileURI, 0, partitions, readInstructions, channelsProvider); + return new ParquetTableLocationKey(fileURI, 0, partitions, channelsProvider); } }; } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetFlatPartitionedLayout.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetFlatPartitionedLayout.java index 71f0068383b..cd0ec0add51 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetFlatPartitionedLayout.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetFlatPartitionedLayout.java @@ -31,7 +31,6 @@ public final class ParquetFlatPartitionedLayout implements TableLocationKeyFinde private final URI tableRootDirectory; private final Map cache; - private final ParquetInstructions readInstructions; private final SeekableChannelsProvider channelsProvider; /** @@ -42,9 +41,8 @@ public ParquetFlatPartitionedLayout(@NotNull final URI tableRootDirectoryURI, @NotNull final ParquetInstructions readInstructions) { this.tableRootDirectory = tableRootDirectoryURI; this.cache = Collections.synchronizedMap(new HashMap<>()); - this.readInstructions = readInstructions; - this.channelsProvider = SeekableChannelsProviderLoader.getInstance().fromServiceLoader(tableRootDirectory, - readInstructions.getSpecialInstructions()); + this.channelsProvider = SeekableChannelsProviderLoader.getInstance() + .load(tableRootDirectory.getScheme(), readInstructions.getSpecialInstructions()); } public String toString() { @@ -80,6 +78,6 @@ public void findKeys(@NotNull final Consumer locationKe } private ParquetTableLocationKey locationKey(@NotNull final URI uri) { - return new ParquetTableLocationKey(uri, 0, null, readInstructions, channelsProvider); + return new ParquetTableLocationKey(uri, 0, null, channelsProvider); } } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetKeyValuePartitionedLayout.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetKeyValuePartitionedLayout.java index c2b10421600..310fc015161 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetKeyValuePartitionedLayout.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetKeyValuePartitionedLayout.java @@ -65,22 +65,20 @@ public static ParquetKeyValuePartitionedLayout create( @NotNull final ParquetInstructions readInstructions, @Nullable SeekableChannelsProvider channelsProvider) { if (channelsProvider == null) { - channelsProvider = SeekableChannelsProviderLoader.getInstance().fromServiceLoader(tableRootDirectory, - readInstructions.getSpecialInstructions()); + // noinspection resource + channelsProvider = SeekableChannelsProviderLoader.getInstance() + .load(tableRootDirectory.getScheme(), readInstructions.getSpecialInstructions()); } - return new ParquetKeyValuePartitionedLayout(tableRootDirectory, tableDefinition, readInstructions, - channelsProvider); + return new ParquetKeyValuePartitionedLayout(tableRootDirectory, tableDefinition, channelsProvider); } private ParquetKeyValuePartitionedLayout( @NotNull final URI tableRootDirectory, @NotNull final TableDefinition tableDefinition, - @NotNull final ParquetInstructions readInstructions, @NotNull final SeekableChannelsProvider channelsProvider) { super(tableRootDirectory, () -> new LocationTableBuilderDefinition(tableDefinition), - (uri, partitions) -> new ParquetTableLocationKey(uri, 0, partitions, readInstructions, - channelsProvider), + (uri, partitions) -> new ParquetTableLocationKey(uri, 0, partitions, channelsProvider), Math.toIntExact(tableDefinition.getColumnStream().filter(ColumnDefinition::isPartitioning).count())); this.channelsProvider = channelsProvider; } @@ -102,22 +100,20 @@ public static ParquetKeyValuePartitionedLayout create( @NotNull final ParquetInstructions readInstructions, @Nullable SeekableChannelsProvider channelsProvider) { if (channelsProvider == null) { - channelsProvider = SeekableChannelsProviderLoader.getInstance().fromServiceLoader(tableRootDirectory, - readInstructions.getSpecialInstructions()); + // noinspection resource + channelsProvider = SeekableChannelsProviderLoader.getInstance() + .load(tableRootDirectory.getScheme(), readInstructions.getSpecialInstructions()); } - return new ParquetKeyValuePartitionedLayout(tableRootDirectory, maxPartitioningLevels, readInstructions, - channelsProvider); + return new ParquetKeyValuePartitionedLayout(tableRootDirectory, maxPartitioningLevels, channelsProvider); } private ParquetKeyValuePartitionedLayout( @NotNull final URI tableRootDirectory, final int maxPartitioningLevels, - @NotNull final ParquetInstructions readInstructions, @NotNull final SeekableChannelsProvider channelsProvider) { super(tableRootDirectory, () -> new LocationTableBuilderCsv(tableRootDirectory), - (uri, partitions) -> new ParquetTableLocationKey(uri, 0, partitions, readInstructions, - channelsProvider), + (uri, partitions) -> new ParquetTableLocationKey(uri, 0, partitions, channelsProvider), maxPartitioningLevels); this.channelsProvider = channelsProvider; } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetMetadataFileLayout.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetMetadataFileLayout.java index 84546de519e..09700f17280 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetMetadataFileLayout.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetMetadataFileLayout.java @@ -96,8 +96,9 @@ public static ParquetMetadataFileLayout create( final URI metadataFileURI = isMetadataFile ? source : directory.resolve(METADATA_FILE_NAME); final URI commonMetadataFileURI = isCommonMetadataFile ? source : directory.resolve(COMMON_METADATA_FILE_NAME); if (channelsProvider == null) { - channelsProvider = SeekableChannelsProviderLoader.getInstance().fromServiceLoader(source, - inputInstructions.getSpecialInstructions()); + // noinspection resource + channelsProvider = SeekableChannelsProviderLoader.getInstance() + .load(source.getScheme(), inputInstructions.getSpecialInstructions()); } return new ParquetMetadataFileLayout(directory, metadataFileURI, commonMetadataFileURI, inputInstructions, channelsProvider); @@ -232,7 +233,7 @@ private ParquetMetadataFileLayout( } final URI partitionFileURI = resolve(tableRootDirectory, relativePathString); final ParquetTableLocationKey tlk = new ParquetTableLocationKey(partitionFileURI, - partitionOrder.getAndIncrement(), partitions, inputInstructions, channelsProvider); + partitionOrder.getAndIncrement(), partitions, channelsProvider); tlk.setFileReader(metadataFileReader); tlk.setMetadata(getParquetMetadataForFile(relativePathString, metadataFileMetadata)); tlk.setRowGroupIndices(rowGroupIndices); diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocationKey.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocationKey.java index 1681c0fc339..685d973cf35 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocationKey.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocationKey.java @@ -21,6 +21,7 @@ import java.net.URI; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.stream.IntStream; import static io.deephaven.parquet.base.ParquetUtils.PARQUET_FILE_EXTENSION; @@ -54,9 +55,8 @@ public class ParquetTableLocationKey extends URITableLocationKey { public ParquetTableLocationKey(@NotNull final URI parquetFileUri, final int order, @Nullable final Map> partitions, @NotNull final ParquetInstructions readInstructions) { - this(parquetFileUri, order, partitions, readInstructions, - SeekableChannelsProviderLoader.getInstance().fromServiceLoader(parquetFileUri, - readInstructions.getSpecialInstructions())); + this(parquetFileUri, order, partitions, SeekableChannelsProviderLoader.getInstance() + .load(parquetFileUri.getScheme(), readInstructions.getSpecialInstructions())); } /** @@ -67,15 +67,36 @@ public ParquetTableLocationKey(@NotNull final URI parquetFileUri, final int orde * @param partitions The table partitions enclosing the table location keyed by {@code this}. Note that if this * parameter is {@code null}, the location will be a member of no partitions. An ordered copy of the map will * be made, so the calling code is free to mutate the map after this call - * @param readInstructions the instructions for customizations while reading + * @param readInstructions the instructions for customizations while reading, unused * @param channelsProvider the provider for reading the file + * @deprecated the {@code readInstructions} parameter is unused, please + * use{@link #ParquetTableLocationKey(URI, int, Map, SeekableChannelsProvider)} */ + @Deprecated(forRemoval = true) public ParquetTableLocationKey(@NotNull final URI parquetFileUri, final int order, @Nullable final Map> partitions, @NotNull final ParquetInstructions readInstructions, @NotNull final SeekableChannelsProvider channelsProvider) { + this(parquetFileUri, order, partitions, channelsProvider); + } + + /** + * Construct a new ParquetTableLocationKey for the supplied {@code parquetFileUri} and {@code partitions}. + * + * @param parquetFileUri The parquet file that backs the keyed location. Will be adjusted to an absolute path. + * @param order Explicit ordering index, taking precedence over other fields + * @param partitions The table partitions enclosing the table location keyed by {@code this}. Note that if this + * parameter is {@code null}, the location will be a member of no partitions. An ordered copy of the map will + * be made, so the calling code is free to mutate the map after this call + * @param channelsProvider the provider for reading the file + */ + public ParquetTableLocationKey( + @NotNull final URI parquetFileUri, + final int order, + @Nullable final Map> partitions, + @NotNull final SeekableChannelsProvider channelsProvider) { super(validateParquetFile(parquetFileUri), order, partitions); - this.channelsProvider = channelsProvider; + this.channelsProvider = Objects.requireNonNull(channelsProvider); } private static URI validateParquetFile(@NotNull final URI parquetFileUri) { diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/GCSSeekableChannelProviderPlugin.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/GCSSeekableChannelProviderPlugin.java index 6c3ca91ac8e..bdd954c6e08 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/GCSSeekableChannelProviderPlugin.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/GCSSeekableChannelProviderPlugin.java @@ -25,14 +25,14 @@ public final class GCSSeekableChannelProviderPlugin implements SeekableChannelsP S3Instructions.builder().endpointOverride(DEFAULT_ENDPOINT_OVERRIDE).build(); @Override - public boolean isCompatible(@NotNull final URI uri, @Nullable final Object config) { - return GCS_URI_SCHEME.equals(uri.getScheme()); + public boolean isCompatible(@NotNull final String uriScheme, @Nullable final Object config) { + return GCS_URI_SCHEME.equals(uriScheme); } @Override - public SeekableChannelsProvider createProvider(@NotNull final URI uri, @Nullable final Object config) { - if (!isCompatible(uri, config)) { - throw new IllegalArgumentException("Arguments not compatible, provided uri " + uri); + public SeekableChannelsProvider createProvider(@NotNull final String uriScheme, @Nullable final Object config) { + if (!isCompatible(uriScheme, config)) { + throw new IllegalArgumentException("Arguments not compatible, provided uri scheme " + uriScheme); } return new GCSSeekableChannelProvider(s3Instructions(config)); } diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProviderPlugin.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProviderPlugin.java index 619e31b3824..c7c7eac5ad7 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProviderPlugin.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProviderPlugin.java @@ -20,14 +20,14 @@ public final class S3SeekableChannelProviderPlugin implements SeekableChannelsPr static final String S3_URI_SCHEME = "s3"; @Override - public boolean isCompatible(@NotNull final URI uri, @Nullable final Object config) { - return S3_URI_SCHEME.equals(uri.getScheme()); + public boolean isCompatible(@NotNull final String uriScheme, @Nullable final Object config) { + return S3_URI_SCHEME.equals(uriScheme); } @Override - public SeekableChannelsProvider createProvider(@NotNull final URI uri, @Nullable final Object config) { - if (!isCompatible(uri, config)) { - throw new IllegalArgumentException("Arguments not compatible, provided uri " + uri); + public SeekableChannelsProvider createProvider(@NotNull final String uriScheme, @Nullable final Object config) { + if (!isCompatible(uriScheme, config)) { + throw new IllegalArgumentException("Arguments not compatible, provided uri scheme " + uriScheme); } if (config != null && !(config instanceof S3Instructions)) { throw new IllegalArgumentException("Only S3Instructions are valid when reading files from S3, provided " + diff --git a/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelSimpleTestBase.java b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelSimpleTestBase.java index a0cf78b0f3a..aebd6ec5fdf 100644 --- a/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelSimpleTestBase.java +++ b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelSimpleTestBase.java @@ -46,7 +46,7 @@ void readSimpleFiles() final URI uri = uri("empty.txt"); final ByteBuffer buffer = ByteBuffer.allocate(1); try ( - final SeekableChannelsProvider providerImpl = providerImpl(uri); + final SeekableChannelsProvider providerImpl = providerImpl(); final SeekableChannelsProvider provider = CachedChannelProvider.create(providerImpl, 32); final SeekableChannelContext context = provider.makeContext(); final SeekableByteChannel readChannel = provider.getReadChannel(context, uri)) { @@ -56,7 +56,7 @@ void readSimpleFiles() { final URI uri = uri("hello/world.txt"); try ( - final SeekableChannelsProvider providerImpl = providerImpl(uri); + final SeekableChannelsProvider providerImpl = providerImpl(); final SeekableChannelsProvider provider = CachedChannelProvider.create(providerImpl, 32); final SeekableChannelContext context = provider.makeContext(); final SeekableByteChannel readChannel = provider.getReadChannel(context, uri)) { @@ -78,7 +78,7 @@ public int read() { final URI uri = uri("32MiB.bin"); final ByteBuffer buffer = ByteBuffer.allocate(1); try ( - final SeekableChannelsProvider providerImpl = providerImpl(uri); + final SeekableChannelsProvider providerImpl = providerImpl(); final SeekableChannelsProvider provider = CachedChannelProvider.create(providerImpl, 32); final SeekableChannelContext context = provider.makeContext(); final SeekableByteChannel readChannel = provider.getReadChannel(context, uri)) { @@ -97,7 +97,7 @@ void readWriteTest() throws IOException { final String content = "Hello, world!"; final byte[] contentBytes = content.getBytes(StandardCharsets.UTF_8); try ( - final SeekableChannelsProvider providerImpl = providerImpl(uri); + final SeekableChannelsProvider providerImpl = providerImpl(); final SeekableChannelsProvider provider = CachedChannelProvider.create(providerImpl, 32); final CompletableOutputStream outputStream = provider.getOutputStream(uri, 0)) { final int numBytes = 36 * 1024 * 1024; // 36 Mib -> Three 10-MiB parts + One 6-MiB part diff --git a/extensions/s3/src/test/java/io/deephaven/extensions/s3/testlib/S3SeekableChannelTestSetup.java b/extensions/s3/src/test/java/io/deephaven/extensions/s3/testlib/S3SeekableChannelTestSetup.java index 6cc9fe05fa8..9f4826476df 100644 --- a/extensions/s3/src/test/java/io/deephaven/extensions/s3/testlib/S3SeekableChannelTestSetup.java +++ b/extensions/s3/src/test/java/io/deephaven/extensions/s3/testlib/S3SeekableChannelTestSetup.java @@ -29,6 +29,8 @@ public abstract class S3SeekableChannelTestSetup { + protected static final String SCHEME = "s3"; + protected ExecutorService executor; protected S3AsyncClient asyncClient; protected String bucket; @@ -59,7 +61,7 @@ protected void uploadDirectory(final Path directory, final String prefix) } protected final URI uri(String key) { - return URI.create(String.format("s3://%s/%s", bucket, key)); + return URI.create(String.format("%s://%s/%s", SCHEME, bucket, key)); } protected final void putObject(String key, AsyncRequestBody body) @@ -68,10 +70,10 @@ protected final void putObject(String key, AsyncRequestBody body) TimeUnit.SECONDS); } - protected final SeekableChannelsProvider providerImpl(URI uri) { + protected final SeekableChannelsProvider providerImpl() { final S3SeekableChannelProviderPlugin plugin = new S3SeekableChannelProviderPlugin(); final S3Instructions instructions = s3Instructions(S3Instructions.builder()).build(); - return plugin.createProvider(uri, instructions); + return plugin.createProvider(SCHEME, instructions); } protected static ByteBuffer readAll(ReadableByteChannel channel, int maxBytes) throws IOException { diff --git a/extensions/trackedfile/src/main/java/io/deephaven/extensions/trackedfile/TrackedSeekableChannelsProviderPlugin.java b/extensions/trackedfile/src/main/java/io/deephaven/extensions/trackedfile/TrackedSeekableChannelsProviderPlugin.java index 32ec7a3564a..1fba21ba4c6 100644 --- a/extensions/trackedfile/src/main/java/io/deephaven/extensions/trackedfile/TrackedSeekableChannelsProviderPlugin.java +++ b/extensions/trackedfile/src/main/java/io/deephaven/extensions/trackedfile/TrackedSeekableChannelsProviderPlugin.java @@ -10,8 +10,6 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import java.net.URI; - import static io.deephaven.base.FileUtils.FILE_URI_SCHEME; /** @@ -21,14 +19,14 @@ public final class TrackedSeekableChannelsProviderPlugin implements SeekableChannelsProviderPlugin { @Override - public boolean isCompatible(@NotNull final URI uri, @Nullable final Object object) { - return FILE_URI_SCHEME.equals(uri.getScheme()); + public boolean isCompatible(@NotNull final String uriScheme, @Nullable final Object object) { + return FILE_URI_SCHEME.equals(uriScheme); } @Override - public SeekableChannelsProvider createProvider(@NotNull final URI uri, @Nullable final Object object) { - if (!isCompatible(uri, object)) { - throw new IllegalArgumentException("Arguments not compatible, provided uri " + uri); + public SeekableChannelsProvider createProvider(@NotNull final String uriScheme, @Nullable final Object object) { + if (!isCompatible(uriScheme, object)) { + throw new IllegalArgumentException("Arguments not compatible, provided uri scheme " + uriScheme); } if (object != null) { throw new IllegalArgumentException("Arguments not compatible, provided non null object");