Skip to content

Commit

Permalink
refactor(prom): Outline ResponseFuture into a submodule
Browse files Browse the repository at this point in the history
this is a bit of a self-contained system, with `ResponseFuture` acting
as the public face of it. let's codify that by making it its own
submodule.

Signed-off-by: katelyn martin <[email protected]>
  • Loading branch information
cratelyn committed Oct 1, 2024
1 parent 6dc17fc commit f80f790
Show file tree
Hide file tree
Showing 2 changed files with 175 additions and 168 deletions.
171 changes: 3 additions & 168 deletions linkerd/http/prom/src/record_response.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
use http_body::Body;
use linkerd_error::Error;
use linkerd_http_box::BoxBody;
use linkerd_metrics::prom::Counter;
use linkerd_stack as svc;
use prometheus_client::{
encoding::EncodeLabelSet,
Expand All @@ -10,20 +7,16 @@ use prometheus_client::{
histogram::Histogram,
},
};
use std::{
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use tokio::{sync::oneshot, time};
use std::sync::Arc;

mod request;
mod response;
mod response_future;

pub use self::{
request::{NewRequestDuration, RecordRequestDuration, RequestMetrics},
response::{NewResponseDuration, RecordResponseDuration, ResponseMetrics},
response_future::{ResponseFuture, ResponseState},
};

/// A strategy for labeling request/responses streams for status and duration
Expand Down Expand Up @@ -90,40 +83,6 @@ pub struct RecordResponse<L, M, S> {
metric: M,
}

#[pin_project::pin_project]
pub struct ResponseFuture<L, F>
where
L: StreamLabel,
{
#[pin]
inner: F,
state: Option<ResponseState<L>>,
}

/// Notifies the response labeler when the response body is flushed.
#[pin_project::pin_project(PinnedDrop)]
struct ResponseBody<L: StreamLabel> {
#[pin]
inner: BoxBody,
state: Option<ResponseState<L>>,
}

/// Inner state used by [`ResponseFuture`] and [`ResponseBody`].
///
/// This is used to update Prometheus metrics across the response's lifecycle.
///
/// This is generic across an `L`-typed [`StreamLabel`], which bears responsibility for labelling
/// responses according to their status code and duration.
struct ResponseState<L: StreamLabel> {
labeler: L,
/// The family of [`Counter`]s tracking response status code.
statuses: Family<L::StatusLabels, Counter>,
/// The family of [`Histogram`]s tracking response durations.
duration: DurationFamily<L::DurationLabels>,
/// Receives a timestamp noting when the service received a request.
start: oneshot::Receiver<time::Instant>,
}

/// A family of labeled duration histograms.
type DurationFamily<L> = Family<L, Histogram, MkDurationHistogram>;

Expand Down Expand Up @@ -190,127 +149,3 @@ where
}
}
}

// === impl ResponseFuture ===

impl<L, F> Future for ResponseFuture<L, F>
where
L: StreamLabel,
F: Future<Output = Result<http::Response<BoxBody>, Error>>,
{
/// A [`ResponseFuture`] produces the same response as its inner `F`-typed future.
type Output = F::Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();

// Poll the inner future, returning if it isn't ready yet.
let res = futures::ready!(this.inner.poll(cx)).map_err(Into::into);

// We got a response back! Take our state, and examine the output.
let mut state = this.state.take();
match res {
Ok(rsp) => {
let (head, inner) = rsp.into_parts();
if let Some(ResponseState { labeler, .. }) = state.as_mut() {
labeler.init_response(&head);
}

// Call `end_stream` if the body is empty.
if inner.is_end_stream() {
end_stream(&mut state, Ok(None));
}
Poll::Ready(Ok(http::Response::from_parts(
head,
BoxBody::new(ResponseBody { inner, state }),
)))
}
Err(error) => {
end_stream(&mut state, Err(&error));
Poll::Ready(Err(error))
}
}
}
}

// === impl ResponseBody ===

