Skip to content

Commit

Permalink
fix: post merge improvement on table listener code and docstring (#5951)
Browse files Browse the repository at this point in the history
Some defensive coding
Code/docstring readability improvement

---------

Co-authored-by: Chip Kent <[email protected]>
  • Loading branch information
jmao-denver and chipkent authored Aug 16, 2024
1 parent 903abba commit 2f6530b
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ protected void propagateErrorDownstream(boolean fromProcess, @NotNull Throwable
// and continue with the original exception.
log.error().append("Python on_error callback failed: ").append(e2).endl();
}
} else {
log.error().append("Python on_error callback is None: ").append(ExceptionUtils.getStackTrace(error)).endl();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ public void onFailureInternal(Throwable originalException, Entry sourceEntry) {
// and continue with the original exception.
log.error().append("Python on_error callback failed: ").append(e).endl();
}
} else {
log.error().append("Python on_error callback is None: ")
.append(ExceptionUtils.getStackTrace(originalException)).endl();
}
super.onFailureInternal(originalException, sourceEntry);
}
Expand Down
30 changes: 14 additions & 16 deletions py/server/deephaven/table_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
_JListenerRecorder = jpy.get_type("io.deephaven.engine.table.impl.ListenerRecorder")
_JPythonMergedListenerAdapter = jpy.get_type("io.deephaven.integrations.python.PythonMergedListenerAdapter")

_DEFAULT_ON_ERROR_CALLBACK = lambda e : print(f"An error occurred during table update processing: {e}")

class TableUpdate(JObjectWrapper):
"""A TableUpdate object represents a table update event. It contains the added, removed, and modified rows in the
Expand Down Expand Up @@ -202,12 +203,9 @@ def on_error(self, e: Exception) -> None:
Args:
e (Exception): the exception that occurred during the listener's execution.
"""
print(f"An error occurred during listener execution: {self}, {e}")
print(f"An error occurred during table update processing: {self}, {e}")


def _default_on_error(e: Exception) -> None:
print(f"An error occurred during listener execution: {e}")

def _listener_wrapper(table: Table):
"""A decorator to wrap a user listener function or on_update method to receive the numpy-converted Table updates.
Expand All @@ -225,7 +223,6 @@ def wrapper(update, *args):

return decorator


def _wrap_listener_func(t: Table, listener: Callable[[TableUpdate, bool], None]):
n_params = len(signature(listener).parameters)
if n_params != 2:
Expand All @@ -248,6 +245,7 @@ def wrapper(e):

return wrapper


class TableListenerHandle(JObjectWrapper):
"""A handle to manage a table listener's lifecycle."""
j_object_type = _JPythonReplayListenerAdapter
Expand Down Expand Up @@ -287,12 +285,11 @@ def __init__(self, t: Table, listener: Union[Callable[[TableUpdate, bool], None]
and then add the result tables as dependencies to the listener so that they can be safely read in it.
on_error (Callable[[Exception], None]): a callback function to be invoked when an error occurs during the
listener's execution. It should only be set when the listener is a function, not when it is an instance
of TableListener. Defaults to None. When None, a default callback function will be provided that simply
of TableListener. When the listener is a TableListener, TableListener.on_error will be used.
Defaults to None. When None, a default callback function will be provided that simply
prints out the received exception. If the callback function itself raises an exception, the new exception
will be logged in the Deephaven server log and will not be further processed by the server.
Raises:
DHError
"""
Expand All @@ -313,7 +310,7 @@ def __init__(self, t: Table, listener: Union[Callable[[TableUpdate, bool], None]
if on_error:
on_error_callback = _error_callback_wrapper(on_error)
else:
on_error_callback = _error_callback_wrapper(_default_on_error)
on_error_callback = _error_callback_wrapper(_DEFAULT_ON_ERROR_CALLBACK)
else:
raise DHError(message="listener is neither callable nor TableListener object")

Expand Down Expand Up @@ -390,11 +387,11 @@ def listen(t: Table, listener: Union[Callable[[TableUpdate, bool], None], TableL
and then add the result tables as dependencies to the listener so that they can be safely read in it.
on_error (Callable[[Exception], None]): a callback function to be invoked when an error occurs during the
listener's execution. It should only be set when the listener is a function, not when it is an instance
of TableListener. Defaults to None. When None, a default callback function will be provided that simply
of TableListener. When the listener is a TableListener, TableListener.on_error will be used.
Defaults to None. When None, a default callback function will be provided that simply
prints out the received exception. If the callback function itself raises an exception, the new exception
will be logged in the Deephaven server log and will not be further processed by the server.
Returns:
a TableListenerHandle
Expand Down Expand Up @@ -452,7 +449,7 @@ def on_error(self, e: Exception) -> None:
Args:
e (Exception): the exception that occurred during the listener's execution.
"""
print(f"An error occurred during listener execution: {self}, {e}")
print(f"An error occurred during table update processing: {self}, {e}")


class MergedListenerHandle(JObjectWrapper):
Expand Down Expand Up @@ -499,11 +496,11 @@ def __init__(self, tables: Sequence[Table], listener: Union[Callable[[Dict[Table
and then add the result tables as dependencies to the listener so that they can be safely read in it.
on_error (Callable[[Exception], None]): a callback function to be invoked when an error occurs during the
listener's execution. It should only be set when the listener is a function, not when it is an instance
of MergedListener. Defaults to None. When None, a default callback function will be provided that simply
of MergedListener. When the listener is a MergedListener, MergedListener.on_error will be used.
Defaults to None. When None, a default callback function will be provided that simply
prints out the received exception. If the callback function itself raises an exception, the new exception
will be logged in the Deephaven server log and will not be further processed by the server.
Raises:
DHError
"""
Expand All @@ -525,7 +522,7 @@ def __init__(self, tables: Sequence[Table], listener: Union[Callable[[Dict[Table
if on_error:
on_error_callback = _error_callback_wrapper(on_error)
else:
on_error_callback = _error_callback_wrapper(_default_on_error)
on_error_callback = _error_callback_wrapper(_DEFAULT_ON_ERROR_CALLBACK)
else:
raise DHError(message="listener is neither callable nor MergedListener object")

Expand Down Expand Up @@ -625,7 +622,8 @@ def merged_listen(tables: Sequence[Table], listener: Union[Callable[[Dict[Table,
and then add the result tables as dependencies to the listener so that they can be safely read in it.
on_error (Callable[[Exception], None]): a callback function to be invoked when an error occurs during the
listener's execution. It should only be set when the listener is a function, not when it is an instance
of MergedListener. Defaults to None. When None, a default callback function will be provided that simply
of MergedListener. When the listener is a MergedListener, MergedListener.on_error will be used.
Defaults to None. When None, a default callback function will be provided that simply
prints out the received exception. If the callback function itself raises an exception, the new exception
will be logged in the Deephaven server log and will not be further processed by the server.
"""
Expand Down

0 comments on commit 2f6530b

Please sign in to comment.