Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-43631: [C++] Add C++ implementation of Async C Data Interface #44495

Open
wants to merge 19 commits into
base: main
Choose a base branch
from

Conversation

zeroshade
Copy link
Member

@zeroshade zeroshade commented Oct 22, 2024

Rationale for this change

Building on #43632 which created the Async C Data Structures, this adds functions to bridge.h/bridge.cc to implement helpers for managing the Async C Data interfaces

What changes are included in this PR?

Two functions added to bridge.h:

  1. CreateAsyncDeviceStreamHandler populates a ArrowAsyncDeviceStreamHandler and an Executor to provide a future that resolves to an AsyncRecordBatchGenerator to produce record batches as they are pushed asynchronously. The ArrowAsyncDeviceStreamHandler can then be passed to any asynchronous producer.
  2. ExportAsyncRecordBatchReader takes a record batch generator and a schema, along with an ArrowAsyncDeviceStreamHandler to use for calling the callbacks to push data as it is available from the generator.

Are these changes tested?

Unit tests are added (currently only one test, more tests to be added)

Are there any user-facing changes?

No

cpp/src/arrow/record_batch.h Outdated Show resolved Hide resolved
cpp/src/arrow/c/bridge.h Show resolved Hide resolved
cpp/src/arrow/c/bridge.h Outdated Show resolved Hide resolved
cpp/src/arrow/c/bridge.h Outdated Show resolved Hide resolved
};

namespace internal {
class Executor;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This really needs to not be internal at some point

cpp/src/arrow/c/bridge_test.cc Outdated Show resolved Hide resolved
cpp/src/arrow/c/bridge_test.cc Outdated Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a test that also propagates errors? (Both: before the schema, and after the schema)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added tests to propagate errors

@github-actions github-actions bot added awaiting changes Awaiting changes awaiting change review Awaiting change review and removed awaiting committer review Awaiting committer review awaiting changes Awaiting changes awaiting change review Awaiting change review labels Oct 22, 2024
Status operator()(const std::shared_ptr<RecordBatch>& record) {
std::unique_lock<std::mutex> lock(state_->mutex_);
if (state_->pending_requests_ == 0) {
state_->cv_.wait(lock, [this]() -> bool {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we supposed to have a blocking call in consuming an AsyncGenerator?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I honestly couldn't figure out a better way around this to manage the backpressure from the calls to request. I'm open to suggestions for a better route here other than simply saying this should not be run on the main thread shrug

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this works for now

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose the way it "should" work is that we never call into the AsyncGenerator unless we have a pending request? But we'd then need to handle more of the control flow here ourselves

cpp/src/arrow/c/bridge.cc Outdated Show resolved Hide resolved
@github-actions github-actions bot added awaiting changes Awaiting changes and removed awaiting change review Awaiting change review labels Oct 22, 2024
cpp/src/arrow/c/bridge.cc Outdated Show resolved Hide resolved
cpp/src/arrow/c/bridge.cc Show resolved Hide resolved
@github-actions github-actions bot added awaiting change review Awaiting change review awaiting changes Awaiting changes and removed awaiting changes Awaiting changes awaiting change review Awaiting change review labels Oct 22, 2024
@github-actions github-actions bot added the awaiting changes Awaiting changes label Oct 22, 2024
@github-actions github-actions bot added awaiting change review Awaiting change review awaiting changes Awaiting changes and removed awaiting changes Awaiting changes awaiting change review Awaiting change review labels Oct 23, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants