From 2a18a64570c628d07b80ea832ef64a752c186720 Mon Sep 17 00:00:00 2001 From: Lann Martin Date: Fri, 6 Sep 2024 11:56:58 -0400 Subject: [PATCH 1/3] Make OutboundHttpInterceptor async Signed-off-by: Lann Martin --- crates/factor-outbound-http/src/lib.rs | 18 ++++--- crates/factor-outbound-http/src/wasi.rs | 68 ++++++++++++------------ crates/trigger-http/src/outbound_http.rs | 48 +++++++---------- 3 files changed, 62 insertions(+), 72 deletions(-) diff --git a/crates/factor-outbound-http/src/lib.rs b/crates/factor-outbound-http/src/lib.rs index fa39de13d..127dc7fdf 100644 --- a/crates/factor-outbound-http/src/lib.rs +++ b/crates/factor-outbound-http/src/lib.rs @@ -3,7 +3,7 @@ mod wasi; pub mod wasi_2023_10_18; pub mod wasi_2023_11_10; -use std::net::SocketAddr; +use std::{net::SocketAddr, sync::Arc}; use anyhow::Context; use http::{ @@ -16,7 +16,8 @@ use spin_factor_outbound_networking::{ use spin_factors::{ anyhow, ConfigureAppContext, Factor, PrepareContext, RuntimeFactors, SelfInstanceBuilder, }; -use wasmtime_wasi_http::WasiHttpCtx; +use spin_world::async_trait; +use wasmtime_wasi_http::{types::IncomingResponse, WasiHttpCtx}; pub use wasmtime_wasi_http::{ body::HyperOutgoingBody, @@ -91,7 +92,7 @@ pub struct InstanceState { allow_private_ips: bool, component_tls_configs: ComponentTlsConfigs, self_request_origin: Option, - request_interceptor: Option>, + request_interceptor: Option>, // Connection-pooling client for 'fermyon:spin/http' interface spin_http_client: Option, } @@ -115,7 +116,7 @@ impl InstanceState { if self.request_interceptor.is_some() { anyhow::bail!("set_request_interceptor can only be called once"); } - self.request_interceptor = Some(Box::new(interceptor)); + self.request_interceptor = Some(Arc::new(interceptor)); Ok(()) } } @@ -177,6 +178,7 @@ impl std::fmt::Display for SelfRequestOrigin { /// An outbound HTTP request interceptor to be used with /// [`InstanceState::set_request_interceptor`]. +#[async_trait] pub trait OutboundHttpInterceptor: Send + Sync { /// Intercept an outgoing HTTP request. /// @@ -186,12 +188,12 @@ pub trait OutboundHttpInterceptor: Send + Sync { /// /// If this method returns [`InterceptedResponse::Intercepted`], the inner /// result will be returned as the result of the request, bypassing the - /// default handler. - fn intercept( + /// default handler. The `request` will also be dropped immediately. + async fn intercept( &self, request: &mut Request, config: &mut OutgoingRequestConfig, - ) -> InterceptOutcome; + ) -> HttpResult; } /// The type returned by an [`OutboundHttpInterceptor`]. @@ -201,5 +203,5 @@ pub enum InterceptOutcome { Continue, /// The given result will be returned as the result of the intercepted /// request, bypassing the default handler. - Complete(HttpResult), + Complete(IncomingResponse), } diff --git a/crates/factor-outbound-http/src/wasi.rs b/crates/factor-outbound-http/src/wasi.rs index 0b0f2c29f..fbd806368 100644 --- a/crates/factor-outbound-http/src/wasi.rs +++ b/crates/factor-outbound-http/src/wasi.rs @@ -5,7 +5,7 @@ use http::{header::HOST, Request}; use http_body_util::BodyExt; use ip_network::IpNetwork; use rustls::ClientConfig; -use spin_factor_outbound_networking::OutboundAllowedHosts; +use spin_factor_outbound_networking::{ComponentTlsConfigs, OutboundAllowedHosts}; use spin_factors::{wasmtime::component::ResourceTable, RuntimeFactorsInstanceState}; use tokio::{net::TcpStream, time::timeout}; use tracing::{field::Empty, instrument, Instrument}; @@ -19,7 +19,7 @@ use wasmtime_wasi_http::{ use crate::{ wasi_2023_10_18, wasi_2023_11_10, InstanceState, InterceptOutcome, OutboundHttpFactor, - SelfRequestOrigin, + OutboundHttpInterceptor, SelfRequestOrigin, }; pub(crate) fn add_to_linker( @@ -84,46 +84,18 @@ impl<'a> WasiHttpView for WasiHttpImplInner<'a> { )] fn send_request( &mut self, - mut request: Request, - mut config: wasmtime_wasi_http::types::OutgoingRequestConfig, + request: Request, + config: wasmtime_wasi_http::types::OutgoingRequestConfig, ) -> wasmtime_wasi_http::HttpResult { - // wasmtime-wasi-http fills in scheme and authority for relative URLs - // (e.g. https://:443/), which makes them hard to reason about. - // Undo that here. - let uri = request.uri_mut(); - if uri - .authority() - .is_some_and(|authority| authority.host().is_empty()) - { - let mut builder = http::uri::Builder::new(); - if let Some(paq) = uri.path_and_query() { - builder = builder.path_and_query(paq.clone()); - } - *uri = builder.build().unwrap(); - } - - if let Some(interceptor) = &self.state.request_interceptor { - match interceptor.intercept(&mut request, &mut config) { - InterceptOutcome::Continue => (), - InterceptOutcome::Complete(res) => return res, - } - } - - let host = request.uri().host().unwrap_or_default(); - let tls_client_config = self - .state - .component_tls_configs - .get_client_config(host) - .clone(); - Ok(HostFutureIncomingResponse::Pending( wasmtime_wasi::runtime::spawn( send_request_impl( request, config, self.state.allowed_hosts.clone(), + self.state.component_tls_configs.clone(), + self.state.request_interceptor.clone(), self.state.self_request_origin.clone(), - tls_client_config, self.state.allow_private_ips, ) .in_current_span(), @@ -136,10 +108,36 @@ async fn send_request_impl( mut request: Request, mut config: wasmtime_wasi_http::types::OutgoingRequestConfig, outbound_allowed_hosts: OutboundAllowedHosts, + component_tls_configs: ComponentTlsConfigs, + request_interceptor: Option>, self_request_origin: Option, - tls_client_config: Arc, allow_private_ips: bool, ) -> anyhow::Result> { + // wasmtime-wasi-http fills in scheme and authority for relative URLs + // (e.g. https://:443/), which makes them hard to reason about. + // Undo that here. + let uri = request.uri_mut(); + if uri + .authority() + .is_some_and(|authority| authority.host().is_empty()) + { + let mut builder = http::uri::Builder::new(); + if let Some(paq) = uri.path_and_query() { + builder = builder.path_and_query(paq.clone()); + } + *uri = builder.build().unwrap(); + } + + if let Some(interceptor) = request_interceptor { + match interceptor.intercept(&mut request, &mut config).await? { + InterceptOutcome::Continue => (), + InterceptOutcome::Complete(resp) => return Ok(Ok(resp)), + } + } + + let host = request.uri().host().unwrap_or_default(); + let tls_client_config = component_tls_configs.get_client_config(host).clone(); + if request.uri().authority().is_some() { // Absolute URI let is_allowed = outbound_allowed_hosts diff --git a/crates/trigger-http/src/outbound_http.rs b/crates/trigger-http/src/outbound_http.rs index b8abe1661..9db66ce04 100644 --- a/crates/trigger-http/src/outbound_http.rs +++ b/crates/trigger-http/src/outbound_http.rs @@ -4,13 +4,12 @@ use std::{ }; use http::uri::Scheme; -use spin_factor_outbound_http::{ - HostFutureIncomingResponse, InterceptOutcome, OutgoingRequestConfig, Request, -}; +use spin_core::async_trait; +use spin_factor_outbound_http::{InterceptOutcome, OutgoingRequestConfig, Request}; use spin_factor_outbound_networking::parse_service_chaining_target; use spin_factors::RuntimeFactors; use spin_http::routes::RouteMatch; -use wasmtime_wasi_http::types::IncomingResponse; +use wasmtime_wasi_http::{types::IncomingResponse, HttpError, HttpResult}; use crate::HttpServer; @@ -27,40 +26,31 @@ impl OutboundHttpInterceptor { const CHAINED_CLIENT_ADDR: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0); +#[async_trait] impl spin_factor_outbound_http::OutboundHttpInterceptor for OutboundHttpInterceptor { - fn intercept( + async fn intercept( &self, request: &mut Request, config: &mut OutgoingRequestConfig, - ) -> InterceptOutcome { - let uri = request.uri(); - + ) -> HttpResult { // Handle service chaining requests - if let Some(component_id) = parse_service_chaining_target(uri) { - // TODO: look at the rest of chain_request - let route_match = RouteMatch::synthetic(&component_id, uri.path()); + if let Some(component_id) = parse_service_chaining_target(request.uri()) { let req = std::mem::take(request); - let between_bytes_timeout = config.between_bytes_timeout; - let server = self.server.clone(); - let resp_fut = async move { - match server - .handle_trigger_route(req, route_match, Scheme::HTTP, CHAINED_CLIENT_ADDR) - .await - { - Ok(resp) => Ok(Ok(IncomingResponse { - resp, - between_bytes_timeout, - worker: None, - })), - Err(e) => Err(wasmtime::Error::msg(e)), - } - }; - let resp = HostFutureIncomingResponse::pending(wasmtime_wasi::runtime::spawn(resp_fut)); - InterceptOutcome::Complete(Ok(resp)) + let route_match = RouteMatch::synthetic(&component_id, req.uri().path()); + let resp = self + .server + .handle_trigger_route(req, route_match, Scheme::HTTP, CHAINED_CLIENT_ADDR) + .await + .map_err(HttpError::trap)?; + Ok(InterceptOutcome::Complete(IncomingResponse { + resp, + worker: None, + between_bytes_timeout: config.between_bytes_timeout, + })) } else { - InterceptOutcome::Continue + Ok(InterceptOutcome::Continue) } } } From b3ab37175968452ce2dfb9f386de498306410ae1 Mon Sep 17 00:00:00 2001 From: Lann Martin Date: Fri, 6 Sep 2024 12:57:23 -0400 Subject: [PATCH 2/3] Fix some outbound-http telemetry Signed-off-by: Lann Martin --- crates/factor-outbound-http/src/wasi.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/crates/factor-outbound-http/src/wasi.rs b/crates/factor-outbound-http/src/wasi.rs index fbd806368..0d32e52a8 100644 --- a/crates/factor-outbound-http/src/wasi.rs +++ b/crates/factor-outbound-http/src/wasi.rs @@ -74,7 +74,7 @@ impl<'a> WasiHttpView for WasiHttpImplInner<'a> { skip_all, fields( otel.kind = "client", - url.full = %request.uri(), + url.full = Empty, http.request.method = %request.method(), otel.name = %request.method(), http.response.status_code = Empty, @@ -127,6 +127,10 @@ async fn send_request_impl( } *uri = builder.build().unwrap(); } + let span = tracing::Span::current(); + span.record("url.full", uri.to_string()); + + spin_telemetry::inject_trace_context(&mut request); if let Some(interceptor) = request_interceptor { match interceptor.intercept(&mut request, &mut config).await? { @@ -165,17 +169,15 @@ async fn send_request_impl( config.use_tls = origin.use_tls(); request.headers_mut().insert(HOST, origin.host_header()); - spin_telemetry::inject_trace_context(&mut request); let path_and_query = request.uri().path_and_query().cloned(); *request.uri_mut() = origin.into_uri(path_and_query); } let authority = request.uri().authority().context("authority not set")?; - let current_span = tracing::Span::current(); - current_span.record("server.address", authority.host()); + span.record("server.address", authority.host()); if let Some(port) = authority.port() { - current_span.record("server.port", port.as_u16()); + span.record("server.port", port.as_u16()); } Ok(send_request_handler(request, config, tls_client_config, allow_private_ips).await) @@ -315,6 +317,8 @@ async fn send_request_handler( .map_err(hyper_request_error)? .map(|body| body.map_err(hyper_request_error).boxed()); + tracing::Span::current().record("http.response.status_code", resp.status().as_u16()); + Ok(wasmtime_wasi_http::types::IncomingResponse { resp, worker: Some(worker), From fdcc7e7c96b6ba50c6988d4bfd47666192e641cc Mon Sep 17 00:00:00 2001 From: Lann Martin Date: Mon, 9 Sep 2024 08:46:27 -0400 Subject: [PATCH 3/3] Simplify OutboundHttpInterceptor Signed-off-by: Lann Martin --- crates/factor-outbound-http/src/lib.rs | 17 +++++++---------- crates/factor-outbound-http/src/wasi.rs | 11 +++++++++-- crates/trigger-http/src/outbound_http.rs | 16 ++++------------ 3 files changed, 20 insertions(+), 24 deletions(-) diff --git a/crates/factor-outbound-http/src/lib.rs b/crates/factor-outbound-http/src/lib.rs index 127dc7fdf..0d576f044 100644 --- a/crates/factor-outbound-http/src/lib.rs +++ b/crates/factor-outbound-http/src/lib.rs @@ -17,7 +17,7 @@ use spin_factors::{ anyhow, ConfigureAppContext, Factor, PrepareContext, RuntimeFactors, SelfInstanceBuilder, }; use spin_world::async_trait; -use wasmtime_wasi_http::{types::IncomingResponse, WasiHttpCtx}; +use wasmtime_wasi_http::WasiHttpCtx; pub use wasmtime_wasi_http::{ body::HyperOutgoingBody, @@ -124,6 +124,7 @@ impl InstanceState { impl SelfInstanceBuilder for InstanceState {} pub type Request = http::Request; +pub type Response = http::Response; /// SelfRequestOrigin indicates the base URI to use for "self" requests. /// @@ -183,17 +184,13 @@ pub trait OutboundHttpInterceptor: Send + Sync { /// Intercept an outgoing HTTP request. /// /// If this method returns [`InterceptedResponse::Continue`], the (possibly - /// updated) request and config will be passed on to the default outgoing - /// request handler. + /// updated) request will be passed on to the default outgoing request + /// handler. /// /// If this method returns [`InterceptedResponse::Intercepted`], the inner /// result will be returned as the result of the request, bypassing the /// default handler. The `request` will also be dropped immediately. - async fn intercept( - &self, - request: &mut Request, - config: &mut OutgoingRequestConfig, - ) -> HttpResult; + async fn intercept(&self, request: &mut Request) -> HttpResult; } /// The type returned by an [`OutboundHttpInterceptor`]. @@ -201,7 +198,7 @@ pub enum InterceptOutcome { /// The intercepted request will be passed on to the default outgoing /// request handler. Continue, - /// The given result will be returned as the result of the intercepted + /// The given response will be returned as the result of the intercepted /// request, bypassing the default handler. - Complete(IncomingResponse), + Complete(Response), } diff --git a/crates/factor-outbound-http/src/wasi.rs b/crates/factor-outbound-http/src/wasi.rs index 0d32e52a8..67fabc4a1 100644 --- a/crates/factor-outbound-http/src/wasi.rs +++ b/crates/factor-outbound-http/src/wasi.rs @@ -133,9 +133,16 @@ async fn send_request_impl( spin_telemetry::inject_trace_context(&mut request); if let Some(interceptor) = request_interceptor { - match interceptor.intercept(&mut request, &mut config).await? { + match interceptor.intercept(&mut request).await? { InterceptOutcome::Continue => (), - InterceptOutcome::Complete(resp) => return Ok(Ok(resp)), + InterceptOutcome::Complete(resp) => { + let resp = IncomingResponse { + resp, + worker: None, + between_bytes_timeout: config.between_bytes_timeout, + }; + return Ok(Ok(resp)); + } } } diff --git a/crates/trigger-http/src/outbound_http.rs b/crates/trigger-http/src/outbound_http.rs index 9db66ce04..f5cc556d2 100644 --- a/crates/trigger-http/src/outbound_http.rs +++ b/crates/trigger-http/src/outbound_http.rs @@ -5,11 +5,11 @@ use std::{ use http::uri::Scheme; use spin_core::async_trait; -use spin_factor_outbound_http::{InterceptOutcome, OutgoingRequestConfig, Request}; +use spin_factor_outbound_http::{InterceptOutcome, Request}; use spin_factor_outbound_networking::parse_service_chaining_target; use spin_factors::RuntimeFactors; use spin_http::routes::RouteMatch; -use wasmtime_wasi_http::{types::IncomingResponse, HttpError, HttpResult}; +use wasmtime_wasi_http::{HttpError, HttpResult}; use crate::HttpServer; @@ -30,11 +30,7 @@ const CHAINED_CLIENT_ADDR: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new impl spin_factor_outbound_http::OutboundHttpInterceptor for OutboundHttpInterceptor { - async fn intercept( - &self, - request: &mut Request, - config: &mut OutgoingRequestConfig, - ) -> HttpResult { + async fn intercept(&self, request: &mut Request) -> HttpResult { // Handle service chaining requests if let Some(component_id) = parse_service_chaining_target(request.uri()) { let req = std::mem::take(request); @@ -44,11 +40,7 @@ impl spin_factor_outbound_http::OutboundHttpInterceptor .handle_trigger_route(req, route_match, Scheme::HTTP, CHAINED_CLIENT_ADDR) .await .map_err(HttpError::trap)?; - Ok(InterceptOutcome::Complete(IncomingResponse { - resp, - worker: None, - between_bytes_timeout: config.between_bytes_timeout, - })) + Ok(InterceptOutcome::Complete(resp)) } else { Ok(InterceptOutcome::Continue) }