From df2733c61c27a8664f0318620e378bdb316e1209 Mon Sep 17 00:00:00 2001
From: Shivam Malhotra
Date: Mon, 10 Jun 2024 13:12:16 -0500
Subject: [PATCH] `table_definition` will be optional for parquet `batch_write` (#5586)
---
 .../deephaven/parquet/table/ParquetTools.java | 40 ++++++---
 .../table/ParquetTableReadWriteTest.java      | 85 +++++++++++++++++--
 py/server/deephaven/parquet.py                |  9 +-
 py/server/tests/test_parquet.py               | 32 +++++--
 4 files changed, 136 insertions(+), 30 deletions(-)

diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java
index 1a7fc53b2ac..bf2c227828e 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java
@@ -851,8 +851,9 @@ private static Collection<List<String>> indexedColumnNames(@NotNull final Table
     /**
      * Write out tables to disk. Data indexes to write are determined by those already present on the first source or
-     * those provided through {@link ParquetInstructions.Builder#addIndexColumns}. The {@link TableDefinition} to use
-     * for writing must be provided as part of {@link ParquetInstructions}.
+     * those provided through {@link ParquetInstructions.Builder#addIndexColumns}. If all source tables have the same
+     * definition, this method will use the common definition for writing. Otherwise, a definition must be provided
+     * through the {@code writeInstructions}.
      *
      * @param sources The tables to write
      * @param destinations The destination paths or URIs. Any non-existing directories in the paths provided are
@@ -864,19 +865,35 @@ public static void writeTables(
             @NotNull final Table[] sources,
             @NotNull final String[] destinations,
             @NotNull final ParquetInstructions writeInstructions) {
-        final Collection<List<String>> indexColumns =
-                writeInstructions.getIndexColumns().orElseGet(() -> indexedColumnNames(sources));
-        final TableDefinition definition = writeInstructions.getTableDefinition().orElseThrow(
-                () -> new IllegalArgumentException("Table definition must be provided"));
+        if (sources.length == 0) {
+            throw new IllegalArgumentException("No source tables provided for writing");
+        }
+        if (sources.length != destinations.length) {
+            throw new IllegalArgumentException("Number of sources and destinations must match");
+        }
+        final TableDefinition definition;
+        if (writeInstructions.getTableDefinition().isPresent()) {
+            definition = writeInstructions.getTableDefinition().get();
+        } else {
+            final TableDefinition firstDefinition = sources[0].getDefinition();
+            for (int idx = 1; idx < sources.length; idx++) {
+                if (!firstDefinition.equals(sources[idx].getDefinition())) {
+                    throw new IllegalArgumentException(
+                            "Table definition must be provided when writing multiple tables " +
+                                    "with different definitions");
+                }
+            }
+            definition = firstDefinition;
+        }
         final File[] destinationFiles = new File[destinations.length];
-        for (int i = 0; i < destinations.length; i++) {
-            final URI destinationURI = convertToURI(destinations[i], false);
+        for (int idx = 0; idx < destinations.length; idx++) {
+            final URI destinationURI = convertToURI(destinations[idx], false);
             if (!FILE_URI_SCHEME.equals(destinationURI.getScheme())) {
                 throw new IllegalArgumentException(
                         "Only file URI scheme is supported for writing parquet files, found" +
-                                "non-file URI: " + destinations[i]);
+                                "non-file URI: " + destinations[idx]);
             }
-            destinationFiles[i] = new File(destinationURI);
+            destinationFiles[idx] = new File(destinationURI);
         }
         final File metadataRootDir;
         if (writeInstructions.generateMetadataFiles()) {
@@ -893,7 +910,8 @@ public static void writeTables(
         } else {
             metadataRootDir = null;
         }
-
+        final Collection<List<String>> indexColumns =
+                writeInstructions.getIndexColumns().orElseGet(() -> indexedColumnNames(sources));
         final Map<String, Map<ParquetCacheTags, Object>> computedCache =
                 buildComputedCache(() -> PartitionedTableFactory.ofTables(definition, sources).merge(), definition);
         // We do not have any additional schema for partitioning columns in this case. Schema for all columns will be
diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java
index eca5a174830..cce58dfadaf 100644
--- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java
+++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java
@@ -711,7 +711,7 @@ public void flatPartitionedParquetWithMetadataTest() throws IOException {
         // Write without any metadata files
         writeTables(new Table[] {someTable, someTable},
                 new String[] {firstDataFile.getPath(), secondDataFile.getPath()},
-                ParquetInstructions.EMPTY.withTableDefinition(someTable.getDefinition()));
+                ParquetInstructions.EMPTY);
         final Table source = readTable(parentDir.getPath()).select();

         // Now write with metadata files
@@ -722,7 +722,7 @@ public void flatPartitionedParquetWithMetadataTest() throws IOException {
                 .build();
         writeTables(new Table[] {someTable, someTable},
                 new String[] {firstDataFile.getPath(), secondDataFile.getPath()},
-                writeInstructions.withTableDefinition(someTable.getDefinition()));
+                writeInstructions);
         final Table fromDisk = readTable(parentDir.getPath());
         assertTableEquals(source, fromDisk);

@@ -753,7 +753,7 @@ public void flatPartitionedParquetWithMetadataTest() throws IOException {
         try {
             writeTables(new Table[] {someTable, someTable},
                     new String[] {firstDataFile.getPath(), updatedSecondDataFile.getPath()},
-                    writeInstructions.withTableDefinition(someTable.getDefinition()));
+                    writeInstructions);
             fail("Expected exception when writing the metadata files for tables with different parent directories");
         } catch (final RuntimeException expected) {
         }
@@ -774,8 +774,7 @@ public void flatPartitionedParquetWithBigDecimalMetadataTest() throws IOException {
                 .setGenerateMetadataFiles(true)
                 .build();
         final Table[] sources = new Table[] {firstTable, secondTable};
-        writeTables(sources, new String[] {firstDataFile.getPath(), secondDataFile.getPath()},
-                writeInstructions.withTableDefinition(firstTable.getDefinition()));
+        writeTables(sources, new String[] {firstDataFile.getPath(), secondDataFile.getPath()}, writeInstructions);

         // Merge the tables and compute the precision and scale as per the union of the two tables
         final Table expected =
@@ -1782,7 +1781,7 @@ private interface TestParquetTableWriter {
             (sourceTable, destFile) -> writeTable(sourceTable, destFile.getPath());
     private static final TestParquetTableWriter MULTI_WRITER =
             (table, destFile) -> writeTables(new Table[] {table}, new String[] {destFile.getPath()},
-                    ParquetInstructions.EMPTY.withTableDefinition(table.getDefinition()));
+                    ParquetInstructions.EMPTY);

     /**
      * Verify that the parent directory contains the expected parquet files and index files in the right directory
@@ -2014,6 +2013,77 @@ public void writeMultiTableExceptionTest() {
         assertEquals(0, parentDir.list().length);
     }

+    @Test
+    public void writeMultiTableDefinitionTest() {
+        // Create an empty parent directory
+        final File parentDir = new File(rootFile, "tempDir");
+        parentDir.mkdir();
+
+        final int numRows = 5;
+        final Table firstTable = TableTools.emptyTable(numRows)
+                .updateView("A = Long.toString(ii)", "B=(long)ii");
+        final File firstDestFile = new File(parentDir, "firstTable.parquet");
+
+        final Table secondTable = TableTools.emptyTable(numRows)
+                .updateView("A = Long.toString(ii*5)", "B=(long)(ii*5)");
+        final File secondDestFile = new File(parentDir, "secondTable.parquet");
+
+        final Table[] tablesToSave = new Table[] {firstTable, secondTable};
+        final String[] destinations = new String[] {firstDestFile.getPath(), secondDestFile.getPath()};
+
+        try {
+            writeTables(tablesToSave, new String[] {firstDestFile.getPath()},
+                    ParquetInstructions.EMPTY.withTableDefinition(firstTable.getDefinition()));
+            TestCase.fail("Exception expected because of mismatch in number of tables and destinations");
+        } catch (final IllegalArgumentException expected) {
+        }
+
+        // Writing a single table without a definition should work
+        writeTables(new Table[] {firstTable}, new String[] {firstDestFile.getPath()}, ParquetInstructions.EMPTY);
+        checkSingleTable(firstTable, firstDestFile);
+        assertTrue(firstDestFile.delete());
+
+        // Writing a single table with a definition should work
+        writeTables(new Table[] {firstTable}, new String[] {firstDestFile.getPath()},
+                ParquetInstructions.EMPTY.withTableDefinition(firstTable.view("A").getDefinition()));
+        checkSingleTable(firstTable.view("A"), firstDestFile);
+        assertTrue(firstDestFile.delete());
+
+        // Writing multiple tables which have the same definition should work
+        writeTables(tablesToSave, destinations, ParquetInstructions.EMPTY);
+        checkSingleTable(firstTable, firstDestFile);
+        checkSingleTable(secondTable, secondDestFile);
+        assertTrue(firstDestFile.delete());
+        assertTrue(secondDestFile.delete());
+
+        // Writing multiple tables which have different definitions should not work
+        final Table thirdTable = TableTools.emptyTable(numRows)
+                .updateView("A = Long.toString(ii*10)", "B=(int)(ii*10)");
+        final File thirdDestFile = new File(parentDir, "thirdTable.parquet");
+        try {
+            writeTables(new Table[] {firstTable, thirdTable},
+                    new String[] {firstDestFile.getPath(), thirdDestFile.getPath()},
+                    ParquetInstructions.EMPTY);
+            TestCase.fail("Exception expected because of mismatch in table definitions");
+        } catch (final IllegalArgumentException expected) {
+        }
+
+        // Taking views with the same definition should work
+        writeTables(new Table[] {firstTable.view("A"), thirdTable.view("A")},
+                new String[] {firstDestFile.getPath(), thirdDestFile.getPath()}, ParquetInstructions.EMPTY);
+        checkSingleTable(firstTable.view("A"), firstDestFile);
+        checkSingleTable(thirdTable.view("A"), thirdDestFile);
+        assertTrue(firstDestFile.delete());
+        assertTrue(thirdDestFile.delete());
+
+        // Providing a definition should work
+        writeTables(new Table[] {firstTable, thirdTable},
+                new String[] {firstDestFile.getPath(), thirdDestFile.getPath()},
+                ParquetInstructions.EMPTY.withTableDefinition(firstTable.view("A").getDefinition()));
+        checkSingleTable(firstTable.view("A"), firstDestFile);
+        checkSingleTable(thirdTable.view("A"), thirdDestFile);
+    }
+
     @Test
     public void writingParquetFilesWithSpacesInName() {
         final String parentDirName = "tempDir";
@@ -2287,8 +2357,7 @@ public void writeMultiTableIndexTest() {
         Table[] tablesToSave = new Table[] {firstTable, secondTable};
         final String[] destinations = new String[] {firstDestFile.getPath(), secondDestFile.getPath()};
-        writeTables(tablesToSave, destinations,
-                ParquetInstructions.EMPTY.withTableDefinition(firstTable.getDefinition()));
+        writeTables(tablesToSave, destinations, ParquetInstructions.EMPTY);

         String firstIndexFilePath = ".dh_metadata/indexes/vvv/index_vvv_firstTable.parquet";
         String secondIndexFilePath = ".dh_metadata/indexes/vvv/index_vvv_secondTable.parquet";
diff --git a/py/server/deephaven/parquet.py b/py/server/deephaven/parquet.py
index 4a847b8e08e..76c9dd86735 100644
--- a/py/server/deephaven/parquet.py
+++ b/py/server/deephaven/parquet.py
@@ -397,7 +397,7 @@ def write_partitioned(
 def batch_write(
     tables: List[Table],
     paths: List[str],
-    table_definition: Union[Dict[str, DType], List[Column]],
+    table_definition: Optional[Union[Dict[str, DType], List[Column]]] = None,
     col_instructions: Optional[List[ColumnInstruction]] = None,
     compression_codec_name: Optional[str] = None,
     max_dictionary_keys: Optional[int] = None,
@@ -415,9 +415,10 @@ def batch_write(
         paths (List[str]): the destination paths. Any non-existing directories in the paths provided are
             created. If there is an error, any intermediate directories previously created are removed; note this
            makes this method unsafe for concurrent use
-        table_definition (Union[Dict[str, DType], List[Column]]): the table definition to use for writing, instead of
-            the definitions implied by the tables. This definition can be used to skip some columns or add additional
-            columns with null values.
+        table_definition (Optional[Union[Dict[str, DType], List[Column]]]): the table definition to use for writing.
+            This definition can be used to skip some columns or add additional columns with null values. Defaults to
+            None, in which case the common definition implied by the tables is used, provided all tables share the
+            same definition; otherwise, this parameter must be specified.
         col_instructions (Optional[List[ColumnInstruction]]): instructions for customizations while writing
        compression_codec_name (Optional[str]): the compression codec to use. Allowed values include "UNCOMPRESSED",
            "SNAPPY", "GZIP", "LZO", "LZ4", "LZ4_RAW", "ZSTD", etc. If not specified, defaults to "SNAPPY".
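For illustration, a minimal sketch of how the new optional parameter reads from user code; the table names, contents, and file names below are assumptions for the example and are not part of this patch:

    from deephaven import dtypes, empty_table
    from deephaven.parquet import batch_write

    # Both tables share the same definition, so batch_write can infer it; no table_definition needed.
    t1 = empty_table(5).update(formulas=["x=i", "y=(double)(i/2.0)"])
    t2 = empty_table(5).update(formulas=["x=i*2", "y=(double)(i/4.0)"])
    batch_write([t1, t2], ["t1.parquet", "t2.parquet"])

    # An explicit definition is still honored, e.g. to drop "y" and add a null-filled "z" column.
    definition = {"x": dtypes.int32, "z": dtypes.double}
    batch_write([t1, t2], ["t1.parquet", "t2.parquet"], table_definition=definition)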
diff --git a/py/server/tests/test_parquet.py b/py/server/tests/test_parquet.py
index 3fc54dd20ac..5275821061c 100644
--- a/py/server/tests/test_parquet.py
+++ b/py/server/tests/test_parquet.py
@@ -688,13 +688,7 @@ def test_write_with_index_columns(self):
         shutil.rmtree(".dh_metadata")

         second_table = empty_table(10).update(formulas=["x=i*5", "y=(double)(i/5.0)", "z=(double)(i*i*i)"])
-        table_definition = {
-            "x": dtypes.int32,
-            "y": dtypes.double,
-            "z": dtypes.double,
-        }
-        batch_write([first_table, second_table], ["X.parquet", "Y.parquet"], index_columns=[["x"], ["y", "z"]],
-                    table_definition=table_definition)
+        batch_write([first_table, second_table], ["X.parquet", "Y.parquet"], index_columns=[["x"], ["y", "z"]])
         from_disk_first_table = read("X.parquet")
         self.assert_table_equals(first_table, from_disk_first_table)
         from_disk_second_table = read("Y.parquet")
@@ -752,5 +746,29 @@ def test_v2_pages_helper(dh_table):
         dh_table2 = self.get_table_with_array_data()
         test_v2_pages_helper(dh_table2)

+    def test_batch_write_definition_handling(self):
+        table = empty_table(3).update(
+            formulas=["x=i", "y=(double)(i/10.0)", "z=(double)(i*i)"]
+        )
+        table2 = empty_table(3).update(
+            formulas=["x=i*2", "y=(double)(i/5.0)", "z=(double)(i*i*i)"]
+        )
+        # Should succeed because both tables have the same definition
+        batch_write([table, table2], ["X.parquet", "Y.parquet"])
+        self.assert_table_equals(read("X.parquet"), table)
+        self.assert_table_equals(read("Y.parquet"), table2)
+
+        table_definition = {
+            "x": dtypes.int32,
+            "y": dtypes.double,
+        }
+        batch_write([table, table2], ["X.parquet", "Y.parquet"], table_definition=table_definition)
+        self.assert_table_equals(read("X.parquet"), table.view(["x", "y"]))
+        self.assert_table_equals(read("Y.parquet"), table2.view(["x", "y"]))
+
+        # Fails because we don't provide a table definition and the tables have different definitions
+        with self.assertRaises(DHError):
+            batch_write([table, table2.view(["x", "y"])], ["X.parquet", "Y.parquet"])
+
 if __name__ == '__main__':
     unittest.main()
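Finally, a hedged sketch of the failure mode this change introduces for mismatched sources and two ways a caller can resolve it; the wide/narrow tables and file names are assumptions for the example, not part of the patch:

    from deephaven import DHError, dtypes, empty_table
    from deephaven.parquet import batch_write

    wide = empty_table(3).update(formulas=["x=i", "y=(double)i"])
    narrow = wide.view(["x"])

    try:
        # Mixed definitions with no table_definition: expected to raise under the new behavior.
        batch_write([wide, narrow], ["wide.parquet", "narrow.parquet"])
    except DHError:
        # Option 1: project the sources onto a common set of columns before writing.
        batch_write([wide.view(["x"]), narrow], ["wide.parquet", "narrow.parquet"])
        # Option 2: state the definition explicitly; columns outside it are skipped.
        batch_write([wide, narrow], ["wide.parquet", "narrow.parquet"],
                    table_definition={"x": dtypes.int32})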