diff --git a/Cargo.lock b/Cargo.lock index 8fd24232663..8b8349bf3c2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8755,6 +8755,7 @@ dependencies = [ "bincode", "tokio", "tracing", + "vise", "zksync_basic_types", "zksync_config", "zksync_dal", diff --git a/core/lib/dal/src/proof_generation_dal.rs b/core/lib/dal/src/proof_generation_dal.rs index f83f026073e..dada6c69ed3 100644 --- a/core/lib/dal/src/proof_generation_dal.rs +++ b/core/lib/dal/src/proof_generation_dal.rs @@ -88,7 +88,7 @@ impl ProofGenerationDal<'_, '_> { Ok(result) } - pub async fn get_available_batch(&mut self) -> DalResult { + pub async fn get_latest_proven_batch(&mut self) -> DalResult { let result = sqlx::query!( r#" SELECT diff --git a/core/node/external_proof_integration_api/Cargo.toml b/core/node/external_proof_integration_api/Cargo.toml index ae7cd4c4d03..2e8176cd883 100644 --- a/core/node/external_proof_integration_api/Cargo.toml +++ b/core/node/external_proof_integration_api/Cargo.toml @@ -21,3 +21,4 @@ zksync_dal.workspace = true tokio.workspace = true bincode.workspace = true anyhow.workspace = true +vise.workspace = true diff --git a/core/node/external_proof_integration_api/src/lib.rs b/core/node/external_proof_integration_api/src/lib.rs index 51fecf8c23f..b1ef33b44c1 100644 --- a/core/node/external_proof_integration_api/src/lib.rs +++ b/core/node/external_proof_integration_api/src/lib.rs @@ -1,4 +1,5 @@ mod error; +mod metrics; mod processor; use std::{net::SocketAddr, sync::Arc}; diff --git a/core/node/external_proof_integration_api/src/metrics.rs b/core/node/external_proof_integration_api/src/metrics.rs new file mode 100644 index 00000000000..70815f542a0 --- /dev/null +++ b/core/node/external_proof_integration_api/src/metrics.rs @@ -0,0 +1,55 @@ +use std::time::Duration; + +use tokio::time::Instant; +use vise::{EncodeLabelSet, EncodeLabelValue, Histogram, LabeledFamily, Metrics}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelSet, EncodeLabelValue)] +#[metrics(label = "outcome", rename_all = "snake_case")] +pub(crate) enum CallOutcome { + Success, + Failure, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelSet, EncodeLabelValue)] +#[metrics(label = "type", rename_all = "snake_case")] +pub(crate) enum Method { + GetLatestProofGenerationData, + GetSpecificProofGenerationData, + VerifyProof, +} + +#[derive(Debug, Metrics)] +#[metrics(prefix = "external_proof_integration_api")] +pub(crate) struct ProofIntegrationApiMetrics { + #[metrics(labels = ["method", "outcome"], buckets = vise::Buckets::LATENCIES)] + pub call_latency: LabeledFamily<(Method, CallOutcome), Histogram, 2>, +} + +pub(crate) struct MethodCallGuard { + method_type: Method, + outcome: CallOutcome, + started_at: Instant, +} + +impl MethodCallGuard { + pub(crate) fn new(method_type: Method) -> Self { + MethodCallGuard { + method_type, + outcome: CallOutcome::Failure, + started_at: Instant::now(), + } + } + + pub(crate) fn mark_successful(&mut self) { + self.outcome = CallOutcome::Success; + } +} + +impl Drop for MethodCallGuard { + fn drop(&mut self) { + METRICS.call_latency[&(self.method_type, self.outcome)].observe(self.started_at.elapsed()); + } +} + +#[vise::register] +pub(crate) static METRICS: vise::Global = vise::Global::new(); diff --git a/core/node/external_proof_integration_api/src/processor.rs b/core/node/external_proof_integration_api/src/processor.rs index a15e45e4803..e9e56df4a06 100644 --- a/core/node/external_proof_integration_api/src/processor.rs +++ b/core/node/external_proof_integration_api/src/processor.rs @@ -17,7 +17,10 @@ use zksync_prover_interface::{ outputs::L1BatchProofForL1, }; -use crate::error::ProcessorError; +use crate::{ + error::ProcessorError, + metrics::{Method, MethodCallGuard}, +}; #[derive(Clone)] pub(crate) struct Processor { @@ -39,6 +42,36 @@ impl Processor { } } + pub(crate) async fn verify_proof( + &self, + Path(l1_batch_number): Path, + Json(payload): Json, + ) -> Result<(), ProcessorError> { + let mut guard = MethodCallGuard::new(Method::VerifyProof); + + let l1_batch_number = L1BatchNumber(l1_batch_number); + tracing::info!( + "Received request to verify proof for batch: {:?}", + l1_batch_number + ); + + let serialized_proof = bincode::serialize(&payload.0)?; + let expected_proof = bincode::serialize( + &self + .blob_store + .get::((l1_batch_number, payload.0.protocol_version)) + .await?, + )?; + + if serialized_proof != expected_proof { + return Err(ProcessorError::InvalidProof); + } + + guard.mark_successful(); + + Ok(()) + } + #[tracing::instrument(skip_all)] pub(crate) async fn get_proof_generation_data( &mut self, @@ -46,13 +79,18 @@ impl Processor { ) -> Result, ProcessorError> { tracing::info!("Received request for proof generation data: {:?}", request); + let mut guard = match request.0 .0 { + Some(_) => MethodCallGuard::new(Method::GetSpecificProofGenerationData), + None => MethodCallGuard::new(Method::GetLatestProofGenerationData), + }; + let latest_available_batch = self .pool .connection() .await .unwrap() .proof_generation_dal() - .get_available_batch() + .get_latest_proven_batch() .await?; let l1_batch_number = if let Some(l1_batch_number) = request.0 .0 { @@ -74,9 +112,13 @@ impl Processor { .await; match proof_generation_data { - Ok(data) => Ok(Json(ProofGenerationDataResponse::Success(Some(Box::new( - data, - ))))), + Ok(data) => { + guard.mark_successful(); + + Ok(Json(ProofGenerationDataResponse::Success(Some(Box::new( + data, + ))))) + } Err(err) => Err(err), } } @@ -161,30 +203,4 @@ impl Processor { l1_verifier_config: protocol_version.l1_verifier_config, }) } - - pub(crate) async fn verify_proof( - &self, - Path(l1_batch_number): Path, - Json(payload): Json, - ) -> Result<(), ProcessorError> { - let l1_batch_number = L1BatchNumber(l1_batch_number); - tracing::info!( - "Received request to verify proof for batch: {:?}", - l1_batch_number - ); - - let serialized_proof = bincode::serialize(&payload.0)?; - let expected_proof = bincode::serialize( - &self - .blob_store - .get::((l1_batch_number, payload.0.protocol_version)) - .await?, - )?; - - if serialized_proof != expected_proof { - return Err(ProcessorError::InvalidProof); - } - - Ok(()) - } } diff --git a/core/node/node_framework/src/implementations/layers/external_proof_integration_api.rs b/core/node/node_framework/src/implementations/layers/external_proof_integration_api.rs index 6f8805bc5fa..9678c0a9793 100644 --- a/core/node/node_framework/src/implementations/layers/external_proof_integration_api.rs +++ b/core/node/node_framework/src/implementations/layers/external_proof_integration_api.rs @@ -26,7 +26,7 @@ pub struct ExternalProofIntegrationApiLayer { #[derive(Debug, FromContext)] #[context(crate = crate)] pub struct Input { - pub master_pool: PoolResource, + pub replica_pool: PoolResource, pub object_store: ObjectStoreResource, } @@ -34,7 +34,7 @@ pub struct Input { #[context(crate = crate)] pub struct Output { #[context(task)] - pub task: ProverApiTask, + pub task: ExternalProofIntegrationApiTask, } impl ExternalProofIntegrationApiLayer { @@ -59,13 +59,13 @@ impl WiringLayer for ExternalProofIntegrationApiLayer { } async fn wire(self, input: Self::Input) -> Result { - let main_pool = input.master_pool.get().await?; + let replica_pool = input.replica_pool.get().await.unwrap(); let blob_store = input.object_store.0; - let task = ProverApiTask { + let task = ExternalProofIntegrationApiTask { external_proof_integration_api_config: self.external_proof_integration_api_config, blob_store, - main_pool, + replica_pool, commitment_mode: self.commitment_mode, }; @@ -74,15 +74,15 @@ impl WiringLayer for ExternalProofIntegrationApiLayer { } #[derive(Debug)] -pub struct ProverApiTask { +pub struct ExternalProofIntegrationApiTask { external_proof_integration_api_config: ExternalProofIntegrationApiConfig, blob_store: Arc, - main_pool: ConnectionPool, + replica_pool: ConnectionPool, commitment_mode: L1BatchCommitmentMode, } #[async_trait::async_trait] -impl Task for ProverApiTask { +impl Task for ExternalProofIntegrationApiTask { fn id(&self) -> TaskId { "external_proof_integration_api".into() } @@ -91,7 +91,7 @@ impl Task for ProverApiTask { zksync_external_proof_integration_api::run_server( self.external_proof_integration_api_config, self.blob_store, - self.main_pool, + self.replica_pool, self.commitment_mode, stop_receiver.0, )