Skip to content

Commit

Permalink
fix: DH-18539: Fix incorrect snapshot results on historical sorted ro…
Browse files Browse the repository at this point in the history
…llups. (#6642)

Co-authored-by: Larry Booker <[email protected]>
  • Loading branch information
cpwright and lbooker42 authored Feb 14, 2025
1 parent 1348851 commit 0b48788
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 31 deletions.
6 changes: 6 additions & 0 deletions engine/api/src/main/java/io/deephaven/engine/table/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,16 @@ public interface Table extends
* implementation.
*/
String AGGREGATION_ROW_LOOKUP_ATTRIBUTE = "AggregationRowLookup";

/**
* Attribute on sort results used for hierarchical table construction. Specification is left to the implementation.
*/
String SORT_REVERSE_LOOKUP_ATTRIBUTE = "SortReverseLookup";
/**
* Attribute on sort results used for hierarchical table construction. Specification is left to the implementation.
* <p>
* As currently written by {@code SortOperation}, the value is a {@code String}: either the name of a result column
* whose source was redirected by the sort, or a sentinel value meaning that no column was redirected.
*/
String SORT_ROW_REDIRECTION_ATTRIBUTE = "SortRowRedirection";

String SNAPSHOT_VIEWPORT_TYPE = "Snapshot";
/**
* This attribute is used internally by TableTools.merge to detect successive merges. Its presence indicates that it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,21 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.*;
import java.util.function.LongUnaryOperator;

import static io.deephaven.engine.table.Table.SORT_REVERSE_LOOKUP_ATTRIBUTE;
import static io.deephaven.engine.table.Table.SORT_ROW_REDIRECTION_ATTRIBUTE;

public class SortOperation implements QueryTable.MemoizableOperation<QueryTable> {
static final Map<String, Object> IDENTITY_REDIRECTION_ATTRIBUTES;
// The "+" sign is not valid in a column, therefore we can be sure that this is a proper sentinel value.
static final String IDENTITY_REDIRECTION_VALUE = "+IDENTITY_REDIRECTION";
static {
final HashMap<String, Object> identityRedirectionAttributes = new HashMap<>();
identityRedirectionAttributes.put(SORT_ROW_REDIRECTION_ATTRIBUTE, IDENTITY_REDIRECTION_VALUE);
IDENTITY_REDIRECTION_ATTRIBUTES = Collections.unmodifiableMap(identityRedirectionAttributes);
}

private final QueryTable parent;
private QueryTable resultTable;
Expand Down Expand Up @@ -135,14 +142,12 @@ private QueryTable historicalSort(SortHelpers.SortMapping sortedKeys) {
final TrackingRowSet resultRowSet = RowSetFactory.flat(sortedKeys.size()).toTracking();

final Map<String, ColumnSource<?>> resultMap = new LinkedHashMap<>();
for (Map.Entry<String, ColumnSource<?>> stringColumnSourceEntry : parent.getColumnSourceMap().entrySet()) {
resultMap.put(stringColumnSourceEntry.getKey(),
RedirectedColumnSource.maybeRedirect(sortMapping, stringColumnSourceEntry.getValue()));
}
final String sortMappingColumnName = populateRedirectedColumns(resultMap, sortMapping);

resultTable = new QueryTable(resultRowSet, resultMap);
parent.copyAttributes(resultTable, BaseTable.CopyAttributeOperation.Sort);
resultTable.setFlat();
resultTable.setAttribute(SORT_ROW_REDIRECTION_ATTRIBUTE, sortMappingColumnName);
setSorted(resultTable);
return resultTable;
}
Expand Down Expand Up @@ -228,7 +233,8 @@ private void setSorted(QueryTable table) {
}

/**
 * Mark {@code table} as sorted by the first sort column of this operation, and tag it with the identity
 * redirection attributes so that later lookups see that no row redirection applies.
 *
 * @param table the table to mark
 * @return {@code table}, or a copy of it with the necessary attributes set
 */
private QueryTable withSorted(QueryTable table) {
    return (QueryTable) SortedColumnsAttribute.withOrderForColumn(table, sortColumnNames[0], sortOrder[0],
            IDENTITY_REDIRECTION_ATTRIBUTES);
}

@Override
Expand Down Expand Up @@ -280,10 +286,7 @@ public Result<QueryTable> initialize(boolean usePrev, long beforeClock) {
sortMapping.writableCast().fillFromChunk(fillFromContext, LongChunk.chunkWrap(sortedKeys),
closer.add(resultRowSet.copy()));

for (Map.Entry<String, ColumnSource<?>> stringColumnSourceEntry : parent.getColumnSourceMap().entrySet()) {
resultMap.put(stringColumnSourceEntry.getKey(),
RedirectedColumnSource.maybeRedirect(sortMapping, stringColumnSourceEntry.getValue()));
}
String sortMappingColumnName = populateRedirectedColumns(resultMap, sortMapping);

// noinspection unchecked
final ColumnSource<Comparable<?>>[] sortedColumnsToSortBy =
Expand All @@ -298,6 +301,7 @@ public Result<QueryTable> initialize(boolean usePrev, long beforeClock) {

resultTable = new QueryTable(resultRowSet, resultMap);
parent.copyAttributes(resultTable, BaseTable.CopyAttributeOperation.Sort);
resultTable.setAttribute(SORT_ROW_REDIRECTION_ATTRIBUTE, sortMappingColumnName);
setReverseLookup(resultTable, (final long innerRowKey) -> {
final long outerRowKey = reverseLookup.get(innerRowKey);
return outerRowKey == reverseLookup.getNoEntryValue() ? RowSequence.NULL_ROW_KEY : outerRowKey;
Expand All @@ -320,19 +324,34 @@ public Result<QueryTable> initialize(boolean usePrev, long beforeClock) {
}
}

/**
 * Fill {@code resultMap} with one entry per parent column, redirecting each source through {@code sortMapping}
 * where {@link RedirectedColumnSource#maybeRedirect} chooses to do so.
 *
 * @param resultMap the map to populate, keyed by column name
 * @param sortMapping the row redirection produced by the sort
 * @return the name of the last column whose source was replaced by a different (redirected) object, or
 *         {@link #IDENTITY_REDIRECTION_VALUE} if every column kept its original source
 */
private String populateRedirectedColumns(Map<String, ColumnSource<?>> resultMap, RowRedirection sortMapping) {
    // Start from the identity sentinel; it is only replaced when some column really gets redirected.
    String redirectedColumnName = IDENTITY_REDIRECTION_VALUE;

    for (final Map.Entry<String, ColumnSource<?>> entry : parent.getColumnSourceMap().entrySet()) {
        final String name = entry.getKey();
        final ColumnSource<?> original = entry.getValue();
        final ColumnSource<?> maybeRedirected = RedirectedColumnSource.maybeRedirect(sortMapping, original);
        resultMap.put(name, maybeRedirected);
        if (maybeRedirected == original) {
            // Same object handed back: this column was not redirected.
            continue;
        }
        redirectedColumnName = name;
    }
    return redirectedColumnName;
}

/**
* Get the row redirection for a sort result.
*
* @param sortResult The sort result table; <em>must</em> be the direct result of a sort.
* @return The row redirection if at least one column required redirection, otherwise {@code null}
* @return The row redirection for this table if at least one column required redirection, otherwise {@code null}
*/
public static RowRedirection getRowRedirection(@NotNull final Table sortResult) {
for (final ColumnSource<?> columnSource : sortResult.getColumnSources()) {
if (columnSource instanceof RedirectedColumnSource) {
return ((RedirectedColumnSource<?>) columnSource).getRowRedirection();
}
final String columnName = (String) sortResult.getAttribute(SORT_ROW_REDIRECTION_ATTRIBUTE);
if (columnName == null || columnName.equals(IDENTITY_REDIRECTION_VALUE)) {
return null;
}
return null;

return ((RedirectedColumnSource<?>) sortResult.getColumnSource(columnName)).getRowRedirection();
}

/**
Expand All @@ -351,7 +370,7 @@ public static RowRedirection getRowRedirection(@NotNull final Table sortResult)
*
* @param parent The sort input table; must have been sorted in order to produce {@code sortResult}
* @param sortResult The sort result table; <em>must</em> be the direct result of a sort on {@code parent}
* @return The reverse lookup
* @return The reverse lookup, or null if no redirection is performed.
*/
public static LongUnaryOperator getReverseLookup(@NotNull final Table parent, @NotNull final Table sortResult) {
if (BlinkTableTools.isBlink(parent)) {
Expand All @@ -365,9 +384,8 @@ public static LongUnaryOperator getReverseLookup(@NotNull final Table parent, @N
return (LongUnaryOperator) value;
}
final RowRedirection sortRedirection = getRowRedirection(sortResult);
if (sortRedirection == null || sortRedirection == getRowRedirection(parent)) {
// Static table was already sorted
return LongUnaryOperator.identity();
if (sortRedirection == null) {
return null;
}
final HashMapK4V4 reverseLookup = new HashMapLockFreeK4V4(sortResult.intSize(), .75f, RowSequence.NULL_ROW_KEY);
try (final LongColumnIterator innerRowKeys =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,28 @@ public static Table withOrderForColumn(Table table, String columnName, SortingOr
return table.withAttributes(Map.of(Table.SORTED_COLUMNS_ATTRIBUTE, newAttribute));
}

/**
 * Mark the result table as sorted by the given column, setting any extra attributes in the same step.
 *
 * @param table the table to update
 * @param columnName the column to update
 * @param order the order that the column is sorted in
 * @param additionalAttributes further attributes to set together with the sorted-columns attribute; may be
 *        empty, in which case only the sorted-columns attribute is set
 * @return {@code table}, or a copy of it with the necessary attributes set
 */
public static Table withOrderForColumn(Table table, String columnName, SortingOrder order,
        Map<String, ?> additionalAttributes) {
    final String updatedSortedColumns = setOrderForColumn(
            (String) table.getAttribute(Table.SORTED_COLUMNS_ATTRIBUTE), columnName, order);
    if (!additionalAttributes.isEmpty()) {
        // Caller-supplied attributes go in first; the sorted-columns value is applied last and wins on a clash.
        final Map<String, Object> combined = new LinkedHashMap<>(additionalAttributes);
        combined.put(Table.SORTED_COLUMNS_ATTRIBUTE, updatedSortedColumns);
        return table.withAttributes(combined);
    }
    return table.withAttributes(Map.of(Table.SORTED_COLUMNS_ATTRIBUTE, updatedSortedColumns));
}

/**
* Get the columns a {@link Table} is sorted by.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ private void doFillChunk(@NotNull final ColumnSource.FillContext context,

if (ascendingMapping) {
effectiveContext.doOrderedFillAscending(innerSource, usePrev, destination);
} else if (innerSource instanceof FillUnordered) {
} else if (FillUnordered.providesFillUnordered(innerSource)) {
// noinspection unchecked
effectiveContext.doUnorderedFill((FillUnordered<Values>) innerSource, usePrev, destination);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,15 @@
package io.deephaven.engine.table.impl;

import io.deephaven.api.ColumnName;
import io.deephaven.api.agg.Aggregation;
import io.deephaven.api.agg.spec.AggSpec;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.csv.CsvTools;
import io.deephaven.csv.util.CsvReaderException;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.rowset.RowSequenceFactory;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.RowSetShiftData;
import io.deephaven.engine.table.*;
Expand All @@ -20,14 +25,17 @@
import io.deephaven.engine.table.impl.sources.chunkcolumnsource.ChunkColumnSource;
import io.deephaven.engine.testutil.ControlledUpdateGraph;
import io.deephaven.engine.testutil.junit4.EngineCleanup;
import io.deephaven.engine.util.TableTools;
import io.deephaven.test.types.OutOfBandTest;

import junit.framework.TestCase;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import java.io.ByteArrayInputStream;
import java.time.Instant;
import java.util.Arrays;
import java.util.BitSet;
Expand All @@ -44,13 +52,8 @@
import static io.deephaven.engine.table.impl.sources.ReinterpretUtils.byteToBooleanSource;
import static io.deephaven.engine.table.impl.sources.ReinterpretUtils.longToInstantSource;
import static io.deephaven.engine.table.impl.sources.ReinterpretUtils.maybeConvertToPrimitiveChunkType;
import static io.deephaven.engine.testutil.TstUtils.addToTable;
import static io.deephaven.engine.testutil.TstUtils.assertTableEquals;
import static io.deephaven.engine.testutil.TstUtils.testRefreshingTable;
import static io.deephaven.engine.util.TableTools.booleanCol;
import static io.deephaven.engine.util.TableTools.byteCol;
import static io.deephaven.engine.util.TableTools.intCol;
import static io.deephaven.engine.util.TableTools.newTable;
import static io.deephaven.engine.testutil.TstUtils.*;
import static io.deephaven.engine.util.TableTools.*;
import static io.deephaven.util.QueryConstants.NULL_INT;
import static org.assertj.core.api.Assertions.assertThat;

Expand Down Expand Up @@ -201,6 +204,62 @@ public void testTreeSnapshotSatisfaction() throws ExecutionException, Interrupte
concurrentExecutor.shutdown();
}

/**
 * Snapshot the same rollup twice with an expand-all key table, once unsorted and once with a descending sort on
 * {@code N} applied to the aggregated nodes, and check that the sorted snapshot holds the same rows in the
 * order the sort implies.
 */
@Test
public void testSortedExpandAll() throws CsvReaderException {
    final String data = "A,B,C,N\n" +
            "Apple,One,Alpha,1\n" +
            "Apple,One,Alpha,2\n" +
            "Apple,One,Bravo,3\n" +
            "Apple,One,Bravo,4\n" +
            "Apple,One,Bravo,5\n" +
            "Apple,One,Bravo,6\n" +
            "Banana,Two,Alpha,7\n" +
            "Banana,Two,Alpha,8\n" +
            "Banana,Two,Bravo,3\n" +
            "Banana,Two,Bravo,4\n" +
            "Banana,Three,Bravo,1\n" +
            "Banana,Three,Bravo,1\n";

    // Encode explicitly as UTF-8 so the test input does not depend on the platform default charset.
    final Table source = CsvTools.readCsv(
            new ByteArrayInputStream(data.getBytes(java.nio.charset.StandardCharsets.UTF_8)));

    TableTools.show(source);
    final RollupTable rollupTable = source.rollup(List.of(Aggregation.of(AggSpec.sum(), "N")), "A", "B", "C");
    final RollupTable sortedRollup = rollupTable.withNodeOperations(
            rollupTable.makeNodeOperationsRecorder(RollupTable.NodeType.Aggregated).sortDescending("N"));

    // A single key row with null A/B/C at depth 0 and the expand-all action.
    final String[] arrayWithNull = new String[1];
    final Table keyTable = newTable(
            intCol(rollupTable.getRowDepthColumn().name(), 0),
            stringCol("A", arrayWithNull),
            stringCol("B", arrayWithNull),
            stringCol("C", arrayWithNull),
            byteCol("Action", HierarchicalTable.KEY_TABLE_ACTION_EXPAND_ALL));

    final SnapshotState ss = rollupTable.makeSnapshotState();
    final Table snapshot =
            snapshotToTable(rollupTable, ss, keyTable, ColumnName.of("Action"), null, RowSetFactory.flat(30));
    TableTools.showWithRowSet(snapshot);

    final SnapshotState ssSort = sortedRollup.makeSnapshotState();

    final Table snapshotSort =
            snapshotToTable(sortedRollup, ssSort, keyTable, ColumnName.of("Action"), null, RowSetFactory.flat(30));
    TableTools.showWithRowSet(snapshotSort);

    // first we know that the size of the tables must be the same
    TestCase.assertEquals(snapshot.size(), snapshotSort.size());
    // and the first row must be the same, because it is the parent
    assertTableEquals(snapshot.head(1), snapshotSort.head(1));
    // then we have six rows of banana, and that should be identical
    assertTableEquals(snapshot.slice(5, 11), snapshotSort.slice(1, 7));
    // then we need to check on the apple rows, but those are not actually identical because of sorting
    Table appleExpected = snapshot.where("A=`Apple`").sortDescending("N");
    assertTableEquals(appleExpected, snapshotSort.slice(7, 11));

    freeSnapshotTableChunks(snapshot);
    freeSnapshotTableChunks(snapshotSort);
}

@SuppressWarnings("SameParameterValue")
private static Table snapshotToTable(
@NotNull final HierarchicalTable<?> hierarchicalTable,
Expand Down
2 changes: 1 addition & 1 deletion go/pkg/client/example_import_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func Example_importTable() {
// metadata: ["deephaven:isDateFormat": "false", "deephaven:isNumberFormat": "false", "deephaven:isPartitioning": "false", "deephaven:isRowStyle": "false", "deephaven:isSortable": "true", "deephaven:isStyle": "false", "deephaven:type": "float"]
// - Volume: type=int32, nullable
// metadata: ["deephaven:isDateFormat": "false", "deephaven:isNumberFormat": "false", "deephaven:isPartitioning": "false", "deephaven:isRowStyle": "false", "deephaven:isSortable": "true", "deephaven:isStyle": "false", "deephaven:type": "int"]
// metadata: ["deephaven:attribute.AddOnly": "true", "deephaven:attribute.AppendOnly": "true", "deephaven:attribute.SortedColumns": "Close=Ascending", "deephaven:attribute_type.AddOnly": "java.lang.Boolean", "deephaven:attribute_type.AppendOnly": "java.lang.Boolean", "deephaven:attribute_type.SortedColumns": "java.lang.String"]
// metadata: ["deephaven:attribute.AddOnly": "true", "deephaven:attribute.AppendOnly": "true", "deephaven:attribute.SortRowRedirection": "Volume", "deephaven:attribute.SortedColumns": "Close=Ascending", "deephaven:attribute_type.AddOnly": "java.lang.Boolean", "deephaven:attribute_type.AppendOnly": "java.lang.Boolean", "deephaven:attribute_type.SortRowRedirection": "java.lang.String", "deephaven:attribute_type.SortedColumns": "java.lang.String"]
// rows: 5
// col[0][Ticker]: ["IBM" "XRX" "XYZZY" "GME" "ZNGA"]
// col[1][Close]: [38.7 53.8 88.5 453 544.9]
Expand Down

0 comments on commit 0b48788

Please sign in to comment.