From 9c3ce1294ab0393b2a1e4ee51ebea3306190e125 Mon Sep 17 00:00:00 2001
From: Ryan Caudy <rcaudy@gmail.com>
Date: Tue, 14 Jan 2025 23:09:03 -0500
Subject: [PATCH] First crack at integrating a flag for turning off location
 existence checking in SourcePartitionedTable, as well as adding partitioning
 column values to the underlying partitioned Table

---
 .../table/impl/SourcePartitionedTable.java    | 126 ++++++++++++++----
 .../impl/SingleTableLocationProvider.java     |   2 +-
 .../impl/SourcePartitionedTableTest.java      |   3 +-
 .../barrage/util/PythonTableDataService.java  |   9 +-
 .../experimental/table_data_service.py        |  13 +-
 5 files changed, 116 insertions(+), 37 deletions(-)

diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/SourcePartitionedTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/SourcePartitionedTable.java
index 7ef10f4e31a..c39dc289f1b 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/SourcePartitionedTable.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/SourcePartitionedTable.java
@@ -54,6 +54,7 @@ public class SourcePartitionedTable extends PartitionedTableImpl {
      * @param refreshSizes Whether the locations found should be refreshed
      * @param locationKeyMatcher Function to filter desired location keys
      */
+    @Deprecated(forRemoval = true)
     public SourcePartitionedTable(
             @NotNull final TableDefinition constituentDefinition,
             @NotNull final UnaryOperator<Table> applyTablePermissions,
@@ -61,13 +62,44 @@ public SourcePartitionedTable(
             final boolean refreshLocations,
             final boolean refreshSizes,
             @NotNull final Predicate<ImmutableTableLocationKey> locationKeyMatcher) {
+        this(constituentDefinition, applyTablePermissions, tableLocationProvider, refreshLocations, refreshSizes,
+                locationKeyMatcher, true);
+    }
+
+    /**
+     * Construct a {@link SourcePartitionedTable} from the supplied parameters.
+     * <p>
+     * Note that refreshLocations and refreshSizes are distinct because there are use cases that supply an external
+     * RowSet and hence don't require size refreshes. Others might care for size refreshes, but only the
+     * initially-available set of locations.
+     *
+     * @param constituentDefinition The {@link TableDefinition} expected of constituent {@link Table tables}
+     * @param applyTablePermissions Function to apply in order to correctly restrict the visible result rows
+     * @param tableLocationProvider Source for table locations
+     * @param refreshLocations Whether the set of locations should be refreshed
+     * @param refreshSizes Whether the locations found should be refreshed
+     * @param locationKeyMatcher Function to filter desired location keys
+     * @param preCheckExistence Whether to pre-check the existence (non-null, non-zero size) of locations before
+     *        including them in the result SourcePartitionedTable as constituents. It is recommended to set this to
+     *        {@code false} if you will do subsequent filtering on the result, or if you are confident that all
+     *        locations are valid.
+     */
+    public SourcePartitionedTable(
+            @NotNull final TableDefinition constituentDefinition,
+            @NotNull final UnaryOperator<Table> applyTablePermissions,
+            @NotNull final TableLocationProvider tableLocationProvider,
+            final boolean refreshLocations,
+            final boolean refreshSizes,
+            @NotNull final Predicate<ImmutableTableLocationKey> locationKeyMatcher,
+            final boolean preCheckExistence) {
         super(new UnderlyingTableMaintainer(
                 constituentDefinition,
                 applyTablePermissions,
                 tableLocationProvider,
                 refreshLocations,
                 refreshSizes,
-                locationKeyMatcher).result(),
+                locationKeyMatcher,
+                preCheckExistence).result(),
                 Set.of(KEY_COLUMN_NAME),
                 true,
                 CONSTITUENT_COLUMN_NAME,
@@ -86,6 +118,8 @@ private static final class UnderlyingTableMaintainer extends ReferenceCountedLiv
         private final Predicate<ImmutableTableLocationKey> locationKeyMatcher;
 
         private final TrackingWritableRowSet resultRows;
+        private final String[] partitioningColumnNames;
+        private final WritableColumnSource<?>[] resultPartitionValues;
         private final WritableColumnSource<TableLocationKey> resultTableLocationKeys;
         private final WritableColumnSource<Table> resultLocationTables;
         private final QueryTable result;
@@ -106,7 +140,8 @@ private UnderlyingTableMaintainer(
                 @NotNull final TableLocationProvider tableLocationProvider,
                 final boolean refreshLocations,
                 final boolean refreshSizes,
-                @NotNull final Predicate<ImmutableTableLocationKey> locationKeyMatcher) {
+                @NotNull final Predicate<ImmutableTableLocationKey> locationKeyMatcher,
+                final boolean preCheckExistence) {
             super(false);
 
             this.constituentDefinition = constituentDefinition;
@@ -116,10 +151,20 @@ private UnderlyingTableMaintainer(
             this.locationKeyMatcher = locationKeyMatcher;
 
             resultRows = RowSetFactory.empty().toTracking();
+            final List<ColumnDefinition<?>> partitioningColumns = constituentDefinition.getPartitioningColumns();
+            partitioningColumnNames = partitioningColumns.stream()
+                    .map(ColumnDefinition::getName)
+                    .toArray(String[]::new);
+            resultPartitionValues = partitioningColumns.stream()
+                    .map(cd -> ArrayBackedColumnSource.getMemoryColumnSource(cd.getDataType(), cd.getComponentType()))
+                    .toArray(WritableColumnSource[]::new);
             resultTableLocationKeys = ArrayBackedColumnSource.getMemoryColumnSource(TableLocationKey.class, null);
             resultLocationTables = ArrayBackedColumnSource.getMemoryColumnSource(Table.class, null);
 
-            final Map<String, ColumnSource<?>> resultSources = new LinkedHashMap<>(2);
+            final Map<String, ColumnSource<?>> resultSources = new LinkedHashMap<>(partitioningColumns.size() + 2);
+            for (int pci = 0; pci < partitioningColumns.size(); ++pci) {
+                resultSources.put(partitioningColumnNames[pci], resultPartitionValues[pci]);
+            }
             resultSources.put(KEY_COLUMN_NAME, resultTableLocationKeys);
             resultSources.put(CONSTITUENT_COLUMN_NAME, resultLocationTables);
             result = new QueryTable(resultRows, resultSources);
@@ -135,14 +180,17 @@ private UnderlyingTableMaintainer(
             }
 
             if (needToRefreshLocations) {
+                Arrays.stream(resultPartitionValues).forEach(ColumnSource::startTrackingPrevValues);
                 resultTableLocationKeys.startTrackingPrevValues();
                 resultLocationTables.startTrackingPrevValues();
 
                 subscriptionBuffer = new TableLocationSubscriptionBuffer(tableLocationProvider);
                 manage(subscriptionBuffer);
 
-                pendingLocationStates = new IntrusiveDoublyLinkedQueue<>(
-                        IntrusiveDoublyLinkedNode.Adapter.<PendingLocationState>getInstance());
+                pendingLocationStates = preCheckExistence
+                        ? new IntrusiveDoublyLinkedQueue<>(
+                                IntrusiveDoublyLinkedNode.Adapter.<PendingLocationState>getInstance())
+                        : null;
                 readyLocationStates = new IntrusiveDoublyLinkedQueue<>(
                         IntrusiveDoublyLinkedNode.Adapter.<PendingLocationState>getInstance());
                 processNewLocationsUpdateRoot = new InstrumentedTableUpdateSource(
@@ -206,12 +254,17 @@ private RowSet sortAndAddLocations(@NotNull final Stream<TableLocation> location
             // Note that makeConstituentTable expects us to subsequently unmanage the TableLocations
             unmanage(locations.sorted(Comparator.comparing(TableLocation::getKey)).peek(tl -> {
                 final long constituentRowKey = lastInsertedRowKey.incrementAndGet();
-                final Table constituentTable = makeConstituentTable(tl);
+
+                for (int pci = 0; pci < resultPartitionValues.length; ++pci) {
+                    addPartitionValue(tl.getKey(), partitioningColumnNames[pci], resultPartitionValues[pci],
+                            constituentRowKey);
+                }
 
                 resultTableLocationKeys.ensureCapacity(constituentRowKey + 1);
                 resultTableLocationKeys.set(constituentRowKey, tl.getKey());
 
                 resultLocationTables.ensureCapacity(constituentRowKey + 1);
+                final Table constituentTable = makeConstituentTable(tl);
                 resultLocationTables.set(constituentRowKey, constituentTable);
 
                 if (result.isRefreshing()) {
@@ -223,6 +276,15 @@ private RowSet sortAndAddLocations(@NotNull final Stream<TableLocation> location
                     : RowSetFactory.fromRange(initialLastRowKey + 1, lastInsertedRowKey.get());
         }
 
+        private static <T> void addPartitionValue(
+                @NotNull final TableLocationKey tableLocationKey,
+                @NotNull final String partitioningColumnName,
+                @NotNull final WritableColumnSource<T> partitionValueColumn,
+                final long rowKey) {
+            partitionValueColumn.ensureCapacity(rowKey + 1);
+            partitionValueColumn.set(rowKey, tableLocationKey.getPartitionValue(partitioningColumnName));
+        }
+
         private Table makeConstituentTable(@NotNull final TableLocation tableLocation) {
             final PartitionAwareSourceTable constituent = new PartitionAwareSourceTable(
                     constituentDefinition,
@@ -280,19 +342,24 @@ private RowSet processAdditions(final TableLocationSubscriptionBuffer.LocationUp
              * population in STM ColumnSources.
              */
             // TODO (https://github.com/deephaven/deephaven-core/issues/867): Refactor around a ticking partition table
-            locationUpdate.getPendingAddedLocationKeys().stream()
+            final Stream<PendingLocationState> newPendingLocations = locationUpdate.getPendingAddedLocationKeys()
+                    .stream()
                     .map(LiveSupplier::get)
                     .filter(locationKeyMatcher)
                     .map(tableLocationProvider::getTableLocation)
                     .peek(this::manage)
-                    .map(PendingLocationState::new)
-                    .forEach(pendingLocationStates::offer);
-            for (final Iterator<PendingLocationState> iter = pendingLocationStates.iterator(); iter.hasNext();) {
-                final PendingLocationState pendingLocationState = iter.next();
-                if (pendingLocationState.exists()) {
-                    iter.remove();
-                    readyLocationStates.offer(pendingLocationState);
+                    .map(PendingLocationState::new);
+            if (pendingLocationStates != null) {
+                newPendingLocations.forEach(pendingLocationStates::offer);
+                for (final Iterator<PendingLocationState> iter = pendingLocationStates.iterator(); iter.hasNext();) {
+                    final PendingLocationState pendingLocationState = iter.next();
+                    if (pendingLocationState.exists()) {
+                        iter.remove();
+                        readyLocationStates.offer(pendingLocationState);
+                    }
                 }
+            } else {
+                newPendingLocations.forEach(readyLocationStates::offer);
             }
 
             if (readyLocationStates.isEmpty()) {
@@ -317,22 +384,24 @@ private RowSet processRemovals(final TableLocationSubscriptionBuffer.LocationUpd
             }
 
             // Iterate through the pending locations and remove any that are in the removed set.
-            List<LivenessReferent> toUnmanage = null;
-            for (final Iterator<PendingLocationState> iter = pendingLocationStates.iterator(); iter.hasNext();) {
-                final PendingLocationState pendingLocationState = iter.next();
-                if (relevantRemovedLocations.contains(pendingLocationState.location.getKey())) {
-                    iter.remove();
-                    // Release the state and plan to unmanage the location
-                    if (toUnmanage == null) {
-                        toUnmanage = new ArrayList<>();
+            if (pendingLocationStates != null) {
+                List<LivenessReferent> toUnmanage = null;
+                for (final Iterator<PendingLocationState> iter = pendingLocationStates.iterator(); iter.hasNext();) {
+                    final PendingLocationState pendingLocationState = iter.next();
+                    if (relevantRemovedLocations.contains(pendingLocationState.location.getKey())) {
+                        iter.remove();
+                        // Release the state and plan to unmanage the location
+                        if (toUnmanage == null) {
+                            toUnmanage = new ArrayList<>();
+                        }
+                        toUnmanage.add(pendingLocationState.release());
                     }
-                    toUnmanage.add(pendingLocationState.release());
                 }
-            }
-            if (toUnmanage != null) {
-                unmanage(toUnmanage.stream());
-                // noinspection UnusedAssignment
-                toUnmanage = null;
+                if (toUnmanage != null) {
+                    unmanage(toUnmanage.stream());
+                    // noinspection UnusedAssignment
+                    toUnmanage = null;
+                }
             }
 
             // At the end of the cycle we need to make sure we unmanage any removed constituents.
@@ -367,6 +436,7 @@ private RowSet processRemovals(final TableLocationSubscriptionBuffer.LocationUpd
             this.removedLocationsCommitter.maybeActivate();
 
             final WritableRowSet deletedRows = deleteBuilder.build();
+            Arrays.stream(resultPartitionValues).forEach(cs -> cs.setNull(deletedRows));
             resultTableLocationKeys.setNull(deletedRows);
             resultLocationTables.setNull(deletedRows);
             return deletedRows;
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/SingleTableLocationProvider.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/SingleTableLocationProvider.java
index 977c4d8f0ef..045c6c7a4da 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/SingleTableLocationProvider.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/SingleTableLocationProvider.java
@@ -17,7 +17,7 @@
 /**
  * A {@link TableLocationProvider} that provides access to exactly one, previously-known {@link TableLocation}. In
  * contrast to {@link AbstractTableLocationProvider}, this class does not manage the liveness of the table location.
- * Managment must be done externally (as in {@link io.deephaven.engine.table.impl.SourcePartitionedTable}).
+ * Management must be done externally (as in {@link io.deephaven.engine.table.impl.SourcePartitionedTable}).
  */
 public final class SingleTableLocationProvider implements TableLocationProvider {
     private static final String IMPLEMENTATION_NAME = SingleTableLocationProvider.class.getSimpleName();
diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/SourcePartitionedTableTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/SourcePartitionedTableTest.java
index b040d3a7a5f..68906192f8c 100644
--- a/engine/table/src/test/java/io/deephaven/engine/table/impl/SourcePartitionedTableTest.java
+++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/SourcePartitionedTableTest.java
@@ -104,7 +104,8 @@ private SourcePartitionedTable setUpData() {
                 tlp,
                 true,
                 true,
-                l -> true);
+                l -> true,
+                true);
     }
 
     private void verifyStringColumnContents(Table table, String columnName, String... expectedValues) {
diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/PythonTableDataService.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/PythonTableDataService.java
index 7586ee831fc..e0bc8127aec 100644
--- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/PythonTableDataService.java
+++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/util/PythonTableDataService.java
@@ -120,10 +120,14 @@ public Table makeTable(@NotNull final TableKeyImpl tableKey, final boolean live)
      *
      * @param tableKey The table key
      * @param live Whether the result should update as new data becomes available
+     * @param preCheckExistence Whether to include only locations observed to have non-empty data
      * @return The {@link PartitionedTable}
      */
     @ScriptApi
-    public PartitionedTable makePartitionedTable(@NotNull final TableKeyImpl tableKey, final boolean live) {
+    public PartitionedTable makePartitionedTable(
+            @NotNull final TableKeyImpl tableKey,
+            final boolean live,
+            final boolean preCheckExistence) {
         final TableLocationProviderImpl tableLocationProvider =
                 (TableLocationProviderImpl) getTableLocationProvider(tableKey);
         return new SourcePartitionedTable(
@@ -132,7 +136,8 @@ public PartitionedTable makePartitionedTable(@NotNull final TableKeyImpl tableKe
                 tableLocationProvider,
                 live,
                 live,
-                tlk -> true);
+                tlk -> true,
+                preCheckExistence);
     }
 
     /**
diff --git a/py/server/deephaven/experimental/table_data_service.py b/py/server/deephaven/experimental/table_data_service.py
index fdf6d677294..9d171b21881 100644
--- a/py/server/deephaven/experimental/table_data_service.py
+++ b/py/server/deephaven/experimental/table_data_service.py
@@ -260,7 +260,7 @@ def make_table(self, table_key: TableKey, *, refreshing: bool) -> Table:
 
         Args:
             table_key (TableKey): the table key
-            refreshing (bool): whether the table is live or static
+            refreshing (bool): whether the table is live (True) or static (False)
 
         Returns:
             Table: a new table
@@ -274,13 +274,15 @@ def make_table(self, table_key: TableKey, *, refreshing: bool) -> Table:
         except Exception as e:
             raise DHError(e, message=f"failed to make a table for the key {table_key}") from e
 
-    def make_partitioned_table(self, table_key: TableKey, *, refreshing: bool) -> PartitionedTable:
+    def make_partitioned_table(self, table_key: TableKey, *, refreshing: bool,
+                               pre_check_existence: bool = False) -> PartitionedTable:
         """ Creates a PartitionedTable backed by the backend service with the given table key.
 
         Args:
             table_key (TableKey): the table key
-            refreshing (bool): whether the partitioned table is live or static
-
+            refreshing (bool): whether the partitioned table is live (True) or static (False)
+            pre_check_existence (bool): whether the partitioned table should verify that locations exist and are
+                non-empty before including them in the table
         Returns:
             PartitionedTable: a new partitioned table
 
@@ -289,7 +291,8 @@ def make_partitioned_table(self, table_key: TableKey, *, refreshing: bool) -> Pa
         """
         j_table_key = _JTableKeyImpl(table_key)
         try:
-            return PartitionedTable(self._j_tbl_service.makePartitionedTable(j_table_key, refreshing))
+            return PartitionedTable(
+                self._j_tbl_service.makePartitionedTable(j_table_key, refreshing, pre_check_existence))
         except Exception as e:
             raise DHError(e, message=f"failed to make a partitioned table for the key {table_key}") from e