From 7da0dc923c169b3284d95e6dd213edf54dfc7259 Mon Sep 17 00:00:00 2001
From: Pavel Tcholakov
Date: Wed, 25 Sep 2024 23:17:55 +0200
Subject: [PATCH] Expose a partition worker CreateSnapshot RPC

stack-info: PR: https://github.com/restatedev/restate/pull/1998, branch: pcholakov/stack/1
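
Adds an end-to-end path for requesting ad-hoc partition snapshots: a new
CreatePartitionSnapshot method on ClusterCtrlSvc, a cluster-internal
CreateSnapshotRequest RPC which the cluster controller forwards to the node
hosting the partition's effective leader, and a `restatectl snapshots create`
subcommand that drives the gRPC endpoint. Worker nodes handle the RPC in the
partition processor manager and respond with the resulting snapshot id.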
---
 crates/admin/protobuf/cluster_ctrl_svc.proto  |  10 ++
 .../admin/src/cluster_controller/service.rs   | 154 +++++++++++++++++-
 .../network_server/handler/cluster_ctrl.rs    |  34 +++-
 crates/types/protobuf/restate/common.proto    |   3 +
 crates/types/src/config/networking.rs         |   8 +
 .../src/net/partition_processor_manager.rs    |  24 ++-
 crates/worker/src/lib.rs                      |  12 +-
 .../worker/src/partition_processor_manager.rs |  74 ++++++++-
 tools/restatectl/src/app.rs                   |   4 +
 tools/restatectl/src/commands/mod.rs          |   1 +
 .../src/commands/snapshot/create_snapshot.rs  |  59 +++++++
 tools/restatectl/src/commands/snapshot/mod.rs |  19 +++
 12 files changed, 387 insertions(+), 15 deletions(-)
 create mode 100644 tools/restatectl/src/commands/snapshot/create_snapshot.rs
 create mode 100644 tools/restatectl/src/commands/snapshot/mod.rs

diff --git a/crates/admin/protobuf/cluster_ctrl_svc.proto b/crates/admin/protobuf/cluster_ctrl_svc.proto
index 444064fd4..0a7b99221 100644
--- a/crates/admin/protobuf/cluster_ctrl_svc.proto
+++ b/crates/admin/protobuf/cluster_ctrl_svc.proto
@@ -25,6 +25,8 @@ service ClusterCtrlSvc {
   rpc ListNodes(ListNodesRequest) returns (ListNodesResponse);
   rpc TrimLog(TrimLogRequest) returns (google.protobuf.Empty);
+
+  rpc CreatePartitionSnapshot(CreatePartitionSnapshotRequest) returns (CreatePartitionSnapshotResponse);
 }
 
 message ClusterStateRequest { }
@@ -68,3 +70,11 @@ message TrimLogRequest {
   uint32 log_id = 1;
   uint64 trim_point = 2;
 }
+
+message CreatePartitionSnapshotRequest {
+  uint32 partition_id = 1;
+}
+
+message CreatePartitionSnapshotResponse {
+  string snapshot_id = 1;
+}
diff --git a/crates/admin/src/cluster_controller/service.rs b/crates/admin/src/cluster_controller/service.rs
index 0e4783138..06a2a417d 100644
--- a/crates/admin/src/cluster_controller/service.rs
+++ b/crates/admin/src/cluster_controller/service.rs
@@ -12,6 +12,7 @@ use std::collections::BTreeMap;
 use std::pin::Pin;
 use std::sync::Arc;
 
+use anyhow::anyhow;
 use codederror::CodedError;
 use futures::future::OptionFuture;
 use futures::{Stream, StreamExt};
@@ -22,18 +23,22 @@ use tracing::{debug, info, warn};
 
 use restate_bifrost::{Bifrost, BifrostAdmin};
 use restate_core::metadata_store::MetadataStoreClient;
-use restate_core::network::{Incoming, MessageRouterBuilder, Networking, TransportConnect};
+use restate_core::network::rpc_router::RpcRouter;
+use restate_core::network::{
+    Incoming, MessageRouterBuilder, NetworkSender, Networking, TransportConnect,
+};
 use restate_core::{
     cancellation_watcher, Metadata, MetadataWriter, ShutdownError, TargetVersion, TaskCenter,
     TaskKind,
 };
 use restate_types::cluster::cluster_state::{AliveNode, ClusterState, NodeState};
-use restate_types::config::{AdminOptions, Configuration};
-use restate_types::identifiers::PartitionId;
+use restate_types::config::{AdminOptions, Configuration, NetworkingOptions};
+use restate_types::identifiers::{PartitionId, SnapshotId};
 use restate_types::live::Live;
 use restate_types::logs::{LogId, Lsn, SequenceNumber};
 use restate_types::net::cluster_controller::{AttachRequest, AttachResponse};
 use restate_types::net::metadata::MetadataKind;
+use restate_types::net::partition_processor_manager::CreateSnapshotRequest;
 use restate_types::{GenerationalNodeId, Version};
 
 use super::cluster_state::{ClusterStateRefresher, ClusterStateWatcher};
@@ -52,6 +57,7 @@ pub struct Service<T> {
     networking: Networking<T>,
     incoming_messages:
         Pin<Box<dyn Stream<Item = Incoming<AttachRequest>> + Send + Sync + 'static>>,
     cluster_state_refresher: ClusterStateRefresher<T>,
+    processor_manager_client: PartitionProcessorManagerClient<Networking<T>>,
     command_tx: mpsc::Sender<ClusterControllerCommand>,
     command_rx: mpsc::Receiver<ClusterControllerCommand>,
@@ -88,6 +94,12 @@ where
 
         let options = configuration.live_load();
 
+        let processor_manager_client = PartitionProcessorManagerClient::new(
+            networking.clone(),
+            options.networking.clone(),
+            router_builder,
+        );
+
         let heartbeat_interval = Self::create_heartbeat_interval(&options.admin);
         let (log_trim_interval, log_trim_threshold) =
             Self::create_log_trim_interval(&options.admin);
@@ -101,6 +113,7 @@ where
             cluster_state_refresher,
             metadata_writer,
             metadata_store_client,
+            processor_manager_client,
             command_tx,
             command_rx,
             heartbeat_interval,
@@ -132,6 +145,7 @@ where
     }
 }
 
+#[derive(Debug)]
 enum ClusterControllerCommand {
     GetClusterState(oneshot::Sender<Arc<ClusterState>>),
     TrimLog {
@@ -139,6 +153,10 @@ enum ClusterControllerCommand {
         trim_point: Lsn,
         response_tx: oneshot::Sender<anyhow::Result<()>>,
     },
+    CreateSnapshot {
+        partition_id: PartitionId,
+        response_tx: oneshot::Sender<anyhow::Result<SnapshotId>>,
+    },
 }
 
 pub struct ClusterControllerHandle {
@@ -174,6 +192,23 @@ impl ClusterControllerHandle {
         rx.await.map_err(|_| ShutdownError)
     }
+
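+    /// Requests an ad-hoc snapshot of the given partition. The outer `Result`
+    /// reports controller shutdown; the inner one carries the snapshot outcome.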
+    pub async fn create_partition_snapshot(
+        &self,
+        partition_id: PartitionId,
+    ) -> Result<anyhow::Result<SnapshotId>, ShutdownError> {
+        let (tx, rx) = oneshot::channel();
+
+        let _ = self
+            .tx
+            .send(ClusterControllerCommand::CreateSnapshot {
+                partition_id,
+                response_tx: tx,
+            })
+            .await;
+
+        rx.await.map_err(|_| ShutdownError)
+    }
 }
 
@@ -318,6 +353,61 @@ impl<T> Service<T>
         Ok(())
     }
 
+    /// Triggers a snapshot creation for the given partition by issuing an RPC
+    /// to the node hosting the active leader.
+    async fn create_partition_snapshot(
+        &self,
+        partition_id: PartitionId,
+        response_tx: oneshot::Sender<anyhow::Result<SnapshotId>>,
+    ) {
+        let cluster_state = self.cluster_state_refresher.get_cluster_state();
+
+        // For now, we just pick the leader node, since we know that every partition
+        // is likely to have one. In the future, the scheduling algorithm should be
+        // smarter about placing snapshot tasks, so that we avoid disrupting the
+        // leader when there are up-to-date followers.
+        let leader_node = cluster_state
+            .alive_nodes()
+            .filter_map(|node| {
+                node.partitions
+                    .get(&partition_id)
+                    .filter(|status| status.is_effective_leader())
+                    .map(|_| node)
+                    .cloned()
+            })
+            .next();
+
+        match leader_node {
+            Some(node) => {
+                debug!(
+                    node_id = %node.generational_node_id,
+                    ?partition_id,
+                    "Asking node to snapshot partition"
+                );
+
+                let node_rpc_client = self.processor_manager_client.clone();
+                let _ = self.task_center.spawn_child(
+                    TaskKind::Disposable,
+                    "create-snapshot-response",
+                    Some(partition_id),
+                    async move {
+                        let _ = response_tx.send(
+                            node_rpc_client
+                                .create_snapshot(node.generational_node_id, partition_id)
+                                .await,
+                        );
+                        Ok(())
+                    },
+                );
+            }
+
+            None => {
+                let _ = response_tx.send(Err(anyhow::anyhow!(
+                    "Cannot find a suitable node to take a snapshot of partition {partition_id}"
+                )));
+            }
+        };
+    }
+
     async fn on_cluster_cmd(
         &self,
         command: ClusterControllerCommand,
@@ -339,6 +429,14 @@ impl<T> Service<T>
             let result = bifrost_admin.trim(log_id, trim_point).await;
             let _ = response_tx.send(result.map_err(Into::into));
         }
+        ClusterControllerCommand::CreateSnapshot {
+            partition_id,
+            response_tx,
+        } => {
+            info!(?partition_id, "Create snapshot command received");
+            self.create_partition_snapshot(partition_id, response_tx)
+                .await;
+        }
     }
 }
 
@@ -408,6 +506,56 @@ async fn signal_all_partitions_started(
     }
 }
 
+#[derive(Clone)]
+struct PartitionProcessorManagerClient<N>
+where
+    N: Clone,
+{
+    network_sender: N,
+    networking_options: NetworkingOptions,
+    create_snapshot_router: RpcRouter<CreateSnapshotRequest>,
+}
+
+impl<N> PartitionProcessorManagerClient<N>
+where
+    N: NetworkSender + 'static,
+{
+    pub fn new(
+        network_sender: N,
+        networking_options: NetworkingOptions,
+        router_builder: &mut MessageRouterBuilder,
+    ) -> Self {
+        let create_snapshot_router = RpcRouter::new(router_builder);
+
+        PartitionProcessorManagerClient {
+            network_sender,
+            networking_options,
+            create_snapshot_router,
+        }
+    }
+
+    pub async fn create_snapshot(
+        &self,
+        node_id: GenerationalNodeId,
+        partition_id: PartitionId,
+    ) -> anyhow::Result<SnapshotId> {
+        let snapshot_timeout = self.networking_options.rpc_call_timeout.as_ref();
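+        // Bound the call with the configured RPC timeout. The as_millis/try_into
+        // round-trip converts the humantime wrapper into a std Duration and only
+        // fails for implausibly large configured values.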
+        let response = tokio::time::timeout(
+            time::Duration::from_millis(snapshot_timeout.as_millis().try_into()?),
+            self.create_snapshot_router.call(
+                &self.network_sender,
+                node_id,
+                CreateSnapshotRequest { partition_id },
+            ),
+        )
+        .await?;
+        let create_snapshot_response = response?.into_body();
+        create_snapshot_response
+            .result
+            .map_err(|e| anyhow!("Failed to create snapshot: {:?}", e))
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::Service;
diff --git a/crates/node/src/network_server/handler/cluster_ctrl.rs b/crates/node/src/network_server/handler/cluster_ctrl.rs
index 082cdd4c6..509d49935 100644
--- a/crates/node/src/network_server/handler/cluster_ctrl.rs
+++ b/crates/node/src/network_server/handler/cluster_ctrl.rs
@@ -14,12 +14,14 @@ use tracing::{debug, info};
 
 use restate_admin::cluster_controller::protobuf::cluster_ctrl_svc_server::ClusterCtrlSvc;
 use restate_admin::cluster_controller::protobuf::{
-    ClusterStateRequest, ClusterStateResponse, DescribeLogRequest, DescribeLogResponse,
-    ListLogsRequest, ListLogsResponse, ListNodesRequest, ListNodesResponse, TrimLogRequest,
+    ClusterStateRequest, ClusterStateResponse, CreatePartitionSnapshotRequest,
+    CreatePartitionSnapshotResponse, DescribeLogRequest, DescribeLogResponse, ListLogsRequest,
+    ListLogsResponse, ListNodesRequest, ListNodesResponse, TrimLogRequest,
 };
 use restate_admin::cluster_controller::ClusterControllerHandle;
 use restate_bifrost::{Bifrost, FindTailAttributes};
 use restate_metadata_store::MetadataStoreClient;
+use restate_types::identifiers::PartitionId;
 use restate_types::logs::metadata::Logs;
 use restate_types::logs::{LogId, Lsn};
 use restate_types::metadata_store::keys::{BIFROST_CONFIG_KEY, NODES_CONFIG_KEY};
@@ -158,6 +160,34 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler {
         }
         Ok(Response::new(()))
     }
+
+    /// Handles ad-hoc snapshot requests, as sent by `restatectl snapshots create`. This is
+    /// implemented as an RPC call within the cluster to a worker node hosting the partition.
+    async fn create_partition_snapshot(
+        &self,
+        request: Request<CreatePartitionSnapshotRequest>,
+    ) -> Result<Response<CreatePartitionSnapshotResponse>, Status> {
+        let request = request.into_inner();
+        let partition_id = PartitionId::from(
+            u16::try_from(request.partition_id).map_err(|_| {
+                Status::invalid_argument(format!(
+                    "Invalid partition id: {}",
+                    request.partition_id
+                ))
+            })?,
+        );
+
+        match self
+            .controller_handle
+            .create_partition_snapshot(partition_id)
+            .await
+            .map_err(|_| Status::aborted("Node is shutting down"))?
+        {
+            Err(err) => {
+                info!("Failed creating partition snapshot: {err}");
+                Err(Status::internal(err.to_string()))
+            }
+            Ok(snapshot_id) => Ok(Response::new(CreatePartitionSnapshotResponse {
+                snapshot_id: snapshot_id.to_string(),
+            })),
+        }
+    }
 }
 
 fn serialize_value<T>(value: T) -> Bytes {
diff --git a/crates/types/protobuf/restate/common.proto b/crates/types/protobuf/restate/common.proto
index 301b9c27a..cd06b3255 100644
--- a/crates/types/protobuf/restate/common.proto
+++ b/crates/types/protobuf/restate/common.proto
@@ -43,6 +43,7 @@ enum TargetName {
   GET_PROCESSORS_STATE_REQUEST = 7;
   PROCESSORS_STATE_RESPONSE = 8;
   CONTROL_PROCESSORS = 9;
+
   // LogServer
   LOG_SERVER_STORE = 10;
   LOG_SERVER_STORED = 11;
@@ -61,6 +62,8 @@ enum TargetName {
   REPLICATED_LOGLET_APPEND = 22;
   REPLICATED_LOGLET_APPENDED = 23;
+
+  PARTITION_CREATE_SNAPSHOT_REQUEST = 24;
+  PARTITION_CREATE_SNAPSHOT_RESPONSE = 25;
 }
 
 enum NodeStatus {
diff --git a/crates/types/src/config/networking.rs b/crates/types/src/config/networking.rs
index a256aa644..8afd829f3 100644
--- a/crates/types/src/config/networking.rs
+++ b/crates/types/src/config/networking.rs
@@ -41,6 +41,13 @@ pub struct NetworkingOptions {
     #[serde_as(as = "serde_with::DisplayFromStr")]
     #[cfg_attr(feature = "schemars", schemars(with = "String"))]
     pub handshake_timeout: humantime::Duration,
+
+    /// # Request timeout
+    ///
+    /// Default timeout for internal cluster RPC calls.
+    #[serde_as(as = "serde_with::DisplayFromStr")]
+    #[cfg_attr(feature = "schemars", schemars(with = "String"))]
+    pub rpc_call_timeout: humantime::Duration,
 }
 
 impl Default for NetworkingOptions {
@@ -55,6 +62,7 @@ impl Default for NetworkingOptions {
             outbound_queue_length: NonZeroUsize::new(1000).expect("Non zero number"),
             handshake_timeout: Duration::from_secs(3).into(),
+            rpc_call_timeout: Duration::from_secs(5).into(),
         }
     }
 }
diff --git a/crates/types/src/net/partition_processor_manager.rs b/crates/types/src/net/partition_processor_manager.rs
index ab1bdcc35..6b721ad7a 100644
--- a/crates/types/src/net/partition_processor_manager.rs
+++ b/crates/types/src/net/partition_processor_manager.rs
@@ -14,7 +14,7 @@ use serde::{Deserialize, Serialize};
 use serde_with::serde_as;
 
 use crate::cluster::cluster_state::{PartitionProcessorStatus, RunMode};
-use crate::identifiers::PartitionId;
+use crate::identifiers::{PartitionId, SnapshotId};
 use crate::net::{define_message, TargetName};
 
 use crate::net::define_rpc;
@@ -69,3 +69,25 @@ impl From<RunMode> for ProcessorCommand {
     }
 }
+
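+// Binds the request/response pair below to the TargetName entries added in
+// restate/common.proto, so the networking layer can route these RPC messages.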
+define_rpc! {
+    @request = CreateSnapshotRequest,
+    @response = CreateSnapshotResponse,
+    @request_target = TargetName::PartitionCreateSnapshotRequest,
+    @response_target = TargetName::PartitionCreateSnapshotResponse,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct CreateSnapshotRequest {
+    pub partition_id: PartitionId,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct CreateSnapshotResponse {
+    pub result: Result<SnapshotId, SnapshotError>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub enum SnapshotError {
+    SnapshotCreationFailed(String),
+}
diff --git a/crates/worker/src/lib.rs b/crates/worker/src/lib.rs
index 864076fad..78ebc2a27 100644
--- a/crates/worker/src/lib.rs
+++ b/crates/worker/src/lib.rs
@@ -119,11 +119,10 @@ impl Worker {
 
         // ingress_kafka
         let ingress_kafka = IngressKafkaService::new(ingress_dispatcher.clone());
-        let subscription_controller_handle =
-            subscription_integration::SubscriptionControllerHandle::new(
-                config.ingress.clone(),
-                ingress_kafka.create_command_sender(),
-            );
+        let subscription_controller_handle = SubscriptionControllerHandle::new(
+            config.ingress.clone(),
+            ingress_kafka.create_command_sender(),
+        );
 
         let partition_store_manager = PartitionStoreManager::create(
             updateable_config.clone().map(|c| &c.worker.storage),
@@ -154,6 +153,9 @@ impl Worker {
             bifrost,
         );
 
+        // handle RPCs
+        router_builder.add_message_handler(partition_processor_manager.message_handler());
+
         let storage_query_context = QueryContext::create(
             &config.admin.query_engine,
             partition_processor_manager.handle(),
diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs
index 637e89f8b..bb87645e8 100644
--- a/crates/worker/src/partition_processor_manager.rs
+++ b/crates/worker/src/partition_processor_manager.rs
@@ -28,9 +28,9 @@ use tracing::{debug, info, instrument, trace, warn};
 use restate_bifrost::Bifrost;
 use restate_core::network::rpc_router::{RpcError, RpcRouter};
 use restate_core::network::{Incoming, MessageRouterBuilder};
-use restate_core::network::{Networking, TransportConnect};
+use restate_core::network::{MessageHandler, Networking, TransportConnect};
 use restate_core::worker_api::{ProcessorsManagerCommand, ProcessorsManagerHandle};
-use restate_core::{cancellation_watcher, Metadata, ShutdownError, TaskId, TaskKind};
+use restate_core::{cancellation_watcher, task_center, Metadata, ShutdownError, TaskId, TaskKind};
 use restate_core::{RuntimeError, TaskCenter};
 use restate_invoker_api::StatusHandle;
 use restate_invoker_impl::Service as InvokerService;
@@ -52,9 +52,12 @@ use restate_types::logs::SequenceNumber;
 use restate_types::metadata_store::keys::partition_processor_epoch_key;
 use restate_types::net::cluster_controller::AttachRequest;
 use restate_types::net::cluster_controller::{Action, AttachResponse};
-use restate_types::net::partition_processor_manager::ProcessorsStateResponse;
 use restate_types::net::partition_processor_manager::{
-    ControlProcessor, ControlProcessors, GetProcessorsState, ProcessorCommand,
+    ControlProcessor, ControlProcessors, CreateSnapshotResponse, GetProcessorsState,
+    ProcessorCommand, SnapshotError,
+};
+use restate_types::net::partition_processor_manager::{
+    CreateSnapshotRequest, ProcessorsStateResponse,
 };
 use restate_types::partition_table::PartitionTable;
 use restate_types::schema::Schema;
@@ -251,6 +254,65 @@ impl PartitionProcessorHandle {
     }
 }
 
+/// RPC message handler for Partition Processor management operations.
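+///
+/// Currently handles [`CreateSnapshotRequest`] by forwarding it to the local
+/// partition processor manager, and replies with the outcome.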
+pub struct PartitionProcessorManagerMessageHandler {
+    processors_manager_handle: ProcessorsManagerHandle,
+}
+
+impl PartitionProcessorManagerMessageHandler {
+    fn new(
+        processors_manager_handle: ProcessorsManagerHandle,
+    ) -> PartitionProcessorManagerMessageHandler {
+        Self {
+            processors_manager_handle,
+        }
+    }
+}
+
+impl MessageHandler for PartitionProcessorManagerMessageHandler {
+    type MessageType = CreateSnapshotRequest;
+
+    async fn on_message(&self, msg: Incoming<Self::MessageType>) {
+        info!("Received '{:?}' from {}", msg.body(), msg.peer());
+
+        let processors_manager_handle = self.processors_manager_handle.clone();
+        let _ = task_center().spawn_child(
+            TaskKind::Disposable,
+            "create-snapshot-request-rpc",
+            None,
+            async move {
+                let result = processors_manager_handle
+                    .create_snapshot(msg.body().partition_id)
+                    .await;
+
+                match result {
+                    Ok(snapshot_id) => {
+                        msg.to_rpc_response(CreateSnapshotResponse {
+                            result: Ok(snapshot_id),
+                        })
+                        .send()
+                        .await
+                        .ok();
+                    }
+                    Err(error) => {
+                        msg.to_rpc_response(CreateSnapshotResponse {
+                            result: Err(SnapshotError::SnapshotCreationFailed(
+                                error.to_string(),
+                            )),
+                        })
+                        .send()
+                        .await
+                        .ok();
+                    }
+                }
+
+                Ok(())
+            },
+        );
+    }
+}
+
 type ChannelStatusReaderList = Vec<(RangeInclusive<PartitionId>, ChannelStatusReader)>;
 
 #[derive(Debug, Clone, Default)]
@@ -336,6 +398,10 @@ impl<T: TransportConnect> PartitionProcessorManager<T> {
         ProcessorsManagerHandle::new(self.tx.clone())
     }
 
+    pub(crate) fn message_handler(&self) -> PartitionProcessorManagerMessageHandler {
+        PartitionProcessorManagerMessageHandler::new(self.handle())
+    }
+
     async fn attach(&mut self) -> Result<Incoming<AttachResponse>, AttachError> {
         loop {
             // We try to get the admin node on every retry since it might change between retries.
diff --git a/tools/restatectl/src/app.rs b/tools/restatectl/src/app.rs
index f0133b282..2acf55a2e 100644
--- a/tools/restatectl/src/app.rs
+++ b/tools/restatectl/src/app.rs
@@ -19,6 +19,7 @@ use restate_cli_util::CommonOpts;
 use crate::commands::dump::Dump;
 use crate::commands::log::Log;
 use crate::commands::node::Node;
+use crate::commands::snapshot::Snapshot;
 
 #[derive(Run, Parser, Clone)]
 #[command(author, version = crate::build_info::version(), about, infer_subcommands = true)]
@@ -50,6 +51,9 @@ pub enum Command {
     /// Cluster node status
     #[clap(subcommand)]
     Nodes(Node),
+    /// Partition snapshots
+    #[clap(subcommand)]
+    Snapshots(Snapshot),
 }
 
 fn init(common_opts: &CommonOpts) {
diff --git a/tools/restatectl/src/commands/mod.rs b/tools/restatectl/src/commands/mod.rs
index ba9c3f91d..504f23ac1 100644
--- a/tools/restatectl/src/commands/mod.rs
+++ b/tools/restatectl/src/commands/mod.rs
@@ -12,3 +12,4 @@ mod display_util;
 pub mod dump;
 pub mod log;
 pub mod node;
+pub mod snapshot;
diff --git a/tools/restatectl/src/commands/snapshot/create_snapshot.rs b/tools/restatectl/src/commands/snapshot/create_snapshot.rs
new file mode 100644
index 000000000..0f3f1ae8d
--- /dev/null
+++ b/tools/restatectl/src/commands/snapshot/create_snapshot.rs
@@ -0,0 +1,59 @@
+// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
+// All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
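+
+//! Implements `restatectl snapshots create-snapshot` (alias `create`), which
+//! asks the cluster controller to snapshot a given partition.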
+
+use anyhow::Context;
+use cling::prelude::*;
+use tonic::codec::CompressionEncoding;
+
+use restate_admin::cluster_controller::protobuf::cluster_ctrl_svc_client::ClusterCtrlSvcClient;
+use restate_admin::cluster_controller::protobuf::CreatePartitionSnapshotRequest;
+use restate_cli_util::c_println;
+
+use crate::app::ConnectionInfo;
+use crate::util::grpc_connect;
+
+#[derive(Run, Parser, Collect, Clone, Debug)]
+#[clap(visible_alias = "create")]
+#[cling(run = "create_snapshot")]
+pub struct CreateSnapshotOpts {
+    /// The partition to snapshot
+    #[arg(short, long)]
+    partition_id: u16,
+}
+
+async fn create_snapshot(
+    connection: &ConnectionInfo,
+    opts: &CreateSnapshotOpts,
+) -> anyhow::Result<()> {
+    let channel = grpc_connect(connection.cluster_controller.clone())
+        .await
+        .with_context(|| {
+            format!(
+                "cannot connect to cluster controller at {}",
+                connection.cluster_controller
+            )
+        })?;
+    let mut client =
+        ClusterCtrlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip);
+
+    let request = CreatePartitionSnapshotRequest {
+        partition_id: opts.partition_id as u32,
+    };
+
+    let response = client
+        .create_partition_snapshot(request)
+        .await
+        .map_err(|e| anyhow::anyhow!("failed to request snapshot: {:?}", e))?
+        .into_inner();
+
+    c_println!("Snapshot created: {}", response.snapshot_id);
+
+    Ok(())
+}
diff --git a/tools/restatectl/src/commands/snapshot/mod.rs b/tools/restatectl/src/commands/snapshot/mod.rs
new file mode 100644
index 000000000..2ef179e6a
--- /dev/null
+++ b/tools/restatectl/src/commands/snapshot/mod.rs
@@ -0,0 +1,19 @@
+// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
+// All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+mod create_snapshot;
+
+use cling::prelude::*;
+
+#[derive(Run, Subcommand, Clone)]
+pub enum Snapshot {
+    /// Create a partition snapshot
+    CreateSnapshot(create_snapshot::CreateSnapshotOpts),
+}
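
Example interaction, assuming a cluster controller is reachable at the
configured address (the snapshot id shown is a placeholder; the real value is
generated by the worker that produced the snapshot):

    $ restatectl snapshots create --partition-id 0
    Snapshot created: <snapshot-id>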