diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/AsOfJoinHelper.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/AsOfJoinHelper.java index aad2f6fc6e0..b301024da74 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/AsOfJoinHelper.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/AsOfJoinHelper.java @@ -249,8 +249,6 @@ private static Table rightStaticAj(JoinControl control, stampPair, columnsToAdd, order == SortingOrder.Descending, disallowExactMatch), leftTable, result) { @Override public void onUpdate(TableUpdate upstream) { - final TableUpdateImpl downstream = TableUpdateImpl.copy(upstream); - rowRedirection.removeAll(upstream.removed()); final boolean keysModified = upstream.modifiedColumnSet().containsAny(leftKeysOrStamps); @@ -296,7 +294,8 @@ public void onUpdate(TableUpdate upstream) { } } - downstream.modifiedColumnSet = result.getModifiedColumnSetForUpdates(); + final TableUpdateImpl downstream = + TableUpdateImpl.copy(upstream, result.getModifiedColumnSetForUpdates()); leftTransformer.clearAndTransform(upstream.modifiedColumnSet(), downstream.modifiedColumnSet()); if (keysModified) { downstream.modifiedColumnSet().setAll(allRightColumns); @@ -1478,8 +1477,6 @@ private static Table zeroKeyAjRightStatic(QueryTable leftTable, Table rightTable leftTable, result) { @Override public void onUpdate(TableUpdate upstream) { - final TableUpdateImpl downstream = TableUpdateImpl.copy(upstream); - rowRedirection.removeAll(upstream.removed()); final boolean stampModified = upstream.modified().isNonempty() @@ -1504,7 +1501,8 @@ public void onUpdate(TableUpdate upstream) { compactedRightStampKeys, rowRedirection); } - downstream.modifiedColumnSet = result.getModifiedColumnSetForUpdates(); + final TableUpdateImpl downstream = + TableUpdateImpl.copy(upstream, result.getModifiedColumnSetForUpdates()); leftTransformer.clearAndTransform(upstream.modifiedColumnSet(), downstream.modifiedColumnSet()); if (stampModified) { diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/BaseTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/BaseTable.java index 389fa155f8e..a66ac84e0f9 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/BaseTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/BaseTable.java @@ -964,14 +964,9 @@ public ListenerImpl(String description, Table parent, BaseTable dependent) { @Override public void onUpdate(final TableUpdate upstream) { - final TableUpdate downstream; - if (!canReuseModifiedColumnSet) { - final TableUpdateImpl upstreamCopy = TableUpdateImpl.copy(upstream); - upstreamCopy.modifiedColumnSet = ModifiedColumnSet.ALL; - downstream = upstreamCopy; - } else { - downstream = upstream.acquire(); - } + final TableUpdate downstream = canReuseModifiedColumnSet + ? upstream.acquire() + : TableUpdateImpl.copy(upstream, ModifiedColumnSet.ALL); dependent.notifyListeners(downstream); } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/NaturalJoinHelper.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/NaturalJoinHelper.java index 3448858b8ad..31e0d8f27b1 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/NaturalJoinHelper.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/NaturalJoinHelper.java @@ -354,8 +354,8 @@ protected void process() { @Override public void onUpdate(final TableUpdate upstream) { checkRightTableSizeZeroKeys(leftTable, rightTable, exactMatch); - final TableUpdateImpl downstream = TableUpdateImpl.copy(upstream); - downstream.modifiedColumnSet = result.getModifiedColumnSetForUpdates(); + final TableUpdateImpl downstream = + TableUpdateImpl.copy(upstream, result.getModifiedColumnSetForUpdates()); leftTransformer.clearAndTransform(upstream.modifiedColumnSet(), downstream.modifiedColumnSet); result.notifyListeners(downstream); @@ -476,14 +476,13 @@ private static class LeftTickingListener extends BaseTable.ListenerImpl { @Override public void onUpdate(final TableUpdate upstream) { - final TableUpdateImpl downstream = TableUpdateImpl.copy(upstream); rowRedirection.removeAll(upstream.removed()); try (final RowSet prevRowSet = leftTable.getRowSet().copyPrev()) { rowRedirection.applyShift(prevRowSet, upstream.shifted()); } - downstream.modifiedColumnSet = result.getModifiedColumnSetForUpdates(); + final TableUpdateImpl downstream = TableUpdateImpl.copy(upstream, result.getModifiedColumnSetForUpdates()); leftTransformer.clearAndTransform(upstream.modifiedColumnSet(), downstream.modifiedColumnSet); if (upstream.modifiedColumnSet().containsAny(leftKeyColumns)) { diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java index 8e0a48fcd9f..14ce8368704 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java @@ -1821,8 +1821,8 @@ private static class ViewOrUpdateViewListener extends ListenerImpl { @Override public void onUpdate(final TableUpdate upstream) { - final TableUpdateImpl downstream = TableUpdateImpl.copy(upstream); - downstream.modifiedColumnSet = dependent.getModifiedColumnSetForUpdates(); + final TableUpdateImpl downstream = + TableUpdateImpl.copy(upstream, dependent.getModifiedColumnSetForUpdates()); transformer.clearAndTransform(upstream.modifiedColumnSet(), downstream.modifiedColumnSet); dependent.notifyListeners(downstream); } @@ -1896,19 +1896,19 @@ public Table dropColumns(String... columnNames) { "dropColumns(" + Arrays.deepToString(columnNames) + ')', this, resultTable) { @Override public void onUpdate(final TableUpdate upstream) { - final TableUpdateImpl downstream = TableUpdateImpl.copy(upstream); + final TableUpdateImpl downstream; final ModifiedColumnSet resultModifiedColumnSet = resultTable.getModifiedColumnSetForUpdates(); mcsTransformer.clearAndTransform(upstream.modifiedColumnSet(), resultModifiedColumnSet); if (upstream.modified().isEmpty() || resultModifiedColumnSet.empty()) { - downstream.modifiedColumnSet = ModifiedColumnSet.EMPTY; + downstream = TableUpdateImpl.copy(upstream, ModifiedColumnSet.EMPTY); if (downstream.modified().isNonempty()) { downstream.modified().close(); downstream.modified = RowSetFactory.empty(); } } else { - downstream.modifiedColumnSet = resultModifiedColumnSet; + downstream = TableUpdateImpl.copy(upstream, resultModifiedColumnSet); } resultTable.notifyListeners(downstream); } @@ -2039,13 +2039,14 @@ private Table renameColumnsImpl( methodNuggetPrefix + pairsLogString + ')', this, resultTable) { @Override public void onUpdate(final TableUpdate upstream) { - final TableUpdateImpl downstream = TableUpdateImpl.copy(upstream); + final TableUpdateImpl downstream; if (upstream.modified().isNonempty()) { - downstream.modifiedColumnSet = resultTable.getModifiedColumnSetForUpdates(); + downstream = TableUpdateImpl.copy(upstream, + resultTable.getModifiedColumnSetForUpdates()); mcsTransformer.clearAndTransform(upstream.modifiedColumnSet(), downstream.modifiedColumnSet); } else { - downstream.modifiedColumnSet = ModifiedColumnSet.EMPTY; + downstream = TableUpdateImpl.copy(upstream, ModifiedColumnSet.EMPTY); } resultTable.notifyListeners(downstream); } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/ShiftedColumnOperation.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/ShiftedColumnOperation.java index 1f9f6125ca9..c7f23671f03 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/ShiftedColumnOperation.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/ShiftedColumnOperation.java @@ -131,8 +131,7 @@ private static Table getShiftedColumns(@NotNull Table source, long shift, @NotNu final BaseTable.ListenerImpl listener = new BaseTable.ListenerImpl("propagateUpdates", source, result) { @Override public void onUpdate(TableUpdate upstream) { - final TableUpdateImpl downstream = TableUpdateImpl.copy(upstream); - downstream.modifiedColumnSet = downstreamColumnSet; + final TableUpdateImpl downstream = TableUpdateImpl.copy(upstream, downstreamColumnSet); mcsTransformer.clearAndTransform( upstream.modifiedColumnSet(), downstream.modifiedColumnSet); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/SparseSelect.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/SparseSelect.java index 52fb59fbc66..af1b4838c68 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/SparseSelect.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/SparseSelect.java @@ -189,8 +189,8 @@ private static Table sparseSelect(QueryTable source, String[] preserveColumns, S @Override public void onUpdate(TableUpdate upstream) { - final TableUpdateImpl downstream = TableUpdateImpl.copy(upstream); - downstream.modifiedColumnSet = modifiedColumnSetForUpdates; + final TableUpdateImpl downstream = + TableUpdateImpl.copy(upstream, modifiedColumnSetForUpdates); if (sparseObjectSources.length > 0) { try (final RowSet removedOnly = upstream.removed().minus(upstream.added())) { for (final ObjectSparseArraySource objectSparseArraySource : sparseObjectSources) { diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/TableUpdateImpl.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/TableUpdateImpl.java index 15b1ad63b41..0674fbab2a1 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/TableUpdateImpl.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/TableUpdateImpl.java @@ -165,9 +165,19 @@ public static TableUpdateImpl copy(@NotNull final TableUpdate tableUpdate) { newMCS = new ModifiedColumnSet(oldMCS); newMCS.setAll(oldMCS); } + return copy(tableUpdate, newMCS); + } + + /** + * Make a deep copy of a {@link TableUpdate} with the given {@code mcs}. + */ + public static TableUpdateImpl copy(@NotNull final TableUpdate tableUpdate, @NotNull final ModifiedColumnSet mcs) { return new TableUpdateImpl( - tableUpdate.added().copy(), tableUpdate.removed().copy(), tableUpdate.modified().copy(), - tableUpdate.shifted(), newMCS); + tableUpdate.added().copy(), + tableUpdate.removed().copy(), + tableUpdate.modified().copy(), + tableUpdate.shifted(), + mcs); } @Override diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/TableUpdateValidator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/TableUpdateValidator.java index 1d4dd3ae586..07018f352b2 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/TableUpdateValidator.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/TableUpdateValidator.java @@ -10,7 +10,6 @@ import io.deephaven.chunk.attributes.Values; import io.deephaven.chunk.util.hashing.ChunkEquals; import io.deephaven.configuration.Configuration; -import io.deephaven.datastructures.util.CollectionUtil; import io.deephaven.engine.rowset.RowSequence; import io.deephaven.engine.rowset.RowSet; import io.deephaven.engine.rowset.RowSetShiftData; @@ -219,8 +218,7 @@ private void onUpdate(final TableUpdate upstream) { return; } - final TableUpdateImpl downstream = TableUpdateImpl.copy(upstream); - resultTable.notifyListeners(downstream); + resultTable.notifyListeners(upstream.acquire()); } } diff --git a/engine/table/src/main/java/io/deephaven/engine/util/TickSuppressor.java b/engine/table/src/main/java/io/deephaven/engine/util/TickSuppressor.java index 189a6781005..cc127795631 100644 --- a/engine/table/src/main/java/io/deephaven/engine/util/TickSuppressor.java +++ b/engine/table/src/main/java/io/deephaven/engine/util/TickSuppressor.java @@ -55,10 +55,11 @@ public static Table convertModificationsToAddsAndRemoves(Table input) { "convertModificationsToAddsAndRemoves", input, resultTable) { @Override public void onUpdate(TableUpdate upstream) { - final TableUpdateImpl downstream = TableUpdateImpl.copy(upstream); + final TableUpdateImpl downstream = new TableUpdateImpl(); downstream.added = upstream.added().union(upstream.modified()); downstream.removed = upstream.removed().union(upstream.getModifiedPreShift()); downstream.modified = RowSetFactory.empty(); + downstream.shifted = upstream.shifted(); downstream.modifiedColumnSet = ModifiedColumnSet.EMPTY; resultTable.notifyListeners(downstream); } @@ -121,9 +122,8 @@ public static Table removeSpuriousModifications(Table input) { @Override public void onUpdate(TableUpdate upstream) { - final TableUpdateImpl downstream = TableUpdateImpl.copy(upstream); - downstream.modifiedColumnSet = resultTable.getModifiedColumnSetForUpdates(); - + final TableUpdateImpl downstream = + TableUpdateImpl.copy(upstream, resultTable.getModifiedColumnSetForUpdates()); if (downstream.modified().isEmpty()) { identityTransformer.clearAndTransform(upstream.modifiedColumnSet(), downstream.modifiedColumnSet()); diff --git a/engine/table/src/main/java/io/deephaven/engine/util/WindowCheck.java b/engine/table/src/main/java/io/deephaven/engine/util/WindowCheck.java index 9f62e7ebb35..278814c44b0 100644 --- a/engine/table/src/main/java/io/deephaven/engine/util/WindowCheck.java +++ b/engine/table/src/main/java/io/deephaven/engine/util/WindowCheck.java @@ -346,7 +346,8 @@ protected void process() { // now add the new timestamps addRowSequence(upstream.added(), rowKeyToEntry != null); - final TableUpdateImpl downstream = TableUpdateImpl.copy(upstream); + final TableUpdateImpl downstream = + TableUpdateImpl.copy(upstream, result.getModifiedColumnSetForUpdates()); try (final RowSet modifiedByTime = recomputeModified()) { if (modifiedByTime.isNonempty()) { @@ -355,7 +356,6 @@ protected void process() { } // everything that was added, removed, or modified stays added removed or modified - downstream.modifiedColumnSet = result.getModifiedColumnSetForUpdates(); if (downstream.modified.isNonempty()) { mcsTransformer.clearAndTransform(upstream.modifiedColumnSet(), downstream.modifiedColumnSet); downstream.modifiedColumnSet.setAll(mcsResultWindowColumn); diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/SelectOverheadLimiter.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/SelectOverheadLimiter.java index 2a2248d2eef..f2a6ce18894 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/SelectOverheadLimiter.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/SelectOverheadLimiter.java @@ -141,8 +141,8 @@ protected void process() { rowSet.remove(upstream.removed()); upstream.shifted().apply(rowSet); rowSet.insert(upstream.added()); - final TableUpdateImpl copy = TableUpdateImpl.copy(upstream); - copy.modifiedColumnSet = result.getModifiedColumnSetForUpdates(); + final TableUpdateImpl copy = + TableUpdateImpl.copy(upstream, result.getModifiedColumnSetForUpdates()); flatTransformer.clearAndTransform(upstream.modifiedColumnSet(), copy.modifiedColumnSet()); result.notifyListeners(copy); return; @@ -157,8 +157,8 @@ protected void process() { rowSet.insert(upstream.added()); if (overheadTracker.overhead() <= permittedOverhead) { - final TableUpdateImpl copy = TableUpdateImpl.copy(upstream); - copy.modifiedColumnSet = result.getModifiedColumnSetForUpdates(); + final TableUpdateImpl copy = + TableUpdateImpl.copy(upstream, result.getModifiedColumnSetForUpdates()); inputTransformer.clearAndTransform(upstream.modifiedColumnSet(), copy.modifiedColumnSet()); result.notifyListeners(copy); return;