Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make OperationInitializer non-static #4919

Merged
merged 23 commits into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.annotations.ScriptApi;
import io.deephaven.util.annotations.VisibleForTesting;
import io.deephaven.util.thread.ThreadInitializationFactory;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jpy.KeyError;
Expand Down Expand Up @@ -76,11 +77,12 @@ public class PythonDeephavenSession extends AbstractScriptSession<PythonSnapshot
*/
public PythonDeephavenSession(
final UpdateGraph updateGraph,
final ThreadInitializationFactory threadInitializationFactory,
final ObjectTypeLookup objectTypeLookup,
@Nullable final Listener listener,
final boolean runInitScripts,
final PythonEvaluatorJpy pythonEvaluator) throws IOException {
super(updateGraph, objectTypeLookup, listener);
super(updateGraph, threadInitializationFactory, objectTypeLookup, listener);

evaluator = pythonEvaluator;
scope = pythonEvaluator.getScope();
Expand Down Expand Up @@ -108,9 +110,9 @@ public PythonDeephavenSession(
* Creates a Python "{@link ScriptSession}", for use where we should only be reading from the scope, such as an
* IPython kernel session.
*/
public PythonDeephavenSession(
final UpdateGraph updateGraph, final PythonScope<?> scope) {
super(updateGraph, NoOp.INSTANCE, null);
public PythonDeephavenSession(final UpdateGraph updateGraph,
final ThreadInitializationFactory threadInitializationFactory, final PythonScope<?> scope) {
super(updateGraph, threadInitializationFactory, NoOp.INSTANCE, null);

evaluator = null;
this.scope = (PythonScope<PyObject>) scope;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,55 +1,21 @@
package io.deephaven.util.thread;

import io.deephaven.configuration.Configuration;

import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.Collection;

/**
* Extension point to allow threads that will run user code from within the platform to be controlled by configuration.
*/
public interface ThreadInitializationFactory {
/* private */ String[] CONFIGURED_INITIALIZATION_TYPES =
Configuration.getInstance().getStringArrayFromProperty("thread.initialization");
/* private */ List<ThreadInitializationFactory> INITIALIZERS = Arrays.stream(CONFIGURED_INITIALIZATION_TYPES)
.filter(str -> !str.isBlank())
.map(type -> {
try {
// noinspection unchecked
Class<? extends ThreadInitializationFactory> clazz =
(Class<? extends ThreadInitializationFactory>) Class.forName(type);
return clazz.getDeclaredConstructor().newInstance();
} catch (ClassNotFoundException | NoSuchMethodException | InvocationTargetException
| InstantiationException | IllegalAccessException e) {

// TODO (https://github.com/deephaven/deephaven-core/issues/4040):
// Currently the default property file is shared between both the java client and the server. This
// means that client-side usage will attempt to load the thread.initialization property intended for
// the server which is not available on the class path.
if (e instanceof ClassNotFoundException && type.startsWith("io.deephaven.server.")) {
return null;
}

throw new IllegalArgumentException(
"Error instantiating initializer " + type + ", please check configuration", e);
}
})
.filter(Objects::nonNull)
.collect(Collectors.toUnmodifiableList());
ThreadInitializationFactory NO_OP = r -> r;

/**
* Chains configured initializers to run before/around any given runnable, returning a runnable intended to be run
* by a new thread.
*/
static Runnable wrapRunnable(Runnable runnable) {
Runnable acc = runnable;
for (ThreadInitializationFactory INITIALIZER : INITIALIZERS) {
acc = INITIALIZER.createInitializer(acc);
}
return acc;
static ThreadInitializationFactory of(Collection<ThreadInitializationFactory> factories) {
return runnable -> {
Runnable acc = runnable;
for (ThreadInitializationFactory factory : factories) {
acc = factory.createInitializer(acc);
}
return acc;
};
}

Runnable createInitializer(Runnable runnable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package io.deephaven.engine.context;

import io.deephaven.auth.AuthContext;
import io.deephaven.engine.updategraph.OperationInitializer;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.annotations.ScriptApi;
Expand Down Expand Up @@ -83,20 +84,23 @@ private static void setContext(final ExecutionContext context) {
private final QueryScope queryScope;
private final QueryCompiler queryCompiler;
private final UpdateGraph updateGraph;
private final OperationInitializer operationInitializer;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to other reviewers:
I think this is a reasonable thing to add to the execution context, and that it's the right interface to add. I'd welcome consensus-building here.


private ExecutionContext(
final boolean isSystemic,
final AuthContext authContext,
final QueryLibrary queryLibrary,
final QueryScope queryScope,
final QueryCompiler queryCompiler,
final UpdateGraph updateGraph) {
final UpdateGraph updateGraph,
OperationInitializer operationInitializer) {
this.isSystemic = isSystemic;
this.authContext = authContext;
this.queryLibrary = Objects.requireNonNull(queryLibrary);
this.queryScope = Objects.requireNonNull(queryScope);
this.queryCompiler = Objects.requireNonNull(queryCompiler);
this.updateGraph = updateGraph;
this.updateGraph = Objects.requireNonNull(updateGraph);
this.operationInitializer = Objects.requireNonNull(operationInitializer);
}

/**
Expand All @@ -110,7 +114,8 @@ public ExecutionContext withSystemic(boolean isSystemic) {
if (isSystemic == this.isSystemic) {
return this;
}
return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph);
return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph,
operationInitializer);
}

/**
Expand All @@ -124,7 +129,8 @@ public ExecutionContext withAuthContext(final AuthContext authContext) {
if (authContext == this.authContext) {
return this;
}
return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph);
return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph,
operationInitializer);
}

/**
Expand All @@ -138,7 +144,16 @@ public ExecutionContext withUpdateGraph(final UpdateGraph updateGraph) {
if (updateGraph == this.updateGraph) {
return this;
}
return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph);
return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph,
operationInitializer);
}

public ExecutionContext withOperationInitializer(final OperationInitializer operationInitializer) {
if (operationInitializer == this.operationInitializer) {
return this;
}
return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph,
operationInitializer);
}

/**
Expand Down Expand Up @@ -198,6 +213,10 @@ public UpdateGraph getUpdateGraph() {
return updateGraph;
}

public OperationInitializer getInitializer() {
return operationInitializer;
}

@SuppressWarnings("unused")
public static class Builder {
private boolean isSystemic = false;
Expand All @@ -208,8 +227,10 @@ public static class Builder {
private QueryScope queryScope = PoisonedQueryScope.INSTANCE;
private QueryCompiler queryCompiler = PoisonedQueryCompiler.INSTANCE;
private UpdateGraph updateGraph = PoisonedUpdateGraph.INSTANCE;
private OperationInitializer operationInitializer = PoisonedOperationInitializer.INSTANCE;

private Builder() {
// why automatically propagate this, but not other things?
niloc132 marked this conversation as resolved.
Show resolved Hide resolved
// propagate the auth context from the current context
this(getContext().authContext);
}
Expand Down Expand Up @@ -363,12 +384,25 @@ public Builder captureUpdateGraph() {
return this;
}

@ScriptApi
public Builder setOperationInitializer(OperationInitializer operationInitializer) {
this.operationInitializer = operationInitializer;
return this;
}

@ScriptApi
public Builder captureOperationInitializer() {
niloc132 marked this conversation as resolved.
Show resolved Hide resolved
this.operationInitializer = getContext().getInitializer();
return this;
}

/**
* @return the newly instantiated ExecutionContext
*/
@ScriptApi
public ExecutionContext build() {
return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph);
return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph,
operationInitializer);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.deephaven.engine.context;

import io.deephaven.engine.updategraph.OperationInitializer;
import io.deephaven.util.ExecutionContextRegistrationException;

import java.util.concurrent.Future;

public class PoisonedOperationInitializer implements OperationInitializer {
niloc132 marked this conversation as resolved.
Show resolved Hide resolved

public static final PoisonedOperationInitializer INSTANCE = new PoisonedOperationInitializer();

private <T> T fail() {
throw ExecutionContextRegistrationException.onFailedComponentAccess("OperationInitializer");
}

@Override
public boolean canParallelize() {
return fail();
}

@Override
public Future<?> submit(Runnable runnable) {
return fail();
}

@Override
public int parallelismFactor() {
return fail();
}

@Override
public void start() {
fail();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import io.deephaven.base.log.LogOutput;
import io.deephaven.engine.updategraph.LogicalClock;
import io.deephaven.engine.updategraph.LogicalClockImpl;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.io.log.LogEntry;
import io.deephaven.util.ExecutionContextRegistrationException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.deephaven.base.log.LogOutput;
import io.deephaven.base.verify.Assert;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.exceptions.CancellationException;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetFactory;
Expand Down Expand Up @@ -325,10 +326,10 @@ abstract void enqueueSubFilters(
*/
abstract boolean doParallelization(long numberOfRows);

static boolean doParallelizationBase(long numberOfRows) {
boolean doParallelizationBase(long numberOfRows) {
return !QueryTable.DISABLE_PARALLEL_WHERE && numberOfRows != 0
&& (QueryTable.FORCE_PARALLEL_WHERE || numberOfRows / 2 > QueryTable.PARALLEL_WHERE_ROWS_PER_SEGMENT)
&& OperationInitializationThreadPool.canParallelize();
&& ExecutionContext.getContext().getInitializer().canParallelize();
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.deephaven.engine.table.impl;

import io.deephaven.base.verify.Assert;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.exceptions.CancellationException;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.table.ModifiedColumnSet;
Expand Down Expand Up @@ -85,7 +86,7 @@ void enqueueSubFilters(

private void enqueueJobs(Iterable<? extends NotificationQueue.Notification> subFilters) {
for (NotificationQueue.Notification notification : subFilters) {
OperationInitializationThreadPool.executorService().submit(() -> {
ExecutionContext.getContext().getInitializer().submit(() -> {
root.runningChildren.put(Thread.currentThread(), Thread.currentThread());
try {
if (!root.cancelled.get()) {
Expand Down
Loading
Loading