Skip to content

Commit

Permalink
add RouteProvider
Browse files Browse the repository at this point in the history
  • Loading branch information
nikolay-komarevskiy committed Nov 1, 2023
1 parent ec42733 commit 965f6b0
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 65 deletions.
4 changes: 4 additions & 0 deletions ic-agent/src/agent/agent_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ pub enum AgentError {
/// The rejected call had an invalid reject code (valid range 1..5).
#[error(transparent)]
InvalidRejectCode(#[from] InvalidRejectCodeError),

/// Route provider failed to generate a url for some reason.
#[error("Route provider failed to generate url: {0}")]
RouteProviderError(String),
}

impl PartialEq for AgentError {
Expand Down
72 changes: 25 additions & 47 deletions ic-agent/src/agent/http_transport/hyper_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@ pub use hyper;

use std::{any, error::Error, future::Future, marker::PhantomData, sync::atomic::AtomicPtr};

use http::uri::{Authority, PathAndQuery};
use http_body::{LengthLimitError, Limited};
use hyper::{
body::{Bytes, HttpBody},
body::HttpBody,
client::HttpConnector,
header::CONTENT_TYPE,
service::Service,
Expand All @@ -17,7 +16,7 @@ use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
use crate::{
agent::{
agent_error::HttpErrorPayload,
http_transport::{IC0_DOMAIN, IC0_SUB_DOMAIN},
http_transport::route_provider::{RoundRobinRouteProvider, RouteProvider},
AgentFuture, Transport,
},
export::Principal,
Expand All @@ -28,7 +27,7 @@ use crate::{
#[derive(Debug)]
pub struct HyperTransport<B1, S = Client<HttpsConnector<HttpConnector>, B1>> {
_marker: PhantomData<AtomicPtr<B1>>,
url: Uri,
route_provider: Box<dyn RouteProvider>,
max_response_body_size: Option<usize>,
service: S,
}
Expand Down Expand Up @@ -105,48 +104,18 @@ where
{
/// Creates a replica transport from a HTTP URL and a [`HyperService`].
pub fn create_with_service<U: Into<Uri>>(url: U, service: S) -> Result<Self, AgentError> {
// Parse the url
let url = url.into();
let mut parts = url.clone().into_parts();
parts.authority = parts
.authority
.map(|v| {
let host = v.host();
let host = match host.len().checked_sub(IC0_SUB_DOMAIN.len()) {
None => host,
Some(start) if host[start..].eq_ignore_ascii_case(IC0_SUB_DOMAIN) => IC0_DOMAIN,
Some(_) => host,
};
let port = v.port();
let (colon, port) = match port.as_ref() {
Some(v) => (":", v.as_str()),
None => ("", ""),
};
Authority::from_maybe_shared(Bytes::from(format!("{host}{colon}{port}")))
})
.transpose()
.map_err(|_| AgentError::InvalidReplicaUrl(format!("{url}")))?;
parts.path_and_query = Some(
parts
.path_and_query
.map_or(Ok(PathAndQuery::from_static("/api/v2")), |v| {
let mut found = false;
fn replace<T>(a: T, b: &mut T) -> T {
std::mem::replace(b, a)
}
let v = v
.path()
.trim_end_matches(|c| !replace(found || c == '/', &mut found));
PathAndQuery::from_maybe_shared(Bytes::from(format!("{v}/api/v2")))
})
.map_err(|_| AgentError::InvalidReplicaUrl(format!("{url}")))?,
);
let url =
Uri::from_parts(parts).map_err(|_| AgentError::InvalidReplicaUrl(format!("{url}")))?;
let route_provider = Box::new(RoundRobinRouteProvider::new(vec![url.into()])?);
Self::create_with_service_route(route_provider, service)
}

/// Creates a replica transport from a [`RouteProvider`] and a [`HyperService`].
pub fn create_with_service_route(
route_provider: Box<dyn RouteProvider>,
service: S,
) -> Result<Self, AgentError> {
Ok(Self {
_marker: PhantomData,
url,
route_provider,
service,
max_response_body_size: None,
})
Expand Down Expand Up @@ -243,7 +212,10 @@ where
_request_id: RequestId,
) -> AgentFuture<()> {
Box::pin(async move {
let url = format!("{}/canister/{effective_canister_id}/call", self.url);
let url = format!(
"{}/canister/{effective_canister_id}/call",
self.route_provider.route()
);
self.request(Method::POST, url, Some(envelope)).await?;
Ok(())
})
Expand All @@ -255,7 +227,10 @@ where
envelope: Vec<u8>,
) -> AgentFuture<Vec<u8>> {
Box::pin(async move {
let url = format!("{}/canister/{effective_canister_id}/read_state", self.url);
let url = format!(
"{}/canister/{effective_canister_id}/read_state",
self.route_provider.route()
);
self.request(Method::POST, url, Some(envelope)).await
})
}
Expand All @@ -269,14 +244,17 @@ where

fn query(&self, effective_canister_id: Principal, envelope: Vec<u8>) -> AgentFuture<Vec<u8>> {
Box::pin(async move {
let url = format!("{}/canister/{effective_canister_id}/query", self.url);
let url = format!(
"{}/canister/{effective_canister_id}/query",
self.route_provider.route()
);
self.request(Method::POST, url, Some(envelope)).await
})
}

fn status(&self) -> AgentFuture<Vec<u8>> {
Box::pin(async move {
let url = format!("{}/status", self.url);
let url = format!("{}/status", self.route_provider.route());
self.request(Method::GET, url, None).await
})
}
Expand Down
2 changes: 2 additions & 0 deletions ic-agent/src/agent/http_transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,5 @@ pub use hyper_transport::*; // remove after 0.25
const IC0_DOMAIN: &str = "ic0.app";
#[allow(dead_code)]
const IC0_SUB_DOMAIN: &str = ".ic0.app";

pub mod route_provider;
33 changes: 15 additions & 18 deletions ic-agent/src/agent/http_transport/reqwest_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,28 @@ pub use reqwest;
use futures_util::StreamExt;
use reqwest::{
header::{HeaderMap, CONTENT_TYPE},
Body, Client, Method, Request, StatusCode, Url,
Body, Client, Method, Request, StatusCode,
};

use crate::{
agent::{
agent_error::HttpErrorPayload,
http_transport::{IC0_DOMAIN, IC0_SUB_DOMAIN},
http_transport::route_provider::{RoundRobinRouteProvider, RouteProvider},
AgentFuture, Transport,
},
export::Principal,
AgentError, RequestId,
};

/// A [`Transport`] using [`reqwest`] to make HTTP calls to the Internet Computer.
#[derive(Debug)]
pub struct ReqwestTransport {
url: Url,
route_provider: Box<dyn RouteProvider>,
client: Client,
max_response_body_size: Option<usize>,
}

#[doc(hidden)]
pub use ReqwestTransport as ReqwestHttpReplicaV2Transport; // deprecate after 0.24
pub use ReqwestTransport as ReqwestHttpReplicaV2Transport;

impl ReqwestTransport {
/// Creates a replica transport from a HTTP URL.
Expand All @@ -52,19 +51,17 @@ impl ReqwestTransport {

/// Creates a replica transport from a HTTP URL and a [`reqwest::Client`].
pub fn create_with_client<U: Into<String>>(url: U, client: Client) -> Result<Self, AgentError> {
let url = url.into();
let route_provider = Box::new(RoundRobinRouteProvider::new(vec![url.into()])?);
Self::create_with_client_route(route_provider, client)
}

/// Creates a replica transport from a [`RouteProvider`] and a [`reqwest::Client`].
pub fn create_with_client_route(
route_provider: Box<dyn RouteProvider>,
client: Client,
) -> Result<Self, AgentError> {
Ok(Self {
url: Url::parse(&url)
.and_then(|mut url| {
// rewrite *.ic0.app to ic0.app
if let Some(domain) = url.domain() {
if domain.ends_with(IC0_SUB_DOMAIN) {
url.set_host(Some(IC0_DOMAIN))?;
}
}
url.join("api/v2/")
})
.map_err(|_| AgentError::InvalidReplicaUrl(url.clone()))?,
route_provider,
client,
max_response_body_size: None,
})
Expand Down Expand Up @@ -127,7 +124,7 @@ impl ReqwestTransport {
endpoint: &str,
body: Option<Vec<u8>>,
) -> Result<Vec<u8>, AgentError> {
let url = self.url.join(endpoint)?;
let url = self.route_provider.route()?.join(endpoint)?;
let mut http_request = Request::new(method, url);
http_request
.headers_mut()
Expand Down
91 changes: 91 additions & 0 deletions ic-agent/src/agent/http_transport/route_provider.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
//! A [`RouteProvider`] for dynamic generation of routing urls.
use std::{
str::FromStr,
sync::atomic::{AtomicUsize, Ordering},
};
use url::Url;

use crate::agent::{
http_transport::{IC0_DOMAIN, IC0_SUB_DOMAIN},
AgentError,
};

/// A [`RouteProvider`] for dynamic generation of routing urls.
pub trait RouteProvider: Send + Sync {
/// Generate next routing url
fn route(&self) -> Result<Url, AgentError>;
}

/// A simple implementation of the [`RouteProvider``] which produces an even distribution of the urls from the input ones.
pub struct RoundRobinRouteProvider {
routes: Vec<Url>,
current_idx: AtomicUsize,
}

impl RouteProvider for RoundRobinRouteProvider {
fn route(&self) -> Result<Url, AgentError> {
if self.routes.is_empty() {
return Err(AgentError::RouteProviderError(
"No routing urls provided".to_string(),
));
}
// This operation wraps around an overflow, i.e. after max is reached the value is reset back to 0.
let prev_idx = self.current_idx.fetch_add(1, Ordering::Relaxed);
Ok(self.routes[prev_idx % self.routes.len()].clone())
}
}

impl RoundRobinRouteProvider {
/// Construct [`RoundRobinRouteProvider`] from a vector of urls.
pub fn new<T: AsRef<str>>(routes: Vec<T>) -> Result<Self, AgentError> {
let routes: Result<Vec<Url>, _> = routes
.into_iter()
.map(|url| {
Url::from_str(url.as_ref()).and_then(|mut url| {
// rewrite *.ic0.app to ic0.app
if let Some(domain) = url.domain() {
if domain.ends_with(IC0_SUB_DOMAIN) {
url.set_host(Some(IC0_DOMAIN))?
}
}
url.join("/api/v2/")
})
})
.collect();
Ok(Self {
routes: routes?,
current_idx: AtomicUsize::new(0),
})
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_empty_routes() {
let provider = RoundRobinRouteProvider::new::<&str>(vec![])
.expect("failed to create a route provider");
let result = provider.route().unwrap_err();
assert_eq!(
result,
AgentError::RouteProviderError("No routing urls provided".to_string())
);
}

#[test]
fn test_routes_rotation() {
let provider = RoundRobinRouteProvider::new(vec!["https://url1.com", "https://url2.com"])
.expect("failed to create a route provider");
let url_strings = vec!["https://url1.com", "https://url2.com", "https://url1.com"];
let expected_urls: Vec<Url> = url_strings
.iter()
.map(|url_str| Url::parse(url_str).expect("Invalid URL"))
.collect();
let urls: Vec<Url> = (0..3)
.map(|_| provider.route().expect("failed to get next url"))
.collect();
assert_eq!(expected_urls, urls);
}
}

0 comments on commit 965f6b0

Please sign in to comment.