From 48fa48b3b0aa94901efaf3077366532fea5ef553 Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Wed, 13 Dec 2023 11:53:02 -0600 Subject: [PATCH] in-progress pre-review feedback --- .../python/PythonDeephavenSession.java | 3 +- py/server/deephaven/server/__init__.py | 3 - py/server/deephaven/server/executors.py | 56 +++++++++++++++++++ py/server/deephaven/threads.py | 47 ---------------- 4 files changed, 58 insertions(+), 51 deletions(-) create mode 100644 py/server/deephaven/server/executors.py delete mode 100644 py/server/deephaven/threads.py diff --git a/Integrations/src/main/java/io/deephaven/integrations/python/PythonDeephavenSession.java b/Integrations/src/main/java/io/deephaven/integrations/python/PythonDeephavenSession.java index c2ec1d88abd..817a4721aa3 100644 --- a/Integrations/src/main/java/io/deephaven/integrations/python/PythonDeephavenSession.java +++ b/Integrations/src/main/java/io/deephaven/integrations/python/PythonDeephavenSession.java @@ -130,7 +130,8 @@ public PythonDeephavenSession( private void registerJavaExecutor() { // TODO (deephaven-core#4040) Temporary exec service until we have cleaner startup wiring - try (final PythonDeephavenThreadsModule module = PyModule.importModule("deephaven.threads").createProxy(PythonDeephavenThreadsModule.class)) { + try (PyModule pyModule = PyModule.importModule("deephaven.server.executors"); + final PythonDeephavenThreadsModule module = pyModule.createProxy(PythonDeephavenThreadsModule.class)) { ExecutorService executorService = Executors.newFixedThreadPool(1); module._register_named_java_executor("serial", executorService::submit); module._register_named_java_executor("concurrent", executorService::submit); diff --git a/py/server/deephaven/server/__init__.py b/py/server/deephaven/server/__init__.py index b5af5621f70..cb8603f5850 100644 --- a/py/server/deephaven/server/__init__.py +++ b/py/server/deephaven/server/__init__.py @@ -1,6 +1,3 @@ # # Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending # - -# Packages under the deephaven.server heading are not meant to be called externally - it exists as a convenient place -# for the server to execute implementation logic via python diff --git a/py/server/deephaven/server/executors.py b/py/server/deephaven/server/executors.py new file mode 100644 index 00000000000..6826d7b9f60 --- /dev/null +++ b/py/server/deephaven/server/executors.py @@ -0,0 +1,56 @@ +# +# Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending +# +""" +Support for running operations on JVM server threads, so that they can be given work from python. Initially, there +are two executors, "serial" and "concurrent". Any task that will take an exclusive UGP lock should use the serial +executor, otherwise the concurrent executor should be used. In the future there may be a "fast" executor, for use +when there is no chance of using either lock. +""" + +from typing import Callable, Dict, List +import jpy +from deephaven.jcompat import j_runnable + + +_executors: Dict[str, Callable[[Callable[[], None]], None]] = {} + + +def has_named_executor(executor_name: str) -> bool: + """ + Returns True if the named executor exists and can have tasks submitted to it. + """ + return executor_name in executor_names() + + +def executor_names() -> List[str]: + """ + Returns: the List of known executor names + """ + return list(_executors.keys()) + + +def submit_task(executor_name: str, task: Callable[[], None]) -> None: + """ + Submits a task to run on a named executor. If no such executor exists, raises KeyError. + + Typically, tasks should not block on other threads. Ensure tasks never block on other tasks submitted to the same executor. + + Args: + executor_name: the name of the executor to submit the task to + task: the function to run on the named executor + + Raises: KeyError if the executor name + """ + _executors[executor_name](task) + + +def _register_named_java_executor(executor_name: str, java_executor: jpy.JType) -> None: + """ + Provides a Java executor for user code to submit tasks to. + + Args: + executor_name: the name of the executor to register + java_executor: a Java Consumer instance + """ + _executors[executor_name] = lambda task: java_executor.accept(j_runnable(task)) diff --git a/py/server/deephaven/threads.py b/py/server/deephaven/threads.py deleted file mode 100644 index 020a85b7154..00000000000 --- a/py/server/deephaven/threads.py +++ /dev/null @@ -1,47 +0,0 @@ -# -# Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending -# -""" -Support for running operations on JVM server threads, so that they can be given work from python. -""" - -from typing import Callable, Dict -from concurrent.futures import ThreadPoolExecutor -import jpy -from .jcompat import j_runnable - - -_executors: Dict[str, Callable[[Callable[[], None]], None]] = {} - - -def has_named_executor(executor_name: str) -> bool: - """ - Returns True if the named executor exists and can have tasks submitted to it. - """ - return hasattr(_executors, executor_name) - - -def submit_task(executor_name: str, task: Callable[[], None]) -> None: - """ - Submits a task to run on a named executor. If no such executor exists, raises KeyError. - """ - _executors[executor_name](task) - - -def _register_named_java_executor(executor_name: str, java_executor: jpy.JType): - """ - Provides a Java executor for user code to submit tasks to. - """ - _executors[executor_name] = lambda task: java_executor.accept(j_runnable(task)) - - -# def _register_named_executor(executor_name: str, executor: Callable[[Callable[[], None]], None]) -> None: -# """ -# Provides a Python executor for user code to submit tasks to. Only intended to be used until Java threads -# are available for this. -# """ -# _executors[executor_name] = executor -# -# -# _register_named_executor('serial', ThreadPoolExecutor(1).submit) -# _register_named_executor('concurrent', ThreadPoolExecutor(4).submit)