Skip to content

Commit

Permalink
Use the Java TableUpdate interface for DI
Browse files Browse the repository at this point in the history
  • Loading branch information
jmao-denver committed Jul 10, 2024
1 parent ef536b7 commit 6abc8ac
Showing 1 changed file with 28 additions and 35 deletions.
63 changes: 28 additions & 35 deletions py/server/deephaven/table_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,9 @@ class TableUpdate(JObjectWrapper):

def __init__(self, table: Table, j_table_update: jpy.JType):
self.table = table
self.j_table_update = jpy.cast(j_table_update, _JTableUpdateImpl)
print(type(j_table_update), type(j_table_update).__mro__)
self.j_table_update = j_table_update
# make sure we always use the _JTableUpdate interface and not the implementations
self.j_table_update = jpy.cast(j_table_update, _JTableUpdate)

@property
def j_object(self) -> jpy.JType:
Expand All @@ -86,12 +87,12 @@ def added(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.ndarray]:
Returns:
a dict
"""
if not self.j_table_update.added:
if not self.j_table_update.added():
return {}

try:
return next(
_changes_to_numpy(table=self.table, cols=cols, row_set=self.j_table_update.added.asRowSet(),
_changes_to_numpy(table=self.table, cols=cols, row_set=self.j_table_update.added().asRowSet(),
chunk_size=None))
except StopIteration:
return {}
Expand All @@ -108,10 +109,10 @@ def added_chunks(self, chunk_size: int, cols: Union[str, List[str]] = None) -> G
Returns:
a generator
"""
if not self.j_table_update.added:
if not self.j_table_update.added():
return (_ for _ in ())

