Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add HTTP middleware for Transport replacement #598

Merged
merged 8 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

* Added `AgentBuilder::with_arc_http_middleware` for `Transport`-like functionality at the level of HTTP requests.
* Add support for dynamic routing based on boundary node discovery. This is an internal feature for now, with a feature flag `_internal_dynamic-routing`.

## [0.38.1] - 2024-09-23
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions ic-agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ include = ["src", "Cargo.toml", "../LICENSE", "README.md"]
arc-swap = { version = "1.7", optional = true }
async-channel = { version = "1.9", optional = true }
async-lock = "3.3"
async-trait = { version = "0.1", optional = true }
async-trait = "0.1"
async-watch = { version = "0.3", optional = true }
backoff = "0.4.0"
cached = { version = "0.52", features = ["ahash"], default-features = false }
Expand Down Expand Up @@ -48,6 +48,7 @@ simple_asn1 = "0.6.1"
stop-token = { version = "0.7", optional = true }
thiserror = { workspace = true }
time = { workspace = true }
tower-service = "0.3"
tracing = { version = "0.1", optional = true }
url = "2.1.0"

Expand Down Expand Up @@ -108,7 +109,6 @@ wasm-bindgen = [
_internal_dynamic-routing = [
"dep:arc-swap",
"dep:async-channel",
"dep:async-trait",
"dep:async-watch",
"dep:stop-token",
"tracing",
Expand Down
8 changes: 6 additions & 2 deletions ic-agent/src/agent/agent_config.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use reqwest::Client;

use crate::{
agent::{NonceFactory, NonceGenerator},
identity::{anonymous::AnonymousIdentity, Identity},
};
use reqwest::Client;
use std::{sync::Arc, time::Duration};

use super::route_provider::RouteProvider;
use super::{route_provider::RouteProvider, HttpService};

/// A configuration for an agent.
#[non_exhaustive]
Expand All @@ -28,6 +29,8 @@ pub struct AgentConfig {
pub max_response_body_size: Option<usize>,
/// See [`with_max_tcp_error_retries`](super::AgentBuilder::with_max_tcp_error_retries).
pub max_tcp_error_retries: usize,
/// See [`with_arc_http_middleware`](super::AgentBuilder::with_arc_http_middleware).
pub http_service: Option<Arc<dyn HttpService>>,
}

impl Default for AgentConfig {
Expand All @@ -37,6 +40,7 @@ impl Default for AgentConfig {
identity: Arc::new(AnonymousIdentity {}),
ingress_expiry: None,
client: None,
http_service: None,
verify_query_signatures: true,
max_concurrent_requests: 50,
route_provider: None,
Expand Down
20 changes: 19 additions & 1 deletion ic-agent/src/agent/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
};
use std::sync::Arc;

use super::route_provider::RouteProvider;
use super::{route_provider::RouteProvider, HttpService};

/// A builder for an [`Agent`].
#[derive(Default)]
Expand Down Expand Up @@ -131,10 +131,28 @@ impl AgentBuilder {

/// Provide a pre-configured HTTP client to use. Use this to set e.g. HTTP timeouts or proxy configuration.
pub fn with_http_client(mut self, client: reqwest::Client) -> Self {
assert!(
self.config.http_service.is_none(),
"with_arc_http_middleware cannot be called with with_http_client"
);
self.config.client = Some(client);
self
}

/// Provide a custom `reqwest`-compatible HTTP service, e.g. to add per-request headers for custom boundary nodes.
/// Most users will not need this and should use `with_http_client`. Cannot be called with `with_http_client`.
///
/// The trait is automatically implemented for any `tower::Service` impl matching the one `reqwest::Client` uses,
/// including `reqwest-middleware`. This is a low-level interface, and direct implementations must provide all automatic retry logic.
pub fn with_arc_http_middleware(mut self, service: Arc<dyn HttpService>) -> Self {
assert!(
self.config.client.is_none(),
"with_arc_http_middleware cannot be called with with_http_client"
);
self.config.http_service = Some(service);
self
}

/// Retry up to the specified number of times upon encountering underlying TCP errors.
pub fn with_max_tcp_error_retries(mut self, retries: usize) -> Self {
self.config.max_tcp_error_retries = retries;
Expand Down
178 changes: 120 additions & 58 deletions ic-agent/src/agent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub use agent_config::AgentConfig;
pub use agent_error::AgentError;
use agent_error::HttpErrorPayload;
use async_lock::Semaphore;
use async_trait::async_trait;
pub use builder::AgentBuilder;
use cached::{Cached, TimedCache};
use ed25519_consensus::{Error as Ed25519Error, Signature, VerificationKey};
Expand All @@ -27,9 +28,10 @@ pub use ic_transport_types::{
};
pub use nonce::{NonceFactory, NonceGenerator};
use rangemap::{RangeInclusiveMap, RangeInclusiveSet, StepFns};
use reqwest::{Body, Client, Request};
use reqwest::{Body, Client, Request, Response};
use route_provider::RouteProvider;
use time::OffsetDateTime;
use tower_service::Service;

#[cfg(test)]
mod agent_test;
Expand Down Expand Up @@ -148,7 +150,7 @@ pub struct Agent {
identity: Arc<dyn Identity>,
ingress_expiry: Duration,
root_key: Arc<RwLock<Vec<u8>>>,
client: Client,
client: Arc<dyn HttpService>,
route_provider: Arc<dyn RouteProvider>,
subnet_key_cache: Arc<Mutex<SubnetCache>>,
concurrent_requests_semaphore: Arc<Semaphore>,
Expand Down Expand Up @@ -180,19 +182,23 @@ impl Agent {
identity: config.identity,
ingress_expiry: config.ingress_expiry.unwrap_or(DEFAULT_INGRESS_EXPIRY),
root_key: Arc::new(RwLock::new(IC_ROOT_KEY.to_vec())),
client: config.client.unwrap_or_else(|| {
#[cfg(not(target_family = "wasm"))]
{
Client::builder()
.use_rustls_tls()
.timeout(Duration::from_secs(360))
.build()
.expect("Could not create HTTP client.")
}
#[cfg(all(target_family = "wasm", feature = "wasm-bindgen"))]
{
Client::new()
}
client: config.http_service.unwrap_or_else(|| {
Arc::new(Retry429Logic {
client: config.client.unwrap_or_else(|| {
#[cfg(not(target_family = "wasm"))]
{
Client::builder()
.use_rustls_tls()
.timeout(Duration::from_secs(360))
.build()
.expect("Could not create HTTP client.")
}
#[cfg(all(target_family = "wasm", feature = "wasm-bindgen"))]
{
Client::new()
}
}),
})
}),
route_provider: config
.route_provider
Expand Down Expand Up @@ -1110,40 +1116,13 @@ impl Agent {
Ok(http_request)
};

// Dispatch request with a retry logic only for non-wasm builds.
let response = {
#[cfg(target_family = "wasm")]
{
let http_request = create_request_with_generated_url()?;
self.client.execute(http_request).await?
}
#[cfg(not(target_family = "wasm"))]
{
// RouteProvider generates urls dynamically. Some of these urls can be potentially unhealthy.
// TCP related errors (host unreachable, connection refused, connection timed out, connection reset) can be safely retried with a newly generated url.

let mut retry_count = 0;

loop {
let http_request = create_request_with_generated_url()?;

match self.client.execute(http_request).await {
Ok(response) => break response,
Err(err) => {
// Network-related errors can be retried.
if err.is_connect() {
if retry_count >= self.max_tcp_error_retries {
return Err(AgentError::TransportError(err));
}
retry_count += 1;
continue;
}
return Err(AgentError::TransportError(err));
}
}
}
}
};
let response = self
.client
.call(
&create_request_with_generated_url,
self.max_tcp_error_retries,
)
.await?;

let http_status = response.status();
let response_headers = response.headers().clone();
Expand Down Expand Up @@ -1184,15 +1163,10 @@ impl Agent {
endpoint: &str,
body: Option<Vec<u8>>,
) -> Result<(StatusCode, Vec<u8>), AgentError> {
let request_result = loop {
let result = self
.request(method.clone(), endpoint, body.as_ref().cloned())
.await?;
if result.0 != StatusCode::TOO_MANY_REQUESTS {
break result;
}
crate::util::sleep(Duration::from_millis(250)).await;
};
let request_result = self
.request(method.clone(), endpoint, body.as_ref().cloned())
.await?;

let status = request_result.0;
let headers = request_result.1;
let body = request_result.2;
Expand Down Expand Up @@ -1868,6 +1842,94 @@ impl<'agent> IntoFuture for UpdateBuilder<'agent> {
}
}

/// HTTP client middleware. Implemented automatically for `reqwest`-compatible by-ref `tower::Service`, such as `reqwest_middleware`.
#[cfg_attr(target_family = "wasm", async_trait(?Send))]
#[cfg_attr(not(target_family = "wasm"), async_trait)]
pub trait HttpService: Send + Sync {
adamspofford-dfinity marked this conversation as resolved.
Show resolved Hide resolved
/// Perform a HTTP request. Any retry logic should call `req` again, instead of `Request::try_clone`.
async fn call<'a>(
&'a self,
req: &'a (dyn Fn() -> Result<Request, AgentError> + Send + Sync),
max_retries: usize,
) -> Result<Response, AgentError>;
}
#[cfg(not(target_family = "wasm"))]
#[async_trait]
impl<T> HttpService for T
where
for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
for<'a> <&'a Self as Service<Request>>::Future: Send,
T: Send + Sync + ?Sized,
{
#[allow(clippy::needless_arbitrary_self_type)]
async fn call<'a>(
mut self: &'a Self,
req: &'a (dyn Fn() -> Result<Request, AgentError> + Send + Sync),
max_retries: usize,
) -> Result<Response, AgentError> {
let mut retry_count = 0;
loop {
match Service::call(&mut self, req()?).await {
Err(err) => {
// Network-related errors can be retried.
if err.is_connect() {
if retry_count >= max_retries {
return Err(AgentError::TransportError(err));
}
retry_count += 1;
}
}
Ok(resp) => return Ok(resp),
}
}
}
}

#[cfg(target_family = "wasm")]
#[async_trait(?Send)]
impl<T> HttpService for T
where
for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
T: Send + Sync + ?Sized,
{
#[allow(clippy::needless_arbitrary_self_type)]
async fn call<'a>(
mut self: &'a Self,
req: &'a (dyn Fn() -> Result<Request, AgentError> + Send + Sync),
_: usize,
) -> Result<Response, AgentError> {
Ok(Service::call(&mut self, req()?).await?)
}
}

struct Retry429Logic {
client: Client,
}

#[cfg_attr(target_family = "wasm", async_trait(?Send))]
#[cfg_attr(not(target_family = "wasm"), async_trait)]
impl HttpService for Retry429Logic {
async fn call<'a>(
&'a self,
req: &'a (dyn Fn() -> Result<Request, AgentError> + Send + Sync),
_max_retries: usize,
) -> Result<Response, AgentError> {
loop {
#[cfg(not(target_family = "wasm"))]
let resp = self.client.call(req, _max_retries).await?;
// Client inconveniently does not implement Service on wasm
#[cfg(target_family = "wasm")]
let resp = self.client.execute(req()?).await?;
if resp.status() == StatusCode::TOO_MANY_REQUESTS {
crate::util::sleep(Duration::from_millis(250)).await;
continue;
} else {
break Ok(resp);
}
}
}
}

#[cfg(all(test, not(target_family = "wasm")))]
mod offline_tests {
use super::*;
Expand Down
Loading