From 02127ae8c5278c1a1a4387bd5bf66620c8378e59 Mon Sep 17 00:00:00 2001 From: Ryan Caudy Date: Wed, 13 Nov 2024 13:07:01 -0500 Subject: [PATCH] fix: Address a number of liveness-related issues (#6366) * Clarify liveness requirements for interacting with TableLocationProvider * Address liveness guarantees for LiveSuppliers in AbstractTableLocationProvider * Address transient liveness errors in static SourcePartitionedTables for the UnderlyingTableMaintainer the underlying table itself * Ensure that WeakCleanupReferences returned by the underlying LivenessReferent of a DelegatingLivenessReferent are held by the delegating implementation's getWeakReference() result * Clean up a few instances where we invoke super.destroy() in a different order than the usual from ReferenceCountedLivenessNode subclasses * Annotate ReferenceCountedLivenessNode.destroy() with @OverridingMethodsMustInvokeSuper, and ensure that this is followed in all but one documented exceptional case * RetainedReferenceTracker should only do immediate CleanupReference.cleanup() for its own (idempotent) implementation * Rename RetainedReferenceTracker.DropState to be less ambiguous * Address reachability during deferred drop for LivenessReferents retained by a RetainedReferenceTracker with enforceStrongReachability=true * Remove a few redundant LivenessScopes in QueryTableAjTest * Preserve reachability at time of enqueue when not done via cleanup. Use enforceStrongReachability more places in the testing framework and QueryTableAjTest to avoid future issues. * Separate enqueueing references to be dropped (now done before destroy) from dropping enqueued references * Make LivenessScopeStack play better with other LivenessManager implementations, not just LivenessScope --- .../impl/AddOnlyToBlinkTableAdapter.java | 4 +- .../engine/table/impl/AsOfJoinHelper.java | 3 + .../engine/table/impl/BaseTable.java | 4 + ...nstrumentedTableUpdateListenerAdapter.java | 3 + .../engine/table/impl/ListenerRecorder.java | 3 + .../engine/table/impl/MergedListener.java | 5 +- ...tObliviousInstrumentedListenerAdapter.java | 3 + .../table/impl/SourcePartitionedTable.java | 23 +- .../engine/table/impl/SourceTable.java | 4 + .../engine/table/impl/TimeTable.java | 2 + .../engine/table/impl/UpdatableTable.java | 2 + .../table/impl/UpdateSourceQueryTable.java | 2 + .../BucketedChunkedAjMergedListener.java | 3 + .../hierarchical/HierarchicalTableImpl.java | 2 + .../impl/locations/TableLocationProvider.java | 14 +- .../locations/impl/AbstractTableLocation.java | 3 + .../impl/AbstractTableLocationProvider.java | 22 +- .../select/BaseIncrementalReleaseFilter.java | 2 + .../engine/table/impl/select/ClockFilter.java | 2 + .../impl/select/RollingReleaseFilter.java | 2 + .../util/FunctionGeneratedTableFactory.java | 2 + .../impl/util/HashSetBackedTableFactory.java | 2 + .../engine/util/AbstractScriptSession.java | 2 + .../io/deephaven/engine/util/WindowCheck.java | 2 + .../stream/StreamToBlinkTableAdapter.java | 3 + .../engine/table/impl/QueryTableAjTest.java | 242 ++++++++-------- .../impl/TestPartitionAwareSourceTable.java | 2 + .../engine/testutil/EvalNuggetSet.java | 3 +- .../deephaven/engine/testutil/TstUtils.java | 3 +- .../liveness/DelegatingLivenessReferent.java | 11 +- .../engine/liveness/LivenessScopeStack.java | 28 +- .../ReferenceCountedLivenessNode.java | 6 +- .../ReferenceCountedLivenessReferent.java | 2 + .../liveness/RetainedReferenceTracker.java | 274 +++++++++++------- .../updategraph/UpdateSourceCombiner.java | 2 + .../barrage/table/BarrageTable.java | 2 + .../kafka/publish/PublishToKafka.java | 2 + .../client/examples/SubscribeExampleBase.java | 2 + .../client/impl/BarrageSnapshotImpl.java | 2 + .../client/impl/BarrageSubscriptionImpl.java | 2 + .../barrage/BarrageMessageProducer.java | 3 + .../HierarchicalTableViewSubscription.java | 2 + .../plugin/python/LivePyObjectWrapper.java | 3 + .../server/session/SessionState.java | 2 + .../table/ExportedTableUpdateListener.java | 3 + 45 files changed, 458 insertions(+), 257 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/AddOnlyToBlinkTableAdapter.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/AddOnlyToBlinkTableAdapter.java index 70d13d47142..cee9a57a23a 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/AddOnlyToBlinkTableAdapter.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/AddOnlyToBlinkTableAdapter.java @@ -14,6 +14,7 @@ import org.apache.commons.lang3.mutable.MutableObject; import org.jetbrains.annotations.NotNull; +import javax.annotation.OverridingMethodsMustInvokeSuper; import java.util.List; import java.util.Map; @@ -121,10 +122,11 @@ public void run() { } } + @OverridingMethodsMustInvokeSuper @Override protected void destroy() { - getUpdateGraph().removeSource(this); super.destroy(); + getUpdateGraph().removeSource(this); } } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/AsOfJoinHelper.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/AsOfJoinHelper.java index b301024da74..92ee2074d11 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/AsOfJoinHelper.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/AsOfJoinHelper.java @@ -38,6 +38,7 @@ import io.deephaven.util.SafeCloseableList; import org.jetbrains.annotations.NotNull; +import javax.annotation.OverridingMethodsMustInvokeSuper; import java.util.*; import java.util.function.Function; import java.util.function.Supplier; @@ -1335,6 +1336,7 @@ public void onUpdate(TableUpdate upstream) { result.notifyListeners(downstream); } + @OverridingMethodsMustInvokeSuper @Override protected void destroy() { super.destroy(); @@ -1516,6 +1518,7 @@ public void onUpdate(TableUpdate upstream) { } } + @OverridingMethodsMustInvokeSuper @Override protected void destroy() { super.destroy(); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/BaseTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/BaseTable.java index a66ac84e0f9..e0a1e4d1102 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/BaseTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/BaseTable.java @@ -44,6 +44,7 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import javax.annotation.OverridingMethodsMustInvokeSuper; import java.io.*; import java.util.*; import java.util.concurrent.TimeUnit; @@ -923,6 +924,7 @@ public boolean canExecute(final long step) { return parent.satisfied(step); } + @OverridingMethodsMustInvokeSuper @Override protected void destroy() { super.destroy(); @@ -980,6 +982,7 @@ public boolean canExecute(final long step) { return parent.satisfied(step); } + @OverridingMethodsMustInvokeSuper @Override protected void destroy() { super.destroy(); @@ -1321,6 +1324,7 @@ public T createSnapshotControlIfRefreshing( // Reference Counting // ------------------------------------------------------------------------------------------------------------------ + @OverridingMethodsMustInvokeSuper @Override protected void destroy() { super.destroy(); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/InstrumentedTableUpdateListenerAdapter.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/InstrumentedTableUpdateListenerAdapter.java index 98def70706e..91a18e9f2d2 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/InstrumentedTableUpdateListenerAdapter.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/InstrumentedTableUpdateListenerAdapter.java @@ -17,6 +17,7 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import javax.annotation.OverridingMethodsMustInvokeSuper; import java.io.IOException; /** @@ -92,8 +93,10 @@ public boolean canExecute(final long step) { return source.satisfied(step); } + @OverridingMethodsMustInvokeSuper @Override protected void destroy() { + super.destroy(); source.removeUpdateListener(this); if (retain) { RETENTION_CACHE.forget(this); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/ListenerRecorder.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/ListenerRecorder.java index f0f5bcb1b22..84ac02ec539 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/ListenerRecorder.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/ListenerRecorder.java @@ -14,6 +14,8 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import javax.annotation.OverridingMethodsMustInvokeSuper; + /** * A listener recorder stores references to added, removed, modified, and shifted indices; and then notifies a * {@link MergedListener} that a change has occurred. The combination of a {@link ListenerRecorder} and @@ -90,6 +92,7 @@ public boolean canExecute(final long step) { return parent.satisfied(step); } + @OverridingMethodsMustInvokeSuper @Override protected void destroy() { super.destroy(); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/MergedListener.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/MergedListener.java index 6b25ddfbce0..b4a2462f219 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/MergedListener.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/MergedListener.java @@ -24,6 +24,7 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import javax.annotation.OverridingMethodsMustInvokeSuper; import java.io.IOException; import java.lang.ref.WeakReference; import java.util.Collection; @@ -174,11 +175,13 @@ protected void propagateError( } protected boolean systemicResult() { - return result == null ? false : SystemicObjectTracker.isSystemic(result); + return result != null && SystemicObjectTracker.isSystemic(result); } + @OverridingMethodsMustInvokeSuper @Override protected void destroy() { + super.destroy(); recorders.forEach(ListenerRecorder::forceReferenceCountToZero); } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/ShiftObliviousInstrumentedListenerAdapter.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/ShiftObliviousInstrumentedListenerAdapter.java index 210af570042..fc94a599b97 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/ShiftObliviousInstrumentedListenerAdapter.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/ShiftObliviousInstrumentedListenerAdapter.java @@ -16,6 +16,7 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import javax.annotation.OverridingMethodsMustInvokeSuper; import java.io.IOException; /** @@ -90,8 +91,10 @@ public boolean canExecute(final long step) { return source.satisfied(step); } + @OverridingMethodsMustInvokeSuper @Override protected void destroy() { + super.destroy(); source.removeUpdateListener(this); if (retain) { RETENTION_CACHE.forget(this); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/SourcePartitionedTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/SourcePartitionedTable.java index 3be41bb5109..77e0c5ca2c4 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/SourcePartitionedTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/SourcePartitionedTable.java @@ -174,11 +174,20 @@ protected void instrumentedRefresh() { tableLocationProvider.refresh(); final Collection locations = new ArrayList<>(); - tableLocationProvider.getTableLocationKeys( - tlk -> locations.add(tableLocationProvider.getTableLocation(tlk.get())), - locationKeyMatcher); - try (final RowSet added = sortAndAddLocations(locations.stream())) { - resultRows.insert(added); + try { + retainReference(); + tableLocationProvider.getTableLocationKeys( + (final LiveSupplier lstlk) -> { + final TableLocation tableLocation = tableLocationProvider.getTableLocation(lstlk.get()); + manage(tableLocation); + locations.add(tableLocation); + }, + locationKeyMatcher); + try (final RowSet added = sortAndAddLocations(locations.stream())) { + resultRows.insert(added); + } + } finally { + dropReference(); } } @@ -204,7 +213,9 @@ private RowSet sortAndAddLocations(@NotNull final Stream location resultLocationTables.ensureCapacity(constituentRowKey + 1); resultLocationTables.set(constituentRowKey, constituentTable); - result.manage(constituentTable); + if (result.isRefreshing()) { + result.manage(constituentTable); + } }); return initialLastRowKey == lastInsertedRowKey.get() ? RowSetFactory.empty() diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/SourceTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/SourceTable.java index bab3e560b05..88dd71fc12f 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/SourceTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/SourceTable.java @@ -25,6 +25,7 @@ import org.apache.commons.lang3.mutable.MutableObject; import org.jetbrains.annotations.NotNull; +import javax.annotation.OverridingMethodsMustInvokeSuper; import java.util.ArrayList; import java.util.Collection; @@ -310,9 +311,11 @@ public boolean subscribeForUpdates(@NotNull final TableUpdateListener listener) } if (snapshotControl != null) { + // noinspection MethodDoesntCallSuperMethod final ListenerImpl listener = new ListenerImpl("SourceTable.coalesce", this, resultTable) { + @OverridingMethodsMustInvokeSuper @Override protected void destroy() { // This impl cannot call super.destroy() because we must unsubscribe from the actual @@ -330,6 +333,7 @@ protected void destroy() { return result.getValue(); } + @OverridingMethodsMustInvokeSuper @Override protected void destroy() { super.destroy(); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/TimeTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/TimeTable.java index 0c78ff2ff21..44229f6adcd 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/TimeTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/TimeTable.java @@ -29,6 +29,7 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import javax.annotation.OverridingMethodsMustInvokeSuper; import java.time.Duration; import java.time.Instant; import java.util.Collections; @@ -215,6 +216,7 @@ private void refresh(final boolean notifyListeners) { } } + @OverridingMethodsMustInvokeSuper @Override protected void destroy() { super.destroy(); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/UpdatableTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/UpdatableTable.java index aee39b19b48..eef937f0542 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/UpdatableTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/UpdatableTable.java @@ -13,6 +13,7 @@ import gnu.trove.set.hash.TLongHashSet; import org.jetbrains.annotations.NotNull; +import javax.annotation.OverridingMethodsMustInvokeSuper; import java.util.Map; import java.util.function.Consumer; @@ -144,6 +145,7 @@ protected void doNotifyListeners(TableUpdate update) { notifyListeners(update); } + @OverridingMethodsMustInvokeSuper @Override public void destroy() { super.destroy(); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/UpdateSourceQueryTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/UpdateSourceQueryTable.java index f4ea3c8f10c..a25d1817221 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/UpdateSourceQueryTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/UpdateSourceQueryTable.java @@ -10,6 +10,7 @@ import io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph; import io.deephaven.engine.table.ColumnSource; +import javax.annotation.OverridingMethodsMustInvokeSuper; import java.util.Map; /** @@ -47,6 +48,7 @@ public synchronized void addRowKeyRange(final long firstRowKey, final long lastR additionsBuilder.addRange(firstRowKey, lastRowKey); } + @OverridingMethodsMustInvokeSuper @Override public void destroy() { super.destroy(); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/asofjoin/BucketedChunkedAjMergedListener.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/asofjoin/BucketedChunkedAjMergedListener.java index a01dd2442b9..8496a578178 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/asofjoin/BucketedChunkedAjMergedListener.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/asofjoin/BucketedChunkedAjMergedListener.java @@ -32,6 +32,7 @@ import io.deephaven.util.SafeCloseable; import org.apache.commons.lang3.mutable.MutableObject; +import javax.annotation.OverridingMethodsMustInvokeSuper; import java.util.Arrays; import java.util.Collections; @@ -786,8 +787,10 @@ private RowSet indexFromBuilder(int slotIndex) { return rowSet; } + @OverridingMethodsMustInvokeSuper @Override protected void destroy() { + super.destroy(); leftSsaFactory.close(); rightSsaFactory.close(); } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/hierarchical/HierarchicalTableImpl.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/hierarchical/HierarchicalTableImpl.java index 052168e40ab..8e95c6c3eb3 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/hierarchical/HierarchicalTableImpl.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/hierarchical/HierarchicalTableImpl.java @@ -37,6 +37,7 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import javax.annotation.OverridingMethodsMustInvokeSuper; import java.util.*; import java.util.function.Function; import java.util.function.LongUnaryOperator; @@ -388,6 +389,7 @@ private void releaseSnapshotResources() { perLevelSharedContexts.clear(); } + @OverridingMethodsMustInvokeSuper @Override protected void destroy() { super.destroy(); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/TableLocationProvider.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/TableLocationProvider.java index b6d7c94439e..55751c65901 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/TableLocationProvider.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/TableLocationProvider.java @@ -133,7 +133,7 @@ default void handleTableLocationKeysUpdate( /** * Get this provider's currently known location keys. The locations specified by the keys returned may have null * size - that is, they may not "exist" for application purposes. {@link #getTableLocation(TableLocationKey)} is - * guaranteed to succeed for all results. + * guaranteed to succeed for all results as long as the associated {@link LiveSupplier} is retained by the caller. */ @TestUseOnly default Collection getTableLocationKeys() { @@ -145,7 +145,7 @@ default Collection getTableLocationKeys() { /** * Get this provider's currently known location keys. The locations specified by the keys returned may have null * size - that is, they may not "exist" for application purposes. {@link #getTableLocation(TableLocationKey)} is - * guaranteed to succeed for all results. + * guaranteed to succeed for all results as long as the associated {@link LiveSupplier} is retained by the caller. * * @param consumer A consumer to receive the location keys */ @@ -156,7 +156,7 @@ default void getTableLocationKeys(Consumer[] leftColumnInfo; - final int leftSize = 32000; - final int rightSize = 32000; - final QueryTable leftTable = getTable(true, 100000, random, - leftColumnInfo = initColumnInfos(new String[] {"Bucket", "LeftStamp", "LeftSentinel"}, - new StringGenerator(leftSize), - new IntGenerator(0, 100000), - new IntGenerator(10_000_000, 10_010_000))); - final ColumnInfo[] rightColumnInfo; - final QueryTable rightTable = getTable(true, 100000, random, - rightColumnInfo = initColumnInfos(new String[] {"Bucket", "RightStamp", "RightSentinel"}, - new StringGenerator(leftSize), - new SortedIntGenerator(0, 100000), - new IntGenerator(20_000_000, 20_010_000))); - - final EvalNuggetInterface[] en = new EvalNuggetInterface[] { - new EvalNugget() { - @Override - protected Table e() { - return AsOfJoinHelper.asOfJoin(QueryTableJoinTest.SMALL_RIGHT_CONTROL, - (QueryTable) leftTable.sort("LeftStamp"), rightTable, - MatchPairFactory.getExpressions("Bucket", "LeftStamp=RightStamp"), - MatchPairFactory.getExpressions("RightSentinel"), SortingOrder.Ascending, false); - } - }, - }; - - for (int step = 0; step < maxSteps; step++) { - System.out.println("Step = " + step + ", leftSize=" + leftSize + ", rightSize=" + rightSize - + ", seed = " + seed + ", joinIncrement=" + joinIncrement); - if (RefreshingTableTestCase.printTableUpdates) { - System.out.println("Left Table:" + leftTable.size()); - TableTools.showWithRowSet(leftTable, 100); - System.out.println("Right Table:" + rightTable.size()); - TableTools.showWithRowSet(rightTable, 100); - } - joinIncrement.step(leftSize, rightSize, leftTable, rightTable, leftColumnInfo, rightColumnInfo, en, - random); + final JoinIncrement joinIncrement = base.leftRightStep; + final int seed = 0; + final Random random = new Random(seed); + final int maxSteps = 5; + + final ColumnInfo[] leftColumnInfo; + final int leftSize = 32000; + final int rightSize = 32000; + final QueryTable leftTable = getTable(true, 100000, random, + leftColumnInfo = initColumnInfos(new String[] {"Bucket", "LeftStamp", "LeftSentinel"}, + new StringGenerator(leftSize), + new IntGenerator(0, 100000), + new IntGenerator(10_000_000, 10_010_000))); + final ColumnInfo[] rightColumnInfo; + final QueryTable rightTable = getTable(true, 100000, random, + rightColumnInfo = initColumnInfos(new String[] {"Bucket", "RightStamp", "RightSentinel"}, + new StringGenerator(leftSize), + new SortedIntGenerator(0, 100000), + new IntGenerator(20_000_000, 20_010_000))); + + final EvalNuggetInterface[] en = new EvalNuggetInterface[] { + new EvalNugget() { + @Override + protected Table e() { + return AsOfJoinHelper.asOfJoin(QueryTableJoinTest.SMALL_RIGHT_CONTROL, + (QueryTable) leftTable.sort("LeftStamp"), rightTable, + MatchPairFactory.getExpressions("Bucket", "LeftStamp=RightStamp"), + MatchPairFactory.getExpressions("RightSentinel"), SortingOrder.Ascending, false); + } + }, + }; + + for (int step = 0; step < maxSteps; step++) { + System.out.println("Step = " + step + ", leftSize=" + leftSize + ", rightSize=" + rightSize + + ", seed = " + seed + ", joinIncrement=" + joinIncrement); + if (RefreshingTableTestCase.printTableUpdates) { + System.out.println("Left Table:" + leftTable.size()); + TableTools.showWithRowSet(leftTable, 100); + System.out.println("Right Table:" + rightTable.size()); + TableTools.showWithRowSet(rightTable, 100); } + joinIncrement.step(leftSize, rightSize, leftTable, rightTable, leftColumnInfo, rightColumnInfo, en, + random); } } @@ -1642,37 +1644,35 @@ protected Table e() { */ @Test public void testDHC3080() { - try (final SafeCloseable ignored = LivenessScopeStack.open()) { - final int seed = 0; - final Random random = new Random(seed); - - final int leftSize = 32000; - - // fairly small LHS will speed up detection of the error but will not affect correctness - final QueryTable leftTable = getTable(true, 1000, random, - initColumnInfos(new String[] {"Bucket", "LeftStamp", "LeftSentinel"}, - new StringGenerator(leftSize), - new IntGenerator(0, 100000), - new IntGenerator(10_000_000, 10_010_000))); - - // need RHS with unique bucket count > rehash threshold of 4096 - final QueryTable rightTable = getTable(true, 32000, random, - initColumnInfos(new String[] {"Bucket", "RightStamp", "RightSentinel"}, - new StringGenerator(leftSize), - new SortedIntGenerator(0, 100000), - new IntGenerator(20_000_000, 20_010_000))); - - final Table result = AsOfJoinHelper.asOfJoin(QueryTableJoinTest.SMALL_LEFT_CONTROL, leftTable, - (QueryTable) rightTable.reverse(), - MatchPairFactory.getExpressions("Bucket", "LeftStamp=RightStamp"), - MatchPairFactory.getExpressions("RightStamp", "RightSentinel"), SortingOrder.Descending, true); - - // force compare results of the bucketed output, we cannot compare static to incremental as in other tests - // because static will experience the same error when performing `rehashInternalFull()` - checkAjResults(result.partitionBy("Bucket"), leftTable.partitionBy("Bucket"), - rightTable.partitionBy("Bucket"), - true, true); - } + final int seed = 0; + final Random random = new Random(seed); + + final int leftSize = 32000; + + // fairly small LHS will speed up detection of the error but will not affect correctness + final QueryTable leftTable = getTable(true, 1000, random, + initColumnInfos(new String[] {"Bucket", "LeftStamp", "LeftSentinel"}, + new StringGenerator(leftSize), + new IntGenerator(0, 100000), + new IntGenerator(10_000_000, 10_010_000))); + + // need RHS with unique bucket count > rehash threshold of 4096 + final QueryTable rightTable = getTable(true, 32000, random, + initColumnInfos(new String[] {"Bucket", "RightStamp", "RightSentinel"}, + new StringGenerator(leftSize), + new SortedIntGenerator(0, 100000), + new IntGenerator(20_000_000, 20_010_000))); + + final Table result = AsOfJoinHelper.asOfJoin(QueryTableJoinTest.SMALL_LEFT_CONTROL, leftTable, + (QueryTable) rightTable.reverse(), + MatchPairFactory.getExpressions("Bucket", "LeftStamp=RightStamp"), + MatchPairFactory.getExpressions("RightStamp", "RightSentinel"), SortingOrder.Descending, true); + + // force compare results of the bucketed output, we cannot compare static to incremental as in other tests + // because static will experience the same error when performing `rehashInternalFull()` + checkAjResults(result.partitionBy("Bucket"), leftTable.partitionBy("Bucket"), + rightTable.partitionBy("Bucket"), + true, true); } /** @@ -1682,45 +1682,43 @@ public void testDHC3080() { */ @Test public void testDHC4700() { - try (final SafeCloseable ignored = LivenessScopeStack.open()) { - final int seed = 0; - final Random random = new Random(seed); - - final ColumnInfo[] leftColumnInfo; - final ColumnInfo[] rightColumnInfo; - - // Small initial tables. - final int leftSize = 2; - final int rightSize = 2; - final QueryTable leftTable = getTable(true, leftSize, random, - leftColumnInfo = initColumnInfos(new String[] {"Bucket", "LeftStamp", "LeftSentinel"}, - new StringGenerator(100_000), - new IntGenerator(0, 100_000), - new IntGenerator(10_000_000, 10_010_000))); - final QueryTable rightTable = getTable(true, rightSize, random, - rightColumnInfo = initColumnInfos(new String[] {"Bucket", "RightStamp", "RightSentinel"}, - new StringGenerator(100_000), - new SortedIntGenerator(0, 100_000), - new IntGenerator(20_000_000, 20_010_000))); - - final Table result = AsOfJoinHelper.asOfJoin(QueryTableJoinTest.SMALL_LEFT_CONTROL, leftTable, - (QueryTable) rightTable.reverse(), - MatchPairFactory.getExpressions("Bucket", "LeftStamp=RightStamp"), - MatchPairFactory.getExpressions("RightStamp", "RightSentinel"), SortingOrder.Descending, true); + final int seed = 0; + final Random random = new Random(seed); - final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); - updateGraph.runWithinUnitTestCycle(() -> { - // Large updates to force a partial rehash. - GenerateTableUpdates.generateShiftAwareTableUpdates(GenerateTableUpdates.DEFAULT_PROFILE, 100_000, - random, leftTable, leftColumnInfo); - GenerateTableUpdates.generateShiftAwareTableUpdates(GenerateTableUpdates.DEFAULT_PROFILE, 100_000, - random, rightTable, rightColumnInfo); - }); + final ColumnInfo[] leftColumnInfo; + final ColumnInfo[] rightColumnInfo; - // Compare results of the bucketed output. - checkAjResults(result.partitionBy("Bucket"), leftTable.partitionBy("Bucket"), - rightTable.partitionBy("Bucket"), - true, true); - } + // Small initial tables. + final int leftSize = 2; + final int rightSize = 2; + final QueryTable leftTable = getTable(true, leftSize, random, + leftColumnInfo = initColumnInfos(new String[] {"Bucket", "LeftStamp", "LeftSentinel"}, + new StringGenerator(100_000), + new IntGenerator(0, 100_000), + new IntGenerator(10_000_000, 10_010_000))); + final QueryTable rightTable = getTable(true, rightSize, random, + rightColumnInfo = initColumnInfos(new String[] {"Bucket", "RightStamp", "RightSentinel"}, + new StringGenerator(100_000), + new SortedIntGenerator(0, 100_000), + new IntGenerator(20_000_000, 20_010_000))); + + final Table result = AsOfJoinHelper.asOfJoin(QueryTableJoinTest.SMALL_LEFT_CONTROL, leftTable, + (QueryTable) rightTable.reverse(), + MatchPairFactory.getExpressions("Bucket", "LeftStamp=RightStamp"), + MatchPairFactory.getExpressions("RightStamp", "RightSentinel"), SortingOrder.Descending, true); + + final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); + updateGraph.runWithinUnitTestCycle(() -> { + // Large updates to force a partial rehash. + GenerateTableUpdates.generateShiftAwareTableUpdates(GenerateTableUpdates.DEFAULT_PROFILE, 100_000, + random, leftTable, leftColumnInfo); + GenerateTableUpdates.generateShiftAwareTableUpdates(GenerateTableUpdates.DEFAULT_PROFILE, 100_000, + random, rightTable, rightColumnInfo); + }); + + // Compare results of the bucketed output. + checkAjResults(result.partitionBy("Bucket"), leftTable.partitionBy("Bucket"), + rightTable.partitionBy("Bucket"), + true, true); } } diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/TestPartitionAwareSourceTable.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/TestPartitionAwareSourceTable.java index 9446b49be8a..a755e0253b5 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/TestPartitionAwareSourceTable.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/TestPartitionAwareSourceTable.java @@ -34,6 +34,7 @@ import org.junit.Before; import org.junit.Test; +import javax.annotation.OverridingMethodsMustInvokeSuper; import java.lang.ref.WeakReference; import java.util.*; import java.util.stream.Collectors; @@ -72,6 +73,7 @@ public synchronized void setTableLocation(final TableLocation tableLocation) { this.tableLocation = tableLocation; } + @OverridingMethodsMustInvokeSuper @Override protected synchronized void destroy() { super.destroy(); diff --git a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/EvalNuggetSet.java b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/EvalNuggetSet.java index ceea8e134ad..c49ce7cfd95 100644 --- a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/EvalNuggetSet.java +++ b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/EvalNuggetSet.java @@ -3,6 +3,7 @@ // package io.deephaven.engine.testutil; +import io.deephaven.engine.liveness.LivenessScope; import io.deephaven.engine.liveness.LivenessScopeStack; import io.deephaven.engine.table.Table; import io.deephaven.engine.util.TableTools; @@ -21,7 +22,7 @@ public EvalNuggetSet(String description) { @Override public void validate(final String msg) { - try (final SafeCloseable ignored = LivenessScopeStack.open()) { + try (final SafeCloseable ignored = LivenessScopeStack.open(new LivenessScope(true), true)) { final Table expected = e(); try { TableTools.show(expected); diff --git a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/TstUtils.java b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/TstUtils.java index 4975d53fc13..9518badf6af 100644 --- a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/TstUtils.java +++ b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/TstUtils.java @@ -12,6 +12,7 @@ import io.deephaven.chunk.WritableLongChunk; import io.deephaven.chunk.attributes.Values; import io.deephaven.configuration.Configuration; +import io.deephaven.engine.liveness.LivenessScope; import io.deephaven.engine.liveness.LivenessScopeStack; import io.deephaven.engine.liveness.LivenessStateException; import io.deephaven.engine.rowset.*; @@ -430,7 +431,7 @@ public static void validate(final String ctxt, final EvalNuggetInterface[] en) { System.out.println("================ NEXT ITERATION ================"); } for (int i = 0; i < en.length; i++) { - try (final SafeCloseable ignored = LivenessScopeStack.open()) { + try (final SafeCloseable ignored = LivenessScopeStack.open(new LivenessScope(true), true)) { if (RefreshingTableTestCase.printTableUpdates) { if (i != 0) { System.out.println("================ NUGGET (" + i + ") ================"); diff --git a/engine/updategraph/src/main/java/io/deephaven/engine/liveness/DelegatingLivenessReferent.java b/engine/updategraph/src/main/java/io/deephaven/engine/liveness/DelegatingLivenessReferent.java index 71a861f74be..6db44a3f860 100644 --- a/engine/updategraph/src/main/java/io/deephaven/engine/liveness/DelegatingLivenessReferent.java +++ b/engine/updategraph/src/main/java/io/deephaven/engine/liveness/DelegatingLivenessReferent.java @@ -3,6 +3,8 @@ // package io.deephaven.engine.liveness; +import io.deephaven.util.annotations.ReferentialIntegrity; + import java.lang.ref.WeakReference; /** @@ -32,6 +34,13 @@ default void dropReference() { @Override default WeakReference getWeakReference() { // Must return a WeakReference to the DelegatingLivenessReferent, not the underlying LivenessReferent - return new WeakReference<>(this); + return new WeakReference<>(this) { + /** + * Hold this reference to ensure that any cleanup that requires the strong reachability of our underlying + * LivenessReferent's WeakReference happens. + */ + @ReferentialIntegrity + private final WeakReference delegate = asLivenessReferent().getWeakReference(); + }; } } diff --git a/engine/updategraph/src/main/java/io/deephaven/engine/liveness/LivenessScopeStack.java b/engine/updategraph/src/main/java/io/deephaven/engine/liveness/LivenessScopeStack.java index 042738df654..fd3f85cef04 100644 --- a/engine/updategraph/src/main/java/io/deephaven/engine/liveness/LivenessScopeStack.java +++ b/engine/updategraph/src/main/java/io/deephaven/engine/liveness/LivenessScopeStack.java @@ -28,7 +28,7 @@ public class LivenessScopeStack { private static final ThreadLocal THREAD_BASE_MANAGER = ThreadLocal.withInitial(PermanentLivenessManager::new); - private final Deque stack = new ArrayDeque<>(); + private final Deque stack = new ArrayDeque<>(); private LivenessScopeStack() {} @@ -38,7 +38,7 @@ private LivenessScopeStack() {} * * @param scope The scope */ - public static void push(@NotNull final LivenessScope scope) { + public static void push(@NotNull final LivenessManager scope) { THREAD_STACK.get().pushInternal(scope); } @@ -50,7 +50,7 @@ public static void push(@NotNull final LivenessScope scope) { * * @param scope The scope */ - public static void pop(@NotNull final LivenessScope scope) { + public static void pop(@NotNull final LivenessManager scope) { THREAD_STACK.get().popInternal(scope); } @@ -76,11 +76,11 @@ public static LivenessManager peek() { * * @param scope The scope * @param releaseOnClose Whether the scope should be released when the result is closed - * @return A {@link SafeCloseable} whose {@link SafeCloseable#close()} method invokes {@link #pop(LivenessScope)} + * @return A {@link SafeCloseable} whose {@link SafeCloseable#close()} method invokes {@link #pop(LivenessManager)} * for the scope (followed by {@link LivenessScope#release()} if releaseOnClose is true) */ @NotNull - public static SafeCloseable open(@NotNull final LivenessScope scope, final boolean releaseOnClose) { + public static SafeCloseable open(@NotNull final ReleasableLivenessManager scope, final boolean releaseOnClose) { push(scope); return releaseOnClose ? new PopAndReleaseOnClose(scope) : new PopOnClose(scope); } @@ -93,7 +93,7 @@ public static SafeCloseable open(@NotNull final LivenessScope scope, final boole * This is useful enclosing a series of query engine actions whose results must be explicitly retained externally in * order to preserve liveness. * - * @return A {@link SafeCloseable} whose {@link SafeCloseable#close()} method invokes {@link #pop(LivenessScope)} + * @return A {@link SafeCloseable} whose {@link SafeCloseable#close()} method invokes {@link #pop(LivenessManager)} * for the scope, followed by {@link LivenessScope#release()} */ @NotNull @@ -103,18 +103,18 @@ public static SafeCloseable open() { return new PopAndReleaseOnClose(scope); } - private void pushInternal(@NotNull final LivenessScope scope) { + private void pushInternal(@NotNull final LivenessManager scope) { if (Liveness.DEBUG_MODE_ENABLED) { Liveness.log.info().append("LivenessDebug: Pushing scope ").append(Utils.REFERENT_FORMATTER, scope).endl(); } stack.push(scope); } - private void popInternal(@NotNull final LivenessScope scope) { + private void popInternal(@NotNull final LivenessManager scope) { if (Liveness.DEBUG_MODE_ENABLED) { Liveness.log.info().append("LivenessDebug: Popping scope ").append(Utils.REFERENT_FORMATTER, scope).endl(); } - final LivenessScope peeked = stack.peekFirst(); + final LivenessManager peeked = stack.peekFirst(); if (peeked != scope) { throw new IllegalStateException( "Caller requested to pop " + scope + " but the top of the scope stack is " + peeked); @@ -124,15 +124,15 @@ private void popInternal(@NotNull final LivenessScope scope) { @NotNull private LivenessManager peekInternal() { - final LivenessScope peeked = stack.peekFirst(); + final LivenessManager peeked = stack.peekFirst(); return peeked != null ? peeked : THREAD_BASE_MANAGER.get(); } private static final class PopOnClose implements SafeCloseable { - private final LivenessScope scope; + private final LivenessManager scope; - private PopOnClose(@NotNull final LivenessScope scope) { + private PopOnClose(@NotNull final LivenessManager scope) { this.scope = scope; } @@ -144,9 +144,9 @@ public void close() { private static final class PopAndReleaseOnClose implements SafeCloseable { - private final LivenessScope scope; + private final ReleasableLivenessManager scope; - private PopAndReleaseOnClose(@NotNull final LivenessScope scope) { + private PopAndReleaseOnClose(@NotNull final ReleasableLivenessManager scope) { this.scope = scope; } diff --git a/engine/updategraph/src/main/java/io/deephaven/engine/liveness/ReferenceCountedLivenessNode.java b/engine/updategraph/src/main/java/io/deephaven/engine/liveness/ReferenceCountedLivenessNode.java index 450a8a466b5..1625c522cca 100644 --- a/engine/updategraph/src/main/java/io/deephaven/engine/liveness/ReferenceCountedLivenessNode.java +++ b/engine/updategraph/src/main/java/io/deephaven/engine/liveness/ReferenceCountedLivenessNode.java @@ -3,6 +3,7 @@ // package io.deephaven.engine.liveness; +import io.deephaven.util.SafeCloseable; import io.deephaven.util.Utils; import io.deephaven.util.annotations.VisibleForTesting; import org.jetbrains.annotations.NotNull; @@ -107,7 +108,8 @@ public final boolean tryUnmanage(@NotNull final Stream getWeakReference() { *

* This is intended to only ever be used as a side effect of decreasing the reference count to 0. */ + @OverridingMethodsMustInvokeSuper protected void destroy() {} @Override diff --git a/engine/updategraph/src/main/java/io/deephaven/engine/liveness/RetainedReferenceTracker.java b/engine/updategraph/src/main/java/io/deephaven/engine/liveness/RetainedReferenceTracker.java index 9104338fefd..67a849aeb30 100644 --- a/engine/updategraph/src/main/java/io/deephaven/engine/liveness/RetainedReferenceTracker.java +++ b/engine/updategraph/src/main/java/io/deephaven/engine/liveness/RetainedReferenceTracker.java @@ -4,13 +4,13 @@ package io.deephaven.engine.liveness; import io.deephaven.base.cache.RetentionCache; -import io.deephaven.base.reference.CleanupReference; import io.deephaven.base.reference.WeakCleanupReference; import io.deephaven.engine.util.reference.CleanupReferenceProcessorInstance; import io.deephaven.hash.KeyedObjectHashMap; import io.deephaven.hash.KeyedObjectKey; import io.deephaven.internal.log.LoggerFactory; import io.deephaven.io.logger.Logger; +import io.deephaven.util.SafeCloseable; import io.deephaven.util.Utils; import io.deephaven.util.datastructures.hash.KeyIdentityKeyedObjectKey; import org.jetbrains.annotations.NotNull; @@ -30,7 +30,7 @@ *

* This cleanup process is initiated one of two ways: *

    - *
  1. The manager invokes it directly via {@link #ensureReferencesDropped()} because it is releasing all of its + *
  2. The manager invokes it directly via {@link #enqueueReferencesForDrop()} because it is releasing all of its * retained references.
  3. *
  4. A {@link io.deephaven.util.reference.CleanupReferenceProcessor} or similar code invokes {@link #cleanup()} after * the manager is garbage-collected.
  5. @@ -46,10 +46,8 @@ final class RetainedReferenceTracker extends WeakC private static final AtomicInteger outstandingCount = new AtomicInteger(0); - private static final ThreadLocal>> tlPendingDropReferences = - new ThreadLocal<>(); - private static final ThreadLocal>>> tlSavedQueueReference = - new ThreadLocal<>(); + private static final ThreadLocal tlPendingDropsTracker = new ThreadLocal<>(); + private static final ThreadLocal> tlSavedTrackerReference = new ThreadLocal<>(); private static final Logger log = LoggerFactory.getLogger(RetainedReferenceTracker.class); @@ -83,11 +81,11 @@ public String toString() { /** * Add a {@link LivenessReferent} to drop a reference to on {@link #cleanup()} or - * {@link #ensureReferencesDropped()}. This is not permitted if {@link #cleanup()} or - * {@link #ensureReferencesDropped()} has already been invoked. + * {@link #enqueueReferencesForDrop()}. This is not permitted if {@link #cleanup()} or + * {@link #enqueueReferencesForDrop()} has already been invoked. * * @param referent The referent to drop on cleanup - * @throws LivenessStateException If {@link #cleanup()} or {@link #ensureReferencesDropped()} has already been + * @throws LivenessStateException If {@link #cleanup()} or {@link #enqueueReferencesForDrop()} has already been * invoked */ synchronized void addReference(@NotNull final LivenessReferent referent) throws LivenessStateException { @@ -98,12 +96,12 @@ synchronized void addReference(@NotNull final LivenessReferent referent) throws /** *

    * Remove at most one existing reference to referent from this tracker, so that it will no longer be dropped on - * {@link #cleanup()} or {@link #ensureReferencesDropped()}, and drop it immediately. + * {@link #cleanup()} or {@link #enqueueReferencesForDrop()}, and drop it immediately. *

    - * This is not permitted if {@link #cleanup()} or {@link #ensureReferencesDropped()} has already been invoked. + * This is not permitted if {@link #cleanup()} or {@link #enqueueReferencesForDrop()} has already been invoked. * * @param referent The referent to remove - * @throws LivenessStateException If {@link #cleanup()} or {@link #ensureReferencesDropped()} has already been + * @throws LivenessStateException If {@link #cleanup()} or {@link #enqueueReferencesForDrop()} has already been * invoked */ synchronized void dropReference(@NotNull final LivenessReferent referent) throws LivenessStateException { @@ -114,12 +112,12 @@ synchronized void dropReference(@NotNull final LivenessReferent referent) throws /** *

    * Remove at most one existing reference to each input referent from this tracker, so that it will no longer be - * dropped on {@link #cleanup()} or {@link #ensureReferencesDropped()}, and drop it immediately. + * dropped on {@link #cleanup()} or {@link #enqueueReferencesForDrop()}, and drop it immediately. *

    - * This is not permitted if {@link #cleanup()} or {@link #ensureReferencesDropped()} has already been invoked. + * This is not permitted if {@link #cleanup()} or {@link #enqueueReferencesForDrop()} has already been invoked. * * @param referents The referents to remove - * @throws LivenessStateException If {@link #cleanup()} or {@link #ensureReferencesDropped()} has already been + * @throws LivenessStateException If {@link #cleanup()} or {@link #enqueueReferencesForDrop()} has already been * invoked */ synchronized void dropReferences(@NotNull final Stream referents) @@ -133,23 +131,15 @@ synchronized void dropReferences(@NotNull final Stream - * This is not permitted if {@link #cleanup()} or {@link #ensureReferencesDropped()} has already been invoked. + * This is not permitted if {@link #cleanup()} or {@link #enqueueReferencesForDrop()} has already been invoked. * * @param other The other tracker - * @throws LivenessStateException If {@link #cleanup()} or {@link #ensureReferencesDropped()} has already been + * @throws LivenessStateException If {@link #cleanup()} or {@link #enqueueReferencesForDrop()} has already been * invoked */ synchronized void transferReferencesTo(@NotNull final RetainedReferenceTracker other) { checkOutstanding(); - for (final WeakReference retainedReference : impl) { - final LivenessReferent retained = retainedReference.get(); - if (retained != null) { - other.addReference(retained); - } else if (retainedReference instanceof CleanupReference) { - ((CleanupReference) retainedReference).cleanup(); - } - } - impl.clear(); + impl.transferReferencesTo(other); } /** @@ -157,9 +147,9 @@ synchronized void transferReferencesTo(@NotNull final RetainedReferenceTracker - * This is not permitted if {@link #cleanup()} or {@link #ensureReferencesDropped()} has already been invoked. + * This is not permitted if {@link #cleanup()} or {@link #enqueueReferencesForDrop()} has already been invoked. * - * @throws LivenessStateException If {@link #cleanup()} or {@link #ensureReferencesDropped()} has already been + * @throws LivenessStateException If {@link #cleanup()} or {@link #enqueueReferencesForDrop()} has already been * invoked */ synchronized void makeReferencesPermanent() { @@ -174,23 +164,39 @@ private void checkOutstanding() { } } + /** + * Ensure that references are dropped even if this RetainedReferenceTracker's manager is garbage-collected. As a + * last resort, this will be invoked by the {@link CleanupReferenceProcessorInstance#LIVENESS liveness cleanup + * reference processor}, but it may also be invoked by any other RetainedReferenceTracker that observes that this + * RetainedReferenceTracker no longer refers to its manager. This method is idempotent. + */ @Override public void cleanup() { - ensureReferencesDroppedInternal(true); + // noinspection EmptyTryBlock + try (final SafeCloseable ignored = ensureReferencesDroppedInternal(true)) { + } } /** *

    - * Initiate the idempotent cleanup process. This will drop all retained references if their referents still exist. - * No new references may be added to or dropped from this tracker. + * Initiate the idempotent cleanup process. This will enqueue all retained references to be dropped if their + * referents still exist. No new references may be added to or dropped from this tracker. + * + * @apiNote This should be invoked in proactive cleanup scenarios before any destructive cleanup operations are + * undertaken, in order to allow the system to record the state of the retention graph and propagate + * proactive cleanup as far as possible. + * + * @return A {@link SafeCloseable} that will process the queued drops if necessary. Must be called exactly once, + * after any other desired cleanup has been performed. */ - void ensureReferencesDropped() { - ensureReferencesDroppedInternal(false); + SafeCloseable enqueueReferencesForDrop() { + return ensureReferencesDroppedInternal(false); } - private void ensureReferencesDroppedInternal(final boolean onCleanup) { + private SafeCloseable ensureReferencesDroppedInternal(final boolean onCleanup) { if (!OUTSTANDING_STATE_UPDATER.compareAndSet(this, OUTSTANDING, NOT_OUTSTANDING)) { - return; + return () -> { + }; } if (Liveness.DEBUG_MODE_ENABLED || (onCleanup && Liveness.CLEANUP_LOG_ENABLED)) { Liveness.log.info().append("LivenessDebug: Ensuring references dropped ") @@ -199,43 +205,38 @@ private void ensureReferencesDroppedInternal(final boolean onCleanup) { } outstandingCount.decrementAndGet(); - Queue> pendingDropReferences = tlPendingDropReferences.get(); - final boolean processDrops = pendingDropReferences == null; + PendingDropsTracker pendingDropsTracker = tlPendingDropsTracker.get(); + final boolean processDrops = pendingDropsTracker == null; if (processDrops) { - final SoftReference>> savedQueueReference = - tlSavedQueueReference.get(); - if (savedQueueReference == null || (pendingDropReferences = savedQueueReference.get()) == null) { - tlSavedQueueReference.set(new SoftReference<>(pendingDropReferences = new ArrayDeque<>())); + final SoftReference savedTrackerReference = tlSavedTrackerReference.get(); + if (savedTrackerReference == null || (pendingDropsTracker = savedTrackerReference.get()) == null) { + tlSavedTrackerReference.set(new SoftReference<>(pendingDropsTracker = new PendingDropsTracker())); } - tlPendingDropReferences.set(pendingDropReferences); + tlPendingDropsTracker.set(pendingDropsTracker); } synchronized (this) { - impl.forEach(pendingDropReferences::add); - impl.clear(); + impl.enqueueReferencesForDrop(pendingDropsTracker, onCleanup); } if (processDrops) { - try { - WeakReference pendingDropReference; - while ((pendingDropReference = pendingDropReferences.poll()) != null) { - final LivenessReferent pendingDrop = pendingDropReference.get(); - if (pendingDrop != null) { - pendingDrop.dropReference(); - } else if (pendingDropReference instanceof CleanupReference) { - ((CleanupReference) pendingDropReference).cleanup(); - } + final PendingDropsTracker finalPendingDropsTracker = pendingDropsTracker; + return () -> { + try { + finalPendingDropsTracker.dropAll(); + } finally { + tlPendingDropsTracker.remove(); } - } finally { - tlPendingDropReferences.set(null); - } + }; } + return () -> { + }; } /** *

    * Get the number of outstanding trackers (instances of RetainedReferenceTracker that have not had their - * {@link #cleanup()} or {@link #ensureReferencesDropped()} method called). + * {@link #cleanup()} or {@link #enqueueReferencesForDrop()} method called). *

    * Note that this number represents the liveness system's current knowledge of the number of live references in the * system. @@ -246,14 +247,17 @@ static int getOutstandingCount() { return outstandingCount.get(); } - private interface Impl extends Iterable> { + private interface Impl { + void add(@NotNull final LivenessReferent referent); void drop(@NotNull final LivenessReferent referent); void drop(@NotNull final Stream referents); - void clear(); + void enqueueReferencesForDrop(@NotNull PendingDropsTracker tracker, boolean onCleanup); + + void transferReferencesTo(@NotNull RetainedReferenceTracker other); void makePermanent(); } @@ -286,8 +290,8 @@ public void drop(@NotNull final LivenessReferent referent) { retainedReferences.set(rri, retainedReferences.get(rrLast)); } retainedReferences.remove(rrLast--); - if (cleared && retainedReference instanceof CleanupReference) { - ((CleanupReference) retainedReference).cleanup(); + if (cleared && retainedReference instanceof RetainedReferenceTracker) { + ((RetainedReferenceTracker) retainedReference).cleanup(); } if (found) { referent.dropReference(); @@ -298,16 +302,17 @@ public void drop(@NotNull final LivenessReferent referent) { @Override public void drop(@NotNull final Stream referents) { - final KeyedObjectHashMap referentsToRemove = - new KeyedObjectHashMap<>(DropState.KEYED_OBJECT_KEY); - referents.forEach(referent -> referentsToRemove.putIfAbsent(referent, DropState::new).incrementDrops()); + final KeyedObjectHashMap referentsToRemove = + new KeyedObjectHashMap<>(DropRequestState.KEYED_OBJECT_KEY); + referents.forEach( + referent -> referentsToRemove.putIfAbsent(referent, DropRequestState::new).incrementDrops()); if (referentsToRemove.isEmpty()) { return; } for (int rrLast = retainedReferences.size() - 1, rri = 0; rri <= rrLast;) { final WeakReference retainedReference = retainedReferences.get(rri); final boolean cleared; - final DropState foundState; + final DropRequestState foundState; { final LivenessReferent retained = retainedReference.get(); cleared = retained == null; @@ -321,8 +326,8 @@ public void drop(@NotNull final Stream referents) { retainedReferences.set(rri, retainedReferences.get(rrLast)); } retainedReferences.remove(rrLast--); - if (cleared && retainedReference instanceof CleanupReference) { - ((CleanupReference) retainedReference).cleanup(); + if (cleared && retainedReference instanceof RetainedReferenceTracker) { + ((RetainedReferenceTracker) retainedReference).cleanup(); } if (foundState != null && foundState.doDrop()) { referentsToRemove.remove(foundState.referent); @@ -334,19 +339,31 @@ public void drop(@NotNull final Stream referents) { } @Override - public void clear() { + public void enqueueReferencesForDrop(@NotNull final PendingDropsTracker tracker, final boolean onCleanup) { + if (onCleanup) { + retainedReferences.forEach(tracker::addOnCleanup); + } else { + retainedReferences.forEach(tracker::addOnEnsureDropped); + } retainedReferences.clear(); } @Override - public void makePermanent() { + public void transferReferencesTo(@NotNull final RetainedReferenceTracker other) { + retainedReferences.forEach((final WeakReference retainedReference) -> { + final LivenessReferent retained = retainedReference.get(); + if (retained != null) { + other.addReference(retained); + } else if (retainedReference instanceof RetainedReferenceTracker) { + ((RetainedReferenceTracker) retainedReference).cleanup(); + } + }); retainedReferences.clear(); } - @NotNull @Override - public Iterator> iterator() { - return retainedReferences.iterator(); + public void makePermanent() { + retainedReferences.clear(); } } @@ -379,15 +396,16 @@ public void drop(@NotNull final LivenessReferent referent) { @Override public void drop(@NotNull final Stream referents) { - final KeyedObjectHashMap referentsToRemove = - new KeyedObjectHashMap<>(DropState.KEYED_OBJECT_KEY); - referents.forEach(referent -> referentsToRemove.putIfAbsent(referent, DropState::new).incrementDrops()); + final KeyedObjectHashMap referentsToRemove = + new KeyedObjectHashMap<>(DropRequestState.KEYED_OBJECT_KEY); + referents.forEach( + referent -> referentsToRemove.putIfAbsent(referent, DropRequestState::new).incrementDrops()); if (referentsToRemove.isEmpty()) { return; } for (int rLast = retained.size() - 1, ri = 0; ri <= rLast;) { final LivenessReferent current = retained.get(ri); - final DropState foundState = referentsToRemove.get(current); + final DropRequestState foundState = referentsToRemove.get(current); if (foundState != null) { if (ri != rLast) { retained.set(ri, retained.get(rLast)); @@ -406,44 +424,39 @@ public void drop(@NotNull final Stream referents) { } @Override - public void clear() { + public void enqueueReferencesForDrop(@NotNull final PendingDropsTracker tracker, final boolean onCleanup) { + if (onCleanup) { + retained.forEach(tracker::addOnCleanup); + } else { + retained.forEach(tracker::addOnEnsureDropped); + } retained.clear(); } @Override - public void makePermanent() { - // See LivenessScope.transferTo: This is currently unreachable code, but implemented for completeness - retained.forEach(permanentReferences::retain); + public void transferReferencesTo(@NotNull final RetainedReferenceTracker other) { + retained.forEach(other::addReference); retained.clear(); } - @NotNull @Override - public Iterator> iterator() { - return new Iterator<>() { - - private final Iterator internal = retained.iterator(); - - @Override - public boolean hasNext() { - return internal.hasNext(); - } - - @Override - public WeakReference next() { - return internal.next().getWeakReference(); - } - }; + public void makePermanent() { + // See LivenessScope.transferTo: This is currently unreachable code, but implemented for completeness + retained.forEach(permanentReferences::retain); + retained.clear(); } } - private static final class DropState { + /** + * A state that tracks the number of times a referent should be dropped. + */ + private static final class DropRequestState { - private static final KeyedObjectKey KEYED_OBJECT_KEY = + private static final KeyedObjectKey KEYED_OBJECT_KEY = new KeyIdentityKeyedObjectKey<>() { @Override - public LivenessReferent getKey(@NotNull final DropState dropState) { - return dropState.referent; + public LivenessReferent getKey(@NotNull final DropRequestState dropRequestState) { + return dropRequestState.referent; } }; @@ -451,7 +464,7 @@ public LivenessReferent getKey(@NotNull final DropState dropState) { private int timesToDrop; - private DropState(@NotNull final LivenessReferent referent) { + private DropRequestState(@NotNull final LivenessReferent referent) { this.referent = referent; } @@ -464,4 +477,63 @@ boolean doDrop() { return --timesToDrop == 0; } } + + /** + * A tracker for drops that are pending on the current thread, used to avoid deep recursion when ensuring that + * references are dropped. + */ + private static final class PendingDropsTracker { + + private final Queue pendingDrops = new ArrayDeque<>(); + + void addOnCleanup(@NotNull final WeakReference reference) { + /* + * Enqueue the WeakReference, taking no position w.r.t. reachability of the referent. + */ + pendingDrops.add(reference); + } + + void addOnEnsureDropped(@NotNull final WeakReference reference) { + /* + * Preserve reachability from the time of invocation. This is intended to make sure that proactive cleanup + * respects the state of the world at the time it was initiated. + */ + final LivenessReferent referent = reference.get(); + pendingDrops.add(referent == null ? reference : referent); + } + + void addOnCleanup(@NotNull final LivenessReferent referent) { + /* + * We enqueue the WeakReference, rather than the LivenessReferent itself. Since our manager has been + * collected, it's inappropriate to enforce strong reachability to its references. + */ + pendingDrops.add(referent.getWeakReference()); + } + + void addOnEnsureDropped(@NotNull final LivenessReferent referent) { + /* + * We enqueue the LivenessReferent itself, rather than a WeakReference. Since the manager is explicitly + * ensuring that we drop its references, we should preserve reachability from the time of invocation. + */ + pendingDrops.add(referent); + } + + void dropAll() { + Object next; + while ((next = pendingDrops.poll()) != null) { + // noinspection unchecked + final WeakReference pendingDropReference = next instanceof WeakReference + ? (WeakReference) next + : null; + final LivenessReferent pendingDrop = pendingDropReference == null + ? (LivenessReferent) next + : pendingDropReference.get(); + if (pendingDrop != null) { + pendingDrop.dropReference(); + } else if (pendingDropReference instanceof RetainedReferenceTracker) { + ((RetainedReferenceTracker) pendingDropReference).cleanup(); + } + } + } + } } diff --git a/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/UpdateSourceCombiner.java b/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/UpdateSourceCombiner.java index 6fb6e841f6f..da7820a3712 100644 --- a/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/UpdateSourceCombiner.java +++ b/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/UpdateSourceCombiner.java @@ -7,6 +7,7 @@ import io.deephaven.engine.liveness.LivenessArtifact; import org.jetbrains.annotations.NotNull; +import javax.annotation.OverridingMethodsMustInvokeSuper; import java.util.Collections; /** @@ -64,6 +65,7 @@ public void requestRefresh() { updateGraph.requestRefresh(); } + @OverridingMethodsMustInvokeSuper @Override public void destroy() { super.destroy(); diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageTable.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageTable.java index 514f76f05d6..60f1371d34c 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageTable.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/table/BarrageTable.java @@ -37,6 +37,7 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import javax.annotation.OverridingMethodsMustInvokeSuper; import java.time.Instant; import java.util.*; import java.util.concurrent.ScheduledExecutorService; @@ -570,6 +571,7 @@ protected LogEntry beginLog(LogLevel level) { return log.getEntry(level).append(System.identityHashCode(this)); } + @OverridingMethodsMustInvokeSuper @Override protected void destroy() { super.destroy(); diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/publish/PublishToKafka.java b/extensions/kafka/src/main/java/io/deephaven/kafka/publish/PublishToKafka.java index d3b311323ba..61e4a3191f8 100644 --- a/extensions/kafka/src/main/java/io/deephaven/kafka/publish/PublishToKafka.java +++ b/extensions/kafka/src/main/java/io/deephaven/kafka/publish/PublishToKafka.java @@ -35,6 +35,7 @@ import org.apache.kafka.common.serialization.Serializer; import org.jetbrains.annotations.NotNull; +import javax.annotation.OverridingMethodsMustInvokeSuper; import java.time.Instant; import java.util.Objects; import java.util.Properties; @@ -398,6 +399,7 @@ public void onUpdate(TableUpdate upstream) { } } + @OverridingMethodsMustInvokeSuper @Override protected void destroy() { super.destroy(); diff --git a/java-client/barrage-examples/src/main/java/io/deephaven/client/examples/SubscribeExampleBase.java b/java-client/barrage-examples/src/main/java/io/deephaven/client/examples/SubscribeExampleBase.java index ca802d34111..d228fd41ca7 100644 --- a/java-client/barrage-examples/src/main/java/io/deephaven/client/examples/SubscribeExampleBase.java +++ b/java-client/barrage-examples/src/main/java/io/deephaven/client/examples/SubscribeExampleBase.java @@ -20,6 +20,7 @@ import io.deephaven.util.annotations.ReferentialIntegrity; import picocli.CommandLine; +import javax.annotation.OverridingMethodsMustInvokeSuper; import java.util.concurrent.CountDownLatch; abstract class SubscribeExampleBase extends BarrageClientExampleBase { @@ -85,6 +86,7 @@ protected void execute(final BarrageSession client) throws Exception { manage(tableRef); } + @OverridingMethodsMustInvokeSuper @Override protected void destroy() { super.destroy(); diff --git a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshotImpl.java b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshotImpl.java index f34382297e0..090e3b4ffec 100644 --- a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshotImpl.java +++ b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSnapshotImpl.java @@ -35,6 +35,7 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import javax.annotation.OverridingMethodsMustInvokeSuper; import java.io.InputStream; import java.nio.ByteBuffer; import java.util.BitSet; @@ -218,6 +219,7 @@ private boolean tryRecordDisconnect() { return CONNECTED_UPDATER.compareAndSet(this, 1, 0); } + @OverridingMethodsMustInvokeSuper @Override protected void destroy() { super.destroy(); diff --git a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscriptionImpl.java b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscriptionImpl.java index 26c0672e649..3a90d42d448 100644 --- a/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscriptionImpl.java +++ b/java-client/barrage/src/main/java/io/deephaven/client/impl/BarrageSubscriptionImpl.java @@ -42,6 +42,7 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import javax.annotation.OverridingMethodsMustInvokeSuper; import java.io.InputStream; import java.nio.ByteBuffer; import java.util.BitSet; @@ -254,6 +255,7 @@ private void onFutureComplete() { } } + @OverridingMethodsMustInvokeSuper @Override protected void destroy() { super.destroy(); diff --git a/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java b/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java index e0420105d05..42f277d31ee 100644 --- a/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java +++ b/server/src/main/java/io/deephaven/server/barrage/BarrageMessageProducer.java @@ -51,6 +51,7 @@ import org.jetbrains.annotations.Nullable; import org.HdrHistogram.Histogram; +import javax.annotation.OverridingMethodsMustInvokeSuper; import java.time.Instant; import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; @@ -657,6 +658,7 @@ protected void onFailureInternal(final Throwable originalException, Entry source } } + @OverridingMethodsMustInvokeSuper @Override public void destroy() { super.destroy(); @@ -2251,6 +2253,7 @@ BarrageMessage getSnapshot( return msg; } + @OverridingMethodsMustInvokeSuper @Override protected synchronized void destroy() { super.destroy(); diff --git a/server/src/main/java/io/deephaven/server/hierarchicaltable/HierarchicalTableViewSubscription.java b/server/src/main/java/io/deephaven/server/hierarchicaltable/HierarchicalTableViewSubscription.java index a0e18a46ad6..d91b7efe6e6 100644 --- a/server/src/main/java/io/deephaven/server/hierarchicaltable/HierarchicalTableViewSubscription.java +++ b/server/src/main/java/io/deephaven/server/hierarchicaltable/HierarchicalTableViewSubscription.java @@ -34,6 +34,7 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import javax.annotation.OverridingMethodsMustInvokeSuper; import java.time.Instant; import java.util.ArrayList; import java.util.BitSet; @@ -153,6 +154,7 @@ public HierarchicalTableViewSubscription( fbb -> HierarchicalTableSchemaUtil.makeSchemaPayload(fbb, view.getHierarchicalTable()))); } + @OverridingMethodsMustInvokeSuper @Override protected void destroy() { super.destroy(); diff --git a/server/src/main/java/io/deephaven/server/plugin/python/LivePyObjectWrapper.java b/server/src/main/java/io/deephaven/server/plugin/python/LivePyObjectWrapper.java index c06d2d19c90..c14a53f9a69 100644 --- a/server/src/main/java/io/deephaven/server/plugin/python/LivePyObjectWrapper.java +++ b/server/src/main/java/io/deephaven/server/plugin/python/LivePyObjectWrapper.java @@ -9,6 +9,8 @@ import org.jetbrains.annotations.NotNull; import org.jpy.PyObject; +import javax.annotation.OverridingMethodsMustInvokeSuper; + /** * Provides a mapping between Python refcount and Deephaven's liveness mechanism, allowing liveness scopes to manage the * single strong reference that the PyObject @@ -33,6 +35,7 @@ public LivePyObjectWrapper(@NotNull PyObject pythonObject) { LivenessScopeStack.peek().manage(this); } + @OverridingMethodsMustInvokeSuper @Override protected void destroy() { super.destroy(); diff --git a/server/src/main/java/io/deephaven/server/session/SessionState.java b/server/src/main/java/io/deephaven/server/session/SessionState.java index c144a1b77b7..526726c78a1 100644 --- a/server/src/main/java/io/deephaven/server/session/SessionState.java +++ b/server/src/main/java/io/deephaven/server/session/SessionState.java @@ -43,6 +43,7 @@ import org.jetbrains.annotations.NotNull; import javax.annotation.Nullable; +import javax.annotation.OverridingMethodsMustInvokeSuper; import javax.inject.Provider; import java.io.Closeable; import java.io.IOException; @@ -1131,6 +1132,7 @@ public synchronized void cancel() { } } + @OverridingMethodsMustInvokeSuper @Override protected synchronized void destroy() { super.destroy(); diff --git a/server/src/main/java/io/deephaven/server/table/ExportedTableUpdateListener.java b/server/src/main/java/io/deephaven/server/table/ExportedTableUpdateListener.java index f52982c2b81..cff61cf74eb 100644 --- a/server/src/main/java/io/deephaven/server/table/ExportedTableUpdateListener.java +++ b/server/src/main/java/io/deephaven/server/table/ExportedTableUpdateListener.java @@ -35,6 +35,8 @@ import io.grpc.stub.StreamObserver; import org.jetbrains.annotations.NotNull; +import javax.annotation.OverridingMethodsMustInvokeSuper; + import static io.deephaven.extensions.barrage.util.GrpcUtil.safelyComplete; /** @@ -247,6 +249,7 @@ public void onFailureInternal(final Throwable error, final Entry sourceEntry) { errorTransformer.transform(error)); } + @OverridingMethodsMustInvokeSuper @Override public void destroy() { super.destroy();