Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api)!: Support merged listening on multiple tables #5672

Merged
merged 15 commits into from
Jul 15, 2024
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.integrations.python;

import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.table.impl.ListenerRecorder;
import io.deephaven.engine.table.impl.MergedListener;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.util.SafeCloseable;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jpy.PyObject;

import java.util.Arrays;
import java.util.Objects;
import java.util.stream.Stream;

/**
* A Deephaven merged listener which fires when any of its bound listener recorders has updates and all of its
* dependencies have been satisfied. The listener then invokes the Python listener object.
*
* The Python listener object must be a Python MergedListener instance that provides a "process" method implementation
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
* with no argument.
*/
public class PythonMergedListenerAdapter extends MergedListener {
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
private final PyObject pyCallable;

/**
* Create a Python merged listener.
*
* @param recorders The listener recorders to which this listener will subscribe.
* @param dependencies The tables that must be satisfied before this listener is executed.
* @param listenerDescription A description for the UpdatePerformanceTracker to append to its entry description, may
* be null.
* @param pyObjectIn Python listener object.
*/
private PythonMergedListenerAdapter(
@NotNull ListenerRecorder[] recorders,
@Nullable NotificationQueue.Dependency[] dependencies,
@Nullable String listenerDescription,
@NotNull PyObject pyObjectIn) {
super(Arrays.asList(recorders), Arrays.asList(dependencies), listenerDescription, null);
Arrays.stream(recorders).forEach(rec -> rec.setMergedListener(this));
this.pyCallable = PythonUtils.pyMergeListenerFunc(pyObjectIn);
}

public static PythonMergedListenerAdapter create(
@NotNull ListenerRecorder[] recorders,
@Nullable NotificationQueue.Dependency[] dependencies,
@Nullable String listenerDescription,
@NotNull PyObject pyObjectIn) {
if (recorders.length < 2) {
throw new IllegalArgumentException("At least two listener recorders must be provided");
}

final NotificationQueue.Dependency[] allItems =
Stream.concat(Arrays.stream(recorders), Arrays.stream(dependencies))
.filter(Objects::nonNull)
.toArray(NotificationQueue.Dependency[]::new);

final UpdateGraph updateGraph = allItems[0].getUpdateGraph(allItems);

try (final SafeCloseable ignored = ExecutionContext.getContext().withUpdateGraph(updateGraph).open()) {
return new PythonMergedListenerAdapter(recorders, dependencies, listenerDescription, pyObjectIn);
}
}

@Override
protected void process() {
pyCallable.call("__call__");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ static PyObject pyListenerFunc(final PyObject pyObject) {
return pyCallable(pyObject, "on_update");
}

static PyObject pyMergeListenerFunc(final PyObject pyObject) {
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
return pyCallable(pyObject, "process");
}
rcaudy marked this conversation as resolved.
Show resolved Hide resolved

/**
* Creates a callable PyObject, either using method.apply() or __call__(), if the pyObjectIn has such methods
Expand Down
Loading
Loading