Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

flow control: side stream flow control part 2 #35077

Merged
merged 17 commits into from
Jul 25, 2024
2 changes: 1 addition & 1 deletion envoy/grpc/async_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class RawAsyncStream {
* removeWatermarkCallbacks. If there's already a watermark callback registered, this method
* will trigger ENVOY_BUG.
*/
virtual void setWatermarkCallbacks(Http::DecoderFilterWatermarkCallbacks& callbacks) PURE;
virtual void setWatermarkCallbacks(Http::SidestreamWatermarkCallbacks& callbacks) PURE;

/***
* Remove previously set watermark callbacks. If there's no watermark callback registered, this
Expand Down
27 changes: 26 additions & 1 deletion envoy/http/async_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,31 @@ using FilterConfigSharedPtr = std::shared_ptr<FilterConfig>;
} // namespace Router
namespace Http {

/**
* Callbacks for sidestream connection (from http async client) watermark limits.
*/
class SidestreamWatermarkCallbacks {
tyxia marked this conversation as resolved.
Show resolved Hide resolved
public:
virtual ~SidestreamWatermarkCallbacks() = default;

/**
* Called when the sidestream connection or stream goes over its high watermark. Note that this
* may be called separately for both the stream going over and the connection going over. It
* is the responsibility of the sidestreamWatermarkCallbacks implementation to handle unwinding
* multiple high and low watermark calls.
*/
virtual void onSidestreamAboveHighWatermark() PURE;

/**
* Called when the sidestream connection or stream goes from over its high watermark to under its
* low watermark. As with onSidestreamAboveHighWatermark above, this may be called independently
* when both the stream and the connection go under the low watermark limit, and the callee must
* ensure that the flow of data does not resume until all callers which were above their high
* watermarks have gone below.
*/
virtual void onSidestreamBelowLowWatermark() PURE;
};

/**
* Supports sending an HTTP request message and receiving a response asynchronously.
*/
Expand Down Expand Up @@ -191,7 +216,7 @@ class AsyncClient {
* removeWatermarkCallbacks. If there's already a watermark callback registered, this method
* will trigger ENVOY_BUG.
*/
virtual void setWatermarkCallbacks(DecoderFilterWatermarkCallbacks& callbacks) PURE;
virtual void setWatermarkCallbacks(Http::SidestreamWatermarkCallbacks& callbacks) PURE;

/***
* Remove previously set watermark callbacks. If there's no watermark callback registered, this
Expand Down
39 changes: 17 additions & 22 deletions envoy/http/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -526,33 +526,12 @@ class StreamFilterCallbacks {
virtual ResponseTrailerMapOptRef responseTrailers() PURE;
};

class DecoderFilterWatermarkCallbacks {
public:
virtual ~DecoderFilterWatermarkCallbacks() = default;

/**
* Called when the buffer for a decoder filter or any buffers the filter sends data to go over
* their high watermark.
*
* In the case of a filter such as the router filter, which spills into multiple buffers (codec,
* connection etc.) this may be called multiple times. Any such filter is responsible for calling
* the low watermark callbacks an equal number of times as the respective buffers are drained.
*/
virtual void onDecoderFilterAboveWriteBufferHighWatermark() PURE;

/**
* Called when a decoder filter or any buffers the filter sends data to go from over its high
* watermark to under its low watermark.
*/
virtual void onDecoderFilterBelowWriteBufferLowWatermark() PURE;
};
/**
* Stream decoder filter callbacks add additional callbacks that allow a
* decoding filter to restart decoding if they decide to hold data (e.g. for
* buffering or rate limiting).
*/
class StreamDecoderFilterCallbacks : public virtual StreamFilterCallbacks,
public virtual DecoderFilterWatermarkCallbacks {
class StreamDecoderFilterCallbacks : public virtual StreamFilterCallbacks {
public:
/**
* Continue iterating through the filter chain with buffered headers and body data. This routine
Expand Down Expand Up @@ -727,6 +706,22 @@ class StreamDecoderFilterCallbacks : public virtual StreamFilterCallbacks,
*/
virtual void encodeMetadata(MetadataMapPtr&& metadata_map) PURE;

/**
tyxia marked this conversation as resolved.
Show resolved Hide resolved
* Called when the buffer for a decoder filter or any buffers the filter sends data to go over
* their high watermark.
*
* In the case of a filter such as the router filter, which spills into multiple buffers (codec,
* connection etc.) this may be called multiple times. Any such filter is responsible for calling
* the low watermark callbacks an equal number of times as the respective buffers are drained.
*/
virtual void onDecoderFilterAboveWriteBufferHighWatermark() PURE;

/**
* Called when a decoder filter or any buffers the filter sends data to go from over its high
* watermark to under its low watermark.
*/
virtual void onDecoderFilterBelowWriteBufferLowWatermark() PURE;

/**
* This routine can be called by a filter to subscribe to watermark events on the downstream
* stream and downstream connection.
Expand Down
2 changes: 1 addition & 1 deletion source/common/grpc/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class AsyncStreamImpl : public RawAsyncStream,
bool hasResetStream() const { return http_reset_; }
const StreamInfo::StreamInfo& streamInfo() const override { return stream_->streamInfo(); }

void setWatermarkCallbacks(Http::DecoderFilterWatermarkCallbacks& callbacks) override {
void setWatermarkCallbacks(Http::SidestreamWatermarkCallbacks& callbacks) override {
stream_->setWatermarkCallbacks(callbacks);
}

Expand Down
2 changes: 1 addition & 1 deletion source/common/grpc/google_async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ class GoogleAsyncStreamImpl : public RawAsyncStream,
const StreamInfo::StreamInfo& streamInfo() const override { return unused_stream_info_; }

// Google-gRPC code doesn't use Envoy watermark buffers, so the functions below are not used.
void setWatermarkCallbacks(Http::DecoderFilterWatermarkCallbacks&) override {}
void setWatermarkCallbacks(Http::SidestreamWatermarkCallbacks&) override {}
void removeWatermarkCallbacks() override {}

protected:
Expand Down
2 changes: 1 addition & 1 deletion source/common/grpc/typed_async_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ template <typename Request> class AsyncStream /* : public RawAsyncStream */ {
return stream_->isAboveWriteBufferHighWatermark();
}

void setWatermarkCallbacks(Envoy::Http::DecoderFilterWatermarkCallbacks& callbacks) {
void setWatermarkCallbacks(Http::SidestreamWatermarkCallbacks& callbacks) {
stream_->setWatermarkCallbacks(callbacks);
}

Expand Down
8 changes: 8 additions & 0 deletions source/common/http/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "sidestream_watermark_lib",
hdrs = ["sidestream_watermark.h"],
deps = [
"//envoy/http:async_client_interface",
],
)

envoy_cc_library(
name = "character_set_validation_lib",
hdrs = ["character_set_validation.h"],
Expand Down
12 changes: 6 additions & 6 deletions source/common/http/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,18 +127,18 @@ class AsyncStreamImpl : public virtual AsyncClient::Stream,
destructor_callback_.reset();
}

void setWatermarkCallbacks(DecoderFilterWatermarkCallbacks& callbacks) override {
void setWatermarkCallbacks(Http::SidestreamWatermarkCallbacks& callbacks) override {
ENVOY_BUG(!watermark_callbacks_, "Watermark callbacks should not already be registered!");
watermark_callbacks_.emplace(callbacks);
for (uint32_t i = 0; i < high_watermark_calls_; ++i) {
watermark_callbacks_->get().onDecoderFilterAboveWriteBufferHighWatermark();
watermark_callbacks_->get().onSidestreamAboveHighWatermark();
}
}

void removeWatermarkCallbacks() override {
ENVOY_BUG(watermark_callbacks_, "Watermark callbacks should already be registered!");
for (uint32_t i = 0; i < high_watermark_calls_; ++i) {
watermark_callbacks_->get().onDecoderFilterBelowWriteBufferLowWatermark();
watermark_callbacks_->get().onSidestreamBelowLowWatermark();
}
watermark_callbacks_.reset();
}
Expand All @@ -163,7 +163,7 @@ class AsyncStreamImpl : public virtual AsyncClient::Stream,
// Callback to listen for stream destruction.
absl::optional<AsyncClient::StreamDestructorCallbacks> destructor_callback_;
// Callback to listen for low/high/overflow watermark events.
absl::optional<std::reference_wrapper<DecoderFilterWatermarkCallbacks>> watermark_callbacks_;
absl::optional<std::reference_wrapper<SidestreamWatermarkCallbacks>> watermark_callbacks_;
bool complete_{};
const bool discard_response_body_;

Expand Down Expand Up @@ -231,14 +231,14 @@ class AsyncStreamImpl : public virtual AsyncClient::Stream,
void onDecoderFilterAboveWriteBufferHighWatermark() override {
++high_watermark_calls_;
if (watermark_callbacks_.has_value()) {
watermark_callbacks_->get().onDecoderFilterAboveWriteBufferHighWatermark();
watermark_callbacks_->get().onSidestreamAboveHighWatermark();
}
}
void onDecoderFilterBelowWriteBufferLowWatermark() override {
ASSERT(high_watermark_calls_ != 0);
--high_watermark_calls_;
if (watermark_callbacks_.has_value()) {
watermark_callbacks_->get().onDecoderFilterBelowWriteBufferLowWatermark();
watermark_callbacks_->get().onSidestreamBelowLowWatermark();
}
}
void addDownstreamWatermarkCallbacks(DownstreamWatermarkCallbacks&) override {}
Expand Down
61 changes: 61 additions & 0 deletions source/common/http/sidestream_watermark.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#pragma once

#include "envoy/http/async_client.h"

namespace Envoy {
namespace Http {

/**
* Sidestream watermark callback implementation for stream filter that either handles decoding only
* or handles both encoding and decoding.
*/
class StreamFilterSidestreamWatermarkCallbacks : public Http::SidestreamWatermarkCallbacks {
public:
void onSidestreamAboveHighWatermark() final {
// Sidestream push back downstream, if callback is configured.
if (decode_callback_ != nullptr) {
decode_callback_->onDecoderFilterAboveWriteBufferHighWatermark();
}

// Sidestream push back upstream, if callback is configured.
if (encode_callback_ != nullptr) {
encode_callback_->onEncoderFilterAboveWriteBufferHighWatermark();
}
}

void onSidestreamBelowLowWatermark() final {
if (decode_callback_ != nullptr) {
decode_callback_->onDecoderFilterBelowWriteBufferLowWatermark();
}

if (encode_callback_ != nullptr) {
encode_callback_->onEncoderFilterBelowWriteBufferLowWatermark();
}
}

/**
* The set function needs to be called by stream decoder filter before side stream connection is
* established, to apply the backpressure to downstream when it is above watermark,
*/
void setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks* decode_callback) {
alyssawilk marked this conversation as resolved.
Show resolved Hide resolved
decode_callback_ = decode_callback;
}

/**
* The set function needs to be called by stream encoder filter before side stream connection is
* established, to apply the backpressure to upstream when it is above watermark,
*/
void setEncoderFilterCallbacks(Http::StreamEncoderFilterCallbacks* encode_callback) {
encode_callback_ = encode_callback;
}

private:
// Non owning pointers; `removeWatermarkCallbacks()` needs to be called to unregister watermark
// callbacks (if any) before filter callbacks are destroyed. Typically when stream is being closed
// or filter is being destroyed.
Http::StreamDecoderFilterCallbacks* decode_callback_ = nullptr;
Http::StreamEncoderFilterCallbacks* encode_callback_ = nullptr;
};

} // namespace Http
} // namespace Envoy
1 change: 1 addition & 0 deletions source/common/router/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ envoy_cc_library(
"//source/common/http:header_map_lib",
"//source/common/http:headers_lib",
"//source/common/http:message_lib",
"//source/common/http:sidestream_watermark_lib",
"//source/common/http:utility_lib",
"//source/common/network:application_protocol_lib",
"//source/common/network:socket_option_factory_lib",
Expand Down
4 changes: 3 additions & 1 deletion source/common/router/router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -805,7 +805,7 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers,
shadow_streams_.insert(shadow_stream);
shadow_stream->setDestructorCallback(
[this, shadow_stream]() { shadow_streams_.erase(shadow_stream); });
shadow_stream->setWatermarkCallbacks(*callbacks_);
shadow_stream->setWatermarkCallbacks(watermark_callbacks_);
}
}
}
Expand Down Expand Up @@ -993,6 +993,8 @@ void Filter::setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callb
if (callbacks_->decoderBufferLimit() != 0) {
retry_shadow_buffer_limit_ = callbacks_->decoderBufferLimit();
}

watermark_callbacks_.setDecoderFilterCallbacks(callbacks_);
}

