Skip to content

Commit

Permalink
feat: Add add_async/delete_async methods in InputTable (#6061)
Browse files Browse the repository at this point in the history
Fixes #3887
  • Loading branch information
jmao-denver authored Sep 23, 2024
1 parent dcd7fc5 commit 2075615
Show file tree
Hide file tree
Showing 4 changed files with 215 additions and 27 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.integrations.python;

import io.deephaven.engine.util.input.InputTableStatusListener;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.util.annotations.ScriptApi;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jpy.PyObject;

import java.util.Objects;

/**
 * An {@link InputTableStatusListener} that forwards status notifications to Python callables.
 *
 * <p>
 * The on_success callback is optional; the on_error callback is mandatory and is validated in
 * {@link #create(PyObject, PyObject)}.
 */
@ScriptApi
public class PythonInputTableStatusListenerAdapter implements InputTableStatusListener {

    private static final Logger log = LoggerFactory.getLogger(PythonInputTableStatusListenerAdapter.class);

    /** Python callable invoked on success; may be {@code null} or Python {@code None}. */
    private final PyObject pyOnSuccessCallback;

    /** Python callable invoked on failure; never {@code null} when built through {@link #create}. */
    private final PyObject pyOnErrorCallback;

    /**
     * Create a Python InputTable status listener. Instances are obtained through
     * {@link #create(PyObject, PyObject)}.
     *
     * @param pyOnSuccessCallback The Python onSuccess callback function.
     * @param pyOnErrorCallback The Python onError callback function.
     */
    private PythonInputTableStatusListenerAdapter(@Nullable PyObject pyOnSuccessCallback,
            @NotNull PyObject pyOnErrorCallback) {
        this.pyOnSuccessCallback = pyOnSuccessCallback;
        this.pyOnErrorCallback = pyOnErrorCallback;
    }

    /**
     * Factory method used to build the adapter.
     *
     * @param pyOnSuccessCallback The Python onSuccess callback function, may be {@code null}.
     * @param pyOnErrorCallback The Python onError callback function, must not be {@code null}.
     * @return a new adapter wrapping the two callbacks
     * @throws NullPointerException if {@code pyOnErrorCallback} is {@code null}
     */
    public static PythonInputTableStatusListenerAdapter create(@Nullable PyObject pyOnSuccessCallback,
            @NotNull PyObject pyOnErrorCallback) {
        final PyObject checkedOnError =
                Objects.requireNonNull(pyOnErrorCallback, "Python on_error callback cannot be None");
        return new PythonInputTableStatusListenerAdapter(pyOnSuccessCallback, checkedOnError);
    }

    /**
     * Passes the stack trace text of {@code originalException} to the Python on_error callback.
     *
     * @param originalException the failure reported for the input table operation
     */
    @Override
    public void onError(Throwable originalException) {
        try {
            final String stackTraceText = ExceptionUtils.getStackTrace(originalException);
            pyOnErrorCallback.call("__call__", stackTraceText);
        } catch (Throwable e) {
            // A failure while notifying Python is only logged; it is not rethrown to the caller.
            log.error().append("Python on_error callback failed: ").append(e).endl();
        }
    }

    /**
     * Invokes the Python on_success callback when one was supplied and it is not Python {@code None}; otherwise
     * defers to the interface's default {@code onSuccess}. Unlike {@link #onError(Throwable)}, exceptions raised by
     * the callback are not caught here.
     */
    @Override
    public void onSuccess() {
        if (pyOnSuccessCallback == null || pyOnSuccessCallback.isNone()) {
            InputTableStatusListener.super.onSuccess();
            return;
        }
        pyOnSuccessCallback.call("__call__");
    }
}
111 changes: 97 additions & 14 deletions py/server/deephaven/table_factory.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
#
# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
#

""" This module provides various ways to make a Deephaven table. """

from functools import wraps
from typing import Callable, List, Dict, Any, Union, Sequence, Tuple, Mapping, Optional

import jpy
Expand Down Expand Up @@ -32,6 +31,18 @@
# Java types resolved through jpy for use by the factory functions in this module.
_JRingTableTools = jpy.get_type("io.deephaven.engine.table.impl.sources.ring.RingTableTools")
_JSupplier = jpy.get_type('java.util.function.Supplier')
_JFunctionGeneratedTableFactory = jpy.get_type("io.deephaven.engine.table.impl.util.FunctionGeneratedTableFactory")
# Java adapter that relays InputTable async status notifications to Python callbacks.
_JPythonInputTableStatusListenerAdapter = jpy.get_type(
"io.deephaven.integrations.python.PythonInputTableStatusListenerAdapter")

# Fallback on_error callback for InputTable.add_async/delete_async when the caller supplies none: it only prints
# the received error.
_DEFAULT_INPUT_TABLE_ON_ERROR_CALLBACK = lambda e: print(f"An error occurred during InputTable async operation: {e}")


def _error_callback_wrapper(callback: Callable[[Exception], None]):
@wraps(callback)
def wrapper(e):
callback(RuntimeError(e))

return wrapper


def empty_table(size: int) -> Table:
Expand Down Expand Up @@ -243,8 +254,8 @@ def __init__(self, j_table: jpy.JType):
raise DHError("the provided table's InputTable attribute type is not of InputTableUpdater type.")

def add(self, table: Table) -> None:
"""Synchronously writes rows from the provided table to this input table. If this is a keyed input table, added rows with keys
that match existing rows will replace those rows.
"""Synchronously writes rows from the provided table to this input table. If this is a keyed input table,
added rows with keys that match existing rows will replace those rows.
Args:
table (Table): the table that provides the rows to write
Expand All @@ -258,8 +269,8 @@ def add(self, table: Table) -> None:
raise DHError(e, "add to InputTable failed.") from e

def delete(self, table: Table) -> None:
"""Synchronously deletes the keys contained in the provided table from this keyed input table. If this method is called on an
append-only input table, an error will be raised.
"""Synchronously deletes the keys contained in the provided table from this keyed input table. If this
method is called on an append-only input table, an error will be raised.
Args:
table (Table): the table with the keys to delete
Expand All @@ -272,6 +283,76 @@ def delete(self, table: Table) -> None:
except Exception as e:
raise DHError(e, "delete data in the InputTable failed.") from e

def add_async(self, table: Table, on_success: Optional[Callable[[], None]] = None,
              on_error: Optional[Callable[[Exception], None]] = None) -> None:
    """Asynchronously writes rows from the provided table to this input table.

    If this is a keyed input table, added rows with keys that match existing rows will replace those rows. This
    method returns immediately without waiting for the operation to complete. If the operation succeeds, the
    optional on_success callback if provided will be called. If the operation fails, the optional on_error callback
    if provided will be called. If on_error is not provided, a default callback function will be called that simply
    prints out the received exception.

    Note, multiple calls to this method on the same thread will be queued and processed in order. However, ordering
    is not guaranteed across threads.

    Args:
        table (Table): the table that provides the rows to write
        on_success (Optional[Callable[[], None]]): the success callback function, default is None
        on_error (Optional[Callable[[Exception], None]]): the error callback function, default is None. When None,
            a default callback function will be provided that simply prints out the received exception. If the
            callback function itself raises an exception, the new exception will be logged in the Deephaven server
            log and will not be further processed by the server.

    Raises:
        DHError
    """
    try:
        # The Java adapter rejects a missing error callback, so fall back to the module default when the caller
        # did not supply one; either way the callback receives the error wrapped as an exception object.
        on_error_callback = _error_callback_wrapper(on_error or _DEFAULT_INPUT_TABLE_ON_ERROR_CALLBACK)
        j_status_listener = _JPythonInputTableStatusListenerAdapter.create(on_success, on_error_callback)
        self.j_input_table.addAsync(table.j_table, j_status_listener)
    except Exception as e:
        raise DHError(e, "async add to InputTable failed.") from e

def delete_async(self, table: Table, on_success: Optional[Callable[[], None]] = None,
                 on_error: Optional[Callable[[Exception], None]] = None) -> None:
    """Asynchronously deletes the keys contained in the provided table from this keyed input table.

    If this method is called on an append-only input table, an error will be raised. This method returns
    immediately without waiting for the operation to complete. If the operation succeeds, the optional on_success
    callback if provided will be called. If the operation fails, the optional on_error callback if provided will be
    called. If on_error is not provided, a default callback function will be called that simply prints out the
    received exception.

    Note, multiple calls to this method on the same thread will be queued and processed in order. However, ordering
    is not guaranteed across threads.

    Args:
        table (Table): the table with the keys to delete
        on_success (Optional[Callable[[], None]]): the success callback function, default is None
        on_error (Optional[Callable[[Exception], None]]): the error callback function, default is None. When None,
            a default callback function will be provided that simply prints out the received exception. If the
            callback function itself raises an exception, the new exception will be logged in the Deephaven server
            log and will not be further processed by the server.

    Raises:
        DHError
    """
    try:
        # The Java adapter rejects a missing error callback, so fall back to the module default when the caller
        # did not supply one; either way the callback receives the error wrapped as an exception object.
        on_error_callback = _error_callback_wrapper(on_error or _DEFAULT_INPUT_TABLE_ON_ERROR_CALLBACK)
        j_status_listener = _JPythonInputTableStatusListenerAdapter.create(on_success, on_error_callback)
        self.j_input_table.deleteAsync(table.j_table, j_status_listener)
    except Exception as e:
        raise DHError(e, "async delete data in the InputTable failed.") from e

@property
def key_names(self) -> List[str]:
"""The names of the key columns of the InputTable."""
Expand Down Expand Up @@ -354,11 +435,11 @@ def ring_table(parent: Table, capacity: int, initialize: bool = True) -> Table:


def function_generated_table(table_generator: Callable[..., Table],
source_tables: Union[Table, List[Table]] = None,
refresh_interval_ms: int = None,
exec_ctx: ExecutionContext = None,
args: Tuple = (),
kwargs: Dict = {}) -> Table:
source_tables: Union[Table, List[Table]] = None,
refresh_interval_ms: int = None,
exec_ctx: ExecutionContext = None,
args: Tuple = (),
kwargs: Dict = {}) -> Table:
"""Creates an abstract table that is generated by running the table_generator() function. The function will first be
run to generate the table when this method is called, then subsequently either (a) whenever one of the
'source_tables' ticks or (b) after refresh_interval_ms have elapsed. Either 'refresh_interval_ms' or
Expand All @@ -368,13 +449,15 @@ def function_generated_table(table_generator: Callable[..., Table],
function-generated tables can create tables that are produced by arbitrary Python logic (including using Pandas or
numpy). They can also be used to retrieve data from external sources (such as files or websites).
The table definition must not change between invocations of the 'table_generator' function, or an exception will be raised.
The table definition must not change between invocations of the 'table_generator' function, or an exception will
be raised.
Note that the 'table_generator' may access data in the sourceTables but should not perform further table operations
on them without careful handling. Table operations may be memoized, and it is possible that a table operation will
return a table created by a previous invocation of the same operation. Since that result will not have been included
in the 'source_table', it's not automatically treated as a dependency for purposes of determining when it's safe to
invoke 'table_generator', allowing races to exist between accessing the operation result and that result's own update
invoke 'table_generator', allowing races to exist between accessing the operation result and that result's own
update
processing. It's best to include all dependencies directly in 'source_table', or only compute on-demand inputs under
a LivenessScope.
Expand Down Expand Up @@ -441,6 +524,6 @@ def table_generator_function():
j_function_generated_table = _JFunctionGeneratedTableFactory.create(
table_generator_j_function,
source_j_tables
)
)

return Table(j_function_generated_table)
9 changes: 1 addition & 8 deletions py/server/deephaven/table_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from deephaven.jcompat import to_sequence, j_list_to_list
from deephaven.table import Table
from deephaven._table_reader import _table_reader_all_dict, _table_reader_chunk_dict
from deephaven.table_factory import _error_callback_wrapper

_JPythonReplayListenerAdapter = jpy.get_type("io.deephaven.integrations.python.PythonReplayListenerAdapter")
_JTableUpdate = jpy.get_type("io.deephaven.engine.table.TableUpdate")
Expand Down Expand Up @@ -238,14 +239,6 @@ def _wrap_listener_obj(t: Table, listener: TableListener):
return listener


def _error_callback_wrapper(callback: Callable[[Exception], None]):
@wraps(callback)
def wrapper(e):
callback(RuntimeError(e))

return wrapper


class TableListenerHandle(JObjectWrapper):
"""A handle to manage a table listener's lifecycle."""
j_object_type = _JPythonReplayListenerAdapter
Expand Down
61 changes: 56 additions & 5 deletions py/server/tests/test_table_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import jpy
import numpy as np

