Skip to content

Commit

Permalink
ext_proc: expose main stream info from the async client steam info (e…
Browse files Browse the repository at this point in the history
…nvoyproxy#35650)

Some upstream HTTP filters running in the ext_proc sidestream may
require access to metadata associated with the main stream established
for the client. This change exposes the main ("parent") stream_info from
the async client's stream info. We need to clear this (to avoid a
dangling reference) in the case of deferred stream closure, where the
main stream closes before the async client stream.

---------

Signed-off-by: Elisha Ziskind <[email protected]>
  • Loading branch information
eziskind authored Aug 19, 2024
1 parent 6648db2 commit 7a09bfb
Show file tree
Hide file tree
Showing 17 changed files with 64 additions and 3 deletions.
1 change: 1 addition & 0 deletions envoy/grpc/async_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class RawAsyncStream {
* @returns the stream info object associated with this stream.
*/
virtual const StreamInfo::StreamInfo& streamInfo() const PURE;
virtual StreamInfo::StreamInfo& streamInfo() PURE;

/***
* Register a callback to be called when high/low write buffer watermark events occur on the
Expand Down
1 change: 1 addition & 0 deletions envoy/http/async_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ class AsyncClient {
* @returns the stream info object associated with the stream.
*/
virtual const StreamInfo::StreamInfo& streamInfo() const PURE;
virtual StreamInfo::StreamInfo& streamInfo() PURE;
};

/***
Expand Down
17 changes: 17 additions & 0 deletions envoy/stream_info/stream_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -980,6 +980,23 @@ class StreamInfo {
*/
virtual bool shouldDrainConnectionUponCompletion() const PURE;

/**
* Set the parent for this StreamInfo. This is used to associate the
* stream info of an async client with the stream info of the downstream
* connection.
*/
virtual void setParentStreamInfo(const StreamInfo& parent_stream_info) PURE;

/**
* Get the parent for this StreamInfo, if available.
*/
virtual OptRef<const StreamInfo> parentStreamInfo() const PURE;

/**
* Clear the parent for this StreamInfo.
*/
virtual void clearParentStreamInfo() PURE;

