Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

boilerplate code #9

Merged
merged 3 commits into from
Jun 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ members = [
"crates/da_clients/da-client-interface",
"crates/da_clients/ethereum",
"crates/utils",
"crates/settlement_clients/settlement-client-interface",
]

[workspace.package]
Expand Down
3 changes: 2 additions & 1 deletion crates/orchestrator/src/controllers/jobs_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::controllers::errors::AppError;
use crate::jobs::types::JobType;
use axum::extract::Json;
use serde::Deserialize;
use std::collections::HashMap;

/// Client request to create a job
#[derive(Debug, Deserialize)]
Expand All @@ -16,6 +17,6 @@ pub struct CreateJobRequest {

/// Create a job
pub async fn create_job(Json(payload): Json<CreateJobRequest>) -> Result<Json<()>, AppError> {
crate::jobs::create_job(payload.job_type, payload.internal_id).await?;
crate::jobs::create_job(payload.job_type, payload.internal_id, HashMap::new()).await?;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tip

Codebase Verification

The results indicate that the create_job function is being called with the new metadata parameter in the relevant files. Specifically, the instances in crates/orchestrator/src/tests/jobs/da_job/mod.rs and crates/orchestrator/src/controllers/jobs_controller.rs show the correct usage with HashMap::new().

However, there are several instances where create_job is defined but not called with the new parameter, such as in crates/orchestrator/src/database/mongodb/mod.rs, crates/orchestrator/src/jobs/state_update_job/mod.rs, crates/orchestrator/src/jobs/snos_job/mod.rs, and crates/orchestrator/src/jobs/register_proof_job/mod.rs. These instances need to be reviewed to ensure they are updated to use the new signature.

Action Required:

Please update the following files to ensure that all calls to create_job include the new metadata parameter:

  • crates/orchestrator/src/database/mongodb/mod.rs
  • crates/orchestrator/src/jobs/state_update_job/mod.rs
  • crates/orchestrator/src/jobs/snos_job/mod.rs
  • crates/orchestrator/src/jobs/register_proof_job/mod.rs
Analysis chain

Modification to create_job to include metadata.

The addition of a metadata parameter in create_job is consistent with the expanded functionality. Please ensure that all calls to this function across the codebase have been updated to pass the necessary metadata.


Modification to create_job to include metadata.

The addition of a metadata parameter in create_job is consistent with the expanded functionality. Please ensure that all calls to this function across the codebase have been updated to pass the necessary metadata.


The output from the script shows multiple occurrences of create_job across different files. To ensure all calls to create_job have been updated to include the new metadata parameter, we need to inspect these occurrences more closely.

Next Steps:

  1. Verify that all instances of create_job in the codebase are using the new signature with the metadata parameter.
  2. Specifically check the following files for correct usage:
    • crates/orchestrator/src/tests/jobs/mod.rs
    • crates/orchestrator/src/tests/jobs/da_job/mod.rs
    • crates/orchestrator/src/routes.rs
    • crates/orchestrator/src/database/mongodb/mod.rs
    • crates/orchestrator/src/jobs/mod.rs
    • crates/orchestrator/src/jobs/state_update_job/mod.rs
    • crates/orchestrator/src/jobs/snos_job/mod.rs
    • crates/orchestrator/src/jobs/register_proof_job/mod.rs
    • crates/orchestrator/src/jobs/da_job/mod.rs
    • crates/orchestrator/src/database/mod.rs
    • crates/orchestrator/src/controllers/jobs_controller.rs
Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify all function calls to `create_job` include the new metadata parameter.

# Test: Search for the function usage. Expect: Only occurances with the new signature.
rg --type rust $'create_job'

Length of output: 1756



Script:

#!/bin/bash
# Description: Verify all function calls to `create_job` include the new metadata parameter.

# Test: Search for the function usage. Expect: Only occurances with the new signature.
rg --type rust $'create_job'

Length of output: 1756


Script:

#!/bin/bash
# Verify that all instances of `create_job` in the codebase are using the new signature with the metadata parameter.

# Check each file for the correct usage of `create_job`.
files=(
  "crates/orchestrator/src/tests/jobs/mod.rs"
  "crates/orchestrator/src/tests/jobs/da_job/mod.rs"
  "crates/orchestrator/src/routes.rs"
  "crates/orchestrator/src/database/mongodb/mod.rs"
  "crates/orchestrator/src/jobs/mod.rs"
  "crates/orchestrator/src/jobs/state_update_job/mod.rs"
  "crates/orchestrator/src/jobs/snos_job/mod.rs"
  "crates/orchestrator/src/jobs/register_proof_job/mod.rs"
  "crates/orchestrator/src/jobs/da_job/mod.rs"
  "crates/orchestrator/src/database/mod.rs"
  "crates/orchestrator/src/controllers/jobs_controller.rs"
)

for file in "${files[@]}"; do
  echo "Checking $file"
  rg 'create_job\(' "$file"
done

Length of output: 3664

Ok(Json::from(()))
}
9 changes: 7 additions & 2 deletions crates/orchestrator/src/jobs/da_job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,19 @@ pub struct DaJob;

#[async_trait]
impl Job for DaJob {
async fn create_job(&self, _config: &Config, internal_id: String) -> Result<JobItem> {
async fn create_job(
&self,
_config: &Config,
internal_id: String,
metadata: HashMap<String, String>,
) -> Result<JobItem> {
Ok(JobItem {
id: Uuid::new_v4(),
internal_id,
job_type: JobType::DataSubmission,
status: JobStatus::Created,
external_id: String::new().into(),
metadata: HashMap::new(),
metadata,
version: 0,
})
}
Expand Down
14 changes: 11 additions & 3 deletions crates/orchestrator/src/jobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ use uuid::Uuid;

mod constants;
pub mod da_job;
mod register_proof_job;
pub mod snos_job;
mod state_update_job;
pub mod types;

/// The Job trait is used to define the methods that a job
Expand All @@ -20,7 +23,12 @@ pub mod types;
#[async_trait]
pub trait Job: Send + Sync {
/// Should build a new job item and return it
async fn create_job(&self, config: &Config, internal_id: String) -> Result<JobItem>;
async fn create_job(
&self,
config: &Config,
internal_id: String,
metadata: HashMap<String, String>,
) -> Result<JobItem>;
/// Should process the job and return the external_id which can be used to
/// track the status of the job. For example, a DA job will submit the state diff
/// to the DA layer and return the txn hash.
Expand All @@ -40,7 +48,7 @@ pub trait Job: Send + Sync {
}

/// Creates the job in the DB in the created state and adds it to the process queue
pub async fn create_job(job_type: JobType, internal_id: String) -> Result<()> {
pub async fn create_job(job_type: JobType, internal_id: String, metadata: HashMap<String, String>) -> Result<()> {
let config = config().await;
let existing_job = config.database().get_job_by_internal_id_and_type(internal_id.as_str(), &job_type).await?;
if existing_job.is_some() {
Expand All @@ -53,7 +61,7 @@ pub async fn create_job(job_type: JobType, internal_id: String) -> Result<()> {
}

let job_handler = get_job_handler(&job_type);
let job_item = job_handler.create_job(config, internal_id).await?;
let job_item = job_handler.create_job(config, internal_id, metadata).await?;
config.database().create_job(job_item.clone()).await?;

add_job_to_process_queue(job_item.id).await?;
Expand Down
55 changes: 55 additions & 0 deletions crates/orchestrator/src/jobs/register_proof_job/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use crate::config::Config;
use crate::jobs::types::{JobItem, JobStatus, JobType, JobVerificationStatus};
use crate::jobs::Job;
use async_trait::async_trait;
use color_eyre::Result;
use std::collections::HashMap;
use uuid::Uuid;

pub struct RegisterProofJob;

#[async_trait]
impl Job for RegisterProofJob {
async fn create_job(
&self,
_config: &Config,
internal_id: String,
metadata: HashMap<String, String>,
) -> Result<JobItem> {
Ok(JobItem {
id: Uuid::new_v4(),
internal_id,
job_type: JobType::ProofRegistration,
status: JobStatus::Created,
external_id: String::new().into(),
// metadata must contain the blocks that have been included inside this proof
// this will allow state update jobs to be created for each block
metadata,
version: 0,
})
}

async fn process_job(&self, _config: &Config, _job: &JobItem) -> Result<String> {
// Get proof from S3 and submit on chain for verification
// We need to implement a generic trait for this to support multiple
// base layers
todo!()
}

async fn verify_job(&self, _config: &Config, _job: &JobItem) -> Result<JobVerificationStatus> {
// verify that the proof transaction has been included on chain
todo!()
}

fn max_process_attempts(&self) -> u64 {
todo!()
}

fn max_verification_attempts(&self) -> u64 {
todo!()
}

fn verification_polling_delay_seconds(&self) -> u64 {
todo!()
}
}
Comment on lines +9 to +55
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The RegisterProofJob struct and its methods are well implemented. However, the todo!() placeholders in methods like process_job and verify_job indicate incomplete implementation. Ensure these are addressed before merging.

54 changes: 54 additions & 0 deletions crates/orchestrator/src/jobs/snos_job/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use crate::config::Config;
use crate::jobs::types::{JobItem, JobStatus, JobType, JobVerificationStatus};
use crate::jobs::Job;
use async_trait::async_trait;
use color_eyre::Result;
use std::collections::HashMap;
use uuid::Uuid;

pub struct SnosJob;

#[async_trait]
impl Job for SnosJob {
async fn create_job(
&self,
_config: &Config,
internal_id: String,
metadata: HashMap<String, String>,
) -> Result<JobItem> {
Ok(JobItem {
id: Uuid::new_v4(),
internal_id,
job_type: JobType::SnosRun,
status: JobStatus::Created,
external_id: String::new().into(),
metadata,
version: 0,
})
}

async fn process_job(&self, _config: &Config, _job: &JobItem) -> Result<String> {
// 1. Fetch SNOS input data from Madara
// 2. Import SNOS in Rust and execute it with the input data
// 3. Store the received PIE in DB
todo!()
}

async fn verify_job(&self, _config: &Config, _job: &JobItem) -> Result<JobVerificationStatus> {
// No need for verification as of now. If we later on decide to outsource SNOS run
// to another servicehow a, verify_job can be used to poll on the status of the job
todo!()
}

fn max_process_attempts(&self) -> u64 {
todo!()
}

fn max_verification_attempts(&self) -> u64 {
todo!()
}

fn verification_polling_delay_seconds(&self) -> u64 {
todo!()
}
}
Comment on lines +9 to +54
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The SnosJob struct and its methods are well implemented. However, the todo!() placeholders in methods like process_job and verify_job indicate incomplete implementation. Ensure these are addressed before merging.

54 changes: 54 additions & 0 deletions crates/orchestrator/src/jobs/state_update_job/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use crate::config::Config;
use crate::jobs::types::{JobItem, JobStatus, JobType, JobVerificationStatus};
use crate::jobs::Job;
use async_trait::async_trait;
use color_eyre::Result;
use std::collections::HashMap;
use uuid::Uuid;

pub struct StateUpdateJob;

#[async_trait]
impl Job for StateUpdateJob {
async fn create_job(
&self,
_config: &Config,
internal_id: String,
metadata: HashMap<String, String>,
) -> Result<JobItem> {
Ok(JobItem {
id: Uuid::new_v4(),
internal_id,
job_type: JobType::ProofRegistration,
status: JobStatus::Created,
external_id: String::new().into(),
// metadata must contain the blocks for which state update will be performed
// we don't do one job per state update as that makes nonce management complicated
metadata,
version: 0,
})
}

async fn process_job(&self, _config: &Config, _job: &JobItem) -> Result<String> {
// Read the metadata to get the blocks for which state update will be performed.
// For each block, get the program output (from the PIE?) and the
todo!()
}

async fn verify_job(&self, _config: &Config, _job: &JobItem) -> Result<JobVerificationStatus> {
// verify that the proof transaction has been included on chain
todo!()
}

fn max_process_attempts(&self) -> u64 {
todo!()
}

fn max_verification_attempts(&self) -> u64 {
todo!()
}

fn verification_polling_delay_seconds(&self) -> u64 {
todo!()
}
}
Comment on lines +9 to +54
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The StateUpdateJob struct and its methods are well implemented. However, the todo!() placeholders in methods like process_job and verify_job indicate incomplete implementation. Ensure these are addressed before merging.

4 changes: 3 additions & 1 deletion crates/orchestrator/src/jobs/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,14 @@ fn unwrap_external_id_failed(expected: &str, got: &ExternalId) -> color_eyre::ey

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub enum JobType {
/// Running SNOS for a block
SnosRun,
/// Submitting DA data to the DA layer
DataSubmission,
/// Getting a proof from the proving service
ProofCreation,
/// Verifying the proof on the base layer
ProofVerification,
ProofRegistration,
/// Updaing the state root on the base layer
StateUpdation,
}
Expand Down
2 changes: 2 additions & 0 deletions crates/orchestrator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ pub mod queue;
pub mod routes;
/// Contains the utils
pub mod utils;
/// Contains workers which act like cron jobs
pub mod workers;

#[cfg(test)]
mod tests;
20 changes: 20 additions & 0 deletions crates/orchestrator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ use orchestrator::config::config;
use orchestrator::queue::init_consumers;
use orchestrator::routes::app_router;
use orchestrator::utils::env_utils::get_env_var_or_default;
use orchestrator::workers::proof_registration::ProofRegistrationWorker;
use orchestrator::workers::proving::ProvingWorker;
use orchestrator::workers::snos::SnosWorker;
use orchestrator::workers::update_state::UpdateStateWorker;
use orchestrator::workers::*;

/// Start the server
#[tokio::main]
Expand All @@ -21,6 +26,21 @@ async fn main() {
// init consumer
init_consumers().await.expect("Failed to init consumers");

// spawn a thread for each worker
// changes in rollup mode - sovereign, validity, validiums etc.
// will likely involve changes in these workers as well
tokio::spawn(start_cron(Box::new(SnosWorker), 60));
tokio::spawn(start_cron(Box::new(ProvingWorker), 60));
tokio::spawn(start_cron(Box::new(ProofRegistrationWorker), 60));
tokio::spawn(start_cron(Box::new(UpdateStateWorker), 60));

tracing::info!("Listening on http://{}", address);
axum::serve(listener, app).await.expect("Failed to start axum server");
}

async fn start_cron(worker: Box<dyn Worker>, interval: u64) {
loop {
worker.run_worker().await;
tokio::time::sleep(tokio::time::Duration::from_secs(interval)).await;
}
}
2 changes: 1 addition & 1 deletion crates/orchestrator/src/tests/jobs/da_job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use da_client_interface::{DaVerificationStatus, MockDaClient};
#[tokio::test]
async fn test_create_job() {
let config = init_config(None, None, None, None).await;
let job = DaJob.create_job(&config, String::from("0")).await;
let job = DaJob.create_job(&config, String::from("0"), HashMap::new()).await;
assert!(job.is_ok());

let job = job.unwrap();
Expand Down
11 changes: 11 additions & 0 deletions crates/orchestrator/src/workers/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
use async_trait::async_trait;

pub mod proof_registration;
pub mod proving;
pub mod snos;
pub mod update_state;

#[async_trait]
pub trait Worker: Send + Sync {
async fn run_worker(&self);
}
14 changes: 14 additions & 0 deletions crates/orchestrator/src/workers/proof_registration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use crate::workers::Worker;
use async_trait::async_trait;

pub struct ProofRegistrationWorker;

#[async_trait]
impl Worker for ProofRegistrationWorker {
/// 1. Fetch all blocks with a successful proving job run
/// 2. Group blocks that have the same proof
/// 3. For each group, create a proof registration job with from and to block in metadata
async fn run_worker(&self) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implementation pending in run_worker.

The run_worker method is currently not implemented and contains a todo!() placeholder. Would you like me to help with the implementation or should we track this as an issue on GitHub?

todo!()
}
}
13 changes: 13 additions & 0 deletions crates/orchestrator/src/workers/proving.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use crate::workers::Worker;
use async_trait::async_trait;

pub struct ProvingWorker;

#[async_trait]
impl Worker for ProvingWorker {
/// 1. Fetch all successful SNOS job runs that don't have a proving job
/// 2. Create a proving job for each SNOS job run
async fn run_worker(&self) {
todo!()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implementation pending in run_worker.

The run_worker method is currently not implemented and contains a todo!() placeholder. Would you like me to help with the implementation or should we track this as an issue on GitHub?

}
}
14 changes: 14 additions & 0 deletions crates/orchestrator/src/workers/snos.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use crate::workers::Worker;
use async_trait::async_trait;

pub struct SnosWorker;

#[async_trait]
impl Worker for SnosWorker {
/// 1. Fetch the latest completed block from the Starknet chain
/// 2. Fetch the last block that had a SNOS job run.
/// 3. Create SNOS run jobs for all the remaining blocks
async fn run_worker(&self) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implementation pending in run_worker.

The run_worker method is currently not implemented and contains a todo!() placeholder. Would you like me to help with the implementation or should we track this as an issue on GitHub?

todo!()
}
}
Loading
Loading