Skip to content

Commit

Permalink
Expose a partition worker CreateSnapshot RPC
Browse files Browse the repository at this point in the history
stack-info: PR: #1998, branch: pcholakov/stack/1
  • Loading branch information
pcholakov committed Oct 4, 2024
1 parent 13b6fda commit d621a61
Show file tree
Hide file tree
Showing 11 changed files with 382 additions and 15 deletions.
10 changes: 10 additions & 0 deletions crates/admin/protobuf/cluster_ctrl_svc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ service ClusterCtrlSvc {
rpc ListNodes(ListNodesRequest) returns (ListNodesResponse);

rpc TrimLog(TrimLogRequest) returns (google.protobuf.Empty);

rpc CreatePartitionSnapshot(CreatePartitionSnapshotRequest) returns (CreatePartitionSnapshotResponse);
}

message ClusterStateRequest { }
Expand Down Expand Up @@ -68,3 +70,11 @@ message TrimLogRequest {
uint32 log_id = 1;
uint64 trim_point = 2;
}

message CreatePartitionSnapshotRequest {
uint32 partition_id = 1;
}

message CreatePartitionSnapshotResponse {
string snapshot_id = 1;
}
155 changes: 152 additions & 3 deletions crates/admin/src/cluster_controller/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
use std::collections::BTreeMap;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use codederror::CodedError;
use futures::future::OptionFuture;
use futures::{Stream, StreamExt};
Expand All @@ -22,18 +24,22 @@ use tracing::{debug, info, warn};

use restate_bifrost::{Bifrost, BifrostAdmin};
use restate_core::metadata_store::MetadataStoreClient;
use restate_core::network::{Incoming, MessageRouterBuilder, Networking, TransportConnect};
use restate_core::network::rpc_router::RpcRouter;
use restate_core::network::{
Incoming, MessageRouterBuilder, NetworkSender, Networking, TransportConnect,
};
use restate_core::{
cancellation_watcher, Metadata, MetadataWriter, ShutdownError, TargetVersion, TaskCenter,
TaskKind,
};
use restate_types::cluster::cluster_state::{AliveNode, ClusterState, NodeState};
use restate_types::config::{AdminOptions, Configuration};
use restate_types::identifiers::PartitionId;
use restate_types::identifiers::{PartitionId, SnapshotId};
use restate_types::live::Live;
use restate_types::logs::{LogId, Lsn, SequenceNumber};
use restate_types::net::cluster_controller::{AttachRequest, AttachResponse};
use restate_types::net::metadata::MetadataKind;
use restate_types::net::partition_processor_manager::CreateSnapshotRequest;
use restate_types::{GenerationalNodeId, Version};

use super::cluster_state::{ClusterStateRefresher, ClusterStateWatcher};
Expand All @@ -52,6 +58,7 @@ pub struct Service<T> {
networking: Networking<T>,
incoming_messages: Pin<Box<dyn Stream<Item = Incoming<AttachRequest>> + Send + Sync + 'static>>,
cluster_state_refresher: ClusterStateRefresher<T>,
processor_manager_client: PartitionProcessorManagerClient<Networking<T>>,
command_tx: mpsc::Sender<ClusterControllerCommand>,
command_rx: mpsc::Receiver<ClusterControllerCommand>,

Expand Down Expand Up @@ -86,8 +93,13 @@ where
router_builder,
);

let options = configuration.live_load();
let processor_manager_client = PartitionProcessorManagerClient::new(
networking.clone(),
configuration.clone(),
router_builder,
);

let options = configuration.live_load();
let heartbeat_interval = Self::create_heartbeat_interval(&options.admin);
let (log_trim_interval, log_trim_threshold) =
Self::create_log_trim_interval(&options.admin);
Expand All @@ -101,6 +113,7 @@ where
cluster_state_refresher,
metadata_writer,
metadata_store_client,
processor_manager_client,
command_tx,
command_rx,
heartbeat_interval,
Expand Down Expand Up @@ -132,13 +145,18 @@ where
}
}

#[derive(Debug)]
enum ClusterControllerCommand {
GetClusterState(oneshot::Sender<Arc<ClusterState>>),
TrimLog {
log_id: LogId,
trim_point: Lsn,
response_tx: oneshot::Sender<anyhow::Result<()>>,
},
CreateSnapshot {
partition_id: PartitionId,
response_tx: oneshot::Sender<anyhow::Result<SnapshotId>>,
},
}

