Skip to content

Commit

Permalink
Merge pull request #2809 from lann/outbound-http-interceptor-async
Browse files Browse the repository at this point in the history
Make OutboundHttpInterceptor async
  • Loading branch information
lann authored Sep 9, 2024
2 parents 8ad00cc + fdcc7e7 commit 351d49d
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 85 deletions.
25 changes: 12 additions & 13 deletions crates/factor-outbound-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ mod wasi;
pub mod wasi_2023_10_18;
pub mod wasi_2023_11_10;

use std::net::SocketAddr;
use std::{net::SocketAddr, sync::Arc};

use anyhow::Context;
use http::{
Expand All @@ -16,6 +16,7 @@ use spin_factor_outbound_networking::{
use spin_factors::{
anyhow, ConfigureAppContext, Factor, PrepareContext, RuntimeFactors, SelfInstanceBuilder,
};
use spin_world::async_trait;
use wasmtime_wasi_http::WasiHttpCtx;

pub use wasmtime_wasi_http::{
Expand Down Expand Up @@ -91,7 +92,7 @@ pub struct InstanceState {
allow_private_ips: bool,
component_tls_configs: ComponentTlsConfigs,
self_request_origin: Option<SelfRequestOrigin>,
request_interceptor: Option<Box<dyn OutboundHttpInterceptor>>,
request_interceptor: Option<Arc<dyn OutboundHttpInterceptor>>,
// Connection-pooling client for 'fermyon:spin/http' interface
spin_http_client: Option<reqwest::Client>,
}
Expand All @@ -115,14 +116,15 @@ impl InstanceState {
if self.request_interceptor.is_some() {
anyhow::bail!("set_request_interceptor can only be called once");
}
self.request_interceptor = Some(Box::new(interceptor));
self.request_interceptor = Some(Arc::new(interceptor));
Ok(())
}
}

impl SelfInstanceBuilder for InstanceState {}

pub type Request = http::Request<wasmtime_wasi_http::body::HyperOutgoingBody>;
pub type Response = http::Response<wasmtime_wasi_http::body::HyperIncomingBody>;

/// SelfRequestOrigin indicates the base URI to use for "self" requests.
///
Expand Down Expand Up @@ -177,29 +179,26 @@ impl std::fmt::Display for SelfRequestOrigin {

/// An outbound HTTP request interceptor to be used with
/// [`InstanceState::set_request_interceptor`].
#[async_trait]
pub trait OutboundHttpInterceptor: Send + Sync {
/// Intercept an outgoing HTTP request.
///
/// If this method returns [`InterceptedResponse::Continue`], the (possibly
/// updated) request and config will be passed on to the default outgoing
/// request handler.
/// updated) request will be passed on to the default outgoing request
/// handler.
///
/// If this method returns [`InterceptedResponse::Intercepted`], the inner
/// result will be returned as the result of the request, bypassing the
/// default handler.
fn intercept(
&self,
request: &mut Request,
config: &mut OutgoingRequestConfig,
) -> InterceptOutcome;
/// default handler. The `request` will also be dropped immediately.
async fn intercept(&self, request: &mut Request) -> HttpResult<InterceptOutcome>;
}

/// The type returned by an [`OutboundHttpInterceptor`].
pub enum InterceptOutcome {
/// The intercepted request will be passed on to the default outgoing
/// request handler.
Continue,
/// The given result will be returned as the result of the intercepted
/// The given response will be returned as the result of the intercepted
/// request, bypassing the default handler.
Complete(HttpResult<HostFutureIncomingResponse>),
Complete(Response),
}
89 changes: 49 additions & 40 deletions crates/factor-outbound-http/src/wasi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use http::{header::HOST, Request};
use http_body_util::BodyExt;
use ip_network::IpNetwork;
use rustls::ClientConfig;
use spin_factor_outbound_networking::OutboundAllowedHosts;
use spin_factor_outbound_networking::{ComponentTlsConfigs, OutboundAllowedHosts};
use spin_factors::{wasmtime::component::ResourceTable, RuntimeFactorsInstanceState};
use tokio::{net::TcpStream, time::timeout};
use tracing::{field::Empty, instrument, Instrument};
Expand All @@ -19,7 +19,7 @@ use wasmtime_wasi_http::{

use crate::{
wasi_2023_10_18, wasi_2023_11_10, InstanceState, InterceptOutcome, OutboundHttpFactor,
SelfRequestOrigin,
OutboundHttpInterceptor, SelfRequestOrigin,
};

pub(crate) fn add_to_linker<T: Send + 'static>(
Expand Down Expand Up @@ -74,7 +74,7 @@ impl<'a> WasiHttpView for WasiHttpImplInner<'a> {
skip_all,
fields(
otel.kind = "client",
url.full = %request.uri(),
url.full = Empty,
http.request.method = %request.method(),
otel.name = %request.method(),
http.response.status_code = Empty,
Expand All @@ -84,46 +84,18 @@ impl<'a> WasiHttpView for WasiHttpImplInner<'a> {
)]
fn send_request(
&mut self,
mut request: Request<wasmtime_wasi_http::body::HyperOutgoingBody>,
mut config: wasmtime_wasi_http::types::OutgoingRequestConfig,
request: Request<wasmtime_wasi_http::body::HyperOutgoingBody>,
config: wasmtime_wasi_http::types::OutgoingRequestConfig,
) -> wasmtime_wasi_http::HttpResult<wasmtime_wasi_http::types::HostFutureIncomingResponse> {
// wasmtime-wasi-http fills in scheme and authority for relative URLs
// (e.g. https://:443/<path>), which makes them hard to reason about.
// Undo that here.
let uri = request.uri_mut();
if uri
.authority()
.is_some_and(|authority| authority.host().is_empty())
{
let mut builder = http::uri::Builder::new();
if let Some(paq) = uri.path_and_query() {
builder = builder.path_and_query(paq.clone());
}
*uri = builder.build().unwrap();
}

if let Some(interceptor) = &self.state.request_interceptor {
match interceptor.intercept(&mut request, &mut config) {
InterceptOutcome::Continue => (),
InterceptOutcome::Complete(res) => return res,
}
}

let host = request.uri().host().unwrap_or_default();
let tls_client_config = self
.state
.component_tls_configs
.get_client_config(host)
.clone();

Ok(HostFutureIncomingResponse::Pending(
wasmtime_wasi::runtime::spawn(
send_request_impl(
request,
config,
self.state.allowed_hosts.clone(),
self.state.component_tls_configs.clone(),
self.state.request_interceptor.clone(),
self.state.self_request_origin.clone(),
tls_client_config,
self.state.allow_private_ips,
)
.in_current_span(),
Expand All @@ -136,10 +108,47 @@ async fn send_request_impl(
mut request: Request<wasmtime_wasi_http::body::HyperOutgoingBody>,
mut config: wasmtime_wasi_http::types::OutgoingRequestConfig,
outbound_allowed_hosts: OutboundAllowedHosts,
component_tls_configs: ComponentTlsConfigs,
request_interceptor: Option<Arc<dyn OutboundHttpInterceptor>>,
self_request_origin: Option<SelfRequestOrigin>,
tls_client_config: Arc<ClientConfig>,
allow_private_ips: bool,
) -> anyhow::Result<Result<IncomingResponse, ErrorCode>> {
// wasmtime-wasi-http fills in scheme and authority for relative URLs
// (e.g. https://:443/<path>), which makes them hard to reason about.
// Undo that here.
let uri = request.uri_mut();
if uri
.authority()
.is_some_and(|authority| authority.host().is_empty())
{
let mut builder = http::uri::Builder::new();
if let Some(paq) = uri.path_and_query() {
builder = builder.path_and_query(paq.clone());
}
*uri = builder.build().unwrap();
}
let span = tracing::Span::current();
span.record("url.full", uri.to_string());

spin_telemetry::inject_trace_context(&mut request);

if let Some(interceptor) = request_interceptor {
match interceptor.intercept(&mut request).await? {
InterceptOutcome::Continue => (),
InterceptOutcome::Complete(resp) => {
let resp = IncomingResponse {
resp,
worker: None,
between_bytes_timeout: config.between_bytes_timeout,
};
return Ok(Ok(resp));
}
}
}

let host = request.uri().host().unwrap_or_default();
let tls_client_config = component_tls_configs.get_client_config(host).clone();

if request.uri().authority().is_some() {
// Absolute URI
let is_allowed = outbound_allowed_hosts
Expand Down Expand Up @@ -167,17 +176,15 @@ async fn send_request_impl(
config.use_tls = origin.use_tls();

request.headers_mut().insert(HOST, origin.host_header());
spin_telemetry::inject_trace_context(&mut request);

let path_and_query = request.uri().path_and_query().cloned();
*request.uri_mut() = origin.into_uri(path_and_query);
}

let authority = request.uri().authority().context("authority not set")?;
let current_span = tracing::Span::current();
current_span.record("server.address", authority.host());
span.record("server.address", authority.host());
if let Some(port) = authority.port() {
current_span.record("server.port", port.as_u16());
span.record("server.port", port.as_u16());
}

Ok(send_request_handler(request, config, tls_client_config, allow_private_ips).await)
Expand Down Expand Up @@ -317,6 +324,8 @@ async fn send_request_handler(
.map_err(hyper_request_error)?
.map(|body| body.map_err(hyper_request_error).boxed());

tracing::Span::current().record("http.response.status_code", resp.status().as_u16());

Ok(wasmtime_wasi_http::types::IncomingResponse {
resp,
worker: Some(worker),
Expand Down
46 changes: 14 additions & 32 deletions crates/trigger-http/src/outbound_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@ use std::{
};

use http::uri::Scheme;
use spin_factor_outbound_http::{
HostFutureIncomingResponse, InterceptOutcome, OutgoingRequestConfig, Request,
};
use spin_core::async_trait;
use spin_factor_outbound_http::{InterceptOutcome, Request};
use spin_factor_outbound_networking::parse_service_chaining_target;
use spin_factors::RuntimeFactors;
use spin_http::routes::RouteMatch;
use wasmtime_wasi_http::types::IncomingResponse;
use wasmtime_wasi_http::{HttpError, HttpResult};

use crate::HttpServer;

Expand All @@ -27,40 +26,23 @@ impl<F: RuntimeFactors> OutboundHttpInterceptor<F> {

const CHAINED_CLIENT_ADDR: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);

#[async_trait]
impl<F: RuntimeFactors> spin_factor_outbound_http::OutboundHttpInterceptor
for OutboundHttpInterceptor<F>
{
fn intercept(
&self,
request: &mut Request,
config: &mut OutgoingRequestConfig,
) -> InterceptOutcome {
let uri = request.uri();

async fn intercept(&self, request: &mut Request) -> HttpResult<InterceptOutcome> {
// Handle service chaining requests
if let Some(component_id) = parse_service_chaining_target(uri) {
// TODO: look at the rest of chain_request
let route_match = RouteMatch::synthetic(&component_id, uri.path());
if let Some(component_id) = parse_service_chaining_target(request.uri()) {
let req = std::mem::take(request);
let between_bytes_timeout = config.between_bytes_timeout;
let server = self.server.clone();
let resp_fut = async move {
match server
.handle_trigger_route(req, route_match, Scheme::HTTP, CHAINED_CLIENT_ADDR)
.await
{
Ok(resp) => Ok(Ok(IncomingResponse {
resp,
between_bytes_timeout,
worker: None,
})),
Err(e) => Err(wasmtime::Error::msg(e)),
}
};
let resp = HostFutureIncomingResponse::pending(wasmtime_wasi::runtime::spawn(resp_fut));
InterceptOutcome::Complete(Ok(resp))
let route_match = RouteMatch::synthetic(&component_id, req.uri().path());
let resp = self
.server
.handle_trigger_route(req, route_match, Scheme::HTTP, CHAINED_CLIENT_ADDR)
.await
.map_err(HttpError::trap)?;
Ok(InterceptOutcome::Complete(resp))
} else {
InterceptOutcome::Continue
Ok(InterceptOutcome::Continue)
}
}
}

0 comments on commit 351d49d

Please sign in to comment.