diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index be681bab06..3dfeed1cde 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -25,7 +25,7 @@ jobs: steps: - uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 - id: changed - uses: tj-actions/changed-files@e9772d140489982e0e3704fea5ee93d536f1e275 + uses: tj-actions/changed-files@48d8f15b2aaa3d255ca5af3eba4870f807ce6b3c with: files: | .codecov.yml diff --git a/.github/workflows/fuzzers.yml b/.github/workflows/fuzzers.yml index 34fe29bdbe..07d4842deb 100644 --- a/.github/workflows/fuzzers.yml +++ b/.github/workflows/fuzzers.yml @@ -32,7 +32,7 @@ jobs: - run: apt update && apt install -y jo - uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 - run: git config --global --add safe.directory "$PWD" # actions/runner#2033 - - uses: tj-actions/changed-files@e9772d140489982e0e3704fea5ee93d536f1e275 + - uses: tj-actions/changed-files@48d8f15b2aaa3d255ca5af3eba4870f807ce6b3c id: changed-files - name: list changed crates id: list-changed diff --git a/.github/workflows/markdown.yml b/.github/workflows/markdown.yml index 66cc431415..207aa0678b 100644 --- a/.github/workflows/markdown.yml +++ b/.github/workflows/markdown.yml @@ -15,6 +15,6 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 - - uses: DavidAnson/markdownlint-cli2-action@b4c9feab76d8025d1e83c653fa3990936df0e6c8 + - uses: DavidAnson/markdownlint-cli2-action@db43aef879112c3119a410d69f66701e0d530809 with: globs: "**/*.md" diff --git a/.github/workflows/pr.yml b/.github/workflows/pr.yml index e880a1118d..6ecd4e609c 100644 --- a/.github/workflows/pr.yml +++ b/.github/workflows/pr.yml @@ -18,20 +18,20 @@ jobs: steps: - uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 - id: build - uses: tj-actions/changed-files@e9772d140489982e0e3704fea5ee93d536f1e275 + uses: tj-actions/changed-files@48d8f15b2aaa3d255ca5af3eba4870f807ce6b3c with: files: | .github/workflows/pr.yml justfile Dockerfile - id: actions - uses: tj-actions/changed-files@e9772d140489982e0e3704fea5ee93d536f1e275 + uses: tj-actions/changed-files@48d8f15b2aaa3d255ca5af3eba4870f807ce6b3c with: files: | .github/workflows/** .devcontainer/* - id: cargo - uses: tj-actions/changed-files@e9772d140489982e0e3704fea5ee93d536f1e275 + uses: tj-actions/changed-files@48d8f15b2aaa3d255ca5af3eba4870f807ce6b3c with: files_ignore: "Cargo.toml" files: | @@ -40,7 +40,7 @@ jobs: if: steps.cargo.outputs.any_changed == 'true' run: ./.github/list-crates.sh ${{ steps.cargo.outputs.all_changed_files }} - id: rust - uses: tj-actions/changed-files@e9772d140489982e0e3704fea5ee93d536f1e275 + uses: tj-actions/changed-files@48d8f15b2aaa3d255ca5af3eba4870f807ce6b3c with: files: | **/*.rs diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index af76725181..4329ac229b 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -84,7 +84,7 @@ jobs: if: github.event_name == 'pull_request' - id: changed if: github.event_name == 'pull_request' - uses: tj-actions/changed-files@e9772d140489982e0e3704fea5ee93d536f1e275 + uses: tj-actions/changed-files@48d8f15b2aaa3d255ca5af3eba4870f807ce6b3c with: files: | .github/workflows/release.yml diff --git a/Cargo.lock b/Cargo.lock index 745a5adc25..7164fe91ce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -53,9 +53,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.86" +version = "1.0.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da" +checksum = "86fdf8605db99b54d3cd748a44c6d04df638eb5dafb219b135d0149bd0db01f6" [[package]] name = "arbitrary" @@ -296,15 +296,15 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.7.1" +version = "1.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50" +checksum = "428d9aa8fbc0670b7b8d6030a7fadd0f86151cae55e4dbbece15f3780a3dfaf3" [[package]] name = "cc" -version = "1.1.18" +version = "1.1.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b62ac837cdb5cb22e10a256099b4fc502b1dfe560cb282963a974d7abd80e476" +checksum = "07b1695e2c7e8fc85310cde85aeaab7e3097f593c91d209d3f9df76c928100f0" dependencies = [ "jobserver", "libc", @@ -1865,6 +1865,7 @@ dependencies = [ "linkerd-http-route", "linkerd-proxy-api-resolve", "linkerd-proxy-core", + "linkerd-tls-route", "linkerd2-proxy-api", "maplit", "once_cell", @@ -2214,6 +2215,18 @@ dependencies = [ "untrusted", ] +[[package]] +name = "linkerd-tls-route" +version = "0.1.0" +dependencies = [ + "linkerd-dns", + "linkerd-tls", + "rand", + "regex", + "thiserror", + "tracing", +] + [[package]] name = "linkerd-tls-test-util" version = "0.1.0" @@ -2412,9 +2425,9 @@ checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" [[package]] name = "memmap2" -version = "0.9.4" +version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe751422e4a8caa417e13c3ea66452215d7d63e19e604f4980461212f3ae1322" +checksum = "fd3f7eed9d3848f8b98834af67102b720745c4ec028fcd0aa0239277e7de374f" dependencies = [ "libc", ] @@ -2562,9 +2575,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.19.0" +version = "1.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +checksum = "33ea5043e58958ee56f3e15a90aee535795cd7dfd319846288d93c5b57d85cbe" [[package]] name = "opencensus-proto" @@ -3000,9 +3013,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.36" +version = "0.38.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f55e80d50763938498dd5ebb18647174e0c76dc38c5505294bb224624f30f36" +checksum = "8acb788b847c24f28525660c4d7758620a7210875711f79e7f663cc152726811" dependencies = [ "bitflags 2.4.2", "errno", @@ -3205,9 +3218,9 @@ checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" [[package]] name = "symbolic-common" -version = "12.11.0" +version = "12.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c1db5ac243c7d7f8439eb3b8f0357888b37cf3732957e91383b0ad61756374e" +checksum = "9fdf97c441f18a4f92425b896a4ec7a27e03631a0b1047ec4e34e9916a9a167e" dependencies = [ "debugid", "memmap2", @@ -3217,9 +3230,9 @@ dependencies = [ [[package]] name = "symbolic-demangle" -version = "12.11.0" +version = "12.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea26e430c27d4a8a5dea4c4b81440606c7c1a415bd611451ef6af8c81416afc3" +checksum = "bc8ece6b129e97e53d1fbb3f61d33a6a9e5369b11d01228c068094d6d134eaea" dependencies = [ "cpp_demangle", "rustc-demangle", @@ -3652,15 +3665,15 @@ checksum = "08f95100a766bf4f8f28f90d77e0a5461bbdb219042e7679bebe79004fed8d75" [[package]] name = "unicode-ident" -version = "1.0.12" +version = "1.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" +checksum = "e91b56cd4cadaeb79bbf1a5645f6b4f8dc5bde8834ad5894a8db35fda9efa1fe" [[package]] name = "unicode-normalization" -version = "0.1.23" +version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a56d1686db2308d901306f92a263857ef59ea39678a5458e7cb17f01415101f5" +checksum = "5033c97c4262335cded6d6fc3e5c18ab755e1a3dc96376350f3d8e9f009ad956" dependencies = [ "tinyvec", ] diff --git a/Cargo.toml b/Cargo.toml index 21bbbeab2e..d20bccfbb6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -71,6 +71,7 @@ members = [ "linkerd/tonic-stream", "linkerd/tonic-watch", "linkerd/tls", + "linkerd/tls/route", "linkerd/tls/test-util", "linkerd/tracing", "linkerd/transport-header", diff --git a/linkerd/app/core/src/config.rs b/linkerd/app/core/src/config.rs index a5d2f45ebc..4791238dbc 100644 --- a/linkerd/app/core/src/config.rs +++ b/linkerd/app/core/src/config.rs @@ -2,7 +2,7 @@ pub use crate::exp_backoff::ExponentialBackoff; use crate::{ proxy::http::{self, h1, h2}, svc::{queue, CloneParam, ExtractParam, Param}, - transport::{DualListenAddr, Keepalive, ListenAddr}, + transport::{DualListenAddr, Keepalive, ListenAddr, UserTimeout}, }; use std::time::Duration; @@ -10,6 +10,7 @@ use std::time::Duration; pub struct ServerConfig { pub addr: DualListenAddr, pub keepalive: Keepalive, + pub user_timeout: UserTimeout, pub http2: h2::ServerParams, } @@ -18,6 +19,7 @@ pub struct ConnectConfig { pub backoff: ExponentialBackoff, pub timeout: Duration, pub keepalive: Keepalive, + pub user_timeout: UserTimeout, pub http1: h1::PoolSettings, pub http2: h2::ClientParams, } @@ -84,3 +86,9 @@ impl Param for ServerConfig { self.keepalive } } + +impl Param for ServerConfig { + fn param(&self) -> UserTimeout { + self.user_timeout + } +} diff --git a/linkerd/app/core/src/control.rs b/linkerd/app/core/src/control.rs index 856c0fedc2..92685c8527 100644 --- a/linkerd/app/core/src/control.rs +++ b/linkerd/app/core/src/control.rs @@ -124,13 +124,16 @@ impl Config { } }; - let client = svc::stack(ConnectTcp::new(self.connect.keepalive)) - .push(tls::Client::layer(identity)) - .push_connect_timeout(self.connect.timeout) - .push_map_target(|(_version, target)| target) - .push(self::client::layer(self.connect.http2)) - .push_on_service(svc::MapErr::layer_boxed()) - .into_new_service(); + let client = svc::stack(ConnectTcp::new( + self.connect.keepalive, + self.connect.user_timeout, + )) + .push(tls::Client::layer(identity)) + .push_connect_timeout(self.connect.timeout) + .push_map_target(|(_version, target)| target) + .push(self::client::layer(self.connect.http2)) + .push_on_service(svc::MapErr::layer_boxed()) + .into_new_service(); let endpoint = client // Ensure that connection is driven independently of the load diff --git a/linkerd/app/inbound/src/lib.rs b/linkerd/app/inbound/src/lib.rs index 95e03ee7e3..88ec18afb8 100644 --- a/linkerd/app/inbound/src/lib.rs +++ b/linkerd/app/inbound/src/lib.rs @@ -201,6 +201,7 @@ impl Inbound<()> { // forwarding and HTTP proxying). let ConnectConfig { ref keepalive, + ref user_timeout, ref timeout, .. } = config.proxy.connect; @@ -209,7 +210,7 @@ impl Inbound<()> { #[error("inbound connection must not target port {0}")] struct Loop(u16); - svc::stack(transport::ConnectTcp::new(*keepalive)) + svc::stack(transport::ConnectTcp::new(*keepalive, *user_timeout)) // Limits the time we wait for a connection to be established. .push_connect_timeout(*timeout) // Prevent connections that would target the inbound proxy port from looping. diff --git a/linkerd/app/inbound/src/test_util.rs b/linkerd/app/inbound/src/test_util.rs index efd962300a..fb28be8c24 100644 --- a/linkerd/app/inbound/src/test_util.rs +++ b/linkerd/app/inbound/src/test_util.rs @@ -10,7 +10,7 @@ use linkerd_app_core::{ http::{h1, h2}, tap, }, - transport::{DualListenAddr, Keepalive}, + transport::{DualListenAddr, Keepalive, UserTimeout}, ProxyRuntime, }; pub use linkerd_app_test as support; @@ -59,10 +59,12 @@ pub fn default_config() -> Config { server: config::ServerConfig { addr: DualListenAddr(([0, 0, 0, 0], 0).into(), None), keepalive: Keepalive(None), + user_timeout: UserTimeout(None), http2: h2::ServerParams::default(), }, connect: config::ConnectConfig { keepalive: Keepalive(None), + user_timeout: UserTimeout(None), timeout: Duration::from_secs(1), backoff: exp_backoff::ExponentialBackoff::try_new( Duration::from_millis(100), diff --git a/linkerd/app/integration/src/proxy.rs b/linkerd/app/integration/src/proxy.rs index 9df84451dc..fd7caa2d28 100644 --- a/linkerd/app/integration/src/proxy.rs +++ b/linkerd/app/integration/src/proxy.rs @@ -1,7 +1,9 @@ use super::*; use linkerd_app_core::{ svc::Param, - transport::{listen, orig_dst, Keepalive, ListenAddr, Local, OrigDstAddr, ServerAddr}, + transport::{ + listen, orig_dst, Keepalive, ListenAddr, Local, OrigDstAddr, ServerAddr, UserTimeout, + }, Result, }; use std::{collections::HashSet, thread}; @@ -68,7 +70,7 @@ struct MockDualOrigDst { impl listen::Bind for MockOrigDst where - T: Param + Param, + T: Param + Param + Param, { type Addrs = orig_dst::Addrs; type BoundAddrs = Local; @@ -118,7 +120,7 @@ impl fmt::Debug for MockOrigDst { impl listen::Bind for MockDualOrigDst where - T: Param + Param, + T: Param + Param + Param, { type Addrs = orig_dst::Addrs; type BoundAddrs = (Local, Option>); diff --git a/linkerd/app/outbound/src/tcp/connect.rs b/linkerd/app/outbound/src/tcp/connect.rs index 16f4944e1f..1b1aa6066a 100644 --- a/linkerd/app/outbound/src/tcp/connect.rs +++ b/linkerd/app/outbound/src/tcp/connect.rs @@ -21,7 +21,10 @@ pub struct PreventLoopback(S); impl Outbound<()> { pub fn to_tcp_connect(&self) -> Outbound> { - let connect = PreventLoopback(ConnectTcp::new(self.config.proxy.connect.keepalive)); + let connect = PreventLoopback(ConnectTcp::new( + self.config.proxy.connect.keepalive, + self.config.proxy.connect.user_timeout, + )); self.clone().with_stack(connect) } } diff --git a/linkerd/app/outbound/src/test_util.rs b/linkerd/app/outbound/src/test_util.rs index 5fe07522d6..f00bf44b86 100644 --- a/linkerd/app/outbound/src/test_util.rs +++ b/linkerd/app/outbound/src/test_util.rs @@ -7,7 +7,7 @@ use linkerd_app_core::{ http::{h1, h2}, tap, }, - transport::{DualListenAddr, Keepalive}, + transport::{DualListenAddr, Keepalive, UserTimeout}, IpMatch, IpNet, ProxyRuntime, }; pub use linkerd_app_test as support; @@ -26,10 +26,12 @@ pub(crate) fn default_config() -> Config { server: config::ServerConfig { addr: DualListenAddr(([0, 0, 0, 0], 0).into(), None), keepalive: Keepalive(None), + user_timeout: UserTimeout(None), http2: h2::ServerParams::default(), }, connect: config::ConnectConfig { keepalive: Keepalive(None), + user_timeout: UserTimeout(None), timeout: Duration::from_secs(1), backoff: exp_backoff::ExponentialBackoff::try_new( Duration::from_millis(100), diff --git a/linkerd/app/src/env.rs b/linkerd/app/src/env.rs index 290e1da0b7..49e00a88ba 100644 --- a/linkerd/app/src/env.rs +++ b/linkerd/app/src/env.rs @@ -5,7 +5,7 @@ use linkerd_app_core::{ control::{Config as ControlConfig, ControlAddr}, proxy::http::{h1, h2}, tls, - transport::{DualListenAddr, Keepalive, ListenAddr}, + transport::{DualListenAddr, Keepalive, ListenAddr, UserTimeout}, AddrMatch, Conditional, IpNet, }; use std::{ @@ -129,6 +129,12 @@ const ENV_OUTBOUND_ACCEPT_KEEPALIVE: &str = "LINKERD2_PROXY_OUTBOUND_ACCEPT_KEEP const ENV_INBOUND_CONNECT_KEEPALIVE: &str = "LINKERD2_PROXY_INBOUND_CONNECT_KEEPALIVE"; const ENV_OUTBOUND_CONNECT_KEEPALIVE: &str = "LINKERD2_PROXY_OUTBOUND_CONNECT_KEEPALIVE"; +const ENV_INBOUND_ACCEPT_USER_TIMEOUT: &str = "LINKERD2_PROXY_INBOUND_ACCEPT_USER_TIMEOUT"; +const ENV_OUTBOUND_ACCEPT_USER_TIMEOUT: &str = "LINKERD2_PROXY_OUTBOUND_ACCEPT_USER_TIMEOUT"; + +const ENV_INBOUND_CONNECT_USER_TIMEOUT: &str = "LINKERD2_PROXY_INBOUND_CONNECT_USER_TIMEOUT"; +const ENV_OUTBOUND_CONNECT_USER_TIMEOUT: &str = "LINKERD2_PROXY_OUTBOUND_CONNECT_USER_TIMEOUT"; + const ENV_INBOUND_MAX_IDLE_CONNS_PER_ENDPOINT: &str = "LINKERD2_PROXY_MAX_IDLE_CONNS_PER_ENDPOINT"; const ENV_OUTBOUND_MAX_IDLE_CONNS_PER_ENDPOINT: &str = "LINKERD2_PROXY_OUTBOUND_MAX_IDLE_CONNS_PER_ENDPOINT"; @@ -374,6 +380,16 @@ pub fn parse_config(strings: &S) -> Result let inbound_connect_keepalive = parse(strings, ENV_INBOUND_CONNECT_KEEPALIVE, parse_duration); let outbound_connect_keepalive = parse(strings, ENV_OUTBOUND_CONNECT_KEEPALIVE, parse_duration); + let inbound_accept_user_timeout = + parse(strings, ENV_INBOUND_ACCEPT_USER_TIMEOUT, parse_duration); + let outbound_accept_user_timeout = + parse(strings, ENV_OUTBOUND_ACCEPT_USER_TIMEOUT, parse_duration); + + let inbound_connect_user_timeout = + parse(strings, ENV_INBOUND_CONNECT_USER_TIMEOUT, parse_duration); + let outbound_connect_user_timeout = + parse(strings, ENV_OUTBOUND_CONNECT_USER_TIMEOUT, parse_duration); + let shutdown_grace_period = parse(strings, ENV_SHUTDOWN_GRACE_PERIOD, parse_duration); let inbound_discovery_idle_timeout = @@ -477,9 +493,11 @@ pub fn parse_config(strings: &S) -> Result }; let keepalive = Keepalive(outbound_accept_keepalive?); + let user_timeout = UserTimeout(outbound_accept_user_timeout?); let server = ServerConfig { addr, keepalive, + user_timeout, http2: http2::parse_server(strings, "LINKERD2_PROXY_OUTBOUND_SERVER_HTTP2")?, }; let discovery_idle_timeout = @@ -487,6 +505,7 @@ pub fn parse_config(strings: &S) -> Result let max_idle = outbound_max_idle_per_endpoint?.unwrap_or(DEFAULT_OUTBOUND_MAX_IDLE_CONNS_PER_ENDPOINT); let keepalive = Keepalive(outbound_connect_keepalive?); + let user_timeout = UserTimeout(outbound_connect_user_timeout?); let connection_pool_timeout = parse( strings, ENV_OUTBOUND_HTTP1_CONNECTION_POOL_IDLE_TIMEOUT, @@ -495,6 +514,7 @@ pub fn parse_config(strings: &S) -> Result let connect = ConnectConfig { keepalive, + user_timeout, timeout: outbound_connect_timeout?.unwrap_or(DEFAULT_OUTBOUND_CONNECT_TIMEOUT), backoff: parse_backoff( strings, @@ -565,9 +585,11 @@ pub fn parse_config(strings: &S) -> Result None, ); let keepalive = Keepalive(inbound_accept_keepalive?); + let user_timeout = UserTimeout(inbound_accept_user_timeout?); let server = ServerConfig { addr, keepalive, + user_timeout, http2: http2::parse_server(strings, "LINKERD2_PROXY_INBOUND_SERVER_HTTP2")?, }; let discovery_idle_timeout = @@ -581,8 +603,10 @@ pub fn parse_config(strings: &S) -> Result )? .unwrap_or(DEFAULT_INBOUND_HTTP1_CONNECTION_POOL_IDLE_TIMEOUT); let keepalive = Keepalive(inbound_connect_keepalive?); + let user_timeout = UserTimeout(inbound_connect_user_timeout?); let connect = ConnectConfig { keepalive, + user_timeout, timeout: inbound_connect_timeout?.unwrap_or(DEFAULT_INBOUND_CONNECT_TIMEOUT), backoff: parse_backoff( strings, @@ -769,6 +793,7 @@ pub fn parse_config(strings: &S) -> Result server: ServerConfig { addr: DualListenAddr(admin_listener_addr, None), keepalive: inbound.proxy.server.keepalive, + user_timeout: inbound.proxy.server.user_timeout, http2: inbound.proxy.server.http2.clone(), }, @@ -829,6 +854,7 @@ pub fn parse_config(strings: &S) -> Result config: ServerConfig { addr: DualListenAddr(addr, None), keepalive: inbound.proxy.server.keepalive, + user_timeout: inbound.proxy.server.user_timeout, http2: inbound.proxy.server.http2.clone(), }, }) diff --git a/linkerd/meshtls/tests/util.rs b/linkerd/meshtls/tests/util.rs index c294ecfecc..7b7d6592fa 100644 --- a/linkerd/meshtls/tests/util.rs +++ b/linkerd/meshtls/tests/util.rs @@ -11,7 +11,7 @@ use linkerd_meshtls as meshtls; use linkerd_proxy_transport::{ addrs::*, listen::{Addrs, Bind, BindTcp}, - ConnectTcp, Keepalive, + ConnectTcp, Keepalive, UserTimeout, }; use linkerd_stack::{ layer::Layer, service_fn, ExtractParam, InsertParam, NewService, Param, ServiceExt, @@ -283,7 +283,7 @@ where let tls = Some(client_server_id.clone()); let client = async move { let conn = tls::Client::layer(client_tls) - .layer(ConnectTcp::new(Keepalive(None))) + .layer(ConnectTcp::new(Keepalive(None), UserTimeout(None))) .oneshot(Target(server_addr.into(), client_server_id)) .await; match conn { @@ -406,6 +406,11 @@ impl Param for Server { Keepalive(None) } } +impl Param for Server { + fn param(&self) -> UserTimeout { + UserTimeout(None) + } +} // === impl ServerParams === diff --git a/linkerd/proxy/client-policy/Cargo.toml b/linkerd/proxy/client-policy/Cargo.toml index 9e6f5042d0..63bf7ab0e5 100644 --- a/linkerd/proxy/client-policy/Cargo.toml +++ b/linkerd/proxy/client-policy/Cargo.toml @@ -26,6 +26,7 @@ thiserror = { version = "1", optional = true } linkerd-error = { path = "../../error" } linkerd-exp-backoff = { path = "../../exp-backoff" } linkerd-http-route = { path = "../../http/route" } +linkerd-tls-route = { path = "../../tls/route" } linkerd-proxy-api-resolve = { path = "../api-resolve" } linkerd-proxy-core = { path = "../core" } diff --git a/linkerd/proxy/client-policy/src/lib.rs b/linkerd/proxy/client-policy/src/lib.rs index 8835eed33d..5cbde717a5 100644 --- a/linkerd/proxy/client-policy/src/lib.rs +++ b/linkerd/proxy/client-policy/src/lib.rs @@ -7,6 +7,7 @@ use std::{borrow::Cow, fmt, hash::Hash, net::SocketAddr, num::NonZeroU16, sync:: pub mod grpc; pub mod http; pub mod opaq; +pub mod tls; pub use linkerd_http_route as route; pub use linkerd_proxy_api_resolve::Metadata as EndpointMetadata; @@ -34,8 +35,7 @@ pub enum Protocol { Opaque(opaq::Opaque), - // TODO(ver) TLS-aware type - Tls(opaq::Opaque), + Tls(tls::Tls), } #[derive(Clone, Debug, Eq)] @@ -497,7 +497,10 @@ pub mod proto { | Protocol::Http2(http::Http2 { ref routes, .. }) => { http::proto::fill_route_backends(routes, &mut backends); } - Protocol::Opaque(ref p) | Protocol::Tls(ref p) => { + Protocol::Opaque(ref p) => { + p.fill_backends(&mut backends); + } + Protocol::Tls(ref p) => { p.fill_backends(&mut backends); } Protocol::Grpc(ref p) => { diff --git a/linkerd/proxy/client-policy/src/tls.rs b/linkerd/proxy/client-policy/src/tls.rs new file mode 100644 index 0000000000..cafc61445d --- /dev/null +++ b/linkerd/proxy/client-policy/src/tls.rs @@ -0,0 +1,55 @@ +use linkerd_tls_route as tls; +use std::sync::Arc; + +pub use linkerd_tls_route::{find, sni, RouteMatch}; + +pub type Policy = crate::RoutePolicy; +pub type Route = tls::Route; +pub type Rule = tls::Rule; + +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub struct Tls { + pub routes: Arc<[Route]>, +} + +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub enum Filter {} + +pub fn default(distribution: crate::RouteDistribution) -> Route { + Route { + snis: vec![], + rules: vec![Rule { + matches: vec![], + policy: Policy { + meta: crate::Meta::new_default("default"), + filters: Arc::new([]), + params: (), + distribution, + }, + }], + } +} + +impl Default for Tls { + fn default() -> Self { + Self { + routes: Arc::new([]), + } + } +} + +#[cfg(feature = "proto")] +pub mod proto { + use super::*; + use crate::proto::BackendSet; + + impl Tls { + pub fn fill_backends(&self, set: &mut BackendSet) { + for Route { ref rules, .. } in &*self.routes { + for Rule { ref policy, .. } in rules { + policy.distribution.fill_backends(set); + } + } + } + } +} diff --git a/linkerd/proxy/transport/src/connect.rs b/linkerd/proxy/transport/src/connect.rs index 4989ac2129..d11b3753c6 100644 --- a/linkerd/proxy/transport/src/connect.rs +++ b/linkerd/proxy/transport/src/connect.rs @@ -1,4 +1,4 @@ -use crate::{ClientAddr, Keepalive, Local, Remote, ServerAddr}; +use crate::{ClientAddr, Keepalive, Local, Remote, ServerAddr, UserTimeout}; use linkerd_io as io; use linkerd_stack::{Param, Service}; use std::{ @@ -12,11 +12,15 @@ use tracing::debug; #[derive(Copy, Clone, Debug)] pub struct ConnectTcp { keepalive: Keepalive, + user_timeout: UserTimeout, } impl ConnectTcp { - pub fn new(keepalive: Keepalive) -> Self { - Self { keepalive } + pub fn new(keepalive: Keepalive, user_timeout: UserTimeout) -> Self { + Self { + keepalive, + user_timeout, + } } } @@ -31,12 +35,14 @@ impl>> Service for ConnectTcp { fn call(&mut self, t: T) -> Self::Future { let Keepalive(keepalive) = self.keepalive; + let UserTimeout(user_timeout) = self.user_timeout; let Remote(ServerAddr(addr)) = t.param(); debug!(server.addr = %addr, "Connecting"); Box::pin(async move { let io = TcpStream::connect(&addr).await?; super::set_nodelay_or_warn(&io); let io = super::set_keepalive_or_warn(io, keepalive)?; + let io = super::set_user_timeout_or_warn(io, user_timeout)?; let local_addr = io.local_addr()?; debug!( local.addr = %local_addr, diff --git a/linkerd/proxy/transport/src/lib.rs b/linkerd/proxy/transport/src/lib.rs index 655c554ccc..b23bceadf7 100644 --- a/linkerd/proxy/transport/src/lib.rs +++ b/linkerd/proxy/transport/src/lib.rs @@ -36,6 +36,15 @@ impl From for Option { } } +#[derive(Copy, Clone, Debug, Default)] +pub struct UserTimeout(pub Option); + +impl From for Option { + fn from(UserTimeout(duration): UserTimeout) -> Option { + duration + } +} + // Misc. fn set_nodelay_or_warn(socket: &TcpStream) { @@ -61,3 +70,18 @@ fn set_keepalive_or_warn( let stream: std::net::TcpStream = socket2::Socket::into(sock); tokio::net::TcpStream::from_std(stream) } + +fn set_user_timeout_or_warn( + tcp: TcpStream, + user_timeout: Option, +) -> io::Result { + let sock = { + let stream = tokio::net::TcpStream::into_std(tcp)?; + socket2::Socket::from(stream) + }; + if let Err(e) = sock.set_tcp_user_timeout(user_timeout) { + tracing::warn!("failed to set user timeout: {}", e); + } + let stream: std::net::TcpStream = socket2::Socket::into(sock); + tokio::net::TcpStream::from_std(stream) +} diff --git a/linkerd/proxy/transport/src/listen.rs b/linkerd/proxy/transport/src/listen.rs index 1109a7f913..fe2031a38b 100644 --- a/linkerd/proxy/transport/src/listen.rs +++ b/linkerd/proxy/transport/src/listen.rs @@ -1,6 +1,6 @@ mod dual_bind; -use crate::{addrs::*, Keepalive}; +use crate::{addrs::*, Keepalive, UserTimeout}; use dual_bind::DualBind; use futures::prelude::*; use linkerd_error::Result; @@ -49,6 +49,10 @@ struct AcceptError(#[source] io::Error); #[error("failed to set TCP keepalive: {0}")] struct KeepaliveError(#[source] io::Error); +#[derive(Debug, Error)] +#[error("failed to set TCP User Timeout: {0}")] +struct UserTimeoutError(#[source] io::Error); + #[derive(Debug, Error)] #[error("failed to obtain peer address: {0}")] struct PeerAddrError(#[source] io::Error); @@ -67,7 +71,7 @@ impl BindTcp { impl Bind for BindTcp where - T: Param + Param, + T: Param + Param + Param, { type Addrs = Addrs; type BoundAddrs = Local; @@ -84,10 +88,13 @@ where }; let server = Local(ServerAddr(listen.local_addr()?)); let Keepalive(keepalive) = params.param(); + let UserTimeout(user_timeout) = params.param(); let accept = TcpListenerStream::new(listen).map(move |res| { let tcp = res.map_err(AcceptError)?; super::set_nodelay_or_warn(&tcp); let tcp = super::set_keepalive_or_warn(tcp, keepalive).map_err(KeepaliveError)?; + let tcp = + super::set_user_timeout_or_warn(tcp, user_timeout).map_err(UserTimeoutError)?; fn ipv4_mapped(orig: SocketAddr) -> SocketAddr { if let SocketAddr::V6(v6) = orig { diff --git a/linkerd/proxy/transport/src/listen/dual_bind.rs b/linkerd/proxy/transport/src/listen/dual_bind.rs index 6f7d238e28..f9dde45625 100644 --- a/linkerd/proxy/transport/src/listen/dual_bind.rs +++ b/linkerd/proxy/transport/src/listen/dual_bind.rs @@ -1,4 +1,4 @@ -use crate::{addrs::DualListenAddr, listen::Bind, Keepalive, ListenAddr}; +use crate::{addrs::DualListenAddr, listen::Bind, Keepalive, ListenAddr, UserTimeout}; use futures::Stream; use linkerd_error::Result; use linkerd_stack::Param; @@ -26,7 +26,7 @@ impl From for DualBind { impl Bind for DualBind where - T: Param + Param + Clone, + T: Param + Param + Param + Clone, B: Bind, Io = TcpStream> + Clone + 'static, { type Addrs = B::Addrs; @@ -62,6 +62,12 @@ impl> Param for Listen { } } +impl> Param for Listen { + fn param(&self) -> UserTimeout { + self.parent.param() + } +} + impl Param for Listen { fn param(&self) -> ListenAddr { ListenAddr(self.addr) diff --git a/linkerd/service-profiles/Cargo.toml b/linkerd/service-profiles/Cargo.toml index 4baef313ac..89311dc73a 100644 --- a/linkerd/service-profiles/Cargo.toml +++ b/linkerd/service-profiles/Cargo.toml @@ -15,7 +15,7 @@ futures = { version = "0.3", default-features = false } http = "0.2" http-body = "0.4" linkerd2-proxy-api = { workspace = true, features = ["destination"] } -once_cell = "1.17" +once_cell = "1.20" prost-types = "0.12" regex = "1" tokio = { version = "1", features = ["macros", "rt", "sync", "time"] } diff --git a/linkerd/tls/route/Cargo.toml b/linkerd/tls/route/Cargo.toml new file mode 100644 index 0000000000..278e3d8c4d --- /dev/null +++ b/linkerd/tls/route/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "linkerd-tls-route" +version = "0.1.0" +license = "Apache-2.0" +edition = "2021" +publish = false + +[dependencies] +regex = "1" +rand = "0.8" +thiserror = "1" +tracing = "0.1" +linkerd-tls = { path = "../" } +linkerd-dns = { path = "../../dns" } diff --git a/linkerd/tls/route/src/lib.rs b/linkerd/tls/route/src/lib.rs new file mode 100644 index 0000000000..e93493bc4d --- /dev/null +++ b/linkerd/tls/route/src/lib.rs @@ -0,0 +1,106 @@ +//! An TLS route matching library for Linkerd to support the TLSRoute +//! Kubernetes Gateway API types. + +#![deny(rust_2018_idioms, clippy::disallowed_methods, clippy::disallowed_types)] +#![forbid(unsafe_code)] + +use linkerd_tls::ServerName; +use r#match::SessionMatch; +use tracing::trace; + +pub mod r#match; +pub mod sni; +#[cfg(test)] +mod tests; + +pub use self::sni::{InvalidSni, MatchSni, SniMatch}; + +/// Groups routing rules under a common set of SNIs. +#[derive(Clone, Debug, Hash, PartialEq, Eq)] +pub struct Route

