diff --git a/linkerd/app/core/src/control.rs b/linkerd/app/core/src/control.rs index 45b472d221..b3d2af34c0 100644 --- a/linkerd/app/core/src/control.rs +++ b/linkerd/app/core/src/control.rs @@ -55,6 +55,9 @@ impl fmt::Display for ControlAddr { } } +pub type BoxClient = + svc::BoxServiceClone, http::Response, Error>; + pub type RspBody = linkerd_http_metrics::requests::ResponseBody, classify::Eos>; @@ -69,15 +72,7 @@ impl Config { dns: dns::Resolver, metrics: metrics::ControlHttp, identity: identity::NewClient, - ) -> svc::ArcNewService< - (), - impl svc::Service< - http::Request, - Response = http::Response, - Error = ControlError, - Future = impl Send, - > + Clone, - > { + ) -> svc::ArcNewService<(), BoxClient> { let addr = self.addr; // When a DNS resolution fails, log the error and use the TTL, if there @@ -135,6 +130,8 @@ impl Config { .push(svc::NewMapErr::layer_from_target::()) .instrument(|c: &ControlAddr| info_span!("controller", addr = %c.addr)) .push_map_target(move |()| addr.clone()) + .push_on_service(svc::MapErr::layer_boxed()) + .push_on_service(svc::BoxServiceClone::layer()) .push(svc::ArcNewService::layer()) .into_inner() } diff --git a/linkerd/app/core/src/svc.rs b/linkerd/app/core/src/svc.rs index 8b075c88c8..3f2e6de97a 100644 --- a/linkerd/app/core/src/svc.rs +++ b/linkerd/app/core/src/svc.rs @@ -33,19 +33,15 @@ pub type BoxHttp = pub type ArcNewHttp = ArcNewService>; -pub type BoxCloneHttp = - BoxCloneService, http::Response, Error>; +pub type BoxHttpClone = + BoxServiceClone, http::Response, Error>; -pub type ArcNewCloneHttp = ArcNewService>; +pub type ArcNewHttpClone = ArcNewService>; pub type BoxTcp = BoxService; pub type ArcNewTcp = ArcNewService>; -pub type BoxCloneTcp = BoxCloneService; - -pub type ArcNewCloneTcp = ArcNewService>; - #[derive(Clone, Debug)] pub struct Layers(L); @@ -297,16 +293,16 @@ impl Stack { self.arc_new_box() } - pub fn arc_new_clone_http(self) -> Stack> + pub fn arc_new_clone_http(self) -> Stack> where T: 'static, B: 'static, S: NewService + Send + Sync + 'static, Svc: Service, Response = http::Response, Error = Error>, - Svc: Clone + Send + 'static, + Svc: Clone + Send + Sync + 'static, Svc::Future: Send, { - self.push_on_service(BoxCloneService::layer()) + self.push_on_service(BoxServiceClone::layer()) .push(ArcNewService::layer()) } diff --git a/linkerd/app/inbound/src/http/router.rs b/linkerd/app/inbound/src/http/router.rs index db4b5ed696..f04bd259ba 100644 --- a/linkerd/app/inbound/src/http/router.rs +++ b/linkerd/app/inbound/src/http/router.rs @@ -67,14 +67,14 @@ struct LogicalError { // === impl Inbound === impl Inbound { - pub(crate) fn push_http_router(self, profiles: P) -> Inbound> + pub(crate) fn push_http_router(self, profiles: P) -> Inbound> where T: Param + Param> + Param> + Param + Param, - T: Clone + Send + Unpin + 'static, + T: Clone + Send + Sync + Unpin + 'static, P: profiles::GetProfile, C: svc::MakeConnection + Clone + Send + Sync + Unpin + 'static, C::Connection: Send + Unpin, diff --git a/linkerd/app/inbound/src/http/server.rs b/linkerd/app/inbound/src/http/server.rs index 69bf43349a..f7d6e265b2 100644 --- a/linkerd/app/inbound/src/http/server.rs +++ b/linkerd/app/inbound/src/http/server.rs @@ -28,7 +28,7 @@ struct ServerError { impl Inbound { /// Prepares HTTP requests for inbound processing. Fails requests when the /// `HSvc`-typed inner service is not ready. - pub fn push_http_server(self) -> Inbound> + pub fn push_http_server(self) -> Inbound> where // Connection target. T: Param @@ -40,11 +40,8 @@ impl Inbound { T: Clone + Send + Sync + Unpin + 'static, // Inner HTTP stack. H: svc::NewService + Clone + Send + Sync + Unpin + 'static, - HSvc: svc::Service, Response = http::Response> - + Clone - + Send - + Unpin - + 'static, + HSvc: svc::Service, Response = http::Response>, + HSvc: Clone + Send + Sync + Unpin + 'static, HSvc::Error: Into, HSvc::Future: Send, { diff --git a/linkerd/app/outbound/src/http.rs b/linkerd/app/outbound/src/http.rs index 987b92d599..00cc8eed3f 100644 --- a/linkerd/app/outbound/src/http.rs +++ b/linkerd/app/outbound/src/http.rs @@ -78,7 +78,7 @@ impl Outbound>>> /// Builds a stack that routes HTTP requests to endpoint stacks. /// /// Buffered concrete services are cached in and evicted when idle. - pub fn push_http_cached(self, resolve: R) -> Outbound> + pub fn push_http_cached(self, resolve: R) -> Outbound> where // Logical HTTP target. T: svc::Param, diff --git a/linkerd/app/outbound/src/http/server.rs b/linkerd/app/outbound/src/http/server.rs index 3777942104..d1dcd596fe 100644 --- a/linkerd/app/outbound/src/http/server.rs +++ b/linkerd/app/outbound/src/http/server.rs @@ -11,7 +11,7 @@ pub(crate) struct ServerRescue { emit_headers: bool, } -impl Outbound> { +impl Outbound> { /// Builds a [`svc::NewService`] stack that prepares HTTP requests to be /// proxied. /// @@ -19,7 +19,7 @@ impl Outbound> { /// HTTP responses. /// /// Inner services must be available, otherwise load shedding is applied. - pub fn push_http_server(self) -> Outbound> + pub fn push_http_server(self) -> Outbound> where // Target T: svc::Param + 'static, diff --git a/linkerd/app/outbound/src/protocol.rs b/linkerd/app/outbound/src/protocol.rs index 076a1662ab..2516dba2a0 100644 --- a/linkerd/app/outbound/src/protocol.rs +++ b/linkerd/app/outbound/src/protocol.rs @@ -28,7 +28,7 @@ impl Outbound { /// each connection. pub fn push_protocol( self, - http: svc::ArcNewCloneHttp>, + http: svc::ArcNewHttpClone>, ) -> Outbound> where // Target type indicating whether detection should be skipped. diff --git a/linkerd/app/src/dst.rs b/linkerd/app/src/dst.rs index f0ce8622c4..8197d83cb1 100644 --- a/linkerd/app/src/dst.rs +++ b/linkerd/app/src/dst.rs @@ -3,8 +3,8 @@ use linkerd_app_core::{ exp_backoff::{ExponentialBackoff, ExponentialBackoffStream}, identity, metrics, profiles::{self, DiscoveryRejected}, - proxy::{api_resolve as api, http, resolve::recover}, - svc::{self, NewService, ServiceExt}, + proxy::{api_resolve as api, resolve::recover}, + svc::NewService, Error, Recover, }; @@ -15,15 +15,17 @@ pub struct Config { } /// Handles to destination service clients. -pub struct Dst { +pub struct Dst { /// The address of the destination service, used for logging. pub addr: control::ControlAddr, /// Resolves profiles. - pub profiles: profiles::RecoverDefault>, + pub profiles: profiles::RecoverDefault< + profiles::Client, + >, /// Resolves endpoints. - pub resolve: recover::Resolve>, + pub resolve: recover::Resolve>, } #[derive(Copy, Clone, Debug, Default)] @@ -37,24 +39,10 @@ impl Config { dns: dns::Resolver, metrics: metrics::ControlHttp, identity: identity::NewClient, - ) -> Result< - Dst< - impl svc::Service< - http::Request, - Response = http::Response, - Error = Error, - Future = impl Send, - > + Clone, - >, - Error, - > { + ) -> Result { let addr = self.control.addr.clone(); let backoff = BackoffUnlessInvalidArgument(self.control.connect.backoff); - let svc = self - .control - .build(dns, metrics, identity) - .new_service(()) - .map_err(Error::from); + let svc = self.control.build(dns, metrics, identity).new_service(()); let profiles = profiles::Client::new_recover_default(backoff, svc.clone(), self.context.clone()); diff --git a/linkerd/app/src/policy.rs b/linkerd/app/src/policy.rs index 357eaad070..d34ce3f2ee 100644 --- a/linkerd/app/src/policy.rs +++ b/linkerd/app/src/policy.rs @@ -1,10 +1,5 @@ use linkerd_app_core::{ - control, dns, - exp_backoff::ExponentialBackoff, - identity, metrics, - proxy::http, - svc::{self, NewService, ServiceExt}, - Error, + control, dns, exp_backoff::ExponentialBackoff, identity, metrics, svc::NewService, Error, }; use std::sync::Arc; @@ -16,12 +11,12 @@ pub struct Config { } /// Handles to policy service clients. -pub struct Policy { +pub struct Policy { /// The address of the policy service, used for logging. pub addr: control::ControlAddr, /// Policy service gRPC client. - pub client: S, + pub client: control::BoxClient, /// Workload identifier pub workload: Arc, @@ -37,26 +32,11 @@ impl Config { dns: dns::Resolver, metrics: metrics::ControlHttp, identity: identity::NewClient, - ) -> Result< - Policy< - impl svc::Service< - http::Request, - Response = http::Response, - Error = Error, - Future = impl Send, - > + Clone, - >, - Error, - > { + ) -> Result { let addr = self.control.addr.clone(); let workload = self.workload.into(); let backoff = self.control.connect.backoff; - let client = self - .control - .build(dns, metrics, identity) - .new_service(()) - .map_err(Error::from); - + let client = self.control.build(dns, metrics, identity).new_service(()); Ok(Policy { addr, client, diff --git a/linkerd/stack/src/box_service.rs b/linkerd/stack/src/box_service.rs index 929a74de5b..3871820370 100644 --- a/linkerd/stack/src/box_service.rs +++ b/linkerd/stack/src/box_service.rs @@ -1,37 +1,94 @@ -use std::marker::PhantomData; +use futures::Future; +use std::pin::Pin; pub use tower::util::BoxService; -#[derive(Copy, Debug)] -pub struct BoxServiceLayer { - _p: PhantomData, +pub struct BoxServiceClone { + inner: Box< + dyn sealed::CloneService< + Req, + Response = Rsp, + Error = E, + Future = Pin> + Send + 'static>>, + >, + >, } -impl tower::Layer for BoxServiceLayer -where - S: tower::Service + Send + 'static, - S::Future: Send + 'static, - S::Error: Send + 'static, -{ - type Service = BoxService; - fn layer(&self, s: S) -> Self::Service { - BoxService::new(s) +impl BoxServiceClone { + pub fn new(inner: S) -> Self + where + S: crate::Service, + S: Clone + Send + Sync + 'static, + S::Future: Send + 'static, + { + Self { + inner: Box::new(crate::BoxFuture::new(inner)), + } } -} -impl BoxServiceLayer { - pub fn new() -> Self { - Self { _p: PhantomData } + pub fn layer() -> impl crate::layer::Layer + Clone + Copy + where + S: crate::Service, + S: Clone + Send + Sync + 'static, + S::Future: Send + 'static, + { + crate::layer::mk(Self::new) } } -impl Clone for BoxServiceLayer { +impl Clone for BoxServiceClone { fn clone(&self) -> Self { - Self::new() + BoxServiceClone { + inner: self.inner.clone_boxed(), + } + } +} + +impl crate::Service for BoxServiceClone +where + Req: 'static, + Rsp: 'static, + E: 'static, +{ + type Response = Rsp; + type Error = E; + type Future = Pin> + Send + 'static>>; + + #[inline] + fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + self.inner.poll_ready(cx) + } + + #[inline] + fn call(&mut self, req: Req) -> Self::Future { + self.inner.call(req) } } -impl Default for BoxServiceLayer { - fn default() -> Self { - Self::new() +mod sealed { + use crate::Service; + + pub trait CloneService: Service + Send + Sync + 'static { + fn clone_boxed( + &self, + ) -> Box< + dyn CloneService< + Req, + Response = Self::Response, + Error = Self::Error, + Future = Self::Future, + >, + >; + } + + impl CloneService for S + where + S: Service + Send + Sync + Clone + 'static, + { + fn clone_boxed( + &self, + ) -> Box> + { + Box::new(self.clone()) + } } } diff --git a/linkerd/stack/src/lib.rs b/linkerd/stack/src/lib.rs index b419d03c47..5c1052272a 100644 --- a/linkerd/stack/src/lib.rs +++ b/linkerd/stack/src/lib.rs @@ -33,7 +33,7 @@ mod watch; pub use self::{ arc_new_service::ArcNewService, box_future::BoxFuture, - box_service::{BoxService, BoxServiceLayer}, + box_service::{BoxService, BoxServiceClone}, connect::{MakeConnection, WithoutConnectionMetadata}, either::{Either, NewEither}, fail::Fail, @@ -59,7 +59,7 @@ pub use self::{ }; pub use tower::{ service_fn, - util::{self, future_service, BoxCloneService, FutureService, Oneshot, ServiceExt}, + util::{self, future_service, FutureService, Oneshot, ServiceExt}, Service, };