Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract zone locality into separate TCP metric #3262

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 32 additions & 4 deletions linkerd/app/core/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,13 @@
pub use crate::transport::labels::{TargetAddr, TlsAccept};
use crate::{
classify::Class,
control, http_metrics, opencensus, opentelemetry, profiles, stack_metrics,
svc::Param,
tls,
control, http_metrics, opencensus, opentelemetry, profiles, proxy, stack_metrics, svc, tls,
transport::{self, labels::TlsConnect},
};
use linkerd_addr::Addr;
pub use linkerd_metrics::*;
use linkerd_proxy_server_policy as policy;
use prometheus_client::encoding::EncodeLabelValue;
use std::{
fmt::{self, Write},
net::SocketAddr,
Expand Down Expand Up @@ -102,9 +101,32 @@
pub server_id: tls::ConditionalClientTls,
pub authority: Option<http::uri::Authority>,
pub labels: Option<String>,
pub zone_locality: OutboundZoneLocality,
pub target_addr: SocketAddr,
}

#[derive(Debug, Copy, Clone, Default, Hash, Eq, PartialEq, EncodeLabelValue)]
pub enum OutboundZoneLocality {
#[default]
cratelyn marked this conversation as resolved.
Show resolved Hide resolved
Unknown,
Local,
Remote,
}

