diff --git a/policy-controller/k8s/status/src/index.rs b/policy-controller/k8s/status/src/index.rs index 8b45d5c050553..c2e31baa1896a 100644 --- a/policy-controller/k8s/status/src/index.rs +++ b/policy-controller/k8s/status/src/index.rs @@ -1,15 +1,16 @@ use crate::{ - http_route::{self, BackendReference, ParentReference}, resource_id::{NamespaceGroupKindName, ResourceId}, + routes, service::Service, }; use ahash::{AHashMap as HashMap, AHashSet as HashSet}; -use chrono::offset::Utc; -use chrono::DateTime; -use k8s::{NamespaceResourceScope, Resource}; +use chrono::{offset::Utc, DateTime}; use kubert::lease::Claim; use linkerd_policy_controller_core::{http_route::GroupKindName, POLICY_CONTROLLER_NAME}; -use linkerd_policy_controller_k8s_api::{self as k8s, gateway, ResourceExt}; +use linkerd_policy_controller_k8s_api::{ + self as k8s_core_api, gateway as k8s_gateway_api, policy as linkerd_k8s_api, + NamespaceResourceScope, Resource, ResourceExt, +}; use parking_lot::RwLock; use prometheus_client::{ metrics::{counter::Counter, histogram::Histogram}, @@ -23,7 +24,7 @@ use tokio::{ }; pub(crate) const POLICY_API_GROUP: &str = "policy.linkerd.io"; -const POLICY_API_VERSION: &str = "policy.linkerd.io/v1alpha1"; +pub(crate) const GATEWAY_API_GROUP: &str = "gateway.networking.k8s.io"; mod conditions { pub const RESOLVED_REFS: &str = "ResolvedRefs"; @@ -46,12 +47,12 @@ pub type SharedIndex = Arc>; pub struct Controller { claims: Receiver>, - client: k8s::Client, + client: k8s_core_api::Client, name: String, updates: mpsc::Receiver, patch_timeout: Duration, - /// True if this policy controller is the leader — false otherwise. + /// True if this policy controller is the leader — false otherwise. leader: bool, metrics: ControllerMetrics, @@ -78,7 +79,7 @@ pub struct Index { /// Maps HttpRoute ids to a list of their parent and backend refs, /// regardless of if those parents have accepted the route. - http_route_refs: HashMap, + route_refs: HashMap, servers: HashSet, services: HashMap, @@ -91,16 +92,16 @@ pub struct IndexMetrics { } #[derive(Clone, PartialEq)] -struct HttpRoute { - parents: Vec, - backends: Vec, - statuses: Vec, +struct RouteRef { + parents: Vec, + backends: Vec, + statuses: Vec, } #[derive(Debug, PartialEq)] pub struct Update { pub id: NamespaceGroupKindName, - pub patch: k8s::Patch, + pub patch: k8s_core_api::Patch, } impl ControllerMetrics { @@ -186,7 +187,7 @@ impl IndexMetrics { impl Controller { pub fn new( claims: Receiver>, - client: k8s::Client, + client: k8s_core_api::Client, name: String, updates: mpsc::Receiver, patch_timeout: Duration, @@ -204,7 +205,7 @@ impl Controller { } /// Process updates received from the index; each update is a patch that - /// should be applied to update the status of an HTTPRoute. A patch should + /// should be applied to update the status of an xRoute. A patch should /// only be applied if we are the holder of the write lease. pub async fn run(mut self) { // Select between the write lease claim changing and receiving updates @@ -231,10 +232,12 @@ impl Controller { // process through the updates queue but not actually patch // any resources. if self.leader { - if id.gkn.group == k8s::policy::HttpRoute::group(&()) { - self.patch_status::(&id.gkn.name, &id.namespace, patch).await; - } else if id.gkn.group == k8s_gateway_api::HttpRoute::group(&()) { + if id.gkn.group == linkerd_k8s_api::HttpRoute::group(&()) && id.gkn.kind == linkerd_k8s_api::HttpRoute::kind(&()){ + self.patch_status::(&id.gkn.name, &id.namespace, patch).await; + } else if id.gkn.group == k8s_gateway_api::HttpRoute::group(&()) && id.gkn.kind == k8s_gateway_api::HttpRoute::kind(&()) { self.patch_status::(&id.gkn.name, &id.namespace, patch).await; + } else if id.gkn.group == k8s_gateway_api::GrpcRoute::group(&()) && id.gkn.kind == k8s_gateway_api::GrpcRoute::kind(&()) { + self.patch_status::(&id.gkn.name, &id.namespace, patch).await; } } else { self.metrics.patch_drops.inc(); @@ -248,15 +251,16 @@ impl Controller { &self, name: &str, namespace: &str, - patch: k8s::Patch, + patch: k8s_core_api::Patch, ) where K: Resource, ::DynamicType: Default, K: DeserializeOwned, { - let patch_params = k8s::PatchParams::apply(POLICY_API_GROUP); - let api = k8s::Api::::namespaced(self.client.clone(), namespace); + let patch_params = k8s_core_api::PatchParams::apply(K::group(&Default::default()).as_ref()); + let api = k8s_core_api::Api::::namespaced(self.client.clone(), namespace); let start = time::Instant::now(); + match time::timeout( self.patch_timeout, api.patch_status(name, &patch_params, &patch), @@ -295,7 +299,7 @@ impl Index { name: name.to_string(), claims, updates, - http_route_refs: HashMap::new(), + route_refs: HashMap::new(), servers: HashSet::new(), services: HashMap::new(), metrics, @@ -303,11 +307,11 @@ impl Index { } /// When the write lease holder changes or a time duration has elapsed, - /// the index reconciles the statuses for all HTTPRoutes on the cluster. + /// the index reconciles the statuses for all xRoutes on the cluster. /// /// This reconciliation loop ensures that if errors occur when the /// Controller applies patches or the write lease holder changes, all - /// HTTPRoutes have an up-to-date status. + /// xRoutes have an up-to-date status. pub async fn run(index: Arc>, reconciliation_period: Duration) { // Clone the claims watch out of the index. This will immediately // drop the read lock on the index so that it is not held for the @@ -324,7 +328,7 @@ impl Index { } // The claimant has changed, or we should attempt to reconcile all - // HTTPRoutes to account for any errors. In either case, we should + // xRoutes to account for any errors. In either case, we should // only proceed if we are the current leader. let claims = claims.borrow_and_update(); let index = index.read(); @@ -339,8 +343,8 @@ impl Index { // If the route is new or its contents have changed, return true, so that a // patch is generated; otherwise return false. - fn update_http_route(&mut self, id: NamespaceGroupKindName, route: &HttpRoute) -> bool { - match self.http_route_refs.entry(id) { + fn update_route(&mut self, id: NamespaceGroupKindName, route: &RouteRef) -> bool { + match self.route_refs.entry(id) { Entry::Vacant(entry) => { entry.insert(route.clone()); } @@ -356,19 +360,19 @@ impl Index { fn parent_status( &self, - parent_ref: &ParentReference, - backend_condition: k8s::Condition, - ) -> Option { + parent_ref: &routes::ParentReference, + backend_condition: k8s_core_api::Condition, + ) -> Option { match parent_ref { - ParentReference::Server(server) => { + routes::ParentReference::Server(server) => { let condition = if self.servers.contains(server) { accepted() } else { no_matching_parent() }; - Some(gateway::RouteParentStatus { - parent_ref: gateway::ParentReference { + Some(k8s_gateway_api::RouteParentStatus { + parent_ref: k8s_gateway_api::ParentReference { group: Some(POLICY_API_GROUP.to_string()), kind: Some("Server".to_string()), namespace: Some(server.namespace.clone()), @@ -380,7 +384,7 @@ impl Index { conditions: vec![condition], }) } - ParentReference::Service(service, port) => { + routes::ParentReference::Service(service, port) => { // service is a valid parent if it exists and it has a cluster_ip. let condition = if self .services @@ -392,8 +396,8 @@ impl Index { no_matching_parent() }; - Some(gateway::RouteParentStatus { - parent_ref: gateway::ParentReference { + Some(k8s_gateway_api::RouteParentStatus { + parent_ref: k8s_gateway_api::ParentReference { group: Some("core".to_string()), kind: Some("Service".to_string()), namespace: Some(service.namespace.clone()), @@ -405,16 +409,19 @@ impl Index { conditions: vec![condition, backend_condition], }) } - ParentReference::UnknownKind => None, + routes::ParentReference::UnknownKind => None, } } - fn backend_condition(&self, backend_refs: &[BackendReference]) -> k8s::Condition { + fn backend_condition( + &self, + backend_refs: &[routes::BackendReference], + ) -> k8s_core_api::Condition { // If even one backend has a reference to an unknown / unsupported // reference, return invalid backend condition if backend_refs .iter() - .any(|reference| matches!(reference, BackendReference::Unknown)) + .any(|reference| matches!(reference, routes::BackendReference::Unknown)) { return invalid_backend_kind(); } @@ -422,7 +429,7 @@ impl Index { // If all references have been resolved (i.e exist in our services cache), // return positive status, otherwise, one of them does not exist if backend_refs.iter().any(|backend_ref| match backend_ref { - BackendReference::Service(service) => self.services.contains_key(service), + routes::BackendReference::Service(service) => self.services.contains_key(service), _ => false, }) { resolved_refs() @@ -431,11 +438,11 @@ impl Index { } } - fn make_http_route_patch( + fn make_route_patch( &self, id: &NamespaceGroupKindName, - route: &HttpRoute, - ) -> Option> { + route: &RouteRef, + ) -> Option> { // To preserve any statuses from other controllers, we copy those // statuses. let unowned_statuses = route @@ -452,23 +459,49 @@ impl Index { .filter_map(|parent_ref| self.parent_status(parent_ref, backend_condition.clone())); let all_statuses = unowned_statuses.chain(parent_statuses).collect::>(); + if eq_time_insensitive(&all_statuses, &route.statuses) { return None; } - // Include both existing statuses from other controllers and the parent - // statuses we have computed. - let status = gateway::HttpRouteStatus { - inner: gateway::RouteStatus { - parents: all_statuses, - }, - }; - Some(make_patch(&id.gkn.name, status)) + // Include both existing statuses from other controllers + // and the parent statuses we have computed. + match (id.gkn.group.as_ref(), id.gkn.kind.as_ref()) { + (POLICY_API_GROUP, "HTTPRoute") => { + // linkerd_k8s_api::HttpRoute + let status = linkerd_k8s_api::httproute::HttpRouteStatus { + inner: linkerd_k8s_api::httproute::RouteStatus { + parents: all_statuses, + }, + }; + + Some(make_patch(id, status)) + } + (GATEWAY_API_GROUP, "HTTPRoute") => { + let status = k8s_gateway_api::HttpRouteStatus { + inner: k8s_gateway_api::RouteStatus { + parents: all_statuses, + }, + }; + + Some(make_patch(id, status)) + } + (GATEWAY_API_GROUP, "GRPCRoute") => { + let status = k8s_gateway_api::GrpcRouteStatus { + inner: k8s_gateway_api::RouteStatus { + parents: all_statuses, + }, + }; + + Some(make_patch(id, status)) + } + _ => None, + } } fn reconcile(&self) { - for (id, route) in self.http_route_refs.iter() { - if let Some(patch) = self.make_http_route_patch(id, route) { + for (id, route) in self.route_refs.iter() { + if let Some(patch) = self.make_route_patch(id, route) { match self.updates.try_send(Update { id: id.clone(), patch, @@ -478,7 +511,7 @@ impl Index { } Err(error) => { self.metrics.patch_channel_full.inc(); - tracing::error!(%id.namespace, route = ?id.gkn, %error, "Failed to send HTTPRoute patch"); + tracing::error!(%id.namespace, route = ?id.gkn, %error, "Failed to send {} patch", id.gkn.group.as_ref()); } } } @@ -486,8 +519,8 @@ impl Index { } } -impl kubert::index::IndexNamespacedResource for Index { - fn apply(&mut self, resource: k8s::policy::HttpRoute) { +impl kubert::index::IndexNamespacedResource for Index { + fn apply(&mut self, resource: linkerd_k8s_api::HttpRoute) { let namespace = resource .namespace() .expect("HTTPRoute must have a namespace"); @@ -495,17 +528,17 @@ impl kubert::index::IndexNamespacedResource for Index { let id = NamespaceGroupKindName { namespace: namespace.clone(), gkn: GroupKindName { - group: k8s::policy::HttpRoute::group(&()), - kind: k8s::policy::HttpRoute::kind(&()), + group: linkerd_k8s_api::HttpRoute::group(&()), + kind: linkerd_k8s_api::HttpRoute::kind(&()), name: name.into(), }, }; // Create the route parents - let parents = http_route::make_parents(&namespace, &resource.spec.inner); + let parents = routes::http::make_parents(&namespace, &resource.spec.inner); // Create the route backends - let backends = http_route::make_backends( + let backends = routes::http::make_backends( &namespace, resource .spec @@ -524,24 +557,24 @@ impl kubert::index::IndexNamespacedResource for Index { // Construct route and insert into the index; if the HTTPRoute is // already in the index and it hasn't changed, skip creating a patch. - let route = HttpRoute { + let route = RouteRef { parents, backends, statuses, }; - self.index_httproute(id, route); + self.index_route(id, route); } fn delete(&mut self, namespace: String, name: String) { let id = NamespaceGroupKindName { namespace, gkn: GroupKindName { - group: k8s::policy::HttpRoute::group(&()), - kind: k8s::policy::HttpRoute::kind(&()), + group: linkerd_k8s_api::HttpRoute::group(&()), + kind: linkerd_k8s_api::HttpRoute::kind(&()), name: name.into(), }, }; - self.http_route_refs.remove(&id); + self.route_refs.remove(&id); } // Since apply only reindexes a single HTTPRoute at a time, there's no need @@ -564,10 +597,10 @@ impl kubert::index::IndexNamespacedResource for Inde }; // Create the route parents - let parents = http_route::make_parents(&namespace, &resource.spec.inner); + let parents = routes::http::make_parents(&namespace, &resource.spec.inner); // Create the route backends - let backends = http_route::make_backends( + let backends = routes::http::make_backends( &namespace, resource .spec @@ -586,12 +619,12 @@ impl kubert::index::IndexNamespacedResource for Inde // Construct route and insert into the index; if the HTTPRoute is // already in the index and it hasn't changed, skip creating a patch. - let route = HttpRoute { + let route = RouteRef { parents, backends, statuses, }; - self.index_httproute(id, route); + self.index_route(id, route); } fn delete(&mut self, namespace: String, name: String) { @@ -603,15 +636,77 @@ impl kubert::index::IndexNamespacedResource for Inde name: name.into(), }, }; - self.http_route_refs.remove(&id); + self.route_refs.remove(&id); } // Since apply only reindexes a single HTTPRoute at a time, there's no need // to handle resets specially. } -impl kubert::index::IndexNamespacedResource for Index { - fn apply(&mut self, resource: k8s::policy::Server) { +impl kubert::index::IndexNamespacedResource for Index { + fn apply(&mut self, resource: k8s_gateway_api::GrpcRoute) { + let namespace = resource + .namespace() + .expect("GRPCRoute must have a namespace"); + let name = resource.name_unchecked(); + let id = NamespaceGroupKindName { + namespace: namespace.clone(), + gkn: GroupKindName { + name: name.into(), + kind: k8s_gateway_api::GrpcRoute::kind(&()), + group: k8s_gateway_api::GrpcRoute::group(&()), + }, + }; + + // Create the route parents + let parents = routes::http::make_parents(&namespace, &resource.spec.inner); + + // Create the route backends + let backends = routes::grpc::make_backends( + &namespace, + resource + .spec + .rules + .into_iter() + .flatten() + .flat_map(|rule| rule.backend_refs) + .flatten(), + ); + + let statuses = resource + .status + .into_iter() + .flat_map(|status| status.inner.parents) + .collect(); + + // Construct route and insert into the index; if the GRPCRoute is + // already in the index and it hasn't changed, skip creating a patch. + let route = RouteRef { + parents, + backends, + statuses, + }; + self.index_route(id, route); + } + + fn delete(&mut self, namespace: String, name: String) { + let id = NamespaceGroupKindName { + namespace, + gkn: GroupKindName { + name: name.into(), + kind: k8s_gateway_api::GrpcRoute::kind(&()), + group: k8s_gateway_api::GrpcRoute::group(&()), + }, + }; + self.route_refs.remove(&id); + } + + // Since apply only reindexes a single GRPCRoute at a time, there's no need + // to handle resets specially. +} + +impl kubert::index::IndexNamespacedResource for Index { + fn apply(&mut self, resource: linkerd_k8s_api::Server) { let namespace = resource.namespace().expect("Server must have a namespace"); let name = resource.name_unchecked(); let id = ResourceId::new(namespace, name); @@ -643,8 +738,8 @@ impl kubert::index::IndexNamespacedResource for Index { // to handle resets specially. } -impl kubert::index::IndexNamespacedResource for Index { - fn apply(&mut self, resource: k8s::Service) { +impl kubert::index::IndexNamespacedResource for Index { + fn apply(&mut self, resource: k8s_core_api::Service) { let namespace = resource.namespace().expect("Service must have a namespace"); let name = resource.name_unchecked(); let id = ResourceId::new(namespace, name); @@ -677,10 +772,10 @@ impl kubert::index::IndexNamespacedResource for Index { } impl Index { - fn index_httproute(&mut self, id: NamespaceGroupKindName, route: HttpRoute) { - // Insert into the index; if the HTTPRoute is already in the index and it hasn't + fn index_route(&mut self, id: NamespaceGroupKindName, route: RouteRef) { + // Insert into the index; if the route is already in the index and it hasn't // changed, skip creating a patch. - if !self.update_http_route(id.clone(), &route) { + if !self.update_route(id.clone(), &route) { return; } @@ -691,9 +786,9 @@ impl Index { return; } - // Create a patch for the HTTPRoute and send it to the Controller so + // Create a patch for the route and send it to the Controller so // that it is applied. - if let Some(patch) = self.make_http_route_patch(&id, &route) { + if let Some(patch) = self.make_route_patch(&id, &route) { match self.updates.try_send(Update { id: id.clone(), patch, @@ -703,24 +798,32 @@ impl Index { } Err(error) => { self.metrics.patch_channel_full.inc(); - tracing::error!(%id.namespace, route = ?id.gkn, %error, "Failed to send HTTPRoute patch"); + tracing::error!(%id.namespace, route = ?id.gkn, %error, "Failed to send {} patch", id.gkn.kind.as_ref()); } } } } } -pub(crate) fn make_patch( - name: &str, - status: gateway::HttpRouteStatus, -) -> k8s::Patch { - let value = serde_json::json!({ - "apiVersion": POLICY_API_VERSION, - "kind": "HTTPRoute", - "name": name, +pub(crate) fn make_patch( + route_id: &NamespaceGroupKindName, + status: RouteStatus, +) -> k8s_core_api::Patch +where + RouteStatus: serde::Serialize, +{ + let api_version = route_id + .api_version() + .expect("failed to create patch for route"); + + let patch = serde_json::json!({ + "apiVersion": api_version, + "kind": &route_id.gkn.kind, + "name": &route_id.gkn.name, "status": status, }); - k8s::Patch::Merge(value) + + k8s_core_api::Patch::Merge(patch) } fn now() -> DateTime { @@ -731,9 +834,9 @@ fn now() -> DateTime { now } -fn no_matching_parent() -> k8s::Condition { - k8s::Condition { - last_transition_time: k8s::Time(now()), +fn no_matching_parent() -> k8s_core_api::Condition { + k8s_core_api::Condition { + last_transition_time: k8s_core_api::Time(now()), message: "".to_string(), observed_generation: None, reason: reasons::NO_MATCHING_PARENT.to_string(), @@ -742,9 +845,9 @@ fn no_matching_parent() -> k8s::Condition { } } -fn accepted() -> k8s::Condition { - k8s::Condition { - last_transition_time: k8s::Time(now()), +fn accepted() -> k8s_core_api::Condition { + k8s_core_api::Condition { + last_transition_time: k8s_core_api::Time(now()), message: "".to_string(), observed_generation: None, reason: conditions::ACCEPTED.to_string(), @@ -753,9 +856,9 @@ fn accepted() -> k8s::Condition { } } -fn resolved_refs() -> k8s::Condition { - k8s::Condition { - last_transition_time: k8s::Time(now()), +fn resolved_refs() -> k8s_core_api::Condition { + k8s_core_api::Condition { + last_transition_time: k8s_core_api::Time(now()), message: "".to_string(), observed_generation: None, reason: reasons::RESOLVED_REFS.to_string(), @@ -764,9 +867,9 @@ fn resolved_refs() -> k8s::Condition { } } -fn backend_not_found() -> k8s::Condition { - k8s::Condition { - last_transition_time: k8s::Time(now()), +fn backend_not_found() -> k8s_core_api::Condition { + k8s_core_api::Condition { + last_transition_time: k8s_core_api::Time(now()), message: "".to_string(), observed_generation: None, reason: reasons::BACKEND_NOT_FOUND.to_string(), @@ -775,9 +878,9 @@ fn backend_not_found() -> k8s::Condition { } } -fn invalid_backend_kind() -> k8s::Condition { - k8s::Condition { - last_transition_time: k8s::Time(now()), +fn invalid_backend_kind() -> k8s_core_api::Condition { + k8s_core_api::Condition { + last_transition_time: k8s_core_api::Time(now()), message: "".to_string(), observed_generation: None, reason: reasons::INVALID_KIND.to_string(), @@ -787,8 +890,8 @@ fn invalid_backend_kind() -> k8s::Condition { } fn eq_time_insensitive( - left: &[gateway::RouteParentStatus], - right: &[gateway::RouteParentStatus], + left: &[k8s_gateway_api::RouteParentStatus], + right: &[k8s_gateway_api::RouteParentStatus], ) -> bool { if left.len() != right.len() { return false; diff --git a/policy-controller/k8s/status/src/lib.rs b/policy-controller/k8s/status/src/lib.rs index 67e81479b586b..5dd710c34509f 100644 --- a/policy-controller/k8s/status/src/lib.rs +++ b/policy-controller/k8s/status/src/lib.rs @@ -1,6 +1,6 @@ -mod http_route; mod index; mod resource_id; +mod routes; mod service; #[cfg(test)] diff --git a/policy-controller/k8s/status/src/resource_id.rs b/policy-controller/k8s/status/src/resource_id.rs index a3036323d1705..a5dca83a69344 100644 --- a/policy-controller/k8s/status/src/resource_id.rs +++ b/policy-controller/k8s/status/src/resource_id.rs @@ -1,4 +1,9 @@ +use crate::index::{GATEWAY_API_GROUP, POLICY_API_GROUP}; use linkerd_policy_controller_core::http_route::GroupKindName; +use linkerd_policy_controller_k8s_api::{ + gateway as k8s_gateway_api, policy as linkerd_k8s_api, Resource, +}; +use std::borrow::Cow; #[derive(Clone, Debug, Eq, Hash, PartialEq)] pub struct ResourceId { @@ -17,3 +22,17 @@ pub struct NamespaceGroupKindName { pub namespace: String, pub gkn: GroupKindName, } + +impl NamespaceGroupKindName { + pub fn api_version(&self) -> Result, String> { + match (self.gkn.group.as_ref(), self.gkn.kind.as_ref()) { + (POLICY_API_GROUP, "HTTPRoute") => Ok(linkerd_k8s_api::HttpRoute::api_version(&())), + (GATEWAY_API_GROUP, "HTTPRoute") => Ok(k8s_gateway_api::HttpRoute::api_version(&())), + (GATEWAY_API_GROUP, "GRPCRoute") => Ok(k8s_gateway_api::GrpcRoute::api_version(&())), + (group, kind) => Err(format!( + "unknown group + kind combination: ({}, {})", + group, kind + )), + } + } +} diff --git a/policy-controller/k8s/status/src/routes/grpc.rs b/policy-controller/k8s/status/src/routes/grpc.rs new file mode 100644 index 0000000000000..f43380963796e --- /dev/null +++ b/policy-controller/k8s/status/src/routes/grpc.rs @@ -0,0 +1,173 @@ +use super::BackendReference; +use linkerd_policy_controller_k8s_api::gateway as k8s_gateway_api; + +pub(crate) fn make_backends( + namespace: &str, + backends: impl Iterator, +) -> Vec { + backends + .map(|backend_ref| BackendReference::from_backend_ref(&backend_ref.inner, namespace)) + .collect() +} + +#[cfg(test)] +mod test { + use super::*; + use crate::index::POLICY_API_GROUP; + use linkerd_policy_controller_k8s_api::{self as k8s_core_api, gateway as k8s_gateway_api}; + + fn default_grpc_backends( + backend_refs: Vec, + ) -> Option> { + Some( + backend_refs + .into_iter() + .map(|backend_ref| k8s_gateway_api::GrpcRouteBackendRef { + inner: backend_ref, + weight: None, + filters: None, + }) + .collect(), + ) + } + + #[test] + fn backendrefs_from_route() { + let route = k8s_gateway_api::GrpcRoute { + metadata: k8s_core_api::ObjectMeta { + namespace: Some("foo".to_string()), + name: Some("foo".to_string()), + ..Default::default() + }, + spec: k8s_gateway_api::GrpcRouteSpec { + inner: k8s_gateway_api::CommonRouteSpec { parent_refs: None }, + hostnames: None, + rules: Some(vec![ + k8s_gateway_api::GrpcRouteRule { + matches: None, + filters: None, + backend_refs: default_grpc_backends(vec![ + k8s_gateway_api::BackendObjectReference { + group: None, + kind: None, + name: "ref-1".to_string(), + namespace: Some("default".to_string()), + port: None, + }, + k8s_gateway_api::BackendObjectReference { + group: None, + kind: None, + name: "ref-2".to_string(), + namespace: None, + port: None, + }, + ]), + }, + k8s_gateway_api::GrpcRouteRule { + matches: None, + filters: None, + backend_refs: default_grpc_backends(vec![ + k8s_gateway_api::BackendObjectReference { + group: Some("Core".to_string()), + kind: Some("Service".to_string()), + name: "ref-3".to_string(), + namespace: Some("default".to_string()), + port: None, + }, + ]), + }, + k8s_gateway_api::GrpcRouteRule { + matches: None, + filters: None, + backend_refs: None, + }, + ]), + }, + status: None, + }; + + let result = make_backends( + route + .metadata + .namespace + .as_deref() + .expect("GrpcRoute must have namespace"), + route + .spec + .rules + .into_iter() + .flatten() + .flat_map(|rule| rule.backend_refs) + .flatten(), + ); + assert_eq!( + 3, + result.len(), + "expected only three BackendReferences from route" + ); + result.into_iter().for_each(|backend_ref| { + assert!(matches!(backend_ref, BackendReference::Service(_))); + }) + } + + #[test] + fn backendrefs_from_multiple_types() { + let route = k8s_gateway_api::GrpcRoute { + metadata: k8s_core_api::ObjectMeta { + namespace: Some("default".to_string()), + name: Some("foo".to_string()), + ..Default::default() + }, + spec: k8s_gateway_api::GrpcRouteSpec { + inner: k8s_gateway_api::CommonRouteSpec { parent_refs: None }, + hostnames: None, + rules: Some(vec![k8s_gateway_api::GrpcRouteRule { + matches: None, + filters: None, + backend_refs: default_grpc_backends(vec![ + k8s_gateway_api::BackendObjectReference { + group: None, + kind: None, + name: "ref-1".to_string(), + namespace: None, + port: None, + }, + k8s_gateway_api::BackendObjectReference { + group: Some(POLICY_API_GROUP.to_string()), + kind: Some("Server".to_string()), + name: "ref-2".to_string(), + namespace: None, + port: None, + }, + ]), + }]), + }, + status: None, + }; + + let result = make_backends( + route + .metadata + .namespace + .as_deref() + .expect("GrpcRoute must have namespace"), + route + .spec + .rules + .into_iter() + .flatten() + .flat_map(|rule| rule.backend_refs) + .flatten(), + ); + assert_eq!( + 2, + result.len(), + "expected only two BackendReferences from route" + ); + let mut iter = result.into_iter(); + let known = iter.next().unwrap(); + assert!(matches!(known, BackendReference::Service(_))); + let unknown = iter.next().unwrap(); + assert!(matches!(unknown, BackendReference::Unknown)) + } +} diff --git a/policy-controller/k8s/status/src/http_route.rs b/policy-controller/k8s/status/src/routes/http.rs similarity index 58% rename from policy-controller/k8s/status/src/http_route.rs rename to policy-controller/k8s/status/src/routes/http.rs index 4bf28ba5f49ce..80bf95f94f32d 100644 --- a/policy-controller/k8s/status/src/http_route.rs +++ b/policy-controller/k8s/status/src/routes/http.rs @@ -1,32 +1,10 @@ -use crate::resource_id::ResourceId; -use gateway::{CommonRouteSpec, HttpBackendRef}; -use linkerd_policy_controller_k8s_api::{ - gateway, - policy::{self, Server}, - Service, -}; +use super::{BackendReference, ParentReference}; +use linkerd_policy_controller_k8s_api::gateway as k8s_gateway_api; -/// Represents an HTTPRoute's parent reference from its spec. -/// -/// This is separate from the policy controller index's `InboundParentRef` -/// because it does not validate that the parent reference is not in another -/// namespace. This is something that should be relaxed in the future in the -/// policy controller's index and we could then consider consolidating these -/// types into a single shared lib. -#[derive(Clone, Eq, PartialEq)] -pub enum ParentReference { - Server(ResourceId), - Service(ResourceId, Option), - UnknownKind, -} - -#[derive(Clone, Eq, PartialEq)] -pub enum BackendReference { - Service(ResourceId), - Unknown, -} - -pub(crate) fn make_parents(namespace: &str, route: &CommonRouteSpec) -> Vec { +pub(crate) fn make_parents( + namespace: &str, + route: &k8s_gateway_api::CommonRouteSpec, +) -> Vec { route .parent_refs .iter() @@ -37,7 +15,7 @@ pub(crate) fn make_parents(namespace: &str, route: &CommonRouteSpec) -> Vec, + backends: impl Iterator, ) -> Vec { backends .filter_map(|http_backend_ref| http_backend_ref.backend_ref) @@ -45,65 +23,20 @@ pub(crate) fn make_backends( .collect() } -impl ParentReference { - fn from_parent_ref(parent_ref: &gateway::ParentReference, default_namespace: &str) -> Self { - if policy::httproute::parent_ref_targets_kind::(parent_ref) { - // If the parent reference does not have a namespace, default to using - // the HTTPRoute's namespace. - let namespace = parent_ref.namespace.as_deref().unwrap_or(default_namespace); - ParentReference::Server(ResourceId::new( - namespace.to_string(), - parent_ref.name.clone(), - )) - } else if policy::httproute::parent_ref_targets_kind::(parent_ref) { - // If the parent reference does not have a namespace, default to using - // the HTTPRoute's namespace. - let namespace = parent_ref.namespace.as_deref().unwrap_or(default_namespace); - ParentReference::Service( - ResourceId::new(namespace.to_string(), parent_ref.name.clone()), - parent_ref.port, - ) - } else { - ParentReference::UnknownKind - } - } -} - -impl BackendReference { - fn from_backend_ref( - backend_ref: &gateway::BackendObjectReference, - default_namespace: &str, - ) -> Self { - if policy::httproute::backend_ref_targets_kind::( - backend_ref, - ) { - let namespace = backend_ref - .namespace - .as_deref() - .unwrap_or(default_namespace); - BackendReference::Service(ResourceId::new( - namespace.to_string(), - backend_ref.name.clone(), - )) - } else { - BackendReference::Unknown - } - } -} - #[cfg(test)] mod test { use super::*; - use linkerd_policy_controller_k8s_api::{policy, ObjectMeta}; + use crate::index::POLICY_API_GROUP; + use linkerd_policy_controller_k8s_api::{self as k8s_core_api, policy}; fn mk_default_http_backends( - backend_refs: Vec, - ) -> Option> { + backend_refs: Vec, + ) -> Option> { Some( backend_refs .into_iter() - .map(|backend_ref| gateway::HttpBackendRef { - backend_ref: Some(gateway::BackendRef { + .map(|backend_ref| k8s_gateway_api::HttpBackendRef { + backend_ref: Some(k8s_gateway_api::BackendRef { inner: backend_ref, weight: None, }), @@ -115,28 +48,28 @@ mod test { #[test] fn backendrefs_from_route() { - let http_route = policy::HttpRoute { - metadata: ObjectMeta { + let route = policy::HttpRoute { + metadata: k8s_core_api::ObjectMeta { namespace: Some("foo".to_string()), name: Some("foo".to_string()), ..Default::default() }, spec: policy::HttpRouteSpec { - inner: gateway::CommonRouteSpec { parent_refs: None }, + inner: k8s_gateway_api::CommonRouteSpec { parent_refs: None }, hostnames: None, rules: Some(vec![ policy::httproute::HttpRouteRule { matches: None, filters: None, backend_refs: mk_default_http_backends(vec![ - gateway::BackendObjectReference { + k8s_gateway_api::BackendObjectReference { group: None, kind: None, name: "ref-1".to_string(), namespace: Some("default".to_string()), port: None, }, - gateway::BackendObjectReference { + k8s_gateway_api::BackendObjectReference { group: None, kind: None, name: "ref-2".to_string(), @@ -150,7 +83,7 @@ mod test { matches: None, filters: None, backend_refs: mk_default_http_backends(vec![ - gateway::BackendObjectReference { + k8s_gateway_api::BackendObjectReference { group: Some("Core".to_string()), kind: Some("Service".to_string()), name: "ref-3".to_string(), @@ -172,12 +105,12 @@ mod test { }; let result = make_backends( - http_route + route .metadata .namespace .as_deref() .expect("HttpRoute must have namespace"), - http_route + route .spec .rules .into_iter() @@ -197,28 +130,28 @@ mod test { #[test] fn backendrefs_from_multiple_types() { - let http_route = policy::HttpRoute { - metadata: ObjectMeta { + let route = policy::HttpRoute { + metadata: k8s_core_api::ObjectMeta { namespace: Some("default".to_string()), name: Some("foo".to_string()), ..Default::default() }, spec: policy::HttpRouteSpec { - inner: gateway::CommonRouteSpec { parent_refs: None }, + inner: k8s_gateway_api::CommonRouteSpec { parent_refs: None }, hostnames: None, rules: Some(vec![policy::httproute::HttpRouteRule { matches: None, filters: None, backend_refs: mk_default_http_backends(vec![ - gateway::BackendObjectReference { + k8s_gateway_api::BackendObjectReference { group: None, kind: None, name: "ref-1".to_string(), namespace: None, port: None, }, - gateway::BackendObjectReference { - group: Some("policy.linkerd.io".to_string()), + k8s_gateway_api::BackendObjectReference { + group: Some(POLICY_API_GROUP.to_string()), kind: Some("Server".to_string()), name: "ref-2".to_string(), namespace: None, @@ -232,12 +165,12 @@ mod test { }; let result = make_backends( - http_route + route .metadata .namespace .as_deref() .expect("HttpRoute must have namespace"), - http_route + route .spec .rules .into_iter() diff --git a/policy-controller/k8s/status/src/routes/mod.rs b/policy-controller/k8s/status/src/routes/mod.rs new file mode 100644 index 0000000000000..e8f95a526c438 --- /dev/null +++ b/policy-controller/k8s/status/src/routes/mod.rs @@ -0,0 +1,80 @@ +use crate::resource_id::ResourceId; +use linkerd_policy_controller_k8s_api::{ + self as k8s_core_api, gateway as k8s_gateway_api, policy as linkerd_k8s_api, +}; + +pub(crate) mod grpc; +pub(crate) mod http; + +/// Represents an xRoute's parent reference from its spec. +/// +/// This is separate from the policy controller index's `InboundParentRef` +/// because it does not validate that the parent reference is not in another +/// namespace. This is something that should be relaxed in the future in the +/// policy controller's index and we could then consider consolidating these +/// types into a single shared lib. +#[derive(Clone, Eq, PartialEq)] +pub enum ParentReference { + Server(ResourceId), + Service(ResourceId, Option), + UnknownKind, +} + +#[derive(Clone, Eq, PartialEq)] +pub enum BackendReference { + Service(ResourceId), + Unknown, +} + +impl ParentReference { + fn from_parent_ref( + parent_ref: &k8s_gateway_api::ParentReference, + default_namespace: &str, + ) -> Self { + if linkerd_k8s_api::httproute::parent_ref_targets_kind::( + parent_ref, + ) { + // If the parent reference does not have a namespace, default to using + // the HTTPRoute's namespace. + let namespace = parent_ref.namespace.as_deref().unwrap_or(default_namespace); + Self::Server(ResourceId::new( + namespace.to_string(), + parent_ref.name.clone(), + )) + } else if linkerd_k8s_api::httproute::parent_ref_targets_kind::( + parent_ref, + ) { + // If the parent reference does not have a namespace, default to using + // the HTTPRoute's namespace. + let namespace = parent_ref.namespace.as_deref().unwrap_or(default_namespace); + Self::Service( + ResourceId::new(namespace.to_string(), parent_ref.name.clone()), + parent_ref.port, + ) + } else { + Self::UnknownKind + } + } +} + +impl BackendReference { + fn from_backend_ref( + backend_ref: &k8s_gateway_api::BackendObjectReference, + default_namespace: &str, + ) -> Self { + if linkerd_k8s_api::httproute::backend_ref_targets_kind::( + backend_ref, + ) { + let namespace = backend_ref + .namespace + .as_deref() + .unwrap_or(default_namespace); + Self::Service(ResourceId::new( + namespace.to_string(), + backend_ref.name.clone(), + )) + } else { + Self::Unknown + } + } +} diff --git a/policy-controller/k8s/status/src/service.rs b/policy-controller/k8s/status/src/service.rs index 742f0bd1c1fa9..dcd649feef68b 100644 --- a/policy-controller/k8s/status/src/service.rs +++ b/policy-controller/k8s/status/src/service.rs @@ -1,4 +1,4 @@ -use linkerd_policy_controller_k8s_api as k8s; +use linkerd_policy_controller_k8s_api as k8s_core_api; #[derive(Default)] pub(crate) struct Service { @@ -18,8 +18,8 @@ impl Service { } } -impl From for Service { - fn from(svc: k8s::Service) -> Self { +impl From for Service { + fn from(svc: k8s_core_api::Service) -> Self { svc.spec .map(|spec| Self { cluster_ip: spec.cluster_ip, diff --git a/policy-controller/k8s/status/src/tests/mod.rs b/policy-controller/k8s/status/src/tests/mod.rs index f61339cd6e642..0e57b2d4fca59 100644 --- a/policy-controller/k8s/status/src/tests/mod.rs +++ b/policy-controller/k8s/status/src/tests/mod.rs @@ -1 +1 @@ -mod http_routes; +mod routes; diff --git a/policy-controller/k8s/status/src/tests/http_routes.rs b/policy-controller/k8s/status/src/tests/routes/grpc.rs similarity index 55% rename from policy-controller/k8s/status/src/tests/http_routes.rs rename to policy-controller/k8s/status/src/tests/routes/grpc.rs index 543b350a0d1c9..baeb2f04bd2e5 100644 --- a/policy-controller/k8s/status/src/tests/http_routes.rs +++ b/policy-controller/k8s/status/src/tests/routes/grpc.rs @@ -1,21 +1,19 @@ -use crate::{ - index::{self, POLICY_API_GROUP}, - resource_id::NamespaceGroupKindName, - Index, IndexMetrics, -}; -use k8s::Resource; +use crate::{index::POLICY_API_GROUP, resource_id::NamespaceGroupKindName, Index, IndexMetrics}; +use chrono::{DateTime, Utc}; use kubert::index::IndexNamespacedResource; use linkerd_policy_controller_core::{http_route::GroupKindName, POLICY_CONTROLLER_NAME}; -use linkerd_policy_controller_k8s_api::{self as k8s, gateway, policy::server::Port}; +use linkerd_policy_controller_k8s_api::{ + self as k8s_core_api, gateway as k8s_gateway_api, policy as linkerd_k8s_api, Resource, +}; use std::sync::Arc; use tokio::sync::{mpsc, watch}; #[test] -fn http_route_accepted_after_server_create() { +fn route_accepted_after_server_create() { let hostname = "test"; let claim = kubert::lease::Claim { holder: "test".to_string(), - expiry: chrono::DateTime::::MAX_UTC, + expiry: DateTime::::MAX_UTC, }; let (_claims_tx, claims_rx) = watch::channel(Arc::new(claim)); let (updates_tx, mut updates_rx) = mpsc::channel(10000); @@ -26,23 +24,30 @@ fn http_route_accepted_after_server_create() { IndexMetrics::register(&mut Default::default()), ); - // Apply the route. - let http_route = make_route("ns-0", "route-foo", "srv-8080"); - index.write().apply(http_route); - - // Create the expected update. + // Create the route id and route let id = NamespaceGroupKindName { namespace: "ns-0".to_string(), gkn: GroupKindName { - group: k8s::policy::HttpRoute::group(&()), - kind: k8s::policy::HttpRoute::kind(&()), name: "route-foo".into(), + kind: k8s_gateway_api::GrpcRoute::kind(&()), + group: k8s_gateway_api::GrpcRoute::group(&()), }, }; - let parent_status = - make_parent_status("ns-0", "srv-8080", "Accepted", "False", "NoMatchingParent"); + let route = make_route(&id, "srv-8080"); + + // Apply the route. + index.write().apply(route); + + // Create the expected update. + let parent_status = make_parent_status( + &id.namespace, + "srv-8080", + "Accepted", + "False", + "NoMatchingParent", + ); let status = make_status(vec![parent_status]); - let patch = index::make_patch("route-foo", status); + let patch = crate::index::make_patch(&id, status); // The first update will be that the HTTPRoute is not accepted because the // Server has been created yet. @@ -51,13 +56,13 @@ fn http_route_accepted_after_server_create() { assert_eq!(patch, update.patch); // Apply the server - let server = make_server( + let server = super::make_server( "ns-0", "srv-8080", - Port::Number(8080.try_into().unwrap()), + 8080, Some(("app", "app-0")), Some(("app", "app-0")), - Some(k8s::policy::server::ProxyProtocol::Http1), + Some(linkerd_k8s_api::server::ProxyProtocol::Http1), ); index.write().apply(server); @@ -65,14 +70,15 @@ fn http_route_accepted_after_server_create() { let id = NamespaceGroupKindName { namespace: "ns-0".to_string(), gkn: GroupKindName { - group: k8s::policy::HttpRoute::group(&()), - kind: k8s::policy::HttpRoute::kind(&()), name: "route-foo".into(), + kind: k8s_gateway_api::GrpcRoute::kind(&()), + group: k8s_gateway_api::GrpcRoute::group(&()), }, }; - let parent_status = make_parent_status("ns-0", "srv-8080", "Accepted", "True", "Accepted"); + let parent_status = + make_parent_status(&id.namespace, "srv-8080", "Accepted", "True", "Accepted"); let status = make_status(vec![parent_status]); - let patch = index::make_patch("route-foo", status); + let patch = crate::index::make_patch(&id, status); // The second update will be that the HTTPRoute is accepted because the // Server has been created. @@ -83,11 +89,11 @@ fn http_route_accepted_after_server_create() { } #[test] -fn http_route_rejected_after_server_delete() { +fn route_rejected_after_server_delete() { let hostname = "test"; let claim = kubert::lease::Claim { holder: "test".to_string(), - expiry: chrono::DateTime::::MAX_UTC, + expiry: DateTime::::MAX_UTC, }; let (_claims_tx, claims_rx) = watch::channel(Arc::new(claim)); let (updates_tx, mut updates_rx) = mpsc::channel(10000); @@ -98,34 +104,38 @@ fn http_route_rejected_after_server_delete() { IndexMetrics::register(&mut Default::default()), ); - let server = make_server( + let server = super::make_server( "ns-0", "srv-8080", - Port::Number(8080.try_into().unwrap()), + 8080, Some(("app", "app-0")), Some(("app", "app-0")), - Some(k8s::policy::server::ProxyProtocol::Http1), + Some(linkerd_k8s_api::server::ProxyProtocol::Http1), ); index.write().apply(server); // There should be no update since there are no HTTPRoutes yet. assert!(updates_rx.try_recv().is_err()); - let http_route = make_route("ns-0", "route-foo", "srv-8080"); - index.write().apply(http_route); - - // Create the expected update. + // Create the route id and route let id = NamespaceGroupKindName { namespace: "ns-0".to_string(), gkn: GroupKindName { - group: k8s::policy::HttpRoute::group(&()), - kind: k8s::policy::HttpRoute::kind(&()), name: "route-foo".into(), + kind: k8s_gateway_api::GrpcRoute::kind(&()), + group: k8s_gateway_api::GrpcRoute::group(&()), }, }; - let parent_status = make_parent_status("ns-0", "srv-8080", "Accepted", "True", "Accepted"); + let route = make_route(&id, "srv-8080"); + + // Apply the route + index.write().apply(route); + + // Create the expected update. + let parent_status = + make_parent_status(&id.namespace, "srv-8080", "Accepted", "True", "Accepted"); let status = make_status(vec![parent_status]); - let patch = index::make_patch("route-foo", status); + let patch = crate::index::make_patch(&id, status); // The second update will be that the HTTPRoute is accepted because the // Server has been created. @@ -135,7 +145,7 @@ fn http_route_rejected_after_server_delete() { { let mut index = index.write(); - >::delete( + >::delete( &mut index, "ns-0".to_string(), "srv-8080".to_string(), @@ -146,15 +156,15 @@ fn http_route_rejected_after_server_delete() { let id = NamespaceGroupKindName { namespace: "ns-0".to_string(), gkn: GroupKindName { - group: k8s::policy::HttpRoute::group(&()), - kind: k8s::policy::HttpRoute::kind(&()), name: "route-foo".into(), + kind: k8s_gateway_api::GrpcRoute::kind(&()), + group: k8s_gateway_api::GrpcRoute::group(&()), }, }; let parent_status = make_parent_status("ns-0", "srv-8080", "Accepted", "False", "NoMatchingParent"); let status = make_status(vec![parent_status]); - let patch = index::make_patch("route-foo", status); + let patch = crate::index::make_patch(&id, status); // The third update will be that the HTTPRoute is not accepted because the // Server has been deleted. @@ -164,76 +174,47 @@ fn http_route_rejected_after_server_delete() { assert!(updates_rx.try_recv().is_err()); } -fn make_server( - namespace: impl ToString, - name: impl ToString, - port: Port, - srv_labels: impl IntoIterator, - pod_labels: impl IntoIterator, - proxy_protocol: Option, -) -> k8s::policy::Server { - k8s::policy::Server { - metadata: k8s::ObjectMeta { - namespace: Some(namespace.to_string()), - name: Some(name.to_string()), - labels: Some( - srv_labels - .into_iter() - .map(|(k, v)| (k.to_string(), v.to_string())) - .collect(), - ), - ..Default::default() - }, - spec: k8s::policy::ServerSpec { - port, - selector: k8s::policy::server::Selector::Pod(pod_labels.into_iter().collect()), - proxy_protocol, - }, - } -} - -fn make_route( - namespace: impl ToString, - name: impl ToString, - server: impl ToString, -) -> k8s::policy::HttpRoute { - use chrono::Utc; - use k8s::{policy::httproute::*, Time}; - - HttpRoute { - metadata: k8s::ObjectMeta { - namespace: Some(namespace.to_string()), - name: Some(name.to_string()), - creation_timestamp: Some(Time(Utc::now())), +fn make_route(id: &NamespaceGroupKindName, server: impl ToString) -> k8s_gateway_api::GrpcRoute { + k8s_gateway_api::GrpcRoute { + status: None, + metadata: k8s_core_api::ObjectMeta { + name: Some(id.gkn.name.to_string()), + namespace: Some(id.namespace.clone()), + creation_timestamp: Some(k8s_core_api::Time(Utc::now())), ..Default::default() }, - spec: HttpRouteSpec { - inner: CommonRouteSpec { - parent_refs: Some(vec![ParentReference { - group: Some(POLICY_API_GROUP.to_string()), - kind: Some("Server".to_string()), + spec: k8s_gateway_api::GrpcRouteSpec { + inner: k8s_gateway_api::CommonRouteSpec { + parent_refs: Some(vec![k8s_gateway_api::ParentReference { + port: None, namespace: None, - name: server.to_string(), section_name: None, - port: None, + name: server.to_string(), + kind: Some("Server".to_string()), + group: Some(POLICY_API_GROUP.to_string()), }]), }, hostnames: None, - rules: Some(vec![HttpRouteRule { - matches: Some(vec![HttpRouteMatch { - path: Some(HttpPathMatch::PathPrefix { - value: "/foo/bar".to_string(), - }), - headers: None, - query_params: None, - method: Some("GET".to_string()), - }]), + rules: Some(vec![k8s_gateway_api::GrpcRouteRule { filters: None, backend_refs: None, - timeouts: None, + matches: Some(vec![k8s_gateway_api::GrpcRouteMatch { + headers: None, + method: Some(k8s_gateway_api::GrpcMethodMatch::Exact { + method: Some("MakeRoute".to_string()), + service: Some("io.linkerd.Test".to_string()), + }), + }]), }]), }, - status: None, + } +} + +fn make_status( + parents: Vec, +) -> k8s_gateway_api::GrpcRouteStatus { + k8s_gateway_api::GrpcRouteStatus { + inner: k8s_gateway_api::RouteStatus { parents }, } } @@ -243,31 +224,25 @@ fn make_parent_status( type_: impl ToString, status: impl ToString, reason: impl ToString, -) -> gateway::RouteParentStatus { - let condition = k8s::Condition { - last_transition_time: k8s::Time(chrono::DateTime::::MIN_UTC), +) -> k8s_gateway_api::RouteParentStatus { + let condition = k8s_core_api::Condition { message: "".to_string(), + type_: type_.to_string(), observed_generation: None, reason: reason.to_string(), status: status.to_string(), - type_: type_.to_string(), + last_transition_time: k8s_core_api::Time(DateTime::::MIN_UTC), }; - gateway::RouteParentStatus { - parent_ref: gateway::ParentReference { - group: Some(POLICY_API_GROUP.to_string()), + k8s_gateway_api::RouteParentStatus { + conditions: vec![condition], + parent_ref: k8s_gateway_api::ParentReference { + port: None, + section_name: None, + name: name.to_string(), kind: Some("Server".to_string()), namespace: Some(namespace.to_string()), - name: name.to_string(), - section_name: None, - port: None, + group: Some(POLICY_API_GROUP.to_string()), }, controller_name: POLICY_CONTROLLER_NAME.to_string(), - conditions: vec![condition], - } -} - -fn make_status(parents: Vec) -> gateway::HttpRouteStatus { - gateway::HttpRouteStatus { - inner: gateway::RouteStatus { parents }, } } diff --git a/policy-controller/k8s/status/src/tests/routes/http.rs b/policy-controller/k8s/status/src/tests/routes/http.rs new file mode 100644 index 0000000000000..a75a2327917e3 --- /dev/null +++ b/policy-controller/k8s/status/src/tests/routes/http.rs @@ -0,0 +1,459 @@ +use crate::{index::POLICY_API_GROUP, resource_id::NamespaceGroupKindName, Index, IndexMetrics}; +use chrono::{DateTime, Utc}; +use kubert::index::IndexNamespacedResource; +use linkerd_policy_controller_core::{http_route::GroupKindName, POLICY_CONTROLLER_NAME}; +use linkerd_policy_controller_k8s_api::{ + self as k8s_core_api, gateway as k8s_gateway_api, policy as linkerd_k8s_api, Resource, +}; +use std::sync::Arc; +use tokio::sync::{mpsc, watch}; + +#[test] +fn linkerd_route_accepted_after_server_create() { + let hostname = "test"; + let claim = kubert::lease::Claim { + holder: "test".to_string(), + expiry: DateTime::::MAX_UTC, + }; + let (_claims_tx, claims_rx) = watch::channel(Arc::new(claim)); + let (updates_tx, mut updates_rx) = mpsc::channel(10000); + let index = Index::shared( + hostname, + claims_rx, + updates_tx, + IndexMetrics::register(&mut Default::default()), + ); + + // Create the route id and route + let id = NamespaceGroupKindName { + namespace: "ns-0".to_string(), + gkn: GroupKindName { + group: linkerd_k8s_api::HttpRoute::group(&()), + kind: linkerd_k8s_api::HttpRoute::kind(&()), + name: "route-foo".into(), + }, + }; + let route = make_linkerd_route(&id, "srv-8080"); + + // Apply the route. + index.write().apply(route); + + // Create the expected update. + let parent_status = make_parent_status( + &id.namespace, + "srv-8080", + "Accepted", + "False", + "NoMatchingParent", + ); + let status = make_status(vec![parent_status]); + let patch = crate::index::make_patch(&id, status); + + // The first update will be that the HTTPRoute is not accepted because the + // Server has been created yet. + let update = updates_rx.try_recv().unwrap(); + assert_eq!(id, update.id); + assert_eq!(patch, update.patch); + + // Apply the server + let server = super::make_server( + "ns-0", + "srv-8080", + 8080, + Some(("app", "app-0")), + Some(("app", "app-0")), + Some(linkerd_k8s_api::server::ProxyProtocol::Http1), + ); + index.write().apply(server); + + // Create the expected update. + let id = NamespaceGroupKindName { + namespace: "ns-0".to_string(), + gkn: GroupKindName { + group: linkerd_k8s_api::HttpRoute::group(&()), + kind: linkerd_k8s_api::HttpRoute::kind(&()), + name: "route-foo".into(), + }, + }; + let parent_status = + make_parent_status(&id.namespace, "srv-8080", "Accepted", "True", "Accepted"); + let status = make_status(vec![parent_status]); + let patch = crate::index::make_patch(&id, status); + + // The second update will be that the HTTPRoute is accepted because the + // Server has been created. + let update = updates_rx.try_recv().unwrap(); + assert_eq!(id, update.id); + assert_eq!(patch, update.patch); + assert!(updates_rx.try_recv().is_err()) +} + +#[test] +fn gateway_route_accepted_after_server_create() { + let hostname = "test"; + let claim = kubert::lease::Claim { + holder: "test".to_string(), + expiry: DateTime::::MAX_UTC, + }; + let (_claims_tx, claims_rx) = watch::channel(Arc::new(claim)); + let (updates_tx, mut updates_rx) = mpsc::channel(10000); + let index = Index::shared( + hostname, + claims_rx, + updates_tx, + IndexMetrics::register(&mut Default::default()), + ); + + // Create the route id and route + let id = NamespaceGroupKindName { + namespace: "ns-0".to_string(), + gkn: GroupKindName { + name: "route-foo".into(), + kind: k8s_gateway_api::HttpRoute::kind(&()), + group: k8s_gateway_api::HttpRoute::group(&()), + }, + }; + let route = make_gateway_route(&id, "srv-8080"); + + // Apply the route. + index.write().apply(route); + + // Create the expected update. + let parent_status = make_parent_status( + &id.namespace, + "srv-8080", + "Accepted", + "False", + "NoMatchingParent", + ); + let status = make_status(vec![parent_status]); + let patch = crate::index::make_patch(&id, status); + + // The first update will be that the HTTPRoute is not accepted because the + // Server has been created yet. + let update = updates_rx.try_recv().unwrap(); + assert_eq!(id, update.id); + assert_eq!(patch, update.patch); + + // Apply the server + let server = super::make_server( + "ns-0", + "srv-8080", + 8080, + Some(("app", "app-0")), + Some(("app", "app-0")), + Some(linkerd_k8s_api::server::ProxyProtocol::Http1), + ); + index.write().apply(server); + + // Create the expected update. + let id = NamespaceGroupKindName { + namespace: "ns-0".to_string(), + gkn: GroupKindName { + name: "route-foo".into(), + kind: k8s_gateway_api::HttpRoute::kind(&()), + group: k8s_gateway_api::HttpRoute::group(&()), + }, + }; + let parent_status = + make_parent_status(&id.namespace, "srv-8080", "Accepted", "True", "Accepted"); + let status = make_status(vec![parent_status]); + let patch = crate::index::make_patch(&id, status); + + // The second update will be that the HTTPRoute is accepted because the + // Server has been created. + let update = updates_rx.try_recv().unwrap(); + assert_eq!(id, update.id); + assert_eq!(patch, update.patch); + assert!(updates_rx.try_recv().is_err()) +} + +#[test] +fn linkerd_route_rejected_after_server_delete() { + let hostname = "test"; + let claim = kubert::lease::Claim { + holder: "test".to_string(), + expiry: DateTime::::MAX_UTC, + }; + let (_claims_tx, claims_rx) = watch::channel(Arc::new(claim)); + let (updates_tx, mut updates_rx) = mpsc::channel(10000); + let index = Index::shared( + hostname, + claims_rx, + updates_tx, + IndexMetrics::register(&mut Default::default()), + ); + + let server = super::make_server( + "ns-0", + "srv-8080", + 8080, + Some(("app", "app-0")), + Some(("app", "app-0")), + Some(linkerd_k8s_api::server::ProxyProtocol::Http1), + ); + index.write().apply(server); + + // There should be no update since there are no HTTPRoutes yet. + assert!(updates_rx.try_recv().is_err()); + + // Create the route id and route + let id = NamespaceGroupKindName { + namespace: "ns-0".to_string(), + gkn: GroupKindName { + group: linkerd_k8s_api::HttpRoute::group(&()), + kind: linkerd_k8s_api::HttpRoute::kind(&()), + name: "route-foo".into(), + }, + }; + let route = make_linkerd_route(&id, "srv-8080"); + + // Apply the route + index.write().apply(route); + + // Create the expected update. + let parent_status = + make_parent_status(&id.namespace, "srv-8080", "Accepted", "True", "Accepted"); + let status = make_status(vec![parent_status]); + let patch = crate::index::make_patch(&id, status); + + // The second update will be that the HTTPRoute is accepted because the + // Server has been created. + let update = updates_rx.try_recv().unwrap(); + assert_eq!(id, update.id); + assert_eq!(patch, update.patch); + + { + let mut index = index.write(); + >::delete( + &mut index, + "ns-0".to_string(), + "srv-8080".to_string(), + ); + } + + // Create the expected update. + let id = NamespaceGroupKindName { + namespace: "ns-0".to_string(), + gkn: GroupKindName { + group: linkerd_k8s_api::HttpRoute::group(&()), + kind: linkerd_k8s_api::HttpRoute::kind(&()), + name: "route-foo".into(), + }, + }; + let parent_status = + make_parent_status("ns-0", "srv-8080", "Accepted", "False", "NoMatchingParent"); + let status = make_status(vec![parent_status]); + let patch = crate::index::make_patch(&id, status); + + // The third update will be that the HTTPRoute is not accepted because the + // Server has been deleted. + let update = updates_rx.try_recv().unwrap(); + assert_eq!(id, update.id); + assert_eq!(patch, update.patch); + assert!(updates_rx.try_recv().is_err()); +} + +#[test] +fn gateway_route_rejected_after_server_delete() { + let hostname = "test"; + let claim = kubert::lease::Claim { + holder: "test".to_string(), + expiry: DateTime::::MAX_UTC, + }; + let (_claims_tx, claims_rx) = watch::channel(Arc::new(claim)); + let (updates_tx, mut updates_rx) = mpsc::channel(10000); + let index = Index::shared( + hostname, + claims_rx, + updates_tx, + IndexMetrics::register(&mut Default::default()), + ); + + let server = super::make_server( + "ns-0", + "srv-8080", + 8080, + Some(("app", "app-0")), + Some(("app", "app-0")), + Some(linkerd_k8s_api::server::ProxyProtocol::Http1), + ); + index.write().apply(server); + + // There should be no update since there are no HTTPRoutes yet. + assert!(updates_rx.try_recv().is_err()); + + // Create the route id and route + let id = NamespaceGroupKindName { + namespace: "ns-0".to_string(), + gkn: GroupKindName { + name: "route-foo".into(), + kind: k8s_gateway_api::HttpRoute::kind(&()), + group: k8s_gateway_api::HttpRoute::group(&()), + }, + }; + let route = make_gateway_route(&id, "srv-8080"); + + // Apply the route + index.write().apply(route); + + // Create the expected update. + let parent_status = + make_parent_status(&id.namespace, "srv-8080", "Accepted", "True", "Accepted"); + let status = make_status(vec![parent_status]); + let patch = crate::index::make_patch(&id, status); + + // The second update will be that the HTTPRoute is accepted because the + // Server has been created. + let update = updates_rx.try_recv().unwrap(); + assert_eq!(id, update.id); + assert_eq!(patch, update.patch); + + { + let mut index = index.write(); + >::delete( + &mut index, + "ns-0".to_string(), + "srv-8080".to_string(), + ); + } + + // Create the expected update. + let id = NamespaceGroupKindName { + namespace: "ns-0".to_string(), + gkn: GroupKindName { + name: "route-foo".into(), + kind: k8s_gateway_api::HttpRoute::kind(&()), + group: k8s_gateway_api::HttpRoute::group(&()), + }, + }; + let parent_status = + make_parent_status("ns-0", "srv-8080", "Accepted", "False", "NoMatchingParent"); + let status = make_status(vec![parent_status]); + let patch = crate::index::make_patch(&id, status); + + // The third update will be that the HTTPRoute is not accepted because the + // Server has been deleted. + let update = updates_rx.try_recv().unwrap(); + assert_eq!(id, update.id); + assert_eq!(patch, update.patch); + assert!(updates_rx.try_recv().is_err()); +} + +fn make_status( + parents: Vec, +) -> k8s_gateway_api::HttpRouteStatus { + k8s_gateway_api::HttpRouteStatus { + inner: k8s_gateway_api::RouteStatus { parents }, + } +} + +fn make_linkerd_route( + id: &NamespaceGroupKindName, + server: impl ToString, +) -> linkerd_k8s_api::HttpRoute { + linkerd_k8s_api::HttpRoute { + metadata: k8s_core_api::ObjectMeta { + namespace: Some(id.namespace.clone()), + name: Some(id.gkn.name.to_string()), + creation_timestamp: Some(k8s_core_api::Time(Utc::now())), + ..Default::default() + }, + spec: linkerd_k8s_api::HttpRouteSpec { + inner: linkerd_k8s_api::httproute::CommonRouteSpec { + parent_refs: Some(vec![linkerd_k8s_api::httproute::ParentReference { + group: Some(POLICY_API_GROUP.to_string()), + kind: Some("Server".to_string()), + namespace: None, + name: server.to_string(), + section_name: None, + port: None, + }]), + }, + hostnames: None, + rules: Some(vec![linkerd_k8s_api::httproute::HttpRouteRule { + matches: Some(vec![linkerd_k8s_api::httproute::HttpRouteMatch { + path: Some(linkerd_k8s_api::httproute::HttpPathMatch::PathPrefix { + value: "/foo/bar".to_string(), + }), + headers: None, + query_params: None, + method: Some("GET".to_string()), + }]), + filters: None, + backend_refs: None, + timeouts: None, + }]), + }, + status: None, + } +} + +fn make_gateway_route( + id: &NamespaceGroupKindName, + server: impl ToString, +) -> k8s_gateway_api::HttpRoute { + k8s_gateway_api::HttpRoute { + status: None, + metadata: k8s_core_api::ObjectMeta { + name: Some(id.gkn.name.to_string()), + namespace: Some(id.namespace.clone()), + creation_timestamp: Some(k8s_core_api::Time(Utc::now())), + ..Default::default() + }, + spec: k8s_gateway_api::HttpRouteSpec { + inner: k8s_gateway_api::CommonRouteSpec { + parent_refs: Some(vec![k8s_gateway_api::ParentReference { + port: None, + namespace: None, + section_name: None, + name: server.to_string(), + kind: Some("Server".to_string()), + group: Some(POLICY_API_GROUP.to_string()), + }]), + }, + hostnames: None, + rules: Some(vec![k8s_gateway_api::HttpRouteRule { + filters: None, + backend_refs: None, + matches: Some(vec![k8s_gateway_api::HttpRouteMatch { + headers: None, + query_params: None, + method: Some("GET".to_string()), + path: Some(k8s_gateway_api::HttpPathMatch::PathPrefix { + value: "/foo/bar".to_string(), + }), + }]), + }]), + }, + } +} + +fn make_parent_status( + namespace: impl ToString, + name: impl ToString, + type_: impl ToString, + status: impl ToString, + reason: impl ToString, +) -> k8s_gateway_api::RouteParentStatus { + let condition = k8s_core_api::Condition { + last_transition_time: k8s_core_api::Time(DateTime::::MIN_UTC), + message: "".to_string(), + observed_generation: None, + reason: reason.to_string(), + status: status.to_string(), + type_: type_.to_string(), + }; + k8s_gateway_api::RouteParentStatus { + parent_ref: k8s_gateway_api::ParentReference { + group: Some(POLICY_API_GROUP.to_string()), + kind: Some("Server".to_string()), + namespace: Some(namespace.to_string()), + name: name.to_string(), + section_name: None, + port: None, + }, + controller_name: POLICY_CONTROLLER_NAME.to_string(), + conditions: vec![condition], + } +} diff --git a/policy-controller/k8s/status/src/tests/routes/mod.rs b/policy-controller/k8s/status/src/tests/routes/mod.rs new file mode 100644 index 0000000000000..d546655b93141 --- /dev/null +++ b/policy-controller/k8s/status/src/tests/routes/mod.rs @@ -0,0 +1,33 @@ +use linkerd_policy_controller_k8s_api::{self as k8s_core_api, policy as linkerd_k8s_api}; + +mod grpc; +mod http; + +fn make_server( + namespace: impl ToString, + name: impl ToString, + port: u16, + srv_labels: impl IntoIterator, + pod_labels: impl IntoIterator, + proxy_protocol: Option, +) -> linkerd_k8s_api::Server { + let port = linkerd_k8s_api::server::Port::Number(port.try_into().unwrap()); + linkerd_k8s_api::Server { + metadata: k8s_core_api::ObjectMeta { + namespace: Some(namespace.to_string()), + name: Some(name.to_string()), + labels: Some( + srv_labels + .into_iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(), + ), + ..Default::default() + }, + spec: linkerd_k8s_api::ServerSpec { + port, + selector: linkerd_k8s_api::server::Selector::Pod(pod_labels.into_iter().collect()), + proxy_protocol, + }, + } +}