void Filter::cleanup() {
Expand Down
2 changes: 2 additions & 0 deletions source/common/router/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "source/common/config/utility.h"
#include "source/common/config/well_known_names.h"
#include "source/common/http/filter_chain_helper.h"
#include "source/common/http/sidestream_watermark.h"
#include "source/common/http/utility.h"
#include "source/common/router/config_impl.h"
#include "source/common/router/context_impl.h"
Expand Down Expand Up @@ -599,6 +600,7 @@ class Filter : Logger::Loggable<Logger::Id::router>,
uint32_t pending_retries_{0};
Http::Code timeout_response_code_ = Http::Code::GatewayTimeout;
FilterUtility::HedgingParams hedging_params_;
Http::StreamFilterSidestreamWatermarkCallbacks watermark_callbacks_;
bool grpc_request_ : 1;
bool exclude_http_code_stats_ : 1;
bool downstream_response_started_ : 1;
Expand Down
1 change: 1 addition & 0 deletions source/extensions/filters/http/ext_proc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ envoy_cc_library(
"//envoy/grpc:async_client_manager_interface",
"//envoy/grpc:status",
"//envoy/stream_info:stream_info_interface",
"//source/common/http:sidestream_watermark_lib",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
"@envoy_api//envoy/service/ext_proc/v3:pkg_cc_proto",
],
Expand Down
4 changes: 3 additions & 1 deletion source/extensions/filters/http/ext_proc/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
#include "envoy/service/ext_proc/v3/external_processor.pb.h"
#include "envoy/stream_info/stream_info.h"

#include "source/common/http/sidestream_watermark.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
Expand Down Expand Up @@ -44,7 +46,7 @@ class ExternalProcessorClient {
start(ExternalProcessorCallbacks& callbacks,
const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key,
const Http::AsyncClient::StreamOptions& options,
Http::DecoderFilterWatermarkCallbacks* decoder_watermark_callbacks) PURE;
Http::StreamFilterSidestreamWatermarkCallbacks& sidestream_watermark_callbacks) PURE;
};

