From 9046c17e35e3dc84cf78ea59c3797fd06c166caa Mon Sep 17 00:00:00 2001
From: Pavel Tcholakov
Date: Sun, 22 Sep 2024 11:18:21 +0200
Subject: [PATCH] Introduce Partition Snapshot Producer

This component can create online partition snapshots at runtime.
---
 crates/core/src/task_center_types.rs          |  8 +-
 .../worker_api/partition_processor_manager.rs | 12 ++-
 crates/partition-store/src/snapshots.rs       |  2 +-
 crates/types/src/config/mod.rs                |  3 +-
 crates/types/src/config/worker.rs             | 24 +++++
 crates/worker/src/partition/mod.rs            | 88 +++++++++++++++----
 .../worker/src/partition/snapshot_producer.rs | 76 ++++++++++++++++
 .../worker/src/partition_processor_manager.rs | 26 +++++-
 8 files changed, 211 insertions(+), 28 deletions(-)
 create mode 100644 crates/worker/src/partition/snapshot_producer.rs

diff --git a/crates/core/src/task_center_types.rs b/crates/core/src/task_center_types.rs
index c4df2a138..b47d52607 100644
--- a/crates/core/src/task_center_types.rs
+++ b/crates/core/src/task_center_types.rs
@@ -63,7 +63,7 @@ pub enum TaskKind {
     /// Do not use. This is a special task kind that indicate that work is running within
     /// task_center but its lifecycle is not managed by it.
     InPlace,
-    /// Tasks used during system initialization. Short lived but will shutdown the node if they
+    /// Tasks used during system initialization. Short-lived but will shut down the node if they
     /// failed.
     #[strum(props(OnCancel = "abort"))]
     SystemBoot,
@@ -73,7 +73,7 @@ pub enum TaskKind {
     #[strum(props(OnCancel = "abort", OnError = "log"))]
     RpcConnection,
     /// A type for ingress until we start enforcing timeouts for inflight requests. This enables us
-    /// to shutdown cleanly without waiting indefinitely.
+    /// to shut down cleanly without waiting indefinitely.
     #[strum(props(OnCancel = "abort", runtime = "ingress"))]
     IngressServer,
     RoleRunner,
@@ -81,6 +81,10 @@ pub enum TaskKind {
     #[strum(props(OnCancel = "abort", runtime = "ingress"))]
     Ingress,
     PartitionProcessor,
+    /// Longer-running, low-priority tasks that are responsible for the export, and potentially
+    /// upload to remote storage, of partition store snapshots.
+    #[strum(props(OnCancel = "abort", OnError = "log"))]
+    PartitionSnapshotProducer,
     #[strum(props(OnError = "log"))]
     ConnectionReactor,
     Shuffle,
diff --git a/crates/core/src/worker_api/partition_processor_manager.rs b/crates/core/src/worker_api/partition_processor_manager.rs
index 5486de0a4..2db6d5aa9 100644
--- a/crates/core/src/worker_api/partition_processor_manager.rs
+++ b/crates/core/src/worker_api/partition_processor_manager.rs
@@ -8,14 +8,16 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
-use restate_types::identifiers::PartitionId;
 use tokio::sync::{mpsc, oneshot};
 
+use restate_types::identifiers::{PartitionId, SnapshotId};
+
 use crate::ShutdownError;
 
 #[derive(Debug)]
 pub enum ProcessorsManagerCommand {
     GetLivePartitions(oneshot::Sender<Vec<PartitionId>>),
+    CreateSnapshot(PartitionId, oneshot::Sender<anyhow::Result<SnapshotId>>),
 }
 
 #[derive(Debug, Clone)]
@@ -34,4 +36,12 @@ impl ProcessorsManagerHandle {
             .unwrap();
         rx.await.map_err(|_| ShutdownError)
     }
+
+    pub async fn create_snapshot(&self, partition_id: PartitionId) -> anyhow::Result<SnapshotId> {
+        let (tx, rx) = oneshot::channel();
+        self.0
+            .send(ProcessorsManagerCommand::CreateSnapshot(partition_id, tx))
+            .await?;
+        rx.await?
+    }
 }
diff --git a/crates/partition-store/src/snapshots.rs b/crates/partition-store/src/snapshots.rs
index 8b715446e..fe16dc2c7 100644
--- a/crates/partition-store/src/snapshots.rs
+++ b/crates/partition-store/src/snapshots.rs
@@ -17,7 +17,7 @@ pub enum SnapshotFormatVersion {
 
 /// A partition store snapshot.
 #[serde_as]
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct PartitionSnapshotMetadata {
     pub version: SnapshotFormatVersion,
 
diff --git a/crates/types/src/config/mod.rs b/crates/types/src/config/mod.rs
index 1552d2ba9..a497e7a18 100644
--- a/crates/types/src/config/mod.rs
+++ b/crates/types/src/config/mod.rs
@@ -51,12 +51,11 @@ use once_cell::sync::Lazy;
 use serde::{Deserialize, Serialize};
 use serde_with::serde_as;
 
+use super::live::{LiveLoad, Pinned};
 use crate::errors::GenericError;
 use crate::live::Live;
 use crate::nodes_config::Role;
 
-use super::live::{LiveLoad, Pinned};
-
 #[cfg(any(test, feature = "test-util"))]
 enum TempOrPath {
     Temp(tempfile::TempDir),
diff --git a/crates/types/src/config/worker.rs b/crates/types/src/config/worker.rs
index b8de434b7..84e348b48 100644
--- a/crates/types/src/config/worker.rs
+++ b/crates/types/src/config/worker.rs
@@ -18,6 +18,7 @@ use tracing::warn;
 use restate_serde_util::NonZeroByteCount;
 
 use super::{CommonOptions, RocksDbOptions, RocksDbOptionsBuilder};
+use crate::identifiers::PartitionId;
 use crate::retries::RetryPolicy;
 
 /// # Worker options
@@ -59,6 +60,8 @@ pub struct WorkerOptions {
     /// The maximum number of commands a partition processor will apply in a batch. The larger this
     /// value is, the higher the throughput and latency are.
     max_command_batch_size: NonZeroUsize,
+
+    pub snapshots: SnapshotsOptions,
 }
 
 impl WorkerOptions {
@@ -93,6 +96,7 @@ impl Default for WorkerOptions {
             storage: StorageOptions::default(),
             invoker: Default::default(),
             max_command_batch_size: NonZeroUsize::new(4).expect("Non zero number"),
+            snapshots: Default::default(),
         }
     }
 }
@@ -341,3 +345,23 @@ impl Default for StorageOptions {
         }
     }
 }
+
+/// # Snapshot options
+/// Configures the partition store snapshot mechanism.
+#[serde_as]
+#[derive(Default, Debug, Clone, Serialize, Deserialize, derive_builder::Builder)]
+#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
+#[cfg_attr(feature = "schemars", schemars(rename = "SnapshotsOptions", default))]
+#[serde(rename_all = "kebab-case")]
+#[builder(default)]
+pub struct SnapshotsOptions {}
+
+impl SnapshotsOptions {
+    pub fn snapshots_base_dir(&self) -> PathBuf {
+        super::data_dir("db-snapshots")
+    }
+
+    pub fn snapshots_dir(&self, partition_id: PartitionId) -> PathBuf {
+        self.snapshots_base_dir().join(partition_id.to_string())
+    }
+}
diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs
index 5cc464ac8..2cf10d100 100644
--- a/crates/worker/src/partition/mod.rs
+++ b/crates/worker/src/partition/mod.rs
@@ -13,18 +13,17 @@ use std::ops::RangeInclusive;
 use std::sync::Arc;
 use std::time::{Duration, Instant};
 
-use anyhow::Context;
+use anyhow::{anyhow, Context};
 use assert2::let_assert;
 use futures::{FutureExt, Stream, StreamExt, TryStreamExt as _};
 use metrics::{counter, histogram};
-use tokio::sync::{mpsc, watch};
+use tokio::sync::{mpsc, oneshot, watch};
 use tokio::time::MissedTickBehavior;
-use tracing::{debug, error, info, instrument, trace, warn, Span};
+use tracing::{debug, error, info, instrument, trace, warn, Instrument, Span};
 
 use restate_bifrost::{Bifrost, FindTailAttributes};
-use restate_core::cancellation_watcher;
-use restate_core::metadata;
 use restate_core::network::{Networking, TransportConnect};
+use restate_core::{cancellation_watcher, metadata, TaskHandle, TaskKind};
 use restate_partition_store::{PartitionStore, PartitionStoreTransaction};
 use restate_storage_api::deduplication_table::{
     DedupInformation, DedupSequenceNumber, DeduplicationTable, ProducerId,
@@ -34,9 +33,10 @@ use restate_storage_api::fsm_table::{FsmTable, ReadOnlyFsmTable};
 use restate_storage_api::outbox_table::ReadOnlyOutboxTable;
 use restate_storage_api::{invocation_status_table, StorageError, Transaction};
 use restate_types::cluster::cluster_state::{PartitionProcessorStatus, ReplayStatus, RunMode};
-use restate_types::config::WorkerOptions;
-use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionKey};
+use restate_types::config::{Configuration, WorkerOptions};
+use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionKey, SnapshotId};
 use restate_types::journal::raw::RawEntryCodec;
+use restate_types::live::Live;
 use restate_types::logs::MatchKeyQuery;
 use restate_types::logs::{KeyFilter, LogId, Lsn, SequenceNumber};
 use restate_types::time::MillisSinceEpoch;
@@ -50,6 +50,7 @@ use crate::metric_definitions::{
 };
 use crate::partition::invoker_storage_reader::InvokerStorageReader;
 use crate::partition::leadership::{LeadershipState, PartitionProcessorMetadata};
+use crate::partition::snapshot_producer::{SnapshotProducer, SnapshotSource};
 use crate::partition::state_machine::{ActionCollector, StateMachine};
 
 mod action_effect_handler;
@@ -57,14 +58,15 @@
 mod cleaner;
 pub mod invoker_storage_reader;
 mod leadership;
 pub mod shuffle;
+mod snapshot_producer;
 mod state_machine;
 pub mod types;
 
 /// Control messages from Manager to individual partition processor instances.
-#[allow(dead_code)]
 pub enum PartitionProcessorControlCommand {
     RunForLeader(LeaderEpoch),
     StepDown,
+    CreateSnapshot(Option<oneshot::Sender<anyhow::Result<SnapshotId>>>),
 }
 
 #[derive(Debug)]
@@ -123,6 +125,7 @@ where
         networking: Networking,
         bifrost: Bifrost,
         mut partition_store: PartitionStore,
+        configuration: Live<Configuration>,
     ) -> Result, StorageError> {
         let PartitionProcessorBuilder {
             partition_id,
@@ -178,11 +181,13 @@
             leadership_state,
             state_machine,
             max_command_batch_size,
-            partition_store: Some(partition_store),
+            partition_store,
             bifrost,
+            configuration,
             control_rx,
             status_watch_tx,
             status,
+            inflight_create_snapshot_task: None,
         })
     }
 
@@ -220,14 +225,13 @@ pub struct PartitionProcessor {
     leadership_state: LeadershipState,
     state_machine: StateMachine,
     bifrost: Bifrost,
+    configuration: Live<Configuration>,
     control_rx: mpsc::Receiver<PartitionProcessorControlCommand>,
     status_watch_tx: watch::Sender<PartitionProcessorStatus>,
     status: PartitionProcessorStatus,
-
     max_command_batch_size: usize,
-
-    // will be taken by the `run` method to decouple transactions from self
-    partition_store: Option<PartitionStore>,
+    partition_store: PartitionStore,
+    inflight_create_snapshot_task: Option<TaskHandle<anyhow::Result<()>>>,
 }
 
 impl PartitionProcessor
 where
 {
     #[instrument(level = "error", skip_all, fields(partition_id = %self.partition_id, is_leader = tracing::field::Empty))]
     pub async fn run(mut self) -> anyhow::Result<()> {
-        let mut partition_store = self
-            .partition_store
-            .take()
-            .expect("partition storage must be configured");
+        let mut partition_store = self.partition_store.clone();
         let last_applied_lsn = partition_store.get_applied_lsn().await?;
         let last_applied_lsn = last_applied_lsn.unwrap_or(Lsn::INVALID);
                 FindTailAttributes::default(),
             )
             .await?;
-        info!(
+        debug!(
             last_applied_lsn = %last_applied_lsn,
             current_log_tail = ?current_tail,
             "PartitionProcessor creating log reader",
         );
         if current_tail.offset() == last_applied_lsn.next() {
-            self.status.replay_status = ReplayStatus::Active;
+            if self.status.replay_status != ReplayStatus::Active {
+                debug!(
+                    ?last_applied_lsn,
+                    "Processor has caught up with the log tail."
+                );
+                self.status.replay_status = ReplayStatus::Active;
+            }
         } else {
             // catching up.
             self.status.target_tail_lsn = Some(current_tail.offset());
@@ -448,6 +455,49 @@
                     .await
                     .context("failed handling StepDown command")?;
             }
+            PartitionProcessorControlCommand::CreateSnapshot(maybe_sender) => {
+                if self
+                    .inflight_create_snapshot_task
+                    .as_ref()
+                    .is_some_and(|task| !task.is_finished())
+                {
+                    warn!("Snapshot creation already in progress, rejecting request");
+                    maybe_sender
+                        .and_then(|tx| tx.send(Err(anyhow!("Snapshot creation in progress"))).ok());
+                    return Ok(());
+                }
+
+                let config = self.configuration.live_load();
+                let snapshot_source = SnapshotSource {
+                    cluster_name: config.common.cluster_name().into(),
+                    node_name: config.common.node_name().into(),
+                };
+                let snapshot_base_path = config.worker.snapshots.snapshots_base_dir();
+                let partition_store = self.partition_store.clone();
+                let snapshot_span = tracing::info_span!("create-snapshot");
+                let inflight_create_snapshot_task = restate_core::task_center().spawn_unmanaged(
+                    TaskKind::PartitionSnapshotProducer,
+                    "create-snapshot",
+                    Some(self.partition_id),
+                    async move {
+                        let result = SnapshotProducer::create(
+                            snapshot_source,
+                            partition_store,
+                            snapshot_base_path,
+                        )
+                        .await;
+
+                        if let Some(tx) = maybe_sender {
+                            tx.send(result.map(|metadata| metadata.snapshot_id)).ok();
+                        }
+                        Ok(())
+                    }
+                    .instrument(snapshot_span),
+                )?;
+
+                self.inflight_create_snapshot_task
+                    .replace(inflight_create_snapshot_task);
+            }
         }
 
         Ok(())
diff --git a/crates/worker/src/partition/snapshot_producer.rs b/crates/worker/src/partition/snapshot_producer.rs
new file mode 100644
index 000000000..1b4b9f269
--- /dev/null
+++ b/crates/worker/src/partition/snapshot_producer.rs
@@ -0,0 +1,76 @@
+// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH.
+// All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+use std::path::PathBuf;
+use std::time::SystemTime;
+
+use anyhow::bail;
+use tracing::{info, warn};
+
+use restate_partition_store::snapshots::{PartitionSnapshotMetadata, SnapshotFormatVersion};
+use restate_partition_store::PartitionStore;
+use restate_types::identifiers::SnapshotId;
+
+/// Encapsulates producing a Restate partition snapshot out of a partition store.
+pub struct SnapshotProducer {}
+
+pub struct SnapshotSource {
+    pub cluster_name: String,
+    pub node_name: String,
+}
+
+impl SnapshotProducer {
+    pub async fn create(
+        snapshot_source: SnapshotSource,
+        mut partition_store: PartitionStore,
+        snapshot_base_path: PathBuf,
+    ) -> anyhow::Result<PartitionSnapshotMetadata> {
+        let partition_snapshots_path =
+            snapshot_base_path.join(partition_store.partition_id().to_string());
+        if let Err(e) = tokio::fs::create_dir_all(&partition_snapshots_path).await {
+            warn!(
+                path = ?partition_snapshots_path,
+                error = ?e,
+                "Failed to create partition snapshot directory"
+            );
+            bail!("Failed to create partition snapshot directory: {:?}", e);
+        }
+
+        let snapshot_id = SnapshotId::new();
+        let snapshot_path = partition_snapshots_path.join(snapshot_id.to_string());
+        let snapshot = partition_store
+            .create_snapshot(snapshot_path.clone())
+            .await?;
+
+        let snapshot_meta = PartitionSnapshotMetadata {
+            version: SnapshotFormatVersion::V1,
+            cluster_name: snapshot_source.cluster_name,
+            node_name: snapshot_source.node_name,
+            partition_id: partition_store.partition_id(),
+            created_at: humantime::Timestamp::from(SystemTime::now()),
+            snapshot_id,
+            key_range: partition_store.partition_key_range().clone(),
+            min_applied_lsn: snapshot.min_applied_lsn,
+            db_comparator_name: snapshot.db_comparator_name.clone(),
+            files: snapshot.files.clone(),
+        };
+        let metadata_json = serde_json::to_string_pretty(&snapshot_meta)?;
+
+        let metadata_path = snapshot_path.join("metadata.json");
+        tokio::fs::write(metadata_path.clone(), metadata_json).await?;
+        info!(
+            lsn = %snapshot.min_applied_lsn,
+            metadata = ?metadata_path,
+            "Partition snapshot written"
+        );
+
+        Ok(snapshot_meta)
+    }
+}
diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs
index e78de6a2c..637e89f8b 100644
--- a/crates/worker/src/partition_processor_manager.rs
+++ b/crates/worker/src/partition_processor_manager.rs
@@ -20,7 +20,7 @@ use futures::stream::StreamExt;
 use futures::Stream;
 use metrics::gauge;
 use tokio::sync::mpsc::error::TrySendError;
-use tokio::sync::{mpsc, watch};
+use tokio::sync::{mpsc, oneshot, watch};
 use tokio::time;
 use tokio::time::MissedTickBehavior;
 use tracing::{debug, info, instrument, trace, warn};
@@ -44,7 +44,7 @@ use restate_types::cluster::cluster_state::ReplayStatus;
 use restate_types::cluster::cluster_state::{PartitionProcessorStatus, RunMode};
 use restate_types::config::{Configuration, StorageOptions};
 use restate_types::epoch::EpochMetadata;
-use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionKey};
+use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionKey, SnapshotId};
 use restate_types::live::Live;
 use restate_types::live::LiveLoad;
 use restate_types::logs::Lsn;
@@ -240,6 +240,15 @@ impl PartitionProcessorHandle {
             .try_send(PartitionProcessorControlCommand::RunForLeader(leader_epoch))?;
         Ok(())
     }
+
+    fn create_snapshot(
+        &self,
+        sender: Option<oneshot::Sender<anyhow::Result<SnapshotId>>>,
+    ) -> Result<(), PartitionProcessorHandleError> {
+        self.control_tx
+            .try_send(PartitionProcessorControlCommand::CreateSnapshot(sender))?;
+        Ok(())
+    }
 }
 
 type ChannelStatusReaderList = Vec<(RangeInclusive<PartitionKey>, ChannelStatusReader)>;
@@ -497,6 +506,11 @@ impl PartitionProcessorManager {
                 let live_partitions = self.running_partition_processors.keys().cloned().collect();
                 let _ = sender.send(live_partitions);
             }
+            CreateSnapshot(partition_id, sender) => {
+                self.running_partition_processors
+                    .get(&partition_id)
+                    .map(|processor| processor.handle.create_snapshot(Some(sender)));
+            }
         }
     }
@@ -688,6 +702,7 @@ impl PartitionProcessorManager {
         let invoker_name = Box::leak(Box::new(format!("invoker-{}", partition_id)));
         let invoker_config = self.updateable_config.clone().map(|c| &c.worker.invoker);
+        let configuration = self.updateable_config.clone();
 
         let maybe_task_id: Result = self.task_center.start_runtime(
             TaskKind::PartitionProcessor,
@@ -715,7 +730,12 @@ impl PartitionProcessorManager {
                     )?;
 
                     pp_builder
-                        .build::(networking, bifrost, partition_store)
+                        .build::(
+                            networking,
+                            bifrost,
+                            partition_store,
+                            configuration,
+                        )
                         .await?
                        .run()
                        .await
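
---
A minimal sketch of how the new API is intended to be driven end to end, for review context. Only `ProcessorsManagerHandle::create_snapshot` and the on-disk layout come from this patch; the `restate_core::worker_api` import path, the `trigger_snapshot` helper, and the plumbing that obtains the handle are assumptions.

    // Hypothetical admin-side caller; `handle` is the worker's ProcessorsManagerHandle.
    use restate_core::worker_api::ProcessorsManagerHandle;
    use restate_types::identifiers::PartitionId;

    async fn trigger_snapshot(
        handle: &ProcessorsManagerHandle,
        partition_id: PartitionId,
    ) -> anyhow::Result<()> {
        // Sends ProcessorsManagerCommand::CreateSnapshot to the manager, which forwards
        // PartitionProcessorControlCommand::CreateSnapshot to the target processor; the
        // oneshot resolves once the snapshot task has written metadata.json.
        let snapshot_id = handle.create_snapshot(partition_id).await?;
        // The snapshot now lives under <data-dir>/db-snapshots/<partition_id>/<snapshot_id>/.
        println!("partition {partition_id} snapshot: {snapshot_id}");
        Ok(())
    }

Note that a second CreateSnapshot request for a partition while one is still in flight is rejected with an error rather than queued, so callers should be prepared to retry.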