Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make OperationInitializer non-static #4919

Merged
merged 23 commits into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.annotations.ScriptApi;
import io.deephaven.util.annotations.VisibleForTesting;
import io.deephaven.util.thread.ThreadInitializationFactory;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jpy.KeyError;
Expand Down Expand Up @@ -76,11 +77,12 @@ public class PythonDeephavenSession extends AbstractScriptSession<PythonSnapshot
*/
public PythonDeephavenSession(
final UpdateGraph updateGraph,
final ThreadInitializationFactory threadInitializationFactory,
final ObjectTypeLookup objectTypeLookup,
@Nullable final Listener listener,
final boolean runInitScripts,
final PythonEvaluatorJpy pythonEvaluator) throws IOException {
super(updateGraph, objectTypeLookup, listener);
super(updateGraph, threadInitializationFactory, objectTypeLookup, listener);

evaluator = pythonEvaluator;
scope = pythonEvaluator.getScope();
Expand Down Expand Up @@ -108,9 +110,9 @@ public PythonDeephavenSession(
* Creates a Python "{@link ScriptSession}", for use where we should only be reading from the scope, such as an
* IPython kernel session.
*/
public PythonDeephavenSession(
final UpdateGraph updateGraph, final PythonScope<?> scope) {
super(updateGraph, NoOp.INSTANCE, null);
public PythonDeephavenSession(final UpdateGraph updateGraph,
final ThreadInitializationFactory threadInitializationFactory, final PythonScope<?> scope) {
super(updateGraph, threadInitializationFactory, NoOp.INSTANCE, null);

evaluator = null;
this.scope = (PythonScope<PyObject>) scope;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,55 +1,21 @@
package io.deephaven.util.thread;

import io.deephaven.configuration.Configuration;

import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.Collection;

/**
* Extension point to allow threads that will run user code from within the platform to be controlled by configuration.
*/
public interface ThreadInitializationFactory {
/* private */ String[] CONFIGURED_INITIALIZATION_TYPES =
Configuration.getInstance().getStringArrayFromProperty("thread.initialization");
/* private */ List<ThreadInitializationFactory> INITIALIZERS = Arrays.stream(CONFIGURED_INITIALIZATION_TYPES)
.filter(str -> !str.isBlank())
.map(type -> {
try {
// noinspection unchecked
Class<? extends ThreadInitializationFactory> clazz =
(Class<? extends ThreadInitializationFactory>) Class.forName(type);
return clazz.getDeclaredConstructor().newInstance();
} catch (ClassNotFoundException | NoSuchMethodException | InvocationTargetException
| InstantiationException | IllegalAccessException e) {

// TODO (https://github.com/deephaven/deephaven-core/issues/4040):
// Currently the default property file is shared between both the java client and the server. This
// means that client-side usage will attempt to load the thread.initialization property intended for
// the server which is not available on the class path.
if (e instanceof ClassNotFoundException && type.startsWith("io.deephaven.server.")) {
return null;
}

throw new IllegalArgumentException(
"Error instantiating initializer " + type + ", please check configuration", e);
}
})
.filter(Objects::nonNull)
.collect(Collectors.toUnmodifiableList());
ThreadInitializationFactory NO_OP = r -> r;

/**
* Chains configured initializers to run before/around any given runnable, returning a runnable intended to be run
* by a new thread.
*/
static Runnable wrapRunnable(Runnable runnable) {
Runnable acc = runnable;
for (ThreadInitializationFactory INITIALIZER : INITIALIZERS) {
acc = INITIALIZER.createInitializer(acc);
}
return acc;
static ThreadInitializationFactory of(Collection<ThreadInitializationFactory> factories) {
return runnable -> {
Runnable acc = runnable;
for (ThreadInitializationFactory factory : factories) {
acc = factory.createInitializer(acc);
}
return acc;
};
}

Runnable createInitializer(Runnable runnable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package io.deephaven.engine.context;

import io.deephaven.auth.AuthContext;
import io.deephaven.engine.updategraph.OperationInitializer;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.annotations.ScriptApi;
Expand All @@ -13,10 +14,26 @@
import java.util.Objects;
import java.util.function.Supplier;

/**
* Container for context-specific objects, that can be activated on a thread or passed to certain operations.
* ExecutionContexts are immutable, and support a builder pattern to create new instances and "with" methods to
* customize existing ones. Any thread that interacts with the Deephaven engine will need to have an active
* ExecutionContext.
*/
public class ExecutionContext {

/**
* Creates a new builder for an ExecutionContext, capturing the current thread's auth context, update graph, and
* operation initializer. Typically, this method should be called on a thread that already has an active
* ExecutionContext, to more easily reuse those.
*
* @return a new builder to create an ExecutionContext
*/
public static Builder newBuilder() {
return new Builder();
ExecutionContext existing = getContext();
return new Builder()
.setUpdateGraph(existing.getUpdateGraph())
.setOperationInitializer(existing.getInitializer());
}

public static ExecutionContext makeExecutionContext(boolean isSystemic) {
Expand Down Expand Up @@ -83,20 +100,23 @@ private static void setContext(final ExecutionContext context) {
private final QueryScope queryScope;
private final QueryCompiler queryCompiler;
private final UpdateGraph updateGraph;
private final OperationInitializer operationInitializer;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to other reviewers:
I think this is a reasonable thing to add to the execution context, and that it's the right interface to add. I'd welcome consensus-building here.


private ExecutionContext(
final boolean isSystemic,
final AuthContext authContext,
final QueryLibrary queryLibrary,
final QueryScope queryScope,
final QueryCompiler queryCompiler,
final UpdateGraph updateGraph) {
final UpdateGraph updateGraph,
OperationInitializer operationInitializer) {
this.isSystemic = isSystemic;
this.authContext = authContext;
this.queryLibrary = Objects.requireNonNull(queryLibrary);
this.queryScope = Objects.requireNonNull(queryScope);
this.queryCompiler = Objects.requireNonNull(queryCompiler);
this.updateGraph = updateGraph;
this.updateGraph = Objects.requireNonNull(updateGraph);
this.operationInitializer = Objects.requireNonNull(operationInitializer);
}

/**
Expand All @@ -110,7 +130,8 @@ public ExecutionContext withSystemic(boolean isSystemic) {
if (isSystemic == this.isSystemic) {
return this;
}
return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph);
return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph,
operationInitializer);
}

/**
Expand All @@ -124,7 +145,8 @@ public ExecutionContext withAuthContext(final AuthContext authContext) {
if (authContext == this.authContext) {
return this;
}
return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph);
return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph,
operationInitializer);
}

/**
Expand All @@ -138,7 +160,16 @@ public ExecutionContext withUpdateGraph(final UpdateGraph updateGraph) {
if (updateGraph == this.updateGraph) {
return this;
}
return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph);
return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph,
operationInitializer);
}

public ExecutionContext withOperationInitializer(final OperationInitializer operationInitializer) {
if (operationInitializer == this.operationInitializer) {
return this;
}
return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph,
operationInitializer);
}

/**
Expand Down Expand Up @@ -198,6 +229,10 @@ public UpdateGraph getUpdateGraph() {
return updateGraph;
}

public OperationInitializer getInitializer() {
return operationInitializer;
}

@SuppressWarnings("unused")
public static class Builder {
private boolean isSystemic = false;
Expand All @@ -208,6 +243,7 @@ public static class Builder {
private QueryScope queryScope = PoisonedQueryScope.INSTANCE;
private QueryCompiler queryCompiler = PoisonedQueryCompiler.INSTANCE;
private UpdateGraph updateGraph = PoisonedUpdateGraph.INSTANCE;
private OperationInitializer operationInitializer = PoisonedOperationInitializer.INSTANCE;

private Builder() {
// propagate the auth context from the current context
Expand Down Expand Up @@ -356,19 +392,32 @@ public Builder setUpdateGraph(UpdateGraph updateGraph) {

/**
* Use the current ExecutionContext's UpdateGraph instance.
*
* @deprecated The update graph is automatically captured, this method should no longer be needed.
*/
@ScriptApi
@Deprecated(forRemoval = true, since = "0.31")
public Builder captureUpdateGraph() {
this.updateGraph = getContext().getUpdateGraph();
return this;
}

/**
* Use the specified operation initializer instead of the captured instance.
*/
@ScriptApi
public Builder setOperationInitializer(OperationInitializer operationInitializer) {
this.operationInitializer = operationInitializer;
return this;
}

/**
* @return the newly instantiated ExecutionContext
*/
@ScriptApi
public ExecutionContext build() {
return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph);
return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph,
operationInitializer);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.deephaven.engine.context;

import io.deephaven.engine.updategraph.OperationInitializer;
import io.deephaven.util.ExecutionContextRegistrationException;

import java.util.concurrent.Future;

public class PoisonedOperationInitializer implements OperationInitializer {
niloc132 marked this conversation as resolved.
Show resolved Hide resolved

public static final PoisonedOperationInitializer INSTANCE = new PoisonedOperationInitializer();

private <T> T fail() {
throw ExecutionContextRegistrationException.onFailedComponentAccess("OperationInitializer");
}

@Override
public boolean canParallelize() {
return fail();
}

@Override
public Future<?> submit(Runnable runnable) {
return fail();
}

@Override
public int parallelismFactor() {
return fail();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import io.deephaven.base.log.LogOutput;
import io.deephaven.engine.updategraph.LogicalClock;
import io.deephaven.engine.updategraph.LogicalClockImpl;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.io.log.LogEntry;
import io.deephaven.util.ExecutionContextRegistrationException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ public class TestQueryCompiler {
@Before
public void setUp() throws IOException {
executionContextClosable = ExecutionContext.newBuilder()
.captureUpdateGraph()
.captureQueryLibrary()
.captureQueryScope()
.setQueryCompiler(QueryCompiler.create(folder.newFolder(), TestQueryCompiler.class.getClassLoader()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.deephaven.base.log.LogOutput;
import io.deephaven.base.verify.Assert;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.exceptions.CancellationException;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetFactory;
Expand Down Expand Up @@ -325,10 +326,10 @@ abstract void enqueueSubFilters(
*/
abstract boolean doParallelization(long numberOfRows);

static boolean doParallelizationBase(long numberOfRows) {
boolean doParallelizationBase(long numberOfRows) {
return !QueryTable.DISABLE_PARALLEL_WHERE && numberOfRows != 0
&& (QueryTable.FORCE_PARALLEL_WHERE || numberOfRows / 2 > QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT)
&& OperationInitializationThreadPool.canParallelize();
&& ExecutionContext.getContext().getInitializer().canParallelize();
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.deephaven.engine.table.impl;

import io.deephaven.base.verify.Assert;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.exceptions.CancellationException;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.table.ModifiedColumnSet;
Expand Down Expand Up @@ -85,7 +86,7 @@ void enqueueSubFilters(

private void enqueueJobs(Iterable<? extends NotificationQueue.Notification> subFilters) {
for (NotificationQueue.Notification notification : subFilters) {
OperationInitializationThreadPool.executorService().submit(() -> {
ExecutionContext.getContext().getInitializer().submit(() -> {
root.runningChildren.put(Thread.currentThread(), Thread.currentThread());
try {
if (!root.cancelled.get()) {
Expand Down
Loading
Loading