From 6b0b9029d2dd2227b944f666c69370615103f362 Mon Sep 17 00:00:00 2001
From: juan518munoz <62400508+juan518munoz@users.noreply.github.com>
Date: Mon, 7 Oct 2024 13:15:29 -0300
Subject: [PATCH 1/2] feat: concurrent da_dispatcher (#288)

* initial commit
* use Notify for a more deterministic approach
* replace atomic for mutex
* move const to config
---
 core/lib/config/src/configs/da_dispatcher.rs  |   4 +
 core/lib/config/src/testonly.rs               |   1 +
 core/lib/env_config/src/da_dispatcher.rs      |   5 +-
 core/lib/protobuf_config/src/da_dispatcher.rs |   2 +
 .../src/proto/config/da_dispatcher.proto      |   1 +
 core/node/da_dispatcher/src/da_dispatcher.rs  | 434 ++++++++++++------
 6 files changed, 297 insertions(+), 150 deletions(-)

diff --git a/core/lib/config/src/configs/da_dispatcher.rs b/core/lib/config/src/configs/da_dispatcher.rs
index e9ad6bd3c07..c8bf1b3b899 100644
--- a/core/lib/config/src/configs/da_dispatcher.rs
+++ b/core/lib/config/src/configs/da_dispatcher.rs
@@ -6,6 +6,7 @@ pub const DEFAULT_POLLING_INTERVAL_MS: u32 = 5000;
 pub const DEFAULT_MAX_ROWS_TO_DISPATCH: u32 = 100;
 pub const DEFAULT_MAX_RETRIES: u16 = 5;
 pub const DEFAULT_USE_DUMMY_INCLUSION_DATA: bool = false;
+pub const DEFAULT_MAX_CONCURRENT_REQUESTS: u32 = 100;
 
 #[derive(Debug, Clone, PartialEq, Deserialize)]
 pub struct DADispatcherConfig {
@@ -19,6 +20,8 @@ pub struct DADispatcherConfig {
     // TODO: run a verification task to check if the L1 contract expects the inclusion proofs to
     // avoid the scenario where contracts expect real proofs, and server is using dummy proofs.
     pub use_dummy_inclusion_data: Option<bool>,
+    /// The maximum number of concurrent requests to send to the DA server.
+    pub max_concurrent_requests: Option<u32>,
 }
 
 impl DADispatcherConfig {
@@ -28,6 +31,7 @@ impl DADispatcherConfig {
             max_rows_to_dispatch: Some(DEFAULT_MAX_ROWS_TO_DISPATCH),
             max_retries: Some(DEFAULT_MAX_RETRIES),
             use_dummy_inclusion_data: Some(DEFAULT_USE_DUMMY_INCLUSION_DATA),
+            max_concurrent_requests: Some(DEFAULT_MAX_CONCURRENT_REQUESTS),
         }
     }
 
diff --git a/core/lib/config/src/testonly.rs b/core/lib/config/src/testonly.rs
index 4a2858b9cbf..a106acd5a2f 100644
--- a/core/lib/config/src/testonly.rs
+++ b/core/lib/config/src/testonly.rs
@@ -948,6 +948,7 @@ impl Distribution for EncodeDist {
             max_rows_to_dispatch: self.sample(rng),
             max_retries: self.sample(rng),
             use_dummy_inclusion_data: self.sample(rng),
+            max_concurrent_requests: self.sample(rng),
         }
     }
 }
diff --git a/core/lib/env_config/src/da_dispatcher.rs b/core/lib/env_config/src/da_dispatcher.rs
index 246752db91a..805e6b2234b 100644
--- a/core/lib/env_config/src/da_dispatcher.rs
+++ b/core/lib/env_config/src/da_dispatcher.rs
@@ -21,12 +21,14 @@ mod tests {
         interval: u32,
         rows_limit: u32,
         max_retries: u16,
+        max_concurrent_requests: u32,
     ) -> DADispatcherConfig {
         DADispatcherConfig {
             polling_interval_ms: Some(interval),
             max_rows_to_dispatch: Some(rows_limit),
             max_retries: Some(max_retries),
             use_dummy_inclusion_data: Some(true),
+            max_concurrent_requests: Some(max_concurrent_requests),
         }
     }
 
@@ -38,9 +40,10 @@ mod tests {
             DA_DISPATCHER_MAX_ROWS_TO_DISPATCH=60
             DA_DISPATCHER_MAX_RETRIES=7
             DA_DISPATCHER_USE_DUMMY_INCLUSION_DATA="true"
+            DA_DISPATCHER_MAX_CONCURRENT_REQUESTS=10
         "#;
         lock.set_env(config);
         let actual = DADispatcherConfig::from_env().unwrap();
-        assert_eq!(actual, expected_da_layer_config(5000, 60, 7));
+        assert_eq!(actual, expected_da_layer_config(5000, 60, 7, 10));
     }
 }
diff --git a/core/lib/protobuf_config/src/da_dispatcher.rs b/core/lib/protobuf_config/src/da_dispatcher.rs
index d77073bd32c..e85ff5ae76e 100644
--- a/core/lib/protobuf_config/src/da_dispatcher.rs +++ b/core/lib/protobuf_config/src/da_dispatcher.rs @@ -12,6 +12,7 @@ impl ProtoRepr for proto::DataAvailabilityDispatcher { max_rows_to_dispatch: self.max_rows_to_dispatch, max_retries: self.max_retries.map(|x| x as u16), use_dummy_inclusion_data: self.use_dummy_inclusion_data, + max_concurrent_requests: self.max_concurrent_requests, }) } @@ -21,6 +22,7 @@ impl ProtoRepr for proto::DataAvailabilityDispatcher { max_rows_to_dispatch: this.max_rows_to_dispatch, max_retries: this.max_retries.map(Into::into), use_dummy_inclusion_data: this.use_dummy_inclusion_data, + max_concurrent_requests: this.max_concurrent_requests, } } } diff --git a/core/lib/protobuf_config/src/proto/config/da_dispatcher.proto b/core/lib/protobuf_config/src/proto/config/da_dispatcher.proto index dd366bd5b92..d6329d14b28 100644 --- a/core/lib/protobuf_config/src/proto/config/da_dispatcher.proto +++ b/core/lib/protobuf_config/src/proto/config/da_dispatcher.proto @@ -7,4 +7,5 @@ message DataAvailabilityDispatcher { optional uint32 max_rows_to_dispatch = 2; optional uint32 max_retries = 3; optional bool use_dummy_inclusion_data = 4; + optional uint32 max_concurrent_requests = 5; } diff --git a/core/node/da_dispatcher/src/da_dispatcher.rs b/core/node/da_dispatcher/src/da_dispatcher.rs index a476add4a70..4a9ad49e751 100644 --- a/core/node/da_dispatcher/src/da_dispatcher.rs +++ b/core/node/da_dispatcher/src/da_dispatcher.rs @@ -1,10 +1,11 @@ -use std::{future::Future, time::Duration}; +use std::{collections::HashSet, future::Future, sync::Arc, time::Duration}; use anyhow::Context; use chrono::Utc; +use futures::future::join_all; use rand::Rng; -use tokio::sync::watch::Receiver; -use zksync_config::DADispatcherConfig; +use tokio::sync::{mpsc, watch::Receiver, Mutex, Notify}; +use zksync_config::{configs::da_dispatcher::DEFAULT_MAX_CONCURRENT_REQUESTS, DADispatcherConfig}; use zksync_da_client::{ types::{DAError, InclusionData}, DataAvailabilityClient, @@ -19,6 +20,7 @@ pub struct DataAvailabilityDispatcher { client: Box, pool: ConnectionPool, config: DADispatcherConfig, + request_semaphore: Arc, } impl DataAvailabilityDispatcher { @@ -27,178 +29,312 @@ impl DataAvailabilityDispatcher { config: DADispatcherConfig, client: Box, ) -> Self { + let request_semaphore = Arc::new(tokio::sync::Semaphore::new( + config + .max_concurrent_requests + .unwrap_or(DEFAULT_MAX_CONCURRENT_REQUESTS) as usize, + )); Self { pool, config, client, + request_semaphore, } } - pub async fn run(self, mut stop_receiver: Receiver) -> anyhow::Result<()> { - loop { - if *stop_receiver.borrow() { - break; - } + pub async fn run(self, stop_receiver: Receiver) -> anyhow::Result<()> { + let subtasks = futures::future::join( + async { + if let Err(err) = self.dispatch_batches(stop_receiver.clone()).await { + tracing::error!("dispatch error {err:?}"); + } + }, + async { + if let Err(err) = self.inclusion_poller(stop_receiver.clone()).await { + tracing::error!("poll_for_inclusion error {err:?}"); + } + }, + ); + + tokio::select! 
{ + _ = subtasks => {}, + } + Ok(()) + } + + async fn dispatch_batches(&self, stop_receiver: Receiver) -> anyhow::Result<()> { + let (tx, mut rx) = mpsc::channel( + self.config + .max_concurrent_requests + .unwrap_or(DEFAULT_MAX_CONCURRENT_REQUESTS) as usize, + ); + + let next_expected_batch = Arc::new(Mutex::new(None)); + + let stop_receiver_clone = stop_receiver.clone(); + let pool_clone = self.pool.clone(); + let config_clone = self.config.clone(); + let next_expected_batch_clone = next_expected_batch.clone(); + let pending_blobs_reader = tokio::spawn(async move { + // Used to avoid sending the same batch multiple times + let mut pending_batches = HashSet::new(); + // let pair = cvar_pair_clone.clone(); + loop { + if *stop_receiver_clone.borrow() { + tracing::info!("Stop signal received, da_dispatcher is shutting down"); + break; + } - let subtasks = futures::future::join( - async { - if let Err(err) = self.dispatch().await { - tracing::error!("dispatch error {err:?}"); + let mut conn = pool_clone.connection_tagged("da_dispatcher").await?; + let batches = conn + .data_availability_dal() + .get_ready_for_da_dispatch_l1_batches( + config_clone.max_rows_to_dispatch() as usize + ) + .await?; + drop(conn); + for batch in batches { + if pending_batches.contains(&batch.l1_batch_number.0) { + continue; } - }, - async { - if let Err(err) = self.poll_for_inclusion().await { - tracing::error!("poll_for_inclusion error {err:?}"); + + // This should only happen once. + // We can't assume that the first batch is always 1 because the dispatcher can be restarted + // and resume from a different batch. + let mut next_expected_batch_lock = next_expected_batch_clone.lock().await; + if next_expected_batch_lock.is_none() { + next_expected_batch_lock.replace(batch.l1_batch_number); } - }, - ); - let subtasks = futures::future::join(subtasks, async { - if let Err(err) = self.update_metrics().await { - tracing::error!("update_metrics error {err:?}"); + pending_batches.insert(batch.l1_batch_number.0); + METRICS.blobs_pending_dispatch.inc_by(1); + tx.send(batch).await?; } - }); - tokio::select! { - _ = subtasks => {}, - _ = stop_receiver.changed() => { + tokio::time::sleep(Duration::from_secs(5)).await; + } + Ok::<(), anyhow::Error>(()) + }); + + let pool = self.pool.clone(); + let config = self.config.clone(); + let client = self.client.clone(); + let request_semaphore = self.request_semaphore.clone(); + let notifier = Arc::new(Notify::new()); + let pending_blobs_sender = tokio::spawn(async move { + let mut spawned_requests = vec![]; + let notifier = notifier.clone(); + loop { + if *stop_receiver.borrow() { break; } - } - if tokio::time::timeout(self.config.polling_interval(), stop_receiver.changed()) - .await - .is_ok() - { - break; - } - } + let batch = match rx.recv().await { + Some(batch) => batch, + None => continue, // Should never happen + }; - tracing::info!("Stop signal received, da_dispatcher is shutting down"); - Ok(()) - } + // Block until we can send the request + let permit = request_semaphore.clone().acquire_owned().await?; - /// Dispatches the blobs to the data availability layer, and saves the blob_id in the database. 
- async fn dispatch(&self) -> anyhow::Result<()> { - let mut conn = self.pool.connection_tagged("da_dispatcher").await?; - let batches = conn - .data_availability_dal() - .get_ready_for_da_dispatch_l1_batches(self.config.max_rows_to_dispatch() as usize) - .await?; - drop(conn); - - for batch in batches { - let dispatch_latency = METRICS.blob_dispatch_latency.start(); - let dispatch_response = retry(self.config.max_retries(), batch.l1_batch_number, || { - self.client - .dispatch_blob(batch.l1_batch_number.0, batch.pubdata.clone()) - }) - .await - .with_context(|| { - format!( - "failed to dispatch a blob with batch_number: {}, pubdata_len: {}", - batch.l1_batch_number, - batch.pubdata.len() - ) - })?; - let dispatch_latency_duration = dispatch_latency.observe(); - - let sent_at = Utc::now().naive_utc(); - - let mut conn = self.pool.connection_tagged("da_dispatcher").await?; - conn.data_availability_dal() - .insert_l1_batch_da( - batch.l1_batch_number, - dispatch_response.blob_id.as_str(), - sent_at, - ) - .await?; - drop(conn); - - METRICS - .last_dispatched_l1_batch - .set(batch.l1_batch_number.0 as usize); - METRICS.blob_size.observe(batch.pubdata.len()); - METRICS.blobs_dispatched.inc_by(1); - tracing::info!( - "Dispatched a DA for batch_number: {}, pubdata_size: {}, dispatch_latency: {dispatch_latency_duration:?}", - batch.l1_batch_number, - batch.pubdata.len(), - ); - } + let client = client.clone(); + let pool = pool.clone(); + let config = config.clone(); + let next_expected_batch = next_expected_batch.clone(); + let notifier = notifier.clone(); + let request = tokio::spawn(async move { + let _permit = permit; // move permit into scope + let dispatch_latency = METRICS.blob_dispatch_latency.start(); + let dispatch_response = + retry(config.max_retries(), batch.l1_batch_number, || { + client.dispatch_blob(batch.l1_batch_number.0, batch.pubdata.clone()) + }) + .await + .with_context(|| { + format!( + "failed to dispatch a blob with batch_number: {}, pubdata_len: {}", + batch.l1_batch_number, + batch.pubdata.len() + ) + })?; + let dispatch_latency_duration = dispatch_latency.observe(); + + let sent_at = Utc::now().naive_utc(); + + // Before saving the blob in the database, we need to be sure that we are doing it + // in the correct order. 
+ while next_expected_batch + .lock() + .await + .map_or(true, |next_expected_batch| { + batch.l1_batch_number > next_expected_batch + }) + { + notifier.clone().notified().await; + } + + let mut conn = pool.connection_tagged("da_dispatcher").await?; + conn.data_availability_dal() + .insert_l1_batch_da( + batch.l1_batch_number, + dispatch_response.blob_id.as_str(), + sent_at, + ) + .await?; + drop(conn); + // Update the next expected batch number + next_expected_batch + .lock() + .await + .replace(batch.l1_batch_number + 1); + notifier.notify_waiters(); + + METRICS + .last_dispatched_l1_batch + .set(batch.l1_batch_number.0 as usize); + METRICS.blob_size.observe(batch.pubdata.len()); + METRICS.blobs_dispatched.inc_by(1); + METRICS.blobs_pending_dispatch.dec_by(1); + tracing::info!( + "Dispatched a DA for batch_number: {}, pubdata_size: {}, dispatch_latency: {dispatch_latency_duration:?}", + batch.l1_batch_number, + batch.pubdata.len(), + ); + + Ok::<(), anyhow::Error>(()) + }); + spawned_requests.push(request); + } + join_all(spawned_requests).await; + Ok::<(), anyhow::Error>(()) + }); + + let results = join_all(vec![pending_blobs_reader, pending_blobs_sender]).await; + for result in results { + result??; + } Ok(()) } - /// Polls the data availability layer for inclusion data, and saves it in the database. - async fn poll_for_inclusion(&self) -> anyhow::Result<()> { - let mut conn = self.pool.connection_tagged("da_dispatcher").await?; - let blob_info = conn - .data_availability_dal() - .get_first_da_blob_awaiting_inclusion() - .await?; - drop(conn); - - let Some(blob_info) = blob_info else { - return Ok(()); - }; - - let inclusion_data = if self.config.use_dummy_inclusion_data() { - self.client - .get_inclusion_data(blob_info.blob_id.as_str()) - .await - .with_context(|| { - format!( - "failed to get inclusion data for blob_id: {}, batch_number: {}", - blob_info.blob_id, blob_info.l1_batch_number - ) - })? 
- } else { - // if the inclusion verification is disabled, we don't need to wait for the inclusion - // data before committing the batch, so simply return an empty vector - Some(InclusionData { data: vec![] }) - }; - - let Some(inclusion_data) = inclusion_data else { - return Ok(()); - }; - - let mut conn = self.pool.connection_tagged("da_dispatcher").await?; - conn.data_availability_dal() - .save_l1_batch_inclusion_data( - L1BatchNumber(blob_info.l1_batch_number.0), - inclusion_data.data.as_slice(), - ) - .await?; - drop(conn); - - let inclusion_latency = Utc::now().signed_duration_since(blob_info.sent_at); - if let Ok(latency) = inclusion_latency.to_std() { - METRICS.inclusion_latency.observe(latency); - } - METRICS - .last_included_l1_batch - .set(blob_info.l1_batch_number.0 as usize); - METRICS.blobs_included.inc_by(1); - - tracing::info!( - "Received an inclusion data for a batch_number: {}, inclusion_latency_seconds: {}", - blob_info.l1_batch_number, - inclusion_latency.num_seconds() + async fn inclusion_poller(&self, stop_receiver: Receiver) -> anyhow::Result<()> { + let (tx, mut rx) = mpsc::channel( + self.config + .max_concurrent_requests + .unwrap_or(DEFAULT_MAX_CONCURRENT_REQUESTS) as usize, ); - Ok(()) - } + let stop_receiver_clone = stop_receiver.clone(); + let pool_clone = self.pool.clone(); + let pending_inclusion_reader = tokio::spawn(async move { + let mut pending_inclusions = HashSet::new(); + loop { + if *stop_receiver_clone.borrow() { + break; + } + + let mut conn = pool_clone.connection_tagged("da_dispatcher").await?; + // TODO: this query might always return the same blob if the blob is not included + // we should probably change the query to return all blobs that are not included + let blob_info = conn + .data_availability_dal() + .get_first_da_blob_awaiting_inclusion() + .await?; + drop(conn); + + let Some(blob_info) = blob_info else { + tokio::time::sleep(Duration::from_secs(5)).await; + continue; + }; + + if pending_inclusions.contains(&blob_info.blob_id) { + continue; + } + pending_inclusions.insert(blob_info.blob_id.clone()); + tx.send(blob_info).await?; + } + Ok::<(), anyhow::Error>(()) + }); + + let pool = self.pool.clone(); + let config = self.config.clone(); + let client = self.client.clone(); + let semaphore = self.request_semaphore.clone(); + let pending_inclusion_sender = tokio::spawn(async move { + let mut spawned_requests = vec![]; + loop { + if *stop_receiver.borrow() { + break; + } + let blob_info = match rx.recv().await { + Some(blob_info) => blob_info, + None => continue, // Should never happen + }; + + // Block until we can send the request + let permit = semaphore.clone().acquire_owned().await?; + + let client = client.clone(); + let pool = pool.clone(); + let config = config.clone(); + let request = tokio::spawn(async move { + let _permit = permit; // move permit into scope + let inclusion_data = if config.use_dummy_inclusion_data() { + client + .get_inclusion_data(blob_info.blob_id.as_str()) + .await + .with_context(|| { + format!( + "failed to get inclusion data for blob_id: {}, batch_number: {}", + blob_info.blob_id, blob_info.l1_batch_number + ) + })? 
+ } else { + // if the inclusion verification is disabled, we don't need to wait for the inclusion + // data before committing the batch, so simply return an empty vector + Some(InclusionData { data: vec![] }) + }; - async fn update_metrics(&self) -> anyhow::Result<()> { - let mut conn = self.pool.connection_tagged("da_dispatcher").await?; - let batches = conn - .data_availability_dal() - .get_ready_for_da_dispatch_l1_batches(self.config.max_rows_to_dispatch() as usize) - .await?; - drop(conn); - METRICS.blobs_pending_dispatch.set(batches.len()); + let Some(inclusion_data) = inclusion_data else { + return Ok(()); + }; + let mut conn = pool.connection_tagged("da_dispatcher").await?; + conn.data_availability_dal() + .save_l1_batch_inclusion_data( + L1BatchNumber(blob_info.l1_batch_number.0), + inclusion_data.data.as_slice(), + ) + .await?; + drop(conn); + + let inclusion_latency = Utc::now().signed_duration_since(blob_info.sent_at); + if let Ok(latency) = inclusion_latency.to_std() { + METRICS.inclusion_latency.observe(latency); + } + METRICS + .last_included_l1_batch + .set(blob_info.l1_batch_number.0 as usize); + METRICS.blobs_included.inc_by(1); + + tracing::info!( + "Received an inclusion data for a batch_number: {}, inclusion_latency_seconds: {}", + blob_info.l1_batch_number, + inclusion_latency.num_seconds() + ); + + Ok::<(), anyhow::Error>(()) + }); + spawned_requests.push(request); + } + join_all(spawned_requests).await; + Ok::<(), anyhow::Error>(()) + }); + + let results = join_all(vec![pending_inclusion_reader, pending_inclusion_sender]).await; + for result in results { + result??; + } Ok(()) } } From aa6ea7d154eed46f0e31b61b73a510d89bcd1914 Mon Sep 17 00:00:00 2001 From: juan518munoz <62400508+juan518munoz@users.noreply.github.com> Date: Mon, 7 Oct 2024 14:54:52 -0300 Subject: [PATCH 2/2] feat: Da eigen implementation docs & backup scripts (#289) * initial commit * add more steps * add backup and restore ecosystem scripts * remove unnecessary step * improve docs * fix docs * fix the fix docs * add extra step * fix restore path * simplify restoration note * more docs * fix paths in backup restoration * fix whitespace * replacement fixes * moved holesky rpc url to env var --- .gitignore | 3 ++ backup-ecosystem.sh | 51 ++++++++++++++++++ eigenda-integration.md | 118 +++++++++++++++++++++++++++++++++++++++++ restore-ecosystem.sh | 100 ++++++++++++++++++++++++++++++++++ 4 files changed, 272 insertions(+) create mode 100755 backup-ecosystem.sh create mode 100755 restore-ecosystem.sh diff --git a/.gitignore b/.gitignore index c3de7a2df84..f95274b0e36 100644 --- a/.gitignore +++ b/.gitignore @@ -118,3 +118,6 @@ configs/* era-observability/ core/tests/ts-integration/deployments-zk transactions/ + +# Ecosystem backups +ecosystem_backups/ diff --git a/backup-ecosystem.sh b/backup-ecosystem.sh new file mode 100755 index 00000000000..dbdf82e3a38 --- /dev/null +++ b/backup-ecosystem.sh @@ -0,0 +1,51 @@ +#!/bin/bash + +# Check if the ecosystem name was provided as an argument +if [ -z "$1" ]; then + echo "Usage: ./backup-ecosystem ECOSYSTEM_NAME" + exit 1 +fi + +# Store the first argument as ECOSYSTEM_NAME +ECOSYSTEM_NAME=$1 + +# Prompt for the Postgres password and store it in PGPASSWORD +read -sp "Enter Postgres password: " PGPASSWORD +export PGPASSWORD + +# Path to the secrets.yaml file +SECRETS_FILE="./chains/${ECOSYSTEM_NAME}/configs/secrets.yaml" + +# Check if the secrets.yaml file exists +if [ ! -f "$SECRETS_FILE" ]; then + echo "Error: $SECRETS_FILE does not exist." 
+    exit 1
+fi
+
+# Extract server_url and prover_url from the secrets.yaml file
+SERVER_DB_NAME=$(grep 'server_url' "$SECRETS_FILE" | awk -F'/' '{print $NF}')
+PROVER_DB_NAME=$(grep 'prover_url' "$SECRETS_FILE" | awk -F'/' '{print $NF}')
+
+# Show the extracted database names
+echo "Extracted SERVER_DB_NAME: $SERVER_DB_NAME"
+echo "Extracted PROVER_DB_NAME: $PROVER_DB_NAME"
+
+# Create backup directory
+mkdir -p "./ecosystem_backups/${ECOSYSTEM_NAME}"
+
+# Run pg_dump commands
+echo "Running pg_dump for $SERVER_DB_NAME..."
+pg_dump -U postgres -h localhost "$SERVER_DB_NAME" > "ecosystem_backups/${ECOSYSTEM_NAME}/${SERVER_DB_NAME}_backup.sql"
+echo "Running pg_dump for $PROVER_DB_NAME..."
+pg_dump -U postgres -h localhost "$PROVER_DB_NAME" > "ecosystem_backups/${ECOSYSTEM_NAME}/${PROVER_DB_NAME}_backup.sql"
+
+# Unset the PGPASSWORD variable for security
+unset PGPASSWORD
+
+# Copy the chain configuration files
+cp -r "./chains/${ECOSYSTEM_NAME}" "./ecosystem_backups/${ECOSYSTEM_NAME}/"
+
+# Copy the configs directory
+cp -r "./configs" "./ecosystem_backups/${ECOSYSTEM_NAME}/"
+
+echo "Backup completed."
diff --git a/eigenda-integration.md b/eigenda-integration.md
index 4d02fa6a824..638719e2b4d 100644
--- a/eigenda-integration.md
+++ b/eigenda-integration.md
@@ -71,3 +71,121 @@ zk_supervisor test integration --chain eigen_da
 ### Metrics
 
 Access Grafana at [http://localhost:3000/](http://localhost:3000/), go to dashboards and select `EigenDA`.
+
+## Holesky Setup
+
+### Modify localhost chain id number
+
+Modify line 32 in `zk_toolbox/crates/types/src/l1_network.rs`:
+
+```rs
+L1Network::Localhost => 17000,
+```
+
+Then recompile the zk toolbox:
+
+```bash
+./bin/zkt
+```
+
+### Used wallets
+
+Modify `etc/env/file_based/wallets.yaml` and `configs/wallets.yaml` with the following wallets:
+
+```yaml
+# Use your own Holesky wallets; be sure they have enough funds
+```
+
+> ⚠️ Some steps distribute ~5000 ETH to some wallets; modify `AMOUNT_FOR_DISTRIBUTION_TO_WALLETS` to a lower value if needed. A quick way to verify that the wallets are funded is sketched below.
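+
+If you want to double-check that those wallets are funded before moving on, one option (a sketch, assuming you have Foundry's `cast` installed; the addresses are placeholders for the ones you put in `wallets.yaml`) is to query their Holesky balances:
+
+```bash
+# Placeholder addresses: replace them with the wallets configured above.
+# Uses the HOLESKY_RPC_URL exported in the next section.
+for addr in 0xYourDeployerAddress 0xYourGovernorAddress; do
+    echo -n "$addr: "
+    cast balance "$addr" --ether --rpc-url "$HOLESKY_RPC_URL"
+done
+```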
+
+### EigenProxy RPC
+
+Get `EIGEN_SIGNER_PK` from 1Password and set it as an `env` var:
+
+```bash
+export EIGEN_SIGNER_PK=
+export HOLESKY_RPC_URL=
+```
+
+Modify `docker-compose.yml` to use Holesky RPCs:
+
+```yaml
+  eigenda-proxy:
+    image: ghcr.io/layr-labs/eigenda-proxy
+    environment:
+      - EIGEN_SIGNER_PK=$EIGEN_SIGNER_PK
+      - HOLESKY_RPC_URL=$HOLESKY_RPC_URL
+    ports:
+      - "4242:4242"
+    command: ./eigenda-proxy --addr 0.0.0.0 --port 4242 --eigenda-disperser-rpc disperser-holesky.eigenda.xyz:443 --eigenda-signer-private-key-hex $EIGEN_SIGNER_PK --eigenda-eth-rpc $HOLESKY_RPC_URL --eigenda-svc-manager-addr 0xD4A7E1Bd8015057293f0D0A557088c286942e84b --eigenda-eth-confirmation-depth 0
+```
+
+### Create and initialize the ecosystem
+
+(be sure to have the postgres container running in the background)
+
+```bash
+zk_inception chain create \
+    --chain-name holesky_eigen_da \
+    --chain-id 114411 \
+    --prover-mode no-proofs \
+    --wallet-creation localhost \
+    --l1-batch-commit-data-generator-mode validium \
+    --base-token-address 0x0000000000000000000000000000000000000001 \
+    --base-token-price-nominator 1 \
+    --base-token-price-denominator 1 \
+    --set-as-default false
+
+zk_inception ecosystem init \
+    --deploy-paymaster true \
+    --deploy-erc20 true \
+    --deploy-ecosystem true \
+    --l1-rpc-url $HOLESKY_RPC_URL \
+    --server-db-url=postgres://postgres:notsecurepassword@localhost:5432 \
+    --server-db-name=zksync_server_holesky_eigen_da \
+    --prover-db-url=postgres://postgres:notsecurepassword@localhost:5432 \
+    --prover-db-name=zksync_prover_holesky_eigen_da \
+    --chain holesky_eigen_da \
+    --verbose
+```
+
+### Start the server
+
+```bash
+zk_inception server --chain holesky_eigen_da
+```
+
+## Backup and restoration
+
+It's possible to run the zk stack on one computer and then migrate it to another; this is especially useful for Holesky testing.
+
+### Backup
+
+To make a backup of the `holesky_eigen_da` ecosystem, you only need to run:
+
+```bash
+./backup-ecosystem.sh holesky_eigen_da
+```
+
+This will generate a directory named `holesky_eigen_da` inside `ecosystem_backups`.
+
+### Restoration
+
+1. Move the `ecosystem_backups/holesky_eigen_da` directory to the other computer; it should be placed in the root of the project.
+
+2. Restore the ecosystem with:
+
+```bash
+./restore-ecosystem.sh holesky_eigen_da
+```
+
+Note that:
+
+- The `postgres` container has to be running.
+- The `chain_id` can't already be in use.
+- If you are restoring a local ecosystem, you have to use the same `reth` container as before.
+- If no ecosystem has been `init`ialized on this computer before, run this command:
+
+```bash
+git submodule update --init --recursive && zk_supervisor contracts
+```
diff --git a/restore-ecosystem.sh b/restore-ecosystem.sh
new file mode 100755
index 00000000000..7fbff3e9802
--- /dev/null
+++ b/restore-ecosystem.sh
@@ -0,0 +1,100 @@
+#!/bin/bash
+
+# Check if the ecosystem name was provided as an argument
+if [ -z "$1" ]; then
+    echo "Usage: ./restore-ecosystem ECOSYSTEM_NAME"
+    exit 1
+fi
+
+# Store the first argument as ECOSYSTEM_NAME
+ECOSYSTEM_NAME=$1
+
+# Prompt for the Postgres password and store it in PGPASSWORD
+read -sp "Enter Postgres password: " PGPASSWORD
+export PGPASSWORD
+
+# Define the chain and backup paths
+CHAIN_PATH="./chains/${ECOSYSTEM_NAME}"
+BACKUP_PATH="./ecosystem_backups/${ECOSYSTEM_NAME}"
+
+# Check if the backup directory exists
+if [ ! -d "$BACKUP_PATH" ]; then
+    echo "Error: Backup not found at $BACKUP_PATH."
+    exit 1
+fi
+
+# Check if the postgres container is running
+if ! docker ps --filter "name=postgres" --filter "status=running" | grep "postgres" > /dev/null; then
+    echo "Error: postgres not running, set it up first with 'zk_inception containers'."
+    exit 1
+fi
+
+# Fix $ZKSYNC_HOME paths in the backup files
+find_and_replace() {
+    local target_file=$1
+
+    sed -i '' -e "s|db_path:.*zksync-era/\./|db_path: $(pwd)/./|g" "$target_file"
+    sed -i '' -e "s|state_keeper_db_path:.*zksync-era/\./|state_keeper_db_path: $(pwd)/./|g" "$target_file"
+    sed -i '' -e "s|path:.*zksync-era/\./|path: $(pwd)/./|g" "$target_file"
+    sed -i '' -e "s|configs:.*zksync-era/\./|configs: $(pwd)/./|g" "$target_file"
+}
+
+# Array of specific files to modify
+files=("$BACKUP_PATH/$ECOSYSTEM_NAME/configs/general.yaml" "$BACKUP_PATH/$ECOSYSTEM_NAME/ZkStack.yaml")
+
+# Loop over the files and perform the find and replace
+for file in "${files[@]}"; do
+    if [ -f "$file" ]; then
+        find_and_replace "$file"
+    else
+        # Exit with error code
+        echo "ERROR: backup file $file does not exist."
+        exit 1
+    fi
+done
+
+# Copy the ecosystem backup folder to the chains folder, replacing any existing files
+echo "Copying backup files to $CHAIN_PATH..."
+cp -r "$BACKUP_PATH/$ECOSYSTEM_NAME" "$CHAIN_PATH"
+
+# Copy the configs folder in the backup to the configs folder in the root of the project
+# TODO: it may be suitable to warn the user about overwriting the existing configs
+# and ask for confirmation before proceeding
+echo "Copying configs folder from backup..."
+cp -r "$BACKUP_PATH/configs" "./"
+
+# Path to the secrets.yaml file
+SECRETS_FILE="$CHAIN_PATH/configs/secrets.yaml"
+
+# Check if the secrets.yaml file exists
+if [ ! -f "$SECRETS_FILE" ]; then
+    echo "Error: $SECRETS_FILE does not exist."
+    exit 1
+fi
+
+# Extract server_url and prover_url from the secrets.yaml file
+SERVER_DB_NAME=$(grep 'server_url' "$SECRETS_FILE" | awk -F'/' '{print $NF}')
+PROVER_DB_NAME=$(grep 'prover_url' "$SECRETS_FILE" | awk -F'/' '{print $NF}')
+
+# Show the extracted database names
+echo "Extracted SERVER_DB_NAME: $SERVER_DB_NAME"
+echo "Extracted PROVER_DB_NAME: $PROVER_DB_NAME"
+
+# Create and restore the server database
+echo "Creating database $SERVER_DB_NAME..."
+createdb -U postgres -h localhost "$SERVER_DB_NAME"
+
+echo "Restoring $SERVER_DB_NAME from backup..."
+psql -U postgres -h localhost -d "$SERVER_DB_NAME" -f "$BACKUP_PATH/${SERVER_DB_NAME}_backup.sql"
+
+# Create and restore the prover database
+echo "Creating database $PROVER_DB_NAME..."
+createdb -U postgres -h localhost "$PROVER_DB_NAME"
+
+echo "Restoring $PROVER_DB_NAME from backup..."
+psql -U postgres -h localhost -d "$PROVER_DB_NAME" -f "$BACKUP_PATH/${PROVER_DB_NAME}_backup.sql"
+
+# Unset the PGPASSWORD variable for security
+unset PGPASSWORD
+
+echo "Restore completed."
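As an optional sanity check after running `restore-ecosystem.sh` (not part of the scripts above; it assumes the `holesky_eigen_da` example and the default local `postgres` setup used throughout this document), you can confirm that both databases were recreated and that the chain configuration is back in place before starting the server:

```bash
# Both the server and prover databases should show up in the list.
psql -U postgres -h localhost -l | grep holesky_eigen_da

# The restored chain configuration should contain the secrets file.
ls ./chains/holesky_eigen_da/configs/secrets.yaml
```

If both checks pass, start the server as usual with `zk_inception server --chain holesky_eigen_da`.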