inbound: Fix gRPC response classification (#2496)
ecaaf39 changed the proxy's behavior with regard to creating [default
response classifiers][default]: the defaults previously supported detecting
gRPC responses (regardless of the request properties), and that detection was
lost.

To fix this, we modify the metrics module that uses response classifiers to
*require* them rather than inferring defaults. This enforces the intended
usage pattern so that we do not silently fall back to the default behavior.
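
As a minimal sketch of that distinction (using the `http` crate's request
extensions and made-up helper names, not the proxy's actual metrics types):
the metrics layer now reads whatever classifier an earlier layer installed,
rather than constructing a default when the extension is missing.

```rust
use http::Request;

#[derive(Clone, Debug, Default, PartialEq)]
struct Classifier;

// Old shape (sketch): silently fall back to a default classifier when the
// request extension is missing.
fn classify_with_fallback<B>(req: &Request<B>) -> Classifier {
    req.extensions().get::<Classifier>().cloned().unwrap_or_default()
}

// New shape (sketch): the classifier is a required input; a missing extension
// indicates a mis-assembled stack rather than triggering a silent default.
fn classify_required<B>(req: &Request<B>) -> Classifier {
    req.extensions()
        .get::<Classifier>()
        .cloned()
        .expect("classifier must be installed by an earlier layer")
}

fn main() {
    let mut req = Request::new(());
    req.extensions_mut().insert(Classifier);
    assert_eq!(classify_with_fallback(&req), Classifier);
    assert_eq!(classify_required(&req), Classifier);
}
```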

This change also updates the `NewClassify` module, which inserts the response
classifier request extension, so that overrides are supported. We can then
install a default classifier early in request processing and override it only
when a route configuration specifies one.
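
Roughly, the intended override pattern looks like this (a sketch with
hypothetical types built on the `http` crate's extensions, not linkerd's
actual `NewClassify` layer): the default is inserted unconditionally near the
top of the stack, and a later, route-aware layer replaces it only when the
route defines its own classification.

```rust
use http::Request;

#[derive(Clone, Debug, PartialEq)]
enum Classifier {
    Default,
    Grpc, // e.g. a route that declares gRPC response classification
}

// Early in request processing: always install the fallback classifier.
fn install_default<B>(req: &mut Request<B>) {
    req.extensions_mut().insert(Classifier::Default);
}

// Later, per route: replace the extension only if the route specifies one.
fn apply_route_override<B>(req: &mut Request<B>, route: Option<Classifier>) {
    if let Some(classifier) = route {
        req.extensions_mut().insert(classifier);
    }
}

fn main() {
    let mut req = Request::new(());
    install_default(&mut req);

    // No route override: the default remains in place.
    apply_route_override(&mut req, None);
    assert_eq!(req.extensions().get::<Classifier>(), Some(&Classifier::Default));

    // A gRPC-aware route override replaces the default.
    apply_route_override(&mut req, Some(Classifier::Grpc));
    assert_eq!(req.extensions().get::<Classifier>(), Some(&Classifier::Grpc));
}
```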

To support testing this change, the `http-metrics` crate gains a `test-util`
feature that allows querying `response_total` metrics without stringifying
everything.
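
As a toy illustration of that querying approach (simplified types, not the
crate's real `Report`/registry structures): tests index a `response_total`
counter by typed label values instead of rendering the Prometheus text format
and string-matching label values.

```rust
use std::collections::HashMap;

// Hypothetical, simplified key; the real crate keys its registry on its own
// label structs (endpoint labels, HTTP status, response class).
#[derive(Hash, PartialEq, Eq)]
struct ResponseKey {
    status: u16,
    grpc_status: Option<u32>,
}

fn main() {
    let mut response_total: HashMap<ResponseKey, f64> = HashMap::new();
    response_total.insert(ResponseKey { status: 200, grpc_status: Some(2) }, 1.0);

    // Typed lookup: no need to render `response_total{...}` lines and match
    // on stringified labels.
    let found = response_total.get(&ResponseKey { status: 200, grpc_status: Some(2) });
    assert_eq!(found, Some(&1.0));
}
```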

[default]: ecaaf39#diff-372e8a8a57b1fad5d94f37d2f77fdc7a45bcf708782475424b75d671f99ea1a0L97-L103
olix0r authored and adleong committed Nov 16, 2023
1 parent ba4de1a commit 185421d
Showing 13 changed files with 216 additions and 29 deletions.
1 change: 1 addition & 0 deletions Cargo.lock
@@ -1010,6 +1010,7 @@ dependencies = [
"linkerd-app-core",
"linkerd-app-test",
"linkerd-http-access-log",
"linkerd-http-metrics",
"linkerd-idle-cache",
"linkerd-io",
"linkerd-meshtls",
1 change: 1 addition & 0 deletions linkerd/app/admin/src/stack.rs
@@ -99,6 +99,7 @@ impl Config {
let admin = crate::server::Admin::new(report, ready, shutdown, trace);
let admin = svc::stack(move |_| admin.clone())
.push(metrics.proxy.http_endpoint.to_layer::<classify::Response, _, Permitted>())
.push(classify::NewClassify::layer_default())
.push_map_target(|(permit, http)| Permitted { permit, http })
.push(inbound::policy::NewHttpPolicy::layer(metrics.http_authz.clone()))
.push(Rescue::layer())
1 change: 1 addition & 0 deletions linkerd/app/core/src/control.rs
@@ -124,6 +124,7 @@ impl Config {
.lift_new()
.push(self::balance::layer(dns, resolve_backoff))
.push(metrics.to_layer::<classify::Response, _, _>())
.push(classify::NewClassify::layer_default())
// This buffer allows a resolver client to be shared across stacks.
// No load shed is applied here, however, so backpressure may leak
// into the caller task.
1 change: 1 addition & 0 deletions linkerd/app/inbound/Cargo.toml
@@ -52,6 +52,7 @@ libfuzzer-sys = { version = "0.4", features = ["arbitrary-derive"] }
[dev-dependencies]
hyper = { version = "0.14", features = ["http1", "http2"] }
linkerd-app-test = { path = "../test" }
linkerd-http-metrics = { path = "../../http-metrics", features = ["test-util"] }
linkerd-idle-cache = { path = "../../idle-cache", features = ["test-util"] }
linkerd-io = { path = "../../io", features = ["tokio-test"] }
linkerd-meshtls = { path = "../../meshtls", features = ["rustls"] }
21 changes: 8 additions & 13 deletions linkerd/app/inbound/src/http/router.rs
@@ -148,7 +148,8 @@ impl<C> Inbound<C> {
// Attempts to discover a service profile for each logical target (as
// informed by the request's headers). The stack is cached until a
// request has not been received for `cache_max_idle_age`.
let router = http.clone()
let router = http
.clone()
.check_new_service::<Logical, http::Request<http::BoxBody>>()
.push_map_target(|p: Profile| p.logical)
.push(profiles::http::NewProxyRouter::layer(
@@ -164,6 +165,7 @@
.to_layer::<classify::Response, _, _>(),
)
.push_on_service(http::BoxResponse::layer())
// Configure a per-route response classifier based on the profile.
.push(classify::NewClassify::layer())
.push_http_insert_target::<profiles::http::Route>()
.push_map_target(|(route, profile)| ProfileRoute { route, profile })
@@ -186,10 +188,7 @@
}
Ok(svc::Either::B(logical))
},
http.clone()
.push_on_service(svc::MapErr::layer(Error::from))
.check_new_service::<Logical, http::Request<_>>()
.into_inner(),
http.clone().into_inner(),
)
.check_new_service::<(Option<profiles::Receiver>, Logical), http::Request<_>>();

@@ -229,8 +228,7 @@
// Skip the profile stack if it takes too long to become ready.
.push_when_unready(config.profile_skip_timeout, http.into_inner())
.push_on_service(
svc::layers()
.push(rt.metrics.proxy.stack.layer(stack_labels("http", "logical")))
rt.metrics.proxy.stack.layer(stack_labels("http", "logical")),
)
.push(svc::NewQueue::layer_via(config.http_request_queue))
.push_new_idle_cached(config.discovery_idle_timeout)
@@ -239,6 +237,9 @@
.push(http::Retain::layer())
.push(http::BoxResponse::layer()),
)
// Configure default response classification early. It may be
// overridden by profile routes above.
.push(classify::NewClassify::layer_default())
.check_new_service::<Logical, http::Request<http::BoxBody>>()
.instrument(|t: &Logical| {
let name = t.logical.as_ref().map(tracing::field::display);
@@ -414,12 +415,6 @@ impl Param<metrics::EndpointLabels> for Logical {
}
}

impl Param<classify::Request> for Logical {
fn param(&self) -> classify::Request {
classify::Request::default()
}
}

impl tap::Inspect for Logical {
fn src_addr<B>(&self, req: &http::Request<B>) -> Option<SocketAddr> {
req.extensions().get::<Remote<ClientAddr>>().map(|a| **a)
122 changes: 119 additions & 3 deletions linkerd/app/inbound/src/http/tests.rs
@@ -6,10 +6,11 @@ use crate::{
},
Config, Inbound,
};
use hyper::{client::conn::Builder as ClientBuilder, Body, Request, Response};
use hyper::{body::HttpBody, client::conn::Builder as ClientBuilder, Body, Request, Response};
use linkerd_app_core::{
classify,
errors::respond::L5D_PROXY_ERROR,
identity, io,
identity, io, metrics,
proxy::http,
svc::{self, NewService, Param},
tls,
@@ -19,6 +20,7 @@
use linkerd_app_test::connect::ConnectFuture;
use linkerd_tracing::test::trace_init;
use std::{net::SocketAddr, sync::Arc};
use tokio::time;
use tracing::Instrument;

fn build_server<I>(
@@ -469,6 +471,84 @@ async fn grpc_unmeshed_response_error_header() {
let _ = bg.await;
}

#[tokio::test(flavor = "current_thread")]
async fn grpc_response_class() {
let _trace = trace_init();

    // Build a mock connector that serves a gRPC server returning errors.
let connect = {
let mut server = hyper::server::conn::Http::new();
server.http2_only(true);
support::connect().endpoint_fn_boxed(
Target::addr(),
grpc_status_server(server, tonic::Code::Unknown),
)
};

// Build a client using the connect that always errors.
let mut client = ClientBuilder::new();
client.http2_only(true);
let profiles = profile::resolver();
let profile_tx =
profiles.profile_tx(NameAddr::from_str_and_port("foo.svc.cluster.local", 5550).unwrap());
profile_tx.send(profile::Profile::default()).unwrap();
let cfg = default_config();
let (rt, _shutdown) = runtime();
let metrics = rt
.metrics
.clone()
.http_endpoint
.into_report(time::Duration::from_secs(3600));
let server = build_server(cfg, rt, profiles, connect).new_service(Target::meshed_h2());
let (mut client, bg) = http_util::connect_and_accept(&mut client, server).await;

    // Send a request and assert that the response is OK and carries the
    // expected gRPC status code in its trailers.
let req = Request::builder()
.method(http::Method::POST)
.uri("http://foo.svc.cluster.local:5550")
.header(http::header::CONTENT_TYPE, "application/grpc")
.body(Body::default())
.unwrap();

let mut response = http_util::http_request(&mut client, req).await.unwrap();
assert_eq!(response.status(), http::StatusCode::OK);

response.body_mut().data().await;
let trls = response.body_mut().trailers().await.unwrap().unwrap();
assert_eq!(trls.get("grpc-status").unwrap().to_str().unwrap(), "2");

let response_total = metrics
.get_response_total(
&metrics::EndpointLabels::Inbound(metrics::InboundEndpointLabels {
tls: Target::meshed_h2().1,
authority: Some("foo.svc.cluster.local:5550".parse().unwrap()),
target_addr: "127.0.0.1:80".parse().unwrap(),
policy: metrics::RouteAuthzLabels {
route: metrics::RouteLabels {
server: metrics::ServerLabel(Arc::new(policy::Meta::Resource {
group: "policy.linkerd.io".into(),
kind: "server".into(),
name: "testsrv".into(),
})),
route: policy::Meta::new_default("default"),
},
authz: Arc::new(policy::Meta::Resource {
group: "policy.linkerd.io".into(),
kind: "serverauthorization".into(),
name: "testsaz".into(),
}),
},
}),
Some(http::StatusCode::OK),
&classify::Class::Grpc(Err(tonic::Code::Unknown)),
)
.expect("response_total not found");
assert_eq!(response_total, 1.0);

drop((client, bg));
}

#[tracing::instrument]
fn hello_server(
http: hyper::server::conn::Http,
@@ -490,6 +570,42 @@ fn hello_server(
}
}

#[tracing::instrument]
fn grpc_status_server(
http: hyper::server::conn::Http,
status: tonic::Code,
) -> impl Fn(Remote<ServerAddr>) -> io::Result<io::BoxedIo> {
move |endpoint| {
let span = tracing::info_span!("grpc_status_server", ?endpoint);
let _e = span.enter();
tracing::info!("mock connecting");
let (client_io, server_io) = support::io::duplex(4096);
tokio::spawn(
http.serve_connection(
server_io,
hyper::service::service_fn(move |request: Request<Body>| async move {
tracing::info!(?request);
let (mut tx, rx) = Body::channel();
tokio::spawn(async move {
let mut trls = ::http::HeaderMap::new();
trls.insert("grpc-status", (status as u32).to_string().parse().unwrap());
tx.send_trailers(trls).await
});
Ok::<_, io::Error>(
http::Response::builder()
.version(::http::Version::HTTP_2)
.header("content-type", "application/grpc")
.body(rx)
.unwrap(),
)
}),
)
.in_current_span(),
);
Ok(io::BoxedIo::new(client_io))
}
}

#[tracing::instrument]
fn connect_error() -> impl Fn(Remote<ServerAddr>) -> io::Result<io::BoxedIo> {
move |_| {
@@ -523,7 +639,7 @@ fn connect_timeout(
struct Target(http::Version, tls::ConditionalServerTls);

#[track_caller]
fn check_error_header(hdrs: &hyper::header::HeaderMap, expected: &str) {
fn check_error_header(hdrs: &::http::HeaderMap, expected: &str) {
let message = hdrs
.get(L5D_PROXY_ERROR)
.expect("response did not contain l5d-proxy-error header")
9 changes: 9 additions & 0 deletions linkerd/app/outbound/src/http/endpoint/tests.rs
@@ -27,6 +27,8 @@ async fn http11_forward() {
.with_stack(connect)
.push_http_tcp_client()
.push_http_endpoint::<_, http::BoxBody, _>()
.into_stack()
.push(classify::NewClassify::layer_default())
.into_inner();

let svc = stack.new_service(Endpoint {
@@ -61,6 +63,8 @@ async fn http2_forward() {
.with_stack(connect)
.push_http_tcp_client()
.push_http_endpoint::<_, http::BoxBody, _>()
.into_stack()
.push(classify::NewClassify::layer_default())
.into_inner();

let svc = stack.new_service(Endpoint {
@@ -97,6 +101,8 @@ async fn orig_proto_upgrade() {
.with_stack(connect)
.push_http_tcp_client()
.push_http_endpoint::<_, http::BoxBody, _>()
.into_stack()
.push(classify::NewClassify::layer_default())
.into_inner();

let svc = stack.new_service(Endpoint {
@@ -146,6 +152,7 @@ async fn orig_proto_skipped_on_http_upgrade() {
.push_http_tcp_client()
.push_http_endpoint::<_, http::BoxBody, _>()
.into_stack()
.push(classify::NewClassify::layer_default())
.push_on_service(http::BoxRequest::layer())
// We need the server-side upgrade layer to annotate the request so that the client
// knows that an HTTP upgrade is in progress.
@@ -192,6 +199,8 @@ async fn orig_proto_http2_noop() {
.with_stack(connect)
.push_http_tcp_client()
.push_http_endpoint::<_, http::BoxBody, _>()
.into_stack()
.push(classify::NewClassify::layer_default())
.into_inner();

let svc = stack.new_service(Endpoint {
4 changes: 4 additions & 0 deletions linkerd/app/outbound/src/http/retry.rs
@@ -131,6 +131,10 @@ where
clone.extensions_mut().insert(client_handle);
}

if let Some(classify) = req.extensions().get::<classify::Response>().cloned() {
clone.extensions_mut().insert(classify);
}

Some(clone)
}
}
3 changes: 3 additions & 0 deletions linkerd/http-metrics/Cargo.toml
@@ -6,6 +6,9 @@ license = "Apache-2.0"
edition = "2021"
publish = false

[features]
test-util = []

[dependencies]
bytes = "1"
futures = { version = "0.3", default-features = false }
16 changes: 16 additions & 0 deletions linkerd/http-metrics/src/lib.rs
@@ -25,6 +25,22 @@ where
include_latencies: bool,
}

#[cfg(feature = "test-util")]
impl<T: Hash + Eq, C: Hash + Eq> Report<T, requests::Metrics<C>> {
pub fn get_response_total(
&self,
labels: &T,
status: Option<http::StatusCode>,
class: &C,
) -> Option<f64> {
let registry = self.registry.lock();
let requests = registry.get(labels)?.lock();
let status = requests.by_status().get(&status)?;
let class = status.by_class().get(class)?;
Some(class.total())
}
}

impl<T: Hash + Eq, M> Clone for Report<T, M> {
fn clone(&self) -> Self {
Self {
27 changes: 26 additions & 1 deletion linkerd/http-metrics/src/requests.rs
@@ -28,7 +28,7 @@ where
}

#[derive(Debug)]
struct StatusMetrics<C>
pub struct StatusMetrics<C>
where
C: Hash + Eq,
{
@@ -87,6 +87,17 @@ impl<C: Hash + Eq> Default for Metrics<C> {
}
}

#[cfg(feature = "test-util")]
impl<C: Hash + Eq> Metrics<C> {
pub fn total(&self) -> &Counter {
&self.total
}

pub fn by_status(&self) -> &HashMap<Option<http::StatusCode>, StatusMetrics<C>> {
&self.by_status
}
}

impl<C: Hash + Eq> LastUpdate for Metrics<C> {
fn last_update(&self) -> Instant {
self.last_update
@@ -105,6 +116,20 @@
}
}

#[cfg(feature = "test-util")]
impl<C: Hash + Eq> StatusMetrics<C> {
pub fn by_class(&self) -> &HashMap<C, ClassMetrics> {
&self.by_class
}
}

#[cfg(feature = "test-util")]
impl ClassMetrics {
pub fn total(&self) -> f64 {
self.total.value()
}
}

#[cfg(test)]
mod tests {
#[test]