Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ext proc http functionality support #35740

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
28094b9
Adding HTTP service support for Envoy external processing
yanjunxiang-google Jul 30, 2024
b0c2271
adding not-implemented-hide
yanjunxiang-google Jul 30, 2024
592c1e1
addressing comments
yanjunxiang-google Jul 30, 2024
e22aa8c
change the proto field name
yanjunxiang-google Jul 30, 2024
a017259
fix format error
yanjunxiang-google Jul 30, 2024
7e0a578
addressing comments
yanjunxiang-google Jul 31, 2024
6d3097f
Merge branch 'main' of https://github.com/envoyproxy/envoy into ext_p…
yanjunxiang-google Aug 1, 2024
dad36f0
Adding HTTP client for the ext_proc filter
yanjunxiang-google Aug 12, 2024
d3e1577
adding http_client
yanjunxiang-google Aug 13, 2024
c9c2b22
adding HTTP client framework
yanjunxiang-google Aug 13, 2024
0fcb348
moving client_base to http_client directory
yanjunxiang-google Aug 13, 2024
f8b793a
adding the http_client unit-test framework
yanjunxiang-google Aug 14, 2024
dadad62
fix format
yanjunxiang-google Aug 14, 2024
72d82bb
fix testing issue
yanjunxiang-google Aug 14, 2024
b787a76
Integrate HTTP client to ext_proc filter
yanjunxiang-google Aug 18, 2024
e964444
merge upstream main
yanjunxiang-google Aug 18, 2024
bcedfd9
refactor the base client class inheritance
yanjunxiang-google Aug 19, 2024
6f2e962
getting the basic functionality working
yanjunxiang-google Aug 20, 2024
3798f28
adding integration test
yanjunxiang-google Aug 20, 2024
a93234a
revert changes in ext_proc_integration_test.cc
yanjunxiang-google Aug 20, 2024
00d56fc
adding integration tests
yanjunxiang-google Aug 22, 2024
26df644
fix format
yanjunxiang-google Aug 22, 2024
4aee4be
adding onError, cancel support
yanjunxiang-google Aug 22, 2024
a08e5ad
fixing tests
yanjunxiang-google Aug 22, 2024
e398cac
fixing tests
yanjunxiang-google Aug 22, 2024
dd63ccc
fixing test issue
yanjunxiang-google Aug 23, 2024
89181e8
adding content-type header
yanjunxiang-google Aug 23, 2024
c974645
adding x-request-id header
yanjunxiang-google Sep 3, 2024
9b1611a
adding HTTP uri timeout support
yanjunxiang-google Sep 4, 2024
cf0b3fa
adding tests
yanjunxiang-google Sep 4, 2024
1ac6ee0
simplify tests
yanjunxiang-google Sep 4, 2024
99b17ea
adding tests
yanjunxiang-google Sep 4, 2024
c93f825
adding body and trailer tests
yanjunxiang-google Sep 6, 2024
f5e1881
Merge branch 'main' of https://github.com/envoyproxy/envoy into ext_p…
yanjunxiang-google Sep 6, 2024
4975682
adding body and trailer test
yanjunxiang-google Sep 7, 2024
e16faf1
fixing format
yanjunxiang-google Sep 9, 2024
a79550a
using nghttp2 as codec in the test
yanjunxiang-google Sep 11, 2024
5077778
adding more tests
yanjunxiang-google Sep 11, 2024
143ad26
merge upstream main
yanjunxiang-google Sep 11, 2024
8c4f3e4
creating HTTP clien
yanjunxiang-google Sep 11, 2024
2082535
Merge branch 'main' of https://github.com/envoyproxy/envoy into ext_p…
yanjunxiang-google Sep 12, 2024
5f7eafb
remove body/trailer sending for http_service
yanjunxiang-google Sep 12, 2024
c27d975
adding configuration validation test
yanjunxiang-google Sep 12, 2024
c016032
adding server context config test for http_service config
yanjunxiang-google Sep 12, 2024
3b01d5a
adding API doc and release notes
yanjunxiang-google Sep 12, 2024
1b0f9fb
fix doc
yanjunxiang-google Sep 12, 2024
b558660
Empty-Commit
yanjunxiang-google Sep 12, 2024
713355f
abstract send request
yanjunxiang-google Sep 26, 2024
1c9078d
merge upstream main, also restore stream_ pointer back to filter
yanjunxiang-google Sep 26, 2024
a0d2435
fix format
yanjunxiang-google Sep 26, 2024
9bb769e
moving client_base.h into up level
yanjunxiang-google Sep 26, 2024
19c494d
fix format
yanjunxiang-google Sep 26, 2024
e54c16f
set callbacks in client
yanjunxiang-google Sep 26, 2024
613001a
fix CI issue
yanjunxiang-google Sep 27, 2024
e52291e
delete wrong file
yanjunxiang-google Sep 27, 2024
5514193
removing unused runtime
yanjunxiang-google Sep 27, 2024
bf515cf
Merge branch 'main' of https://github.com/envoyproxy/envoy into ext_p…
yanjunxiang-google Sep 27, 2024
5c23475
removing size in BUILD
yanjunxiang-google Sep 27, 2024
d190cbe
adding rbe_pool
yanjunxiang-google Sep 27, 2024
3ffd9e7
addressing API comments
yanjunxiang-google Sep 27, 2024
f9f13ce
adding proto import package
yanjunxiang-google Sep 27, 2024
038c6d7
adding cancel() in ClientBase
yanjunxiang-google Sep 28, 2024
5ffb042
addressing comments
yanjunxiang-google Sep 28, 2024
82f63c5
refactor out request message building code
yanjunxiang-google Sep 28, 2024
8552778
addressing comments
yanjunxiang-google Sep 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/envoy/extensions/filters/http/ext_proc/v3/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ api_proto_package(
"//envoy/config/core/v3:pkg",
"//envoy/type/matcher/v3:pkg",
"@com_github_cncf_xds//udpa/annotations:pkg",
"@com_github_cncf_xds//xds/annotations/v3:pkg",
],
)
37 changes: 35 additions & 2 deletions api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import "envoy/type/matcher/v3/string.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/struct.proto";