impl OutboundZoneLocality {
pub fn new(metadata: &proxy::api_resolve::Metadata) -> Self {
if let Some(is_zone_local) = metadata.is_zone_local() {
if is_zone_local {
OutboundZoneLocality::Local

Check warning on line 120 in linkerd/app/core/src/metrics.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/core/src/metrics.rs#L119-L120

Added lines #L119 - L120 were not covered by tests
} else {
OutboundZoneLocality::Remote

Check warning on line 122 in linkerd/app/core/src/metrics.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/core/src/metrics.rs#L122

Added line #L122 was not covered by tests
}
} else {
OutboundZoneLocality::Unknown
}
}
}

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct StackLabels {
pub direction: Direction,
Expand Down Expand Up @@ -217,7 +239,7 @@

// === impl CtlLabels ===

impl Param<ControlLabels> for control::ControlAddr {
impl svc::Param<ControlLabels> for control::ControlAddr {
fn param(&self) -> ControlLabels {
ControlLabels {
addr: self.addr.clone(),
Expand Down Expand Up @@ -361,6 +383,12 @@
}
}

impl svc::Param<OutboundZoneLocality> for OutboundEndpointLabels {
fn param(&self) -> OutboundZoneLocality {
self.zone_locality

Check warning on line 388 in linkerd/app/core/src/metrics.rs

View check run for this annotation

Codecov / codecov/patch

linkerd/app/core/src/metrics.rs#L387-L388

Added lines #L387 - L388 were not covered by tests
}
}

impl FmtLabels for OutboundEndpointLabels {
fn fmt_labels(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if let Some(a) = self.authority.as_ref() {
Expand Down
8 changes: 8 additions & 0 deletions linkerd/app/outbound/src/http/concrete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use super::{balance::EwmaConfig, client, handle_proxy_error_headers};
use crate::{http, stack_labels, BackendRef, Outbound, ParentRef};
use linkerd_app_core::{
config::{ConnectConfig, QueueConfig},
metrics::OutboundZoneLocality,
metrics::{prefix_labels, EndpointLabels, OutboundEndpointLabels},
Comment on lines +8 to 9
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, consistency.

profiles,
proxy::{
Expand Down Expand Up @@ -212,12 +213,19 @@ where
OutboundEndpointLabels {
authority: self.parent.param(),
labels: prefix_labels("dst", self.metadata.labels().iter()),
zone_locality: self.param(),
server_id: self.param(),
target_addr: self.addr.into(),
}
}
}

impl<T> svc::Param<OutboundZoneLocality> for Endpoint<T> {
fn param(&self) -> OutboundZoneLocality {
OutboundZoneLocality::new(&self.metadata)
}
}

impl<T> svc::Param<EndpointLabels> for Endpoint<T>
where
T: svc::Param<Option<http::uri::Authority>>,
Expand Down
8 changes: 8 additions & 0 deletions linkerd/app/outbound/src/http/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use super::{
use crate::{tcp::tagged_transport, Outbound};
use linkerd_app_core::{
classify, config, errors, http_tracing, metrics,
metrics::OutboundZoneLocality,
proxy::{api_resolve::ProtocolHint, http, tap},
svc::{self, ExtractParam},
tls,
Expand Down Expand Up @@ -238,6 +239,13 @@ impl<T: svc::Param<transport::labels::Key>> svc::Param<transport::labels::Key> f
}
}

impl<T: svc::Param<OutboundZoneLocality>> svc::Param<OutboundZoneLocality> for Connect<T> {
#[inline]
fn param(&self) -> OutboundZoneLocality {
self.inner.param()
}
}

// === impl EndpointError ===

impl<T> From<(&T, Error)> for EndpointError
Expand Down
2 changes: 2 additions & 0 deletions linkerd/app/outbound/src/http/endpoint/tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::*;
use crate::{http, tcp, test_util::*};
use ::http::header::{CONNECTION, UPGRADE};
use linkerd_app_core::metrics::OutboundZoneLocality;
use linkerd_app_core::{
io,
proxy::api_resolve::ProtocolHint,
Expand Down Expand Up @@ -300,6 +301,7 @@ impl svc::Param<metrics::OutboundEndpointLabels> for Endpoint {
metrics::OutboundEndpointLabels {
authority: None,
labels: None,
zone_locality: OutboundZoneLocality::Unknown,
server_id: self.param(),
target_addr: self.addr.into(),
}
Expand Down
1 change: 1 addition & 0 deletions linkerd/app/outbound/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ mod sidecar;
pub mod tcp;
#[cfg(any(test, feature = "test-util"))]
pub mod test_util;
mod zone;

pub use self::discover::{spawn_synthesized_profile_policy, synthesize_forward_policy, Discovery};
use self::metrics::OutboundMetrics;
Expand Down
8 changes: 6 additions & 2 deletions linkerd/app/outbound/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub struct OutboundMetrics {
pub(crate) struct PromMetrics {
pub(crate) http: crate::http::HttpMetrics,
pub(crate) opaq: crate::opaq::OpaqMetrics,
pub(crate) zone: crate::zone::TcpZoneMetricsParams,
}

#[derive(Clone, Debug, Hash, PartialEq, Eq)]
Expand All @@ -47,7 +48,7 @@ pub struct BalancerMetricsParams<K>(balance::MetricFamilies<K>);

struct ScopedKey<'a, 'b>(&'a str, &'b str);

// === impl BalancerMetricsPaarams ===
// === impl BalancerMetricsParams ===

impl<K> BalancerMetricsParams<K>
where
Expand Down Expand Up @@ -80,6 +81,7 @@ where
Self(balance::MetricFamilies::default())
}
}

// === impl PromMetrics ===

impl PromMetrics {
Expand All @@ -89,8 +91,10 @@ impl PromMetrics {
let http = crate::http::HttpMetrics::register(registry);

let opaq = crate::opaq::OpaqMetrics::register(registry.sub_registry_with_prefix("tcp"));
let zone =
crate::zone::TcpZoneMetricsParams::register(registry.sub_registry_with_prefix("tcp"));

Self { http, opaq }
Self { http, opaq, zone }
}
}

Expand Down
8 changes: 8 additions & 0 deletions linkerd/app/outbound/src/opaq/concrete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::{metrics::BalancerMetricsParams, stack_labels, BackendRef, Outbound,
use linkerd_app_core::{
config::QueueConfig,
drain, io,
metrics::OutboundZoneLocality,
metrics::{
self,
prom::{self, EncodeLabelSetMut},
Expand Down Expand Up @@ -323,12 +324,19 @@ where
metrics::OutboundEndpointLabels {
authority,
labels: metrics::prefix_labels("dst", self.metadata.labels().iter()),
zone_locality: self.param(),
server_id: self.param(),
target_addr: self.addr.into(),
}
}
}

impl<T> svc::Param<OutboundZoneLocality> for Endpoint<T> {
fn param(&self) -> OutboundZoneLocality {
OutboundZoneLocality::new(&self.metadata)
}
}

impl<T> svc::Param<metrics::EndpointLabels> for Endpoint<T>
where
T: svc::Param<Option<profiles::LogicalAddr>>,
Expand Down
8 changes: 4 additions & 4 deletions linkerd/app/outbound/src/tcp.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
pub use self::connect::Connect;
use crate::Outbound;
use linkerd_app_core::{
io, svc,
io,
metrics::OutboundZoneLocality,
svc,
transport::{self, addrs::*, metrics},
Error, Infallible,
};
Expand All @@ -9,9 +12,6 @@ mod connect;
mod endpoint;
pub mod tagged_transport;

pub use self::connect::Connect;
pub use linkerd_app_core::proxy::tcp::Forward;

#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
pub struct Accept {
orig_dst: OrigDstAddr,
Expand Down
4 changes: 4 additions & 0 deletions linkerd/app/outbound/src/tcp/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ impl<C> Outbound<C> {
T: svc::Param<Option<http::AuthorityOverride>>,
T: svc::Param<Option<SessionProtocol>>,
T: svc::Param<transport::labels::Key>,
T: svc::Param<OutboundZoneLocality>,
// Connector stack.
C: svc::MakeConnection<Connect, Metadata = Local<ClientAddr>, Error = io::Error>,
C: Clone + Send + 'static,
Expand All @@ -45,6 +46,9 @@ impl<C> Outbound<C> {
.push(transport::metrics::Client::layer(
rt.metrics.proxy.transport.clone(),
))
.push(transport::metrics::zone::client::ZoneMetricsClient::layer(
rt.metrics.prom.zone.clone(),
))
})
}
}
79 changes: 79 additions & 0 deletions linkerd/app/outbound/src/zone.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use crate::metrics::prom;
use linkerd_app_core::{
metrics::OutboundZoneLocality, svc, transport::metrics::zone::TcpZoneMetrics,
};
use prometheus_client::{
encoding::{EncodeLabelSet, EncodeLabelValue},
registry::Unit,
};

#[derive(Clone, Debug, Default)]
pub struct TcpZoneMetricsParams {
transfer_cost: prom::Family<TcpZoneLabels, prom::Counter>,
}

impl TcpZoneMetricsParams {
pub fn register(registry: &mut prom::Registry) -> Self {
Comment on lines +10 to +16
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be most idiomatic to move this into linkerd/transport-metrics/src/zone.rs and have it be generic over the label type -- then the metrics details can all be private to the module and the labeling context remains in the outbound app.

let transfer_cost = prom::Family::default();
registry.register_with_unit(
"transfer_cost",
"The total cost of data transferred in and out of this proxy in bytes",
Copy link
Member

@olix0r olix0r Oct 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More yakshaving, not a blocker:

Suggested change
"The total cost of data transferred in and out of this proxy in bytes",
"The total amount of data transferred in and out of this proxy, by cost zone",

Unit::Bytes,
transfer_cost.clone(),
);

TcpZoneMetricsParams { transfer_cost }
}

pub fn metrics(&self, zone_locality: OutboundZoneLocality) -> TcpZoneMetrics {
let recv_bytes = self
.transfer_cost
.get_or_create(&TcpZoneLabels::new_recv_labels(zone_locality))
.clone();
let send_bytes = self
.transfer_cost
.get_or_create(&TcpZoneLabels::new_send_labels(zone_locality))
.clone();
TcpZoneMetrics {
recv_bytes,
send_bytes,
}
}
}

#[derive(Debug, Copy, Clone, Hash, Eq, PartialEq, EncodeLabelSet)]
pub struct TcpZoneLabels {
op: TcpZoneOp,
zone_locality: OutboundZoneLocality,
}

impl TcpZoneLabels {
fn new_send_labels(zone_locality: OutboundZoneLocality) -> Self {
Self {
op: TcpZoneOp::Send,
zone_locality,
}
}

fn new_recv_labels(zone_locality: OutboundZoneLocality) -> Self {
Self {
op: TcpZoneOp::Recv,
zone_locality,
}
}
}

#[derive(Debug, Copy, Clone, Hash, Eq, PartialEq, EncodeLabelValue)]
enum TcpZoneOp {
Send,
Recv,
}

impl<T> svc::ExtractParam<TcpZoneMetrics, T> for TcpZoneMetricsParams
where
T: svc::Param<OutboundZoneLocality>,
{
fn extract_param(&self, target: &T) -> TcpZoneMetrics {
self.metrics(target.param())
}
}
9 changes: 9 additions & 0 deletions linkerd/proxy/api-resolve/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub struct Metadata {
authority_override: Option<Authority>,

http2: HTTP2ClientParams,
is_zone_local: Option<bool>,
}

#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
Expand All @@ -53,11 +54,13 @@ impl Default for Metadata {
tagged_transport_port: None,
protocol_hint: ProtocolHint::Unknown,
http2: HTTP2ClientParams::default(),
is_zone_local: None,
}
}
}

impl Metadata {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
labels: impl IntoIterator<Item = (String, String)>,
protocol_hint: ProtocolHint,
Expand All @@ -66,6 +69,7 @@ impl Metadata {
authority_override: Option<Authority>,
weight: u32,
http2: HTTP2ClientParams,
is_zone_local: Option<bool>,
) -> Self {
Self {
labels: labels.into_iter().collect::<BTreeMap<_, _>>().into(),
Expand All @@ -75,6 +79,7 @@ impl Metadata {
authority_override,
weight,
http2,
is_zone_local,
}
}

Expand All @@ -87,6 +92,10 @@ impl Metadata {
self.labels.clone()
}

pub fn is_zone_local(&self) -> Option<bool> {
self.is_zone_local
}

pub fn protocol_hint(&self) -> ProtocolHint {
self.protocol_hint
}
Expand Down
Loading
Loading