From 9cf2d2bc3bf21f1555561419175e593c1c7b7c3e Mon Sep 17 00:00:00 2001 From: Ahmed Farghal Date: Tue, 1 Oct 2024 12:36:31 +0100 Subject: [PATCH] [Bifrost] Seal task for replicated loglet Initial implementation for the seal task for the replicated loglet. The seal task is not responsible for repairing the tail, just executes seal on an f-majority of nodes of the nodeset. --- .../src/providers/replicated_loglet/error.rs | 4 + .../src/providers/replicated_loglet/loglet.rs | 61 +++++- .../src/providers/replicated_loglet/mod.rs | 1 + .../providers/replicated_loglet/tasks/mod.rs | 13 ++ .../providers/replicated_loglet/tasks/seal.rs | 176 ++++++++++++++++++ crates/types/src/config/bifrost.rs | 13 +- 6 files changed, 263 insertions(+), 5 deletions(-) create mode 100644 crates/bifrost/src/providers/replicated_loglet/tasks/mod.rs create mode 100644 crates/bifrost/src/providers/replicated_loglet/tasks/seal.rs diff --git a/crates/bifrost/src/providers/replicated_loglet/error.rs b/crates/bifrost/src/providers/replicated_loglet/error.rs index 22f1ea545..a0b062a5f 100644 --- a/crates/bifrost/src/providers/replicated_loglet/error.rs +++ b/crates/bifrost/src/providers/replicated_loglet/error.rs @@ -14,6 +14,7 @@ use restate_core::ShutdownError; use restate_types::errors::MaybeRetryableError; use restate_types::logs::metadata::SegmentIndex; use restate_types::logs::LogId; +use restate_types::replicated_loglet::ReplicatedLogletId; use crate::loglet::OperationError; @@ -21,6 +22,8 @@ use crate::loglet::OperationError; pub(crate) enum ReplicatedLogletError { #[error("cannot parse loglet configuration for log_id={0} at segment_index={1}: {2}")] LogletParamsParsingError(LogId, SegmentIndex, serde_json::Error), + #[error("could not seal loglet_id={0}, insufficient nodes available for seal")] + SealFailed(ReplicatedLogletId), #[error(transparent)] Shutdown(#[from] ShutdownError), } @@ -29,6 +32,7 @@ impl MaybeRetryableError for ReplicatedLogletError { fn retryable(&self) -> bool { match self { Self::LogletParamsParsingError(..) => false, + Self::SealFailed(..) => true, Self::Shutdown(_) => false, } } diff --git a/crates/bifrost/src/providers/replicated_loglet/loglet.rs b/crates/bifrost/src/providers/replicated_loglet/loglet.rs index 8653a1661..dacb97715 100644 --- a/crates/bifrost/src/providers/replicated_loglet/loglet.rs +++ b/crates/bifrost/src/providers/replicated_loglet/loglet.rs @@ -15,10 +15,10 @@ use std::sync::Arc; use async_trait::async_trait; use futures::stream::BoxStream; -use tracing::debug; +use tracing::{debug, info}; use restate_core::network::{Networking, TransportConnect}; -use restate_core::ShutdownError; +use restate_core::{task_center, ShutdownError}; use restate_types::logs::metadata::SegmentIndex; use restate_types::logs::{KeyFilter, LogId, LogletOffset, Record, SequenceNumber, TailState}; use restate_types::replicated_loglet::ReplicatedLogletParams; @@ -27,6 +27,7 @@ use crate::loglet::util::TailOffsetWatch; use crate::loglet::{Loglet, LogletCommit, OperationError, SendableLogletReadStream}; use crate::providers::replicated_loglet::replication::spread_selector::SelectorStrategy; use crate::providers::replicated_loglet::sequencer::Sequencer; +use crate::providers::replicated_loglet::tasks::SealTask; use super::log_server_manager::RemoteLogServerManager; use super::record_cache::RecordCache; @@ -168,7 +169,17 @@ impl Loglet for ReplicatedLoglet { } async fn seal(&self) -> Result<(), OperationError> { - todo!() + // todo(asoli): If we are the sequencer node, let the sequencer know. 
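+        // Fan out the seal to the nodeset and wait for an f-majority of Sealed
+        // responses. Tail repair is out of scope for this task, so the max local
+        // tail it returns is intentionally discarded here.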
+ let _ = SealTask::new( + task_center(), + self.my_params.clone(), + self.logservers_rpc.seal.clone(), + self.known_global_tail.clone(), + ) + .run(self.networking.clone()) + .await?; + info!(loglet_id=%self.my_params.loglet_id, "Loglet has been sealed successfully"); + Ok(()) } } @@ -190,7 +201,7 @@ mod tests { use restate_types::replicated_loglet::{NodeSet, ReplicatedLogletId, ReplicationProperty}; use restate_types::{GenerationalNodeId, PlainNodeId}; - use crate::loglet::Loglet; + use crate::loglet::{AppendError, Loglet}; struct TestEnv { pub loglet: Arc, @@ -295,4 +306,46 @@ mod tests { }) .await } + + // ** Single-node replicated-loglet seal ** + #[test(tokio::test(start_paused = true))] + async fn test_seal_local_sequencer_single_node() -> Result<()> { + let loglet_id = ReplicatedLogletId::new(122); + let params = ReplicatedLogletParams { + loglet_id, + sequencer: GenerationalNodeId::new(1, 1), + replication: ReplicationProperty::new(NonZeroU8::new(1).unwrap()), + nodeset: NodeSet::from_single(PlainNodeId::new(1)), + write_set: None, + }; + + run_in_test_env(params, |env| async move { + let batch: Arc<[Record]> = vec![ + ("record-1", Keys::Single(1)).into(), + ("record-2", Keys::Single(2)).into(), + ("record-3", Keys::Single(3)).into(), + ] + .into(); + let offset = env.loglet.enqueue_batch(batch.clone()).await?.await?; + assert_that!(offset, eq(LogletOffset::new(3))); + let offset = env.loglet.enqueue_batch(batch.clone()).await?.await?; + assert_that!(offset, eq(LogletOffset::new(6))); + let tail = env.loglet.find_tail().await?; + assert_that!(tail, eq(TailState::Open(LogletOffset::new(7)))); + + env.loglet.seal().await?; + let batch: Arc<[Record]> = vec![ + ("record-4", Keys::Single(4)).into(), + ("record-5", Keys::Single(5)).into(), + ] + .into(); + let not_appended = env.loglet.enqueue_batch(batch).await?.await; + assert_that!(not_appended, err(pat!(AppendError::Sealed))); + let tail = env.loglet.find_tail().await?; + assert_that!(tail, eq(TailState::Sealed(LogletOffset::new(7)))); + + Ok(()) + }) + .await + } } diff --git a/crates/bifrost/src/providers/replicated_loglet/mod.rs b/crates/bifrost/src/providers/replicated_loglet/mod.rs index 067f742ed..41fbbfa61 100644 --- a/crates/bifrost/src/providers/replicated_loglet/mod.rs +++ b/crates/bifrost/src/providers/replicated_loglet/mod.rs @@ -20,6 +20,7 @@ pub mod replication; mod rpc_routers; #[allow(dead_code)] pub mod sequencer; +mod tasks; #[cfg(any(test, feature = "test-util"))] pub mod test_util; diff --git a/crates/bifrost/src/providers/replicated_loglet/tasks/mod.rs b/crates/bifrost/src/providers/replicated_loglet/tasks/mod.rs new file mode 100644 index 000000000..3db0842b3 --- /dev/null +++ b/crates/bifrost/src/providers/replicated_loglet/tasks/mod.rs @@ -0,0 +1,13 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +mod seal; + +pub use seal::*; diff --git a/crates/bifrost/src/providers/replicated_loglet/tasks/seal.rs b/crates/bifrost/src/providers/replicated_loglet/tasks/seal.rs new file mode 100644 index 000000000..955908478 --- /dev/null +++ b/crates/bifrost/src/providers/replicated_loglet/tasks/seal.rs @@ -0,0 +1,176 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. 
+// All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+use tokio::sync::mpsc;
+use tracing::trace;
+
+use restate_core::network::rpc_router::{RpcError, RpcRouter};
+use restate_core::network::{Incoming, Networking, TransportConnect};
+use restate_core::{TaskCenter, TaskKind};
+use restate_types::config::Configuration;
+use restate_types::logs::{LogletOffset, SequenceNumber};
+use restate_types::net::log_server::{Seal, Sealed, Status};
+use restate_types::replicated_loglet::{
+    EffectiveNodeSet, ReplicatedLogletId, ReplicatedLogletParams,
+};
+use restate_types::retries::RetryPolicy;
+use restate_types::{GenerationalNodeId, PlainNodeId};
+
+use crate::loglet::util::TailOffsetWatch;
+use crate::providers::replicated_loglet::error::ReplicatedLogletError;
+use crate::providers::replicated_loglet::replication::NodeSetChecker;
+
+/// Sends a seal request to as many log-servers in the nodeset as it can reach.
+///
+/// We broadcast the seal to all nodes that we can, but we only wait for an f-majority of
+/// responses before acknowledging the seal.
+///
+/// The seal operation is idempotent. It's safe to seal a loglet if it's already partially or fully
+/// sealed. Note that the seal task ignores the "sealed" state in the input known_global_tail watch,
+/// but it will set it to `true` after the seal.
+pub struct SealTask {
+    task_center: TaskCenter,
+    my_params: ReplicatedLogletParams,
+    seal_router: RpcRouter<Seal>,
+    known_global_tail: TailOffsetWatch,
+}
+
+impl SealTask {
+    pub fn new(
+        task_center: TaskCenter,
+        my_params: ReplicatedLogletParams,
+        seal_router: RpcRouter<Seal>,
+        known_global_tail: TailOffsetWatch,
+    ) -> Self {
+        Self {
+            task_center,
+            my_params,
+            seal_router,
+            known_global_tail,
+        }
+    }
+
+    pub async fn run<T: TransportConnect>(
+        self,
+        networking: Networking<T>,
+    ) -> Result<LogletOffset, ReplicatedLogletError> {
+        // Use the entire nodeset, excluding nodes in StorageState::Disabled.
+        let effective_nodeset = EffectiveNodeSet::new(
+            &self.my_params.nodeset,
+            &networking.metadata().nodes_config_ref(),
+        );
+
+        let (tx, mut rx) = mpsc::unbounded_channel();
+
+        let mut nodeset_checker = NodeSetChecker::<'_, bool>::new(
+            &effective_nodeset,
+            &networking.metadata().nodes_config_ref(),
+            &self.my_params.replication,
+        );
+
+        let retry_policy = Configuration::pinned()
+            .bifrost
+            .replicated_loglet
+            .log_server_retry_policy
+            .clone();
+
+        for node in effective_nodeset.iter() {
+            let task = SealSingleNode {
+                node_id: *node,
+                loglet_id: self.my_params.loglet_id,
+                sequencer: self.my_params.sequencer,
+                seal_router: self.seal_router.clone(),
+                networking: networking.clone(),
+                known_global_tail: self.known_global_tail.clone(),
+            };
+            self.task_center.spawn_child(
+                TaskKind::Disposable,
+                "send-seal-request",
+                None,
+                task.run(tx.clone(), retry_policy.clone()),
+            )?;
+        }
+        drop(tx);
+
+        // Max observed local-tail from sealed nodes
+        let mut max_tail = LogletOffset::INVALID;
+        while let Some((node_id, local_tail)) = rx.recv().await {
+            max_tail = std::cmp::max(max_tail, local_tail);
+            nodeset_checker.set_attribute(node_id, true);
+
+            // Do we have f-majority responses?
+            if nodeset_checker.check_fmajority(|sealed| *sealed).passed() {
+                self.known_global_tail.notify_seal();
+                // Note that the rest of the seal requests will continue in the background.
+                return Ok(max_tail);
+            }
+        }
+
+        // No more tasks left. This means that we failed to seal an f-majority of the nodeset.
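+        // (For example, with a nodeset of 3 nodes and replication factor 2, fewer than 2
+        // sealed responses leaves 2 unsealed nodes, which is enough for a fully unsealed copyset.)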
+        Err(ReplicatedLogletError::SealFailed(self.my_params.loglet_id))
+    }
+}
+
+struct SealSingleNode<T> {
+    node_id: PlainNodeId,
+    loglet_id: ReplicatedLogletId,
+    sequencer: GenerationalNodeId,
+    seal_router: RpcRouter<Seal>,
+    networking: Networking<T>,
+    known_global_tail: TailOffsetWatch,
+}
+
+impl<T: TransportConnect> SealSingleNode<T> {
+    /// Sends the node's local tail over `tx` once the seal succeeds. Note that this will _only_
+    /// return Ok if the seal was successful; otherwise it keeps retrying until the retry policy
+    /// is exhausted.
+    pub async fn run(
+        self,
+        tx: mpsc::UnboundedSender<(PlainNodeId, LogletOffset)>,
+        retry_policy: RetryPolicy,
+    ) -> anyhow::Result<()> {
+        let mut retry_iter = retry_policy.into_iter();
+        loop {
+            match self.do_seal().await {
+                Ok(res) if res.body().sealed || res.body().status == Status::Ok => {
+                    let _ = tx.send((self.node_id, res.body().local_tail));
+                    return Ok(());
+                }
+                // The request was sent, but the node did not seal (or reported a failure status).
+                Ok(res) => {
+                    trace!(loglet_id = %self.loglet_id, "Seal failed on node {} with status {:?}", self.node_id, res.body().status);
+                }
+                Err(_) => {
+                    trace!(loglet_id = %self.loglet_id, "Failed to send seal message to node {}", self.node_id);
+                }
+            }
+            if let Some(pause) = retry_iter.next() {
+                tokio::time::sleep(pause).await;
+            } else {
+                return Err(anyhow::anyhow!(
+                    "Exhausted retries while attempting to seal the loglet {} on node {}",
+                    self.loglet_id,
+                    self.node_id
+                ));
+            }
+        }
+    }
+
+    async fn do_seal(&self) -> Result<Incoming<Sealed>, RpcError> {
+        let request = Seal {
+            loglet_id: self.loglet_id,
+            sequencer: self.sequencer.clone(),
+            known_global_tail: self.known_global_tail.latest_offset(),
+        };
+        trace!(loglet_id = %self.loglet_id, "Sending seal message to node {}", self.node_id);
+        self.seal_router
+            .call(&self.networking, self.node_id, request)
+            .await
+    }
+}
diff --git a/crates/types/src/config/bifrost.rs b/crates/types/src/config/bifrost.rs
index c55c20110..0c54808d9 100644
--- a/crates/types/src/config/bifrost.rs
+++ b/crates/types/src/config/bifrost.rs
@@ -232,6 +232,11 @@ pub struct ReplicatedLogletOptions {
     ///
     /// Timeout waiting on log server response
     pub log_server_timeout: Duration,
+
+    /// Log server RPC retry policy
+    ///
+    /// Retry policy for log server RPCs, such as the seal request.
+    pub log_server_retry_policy: RetryPolicy,
 }
 
 impl Default for ReplicatedLogletOptions {
@@ -241,11 +246,17 @@ impl Default for ReplicatedLogletOptions {
 
             sequencer_backoff_strategy: RetryPolicy::exponential(
                 Duration::from_millis(100),
-                0.1,
+                2.0,
                 None,
                 Some(Duration::from_millis(2000)),
             ),
             log_server_timeout: Duration::from_millis(500),
+            log_server_retry_policy: RetryPolicy::exponential(
+                Duration::from_millis(250),
+                2.0,
+                Some(10),
+                Some(Duration::from_millis(2000)),
+            ),
         }
     }
 }
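
Reviewer note (not part of the patch): below is a minimal, self-contained sketch of the fan-out pattern that SealTask::run implements: broadcast the seal to every node in the effective nodeset, collect per-node acknowledgements over an unbounded channel, and acknowledge the seal as soon as an f-majority of nodes has confirmed it. It assumes a flat nodeset, where an f-majority is nodeset_size - replication_factor + 1 nodes. The names SealAck, seal_single_node and seal_nodeset are illustrative only and are not part of the Restate codebase, which instead uses NodeSetChecker with the configured ReplicationProperty.

use std::time::Duration;

use tokio::sync::mpsc;

/// A per-node seal acknowledgement (illustrative only).
#[derive(Debug, Clone, Copy)]
struct SealAck {
    node_id: u32,
    local_tail: u32,
}

/// Pretend per-node seal RPC: succeeds after a node-dependent delay.
async fn seal_single_node(node_id: u32) -> SealAck {
    tokio::time::sleep(Duration::from_millis(10 * u64::from(node_id))).await;
    SealAck {
        node_id,
        local_tail: 100 + node_id,
    }
}

/// Broadcasts the seal to every node and returns the max observed local tail
/// once an f-majority of nodes has confirmed the seal.
async fn seal_nodeset(nodeset: &[u32], replication_factor: usize) -> Result<u32, &'static str> {
    // For a flat nodeset, an f-majority leaves fewer than `replication_factor`
    // nodes unsealed, so no fully replicated record can escape the seal.
    let fmajority = nodeset.len().saturating_sub(replication_factor) + 1;

    let (tx, mut rx) = mpsc::unbounded_channel();
    for &node_id in nodeset {
        let tx = tx.clone();
        tokio::spawn(async move {
            // Every node is asked to seal, even after the quorum is reached.
            let _ = tx.send(seal_single_node(node_id).await);
        });
    }
    // Drop our sender so `rx` terminates once all per-node tasks finish.
    drop(tx);

    let mut sealed_nodes = 0usize;
    let mut max_local_tail = 0u32;
    while let Some(ack) = rx.recv().await {
        sealed_nodes += 1;
        max_local_tail = max_local_tail.max(ack.local_tail);
        if sealed_nodes >= fmajority {
            // Acknowledge the seal; the remaining requests continue in the background.
            return Ok(max_local_tail);
        }
    }
    Err("failed to seal an f-majority of the nodeset")
}

#[tokio::main]
async fn main() {
    // Three nodes with replication factor 2, so an f-majority is 2 nodes.
    match seal_nodeset(&[1, 2, 3], 2).await {
        Ok(tail) => println!("sealed, max observed local tail = {tail}"),
        Err(err) => eprintln!("{err}"),
    }
}

As in the patch, the sender half of the channel is dropped right after spawning, so rx terminates once every per-node task has finished; reaching that point without an f-majority is the failure case that maps to ReplicatedLogletError::SealFailed.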