diff --git a/py/server/deephaven/table_listener.py b/py/server/deephaven/table_listener.py index 730922de7dc..445a26dea7c 100644 --- a/py/server/deephaven/table_listener.py +++ b/py/server/deephaven/table_listener.py @@ -576,7 +576,7 @@ def stop(self) -> None: self.started = False -def merged_listen(listener_recorders: Sequence[ListenerRecorder], listener: Union[Callable, TableListener], +def merged_listen(listener_recorders: Sequence[ListenerRecorder], listener: Union[Callable, MergedListener], description: str = None, dependencies: Union[Table, Sequence[Table]] = None) -> MergedListenerHandle: """This is a convenience function that creates a MergedTableListenerHandle object and immediately starts it to listen diff --git a/py/server/tests/test_table_listener.py b/py/server/tests/test_table_listener.py index 563dba7f2ce..1fb6b31f7f3 100644 --- a/py/server/tests/test_table_listener.py +++ b/py/server/tests/test_table_listener.py @@ -15,7 +15,7 @@ from deephaven.jcompat import to_sequence from deephaven.table import Table from deephaven.table_listener import listen, TableListener, TableListenerHandle, MergedListener, TableUpdate, \ - ListenerRecorder, MergedListenerHandle + ListenerRecorder, MergedListenerHandle, merged_listen from deephaven.execution_context import get_exec_ctx from deephaven.update_graph import exclusive_lock from tests.testbase import BaseTestCase @@ -329,7 +329,6 @@ def listener_func(update, is_replay): table_listener_handle = TableListenerHandle(self.test_table, listener_func, dependencies=dep_table) def test_merged_listener_obj(self): - tur = TableUpdateRecorder() t1 = time_table("PT1s").update(["X=i % 11"]) t2 = time_table("PT2s").update(["Y=i % 8"]) t3 = time_table("PT3s").update(["Z=i % 5"]) @@ -341,17 +340,29 @@ def process(self) -> None: tur.record(self.listener_recorders[i].table_update()) tml = TestMergedListener() - mlh = MergedListenerHandle([ListenerRecorder(t) for t in [t1, t2, t3]], tml) - mlh.start() - ensure_ugp_cycles(tur, cycles=3) - mlh.stop() - mlh.start() - ensure_ugp_cycles(tur, cycles=8) - mlh.stop() - self.assertGreaterEqual(len(tur.replays), 8) + with self.subTest("Direct Handle"): + tur = TableUpdateRecorder() + mlh = MergedListenerHandle([ListenerRecorder(t) for t in [t1, t2, t3]], tml) + mlh.start() + ensure_ugp_cycles(tur, cycles=3) + mlh.stop() + mlh.start() + ensure_ugp_cycles(tur, cycles=6) + mlh.stop() + self.assertGreaterEqual(len(tur.replays), 6) + + with self.subTest("Convenience function"): + tur = TableUpdateRecorder() + mlh = merged_listen([ListenerRecorder(t) for t in [t1, t2, t3]], tml) + ensure_ugp_cycles(tur, cycles=3) + mlh.stop() + mlh.start() + ensure_ugp_cycles(tur, cycles=6) + mlh.stop() + self.assertGreaterEqual(len(tur.replays), 6) + def test_merged_listener_func(self): - tur = TableUpdateRecorder() t1 = time_table("PT1s").update(["X=i % 11"]) t2 = time_table("PT2s").update(["Y=i % 8"]) t3 = time_table("PT3s").update(["Z=i % 5"]) @@ -362,18 +373,33 @@ def test_ml_func() -> None: if listener_recorders[i].table_update(): tur.record(listener_recorders[i].table_update()) - mlh = MergedListenerHandle(listener_recorders, test_ml_func) - mlh.start() - ensure_ugp_cycles(tur, cycles=3) - mlh.stop() - mlh.start() - ensure_ugp_cycles(tur, cycles=8) - mlh.stop() - self.assertGreaterEqual(len(tur.replays), 8) + with self.subTest("Direct Handle"): + tur = TableUpdateRecorder() + mlh = MergedListenerHandle(listener_recorders, test_ml_func) + mlh.start() + ensure_ugp_cycles(tur, cycles=3) + mlh.stop() + mlh.start() + ensure_ugp_cycles(tur, cycles=6) + mlh.stop() + self.assertGreaterEqual(len(tur.replays), 6) + + with self.subTest("Convenience function"): + tur = TableUpdateRecorder() + mlh = merged_listen(listener_recorders, test_ml_func) + ensure_ugp_cycles(tur, cycles=3) + mlh.stop() + mlh.start() + ensure_ugp_cycles(tur, cycles=6) + mlh.stop() + self.assertGreaterEqual(len(tur.replays), 6) def test_merged_listener_with_deps(self): ... + def test_merged_listener_with_deps_error(self): + ... + if __name__ == "__main__": unittest.main()