Skip to content

Commit

Permalink
feat: External prover API metrics, refactoring (#2630)
Browse files Browse the repository at this point in the history
## What ❔

Added metrics for external proof integration API, refactored code a
little bit

## Why ❔

<!-- Why are these changes done? What goal do they contribute to? What
are the principles behind them? -->
<!-- Example: PR templates ensure PR reviewers, observers, and future
iterators are in context about the evolution of repos. -->

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [ ] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [ ] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [ ] Code has been formatted via `zk fmt` and `zk lint`.
  • Loading branch information
Artemka374 committed Aug 22, 2024
1 parent 30edda4 commit c83cca8
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 41 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion core/lib/dal/src/proof_generation_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl ProofGenerationDal<'_, '_> {
Ok(result)
}

pub async fn get_available_batch(&mut self) -> DalResult<L1BatchNumber> {
pub async fn get_latest_proven_batch(&mut self) -> DalResult<L1BatchNumber> {
let result = sqlx::query!(
r#"
SELECT
Expand Down
1 change: 1 addition & 0 deletions core/node/external_proof_integration_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ zksync_dal.workspace = true
tokio.workspace = true
bincode.workspace = true
anyhow.workspace = true
vise.workspace = true
1 change: 1 addition & 0 deletions core/node/external_proof_integration_api/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod error;
mod metrics;
mod processor;

use std::{net::SocketAddr, sync::Arc};
Expand Down
55 changes: 55 additions & 0 deletions core/node/external_proof_integration_api/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use std::time::Duration;

use tokio::time::Instant;
use vise::{EncodeLabelSet, EncodeLabelValue, Histogram, LabeledFamily, Metrics};

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelSet, EncodeLabelValue)]
#[metrics(label = "outcome", rename_all = "snake_case")]
pub(crate) enum CallOutcome {
Success,
Failure,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelSet, EncodeLabelValue)]
#[metrics(label = "type", rename_all = "snake_case")]
pub(crate) enum Method {
GetLatestProofGenerationData,
GetSpecificProofGenerationData,
VerifyProof,
}

#[derive(Debug, Metrics)]
#[metrics(prefix = "external_proof_integration_api")]
pub(crate) struct ProofIntegrationApiMetrics {
#[metrics(labels = ["method", "outcome"], buckets = vise::Buckets::LATENCIES)]
pub call_latency: LabeledFamily<(Method, CallOutcome), Histogram<Duration>, 2>,
}

pub(crate) struct MethodCallGuard {
method_type: Method,
outcome: CallOutcome,
started_at: Instant,
}

impl MethodCallGuard {
pub(crate) fn new(method_type: Method) -> Self {
MethodCallGuard {
method_type,
outcome: CallOutcome::Failure,
started_at: Instant::now(),
}
}

pub(crate) fn mark_successful(&mut self) {
self.outcome = CallOutcome::Success;
}
}

impl Drop for MethodCallGuard {
fn drop(&mut self) {
METRICS.call_latency[&(self.method_type, self.outcome)].observe(self.started_at.elapsed());
}
}

#[vise::register]
pub(crate) static METRICS: vise::Global<ProofIntegrationApiMetrics> = vise::Global::new();
78 changes: 47 additions & 31 deletions core/node/external_proof_integration_api/src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ use zksync_prover_interface::{
outputs::L1BatchProofForL1,
};

use crate::error::ProcessorError;
use crate::{
error::ProcessorError,
metrics::{Method, MethodCallGuard},
};

#[derive(Clone)]
pub(crate) struct Processor {
Expand All @@ -39,20 +42,55 @@ impl Processor {
}
}

pub(crate) async fn verify_proof(
&self,
Path(l1_batch_number): Path<u32>,
Json(payload): Json<VerifyProofRequest>,
) -> Result<(), ProcessorError> {
let mut guard = MethodCallGuard::new(Method::VerifyProof);

let l1_batch_number = L1BatchNumber(l1_batch_number);
tracing::info!(
"Received request to verify proof for batch: {:?}",
l1_batch_number
);

let serialized_proof = bincode::serialize(&payload.0)?;
let expected_proof = bincode::serialize(
&self
.blob_store
.get::<L1BatchProofForL1>((l1_batch_number, payload.0.protocol_version))
.await?,
)?;

if serialized_proof != expected_proof {
return Err(ProcessorError::InvalidProof);
}

guard.mark_successful();

Ok(())
}

#[tracing::instrument(skip_all)]
pub(crate) async fn get_proof_generation_data(
&mut self,
request: Json<OptionalProofGenerationDataRequest>,
) -> Result<Json<ProofGenerationDataResponse>, ProcessorError> {
tracing::info!("Received request for proof generation data: {:?}", request);

let mut guard = match request.0 .0 {
Some(_) => MethodCallGuard::new(Method::GetSpecificProofGenerationData),
None => MethodCallGuard::new(Method::GetLatestProofGenerationData),
};

let latest_available_batch = self
.pool
.connection()
.await
.unwrap()
.proof_generation_dal()
.get_available_batch()
.get_latest_proven_batch()
.await?;

let l1_batch_number = if let Some(l1_batch_number) = request.0 .0 {
Expand All @@ -74,9 +112,13 @@ impl Processor {
.await;

match proof_generation_data {
Ok(data) => Ok(Json(ProofGenerationDataResponse::Success(Some(Box::new(
data,
))))),
Ok(data) => {
guard.mark_successful();

Ok(Json(ProofGenerationDataResponse::Success(Some(Box::new(
data,
)))))
}
Err(err) => Err(err),
}
}
Expand Down Expand Up @@ -161,30 +203,4 @@ impl Processor {
l1_verifier_config: protocol_version.l1_verifier_config,
})
}

pub(crate) async fn verify_proof(
&self,
Path(l1_batch_number): Path<u32>,
Json(payload): Json<VerifyProofRequest>,
) -> Result<(), ProcessorError> {
let l1_batch_number = L1BatchNumber(l1_batch_number);
tracing::info!(
"Received request to verify proof for batch: {:?}",
l1_batch_number
);

let serialized_proof = bincode::serialize(&payload.0)?;
let expected_proof = bincode::serialize(
&self
.blob_store
.get::<L1BatchProofForL1>((l1_batch_number, payload.0.protocol_version))
.await?,
)?;

if serialized_proof != expected_proof {
return Err(ProcessorError::InvalidProof);
}

Ok(())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ pub struct ExternalProofIntegrationApiLayer {
#[derive(Debug, FromContext)]
#[context(crate = crate)]
pub struct Input {
pub master_pool: PoolResource<ReplicaPool>,
pub replica_pool: PoolResource<ReplicaPool>,
pub object_store: ObjectStoreResource,
}

#[derive(Debug, IntoContext)]
#[context(crate = crate)]
pub struct Output {
#[context(task)]
pub task: ProverApiTask,
pub task: ExternalProofIntegrationApiTask,
}

impl ExternalProofIntegrationApiLayer {
Expand All @@ -59,13 +59,13 @@ impl WiringLayer for ExternalProofIntegrationApiLayer {
}

async fn wire(self, input: Self::Input) -> Result<Self::Output, WiringError> {
let main_pool = input.master_pool.get().await?;
let replica_pool = input.replica_pool.get().await.unwrap();
let blob_store = input.object_store.0;

let task = ProverApiTask {
let task = ExternalProofIntegrationApiTask {
external_proof_integration_api_config: self.external_proof_integration_api_config,
blob_store,
main_pool,
replica_pool,
commitment_mode: self.commitment_mode,
};

Expand All @@ -74,15 +74,15 @@ impl WiringLayer for ExternalProofIntegrationApiLayer {
}

#[derive(Debug)]
pub struct ProverApiTask {
pub struct ExternalProofIntegrationApiTask {
external_proof_integration_api_config: ExternalProofIntegrationApiConfig,
blob_store: Arc<dyn ObjectStore>,
main_pool: ConnectionPool<Core>,
replica_pool: ConnectionPool<Core>,
commitment_mode: L1BatchCommitmentMode,
}

#[async_trait::async_trait]
impl Task for ProverApiTask {
impl Task for ExternalProofIntegrationApiTask {
fn id(&self) -> TaskId {
"external_proof_integration_api".into()
}
Expand All @@ -91,7 +91,7 @@ impl Task for ProverApiTask {
zksync_external_proof_integration_api::run_server(
self.external_proof_integration_api_config,
self.blob_store,
self.main_pool,
self.replica_pool,
self.commitment_mode,
stop_receiver.0,
)
Expand Down

0 comments on commit c83cca8

Please sign in to comment.