feat : added snos worker implementation and unit tests #16

Merged
merged 6 commits into from
Jun 17, 2024

Member

@ocdbytes ocdbytes commented Jun 13, 2024

  • added SNOS worker implementation
  • added SNOS worker tests

Summary by CodeRabbit

  • New Features

    • Introduced dynamic configuration management using ArcSwap.
    • Added database functions to retrieve the latest job by type and all jobs.
    • Included new JobType::SnosRun case in job handling.
    • Implemented a new test module for job creation and processing.
  • Enhancements

    • Updated job functions to use reference configuration.
    • Improved worker-thread handling with corrected comments and terminology.
  • Dependencies

    • Added arc-swap version 1.7.1 to the dependencies.
  • Tests

    • Added extensive tests for job creation and worker interactions using mocks.

coderabbitai bot commented Jun 13, 2024

Walkthrough

The updates encompass adding single-threaded execution to coverage tasks, dynamic configuration management with ArcSwap, expanded database functionality to retrieve job items, and additional job processing capabilities. Also introduced are comprehensive tests to mock job scenarios and adjustments in worker processing for better configuration handling.

Changes

| File | Change Summary |
| --- | --- |
| .github/workflows/coverage.yml | Added --test-threads=1 flag to the cargo llvm-cov nextest command to control coverage generation threading |
| Cargo.toml | Added arc-swap dependency version "1.7.1" |
| crates/orchestrator/.../config.rs | Transitioned to using ArcSwap for dynamic configuration management, including new functions to initialize and update configuration |
| crates/orchestrator/.../database/* | Added functions get_latest_job_by_type and get_all_jobs, with necessary MongoDB options and cursor updates |
| crates/orchestrator/.../jobs/* | Modified job functions to reference configuration safely and introduced handling for new SnosRun job type |
| crates/orchestrator/.../main.rs | Updated comments and corrected word plurality for better clarity |
| crates/orchestrator/.../tests/* | Added a new workers test module to simulate job creation and processing using mock database, queue, and HTTP server implementations |
| crates/orchestrator/.../workers/snos.rs | Enhanced the run_worker function to create jobs based on block differences, including logging and configuration fetching mechanisms |

Sequence Diagram(s)

sequenceDiagram
    participant User
    participant Worker
    participant Config as ArcSwap<Config>
    participant Database
    participant Queue

    User->>Worker: Start Worker
    Worker->>Config: Fetch Configuration
    Config-->>Worker: Return Config
    Worker->>Database: Check Latest Job
    Database-->>Worker: Latest Job Info
    Worker->>Queue: Create Job
    Queue-->>Worker: Job Created
    Worker->>Database: Save Created Job
    Database-->>Worker: Job Saved
    Worker-->>User: Job Processed Successfully

Poem

In the land of code, changes bloom like spring,
Dynamic configs dance, tightly in a ring.
Jobs now flow, a graceful ballet,
Database hums, in a harmonious array.
Coverage checked with single-thread might,
To make sure our code stays robust and bright.
Celebrate these tweaks, with hops and cheers,
For the orchestrator's journey, through code frontiers! 🐇✨



@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 8

Outside diff range and nitpick comments (1)
crates/orchestrator/src/config.rs (1)

83-104: Ensure thread safety and performance of config_force_init.

Using ArcSwap provides flexibility for testing but consider the implications on performance and thread safety. Review the usage patterns to ensure that the dynamic swapping of configurations does not introduce race conditions or performance bottlenecks.

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 2a9d77b and 2e9cf16.

Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
Files selected for processing (12)
  • .github/workflows/coverage.yml (1 hunks)
  • Cargo.toml (1 hunks)
  • crates/orchestrator/Cargo.toml (1 hunks)
  • crates/orchestrator/src/config.rs (2 hunks)
  • crates/orchestrator/src/database/mod.rs (2 hunks)
  • crates/orchestrator/src/database/mongodb/mod.rs (2 hunks)
  • crates/orchestrator/src/jobs/mod.rs (4 hunks)
  • crates/orchestrator/src/main.rs (1 hunks)
  • crates/orchestrator/src/queue/job_queue.rs (1 hunks)
  • crates/orchestrator/src/tests/mod.rs (1 hunks)
  • crates/orchestrator/src/tests/workers/mod.rs (1 hunks)
  • crates/orchestrator/src/workers/snos.rs (2 hunks)
Files not summarized due to errors (2)
  • crates/orchestrator/Cargo.toml: Error: Server error. Please try again later.
  • crates/orchestrator/src/queue/job_queue.rs: Error: Server error. Please try again later.
Files skipped from review due to trivial changes (3)
  • Cargo.toml
  • crates/orchestrator/src/main.rs
  • crates/orchestrator/src/tests/mod.rs
Additional comments not posted (5)
.github/workflows/coverage.yml (1)

29-29: The addition of --test-threads=1 is a sensible change to ensure that tests do not interfere with each other by running in parallel, which is particularly useful for tests that are not thread-safe.

crates/orchestrator/Cargo.toml (1)

15-15: The addition of arc-swap as a dependency aligns with the need for atomic updates to configurations at runtime, which is crucial for systems that require high availability and dynamic configuration changes.

crates/orchestrator/src/database/mod.rs (1)

37-38: The addition of get_latest_job_by_type and get_all_jobs functions is a valuable enhancement for more granular job management. Consider adding documentation comments to these functions to clarify their usage and expected behavior, especially regarding error handling and the types of results they return.

crates/orchestrator/src/tests/workers/mod.rs (1)

42-51: Check error handling in queue message sending.

Ensure that error handling is robust when sending messages to the queue fails. Consider adding a test case to simulate and handle message sending failures.

crates/orchestrator/src/queue/job_queue.rs (1)

17-18: Check visibility of JobQueueMessage fields.

The id field of JobQueueMessage is now crate-private. Verify if this change affects any external modules that might depend on direct access to this field.

Comment on lines 119 to 136
async fn get_latest_job_by_type(&self, job_type: JobType) -> Result<Option<JobItem>> {
    let filter = doc! {
        "job_type": mongodb::bson::to_bson(&job_type)?,
    };
    let find_options = FindOneOptions::builder().sort(doc! { "internal_id": -1 }).build();
    Ok(self
        .get_job_collection()
        .find_one(filter, find_options)
        .await
        .expect("Failed to fetch latest job by given job type"))
}

async fn get_all_jobs(&self, job_type: JobType) -> Result<Cursor<JobItem>> {
    let filter = doc! {
        "job_type": mongodb::bson::to_bson(&job_type)?,
    };
    Ok(self.get_job_collection().find(filter, None).await.expect("Failed to fetch jobs with given job type"))
}
@coderabbitai coderabbitai bot Jun 13, 2024

Optimize MongoDB queries for job retrieval.

The new functions for retrieving jobs could benefit from optimization. Consider adding indexes on job_type and internal_id to improve query performance, especially for large datasets.

Contributor

Given that internal ID is a string of variable length, definitely needs an index

@unstark, you're absolutely right. Adding an index on internal_id will help optimize the performance of queries filtering by this field, especially as the database scales. Here's the MongoDB command you can use to create this index within the jobs collection:

let job_collection = self.client.database("orchestrator").collection("jobs");
job_collection.create_index(doc! {"internal_id": 1}, None).await?;

This command sets up an ascending index on internal_id. If you also query frequently by job_type, a compound index on both job_type and internal_id could be beneficial, especially for the get_latest_job_by_type function:

job_collection.create_index(doc! {"job_type": 1, "internal_id": -1}, None).await?;

This compound index would optimize the sorting and retrieval by job_type and internal_id, enhancing the performance of fetching the latest job by type.

@@ -170,6 +170,7 @@
fn get_job_handler(job_type: &JobType) -> Box<dyn Job> {
    match job_type {
        JobType::DataSubmission => Box::new(da_job::DaJob),
        JobType::SnosRun => Box::new(da_job::DaJob),
@coderabbitai coderabbitai bot Jun 13, 2024

Correct handler assignment for JobType::SnosRun.

The get_job_handler function incorrectly assigns the DaJob handler for JobType::SnosRun. This should be corrected to use the appropriate handler as per the new functionality introduced.

- JobType::SnosRun => Box::new(da_job::DaJob),
+ JobType::SnosRun => Box::new(snos_job::SnosRunHandler::new()),

Contributor

@ocdbytes let's fix this

/// However, when running tests, we often want to reinitialize because we want to clear the DB and
/// set it up again for reuse in new tests. By calling `config_force_init` we replace the already
/// stored config inside `ArcSwap` with the new configuration and pool settings.
pub async fn config_force_init(config: Config) {
Contributor

let's only add this code if the test flag is enabled

Contributor

I wonder if it could work without the global config? Apart from the need to carry it through all methods, are there any reasons to use static initialization?

Contributor

Ya that's the only advantage. Otherwise we need to pass it everywhere although that does make testing easier. Do you think we should do the latter?

Contributor

To be fair, yes :) that would be much easier to test, also you'd be able to pass only necessary parts of the config instead of the entire struct
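A minimal sketch of the "pass only the necessary parts" idea from this thread. The `Config` fields and the helper names (`block_number_request`, `jobs_collection_path`, `startup_summary`) are hypothetical and only illustrate the shape; they are not the orchestrator's actual API.

```rust
/// Hypothetical slice of the orchestrator config; field names are illustrative.
struct Config {
    rpc_url: String,
    database_name: String,
}

/// Needs only the RPC endpoint, so a test can call it with a plain string.
fn block_number_request(rpc_url: &str) -> String {
    format!("POST {rpc_url} starknet_blockNumber")
}

/// Needs only the database name.
fn jobs_collection_path(database_name: &str) -> String {
    format!("{database_name}.jobs")
}

/// The caller that owns the config hands each function just the part it uses.
fn startup_summary(config: &Config) -> (String, String) {
    (
        block_number_request(&config.rpc_url),
        jobs_collection_path(&config.database_name),
    )
}
```

With this shape a test can exercise `jobs_collection_path("test_db")` directly, without building or swapping a global config.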

@@ -33,6 +34,8 @@ pub trait Database: Send + Sync {
) -> Result<()>;

async fn update_metadata(&self, job: &JobItem, metadata: HashMap<String, String>) -> Result<()>;
async fn get_latest_job_by_type(&self, job_type: JobType) -> Result<Option<JobItem>>;
async fn get_all_jobs(&self, job_type: JobType) -> Result<Cursor<JobItem>>;
Contributor

Cursor is a mongodb type and the DatabaseTrait is generic across the database you're using. So we should change this to Vec and handle the conversion for Cursor -> Vec inside the MongoDB function

Member Author

There is no need for this function for now as test is working without it. Will remove this and if in future we need to add this function will surely follow the above method.
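For reference, should the function come back later, the suggestion above could look roughly like the sketch below. It is synchronous and in-memory to stay self-contained; `FakeCursor` and `InMemoryDb` are hypothetical stand-ins, and the real MongoDB cursor is an async stream, so the conversion there would be awaited.

```rust
#[derive(Debug, Clone, PartialEq)]
enum JobType {
    DataSubmission,
    SnosRun,
}

#[derive(Debug, Clone, PartialEq)]
struct JobItem {
    internal_id: String,
    job_type: JobType,
}

/// Backend-agnostic interface: no driver type appears in the signature.
trait Database {
    fn get_all_jobs(&self, job_type: JobType) -> Result<Vec<JobItem>, String>;
}

/// Hypothetical stand-in for a driver-specific cursor (not the MongoDB type).
struct FakeCursor {
    items: std::vec::IntoIter<JobItem>,
}

impl Iterator for FakeCursor {
    type Item = Result<JobItem, String>;

    fn next(&mut self) -> Option<Self::Item> {
        self.items.next().map(Ok)
    }
}

struct InMemoryDb {
    jobs: Vec<JobItem>,
}

impl InMemoryDb {
    /// Backend-specific query that returns the backend's own cursor type.
    fn find(&self, job_type: &JobType) -> FakeCursor {
        let matching: Vec<JobItem> =
            self.jobs.iter().filter(|j| &j.job_type == job_type).cloned().collect();
        FakeCursor { items: matching.into_iter() }
    }
}

impl Database for InMemoryDb {
    fn get_all_jobs(&self, job_type: JobType) -> Result<Vec<JobItem>, String> {
        // Drain the cursor inside the backend; the first error aborts the collect.
        self.find(&job_type).collect()
    }
}
```

Callers then only ever see `Vec<JobItem>`, so swapping the backend does not change the trait.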


#[rstest]
#[tokio::test]
async fn test_create_job() {
Contributor

Suggested change
- async fn test_create_job() {
+ async fn test_snos_worker() {

for i in 1..6 {
db.expect_create_job()
.returning(|_| Ok(get_job_item_mock_by_id("1".to_string())))
.call(get_job_item_mock_by_id(i.to_string()))
Contributor

should we just call get_job_item_mock_by_id once and use it in both places?

Member Author

That doesn't work. I tried it, but the value gets moved.

Contributor

try cloning?

Member Author

already tried

Contributor

Cloning a string should ideally work, what were you doing exactly?

for i in 1..6 {
db.expect_get_job_by_internal_id_and_type()
.returning(|_, _| Ok(None))
.call(&i.to_string(), &JobType::SnosRun)
Contributor

we aren't supposed to call anywhere. instead, we need to tell the mock that this function should be called once (times(1)) and it should be called with args xyz (https://docs.rs/mockall/latest/mockall/#matching-arguments)

is this why you were saying that the test doesn't make sense because we're calling everything ourselves?

Member Author

yup

Member Author

Actually, the call is needed when the test runs.
If I don't add it, the test fails with an error saying that no expectation was found for this particular call. This function is called in the code while creating a job, to check whether the job already exists. If it does, an error is returned.

Contributor

Hmm, the error sounds like the code didn't call this method but it should've been called. We don't use call for the da test. Can you double-check or share the exact error message?

Member Author

resolved

let mut queue = MockQueueProvider::new();

// Mocking db functions
db.expect_get_latest_job_by_type().returning(|_| Ok(None)).call(JobType::SnosRun).expect("Failed to call.");
Contributor

should we also do a test case for when the db returns a value? we could use rstest with cases so that we can use the same code only

@@ -8,7 +14,35 @@ impl Worker for SnosWorker {
/// 1. Fetch the latest completed block from the Starknet chain
/// 2. Fetch the last block that had a SNOS job run.
/// 3. Create SNOS run jobs for all the remaining blocks
// TEST : added config temporarily to test
Contributor

what's this?

Member Author

removed
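The three steps in the doc comment above boil down to computing a block range. A rough sketch, covering both the "no previous job" and "previous job exists" cases raised earlier in the review. The function name is hypothetical, and starting at block 1 when no job exists is an assumption inferred from the `for i in 1..6` loops in the test, not something the PR states.

```rust
/// Decide which block numbers still need a SNOS job.
///
/// `latest_block` is the latest completed block on chain (step 1) and
/// `last_processed` is the last block that already has a SNOS job (step 2),
/// or `None` when no SNOS job exists yet.
///
/// Assumption: with no previous job, work starts at block 1.
fn blocks_needing_snos_job(latest_block: u64, last_processed: Option<u64>) -> Vec<u64> {
    let first = match last_processed {
        None => 1,
        Some(n) => match n.checked_add(1) {
            Some(next) => next,
            // `n` is already the largest possible block number.
            None => return Vec::new(),
        },
    };
    // Step 3: every block in `first..=latest_block` gets a job.
    // The range is empty when the worker is already caught up.
    (first..=latest_block).collect()
}
```

Keeping this as a pure function makes the "db returns a value" case a one-line test, independent of any mock setup.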

todo!()
let config = config().await;
let provider = config.starknet_client();
let latest_block_number = provider.block_number().await.expect("Failed to fetch block number from rpc");
Contributor

should we make workers return a Result and throw an error wherever we do unwrap or expect

Member Author

That would be the ideal case, but I guess the error is handled in the create_job function.

Contributor

but we're doing unwraps and expects in worker, this will crash the orchestrator

Contributor

+1
In most cases it's better (imho) to propagate the error as far as possible and let the caller decide what to do with it rather than panic

Member Author

added error throwing in workers
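A small sketch of what propagating errors from a worker, instead of calling `expect`, can look like. `WorkerError`, `fetch_latest_block`, and this `create_job` are hypothetical stand-ins written for illustration (the `rpc_up` flag and `already_exists` list simulate failures); the merged code may be structured differently.

```rust
use std::fmt;

#[derive(Debug, PartialEq)]
enum WorkerError {
    Rpc(String),
    JobCreation { block: u64, reason: String },
}

impl fmt::Display for WorkerError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            WorkerError::Rpc(reason) => write!(f, "rpc call failed: {reason}"),
            WorkerError::JobCreation { block, reason } => {
                write!(f, "could not create job for block {block}: {reason}")
            }
        }
    }
}

impl std::error::Error for WorkerError {}

/// Hypothetical stand-in for the RPC call that fetches the block number.
fn fetch_latest_block(rpc_up: bool) -> Result<u64, WorkerError> {
    if rpc_up {
        Ok(3)
    } else {
        Err(WorkerError::Rpc("connection refused".to_string()))
    }
}

/// Hypothetical stand-in for job creation; fails if the job already exists.
fn create_job(block: u64, already_exists: &[u64]) -> Result<(), WorkerError> {
    if already_exists.contains(&block) {
        return Err(WorkerError::JobCreation { block, reason: "job already exists".to_string() });
    }
    Ok(())
}

/// Returns the number of jobs created, or the first error encountered.
/// `?` hands the error to the caller instead of panicking.
fn run_worker(rpc_up: bool, already_exists: &[u64]) -> Result<u64, WorkerError> {
    let latest = fetch_latest_block(rpc_up)?;
    let mut created = 0;
    for block in 1..=latest {
        create_job(block, already_exists)?;
        created += 1;
    }
    Ok(created)
}
```

The caller can then log, retry, or alert on `Err(..)` while the orchestrator process keeps running.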

.expect("Error : failed to create job for snos workers.");
}

log::info!("jobs created !!");
Contributor

trace log?

Member Author

removed. It is a redundant log

let filter = doc! {
"job_type": mongodb::bson::to_bson(&job_type)?,
};
let find_options = FindOneOptions::builder().sort(doc! { "internal_id": -1 }).build();
Contributor

@unstark unstark Jun 13, 2024

AFAICS internal ID is a string without additional restrictions https://github.com/unstark/madara-orchestrator/blob/3fe37331183cf3291aaef9d1e2664e54e86f0648/crates/orchestrator/src/jobs/types.rs#L104, so it's not necessarily a unique incrementing integer. We might need an additional field, e.g. created_at in microseconds to achieve what you want

Contributor

So internal_id would actually be the block number but in String. The reason it was made a string is that jobs use this column to uniquely identify a job based on the type. Like, I want the SNOS run of block 6. Now jobs can have any sort of internal id (number, uuid etc.). So as a generic, a string param was used here. I am not sure if this is the best approach though but here, -1 should work because it's a number represented as string?
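One caveat on sorting a number stored as a string: plain string comparison is lexicographic, so "9" sorts after "10". The sketch below shows the effect in plain Rust with hypothetical helper names. Whether the database sort behaves the same way depends on its comparison rules; to my understanding MongoDB compares strings lexicographically unless a collation with numeric ordering is configured, but treat that as an assumption to verify.

```rust
/// Picks the "latest" id the way a plain descending string sort would:
/// the lexicographically greatest value.
fn latest_by_string_order(ids: &[&str]) -> Option<String> {
    ids.iter().copied().max().map(str::to_owned)
}

/// Picks the latest id after parsing each one as a block number.
/// Ids that are not valid u64 values are skipped.
fn latest_by_numeric_order(ids: &[&str]) -> Option<u64> {
    ids.iter().filter_map(|s| s.parse::<u64>().ok()).max()
}
```

For block ids "8", "9", "10", "11", the string order picks "9" while the numeric order picks 11, so a string sort on internal_id would stop returning the newest job once block numbers reach two digits.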

Contributor

Ok nvm, this is incorrect. This function can be used by any job so if some job has a different form of internal id, this will break. However, the created_at might not be the best check either because what if we create jobs in parallel somewhere or after receiving something from a queue, there's no guarantee that the jobs would be created sequentially.

Contributor

For now, we can explicitly rename this function to say get_latest_job_by_internal_id so it makes the purpose of the function clear. And then later, we can have another function of get_latest_by_created_at if some job is ok using that. wdyt?

Member Author

function name changed

Contributor

That would work (renaming), as for the created_at - even if there is a collision, the function would still do its job :)

let config = config().await;
let provider = config.starknet_client();
let latest_block_number = provider.block_number().await.expect("Failed to fetch block number from rpc");
let latest_block_processed_data = config
Contributor

Could it be the case that there are gaps in the snos job executions?
For instance:

block 1: snos job succeeded
block 2: snos job failed
block 3: snos job succeeded
block 4: snos job pending

Ideally you should also search for failed ones and retry (if possible)

Contributor

The job execution already has retry logic. If something moved to a failed state, we have ideally already retried enough times, and we should raise an alert and investigate manually. wdyt?

Contributor

Oh right, that makes sense 👍

@ocdbytes ocdbytes merged commit b625f90 into main Jun 17, 2024
6 checks passed
Tranduy1dol referenced this pull request in sota-zk-labs/madara-orchestrator Aug 1, 2024
* feat : added snos worker implementation and unit tests

* feat : added review #1 changes : added error handling for snos workers

* feat : added review #1 changes : added error handling for snos workers

* fix : lint

* fix : lint errors

---------

Co-authored-by: Arun Jangra <[email protected]>
ocdbytes added a commit that referenced this pull request Oct 16, 2024
3 participants