Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

flow control: side stream flow control part 2 #35077

Merged
merged 17 commits into from
Jul 25, 2024
2 changes: 1 addition & 1 deletion envoy/grpc/async_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class RawAsyncStream {
* removeWatermarkCallbacks. If there's already a watermark callback registered, this method
* will trigger ENVOY_BUG.
*/
virtual void setWatermarkCallbacks(Http::DecoderFilterWatermarkCallbacks& callbacks) PURE;
virtual void setWatermarkCallbacks(Http::SidestreamWatermarkCallbacks& callbacks) PURE;

/***
* Remove previously set watermark callbacks. If there's no watermark callback registered, this
Expand Down
24 changes: 23 additions & 1 deletion envoy/http/async_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,28 @@ using FilterConfigSharedPtr = std::shared_ptr<FilterConfig>;
} // namespace Router
namespace Http {

class SidestreamWatermarkCallbacks {
tyxia marked this conversation as resolved.
Show resolved Hide resolved
public:
virtual ~SidestreamWatermarkCallbacks() = default;

/**
* Called when the sidestream connection or stream goes over its high watermark. Note that this
* may be called separately for both the stream going over and the connection going over. It
* is the responsibility of the sidestreamWatermarkCallbacks implementation to handle unwinding
* multiple high and low watermark calls.
*/
virtual void onAboveWriteBufferHighWatermark() PURE;
tyxia marked this conversation as resolved.
Show resolved Hide resolved

/**
* Called when the sidestream connection or stream goes from over its high watermark to under its
* low watermark. As with onAboveWriteBufferHighWatermark above, this may be called independently
* when both the stream and the connection go under the low watermark limit, and the callee must
* ensure that the flow of data does not resume until all callers which were above their high
* watermarks have gone below.
*/
virtual void onBelowWriteBufferLowWatermark() PURE;
};

