Skip to content

Commit

Permalink
fix(proof_data_handler): Unlock jobs on transient errors (#2486)
Browse files Browse the repository at this point in the history
## What ❔

Currently, the proof data handler locks a job for proof generation and only then
starts fetching the required data.
If any error happens during fetching, the method returns the error, and the job
remains locked.

This PR changes this behavior so that, if any error occurs, we unlock the job
before returning the error.

Additionally, it reduces the number of unnecessary panics in the
touched code and adds some documentation.

## Why ❔

Correctness & efficiency.

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [ ] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [ ] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [ ] Code has been formatted via `zk fmt` and `zk lint`.
  • Loading branch information
popzxc authored Jul 26, 2024
1 parent b9cb222 commit 7c336b1
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 44 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

57 changes: 50 additions & 7 deletions core/lib/dal/src/proof_generation_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,14 @@ enum ProofGenerationJobStatus {
}

impl ProofGenerationDal<'_, '_> {
pub async fn get_next_block_to_be_proven(
/// Chooses the batch number so that it has all the necessary data to generate the proof
/// and is not already picked.
///
/// Marks the batch as picked by the prover, preventing it from being picked twice.
///
/// The batch can be unpicked either via a corresponding DAL method, or it is considered
/// not picked after `processing_timeout` passes.
pub async fn lock_batch_for_proving(
&mut self,
processing_timeout: Duration,
) -> DalResult<Option<L1BatchNumber>> {
Expand Down Expand Up @@ -72,14 +79,38 @@ impl ProofGenerationDal<'_, '_> {
"#,
&processing_timeout,
)
.fetch_optional(self.storage.conn())
.await
.unwrap()
.instrument("lock_batch_for_proving")
.with_arg("processing_timeout", &processing_timeout)
.fetch_optional(self.storage)
.await?
.map(|row| L1BatchNumber(row.l1_batch_number as u32));

Ok(result)
}

/// Marks a previously locked batch as 'unpicked', allowing it to be picked without having
/// to wait for the processing timeout.
///
/// The update is guarded on the batch still being in the picked state, so unlocking
/// cannot overwrite the status of a batch that has progressed in the meantime
/// (e.g. had its proof saved between locking and unlocking).
pub async fn unlock_batch(&mut self, l1_batch_number: L1BatchNumber) -> DalResult<()> {
    let batch_number = i64::from(l1_batch_number.0);
    sqlx::query!(
        r#"
        UPDATE proof_generation_details
        SET
            status = 'unpicked',
            updated_at = NOW()
        WHERE
            l1_batch_number = $1
            AND status = 'picked_by_prover'
        "#,
        batch_number,
    )
    .instrument("unlock_batch")
    .with_arg("l1_batch_number", &l1_batch_number)
    .execute(self.storage)
    .await?;

    Ok(())
}
// NOTE(review): the 'picked_by_prover' literal is assumed from the
// ProofGenerationJobStatus enum — confirm it matches the stored status value.

pub async fn save_proof_artifacts_metadata(
&mut self,
batch_number: L1BatchNumber,
Expand Down Expand Up @@ -388,7 +419,7 @@ mod tests {

let picked_l1_batch = conn
.proof_generation_dal()
.get_next_block_to_be_proven(Duration::MAX)
.lock_batch_for_proving(Duration::MAX)
.await
.unwrap();
assert_eq!(picked_l1_batch, Some(L1BatchNumber(1)));
Expand All @@ -399,10 +430,22 @@ mod tests {
.unwrap();
assert_eq!(unpicked_l1_batch, None);

// Check that we can unlock the batch and then pick it again.
conn.proof_generation_dal()
.unlock_batch(L1BatchNumber(1))
.await
.unwrap();
let picked_l1_batch = conn
.proof_generation_dal()
.lock_batch_for_proving(Duration::MAX)
.await
.unwrap();
assert_eq!(picked_l1_batch, Some(L1BatchNumber(1)));

// Check that with small enough processing timeout, the L1 batch can be picked again
let picked_l1_batch = conn
.proof_generation_dal()
.get_next_block_to_be_proven(Duration::ZERO)
.lock_batch_for_proving(Duration::ZERO)
.await
.unwrap();
assert_eq!(picked_l1_batch, Some(L1BatchNumber(1)));
Expand All @@ -414,7 +457,7 @@ mod tests {

let picked_l1_batch = conn
.proof_generation_dal()
.get_next_block_to_be_proven(Duration::MAX)
.lock_batch_for_proving(Duration::MAX)
.await
.unwrap();
assert_eq!(picked_l1_batch, None);
Expand Down
104 changes: 67 additions & 37 deletions core/node/proof_data_handler/src/request_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,21 +51,64 @@ impl RequestProcessor {
) -> Result<Json<ProofGenerationDataResponse>, RequestProcessorError> {
tracing::info!("Received request for proof generation data: {:?}", request);

let l1_batch_number_result = self
.pool
let l1_batch_number = match self.lock_batch_for_proving().await? {
Some(number) => number,
None => return Ok(Json(ProofGenerationDataResponse::Success(None))), // no batches pending to be proven
};

let proof_generation_data = self
.proof_generation_data_for_existing_batch(l1_batch_number)
.await;

// If we weren't able to fetch all the data, we should unlock the batch before returning.
match proof_generation_data {
Ok(data) => Ok(Json(ProofGenerationDataResponse::Success(Some(Box::new(
data,
))))),
Err(err) => {
self.unlock_batch(l1_batch_number).await?;
Err(err)
}
}
}

/// Will choose a batch that has all the required data and isn't picked up by any prover yet.
///
/// The underlying DAL method also marks the batch as picked, so a successful call
/// must eventually be paired with either proof submission or `unlock_batch`.
async fn lock_batch_for_proving(&self) -> Result<Option<L1BatchNumber>, RequestProcessorError> {
    // Note: the span as rendered contained stale removed diff lines
    // (`.unwrap()`, the old `get_next_block_to_be_proven` call, and a duplicate
    // terminated `.map_err(...)?;`), which made it syntactically invalid; this is
    // the reconstructed post-commit form implied by the added lines.
    self.pool
        .connection()
        .await
        .map_err(RequestProcessorError::Dal)?
        .proof_generation_dal()
        .lock_batch_for_proving(self.config.proof_generation_timeout())
        .await
        .map_err(RequestProcessorError::Dal)
}

let l1_batch_number = match l1_batch_number_result {
Some(number) => number,
None => return Ok(Json(ProofGenerationDataResponse::Success(None))), // no batches pending to be proven
};
/// Marks the batch as 'unpicked', allowing it to be picked up by another prover.
async fn unlock_batch(
    &self,
    l1_batch_number: L1BatchNumber,
) -> Result<(), RequestProcessorError> {
    // Acquire a connection first so DAL errors are mapped uniformly.
    let mut connection = self
        .pool
        .connection()
        .await
        .map_err(RequestProcessorError::Dal)?;
    connection
        .proof_generation_dal()
        .unlock_batch(l1_batch_number)
        .await
        .map_err(RequestProcessorError::Dal)
}

/// Will fetch all the required data for the batch and return it.
///
/// ## Panics
///
/// Expects all the data to be present in the database.
/// Will panic if any of the required data is missing.
async fn proof_generation_data_for_existing_batch(
&self,
l1_batch_number: L1BatchNumber,
) -> Result<ProofGenerationData, RequestProcessorError> {
let vm_run_data: VMRunWitnessInputData = self
.blob_store
.get(l1_batch_number)
Expand All @@ -77,52 +120,43 @@ impl RequestProcessor {
.await
.map_err(RequestProcessorError::ObjectStore)?;

let previous_batch_metadata = self
// Acquire connection after interacting with GCP, to avoid holding the connection for too long.
let mut conn = self
.pool
.connection()
.await
.unwrap()
.map_err(RequestProcessorError::Dal)?;

let previous_batch_metadata = conn
.blocks_dal()
.get_l1_batch_metadata(L1BatchNumber(l1_batch_number.checked_sub(1).unwrap()))
.await
.unwrap()
.map_err(RequestProcessorError::Dal)?
.expect("No metadata for previous batch");

let header = self
.pool
.connection()
.await
.unwrap()
let header = conn
.blocks_dal()
.get_l1_batch_header(l1_batch_number)
.await
.unwrap()
.map_err(RequestProcessorError::Dal)?
.unwrap_or_else(|| panic!("Missing header for {}", l1_batch_number));

let minor_version = header.protocol_version.unwrap();
let protocol_version = self
.pool
.connection()
.await
.unwrap()
let protocol_version = conn
.protocol_versions_dal()
.get_protocol_version_with_latest_patch(minor_version)
.await
.unwrap()
.map_err(RequestProcessorError::Dal)?
.unwrap_or_else(|| {
panic!("Missing l1 verifier info for protocol version {minor_version}")
});

let batch_header = self
.pool
.connection()
.await
.unwrap()
let batch_header = conn
.blocks_dal()
.get_l1_batch_header(l1_batch_number)
.await
.unwrap()
.unwrap();
.map_err(RequestProcessorError::Dal)?
.unwrap_or_else(|| panic!("Missing header for {}", l1_batch_number));

let eip_4844_blobs = match self.commitment_mode {
L1BatchCommitmentMode::Validium => Eip4844Blobs::empty(),
Expand All @@ -149,16 +183,12 @@ impl RequestProcessor {

METRICS.observe_blob_sizes(&blob);

let proof_gen_data = ProofGenerationData {
Ok(ProofGenerationData {
l1_batch_number,
witness_input_data: blob,
protocol_version: protocol_version.version,
l1_verifier_config: protocol_version.l1_verifier_config,
};

Ok(Json(ProofGenerationDataResponse::Success(Some(Box::new(
proof_gen_data,
)))))
})
}

pub(crate) async fn submit_proof(
Expand Down

0 comments on commit 7c336b1

Please sign in to comment.