from time import sleep
from deephaven import DHError, read_csv, time_table, empty_table, merge, merge_sorted, dtypes, new_table, \
input_table, time, _wrapper
from deephaven.column import byte_col, char_col, short_col, bool_col, int_col, long_col, float_col, double_col, \
Expand Down Expand Up @@ -430,10 +430,7 @@ def test_instant_array(self):
from deephaven import dtypes as dht
from deephaven import time as dhtu

col_defs_5 = \
{ \
"InstantArray": dht.instant_array \
}
col_defs_5 = {"InstantArray": dht.instant_array}

dtw5 = DynamicTableWriter(col_defs_5)
t5 = dtw5.table
Expand Down Expand Up @@ -478,6 +475,60 @@ def test_j_input_wrapping(self):
self.assertFalse(isinstance(t, InputTable))
self.assertTrue(isinstance(t, Table))

def test_input_table_async(self):
    """Exercises InputTable.add_async/delete_async, including the synchronous failure on a schema mismatch."""
    cols = [
        bool_col(name="Boolean", data=[True, False]),
        byte_col(name="Byte", data=(1, -1)),
        char_col(name="Char", data='-1'),
        short_col(name="Short", data=[1, -1]),
        int_col(name="Int", data=[1, -1]),
        long_col(name="Long", data=[1, -1]),
        long_col(name="NPLong", data=np.array([1, -1], dtype=np.int8)),
        float_col(name="Float", data=[1.01, -1.01]),
        double_col(name="Double", data=[1.01, -1.01]),
        string_col(name="String", data=["foo", "bar"]),
    ]
    t = new_table(cols=cols)

    # Shared by every subtest, so it is defined up front rather than inside the first subTest block.
    success_count = 0

    def on_success():
        nonlocal success_count
        success_count += 1

    def wait_for_success_count(expected: int, timeout_seconds: float = 30.0) -> None:
        # Poll with a bounded number of attempts so a callback that never fires fails the test instead of
        # hanging it forever.
        poll_interval_seconds = 0.1
        polls_left = int(timeout_seconds / poll_interval_seconds)
        while success_count < expected and polls_left > 0:
            sleep(poll_interval_seconds)
            polls_left -= 1
        self.assertGreaterEqual(
            success_count, expected,
            f"timed out after {timeout_seconds}s waiting for {expected} on_success callbacks")

    with self.subTest("async add"):
        self.assertEqual(t.size, 2)
        append_only_input_table = input_table(col_defs=t.definition)
        append_only_input_table.add_async(t, on_success=on_success)
        append_only_input_table.add_async(t, on_success=on_success)
        wait_for_success_count(2)
        self.assertEqual(append_only_input_table.size, 4)

        keyed_input_table = input_table(col_defs=t.definition, key_cols="String")
        keyed_input_table.add_async(t, on_success=on_success)
        keyed_input_table.add_async(t, on_success=on_success)
        wait_for_success_count(4)
        self.assertEqual(keyed_input_table.size, 2)

    with self.subTest("async delete"):
        keyed_input_table = input_table(init_table=t, key_cols=["String", "Double"])
        keyed_input_table.delete_async(t.select(["String", "Double"]), on_success=on_success)
        wait_for_success_count(5)
        self.assertEqual(keyed_input_table.size, 0)

    with self.subTest("schema mismatch"):
        error_count = 0

        def on_error(e: Exception):
            nonlocal error_count
            error_count += 1

        t1 = t.drop_columns("String")
        append_only_input_table = input_table(col_defs=t1.definition)
        with self.assertRaises(DHError):
            append_only_input_table.add_async(t, on_success=on_success, on_error=on_error)
        # Checked outside the assertRaises block so the assertion actually runs: the mismatch is raised
        # to the caller and the async error callback is not expected to have been invoked.
        self.assertEqual(error_count, 0)


if __name__ == '__main__':
unittest.main()

0 comments on commit 2075615

Please sign in to comment.