GH-43631: [C++] Add C++ implementation of Async C Data Interface #44495
base: main
};

namespace internal {
class Executor;
This really needs to not be internal at some point
Can we add a test that also propagates errors? (Both: before the schema, and after the schema)
added tests to propagate errors
Status operator()(const std::shared_ptr<RecordBatch>& record) {
  std::unique_lock<std::mutex> lock(state_->mutex_);
  if (state_->pending_requests_ == 0) {
    state_->cv_.wait(lock, [this]() -> bool {
Are we supposed to have a blocking call in consuming an AsyncGenerator?
I honestly couldn't figure out a better way around this to manage the backpressure from the calls to request. I'm open to suggestions for a better route here, other than simply saying this should not be run on the main thread. *shrug*
I guess this works for now
I suppose the way it "should" work is that we never call into the AsyncGenerator unless we have a pending request? But we'd then need to handle more of the control flow here ourselves
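For readers following the thread, here is a minimal, self-contained sketch of the blocking pattern under discussion: the producer-side call waits on a condition variable until the consumer has signalled demand. It uses only the standard library. `BackpressureGate`, `Request`, `Cancel`, `Acquire`, and `RunDemo` are hypothetical names made up for illustration; this is not the code in the PR and not an Arrow API.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for the shared state guarded by a mutex in the thread above.
class BackpressureGate {
 public:
  // Consumer side: allow the producer to push `n` more items.
  void Request(uint64_t n) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      pending_requests_ += n;
    }
    cv_.notify_all();
  }

  // Consumer side: wake a blocked producer so it can stop once demand is used up.
  void Cancel() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      cancelled_ = true;
    }
    cv_.notify_all();
  }

  // Producer side: block until a request is pending, then consume one.
  // Returns false only when cancelled with no outstanding request.
  bool Acquire() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return pending_requests_ > 0 || cancelled_; });
    if (pending_requests_ == 0) return false;
    --pending_requests_;
    return true;
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  uint64_t pending_requests_ = 0;
  bool cancelled_ = false;
};

// Tries to push `total` items; only as many as were requested get delivered.
inline std::vector<int> RunDemo(int total, uint64_t requested) {
  BackpressureGate gate;
  std::vector<int> delivered;
  std::thread producer([&] {
    for (int i = 0; i < total; ++i) {
      if (!gate.Acquire()) break;  // blocks here while there is no demand
      delivered.push_back(i);
    }
  });
  gate.Request(requested);
  gate.Cancel();  // outstanding requests are still honoured before stopping
  producer.join();
  assert(delivered.size() <= requested);
  return delivered;
}
```

The blocking wait in `Acquire` is what makes this unsuitable for a thread that must stay responsive. The alternative raised above, calling into the generator only when a request is already pending, would remove the wait but move the control flow into the caller.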
Co-authored-by: David Li <[email protected]>
2ebbbb0 to 85178f7
Rationale for this change
Building on #43632, which created the Async C Data Structures, this adds functions to bridge.h/bridge.cc to implement helpers for managing the Async C Data interfaces.
What changes are included in this PR?
Two functions added to bridge.h:
- CreateAsyncDeviceStreamHandler populates an ArrowAsyncDeviceStreamHandler and an Executor to provide a future that resolves to an AsyncRecordBatchGenerator to produce record batches as they are pushed asynchronously. The ArrowAsyncDeviceStreamHandler can then be passed to any asynchronous producer.
- ExportAsyncRecordBatchReader takes a record batch generator and a schema, along with an ArrowAsyncDeviceStreamHandler to use for calling the callbacks to push data as it is available from the generator.
Are these changes tested?
Unit tests are added (currently only one test, more tests to be added)
Are there any user-facing changes?
No