Introduce Partition Snapshot Producer
This component can create online partition snapshots at runtime.
pcholakov committed Sep 27, 2024
1 parent 1b0a34d commit 9046c17
Showing 8 changed files with 211 additions and 28 deletions.
8 changes: 6 additions & 2 deletions crates/core/src/task_center_types.rs
@@ -63,7 +63,7 @@ pub enum TaskKind {
/// Do not use. This is a special task kind that indicates that work is running within
/// task_center but its lifecycle is not managed by it.
InPlace,
/// Tasks used during system initialization. Short lived but will shutdown the node if they
/// Tasks used during system initialization. Short-lived but will shut down the node if they
/// fail.
#[strum(props(OnCancel = "abort"))]
SystemBoot,
@@ -73,14 +73,18 @@ pub enum TaskKind {
#[strum(props(OnCancel = "abort", OnError = "log"))]
RpcConnection,
/// A type for ingress until we start enforcing timeouts for inflight requests. This enables us
/// to shutdown cleanly without waiting indefinitely.
/// to shut down cleanly without waiting indefinitely.
#[strum(props(OnCancel = "abort", runtime = "ingress"))]
IngressServer,
RoleRunner,
SystemService,
#[strum(props(OnCancel = "abort", runtime = "ingress"))]
Ingress,
PartitionProcessor,
/// A longer-running, low-priority task responsible for exporting partition store
/// snapshots and, potentially, uploading them to remote storage.
#[strum(props(OnCancel = "abort", OnError = "log"))]
PartitionSnapshotProducer,
#[strum(props(OnError = "log"))]
ConnectionReactor,
Shuffle,
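For orientation, a minimal sketch of how these strum properties can be read at runtime, assuming TaskKind derives strum's EnumProperty (as the #[strum(props(...))] attributes suggest):

use strum::EnumProperty as _;

// The new task kind aborts on cancellation and logs on error, per its props.
assert_eq!(TaskKind::PartitionSnapshotProducer.get_str("OnCancel"), Some("abort"));
assert_eq!(TaskKind::PartitionSnapshotProducer.get_str("OnError"), Some("log"));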
12 changes: 11 additions & 1 deletion crates/core/src/worker_api/partition_processor_manager.rs
@@ -8,14 +8,16 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use restate_types::identifiers::PartitionId;
use tokio::sync::{mpsc, oneshot};

use restate_types::identifiers::{PartitionId, SnapshotId};

use crate::ShutdownError;

#[derive(Debug)]
pub enum ProcessorsManagerCommand {
GetLivePartitions(oneshot::Sender<Vec<PartitionId>>),
CreateSnapshot(PartitionId, oneshot::Sender<anyhow::Result<SnapshotId>>),
}

#[derive(Debug, Clone)]
@@ -34,4 +36,12 @@ impl ProcessorsManagerHandle {
.unwrap();
rx.await.map_err(|_| ShutdownError)
}

pub async fn create_snapshot(&self, partition_id: PartitionId) -> anyhow::Result<SnapshotId> {
let (tx, rx) = oneshot::channel();
self.0
.send(ProcessorsManagerCommand::CreateSnapshot(partition_id, tx))
.await?;
rx.await?
}
}
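A hedged sketch of requesting a snapshot through the new handle API; assumes a handle: ProcessorsManagerHandle and a partition_id: PartitionId are in scope, and the logging is illustrative:

match handle.create_snapshot(partition_id).await {
    Ok(snapshot_id) => tracing::info!(%snapshot_id, "partition snapshot created"),
    Err(err) => tracing::warn!(%err, "partition snapshot failed"),
}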
2 changes: 1 addition & 1 deletion crates/partition-store/src/snapshots.rs
@@ -17,7 +17,7 @@ pub enum SnapshotFormatVersion {

/// A partition store snapshot.
#[serde_as]
#[derive(Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PartitionSnapshotMetadata {
pub version: SnapshotFormatVersion,

3 changes: 1 addition & 2 deletions crates/types/src/config/mod.rs
@@ -51,12 +51,11 @@ use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;

use super::live::{LiveLoad, Pinned};
use crate::errors::GenericError;
use crate::live::Live;
use crate::nodes_config::Role;

use super::live::{LiveLoad, Pinned};

#[cfg(any(test, feature = "test-util"))]
enum TempOrPath {
Temp(tempfile::TempDir),
24 changes: 24 additions & 0 deletions crates/types/src/config/worker.rs
@@ -18,6 +18,7 @@ use tracing::warn;
use restate_serde_util::NonZeroByteCount;

use super::{CommonOptions, RocksDbOptions, RocksDbOptionsBuilder};
use crate::identifiers::PartitionId;
use crate::retries::RetryPolicy;

/// # Worker options
@@ -59,6 +60,8 @@ pub struct WorkerOptions {
/// The maximum number of commands a partition processor will apply in a batch. The larger this
/// value is, the higher the throughput and latency are.
max_command_batch_size: NonZeroUsize,

pub snapshots: SnapshotsOptions,
}

impl WorkerOptions {
@@ -93,6 +96,7 @@ impl Default for WorkerOptions {
storage: StorageOptions::default(),
invoker: Default::default(),
max_command_batch_size: NonZeroUsize::new(4).expect("Non zero number"),
snapshots: Default::default(),
}
}
}
@@ -341,3 +345,23 @@ impl Default for StorageOptions {
}
}
}

/// # Snapshot options
/// Configures the worker's partition store snapshot mechanism.
#[serde_as]
#[derive(Default, Debug, Clone, Serialize, Deserialize, derive_builder::Builder)]
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
#[cfg_attr(feature = "schemars", schemars(rename = "SnapshotsOptions", default))]
#[serde(rename_all = "kebab-case")]
#[builder(default)]
pub struct SnapshotsOptions {}

impl SnapshotsOptions {
pub fn snapshots_base_dir(&self) -> PathBuf {
super::data_dir("db-snapshots")
}

pub fn snapshots_dir(&self, partition_id: PartitionId) -> PathBuf {
self.snapshots_base_dir().join(partition_id.to_string())
}
}
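A small usage sketch of the resulting directory layout; opts: SnapshotsOptions and partition_id: PartitionId are assumed to be in scope, and <data-dir> stands for the node's configured data directory:

let base = opts.snapshots_base_dir(); // <data-dir>/db-snapshots
let dir = opts.snapshots_dir(partition_id); // <data-dir>/db-snapshots/<partition-id>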
88 changes: 69 additions & 19 deletions crates/worker/src/partition/mod.rs
@@ -13,18 +13,17 @@ use std::ops::RangeInclusive;
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::Context;
use anyhow::{anyhow, Context};
use assert2::let_assert;
use futures::{FutureExt, Stream, StreamExt, TryStreamExt as _};
use metrics::{counter, histogram};
use tokio::sync::{mpsc, watch};
use tokio::sync::{mpsc, oneshot, watch};
use tokio::time::MissedTickBehavior;
use tracing::{debug, error, info, instrument, trace, warn, Span};
use tracing::{debug, error, info, instrument, trace, warn, Instrument, Span};

use restate_bifrost::{Bifrost, FindTailAttributes};
use restate_core::cancellation_watcher;
use restate_core::metadata;
use restate_core::network::{Networking, TransportConnect};
use restate_core::{cancellation_watcher, metadata, TaskHandle, TaskKind};
use restate_partition_store::{PartitionStore, PartitionStoreTransaction};
use restate_storage_api::deduplication_table::{
DedupInformation, DedupSequenceNumber, DeduplicationTable, ProducerId,
@@ -34,9 +33,10 @@ use restate_storage_api::fsm_table::{FsmTable, ReadOnlyFsmTable};
use restate_storage_api::outbox_table::ReadOnlyOutboxTable;
use restate_storage_api::{invocation_status_table, StorageError, Transaction};
use restate_types::cluster::cluster_state::{PartitionProcessorStatus, ReplayStatus, RunMode};
use restate_types::config::WorkerOptions;
use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionKey};
use restate_types::config::{Configuration, WorkerOptions};
use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionKey, SnapshotId};
use restate_types::journal::raw::RawEntryCodec;
use restate_types::live::Live;
use restate_types::logs::MatchKeyQuery;
use restate_types::logs::{KeyFilter, LogId, Lsn, SequenceNumber};
use restate_types::time::MillisSinceEpoch;
@@ -50,21 +50,23 @@ use crate::metric_definitions::{
};
use crate::partition::invoker_storage_reader::InvokerStorageReader;
use crate::partition::leadership::{LeadershipState, PartitionProcessorMetadata};
use crate::partition::snapshot_producer::{SnapshotProducer, SnapshotSource};
use crate::partition::state_machine::{ActionCollector, StateMachine};

mod action_effect_handler;
mod cleaner;
pub mod invoker_storage_reader;
mod leadership;
pub mod shuffle;
mod snapshot_producer;
mod state_machine;
pub mod types;

/// Control messages from Manager to individual partition processor instances.
#[allow(dead_code)]
pub enum PartitionProcessorControlCommand {
RunForLeader(LeaderEpoch),
StepDown,
CreateSnapshot(Option<oneshot::Sender<anyhow::Result<SnapshotId>>>),
}

#[derive(Debug)]
@@ -123,6 +125,7 @@
networking: Networking<T>,
bifrost: Bifrost,
mut partition_store: PartitionStore,
configuration: Live<Configuration>,
) -> Result<PartitionProcessor<Codec, InvokerInputSender, T>, StorageError> {
let PartitionProcessorBuilder {
partition_id,
Expand Down Expand Up @@ -178,11 +181,13 @@ where
leadership_state,
state_machine,
max_command_batch_size,
partition_store: Some(partition_store),
partition_store,
bifrost,
configuration,
control_rx,
status_watch_tx,
status,
inflight_create_snapshot_task: None,
})
}

@@ -220,14 +225,13 @@ pub struct PartitionProcessor<Codec, InvokerSender, T> {
leadership_state: LeadershipState<InvokerSender, T>,
state_machine: StateMachine<Codec>,
bifrost: Bifrost,
configuration: Live<Configuration>,
control_rx: mpsc::Receiver<PartitionProcessorControlCommand>,
status_watch_tx: watch::Sender<PartitionProcessorStatus>,
status: PartitionProcessorStatus,

max_command_batch_size: usize,

// will be taken by the `run` method to decouple transactions from self
partition_store: Option<PartitionStore>,
partition_store: PartitionStore,
inflight_create_snapshot_task: Option<TaskHandle<anyhow::Result<()>>>,
}

impl<Codec, InvokerSender, T> PartitionProcessor<Codec, InvokerSender, T>
@@ -238,10 +242,7 @@ where
{
#[instrument(level = "error", skip_all, fields(partition_id = %self.partition_id, is_leader = tracing::field::Empty))]
pub async fn run(mut self) -> anyhow::Result<()> {
let mut partition_store = self
.partition_store
.take()
.expect("partition storage must be configured");
let mut partition_store = self.partition_store.clone();
let last_applied_lsn = partition_store.get_applied_lsn().await?;
let last_applied_lsn = last_applied_lsn.unwrap_or(Lsn::INVALID);

@@ -253,13 +254,19 @@ where
FindTailAttributes::default(),
)
.await?;
info!(
debug!(
last_applied_lsn = %last_applied_lsn,
current_log_tail = ?current_tail,
"PartitionProcessor creating log reader",
);
if current_tail.offset() == last_applied_lsn.next() {
self.status.replay_status = ReplayStatus::Active;
if self.status.replay_status != ReplayStatus::Active {
debug!(
?last_applied_lsn,
"Processor has caught up with the log tail."
);
self.status.replay_status = ReplayStatus::Active;
}
} else {
// catching up.
self.status.target_tail_lsn = Some(current_tail.offset());
@@ -448,6 +455,49 @@ where
.await
.context("failed handling StepDown command")?;
}
PartitionProcessorControlCommand::CreateSnapshot(maybe_sender) => {
if self
.inflight_create_snapshot_task
.as_ref()
.is_some_and(|task| !task.is_finished())
{
warn!("Snapshot creation already in progress, rejecting request");
maybe_sender
.and_then(|tx| tx.send(Err(anyhow!("Snapshot creation in progress"))).ok());
return Ok(());
}

let config = self.configuration.live_load();
let snapshot_source = SnapshotSource {
cluster_name: config.common.cluster_name().into(),
node_name: config.common.node_name().into(),
};
let snapshot_base_path = config.worker.snapshots.snapshots_dir(self.partition_id);
let partition_store = self.partition_store.clone();
let snapshot_span = tracing::info_span!("create-snapshot");
let inflight_create_snapshot_task = restate_core::task_center().spawn_unmanaged(
TaskKind::PartitionSnapshotProducer,
"create-snapshot",
Some(self.partition_id),
async move {
let result = SnapshotProducer::create(
snapshot_source,
partition_store,
snapshot_base_path,
)
.await;

if let Some(tx) = maybe_sender {
tx.send(result.map(|metadata| metadata.snapshot_id)).ok();
}
Ok(())
}
.instrument(snapshot_span),
)?;

self.inflight_create_snapshot_task
.replace(inflight_create_snapshot_task);
}
}

Ok(())
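The inflight_create_snapshot_task field above acts as a single-flight guard: at most one snapshot task runs per processor. A minimal standalone sketch of the same pattern, using a plain tokio JoinHandle in place of task_center's TaskHandle (illustrative only):

use tokio::task::JoinHandle;

struct SnapshotGuard {
    inflight: Option<JoinHandle<()>>,
}

impl SnapshotGuard {
    /// Starts a background job unless a previous one is still running.
    fn try_start(&mut self) -> bool {
        if self.inflight.as_ref().is_some_and(|task| !task.is_finished()) {
            return false; // previous snapshot still in flight; reject
        }
        self.inflight = Some(tokio::spawn(async {
            // ... produce and persist the snapshot ...
        }));
        true
    }
}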
76 changes: 76 additions & 0 deletions crates/worker/src/partition/snapshot_producer.rs
@@ -0,0 +1,76 @@
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::path::PathBuf;
use std::time::SystemTime;

use anyhow::bail;
use tracing::{info, warn};

use restate_partition_store::snapshots::{PartitionSnapshotMetadata, SnapshotFormatVersion};
use restate_partition_store::PartitionStore;
use restate_types::identifiers::SnapshotId;

/// Encapsulates producing a Restate partition snapshot out of a partition store.
pub struct SnapshotProducer {}

pub struct SnapshotSource {
pub cluster_name: String,
pub node_name: String,
}

impl SnapshotProducer {
pub async fn create(
snapshot_source: SnapshotSource,
mut partition_store: PartitionStore,
snapshot_base_path: PathBuf,
) -> anyhow::Result<PartitionSnapshotMetadata> {
let partition_snapshots_path =
snapshot_base_path.join(partition_store.partition_id().to_string());
if let Err(e) = tokio::fs::create_dir_all(&partition_snapshots_path).await {
warn!(
path = ?partition_snapshots_path,
error = ?e,
"Failed to create partition snapshot directory"
);
bail!("Failed to create partition snapshot directory: {:?}", e);
}

let snapshot_id = SnapshotId::new();
let snapshot_path = partition_snapshots_path.join(snapshot_id.to_string());
let snapshot = partition_store
.create_snapshot(snapshot_path.clone())
.await?;

let snapshot_meta = PartitionSnapshotMetadata {
version: SnapshotFormatVersion::V1,
cluster_name: snapshot_source.cluster_name,
node_name: snapshot_source.node_name,
partition_id: partition_store.partition_id(),
created_at: humantime::Timestamp::from(SystemTime::now()),
snapshot_id,
key_range: partition_store.partition_key_range().clone(),
min_applied_lsn: snapshot.min_applied_lsn,
db_comparator_name: snapshot.db_comparator_name.clone(),
files: snapshot.files.clone(),
};
let metadata_json = serde_json::to_string_pretty(&snapshot_meta)?;

let metadata_path = snapshot_path.join("metadata.json");
tokio::fs::write(metadata_path.clone(), metadata_json).await?;
info!(
lsn = %snapshot.min_applied_lsn,
metadata = ?metadata_path,
"Partition snapshot written"
);

Ok(snapshot_meta)
}
}
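A hedged sketch of driving the producer directly, mirroring the call site in partition/mod.rs; partition_store: PartitionStore is assumed to be in scope, and the names and paths are illustrative:

use std::path::PathBuf;

let metadata = SnapshotProducer::create(
    SnapshotSource {
        cluster_name: "my-cluster".to_owned(), // illustrative value
        node_name: "node-1".to_owned(),        // illustrative value
    },
    partition_store,
    PathBuf::from("restate-data/db-snapshots"),
)
.await?;
tracing::info!(snapshot_id = %metadata.snapshot_id, "snapshot complete");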
