From 2f5a0545f0562f36f2bb6e70257b04091687f73b Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Thu, 14 Dec 2023 09:49:50 -0600 Subject: [PATCH] Make OperationInitializer non-static (#4919) This patch makes the OperationInitializer part of the ExecutionContext, allowing consumers to specify a preferred instance. ThreadInitializationFactory is also changed to be built in Dagger instead of only by configuration. Combined, these changes make it easier to reason about startup, requiring that dependencies are declared explicitly rather than implicitly referenced and used. It is likely that some of the work done here should be refactored further into some JobScheduler factory or assisted injection, but OperationInitializer is still referenced directly by InitialFilterExecution, so this would require more refactoring. Ideally, OperationInitializer.NON_PARALLELIZABLE would be used instead of the OperationInitializerThreadPool's threadlocal, but some services restore the user's own exec context - this is a shortcoming of the patch, and possibly should be resolved before merging. Downstream users will need to consider if they want to capture the OperationInitializer when they create new exec contexts - generally this will be desired. Partial #4040 --- .../python/PythonDeephavenSession.java | 10 +-- .../thread/ThreadInitializationFactory.java | 54 +++------------- .../engine/context/ExecutionContext.java | 63 ++++++++++++++++--- .../context/PoisonedOperationInitializer.java | 30 +++++++++ .../engine/context/PoisonedUpdateGraph.java | 1 - .../engine/context/TestQueryCompiler.java | 1 - .../table/impl/AbstractFilterExecution.java | 5 +- .../table/impl/InitialFilterExecution.java | 3 +- .../OperationInitializationThreadPool.java | 61 ++++++++---------- .../engine/table/impl/QueryTable.java | 5 +- .../partitioned/PartitionedTableImpl.java | 28 ++++++++- .../PartitionedTableProxyImpl.java | 1 - .../impl/rangejoin/RangeJoinOperation.java | 6 +- .../impl/sources/UnionSourceManager.java | 1 - .../engine/table/impl/updateby/UpdateBy.java | 6 +- ...erationInitializationPoolJobScheduler.java | 10 ++- .../updategraph/impl/PeriodicUpdateGraph.java | 48 +++++++++----- .../engine/util/AbstractScriptSession.java | 4 ++ .../engine/util/GroovyDeephavenSession.java | 7 ++- .../util/NoLanguageDeephavenSession.java | 11 ++-- .../engine/table/impl/FuzzerTest.java | 7 ++- .../table/impl/PartitionedTableTest.java | 4 -- .../engine/table/impl/QueryTableTest.java | 8 ++- .../table/impl/QueryTableWhereTest.java | 2 +- .../select/TestConditionFilterGeneration.java | 1 - .../select/TestFormulaColumnGeneration.java | 1 - .../impl/TestEventDrivenUpdateGraph.java | 37 ++++++++--- .../scripts/TestGroovyDeephavenSession.java | 3 +- .../engine/context/TestExecutionContext.java | 9 ++- .../testutil/ControlledUpdateGraph.java | 8 +-- .../updategraph/OperationInitializer.java | 42 +++++++++++++ .../src/main/resources/dh-defaults.prop | 4 -- .../src/main/resources/dh-tests.prop | 1 - py/server/test_helper/__init__.py | 3 +- .../impl/select/TestConditionFilter.java | 2 + .../console/NoConsoleSessionModule.java | 6 +- .../SessionToExecutionStateModule.java | 19 ------ .../groovy/GroovyConsoleSessionModule.java | 4 +- .../python/PythonConsoleSessionModule.java | 5 +- .../console/python/PythonDebuggingModule.java | 16 +++++ .../server/runner/DeephavenApiServer.java | 3 - .../runner/scheduler/SchedulerModule.java | 29 +++++++-- .../ApplicationServiceGrpcImplTest.java | 3 +- .../server/appmode/ApplicationTest.java | 9 ++- .../test/FlightMessageRoundTripTest.java | 3 +- sphinx/source/conf.py | 3 +- 46 files changed, 390 insertions(+), 197 deletions(-) create mode 100644 engine/context/src/main/java/io/deephaven/engine/context/PoisonedOperationInitializer.java create mode 100644 engine/updategraph/src/main/java/io/deephaven/engine/updategraph/OperationInitializer.java delete mode 100644 server/src/main/java/io/deephaven/server/console/SessionToExecutionStateModule.java create mode 100644 server/src/main/java/io/deephaven/server/console/python/PythonDebuggingModule.java diff --git a/Integrations/src/main/java/io/deephaven/integrations/python/PythonDeephavenSession.java b/Integrations/src/main/java/io/deephaven/integrations/python/PythonDeephavenSession.java index 607174ee1a3..c7c80b27080 100644 --- a/Integrations/src/main/java/io/deephaven/integrations/python/PythonDeephavenSession.java +++ b/Integrations/src/main/java/io/deephaven/integrations/python/PythonDeephavenSession.java @@ -24,6 +24,7 @@ import io.deephaven.util.SafeCloseable; import io.deephaven.util.annotations.ScriptApi; import io.deephaven.util.annotations.VisibleForTesting; +import io.deephaven.util.thread.ThreadInitializationFactory; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.jpy.KeyError; @@ -76,11 +77,12 @@ public class PythonDeephavenSession extends AbstractScriptSession scope) { - super(updateGraph, NoOp.INSTANCE, null); + public PythonDeephavenSession(final UpdateGraph updateGraph, + final ThreadInitializationFactory threadInitializationFactory, final PythonScope scope) { + super(updateGraph, threadInitializationFactory, NoOp.INSTANCE, null); evaluator = null; this.scope = (PythonScope) scope; diff --git a/Util/src/main/java/io/deephaven/util/thread/ThreadInitializationFactory.java b/Util/src/main/java/io/deephaven/util/thread/ThreadInitializationFactory.java index 56a5436adb7..932ce5bb54e 100644 --- a/Util/src/main/java/io/deephaven/util/thread/ThreadInitializationFactory.java +++ b/Util/src/main/java/io/deephaven/util/thread/ThreadInitializationFactory.java @@ -1,55 +1,21 @@ package io.deephaven.util.thread; -import io.deephaven.configuration.Configuration; - -import java.lang.reflect.InvocationTargetException; -import java.util.Arrays; -import java.util.List; -import java.util.Objects; -import java.util.stream.Collectors; +import java.util.Collection; /** * Extension point to allow threads that will run user code from within the platform to be controlled by configuration. */ public interface ThreadInitializationFactory { - /* private */ String[] CONFIGURED_INITIALIZATION_TYPES = - Configuration.getInstance().getStringArrayFromProperty("thread.initialization"); - /* private */ List INITIALIZERS = Arrays.stream(CONFIGURED_INITIALIZATION_TYPES) - .filter(str -> !str.isBlank()) - .map(type -> { - try { - // noinspection unchecked - Class clazz = - (Class) Class.forName(type); - return clazz.getDeclaredConstructor().newInstance(); - } catch (ClassNotFoundException | NoSuchMethodException | InvocationTargetException - | InstantiationException | IllegalAccessException e) { - - // TODO (https://github.com/deephaven/deephaven-core/issues/4040): - // Currently the default property file is shared between both the java client and the server. This - // means that client-side usage will attempt to load the thread.initialization property intended for - // the server which is not available on the class path. - if (e instanceof ClassNotFoundException && type.startsWith("io.deephaven.server.")) { - return null; - } - - throw new IllegalArgumentException( - "Error instantiating initializer " + type + ", please check configuration", e); - } - }) - .filter(Objects::nonNull) - .collect(Collectors.toUnmodifiableList()); + ThreadInitializationFactory NO_OP = r -> r; - /** - * Chains configured initializers to run before/around any given runnable, returning a runnable intended to be run - * by a new thread. - */ - static Runnable wrapRunnable(Runnable runnable) { - Runnable acc = runnable; - for (ThreadInitializationFactory INITIALIZER : INITIALIZERS) { - acc = INITIALIZER.createInitializer(acc); - } - return acc; + static ThreadInitializationFactory of(Collection factories) { + return runnable -> { + Runnable acc = runnable; + for (ThreadInitializationFactory factory : factories) { + acc = factory.createInitializer(acc); + } + return acc; + }; } Runnable createInitializer(Runnable runnable); diff --git a/engine/context/src/main/java/io/deephaven/engine/context/ExecutionContext.java b/engine/context/src/main/java/io/deephaven/engine/context/ExecutionContext.java index 05643ce6166..a698a85b841 100644 --- a/engine/context/src/main/java/io/deephaven/engine/context/ExecutionContext.java +++ b/engine/context/src/main/java/io/deephaven/engine/context/ExecutionContext.java @@ -4,6 +4,7 @@ package io.deephaven.engine.context; import io.deephaven.auth.AuthContext; +import io.deephaven.engine.updategraph.OperationInitializer; import io.deephaven.engine.updategraph.UpdateGraph; import io.deephaven.util.SafeCloseable; import io.deephaven.util.annotations.ScriptApi; @@ -13,10 +14,26 @@ import java.util.Objects; import java.util.function.Supplier; +/** + * Container for context-specific objects, that can be activated on a thread or passed to certain operations. + * ExecutionContexts are immutable, and support a builder pattern to create new instances and "with" methods to + * customize existing ones. Any thread that interacts with the Deephaven engine will need to have an active + * ExecutionContext. + */ public class ExecutionContext { + /** + * Creates a new builder for an ExecutionContext, capturing the current thread's auth context, update graph, and + * operation initializer. Typically, this method should be called on a thread that already has an active + * ExecutionContext, to more easily reuse those. + * + * @return a new builder to create an ExecutionContext + */ public static Builder newBuilder() { - return new Builder(); + ExecutionContext existing = getContext(); + return new Builder() + .setUpdateGraph(existing.getUpdateGraph()) + .setOperationInitializer(existing.getInitializer()); } public static ExecutionContext makeExecutionContext(boolean isSystemic) { @@ -83,6 +100,7 @@ private static void setContext(final ExecutionContext context) { private final QueryScope queryScope; private final QueryCompiler queryCompiler; private final UpdateGraph updateGraph; + private final OperationInitializer operationInitializer; private ExecutionContext( final boolean isSystemic, @@ -90,13 +108,15 @@ private ExecutionContext( final QueryLibrary queryLibrary, final QueryScope queryScope, final QueryCompiler queryCompiler, - final UpdateGraph updateGraph) { + final UpdateGraph updateGraph, + OperationInitializer operationInitializer) { this.isSystemic = isSystemic; this.authContext = authContext; this.queryLibrary = Objects.requireNonNull(queryLibrary); this.queryScope = Objects.requireNonNull(queryScope); this.queryCompiler = Objects.requireNonNull(queryCompiler); - this.updateGraph = updateGraph; + this.updateGraph = Objects.requireNonNull(updateGraph); + this.operationInitializer = Objects.requireNonNull(operationInitializer); } /** @@ -110,7 +130,8 @@ public ExecutionContext withSystemic(boolean isSystemic) { if (isSystemic == this.isSystemic) { return this; } - return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph); + return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph, + operationInitializer); } /** @@ -124,7 +145,8 @@ public ExecutionContext withAuthContext(final AuthContext authContext) { if (authContext == this.authContext) { return this; } - return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph); + return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph, + operationInitializer); } /** @@ -138,7 +160,16 @@ public ExecutionContext withUpdateGraph(final UpdateGraph updateGraph) { if (updateGraph == this.updateGraph) { return this; } - return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph); + return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph, + operationInitializer); + } + + public ExecutionContext withOperationInitializer(final OperationInitializer operationInitializer) { + if (operationInitializer == this.operationInitializer) { + return this; + } + return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph, + operationInitializer); } /** @@ -198,6 +229,10 @@ public UpdateGraph getUpdateGraph() { return updateGraph; } + public OperationInitializer getInitializer() { + return operationInitializer; + } + @SuppressWarnings("unused") public static class Builder { private boolean isSystemic = false; @@ -208,6 +243,7 @@ public static class Builder { private QueryScope queryScope = PoisonedQueryScope.INSTANCE; private QueryCompiler queryCompiler = PoisonedQueryCompiler.INSTANCE; private UpdateGraph updateGraph = PoisonedUpdateGraph.INSTANCE; + private OperationInitializer operationInitializer = PoisonedOperationInitializer.INSTANCE; private Builder() { // propagate the auth context from the current context @@ -356,19 +392,32 @@ public Builder setUpdateGraph(UpdateGraph updateGraph) { /** * Use the current ExecutionContext's UpdateGraph instance. + * + * @deprecated The update graph is automatically captured, this method should no longer be needed. */ @ScriptApi + @Deprecated(forRemoval = true, since = "0.31") public Builder captureUpdateGraph() { this.updateGraph = getContext().getUpdateGraph(); return this; } + /** + * Use the specified operation initializer instead of the captured instance. + */ + @ScriptApi + public Builder setOperationInitializer(OperationInitializer operationInitializer) { + this.operationInitializer = operationInitializer; + return this; + } + /** * @return the newly instantiated ExecutionContext */ @ScriptApi public ExecutionContext build() { - return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph); + return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph, + operationInitializer); } } } diff --git a/engine/context/src/main/java/io/deephaven/engine/context/PoisonedOperationInitializer.java b/engine/context/src/main/java/io/deephaven/engine/context/PoisonedOperationInitializer.java new file mode 100644 index 00000000000..49474844755 --- /dev/null +++ b/engine/context/src/main/java/io/deephaven/engine/context/PoisonedOperationInitializer.java @@ -0,0 +1,30 @@ +package io.deephaven.engine.context; + +import io.deephaven.engine.updategraph.OperationInitializer; +import io.deephaven.util.ExecutionContextRegistrationException; + +import java.util.concurrent.Future; + +public class PoisonedOperationInitializer implements OperationInitializer { + + public static final PoisonedOperationInitializer INSTANCE = new PoisonedOperationInitializer(); + + private T fail() { + throw ExecutionContextRegistrationException.onFailedComponentAccess("OperationInitializer"); + } + + @Override + public boolean canParallelize() { + return fail(); + } + + @Override + public Future submit(Runnable runnable) { + return fail(); + } + + @Override + public int parallelismFactor() { + return fail(); + } +} diff --git a/engine/context/src/main/java/io/deephaven/engine/context/PoisonedUpdateGraph.java b/engine/context/src/main/java/io/deephaven/engine/context/PoisonedUpdateGraph.java index 6cfb3ad1283..31dc734785b 100644 --- a/engine/context/src/main/java/io/deephaven/engine/context/PoisonedUpdateGraph.java +++ b/engine/context/src/main/java/io/deephaven/engine/context/PoisonedUpdateGraph.java @@ -2,7 +2,6 @@ import io.deephaven.base.log.LogOutput; import io.deephaven.engine.updategraph.LogicalClock; -import io.deephaven.engine.updategraph.LogicalClockImpl; import io.deephaven.engine.updategraph.UpdateGraph; import io.deephaven.io.log.LogEntry; import io.deephaven.util.ExecutionContextRegistrationException; diff --git a/engine/context/src/test/java/io/deephaven/engine/context/TestQueryCompiler.java b/engine/context/src/test/java/io/deephaven/engine/context/TestQueryCompiler.java index 39d90603ced..02374a67bf0 100644 --- a/engine/context/src/test/java/io/deephaven/engine/context/TestQueryCompiler.java +++ b/engine/context/src/test/java/io/deephaven/engine/context/TestQueryCompiler.java @@ -66,7 +66,6 @@ public class TestQueryCompiler { @Before public void setUp() throws IOException { executionContextClosable = ExecutionContext.newBuilder() - .captureUpdateGraph() .captureQueryLibrary() .captureQueryScope() .setQueryCompiler(QueryCompiler.create(folder.newFolder(), TestQueryCompiler.class.getClassLoader())) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java index d862284777f..a136c87701c 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java @@ -2,6 +2,7 @@ import io.deephaven.base.log.LogOutput; import io.deephaven.base.verify.Assert; +import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.exceptions.CancellationException; import io.deephaven.engine.rowset.RowSet; import io.deephaven.engine.rowset.RowSetFactory; @@ -325,10 +326,10 @@ abstract void enqueueSubFilters( */ abstract boolean doParallelization(long numberOfRows); - static boolean doParallelizationBase(long numberOfRows) { + boolean doParallelizationBase(long numberOfRows) { return !QueryTable.DISABLE_PARALLEL_WHERE && numberOfRows != 0 && (QueryTable.FORCE_PARALLEL_WHERE || numberOfRows / 2 > QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT) - && OperationInitializationThreadPool.canParallelize(); + && ExecutionContext.getContext().getInitializer().canParallelize(); } /** diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java index b00d195dba0..dc00456fdd2 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java @@ -1,6 +1,7 @@ package io.deephaven.engine.table.impl; import io.deephaven.base.verify.Assert; +import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.exceptions.CancellationException; import io.deephaven.engine.rowset.RowSet; import io.deephaven.engine.table.ModifiedColumnSet; @@ -85,7 +86,7 @@ void enqueueSubFilters( private void enqueueJobs(Iterable subFilters) { for (NotificationQueue.Notification notification : subFilters) { - OperationInitializationThreadPool.executorService().submit(() -> { + ExecutionContext.getContext().getInitializer().submit(() -> { root.runningChildren.put(Thread.currentThread(), Thread.currentThread()); try { if (!root.cancelled.get()) { diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java index c12a0a605d3..286d4386d17 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java @@ -5,17 +5,22 @@ import io.deephaven.chunk.util.pools.MultiChunkPool; import io.deephaven.configuration.Configuration; +import io.deephaven.engine.context.ExecutionContext; +import io.deephaven.engine.updategraph.OperationInitializer; import io.deephaven.util.thread.NamingThreadFactory; import io.deephaven.util.thread.ThreadInitializationFactory; import org.jetbrains.annotations.NotNull; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -public class OperationInitializationThreadPool { +/** + * Implementation of OperationInitializer that delegates to a pool of threads. + */ +public class OperationInitializationThreadPool implements OperationInitializer { /** * The number of threads that will be used for parallel initialization in this process @@ -31,56 +36,42 @@ public class OperationInitializationThreadPool { NUM_THREADS = numThreads; } } + private final ThreadLocal isInitializationThread = ThreadLocal.withInitial(() -> false); - private static final ThreadLocal isInitializationThread = ThreadLocal.withInitial(() -> false); - - /** - * @return Whether the current thread is part of the OperationInitializationThreadPool's {@link #executorService()} - */ - public static boolean isInitializationThread() { - return isInitializationThread.get(); - } - - /** - * @return Whether the current thread can parallelize operations using the OperationInitializationThreadPool's - * {@link #executorService()} - */ - public static boolean canParallelize() { - return NUM_THREADS > 1 && !isInitializationThread(); - } - - private static final ThreadPoolExecutor executorService; + private final ThreadPoolExecutor executorService; - static { + public OperationInitializationThreadPool(ThreadInitializationFactory factory) { final ThreadGroup threadGroup = new ThreadGroup("OperationInitializationThreadPool"); final ThreadFactory threadFactory = new NamingThreadFactory( threadGroup, OperationInitializationThreadPool.class, "initializationExecutor", true) { @Override public Thread newThread(@NotNull final Runnable r) { - return super.newThread(ThreadInitializationFactory.wrapRunnable(() -> { + return super.newThread(factory.createInitializer(() -> { isInitializationThread.set(true); MultiChunkPool.enableDedicatedPoolForThisThread(); - r.run(); + ExecutionContext.newBuilder().setOperationInitializer(OperationInitializer.NON_PARALLELIZABLE) + .build().apply(r); })); } }; executorService = new ThreadPoolExecutor( NUM_THREADS, NUM_THREADS, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), threadFactory); + + executorService.prestartAllCoreThreads(); } - /** - * @return The OperationInitializationThreadPool's {@link ExecutorService}; will be {@code null} if the - * OperationInitializationThreadPool has not been {@link #start() started} - */ - public static ExecutorService executorService() { - return executorService; + @Override + public boolean canParallelize() { + return NUM_THREADS > 1 && !isInitializationThread.get(); } - /** - * Start the OperationInitializationThreadPool. In practice, this just pre-starts all threads in the - * {@link #executorService()}. - */ - public static void start() { - executorService.prestartAllCoreThreads(); + @Override + public Future submit(Runnable runnable) { + return executorService.submit(runnable); + } + + @Override + public int parallelismFactor() { + return NUM_THREADS; } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java index 53e6dc4f776..afeff5a30c8 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java @@ -1482,9 +1482,10 @@ this, mode, columns, rowSet, getModifiedColumnSetForUpdates(), publishTheseSourc final CompletableFuture waitForResult = new CompletableFuture<>(); final JobScheduler jobScheduler; if ((QueryTable.FORCE_PARALLEL_SELECT_AND_UPDATE || QueryTable.ENABLE_PARALLEL_SELECT_AND_UPDATE) - && OperationInitializationThreadPool.canParallelize() + && ExecutionContext.getContext().getInitializer().canParallelize() && analyzer.allowCrossColumnParallelization()) { - jobScheduler = new OperationInitializationPoolJobScheduler(); + jobScheduler = new OperationInitializationPoolJobScheduler( + ExecutionContext.getContext().getInitializer()); } else { jobScheduler = ImmediateJobScheduler.INSTANCE; } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableImpl.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableImpl.java index e3ecf0a6b03..d401673b5f0 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableImpl.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableImpl.java @@ -30,6 +30,7 @@ import io.deephaven.engine.table.impl.sources.UnionSourceManager; import io.deephaven.engine.table.iterators.ChunkedObjectColumnIterator; import io.deephaven.engine.updategraph.NotificationQueue.Dependency; +import io.deephaven.engine.updategraph.OperationInitializer; import io.deephaven.engine.updategraph.UpdateGraph; import io.deephaven.util.SafeCloseable; import io.deephaven.util.annotations.InternalUseOnly; @@ -296,7 +297,7 @@ public PartitionedTable transform( // Perform the transformation final Table resultTable = prepared.update(List.of(new TableTransformationColumn( constituentColumnName, - executionContext, + disableRecursiveParallelOperationInitialization(executionContext), prepared.isRefreshing() ? transformer : assertResultsStatic(transformer)))); // Make sure we have a valid result constituent definition @@ -318,6 +319,29 @@ public PartitionedTable transform( return resultPartitionedTable; } + /** + * Ensures that the returned executionContext will have an OperationInitializer compatible with being called by work + * already running on an initialization thread - it must either already return false for + * {@link OperationInitializer#canParallelize()}, or must be a different instance than the current context's + * OperationInitializer. + */ + private static ExecutionContext disableRecursiveParallelOperationInitialization(ExecutionContext provided) { + if (provided == null) { + return null; + } + ExecutionContext current = ExecutionContext.getContext(); + if (!provided.getInitializer().canParallelize()) { + return provided; + } + if (current.getInitializer() != provided.getInitializer()) { + return provided; + } + + // The current operation initializer isn't safe to submit more tasks that we will block on, replace + // with an instance that will never attempt to push work to another thread + return provided.withOperationInitializer(OperationInitializer.NON_PARALLELIZABLE); + } + @Override public PartitionedTable partitionedTransform( @NotNull final PartitionedTable other, @@ -353,7 +377,7 @@ public PartitionedTable partitionedTransform( .update(List.of(new BiTableTransformationColumn( constituentColumnName, RHS_CONSTITUENT, - executionContext, + disableRecursiveParallelOperationInitialization(executionContext), prepared.isRefreshing() ? transformer : assertResultsStatic(transformer)))) .dropColumns(RHS_CONSTITUENT); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableProxyImpl.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableProxyImpl.java index 89e789cea6e..8cc0b210593 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableProxyImpl.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableProxyImpl.java @@ -134,7 +134,6 @@ private static ExecutionContext getOrCreateExecutionContext(final boolean requir if (context == null) { final ExecutionContext.Builder builder = ExecutionContext.newBuilder() .captureQueryCompiler() - .captureUpdateGraph() .markSystemic(); if (requiresFullContext) { builder.newQueryLibrary(); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/rangejoin/RangeJoinOperation.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/rangejoin/RangeJoinOperation.java index 6ec652d0282..aebaf7a975e 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/rangejoin/RangeJoinOperation.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/rangejoin/RangeJoinOperation.java @@ -26,7 +26,6 @@ import io.deephaven.engine.rowset.RowSetFactory; import io.deephaven.engine.table.*; import io.deephaven.engine.table.impl.MemoizedOperationKey; -import io.deephaven.engine.table.impl.OperationInitializationThreadPool; import io.deephaven.engine.table.impl.QueryTable; import io.deephaven.engine.table.impl.SortingOrder; import io.deephaven.engine.table.impl.OperationSnapshotControl; @@ -253,14 +252,13 @@ public Result initialize(final boolean usePrev, final long beforeClo QueryTable.checkInitiateBinaryOperation(leftTable, rightTable); final JobScheduler jobScheduler; - if (OperationInitializationThreadPool.canParallelize()) { - jobScheduler = new OperationInitializationPoolJobScheduler(); + if (ExecutionContext.getContext().getInitializer().canParallelize()) { + jobScheduler = new OperationInitializationPoolJobScheduler(ExecutionContext.getContext().getInitializer()); } else { jobScheduler = ImmediateJobScheduler.INSTANCE; } final ExecutionContext executionContext = ExecutionContext.newBuilder() - .captureUpdateGraph() .markSystemic().build(); return new Result<>(staticRangeJoin(jobScheduler, executionContext)); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/UnionSourceManager.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/UnionSourceManager.java index 9392a00bec3..546dbcf19ac 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/UnionSourceManager.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/UnionSourceManager.java @@ -104,7 +104,6 @@ public UnionSourceManager(@NotNull final PartitionedTable partitionedTable) { executionContext = ExecutionContext.newBuilder() .markSystemic() - .captureUpdateGraph() .build(); } else { listenerRecorders = null; diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateBy.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateBy.java index 245d312793f..f6f0ca98558 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateBy.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateBy.java @@ -300,13 +300,13 @@ class PhasedUpdateProcessor implements LogOutputAppendable { dirtyWindowOperators[winIdx].set(0, windows[winIdx].operators.length); } // Create the proper JobScheduler for the following parallel tasks - if (OperationInitializationThreadPool.canParallelize()) { - jobScheduler = new OperationInitializationPoolJobScheduler(); + if (ExecutionContext.getContext().getInitializer().canParallelize()) { + jobScheduler = + new OperationInitializationPoolJobScheduler(ExecutionContext.getContext().getInitializer()); } else { jobScheduler = ImmediateJobScheduler.INSTANCE; } executionContext = ExecutionContext.newBuilder() - .captureUpdateGraph() .markSystemic().build(); } else { // Determine which windows need to be computed. diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/OperationInitializationPoolJobScheduler.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/OperationInitializationPoolJobScheduler.java index 2722d61fd35..f3a9a45fc30 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/OperationInitializationPoolJobScheduler.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/OperationInitializationPoolJobScheduler.java @@ -4,6 +4,7 @@ import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.table.impl.OperationInitializationThreadPool; import io.deephaven.engine.table.impl.perf.BasePerformanceEntry; +import io.deephaven.engine.updategraph.OperationInitializer; import io.deephaven.io.log.impl.LogOutputStringImpl; import io.deephaven.util.SafeCloseable; import io.deephaven.util.process.ProcessEnvironment; @@ -11,7 +12,12 @@ import java.util.function.Consumer; public class OperationInitializationPoolJobScheduler implements JobScheduler { - final BasePerformanceEntry accumulatedBaseEntry = new BasePerformanceEntry(); + private final BasePerformanceEntry accumulatedBaseEntry = new BasePerformanceEntry(); + private final OperationInitializer threadPool; + + public OperationInitializationPoolJobScheduler(OperationInitializer threadPool) { + this.threadPool = threadPool; + } @Override public void submit( @@ -19,7 +25,7 @@ public void submit( final Runnable runnable, final LogOutputAppendable description, final Consumer onError) { - OperationInitializationThreadPool.executorService().submit(() -> { + threadPool.submit(() -> { final BasePerformanceEntry basePerformanceEntry = new BasePerformanceEntry(); basePerformanceEntry.onBaseEntryStart(); try (final SafeCloseable ignored = executionContext == null ? null : executionContext.open()) { diff --git a/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java b/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java index a86225c9522..acc4ea026ee 100644 --- a/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java +++ b/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java @@ -98,6 +98,8 @@ public static Builder newBuilder(final String name) { "PeriodicUpdateGraph.targetCycleDurationMillis"; private final long defaultTargetCycleDurationMillis; private volatile long targetCycleDurationMillis; + private final ThreadInitializationFactory threadInitializationFactory; + /** * The number of threads in our executor service for dispatching notifications. If 1, then we don't actually use the @@ -115,11 +117,13 @@ public PeriodicUpdateGraph( final boolean allowUnitTestMode, final long targetCycleDurationMillis, final long minimumCycleDurationToLogNanos, - final int numUpdateThreads) { + final int numUpdateThreads, + final ThreadInitializationFactory threadInitializationFactory) { super(name, allowUnitTestMode, log, minimumCycleDurationToLogNanos); this.allowUnitTestMode = allowUnitTestMode; this.defaultTargetCycleDurationMillis = targetCycleDurationMillis; this.targetCycleDurationMillis = targetCycleDurationMillis; + this.threadInitializationFactory = threadInitializationFactory; if (numUpdateThreads <= 0) { this.updateThreads = Runtime.getRuntime().availableProcessors(); @@ -127,8 +131,9 @@ public PeriodicUpdateGraph( this.updateThreads = numUpdateThreads; } - refreshThread = new Thread(ThreadInitializationFactory.wrapRunnable(() -> { - configureRefreshThread(); + OperationInitializer captured = ExecutionContext.getContext().getInitializer(); + refreshThread = new Thread(threadInitializationFactory.createInitializer(() -> { + configureRefreshThread(captured); while (running) { Assert.eqFalse(this.allowUnitTestMode, "allowUnitTestMode"); refreshTablesAndFlushNotifications(); @@ -140,7 +145,7 @@ public PeriodicUpdateGraph( @Override public Thread newThread(@NotNull final Runnable r) { // Not a refresh thread, but should still be instrumented for debugging purposes. - return super.newThread(ThreadInitializationFactory.wrapRunnable(r)); + return super.newThread(threadInitializationFactory.createInitializer(r)); } }); } @@ -1092,8 +1097,9 @@ private NotificationProcessorThreadFactory(@NotNull final ThreadGroup threadGrou @Override public Thread newThread(@NotNull final Runnable r) { - return super.newThread(ThreadInitializationFactory.wrapRunnable(() -> { - configureRefreshThread(); + OperationInitializer captured = ExecutionContext.getContext().getInitializer(); + return super.newThread(threadInitializationFactory.createInitializer(() -> { + configureRefreshThread(captured); r.run(); })); } @@ -1112,8 +1118,9 @@ private UnitTestThreadFactory() { @Override public Thread newThread(@NotNull final Runnable r) { + OperationInitializer captured = ExecutionContext.getContext().getInitializer(); return super.newThread(() -> { - configureUnitTestRefreshThread(); + configureUnitTestRefreshThread(captured); r.run(); }); } @@ -1122,19 +1129,19 @@ public Thread newThread(@NotNull final Runnable r) { /** * Configure the primary UpdateGraph thread or one of the auxiliary notification processing threads. */ - private void configureRefreshThread() { + private void configureRefreshThread(OperationInitializer captured) { SystemicObjectTracker.markThreadSystemic(); MultiChunkPool.enableDedicatedPoolForThisThread(); isUpdateThread.set(true); - // Install this UpdateGraph via ExecutionContext for refresh threads + // Install this UpdateGraph via ExecutionContext for refresh threads, share the same operation initializer // noinspection resource - ExecutionContext.newBuilder().setUpdateGraph(this).build().open(); + ExecutionContext.newBuilder().setUpdateGraph(this).setOperationInitializer(captured).build().open(); } /** * Configure threads to be used for unit test processing. */ - private void configureUnitTestRefreshThread() { + private void configureUnitTestRefreshThread(OperationInitializer captured) { final Thread currentThread = Thread.currentThread(); final Thread.UncaughtExceptionHandler existing = currentThread.getUncaughtExceptionHandler(); currentThread.setUncaughtExceptionHandler((final Thread errorThread, final Throwable throwable) -> { @@ -1142,9 +1149,9 @@ private void configureUnitTestRefreshThread() { existing.uncaughtException(errorThread, throwable); }); isUpdateThread.set(true); - // Install this UpdateGraph via ExecutionContext for refresh threads + // Install this UpdateGraph and share operation initializer pool via ExecutionContext for refresh threads // noinspection resource - ExecutionContext.newBuilder().setUpdateGraph(this).build().open(); + ExecutionContext.newBuilder().setUpdateGraph(this).setOperationInitializer(captured).build().open(); } public static PeriodicUpdateGraph getInstance(final String name) { @@ -1160,6 +1167,7 @@ public static final class Builder { private String name; private int numUpdateThreads = -1; + private ThreadInitializationFactory threadInitializationFactory = runnable -> runnable; public Builder(String name) { this.name = name; @@ -1202,6 +1210,17 @@ public Builder numUpdateThreads(int numUpdateThreads) { return this; } + /** + * Sets a functional interface that adds custom initialization for threads started by this UpdateGraph. + * + * @param threadInitializationFactory the function to invoke on any runnables that will be used to start threads + * @return this builder + */ + public Builder threadInitializationFactory(ThreadInitializationFactory threadInitializationFactory) { + this.threadInitializationFactory = threadInitializationFactory; + return this; + } + /** * Constructs and returns a PeriodicUpdateGraph. It is an error to do so an instance already exists with the * name provided to this builder. @@ -1230,7 +1249,8 @@ private PeriodicUpdateGraph construct() { allowUnitTestMode, targetCycleDurationMillis, minimumCycleDurationToLogNanos, - numUpdateThreads); + numUpdateThreads, + threadInitializationFactory); } } } diff --git a/engine/table/src/main/java/io/deephaven/engine/util/AbstractScriptSession.java b/engine/table/src/main/java/io/deephaven/engine/util/AbstractScriptSession.java index 4466c6a7bb1..9bd1679b939 100644 --- a/engine/table/src/main/java/io/deephaven/engine/util/AbstractScriptSession.java +++ b/engine/table/src/main/java/io/deephaven/engine/util/AbstractScriptSession.java @@ -18,10 +18,12 @@ import io.deephaven.engine.context.QueryScope; import io.deephaven.engine.context.QueryScopeParam; import io.deephaven.engine.table.hierarchical.HierarchicalTable; +import io.deephaven.engine.table.impl.OperationInitializationThreadPool; import io.deephaven.engine.updategraph.UpdateGraph; import io.deephaven.plugin.type.ObjectType; import io.deephaven.plugin.type.ObjectTypeLookup; import io.deephaven.util.SafeCloseable; +import io.deephaven.util.thread.ThreadInitializationFactory; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -70,6 +72,7 @@ private static void createOrClearDirectory(final File directory) { protected AbstractScriptSession( UpdateGraph updateGraph, + final ThreadInitializationFactory threadInitializationFactory, ObjectTypeLookup objectTypeLookup, @Nullable Listener changeListener) { this.objectTypeLookup = objectTypeLookup; @@ -90,6 +93,7 @@ protected AbstractScriptSession( .setQueryScope(queryScope) .setQueryCompiler(compilerContext) .setUpdateGraph(updateGraph) + .setOperationInitializer(new OperationInitializationThreadPool(threadInitializationFactory)) .build(); } diff --git a/engine/table/src/main/java/io/deephaven/engine/util/GroovyDeephavenSession.java b/engine/table/src/main/java/io/deephaven/engine/util/GroovyDeephavenSession.java index 6f644571dfc..c071ce0a9c7 100644 --- a/engine/table/src/main/java/io/deephaven/engine/util/GroovyDeephavenSession.java +++ b/engine/table/src/main/java/io/deephaven/engine/util/GroovyDeephavenSession.java @@ -41,6 +41,7 @@ import io.deephaven.time.DateTimeUtils; import io.deephaven.util.QueryConstants; import io.deephaven.util.annotations.VisibleForTesting; +import io.deephaven.util.thread.ThreadInitializationFactory; import io.deephaven.util.type.ArrayTypeUtils; import io.deephaven.util.type.TypeUtils; import io.github.classgraph.ClassGraph; @@ -145,18 +146,20 @@ private String getNextScriptClassName() { public GroovyDeephavenSession( final UpdateGraph updateGraph, + final ThreadInitializationFactory threadInitializationFactory, final ObjectTypeLookup objectTypeLookup, final RunScripts runScripts) throws IOException { - this(updateGraph, objectTypeLookup, null, runScripts); + this(updateGraph, threadInitializationFactory, objectTypeLookup, null, runScripts); } public GroovyDeephavenSession( final UpdateGraph updateGraph, + final ThreadInitializationFactory threadInitializationFactory, ObjectTypeLookup objectTypeLookup, @Nullable final Listener changeListener, final RunScripts runScripts) throws IOException { - super(updateGraph, objectTypeLookup, changeListener); + super(updateGraph, threadInitializationFactory, objectTypeLookup, changeListener); addDefaultImports(consoleImports); if (INCLUDE_DEFAULT_IMPORTS_IN_LOADED_GROOVY) { diff --git a/engine/table/src/main/java/io/deephaven/engine/util/NoLanguageDeephavenSession.java b/engine/table/src/main/java/io/deephaven/engine/util/NoLanguageDeephavenSession.java index 1140aec2a1a..7c7a28c838e 100644 --- a/engine/table/src/main/java/io/deephaven/engine/util/NoLanguageDeephavenSession.java +++ b/engine/table/src/main/java/io/deephaven/engine/util/NoLanguageDeephavenSession.java @@ -5,6 +5,7 @@ import io.deephaven.engine.context.QueryScope; import io.deephaven.engine.updategraph.UpdateGraph; +import io.deephaven.util.thread.ThreadInitializationFactory; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -24,12 +25,14 @@ public class NoLanguageDeephavenSession extends AbstractScriptSession variables; - public NoLanguageDeephavenSession(final UpdateGraph updateGraph) { - this(updateGraph, SCRIPT_TYPE); + public NoLanguageDeephavenSession(final UpdateGraph updateGraph, + final ThreadInitializationFactory threadInitializationFactory) { + this(updateGraph, threadInitializationFactory, SCRIPT_TYPE); } - public NoLanguageDeephavenSession(final UpdateGraph updateGraph, final String scriptType) { - super(updateGraph, null, null); + public NoLanguageDeephavenSession(final UpdateGraph updateGraph, + final ThreadInitializationFactory threadInitializationFactory, final String scriptType) { + super(updateGraph, threadInitializationFactory, null, null); this.scriptType = scriptType; variables = new LinkedHashMap<>(); diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/FuzzerTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/FuzzerTest.java index 5f321b33854..b179795b59d 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/FuzzerTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/FuzzerTest.java @@ -20,9 +20,9 @@ import io.deephaven.time.DateTimeUtils; import io.deephaven.engine.util.TableTools; import io.deephaven.engine.util.GroovyDeephavenSession; -import io.deephaven.engine.util.GroovyDeephavenSession.RunScripts; import io.deephaven.test.types.SerialTest; import io.deephaven.util.SafeCloseable; +import io.deephaven.util.thread.ThreadInitializationFactory; import org.jetbrains.annotations.Nullable; import org.junit.Assume; import org.junit.Rule; @@ -75,7 +75,10 @@ private GroovyDeephavenSession getGroovySession() throws IOException { private GroovyDeephavenSession getGroovySession(@Nullable Clock clock) throws IOException { final GroovyDeephavenSession session = new GroovyDeephavenSession( - ExecutionContext.getContext().getUpdateGraph(), NoOp.INSTANCE, RunScripts.serviceLoader()); + ExecutionContext.getContext().getUpdateGraph(), + ThreadInitializationFactory.NO_OP, + NoOp.INSTANCE, + GroovyDeephavenSession.RunScripts.serviceLoader()); session.getExecutionContext().open(); return session; } diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/PartitionedTableTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/PartitionedTableTest.java index 22c45cca47f..61c2a97f102 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/PartitionedTableTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/PartitionedTableTest.java @@ -555,7 +555,6 @@ public void testCrossDependencies() { .captureQueryScopeVars("pauseHelper2") .captureQueryLibrary() .captureQueryCompiler() - .captureUpdateGraph() .build(); final PartitionedTable result2 = sourceTable2.update("SlowItDown=pauseHelper.pauseValue(k)").partitionBy("USym2").transform( @@ -646,7 +645,6 @@ public void testCrossDependencies2() { .captureQueryScopeVars("pauseHelper") .captureQueryLibrary() .captureQueryCompiler() - .captureUpdateGraph() .build(); final PartitionedTable result2 = sourceTable2.partitionBy("USym2").transform(executionContext, t -> t.withAttributes(Map.of(BaseTable.TEST_SOURCE_TABLE_ATTRIBUTE, "true")) @@ -935,7 +933,6 @@ protected Table e() { .newQueryScope() .captureQueryCompiler() .captureQueryLibrary() - .captureUpdateGraph() .build().open()) { ExecutionContext.getContext().getQueryScope().putParam("queryScopeVar", "queryScopeValue"); @@ -996,7 +993,6 @@ public void testTransformDependencyCorrectness() { final ExecutionContext executionContext = ExecutionContext.newBuilder() .emptyQueryScope() .newQueryLibrary() - .captureUpdateGraph() .captureQueryCompiler() .build(); final PartitionedTable transformed = partitioned.transform(executionContext, tableIn -> { diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableTest.java index 1a3fb62bf8b..58ef8ac5996 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableTest.java @@ -3006,7 +3006,13 @@ public void testMemoize() { } public void testMemoizeConcurrent() { - final ExecutorService dualPool = Executors.newFixedThreadPool(2); + final ExecutorService dualPool = Executors.newFixedThreadPool(2, new ThreadFactory() { + @Override + public Thread newThread(Runnable runnable) { + ExecutionContext captured = ExecutionContext.getContext(); + return new Thread(() -> captured.apply(runnable)); + } + }); final boolean old = QueryTable.setMemoizeResults(true); try { diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java index 5d6621d0eb0..4d511bb2a80 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java @@ -833,7 +833,7 @@ public void testInterFilterInterruption() { // we want to make sure we can push something through the thread pool and are not hogging it final CountDownLatch latch = new CountDownLatch(1); - OperationInitializationThreadPool.executorService().submit(latch::countDown); + ExecutionContext.getContext().getInitializer().submit(latch::countDown); waitForLatch(latch); assertEquals(0, fastCounter.invokes.get()); diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestConditionFilterGeneration.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestConditionFilterGeneration.java index 5c208d88db7..bc2f176fa87 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestConditionFilterGeneration.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestConditionFilterGeneration.java @@ -30,7 +30,6 @@ public void setUp() { .newQueryLibrary("DEFAULT") .captureQueryCompiler() .captureQueryScope() - .captureUpdateGraph() .build().open(); } diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestFormulaColumnGeneration.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestFormulaColumnGeneration.java index 1265f4ae1a1..bf3621f4f81 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestFormulaColumnGeneration.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestFormulaColumnGeneration.java @@ -49,7 +49,6 @@ public void setUp() { .newQueryLibrary("DEFAULT") .captureQueryCompiler() .captureQueryScope() - .captureUpdateGraph() .build().open(); } diff --git a/engine/table/src/test/java/io/deephaven/engine/updategraph/impl/TestEventDrivenUpdateGraph.java b/engine/table/src/test/java/io/deephaven/engine/updategraph/impl/TestEventDrivenUpdateGraph.java index 7ad90534b13..ed398ca15c3 100644 --- a/engine/table/src/test/java/io/deephaven/engine/updategraph/impl/TestEventDrivenUpdateGraph.java +++ b/engine/table/src/test/java/io/deephaven/engine/updategraph/impl/TestEventDrivenUpdateGraph.java @@ -23,6 +23,7 @@ import java.nio.file.Path; import java.util.Collections; +import static io.deephaven.engine.context.TestExecutionContext.OPERATION_INITIALIZATION; import static io.deephaven.engine.util.TableTools.*; import static org.junit.Assert.assertEquals; @@ -105,8 +106,13 @@ private QueryCompiler compilerForUnitTests() { public void testSimpleAdd() { final EventDrivenUpdateGraph eventDrivenUpdateGraph = EventDrivenUpdateGraph.newBuilder("TestEDUG").build(); - final ExecutionContext context = ExecutionContext.newBuilder().setUpdateGraph(eventDrivenUpdateGraph) - .emptyQueryScope().newQueryLibrary().setQueryCompiler(compilerForUnitTests()).build(); + final ExecutionContext context = ExecutionContext.newBuilder() + .setUpdateGraph(eventDrivenUpdateGraph) + .emptyQueryScope() + .newQueryLibrary() + .setOperationInitializer(OPERATION_INITIALIZATION) + .setQueryCompiler(compilerForUnitTests()) + .build(); try (final SafeCloseable ignored = context.open()) { final SourceThatRefreshes sourceThatRefreshes = new SourceThatRefreshes(eventDrivenUpdateGraph); final Table updated = @@ -125,8 +131,13 @@ public void testSimpleAdd() { public void testSimpleModify() { final EventDrivenUpdateGraph eventDrivenUpdateGraph = new EventDrivenUpdateGraph.Builder("TestEDUG").build(); - final ExecutionContext context = ExecutionContext.newBuilder().setUpdateGraph(eventDrivenUpdateGraph) - .emptyQueryScope().newQueryLibrary().setQueryCompiler(compilerForUnitTests()).build(); + final ExecutionContext context = ExecutionContext.newBuilder() + .setUpdateGraph(eventDrivenUpdateGraph) + .emptyQueryScope() + .newQueryLibrary() + .setOperationInitializer(OPERATION_INITIALIZATION) + .setQueryCompiler(compilerForUnitTests()) + .build(); try (final SafeCloseable ignored = context.open()) { final SourceThatModifiesItself modifySource = new SourceThatModifiesItself(eventDrivenUpdateGraph); final Table updated = @@ -182,8 +193,13 @@ public void testUpdatePerformanceTracker() { defaultUpdateGraph.requestRefresh(); final Table inRange; - final ExecutionContext context = ExecutionContext.newBuilder().setUpdateGraph(defaultUpdateGraph) - .emptyQueryScope().newQueryLibrary().setQueryCompiler(compilerForUnitTests()).build(); + final ExecutionContext context = ExecutionContext.newBuilder() + .setUpdateGraph(defaultUpdateGraph) + .emptyQueryScope() + .newQueryLibrary() + .setQueryCompiler(compilerForUnitTests()) + .setOperationInitializer(OPERATION_INITIALIZATION) + .build(); try (final SafeCloseable ignored = context.open()) { final Table uptAgged = upt.where("!isNull(EntryId)").aggBy( Aggregation.AggSum("UsageNanos", "InvocationCount", "RowsModified"), @@ -223,8 +239,13 @@ static public T sleepValue(long duration, T retVal) { private Object doWork(final EventDrivenUpdateGraph eventDrivenUpdateGraph, final int durationMillis, final int steps) { - final ExecutionContext context = ExecutionContext.newBuilder().setUpdateGraph(eventDrivenUpdateGraph) - .emptyQueryScope().newQueryLibrary().setQueryCompiler(compilerForUnitTests()).build(); + final ExecutionContext context = ExecutionContext.newBuilder() + .setUpdateGraph(eventDrivenUpdateGraph) + .emptyQueryScope() + .newQueryLibrary() + .setQueryCompiler(compilerForUnitTests()) + .setOperationInitializer(OPERATION_INITIALIZATION) + .build(); try (final SafeCloseable ignored = context.open()) { final SourceThatModifiesItself modifySource = new SourceThatModifiesItself(eventDrivenUpdateGraph); final Table updated = diff --git a/engine/table/src/test/java/io/deephaven/engine/util/scripts/TestGroovyDeephavenSession.java b/engine/table/src/test/java/io/deephaven/engine/util/scripts/TestGroovyDeephavenSession.java index 572d71da8c9..e1074176a6e 100644 --- a/engine/table/src/test/java/io/deephaven/engine/util/scripts/TestGroovyDeephavenSession.java +++ b/engine/table/src/test/java/io/deephaven/engine/util/scripts/TestGroovyDeephavenSession.java @@ -16,6 +16,7 @@ import io.deephaven.function.Sort; import io.deephaven.plugin.type.ObjectTypeLookup.NoOp; import io.deephaven.util.SafeCloseable; +import io.deephaven.util.thread.ThreadInitializationFactory; import org.apache.commons.lang3.mutable.MutableInt; import org.junit.After; import org.junit.Assert; @@ -48,7 +49,7 @@ public void setup() throws IOException { livenessScope = new LivenessScope(); LivenessScopeStack.push(livenessScope); session = new GroovyDeephavenSession( - ExecutionContext.getContext().getUpdateGraph(), NoOp.INSTANCE, null, + ExecutionContext.getContext().getUpdateGraph(), ThreadInitializationFactory.NO_OP, NoOp.INSTANCE, null, GroovyDeephavenSession.RunScripts.none()); executionContext = session.getExecutionContext().open(); } diff --git a/engine/test-utils/src/main/java/io/deephaven/engine/context/TestExecutionContext.java b/engine/test-utils/src/main/java/io/deephaven/engine/context/TestExecutionContext.java index 2c64edea940..a0ee07d8112 100644 --- a/engine/test-utils/src/main/java/io/deephaven/engine/context/TestExecutionContext.java +++ b/engine/test-utils/src/main/java/io/deephaven/engine/context/TestExecutionContext.java @@ -1,9 +1,15 @@ package io.deephaven.engine.context; import io.deephaven.auth.AuthContext; +import io.deephaven.engine.table.impl.OperationInitializationThreadPool; import io.deephaven.engine.testutil.ControlledUpdateGraph; +import io.deephaven.util.thread.ThreadInitializationFactory; public class TestExecutionContext { + public static final ControlledUpdateGraph UPDATE_GRAPH = new ControlledUpdateGraph(); + + public static final OperationInitializationThreadPool OPERATION_INITIALIZATION = + new OperationInitializationThreadPool(ThreadInitializationFactory.NO_OP); public static ExecutionContext createForUnitTests() { return new ExecutionContext.Builder(new AuthContext.SuperUser()) @@ -11,7 +17,8 @@ public static ExecutionContext createForUnitTests() { .newQueryScope() .newQueryLibrary() .setQueryCompiler(QueryCompiler.createForUnitTests()) - .setUpdateGraph(ControlledUpdateGraph.INSTANCE) + .setUpdateGraph(UPDATE_GRAPH) + .setOperationInitializer(OPERATION_INITIALIZATION) .build(); } } diff --git a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/ControlledUpdateGraph.java b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/ControlledUpdateGraph.java index a24c1778486..0ca0d815015 100644 --- a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/ControlledUpdateGraph.java +++ b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/ControlledUpdateGraph.java @@ -1,13 +1,11 @@ package io.deephaven.engine.testutil; import io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph; +import io.deephaven.util.thread.ThreadInitializationFactory; // TODO (deephaven-core#3886): Extract test functionality from PeriodicUpdateGraph public class ControlledUpdateGraph extends PeriodicUpdateGraph { - - public static final ControlledUpdateGraph INSTANCE = new ControlledUpdateGraph(); - - private ControlledUpdateGraph() { - super("TEST", true, 1000, 25, -1); + public ControlledUpdateGraph() { + super("TEST", true, 1000, 25, -1, ThreadInitializationFactory.NO_OP); } } diff --git a/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/OperationInitializer.java b/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/OperationInitializer.java new file mode 100644 index 00000000000..d5d4337e292 --- /dev/null +++ b/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/OperationInitializer.java @@ -0,0 +1,42 @@ +package io.deephaven.engine.updategraph; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; + +/** + * Provides guidance for initialization operations on how they can parallelize. + */ +public interface OperationInitializer { + OperationInitializer NON_PARALLELIZABLE = new OperationInitializer() { + @Override + public boolean canParallelize() { + return false; + } + + @Override + public Future submit(Runnable runnable) { + runnable.run(); + return CompletableFuture.completedFuture(null); + } + + @Override + public int parallelismFactor() { + return 1; + } + }; + + /** + * Whether the current thread can parallelize operations using this OperationInitialization. + */ + boolean canParallelize(); + + /** + * Submits a task to run in this thread pool. + */ + Future submit(Runnable runnable); + + /** + * Number of threads that are potentially available. + */ + int parallelismFactor(); +} diff --git a/props/configs/src/main/resources/dh-defaults.prop b/props/configs/src/main/resources/dh-defaults.prop index 99ff56f26cb..be2dcba5eb9 100644 --- a/props/configs/src/main/resources/dh-defaults.prop +++ b/props/configs/src/main/resources/dh-defaults.prop @@ -60,7 +60,3 @@ client.configuration.list=java.version,deephaven.version,barrage.version,http.se # jar, and a class that is found in that jar. Any such keys will be made available to the client.configuration.list # as .version. client.version.list=deephaven=io.deephaven.engine.table.Table,barrage=io.deephaven.barrage.flatbuf.BarrageMessageWrapper - - -# Specifies additional setup to run on threads that can perform table operations with user code. Comma-separated list, instances must be of type io.deephaven.util.thread.ThreadInitializationFactory -thread.initialization=io.deephaven.server.console.python.DebuggingInitializer diff --git a/props/test-configs/src/main/resources/dh-tests.prop b/props/test-configs/src/main/resources/dh-tests.prop index 64dfddae10a..f7d2503aa35 100644 --- a/props/test-configs/src/main/resources/dh-tests.prop +++ b/props/test-configs/src/main/resources/dh-tests.prop @@ -102,4 +102,3 @@ client.version.list= authentication.anonymous.warn=false deephaven.console.type=none -thread.initialization= diff --git a/py/server/test_helper/__init__.py b/py/server/test_helper/__init__.py index 4cc93ba1eaa..1dc3eaaca55 100644 --- a/py/server/test_helper/__init__.py +++ b/py/server/test_helper/__init__.py @@ -71,8 +71,9 @@ def start_jvm_for_tests(jvm_props: Dict[str, str] = None): global py_dh_session _JPeriodicUpdateGraph = jpy.get_type("io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph") _j_test_update_graph = _JPeriodicUpdateGraph.newBuilder(_JPeriodicUpdateGraph.DEFAULT_UPDATE_GRAPH_NAME).existingOrBuild() + no_op_operation_initializer = jpy.get_type("io.deephaven.util.thread.ThreadInitializationFactory").NO_OP _JPythonScriptSession = jpy.get_type("io.deephaven.integrations.python.PythonDeephavenSession") - py_dh_session = _JPythonScriptSession(_j_test_update_graph, py_scope_jpy) + py_dh_session = _JPythonScriptSession(_j_test_update_graph, no_op_operation_initializer, py_scope_jpy) def _expand_wildcards_in_list(elements): diff --git a/python-engine-test/src/test/java/io/deephaven/engine/table/impl/select/TestConditionFilter.java b/python-engine-test/src/test/java/io/deephaven/engine/table/impl/select/TestConditionFilter.java index a9b683a8630..9c3c1f38ab9 100644 --- a/python-engine-test/src/test/java/io/deephaven/engine/table/impl/select/TestConditionFilter.java +++ b/python-engine-test/src/test/java/io/deephaven/engine/table/impl/select/TestConditionFilter.java @@ -21,6 +21,7 @@ import io.deephaven.engine.util.PythonScopeJpyImpl; import io.deephaven.engine.table.ColumnSource; import io.deephaven.jpy.PythonTest; +import io.deephaven.util.thread.ThreadInitializationFactory; import org.apache.commons.lang3.exception.ExceptionUtils; import org.jpy.PyInputMode; import org.jpy.PyModule; @@ -376,6 +377,7 @@ private void check(String expression, Predicate> testPredica if (pythonScope == null) { final ExecutionContext context = new PythonDeephavenSession( ExecutionContext.getDefaultContext().getUpdateGraph(), + ThreadInitializationFactory.NO_OP, new PythonScopeJpyImpl(getMainGlobals().asDict())).getExecutionContext(); pythonScope = context.getQueryScope(); context.open(); diff --git a/server/src/main/java/io/deephaven/server/console/NoConsoleSessionModule.java b/server/src/main/java/io/deephaven/server/console/NoConsoleSessionModule.java index eef5255c7e4..a460bc6f1f7 100644 --- a/server/src/main/java/io/deephaven/server/console/NoConsoleSessionModule.java +++ b/server/src/main/java/io/deephaven/server/console/NoConsoleSessionModule.java @@ -12,6 +12,7 @@ import io.deephaven.engine.util.NoLanguageDeephavenSession; import io.deephaven.engine.util.ScriptSession; import io.deephaven.server.console.groovy.InitScriptsModule; +import io.deephaven.util.thread.ThreadInitializationFactory; import javax.inject.Named; @@ -26,7 +27,8 @@ ScriptSession bindScriptSession(NoLanguageDeephavenSession noLanguageSession) { @Provides NoLanguageDeephavenSession bindNoLanguageSession( - @Named(PeriodicUpdateGraph.DEFAULT_UPDATE_GRAPH_NAME) final UpdateGraph updateGraph) { - return new NoLanguageDeephavenSession(updateGraph); + @Named(PeriodicUpdateGraph.DEFAULT_UPDATE_GRAPH_NAME) final UpdateGraph updateGraph, + ThreadInitializationFactory threadInitializationFactory) { + return new NoLanguageDeephavenSession(updateGraph, threadInitializationFactory); } } diff --git a/server/src/main/java/io/deephaven/server/console/SessionToExecutionStateModule.java b/server/src/main/java/io/deephaven/server/console/SessionToExecutionStateModule.java deleted file mode 100644 index 633febd1a1e..00000000000 --- a/server/src/main/java/io/deephaven/server/console/SessionToExecutionStateModule.java +++ /dev/null @@ -1,19 +0,0 @@ -package io.deephaven.server.console; - -import dagger.Module; -import dagger.Provides; -import io.deephaven.engine.context.ExecutionContext; -import io.deephaven.engine.util.ScriptSession; -import io.deephaven.server.auth.AuthorizationProvider; - -/** - * Deprecated: use {@link ExecutionContextModule} instead. - */ -@Deprecated(since = "0.26.0", forRemoval = true) -@Module -public interface SessionToExecutionStateModule { - @Provides - static ExecutionContext bindExecutionContext(ScriptSession session, AuthorizationProvider authProvider) { - return ExecutionContextModule.bindExecutionContext(session, authProvider); - } -} diff --git a/server/src/main/java/io/deephaven/server/console/groovy/GroovyConsoleSessionModule.java b/server/src/main/java/io/deephaven/server/console/groovy/GroovyConsoleSessionModule.java index eeeead6567f..c5e8158568d 100644 --- a/server/src/main/java/io/deephaven/server/console/groovy/GroovyConsoleSessionModule.java +++ b/server/src/main/java/io/deephaven/server/console/groovy/GroovyConsoleSessionModule.java @@ -13,6 +13,7 @@ import io.deephaven.engine.util.GroovyDeephavenSession.RunScripts; import io.deephaven.engine.util.ScriptSession; import io.deephaven.plugin.type.ObjectTypeLookup; +import io.deephaven.util.thread.ThreadInitializationFactory; import javax.inject.Named; import java.io.IOException; @@ -30,11 +31,12 @@ ScriptSession bindScriptSession(final GroovyDeephavenSession groovySession) { @Provides GroovyDeephavenSession bindGroovySession( @Named(PeriodicUpdateGraph.DEFAULT_UPDATE_GRAPH_NAME) final UpdateGraph updateGraph, + ThreadInitializationFactory threadInitializationFactory, final ObjectTypeLookup lookup, final ScriptSession.Listener listener, final RunScripts runScripts) { try { - return new GroovyDeephavenSession(updateGraph, lookup, listener, runScripts); + return new GroovyDeephavenSession(updateGraph, threadInitializationFactory, lookup, listener, runScripts); } catch (final IOException e) { throw new UncheckedIOException(e); } diff --git a/server/src/main/java/io/deephaven/server/console/python/PythonConsoleSessionModule.java b/server/src/main/java/io/deephaven/server/console/python/PythonConsoleSessionModule.java index 78999fb4c0c..d5f54618462 100644 --- a/server/src/main/java/io/deephaven/server/console/python/PythonConsoleSessionModule.java +++ b/server/src/main/java/io/deephaven/server/console/python/PythonConsoleSessionModule.java @@ -13,6 +13,7 @@ import io.deephaven.engine.util.ScriptSession; import io.deephaven.integrations.python.PythonDeephavenSession; import io.deephaven.plugin.type.ObjectTypeLookup; +import io.deephaven.util.thread.ThreadInitializationFactory; import javax.inject.Named; import java.io.IOException; @@ -30,11 +31,13 @@ ScriptSession bindScriptSession(PythonDeephavenSession pythonSession) { @Provides PythonDeephavenSession bindPythonSession( @Named(PeriodicUpdateGraph.DEFAULT_UPDATE_GRAPH_NAME) final UpdateGraph updateGraph, + final ThreadInitializationFactory threadInitializationFactory, final ObjectTypeLookup lookup, final ScriptSession.Listener listener, final PythonEvaluatorJpy pythonEvaluator) { try { - return new PythonDeephavenSession(updateGraph, lookup, listener, true, pythonEvaluator); + return new PythonDeephavenSession(updateGraph, threadInitializationFactory, lookup, listener, true, + pythonEvaluator); } catch (IOException e) { throw new UncheckedIOException("Unable to run python startup scripts", e); } diff --git a/server/src/main/java/io/deephaven/server/console/python/PythonDebuggingModule.java b/server/src/main/java/io/deephaven/server/console/python/PythonDebuggingModule.java new file mode 100644 index 00000000000..7f4d70ab530 --- /dev/null +++ b/server/src/main/java/io/deephaven/server/console/python/PythonDebuggingModule.java @@ -0,0 +1,16 @@ +// +// Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending +// +package io.deephaven.server.console.python; + +import dagger.Binds; +import dagger.Module; +import dagger.multibindings.IntoSet; +import io.deephaven.util.thread.ThreadInitializationFactory; + +@Module +public interface PythonDebuggingModule { + @Binds + @IntoSet + ThreadInitializationFactory bindDebuggingInitializer(DebuggingInitializer debuggingInitializer); +} diff --git a/server/src/main/java/io/deephaven/server/runner/DeephavenApiServer.java b/server/src/main/java/io/deephaven/server/runner/DeephavenApiServer.java index a13a29da41e..8ccc367a6b7 100644 --- a/server/src/main/java/io/deephaven/server/runner/DeephavenApiServer.java +++ b/server/src/main/java/io/deephaven/server/runner/DeephavenApiServer.java @@ -144,9 +144,6 @@ public DeephavenApiServer run() throws IOException, ClassNotFoundException, Time // noinspection resource executionContextProvider.get().open(); - log.info().append("Starting Operation Initialization Thread Pool...").endl(); - OperationInitializationThreadPool.start(); - log.info().append("Starting Update Graph...").endl(); getUpdateGraph().cast().start(); diff --git a/server/src/main/java/io/deephaven/server/runner/scheduler/SchedulerModule.java b/server/src/main/java/io/deephaven/server/runner/scheduler/SchedulerModule.java index c6710570b38..26f4b0b32c5 100644 --- a/server/src/main/java/io/deephaven/server/runner/scheduler/SchedulerModule.java +++ b/server/src/main/java/io/deephaven/server/runner/scheduler/SchedulerModule.java @@ -2,6 +2,7 @@ import dagger.Module; import dagger.Provides; +import dagger.multibindings.ElementsIntoSet; import io.deephaven.base.clock.Clock; import io.deephaven.chunk.util.pools.MultiChunkPool; import io.deephaven.engine.context.ExecutionContext; @@ -16,6 +17,8 @@ import javax.inject.Named; import javax.inject.Singleton; +import java.util.Collections; +import java.util.Set; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -31,13 +34,25 @@ */ @Module public class SchedulerModule { + @Provides + @ElementsIntoSet + static Set primeThreadInitializers() { + return Collections.emptySet(); + } + + @Provides + static ThreadInitializationFactory provideThreadInitializationFactory(Set factories) { + return ThreadInitializationFactory.of(factories); + } @Provides @Singleton public static Scheduler provideScheduler( final @Named(PeriodicUpdateGraph.DEFAULT_UPDATE_GRAPH_NAME) UpdateGraph updateGraph, - final @Named("scheduler.poolSize") int poolSize) { - final ThreadFactory concurrentThreadFactory = new ThreadFactory("Scheduler-Concurrent", updateGraph); + final @Named("scheduler.poolSize") int poolSize, + final ThreadInitializationFactory initializationFactory) { + final ThreadFactory concurrentThreadFactory = + new ThreadFactory("Scheduler-Concurrent", updateGraph, initializationFactory); final ScheduledExecutorService concurrentExecutor = new ScheduledThreadPoolExecutor(poolSize, concurrentThreadFactory) { @Override @@ -47,7 +62,8 @@ protected void afterExecute(final Runnable task, final Throwable error) { } }; - final ThreadFactory serialThreadFactory = new ThreadFactory("Scheduler-Serial", updateGraph); + final ThreadFactory serialThreadFactory = + new ThreadFactory("Scheduler-Serial", updateGraph, initializationFactory); final ExecutorService serialExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), serialThreadFactory) { @@ -63,15 +79,18 @@ protected void afterExecute(final Runnable task, final Throwable error) { private static class ThreadFactory extends NamingThreadFactory { private final UpdateGraph updateGraph; + private final ThreadInitializationFactory initializationFactory; - public ThreadFactory(final String name, final UpdateGraph updateGraph) { + public ThreadFactory(final String name, final UpdateGraph updateGraph, + ThreadInitializationFactory initializationFactory) { super(DeephavenApiServer.class, name); this.updateGraph = updateGraph; + this.initializationFactory = initializationFactory; } @Override public Thread newThread(@NotNull final Runnable r) { - return super.newThread(ThreadInitializationFactory.wrapRunnable(() -> { + return super.newThread(initializationFactory.createInitializer(() -> { MultiChunkPool.enableDedicatedPoolForThisThread(); // noinspection resource ExecutionContext.getContext().withUpdateGraph(updateGraph).open(); diff --git a/server/src/test/java/io/deephaven/server/appmode/ApplicationServiceGrpcImplTest.java b/server/src/test/java/io/deephaven/server/appmode/ApplicationServiceGrpcImplTest.java index 82a965462f0..b11c16fece8 100644 --- a/server/src/test/java/io/deephaven/server/appmode/ApplicationServiceGrpcImplTest.java +++ b/server/src/test/java/io/deephaven/server/appmode/ApplicationServiceGrpcImplTest.java @@ -17,6 +17,7 @@ import io.deephaven.server.session.SessionState; import io.deephaven.server.util.TestControlledScheduler; import io.deephaven.auth.AuthContext; +import io.deephaven.util.thread.ThreadInitializationFactory; import io.grpc.Context; import io.grpc.stub.StreamObserver; import org.junit.After; @@ -87,7 +88,7 @@ public void onListFieldsSubscribeFailedObserver() { // trigger a change ScriptSession scriptSession = new NoLanguageDeephavenSession( - ExecutionContext.getDefaultContext().getUpdateGraph()); + ExecutionContext.getDefaultContext().getUpdateGraph(), ThreadInitializationFactory.NO_OP); scriptSession.setVariable("key", "hello world"); ScriptSession.Changes changes = new ScriptSession.Changes(); changes.created.put("key", "Object"); diff --git a/server/src/test/java/io/deephaven/server/appmode/ApplicationTest.java b/server/src/test/java/io/deephaven/server/appmode/ApplicationTest.java index ac17a47a807..8214cd3e313 100644 --- a/server/src/test/java/io/deephaven/server/appmode/ApplicationTest.java +++ b/server/src/test/java/io/deephaven/server/appmode/ApplicationTest.java @@ -13,6 +13,7 @@ import io.deephaven.integrations.python.PythonDeephavenSession; import io.deephaven.engine.util.PythonEvaluatorJpy; import io.deephaven.plugin.type.ObjectTypeLookup.NoOp; +import io.deephaven.util.thread.ThreadInitializationFactory; import org.junit.After; import org.junit.Ignore; import org.junit.Rule; @@ -51,7 +52,9 @@ public void app00() { @Test public void app01() throws IOException { session = new GroovyDeephavenSession( - ExecutionContext.getContext().getUpdateGraph(), NoOp.INSTANCE, null, + ExecutionContext.getContext().getUpdateGraph(), + ThreadInitializationFactory.NO_OP, + NoOp.INSTANCE, null, GroovyDeephavenSession.RunScripts.none()); ApplicationState app = ApplicationFactory.create(ApplicationConfigs.testAppDir(), ApplicationConfigs.app01(), session, new NoopStateListener()); @@ -65,7 +68,9 @@ public void app01() throws IOException { @Ignore("TODO: deephaven-core#1741 python test needs to run in a container") public void app02() throws IOException, InterruptedException, TimeoutException { session = new PythonDeephavenSession( - ExecutionContext.getDefaultContext().getUpdateGraph(), NoOp.INSTANCE, null, false, + ExecutionContext.getDefaultContext().getUpdateGraph(), + ThreadInitializationFactory.NO_OP, + NoOp.INSTANCE, null, false, PythonEvaluatorJpy.withGlobalCopy()); ApplicationState app = ApplicationFactory.create(ApplicationConfigs.testAppDir(), ApplicationConfigs.app02(), session, new NoopStateListener()); diff --git a/server/test/src/main/java/io/deephaven/server/test/FlightMessageRoundTripTest.java b/server/test/src/main/java/io/deephaven/server/test/FlightMessageRoundTripTest.java index 2edbd62bb80..67845489b7d 100644 --- a/server/test/src/main/java/io/deephaven/server/test/FlightMessageRoundTripTest.java +++ b/server/test/src/main/java/io/deephaven/server/test/FlightMessageRoundTripTest.java @@ -55,6 +55,7 @@ import io.deephaven.server.util.Scheduler; import io.deephaven.util.SafeCloseable; import io.deephaven.auth.AuthContext; +import io.deephaven.util.thread.ThreadInitializationFactory; import io.grpc.*; import io.grpc.CallOptions; import io.grpc.stub.ClientCalls; @@ -110,7 +111,7 @@ TicketResolver ticketResolver(ScopeTicketResolver resolver) { @Singleton @Provides AbstractScriptSession provideAbstractScriptSession(final UpdateGraph updateGraph) { - return new NoLanguageDeephavenSession(updateGraph, "non-script-session"); + return new NoLanguageDeephavenSession(updateGraph, ThreadInitializationFactory.NO_OP, "non-script-session"); } @Provides diff --git a/sphinx/source/conf.py b/sphinx/source/conf.py index 61ae8ca5b95..27accbdb119 100644 --- a/sphinx/source/conf.py +++ b/sphinx/source/conf.py @@ -108,7 +108,8 @@ _JUpdateGraph = jpy.get_type("io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph") docs_update_graph = _JUpdateGraph.newBuilder("PYTHON_DOCS").build() _JPythonScriptSession = jpy.get_type("io.deephaven.integrations.python.PythonDeephavenSession") -py_dh_session = _JPythonScriptSession(docs_update_graph, py_scope_jpy) +no_op_operation_initializer = jpy.get_type("io.deephaven.util.thread.ThreadInitializationFactory").NO_OP +py_dh_session = _JPythonScriptSession(docs_update_graph, no_op_operation_initializer, py_scope_jpy) py_dh_session.getExecutionContext().open() pygments_style = 'sphinx'