Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(http/prom): Simplify record_response middleware #3242

Draft
wants to merge 16 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 9 additions & 12 deletions linkerd/app/outbound/src/http/logical/policy/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use super::super::Concrete;
use crate::{ParentRef, RouteRef};
use linkerd_app_core::{classify, proxy::http, svc, Addr, Error, Result};
use linkerd_distribute as distribute;
use linkerd_http_prom as http_prom;
use linkerd_http_route as http_route;
use linkerd_proxy_client_policy as policy;
use std::{fmt::Debug, hash::Hash, sync::Arc};
Expand Down Expand Up @@ -57,8 +58,8 @@ pub(crate) type BackendDistribution<T, F> = distribute::Distribution<Backend<T,
pub(crate) type NewDistribute<T, F, N> = distribute::NewDistribute<Backend<T, F>, (), N>;

pub type Metrics<R, B> = metrics::RouteMetrics<
<R as metrics::MkStreamLabel>::StreamLabel,
<B as metrics::MkStreamLabel>::StreamLabel,
<R as http_prom::MkStreamLabel>::StreamLabel,
<B as http_prom::MkStreamLabel>::StreamLabel,
>;

/// Wraps errors with route metadata.
Expand Down Expand Up @@ -88,9 +89,9 @@ where
Self: filters::Apply,
Self: svc::Param<classify::Request>,
Self: svc::Param<extensions::Params>,
Self: metrics::MkStreamLabel,
Self: http_prom::MkStreamLabel,
MatchedBackend<T, M, F>: filters::Apply,
MatchedBackend<T, M, F>: metrics::MkStreamLabel,
MatchedBackend<T, M, F>: http_prom::MkStreamLabel,
{
/// Builds a route stack that applies policy filters to requests and
/// distributes requests over each route's backends. These [`Concrete`]
Expand Down Expand Up @@ -172,12 +173,10 @@ impl<T> filters::Apply for Http<T> {
}
}

