Skip to content

Commit

Permalink
Introduce TableUpdateImpl copy that takes an MCS (deephaven#5319)
Browse files Browse the repository at this point in the history
This allows allows the majority of copy callsites to be more efficient when they know they are going to overwrite the resulting modifiedColumnSet anyways.
  • Loading branch information
devinrsmith authored Apr 3, 2024
1 parent 14f8ca3 commit 30d5b60
Show file tree
Hide file tree
Showing 11 changed files with 45 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,6 @@ private static Table rightStaticAj(JoinControl control,
stampPair, columnsToAdd, order == SortingOrder.Descending, disallowExactMatch), leftTable, result) {
@Override
public void onUpdate(TableUpdate upstream) {
final TableUpdateImpl downstream = TableUpdateImpl.copy(upstream);

rowRedirection.removeAll(upstream.removed());

final boolean keysModified = upstream.modifiedColumnSet().containsAny(leftKeysOrStamps);
Expand Down Expand Up @@ -296,7 +294,8 @@ public void onUpdate(TableUpdate upstream) {
}
}

downstream.modifiedColumnSet = result.getModifiedColumnSetForUpdates();
final TableUpdateImpl downstream =
TableUpdateImpl.copy(upstream, result.getModifiedColumnSetForUpdates());
leftTransformer.clearAndTransform(upstream.modifiedColumnSet(), downstream.modifiedColumnSet());
if (keysModified) {
downstream.modifiedColumnSet().setAll(allRightColumns);
Expand Down Expand Up @@ -1478,8 +1477,6 @@ private static Table zeroKeyAjRightStatic(QueryTable leftTable, Table rightTable
leftTable, result) {
@Override
public void onUpdate(TableUpdate upstream) {
final TableUpdateImpl downstream = TableUpdateImpl.copy(upstream);

rowRedirection.removeAll(upstream.removed());

final boolean stampModified = upstream.modified().isNonempty()
Expand All @@ -1504,7 +1501,8 @@ public void onUpdate(TableUpdate upstream) {
compactedRightStampKeys, rowRedirection);
}

downstream.modifiedColumnSet = result.getModifiedColumnSetForUpdates();
final TableUpdateImpl downstream =
TableUpdateImpl.copy(upstream, result.getModifiedColumnSetForUpdates());
leftTransformer.clearAndTransform(upstream.modifiedColumnSet(),
downstream.modifiedColumnSet());
if (stampModified) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -964,14 +964,9 @@ public ListenerImpl(String description, Table parent, BaseTable<?> dependent) {

@Override
public void onUpdate(final TableUpdate upstream) {
final TableUpdate downstream;
if (!canReuseModifiedColumnSet) {
final TableUpdateImpl upstreamCopy = TableUpdateImpl.copy(upstream);
upstreamCopy.modifiedColumnSet = ModifiedColumnSet.ALL;
downstream = upstreamCopy;
} else {
downstream = upstream.acquire();
}
final TableUpdate downstream = canReuseModifiedColumnSet
? upstream.acquire()
: TableUpdateImpl.copy(upstream, ModifiedColumnSet.ALL);
dependent.notifyListeners(downstream);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,8 +354,8 @@ protected void process() {
@Override
public void onUpdate(final TableUpdate upstream) {
checkRightTableSizeZeroKeys(leftTable, rightTable, exactMatch);
final TableUpdateImpl downstream = TableUpdateImpl.copy(upstream);
downstream.modifiedColumnSet = result.getModifiedColumnSetForUpdates();
final TableUpdateImpl downstream =
TableUpdateImpl.copy(upstream, result.getModifiedColumnSetForUpdates());
leftTransformer.clearAndTransform(upstream.modifiedColumnSet(),
downstream.modifiedColumnSet);
result.notifyListeners(downstream);
Expand Down Expand Up @@ -476,14 +476,13 @@ private static class LeftTickingListener extends BaseTable.ListenerImpl {

@Override
public void onUpdate(final TableUpdate upstream) {
final TableUpdateImpl downstream = TableUpdateImpl.copy(upstream);
rowRedirection.removeAll(upstream.removed());

try (final RowSet prevRowSet = leftTable.getRowSet().copyPrev()) {
rowRedirection.applyShift(prevRowSet, upstream.shifted());
}

downstream.modifiedColumnSet = result.getModifiedColumnSetForUpdates();
final TableUpdateImpl downstream = TableUpdateImpl.copy(upstream, result.getModifiedColumnSetForUpdates());
leftTransformer.clearAndTransform(upstream.modifiedColumnSet(), downstream.modifiedColumnSet);

if (upstream.modifiedColumnSet().containsAny(leftKeyColumns)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1821,8 +1821,8 @@ private static class ViewOrUpdateViewListener extends ListenerImpl {

@Override
public void onUpdate(final TableUpdate upstream) {
final TableUpdateImpl downstream = TableUpdateImpl.copy(upstream);
downstream.modifiedColumnSet = dependent.getModifiedColumnSetForUpdates();
final TableUpdateImpl downstream =
TableUpdateImpl.copy(upstream, dependent.getModifiedColumnSetForUpdates());
transformer.clearAndTransform(upstream.modifiedColumnSet(), downstream.modifiedColumnSet);
dependent.notifyListeners(downstream);
}
Expand Down Expand Up @@ -1896,19 +1896,19 @@ public Table dropColumns(String... columnNames) {
"dropColumns(" + Arrays.deepToString(columnNames) + ')', this, resultTable) {
@Override
public void onUpdate(final TableUpdate upstream) {
final TableUpdateImpl downstream = TableUpdateImpl.copy(upstream);
final TableUpdateImpl downstream;
final ModifiedColumnSet resultModifiedColumnSet =
resultTable.getModifiedColumnSetForUpdates();
mcsTransformer.clearAndTransform(upstream.modifiedColumnSet(),
resultModifiedColumnSet);
if (upstream.modified().isEmpty() || resultModifiedColumnSet.empty()) {
downstream.modifiedColumnSet = ModifiedColumnSet.EMPTY;
downstream = TableUpdateImpl.copy(upstream, ModifiedColumnSet.EMPTY);
if (downstream.modified().isNonempty()) {
downstream.modified().close();
downstream.modified = RowSetFactory.empty();
}
} else {
downstream.modifiedColumnSet = resultModifiedColumnSet;
downstream = TableUpdateImpl.copy(upstream, resultModifiedColumnSet);
}
resultTable.notifyListeners(downstream);
}
Expand Down Expand Up @@ -2039,13 +2039,14 @@ private Table renameColumnsImpl(
methodNuggetPrefix + pairsLogString + ')', this, resultTable) {
@Override
public void onUpdate(final TableUpdate upstream) {
final TableUpdateImpl downstream = TableUpdateImpl.copy(upstream);
final TableUpdateImpl downstream;
if (upstream.modified().isNonempty()) {
downstream.modifiedColumnSet = resultTable.getModifiedColumnSetForUpdates();
downstream = TableUpdateImpl.copy(upstream,
resultTable.getModifiedColumnSetForUpdates());
mcsTransformer.clearAndTransform(upstream.modifiedColumnSet(),
downstream.modifiedColumnSet);
} else {
downstream.modifiedColumnSet = ModifiedColumnSet.EMPTY;
downstream = TableUpdateImpl.copy(upstream, ModifiedColumnSet.EMPTY);
}
resultTable.notifyListeners(downstream);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,7 @@ private static Table getShiftedColumns(@NotNull Table source, long shift, @NotNu
final BaseTable.ListenerImpl listener = new BaseTable.ListenerImpl("propagateUpdates", source, result) {
@Override
public void onUpdate(TableUpdate upstream) {
final TableUpdateImpl downstream = TableUpdateImpl.copy(upstream);
downstream.modifiedColumnSet = downstreamColumnSet;
final TableUpdateImpl downstream = TableUpdateImpl.copy(upstream, downstreamColumnSet);
mcsTransformer.clearAndTransform(
upstream.modifiedColumnSet(), downstream.modifiedColumnSet);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,8 @@ private static Table sparseSelect(QueryTable source, String[] preserveColumns, S

@Override
public void onUpdate(TableUpdate upstream) {
final TableUpdateImpl downstream = TableUpdateImpl.copy(upstream);
downstream.modifiedColumnSet = modifiedColumnSetForUpdates;
final TableUpdateImpl downstream =
TableUpdateImpl.copy(upstream, modifiedColumnSetForUpdates);
if (sparseObjectSources.length > 0) {
try (final RowSet removedOnly = upstream.removed().minus(upstream.added())) {
for (final ObjectSparseArraySource<?> objectSparseArraySource : sparseObjectSources) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,19 @@ public static TableUpdateImpl copy(@NotNull final TableUpdate tableUpdate) {
newMCS = new ModifiedColumnSet(oldMCS);
newMCS.setAll(oldMCS);
}
return copy(tableUpdate, newMCS);
}

/**
* Make a deep copy of a {@link TableUpdate} with the given {@code mcs}.
*/
public static TableUpdateImpl copy(@NotNull final TableUpdate tableUpdate, @NotNull final ModifiedColumnSet mcs) {
return new TableUpdateImpl(
tableUpdate.added().copy(), tableUpdate.removed().copy(), tableUpdate.modified().copy(),
tableUpdate.shifted(), newMCS);
tableUpdate.added().copy(),
tableUpdate.removed().copy(),
tableUpdate.modified().copy(),
tableUpdate.shifted(),
mcs);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import io.deephaven.chunk.attributes.Values;
import io.deephaven.chunk.util.hashing.ChunkEquals;
import io.deephaven.configuration.Configuration;
import io.deephaven.datastructures.util.CollectionUtil;
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetShiftData;
Expand Down Expand Up @@ -219,8 +218,7 @@ private void onUpdate(final TableUpdate upstream) {
return;
}

final TableUpdateImpl downstream = TableUpdateImpl.copy(upstream);
resultTable.notifyListeners(downstream);
resultTable.notifyListeners(upstream.acquire());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,11 @@ public static Table convertModificationsToAddsAndRemoves(Table input) {
"convertModificationsToAddsAndRemoves", input, resultTable) {
@Override
public void onUpdate(TableUpdate upstream) {
final TableUpdateImpl downstream = TableUpdateImpl.copy(upstream);
final TableUpdateImpl downstream = new TableUpdateImpl();
downstream.added = upstream.added().union(upstream.modified());
downstream.removed = upstream.removed().union(upstream.getModifiedPreShift());
downstream.modified = RowSetFactory.empty();
downstream.shifted = upstream.shifted();
downstream.modifiedColumnSet = ModifiedColumnSet.EMPTY;
resultTable.notifyListeners(downstream);
}
Expand Down Expand Up @@ -121,9 +122,8 @@ public static Table removeSpuriousModifications(Table input) {

@Override
public void onUpdate(TableUpdate upstream) {
final TableUpdateImpl downstream = TableUpdateImpl.copy(upstream);
downstream.modifiedColumnSet = resultTable.getModifiedColumnSetForUpdates();

final TableUpdateImpl downstream =
TableUpdateImpl.copy(upstream, resultTable.getModifiedColumnSetForUpdates());
if (downstream.modified().isEmpty()) {
identityTransformer.clearAndTransform(upstream.modifiedColumnSet(),
downstream.modifiedColumnSet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,8 @@ protected void process() {
// now add the new timestamps
addRowSequence(upstream.added(), rowKeyToEntry != null);

final TableUpdateImpl downstream = TableUpdateImpl.copy(upstream);
final TableUpdateImpl downstream =
TableUpdateImpl.copy(upstream, result.getModifiedColumnSetForUpdates());

try (final RowSet modifiedByTime = recomputeModified()) {
if (modifiedByTime.isNonempty()) {
Expand All @@ -355,7 +356,6 @@ protected void process() {
}

// everything that was added, removed, or modified stays added removed or modified
downstream.modifiedColumnSet = result.getModifiedColumnSetForUpdates();
if (downstream.modified.isNonempty()) {
mcsTransformer.clearAndTransform(upstream.modifiedColumnSet(), downstream.modifiedColumnSet);
downstream.modifiedColumnSet.setAll(mcsResultWindowColumn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ protected void process() {
rowSet.remove(upstream.removed());
upstream.shifted().apply(rowSet);
rowSet.insert(upstream.added());
final TableUpdateImpl copy = TableUpdateImpl.copy(upstream);
copy.modifiedColumnSet = result.getModifiedColumnSetForUpdates();
final TableUpdateImpl copy =
TableUpdateImpl.copy(upstream, result.getModifiedColumnSetForUpdates());
flatTransformer.clearAndTransform(upstream.modifiedColumnSet(), copy.modifiedColumnSet());
result.notifyListeners(copy);
return;
Expand All @@ -157,8 +157,8 @@ protected void process() {
rowSet.insert(upstream.added());

if (overheadTracker.overhead() <= permittedOverhead) {
final TableUpdateImpl copy = TableUpdateImpl.copy(upstream);
copy.modifiedColumnSet = result.getModifiedColumnSetForUpdates();
final TableUpdateImpl copy =
TableUpdateImpl.copy(upstream, result.getModifiedColumnSetForUpdates());
inputTransformer.clearAndTransform(upstream.modifiedColumnSet(), copy.modifiedColumnSet());
result.notifyListeners(copy);
return;
Expand Down

0 comments on commit 30d5b60

Please sign in to comment.