/**
* Called if the connection decides to drain itself after serving this request.
* @param should_drain true to close the connection once this stream has
Expand Down
1 change: 1 addition & 0 deletions source/common/grpc/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ class AsyncStreamImpl : public RawAsyncStream,

bool hasResetStream() const { return http_reset_; }
const StreamInfo::StreamInfo& streamInfo() const override { return stream_->streamInfo(); }
StreamInfo::StreamInfo& streamInfo() override { return stream_->streamInfo(); }

void setWatermarkCallbacks(Http::SidestreamWatermarkCallbacks& callbacks) override {
stream_->setWatermarkCallbacks(callbacks);
Expand Down
1 change: 1 addition & 0 deletions source/common/grpc/google_async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ class GoogleAsyncStreamImpl : public RawAsyncStream,
return bytes_in_write_pending_queue_ > parent_.perStreamBufferLimitBytes();
}
const StreamInfo::StreamInfo& streamInfo() const override { return unused_stream_info_; }
StreamInfo::StreamInfo& streamInfo() override { return unused_stream_info_; }

// Google-gRPC code doesn't use Envoy watermark buffers, so the functions below are not used.
void setWatermarkCallbacks(Http::SidestreamWatermarkCallbacks&) override {}
Expand Down
1 change: 1 addition & 0 deletions source/common/grpc/typed_async_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ template <typename Request> class AsyncStream /* : public RawAsyncStream */ {
bool operator==(RawAsyncStream* stream) const { return stream_ == stream; }
bool operator!=(RawAsyncStream* stream) const { return stream_ != stream; }
const StreamInfo::StreamInfo& streamInfo() const { return stream_->streamInfo(); }
StreamInfo::StreamInfo& streamInfo() { return stream_->streamInfo(); }

private:
RawAsyncStream* stream_{};
Expand Down
4 changes: 4 additions & 0 deletions source/common/http/async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ AsyncStreamImpl::AsyncStreamImpl(AsyncClientImpl& parent, AsyncClient::StreamCal
stream_info_.setUpstreamClusterInfo(parent_.cluster_);
stream_info_.route_ = route_;

if (options.parent_context.stream_info != nullptr) {
stream_info_.setParentStreamInfo(*options.parent_context.stream_info);
}

if (options.buffer_body_for_retry) {
buffered_body_ = std::make_unique<Buffer::OwnedImpl>(account_);
}
Expand Down
2 changes: 1 addition & 1 deletion source/common/http/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,14 +150,14 @@ class AsyncStreamImpl : public virtual AsyncClient::Stream,
void reset() override;
bool isAboveWriteBufferHighWatermark() const override { return high_watermark_calls_ > 0; }
const StreamInfo::StreamInfo& streamInfo() const override { return stream_info_; }
StreamInfo::StreamInfoImpl& streamInfo() override { return stream_info_; }

protected:
AsyncStreamImpl(AsyncClientImpl& parent, AsyncClient::StreamCallbacks& callbacks,
const AsyncClient::StreamOptions& options, absl::Status& creation_status);

bool remoteClosed() { return remote_closed_; }
void closeLocal(bool end_stream);
StreamInfo::StreamInfoImpl& streamInfo() override { return stream_info_; }

AsyncClientImpl& parent_;
// Callback to listen for stream destruction.
Expand Down
10 changes: 10 additions & 0 deletions source/common/stream_info/stream_info_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,7 @@ struct StreamInfoImpl : public StreamInfo {
upstream_bytes_meter_ = info.getUpstreamBytesMeter();
bytes_sent_ = info.bytesSent();
is_shadow_ = info.isShadow();
parent_stream_info_ = info.parentStreamInfo();
}

void setIsShadow(bool is_shadow) { is_shadow_ = is_shadow; }
Expand All @@ -441,6 +442,14 @@ struct StreamInfoImpl : public StreamInfo {
should_drain_connection_ = should_drain;
}

void setParentStreamInfo(const StreamInfo& parent_stream_info) override {
parent_stream_info_ = parent_stream_info;
}

OptRef<const StreamInfo> parentStreamInfo() const override { return parent_stream_info_; }

void clearParentStreamInfo() override { parent_stream_info_.reset(); }

TimeSource& time_source_;
SystemTime start_time_;
MonotonicTime start_time_monotonic_;
Expand Down Expand Up @@ -489,6 +498,7 @@ struct StreamInfoImpl : public StreamInfo {
std::string downstream_transport_failure_reason_;
bool should_scheme_match_upstream_{false};
bool should_drain_connection_{false};
OptRef<const StreamInfo> parent_stream_info_;
};

} // namespace StreamInfo
Expand Down
1 change: 1 addition & 0 deletions source/extensions/filters/http/ext_proc/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class ExternalProcessorStream {
// Idempotent close. Return true if it actually closed.
virtual bool close() PURE;
virtual const StreamInfo::StreamInfo& streamInfo() const PURE;
virtual StreamInfo::StreamInfo& streamInfo() PURE;
virtual void notifyFilterDestroy() PURE;
};

Expand Down
9 changes: 7 additions & 2 deletions source/extensions/filters/http/ext_proc/client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,12 @@ class ExternalProcessorStreamImpl : public ExternalProcessorStream,

// Unregister the watermark callbacks(if any) to prevent access of filter callbacks after
// the filter object is destroyed.
if (grpc_side_stream_flow_control_ && !stream_closed_) {
stream_.removeWatermarkCallbacks();
if (!stream_closed_) {
// Remove the parent stream info to avoid a dangling reference.
stream_.streamInfo().clearParentStreamInfo();
if (grpc_side_stream_flow_control_) {
stream_.removeWatermarkCallbacks();
}
}
}

Expand All @@ -74,6 +78,7 @@ class ExternalProcessorStreamImpl : public ExternalProcessorStream,
void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&& metadata) override;
void onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) override;
const StreamInfo::StreamInfo& streamInfo() const override { return stream_.streamInfo(); }
StreamInfo::StreamInfo& streamInfo() override { return stream_.streamInfo(); }

bool grpcSidestreamFlowControl() { return grpc_side_stream_flow_control_; }

