diff --git a/linkerd/http/prom/src/record_response.rs b/linkerd/http/prom/src/record_response.rs index 61bcbe35f5..44d0e73f03 100644 --- a/linkerd/http/prom/src/record_response.rs +++ b/linkerd/http/prom/src/record_response.rs @@ -1,7 +1,4 @@ -use http_body::Body; use linkerd_error::Error; -use linkerd_http_box::BoxBody; -use linkerd_metrics::prom::Counter; use linkerd_stack as svc; use prometheus_client::{ encoding::EncodeLabelSet, @@ -10,20 +7,16 @@ use prometheus_client::{ histogram::Histogram, }, }; -use std::{ - future::Future, - pin::Pin, - sync::Arc, - task::{Context, Poll}, -}; -use tokio::{sync::oneshot, time}; +use std::sync::Arc; mod request; mod response; +mod response_future; pub use self::{ request::{NewRequestDuration, RecordRequestDuration, RequestMetrics}, response::{NewResponseDuration, RecordResponseDuration, ResponseMetrics}, + response_future::{ResponseFuture, ResponseState}, }; /// A strategy for labeling request/responses streams for status and duration @@ -90,40 +83,6 @@ pub struct RecordResponse { metric: M, } -#[pin_project::pin_project] -pub struct ResponseFuture -where - L: StreamLabel, -{ - #[pin] - inner: F, - state: Option>, -} - -/// Notifies the response labeler when the response body is flushed. -#[pin_project::pin_project(PinnedDrop)] -struct ResponseBody { - #[pin] - inner: BoxBody, - state: Option>, -} - -/// Inner state used by [`ResponseFuture`] and [`ResponseBody`]. -/// -/// This is used to update Prometheus metrics across the response's lifecycle. -/// -/// This is generic across an `L`-typed [`StreamLabel`], which bears responsibility for labelling -/// responses according to their status code and duration. -struct ResponseState { - labeler: L, - /// The family of [`Counter`]s tracking response status code. - statuses: Family, - /// The family of [`Histogram`]s tracking response durations. - duration: DurationFamily, - /// Receives a timestamp noting when the service received a request. - start: oneshot::Receiver, -} - /// A family of labeled duration histograms. type DurationFamily = Family; @@ -190,127 +149,3 @@ where } } } - -// === impl ResponseFuture === - -impl Future for ResponseFuture -where - L: StreamLabel, - F: Future, Error>>, -{ - /// A [`ResponseFuture`] produces the same response as its inner `F`-typed future. - type Output = F::Output; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - - // Poll the inner future, returning if it isn't ready yet. - let res = futures::ready!(this.inner.poll(cx)).map_err(Into::into); - - // We got a response back! Take our state, and examine the output. - let mut state = this.state.take(); - match res { - Ok(rsp) => { - let (head, inner) = rsp.into_parts(); - if let Some(ResponseState { labeler, .. }) = state.as_mut() { - labeler.init_response(&head); - } - - // Call `end_stream` if the body is empty. - if inner.is_end_stream() { - end_stream(&mut state, Ok(None)); - } - Poll::Ready(Ok(http::Response::from_parts( - head, - BoxBody::new(ResponseBody { inner, state }), - ))) - } - Err(error) => { - end_stream(&mut state, Err(&error)); - Poll::Ready(Err(error)) - } - } - } -} - -// === impl ResponseBody === - -impl http_body::Body for ResponseBody -where - L: StreamLabel, -{ - type Data = ::Data; - type Error = Error; - - fn poll_data( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll>> { - let mut this = self.project(); - let res = - futures::ready!(this.inner.as_mut().poll_data(cx)).map(|res| res.map_err(Into::into)); - if let Some(Err(error)) = res.as_ref() { - end_stream(this.state, Err(error)); - } else if (*this.inner).is_end_stream() { - end_stream(this.state, Ok(None)); - } - Poll::Ready(res) - } - - fn poll_trailers( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, Error>> { - let this = self.project(); - let res = futures::ready!(this.inner.poll_trailers(cx)).map_err(Into::into); - end_stream(this.state, res.as_ref().map(Option::as_ref)); - Poll::Ready(res) - } - - fn is_end_stream(&self) -> bool { - self.inner.is_end_stream() - } -} - -#[pin_project::pinned_drop] -impl PinnedDrop for ResponseBody -where - L: StreamLabel, -{ - fn drop(self: Pin<&mut Self>) { - let this = self.project(); - if this.state.is_some() { - end_stream(this.state, Err(&RequestCancelled(()).into())); - } - } -} - -fn end_stream( - state: &mut Option>, - res: Result, &Error>, -) where - L: StreamLabel, -{ - let Some(ResponseState { - duration, - statuses: total, - mut start, - mut labeler, - }) = state.take() - else { - return; - }; - - labeler.end_response(res); - - total.get_or_create(&labeler.status_labels()).inc(); - - let elapsed = if let Ok(start) = start.try_recv() { - time::Instant::now().saturating_duration_since(start) - } else { - time::Duration::ZERO - }; - duration - .get_or_create(&labeler.duration_labels()) - .observe(elapsed.as_secs_f64()); -} diff --git a/linkerd/http/prom/src/record_response/response_future.rs b/linkerd/http/prom/src/record_response/response_future.rs new file mode 100644 index 0000000000..d705a17aea --- /dev/null +++ b/linkerd/http/prom/src/record_response/response_future.rs @@ -0,0 +1,172 @@ +use super::{DurationFamily, StreamLabel}; +use http_body::Body; +use linkerd_error::Error; +use linkerd_http_box::BoxBody; +use linkerd_metrics::prom::Counter; +use prometheus_client::metrics::family::Family; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; +use tokio::{sync::oneshot, time}; + +#[pin_project::pin_project] +pub struct ResponseFuture +where + L: StreamLabel, +{ + #[pin] + pub(crate) inner: F, + pub(crate) state: Option>, +} + +/// Notifies the response labeler when the response body is flushed. +#[pin_project::pin_project(PinnedDrop)] +struct ResponseBody { + #[pin] + inner: BoxBody, + state: Option>, +} + +/// Inner state used by [`ResponseFuture`] and [`ResponseBody`]. +/// +/// This is used to update Prometheus metrics across the response's lifecycle. +/// +/// This is generic across an `L`-typed [`StreamLabel`], which bears responsibility for labelling +/// responses according to their status code and duration. +// +// TODO(kate): this should not need to be public. +pub struct ResponseState { + pub(super) labeler: L, + /// The family of [`Counter`]s tracking response status code. + pub(super) statuses: Family, + /// The family of [`Histogram`]s tracking response durations. + pub(super) duration: DurationFamily, + /// Receives a timestamp noting when the service received a request. + pub(super) start: oneshot::Receiver, +} + +// === impl ResponseFuture === + +impl Future for ResponseFuture +where + L: StreamLabel, + F: Future, Error>>, +{ + /// A [`ResponseFuture`] produces the same response as its inner `F`-typed future. + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + + // Poll the inner future, returning if it isn't ready yet. + let res = futures::ready!(this.inner.poll(cx)).map_err(Into::into); + + // We got a response back! Take our state, and examine the output. + let mut state = this.state.take(); + match res { + Ok(rsp) => { + let (head, inner) = rsp.into_parts(); + if let Some(ResponseState { labeler, .. }) = state.as_mut() { + labeler.init_response(&head); + } + + // Call `end_stream` if the body is empty. + if inner.is_end_stream() { + end_stream(&mut state, Ok(None)); + } + Poll::Ready(Ok(http::Response::from_parts( + head, + BoxBody::new(ResponseBody { inner, state }), + ))) + } + Err(error) => { + end_stream(&mut state, Err(&error)); + Poll::Ready(Err(error)) + } + } + } +} + +// === impl ResponseBody === + +impl http_body::Body for ResponseBody +where + L: StreamLabel, +{ + type Data = ::Data; + type Error = Error; + + fn poll_data( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + let mut this = self.project(); + let res = + futures::ready!(this.inner.as_mut().poll_data(cx)).map(|res| res.map_err(Into::into)); + if let Some(Err(error)) = res.as_ref() { + end_stream(this.state, Err(error)); + } else if (*this.inner).is_end_stream() { + end_stream(this.state, Ok(None)); + } + Poll::Ready(res) + } + + fn poll_trailers( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Error>> { + let this = self.project(); + let res = futures::ready!(this.inner.poll_trailers(cx)).map_err(Into::into); + end_stream(this.state, res.as_ref().map(Option::as_ref)); + Poll::Ready(res) + } + + fn is_end_stream(&self) -> bool { + self.inner.is_end_stream() + } +} + +#[pin_project::pinned_drop] +impl PinnedDrop for ResponseBody +where + L: StreamLabel, +{ + fn drop(self: Pin<&mut Self>) { + let this = self.project(); + if this.state.is_some() { + end_stream(this.state, Err(&super::RequestCancelled(()).into())); + } + } +} + +fn end_stream( + state: &mut Option>, + res: Result, &Error>, +) where + L: StreamLabel, +{ + let Some(ResponseState { + duration, + statuses: total, + mut start, + mut labeler, + }) = state.take() + else { + return; + }; + + labeler.end_response(res); + + total.get_or_create(&labeler.status_labels()).inc(); + + let elapsed = if let Ok(start) = start.try_recv() { + time::Instant::now().saturating_duration_since(start) + } else { + time::Duration::ZERO + }; + duration + .get_or_create(&labeler.duration_labels()) + .observe(elapsed.as_secs_f64()); +}