From 6e7dc09e5a94ccb58c2c1930120f3526ccc6a467 Mon Sep 17 00:00:00 2001 From: "Charles P. Wright" Date: Fri, 14 Feb 2025 13:32:44 -0500 Subject: [PATCH] fix: DH-18539: Fix incorrect snapshot results on historical sorted rollups. (#6642) Co-authored-by: Larry Booker --- .../java/io/deephaven/engine/table/Table.java | 6 ++ .../engine/table/impl/SortOperation.java | 62 ++++++++++------ .../table/impl/SortedColumnsAttribute.java | 22 ++++++ .../impl/sources/RedirectedColumnSource.java | 2 +- .../impl/TestHierarchicalTableSnapshots.java | 73 +++++++++++++++++-- go/pkg/client/example_import_table_test.go | 2 +- 6 files changed, 136 insertions(+), 31 deletions(-) diff --git a/engine/api/src/main/java/io/deephaven/engine/table/Table.java b/engine/api/src/main/java/io/deephaven/engine/table/Table.java index c784a10fedb..2a258b2c3c8 100644 --- a/engine/api/src/main/java/io/deephaven/engine/table/Table.java +++ b/engine/api/src/main/java/io/deephaven/engine/table/Table.java @@ -179,10 +179,16 @@ public interface Table extends * implementation. */ String AGGREGATION_ROW_LOOKUP_ATTRIBUTE = "AggregationRowLookup"; + /** * Attribute on sort results used for hierarchical table construction. Specification is left to the implementation. */ String SORT_REVERSE_LOOKUP_ATTRIBUTE = "SortReverseLookup"; + /** + * Attribute on sort results used for hierarchical table construction. Specification is left to the implementation. + */ + String SORT_ROW_REDIRECTION_ATTRIBUTE = "SortRowRedirection"; + String SNAPSHOT_VIEWPORT_TYPE = "Snapshot"; /** * This attribute is used internally by TableTools.merge to detect successive merges. Its presence indicates that it diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/SortOperation.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/SortOperation.java index 6bb82d86e58..a5451b4f32e 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/SortOperation.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/SortOperation.java @@ -26,14 +26,21 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import java.util.Arrays; -import java.util.LinkedHashMap; -import java.util.Map; +import java.util.*; import java.util.function.LongUnaryOperator; import static io.deephaven.engine.table.Table.SORT_REVERSE_LOOKUP_ATTRIBUTE; +import static io.deephaven.engine.table.Table.SORT_ROW_REDIRECTION_ATTRIBUTE; public class SortOperation implements QueryTable.MemoizableOperation { + static final Map IDENTITY_REDIRECTION_ATTRIBUTES; + // The "+" sign is not valid in a column, therefore we can be sure that this is a proper sentinel value. + static final String IDENTITY_REDIRECTION_VALUE = "+IDENTITY_REDIRECTION"; + static { + final HashMap identityRedirectionAttributes = new HashMap<>(); + identityRedirectionAttributes.put(SORT_ROW_REDIRECTION_ATTRIBUTE, IDENTITY_REDIRECTION_VALUE); + IDENTITY_REDIRECTION_ATTRIBUTES = Collections.unmodifiableMap(identityRedirectionAttributes); + } private final QueryTable parent; private QueryTable resultTable; @@ -135,14 +142,12 @@ private QueryTable historicalSort(SortHelpers.SortMapping sortedKeys) { final TrackingRowSet resultRowSet = RowSetFactory.flat(sortedKeys.size()).toTracking(); final Map> resultMap = new LinkedHashMap<>(); - for (Map.Entry> stringColumnSourceEntry : parent.getColumnSourceMap().entrySet()) { - resultMap.put(stringColumnSourceEntry.getKey(), - RedirectedColumnSource.maybeRedirect(sortMapping, stringColumnSourceEntry.getValue())); - } + final String sortMappingColumnName = populateRedirectedColumns(resultMap, sortMapping); resultTable = new QueryTable(resultRowSet, resultMap); parent.copyAttributes(resultTable, BaseTable.CopyAttributeOperation.Sort); resultTable.setFlat(); + resultTable.setAttribute(SORT_ROW_REDIRECTION_ATTRIBUTE, sortMappingColumnName); setSorted(resultTable); return resultTable; } @@ -228,7 +233,8 @@ private void setSorted(QueryTable table) { } private QueryTable withSorted(QueryTable table) { - return (QueryTable) SortedColumnsAttribute.withOrderForColumn(table, sortColumnNames[0], sortOrder[0]); + return (QueryTable) SortedColumnsAttribute.withOrderForColumn(table, sortColumnNames[0], sortOrder[0], + IDENTITY_REDIRECTION_ATTRIBUTES); } @Override @@ -280,10 +286,7 @@ public Result initialize(boolean usePrev, long beforeClock) { sortMapping.writableCast().fillFromChunk(fillFromContext, LongChunk.chunkWrap(sortedKeys), closer.add(resultRowSet.copy())); - for (Map.Entry> stringColumnSourceEntry : parent.getColumnSourceMap().entrySet()) { - resultMap.put(stringColumnSourceEntry.getKey(), - RedirectedColumnSource.maybeRedirect(sortMapping, stringColumnSourceEntry.getValue())); - } + String sortMappingColumnName = populateRedirectedColumns(resultMap, sortMapping); // noinspection unchecked final ColumnSource>[] sortedColumnsToSortBy = @@ -298,6 +301,7 @@ public Result initialize(boolean usePrev, long beforeClock) { resultTable = new QueryTable(resultRowSet, resultMap); parent.copyAttributes(resultTable, BaseTable.CopyAttributeOperation.Sort); + resultTable.setAttribute(SORT_ROW_REDIRECTION_ATTRIBUTE, sortMappingColumnName); setReverseLookup(resultTable, (final long innerRowKey) -> { final long outerRowKey = reverseLookup.get(innerRowKey); return outerRowKey == reverseLookup.getNoEntryValue() ? RowSequence.NULL_ROW_KEY : outerRowKey; @@ -320,19 +324,34 @@ public Result initialize(boolean usePrev, long beforeClock) { } } + private String populateRedirectedColumns(Map> resultMap, RowRedirection sortMapping) { + // if nothing is actually redirected, we can use the identity value + String sortMappingColumnName = IDENTITY_REDIRECTION_VALUE; + + for (Map.Entry> stringColumnSourceEntry : parent.getColumnSourceMap().entrySet()) { + final ColumnSource innerSource = stringColumnSourceEntry.getValue(); + final ColumnSource redirectedSource = RedirectedColumnSource.maybeRedirect(sortMapping, innerSource); + resultMap.put(stringColumnSourceEntry.getKey(), redirectedSource); + if (redirectedSource != innerSource) { + sortMappingColumnName = stringColumnSourceEntry.getKey(); + } + } + return sortMappingColumnName; + } + /** * Get the row redirection for a sort result. * * @param sortResult The sort result table; must be the direct result of a sort. - * @return The row redirection if at least one column required redirection, otherwise {@code null} + * @return The row redirection for this table if at least one column required redirection, otherwise {@code null} */ public static RowRedirection getRowRedirection(@NotNull final Table sortResult) { - for (final ColumnSource columnSource : sortResult.getColumnSources()) { - if (columnSource instanceof RedirectedColumnSource) { - return ((RedirectedColumnSource) columnSource).getRowRedirection(); - } + final String columnName = (String) sortResult.getAttribute(SORT_ROW_REDIRECTION_ATTRIBUTE); + if (columnName == null || columnName.equals(IDENTITY_REDIRECTION_VALUE)) { + return null; } - return null; + + return ((RedirectedColumnSource) sortResult.getColumnSource(columnName)).getRowRedirection(); } /** @@ -351,7 +370,7 @@ public static RowRedirection getRowRedirection(@NotNull final Table sortResult) * * @param parent The sort input table; must have been sorted in order to produce {@code sortResult} * @param sortResult The sort result table; must be the direct result of a sort on {@code parent} - * @return The reverse lookup + * @return The reverse lookup, or null if no redirection is performed. */ public static LongUnaryOperator getReverseLookup(@NotNull final Table parent, @NotNull final Table sortResult) { if (BlinkTableTools.isBlink(parent)) { @@ -365,9 +384,8 @@ public static LongUnaryOperator getReverseLookup(@NotNull final Table parent, @N return (LongUnaryOperator) value; } final RowRedirection sortRedirection = getRowRedirection(sortResult); - if (sortRedirection == null || sortRedirection == getRowRedirection(parent)) { - // Static table was already sorted - return LongUnaryOperator.identity(); + if (sortRedirection == null) { + return null; } final HashMapK4V4 reverseLookup = new HashMapLockFreeK4V4(sortResult.intSize(), .75f, RowSequence.NULL_ROW_KEY); try (final LongColumnIterator innerRowKeys = diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/SortedColumnsAttribute.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/SortedColumnsAttribute.java index 31f2828bcb4..98e66f17410 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/SortedColumnsAttribute.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/SortedColumnsAttribute.java @@ -97,6 +97,28 @@ public static Table withOrderForColumn(Table table, String columnName, SortingOr return table.withAttributes(Map.of(Table.SORTED_COLUMNS_ATTRIBUTE, newAttribute)); } + /** + * Ensure that the result table is marked as sorted by the given column. + * + * @param table the table to update + * @param columnName the column to update + * @param order the order that the column is sorted in + * @return {@code table}, or a copy of it with the necessary attribute set + */ + public static Table withOrderForColumn(Table table, String columnName, SortingOrder order, + Map additionalAttributes) { + final String oldAttribute = (String) table.getAttribute(Table.SORTED_COLUMNS_ATTRIBUTE); + final String newAttribute = setOrderForColumn(oldAttribute, columnName, order); + if (additionalAttributes.isEmpty()) { + return table.withAttributes(Map.of(Table.SORTED_COLUMNS_ATTRIBUTE, newAttribute)); + } else { + final Map attributesToAdd = new LinkedHashMap<>(); + attributesToAdd.putAll(additionalAttributes); + attributesToAdd.put(Table.SORTED_COLUMNS_ATTRIBUTE, newAttribute); + return table.withAttributes(attributesToAdd); + } + } + /** * Get the columns a {@link Table} is sorted by. * diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/RedirectedColumnSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/RedirectedColumnSource.java index 432daa0efe3..f07eed29ef8 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/RedirectedColumnSource.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/RedirectedColumnSource.java @@ -523,7 +523,7 @@ private void doFillChunk(@NotNull final ColumnSource.FillContext context, if (ascendingMapping) { effectiveContext.doOrderedFillAscending(innerSource, usePrev, destination); - } else if (innerSource instanceof FillUnordered) { + } else if (FillUnordered.providesFillUnordered(innerSource)) { // noinspection unchecked effectiveContext.doUnorderedFill((FillUnordered) innerSource, usePrev, destination); } else { diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/TestHierarchicalTableSnapshots.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/TestHierarchicalTableSnapshots.java index 62c78fcb2d3..53d6595fb47 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/TestHierarchicalTableSnapshots.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/TestHierarchicalTableSnapshots.java @@ -4,10 +4,15 @@ package io.deephaven.engine.table.impl; import io.deephaven.api.ColumnName; +import io.deephaven.api.agg.Aggregation; +import io.deephaven.api.agg.spec.AggSpec; import io.deephaven.chunk.WritableChunk; import io.deephaven.chunk.attributes.Values; +import io.deephaven.csv.CsvTools; +import io.deephaven.csv.util.CsvReaderException; import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.rowset.RowSequence; +import io.deephaven.engine.rowset.RowSequenceFactory; import io.deephaven.engine.rowset.RowSetFactory; import io.deephaven.engine.rowset.RowSetShiftData; import io.deephaven.engine.table.*; @@ -20,14 +25,17 @@ import io.deephaven.engine.table.impl.sources.chunkcolumnsource.ChunkColumnSource; import io.deephaven.engine.testutil.ControlledUpdateGraph; import io.deephaven.engine.testutil.junit4.EngineCleanup; +import io.deephaven.engine.util.TableTools; import io.deephaven.test.types.OutOfBandTest; +import junit.framework.TestCase; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import java.io.ByteArrayInputStream; import java.time.Instant; import java.util.Arrays; import java.util.BitSet; @@ -44,13 +52,8 @@ import static io.deephaven.engine.table.impl.sources.ReinterpretUtils.byteToBooleanSource; import static io.deephaven.engine.table.impl.sources.ReinterpretUtils.longToInstantSource; import static io.deephaven.engine.table.impl.sources.ReinterpretUtils.maybeConvertToPrimitiveChunkType; -import static io.deephaven.engine.testutil.TstUtils.addToTable; -import static io.deephaven.engine.testutil.TstUtils.assertTableEquals; -import static io.deephaven.engine.testutil.TstUtils.testRefreshingTable; -import static io.deephaven.engine.util.TableTools.booleanCol; -import static io.deephaven.engine.util.TableTools.byteCol; -import static io.deephaven.engine.util.TableTools.intCol; -import static io.deephaven.engine.util.TableTools.newTable; +import static io.deephaven.engine.testutil.TstUtils.*; +import static io.deephaven.engine.util.TableTools.*; import static io.deephaven.util.QueryConstants.NULL_INT; import static org.assertj.core.api.Assertions.assertThat; @@ -201,6 +204,62 @@ public void testTreeSnapshotSatisfaction() throws ExecutionException, Interrupte concurrentExecutor.shutdown(); } + @Test + public void testSortedExpandAll() throws CsvReaderException { + final String data = "A,B,C,N\n" + + "Apple,One,Alpha,1\n" + + "Apple,One,Alpha,2\n" + + "Apple,One,Bravo,3\n" + + "Apple,One,Bravo,4\n" + + "Apple,One,Bravo,5\n" + + "Apple,One,Bravo,6\n" + + "Banana,Two,Alpha,7\n" + + "Banana,Two,Alpha,8\n" + + "Banana,Two,Bravo,3\n" + + "Banana,Two,Bravo,4\n" + + "Banana,Three,Bravo,1\n" + + "Banana,Three,Bravo,1\n"; + + final Table source = CsvTools.readCsv(new ByteArrayInputStream(data.getBytes())); + + TableTools.show(source); + final RollupTable rollupTable = source.rollup(List.of(Aggregation.of(AggSpec.sum(), "N")), "A", "B", "C"); + final RollupTable sortedRollup = rollupTable.withNodeOperations( + rollupTable.makeNodeOperationsRecorder(RollupTable.NodeType.Aggregated).sortDescending("N")); + + final String[] arrayWithNull = new String[1]; + final Table keyTable = newTable( + intCol(rollupTable.getRowDepthColumn().name(), 0), + stringCol("A", arrayWithNull), + stringCol("B", arrayWithNull), + stringCol("C", arrayWithNull), + byteCol("Action", HierarchicalTable.KEY_TABLE_ACTION_EXPAND_ALL)); + + final SnapshotState ss = rollupTable.makeSnapshotState(); + final Table snapshot = + snapshotToTable(rollupTable, ss, keyTable, ColumnName.of("Action"), null, RowSetFactory.flat(30)); + TableTools.showWithRowSet(snapshot); + + final SnapshotState ssSort = sortedRollup.makeSnapshotState(); + + final Table snapshotSort = + snapshotToTable(sortedRollup, ssSort, keyTable, ColumnName.of("Action"), null, RowSetFactory.flat(30)); + TableTools.showWithRowSet(snapshotSort); + + // first we know that the size of the tables must be the same + TestCase.assertEquals(snapshot.size(), snapshotSort.size()); + // and the first row must be the same, because it is the parent + assertTableEquals(snapshot.head(1), snapshotSort.head(1)); + // then we have six rows of banana, and that should be identical + assertTableEquals(snapshot.slice(5, 11), snapshotSort.slice(1, 7)); + // then we need to check on the apple rows, but those are not actually identical because of sorting + Table appleExpected = snapshot.where("A=`Apple`").sortDescending("N"); + assertTableEquals(appleExpected, snapshotSort.slice(7, 11)); + + freeSnapshotTableChunks(snapshot); + freeSnapshotTableChunks(snapshotSort); + } + @SuppressWarnings("SameParameterValue") private static Table snapshotToTable( @NotNull final HierarchicalTable hierarchicalTable, diff --git a/go/pkg/client/example_import_table_test.go b/go/pkg/client/example_import_table_test.go index 133f33445bc..96ad85ebb00 100644 --- a/go/pkg/client/example_import_table_test.go +++ b/go/pkg/client/example_import_table_test.go @@ -93,7 +93,7 @@ func Example_importTable() { // metadata: ["deephaven:isDateFormat": "false", "deephaven:isNumberFormat": "false", "deephaven:isPartitioning": "false", "deephaven:isRowStyle": "false", "deephaven:isSortable": "true", "deephaven:isStyle": "false", "deephaven:type": "float"] // - Volume: type=int32, nullable // metadata: ["deephaven:isDateFormat": "false", "deephaven:isNumberFormat": "false", "deephaven:isPartitioning": "false", "deephaven:isRowStyle": "false", "deephaven:isSortable": "true", "deephaven:isStyle": "false", "deephaven:type": "int"] - // metadata: ["deephaven:attribute.AddOnly": "true", "deephaven:attribute.AppendOnly": "true", "deephaven:attribute.SortedColumns": "Close=Ascending", "deephaven:attribute_type.AddOnly": "java.lang.Boolean", "deephaven:attribute_type.AppendOnly": "java.lang.Boolean", "deephaven:attribute_type.SortedColumns": "java.lang.String"] + // metadata: ["deephaven:attribute.AddOnly": "true", "deephaven:attribute.AppendOnly": "true", "deephaven:attribute.SortRowRedirection": "Volume", "deephaven:attribute.SortedColumns": "Close=Ascending", "deephaven:attribute_type.AddOnly": "java.lang.Boolean", "deephaven:attribute_type.AppendOnly": "java.lang.Boolean", "deephaven:attribute_type.SortRowRedirection": "java.lang.String", "deephaven:attribute_type.SortedColumns": "java.lang.String"] // rows: 5 // col[0][Ticker]: ["IBM" "XRX" "XYZZY" "GME" "ZNGA"] // col[1][Close]: [38.7 53.8 88.5 453 544.9]