Skip to content

Commit

Permalink
Initial API for python server threads
Browse files Browse the repository at this point in the history
Provides access to server thread pools from python, enabling plugin or
dhui component authors to move work off-thread as appropriate, without
introducing their own threads.

Partial #4942
  • Loading branch information
niloc132 committed Dec 13, 2023
1 parent 5fab992 commit f033ea2
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -91,6 +94,7 @@ public PythonDeephavenSession(
}
scriptFinder = new ScriptFinder(DEFAULT_SCRIPT_PATH);

registerJavaExecutor();
publishInitial();
/*
* And now the user-defined initialization scripts, if any.
Expand Down Expand Up @@ -120,9 +124,19 @@ public PythonDeephavenSession(
}
scriptFinder = null;

registerJavaExecutor();
publishInitial();
}

private void registerJavaExecutor() {
// TODO (deephaven-core#4040) Temporary exec service until we have cleaner startup wiring
try (final PythonDeephavenThreadsModule module = PyModule.importModule("deephaven.threads").createProxy(PythonDeephavenThreadsModule.class)) {
ExecutorService executorService = Executors.newFixedThreadPool(1);
module._register_named_java_executor("serial", executorService::submit);
module._register_named_java_executor("concurrent", executorService::submit);
}
}

@Override
@VisibleForTesting
public QueryScope newQueryScope() {
Expand Down Expand Up @@ -325,4 +339,9 @@ interface PythonScriptSessionModule extends Closeable {

void close();
}

interface PythonDeephavenThreadsModule extends Closeable {
void close();
void _register_named_java_executor(String executorName, Consumer<Runnable> execute);
}
}
47 changes: 47 additions & 0 deletions py/server/deephaven/threads.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#
# Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
#
"""
Support for running operations on JVM server threads, so that they can be given work from python.
"""

from typing import Callable, Dict
from concurrent.futures import ThreadPoolExecutor
import jpy
from .jcompat import j_runnable


_executors: Dict[str, Callable[[Callable[[], None]], None]] = {}


def has_named_executor(executor_name: str) -> bool:
"""
Returns True if the named executor exists and can have tasks submitted to it.
"""
return hasattr(_executors, executor_name)


def submit_task(executor_name: str, task: Callable[[], None]) -> None:
"""
Submits a task to run on a named executor. If no such executor exists, raises KeyError.
"""
_executors[executor_name](task)


def _register_named_java_executor(executor_name: str, java_executor: jpy.JType):
"""
Provides a Java executor for user code to submit tasks to.
"""
_executors[executor_name] = lambda task: java_executor.accept(j_runnable(task))


# def _register_named_executor(executor_name: str, executor: Callable[[Callable[[], None]], None]) -> None:
# """
# Provides a Python executor for user code to submit tasks to. Only intended to be used until Java threads
# are available for this.
# """
# _executors[executor_name] = executor
#
#
# _register_named_executor('serial', ThreadPoolExecutor(1).submit)
# _register_named_executor('concurrent', ThreadPoolExecutor(4).submit)

0 comments on commit f033ea2

Please sign in to comment.