From 2075615db01cdc2cc943b9ffff05b44bb7ac947c Mon Sep 17 00:00:00 2001 From: Jianfeng Mao <4297243+jmao-denver@users.noreply.github.com> Date: Mon, 23 Sep 2024 15:04:12 -0600 Subject: [PATCH] feat: Add add_async/delete_async methods in InputTable (#6061) Fixes #3887 --- ...PythonInputTableStatusListenerAdapter.java | 61 ++++++++++ py/server/deephaven/table_factory.py | 111 +++++++++++++++--- py/server/deephaven/table_listener.py | 9 +- py/server/tests/test_table_factory.py | 61 +++++++++- 4 files changed, 215 insertions(+), 27 deletions(-) create mode 100644 Integrations/src/main/java/io/deephaven/integrations/python/PythonInputTableStatusListenerAdapter.java diff --git a/Integrations/src/main/java/io/deephaven/integrations/python/PythonInputTableStatusListenerAdapter.java b/Integrations/src/main/java/io/deephaven/integrations/python/PythonInputTableStatusListenerAdapter.java new file mode 100644 index 00000000000..0949d483020 --- /dev/null +++ b/Integrations/src/main/java/io/deephaven/integrations/python/PythonInputTableStatusListenerAdapter.java @@ -0,0 +1,61 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.integrations.python; + +import io.deephaven.engine.util.input.InputTableStatusListener; +import io.deephaven.internal.log.LoggerFactory; +import io.deephaven.io.logger.Logger; +import io.deephaven.util.annotations.ScriptApi; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.jpy.PyObject; + +import java.util.Objects; + +@ScriptApi +public class PythonInputTableStatusListenerAdapter implements InputTableStatusListener { + + private static final Logger log = LoggerFactory.getLogger(PythonInputTableStatusListenerAdapter.class); + private final PyObject pyOnSuccessCallback; + private final PyObject pyOnErrorCallback; + + /** + * Create a Python InputTable status listener. + * + * @param pyOnSuccessCallback The Python onSuccess callback function. + * @param pyOnErrorCallback The Python onError callback function. + */ + private PythonInputTableStatusListenerAdapter(@Nullable PyObject pyOnSuccessCallback, + @NotNull PyObject pyOnErrorCallback) { + this.pyOnSuccessCallback = pyOnSuccessCallback; + this.pyOnErrorCallback = pyOnErrorCallback; + } + + public static PythonInputTableStatusListenerAdapter create(@Nullable PyObject pyOnSuccessCallback, + @NotNull PyObject pyOnErrorCallback) { + return new PythonInputTableStatusListenerAdapter(pyOnSuccessCallback, + Objects.requireNonNull(pyOnErrorCallback, "Python on_error callback cannot be None")); + } + + @Override + public void onError(Throwable originalException) { + try { + pyOnErrorCallback.call("__call__", ExceptionUtils.getStackTrace(originalException)); + } catch (Throwable e) { + // If the Python onFailure callback fails, log the new exception + // and continue with the original exception. + log.error().append("Python on_error callback failed: ").append(e).endl(); + } + } + + @Override + public void onSuccess() { + if (pyOnSuccessCallback != null && !pyOnSuccessCallback.isNone()) { + pyOnSuccessCallback.call("__call__"); + } else { + InputTableStatusListener.super.onSuccess(); + } + } +} diff --git a/py/server/deephaven/table_factory.py b/py/server/deephaven/table_factory.py index 3b425583ee6..5c5a20d5420 100644 --- a/py/server/deephaven/table_factory.py +++ b/py/server/deephaven/table_factory.py @@ -1,9 +1,8 @@ # # Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending # - """ This module provides various ways to make a Deephaven table. """ - +from functools import wraps from typing import Callable, List, Dict, Any, Union, Sequence, Tuple, Mapping, Optional import jpy @@ -32,6 +31,18 @@ _JRingTableTools = jpy.get_type("io.deephaven.engine.table.impl.sources.ring.RingTableTools") _JSupplier = jpy.get_type('java.util.function.Supplier') _JFunctionGeneratedTableFactory = jpy.get_type("io.deephaven.engine.table.impl.util.FunctionGeneratedTableFactory") +_JPythonInputTableStatusListenerAdapter = jpy.get_type( + "io.deephaven.integrations.python.PythonInputTableStatusListenerAdapter") + +_DEFAULT_INPUT_TABLE_ON_ERROR_CALLBACK = lambda e: print(f"An error occurred during InputTable async operation: {e}") + + +def _error_callback_wrapper(callback: Callable[[Exception], None]): + @wraps(callback) + def wrapper(e): + callback(RuntimeError(e)) + + return wrapper def empty_table(size: int) -> Table: @@ -243,8 +254,8 @@ def __init__(self, j_table: jpy.JType): raise DHError("the provided table's InputTable attribute type is not of InputTableUpdater type.") def add(self, table: Table) -> None: - """Synchronously writes rows from the provided table to this input table. If this is a keyed input table, added rows with keys - that match existing rows will replace those rows. + """Synchronously writes rows from the provided table to this input table. If this is a keyed input table, + added rows with keys that match existing rows will replace those rows. Args: table (Table): the table that provides the rows to write @@ -258,8 +269,8 @@ def add(self, table: Table) -> None: raise DHError(e, "add to InputTable failed.") from e def delete(self, table: Table) -> None: - """Synchronously deletes the keys contained in the provided table from this keyed input table. If this method is called on an - append-only input table, an error will be raised. + """Synchronously deletes the keys contained in the provided table from this keyed input table. If this + method is called on an append-only input table, an error will be raised. Args: table (Table): the table with the keys to delete @@ -272,6 +283,76 @@ def delete(self, table: Table) -> None: except Exception as e: raise DHError(e, "delete data in the InputTable failed.") from e + def add_async(self, table: Table, on_success: Callable[[], None] = None, + on_error: Callable[[Exception], None] = None) -> None: + """Asynchronously writes rows from the provided table to this input table. If this is a keyed input table, + added rows with keys that match existing rows will replace those rows. This method returns immediately without + waiting for the operation to complete. If the operation succeeds, the optional on_success callback if provided + will be called. If the operation fails, the optional on_error callback if provided will be called. If on_error + is not provided, a default callback function will be called that simply prints out the received exception. + + Note, multiple calls to this method on the same thread will be queued and processed in order. However, ordering + is not guaranteed across threads. + + Args: + table (Table): the table that provides the rows to write + on_success (Callable[[], None]): the success callback function, default is None + on_error (Callable[[Exception], None]): the error callback function, default is None. When None, a default + callback function will be provided that simply prints out the received exception. If the callback + function itself raises an exception, the new exception will be logged in the Deephaven server log and + will not be further processed by the server. + + Raises: + DHError + """ + try: + if on_error: + on_error_callback = _error_callback_wrapper(on_error) + else: + on_error_callback = _error_callback_wrapper(_DEFAULT_INPUT_TABLE_ON_ERROR_CALLBACK) + + j_input_table_status_listener = _JPythonInputTableStatusListenerAdapter.create(on_success, + on_error_callback) + self.j_input_table.addAsync(table.j_table, j_input_table_status_listener) + except Exception as e: + raise DHError(e, "async add to InputTable failed.") from e + + def delete_async(self, table: Table, on_success: Callable[[], None] = None, + on_error: Callable[[Exception], None] = None) -> None: + """Asynchronously deletes the keys contained in the provided table from this keyed input table. If this + method is + called on an append-only input table, an error will be raised. This method returns immediately without + waiting for + the operation to complete. If the operation succeeds, the optional on_success callback if provided + will be called. If the operation fails, the optional on_error callback if provided will be called. If on_error + is not provided, a default callback function will be called that simply prints out the received exception. + + Note, multiple calls to this method on the same thread will be queued and processed in order. However, ordering + is not guaranteed across threads. + + Args: + table (Table): the table with the keys to delete + on_success (Callable[[], None]): the success callback function, default is None + on_error (Callable[[Exception], None]): the error callback function, default is None. When None, a default + callback function will be provided that simply prints out the received exception. If the callback + function itself raises an exception, the new exception will be logged in the Deephaven server log and + will not be further processed by the server. + + Raises: + DHError + """ + try: + if on_error: + on_error_callback = _error_callback_wrapper(on_error) + else: + on_error_callback = _error_callback_wrapper(_DEFAULT_INPUT_TABLE_ON_ERROR_CALLBACK) + + j_input_table_status_listener = _JPythonInputTableStatusListenerAdapter.create(on_success, + on_error_callback) + self.j_input_table.deleteAsync(table.j_table, j_input_table_status_listener) + except Exception as e: + raise DHError(e, "async delete data in the InputTable failed.") from e + @property def key_names(self) -> List[str]: """The names of the key columns of the InputTable.""" @@ -354,11 +435,11 @@ def ring_table(parent: Table, capacity: int, initialize: bool = True) -> Table: def function_generated_table(table_generator: Callable[..., Table], - source_tables: Union[Table, List[Table]] = None, - refresh_interval_ms: int = None, - exec_ctx: ExecutionContext = None, - args: Tuple = (), - kwargs: Dict = {}) -> Table: + source_tables: Union[Table, List[Table]] = None, + refresh_interval_ms: int = None, + exec_ctx: ExecutionContext = None, + args: Tuple = (), + kwargs: Dict = {}) -> Table: """Creates an abstract table that is generated by running the table_generator() function. The function will first be run to generate the table when this method is called, then subsequently either (a) whenever one of the 'source_tables' ticks or (b) after refresh_interval_ms have elapsed. Either 'refresh_interval_ms' or @@ -368,13 +449,15 @@ def function_generated_table(table_generator: Callable[..., Table], function-generated tables can create tables that are produced by arbitrary Python logic (including using Pandas or numpy). They can also be used to retrieve data from external sources (such as files or websites). - The table definition must not change between invocations of the 'table_generator' function, or an exception will be raised. + The table definition must not change between invocations of the 'table_generator' function, or an exception will + be raised. Note that the 'table_generator' may access data in the sourceTables but should not perform further table operations on them without careful handling. Table operations may be memoized, and it is possible that a table operation will return a table created by a previous invocation of the same operation. Since that result will not have been included in the 'source_table', it's not automatically treated as a dependency for purposes of determining when it's safe to - invoke 'table_generator', allowing races to exist between accessing the operation result and that result's own update + invoke 'table_generator', allowing races to exist between accessing the operation result and that result's own + update processing. It's best to include all dependencies directly in 'source_table', or only compute on-demand inputs under a LivenessScope. @@ -441,6 +524,6 @@ def table_generator_function(): j_function_generated_table = _JFunctionGeneratedTableFactory.create( table_generator_j_function, source_j_tables - ) + ) return Table(j_function_generated_table) diff --git a/py/server/deephaven/table_listener.py b/py/server/deephaven/table_listener.py index ecc6b0016f6..8701cb5e9da 100644 --- a/py/server/deephaven/table_listener.py +++ b/py/server/deephaven/table_listener.py @@ -17,6 +17,7 @@ from deephaven.jcompat import to_sequence, j_list_to_list from deephaven.table import Table from deephaven._table_reader import _table_reader_all_dict, _table_reader_chunk_dict +from deephaven.table_factory import _error_callback_wrapper _JPythonReplayListenerAdapter = jpy.get_type("io.deephaven.integrations.python.PythonReplayListenerAdapter") _JTableUpdate = jpy.get_type("io.deephaven.engine.table.TableUpdate") @@ -238,14 +239,6 @@ def _wrap_listener_obj(t: Table, listener: TableListener): return listener -def _error_callback_wrapper(callback: Callable[[Exception], None]): - @wraps(callback) - def wrapper(e): - callback(RuntimeError(e)) - - return wrapper - - class TableListenerHandle(JObjectWrapper): """A handle to manage a table listener's lifecycle.""" j_object_type = _JPythonReplayListenerAdapter diff --git a/py/server/tests/test_table_factory.py b/py/server/tests/test_table_factory.py index c4fb1cbea36..4db061adc1f 100644 --- a/py/server/tests/test_table_factory.py +++ b/py/server/tests/test_table_factory.py @@ -7,7 +7,7 @@ import jpy import numpy as np - +from time import sleep from deephaven import DHError, read_csv, time_table, empty_table, merge, merge_sorted, dtypes, new_table, \ input_table, time, _wrapper from deephaven.column import byte_col, char_col, short_col, bool_col, int_col, long_col, float_col, double_col, \ @@ -430,10 +430,7 @@ def test_instant_array(self): from deephaven import dtypes as dht from deephaven import time as dhtu - col_defs_5 = \ - { \ - "InstantArray": dht.instant_array \ - } + col_defs_5 = {"InstantArray": dht.instant_array} dtw5 = DynamicTableWriter(col_defs_5) t5 = dtw5.table @@ -478,6 +475,60 @@ def test_j_input_wrapping(self): self.assertFalse(isinstance(t, InputTable)) self.assertTrue(isinstance(t, Table)) + def test_input_table_async(self): + cols = [ + bool_col(name="Boolean", data=[True, False]), + byte_col(name="Byte", data=(1, -1)), + char_col(name="Char", data='-1'), + short_col(name="Short", data=[1, -1]), + int_col(name="Int", data=[1, -1]), + long_col(name="Long", data=[1, -1]), + long_col(name="NPLong", data=np.array([1, -1], dtype=np.int8)), + float_col(name="Float", data=[1.01, -1.01]), + double_col(name="Double", data=[1.01, -1.01]), + string_col(name="String", data=["foo", "bar"]), + ] + t = new_table(cols=cols) + + with self.subTest("async add"): + self.assertEqual(t.size, 2) + success_count = 0 + def on_success(): + nonlocal success_count + success_count += 1 + append_only_input_table = input_table(col_defs=t.definition) + append_only_input_table.add_async(t, on_success=on_success) + append_only_input_table.add_async(t, on_success=on_success) + while success_count < 2: + sleep(0.1) + self.assertEqual(append_only_input_table.size, 4) + + keyed_input_table = input_table(col_defs=t.definition, key_cols="String") + keyed_input_table.add_async(t, on_success=on_success) + keyed_input_table.add_async(t, on_success=on_success) + while success_count < 4: + sleep(0.1) + self.assertEqual(keyed_input_table.size, 2) + + with self.subTest("async delete"): + keyed_input_table = input_table(init_table=t, key_cols=["String", "Double"]) + keyed_input_table.delete_async(t.select(["String", "Double"]), on_success=on_success) + while success_count < 5: + sleep(0.1) + self.assertEqual(keyed_input_table.size, 0) + t1 = t.drop_columns("String") + + with self.subTest("schema mismatch"): + error_count = 0 + def on_error(e: Exception): + nonlocal error_count + error_count += 1 + + append_only_input_table = input_table(col_defs=t1.definition) + with self.assertRaises(DHError) as cm: + append_only_input_table.add_async(t, on_success=on_success, on_error=on_error) + self.assertEqual(error_count, 0) + if __name__ == '__main__': unittest.main()