From b0f0a307e2f5a3773de6b85edbb82065bd461294 Mon Sep 17 00:00:00 2001 From: Diva M Date: Tue, 2 Jul 2024 11:40:17 -0500 Subject: [PATCH] feat: QUIC Address discovery extension add frame types transport parameter encoding and decoding - to my best understanding fix typo add some utility functions add initial observed address frames add frame encoding and decoding adjust stats minimal debugging for received observed addr simplify setting extension in transport parameter rework frame structure and send observed addr frames with path challenge ones tweak example to start testing fix encoding, send with handshake clippy fix docs reject observed addr frames when not negotiated replace request_id with seq_no according to new spec replace code point for transport parameter replace code point for frames remove sending observed address frame in handshake in server side treat as probing frame in payload processing ack is already managed by is_ack_eliciting send with path_response as well add frame to retransmits and ignore old frames send observed addr at least once per path fmt reword comment remove trailing whites keep observed address reports per path remove addressed TODO small improvement in readability add retransmission with fresh info retransmit just once fix should send logic add observed addr event surface the info restore trace level of frames some extra logs rename roles and var improve error msg assuming the default as disabled is ok, remove comment use safe arithmetic with varints for the seq_no move transport param code to method instead of From impl fix example, finally remove excesive log add helper fn carry old report into new path generate notification only on changed values downgrade log add sending test add resumption test on the acceptance case add resumption test on the rejection case spelling actual spelling and undo debug change dumb lints some spelling and formatting add retransmission test make a bit more readable update hexas make naming consistent, add test check docs for consistency --- quinn-proto/src/address_discovery.rs | 128 +++++++++++ quinn-proto/src/config.rs | 28 +++ quinn-proto/src/connection/mod.rs | 126 ++++++++++- quinn-proto/src/connection/paths.rs | 48 ++++- quinn-proto/src/connection/spaces.rs | 3 + quinn-proto/src/connection/stats.rs | 2 + quinn-proto/src/frame.rs | 114 ++++++++++ quinn-proto/src/lib.rs | 2 + quinn-proto/src/tests/mod.rs | 274 ++++++++++++++++++++++++ quinn-proto/src/transport_parameters.rs | 30 +++ quinn-proto/src/varint.rs | 24 +++ quinn/Cargo.toml | 1 + quinn/examples/client.rs | 21 +- quinn/examples/server.rs | 20 +- quinn/src/connection.rs | 17 +- 15 files changed, 828 insertions(+), 10 deletions(-) create mode 100644 quinn-proto/src/address_discovery.rs diff --git a/quinn-proto/src/address_discovery.rs b/quinn-proto/src/address_discovery.rs new file mode 100644 index 000000000..85b31b667 --- /dev/null +++ b/quinn-proto/src/address_discovery.rs @@ -0,0 +1,128 @@ +//! Address discovery types from +//! + +use crate::VarInt; + +pub(crate) const TRANSPORT_PARAMETER_CODE: u64 = 0x9f81a176; + +/// The role of each participant. +/// +/// When enabled, this is reported as a transport parameter. +#[derive(PartialEq, Eq, Clone, Copy, Debug, Default)] +pub(crate) enum Role { + /// Is able to report observer addresses to other peers, but it's not interested in receiving + /// reports about its own address. + SendOnly, + /// Is interested on reports about its own observed address, but will not report back to other + /// peers. + ReceiveOnly, + /// Will both report and receive reports of observed addresses. + Both, + /// Address discovery is disabled. + #[default] + Disabled, +} + +impl TryFrom for Role { + type Error = crate::transport_parameters::Error; + + fn try_from(value: VarInt) -> Result { + match value.0 { + 0 => Ok(Self::SendOnly), + 1 => Ok(Self::ReceiveOnly), + 2 => Ok(Self::Both), + _ => Err(crate::transport_parameters::Error::IllegalValue), + } + } +} + +impl Role { + /// Whether address discovery is disabled. + pub(crate) fn is_disabled(&self) -> bool { + matches!(self, Self::Disabled) + } + + /// Whether this peer's role allows for address reporting to other peers. + fn is_reporter(&self) -> bool { + matches!(self, Self::SendOnly | Self::Both) + } + + /// Whether this peer's role accepts observed address reports. + fn receives_reports(&self) -> bool { + matches!(self, Self::ReceiveOnly | Self::Both) + } + + /// Whether this peer should report observed addresses to the other peer. + pub(crate) fn should_report(&self, other: &Self) -> bool { + self.is_reporter() && other.receives_reports() + } + + /// Sets whether this peer should provide observed addresses to other peers. + pub(crate) fn send_reports_to_peers(&mut self, provide: bool) { + if provide { + self.enable_sending_reports_to_peers() + } else { + self.disable_sending_reports_to_peers() + } + } + + /// Enables sending reports of observed addresses to other peers. + fn enable_sending_reports_to_peers(&mut self) { + match self { + Self::SendOnly => {} // already enabled + Self::ReceiveOnly => *self = Self::Both, + Self::Both => {} // already enabled + Self::Disabled => *self = Self::SendOnly, + } + } + + /// Disables sending reports of observed addresses to other peers. + fn disable_sending_reports_to_peers(&mut self) { + match self { + Self::SendOnly => *self = Self::Disabled, + Self::ReceiveOnly => {} // already disabled + Self::Both => *self = Self::ReceiveOnly, + Self::Disabled => {} // already disabled + } + } + + /// Sets whether this peer should accept received reports of observed addresses from other + /// peers. + pub(crate) fn receive_reports_from_peers(&mut self, receive: bool) { + if receive { + self.enable_receiving_reports_from_peers() + } else { + self.disable_receiving_reports_from_peers() + } + } + + /// Enables receiving reports of observed addresses from other peers. + fn enable_receiving_reports_from_peers(&mut self) { + match self { + Self::SendOnly => *self = Self::Both, + Self::ReceiveOnly => {} // already enabled + Self::Both => {} // already enabled + Self::Disabled => *self = Self::ReceiveOnly, + } + } + + /// Disables receiving reports of observed addresses from other peers. + fn disable_receiving_reports_from_peers(&mut self) { + match self { + Self::SendOnly => {} // already disabled + Self::ReceiveOnly => *self = Self::Disabled, + Self::Both => *self = Self::SendOnly, + Self::Disabled => {} // already disabled + } + } + + /// Gives the [`VarInt`] representing this [`Role`] as a transport parameter. + pub(crate) fn as_transport_parameter(&self) -> Option { + match self { + Self::SendOnly => Some(VarInt(0)), + Self::ReceiveOnly => Some(VarInt(1)), + Self::Both => Some(VarInt(2)), + Self::Disabled => None, + } + } +} diff --git a/quinn-proto/src/config.rs b/quinn-proto/src/config.rs index 571f998dd..5dc0204fe 100644 --- a/quinn-proto/src/config.rs +++ b/quinn-proto/src/config.rs @@ -14,6 +14,7 @@ use thiserror::Error; #[cfg(any(feature = "rustls-aws-lc-rs", feature = "rustls-ring"))] use crate::crypto::rustls::{configured_provider, QuicServerConfig}; use crate::{ + address_discovery, cid_generator::{ConnectionIdGenerator, HashedConnectionIdGenerator}, congestion, crypto::{self, HandshakeTokenKey, HmacKey}, @@ -63,6 +64,8 @@ pub struct TransportConfig { pub(crate) congestion_controller_factory: Arc, pub(crate) enable_segmentation_offload: bool, + + pub(crate) address_discovery_role: crate::address_discovery::Role, } impl TransportConfig { @@ -334,6 +337,27 @@ impl TransportConfig { self.enable_segmentation_offload = enabled; self } + + /// Whether to send observed address reports to peers. + /// + /// This will aid peers in inferring their reachable address, which in most NATd networks + /// will not be easily available to them. + pub fn send_observed_address_reports(&mut self, enabled: bool) -> &mut Self { + self.address_discovery_role.send_reports_to_peers(enabled); + self + } + + /// Whether to receive observed address reports from other peers. + /// + /// Peers with the address discovery extension enabled that are willing to provide observed + /// address reports will do so if this transport parameter is set. In general, observed address + /// reports cannot be trusted. This, however, can aid the current endpoint in inferring its + /// reachable address, which in most NATd networks will not be easily available. + pub fn receive_observed_address_reports(&mut self, enabled: bool) -> &mut Self { + self.address_discovery_role + .receive_reports_from_peers(enabled); + self + } } impl Default for TransportConfig { @@ -374,6 +398,8 @@ impl Default for TransportConfig { congestion_controller_factory: Arc::new(congestion::CubicConfig::default()), enable_segmentation_offload: true, + + address_discovery_role: address_discovery::Role::default(), } } } @@ -405,6 +431,7 @@ impl fmt::Debug for TransportConfig { deterministic_packet_numbers: _, congestion_controller_factory: _, enable_segmentation_offload, + address_discovery_role, } = self; fmt.debug_struct("TransportConfig") .field("max_concurrent_bidi_streams", max_concurrent_bidi_streams) @@ -432,6 +459,7 @@ impl fmt::Debug for TransportConfig { .field("datagram_send_buffer_size", datagram_send_buffer_size) .field("congestion_controller_factory", &"[ opaque ]") .field("enable_segmentation_offload", enable_segmentation_offload) + .field("address_discovery_role", address_discovery_role) .finish() } } diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index 6986d6314..aa1521b56 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -19,7 +19,7 @@ use crate::{ coding::BufMutExt, config::{ServerConfig, TransportConfig}, crypto::{self, KeyPair, Keys, PacketKey}, - frame::{self, Close, Datagram, FrameStruct}, + frame::{self, Close, Datagram, FrameStruct, ObservedAddr}, packet::{ FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet, PacketNumber, PartialDecode, SpaceId, @@ -226,6 +226,12 @@ pub struct Connection { /// no outgoing application data. app_limited: bool, + // + // ObservedAddr + // + /// Sequence number for the next observed address frame sent to the peer. + next_observed_addr_seq_no: VarInt, + streams: StreamsState, /// Surplus remote CIDs for future use on new paths rem_cids: CidQueue, @@ -345,6 +351,8 @@ impl Connection { receiving_ecn: false, total_authed_packets: 0, + next_observed_addr_seq_no: 0u32.into(), + streams: StreamsState::new( side, config.max_concurrent_uni_streams, @@ -2624,6 +2632,9 @@ impl Connection { let mut close = None; let payload_len = payload.len(); let mut ack_eliciting = false; + // if this packet triggers a path migration and includes a observed address frame, it's + // stored here + let mut migration_observed_addr = None; for result in frame::Iter::new(payload)? { let frame = result?; let span = match frame { @@ -2667,7 +2678,8 @@ impl Connection { Frame::Padding | Frame::PathChallenge(_) | Frame::PathResponse(_) - | Frame::NewConnectionId(_) => {} + | Frame::NewConnectionId(_) + | Frame::ObservedAddr(_) => {} _ => { is_probing_packet = false; } @@ -2895,6 +2907,33 @@ impl Connection { self.discard_space(now, SpaceId::Handshake); } } + Frame::ObservedAddr(observed) => { + // check if params allows the peer to send report and this node to receive it + if !self + .peer_params + .address_discovery_role + .should_report(&self.config.address_discovery_role) + { + return Err(TransportError::PROTOCOL_VIOLATION( + "received OBSERVED_ADDRESS frame when not negotiated", + )); + } + // must only be sent in data space + if packet.header.space() != SpaceId::Data { + return Err(TransportError::PROTOCOL_VIOLATION( + "OBSERVED_ADDRESS frame outside data space", + )); + } + + if remote == self.path.remote { + if let Some(updated) = self.path.update_observed_addr_report(observed) { + self.events.push_back(Event::ObservedAddr(updated)); + } + } else { + // include in migration + migration_observed_addr = Some(observed) + } + } } } @@ -2931,7 +2970,7 @@ impl Connection { .migration, "migration-initiating packets should have been dropped immediately" ); - self.migrate(now, remote); + self.migrate(now, remote, migration_observed_addr); // Break linkability, if possible self.update_rem_cid(); self.spin = false; @@ -2940,7 +2979,7 @@ impl Connection { Ok(()) } - fn migrate(&mut self, now: Instant, remote: SocketAddr) { + fn migrate(&mut self, now: Instant, remote: SocketAddr, observed_addr: Option) { trace!(%remote, "migration initiated"); // Reset rtt/congestion state for new path unless it looks like a NAT rebinding. // Note that the congestion window will not grow until validation terminates. Helps mitigate @@ -2960,6 +2999,12 @@ impl Connection { &self.config, ) }; + new_path.last_observed_addr_report = self.path.last_observed_addr_report.clone(); + if let Some(report) = observed_addr { + if let Some(updated) = new_path.update_observed_addr_report(report) { + self.events.push_back(Event::ObservedAddr(updated)); + } + } new_path.challenge = Some(self.rng.gen()); new_path.challenge_pending = true; let prev_pto = self.pto(SpaceId::Data); @@ -3044,6 +3089,53 @@ impl Connection { self.stats.frame_tx.handshake_done.saturating_add(1); } + // OBSERVED_ADDR + let mut send_observed_address = + |space_id: SpaceId, + buf: &mut Vec, + max_size: usize, + space: &mut PacketSpace, + sent: &mut SentFrames, + stats: &mut ConnectionStats, + skip_sent_check: bool| { + // should only be sent within Data space and only if allowed by extension + // negotiation + // send is also skipped if the path has already sent an observed address + let send_allowed = self + .config + .address_discovery_role + .should_report(&self.peer_params.address_discovery_role); + let send_required = + space.pending.observed_addr || !self.path.observed_addr_sent || skip_sent_check; + if space_id != SpaceId::Data || !send_allowed || !send_required { + return; + } + + let observed = + frame::ObservedAddr::new(self.path.remote, self.next_observed_addr_seq_no); + + if buf.len() + observed.size() < max_size { + observed.write(buf); + + self.next_observed_addr_seq_no = + self.next_observed_addr_seq_no.saturating_add(1u8); + self.path.observed_addr_sent = true; + + stats.frame_tx.observed_addr += 1; + sent.retransmits.get_or_create().observed_addr = true; + space.pending.observed_addr = false; + } + }; + send_observed_address( + space_id, + buf, + max_size, + space, + &mut sent, + &mut self.stats, + false, + ); + // PING if mem::replace(&mut space.ping_pending, false) { trace!("PING"); @@ -3113,7 +3205,16 @@ impl Connection { trace!("PATH_CHALLENGE {:08x}", token); buf.write(frame::FrameType::PATH_CHALLENGE); buf.write(token); - self.stats.frame_tx.path_challenge += 1; + + send_observed_address( + space_id, + buf, + max_size, + space, + &mut sent, + &mut self.stats, + true, + ); } } @@ -3126,6 +3227,19 @@ impl Connection { buf.write(frame::FrameType::PATH_RESPONSE); buf.write(token); self.stats.frame_tx.path_response += 1; + + // NOTE: this is technically not required but might be useful to ride the + // request/response nature of path challenges to refresh an observation + // Since PATH_RESPONSE is a probing frame, this is allowed by the spec. + send_observed_address( + space_id, + buf, + max_size, + space, + &mut sent, + &mut self.stats, + true, + ); } } @@ -3746,6 +3860,8 @@ pub enum Event { DatagramReceived, /// One or more application datagrams have been sent after blocking DatagramsUnblocked, + /// Received an observation of our external address from the peer. + ObservedAddr(SocketAddr), } fn instant_saturating_sub(x: Instant, y: Instant) -> Duration { diff --git a/quinn-proto/src/connection/paths.rs b/quinn-proto/src/connection/paths.rs index 2c0476c06..9f7a4e0b9 100644 --- a/quinn-proto/src/connection/paths.rs +++ b/quinn-proto/src/connection/paths.rs @@ -7,7 +7,10 @@ use super::{ pacing::Pacer, spaces::{PacketSpace, SentPacket}, }; -use crate::{congestion, packet::SpaceId, Duration, Instant, TransportConfig, TIMER_GRANULARITY}; +use crate::{ + congestion, frame::ObservedAddr, packet::SpaceId, Duration, Instant, TransportConfig, + TIMER_GRANULARITY, +}; /// Description of a particular network path pub(super) struct PathData { @@ -37,6 +40,11 @@ pub(super) struct PathData { /// Used in persistent congestion determination. pub(super) first_packet_after_rtt_sample: Option<(SpaceId, u64)>, pub(super) in_flight: InFlight, + /// Whether this path has had it's remote address reported back to the peer. This only happens + /// if both peers agree to so based on their transport parameters. + pub(super) observed_addr_sent: bool, + /// Observed address frame with the largest sequence number received from the peer on this path. + pub(super) last_observed_addr_report: Option, /// Number of the first packet sent on this path /// /// Used to determine whether a packet was sent on an earlier path. Insufficient to determine if @@ -90,10 +98,15 @@ impl PathData { ), first_packet_after_rtt_sample: None, in_flight: InFlight::new(), + observed_addr_sent: false, + last_observed_addr_report: None, first_packet: None, } } + /// Create a new path from a previous one. + /// + /// This should only be called when migrating paths. pub(super) fn from_previous(remote: SocketAddr, prev: &Self, now: Instant) -> Self { let congestion = prev.congestion.clone_box(); let smoothed_rtt = prev.rtt.get(); @@ -111,6 +124,8 @@ impl PathData { mtud: prev.mtud.clone(), first_packet_after_rtt_sample: prev.first_packet_after_rtt_sample, in_flight: InFlight::new(), + observed_addr_sent: false, + last_observed_addr_report: None, first_packet: None, } } @@ -156,6 +171,37 @@ impl PathData { self.in_flight.remove(packet); true } + + /// Updates the last observed address report received on this path. + /// + /// If the address was updated, it's returned to be informed to the application. + #[must_use = "updated observed address must be reported to the application"] + pub(super) fn update_observed_addr_report( + &mut self, + observed: ObservedAddr, + ) -> Option { + match self.last_observed_addr_report.as_mut() { + Some(prev) => { + if prev.seq_no >= observed.seq_no { + // frames that do not increase the sequence number on this path are ignored + None + } else if prev.ip == observed.ip && prev.port == observed.port { + // keep track of the last seq_no but do not report the address as updated + prev.seq_no = observed.seq_no; + None + } else { + let addr = observed.socket_addr(); + self.last_observed_addr_report = Some(observed); + Some(addr) + } + } + None => { + let addr = observed.socket_addr(); + self.last_observed_addr_report = Some(observed); + Some(addr) + } + } + } } /// RTT estimation for a particular network path diff --git a/quinn-proto/src/connection/spaces.rs b/quinn-proto/src/connection/spaces.rs index a04fbd3d7..3832582fb 100644 --- a/quinn-proto/src/connection/spaces.rs +++ b/quinn-proto/src/connection/spaces.rs @@ -309,6 +309,7 @@ pub struct Retransmits { pub(super) retire_cids: Vec, pub(super) ack_frequency: bool, pub(super) handshake_done: bool, + pub(super) observed_addr: bool, } impl Retransmits { @@ -326,6 +327,7 @@ impl Retransmits { && self.retire_cids.is_empty() && !self.ack_frequency && !self.handshake_done + && !self.observed_addr } } @@ -347,6 +349,7 @@ impl ::std::ops::BitOrAssign for Retransmits { self.retire_cids.extend(rhs.retire_cids); self.ack_frequency |= rhs.ack_frequency; self.handshake_done |= rhs.handshake_done; + self.observed_addr |= rhs.observed_addr; } } diff --git a/quinn-proto/src/connection/stats.rs b/quinn-proto/src/connection/stats.rs index 9ddb42d1a..31f5f1d14 100644 --- a/quinn-proto/src/connection/stats.rs +++ b/quinn-proto/src/connection/stats.rs @@ -53,6 +53,7 @@ pub struct FrameStats { pub streams_blocked_uni: u64, pub stop_sending: u64, pub stream: u64, + pub observed_addr: u64, } impl FrameStats { @@ -93,6 +94,7 @@ impl FrameStats { Frame::AckFrequency(_) => self.ack_frequency += 1, Frame::ImmediateAck => self.immediate_ack += 1, Frame::HandshakeDone => self.handshake_done = self.handshake_done.saturating_add(1), + Frame::ObservedAddr(_) => self.observed_addr += 1, } } } diff --git a/quinn-proto/src/frame.rs b/quinn-proto/src/frame.rs index 0bc7f34ad..915721c7f 100644 --- a/quinn-proto/src/frame.rs +++ b/quinn-proto/src/frame.rs @@ -1,6 +1,7 @@ use std::{ fmt::{self, Write}, io, mem, + net::{IpAddr, SocketAddr}, ops::{Range, RangeInclusive}, }; @@ -134,6 +135,9 @@ frame_types! { ACK_FREQUENCY = 0xaf, IMMEDIATE_ACK = 0x1f, // DATAGRAM + // ADDRESS DISCOVERY REPORT + OBSERVED_IPV4_ADDR = 0x9f81a6, + OBSERVED_IPV6_ADDR = 0x9f81a7, } const STREAM_TYS: RangeInclusive = RangeInclusive::new(0x08, 0x0f); @@ -164,6 +168,7 @@ pub(crate) enum Frame { AckFrequency(AckFrequency), ImmediateAck, HandshakeDone, + ObservedAddr(ObservedAddr), } impl Frame { @@ -205,6 +210,7 @@ impl Frame { AckFrequency(_) => FrameType::ACK_FREQUENCY, ImmediateAck => FrameType::IMMEDIATE_ACK, HandshakeDone => FrameType::HANDSHAKE_DONE, + ObservedAddr(ref observed) => observed.get_type(), } } @@ -687,6 +693,11 @@ impl Iter { reordering_threshold: self.bytes.get()?, }), FrameType::IMMEDIATE_ACK => Frame::ImmediateAck, + FrameType::OBSERVED_IPV4_ADDR | FrameType::OBSERVED_IPV6_ADDR => { + let is_ipv6 = ty == FrameType::OBSERVED_IPV6_ADDR; + let observed = ObservedAddr::read(&mut self.bytes, is_ipv6)?; + Frame::ObservedAddr(observed) + } _ => { if let Some(s) = ty.stream() { Frame::Stream(Stream { @@ -929,8 +940,86 @@ impl AckFrequency { } } +/* Address Discovery https://datatracker.ietf.org/doc/draft-seemann-quic-address-discovery/ */ + +/// Conjuction of the information contained in the address discovery frames +/// ([`FrameType::OBSERVED_IPV4_ADDR`], [`FrameType::OBSERVED_IPV6_ADDR`]). +#[derive(Debug, PartialEq, Eq, Clone)] +pub(crate) struct ObservedAddr { + /// Monotonically increasing integer within the same connection. + pub(crate) seq_no: VarInt, + /// Reported observed address. + pub(crate) ip: IpAddr, + /// Reported observed port. + pub(crate) port: u16, +} + +impl ObservedAddr { + pub(crate) fn new>(remote: std::net::SocketAddr, seq_no: N) -> Self { + Self { + ip: remote.ip(), + port: remote.port(), + seq_no: seq_no.into(), + } + } + + /// Get the [`FrameType`] for this frame. + pub(crate) fn get_type(&self) -> FrameType { + if self.ip.is_ipv6() { + FrameType::OBSERVED_IPV6_ADDR + } else { + FrameType::OBSERVED_IPV4_ADDR + } + } + + /// Compute the number of bytes needed to encode the frame. + pub(crate) fn size(&self) -> usize { + let type_size = VarInt(self.get_type().0).size(); + let req_id_bytes = self.seq_no.size(); + let ip_bytes = if self.ip.is_ipv6() { 16 } else { 4 }; + let port_bytes = 2; + type_size + req_id_bytes + ip_bytes + port_bytes + } + + /// Unconditionally write this frame to `buf`. + pub(crate) fn write(&self, buf: &mut W) { + buf.write(self.get_type()); + buf.write(self.seq_no); + match self.ip { + IpAddr::V4(ipv4_addr) => { + buf.write(ipv4_addr); + } + IpAddr::V6(ipv6_addr) => { + buf.write(ipv6_addr); + } + } + buf.write::(self.port); + } + + /// Reads the frame contents from the buffer. + /// + /// Should only be called when the fram type has been identified as + /// [`FrameType::OBSERVED_IPV4_ADDR`] or [`FrameType::OBSERVED_IPV6_ADDR`]. + pub(crate) fn read(bytes: &mut R, is_ipv6: bool) -> coding::Result { + let seq_no = bytes.get()?; + let ip = if is_ipv6 { + IpAddr::V6(bytes.get()?) + } else { + IpAddr::V4(bytes.get()?) + }; + let port = bytes.get()?; + Ok(Self { seq_no, ip, port }) + } + + /// Gives the [`SocketAddr`] reported in the frame. + pub(crate) fn socket_addr(&self) -> SocketAddr { + (self.ip, self.port).into() + } +} + #[cfg(test)] mod test { + use super::*; use crate::coding::Codec; use assert_matches::assert_matches; @@ -996,4 +1085,29 @@ mod test { assert_eq!(frames.len(), 1); assert_matches!(&frames[0], Frame::ImmediateAck); } + + /// Test that encoding and decoding [`ObservedAddr`] produces the same result. + #[test] + fn test_observed_addr_roundrip() { + let observed_addr = ObservedAddr { + seq_no: VarInt(42), + ip: std::net::Ipv4Addr::LOCALHOST.into(), + port: 4242, + }; + let mut buf = Vec::with_capacity(observed_addr.size()); + observed_addr.write(&mut buf); + + assert_eq!( + observed_addr.size(), + buf.len(), + "expected written bytes and actual size differ" + ); + + let mut decoded = frames(buf); + assert_eq!(decoded.len(), 1); + match decoded.pop().expect("non empty") { + Frame::ObservedAddr(decoded) => assert_eq!(decoded, observed_addr), + x => panic!("incorrect frame {x:?}"), + } + } } diff --git a/quinn-proto/src/lib.rs b/quinn-proto/src/lib.rs index 79eddc827..225e4eb93 100644 --- a/quinn-proto/src/lib.rs +++ b/quinn-proto/src/lib.rs @@ -88,6 +88,8 @@ pub use crate::cid_generator::{ mod token; use token::{ResetToken, RetryToken}; +mod address_discovery; + #[cfg(feature = "arbitrary")] use arbitrary::Arbitrary; diff --git a/quinn-proto/src/tests/mod.rs b/quinn-proto/src/tests/mod.rs index cb39b351f..98438d342 100644 --- a/quinn-proto/src/tests/mod.rs +++ b/quinn-proto/src/tests/mod.rs @@ -3185,6 +3185,280 @@ fn voluntary_ack_with_large_datagrams() { ); } +/// Test the address discovery extension on a normal setup. +#[test] +fn address_discovery() { + let _guard = subscribe(); + + let server = ServerConfig { + transport: Arc::new(TransportConfig { + address_discovery_role: crate::address_discovery::Role::Both, + ..TransportConfig::default() + }), + ..server_config() + }; + let mut pair = Pair::new(Default::default(), server); + let client_config = ClientConfig { + transport: Arc::new(TransportConfig { + address_discovery_role: crate::address_discovery::Role::Both, + ..TransportConfig::default() + }), + ..client_config() + }; + let conn_handle = pair.begin_connect(client_config); + + // wait for idle connections + pair.drive(); + + // check that the client received the correct address + let expected_addr = pair.client.addr; + let conn = pair.client_conn_mut(conn_handle); + assert_matches!(conn.poll(), Some(Event::HandshakeDataReady)); + assert_matches!(conn.poll(), Some(Event::Connected)); + assert_matches!(conn.poll(), Some(Event::ObservedAddr(addr)) if addr == expected_addr); + assert_matches!(conn.poll(), None); + + // check that the server received the correct address + let conn_handle = pair.server.assert_accept(); + let expected_addr = pair.server.addr; + let conn = pair.server_conn_mut(conn_handle); + assert_matches!(conn.poll(), Some(Event::HandshakeDataReady)); + assert_matches!(conn.poll(), Some(Event::Connected)); + assert_matches!(conn.poll(), Some(Event::ObservedAddr(addr)) if addr == expected_addr); + assert_matches!(conn.poll(), None); +} + +/// Test that a different address discovery configuration on 0rtt used by the client is accepted by +/// the server. +/// NOTE: this test is the same as zero_rtt_happypath, changing client transport parameters on +/// resumption. +#[test] +fn address_discovery_zero_rtt_accepted() { + let _guard = subscribe(); + let server = ServerConfig { + transport: Arc::new(TransportConfig { + address_discovery_role: crate::address_discovery::Role::Both, + ..TransportConfig::default() + }), + ..server_config() + }; + let mut pair = Pair::new(Default::default(), server); + + pair.server.incoming_connection_behavior = IncomingConnectionBehavior::Validate; + let client_cfg = ClientConfig { + transport: Arc::new(TransportConfig { + address_discovery_role: crate::address_discovery::Role::Both, + ..TransportConfig::default() + }), + ..client_config() + }; + let alt_client_cfg = ClientConfig { + transport: Arc::new(TransportConfig { + address_discovery_role: crate::address_discovery::Role::Disabled, + ..TransportConfig::default() + }), + ..client_cfg.clone() + }; + + // Establish normal connection + let client_ch = pair.begin_connect(client_cfg); + pair.drive(); + pair.server.assert_accept(); + pair.client + .connections + .get_mut(&client_ch) + .unwrap() + .close(pair.time, VarInt(0), [][..].into()); + pair.drive(); + + pair.client.addr = SocketAddr::new( + Ipv6Addr::LOCALHOST.into(), + CLIENT_PORTS.lock().unwrap().next().unwrap(), + ); + info!("resuming session"); + let client_ch = pair.begin_connect(alt_client_cfg); + assert!(pair.client_conn_mut(client_ch).has_0rtt()); + let s = pair.client_streams(client_ch).open(Dir::Uni).unwrap(); + const MSG: &[u8] = b"Hello, 0-RTT!"; + pair.client_send(client_ch, s).write(MSG).unwrap(); + pair.drive(); + + let conn = pair.client_conn_mut(client_ch); + assert_matches!(conn.poll(), Some(Event::HandshakeDataReady)); + assert_matches!(conn.poll(), Some(Event::Connected)); + + assert!(pair.client_conn_mut(client_ch).accepted_0rtt()); + let server_ch = pair.server.assert_accept(); + + let conn = pair.server_conn_mut(server_ch); + assert_matches!(conn.poll(), Some(Event::HandshakeDataReady)); + // We don't currently preserve stream event order wrt. connection events + assert_matches!(conn.poll(), Some(Event::Connected)); + assert_matches!( + conn.poll(), + Some(Event::Stream(StreamEvent::Opened { dir: Dir::Uni })) + ); + + let mut recv = pair.server_recv(server_ch, s); + let mut chunks = recv.read(false).unwrap(); + assert_matches!( + chunks.next(usize::MAX), + Ok(Some(chunk)) if chunk.offset == 0 && chunk.bytes == MSG + ); + let _ = chunks.finalize(); + assert_eq!(pair.client_conn_mut(client_ch).lost_packets(), 0); +} + +/// Test that a different address discovery configuration on 0rtt used by the server is rejected by +/// the client. +/// NOTE: the server MUST not change configuration on resumption. However, there is no designed +/// behaviour when this is encountered. Quinn chooses to accept and then close the connection, +/// which is what this test checks. +#[test] +fn address_discovery_zero_rtt_rejection() { + let _guard = subscribe(); + let server_cfg = ServerConfig { + transport: Arc::new(TransportConfig { + address_discovery_role: crate::address_discovery::Role::Disabled, + ..TransportConfig::default() + }), + ..server_config() + }; + let alt_server_cfg = ServerConfig { + transport: Arc::new(TransportConfig { + address_discovery_role: crate::address_discovery::Role::SendOnly, + ..TransportConfig::default() + }), + ..server_cfg.clone() + }; + let mut pair = Pair::new(Default::default(), server_cfg); + let client_cfg = ClientConfig { + transport: Arc::new(TransportConfig { + address_discovery_role: crate::address_discovery::Role::Both, + ..TransportConfig::default() + }), + ..client_config() + }; + + // Establish normal connection + let client_ch = pair.begin_connect(client_cfg.clone()); + pair.drive(); + let server_ch = pair.server.assert_accept(); + let conn = pair.server_conn_mut(server_ch); + assert_matches!(conn.poll(), Some(Event::HandshakeDataReady)); + assert_matches!(conn.poll(), Some(Event::Connected)); + assert_matches!(conn.poll(), None); + pair.client + .connections + .get_mut(&client_ch) + .unwrap() + .close(pair.time, VarInt(0), [][..].into()); + pair.drive(); + assert_matches!( + pair.server_conn_mut(server_ch).poll(), + Some(Event::ConnectionLost { .. }) + ); + assert_matches!(pair.server_conn_mut(server_ch).poll(), None); + pair.client.connections.clear(); + pair.server.connections.clear(); + + // Changing address discovery configurations makes the client close the connection + pair.server + .set_server_config(Some(Arc::new(alt_server_cfg))); + info!("resuming session"); + let client_ch = pair.begin_connect(client_cfg); + assert!(pair.client_conn_mut(client_ch).has_0rtt()); + let s = pair.client_streams(client_ch).open(Dir::Uni).unwrap(); + const MSG: &[u8] = b"Hello, 0-RTT!"; + pair.client_send(client_ch, s).write(MSG).unwrap(); + pair.drive(); + let conn = pair.client_conn_mut(server_ch); + assert_matches!(conn.poll(), Some(Event::HandshakeDataReady)); + assert_matches!( + conn.poll(), + Some(Event::ConnectionLost { reason }) if matches!(reason, ConnectionError::TransportError(_) ) + ); +} + +#[test] +fn address_discovery_retransmission() { + let _guard = subscribe(); + + let server = ServerConfig { + transport: Arc::new(TransportConfig { + address_discovery_role: crate::address_discovery::Role::Both, + ..TransportConfig::default() + }), + ..server_config() + }; + let mut pair = Pair::new(Default::default(), server); + let client_config = ClientConfig { + transport: Arc::new(TransportConfig { + address_discovery_role: crate::address_discovery::Role::Both, + ..TransportConfig::default() + }), + ..client_config() + }; + let client_ch = pair.begin_connect(client_config); + pair.step(); + + // lose the last packet + pair.client.inbound.pop_back().unwrap(); + pair.step(); + let conn = pair.client_conn_mut(client_ch); + assert_matches!(conn.poll(), Some(Event::HandshakeDataReady)); + assert_matches!(conn.poll(), Some(Event::Connected)); + assert_matches!(conn.poll(), None); + + pair.drive(); + let conn = pair.client_conn_mut(client_ch); + assert_matches!(conn.poll(), + Some(Event::ObservedAddr(addr)) if addr == pair.client.addr); +} + +#[test] +fn address_discovery_rebind_retransmission() { + let _guard = subscribe(); + + let server = ServerConfig { + transport: Arc::new(TransportConfig { + address_discovery_role: crate::address_discovery::Role::Both, + ..TransportConfig::default() + }), + ..server_config() + }; + let mut pair = Pair::new(Default::default(), server); + let client_config = ClientConfig { + transport: Arc::new(TransportConfig { + address_discovery_role: crate::address_discovery::Role::Both, + ..TransportConfig::default() + }), + ..client_config() + }; + let client_ch = pair.begin_connect(client_config); + pair.step(); + + // lose the last packet + pair.client.inbound.pop_back().unwrap(); + pair.step(); + let conn = pair.client_conn_mut(client_ch); + assert_matches!(conn.poll(), Some(Event::HandshakeDataReady)); + assert_matches!(conn.poll(), Some(Event::Connected)); + assert_matches!(conn.poll(), None); + + // simulate a rebind to ensure we will get an updated address instead of retransmitting + // outdated info + pair.client_conn_mut(client_ch).local_address_changed(); + pair.client + .addr + .set_port(pair.client.addr.port().overflowing_add(1).0); + + pair.drive(); + let conn = pair.client_conn_mut(client_ch); + assert_matches!(conn.poll(), + Some(Event::ObservedAddr(addr)) if addr == pair.client.addr); +} + #[test] fn reject_short_idcid() { let _guard = subscribe(); diff --git a/quinn-proto/src/transport_parameters.rs b/quinn-proto/src/transport_parameters.rs index c09c41f97..8d8996ef6 100644 --- a/quinn-proto/src/transport_parameters.rs +++ b/quinn-proto/src/transport_parameters.rs @@ -15,6 +15,7 @@ use bytes::{Buf, BufMut}; use thiserror::Error; use crate::{ + address_discovery, cid_generator::ConnectionIdGenerator, cid_queue::CidQueue, coding::{BufExt, BufMutExt, UnexpectedEnd}, @@ -99,6 +100,8 @@ macro_rules! make_struct { pub(crate) stateless_reset_token: Option, /// The server's preferred address for communication after handshake completion pub(crate) preferred_address: Option, + /// The role of this peer in address discovery, if any. + pub(crate) address_discovery_role: address_discovery::Role, } // We deliberately don't implement the `Default` trait, since that would be public, and @@ -120,6 +123,8 @@ macro_rules! make_struct { retry_src_cid: None, stateless_reset_token: None, preferred_address: None, + + address_discovery_role: address_discovery::Role::Disabled, } } } @@ -160,6 +165,7 @@ impl TransportParameters { min_ack_delay: Some( VarInt::from_u64(u64::try_from(TIMER_GRANULARITY.as_micros()).unwrap()).unwrap(), ), + address_discovery_role: config.address_discovery_role, ..Self::default() } } @@ -176,6 +182,7 @@ impl TransportParameters { || cached.initial_max_streams_uni > self.initial_max_streams_uni || cached.max_datagram_frame_size > self.max_datagram_frame_size || cached.grease_quic_bit && !self.grease_quic_bit + || cached.address_discovery_role != self.address_discovery_role { return Err(TransportError::PROTOCOL_VIOLATION( "0-RTT accepted with incompatible transport parameters", @@ -349,6 +356,12 @@ impl TransportParameters { w.write_var(x.size() as u64); w.write(x); } + + if let Some(varint_role) = self.address_discovery_role.as_transport_parameter() { + w.write_var(address_discovery::TRANSPORT_PARAMETER_CODE); + w.write_var(varint_role.size() as u64); + w.write(varint_role); + } } /// Decode `TransportParameters` from buffer @@ -413,6 +426,21 @@ impl TransportParameters { _ => return Err(Error::Malformed), }, 0xff04de1b => params.min_ack_delay = Some(r.get().unwrap()), + address_discovery::TRANSPORT_PARAMETER_CODE => { + if !params.address_discovery_role.is_disabled() { + // duplicate parameter + return Err(Error::Malformed); + } + let value: VarInt = r.get()?; + if len != value.size() { + return Err(Error::Malformed); + } + params.address_discovery_role = value.try_into()?; + tracing::debug!( + role = ?params.address_discovery_role, + "address discovery enabled for peer" + ); + } _ => { macro_rules! parse { {$($(#[$doc:meta])* $name:ident ($code:expr) = $default:expr,)*} => { @@ -479,6 +507,7 @@ fn decode_cid(len: usize, value: &mut Option, r: &mut impl Buf) -> #[cfg(test)] mod test { + use super::*; #[test] @@ -499,6 +528,7 @@ mod test { }), grease_quic_bit: true, min_ack_delay: Some(2_000u32.into()), + address_discovery_role: address_discovery::Role::SendOnly, ..TransportParameters::default() }; params.write(&mut buf); diff --git a/quinn-proto/src/varint.rs b/quinn-proto/src/varint.rs index a72fb3431..08022d9db 100644 --- a/quinn-proto/src/varint.rs +++ b/quinn-proto/src/varint.rs @@ -50,6 +50,14 @@ impl VarInt { self.0 } + /// Saturating integer addition. Computes self + rhs, saturating at the numeric bounds instead + /// of overflowing. + pub fn saturating_add(self, rhs: impl Into) -> Self { + let rhs = rhs.into(); + let inner = self.0.saturating_add(rhs.0).min(Self::MAX.0); + Self(inner) + } + /// Compute the number of bytes needed to encode this value pub(crate) const fn size(self) -> usize { let x = self.0; @@ -191,3 +199,19 @@ impl Codec for VarInt { } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_saturating_add() { + // add within range behaves normally + let large: VarInt = u32::MAX.into(); + let next = u64::from(u32::MAX) + 1; + assert_eq!(large.saturating_add(1u8), VarInt::from_u64(next).unwrap()); + + // outside range saturates + assert_eq!(VarInt::MAX.saturating_add(1u8), VarInt::MAX) + } +} diff --git a/quinn/Cargo.toml b/quinn/Cargo.toml index 4b020a34e..0ae4d99b1 100644 --- a/quinn/Cargo.toml +++ b/quinn/Cargo.toml @@ -71,6 +71,7 @@ tokio = { workspace = true, features = ["rt", "rt-multi-thread", "time", "macros tracing-subscriber = { workspace = true } tracing-futures = { workspace = true } url = { workspace = true } +tokio-stream = "0.1.15" [[example]] name = "server" diff --git a/quinn/examples/client.rs b/quinn/examples/client.rs index 0ace61f95..80fc3562d 100644 --- a/quinn/examples/client.rs +++ b/quinn/examples/client.rs @@ -13,7 +13,7 @@ use std::{ use anyhow::{anyhow, Result}; use clap::Parser; -use proto::crypto::rustls::QuicClientConfig; +use proto::{crypto::rustls::QuicClientConfig, TransportConfig}; use rustls::pki_types::CertificateDer; use tracing::{error, info}; use url::Url; @@ -101,8 +101,13 @@ async fn run(options: Opt) -> Result<()> { client_crypto.key_log = Arc::new(rustls::KeyLogFile::new()); } - let client_config = + let mut transport = TransportConfig::default(); + transport + .send_observed_address_reports(true) + .receive_observed_address_reports(true); + let mut client_config = quinn::ClientConfig::new(Arc::new(QuicClientConfig::try_from(client_crypto)?)); + client_config.transport_config(Arc::new(transport)); let mut endpoint = quinn::Endpoint::client(options.bind)?; endpoint.set_default_client_config(client_config); @@ -117,6 +122,18 @@ async fn run(options: Opt) -> Result<()> { .await .map_err(|e| anyhow!("failed to connect: {}", e))?; eprintln!("connected at {:?}", start.elapsed()); + let mut external_addresses = conn.observed_external_addr(); + tokio::spawn(async move { + loop { + if let Some(new_addr) = *external_addresses.borrow_and_update() { + info!(%new_addr, "new external address report"); + } + if external_addresses.changed().await.is_err() { + break; + } + } + }); + let (mut send, mut recv) = conn .open_bi() .await diff --git a/quinn/examples/server.rs b/quinn/examples/server.rs index 5f9e6f35d..ae1e6b3e2 100644 --- a/quinn/examples/server.rs +++ b/quinn/examples/server.rs @@ -127,7 +127,10 @@ async fn run(options: Opt) -> Result<()> { let mut server_config = quinn::ServerConfig::with_crypto(Arc::new(QuicServerConfig::try_from(server_crypto)?)); let transport_config = Arc::get_mut(&mut server_config.transport).unwrap(); - transport_config.max_concurrent_uni_streams(0_u8.into()); + transport_config + .max_concurrent_uni_streams(0_u8.into()) + .send_observed_address_reports(true) + .receive_observed_address_reports(true); let root = Arc::::from(options.root.clone()); if !root.exists() { @@ -176,6 +179,21 @@ async fn handle_connection(root: Arc, conn: quinn::Incoming) -> Result<()> .protocol .map_or_else(|| "".into(), |x| String::from_utf8_lossy(&x).into_owned()) ); + + let mut external_addresses = connection.observed_external_addr(); + tokio::spawn( + async move { + loop { + if let Some(new_addr) = *external_addresses.borrow_and_update() { + info!(%new_addr, "new external address report"); + } + if external_addresses.changed().await.is_err() { + break; + } + } + } + .instrument(span.clone()), + ); async { info!("established"); diff --git a/quinn/src/connection.rs b/quinn/src/connection.rs index cf09c6e67..0969f514f 100644 --- a/quinn/src/connection.rs +++ b/quinn/src/connection.rs @@ -14,7 +14,7 @@ use bytes::Bytes; use pin_project_lite::pin_project; use rustc_hash::FxHashMap; use thiserror::Error; -use tokio::sync::{futures::Notified, mpsc, oneshot, Notify}; +use tokio::sync::{futures::Notified, mpsc, oneshot, watch, Notify}; use tracing::{debug_span, Instrument, Span}; use crate::{ @@ -636,6 +636,12 @@ impl Connection { // May need to send MAX_STREAMS to make progress conn.wake(); } + + /// Track changed on our external address as reported by the peer. + pub fn observed_external_addr(&self) -> watch::Receiver> { + let conn = self.0.state.lock("external_addr"); + conn.observed_external_addr.subscribe() + } } pin_project! { @@ -892,6 +898,7 @@ impl ConnectionRef { runtime, send_buffer: Vec::new(), buffered_transmit: None, + observed_external_addr: watch::Sender::new(None), }), shared: Shared::default(), })) @@ -974,6 +981,8 @@ pub(crate) struct State { send_buffer: Vec, /// We buffer a transmit when the underlying I/O would block buffered_transmit: Option, + /// Our last external address reported by the peer. + pub(crate) observed_external_addr: watch::Sender>, } impl State { @@ -1131,6 +1140,12 @@ impl State { wake_stream(id, &mut self.stopped); wake_stream(id, &mut self.blocked_writers); } + ObservedAddr(observed) => { + self.observed_external_addr.send_if_modified(|addr| { + let old = addr.replace(observed); + old != *addr + }); + } } } }