Skip to content

Commit

Permalink
docs(prom): Document record_response interfaces
Browse files Browse the repository at this point in the history
Signed-off-by: katelyn martin <[email protected]>
  • Loading branch information
cratelyn committed Oct 1, 2024
1 parent cb52f40 commit b984daa
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 1 deletion.
21 changes: 20 additions & 1 deletion linkerd/http/prom/src/record_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,15 +128,28 @@ struct ResponseBody<L: StreamLabel> {
state: Option<ResponseState<L>>,
}

/// Inner state used by [`ResponseFuture`] and [`ResponseBody`].
///
/// This is used to update Prometheus metrics across the response's lifecycle.
///
/// This is generic across an `L`-typed [`StreamLabel`], which bears responsibility for labelling
/// responses according to their status code and duration.
struct ResponseState<L: StreamLabel> {
labeler: L,
/// The family of [`Counter`]s tracking response status code.
statuses: Family<L::StatusLabels, Counter>,
/// The family of [`Histogram`]s tracking response durations.
duration: DurationFamily<L::DurationLabels>,
/// Receives a timestamp noting when the service received a request.
start: oneshot::Receiver<time::Instant>,
}

/// A family of labeled duration histograms.
type DurationFamily<L> = Family<L, Histogram, MkDurationHistogram>;

/// Creates new [`Histogram`]s.
///
/// See [`MkDurationHistogram::new_metric()`].
#[derive(Clone, Debug)]
struct MkDurationHistogram(Arc<[f64]>);

Expand Down Expand Up @@ -216,18 +229,24 @@ where
L: StreamLabel,
F: Future<Output = Result<http::Response<BoxBody>, Error>>,
{
type Output = Result<http::Response<BoxBody>, Error>;
/// A [`ResponseFuture`] produces the same response as its inner `F`-typed future.
type Output = F::Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();

// Poll the inner future, returning if it isn't ready yet.
let res = futures::ready!(this.inner.poll(cx)).map_err(Into::into);

// We got a response back! Take our state, and examine the output.
let mut state = this.state.take();
match res {
Ok(rsp) => {
if let Some(ResponseState { labeler, .. }) = state.as_mut() {
labeler.init_response(&rsp);
}

// Break the response into parts, and call `end_stream` if the body is empty.
let (head, inner) = rsp.into_parts();
if inner.is_end_stream() {
end_stream(&mut state, Ok(None));
Expand Down
1 change: 1 addition & 0 deletions linkerd/http/prom/src/record_response/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use tokio::{sync::oneshot, time};

use super::{DurationFamily, MkDurationHistogram, MkStreamLabel};

/// Metrics type that tracks completed responses.
#[derive(Debug)]
pub struct ResponseMetrics<DurL, StatL> {
duration: DurationFamily<DurL>,
Expand Down

0 comments on commit b984daa

Please sign in to comment.