Skip to content

Commit

Permalink
Introduce partition store snapshots (#1954)
Browse files Browse the repository at this point in the history
This change adds the foundational support to export and import partition store
database snapshots.

Closes: #1892
  • Loading branch information
pcholakov authored Sep 26, 2024
1 parent a121f49 commit 4d6717c
Show file tree
Hide file tree
Showing 16 changed files with 502 additions and 29 deletions.
7 changes: 5 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ reqwest = { version = "0.12.5", default-features = false, features = [
"stream",
] }
rlimit = { version = "0.10.1" }
rocksdb = { version = "0.29.0", package = "rust-rocksdb", features = ["multi-threaded-cf"], git = "https://github.com/restatedev/rust-rocksdb", rev = "c6a279a40416cb47bbf576ffb190523d55818073" }
rocksdb = { version = "0.29.0", package = "rust-rocksdb", features = ["multi-threaded-cf"], git = "https://github.com/restatedev/rust-rocksdb", rev = "8f832b7e742e0d826fb9fed05a62e4bd747969bf" }
rustls = { version = "0.23.11", default-features = false, features = ["ring"] }
schemars = { version = "0.8", features = ["bytes", "enumset"] }
serde = { version = "1.0", features = ["derive"] }
Expand Down
15 changes: 9 additions & 6 deletions crates/partition-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,6 @@ default = []
options_schema = ["dep:schemars"]

[dependencies]
restate-core = { workspace = true }
restate-errors = { workspace = true }
restate-rocksdb = { workspace = true }
restate-storage-api = { workspace = true }
restate-types = { workspace = true }

anyhow = { workspace = true }
bytes = { workspace = true }
bytestring = { workspace = true }
Expand All @@ -27,12 +21,20 @@ derive_more = { workspace = true }
enum-map = { workspace = true }
futures = { workspace = true }
futures-util = { workspace = true }
humantime = { workspace = true }
once_cell = { workspace = true }
paste = { workspace = true }
prost = { workspace = true }
restate-core = { workspace = true }
restate-errors = { workspace = true }
restate-rocksdb = { workspace = true }
restate-storage-api = { workspace = true }
restate-types = { workspace = true }
rocksdb = { workspace = true }
schemars = { workspace = true, optional = true }
serde = { workspace = true }
serde_json = { workspace = true }
serde_with = { workspace = true, features = ["hex"] }
static_assertions = { workspace = true }
strum = { workspace = true }
sync_wrapper = { workspace = true }
Expand All @@ -51,6 +53,7 @@ criterion = { workspace = true, features = ["async_tokio"] }
googletest = { workspace = true }
num-bigint = "0.4"
rand = { workspace = true }
serde_json = { workspace = true }
tempfile = { workspace = true }

[[bench]]
Expand Down
1 change: 1 addition & 0 deletions crates/partition-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod partition_store_manager;
pub mod promise_table;
pub mod scan;
pub mod service_status_table;
pub mod snapshots;
pub mod state_table;
pub mod timer_table;

Expand Down
35 changes: 35 additions & 0 deletions crates/partition-store/src/partition_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
// by the Apache License, Version 2.0.

use std::ops::RangeInclusive;
use std::path::PathBuf;
use std::slice;
use std::sync::Arc;

Expand All @@ -18,6 +19,7 @@ use codederror::CodedError;
use restate_rocksdb::CfName;
use restate_rocksdb::IoMode;
use restate_rocksdb::Priority;
use restate_storage_api::fsm_table::ReadOnlyFsmTable;
use restate_types::config::Configuration;
use rocksdb::DBCompressionType;
use rocksdb::DBPinnableSlice;
Expand All @@ -39,6 +41,7 @@ use crate::keys::KeyKind;
use crate::keys::TableKey;
use crate::scan::PhysicalScan;
use crate::scan::TableScan;
use crate::snapshots::LocalPartitionSnapshot;

pub type DB = rocksdb::DB;

Expand Down Expand Up @@ -417,6 +420,38 @@ impl PartitionStore {
.map_err(|err| StorageError::Generic(err.into()))?;
Ok(())
}

/// Creates a snapshot of the partition in the given directory, which must not exist prior to
/// the export. The snapshot is atomic and contains, at a minimum, the reported applied LSN.
/// Additional log records may have been applied between when the LSN was read, and when the
/// snapshot was actually created. The actual snapshot applied LSN will always be equal to, or
/// greater than, the reported applied LSN.
///
/// *NB:* Creating a snapshot causes an implicit flush of the column family!
///
/// See [rocksdb::checkpoint::Checkpoint::export_column_family] for additional implementation details.
pub async fn create_snapshot(
&mut self,
snapshot_dir: PathBuf,
) -> Result<LocalPartitionSnapshot> {
let applied_lsn = self
.get_applied_lsn()
.await?
.ok_or(StorageError::DataIntegrityError)?;

let metadata = self
.rocksdb
.export_cf(self.data_cf_name.clone(), snapshot_dir.clone())
.await
.map_err(|err| StorageError::Generic(err.into()))?;

Ok(LocalPartitionSnapshot {
base_dir: snapshot_dir,
files: metadata.get_files(),
db_comparator_name: metadata.get_db_comparator_name(),
min_applied_lsn: applied_lsn,
})
}
}

