Skip to content

Commit

Permalink
ext_proc: flow control in observability mode (envoyproxy#35298)
Browse files Browse the repository at this point in the history
Commit Message: This PR is split from
envoyproxy#35077. The change is
specifically needed for ext_proc's observability mode where the stream
is deferred closed upon filter destruction (i.e., stream outlives the
filter object)

Risk Level: Low, existing runtime guard and observability mode only
Testing: e2e test and load test
Docs Changes: N/A
Release Notes: N/A
Platform Specific Features: N/A

---------

Signed-off-by: tyxia <[email protected]>
Signed-off-by: asingh-g <[email protected]>
  • Loading branch information
tyxia authored and asingh-g committed Aug 20, 2024
1 parent a0e61de commit ed55146
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 52 deletions.
3 changes: 2 additions & 1 deletion source/extensions/filters/http/ext_proc/client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ void ExternalProcessorStreamImpl::send(envoy::service::ext_proc::v3::ProcessingR
bool ExternalProcessorStreamImpl::close() {
if (!stream_closed_) {
ENVOY_LOG(debug, "Closing gRPC stream");
if (grpc_side_stream_flow_control_) {
// Unregister the watermark callbacks, if any exist (e.g., filter is not destroyed yet)
if (grpc_side_stream_flow_control_ && callbacks_.has_value()) {
stream_.removeWatermarkCallbacks();
}
stream_.closeStream();
Expand Down
7 changes: 7 additions & 0 deletions source/extensions/filters/http/ext_proc/client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ class ExternalProcessorStreamImpl : public ExternalProcessorStream,
// When the filter object is being destroyed, `callbacks_` (which is a OptRef to filter object)
// should be reset to avoid the dangling reference.
callbacks_.reset();

// Unregister the watermark callbacks(if any) to prevent access of filter callbacks after
// the filter object is destroyed.
if (grpc_side_stream_flow_control_ && !stream_closed_) {
stream_.removeWatermarkCallbacks();
}
}

// AsyncStreamCallbacks
Expand Down Expand Up @@ -87,6 +93,7 @@ class ExternalProcessorStreamImpl : public ExternalProcessorStream,
Grpc::AsyncStream<ProcessingRequest> stream_;
Http::AsyncClient::ParentContext grpc_context_;
bool stream_closed_ = false;
// Boolean flag initiated by runtime flag.
const bool grpc_side_stream_flow_control_;
};

Expand Down
11 changes: 7 additions & 4 deletions source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -396,15 +396,18 @@ void Filter::onDestroy() {
decoding_state_.stopMessageTimer();
encoding_state_.stopMessageTimer();

if (stream_ != nullptr) {
stream_->notifyFilterDestroy();
}

if (config_->observabilityMode()) {
// In observability mode where the main stream processing and side stream processing are
// asynchronous, it is possible that filter instance is destroyed before the side stream request
// arrives at ext_proc server. In order to prevent the data loss in this case, side stream
// closure is deferred upon filter destruction with a timer.

// First, release the referenced filter resource.
if (stream_ != nullptr) {
stream_->notifyFilterDestroy();
}

// Second, perform stream deferred closure.
deferredCloseStream();
} else {
// Perform immediate close on the stream otherwise.
Expand Down
84 changes: 37 additions & 47 deletions test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,35 @@ class ExtProcIntegrationTest : public HttpIntegrationTest,
verifyDownstreamResponse(*response, 200);
}

void testSidestreamPushbackDownstream(uint32_t body_size, bool check_downstream_flow_control) {
config_helper_.setBufferLimits(1024, 1024);

proto_config_.mutable_processing_mode()->set_request_header_mode(ProcessingMode::SKIP);
proto_config_.mutable_processing_mode()->set_request_body_mode(ProcessingMode::STREAMED);
proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SKIP);
initializeConfig();
HttpIntegrationTest::initialize();

std::string body_str = std::string(body_size, 'a');
auto response = sendDownstreamRequestWithBody(body_str, absl::nullopt);

bool end_stream = false;
int count = 0;
while (!end_stream) {
processRequestBodyMessage(
*grpc_upstreams_[0], count == 0 ? true : false,
[&end_stream](const HttpBody& body, BodyResponse&) {
end_stream = body.end_of_stream();
return true;
},
check_downstream_flow_control);
count++;
}
handleUpstreamRequest();

verifyDownstreamResponse(*response, 200);
}

envoy::extensions::filters::http::ext_proc::v3::ExternalProcessor proto_config_{};
uint32_t max_message_timeout_ms_{0};
std::vector<FakeUpstream*> grpc_upstreams_;
Expand Down Expand Up @@ -4507,32 +4536,16 @@ TEST_P(ExtProcIntegrationTest, SidestreamPushbackDownstream) {
return;
}

config_helper_.setBufferLimits(1024, 1024);

proto_config_.mutable_processing_mode()->set_request_header_mode(ProcessingMode::SKIP);
proto_config_.mutable_processing_mode()->set_request_body_mode(ProcessingMode::STREAMED);
proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SKIP);
initializeConfig();
HttpIntegrationTest::initialize();

std::string body_str = std::string(16 * 1024, 'a');
auto response = sendDownstreamRequestWithBody(body_str, absl::nullopt);
testSidestreamPushbackDownstream(16 * 1024, true);
}

bool end_stream = false;
int count = 0;
while (!end_stream) {
processRequestBodyMessage(
*grpc_upstreams_[0], count == 0 ? true : false,
[&end_stream](const HttpBody& body, BodyResponse&) {
end_stream = body.end_of_stream();
return true;
},
/*check_downstream_flow_control=*/true);
count++;
TEST_P(ExtProcIntegrationTest, SidestreamPushbackDownstreamObservabilityMode) {
if (std::get<1>(std::get<0>(GetParam())) != Envoy::Grpc::ClientType::EnvoyGrpc) {
return;
}
handleUpstreamRequest();

verifyDownstreamResponse(*response, 200);
proto_config_.set_observability_mode(true);
testSidestreamPushbackDownstream(16 * 1024, true);
}

TEST_P(ExtProcIntegrationTest, SidestreamPushbackDownstreamRuntimeDisable) {
Expand All @@ -4543,30 +4556,7 @@ TEST_P(ExtProcIntegrationTest, SidestreamPushbackDownstreamRuntimeDisable) {
scoped_runtime_.mergeValues(
{{"envoy.reloadable_features.grpc_side_stream_flow_control", "false"}});

config_helper_.setBufferLimits(1024, 1024);

proto_config_.mutable_processing_mode()->set_request_header_mode(ProcessingMode::SKIP);
proto_config_.mutable_processing_mode()->set_request_body_mode(ProcessingMode::STREAMED);
proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SKIP);
initializeConfig();
HttpIntegrationTest::initialize();

std::string body_str = std::string(1030, 'a');
auto response = sendDownstreamRequestWithBody(body_str, absl::nullopt);

bool end_stream = false;
int count = 0;
while (!end_stream) {
processRequestBodyMessage(*grpc_upstreams_[0], count == 0 ? true : false,
[&end_stream](const HttpBody& body, BodyResponse&) {
end_stream = body.end_of_stream();
return true;
});
count++;
}
handleUpstreamRequest();

verifyDownstreamResponse(*response, 200);
testSidestreamPushbackDownstream(1030, false);
}

} // namespace Envoy

0 comments on commit ed55146

Please sign in to comment.