Expose a partition worker CreateSnapshot RPC
pcholakov committed Sep 25, 2024
1 parent 97014a8 commit 33b351d
Showing 12 changed files with 368 additions and 14 deletions.
10 changes: 10 additions & 0 deletions crates/admin/protobuf/cluster_ctrl_svc.proto
@@ -25,6 +25,8 @@ service ClusterCtrlSvc {
rpc ListNodes(ListNodesRequest) returns (ListNodesResponse);

rpc TrimLog(TrimLogRequest) returns (google.protobuf.Empty);

rpc CreatePartitionSnapshot(CreatePartitionSnapshotRequest) returns (CreatePartitionSnapshotResponse);
}

message ClusterStateRequest { }
@@ -68,3 +70,11 @@ message TrimLogRequest {
uint32 log_id = 1;
uint64 trim_point = 2;
}

message CreatePartitionSnapshotRequest {
uint32 partition_id = 1;
}

message CreatePartitionSnapshotResponse {
string snapshot_id = 1;
}
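
The new RPC is a plain unary call, so any client generated from this proto file can drive it. Below is a minimal caller sketch, assuming the tonic-generated client module is exposed alongside the server stubs in `restate_admin::cluster_controller::protobuf` and that the address is the node's advertised address — both assumptions for illustration, not part of this change:

```rust
use restate_admin::cluster_controller::protobuf::cluster_ctrl_svc_client::ClusterCtrlSvcClient;
use restate_admin::cluster_controller::protobuf::CreatePartitionSnapshotRequest;

async fn trigger_snapshot() -> anyhow::Result<()> {
    // Address of the node serving ClusterCtrlSvc; illustrative only.
    let mut client = ClusterCtrlSvcClient::connect("http://127.0.0.1:5122").await?;
    let response = client
        .create_partition_snapshot(CreatePartitionSnapshotRequest { partition_id: 0 })
        .await?
        .into_inner();
    println!("created snapshot: {}", response.snapshot_id);
    Ok(())
}
```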
154 changes: 151 additions & 3 deletions crates/admin/src/cluster_controller/service.rs
@@ -10,26 +10,31 @@

use std::collections::BTreeMap;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;

use codederror::CodedError;
use futures::future::OptionFuture;
use futures::{Stream, StreamExt};
use restate_core::network::rpc_router::RpcRouter;
use restate_types::net::partition_processor_manager::CreateSnapshotRequest;
use tokio::sync::{mpsc, oneshot};
use tokio::time;
use tokio::time::{Instant, Interval, MissedTickBehavior};
use tracing::{debug, info, warn};

use restate_bifrost::{Bifrost, BifrostAdmin};
use restate_core::metadata_store::MetadataStoreClient;
use restate_core::network::{Incoming, MessageRouterBuilder, Networking, TransportConnect};
use restate_core::network::{
Incoming, MessageRouterBuilder, NetworkSender, Networking, TransportConnect,
};
use restate_core::{
cancellation_watcher, Metadata, MetadataWriter, ShutdownError, TargetVersion, TaskCenter,
TaskKind,
};
use restate_types::cluster::cluster_state::{AliveNode, ClusterState, NodeState};
use restate_types::config::{AdminOptions, Configuration};
use restate_types::identifiers::PartitionId;
use restate_types::config::{AdminOptions, Configuration, NetworkingOptions};
use restate_types::identifiers::{PartitionId, SnapshotId};
use restate_types::live::Live;
use restate_types::logs::{LogId, Lsn, SequenceNumber};
use restate_types::net::cluster_controller::{AttachRequest, AttachResponse};
@@ -52,6 +57,7 @@ pub struct Service<T> {
networking: Networking<T>,
incoming_messages: Pin<Box<dyn Stream<Item = Incoming<AttachRequest>> + Send + Sync + 'static>>,
cluster_state_refresher: ClusterStateRefresher<T>,
processor_manager_client: PartitionProcessorManagerClient<Networking<T>>,
command_tx: mpsc::Sender<ClusterControllerCommand>,
command_rx: mpsc::Receiver<ClusterControllerCommand>,

@@ -88,6 +94,12 @@

let options = configuration.live_load();

let processor_manager_client = PartitionProcessorManagerClient::new(
networking.clone(),
options.networking.clone(),
router_builder,
);

let heartbeat_interval = Self::create_heartbeat_interval(&options.admin);
let (log_trim_interval, log_trim_threshold) =
Self::create_log_trim_interval(&options.admin);
@@ -101,6 +113,7 @@
cluster_state_refresher,
metadata_writer,
metadata_store_client,
processor_manager_client,
command_tx,
command_rx,
heartbeat_interval,
@@ -132,13 +145,18 @@
}
}

#[derive(Debug)]
enum ClusterControllerCommand {
GetClusterState(oneshot::Sender<Arc<ClusterState>>),
TrimLog {
log_id: LogId,
trim_point: Lsn,
response_tx: oneshot::Sender<anyhow::Result<()>>,
},
CreateSnapshot {
partition_id: PartitionId,
response_tx: oneshot::Sender<anyhow::Result<SnapshotId>>,
},
}

pub struct ClusterControllerHandle {
@@ -174,6 +192,23 @@ impl ClusterControllerHandle {

rx.await.map_err(|_| ShutdownError)
}

pub async fn create_partition_snapshot(
&self,
partition_id: PartitionId,
) -> Result<Result<SnapshotId, anyhow::Error>, ShutdownError> {
let (tx, rx) = oneshot::channel();

let _ = self
.tx
.send(ClusterControllerCommand::CreateSnapshot {
partition_id,
response_tx: tx,
})
.await;

rx.await.map_err(|_| ShutdownError)
}
}

impl<T: TransportConnect> Service<T> {
@@ -318,6 +353,61 @@ impl<T: TransportConnect> Service<T> {
Ok(())
}

/// Triggers a snapshot creation for the given partition by issuing an RPC
/// to the node hosting the active leader.
async fn create_partition_snapshot(
&self,
partition_id: PartitionId,
response_tx: oneshot::Sender<anyhow::Result<SnapshotId>>,
) {
let cluster_state = self.cluster_state_refresher.get_cluster_state();

// For now, we just pick the leader node since we know that every partition is likely to
// have one. We'll want to update the algorithm to be smart about scheduling snapshot tasks
// in the future to avoid disrupting the leader when there are up-to-date followers.
let leader_node = cluster_state
.alive_nodes()
.filter_map(|node| {
node.partitions
.get(&partition_id)
.filter(|status| status.is_effective_leader())
.map(|_| node)
.cloned()
})
.next();

match leader_node {
Some(node) => {
info!(
node_id = ?node.generational_node_id,
?partition_id,
"Asking node to snapshot partition"
);

let node_rpc_client = self.processor_manager_client.clone();
let _ = self.task_center.spawn_child(
TaskKind::Disposable,
"create-snapshot-response",
Some(partition_id),
async move {
let _ = response_tx.send(
node_rpc_client
.create_snapshot(node.generational_node_id, partition_id)
.await,
);
Ok(())
},
);
}

None => {
let _ = response_tx.send(Err(anyhow::anyhow!(
"Can not find a suitable node to take snapshot of partition {partition_id}"
)));
}
};
}

async fn on_cluster_cmd(
&self,
command: ClusterControllerCommand,
@@ -339,6 +429,14 @@ impl<T: TransportConnect> Service<T> {
let result = bifrost_admin.trim(log_id, trim_point).await;
let _ = response_tx.send(result.map_err(Into::into));
}
ClusterControllerCommand::CreateSnapshot {
partition_id,
response_tx,
} => {
info!(?partition_id, "Create snapshot command received");
self.create_partition_snapshot(partition_id, response_tx)
.await;
}
}
}

@@ -408,6 +506,56 @@ async fn signal_all_partitions_started(
}
}

#[derive(Clone)]
struct PartitionProcessorManagerClient<N>
where
N: Clone,
{
network_sender: N,
networking_options: NetworkingOptions,
create_snapshot_router: RpcRouter<CreateSnapshotRequest>,
}

impl<N> PartitionProcessorManagerClient<N>
where
N: NetworkSender + 'static,
{
pub fn new(
network_sender: N,
networking_options: NetworkingOptions,
router_builder: &mut MessageRouterBuilder,
) -> Self {
let create_snapshot_router = RpcRouter::new(router_builder);

PartitionProcessorManagerClient {
network_sender,
networking_options,
create_snapshot_router,
}
}

pub async fn create_snapshot(
&self,
node_id: GenerationalNodeId,
partition_id: PartitionId,
) -> anyhow::Result<SnapshotId> {
let snapshot_timeout = self.networking_options.rpc_call_timeout.as_ref();
let response = tokio::time::timeout(
time::Duration::from_millis(snapshot_timeout.as_millis().try_into()?),
self.create_snapshot_router.call(
&self.network_sender,
node_id,
CreateSnapshotRequest { partition_id },
),
)
.await?;
let create_snapshot_response = response?.into_body();
Ok(SnapshotId::from_str(
create_snapshot_response.snapshot_id.as_str(),
)?)
}
}

#[cfg(test)]
mod tests {
use super::Service;
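Note that `ClusterControllerHandle::create_partition_snapshot` returns two layers of `Result`: the outer one reports controller shutdown, the inner one the snapshot outcome. The gRPC handler in the next file unpacks both; a hedged caller sketch that flattens them into a single `anyhow::Result` (the helper function itself is illustrative, not part of this commit) could look like this:

```rust
use restate_admin::cluster_controller::ClusterControllerHandle;
use restate_core::ShutdownError;
use restate_types::identifiers::{PartitionId, SnapshotId};

// Illustrative helper: distinguish "controller is gone" from "snapshot failed".
async fn snapshot_partition(
    handle: &ClusterControllerHandle,
    partition_id: PartitionId,
) -> anyhow::Result<SnapshotId> {
    match handle.create_partition_snapshot(partition_id).await {
        Err(ShutdownError) => anyhow::bail!("cluster controller is shutting down"),
        Ok(Err(err)) => Err(err.context("snapshot request failed")),
        Ok(Ok(snapshot_id)) => Ok(snapshot_id),
    }
}
```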
34 changes: 32 additions & 2 deletions crates/node/src/network_server/handler/cluster_ctrl.rs
@@ -14,12 +14,14 @@ use tracing::{debug, info};

use restate_admin::cluster_controller::protobuf::cluster_ctrl_svc_server::ClusterCtrlSvc;
use restate_admin::cluster_controller::protobuf::{
ClusterStateRequest, ClusterStateResponse, DescribeLogRequest, DescribeLogResponse,
ListLogsRequest, ListLogsResponse, ListNodesRequest, ListNodesResponse, TrimLogRequest,
ClusterStateRequest, ClusterStateResponse, CreatePartitionSnapshotRequest,
CreatePartitionSnapshotResponse, DescribeLogRequest, DescribeLogResponse, ListLogsRequest,
ListLogsResponse, ListNodesRequest, ListNodesResponse, TrimLogRequest,
};
use restate_admin::cluster_controller::ClusterControllerHandle;
use restate_bifrost::{Bifrost, FindTailAttributes};
use restate_metadata_store::MetadataStoreClient;
use restate_types::identifiers::PartitionId;
use restate_types::logs::metadata::Logs;
use restate_types::logs::{LogId, Lsn};
use restate_types::metadata_store::keys::{BIFROST_CONFIG_KEY, NODES_CONFIG_KEY};
@@ -158,6 +160,34 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler {
}
Ok(Response::new(()))
}

/// Handles ad-hoc snapshot requests, as sent by `restatectl snapshots create`. This is
/// implemented as an RPC call within the cluster to a worker node hosting the partition.
async fn create_partition_snapshot(
&self,
request: Request<CreatePartitionSnapshotRequest>,
) -> Result<Response<CreatePartitionSnapshotResponse>, Status> {
let request = request.into_inner();
let partition_id = PartitionId::from(
u16::try_from(request.partition_id)
.map_err(|_| Status::invalid_argument(format!("Invalid partition id: {}", request.partition_id)))?,
);

match self
.controller_handle
.create_partition_snapshot(partition_id)
.await
.map_err(|_| Status::aborted("Node is shutting down"))?
{
Err(err) => {
info!("Failed creating partition snapshot: {err}");
Err(Status::internal(err.to_string()))
}
Ok(snapshot_id) => Ok(Response::new(CreatePartitionSnapshotResponse {
snapshot_id: snapshot_id.to_string(),
})),
}
}
}

fn serialize_value<T: StorageEncode>(value: T) -> Bytes {
3 changes: 3 additions & 0 deletions crates/types/protobuf/restate/common.proto
@@ -43,6 +43,7 @@ enum TargetName {
GET_PROCESSORS_STATE_REQUEST = 7;
PROCESSORS_STATE_RESPONSE = 8;
CONTROL_PROCESSORS = 9;

// LogServer
LOG_SERVER_STORE = 10;
LOG_SERVER_STORED = 11;
@@ -61,6 +62,8 @@ enum TargetName {
REPLICATED_LOGLET_APPEND = 22;
REPLICATED_LOGLET_APPENDED = 23;

PARTITION_CREATE_SNAPSHOT_REQUEST = 24;
PARTITION_CREATE_SNAPSHOT_RESPONSE = 25;
}

enum NodeStatus {
8 changes: 8 additions & 0 deletions crates/types/src/config/networking.rs
@@ -41,6 +41,13 @@ pub struct NetworkingOptions {
#[serde_as(as = "serde_with::DisplayFromStr")]
#[cfg_attr(feature = "schemars", schemars(with = "String"))]
pub handshake_timeout: humantime::Duration,

/// # Request timeout
///
/// Default timeout for internal cluster RPC calls.
#[serde_as(as = "serde_with::DisplayFromStr")]
#[cfg_attr(feature = "schemars", schemars(with = "String"))]
pub rpc_call_timeout: humantime::Duration,
}

impl Default for NetworkingOptions {
@@ -55,6 +62,7 @@

outbound_queue_length: NonZeroUsize::new(1000).expect("Non zero number"),
handshake_timeout: Duration::from_secs(3).into(),
rpc_call_timeout: Duration::from_secs(5).into(),
}
}
}
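
Like `handshake_timeout` above, the new `rpc_call_timeout` is (de)serialized through `serde_with::DisplayFromStr`, so configuration carries a human-readable duration string rather than a raw number. A small, self-contained sketch of that behaviour follows; the `TimeoutConfig` struct and JSON input are illustrative stand-ins, not the crate's actual configuration format:

```rust
use serde::Deserialize;
use serde_with::serde_as;

// Illustrative mirror of the new field; only the serde attributes match the real struct.
#[serde_as]
#[derive(Debug, Deserialize)]
struct TimeoutConfig {
    // Parsed from a human-readable string such as "5s" or "750ms".
    #[serde_as(as = "serde_with::DisplayFromStr")]
    rpc_call_timeout: humantime::Duration,
}

fn main() -> anyhow::Result<()> {
    let cfg: TimeoutConfig = serde_json::from_str(r#"{"rpc_call_timeout": "5s"}"#)?;
    // Display round-trips the parsed value back to its human-readable form.
    assert_eq!(cfg.rpc_call_timeout.to_string(), "5s");
    Ok(())
}
```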
17 changes: 17 additions & 0 deletions crates/types/src/net/partition_processor_manager.rs
@@ -69,3 +69,20 @@ impl From<RunMode> for ProcessorCommand {
}
}
}

define_rpc! {
@request = CreateSnapshotRequest,
@response = CreateSnapshotResponse,
@request_target = TargetName::PartitionCreateSnapshotRequest,
@response_target = TargetName::PartitionCreateSnapshotResponse,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreateSnapshotRequest {
pub partition_id: PartitionId,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreateSnapshotResponse {
pub snapshot_id: String,
}
12 changes: 7 additions & 5 deletions crates/worker/src/lib.rs
@@ -119,11 +119,10 @@ impl<T: TransportConnect> Worker<T> {

// ingress_kafka
let ingress_kafka = IngressKafkaService::new(ingress_dispatcher.clone());
let subscription_controller_handle =
subscription_integration::SubscriptionControllerHandle::new(
config.ingress.clone(),
ingress_kafka.create_command_sender(),
);
let subscription_controller_handle = SubscriptionControllerHandle::new(
config.ingress.clone(),
ingress_kafka.create_command_sender(),
);

let partition_store_manager = PartitionStoreManager::create(
updateable_config.clone().map(|c| &c.worker.storage),
@@ -154,6 +153,9 @@
bifrost,
);

// handle RPCs
router_builder.add_message_handler(partition_processor_manager.message_handler());

let storage_query_context = QueryContext::create(
&config.admin.query_engine,
partition_processor_manager.handle(),