Skip to content

Commit

Permalink
chore: adding message worker spawning on pgsql notify event
Browse files Browse the repository at this point in the history
  • Loading branch information
geekbrother committed Oct 26, 2023
1 parent 63c572f commit cc837ff
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 7 deletions.
10 changes: 5 additions & 5 deletions src/publisher_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ fn create_http_client(
pub async fn bootstrap(config: Configuration) -> Result<()> {
wc::metrics::ServiceMetrics::init_with_name("notify-publisher-service");

let postgres = PgPoolOptions::new().connect(&config.postgres_url).await?;
sqlx::migrate!("./migrations").run(&postgres).await?;
let postgres_pool = PgPoolOptions::new().connect(&config.postgres_url).await?;
sqlx::migrate!("./migrations").run(&postgres_pool).await?;

let seed = sha256::digest(config.keypair_seed.as_bytes()).as_bytes()[..32]
.try_into()
.map_err(|_| Error::InvalidKeypairSeed)?;
let keypair = Keypair::generate(&mut StdRng::from_seed(seed));
let _http_relay_client = Arc::new(create_http_client(
let http_relay_client = Arc::new(create_http_client(
&keypair,
&config.relay_url.replace("ws", "http"),
&config.notify_url,
Expand All @@ -50,10 +50,10 @@ pub async fn bootstrap(config: Configuration) -> Result<()> {

info!("Starting metrics server on {}", telemetry_addr);
let telemetry_app = Router::new().route("/metrics", get(metrics::handler));

let postgres_pool_arc = Arc::new(postgres_pool);
select! {
e = axum::Server::bind(&telemetry_addr).serve(telemetry_app.into_make_service()) => warn!("Metrics server terminated {:?}", e),
e = worker::run() => warn!("Worker process terminated {:?}", e),
e = worker::run(&postgres_pool_arc, &http_relay_client) => warn!("Worker process terminated {:?}", e),
}

Ok(())
Expand Down
35 changes: 33 additions & 2 deletions src/publisher_service/worker.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,34 @@
pub async fn run() {
// TODO: worker implementation
use {
sqlx::{postgres::PgListener, PgPool},
std::sync::Arc,
};

pub async fn run(
pg_pool: &Arc<PgPool>,
relay_client: &Arc<relay_client::http::Client>,
) -> Result<(), sqlx::Error> {
let mut pg_notify_listener = PgListener::connect_with(pg_pool)
.await
.expect("Notifying listener failed to connect to Postgres");
pg_notify_listener
.listen("notification_for_delivery")
.await
.expect("Failed to listen to Postgres events channels");

loop {
let notification = pg_notify_listener.recv().await?;
tokio::spawn({
let pg_pool = pg_pool.clone();
let relay_client = relay_client.clone();
async move { process_message(&pg_pool, &relay_client, notification.payload()).await }
});
}
}

async fn process_message(
_pg_pool: &Arc<PgPool>,
_relay_client: &Arc<relay_client::http::Client>,
_message_delivery_id: &str,
) {
// TODO: implement
}

0 comments on commit cc837ff

Please sign in to comment.