diff --git a/py/server/deephaven/table_listener.py b/py/server/deephaven/table_listener.py index 11d44eba128..7b46d4b90c8 100644 --- a/py/server/deephaven/table_listener.py +++ b/py/server/deephaven/table_listener.py @@ -69,8 +69,9 @@ class TableUpdate(JObjectWrapper): def __init__(self, table: Table, j_table_update: jpy.JType): self.table = table - self.j_table_update = jpy.cast(j_table_update, _JTableUpdateImpl) - print(type(j_table_update), type(j_table_update).__mro__) + self.j_table_update = j_table_update + # make sure we always use the _JTableUpdate interface and not the implementations + self.j_table_update = jpy.cast(j_table_update, _JTableUpdate) @property def j_object(self) -> jpy.JType: @@ -86,12 +87,12 @@ def added(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.ndarray]: Returns: a dict """ - if not self.j_table_update.added: + if not self.j_table_update.added(): return {} try: return next( - _changes_to_numpy(table=self.table, cols=cols, row_set=self.j_table_update.added.asRowSet(), + _changes_to_numpy(table=self.table, cols=cols, row_set=self.j_table_update.added().asRowSet(), chunk_size=None)) except StopIteration: return {} @@ -108,10 +109,10 @@ def added_chunks(self, chunk_size: int, cols: Union[str, List[str]] = None) -> G Returns: a generator """ - if not self.j_table_update.added: + if not self.j_table_update.added(): return (_ for _ in ()) - return _changes_to_numpy(table=self.table, cols=cols, row_set=self.j_table_update.added.asRowSet(), + return _changes_to_numpy(table=self.table, cols=cols, row_set=self.j_table_update.added().asRowSet(), chunk_size=chunk_size) def removed(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.ndarray]: @@ -124,12 +125,12 @@ def removed(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.ndarray Returns: a dict """ - if not self.j_table_update.removed: + if not self.j_table_update.removed(): return {} try: return next( - _changes_to_numpy(table=self.table, cols=cols, row_set=self.j_table_update.removed.asRowSet(), + _changes_to_numpy(table=self.table, cols=cols, row_set=self.j_table_update.removed().asRowSet(), chunk_size=None, prev=True)) except StopIteration: return {} @@ -146,10 +147,10 @@ def removed_chunks(self, chunk_size: int, cols: Union[str, List[str]] = None) -> Returns: a generator """ - if not self.j_table_update.removed: + if not self.j_table_update.removed(): return (_ for _ in ()) - return _changes_to_numpy(table=self.table, cols=cols, row_set=self.j_table_update.removed.asRowSet(), + return _changes_to_numpy(table=self.table, cols=cols, row_set=self.j_table_update.removed().asRowSet(), chunk_size=chunk_size, prev=True) def modified(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.ndarray]: @@ -162,12 +163,12 @@ def modified(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.ndarra Returns: a dict """ - if not self.j_table_update.modified: + if not self.j_table_update.modified(): return {} try: return next( - _changes_to_numpy(self.table, cols=cols, row_set=self.j_table_update.modified.asRowSet(), + _changes_to_numpy(self.table, cols=cols, row_set=self.j_table_update.modified().asRowSet(), chunk_size=None)) except StopIteration: return {} @@ -184,10 +185,10 @@ def modified_chunks(self, chunk_size: int, cols: Union[str, List[str]] = None) - Returns: a generator """ - if not self.j_table_update.modified: + if not self.j_table_update.modified(): return (_ for _ in ()) - return _changes_to_numpy(self.table, cols=cols, row_set=self.j_table_update.modified.asRowSet(), + return _changes_to_numpy(self.table, cols=cols, row_set=self.j_table_update.modified().asRowSet(), chunk_size=chunk_size) def modified_prev(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.ndarray]: @@ -200,12 +201,12 @@ def modified_prev(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.n Returns: a dict """ - if not self.j_table_update.modified: + if not self.j_table_update.modified(): return {} try: return next( - _changes_to_numpy(self.table, cols=cols, row_set=self.j_table_update.modified.asRowSet(), + _changes_to_numpy(self.table, cols=cols, row_set=self.j_table_update.modified().asRowSet(), chunk_size=None, prev=True)) except StopIteration: return {} @@ -222,10 +223,10 @@ def modified_prev_chunks(self, chunk_size: int, cols: Union[str, List[str]] = No Returns: a generator """ - if not self.j_table_update.modified: + if not self.j_table_update.modified(): return (_ for _ in ()) - return _changes_to_numpy(self.table, cols=cols, row_set=self.j_table_update.modified.asRowSet(), + return _changes_to_numpy(self.table, cols=cols, row_set=self.j_table_update.modified().asRowSet(), chunk_size=chunk_size, prev=True) @property @@ -235,7 +236,7 @@ def shifted(self): @property def modified_columns(self) -> List[str]: """The list of modified columns in this update.""" - cols = self.j_table_update.modifiedColumnSet.dirtyColumnNames() + cols = self.j_table_update.modifiedColumnSet().dirtyColumnNames() return list(cols) if cols else [] @@ -285,7 +286,7 @@ def _listener_wrapper(table: Table): table (Table): the table to listen for updates. """ - def decorator(listener: Callable): + def decorator(listener: Callable[[TableUpdate, bool], None]): @wraps(listener) def wrapper(update, *args): t_update = TableUpdate(table=table, j_table_update=update) @@ -296,7 +297,7 @@ def wrapper(update, *args): return decorator -def _wrap_listener_func(t: Table, listener: Callable): +def _wrap_listener_func(t: Table, listener: Callable[[TableUpdate, bool], None]): n_params = len(signature(listener).parameters) if n_params != 2: raise ValueError("listener function must have 2 (update, is_replay) parameters.") @@ -315,7 +316,7 @@ class TableListenerHandle(JObjectWrapper): """A handle to manage a table listener's lifecycle.""" j_object_type = _JPythonReplayListenerAdapter - def __init__(self, t: Table, listener: Union[Callable, TableListener], description: str = None, + def __init__(self, t: Table, listener: Union[Callable[[TableUpdate, bool], None], TableListener], description: str = None, dependencies: Union[Table, Sequence[Table]] = None): """Creates a new table listener handle with dependencies. @@ -333,7 +334,7 @@ def __init__(self, t: Table, listener: Union[Callable, TableListener], descripti Args: t (Table): table to listen to - listener (Union[Callable, TableListener]): listener for table changes + listener (Union[Callable[[TableUpdate, bool], None], TableListener]): listener for table changes description (str, optional): description for the UpdatePerformanceTracker to append to the listener's entry description, default is None dependencies (Union[Table, Sequence[Table]]): tables that must be satisfied before the listener's execution. @@ -410,7 +411,7 @@ def stop(self) -> None: self.started = False -def listen(t: Table, listener: Union[Callable, TableListener], description: str = None, do_replay: bool = False, +def listen(t: Table, listener: Union[Callable[[TableUpdate, bool], None], TableListener], description: str = None, do_replay: bool = False, replay_lock: Literal["shared", "exclusive"] = "shared", dependencies: Union[Table, Sequence[Table]] = None)\ -> TableListenerHandle: """This is a convenience function that creates a TableListenerHandle object and immediately starts it to listen @@ -423,7 +424,7 @@ def listen(t: Table, listener: Union[Callable, TableListener], description: str Args: t (Table): table to listen to - listener (Union[Callable, TableListener]): listener for table changes + listener (Union[Callable[[TableUpdate, bool], None], TableListener]): listener for table changes description (str, optional): description for the UpdatePerformanceTracker to append to the listener's entry description, default is None do_replay (bool): whether to replay the initial snapshot of the table, default is False @@ -470,14 +471,6 @@ def __init__(self, table: Table): self.j_listener_recorder = _JListenerRecorder("Python Wrapped Listener recorder", table.j_table, None) self.table = table - def are_recorded_updates_valid(self) -> bool: - """Checks if the recorded updates are valid for the current update cycle. - - Returns: - a bool - """ - return self.j_listener_recorder.recordedVariablesAreValid() - def table_update(self) -> Optional[TableUpdate]: """Gets the table update from the listener recorder. If there is no update in the current update graph cycle, returns None. @@ -570,7 +563,7 @@ def start(self) -> None: if self.started: raise RuntimeError("Attempting to start an already started merged listener..") - with update_graph.shared_lock(self.listener_recorders[0].table.update_graph): + with update_graph.auto_locking_ctx(self.listener_recorders[0].table.update_graph): for lr in self.listener_recorders: lr.table.j_table.addUpdateListener(lr.j_listener_recorder) self.started = True @@ -580,7 +573,7 @@ def stop(self) -> None: if not self.started: return - with update_graph.shared_lock(self.listener_recorders[0].table.update_graph): + with update_graph.auto_locking_ctx(self.listener_recorders[0].table.update_graph): for lr in self.listener_recorders: lr.table.j_table.removeUpdateListener(lr.j_listener_recorder) self.started = False