Skip to content

Commit

Permalink
fix: DH-18395: Prefer bulk-unmanage whenever LivenessManager.unmanage…
Browse files Browse the repository at this point in the history
… is called on multiple referents (#6557)

* Make all looped LivenessManager.unmanage calls use the bulk variant, and improve documentation for the single variant to prompt use of the bulk variant
* Document TableLocationSubscriptionBuffer.LocationUpdate a bit better
  • Loading branch information
rcaudy committed Jan 13, 2025
1 parent 66fa5ca commit c210e48
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ private static final class UnderlyingTableMaintainer extends ReferenceCountedLiv
@SuppressWarnings("FieldCanBeLocal") // We need to hold onto this reference for reachability purposes.
private final Runnable processNewLocationsUpdateRoot;

private final UpdateCommitter<UnderlyingTableMaintainer> removedLocationsComitter;
private final UpdateCommitter<UnderlyingTableMaintainer> removedLocationsCommitter;
private List<Table> removedConstituents = null;

private UnderlyingTableMaintainer(
Expand Down Expand Up @@ -156,12 +156,12 @@ protected void instrumentedRefresh() {
};
refreshCombiner.addSource(processNewLocationsUpdateRoot);

this.removedLocationsComitter = new UpdateCommitter<>(
this.removedLocationsCommitter = new UpdateCommitter<>(
this,
result.getUpdateGraph(),
ignored -> {
Assert.neqNull(removedConstituents, "removedConstituents");
removedConstituents.forEach(result::unmanage);
result.unmanage(removedConstituents.stream());
removedConstituents = null;
});
processPendingLocations(false);
Expand All @@ -170,7 +170,7 @@ protected void instrumentedRefresh() {
pendingLocationStates = null;
readyLocationStates = null;
processNewLocationsUpdateRoot = null;
removedLocationsComitter = null;
removedLocationsCommitter = null;
tableLocationProvider.refresh();

final Collection<TableLocation> locations = new ArrayList<>();
Expand Down Expand Up @@ -203,7 +203,8 @@ private QueryTable result() {
private RowSet sortAndAddLocations(@NotNull final Stream<TableLocation> locations) {
final long initialLastRowKey = resultRows.lastRowKey();
final MutableLong lastInsertedRowKey = new MutableLong(initialLastRowKey);
locations.sorted(Comparator.comparing(TableLocation::getKey)).forEach(tl -> {
// Note that makeConstituentTable expects us to subsequently unmanage the TableLocations
unmanage(locations.sorted(Comparator.comparing(TableLocation::getKey)).peek(tl -> {
final long constituentRowKey = lastInsertedRowKey.incrementAndGet();
final Table constituentTable = makeConstituentTable(tl);

Expand All @@ -216,7 +217,7 @@ private RowSet sortAndAddLocations(@NotNull final Stream<TableLocation> location
if (result.isRefreshing()) {
result.manage(constituentTable);
}
});
}));
return initialLastRowKey == lastInsertedRowKey.get()
? RowSetFactory.empty()
: RowSetFactory.fromRange(initialLastRowKey + 1, lastInsertedRowKey.get());
Expand All @@ -235,7 +236,7 @@ private Table makeConstituentTable(@NotNull final TableLocation tableLocation) {
// Transfer management to the constituent CSM. NOTE: this is likely to end up double-managed
// after the CSM adds the location to the table, but that's acceptable.
constituent.columnSourceManager.manage(tableLocation);
unmanage(tableLocation);
// Note that the caller is now responsible for unmanaging tableLocation on behalf of this.

// Be careful to propagate the systemic attribute properly to child tables
constituent.setAttribute(Table.SYSTEMIC_TABLE_ATTRIBUTE, result.isSystemicObject());
Expand Down Expand Up @@ -293,8 +294,12 @@ private RowSet processAdditions(final TableLocationSubscriptionBuffer.LocationUp
readyLocationStates.offer(pendingLocationState);
}
}
final RowSet added = sortAndAddLocations(readyLocationStates.stream()
.map(PendingLocationState::release));

if (readyLocationStates.isEmpty()) {
return RowSetFactory.empty();
}

final RowSet added = sortAndAddLocations(readyLocationStates.stream().map(PendingLocationState::release));
readyLocationStates.clearFast();
return added;
}
Expand All @@ -312,14 +317,23 @@ private RowSet processRemovals(final TableLocationSubscriptionBuffer.LocationUpd
}

// Iterate through the pending locations and remove any that are in the removed set.
List<LivenessReferent> toUnmanage = null;
for (final Iterator<PendingLocationState> iter = pendingLocationStates.iterator(); iter.hasNext();) {
final PendingLocationState pendingLocationState = iter.next();
if (relevantRemovedLocations.contains(pendingLocationState.location.getKey())) {
iter.remove();
// Release the state and unmanage the location
unmanage(pendingLocationState.release());
// Release the state and plan to unmanage the location
if (toUnmanage == null) {
toUnmanage = new ArrayList<>();
}
toUnmanage.add(pendingLocationState.release());
}
}
if (toUnmanage != null) {
unmanage(toUnmanage.stream());
// noinspection UnusedAssignment
toUnmanage = null;
}

// At the end of the cycle we need to make sure we unmanage any removed constituents.
this.removedConstituents = new ArrayList<>(relevantRemovedLocations.size());
Expand Down Expand Up @@ -350,7 +364,7 @@ private RowSet processRemovals(final TableLocationSubscriptionBuffer.LocationUpd
removedConstituents = null;
return RowSetFactory.empty();
}
this.removedLocationsComitter.maybeActivate();
this.removedLocationsCommitter.maybeActivate();

final WritableRowSet deletedRows = deleteBuilder.build();
resultTableLocationKeys.setNull(deletedRows);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,9 @@ protected void endTransaction(@NotNull final Object token) {
}
// Release the keys that were removed after we have delivered the notifications and the
// subscribers have had a chance to process them
removedKeys.forEach(livenessManager::unmanage);
if (!removedKeys.isEmpty()) {
livenessManager.unmanage(removedKeys.stream());
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ private LocationUpdate() {

private void processAdd(@NotNull final LiveSupplier<ImmutableTableLocationKey> addedKeySupplier) {
final ImmutableTableLocationKey addedKey = addedKeySupplier.get();
// Note that we might have a remove for this key if it previously existed and is being replaced. Hence, we
// don't look for an existing remove, which is apparently asymmetric w.r.t. processRemove but still correct.
// Consumers of a LocationUpdate must process removes before adds.

// Need to verify that we don't have stacked adds (without intervening removes).
if (added.containsKey(addedKey)) {
throw new IllegalStateException("TableLocationKey " + addedKey
Expand Down Expand Up @@ -99,10 +103,16 @@ private void processTransaction(
}
}

/**
* @return The pending location keys to add. <em>Note that removes should be processed before adds.</em>
*/
public Collection<LiveSupplier<ImmutableTableLocationKey>> getPendingAddedLocationKeys() {
return added.values();
}

/**
* @return The pending location keys to remove. <em>Note that removes should be processed before adds.</em>
*/
public Collection<LiveSupplier<ImmutableTableLocationKey>> getPendingRemovedLocationKeys() {
return removed.values();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import io.deephaven.base.verify.Assert;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.liveness.LivenessReferent;
import io.deephaven.engine.rowset.*;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.table.*;
Expand Down Expand Up @@ -323,12 +324,16 @@ private final class ChangeProcessingContext implements SafeCloseable {
* {@link #resultRows}. The truncating constituent and following will need to insert their entire shifted row
* set, and must update the next slot in {@link #currFirstRowKeys}.
*/
boolean slotAllocationChanged;
private boolean slotAllocationChanged;
/**
* The first key after which we began inserting shifted constituent row sets instead of trying for piecemeal
* updates.
*/
long firstTruncatedResultKey;
private long firstTruncatedResultKey;
/**
* Removed constituent listeners to bulk-unmanage.
*/
private List<LivenessReferent> toUnmanage;

private ChangeProcessingContext(@NotNull final TableUpdate constituentChanges) {
modifiedColumnSet.clear();
Expand Down Expand Up @@ -388,7 +393,10 @@ public void close() {
final SafeCloseable ignored3 = removedValues;
final SafeCloseable ignored4 = addedKeys;
final SafeCloseable ignored5 = modifiedKeys;
final SafeCloseable ignored6 = modifiedPreviousValues) {
final SafeCloseable ignored6 = modifiedPreviousValues;
final SafeCloseable ignored7 = toUnmanage == null
? null
: () -> mergedListener.unmanage(toUnmanage.stream())) {
}
// @formatter:on
}
Expand Down Expand Up @@ -504,7 +512,10 @@ private void processRemove(@NotNull final Table removedConstituent) {
listeners.remove();
}
removedConstituent.removeUpdateListener(nextListener);
mergedListener.unmanage(nextListener);
if (toUnmanage == null) {
toUnmanage = new ArrayList<>();
}
toUnmanage.add(nextListener);
advanceListener();
}
final long firstRemovedKey = prevFirstRowKeys[nextPreviousSlot];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,10 @@ protected void destroy() {
private synchronized void invalidateAndRelease() {
invalidatedLocations.forEach(IncludedTableLocationEntry::invalidate);
invalidatedLocations.clear();
releasedLocations.forEach(this::unmanage);
releasedLocations.clear();
if (!releasedLocations.isEmpty()) {
unmanage(releasedLocations.stream());
releasedLocations.clear();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ default void manage(@NotNull final LivenessReferent referent) {
/**
* If this manager manages {@code referent} one or more times, drop one such reference. If this manager is also a
* {@link LivenessReferent}, then it must also be live.
* <p>
* <em>Strongly prefer using {@link #unmanage(Stream)} when multiple referents should be unmanaged.</em>
*
* @param referent The referent to drop
*/
Expand All @@ -55,6 +57,8 @@ default void unmanage(@NotNull LivenessReferent referent) {
/**
* If this manager manages referent one or more times, drop one such reference. If this manager is also a
* {@link LivenessReferent}, then this method is a no-op if {@code this} is not live.
* <p>
 * <em>Strongly prefer using {@link #tryUnmanage(Stream)} when multiple referents should be unmanaged.</em>
*
* @param referent The referent to drop
* @return If this node is also a {@link LivenessReferent}, whether this node was live and thus in fact tried to
Expand Down

0 comments on commit c210e48

Please sign in to comment.