Skip to content

Commit

Permalink
Fix bugs
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Leong <[email protected]>
  • Loading branch information
adleong committed Jul 21, 2023
1 parent 045ce2a commit 2835958
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 46 deletions.
2 changes: 1 addition & 1 deletion cli/cmd/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ displayed.`,
client := outbound.NewOutboundPoliciesClient(conn)

result, err = client.Get(cmd.Context(), &outbound.TrafficSpec{
SourceWorkload: "diagnostics",
SourceWorkload: "default:diagnostics",
Target: &outbound.TrafficSpec_Authority{Authority: fmt.Sprintf("%s.%s.svc:%d", name, namespace, port)},
})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion policy-controller/grpc/src/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ where
outbound::traffic_spec::Target::Addr(target) => target,
outbound::traffic_spec::Target::Authority(auth) => {
return self.lookup_authority(&auth).map(
|(service_name, service_namespace, service_port)| OutboundDiscoverTarget {
|(service_namespace, service_name, service_port)| OutboundDiscoverTarget {
service_name,
service_namespace,
service_port,
Expand Down
47 changes: 31 additions & 16 deletions policy-controller/k8s/index/src/outbound/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,19 +356,28 @@ impl Namespace {
watches_by_ns: Default::default(),
};

for (gknn, route) in routes.into_iter() {
if *gknn.namespace == *self.namespace {
// This is a producer namespace route and should apply to watches
// from all namespaces.
for watch in service_routes.watches_by_ns.values_mut() {
// Producer routes are routes in the same namespace as their
// parent service. Consumer routes are routes in other
// namespaces.
let (producer_routes, consumer_routes): (Vec<_>, Vec<_>) = routes
.into_iter()
.partition(|(gknn, _route)| *gknn.namespace == *self.namespace);
for (gknn, route) in consumer_routes {
// Consumer routes should only apply to watches from the
// consumer namespace.
let watch = service_routes.watch_for_ns_or_default(gknn.namespace.to_string());
watch.routes.insert(gknn, route);
}
for (gknn, route) in producer_routes {
// Insert the route into the producer namespace.
let watch = service_routes.watch_for_ns_or_default(gknn.namespace.to_string());
watch.routes.insert(gknn.clone(), route.clone());
// Producer routes apply to clients in all namespaces, so
// apply it to watches for all other namespaces too.
for (ns, watch) in service_routes.watches_by_ns.iter_mut() {
if ns != &gknn.namespace {
watch.routes.insert(gknn.clone(), route.clone());
}
} else {
// This is a consumer namespace route and should only apply to
// watches from that namespace.
let watch =
service_routes.watch_for_ns_or_default(gknn.namespace.to_string());
watch.routes.insert(gknn, route);
}
}

Expand Down Expand Up @@ -728,11 +737,17 @@ impl ServiceRoutes {

fn apply(&mut self, gknn: GroupKindNamespaceName, route: HttpRoute) {
if *gknn.namespace == *self.namespace {
// This is a producer namespace route and should apply to watches
// from all namespaces.
for watch in self.watches_by_ns.values_mut() {
watch.routes.insert(gknn.clone(), route.clone());
watch.send_if_modified();
// This is a producer namespace route.
let watch = self.watch_for_ns_or_default(gknn.namespace.to_string());
watch.routes.insert(gknn.clone(), route.clone());
watch.send_if_modified();
// Producer routes apply to clients in all namespaces, so
// apply it to watches for all other namespaces too.
for (ns, watch) in self.watches_by_ns.iter_mut() {
if ns != &gknn.namespace {
watch.routes.insert(gknn.clone(), route.clone());
watch.send_if_modified();
}
}
} else {
// This is a consumer namespace route and should only apply to
Expand Down
2 changes: 1 addition & 1 deletion policy-test/src/curl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ impl Running {
&self.name,
|obj: Option<&k8s::Pod>| -> bool { obj.and_then(get_exit_code).is_some() },
);
let pod = match time::timeout(time::Duration::from_secs(30), finished).await {
let pod = match time::timeout(time::Duration::from_secs(60), finished).await {
Ok(Ok(Some(pod))) => pod,
Ok(Ok(None)) => unreachable!("Condition must wait for pod"),
Ok(Err(error)) => panic!("Failed to wait for exit code: {}: {}", self.name, error),
Expand Down
37 changes: 24 additions & 13 deletions policy-test/tests/outbound_api_gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -848,45 +848,56 @@ async fn http_route_with_no_port() {
// Create a service
let svc = create_service(&client, &ns, "my-svc", 4191).await;

let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
let config = rx
let mut rx_4191 = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
let config_4191 = rx_4191
.next()
.await
.expect("watch must not fail")
.expect("watch must return an initial config");
tracing::trace!(?config);
tracing::trace!(?config_4191);

let mut rx_9999 = retry_watch_outbound_policy(&client, &ns, &svc, 9999).await;
let config_9999 = rx_9999
.next()
.await
.expect("watch must not fail")
.expect("watch must return an initial config");
tracing::trace!(?config_9999);

// There should be a default route.
detect_http_routes(&config, |routes| {
detect_http_routes(&config_4191, |routes| {
let route = assert_singleton(routes);
assert_route_is_default(route, &svc, 4191);
});
detect_http_routes(&config_9999, |routes| {
let route = assert_singleton(routes);
assert_route_is_default(route, &svc, 9999);
});

let _route = create(&client, mk_http_route(&ns, "foo-route", &svc, None).build()).await;

let config = rx
let config_4191 = rx_4191
.next()
.await
.expect("watch must not fail")
.expect("watch must return an updated config");
tracing::trace!(?config);
tracing::trace!(?config_4191);

// The route should apply to the service.
detect_http_routes(&config, |routes| {
detect_http_routes(&config_4191, |routes| {
let route = assert_singleton(routes);
assert_route_name_eq(route, "foo-route");
});

// The route should apply to other ports too.
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 9999).await;
let config = rx
let config_9999 = rx_9999
.next()
.await
.expect("watch must not fail")
.expect("watch must return an initial config");
tracing::trace!(?config);
.expect("watch must return an updated config");
tracing::trace!(?config_9999);

detect_http_routes(&config, |routes| {
// The route should apply to other ports too.
detect_http_routes(&config_9999, |routes| {
let route = assert_singleton(routes);
assert_route_name_eq(route, "foo-route");
});
Expand Down
39 changes: 25 additions & 14 deletions policy-test/tests/outbound_api_linkerd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -865,45 +865,56 @@ async fn http_route_with_no_port() {
// Create a service
let svc = create_service(&client, &ns, "my-svc", 4191).await;

let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
let config = rx
let mut rx_4191 = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
let config_4191 = rx_4191
.next()
.await
.expect("watch must not fail")
.expect("watch must return an initial config");
tracing::trace!(?config);
tracing::trace!(?config_4191);

let mut rx_9999 = retry_watch_outbound_policy(&client, &ns, &svc, 9999).await;
let config_9999 = rx_9999
.next()
.await
.expect("watch must not fail")
.expect("watch must return an initial config");
tracing::trace!(?config_9999);

// There should be a default route.
detect_http_routes(&config, |routes| {
detect_http_routes(&config_4191, |routes| {
let route = assert_singleton(routes);
assert_route_is_default(route, &svc, 4191);
});
detect_http_routes(&config_9999, |routes| {
let route = assert_singleton(routes);
assert_route_is_default(route, &svc, 9999);
});

let _route = create(&client, mk_http_route(&ns, "foo-route", &svc, None).build()).await;

let config = rx
let config_4191 = rx_4191
.next()
.await
.expect("watch must not fail")
.expect("watch must return an updated config");
tracing::trace!(?config);
tracing::trace!(?config_4191);

// The route should apply to the service.
detect_http_routes(&config, |routes| {
detect_http_routes(&config_4191, |routes| {
let route = assert_singleton(routes);
assert_route_name_eq(route, "foo-route");
});

// The route should apply to other ports too.
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 9999).await;
let config = rx
let config_9999 = rx_9999
.next()
.await
.expect("watch must not fail")
.expect("watch must return an initial config");
tracing::trace!(?config);
.expect("watch must return an updated config");
tracing::trace!(?config_9999);

detect_http_routes(&config, |routes| {
// The route should apply to other ports too.
detect_http_routes(&config_9999, |routes| {
let route = assert_singleton(routes);
assert_route_name_eq(route, "foo-route");
});
Expand Down Expand Up @@ -1002,7 +1013,7 @@ async fn consumer_route() {
)
.await;

let mut producer_rx = retry_watch_outbound_policy(&client, &ns, &svc, 4149).await;
let mut producer_rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
let producer_config = producer_rx
.next()
.await
Expand Down

0 comments on commit 2835958

Please sign in to comment.