Skip to content

Commit

Permalink
add linkerd crd test
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkw committed Jul 18, 2023
1 parent 41e8b2a commit d8c7cff
Showing 1 changed file with 178 additions and 1 deletion.
179 changes: 178 additions & 1 deletion policy-test/tests/outbound_api_linkerd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use kube::ResourceExt;
use linkerd_policy_controller_k8s_api as k8s;
use linkerd_policy_test::{
assert_default_accrual_backoff, create, create_annotated_service, create_cluster_scoped,
create_opaque_service, create_service, delete_cluster_scoped, grpc, mk_service, with_temp_ns,
create_opaque_service, create_service, delete_cluster_scoped, grpc, mk_service, replace,
with_temp_ns,
};
use maplit::{btreemap, convert_args};
use tokio::time;
Expand Down Expand Up @@ -842,6 +843,122 @@ async fn backend_with_filters() {
.await;
}

#[tokio::test(flavor = "current_thread")]
async fn route_with_retry_filter() {
with_temp_ns(|client, ns| async move {
// Create a service
let svc = create_service(&client, &ns, "my-svc", 4191).await;

let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await;
let config = rx
.next()
.await
.expect("watch must not fail")
.expect("watch must return an initial config");
tracing::trace!(?config);

// There should be a default route.
detect_http_routes(&config, |routes| {
let route = assert_singleton(routes);
assert_route_is_default(route, &svc, 4191);
});

let (extension_ref, filter) = mk_retry_filter(&ns, "my-great-retry-filter", 5, None);
let filter = create(&client, filter).await;

let backend_name = "backend";
let backends = [backend_name];
let route = mk_http_route(&ns, "foo-route", &svc, 4191)
.with_backends(Some(&backends), None, None)
.with_filters(Some(vec![
k8s::policy::httproute::HttpRouteFilter::ExtensionRef { extension_ref },
]));
let _route = create(&client, route.build()).await;

let config = rx
.next()
.await
.expect("watch must not fail")
.expect("watch must return an updated config");
tracing::trace!(?config);

// Route must have a retry policy.
detect_http_routes(&config, |routes| {
let route = assert_singleton(routes);
let rule = assert_singleton(&route.rules);
let filters = &rule.filters;
assert_eq!(*filters, vec![]);
assert_eq!(
rule.retry_policy,
Some(grpc::outbound::http_route::RetryPolicy {
retry_statuses: vec![grpc::destination::HttpStatusRange { min: 500, max: 599 }],
max_per_request: 5
})
)
});

// Config must have a retry budget.
detect_retry_budget(&config, |budget| {
assert_eq!(
budget,
Some(&grpc::destination::RetryBudget {
retry_ratio: 0.2,
min_retries_per_second: 10,
ttl: Some(Duration::from_secs(10).try_into().unwrap()),
})
)
});

// Update the filter
let (_, new_filter) = mk_retry_filter(
&ns,
"my-great-retry-filter",
10,
vec!["500-503".to_string(), "509-599".to_string()],
);
let _filter = replace(&client, filter, new_filter).await;
tracing::info!("filter updated");

let config = rx
.next()
.await
.expect("watch must not fail")
.expect("watch must return an updated config");
tracing::trace!(?config);

// Route retry policy must have updated
detect_http_routes(&config, |routes| {
let route = assert_singleton(routes);
let rule = assert_singleton(&route.rules);
let filters = &rule.filters;
assert_eq!(*filters, vec![]);
assert_eq!(
rule.retry_policy,
Some(grpc::outbound::http_route::RetryPolicy {
retry_statuses: vec![
grpc::destination::HttpStatusRange { min: 500, max: 503 },
grpc::destination::HttpStatusRange { min: 509, max: 599 },
],
max_per_request: 10
})
)
});

// Config must have a retry budget.
detect_retry_budget(&config, |budget| {
assert_eq!(
budget,
Some(&grpc::destination::RetryBudget {
retry_ratio: 0.2,
min_retries_per_second: 10,
ttl: Some(Duration::from_secs(10).try_into().unwrap()),
})
)
});
})
.await;
}

/* Helpers */

struct HttpRouteBuilder(k8s::policy::HttpRoute);
Expand Down Expand Up @@ -909,6 +1026,34 @@ fn mk_http_route(ns: &str, name: &str, svc: &k8s::Service, port: u16) -> HttpRou
})
}

fn mk_retry_filter(
ns: &str,
name: &str,
max_retries: impl Into<Option<u32>>,
statuses: impl Into<Option<Vec<String>>>,
) -> (
k8s_gateway_api::LocalObjectReference,
k8s::policy::HttpRetryFilter,
) {
let extension_ref = k8s_gateway_api::LocalObjectReference {
group: "policy.linkerd.io".to_string(),
kind: "HttpRetryFilter".to_string(),
name: name.to_string(),
};
let filter = k8s::policy::HttpRetryFilter {
metadata: kube::api::ObjectMeta {
namespace: Some(ns.to_string()),
name: Some(name.to_string()),
..Default::default()
},
spec: k8s::policy::http_retry_filter::HttpRetryFilterSpec {
max_retries_per_request: max_retries.into(),
retry_statuses: statuses.into(),
},
};
(extension_ref, filter)
}

impl HttpRouteBuilder {
fn with_backends(
self,
Expand Down Expand Up @@ -1023,6 +1168,38 @@ where
}
}

#[track_caller]
fn detect_retry_budget<F>(config: &grpc::outbound::OutboundPolicy, f: F)
where
F: Fn(Option<&grpc::destination::RetryBudget>),
{
let kind = config
.protocol
.as_ref()
.expect("must have proxy protocol")
.kind
.as_ref()
.expect("must have kind");
if let grpc::outbound::proxy_protocol::Kind::Detect(grpc::outbound::proxy_protocol::Detect {
opaque: _,
timeout: _,
http1,
http2,
}) = kind
{
let http1 = http1
.as_ref()
.expect("proxy protocol must have http1 field");
let http2 = http2
.as_ref()
.expect("proxy protocol must have http2 field");
f(http1.retry_budget.as_ref());
f(http2.retry_budget.as_ref());
} else {
panic!("proxy protocol must be Detect; actually got:\n{kind:#?}")
}
}

#[track_caller]
fn detect_failure_accrual<F>(config: &grpc::outbound::OutboundPolicy, f: F)
where
Expand Down

0 comments on commit d8c7cff

Please sign in to comment.