Skip to content

Commit

Permalink
Box controller clients
Browse files Browse the repository at this point in the history
This change extends e62cc28 by boxing the controller clients so their
inner types signatures don't leak into the stacks that use them.

This change adds a new BoxServiceClone type to replace Tower's
BoxCloneService. Our new type implements Sync so the Service can be used
across threads.
  • Loading branch information
olix0r committed Nov 11, 2023
1 parent e62cc28 commit f53dcaf
Show file tree
Hide file tree
Showing 11 changed files with 116 additions and 101 deletions.
15 changes: 6 additions & 9 deletions linkerd/app/core/src/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ impl fmt::Display for ControlAddr {
}
}

pub type BoxClient =
svc::BoxServiceClone<http::Request<tonic::body::BoxBody>, http::Response<RspBody>, Error>;

pub type RspBody =
linkerd_http_metrics::requests::ResponseBody<http::balance::Body<hyper::Body>, classify::Eos>;

Expand All @@ -69,15 +72,7 @@ impl Config {
dns: dns::Resolver,
metrics: metrics::ControlHttp,
identity: identity::NewClient,
) -> svc::ArcNewService<
(),
impl svc::Service<
http::Request<tonic::body::BoxBody>,
Response = http::Response<RspBody>,
Error = ControlError,
Future = impl Send,
> + Clone,
> {
) -> svc::ArcNewService<(), BoxClient> {
let addr = self.addr;

// When a DNS resolution fails, log the error and use the TTL, if there
Expand Down Expand Up @@ -135,6 +130,8 @@ impl Config {
.push(svc::NewMapErr::layer_from_target::<ControlError, _>())
.instrument(|c: &ControlAddr| info_span!("controller", addr = %c.addr))
.push_map_target(move |()| addr.clone())
.push_on_service(svc::MapErr::layer_boxed())
.push_on_service(svc::BoxServiceClone::layer())
.push(svc::ArcNewService::layer())
.into_inner()
}
Expand Down
16 changes: 6 additions & 10 deletions linkerd/app/core/src/svc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,15 @@ pub type BoxHttp<B = http::BoxBody> =

pub type ArcNewHttp<T, B = http::BoxBody> = ArcNewService<T, BoxHttp<B>>;

pub type BoxCloneHttp<B = http::BoxBody> =
BoxCloneService<http::Request<B>, http::Response<http::BoxBody>, Error>;
pub type BoxHttpClone<B = http::BoxBody> =
BoxServiceClone<http::Request<B>, http::Response<http::BoxBody>, Error>;

pub type ArcNewCloneHttp<T, B = http::BoxBody> = ArcNewService<T, BoxCloneHttp<B>>;
pub type ArcNewHttpClone<T, B = http::BoxBody> = ArcNewService<T, BoxHttpClone<B>>;

pub type BoxTcp<I> = BoxService<I, (), Error>;

pub type ArcNewTcp<T, I> = ArcNewService<T, BoxTcp<I>>;

pub type BoxCloneTcp<I> = BoxCloneService<I, (), Error>;

pub type ArcNewCloneTcp<T, I> = ArcNewService<T, BoxCloneTcp<I>>;

#[derive(Clone, Debug)]
pub struct Layers<L>(L);

Expand Down Expand Up @@ -297,16 +293,16 @@ impl<S> Stack<S> {
self.arc_new_box()
}

pub fn arc_new_clone_http<T, B, Svc>(self) -> Stack<ArcNewCloneHttp<T, B>>
pub fn arc_new_clone_http<T, B, Svc>(self) -> Stack<ArcNewHttpClone<T, B>>
where
T: 'static,
B: 'static,
S: NewService<T, Service = Svc> + Send + Sync + 'static,
Svc: Service<http::Request<B>, Response = http::Response<http::BoxBody>, Error = Error>,
Svc: Clone + Send + 'static,
Svc: Clone + Send + Sync + 'static,
Svc::Future: Send,
{
self.push_on_service(BoxCloneService::layer())
self.push_on_service(BoxServiceClone::layer())
.push(ArcNewService::layer())
}

Expand Down
4 changes: 2 additions & 2 deletions linkerd/app/inbound/src/http/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,14 @@ struct LogicalError {
// === impl Inbound ===

impl<C> Inbound<C> {
pub(crate) fn push_http_router<T, P>(self, profiles: P) -> Inbound<svc::ArcNewCloneHttp<T>>
pub(crate) fn push_http_router<T, P>(self, profiles: P) -> Inbound<svc::ArcNewHttpClone<T>>
where
T: Param<http::Version>
+ Param<Remote<ServerAddr>>
+ Param<Remote<ClientAddr>>
+ Param<tls::ConditionalServerTls>
+ Param<policy::AllowPolicy>,
T: Clone + Send + Unpin + 'static,
T: Clone + Send + Sync + Unpin + 'static,
P: profiles::GetProfile<Error = Error>,
C: svc::MakeConnection<Http> + Clone + Send + Sync + Unpin + 'static,
C::Connection: Send + Unpin,
Expand Down
9 changes: 3 additions & 6 deletions linkerd/app/inbound/src/http/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ struct ServerError {
impl<H> Inbound<H> {
/// Prepares HTTP requests for inbound processing. Fails requests when the
/// `HSvc`-typed inner service is not ready.
pub fn push_http_server<T, HSvc>(self) -> Inbound<svc::ArcNewCloneHttp<T>>
pub fn push_http_server<T, HSvc>(self) -> Inbound<svc::ArcNewHttpClone<T>>
where
// Connection target.
T: Param<Version>
Expand All @@ -40,11 +40,8 @@ impl<H> Inbound<H> {
T: Clone + Send + Sync + Unpin + 'static,
// Inner HTTP stack.
H: svc::NewService<T, Service = HSvc> + Clone + Send + Sync + Unpin + 'static,
HSvc: svc::Service<http::Request<http::BoxBody>, Response = http::Response<http::BoxBody>>
+ Clone
+ Send
+ Unpin
+ 'static,
HSvc: svc::Service<http::Request<http::BoxBody>, Response = http::Response<http::BoxBody>>,
HSvc: Clone + Send + Sync + Unpin + 'static,
HSvc::Error: Into<Error>,
HSvc::Future: Send,
{
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/outbound/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl<T> Outbound<svc::ArcNewHttp<concrete::Endpoint<logical::Concrete<Http<T>>>>
/// Builds a stack that routes HTTP requests to endpoint stacks.
///
/// Buffered concrete services are cached in and evicted when idle.
pub fn push_http_cached<R>(self, resolve: R) -> Outbound<svc::ArcNewCloneHttp<T>>
pub fn push_http_cached<R>(self, resolve: R) -> Outbound<svc::ArcNewHttpClone<T>>
where
// Logical HTTP target.
T: svc::Param<http::Version>,
Expand Down
4 changes: 2 additions & 2 deletions linkerd/app/outbound/src/http/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ pub(crate) struct ServerRescue {
emit_headers: bool,
}

impl<T> Outbound<svc::ArcNewCloneHttp<T>> {
impl<T> Outbound<svc::ArcNewHttpClone<T>> {
/// Builds a [`svc::NewService`] stack that prepares HTTP requests to be
/// proxied.
///
/// The services produced by this stack handle errors, converting them into
/// HTTP responses.
///
/// Inner services must be available, otherwise load shedding is applied.
pub fn push_http_server(self) -> Outbound<svc::ArcNewCloneHttp<T>>
pub fn push_http_server(self) -> Outbound<svc::ArcNewHttpClone<T>>
where
// Target
T: svc::Param<http::normalize_uri::DefaultAuthority> + 'static,
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/outbound/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl<N> Outbound<N> {
/// each connection.
pub fn push_protocol<T, I, NSvc>(
self,
http: svc::ArcNewCloneHttp<Http<T>>,
http: svc::ArcNewHttpClone<Http<T>>,
) -> Outbound<svc::ArcNewTcp<T, I>>
where
// Target type indicating whether detection should be skipped.
Expand Down
30 changes: 9 additions & 21 deletions linkerd/app/src/dst.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use linkerd_app_core::{
exp_backoff::{ExponentialBackoff, ExponentialBackoffStream},
identity, metrics,
profiles::{self, DiscoveryRejected},
proxy::{api_resolve as api, http, resolve::recover},
svc::{self, NewService, ServiceExt},
proxy::{api_resolve as api, resolve::recover},
svc::NewService,
Error, Recover,
};

Expand All @@ -15,15 +15,17 @@ pub struct Config {
}

/// Handles to destination service clients.
pub struct Dst<S> {
pub struct Dst {
/// The address of the destination service, used for logging.
pub addr: control::ControlAddr,

/// Resolves profiles.
pub profiles: profiles::RecoverDefault<profiles::Client<BackoffUnlessInvalidArgument, S>>,
pub profiles: profiles::RecoverDefault<
profiles::Client<BackoffUnlessInvalidArgument, control::BoxClient>,
>,

/// Resolves endpoints.
pub resolve: recover::Resolve<BackoffUnlessInvalidArgument, api::Resolve<S>>,
pub resolve: recover::Resolve<BackoffUnlessInvalidArgument, api::Resolve<control::BoxClient>>,
}

#[derive(Copy, Clone, Debug, Default)]
Expand All @@ -37,24 +39,10 @@ impl Config {
dns: dns::Resolver,
metrics: metrics::ControlHttp,
identity: identity::NewClient,
) -> Result<
Dst<
impl svc::Service<
http::Request<tonic::body::BoxBody>,
Response = http::Response<control::RspBody>,
Error = Error,
Future = impl Send,
> + Clone,
>,
Error,
> {
) -> Result<Dst, Error> {
let addr = self.control.addr.clone();
let backoff = BackoffUnlessInvalidArgument(self.control.connect.backoff);
let svc = self
.control
.build(dns, metrics, identity)
.new_service(())
.map_err(Error::from);
let svc = self.control.build(dns, metrics, identity).new_service(());

let profiles =
profiles::Client::new_recover_default(backoff, svc.clone(), self.context.clone());
Expand Down
30 changes: 5 additions & 25 deletions linkerd/app/src/policy.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
use linkerd_app_core::{
control, dns,
exp_backoff::ExponentialBackoff,
identity, metrics,
proxy::http,
svc::{self, NewService, ServiceExt},
Error,
control, dns, exp_backoff::ExponentialBackoff, identity, metrics, svc::NewService, Error,
};

use std::sync::Arc;
Expand All @@ -16,12 +11,12 @@ pub struct Config {
}

/// Handles to policy service clients.
pub struct Policy<S> {
pub struct Policy {
/// The address of the policy service, used for logging.
pub addr: control::ControlAddr,

/// Policy service gRPC client.
pub client: S,
pub client: control::BoxClient,

/// Workload identifier
pub workload: Arc<str>,
Expand All @@ -37,26 +32,11 @@ impl Config {
dns: dns::Resolver,
metrics: metrics::ControlHttp,
identity: identity::NewClient,
) -> Result<
Policy<
impl svc::Service<
http::Request<tonic::body::BoxBody>,
Response = http::Response<control::RspBody>,
Error = Error,
Future = impl Send,
> + Clone,
>,
Error,
> {
) -> Result<Policy, Error> {
let addr = self.control.addr.clone();
let workload = self.workload.into();
let backoff = self.control.connect.backoff;
let client = self
.control
.build(dns, metrics, identity)
.new_service(())
.map_err(Error::from);

let client = self.control.build(dns, metrics, identity).new_service(());
Ok(Policy {
addr,
client,
Expand Down
101 changes: 79 additions & 22 deletions linkerd/stack/src/box_service.rs
Original file line number Diff line number Diff line change
@@ -1,37 +1,94 @@
use std::marker::PhantomData;
use futures::Future;
use std::pin::Pin;
pub use tower::util::BoxService;

#[derive(Copy, Debug)]
pub struct BoxServiceLayer<R> {
_p: PhantomData<fn(R)>,
pub struct BoxServiceClone<Req, Rsp, E> {
inner: Box<
dyn sealed::CloneService<
Req,
Response = Rsp,
Error = E,
Future = Pin<Box<dyn Future<Output = Result<Rsp, E>> + Send + 'static>>,
>,
>,
}

impl<S, R> tower::Layer<S> for BoxServiceLayer<R>
where
S: tower::Service<R> + Send + 'static,
S::Future: Send + 'static,
S::Error: Send + 'static,
{
type Service = BoxService<R, S::Response, S::Error>;
fn layer(&self, s: S) -> Self::Service {
BoxService::new(s)
impl<Req: 'static, Rsp: 'static, E: 'static> BoxServiceClone<Req, Rsp, E> {
pub fn new<S>(inner: S) -> Self
where
S: crate::Service<Req, Response = Rsp, Error = E>,
S: Clone + Send + Sync + 'static,
S::Future: Send + 'static,
{
Self {
inner: Box::new(crate::BoxFuture::new(inner)),
}
}
}

impl<R> BoxServiceLayer<R> {
pub fn new() -> Self {
Self { _p: PhantomData }
pub fn layer<S>() -> impl crate::layer::Layer<S, Service = Self> + Clone + Copy
where
S: crate::Service<Req, Response = Rsp, Error = E>,
S: Clone + Send + Sync + 'static,
S::Future: Send + 'static,
{
crate::layer::mk(Self::new)
}
}

impl<R> Clone for BoxServiceLayer<R> {
impl<Req: 'static, Rsp: 'static, E: 'static> Clone for BoxServiceClone<Req, Rsp, E> {
fn clone(&self) -> Self {
Self::new()
BoxServiceClone {
inner: self.inner.clone_boxed(),
}
}
}

impl<Req, Rsp, E> crate::Service<Req> for BoxServiceClone<Req, Rsp, E>
where
Req: 'static,
Rsp: 'static,
E: 'static,
{
type Response = Rsp;
type Error = E;
type Future = Pin<Box<dyn Future<Output = Result<Rsp, E>> + Send + 'static>>;

#[inline]
fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), E>> {
self.inner.poll_ready(cx)
}

#[inline]
fn call(&mut self, req: Req) -> Self::Future {
self.inner.call(req)
}
}

impl<R> Default for BoxServiceLayer<R> {
fn default() -> Self {
Self::new()
mod sealed {
use crate::Service;

pub trait CloneService<Req>: Service<Req> + Send + Sync + 'static {
fn clone_boxed(
&self,
) -> Box<
dyn CloneService<
Req,
Response = Self::Response,
Error = Self::Error,
Future = Self::Future,
>,
>;
}

impl<Req, S> CloneService<Req> for S
where
S: Service<Req> + Send + Sync + Clone + 'static,
{
fn clone_boxed(
&self,
) -> Box<dyn CloneService<Req, Response = S::Response, Error = S::Error, Future = S::Future>>
{
Box::new(self.clone())
}
}
}
Loading

0 comments on commit f53dcaf

Please sign in to comment.