using ExternalProcessorClientPtr = std::unique_ptr<ExternalProcessorClient>;
Expand Down
8 changes: 4 additions & 4 deletions source/extensions/filters/http/ext_proc/client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,25 @@ ExternalProcessorStreamPtr ExternalProcessorClientImpl::start(
ExternalProcessorCallbacks& callbacks,
const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key,
const Http::AsyncClient::StreamOptions& options,
Http::DecoderFilterWatermarkCallbacks* decoder_watermark_callbacks) {
Http::StreamFilterSidestreamWatermarkCallbacks& sidestream_watermark_callbacks) {
auto client_or_error =
client_manager_.getOrCreateRawAsyncClientWithHashKey(config_with_hash_key, scope_, true);
THROW_IF_STATUS_NOT_OK(client_or_error, throw);
Grpc::AsyncClient<ProcessingRequest, ProcessingResponse> grpcClient(client_or_error.value());
return ExternalProcessorStreamImpl::create(std::move(grpcClient), callbacks, options,
decoder_watermark_callbacks);
sidestream_watermark_callbacks);
}

ExternalProcessorStreamPtr ExternalProcessorStreamImpl::create(
Grpc::AsyncClient<ProcessingRequest, ProcessingResponse>&& client,
ExternalProcessorCallbacks& callbacks, const Http::AsyncClient::StreamOptions& options,
Http::DecoderFilterWatermarkCallbacks* decoder_watermark_callbacks) {
Http::StreamFilterSidestreamWatermarkCallbacks& sidestream_watermark_callbacks) {
auto stream =
std::unique_ptr<ExternalProcessorStreamImpl>(new ExternalProcessorStreamImpl(callbacks));

if (stream->startStream(std::move(client), options)) {
if (stream->grpcSidestreamFlowControl()) {
stream->stream_->setWatermarkCallbacks(*decoder_watermark_callbacks);
stream->stream_->setWatermarkCallbacks(sidestream_watermark_callbacks);
}
return stream;
}
Expand Down
4 changes: 2 additions & 2 deletions source/extensions/filters/http/ext_proc/client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class ExternalProcessorClientImpl : public ExternalProcessorClient {
start(ExternalProcessorCallbacks& callbacks,
const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key,
const Http::AsyncClient::StreamOptions& options,
Http::DecoderFilterWatermarkCallbacks* decoder_watermark_callbacks) override;
Http::StreamFilterSidestreamWatermarkCallbacks& sidestream_watermark_callbacks) override;

private:
Grpc::AsyncClientManager& client_manager_;
Expand All @@ -46,7 +46,7 @@ class ExternalProcessorStreamImpl : public ExternalProcessorStream,
static ExternalProcessorStreamPtr
create(Grpc::AsyncClient<ProcessingRequest, ProcessingResponse>&& client,
ExternalProcessorCallbacks& callbacks, const Http::AsyncClient::StreamOptions& options,
Http::DecoderFilterWatermarkCallbacks* decoder_watermark_callbacks);
Http::StreamFilterSidestreamWatermarkCallbacks& sidestream_watermark_callbacks);

void send(ProcessingRequest&& request, bool end_stream) override;
// Close the stream. This is idempotent and will return true if we
Expand Down
Loading