/**
* Supports sending an HTTP request message and receiving a response asynchronously.
*/
Expand Down Expand Up @@ -191,7 +213,7 @@ class AsyncClient {
* removeWatermarkCallbacks. If there's already a watermark callback registered, this method
* will trigger ENVOY_BUG.
*/
virtual void setWatermarkCallbacks(DecoderFilterWatermarkCallbacks& callbacks) PURE;
virtual void setWatermarkCallbacks(Http::SidestreamWatermarkCallbacks& callbacks) PURE;

/***
* Remove previously set watermark callbacks. If there's no watermark callback registered, this
Expand Down
2 changes: 1 addition & 1 deletion source/common/grpc/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class AsyncStreamImpl : public RawAsyncStream,
bool hasResetStream() const { return http_reset_; }
const StreamInfo::StreamInfo& streamInfo() const override { return stream_->streamInfo(); }

void setWatermarkCallbacks(Http::DecoderFilterWatermarkCallbacks& callbacks) override {
void setWatermarkCallbacks(Http::SidestreamWatermarkCallbacks& callbacks) override {
stream_->setWatermarkCallbacks(callbacks);
}

Expand Down
2 changes: 1 addition & 1 deletion source/common/grpc/google_async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ class GoogleAsyncStreamImpl : public RawAsyncStream,
const StreamInfo::StreamInfo& streamInfo() const override { return unused_stream_info_; }

// Google-gRPC code doesn't use Envoy watermark buffers, so the functions below are not used.
void setWatermarkCallbacks(Http::DecoderFilterWatermarkCallbacks&) override {}
void setWatermarkCallbacks(Http::SidestreamWatermarkCallbacks&) override {}
void removeWatermarkCallbacks() override {}

protected:
Expand Down
2 changes: 1 addition & 1 deletion source/common/grpc/typed_async_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ template <typename Request> class AsyncStream /* : public RawAsyncStream */ {
return stream_->isAboveWriteBufferHighWatermark();
}

void setWatermarkCallbacks(Envoy::Http::DecoderFilterWatermarkCallbacks& callbacks) {
void setWatermarkCallbacks(Http::SidestreamWatermarkCallbacks& callbacks) {
stream_->setWatermarkCallbacks(callbacks);
}

Expand Down
12 changes: 6 additions & 6 deletions source/common/http/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,18 +127,18 @@ class AsyncStreamImpl : public virtual AsyncClient::Stream,
destructor_callback_.reset();
}

void setWatermarkCallbacks(DecoderFilterWatermarkCallbacks& callbacks) override {
void setWatermarkCallbacks(Http::SidestreamWatermarkCallbacks& callbacks) override {
ENVOY_BUG(!watermark_callbacks_, "Watermark callbacks should not already be registered!");
watermark_callbacks_.emplace(callbacks);
for (uint32_t i = 0; i < high_watermark_calls_; ++i) {
watermark_callbacks_->get().onDecoderFilterAboveWriteBufferHighWatermark();
watermark_callbacks_->get().onAboveWriteBufferHighWatermark();
}
}

void removeWatermarkCallbacks() override {
ENVOY_BUG(watermark_callbacks_, "Watermark callbacks should already be registered!");
for (uint32_t i = 0; i < high_watermark_calls_; ++i) {
watermark_callbacks_->get().onDecoderFilterBelowWriteBufferLowWatermark();
watermark_callbacks_->get().onBelowWriteBufferLowWatermark();
}
watermark_callbacks_.reset();
}
Expand All @@ -163,7 +163,7 @@ class AsyncStreamImpl : public virtual AsyncClient::Stream,
// Callback to listen for stream destruction.
absl::optional<AsyncClient::StreamDestructorCallbacks> destructor_callback_;
// Callback to listen for low/high/overflow watermark events.
absl::optional<std::reference_wrapper<DecoderFilterWatermarkCallbacks>> watermark_callbacks_;
absl::optional<std::reference_wrapper<SidestreamWatermarkCallbacks>> watermark_callbacks_;
bool complete_{};
const bool discard_response_body_;

Expand Down Expand Up @@ -231,14 +231,14 @@ class AsyncStreamImpl : public virtual AsyncClient::Stream,
void onDecoderFilterAboveWriteBufferHighWatermark() override {
++high_watermark_calls_;
if (watermark_callbacks_.has_value()) {
watermark_callbacks_->get().onDecoderFilterAboveWriteBufferHighWatermark();
watermark_callbacks_->get().onAboveWriteBufferHighWatermark();
}
}
void onDecoderFilterBelowWriteBufferLowWatermark() override {
ASSERT(high_watermark_calls_ != 0);
--high_watermark_calls_;
if (watermark_callbacks_.has_value()) {
watermark_callbacks_->get().onDecoderFilterBelowWriteBufferLowWatermark();
watermark_callbacks_->get().onBelowWriteBufferLowWatermark();
}
}
void addDownstreamWatermarkCallbacks(DownstreamWatermarkCallbacks&) override {}
Expand Down
53 changes: 53 additions & 0 deletions source/common/http/async_client_utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,58 @@ class AsyncClientRequestTracker {
absl::flat_hash_set<AsyncClient::Request*> active_requests_;
};

/**
* Sidestream watermark callback implementation for stream filter that either handles decoding only
* or handles both encoding and decoding.
*/
class StreamFilterSidestreamWatermarkCallbacks : public Http::SidestreamWatermarkCallbacks {
tyxia marked this conversation as resolved.
Show resolved Hide resolved
public:
StreamFilterSidestreamWatermarkCallbacks() = default;

void onAboveWriteBufferHighWatermark() final {
tyxia marked this conversation as resolved.
Show resolved Hide resolved
// Sidestream push back downstream, if callback is configured.
if (decode_callback_ != nullptr) {
decode_callback_->onDecoderFilterAboveWriteBufferHighWatermark();
} else {
// TODO(tyxia) Log
}

// Sidestream push back upstream, if callback is configured.
if (encode_callback_ != nullptr) {
encode_callback_->onEncoderFilterAboveWriteBufferHighWatermark();
} else {
// TODO(tyxia) Log
}
}

void onBelowWriteBufferLowWatermark() final {
if (decode_callback_ != nullptr) {
decode_callback_->onDecoderFilterBelowWriteBufferLowWatermark();
}

if (encode_callback_ != nullptr) {
encode_callback_->onEncoderFilterBelowWriteBufferLowWatermark();
} else {
}
}

void setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks* decode_callback) {
decode_callback_ = decode_callback;
}

void setEncoderFilterCallbacks(Http::StreamEncoderFilterCallbacks* encode_callback) {
encode_callback_ = encode_callback;
}

void resetFilterCallbacks() {
decode_callback_ = nullptr;
encode_callback_ = nullptr;
}

private:
Http::StreamDecoderFilterCallbacks* decode_callback_ = nullptr;
Http::StreamEncoderFilterCallbacks* encode_callback_ = nullptr;
};

} // namespace Http
} // namespace Envoy
1 change: 1 addition & 0 deletions source/common/router/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ envoy_cc_library(
"//source/common/common:utility_lib",
"//source/common/config:utility_lib",
"//source/common/grpc:common_lib",
"//source/common/http:async_client_utility_lib",
"//source/common/http:codes_lib",
"//source/common/http:filter_chain_helper_lib",
"//source/common/http:filter_manager_lib",
Expand Down
4 changes: 3 additions & 1 deletion source/common/router/router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -806,7 +806,7 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers,
shadow_streams_.insert(shadow_stream);
shadow_stream->setDestructorCallback(
[this, shadow_stream]() { shadow_streams_.erase(shadow_stream); });
shadow_stream->setWatermarkCallbacks(*callbacks_);
shadow_stream->setWatermarkCallbacks(watermark_callbacks_);
}
}
}
Expand Down Expand Up @@ -994,6 +994,8 @@ void Filter::setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callb
if (callbacks_->decoderBufferLimit() != 0) {
retry_shadow_buffer_limit_ = callbacks_->decoderBufferLimit();
}

