Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add add_async/delete_async methods in InputTable #6061

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.integrations.python;

import io.deephaven.engine.util.input.InputTableStatusListener;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.util.annotations.ScriptApi;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jpy.PyObject;

import java.util.Objects;

@ScriptApi
public class PythonInputTableStatusListenerAdapter implements InputTableStatusListener {

private static final Logger log = LoggerFactory.getLogger(PythonInputTableStatusListenerAdapter.class);
private final PyObject pyOnSuccessCallback;
private final PyObject pyOnErrorCallback;

/**
* Create a Python InputTable status listener.
*
* @param pyOnSuccessCallback The Python onSuccess callback function.
* @param pyOnErrorCallback The Python onError callback function.
*/
private PythonInputTableStatusListenerAdapter(@Nullable PyObject pyOnSuccessCallback,
@NotNull PyObject pyOnErrorCallback) {
this.pyOnSuccessCallback = pyOnSuccessCallback;
this.pyOnErrorCallback = pyOnErrorCallback;
}

public static PythonInputTableStatusListenerAdapter create(@Nullable PyObject pyOnSuccessCallback,
@NotNull PyObject pyOnErrorCallback) {
return new PythonInputTableStatusListenerAdapter(pyOnSuccessCallback,
Objects.requireNonNull(pyOnErrorCallback, "Python on_error callback cannot be None"));
}

@Override
public void onError(Throwable originalException) {
try {
pyOnErrorCallback.call("__call__", ExceptionUtils.getStackTrace(originalException));
} catch (Throwable e) {
// If the Python onFailure callback fails, log the new exception
// and continue with the original exception.
log.error().append("Python on_error callback failed: ").append(e).endl();
}
}

@Override
public void onSuccess() {
if (pyOnSuccessCallback != null && !pyOnSuccessCallback.isNone()) {
pyOnSuccessCallback.call("__call__");
} else {
InputTableStatusListener.super.onSuccess();
}
}
}
111 changes: 97 additions & 14 deletions py/server/deephaven/table_factory.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
#
# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
#

""" This module provides various ways to make a Deephaven table. """

from functools import wraps
from typing import Callable, List, Dict, Any, Union, Sequence, Tuple, Mapping, Optional

import jpy
Expand Down Expand Up @@ -32,6 +31,18 @@
_JRingTableTools = jpy.get_type("io.deephaven.engine.table.impl.sources.ring.RingTableTools")
_JSupplier = jpy.get_type('java.util.function.Supplier')
_JFunctionGeneratedTableFactory = jpy.get_type("io.deephaven.engine.table.impl.util.FunctionGeneratedTableFactory")
_JPythonInputTableStatusListenerAdapter = jpy.get_type(
"io.deephaven.integrations.python.PythonInputTableStatusListenerAdapter")

_DEFAULT_INPUT_TABLE_ON_ERROR_CALLBACK = lambda e: print(f"An error occurred during InputTable async operation: {e}")
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved


def _error_callback_wrapper(callback: Callable[[Exception], None]):
@wraps(callback)
def wrapper(e):
callback(RuntimeError(e))

return wrapper


def empty_table(size: int) -> Table:
Expand Down Expand Up @@ -243,8 +254,8 @@ def __init__(self, j_table: jpy.JType):
raise DHError("the provided table's InputTable attribute type is not of InputTableUpdater type.")

def add(self, table: Table) -> None:
"""Synchronously writes rows from the provided table to this input table. If this is a keyed input table, added rows with keys
that match existing rows will replace those rows.
"""Synchronously writes rows from the provided table to this input table. If this is a keyed input table,
added rows with keys that match existing rows will replace those rows.

Args:
table (Table): the table that provides the rows to write
Expand All @@ -258,8 +269,8 @@ def add(self, table: Table) -> None:
raise DHError(e, "add to InputTable failed.") from e

def delete(self, table: Table) -> None:
"""Synchronously deletes the keys contained in the provided table from this keyed input table. If this method is called on an
append-only input table, an error will be raised.
"""Synchronously deletes the keys contained in the provided table from this keyed input table. If this
method is called on an append-only input table, an error will be raised.

Args:
table (Table): the table with the keys to delete
Expand All @@ -272,6 +283,76 @@ def delete(self, table: Table) -> None:
except Exception as e:
raise DHError(e, "delete data in the InputTable failed.") from e

