Skip to content

Commit

Permalink
flow control: side stream flow control part 2 (envoyproxy#35077)
Browse files Browse the repository at this point in the history
- Deprecated and replaced the `DecoderFilterWatermarkCallbacks` that was
introduced [PR](envoyproxy#25207) with
`SidestreamWatermarkCallbacks` in this PR so that both encoder and
decode filter callbacks can be registered
- Implements sidestream push back on upstream

Risk Level: Low, existing runtime guard
Testing: integration test and unit test
Docs Changes: N/A. a design doc in draft
https://docs.google.com/document/d/1fwISI5wKQe6H3EOuaDLrAAgPU8Hq3KXaxJG0pm3vu-4/edit#heading=h.xgjl2srtytjt
and will add it to envoy flow control doc in the end or at least when
full flow control is approved.
Release Notes: N/A
Platform Specific Features: N/A

---------

Signed-off-by: tyxia <[email protected]>
Signed-off-by: asingh-g <[email protected]>
  • Loading branch information
tyxia authored and asingh-g committed Aug 20, 2024
1 parent a987542 commit e65ad8d
Show file tree
Hide file tree
Showing 30 changed files with 291 additions and 75 deletions.
2 changes: 1 addition & 1 deletion envoy/grpc/async_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class RawAsyncStream {
* removeWatermarkCallbacks. If there's already a watermark callback registered, this method
* will trigger ENVOY_BUG.
*/
virtual void setWatermarkCallbacks(Http::DecoderFilterWatermarkCallbacks& callbacks) PURE;
virtual void setWatermarkCallbacks(Http::SidestreamWatermarkCallbacks& callbacks) PURE;

/***
* Remove previously set watermark callbacks. If there's no watermark callback registered, this
Expand Down
27 changes: 26 additions & 1 deletion envoy/http/async_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,31 @@ using FilterConfigSharedPtr = std::shared_ptr<FilterConfig>;
} // namespace Router
namespace Http {

/**
* Callbacks for sidestream connection (from http async client) watermark limits.
*/
class SidestreamWatermarkCallbacks {
public:
virtual ~SidestreamWatermarkCallbacks() = default;

/**
* Called when the sidestream connection or stream goes over its high watermark. Note that this
* may be called separately for both the stream going over and the connection going over. It
* is the responsibility of the sidestreamWatermarkCallbacks implementation to handle unwinding
* multiple high and low watermark calls.
*/
virtual void onSidestreamAboveHighWatermark() PURE;

/**
* Called when the sidestream connection or stream goes from over its high watermark to under its
* low watermark. As with onSidestreamAboveHighWatermark above, this may be called independently
* when both the stream and the connection go under the low watermark limit, and the callee must
* ensure that the flow of data does not resume until all callers which were above their high
* watermarks have gone below.
*/
virtual void onSidestreamBelowLowWatermark() PURE;
};

/**
* Supports sending an HTTP request message and receiving a response asynchronously.
*/
Expand Down Expand Up @@ -191,7 +216,7 @@ class AsyncClient {
* removeWatermarkCallbacks. If there's already a watermark callback registered, this method
* will trigger ENVOY_BUG.
*/
virtual void setWatermarkCallbacks(DecoderFilterWatermarkCallbacks& callbacks) PURE;
virtual void setWatermarkCallbacks(Http::SidestreamWatermarkCallbacks& callbacks) PURE;

/***
* Remove previously set watermark callbacks. If there's no watermark callback registered, this
Expand Down
39 changes: 17 additions & 22 deletions envoy/http/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -526,33 +526,12 @@ class StreamFilterCallbacks {
virtual ResponseTrailerMapOptRef responseTrailers() PURE;
};

class DecoderFilterWatermarkCallbacks {
public:
virtual ~DecoderFilterWatermarkCallbacks() = default;

/**
* Called when the buffer for a decoder filter or any buffers the filter sends data to go over
* their high watermark.
*
* In the case of a filter such as the router filter, which spills into multiple buffers (codec,
* connection etc.) this may be called multiple times. Any such filter is responsible for calling
* the low watermark callbacks an equal number of times as the respective buffers are drained.
*/
virtual void onDecoderFilterAboveWriteBufferHighWatermark() PURE;

/**
* Called when a decoder filter or any buffers the filter sends data to go from over its high
* watermark to under its low watermark.
*/
virtual void onDecoderFilterBelowWriteBufferLowWatermark() PURE;
};
/**
* Stream decoder filter callbacks add additional callbacks that allow a
* decoding filter to restart decoding if they decide to hold data (e.g. for
* buffering or rate limiting).
*/
class StreamDecoderFilterCallbacks : public virtual StreamFilterCallbacks,
public virtual DecoderFilterWatermarkCallbacks {
class StreamDecoderFilterCallbacks : public virtual StreamFilterCallbacks {
public:
/**
* Continue iterating through the filter chain with buffered headers and body data. This routine
Expand Down Expand Up @@ -727,6 +706,22 @@ class StreamDecoderFilterCallbacks : public virtual StreamFilterCallbacks,
*/
virtual void encodeMetadata(MetadataMapPtr&& metadata_map) PURE;

/**
* Called when the buffer for a decoder filter or any buffers the filter sends data to go over
* their high watermark.
*
* In the case of a filter such as the router filter, which spills into multiple buffers (codec,
* connection etc.) this may be called multiple times. Any such filter is responsible for calling
* the low watermark callbacks an equal number of times as the respective buffers are drained.
*/
virtual void onDecoderFilterAboveWriteBufferHighWatermark() PURE;

/**
* Called when a decoder filter or any buffers the filter sends data to go from over its high
* watermark to under its low watermark.
*/
virtual void onDecoderFilterBelowWriteBufferLowWatermark() PURE;

/**
* This routine can be called by a filter to subscribe to watermark events on the downstream
* stream and downstream connection.
Expand Down
2 changes: 1 addition & 1 deletion source/common/grpc/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class AsyncStreamImpl : public RawAsyncStream,
bool hasResetStream() const { return http_reset_; }
const StreamInfo::StreamInfo& streamInfo() const override { return stream_->streamInfo(); }

void setWatermarkCallbacks(Http::DecoderFilterWatermarkCallbacks& callbacks) override {
void setWatermarkCallbacks(Http::SidestreamWatermarkCallbacks& callbacks) override {
stream_->setWatermarkCallbacks(callbacks);
}

Expand Down
2 changes: 1 addition & 1 deletion source/common/grpc/google_async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ class GoogleAsyncStreamImpl : public RawAsyncStream,
const StreamInfo::StreamInfo& streamInfo() const override { return unused_stream_info_; }

// Google-gRPC code doesn't use Envoy watermark buffers, so the functions below are not used.
void setWatermarkCallbacks(Http::DecoderFilterWatermarkCallbacks&) override {}
void setWatermarkCallbacks(Http::SidestreamWatermarkCallbacks&) override {}
void removeWatermarkCallbacks() override {}

protected:
Expand Down
2 changes: 1 addition & 1 deletion source/common/grpc/typed_async_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ template <typename Request> class AsyncStream /* : public RawAsyncStream */ {
return stream_->isAboveWriteBufferHighWatermark();
}

void setWatermarkCallbacks(Envoy::Http::DecoderFilterWatermarkCallbacks& callbacks) {
void setWatermarkCallbacks(Http::SidestreamWatermarkCallbacks& callbacks) {
stream_->setWatermarkCallbacks(callbacks);
}

Expand Down
8 changes: 8 additions & 0 deletions source/common/http/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "sidestream_watermark_lib",
hdrs = ["sidestream_watermark.h"],
deps = [
"//envoy/http:async_client_interface",
],
)

envoy_cc_library(
name = "character_set_validation_lib",
hdrs = ["character_set_validation.h"],
Expand Down
12 changes: 6 additions & 6 deletions source/common/http/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,18 +127,18 @@ class AsyncStreamImpl : public virtual AsyncClient::Stream,
destructor_callback_.reset();
}

void setWatermarkCallbacks(DecoderFilterWatermarkCallbacks& callbacks) override {
void setWatermarkCallbacks(Http::SidestreamWatermarkCallbacks& callbacks) override {
ENVOY_BUG(!watermark_callbacks_, "Watermark callbacks should not already be registered!");
watermark_callbacks_.emplace(callbacks);
for (uint32_t i = 0; i < high_watermark_calls_; ++i) {
watermark_callbacks_->get().onDecoderFilterAboveWriteBufferHighWatermark();
watermark_callbacks_->get().onSidestreamAboveHighWatermark();
}
}

void removeWatermarkCallbacks() override {
ENVOY_BUG(watermark_callbacks_, "Watermark callbacks should already be registered!");
for (uint32_t i = 0; i < high_watermark_calls_; ++i) {
watermark_callbacks_->get().onDecoderFilterBelowWriteBufferLowWatermark();
watermark_callbacks_->get().onSidestreamBelowLowWatermark();
}
watermark_callbacks_.reset();
}
Expand All @@ -163,7 +163,7 @@ class AsyncStreamImpl : public virtual AsyncClient::Stream,
// Callback to listen for stream destruction.
absl::optional<AsyncClient::StreamDestructorCallbacks> destructor_callback_;
// Callback to listen for low/high/overflow watermark events.
absl::optional<std::reference_wrapper<DecoderFilterWatermarkCallbacks>> watermark_callbacks_;
absl::optional<std::reference_wrapper<SidestreamWatermarkCallbacks>> watermark_callbacks_;
bool complete_{};
const bool discard_response_body_;

Expand Down Expand Up @@ -231,14 +231,14 @@ class AsyncStreamImpl : public virtual AsyncClient::Stream,
void onDecoderFilterAboveWriteBufferHighWatermark() override {
++high_watermark_calls_;
if (watermark_callbacks_.has_value()) {
watermark_callbacks_->get().onDecoderFilterAboveWriteBufferHighWatermark();
watermark_callbacks_->get().onSidestreamAboveHighWatermark();
}
}
void onDecoderFilterBelowWriteBufferLowWatermark() override {
ASSERT(high_watermark_calls_ != 0);
--high_watermark_calls_;
if (watermark_callbacks_.has_value()) {
watermark_callbacks_->get().onDecoderFilterBelowWriteBufferLowWatermark();
watermark_callbacks_->get().onSidestreamBelowLowWatermark();
}
}
void addDownstreamWatermarkCallbacks(DownstreamWatermarkCallbacks&) override {}
Expand Down
61 changes: 61 additions & 0 deletions source/common/http/sidestream_watermark.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#pragma once

#include "envoy/http/async_client.h"

namespace Envoy {
namespace Http {

/**
* Sidestream watermark callback implementation for stream filter that either handles decoding only
* or handles both encoding and decoding.
*/
class StreamFilterSidestreamWatermarkCallbacks : public Http::SidestreamWatermarkCallbacks {
public:
void onSidestreamAboveHighWatermark() final {
// Sidestream push back downstream, if callback is configured.
if (decode_callback_ != nullptr) {
decode_callback_->onDecoderFilterAboveWriteBufferHighWatermark();
}

// Sidestream push back upstream, if callback is configured.
if (encode_callback_ != nullptr) {
encode_callback_->onEncoderFilterAboveWriteBufferHighWatermark();
}
}

void onSidestreamBelowLowWatermark() final {
if (decode_callback_ != nullptr) {
decode_callback_->onDecoderFilterBelowWriteBufferLowWatermark();
}

if (encode_callback_ != nullptr) {
encode_callback_->onEncoderFilterBelowWriteBufferLowWatermark();
}
}

/**
* The set function needs to be called by stream decoder filter before side stream connection is
* established, to apply the backpressure to downstream when it is above watermark,
*/
void setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks* decode_callback) {
decode_callback_ = decode_callback;
}

/**
* The set function needs to be called by stream encoder filter before side stream connection is
* established, to apply the backpressure to upstream when it is above watermark,
*/
void setEncoderFilterCallbacks(Http::StreamEncoderFilterCallbacks* encode_callback) {
encode_callback_ = encode_callback;
}

private:
// Non owning pointers; `removeWatermarkCallbacks()` needs to be called to unregister watermark
// callbacks (if any) before filter callbacks are destroyed. Typically when stream is being closed
// or filter is being destroyed.
Http::StreamDecoderFilterCallbacks* decode_callback_ = nullptr;
Http::StreamEncoderFilterCallbacks* encode_callback_ = nullptr;
};

} // namespace Http
} // namespace Envoy
1 change: 1 addition & 0 deletions source/common/router/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ envoy_cc_library(
"//source/common/http:header_map_lib",
"//source/common/http:headers_lib",
"//source/common/http:message_lib",
"//source/common/http:sidestream_watermark_lib",
"//source/common/http:utility_lib",
"//source/common/network:application_protocol_lib",
"//source/common/network:socket_option_factory_lib",
Expand Down
4 changes: 3 additions & 1 deletion source/common/router/router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -805,7 +805,7 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers,
shadow_streams_.insert(shadow_stream);
shadow_stream->setDestructorCallback(
[this, shadow_stream]() { shadow_streams_.erase(shadow_stream); });
shadow_stream->setWatermarkCallbacks(*callbacks_);
shadow_stream->setWatermarkCallbacks(watermark_callbacks_);
}
}
}
Expand Down Expand Up @@ -993,6 +993,8 @@ void Filter::setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callb
if (callbacks_->decoderBufferLimit() != 0) {
retry_shadow_buffer_limit_ = callbacks_->decoderBufferLimit();
}

watermark_callbacks_.setDecoderFilterCallbacks(callbacks_);
}

void Filter::cleanup() {
Expand Down
2 changes: 2 additions & 0 deletions source/common/router/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "source/common/config/utility.h"
#include "source/common/config/well_known_names.h"
#include "source/common/http/filter_chain_helper.h"
#include "source/common/http/sidestream_watermark.h"
#include "source/common/http/utility.h"
#include "source/common/router/config_impl.h"
#include "source/common/router/context_impl.h"
Expand Down Expand Up @@ -599,6 +600,7 @@ class Filter : Logger::Loggable<Logger::Id::router>,
uint32_t pending_retries_{0};
Http::Code timeout_response_code_ = Http::Code::GatewayTimeout;
FilterUtility::HedgingParams hedging_params_;
Http::StreamFilterSidestreamWatermarkCallbacks watermark_callbacks_;
bool grpc_request_ : 1;
bool exclude_http_code_stats_ : 1;
bool downstream_response_started_ : 1;
Expand Down
1 change: 1 addition & 0 deletions source/extensions/filters/http/ext_proc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ envoy_cc_library(
"//envoy/grpc:async_client_manager_interface",
"//envoy/grpc:status",
"//envoy/stream_info:stream_info_interface",
"//source/common/http:sidestream_watermark_lib",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
"@envoy_api//envoy/service/ext_proc/v3:pkg_cc_proto",
],
Expand Down
4 changes: 3 additions & 1 deletion source/extensions/filters/http/ext_proc/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
#include "envoy/service/ext_proc/v3/external_processor.pb.h"
#include "envoy/stream_info/stream_info.h"

#include "source/common/http/sidestream_watermark.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
Expand Down Expand Up @@ -44,7 +46,7 @@ class ExternalProcessorClient {
start(ExternalProcessorCallbacks& callbacks,
const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key,
const Http::AsyncClient::StreamOptions& options,
Http::DecoderFilterWatermarkCallbacks* decoder_watermark_callbacks) PURE;
Http::StreamFilterSidestreamWatermarkCallbacks& sidestream_watermark_callbacks) PURE;
};

using ExternalProcessorClientPtr = std::unique_ptr<ExternalProcessorClient>;
Expand Down
8 changes: 4 additions & 4 deletions source/extensions/filters/http/ext_proc/client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,25 @@ ExternalProcessorStreamPtr ExternalProcessorClientImpl::start(
ExternalProcessorCallbacks& callbacks,
const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key,
const Http::AsyncClient::StreamOptions& options,
Http::DecoderFilterWatermarkCallbacks* decoder_watermark_callbacks) {
Http::StreamFilterSidestreamWatermarkCallbacks& sidestream_watermark_callbacks) {
auto client_or_error =
client_manager_.getOrCreateRawAsyncClientWithHashKey(config_with_hash_key, scope_, true);
THROW_IF_STATUS_NOT_OK(client_or_error, throw);
Grpc::AsyncClient<ProcessingRequest, ProcessingResponse> grpcClient(client_or_error.value());
return ExternalProcessorStreamImpl::create(std::move(grpcClient), callbacks, options,
decoder_watermark_callbacks);
sidestream_watermark_callbacks);
}

ExternalProcessorStreamPtr ExternalProcessorStreamImpl::create(
Grpc::AsyncClient<ProcessingRequest, ProcessingResponse>&& client,
ExternalProcessorCallbacks& callbacks, const Http::AsyncClient::StreamOptions& options,
Http::DecoderFilterWatermarkCallbacks* decoder_watermark_callbacks) {
Http::StreamFilterSidestreamWatermarkCallbacks& sidestream_watermark_callbacks) {
auto stream =
std::unique_ptr<ExternalProcessorStreamImpl>(new ExternalProcessorStreamImpl(callbacks));

if (stream->startStream(std::move(client), options)) {
if (stream->grpcSidestreamFlowControl()) {
stream->stream_->setWatermarkCallbacks(*decoder_watermark_callbacks);
stream->stream_->setWatermarkCallbacks(sidestream_watermark_callbacks);
}
return stream;
}
Expand Down
4 changes: 2 additions & 2 deletions source/extensions/filters/http/ext_proc/client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class ExternalProcessorClientImpl : public ExternalProcessorClient {
start(ExternalProcessorCallbacks& callbacks,
const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key,
const Http::AsyncClient::StreamOptions& options,
Http::DecoderFilterWatermarkCallbacks* decoder_watermark_callbacks) override;
Http::StreamFilterSidestreamWatermarkCallbacks& sidestream_watermark_callbacks) override;

private:
Grpc::AsyncClientManager& client_manager_;
Expand All @@ -46,7 +46,7 @@ class ExternalProcessorStreamImpl : public ExternalProcessorStream,
static ExternalProcessorStreamPtr
create(Grpc::AsyncClient<ProcessingRequest, ProcessingResponse>&& client,
ExternalProcessorCallbacks& callbacks, const Http::AsyncClient::StreamOptions& options,
Http::DecoderFilterWatermarkCallbacks* decoder_watermark_callbacks);
Http::StreamFilterSidestreamWatermarkCallbacks& sidestream_watermark_callbacks);

void send(ProcessingRequest&& request, bool end_stream) override;
// Close the stream. This is idempotent and will return true if we
Expand Down
Loading

0 comments on commit e65ad8d

Please sign in to comment.