Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Delay reading from parquet file when creating table and column location #6590

Open
wants to merge 12 commits into
base: rc/v0.37.x
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ interface Listener extends BasicTableDataListener {

/**
* @param name The column name
* @return The ColumnLocation for the defined column under this table location
* @return The ColumnLocation for the defined column under this table location. The exact same ColumnLocation object
* should be returned for the same column name.
*/
@NotNull
ColumnLocation getColumnLocation(@NotNull CharSequence name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,12 @@ public final Object getStateLock() {
}

@Override
public final RowSet getRowSet() {
public RowSet getRowSet() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are missing the need to override getLastModifiedTimeMillis in this regime.

That said, I think this is a case where we do want to provide a protected method for callers to override, but only for the TableLocationState methods. This allows us to leave these implementations final. Something like

    // ------------------------------------------------------------------------------------------------------------------
    // TableLocationState implementation
    // ------------------------------------------------------------------------------------------------------------------

    protected void initializeState() {
        
    }
    
    @Override
    @NotNull
    public final Object getStateLock() {
        initializeState();
        return state.getStateLock();
    }

    @Override
    public final RowSet getRowSet() {
        initializeState();
        return state.getRowSet();
    }

    @Override
    public final long getSize() {
        initializeState();
        return state.getSize();
    }

    @Override
    public final long getLastModifiedTimeMillis() {
        initializeState();
        return state.getLastModifiedTimeMillis();
    }

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And then it is just a shim into your normal initialize.

    @Override
    protected void initializeState() {
        initialize();
    }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I felt that we don't need to call initialize if some caller is just asking for getLastModifiedTimeMillis and getStateLock because those two don't need underlying parquet file to be read.
Once the file is read, the last modified time will be updated.

return state.getRowSet();
}

@Override
public final long getSize() {
public long getSize() {
return state.getSize();
}

Expand Down Expand Up @@ -222,7 +222,7 @@ private BasicDataIndex getDataIndex() {

@Override
@Nullable
public final BasicDataIndex getDataIndex(@NotNull final String... columns) {
public BasicDataIndex getDataIndex(@NotNull final String... columns) {
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
final List<String> columnNames = new ArrayList<>(columns.length);
Collections.addAll(columnNames, columns);
columnNames.sort(String::compareTo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import java.math.BigInteger;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.LongFunction;
Expand All @@ -58,7 +60,13 @@ final class ParquetColumnLocation<ATTR extends Values> extends AbstractColumnLoc
private static final int MAX_PAGE_CACHE_SIZE = Configuration.getInstance()
.getIntegerForClassWithDefault(ParquetColumnLocation.class, "maxPageCacheSize", 8192);

private final ParquetTableLocation parquetTableLocation;
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
private final String columnName;
private final String parquetColumnName;

private boolean isInitialized;
private volatile boolean isInitializedVolatile;
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved

/**
* Factory object needed for deferred initialization of the remaining fields. Reference serves as a barrier to
* ensure visibility of the derived fields.
Expand All @@ -77,16 +85,39 @@ final class ParquetColumnLocation<ATTR extends Values> extends AbstractColumnLoc
*
* @param tableLocation The table location enclosing this column location
* @param parquetColumnName The Parquet file column name
* @param columnChunkReaders The {@link ColumnChunkReader column chunk readers} for this location
*/
ParquetColumnLocation(
@NotNull final ParquetTableLocation tableLocation,
@NotNull final String columnName,
@NotNull final String parquetColumnName,
@Nullable final ColumnChunkReader[] columnChunkReaders) {
@NotNull final String parquetColumnName) {
super(tableLocation, columnName);
this.parquetTableLocation = tableLocation;
this.columnName = columnName;
this.parquetColumnName = parquetColumnName;
this.columnChunkReaders = columnChunkReaders;
this.isInitialized = false;
this.isInitializedVolatile = false;
}

private void initialize() {
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
if (isInitialized) {
return;
}
synchronized (this) {
isInitialized = isInitializedVolatile;
if (isInitialized) {
return;
}
tl().initialize();
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
final String[] columnPath = tl().getParquetColumnNameToPath().get(parquetColumnName);
final List<String> nameList =
columnPath == null ? Collections.singletonList(parquetColumnName) : Arrays.asList(columnPath);
final ColumnChunkReader[] columnChunkReaders = Arrays.stream(tl().getRowGroupReaders())
.map(rgr -> rgr.getColumnChunk(columnName, nameList)).toArray(ColumnChunkReader[]::new);
final boolean exists = Arrays.stream(columnChunkReaders).anyMatch(ccr -> ccr != null && ccr.numRows() > 0);
this.columnChunkReaders = exists ? columnChunkReaders : null;
isInitialized = true;
isInitializedVolatile = true;
}
}

private PageCache<ATTR> ensurePageCache() {
Expand Down Expand Up @@ -114,14 +145,14 @@ public String getImplementationName() {

@Override
public boolean exists() {
initialize();
// If we see a null columnChunkReaders array, either we don't exist or we are guaranteed to
// see a non-null
// pageStores array
// see a non-null pageStores array
return columnChunkReaders != null || pageStores != null;
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
}

private ParquetTableLocation tl() {
return (ParquetTableLocation) getTableLocation();
return parquetTableLocation;
}

private <SOURCE, REGION_TYPE> REGION_TYPE makeColumnRegion(
Expand All @@ -147,6 +178,7 @@ private <SOURCE, REGION_TYPE> REGION_TYPE makeSingleColumnRegion(final SOURCE so
@Override
public ColumnRegionChar<Values> makeColumnRegionChar(
@NotNull final ColumnDefinition<?> columnDefinition) {
initialize();
// noinspection unchecked
return (ColumnRegionChar<Values>) makeColumnRegion(this::getPageStores, columnDefinition,
ColumnRegionChar::createNull, ParquetColumnRegionChar::new,
Expand All @@ -157,6 +189,7 @@ public ColumnRegionChar<Values> makeColumnRegionChar(
@Override
public ColumnRegionByte<Values> makeColumnRegionByte(
@NotNull final ColumnDefinition<?> columnDefinition) {
initialize();
// noinspection unchecked
return (ColumnRegionByte<Values>) makeColumnRegion(this::getPageStores, columnDefinition,
ColumnRegionByte::createNull, ParquetColumnRegionByte::new,
Expand All @@ -167,6 +200,7 @@ public ColumnRegionByte<Values> makeColumnRegionByte(
@Override
public ColumnRegionShort<Values> makeColumnRegionShort(
@NotNull final ColumnDefinition<?> columnDefinition) {
initialize();
// noinspection unchecked
return (ColumnRegionShort<Values>) makeColumnRegion(this::getPageStores, columnDefinition,
ColumnRegionShort::createNull, ParquetColumnRegionShort::new,
Expand All @@ -177,6 +211,7 @@ public ColumnRegionShort<Values> makeColumnRegionShort(
@Override
public ColumnRegionInt<Values> makeColumnRegionInt(
@NotNull final ColumnDefinition<?> columnDefinition) {
initialize();
// noinspection unchecked
return (ColumnRegionInt<Values>) makeColumnRegion(this::getPageStores, columnDefinition,
ColumnRegionInt::createNull, ParquetColumnRegionInt::new,
Expand All @@ -187,6 +222,7 @@ public ColumnRegionInt<Values> makeColumnRegionInt(
@Override
public ColumnRegionLong<Values> makeColumnRegionLong(
@NotNull final ColumnDefinition<?> columnDefinition) {
initialize();
// noinspection unchecked
return (ColumnRegionLong<Values>) makeColumnRegion(this::getPageStores, columnDefinition,
ColumnRegionLong::createNull, ParquetColumnRegionLong::new,
Expand All @@ -197,6 +233,7 @@ public ColumnRegionLong<Values> makeColumnRegionLong(
@Override
public ColumnRegionFloat<Values> makeColumnRegionFloat(
@NotNull final ColumnDefinition<?> columnDefinition) {
initialize();
// noinspection unchecked
return (ColumnRegionFloat<Values>) makeColumnRegion(this::getPageStores, columnDefinition,
ColumnRegionFloat::createNull, ParquetColumnRegionFloat::new,
Expand All @@ -207,6 +244,7 @@ public ColumnRegionFloat<Values> makeColumnRegionFloat(
@Override
public ColumnRegionDouble<Values> makeColumnRegionDouble(
@NotNull final ColumnDefinition<?> columnDefinition) {
initialize();
// noinspection unchecked
return (ColumnRegionDouble<Values>) makeColumnRegion(this::getPageStores, columnDefinition,
ColumnRegionDouble::createNull, ParquetColumnRegionDouble::new,
Expand All @@ -217,6 +255,7 @@ public ColumnRegionDouble<Values> makeColumnRegionDouble(
@Override
public <TYPE> ColumnRegionObject<TYPE, Values> makeColumnRegionObject(
@NotNull final ColumnDefinition<TYPE> columnDefinition) {
initialize();
final Class<TYPE> dataType = columnDefinition.getDataType();
final ColumnChunkPageStore<ATTR>[] sources = getPageStores(columnDefinition);
final ColumnChunkPageStore<DictionaryKeys>[] dictKeySources =
Expand Down Expand Up @@ -258,7 +297,7 @@ private <TYPE> ColumnRegionObject<TYPE, ATTR> makeSingleColumnRegionObject(
* @return The page stores
*/
@NotNull
public ColumnChunkPageStore<ATTR>[] getPageStores(
private ColumnChunkPageStore<ATTR>[] getPageStores(
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
@NotNull final ColumnDefinition<?> columnDefinition) {
fetchValues(columnDefinition);
return pageStores;
Expand All @@ -270,7 +309,7 @@ public ColumnChunkPageStore<ATTR>[] getPageStores(
* @param columnDefinition The {@link ColumnDefinition} used to lookup type information
* @return The dictionary values chunk suppliers, or null if none exist
*/
public Supplier<Chunk<ATTR>>[] getDictionaryChunkSuppliers(
private Supplier<Chunk<ATTR>>[] getDictionaryChunkSuppliers(
@NotNull final ColumnDefinition<?> columnDefinition) {
fetchValues(columnDefinition);
return dictionaryChunkSuppliers;
Expand Down
Loading