From 965f6b034008116c2151555bb3e5c61f5f6a51f5 Mon Sep 17 00:00:00 2001 From: Nikolay Komarevskiy Date: Wed, 1 Nov 2023 22:50:21 +0100 Subject: [PATCH] add RouteProvider --- ic-agent/src/agent/agent_error.rs | 4 + .../agent/http_transport/hyper_transport.rs | 72 +++++---------- ic-agent/src/agent/http_transport/mod.rs | 2 + .../agent/http_transport/reqwest_transport.rs | 33 +++---- .../agent/http_transport/route_provider.rs | 91 +++++++++++++++++++ 5 files changed, 137 insertions(+), 65 deletions(-) create mode 100644 ic-agent/src/agent/http_transport/route_provider.rs diff --git a/ic-agent/src/agent/agent_error.rs b/ic-agent/src/agent/agent_error.rs index e02959d4c..8f50c32a8 100644 --- a/ic-agent/src/agent/agent_error.rs +++ b/ic-agent/src/agent/agent_error.rs @@ -163,6 +163,10 @@ pub enum AgentError { /// The rejected call had an invalid reject code (valid range 1..5). #[error(transparent)] InvalidRejectCode(#[from] InvalidRejectCodeError), + + /// Route provider failed to generate a url for some reason. + #[error("Route provider failed to generate url: {0}")] + RouteProviderError(String), } impl PartialEq for AgentError { diff --git a/ic-agent/src/agent/http_transport/hyper_transport.rs b/ic-agent/src/agent/http_transport/hyper_transport.rs index 057218060..9b771cf51 100644 --- a/ic-agent/src/agent/http_transport/hyper_transport.rs +++ b/ic-agent/src/agent/http_transport/hyper_transport.rs @@ -3,10 +3,9 @@ pub use hyper; use std::{any, error::Error, future::Future, marker::PhantomData, sync::atomic::AtomicPtr}; -use http::uri::{Authority, PathAndQuery}; use http_body::{LengthLimitError, Limited}; use hyper::{ - body::{Bytes, HttpBody}, + body::HttpBody, client::HttpConnector, header::CONTENT_TYPE, service::Service, @@ -17,7 +16,7 @@ use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder}; use crate::{ agent::{ agent_error::HttpErrorPayload, - http_transport::{IC0_DOMAIN, IC0_SUB_DOMAIN}, + http_transport::route_provider::{RoundRobinRouteProvider, RouteProvider}, AgentFuture, Transport, }, export::Principal, @@ -28,7 +27,7 @@ use crate::{ #[derive(Debug)] pub struct HyperTransport, B1>> { _marker: PhantomData>, - url: Uri, + route_provider: Box, max_response_body_size: Option, service: S, } @@ -105,48 +104,18 @@ where { /// Creates a replica transport from a HTTP URL and a [`HyperService`]. pub fn create_with_service>(url: U, service: S) -> Result { - // Parse the url - let url = url.into(); - let mut parts = url.clone().into_parts(); - parts.authority = parts - .authority - .map(|v| { - let host = v.host(); - let host = match host.len().checked_sub(IC0_SUB_DOMAIN.len()) { - None => host, - Some(start) if host[start..].eq_ignore_ascii_case(IC0_SUB_DOMAIN) => IC0_DOMAIN, - Some(_) => host, - }; - let port = v.port(); - let (colon, port) = match port.as_ref() { - Some(v) => (":", v.as_str()), - None => ("", ""), - }; - Authority::from_maybe_shared(Bytes::from(format!("{host}{colon}{port}"))) - }) - .transpose() - .map_err(|_| AgentError::InvalidReplicaUrl(format!("{url}")))?; - parts.path_and_query = Some( - parts - .path_and_query - .map_or(Ok(PathAndQuery::from_static("/api/v2")), |v| { - let mut found = false; - fn replace(a: T, b: &mut T) -> T { - std::mem::replace(b, a) - } - let v = v - .path() - .trim_end_matches(|c| !replace(found || c == '/', &mut found)); - PathAndQuery::from_maybe_shared(Bytes::from(format!("{v}/api/v2"))) - }) - .map_err(|_| AgentError::InvalidReplicaUrl(format!("{url}")))?, - ); - let url = - Uri::from_parts(parts).map_err(|_| AgentError::InvalidReplicaUrl(format!("{url}")))?; + let route_provider = Box::new(RoundRobinRouteProvider::new(vec![url.into()])?); + Self::create_with_service_route(route_provider, service) + } + /// Creates a replica transport from a [`RouteProvider`] and a [`HyperService`]. + pub fn create_with_service_route( + route_provider: Box, + service: S, + ) -> Result { Ok(Self { _marker: PhantomData, - url, + route_provider, service, max_response_body_size: None, }) @@ -243,7 +212,10 @@ where _request_id: RequestId, ) -> AgentFuture<()> { Box::pin(async move { - let url = format!("{}/canister/{effective_canister_id}/call", self.url); + let url = format!( + "{}/canister/{effective_canister_id}/call", + self.route_provider.route() + ); self.request(Method::POST, url, Some(envelope)).await?; Ok(()) }) @@ -255,7 +227,10 @@ where envelope: Vec, ) -> AgentFuture> { Box::pin(async move { - let url = format!("{}/canister/{effective_canister_id}/read_state", self.url); + let url = format!( + "{}/canister/{effective_canister_id}/read_state", + self.route_provider.route() + ); self.request(Method::POST, url, Some(envelope)).await }) } @@ -269,14 +244,17 @@ where fn query(&self, effective_canister_id: Principal, envelope: Vec) -> AgentFuture> { Box::pin(async move { - let url = format!("{}/canister/{effective_canister_id}/query", self.url); + let url = format!( + "{}/canister/{effective_canister_id}/query", + self.route_provider.route() + ); self.request(Method::POST, url, Some(envelope)).await }) } fn status(&self) -> AgentFuture> { Box::pin(async move { - let url = format!("{}/status", self.url); + let url = format!("{}/status", self.route_provider.route()); self.request(Method::GET, url, None).await }) } diff --git a/ic-agent/src/agent/http_transport/mod.rs b/ic-agent/src/agent/http_transport/mod.rs index 33694cc8d..e17300d68 100644 --- a/ic-agent/src/agent/http_transport/mod.rs +++ b/ic-agent/src/agent/http_transport/mod.rs @@ -26,3 +26,5 @@ pub use hyper_transport::*; // remove after 0.25 const IC0_DOMAIN: &str = "ic0.app"; #[allow(dead_code)] const IC0_SUB_DOMAIN: &str = ".ic0.app"; + +pub mod route_provider; diff --git a/ic-agent/src/agent/http_transport/reqwest_transport.rs b/ic-agent/src/agent/http_transport/reqwest_transport.rs index e0cbbd2fc..b04ba2f74 100644 --- a/ic-agent/src/agent/http_transport/reqwest_transport.rs +++ b/ic-agent/src/agent/http_transport/reqwest_transport.rs @@ -7,13 +7,13 @@ pub use reqwest; use futures_util::StreamExt; use reqwest::{ header::{HeaderMap, CONTENT_TYPE}, - Body, Client, Method, Request, StatusCode, Url, + Body, Client, Method, Request, StatusCode, }; use crate::{ agent::{ agent_error::HttpErrorPayload, - http_transport::{IC0_DOMAIN, IC0_SUB_DOMAIN}, + http_transport::route_provider::{RoundRobinRouteProvider, RouteProvider}, AgentFuture, Transport, }, export::Principal, @@ -21,15 +21,14 @@ use crate::{ }; /// A [`Transport`] using [`reqwest`] to make HTTP calls to the Internet Computer. -#[derive(Debug)] pub struct ReqwestTransport { - url: Url, + route_provider: Box, client: Client, max_response_body_size: Option, } #[doc(hidden)] -pub use ReqwestTransport as ReqwestHttpReplicaV2Transport; // deprecate after 0.24 +pub use ReqwestTransport as ReqwestHttpReplicaV2Transport; impl ReqwestTransport { /// Creates a replica transport from a HTTP URL. @@ -52,19 +51,17 @@ impl ReqwestTransport { /// Creates a replica transport from a HTTP URL and a [`reqwest::Client`]. pub fn create_with_client>(url: U, client: Client) -> Result { - let url = url.into(); + let route_provider = Box::new(RoundRobinRouteProvider::new(vec![url.into()])?); + Self::create_with_client_route(route_provider, client) + } + + /// Creates a replica transport from a [`RouteProvider`] and a [`reqwest::Client`]. + pub fn create_with_client_route( + route_provider: Box, + client: Client, + ) -> Result { Ok(Self { - url: Url::parse(&url) - .and_then(|mut url| { - // rewrite *.ic0.app to ic0.app - if let Some(domain) = url.domain() { - if domain.ends_with(IC0_SUB_DOMAIN) { - url.set_host(Some(IC0_DOMAIN))?; - } - } - url.join("api/v2/") - }) - .map_err(|_| AgentError::InvalidReplicaUrl(url.clone()))?, + route_provider, client, max_response_body_size: None, }) @@ -127,7 +124,7 @@ impl ReqwestTransport { endpoint: &str, body: Option>, ) -> Result, AgentError> { - let url = self.url.join(endpoint)?; + let url = self.route_provider.route()?.join(endpoint)?; let mut http_request = Request::new(method, url); http_request .headers_mut() diff --git a/ic-agent/src/agent/http_transport/route_provider.rs b/ic-agent/src/agent/http_transport/route_provider.rs new file mode 100644 index 000000000..a919b1152 --- /dev/null +++ b/ic-agent/src/agent/http_transport/route_provider.rs @@ -0,0 +1,91 @@ +//! A [`RouteProvider`] for dynamic generation of routing urls. +use std::{ + str::FromStr, + sync::atomic::{AtomicUsize, Ordering}, +}; +use url::Url; + +use crate::agent::{ + http_transport::{IC0_DOMAIN, IC0_SUB_DOMAIN}, + AgentError, +}; + +/// A [`RouteProvider`] for dynamic generation of routing urls. +pub trait RouteProvider: Send + Sync { + /// Generate next routing url + fn route(&self) -> Result; +} + +/// A simple implementation of the [`RouteProvider``] which produces an even distribution of the urls from the input ones. +pub struct RoundRobinRouteProvider { + routes: Vec, + current_idx: AtomicUsize, +} + +impl RouteProvider for RoundRobinRouteProvider { + fn route(&self) -> Result { + if self.routes.is_empty() { + return Err(AgentError::RouteProviderError( + "No routing urls provided".to_string(), + )); + } + // This operation wraps around an overflow, i.e. after max is reached the value is reset back to 0. + let prev_idx = self.current_idx.fetch_add(1, Ordering::Relaxed); + Ok(self.routes[prev_idx % self.routes.len()].clone()) + } +} + +impl RoundRobinRouteProvider { + /// Construct [`RoundRobinRouteProvider`] from a vector of urls. + pub fn new>(routes: Vec) -> Result { + let routes: Result, _> = routes + .into_iter() + .map(|url| { + Url::from_str(url.as_ref()).and_then(|mut url| { + // rewrite *.ic0.app to ic0.app + if let Some(domain) = url.domain() { + if domain.ends_with(IC0_SUB_DOMAIN) { + url.set_host(Some(IC0_DOMAIN))? + } + } + url.join("/api/v2/") + }) + }) + .collect(); + Ok(Self { + routes: routes?, + current_idx: AtomicUsize::new(0), + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_empty_routes() { + let provider = RoundRobinRouteProvider::new::<&str>(vec![]) + .expect("failed to create a route provider"); + let result = provider.route().unwrap_err(); + assert_eq!( + result, + AgentError::RouteProviderError("No routing urls provided".to_string()) + ); + } + + #[test] + fn test_routes_rotation() { + let provider = RoundRobinRouteProvider::new(vec!["https://url1.com", "https://url2.com"]) + .expect("failed to create a route provider"); + let url_strings = vec!["https://url1.com", "https://url2.com", "https://url1.com"]; + let expected_urls: Vec = url_strings + .iter() + .map(|url_str| Url::parse(url_str).expect("Invalid URL")) + .collect(); + let urls: Vec = (0..3) + .map(|_| provider.route().expect("failed to get next url")) + .collect(); + assert_eq!(expected_urls, urls); + } +}