watermark_callbacks_.setDecoderFilterCallbacks(callbacks_);
}

void Filter::cleanup() {
Expand Down
2 changes: 2 additions & 0 deletions source/common/router/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "source/common/common/logger.h"
#include "source/common/config/utility.h"
#include "source/common/config/well_known_names.h"
#include "source/common/http/async_client_utility.h"
#include "source/common/http/filter_chain_helper.h"
#include "source/common/http/utility.h"
#include "source/common/router/config_impl.h"
Expand Down Expand Up @@ -598,6 +599,7 @@ class Filter : Logger::Loggable<Logger::Id::router>,
uint32_t pending_retries_{0};
Http::Code timeout_response_code_ = Http::Code::GatewayTimeout;
FilterUtility::HedgingParams hedging_params_;
Http::StreamFilterSidestreamWatermarkCallbacks watermark_callbacks_;
bool grpc_request_ : 1;
bool exclude_http_code_stats_ : 1;
bool downstream_response_started_ : 1;
Expand Down
1 change: 1 addition & 0 deletions source/extensions/filters/http/ext_proc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ envoy_cc_library(
"//envoy/grpc:async_client_manager_interface",
"//envoy/grpc:status",
"//envoy/stream_info:stream_info_interface",
"//source/common/http:async_client_utility_lib",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
"@envoy_api//envoy/service/ext_proc/v3:pkg_cc_proto",
],
Expand Down
4 changes: 3 additions & 1 deletion source/extensions/filters/http/ext_proc/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
#include "envoy/service/ext_proc/v3/external_processor.pb.h"
#include "envoy/stream_info/stream_info.h"

#include "source/common/http/async_client_utility.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
Expand Down Expand Up @@ -44,7 +46,7 @@ class ExternalProcessorClient {
start(ExternalProcessorCallbacks& callbacks,
const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key,
const Http::AsyncClient::StreamOptions& options,
Http::DecoderFilterWatermarkCallbacks* decoder_watermark_callbacks) PURE;
Http::StreamFilterSidestreamWatermarkCallbacks& sidestream_watermark_callbacks) PURE;
};

using ExternalProcessorClientPtr = std::unique_ptr<ExternalProcessorClient>;
Expand Down
8 changes: 4 additions & 4 deletions source/extensions/filters/http/ext_proc/client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,25 @@ ExternalProcessorStreamPtr ExternalProcessorClientImpl::start(
ExternalProcessorCallbacks& callbacks,
const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key,
const Http::AsyncClient::StreamOptions& options,
Http::DecoderFilterWatermarkCallbacks* decoder_watermark_callbacks) {
Http::StreamFilterSidestreamWatermarkCallbacks& sidestream_watermark_callbacks) {
auto client_or_error =
client_manager_.getOrCreateRawAsyncClientWithHashKey(config_with_hash_key, scope_, true);
THROW_IF_STATUS_NOT_OK(client_or_error, throw);
Grpc::AsyncClient<ProcessingRequest, ProcessingResponse> grpcClient(client_or_error.value());
return ExternalProcessorStreamImpl::create(std::move(grpcClient), callbacks, options,
decoder_watermark_callbacks);
sidestream_watermark_callbacks);
}

ExternalProcessorStreamPtr ExternalProcessorStreamImpl::create(
Grpc::AsyncClient<ProcessingRequest, ProcessingResponse>&& client,
ExternalProcessorCallbacks& callbacks, const Http::AsyncClient::StreamOptions& options,
Http::DecoderFilterWatermarkCallbacks* decoder_watermark_callbacks) {
Http::StreamFilterSidestreamWatermarkCallbacks& sidestream_watermark_callbacks) {
auto stream =
std::unique_ptr<ExternalProcessorStreamImpl>(new ExternalProcessorStreamImpl(callbacks));

if (stream->startStream(std::move(client), options)) {
if (stream->grpcSidestreamFlowControl()) {
stream->stream_->setWatermarkCallbacks(*decoder_watermark_callbacks);
stream->stream_->setWatermarkCallbacks(sidestream_watermark_callbacks);
}
return stream;
}
Expand Down
4 changes: 2 additions & 2 deletions source/extensions/filters/http/ext_proc/client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class ExternalProcessorClientImpl : public ExternalProcessorClient {
start(ExternalProcessorCallbacks& callbacks,
const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key,
const Http::AsyncClient::StreamOptions& options,
Http::DecoderFilterWatermarkCallbacks* decoder_watermark_callbacks) override;
Http::StreamFilterSidestreamWatermarkCallbacks& sidestream_watermark_callbacks) override;

private:
Grpc::AsyncClientManager& client_manager_;
Expand All @@ -46,7 +46,7 @@ class ExternalProcessorStreamImpl : public ExternalProcessorStream,
static ExternalProcessorStreamPtr
create(Grpc::AsyncClient<ProcessingRequest, ProcessingResponse>&& client,
ExternalProcessorCallbacks& callbacks, const Http::AsyncClient::StreamOptions& options,
Http::DecoderFilterWatermarkCallbacks* decoder_watermark_callbacks);
Http::StreamFilterSidestreamWatermarkCallbacks& sidestream_watermark_callbacks);

