Skip to content

Commit

Permalink
feat: Support on-error callback for Py listeners (#5929)
Browse files Browse the repository at this point in the history
Fixes #5809

---------

Co-authored-by: Ryan Caudy <[email protected]>
  • Loading branch information
jmao-denver and rcaudy authored Aug 15, 2024
1 parent 07786ef commit e919d52
Show file tree
Hide file tree
Showing 7 changed files with 468 additions and 37 deletions.
1 change: 1 addition & 0 deletions Integrations/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ dependencies {
implementation project(':plugin')
implementation project(':Configuration')
implementation project(':log-factory')
implementation libs.commons.lang3

testImplementation project(':engine-test-utils')
testImplementation project(path: ':Base', configuration: 'tests')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,18 @@
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.RowSetShiftData;
import io.deephaven.engine.table.ModifiedColumnSet;
import io.deephaven.engine.table.TableListener;
import io.deephaven.engine.table.TableUpdate;
import io.deephaven.engine.table.impl.ListenerRecorder;
import io.deephaven.engine.table.impl.MergedListener;
import io.deephaven.engine.table.impl.TableUpdateImpl;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.annotations.ScriptApi;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jpy.PyObject;
Expand All @@ -33,7 +37,10 @@
*/
@ScriptApi
public class PythonMergedListenerAdapter extends MergedListener {
private final PyObject pyCallable;
private static final Logger log = LoggerFactory.getLogger(PythonMergedListenerAdapter.class);

private final PyObject pyListenerCallable;
private final PyObject pyOnFailureCallback;

/**
* Create a Python merged listener.
Expand All @@ -42,23 +49,26 @@ public class PythonMergedListenerAdapter extends MergedListener {
* @param dependencies The tables that must be satisfied before this listener is executed.
* @param listenerDescription A description for the UpdatePerformanceTracker to append to its entry description, may
* be null.
* @param pyObjectIn Python listener object.
* @param pyListener Python listener object.
*/
private PythonMergedListenerAdapter(
@NotNull ListenerRecorder[] recorders,
@Nullable NotificationQueue.Dependency[] dependencies,
@Nullable String listenerDescription,
@NotNull PyObject pyObjectIn) {
@NotNull PyObject pyListener,
@NotNull PyObject pyOnFailureCallback) {
super(Arrays.asList(recorders), Arrays.asList(dependencies), listenerDescription, null);
Arrays.stream(recorders).forEach(rec -> rec.setMergedListener(this));
this.pyCallable = PythonUtils.pyMergeListenerFunc(pyObjectIn);
this.pyListenerCallable = PythonUtils.pyMergeListenerFunc(Objects.requireNonNull(pyListener));
this.pyOnFailureCallback = Objects.requireNonNull(pyOnFailureCallback);
}

public static PythonMergedListenerAdapter create(
@NotNull ListenerRecorder[] recorders,
@Nullable NotificationQueue.Dependency[] dependencies,
@Nullable String listenerDescription,
@NotNull PyObject pyObjectIn) {
@NotNull PyObject pyListener,
@NotNull PyObject pyOnFailureCallback) {
if (recorders.length < 2) {
throw new IllegalArgumentException("At least two listener recorders must be provided");
}
Expand All @@ -71,7 +81,8 @@ public static PythonMergedListenerAdapter create(
final UpdateGraph updateGraph = allItems[0].getUpdateGraph(allItems);

try (final SafeCloseable ignored = ExecutionContext.getContext().withUpdateGraph(updateGraph).open()) {
return new PythonMergedListenerAdapter(recorders, dependencies, listenerDescription, pyObjectIn);
return new PythonMergedListenerAdapter(recorders, dependencies, listenerDescription, pyListener,
pyOnFailureCallback);
}
}

Expand All @@ -91,6 +102,20 @@ public ArrayList<TableUpdate> currentRowsAsUpdates() {

@Override
protected void process() {
pyCallable.call("__call__");
pyListenerCallable.call("__call__");
}

@Override
protected void propagateErrorDownstream(boolean fromProcess, @NotNull Throwable error,
TableListener.@Nullable Entry entry) {
if (!pyOnFailureCallback.isNone()) {
try {
pyOnFailureCallback.call("__call__", ExceptionUtils.getStackTrace(error));
} catch (Exception e2) {
// If the Python onFailure callback fails, log the new exception
// and continue with the original exception.
log.error().append("Python on_error callback failed: ").append(e2).endl();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//
package io.deephaven.integrations.python;

import org.apache.commons.lang3.exception.ExceptionUtils;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableUpdate;
Expand All @@ -14,12 +15,16 @@
import io.deephaven.engine.rowset.RowSetShiftData;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.annotations.ScriptApi;
import org.jetbrains.annotations.NotNull;
import org.jpy.PyObject;

import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Objects;


/**
Expand All @@ -33,7 +38,10 @@
public class PythonReplayListenerAdapter extends InstrumentedTableUpdateListenerAdapter
implements TableSnapshotReplayer {
private static final long serialVersionUID = -8882402061960621245L;
private final PyObject pyCallable;
private static final Logger log = LoggerFactory.getLogger(PythonReplayListenerAdapter.class);

private final PyObject pyListenerCallable;
private final PyObject pyOnFailureCallback;
private final NotificationQueue.Dependency[] dependencies;

/**
Expand All @@ -43,22 +51,34 @@ public class PythonReplayListenerAdapter extends InstrumentedTableUpdateListener
* null.
* @param source The source table to which this listener will subscribe.
* @param retain Whether a hard reference to this listener should be maintained to prevent it from being collected.
* @param pyObjectIn Python listener object.
* @param pyListener Python listener object.
* @param dependencies The tables that must be satisfied before this listener is executed.
*/
public static PythonReplayListenerAdapter create(@Nullable String description, Table source, boolean retain,
PyObject pyObjectIn, NotificationQueue.Dependency... dependencies) {
public static PythonReplayListenerAdapter create(
@Nullable String description,
@NotNull Table source,
boolean retain,
@NotNull PyObject pyListener,
@NotNull PyObject pyOnFailureCallback,
@Nullable NotificationQueue.Dependency... dependencies) {
final UpdateGraph updateGraph = source.getUpdateGraph(dependencies);
try (final SafeCloseable ignored = ExecutionContext.getContext().withUpdateGraph(updateGraph).open()) {
return new PythonReplayListenerAdapter(description, source, retain, pyObjectIn, dependencies);
return new PythonReplayListenerAdapter(description, source, retain, pyListener, pyOnFailureCallback,
dependencies);
}
}

private PythonReplayListenerAdapter(@Nullable String description, Table source, boolean retain, PyObject pyObjectIn,
NotificationQueue.Dependency... dependencies) {
private PythonReplayListenerAdapter(
@Nullable String description,
@NotNull Table source,
boolean retain,
@NotNull PyObject pyListener,
@NotNull PyObject pyOnFailureCallback,
@Nullable NotificationQueue.Dependency... dependencies) {
super(description, source, retain);
this.dependencies = dependencies;
this.pyCallable = PythonUtils.pyListenerFunc(pyObjectIn);
this.pyListenerCallable = PythonUtils.pyListenerFunc(Objects.requireNonNull(pyListener));
this.pyOnFailureCallback = Objects.requireNonNull(pyOnFailureCallback);
}

@Override
Expand All @@ -69,18 +89,36 @@ public void replay() {
final TableUpdate update =
new TableUpdateImpl(source.getRowSet(), emptyRowSet, emptyRowSet, emptyShift, emptyColumnSet);
final boolean isReplay = true;
pyCallable.call("__call__", update, isReplay);
pyListenerCallable.call("__call__", update, isReplay);
}

@Override
public void onUpdate(final TableUpdate update) {
final boolean isReplay = false;
pyCallable.call("__call__", update, isReplay);
pyListenerCallable.call("__call__", update, isReplay);
}

@Override
public void onFailureInternal(Throwable originalException, Entry sourceEntry) {
if (!pyOnFailureCallback.isNone()) {
try {
pyOnFailureCallback.call("__call__", ExceptionUtils.getStackTrace(originalException));
} catch (Throwable e) {
// If the Python onFailure callback fails, log the new exception
// and continue with the original exception.
log.error().append("Python on_error callback failed: ").append(e).endl();
}
}
super.onFailureInternal(originalException, sourceEntry);
}

@Override
public boolean canExecute(final long step) {
return super.canExecute(step)
&& (dependencies.length == 0 || Arrays.stream(dependencies).allMatch(t -> t.satisfied(step)));
}

public boolean isFailed() {
return failed;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public abstract class InstrumentedTableListenerBase extends LivenessArtifact
private final PerformanceEntry entry;
private final boolean terminalListener;

private boolean failed = false;
protected boolean failed = false;
private static volatile boolean verboseLogging = Configuration
.getInstance()
.getBooleanWithDefault("InstrumentedTableListenerBase.verboseLogging", false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ public abstract class MergedListener extends LivenessArtifact implements Notific
protected final PerformanceEntry entry;
private final String logPrefix;

private boolean failed;


@SuppressWarnings("FieldMayBeFinal")
private volatile long lastCompletedStep = NotificationStepReceiver.NULL_NOTIFICATION_STEP;
private volatile long lastEnqueuedStep = NotificationStepReceiver.NULL_NOTIFICATION_STEP;
Expand Down Expand Up @@ -96,6 +99,10 @@ protected Iterable<? extends ListenerRecorder> getRecorders() {
return recorders;
}

public boolean isFailed() {
return failed;
}

public final void notifyOnUpstreamError(
@NotNull final Throwable upstreamError, @Nullable final TableListener.Entry errorSourceEntry) {
notifyInternal(upstreamError, errorSourceEntry);
Expand All @@ -107,6 +114,10 @@ public void notifyChanges() {

private void notifyInternal(@Nullable final Throwable upstreamError,
@Nullable final TableListener.Entry errorSourceEntry) {
if (failed) {
return;
}

final long currentStep = getUpdateGraph().clock().currentStep();

synchronized (this) {
Expand Down Expand Up @@ -150,6 +161,7 @@ protected void propagateError(
final boolean uncaughtExceptionFromProcess,
@NotNull final Throwable error,
@Nullable final TableListener.Entry entry) {
failed = true;
forceReferenceCountToZero();
propagateErrorDownstream(uncaughtExceptionFromProcess, error, entry);
try {
Expand Down
Loading

0 comments on commit e919d52

Please sign in to comment.