Skip to content

Commit

Permalink
Add suport for response header filter (#2439)
Browse files Browse the repository at this point in the history
Add support for the response header modifier, which was added to the proxy API in linkerd/linkerd2-proxy-api#251

Signed-off-by: Alex Leong <[email protected]>
  • Loading branch information
adleong authored Jul 19, 2023
1 parent d6172c5 commit 3224560
Show file tree
Hide file tree
Showing 16 changed files with 161 additions and 45 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1893,9 +1893,9 @@ dependencies = [

[[package]]
name = "linkerd2-proxy-api"
version = "0.10.0"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "597facef5c3f12aece4d18a5e3dbba88288837b0b5d8276681d063e4c9b98a14"
checksum = "2348745f909668e6de2dbd175eeeac374887ffb33989a0e09766f1807b27cdfe"
dependencies = [
"h2",
"http",
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/inbound/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ linkerd-meshtls = { path = "../../meshtls", optional = true }
linkerd-meshtls-rustls = { path = "../../meshtls/rustls", optional = true }
linkerd-proxy-client-policy = { path = "../../proxy/client-policy" }
linkerd-tonic-watch = { path = "../../tonic-watch" }
linkerd2-proxy-api = { version = "0.10", features = ["inbound"] }
linkerd2-proxy-api = { version = "0.11", features = ["inbound"] }
once_cell = "1"
parking_lot = "0.12"
rangemap = "1"
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/integration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ ipnet = "2"
linkerd-app = { path = "..", features = ["allow-loopback"] }
linkerd-app-core = { path = "../core" }
linkerd-metrics = { path = "../../metrics", features = ["test_util"] }
linkerd2-proxy-api = { version = "0.10", features = [
linkerd2-proxy-api = { version = "0.11", features = [
"destination",
"arbitrary",
] }
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/outbound/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ ahash = "0.8"
bytes = "1"
http = "0.2"
futures = { version = "0.3", default-features = false }
linkerd2-proxy-api = { version = "0.10", features = ["outbound"] }
linkerd2-proxy-api = { version = "0.11", features = ["outbound"] }
linkerd-app-core = { path = "../core" }
linkerd-app-test = { path = "../test", optional = true }
linkerd-distribute = { path = "../../distribute" }
Expand Down
18 changes: 14 additions & 4 deletions linkerd/app/outbound/src/http/logical/policy/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,13 @@ impl<T, M, F, E> svc::Param<http::timeout::ResponseTimeout> for MatchedRoute<T,

impl<T> filters::Apply for Http<T> {
#[inline]
fn apply<B>(&self, req: &mut ::http::Request<B>) -> Result<()> {
filters::apply_http(&self.r#match, &self.params.filters, req)
fn apply_request<B>(&self, req: &mut ::http::Request<B>) -> Result<()> {
filters::apply_http_request(&self.r#match, &self.params.filters, req)
}

#[inline]
fn apply_response<B>(&self, rsp: &mut ::http::Response<B>) -> Result<()> {
filters::apply_http_response(&self.params.filters, rsp)
}
}

Expand All @@ -168,8 +173,13 @@ impl<T> svc::Param<classify::Request> for Http<T> {

impl<T> filters::Apply for Grpc<T> {
#[inline]
fn apply<B>(&self, req: &mut ::http::Request<B>) -> Result<()> {
filters::apply_grpc(&self.r#match, &self.params.filters, req)
fn apply_request<B>(&self, req: &mut ::http::Request<B>) -> Result<()> {
filters::apply_grpc_request(&self.r#match, &self.params.filters, req)
}

#[inline]
fn apply_response<B>(&self, rsp: &mut ::http::Response<B>) -> Result<()> {
filters::apply_grpc_response(&self.params.filters, rsp)
}
}

Expand Down
17 changes: 13 additions & 4 deletions linkerd/app/outbound/src/http/logical/policy/route/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,24 @@ impl<T, M, F> svc::Param<http::ResponseTimeout> for MatchedBackend<T, M, F> {

impl<T> filters::Apply for Http<T> {
#[inline]
fn apply<B>(&self, req: &mut ::http::Request<B>) -> Result<()> {
filters::apply_http(&self.r#match, &self.params.filters, req)
fn apply_request<B>(&self, req: &mut ::http::Request<B>) -> Result<()> {
filters::apply_http_request(&self.r#match, &self.params.filters, req)
}

#[inline]
fn apply_response<B>(&self, rsp: &mut ::http::Response<B>) -> Result<()> {
filters::apply_http_response(&self.params.filters, rsp)
}
}

impl<T> filters::Apply for Grpc<T> {
#[inline]
fn apply<B>(&self, req: &mut ::http::Request<B>) -> Result<()> {
filters::apply_grpc(&self.r#match, &self.params.filters, req)
fn apply_request<B>(&self, req: &mut ::http::Request<B>) -> Result<()> {
filters::apply_grpc_request(&self.r#match, &self.params.filters, req)
}

fn apply_response<B>(&self, rsp: &mut ::http::Response<B>) -> Result<()> {
filters::apply_grpc_response(&self.params.filters, rsp)
}
}

Expand Down
98 changes: 87 additions & 11 deletions linkerd/app/outbound/src/http/logical/policy/route/filters.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
use futures::{future, TryFutureExt};
use futures::{future, ready, Future, TryFuture, TryFutureExt};
use linkerd_app_core::{
svc::{self, ExtractParam},
Error, Result,
};
use linkerd_proxy_client_policy::{grpc, http};
use std::{marker::PhantomData, task};
use pin_project::pin_project;
use std::{
marker::PhantomData,
pin::Pin,
task::{self, Context, Poll},
};

/// A middleware that enforces policy on each HTTP request.
///
Expand All @@ -26,6 +31,15 @@ pub struct ApplyFilters<A, S> {
inner: S,
}

#[derive(Clone, Debug)]
#[pin_project]
pub struct ResponseFuture<A, F> {
apply: A,

#[pin]
inner: F,
}

pub mod errors {
use super::*;
use std::sync::Arc;
Expand Down Expand Up @@ -61,10 +75,11 @@ pub mod errors {
}

pub(crate) trait Apply {
fn apply<B>(&self, req: &mut ::http::Request<B>) -> Result<()>;
fn apply_request<B>(&self, req: &mut ::http::Request<B>) -> Result<()>;
fn apply_response<B>(&self, rsp: &mut ::http::Response<B>) -> Result<()>;
}

pub fn apply_http<B>(
pub fn apply_http_request<B>(
r#match: &http::RouteMatch,
filters: &[http::Filter],
req: &mut ::http::Request<B>,
Expand Down Expand Up @@ -99,13 +114,32 @@ pub fn apply_http<B>(
http::Filter::InternalError(msg) => {
return Err(errors::HttpInvalidPolicy(msg).into());
}
http::Filter::ResponseHeaders(_) => {} // ResponseHeaders filter does not apply to requests.
}
}

Ok(())
}

pub fn apply_http_response<B>(
filters: &[http::Filter],
rsp: &mut ::http::Response<B>,
) -> Result<()> {
// TODO Do any metrics apply here?
for filter in filters {
match filter {
http::Filter::InjectFailure(_) => {} // InjectFailure filter does not apply to responses.
http::Filter::Redirect(_) => {} // Redirect filter does not apply to responses.
http::Filter::RequestHeaders(_) => {} // RequestHeaders filter does not apply to responses.
http::Filter::InternalError(_) => {} // InternalError filter does not apply to responses.
http::Filter::ResponseHeaders(rh) => rh.apply(rsp.headers_mut()),
}
}

Ok(())
}

pub fn apply_grpc<B>(
pub fn apply_grpc_request<B>(
_match: &grpc::RouteMatch,
filters: &[grpc::Filter],
req: &mut ::http::Request<B>,
Expand All @@ -131,6 +165,20 @@ pub fn apply_grpc<B>(
Ok(())
}

pub fn apply_grpc_response<B>(
filters: &[grpc::Filter],
_rsp: &mut ::http::Response<B>,
) -> Result<()> {
for filter in filters {
match filter {
grpc::Filter::InjectFailure(_) => {} // InjectFailure filter does not apply to responses.
grpc::Filter::RequestHeaders(_) => {} // RequestHeaders filter does not apply to responses.
grpc::Filter::InternalError(_) => {} // InternalError filter does not apply to responses.
}
}

Ok(())
}
// === impl NewApplyFilters ===

impl<A, X: Clone, N> NewApplyFilters<A, X, N> {
Expand Down Expand Up @@ -167,25 +215,53 @@ where

impl<B, A, S> svc::Service<::http::Request<B>> for ApplyFilters<A, S>
where
A: Apply,
S: svc::Service<::http::Request<B>>,
A: Apply + Clone,
S: svc::Service<::http::Request<B>, Response = ::http::Response<B>>,
S::Error: Into<Error>,
{
type Response = S::Response;
type Error = Error;
type Future =
future::Either<future::Ready<Result<Self::Response>>, future::ErrInto<S::Future, Error>>;
type Future = future::Either<
future::Ready<Result<Self::Response>>,
future::ErrInto<ResponseFuture<A, S::Future>, Error>,
>;

#[inline]
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> task::Poll<Result<()>> {
self.inner.poll_ready(cx).map_err(Into::into)
}

fn call(&mut self, mut req: ::http::Request<B>) -> Self::Future {
if let Err(e) = self.apply.apply(&mut req) {
if let Err(e) = self.apply.apply_request(&mut req) {
return future::Either::Left(future::err(e));
}
let rsp = ResponseFuture {
apply: self.apply.clone(),
inner: self.inner.call(req),
};
future::Either::Right(rsp.err_into::<Error>())
}
}

// === impl ResponseFuture ===

future::Either::Right(self.inner.call(req).err_into::<Error>())
impl<B, A, F> Future for ResponseFuture<A, F>
where
A: Apply,
F: TryFuture<Ok = ::http::Response<B>>,
F::Error: Into<Error>,
{
type Output = Result<::http::Response<B>>;

#[inline]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let mut out = ready!(this.inner.try_poll(cx));
if let Ok(rsp) = &mut out {
if let Err(e) = this.apply.apply_response(rsp) {
return Poll::Ready(Err(e));
};
}
Poll::Ready(out.map_err(Into::into))
}
}
2 changes: 1 addition & 1 deletion linkerd/http-route/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ tracing = "0.1"
url = "2"

[dependencies.linkerd2-proxy-api]
version = "0.10"
version = "0.11"
features = ["http-route", "grpc-route"]
optional = true

Expand Down
41 changes: 29 additions & 12 deletions linkerd/http-route/src/http/filter/modify_header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,23 @@ pub mod proto {
type Error = InvalidModifyHeader;

fn try_from(rhm: api::RequestHeaderModifier) -> Result<Self, Self::Error> {
fn to_pairs(
hs: Option<http_types::Headers>,
) -> Result<Vec<(HeaderName, HeaderValue)>, InvalidModifyHeader> {
hs.into_iter()
.flat_map(|a| a.headers.into_iter())
.map(|h| {
let name = h.name.parse::<HeaderName>()?;
let value = HeaderValue::from_bytes(&h.value)?;
Ok((name, value))
})
.collect()
}
let add = to_pairs(rhm.add)?;
let set = to_pairs(rhm.set)?;
let remove = rhm
.remove
.into_iter()
.map(|n| n.parse())
.collect::<Result<Vec<HeaderName>, http::header::InvalidHeaderName>>()?;
Ok(ModifyHeader { add, set, remove })
}
}

// === impl ModifyResponseHeader ===

impl TryFrom<api::ResponseHeaderModifier> for ModifyHeader {
type Error = InvalidModifyHeader;

fn try_from(rhm: api::ResponseHeaderModifier) -> Result<Self, Self::Error> {
let add = to_pairs(rhm.add)?;
let set = to_pairs(rhm.set)?;
let remove = rhm
Expand All @@ -69,4 +73,17 @@ pub mod proto {
Ok(ModifyHeader { add, set, remove })
}
}

fn to_pairs(
hs: Option<http_types::Headers>,
) -> Result<Vec<(HeaderName, HeaderValue)>, InvalidModifyHeader> {
hs.into_iter()
.flat_map(|a| a.headers.into_iter())
.map(|h| {
let name = h.name.parse::<HeaderName>()?;
let value = HeaderValue::from_bytes(&h.value)?;
Ok((name, value))
})
.collect()
}
}
2 changes: 1 addition & 1 deletion linkerd/proxy/api-resolve/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ async-stream = "0.3"
futures = { version = "0.3", default-features = false }
linkerd-addr = { path = "../../addr" }
linkerd-error = { path = "../../error" }
linkerd2-proxy-api = { version = "0.10", features = ["destination"] }
linkerd2-proxy-api = { version = "0.11", features = ["destination"] }
linkerd-proxy-core = { path = "../core" }
linkerd-stack = { path = "../../stack" }
linkerd-tls = { path = "../../tls" }
Expand Down
2 changes: 1 addition & 1 deletion linkerd/proxy/client-policy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ proto = [
ahash = "0.8"
ipnet = "2"
http = "0.2"
linkerd2-proxy-api = { version = "0.10", optional = true, features = [
linkerd2-proxy-api = { version = "0.11", optional = true, features = [
"outbound",
] }
linkerd-error = { path = "../../error" }
Expand Down
4 changes: 4 additions & 0 deletions linkerd/proxy/client-policy/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub enum Filter {
InjectFailure(filter::InjectFailure),
Redirect(filter::RedirectRequest),
RequestHeaders(filter::ModifyHeader),
ResponseHeaders(filter::ModifyHeader),
InternalError(&'static str),
}

Expand Down Expand Up @@ -321,6 +322,9 @@ pub mod proto {
Kind::RequestHeaderModifier(filter) => {
Ok(Filter::RequestHeaders(filter.try_into()?))
}
Kind::ResponseHeaderModifier(filter) => {
Ok(Filter::ResponseHeaders(filter.try_into()?))
}
Kind::Redirect(filter) => Ok(Filter::Redirect(filter.try_into()?)),
}
}
Expand Down
2 changes: 1 addition & 1 deletion linkerd/proxy/identity-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ publish = false

[dependencies]
futures = { version = "0.3", default-features = false }
linkerd2-proxy-api = { version = "0.10", features = ["identity"] }
linkerd2-proxy-api = { version = "0.11", features = ["identity"] }
linkerd-error = { path = "../../error" }
linkerd-identity = { path = "../../identity" }
linkerd-metrics = { path = "../../metrics" }
Expand Down
2 changes: 1 addition & 1 deletion linkerd/proxy/server-policy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ prost-types = { version = "0.11", optional = true }
thiserror = "1"

[dependencies.linkerd2-proxy-api]
version = "0.10"
version = "0.11"
features = ["inbound"]
optional = true

Expand Down
4 changes: 2 additions & 2 deletions linkerd/proxy/tap/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ http = "0.2"
hyper = { version = "0.14", features = ["http1", "http2"] }
futures = { version = "0.3", default-features = false }
ipnet = "2.7"
linkerd2-proxy-api = { version = "0.10", features = ["tap"] }
linkerd2-proxy-api = { version = "0.11", features = ["tap"] }
linkerd-conditional = { path = "../../conditional" }
linkerd-error = { path = "../../error" }
linkerd-meshtls = { path = "../../meshtls" }
Expand All @@ -30,5 +30,5 @@ tracing = "0.1"
pin-project = "1"

[dev-dependencies]
linkerd2-proxy-api = { version = "0.10", features = ["arbitrary"] }
linkerd2-proxy-api = { version = "0.11", features = ["arbitrary"] }
quickcheck = { version = "1", default-features = false }
Loading

0 comments on commit 3224560

Please sign in to comment.