Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Delay reading from parquet file when creating table and column location #6590

Open
wants to merge 12 commits into
base: rc/v0.37.x
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ interface Listener extends BasicTableDataListener {

/**
* @param name The column name
* @return The ColumnLocation for the defined column under this table location
* @return The ColumnLocation for the defined column under this table location. The exact same ColumnLocation object
* should be returned for the same column name.
*/
@NotNull
ColumnLocation getColumnLocation(@NotNull CharSequence name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,12 @@ public final Object getStateLock() {
}

@Override
public final RowSet getRowSet() {
public RowSet getRowSet() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are missing the need to override getLastModifiedTimeMillis in this regime.

That said, I think this is a case where we do want to provide a protected method for callers to override, but only for the TableLocationState methods. This allows us to leave these implementations final. Something like

    // ------------------------------------------------------------------------------------------------------------------
    // TableLocationState implementation
    // ------------------------------------------------------------------------------------------------------------------

    protected void initializeState() {
        
    }
    
    @Override
    @NotNull
    public final Object getStateLock() {
        initializeState();
        return state.getStateLock();
    }

    @Override
    public final RowSet getRowSet() {
        initializeState();
        return state.getRowSet();
    }

    @Override
    public final long getSize() {
        initializeState();
        return state.getSize();
    }

    @Override
    public final long getLastModifiedTimeMillis() {
        initializeState();
        return state.getLastModifiedTimeMillis();
    }

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And then it is just a shim into your normal initialize.

    @Override
    protected void initializeState() {
        initialize();
    }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I felt that we don't need to call initialize if a caller is only asking for getLastModifiedTimeMillis or getStateLock, because those two don't need the underlying parquet file to be read.
Once the file is read, the last modified time will be updated.

return state.getRowSet();
}

@Override
public final long getSize() {
public long getSize() {
return state.getSize();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,9 @@ private class IncludedTableLocationEntry implements Comparable<IncludedTableLoca
// New regions indices are assigned in order of insertion, starting from 0 with no re-use of removed indices.
// If this logic changes, the `getTableAttributes()` logic needs to be updated.
private final int regionIndex = nextRegionIndex++;
private final List<ColumnLocationState<?>> columnLocationStates = new ArrayList<>();

// Collection of column sources for which we have added a region, useful for invalidating together
private final Collection<RegionedColumnSource<?>> regionedColumnSources = new ArrayList<>();

/**
* RowSet in the region's space, not the table's space.
Expand Down Expand Up @@ -631,13 +633,11 @@ private void processInitial(final RowSetBuilderSequential addedRowSetBuilder, fi
.appendRange(regionFirstKey + subRegionFirstKey, regionFirstKey + subRegionLastKey));

for (final ColumnDefinition<?> columnDefinition : columnDefinitions) {
// noinspection unchecked,rawtypes
final ColumnLocationState<?> state = new ColumnLocationState(
columnDefinition,
columnSources.get(columnDefinition.getName()),
location.getColumnLocation(columnDefinition.getName()));
columnLocationStates.add(state);
state.regionAllocated(regionIndex);
final RegionedColumnSource<?> regionedColumnSource = columnSources.get(columnDefinition.getName());
final ColumnLocation columnLocation = location.getColumnLocation(columnDefinition.getName());
Assert.eq(regionIndex, "regionIndex", regionedColumnSource.addRegion(columnDefinition, columnLocation),
"regionedColumnSource.addRegion((definition, location)");
regionedColumnSources.add(regionedColumnSource);
}

rowSetAtLastUpdate = initialRowSet;
Expand Down Expand Up @@ -710,7 +710,7 @@ private boolean pollUpdates(final RowSetBuilderSequential addedRowSetBuilder) {
}

private void invalidate() {
columnLocationStates.forEach(cls -> cls.source.invalidateRegion(regionIndex));
regionedColumnSources.forEach(source -> source.invalidateRegion(regionIndex));
}

@Override
Expand All @@ -734,30 +734,6 @@ public ImmutableTableLocationKey getKey(
}
};

/**
* Batches up a definition, source, and location for ease of use. Implements grouping maintenance.
*/
private static class ColumnLocationState<T> {

protected final ColumnDefinition<T> definition;
protected final RegionedColumnSource<T> source;
protected final ColumnLocation location;

private ColumnLocationState(
ColumnDefinition<T> definition,
RegionedColumnSource<T> source,
ColumnLocation location) {
this.definition = definition;
this.source = source;
this.location = location;
}

private void regionAllocated(final int regionIndex) {
Assert.eq(regionIndex, "regionIndex", source.addRegion(definition, location),
"source.addRegion((definition, location)");
}
}

public Map<String, Object> getTableAttributes(
@NotNull TableUpdateMode tableUpdateMode,
@NotNull TableUpdateMode tableLocationUpdateMode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import java.math.BigInteger;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.LongFunction;
Expand All @@ -58,13 +60,25 @@ final class ParquetColumnLocation<ATTR extends Values> extends AbstractColumnLoc
private static final int MAX_PAGE_CACHE_SIZE = Configuration.getInstance()
.getIntegerForClassWithDefault(ParquetColumnLocation.class, "maxPageCacheSize", 8192);

private final String columnName;
private final String parquetColumnName;


private volatile boolean readersInitialized;

// The following variables must only be accessed after calling initializeReaders()
/**
* Factory object needed for deferred initialization of the remaining fields. Reference serves as a barrier to
* ensure visibility of the derived fields.
* ensure visibility of the derived fields. We delay initializing this field till we need to read the column data.
*/
private volatile ColumnChunkReader[] columnChunkReaders;

/**
* Whether the column location actually exists.
*/
private boolean exists;
// -----------------------------------------------------------------------

// We should consider moving this to column level if needed. Column-location level likely allows more parallelism.
private volatile PageCache<ATTR> pageCache;

Expand All @@ -77,16 +91,34 @@ final class ParquetColumnLocation<ATTR extends Values> extends AbstractColumnLoc
*
* @param tableLocation The table location enclosing this column location
* @param parquetColumnName The Parquet file column name
* @param columnChunkReaders The {@link ColumnChunkReader column chunk readers} for this location
*/
ParquetColumnLocation(
@NotNull final ParquetTableLocation tableLocation,
@NotNull final String columnName,
@NotNull final String parquetColumnName,
@Nullable final ColumnChunkReader[] columnChunkReaders) {
@NotNull final String parquetColumnName) {
super(tableLocation, columnName);
this.columnName = columnName;
this.parquetColumnName = parquetColumnName;
this.columnChunkReaders = columnChunkReaders;
this.readersInitialized = false;
}

/**
 * Populate {@link #columnChunkReaders} and {@link #exists} the first time this is called. Once
 * {@code readersInitialized} has been set, later calls return immediately without taking the lock.
 */
private void initializeReaders() {
// Fast path: skip the lock entirely once initialization has completed.
if (readersInitialized) {
return;
}
synchronized (this) {
// Re-check while holding the lock; another thread may have finished initialization while we waited.
if (readersInitialized) {
return;
}
// Look up the path for this Parquet column; when no path is mapped, fall back to the column name alone.
final String[] columnPath = tl().getParquetColumnNameToPath().get(parquetColumnName);
final List<String> nameList =
columnPath == null ? Collections.singletonList(parquetColumnName) : Arrays.asList(columnPath);
// Ask every row group reader of the enclosing table location for this column's chunk reader.
// NOTE: this local intentionally has the same name as the field; the field is assigned below via "this.".
final ColumnChunkReader[] columnChunkReaders = Arrays.stream(tl().getRowGroupReaders())
.map(rgr -> rgr.getColumnChunk(columnName, nameList)).toArray(ColumnChunkReader[]::new);
// The column exists only if at least one non-null chunk reader reports more than zero rows.
exists = Arrays.stream(columnChunkReaders).anyMatch(ccr -> ccr != null && ccr.numRows() > 0);
// Keep the readers only when the column exists; otherwise the field is left null.
this.columnChunkReaders = exists ? columnChunkReaders : null;
// Set the flag last, after exists and columnChunkReaders have been assigned.
readersInitialized = true;
}
}

private PageCache<ATTR> ensurePageCache() {
Expand Down Expand Up @@ -114,10 +146,8 @@ public String getImplementationName() {

@Override
public boolean exists() {
// If we see a null columnChunkReaders array, either we don't exist or we are guaranteed to
// see a non-null
// pageStores array
return columnChunkReaders != null || pageStores != null;
initializeReaders();
return exists;
}

private ParquetTableLocation tl() {
Expand Down Expand Up @@ -258,7 +288,7 @@ private <TYPE> ColumnRegionObject<TYPE, ATTR> makeSingleColumnRegionObject(
* @return The page stores
*/
@NotNull
public ColumnChunkPageStore<ATTR>[] getPageStores(
private ColumnChunkPageStore<ATTR>[] getPageStores(
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
@NotNull final ColumnDefinition<?> columnDefinition) {
fetchValues(columnDefinition);
return pageStores;
Expand All @@ -270,7 +300,7 @@ public ColumnChunkPageStore<ATTR>[] getPageStores(
* @param columnDefinition The {@link ColumnDefinition} used to lookup type information
* @return The dictionary values chunk suppliers, or null if none exist
*/
public Supplier<Chunk<ATTR>>[] getDictionaryChunkSuppliers(
private Supplier<Chunk<ATTR>>[] getDictionaryChunkSuppliers(
@NotNull final ColumnDefinition<?> columnDefinition) {
fetchValues(columnDefinition);
return dictionaryChunkSuppliers;
Expand All @@ -291,6 +321,7 @@ private ColumnChunkPageStore<DictionaryKeys>[] getDictionaryKeysPageStores(

@SuppressWarnings("unchecked")
private void fetchValues(@NotNull final ColumnDefinition<?> columnDefinition) {
initializeReaders();
if (columnChunkReaders == null) {
return;
}
Expand Down
Loading