diff --git a/src/publisher_service/mod.rs b/src/publisher_service/mod.rs index 32859173..0461b9b1 100644 --- a/src/publisher_service/mod.rs +++ b/src/publisher_service/mod.rs @@ -32,14 +32,14 @@ fn create_http_client( pub async fn bootstrap(config: Configuration) -> Result<()> { wc::metrics::ServiceMetrics::init_with_name("notify-publisher-service"); - let postgres = PgPoolOptions::new().connect(&config.postgres_url).await?; - sqlx::migrate!("./migrations").run(&postgres).await?; + let postgres_pool = PgPoolOptions::new().connect(&config.postgres_url).await?; + sqlx::migrate!("./migrations").run(&postgres_pool).await?; let seed = sha256::digest(config.keypair_seed.as_bytes()).as_bytes()[..32] .try_into() .map_err(|_| Error::InvalidKeypairSeed)?; let keypair = Keypair::generate(&mut StdRng::from_seed(seed)); - let _http_relay_client = Arc::new(create_http_client( + let http_relay_client = Arc::new(create_http_client( &keypair, &config.relay_url.replace("ws", "http"), &config.notify_url, @@ -50,10 +50,10 @@ pub async fn bootstrap(config: Configuration) -> Result<()> { info!("Starting metrics server on {}", telemetry_addr); let telemetry_app = Router::new().route("/metrics", get(metrics::handler)); - + let postgres_pool_arc = Arc::new(postgres_pool); select! { e = axum::Server::bind(&telemetry_addr).serve(telemetry_app.into_make_service()) => warn!("Metrics server terminated {:?}", e), - e = worker::run() => warn!("Worker process terminated {:?}", e), + e = worker::run(&postgres_pool_arc, &http_relay_client) => warn!("Worker process terminated {:?}", e), } Ok(()) diff --git a/src/publisher_service/worker.rs b/src/publisher_service/worker.rs index 35aa8f27..f8747dab 100644 --- a/src/publisher_service/worker.rs +++ b/src/publisher_service/worker.rs @@ -1,3 +1,34 @@ -pub async fn run() { - // TODO: worker implementation +use { + sqlx::{postgres::PgListener, PgPool}, + std::sync::Arc, +}; + +pub async fn run( + pg_pool: &Arc, + relay_client: &Arc, +) -> Result<(), sqlx::Error> { + let mut pg_notify_listener = PgListener::connect_with(pg_pool) + .await + .expect("Notifying listener failed to connect to Postgres"); + pg_notify_listener + .listen("notification_for_delivery") + .await + .expect("Failed to listen to Postgres events channels"); + + loop { + let notification = pg_notify_listener.recv().await?; + tokio::spawn({ + let pg_pool = pg_pool.clone(); + let relay_client = relay_client.clone(); + async move { process_message(&pg_pool, &relay_client, notification.payload()).await } + }); + } +} + +async fn process_message( + _pg_pool: &Arc, + _relay_client: &Arc, + _message_delivery_id: &str, +) { + // TODO: implement }