diff --git a/engine/api/src/main/java/io/deephaven/engine/table/PartitionedTableFactory.java b/engine/api/src/main/java/io/deephaven/engine/table/PartitionedTableFactory.java index 70205eaa64a..9aeea73fc3b 100644 --- a/engine/api/src/main/java/io/deephaven/engine/table/PartitionedTableFactory.java +++ b/engine/api/src/main/java/io/deephaven/engine/table/PartitionedTableFactory.java @@ -84,7 +84,7 @@ private static Creator partitionedTableCreator() { * {@link TableDefinition#checkMutualCompatibility(TableDefinition) mutually compatible} with all values in * the "constituent" column of {@code table} * @param constituentChangesPermitted Whether {@code table} is permitted to report changes that impact the - * constituent column + * constituent column; ignored (and treated as {@code false}) if {@code !table.isRefreshing()} * @return A new PartitionedTable as described */ public static PartitionedTable of( diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java index 7fab49d9097..b00d195dba0 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java @@ -85,7 +85,7 @@ void enqueueSubFilters( private void enqueueJobs(Iterable subFilters) { for (NotificationQueue.Notification notification : subFilters) { - OperationInitializationThreadPool.executorService.submit(() -> { + OperationInitializationThreadPool.executorService().submit(() -> { root.runningChildren.put(Thread.currentThread(), Thread.currentThread()); try { if (!root.cancelled.get()) { diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java index 979ffe56432..c12a0a605d3 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java @@ -10,10 +10,16 @@ import org.jetbrains.annotations.NotNull; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; public class OperationInitializationThreadPool { + /** + * The number of threads that will be used for parallel initialization in this process + */ public static final int NUM_THREADS; static { @@ -28,29 +34,53 @@ public class OperationInitializationThreadPool { private static final ThreadLocal isInitializationThread = ThreadLocal.withInitial(() -> false); + /** + * @return Whether the current thread is part of the OperationInitializationThreadPool's {@link #executorService()} + */ public static boolean isInitializationThread() { return isInitializationThread.get(); } + /** + * @return Whether the current thread can parallelize operations using the OperationInitializationThreadPool's + * {@link #executorService()} + */ public static boolean canParallelize() { return NUM_THREADS > 1 && !isInitializationThread(); } - public final static ExecutorService executorService; + private static final ThreadPoolExecutor executorService; + static { final ThreadGroup threadGroup = new ThreadGroup("OperationInitializationThreadPool"); - final NamingThreadFactory threadFactory = - new NamingThreadFactory(threadGroup, OperationInitializationThreadPool.class, "initializationExecutor", - true) { - @Override - public Thread newThread(@NotNull Runnable r) { - return super.newThread(ThreadInitializationFactory.wrapRunnable(() -> { - isInitializationThread.set(true); - MultiChunkPool.enableDedicatedPoolForThisThread(); - r.run(); - })); - } - }; - executorService = Executors.newFixedThreadPool(NUM_THREADS, threadFactory); + final ThreadFactory threadFactory = new NamingThreadFactory( + threadGroup, OperationInitializationThreadPool.class, "initializationExecutor", true) { + @Override + public Thread newThread(@NotNull final Runnable r) { + return super.newThread(ThreadInitializationFactory.wrapRunnable(() -> { + isInitializationThread.set(true); + MultiChunkPool.enableDedicatedPoolForThisThread(); + r.run(); + })); + } + }; + executorService = new ThreadPoolExecutor( + NUM_THREADS, NUM_THREADS, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), threadFactory); + } + + /** + * @return The OperationInitializationThreadPool's {@link ExecutorService}; will be {@code null} if the + * OperationInitializationThreadPool has not been {@link #start() started} + */ + public static ExecutorService executorService() { + return executorService; + } + + /** + * Start the OperationInitializationThreadPool. In practice, this just pre-starts all threads in the + * {@link #executorService()}. + */ + public static void start() { + executorService.prestartAllCoreThreads(); } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableCreatorImpl.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableCreatorImpl.java index 06a76edd440..8a8b70e6745 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableCreatorImpl.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableCreatorImpl.java @@ -72,19 +72,13 @@ public PartitionedTable of( + " has unsupported data type " + constituentColumnDefinition.getDataType()); } - // Validate change support - if (!table.isRefreshing() && constituentChangesPermitted) { - throw new IllegalArgumentException("Partitioned table " + table - + " is static, but constituent changes are permitted"); - } - return new PartitionedTableImpl( table, keyColumnNames, uniqueKeys, constituentColumnName, constituentDefinition, - constituentChangesPermitted, + constituentChangesPermitted && table.isRefreshing(), true); } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/OperationInitializationPoolJobScheduler.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/OperationInitializationPoolJobScheduler.java index 104184ba3c8..7037dd34811 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/OperationInitializationPoolJobScheduler.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/OperationInitializationPoolJobScheduler.java @@ -19,7 +19,7 @@ public void submit( final Runnable runnable, final LogOutputAppendable description, final Consumer onError) { - OperationInitializationThreadPool.executorService.submit(() -> { + OperationInitializationThreadPool.executorService().submit(() -> { final BasePerformanceEntry basePerformanceEntry = new BasePerformanceEntry(); basePerformanceEntry.onBaseEntryStart(); try (final SafeCloseable ignored = executionContext == null ? null : executionContext.open()) { diff --git a/engine/table/src/main/java/io/deephaven/engine/util/TableToolsMergeHelper.java b/engine/table/src/main/java/io/deephaven/engine/util/TableToolsMergeHelper.java index 1664aad42d3..c369a78ba4e 100644 --- a/engine/table/src/main/java/io/deephaven/engine/util/TableToolsMergeHelper.java +++ b/engine/table/src/main/java/io/deephaven/engine/util/TableToolsMergeHelper.java @@ -86,9 +86,6 @@ static List getTablesToMerge(Stream
tables, int sizeEstimate) { if (table == null) { return; } - if (table instanceof UncoalescedTable) { - table = table.coalesce(); - } if (canBreakOutUnionedTable(table)) { tableList.addAll(getComponentTables(table)); diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java index c894a45d746..4b2058bbeb3 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java @@ -816,7 +816,7 @@ public void testInterFilterInterruption() { // we want to make sure we can push something through the thread pool and are not hogging it final CountDownLatch latch = new CountDownLatch(1); - OperationInitializationThreadPool.executorService.submit(latch::countDown); + OperationInitializationThreadPool.executorService().submit(latch::countDown); waitForLatch(latch); assertEquals(0, fastCounter.invokes.get()); diff --git a/server/src/main/java/io/deephaven/server/runner/DeephavenApiServer.java b/server/src/main/java/io/deephaven/server/runner/DeephavenApiServer.java index 3275478ab00..a38b1901ece 100644 --- a/server/src/main/java/io/deephaven/server/runner/DeephavenApiServer.java +++ b/server/src/main/java/io/deephaven/server/runner/DeephavenApiServer.java @@ -5,6 +5,7 @@ import io.deephaven.auth.AuthenticationRequestHandler; import io.deephaven.engine.context.ExecutionContext; +import io.deephaven.engine.table.impl.OperationInitializationThreadPool; import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder; import io.deephaven.engine.table.impl.perf.UpdatePerformanceTracker; import io.deephaven.engine.table.impl.util.EngineMetrics; @@ -23,7 +24,6 @@ import io.deephaven.uri.resolver.UriResolver; import io.deephaven.uri.resolver.UriResolvers; import io.deephaven.uri.resolver.UriResolversInstance; -import io.deephaven.util.SafeCloseable; import io.deephaven.util.annotations.VisibleForTesting; import io.deephaven.util.process.ProcessEnvironment; import io.deephaven.util.process.ShutdownManager; @@ -129,11 +129,14 @@ public DeephavenApiServer run() throws IOException, ClassNotFoundException, Time scriptSessionProvider.get(); pluginRegistration.registerAll(); - log.info().append("Initializing ExecutionContext for Main Thread...").endl(); + log.info().append("Initializing Execution Context for Main Thread...").endl(); // noinspection resource executionContextProvider.get().open(); - log.info().append("Starting UpdateGraph...").endl(); + log.info().append("Starting Operation Initialization Thread Pool...").endl(); + OperationInitializationThreadPool.start(); + + log.info().append("Starting Update Graph...").endl(); ug.cast().start(); EngineMetrics.maybeStartStatsCollection();