Skip to content

Commit

Permalink
Python API to access Java server threads (#4949)
Browse files Browse the repository at this point in the history
Provides access to server thread pools from python, enabling plugin or
dhui component authors to move work off-thread as appropriate, without
introducing their own threads. Later work will make use of the existing
server Scheduler.

Partial #4942
  • Loading branch information
niloc132 authored Dec 15, 2023
1 parent aadf4c8 commit ac70fd3
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.annotations.ScriptApi;
import io.deephaven.util.annotations.VisibleForTesting;
import io.deephaven.util.thread.NamingThreadFactory;
import io.deephaven.util.thread.ThreadInitializationFactory;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
Expand All @@ -44,6 +45,9 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -93,6 +97,7 @@ public PythonDeephavenSession(
}
scriptFinder = new ScriptFinder(DEFAULT_SCRIPT_PATH);

registerJavaExecutor(threadInitializationFactory);
publishInitial();
/*
* And now the user-defined initialization scripts, if any.
Expand Down Expand Up @@ -122,9 +127,26 @@ public PythonDeephavenSession(final UpdateGraph updateGraph,
}
scriptFinder = null;

registerJavaExecutor(threadInitializationFactory);
publishInitial();
}

private void registerJavaExecutor(ThreadInitializationFactory threadInitializationFactory) {
// TODO (deephaven-core#4040) Temporary exec service until we have cleaner startup wiring
try (PyModule pyModule = PyModule.importModule("deephaven.server.executors");
final PythonDeephavenThreadsModule module = pyModule.createProxy(PythonDeephavenThreadsModule.class)) {
NamingThreadFactory threadFactory = new NamingThreadFactory(PythonDeephavenSession.class, "serverThread") {
@Override
public Thread newThread(@NotNull Runnable r) {
return super.newThread(threadInitializationFactory.createInitializer(r));
}
};
ExecutorService executorService = Executors.newFixedThreadPool(1, threadFactory);
module._register_named_java_executor("serial", executorService::submit);
module._register_named_java_executor("concurrent", executorService::submit);
}
}

@Override
@VisibleForTesting
public QueryScope newQueryScope() {
Expand Down Expand Up @@ -327,4 +349,10 @@ interface PythonScriptSessionModule extends Closeable {

void close();
}

interface PythonDeephavenThreadsModule extends Closeable {
void close();

void _register_named_java_executor(String executorName, Consumer<Runnable> execute);
}
}
3 changes: 0 additions & 3 deletions py/server/deephaven/server/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
#
# Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
#

# Packages under the deephaven.server heading are not meant to be called externally - it exists as a convenient place
# for the server to execute implementation logic via python
63 changes: 63 additions & 0 deletions py/server/deephaven/server/executors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#
# Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
#
"""
Support for running operations on JVM server threads, so that they can be given work from python. Initially, there
are two executors, "serial" and "concurrent". Any task that will take an exclusive UGP lock should use the serial
executor, otherwise the concurrent executor should be used. In the future there may be a "fast" executor, for use
when there is no chance of using either lock.
"""

from typing import Callable, Dict, List
import jpy
from deephaven.jcompat import j_runnable
from deephaven import DHError


_executors: Dict[str, Callable[[Callable[[], None]], None]] = {}


def has_executor(executor_name: str) -> bool:
"""
Returns True if an executor exists with that name.
"""
return executor_name in executor_names()


def executor_names() -> List[str]:
"""
Returns: the List of known executor names
"""
return list(_executors.keys())


def submit_task(executor_name: str, task: Callable[[], None]) -> None:
"""
Submits a task to run on a named executor. If no such executor exists, raises KeyError.
Typically, tasks should not block on other threads. Ensure tasks never block on other tasks submitted to the same executor.
Args:
executor_name (str): the name of the executor to submit the task to
task (Callable[[], None]): the function to run on the named executor
Raises:
KeyError if the executor name
"""
_executors[executor_name](task)


def _register_named_java_executor(executor_name: str, java_executor: jpy.JType) -> None:
"""
Provides a Java executor for user code to submit tasks to. Called during server startup.
Args:
executor_name (str): the name of the executor to register
java_executor (jpy.JType): a Java Consumer<Runnable> instance
Raises:
DHError
"""
if executor_name in executor_names():
raise DHError(f"Executor with name {executor_name} already registered")
_executors[executor_name] = lambda task: java_executor.accept(j_runnable(task))

0 comments on commit ac70fd3

Please sign in to comment.