Skip to content

Commit

Permalink
ext_proc: Add more tests for observability mode (envoyproxy#35392)
Browse files Browse the repository at this point in the history
<!--
!!!ATTENTION!!!

If you are fixing *any* crash or *any* potential security issue, *do
not*
open a pull request in this repo. Please report the issue via emailing
[email protected] where the issue will be triaged
appropriately.
Thank you in advance for helping to keep Envoy secure.

!!!ATTENTION!!!

For an explanation of how to fill out the fields, please see the
relevant section
in
[PULL_REQUESTS.md](https://github.com/envoyproxy/envoy/blob/main/PULL_REQUESTS.md)
-->

Commit Message: (1) Test SendAndReceiveDynamicMetadata (2) verify the
receive body message (3) Added a streaming integration test which tests
large chunk of bodies
`PostAndProcessStreamedRequestBodyObservabilityMode`

Risk Level: low, test only
Testing: integration tests
Docs Changes: N/A
Release Notes: N/A
Platform Specific Features: N/A

---------

Signed-off-by: tyxia <[email protected]>
  • Loading branch information
tyxia authored Jul 25, 2024
1 parent ca35967 commit eeb367e
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 42 deletions.
121 changes: 79 additions & 42 deletions test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,45 @@ class ExtProcIntegrationTest : public HttpIntegrationTest,
verifyDownstreamResponse(*response, 200);
}

void testSendDyanmicMetadata() {
ProtobufWkt::Struct test_md_struct;
(*test_md_struct.mutable_fields())["foo"].set_string_value("value from ext_proc");

ProtobufWkt::Value md_val;
*(md_val.mutable_struct_value()) = test_md_struct;

processGenericMessage(
*grpc_upstreams_[0], true,
[md_val](const ProcessingRequest& req, ProcessingResponse& resp) {
// Verify the processing request contains the untyped metadata we injected.
EXPECT_TRUE(req.metadata_context().filter_metadata().contains("forwarding_ns_untyped"));
const ProtobufWkt::Struct& fwd_metadata =
req.metadata_context().filter_metadata().at("forwarding_ns_untyped");
EXPECT_EQ(1, fwd_metadata.fields_size());
EXPECT_TRUE(fwd_metadata.fields().contains("foo"));
EXPECT_EQ("value from set_metadata", fwd_metadata.fields().at("foo").string_value());

// Verify the processing request contains the typed metadata we injected.
EXPECT_TRUE(
req.metadata_context().typed_filter_metadata().contains("forwarding_ns_typed"));
const ProtobufWkt::Any& fwd_typed_metadata =
req.metadata_context().typed_filter_metadata().at("forwarding_ns_typed");
EXPECT_EQ("type.googleapis.com/envoy.extensions.filters.http.set_metadata.v3.Metadata",
fwd_typed_metadata.type_url());
envoy::extensions::filters::http::set_metadata::v3::Metadata typed_md_from_req;
fwd_typed_metadata.UnpackTo(&typed_md_from_req);
EXPECT_EQ("typed_value from set_metadata", typed_md_from_req.metadata_namespace());

// Spoof the response to contain receiving metadata.
HeadersResponse headers_resp;
(*resp.mutable_request_headers()) = headers_resp;
auto mut_md_fields = resp.mutable_dynamic_metadata()->mutable_fields();
(*mut_md_fields).emplace("receiving_ns_untyped", md_val);

return true;
});
}

void testSidestreamPushbackDownstream(uint32_t body_size, bool check_downstream_flow_control) {
config_helper_.setBufferLimits(1024, 1024);

Expand Down Expand Up @@ -3750,40 +3789,7 @@ TEST_P(ExtProcIntegrationTest, SendAndReceiveDynamicMetadata) {

auto response = sendDownstreamRequest(absl::nullopt);

ProtobufWkt::Struct test_md_struct;
(*test_md_struct.mutable_fields())["foo"].set_string_value("value from ext_proc");

ProtobufWkt::Value md_val;
*(md_val.mutable_struct_value()) = test_md_struct;

processGenericMessage(
*grpc_upstreams_[0], true, [md_val](const ProcessingRequest& req, ProcessingResponse& resp) {
// Verify the processing request contains the untyped metadata we injected.
EXPECT_TRUE(req.metadata_context().filter_metadata().contains("forwarding_ns_untyped"));
const ProtobufWkt::Struct& fwd_metadata =
req.metadata_context().filter_metadata().at("forwarding_ns_untyped");
EXPECT_EQ(1, fwd_metadata.fields_size());
EXPECT_TRUE(fwd_metadata.fields().contains("foo"));
EXPECT_EQ("value from set_metadata", fwd_metadata.fields().at("foo").string_value());

// Verify the processing request contains the typed metadata we injected.
EXPECT_TRUE(req.metadata_context().typed_filter_metadata().contains("forwarding_ns_typed"));
const ProtobufWkt::Any& fwd_typed_metadata =
req.metadata_context().typed_filter_metadata().at("forwarding_ns_typed");
EXPECT_EQ("type.googleapis.com/envoy.extensions.filters.http.set_metadata.v3.Metadata",
fwd_typed_metadata.type_url());
envoy::extensions::filters::http::set_metadata::v3::Metadata typed_md_from_req;
fwd_typed_metadata.UnpackTo(&typed_md_from_req);
EXPECT_EQ("typed_value from set_metadata", typed_md_from_req.metadata_namespace());

// Spoof the response to contain receiving metadata.
HeadersResponse headers_resp;
(*resp.mutable_request_headers()) = headers_resp;
auto mut_md_fields = resp.mutable_dynamic_metadata()->mutable_fields();
(*mut_md_fields).emplace("receiving_ns_untyped", md_val);

return true;
});
testSendDyanmicMetadata();

handleUpstreamRequest();

Expand All @@ -3800,6 +3806,34 @@ TEST_P(ExtProcIntegrationTest, SendAndReceiveDynamicMetadata) {
verifyDownstreamResponse(*response, 200);
}

TEST_P(ExtProcIntegrationTest, SendAndReceiveDynamicMetadataObservabilityMode) {
proto_config_.set_observability_mode(true);
proto_config_.mutable_processing_mode()->set_request_header_mode(ProcessingMode::SEND);
proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SKIP);

auto* md_opts = proto_config_.mutable_metadata_options();
md_opts->mutable_forwarding_namespaces()->add_untyped("forwarding_ns_untyped");
md_opts->mutable_forwarding_namespaces()->add_typed("forwarding_ns_typed");
md_opts->mutable_receiving_namespaces()->add_untyped("receiving_ns_untyped");

ConfigOptions config_option = {};
config_option.add_metadata = true;
initializeConfig(config_option);
HttpIntegrationTest::initialize();

auto response = sendDownstreamRequest(absl::nullopt);

testSendDyanmicMetadata();

handleUpstreamRequest();

ASSERT_TRUE(response->waitForEndStream());
ASSERT_TRUE(response->complete());
// No headers from dynamic metadata response as the response is ignored in observability mode.
EXPECT_THAT(response->headers(), HasNoHeader(Http::LowerCaseString("receiving_ns_untyped.foo")));
verifyDownstreamResponse(*response, 200);
}

#if defined(USE_CEL_PARSER)
TEST_P(ExtProcIntegrationTest, RequestResponseAttributes) {
proto_config_.mutable_processing_mode()->set_request_header_mode(ProcessingMode::SEND);
Expand Down Expand Up @@ -4148,14 +4182,17 @@ TEST_P(ExtProcIntegrationTest, ObservabilityModeWithBody) {
const std::string original_body_str = "Hello";
auto response = sendDownstreamRequestWithBody(original_body_str, absl::nullopt);

processRequestBodyMessage(
*grpc_upstreams_[0], true, [](const HttpBody& body, BodyResponse& body_resp) {
EXPECT_TRUE(body.end_of_stream());
// Try to mutate the body.
auto* body_mut = body_resp.mutable_response()->mutable_body_mutation();
body_mut->set_body("Hello, World!");
return true;
});
processRequestBodyMessage(*grpc_upstreams_[0], true,
[&original_body_str](const HttpBody& body, BodyResponse& body_resp) {
// Verify the received body message.
EXPECT_EQ(body.body(), original_body_str);
EXPECT_TRUE(body.end_of_stream());
// Try to mutate the body.
auto* body_mut =
body_resp.mutable_response()->mutable_body_mutation();
body_mut->set_body("Hello, World!");
return true;
});

ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_));
ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,55 @@ TEST_P(StreamingIntegrationTest, PostAndProcessStreamedRequestBodyAndClose) {
EXPECT_THAT(client_response_->headers(), Http::HttpStatusIs("200"));
}

TEST_P(StreamingIntegrationTest, PostAndProcessStreamedRequestBodyObservabilityMode) {
const uint32_t num_chunks = 152;
const uint32_t chunk_size = 1000;
uint32_t total_size = num_chunks * chunk_size;

test_processor_.start(
ipVersion(),
[total_size](grpc::ServerReaderWriter<ProcessingResponse, ProcessingRequest>* stream) {
// Expect a request_headers message as the first message on the stream,
// and send back an empty response.
ProcessingRequest header_req;
ASSERT_TRUE(stream->Read(&header_req));
ASSERT_TRUE(header_req.has_request_headers());
ProcessingResponse header_resp;
header_resp.mutable_request_headers();
stream->Write(header_resp);

// Now, expect a bunch of request_body messages and respond to each.
// Count up the number of bytes we receive and make sure that we get
// them all.
uint32_t received_size = 0;
ProcessingRequest body_req;
do {
ASSERT_TRUE(stream->Read(&body_req));
ASSERT_TRUE(body_req.has_request_body());
received_size += body_req.request_body().body().size();
ProcessingResponse body_resp;
body_resp.mutable_request_body();
stream->Write(body_resp);
} while (!body_req.request_body().end_of_stream());

EXPECT_EQ(received_size, total_size);
});

// Enable observability mode.
proto_config_.set_observability_mode(true);
proto_config_.mutable_processing_mode()->set_request_body_mode(ProcessingMode::STREAMED);
initializeConfig();
HttpIntegrationTest::initialize();
sendPostRequest(num_chunks, chunk_size, [total_size](Http::HeaderMap& headers) {
// This header tells the "autonomous upstream" that will respond to our
// request to throw an error if it doesn't get the right number of bytes.
headers.addCopy(LowerCaseString("expect_request_size_bytes"), total_size);
});

ASSERT_TRUE(client_response_->waitForEndStream());
EXPECT_TRUE(client_response_->complete());
EXPECT_THAT(client_response_->headers(), Http::HttpStatusIs("200"));
}
// Do an HTTP GET that will return a body smaller than the buffer limit, which we process
// in the processor.
TEST_P(StreamingIntegrationTest, DISABLED_GetAndProcessBufferedResponseBody) {
Expand Down

0 comments on commit eeb367e

Please sign in to comment.