Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update/queue builder #94

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .env.test
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ AWS_SECRET_ACCESS_KEY="AWS_SECRET_ACCESS_KEY"
AWS_S3_BUCKET_NAME="madara-orchestrator-test-bucket"
AWS_S3_BUCKET_REGION="us-east-1"
AWS_ENDPOINT_URL="http://localhost.localstack.cloud:4566"
SQS_JOB_PROCESSING_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/madara_orchestrator_job_processing_queue"
SQS_JOB_VERIFICATION_QUEUE_URL="http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/madara_orchestrator_job_verification_queue"
AWS_DEFAULT_REGION="localhost"

#### QUEUE ####
# Add without trailing slash, to be handled wherever used.
QUEUE_BASE_URL = "http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you update .env.example as well


##### On chain config #####

MADARA_RPC_URL="http://localhost:3000"
Expand Down
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## Added

- added coveralls support
- added new creator for SqsQueue.
- added coveralls support.
- moved mongodb serde behind feature flag
- implemented DA worker.
- Function to calculate the kzg proof of x_0.
Expand Down
2 changes: 1 addition & 1 deletion crates/orchestrator/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ pub async fn init_config() -> Config {
let database = Box::new(MongoDb::new(MongoDbConfig::new_from_env()).await);

// init the queue
let queue = Box::new(SqsQueue {});
let queue = Box::new(SqsQueue::new_from_env());

let da_client = build_da_client().await;

Expand Down
9 changes: 7 additions & 2 deletions crates/orchestrator/src/queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@ use crate::jobs::JobError;
#[automock]
#[async_trait]
pub trait QueueProvider: Send + Sync {
async fn send_message_to_queue(&self, queue: String, payload: String, delay: Option<Duration>) -> EyreResult<()>;
async fn consume_message_from_queue(&self, queue: String) -> std::result::Result<Delivery, QueueError>;
async fn send_message_to_queue(
&self,
queue_name: String,
payload: String,
delay: Option<Duration>,
) -> EyreResult<()>;
async fn consume_message_from_queue(&self, queue_name: String) -> std::result::Result<Delivery, QueueError>;
}

pub async fn init_consumers() -> Result<(), JobError> {
Expand Down
40 changes: 24 additions & 16 deletions crates/orchestrator/src/queue/sqs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,36 @@
use std::time::Duration;

use crate::queue::job_queue::JOB_PROCESSING_QUEUE;
use async_trait::async_trait;
use color_eyre::Result;
use omniqueue::backends::{SqsBackend, SqsConfig, SqsConsumer, SqsProducer};
use omniqueue::{Delivery, QueueError};
use utils::env_utils::get_env_var_or_panic;

use crate::queue::QueueProvider;
pub struct SqsQueue;

pub struct SqsQueue {
base_url: String,
}

impl SqsQueue {
pub fn new(base_url: String) -> Self {
SqsQueue { base_url }
}

pub fn new_from_env() -> Self {
let base_url = get_env_var_or_panic("QUEUE_BASE_URL");
SqsQueue { base_url }
}

pub fn get_queue_url(&self, queue_name: String) -> String {
format!("{}/{}", self.base_url.clone(), queue_name)
}
}

#[async_trait]
impl QueueProvider for SqsQueue {
async fn send_message_to_queue(&self, queue: String, payload: String, delay: Option<Duration>) -> Result<()> {
let queue_url = get_queue_url(queue);
let producer = get_producer(queue_url).await?;
async fn send_message_to_queue(&self, queue_name: String, payload: String, delay: Option<Duration>) -> Result<()> {
let producer = get_producer(self.get_queue_url(queue_name)).await?;

match delay {
Some(d) => producer.send_raw_scheduled(payload.as_str(), d).await?,
Expand All @@ -24,24 +40,16 @@ impl QueueProvider for SqsQueue {
Ok(())
}

async fn consume_message_from_queue(&self, queue: String) -> std::result::Result<Delivery, QueueError> {
let queue_url = get_queue_url(queue);
let mut consumer = get_consumer(queue_url).await?;
async fn consume_message_from_queue(&self, queue_name: String) -> std::result::Result<Delivery, QueueError> {
let mut consumer = get_consumer(self.get_queue_url(queue_name)).await?;
consumer.receive().await
}
}

fn get_queue_url(queue_name: String) -> String {
if queue_name == JOB_PROCESSING_QUEUE {
get_env_var_or_panic("SQS_JOB_PROCESSING_QUEUE_URL")
} else {
get_env_var_or_panic("SQS_JOB_VERIFICATION_QUEUE_URL")
}
}

// TODO: store the producer and consumer in memory to avoid creating a new one every time
async fn get_producer(queue: String) -> Result<SqsProducer> {
let (producer, _) =
// Automatically fetches the AWS Keys from env
SqsBackend::builder(SqsConfig { queue_dsn: queue, override_endpoint: true }).build_pair().await?;
Ok(producer)
}
Expand Down
2 changes: 1 addition & 1 deletion crates/orchestrator/src/tests/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl TestConfigBuilder {
self.prover_client.unwrap_or_else(|| build_prover_service(&settings_provider)),
self.settlement_client.unwrap(),
self.database.unwrap(),
self.queue.unwrap_or_else(|| Box::new(SqsQueue {})),
self.queue.unwrap_or_else(|| Box::new(SqsQueue::new_from_env())),
self.storage.unwrap(),
);

Expand Down
Loading