Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add add_async/delete_async methods in InputTable #6061

Open
wants to merge 4 commits into
base: main
Choose a base branch
from

Conversation

jmao-denver
Copy link
Contributor

Fixes #3887

@jmao-denver jmao-denver changed the title Add add_async/delete_async methods in InputTable feat: Add add_async/delete_async methods in InputTable Sep 12, 2024
@jmao-denver jmao-denver added this to the 0.37.0 milestone Sep 12, 2024
@jmao-denver jmao-denver marked this pull request as ready for review September 12, 2024 20:51
Comment on lines +290 to +294
"""Asynchronously writes rows from the provided table to this input table. If this is a keyed input table,
added rows with keys that match existing rows will replace those rows. This method returns immediately without
waiting for the operation to complete. If the operation succeeds, the optional on_success callback if provided
will be called. If the operation fails, the optional on_error callback if provided will be called. If on_error
is not provided, a default callback function will be called that simply prints out the received exception.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there order processing guarantees?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes for the requests made on the same thread, and it is probably safe to assume that it is not a typical use pattern to add to an InputTable from multiple threads, in which case, it should fall on the user to sync the threads if some kind of ordering needs to be achieved.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Document the guarantees if they exist.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Jianfeng's statement around ordering is accurate. That said, it's currently implementation-defined, rather than something the Java interface guarantees. The implementations are thread-safe, and that is to be expected; concurrent usage from multiple threads, however, gives no guarantees about ordering (nor should it).

py/server/deephaven/table_factory.py Show resolved Hide resolved
append_only_input_table.add_async(t, on_success=on_success)
append_only_input_table.add_async(t, on_success=on_success)
while success_count < 2:
sleep(0.1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rcaudy is going to cry

Copy link
Contributor Author

@jmao-denver jmao-denver Sep 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately this is the best I can do but I think it is safe/deterministic . Initially I used await_update but because it is what add_async uses behind the scene to wait for UGP to finish processing, it creates a race condition where one of the calls will wait for ever.

py/server/tests/test_table_factory.py Show resolved Hide resolved
py/server/tests/test_table_factory.py Show resolved Hide resolved
@@ -272,6 +285,70 @@ def delete(self, table: Table) -> None:
except Exception as e:
raise DHError(e, "delete data in the InputTable failed.") from e

def add_async(self, table: Table, on_success: Callable[[], None] = None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

support should also be added to the client

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure that we need to.

  1. async add/delete is what the gRPC API uses already on the server.
  2. the Python client talks to the server only synchronously. It'd be a paradigm shift if we were to support Python asyncIO, not to say it is not doable.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My concern on this is that users frequently want to write code once and use it on both the client and server. If that might happen with InputTable, we should have the method to make the API consistent, even if the method is just calling add. If there are no reasonable cases where that might happen, then maybe it isn't a concern. This is what I'm worried about.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have a problem with introducing async support, but I do think Jianfeng is right that it may significantly expand scope and require some research on his end. I'd rather we do that in a separate PR, since this one is self-contained and makes things better.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm ok with a separate PR.

py/server/deephaven/table_factory.py Show resolved Hide resolved
@@ -478,6 +475,48 @@ def test_j_input_wrapping(self):
self.assertFalse(isinstance(t, InputTable))
self.assertTrue(isinstance(t, Table))

def test_input_table_async(self):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to see a test or two where on_error gets called. e.g. wrong schema.

Copy link
Contributor Author

@jmao-denver jmao-denver Sep 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, the error that causes on_error to be called isn't something we can easily produce. Added a test case to demo that.

py/server/deephaven/table_factory.py Show resolved Hide resolved
py/server/deephaven/table_factory.py Outdated Show resolved Hide resolved
Comment on lines +290 to +294
"""Asynchronously writes rows from the provided table to this input table. If this is a keyed input table,
added rows with keys that match existing rows will replace those rows. This method returns immediately without
waiting for the operation to complete. If the operation succeeds, the optional on_success callback if provided
will be called. If the operation fails, the optional on_error callback if provided will be called. If on_error
is not provided, a default callback function will be called that simply prints out the received exception.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Jianfeng's statement around ordering is accurate. That said, it's currently implementation-defined, rather than something the Java interface guarantees. The implementations are thread-safe, and that is to be expected; concurrent usage from multiple threads, however, gives no guarantees about ordering (nor should it).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support MutableInputTable.addAsync and deleteAsync from the Python wrappers
3 participants