pub struct ClusterControllerHandle {
Expand Down Expand Up @@ -174,6 +192,23 @@ impl ClusterControllerHandle {

rx.await.map_err(|_| ShutdownError)
}

pub async fn create_partition_snapshot(
&self,
partition_id: PartitionId,
) -> Result<Result<SnapshotId, anyhow::Error>, ShutdownError> {
let (tx, rx) = oneshot::channel();

let _ = self
.tx
.send(ClusterControllerCommand::CreateSnapshot {
partition_id,
response_tx: tx,
})
.await;

rx.await.map_err(|_| ShutdownError)
}
}

impl<T: TransportConnect> Service<T> {
Expand Down Expand Up @@ -318,6 +353,61 @@ impl<T: TransportConnect> Service<T> {
Ok(())
}

/// Triggers a snapshot creation for the given partition by issuing an RPC
/// to the node hosting the active leader.
async fn create_partition_snapshot(
&self,
partition_id: PartitionId,
response_tx: oneshot::Sender<anyhow::Result<SnapshotId>>,
) {
let cluster_state = self.cluster_state_refresher.get_cluster_state();

// For now, we just pick the leader node since we know that every partition is likely to
// have one. We'll want to update the algorithm to be smart about scheduling snapshot tasks
// in the future to avoid disrupting the leader when there are up-to-date followers.
let leader_node = cluster_state
.alive_nodes()
.filter_map(|node| {
node.partitions
.get(&partition_id)
.filter(|status| status.is_effective_leader())
.map(|_| node)
.cloned()
})
.next();

match leader_node {
Some(node) => {
debug!(
node_id = %node.generational_node_id,
?partition_id,
"Asking node to snapshot partition"
);

let mut node_rpc_client = self.processor_manager_client.clone();
let _ = self.task_center.spawn_child(
TaskKind::Disposable,
"create-snapshot-response",
Some(partition_id),
async move {
let _ = response_tx.send(
node_rpc_client
.create_snapshot(node.generational_node_id, partition_id)
.await,
);
Ok(())
},
);
}

None => {
let _ = response_tx.send(Err(anyhow::anyhow!(
"Can not find a suitable node to take snapshot of partition {partition_id}"
)));
}
};
}

async fn on_cluster_cmd(
&self,
command: ClusterControllerCommand,
Expand All @@ -339,6 +429,14 @@ impl<T: TransportConnect> Service<T> {
let result = bifrost_admin.trim(log_id, trim_point).await;
let _ = response_tx.send(result.map_err(Into::into));
}
ClusterControllerCommand::CreateSnapshot {
partition_id,
response_tx,
} => {
info!(?partition_id, "Create snapshot command received");
self.create_partition_snapshot(partition_id, response_tx)
.await;
}
}
}

Expand Down Expand Up @@ -408,6 +506,57 @@ async fn signal_all_partitions_started(
}
}

#[derive(Clone)]
struct PartitionProcessorManagerClient<N>
where
N: Clone,
{
network_sender: N,
// create_snapshot_timeout: Duration,
configuration: Live<Configuration>,

Check failure on line 516 in crates/admin/src/cluster_controller/service.rs

View workflow job for this annotation

GitHub Actions / Build and test (warp-ubuntu-latest-x64-16x)

field `configuration` is never read
create_snapshot_router: RpcRouter<CreateSnapshotRequest>,
}

impl<N> PartitionProcessorManagerClient<N>
where
N: NetworkSender + 'static,
{
pub fn new(
network_sender: N,
configuration: Live<Configuration>,
router_builder: &mut MessageRouterBuilder,
) -> Self {
let create_snapshot_router = RpcRouter::new(router_builder);

PartitionProcessorManagerClient {
network_sender,
configuration,
create_snapshot_router,
}
}

pub async fn create_snapshot(
&mut self,
node_id: GenerationalNodeId,
partition_id: PartitionId,
) -> anyhow::Result<SnapshotId> {
// todo(pavel): make snapshot RPC timeout configurable, especially if this includes remote upload in the future
let response = tokio::time::timeout(
Duration::from_secs(30),
self.create_snapshot_router.call(
&self.network_sender,
node_id,
CreateSnapshotRequest { partition_id },
),
)
.await?;
let create_snapshot_response = response?.into_body();
create_snapshot_response
.result
.map_err(|e| anyhow!("Failed to create snapshot: {:?}", e))
}
}

