Skip to content

Commit

Permalink
Improved handling of parquet writes (deephaven#4212)
Browse files Browse the repository at this point in the history
Parquet writes will be done in a temporary file path and then swapped out at the final path once the write completes.
  • Loading branch information
malhotrashivam authored Aug 10, 2023
1 parent c272a26 commit 57b429e
Show file tree
Hide file tree
Showing 5 changed files with 627 additions and 166 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,17 +183,14 @@ public void timedOut() {
}
}

private class CloseRecorder implements Runnable {

private final AtomicBoolean reclaimed = new AtomicBoolean(false);

private class CloseRecorder extends AtomicBoolean implements Runnable {
private CloseRecorder() {
size.incrementAndGet();
}

@Override
public void run() {
if (reclaimed.compareAndSet(false, true)) {
if (compareAndSet(false, true)) {
size.decrementAndGet();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,54 +98,32 @@ interface TransferObject<B> extends SafeCloseable {
void fetchData(RowSequence rs);
}

private static String minusParquetSuffix(@NotNull final String s) {
if (s.endsWith(PARQUET_FILE_EXTENSION)) {
return s.substring(0, s.length() - PARQUET_FILE_EXTENSION.length());
}
return s;
}

public static Function<String, String> defaultGroupingFileName(@NotNull final String path) {
final String prefix = minusParquetSuffix(path);
return columnName -> prefix + "_" + columnName + "_grouping.parquet";
}

/**
* Writes a table in parquet format under a given path
*
* @param t The table to write
* @param path The destination path
* @param incomingMeta A map of metadata values to be stores in the file footer
* @param groupingPathFactory a factory to construct paths for grouping tables.
* @param groupingColumns List of columns the tables are grouped by (the write operation will store the grouping
* info)
* @throws SchemaMappingException Error creating a parquet table schema for the given table (likely due to
* unsupported types)
* @throws IOException For file writing related errors
* Helper struct used to pass information about where to write the grouping files for each grouping column
*/
public static void write(
@NotNull final Table t,
@NotNull final String path,
@NotNull final Map<String, String> incomingMeta,
@NotNull final Function<String, String> groupingPathFactory,
@NotNull final String... groupingColumns) throws SchemaMappingException, IOException {
write(t, t.getDefinition(), ParquetInstructions.EMPTY, path, incomingMeta, groupingPathFactory,
groupingColumns);
}
public static class GroupingColumnWritingInfo {
/**
* Parquet name of this grouping column
*/
public final String parquetColumnName;
/**
* File path to be added in the grouping metadata of main parquet file
*/
public final File metadataFilePath;

/**
* Writes a table in parquet format under a given path
*
* @param t the table to write
* @param path the destination path
* @param incomingMeta any metadata to include in the parquet metadata
* @param groupingColumns the grouping columns (if any)
*/
public static void write(@NotNull final Table t,
@NotNull final String path,
@NotNull final Map<String, String> incomingMeta,
@NotNull final String... groupingColumns) throws SchemaMappingException, IOException {
write(t, path, incomingMeta, defaultGroupingFileName(path), groupingColumns);
/**
* Destination path for writing the grouping file. The two filenames can differ because we write grouping files
* to shadow file paths first and then place them at the final path once the write is complete. But the metadata
* should always hold the accurate path.
*/
public final File destFile;

public GroupingColumnWritingInfo(final String parquetColumnName, final File metadataFilePath,
final File destFile) {
this.parquetColumnName = parquetColumnName;
this.metadataFilePath = metadataFilePath;
this.destFile = destFile;
}
}

/**
Expand All @@ -156,9 +134,6 @@ public static void write(@NotNull final Table t,
* @param writeInstructions Write instructions for customizations while writing
* @param destPathName The destination path
* @param incomingMeta A map of metadata values to be stores in the file footer
* @param groupingPathFactory a factory for constructing paths for grouping tables
* @param groupingColumns List of columns the tables are grouped by (the write operation will store the grouping
* info)
* @throws SchemaMappingException Error creating a parquet table schema for the given table (likely due to
* unsupported types)
* @throws IOException For file writing related errors
Expand All @@ -169,36 +144,34 @@ public static void write(
@NotNull final ParquetInstructions writeInstructions,
@NotNull final String destPathName,
@NotNull final Map<String, String> incomingMeta,
@NotNull final Function<String, String> groupingPathFactory,
@NotNull final String... groupingColumns) throws SchemaMappingException, IOException {
final Map<String, GroupingColumnWritingInfo> groupingColumnsWritingInfoMap)
throws SchemaMappingException, IOException {
final TableInfo.Builder tableInfoBuilder = TableInfo.builder();
ArrayList<String> cleanupPaths = null;
List<File> cleanupFiles = null;
try {
if (groupingColumns.length > 0) {
cleanupPaths = new ArrayList<>(groupingColumns.length);
final Table[] auxiliaryTables = Arrays.stream(groupingColumns)
.map(columnName -> groupingAsTable(t, columnName))
.toArray(Table[]::new);

if (groupingColumnsWritingInfoMap != null) {
cleanupFiles = new ArrayList<>(groupingColumnsWritingInfoMap.size());
final Path destDirPath = Paths.get(destPathName).getParent();
for (int gci = 0; gci < auxiliaryTables.length; ++gci) {
final String parquetColumnName =
writeInstructions.getParquetColumnNameFromColumnNameOrDefault(groupingColumns[gci]);
final String groupingPath = groupingPathFactory.apply(parquetColumnName);
cleanupPaths.add(groupingPath);
for (Map.Entry<String, GroupingColumnWritingInfo> entry : groupingColumnsWritingInfoMap.entrySet()) {
final String groupingColumnName = entry.getKey();
final Table auxiliaryTable = groupingAsTable(t, groupingColumnName);
final String parquetColumnName = entry.getValue().parquetColumnName;
final File metadataFilePath = entry.getValue().metadataFilePath;
final File groupingDestFile = entry.getValue().destFile;
cleanupFiles.add(groupingDestFile);
tableInfoBuilder.addGroupingColumns(GroupingColumnInfo.of(parquetColumnName,
destDirPath.relativize(Paths.get(groupingPath)).toString()));
write(auxiliaryTables[gci], auxiliaryTables[gci].getDefinition(), writeInstructions, groupingPath,
Collections.emptyMap());
destDirPath.relativize(metadataFilePath.toPath()).toString()));
write(auxiliaryTable, auxiliaryTable.getDefinition(), writeInstructions,
groupingDestFile.getAbsolutePath(), Collections.emptyMap(), TableInfo.builder());
}
}
write(t, definition, writeInstructions, destPathName, incomingMeta, tableInfoBuilder);
} catch (Exception e) {
if (cleanupPaths != null) {
for (final String cleanupPath : cleanupPaths) {
if (cleanupFiles != null) {
for (final File cleanupFile : cleanupFiles) {
try {
// noinspection ResultOfMethodCallIgnored
new File(cleanupPath).delete();
cleanupFile.delete();
} catch (Exception ignored) {
}
}
Expand All @@ -207,26 +180,6 @@ public static void write(
}
}

/**
* Writes a table in parquet format under a given path
*
* @param t the table to write
* @param definition the writable table definition
* @param writeInstructions the parquet instructions for writing
* @param path the path to write to
* @param incomingMeta any metadata to include ih the parquet metadata
* @param groupingColumns the grouping columns (if any)
*/
public static void write(
@NotNull final Table t,
@NotNull final TableDefinition definition,
@NotNull final ParquetInstructions writeInstructions,
@NotNull final String path,
@NotNull final Map<String, String> incomingMeta,
@NotNull final String... groupingColumns) throws SchemaMappingException, IOException {
write(t, definition, writeInstructions, path, incomingMeta, defaultGroupingFileName(path), groupingColumns);
}

/**
* Writes a table in parquet format under a given path
*
Expand Down
Loading

0 comments on commit 57b429e

Please sign in to comment.