return _changes_to_numpy(table=self.table, cols=cols, row_set=self.j_table_update.added.asRowSet(),
return _changes_to_numpy(table=self.table, cols=cols, row_set=self.j_table_update.added().asRowSet(),
chunk_size=chunk_size)

def removed(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.ndarray]:
Expand All @@ -124,12 +125,12 @@ def removed(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.ndarray
Returns:
a dict
"""
if not self.j_table_update.removed:
if not self.j_table_update.removed():
return {}

try:
return next(
_changes_to_numpy(table=self.table, cols=cols, row_set=self.j_table_update.removed.asRowSet(),
_changes_to_numpy(table=self.table, cols=cols, row_set=self.j_table_update.removed().asRowSet(),
chunk_size=None, prev=True))
except StopIteration:
return {}
Expand All @@ -146,10 +147,10 @@ def removed_chunks(self, chunk_size: int, cols: Union[str, List[str]] = None) ->
Returns:
a generator
"""
if not self.j_table_update.removed:
if not self.j_table_update.removed():
return (_ for _ in ())

return _changes_to_numpy(table=self.table, cols=cols, row_set=self.j_table_update.removed.asRowSet(),
return _changes_to_numpy(table=self.table, cols=cols, row_set=self.j_table_update.removed().asRowSet(),
chunk_size=chunk_size, prev=True)

def modified(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.ndarray]:
Expand All @@ -162,12 +163,12 @@ def modified(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.ndarra
Returns:
a dict
"""
if not self.j_table_update.modified:
if not self.j_table_update.modified():
return {}

try:
return next(
_changes_to_numpy(self.table, cols=cols, row_set=self.j_table_update.modified.asRowSet(),
_changes_to_numpy(self.table, cols=cols, row_set=self.j_table_update.modified().asRowSet(),
chunk_size=None))
except StopIteration:
return {}
Expand All @@ -184,10 +185,10 @@ def modified_chunks(self, chunk_size: int, cols: Union[str, List[str]] = None) -
Returns:
a generator
"""
if not self.j_table_update.modified:
if not self.j_table_update.modified():
return (_ for _ in ())

return _changes_to_numpy(self.table, cols=cols, row_set=self.j_table_update.modified.asRowSet(),
return _changes_to_numpy(self.table, cols=cols, row_set=self.j_table_update.modified().asRowSet(),
chunk_size=chunk_size)

def modified_prev(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.ndarray]:
Expand All @@ -200,12 +201,12 @@ def modified_prev(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.n
Returns:
a dict
"""
if not self.j_table_update.modified:
if not self.j_table_update.modified():
return {}

try:
return next(
_changes_to_numpy(self.table, cols=cols, row_set=self.j_table_update.modified.asRowSet(),
_changes_to_numpy(self.table, cols=cols, row_set=self.j_table_update.modified().asRowSet(),
chunk_size=None, prev=True))
except StopIteration:
return {}
Expand All @@ -222,10 +223,10 @@ def modified_prev_chunks(self, chunk_size: int, cols: Union[str, List[str]] = No
Returns:
a generator
"""
if not self.j_table_update.modified:
if not self.j_table_update.modified():
return (_ for _ in ())

return _changes_to_numpy(self.table, cols=cols, row_set=self.j_table_update.modified.asRowSet(),
return _changes_to_numpy(self.table, cols=cols, row_set=self.j_table_update.modified().asRowSet(),
chunk_size=chunk_size, prev=True)

@property
Expand All @@ -235,7 +236,7 @@ def shifted(self):
@property
def modified_columns(self) -> List[str]:
"""The list of modified columns in this update."""
cols = self.j_table_update.modifiedColumnSet.dirtyColumnNames()
cols = self.j_table_update.modifiedColumnSet().dirtyColumnNames()

return list(cols) if cols else []

Expand Down Expand Up @@ -285,7 +286,7 @@ def _listener_wrapper(table: Table):
table (Table): the table to listen for updates.
"""

def decorator(listener: Callable):
def decorator(listener: Callable[[TableUpdate, bool], None]):
@wraps(listener)
def wrapper(update, *args):
t_update = TableUpdate(table=table, j_table_update=update)
Expand All @@ -296,7 +297,7 @@ def wrapper(update, *args):
return decorator


def _wrap_listener_func(t: Table, listener: Callable):
def _wrap_listener_func(t: Table, listener: Callable[[TableUpdate, bool], None]):
n_params = len(signature(listener).parameters)
if n_params != 2:
raise ValueError("listener function must have 2 (update, is_replay) parameters.")
Expand All @@ -315,7 +316,7 @@ class TableListenerHandle(JObjectWrapper):
"""A handle to manage a table listener's lifecycle."""
j_object_type = _JPythonReplayListenerAdapter

def __init__(self, t: Table, listener: Union[Callable, TableListener], description: str = None,
def __init__(self, t: Table, listener: Union[Callable[[TableUpdate, bool], None], TableListener], description: str = None,
dependencies: Union[Table, Sequence[Table]] = None):
"""Creates a new table listener handle with dependencies.
Expand All @@ -333,7 +334,7 @@ def __init__(self, t: Table, listener: Union[Callable, TableListener], descripti
Args:
t (Table): table to listen to
listener (Union[Callable, TableListener]): listener for table changes
listener (Union[Callable[[TableUpdate, bool], None], TableListener]): listener for table changes
description (str, optional): description for the UpdatePerformanceTracker to append to the listener's entry
description, default is None
dependencies (Union[Table, Sequence[Table]]): tables that must be satisfied before the listener's execution.
Expand Down Expand Up @@ -410,7 +411,7 @@ def stop(self) -> None:
self.started = False


def listen(t: Table, listener: Union[Callable, TableListener], description: str = None, do_replay: bool = False,
def listen(t: Table, listener: Union[Callable[[TableUpdate, bool], None], TableListener], description: str = None, do_replay: bool = False,
replay_lock: Literal["shared", "exclusive"] = "shared", dependencies: Union[Table, Sequence[Table]] = None)\
-> TableListenerHandle:
"""This is a convenience function that creates a TableListenerHandle object and immediately starts it to listen
Expand All @@ -423,7 +424,7 @@ def listen(t: Table, listener: Union[Callable, TableListener], description: str
Args:
t (Table): table to listen to
listener (Union[Callable, TableListener]): listener for table changes
listener (Union[Callable[[TableUpdate, bool], None], TableListener]): listener for table changes
description (str, optional): description for the UpdatePerformanceTracker to append to the listener's entry
description, default is None
do_replay (bool): whether to replay the initial snapshot of the table, default is False
Expand Down Expand Up @@ -470,14 +471,6 @@ def __init__(self, table: Table):
self.j_listener_recorder = _JListenerRecorder("Python Wrapped Listener recorder", table.j_table, None)
self.table = table

def are_recorded_updates_valid(self) -> bool:
"""Checks if the recorded updates are valid for the current update cycle.
Returns:
a bool
"""
return self.j_listener_recorder.recordedVariablesAreValid()

def table_update(self) -> Optional[TableUpdate]:
"""Gets the table update from the listener recorder. If there is no update in the current update graph cycle,
returns None.
Expand Down Expand Up @@ -570,7 +563,7 @@ def start(self) -> None:
if self.started:
raise RuntimeError("Attempting to start an already started merged listener..")

with update_graph.shared_lock(self.listener_recorders[0].table.update_graph):
with update_graph.auto_locking_ctx(self.listener_recorders[0].table.update_graph):
for lr in self.listener_recorders:
lr.table.j_table.addUpdateListener(lr.j_listener_recorder)
self.started = True
Expand All @@ -580,7 +573,7 @@ def stop(self) -> None:
if not self.started:
return

with update_graph.shared_lock(self.listener_recorders[0].table.update_graph):
with update_graph.auto_locking_ctx(self.listener_recorders[0].table.update_graph):
for lr in self.listener_recorders:
lr.table.j_table.removeUpdateListener(lr.j_listener_recorder)
self.started = False
Expand Down

0 comments on commit 6abc8ac

Please sign in to comment.