impl<T> metrics::MkStreamLabel for Http<T> {
type StatusLabels = metrics::labels::HttpRouteRsp;
type DurationLabels = metrics::labels::Route;
impl<T> http_prom::MkStreamLabel for Http<T> {
type StreamLabel = metrics::LabelHttpRouteRsp;

fn mk_stream_labeler<B>(&self, _: &::http::Request<B>) -> Option<Self::StreamLabel> {
fn mk_stream_labeler(&self, _: &::http::request::Parts) -> Option<Self::StreamLabel> {
let parent = self.params.parent_ref.clone();
let route = self.params.route_ref.clone();
Some(metrics::LabelHttpRsp::from(metrics::labels::Route::from((
Expand Down Expand Up @@ -226,12 +225,10 @@ impl<T> filters::Apply for Grpc<T> {
}
}

impl<T> metrics::MkStreamLabel for Grpc<T> {
type StatusLabels = metrics::labels::GrpcRouteRsp;
type DurationLabels = metrics::labels::Route;
impl<T> http_prom::MkStreamLabel for Grpc<T> {
type StreamLabel = metrics::LabelGrpcRouteRsp;

fn mk_stream_labeler<B>(&self, _: &::http::Request<B>) -> Option<Self::StreamLabel> {
fn mk_stream_labeler(&self, _: &::http::request::Parts) -> Option<Self::StreamLabel> {
let parent = self.params.parent_ref.clone();
let route = self.params.route_ref.clone();
Some(metrics::LabelGrpcRsp::from(metrics::labels::Route::from((
Expand Down
32 changes: 14 additions & 18 deletions linkerd/app/outbound/src/http/logical/policy/route/backend.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::{super::Concrete, filters};
use crate::{BackendRef, ParentRef, RouteRef};
use crate::{http::logical::policy::route, BackendRef, ParentRef, RouteRef};
use linkerd_app_core::{proxy::http, svc, Error, Result};
use linkerd_http_prom::record_response::MkStreamLabel;
use linkerd_http_prom as http_prom;
use linkerd_http_route as http_route;
use linkerd_proxy_client_policy as policy;
use std::{fmt::Debug, hash::Hash, sync::Arc};
Expand All @@ -21,7 +21,7 @@ pub(crate) type Http<T> =
pub(crate) type Grpc<T> =
MatchedBackend<T, http_route::grpc::r#match::RouteMatch, policy::grpc::Filter>;

pub type Metrics<T> = metrics::RouteBackendMetrics<<T as MkStreamLabel>::StreamLabel>;
pub type Metrics<T> = metrics::RouteBackendMetrics<<T as http_prom::MkStreamLabel>::StreamLabel>;

/// Wraps errors with backend metadata.
#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -68,7 +68,7 @@ where
F: Clone + Send + Sync + 'static,
// Assert that filters can be applied.
Self: filters::Apply,
Self: metrics::MkStreamLabel,
Self: http_prom::MkStreamLabel,
{
/// Builds a stack that applies per-route-backend policy filters over an
/// inner [`Concrete`] stack.
Expand Down Expand Up @@ -147,17 +147,15 @@ impl<T> filters::Apply for Http<T> {
}
}

impl<T> metrics::MkStreamLabel for Http<T> {
type StatusLabels = metrics::labels::HttpRouteBackendRsp;
type DurationLabels = metrics::labels::RouteBackend;
type StreamLabel = metrics::LabelHttpRouteBackendRsp;
impl<T> http_prom::MkStreamLabel for Http<T> {
type StreamLabel = route::metrics::LabelHttpRouteBackendRsp;

fn mk_stream_labeler<B>(&self, _: &::http::Request<B>) -> Option<Self::StreamLabel> {
fn mk_stream_labeler(&self, _: &::http::request::Parts) -> Option<Self::StreamLabel> {
let parent = self.params.concrete.parent_ref.clone();
let route = self.params.route_ref.clone();
let backend = self.params.concrete.backend_ref.clone();
Some(metrics::LabelHttpRsp::from(
metrics::labels::RouteBackend::from((parent, route, backend)),
Some(route::metrics::LabelHttpRsp::from(
route::metrics::labels::RouteBackend::from((parent, route, backend)),
))
}
}
Expand All @@ -175,17 +173,15 @@ impl<T> filters::Apply for Grpc<T> {
}
}

impl<T> metrics::MkStreamLabel for Grpc<T> {
type StatusLabels = metrics::labels::GrpcRouteBackendRsp;
type DurationLabels = metrics::labels::RouteBackend;
type StreamLabel = metrics::LabelGrpcRouteBackendRsp;
impl<T> http_prom::MkStreamLabel for Grpc<T> {
type StreamLabel = route::metrics::LabelGrpcRouteBackendRsp;

fn mk_stream_labeler<B>(&self, _: &::http::Request<B>) -> Option<Self::StreamLabel> {
fn mk_stream_labeler(&self, _: &::http::request::Parts) -> Option<Self::StreamLabel> {
let parent = self.params.concrete.parent_ref.clone();
let route = self.params.route_ref.clone();
let backend = self.params.concrete.backend_ref.clone();
Some(metrics::LabelGrpcRsp::from(
metrics::labels::RouteBackend::from((parent, route, backend)),
Some(route::metrics::LabelGrpcRsp::from(
route::metrics::labels::RouteBackend::from((parent, route, backend)),
))
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use crate::{BackendRef, ParentRef, RouteRef};
use crate::{
http::logical::policy::route::metrics::{
labels, ExtractRecordDurationParams, NewRecordDuration,
},
BackendRef, ParentRef, RouteRef,
};
use linkerd_app_core::{metrics::prom, svc};
use linkerd_http_prom::{
record_response::{self, NewResponseDuration, StreamLabel},
NewCountRequests, RequestCount, RequestCountFamilies,
count_reqs::{NewCountRequests, RequestCount, RequestCountFamilies},
record_response::{self, MkStreamLabel, NewResponseDuration, ResponseMetrics, StreamLabel},
};

pub use super::super::metrics::*;
pub use linkerd_http_prom::record_response::MkStreamLabel;

#[cfg(test)]
mod tests;

Expand All @@ -17,11 +19,6 @@ pub struct RouteBackendMetrics<L: StreamLabel> {
responses: ResponseMetrics<L>,
}

type ResponseMetrics<L> = record_response::ResponseMetrics<
<L as StreamLabel>::DurationLabels,
<L as StreamLabel>::StatusLabels,
>;

pub fn layer<T, N>(
metrics: &RouteBackendMetrics<T::StreamLabel>,
) -> impl svc::Layer<
Expand Down Expand Up @@ -75,7 +72,7 @@ impl<L: StreamLabel> RouteBackendMetrics<L> {
p: ParentRef,
r: RouteRef,
b: BackendRef,
) -> linkerd_http_prom::RequestCount {
) -> RequestCount {
self.requests.metrics(&labels::RouteBackend(p, r, b))
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
use super::{
super::{Backend, Grpc, Http},
labels,
test_util::*,
LabelGrpcRouteBackendRsp, LabelHttpRouteBackendRsp, RouteBackendMetrics,
RouteBackendMetrics,
};
use crate::http::{
concrete,
logical::{
policy::route::metrics::{
labels, test_util::*, LabelGrpcRouteBackendRsp, LabelHttpRouteBackendRsp,
},
Concrete,
},
};
use crate::http::{concrete, logical::Concrete};
use linkerd_app_core::{
svc::{self, http::BoxBody, Layer, NewService},
transport::{Remote, ServerAddr},
Expand Down
35 changes: 22 additions & 13 deletions linkerd/app/outbound/src/http/logical/policy/route/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,14 @@ use linkerd_app_core::{
metrics::prom::{self, EncodeLabelSetMut},
svc,
};
use linkerd_http_prom::record_response::{self, StreamLabel};

pub use linkerd_http_prom::record_response::MkStreamLabel;
use linkerd_http_prom::record_response::{self, MkStreamLabel, RequestMetrics, StreamLabel};

pub mod labels;
#[cfg(test)]
pub(super) mod test_util;
#[cfg(test)]
mod tests;

pub type RequestMetrics<R> = record_response::RequestMetrics<
<R as StreamLabel>::DurationLabels,
<R as StreamLabel>::StatusLabels,
>;

#[derive(Debug)]
pub struct RouteMetrics<R: StreamLabel, B: StreamLabel> {
pub(super) retry: retry::RouteRetryMetrics,
Expand All @@ -29,18 +22,34 @@ pub type HttpRouteMetrics = RouteMetrics<LabelHttpRouteRsp, LabelHttpRouteBacken
pub type GrpcRouteMetrics = RouteMetrics<LabelGrpcRouteRsp, LabelGrpcRouteBackendRsp>;

/// Tracks HTTP streams to produce response labels.
///
/// Provides a [`StreamLabel`] implementation to label response streams on both logical routes and
/// concrete backends.
#[derive(Clone, Debug)]
pub struct LabelHttpRsp<L> {
/// The parent set of labels to which this response stream belongs.
parent: L,
/// The response's HTTP status code.
status: Option<http::StatusCode>,
/// The category of error, if applicable.
///
/// This is `None` when no error occured.
error: Option<labels::Error>,
}

/// Tracks gRPC streams to produce response labels.
///
/// Provides a [`StreamLabel`] implementation to label response streams on both logical routes and
/// concrete backends.
#[derive(Clone, Debug)]
pub struct LabelGrpcRsp<L> {
/// The parent set of labels to which this response stream belongs.
parent: L,
/// The response's gRPC status code.
status: Option<tonic::Code>,
/// The category of error, if applicable.
///
/// This is `None` when no error occured.
error: Option<labels::Error>,
}

Expand Down Expand Up @@ -127,7 +136,7 @@ impl<R: StreamLabel, B: StreamLabel> RouteMetrics<R, B> {
p: crate::ParentRef,
r: crate::RouteRef,
b: crate::BackendRef,
) -> linkerd_http_prom::RequestCount {
) -> linkerd_http_prom::count_reqs::RequestCount {
self.backend.backend_request_count(p, r, b)
}
}
Expand Down Expand Up @@ -166,8 +175,8 @@ where
type StatusLabels = labels::Rsp<P, labels::HttpRsp>;
type DurationLabels = P;

fn init_response<B>(&mut self, rsp: &http::Response<B>) {
self.status = Some(rsp.status());
fn init_response(&mut self, rsp: &http::response::Parts) {
self.status = Some(rsp.status);
}

fn end_response(&mut self, res: Result<Option<&http::HeaderMap>, &linkerd_app_core::Error>) {
Expand Down Expand Up @@ -217,9 +226,9 @@ where
type StatusLabels = labels::Rsp<P, labels::GrpcRsp>;
type DurationLabels = P;

fn init_response<B>(&mut self, rsp: &http::Response<B>) {
fn init_response(&mut self, rsp: &http::response::Parts) {
self.status = rsp
.headers()
.headers
.get("grpc-status")
.map(|v| tonic::Code::from_bytes(v.as_bytes()));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,38 +1,44 @@
//! Prometheus label types.
//!
//! This submodule contains types that implement [`EncodeLabelSet`], [`EncodeLabelSetMut`], and
//! [`EncodeLabelValue`]. These may be used to work with a labeled
//! [`Family`][prometheus_client::metrics::family::Family] of metrics.
//!
//! Use [`Family::get_or_create()`][prometheus_client::metrics::family::Family::get_or_create]
//! to retrieve, or create should it not exist, a metric with a given set of label values.

use linkerd_app_core::{errors, metrics::prom::EncodeLabelSetMut, proxy::http, Error as BoxError};
use prometheus_client::encoding::*;

use crate::{BackendRef, ParentRef, RouteRef};

/// Prometheus labels for a route resource, usually a service.
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct Route(pub ParentRef, pub RouteRef);

/// Prometheus labels for a backend resource.
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct RouteBackend(pub ParentRef, pub RouteRef, pub BackendRef);

/// Prometheus labels for a route's response.
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct Rsp<P, L>(pub P, pub L);

pub type RouteRsp<L> = Rsp<Route, L>;
pub type HttpRouteRsp = RouteRsp<HttpRsp>;
pub type GrpcRouteRsp = RouteRsp<GrpcRsp>;

pub type RouteBackendRsp<L> = Rsp<RouteBackend, L>;
pub type HttpRouteBackendRsp = RouteBackendRsp<HttpRsp>;
pub type GrpcRouteBackendRsp = RouteBackendRsp<GrpcRsp>;

/// Prometheus labels for an HTTP response.
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct HttpRsp {
pub status: Option<http::StatusCode>,
pub error: Option<Error>,
}

/// Prometheus labels for a gRPC response.
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct GrpcRsp {
pub status: Option<tonic::Code>,
pub error: Option<Error>,
}

/// Prometheus labels representing an error.
#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq)]
pub enum Error {
FailFast,
Expand Down Expand Up @@ -178,6 +184,7 @@ impl EncodeLabelSet for GrpcRsp {
// === impl Error ===

impl Error {
/// Returns an [`Error`] or a status code, given a boxed error.
pub fn new_or_status(error: &BoxError) -> Result<Self, u16> {
use super::super::super::errors as policy;
use crate::http::h2::{H2Error, Reason};
Expand Down
5 changes: 3 additions & 2 deletions linkerd/app/outbound/src/http/logical/policy/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use linkerd_app_core::{
classify, proxy::http, svc, transport::addrs::*, Addr, Error, NameAddr, Result,
};
use linkerd_distribute as distribute;
use linkerd_http_prom as http_prom;
use linkerd_http_route as http_route;
use linkerd_proxy_client_policy as policy;
use std::{fmt::Debug, hash::Hash, sync::Arc};
Expand Down Expand Up @@ -65,8 +66,8 @@ where
route::MatchedRoute<T, M::Summary, F, P>: route::filters::Apply
+ svc::Param<classify::Request>
+ svc::Param<route::extensions::Params>
+ route::metrics::MkStreamLabel,
route::MatchedBackend<T, M::Summary, F>: route::filters::Apply + route::metrics::MkStreamLabel,
+ http_prom::MkStreamLabel,
route::MatchedBackend<T, M::Summary, F>: route::filters::Apply + http_prom::MkStreamLabel,
{
/// Builds a stack that applies routes to distribute requests over a cached
/// set of inner services so that.
Expand Down
4 changes: 2 additions & 2 deletions linkerd/http/prom/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#![deny(rust_2018_idioms, clippy::disallowed_methods, clippy::disallowed_types)]
#![forbid(unsafe_code)]

mod count_reqs;
pub mod count_reqs;
pub mod record_response;

pub use self::count_reqs::{CountRequests, NewCountRequests, RequestCount, RequestCountFamilies};
pub use self::record_response::{MkStreamLabel, StreamLabel};
Loading
Loading