Skip to content

Commit

Permalink
Fixes to merge, and OperationInitializationThreadPool initialization …
Browse files Browse the repository at this point in the history
…changes (#4049)

* Get rid of overly aggressive constituentChangesPermitted check in PartitionedTableCreatorImpl.of; we know the right answer, no need to complain to the user. Document this in PartitionedTableFactory.of.
Get rid of extra constituent coalesce in TableToolsMergeHelper; this is handled internally by UnionSourceManager.

* Minor refactoring of OperationInitializationThreadPool. Change DeephavenApiServer to ensure the initialization threads are started after JPY and before the server.
  • Loading branch information
rcaudy authored and devinrsmith committed Jun 22, 2023
1 parent e9ae5e8 commit 8339a9f
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ private static Creator partitionedTableCreator() {
* {@link TableDefinition#checkMutualCompatibility(TableDefinition) mutually compatible} with all values in
* the "constituent" column of {@code table}
* @param constituentChangesPermitted Whether {@code table} is permitted to report changes that impact the
* constituent column
* constituent column; ignored (and treated as {@code false}) if {@code !table.isRefreshing()}
* @return A new PartitionedTable as described
*/
public static PartitionedTable of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ void enqueueSubFilters(

private void enqueueJobs(Iterable<? extends NotificationQueue.Notification> subFilters) {
for (NotificationQueue.Notification notification : subFilters) {
OperationInitializationThreadPool.executorService.submit(() -> {
OperationInitializationThreadPool.executorService().submit(() -> {
root.runningChildren.put(Thread.currentThread(), Thread.currentThread());
try {
if (!root.cancelled.get()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,16 @@
import org.jetbrains.annotations.NotNull;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class OperationInitializationThreadPool {

/**
* The number of threads that will be used for parallel initialization in this process
*/
public static final int NUM_THREADS;

static {
Expand All @@ -28,29 +34,53 @@ public class OperationInitializationThreadPool {

private static final ThreadLocal<Boolean> isInitializationThread = ThreadLocal.withInitial(() -> false);

/**
* @return Whether the current thread is part of the OperationInitializationThreadPool's {@link #executorService()}
*/
public static boolean isInitializationThread() {
return isInitializationThread.get();
}

/**
* @return Whether the current thread can parallelize operations using the OperationInitializationThreadPool's
* {@link #executorService()}
*/
public static boolean canParallelize() {
return NUM_THREADS > 1 && !isInitializationThread();
}

public final static ExecutorService executorService;
private static final ThreadPoolExecutor executorService;

static {
final ThreadGroup threadGroup = new ThreadGroup("OperationInitializationThreadPool");
final NamingThreadFactory threadFactory =
new NamingThreadFactory(threadGroup, OperationInitializationThreadPool.class, "initializationExecutor",
true) {
@Override
public Thread newThread(@NotNull Runnable r) {
return super.newThread(ThreadInitializationFactory.wrapRunnable(() -> {
isInitializationThread.set(true);
MultiChunkPool.enableDedicatedPoolForThisThread();
r.run();
}));
}
};
executorService = Executors.newFixedThreadPool(NUM_THREADS, threadFactory);
final ThreadFactory threadFactory = new NamingThreadFactory(
threadGroup, OperationInitializationThreadPool.class, "initializationExecutor", true) {
@Override
public Thread newThread(@NotNull final Runnable r) {
return super.newThread(ThreadInitializationFactory.wrapRunnable(() -> {
isInitializationThread.set(true);
MultiChunkPool.enableDedicatedPoolForThisThread();
r.run();
}));
}
};
executorService = new ThreadPoolExecutor(
NUM_THREADS, NUM_THREADS, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), threadFactory);
}

/**
* @return The OperationInitializationThreadPool's {@link ExecutorService}; will be {@code null} if the
* OperationInitializationThreadPool has not been {@link #start() started}
*/
public static ExecutorService executorService() {
return executorService;
}

/**
* Start the OperationInitializationThreadPool. In practice, this just pre-starts all threads in the
* {@link #executorService()}.
*/
public static void start() {
executorService.prestartAllCoreThreads();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,19 +72,13 @@ public PartitionedTable of(
+ " has unsupported data type " + constituentColumnDefinition.getDataType());
}

// Validate change support
if (!table.isRefreshing() && constituentChangesPermitted) {
throw new IllegalArgumentException("Partitioned table " + table
+ " is static, but constituent changes are permitted");
}

return new PartitionedTableImpl(
table,
keyColumnNames,
uniqueKeys,
constituentColumnName,
constituentDefinition,
constituentChangesPermitted,
constituentChangesPermitted && table.isRefreshing(),
true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public void submit(
final Runnable runnable,
final LogOutputAppendable description,
final Consumer<Exception> onError) {
OperationInitializationThreadPool.executorService.submit(() -> {
OperationInitializationThreadPool.executorService().submit(() -> {
final BasePerformanceEntry basePerformanceEntry = new BasePerformanceEntry();
basePerformanceEntry.onBaseEntryStart();
try (final SafeCloseable ignored = executionContext == null ? null : executionContext.open()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,6 @@ static List<Table> getTablesToMerge(Stream<Table> tables, int sizeEstimate) {
if (table == null) {
return;
}
if (table instanceof UncoalescedTable) {
table = table.coalesce();
}

if (canBreakOutUnionedTable(table)) {
tableList.addAll(getComponentTables(table));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -816,7 +816,7 @@ public void testInterFilterInterruption() {

// we want to make sure we can push something through the thread pool and are not hogging it
final CountDownLatch latch = new CountDownLatch(1);
OperationInitializationThreadPool.executorService.submit(latch::countDown);
OperationInitializationThreadPool.executorService().submit(latch::countDown);
waitForLatch(latch);

assertEquals(0, fastCounter.invokes.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import io.deephaven.auth.AuthenticationRequestHandler;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.table.impl.OperationInitializationThreadPool;
import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder;
import io.deephaven.engine.table.impl.perf.UpdatePerformanceTracker;
import io.deephaven.engine.table.impl.util.EngineMetrics;
Expand All @@ -23,7 +24,6 @@
import io.deephaven.uri.resolver.UriResolver;
import io.deephaven.uri.resolver.UriResolvers;
import io.deephaven.uri.resolver.UriResolversInstance;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.annotations.VisibleForTesting;
import io.deephaven.util.process.ProcessEnvironment;
import io.deephaven.util.process.ShutdownManager;
Expand Down Expand Up @@ -129,11 +129,14 @@ public DeephavenApiServer run() throws IOException, ClassNotFoundException, Time
scriptSessionProvider.get();
pluginRegistration.registerAll();

log.info().append("Initializing ExecutionContext for Main Thread...").endl();
log.info().append("Initializing Execution Context for Main Thread...").endl();
// noinspection resource
executionContextProvider.get().open();

log.info().append("Starting UpdateGraph...").endl();
log.info().append("Starting Operation Initialization Thread Pool...").endl();
OperationInitializationThreadPool.start();

log.info().append("Starting Update Graph...").endl();
ug.<PeriodicUpdateGraph>cast().start();

EngineMetrics.maybeStartStatsCollection();
Expand Down

0 comments on commit 8339a9f

Please sign in to comment.