-
Notifications
You must be signed in to change notification settings - Fork 15
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat : added snos worker implementation and unit tests #16
Conversation
WalkthroughThe updates encompass adding single-threaded execution to coverage tasks, dynamic configuration management with Changes
Sequence Diagram(s)sequenceDiagram
participant User
participant Worker
participant Config as ArcSwap<Config>
participant Database
participant Queue
User->>Worker: Start Worker
Worker->>Config: Fetch Configuration
Config-->>Worker: Return Config
Worker->>Database: Check Latest Job
Database-->>Worker: Latest Job Info
Worker->>Queue: Create Job
Queue-->>Worker: Job Created
Worker->>Database: Save Created Job
Database-->>Worker: Job Saved
Worker-->>User: Job Processed Successfully
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (invoked as PR comments)
Additionally, you can add CodeRabbit Configration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 8
Outside diff range and nitpick comments (1)
crates/orchestrator/src/config.rs (1)
83-104
: Ensure thread safety and performance ofconfig_force_init
.Using
ArcSwap
provides flexibility for testing but consider the implications on performance and thread safety. Review the usage patterns to ensure that the dynamic swapping of configurations does not introduce race conditions or performance bottlenecks.
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files ignored due to path filters (1)
Cargo.lock
is excluded by!**/*.lock
Files selected for processing (12)
- .github/workflows/coverage.yml (1 hunks)
- Cargo.toml (1 hunks)
- crates/orchestrator/Cargo.toml (1 hunks)
- crates/orchestrator/src/config.rs (2 hunks)
- crates/orchestrator/src/database/mod.rs (2 hunks)
- crates/orchestrator/src/database/mongodb/mod.rs (2 hunks)
- crates/orchestrator/src/jobs/mod.rs (4 hunks)
- crates/orchestrator/src/main.rs (1 hunks)
- crates/orchestrator/src/queue/job_queue.rs (1 hunks)
- crates/orchestrator/src/tests/mod.rs (1 hunks)
- crates/orchestrator/src/tests/workers/mod.rs (1 hunks)
- crates/orchestrator/src/workers/snos.rs (2 hunks)
Files not summarized due to errors (2)
- crates/orchestrator/Cargo.toml: Error: Server error. Please try again later.
- crates/orchestrator/src/queue/job_queue.rs: Error: Server error. Please try again later.
Files skipped from review due to trivial changes (3)
- Cargo.toml
- crates/orchestrator/src/main.rs
- crates/orchestrator/src/tests/mod.rs
Additional comments not posted (5)
.github/workflows/coverage.yml (1)
29-29
: The addition of--test-threads=1
is a sensible change to ensure that tests do not interfere with each other by running in parallel, which is particularly useful for tests that are not thread-safe.crates/orchestrator/Cargo.toml (1)
15-15
: The addition ofarc-swap
as a dependency aligns with the need for atomic updates to configurations at runtime, which is crucial for systems that require high availability and dynamic configuration changes.crates/orchestrator/src/database/mod.rs (1)
37-38
: The addition ofget_latest_job_by_type
andget_all_jobs
functions is a valuable enhancement for more granular job management. Consider adding documentation comments to these functions to clarify their usage and expected behavior, especially regarding error handling and the types of results they return.crates/orchestrator/src/tests/workers/mod.rs (1)
42-51
: Check error handling in queue message sending.Ensure that error handling is robust when sending messages to the queue fails. Consider adding a test case to simulate and handle message sending failures.
crates/orchestrator/src/queue/job_queue.rs (1)
17-18
: Check visibility ofJobQueueMessage
fields.The
id
field ofJobQueueMessage
is now crate-private. Verify if this change affects any external modules that might depend on direct access to this field.
async fn get_latest_job_by_type(&self, job_type: JobType) -> Result<Option<JobItem>> { | ||
let filter = doc! { | ||
"job_type": mongodb::bson::to_bson(&job_type)?, | ||
}; | ||
let find_options = FindOneOptions::builder().sort(doc! { "internal_id": -1 }).build(); | ||
Ok(self | ||
.get_job_collection() | ||
.find_one(filter, find_options) | ||
.await | ||
.expect("Failed to fetch latest job by given job type")) | ||
} | ||
|
||
async fn get_all_jobs(&self, job_type: JobType) -> Result<Cursor<JobItem>> { | ||
let filter = doc! { | ||
"job_type": mongodb::bson::to_bson(&job_type)?, | ||
}; | ||
Ok(self.get_job_collection().find(filter, None).await.expect("Failed to fetch jobs with given job type")) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Optimize MongoDB queries for job retrieval.
The new functions for retrieving jobs could benefit from optimization. Consider adding indexes on job_type
and internal_id
to improve query performance, especially for large datasets.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given that internal ID is a string of variable length, definitely needs an index
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@unstark, you're absolutely right. Adding an index on internal_id
will help optimize the performance of queries filtering by this field, especially as the database scales. Here's the MongoDB command you can use to create this index within the jobs
collection:
let job_collection = self.client.database("orchestrator").collection("jobs");
job_collection.create_index(doc! {"internal_id": 1}, None).await?;
This command sets up an ascending index on internal_id
. If you're also frequently querying by job_type
, considering adding a compound index that includes both internal_id
and job_type
could be beneficial, especially for the get_latest_job_by_type
function:
job_collection.create_index(doc! {"job_type": 1, "internal_id": -1}, None).await?;
This compound index would optimize the sorting and retrieval by job_type
and internal_id
, enhancing the performance of fetching the latest job by type.
crates/orchestrator/src/jobs/mod.rs
Outdated
@@ -170,6 +170,7 @@ | |||
fn get_job_handler(job_type: &JobType) -> Box<dyn Job> { | |||
match job_type { | |||
JobType::DataSubmission => Box::new(da_job::DaJob), | |||
JobType::SnosRun => Box::new(da_job::DaJob), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct handler assignment for JobType::SnosRun
.
The get_job_handler
function incorrectly assigns the DaJob
handler for JobType::SnosRun
. This should be corrected to use the appropriate handler as per the new functionality introduced.
- JobType::SnosRun => Box::new(da_job::DaJob),
+ JobType::SnosRun => Box::new(snos_job::SnosRunHandler::new()),
Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
JobType::SnosRun => Box::new(da_job::DaJob), | |
JobType::SnosRun => Box::new(snos_job::SnosRunHandler::new()), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ocdbytes let's fix this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!
/// However, when running tests, we often want to reinitialize because we want to clear the DB and | ||
/// set it up again for reuse in new tests. By calling `config_force_init` we replace the already | ||
/// stored config inside `ArcSwap` with the new configuration and pool settings. | ||
pub async fn config_force_init(config: Config) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's only add this code if the test flag is enabled
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if it could work without the global config? Apart from the need to carry it through all methods, are there any reasons to use static initialization?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ya that's the only advantage. Otherwise we need to pass it everywhere although that does make testing easier. Do you think we should do the latter?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To be fair, yes :) that would be much easier to test, also you'd be able to pass only necessary parts of the config instead of the entire struct
@@ -33,6 +34,8 @@ pub trait Database: Send + Sync { | |||
) -> Result<()>; | |||
|
|||
async fn update_metadata(&self, job: &JobItem, metadata: HashMap<String, String>) -> Result<()>; | |||
async fn get_latest_job_by_type(&self, job_type: JobType) -> Result<Option<JobItem>>; | |||
async fn get_all_jobs(&self, job_type: JobType) -> Result<Cursor<JobItem>>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cursor
is a mongodb type and the DatabaseTrait is generic across the database you're using. So we should change this to Vec
and handle the conversion for Cursor -> Vec inside the MongoDB function
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is no need for this function for now as test is working without it. Will remove this and if in future we need to add this function will surely follow the above method.
crates/orchestrator/src/jobs/mod.rs
Outdated
@@ -170,6 +170,7 @@ | |||
fn get_job_handler(job_type: &JobType) -> Box<dyn Job> { | |||
match job_type { | |||
JobType::DataSubmission => Box::new(da_job::DaJob), | |||
JobType::SnosRun => Box::new(da_job::DaJob), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ocdbytes let's fix this
|
||
#[rstest] | ||
#[tokio::test] | ||
async fn test_create_job() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
async fn test_create_job() { | |
async fn test_snos_worker() { |
for i in 1..6 { | ||
db.expect_create_job() | ||
.returning(|_| Ok(get_job_item_mock_by_id("1".to_string()))) | ||
.call(get_job_item_mock_by_id(i.to_string())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we just call get_job_item_mock_by_id
once and use it in both places?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
doesn't work tried this value move ho jaati h
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
try cloning?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
already tried
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cloning a string should ideally work, what were you doing exactly?
for i in 1..6 { | ||
db.expect_get_job_by_internal_id_and_type() | ||
.returning(|_, _| Ok(None)) | ||
.call(&i.to_string(), &JobType::SnosRun) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we aren't supposed to call
anywhere. instead, we need to tell the mock that this function should be called once (times(1)
) and it should be called with args xyz (https://docs.rs/mockall/latest/mockall/#matching-arguments)
is this why you were saying that the test doesn't make sense because we're calling everything ourselves?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yup
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually call behaviour is needed when test is run.
So what happens is that if I don't add this call then test will throw an error stating that no expectation behaviour found for this particular call. Because this function is called in the code while creating the job to check whether the job already exists, If yes It will throw an error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, the error sounds like the code didn't call this method but it should've been called. We don't use call for the da test. Can you double or share the exact error message?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
resolved
let mut queue = MockQueueProvider::new(); | ||
|
||
// Mocking db functions | ||
db.expect_get_latest_job_by_type().returning(|_| Ok(None)).call(JobType::SnosRun).expect("Failed to call."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we also do a test case for when the db returns a value? we could use rstest
with cases
so that we can use the same code only
@@ -8,7 +14,35 @@ impl Worker for SnosWorker { | |||
/// 1. Fetch the latest completed block from the Starknet chain | |||
/// 2. Fetch the last block that had a SNOS job run. | |||
/// 3. Create SNOS run jobs for all the remaining blocks | |||
// TEST : added config temporarily to test |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what's this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed
todo!() | ||
let config = config().await; | ||
let provider = config.starknet_client(); | ||
let latest_block_number = provider.block_number().await.expect("Failed to fetch block number from rpc"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we make workers return a Result and throw an error wherever we do unwrap
or expect
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that should be the ideal case but ig error is handled in create_job
function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
but we're doing unwraps and expects in worker, this will crash the orchestrator
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
In most cases it's better (imho) to propagate the error as far as possible and let the caller decide what to do with it rather than panic
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added error throwing in workers
.expect("Error : failed to create job for snos workers."); | ||
} | ||
|
||
log::info!("jobs created !!"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
trace log?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed. It is a redundant log
let filter = doc! { | ||
"job_type": mongodb::bson::to_bson(&job_type)?, | ||
}; | ||
let find_options = FindOneOptions::builder().sort(doc! { "internal_id": -1 }).build(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AFAICS internal ID is a string without additional restrictions https://github.com/unstark/madara-orchestrator/blob/3fe37331183cf3291aaef9d1e2664e54e86f0648/crates/orchestrator/src/jobs/types.rs#L104, so it's not necessarily a unique incrementing integer. We might need an additional field, e.g. created_at in microseconds to achieve what you want
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So internal_id would actually be the block number but in String. The reason it was made a string is that jobs use this column to uniquely identify a job based on the type. Like, I want the SNOS run of block 6. Now jobs can have any sort of internal id (number, uuid etc.). So as a generic, a string param was used here. I am not sure if this is the best approach though but here, -1 should work because it's a number represented as string?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok nvm, this is incorrect. This function can be used by any job so if some job has a different form of internal id, this will break. However, the created_at might not be the best check either because what if we create jobs in parallel somewhere or after receiving something from a queue, there's no guarantee that the jobs would be created sequentially.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For now, we can explicitly rename this function to say get_latest_job_by_internal_id
so it makes the purpose of the function clear. And then later, we can have another function of get_latest_by_created_at
if some job is ok using that. wdyt?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
function name changed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That would work (renaming), as for the created_at - even if there is a collision, the function would still do its job :)
let config = config().await; | ||
let provider = config.starknet_client(); | ||
let latest_block_number = provider.block_number().await.expect("Failed to fetch block number from rpc"); | ||
let latest_block_processed_data = config |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could it be the case that there are gaps in the snos job executions?
For instance:
block 1: snos job succeeded
block 2: snos job failed
block 3: snos job succeeded
block 4: snos job pending
Ideally you should also search for failed ones and retry (if possible)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the job execution already has a retry logic. if something moved to a failed state, we have ideally already retried enough number of times and we should throw and alert and investigate manually, wdyt?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh right, that makes sense 👍
* feat : added snos worker implementation and unit tests * feat : added review #1 changes : added error handling for snos workers * feat : added review #1 changes : added error handling for snos workers * fix : lint * fix : lint errors --------- Co-authored-by: Arun Jangra <[email protected]>
* feat : added snos worker implementation and unit tests * feat : added review #1 changes : added error handling for snos workers * feat : added review #1 changes : added error handling for snos workers * fix : lint * fix : lint errors --------- Co-authored-by: Arun Jangra <[email protected]>
Summary by CodeRabbit
New Features
ArcSwap
.JobType::SnosRun
case in job handling.Enhancements
Dependencies
arc-swap
version 1.7.1 to the dependencies.Tests