From 527987ea13f99f0a44042f1bf4e949c4deacd29b Mon Sep 17 00:00:00 2001 From: Chris Smith <1979423+chris13524@users.noreply.github.com> Date: Thu, 27 Jun 2024 10:01:00 -0700 Subject: [PATCH] feat: v2 migration Cloudflare KV (#81) --- Cargo.lock | 8 +-- src/attestation_store/cf_kv.rs | 88 ++++++++++++++++++++++++++++++ src/attestation_store/migration.rs | 44 +++++++++++++++ src/attestation_store/mod.rs | 2 + src/http_server/mod.rs | 11 ++-- src/main.rs | 20 ++++++- terraform/ecs/cluster.tf | 2 + terraform/ecs/variables.tf | 5 ++ terraform/res_ecs.tf | 2 + terraform/variables.tf | 7 +++ 10 files changed, 177 insertions(+), 12 deletions(-) create mode 100644 src/attestation_store/cf_kv.rs create mode 100644 src/attestation_store/migration.rs diff --git a/Cargo.lock b/Cargo.lock index 526bb4d..087252b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3293,9 +3293,9 @@ dependencies = [ [[package]] name = "time" -version = "0.3.34" +version = "0.3.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8248b6521bb14bc45b4067159b9b6ad792e2d6d754d6c41fb50e29fefe38749" +checksum = "5dfd88e563464686c916c7e46e623e520ddc6d79fa6641390f2e3fa86e83e885" dependencies = [ "deranged", "itoa", @@ -3314,9 +3314,9 @@ checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" [[package]] name = "time-macros" -version = "0.2.17" +version = "0.2.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ba3a3ef41e6672a2f0f001392bb5dcd3ff0a9992d618ca761a11c3121547774" +checksum = "3f252a68540fde3a3877aeea552b832b40ab9a69e318efd078774a01ddee1ccf" dependencies = [ "num-conv", "time-core", diff --git a/src/attestation_store/cf_kv.rs b/src/attestation_store/cf_kv.rs new file mode 100644 index 0000000..3a04c28 --- /dev/null +++ b/src/attestation_store/cf_kv.rs @@ -0,0 +1,88 @@ +use { + super::{AttestationStore, Result}, + crate::http_server::{CsrfToken, TokenManager}, + async_trait::async_trait, + hyper::StatusCode, + reqwest::Url, + serde::Serialize, + std::time::Duration, +}; + +#[derive(Clone)] +pub struct CloudflareKv { + pub endpoint: Url, + pub token_manager: TokenManager, + pub http_client: reqwest::Client, +} + +impl CloudflareKv { + pub fn new(endpoint: Url, token_manager: TokenManager) -> Self { + Self { + endpoint, + token_manager, + http_client: reqwest::Client::new(), + } + } +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +struct SetAttestationCompatBody<'a> { + attestation_id: &'a str, + origin: &'a str, +} + +#[async_trait] +impl AttestationStore for CloudflareKv { + async fn set_attestation(&self, id: &str, origin: &str) -> Result<()> { + let url = self.endpoint.join("/attestation")?; + let res = self + .http_client + .post(url) + .header( + CsrfToken::header_name(), + self.token_manager + .generate_csrf_token() + .map_err(|e| anyhow::anyhow!("{e:?}"))?, + ) + .json(&SetAttestationCompatBody { + attestation_id: id, + origin, + }) + .timeout(Duration::from_secs(1)) + .send() + .await?; + if res.status().is_success() { + Ok(()) + } else { + Err(anyhow::anyhow!( + "Failed to set attestation: status:{} response body:{:?}", + res.status(), + res.text().await + )) + } + } + + async fn get_attestation(&self, id: &str) -> Result> { + let url = self + .endpoint + .join(&format!("/v1/compat-attestation/{id}"))?; + let response = self + .http_client + .get(url) + .timeout(Duration::from_secs(1)) + .send() + .await?; + match response.status() { + status if status.is_success() => { + let value = response.text().await?; + Ok(Some(value)) + } + StatusCode::NOT_FOUND => Ok(None), + status => Err(anyhow::anyhow!( + "Failed to get attestation: status:{status} response body:{:?}", + response.text().await + )), + } + } +} diff --git a/src/attestation_store/migration.rs b/src/attestation_store/migration.rs new file mode 100644 index 0000000..d22083a --- /dev/null +++ b/src/attestation_store/migration.rs @@ -0,0 +1,44 @@ +use { + super::{cf_kv::CloudflareKv, AttestationStore, Result}, + crate::util::redis, + async_trait::async_trait, +}; + +pub struct Store { + redis: redis::Adapter, + cf_kv: CloudflareKv, +} + +impl Store { + pub fn new(redis: redis::Adapter, cf_kv: CloudflareKv) -> Self { + Self { redis, cf_kv } + } +} + +#[async_trait] +impl AttestationStore for Store { + async fn set_attestation(&self, id: &str, origin: &str) -> Result<()> { + let redis_fut = self.redis.set_attestation(id, origin); + let cf_kv_fut = self.cf_kv.set_attestation(id, origin); + let (redis_res, cf_kv_res) = tokio::join!(redis_fut, cf_kv_fut); + if let Err(e) = cf_kv_res { + log::error!("Failed to set attestation in Cloudflare KV: {e} {e:?}"); + } + redis_res + } + + async fn get_attestation(&self, id: &str) -> Result> { + if let Some(attestation) = self.redis.get_attestation(id).await? { + Ok(Some(attestation)) + } else { + let res = self.cf_kv.get_attestation(id).await; + match res { + Ok(a) => Ok(a), + Err(e) => { + log::error!("Failed to get attestation from Cloudflare KV: {e} {e:?}"); + Ok(None) + } + } + } + } +} diff --git a/src/attestation_store/mod.rs b/src/attestation_store/mod.rs index f92eaf2..16adea2 100644 --- a/src/attestation_store/mod.rs +++ b/src/attestation_store/mod.rs @@ -1,3 +1,5 @@ +pub mod cf_kv; +pub mod migration; pub mod redis; use async_trait::async_trait; diff --git a/src/http_server/mod.rs b/src/http_server/mod.rs index 19b4f92..a4a4191 100644 --- a/src/http_server/mod.rs +++ b/src/http_server/mod.rs @@ -84,13 +84,14 @@ impl Server { } } -struct TokenManager { +#[derive(Clone)] +pub struct TokenManager { encoding_key: jsonwebtoken::EncodingKey, decoding_key: jsonwebtoken::DecodingKey, } impl TokenManager { - fn new(secret: &[u8]) -> Self { + pub fn new(secret: &[u8]) -> Self { Self { encoding_key: jsonwebtoken::EncodingKey::from_secret(secret), decoding_key: jsonwebtoken::DecodingKey::from_secret(secret), @@ -262,14 +263,14 @@ where } #[derive(Serialize, Deserialize)] -struct CsrfToken { +pub struct CsrfToken { exp: usize, } impl CsrfToken { // Using const value instead of a fn produces this warning: // https://rust-lang.github.io/rust-clippy/master/index.html#declare_interior_mutable_const - const fn header_name() -> HeaderName { + pub const fn header_name() -> HeaderName { HeaderName::from_static("x-csrf-token") } @@ -282,7 +283,7 @@ impl CsrfToken { } impl TokenManager { - fn generate_csrf_token(&self) -> Result { + pub fn generate_csrf_token(&self) -> Result { use jsonwebtoken::{encode, get_current_timestamp, Header}; const TTL_SECS: usize = 60 * 60; // 1 hour diff --git a/src/main.rs b/src/main.rs index 1ccbd8b..814ed65 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,8 +8,9 @@ use { AXUM_HTTP_REQUESTS_DURATION_SECONDS, }, bouncer::{ + attestation_store::{cf_kv::CloudflareKv, migration}, event_sink, - http_server::{RequestInfo, ServerConfig}, + http_server::{RequestInfo, ServerConfig, TokenManager}, project_registry::{self, CachedExt as _}, scam_guard, util::redis, @@ -55,6 +56,8 @@ pub struct Configuration { pub data_api_auth_token: String, pub scam_guard_cache_url: String, + pub cf_kv_endpoint: String, + pub secret: String, pub s3_endpoint: Option, @@ -99,8 +102,19 @@ async fn main() -> Result<(), anyhow::Error> { .install_recorder() .context("Failed to install Prometheus metrics recorder")?; - let attestation_store = redis::new("attestation_store", config.attestation_cache_url.clone()) - .context("Failed to initialize AttestationStore")?; + let attestation_store = { + let redis_attestation_store = + redis::new("attestation_store", config.attestation_cache_url.clone()) + .context("Failed to initialize AttestationStore")?; + let cf_kv_attestation_store = CloudflareKv::new( + config + .cf_kv_endpoint + .parse() + .context("Failed to parse cf_kv_endpoint")?, + TokenManager::new(config.secret.as_bytes()), + ); + migration::Store::new(redis_attestation_store, cf_kv_attestation_store) + }; let project_registry_cache = redis::new( "project_registry_cache", diff --git a/terraform/ecs/cluster.tf b/terraform/ecs/cluster.tf index 2b0ed8f..ae4d2d2 100644 --- a/terraform/ecs/cluster.tf +++ b/terraform/ecs/cluster.tf @@ -86,6 +86,8 @@ resource "aws_ecs_task_definition" "app_task" { { name = "PROJECT_REGISTRY_CACHE_URL", value = var.project_registry_cache_url }, { name = "SCAM_GUARD_CACHE_URL", value = var.scam_guard_cache_url }, + { name = "CF_KV_ENDPOINT", value = var.cf_kv_endpoint }, + { name = "DATA_LAKE_BUCKET", value = var.analytics_datalake_bucket_name }, { name = "BLOCKED_COUNTRIES", value = var.ofac_blocked_countries }, diff --git a/terraform/ecs/variables.tf b/terraform/ecs/variables.tf index 2205d2a..3c8d53a 100644 --- a/terraform/ecs/variables.tf +++ b/terraform/ecs/variables.tf @@ -152,6 +152,11 @@ variable "scam_guard_cache_url" { type = string } +variable "cf_kv_endpoint" { + description = "The endpoint of the Cloudflare KV worker" + type = string +} + variable "ofac_blocked_countries" { description = "The list of countries under OFAC sanctions" type = string diff --git a/terraform/res_ecs.tf b/terraform/res_ecs.tf index d93e4a2..a37a361 100644 --- a/terraform/res_ecs.tf +++ b/terraform/res_ecs.tf @@ -65,6 +65,8 @@ module "ecs" { project_registry_cache_url = "redis://${module.redis.endpoint}/1" scam_guard_cache_url = "redis://${module.redis.endpoint}/2" + cf_kv_endpoint = var.cf_kv_endpoint + ofac_blocked_countries = var.ofac_blocked_countries # Analytics diff --git a/terraform/variables.tf b/terraform/variables.tf index e364e94..a2aea4a 100644 --- a/terraform/variables.tf +++ b/terraform/variables.tf @@ -75,6 +75,13 @@ variable "ofac_blocked_countries" { default = "" } +#------------------------------------------------------------------------------- +# Cloudflare KV for V2 migration + +variable "cf_kv_endpoint" { + description = "The endpoint of the Cloudflare KV worker" + type = string +} #------------------------------------------------------------------------------- # Project Registry