diff --git a/policy-test/tests/outbound_api_linkerd.rs b/policy-test/tests/outbound_api_linkerd.rs
index fa33017523207..3cf461288fb73 100644
--- a/policy-test/tests/outbound_api_linkerd.rs
+++ b/policy-test/tests/outbound_api_linkerd.rs
@@ -5,7 +5,8 @@ use kube::ResourceExt;
 use linkerd_policy_controller_k8s_api as k8s;
 use linkerd_policy_test::{
     assert_default_accrual_backoff, create, create_annotated_service, create_cluster_scoped,
-    create_opaque_service, create_service, delete_cluster_scoped, grpc, mk_service, with_temp_ns,
+    create_opaque_service, create_service, delete_cluster_scoped, grpc, mk_service, replace,
+    with_temp_ns,
 };
 use maplit::{btreemap, convert_args};
 use tokio::time;
@@ -842,6 +843,122 @@ async fn backend_with_filters() {
     .await;
 }
 
+#[tokio::test(flavor = "current_thread")]
+async fn route_with_retry_filter() {
+    with_temp_ns(|client, ns| async move {
+        // Create a service
+        let svc = create_service(&client, &ns, "my-svc", 4191).await;
+
+        let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await;
+        let config = rx
+            .next()
+            .await
+            .expect("watch must not fail")
+            .expect("watch must return an initial config");
+        tracing::trace!(?config);
+
+        // There should be a default route.
+        detect_http_routes(&config, |routes| {
+            let route = assert_singleton(routes);
+            assert_route_is_default(route, &svc, 4191);
+        });
+
+        let (extension_ref, filter) = mk_retry_filter(&ns, "my-great-retry-filter", 5, None);
+        let filter = create(&client, filter).await;
+
+        let backend_name = "backend";
+        let backends = [backend_name];
+        let route = mk_http_route(&ns, "foo-route", &svc, 4191)
+            .with_backends(Some(&backends), None, None)
+            .with_filters(Some(vec![
+                k8s::policy::httproute::HttpRouteFilter::ExtensionRef { extension_ref },
+            ]));
+        let _route = create(&client, route.build()).await;
+
+        let config = rx
+            .next()
+            .await
+            .expect("watch must not fail")
+            .expect("watch must return an updated config");
+        tracing::trace!(?config);
+
+        // Route must have a retry policy.
+        detect_http_routes(&config, |routes| {
+            let route = assert_singleton(routes);
+            let rule = assert_singleton(&route.rules);
+            let filters = &rule.filters;
+            assert_eq!(*filters, vec![]);
+            assert_eq!(
+                rule.retry_policy,
+                Some(grpc::outbound::http_route::RetryPolicy {
+                    retry_statuses: vec![grpc::destination::HttpStatusRange { min: 500, max: 599 }],
+                    max_per_request: 5
+                })
+            )
+        });
+
+        // Config must have a retry budget.
+        detect_retry_budget(&config, |budget| {
+            assert_eq!(
+                budget,
+                Some(&grpc::destination::RetryBudget {
+                    retry_ratio: 0.2,
+                    min_retries_per_second: 10,
+                    ttl: Some(Duration::from_secs(10).try_into().unwrap()),
+                })
+            )
+        });
+
+        // Update the filter
+        let (_, new_filter) = mk_retry_filter(
+            &ns,
+            "my-great-retry-filter",
+            10,
+            vec!["500-503".to_string(), "509-599".to_string()],
+        );
+        let _filter = replace(&client, filter, new_filter).await;
+        tracing::info!("filter updated");
+
+        let config = rx
+            .next()
+            .await
+            .expect("watch must not fail")
+            .expect("watch must return an updated config");
+        tracing::trace!(?config);
+
+        // Route retry policy must have updated.
+        detect_http_routes(&config, |routes| {
+            let route = assert_singleton(routes);
+            let rule = assert_singleton(&route.rules);
+            let filters = &rule.filters;
+            assert_eq!(*filters, vec![]);
+            assert_eq!(
+                rule.retry_policy,
+                Some(grpc::outbound::http_route::RetryPolicy {
+                    retry_statuses: vec![
+                        grpc::destination::HttpStatusRange { min: 500, max: 503 },
+                        grpc::destination::HttpStatusRange { min: 509, max: 599 },
+                    ],
+                    max_per_request: 10
+                })
+            )
+        });
+
+        // Config must have a retry budget.
+        detect_retry_budget(&config, |budget| {
+            assert_eq!(
+                budget,
+                Some(&grpc::destination::RetryBudget {
+                    retry_ratio: 0.2,
+                    min_retries_per_second: 10,
+                    ttl: Some(Duration::from_secs(10).try_into().unwrap()),
+                })
+            )
+        });
+    })
+    .await;
+}
+
 /* Helpers */
 
 struct HttpRouteBuilder(k8s::policy::HttpRoute);
@@ -909,6 +1026,34 @@ fn mk_http_route(ns: &str, name: &str, svc: &k8s::Service, port: u16) -> HttpRou
     })
 }
 
+fn mk_retry_filter(
+    ns: &str,
+    name: &str,
+    max_retries: impl Into<Option<u32>>,
+    statuses: impl Into<Option<Vec<String>>>,
+) -> (
+    k8s_gateway_api::LocalObjectReference,
+    k8s::policy::HttpRetryFilter,
+) {
+    let extension_ref = k8s_gateway_api::LocalObjectReference {
+        group: "policy.linkerd.io".to_string(),
+        kind: "HttpRetryFilter".to_string(),
+        name: name.to_string(),
+    };
+    let filter = k8s::policy::HttpRetryFilter {
+        metadata: kube::api::ObjectMeta {
+            namespace: Some(ns.to_string()),
+            name: Some(name.to_string()),
+            ..Default::default()
+        },
+        spec: k8s::policy::http_retry_filter::HttpRetryFilterSpec {
+            max_retries_per_request: max_retries.into(),
+            retry_statuses: statuses.into(),
+        },
+    };
+    (extension_ref, filter)
+}
+
 impl HttpRouteBuilder {
     fn with_backends(
         self,
@@ -1023,6 +1168,38 @@ where
     }
 }
 
+#[track_caller]
+fn detect_retry_budget<F>(config: &grpc::outbound::OutboundPolicy, f: F)
+where
+    F: Fn(Option<&grpc::destination::RetryBudget>),
+{
+    let kind = config
+        .protocol
+        .as_ref()
+        .expect("must have proxy protocol")
+        .kind
+        .as_ref()
+        .expect("must have kind");
+    if let grpc::outbound::proxy_protocol::Kind::Detect(grpc::outbound::proxy_protocol::Detect {
+        opaque: _,
+        timeout: _,
+        http1,
+        http2,
+    }) = kind
+    {
+        let http1 = http1
+            .as_ref()
+            .expect("proxy protocol must have http1 field");
+        let http2 = http2
+            .as_ref()
+            .expect("proxy protocol must have http2 field");
+        f(http1.retry_budget.as_ref());
+        f(http2.retry_budget.as_ref());
+    } else {
+        panic!("proxy protocol must be Detect; actually got:\n{kind:#?}")
+    }
+}
+
 #[track_caller]
 fn detect_failure_accrual<F>(config: &grpc::outbound::OutboundPolicy, f: F)
 where