Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: DH-18174: Delay reading from parquet file when creating table and column location #6590

Closed
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ interface Listener extends BasicTableDataListener {

/**
* @param name The column name
* @return The ColumnLocation for the defined column under this table location
* @return The ColumnLocation for the defined column under this table location. The exact same ColumnLocation object
* should be returned for the same column name.
*/
@NotNull
ColumnLocation getColumnLocation(@NotNull CharSequence name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@ protected void destroy() {
};
}

/**
* This method is called before every public method so that the child classes can initialize the location lazily, if
* necessary.
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
*/
protected void initialize() {
// Do nothing
}

@Override
public final String toString() {
return toStringHelper();
Expand All @@ -90,21 +98,25 @@ public LivenessReferent asLivenessReferent() {
@Override
@NotNull
public final Object getStateLock() {
initialize();
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
return state.getStateLock();
}

@Override
public final RowSet getRowSet() {
initialize();
return state.getRowSet();
}

@Override
public final long getSize() {
initialize();
return state.getSize();
}

@Override
public final long getLastModifiedTimeMillis() {
initialize();
return state.getLastModifiedTimeMillis();
}

Expand All @@ -126,6 +138,7 @@ public final ImmutableTableLocationKey getKey() {

@Override
protected final void deliverInitialSnapshot(@NotNull final Listener listener) {
initialize();
listener.handleUpdate();
}

Expand All @@ -137,6 +150,7 @@ protected final void deliverInitialSnapshot(@NotNull final Listener listener) {
* @param lastModifiedTimeMillis The new lastModificationTimeMillis
*/
public final void handleUpdate(final RowSet rowSet, final long lastModifiedTimeMillis) {
initialize();
if (state.setValues(rowSet, lastModifiedTimeMillis) && supportsSubscriptions()) {
deliverUpdateNotification();
}
Expand All @@ -149,6 +163,7 @@ public final void handleUpdate(final RowSet rowSet, final long lastModifiedTimeM
* @param source The source to copy state values from
*/
public void handleUpdate(@NotNull final TableLocationState source) {
initialize();
if (source.copyStateValuesTo(state) && supportsSubscriptions()) {
deliverUpdateNotification();
}
Expand Down Expand Up @@ -223,6 +238,7 @@ private BasicDataIndex getDataIndex() {
@Override
@Nullable
public final BasicDataIndex getDataIndex(@NotNull final String... columns) {
initialize();
final List<String> columnNames = new ArrayList<>(columns.length);
Collections.addAll(columnNames, columns);
columnNames.sort(String::compareTo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import java.math.BigInteger;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.LongFunction;
Expand All @@ -58,7 +60,12 @@ final class ParquetColumnLocation<ATTR extends Values> extends AbstractColumnLoc
private static final int MAX_PAGE_CACHE_SIZE = Configuration.getInstance()
.getIntegerForClassWithDefault(ParquetColumnLocation.class, "maxPageCacheSize", 8192);

private final ParquetTableLocation parquetTableLocation;
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
private final String columnName;
private final String parquetColumnName;

private volatile boolean isInitialized;

/**
* Factory object needed for deferred initialization of the remaining fields. Reference serves as a barrier to
* ensure visibility of the derived fields.
Expand All @@ -77,16 +84,36 @@ final class ParquetColumnLocation<ATTR extends Values> extends AbstractColumnLoc
*
* @param tableLocation The table location enclosing this column location
* @param parquetColumnName The Parquet file column name
* @param columnChunkReaders The {@link ColumnChunkReader column chunk readers} for this location
*/
ParquetColumnLocation(
@NotNull final ParquetTableLocation tableLocation,
@NotNull final String columnName,
@NotNull final String parquetColumnName,
@Nullable final ColumnChunkReader[] columnChunkReaders) {
@NotNull final String parquetColumnName) {
super(tableLocation, columnName);
this.parquetTableLocation = tableLocation;
this.columnName = columnName;
this.parquetColumnName = parquetColumnName;
this.columnChunkReaders = columnChunkReaders;
this.isInitialized = false;
}

private void initialize() {
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
if (isInitialized) {
return;
}
synchronized (this) {
if (isInitialized) {
return;
}
tl().initialize();
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
final String[] columnPath = tl().getParquetColumnNameToPath().get(parquetColumnName);
final List<String> nameList =
columnPath == null ? Collections.singletonList(parquetColumnName) : Arrays.asList(columnPath);
final ColumnChunkReader[] columnChunkReaders = Arrays.stream(tl().getRowGroupReaders())
.map(rgr -> rgr.getColumnChunk(columnName, nameList)).toArray(ColumnChunkReader[]::new);
final boolean exists = Arrays.stream(columnChunkReaders).anyMatch(ccr -> ccr != null && ccr.numRows() > 0);
this.columnChunkReaders = exists ? columnChunkReaders : null;
isInitialized = true;
}
}

private PageCache<ATTR> ensurePageCache() {
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -114,14 +141,14 @@ public String getImplementationName() {

@Override
public boolean exists() {
initialize();
// If we see a null columnChunkReaders array, either we don't exist or we are guaranteed to
// see a non-null
// pageStores array
// see a non-null pageStores array
return columnChunkReaders != null || pageStores != null;
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
}

private ParquetTableLocation tl() {
return (ParquetTableLocation) getTableLocation();
return parquetTableLocation;
}

private <SOURCE, REGION_TYPE> REGION_TYPE makeColumnRegion(
Expand All @@ -147,6 +174,7 @@ private <SOURCE, REGION_TYPE> REGION_TYPE makeSingleColumnRegion(final SOURCE so
@Override
public ColumnRegionChar<Values> makeColumnRegionChar(
@NotNull final ColumnDefinition<?> columnDefinition) {
initialize();
// noinspection unchecked
return (ColumnRegionChar<Values>) makeColumnRegion(this::getPageStores, columnDefinition,
ColumnRegionChar::createNull, ParquetColumnRegionChar::new,
Expand All @@ -157,6 +185,7 @@ public ColumnRegionChar<Values> makeColumnRegionChar(
@Override
public ColumnRegionByte<Values> makeColumnRegionByte(
@NotNull final ColumnDefinition<?> columnDefinition) {
initialize();
// noinspection unchecked
return (ColumnRegionByte<Values>) makeColumnRegion(this::getPageStores, columnDefinition,
ColumnRegionByte::createNull, ParquetColumnRegionByte::new,
Expand All @@ -167,6 +196,7 @@ public ColumnRegionByte<Values> makeColumnRegionByte(
@Override
public ColumnRegionShort<Values> makeColumnRegionShort(
@NotNull final ColumnDefinition<?> columnDefinition) {
initialize();
// noinspection unchecked
return (ColumnRegionShort<Values>) makeColumnRegion(this::getPageStores, columnDefinition,
ColumnRegionShort::createNull, ParquetColumnRegionShort::new,
Expand All @@ -177,6 +207,7 @@ public ColumnRegionShort<Values> makeColumnRegionShort(
@Override
public ColumnRegionInt<Values> makeColumnRegionInt(
@NotNull final ColumnDefinition<?> columnDefinition) {
initialize();
// noinspection unchecked
return (ColumnRegionInt<Values>) makeColumnRegion(this::getPageStores, columnDefinition,
ColumnRegionInt::createNull, ParquetColumnRegionInt::new,
Expand All @@ -187,6 +218,7 @@ public ColumnRegionInt<Values> makeColumnRegionInt(
@Override
public ColumnRegionLong<Values> makeColumnRegionLong(
@NotNull final ColumnDefinition<?> columnDefinition) {
initialize();
// noinspection unchecked
return (ColumnRegionLong<Values>) makeColumnRegion(this::getPageStores, columnDefinition,
ColumnRegionLong::createNull, ParquetColumnRegionLong::new,
Expand All @@ -197,6 +229,7 @@ public ColumnRegionLong<Values> makeColumnRegionLong(
@Override
public ColumnRegionFloat<Values> makeColumnRegionFloat(
@NotNull final ColumnDefinition<?> columnDefinition) {
initialize();
// noinspection unchecked
return (ColumnRegionFloat<Values>) makeColumnRegion(this::getPageStores, columnDefinition,
ColumnRegionFloat::createNull, ParquetColumnRegionFloat::new,
Expand All @@ -207,6 +240,7 @@ public ColumnRegionFloat<Values> makeColumnRegionFloat(
@Override
public ColumnRegionDouble<Values> makeColumnRegionDouble(
@NotNull final ColumnDefinition<?> columnDefinition) {
initialize();
// noinspection unchecked
return (ColumnRegionDouble<Values>) makeColumnRegion(this::getPageStores, columnDefinition,
ColumnRegionDouble::createNull, ParquetColumnRegionDouble::new,
Expand All @@ -217,6 +251,7 @@ public ColumnRegionDouble<Values> makeColumnRegionDouble(
@Override
public <TYPE> ColumnRegionObject<TYPE, Values> makeColumnRegionObject(
@NotNull final ColumnDefinition<TYPE> columnDefinition) {
initialize();
final Class<TYPE> dataType = columnDefinition.getDataType();
final ColumnChunkPageStore<ATTR>[] sources = getPageStores(columnDefinition);
final ColumnChunkPageStore<DictionaryKeys>[] dictKeySources =
Expand Down Expand Up @@ -258,7 +293,7 @@ private <TYPE> ColumnRegionObject<TYPE, ATTR> makeSingleColumnRegionObject(
* @return The page stores
*/
@NotNull
public ColumnChunkPageStore<ATTR>[] getPageStores(
private ColumnChunkPageStore<ATTR>[] getPageStores(
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
@NotNull final ColumnDefinition<?> columnDefinition) {
fetchValues(columnDefinition);
return pageStores;
Expand All @@ -270,7 +305,7 @@ public ColumnChunkPageStore<ATTR>[] getPageStores(
* @param columnDefinition The {@link ColumnDefinition} used to lookup type information
* @return The dictionary values chunk suppliers, or null if none exist
*/
public Supplier<Chunk<ATTR>>[] getDictionaryChunkSuppliers(
private Supplier<Chunk<ATTR>>[] getDictionaryChunkSuppliers(
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
@NotNull final ColumnDefinition<?> columnDefinition) {
fetchValues(columnDefinition);
return dictionaryChunkSuppliers;
Expand Down
Loading
Loading