Skip to content

Commit

Permalink
Add a Partition Store snapshot restore policy
Browse files Browse the repository at this point in the history
stack-info: PR: #1999, branch: pcholakov/stack/2
  • Loading branch information
pcholakov committed Oct 4, 2024
1 parent 8df89b3 commit 66c9f49
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 8 deletions.
4 changes: 2 additions & 2 deletions crates/bifrost/src/background_appender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ where
}

/// Start the background appender as a TaskCenter background task. Note that the task will not
/// autmatically react to TaskCenter's shutdown signal, it gives control over the shutdown
/// behaviour to the the owner of [`AppenderHandle`] to drain or drop when appropriate.
/// automatically react to TaskCenter's shutdown signal, it gives control over the shutdown
/// behaviour to the owner of [`AppenderHandle`] to drain or drop when appropriate.
pub fn start(
self,
task_center: TaskCenter,
Expand Down
86 changes: 84 additions & 2 deletions crates/partition-store/src/partition_store_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

use std::collections::BTreeMap;
use std::ops::RangeInclusive;
use std::path::Path;
use std::sync::Arc;

use rocksdb::ExportImportFilesMetaData;
Expand All @@ -19,12 +20,12 @@ use tracing::{debug, error, info, warn};
use restate_rocksdb::{
CfName, CfPrefixPattern, DbName, DbSpecBuilder, RocksDb, RocksDbManager, RocksError,
};
use restate_types::config::{RocksDbOptions, StorageOptions};
use restate_types::config::{RocksDbOptions, SnapshotsOptions, StorageOptions};
use restate_types::identifiers::{PartitionId, PartitionKey};
use restate_types::live::{BoxedLiveLoad, LiveLoad};

use crate::cf_options;
use crate::snapshots::LocalPartitionSnapshot;
use crate::snapshots::{LocalPartitionSnapshot, PartitionSnapshotMetadata};
use crate::PartitionStore;
use crate::DB;

Expand Down Expand Up @@ -95,6 +96,47 @@ impl PartitionStoreManager {
self.lookup.lock().await.live.values().cloned().collect()
}

/// Opens the partition store for `partition_id`, optionally bootstrapping it from the
/// most recent local snapshot first.
///
/// A snapshot restore is attempted only when (a) no column family for this partition
/// exists yet in the database, and (b) the configured restore policy permits restoring
/// on initialization. In every other case — or when no usable snapshot is found — this
/// falls through to a plain [`Self::open_partition_store`].
pub async fn open_or_restore_partition_store(
    &self,
    partition_id: PartitionId,
    partition_key_range: RangeInclusive<PartitionKey>,
    open_mode: OpenMode,
    rocksdb_opts: &RocksDbOptions,
    snapshots_opts: &SnapshotsOptions,
) -> Result<PartitionStore, RocksError> {
    // A column family already present means the store has state; never overwrite it
    // with a snapshot.
    let column_family = cf_for_partition(partition_id);
    let cf_exists = self.rocksdb.inner().cf_handle(&column_family).is_some();

    // An unset policy falls back to the default (which disallows restore-on-init).
    let may_restore = !cf_exists
        && snapshots_opts
            .restore_policy
            .unwrap_or_default()
            .allows_restore_on_init();

    if may_restore {
        let snapshot_dir = snapshots_opts.snapshots_dir(partition_id);
        if let Some((metadata, snapshot)) = Self::find_latest_snapshot(&snapshot_dir) {
            info!(
                ?partition_id,
                snapshot_id = %metadata.snapshot_id,
                lsn = ?metadata.min_applied_lsn,
                "Restoring partition from snapshot"
            );
            return self
                .restore_partition_store_snapshot(
                    partition_id,
                    partition_key_range,
                    snapshot,
                    rocksdb_opts,
                )
                .await;
        }
    }

    self.open_partition_store(partition_id, partition_key_range, open_mode, rocksdb_opts)
        .await
}

pub async fn open_partition_store(
&self,
partition_id: PartitionId,
Expand Down Expand Up @@ -130,6 +172,46 @@ impl PartitionStoreManager {
Ok(partition_store)
}

/// Scans `dir` for partition snapshots and returns the most recent one, along with its
/// parsed metadata.
///
/// Snapshot subdirectory names are assumed to sort lexicographically by recency, so the
/// greatest name is taken as the latest (TODO confirm against the snapshot-writer's
/// naming scheme). Returns `None` if `dir` is missing/not a directory, if no entry with
/// a readable `metadata.json` is found, or if the metadata fails to parse — all errors
/// are treated as "no snapshot available" rather than propagated.
pub fn find_latest_snapshot(
    dir: &Path,
) -> Option<(PartitionSnapshotMetadata, LocalPartitionSnapshot)> {
    // `is_dir()` is false for non-existent paths, so this covers both checks.
    if !dir.is_dir() {
        return None;
    }

    // Skip unreadable entries and non-UTF-8 names individually instead of aborting the
    // whole scan when a single entry name isn't valid UTF-8.
    let latest = std::fs::read_dir(dir)
        .ok()?
        .filter_map(|entry| entry.ok())
        .filter_map(|entry| entry.file_name().into_string().ok())
        .max()?;

    let snapshot_dir = dir.join(&latest);
    let metadata_path = snapshot_dir.join("metadata.json");
    if !metadata_path.exists() {
        return None;
    }

    let raw_metadata = std::fs::read_to_string(metadata_path).ok()?;
    let metadata: PartitionSnapshotMetadata = serde_json::from_str(&raw_metadata).ok()?;

    debug!(
        location = ?latest,
        "Found partition snapshot, going to bootstrap store from it",
    );

    let snapshot = LocalPartitionSnapshot {
        // Fix: use the full path under `dir`, not just the entry name — the bare name
        // would resolve relative to the process working directory.
        base_dir: snapshot_dir,
        min_applied_lsn: metadata.min_applied_lsn,
        db_comparator_name: metadata.db_comparator_name.clone(),
        files: metadata.files.clone(),
    };

    Some((metadata, snapshot))
}

/// Imports a partition snapshot and opens it as a partition store.
/// The database must not have an existing column family for the partition id;
/// it will be created based on the supplied snapshot.
Expand Down
35 changes: 32 additions & 3 deletions crates/types/src/config/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use std::num::{NonZeroU16, NonZeroUsize};
use std::path::PathBuf;
use std::time::Duration;

use serde::{Deserialize, Serialize};
use serde_with::{serde_as, with_prefix};
use tracing::warn;

use restate_serde_util::NonZeroByteCount;
Expand Down Expand Up @@ -62,9 +63,13 @@ pub struct WorkerOptions {
max_command_batch_size: NonZeroUsize,

#[serde(flatten)]
#[serde(deserialize_with = "prefix_snapshot::deserialize")]
#[serde(serialize_with = "prefix_snapshot::serialize")]
pub snapshots: SnapshotsOptions,
}

with_prefix!(prefix_snapshot "snapshot-");

impl WorkerOptions {
pub fn internal_queue_length(&self) -> usize {
self.internal_queue_length.into()
Expand Down Expand Up @@ -355,7 +360,10 @@ impl Default for StorageOptions {
#[cfg_attr(feature = "schemars", schemars(rename = "SnapshotsOptions", default))]
#[serde(rename_all = "kebab-case")]
#[builder(default)]
pub struct SnapshotsOptions {}
pub struct SnapshotsOptions {
    /// ## Snapshot restore policy
    ///
    /// Controls whether a partition store may be bootstrapped from a locally available
    /// snapshot. When unset, callers fall back to the policy's default (`Never`), so
    /// leaving this as `None` disables snapshot restore.
    pub restore_policy: Option<SnapshotRestorePolicy>,
}

impl SnapshotsOptions {
pub fn snapshots_base_dir(&self) -> PathBuf {
Expand All @@ -366,3 +374,24 @@ impl SnapshotsOptions {
super::data_dir("db-snapshots").join(partition_id.to_string())
}
}

#[derive(Debug, Clone, Copy, Hash, Default, Serialize, Deserialize)]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
#[serde(rename_all = "kebab-case")]
pub enum SnapshotRestorePolicy {
    /// ## Never
    /// Do not attempt to restore from a snapshot, always preferring to rebuild worker state from the log.
    #[default]
    Never,

    /// ## Initialize from snapshot if available.
    /// Attempt to restore the most recent available snapshot only when the store is first created.
    // Deliberately serialized as "on-init" — the container-level kebab-case rule would
    // otherwise yield "initialize-from-snapshot".
    #[serde(rename = "on-init")]
    InitializeFromSnapshot,
}

impl SnapshotRestorePolicy {
    /// Returns `true` if this policy permits restoring a snapshot when the partition
    /// store is first created.
    pub fn allows_restore_on_init(&self) -> bool {
        // Exhaustive match so a future variant forces an explicit decision here.
        match self {
            SnapshotRestorePolicy::InitializeFromSnapshot => true,
            SnapshotRestorePolicy::Never => false,
        }
    }
}
3 changes: 2 additions & 1 deletion crates/worker/src/partition_processor_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -781,11 +781,12 @@ impl<T: TransportConnect> PartitionProcessorManager<T> {
let key_range = key_range.clone();
move || async move {
let partition_store = storage_manager
.open_partition_store(
.open_or_restore_partition_store(
partition_id,
key_range,
OpenMode::CreateIfMissing,
&options.storage.rocksdb,
&options.snapshots,
)
.await?;

Expand Down

0 comments on commit 66c9f49

Please sign in to comment.