From e069a3389db324d82b0532f4fc2761b49796279d Mon Sep 17 00:00:00 2001 From: Hubert Date: Tue, 7 Nov 2023 15:58:09 +0100 Subject: [PATCH] Revert "[Feature] Mark-and-sweep GC. (#3504)" (#3682) --- .config/forest.dic | 2 - CHANGELOG.md | 4 - Cargo.lock | 29 +- Cargo.toml | 5 +- scripts/linters/find_unused_deps.rb | 2 +- scripts/tests/calibnet_other_check.sh | 5 + src/blocks/mod.rs | 1 + src/blocks/persistence.rs | 37 +++ src/cli/main.rs | 2 +- src/cli/subcommands/db_cmd.rs | 52 +++- src/cli_shared/cli/client.rs | 4 + src/cli_shared/cli/mod.rs | 10 +- src/daemon/main.rs | 2 + src/daemon/mod.rs | 55 ++-- src/db/gc/mod.rs | 433 -------------------------- src/db/memory.rs | 26 +- src/db/migration/migration_map.rs | 4 +- src/db/migration/mod.rs | 1 - src/db/migration/v0_14_0.rs | 192 ------------ src/db/mod.rs | 46 +-- src/db/parity_db.rs | 136 +------- src/db/rolling/gc.rs | 295 ++++++++++++++++++ src/db/rolling/impls.rs | 344 ++++++++++++++++++++ src/db/rolling/mod.rs | 52 ++++ src/ipld/util.rs | 106 ++++++- src/rpc/db_api.rs | 15 + src/rpc/mod.rs | 15 +- src/rpc/progress_api.rs | 24 ++ src/rpc/sync_api.rs | 2 + src/rpc_api/data_types.rs | 1 + src/rpc_api/mod.rs | 27 ++ src/rpc_client/db_ops.rs | 16 + src/rpc_client/mod.rs | 2 + src/tool/subcommands/benchmark_cmd.rs | 1 - src/utils/db/file_backed_obj.rs | 138 ++++++++ src/utils/db/mod.rs | 48 ++- src/utils/io/mod.rs | 2 + src/utils/io/progress_bar.rs | 122 ++++++++ src/utils/io/progress_log.rs | 22 +- 39 files changed, 1386 insertions(+), 894 deletions(-) create mode 100644 src/blocks/persistence.rs delete mode 100644 src/db/gc/mod.rs delete mode 100644 src/db/migration/v0_14_0.rs create mode 100644 src/db/rolling/gc.rs create mode 100644 src/db/rolling/impls.rs create mode 100644 src/db/rolling/mod.rs create mode 100644 src/rpc/db_api.rs create mode 100644 src/rpc/progress_api.rs create mode 100644 src/rpc_client/db_ops.rs create mode 100644 src/utils/db/file_backed_obj.rs create mode 100644 src/utils/io/progress_bar.rs diff --git a/.config/forest.dic b/.config/forest.dic index 2afdc5e61f3..1ad67edd46a 100644 --- a/.config/forest.dic +++ b/.config/forest.dic @@ -99,5 +99,3 @@ VRF WebAssembly WebSocket zstd -ParityDB -benchmark/GD diff --git a/CHANGELOG.md b/CHANGELOG.md index 6941472771c..46d6a32293c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,10 +31,6 @@ ### Changed -- [#3072](https://github.com/ChainSafe/forest/issues/3072) Implemented - mark-and-sweep GC, removing GC progress reports along with the corresponding - RPC endpoint. 
- ### Removed ### Fixed diff --git a/Cargo.lock b/Cargo.lock index 8013b937cad..68459037360 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3010,7 +3010,7 @@ checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" [[package]] name = "forest-filecoin" -version = "0.16.0" +version = "0.15.2" dependencies = [ "ahash", "anes", @@ -3125,6 +3125,7 @@ dependencies = [ "parity-db", "parking_lot", "pathfinding", + "pbr", "pin-project-lite", "positioned-io", "predicates", @@ -3135,7 +3136,6 @@ dependencies = [ "protobuf 3.3.0", "protobuf-codegen", "quickcheck", - "quickcheck_async", "quickcheck_macros", "ra_ap_syntax", "rand", @@ -3186,6 +3186,7 @@ dependencies = [ "tracing-subscriber", "unsigned-varint 0.7.2", "url", + "uuid", "walkdir", "zerocopy", "zstd", @@ -6139,6 +6140,17 @@ dependencies = [ "thiserror", ] +[[package]] +name = "pbr" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed5827dfa0d69b6c92493d6c38e633bbaa5937c153d0d7c28bf12313f8c6d514" +dependencies = [ + "crossbeam-channel", + "libc", + "winapi", +] + [[package]] name = "pem" version = "1.1.1" @@ -6761,16 +6773,6 @@ dependencies = [ "rand", ] -[[package]] -name = "quickcheck_async" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "247df671941313a4e255a5015772917368f1b21bfedfbd89d68fbb27e802b2fa" -dependencies = [ - "quote", - "syn 1.0.109", -] - [[package]] name = "quickcheck_macros" version = "1.0.0" @@ -9124,6 +9126,9 @@ name = "uuid" version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "88ad59a7560b41a70d191093a945f0b87bc1deeda46fb237479708a1d6b6cdfc" +dependencies = [ + "getrandom", +] [[package]] name = "valuable" diff --git a/Cargo.toml b/Cargo.toml index dfed1dc3b9d..192db24d7e5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "forest-filecoin" -version = "0.16.0" +version = "0.15.2" authors = ["ChainSafe Systems "] repository = "https://github.com/ChainSafe/forest" edition = "2021" @@ -129,6 +129,7 @@ once_cell = "1.15" parity-db = { version = "0.4.6", default-features = false } parking_lot = { version = "0.12", features = ["deadlock_detection"] } pathfinding = "4.3.1" +pbr = "1.1" pin-project-lite = "0.2" positioned-io = "0.3.2" pretty_assertions = "1.3.0" @@ -183,6 +184,7 @@ tracing-loki = { version = "0.2", default-features = false, features = ["compat- tracing-subscriber = { version = "0.3", features = ["env-filter"] } unsigned-varint = { version = "0.7", features = ["codec"] } url = { version = "2.3", features = ["serde"] } +uuid = { version = "1.3", features = ['v4'] } walkdir = "2" zerocopy = { version = "0.7.9", features = ["derive"] } zstd = "0.13" @@ -207,7 +209,6 @@ num-bigint = { version = "0.4", features = ['quickcheck'] } predicates = "3.0" proc-macro2 = { version = "1.0.68", default-features = false, features = ["span-locations"] } quickcheck = "1" -quickcheck_async = "0.1.1" quickcheck_macros = "1" ra_ap_syntax = "0.0.183" regex-automata = "0.4" diff --git a/scripts/linters/find_unused_deps.rb b/scripts/linters/find_unused_deps.rb index ed5a42fe226..c6f3faa2076 100644 --- a/scripts/linters/find_unused_deps.rb +++ b/scripts/linters/find_unused_deps.rb @@ -38,7 +38,7 @@ def excluded?(crates, crate) crates.each do |crate| pattern = get_pattern(crate) unless source_code.any? 
{ |line| line.match?(pattern) } || excluded?(crates, crate) - puts "Potentially unused: #{crate} in #{crate_dir}" + puts "Protentially unused: #{crate} in #{crate_dir}" exit_code = 1 end end diff --git a/scripts/tests/calibnet_other_check.sh b/scripts/tests/calibnet_other_check.sh index 3a21ed4770d..6282de62ca0 100755 --- a/scripts/tests/calibnet_other_check.sh +++ b/scripts/tests/calibnet_other_check.sh @@ -13,6 +13,11 @@ forest_init echo "Verifying the non calibnet snapshot (./test-snapshots/chain4.car) is being served properly." $FOREST_CLI_PATH chain read-obj -c bafy2bzacedjrqan2fwfvhfopi64yickki7miiksecglpeiavf7xueytnzevlu +echo "Running database garbage collection" +forest_check_db_stats +$FOREST_CLI_PATH db gc +forest_check_db_stats + echo "Testing js console" $FOREST_CLI_PATH attach --exec 'showPeers()' diff --git a/src/blocks/mod.rs b/src/blocks/mod.rs index d389223fc9d..21b9df28c57 100644 --- a/src/blocks/mod.rs +++ b/src/blocks/mod.rs @@ -6,6 +6,7 @@ pub mod election_proof; mod errors; pub mod gossip_block; pub mod header; +pub mod persistence; pub mod ticket; pub mod tipset; mod vrf_proof; diff --git a/src/blocks/persistence.rs b/src/blocks/persistence.rs new file mode 100644 index 00000000000..e811410a7a6 --- /dev/null +++ b/src/blocks/persistence.rs @@ -0,0 +1,37 @@ +// Copyright 2019-2023 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use crate::utils::{db::file_backed_obj::FileBackedObject, encoding::from_slice_with_fallback}; +use fvm_ipld_encoding::to_vec; + +use crate::blocks::*; + +impl FileBackedObject for TipsetKeys { + fn serialize(&self) -> anyhow::Result> { + Ok(to_vec(self)?) + } + + fn deserialize(bytes: &[u8]) -> anyhow::Result { + from_slice_with_fallback(bytes) + } +} + +#[cfg(test)] +mod tests { + use std::path::Path; + + use crate::utils::db::file_backed_obj::FileBacked; + + use super::*; + + #[test] + fn tipset_keys_round_trip() { + let path = Path::new("src/blocks/tests/calibnet/HEAD"); + let obj1: FileBacked = + FileBacked::load_from_file_or_create(path.into(), Default::default).unwrap(); + let serialized = obj1.inner().serialize().unwrap(); + let deserialized = TipsetKeys::deserialize(&serialized).unwrap(); + + assert_eq!(obj1.inner(), &deserialized); + } +} diff --git a/src/cli/main.rs b/src/cli/main.rs index 20c7670d5e0..0657f7d060f 100644 --- a/src/cli/main.rs +++ b/src/cli/main.rs @@ -43,7 +43,7 @@ where Subcommand::Config(cmd) => cmd.run(&mut std::io::stdout()), Subcommand::Send(cmd) => cmd.run(api).await, Subcommand::Info(cmd) => cmd.run(api).await, - Subcommand::DB(cmd) => cmd.run().await, + Subcommand::DB(cmd) => cmd.run(api).await, Subcommand::Snapshot(cmd) => cmd.run(api).await, Subcommand::Attach(cmd) => cmd.run(api), Subcommand::Shutdown(cmd) => cmd.run(api).await, diff --git a/src/cli/subcommands/db_cmd.rs b/src/cli/subcommands/db_cmd.rs index d939f738d77..537d3c29fc7 100644 --- a/src/cli/subcommands/db_cmd.rs +++ b/src/cli/subcommands/db_cmd.rs @@ -1,19 +1,63 @@ // Copyright 2019-2023 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT +use std::sync::Arc; + +use crate::rpc_api::progress_api::GetProgressType; +use crate::rpc_client::ApiInfo; +use crate::utils::io::ProgressBar; +use chrono::Utc; use clap::Subcommand; #[derive(Debug, Subcommand)] pub enum DBCommands { - // This is a noop as the manual GC is no longer available. 
- #[command(hide = true)] + /// Run DB garbage collection GC, } impl DBCommands { - pub async fn run(self) -> anyhow::Result<()> { + pub async fn run(self, api: ApiInfo) -> anyhow::Result<()> { match self { - Self::GC => anyhow::bail!("manual garbage collection has been deprecated"), + Self::GC => { + let start = Utc::now(); + + let bar = Arc::new(tokio::sync::Mutex::new({ + let bar = ProgressBar::new(0); + bar.message("Running database garbage collection | blocks "); + bar + })); + tokio::spawn({ + let bar = bar.clone(); + let api = api.clone(); + async move { + let mut interval = + tokio::time::interval(tokio::time::Duration::from_secs(1)); + loop { + interval.tick().await; + if let Ok((progress, total)) = api + .get_progress(GetProgressType::DatabaseGarbageCollection) + .await + { + let bar = bar.lock().await; + if bar.is_finish() { + break; + } + bar.set_total(total); + bar.set(progress); + } + } + } + }); + + api.db_gc().await?; + + bar.lock().await.finish_println(&format!( + "Database garbage collection completed. took {}s", + (Utc::now() - start).num_seconds() + )); + + Ok(()) + } } } } diff --git a/src/cli_shared/cli/client.rs b/src/cli_shared/cli/client.rs index 2595b871143..e5c3d6ba9f2 100644 --- a/src/cli_shared/cli/client.rs +++ b/src/cli_shared/cli/client.rs @@ -8,6 +8,7 @@ use std::{ }; use crate::rpc_client::DEFAULT_PORT; +use crate::utils::io::ProgressBarVisibility; use chrono::Duration; use directories::ProjectDirs; use serde::{Deserialize, Serialize}; @@ -71,6 +72,8 @@ pub struct Client { |g| Duration::milliseconds(i64::arbitrary(g)) )))] pub token_exp: Duration, + /// Display progress bars mode. Auto will display if TTY. + pub show_progress_bars: ProgressBarVisibility, /// Load actors from the bundle file (possibly generating it if it doesn't exist) pub load_actors: bool, } @@ -96,6 +99,7 @@ impl Default for Client { metrics_address: FromStr::from_str("0.0.0.0:6116").unwrap(), rpc_address: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), DEFAULT_PORT), token_exp: Duration::seconds(5184000), // 60 Days = 5184000 Seconds + show_progress_bars: Default::default(), load_actors: true, } } diff --git a/src/cli_shared/cli/mod.rs b/src/cli_shared/cli/mod.rs index cb2e3e5ac68..133c53a2fca 100644 --- a/src/cli_shared/cli/mod.rs +++ b/src/cli_shared/cli/mod.rs @@ -11,7 +11,7 @@ use std::{ use crate::networks::NetworkChain; use crate::utils::{ - io::{read_file_to_string, read_toml}, + io::{read_file_to_string, read_toml, ProgressBarVisibility}, misc::LoggingColor, }; use ahash::HashSet; @@ -118,6 +118,10 @@ pub struct CliOpts { /// Enable or disable colored logging in `stdout` #[arg(long, default_value = "auto")] pub color: LoggingColor, + /// Display progress bars mode [always, never, auto]. Auto will display if + /// TTY. 
+ #[arg(long)] + pub show_progress_bars: Option, /// Turn on tokio-console support for debugging #[arg(long)] pub tokio_console: bool, @@ -219,6 +223,10 @@ impl CliOpts { cfg.client.skip_load = skip_load; } + if let Some(show_progress_bars) = self.show_progress_bars { + cfg.client.show_progress_bars = show_progress_bars; + } + cfg.network.kademlia = self.kademlia.unwrap_or(cfg.network.kademlia); cfg.network.mdns = self.mdns.unwrap_or(cfg.network.mdns); if let Some(target_peer_count) = self.target_peer_count { diff --git a/src/daemon/main.rs b/src/daemon/main.rs index 9070e24967d..9192bbd8444 100644 --- a/src/daemon/main.rs +++ b/src/daemon/main.rs @@ -7,6 +7,7 @@ use crate::cli_shared::{ logger, }; use crate::daemon::ipc_shmem_conf; +use crate::utils::io::ProgressBar; use crate::utils::version::FOREST_VERSION_STRING; use anyhow::Context as _; use clap::Parser; @@ -88,6 +89,7 @@ where // subcommand. let (loki_task, _chrome_flush_guard) = logger::setup_logger(&opts); + ProgressBar::set_progress_bars_visibility(cfg.client.show_progress_bars); if let Some(path) = &path { match path { diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index 041d1e4b73b..2960ac607b7 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -17,8 +17,8 @@ use crate::cli_shared::{ use crate::daemon::db_util::{import_chain_as_forest_car, load_all_forest_cars}; use crate::db::car::ManyCar; -use crate::db::db_engine::{db_root, open_db}; -use crate::db::MarkAndSweep; +use crate::db::db_engine::{db_root, open_proxy_db}; +use crate::db::rolling::DbGarbageCollector; use crate::genesis::{get_network_name_from_genesis, read_genesis_header}; use crate::key_management::{ KeyStore, KeyStoreConfig, ENCRYPTED_KEYSTORE_NAME, FOREST_KEYSTORE_PHRASE_ENV, @@ -45,8 +45,7 @@ use once_cell::sync::Lazy; use raw_sync_2::events::{Event, EventInit as _, EventState}; use shared_memory::ShmemConf; use std::path::Path; -use std::time::Duration; -use std::{cell::RefCell, cmp, net::TcpListener, path::PathBuf, sync::Arc}; +use std::{cell::RefCell, net::TcpListener, path::PathBuf, sync::Arc}; use tempfile::{Builder, TempPath}; use tokio::{ signal::{ @@ -132,9 +131,6 @@ pub async fn start_interruptable(opts: CliOpts, config: Config) -> anyhow::Resul result } -// Garbage collection interval, currently set at 10 hours. 
-const GC_INTERVAL: Duration = Duration::from_secs(60 * 60 * 10); - /// Starts daemon process pub(super) async fn start( opts: CliOpts, @@ -176,8 +172,10 @@ pub(super) async fn start( } let db_root_dir = db_root(&chain_data_path)?; - let db_writer = Arc::new(open_db(db_root_dir.clone(), config.db_config().clone())?); - let db = Arc::new(ManyCar::new(db_writer.clone())); + let db = Arc::new(ManyCar::new(Arc::new(open_proxy_db( + db_root_dir.clone(), + config.db_config().clone(), + )?))); let forest_car_db_dir = db_root_dir.join("car_db"); load_all_forest_cars(&db, &forest_car_db_dir)?; @@ -231,25 +229,28 @@ pub(super) async fn start( genesis_header.clone(), )?); - if !opts.no_gc { - let mut db_garbage_collector = { - let chain_store = chain_store.clone(); - let depth = cmp::max( - chain_config.policy.chain_finality * 2, - config.sync.recent_state_roots, - ); - - let get_heaviest_tipset = Box::new(move || chain_store.heaviest_tipset()); + let db_garbage_collector = { + let db = db.clone(); + let chain_store = chain_store.clone(); + let get_tipset = move || chain_store.heaviest_tipset().as_ref().clone(); + Arc::new(DbGarbageCollector::new( + db, + chain_config.policy.chain_finality, + config.sync.recent_state_roots, + get_tipset, + )) + }; - MarkAndSweep::new( - db_writer, - get_heaviest_tipset, - depth, - Duration::from_secs(chain_config.block_delay_secs as u64), - ) - }; - services.spawn(async move { db_garbage_collector.gc_loop(GC_INTERVAL).await }); + if !opts.no_gc { + services.spawn({ + let db_garbage_collector = db_garbage_collector.clone(); + async move { db_garbage_collector.collect_loop_passive().await } + }); } + services.spawn({ + let db_garbage_collector = db_garbage_collector.clone(); + async move { db_garbage_collector.collect_loop_event().await } + }); let publisher = chain_store.publisher(); @@ -346,6 +347,7 @@ pub(super) async fn start( let rpc_state_manager = Arc::clone(&state_manager); let rpc_chain_store = Arc::clone(&chain_store); + let gc_event_tx = db_garbage_collector.get_tx(); services.spawn(async move { info!("JSON-RPC endpoint started at {}", config.client.rpc_address); let beacon = Arc::new( @@ -365,6 +367,7 @@ pub(super) async fn start( start_time, beacon, chain_store: rpc_chain_store, + gc_event_tx, }), rpc_listen, FOREST_VERSION_STRING.as_str(), diff --git a/src/db/gc/mod.rs b/src/db/gc/mod.rs deleted file mode 100644 index ec7f39586ab..00000000000 --- a/src/db/gc/mod.rs +++ /dev/null @@ -1,433 +0,0 @@ -// Copyright 2019-2023 ChainSafe Systems -// SPDX-License-Identifier: Apache-2.0, MIT - -//! -//! The current implementation of the garbage collector is `concurrent mark-and-sweep`. -//! -//! ## Terminology -//! `chain finality` - a number of epochs after which it becomes impossible to add or remove a block -//! previously appended to the blockchain. -//! -//! ## Design goals -//! A correct GC algorithm that is simple and efficient for forest scenarios. This algorithm removes -//! unreachable blocks that are older than `chain finality`, making sure to avoid removing something -//! that could later become reachable as a result of a fork. -//! -//! Properties: -//! -//! - No `BlockHeader` reachable from HEAD may be garbage collected. -//! - No data younger than `chain finality` epochs may be garbage collected. -//! - State-trees older than `depth` epochs should be garbage collected. -//! - Not all unreachable data has to be garbage collected. In other words, it's -//! acceptable for the garbage collector to be conservative. -//! 
- The garbage collector may not prevent access to the database. -//! -//! ## GC Algorithm -//! The `mark-and-sweep` algorithm was chosen due to it's simplicity, efficiency and low memory -//! footprint. -//! -//! ## GC Workflow -//! 1. Mark: traverse all the blocks, generating integer hash representations for each identifier -//! and storing those in a set. -//! 2. Wait at least `chain finality` blocks. -//! 3. Traverse reachable blocks starting at the current heaviest tipset and remove those from the -//! marked set, leaving only unreachable entries that are older than `chain finality`. -//! 4. Sweep, removing all the remaining marked entries from the database. -//! -//! ## Correctness -//! This algorithm considers all the blocks that are visited during the `snapshot export` task -//! reachable, making sure they are kept in the database after the run. It makes sure to retain the -//! reachable graph as well as all the blocks for at least `chain finality` to account for potential -//! forks. A snapshot can be used to bootstrap the node from scratch, thus the algorithm is -//! considered correct when a valid snapshot can be exported using records available in the database -//! after the run. -//! -//! ## Disk usage -//! The expected disk usage is slightly greater than the size of live data for three reasons: -//! 1. Unreachable data is not removed until it is at least 7.5 hours old (see `chain finality`). -//! 2. The garbage collector is conservative and is expected to leave a small (less than 1%) amount -//! of unreachable data behind. -//! 3. The blockstore back-end may be fragmented, therefore not relinquishing the disk space back to -//! the OS. -//! -//! ## Memory usage -//! During the `mark` and up to the `sweep` stage, the algorithm requires `4 bytes` of memory for -//! each database record. Additionally, the seen cache while traversing the reachable graph -//! executing the `filter` stage requires at least `32 bytes` of memory for each reachable block. -//! For a typical mainnet snapshot of about 100 GiB that adds up to roughly 2.5 GiB. -//! -//! ## Scheduling -//! 1. GC is triggered automatically and there have to be at least `chain finality` epochs stored -//! for the `mark` step. -//! 2. The `filter` step is triggered after at least `chain finality` has passed since the `mark` -//! step. -//! 3. Then, the `sweep` step happens. -//! 4. Finally, the algorithm waits for a configured amount of time to initiate the next run. -//! -//! ## Performance -//! The time complexity of mark and sweep steps is `O(n)`. The filter step is currently utilizing a -//! depth-first search algorithm, with `O(V+E)` complexity, where V is the number of vertices and E -//! is the number of edges. - -use crate::blocks::Tipset; -use crate::chain::ChainEpochDelta; - -use crate::db::{truncated_hash, GarbageCollectable}; -use crate::ipld::stream_graph; -use crate::shim::clock::ChainEpoch; -use ahash::{HashSet, HashSetExt}; -use futures::StreamExt; -use fvm_ipld_blockstore::Blockstore; -use std::mem; -use std::sync::Arc; -use std::time::Duration; -use tokio::time; -use tracing::info; - -/// [`MarkAndSweep`] is a simple garbage collector implementation that traverses all the database -/// keys writing them to a [`HashSet`], then filters out those that need to be kept and schedules -/// the rest for removal. -/// -/// Note: The GC does not know anything about the hybrid CAR-backed and ParityDB approach, only -/// taking care of the latter. 
-pub struct MarkAndSweep { - db: Arc, - get_heaviest_tipset: Box Arc + Send>, - marked: HashSet, - epoch_marked: ChainEpoch, - depth: ChainEpochDelta, - block_time: Duration, -} - -impl MarkAndSweep { - /// Creates a new mark-and-sweep garbage collector. - /// - /// # Arguments - /// - /// * `db` - A reference to the database instance. - /// * `get_heaviest_tipset` - A function that facilitates heaviest tipset retrieval. - /// * `depth` - The number of state-roots to retain. Should be at least `2 * chain finality`. - /// * `block_time` - An average block production time. - pub fn new( - db: Arc, - get_heaviest_tipset: Box Arc + Send>, - depth: ChainEpochDelta, - block_time: Duration, - ) -> Self { - Self { - db, - get_heaviest_tipset, - depth, - marked: HashSet::new(), - epoch_marked: 0, - block_time, - } - } - // Populate the initial set with all the available database keys. - fn populate(&mut self) -> anyhow::Result<()> { - self.marked = self.db.get_keys()?; - Ok(()) - } - - // Filter out the initial set, leaving only the entries that need to be removed. - // NOTE: One concern here is that this is going to consume a lot of CPU. - async fn filter(&mut self, tipset: Arc, depth: ChainEpochDelta) -> anyhow::Result<()> { - // NOTE: We want to keep all the block headers from genesis to heaviest tipset epoch. - let mut stream = stream_graph(&self.db, (*tipset).clone().chain(&self.db), depth); - - while let Some(block) = stream.next().await { - let block = block?; - self.marked.remove(&truncated_hash(block.cid.hash())); - } - - anyhow::Ok(()) - } - - // Remove marked keys from the database. - fn sweep(&mut self) -> anyhow::Result<()> { - let marked = mem::take(&mut self.marked); - self.db.remove_keys(marked) - } - - /// Starts the Garbage Collection loop. - /// - /// # Arguments - /// - /// * `interval` - GC Interval to avoid constantly consuming node's resources. - /// - /// NOTE: This currently does not take into account the fact that we might be starting the node - /// using CAR-backed storage with a snapshot, for implementation simplicity. - pub async fn gc_loop(&mut self, interval: Duration) -> anyhow::Result<()> { - loop { - self.gc_workflow(interval).await? - } - } - - // This function yields to the main GC loop if the conditions are not met for execution of the - // next step. - async fn gc_workflow(&mut self, interval: Duration) -> anyhow::Result<()> { - let depth = self.depth; - let tipset = (self.get_heaviest_tipset)(); - let current_epoch = tipset.epoch(); - // Don't run the GC if there aren't enough state-roots yet. Sleep and yield to the main loop - // in order to refresh the heaviest tipset value. - if depth > current_epoch { - time::sleep(interval).await; - return anyhow::Ok(()); - } - - // This signifies a new run. - if self.marked.is_empty() { - // Make sure we don't run the GC too often. - time::sleep(interval).await; - - info!("populate keys for GC"); - self.populate()?; - self.epoch_marked = current_epoch; - } - - let epochs_since_marked = current_epoch - self.epoch_marked; - // Don't proceed with next steps until we advance at least `depth` epochs. Sleep and yield - // to the main loop in order to refresh the heaviest tipset value. 
- if epochs_since_marked < depth { - time::sleep(self.block_time * (depth - epochs_since_marked) as u32).await; - return anyhow::Ok(()); - } - - info!("filter keys for GC"); - self.filter(tipset, depth).await?; - - info!("GC sweep"); - self.sweep()?; - - anyhow::Ok(()) - } -} -#[cfg(test)] -mod test { - use crate::blocks::{BlockHeader, Tipset}; - use crate::chain::{ChainEpochDelta, ChainStore}; - - use crate::db::{GarbageCollectable, MarkAndSweep, MemoryDB}; - use crate::message_pool::test_provider::{mock_block, mock_block_with_parents}; - use crate::networks::ChainConfig; - - use crate::utils::db::CborStoreExt; - - use core::time::Duration; - - use crate::shim::clock::ChainEpoch; - use fvm_ipld_blockstore::Blockstore; - use std::sync::Arc; - - const ZERO_DURATION: Duration = Duration::from_secs(0); - - fn insert_unreachable(db: impl Blockstore, quantity: u64) { - for idx in 0..quantity { - let block: BlockHeader = mock_block(1 + idx, 1 + quantity); - db.put_cbor_default(&block).unwrap(); - } - } - - fn run_to_epoch(db: impl Blockstore, cs: &ChainStore, epoch: ChainEpoch) { - let mut heaviest_tipset = cs.heaviest_tipset(); - - for _ in heaviest_tipset.epoch()..epoch { - let block2 = mock_block_with_parents(heaviest_tipset.as_ref(), 1, 1); - db.put_cbor_default(&block2).unwrap(); - - let tipset = Arc::new(Tipset::from(&block2)); - cs.set_heaviest_tipset(tipset).unwrap(); - heaviest_tipset = cs.heaviest_tipset(); - } - } - - struct GCTester { - db: Arc, - store: Arc>, - } - - impl GCTester { - fn new() -> Self { - let db = Arc::new(MemoryDB::default()); - let config = ChainConfig::default(); - let gen_block: BlockHeader = mock_block(1, 1); - db.put_cbor_default(&gen_block).unwrap(); - let store = Arc::new( - ChainStore::new(db.clone(), db.clone(), Arc::new(config), gen_block).unwrap(), - ); - - GCTester { db, store } - } - - fn run_epochs(&self, delta: ChainEpochDelta) { - let tipset = self.store.heaviest_tipset(); - let epoch = tipset.epoch() + delta; - run_to_epoch(&self.db, &self.store, epoch); - } - - fn insert_unreachable(&self, block_number: i64) { - insert_unreachable(&self.db, block_number as u64); - } - - fn get_heaviest_tipset_fn(&self) -> Box Arc + Send> { - let store = self.store.clone(); - Box::new(move || store.heaviest_tipset()) - } - } - - // This is a test that checks the `mark` step. - // 1. Generate the genesis block and write it to the database. - // 2. Try running the GC, encounter insufficient depth, check that there were no marked records. - // 3. Generate `depth` blocks. - // 4. Run the GC again to make sure it marked all the available records successfully. - #[quickcheck_async::tokio] - async fn test_populate(depth: u8) { - // Enforce depth above zero. - if depth < 1 { - return; - } - - let tester = GCTester::new(); - let depth = depth as ChainEpochDelta; - - let mut gc = MarkAndSweep::new( - tester.db.clone(), - tester.get_heaviest_tipset_fn(), - depth, - ZERO_DURATION, - ); - - // test insufficient epochs - gc.gc_workflow(ZERO_DURATION).await.unwrap(); - assert!(gc.marked.is_empty()); - - // test marked - tester.run_epochs(depth); - gc.gc_workflow(ZERO_DURATION).await.unwrap(); - assert_eq!(gc.marked.len(), 1 + depth as usize); - assert_eq!(gc.epoch_marked, depth); - } - - #[quickcheck_async::tokio] - async fn dont_gc_reachable_data(depth: u8, current_epoch: u8) { - // Enforce depth above zero. 
- if depth < 1 { - return; - } - - let depth = depth as ChainEpochDelta; - let current_epoch = current_epoch as ChainEpochDelta; - - let tester = GCTester::new(); - let mut gc = MarkAndSweep::new( - tester.db.clone(), - tester.get_heaviest_tipset_fn(), - depth, - ZERO_DURATION, - ); - - let depth = depth as ChainEpochDelta; - let current_epoch = current_epoch as ChainEpochDelta; - - // Make sure we run enough epochs to initiate GC. - tester.run_epochs(current_epoch); - tester.run_epochs(depth); - // Mark. - gc.gc_workflow(ZERO_DURATION).await.unwrap(); - tester.run_epochs(depth); - // Sweep. - gc.gc_workflow(ZERO_DURATION).await.unwrap(); - - // Make sure we don't clean anything up. - assert_eq!( - tester.db.get_keys().unwrap().len() as i64, - // `Current epoch + genesis block + twice the depth.` - current_epoch + 1 + depth * 2 - ); - } - - #[quickcheck_async::tokio] - async fn no_young_data_cleanups(depth: u8, current_epoch: u8, unreachable_nodes: u8) { - // Enforce depth above zero. - if depth < 1 { - return; - } - - let depth = depth as ChainEpochDelta; - let current_epoch = current_epoch as ChainEpochDelta; - let unreachable_nodes = unreachable_nodes as i64; - - let tester = GCTester::new(); - let mut gc = MarkAndSweep::new( - tester.db.clone(), - tester.get_heaviest_tipset_fn(), - depth, - ZERO_DURATION, - ); - - let depth = depth as ChainEpochDelta; - let current_epoch = current_epoch as ChainEpochDelta; - - // Make sure we run enough epochs to initiate GC. - tester.run_epochs(current_epoch); - tester.run_epochs(depth); - // Mark. - gc.gc_workflow(ZERO_DURATION).await.unwrap(); - tester.run_epochs(depth); - - // Insert unreachable nodes after the mark step. - tester.insert_unreachable(unreachable_nodes); - // Sweep. - gc.gc_workflow(ZERO_DURATION).await.unwrap(); - - // Make sure we don't clean anything up. - assert_eq!( - tester.db.get_keys().unwrap().len() as i64, - // `Current epoch + genesis block + twice the depth + unreachable nodes.` - current_epoch + 1 + depth * 2 + unreachable_nodes - ); - } - - #[quickcheck_async::tokio] - async fn unreachable_old_data_collected(depth: u8, current_epoch: u8, unreachable_nodes: u8) { - // Enforce depth above zero. - if depth < 1 { - return; - } - - let depth = depth as ChainEpochDelta; - let current_epoch = current_epoch as ChainEpochDelta; - let unreachable_nodes = unreachable_nodes as i64; - - let tester = GCTester::new(); - let mut gc = MarkAndSweep::new( - tester.db.clone(), - tester.get_heaviest_tipset_fn(), - depth, - ZERO_DURATION, - ); - - let depth = depth as ChainEpochDelta; - let current_epoch = current_epoch as ChainEpochDelta; - - // Make sure we run enough epochs to initiate GC. - tester.run_epochs(current_epoch); - tester.run_epochs(depth); - // Insert unreachable nodes before the mark step. - tester.insert_unreachable(unreachable_nodes); - // Mark. - gc.gc_workflow(ZERO_DURATION).await.unwrap(); - tester.run_epochs(depth); - - // Sweep. - gc.gc_workflow(ZERO_DURATION).await.unwrap(); - - // Make sure we clean up old unreachable data. 
- assert_eq!( - tester.db.get_keys().unwrap().len() as i64, - // `Current epoch + genesis block + twice the depth.` - current_epoch + 1 + depth * 2 - ); - } -} diff --git a/src/db/memory.rs b/src/db/memory.rs index 7bc86d7bff5..e856552cb07 100644 --- a/src/db/memory.rs +++ b/src/db/memory.rs @@ -1,9 +1,8 @@ // Copyright 2019-2023 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -use crate::db::{truncated_hash, GarbageCollectable}; use crate::libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite}; -use ahash::{HashMap, HashSet, HashSetExt}; +use ahash::HashMap; use cid::Cid; use fvm_ipld_blockstore::Blockstore; use itertools::Itertools; @@ -17,29 +16,6 @@ pub struct MemoryDB { settings_db: RwLock>>, } -impl GarbageCollectable for MemoryDB { - fn get_keys(&self) -> anyhow::Result> { - let mut set = HashSet::with_capacity(self.blockchain_db.read().len()); - for key in self.blockchain_db.read().keys() { - let cid = Cid::try_from(key.as_slice())?; - set.insert(truncated_hash(cid.hash())); - } - Ok(set) - } - - fn remove_keys(&self, keys: HashSet) -> anyhow::Result<()> { - let mut db = self.blockchain_db.write(); - db.retain(|key, _| { - let cid = Cid::try_from(key.as_slice()); - match cid { - Ok(cid) => !keys.contains(&truncated_hash(cid.hash())), - _ => true, - } - }); - Ok(()) - } -} - impl SettingsStore for MemoryDB { fn read_bin(&self, key: &str) -> anyhow::Result>> { Ok(self.settings_db.read().get(key).cloned()) diff --git a/src/db/migration/migration_map.rs b/src/db/migration/migration_map.rs index 309c0a8274e..932eb6e9af1 100644 --- a/src/db/migration/migration_map.rs +++ b/src/db/migration/migration_map.rs @@ -7,7 +7,6 @@ use std::{ sync::Arc, }; -use crate::db::migration::v0_14_0::Migration0_15_2_0_16_0; use anyhow::bail; use anyhow::Context as _; use itertools::Itertools; @@ -77,11 +76,10 @@ pub(super) static MIGRATIONS: Lazy = Lazy::new(|| { create_migrations!( "0.12.1" -> "0.13.0" @ Migration0_12_1_0_13_0, "0.13.0" -> "0.14.0" @ MigrationVoid, - "0.14.0" -> "0.14.1" @ MigrationVoid, + "0.14.0" -> "0.15.0" @ MigrationVoid, "0.14.1" -> "0.15.0" @ MigrationVoid, "0.15.0" -> "0.15.1" @ MigrationVoid, "0.15.1" -> "0.15.2" @ MigrationVoid, - "0.15.2" -> "0.16.0" @ Migration0_15_2_0_16_0, ); pub struct Migration { diff --git a/src/db/migration/mod.rs b/src/db/migration/mod.rs index 7dcb9bcb6ae..a88ee46351e 100644 --- a/src/db/migration/mod.rs +++ b/src/db/migration/mod.rs @@ -4,7 +4,6 @@ mod db_migration; mod migration_map; mod v0_12_1; -mod v0_14_0; mod void_migration; pub use db_migration::DbMigration; diff --git a/src/db/migration/v0_14_0.rs b/src/db/migration/v0_14_0.rs deleted file mode 100644 index 347c8fb2b70..00000000000 --- a/src/db/migration/v0_14_0.rs +++ /dev/null @@ -1,192 +0,0 @@ -// Copyright 2019-2023 ChainSafe Systems -// SPDX-License-Identifier: Apache-2.0, MIT - -//! Migration logic for 0.15.2 to 0.16.0 version. -//! We are getting rid of rolling db in favor of mark-and-sweep GC. Therefore the two databases -//! previously representing node state have to be merged into a new one and removed. 
- -use crate::db::db_engine::Db; -use crate::db::migration::migration_map::temporary_db_name; -use crate::db::migration::v0_14_0::paritydb_0_15_1::{DbColumn, ParityDb}; -use anyhow::Context; -use cid::multihash::Code::Blake2b256; -use cid::multihash::MultihashDigest; -use cid::Cid; -use fvm_ipld_encoding::DAG_CBOR; -use semver::Version; -use std::path::{Path, PathBuf}; -use strum::IntoEnumIterator; -use tracing::info; - -use super::migration_map::MigrationOperation; - -pub(super) struct Migration0_15_2_0_16_0 { - from: Version, - to: Version, -} - -/// Migrates the database from version 0.15.2 to 0.16.0 -/// This migration merges the two databases represented by rolling db into one. -impl MigrationOperation for Migration0_15_2_0_16_0 { - fn new(from: Version, to: Version) -> Self - where - Self: Sized, - { - Self { from, to } - } - - fn pre_checks(&self, _chain_data_path: &Path) -> anyhow::Result<()> { - Ok(()) - } - - fn migrate(&self, chain_data_path: &Path) -> anyhow::Result { - let source_db = chain_data_path.join(self.from.to_string()); - - let db_paths: Vec = source_db - .read_dir()? - .filter_map(|entry| Some(entry.ok()?.path())) - .filter(|entry| entry.is_dir()) - .collect(); - let temp_db_path = chain_data_path.join(temporary_db_name(&self.from, &self.to)); - if temp_db_path.exists() { - info!( - "removing old temporary database {temp_db_path}", - temp_db_path = temp_db_path.display() - ); - std::fs::remove_dir_all(&temp_db_path)?; - } - - // open the new database to migrate data from the old one. - let new_db = ParityDb::open(&temp_db_path)?; - - // because of the rolling db, we have to do the migration for each sub-database... - for sub_db in &db_paths { - info!("migrating RollingDB partition {:?}", sub_db); - let db = ParityDb::open(sub_db)?; - - for col in DbColumn::iter() { - info!("migrating column {}", col); - let mut res = anyhow::Ok(()); - if col == DbColumn::GraphDagCborBlake2b256 { - db.db.iter_column_while(col as u8, |val| { - let hash = Blake2b256.digest(&val.value); - let cid = Cid::new_v1(DAG_CBOR, hash); - res = new_db - .db - .commit_changes([Db::set_operation( - col as u8, - cid.to_bytes(), - val.value, - )]) - .context("failed to commit"); - - if res.is_err() { - return false; - } - - true - })?; - res?; - } else { - let mut iter = db.db.iter(col as u8)?; - while let Some((key, value)) = iter.next()? { - // We don't need this anymore as the old GC has been deprecated. 
- if key.eq(b"estimated_reachable_records") { - continue; - } - new_db - .db - .commit_changes([Db::set_operation(col as u8, key, value)]) - .context("failed to commit")?; - } - } - } - } - - drop(new_db); - Ok(temp_db_path) - } - - fn post_checks(&self, chain_data_path: &Path) -> anyhow::Result<()> { - let temp_db_name = temporary_db_name(&self.from, &self.to); - if !chain_data_path.join(&temp_db_name).exists() { - anyhow::bail!( - "migration database {} does not exist", - chain_data_path.join(temp_db_name).display() - ); - } - Ok(()) - } -} - -/// Database settings, Forest `v0.15.1` -mod paritydb_0_15_1 { - use crate::db; - use parity_db::{CompressionType, Db, Options}; - use std::path::PathBuf; - use strum::{Display, EnumIter, IntoEnumIterator}; - - #[derive(Copy, Clone, Debug, PartialEq, EnumIter, Display)] - #[repr(u8)] - pub(super) enum DbColumn { - GraphDagCborBlake2b256, - GraphFull, - Settings, - } - - impl DbColumn { - fn create_column_options(compression: CompressionType) -> Vec { - DbColumn::iter() - .map(|col| { - match col { - DbColumn::GraphDagCborBlake2b256 => parity_db::ColumnOptions { - preimage: true, - compression, - ..Default::default() - }, - DbColumn::GraphFull => parity_db::ColumnOptions { - preimage: true, - // This is needed for key retrieval. - btree_index: true, - compression, - ..Default::default() - }, - DbColumn::Settings => parity_db::ColumnOptions { - // explicitly disable preimage for settings column - // othewise we are not able to overwrite entries - preimage: false, - // This is needed for key retrieval. - btree_index: true, - compression, - ..Default::default() - }, - } - }) - .collect() - } - } - - pub(super) struct ParityDb {} - - impl ParityDb { - pub(super) fn to_options(path: PathBuf) -> Options { - Options { - path, - sync_wal: true, - sync_data: true, - stats: false, - salt: None, - columns: DbColumn::create_column_options(CompressionType::Lz4), - compression_threshold: [(0, 128)].into_iter().collect(), - } - } - - // Return latest ParityDB implementation here to avoid too much repetition. This will break - // if it changes and then this migration should either be maintained or removed. - pub(super) fn open(path: impl Into) -> anyhow::Result { - let opts = Self::to_options(path.into()); - let db = db::parity_db::ParityDb::wrap(Db::open_or_create(&opts)?, false); - Ok(db) - } - } -} diff --git a/src/db/mod.rs b/src/db/mod.rs index bd94eadfab4..6035d5d8d84 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -5,16 +5,11 @@ pub mod car; mod memory; pub mod parity_db; pub mod parity_db_config; - -mod gc; -pub use gc::MarkAndSweep; +pub mod rolling; pub use memory::MemoryDB; mod db_mode; pub mod migration; - -use ahash::HashSet; use anyhow::Context as _; -use cid::multihash; use serde::de::DeserializeOwned; use serde::Serialize; use std::sync::Arc; @@ -22,6 +17,8 @@ use std::sync::Arc; pub mod setting_keys { /// Key used to store the heaviest tipset in the settings store. This is expected to be a [`crate::blocks::TipsetKeys`] pub const HEAD_KEY: &str = "head"; + /// Estimated number of IPLD records in the database. This is expected to be a `usize` + pub const ESTIMATED_RECORDS_KEY: &str = "estimated_reachable_records"; /// Key used to store the memory pool configuration in the settings store. pub const MPOOL_CONFIG_KEY: &str = "/mpool/config"; } @@ -105,35 +102,11 @@ impl DBStatistics for std::sync::Arc { } } -/// A trait to facilitate mark-and-sweep garbage collection. 
-/// -/// NOTE: Since there is no real need for generics here right now - the 'key' type is specified to -/// avoid wrapping it. -pub trait GarbageCollectable { - /// Gets all the keys currently in the database. - /// - /// NOTE: This might need to be further enhanced with some sort of limit to avoid taking up too - /// much time and memory. - fn get_keys(&self) -> anyhow::Result>; - - /// Removes all the keys marked for deletion. - /// - /// # Arguments - /// - /// * `keys` - A set of keys to be removed from the database. - fn remove_keys(&self, keys: HashSet) -> anyhow::Result<()>; -} - -/// A function that converts a [`multihash::MultihashGeneric`] digest into a `u32` representation. -/// We don't care about collisions here as main use-case is garbage collection. -pub(crate) fn truncated_hash(hash: &multihash::MultihashGeneric) -> u32 { - let digest = hash.digest(); - u32::from_le_bytes(digest[0..4].try_into().expect("shouldn't fail")) -} - pub mod db_engine { use std::path::{Path, PathBuf}; + use crate::db::rolling::*; + use super::db_mode::choose_db; pub type Db = crate::db::parity_db::ParityDb; @@ -144,11 +117,14 @@ pub mod db_engine { choose_db(chain_data_root) } - pub fn open_db(path: PathBuf, config: DbConfig) -> anyhow::Result { - Db::open(path, &config).map_err(Into::into) + pub(in crate::db) fn open_db(path: &Path, config: &DbConfig) -> anyhow::Result { + Db::open(path, config).map_err(Into::into) } -} + pub fn open_proxy_db(db_root: PathBuf, db_config: DbConfig) -> anyhow::Result { + RollingDB::load_or_create(db_root, db_config) + } +} #[cfg(test)] mod tests { pub mod db_utils; diff --git a/src/db/parity_db.rs b/src/db/parity_db.rs index 994283ce697..1420a7f94ee 100644 --- a/src/db/parity_db.rs +++ b/src/db/parity_db.rs @@ -1,20 +1,16 @@ // Copyright 2019-2023 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -use ahash::{HashSet, HashSetExt}; use std::path::PathBuf; use super::SettingsStore; -use crate::db::{ - parity_db_config::ParityDbConfig, truncated_hash, DBStatistics, GarbageCollectable, -}; +use crate::db::{parity_db_config::ParityDbConfig, DBStatistics}; use crate::libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite}; use anyhow::{anyhow, Context as _}; use cid::multihash::Code::Blake2b256; -use cid::multihash::MultihashDigest; use cid::Cid; use fvm_ipld_blockstore::Blockstore; @@ -101,13 +97,6 @@ impl ParityDb { }) } - pub fn wrap(db: parity_db::Db, stats: bool) -> Self { - Self { - db, - statistics_enabled: stats, - } - } - /// Returns an appropriate column variant based on the information /// in the Cid. fn choose_column(cid: &Cid) -> DbColumn { @@ -265,87 +254,6 @@ impl DBStatistics for ParityDb { } } -type Op = (u8, Operation, Vec>); - -impl ParityDb { - /// Removes a record. - /// - /// # Arguments - /// * `key` - record identifier - pub fn dereference_operation(key: &Cid) -> Op { - let column = Self::choose_column(key); - (column as u8, Operation::Dereference(key.to_bytes())) - } - - /// Updates/inserts a record. - /// - /// # Arguments - /// * `column` - column identifier - /// * `key` - record identifier - /// * `value` - record contents - pub fn set_operation(column: u8, key: Vec, value: Vec) -> Op { - (column, Operation::Set(key, value)) - } -} - -impl GarbageCollectable for ParityDb { - fn get_keys(&self) -> anyhow::Result> { - let mut set = HashSet::new(); - - // First iterate over all of the indexed entries. - let mut iter = self.db.iter(DbColumn::GraphFull as u8)?; - while let Some((key, _)) = iter.next()? 
{ - let cid = Cid::try_from(key)?; - set.insert(truncated_hash(cid.hash())); - } - - self.db - .iter_column_while(DbColumn::GraphDagCborBlake2b256 as u8, |val| { - let hash = Blake2b256.digest(&val.value); - set.insert(truncated_hash(&hash)); - true - })?; - - Ok(set) - } - - fn remove_keys(&self, keys: HashSet) -> anyhow::Result<()> { - let mut iter = self.db.iter(DbColumn::GraphFull as u8)?; - while let Some((key, _)) = iter.next()? { - let cid = Cid::try_from(key)?; - - if keys.contains(&truncated_hash(cid.hash())) { - self.db - .commit_changes([Self::dereference_operation(&cid)]) - .context("error remove")? - } - } - - // An unfortunate consequence of having to use `iter_column_while`. - let mut result = Ok(()); - - self.db - .iter_column_while(DbColumn::GraphDagCborBlake2b256 as u8, |val| { - let hash = Blake2b256.digest(&val.value); - if keys.contains(&truncated_hash(&hash)) { - let cid = Cid::new_v1(DAG_CBOR, hash); - let res = self - .db - .commit_changes([Self::dereference_operation(&cid)]) - .context("error remove"); - - if res.is_err() { - result = res; - return false; - } - } - true - })?; - - result - } -} - #[cfg(test)] mod test { use cid::multihash::Code::Sha2_256; @@ -414,48 +322,6 @@ mod test { assert_eq!(b"bloop", actual.as_bytes()); } - #[test] - #[ignore] - // This needs to be reinstated once there is a reliable way to make sure that all the commits - // make it to the database and are visible when read through iterator. - // There seems to be a bug related to database reads. - // See https://github.com/paritytech/parity-db/issues/227. - fn garbage_collectable() { - let db = TempParityDB::new(); - let data = [ - b"h'nglui mglw'nafh".to_vec(), - b"Cthulhu".to_vec(), - b"R'lyeh wgah'nagl fhtagn!!".to_vec(), - ]; - let cids = [ - Cid::new_v1(DAG_CBOR, Blake2b256.digest(&data[0])), - Cid::new_v1(DAG_CBOR, Sha2_256.digest(&data[1])), - Cid::new_v1(IPLD_RAW, Blake2b256.digest(&data[1])), - ]; - - let cases = [ - (DbColumn::GraphDagCborBlake2b256, cids[0], &data[0]), - (DbColumn::GraphFull, cids[1], &data[1]), - (DbColumn::GraphFull, cids[2], &data[2]), - ]; - - for (_, cid, data) in cases { - db.put_keyed(&cid, data).unwrap(); - } - - let keys = db.get_keys().unwrap(); - - // This is flaky, because iterating columns does not give visibility guarantees for the - // latest commits. - assert_eq!(keys.len(), cases.len()); - - db.remove_keys(keys).unwrap(); - - // Panics on this line: https://github.com/paritytech/parity-db/blob/ec686930169b84d21336bed6d6f05c787a17d61f/src/file.rs#L130 - let keys = db.get_keys().unwrap(); - assert_eq!(keys.len(), 0); - } - #[test] fn choose_column_test() { let data = [0u8; 32]; diff --git a/src/db/rolling/gc.rs b/src/db/rolling/gc.rs new file mode 100644 index 00000000000..ef6b7113ef8 --- /dev/null +++ b/src/db/rolling/gc.rs @@ -0,0 +1,295 @@ +// Copyright 2019-2023 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +//! +//! The current implementation of the garbage collector is a concurrent, +//! semi-space one. +//! +//! ## Design goals +//! Implement a correct GC algorithm that is simple and efficient for forest +//! scenarios. +//! +//! ## GC algorithm +//! We chose the `semi-space` GC algorithm for simplicity and sufficiency +//! Besides `semi-space`, `mark-and-sweep` was also considered and evaluated. +//! However, it's not feasible because of the limitations of the underlying DB +//! we use, more specifically, limitations in iterating the DB and retrieving the original key. See +//! +//! ## GC workflow +//! 1. 
Walk back from the current heaviest tipset to the genesis block, collect +//! all the blocks that are reachable from the snapshot +//! 2. writes blocks that are absent from the `current` database to it +//! 3. delete `old` database(s) +//! 4. sets `current` database to a newly created one +//! +//! ## Correctness +//! This algorithm considers all blocks that are visited during the snapshot +//! export task reachable, and ensures they are all transferred and kept in the +//! current DB space. A snapshot can be used to bootstrap a node from +//! scratch thus the algorithm is considered appropriate when the post-GC +//! database contains blocks that are sufficient for exporting a snapshot +//! +//! ## Disk usage +//! During `walk_snapshot`, data from the `old` DB is duplicated in the +//! `current` DB, which uses extra disk space of up to 100% of the snapshot file +//! size +//! +//! ## Memory usage +//! During the data carry-over process, a memory buffer with a fixed capacity is +//! used to speed up the database write operation +//! +//! ## Scheduling +//! 1. GC is triggered automatically when total DB size is greater than `2x` of +//! the last reachable data size +//! 2. GC can be triggered manually by `forest-cli db gc` command +//! 3. There's a global GC lock to ensure at most one GC job is running +//! +//! ## Performance +//! GC performance is typically `1x-1.5x` of `snapshot export`, depending on +//! number of write operations to the `current` DB space. +//! +//! ### Look up performance +//! DB lookup performance is almost on-par between from single DB and two DBs. +//! Time cost of `forest-cli snapshot export --dry-run` on DO droplet with 16 +//! GiB RAM is between `9000s` to `11000s` for both scenarios, no significant +//! performance regression has been observed +//! +//! ### Write performance +//! DB write performance is typically on par with `snapshot import`. Note that +//! when the `current` DB space is very large, it tends to trigger DB re-index +//! more frequently, each DB re-index could pause the GC process for a few +//! minutes. The same behavior is observed during snapshot import as well. +//! +//! ### Sample mainnet log +//! ```text +//! 2023-03-16T19:50:40.323860Z INFO crate::db::rolling::gc: Garbage collection started at epoch 2689660 +//! 2023-03-16T22:27:36.484245Z INFO crate::db::rolling::gc: Garbage collection finished at epoch 2689660, took 9416s, reachable data size: 135.71GB +//! 2023-03-16T22:27:38.793717Z INFO crate::db::rolling::impls: Deleted database under /root/.local/share/forest/mainnet/paritydb/14d0f80992374fb8b20e3b1bd70d5d7b, size: 139.01GB +//! 
``` + +use crate::blocks::Tipset; +use crate::db::setting_keys::ESTIMATED_RECORDS_KEY; +use crate::db::SettingsStoreExt; +use crate::ipld::util::*; +use crate::utils::db::{BlockstoreBufferedWriteExt, DB_KEY_BYTES}; +use anyhow::Context as _; +use chrono::Utc; +use fvm_ipld_blockstore::Blockstore; +use human_repr::HumanCount; +use std::{ + sync::atomic::{self, AtomicU64, AtomicUsize}, + time::Duration, +}; +use tokio::sync::Mutex; + +use super::*; + +pub struct DbGarbageCollector +where + F: Fn() -> Tipset + Send + Sync + 'static, +{ + db: Arc>>, + get_tipset: F, + chain_finality: i64, + recent_state_roots: i64, + lock: Mutex<()>, + gc_tx: flume::Sender>>, + gc_rx: flume::Receiver>>, + last_reachable_bytes: AtomicU64, +} + +impl DbGarbageCollector +where + F: Fn() -> Tipset + Send + Sync + 'static, +{ + pub fn new( + db: Arc>>, + chain_finality: i64, + recent_state_roots: i64, + get_tipset: F, + ) -> Self { + let (gc_tx, gc_rx) = flume::unbounded(); + + Self { + db, + get_tipset, + chain_finality, + recent_state_roots, + lock: Default::default(), + gc_tx, + gc_rx, + last_reachable_bytes: AtomicU64::new(0), + } + } + + pub fn get_tx(&self) -> flume::Sender>> { + self.gc_tx.clone() + } + + /// This loop automatically triggers `collect_once` when the total DB size + /// is greater than `2x` of the last reachable data size + pub async fn collect_loop_passive(&self) -> anyhow::Result<()> { + info!("Running automatic database garbage collection task"); + loop { + // Check every 10 mins + tokio::time::sleep(Duration::from_secs(10 * 60)).await; + + // Bypass size checking during import + let tipset = (self.get_tipset)(); + if tipset.epoch() == 0 { + continue; + } + + // Bypass size checking when lock is held + { + let lock = self.lock.try_lock(); + if lock.is_err() { + continue; + } + } + + if let (Ok(total_size), Ok(current_size), last_reachable_bytes) = ( + self.db.writer().total_size_in_bytes(), + self.db.writer().current_size_in_bytes(), + self.last_reachable_bytes.load(atomic::Ordering::Relaxed), + ) { + let should_collect = if last_reachable_bytes > 0 { + total_size > (gc_trigger_factor() * last_reachable_bytes as f64) as _ + } else { + total_size > 0 && current_size * 3 > total_size + }; + + if should_collect { + if let Err(err) = self.collect_once().await { + warn!("Garbage collection failed: {err}"); + } + } + } + } + } + + /// This loop listens on events emitted by `forest-cli db gc` and triggers + /// `collect_once` + pub async fn collect_loop_event(self: &Arc) -> anyhow::Result<()> { + info!("Listening on database garbage collection events"); + while let Ok(responder) = self.gc_rx.recv_async().await { + let this = self.clone(); + tokio::spawn(async move { + let result = this.collect_once().await; + if let Err(e) = responder.send(result) { + warn!("{e}"); + } + }); + } + + Ok(()) + } + + /// ## GC workflow + /// 1. Walk back from the current heaviest tipset to the genesis block, + /// collect all the blocks that are reachable from the snapshot + /// 2. writes blocks that are absent from the `current` database to it + /// 3. delete `old` database(s) + /// 4. sets `current` database to a newly created one + /// + /// ## Data Safety + /// The blockchain consists of an immutable part (tipsets that are at least + /// 900 epochs older than the current head) and a mutable part (tipsets + /// that are within the most recent 900 epochs). 
Deleting data from the + /// mutable part of the chain can be problematic; therefore, we record the + /// exact epoch at which a new current database space is created, and only + /// perform garbage collection when this creation epoch has become + /// immutable (at least 900 epochs older than the current head), thus + /// the old database space that will be deleted at the end of garbage + /// collection only contains immutable or finalized part of the chain, + /// from which all block data that is marked as unreachable will not + /// become reachable because of the chain being mutated later. + async fn collect_once(&self) -> anyhow::Result<()> { + let tipset = (self.get_tipset)(); + + if self.db.writer().current_creation_epoch() + self.chain_finality >= tipset.epoch() { + anyhow::bail!("Cancelling GC: the old DB space contains unfinalized chain parts"); + } + + let guard = self.lock.try_lock(); + if guard.is_err() { + anyhow::bail!("Another garbage collection task is in progress."); + } + + let start = Utc::now(); + let reachable_bytes = Arc::new(AtomicUsize::new(0)); + + info!("Garbage collection started at epoch {}", tipset.epoch()); + let db = &self.db; + // 128MB + const BUFFER_CAPCITY_BYTES: usize = 128 * 1024 * 1024; + let (tx, rx) = flume::bounded(100); + let write_task = tokio::spawn({ + let db = db.writer().current(); + async move { db.buffered_write(rx, BUFFER_CAPCITY_BYTES).await } + }); + let estimated_reachable_records = self.db.writer().read_obj(ESTIMATED_RECORDS_KEY)?; + let n_records = walk_snapshot( + &tipset, + self.recent_state_roots, + |cid| { + let db = db.clone(); + let tx = tx.clone(); + let reachable_bytes = reachable_bytes.clone(); + async move { + let block = db + .get(&cid)? + .with_context(|| format!("Cid {cid} not found in blockstore"))?; + + let pair = (cid, block.clone()); + if db.writer().has(&cid)? { + reachable_bytes + .fetch_add(DB_KEY_BYTES + pair.1.len(), atomic::Ordering::Relaxed); + + if !db.writer().current().has(&cid)? 
{ + tx.send_async(pair).await?; + } + } + + Ok(block) + } + }, + Some("Running DB GC | blocks"), + Some(WALK_SNAPSHOT_PROGRESS_DB_GC.clone()), + estimated_reachable_records, + ) + .await?; + drop(tx); + + self.db + .writer() + .write_obj(ESTIMATED_RECORDS_KEY, &n_records)?; + + write_task.await??; + + let reachable_bytes = reachable_bytes.load(atomic::Ordering::Relaxed); + self.last_reachable_bytes + .store(reachable_bytes as _, atomic::Ordering::Relaxed); + info!( + "Garbage collection finished at epoch {}, took {}s, paritydb reachable data size: {}", + tipset.epoch(), + (Utc::now() - start).num_seconds(), + reachable_bytes.human_count_bytes(), + ); + + // Use the latest head here + self.db.writer().next_current((self.get_tipset)().epoch())?; + + Ok(()) + } +} + +fn gc_trigger_factor() -> f64 { + const DEFAULT_GC_TRIGGER_FACTOR: f64 = 2.0; + + if let Ok(factor) = std::env::var("FOREST_GC_TRIGGER_FACTOR") { + factor.parse().unwrap_or(DEFAULT_GC_TRIGGER_FACTOR) + } else { + DEFAULT_GC_TRIGGER_FACTOR + } +} diff --git a/src/db/rolling/impls.rs b/src/db/rolling/impls.rs new file mode 100644 index 00000000000..a71312a7b57 --- /dev/null +++ b/src/db/rolling/impls.rs @@ -0,0 +1,344 @@ +// Copyright 2019-2023 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use crate::libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite}; +use crate::utils::db::file_backed_obj::FileBackedObject; +use ahash::HashSet; +use cid::Cid; +use fvm_ipld_blockstore::Blockstore; +use human_repr::HumanCount; +use itertools::Itertools; +use parking_lot::RwLock; +use uuid::Uuid; + +use super::*; +use crate::db::*; + +impl Blockstore for RollingDB { + fn has(&self, k: &Cid) -> anyhow::Result { + for db in self.db_queue() { + if Blockstore::has(&db, k)? { + return Ok(true); + } + } + + Ok(false) + } + + fn get(&self, k: &Cid) -> anyhow::Result>> { + for db in self.db_queue() { + if let Some(v) = Blockstore::get(&db, k)? { + return Ok(Some(v)); + } + } + + Ok(None) + } + + fn put( + &self, + mh_code: cid::multihash::Code, + block: &fvm_ipld_blockstore::Block, + ) -> anyhow::Result + where + Self: Sized, + D: AsRef<[u8]>, + { + Blockstore::put(&self.current(), mh_code, block) + } + + fn put_many(&self, blocks: I) -> anyhow::Result<()> + where + Self: Sized, + D: AsRef<[u8]>, + I: IntoIterator)>, + { + Blockstore::put_many(&self.current(), blocks) + } + + fn put_many_keyed(&self, blocks: I) -> anyhow::Result<()> + where + Self: Sized, + D: AsRef<[u8]>, + I: IntoIterator, + { + Blockstore::put_many_keyed(&self.current(), blocks) + } + + fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> { + Blockstore::put_keyed(&self.current(), k, block) + } +} + +impl SettingsStore for RollingDB { + fn read_bin(&self, key: &str) -> anyhow::Result>> { + for db in self.db_queue() { + if let Some(v) = SettingsStore::read_bin(db.as_ref(), key)? { + return Ok(Some(v)); + } + } + + Ok(None) + } + + fn write_bin(&self, key: &str, value: &[u8]) -> anyhow::Result<()> { + SettingsStore::write_bin(self.current.read().as_ref(), key, value) + } + + fn exists(&self, key: &str) -> anyhow::Result { + for db in self.db_queue() { + if SettingsStore::exists(db.as_ref(), key)? 
{ + return Ok(true); + } + } + + Ok(false) + } + + fn setting_keys(&self) -> anyhow::Result> { + let mut set = HashSet::default(); + for db in self.db_queue() { + set.extend(SettingsStore::setting_keys(db.as_ref())?); + } + Ok(set.into_iter().collect_vec()) + } +} + +impl BitswapStoreRead for RollingDB { + fn contains(&self, cid: &Cid) -> anyhow::Result { + for db in self.db_queue() { + if BitswapStoreRead::contains(&db, cid)? { + return Ok(true); + } + } + + Ok(false) + } + + fn get(&self, cid: &Cid) -> anyhow::Result>> { + for db in self.db_queue() { + if let Some(v) = BitswapStoreRead::get(&db, cid)? { + return Ok(Some(v)); + } + } + + Ok(None) + } +} + +impl BitswapStoreReadWrite for RollingDB { + type Params = ::Params; + + fn insert(&self, block: &libipld::Block) -> anyhow::Result<()> { + BitswapStoreReadWrite::insert(self.current().as_ref(), block) + } +} + +impl DBStatistics for RollingDB { + fn get_statistics(&self) -> Option { + DBStatistics::get_statistics(self.current.read().as_ref()) + } +} + +impl FileBackedObject for DbIndex { + fn serialize(&self) -> anyhow::Result> { + Ok(serde_yaml::to_string(self)?.as_bytes().to_vec()) + } + + fn deserialize(bytes: &[u8]) -> anyhow::Result { + Ok(serde_yaml::from_slice(bytes)?) + } +} + +impl RollingDB { + pub fn load_or_create(db_root: PathBuf, db_config: DbConfig) -> anyhow::Result { + if !db_root.exists() { + std::fs::create_dir_all(db_root.as_path())?; + } + let (db_index, current, old) = load_dbs(&db_root, &db_config)?; + + Ok(Self { + db_root, + db_config, + db_index: RwLock::new(db_index), + current: RwLock::new(current.into()), + old: RwLock::new(old.into()), + }) + } + + /// Sets `current` as `old`, and sets a new DB as `current`, finally delete + /// the dangling `old` DB. + pub(super) fn next_current(&self, current_epoch: i64) -> anyhow::Result<()> { + let new_db_name = Uuid::new_v4().simple().to_string(); + info!("Setting {new_db_name} as current db"); + let db = open_db(&self.db_root.join(&new_db_name), &self.db_config)?; + *self.old.write() = std::mem::replace(&mut self.current.write(), db.into()); + + let mut db_index = self.db_index.write(); + let db_index_inner_mut = db_index.inner_mut(); + let old_db_path = self.db_root.join(&db_index_inner_mut.old); + db_index_inner_mut.old = db_index_inner_mut.current.clone(); + db_index_inner_mut.current = new_db_name; + db_index_inner_mut.current_creation_epoch = current_epoch; + db_index.sync()?; + + delete_db(&old_db_path); + + self.transfer_settings()?; + + Ok(()) + } + + pub(super) fn current_creation_epoch(&self) -> i64 { + self.db_index.read().inner().current_creation_epoch + } + + pub fn total_size_in_bytes(&self) -> anyhow::Result { + // Sum old and current in case forest CAR files are stored under DB root + Ok(self.current_size_in_bytes()? + self.old_size_in_bytes()?) + } + + pub fn old_size_in_bytes(&self) -> anyhow::Result { + Ok(fs_extra::dir::get_size( + self.db_root + .as_path() + .join(self.db_index.read().inner().old.as_str()), + )?) + } + + pub fn current_size_in_bytes(&self) -> anyhow::Result { + Ok(fs_extra::dir::get_size( + self.db_root + .as_path() + .join(self.db_index.read().inner().current.as_str()), + )?) + } + + pub fn current(&self) -> Arc { + self.current.read().clone() + } + + fn db_queue(&self) -> [Arc; 2] { + [self.current.read().clone(), self.old.read().clone()] + } + + fn transfer_settings(&self) -> anyhow::Result<()> { + let current = self.current.read(); + for key in self.setting_keys()? { + if !current.exists(&key)? 
{ + if let Some(v) = self.read_bin(&key)? { + current.write_bin(&key, &v)?; + } + } + } + + Ok(()) + } +} + +fn load_dbs(db_root: &Path, db_config: &DbConfig) -> anyhow::Result<(FileBacked, Db, Db)> { + let mut db_index = + FileBacked::load_from_file_or_create(db_root.join("db_index.yaml"), Default::default)?; + let db_index_mut: &mut DbIndex = db_index.inner_mut(); + if db_index_mut.current.is_empty() { + db_index_mut.current = Uuid::new_v4().simple().to_string(); + } + if db_index_mut.old.is_empty() { + db_index_mut.old = Uuid::new_v4().simple().to_string(); + } + let current = open_db(&db_root.join(&db_index_mut.current), db_config)?; + let old = open_db(&db_root.join(&db_index_mut.old), db_config)?; + db_index.sync()?; + Ok((db_index, current, old)) +} + +fn delete_db(db_path: &Path) { + let size = fs_extra::dir::get_size(db_path).unwrap_or_default(); + if let Err(err) = std::fs::remove_dir_all(db_path) { + warn!( + "Error deleting database under {}, size: {}. {err}", + db_path.display(), + size.human_count_bytes() + ); + } else { + info!( + "Deleted database under {}, size: {}", + db_path.display(), + size.human_count_bytes() + ); + } +} + +#[cfg(test)] +mod tests { + use std::{thread::sleep, time::Duration}; + + use cid::{multihash::MultihashDigest, Cid}; + use fvm_ipld_blockstore::Blockstore; + use pretty_assertions::assert_eq; + use rand::Rng; + use tempfile::TempDir; + + use super::*; + use crate::libp2p_bitswap::BitswapStoreRead; + + #[test] + fn rolling_db_behaviour_tests() { + let db_root = TempDir::new().unwrap(); + let rolling_db = + RollingDB::load_or_create(db_root.path().into(), Default::default()).unwrap(); + println!("Generating random blocks"); + let pairs: Vec<_> = (0..1000) + .map(|_| { + let mut bytes = [0; 1024]; + rand::rngs::OsRng.fill(&mut bytes); + let cid = + Cid::new_v0(cid::multihash::Code::Sha2_256.digest(bytes.as_slice())).unwrap(); + (cid, bytes.to_vec()) + }) + .collect(); + + let split_index = 500; + + for (i, (k, block)) in pairs.iter().enumerate() { + if i == split_index { + sleep(Duration::from_millis(1)); + println!("Creating a new current db"); + rolling_db.next_current(0).unwrap(); + println!("Created a new current db"); + } + rolling_db.put_keyed(k, block).unwrap(); + } + + for (i, (k, block)) in pairs.iter().enumerate() { + assert!(rolling_db.contains(k).unwrap(), "{i}"); + assert_eq!( + Blockstore::get(&rolling_db, k).unwrap().unwrap().as_slice(), + block, + "{i}" + ); + } + + rolling_db.next_current(0).unwrap(); + + for (i, (k, _)) in pairs.iter().enumerate() { + if i < split_index { + assert!(!rolling_db.contains(k).unwrap(), "{i}"); + } else { + assert!(rolling_db.contains(k).unwrap(), "{i}"); + } + } + + drop(rolling_db); + + let rolling_db = + RollingDB::load_or_create(db_root.path().into(), Default::default()).unwrap(); + for (i, (k, _)) in pairs.iter().enumerate() { + if i < split_index { + assert!(!rolling_db.contains(k).unwrap()); + } else { + assert!(rolling_db.contains(k).unwrap()); + } + } + } +} diff --git a/src/db/rolling/mod.rs b/src/db/rolling/mod.rs new file mode 100644 index 00000000000..da3cdda2afb --- /dev/null +++ b/src/db/rolling/mod.rs @@ -0,0 +1,52 @@ +// Copyright 2019-2023 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +//! The state of the Filecoin Blockchain is a persistent, directed acyclic +//! graph. Data in this graph is never mutated nor explicitly deleted but may +//! become unreachable over time. +//! +//! This module contains a concurrent, semi-space garbage collector. The garbage +//! 
collector is guaranteed to be non-blocking and can be expected to run with a
+//! fixed memory overhead and require disk space proportional to the size of the
+//! reachable graph. For example, if the size of the reachable graph is 100 GiB,
+//! expect this garbage collector to use `3x100 GiB = 300 GiB` of storage.
+
+mod gc;
+pub use gc::*;
+mod impls;
+
+use std::{
+    path::{Path, PathBuf},
+    sync::Arc,
+};
+
+use parking_lot::RwLock;
+use serde::{Deserialize, Serialize};
+use tracing::{info, warn};
+
+use super::car::ManyCar;
+use crate::db::db_engine::{open_db, Db, DbConfig};
+use crate::utils::db::file_backed_obj::FileBacked;
+
+/// This DB wrapper is specially designed to support the concurrent,
+/// semi-space GC algorithm implemented in [`DbGarbageCollector`]. It holds a
+/// reference to the `old` DB space and a reference to the `current` DB space.
+/// Both underlying key-value DBs are expected to contain only block data as
+/// values, keyed by their content-addressed CIDs.
+pub struct RollingDB {
+    db_root: PathBuf,
+    db_config: DbConfig,
+    db_index: RwLock<FileBacked<DbIndex>>,
+    /// The current writable DB
+    current: RwLock<Arc<Db>>,
+    /// The old writable DB
+    old: RwLock<Arc<Db>>,
+}
+
+#[derive(Debug, Default, Serialize, Deserialize)]
+struct DbIndex {
+    current: String,
+    #[serde(default = "Default::default")]
+    current_creation_epoch: i64,
+    old: String,
+}
diff --git a/src/ipld/util.rs b/src/ipld/util.rs
index 912efe1e6cd..6c59a6cc760 100644
--- a/src/ipld/util.rs
+++ b/src/ipld/util.rs
@@ -1,20 +1,31 @@
 // Copyright 2019-2023 ChainSafe Systems
 // SPDX-License-Identifier: Apache-2.0, MIT
-use std::{collections::VecDeque, future::Future, sync::Arc};
+use std::{
+    collections::VecDeque,
+    future::Future,
+    sync::{
+        atomic::{self, AtomicU64},
+        Arc,
+    },
+};
 use crate::cid_collections::CidHashSet;
 use crate::ipld::Ipld;
 use crate::shim::clock::ChainEpoch;
 use crate::utils::db::car_stream::CarBlock;
 use crate::utils::encoding::extract_cids;
-use crate::{blocks::Tipset, utils::encoding::from_slice_with_fallback};
+use crate::utils::io::progress_log::WithProgressRaw;
+use crate::{
+    blocks::{BlockHeader, Tipset},
+    utils::encoding::from_slice_with_fallback,
+};
 use anyhow::Context as _;
 use cid::Cid;
 use futures::Stream;
 use fvm_ipld_blockstore::Blockstore;
 use kanal::{Receiver, Sender};
-
+use once_cell::sync::Lazy;
 use parking_lot::Mutex;
 use pin_project_lite::pin_project;
 use std::pin::Pin;
@@ -100,6 +111,91 @@ where
     Ok(())
 }
 
+pub type ProgressBarCurrentTotalPair = Arc<(AtomicU64, AtomicU64)>;
+
+pub static WALK_SNAPSHOT_PROGRESS_DB_GC: Lazy<ProgressBarCurrentTotalPair> =
+    Lazy::new(Default::default);
+
+/// Walks over tipset and state data and loads all blocks not yet seen.
+/// This is tracked based on the callback function loading blocks.
+pub async fn walk_snapshot( + tipset: &Tipset, + recent_roots: i64, + mut load_block: F, + progress_bar_message: Option<&str>, + progress_tracker: Option, + estimated_total_records: Option, +) -> anyhow::Result +where + F: FnMut(Cid) -> T + Send, + T: Future>> + Send, +{ + let estimated_total_records = estimated_total_records.unwrap_or_default(); + let message = progress_bar_message.unwrap_or("Walking snapshot"); + #[allow(deprecated)] // Tracking issue: https://github.com/ChainSafe/forest/issues/3157 + let wp = WithProgressRaw::new(message, estimated_total_records); + + let mut seen = CidHashSet::default(); + let mut blocks_to_walk: VecDeque = tipset.cids().into(); + let mut current_min_height = tipset.epoch(); + let incl_roots_epoch = tipset.epoch() - recent_roots; + + let on_inserted = { + let wp = wp.clone(); + let progress_tracker = progress_tracker.clone(); + move |len: usize| { + let progress = len as u64; + let total = progress.max(estimated_total_records); + wp.set(progress); + wp.set_total(total); + if let Some(progress_tracker) = &progress_tracker { + progress_tracker + .0 + .store(progress, atomic::Ordering::Relaxed); + progress_tracker.1.store(total, atomic::Ordering::Relaxed); + } + } + }; + + while let Some(next) = blocks_to_walk.pop_front() { + if !seen.insert(next) { + continue; + }; + on_inserted(seen.len()); + + if !should_save_block_to_snapshot(next) { + continue; + } + + let data = load_block(next).await?; + let h = from_slice_with_fallback::(&data)?; + + if current_min_height > h.epoch() { + current_min_height = h.epoch(); + } + + if h.epoch() > incl_roots_epoch { + recurse_links_hash(&mut seen, *h.messages(), &mut load_block, &on_inserted).await?; + } + + if h.epoch() > 0 { + for p in h.parents().cids.clone() { + blocks_to_walk.push_back(p); + } + } else { + for p in h.parents().cids.clone() { + load_block(p).await?; + } + } + + if h.epoch() == 0 || h.epoch() > incl_roots_epoch { + recurse_links_hash(&mut seen, *h.state_root(), &mut load_block, &on_inserted).await?; + } + } + + Ok(seen.len()) +} + fn should_save_block_to_snapshot(cid: Cid) -> bool { // Don't include identity CIDs. // We only include raw and dagcbor, for now. @@ -372,10 +468,6 @@ impl UnorderedChainStream { Err(v) => v.lock().clone(), } } - - pub async fn join_workers(self) -> anyhow::Result<()> { - self.worker_handle.await? - } } /// Stream all blocks that are reachable before the `stateroot_limit` epoch in an unordered fashion. 
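The `walk_snapshot` function restored above is the workhorse of both snapshot export and the GC mark phase: starting from the tipset's block CIDs, it performs a breadth-first walk over the chain DAG, loads every block at most once thanks to the `seen` set, and only recurses into state roots newer than the `recent_roots` cutoff. The following sketch (plain std Rust with toy types invented for illustration; this is not Forest's actual API) shows just that seen-set traversal pattern:

use std::collections::{HashMap, HashSet, VecDeque};

// Toy stand-ins for CIDs and the blockstore; walk_snapshot works with real
// CIDs and an async `load_block` callback instead.
type NodeId = u64;

struct Store {
    // Each node maps to the list of nodes it links to.
    nodes: HashMap<NodeId, Vec<NodeId>>,
}

impl Store {
    fn load(&self, id: NodeId) -> Option<&Vec<NodeId>> {
        self.nodes.get(&id)
    }
}

// Walk everything reachable from `roots` and return how many nodes were seen.
// This mirrors walk_snapshot's bookkeeping: pop from a queue, skip nodes that
// were already visited, and enqueue the links of newly visited nodes.
fn walk(store: &Store, roots: &[NodeId]) -> usize {
    let mut seen: HashSet<NodeId> = HashSet::new();
    let mut queue: VecDeque<NodeId> = roots.iter().copied().collect();

    while let Some(next) = queue.pop_front() {
        // `insert` returns false for already-seen nodes; skipping them is what
        // keeps the walk linear in the size of the reachable graph.
        if !seen.insert(next) {
            continue;
        }
        if let Some(links) = store.load(next) {
            for link in links {
                queue.push_back(*link);
            }
        }
    }

    seen.len()
}

fn main() {
    // A small diamond-shaped DAG: 1 -> {2, 3}, 2 -> {4}, 3 -> {4}.
    let store = Store {
        nodes: HashMap::from([(1, vec![2, 3]), (2, vec![4]), (3, vec![4]), (4, vec![])]),
    };
    // Node 4 is reachable via two paths but is visited only once.
    assert_eq!(walk(&store, &[1]), 4);
}

The returned count is the analogue of `walk_snapshot` returning `seen.len()`, which the GC writes back under `ESTIMATED_RECORDS_KEY` so the next run can seed its progress estimate.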
diff --git a/src/rpc/db_api.rs b/src/rpc/db_api.rs new file mode 100644 index 00000000000..27ec52c795f --- /dev/null +++ b/src/rpc/db_api.rs @@ -0,0 +1,15 @@ +// Copyright 2019-2023 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use crate::rpc_api::data_types::RPCState; +use fvm_ipld_blockstore::Blockstore; +use jsonrpc_v2::{Data, Error as JsonRpcError}; + +pub(in crate::rpc) async fn db_gc( + data: Data>, +) -> Result<(), JsonRpcError> { + let (tx, rx) = flume::bounded(1); + data.gc_event_tx.send_async(tx).await?; + rx.recv_async().await??; + Ok(()) +} diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index d960a674368..4ca4125b811 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -5,10 +5,12 @@ mod auth_api; mod beacon_api; mod chain_api; mod common_api; +mod db_api; mod gas_api; mod mpool_api; mod net_api; mod node_api; +mod progress_api; mod rpc_http_handler; mod rpc_util; mod rpc_ws_handler; @@ -19,12 +21,13 @@ mod wallet_api; use std::{net::TcpListener, sync::Arc}; use crate::rpc_api::{ - auth_api::*, beacon_api::*, chain_api::*, common_api::*, data_types::RPCState, gas_api::*, - mpool_api::*, net_api::*, node_api::NODE_STATUS, state_api::*, sync_api::*, wallet_api::*, + auth_api::*, beacon_api::*, chain_api::*, common_api::*, data_types::RPCState, db_api::*, + gas_api::*, mpool_api::*, net_api::*, node_api::NODE_STATUS, progress_api::GET_PROGRESS, + state_api::*, sync_api::*, wallet_api::*, }; use axum::routing::{get, post}; use fvm_ipld_blockstore::Blockstore; -use jsonrpc_v2::{Data, Error as JSONRPCError, Server}; +use jsonrpc_v2::{Data, Error as JSONRPCError, Params, Server}; use tokio::sync::mpsc::Sender; use tracing::info; @@ -36,6 +39,8 @@ use crate::rpc::{ state_api::*, }; +pub type RpcResult = Result; + pub async fn start_rpc( state: Arc>, rpc_endpoint: TcpListener, @@ -127,6 +132,10 @@ where .with_method(NET_INFO, net_api::net_info::) .with_method(NET_CONNECT, net_api::net_connect::) .with_method(NET_DISCONNECT, net_api::net_disconnect::) + // DB API + .with_method(DB_GC, db_api::db_gc::) + // Progress API + .with_method(GET_PROGRESS, progress_api::get_progress) // Node API .with_method(NODE_STATUS, node_api::node_status::) .finish_unwrapped(), diff --git a/src/rpc/progress_api.rs b/src/rpc/progress_api.rs new file mode 100644 index 00000000000..f4c1a4db0a7 --- /dev/null +++ b/src/rpc/progress_api.rs @@ -0,0 +1,24 @@ +// Copyright 2019-2023 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +#![allow(clippy::unused_async)] + +use std::sync::atomic; + +use crate::ipld::{ProgressBarCurrentTotalPair, WALK_SNAPSHOT_PROGRESS_DB_GC}; +use crate::rpc_api::progress_api::{GetProgressParams, GetProgressResult, GetProgressType}; + +use crate::rpc::*; + +pub(in crate::rpc) async fn get_progress( + Params((typ,)): Params, +) -> RpcResult { + let tracker: &ProgressBarCurrentTotalPair = match typ { + GetProgressType::DatabaseGarbageCollection => &WALK_SNAPSHOT_PROGRESS_DB_GC, + }; + + Ok(( + tracker.0.load(atomic::Ordering::Relaxed), + tracker.1.load(atomic::Ordering::Relaxed), + )) +} diff --git a/src/rpc/sync_api.rs b/src/rpc/sync_api.rs index e8f3765aa66..bb054446909 100644 --- a/src/rpc/sync_api.rs +++ b/src/rpc/sync_api.rs @@ -120,6 +120,7 @@ mod tests { .unwrap() }; let start_time = chrono::Utc::now(); + let (gc_event_tx, _) = flume::unbounded(); let state = Arc::new(RPCState { state_manager, @@ -132,6 +133,7 @@ mod tests { start_time, chain_store: cs_for_chain.clone(), beacon, + gc_event_tx, }); (state, network_rx) } diff --git 
a/src/rpc_api/data_types.rs b/src/rpc_api/data_types.rs index 117efef12ed..a374c38c7f0 100644 --- a/src/rpc_api/data_types.rs +++ b/src/rpc_api/data_types.rs @@ -58,6 +58,7 @@ where pub network_name: String, pub start_time: chrono::DateTime, pub beacon: Arc, + pub gc_event_tx: flume::Sender>>, } #[derive(Debug, Serialize, Deserialize)] diff --git a/src/rpc_api/mod.rs b/src/rpc_api/mod.rs index b44b30cf28b..cd0d667b08b 100644 --- a/src/rpc_api/mod.rs +++ b/src/rpc_api/mod.rs @@ -103,6 +103,11 @@ pub static ACCESS_MAP: Lazy> = Lazy::new(|| { access.insert(net_api::NET_CONNECT, Access::Write); access.insert(net_api::NET_DISCONNECT, Access::Write); + // DB API + access.insert(db_api::DB_GC, Access::Write); + + // Progress API + access.insert(progress_api::GET_PROGRESS, Access::Read); // Node API access.insert(node_api::NODE_STATUS, Access::Read); @@ -294,6 +299,28 @@ pub mod net_api { pub const NET_DISCONNECT: &str = "Filecoin.NetDisconnect"; } +/// DB API +pub mod db_api { + pub const DB_GC: &str = "Filecoin.DatabaseGarbageCollection"; +} + +/// Progress API +pub mod progress_api { + use crate::lotus_json::lotus_json_with_self; + use serde::{Deserialize, Serialize}; + + pub const GET_PROGRESS: &str = "Filecoin.GetProgress"; + pub type GetProgressParams = (GetProgressType,); + pub type GetProgressResult = (u64, u64); + + #[derive(Serialize, Deserialize)] + pub enum GetProgressType { + DatabaseGarbageCollection, + } + + lotus_json_with_self!(GetProgressType); +} + /// Node API pub mod node_api { pub const NODE_STATUS: &str = "Filecoin.NodeStatus"; diff --git a/src/rpc_client/db_ops.rs b/src/rpc_client/db_ops.rs new file mode 100644 index 00000000000..850e75fb8fe --- /dev/null +++ b/src/rpc_client/db_ops.rs @@ -0,0 +1,16 @@ +// Copyright 2019-2023 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use crate::rpc_api::db_api::*; + +use super::{ApiInfo, JsonRpcError, RpcRequest}; + +impl ApiInfo { + pub async fn db_gc(&self) -> Result<(), JsonRpcError> { + self.call(Self::db_gc_req()).await + } + + pub fn db_gc_req() -> RpcRequest<()> { + RpcRequest::new(DB_GC, ()) + } +} diff --git a/src/rpc_client/mod.rs b/src/rpc_client/mod.rs index 586856d3439..bd1cdbc9cad 100644 --- a/src/rpc_client/mod.rs +++ b/src/rpc_client/mod.rs @@ -4,9 +4,11 @@ pub mod auth_ops; pub mod chain_ops; pub mod common_ops; +pub mod db_ops; pub mod mpool_ops; pub mod net_ops; pub mod node_ops; +pub mod progress_ops; pub mod state_ops; pub mod sync_ops; pub mod wallet_ops; diff --git a/src/tool/subcommands/benchmark_cmd.rs b/src/tool/subcommands/benchmark_cmd.rs index c3358ab1faa..d28c3e0be88 100644 --- a/src/tool/subcommands/benchmark_cmd.rs +++ b/src/tool/subcommands/benchmark_cmd.rs @@ -184,7 +184,6 @@ async fn benchmark_unordered_graph_traversal(input: Vec) -> anyhow::Res while let Some(block) = s.try_next().await? { sink.write_all(&block.data).await? 
} - s.join_workers().await?; Ok(()) } diff --git a/src/utils/db/file_backed_obj.rs b/src/utils/db/file_backed_obj.rs new file mode 100644 index 00000000000..2e58115888b --- /dev/null +++ b/src/utils/db/file_backed_obj.rs @@ -0,0 +1,138 @@ +// Copyright 2019-2023 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use std::{path::PathBuf, str::FromStr}; + +use ahash::HashSet; +use cid::Cid; +use tracing::warn; + +pub struct FileBacked { + inner: T, + path: PathBuf, +} + +impl FileBacked { + /// Gets a borrow of the inner object + pub fn inner(&self) -> &T { + &self.inner + } + + /// Gets a mutable borrow of the inner object + pub fn inner_mut(&mut self) -> &mut T { + &mut self.inner + } + + /// Loads an object from a file and creates a new instance + pub fn load_from_file_or_create T>( + path: PathBuf, + create: F, + ) -> anyhow::Result { + let mut need_sync = false; + let obj = if path.is_file() { + let bytes = std::fs::read(path.as_path())?; + Self { + inner: T::deserialize(&bytes) + .map_err(|e| { + warn!("Error loading object from {}", path.display()); + need_sync = true; + e + }) + .unwrap_or_else(|_| create()), + path, + } + } else { + need_sync = true; + Self { + inner: create(), + path, + } + }; + + if need_sync { + obj.sync()?; + } + + Ok(obj) + } + + /// Syncs the object to the file + pub fn sync(&self) -> anyhow::Result<()> { + let bytes = self.inner().serialize()?; + Ok(std::fs::write(&self.path, bytes)?) + } +} + +/// An object that is backed by a single file on disk +pub trait FileBackedObject: Sized { + /// Serializes into a byte array + fn serialize(&self) -> anyhow::Result>; + + /// De-serializes from a byte array + fn deserialize(bytes: &[u8]) -> anyhow::Result; +} + +impl FileBackedObject for Cid { + fn serialize(&self) -> anyhow::Result> { + Ok(self.to_string().into_bytes()) + } + + fn deserialize(bytes: &[u8]) -> anyhow::Result { + Ok(Cid::from_str(String::from_utf8_lossy(bytes).trim())?) + } +} + +impl FileBackedObject for HashSet { + fn serialize(&self) -> anyhow::Result> { + let serialized = serde_json::to_string(&self)?; + Ok(serialized.into_bytes()) + } + + fn deserialize(bytes: &[u8]) -> anyhow::Result { + let result = serde_json::from_str(String::from_utf8_lossy(bytes).trim()); + Ok(result?) 
+ } +} + +#[derive(Default, serde::Serialize, serde::Deserialize)] +pub struct ChainMeta { + pub estimated_reachable_records: usize, +} + +impl FileBackedObject for ChainMeta { + fn serialize(&self) -> anyhow::Result> { + let serialized = serde_yaml::to_string(&self)?; + Ok(serialized.into_bytes()) + } + + fn deserialize(bytes: &[u8]) -> anyhow::Result { + let result = serde_yaml::from_str(String::from_utf8_lossy(bytes).trim())?; + Ok(result) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use cid::multihash::{self, MultihashDigest}; + use rand::Rng; + use tempfile::TempDir; + + #[test] + fn cid_round_trip() { + let mut bytes = [0; 1024]; + rand::rngs::OsRng.fill(&mut bytes); + let cid = Cid::new_v0(multihash::Code::Sha2_256.digest(bytes.as_slice())).unwrap(); + let serialized = cid.serialize().unwrap(); + let deserialized = Cid::deserialize(&serialized).unwrap(); + assert_eq!(cid, deserialized); + + let dir = TempDir::new().unwrap(); + let file_path = dir.path().join("CID"); + let obj1: FileBacked = + FileBacked::load_from_file_or_create(file_path.clone(), || cid).unwrap(); + let obj2: FileBacked = + FileBacked::load_from_file_or_create(file_path, Default::default).unwrap(); + assert_eq!(obj1.inner(), obj2.inner()); + } +} diff --git a/src/utils/db/mod.rs b/src/utils/db/mod.rs index 97005843ac6..ff41c151b89 100644 --- a/src/utils/db/mod.rs +++ b/src/utils/db/mod.rs @@ -4,7 +4,10 @@ pub mod car_index; pub mod car_stream; pub mod car_util; +pub mod file_backed_obj; +use async_trait::async_trait; +use chrono::Utc; use cid::{ multihash::{Code, MultihashDigest}, Cid, @@ -12,9 +15,15 @@ use cid::{ use fvm_ipld_blockstore::Blockstore; use fvm_ipld_encoding::CborStore; use fvm_ipld_encoding::{to_vec, DAG_CBOR}; - +use human_repr::HumanCount; use serde::ser::Serialize; +use tracing::info; +/// DB key size in bytes for estimating reachable data size. Use parity-db value +/// for simplicity. 
The actual value for other underlying DB might be slightly +/// different but that is negligible for calculating the total reachable data +/// size +pub const DB_KEY_BYTES: usize = 32; /// Extension methods for inserting and retrieving IPLD data with CIDs pub trait BlockstoreExt: Blockstore { /// Batch put CBOR objects into block store and returns vector of CIDs @@ -65,3 +74,40 @@ pub trait CborStoreExt: CborStore { } impl CborStoreExt for T {} + +/// Extension methods for buffered write with manageable limit of RAM usage +#[async_trait] +pub trait BlockstoreBufferedWriteExt: Blockstore + Sized { + async fn buffered_write( + &self, + rx: flume::Receiver<(Cid, Vec)>, + buffer_capacity_bytes: usize, + ) -> anyhow::Result<()> { + let start = Utc::now(); + let mut total_bytes = 0; + let mut total_entries = 0; + let mut estimated_buffer_bytes = 0; + let mut buffer = vec![]; + while let Ok((key, value)) = rx.recv_async().await { + // Key is stored in 32 bytes in paritydb + estimated_buffer_bytes += DB_KEY_BYTES + value.len(); + total_bytes += DB_KEY_BYTES + value.len(); + total_entries += 1; + buffer.push((key, value)); + if estimated_buffer_bytes >= buffer_capacity_bytes { + self.put_many_keyed(std::mem::take(&mut buffer))?; + estimated_buffer_bytes = 0; + } + } + self.put_many_keyed(buffer)?; + info!( + "Buffered write completed: total entries: {total_entries}, total size: {}, took: {}s", + total_bytes.human_count_bytes(), + (Utc::now() - start).num_seconds() + ); + + Ok(()) + } +} + +impl BlockstoreBufferedWriteExt for T {} diff --git a/src/utils/io/mod.rs b/src/utils/io/mod.rs index 5322ba9c4a7..8953683b6c0 100644 --- a/src/utils/io/mod.rs +++ b/src/utils/io/mod.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0, MIT mod mmap; +pub mod progress_bar; pub mod progress_log; mod tempfile; mod writer_checksum; @@ -13,6 +14,7 @@ use std::{ }; pub use mmap::{EitherMmapOrRandomAccessFile, Mmap}; +pub use progress_bar::{ProgressBar, ProgressBarVisibility}; pub use progress_log::{WithProgress, WithProgressRaw}; pub use writer_checksum::*; diff --git a/src/utils/io/progress_bar.rs b/src/utils/io/progress_bar.rs new file mode 100644 index 00000000000..a8f9823bfc7 --- /dev/null +++ b/src/utils/io/progress_bar.rs @@ -0,0 +1,122 @@ +// Copyright 2019-2023 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT +// JANK(aatifsyed): I don't really understand why this module exists, a lot of +// the code looks wrong +use std::{io::Stdout, str::FromStr, sync::Arc}; + +use is_terminal::IsTerminal; +use parking_lot::{Mutex, RwLock}; +pub use pbr::Units; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +#[derive(Default)] +#[cfg_attr(test, derive(derive_quickcheck_arbitrary::Arbitrary))] +pub enum ProgressBarVisibility { + Always, + #[default] + Auto, + Never, +} + +impl ProgressBarVisibility { + /// Checks if stdout is a TTY + pub fn should_display(&self) -> bool { + matches!( + self, + ProgressBarVisibility::Always + | ProgressBarVisibility::Auto if std::io::stdout().is_terminal() + ) + } +} + +impl FromStr for ProgressBarVisibility { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + match s.to_lowercase().as_str() { + "auto" => Ok(ProgressBarVisibility::Auto), + "always" => Ok(ProgressBarVisibility::Always), + "never" => Ok(ProgressBarVisibility::Never), + _ => Err(Self::Err::msg( + "Invalid progress bar visibility option. 
Must be one of [auto, always, never]", + )), + } + } +} + +static PROGRESS_BAR_VISIBILITY: RwLock = + RwLock::new(ProgressBarVisibility::Auto); + +/// Progress bar wrapper, allows suppressing progress bars. +#[derive(Clone)] +pub struct ProgressBar { + inner: Arc>>, + display: bool, +} + +impl ProgressBar { + pub fn new(size: u64) -> Self { + Self { + inner: Arc::new(Mutex::new(pbr::ProgressBar::new(size))), + display: Self::should_display(), + } + } + + pub fn message(&self, message: &str) { + if self.display { + self.inner.lock().message(message); + } + } + + pub fn set_total(&self, i: u64) { + if self.display { + self.inner.lock().total = i; + } + } + + pub fn set(&self, i: u64) -> u64 { + if self.display { + self.inner.lock().set(i) + } else { + 0 + } + } + + pub fn is_finish(&self) -> bool { + self.inner.lock().is_finish + } + + pub fn finish(&self) { + if self.display { + self.inner.lock().finish(); + } + } + + pub fn finish_println(&self, s: &str) { + if self.display { + self.inner.lock().finish_println(s); + } + } + + /// Sets the visibility of progress bars (globally). + pub fn set_progress_bars_visibility(visibility: ProgressBarVisibility) { + *PROGRESS_BAR_VISIBILITY.write() = visibility; + } + + /// Checks the global variable if progress bar should be shown. + fn should_display() -> bool { + match *PROGRESS_BAR_VISIBILITY.read() { + ProgressBarVisibility::Always => true, + ProgressBarVisibility::Auto => std::io::stdout().is_terminal(), + ProgressBarVisibility::Never => false, + } + } +} + +impl Drop for ProgressBar { + fn drop(&mut self) { + self.finish() + } +} diff --git a/src/utils/io/progress_log.rs b/src/utils/io/progress_log.rs index 4ddb928644b..348fc91b12d 100644 --- a/src/utils/io/progress_log.rs +++ b/src/utils/io/progress_log.rs @@ -76,10 +76,10 @@ impl tokio::io::AsyncRead for WithProgress { } impl WithProgress { - pub fn wrap_async_read(message: &str, read: S, _total_items: u64) -> WithProgress { + pub fn wrap_async_read(message: &str, read: S, total_items: u64) -> WithProgress { WithProgress { inner: read, - progress: Progress::new(message), + progress: Progress::new(message, total_items), } } } @@ -87,16 +87,18 @@ impl WithProgress { #[derive(Debug, Clone)] struct Progress { completed_items: u64, + total_items: u64, start: Instant, last_logged: Instant, message: String, } impl Progress { - fn new(message: &str) -> Self { + fn new(message: &str, total_items: u64) -> Self { let now = Instant::now(); Self { completed_items: 0, + total_items, start: now, last_logged: now, message: message.into(), @@ -115,6 +117,12 @@ impl Progress { self.emit_log_if_required(); } + fn set_total(&mut self, value: u64) { + self.total_items = value; + + self.emit_log_if_required(); + } + fn emit_log_if_required(&mut self) { let now = Instant::now(); if (now - self.last_logged) > UPDATE_FREQUENCY { @@ -138,11 +146,11 @@ pub struct WithProgressRaw { impl WithProgressRaw { #[deprecated] - pub fn new(message: &str, _total_items: u64) -> Self { + pub fn new(message: &str, total_items: u64) -> Self { WithProgressRaw { sync: Arc::new(Mutex::new(WithProgress { inner: (), - progress: Progress::new(message), + progress: Progress::new(message, total_items), })), } } @@ -150,4 +158,8 @@ impl WithProgressRaw { pub fn set(&self, value: u64) { self.sync.lock().progress.set(value); } + + pub fn set_total(&self, value: u64) { + self.sync.lock().progress.set_total(value); + } }
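For reference, the `progress_log.rs` hunk above reintroduces a `total_items` count in `Progress` so the periodic log line can report completion against a total that may still grow while walking (the GC path updates it through `set_total` as its estimate improves). A minimal sketch of that throttled-logging pattern follows (plain std Rust; the `UPDATE_FREQUENCY` value and the `println!` output are invented stand-ins for the crate's actual constant and `tracing` call):

use std::time::{Duration, Instant};

// Invented for this sketch; the real constant lives in progress_log.rs.
const UPDATE_FREQUENCY: Duration = Duration::from_millis(500);

struct Progress {
    message: String,
    completed_items: u64,
    total_items: u64,
    last_logged: Instant,
}

impl Progress {
    fn new(message: &str, total_items: u64) -> Self {
        Self {
            message: message.to_owned(),
            completed_items: 0,
            total_items,
            last_logged: Instant::now(),
        }
    }

    // Record absolute progress; the total is bumped if progress overtakes it,
    // matching how walk_snapshot's `on_inserted` callback grows the total.
    fn set(&mut self, completed: u64) {
        self.completed_items = completed;
        self.total_items = self.total_items.max(completed);
        self.emit_log_if_required();
    }

    // Updates are cheap; a line is printed only when enough time has passed
    // since the previous one, so hot loops do not flood the log.
    fn emit_log_if_required(&mut self) {
        let now = Instant::now();
        if now - self.last_logged > UPDATE_FREQUENCY {
            println!(
                "{}: {}/{} items",
                self.message, self.completed_items, self.total_items
            );
            self.last_logged = now;
        }
    }
}

fn main() {
    let mut progress = Progress::new("Walking snapshot", 1_000);
    for i in 0..=1_000u64 {
        progress.set(i);
        std::thread::sleep(Duration::from_millis(1));
    }
}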