From cc49b413f940748d32201886718cd72317b32f8a Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Mon, 19 Aug 2024 13:09:55 +0200 Subject: [PATCH] Implement simple networking layer for RaftMetadataStore This fixes #1803. --- Cargo.lock | 1 + .../admin/src/cluster_controller/service.rs | 4 +- crates/metadata-store/Cargo.toml | 1 + crates/metadata-store/build.rs | 6 + .../proto/raft_metadata_store_svc.proto | 24 +++ .../src/raft/connection_manager.rs | 192 ++++++++++++++++++ crates/metadata-store/src/raft/grpc_svc.rs | 14 ++ crates/metadata-store/src/raft/handler.rs | 67 ++++++ crates/metadata-store/src/raft/mod.rs | 4 + crates/metadata-store/src/raft/networking.rs | 145 +++++++++++++ crates/metadata-store/src/raft/service.rs | 39 +++- crates/metadata-store/src/raft/storage.rs | 4 +- crates/metadata-store/src/raft/store.rs | 51 +++-- crates/node/src/lib.rs | 6 +- crates/types/src/config/metadata_store.rs | 18 +- 15 files changed, 551 insertions(+), 25 deletions(-) create mode 100644 crates/metadata-store/proto/raft_metadata_store_svc.proto create mode 100644 crates/metadata-store/src/raft/connection_manager.rs create mode 100644 crates/metadata-store/src/raft/grpc_svc.rs create mode 100644 crates/metadata-store/src/raft/handler.rs create mode 100644 crates/metadata-store/src/raft/networking.rs diff --git a/Cargo.lock b/Cargo.lock index 004caf32a..1a2ab6b64 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6032,6 +6032,7 @@ dependencies = [ "bytestring", "codederror", "derive_builder", + "derive_more", "flexbuffers", "futures", "googletest", diff --git a/crates/admin/src/cluster_controller/service.rs b/crates/admin/src/cluster_controller/service.rs index 1d4a824fd..3524c26c1 100644 --- a/crates/admin/src/cluster_controller/service.rs +++ b/crates/admin/src/cluster_controller/service.rs @@ -241,7 +241,9 @@ where } } Ok(cluster_state) = cluster_state_watcher.next_cluster_state() => { - scheduler.on_cluster_state_update(cluster_state).await?; + if let Err(err) = scheduler.on_cluster_state_update(cluster_state).await { + warn!("Could not perform scheduling operation: {err}"); + } } Some(cmd) = self.command_rx.recv() => { self.on_cluster_cmd(cmd, bifrost_admin).await; diff --git a/crates/metadata-store/Cargo.toml b/crates/metadata-store/Cargo.toml index b1bf2753b..40ea78a0e 100644 --- a/crates/metadata-store/Cargo.toml +++ b/crates/metadata-store/Cargo.toml @@ -23,6 +23,7 @@ async-trait = { workspace = true } bytes = { workspace = true } bytestring = { workspace = true } derive_builder = { workspace = true } +derive_more = { workspace = true } futures = { workspace = true } http = { workspace = true } humantime = { workspace = true } diff --git a/crates/metadata-store/build.rs b/crates/metadata-store/build.rs index b2b100490..cdb69918f 100644 --- a/crates/metadata-store/build.rs +++ b/crates/metadata-store/build.rs @@ -21,5 +21,11 @@ fn main() -> Result<(), Box> { .protoc_arg("--experimental_allow_proto3_optional") .compile(&["./proto/metadata_store_svc.proto"], &["proto"])?; + tonic_build::configure() + .bytes(["."]) + .file_descriptor_set_path(out_dir.join("raft_metadata_store_svc.bin")) + .protoc_arg("--experimental_allow_proto3_optional") + .compile(&["./proto/raft_metadata_store_svc.proto"], &["proto"])?; + Ok(()) } diff --git a/crates/metadata-store/proto/raft_metadata_store_svc.proto b/crates/metadata-store/proto/raft_metadata_store_svc.proto new file mode 100644 index 000000000..84a4f4f07 --- /dev/null +++ b/crates/metadata-store/proto/raft_metadata_store_svc.proto @@ -0,0 +1,24 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate service protocol, which is +// released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/proto/blob/main/LICENSE + +syntax = "proto3"; + +import "google/protobuf/empty.proto"; + +package dev.restate.raft_metadata_store_svc; + +// Grpc service definition for the RaftMetadataStore implementation. +service RaftMetadataStoreSvc { + rpc Raft(stream RaftMessage) returns (stream RaftMessage); +} + +message RaftMessage { + bytes message = 1; +} + diff --git a/crates/metadata-store/src/raft/connection_manager.rs b/crates/metadata-store/src/raft/connection_manager.rs new file mode 100644 index 000000000..f579fc81c --- /dev/null +++ b/crates/metadata-store/src/raft/connection_manager.rs @@ -0,0 +1,192 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use crate::raft::grpc_svc::RaftMessage; +use futures::StreamExt; +use protobuf::Message as ProtobufMessage; +use raft::prelude::Message; +use restate_core::{cancellation_watcher, ShutdownError, TaskCenter, TaskKind}; +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; +use tokio::sync::mpsc; +use tokio::sync::mpsc::error::TrySendError; +use tokio_stream::wrappers::ReceiverStream; +use tonic::codegen::BoxStream; +use tracing::{debug, instrument}; + +#[derive(Debug, thiserror::Error)] +pub enum ConnectionError { + #[error("internal error: {0}")] + Internal(String), + #[error(transparent)] + Shutdown(#[from] ShutdownError), +} + +#[derive(Clone, derive_more::Debug)] +pub struct ConnectionManager { + inner: Arc, + #[debug(skip)] + task_center: TaskCenter, +} + +impl ConnectionManager { + pub fn new(task_center: TaskCenter, identity: u64, router: mpsc::Sender) -> Self { + ConnectionManager { + inner: Arc::new(ConnectionManagerInner::new(identity, router)), + task_center, + } + } + + pub fn identity(&self) -> u64 { + self.inner.identity + } + + pub fn accept_connection( + &self, + raft_peer: u64, + incoming_rx: tonic::Streaming, + ) -> Result, ConnectionError> { + let (outgoing_tx, outgoing_rx) = mpsc::channel(128); + self.run_connection(raft_peer, outgoing_tx, incoming_rx)?; + + let outgoing_stream = ReceiverStream::new(outgoing_rx) + .map(Result::<_, tonic::Status>::Ok) + .boxed(); + Ok(outgoing_stream) + } + + pub fn run_connection( + &self, + remote_peer: u64, + outgoing_tx: mpsc::Sender, + incoming_rx: tonic::Streaming, + ) -> Result<(), ConnectionError> { + let mut guard = self.inner.connections.lock().unwrap(); + + if guard.contains_key(&remote_peer) { + // we already have a connection established to remote peer + return Ok(()); + } + + let connection = Connection::new(outgoing_tx); + guard.insert(remote_peer, connection); + + let reactor = ConnectionReactor { + remote_peer, + connection_manager: Arc::clone(&self.inner), + }; + + let _task_id = self.task_center.spawn_child( + TaskKind::ConnectionReactor, + "raft-connection-reactor", + None, + reactor.run(incoming_rx), + )?; + + Ok(()) + } + + pub fn get_connection(&self, target: u64) -> Option { + self.inner.connections.lock().unwrap().get(&target).cloned() + } +} + +struct ConnectionReactor { + remote_peer: u64, + connection_manager: Arc, +} + +impl ConnectionReactor { + #[instrument(level = "debug", skip_all, fields(remote_peer = %self.remote_peer))] + async fn run(self, mut incoming_rx: tonic::Streaming) -> anyhow::Result<()> { + let mut shutdown = std::pin::pin!(cancellation_watcher()); + debug!("Run connection reactor"); + + loop { + tokio::select! { + _ = &mut shutdown => { + break; + }, + message = incoming_rx.next() => { + match message { + Some(message) => { + match message { + Ok(message) => { + let message = Message::parse_from_carllerche_bytes(&message.message)?; + + assert_eq!(message.to, self.connection_manager.identity, "Expect to only receive messages for peer '{}'", self.connection_manager.identity); + + if self.connection_manager.router.send(message).await.is_err() { + // system is shutting down + debug!("System is shutting down; closing connection"); + break; + } + } + Err(err) => { + debug!("Closing connection because received error: {err}"); + break; + } + } + } + None => { + debug!("Remote peer closed connection"); + break + }, + } + } + } + } + + Ok(()) + } +} + +impl Drop for ConnectionReactor { + fn drop(&mut self) { + debug!(remote_peer = %self.remote_peer, "Close connection"); + self.connection_manager + .connections + .lock() + .expect("shouldn't be poisoned") + .remove(&self.remote_peer); + } +} + +#[derive(Debug)] +struct ConnectionManagerInner { + identity: u64, + connections: Mutex>, + router: mpsc::Sender, +} + +impl ConnectionManagerInner { + pub fn new(identity: u64, router: mpsc::Sender) -> Self { + ConnectionManagerInner { + identity, + router, + connections: Mutex::default(), + } + } +} + +#[derive(Debug, Clone)] +pub struct Connection { + tx: mpsc::Sender, +} + +impl Connection { + pub fn new(tx: mpsc::Sender) -> Self { + Connection { tx } + } + + pub fn try_send(&self, message: RaftMessage) -> Result<(), TrySendError> { + self.tx.try_send(message) + } +} diff --git a/crates/metadata-store/src/raft/grpc_svc.rs b/crates/metadata-store/src/raft/grpc_svc.rs new file mode 100644 index 000000000..a9064c261 --- /dev/null +++ b/crates/metadata-store/src/raft/grpc_svc.rs @@ -0,0 +1,14 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +tonic::include_proto!("dev.restate.raft_metadata_store_svc"); + +pub const FILE_DESCRIPTOR_SET: &[u8] = + tonic::include_file_descriptor_set!("raft_metadata_store_svc"); diff --git a/crates/metadata-store/src/raft/handler.rs b/crates/metadata-store/src/raft/handler.rs new file mode 100644 index 000000000..da9995ccb --- /dev/null +++ b/crates/metadata-store/src/raft/handler.rs @@ -0,0 +1,67 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use crate::raft::connection_manager::{ConnectionError, ConnectionManager}; +use crate::raft::grpc_svc::raft_metadata_store_svc_server::RaftMetadataStoreSvc; +use crate::raft::grpc_svc::RaftMessage; +use std::str::FromStr; +use tonic::codegen::BoxStream; +use tonic::{Request, Response, Status, Streaming}; + +pub const RAFT_PEER_METADATA_KEY: &str = "x-restate-raft-peer"; + +#[derive(Debug)] +pub struct RaftMetadataStoreHandler { + connection_manager: ConnectionManager, +} + +impl RaftMetadataStoreHandler { + pub fn new(connection_manager: ConnectionManager) -> Self { + Self { connection_manager } + } +} + +#[async_trait::async_trait] +impl RaftMetadataStoreSvc for RaftMetadataStoreHandler { + type RaftStream = BoxStream; + + async fn raft( + &self, + request: Request>, + ) -> Result, Status> { + let raft_peer_metadata = + request + .metadata() + .get(RAFT_PEER_METADATA_KEY) + .ok_or(Status::invalid_argument(format!( + "'{}' is missing", + RAFT_PEER_METADATA_KEY + )))?; + let raft_peer = u64::from_str( + raft_peer_metadata + .to_str() + .map_err(|err| Status::invalid_argument(err.to_string()))?, + ) + .map_err(|err| Status::invalid_argument(err.to_string()))?; + let outgoing_rx = self + .connection_manager + .accept_connection(raft_peer, request.into_inner())?; + Ok(Response::new(outgoing_rx)) + } +} + +impl From for Status { + fn from(value: ConnectionError) -> Self { + match value { + ConnectionError::Internal(err) => Status::internal(err), + ConnectionError::Shutdown(err) => Status::aborted(err.to_string()), + } + } +} diff --git a/crates/metadata-store/src/raft/mod.rs b/crates/metadata-store/src/raft/mod.rs index 7c14a5b27..f39cee2ce 100644 --- a/crates/metadata-store/src/raft/mod.rs +++ b/crates/metadata-store/src/raft/mod.rs @@ -8,6 +8,10 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +mod connection_manager; +pub mod grpc_svc; +mod handler; +mod networking; pub mod service; mod storage; mod store; diff --git a/crates/metadata-store/src/raft/networking.rs b/crates/metadata-store/src/raft/networking.rs new file mode 100644 index 000000000..62b64561b --- /dev/null +++ b/crates/metadata-store/src/raft/networking.rs @@ -0,0 +1,145 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use crate::raft::connection_manager::ConnectionManager; +use crate::raft::grpc_svc::RaftMessage; +use crate::raft::handler::RAFT_PEER_METADATA_KEY; +use bytes::{BufMut, BytesMut}; +use futures::FutureExt; +use protobuf::Message as ProtobufMessage; +use raft::prelude::Message; +use restate_core::network::net_util; +use restate_core::TaskCenter; +use restate_types::errors::GenericError; +use restate_types::net::AdvertisedAddress; +use std::collections::HashMap; +use std::mem; +use tokio::sync::mpsc; +use tokio::task::JoinHandle; +use tokio_stream::wrappers::ReceiverStream; +use tonic::metadata::MetadataValue; +use tonic::IntoStreamingRequest; +use tracing::{debug, trace}; + +#[derive(Debug, thiserror::Error)] +pub enum TrySendError { + #[error("failed sending message")] + Send(T), + #[error("failed encoding message")] + Encode(T, GenericError), + #[error("unknown peer: {0}")] + UnknownPeer(u64), +} + +#[derive(derive_more::Debug)] +pub struct Networking { + connection_manager: ConnectionManager, + addresses: HashMap, + connection_attempts: HashMap>>, + serde_buffer: BytesMut, + #[debug(skip)] + task_center: TaskCenter, +} + +impl Networking { + pub fn new(task_center: TaskCenter, connection_manager: ConnectionManager) -> Self { + Networking { + connection_manager, + addresses: HashMap::default(), + connection_attempts: HashMap::default(), + serde_buffer: BytesMut::with_capacity(1024), + task_center, + } + } + + pub fn register_address(&mut self, peer: u64, address: AdvertisedAddress) { + self.addresses.insert(peer, address); + } + + pub fn try_send(&mut self, message: Message) -> Result<(), TrySendError> { + let target = message.to; + + if let Some(connection) = self.connection_manager.get_connection(target) { + let mut writer = mem::take(&mut self.serde_buffer).writer(); + message + .write_to_writer(&mut writer) + .map_err(|err| TrySendError::Encode(message.clone(), err.into()))?; + self.serde_buffer = writer.into_inner(); + + // todo: Maybe send message directly w/o indirection through RaftMessage + let raft_message = RaftMessage { + message: self.serde_buffer.split().freeze(), + }; + + connection + .try_send(raft_message) + .map_err(|_err| TrySendError::Send(message))?; + } else if let Some(address) = self.addresses.get(&target) { + if let Some(join_handle) = self.connection_attempts.remove(&target) { + if !join_handle.is_finished() { + return Ok(()); + } else { + match join_handle.now_or_never().expect("should be finished") { + Ok(result) => { + match result { + Ok(_) => trace!("Previous connection attempt to '{target}' succeeded but connection was closed in meantime."), + Err(err) => trace!("Previous connection attempt to '{target}' failed: {}", err) + } + + } + Err(err) => { + trace!("Previous connection attempt to '{target}' panicked: {}", err) + } + } + } + } + + self.connection_attempts.insert( + target, + Self::try_connecting_to( + self.task_center.clone(), + self.connection_manager.clone(), + target, + address.clone(), + ), + ); + } else { + return Err(TrySendError::UnknownPeer(target)); + } + + Ok(()) + } + + fn try_connecting_to( + task_center: TaskCenter, + connection_manager: ConnectionManager, + target: u64, + address: AdvertisedAddress, + ) -> JoinHandle> { + tokio::spawn(async move { + task_center.run_in_scope("raft-connection-attempt", None, async move { + trace!(%target, "Try connecting to raft peer"); + let channel = + net_util::create_tonic_channel_from_advertised_address(address.clone())?; + let mut raft_client = crate::raft::grpc_svc::raft_metadata_store_svc_client::RaftMetadataStoreSvcClient::new(channel); + let (outgoing_tx, outgoing_rx) = mpsc::channel(128); + + let mut request = ReceiverStream::new(outgoing_rx).into_streaming_request(); + // send our identity alongside with the request to the target + request.metadata_mut().insert(RAFT_PEER_METADATA_KEY, MetadataValue::try_from(connection_manager.identity().to_string())?); + let incoming_rx = raft_client.raft(request).await?; + + connection_manager.run_connection(target, outgoing_tx, incoming_rx.into_inner())?; + + Ok(()) + }).await + }) + } +} diff --git a/crates/metadata-store/src/raft/service.rs b/crates/metadata-store/src/raft/service.rs index e788c58c7..6a488fb36 100644 --- a/crates/metadata-store/src/raft/service.rs +++ b/crates/metadata-store/src/raft/service.rs @@ -12,20 +12,33 @@ use crate::grpc::handler::MetadataStoreHandler; use crate::grpc::server::GrpcServer; use crate::grpc::service_builder::GrpcServiceBuilder; use crate::grpc_svc::metadata_store_svc_server::MetadataStoreSvcServer; +use crate::raft::connection_manager::ConnectionManager; +use crate::raft::grpc_svc::raft_metadata_store_svc_server::RaftMetadataStoreSvcServer; +use crate::raft::handler::RaftMetadataStoreHandler; +use crate::raft::networking::Networking; use crate::raft::store::RaftMetadataStore; use crate::{grpc_svc, Error, MetadataStoreService}; +use assert2::let_assert; use futures::TryFutureExt; use restate_core::{task_center, TaskKind}; -use restate_types::config::MetadataStoreOptions; +use restate_types::config::{Kind, MetadataStoreOptions, RocksDbOptions}; use restate_types::live::BoxedLiveLoad; +use tokio::sync::mpsc; pub struct RaftMetadataStoreService { options: BoxedLiveLoad, + rocksdb_options: BoxedLiveLoad, } impl RaftMetadataStoreService { - pub fn new(options: BoxedLiveLoad) -> Self { - Self { options } + pub fn new( + options: BoxedLiveLoad, + rocksdb_options: BoxedLiveLoad, + ) -> Self { + Self { + options, + rocksdb_options, + } } } @@ -33,7 +46,20 @@ impl RaftMetadataStoreService { impl MetadataStoreService for RaftMetadataStoreService { async fn run(mut self) -> Result<(), Error> { let store_options = self.options.live_load(); - let store = RaftMetadataStore::create().await.map_err(Error::generic)?; + let_assert!(Kind::Raft(raft_options) = &store_options.kind); + + let (router_tx, router_rx) = mpsc::channel(128); + let task_center = task_center(); + let connection_manager = + ConnectionManager::new(task_center.clone(), raft_options.id, router_tx); + let store = RaftMetadataStore::create( + raft_options, + self.rocksdb_options, + Networking::new(task_center.clone(), connection_manager.clone()), + router_rx, + ) + .await + .map_err(Error::generic)?; let mut builder = GrpcServiceBuilder::default(); @@ -41,11 +67,14 @@ impl MetadataStoreService for RaftMetadataStoreService { builder.add_service(MetadataStoreSvcServer::new(MetadataStoreHandler::new( store.request_sender(), ))); + builder.add_service(RaftMetadataStoreSvcServer::new( + RaftMetadataStoreHandler::new(connection_manager), + )); let grpc_server = GrpcServer::new(store_options.bind_address.clone(), builder.build().await?); - task_center().spawn_child( + task_center.spawn_child( TaskKind::RpcServer, "metadata-store-grpc", None, diff --git a/crates/metadata-store/src/raft/storage.rs b/crates/metadata-store/src/raft/storage.rs index 0e37bbbce..835e4cb35 100644 --- a/crates/metadata-store/src/raft/storage.rs +++ b/crates/metadata-store/src/raft/storage.rs @@ -124,9 +124,7 @@ impl RocksDbStorage { fn find_last_index(db: &DB) -> u64 { let cf = db.cf_handle(RAFT_CF).expect("RAFT_CF exists"); let start = Self::raft_entry_key(0); - // end is exclusive so switch to the next discriminator - let mut end = [0; 9]; - end[0] = RAFT_ENTRY_DISCRIMINATOR + 1; + let end = Self::raft_entry_key(u64::MAX); let mut options = ReadOptions::default(); options.set_async_io(true); diff --git a/crates/metadata-store/src/raft/store.rs b/crates/metadata-store/src/raft/store.rs index e859b89f8..2f8e7bea7 100644 --- a/crates/metadata-store/src/raft/store.rs +++ b/crates/metadata-store/src/raft/store.rs @@ -8,6 +8,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use crate::raft::networking::Networking; use crate::raft::storage; use crate::raft::storage::RocksDbStorage; use crate::{ @@ -21,7 +22,8 @@ use raft::prelude::{ConfChange, ConfChangeV2, ConfState, Entry, EntryType, Messa use raft::{Config, RawNode}; use restate_core::cancellation_watcher; use restate_core::metadata_store::{Precondition, VersionedValue}; -use restate_types::config::Configuration; +use restate_types::config::{Configuration, RaftOptions, RocksDbOptions}; +use restate_types::live::BoxedLiveLoad; use restate_types::storage::{StorageCodec, StorageDecodeError, StorageEncodeError}; use restate_types::{flexbuffers_storage_encode_decode, Version}; use slog::o; @@ -61,6 +63,8 @@ pub enum Error { pub struct RaftMetadataStore { _logger: slog::Logger, raw_node: RawNode, + networking: Networking, + raft_rx: mpsc::Receiver, tick_interval: time::Interval, callbacks: HashMap, @@ -71,28 +75,38 @@ pub struct RaftMetadataStore { } impl RaftMetadataStore { - pub async fn create() -> Result { + pub async fn create( + raft_options: &RaftOptions, + rocksdb_options: BoxedLiveLoad, + mut networking: Networking, + raft_rx: mpsc::Receiver, + ) -> Result { let (request_tx, request_rx) = mpsc::channel(2); let config = Config { - id: 1, + id: raft_options.id, ..Default::default() }; - let rocksdb_options = Configuration::updateable() - .map(|configuration| &configuration.common.rocksdb) - .boxed(); let mut metadata_store_options = Configuration::updateable().map(|configuration| &configuration.metadata_store); - let mut store = + let mut storage = RocksDbStorage::create(metadata_store_options.live_load(), rocksdb_options).await?; - let conf_state = ConfState::from((vec![1], vec![])); - store.store_conf_state(conf_state).await?; + + // todo: Only write configuration on initialization + let voters: Vec<_> = raft_options.peers.keys().cloned().collect(); + let conf_state = ConfState::from((voters, vec![])); + storage.store_conf_state(conf_state).await?; + + // todo: Persist address information with configuration + for (peer, address) in &raft_options.peers { + networking.register_address(*peer, address.clone()); + } let drain = TracingSlogDrain; let logger = slog::Logger::root(drain, o!()); - let raw_node = RawNode::new(&config, store, &logger)?; + let raw_node = RawNode::new(&config, storage, &logger)?; let mut tick_interval = time::interval(Duration::from_millis(100)); tick_interval.set_missed_tick_behavior(MissedTickBehavior::Burst); @@ -101,6 +115,8 @@ impl RaftMetadataStore { // we only need to keep it alive _logger: logger, raw_node, + raft_rx, + networking, tick_interval, callbacks: HashMap::default(), kv_entries: HashMap::default(), @@ -120,6 +136,13 @@ impl RaftMetadataStore { tokio::select! { _ = &mut cancellation => { break; + }, + raft = self.raft_rx.recv() => { + if let Some(raft) = raft { + self.raw_node.step(raft)?; + } else { + break; + } } Some(request) = self.request_rx.recv() => { // todo: Unclear whether every replica should be allowed to propose. Maybe @@ -223,8 +246,12 @@ impl RaftMetadataStore { self.callbacks.insert(callback.request_id, callback); } - fn send_messages(&self, _messages: Vec) { - // todo: Send messages to other peers + fn send_messages(&mut self, messages: Vec) { + for message in messages { + if let Err(err) = self.networking.try_send(message) { + info!("failed sending message: {err}"); + } + } } async fn handle_committed_entries( diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index eecd0477c..6aaada3a0 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -281,8 +281,12 @@ impl Node { .boxed(), ) .boxed(), - Kind::Raft => RaftMetadataStoreService::new( + Kind::Raft(_) => RaftMetadataStoreService::new( updateable_config.clone().map(|c| &c.metadata_store).boxed(), + updateable_config + .clone() + .map(|config| &config.metadata_store.rocksdb) + .boxed(), ) .boxed(), } diff --git a/crates/types/src/config/metadata_store.rs b/crates/types/src/config/metadata_store.rs index e034444ff..c0d07f73b 100644 --- a/crates/types/src/config/metadata_store.rs +++ b/crates/types/src/config/metadata_store.rs @@ -8,6 +8,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::collections::HashMap; use std::num::NonZeroUsize; use std::path::PathBuf; @@ -18,7 +19,7 @@ use restate_serde_util::NonZeroByteCount; use tracing::warn; use super::{data_dir, CommonOptions, RocksDbOptions, RocksDbOptionsBuilder}; -use crate::net::BindAddress; +use crate::net::{AdvertisedAddress, BindAddress}; /// # Metadata store options #[serde_as] @@ -67,13 +68,13 @@ pub struct MetadataStoreOptions { pub kind: Kind, } -#[derive(Debug, Default, Copy, Clone, Serialize, Deserialize)] +#[derive(Debug, Default, Clone, Serialize, Deserialize)] #[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] #[serde(rename_all = "kebab-case")] pub enum Kind { #[default] Local, - Raft, + Raft(RaftOptions), } impl MetadataStoreOptions { @@ -130,3 +131,14 @@ impl Default for MetadataStoreOptions { } } } + +#[serde_as] +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +#[serde(rename_all = "kebab-case")] +pub struct RaftOptions { + pub id: u64, + #[cfg_attr(feature = "schemars", schemars(with = "Vec<(u64, String)>"))] + #[serde_as(as = "serde_with::Seq<(_, _)>")] + pub peers: HashMap, +}