Skip to content

Commit

Permalink
First crack at integrating a flag for turning off location existence …
Browse files Browse the repository at this point in the history
…checking in SourcePartitionedTable, as well as adding partitioning column values to the underlying partitioned Table
  • Loading branch information
rcaudy committed Jan 17, 2025
1 parent 475128a commit 26a3c0c
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
package io.deephaven.engine.table.impl;

import io.deephaven.base.verify.Assert;
import io.deephaven.engine.liveness.*;
import io.deephaven.engine.liveness.LiveSupplier;
import io.deephaven.engine.liveness.LivenessReferent;
import io.deephaven.engine.liveness.ReferenceCountedLivenessNode;
import io.deephaven.engine.primitive.iterator.CloseableIterator;
import io.deephaven.engine.rowset.*;
import io.deephaven.engine.table.*;
Expand Down Expand Up @@ -54,20 +56,52 @@ public class SourcePartitionedTable extends PartitionedTableImpl {
* @param refreshSizes Whether the sizes of the locations found should be refreshed
* @param locationKeyMatcher Function to filter desired location keys
*/
@Deprecated(forRemoval = true)
public SourcePartitionedTable(
@NotNull final TableDefinition constituentDefinition,
@NotNull final UnaryOperator<Table> applyTablePermissions,
@NotNull final TableLocationProvider tableLocationProvider,
final boolean refreshLocations,
final boolean refreshSizes,
@NotNull final Predicate<ImmutableTableLocationKey> locationKeyMatcher) {
// Legacy overload, marked for removal: forwards every argument unchanged to the seven-argument constructor
// and passes true for its trailing preCheckExistence flag.
this(constituentDefinition, applyTablePermissions, tableLocationProvider, refreshLocations, refreshSizes,
locationKeyMatcher, true);
}

/**
* Construct a {@link SourcePartitionedTable} from the supplied parameters.
* <p>
* Note that refreshLocations and refreshSizes are distinct because there are use cases that supply an external
* RowSet and hence don't require size refreshes. Others might care for size refreshes, but only the
* initially-available set of locations.
*
* @param constituentDefinition The {@link TableDefinition} expected of constituent {@link Table tables}
* @param applyTablePermissions Function to apply in order to correctly restrict the visible result rows
* @param tableLocationProvider Source for table locations
* @param refreshLocations Whether the set of locations should be refreshed
* @param refreshSizes Whether the sizes of the locations found should be refreshed
* @param locationKeyMatcher Function to filter desired location keys
* @param preCheckExistence Whether to pre-check the existence (non-null, non-zero size) of locations before
* including them in the result SourcePartitionedTable as constituents. It is recommended to set this to
* {@code false} if you will do subsequent filtering on the result, or if you are confident that all
* locations are valid.
*/
public SourcePartitionedTable(
@NotNull final TableDefinition constituentDefinition,
@NotNull final UnaryOperator<Table> applyTablePermissions,
@NotNull final TableLocationProvider tableLocationProvider,
final boolean refreshLocations,
final boolean refreshSizes,
@NotNull final Predicate<ImmutableTableLocationKey> locationKeyMatcher,
final boolean preCheckExistence) {
super(new UnderlyingTableMaintainer(
constituentDefinition,
applyTablePermissions,
tableLocationProvider,
refreshLocations,
refreshSizes,
locationKeyMatcher).result(),
locationKeyMatcher,
preCheckExistence).result(),
Set.of(KEY_COLUMN_NAME),
true,
CONSTITUENT_COLUMN_NAME,
Expand All @@ -86,6 +120,8 @@ private static final class UnderlyingTableMaintainer extends ReferenceCountedLiv
private final Predicate<ImmutableTableLocationKey> locationKeyMatcher;

private final TrackingWritableRowSet resultRows;
private final String[] partitioningColumnNames;
private final WritableColumnSource<?>[] resultPartitionValues;
private final WritableColumnSource<TableLocationKey> resultTableLocationKeys;
private final WritableColumnSource<Table> resultLocationTables;
private final QueryTable result;
Expand All @@ -106,7 +142,8 @@ private UnderlyingTableMaintainer(
@NotNull final TableLocationProvider tableLocationProvider,
final boolean refreshLocations,
final boolean refreshSizes,
@NotNull final Predicate<ImmutableTableLocationKey> locationKeyMatcher) {
@NotNull final Predicate<ImmutableTableLocationKey> locationKeyMatcher,
final boolean preCheckExistence) {
super(false);

this.constituentDefinition = constituentDefinition;
Expand All @@ -116,10 +153,20 @@ private UnderlyingTableMaintainer(
this.locationKeyMatcher = locationKeyMatcher;

resultRows = RowSetFactory.empty().toTracking();
final List<ColumnDefinition<?>> partitioningColumns = constituentDefinition.getPartitioningColumns();
partitioningColumnNames = partitioningColumns.stream()
.map(ColumnDefinition::getName)
.toArray(String[]::new);
resultPartitionValues = partitioningColumns.stream()
.map(cd -> ArrayBackedColumnSource.getMemoryColumnSource(cd.getDataType(), cd.getComponentType()))
.toArray(WritableColumnSource[]::new);
resultTableLocationKeys = ArrayBackedColumnSource.getMemoryColumnSource(TableLocationKey.class, null);
resultLocationTables = ArrayBackedColumnSource.getMemoryColumnSource(Table.class, null);

final Map<String, ColumnSource<?>> resultSources = new LinkedHashMap<>(2);
final Map<String, ColumnSource<?>> resultSources = new LinkedHashMap<>(partitioningColumns.size() + 2);
for (int pci = 0; pci < partitioningColumns.size(); ++pci) {
resultSources.put(partitioningColumnNames[pci], resultPartitionValues[pci]);
}
resultSources.put(KEY_COLUMN_NAME, resultTableLocationKeys);
resultSources.put(CONSTITUENT_COLUMN_NAME, resultLocationTables);
result = new QueryTable(resultRows, resultSources);
Expand All @@ -135,14 +182,17 @@ private UnderlyingTableMaintainer(
}

if (needToRefreshLocations) {
Arrays.stream(resultPartitionValues).forEach(ColumnSource::startTrackingPrevValues);
resultTableLocationKeys.startTrackingPrevValues();
resultLocationTables.startTrackingPrevValues();

subscriptionBuffer = new TableLocationSubscriptionBuffer(tableLocationProvider);
manage(subscriptionBuffer);

pendingLocationStates = new IntrusiveDoublyLinkedQueue<>(
IntrusiveDoublyLinkedNode.Adapter.<PendingLocationState>getInstance());
pendingLocationStates = preCheckExistence
? new IntrusiveDoublyLinkedQueue<>(
IntrusiveDoublyLinkedNode.Adapter.<PendingLocationState>getInstance())
: null;
readyLocationStates = new IntrusiveDoublyLinkedQueue<>(
IntrusiveDoublyLinkedNode.Adapter.<PendingLocationState>getInstance());
processNewLocationsUpdateRoot = new InstrumentedTableUpdateSource(
Expand All @@ -151,7 +201,7 @@ private UnderlyingTableMaintainer(
+ "-processPendingLocations") {
@Override
protected void instrumentedRefresh() {
processPendingLocations(true);
processBufferedLocationChanges(true);
}
};
refreshCombiner.addSource(processNewLocationsUpdateRoot);
Expand All @@ -164,7 +214,7 @@ protected void instrumentedRefresh() {
result.unmanage(removedConstituents.stream());
removedConstituents = null;
});
processPendingLocations(false);
processBufferedLocationChanges(false);
} else {
subscriptionBuffer = null;
pendingLocationStates = null;
Expand Down Expand Up @@ -206,12 +256,17 @@ private RowSet sortAndAddLocations(@NotNull final Stream<TableLocation> location
// Note that makeConstituentTable expects us to subsequently unmanage the TableLocations
unmanage(locations.sorted(Comparator.comparing(TableLocation::getKey)).peek(tl -> {
final long constituentRowKey = lastInsertedRowKey.incrementAndGet();
final Table constituentTable = makeConstituentTable(tl);

for (int pci = 0; pci < resultPartitionValues.length; ++pci) {
addPartitionValue(tl.getKey(), partitioningColumnNames[pci], resultPartitionValues[pci],
constituentRowKey);
}

resultTableLocationKeys.ensureCapacity(constituentRowKey + 1);
resultTableLocationKeys.set(constituentRowKey, tl.getKey());

resultLocationTables.ensureCapacity(constituentRowKey + 1);
final Table constituentTable = makeConstituentTable(tl);
resultLocationTables.set(constituentRowKey, constituentTable);

if (result.isRefreshing()) {
Expand All @@ -223,6 +278,15 @@ private RowSet sortAndAddLocations(@NotNull final Stream<TableLocation> location
: RowSetFactory.fromRange(initialLastRowKey + 1, lastInsertedRowKey.get());
}

/**
 * Store the partition value of one partitioning column for a single location at the given row key.
 * <p>
 * The destination column's capacity is extended to cover {@code rowKey} first; the value is then looked up on the
 * location key by column name and written at {@code rowKey}.
 *
 * @param tableLocationKey The location key to read the partition value from
 * @param partitioningColumnName The name of the partitioning column to look up on the key
 * @param partitionValueColumn The writable column source that receives the value
 * @param rowKey The row key to write at
 * @param <T> The data type held by {@code partitionValueColumn}
 */
private static <T> void addPartitionValue(
        @NotNull final TableLocationKey tableLocationKey,
        @NotNull final String partitioningColumnName,
        @NotNull final WritableColumnSource<T> partitionValueColumn,
        final long rowKey) {
    // Grow the destination before the lookup, so the order of operations matches the write path exactly
    final long requiredCapacity = rowKey + 1;
    partitionValueColumn.ensureCapacity(requiredCapacity);

    final T partitionValue = tableLocationKey.getPartitionValue(partitioningColumnName);
    partitionValueColumn.set(rowKey, partitionValue);
}

private Table makeConstituentTable(@NotNull final TableLocation tableLocation) {
final PartitionAwareSourceTable constituent = new PartitionAwareSourceTable(
constituentDefinition,
Expand All @@ -243,7 +307,7 @@ private Table makeConstituentTable(@NotNull final TableLocation tableLocation) {
return applyTablePermissions.apply(constituent);
}

private void processPendingLocations(final boolean notifyListeners) {
private void processBufferedLocationChanges(final boolean notifyListeners) {
final RowSet removed;
final RowSet added;

Expand All @@ -255,7 +319,8 @@ private void processPendingLocations(final boolean notifyListeners) {
removed = processRemovals(locationUpdate);
processAdditions(locationUpdate);
}
added = checkPendingLocations();
checkPendingLocations();
added = addReadyLocations();
}

if (removed == null) {
Expand Down Expand Up @@ -286,35 +351,35 @@ private void processPendingLocations(final boolean notifyListeners) {
}

private void processAdditions(final TableLocationSubscriptionBuffer.LocationUpdate locationUpdate) {
/*
* This block of code is unfortunate, because it largely duplicates the intent and effort of similar code in
* RegionedColumnSourceManager. I think that the RegionedColumnSourceManager could be changed to
* intermediate between TableLocationProvider and SourceTable or SourcePartitionedTable, allowing for much
* cleaner code in all three. The RCSM could then populate STM nodes or ST regions. We could also add a
* "RegionManager" to RegionedColumnSources, in order to eliminate the unnecessary post-initialization array
* population in STM ColumnSources.
*/
// TODO (https://github.com/deephaven/deephaven-core/issues/867): Refactor around a ticking partition table
if (locationUpdate != null) {
locationUpdate.getPendingAddedLocationKeys().stream()
.map(LiveSupplier::get)
.filter(locationKeyMatcher)
.map(tableLocationProvider::getTableLocation)
.peek(this::manage)
.map(PendingLocationState::new)
.forEach(pendingLocationStates::offer);
final Stream<PendingLocationState> newPendingLocations =
locationUpdate.getPendingAddedLocationKeys().stream()
.map(LiveSupplier::get)
.filter(locationKeyMatcher)
.map(tableLocationProvider::getTableLocation)
.peek(this::manage)
.map(PendingLocationState::new);
if (pendingLocationStates != null) {
newPendingLocations.forEach(pendingLocationStates::offer);
} else {
newPendingLocations.forEach(readyLocationStates::offer);
}
}

private RowSet checkPendingLocations() {
private void checkPendingLocations() {
if (pendingLocationStates == null) {
return;
}

for (final Iterator<PendingLocationState> iter = pendingLocationStates.iterator(); iter.hasNext();) {
final PendingLocationState pendingLocationState = iter.next();
if (pendingLocationState.exists()) {
iter.remove();
readyLocationStates.offer(pendingLocationState);
}
}
}

private RowSet addReadyLocations() {
if (readyLocationStates.isEmpty()) {
return null;
}
Expand All @@ -337,22 +402,24 @@ private RowSet processRemovals(final TableLocationSubscriptionBuffer.LocationUpd
}

// Iterate through the pending locations and remove any that are in the removed set.
List<LivenessReferent> toUnmanage = null;
for (final Iterator<PendingLocationState> iter = pendingLocationStates.iterator(); iter.hasNext();) {
final PendingLocationState pendingLocationState = iter.next();
if (relevantRemovedLocations.contains(pendingLocationState.location.getKey())) {
iter.remove();
// Release the state and plan to unmanage the location
if (toUnmanage == null) {
toUnmanage = new ArrayList<>();
if (pendingLocationStates != null) {
List<LivenessReferent> toUnmanage = null;
for (final Iterator<PendingLocationState> iter = pendingLocationStates.iterator(); iter.hasNext();) {
final PendingLocationState pendingLocationState = iter.next();
if (relevantRemovedLocations.contains(pendingLocationState.location.getKey())) {
iter.remove();
// Release the state and plan to unmanage the location
if (toUnmanage == null) {
toUnmanage = new ArrayList<>();
}
toUnmanage.add(pendingLocationState.release());
}
toUnmanage.add(pendingLocationState.release());
}
}
if (toUnmanage != null) {
unmanage(toUnmanage.stream());
// noinspection UnusedAssignment
toUnmanage = null;
if (toUnmanage != null) {
unmanage(toUnmanage.stream());
// noinspection UnusedAssignment
toUnmanage = null;
}
}

// At the end of the cycle we need to make sure we unmanage any removed constituents.
Expand Down Expand Up @@ -387,6 +454,7 @@ private RowSet processRemovals(final TableLocationSubscriptionBuffer.LocationUpd
this.removedLocationsCommitter.maybeActivate();

final WritableRowSet deletedRows = deleteBuilder.build();
Arrays.stream(resultPartitionValues).forEach(cs -> cs.setNull(deletedRows));
resultTableLocationKeys.setNull(deletedRows);
resultLocationTables.setNull(deletedRows);
return deletedRows;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
/**
* A {@link TableLocationProvider} that provides access to exactly one, previously-known {@link TableLocation}. In
* contrast to {@link AbstractTableLocationProvider}, this class does not manage the liveness of the table location.
* Managment must be done externally (as in {@link io.deephaven.engine.table.impl.SourcePartitionedTable}).
* Management must be done externally (as in {@link io.deephaven.engine.table.impl.SourcePartitionedTable}).
*/
public final class SingleTableLocationProvider implements TableLocationProvider {
private static final String IMPLEMENTATION_NAME = SingleTableLocationProvider.class.getSimpleName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ private SourcePartitionedTable setUpData() {
tlp,
true,
true,
l -> true);
l -> true,
true);
}

private void verifyStringColumnContents(Table table, String columnName, String... expectedValues) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,14 @@ public Table makeTable(@NotNull final TableKeyImpl tableKey, final boolean live)
*
* @param tableKey The table key
* @param live Whether the result should update as new data becomes available
* @param preCheckExistence Whether to include only locations observed to have non-empty data
* @return The {@link PartitionedTable}
*/
@ScriptApi
public PartitionedTable makePartitionedTable(@NotNull final TableKeyImpl tableKey, final boolean live) {
public PartitionedTable makePartitionedTable(
@NotNull final TableKeyImpl tableKey,
final boolean live,
final boolean preCheckExistence) {
final TableLocationProviderImpl tableLocationProvider =
(TableLocationProviderImpl) getTableLocationProvider(tableKey);
return new SourcePartitionedTable(
Expand All @@ -132,7 +136,8 @@ public PartitionedTable makePartitionedTable(@NotNull final TableKeyImpl tableKe
tableLocationProvider,
live,
live,
tlk -> true);
tlk -> true,
preCheckExistence);
}

/**
Expand Down
Loading

0 comments on commit 26a3c0c

Please sign in to comment.