Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api)!: Support merged listening on multiple tables #5672

Merged
merged 15 commits into from
Jul 15, 2024
50 changes: 6 additions & 44 deletions py/server/deephaven/table_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,42 +187,6 @@ def modified_columns(self) -> List[str]:
return list(cols) if cols else []


def _do_locked(ug: Union[UpdateGraph, Table], f: Callable, lock_type: Literal["shared","exclusive"] = "shared") -> \
None:
"""Executes a function while holding the UpdateGraph (UG) lock. Holding the UG lock
ensures that the contents of a table will not change during a computation, but holding
the lock also prevents table updates from happening. The lock should be held for as little
time as possible.

Args:
ug (Union[UpdateGraph, Table]): The Update Graph (UG) or a table-like object.
f (Callable): callable to execute while holding the UG lock, could be function or an object with an 'apply'
attribute which is callable
lock_type (str): UG lock type, valid values are "exclusive" and "shared". "exclusive" allows only a single
reader or writer to hold the lock. "shared" allows multiple readers or a single writer to hold the lock.
Raises:
ValueError
"""
if isinstance(ug, Table):
ug = ug.update_graph

if lock_type == "exclusive":
with update_graph.exclusive_lock(ug):
f()
elif lock_type == "shared":
with update_graph.shared_lock(ug):
f()
else:
raise ValueError(f"Unsupported lock type: lock_type={lock_type}")

def _locked(ug: Union[UpdateGraph, Table], lock_type: Literal["shared", "exclusive"] = "shared") -> Generator:
if lock_type == "exclusive":
return update_graph.exclusive_lock(ug)
elif lock_type == "shared":
return update_graph.shared_lock(ug)
else:
raise ValueError(f"Unsupported lock type: lock_type={lock_type}")

class TableListener(ABC):
"""An abstract table listener class that should be subclassed by any user table listener class."""

Expand Down Expand Up @@ -329,12 +293,11 @@ def __init__(self, t: Table, listener: Union[Callable[[TableUpdate, bool], None]
def j_object(self) -> jpy.JType:
return self.listener_adapter

def start(self, do_replay: bool = False, replay_lock: Literal["shared", "exclusive"] = "shared") -> None:
def start(self, do_replay: bool = False) -> None:
"""Start the listener by registering it with the table and listening for updates.

Args:
do_replay (bool): whether to replay the initial snapshot of the table, default is False
replay_lock (str): the lock type used during replay, default is 'shared', can also be 'exclusive'.

Raises:
DHError
Expand All @@ -358,14 +321,12 @@ def stop(self) -> None:

if not self.started:
return
with update_graph.auto_locking_ctx(self.t):
self.t.j_table.removeUpdateListener(self.listener_adapter)
self.t.j_table.removeUpdateListener(self.listener_adapter)
self.started = False


def listen(t: Table, listener: Union[Callable[[TableUpdate, bool], None], TableListener], description: str = None, do_replay: bool = False,
replay_lock: Literal["shared", "exclusive"] = "shared", dependencies: Union[Table, Sequence[Table]] = None)\
-> TableListenerHandle:
dependencies: Union[Table, Sequence[Table]] = None) -> TableListenerHandle:
"""This is a convenience function that creates a TableListenerHandle object and immediately starts it to listen
for table updates.

Expand All @@ -380,7 +341,6 @@ def listen(t: Table, listener: Union[Callable[[TableUpdate, bool], None], TableL
description (str, optional): description for the UpdatePerformanceTracker to append to the listener's entry
description, default is None
do_replay (bool): whether to replay the initial snapshot of the table, default is False
replay_lock (str): the lock type used during replay, default is 'shared', can also be 'exclusive'
dependencies (Union[Table, Sequence[Table]]): tables that must be satisfied before the listener's execution.
A refreshing table is considered to be satisfied if all possible updates to the table have been processed
in the current update graph cycle. A static table is always considered to be satisfied. If a specified
Expand All @@ -401,7 +361,7 @@ def listen(t: Table, listener: Union[Callable[[TableUpdate, bool], None], TableL
"""
table_listener_handle = TableListenerHandle(t=t, dependencies=dependencies, listener=listener,
description=description)
table_listener_handle.start(do_replay=do_replay, replay_lock=replay_lock)
table_listener_handle.start(do_replay=do_replay)
return table_listener_handle


Expand Down Expand Up @@ -532,6 +492,7 @@ def start(self, do_replay: bool = False) -> None:
raise RuntimeError("Attempting to start an already started merged listener..")

try:
# self.tables[0] is guaranteed to be a refreshing table
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could actually make this more elegant and future-proof if you:

  1. Exposed is_refreshing and update_graph as properties on your MergedListener wrapper (MergedListenerHandle). The former is presumably always true, and the latter can delegate to the wrapper Java Object's getUpdateGraph().
  2. Used self as the object to pass to auto_locking_ctx.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's probably more elegant code but I am OK with the current code as it is more 'explicit'.

with update_graph.auto_locking_ctx(self.tables[0]):
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
if do_replay:
j_replay_updates = self.merged_listener_adapter.currentRowsAsUpdates()
Expand All @@ -555,6 +516,7 @@ def stop(self) -> None:
if not self.started:
return

# self.tables[0] is guaranteed to be a refreshing table
rcaudy marked this conversation as resolved.
Show resolved Hide resolved
with update_graph.auto_locking_ctx(self.tables[0]):
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
for lr in self.listener_recorders:
lr.table.j_table.removeUpdateListener(lr.j_listener_recorder)
Expand Down
4 changes: 2 additions & 2 deletions py/server/tests/test_table_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,14 +297,14 @@ def listener_func(update, is_replay):

with self.subTest("do_replay=True, replay_lock='exclusive'"):
table_update_recorder = TableUpdateRecorder(self.test_table)
table_listener_handle.start(do_replay=True, replay_lock="exclusive")
table_listener_handle.start(do_replay=True)
ensure_ugp_cycles(table_update_recorder, cycles=3)
table_listener_handle.stop()
self.check_update_recorder(table_update_recorder, has_replay=True, has_added=True, has_removed=True, has_modified=False)

with self.subTest("do_replay=True, replay_lock='shared'"):
table_update_recorder = TableUpdateRecorder(self.test_table)
table_listener_handle.start(do_replay=True, replay_lock="shared") # noqa
table_listener_handle.start(do_replay=True) # noqa
ensure_ugp_cycles(table_update_recorder, cycles=3)
table_listener_handle.stop()
self.check_update_recorder(table_update_recorder, has_replay=True, has_added=True, has_removed=True, has_modified=False)
Expand Down
Loading