From e919d5223ff807d079a32ea1fbd16b7fc7e42200 Mon Sep 17 00:00:00 2001 From: Jianfeng Mao <4297243+jmao-denver@users.noreply.github.com> Date: Thu, 15 Aug 2024 17:07:31 -0600 Subject: [PATCH] feat: Support on-error callback for Py listeners (#5929) Fixes #5809 --------- Co-authored-by: Ryan Caudy --- Integrations/build.gradle | 1 + .../python/PythonMergedListenerAdapter.java | 39 ++- .../python/PythonReplayListenerAdapter.java | 58 +++- .../impl/InstrumentedTableListenerBase.java | 2 +- .../engine/table/impl/MergedListener.java | 12 + py/server/deephaven/table_listener.py | 111 +++++-- py/server/tests/test_table_listener.py | 282 ++++++++++++++++++ 7 files changed, 468 insertions(+), 37 deletions(-) diff --git a/Integrations/build.gradle b/Integrations/build.gradle index 99abec47535..ac16784d585 100644 --- a/Integrations/build.gradle +++ b/Integrations/build.gradle @@ -15,6 +15,7 @@ dependencies { implementation project(':plugin') implementation project(':Configuration') implementation project(':log-factory') + implementation libs.commons.lang3 testImplementation project(':engine-test-utils') testImplementation project(path: ':Base', configuration: 'tests') diff --git a/Integrations/src/main/java/io/deephaven/integrations/python/PythonMergedListenerAdapter.java b/Integrations/src/main/java/io/deephaven/integrations/python/PythonMergedListenerAdapter.java index 215aec0b81a..e498b60cf70 100644 --- a/Integrations/src/main/java/io/deephaven/integrations/python/PythonMergedListenerAdapter.java +++ b/Integrations/src/main/java/io/deephaven/integrations/python/PythonMergedListenerAdapter.java @@ -7,14 +7,18 @@ import io.deephaven.engine.rowset.RowSetFactory; import io.deephaven.engine.rowset.RowSetShiftData; import io.deephaven.engine.table.ModifiedColumnSet; +import io.deephaven.engine.table.TableListener; import io.deephaven.engine.table.TableUpdate; import io.deephaven.engine.table.impl.ListenerRecorder; import io.deephaven.engine.table.impl.MergedListener; import io.deephaven.engine.table.impl.TableUpdateImpl; import io.deephaven.engine.updategraph.NotificationQueue; import io.deephaven.engine.updategraph.UpdateGraph; +import io.deephaven.internal.log.LoggerFactory; +import io.deephaven.io.logger.Logger; import io.deephaven.util.SafeCloseable; import io.deephaven.util.annotations.ScriptApi; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.jpy.PyObject; @@ -33,7 +37,10 @@ */ @ScriptApi public class PythonMergedListenerAdapter extends MergedListener { - private final PyObject pyCallable; + private static final Logger log = LoggerFactory.getLogger(PythonMergedListenerAdapter.class); + + private final PyObject pyListenerCallable; + private final PyObject pyOnFailureCallback; /** * Create a Python merged listener. @@ -42,23 +49,26 @@ public class PythonMergedListenerAdapter extends MergedListener { * @param dependencies The tables that must be satisfied before this listener is executed. * @param listenerDescription A description for the UpdatePerformanceTracker to append to its entry description, may * be null. - * @param pyObjectIn Python listener object. + * @param pyListener Python listener object. */ private PythonMergedListenerAdapter( @NotNull ListenerRecorder[] recorders, @Nullable NotificationQueue.Dependency[] dependencies, @Nullable String listenerDescription, - @NotNull PyObject pyObjectIn) { + @NotNull PyObject pyListener, + @NotNull PyObject pyOnFailureCallback) { super(Arrays.asList(recorders), Arrays.asList(dependencies), listenerDescription, null); Arrays.stream(recorders).forEach(rec -> rec.setMergedListener(this)); - this.pyCallable = PythonUtils.pyMergeListenerFunc(pyObjectIn); + this.pyListenerCallable = PythonUtils.pyMergeListenerFunc(Objects.requireNonNull(pyListener)); + this.pyOnFailureCallback = Objects.requireNonNull(pyOnFailureCallback); } public static PythonMergedListenerAdapter create( @NotNull ListenerRecorder[] recorders, @Nullable NotificationQueue.Dependency[] dependencies, @Nullable String listenerDescription, - @NotNull PyObject pyObjectIn) { + @NotNull PyObject pyListener, + @NotNull PyObject pyOnFailureCallback) { if (recorders.length < 2) { throw new IllegalArgumentException("At least two listener recorders must be provided"); } @@ -71,7 +81,8 @@ public static PythonMergedListenerAdapter create( final UpdateGraph updateGraph = allItems[0].getUpdateGraph(allItems); try (final SafeCloseable ignored = ExecutionContext.getContext().withUpdateGraph(updateGraph).open()) { - return new PythonMergedListenerAdapter(recorders, dependencies, listenerDescription, pyObjectIn); + return new PythonMergedListenerAdapter(recorders, dependencies, listenerDescription, pyListener, + pyOnFailureCallback); } } @@ -91,6 +102,20 @@ public ArrayList currentRowsAsUpdates() { @Override protected void process() { - pyCallable.call("__call__"); + pyListenerCallable.call("__call__"); + } + + @Override + protected void propagateErrorDownstream(boolean fromProcess, @NotNull Throwable error, + TableListener.@Nullable Entry entry) { + if (!pyOnFailureCallback.isNone()) { + try { + pyOnFailureCallback.call("__call__", ExceptionUtils.getStackTrace(error)); + } catch (Exception e2) { + // If the Python onFailure callback fails, log the new exception + // and continue with the original exception. + log.error().append("Python on_error callback failed: ").append(e2).endl(); + } + } } } diff --git a/Integrations/src/main/java/io/deephaven/integrations/python/PythonReplayListenerAdapter.java b/Integrations/src/main/java/io/deephaven/integrations/python/PythonReplayListenerAdapter.java index b6eae8bcda5..e713044047a 100644 --- a/Integrations/src/main/java/io/deephaven/integrations/python/PythonReplayListenerAdapter.java +++ b/Integrations/src/main/java/io/deephaven/integrations/python/PythonReplayListenerAdapter.java @@ -3,6 +3,7 @@ // package io.deephaven.integrations.python; +import org.apache.commons.lang3.exception.ExceptionUtils; import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.table.Table; import io.deephaven.engine.table.TableUpdate; @@ -14,12 +15,16 @@ import io.deephaven.engine.rowset.RowSetShiftData; import io.deephaven.engine.updategraph.NotificationQueue; import io.deephaven.engine.updategraph.UpdateGraph; +import io.deephaven.internal.log.LoggerFactory; +import io.deephaven.io.logger.Logger; import io.deephaven.util.SafeCloseable; import io.deephaven.util.annotations.ScriptApi; +import org.jetbrains.annotations.NotNull; import org.jpy.PyObject; import javax.annotation.Nullable; import java.util.Arrays; +import java.util.Objects; /** @@ -33,7 +38,10 @@ public class PythonReplayListenerAdapter extends InstrumentedTableUpdateListenerAdapter implements TableSnapshotReplayer { private static final long serialVersionUID = -8882402061960621245L; - private final PyObject pyCallable; + private static final Logger log = LoggerFactory.getLogger(PythonReplayListenerAdapter.class); + + private final PyObject pyListenerCallable; + private final PyObject pyOnFailureCallback; private final NotificationQueue.Dependency[] dependencies; /** @@ -43,22 +51,34 @@ public class PythonReplayListenerAdapter extends InstrumentedTableUpdateListener * null. * @param source The source table to which this listener will subscribe. * @param retain Whether a hard reference to this listener should be maintained to prevent it from being collected. - * @param pyObjectIn Python listener object. + * @param pyListener Python listener object. * @param dependencies The tables that must be satisfied before this listener is executed. */ - public static PythonReplayListenerAdapter create(@Nullable String description, Table source, boolean retain, - PyObject pyObjectIn, NotificationQueue.Dependency... dependencies) { + public static PythonReplayListenerAdapter create( + @Nullable String description, + @NotNull Table source, + boolean retain, + @NotNull PyObject pyListener, + @NotNull PyObject pyOnFailureCallback, + @Nullable NotificationQueue.Dependency... dependencies) { final UpdateGraph updateGraph = source.getUpdateGraph(dependencies); try (final SafeCloseable ignored = ExecutionContext.getContext().withUpdateGraph(updateGraph).open()) { - return new PythonReplayListenerAdapter(description, source, retain, pyObjectIn, dependencies); + return new PythonReplayListenerAdapter(description, source, retain, pyListener, pyOnFailureCallback, + dependencies); } } - private PythonReplayListenerAdapter(@Nullable String description, Table source, boolean retain, PyObject pyObjectIn, - NotificationQueue.Dependency... dependencies) { + private PythonReplayListenerAdapter( + @Nullable String description, + @NotNull Table source, + boolean retain, + @NotNull PyObject pyListener, + @NotNull PyObject pyOnFailureCallback, + @Nullable NotificationQueue.Dependency... dependencies) { super(description, source, retain); this.dependencies = dependencies; - this.pyCallable = PythonUtils.pyListenerFunc(pyObjectIn); + this.pyListenerCallable = PythonUtils.pyListenerFunc(Objects.requireNonNull(pyListener)); + this.pyOnFailureCallback = Objects.requireNonNull(pyOnFailureCallback); } @Override @@ -69,13 +89,27 @@ public void replay() { final TableUpdate update = new TableUpdateImpl(source.getRowSet(), emptyRowSet, emptyRowSet, emptyShift, emptyColumnSet); final boolean isReplay = true; - pyCallable.call("__call__", update, isReplay); + pyListenerCallable.call("__call__", update, isReplay); } @Override public void onUpdate(final TableUpdate update) { final boolean isReplay = false; - pyCallable.call("__call__", update, isReplay); + pyListenerCallable.call("__call__", update, isReplay); + } + + @Override + public void onFailureInternal(Throwable originalException, Entry sourceEntry) { + if (!pyOnFailureCallback.isNone()) { + try { + pyOnFailureCallback.call("__call__", ExceptionUtils.getStackTrace(originalException)); + } catch (Throwable e) { + // If the Python onFailure callback fails, log the new exception + // and continue with the original exception. + log.error().append("Python on_error callback failed: ").append(e).endl(); + } + } + super.onFailureInternal(originalException, sourceEntry); } @Override @@ -83,4 +117,8 @@ public boolean canExecute(final long step) { return super.canExecute(step) && (dependencies.length == 0 || Arrays.stream(dependencies).allMatch(t -> t.satisfied(step))); } + + public boolean isFailed() { + return failed; + } } diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/InstrumentedTableListenerBase.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/InstrumentedTableListenerBase.java index acdf3605ac6..c14d5eaf20f 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/InstrumentedTableListenerBase.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/InstrumentedTableListenerBase.java @@ -50,7 +50,7 @@ public abstract class InstrumentedTableListenerBase extends LivenessArtifact private final PerformanceEntry entry; private final boolean terminalListener; - private boolean failed = false; + protected boolean failed = false; private static volatile boolean verboseLogging = Configuration .getInstance() .getBooleanWithDefault("InstrumentedTableListenerBase.verboseLogging", false); diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/MergedListener.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/MergedListener.java index 3a5b39e898e..6b25ddfbce0 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/MergedListener.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/MergedListener.java @@ -57,6 +57,9 @@ public abstract class MergedListener extends LivenessArtifact implements Notific protected final PerformanceEntry entry; private final String logPrefix; + private boolean failed; + + @SuppressWarnings("FieldMayBeFinal") private volatile long lastCompletedStep = NotificationStepReceiver.NULL_NOTIFICATION_STEP; private volatile long lastEnqueuedStep = NotificationStepReceiver.NULL_NOTIFICATION_STEP; @@ -96,6 +99,10 @@ protected Iterable getRecorders() { return recorders; } + public boolean isFailed() { + return failed; + } + public final void notifyOnUpstreamError( @NotNull final Throwable upstreamError, @Nullable final TableListener.Entry errorSourceEntry) { notifyInternal(upstreamError, errorSourceEntry); @@ -107,6 +114,10 @@ public void notifyChanges() { private void notifyInternal(@Nullable final Throwable upstreamError, @Nullable final TableListener.Entry errorSourceEntry) { + if (failed) { + return; + } + final long currentStep = getUpdateGraph().clock().currentStep(); synchronized (this) { @@ -150,6 +161,7 @@ protected void propagateError( final boolean uncaughtExceptionFromProcess, @NotNull final Throwable error, @Nullable final TableListener.Entry entry) { + failed = true; forceReferenceCountToZero(); propagateErrorDownstream(uncaughtExceptionFromProcess, error, entry); try { diff --git a/py/server/deephaven/table_listener.py b/py/server/deephaven/table_listener.py index b8d406673d7..1ad4f809eab 100644 --- a/py/server/deephaven/table_listener.py +++ b/py/server/deephaven/table_listener.py @@ -6,7 +6,7 @@ from abc import ABC, abstractmethod from functools import wraps from inspect import signature -from typing import Callable, Union, List, Generator, Dict, Literal, Sequence, Optional +from typing import Callable, Union, List, Generator, Dict, Sequence, Optional import jpy import numpy @@ -17,13 +17,13 @@ from deephaven.jcompat import to_sequence, j_list_to_list from deephaven.table import Table from deephaven._table_reader import _table_reader_all_dict, _table_reader_chunk_dict -from deephaven.update_graph import UpdateGraph _JPythonReplayListenerAdapter = jpy.get_type("io.deephaven.integrations.python.PythonReplayListenerAdapter") _JTableUpdate = jpy.get_type("io.deephaven.engine.table.TableUpdate") _JListenerRecorder = jpy.get_type("io.deephaven.engine.table.impl.ListenerRecorder") _JPythonMergedListenerAdapter = jpy.get_type("io.deephaven.integrations.python.PythonMergedListenerAdapter") + class TableUpdate(JObjectWrapper): """A TableUpdate object represents a table update event. It contains the added, removed, and modified rows in the table. """ @@ -188,13 +188,25 @@ def modified_columns(self) -> List[str]: class TableListener(ABC): - """An abstract table listener class that should be subclassed by any user table listener class.""" + """An abstract table listener class that should be subclassed by any user table listener class. It provides a + default implementation for the on_error method that simply prints out the error.""" @abstractmethod def on_update(self, update: TableUpdate, is_replay: bool) -> None: """The required method on a listener object that receives table updates.""" ... + def on_error(self, e: Exception) -> None: + """The callback method on a listener object that handles the received error. The default implementation simply prints the error. + + Args: + e (Exception): the exception that occurred during the listener's execution. + """ + print(f"An error occurred during listener execution: {self}, {e}") + + +def _default_on_error(e: Exception) -> None: + print(f"An error occurred during listener execution: {e}") def _listener_wrapper(table: Table): """A decorator to wrap a user listener function or on_update method to receive the numpy-converted Table updates. @@ -229,17 +241,25 @@ def _wrap_listener_obj(t: Table, listener: TableListener): return listener +def _error_callback_wrapper(callback: Callable[[Exception], None]): + @wraps(callback) + def wrapper(e): + callback(RuntimeError(e)) + + return wrapper + class TableListenerHandle(JObjectWrapper): """A handle to manage a table listener's lifecycle.""" j_object_type = _JPythonReplayListenerAdapter def __init__(self, t: Table, listener: Union[Callable[[TableUpdate, bool], None], TableListener], description: str = None, - dependencies: Union[Table, Sequence[Table]] = None): + dependencies: Union[Table, Sequence[Table]] = None, on_error: Callable[[Exception], None] = None): """Creates a new table listener handle with dependencies. Table change events are processed by 'listener', which can be either (1) a callable (e.g. function) or - (2) an instance of TableListener type which provides an "on_update" method. + (2) an instance of a TableListener subclass that must override the abstract "on_update" method, and optionally + override the default "on_error" method. The callable or the on_update method must have the following signatures. * (update: TableUpdate, is_replay: bool): support replaying the initial table snapshot and normal table updates @@ -265,6 +285,13 @@ def __init__(self, t: Table, listener: Union[Callable[[TableUpdate, bool], None] the listener is safe, it is not recommended because reading or operating on the result tables of those operations may not be safe. It is best to perform the operations on the dependent tables beforehand, and then add the result tables as dependencies to the listener so that they can be safely read in it. + on_error (Callable[[Exception], None]): a callback function to be invoked when an error occurs during the + listener's execution. It should only be set when the listener is a function, not when it is an instance + of TableListener. Defaults to None. When None, a default callback function will be provided that simply + prints out the received exception. If the callback function itself raises an exception, the new exception + will be logged in the Deephaven server log and will not be further processed by the server. + + Raises: DHError @@ -277,14 +304,23 @@ def __init__(self, t: Table, listener: Union[Callable[[TableUpdate, bool], None] self.dependencies = to_sequence(dependencies) if isinstance(listener, TableListener): + if on_error: + raise DHError(message="Invalid on_error argument for listeners of TableListener type which already have an on_error method.") self.listener_wrapped = _wrap_listener_obj(t, listener) + on_error_callback = _error_callback_wrapper(listener.on_error) elif callable(listener): self.listener_wrapped = _wrap_listener_func(t, listener) + if on_error: + on_error_callback = _error_callback_wrapper(on_error) + else: + on_error_callback = _error_callback_wrapper(_default_on_error) else: raise DHError(message="listener is neither callable nor TableListener object") try: - self.listener_adapter = _JPythonReplayListenerAdapter.create(description, t.j_table, False, self.listener_wrapped, self.dependencies) + self.listener_adapter = _JPythonReplayListenerAdapter.create(description, t.j_table, False, + self.listener_wrapped, on_error_callback, + self.dependencies) except Exception as e: raise DHError(e, "failed to create a table listener.") from e self.started = False @@ -326,7 +362,7 @@ def stop(self) -> None: def listen(t: Table, listener: Union[Callable[[TableUpdate, bool], None], TableListener], description: str = None, do_replay: bool = False, - dependencies: Union[Table, Sequence[Table]] = None) -> TableListenerHandle: + dependencies: Union[Table, Sequence[Table]] = None, on_error: Callable[[Exception], None] = None) -> TableListenerHandle: """This is a convenience function that creates a TableListenerHandle object and immediately starts it to listen for table updates. @@ -352,6 +388,12 @@ def listen(t: Table, listener: Union[Callable[[TableUpdate, bool], None], TableL the listener is safe, it is not recommended because reading or operating on the result tables of those operations may not be safe. It is best to perform the operations on the dependent tables beforehand, and then add the result tables as dependencies to the listener so that they can be safely read in it. + on_error (Callable[[Exception], None]): a callback function to be invoked when an error occurs during the + listener's execution. It should only be set when the listener is a function, not when it is an instance + of TableListener. Defaults to None. When None, a default callback function will be provided that simply + prints out the received exception. If the callback function itself raises an exception, the new exception + will be logged in the Deephaven server log and will not be further processed by the server. + Returns: a TableListenerHandle @@ -359,8 +401,8 @@ def listen(t: Table, listener: Union[Callable[[TableUpdate, bool], None], TableL Raises: DHError """ - table_listener_handle = TableListenerHandle(t=t, dependencies=dependencies, listener=listener, - description=description) + table_listener_handle = TableListenerHandle(t=t, listener=listener, description=description, + dependencies=dependencies, on_error=on_error) table_listener_handle.start(do_replay=do_replay) return table_listener_handle @@ -394,7 +436,8 @@ def table_update(self) -> Optional[TableUpdate]: class MergedListener(ABC): - """An abstract multi-table listener class that should be subclassed by any user multi-table listener class.""" + """An abstract multi-table listener class that should be subclassed by any user multi-table listener class. It + provides a default implementation for the on_error method that simply prints out the error.""" @abstractmethod def on_update(self, updates: Dict[Table, TableUpdate], is_replay: bool) -> None: @@ -403,6 +446,14 @@ def on_update(self, updates: Dict[Table, TableUpdate], is_replay: bool) -> None: """ ... + def on_error(self, e: Exception) -> None: + """ The callback method on a listener object that handles the received error. The default implementation simply prints the error. + + Args: + e (Exception): the exception that occurred during the listener's execution. + """ + print(f"An error occurred during listener execution: {self}, {e}") + class MergedListenerHandle(JObjectWrapper): """A handle to manage a merged listener's lifecycle.""" @@ -413,12 +464,14 @@ def j_object(self) -> jpy.JType: return self.merged_listener_adapter def __init__(self, tables: Sequence[Table], listener: Union[Callable[[Dict[Table, TableUpdate], bool], None], MergedListener], - description: str = None, dependencies: Union[Table, Sequence[Table]] = None): + description: str = None, dependencies: Union[Table, Sequence[Table]] = None, on_error: Callable[[Exception], None] = None): """Creates a new MergedListenerHandle with the provided listener recorders and dependencies. Table change events are processed by 'listener', which can be either (1) a callable (e.g. function) or - (2) an instance of MergedListener type which provides an "on_update" method. + (2) an instance of a MergedListener subclass that must override the abstract "on_update" method, and optionally + override the default "on_error" method. + The callable or the on_update method must have the following signature. *(updates: Dict[Table, TableUpdate], is_replay: bool): support replaying the initial table snapshots and normal table updates The 'updates' parameter is a dictionary of Table to TableUpdate; @@ -444,6 +497,12 @@ def __init__(self, tables: Sequence[Table], listener: Union[Callable[[Dict[Table the listener is safe, it is not recommended because reading or operating on the result tables of those operations may not be safe. It is best to perform the operations on the dependent tables beforehand, and then add the result tables as dependencies to the listener so that they can be safely read in it. + on_error (Callable[[Exception], None]): a callback function to be invoked when an error occurs during the + listener's execution. It should only be set when the listener is a function, not when it is an instance + of MergedListener. Defaults to None. When None, a default callback function will be provided that simply + prints out the received exception. If the callback function itself raises an exception, the new exception + will be logged in the Deephaven server log and will not be further processed by the server. + Raises: DHError @@ -457,20 +516,30 @@ def __init__(self, tables: Sequence[Table], listener: Union[Callable[[Dict[Table self.dependencies = dependencies if isinstance(listener, MergedListener): + if on_error: + raise DHError(message="Invalid on_error argument for listeners of MergedListener type which already have an on_error method.") self.listener = listener.on_update - else: + on_error_callback = _error_callback_wrapper(listener.on_error) + elif callable(listener): self.listener = listener + if on_error: + on_error_callback = _error_callback_wrapper(on_error) + else: + on_error_callback = _error_callback_wrapper(_default_on_error) + else: + raise DHError(message="listener is neither callable nor MergedListener object") + n_params = len(signature(self.listener).parameters) if n_params != 2: raise ValueError("merged listener function must have 2 parameters (updates, is_replay).") - try: self.merged_listener_adapter = _JPythonMergedListenerAdapter.create( to_sequence(self.listener_recorders), to_sequence(self.dependencies), description, - self) + self, + on_error_callback) self.started = False except Exception as e: raise DHError(e, "failed to create a merged listener adapter.") from e @@ -512,7 +581,6 @@ def start(self, do_replay: bool = False) -> None: self.started = True - def stop(self) -> None: """Stop the listener.""" if not self.started: @@ -527,8 +595,8 @@ def stop(self) -> None: def merged_listen(tables: Sequence[Table], listener: Union[Callable[[Dict[Table, TableUpdate]], None], MergedListener], - do_replay: bool = False, description: str = None, dependencies: Union[Table, Sequence[Table]] = None)\ - -> MergedListenerHandle: + do_replay: bool = False, description: str = None, dependencies: Union[Table, Sequence[Table]] = None, + on_error: Callable[[Exception], None] = None) -> MergedListenerHandle: """This is a convenience function that creates a MergedListenerHandle object and immediately starts it to listen for table updates. @@ -555,8 +623,13 @@ def merged_listen(tables: Sequence[Table], listener: Union[Callable[[Dict[Table, the listener is safe, it is not recommended because reading or operating on the result tables of those operations may not be safe. It is best to perform the operations on the dependent tables beforehand, and then add the result tables as dependencies to the listener so that they can be safely read in it. + on_error (Callable[[Exception], None]): a callback function to be invoked when an error occurs during the + listener's execution. It should only be set when the listener is a function, not when it is an instance + of MergedListener. Defaults to None. When None, a default callback function will be provided that simply + prints out the received exception. If the callback function itself raises an exception, the new exception + will be logged in the Deephaven server log and will not be further processed by the server. """ merged_listener_handle = MergedListenerHandle(tables=tables, listener=listener, - description=description, dependencies=dependencies) + description=description, dependencies=dependencies, on_error=on_error) merged_listener_handle.start(do_replay=do_replay) return merged_listener_handle diff --git a/py/server/tests/test_table_listener.py b/py/server/tests/test_table_listener.py index 48915a9c277..e0e6cadb64e 100644 --- a/py/server/tests/test_table_listener.py +++ b/py/server/tests/test_table_listener.py @@ -22,6 +22,7 @@ _JColumnVectors = jpy.get_type("io.deephaven.engine.table.vectors.ColumnVectors") + class TableUpdateRecorder: def __init__(self, table: Optional[Table] = None, chunk_size: int = None, cols: Union[str, List[str]] = None): self.table = table @@ -266,6 +267,8 @@ def on_update(self, update, is_replay): has_added=True, has_removed=True, has_modified=False) self.assertTrue(all([len(ja) > 0 for ja in j_arrays])) + dep_table = dep_table_2 = None + def test_listener_func_with_deps(self): cols = [ @@ -362,6 +365,8 @@ def on_update(self, updates: Dict[Table, TableUpdate], is_replay: bool) -> None: mlh.stop() self.assertGreaterEqual(len(tur.replays), 6) + t1 = t2 = t3 = None + def test_merged_listener_func(self): t1 = time_table("PT1s").update(["X=i % 11"]) t2 = time_table("PT2s").update(["Y=i % 8"]) @@ -392,6 +397,8 @@ def test_ml_func(updates: Dict[Table, TableUpdate], is_replay: bool) -> None: mlh.stop() self.assertGreaterEqual(len(tur.replays), 6) + t1 = t2 = t3 = None + def test_merged_listener_with_deps(self): t1 = time_table("PT1s").update(["X=i % 11"]) t2 = time_table("PT2s").update(["Y=i % 8"]) @@ -422,6 +429,8 @@ def on_update(self, updates: Dict[Table, TableUpdate], is_replay: bool) -> None: self.assertGreaterEqual(len(tur.replays), 6) self.assertTrue(len(j_arrays) > 0 and all([len(ja) > 0 for ja in j_arrays])) + t1 = t2 = t3 = None + def test_merged_listener_error(self): t1 = time_table("PT1s").update(["X=i % 11"]) @@ -437,6 +446,8 @@ def test_ml_func(updates: Dict[Table, TableUpdate]) -> None: mlh = merged_listen([t1, et], test_ml_func) self.assertIn("must be a refreshing table", str(cm.exception)) + t1 = et = None + def test_merged_listener_replay(self): t1 = time_table("PT1s").update(["X=i % 11"]) t2 = time_table("PT2s").update(["Y=i % 8"]) @@ -477,6 +488,277 @@ def test_ml_func(updates: Dict[Table, TableUpdate], is_replay: bool) -> None: self.assertGreaterEqual(len(tur.replays), 6) self.assertEqual(tur.replays.count(True), 2) + t1 = t2 = t3 = None + + def test_on_error_listener_func(self): + t = time_table("PT1S").update("X = i") + with self.subTest("Bad Listener Good Error Callback"): + def bad_listner_func(table_udpate, is_replay: bool) -> None: + raise ValueError("invalid value") + + def on_error(e: Exception) -> None: + nonlocal error_caught + error_caught = True + self.assertIn("invalid value", str(e)) + + error_caught = False + tlh = listen(t, bad_listner_func, on_error=on_error) + t.await_update() + self.assertTrue(error_caught) + self.assertTrue(tlh.j_object.isFailed()) + + with self.subTest("Good Listener Good Error Callback"): + def good_listner_func(table_udpate, is_replay: bool) -> None: + pass + + error_caught = False + tlh = listen(t, good_listner_func, on_error=on_error) + t.await_update() + self.assertFalse(error_caught) + self.assertFalse(tlh.j_object.isFailed()) + + with self.subTest("Bad Listener Bad Error Callback"): + error_caught: bool = False + + def bad_listner_func(table_udpate, is_replay: bool) -> None: + raise ValueError("invalid value") + + def on_error(e: Exception) -> None: + nonlocal error_caught + error_caught = True + self.assertIn("invalid value", str(e)) + raise ValueError("reraise the exception") from e + + tlh = listen(t, bad_listner_func, on_error=on_error) + t.await_update() + self.assertTrue(error_caught) + self.assertTrue(tlh.j_object.isFailed()) + + t = None + + def test_on_error_listener_obj(self): + test_self = self + t = time_table("PT1S").update("X = i") + + with self.subTest("Bad Listener Good Error Callback"): + class BadListener(TableListener): + def on_update(self, update: TableUpdate, is_replay: bool) -> None: + raise ValueError("invalid value") + + def on_error(self, e: Exception) -> None: + nonlocal error_caught + error_caught = True + test_self.assertIn("invalid value", str(e)) + + error_caught = False + bad_listener_obj = BadListener() + tlh = listen(t, bad_listener_obj) + t.await_update() + self.assertTrue(error_caught) + self.assertTrue(tlh.j_object.isFailed()) + + with self.assertRaises(DHError): + def on_error(e: Exception) -> None: + ... + tlh = listen(t, bad_listener_obj, on_error=on_error) + + with self.subTest("Good Listener Good Error Callback"): + class GoodListener(TableListener): + def on_update(self, update: TableUpdate, is_replay: bool) -> None: + ... + + def on_error(self, e: Exception) -> None: + nonlocal error_caught + error_caught = True + test_self.assertIn("invalid value", str(e)) + + error_caught = False + good_listener_obj = GoodListener() + tlh = listen(t, good_listener_obj) + t.await_update() + self.assertFalse(error_caught) + self.assertFalse(tlh.j_object.isFailed()) + + with self.subTest("Bad Listener Bad Error Callback"): + class GoodListener(TableListener): + def on_update(self, update: TableUpdate, is_replay: bool) -> None: + raise ValueError("invalid value") + + def on_error(self, e: Exception) -> None: + nonlocal error_caught + error_caught = True + test_self.assertIn("invalid value", str(e)) + raise ValueError("reraise the exception") from e + + error_caught = False + + good_listener_obj = GoodListener() + tlh = listen(t, good_listener_obj) + t.await_update() + self.assertTrue(error_caught) + self.assertTrue(tlh.j_object.isFailed()) + + t = None + + def test_on_error_merged_listener_func(self): + t1 = time_table("PT1s").update(["X=i % 11"]) + t2 = time_table("PT2s").update(["Y=i % 8"]) + t3 = time_table("PT3s").update(["Z=i % 5"]) + + with self.subTest("Bad Listener Good Error Callback"): + def bad_listner_func(updates: Dict[Table, TableUpdate], is_replay: bool) -> None: + raise ValueError("invalid value") + + def on_error(e: Exception) -> None: + nonlocal error_caught + error_caught = True + self.assertIn("invalid value", str(e)) + + error_caught = False + mlh = merged_listen([t1, t2, t3], bad_listner_func, on_error=on_error) + t1.await_update() + self.assertTrue(error_caught) + self.assertTrue(mlh.j_object.isFailed()) + + with self.subTest("Good Listener Good Error Callback"): + def good_listner_func(updates: Dict[Table, TableUpdate], is_replay: bool) -> None: + pass + + error_caught = False + mlh = merged_listen([t1, t2, t3], good_listner_func, on_error=on_error) + t1.await_update() + self.assertFalse(error_caught) + self.assertFalse(mlh.j_object.isFailed()) + + with self.subTest("Bad Listener Bad Error Callback"): + def bad_listner_func(updates: Dict[Table, TableUpdate], is_replay: bool) -> None: + raise ValueError("invalid value") + + def bad_on_error(e: Exception) -> None: + nonlocal error_caught + error_caught = True + self.assertIn("invalid value", str(e)) + raise ValueError("reraise the exception") from e + + error_caught = False + mlh = merged_listen([t1, t2, t3], bad_listner_func, on_error=bad_on_error) + t1.await_update() + self.assertTrue(error_caught) + self.assertTrue(mlh.j_object.isFailed()) + + t1 = t2 = t3 = None + + def test_on_error_merged_listener_obj(self): + test_self = self + t1 = time_table("PT1s").update(["X=i % 11"]) + t2 = time_table("PT2s").update(["Y=i % 8"]) + t3 = time_table("PT3s").update(["Z=i % 5"]) + + with self.subTest("Bad Listener Good Error Callback"): + class BadListener(MergedListener): + def on_update(self, updates: Dict[Table, TableUpdate], is_replay: bool) -> None: + raise ValueError("invalid value") + + def on_error(self, e: Exception) -> None: + nonlocal error_caught + error_caught = True + test_self.assertIn("invalid value", str(e)) + + error_caught = False + bad_listener_obj = BadListener() + mlh = merged_listen([t1, t2, t3], bad_listener_obj) + t1.await_update() + self.assertTrue(error_caught) + self.assertTrue(mlh.j_object.isFailed()) + + with self.assertRaises(DHError): + def on_error(e: Exception) -> None: + ... + tlh = merged_listen([t1, t2, t3], bad_listener_obj, on_error=on_error) + + + with self.subTest("Good Listener Good Error Callback"): + class GoodListener(MergedListener): + def on_update(self, updates: Dict[Table, TableUpdate], is_replay: bool) -> None: + ... + + def on_error(self, e: Exception) -> None: + nonlocal error_caught + error_caught = True + test_self.assertIn("invalid value", str(e)) + + error_caught = False + good_listener_obj = GoodListener() + mlh = merged_listen([t1, t2, t3], good_listener_obj) + t1.await_update() + self.assertFalse(error_caught) + self.assertFalse(mlh.j_object.isFailed()) + + with self.subTest("Bad Listener Bad Error Callback"): + class BadListener(MergedListener): + def on_update(self, updates: Dict[Table, TableUpdate], is_replay: bool) -> None: + raise ValueError("invalid value") + + def on_error(self, e: Exception) -> None: + nonlocal error_caught + error_caught = True + test_self.assertIn("invalid value", str(e)) + raise ValueError("reraise the exception") from e + + error_caught = False + bad_listener_obj = BadListener() + mlh = merged_listen([t1, t2, t3], bad_listener_obj) + t1.await_update() + self.assertTrue(error_caught) + self.assertTrue(mlh.j_object.isFailed()) + + t1 = t2 = t3 = None + + def test_default_on_error(self): + t = time_table("PT1S").update("X = i") + + def bad_listner_func(table_udpate, is_replay: bool) -> None: + raise ValueError("invalid value") + + error_caught = False + tlh = listen(t, bad_listner_func) + t.await_update() + # the default on_error only logs the error + self.assertFalse(error_caught) + self.assertTrue(tlh.j_object.isFailed()) + + class BadListener(TableListener): + def on_update(self, update, is_replay): + raise ValueError("invalid value") + + tlh = listen(t, BadListener()) + t.await_update() + # the default on_error only logs the error + self.assertFalse(error_caught) + self.assertTrue(tlh.j_object.isFailed()) + + t2 = time_table("PT1S").update("X = i") + def bad_listner_func(updates: Dict[Table, TableUpdate], is_replay: bool) -> None: + raise ValueError("invalid value") + + mlh = merged_listen([t, t2], bad_listner_func) + t.await_update() + # the default on_error only logs the error + self.assertFalse(error_caught) + self.assertTrue(mlh.j_object.isFailed()) + + class BadListener(MergedListener): + def on_update(self, updates: Dict[Table, TableUpdate], is_replay: bool) -> None: + raise ValueError("invalid value") + + mlh = merged_listen([t, t2], BadListener()) + t.await_update() + # the default on_error only logs the error + self.assertFalse(error_caught) + self.assertTrue(mlh.j_object.isFailed()) + + t = t2 = None + if __name__ == "__main__": unittest.main()