Skip to content

Commit

Permalink
feat: DH-18174: Delay reading from parquet file when creating table a…
Browse files Browse the repository at this point in the history
…nd column location (#6622)

Backport of #6606
Related to https://deephaven.atlassian.net/browse/DH-18174
  • Loading branch information
malhotrashivam authored Feb 4, 2025
1 parent b6503a8 commit 315ff3f
Show file tree
Hide file tree
Showing 6 changed files with 307 additions and 155 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ interface Listener extends BasicTableDataListener {

/**
* @param name The column name
* @return The ColumnLocation for the defined column under this table location
* @return The ColumnLocation for the defined column under this table location. The exact same ColumnLocation object
* should be returned for the same column name.
*/
@NotNull
ColumnLocation getColumnLocation(@NotNull CharSequence name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,16 @@ public LivenessReferent asLivenessReferent() {
// TableLocationState implementation
// ------------------------------------------------------------------------------------------------------------------

/**
* No-op by default, can be overridden by subclasses to initialize state on first access.
* <p>
* The expectation for static locations that override this is to call {@link #handleUpdateInternal(RowSet, long)}
* instead of {@link #handleUpdate(RowSet, long)}, and {@link #handleUpdateInternal(TableLocationState)} instead of
* {@link #handleUpdate(TableLocationState)} from inside {@link #initializeState()}. Otherwise, the initialization
* logic will recurse infinitely.
*/
protected void initializeState() {}

@Override
@NotNull
public final Object getStateLock() {
Expand All @@ -95,16 +105,19 @@ public final Object getStateLock() {

@Override
public final RowSet getRowSet() {
initializeState();
return state.getRowSet();
}

@Override
public final long getSize() {
initializeState();
return state.getSize();
}

@Override
public final long getLastModifiedTimeMillis() {
initializeState();
return state.getLastModifiedTimeMillis();
}

Expand Down Expand Up @@ -137,6 +150,11 @@ protected final void deliverInitialSnapshot(@NotNull final Listener listener) {
* @param lastModifiedTimeMillis The new lastModificationTimeMillis
*/
public final void handleUpdate(final RowSet rowSet, final long lastModifiedTimeMillis) {
initializeState();
handleUpdateInternal(rowSet, lastModifiedTimeMillis);
}

protected final void handleUpdateInternal(final RowSet rowSet, final long lastModifiedTimeMillis) {
if (state.setValues(rowSet, lastModifiedTimeMillis) && supportsSubscriptions()) {
deliverUpdateNotification();
}
Expand All @@ -149,6 +167,11 @@ public final void handleUpdate(final RowSet rowSet, final long lastModifiedTimeM
* @param source The source to copy state values from
*/
public void handleUpdate(@NotNull final TableLocationState source) {
initializeState();
handleUpdateInternal(source);
}

protected final void handleUpdateInternal(@NotNull final TableLocationState source) {
if (source.copyStateValuesTo(state) && supportsSubscriptions()) {
deliverUpdateNotification();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,6 @@ private class IncludedTableLocationEntry implements Comparable<IncludedTableLoca
// New regions indices are assigned in order of insertion, starting from 0 with no re-use of removed indices.
// If this logic changes, the `getTableAttributes()` logic needs to be updated.
private final int regionIndex = nextRegionIndex++;
private final List<ColumnLocationState<?>> columnLocationStates = new ArrayList<>();

/**
* RowSet in the region's space, not the table's space.
Expand Down Expand Up @@ -631,13 +630,10 @@ private void processInitial(final RowSetBuilderSequential addedRowSetBuilder, fi
.appendRange(regionFirstKey + subRegionFirstKey, regionFirstKey + subRegionLastKey));

for (final ColumnDefinition<?> columnDefinition : columnDefinitions) {
// noinspection unchecked,rawtypes
final ColumnLocationState<?> state = new ColumnLocationState(
columnDefinition,
columnSources.get(columnDefinition.getName()),
location.getColumnLocation(columnDefinition.getName()));
columnLocationStates.add(state);
state.regionAllocated(regionIndex);
final RegionedColumnSource<?> regionedColumnSource = columnSources.get(columnDefinition.getName());
final ColumnLocation columnLocation = location.getColumnLocation(columnDefinition.getName());
Assert.eq(regionIndex, "regionIndex", regionedColumnSource.addRegion(columnDefinition, columnLocation),
"regionedColumnSource.addRegion((definition, location)");
}

rowSetAtLastUpdate = initialRowSet;
Expand Down Expand Up @@ -710,7 +706,7 @@ private boolean pollUpdates(final RowSetBuilderSequential addedRowSetBuilder) {
}

private void invalidate() {
columnLocationStates.forEach(cls -> cls.source.invalidateRegion(regionIndex));
columnSources.values().forEach(source -> source.invalidateRegion(regionIndex));
}

@Override
Expand All @@ -734,30 +730,6 @@ public ImmutableTableLocationKey getKey(
}
};

/**
* Batches up a definition, source, and location for ease of use. Implements grouping maintenance.
*/
private static class ColumnLocationState<T> {

protected final ColumnDefinition<T> definition;
protected final RegionedColumnSource<T> source;
protected final ColumnLocation location;

private ColumnLocationState(
ColumnDefinition<T> definition,
RegionedColumnSource<T> source,
ColumnLocation location) {
this.definition = definition;
this.source = source;
this.location = location;
}

private void regionAllocated(final int regionIndex) {
Assert.eq(regionIndex, "regionIndex", source.addRegion(definition, location),
"source.addRegion((definition, location)");
}
}

public Map<String, Object> getTableAttributes(
@NotNull TableUpdateMode tableUpdateMode,
@NotNull TableUpdateMode tableLocationUpdateMode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import java.math.BigInteger;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.LongFunction;
Expand All @@ -58,48 +60,71 @@ final class ParquetColumnLocation<ATTR extends Values> extends AbstractColumnLoc
private static final int MAX_PAGE_CACHE_SIZE = Configuration.getInstance()
.getIntegerForClassWithDefault(ParquetColumnLocation.class, "maxPageCacheSize", 8192);

private final String columnName;
private final String parquetColumnName;

private volatile boolean readersInitialized;
private final Object readersLock;

// Access to following variables must be guarded by initializeReaders()
// -----------------------------------------------------------------------
/**
* Factory object needed for deferred initialization of the remaining fields. We delay initializing this field
* itself till we need to read the column data.
*/
private ColumnChunkReader[] columnChunkReaders;

/**
* Factory object needed for deferred initialization of the remaining fields. Reference serves as a barrier to
* ensure visibility of the derived fields.
* Whether the column location actually exists.
*/
private volatile ColumnChunkReader[] columnChunkReaders;
private boolean exists;
// -----------------------------------------------------------------------

// We should consider moving this to column level if needed. Column-location level likely allows more parallelism.
private volatile PageCache<ATTR> pageCache;
private volatile boolean pagesInitialized;
private final Object pagesLock;

// Access to following variables must be guarded by initializePages()
// -----------------------------------------------------------------------
private ColumnChunkPageStore<ATTR>[] pageStores;
private Supplier<Chunk<ATTR>>[] dictionaryChunkSuppliers;
private ColumnChunkPageStore<DictionaryKeys>[] dictionaryKeysPageStores;
// -----------------------------------------------------------------------

/**
* Construct a new {@link ParquetColumnLocation} for the specified {@link ParquetTableLocation} and column name.
*
* @param tableLocation The table location enclosing this column location
* @param parquetColumnName The Parquet file column name
* @param columnChunkReaders The {@link ColumnChunkReader column chunk readers} for this location
*/
ParquetColumnLocation(
@NotNull final ParquetTableLocation tableLocation,
@NotNull final String columnName,
@NotNull final String parquetColumnName,
@Nullable final ColumnChunkReader[] columnChunkReaders) {
@NotNull final String parquetColumnName) {
super(tableLocation, columnName);
this.columnName = columnName;
this.parquetColumnName = parquetColumnName;
this.columnChunkReaders = columnChunkReaders;
this.readersInitialized = false;
this.readersLock = new Object();
this.pagesInitialized = false;
this.pagesLock = new Object();
}

private PageCache<ATTR> ensurePageCache() {
PageCache<ATTR> localPageCache;
if ((localPageCache = pageCache) != null) {
return localPageCache;
private void initializeReaders() {
if (readersInitialized) {
return;
}

synchronized (this) {
if ((localPageCache = pageCache) != null) {
return localPageCache;
synchronized (readersLock) {
if (readersInitialized) {
return;
}
return pageCache = new PageCache<>(INITIAL_PAGE_CACHE_SIZE, MAX_PAGE_CACHE_SIZE);
final String[] columnPath = tl().getParquetColumnNameToPath().get(parquetColumnName);
final List<String> nameList =
columnPath == null ? Collections.singletonList(parquetColumnName) : Arrays.asList(columnPath);
final ColumnChunkReader[] columnChunkReaders = Arrays.stream(tl().getRowGroupReaders())
.map(rgr -> rgr.getColumnChunk(columnName, nameList)).toArray(ColumnChunkReader[]::new);
exists = Arrays.stream(columnChunkReaders).anyMatch(ccr -> ccr != null && ccr.numRows() > 0);
this.columnChunkReaders = exists ? columnChunkReaders : null;
readersInitialized = true;
}
}

Expand All @@ -114,10 +139,8 @@ public String getImplementationName() {

@Override
public boolean exists() {
// If we see a null columnChunkReaders array, either we don't exist or we are guaranteed to
// see a non-null
// pageStores array
return columnChunkReaders != null || pageStores != null;
initializeReaders();
return exists;
}

private ParquetTableLocation tl() {
Expand Down Expand Up @@ -258,9 +281,9 @@ private <TYPE> ColumnRegionObject<TYPE, ATTR> makeSingleColumnRegionObject(
* @return The page stores
*/
@NotNull
public ColumnChunkPageStore<ATTR>[] getPageStores(
private ColumnChunkPageStore<ATTR>[] getPageStores(
@NotNull final ColumnDefinition<?> columnDefinition) {
fetchValues(columnDefinition);
initializePages(columnDefinition);
return pageStores;
}

Expand All @@ -270,9 +293,9 @@ public ColumnChunkPageStore<ATTR>[] getPageStores(
* @param columnDefinition The {@link ColumnDefinition} used to lookup type information
* @return The dictionary values chunk suppliers, or null if none exist
*/
public Supplier<Chunk<ATTR>>[] getDictionaryChunkSuppliers(
private Supplier<Chunk<ATTR>>[] getDictionaryChunkSuppliers(
@NotNull final ColumnDefinition<?> columnDefinition) {
fetchValues(columnDefinition);
initializePages(columnDefinition);
return dictionaryChunkSuppliers;
}

Expand All @@ -285,30 +308,35 @@ public Supplier<Chunk<ATTR>>[] getDictionaryChunkSuppliers(
*/
private ColumnChunkPageStore<DictionaryKeys>[] getDictionaryKeysPageStores(
@NotNull final ColumnDefinition<?> columnDefinition) {
fetchValues(columnDefinition);
initializePages(columnDefinition);
return dictionaryKeysPageStores;
}

@SuppressWarnings("unchecked")
private void fetchValues(@NotNull final ColumnDefinition<?> columnDefinition) {
if (columnChunkReaders == null) {
private void initializePages(@NotNull final ColumnDefinition<?> columnDefinition) {
if (pagesInitialized) {
return;
}
synchronized (this) {
if (columnChunkReaders == null) {
synchronized (pagesLock) {
if (pagesInitialized) {
return;
}

initializeReaders();
final int pageStoreCount = columnChunkReaders.length;
pageStores = new ColumnChunkPageStore[pageStoreCount];
dictionaryChunkSuppliers = new Supplier[pageStoreCount];
dictionaryKeysPageStores = new ColumnChunkPageStore[pageStoreCount];

// We should consider moving this page-cache to column level if needed.
// Column-location level likely allows more parallelism.
final PageCache<ATTR> pageCache = new PageCache<>(INITIAL_PAGE_CACHE_SIZE, MAX_PAGE_CACHE_SIZE);

for (int psi = 0; psi < pageStoreCount; ++psi) {
final ColumnChunkReader columnChunkReader = columnChunkReaders[psi];
try {
final ColumnChunkPageStore.CreatorResult<ATTR> creatorResult =
ColumnChunkPageStore.create(
ensurePageCache(),
pageCache,
columnChunkReader,
tl().getRegionParameters().regionMask,
makeToPage(tl().getColumnTypes().get(parquetColumnName),
Expand All @@ -325,6 +353,7 @@ private void fetchValues(@NotNull final ColumnDefinition<?> columnDefinition) {
}

columnChunkReaders = null;
pagesInitialized = true;
}
}

Expand Down
Loading

0 comments on commit 315ff3f

Please sign in to comment.