Skip to content

Commit

Permalink
Make OperationInitializer non-static (#4919)
Browse files Browse the repository at this point in the history
This patch makes the OperationInitializer part of the
ExecutionContext, allowing consumers to specify a preferred instance.
ThreadInitializationFactory is also changed to be built in Dagger
instead of only by configuration. Combined, these changes make it
easier to reason about startup, requiring that dependencies are
declared explicitly rather than implicitly referenced and used.

It is likely that some of the work done here should be refactored
further into some JobScheduler factory or assisted injection, but
OperationInitializer is still referenced directly by
InitialFilterExecution, so this would require more refactoring.

Ideally, OperationInitializer.NON_PARALLELIZABLE would be used instead
of the OperationInitializerThreadPool's threadlocal, but some services
restore the user's own exec context - this is a shortcoming of the
patch, and possibly should be resolved before merging.

Downstream users will need to consider if they want to capture the
OperationInitializer when they create new exec contexts - generally
this will be desired.

Partial #4040
  • Loading branch information
niloc132 committed Dec 14, 2023
1 parent d1fef9c commit 2f5a054
Show file tree
Hide file tree
Showing 46 changed files with 390 additions and 197 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.annotations.ScriptApi;
import io.deephaven.util.annotations.VisibleForTesting;
import io.deephaven.util.thread.ThreadInitializationFactory;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jpy.KeyError;
Expand Down Expand Up @@ -76,11 +77,12 @@ public class PythonDeephavenSession extends AbstractScriptSession<PythonSnapshot
*/
public PythonDeephavenSession(
final UpdateGraph updateGraph,
final ThreadInitializationFactory threadInitializationFactory,
final ObjectTypeLookup objectTypeLookup,
@Nullable final Listener listener,
final boolean runInitScripts,
final PythonEvaluatorJpy pythonEvaluator) throws IOException {
super(updateGraph, objectTypeLookup, listener);
super(updateGraph, threadInitializationFactory, objectTypeLookup, listener);

evaluator = pythonEvaluator;
scope = pythonEvaluator.getScope();
Expand Down Expand Up @@ -108,9 +110,9 @@ public PythonDeephavenSession(
* Creates a Python "{@link ScriptSession}", for use where we should only be reading from the scope, such as an
* IPython kernel session.
*/
public PythonDeephavenSession(
final UpdateGraph updateGraph, final PythonScope<?> scope) {
super(updateGraph, NoOp.INSTANCE, null);
public PythonDeephavenSession(final UpdateGraph updateGraph,
final ThreadInitializationFactory threadInitializationFactory, final PythonScope<?> scope) {
super(updateGraph, threadInitializationFactory, NoOp.INSTANCE, null);

evaluator = null;
this.scope = (PythonScope<PyObject>) scope;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,55 +1,21 @@
package io.deephaven.util.thread;

import io.deephaven.configuration.Configuration;

import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.Collection;

/**
* Extension point to allow threads that will run user code from within the platform to be controlled by configuration.
*/
public interface ThreadInitializationFactory {
/* private */ String[] CONFIGURED_INITIALIZATION_TYPES =
Configuration.getInstance().getStringArrayFromProperty("thread.initialization");
/* private */ List<ThreadInitializationFactory> INITIALIZERS = Arrays.stream(CONFIGURED_INITIALIZATION_TYPES)
.filter(str -> !str.isBlank())
.map(type -> {
try {
// noinspection unchecked
Class<? extends ThreadInitializationFactory> clazz =
(Class<? extends ThreadInitializationFactory>) Class.forName(type);
return clazz.getDeclaredConstructor().newInstance();
} catch (ClassNotFoundException | NoSuchMethodException | InvocationTargetException
| InstantiationException | IllegalAccessException e) {

// TODO (https://github.com/deephaven/deephaven-core/issues/4040):
// Currently the default property file is shared between both the java client and the server. This
// means that client-side usage will attempt to load the thread.initialization property intended for
// the server which is not available on the class path.
if (e instanceof ClassNotFoundException && type.startsWith("io.deephaven.server.")) {
return null;
}

throw new IllegalArgumentException(
"Error instantiating initializer " + type + ", please check configuration", e);
}
})
.filter(Objects::nonNull)
.collect(Collectors.toUnmodifiableList());
ThreadInitializationFactory NO_OP = r -> r;

/**
* Chains configured initializers to run before/around any given runnable, returning a runnable intended to be run
* by a new thread.
*/
static Runnable wrapRunnable(Runnable runnable) {
Runnable acc = runnable;
for (ThreadInitializationFactory INITIALIZER : INITIALIZERS) {
acc = INITIALIZER.createInitializer(acc);
}
return acc;
static ThreadInitializationFactory of(Collection<ThreadInitializationFactory> factories) {
return runnable -> {
Runnable acc = runnable;
for (ThreadInitializationFactory factory : factories) {
acc = factory.createInitializer(acc);
}
return acc;
};
}

Runnable createInitializer(Runnable runnable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package io.deephaven.engine.context;

import io.deephaven.auth.AuthContext;
import io.deephaven.engine.updategraph.OperationInitializer;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.annotations.ScriptApi;
Expand All @@ -13,10 +14,26 @@
import java.util.Objects;
import java.util.function.Supplier;

/**
* Container for context-specific objects, that can be activated on a thread or passed to certain operations.
* ExecutionContexts are immutable, and support a builder pattern to create new instances and "with" methods to
* customize existing ones. Any thread that interacts with the Deephaven engine will need to have an active
* ExecutionContext.
*/
public class ExecutionContext {

/**
* Creates a new builder for an ExecutionContext, capturing the current thread's auth context, update graph, and
* operation initializer. Typically, this method should be called on a thread that already has an active
* ExecutionContext, to more easily reuse those.
*
* @return a new builder to create an ExecutionContext
*/
public static Builder newBuilder() {
return new Builder();
ExecutionContext existing = getContext();
return new Builder()
.setUpdateGraph(existing.getUpdateGraph())
.setOperationInitializer(existing.getInitializer());
}

public static ExecutionContext makeExecutionContext(boolean isSystemic) {
Expand Down Expand Up @@ -83,20 +100,23 @@ private static void setContext(final ExecutionContext context) {
private final QueryScope queryScope;
private final QueryCompiler queryCompiler;
private final UpdateGraph updateGraph;
private final OperationInitializer operationInitializer;

private ExecutionContext(
final boolean isSystemic,
final AuthContext authContext,
final QueryLibrary queryLibrary,
final QueryScope queryScope,
final QueryCompiler queryCompiler,
final UpdateGraph updateGraph) {
final UpdateGraph updateGraph,
OperationInitializer operationInitializer) {
this.isSystemic = isSystemic;
this.authContext = authContext;
this.queryLibrary = Objects.requireNonNull(queryLibrary);
this.queryScope = Objects.requireNonNull(queryScope);
this.queryCompiler = Objects.requireNonNull(queryCompiler);
this.updateGraph = updateGraph;
this.updateGraph = Objects.requireNonNull(updateGraph);
this.operationInitializer = Objects.requireNonNull(operationInitializer);
}

/**
Expand All @@ -110,7 +130,8 @@ public ExecutionContext withSystemic(boolean isSystemic) {
if (isSystemic == this.isSystemic) {
return this;
}
return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph);
return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph,
operationInitializer);
}

/**
Expand All @@ -124,7 +145,8 @@ public ExecutionContext withAuthContext(final AuthContext authContext) {
if (authContext == this.authContext) {
return this;
}
return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph);
return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph,
operationInitializer);
}

/**
Expand All @@ -138,7 +160,16 @@ public ExecutionContext withUpdateGraph(final UpdateGraph updateGraph) {
if (updateGraph == this.updateGraph) {
return this;
}
return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph);
return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph,
operationInitializer);
}

public ExecutionContext withOperationInitializer(final OperationInitializer operationInitializer) {
if (operationInitializer == this.operationInitializer) {
return this;
}
return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph,
operationInitializer);
}

/**
Expand Down Expand Up @@ -198,6 +229,10 @@ public UpdateGraph getUpdateGraph() {
return updateGraph;
}

public OperationInitializer getInitializer() {
return operationInitializer;
}

@SuppressWarnings("unused")
public static class Builder {
private boolean isSystemic = false;
Expand All @@ -208,6 +243,7 @@ public static class Builder {
private QueryScope queryScope = PoisonedQueryScope.INSTANCE;
private QueryCompiler queryCompiler = PoisonedQueryCompiler.INSTANCE;
private UpdateGraph updateGraph = PoisonedUpdateGraph.INSTANCE;
private OperationInitializer operationInitializer = PoisonedOperationInitializer.INSTANCE;

private Builder() {
// propagate the auth context from the current context
Expand Down Expand Up @@ -356,19 +392,32 @@ public Builder setUpdateGraph(UpdateGraph updateGraph) {

/**
* Use the current ExecutionContext's UpdateGraph instance.
*
* @deprecated The update graph is automatically captured, this method should no longer be needed.
*/
@ScriptApi
@Deprecated(forRemoval = true, since = "0.31")
public Builder captureUpdateGraph() {
this.updateGraph = getContext().getUpdateGraph();
return this;
}

/**
* Use the specified operation initializer instead of the captured instance.
*/
@ScriptApi
public Builder setOperationInitializer(OperationInitializer operationInitializer) {
this.operationInitializer = operationInitializer;
return this;
}

/**
* @return the newly instantiated ExecutionContext
*/
@ScriptApi
public ExecutionContext build() {
return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph);
return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph,
operationInitializer);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.deephaven.engine.context;

import io.deephaven.engine.updategraph.OperationInitializer;
import io.deephaven.util.ExecutionContextRegistrationException;

import java.util.concurrent.Future;

public class PoisonedOperationInitializer implements OperationInitializer {

public static final PoisonedOperationInitializer INSTANCE = new PoisonedOperationInitializer();

private <T> T fail() {
throw ExecutionContextRegistrationException.onFailedComponentAccess("OperationInitializer");
}

@Override
public boolean canParallelize() {
return fail();
}

@Override
public Future<?> submit(Runnable runnable) {
return fail();
}

@Override
public int parallelismFactor() {
return fail();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import io.deephaven.base.log.LogOutput;
import io.deephaven.engine.updategraph.LogicalClock;
import io.deephaven.engine.updategraph.LogicalClockImpl;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.io.log.LogEntry;
import io.deephaven.util.ExecutionContextRegistrationException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ public class TestQueryCompiler {
@Before
public void setUp() throws IOException {
executionContextClosable = ExecutionContext.newBuilder()
.captureUpdateGraph()
.captureQueryLibrary()
.captureQueryScope()
.setQueryCompiler(QueryCompiler.create(folder.newFolder(), TestQueryCompiler.class.getClassLoader()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.deephaven.base.log.LogOutput;
import io.deephaven.base.verify.Assert;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.exceptions.CancellationException;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetFactory;
Expand Down Expand Up @@ -325,10 +326,10 @@ abstract void enqueueSubFilters(
*/
abstract boolean doParallelization(long numberOfRows);

static boolean doParallelizationBase(long numberOfRows) {
boolean doParallelizationBase(long numberOfRows) {
return !QueryTable.DISABLE_PARALLEL_WHERE && numberOfRows != 0
&& (QueryTable.FORCE_PARALLEL_WHERE || numberOfRows / 2 > QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT)
&& OperationInitializationThreadPool.canParallelize();
&& ExecutionContext.getContext().getInitializer().canParallelize();
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.deephaven.engine.table.impl;

import io.deephaven.base.verify.Assert;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.exceptions.CancellationException;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.table.ModifiedColumnSet;
Expand Down Expand Up @@ -85,7 +86,7 @@ void enqueueSubFilters(

private void enqueueJobs(Iterable<? extends NotificationQueue.Notification> subFilters) {
for (NotificationQueue.Notification notification : subFilters) {
OperationInitializationThreadPool.executorService().submit(() -> {
ExecutionContext.getContext().getInitializer().submit(() -> {
root.runningChildren.put(Thread.currentThread(), Thread.currentThread());
try {
if (!root.cancelled.get()) {
Expand Down
Loading

0 comments on commit 2f5a054

Please sign in to comment.