Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Review with Ryan part 1
Browse files Browse the repository at this point in the history
malhotrashivam committed Mar 4, 2024
1 parent e584343 commit f659f3c
Showing 12 changed files with 369 additions and 90 deletions.
Original file line number Diff line number Diff line change
@@ -8,11 +8,12 @@
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.parquet.table.ParquetTools;
import io.deephaven.engine.util.TableTools;
import io.deephaven.parquet.table.ParquetTableWriter;
import io.deephaven.engine.table.impl.util.TableBuilder;
import io.deephaven.benchmarking.BenchmarkTools;
import org.openjdk.jmh.infra.BenchmarkParams;

import static io.deephaven.parquet.table.ParquetUtils.PARQUET_FILE_EXTENSION;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
@@ -44,7 +45,7 @@ public void init() {

public void logOutput() throws IOException {
final Path outputPath = BenchmarkTools.dataDir()
.resolve(BenchmarkTools.getDetailOutputPath(benchmarkName) + ParquetTableWriter.PARQUET_FILE_EXTENSION);
.resolve(BenchmarkTools.getDetailOutputPath(benchmarkName) + PARQUET_FILE_EXTENSION);

final Table output = outputBuilder.build();
ParquetTools.writeTable(output, outputPath.toFile(), RESULT_DEF);
Original file line number Diff line number Diff line change
@@ -37,7 +37,6 @@
import io.deephaven.engine.util.TableTools;
import io.deephaven.engine.util.systemicmarking.SystemicObjectTracker;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.parquet.table.ParquetTableWriter;
import io.deephaven.parquet.table.ParquetTools;
import io.deephaven.parquet.table.layout.ParquetKeyValuePartitionedLayout;
import io.deephaven.test.types.OutOfBandTest;
@@ -75,6 +74,7 @@
import static io.deephaven.util.QueryConstants.*;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.*;
import static io.deephaven.parquet.table.ParquetUtils.PARQUET_FILE_EXTENSION;

@Category(OutOfBandTest.class)
public class QueryTableAggregationTest {
@@ -3957,7 +3957,7 @@ private Table makeDiskTable(File directory) throws IOException {
final TableDefaults result = testTable(stringCol("Symbol", syms),
intCol("Value", values));

final File outputFile = new File(directory, "disk_table" + ParquetTableWriter.PARQUET_FILE_EXTENSION);
final File outputFile = new File(directory, "disk_table" + PARQUET_FILE_EXTENSION);

ParquetTools.writeTable(result, outputFile, result.getDefinition());

Original file line number Diff line number Diff line change
@@ -57,7 +57,6 @@ public class ParquetTableWriter {
public static final String BEGIN_POS = "dh_begin_pos";
public static final String END_POS = "dh_end_pos";
public static final String GROUPING_KEY = "dh_key";
public static final String PARQUET_FILE_EXTENSION = ".parquet";

/**
* Helper struct used to pass information about where to write the grouping files for each grouping column
@@ -234,14 +233,15 @@ private static void write(
* Get the parquet schema for a table
*
* @param table the input table
* @param definition the table definition
* @param definition the definition to use for creating the schema
* @param instructions write instructions for the file
* @return the parquet schema
*/
static MessageType getSchemaForTable(@NotNull final Table table,
@NotNull final TableDefinition definition,
@NotNull final ParquetInstructions instructions) {
final Table pretransformTable = pretransformTable(table, table.getDefinition());
return MappedSchema.create(new HashMap<>(), table.getDefinition(), pretransformTable.getRowSet(),
final Table pretransformTable = pretransformTable(table, definition);
return MappedSchema.create(new HashMap<>(), definition, pretransformTable.getRowSet(),
pretransformTable.getColumnSourceMap(), instructions).getParquetSchema();
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.deephaven.parquet.table;

import java.nio.file.Path;

public final class ParquetUtils {

public static final String METADATA_FILE_NAME = "_metadata";
public static final String COMMON_METADATA_FILE_NAME = "_common_metadata";
public static final String PARQUET_FILE_EXTENSION = ".parquet";

/**
* Used as a filter to select relevant parquet files while reading all files in a directory.
*/
public static boolean fileNameMatches(final Path path) {
final String fileName = path.getFileName().toString();
return fileName.endsWith(PARQUET_FILE_EXTENSION) && fileName.charAt(0) != '.';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package io.deephaven.parquet.table;

import io.deephaven.time.DateTimeUtils;
import org.jetbrains.annotations.NotNull;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.HashMap;
import java.util.Map;

/**
* This class takes an object read from a parquet file and formats it to a string, only for the supported types.
*/
enum PartitionFormatter {
ForString {
@Override
public String format(@NotNull final Object obj) {
return (String) obj;
}
},
ForBoolean {
@Override
public String format(@NotNull final Object obj) {
return ((Boolean) obj).toString();
}
},
ForChar {
@Override
public String format(@NotNull final Object obj) {
return ((Character) obj).toString();
}
},
ForByte {
@Override
public String format(@NotNull final Object obj) {
return ((Byte) obj).toString();
}
},
ForShort {
@Override
public String format(@NotNull final Object obj) {
return ((Short) obj).toString();
}
},
ForInt {
@Override
public String format(@NotNull final Object obj) {
return ((Integer) obj).toString();
}
},
ForLong {
@Override
public String format(@NotNull final Object obj) {
return ((Long) obj).toString();
}
},
ForFloat {
@Override
public String format(@NotNull final Object obj) {
return ((Float) obj).toString();
}
},
ForDouble {
@Override
public String format(@NotNull final Object obj) {
return ((Double) obj).toString();
}
},
ForBigInteger {
@Override
public String format(@NotNull final Object obj) {
return ((BigInteger) obj).toString();
}
},
ForBigDecimal {
@Override
public String format(@NotNull final Object obj) {
return ((BigDecimal) obj).toString();
}
},
ForInstant {
@Override
public String format(@NotNull final Object obj) {
return ((Instant) obj).toString();
}
},
ForLocalDate {
@Override
public String format(@NotNull final Object obj) {
return DateTimeUtils.formatDate((LocalDate) obj);
}
},
ForLocalTime {
@Override
public String format(@NotNull final Object obj) {
return ((LocalTime) obj).toString();
}
};

private static final Map<Class<?>, PartitionFormatter> typeMap = new HashMap<>();
static {
typeMap.put(String.class, ForString);
typeMap.put(Boolean.class, ForBoolean);
typeMap.put(Character.class, ForChar);
typeMap.put(Byte.class, ForByte);
typeMap.put(Short.class, ForShort);
typeMap.put(Integer.class, ForInt);
typeMap.put(Long.class, ForLong);
typeMap.put(Float.class, ForFloat);
typeMap.put(Double.class, ForDouble);
typeMap.put(BigInteger.class, ForBigInteger);
typeMap.put(BigDecimal.class, ForBigDecimal);
typeMap.put(Instant.class, ForInstant);
typeMap.put(LocalDate.class, ForLocalDate);
typeMap.put(LocalTime.class, ForLocalTime);
}

abstract String format(@NotNull Object obj);

static String formatToString(final Object obj) {
if (obj == null) {
return "null";
}
final PartitionFormatter formatter = typeMap.get(obj.getClass());
if (formatter != null) {
return formatter.format(obj);
} else {
throw new UnsupportedOperationException("Unsupported type: " + obj.getClass().getSimpleName());
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -7,6 +7,7 @@
import io.deephaven.engine.table.impl.locations.impl.TableLocationKeyFinder;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.parquet.table.location.ParquetTableLocationKey;
import io.deephaven.parquet.table.ParquetUtils;
import org.jetbrains.annotations.NotNull;

import java.io.File;
@@ -49,7 +50,7 @@ public String toString() {
@Override
public synchronized void findKeys(@NotNull final Consumer<ParquetTableLocationKey> locationKeyObserver) {
try (final DirectoryStream<Path> parquetFileStream =
Files.newDirectoryStream(tableRootDirectory.toPath(), ParquetFileHelper::fileNameMatches)) {
Files.newDirectoryStream(tableRootDirectory.toPath(), ParquetUtils::fileNameMatches)) {
for (final Path parquetFilePath : parquetFileStream) {
ParquetTableLocationKey locationKey = cache.get(parquetFilePath);
if (locationKey == null) {
Original file line number Diff line number Diff line change
@@ -10,6 +10,7 @@
import io.deephaven.engine.table.impl.locations.local.LocationTableBuilderDefinition;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.parquet.table.location.ParquetTableLocationKey;
import io.deephaven.parquet.table.ParquetUtils;
import org.jetbrains.annotations.NotNull;

import java.io.File;
@@ -27,7 +28,7 @@ public ParquetKeyValuePartitionedLayout(
@NotNull final TableDefinition tableDefinition,
@NotNull final ParquetInstructions readInstructions) {
super(tableRootDirectory,
ParquetFileHelper::fileNameMatches,
ParquetUtils::fileNameMatches,
() -> new LocationTableBuilderDefinition(tableDefinition),
(path, partitions) -> new ParquetTableLocationKey(path.toFile(), 0, partitions, readInstructions),
Math.toIntExact(tableDefinition.getColumnStream().filter(ColumnDefinition::isPartitioning).count()));
@@ -38,7 +39,7 @@ public ParquetKeyValuePartitionedLayout(
final int maxPartitioningLevels,
@NotNull final ParquetInstructions readInstructions) {
super(tableRootDirectory,
ParquetFileHelper::fileNameMatches,
ParquetUtils::fileNameMatches,
() -> new LocationTableBuilderCsv(tableRootDirectory),
(path, partitions) -> new ParquetTableLocationKey(path.toFile(), 0, partitions, readInstructions),
maxPartitioningLevels);
Original file line number Diff line number Diff line change
@@ -36,6 +36,8 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static io.deephaven.parquet.table.ParquetUtils.COMMON_METADATA_FILE_NAME;
import static io.deephaven.parquet.table.ParquetUtils.METADATA_FILE_NAME;
import static java.util.stream.Collectors.toMap;

/**
@@ -55,9 +57,6 @@
*/
public class ParquetMetadataFileLayout implements TableLocationKeyFinder<ParquetTableLocationKey> {

public static final String METADATA_FILE_NAME = "_metadata";
public static final String COMMON_METADATA_FILE_NAME = "_common_metadata";

private final File metadataFile;
private final File commonMetadataFile;

Original file line number Diff line number Diff line change
@@ -8,7 +8,6 @@
import io.deephaven.parquet.table.ParquetTools;
import io.deephaven.engine.table.impl.locations.TableDataException;
import io.deephaven.engine.table.impl.locations.TableLocationKey;
import io.deephaven.parquet.table.ParquetTableWriter;
import io.deephaven.parquet.base.ParquetFileReader;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.format.RowGroup;
@@ -23,6 +22,8 @@
import java.util.Map;
import java.util.stream.IntStream;

import static io.deephaven.parquet.table.ParquetUtils.PARQUET_FILE_EXTENSION;

/**
* {@link TableLocationKey} implementation for use with data stored in the parquet format.
*/
@@ -74,8 +75,8 @@ private static URI validateParquetFile(@NotNull final File file) {
}

private static URI validateParquetFile(@NotNull final URI parquetFileUri) {
if (!parquetFileUri.getRawPath().endsWith(ParquetTableWriter.PARQUET_FILE_EXTENSION)) {
throw new IllegalArgumentException("Parquet file must end in " + ParquetTableWriter.PARQUET_FILE_EXTENSION);
if (!parquetFileUri.getRawPath().endsWith(PARQUET_FILE_EXTENSION)) {
throw new IllegalArgumentException("Parquet file must end in " + PARQUET_FILE_EXTENSION);
}
return parquetFileUri;
}
Loading

0 comments on commit f659f3c

Please sign in to comment.