impl<L> http_body::Body for ResponseBody<L>
where
L: StreamLabel,
{
type Data = <BoxBody as http_body::Body>::Data;
type Error = Error;

fn poll_data(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Error>>> {
let mut this = self.project();
let res =
futures::ready!(this.inner.as_mut().poll_data(cx)).map(|res| res.map_err(Into::into));
if let Some(Err(error)) = res.as_ref() {
end_stream(this.state, Err(error));
} else if (*this.inner).is_end_stream() {
end_stream(this.state, Ok(None));
}
Poll::Ready(res)
}

fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Error>> {
let this = self.project();
let res = futures::ready!(this.inner.poll_trailers(cx)).map_err(Into::into);
end_stream(this.state, res.as_ref().map(Option::as_ref));
Poll::Ready(res)
}

fn is_end_stream(&self) -> bool {
self.inner.is_end_stream()
}
}

#[pin_project::pinned_drop]
impl<L> PinnedDrop for ResponseBody<L>
where
L: StreamLabel,
{
fn drop(self: Pin<&mut Self>) {
let this = self.project();
if this.state.is_some() {
end_stream(this.state, Err(&RequestCancelled(()).into()));
}
}
}

fn end_stream<L>(
state: &mut Option<ResponseState<L>>,
res: Result<Option<&http::HeaderMap>, &Error>,
) where
L: StreamLabel,
{
let Some(ResponseState {
duration,
statuses: total,
mut start,
mut labeler,
}) = state.take()
else {
return;
};

labeler.end_response(res);

total.get_or_create(&labeler.status_labels()).inc();

let elapsed = if let Ok(start) = start.try_recv() {
time::Instant::now().saturating_duration_since(start)
} else {
time::Duration::ZERO
};
duration
.get_or_create(&labeler.duration_labels())
.observe(elapsed.as_secs_f64());
}
172 changes: 172 additions & 0 deletions linkerd/http/prom/src/record_response/response_future.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
use super::{DurationFamily, StreamLabel};
use http_body::Body;
use linkerd_error::Error;
use linkerd_http_box::BoxBody;
use linkerd_metrics::prom::Counter;
use prometheus_client::metrics::family::Family;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tokio::{sync::oneshot, time};

#[pin_project::pin_project]
pub struct ResponseFuture<L, F>
where
L: StreamLabel,
{
#[pin]
pub(crate) inner: F,
pub(crate) state: Option<ResponseState<L>>,
}

/// Notifies the response labeler when the response body is flushed.
#[pin_project::pin_project(PinnedDrop)]
struct ResponseBody<L: StreamLabel> {
#[pin]
inner: BoxBody,
state: Option<ResponseState<L>>,
}

/// Inner state used by [`ResponseFuture`] and [`ResponseBody`].

Check warning on line 32 in linkerd/http/prom/src/record_response/response_future.rs

View workflow job for this annotation

GitHub Actions / rust

warning: public documentation for `ResponseState` links to private item `ResponseBody` --> linkerd/http/prom/src/record_response/response_future.rs:32:50 | 32 | /// Inner state used by [`ResponseFuture`] and [`ResponseBody`]. | ^^^^^^^^^^^^ this item is private | = note: this link will resolve properly if you pass `--document-private-items` = note: `#[warn(rustdoc::private_intra_doc_links)]` on by default
///
/// This is used to update Prometheus metrics across the response's lifecycle.
///
/// This is generic across an `L`-typed [`StreamLabel`], which bears responsibility for labelling
/// responses according to their status code and duration.
//
// TODO(kate): this should not need to be public.
pub struct ResponseState<L: StreamLabel> {
pub(super) labeler: L,
/// The family of [`Counter`]s tracking response status code.
pub(super) statuses: Family<L::StatusLabels, Counter>,
/// The family of [`Histogram`]s tracking response durations.
pub(super) duration: DurationFamily<L::DurationLabels>,
/// Receives a timestamp noting when the service received a request.
pub(super) start: oneshot::Receiver<time::Instant>,
}

// === impl ResponseFuture ===

impl<L, F> Future for ResponseFuture<L, F>
where
L: StreamLabel,
F: Future<Output = Result<http::Response<BoxBody>, Error>>,
{
/// A [`ResponseFuture`] produces the same response as its inner `F`-typed future.
type Output = F::Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();

// Poll the inner future, returning if it isn't ready yet.
let res = futures::ready!(this.inner.poll(cx)).map_err(Into::into);

// We got a response back! Take our state, and examine the output.
let mut state = this.state.take();
match res {
Ok(rsp) => {
let (head, inner) = rsp.into_parts();
if let Some(ResponseState { labeler, .. }) = state.as_mut() {
labeler.init_response(&head);
}

// Call `end_stream` if the body is empty.
if inner.is_end_stream() {
end_stream(&mut state, Ok(None));
}
Poll::Ready(Ok(http::Response::from_parts(
head,
BoxBody::new(ResponseBody { inner, state }),
)))
}
Err(error) => {
end_stream(&mut state, Err(&error));
Poll::Ready(Err(error))
}
}
}
}

// === impl ResponseBody ===

impl<L> http_body::Body for ResponseBody<L>
where
L: StreamLabel,
{
type Data = <BoxBody as http_body::Body>::Data;
type Error = Error;

fn poll_data(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Error>>> {
let mut this = self.project();
let res =
futures::ready!(this.inner.as_mut().poll_data(cx)).map(|res| res.map_err(Into::into));
if let Some(Err(error)) = res.as_ref() {
end_stream(this.state, Err(error));
} else if (*this.inner).is_end_stream() {
end_stream(this.state, Ok(None));
}
Poll::Ready(res)
}

fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Error>> {
let this = self.project();
let res = futures::ready!(this.inner.poll_trailers(cx)).map_err(Into::into);
end_stream(this.state, res.as_ref().map(Option::as_ref));
Poll::Ready(res)
}

fn is_end_stream(&self) -> bool {
self.inner.is_end_stream()
}
}

#[pin_project::pinned_drop]
impl<L> PinnedDrop for ResponseBody<L>
where
L: StreamLabel,
{
fn drop(self: Pin<&mut Self>) {
let this = self.project();
if this.state.is_some() {
end_stream(this.state, Err(&super::RequestCancelled(()).into()));
}
}
}

fn end_stream<L>(
state: &mut Option<ResponseState<L>>,
res: Result<Option<&http::HeaderMap>, &Error>,
) where
L: StreamLabel,
{
let Some(ResponseState {
duration,
statuses: total,
mut start,
mut labeler,
}) = state.take()
else {
return;
};

labeler.end_response(res);

total.get_or_create(&labeler.status_labels()).inc();

let elapsed = if let Ok(start) = start.try_recv() {
time::Instant::now().saturating_duration_since(start)
} else {
time::Duration::ZERO
};
duration
.get_or_create(&labeler.duration_labels())
.observe(elapsed.as_secs_f64());
}

0 comments on commit f80f790

Please sign in to comment.