{ + /// A list of SNIs that this route applies to, to be matched against, + /// + /// If at least one match is specified, any match may apply for rules to applied. + /// When no SNI matches are present, all SNIs match. + pub snis: Vec, + + /// Must not be empty. + pub rules: Vec>, +} + +/// Policies for a given set of route matches. +#[derive(Clone, Debug, Default, Hash, PartialEq, Eq)] +pub struct Rule

{ + /// A list of session matchers, *any* of which may apply. + /// + /// The "best" match is used when comparing rules. + pub matches: Vec, + + /// The policy to apply to sessions matched by this rule. + pub policy: P, +} + +/// Summarizes a matched route so that route matches may be compared/ordered. A +/// greater match is preferred over a lesser match. +#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord, Default)] +pub struct RouteMatch { + sni: Option, + route: r#match::SessionMatch, +} + +/// Provides metadata information about a TLS session. For now this contains +/// only the SNI value but further down the line, we could add more metadata +/// if want to support more advanced routing scenarios. +#[derive(Clone, Debug, Hash, PartialEq, Eq)] +pub struct SessionInfo { + pub sni: ServerName, +} + +pub fn find

(routes: &[Route

], session_info: SessionInfo) -> Option<(RouteMatch, &P)> { + trace!(routes = ?routes.len(), "Finding matching route"); + + best(routes.iter().filter_map(|rt| { + trace!(snis = ?rt.snis); + let sni = if rt.snis.is_empty() { + None + } else { + let session_sni = &session_info.sni; + trace!(%session_sni, "matching sni"); + let sni_match = rt + .snis + .iter() + .filter_map(|a| a.summarize_match(session_sni)) + .max()?; + Some(sni_match) + }; + + trace!(rules = %rt.rules.len()); + let (route, policy) = best(rt.rules.iter().filter_map(|rule| { + // If there are no matches in the list, then the rule has an + // implicit default match. + if rule.matches.is_empty() { + trace!("implicit match"); + return Some((SessionMatch::default(), &rule.policy)); + } + // Find the best match to compare against other rules/routes + // (if any apply). The order/precedence of matches is not + // relevant. + let summary = rule + .matches + .iter() + .filter_map(|m| m.match_session(&session_info)) + .max()?; + trace!("matches!"); + Some((summary, &rule.policy)) + }))?; + + Some((RouteMatch { sni, route }, policy)) + })) +} + +#[inline] +fn best(matches: impl Iterator) -> Option<(M, P)> { + // This is roughly equivalent to `max_by(...)` but we want to ensure + // that the first match wins. + matches.reduce(|(m0, p0), (m1, p1)| if m0 >= m1 { (m0, p0) } else { (m1, p1) }) +} diff --git a/linkerd/tls/route/src/match.rs b/linkerd/tls/route/src/match.rs new file mode 100644 index 0000000000..e29672ab3e --- /dev/null +++ b/linkerd/tls/route/src/match.rs @@ -0,0 +1,29 @@ +use std::cmp::Ordering; + +use crate::SessionInfo; + +/// Matches TLS sessions. For now, this is a placeholder +#[derive(Clone, Debug, Default, Hash, PartialEq, Eq)] +pub struct MatchSession(()); + +/// Summarizes a matched TLS session. For now this is a placeholder +#[derive(Clone, Debug, Hash, PartialEq, Eq, Default)] +pub struct SessionMatch(()); + +impl MatchSession { + pub(crate) fn match_session(&self, _: &SessionInfo) -> Option { + Some(SessionMatch::default()) + } +} + +impl std::cmp::PartialOrd for SessionMatch { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl std::cmp::Ord for SessionMatch { + fn cmp(&self, _: &Self) -> std::cmp::Ordering { + Ordering::Equal + } +} diff --git a/linkerd/tls/route/src/sni.rs b/linkerd/tls/route/src/sni.rs new file mode 100644 index 0000000000..099c2d7e0e --- /dev/null +++ b/linkerd/tls/route/src/sni.rs @@ -0,0 +1,191 @@ +use linkerd_dns as dns; +use linkerd_tls::ServerName; + +/// Defines a way to match against SNI attributes of the TLS ClientHello +/// message in a TLS handshake. The SNI value being matched is the equivalent +/// of a hostname (as defined in RFC 1123) with 2 notable exceptions: +/// +/// 1. IPs are not allowed in SNI names per RFC 6066. +/// 2. A hostname may be prefixed with a wildcard label (`*.`). The wildcard +/// label must appear by itself as the first label. +#[derive(Clone, Debug, Hash, PartialEq, Eq)] +pub enum MatchSni { + Exact(String), + + /// Tokenized reverse list of DNS name suffix labels. + /// + /// For example: the match `*.example.com` is stored as `["com", + /// "example"]`. + Suffix(Vec), +} + +#[derive(Clone, Debug, Hash, PartialEq, Eq)] +pub enum SniMatch { + Exact(usize), + Suffix(usize), +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq, thiserror::Error)] +pub enum InvalidSni { + #[error("invalid sni: {0}")] + Invalid(#[from] dns::InvalidName), +} + +// === impl MatchSni === + +impl std::str::FromStr for MatchSni { + type Err = InvalidSni; + + fn from_str(sni: &str) -> Result { + if let Some(sni) = sni.strip_prefix("*.") { + return Ok(Self::Suffix( + sni.split('.').map(|s| s.to_string()).rev().collect(), + )); + } + + Ok(Self::Exact(sni.to_string())) + } +} + +impl MatchSni { + pub fn summarize_match(&self, sni: &ServerName) -> Option { + let mut sni = sni.as_str(); + + match self { + Self::Exact(h) => { + if !h.ends_with('.') { + sni = sni.strip_suffix('.').unwrap_or(sni); + } + if h == sni { + Some(SniMatch::Exact(h.len())) + } else { + None + } + } + + Self::Suffix(suffix) => { + if suffix.first().map(|s| &**s) != Some("") { + sni = sni.strip_suffix('.').unwrap_or(sni); + } + let mut length = 0; + for sfx in suffix.iter() { + sni = sni.strip_suffix(sfx)?; + sni = sni.strip_suffix('.')?; + length += sfx.len() + 1; + } + + Some(SniMatch::Suffix(length)) + } + } + } +} + +// === impl SniMatch === + +impl std::cmp::PartialOrd for SniMatch { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl std::cmp::Ord for SniMatch { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + use std::cmp::Ordering; + match (self, other) { + (Self::Exact(l), Self::Exact(r)) => l.cmp(r), + (Self::Suffix(l), Self::Suffix(r)) => l.cmp(r), + (Self::Exact(_), Self::Suffix(_)) => Ordering::Greater, + (Self::Suffix(_), Self::Exact(_)) => Ordering::Less, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn exact() { + let m = "example.com" + .parse::() + .expect("example.com parses"); + assert_eq!(m, MatchSni::Exact("example.com".to_string())); + assert_eq!( + m.summarize_match(&"example.com".parse().unwrap()), + Some(SniMatch::Exact("example.com".len())) + ); + assert_eq!( + m.summarize_match(&"example.com.".parse().unwrap()), + Some(SniMatch::Exact("example.com".len())) + ); + assert_eq!(m.summarize_match(&"foo.example.com".parse().unwrap()), None); + + let m = "example.com." + .parse::() + .expect("example.com parses"); + assert_eq!(m, MatchSni::Exact("example.com.".to_string())); + assert_eq!(m.summarize_match(&"example.com".parse().unwrap()), None,); + assert_eq!( + m.summarize_match(&"example.com.".parse().unwrap()), + Some(SniMatch::Exact("example.com.".len())) + ); + } + + #[test] + fn suffix() { + let m = "*.example.com" + .parse::() + .expect("*.example.com parses"); + assert_eq!( + m, + MatchSni::Suffix(vec!["com".to_string(), "example".to_string()]) + ); + + assert_eq!(m.summarize_match(&"example.com".parse().unwrap()), None); + assert_eq!( + m.summarize_match(&"foo.example.com".parse().unwrap()), + Some(SniMatch::Suffix(".example.com".len())) + ); + assert_eq!( + m.summarize_match(&"foo.example.com".parse().unwrap()), + Some(SniMatch::Suffix(".example.com".len())) + ); + assert_eq!( + m.summarize_match(&"bar.foo.example.com".parse().unwrap()), + Some(SniMatch::Suffix(".example.com".len())) + ); + + let m = "*.example.com." + .parse::() + .expect("*.example.com. parses"); + assert_eq!( + m, + MatchSni::Suffix(vec![ + "".to_string(), + "com".to_string(), + "example".to_string() + ]) + ); + assert_eq!( + m.summarize_match(&"bar.foo.example.com".parse().unwrap()), + None + ); + assert_eq!( + m.summarize_match(&"bar.foo.example.com.".parse().unwrap()), + Some(SniMatch::Suffix(".example.com.".len())) + ); + } + + #[test] + fn cmp() { + assert!(SniMatch::Exact("example.com".len()) > SniMatch::Suffix(".example.com".len())); + assert!(SniMatch::Exact("foo.example.com".len()) > SniMatch::Exact("example.com".len())); + assert!( + SniMatch::Suffix(".foo.example.com".len()) > SniMatch::Suffix(".example.com".len()) + ); + assert_eq!( + SniMatch::Suffix(".foo.example.com".len()), + SniMatch::Suffix(".bar.example.com".len()) + ); + } +} diff --git a/linkerd/tls/route/src/tests.rs b/linkerd/tls/route/src/tests.rs new file mode 100644 index 0000000000..193df9c3d0 --- /dev/null +++ b/linkerd/tls/route/src/tests.rs @@ -0,0 +1,115 @@ +use super::*; + +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub enum Policy { + Expected, + Unexpected, +} + +impl Default for Policy { + fn default() -> Self { + Self::Unexpected + } +} + +/// Given two equivalent routes, choose the explicit sni match and not +/// the wildcard. +#[test] +fn sni_precedence() { + let rts = vec![ + Route { + snis: vec!["*.example.com".parse().unwrap()], + rules: vec![Rule { + policy: Policy::Unexpected, + matches: vec![], + }], + }, + Route { + snis: vec!["foo.example.com".parse().unwrap()], + rules: vec![Rule { + policy: Policy::Expected, + matches: vec![], + }], + }, + ]; + + let si = SessionInfo { + sni: "foo.example.com".parse().expect("must parse"), + }; + + let (_, policy) = find(&rts, si).expect("must match"); + assert_eq!(*policy, Policy::Expected, "incorrect rule matched"); +} + +#[test] +fn first_identical_wins() { + let rts = vec![ + Route { + rules: vec![ + Rule { + policy: Policy::Expected, + matches: vec![], + }, + // Redundant rule. + Rule::default(), + ], + snis: vec![], + }, + // Redundant route. + Route { + rules: vec![Rule::default()], + snis: vec![], + }, + ]; + + let si = SessionInfo { + sni: "api.github.io".parse().expect("must parse"), + }; + + let (_, policy) = find(&rts, si).expect("must match"); + assert_eq!(*policy, Policy::Expected, "incorrect rule matched"); +} + +#[test] +fn no_match_suffix() { + let rts = vec![Route { + snis: vec!["*.test.example.com".parse().unwrap()], + rules: vec![Rule { + policy: Policy::Unexpected, + matches: vec![], + }], + }]; + + let si = SessionInfo { + sni: "test.example.com".parse().expect("must parse"), + }; + + assert!(find(&rts, si).is_none(), "should have no matches"); +} + +#[test] +fn no_match_exact() { + let rts = vec![Route { + snis: vec!["test.example.com".parse().unwrap()], + rules: vec![Rule { + policy: Policy::Unexpected, + matches: vec![], + }], + }]; + + let si = SessionInfo { + sni: "fest.example.com".parse().expect("must parse"), + }; + + assert!(find(&rts, si).is_none(), "should have no matches"); +} + +#[test] +fn no_routes_no_match() { + let rts: Vec> = Vec::default(); + let si = SessionInfo { + sni: "fest.example.com".parse().expect("must parse"), + }; + + assert!(find(&rts, si).is_none(), "should have no matches"); +}