Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api)!: Support merged listening on multiple tables #5672

Merged
merged 15 commits into from
Jul 15, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package io.deephaven.integrations.python;

import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.RowSetShiftData;
import io.deephaven.engine.table.ModifiedColumnSet;
Expand All @@ -15,6 +14,7 @@
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.annotations.ScriptApi;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jpy.PyObject;
Expand All @@ -31,6 +31,7 @@
* The Python listener object must be a Python MergedListener instance that provides a "_process" method implementation
* with no argument.
*/
@ScriptApi
public class PythonMergedListenerAdapter extends MergedListener {
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
private final PyObject pyCallable;

Expand Down Expand Up @@ -74,20 +75,18 @@ public static PythonMergedListenerAdapter create(
}
}

public void replay() {
final RowSet emptyRowSet = RowSetFactory.empty();
final RowSetShiftData emptyShift = RowSetShiftData.EMPTY;
final ModifiedColumnSet emptyColumnSet = ModifiedColumnSet.EMPTY;

ArrayList<TableUpdate> updates = new ArrayList<>();
public ArrayList<TableUpdate> currentRowsAsUpdates() {
final ArrayList<TableUpdate> updates = new ArrayList<>();
for (ListenerRecorder recorder : getRecorders()) {
final TableUpdate update =
new TableUpdateImpl(recorder.getParent().getRowSet(), emptyRowSet, emptyRowSet, emptyShift,
emptyColumnSet);
final TableUpdate update = new TableUpdateImpl(
recorder.getParent().getRowSet().copy(),
RowSetFactory.empty(),
RowSetFactory.empty(),
RowSetShiftData.EMPTY,
ModifiedColumnSet.EMPTY);
updates.add(update);
}

pyCallable.call("__call__", updates);
return updates;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,6 @@ public abstract class MergedListener extends LivenessArtifact implements Notific

private final UpdateGraph updateGraph;

jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
protected Iterable<? extends ListenerRecorder> getRecorders() {
return recorders;
}

private final Iterable<? extends ListenerRecorder> recorders;
private final Iterable<NotificationQueue.Dependency> dependencies;
private final String listenerDescription;
Expand Down Expand Up @@ -96,6 +92,10 @@ public UpdateGraph getUpdateGraph() {
return updateGraph;
}

/**
 * Give subclasses and collaborating adapters read access to the listener recorders
 * backing this merged listener (the {@code recorders} field).
 *
 * @return the recorders this merged listener aggregates
 */
protected Iterable<? extends ListenerRecorder> getRecorders() {
    return recorders;
}

public final void notifyOnUpstreamError(
@NotNull final Throwable upstreamError, @Nullable final TableListener.Entry errorSourceEntry) {
notifyInternal(upstreamError, errorSourceEntry);
Expand Down
70 changes: 33 additions & 37 deletions py/server/deephaven/table_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,13 @@ def _do_locked(ug: Union[UpdateGraph, Table], f: Callable, lock_type: Literal["s
else:
raise ValueError(f"Unsupported lock type: lock_type={lock_type}")

def _locked(ug: Union[UpdateGraph, Table], lock_type: Literal["shared", "exclusive"] = "shared") -> Generator:
    """Return a context manager holding the requested lock on the given update graph.

    Args:
        ug (Union[UpdateGraph, Table]): the update graph, or a table whose update graph is to be locked
        lock_type (str): the lock type, either 'shared' (default) or 'exclusive'

    Returns:
        a context manager from update_graph.exclusive_lock/shared_lock

    Raises:
        ValueError: if lock_type is neither 'shared' nor 'exclusive'
    """
    if lock_type == "exclusive":
        return update_graph.exclusive_lock(ug)
    elif lock_type == "shared":
        return update_graph.shared_lock(ug)
    else:
        raise ValueError(f"Unsupported lock type: lock_type={lock_type}")

class TableListener(ABC):
"""An abstract table listener class that should be subclassed by any user table listener class."""
Expand Down Expand Up @@ -298,6 +305,9 @@ def __init__(self, t: Table, listener: Union[Callable[[TableUpdate, bool], None]
Raises:
DHError
"""
if not t.is_refreshing:
raise DHError(message="table must be a refreshing table.")
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved

self.t = t
self.description = description
self.dependencies = to_sequence(dependencies)
Expand Down Expand Up @@ -333,18 +343,13 @@ def start(self, do_replay: bool = False, replay_lock: Literal["shared", "exclusi
raise RuntimeError("Attempting to start an already started listener..")

try:
def _start():
with update_graph.auto_locking_ctx(self.t):
if do_replay:
self.listener_adapter.replay()

self.t.j_table.addUpdateListener(self.listener_adapter)

if do_replay:
_do_locked(self.t, _start, lock_type=replay_lock)
else:
_start()
except Exception as e:
raise DHError(e, "failed to listen to the table changes.") from e
raise DHError(e, "failed to listen to the table changes.") from e

self.started = True

Expand All @@ -353,7 +358,8 @@ def stop(self) -> None:

if not self.started:
return
self.t.j_table.removeUpdateListener(self.listener_adapter)
with update_graph.auto_locking_ctx(self.t):
self.t.j_table.removeUpdateListener(self.listener_adapter)
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
self.started = False


Expand Down Expand Up @@ -509,23 +515,15 @@ def __init__(self, tables: Sequence[Table], listener: Union[Callable[[Dict[Table
except Exception as e:
raise DHError(e, "failed to create a merged listener adapter.") from e

def _process(self, replay_updates: Optional[jpy.JType] = None) -> None:
"""Process the table updates from the listener recorders.

Args:
replay_updates (Optional[jpy.JType]): the replay updates, only provided during replay.
"""
if replay_updates:
self.listener({t: TableUpdate(t, tu) for t, tu in zip(self.tables, j_list_to_list(replay_updates))}, True)
else:
self.listener({lr.table: lr.table_update() for lr in self.listener_recorders}, False)
def _process(self) -> None:
    """Process the table updates from the listener recorders.

    Builds a dict mapping each recorder's table to its current TableUpdate and
    invokes the user listener with is_replay=False; live updates only — snapshot
    replay is performed separately when the handle is started.
    """
    self.listener({lr.table: lr.table_update() for lr in self.listener_recorders}, False)
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved

def start(self, do_replay: bool = False, replay_lock: Literal["shared", "exclusive"] = "shared") -> None:
def start(self, do_replay: bool = False) -> None:
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
"""Start the listener by registering it with the tables and listening for updates.

Args:
do_replay (bool): whether to replay the initial snapshots of the tables, default is False
replay_lock (str): the lock type used during replay, default is 'shared', can also be 'exclusive'.

Raises:
DHError
Expand All @@ -534,19 +532,18 @@ def start(self, do_replay: bool = False, replay_lock: Literal["shared", "exclusi
raise RuntimeError("Attempting to start an already started merged listener..")

try:
def _start():
with update_graph.auto_locking_ctx(self.tables[0]):
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
if do_replay:
self.merged_listener_adapter.replay()

with update_graph.auto_locking_ctx(self.tables[0].update_graph):
for lr in self.listener_recorders:
lr.table.j_table.addUpdateListener(lr.j_listener_recorder)

if do_replay:
_do_locked(self.tables[0].update_graph, _start, lock_type=replay_lock)
else:
_start()

j_replay_updates = self.merged_listener_adapter.currentRowsAsUpdates()
replay_updates = {t: TableUpdate(t, tu) for t, tu in zip(self.tables, j_list_to_list(j_replay_updates))}
try:
self.listener(replay_updates, True)
finally:
for replay_update in replay_updates.values():
replay_update.j_object.release()

for lr in self.listener_recorders:
lr.table.j_table.addUpdateListener(lr.j_listener_recorder)
except Exception as e:
raise DHError(e, "failed to listen to the table changes.") from e

Expand All @@ -558,15 +555,15 @@ def stop(self) -> None:
if not self.started:
return

with update_graph.auto_locking_ctx(self.listener_recorders[0].table.update_graph):
with update_graph.auto_locking_ctx(self.tables[0]):
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
for lr in self.listener_recorders:
lr.table.j_table.removeUpdateListener(lr.j_listener_recorder)
self.started = False


def merged_listen(tables: Sequence[Table], listener: Union[Callable[[Dict[Table, TableUpdate]], None], MergedListener],
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
do_replay: bool = False, replay_lock: Literal["shared", "exclusive"] = "shared",
description: str = None, dependencies: Union[Table, Sequence[Table]] = None) -> MergedListenerHandle:
do_replay: bool = False, description: str = None, dependencies: Union[Table, Sequence[Table]] = None)\
-> MergedListenerHandle:
"""This is a convenience function that creates a MergedListenerHandle object and immediately starts it to
listen for table updates.

Expand All @@ -582,7 +579,6 @@ def merged_listen(tables: Sequence[Table], listener: Union[Callable[[Dict[Table,
description (str, optional): description for the UpdatePerformanceTracker to append to the listener's entry
description, default is None
do_replay (bool): whether to replay the initial snapshots of the tables, default is False
replay_lock (str): the lock type used during replay, default is 'shared', can also be 'exclusive'
dependencies (Union[Table, Sequence[Table]]): tables that must be satisfied before the listener's execution.
A refreshing table is considered to be satisfied if all possible updates to the table have been processed
in the current update graph cycle. A static table is always considered to be satisfied. If a specified
Expand All @@ -597,5 +593,5 @@ def merged_listen(tables: Sequence[Table], listener: Union[Callable[[Dict[Table,
"""
merged_listener_handle = MergedListenerHandle(tables=tables, listener=listener,
description=description, dependencies=dependencies)
merged_listener_handle.start(do_replay=do_replay, replay_lock=replay_lock)
merged_listener_handle.start(do_replay=do_replay)
return merged_listener_handle
8 changes: 4 additions & 4 deletions py/server/tests/test_table_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def __init__(self, table: Optional[Table] = None, chunk_size: int = None, cols:
def record(self, update: TableUpdate, is_replay: bool):
if not update:
return

if self.chunk_size is None:
self.added.append(update.added())
self.removed.append(update.removed())
Expand Down Expand Up @@ -457,7 +457,7 @@ def on_update(self, updates: Dict[Table, TableUpdate], is_replay: bool) -> None:
mlh = merged_listen([t1, t2, t3], tml, do_replay=True)
ensure_ugp_cycles(tur, cycles=3)
mlh.stop()
mlh.start(do_replay=True, replay_lock="exclusive")
mlh.start(do_replay=True)
ensure_ugp_cycles(tur, cycles=6)
mlh.stop()
self.assertGreaterEqual(len(tur.replays), 6)
Expand All @@ -469,10 +469,10 @@ def test_ml_func(updates: Dict[Table, TableUpdate], is_replay: bool) -> None:
with self.subTest("Direct Handle - replay"):
tur = TableUpdateRecorder()
mlh = MergedListenerHandle([t1, t2, t3], test_ml_func)
mlh.start(do_replay=True, replay_lock="shared")
mlh.start(do_replay=True)
ensure_ugp_cycles(tur, cycles=3)
mlh.stop()
mlh.start(do_replay=True, replay_lock="exclusive")
mlh.start(do_replay=True)
ensure_ugp_cycles(tur, cycles=6)
mlh.stop()
self.assertGreaterEqual(len(tur.replays), 6)
Expand Down
6 changes: 3 additions & 3 deletions py/server/tests/test_udf_scalar_args.py
Original file line number Diff line number Diff line change
Expand Up @@ -609,9 +609,9 @@ def f(p1: float, p2: np.float64) -> bool:
dv = 0.05
with warnings.catch_warnings(record=True) as w:
t = empty_table(10).update("X = f(dv, dv)")
self.assertEqual(w[-1].category, UserWarning)
self.assertRegex(str(w[-1].message), "numpy scalar type.*is used")
self.assertEqual(10, t.to_string().count("true"))
self.assertEqual(w[-1].category, UserWarning)
self.assertRegex(str(w[-1].message), "numpy scalar type.*is used")
self.assertEqual(10, t.to_string().count("true"))


if __name__ == "__main__":
Expand Down
Loading