Introduce partition store snapshots
This commit adds the foundational support to export and import partition store
snapshots.
pcholakov committed Sep 21, 2024
1 parent 7e56a65 commit 4959252
Showing 16 changed files with 502 additions and 29 deletions.
7 changes: 5 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -152,7 +152,7 @@ reqwest = { version = "0.12.5", default-features = false, features = [
"stream",
] }
rlimit = { version = "0.10.1" }
rocksdb = { version = "0.29.0", package = "rust-rocksdb", features = ["multi-threaded-cf"], git = "https://github.com/restatedev/rust-rocksdb", rev = "c6a279a40416cb47bbf576ffb190523d55818073" }
rocksdb = { version = "0.29.0", package = "rust-rocksdb", features = ["multi-threaded-cf"], git = "https://github.com/restatedev/rust-rocksdb", rev = "8f832b7e742e0d826fb9fed05a62e4bd747969bf" }
rustls = { version = "0.23.11", default-features = false, features = ["ring"] }
schemars = { version = "0.8", features = ["bytes", "enumset"] }
serde = { version = "1.0", features = ["derive"] }
15 changes: 9 additions & 6 deletions crates/partition-store/Cargo.toml
@@ -12,12 +12,6 @@ default = []
options_schema = ["dep:schemars"]

[dependencies]
restate-core = { workspace = true }
restate-errors = { workspace = true }
restate-rocksdb = { workspace = true }
restate-storage-api = { workspace = true }
restate-types = { workspace = true }

anyhow = { workspace = true }
bytes = { workspace = true }
bytestring = { workspace = true }
@@ -27,12 +21,20 @@ derive_more = { workspace = true }
enum-map = { workspace = true }
futures = { workspace = true }
futures-util = { workspace = true }
humantime = { workspace = true }
once_cell = { workspace = true }
paste = { workspace = true }
prost = { workspace = true }
restate-core = { workspace = true }
restate-errors = { workspace = true }
restate-rocksdb = { workspace = true }
restate-storage-api = { workspace = true }
restate-types = { workspace = true }
rocksdb = { workspace = true }
schemars = { workspace = true, optional = true }
serde = { workspace = true }
serde_json = { workspace = true }
serde_with = { workspace = true, features = ["base64"] }
static_assertions = { workspace = true }
strum = { workspace = true }
sync_wrapper = { workspace = true }
@@ -51,6 +53,7 @@ criterion = { workspace = true, features = ["async_tokio"] }
googletest = { workspace = true }
num-bigint = "0.4"
rand = { workspace = true }
serde_json = { workspace = true }
tempfile = { workspace = true }

[[bench]]
1 change: 1 addition & 0 deletions crates/partition-store/src/lib.rs
@@ -22,6 +22,7 @@ mod partition_store_manager;
pub mod promise_table;
pub mod scan;
pub mod service_status_table;
pub mod snapshots;
pub mod state_table;
pub mod timer_table;

35 changes: 35 additions & 0 deletions crates/partition-store/src/partition_store.rs
@@ -9,6 +9,7 @@
// by the Apache License, Version 2.0.

use std::ops::RangeInclusive;
use std::path::PathBuf;
use std::slice;
use std::sync::Arc;

@@ -18,6 +19,7 @@ use codederror::CodedError;
use restate_rocksdb::CfName;
use restate_rocksdb::IoMode;
use restate_rocksdb::Priority;
use restate_storage_api::fsm_table::ReadOnlyFsmTable;
use restate_types::config::Configuration;
use rocksdb::DBCompressionType;
use rocksdb::DBPinnableSlice;
@@ -39,6 +41,7 @@ use crate::keys::KeyKind;
use crate::keys::TableKey;
use crate::scan::PhysicalScan;
use crate::scan::TableScan;
use crate::snapshots::LocalPartitionSnapshot;

pub type DB = rocksdb::DB;

@@ -417,6 +420,38 @@ impl PartitionStore {
.map_err(|err| StorageError::Generic(err.into()))?;
Ok(())
}

/// Creates a snapshot of the partition in the given directory, which must not exist prior to
/// the export. The snapshot is atomic and contains, at a minimum, the reported applied LSN.
/// Additional log records may have been applied between when the LSN was read and when the
/// snapshot was actually created, so the snapshot's actual applied LSN is always equal to or
/// greater than the reported applied LSN.
///
/// *NB:* Creating a snapshot causes an implicit flush of the column family!
///
/// See [rocksdb::checkpoint::Checkpoint::export_column_family] for additional implementation details.
pub async fn create_snapshot(
&mut self,
snapshot_dir: PathBuf,
) -> Result<LocalPartitionSnapshot> {
let applied_lsn = self
.get_applied_lsn()
.await?
.ok_or(StorageError::DataIntegrityError)?;

let metadata = self
.rocksdb
.export_cf(self.data_cf_name.clone(), snapshot_dir.clone())
.await
.map_err(|err| StorageError::Generic(err.into()))?;

Ok(LocalPartitionSnapshot {
base_dir: snapshot_dir,
files: metadata.get_files(),
db_comparator_name: metadata.get_db_comparator_name(),
min_applied_lsn: applied_lsn,
})
}
}

fn find_cf_handle<'a>(
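For orientation, here is a minimal usage sketch of the new export path. It is not part of this commit; the helper function, crate and module paths, target directory, and the anyhow error wrapper are assumptions for illustration only.

```rust
use std::path::PathBuf;

use restate_partition_store::PartitionStore;

// Hypothetical helper: export a snapshot from an already-open partition store.
async fn export_snapshot(store: &mut PartitionStore) -> anyhow::Result<()> {
    // The target directory must not exist yet; the checkpoint export creates it.
    let snapshot_dir = PathBuf::from("/var/lib/restate/snapshots/partition-0");

    // Implicitly flushes the partition's column family, then exports its SST files.
    let snapshot = store.create_snapshot(snapshot_dir).await?;

    // The snapshot is guaranteed to cover at least this applied LSN.
    println!("exported snapshot at LSN {:?}", snapshot.min_applied_lsn);
    Ok(())
}
```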
90 changes: 81 additions & 9 deletions crates/partition-store/src/partition_store_manager.rs
@@ -12,20 +12,19 @@ use std::collections::BTreeMap;
use std::ops::RangeInclusive;
use std::sync::Arc;

use restate_types::live::BoxedLiveLoad;
use rocksdb::ExportImportFilesMetaData;
use tokio::sync::Mutex;
use tracing::debug;
use tracing::{debug, error, info, warn};

use restate_rocksdb::{
CfName, CfPrefixPattern, DbName, DbSpecBuilder, RocksDb, RocksDbManager, RocksError,
};
use restate_types::config::RocksDbOptions;
use restate_types::config::StorageOptions;
use restate_types::identifiers::PartitionId;
use restate_types::identifiers::PartitionKey;
use restate_types::live::LiveLoad;
use restate_types::config::{RocksDbOptions, StorageOptions};
use restate_types::identifiers::{PartitionId, PartitionKey};
use restate_types::live::{BoxedLiveLoad, LiveLoad};

use crate::cf_options;
use crate::snapshots::LocalPartitionSnapshot;
use crate::PartitionStore;
use crate::DB;

@@ -56,7 +55,7 @@ impl PartitionStoreManager {
mut storage_opts: impl LiveLoad<StorageOptions> + Send + 'static,
updateable_opts: BoxedLiveLoad<RocksDbOptions>,
initial_partition_set: &[(PartitionId, RangeInclusive<PartitionKey>)],
) -> std::result::Result<Self, RocksError> {
) -> Result<Self, RocksError> {
let options = storage_opts.live_load();

let per_partition_memory_budget = options.rocksdb_memory_budget()
@@ -102,7 +101,7 @@ impl PartitionStoreManager {
partition_key_range: RangeInclusive<PartitionKey>,
open_mode: OpenMode,
opts: &RocksDbOptions,
) -> std::result::Result<PartitionStore, RocksError> {
) -> Result<PartitionStore, RocksError> {
let mut guard = self.lookup.lock().await;
if let Some(store) = guard.live.get(&partition_id) {
return Ok(store.clone());
@@ -130,6 +129,79 @@

Ok(partition_store)
}

/// Imports a partition snapshot and opens it as a partition store.
/// The database must not have an existing column family for the partition id;
/// it will be created based on the supplied snapshot.
pub async fn restore_partition_store_snapshot(
&self,
partition_id: PartitionId,
partition_key_range: RangeInclusive<PartitionKey>,
snapshot: LocalPartitionSnapshot,
opts: &RocksDbOptions,
) -> Result<PartitionStore, RocksError> {
let mut guard = self.lookup.lock().await;
if guard.live.contains_key(&partition_id) {
warn!(
?partition_id,
?snapshot,
"The partition store is already open, refusing to import snapshot"
);
return Err(RocksError::AlreadyOpen);
}

let cf_name = cf_for_partition(partition_id);
let cf_exists = self.rocksdb.inner().cf_handle(&cf_name).is_some();
if cf_exists {
warn!(
?partition_id,
?cf_name,
?snapshot,
"The column family for partition already exists in the database, cannot import snapshot"
);
return Err(RocksError::ColumnFamilyExists);
}

let mut import_metadata = ExportImportFilesMetaData::default();
import_metadata.set_db_comparator_name(snapshot.db_comparator_name.as_str());
import_metadata.set_files(&snapshot.files);

info!(
?partition_id,
min_applied_lsn = ?snapshot.min_applied_lsn,
"Initializing partition store from snapshot"
);

if let Err(e) = self
.rocksdb
.import_cf(cf_name.clone(), opts, import_metadata)
.await
{
error!(?partition_id, "Failed to import snapshot");
return Err(e);
}

assert!(self.rocksdb.inner().cf_handle(&cf_name).is_some());
let partition_store = PartitionStore::new(
self.raw_db.clone(),
self.rocksdb.clone(),
cf_name,
partition_id,
partition_key_range,
);
guard.live.insert(partition_id, partition_store.clone());

Ok(partition_store)
}

pub async fn drop_partition(&self, partition_id: PartitionId) {
let mut guard = self.lookup.lock().await;
self.raw_db
.drop_cf(&cf_for_partition(partition_id))
.unwrap();

guard.live.remove(&partition_id);
}
}

fn cf_for_partition(partition_id: PartitionId) -> CfName {
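A corresponding restore sketch follows. Again, it is not part of this commit; the helper signature and module paths are assumptions, and only the restore_partition_store_snapshot call itself is defined in this diff.

```rust
use std::ops::RangeInclusive;

use restate_partition_store::snapshots::LocalPartitionSnapshot;
use restate_partition_store::{PartitionStore, PartitionStoreManager};
use restate_rocksdb::RocksError;
use restate_types::config::RocksDbOptions;
use restate_types::identifiers::{PartitionId, PartitionKey};

// Hypothetical helper: open a partition store from a previously exported snapshot.
async fn restore_snapshot(
    manager: &PartitionStoreManager,
    partition_id: PartitionId,
    key_range: RangeInclusive<PartitionKey>,
    snapshot: LocalPartitionSnapshot,
    opts: &RocksDbOptions,
) -> Result<PartitionStore, RocksError> {
    // Fails if the partition store is already open or its column family already exists.
    manager
        .restore_partition_store_snapshot(partition_id, key_range, snapshot, opts)
        .await
}
```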
101 changes: 101 additions & 0 deletions crates/partition-store/src/snapshots.rs
@@ -0,0 +1,101 @@
use std::ops::RangeInclusive;
use std::path::PathBuf;

use rocksdb::LiveFile;
use serde::{Deserialize, Serialize};
use serde_with::base64::Base64;
use serde_with::{serde_as, DeserializeAs, SerializeAs};

use restate_types::identifiers::{PartitionId, PartitionKey, SnapshotId};
use restate_types::logs::Lsn;

#[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Serialize, Deserialize)]
pub enum SnapshotFormatVersion {
#[default]
V1,
}

/// Metadata describing a partition store snapshot.
#[serde_as]
#[derive(Debug, Serialize, Deserialize)]
pub struct PartitionSnapshotMetadata {
pub version: SnapshotFormatVersion,

/// Restate cluster name which produced the snapshot.
pub cluster_name: String,

/// Restate partition id.
pub partition_id: PartitionId,

/// Node that produced this snapshot.
pub node_name: String,

/// Local node time when the snapshot was created.
#[serde(with = "serde_with::As::<serde_with::DisplayFromStr>")]
pub created_at: humantime::Timestamp,

/// Snapshot id.
pub snapshot_id: SnapshotId,

/// The partition key range that the partition processor which generated this snapshot was
/// responsible for, at the time the snapshot was generated.
pub key_range: RangeInclusive<PartitionKey>,

/// The minimum LSN guaranteed to be applied in this snapshot. The actual
/// applied LSN may be equal to or greater than this value.
pub min_applied_lsn: Lsn,

/// The RocksDB comparator name used by the partition processor which generated this snapshot.
pub db_comparator_name: String,

/// The RocksDB SST files comprising the snapshot.
#[serde_as(as = "Vec<SnapshotSstFile>")]
pub files: Vec<LiveFile>,
}

/// A locally-stored partition snapshot.
#[derive(Debug)]
pub struct LocalPartitionSnapshot {
pub base_dir: PathBuf,
pub min_applied_lsn: Lsn,
pub db_comparator_name: String,
pub files: Vec<LiveFile>,
}

/// RocksDB SST file that is part of a snapshot. Serialization wrapper around [LiveFile].
#[serde_as]
#[derive(Serialize, Deserialize)]
#[serde(remote = "LiveFile")]
pub struct SnapshotSstFile {
pub column_family_name: String,
pub name: String,
pub directory: String,
pub size: usize,
pub level: i32,
#[serde_as(as = "Option<Base64>")]
pub start_key: Option<Vec<u8>>,
#[serde_as(as = "Option<Base64>")]
pub end_key: Option<Vec<u8>>,
pub smallest_seqno: u64,
pub largest_seqno: u64,
pub num_entries: u64,
pub num_deletions: u64,
}

impl SerializeAs<LiveFile> for SnapshotSstFile {
fn serialize_as<S>(value: &LiveFile, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
SnapshotSstFile::serialize(value, serializer)
}
}

impl<'de> DeserializeAs<'de, LiveFile> for SnapshotSstFile {
fn deserialize_as<D>(deserializer: D) -> Result<LiveFile, D::Error>
where
D: serde::Deserializer<'de>,
{
SnapshotSstFile::deserialize(deserializer)
}
}
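Since PartitionSnapshotMetadata derives Serialize and Deserialize, one plausible way to persist it alongside the exported SST files is as a JSON document. The sketch below is illustrative only; the metadata.json file name and the helper functions are assumptions, not part of this commit.

```rust
use std::path::Path;

use restate_partition_store::snapshots::PartitionSnapshotMetadata;

// Hypothetical helper: write snapshot metadata to a JSON file in the snapshot directory.
fn write_metadata(snapshot_dir: &Path, metadata: &PartitionSnapshotMetadata) -> anyhow::Result<()> {
    // SST key ranges are Base64-encoded via the SnapshotSstFile serde wrapper.
    let json = serde_json::to_string_pretty(metadata)?;
    std::fs::write(snapshot_dir.join("metadata.json"), json)?;
    Ok(())
}

// Hypothetical helper: read snapshot metadata back from the same JSON file.
fn read_metadata(snapshot_dir: &Path) -> anyhow::Result<PartitionSnapshotMetadata> {
    let json = std::fs::read_to_string(snapshot_dir.join("metadata.json"))?;
    Ok(serde_json::from_str(&json)?)
}
```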