import "xds/annotations/v3/status.proto";

import "udpa/annotations/migrate.proto";
import "udpa/annotations/status.proto";
import "validate/validate.proto";
Expand Down Expand Up @@ -131,8 +133,39 @@ message ExternalProcessor {
// Only one of ``http_service`` or
// :ref:`grpc_service <envoy_v3_api_field_extensions.filters.http.ext_proc.v3.ExternalProcessor.grpc_service>`.
// can be set. It is required that one of them must be set.
ExtProcHttpService http_service = 20
[(udpa.annotations.field_migrate).oneof_promotion = "ext_proc_service_type"];
//
// If ``http_service`` is set, the
// :ref:`processing_mode <envoy_v3_api_field_extensions.filters.http.ext_proc.v3.ExternalProcessor.processing_mode>`
// can not be configured to send any body or trailers. i.e, http_service only supports
// sending request or response headers to the side stream server.
//
// With this configuration, Envoy behavior:
//
// 1. The headers are first put in a proto message
// :ref:`ProcessingRequest <envoy_v3_api_msg_service.ext_proc.v3.ProcessingRequest>`.
//
// 2. This proto message is then transcoded into a JSON text.
//
// 3. Envoy then sends a HTTP POST message with content-type as "application/json",
// and this JSON text as body to the side stream server.
//
// After the side-stream receives this HTTP request message, it is expected to do as follows:
//
// 1. It converts the body, which is a JSON string, into a ``ProcessingRequest``
// proto message to examine and mutate the headers.
//
// 2. It then sets the mutated headers into a new proto message
// :ref:`ProcessingResponse <envoy_v3_api_msg_service.ext_proc.v3.ProcessingResponse>`.
//
// 3. It converts ``ProcessingResponse`` proto message into a JSON text.
//
// 4. It then sends a HTTP response back to Envoy with status code as "200",
// content-type as "application/json" and sets the JSON text as the body.
//
ExtProcHttpService http_service = 20 [
(udpa.annotations.field_migrate).oneof_promotion = "ext_proc_service_type",
(xds.annotations.v3.field_status).work_in_progress = true
];

// By default, if the gRPC stream cannot be established, or if it is closed
// prematurely with an error, the filter will fail. Specifically, if the
Expand Down
13 changes: 13 additions & 0 deletions source/extensions/filters/http/ext_proc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,15 @@ licenses(["notice"]) # Apache 2

envoy_extension_package()

envoy_cc_library(
name = "client_base_interface",
hdrs = ["client_base.h"],
tags = ["skip_on_windows"],
deps = [
"@envoy_api//envoy/service/ext_proc/v3:pkg_cc_proto",
],
)

envoy_cc_library(
name = "ext_proc",
srcs = [
Expand All @@ -21,6 +30,7 @@ envoy_cc_library(
],
tags = ["skip_on_windows"],
deps = [
":client_base_interface",
":client_interface",
":matching_utils_lib",
":mutation_utils_lib",
Expand All @@ -34,6 +44,7 @@ envoy_cc_library(
"//source/common/runtime:runtime_features_lib",
"//source/extensions/filters/common/mutation_rules:mutation_rules_lib",
"//source/extensions/filters/http/common:pass_through_filter_lib",
"//source/extensions/filters/http/ext_proc/http_client:http_client_lib",
"@com_google_absl//absl/status",
"@com_google_absl//absl/strings:str_format",
"@com_google_absl//absl/strings:string_view",
Expand All @@ -53,6 +64,7 @@ envoy_cc_extension(
":client_lib",
":ext_proc",
"//source/extensions/filters/http/common:factory_base_lib",
"//source/extensions/filters/http/ext_proc/http_client:http_client_lib",
"@envoy_api//envoy/extensions/filters/http/ext_proc/v3:pkg_cc_proto",
],
)
Expand All @@ -62,6 +74,7 @@ envoy_cc_library(
hdrs = ["client.h"],
tags = ["skip_on_windows"],
deps = [
":client_base_interface",
"//envoy/grpc:async_client_manager_interface",
"//envoy/grpc:status",
"//envoy/stream_info:stream_info_interface",
Expand Down
9 changes: 4 additions & 5 deletions source/extensions/filters/http/ext_proc/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@
#include "envoy/stream_info/stream_info.h"

#include "source/common/http/sidestream_watermark.h"
#include "source/extensions/filters/http/ext_proc/client_base.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace ExternalProcessing {

class ExternalProcessorStream {
class ExternalProcessorStream : public StreamBase {
public:
virtual ~ExternalProcessorStream() = default;
virtual void send(envoy::service::ext_proc::v3::ProcessingRequest&& request,
Expand All @@ -30,7 +31,7 @@ class ExternalProcessorStream {

using ExternalProcessorStreamPtr = std::unique_ptr<ExternalProcessorStream>;

class ExternalProcessorCallbacks {
class ExternalProcessorCallbacks : public RequestCallbacks {
public:
virtual ~ExternalProcessorCallbacks() = default;
virtual void onReceiveMessage(
Expand All @@ -40,16 +41,14 @@ class ExternalProcessorCallbacks {
virtual void logGrpcStreamInfo() PURE;
};

class ExternalProcessorClient {
class ExternalProcessorClient : public ClientBase {
public:
virtual ~ExternalProcessorClient() = default;
virtual ExternalProcessorStreamPtr
start(ExternalProcessorCallbacks& callbacks,
const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key,
const Http::AsyncClient::StreamOptions& options,
Http::StreamFilterSidestreamWatermarkCallbacks& sidestream_watermark_callbacks) PURE;
virtual ExternalProcessorStream* stream() PURE;
virtual void setStream(ExternalProcessorStream* stream) PURE;
};

using ExternalProcessorClientPtr = std::unique_ptr<ExternalProcessorClient>;
Expand Down
47 changes: 47 additions & 0 deletions source/extensions/filters/http/ext_proc/client_base.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#pragma once

#include <memory>

#include "envoy/service/ext_proc/v3/external_processor.pb.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace ExternalProcessing {

/**
* Async callbacks used during external processing.
*/
class RequestCallbacks {
public:
virtual ~RequestCallbacks() = default;
virtual void onComplete(envoy::service::ext_proc::v3::ProcessingResponse& response) PURE;
virtual void onError() PURE;
};

/**
* Stream base class used during external processing.
*/
class StreamBase {
public:
virtual ~StreamBase() = default;
};

/**
* Async client base class used during external processing.
*/
class ClientBase {
public:
virtual ~ClientBase() = default;
virtual void sendRequest(envoy::service::ext_proc::v3::ProcessingRequest&& request,
bool end_stream, const uint64_t stream_id, RequestCallbacks* callbacks,
StreamBase* stream) PURE;
virtual void cancel() PURE;
};

using ClientBasePtr = std::unique_ptr<ClientBase>;

} // namespace ExternalProcessing
} // namespace HttpFilters
} // namespace Extensions
} // namespace Envoy
7 changes: 7 additions & 0 deletions source/extensions/filters/http/ext_proc/client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ ExternalProcessorStreamPtr ExternalProcessorClientImpl::start(
sidestream_watermark_callbacks);
}

void ExternalProcessorClientImpl::sendRequest(
envoy::service::ext_proc::v3::ProcessingRequest&& request, bool end_stream, const uint64_t,
RequestCallbacks*, StreamBase* stream) {
ExternalProcessorStream* grpc_stream = dynamic_cast<ExternalProcessorStream*>(stream);
yanjunxiang-google marked this conversation as resolved.
Show resolved Hide resolved
grpc_stream->send(std::move(request), end_stream);
}

ExternalProcessorStreamPtr ExternalProcessorStreamImpl::create(
Grpc::AsyncClient<ProcessingRequest, ProcessingResponse>&& client,
ExternalProcessorCallbacks& callbacks, const Http::AsyncClient::StreamOptions& options,
Expand Down
9 changes: 4 additions & 5 deletions source/extensions/filters/http/ext_proc/client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,14 @@ class ExternalProcessorClientImpl : public ExternalProcessorClient {
const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key,
const Http::AsyncClient::StreamOptions& options,
Http::StreamFilterSidestreamWatermarkCallbacks& sidestream_watermark_callbacks) override;
ExternalProcessorStream* stream() override { return stream_; }
void setStream(ExternalProcessorStream* stream) override { stream_ = stream; }
void sendRequest(envoy::service::ext_proc::v3::ProcessingRequest&& request, bool end_stream,
const uint64_t stream_id, RequestCallbacks* callbacks,
StreamBase* stream) override;
void cancel() override {}

private:
Grpc::AsyncClientManager& client_manager_;
Stats::Scope& scope_;
// The gRPC stream to the external processor, which will be opened
// when it's time to send the first message.
ExternalProcessorStream* stream_ = nullptr;
};

class ExternalProcessorStreamImpl : public ExternalProcessorStream,
Expand Down
50 changes: 33 additions & 17 deletions source/extensions/filters/http/ext_proc/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "source/extensions/filters/common/expr/evaluator.h"
#include "source/extensions/filters/http/ext_proc/client_impl.h"
#include "source/extensions/filters/http/ext_proc/ext_proc.h"
#include "source/extensions/filters/http/ext_proc/http_client/http_client_impl.h"

namespace Envoy {
namespace Extensions {
Expand All @@ -22,15 +23,22 @@ ExternalProcessingFilterConfig::createFilterFactoryFromProtoTyped(
proto_config, std::chrono::milliseconds(message_timeout_ms), max_message_timeout_ms,
dual_info.scope, stats_prefix, dual_info.is_upstream,
Envoy::Extensions::Filters::Common::Expr::getBuilder(context), context);

return [filter_config = std::move(filter_config), grpc_service = proto_config.grpc_service(),
&context, dual_info](Http::FilterChainFactoryCallbacks& callbacks) {
auto client = std::make_unique<ExternalProcessorClientImpl>(
context.clusterManager().grpcAsyncClientManager(), dual_info.scope);

callbacks.addStreamFilter(Http::StreamFilterSharedPtr{
std::make_shared<Filter>(filter_config, std::move(client), grpc_service)});
};
if (proto_config.has_grpc_service()) {
return [filter_config = std::move(filter_config), &context,
dual_info](Http::FilterChainFactoryCallbacks& callbacks) {
auto client = std::make_unique<ExternalProcessorClientImpl>(
context.clusterManager().grpcAsyncClientManager(), dual_info.scope);
callbacks.addStreamFilter(
Http::StreamFilterSharedPtr{std::make_shared<Filter>(filter_config, std::move(client))});
};
} else {
return [proto_config = std::move(proto_config), filter_config = std::move(filter_config),
&context](Http::FilterChainFactoryCallbacks& callbacks) {
auto client = std::make_unique<ExtProcHttpClient>(proto_config, context);
callbacks.addStreamFilter(
Http::StreamFilterSharedPtr{std::make_shared<Filter>(filter_config, std::move(client))});
};
}
}

Router::RouteSpecificFilterConfigConstSharedPtr
Expand All @@ -54,14 +62,22 @@ ExternalProcessingFilterConfig::createFilterFactoryFromProtoWithServerContextTyp
server_context.scope(), stats_prefix, false,
Envoy::Extensions::Filters::Common::Expr::getBuilder(server_context), server_context);

return [filter_config = std::move(filter_config), grpc_service = proto_config.grpc_service(),
&server_context](Http::FilterChainFactoryCallbacks& callbacks) {
auto client = std::make_unique<ExternalProcessorClientImpl>(
server_context.clusterManager().grpcAsyncClientManager(), server_context.scope());

callbacks.addStreamFilter(Http::StreamFilterSharedPtr{
std::make_shared<Filter>(filter_config, std::move(client), grpc_service)});
};
if (proto_config.has_grpc_service()) {
return [filter_config = std::move(filter_config),
&server_context](Http::FilterChainFactoryCallbacks& callbacks) {
auto client = std::make_unique<ExternalProcessorClientImpl>(
server_context.clusterManager().grpcAsyncClientManager(), server_context.scope());
callbacks.addStreamFilter(
Http::StreamFilterSharedPtr{std::make_shared<Filter>(filter_config, std::move(client))});
};
} else {
return [proto_config = std::move(proto_config), filter_config = std::move(filter_config),
&server_context](Http::FilterChainFactoryCallbacks& callbacks) {
auto client = std::make_unique<ExtProcHttpClient>(proto_config, server_context);
callbacks.addStreamFilter(
Http::StreamFilterSharedPtr{std::make_shared<Filter>(filter_config, std::move(client))});
};
}
}

LEGACY_REGISTER_FACTORY(ExternalProcessingFilterConfig,
Expand Down
Loading