void send(ProcessingRequest&& request, bool end_stream) override;
// Close the stream. This is idempotent and will return true if we
Expand Down
5 changes: 4 additions & 1 deletion source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ FilterConfigPerRoute::FilterConfigPerRoute(const FilterConfigPerRoute& less_spec
void Filter::setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) {
Http::PassThroughFilter::setDecoderFilterCallbacks(callbacks);
filter_callbacks_ = &callbacks;
watermark_callbacks_.setDecoderFilterCallbacks(&callbacks);
decoding_state_.setDecoderFilterCallbacks(callbacks);
const Envoy::StreamInfo::FilterStateSharedPtr& filter_state =
callbacks.streamInfo().filterState();
Expand All @@ -322,6 +323,7 @@ void Filter::setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callb
void Filter::setEncoderFilterCallbacks(Http::StreamEncoderFilterCallbacks& callbacks) {
Http::PassThroughFilter::setEncoderFilterCallbacks(callbacks);
encoding_state_.setEncoderFilterCallbacks(callbacks);
watermark_callbacks_.setEncoderFilterCallbacks(&callbacks);
alyssawilk marked this conversation as resolved.
Show resolved Hide resolved
}

Filter::StreamOpenState Filter::openStream() {
Expand All @@ -343,7 +345,7 @@ Filter::StreamOpenState Filter::openStream() {
.setBufferBodyForRetry(true);

ExternalProcessorStreamPtr stream_object =
client_->start(*this, config_with_hash_key_, options, decoder_callbacks_);
client_->start(*this, config_with_hash_key_, options, watermark_callbacks_);

if (processing_complete_) {
// Stream failed while starting and either onGrpcError or onGrpcClose was already called
Expand Down Expand Up @@ -396,6 +398,7 @@ void Filter::onDestroy() {
if (stream_ != nullptr) {
stream_->notifyFilterDestroy();
}
watermark_callbacks_.resetFilterCallbacks();

if (config_->observabilityMode()) {
// In observability mode where the main stream processing and side stream processing are
Expand Down
2 changes: 2 additions & 0 deletions source/extensions/filters/http/ext_proc/ext_proc.h
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,8 @@ class Filter : public Logger::Loggable<Logger::Id::ext_proc>,
std::vector<std::string> typed_forwarding_namespaces_{};
std::vector<std::string> untyped_receiving_namespaces_{};
Http::StreamFilterCallbacks* filter_callbacks_;
// TODO(tyxia) Lifetime
Http::StreamFilterSidestreamWatermarkCallbacks watermark_callbacks_;
};

extern std::string responseCaseToString(
Expand Down
6 changes: 3 additions & 3 deletions test/common/grpc/grpc_client_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ TEST_P(EnvoyGrpcFlowControlTest, BasicStreamWithFlowControl) {
initialize();
auto stream = createStream(empty_metadata_);

testing::StrictMock<Http::MockStreamDecoderFilterCallbacks> watermark_callbacks;
testing::StrictMock<Http::MockSidestreamWatermarkCallbacks> watermark_callbacks;

// Registering the new watermark callback.
stream->grpc_stream_->setWatermarkCallbacks(watermark_callbacks);
// Expect that flow control kicks in and watermark calls are triggered.
EXPECT_CALL(watermark_callbacks, onDecoderFilterAboveWriteBufferHighWatermark());
EXPECT_CALL(watermark_callbacks, onDecoderFilterBelowWriteBufferLowWatermark());
EXPECT_CALL(watermark_callbacks, onAboveWriteBufferHighWatermark());
EXPECT_CALL(watermark_callbacks, onBelowWriteBufferLowWatermark());

// Create send request with large request string.
helloworld::HelloRequest request_msg;
Expand Down
Loading
Loading