From b984daa923dc967aedfb9a7d6cc98a2ec9a20201 Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Wed, 25 Sep 2024 00:00:00 +0000 Subject: [PATCH] docs(prom): Document `record_response` interfaces Signed-off-by: katelyn martin --- linkerd/http/prom/src/record_response.rs | 21 ++++++++++++++++++- .../http/prom/src/record_response/response.rs | 1 + 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/linkerd/http/prom/src/record_response.rs b/linkerd/http/prom/src/record_response.rs index 3810fd65ee..6765fe943a 100644 --- a/linkerd/http/prom/src/record_response.rs +++ b/linkerd/http/prom/src/record_response.rs @@ -128,15 +128,28 @@ struct ResponseBody { state: Option>, } +/// Inner state used by [`ResponseFuture`] and [`ResponseBody`]. +/// +/// This is used to update Prometheus metrics across the response's lifecycle. +/// +/// This is generic across an `L`-typed [`StreamLabel`], which bears responsibility for labelling +/// responses according to their status code and duration. struct ResponseState { labeler: L, + /// The family of [`Counter`]s tracking response status code. statuses: Family, + /// The family of [`Histogram`]s tracking response durations. duration: DurationFamily, + /// Receives a timestamp noting when the service received a request. start: oneshot::Receiver, } +/// A family of labeled duration histograms. type DurationFamily = Family; +/// Creates new [`Histogram`]s. +/// +/// See [`MkDurationHistogram::new_metric()`]. #[derive(Clone, Debug)] struct MkDurationHistogram(Arc<[f64]>); @@ -216,11 +229,16 @@ where L: StreamLabel, F: Future, Error>>, { - type Output = Result, Error>; + /// A [`ResponseFuture`] produces the same response as its inner `F`-typed future. + type Output = F::Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); + + // Poll the inner future, returning if it isn't ready yet. let res = futures::ready!(this.inner.poll(cx)).map_err(Into::into); + + // We got a response back! Take our state, and examine the output. let mut state = this.state.take(); match res { Ok(rsp) => { @@ -228,6 +246,7 @@ where labeler.init_response(&rsp); } + // Break the response into parts, and call `end_stream` if the body is empty. let (head, inner) = rsp.into_parts(); if inner.is_end_stream() { end_stream(&mut state, Ok(None)); diff --git a/linkerd/http/prom/src/record_response/response.rs b/linkerd/http/prom/src/record_response/response.rs index d11267aa32..dacaa73242 100644 --- a/linkerd/http/prom/src/record_response/response.rs +++ b/linkerd/http/prom/src/record_response/response.rs @@ -16,6 +16,7 @@ use tokio::{sync::oneshot, time}; use super::{DurationFamily, MkDurationHistogram, MkStreamLabel}; +/// Metrics type that tracks completed responses. #[derive(Debug)] pub struct ResponseMetrics { duration: DurationFamily,