Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: DH-18174: Delay reading from parquet file when creating table and column location #6590

Closed
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Review with Devin
malhotrashivam committed Jan 29, 2025
commit 70788aba9fc0eb9941e91ee2c271175f502137b0
Original file line number Diff line number Diff line change
@@ -222,7 +222,7 @@ private BasicDataIndex getDataIndex() {

@Override
@Nullable
public BasicDataIndex getDataIndex(@NotNull final String... columns) {
public final BasicDataIndex getDataIndex(@NotNull final String... columns) {
final List<String> columnNames = new ArrayList<>(columns.length);
Collections.addAll(columnNames, columns);
columnNames.sort(String::compareTo);
Original file line number Diff line number Diff line change
@@ -60,19 +60,25 @@ final class ParquetColumnLocation<ATTR extends Values> extends AbstractColumnLoc
private static final int MAX_PAGE_CACHE_SIZE = Configuration.getInstance()
.getIntegerForClassWithDefault(ParquetColumnLocation.class, "maxPageCacheSize", 8192);

private final ParquetTableLocation parquetTableLocation;
private final String columnName;
private final String parquetColumnName;

private boolean isInitialized;
private volatile boolean isInitializedVolatile;

private volatile boolean readersInitialized;

// Access to following variables must be guarded by initializeReaders()
/**
 * Factory object needed for deferred initialization of the remaining fields. Reference serves as a barrier to
 * ensure visibility of the derived fields. We delay initializing this field till we need to read the column data.
 */
private volatile ColumnChunkReader[] columnChunkReaders;

/**
* Whether the column location actually exists.
*/
private boolean exists;
// -----------------------------------------------------------------------

// We should consider moving this to column level if needed. Column-location level likely allows more parallelism.
private volatile PageCache<ATTR> pageCache;

@@ -91,32 +97,27 @@ final class ParquetColumnLocation<ATTR extends Values> extends AbstractColumnLoc
@NotNull final String columnName,
@NotNull final String parquetColumnName) {
super(tableLocation, columnName);
this.parquetTableLocation = tableLocation;
this.columnName = columnName;
this.parquetColumnName = parquetColumnName;
this.isInitialized = false;
this.isInitializedVolatile = false;
this.readersInitialized = false;
}

private void initialize() {
if (isInitialized) {
private void initializeReaders() {
if (readersInitialized) {
return;
}
synchronized (this) {
isInitialized = isInitializedVolatile;
if (isInitialized) {
if (readersInitialized) {
return;
}
tl().initialize();
final String[] columnPath = tl().getParquetColumnNameToPath().get(parquetColumnName);
final List<String> nameList =
columnPath == null ? Collections.singletonList(parquetColumnName) : Arrays.asList(columnPath);
final ColumnChunkReader[] columnChunkReaders = Arrays.stream(tl().getRowGroupReaders())
.map(rgr -> rgr.getColumnChunk(columnName, nameList)).toArray(ColumnChunkReader[]::new);
final boolean exists = Arrays.stream(columnChunkReaders).anyMatch(ccr -> ccr != null && ccr.numRows() > 0);
exists = Arrays.stream(columnChunkReaders).anyMatch(ccr -> ccr != null && ccr.numRows() > 0);
this.columnChunkReaders = exists ? columnChunkReaders : null;
isInitialized = true;
isInitializedVolatile = true;
readersInitialized = true;
}
}

@@ -145,14 +146,12 @@ public String getImplementationName() {

@Override
public boolean exists() {
initialize();
// If we see a null columnChunkReaders array, either we don't exist or we are guaranteed to
// see a non-null pageStores array
return columnChunkReaders != null || pageStores != null;
initializeReaders();
return exists;
}

private ParquetTableLocation tl() {
return parquetTableLocation;
return (ParquetTableLocation) getTableLocation();
}

private <SOURCE, REGION_TYPE> REGION_TYPE makeColumnRegion(
@@ -178,7 +177,6 @@ private <SOURCE, REGION_TYPE> REGION_TYPE makeSingleColumnRegion(final SOURCE so
@Override
public ColumnRegionChar<Values> makeColumnRegionChar(
@NotNull final ColumnDefinition<?> columnDefinition) {
initialize();
// noinspection unchecked
return (ColumnRegionChar<Values>) makeColumnRegion(this::getPageStores, columnDefinition,
ColumnRegionChar::createNull, ParquetColumnRegionChar::new,
@@ -189,7 +187,6 @@ public ColumnRegionChar<Values> makeColumnRegionChar(
@Override
public ColumnRegionByte<Values> makeColumnRegionByte(
@NotNull final ColumnDefinition<?> columnDefinition) {
initialize();
// noinspection unchecked
return (ColumnRegionByte<Values>) makeColumnRegion(this::getPageStores, columnDefinition,
ColumnRegionByte::createNull, ParquetColumnRegionByte::new,
@@ -200,7 +197,6 @@ public ColumnRegionByte<Values> makeColumnRegionByte(
@Override
public ColumnRegionShort<Values> makeColumnRegionShort(
@NotNull final ColumnDefinition<?> columnDefinition) {
initialize();
// noinspection unchecked
return (ColumnRegionShort<Values>) makeColumnRegion(this::getPageStores, columnDefinition,
ColumnRegionShort::createNull, ParquetColumnRegionShort::new,
@@ -211,7 +207,6 @@ public ColumnRegionShort<Values> makeColumnRegionShort(
@Override
public ColumnRegionInt<Values> makeColumnRegionInt(
@NotNull final ColumnDefinition<?> columnDefinition) {
initialize();
// noinspection unchecked
return (ColumnRegionInt<Values>) makeColumnRegion(this::getPageStores, columnDefinition,
ColumnRegionInt::createNull, ParquetColumnRegionInt::new,
@@ -222,7 +217,6 @@ public ColumnRegionInt<Values> makeColumnRegionInt(
@Override
public ColumnRegionLong<Values> makeColumnRegionLong(
@NotNull final ColumnDefinition<?> columnDefinition) {
initialize();
// noinspection unchecked
return (ColumnRegionLong<Values>) makeColumnRegion(this::getPageStores, columnDefinition,
ColumnRegionLong::createNull, ParquetColumnRegionLong::new,
@@ -233,7 +227,6 @@ public ColumnRegionLong<Values> makeColumnRegionLong(
@Override
public ColumnRegionFloat<Values> makeColumnRegionFloat(
@NotNull final ColumnDefinition<?> columnDefinition) {
initialize();
// noinspection unchecked
return (ColumnRegionFloat<Values>) makeColumnRegion(this::getPageStores, columnDefinition,
ColumnRegionFloat::createNull, ParquetColumnRegionFloat::new,
@@ -244,7 +237,6 @@ public ColumnRegionFloat<Values> makeColumnRegionFloat(
@Override
public ColumnRegionDouble<Values> makeColumnRegionDouble(
@NotNull final ColumnDefinition<?> columnDefinition) {
initialize();
// noinspection unchecked
return (ColumnRegionDouble<Values>) makeColumnRegion(this::getPageStores, columnDefinition,
ColumnRegionDouble::createNull, ParquetColumnRegionDouble::new,
@@ -255,7 +247,6 @@ public ColumnRegionDouble<Values> makeColumnRegionDouble(
@Override
public <TYPE> ColumnRegionObject<TYPE, Values> makeColumnRegionObject(
@NotNull final ColumnDefinition<TYPE> columnDefinition) {
initialize();
final Class<TYPE> dataType = columnDefinition.getDataType();
final ColumnChunkPageStore<ATTR>[] sources = getPageStores(columnDefinition);
final ColumnChunkPageStore<DictionaryKeys>[] dictKeySources =
@@ -330,6 +321,7 @@ private ColumnChunkPageStore<DictionaryKeys>[] getDictionaryKeysPageStores(

@SuppressWarnings("unchecked")
private void fetchValues(@NotNull final ColumnDefinition<?> columnDefinition) {
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
initializeReaders();
if (columnChunkReaders == null) {
return;
}
Original file line number Diff line number Diff line change
@@ -54,42 +54,39 @@ public class ParquetTableLocation extends AbstractTableLocation {

private static final String IMPLEMENTATION_NAME = ParquetColumnLocation.class.getSimpleName();

private boolean isInitialized;
private volatile boolean isInitializedVolatile;

private final ParquetInstructions readInstructions;

private volatile boolean isInitialized;

// Access to all the following variables must be guarded by initialize()
private ParquetFileReader parquetFileReader;
private int[] rowGroupIndices;

private RowGroup[] rowGroups;
private RegionedPageStore.Parameters regionParameters;
private Map<String, String[]> parquetColumnNameToPath;

private TableInfo tableInfo;
private Map<String, GroupingColumnInfo> groupingColumns;
private List<DataIndexInfo> dataIndexes;
private Map<String, ColumnTypeInfo> columnTypes;
private List<SortColumn> sortingColumns;

private String version;

private volatile RowGroupReader[] rowGroupReaders;
// -----------------------------------------------------------------------

public ParquetTableLocation(@NotNull final TableKey tableKey,
@NotNull final ParquetTableLocationKey tableLocationKey,
@NotNull final ParquetInstructions readInstructions) {
super(tableKey, tableLocationKey, false);
this.readInstructions = readInstructions;
this.isInitialized = false;
this.isInitializedVolatile = false;
}

final void initialize() {
private void initialize() {
if (isInitialized) {
return;
}
synchronized (this) {
isInitialized = isInitializedVolatile;
if (isInitialized) {
return;
}
@@ -104,7 +101,7 @@ final void initialize() {
}

final int rowGroupCount = rowGroupIndices.length;
rowGroups = IntStream.of(rowGroupIndices)
final RowGroup[] rowGroups = IntStream.of(rowGroupIndices)
.mapToObj(rgi -> parquetFileReader.fileMetaData.getRow_groups().get(rgi))
.sorted(Comparator.comparingInt(RowGroup::getOrdinal))
.toArray(RowGroup[]::new);
@@ -130,19 +127,17 @@ final void initialize() {
.orElse(TableInfo.builder().build());
version = tableInfo.version();
groupingColumns = tableInfo.groupingColumnMap();
dataIndexes = tableInfo.dataIndexes();
columnTypes = tableInfo.columnTypeMap();
sortingColumns = SortColumnInfo.sortColumns(tableInfo.sortingColumns());

if (!FILE_URI_SCHEME.equals(tableLocationKey.getURI().getScheme())) {
// We do not have the last modified time for non-file URIs
handleUpdate(computeIndex(), TableLocationState.NULL_TIME);
handleUpdate(computeIndex(rowGroups), TableLocationState.NULL_TIME);
} else {
handleUpdate(computeIndex(), new File(tableLocationKey.getURI()).lastModified());
handleUpdate(computeIndex(rowGroups), new File(tableLocationKey.getURI()).lastModified());
}

isInitialized = true;
isInitializedVolatile = true;
}
}

@@ -163,6 +158,7 @@ ParquetInstructions getReadInstructions() {
}

/**
 * Returns the regioned page store parameters for this table location.
 * <p>
 * Calls {@code initialize()} first: per the field-declaration comment, access to {@code regionParameters} (and the
 * other lazily-populated metadata fields) must be guarded by {@code initialize()}, which performs the deferred read
 * of the parquet file metadata under double-checked locking.
 */
RegionedPageStore.Parameters getRegionParameters() {
initialize();
return regionParameters;
}

@@ -180,6 +176,7 @@ RowGroupReader[] getRowGroupReaders() {
if ((local = rowGroupReaders) != null) {
return local;
}
initialize();
return rowGroupReaders = IntStream.of(rowGroupIndices)
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
.mapToObj(idx -> parquetFileReader.getRowGroup(idx, version))
.sorted(Comparator.comparingInt(rgr -> rgr.getRowGroup().getOrdinal()))
@@ -196,6 +193,7 @@ public List<SortColumn> getSortedColumns() {

/**
 * Returns the mapping from parquet column names to their full column paths within the parquet schema.
 * <p>
 * Calls {@code initialize()} first, since {@code parquetColumnNameToPath} is one of the fields whose access must be
 * guarded by {@code initialize()} (it is populated lazily when the parquet file metadata is first read).
 *
 * @return the column-name-to-path map; never {@code null}
 */
@NotNull
Map<String, String[]> getParquetColumnNameToPath() {
initialize();
return parquetColumnNameToPath;
}

@@ -218,7 +216,7 @@ protected ColumnLocation makeColumnLocation(@NotNull final String columnName) {
return new ParquetColumnLocation<>(this, columnName, parquetColumnName);
}

private RowSet computeIndex() {
private RowSet computeIndex(@NotNull final RowGroup[] rowGroups) {
final RowSetBuilderSequential sequentialBuilder = RowSetFactory.builderSequential();

for (int rgi = 0; rgi < rowGroups.length; ++rgi) {
@@ -238,6 +236,7 @@ private RowSet computeIndex() {
@NotNull
public List<String[]> getDataIndexColumns() {
initialize();
final List<DataIndexInfo> dataIndexes = tableInfo.dataIndexes();
if (dataIndexes.isEmpty() && groupingColumns.isEmpty()) {
return List.of();
}
@@ -259,7 +258,7 @@ public boolean hasDataIndex(@NotNull final String... columns) {
return metadata != null && parquetFileExists(metadata.fileURI);
}
// Check if the column names match any of the data indexes
for (final DataIndexInfo dataIndex : dataIndexes) {
for (final DataIndexInfo dataIndex : tableInfo.dataIndexes()) {
if (dataIndex.matchesColumns(columns)) {
// Validate the index file exists (without loading and parsing it)
final IndexFileMetadata metadata = getIndexFileMetadata(
@@ -277,20 +276,10 @@ private static boolean parquetFileExists(@NotNull final URI fileURI) {
return !fileURI.getScheme().equals(FILE_URI_SCHEME) || Files.exists(Path.of(fileURI));
}

/**
 * Returns the data index for the given columns, ensuring this location's parquet metadata has been read first.
 * <p>
 * NOTE(review): the surrounding hunk header suggests this override is being REMOVED by this commit (the
 * {@code initialize()} guard moves into the accessors the superclass implementation relies on) — confirm against
 * the applied diff.
 *
 * @param columns the key column names for the index
 * @return the data index from the superclass lookup, or {@code null} if none exists
 */
@Override
@Nullable
public final BasicDataIndex getDataIndex(@NotNull final String... columns) {
initialize();
return super.getDataIndex(columns);
}

@Override
@Nullable
public BasicDataIndex loadDataIndex(@NotNull final String... columns) {
initialize();
if (tableInfo == null) {
return null;
}
final IndexFileMetadata indexFileMetaData = getIndexFileMetadata(getParquetKey().getURI(), tableInfo, columns);
if (indexFileMetaData == null) {
throw new TableDataException(