From 795a963d404e09fa8c2445a6488ebe33dfbf7119 Mon Sep 17 00:00:00 2001 From: Chris Smith Date: Mon, 22 Jan 2024 21:28:55 -0500 Subject: [PATCH] wip: guarantee subscribe --- src/model/helpers.rs | 22 +++----- .../handlers/subscribe_topic.rs | 32 +++++++++-- .../handlers/notify_subscribe.rs | 56 +++++++++++-------- 3 files changed, 70 insertions(+), 40 deletions(-) diff --git a/src/model/helpers.rs b/src/model/helpers.rs index 2e89cba0..90a783f9 100644 --- a/src/model/helpers.rs +++ b/src/model/helpers.rs @@ -13,7 +13,7 @@ use { ed25519_dalek::SigningKey, relay_rpc::domain::{ProjectId, Topic}, serde::{Deserialize, Serialize}, - sqlx::{FromRow, PgPool, Postgres}, + sqlx::{FromRow, PgExecutor, PgPool, Postgres, Transaction}, std::{collections::HashSet, time::Instant}, tracing::instrument, uuid::Uuid, @@ -28,13 +28,13 @@ pub struct ProjectWithPublicKeys { pub topic: String, } -pub async fn upsert_project( +pub async fn upsert_project<'e>( project_id: ProjectId, app_domain: &str, topic: Topic, authentication_key: &SigningKey, subscribe_key: &StaticSecret, - postgres: &PgPool, + postgres: impl sqlx::PgExecutor<'e>, metrics: Option<&Metrics>, ) -> Result { let authentication_public_key = encode_authentication_public_key(authentication_key); @@ -58,7 +58,7 @@ pub async fn upsert_project( // TODO test idempotency #[allow(clippy::too_many_arguments)] #[instrument(skip(authentication_private_key, subscribe_private_key, postgres, metrics))] -async fn upsert_project_impl( +async fn upsert_project_impl<'e>( project_id: ProjectId, app_domain: &str, topic: Topic, @@ -66,7 +66,7 @@ async fn upsert_project_impl( authentication_private_key: String, subscribe_public_key: String, subscribe_private_key: String, - postgres: &PgPool, + postgres: impl sqlx::PgExecutor<'e>, metrics: Option<&Metrics>, ) -> Result { let query = " @@ -324,18 +324,16 @@ pub struct SubscribeResponse { } // TODO test idempotency -#[instrument(skip(postgres, metrics))] +#[instrument(skip(txn, metrics))] pub async fn upsert_subscriber( project: Uuid, account: AccountId, scope: HashSet, notify_key: &[u8; 32], notify_topic: Topic, - postgres: &PgPool, + txn: &mut Transaction<'_, Postgres>, metrics: Option<&Metrics>, ) -> Result { - let mut txn = postgres.begin().await?; - // `xmax = 0`: https://stackoverflow.com/a/39204667 let query = " @@ -369,9 +367,7 @@ pub async fn upsert_subscriber( metrics.postgres_query("upsert_subscriber", start); } - update_subscriber_scope(subscriber.id, scope, &mut txn, metrics).await?; - - txn.commit().await?; + update_subscriber_scope(subscriber.id, scope, txn, metrics).await?; Ok(subscriber) } @@ -415,7 +411,7 @@ pub async fn update_subscriber( async fn update_subscriber_scope( subscriber: Uuid, scope: HashSet, - txn: &mut sqlx::Transaction<'_, Postgres>, + txn: &mut Transaction<'_, Postgres>, metrics: Option<&Metrics>, ) -> Result<(), sqlx::error::Error> { let query = " diff --git a/src/services/public_http_server/handlers/subscribe_topic.rs b/src/services/public_http_server/handlers/subscribe_topic.rs index cf85e389..ea90ad49 100644 --- a/src/services/public_http_server/handlers/subscribe_topic.rs +++ b/src/services/public_http_server/handlers/subscribe_topic.rs @@ -2,9 +2,10 @@ use { crate::{ error::NotifyServerError, model::helpers::upsert_project, - publish_relay_message::subscribe_relay_topic, + publish_relay_message::{publish_relay_message, subscribe_relay_topic}, rate_limit::{self, Clock, RateLimitError}, registry::{extractor::AuthedProjectId, storage::redis::Redis}, + spec::{NOTIFY_NOOP_TAG, NOTIFY_NOOP_TTL}, state::AppState, utils::topic_from_key, }, @@ -18,10 +19,10 @@ use { hyper::StatusCode, once_cell::sync::Lazy, regex::Regex, - relay_rpc::domain::ProjectId, + relay_rpc::{domain::ProjectId, rpc::Publish}, serde::{Deserialize, Serialize}, serde_json::json, - std::sync::Arc, + std::sync::{Arc, OnceLock}, tracing::{info, instrument}, x25519_dalek::{PublicKey, StaticSecret}, }; @@ -80,13 +81,14 @@ pub async fn handler( let authentication_key = ed25519_dalek::SigningKey::generate(&mut OsRng); + let mut txn = state.postgres.begin().await?; let project = upsert_project( project_id, &app_domain, topic.clone(), &authentication_key, &subscribe_key, - &state.postgres, + &mut *txn, state.metrics.as_ref(), ) .await @@ -103,8 +105,30 @@ pub async fn handler( if project.topic == topic.as_ref() { info!("Subscribing to project topic: {topic}"); subscribe_relay_topic(&state.relay_ws_client, &topic, state.metrics.as_ref()).await?; + + // Send noop to extend ttl of relay's mapping + info!("Timing: Publishing noop to notify_topic"); + publish_relay_message( + &state.relay_http_client, + &Publish { + topic, + message: { + // Extremely minor performance optimization with OnceLock to avoid allocating the same empty string everytime + static LOCK: OnceLock> = OnceLock::new(); + LOCK.get_or_init(|| "".into()).clone() + }, + tag: NOTIFY_NOOP_TAG, + ttl_secs: NOTIFY_NOOP_TTL.as_secs() as u32, + prompt: false, + }, + state.metrics.as_ref(), + ) + .await?; + info!("Timing: Finished publishing noop to notify_topic"); } + txn.commit().await?; + Ok(Json(SubscribeTopicResponseBody { authentication_key: project.authentication_public_key, subscribe_key: project.subscribe_public_key, diff --git a/src/services/websocket_server/handlers/notify_subscribe.rs b/src/services/websocket_server/handlers/notify_subscribe.rs index 3c5b0098..2ea3ed39 100644 --- a/src/services/websocket_server/handlers/notify_subscribe.rs +++ b/src/services/websocket_server/handlers/notify_subscribe.rs @@ -153,6 +153,12 @@ pub async fn handle(msg: PublishedMessage, state: &AppState) -> Result<(), Relay let scope = parse_scope(&request_auth.scp) .map_err(|e| RelayMessageServerError::NotifyServerError(e.into()))?; // TODO change to client error? + let mut txn = state + .postgres + .begin() + .await + .map_err(|e| RelayMessageServerError::NotifyServerError(e.into()))?; + let subscriber = { // Technically we don't need to derive based on client_public_key anymore; we just need a symkey. But this is technical // debt from when clients derived the same symkey on their end via Diffie-Hellman. But now they use the value from @@ -169,7 +175,7 @@ pub async fn handle(msg: PublishedMessage, state: &AppState) -> Result<(), Relay scope.clone(), ¬ify_key, notify_topic, - &state.postgres, + &mut txn, state.metrics.as_ref(), ) .await @@ -204,6 +210,31 @@ pub async fn handle(msg: PublishedMessage, state: &AppState) -> Result<(), Relay .map_err(|e| RelayMessageServerError::NotifyServerError(e.into()))?; info!("Timing: Finished subscribing to topic"); + // Send noop to extend ttl of relay's mapping + info!("Timing: Publishing noop to notify_topic"); + publish_relay_message( + &state.relay_http_client, + &Publish { + topic: notify_topic.clone(), + message: { + // Extremely minor performance optimization with OnceLock to avoid allocating the same empty string everytime + static LOCK: OnceLock> = OnceLock::new(); + LOCK.get_or_init(|| "".into()).clone() + }, + tag: NOTIFY_NOOP_TAG, + ttl_secs: NOTIFY_NOOP_TTL.as_secs() as u32, + prompt: false, + }, + state.metrics.as_ref(), + ) + .await + .map_err(|e| RelayMessageServerError::NotifyServerError(e.into()))?; // TODO change to client error? + info!("Timing: Finished publishing noop to notify_topic"); + + txn.commit() + .await + .map_err(|e| RelayMessageServerError::NotifyServerError(e.into()))?; + info!("Timing: Recording SubscriberUpdateParams"); state.analytics.client(SubscriberUpdateParams { project_pk: project.id, @@ -215,7 +246,7 @@ pub async fn handle(msg: PublishedMessage, state: &AppState) -> Result<(), Relay method: NotifyClientMethod::Subscribe, old_scope: HashSet::new(), new_scope: scope.clone(), - notification_topic: notify_topic.clone(), + notification_topic: notify_topic, topic, }); info!("Timing: Finished recording SubscriberUpdateParams"); @@ -277,27 +308,6 @@ pub async fn handle(msg: PublishedMessage, state: &AppState) -> Result<(), Relay info!("Finished publishing subscribe response"); } - // Send noop to extend ttl of relay's mapping - info!("Timing: Publishing noop to notify_topic"); - publish_relay_message( - &state.relay_http_client, - &Publish { - topic: notify_topic, - message: { - // Extremely minor performance optimization with OnceLock to avoid allocating the same empty string everytime - static LOCK: OnceLock> = OnceLock::new(); - LOCK.get_or_init(|| "".into()).clone() - }, - tag: NOTIFY_NOOP_TAG, - ttl_secs: NOTIFY_NOOP_TTL.as_secs() as u32, - prompt: false, - }, - state.metrics.as_ref(), - ) - .await - .map_err(|e| RelayMessageServerError::NotifyServerError(e.into()))?; // TODO change to client error? - info!("Timing: Finished publishing noop to notify_topic"); - // TODO do in same txn as upsert_subscriber() if subscriber.inserted { let welcome_notification =