From 7a09bfbe7f50a3b76242e73a2f7c6bc4de5a5534 Mon Sep 17 00:00:00 2001 From: Elisha Ziskind Date: Mon, 19 Aug 2024 14:03:52 -0400 Subject: [PATCH] ext_proc: expose main stream info from the async client steam info (#35650) Some upstream HTTP filters running in the ext_proc sidestream may require access to metadata associated with the main stream established for the client. This change exposes the main ("parent") stream_info from the async client's stream info. We need to clear this (to avoid a dangling reference) in the case of deferred stream closure, where the main stream closes before the async client stream. --------- Signed-off-by: Elisha Ziskind --- envoy/grpc/async_client.h | 1 + envoy/http/async_client.h | 1 + envoy/stream_info/stream_info.h | 17 +++++++++++++++++ source/common/grpc/async_client_impl.h | 1 + source/common/grpc/google_async_client_impl.h | 1 + source/common/grpc/typed_async_client.h | 1 + source/common/http/async_client_impl.cc | 4 ++++ source/common/http/async_client_impl.h | 2 +- source/common/stream_info/stream_info_impl.h | 10 ++++++++++ .../extensions/filters/http/ext_proc/client.h | 1 + .../filters/http/ext_proc/client_impl.h | 9 +++++++-- test/common/http/async_client_impl_test.cc | 12 ++++++++++++ .../filters/http/ext_proc/mock_server.h | 1 + .../http/ext_proc/unit_test_fuzz/mocks.h | 1 + test/mocks/grpc/mocks.h | 1 + test/mocks/http/mocks.h | 1 + test/mocks/stream_info/mocks.h | 3 +++ 17 files changed, 64 insertions(+), 3 deletions(-) diff --git a/envoy/grpc/async_client.h b/envoy/grpc/async_client.h index d263b128fd78..eea2a37701b4 100644 --- a/envoy/grpc/async_client.h +++ b/envoy/grpc/async_client.h @@ -70,6 +70,7 @@ class RawAsyncStream { * @returns the stream info object associated with this stream. */ virtual const StreamInfo::StreamInfo& streamInfo() const PURE; + virtual StreamInfo::StreamInfo& streamInfo() PURE; /*** * Register a callback to be called when high/low write buffer watermark events occur on the diff --git a/envoy/http/async_client.h b/envoy/http/async_client.h index 24e47c136038..e48c50b2de8c 100644 --- a/envoy/http/async_client.h +++ b/envoy/http/async_client.h @@ -234,6 +234,7 @@ class AsyncClient { * @returns the stream info object associated with the stream. */ virtual const StreamInfo::StreamInfo& streamInfo() const PURE; + virtual StreamInfo::StreamInfo& streamInfo() PURE; }; /*** diff --git a/envoy/stream_info/stream_info.h b/envoy/stream_info/stream_info.h index f3b8232967d6..39ffd18cef6a 100644 --- a/envoy/stream_info/stream_info.h +++ b/envoy/stream_info/stream_info.h @@ -980,6 +980,23 @@ class StreamInfo { */ virtual bool shouldDrainConnectionUponCompletion() const PURE; + /** + * Set the parent for this StreamInfo. This is used to associate the + * stream info of an async client with the stream info of the downstream + * connection. + */ + virtual void setParentStreamInfo(const StreamInfo& parent_stream_info) PURE; + + /** + * Get the parent for this StreamInfo, if available. + */ + virtual OptRef parentStreamInfo() const PURE; + + /** + * Clear the parent for this StreamInfo. + */ + virtual void clearParentStreamInfo() PURE; + /** * Called if the connection decides to drain itself after serving this request. * @param should_drain true to close the connection once this stream has diff --git a/source/common/grpc/async_client_impl.h b/source/common/grpc/async_client_impl.h index b1788812a0e9..aa4fb580455b 100644 --- a/source/common/grpc/async_client_impl.h +++ b/source/common/grpc/async_client_impl.h @@ -87,6 +87,7 @@ class AsyncStreamImpl : public RawAsyncStream, bool hasResetStream() const { return http_reset_; } const StreamInfo::StreamInfo& streamInfo() const override { return stream_->streamInfo(); } + StreamInfo::StreamInfo& streamInfo() override { return stream_->streamInfo(); } void setWatermarkCallbacks(Http::SidestreamWatermarkCallbacks& callbacks) override { stream_->setWatermarkCallbacks(callbacks); diff --git a/source/common/grpc/google_async_client_impl.h b/source/common/grpc/google_async_client_impl.h index 23bb2b6fbfdf..9ee8bdc28b30 100644 --- a/source/common/grpc/google_async_client_impl.h +++ b/source/common/grpc/google_async_client_impl.h @@ -236,6 +236,7 @@ class GoogleAsyncStreamImpl : public RawAsyncStream, return bytes_in_write_pending_queue_ > parent_.perStreamBufferLimitBytes(); } const StreamInfo::StreamInfo& streamInfo() const override { return unused_stream_info_; } + StreamInfo::StreamInfo& streamInfo() override { return unused_stream_info_; } // Google-gRPC code doesn't use Envoy watermark buffers, so the functions below are not used. void setWatermarkCallbacks(Http::SidestreamWatermarkCallbacks&) override {} diff --git a/source/common/grpc/typed_async_client.h b/source/common/grpc/typed_async_client.h index 294cb98de3e4..c4ed83f74bf5 100644 --- a/source/common/grpc/typed_async_client.h +++ b/source/common/grpc/typed_async_client.h @@ -58,6 +58,7 @@ template class AsyncStream /* : public RawAsyncStream */ { bool operator==(RawAsyncStream* stream) const { return stream_ == stream; } bool operator!=(RawAsyncStream* stream) const { return stream_ != stream; } const StreamInfo::StreamInfo& streamInfo() const { return stream_->streamInfo(); } + StreamInfo::StreamInfo& streamInfo() { return stream_->streamInfo(); } private: RawAsyncStream* stream_{}; diff --git a/source/common/http/async_client_impl.cc b/source/common/http/async_client_impl.cc index a0bec05e0d5a..493beb35c9a7 100644 --- a/source/common/http/async_client_impl.cc +++ b/source/common/http/async_client_impl.cc @@ -133,6 +133,10 @@ AsyncStreamImpl::AsyncStreamImpl(AsyncClientImpl& parent, AsyncClient::StreamCal stream_info_.setUpstreamClusterInfo(parent_.cluster_); stream_info_.route_ = route_; + if (options.parent_context.stream_info != nullptr) { + stream_info_.setParentStreamInfo(*options.parent_context.stream_info); + } + if (options.buffer_body_for_retry) { buffered_body_ = std::make_unique(account_); } diff --git a/source/common/http/async_client_impl.h b/source/common/http/async_client_impl.h index 91214a81a605..efbceb7e93d7 100644 --- a/source/common/http/async_client_impl.h +++ b/source/common/http/async_client_impl.h @@ -150,6 +150,7 @@ class AsyncStreamImpl : public virtual AsyncClient::Stream, void reset() override; bool isAboveWriteBufferHighWatermark() const override { return high_watermark_calls_ > 0; } const StreamInfo::StreamInfo& streamInfo() const override { return stream_info_; } + StreamInfo::StreamInfoImpl& streamInfo() override { return stream_info_; } protected: AsyncStreamImpl(AsyncClientImpl& parent, AsyncClient::StreamCallbacks& callbacks, @@ -157,7 +158,6 @@ class AsyncStreamImpl : public virtual AsyncClient::Stream, bool remoteClosed() { return remote_closed_; } void closeLocal(bool end_stream); - StreamInfo::StreamInfoImpl& streamInfo() override { return stream_info_; } AsyncClientImpl& parent_; // Callback to listen for stream destruction. diff --git a/source/common/stream_info/stream_info_impl.h b/source/common/stream_info/stream_info_impl.h index bcd5d6cc17eb..f74ad7cdd406 100644 --- a/source/common/stream_info/stream_info_impl.h +++ b/source/common/stream_info/stream_info_impl.h @@ -416,6 +416,7 @@ struct StreamInfoImpl : public StreamInfo { upstream_bytes_meter_ = info.getUpstreamBytesMeter(); bytes_sent_ = info.bytesSent(); is_shadow_ = info.isShadow(); + parent_stream_info_ = info.parentStreamInfo(); } void setIsShadow(bool is_shadow) { is_shadow_ = is_shadow; } @@ -441,6 +442,14 @@ struct StreamInfoImpl : public StreamInfo { should_drain_connection_ = should_drain; } + void setParentStreamInfo(const StreamInfo& parent_stream_info) override { + parent_stream_info_ = parent_stream_info; + } + + OptRef parentStreamInfo() const override { return parent_stream_info_; } + + void clearParentStreamInfo() override { parent_stream_info_.reset(); } + TimeSource& time_source_; SystemTime start_time_; MonotonicTime start_time_monotonic_; @@ -489,6 +498,7 @@ struct StreamInfoImpl : public StreamInfo { std::string downstream_transport_failure_reason_; bool should_scheme_match_upstream_{false}; bool should_drain_connection_{false}; + OptRef parent_stream_info_; }; } // namespace StreamInfo diff --git a/source/extensions/filters/http/ext_proc/client.h b/source/extensions/filters/http/ext_proc/client.h index 79913d16489b..54493c094fe3 100644 --- a/source/extensions/filters/http/ext_proc/client.h +++ b/source/extensions/filters/http/ext_proc/client.h @@ -24,6 +24,7 @@ class ExternalProcessorStream { // Idempotent close. Return true if it actually closed. virtual bool close() PURE; virtual const StreamInfo::StreamInfo& streamInfo() const PURE; + virtual StreamInfo::StreamInfo& streamInfo() PURE; virtual void notifyFilterDestroy() PURE; }; diff --git a/source/extensions/filters/http/ext_proc/client_impl.h b/source/extensions/filters/http/ext_proc/client_impl.h index d0c08d24f507..745bd3f167c8 100644 --- a/source/extensions/filters/http/ext_proc/client_impl.h +++ b/source/extensions/filters/http/ext_proc/client_impl.h @@ -60,8 +60,12 @@ class ExternalProcessorStreamImpl : public ExternalProcessorStream, // Unregister the watermark callbacks(if any) to prevent access of filter callbacks after // the filter object is destroyed. - if (grpc_side_stream_flow_control_ && !stream_closed_) { - stream_.removeWatermarkCallbacks(); + if (!stream_closed_) { + // Remove the parent stream info to avoid a dangling reference. + stream_.streamInfo().clearParentStreamInfo(); + if (grpc_side_stream_flow_control_) { + stream_.removeWatermarkCallbacks(); + } } } @@ -74,6 +78,7 @@ class ExternalProcessorStreamImpl : public ExternalProcessorStream, void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&& metadata) override; void onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) override; const StreamInfo::StreamInfo& streamInfo() const override { return stream_.streamInfo(); } + StreamInfo::StreamInfo& streamInfo() override { return stream_.streamInfo(); } bool grpcSidestreamFlowControl() { return grpc_side_stream_flow_control_; } diff --git a/test/common/http/async_client_impl_test.cc b/test/common/http/async_client_impl_test.cc index 2d5bc5892611..0bd9c5c5e502 100644 --- a/test/common/http/async_client_impl_test.cc +++ b/test/common/http/async_client_impl_test.cc @@ -2195,6 +2195,18 @@ TEST_F(AsyncClientImplTest, DumpState) { EXPECT_CALL(stream_callbacks_, onReset()); } +TEST_F(AsyncClientImplTest, ParentStreamInfo) { + NiceMock parent_stream_info; + auto options = AsyncClient::StreamOptions(); + options.parent_context.stream_info = &parent_stream_info; + AsyncClient::Stream* stream = client_.start(stream_callbacks_, options); + EXPECT_TRUE(stream->streamInfo().parentStreamInfo().has_value()); + EXPECT_EQ(stream->streamInfo().parentStreamInfo().ptr(), + dynamic_cast(&parent_stream_info)); + stream->streamInfo().clearParentStreamInfo(); + EXPECT_FALSE(stream->streamInfo().parentStreamInfo().has_value()); +} + } // namespace // Must not be in anonymous namespace for friend to work. diff --git a/test/extensions/filters/http/ext_proc/mock_server.h b/test/extensions/filters/http/ext_proc/mock_server.h index 6ee9d92b32aa..d0b0389b0fd4 100644 --- a/test/extensions/filters/http/ext_proc/mock_server.h +++ b/test/extensions/filters/http/ext_proc/mock_server.h @@ -26,6 +26,7 @@ class MockStream : public ExternalProcessorStream { MOCK_METHOD(void, send, (envoy::service::ext_proc::v3::ProcessingRequest&&, bool)); MOCK_METHOD(bool, close, ()); MOCK_METHOD(const StreamInfo::StreamInfo&, streamInfo, (), (const override)); + MOCK_METHOD(StreamInfo::StreamInfo&, streamInfo, ()); MOCK_METHOD(void, notifyFilterDestroy, ()); }; diff --git a/test/extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h b/test/extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h index f346ee34b8e9..49ff067dd353 100644 --- a/test/extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h +++ b/test/extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h @@ -21,6 +21,7 @@ class MockStream : public ExternalProcessing::ExternalProcessorStream { (envoy::service::ext_proc::v3::ProcessingRequest && request, bool end_stream)); MOCK_METHOD(bool, close, ()); MOCK_METHOD(const StreamInfo::StreamInfo&, streamInfo, (), (const override)); + MOCK_METHOD(StreamInfo::StreamInfo&, streamInfo, ()); MOCK_METHOD(void, notifyFilterDestroy, ()); }; diff --git a/test/mocks/grpc/mocks.h b/test/mocks/grpc/mocks.h index bd9287102642..d1d41f211115 100644 --- a/test/mocks/grpc/mocks.h +++ b/test/mocks/grpc/mocks.h @@ -39,6 +39,7 @@ class MockAsyncStream : public RawAsyncStream { MOCK_METHOD(void, resetStream, ()); MOCK_METHOD(bool, isAboveWriteBufferHighWatermark, (), (const)); MOCK_METHOD(const StreamInfo::StreamInfo&, streamInfo, (), (const)); + MOCK_METHOD(StreamInfo::StreamInfo&, streamInfo, (), ()); MOCK_METHOD(void, setWatermarkCallbacks, (Http::SidestreamWatermarkCallbacks&)); MOCK_METHOD(void, removeWatermarkCallbacks, ()); }; diff --git a/test/mocks/http/mocks.h b/test/mocks/http/mocks.h index 2aa5f34cb1b9..cdf2690c2a1c 100644 --- a/test/mocks/http/mocks.h +++ b/test/mocks/http/mocks.h @@ -577,6 +577,7 @@ class MockAsyncClientStream : public virtual AsyncClient::Stream { (override)); MOCK_METHOD(void, removeWatermarkCallbacks, (), (override)); MOCK_METHOD(const StreamInfo::StreamInfo&, streamInfo, (), (const override)); + MOCK_METHOD(StreamInfo::StreamInfo&, streamInfo, (), (override)); private: absl::optional destructor_callback_; diff --git a/test/mocks/stream_info/mocks.h b/test/mocks/stream_info/mocks.h index d1c2df8d0fef..fc81b0fcf6d6 100644 --- a/test/mocks/stream_info/mocks.h +++ b/test/mocks/stream_info/mocks.h @@ -172,6 +172,9 @@ class MockStreamInfo : public StreamInfo { MOCK_METHOD(void, setShouldSchemeMatchUpstream, (bool)); MOCK_METHOD(bool, shouldDrainConnectionUponCompletion, (), (const)); MOCK_METHOD(void, setShouldDrainConnectionUponCompletion, (bool)); + MOCK_METHOD(void, setParentStreamInfo, (const StreamInfo&), ()); + MOCK_METHOD(void, clearParentStreamInfo, ()); + MOCK_METHOD(OptRef, parentStreamInfo, (), (const)); Envoy::Event::SimulatedTimeSystem ts_; SystemTime start_time_;