Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api)!: Support merged listening on multiple tables #5672

Merged
merged 15 commits into from
Jul 15, 2024
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.integrations.python;

import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.RowSetShiftData;
import io.deephaven.engine.table.ModifiedColumnSet;
import io.deephaven.engine.table.TableUpdate;
import io.deephaven.engine.table.impl.ListenerRecorder;
import io.deephaven.engine.table.impl.MergedListener;
import io.deephaven.engine.table.impl.TableUpdateImpl;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.annotations.ScriptApi;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jpy.PyObject;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Objects;
import java.util.stream.Stream;

/**
* A Deephaven merged listener which fires when any of its bound listener recorders has updates and all of its
* dependencies have been satisfied. The listener then invokes the Python listener object.
*
* The Python listener object must be a Python MergedListener instance that provides a "_process" method implementation
* with no argument.
*/
@ScriptApi
public class PythonMergedListenerAdapter extends MergedListener {
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
private final PyObject pyCallable;

/**
* Create a Python merged listener.
*
* @param recorders The listener recorders to which this listener will subscribe.
* @param dependencies The tables that must be satisfied before this listener is executed.
* @param listenerDescription A description for the UpdatePerformanceTracker to append to its entry description, may
* be null.
* @param pyObjectIn Python listener object.
*/
private PythonMergedListenerAdapter(
@NotNull ListenerRecorder[] recorders,
@Nullable NotificationQueue.Dependency[] dependencies,
@Nullable String listenerDescription,
@NotNull PyObject pyObjectIn) {
super(Arrays.asList(recorders), Arrays.asList(dependencies), listenerDescription, null);
Arrays.stream(recorders).forEach(rec -> rec.setMergedListener(this));
this.pyCallable = PythonUtils.pyMergeListenerFunc(pyObjectIn);
}

public static PythonMergedListenerAdapter create(
@NotNull ListenerRecorder[] recorders,
@Nullable NotificationQueue.Dependency[] dependencies,
@Nullable String listenerDescription,
@NotNull PyObject pyObjectIn) {
if (recorders.length < 2) {
throw new IllegalArgumentException("At least two listener recorders must be provided");
}

final NotificationQueue.Dependency[] allItems =
Stream.concat(Arrays.stream(recorders), Arrays.stream(dependencies))
.filter(Objects::nonNull)
.toArray(NotificationQueue.Dependency[]::new);

final UpdateGraph updateGraph = allItems[0].getUpdateGraph(allItems);

try (final SafeCloseable ignored = ExecutionContext.getContext().withUpdateGraph(updateGraph).open()) {
return new PythonMergedListenerAdapter(recorders, dependencies, listenerDescription, pyObjectIn);
}
}

public ArrayList<TableUpdate> currentRowsAsUpdates() {
final ArrayList<TableUpdate> updates = new ArrayList<>();
for (ListenerRecorder recorder : getRecorders()) {
final TableUpdate update = new TableUpdateImpl(
recorder.getParent().getRowSet().copy(),
RowSetFactory.empty(),
RowSetFactory.empty(),
RowSetShiftData.EMPTY,
ModifiedColumnSet.EMPTY);
updates.add(update);
}
return updates;
}

@Override
protected void process() {
pyCallable.call("__call__");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,18 @@ static PyObject pyListenerFunc(final PyObject pyObject) {
return pyCallable(pyObject, "on_update");
}

/**
* Gets the python function that should be called by a merged listener. The input can be either (1) a callable or
* (2) an object which provides an "_process" method.
*
* @param pyObject python listener object. This should either be a callable or an object which provides an
* "_process" method.
* @return python function that should be called by a merged listener.
* @throws IllegalArgumentException python listener object is not a valid listener.
*/
static PyObject pyMergeListenerFunc(final PyObject pyObject) {
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
return pyCallable(pyObject, "_process");
}

/**
* Creates a callable PyObject, either using method.apply() or __call__(), if the pyObjectIn has such methods
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public abstract class MergedListener extends LivenessArtifact implements Notific
AtomicLongFieldUpdater.newUpdater(MergedListener.class, "lastCompletedStep");

private final UpdateGraph updateGraph;

jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
private final Iterable<? extends ListenerRecorder> recorders;
private final Iterable<NotificationQueue.Dependency> dependencies;
private final String listenerDescription;
Expand Down Expand Up @@ -91,6 +92,10 @@ public UpdateGraph getUpdateGraph() {
return updateGraph;
}

protected Iterable<? extends ListenerRecorder> getRecorders() {
return recorders;
}

public final void notifyOnUpstreamError(
@NotNull final Throwable upstreamError, @Nullable final TableListener.Entry errorSourceEntry) {
notifyInternal(upstreamError, errorSourceEntry);
Expand Down
Loading
Loading