diff --git a/Integrations/src/main/java/io/deephaven/integrations/python/PythonMergedListenerAdapter.java b/Integrations/src/main/java/io/deephaven/integrations/python/PythonMergedListenerAdapter.java new file mode 100644 index 00000000000..784aa5e5c1c --- /dev/null +++ b/Integrations/src/main/java/io/deephaven/integrations/python/PythonMergedListenerAdapter.java @@ -0,0 +1,74 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.integrations.python; + +import io.deephaven.engine.context.ExecutionContext; +import io.deephaven.engine.table.impl.ListenerRecorder; +import io.deephaven.engine.table.impl.MergedListener; +import io.deephaven.engine.updategraph.NotificationQueue; +import io.deephaven.engine.updategraph.UpdateGraph; +import io.deephaven.util.SafeCloseable; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.jpy.PyObject; + +import java.util.Arrays; +import java.util.Objects; +import java.util.stream.Stream; + +/** + * A Deephaven merged listener which fires when any of its bound listener recorders has updates and all of its + * dependencies have been satisfied. The listener then invokes the Python listener object. + * + * The Python listener object must be a Python MergedListener instance that provides a "process" method implementation + * with no argument. + */ +public class PythonMergedListenerAdapter extends MergedListener { + private final PyObject pyCallable; + + /** + * Create a Python merged listener. + * + * @param recorders The listener recorders to which this listener will subscribe. + * @param dependencies The tables that must be satisfied before this listener is executed. + * @param listenerDescription A description for the UpdatePerformanceTracker to append to its entry description, may + * be null. + * @param pyObjectIn Python listener object. 
+ */ + private PythonMergedListenerAdapter( + @NotNull ListenerRecorder[] recorders, + @Nullable NotificationQueue.Dependency[] dependencies, + @Nullable String listenerDescription, + @NotNull PyObject pyObjectIn) { + super(Arrays.asList(recorders), Arrays.asList(dependencies), listenerDescription, null); + Arrays.stream(recorders).forEach(rec -> rec.setMergedListener(this)); + this.pyCallable = PythonUtils.pyMergeListenerFunc(pyObjectIn); + } + + public static PythonMergedListenerAdapter create( + @NotNull ListenerRecorder[] recorders, + @Nullable NotificationQueue.Dependency[] dependencies, + @Nullable String listenerDescription, + @NotNull PyObject pyObjectIn) { + if (recorders.length < 2) { + throw new IllegalArgumentException("At least two listener recorders must be provided"); + } + + final NotificationQueue.Dependency[] allItems = + Stream.concat(Arrays.stream(recorders), Arrays.stream(dependencies)) + .filter(Objects::nonNull) + .toArray(NotificationQueue.Dependency[]::new); + + final UpdateGraph updateGraph = allItems[0].getUpdateGraph(allItems); + + try (final SafeCloseable ignored = ExecutionContext.getContext().withUpdateGraph(updateGraph).open()) { + return new PythonMergedListenerAdapter(recorders, dependencies, listenerDescription, pyObjectIn); + } + } + + @Override + protected void process() { + pyCallable.call("__call__"); + } +} diff --git a/Integrations/src/main/java/io/deephaven/integrations/python/PythonUtils.java b/Integrations/src/main/java/io/deephaven/integrations/python/PythonUtils.java index 5d3d861fa21..d2f93fef8be 100644 --- a/Integrations/src/main/java/io/deephaven/integrations/python/PythonUtils.java +++ b/Integrations/src/main/java/io/deephaven/integrations/python/PythonUtils.java @@ -23,6 +23,9 @@ static PyObject pyListenerFunc(final PyObject pyObject) { return pyCallable(pyObject, "on_update"); } + static PyObject pyMergeListenerFunc(final PyObject pyObject) { + return pyCallable(pyObject, "process"); + } /** * Creates a callable 
PyObject, either using method.apply() or __call__(), if the pyObjectIn has such methods diff --git a/py/server/deephaven/table_listener.py b/py/server/deephaven/table_listener.py index c7db8af827e..329a9eb5c4f 100644 --- a/py/server/deephaven/table_listener.py +++ b/py/server/deephaven/table_listener.py @@ -6,7 +6,7 @@ from abc import ABC, abstractmethod from functools import wraps from inspect import signature -from typing import Callable, Union, List, Generator, Dict, Literal, Sequence +from typing import Callable, Union, List, Generator, Dict, Literal, Sequence, Optional import jpy import numpy @@ -16,11 +16,14 @@ from deephaven._wrapper import JObjectWrapper from deephaven.jcompat import to_sequence from deephaven.table import Table -from deephaven._table_reader import _table_reader_chunk_dict, _table_reader_all_dict +from deephaven._table_reader import _table_reader_all_dict, _table_reader_chunk_dict from deephaven.update_graph import UpdateGraph _JPythonReplayListenerAdapter = jpy.get_type("io.deephaven.integrations.python.PythonReplayListenerAdapter") _JTableUpdate = jpy.get_type("io.deephaven.engine.table.TableUpdate") +_JTableUpdateDataReader = jpy.get_type("io.deephaven.integrations.python.PythonListenerTableUpdateDataReader") +_JListenerRecorder = jpy.get_type("io.deephaven.engine.table.impl.ListenerRecorder") +_JPythonMergedListenerAdapter = jpy.get_type("io.deephaven.integrations.python.PythonMergedListenerAdapter") class TableUpdate(JObjectWrapper): """A TableUpdate object represents a table update event. 
It contains the added, removed, and modified rows in the @@ -30,6 +33,8 @@ class TableUpdate(JObjectWrapper): def __init__(self, table: Table, j_table_update: jpy.JType): self.table = table self.j_table_update = j_table_update + # make sure we always use the _JTableUpdate interface and not the implementations + self.j_table_update = jpy.cast(j_table_update, _JTableUpdate) @property def j_object(self) -> jpy.JType: @@ -45,7 +50,7 @@ def added(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.ndarray]: Returns: a dict """ - if not self.j_table_update.added: + if not self.j_table_update.added(): return {} return _table_reader_all_dict(table=self.table, cols=cols, row_set= self.j_table_update.added.asRowSet(), @@ -63,7 +68,7 @@ def added_chunks(self, chunk_size: int, cols: Union[str, List[str]] = None) -> G Returns: a generator """ - if not self.j_table_update.added: + if not self.j_table_update.added(): return (_ for _ in ()) return _table_reader_chunk_dict(table=self.table, cols=cols, row_set=self.j_table_update.added.asRowSet(), @@ -79,7 +84,7 @@ def removed(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.ndarray Returns: a dict """ - if not self.j_table_update.removed: + if not self.j_table_update.removed(): return {} return _table_reader_all_dict(table=self.table, cols=cols, row_set=self.j_table_update.removed.asRowSet(), @@ -97,7 +102,7 @@ def removed_chunks(self, chunk_size: int, cols: Union[str, List[str]] = None) -> Returns: a generator """ - if not self.j_table_update.removed: + if not self.j_table_update.removed(): return (_ for _ in ()) return _table_reader_chunk_dict(table=self.table, cols=cols, row_set=self.j_table_update.removed.asRowSet(), @@ -113,7 +118,7 @@ def modified(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.ndarra Returns: a dict """ - if not self.j_table_update.modified: + if not self.j_table_update.modified(): return {} return _table_reader_all_dict(table=self.table, cols=cols, 
row_set=self.j_table_update.modified.asRowSet(), @@ -131,7 +136,7 @@ def modified_chunks(self, chunk_size: int, cols: Union[str, List[str]] = None) - Returns: a generator """ - if not self.j_table_update.modified: + if not self.j_table_update.modified(): return (_ for _ in ()) return _table_reader_chunk_dict(self.table, cols=cols, row_set=self.j_table_update.modified.asRowSet(), @@ -147,7 +152,7 @@ def modified_prev(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.n Returns: a dict """ - if not self.j_table_update.modified: + if not self.j_table_update.modified(): return {} return _table_reader_all_dict(table=self.table, cols=cols, row_set=self.j_table_update.modified.asRowSet(), @@ -165,7 +170,7 @@ def modified_prev_chunks(self, chunk_size: int, cols: Union[str, List[str]] = No Returns: a generator """ - if not self.j_table_update.modified: + if not self.j_table_update.modified(): return (_ for _ in ()) return _table_reader_chunk_dict(self.table, cols=cols, row_set=self.j_table_update.modified.asRowSet(), @@ -178,7 +183,7 @@ def shifted(self): @property def modified_columns(self) -> List[str]: """The list of modified columns in this update.""" - cols = self.j_table_update.modifiedColumnSet.dirtyColumnNames() + cols = self.j_table_update.modifiedColumnSet().dirtyColumnNames() return list(cols) if cols else [] @@ -228,7 +233,7 @@ def _listener_wrapper(table: Table): table (Table): the table to listen for updates. 
""" - def decorator(listener: Callable): + def decorator(listener: Callable[[TableUpdate, bool], None]): @wraps(listener) def wrapper(update, *args): t_update = TableUpdate(table=table, j_table_update=update) @@ -239,7 +244,7 @@ def wrapper(update, *args): return decorator -def _wrap_listener_func(t: Table, listener: Callable): +def _wrap_listener_func(t: Table, listener: Callable[[TableUpdate, bool], None]): n_params = len(signature(listener).parameters) if n_params != 2: raise ValueError("listener function must have 2 (update, is_replay) parameters.") @@ -258,7 +263,7 @@ class TableListenerHandle(JObjectWrapper): """A handle to manage a table listener's lifecycle.""" j_object_type = _JPythonReplayListenerAdapter - def __init__(self, t: Table, listener: Union[Callable, TableListener], description: str = None, + def __init__(self, t: Table, listener: Union[Callable[[TableUpdate, bool], None], TableListener], description: str = None, dependencies: Union[Table, Sequence[Table]] = None): """Creates a new table listener handle with dependencies. @@ -276,7 +281,7 @@ def __init__(self, t: Table, listener: Union[Callable, TableListener], descripti Args: t (Table): table to listen to - listener (Union[Callable, TableListener]): listener for table changes + listener (Union[Callable[[TableUpdate, bool], None], TableListener]): listener for table changes description (str, optional): description for the UpdatePerformanceTracker to append to the listener's entry description, default is None dependencies (Union[Table, Sequence[Table]]): tables that must be satisfied before the listener's execution. 
@@ -298,10 +303,10 @@ def __init__(self, t: Table, listener: Union[Callable, TableListener], descripti self.description = description self.dependencies = to_sequence(dependencies) - if callable(listener): - self.listener_wrapped = _wrap_listener_func(t, listener) - elif isinstance(listener, TableListener): + if isinstance(listener, TableListener): self.listener_wrapped = _wrap_listener_obj(t, listener) + elif callable(listener): + self.listener_wrapped = _wrap_listener_func(t, listener) else: raise DHError(message="listener is neither callable nor TableListener object") @@ -353,7 +358,7 @@ def stop(self) -> None: self.started = False -def listen(t: Table, listener: Union[Callable, TableListener], description: str = None, do_replay: bool = False, +def listen(t: Table, listener: Union[Callable[[TableUpdate, bool], None], TableListener], description: str = None, do_replay: bool = False, replay_lock: Literal["shared", "exclusive"] = "shared", dependencies: Union[Table, Sequence[Table]] = None)\ -> TableListenerHandle: """This is a convenience function that creates a TableListenerHandle object and immediately starts it to listen @@ -366,7 +371,7 @@ def listen(t: Table, listener: Union[Callable, TableListener], description: str Args: t (Table): table to listen to - listener (Union[Callable, TableListener]): listener for table changes + listener (Union[Callable[[TableUpdate, bool], None], TableListener]): listener for table changes description (str, optional): description for the UpdatePerformanceTracker to append to the listener's entry description, default is None do_replay (bool): whether to replay the initial snapshot of the table, default is False @@ -393,3 +398,159 @@ def listen(t: Table, listener: Union[Callable, TableListener], description: str description=description) table_listener_handle.start(do_replay=do_replay, replay_lock=replay_lock) return table_listener_handle + + +class _ListenerRecorder (JObjectWrapper): + """A ListenerRecorder object records the table 
updates and notifies the associated MergedListener that a change + has occurred.""" + + j_object_type = _JListenerRecorder + + @property + def j_object(self) -> jpy.JType: + return self.j_listener_recorder + + def __init__(self, table: Table): + if not table.is_refreshing: + raise DHError(message="table must be a refreshing table.") + + self.j_listener_recorder = _JListenerRecorder("Python Wrapped Listener recorder", table.j_table, None) + self.table = table + + def table_update(self) -> Optional[TableUpdate]: + """Gets the table update from the listener recorder. If there is no update in the current update graph cycle, + returns None. + + Returns: + a TableUpdate or None + """ + j_table_update = self.j_listener_recorder.getUpdate() + return TableUpdate(self.table, j_table_update) if j_table_update else None + + +class MergedListener(ABC): + """A MergedListener has a collection of ListenerRecorder. Each one must complete before the merged listener + executes only once for the current update graph cycle. """ + + @abstractmethod + def on_update(self, updates: Dict[Table, TableUpdate]) -> None: + """The required method on a user defined subclass of MergedListener that processes the table updates from the + tables that are listened to. + """ + ... + + +class MergedListenerHandle: + """A handle to manage a merged listener's lifecycle.""" + + def __init__(self, tables: Sequence[Table], listener: Union[Callable[[Dict[Table, TableUpdate]], None], MergedListener], + description: str = None, dependencies: Union[Table, Sequence[Table]] = None): + """Creates a new MergedListenerHandle with the provided listener recorders and dependencies. + + Table change events are processed by 'listener', which can be either + (1) a callable (e.g. function) or + (2) an instance of MergedListener type which provides an "on_update" method. + The callable or the on_update method must take 1 parameter of type Dict[Table, TableUpdate]. + + Note: Don't do table operations in the listener. 
Do them beforehand, and add the results as dependencies. + + Args: + tables (Sequence[Table]): tables to listen to + listener (Union[Callable[[Dict[Table, TableUpdate]], None], MergedListener]): listener to process table updates + from the tables. + description (str, optional): description for the UpdatePerformanceTracker to append to the listener's entry description, default is None + dependencies (Union[Table, Sequence[Table]]): tables that must be satisfied before the listener's execution. + A refreshing table is considered to be satisfied if all possible updates to the table have been processed + in the current update graph cycle. A static table is always considered to be satisfied. If a specified + table is refreshing, it must belong to the same update graph as the table being listened to. Default is + None. + + Dependencies ensure that the listener can safely access the dependent tables during its execution. This + mainly includes reading the data from the tables. While performing operations on the dependent tables in + the listener is safe, it is not recommended because reading or operating on the result tables of those + operations may not be safe. It is best to perform the operations on the dependent tables beforehand, + and then add the result tables as dependencies to the listener so that they can be safely read in it. 
+ + Raises: + DHError + """ + if len(tables) < 2: + raise DHError(message="A merged listener must listen to at least 2 refreshing tables.") + + self.tables = tables + self.listener_recorders = [_ListenerRecorder(t) for t in tables] + + self.dependencies = dependencies + + if isinstance(listener, MergedListener): + self.listener = listener.on_update + else: + self.listener = listener + + try: + self.merged_listener_adapter = _JPythonMergedListenerAdapter.create( + to_sequence(self.listener_recorders), + to_sequence(self.dependencies), + description, + self) + self.started = False + except Exception as e: + raise DHError(e, "failed to create a merged listener adapter.") from e + + def process(self) -> None: + """Process the table updates from the listener recorders.""" + self.listener({lr.table: lr.table_update() for lr in self.listener_recorders}) + + def start(self) -> None: + """Start the listener.""" + if self.started: + raise RuntimeError("Attempting to start an already started merged listener..") + + with update_graph.auto_locking_ctx(self.listener_recorders[0].table.update_graph): + for lr in self.listener_recorders: + lr.table.j_table.addUpdateListener(lr.j_listener_recorder) + self.started = True + + def stop(self) -> None: + """Stop the listener.""" + if not self.started: + return + + with update_graph.auto_locking_ctx(self.listener_recorders[0].table.update_graph): + for lr in self.listener_recorders: + lr.table.j_table.removeUpdateListener(lr.j_listener_recorder) + self.started = False + + +def merged_listen(tables: Sequence[Table], listener: Union[Callable[[Dict[Table, TableUpdate]], None], MergedListener], + description: str = None, dependencies: Union[Table, Sequence[Table]] = None) -> MergedListenerHandle: + """This is a convenience function that creates a MergedListenerHandle object and immediately starts it to + listen for table updates. 
+ + The function returns the created MergedListenerHandle object whose 'stop' method can be called to stop + listening. If it goes out of scope and is garbage collected, the listener will stop receiving any table updates. + + Note: Don't do table operations in the listener. Do them beforehand, and add the results as dependencies. + + Args: + tables (Sequence[Table]): tables to listen to. + listener (Union[Callable[[Dict[Table, TableUpdate]], None], MergedListener]): listener to process table updates + from the tables. + description (str, optional): description for the UpdatePerformanceTracker to append to the listener's entry + description, default is None + dependencies (Union[Table, Sequence[Table]]): tables that must be satisfied before the listener's execution. + A refreshing table is considered to be satisfied if all possible updates to the table have been processed + in the current update graph cycle. A static table is always considered to be satisfied. If a specified + table is refreshing, it must belong to the same update graph as the table being listened to. Default is + None. + + Dependencies ensure that the listener can safely access the dependent tables during its execution. This + mainly includes reading the data from the tables. While performing operations on the dependent tables in + the listener is safe, it is not recommended because reading or operating on the result tables of those + operations may not be safe. It is best to perform the operations on the dependent tables beforehand, + and then add the result tables as dependencies to the listener so that they can be safely read in it. 
+ """ + merged_listener_handle = MergedListenerHandle(tables=tables, listener=listener, + description=description, dependencies=dependencies) + merged_listener_handle.start() + return merged_listener_handle diff --git a/py/server/tests/test_table_listener.py b/py/server/tests/test_table_listener.py index bded80c1786..25eb5f72e22 100644 --- a/py/server/tests/test_table_listener.py +++ b/py/server/tests/test_table_listener.py @@ -4,17 +4,18 @@ import time import unittest -from typing import List, Union +from typing import List, Union, Optional, Dict import numpy import jpy -from deephaven import time_table, new_table, input_table, DHError +from deephaven import time_table, new_table, input_table, DHError, empty_table from deephaven.column import bool_col, string_col from deephaven.experimental import time_window from deephaven.jcompat import to_sequence from deephaven.table import Table -from deephaven.table_listener import listen, TableListener, TableListenerHandle +from deephaven.table_listener import listen, TableListener, TableListenerHandle, MergedListener, TableUpdate, \ + MergedListenerHandle, merged_listen from deephaven.execution_context import get_exec_ctx from deephaven.update_graph import exclusive_lock from tests.testbase import BaseTestCase @@ -22,7 +23,7 @@ _JColumnVectors = jpy.get_type("io.deephaven.engine.table.vectors.ColumnVectors") class TableUpdateRecorder: - def __init__(self, table: Table, chunk_size: int = None, cols: Union[str, List[str]] = None): + def __init__(self, table: Optional[Table] = None, chunk_size: int = None, cols: Union[str, List[str]] = None): self.table = table self.chunk_size = chunk_size self.cols = cols @@ -34,7 +35,7 @@ def __init__(self, table: Table, chunk_size: int = None, cols: Union[str, List[s self.replays = [] self.modified_columns_list = [] - def record(self, update, is_replay): + def record(self, update: TableUpdate, is_replay: bool=False): if self.chunk_size is None: self.added.append(update.added()) 
self.removed.append(update.removed()) @@ -327,6 +328,114 @@ def listener_func(update, is_replay): with self.assertRaises(DHError): table_listener_handle = TableListenerHandle(self.test_table, listener_func, dependencies=dep_table) + def test_merged_listener_obj(self): + t1 = time_table("PT1s").update(["X=i % 11"]) + t2 = time_table("PT2s").update(["Y=i % 8"]) + t3 = time_table("PT3s").update(["Z=i % 5"]) + + class TestMergedListener(MergedListener): + def on_update(self, updates: Dict[Table, TableUpdate]) -> None: + for update in updates.values(): + if update: + tur.record(update) + + tml = TestMergedListener() + with self.subTest("Direct Handle"): + tur = TableUpdateRecorder() + mlh = MergedListenerHandle([t1, t2, t3], tml) + mlh.start() + ensure_ugp_cycles(tur, cycles=3) + mlh.stop() + mlh.start() + ensure_ugp_cycles(tur, cycles=6) + mlh.stop() + self.assertGreaterEqual(len(tur.replays), 6) + + with self.subTest("Convenience function"): + tur = TableUpdateRecorder() + mlh = merged_listen([t1, t2, t3], tml) + ensure_ugp_cycles(tur, cycles=3) + mlh.stop() + mlh.start() + ensure_ugp_cycles(tur, cycles=6) + mlh.stop() + self.assertGreaterEqual(len(tur.replays), 6) + + def test_merged_listener_func(self): + t1 = time_table("PT1s").update(["X=i % 11"]) + t2 = time_table("PT2s").update(["Y=i % 8"]) + t3 = time_table("PT3s").update(["Z=i % 5"]) + + def test_ml_func(updates: Dict[Table, TableUpdate]) -> None: + if updates[t1] or updates[t3]: + tur.record(updates[t1]) + + with self.subTest("Direct Handle"): + tur = TableUpdateRecorder() + mlh = MergedListenerHandle([t1, t2, t3], test_ml_func) + mlh.start() + ensure_ugp_cycles(tur, cycles=3) + mlh.stop() + mlh.start() + ensure_ugp_cycles(tur, cycles=6) + mlh.stop() + self.assertGreaterEqual(len(tur.replays), 6) + + with self.subTest("Convenience function"): + tur = TableUpdateRecorder() + mlh = merged_listen([t1, t2, t3], test_ml_func) + ensure_ugp_cycles(tur, cycles=3) + mlh.stop() + mlh.start() + ensure_ugp_cycles(tur, 
cycles=6) + mlh.stop() + self.assertGreaterEqual(len(tur.replays), 6) + + def test_merged_listener_with_deps(self): + t1 = time_table("PT1s").update(["X=i % 11"]) + t2 = time_table("PT2s").update(["Y=i % 8"]) + t3 = time_table("PT3s").update(["Z=i % 5"]) + + dep_table = time_table("PT00:00:05").update("X = i % 11") + ec = get_exec_ctx() + + tur = TableUpdateRecorder() + j_arrays = [] + class TestMergedListener(MergedListener): + def on_update(self, updates: Dict[Table, TableUpdate]) -> None: + if updates[t1] and updates[t2]: + tur.record(updates[t2]) + + with ec: + t = dep_table.view(["Y = i % 8"]) + j_arrays.append(_JColumnVectors.of(t.j_table, "Y").copyToArray()) + + tml = TestMergedListener() + mlh = MergedListenerHandle(tables=[t1, t2, t3], listener=tml, dependencies=dep_table) + mlh.start() + ensure_ugp_cycles(tur, cycles=3) + mlh.stop() + mlh.start() + ensure_ugp_cycles(tur, cycles=6) + mlh.stop() + self.assertGreaterEqual(len(tur.replays), 6) + self.assertTrue(len(j_arrays) > 0 and all([len(ja) > 0 for ja in j_arrays])) + + def test_merged_listener_error(self): + t1 = time_table("PT1s").update(["X=i % 11"]) + + def test_ml_func(updates: Dict[Table, TableUpdate]) -> None: + pass + + with self.assertRaises(DHError) as cm: + mlh = MergedListenerHandle([t1], test_ml_func) + self.assertIn("at least 2 refreshing tables", str(cm.exception)) + + et = empty_table(1) + with self.assertRaises(DHError) as cm: + mlh = merged_listen([t1, et], test_ml_func) + self.assertIn("must be a refreshing table", str(cm.exception)) + if __name__ == "__main__": unittest.main()