diff --git a/crates/bifrost/src/background_appender.rs b/crates/bifrost/src/background_appender.rs
index 6619b6fd4..d37a81e99 100644
--- a/crates/bifrost/src/background_appender.rs
+++ b/crates/bifrost/src/background_appender.rs
@@ -56,8 +56,8 @@ where
     }
 
     /// Start the background appender as a TaskCenter background task. Note that the task will not
-    /// autmatically react to TaskCenter's shutdown signal, it gives control over the shutdown
-    /// behaviour to the the owner of [`AppenderHandle`] to drain or drop when appropriate.
+    /// automatically react to TaskCenter's shutdown signal, it gives control over the shutdown
+    /// behaviour to the owner of [`AppenderHandle`] to drain or drop when appropriate.
     pub fn start(
         self,
         task_center: TaskCenter,
diff --git a/crates/partition-store/src/partition_store_manager.rs b/crates/partition-store/src/partition_store_manager.rs
index 9ef2df0e2..7c8df781a 100644
--- a/crates/partition-store/src/partition_store_manager.rs
+++ b/crates/partition-store/src/partition_store_manager.rs
@@ -10,6 +10,7 @@
 
 use std::collections::BTreeMap;
 use std::ops::RangeInclusive;
+use std::path::Path;
 use std::sync::Arc;
 
 use rocksdb::ExportImportFilesMetaData;
@@ -19,12 +20,12 @@ use tracing::{debug, error, info, warn};
 use restate_rocksdb::{
     CfName, CfPrefixPattern, DbName, DbSpecBuilder, RocksDb, RocksDbManager, RocksError,
 };
-use restate_types::config::{RocksDbOptions, StorageOptions};
+use restate_types::config::{RocksDbOptions, SnapshotsOptions, StorageOptions};
 use restate_types::identifiers::{PartitionId, PartitionKey};
 use restate_types::live::{BoxedLiveLoad, LiveLoad};
 
 use crate::cf_options;
-use crate::snapshots::LocalPartitionSnapshot;
+use crate::snapshots::{LocalPartitionSnapshot, PartitionSnapshotMetadata};
 use crate::PartitionStore;
 use crate::DB;
 
@@ -95,6 +96,47 @@ impl PartitionStoreManager {
         self.lookup.lock().await.live.values().cloned().collect()
     }
 
+    pub async fn open_or_restore_partition_store(
+        &self,
+        partition_id: PartitionId,
+        partition_key_range: RangeInclusive<PartitionKey>,
+        open_mode: OpenMode,
+        rocksdb_opts: &RocksDbOptions,
+        snapshots_opts: &SnapshotsOptions,
+    ) -> Result<PartitionStore, RocksError> {
+        let cf_name = cf_for_partition(partition_id);
+        let already_exists = self.rocksdb.inner().cf_handle(&cf_name).is_some();
+
+        if !already_exists
+            && snapshots_opts
+                .restore_policy
+                .unwrap_or_default()
+                .allows_restore_on_init()
+        {
+            if let Some((metadata, snapshot)) =
+                Self::find_latest_snapshot(&snapshots_opts.snapshots_dir(partition_id))
+            {
+                info!(
+                    ?partition_id,
+                    snapshot_id = %metadata.snapshot_id,
+                    lsn = ?metadata.min_applied_lsn,
+                    "Restoring partition from snapshot"
+                );
+                return self
+                    .restore_partition_store_snapshot(
+                        partition_id,
+                        partition_key_range,
+                        snapshot,
+                        rocksdb_opts,
+                    )
+                    .await;
+            }
+        }
+
+        self.open_partition_store(partition_id, partition_key_range, open_mode, rocksdb_opts)
+            .await
+    }
+
     pub async fn open_partition_store(
         &self,
         partition_id: PartitionId,
@@ -130,6 +172,46 @@ impl PartitionStoreManager {
         Ok(partition_store)
     }
 
+    pub fn find_latest_snapshot(
+        dir: &Path,
+    ) -> Option<(PartitionSnapshotMetadata, LocalPartitionSnapshot)> {
+        if !dir.exists() || !dir.is_dir() {
+            return None;
+        }
+
+        let mut snapshots: Vec<_> = std::fs::read_dir(dir)
+            .ok()?
+            .filter_map(|entry| entry.ok())
+            .map(|entry| entry.file_name().into_string().ok())
+            .collect::<Option<Vec<_>>>()?;
+
+        snapshots.sort_by(|a, b| b.cmp(a));
+        let latest_snapshot = snapshots.first();
+
+        if let Some(latest_snapshot) = latest_snapshot {
+            let metadata_path = dir.join(latest_snapshot).join("metadata.json");
+            if metadata_path.exists() {
+                let metadata = std::fs::read_to_string(metadata_path).ok()?;
+                let metadata: PartitionSnapshotMetadata = serde_json::from_str(&metadata).ok()?;
+
+                debug!(
+                    location = ?latest_snapshot,
+                    "Found partition snapshot, going to bootstrap store from it",
+                );
+                let snapshot = LocalPartitionSnapshot {
+                    base_dir: latest_snapshot.into(),
+                    min_applied_lsn: metadata.min_applied_lsn,
+                    db_comparator_name: metadata.db_comparator_name.clone(),
+                    files: metadata.files.clone(),
+                };
+
+                return Some((metadata, snapshot));
+            }
+        }
+
+        None
+    }
+
     /// Imports a partition snapshot and opens it as a partition store.
     /// The database must not have an existing column family for the partition id;
     /// it will be created based on the supplied snapshot.
diff --git a/crates/types/src/config/worker.rs b/crates/types/src/config/worker.rs
index d8703491c..1057e06dc 100644
--- a/crates/types/src/config/worker.rs
+++ b/crates/types/src/config/worker.rs
@@ -8,11 +8,12 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
-use serde::{Deserialize, Serialize};
-use serde_with::serde_as;
 use std::num::{NonZeroU16, NonZeroUsize};
 use std::path::PathBuf;
 use std::time::Duration;
+
+use serde::{Deserialize, Serialize};
+use serde_with::{serde_as, with_prefix};
 use tracing::warn;
 
 use restate_serde_util::NonZeroByteCount;
@@ -62,9 +63,13 @@
     max_command_batch_size: NonZeroUsize,
 
     #[serde(flatten)]
+    #[serde(deserialize_with = "prefix_snapshot::deserialize")]
+    #[serde(serialize_with = "prefix_snapshot::serialize")]
     pub snapshots: SnapshotsOptions,
 }
 
+with_prefix!(prefix_snapshot "snapshot-");
+
 impl WorkerOptions {
     pub fn internal_queue_length(&self) -> usize {
         self.internal_queue_length.into()
@@ -352,10 +357,16 @@ impl Default for StorageOptions {
 #[serde_as]
 #[derive(Default, Debug, Clone, Serialize, Deserialize, derive_builder::Builder)]
 #[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
-#[cfg_attr(feature = "schemars", schemars(rename = "SnapshotsOptions", default))]
+#[cfg_attr(
+    feature = "schemars",
+    schemars(rename = "PartitionStoreOptions", default)
+)]
 #[serde(rename_all = "kebab-case")]
 #[builder(default)]
-pub struct SnapshotsOptions {}
+pub struct SnapshotsOptions {
+    /// ## Snapshot restore policy
+    pub restore_policy: Option<SnapshotRestorePolicy>,
+}
 
 impl SnapshotsOptions {
     pub fn snapshots_base_dir(&self) -> PathBuf {
@@ -366,3 +377,24 @@
         super::data_dir("db-snapshots").join(partition_id.to_string())
     }
 }
+
+#[derive(Debug, Clone, Copy, Hash, Default, Serialize, Deserialize)]
+#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
+#[serde(rename_all = "kebab-case")]
+pub enum SnapshotRestorePolicy {
+    /// ## Never
+    /// Do not attempt to restore from a snapshot, always preferring to rebuild worker state from the log.
+    #[default]
+    Never,
+
+    /// ## Initialize from snapshot if available.
+    /// Attempt to restore the most recent available snapshot only when the store is first created.
+    #[serde(rename = "on-init")]
+    InitializeFromSnapshot,
+}
+
+impl SnapshotRestorePolicy {
+    pub fn allows_restore_on_init(&self) -> bool {
+        matches!(self, SnapshotRestorePolicy::InitializeFromSnapshot)
+    }
+}
diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs
index bb87645e8..c54a92290 100644
--- a/crates/worker/src/partition_processor_manager.rs
+++ b/crates/worker/src/partition_processor_manager.rs
@@ -780,11 +780,12 @@ impl PartitionProcessorManager {
             let key_range = key_range.clone();
             move || async move {
                 let partition_store = storage_manager
-                    .open_partition_store(
+                    .open_or_restore_partition_store(
                         partition_id,
                         key_range,
                         OpenMode::CreateIfMissing,
                         &options.storage.rocksdb,
+                        &options.snapshots,
                     )
                     .await?;