Add a Partition Store snapshot restore policy #1999

Draft · wants to merge 1 commit into base: pcholakov/stack/1
4 changes: 2 additions & 2 deletions crates/bifrost/src/background_appender.rs
@@ -56,8 +56,8 @@ where
}

/// Start the background appender as a TaskCenter background task. Note that the task will not
- /// autmatically react to TaskCenter's shutdown signal, it gives control over the shutdown
- /// behaviour to the the owner of [`AppenderHandle`] to drain or drop when appropriate.
+ /// automatically react to TaskCenter's shutdown signal, it gives control over the shutdown
+ /// behaviour to the owner of [`AppenderHandle`] to drain or drop when appropriate.
pub fn start(
self,
task_center: TaskCenter,
86 changes: 84 additions & 2 deletions crates/partition-store/src/partition_store_manager.rs
@@ -10,6 +10,7 @@

use std::collections::BTreeMap;
use std::ops::RangeInclusive;
use std::path::Path;
use std::sync::Arc;

use rocksdb::ExportImportFilesMetaData;
@@ -19,12 +20,12 @@ use tracing::{debug, error, info, warn};
use restate_rocksdb::{
CfName, CfPrefixPattern, DbName, DbSpecBuilder, RocksDb, RocksDbManager, RocksError,
};
- use restate_types::config::{RocksDbOptions, StorageOptions};
+ use restate_types::config::{RocksDbOptions, SnapshotsOptions, StorageOptions};
use restate_types::identifiers::{PartitionId, PartitionKey};
use restate_types::live::{BoxedLiveLoad, LiveLoad};

use crate::cf_options;
- use crate::snapshots::LocalPartitionSnapshot;
+ use crate::snapshots::{LocalPartitionSnapshot, PartitionSnapshotMetadata};
use crate::PartitionStore;
use crate::DB;

@@ -95,6 +96,47 @@ impl PartitionStoreManager {
self.lookup.lock().await.live.values().cloned().collect()
}

pub async fn open_or_restore_partition_store(
&self,
partition_id: PartitionId,
partition_key_range: RangeInclusive<PartitionKey>,
open_mode: OpenMode,
rocksdb_opts: &RocksDbOptions,
snapshots_opts: &SnapshotsOptions,
) -> Result<PartitionStore, RocksError> {
let cf_name = cf_for_partition(partition_id);
let already_exists = self.rocksdb.inner().cf_handle(&cf_name).is_some();

if !already_exists
&& snapshots_opts
.restore_policy
.unwrap_or_default()
.allows_restore_on_init()
{
if let Some((metadata, snapshot)) =
Self::find_latest_snapshot(&snapshots_opts.snapshots_dir(partition_id))
{
info!(
?partition_id,
snapshot_id = %metadata.snapshot_id,
lsn = ?metadata.min_applied_lsn,
"Restoring partition from snapshot"
);
return self
.restore_partition_store_snapshot(
partition_id,
partition_key_range,
snapshot,
rocksdb_opts,
)
.await;
}
}

self.open_partition_store(partition_id, partition_key_range, open_mode, rocksdb_opts)
.await
}

pub async fn open_partition_store(
&self,
partition_id: PartitionId,
@@ -130,6 +172,46 @@ impl PartitionStoreManager {
Ok(partition_store)
}

pub fn find_latest_snapshot(
dir: &Path,
) -> Option<(PartitionSnapshotMetadata, LocalPartitionSnapshot)> {
if !dir.exists() || !dir.is_dir() {
return None;
}

let mut snapshots: Vec<_> = std::fs::read_dir(dir)
.ok()?
.filter_map(|entry| entry.ok())
.map(|entry| entry.file_name().into_string().ok())
.collect::<Option<Vec<_>>>()?;

snapshots.sort_by(|a, b| b.cmp(a));
let latest_snapshot = snapshots.first();

if let Some(latest_snapshot) = latest_snapshot {
let metadata_path = dir.join(latest_snapshot).join("metadata.json");
if metadata_path.exists() {
let metadata = std::fs::read_to_string(metadata_path).ok()?;
let metadata: PartitionSnapshotMetadata = serde_json::from_str(&metadata).ok()?;

debug!(
location = ?latest_snapshot,
"Found partition snapshot, going to bootstrap store from it",
);
let snapshot = LocalPartitionSnapshot {
base_dir: latest_snapshot.into(),
min_applied_lsn: metadata.min_applied_lsn,
db_comparator_name: metadata.db_comparator_name.clone(),
files: metadata.files.clone(),
};

return Some((metadata, snapshot));
}
}

None
}

/// Imports a partition snapshot and opens it as a partition store.
/// The database must not have an existing column family for the partition id;
/// it will be created based on the supplied snapshot.
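
The new `open_or_restore_partition_store` only consults snapshots when the partition's column family does not yet exist and the configured policy allows restore on init; `find_latest_snapshot` then picks the sub-directory of the partition's snapshot directory with the lexicographically greatest name and parses its `metadata.json`. Below is a minimal sketch of probing that directory from a caller's perspective — the crate path, directory names, and surrounding setup are assumptions; only the `find_latest_snapshot` signature comes from this diff:

```rust
use std::path::Path;

use restate_partition_store::PartitionStoreManager; // crate path assumed

// Assumed on-disk layout, one sub-directory per snapshot:
//   <snapshots-dir>/<partition-id>/
//   ├── snap-000001/metadata.json
//   └── snap-000002/metadata.json   <- greatest name sorts first, so it wins
fn probe(partition_snapshots_dir: &Path) {
    // Returns None if the directory is missing or empty, or if the newest entry
    // has no parseable metadata.json; callers then fall back to an empty store.
    match PartitionStoreManager::find_latest_snapshot(partition_snapshots_dir) {
        Some((metadata, snapshot)) => println!(
            "latest snapshot {} at lsn {:?} in {:?}",
            metadata.snapshot_id, snapshot.min_applied_lsn, snapshot.base_dir
        ),
        None => println!("no usable snapshot found"),
    }
}
```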
35 changes: 32 additions & 3 deletions crates/types/src/config/worker.rs
@@ -8,11 +8,12 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

- use serde::{Deserialize, Serialize};
- use serde_with::serde_as;
use std::num::{NonZeroU16, NonZeroUsize};
use std::path::PathBuf;
use std::time::Duration;

+ use serde::{Deserialize, Serialize};
+ use serde_with::{serde_as, with_prefix};
+ use tracing::warn;

use restate_serde_util::NonZeroByteCount;
@@ -62,9 +63,13 @@ pub struct WorkerOptions {
max_command_batch_size: NonZeroUsize,

#[serde(flatten)]
#[serde(deserialize_with = "prefix_snapshot::deserialize")]
#[serde(serialize_with = "prefix_snapshot::serialize")]
pub snapshots: SnapshotsOptions,
}

with_prefix!(prefix_snapshot "snapshot-");

impl WorkerOptions {
pub fn internal_queue_length(&self) -> usize {
self.internal_queue_length.into()
@@ -355,7 +360,10 @@ impl Default for StorageOptions {
#[cfg_attr(feature = "schemars", schemars(rename = "SnapshotsOptions", default))]
#[serde(rename_all = "kebab-case")]
#[builder(default)]
- pub struct SnapshotsOptions {}
+ pub struct SnapshotsOptions {
+ /// ## Snapshot restore policy
+ pub restore_policy: Option<SnapshotRestorePolicy>,
+ }

impl SnapshotsOptions {
pub fn snapshots_base_dir(&self) -> PathBuf {
@@ -366,3 +374,24 @@ impl SnapshotsOptions {
super::data_dir("db-snapshots").join(partition_id.to_string())
}
}

#[derive(Debug, Clone, Copy, Hash, Default, Serialize, Deserialize)]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
#[serde(rename_all = "kebab-case")]
pub enum SnapshotRestorePolicy {
/// ## Never
/// Do not attempt to restore from a snapshot, always preferring to rebuild worker state from the log.
#[default]
Never,

/// ## Initialize from snapshot if available.
/// Attempt to restore the most recent available snapshot only when the store is first created.
#[serde(rename = "on-init")]
InitializeFromSnapshot,
}

impl SnapshotRestorePolicy {
pub fn allows_restore_on_init(&self) -> bool {
matches!(self, SnapshotRestorePolicy::InitializeFromSnapshot)
}
}
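
With `rename_all = "kebab-case"` and the explicit `on-init` rename, the policy's wire values are `never` and `on-init`; combined with the `snapshot-` prefix flattened into `WorkerOptions`, the worker-level key is expected to surface as `snapshot-restore-policy`. A small sketch of the value names, using serde_json as a stand-in for the real config format and an assumed re-export path:

```rust
use restate_types::config::SnapshotRestorePolicy; // re-export path assumed

fn main() {
    // "on-init" maps to InitializeFromSnapshot per the #[serde(rename)] above.
    let policy: SnapshotRestorePolicy = serde_json::from_str("\"on-init\"").unwrap();
    assert!(policy.allows_restore_on_init());

    // "never" comes from the kebab-case rename of the default Never variant.
    let policy: SnapshotRestorePolicy = serde_json::from_str("\"never\"").unwrap();
    assert!(!policy.allows_restore_on_init());
}
```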
3 changes: 2 additions & 1 deletion crates/worker/src/partition_processor_manager.rs
@@ -781,11 +781,12 @@ impl<T: TransportConnect> PartitionProcessorManager<T> {
let key_range = key_range.clone();
move || async move {
let partition_store = storage_manager
- .open_partition_store(
+ .open_or_restore_partition_store(
partition_id,
key_range,
OpenMode::CreateIfMissing,
&options.storage.rocksdb,
+ &options.snapshots,
)
.await?;
