Skip to content

Commit

Permalink
C++ Client: Keep FlightStreamWriter alive for the duration of the DoE…
Browse files Browse the repository at this point in the history
…xchange (#4725)
  • Loading branch information
kosak authored Oct 26, 2023
1 parent f6dc16c commit 370c744
Showing 1 changed file with 16 additions and 8 deletions.
24 changes: 16 additions & 8 deletions cpp-client/deephaven/dhclient/src/subscription/subscribe_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,10 @@ class UpdateProcessor final : public SubscriptionHandle {
public:
[[nodiscard]]
static std::shared_ptr<UpdateProcessor> startThread(std::unique_ptr<FlightStreamReader> fsr,
std::shared_ptr<Schema> schema, std::shared_ptr<TickingCallback> callback);
std::unique_ptr<FlightStreamWriter> fsw, std::shared_ptr<Schema> schema,
std::shared_ptr<TickingCallback> callback);

UpdateProcessor(std::unique_ptr<FlightStreamReader> fsr,
UpdateProcessor(std::unique_ptr<FlightStreamReader> fsr, std::unique_ptr<FlightStreamWriter> fsw,
std::shared_ptr<Schema> schema, std::shared_ptr<TickingCallback> callback);
~UpdateProcessor() final;

Expand All @@ -82,6 +83,10 @@ class UpdateProcessor final : public SubscriptionHandle {

private:
std::unique_ptr<FlightStreamReader> fsr_;
// The FlightStreamWriter is not used inside the thread, but arrow Flight >= 8.0.0 seems to
// require that it stay alive (along with the FlightStreamReader) for the duration of the
// DoExchange session.
std::unique_ptr<FlightStreamWriter> fsw_;
std::shared_ptr<Schema> schema_;
std::shared_ptr<TickingCallback> callback_;

Expand Down Expand Up @@ -163,24 +168,27 @@ std::shared_ptr<SubscriptionHandle> SubscribeState::InvokeHelper() {
OkOrThrow(DEEPHAVEN_LOCATION_EXPR(fsw->WriteMetadata(std::move(buffer))));

// Run forever (until error or cancellation)
auto processor = UpdateProcessor::startThread(std::move(fsr), std::move(schema_),
auto processor = UpdateProcessor::startThread(std::move(fsr), std::move(fsw), std::move(schema_),
std::move(callback_));
return processor;
}

std::shared_ptr<UpdateProcessor> UpdateProcessor::startThread(
std::unique_ptr<FlightStreamReader> fsr, std::shared_ptr<Schema> schema,
std::unique_ptr<FlightStreamReader> fsr,
std::unique_ptr<FlightStreamWriter> fsw,
std::shared_ptr<Schema> schema,
std::shared_ptr<TickingCallback> callback) {
auto result = std::make_shared<UpdateProcessor>(std::move(fsr), std::move(schema),
std::move(callback));
auto result = std::make_shared<UpdateProcessor>(std::move(fsr), std::move(fsw),
std::move(schema), std::move(callback));
result->thread_ = std::thread(&RunUntilCancelled, result);
return result;
}

UpdateProcessor::UpdateProcessor(std::unique_ptr<FlightStreamReader> fsr,
std::unique_ptr<FlightStreamWriter> fsw,
std::shared_ptr<Schema> schema, std::shared_ptr<TickingCallback> callback) :
fsr_(std::move(fsr)), schema_(std::move(schema)), callback_(std::move(callback)),
cancelled_(false) {}
fsr_(std::move(fsr)), fsw_(std::move(fsw)), schema_(std::move(schema)),
callback_(std::move(callback)), cancelled_(false) {}

UpdateProcessor::~UpdateProcessor() {
Cancel();
Expand Down

0 comments on commit 370c744

Please sign in to comment.