Skip to content

Commit

Permalink
wip: guarantee subscribe
Browse files Browse the repository at this point in the history
  • Loading branch information
chris13524 committed Jan 23, 2024
1 parent 5605ba2 commit 795a963
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 40 deletions.
22 changes: 9 additions & 13 deletions src/model/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use {
ed25519_dalek::SigningKey,
relay_rpc::domain::{ProjectId, Topic},
serde::{Deserialize, Serialize},
sqlx::{FromRow, PgPool, Postgres},
sqlx::{FromRow, PgExecutor, PgPool, Postgres, Transaction},

Check warning on line 16 in src/model/helpers.rs

View workflow job for this annotation

GitHub Actions / CI / Integration Tests

unused import: `PgExecutor`

Check failure on line 16 in src/model/helpers.rs

View workflow job for this annotation

GitHub Actions / CI / / / Check App / Clippy

unused import: `PgExecutor`

Check warning on line 16 in src/model/helpers.rs

View workflow job for this annotation

GitHub Actions / CI / / / Check App / Unit Tests

unused import: `PgExecutor`

Check warning on line 16 in src/model/helpers.rs

View workflow job for this annotation

GitHub Actions / CI / Integration Tests

unused import: `PgExecutor`

Check failure on line 16 in src/model/helpers.rs

View workflow job for this annotation

GitHub Actions / CI / / / Check App / Clippy

unused import: `PgExecutor`

Check warning on line 16 in src/model/helpers.rs

View workflow job for this annotation

GitHub Actions / CI / / / Check App / Unit Tests

unused import: `PgExecutor`

Check warning on line 16 in src/model/helpers.rs

View workflow job for this annotation

GitHub Actions / CI / Integration Tests

unused import: `PgExecutor`

Check failure on line 16 in src/model/helpers.rs

View workflow job for this annotation

GitHub Actions / CI / / / Check App / Clippy

unused import: `PgExecutor`

Check warning on line 16 in src/model/helpers.rs

View workflow job for this annotation

GitHub Actions / CI / / / Check App / Unit Tests

unused import: `PgExecutor`
std::{collections::HashSet, time::Instant},
tracing::instrument,
uuid::Uuid,
Expand All @@ -28,13 +28,13 @@ pub struct ProjectWithPublicKeys {
pub topic: String,
}

pub async fn upsert_project(
pub async fn upsert_project<'e>(
project_id: ProjectId,
app_domain: &str,
topic: Topic,
authentication_key: &SigningKey,
subscribe_key: &StaticSecret,
postgres: &PgPool,
postgres: impl sqlx::PgExecutor<'e>,
metrics: Option<&Metrics>,
) -> Result<ProjectWithPublicKeys, sqlx::error::Error> {
let authentication_public_key = encode_authentication_public_key(authentication_key);
Expand All @@ -58,15 +58,15 @@ pub async fn upsert_project(
// TODO test idempotency
#[allow(clippy::too_many_arguments)]
#[instrument(skip(authentication_private_key, subscribe_private_key, postgres, metrics))]
async fn upsert_project_impl(
async fn upsert_project_impl<'e>(
project_id: ProjectId,
app_domain: &str,
topic: Topic,
authentication_public_key: String,
authentication_private_key: String,
subscribe_public_key: String,
subscribe_private_key: String,
postgres: &PgPool,
postgres: impl sqlx::PgExecutor<'e>,
metrics: Option<&Metrics>,
) -> Result<ProjectWithPublicKeys, sqlx::error::Error> {
let query = "
Expand Down Expand Up @@ -324,18 +324,16 @@ pub struct SubscribeResponse {
}

// TODO test idempotency
#[instrument(skip(postgres, metrics))]
#[instrument(skip(txn, metrics))]
pub async fn upsert_subscriber(
project: Uuid,
account: AccountId,
scope: HashSet<Uuid>,
notify_key: &[u8; 32],
notify_topic: Topic,
postgres: &PgPool,
txn: &mut Transaction<'_, Postgres>,
metrics: Option<&Metrics>,
) -> Result<SubscribeResponse, sqlx::error::Error> {
let mut txn = postgres.begin().await?;

// `xmax = 0`: https://stackoverflow.com/a/39204667

let query = "
Expand Down Expand Up @@ -369,9 +367,7 @@ pub async fn upsert_subscriber(
metrics.postgres_query("upsert_subscriber", start);
}

update_subscriber_scope(subscriber.id, scope, &mut txn, metrics).await?;

txn.commit().await?;
update_subscriber_scope(subscriber.id, scope, txn, metrics).await?;

Ok(subscriber)
}
Expand Down Expand Up @@ -415,7 +411,7 @@ pub async fn update_subscriber(
async fn update_subscriber_scope(
subscriber: Uuid,
scope: HashSet<Uuid>,
txn: &mut sqlx::Transaction<'_, Postgres>,
txn: &mut Transaction<'_, Postgres>,
metrics: Option<&Metrics>,
) -> Result<(), sqlx::error::Error> {
let query = "
Expand Down
32 changes: 28 additions & 4 deletions src/services/public_http_server/handlers/subscribe_topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ use {
crate::{
error::NotifyServerError,
model::helpers::upsert_project,
publish_relay_message::subscribe_relay_topic,
publish_relay_message::{publish_relay_message, subscribe_relay_topic},
rate_limit::{self, Clock, RateLimitError},
registry::{extractor::AuthedProjectId, storage::redis::Redis},
spec::{NOTIFY_NOOP_TAG, NOTIFY_NOOP_TTL},
state::AppState,
utils::topic_from_key,
},
Expand All @@ -18,10 +19,10 @@ use {
hyper::StatusCode,
once_cell::sync::Lazy,
regex::Regex,
relay_rpc::domain::ProjectId,
relay_rpc::{domain::ProjectId, rpc::Publish},
serde::{Deserialize, Serialize},
serde_json::json,
std::sync::Arc,
std::sync::{Arc, OnceLock},
tracing::{info, instrument},
x25519_dalek::{PublicKey, StaticSecret},
};
Expand Down Expand Up @@ -80,13 +81,14 @@ pub async fn handler(

let authentication_key = ed25519_dalek::SigningKey::generate(&mut OsRng);

let mut txn = state.postgres.begin().await?;
let project = upsert_project(
project_id,
&app_domain,
topic.clone(),
&authentication_key,
&subscribe_key,
&state.postgres,
&mut *txn,
state.metrics.as_ref(),
)
.await
Expand All @@ -103,8 +105,30 @@ pub async fn handler(
if project.topic == topic.as_ref() {
info!("Subscribing to project topic: {topic}");
subscribe_relay_topic(&state.relay_ws_client, &topic, state.metrics.as_ref()).await?;

// Send noop to extend ttl of relay's mapping
info!("Timing: Publishing noop to notify_topic");
publish_relay_message(
&state.relay_http_client,
&Publish {
topic,
message: {
// Extremely minor performance optimization with OnceLock to avoid allocating the same empty string everytime
static LOCK: OnceLock<Arc<str>> = OnceLock::new();
LOCK.get_or_init(|| "".into()).clone()
},
tag: NOTIFY_NOOP_TAG,
ttl_secs: NOTIFY_NOOP_TTL.as_secs() as u32,
prompt: false,
},
state.metrics.as_ref(),
)
.await?;
info!("Timing: Finished publishing noop to notify_topic");
}

txn.commit().await?;

Ok(Json(SubscribeTopicResponseBody {
authentication_key: project.authentication_public_key,
subscribe_key: project.subscribe_public_key,
Expand Down
56 changes: 33 additions & 23 deletions src/services/websocket_server/handlers/notify_subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,12 @@ pub async fn handle(msg: PublishedMessage, state: &AppState) -> Result<(), Relay
let scope = parse_scope(&request_auth.scp)
.map_err(|e| RelayMessageServerError::NotifyServerError(e.into()))?; // TODO change to client error?

let mut txn = state
.postgres
.begin()
.await
.map_err(|e| RelayMessageServerError::NotifyServerError(e.into()))?;

let subscriber = {
// Technically we don't need to derive based on client_public_key anymore; we just need a symkey. But this is technical
// debt from when clients derived the same symkey on their end via Diffie-Hellman. But now they use the value from
Expand All @@ -169,7 +175,7 @@ pub async fn handle(msg: PublishedMessage, state: &AppState) -> Result<(), Relay
scope.clone(),
&notify_key,
notify_topic,
&state.postgres,
&mut txn,
state.metrics.as_ref(),
)
.await
Expand Down Expand Up @@ -204,6 +210,31 @@ pub async fn handle(msg: PublishedMessage, state: &AppState) -> Result<(), Relay
.map_err(|e| RelayMessageServerError::NotifyServerError(e.into()))?;
info!("Timing: Finished subscribing to topic");

// Send noop to extend ttl of relay's mapping
info!("Timing: Publishing noop to notify_topic");
publish_relay_message(
&state.relay_http_client,
&Publish {
topic: notify_topic.clone(),
message: {
// Extremely minor performance optimization with OnceLock to avoid allocating the same empty string everytime
static LOCK: OnceLock<Arc<str>> = OnceLock::new();
LOCK.get_or_init(|| "".into()).clone()
},
tag: NOTIFY_NOOP_TAG,
ttl_secs: NOTIFY_NOOP_TTL.as_secs() as u32,
prompt: false,
},
state.metrics.as_ref(),
)
.await
.map_err(|e| RelayMessageServerError::NotifyServerError(e.into()))?; // TODO change to client error?
info!("Timing: Finished publishing noop to notify_topic");

txn.commit()
.await
.map_err(|e| RelayMessageServerError::NotifyServerError(e.into()))?;

info!("Timing: Recording SubscriberUpdateParams");
state.analytics.client(SubscriberUpdateParams {
project_pk: project.id,
Expand All @@ -215,7 +246,7 @@ pub async fn handle(msg: PublishedMessage, state: &AppState) -> Result<(), Relay
method: NotifyClientMethod::Subscribe,
old_scope: HashSet::new(),
new_scope: scope.clone(),
notification_topic: notify_topic.clone(),
notification_topic: notify_topic,
topic,
});
info!("Timing: Finished recording SubscriberUpdateParams");
Expand Down Expand Up @@ -277,27 +308,6 @@ pub async fn handle(msg: PublishedMessage, state: &AppState) -> Result<(), Relay
info!("Finished publishing subscribe response");
}

// Send noop to extend ttl of relay's mapping
info!("Timing: Publishing noop to notify_topic");
publish_relay_message(
&state.relay_http_client,
&Publish {
topic: notify_topic,
message: {
// Extremely minor performance optimization with OnceLock to avoid allocating the same empty string everytime
static LOCK: OnceLock<Arc<str>> = OnceLock::new();
LOCK.get_or_init(|| "".into()).clone()
},
tag: NOTIFY_NOOP_TAG,
ttl_secs: NOTIFY_NOOP_TTL.as_secs() as u32,
prompt: false,
},
state.metrics.as_ref(),
)
.await
.map_err(|e| RelayMessageServerError::NotifyServerError(e.into()))?; // TODO change to client error?
info!("Timing: Finished publishing noop to notify_topic");

// TODO do in same txn as upsert_subscriber()
if subscriber.inserted {
let welcome_notification =
Expand Down

0 comments on commit 795a963

Please sign in to comment.