diff --git a/engine/api/src/main/java/io/deephaven/engine/table/Table.java b/engine/api/src/main/java/io/deephaven/engine/table/Table.java index f06946b2d9c..56091230495 100644 --- a/engine/api/src/main/java/io/deephaven/engine/table/Table.java +++ b/engine/api/src/main/java/io/deephaven/engine/table/Table.java @@ -675,7 +675,7 @@ RollupTable rollup(Collection aggregations, boolean inclu /** * Get a {@link Table} that contains a sub-set of the rows from {@code this}. The result will share the same * {@link #getColumnSources() column sources} and {@link #getDefinition() definition} as this table. - * + *

* The result will not update on its own. The caller must also establish an appropriate listener to update * {@code rowSet} and propagate {@link TableUpdate updates}. * diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java index d438979cee2..72c4d20aff0 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java @@ -3233,10 +3233,10 @@ public void propagateFlatness(QueryTable result) { /** * Get a {@link Table} that contains a sub-set of the rows from {@code this}. The result will share the same * {@link #getColumnSources() column sources} and {@link #getDefinition() definition} as this table. - * + *

* The result will not update on its own, the caller must also establish an appropriate listener to update * {@code rowSet} and propagate {@link TableUpdate updates}. - * + *

* No {@link QueryPerformanceNugget nugget} is opened for this table, to prevent operations that call this * repeatedly from having an inordinate performance penalty. If callers require a nugget, they must create one in * the enclosing operation. @@ -3252,16 +3252,38 @@ public QueryTable getSubTable(@NotNull final TrackingRowSet rowSet) { } } + /** + * Get a {@link Table} that adds, or overwrites, columns from {@code this}. The result will share the same + * {@link #getRowSet() row set} as this table. + *

+ * The result will not update on its own. The caller must also establish an appropriate listener to update the + * provided column sources and propagate {@link TableUpdate updates}. + *

+ * No attributes are propagated to the result table. + * + * @param additionalSources The additional columns to add or overwrite + * @return A new table with the additional columns + */ + public QueryTable withAdditionalColumns(@NotNull final Map> additionalSources) { + final UpdateGraph updateGraph = getUpdateGraph(); + try (final SafeCloseable ignored = ExecutionContext.getContext().withUpdateGraph(updateGraph).open()) { + final LinkedHashMap> columns = new LinkedHashMap<>(this.columns); + columns.putAll(additionalSources); + final TableDefinition definition = TableDefinition.inferFrom(columns); + return new QueryTable(definition, rowSet, columns, null, null); + } + } + /** * Get a {@link Table} that contains a sub-set of the rows from {@code this}. The result will share the same * {@link #getColumnSources() column sources} and {@link #getDefinition() definition} as this table. - * + *

* The result will not update on its own, the caller must also establish an appropriate listener to update * {@code rowSet} and propagate {@link TableUpdate updates}. - * + *

* This method is intended to be used for composing alternative engine operations, in particular * {@link #partitionBy(boolean, String...)}. - * + *

* No {@link QueryPerformanceNugget nugget} is opened for this table, to prevent operations that call this * repeatedly from having an inordinate performance penalty. If callers require a nugget, they must create one in * the enclosing operation. diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/SortHelpers.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/SortHelpers.java index 0ff68f8acc8..6378fec6c18 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/SortHelpers.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/SortHelpers.java @@ -233,7 +233,7 @@ static SortMapping getSortedKeys(SortingOrder[] order, ColumnSource>[] columnsToSortBy, RowSet rowSetToSort, boolean usePrev, boolean allowSymbolTable) { - if (rowSetToSort.size() == 0) { + if (rowSetToSort.isEmpty()) { return EMPTY_SORT_MAPPING; } @@ -300,7 +300,9 @@ private int doIntLookup(long symTabId) { return lookupTable[region][id]; } - private static SparseSymbolMapping createMapping(LongChunk originalSymbol, IntChunk mappedIndex) { + private static SparseSymbolMapping createMapping( + @NotNull final LongChunk originalSymbol, + @NotNull final LongChunk mappedIndex) { // figure out what the maximum region is, and determine how many bits of it there are int maxUpperPart = 0; int minTrailing = 32; @@ -330,7 +332,7 @@ private static SparseSymbolMapping createMapping(LongChunk originalSymbol, IntCh final long symTabId = originalSymbol.get(ii); final int region = (int) (symTabId >> (32 + minTrailing)); final int id = (int) symTabId; - final int mappedId = mappedIndex.get(ii); + final int mappedId = Math.toIntExact(mappedIndex.get(ii)); maxMapping = Math.max(maxMapping, mappedId); lookupTable[region][id] = mappedId; } @@ -340,14 +342,13 @@ private static SparseSymbolMapping createMapping(LongChunk originalSymbol, IntCh } private static final String SORTED_INDEX_COLUMN_NAME = "SortedIndex"; - private static final String SORTED_INDEX_COLUMN_UPDATE = SORTED_INDEX_COLUMN_NAME + "=i"; private static SortMapping doSymbolTableMapping(SortingOrder order, ColumnSource> columnSource, RowSet rowSet, boolean usePrev) { final int sortSize = rowSet.intSize(); final ColumnSource reinterpreted = columnSource.reinterpret(long.class); - final Table symbolTable = ((SymbolTableSource) columnSource).getStaticSymbolTable(rowSet, true); + final Table symbolTable = ((SymbolTableSource) columnSource).getStaticSymbolTable(rowSet, true); if (symbolTable.size() >= sortSize) { // the very first thing we will do is sort the symbol table, using a regular sort; if it is larger than the @@ -355,8 +356,16 @@ private static SortMapping doSymbolTableMapping(SortingOrder order, ColumnSource return getSortMappingOne(order, columnSource, rowSet, usePrev); } - final Table idMapping = symbolTable.sort(SymbolTableSource.SYMBOL_COLUMN_NAME) - .groupBy(SymbolTableSource.SYMBOL_COLUMN_NAME).update(SORTED_INDEX_COLUMN_UPDATE).ungroup() + final QueryTable groupedSymbols = (QueryTable) symbolTable.sort(SymbolTableSource.SYMBOL_COLUMN_NAME) + .groupBy(SymbolTableSource.SYMBOL_COLUMN_NAME).coalesce(); + final Map> extraColumn; + if (groupedSymbols.isFlat()) { + extraColumn = Map.of(SORTED_INDEX_COLUMN_NAME, RowKeyColumnSource.INSTANCE); + } else { + extraColumn = Map.of(SORTED_INDEX_COLUMN_NAME, new RowPositionColumnSource(groupedSymbols.getRowSet())); + } + final Table idMapping = groupedSymbols.withAdditionalColumns(extraColumn) + .ungroup() .view(SymbolTableSource.ID_COLUMN_NAME, SORTED_INDEX_COLUMN_NAME); final int symbolEntries = idMapping.intSize(); @@ -364,13 +373,13 @@ private static SortMapping doSymbolTableMapping(SortingOrder order, ColumnSource final SparseSymbolMapping mapping; try (final WritableLongChunk originalSymbol = WritableLongChunk.makeWritableChunk(symbolEntries); - final WritableIntChunk mappedIndex = WritableIntChunk.makeWritableChunk(symbolEntries)) { - final ColumnSource idSource = idMapping.getColumnSource(SymbolTableSource.ID_COLUMN_NAME); + final WritableLongChunk mappedIndex = WritableLongChunk.makeWritableChunk(symbolEntries)) { + final ColumnSource idSource = idMapping.getColumnSource(SymbolTableSource.ID_COLUMN_NAME); try (final ColumnSource.FillContext idContext = idSource.makeFillContext(symbolEntries)) { idSource.fillChunk(idContext, originalSymbol, idMapping.getRowSet()); } - final ColumnSource sortedRowSetSource = idMapping.getColumnSource(SORTED_INDEX_COLUMN_NAME); + final ColumnSource sortedRowSetSource = idMapping.getColumnSource(SORTED_INDEX_COLUMN_NAME); try (final ColumnSource.FillContext sortedIndexContext = sortedRowSetSource.makeFillContext(symbolEntries)) { sortedRowSetSource.fillChunk(sortedIndexContext, mappedIndex, idMapping.getRowSet()); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/SortOperation.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/SortOperation.java index 8977bc3aa8e..45c3bc84c1b 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/SortOperation.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/SortOperation.java @@ -131,7 +131,7 @@ private QueryTable historicalSort(SortHelpers.SortMapping sortedKeys) { } @NotNull - private Result streamSort(@NotNull final SortHelpers.SortMapping initialSortedKeys) { + private Result blinkTableSort(@NotNull final SortHelpers.SortMapping initialSortedKeys) { final LongChunkColumnSource initialInnerRedirectionSource = new LongChunkColumnSource(); if (initialSortedKeys.size() > 0) { initialInnerRedirectionSource @@ -173,7 +173,7 @@ public void onUpdate(@NotNull final TableUpdate upstream) { } final SortHelpers.SortMapping updateSortedKeys = - SortHelpers.getSortedKeys(sortOrder, sortColumns, upstream.added(), false); + SortHelpers.getSortedKeys(sortOrder, sortColumns, upstream.added(), false, false); final LongChunkColumnSource recycled = recycledInnerRedirectionSource.getValue(); recycledInnerRedirectionSource.setValue(null); final LongChunkColumnSource updateInnerRedirectSource = @@ -220,7 +220,7 @@ public Result initialize(boolean usePrev, long beforeClock) { final RowSet indexToUse = usePrev ? prevIndex : parent.getRowSet(); final SortHelpers.SortMapping sortedKeys = SortHelpers.getSortedKeys(sortOrder, sortColumns, indexToUse, usePrev); - return streamSort(sortedKeys); + return blinkTableSort(sortedKeys); } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ChunkedOperatorAggregationHelper.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ChunkedOperatorAggregationHelper.java index 5c600e2a63f..76608e67c4e 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ChunkedOperatorAggregationHelper.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/ChunkedOperatorAggregationHelper.java @@ -303,7 +303,14 @@ public void onFailureInternal(@NotNull final Throwable originalException, Entry snapshotControl.setListenerAndResult(listener, result); } - return ac.transformResult(result); + final QueryTable finalResult = ac.transformResult(result); + final boolean noInitialKeys = initialKeys == null || (!initialKeys.isRefreshing() && initialKeys.isEmpty()); + if (!input.isRefreshing() && finalResult.getRowSet().isFlat()) { + finalResult.setFlat(); + } else if ((input.isAddOnly() || input.isAppendOnly() || input.isBlink()) && (noInitialKeys || preserveEmpty)) { + finalResult.setFlat(); + } + return finalResult; } private static OperatorAggregationStateManager makeStateManager( @@ -1580,7 +1587,11 @@ private static QueryTable staticGroupedAggregation(QueryTable withView, String k return keyToSlot::get; }); - return ac.transformResult(result); + final QueryTable finalResult = ac.transformResult(result); + if (finalResult.getRowSet().isFlat()) { + finalResult.setFlat(); + } + return finalResult; } private static void doGroupedAddition( @@ -2115,7 +2126,9 @@ public void onFailureInternal(@NotNull final Throwable originalException, ac.supplyRowLookup(() -> key -> Arrays.equals((Object[]) key, EMPTY_KEY) ? 0 : DEFAULT_UNKNOWN_ROW); - return ac.transformResult(result); + final QueryTable finalResult = ac.transformResult(result); + finalResult.setFlat(); + return finalResult; } private static void doNoKeyAddition(RowSequence index, AggregationContext ac, diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/ArrayBackedColumnSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/ArrayBackedColumnSource.java index c438e8c1864..f3799952ec8 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/ArrayBackedColumnSource.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/ArrayBackedColumnSource.java @@ -37,10 +37,6 @@ import java.time.Instant; import java.util.Arrays; import java.util.Collection; -import java.util.Objects; -import java.util.concurrent.TimeUnit; - -import static io.deephaven.util.QueryConstants.NULL_LONG; /** * A ColumnSource backed by in-memory arrays of data. diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/ArraySourceHelper.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/ArraySourceHelper.java index b0cb7a9e796..574b50c6981 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/ArraySourceHelper.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/ArraySourceHelper.java @@ -8,7 +8,6 @@ import io.deephaven.chunk.Chunk; import io.deephaven.chunk.ChunkType; import io.deephaven.chunk.attributes.Values; -import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.rowset.RowSequence; import io.deephaven.engine.table.ChunkSource; import io.deephaven.engine.table.ColumnSource; diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/RowIdSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/RowIdSource.java deleted file mode 100644 index ff6538a1985..00000000000 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/RowIdSource.java +++ /dev/null @@ -1,20 +0,0 @@ -/** - * Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending - */ -package io.deephaven.engine.table.impl.sources; - -import io.deephaven.engine.table.impl.AbstractColumnSource; -import io.deephaven.engine.table.impl.ImmutableColumnSourceGetDefaults; - -public class RowIdSource extends AbstractColumnSource implements ImmutableColumnSourceGetDefaults.ForLong { - public static final RowIdSource INSTANCE = new RowIdSource(); - - public RowIdSource() { - super(Long.class); - } - - @Override - public long getLong(long rowKey) { - return rowKey; - } -} diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/RowKeyColumnSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/RowKeyColumnSource.java new file mode 100644 index 00000000000..4dd28914900 --- /dev/null +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/RowKeyColumnSource.java @@ -0,0 +1,97 @@ +/** + * Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending + */ +package io.deephaven.engine.table.impl.sources; + +import io.deephaven.chunk.LongChunk; +import io.deephaven.chunk.WritableChunk; +import io.deephaven.chunk.WritableLongChunk; +import io.deephaven.chunk.attributes.Values; +import io.deephaven.engine.rowset.RowSequence; +import io.deephaven.engine.rowset.chunkattributes.RowKeys; +import io.deephaven.engine.table.impl.AbstractColumnSource; +import io.deephaven.engine.table.impl.ImmutableColumnSourceGetDefaults; +import io.deephaven.util.QueryConstants; +import org.jetbrains.annotations.NotNull; + +/** + * This is a column source that uses no additional memory and is an identity mapping from row key to row key. + */ +public class RowKeyColumnSource extends AbstractColumnSource + implements ImmutableColumnSourceGetDefaults.ForLong, FillUnordered { + public static final RowKeyColumnSource INSTANCE = new RowKeyColumnSource(); + + public RowKeyColumnSource() { + super(Long.class); + } + + @Override + public long getLong(long rowKey) { + return rowKey < 0 ? QueryConstants.NULL_LONG : rowKey; + } + + @Override + public void fillChunk( + @NotNull final FillContext context, + @NotNull final WritableChunk destination, + @NotNull final RowSequence rowSequence) { + doFillChunk(destination, rowSequence); + } + + @Override + public void fillPrevChunk( + @NotNull final FillContext context, + @NotNull final WritableChunk destination, + @NotNull final RowSequence rowSequence) { + doFillChunk(destination, rowSequence); + } + + static void doFillChunk( + @NotNull final WritableChunk destination, + @NotNull final RowSequence rowSequence) { + final WritableLongChunk longChunk = destination.asWritableLongChunk(); + if (rowSequence.isContiguous()) { + final int size = rowSequence.intSize(); + longChunk.setSize(size); + final long firstRowKey = rowSequence.firstRowKey(); + for (int ii = 0; ii < size; ++ii) { + longChunk.set(ii, firstRowKey + ii); + } + } else { + rowSequence.fillRowKeyChunk(longChunk); + } + } + + @Override + public boolean providesFillUnordered() { + return true; + } + + @Override + public void fillChunkUnordered( + @NotNull final FillContext context, + @NotNull final WritableChunk dest, + @NotNull final LongChunk keys) { + doFillUnordered(dest, keys); + } + + + @Override + public void fillPrevChunkUnordered( + @NotNull final FillContext context, + @NotNull final WritableChunk dest, + @NotNull final LongChunk keys) { + doFillUnordered(dest, keys); + } + + private void doFillUnordered( + @NotNull final WritableChunk dest, + @NotNull final LongChunk keys) { + final WritableLongChunk longChunk = dest.asWritableLongChunk(); + longChunk.setSize(keys.size()); + for (int ii = 0; ii < keys.size(); ++ii) { + long key = keys.get(ii); + longChunk.set(ii, key < 0 ? QueryConstants.NULL_LONG : key); + } + } +} diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/RowPositionColumnSource.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/RowPositionColumnSource.java new file mode 100644 index 00000000000..b5b1c9608bc --- /dev/null +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/RowPositionColumnSource.java @@ -0,0 +1,58 @@ +/** + * Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending + */ +package io.deephaven.engine.table.impl.sources; + +import io.deephaven.chunk.WritableChunk; +import io.deephaven.chunk.attributes.Values; +import io.deephaven.engine.rowset.RowSequence; +import io.deephaven.engine.rowset.RowSet; +import io.deephaven.engine.rowset.TrackingRowSet; +import io.deephaven.engine.table.impl.AbstractColumnSource; +import io.deephaven.engine.table.impl.MutableColumnSourceGetDefaults; +import io.deephaven.util.QueryConstants; +import org.jetbrains.annotations.NotNull; + +/** + * This is a column source that uses no additional memory and maps from row key to row position in the supplied row set. + */ +public class RowPositionColumnSource extends AbstractColumnSource + implements MutableColumnSourceGetDefaults.ForLong { + + private final TrackingRowSet rowSet; + + public RowPositionColumnSource(final TrackingRowSet rowSet) { + super(Long.class); + this.rowSet = rowSet; + } + + @Override + public long getLong(long rowKey) { + return rowKey < 0 ? QueryConstants.NULL_LONG : rowSet.find(rowKey); + } + + @Override + public long getPrevLong(long rowKey) { + return rowKey < 0 ? QueryConstants.NULL_LONG : rowSet.findPrev(rowKey); + } + + @Override + public void fillChunk( + @NotNull final FillContext context, + @NotNull final WritableChunk destination, + @NotNull final RowSequence rowSequence) { + try (final RowSet result = rowSet.invert(rowSequence.asRowSet())) { + RowKeyColumnSource.doFillChunk(destination, result); + } + } + + @Override + public void fillPrevChunk( + @NotNull final FillContext context, + @NotNull final WritableChunk destination, + @NotNull final RowSequence rowSequence) { + try (final RowSet result = rowSet.prev().invert(rowSequence.asRowSet())) { + RowKeyColumnSource.doFillChunk(destination, result); + } + } +} diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/RegionedColumnSourceWithDictionary.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/RegionedColumnSourceWithDictionary.java index c9bd05ddbee..2565664ed73 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/RegionedColumnSourceWithDictionary.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/RegionedColumnSourceWithDictionary.java @@ -12,9 +12,9 @@ import io.deephaven.engine.table.impl.*; import io.deephaven.engine.table.impl.locations.ColumnLocation; import io.deephaven.engine.table.impl.ColumnSourceGetDefaults; -import io.deephaven.engine.table.impl.sources.RowIdSource; import io.deephaven.engine.table.impl.chunkattributes.DictionaryKeys; import io.deephaven.chunk.attributes.Values; +import io.deephaven.engine.table.impl.sources.RowKeyColumnSource; import org.apache.commons.lang3.mutable.Mutable; import org.apache.commons.lang3.mutable.MutableObject; import org.jetbrains.annotations.NotNull; @@ -235,7 +235,7 @@ public QueryTable getStaticSymbolTable(@NotNull RowSet sourceIndex, boolean useL } final Map> symbolTableColumnSources = new LinkedHashMap<>(); - symbolTableColumnSources.put(SymbolTableSource.ID_COLUMN_NAME, new RowIdSource()); + symbolTableColumnSources.put(SymbolTableSource.ID_COLUMN_NAME, RowKeyColumnSource.INSTANCE); symbolTableColumnSources.put(SymbolTableSource.SYMBOL_COLUMN_NAME, dictionaryColumn); return new QueryTable(symbolTableRowSet, symbolTableColumnSources); diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/PartitionedTableTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/PartitionedTableTest.java index 61c2a97f102..d7da759f171 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/PartitionedTableTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/PartitionedTableTest.java @@ -6,6 +6,7 @@ import io.deephaven.api.ColumnName; import io.deephaven.api.SortColumn; import io.deephaven.api.agg.Partition; +import io.deephaven.base.FileUtils; import io.deephaven.base.SleepUtil; import io.deephaven.base.log.LogOutput; import io.deephaven.base.verify.Assert; @@ -29,6 +30,8 @@ import io.deephaven.engine.updategraph.UpdateGraph; import io.deephaven.engine.util.TableTools; import io.deephaven.engine.util.systemicmarking.SystemicObjectTracker; +import io.deephaven.parquet.table.ParquetInstructions; +import io.deephaven.parquet.table.ParquetTools; import io.deephaven.test.types.OutOfBandTest; import io.deephaven.util.SafeCloseable; import junit.framework.TestCase; @@ -36,6 +39,9 @@ import org.apache.commons.lang3.mutable.MutableLong; import org.junit.experimental.categories.Category; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; import java.util.List; import java.util.Map; import java.util.Random; @@ -1072,4 +1078,31 @@ public void testTransformStaticToRefreshing() { } catch (IllegalStateException expected) { } } + + public void testPartitionedTableSort() throws IOException { + final File tmpDir = Files.createTempDirectory("PartitionedTableTest-").toFile(); + try { + final ParquetInstructions instructions = ParquetInstructions.builder().useDictionary("I", true).build(); + Table a = emptyTable(200).update("I = `` + (50 + (ii % 100))", "K = ii"); + Table b = emptyTable(200).update("I = `` + (ii % 100)", "K = ii"); + ParquetTools.writeTable(a, new java.io.File(tmpDir + "/Partition=p0/data.parquet"), instructions); + ParquetTools.writeTable(b, new java.io.File(tmpDir + "/Partition=p1/data.parquet"), instructions); + a = a.updateView("Partition = `p0`").moveColumnsUp("Partition"); + b = b.updateView("Partition = `p1`").moveColumnsUp("Partition"); + + final Table fromDisk = ParquetTools.readTable(tmpDir); + + // Assert non-partitioned table sorts. + final Table diskOuterSort = fromDisk.sort("I"); + final Table exOuterSort = TableTools.merge(a, b).sort("I"); + assertTableEquals(exOuterSort, diskOuterSort); + + // Assert partitioned table sorts. + final Table diskInnerSort = fromDisk.partitionBy("Partition").proxy().sort("I").target().merge(); + final Table exInnerSort = TableTools.merge(a.sort("I"), b.sort("I")); + assertTableEquals(exInnerSort, diskInnerSort); + } finally { + FileUtils.deleteRecursively(tmpDir); + } + } } diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java index c6bd254977b..777d55caf57 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java @@ -13,6 +13,7 @@ import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.context.QueryScope; import io.deephaven.engine.exceptions.CancellationException; +import io.deephaven.engine.table.impl.sources.RowKeyColumnSource; import io.deephaven.engine.util.TableTools; import io.deephaven.engine.table.impl.verify.TableAssertions; import io.deephaven.engine.table.impl.select.*; @@ -24,7 +25,6 @@ import io.deephaven.engine.table.ShiftObliviousListener; import io.deephaven.engine.table.Table; import io.deephaven.engine.table.impl.chunkfilter.ChunkFilter; -import io.deephaven.engine.table.impl.sources.RowIdSource; import io.deephaven.engine.testutil.*; import io.deephaven.engine.testutil.QueryTableTestBase.TableComparator; import io.deephaven.engine.testutil.generator.*; @@ -1034,7 +1034,7 @@ public void testComparableRangeFilter() { public void testBigTable() { final Table source = new QueryTable( RowSetFactory.flat(10_000_000L).toTracking(), - Collections.singletonMap("A", new RowIdSource())); + Collections.singletonMap("A", RowKeyColumnSource.INSTANCE)); final IncrementalReleaseFilter incrementalReleaseFilter = new IncrementalReleaseFilter(0, 1000000L); final Table filtered = source.where(incrementalReleaseFilter); final Table result = filtered.where("A >= 6_000_000L", "A < 7_000_000L"); @@ -1053,7 +1053,7 @@ public void testBigTable() { public void testBigTableInitial() { final Table source = new QueryTable( RowSetFactory.flat(10_000_000L).toTracking(), - Collections.singletonMap("A", new RowIdSource())); + Collections.singletonMap("A", RowKeyColumnSource.INSTANCE)); final Table result = source.where("A >= 6_000_000L", "A < 7_000_000L"); assertEquals(1_000_000, result.size()); diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/sources/TestRowKeyColumnSource.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/sources/TestRowKeyColumnSource.java new file mode 100644 index 00000000000..12eb064e901 --- /dev/null +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/sources/TestRowKeyColumnSource.java @@ -0,0 +1,99 @@ +/** + * Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending + */ +package io.deephaven.engine.table.impl.sources; + +import io.deephaven.base.verify.Assert; +import io.deephaven.chunk.WritableLongChunk; +import io.deephaven.chunk.attributes.Values; +import io.deephaven.engine.rowset.RowSet; +import io.deephaven.engine.rowset.RowSetBuilderSequential; +import io.deephaven.engine.rowset.RowSetFactory; +import io.deephaven.engine.rowset.chunkattributes.RowKeys; +import io.deephaven.engine.table.ChunkSource; +import io.deephaven.util.QueryConstants; +import org.junit.Test; + +public class TestRowKeyColumnSource { + @Test + public void testGet() { + final RowKeyColumnSource source = RowKeyColumnSource.INSTANCE; + try (final RowSet rs = RowSetFactory.fromRange(0, 2048)) { + rs.forAllRowKeys(key -> { + Assert.eq(source.getLong(key), "source.get(key)", key, "key"); + Assert.eq(source.getPrevLong(key), "source.getPrevLong(key)", key, "key"); + }); + } + + Assert.eq(source.getLong(RowSet.NULL_ROW_KEY), "source.get(NULL_ROW_KEY)", + QueryConstants.NULL_LONG, "QueryConstants.NULL_LONG"); + Assert.eq(source.getPrevLong(RowSet.NULL_ROW_KEY), "source.getPrevLong(NULL_ROW_KEY)", + QueryConstants.NULL_LONG, "QueryConstants.NULL_LONG"); + } + + @Test + public void testFillChunk() { + final RowKeyColumnSource source = RowKeyColumnSource.INSTANCE; + + try (final WritableLongChunk chunk = WritableLongChunk.makeWritableChunk(2048); + final ChunkSource.FillContext context = source.makeFillContext(chunk.capacity())) { + final RowSetBuilderSequential builder = RowSetFactory.builderSequential(); + final long RNG_MULTIPLE = 3; + final long RNG_SIZE = 8; + for (long ii = 0; ii < chunk.capacity() / RNG_SIZE; ++ii) { + final long firstKey = ii * RNG_MULTIPLE * RNG_SIZE; + builder.appendRange(firstKey, firstKey + RNG_SIZE - 1); + } + try (final RowSet rs = builder.build()) { + source.fillChunk(context, chunk, rs); + for (int ii = 0; ii < chunk.size(); ++ii) { + Assert.eq(chunk.get(ii), "chunk.get(ii)", rs.get(ii), "rs.get(ii)"); + } + + chunk.fillWithNullValue(0, chunk.capacity()); + source.fillPrevChunk(context, chunk, rs); + for (int ii = 0; ii < chunk.size(); ++ii) { + Assert.eq(chunk.get(ii), "chunk.get(ii)", rs.get(ii), "rs.get(ii)"); + } + } + } + } + + @Test + public void testFillChunkUnordered() { + final RowKeyColumnSource source = RowKeyColumnSource.INSTANCE; + + try (final WritableLongChunk chunk = WritableLongChunk.makeWritableChunk(2048); + final WritableLongChunk keyChunk = WritableLongChunk.makeWritableChunk(2048); + final ChunkSource.FillContext context = source.makeFillContext(chunk.capacity())) { + + keyChunk.setSize(0); + for (long ii = 0; ii < keyChunk.capacity(); ++ii) { + if (ii % 128 == 0) { + keyChunk.add(RowSet.NULL_ROW_KEY); + } else { + keyChunk.add((ii * 104729) % 65536); + } + } + + source.fillChunkUnordered(context, chunk, keyChunk); + for (int ii = 0; ii < chunk.size(); ++ii) { + if (ii % 128 == 0) { + Assert.eq(chunk.get(ii), "chunk.get(ii)", QueryConstants.NULL_LONG, "QueryConstants.NULL_LONG"); + } else { + Assert.eq(chunk.get(ii), "chunk.get(ii)", keyChunk.get(ii), "keyChunk.get(ii)"); + } + } + + chunk.fillWithNullValue(0, chunk.capacity()); + source.fillPrevChunkUnordered(context, chunk, keyChunk); + for (int ii = 0; ii < chunk.size(); ++ii) { + if (ii % 128 == 0) { + Assert.eq(chunk.get(ii), "chunk.get(ii)", QueryConstants.NULL_LONG, "QueryConstants.NULL_LONG"); + } else { + Assert.eq(chunk.get(ii), "chunk.get(ii)", keyChunk.get(ii), "keyChunk.get(ii)"); + } + } + } + } +} diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/sources/TestRowPositionColumnSource.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/sources/TestRowPositionColumnSource.java new file mode 100644 index 00000000000..0f9874daf2c --- /dev/null +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/sources/TestRowPositionColumnSource.java @@ -0,0 +1,89 @@ +/** + * Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending + */ +package io.deephaven.engine.table.impl.sources; + +import io.deephaven.base.verify.Assert; +import io.deephaven.chunk.WritableLongChunk; +import io.deephaven.chunk.attributes.Values; +import io.deephaven.engine.context.ExecutionContext; +import io.deephaven.engine.rowset.*; +import io.deephaven.engine.table.ChunkSource; +import io.deephaven.engine.testutil.ControlledUpdateGraph; +import io.deephaven.engine.testutil.junit4.EngineCleanup; +import io.deephaven.util.QueryConstants; +import org.junit.Rule; +import org.junit.Test; + +public class TestRowPositionColumnSource { + + @Rule + public final EngineCleanup framework = new EngineCleanup(); + + @Test + public void testGet() { + final RowSetBuilderSequential builder = RowSetFactory.builderSequential(); + final long RNG_MULTIPLE = 3; + final long RNG_SIZE = 8; + for (long ii = 0; ii < 2048; ++ii) { + final long firstKey = ii * RNG_MULTIPLE * RNG_SIZE; + builder.appendRange(firstKey, firstKey + RNG_SIZE - 1); + } + + try (final TrackingWritableRowSet rs = builder.build().toTracking()) { + final RowPositionColumnSource source = new RowPositionColumnSource(rs); + + rs.forAllRowKeys(key -> { + Assert.eq(source.getLong(key), "source.get(key)", rs.find(key), "rs.find(key)"); + }); + + final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); + updateGraph.runWithinUnitTestCycle(() -> { + rs.removeRange(0, rs.get(1023)); + + rs.forAllRowKeys(key -> { + Assert.eq(source.getPrevLong(key), "source.get(key)", rs.find(key) + 1024, "rs.find(key) - 1024"); + }); + }); + + Assert.eq(source.getLong(RowSet.NULL_ROW_KEY), "source.get(NULL_ROW_KEY)", + QueryConstants.NULL_LONG, "QueryConstants.NULL_LONG"); + Assert.eq(source.getPrevLong(RowSet.NULL_ROW_KEY), "source.getPrevLong(NULL_ROW_KEY)", + QueryConstants.NULL_LONG, "QueryConstants.NULL_LONG"); + } + } + + @Test + public void testFillChunk() { + final RowSetBuilderSequential builder = RowSetFactory.builderSequential(); + final long RNG_MULTIPLE = 3; + final long RNG_SIZE = 8; + for (long ii = 0; ii < 2048; ++ii) { + final long firstKey = ii * RNG_MULTIPLE * RNG_SIZE; + builder.appendRange(firstKey, firstKey + RNG_SIZE - 1); + } + + try (final TrackingWritableRowSet rs = builder.build().toTracking(); + final WritableLongChunk chunk = WritableLongChunk.makeWritableChunk(rs.intSize())) { + final RowPositionColumnSource source = new RowPositionColumnSource(rs); + + try (final ChunkSource.FillContext context = source.makeFillContext(chunk.capacity())) { + source.fillChunk(context, chunk, rs); + for (int ii = 0; ii < chunk.size(); ++ii) { + Assert.eq(chunk.get(ii), "chunk.get(ii)", ii, "rs.find(ii)"); + } + + chunk.fillWithNullValue(0, chunk.capacity()); + final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast(); + updateGraph.runWithinUnitTestCycle(() -> { + rs.removeRange(0, rs.get(1023)); + source.fillPrevChunk(context, chunk, rs); + + for (int ii = 0; ii < chunk.size(); ++ii) { + Assert.eq(chunk.get(ii), "chunk.get(ii)", ii + 1024, "rs.find(ii) - 1024"); + } + }); + } + } + } +}