Skip to content

Commit

Permalink
feat: v2 migration Cloudflare KV (#81)
Browse files Browse the repository at this point in the history
  • Loading branch information
chris13524 authored Jun 27, 2024
1 parent 6871789 commit 527987e
Show file tree
Hide file tree
Showing 10 changed files with 177 additions and 12 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

88 changes: 88 additions & 0 deletions src/attestation_store/cf_kv.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
use {
super::{AttestationStore, Result},
crate::http_server::{CsrfToken, TokenManager},
async_trait::async_trait,
hyper::StatusCode,
reqwest::Url,
serde::Serialize,
std::time::Duration,
};

#[derive(Clone)]
pub struct CloudflareKv {
pub endpoint: Url,
pub token_manager: TokenManager,
pub http_client: reqwest::Client,
}

impl CloudflareKv {
pub fn new(endpoint: Url, token_manager: TokenManager) -> Self {
Self {
endpoint,
token_manager,
http_client: reqwest::Client::new(),
}
}
}

#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct SetAttestationCompatBody<'a> {
attestation_id: &'a str,
origin: &'a str,
}

#[async_trait]
impl AttestationStore for CloudflareKv {
async fn set_attestation(&self, id: &str, origin: &str) -> Result<()> {
let url = self.endpoint.join("/attestation")?;
let res = self
.http_client
.post(url)
.header(
CsrfToken::header_name(),
self.token_manager
.generate_csrf_token()
.map_err(|e| anyhow::anyhow!("{e:?}"))?,
)
.json(&SetAttestationCompatBody {
attestation_id: id,
origin,
})
.timeout(Duration::from_secs(1))
.send()
.await?;
if res.status().is_success() {
Ok(())
} else {
Err(anyhow::anyhow!(
"Failed to set attestation: status:{} response body:{:?}",
res.status(),
res.text().await
))
}
}

async fn get_attestation(&self, id: &str) -> Result<Option<String>> {
let url = self
.endpoint
.join(&format!("/v1/compat-attestation/{id}"))?;
let response = self
.http_client
.get(url)
.timeout(Duration::from_secs(1))
.send()
.await?;
match response.status() {
status if status.is_success() => {
let value = response.text().await?;
Ok(Some(value))
}
StatusCode::NOT_FOUND => Ok(None),
status => Err(anyhow::anyhow!(
"Failed to get attestation: status:{status} response body:{:?}",
response.text().await
)),
}
}
}
44 changes: 44 additions & 0 deletions src/attestation_store/migration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
use {
super::{cf_kv::CloudflareKv, AttestationStore, Result},
crate::util::redis,
async_trait::async_trait,
};

pub struct Store {
redis: redis::Adapter,
cf_kv: CloudflareKv,
}

impl Store {
pub fn new(redis: redis::Adapter, cf_kv: CloudflareKv) -> Self {
Self { redis, cf_kv }
}
}

#[async_trait]
impl AttestationStore for Store {
async fn set_attestation(&self, id: &str, origin: &str) -> Result<()> {
let redis_fut = self.redis.set_attestation(id, origin);
let cf_kv_fut = self.cf_kv.set_attestation(id, origin);
let (redis_res, cf_kv_res) = tokio::join!(redis_fut, cf_kv_fut);
if let Err(e) = cf_kv_res {
log::error!("Failed to set attestation in Cloudflare KV: {e} {e:?}");
}
redis_res
}

async fn get_attestation(&self, id: &str) -> Result<Option<String>> {
if let Some(attestation) = self.redis.get_attestation(id).await? {
Ok(Some(attestation))
} else {
let res = self.cf_kv.get_attestation(id).await;
match res {
Ok(a) => Ok(a),
Err(e) => {
log::error!("Failed to get attestation from Cloudflare KV: {e} {e:?}");
Ok(None)
}
}
}
}
}
2 changes: 2 additions & 0 deletions src/attestation_store/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
pub mod cf_kv;
pub mod migration;
pub mod redis;

use async_trait::async_trait;
Expand Down
11 changes: 6 additions & 5 deletions src/http_server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,14 @@ impl<S, G> Server<S, G> {
}
}