#[cfg(test)]
mod tests {
use super::Service;
Expand Down
34 changes: 32 additions & 2 deletions crates/node/src/network_server/handler/cluster_ctrl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ use tracing::{debug, info};

use restate_admin::cluster_controller::protobuf::cluster_ctrl_svc_server::ClusterCtrlSvc;
use restate_admin::cluster_controller::protobuf::{
ClusterStateRequest, ClusterStateResponse, DescribeLogRequest, DescribeLogResponse,
ListLogsRequest, ListLogsResponse, ListNodesRequest, ListNodesResponse, TrimLogRequest,
ClusterStateRequest, ClusterStateResponse, CreatePartitionSnapshotRequest,
CreatePartitionSnapshotResponse, DescribeLogRequest, DescribeLogResponse, ListLogsRequest,
ListLogsResponse, ListNodesRequest, ListNodesResponse, TrimLogRequest,
};
use restate_admin::cluster_controller::ClusterControllerHandle;
use restate_bifrost::{Bifrost, FindTailAttributes};
use restate_metadata_store::MetadataStoreClient;
use restate_types::identifiers::PartitionId;
use restate_types::logs::metadata::Logs;
use restate_types::logs::{LogId, Lsn};
use restate_types::metadata_store::keys::{BIFROST_CONFIG_KEY, NODES_CONFIG_KEY};
Expand Down Expand Up @@ -158,6 +160,34 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler {
}
Ok(Response::new(()))
}

/// Handles ad-hoc snapshot requests, as sent by `restatectl snapshots create`. This is
/// implemented as an RPC call within the cluster to a worker node hosting the partition.
async fn create_partition_snapshot(
&self,
request: Request<CreatePartitionSnapshotRequest>,
) -> Result<Response<CreatePartitionSnapshotResponse>, Status> {
let request = request.into_inner();
let partition_id = PartitionId::from(
u16::try_from(request.partition_id)
.map_err(|id| Status::invalid_argument(format!("Invalid partition id: {id}")))?,
);

match self
.controller_handle
.create_partition_snapshot(partition_id)
.await
.map_err(|_| Status::aborted("Node is shutting down"))?
{
Err(err) => {
info!("Failed creating partition snapshot: {err}");
Err(Status::internal(err.to_string()))
}
Ok(snapshot_id) => Ok(Response::new(CreatePartitionSnapshotResponse {
snapshot_id: snapshot_id.to_string(),
})),
}
}
}

fn serialize_value<T: StorageEncode>(value: T) -> Bytes {
Expand Down
4 changes: 4 additions & 0 deletions crates/types/protobuf/restate/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ enum TargetName {
GET_PROCESSORS_STATE_REQUEST = 7;
PROCESSORS_STATE_RESPONSE = 8;
CONTROL_PROCESSORS = 9;

// LogServer
LOG_SERVER_STORE = 10;
LOG_SERVER_STORED = 11;
Expand All @@ -63,6 +64,9 @@ enum TargetName {
// ReplicatedLoglet
REPLICATED_LOGLET_APPEND = 40;
REPLICATED_LOGLET_APPENDED = 41;

PARTITION_CREATE_SNAPSHOT_REQUEST = 42;
PARTITION_CREATE_SNAPSHOT_RESPONSE = 43;
}

enum NodeStatus {
Expand Down
24 changes: 23 additions & 1 deletion crates/types/src/net/partition_processor_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use serde::{Deserialize, Serialize};
use serde_with::serde_as;

use crate::cluster::cluster_state::{PartitionProcessorStatus, RunMode};
use crate::identifiers::PartitionId;
use crate::identifiers::{PartitionId, SnapshotId};
use crate::net::{define_message, TargetName};

use crate::net::define_rpc;
Expand Down Expand Up @@ -69,3 +69,25 @@ impl From<RunMode> for ProcessorCommand {
}
}
}

define_rpc! {
@request = CreateSnapshotRequest,
@response = CreateSnapshotResponse,
@request_target = TargetName::PartitionCreateSnapshotRequest,
@response_target = TargetName::PartitionCreateSnapshotResponse,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreateSnapshotRequest {
pub partition_id: PartitionId,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreateSnapshotResponse {
pub result: Result<SnapshotId, SnapshotError>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SnapshotError {
SnapshotCreationFailed(String),
}
12 changes: 7 additions & 5 deletions crates/worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,10 @@ impl<T: TransportConnect> Worker<T> {

// ingress_kafka
let ingress_kafka = IngressKafkaService::new(ingress_dispatcher.clone());
let subscription_controller_handle =
subscription_integration::SubscriptionControllerHandle::new(
config.ingress.clone(),
ingress_kafka.create_command_sender(),
);
let subscription_controller_handle = SubscriptionControllerHandle::new(
config.ingress.clone(),
ingress_kafka.create_command_sender(),
);

let partition_store_manager = PartitionStoreManager::create(
updateable_config.clone().map(|c| &c.worker.storage),
Expand Down Expand Up @@ -154,6 +153,9 @@ impl<T: TransportConnect> Worker<T> {
bifrost,
);

// handle RPCs
router_builder.add_message_handler(partition_processor_manager.message_handler());

let storage_query_context = QueryContext::create(
&config.admin.query_engine,
partition_processor_manager.handle(),
Expand Down
Loading

0 comments on commit d621a61

Please sign in to comment.