diff --git a/Cargo.lock b/Cargo.lock index aa4ad7ab4f..f582c83cfc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1010,6 +1010,7 @@ dependencies = [ "linkerd-app-core", "linkerd-app-test", "linkerd-http-access-log", + "linkerd-http-metrics", "linkerd-idle-cache", "linkerd-io", "linkerd-meshtls", diff --git a/linkerd/app/admin/src/stack.rs b/linkerd/app/admin/src/stack.rs index bb7fd6282d..f703155e63 100644 --- a/linkerd/app/admin/src/stack.rs +++ b/linkerd/app/admin/src/stack.rs @@ -99,6 +99,7 @@ impl Config { let admin = crate::server::Admin::new(report, ready, shutdown, trace); let admin = svc::stack(move |_| admin.clone()) .push(metrics.proxy.http_endpoint.to_layer::()) + .push(classify::NewClassify::layer_default()) .push_map_target(|(permit, http)| Permitted { permit, http }) .push(inbound::policy::NewHttpPolicy::layer(metrics.http_authz.clone())) .push(Rescue::layer()) diff --git a/linkerd/app/core/src/control.rs b/linkerd/app/core/src/control.rs index 9591661071..45b472d221 100644 --- a/linkerd/app/core/src/control.rs +++ b/linkerd/app/core/src/control.rs @@ -124,6 +124,7 @@ impl Config { .lift_new() .push(self::balance::layer(dns, resolve_backoff)) .push(metrics.to_layer::()) + .push(classify::NewClassify::layer_default()) // This buffer allows a resolver client to be shared across stacks. // No load shed is applied here, however, so backpressure may leak // into the caller task. 
diff --git a/linkerd/app/inbound/Cargo.toml b/linkerd/app/inbound/Cargo.toml index 8f74e9635f..c76c1b229d 100644 --- a/linkerd/app/inbound/Cargo.toml +++ b/linkerd/app/inbound/Cargo.toml @@ -52,6 +52,7 @@ libfuzzer-sys = { version = "0.4", features = ["arbitrary-derive"] } [dev-dependencies] hyper = { version = "0.14", features = ["http1", "http2"] } linkerd-app-test = { path = "../test" } +linkerd-http-metrics = { path = "../../http-metrics", features = ["test-util"] } linkerd-idle-cache = { path = "../../idle-cache", features = ["test-util"] } linkerd-io = { path = "../../io", features = ["tokio-test"] } linkerd-meshtls = { path = "../../meshtls", features = ["rustls"] } diff --git a/linkerd/app/inbound/src/http/router.rs b/linkerd/app/inbound/src/http/router.rs index 9e895e04f9..a234b86bac 100644 --- a/linkerd/app/inbound/src/http/router.rs +++ b/linkerd/app/inbound/src/http/router.rs @@ -148,7 +148,8 @@ impl Inbound { // Attempts to discover a service profile for each logical target (as // informed by the request's headers). The stack is cached until a // request has not been received for `cache_max_idle_age`. - let router = http.clone() + let router = http + .clone() .check_new_service::>() .push_map_target(|p: Profile| p.logical) .push(profiles::http::NewProxyRouter::layer( @@ -164,6 +165,7 @@ impl Inbound { .to_layer::(), ) .push_on_service(http::BoxResponse::layer()) + // Configure a per-route response classifier based on the profile. .push(classify::NewClassify::layer()) .push_http_insert_target::() .push_map_target(|(route, profile)| ProfileRoute { route, profile }) @@ -186,10 +188,7 @@ impl Inbound { } Ok(svc::Either::B(logical)) }, - http.clone() - .push_on_service(svc::MapErr::layer(Error::from)) - .check_new_service::>() - .into_inner(), + http.clone().into_inner(), ) .check_new_service::<(Option, Logical), http::Request<_>>(); @@ -229,8 +228,7 @@ impl Inbound { // Skip the profile stack if it takes too long to become ready. 
.push_when_unready(config.profile_skip_timeout, http.into_inner()) .push_on_service( - svc::layers() - .push(rt.metrics.proxy.stack.layer(stack_labels("http", "logical"))) + rt.metrics.proxy.stack.layer(stack_labels("http", "logical")), ) .push(svc::NewQueue::layer_via(config.http_request_queue)) .push_new_idle_cached(config.discovery_idle_timeout) @@ -239,6 +237,9 @@ impl Inbound { .push(http::Retain::layer()) .push(http::BoxResponse::layer()), ) + // Configure default response classification early. It may be + // overridden by profile routes above. + .push(classify::NewClassify::layer_default()) .check_new_service::>() .instrument(|t: &Logical| { let name = t.logical.as_ref().map(tracing::field::display); @@ -414,12 +415,6 @@ impl Param for Logical { } } -impl Param for Logical { - fn param(&self) -> classify::Request { - classify::Request::default() - } -} - impl tap::Inspect for Logical { fn src_addr(&self, req: &http::Request) -> Option { req.extensions().get::>().map(|a| **a) diff --git a/linkerd/app/inbound/src/http/tests.rs b/linkerd/app/inbound/src/http/tests.rs index cf55c435bc..d21f005af5 100644 --- a/linkerd/app/inbound/src/http/tests.rs +++ b/linkerd/app/inbound/src/http/tests.rs @@ -6,10 +6,11 @@ use crate::{ }, Config, Inbound, }; -use hyper::{client::conn::Builder as ClientBuilder, Body, Request, Response}; +use hyper::{body::HttpBody, client::conn::Builder as ClientBuilder, Body, Request, Response}; use linkerd_app_core::{ + classify, errors::respond::L5D_PROXY_ERROR, - identity, io, + identity, io, metrics, proxy::http, svc::{self, NewService, Param}, tls, @@ -19,6 +20,7 @@ use linkerd_app_core::{ use linkerd_app_test::connect::ConnectFuture; use linkerd_tracing::test::trace_init; use std::{net::SocketAddr, sync::Arc}; +use tokio::time; use tracing::Instrument; fn build_server( @@ -469,6 +471,84 @@ async fn grpc_unmeshed_response_error_header() { let _ = bg.await; } +#[tokio::test(flavor = "current_thread")] +async fn grpc_response_class() { + let 
_trace = trace_init(); + + // Build a mock connector that serves a gRPC server that returns errors. + let connect = { + let mut server = hyper::server::conn::Http::new(); + server.http2_only(true); + support::connect().endpoint_fn_boxed( + Target::addr(), + grpc_status_server(server, tonic::Code::Unknown), + ) + }; + + // Build a client using the connect that always errors. + let mut client = ClientBuilder::new(); + client.http2_only(true); + let profiles = profile::resolver(); + let profile_tx = + profiles.profile_tx(NameAddr::from_str_and_port("foo.svc.cluster.local", 5550).unwrap()); + profile_tx.send(profile::Profile::default()).unwrap(); + let cfg = default_config(); + let (rt, _shutdown) = runtime(); + let metrics = rt + .metrics + .clone() + .http_endpoint + .into_report(time::Duration::from_secs(3600)); + let server = build_server(cfg, rt, profiles, connect).new_service(Target::meshed_h2()); + let (mut client, bg) = http_util::connect_and_accept(&mut client, server).await; + + // Send a request and assert that the response is OK and that its + // trailers carry the expected gRPC status. 
+ let req = Request::builder() + .method(http::Method::POST) + .uri("http://foo.svc.cluster.local:5550") + .header(http::header::CONTENT_TYPE, "application/grpc") + .body(Body::default()) + .unwrap(); + + let mut response = http_util::http_request(&mut client, req).await.unwrap(); + assert_eq!(response.status(), http::StatusCode::OK); + + response.body_mut().data().await; + let trls = response.body_mut().trailers().await.unwrap().unwrap(); + assert_eq!(trls.get("grpc-status").unwrap().to_str().unwrap(), "2"); + + let response_total = metrics + .get_response_total( + &metrics::EndpointLabels::Inbound(metrics::InboundEndpointLabels { + tls: Target::meshed_h2().1, + authority: Some("foo.svc.cluster.local:5550".parse().unwrap()), + target_addr: "127.0.0.1:80".parse().unwrap(), + policy: metrics::RouteAuthzLabels { + route: metrics::RouteLabels { + server: metrics::ServerLabel(Arc::new(policy::Meta::Resource { + group: "policy.linkerd.io".into(), + kind: "server".into(), + name: "testsrv".into(), + })), + route: policy::Meta::new_default("default"), + }, + authz: Arc::new(policy::Meta::Resource { + group: "policy.linkerd.io".into(), + kind: "serverauthorization".into(), + name: "testsaz".into(), + }), + }, + }), + Some(http::StatusCode::OK), + &classify::Class::Grpc(Err(tonic::Code::Unknown)), + ) + .expect("response_total not found"); + assert_eq!(response_total, 1.0); + + drop((client, bg)); +} + #[tracing::instrument] fn hello_server( http: hyper::server::conn::Http, @@ -490,6 +570,42 @@ fn hello_server( } } +#[tracing::instrument] +fn grpc_status_server( + http: hyper::server::conn::Http, + status: tonic::Code, +) -> impl Fn(Remote) -> io::Result { + move |endpoint| { + let span = tracing::info_span!("grpc_status_server", ?endpoint); + let _e = span.enter(); + tracing::info!("mock connecting"); + let (client_io, server_io) = support::io::duplex(4096); + tokio::spawn( + http.serve_connection( + server_io, + hyper::service::service_fn(move |request: Request| async 
move { + tracing::info!(?request); + let (mut tx, rx) = Body::channel(); + tokio::spawn(async move { + let mut trls = ::http::HeaderMap::new(); + trls.insert("grpc-status", (status as u32).to_string().parse().unwrap()); + tx.send_trailers(trls).await + }); + Ok::<_, io::Error>( + http::Response::builder() + .version(::http::Version::HTTP_2) + .header("content-type", "application/grpc") + .body(rx) + .unwrap(), + ) + }), + ) + .in_current_span(), + ); + Ok(io::BoxedIo::new(client_io)) + } +} + #[tracing::instrument] fn connect_error() -> impl Fn(Remote) -> io::Result { move |_| { @@ -523,7 +639,7 @@ fn connect_timeout( struct Target(http::Version, tls::ConditionalServerTls); #[track_caller] -fn check_error_header(hdrs: &hyper::header::HeaderMap, expected: &str) { +fn check_error_header(hdrs: &::http::HeaderMap, expected: &str) { let message = hdrs .get(L5D_PROXY_ERROR) .expect("response did not contain l5d-proxy-error header") diff --git a/linkerd/app/outbound/src/http/endpoint/tests.rs b/linkerd/app/outbound/src/http/endpoint/tests.rs index 461ea2e913..863e4a1147 100644 --- a/linkerd/app/outbound/src/http/endpoint/tests.rs +++ b/linkerd/app/outbound/src/http/endpoint/tests.rs @@ -27,6 +27,8 @@ async fn http11_forward() { .with_stack(connect) .push_http_tcp_client() .push_http_endpoint::<_, http::BoxBody, _>() + .into_stack() + .push(classify::NewClassify::layer_default()) .into_inner(); let svc = stack.new_service(Endpoint { @@ -61,6 +63,8 @@ async fn http2_forward() { .with_stack(connect) .push_http_tcp_client() .push_http_endpoint::<_, http::BoxBody, _>() + .into_stack() + .push(classify::NewClassify::layer_default()) .into_inner(); let svc = stack.new_service(Endpoint { @@ -97,6 +101,8 @@ async fn orig_proto_upgrade() { .with_stack(connect) .push_http_tcp_client() .push_http_endpoint::<_, http::BoxBody, _>() + .into_stack() + .push(classify::NewClassify::layer_default()) .into_inner(); let svc = stack.new_service(Endpoint { @@ -146,6 +152,7 @@ async fn 
orig_proto_skipped_on_http_upgrade() { .push_http_tcp_client() .push_http_endpoint::<_, http::BoxBody, _>() .into_stack() + .push(classify::NewClassify::layer_default()) .push_on_service(http::BoxRequest::layer()) // We need the server-side upgrade layer to annotate the request so that the client // knows that an HTTP upgrade is in progress. @@ -192,6 +199,8 @@ async fn orig_proto_http2_noop() { .with_stack(connect) .push_http_tcp_client() .push_http_endpoint::<_, http::BoxBody, _>() + .into_stack() + .push(classify::NewClassify::layer_default()) .into_inner(); let svc = stack.new_service(Endpoint { diff --git a/linkerd/app/outbound/src/http/retry.rs b/linkerd/app/outbound/src/http/retry.rs index b61ddf3722..625ff19648 100644 --- a/linkerd/app/outbound/src/http/retry.rs +++ b/linkerd/app/outbound/src/http/retry.rs @@ -131,6 +131,10 @@ where clone.extensions_mut().insert(client_handle); } + if let Some(classify) = req.extensions().get::().cloned() { + clone.extensions_mut().insert(classify); + } + Some(clone) } } diff --git a/linkerd/http-metrics/Cargo.toml b/linkerd/http-metrics/Cargo.toml index 61c7eeda72..3c22bc329e 100644 --- a/linkerd/http-metrics/Cargo.toml +++ b/linkerd/http-metrics/Cargo.toml @@ -6,6 +6,9 @@ license = "Apache-2.0" edition = "2021" publish = false +[features] +test-util = [] + [dependencies] bytes = "1" futures = { version = "0.3", default-features = false } diff --git a/linkerd/http-metrics/src/lib.rs b/linkerd/http-metrics/src/lib.rs index c28376a0b8..c33922b845 100644 --- a/linkerd/http-metrics/src/lib.rs +++ b/linkerd/http-metrics/src/lib.rs @@ -25,6 +25,22 @@ where include_latencies: bool, } +#[cfg(feature = "test-util")] +impl Report> { + pub fn get_response_total( + &self, + labels: &T, + status: Option, + class: &C, + ) -> Option { + let registry = self.registry.lock(); + let requests = registry.get(labels)?.lock(); + let status = requests.by_status().get(&status)?; + let class = status.by_class().get(class)?; + Some(class.total()) + 
} +} + impl Clone for Report { fn clone(&self) -> Self { Self { diff --git a/linkerd/http-metrics/src/requests.rs b/linkerd/http-metrics/src/requests.rs index ca563e6cfa..7a0df94b87 100644 --- a/linkerd/http-metrics/src/requests.rs +++ b/linkerd/http-metrics/src/requests.rs @@ -28,7 +28,7 @@ where } #[derive(Debug)] -struct StatusMetrics +pub struct StatusMetrics where C: Hash + Eq, { @@ -87,6 +87,17 @@ impl Default for Metrics { } } +#[cfg(feature = "test-util")] +impl Metrics { + pub fn total(&self) -> &Counter { + &self.total + } + + pub fn by_status(&self) -> &HashMap, StatusMetrics> { + &self.by_status + } +} + impl LastUpdate for Metrics { fn last_update(&self) -> Instant { self.last_update @@ -105,6 +116,20 @@ where } } +#[cfg(feature = "test-util")] +impl StatusMetrics { + pub fn by_class(&self) -> &HashMap { + &self.by_class + } +} + +#[cfg(feature = "test-util")] +impl ClassMetrics { + pub fn total(&self) -> f64 { + self.total.value() + } +} + #[cfg(test)] mod tests { #[test] diff --git a/linkerd/http-metrics/src/requests/service.rs b/linkerd/http-metrics/src/requests/service.rs index 71a75c5860..279117f3b8 100644 --- a/linkerd/http-metrics/src/requests/service.rs +++ b/linkerd/http-metrics/src/requests/service.rs @@ -96,7 +96,7 @@ where impl Clone for HttpMetrics where S: Clone, - C: ClassifyResponse + Clone + Default + Send + Sync + 'static, + C: ClassifyResponse + Clone + Send + Sync + 'static, C::Class: Hash + Eq, { fn clone(&self) -> Self { @@ -108,6 +108,16 @@ where } } +#[inline] +fn classify_unwrap_if_debug_else_default(req: &http::Request) -> C +where + C: Clone + Default + Send + Sync + 'static, +{ + let c = req.extensions().get::().cloned(); + debug_assert!(c.is_some(), "request must have response classifier"); + c.unwrap_or_default() +} + impl Proxy, S> for HttpMetrics where P: Proxy>, S, Response = http::Response>, @@ -143,10 +153,8 @@ where http::Request::from_parts(head, body) }; - let classify = 
req.extensions().get::().cloned().unwrap_or_default(); - ResponseFuture { - classify: Some(classify), + classify: Some(classify_unwrap_if_debug_else_default(&req)), metrics: self.metrics.clone(), stream_open_at: Instant::now(), inner: self.inner.proxy(svc, req), @@ -167,6 +175,7 @@ where type Error = Error; type Future = ResponseFuture; + #[inline] fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.inner.poll_ready(cx).map_err(Into::into) } @@ -192,10 +201,8 @@ where http::Request::from_parts(head, body) }; - let classify = req.extensions().get::().cloned().unwrap_or_default(); - ResponseFuture { - classify: Some(classify), + classify: Some(classify_unwrap_if_debug_else_default(&req)), metrics: self.metrics.clone(), stream_open_at: Instant::now(), inner: self.inner.call(req), diff --git a/linkerd/proxy/http/src/classify/insert.rs b/linkerd/proxy/http/src/classify/insert.rs index ba67096dea..958f507863 100644 --- a/linkerd/proxy/http/src/classify/insert.rs +++ b/linkerd/proxy/http/src/classify/insert.rs @@ -1,4 +1,4 @@ -use linkerd_stack::{layer, ExtractParam, NewService, Proxy, Service}; +use linkerd_stack::{layer, CloneParam, ExtractParam, NewService, Proxy, Service}; use std::{ marker::PhantomData, task::{Context, Poll}, @@ -33,6 +33,12 @@ impl NewInsertClassifyResponse { } } +impl NewInsertClassifyResponse, N> { + pub fn layer_default() -> impl layer::Layer + Clone { + Self::layer_via(CloneParam::from(C::default())) + } +} + impl NewService for NewInsertClassifyResponse where C: super::Classify, @@ -61,8 +67,9 @@ where fn proxy(&self, svc: &mut S, mut req: http::Request) -> Self::Future { let classify_rsp = self.classify.classify(&req); - let prior = req.extensions_mut().insert(classify_rsp); - debug_assert!(prior.is_none(), "classification extension already existed"); + if req.extensions_mut().insert(classify_rsp).is_some() { + tracing::debug!("Overrode response classifier"); + } self.inner.proxy(svc, req) } } @@ -83,8 +90,9 @@ where fn call(&mut 
self, mut req: http::Request) -> Self::Future { let classify_rsp = self.classify.classify(&req); - let prior = req.extensions_mut().insert(classify_rsp); - debug_assert!(prior.is_none(), "classification extension already existed"); + if req.extensions_mut().insert(classify_rsp).is_some() { + tracing::debug!("Overrode response classifier"); + } self.inner.call(req) } }