struct TokenManager {
#[derive(Clone)]
pub struct TokenManager {
encoding_key: jsonwebtoken::EncodingKey,
decoding_key: jsonwebtoken::DecodingKey,
}

impl TokenManager {
fn new(secret: &[u8]) -> Self {
pub fn new(secret: &[u8]) -> Self {
Self {
encoding_key: jsonwebtoken::EncodingKey::from_secret(secret),
decoding_key: jsonwebtoken::DecodingKey::from_secret(secret),
Expand Down Expand Up @@ -262,14 +263,14 @@ where
}

#[derive(Serialize, Deserialize)]
struct CsrfToken {
pub struct CsrfToken {
exp: usize,
}

impl CsrfToken {
// Using const value instead of a fn produces this warning:
// https://rust-lang.github.io/rust-clippy/master/index.html#declare_interior_mutable_const
const fn header_name() -> HeaderName {
pub const fn header_name() -> HeaderName {
HeaderName::from_static("x-csrf-token")
}

Expand All @@ -282,7 +283,7 @@ impl CsrfToken {
}

impl TokenManager {
fn generate_csrf_token(&self) -> Result<String, Response> {
pub fn generate_csrf_token(&self) -> Result<String, Response> {
use jsonwebtoken::{encode, get_current_timestamp, Header};

const TTL_SECS: usize = 60 * 60; // 1 hour
Expand Down
20 changes: 17 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ use {
AXUM_HTTP_REQUESTS_DURATION_SECONDS,
},
bouncer::{
attestation_store::{cf_kv::CloudflareKv, migration},
event_sink,
http_server::{RequestInfo, ServerConfig},
http_server::{RequestInfo, ServerConfig, TokenManager},
project_registry::{self, CachedExt as _},
scam_guard,
util::redis,
Expand Down Expand Up @@ -55,6 +56,8 @@ pub struct Configuration {
pub data_api_auth_token: String,
pub scam_guard_cache_url: String,

pub cf_kv_endpoint: String,

pub secret: String,

pub s3_endpoint: Option<String>,
Expand Down Expand Up @@ -99,8 +102,19 @@ async fn main() -> Result<(), anyhow::Error> {
.install_recorder()
.context("Failed to install Prometheus metrics recorder")?;

let attestation_store = redis::new("attestation_store", config.attestation_cache_url.clone())
.context("Failed to initialize AttestationStore")?;
let attestation_store = {
let redis_attestation_store =
redis::new("attestation_store", config.attestation_cache_url.clone())
.context("Failed to initialize AttestationStore")?;
let cf_kv_attestation_store = CloudflareKv::new(
config
.cf_kv_endpoint
.parse()
.context("Failed to parse cf_kv_endpoint")?,
TokenManager::new(config.secret.as_bytes()),
);
migration::Store::new(redis_attestation_store, cf_kv_attestation_store)
};

let project_registry_cache = redis::new(
"project_registry_cache",
Expand Down
2 changes: 2 additions & 0 deletions terraform/ecs/cluster.tf
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ resource "aws_ecs_task_definition" "app_task" {
{ name = "PROJECT_REGISTRY_CACHE_URL", value = var.project_registry_cache_url },
{ name = "SCAM_GUARD_CACHE_URL", value = var.scam_guard_cache_url },

{ name = "CF_KV_ENDPOINT", value = var.cf_kv_endpoint },

{ name = "DATA_LAKE_BUCKET", value = var.analytics_datalake_bucket_name },

{ name = "BLOCKED_COUNTRIES", value = var.ofac_blocked_countries },
Expand Down
5 changes: 5 additions & 0 deletions terraform/ecs/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ variable "scam_guard_cache_url" {
type = string
}

variable "cf_kv_endpoint" {
description = "The endpoint of the Cloudflare KV worker"
type = string
}

variable "ofac_blocked_countries" {
description = "The list of countries under OFAC sanctions"
type = string
Expand Down
2 changes: 2 additions & 0 deletions terraform/res_ecs.tf
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ module "ecs" {
project_registry_cache_url = "redis://${module.redis.endpoint}/1"
scam_guard_cache_url = "redis://${module.redis.endpoint}/2"

cf_kv_endpoint = var.cf_kv_endpoint

ofac_blocked_countries = var.ofac_blocked_countries

# Analytics
Expand Down
7 changes: 7 additions & 0 deletions terraform/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ variable "ofac_blocked_countries" {
default = ""
}

#-------------------------------------------------------------------------------
# Cloudflare KV for V2 migration

variable "cf_kv_endpoint" {
description = "The endpoint of the Cloudflare KV worker"
type = string
}

#-------------------------------------------------------------------------------
# Project Registry
Expand Down

0 comments on commit 527987e

Please sign in to comment.