fn find_cf_handle<'a>(
Expand Down
90 changes: 81 additions & 9 deletions crates/partition-store/src/partition_store_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,19 @@ use std::collections::BTreeMap;
use std::ops::RangeInclusive;
use std::sync::Arc;

use restate_types::live::BoxedLiveLoad;
use rocksdb::ExportImportFilesMetaData;
use tokio::sync::Mutex;
use tracing::debug;
use tracing::{debug, error, info, warn};

use restate_rocksdb::{
CfName, CfPrefixPattern, DbName, DbSpecBuilder, RocksDb, RocksDbManager, RocksError,
};
use restate_types::config::RocksDbOptions;
use restate_types::config::StorageOptions;
use restate_types::identifiers::PartitionId;
use restate_types::identifiers::PartitionKey;
use restate_types::live::LiveLoad;
use restate_types::config::{RocksDbOptions, StorageOptions};
use restate_types::identifiers::{PartitionId, PartitionKey};
use restate_types::live::{BoxedLiveLoad, LiveLoad};

use crate::cf_options;
use crate::snapshots::LocalPartitionSnapshot;
use crate::PartitionStore;
use crate::DB;

Expand Down Expand Up @@ -56,7 +55,7 @@ impl PartitionStoreManager {
mut storage_opts: impl LiveLoad<StorageOptions> + Send + 'static,
updateable_opts: BoxedLiveLoad<RocksDbOptions>,
initial_partition_set: &[(PartitionId, RangeInclusive<PartitionKey>)],
) -> std::result::Result<Self, RocksError> {
) -> Result<Self, RocksError> {
let options = storage_opts.live_load();

let per_partition_memory_budget = options.rocksdb_memory_budget()
Expand Down Expand Up @@ -102,7 +101,7 @@ impl PartitionStoreManager {
partition_key_range: RangeInclusive<PartitionKey>,
open_mode: OpenMode,
opts: &RocksDbOptions,
) -> std::result::Result<PartitionStore, RocksError> {
) -> Result<PartitionStore, RocksError> {
let mut guard = self.lookup.lock().await;
if let Some(store) = guard.live.get(&partition_id) {
return Ok(store.clone());
Expand Down Expand Up @@ -130,6 +129,79 @@ impl PartitionStoreManager {

Ok(partition_store)
}

/// Imports a partition snapshot and opens it as a partition store.
/// The database must not have an existing column family for the partition id;
/// it will be created based on the supplied snapshot.
pub async fn restore_partition_store_snapshot(
&self,
partition_id: PartitionId,
partition_key_range: RangeInclusive<PartitionKey>,
snapshot: LocalPartitionSnapshot,
opts: &RocksDbOptions,
) -> Result<PartitionStore, RocksError> {
let mut guard = self.lookup.lock().await;
if guard.live.contains_key(&partition_id) {
warn!(
?partition_id,
?snapshot,
"The partition store is already open, refusing to import snapshot"
);
return Err(RocksError::AlreadyOpen);
}

let cf_name = cf_for_partition(partition_id);
let cf_exists = self.rocksdb.inner().cf_handle(&cf_name).is_some();
if cf_exists {
warn!(
?partition_id,
?cf_name,
?snapshot,
"The column family for partition already exists in the database, cannot import snapshot"
);
return Err(RocksError::ColumnFamilyExists);
}

let mut import_metadata = ExportImportFilesMetaData::default();
import_metadata.set_db_comparator_name(snapshot.db_comparator_name.as_str());
import_metadata.set_files(&snapshot.files);

info!(
?partition_id,
min_applied_lsn = ?snapshot.min_applied_lsn,
"Initializing partition store from snapshot"
);

if let Err(e) = self
.rocksdb
.import_cf(cf_name.clone(), opts, import_metadata)
.await
{
error!(?partition_id, "Failed to import snapshot");
return Err(e);
}

assert!(self.rocksdb.inner().cf_handle(&cf_name).is_some());
let partition_store = PartitionStore::new(
self.raw_db.clone(),
self.rocksdb.clone(),
cf_name,
partition_id,
partition_key_range,
);
guard.live.insert(partition_id, partition_store.clone());

Ok(partition_store)
}

pub async fn drop_partition(&self, partition_id: PartitionId) {
let mut guard = self.lookup.lock().await;
self.raw_db
.drop_cf(&cf_for_partition(partition_id))
.unwrap();

guard.live.remove(&partition_id);
}
}

fn cf_for_partition(partition_id: PartitionId) -> CfName {
Expand Down
101 changes: 101 additions & 0 deletions crates/partition-store/src/snapshots.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
use std::ops::RangeInclusive;
use std::path::PathBuf;

use rocksdb::LiveFile;
use serde::{Deserialize, Serialize};
use serde_with::hex::Hex;
use serde_with::{serde_as, DeserializeAs, SerializeAs};

use restate_types::identifiers::{PartitionId, PartitionKey, SnapshotId};
use restate_types::logs::Lsn;

#[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Serialize, Deserialize)]
pub enum SnapshotFormatVersion {
#[default]
V1,
}

/// A partition store snapshot.
#[serde_as]
#[derive(Debug, Serialize, Deserialize)]
pub struct PartitionSnapshotMetadata {
pub version: SnapshotFormatVersion,

/// Restate cluster name which produced the snapshot.
pub cluster_name: String,

/// Restate partition id.
pub partition_id: PartitionId,

/// Node that produced this snapshot.
pub node_name: String,

/// Local node time when the snapshot was created.
#[serde(with = "serde_with::As::<serde_with::DisplayFromStr>")]
pub created_at: humantime::Timestamp,

/// Snapshot id.
pub snapshot_id: SnapshotId,

/// The partition key range that the partition processor which generated this snapshot was
/// responsible for, at the time the snapshot was generated.
pub key_range: RangeInclusive<PartitionKey>,

/// The minimum LSN guaranteed to be applied in this snapshot. The actual
/// LSN may be >= [minimum_lsn].
pub min_applied_lsn: Lsn,

/// The RocksDB comparator name used by the partition processor which generated this snapshot.
pub db_comparator_name: String,

/// The RocksDB SST files comprising the snapshot.
#[serde_as(as = "Vec<SnapshotSstFile>")]
pub files: Vec<LiveFile>,
}

/// A locally-stored partition snapshot.
#[derive(Debug)]
pub struct LocalPartitionSnapshot {
pub base_dir: PathBuf,
pub min_applied_lsn: Lsn,
pub db_comparator_name: String,
pub files: Vec<LiveFile>,
}

/// RocksDB SST file that is part of a snapshot. Serialization wrapper around [LiveFile].
#[serde_as]
#[derive(Serialize, Deserialize)]
#[serde(remote = "LiveFile")]
pub struct SnapshotSstFile {
pub column_family_name: String,
pub name: String,
pub directory: String,
pub size: usize,
pub level: i32,
#[serde_as(as = "Option<Hex>")]
pub start_key: Option<Vec<u8>>,
#[serde_as(as = "Option<Hex>")]
pub end_key: Option<Vec<u8>>,
pub smallest_seqno: u64,
pub largest_seqno: u64,
pub num_entries: u64,
pub num_deletions: u64,
}

impl SerializeAs<LiveFile> for SnapshotSstFile {
fn serialize_as<S>(value: &LiveFile, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
SnapshotSstFile::serialize(value, serializer)
}
}

impl<'de> DeserializeAs<'de, LiveFile> for SnapshotSstFile {
fn deserialize_as<D>(deserializer: D) -> Result<LiveFile, D::Error>
where
D: serde::Deserializer<'de>,
{
SnapshotSstFile::deserialize(deserializer)
}
}
Loading

0 comments on commit 4d6717c

Please sign in to comment.