From f659f3cbd9b07b6b5f9cb8f52dfaf1f5da63d317 Mon Sep 17 00:00:00 2001
From: Shivam Malhotra
Date: Sun, 3 Mar 2024 18:34:33 -0600
Subject: [PATCH] Review with Ryan part 1

---
 .../runner/TableBenchmarkState.java           |   5 +-
 .../table/impl/QueryTableAggregationTest.java |   4 +-
 .../parquet/table/ParquetTableWriter.java     |   8 +-
 .../deephaven/parquet/table/ParquetTools.java | 177 +++++++++++++-----
 .../deephaven/parquet/table/ParquetUtils.java |  18 ++
 .../parquet/table/PartitionFormatter.java     | 134 +++++++++++++
 .../table/layout/ParquetFileHelper.java       |  15 --
 .../layout/ParquetFlatPartitionedLayout.java  |   3 +-
 .../ParquetKeyValuePartitionedLayout.java     |   5 +-
 .../layout/ParquetMetadataFileLayout.java     |   5 +-
 .../location/ParquetTableLocationKey.java     |   7 +-
 .../table/ParquetTableReadWriteTest.java      |  78 +++++++-
 12 files changed, 369 insertions(+), 90 deletions(-)
 create mode 100644 extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetUtils.java
 create mode 100644 extensions/parquet/table/src/main/java/io/deephaven/parquet/table/PartitionFormatter.java
 delete mode 100644 extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetFileHelper.java

diff --git a/BenchmarkSupport/src/main/java/io/deephaven/benchmarking/runner/TableBenchmarkState.java b/BenchmarkSupport/src/main/java/io/deephaven/benchmarking/runner/TableBenchmarkState.java
index b6bf36b1a5b..93a982c275d 100644
--- a/BenchmarkSupport/src/main/java/io/deephaven/benchmarking/runner/TableBenchmarkState.java
+++ b/BenchmarkSupport/src/main/java/io/deephaven/benchmarking/runner/TableBenchmarkState.java
@@ -8,11 +8,12 @@
 import io.deephaven.engine.table.TableDefinition;
 import io.deephaven.parquet.table.ParquetTools;
 import io.deephaven.engine.util.TableTools;
-import io.deephaven.parquet.table.ParquetTableWriter;
 import io.deephaven.engine.table.impl.util.TableBuilder;
 import io.deephaven.benchmarking.BenchmarkTools;
 import org.openjdk.jmh.infra.BenchmarkParams;
 
+import static io.deephaven.parquet.table.ParquetUtils.PARQUET_FILE_EXTENSION;
+
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
@@ -44,7 +45,7 @@ public void init() {
 
     public void logOutput() throws IOException {
         final Path outputPath = BenchmarkTools.dataDir()
-                .resolve(BenchmarkTools.getDetailOutputPath(benchmarkName) + ParquetTableWriter.PARQUET_FILE_EXTENSION);
+                .resolve(BenchmarkTools.getDetailOutputPath(benchmarkName) + PARQUET_FILE_EXTENSION);
         final Table output = outputBuilder.build();
 
         ParquetTools.writeTable(output, outputPath.toFile(), RESULT_DEF);
diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableAggregationTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableAggregationTest.java
index 6b4a2d9d278..00636c04aad 100644
--- a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableAggregationTest.java
+++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableAggregationTest.java
@@ -37,7 +37,6 @@
 import io.deephaven.engine.util.TableTools;
 import io.deephaven.engine.util.systemicmarking.SystemicObjectTracker;
 import io.deephaven.parquet.table.ParquetInstructions;
-import io.deephaven.parquet.table.ParquetTableWriter;
 import io.deephaven.parquet.table.ParquetTools;
 import io.deephaven.parquet.table.layout.ParquetKeyValuePartitionedLayout;
 import io.deephaven.test.types.OutOfBandTest;
@@ -75,6 +74,7 @@
 import static io.deephaven.util.QueryConstants.*;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.*;
+import static io.deephaven.parquet.table.ParquetUtils.PARQUET_FILE_EXTENSION;
 
 @Category(OutOfBandTest.class)
 public class QueryTableAggregationTest {
@@ -3957,7 +3957,7 @@ private Table makeDiskTable(File directory) throws IOException {
 
         final TableDefaults result = testTable(stringCol("Symbol", syms), intCol("Value", values));
 
-        final File outputFile = new File(directory, "disk_table" + ParquetTableWriter.PARQUET_FILE_EXTENSION);
+        final File outputFile = new File(directory, "disk_table" + PARQUET_FILE_EXTENSION);
 
         ParquetTools.writeTable(result, outputFile, result.getDefinition());
 
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java
index f23e7304b63..e8fa8b8107e 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java
@@ -57,7 +57,6 @@ public class ParquetTableWriter {
     public static final String BEGIN_POS = "dh_begin_pos";
     public static final String END_POS = "dh_end_pos";
     public static final String GROUPING_KEY = "dh_key";
-    public static final String PARQUET_FILE_EXTENSION = ".parquet";
 
     /**
      * Helper struct used to pass information about where to write the grouping files for each grouping column
@@ -234,14 +233,15 @@ private static void write(
      * Get the parquet schema for a table
      *
      * @param table the input table
-     * @param definition the table definition
+     * @param definition the definition to use for creating the schema
      * @param instructions write instructions for the file
      * @return the parquet schema
      */
     static MessageType getSchemaForTable(@NotNull final Table table,
+            @NotNull final TableDefinition definition,
             @NotNull final ParquetInstructions instructions) {
-        final Table pretransformTable = pretransformTable(table, table.getDefinition());
-        return MappedSchema.create(new HashMap<>(), table.getDefinition(), pretransformTable.getRowSet(),
+        final Table pretransformTable = pretransformTable(table, definition);
+        return MappedSchema.create(new HashMap<>(), definition, pretransformTable.getRowSet(),
                 pretransformTable.getColumnSourceMap(), instructions).getParquetSchema();
     }
 
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java
index 69f9e4c986e..b05e53d5a6b 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java
@@ -10,6 +10,7 @@
 import io.deephaven.base.verify.Require;
 import io.deephaven.engine.context.ExecutionContext;
 import io.deephaven.engine.table.ColumnDefinition;
+import io.deephaven.engine.table.ColumnSource;
 import io.deephaven.engine.table.PartitionedTable;
 import io.deephaven.engine.table.Table;
 import io.deephaven.engine.table.TableDefinition;
@@ -59,10 +60,13 @@
 import java.util.*;
 import java.util.stream.Collectors;
 
+import static io.deephaven.engine.table.impl.partitioned.PartitionedTableCreatorImpl.CONSTITUENT;
 import static io.deephaven.parquet.base.ParquetFileReader.FILE_URI_SCHEME;
 import static io.deephaven.parquet.table.ParquetTableWriter.getSchemaForTable;
+import static io.deephaven.parquet.table.ParquetUtils.PARQUET_FILE_EXTENSION;
+import static io.deephaven.parquet.table.ParquetUtils.COMMON_METADATA_FILE_NAME;
+import static io.deephaven.parquet.table.ParquetUtils.METADATA_FILE_NAME;
 import static io.deephaven.util.channel.SeekableChannelsProvider.convertToURI;
-import static io.deephaven.parquet.table.ParquetTableWriter.PARQUET_FILE_EXTENSION;
 import static io.deephaven.util.type.TypeUtils.getUnboxedTypeIfBoxed;
 
 /**
@@ -72,8 +76,6 @@ public class ParquetTools {
 
     private static final int MAX_PARTITIONING_LEVELS_INFERENCE = 32;
-    private static final String PARQUET_METADATA_FILE = "_metadata";
-    private static final String PARQUET_COMMON_METADATA_FILE = "_common_metadata";
 
     private ParquetTools() {}
 
@@ -470,27 +472,48 @@ private static Map groupin
      * method will call {@link Table#partitionBy(String...) partitionBy} on all the partitioning columns.
      *
      * @param sourceTable The table to partition and write
-     * @param destinationDir The destination directory to store partitioned data in. Non-existing directories are
-     *        created.
+     * @param destinationDir The destination root directory to store partitioned data in nested format. Non-existing
+     *        directories are created.
      * @param baseName The base name for the individual partitioned tables. For example, a base name of "table" will
-     *        result in files named "partition1/table.parquet", "partition2/table.parquet", etc.
+     *        result in files named "PartitioningColumn=partition1/table.parquet",
+     *        "PartitioningColumn=partition2/table.parquet", etc.
      * @param writeInstructions Write instructions for customizations while writing
      */
     public static void writeKeyValuePartitionedTable(@NotNull final Table sourceTable,
             @NotNull final File destinationDir,
             @NotNull final String baseName,
             @NotNull final ParquetInstructions writeInstructions) {
-        // TODO Should I take an additional write instruction for partitioning column names in case they are different
-        // from the table's partitioning columns? Also, should I take partitioning column names from the user as an
-        // argument to this method?
+        writeKeyValuePartitionedTable(sourceTable, sourceTable.getDefinition(), destinationDir, baseName,
+                writeInstructions);
+    }
 
-        // Also, how should I get the baseName, should I move it to the write instructions.
+    /**
+     * Write a table to disk in parquet format with {@link TableDefinition#getPartitioningColumns() partitioning columns}
+     * written as "key=value" format in a nested directory structure. To generate these individual partitions, this
+     * method will call {@link Table#partitionBy(String...) partitionBy} on all the partitioning columns in the provided
+     * table definition.
+     *
+     * @param sourceTable The table to partition and write
+     * @param definition table definition to use (instead of the one implied by the table itself)
+     * @param destinationDir The destination root directory to store partitioned data in nested format. Non-existing
+     *        directories are created.
+     * @param baseName The base name for the individual partitioned tables. For example, a base name of "table" will
+     *        result in files named "PartitioningColumn=partition1/table.parquet",
+     *        "PartitioningColumn=partition2/table.parquet", etc.
+     * @param writeInstructions Write instructions for customizations while writing
+     */
+    public static void writeKeyValuePartitionedTable(@NotNull final Table sourceTable,
+            @NotNull final TableDefinition definition,
+            @NotNull final File destinationDir,
+            @NotNull final String baseName,
+            @NotNull final ParquetInstructions writeInstructions) {
+        // TODO How should we get the baseName? Should it move to the write instructions?
         // Pyarrow has this optional parameter "basename_template" used to generate basenames of written data files.
         // The token ‘{i}’ will be replaced with an automatically incremented integer for files in the same folder.
         // If not specified, it defaults to “someHash-{i}.parquet”.
         // pyspark has names of the form "part.{i}.parquet" and allows passing a naming function which takes an integer
         // and generates a file name.
-        final List<ColumnDefinition<?>> partitioningColumns = sourceTable.getDefinition().getPartitioningColumns();
+        final List<ColumnDefinition<?>> partitioningColumns = definition.getPartitioningColumns();
         if (partitioningColumns.isEmpty()) {
             throw new IllegalArgumentException("Table must have partitioning columns to write partitioned data");
         }
@@ -498,74 +521,130 @@ public static void writeKeyValuePartitionedTable(@NotNull final Table sourceTabl
                 .map(ColumnDefinition::getName)
                 .toArray(String[]::new);
         final PartitionedTable partitionedTable = sourceTable.partitionBy(partitioningColNames);
-        writeKeyValuePartitionedTable(partitionedTable, destinationDir, baseName, writeInstructions);
+        writeKeyValuePartitionedTable(partitionedTable, definition, destinationDir, baseName, writeInstructions);
     }
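    // Editor's note (illustration only, not part of this patch): a minimal usage sketch for the overload above.
    // The table, column, and directory names are hypothetical, and ParquetInstructions.EMPTY stands in for any
    // customized write instructions:
    //
    //     final TableDefinition def = TableDefinition.of(
    //             ColumnDefinition.ofString("Region").withPartitioning(),
    //             ColumnDefinition.ofLong("Qty"));
    //     final Table source = ((QueryTable) TableTools.emptyTable(100)
    //             .updateView("Region = ii % 2 == 0 ? `EAST` : `WEST`", "Qty = ii"))
    //             .withDefinitionUnsafe(def);
    //     writeKeyValuePartitionedTable(source, def, new File("/tmp/partitioned"), "data",
    //             ParquetInstructions.EMPTY);
    //     // Expected layout: /tmp/partitioned/Region=EAST/data.parquet and /tmp/partitioned/Region=WEST/data.parquet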
 
     /**
-     * Write a partitioned tables to disk in parquet format with all the {@link PartitionedTable#keyColumnNames() key
+     * Write a partitioned table to disk in parquet format with all the {@link PartitionedTable#keyColumnNames() key
      * columns} as "key=value" format in a nested directory structure. To generate the partitioned table, users can call
      * {@link Table#partitionBy(String...) partitionBy} on the required columns.
      *
      * @param partitionedTable The partitioned table to write
-     * @param destinationDir The destination directory to store partitioned data in. Non-existing directories are
-     *        created.
+     * @param destinationDir The destination root directory to store partitioned data in nested format. Non-existing
+     *        directories are created.
      * @param baseName The base name for the individual partitioned tables. For example, a base name of "table" will
-     *        result in files named "partition1/table.parquet", "partition2/table.parquet", etc.
+     *        result in files named "PartitioningColumn=partition1/table.parquet",
+     *        "PartitioningColumn=partition2/table.parquet", etc.
      * @param writeInstructions Write instructions for customizations while writing
      */
     public static void writeKeyValuePartitionedTable(@NotNull final PartitionedTable partitionedTable,
             @NotNull final File destinationDir,
             @NotNull final String baseName,
             @NotNull final ParquetInstructions writeInstructions) {
+        // Get key column definitions from the partitioned table definition and non-key column definitions from the
+        // constituent table definition, and combine them to build the overall table definition
+        final Set<String> keyColumnNames = partitionedTable.keyColumnNames();
+        final Collection<ColumnDefinition<?>> columnDefinitions = new ArrayList<>(keyColumnNames.size()
+                + partitionedTable.constituentDefinition().numColumns());
+        partitionedTable.table().getDefinition().getColumns().stream()
+                .filter(columnDefinition -> keyColumnNames.contains(columnDefinition.getName()))
+                .forEach(columnDefinitions::add);
+        partitionedTable.constituentDefinition().getColumns().stream()
+                .filter(columnDefinition -> !keyColumnNames.contains(columnDefinition.getName()))
+                .forEach(columnDefinitions::add);
+        final TableDefinition definition = TableDefinition.of(columnDefinitions);
+        writeKeyValuePartitionedTable(partitionedTable, definition, destinationDir, baseName, writeInstructions);
+    }
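    // Editor's note (illustration only, not part of this patch): the PartitionedTable entry point can be used
    // directly when the caller already holds a partitioned table; names are hypothetical and continue the sketch
    // above:
    //
    //     final PartitionedTable byRegion = source.partitionBy("Region");
    //     writeKeyValuePartitionedTable(byRegion, new File("/tmp/partitioned"), "data", ParquetInstructions.EMPTY);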
 
+    /**
+     * Write a partitioned table to disk in parquet format with all the {@link PartitionedTable#keyColumnNames() key
+     * columns} as "key=value" format in a nested directory structure. To generate the partitioned table, users can call
+     * {@link Table#partitionBy(String...) partitionBy} on the required columns.
+     *
+     * @param partitionedTable The partitioned table to write
+     * @param definition table definition to use (instead of the one implied by the table itself)
+     * @param destinationDir The destination root directory to store partitioned data in nested format. Non-existing
+     *        directories are created.
+     * @param baseName The base name for the individual partitioned tables. For example, a base name of "table" will
+     *        result in files named "PartitioningColumn=partition1/table.parquet",
+     *        "PartitioningColumn=partition2/table.parquet", etc.
+     * @param writeInstructions Write instructions for customizations while writing
+     */
+    public static void writeKeyValuePartitionedTable(@NotNull final PartitionedTable partitionedTable,
+            @NotNull final TableDefinition definition,
+            @NotNull final File destinationDir,
+            @NotNull final String baseName,
+            @NotNull final ParquetInstructions writeInstructions) {
         final String[] partitioningColumnNames = partitionedTable.keyColumnNames().toArray(String[]::new);
-        final Table keyTable;
-        if (partitionedTable.uniqueKeys()) {
-            keyTable = partitionedTable.table().view(partitioningColumnNames);
-        } else {
-            keyTable = partitionedTable.table().selectDistinct(partitioningColumnNames);
+        if (partitionedTable.table().numColumns() == partitioningColumnNames.length) {
+            throw new IllegalArgumentException(
+                    "Cannot write a partitioned parquet table with no non-partitioning columns");
         }
-        final Collection<Table> constituentTables = new ArrayList<>();
+        // Note that there can be multiple constituents with the same key values, so we cannot directly use the
+        // partitionedTable.constituentFor(keyValues) method; we need to group the constituents together instead
+        final Table withGroupConstituents = partitionedTable.table().groupBy(partitioningColumnNames);
+        final ColumnSource<ObjectVector<Table>> constituentVectorColumnSource =
+                withGroupConstituents.getColumnSource(CONSTITUENT.name());
+        if (constituentVectorColumnSource == null) {
+            throw new IllegalStateException("Partitioned table must have a constituent column");
+        }
+        final Collection<Table> partitionedData = new ArrayList<>();
         final Collection<File> destinations = new ArrayList<>();
-        keyTable.getRowSet().forAllRowKeys(key -> {
-            final Object[] keyValues = Arrays.stream(partitioningColumnNames)
-                    .map(keyTable::getColumnSource)
-                    .map(colSource -> colSource.get(key))
-                    .toArray();
-            final StringBuilder partitionTableRelativePath = new StringBuilder();
-            for (int i = 0; i < partitioningColumnNames.length; i++) {
-                partitionTableRelativePath.append(File.separator)
-                        .append(partitioningColumnNames[i]).append("=").append(keyValues[i]);
+        withGroupConstituents.getRowSet().forAllRowKeys(key -> {
+            final StringBuilder relativePathBuilder = new StringBuilder();
+            for (final String partitioningColumnName : partitioningColumnNames) {
+                final ColumnSource<?> partitioningColSource =
+                        withGroupConstituents.getColumnSource(partitioningColumnName);
+                final String partitioningValue = PartitionFormatter.formatToString(partitioningColSource.get(key));
+                relativePathBuilder.append(partitioningColumnName).append("=").append(partitioningValue)
+                        .append(File.separator);
+            }
+            final String relativePath = relativePathBuilder.toString();
+            final ObjectVector<Table> constituentVector = constituentVectorColumnSource.get(key);
+            if (constituentVector == null) {
+                throw new IllegalStateException("Grouped partitioned table must have a vector constituent column for"
+                        + " key = " + key);
+            }
+            int count = 0;
+            final long numConstituents = constituentVector.size();
+            for (final Table constituent : constituentVector) {
+                final File destination;
+                if (numConstituents == 1) {
+                    destination = new File(destinationDir, relativePath + baseName + ".parquet");
+                } else {
+                    destination = new File(destinationDir, relativePath + baseName + "-part-" + count + ".parquet");
+                }
+                destinations.add(destination);
+                partitionedData.add(constituent);
+                count++;
             }
-            partitionTableRelativePath.append(File.separator).append(baseName).append(".parquet");
-            destinations.add(new File(destinationDir, partitionTableRelativePath.toString()));
-            constituentTables.add(partitionedTable.constituentFor(keyValues));
         });
 
         // If needed, generate schema for _common_metadata file from key table
         final ParquetInstructions updatedWriteInstructions;
         if (!ParquetInstructions.DEFAULT_METADATA_ROOT_DIR.equals(writeInstructions.getMetadataRootDir())) {
-            final MessageType commonSchema = getSchemaForTable(keyTable, writeInstructions);
+            final MessageType commonSchema = getSchemaForTable(partitionedTable.table(), definition, writeInstructions);
             updatedWriteInstructions = ParquetInstructions.createWithCommonSchema(writeInstructions, commonSchema);
         } else {
             updatedWriteInstructions = writeInstructions;
         }
-        final TableDefinition constituentDefinition = getNonKeyTableDefiniton(partitionedTable);
+        final TableDefinition partitionedTablesDefinition = getNonKeyTableDefinition(partitionedTable.keyColumnNames(),
+                definition);
         ParquetTools.writeParquetTables(
-                constituentTables.toArray(Table[]::new),
-                constituentDefinition,
+                partitionedData.toArray(Table[]::new),
+                partitionedTablesDefinition,
                 updatedWriteInstructions,
                 destinations.toArray(File[]::new),
-                constituentDefinition.getGroupingColumnNamesArray());
+                partitionedTablesDefinition.getGroupingColumnNamesArray());
     }
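    // Editor's note (illustration only, not part of this patch): when uniqueKeys() is false, a single key can map
    // to several constituents, and the loop above then emits one file per constituent. For a hypothetical baseName
    // "data" and a key Region=EAST with two constituents, the files written would be:
    //
    //     Region=EAST/data-part-0.parquet
    //     Region=EAST/data-part-1.parquet
    //
    // With exactly one constituent for a key, the plain name Region=EAST/data.parquet is used instead.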
 
     /**
-     * Create a table definition for the non-key columns of a partitioned table
+     * Using the provided definition and key column names, create a table definition containing only the non-key
+     * columns.
      */
-    private static TableDefinition getNonKeyTableDefiniton(@NotNull final PartitionedTable partitionedTable) {
-        final Collection<String> keyColumnNames = partitionedTable.keyColumnNames();
+    private static TableDefinition getNonKeyTableDefinition(@NotNull final Collection<String> keyColumnNames,
+            @NotNull final TableDefinition definition) {
         final List<ColumnDefinition<?>> nonKeyColumnDefinition =
-                partitionedTable.constituentDefinition()
-                        .getColumns().stream()
+                definition.getColumns().stream()
                         .filter(columnDefinition -> {
                             final String columnName = columnDefinition.getName();
                             return !keyColumnNames.contains(columnName);
                         })
@@ -658,10 +737,10 @@ public static void writeParquetTables(@NotNull final Table[] sources,
         // Write the combined metadata files to shadow destinations
         final File metadataDestFile, shadowMetadataFile, commonMetadataDestFile, shadowCommonMetadataFile;
         if (writeMetadataFiles) {
-            metadataDestFile = new File(metadataRootDir, PARQUET_METADATA_FILE);
+            metadataDestFile = new File(metadataRootDir, METADATA_FILE_NAME);
             shadowMetadataFile = ParquetTools.getShadowFile(metadataDestFile);
             shadowFiles.add(shadowMetadataFile);
-            commonMetadataDestFile = new File(metadataRootDir, PARQUET_COMMON_METADATA_FILE);
+            commonMetadataDestFile = new File(metadataRootDir, COMMON_METADATA_FILE_NAME);
             shadowCommonMetadataFile = ParquetTools.getShadowFile(commonMetadataDestFile);
             shadowFiles.add(shadowCommonMetadataFile);
             metadataFileWriter.writeMetadataFiles(shadowMetadataFile, shadowCommonMetadataFile);
@@ -781,17 +860,17 @@ private static Table readTableInternal(
             if (sourceFileName.endsWith(PARQUET_FILE_EXTENSION)) {
                 return readSingleFileTable(source, instructions);
             }
-            if (sourceFileName.equals(ParquetMetadataFileLayout.METADATA_FILE_NAME)) {
+            if (sourceFileName.equals(METADATA_FILE_NAME)) {
                 return readPartitionedTableWithMetadata(sourceFile.getParentFile(), instructions);
             }
-            if (sourceFileName.equals(ParquetMetadataFileLayout.COMMON_METADATA_FILE_NAME)) {
+            if (sourceFileName.equals(COMMON_METADATA_FILE_NAME)) {
                 return readPartitionedTableWithMetadata(sourceFile.getParentFile(), instructions);
             }
             throw new TableDataException(
                     "Source file " + sourceFile + " does not appear to be a parquet file or metadata file");
         }
         if (sourceAttr.isDirectory()) {
-            final Path metadataPath = sourcePath.resolve(ParquetMetadataFileLayout.METADATA_FILE_NAME);
+            final Path metadataPath = sourcePath.resolve(METADATA_FILE_NAME);
             if (Files.exists(metadataPath)) {
                 return readPartitionedTableWithMetadata(sourceFile, instructions);
             }
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetUtils.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetUtils.java
new file mode 100644
index 00000000000..87932f18b54
--- /dev/null
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetUtils.java
@@ -0,0 +1,18 @@
+package io.deephaven.parquet.table;
+
+import java.nio.file.Path;
+
+public final class ParquetUtils {
+
+    public static final String METADATA_FILE_NAME = "_metadata";
+    public static final String COMMON_METADATA_FILE_NAME = "_common_metadata";
+    public static final String PARQUET_FILE_EXTENSION = ".parquet";
+
+    /**
+     * Used as a filter to select relevant parquet files while reading all files in a directory.
+     */
+    public static boolean fileNameMatches(final Path path) {
+        final String fileName = path.getFileName().toString();
+        return fileName.endsWith(PARQUET_FILE_EXTENSION) && fileName.charAt(0) != '.';
+    }
+}
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/PartitionFormatter.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/PartitionFormatter.java
new file mode 100644
index 00000000000..ef50f74f0ba
--- /dev/null
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/PartitionFormatter.java
@@ -0,0 +1,134 @@
+package io.deephaven.parquet.table;
+
+import io.deephaven.time.DateTimeUtils;
+import org.jetbrains.annotations.NotNull;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This enum takes an object read from a parquet file and formats it as a string; only the supported types are handled.
+ */
+enum PartitionFormatter {
+    ForString {
+        @Override
+        public String format(@NotNull final Object obj) {
+            return (String) obj;
+        }
+    },
+    ForBoolean {
+        @Override
+        public String format(@NotNull final Object obj) {
+            return ((Boolean) obj).toString();
+        }
+    },
+    ForChar {
+        @Override
+        public String format(@NotNull final Object obj) {
+            return ((Character) obj).toString();
+        }
+    },
+    ForByte {
+        @Override
+        public String format(@NotNull final Object obj) {
+            return ((Byte) obj).toString();
+        }
+    },
+    ForShort {
+        @Override
+        public String format(@NotNull final Object obj) {
+            return ((Short) obj).toString();
+        }
+    },
+    ForInt {
+        @Override
+        public String format(@NotNull final Object obj) {
+            return ((Integer) obj).toString();
+        }
+    },
+    ForLong {
+        @Override
+        public String format(@NotNull final Object obj) {
+            return ((Long) obj).toString();
+        }
+    },
+    ForFloat {
+        @Override
+        public String format(@NotNull final Object obj) {
+            return ((Float) obj).toString();
+        }
+    },
+    ForDouble {
+        @Override
+        public String format(@NotNull final Object obj) {
+            return ((Double) obj).toString();
+        }
+    },
+    ForBigInteger {
+        @Override
+        public String format(@NotNull final Object obj) {
+            return ((BigInteger) obj).toString();
+        }
+    },
+    ForBigDecimal {
+        @Override
+        public String format(@NotNull final Object obj) {
+            return ((BigDecimal) obj).toString();
+        }
+    },
+    ForInstant {
+        @Override
+        public String format(@NotNull final Object obj) {
+            return ((Instant) obj).toString();
+        }
+    },
+    ForLocalDate {
+        @Override
+        public String format(@NotNull final Object obj) {
+            return DateTimeUtils.formatDate((LocalDate) obj);
+        }
+    },
+    ForLocalTime {
+        @Override
+        public String format(@NotNull final Object obj) {
+            return ((LocalTime) obj).toString();
+        }
+    };
+
+    private static final Map<Class<?>, PartitionFormatter> typeMap = new HashMap<>();
+    static {
+        typeMap.put(String.class, ForString);
+        typeMap.put(Boolean.class, ForBoolean);
+        typeMap.put(Character.class, ForChar);
+        typeMap.put(Byte.class, ForByte);
+        typeMap.put(Short.class, ForShort);
+        typeMap.put(Integer.class, ForInt);
+        typeMap.put(Long.class, ForLong);
+        typeMap.put(Float.class, ForFloat);
+        typeMap.put(Double.class, ForDouble);
+        typeMap.put(BigInteger.class, ForBigInteger);
+        typeMap.put(BigDecimal.class, ForBigDecimal);
+        typeMap.put(Instant.class, ForInstant);
+        typeMap.put(LocalDate.class, ForLocalDate);
+        typeMap.put(LocalTime.class, ForLocalTime);
+    }
+
+    abstract String format(@NotNull Object obj);
+
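    // Editor's note (illustration only, not part of this patch): formatToString below dispatches on the runtime
    // class of its argument; sample results, assuming DateTimeUtils.formatDate renders ISO yyyy-MM-dd:
    //
    //     PartitionFormatter.formatToString(LocalDate.of(2024, 3, 3)); // "2024-03-03"
    //     PartitionFormatter.formatToString(null);                     // "null"
    //     PartitionFormatter.formatToString(new int[0]);               // throws UnsupportedOperationException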
+    static String formatToString(final Object obj) {
+        if (obj == null) {
+            return "null";
+        }
+        final PartitionFormatter formatter = typeMap.get(obj.getClass());
+        if (formatter != null) {
+            return formatter.format(obj);
+        } else {
+            throw new UnsupportedOperationException("Unsupported type: " + obj.getClass().getSimpleName());
+        }
+    }
+}
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetFileHelper.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetFileHelper.java
deleted file mode 100644
index 86dbc5ad3fa..00000000000
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetFileHelper.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package io.deephaven.parquet.table.layout;
-
-import io.deephaven.parquet.table.ParquetTableWriter;
-
-import java.nio.file.Path;
-
-final class ParquetFileHelper {
-    /**
-     * Used as a filter to select relevant parquet files while reading all files in a directory.
-     */
-    static boolean fileNameMatches(final Path path) {
-        final String fileName = path.getFileName().toString();
-        return fileName.endsWith(ParquetTableWriter.PARQUET_FILE_EXTENSION) && fileName.charAt(0) != '.';
-    }
-}
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetFlatPartitionedLayout.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetFlatPartitionedLayout.java
index bbaefc5d971..6e2ef6c000c 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetFlatPartitionedLayout.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetFlatPartitionedLayout.java
@@ -7,6 +7,7 @@
 import io.deephaven.engine.table.impl.locations.impl.TableLocationKeyFinder;
 import io.deephaven.parquet.table.ParquetInstructions;
 import io.deephaven.parquet.table.location.ParquetTableLocationKey;
+import io.deephaven.parquet.table.ParquetUtils;
 import org.jetbrains.annotations.NotNull;
 
 import java.io.File;
@@ -49,7 +50,7 @@ public String toString() {
     @Override
     public synchronized void findKeys(@NotNull final Consumer<ParquetTableLocationKey> locationKeyObserver) {
         try (final DirectoryStream<Path> parquetFileStream =
-                Files.newDirectoryStream(tableRootDirectory.toPath(), ParquetFileHelper::fileNameMatches)) {
+                Files.newDirectoryStream(tableRootDirectory.toPath(), ParquetUtils::fileNameMatches)) {
             for (final Path parquetFilePath : parquetFileStream) {
                 ParquetTableLocationKey locationKey = cache.get(parquetFilePath);
                 if (locationKey == null) {
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetKeyValuePartitionedLayout.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetKeyValuePartitionedLayout.java
index b1ea92a4b70..2404a69fcd2 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetKeyValuePartitionedLayout.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetKeyValuePartitionedLayout.java
@@ -10,6 +10,7 @@
 import io.deephaven.engine.table.impl.locations.local.LocationTableBuilderDefinition;
 import io.deephaven.parquet.table.ParquetInstructions;
 import io.deephaven.parquet.table.location.ParquetTableLocationKey;
+import io.deephaven.parquet.table.ParquetUtils;
 import org.jetbrains.annotations.NotNull;
 
 import java.io.File;
@@ -27,7 +28,7 @@ public ParquetKeyValuePartitionedLayout(
             @NotNull final TableDefinition tableDefinition,
             @NotNull final ParquetInstructions readInstructions) {
         super(tableRootDirectory,
-                ParquetFileHelper::fileNameMatches,
+                ParquetUtils::fileNameMatches,
                 () -> new LocationTableBuilderDefinition(tableDefinition),
                 (path, partitions) -> new ParquetTableLocationKey(path.toFile(), 0, partitions, readInstructions),
                 Math.toIntExact(tableDefinition.getColumnStream().filter(ColumnDefinition::isPartitioning).count()));
@@ -38,7 +39,7 @@ public ParquetKeyValuePartitionedLayout(
             final int maxPartitioningLevels,
             @NotNull final ParquetInstructions readInstructions) {
         super(tableRootDirectory,
-                ParquetFileHelper::fileNameMatches,
+                ParquetUtils::fileNameMatches,
                 () -> new LocationTableBuilderCsv(tableRootDirectory),
                 (path, partitions) -> new ParquetTableLocationKey(path.toFile(), 0, partitions, readInstructions),
                 maxPartitioningLevels);
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetMetadataFileLayout.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetMetadataFileLayout.java
index a7ed9a754c3..75ffb95913e 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetMetadataFileLayout.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetMetadataFileLayout.java
@@ -36,6 +36,8 @@
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static io.deephaven.parquet.table.ParquetUtils.COMMON_METADATA_FILE_NAME;
+import static io.deephaven.parquet.table.ParquetUtils.METADATA_FILE_NAME;
 import static java.util.stream.Collectors.toMap;
 
 /**
@@ -55,9 +57,6 @@
  */
 public class ParquetMetadataFileLayout implements TableLocationKeyFinder<ParquetTableLocationKey> {
 
-    public static final String METADATA_FILE_NAME = "_metadata";
-    public static final String COMMON_METADATA_FILE_NAME = "_common_metadata";
-
     private final File metadataFile;
     private final File commonMetadataFile;
 
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocationKey.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocationKey.java
index f8f4a24556e..cdc6d3b5a02 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocationKey.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocationKey.java
@@ -8,7 +8,6 @@
 import io.deephaven.parquet.table.ParquetTools;
 import io.deephaven.engine.table.impl.locations.TableDataException;
 import io.deephaven.engine.table.impl.locations.TableLocationKey;
-import io.deephaven.parquet.table.ParquetTableWriter;
 import io.deephaven.parquet.base.ParquetFileReader;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.format.RowGroup;
@@ -23,6 +22,8 @@
 import java.util.Map;
 import java.util.stream.IntStream;
 
+import static io.deephaven.parquet.table.ParquetUtils.PARQUET_FILE_EXTENSION;
+
 /**
  * {@link TableLocationKey} implementation for use with data stored in the parquet format.
  */
@@ -74,8 +75,8 @@ private static URI validateParquetFile(@NotNull final File file) {
     }
 
     private static URI validateParquetFile(@NotNull final URI parquetFileUri) {
-        if (!parquetFileUri.getRawPath().endsWith(ParquetTableWriter.PARQUET_FILE_EXTENSION)) {
-            throw new IllegalArgumentException("Parquet file must end in " + ParquetTableWriter.PARQUET_FILE_EXTENSION);
+        if (!parquetFileUri.getRawPath().endsWith(PARQUET_FILE_EXTENSION)) {
+            throw new IllegalArgumentException("Parquet file must end in " + PARQUET_FILE_EXTENSION);
         }
         return parquetFileUri;
     }
diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java
index e10c0ee2816..98f59d81b9b 100644
--- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java
+++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java
@@ -34,6 +34,7 @@
 import io.deephaven.parquet.table.pagestore.ColumnChunkPageStore;
 import io.deephaven.parquet.table.transfer.StringDictionary;
 import io.deephaven.extensions.s3.S3Instructions;
+import io.deephaven.qst.type.Type;
 import io.deephaven.stringset.ArrayStringSet;
 import io.deephaven.engine.table.Table;
 import io.deephaven.engine.table.TableDefinition;
@@ -71,6 +72,8 @@
 import java.net.URI;
 import java.time.Duration;
 import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -598,21 +601,31 @@ public void writeKeyValuePartitionedDataWithIntegerPartitionsTest() {
     public void writeKeyValuePartitionedDataWithMixedPartitionsTest() {
         final TableDefinition definition = TableDefinition.of(
                 ColumnDefinition.ofInt("PC1").withPartitioning(),
-                ColumnDefinition.ofChar("PC2").withPartitioning(),
-                ColumnDefinition.ofString("PC3").withPartitioning(),
-                ColumnDefinition.ofLong("I"));
+                ColumnDefinition.ofChar("PC2"),
+                ColumnDefinition.ofString("PC3"),
+                ColumnDefinition.ofLong("II"),
+                ColumnDefinition.ofInt("I"));
         final Table inputData = ((QueryTable) TableTools.emptyTable(10)
                 .updateView("PC1 = (int)(ii%3)",
                         "PC2 = (char)(65 + (ii % 2))",
                         "PC3 = java.time.LocalDate.ofEpochDay(i%2).toString()",
-                        "I = ii"))
+                        "II = ii",
+                        "I = i"))
                 .withDefinitionUnsafe(definition);
 
+        // We skip one column in the definition, and add some more partitioning and non-partitioning columns
+        final TableDefinition tableDefinitionToWrite = TableDefinition.of(
+                ColumnDefinition.ofInt("PC1").withPartitioning(),
+                ColumnDefinition.ofChar("PC2").withPartitioning(),
+                ColumnDefinition.ofString("PC3").withPartitioning(),
+                ColumnDefinition.ofInt("I"),
+                ColumnDefinition.ofInt("J"));
+
         final File parentDir = new File(rootFile, "writeKeyValuePartitionedDataTest");
         final ParquetInstructions writeInstructions = ParquetInstructions.builder()
                 .setMetadataRootDir(parentDir.getAbsolutePath())
                 .build();
-        writeKeyValuePartitionedTable(inputData, parentDir, "data", writeInstructions);
+        writeKeyValuePartitionedTable(inputData, tableDefinitionToWrite, parentDir, "data", writeInstructions);
 
         // Verify that the partitioned data exists
         for (int PC1 = 0; PC1 <= 2; PC1++) {
@@ -626,18 +639,21 @@ public void writeKeyValuePartitionedDataWithMixedPartitionsTest() {
                 assertTrue(dataFile.exists() && dataFile.isFile());
             }
         }
-        Table fromDisk = readKeyValuePartitionedTable(parentDir, EMPTY);
-        assertTableEquals(inputData.sort("PC1", "PC2"),
-                fromDisk.sort("PC1", "PC2"));
+
+        // Given the updated table definition used to write the data, we drop the column "II" and add a new column "J"
+        final Table expected = inputData.dropColumns("II").updateView("J = (int)null");
+        final Table fromDisk = readKeyValuePartitionedTable(parentDir, EMPTY);
+        assertTableEquals(expected.sort("PC1", "PC2"), fromDisk.sort("PC1", "PC2"));
 
         final File commonMetadata = new File(parentDir, "_common_metadata");
         final Table fromDiskWithMetadata = readTable(commonMetadata);
-        assertTableEquals(inputData.sort("PC1", "PC2"), fromDiskWithMetadata.sort("PC1", "PC2"));
+        assertTableEquals(expected.sort("PC1", "PC2"), fromDiskWithMetadata.sort("PC1", "PC2"));
 
         // Delete some files from the partitioned data and read the required rows to verify that we only read the
         // required partitions
         FileUtils.deleteRecursivelyOnNFS(new File(parentDir, "PC1=0"));
         FileUtils.deleteRecursivelyOnNFS(new File(parentDir, "PC1=1"));
-        assertTableEquals(inputData.where("PC1 == 2").sort("PC1", "PC2", "PC3"),
+        assertTableEquals(expected.where("PC1 == 2").sort("PC1", "PC2", "PC3"),
                 readTable(commonMetadata).where("PC1 == 2").sort("PC1", "PC2", "PC3"));
     }
 
@@ -677,6 +693,50 @@ public void someMoreKeyValuePartitionedTestsWithComplexKeys() {
                 fromDiskWithMetadata.sort("symbol", "epic_collection_id"));
     }
 
+    @Test
+    public void testAllPartitioningColumnTypes() {
+        final TableDefinition definition = TableDefinition.of(
+                ColumnDefinition.ofString("PC1").withPartitioning(),
+                ColumnDefinition.ofBoolean("PC2").withPartitioning(),
+                ColumnDefinition.ofChar("PC3").withPartitioning(),
+                ColumnDefinition.ofByte("PC4").withPartitioning(),
+                ColumnDefinition.ofShort("PC5").withPartitioning(),
+                ColumnDefinition.ofInt("PC6").withPartitioning(),
+                ColumnDefinition.ofLong("PC7").withPartitioning(),
+                ColumnDefinition.ofFloat("PC8").withPartitioning(),
+                ColumnDefinition.ofDouble("PC9").withPartitioning(),
+                ColumnDefinition.of("PC10", Type.find(BigInteger.class)).withPartitioning(),
+                ColumnDefinition.of("PC11", Type.find(BigDecimal.class)).withPartitioning(),
+                ColumnDefinition.of("PC12", Type.find(Instant.class)).withPartitioning(),
+                ColumnDefinition.of("PC13", Type.find(LocalDate.class)).withPartitioning(),
+                ColumnDefinition.of("PC14", Type.find(LocalTime.class)).withPartitioning(),
+                ColumnDefinition.ofInt("data"));
+
+        final Table inputData = ((QueryTable) TableTools.emptyTable(10).updateView("PC1 = (ii%2 == 0) ? `AA` : `BB`",
+                "PC2 = (ii % 2 == 0)",
+                "PC3 = (char)(65 + (ii % 2))",
+                "PC4 = (byte)(ii % 2)",
+                "PC5 = (short)(ii % 2)",
+                "PC6 = (int)(ii%3)",
+                "PC7 = (long)(ii%2)",
+                "PC8 = (float)(ii % 2)",
+                "PC9 = (double)(ii % 2)",
+                "PC10 = java.math.BigInteger.valueOf(ii)",
+                "PC11 = java.math.BigDecimal.valueOf(ii)",
+                "PC12 = java.time.Instant.ofEpochSecond(ii)",
+                "PC13 = java.time.LocalDate.ofEpochDay(ii)",
+                "PC14 = java.time.LocalTime.of(i%24, i%60, (i+10)%60)",
+                "data = (int)(ii)"))
+                .withDefinitionUnsafe(definition);
+
+        final File parentDir = new File(rootFile, "testAllPartitioningColumnTypes");
+        final ParquetInstructions writeInstructions = ParquetInstructions.builder()
+                .setMetadataRootDir(parentDir.getAbsolutePath())
+                .build();
+        writeKeyValuePartitionedTable(inputData, parentDir, "data", writeInstructions);
+        readKeyValuePartitionedTable(parentDir, EMPTY).select();
+    }
+
     @Test
     public void testVectorColumns() {
         final Table table = getTableFlat(20000, true, false);
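    // Editor's note (illustration only, not part of this patch): a minimal write/read round trip for the API under
    // test, mirroring the patterns above; the directory and column names are hypothetical.
    //
    //     final Table input = ((QueryTable) TableTools.emptyTable(10)
    //             .updateView("PC = (int)(ii % 3)", "val = ii"))
    //             .withDefinitionUnsafe(TableDefinition.of(
    //                     ColumnDefinition.ofInt("PC").withPartitioning(),
    //                     ColumnDefinition.ofLong("val")));
    //     final File dir = new File(rootFile, "roundTrip");
    //     writeKeyValuePartitionedTable(input, dir, "data", ParquetInstructions.EMPTY);
    //     final Table fromDisk = readKeyValuePartitionedTable(dir, EMPTY);
    //     assertTableEquals(input.sort("PC"), fromDisk.sort("PC"));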