def add_async(self, table: Table, on_success: Callable[[], None] = None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

support should also be added to the client

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure that we need to.

  1. async add/delete is what the gRPC API uses already on the server.
  2. the Python client talks to the server only synchronously. It'd be a paradigm shift if we were to support Python asyncIO, not to say it is not doable.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My concern on this is that users frequently want to write code once and use it on both the client and server. If that might happen with InputTable, we should have the method to make the API consistent, even if the method is just calling add. If there are no reasonable cases where that might happen, then maybe it isn't a concern. This is what I'm worried about.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have a problem with introducing async support, but I do think Jianfeng is right that it may significantly expand scope and require some research on his end. I'd rather we do that in a separate PR, since this one is self-contained and makes things better.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm ok with a separate PR.

on_error: Callable[[Exception], None] = None) -> None:
"""Asynchronously writes rows from the provided table to this input table. If this is a keyed input table,
added rows with keys that match existing rows will replace those rows. This method returns immediately without
waiting for the operation to complete. If the operation succeeds, the optional on_success callback if provided
will be called. If the operation fails, the optional on_error callback if provided will be called. If on_error
is not provided, a default callback function will be called that simply prints out the received exception.
Comment on lines +288 to +292
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there order processing guarantees?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes for the requests made on the same thread, and it is probably safe to assume that it is not a typical use pattern to add to an InputTable from multiple threads, in which case, it should fall on the user to sync the threads if some kind of ordering needs to be achieved.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Document the guarantees if they exist.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Jianfeng's statement around ordering is accurate. That said, it's currently implementation-defined, rather than something the Java interface guarantees. The implementations are thread-safe, and that is to be expected; concurrent usage from multiple threads, however, gives no guarantees about ordering (nor should it).


Note, multiple calls to this method on the same thread will be queued and processed in order. However, ordering
is not guaranteed across threads.

Args:
table (Table): the table that provides the rows to write
on_success (Callable[[], None]): the success callback function, default is None
on_error (Callable[[Exception], None]): the error callback function, default is None. When None, a default
callback function will be provided that simply prints out the received exception. If the callback
function itself raises an exception, the new exception will be logged in the Deephaven server log and
will not be further processed by the server.

Raises:
DHError
"""
try:
if on_error:
on_error_callback = _error_callback_wrapper(on_error)
else:
on_error_callback = _error_callback_wrapper(_DEFAULT_INPUT_TABLE_ON_ERROR_CALLBACK)

j_input_table_status_listener = _JPythonInputTableStatusListenerAdapter.create(on_success,
on_error_callback)
self.j_input_table.addAsync(table.j_table, j_input_table_status_listener)
except Exception as e:
raise DHError(e, "async add to InputTable failed.") from e

def delete_async(self, table: Table, on_success: Callable[[], None] = None,
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
on_error: Callable[[Exception], None] = None) -> None:
"""Asynchronously deletes the keys contained in the provided table from this keyed input table. If this
method is
called on an append-only input table, an error will be raised. This method returns immediately without
waiting for
the operation to complete. If the operation succeeds, the optional on_success callback if provided
will be called. If the operation fails, the optional on_error callback if provided will be called. If on_error
is not provided, a default callback function will be called that simply prints out the received exception.
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved

Note, multiple calls to this method on the same thread will be queued and processed in order. However, ordering
is not guaranteed across threads.

Args:
table (Table): the table with the keys to delete
on_success (Callable[[], None]): the success callback function, default is None
on_error (Callable[[Exception], None]): the error callback function, default is None. When None, a default
callback function will be provided that simply prints out the received exception. If the callback
function itself raises an exception, the new exception will be logged in the Deephaven server log and
will not be further processed by the server.

Raises:
DHError
"""
try:
if on_error:
on_error_callback = _error_callback_wrapper(on_error)
else:
on_error_callback = _error_callback_wrapper(_DEFAULT_INPUT_TABLE_ON_ERROR_CALLBACK)

j_input_table_status_listener = _JPythonInputTableStatusListenerAdapter.create(on_success,
on_error_callback)
self.j_input_table.deleteAsync(table.j_table, j_input_table_status_listener)
except Exception as e:
raise DHError(e, "async delete data in the InputTable failed.") from e

@property
def key_names(self) -> List[str]:
"""The names of the key columns of the InputTable."""
Expand Down Expand Up @@ -354,11 +435,11 @@ def ring_table(parent: Table, capacity: int, initialize: bool = True) -> Table:


def function_generated_table(table_generator: Callable[..., Table],
source_tables: Union[Table, List[Table]] = None,
refresh_interval_ms: int = None,
exec_ctx: ExecutionContext = None,
args: Tuple = (),
kwargs: Dict = {}) -> Table:
source_tables: Union[Table, List[Table]] = None,
refresh_interval_ms: int = None,
exec_ctx: ExecutionContext = None,
args: Tuple = (),
kwargs: Dict = {}) -> Table:
"""Creates an abstract table that is generated by running the table_generator() function. The function will first be
run to generate the table when this method is called, then subsequently either (a) whenever one of the
'source_tables' ticks or (b) after refresh_interval_ms have elapsed. Either 'refresh_interval_ms' or
Expand All @@ -368,13 +449,15 @@ def function_generated_table(table_generator: Callable[..., Table],
function-generated tables can create tables that are produced by arbitrary Python logic (including using Pandas or
numpy). They can also be used to retrieve data from external sources (such as files or websites).

The table definition must not change between invocations of the 'table_generator' function, or an exception will be raised.
The table definition must not change between invocations of the 'table_generator' function, or an exception will
be raised.

Note that the 'table_generator' may access data in the sourceTables but should not perform further table operations
on them without careful handling. Table operations may be memoized, and it is possible that a table operation will
return a table created by a previous invocation of the same operation. Since that result will not have been included
in the 'source_table', it's not automatically treated as a dependency for purposes of determining when it's safe to
invoke 'table_generator', allowing races to exist between accessing the operation result and that result's own update
invoke 'table_generator', allowing races to exist between accessing the operation result and that result's own
update
processing. It's best to include all dependencies directly in 'source_table', or only compute on-demand inputs under
a LivenessScope.

Expand Down Expand Up @@ -441,6 +524,6 @@ def table_generator_function():
j_function_generated_table = _JFunctionGeneratedTableFactory.create(
table_generator_j_function,
source_j_tables
)
)

return Table(j_function_generated_table)
9 changes: 1 addition & 8 deletions py/server/deephaven/table_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from deephaven.jcompat import to_sequence, j_list_to_list
from deephaven.table import Table
from deephaven._table_reader import _table_reader_all_dict, _table_reader_chunk_dict
from deephaven.table_factory import _error_callback_wrapper

_JPythonReplayListenerAdapter = jpy.get_type("io.deephaven.integrations.python.PythonReplayListenerAdapter")
_JTableUpdate = jpy.get_type("io.deephaven.engine.table.TableUpdate")
Expand Down Expand Up @@ -238,14 +239,6 @@ def _wrap_listener_obj(t: Table, listener: TableListener):
return listener


def _error_callback_wrapper(callback: Callable[[Exception], None]):
@wraps(callback)
def wrapper(e):
callback(RuntimeError(e))

return wrapper


class TableListenerHandle(JObjectWrapper):
"""A handle to manage a table listener's lifecycle."""
j_object_type = _JPythonReplayListenerAdapter
Expand Down
61 changes: 56 additions & 5 deletions py/server/tests/test_table_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import jpy
import numpy as np

from time import sleep
from deephaven import DHError, read_csv, time_table, empty_table, merge, merge_sorted, dtypes, new_table, \
input_table, time, _wrapper
from deephaven.column import byte_col, char_col, short_col, bool_col, int_col, long_col, float_col, double_col, \
Expand Down Expand Up @@ -430,10 +430,7 @@ def test_instant_array(self):
from deephaven import dtypes as dht
from deephaven import time as dhtu

col_defs_5 = \
{ \
"InstantArray": dht.instant_array \
}
col_defs_5 = {"InstantArray": dht.instant_array}

dtw5 = DynamicTableWriter(col_defs_5)
t5 = dtw5.table
Expand Down Expand Up @@ -478,6 +475,60 @@ def test_j_input_wrapping(self):
self.assertFalse(isinstance(t, InputTable))
self.assertTrue(isinstance(t, Table))

def test_input_table_async(self):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to see a test or two where on_error gets called. e.g. wrong schema.

Copy link
Contributor Author

@jmao-denver jmao-denver Sep 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, the error that causes on_error to be called isn't something we can easily produce. Added a test case to demo that.

cols = [
bool_col(name="Boolean", data=[True, False]),
byte_col(name="Byte", data=(1, -1)),
char_col(name="Char", data='-1'),
short_col(name="Short", data=[1, -1]),
int_col(name="Int", data=[1, -1]),
long_col(name="Long", data=[1, -1]),
long_col(name="NPLong", data=np.array([1, -1], dtype=np.int8)),
float_col(name="Float", data=[1.01, -1.01]),
double_col(name="Double", data=[1.01, -1.01]),
string_col(name="String", data=["foo", "bar"]),
]
t = new_table(cols=cols)

with self.subTest("async add"):
self.assertEqual(t.size, 2)
success_count = 0
def on_success():
nonlocal success_count
success_count += 1
append_only_input_table = input_table(col_defs=t.definition)
append_only_input_table.add_async(t, on_success=on_success)
append_only_input_table.add_async(t, on_success=on_success)
while success_count < 2:
sleep(0.1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rcaudy is going to cry

Copy link
Contributor Author

@jmao-denver jmao-denver Sep 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately this is the best I can do but I think it is safe/deterministic . Initially I used await_update but because it is what add_async uses behind the scene to wait for UGP to finish processing, it creates a race condition where one of the calls will wait for ever.

self.assertEqual(append_only_input_table.size, 4)

keyed_input_table = input_table(col_defs=t.definition, key_cols="String")
keyed_input_table.add_async(t, on_success=on_success)
keyed_input_table.add_async(t, on_success=on_success)
while success_count < 4:
sleep(0.1)
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
self.assertEqual(keyed_input_table.size, 2)

with self.subTest("async delete"):
keyed_input_table = input_table(init_table=t, key_cols=["String", "Double"])
keyed_input_table.delete_async(t.select(["String", "Double"]), on_success=on_success)
while success_count < 5:
sleep(0.1)
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
self.assertEqual(keyed_input_table.size, 0)
t1 = t.drop_columns("String")

with self.subTest("schema mismatch"):
error_count = 0
def on_error(e: Exception):
nonlocal error_count
error_count += 1

append_only_input_table = input_table(col_defs=t1.definition)
with self.assertRaises(DHError) as cm:
append_only_input_table.add_async(t, on_success=on_success, on_error=on_error)
self.assertEqual(error_count, 0)


if __name__ == '__main__':
unittest.main()