Expand Down
12 changes: 12 additions & 0 deletions test/common/http/async_client_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2195,6 +2195,18 @@ TEST_F(AsyncClientImplTest, DumpState) {
EXPECT_CALL(stream_callbacks_, onReset());
}

TEST_F(AsyncClientImplTest, ParentStreamInfo) {
NiceMock<StreamInfo::MockStreamInfo> parent_stream_info;
auto options = AsyncClient::StreamOptions();
options.parent_context.stream_info = &parent_stream_info;
AsyncClient::Stream* stream = client_.start(stream_callbacks_, options);
EXPECT_TRUE(stream->streamInfo().parentStreamInfo().has_value());
EXPECT_EQ(stream->streamInfo().parentStreamInfo().ptr(),
dynamic_cast<const StreamInfo::StreamInfo*>(&parent_stream_info));
stream->streamInfo().clearParentStreamInfo();
EXPECT_FALSE(stream->streamInfo().parentStreamInfo().has_value());
}

} // namespace

// Must not be in anonymous namespace for friend to work.
Expand Down
1 change: 1 addition & 0 deletions test/extensions/filters/http/ext_proc/mock_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class MockStream : public ExternalProcessorStream {
MOCK_METHOD(void, send, (envoy::service::ext_proc::v3::ProcessingRequest&&, bool));
MOCK_METHOD(bool, close, ());
MOCK_METHOD(const StreamInfo::StreamInfo&, streamInfo, (), (const override));
MOCK_METHOD(StreamInfo::StreamInfo&, streamInfo, ());
MOCK_METHOD(void, notifyFilterDestroy, ());
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class MockStream : public ExternalProcessing::ExternalProcessorStream {
(envoy::service::ext_proc::v3::ProcessingRequest && request, bool end_stream));
MOCK_METHOD(bool, close, ());
MOCK_METHOD(const StreamInfo::StreamInfo&, streamInfo, (), (const override));
MOCK_METHOD(StreamInfo::StreamInfo&, streamInfo, ());
MOCK_METHOD(void, notifyFilterDestroy, ());
};

Expand Down
1 change: 1 addition & 0 deletions test/mocks/grpc/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class MockAsyncStream : public RawAsyncStream {
MOCK_METHOD(void, resetStream, ());
MOCK_METHOD(bool, isAboveWriteBufferHighWatermark, (), (const));
MOCK_METHOD(const StreamInfo::StreamInfo&, streamInfo, (), (const));
MOCK_METHOD(StreamInfo::StreamInfo&, streamInfo, (), ());
MOCK_METHOD(void, setWatermarkCallbacks, (Http::SidestreamWatermarkCallbacks&));
MOCK_METHOD(void, removeWatermarkCallbacks, ());
};
Expand Down
1 change: 1 addition & 0 deletions test/mocks/http/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,7 @@ class MockAsyncClientStream : public virtual AsyncClient::Stream {
(override));
MOCK_METHOD(void, removeWatermarkCallbacks, (), (override));
MOCK_METHOD(const StreamInfo::StreamInfo&, streamInfo, (), (const override));
MOCK_METHOD(StreamInfo::StreamInfo&, streamInfo, (), (override));

private:
absl::optional<AsyncClient::StreamDestructorCallbacks> destructor_callback_;
Expand Down
3 changes: 3 additions & 0 deletions test/mocks/stream_info/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ class MockStreamInfo : public StreamInfo {
MOCK_METHOD(void, setShouldSchemeMatchUpstream, (bool));
MOCK_METHOD(bool, shouldDrainConnectionUponCompletion, (), (const));
MOCK_METHOD(void, setShouldDrainConnectionUponCompletion, (bool));
MOCK_METHOD(void, setParentStreamInfo, (const StreamInfo&), ());
MOCK_METHOD(void, clearParentStreamInfo, ());
MOCK_METHOD(OptRef<const StreamInfo>, parentStreamInfo, (), (const));

Envoy::Event::SimulatedTimeSystem ts_;
SystemTime start_time_;
Expand Down

0 comments on commit 7a09bfb

Please sign in to comment.