diff --git a/Integrations/src/main/java/io/deephaven/integrations/python/PythonDeephavenSession.java b/Integrations/src/main/java/io/deephaven/integrations/python/PythonDeephavenSession.java index 607174ee1a3..c7c80b27080 100644 --- a/Integrations/src/main/java/io/deephaven/integrations/python/PythonDeephavenSession.java +++ b/Integrations/src/main/java/io/deephaven/integrations/python/PythonDeephavenSession.java @@ -24,6 +24,7 @@ import io.deephaven.util.SafeCloseable; import io.deephaven.util.annotations.ScriptApi; import io.deephaven.util.annotations.VisibleForTesting; +import io.deephaven.util.thread.ThreadInitializationFactory; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.jpy.KeyError; @@ -76,11 +77,12 @@ public class PythonDeephavenSession extends AbstractScriptSession scope) { - super(updateGraph, NoOp.INSTANCE, null); + public PythonDeephavenSession(final UpdateGraph updateGraph, + final ThreadInitializationFactory threadInitializationFactory, final PythonScope scope) { + super(updateGraph, threadInitializationFactory, NoOp.INSTANCE, null); evaluator = null; this.scope = (PythonScope) scope; diff --git a/Util/src/main/java/io/deephaven/util/thread/ThreadInitializationFactory.java b/Util/src/main/java/io/deephaven/util/thread/ThreadInitializationFactory.java index 56a5436adb7..932ce5bb54e 100644 --- a/Util/src/main/java/io/deephaven/util/thread/ThreadInitializationFactory.java +++ b/Util/src/main/java/io/deephaven/util/thread/ThreadInitializationFactory.java @@ -1,55 +1,21 @@ package io.deephaven.util.thread; -import io.deephaven.configuration.Configuration; - -import java.lang.reflect.InvocationTargetException; -import java.util.Arrays; -import java.util.List; -import java.util.Objects; -import java.util.stream.Collectors; +import java.util.Collection; /** * Extension point to allow threads that will run user code from within the platform to be controlled by configuration. */ public interface ThreadInitializationFactory { - /* private */ String[] CONFIGURED_INITIALIZATION_TYPES = - Configuration.getInstance().getStringArrayFromProperty("thread.initialization"); - /* private */ List INITIALIZERS = Arrays.stream(CONFIGURED_INITIALIZATION_TYPES) - .filter(str -> !str.isBlank()) - .map(type -> { - try { - // noinspection unchecked - Class clazz = - (Class) Class.forName(type); - return clazz.getDeclaredConstructor().newInstance(); - } catch (ClassNotFoundException | NoSuchMethodException | InvocationTargetException - | InstantiationException | IllegalAccessException e) { - - // TODO (https://github.com/deephaven/deephaven-core/issues/4040): - // Currently the default property file is shared between both the java client and the server. This - // means that client-side usage will attempt to load the thread.initialization property intended for - // the server which is not available on the class path. - if (e instanceof ClassNotFoundException && type.startsWith("io.deephaven.server.")) { - return null; - } - - throw new IllegalArgumentException( - "Error instantiating initializer " + type + ", please check configuration", e); - } - }) - .filter(Objects::nonNull) - .collect(Collectors.toUnmodifiableList()); + ThreadInitializationFactory NO_OP = r -> r; - /** - * Chains configured initializers to run before/around any given runnable, returning a runnable intended to be run - * by a new thread. - */ - static Runnable wrapRunnable(Runnable runnable) { - Runnable acc = runnable; - for (ThreadInitializationFactory INITIALIZER : INITIALIZERS) { - acc = INITIALIZER.createInitializer(acc); - } - return acc; + static ThreadInitializationFactory of(Collection factories) { + return runnable -> { + Runnable acc = runnable; + for (ThreadInitializationFactory factory : factories) { + acc = factory.createInitializer(acc); + } + return acc; + }; } Runnable createInitializer(Runnable runnable); diff --git a/engine/context/src/main/java/io/deephaven/engine/context/ExecutionContext.java b/engine/context/src/main/java/io/deephaven/engine/context/ExecutionContext.java index 05643ce6166..a698a85b841 100644 --- a/engine/context/src/main/java/io/deephaven/engine/context/ExecutionContext.java +++ b/engine/context/src/main/java/io/deephaven/engine/context/ExecutionContext.java @@ -4,6 +4,7 @@ package io.deephaven.engine.context; import io.deephaven.auth.AuthContext; +import io.deephaven.engine.updategraph.OperationInitializer; import io.deephaven.engine.updategraph.UpdateGraph; import io.deephaven.util.SafeCloseable; import io.deephaven.util.annotations.ScriptApi; @@ -13,10 +14,26 @@ import java.util.Objects; import java.util.function.Supplier; +/** + * Container for context-specific objects, that can be activated on a thread or passed to certain operations. + * ExecutionContexts are immutable, and support a builder pattern to create new instances and "with" methods to + * customize existing ones. Any thread that interacts with the Deephaven engine will need to have an active + * ExecutionContext. + */ public class ExecutionContext { + /** + * Creates a new builder for an ExecutionContext, capturing the current thread's auth context, update graph, and + * operation initializer. Typically, this method should be called on a thread that already has an active + * ExecutionContext, to more easily reuse those. + * + * @return a new builder to create an ExecutionContext + */ public static Builder newBuilder() { - return new Builder(); + ExecutionContext existing = getContext(); + return new Builder() + .setUpdateGraph(existing.getUpdateGraph()) + .setOperationInitializer(existing.getInitializer()); } public static ExecutionContext makeExecutionContext(boolean isSystemic) { @@ -83,6 +100,7 @@ private static void setContext(final ExecutionContext context) { private final QueryScope queryScope; private final QueryCompiler queryCompiler; private final UpdateGraph updateGraph; + private final OperationInitializer operationInitializer; private ExecutionContext( final boolean isSystemic, @@ -90,13 +108,15 @@ private ExecutionContext( final QueryLibrary queryLibrary, final QueryScope queryScope, final QueryCompiler queryCompiler, - final UpdateGraph updateGraph) { + final UpdateGraph updateGraph, + OperationInitializer operationInitializer) { this.isSystemic = isSystemic; this.authContext = authContext; this.queryLibrary = Objects.requireNonNull(queryLibrary); this.queryScope = Objects.requireNonNull(queryScope); this.queryCompiler = Objects.requireNonNull(queryCompiler); - this.updateGraph = updateGraph; + this.updateGraph = Objects.requireNonNull(updateGraph); + this.operationInitializer = Objects.requireNonNull(operationInitializer); } /** @@ -110,7 +130,8 @@ public ExecutionContext withSystemic(boolean isSystemic) { if (isSystemic == this.isSystemic) { return this; } - return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph); + return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph, + operationInitializer); } /** @@ -124,7 +145,8 @@ public ExecutionContext withAuthContext(final AuthContext authContext) { if (authContext == this.authContext) { return this; } - return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph); + return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph, + operationInitializer); } /** @@ -138,7 +160,16 @@ public ExecutionContext withUpdateGraph(final UpdateGraph updateGraph) { if (updateGraph == this.updateGraph) { return this; } - return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph); + return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph, + operationInitializer); + } + + public ExecutionContext withOperationInitializer(final OperationInitializer operationInitializer) { + if (operationInitializer == this.operationInitializer) { + return this; + } + return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph, + operationInitializer); } /** @@ -198,6 +229,10 @@ public UpdateGraph getUpdateGraph() { return updateGraph; } + public OperationInitializer getInitializer() { + return operationInitializer; + } + @SuppressWarnings("unused") public static class Builder { private boolean isSystemic = false; @@ -208,6 +243,7 @@ public static class Builder { private QueryScope queryScope = PoisonedQueryScope.INSTANCE; private QueryCompiler queryCompiler = PoisonedQueryCompiler.INSTANCE; private UpdateGraph updateGraph = PoisonedUpdateGraph.INSTANCE; + private OperationInitializer operationInitializer = PoisonedOperationInitializer.INSTANCE; private Builder() { // propagate the auth context from the current context @@ -356,19 +392,32 @@ public Builder setUpdateGraph(UpdateGraph updateGraph) { /** * Use the current ExecutionContext's UpdateGraph instance. + * + * @deprecated The update graph is automatically captured, this method should no longer be needed. */ @ScriptApi + @Deprecated(forRemoval = true, since = "0.31") public Builder captureUpdateGraph() { this.updateGraph = getContext().getUpdateGraph(); return this; } + /** + * Use the specified operation initializer instead of the captured instance. + */ + @ScriptApi + public Builder setOperationInitializer(OperationInitializer operationInitializer) { + this.operationInitializer = operationInitializer; + return this; + } + /** * @return the newly instantiated ExecutionContext */ @ScriptApi public ExecutionContext build() { - return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph); + return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph, + operationInitializer); } } } diff --git a/engine/context/src/main/java/io/deephaven/engine/context/PoisonedOperationInitializer.java b/engine/context/src/main/java/io/deephaven/engine/context/PoisonedOperationInitializer.java new file mode 100644 index 00000000000..49474844755 --- /dev/null +++ b/engine/context/src/main/java/io/deephaven/engine/context/PoisonedOperationInitializer.java @@ -0,0 +1,30 @@ +package io.deephaven.engine.context; + +import io.deephaven.engine.updategraph.OperationInitializer; +import io.deephaven.util.ExecutionContextRegistrationException; + +import java.util.concurrent.Future; + +public class PoisonedOperationInitializer implements OperationInitializer { + + public static final PoisonedOperationInitializer INSTANCE = new PoisonedOperationInitializer(); + + private T fail() { + throw ExecutionContextRegistrationException.onFailedComponentAccess("OperationInitializer"); + } + + @Override + public boolean canParallelize() { + return fail(); + } + + @Override + public Future submit(Runnable runnable) { + return fail(); + } + + @Override + public int parallelismFactor() { + return fail(); + } +} diff --git a/engine/context/src/main/java/io/deephaven/engine/context/PoisonedUpdateGraph.java b/engine/context/src/main/java/io/deephaven/engine/context/PoisonedUpdateGraph.java index 6cfb3ad1283..31dc734785b 100644 --- a/engine/context/src/main/java/io/deephaven/engine/context/PoisonedUpdateGraph.java +++ b/engine/context/src/main/java/io/deephaven/engine/context/PoisonedUpdateGraph.java @@ -2,7 +2,6 @@ import io.deephaven.base.log.LogOutput; import io.deephaven.engine.updategraph.LogicalClock; -import io.deephaven.engine.updategraph.LogicalClockImpl; import io.deephaven.engine.updategraph.UpdateGraph; import io.deephaven.io.log.LogEntry; import io.deephaven.util.ExecutionContextRegistrationException; diff --git a/engine/context/src/test/java/io/deephaven/engine/context/TestQueryCompiler.java b/engine/context/src/test/java/io/deephaven/engine/context/TestQueryCompiler.java index 39d90603ced..02374a67bf0 100644 --- a/engine/context/src/test/java/io/deephaven/engine/context/TestQueryCompiler.java +++ b/engine/context/src/test/java/io/deephaven/engine/context/TestQueryCompiler.java @@ -66,7 +66,6 @@ public class TestQueryCompiler { @Before public void setUp() throws IOException { executionContextClosable = ExecutionContext.newBuilder() - .captureUpdateGraph() .captureQueryLibrary() .captureQueryScope() .setQueryCompiler(QueryCompiler.create(folder.newFolder(), TestQueryCompiler.class.getClassLoader())) diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java index d862284777f..a136c87701c 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/AbstractFilterExecution.java @@ -2,6 +2,7 @@ import io.deephaven.base.log.LogOutput; import io.deephaven.base.verify.Assert; +import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.exceptions.CancellationException; import io.deephaven.engine.rowset.RowSet; import io.deephaven.engine.rowset.RowSetFactory; @@ -325,10 +326,10 @@ abstract void enqueueSubFilters( */ abstract boolean doParallelization(long numberOfRows); - static boolean doParallelizationBase(long numberOfRows) { + boolean doParallelizationBase(long numberOfRows) { return !QueryTable.DISABLE_PARALLEL_WHERE && numberOfRows != 0 && (QueryTable.FORCE_PARALLEL_WHERE || numberOfRows / 2 > QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT) - && OperationInitializationThreadPool.canParallelize(); + && ExecutionContext.getContext().getInitializer().canParallelize(); } /** diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java index b00d195dba0..dc00456fdd2 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/InitialFilterExecution.java @@ -1,6 +1,7 @@ package io.deephaven.engine.table.impl; import io.deephaven.base.verify.Assert; +import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.exceptions.CancellationException; import io.deephaven.engine.rowset.RowSet; import io.deephaven.engine.table.ModifiedColumnSet; @@ -85,7 +86,7 @@ void enqueueSubFilters( private void enqueueJobs(Iterable subFilters) { for (NotificationQueue.Notification notification : subFilters) { - OperationInitializationThreadPool.executorService().submit(() -> { + ExecutionContext.getContext().getInitializer().submit(() -> { root.runningChildren.put(Thread.currentThread(), Thread.currentThread()); try { if (!root.cancelled.get()) { diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java index c12a0a605d3..286d4386d17 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/OperationInitializationThreadPool.java @@ -5,17 +5,22 @@ import io.deephaven.chunk.util.pools.MultiChunkPool; import io.deephaven.configuration.Configuration; +import io.deephaven.engine.context.ExecutionContext; +import io.deephaven.engine.updategraph.OperationInitializer; import io.deephaven.util.thread.NamingThreadFactory; import io.deephaven.util.thread.ThreadInitializationFactory; import org.jetbrains.annotations.NotNull; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -public class OperationInitializationThreadPool { +/** + * Implementation of OperationInitializer that delegates to a pool of threads. + */ +public class OperationInitializationThreadPool implements OperationInitializer { /** * The number of threads that will be used for parallel initialization in this process @@ -31,56 +36,42 @@ public class OperationInitializationThreadPool { NUM_THREADS = numThreads; } } + private final ThreadLocal isInitializationThread = ThreadLocal.withInitial(() -> false); - private static final ThreadLocal isInitializationThread = ThreadLocal.withInitial(() -> false); - - /** - * @return Whether the current thread is part of the OperationInitializationThreadPool's {@link #executorService()} - */ - public static boolean isInitializationThread() { - return isInitializationThread.get(); - } - - /** - * @return Whether the current thread can parallelize operations using the OperationInitializationThreadPool's - * {@link #executorService()} - */ - public static boolean canParallelize() { - return NUM_THREADS > 1 && !isInitializationThread(); - } - - private static final ThreadPoolExecutor executorService; + private final ThreadPoolExecutor executorService; - static { + public OperationInitializationThreadPool(ThreadInitializationFactory factory) { final ThreadGroup threadGroup = new ThreadGroup("OperationInitializationThreadPool"); final ThreadFactory threadFactory = new NamingThreadFactory( threadGroup, OperationInitializationThreadPool.class, "initializationExecutor", true) { @Override public Thread newThread(@NotNull final Runnable r) { - return super.newThread(ThreadInitializationFactory.wrapRunnable(() -> { + return super.newThread(factory.createInitializer(() -> { isInitializationThread.set(true); MultiChunkPool.enableDedicatedPoolForThisThread(); - r.run(); + ExecutionContext.newBuilder().setOperationInitializer(OperationInitializer.NON_PARALLELIZABLE) + .build().apply(r); })); } }; executorService = new ThreadPoolExecutor( NUM_THREADS, NUM_THREADS, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), threadFactory); + + executorService.prestartAllCoreThreads(); } - /** - * @return The OperationInitializationThreadPool's {@link ExecutorService}; will be {@code null} if the - * OperationInitializationThreadPool has not been {@link #start() started} - */ - public static ExecutorService executorService() { - return executorService; + @Override + public boolean canParallelize() { + return NUM_THREADS > 1 && !isInitializationThread.get(); } - /** - * Start the OperationInitializationThreadPool. In practice, this just pre-starts all threads in the - * {@link #executorService()}. - */ - public static void start() { - executorService.prestartAllCoreThreads(); + @Override + public Future submit(Runnable runnable) { + return executorService.submit(runnable); + } + + @Override + public int parallelismFactor() { + return NUM_THREADS; } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java index 53e6dc4f776..afeff5a30c8 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/QueryTable.java @@ -1482,9 +1482,10 @@ this, mode, columns, rowSet, getModifiedColumnSetForUpdates(), publishTheseSourc final CompletableFuture waitForResult = new CompletableFuture<>(); final JobScheduler jobScheduler; if ((QueryTable.FORCE_PARALLEL_SELECT_AND_UPDATE || QueryTable.ENABLE_PARALLEL_SELECT_AND_UPDATE) - && OperationInitializationThreadPool.canParallelize() + && ExecutionContext.getContext().getInitializer().canParallelize() && analyzer.allowCrossColumnParallelization()) { - jobScheduler = new OperationInitializationPoolJobScheduler(); + jobScheduler = new OperationInitializationPoolJobScheduler( + ExecutionContext.getContext().getInitializer()); } else { jobScheduler = ImmediateJobScheduler.INSTANCE; } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableImpl.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableImpl.java index e3ecf0a6b03..d401673b5f0 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableImpl.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableImpl.java @@ -30,6 +30,7 @@ import io.deephaven.engine.table.impl.sources.UnionSourceManager; import io.deephaven.engine.table.iterators.ChunkedObjectColumnIterator; import io.deephaven.engine.updategraph.NotificationQueue.Dependency; +import io.deephaven.engine.updategraph.OperationInitializer; import io.deephaven.engine.updategraph.UpdateGraph; import io.deephaven.util.SafeCloseable; import io.deephaven.util.annotations.InternalUseOnly; @@ -296,7 +297,7 @@ public PartitionedTable transform( // Perform the transformation final Table resultTable = prepared.update(List.of(new TableTransformationColumn( constituentColumnName, - executionContext, + disableRecursiveParallelOperationInitialization(executionContext), prepared.isRefreshing() ? transformer : assertResultsStatic(transformer)))); // Make sure we have a valid result constituent definition @@ -318,6 +319,29 @@ public PartitionedTable transform( return resultPartitionedTable; } + /** + * Ensures that the returned executionContext will have an OperationInitializer compatible with being called by work + * already running on an initialization thread - it must either already return false for + * {@link OperationInitializer#canParallelize()}, or must be a different instance than the current context's + * OperationInitializer. + */ + private static ExecutionContext disableRecursiveParallelOperationInitialization(ExecutionContext provided) { + if (provided == null) { + return null; + } + ExecutionContext current = ExecutionContext.getContext(); + if (!provided.getInitializer().canParallelize()) { + return provided; + } + if (current.getInitializer() != provided.getInitializer()) { + return provided; + } + + // The current operation initializer isn't safe to submit more tasks that we will block on, replace + // with an instance that will never attempt to push work to another thread + return provided.withOperationInitializer(OperationInitializer.NON_PARALLELIZABLE); + } + @Override public PartitionedTable partitionedTransform( @NotNull final PartitionedTable other, @@ -353,7 +377,7 @@ public PartitionedTable partitionedTransform( .update(List.of(new BiTableTransformationColumn( constituentColumnName, RHS_CONSTITUENT, - executionContext, + disableRecursiveParallelOperationInitialization(executionContext), prepared.isRefreshing() ? transformer : assertResultsStatic(transformer)))) .dropColumns(RHS_CONSTITUENT); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableProxyImpl.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableProxyImpl.java index 89e789cea6e..8cc0b210593 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableProxyImpl.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/partitioned/PartitionedTableProxyImpl.java @@ -134,7 +134,6 @@ private static ExecutionContext getOrCreateExecutionContext(final boolean requir if (context == null) { final ExecutionContext.Builder builder = ExecutionContext.newBuilder() .captureQueryCompiler() - .captureUpdateGraph() .markSystemic(); if (requiresFullContext) { builder.newQueryLibrary(); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/rangejoin/RangeJoinOperation.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/rangejoin/RangeJoinOperation.java index 6ec652d0282..aebaf7a975e 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/rangejoin/RangeJoinOperation.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/rangejoin/RangeJoinOperation.java @@ -26,7 +26,6 @@ import io.deephaven.engine.rowset.RowSetFactory; import io.deephaven.engine.table.*; import io.deephaven.engine.table.impl.MemoizedOperationKey; -import io.deephaven.engine.table.impl.OperationInitializationThreadPool; import io.deephaven.engine.table.impl.QueryTable; import io.deephaven.engine.table.impl.SortingOrder; import io.deephaven.engine.table.impl.OperationSnapshotControl; @@ -253,14 +252,13 @@ public Result initialize(final boolean usePrev, final long beforeClo QueryTable.checkInitiateBinaryOperation(leftTable, rightTable); final JobScheduler jobScheduler; - if (OperationInitializationThreadPool.canParallelize()) { - jobScheduler = new OperationInitializationPoolJobScheduler(); + if (ExecutionContext.getContext().getInitializer().canParallelize()) { + jobScheduler = new OperationInitializationPoolJobScheduler(ExecutionContext.getContext().getInitializer()); } else { jobScheduler = ImmediateJobScheduler.INSTANCE; } final ExecutionContext executionContext = ExecutionContext.newBuilder() - .captureUpdateGraph() .markSystemic().build(); return new Result<>(staticRangeJoin(jobScheduler, executionContext)); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/UnionSourceManager.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/UnionSourceManager.java index 9392a00bec3..546dbcf19ac 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/UnionSourceManager.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/UnionSourceManager.java @@ -104,7 +104,6 @@ public UnionSourceManager(@NotNull final PartitionedTable partitionedTable) { executionContext = ExecutionContext.newBuilder() .markSystemic() - .captureUpdateGraph() .build(); } else { listenerRecorders = null; diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateBy.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateBy.java index 245d312793f..f6f0ca98558 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateBy.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/UpdateBy.java @@ -300,13 +300,13 @@ class PhasedUpdateProcessor implements LogOutputAppendable { dirtyWindowOperators[winIdx].set(0, windows[winIdx].operators.length); } // Create the proper JobScheduler for the following parallel tasks - if (OperationInitializationThreadPool.canParallelize()) { - jobScheduler = new OperationInitializationPoolJobScheduler(); + if (ExecutionContext.getContext().getInitializer().canParallelize()) { + jobScheduler = + new OperationInitializationPoolJobScheduler(ExecutionContext.getContext().getInitializer()); } else { jobScheduler = ImmediateJobScheduler.INSTANCE; } executionContext = ExecutionContext.newBuilder() - .captureUpdateGraph() .markSystemic().build(); } else { // Determine which windows need to be computed. diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/OperationInitializationPoolJobScheduler.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/OperationInitializationPoolJobScheduler.java index 2722d61fd35..f3a9a45fc30 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/OperationInitializationPoolJobScheduler.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/OperationInitializationPoolJobScheduler.java @@ -4,6 +4,7 @@ import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.table.impl.OperationInitializationThreadPool; import io.deephaven.engine.table.impl.perf.BasePerformanceEntry; +import io.deephaven.engine.updategraph.OperationInitializer; import io.deephaven.io.log.impl.LogOutputStringImpl; import io.deephaven.util.SafeCloseable; import io.deephaven.util.process.ProcessEnvironment; @@ -11,7 +12,12 @@ import java.util.function.Consumer; public class OperationInitializationPoolJobScheduler implements JobScheduler { - final BasePerformanceEntry accumulatedBaseEntry = new BasePerformanceEntry(); + private final BasePerformanceEntry accumulatedBaseEntry = new BasePerformanceEntry(); + private final OperationInitializer threadPool; + + public OperationInitializationPoolJobScheduler(OperationInitializer threadPool) { + this.threadPool = threadPool; + } @Override public void submit( @@ -19,7 +25,7 @@ public void submit( final Runnable runnable, final LogOutputAppendable description, final Consumer onError) { - OperationInitializationThreadPool.executorService().submit(() -> { + threadPool.submit(() -> { final BasePerformanceEntry basePerformanceEntry = new BasePerformanceEntry(); basePerformanceEntry.onBaseEntryStart(); try (final SafeCloseable ignored = executionContext == null ? null : executionContext.open()) { diff --git a/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java b/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java index a86225c9522..acc4ea026ee 100644 --- a/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java +++ b/engine/table/src/main/java/io/deephaven/engine/updategraph/impl/PeriodicUpdateGraph.java @@ -98,6 +98,8 @@ public static Builder newBuilder(final String name) { "PeriodicUpdateGraph.targetCycleDurationMillis"; private final long defaultTargetCycleDurationMillis; private volatile long targetCycleDurationMillis; + private final ThreadInitializationFactory threadInitializationFactory; + /** * The number of threads in our executor service for dispatching notifications. If 1, then we don't actually use the @@ -115,11 +117,13 @@ public PeriodicUpdateGraph( final boolean allowUnitTestMode, final long targetCycleDurationMillis, final long minimumCycleDurationToLogNanos, - final int numUpdateThreads) { + final int numUpdateThreads, + final ThreadInitializationFactory threadInitializationFactory) { super(name, allowUnitTestMode, log, minimumCycleDurationToLogNanos); this.allowUnitTestMode = allowUnitTestMode; this.defaultTargetCycleDurationMillis = targetCycleDurationMillis; this.targetCycleDurationMillis = targetCycleDurationMillis; + this.threadInitializationFactory = threadInitializationFactory; if (numUpdateThreads <= 0) { this.updateThreads = Runtime.getRuntime().availableProcessors(); @@ -127,8 +131,9 @@ public PeriodicUpdateGraph( this.updateThreads = numUpdateThreads; } - refreshThread = new Thread(ThreadInitializationFactory.wrapRunnable(() -> { - configureRefreshThread(); + OperationInitializer captured = ExecutionContext.getContext().getInitializer(); + refreshThread = new Thread(threadInitializationFactory.createInitializer(() -> { + configureRefreshThread(captured); while (running) { Assert.eqFalse(this.allowUnitTestMode, "allowUnitTestMode"); refreshTablesAndFlushNotifications(); @@ -140,7 +145,7 @@ public PeriodicUpdateGraph( @Override public Thread newThread(@NotNull final Runnable r) { // Not a refresh thread, but should still be instrumented for debugging purposes. - return super.newThread(ThreadInitializationFactory.wrapRunnable(r)); + return super.newThread(threadInitializationFactory.createInitializer(r)); } }); } @@ -1092,8 +1097,9 @@ private NotificationProcessorThreadFactory(@NotNull final ThreadGroup threadGrou @Override public Thread newThread(@NotNull final Runnable r) { - return super.newThread(ThreadInitializationFactory.wrapRunnable(() -> { - configureRefreshThread(); + OperationInitializer captured = ExecutionContext.getContext().getInitializer(); + return super.newThread(threadInitializationFactory.createInitializer(() -> { + configureRefreshThread(captured); r.run(); })); } @@ -1112,8 +1118,9 @@ private UnitTestThreadFactory() { @Override public Thread newThread(@NotNull final Runnable r) { + OperationInitializer captured = ExecutionContext.getContext().getInitializer(); return super.newThread(() -> { - configureUnitTestRefreshThread(); + configureUnitTestRefreshThread(captured); r.run(); }); } @@ -1122,19 +1129,19 @@ public Thread newThread(@NotNull final Runnable r) { /** * Configure the primary UpdateGraph thread or one of the auxiliary notification processing threads. */ - private void configureRefreshThread() { + private void configureRefreshThread(OperationInitializer captured) { SystemicObjectTracker.markThreadSystemic(); MultiChunkPool.enableDedicatedPoolForThisThread(); isUpdateThread.set(true); - // Install this UpdateGraph via ExecutionContext for refresh threads + // Install this UpdateGraph via ExecutionContext for refresh threads, share the same operation initializer // noinspection resource - ExecutionContext.newBuilder().setUpdateGraph(this).build().open(); + ExecutionContext.newBuilder().setUpdateGraph(this).setOperationInitializer(captured).build().open(); } /** * Configure threads to be used for unit test processing. */ - private void configureUnitTestRefreshThread() { + private void configureUnitTestRefreshThread(OperationInitializer captured) { final Thread currentThread = Thread.currentThread(); final Thread.UncaughtExceptionHandler existing = currentThread.getUncaughtExceptionHandler(); currentThread.setUncaughtExceptionHandler((final Thread errorThread, final Throwable throwable) -> { @@ -1142,9 +1149,9 @@ private void configureUnitTestRefreshThread() { existing.uncaughtException(errorThread, throwable); }); isUpdateThread.set(true); - // Install this UpdateGraph via ExecutionContext for refresh threads + // Install this UpdateGraph and share operation initializer pool via ExecutionContext for refresh threads // noinspection resource - ExecutionContext.newBuilder().setUpdateGraph(this).build().open(); + ExecutionContext.newBuilder().setUpdateGraph(this).setOperationInitializer(captured).build().open(); } public static PeriodicUpdateGraph getInstance(final String name) { @@ -1160,6 +1167,7 @@ public static final class Builder { private String name; private int numUpdateThreads = -1; + private ThreadInitializationFactory threadInitializationFactory = runnable -> runnable; public Builder(String name) { this.name = name; @@ -1202,6 +1210,17 @@ public Builder numUpdateThreads(int numUpdateThreads) { return this; } + /** + * Sets a functional interface that adds custom initialization for threads started by this UpdateGraph. + * + * @param threadInitializationFactory the function to invoke on any runnables that will be used to start threads + * @return this builder + */ + public Builder threadInitializationFactory(ThreadInitializationFactory threadInitializationFactory) { + this.threadInitializationFactory = threadInitializationFactory; + return this; + } + /** * Constructs and returns a PeriodicUpdateGraph. It is an error to do so an instance already exists with the * name provided to this builder. @@ -1230,7 +1249,8 @@ private PeriodicUpdateGraph construct() { allowUnitTestMode, targetCycleDurationMillis, minimumCycleDurationToLogNanos, - numUpdateThreads); + numUpdateThreads, + threadInitializationFactory); } } } diff --git a/engine/table/src/main/java/io/deephaven/engine/util/AbstractScriptSession.java b/engine/table/src/main/java/io/deephaven/engine/util/AbstractScriptSession.java index 4466c6a7bb1..9bd1679b939 100644 --- a/engine/table/src/main/java/io/deephaven/engine/util/AbstractScriptSession.java +++ b/engine/table/src/main/java/io/deephaven/engine/util/AbstractScriptSession.java @@ -18,10 +18,12 @@ import io.deephaven.engine.context.QueryScope; import io.deephaven.engine.context.QueryScopeParam; import io.deephaven.engine.table.hierarchical.HierarchicalTable; +import io.deephaven.engine.table.impl.OperationInitializationThreadPool; import io.deephaven.engine.updategraph.UpdateGraph; import io.deephaven.plugin.type.ObjectType; import io.deephaven.plugin.type.ObjectTypeLookup; import io.deephaven.util.SafeCloseable; +import io.deephaven.util.thread.ThreadInitializationFactory; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -70,6 +72,7 @@ private static void createOrClearDirectory(final File directory) { protected AbstractScriptSession( UpdateGraph updateGraph, + final ThreadInitializationFactory threadInitializationFactory, ObjectTypeLookup objectTypeLookup, @Nullable Listener changeListener) { this.objectTypeLookup = objectTypeLookup; @@ -90,6 +93,7 @@ protected AbstractScriptSession( .setQueryScope(queryScope) .setQueryCompiler(compilerContext) .setUpdateGraph(updateGraph) + .setOperationInitializer(new OperationInitializationThreadPool(threadInitializationFactory)) .build(); } diff --git a/engine/table/src/main/java/io/deephaven/engine/util/GroovyDeephavenSession.java b/engine/table/src/main/java/io/deephaven/engine/util/GroovyDeephavenSession.java index 6f644571dfc..c071ce0a9c7 100644 --- a/engine/table/src/main/java/io/deephaven/engine/util/GroovyDeephavenSession.java +++ b/engine/table/src/main/java/io/deephaven/engine/util/GroovyDeephavenSession.java @@ -41,6 +41,7 @@ import io.deephaven.time.DateTimeUtils; import io.deephaven.util.QueryConstants; import io.deephaven.util.annotations.VisibleForTesting; +import io.deephaven.util.thread.ThreadInitializationFactory; import io.deephaven.util.type.ArrayTypeUtils; import io.deephaven.util.type.TypeUtils; import io.github.classgraph.ClassGraph; @@ -145,18 +146,20 @@ private String getNextScriptClassName() { public GroovyDeephavenSession( final UpdateGraph updateGraph, + final ThreadInitializationFactory threadInitializationFactory, final ObjectTypeLookup objectTypeLookup, final RunScripts runScripts) throws IOException { - this(updateGraph, objectTypeLookup, null, runScripts); + this(updateGraph, threadInitializationFactory, objectTypeLookup, null, runScripts); } public GroovyDeephavenSession( final UpdateGraph updateGraph, + final ThreadInitializationFactory threadInitializationFactory, ObjectTypeLookup objectTypeLookup, @Nullable final Listener changeListener, final RunScripts runScripts) throws IOException { - super(updateGraph, objectTypeLookup, changeListener); + super(updateGraph, threadInitializationFactory, objectTypeLookup, changeListener); addDefaultImports(consoleImports); if (INCLUDE_DEFAULT_IMPORTS_IN_LOADED_GROOVY) { diff --git a/engine/table/src/main/java/io/deephaven/engine/util/NoLanguageDeephavenSession.java b/engine/table/src/main/java/io/deephaven/engine/util/NoLanguageDeephavenSession.java index 1140aec2a1a..7c7a28c838e 100644 --- a/engine/table/src/main/java/io/deephaven/engine/util/NoLanguageDeephavenSession.java +++ b/engine/table/src/main/java/io/deephaven/engine/util/NoLanguageDeephavenSession.java @@ -5,6 +5,7 @@ import io.deephaven.engine.context.QueryScope; import io.deephaven.engine.updategraph.UpdateGraph; +import io.deephaven.util.thread.ThreadInitializationFactory; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -24,12 +25,14 @@ public class NoLanguageDeephavenSession extends AbstractScriptSession variables; - public NoLanguageDeephavenSession(final UpdateGraph updateGraph) { - this(updateGraph, SCRIPT_TYPE); + public NoLanguageDeephavenSession(final UpdateGraph updateGraph, + final ThreadInitializationFactory threadInitializationFactory) { + this(updateGraph, threadInitializationFactory, SCRIPT_TYPE); } - public NoLanguageDeephavenSession(final UpdateGraph updateGraph, final String scriptType) { - super(updateGraph, null, null); + public NoLanguageDeephavenSession(final UpdateGraph updateGraph, + final ThreadInitializationFactory threadInitializationFactory, final String scriptType) { + super(updateGraph, threadInitializationFactory, null, null); this.scriptType = scriptType; variables = new LinkedHashMap<>(); diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/FuzzerTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/FuzzerTest.java index 5f321b33854..b179795b59d 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/FuzzerTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/FuzzerTest.java @@ -20,9 +20,9 @@ import io.deephaven.time.DateTimeUtils; import io.deephaven.engine.util.TableTools; import io.deephaven.engine.util.GroovyDeephavenSession; -import io.deephaven.engine.util.GroovyDeephavenSession.RunScripts; import io.deephaven.test.types.SerialTest; import io.deephaven.util.SafeCloseable; +import io.deephaven.util.thread.ThreadInitializationFactory; import org.jetbrains.annotations.Nullable; import org.junit.Assume; import org.junit.Rule; @@ -75,7 +75,10 @@ private GroovyDeephavenSession getGroovySession() throws IOException { private GroovyDeephavenSession getGroovySession(@Nullable Clock clock) throws IOException { final GroovyDeephavenSession session = new GroovyDeephavenSession( - ExecutionContext.getContext().getUpdateGraph(), NoOp.INSTANCE, RunScripts.serviceLoader()); + ExecutionContext.getContext().getUpdateGraph(), + ThreadInitializationFactory.NO_OP, + NoOp.INSTANCE, + GroovyDeephavenSession.RunScripts.serviceLoader()); session.getExecutionContext().open(); return session; } diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/PartitionedTableTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/PartitionedTableTest.java index 22c45cca47f..61c2a97f102 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/PartitionedTableTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/PartitionedTableTest.java @@ -555,7 +555,6 @@ public void testCrossDependencies() { .captureQueryScopeVars("pauseHelper2") .captureQueryLibrary() .captureQueryCompiler() - .captureUpdateGraph() .build(); final PartitionedTable result2 = sourceTable2.update("SlowItDown=pauseHelper.pauseValue(k)").partitionBy("USym2").transform( @@ -646,7 +645,6 @@ public void testCrossDependencies2() { .captureQueryScopeVars("pauseHelper") .captureQueryLibrary() .captureQueryCompiler() - .captureUpdateGraph() .build(); final PartitionedTable result2 = sourceTable2.partitionBy("USym2").transform(executionContext, t -> t.withAttributes(Map.of(BaseTable.TEST_SOURCE_TABLE_ATTRIBUTE, "true")) @@ -935,7 +933,6 @@ protected Table e() { .newQueryScope() .captureQueryCompiler() .captureQueryLibrary() - .captureUpdateGraph() .build().open()) { ExecutionContext.getContext().getQueryScope().putParam("queryScopeVar", "queryScopeValue"); @@ -996,7 +993,6 @@ public void testTransformDependencyCorrectness() { final ExecutionContext executionContext = ExecutionContext.newBuilder() .emptyQueryScope() .newQueryLibrary() - .captureUpdateGraph() .captureQueryCompiler() .build(); final PartitionedTable transformed = partitioned.transform(executionContext, tableIn -> { diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableTest.java index 1a3fb62bf8b..58ef8ac5996 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableTest.java @@ -3006,7 +3006,13 @@ public void testMemoize() { } public void testMemoizeConcurrent() { - final ExecutorService dualPool = Executors.newFixedThreadPool(2); + final ExecutorService dualPool = Executors.newFixedThreadPool(2, new ThreadFactory() { + @Override + public Thread newThread(Runnable runnable) { + ExecutionContext captured = ExecutionContext.getContext(); + return new Thread(() -> captured.apply(runnable)); + } + }); final boolean old = QueryTable.setMemoizeResults(true); try { diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java index 5d6621d0eb0..4d511bb2a80 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/QueryTableWhereTest.java @@ -833,7 +833,7 @@ public void testInterFilterInterruption() { // we want to make sure we can push something through the thread pool and are not hogging it final CountDownLatch latch = new CountDownLatch(1); - OperationInitializationThreadPool.executorService().submit(latch::countDown); + ExecutionContext.getContext().getInitializer().submit(latch::countDown); waitForLatch(latch); assertEquals(0, fastCounter.invokes.get()); diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestConditionFilterGeneration.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestConditionFilterGeneration.java index 5c208d88db7..bc2f176fa87 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestConditionFilterGeneration.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestConditionFilterGeneration.java @@ -30,7 +30,6 @@ public void setUp() { .newQueryLibrary("DEFAULT") .captureQueryCompiler() .captureQueryScope() - .captureUpdateGraph() .build().open(); } diff --git a/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestFormulaColumnGeneration.java b/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestFormulaColumnGeneration.java index 1265f4ae1a1..bf3621f4f81 100644 --- a/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestFormulaColumnGeneration.java +++ b/engine/table/src/test/java/io/deephaven/engine/table/impl/select/TestFormulaColumnGeneration.java @@ -49,7 +49,6 @@ public void setUp() { .newQueryLibrary("DEFAULT") .captureQueryCompiler() .captureQueryScope() - .captureUpdateGraph() .build().open(); } diff --git a/engine/table/src/test/java/io/deephaven/engine/updategraph/impl/TestEventDrivenUpdateGraph.java b/engine/table/src/test/java/io/deephaven/engine/updategraph/impl/TestEventDrivenUpdateGraph.java index 7ad90534b13..ed398ca15c3 100644 --- a/engine/table/src/test/java/io/deephaven/engine/updategraph/impl/TestEventDrivenUpdateGraph.java +++ b/engine/table/src/test/java/io/deephaven/engine/updategraph/impl/TestEventDrivenUpdateGraph.java @@ -23,6 +23,7 @@ import java.nio.file.Path; import java.util.Collections; +import static io.deephaven.engine.context.TestExecutionContext.OPERATION_INITIALIZATION; import static io.deephaven.engine.util.TableTools.*; import static org.junit.Assert.assertEquals; @@ -105,8 +106,13 @@ private QueryCompiler compilerForUnitTests() { public void testSimpleAdd() { final EventDrivenUpdateGraph eventDrivenUpdateGraph = EventDrivenUpdateGraph.newBuilder("TestEDUG").build(); - final ExecutionContext context = ExecutionContext.newBuilder().setUpdateGraph(eventDrivenUpdateGraph) - .emptyQueryScope().newQueryLibrary().setQueryCompiler(compilerForUnitTests()).build(); + final ExecutionContext context = ExecutionContext.newBuilder() + .setUpdateGraph(eventDrivenUpdateGraph) + .emptyQueryScope() + .newQueryLibrary() + .setOperationInitializer(OPERATION_INITIALIZATION) + .setQueryCompiler(compilerForUnitTests()) + .build(); try (final SafeCloseable ignored = context.open()) { final SourceThatRefreshes sourceThatRefreshes = new SourceThatRefreshes(eventDrivenUpdateGraph); final Table updated = @@ -125,8 +131,13 @@ public void testSimpleAdd() { public void testSimpleModify() { final EventDrivenUpdateGraph eventDrivenUpdateGraph = new EventDrivenUpdateGraph.Builder("TestEDUG").build(); - final ExecutionContext context = ExecutionContext.newBuilder().setUpdateGraph(eventDrivenUpdateGraph) - .emptyQueryScope().newQueryLibrary().setQueryCompiler(compilerForUnitTests()).build(); + final ExecutionContext context = ExecutionContext.newBuilder() + .setUpdateGraph(eventDrivenUpdateGraph) + .emptyQueryScope() + .newQueryLibrary() + .setOperationInitializer(OPERATION_INITIALIZATION) + .setQueryCompiler(compilerForUnitTests()) + .build(); try (final SafeCloseable ignored = context.open()) { final SourceThatModifiesItself modifySource = new SourceThatModifiesItself(eventDrivenUpdateGraph); final Table updated = @@ -182,8 +193,13 @@ public void testUpdatePerformanceTracker() { defaultUpdateGraph.requestRefresh(); final Table inRange; - final ExecutionContext context = ExecutionContext.newBuilder().setUpdateGraph(defaultUpdateGraph) - .emptyQueryScope().newQueryLibrary().setQueryCompiler(compilerForUnitTests()).build(); + final ExecutionContext context = ExecutionContext.newBuilder() + .setUpdateGraph(defaultUpdateGraph) + .emptyQueryScope() + .newQueryLibrary() + .setQueryCompiler(compilerForUnitTests()) + .setOperationInitializer(OPERATION_INITIALIZATION) + .build(); try (final SafeCloseable ignored = context.open()) { final Table uptAgged = upt.where("!isNull(EntryId)").aggBy( Aggregation.AggSum("UsageNanos", "InvocationCount", "RowsModified"), @@ -223,8 +239,13 @@ static public T sleepValue(long duration, T retVal) { private Object doWork(final EventDrivenUpdateGraph eventDrivenUpdateGraph, final int durationMillis, final int steps) { - final ExecutionContext context = ExecutionContext.newBuilder().setUpdateGraph(eventDrivenUpdateGraph) - .emptyQueryScope().newQueryLibrary().setQueryCompiler(compilerForUnitTests()).build(); + final ExecutionContext context = ExecutionContext.newBuilder() + .setUpdateGraph(eventDrivenUpdateGraph) + .emptyQueryScope() + .newQueryLibrary() + .setQueryCompiler(compilerForUnitTests()) + .setOperationInitializer(OPERATION_INITIALIZATION) + .build(); try (final SafeCloseable ignored = context.open()) { final SourceThatModifiesItself modifySource = new SourceThatModifiesItself(eventDrivenUpdateGraph); final Table updated = diff --git a/engine/table/src/test/java/io/deephaven/engine/util/scripts/TestGroovyDeephavenSession.java b/engine/table/src/test/java/io/deephaven/engine/util/scripts/TestGroovyDeephavenSession.java index 572d71da8c9..e1074176a6e 100644 --- a/engine/table/src/test/java/io/deephaven/engine/util/scripts/TestGroovyDeephavenSession.java +++ b/engine/table/src/test/java/io/deephaven/engine/util/scripts/TestGroovyDeephavenSession.java @@ -16,6 +16,7 @@ import io.deephaven.function.Sort; import io.deephaven.plugin.type.ObjectTypeLookup.NoOp; import io.deephaven.util.SafeCloseable; +import io.deephaven.util.thread.ThreadInitializationFactory; import org.apache.commons.lang3.mutable.MutableInt; import org.junit.After; import org.junit.Assert; @@ -48,7 +49,7 @@ public void setup() throws IOException { livenessScope = new LivenessScope(); LivenessScopeStack.push(livenessScope); session = new GroovyDeephavenSession( - ExecutionContext.getContext().getUpdateGraph(), NoOp.INSTANCE, null, + ExecutionContext.getContext().getUpdateGraph(), ThreadInitializationFactory.NO_OP, NoOp.INSTANCE, null, GroovyDeephavenSession.RunScripts.none()); executionContext = session.getExecutionContext().open(); } diff --git a/engine/test-utils/src/main/java/io/deephaven/engine/context/TestExecutionContext.java b/engine/test-utils/src/main/java/io/deephaven/engine/context/TestExecutionContext.java index 2c64edea940..a0ee07d8112 100644 --- a/engine/test-utils/src/main/java/io/deephaven/engine/context/TestExecutionContext.java +++ b/engine/test-utils/src/main/java/io/deephaven/engine/context/TestExecutionContext.java @@ -1,9 +1,15 @@ package io.deephaven.engine.context; import io.deephaven.auth.AuthContext; +import io.deephaven.engine.table.impl.OperationInitializationThreadPool; import io.deephaven.engine.testutil.ControlledUpdateGraph; +import io.deephaven.util.thread.ThreadInitializationFactory; public class TestExecutionContext { + public static final ControlledUpdateGraph UPDATE_GRAPH = new ControlledUpdateGraph(); + + public static final OperationInitializationThreadPool OPERATION_INITIALIZATION = + new OperationInitializationThreadPool(ThreadInitializationFactory.NO_OP); public static ExecutionContext createForUnitTests() { return new ExecutionContext.Builder(new AuthContext.SuperUser()) @@ -11,7 +17,8 @@ public static ExecutionContext createForUnitTests() { .newQueryScope() .newQueryLibrary() .setQueryCompiler(QueryCompiler.createForUnitTests()) - .setUpdateGraph(ControlledUpdateGraph.INSTANCE) + .setUpdateGraph(UPDATE_GRAPH) + .setOperationInitializer(OPERATION_INITIALIZATION) .build(); } } diff --git a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/ControlledUpdateGraph.java b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/ControlledUpdateGraph.java index a24c1778486..0ca0d815015 100644 --- a/engine/test-utils/src/main/java/io/deephaven/engine/testutil/ControlledUpdateGraph.java +++ b/engine/test-utils/src/main/java/io/deephaven/engine/testutil/ControlledUpdateGraph.java @@ -1,13 +1,11 @@ package io.deephaven.engine.testutil; import io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph; +import io.deephaven.util.thread.ThreadInitializationFactory; // TODO (deephaven-core#3886): Extract test functionality from PeriodicUpdateGraph public class ControlledUpdateGraph extends PeriodicUpdateGraph { - - public static final ControlledUpdateGraph INSTANCE = new ControlledUpdateGraph(); - - private ControlledUpdateGraph() { - super("TEST", true, 1000, 25, -1); + public ControlledUpdateGraph() { + super("TEST", true, 1000, 25, -1, ThreadInitializationFactory.NO_OP); } } diff --git a/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/OperationInitializer.java b/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/OperationInitializer.java new file mode 100644 index 00000000000..d5d4337e292 --- /dev/null +++ b/engine/updategraph/src/main/java/io/deephaven/engine/updategraph/OperationInitializer.java @@ -0,0 +1,42 @@ +package io.deephaven.engine.updategraph; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; + +/** + * Provides guidance for initialization operations on how they can parallelize. + */ +public interface OperationInitializer { + OperationInitializer NON_PARALLELIZABLE = new OperationInitializer() { + @Override + public boolean canParallelize() { + return false; + } + + @Override + public Future submit(Runnable runnable) { + runnable.run(); + return CompletableFuture.completedFuture(null); + } + + @Override + public int parallelismFactor() { + return 1; + } + }; + + /** + * Whether the current thread can parallelize operations using this OperationInitialization. + */ + boolean canParallelize(); + + /** + * Submits a task to run in this thread pool. + */ + Future submit(Runnable runnable); + + /** + * Number of threads that are potentially available. + */ + int parallelismFactor(); +} diff --git a/props/configs/src/main/resources/dh-defaults.prop b/props/configs/src/main/resources/dh-defaults.prop index 99ff56f26cb..be2dcba5eb9 100644 --- a/props/configs/src/main/resources/dh-defaults.prop +++ b/props/configs/src/main/resources/dh-defaults.prop @@ -60,7 +60,3 @@ client.configuration.list=java.version,deephaven.version,barrage.version,http.se # jar, and a class that is found in that jar. Any such keys will be made available to the client.configuration.list # as .version. client.version.list=deephaven=io.deephaven.engine.table.Table,barrage=io.deephaven.barrage.flatbuf.BarrageMessageWrapper - - -# Specifies additional setup to run on threads that can perform table operations with user code. Comma-separated list, instances must be of type io.deephaven.util.thread.ThreadInitializationFactory -thread.initialization=io.deephaven.server.console.python.DebuggingInitializer diff --git a/props/test-configs/src/main/resources/dh-tests.prop b/props/test-configs/src/main/resources/dh-tests.prop index 64dfddae10a..f7d2503aa35 100644 --- a/props/test-configs/src/main/resources/dh-tests.prop +++ b/props/test-configs/src/main/resources/dh-tests.prop @@ -102,4 +102,3 @@ client.version.list= authentication.anonymous.warn=false deephaven.console.type=none -thread.initialization= diff --git a/py/server/test_helper/__init__.py b/py/server/test_helper/__init__.py index 4cc93ba1eaa..1dc3eaaca55 100644 --- a/py/server/test_helper/__init__.py +++ b/py/server/test_helper/__init__.py @@ -71,8 +71,9 @@ def start_jvm_for_tests(jvm_props: Dict[str, str] = None): global py_dh_session _JPeriodicUpdateGraph = jpy.get_type("io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph") _j_test_update_graph = _JPeriodicUpdateGraph.newBuilder(_JPeriodicUpdateGraph.DEFAULT_UPDATE_GRAPH_NAME).existingOrBuild() + no_op_operation_initializer = jpy.get_type("io.deephaven.util.thread.ThreadInitializationFactory").NO_OP _JPythonScriptSession = jpy.get_type("io.deephaven.integrations.python.PythonDeephavenSession") - py_dh_session = _JPythonScriptSession(_j_test_update_graph, py_scope_jpy) + py_dh_session = _JPythonScriptSession(_j_test_update_graph, no_op_operation_initializer, py_scope_jpy) def _expand_wildcards_in_list(elements): diff --git a/python-engine-test/src/test/java/io/deephaven/engine/table/impl/select/TestConditionFilter.java b/python-engine-test/src/test/java/io/deephaven/engine/table/impl/select/TestConditionFilter.java index a9b683a8630..9c3c1f38ab9 100644 --- a/python-engine-test/src/test/java/io/deephaven/engine/table/impl/select/TestConditionFilter.java +++ b/python-engine-test/src/test/java/io/deephaven/engine/table/impl/select/TestConditionFilter.java @@ -21,6 +21,7 @@ import io.deephaven.engine.util.PythonScopeJpyImpl; import io.deephaven.engine.table.ColumnSource; import io.deephaven.jpy.PythonTest; +import io.deephaven.util.thread.ThreadInitializationFactory; import org.apache.commons.lang3.exception.ExceptionUtils; import org.jpy.PyInputMode; import org.jpy.PyModule; @@ -376,6 +377,7 @@ private void check(String expression, Predicate> testPredica if (pythonScope == null) { final ExecutionContext context = new PythonDeephavenSession( ExecutionContext.getDefaultContext().getUpdateGraph(), + ThreadInitializationFactory.NO_OP, new PythonScopeJpyImpl(getMainGlobals().asDict())).getExecutionContext(); pythonScope = context.getQueryScope(); context.open(); diff --git a/server/src/main/java/io/deephaven/server/console/NoConsoleSessionModule.java b/server/src/main/java/io/deephaven/server/console/NoConsoleSessionModule.java index eef5255c7e4..a460bc6f1f7 100644 --- a/server/src/main/java/io/deephaven/server/console/NoConsoleSessionModule.java +++ b/server/src/main/java/io/deephaven/server/console/NoConsoleSessionModule.java @@ -12,6 +12,7 @@ import io.deephaven.engine.util.NoLanguageDeephavenSession; import io.deephaven.engine.util.ScriptSession; import io.deephaven.server.console.groovy.InitScriptsModule; +import io.deephaven.util.thread.ThreadInitializationFactory; import javax.inject.Named; @@ -26,7 +27,8 @@ ScriptSession bindScriptSession(NoLanguageDeephavenSession noLanguageSession) { @Provides NoLanguageDeephavenSession bindNoLanguageSession( - @Named(PeriodicUpdateGraph.DEFAULT_UPDATE_GRAPH_NAME) final UpdateGraph updateGraph) { - return new NoLanguageDeephavenSession(updateGraph); + @Named(PeriodicUpdateGraph.DEFAULT_UPDATE_GRAPH_NAME) final UpdateGraph updateGraph, + ThreadInitializationFactory threadInitializationFactory) { + return new NoLanguageDeephavenSession(updateGraph, threadInitializationFactory); } } diff --git a/server/src/main/java/io/deephaven/server/console/SessionToExecutionStateModule.java b/server/src/main/java/io/deephaven/server/console/SessionToExecutionStateModule.java deleted file mode 100644 index 633febd1a1e..00000000000 --- a/server/src/main/java/io/deephaven/server/console/SessionToExecutionStateModule.java +++ /dev/null @@ -1,19 +0,0 @@ -package io.deephaven.server.console; - -import dagger.Module; -import dagger.Provides; -import io.deephaven.engine.context.ExecutionContext; -import io.deephaven.engine.util.ScriptSession; -import io.deephaven.server.auth.AuthorizationProvider; - -/** - * Deprecated: use {@link ExecutionContextModule} instead. - */ -@Deprecated(since = "0.26.0", forRemoval = true) -@Module -public interface SessionToExecutionStateModule { - @Provides - static ExecutionContext bindExecutionContext(ScriptSession session, AuthorizationProvider authProvider) { - return ExecutionContextModule.bindExecutionContext(session, authProvider); - } -} diff --git a/server/src/main/java/io/deephaven/server/console/groovy/GroovyConsoleSessionModule.java b/server/src/main/java/io/deephaven/server/console/groovy/GroovyConsoleSessionModule.java index eeeead6567f..c5e8158568d 100644 --- a/server/src/main/java/io/deephaven/server/console/groovy/GroovyConsoleSessionModule.java +++ b/server/src/main/java/io/deephaven/server/console/groovy/GroovyConsoleSessionModule.java @@ -13,6 +13,7 @@ import io.deephaven.engine.util.GroovyDeephavenSession.RunScripts; import io.deephaven.engine.util.ScriptSession; import io.deephaven.plugin.type.ObjectTypeLookup; +import io.deephaven.util.thread.ThreadInitializationFactory; import javax.inject.Named; import java.io.IOException; @@ -30,11 +31,12 @@ ScriptSession bindScriptSession(final GroovyDeephavenSession groovySession) { @Provides GroovyDeephavenSession bindGroovySession( @Named(PeriodicUpdateGraph.DEFAULT_UPDATE_GRAPH_NAME) final UpdateGraph updateGraph, + ThreadInitializationFactory threadInitializationFactory, final ObjectTypeLookup lookup, final ScriptSession.Listener listener, final RunScripts runScripts) { try { - return new GroovyDeephavenSession(updateGraph, lookup, listener, runScripts); + return new GroovyDeephavenSession(updateGraph, threadInitializationFactory, lookup, listener, runScripts); } catch (final IOException e) { throw new UncheckedIOException(e); } diff --git a/server/src/main/java/io/deephaven/server/console/python/PythonConsoleSessionModule.java b/server/src/main/java/io/deephaven/server/console/python/PythonConsoleSessionModule.java index 78999fb4c0c..d5f54618462 100644 --- a/server/src/main/java/io/deephaven/server/console/python/PythonConsoleSessionModule.java +++ b/server/src/main/java/io/deephaven/server/console/python/PythonConsoleSessionModule.java @@ -13,6 +13,7 @@ import io.deephaven.engine.util.ScriptSession; import io.deephaven.integrations.python.PythonDeephavenSession; import io.deephaven.plugin.type.ObjectTypeLookup; +import io.deephaven.util.thread.ThreadInitializationFactory; import javax.inject.Named; import java.io.IOException; @@ -30,11 +31,13 @@ ScriptSession bindScriptSession(PythonDeephavenSession pythonSession) { @Provides PythonDeephavenSession bindPythonSession( @Named(PeriodicUpdateGraph.DEFAULT_UPDATE_GRAPH_NAME) final UpdateGraph updateGraph, + final ThreadInitializationFactory threadInitializationFactory, final ObjectTypeLookup lookup, final ScriptSession.Listener listener, final PythonEvaluatorJpy pythonEvaluator) { try { - return new PythonDeephavenSession(updateGraph, lookup, listener, true, pythonEvaluator); + return new PythonDeephavenSession(updateGraph, threadInitializationFactory, lookup, listener, true, + pythonEvaluator); } catch (IOException e) { throw new UncheckedIOException("Unable to run python startup scripts", e); } diff --git a/server/src/main/java/io/deephaven/server/console/python/PythonDebuggingModule.java b/server/src/main/java/io/deephaven/server/console/python/PythonDebuggingModule.java new file mode 100644 index 00000000000..7f4d70ab530 --- /dev/null +++ b/server/src/main/java/io/deephaven/server/console/python/PythonDebuggingModule.java @@ -0,0 +1,16 @@ +// +// Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending +// +package io.deephaven.server.console.python; + +import dagger.Binds; +import dagger.Module; +import dagger.multibindings.IntoSet; +import io.deephaven.util.thread.ThreadInitializationFactory; + +@Module +public interface PythonDebuggingModule { + @Binds + @IntoSet + ThreadInitializationFactory bindDebuggingInitializer(DebuggingInitializer debuggingInitializer); +} diff --git a/server/src/main/java/io/deephaven/server/runner/DeephavenApiServer.java b/server/src/main/java/io/deephaven/server/runner/DeephavenApiServer.java index a13a29da41e..8ccc367a6b7 100644 --- a/server/src/main/java/io/deephaven/server/runner/DeephavenApiServer.java +++ b/server/src/main/java/io/deephaven/server/runner/DeephavenApiServer.java @@ -144,9 +144,6 @@ public DeephavenApiServer run() throws IOException, ClassNotFoundException, Time // noinspection resource executionContextProvider.get().open(); - log.info().append("Starting Operation Initialization Thread Pool...").endl(); - OperationInitializationThreadPool.start(); - log.info().append("Starting Update Graph...").endl(); getUpdateGraph().cast().start(); diff --git a/server/src/main/java/io/deephaven/server/runner/scheduler/SchedulerModule.java b/server/src/main/java/io/deephaven/server/runner/scheduler/SchedulerModule.java index c6710570b38..26f4b0b32c5 100644 --- a/server/src/main/java/io/deephaven/server/runner/scheduler/SchedulerModule.java +++ b/server/src/main/java/io/deephaven/server/runner/scheduler/SchedulerModule.java @@ -2,6 +2,7 @@ import dagger.Module; import dagger.Provides; +import dagger.multibindings.ElementsIntoSet; import io.deephaven.base.clock.Clock; import io.deephaven.chunk.util.pools.MultiChunkPool; import io.deephaven.engine.context.ExecutionContext; @@ -16,6 +17,8 @@ import javax.inject.Named; import javax.inject.Singleton; +import java.util.Collections; +import java.util.Set; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -31,13 +34,25 @@ */ @Module public class SchedulerModule { + @Provides + @ElementsIntoSet + static Set primeThreadInitializers() { + return Collections.emptySet(); + } + + @Provides + static ThreadInitializationFactory provideThreadInitializationFactory(Set factories) { + return ThreadInitializationFactory.of(factories); + } @Provides @Singleton public static Scheduler provideScheduler( final @Named(PeriodicUpdateGraph.DEFAULT_UPDATE_GRAPH_NAME) UpdateGraph updateGraph, - final @Named("scheduler.poolSize") int poolSize) { - final ThreadFactory concurrentThreadFactory = new ThreadFactory("Scheduler-Concurrent", updateGraph); + final @Named("scheduler.poolSize") int poolSize, + final ThreadInitializationFactory initializationFactory) { + final ThreadFactory concurrentThreadFactory = + new ThreadFactory("Scheduler-Concurrent", updateGraph, initializationFactory); final ScheduledExecutorService concurrentExecutor = new ScheduledThreadPoolExecutor(poolSize, concurrentThreadFactory) { @Override @@ -47,7 +62,8 @@ protected void afterExecute(final Runnable task, final Throwable error) { } }; - final ThreadFactory serialThreadFactory = new ThreadFactory("Scheduler-Serial", updateGraph); + final ThreadFactory serialThreadFactory = + new ThreadFactory("Scheduler-Serial", updateGraph, initializationFactory); final ExecutorService serialExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), serialThreadFactory) { @@ -63,15 +79,18 @@ protected void afterExecute(final Runnable task, final Throwable error) { private static class ThreadFactory extends NamingThreadFactory { private final UpdateGraph updateGraph; + private final ThreadInitializationFactory initializationFactory; - public ThreadFactory(final String name, final UpdateGraph updateGraph) { + public ThreadFactory(final String name, final UpdateGraph updateGraph, + ThreadInitializationFactory initializationFactory) { super(DeephavenApiServer.class, name); this.updateGraph = updateGraph; + this.initializationFactory = initializationFactory; } @Override public Thread newThread(@NotNull final Runnable r) { - return super.newThread(ThreadInitializationFactory.wrapRunnable(() -> { + return super.newThread(initializationFactory.createInitializer(() -> { MultiChunkPool.enableDedicatedPoolForThisThread(); // noinspection resource ExecutionContext.getContext().withUpdateGraph(updateGraph).open(); diff --git a/server/src/test/java/io/deephaven/server/appmode/ApplicationServiceGrpcImplTest.java b/server/src/test/java/io/deephaven/server/appmode/ApplicationServiceGrpcImplTest.java index 82a965462f0..b11c16fece8 100644 --- a/server/src/test/java/io/deephaven/server/appmode/ApplicationServiceGrpcImplTest.java +++ b/server/src/test/java/io/deephaven/server/appmode/ApplicationServiceGrpcImplTest.java @@ -17,6 +17,7 @@ import io.deephaven.server.session.SessionState; import io.deephaven.server.util.TestControlledScheduler; import io.deephaven.auth.AuthContext; +import io.deephaven.util.thread.ThreadInitializationFactory; import io.grpc.Context; import io.grpc.stub.StreamObserver; import org.junit.After; @@ -87,7 +88,7 @@ public void onListFieldsSubscribeFailedObserver() { // trigger a change ScriptSession scriptSession = new NoLanguageDeephavenSession( - ExecutionContext.getDefaultContext().getUpdateGraph()); + ExecutionContext.getDefaultContext().getUpdateGraph(), ThreadInitializationFactory.NO_OP); scriptSession.setVariable("key", "hello world"); ScriptSession.Changes changes = new ScriptSession.Changes(); changes.created.put("key", "Object"); diff --git a/server/src/test/java/io/deephaven/server/appmode/ApplicationTest.java b/server/src/test/java/io/deephaven/server/appmode/ApplicationTest.java index ac17a47a807..8214cd3e313 100644 --- a/server/src/test/java/io/deephaven/server/appmode/ApplicationTest.java +++ b/server/src/test/java/io/deephaven/server/appmode/ApplicationTest.java @@ -13,6 +13,7 @@ import io.deephaven.integrations.python.PythonDeephavenSession; import io.deephaven.engine.util.PythonEvaluatorJpy; import io.deephaven.plugin.type.ObjectTypeLookup.NoOp; +import io.deephaven.util.thread.ThreadInitializationFactory; import org.junit.After; import org.junit.Ignore; import org.junit.Rule; @@ -51,7 +52,9 @@ public void app00() { @Test public void app01() throws IOException { session = new GroovyDeephavenSession( - ExecutionContext.getContext().getUpdateGraph(), NoOp.INSTANCE, null, + ExecutionContext.getContext().getUpdateGraph(), + ThreadInitializationFactory.NO_OP, + NoOp.INSTANCE, null, GroovyDeephavenSession.RunScripts.none()); ApplicationState app = ApplicationFactory.create(ApplicationConfigs.testAppDir(), ApplicationConfigs.app01(), session, new NoopStateListener()); @@ -65,7 +68,9 @@ public void app01() throws IOException { @Ignore("TODO: deephaven-core#1741 python test needs to run in a container") public void app02() throws IOException, InterruptedException, TimeoutException { session = new PythonDeephavenSession( - ExecutionContext.getDefaultContext().getUpdateGraph(), NoOp.INSTANCE, null, false, + ExecutionContext.getDefaultContext().getUpdateGraph(), + ThreadInitializationFactory.NO_OP, + NoOp.INSTANCE, null, false, PythonEvaluatorJpy.withGlobalCopy()); ApplicationState app = ApplicationFactory.create(ApplicationConfigs.testAppDir(), ApplicationConfigs.app02(), session, new NoopStateListener()); diff --git a/server/test/src/main/java/io/deephaven/server/test/FlightMessageRoundTripTest.java b/server/test/src/main/java/io/deephaven/server/test/FlightMessageRoundTripTest.java index 2edbd62bb80..67845489b7d 100644 --- a/server/test/src/main/java/io/deephaven/server/test/FlightMessageRoundTripTest.java +++ b/server/test/src/main/java/io/deephaven/server/test/FlightMessageRoundTripTest.java @@ -55,6 +55,7 @@ import io.deephaven.server.util.Scheduler; import io.deephaven.util.SafeCloseable; import io.deephaven.auth.AuthContext; +import io.deephaven.util.thread.ThreadInitializationFactory; import io.grpc.*; import io.grpc.CallOptions; import io.grpc.stub.ClientCalls; @@ -110,7 +111,7 @@ TicketResolver ticketResolver(ScopeTicketResolver resolver) { @Singleton @Provides AbstractScriptSession provideAbstractScriptSession(final UpdateGraph updateGraph) { - return new NoLanguageDeephavenSession(updateGraph, "non-script-session"); + return new NoLanguageDeephavenSession(updateGraph, ThreadInitializationFactory.NO_OP, "non-script-session"); } @Provides diff --git a/sphinx/source/conf.py b/sphinx/source/conf.py index 61ae8ca5b95..27accbdb119 100644 --- a/sphinx/source/conf.py +++ b/sphinx/source/conf.py @@ -108,7 +108,8 @@ _JUpdateGraph = jpy.get_type("io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph") docs_update_graph = _JUpdateGraph.newBuilder("PYTHON_DOCS").build() _JPythonScriptSession = jpy.get_type("io.deephaven.integrations.python.PythonDeephavenSession") -py_dh_session = _JPythonScriptSession(docs_update_graph, py_scope_jpy) +no_op_operation_initializer = jpy.get_type("io.deephaven.util.thread.ThreadInitializationFactory").NO_OP +py_dh_session = _JPythonScriptSession(docs_update_graph, no_op_operation_initializer, py_scope_jpy) py_dh_session.getExecutionContext().open() pygments_style = 'sphinx'