Skip to content

Commit

Permalink
[Bifrost] Seal task for replicated loglet
Browse files Browse the repository at this point in the history
Initial implementation for the seal task for the replicated loglet. The seal task is not responsible for repairing the tail, just executes seal on an f-majority of nodes of the nodeset.
  • Loading branch information
AhmedSoliman committed Oct 1, 2024
1 parent f208019 commit 9cf2d2b
Show file tree
Hide file tree
Showing 6 changed files with 263 additions and 5 deletions.
4 changes: 4 additions & 0 deletions crates/bifrost/src/providers/replicated_loglet/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@ use restate_core::ShutdownError;
use restate_types::errors::MaybeRetryableError;
use restate_types::logs::metadata::SegmentIndex;
use restate_types::logs::LogId;
use restate_types::replicated_loglet::ReplicatedLogletId;

use crate::loglet::OperationError;

#[derive(Debug, thiserror::Error)]
pub(crate) enum ReplicatedLogletError {
#[error("cannot parse loglet configuration for log_id={0} at segment_index={1}: {2}")]
LogletParamsParsingError(LogId, SegmentIndex, serde_json::Error),
#[error("could not seal loglet_id={0}, insufficient nodes available for seal")]
SealFailed(ReplicatedLogletId),
#[error(transparent)]
Shutdown(#[from] ShutdownError),
}
Expand All @@ -29,6 +32,7 @@ impl MaybeRetryableError for ReplicatedLogletError {
fn retryable(&self) -> bool {
match self {
Self::LogletParamsParsingError(..) => false,
Self::SealFailed(..) => true,
Self::Shutdown(_) => false,
}
}
Expand Down
61 changes: 57 additions & 4 deletions crates/bifrost/src/providers/replicated_loglet/loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ use std::sync::Arc;

use async_trait::async_trait;
use futures::stream::BoxStream;
use tracing::debug;
use tracing::{debug, info};

use restate_core::network::{Networking, TransportConnect};
use restate_core::ShutdownError;
use restate_core::{task_center, ShutdownError};
use restate_types::logs::metadata::SegmentIndex;
use restate_types::logs::{KeyFilter, LogId, LogletOffset, Record, SequenceNumber, TailState};
use restate_types::replicated_loglet::ReplicatedLogletParams;
Expand All @@ -27,6 +27,7 @@ use crate::loglet::util::TailOffsetWatch;
use crate::loglet::{Loglet, LogletCommit, OperationError, SendableLogletReadStream};
use crate::providers::replicated_loglet::replication::spread_selector::SelectorStrategy;
use crate::providers::replicated_loglet::sequencer::Sequencer;
use crate::providers::replicated_loglet::tasks::SealTask;

use super::log_server_manager::RemoteLogServerManager;
use super::record_cache::RecordCache;
Expand Down Expand Up @@ -168,7 +169,17 @@ impl<T: TransportConnect> Loglet for ReplicatedLoglet<T> {
}

async fn seal(&self) -> Result<(), OperationError> {
todo!()
// todo(asoli): If we are the sequencer node, let the sequencer know.
let _ = SealTask::new(
task_center(),
self.my_params.clone(),
self.logservers_rpc.seal.clone(),
self.known_global_tail.clone(),
)
.run(self.networking.clone())
.await?;
info!(loglet_id=%self.my_params.loglet_id, "Loglet has been sealed successfully");
Ok(())
}
}

Expand All @@ -190,7 +201,7 @@ mod tests {
use restate_types::replicated_loglet::{NodeSet, ReplicatedLogletId, ReplicationProperty};
use restate_types::{GenerationalNodeId, PlainNodeId};

use crate::loglet::Loglet;
use crate::loglet::{AppendError, Loglet};

struct TestEnv {
pub loglet: Arc<dyn Loglet>,
Expand Down Expand Up @@ -295,4 +306,46 @@ mod tests {
})
.await
}

// ** Single-node replicated-loglet seal **
#[test(tokio::test(start_paused = true))]
async fn test_seal_local_sequencer_single_node() -> Result<()> {
let loglet_id = ReplicatedLogletId::new(122);
let params = ReplicatedLogletParams {
loglet_id,
sequencer: GenerationalNodeId::new(1, 1),
replication: ReplicationProperty::new(NonZeroU8::new(1).unwrap()),
nodeset: NodeSet::from_single(PlainNodeId::new(1)),
write_set: None,
};

run_in_test_env(params, |env| async move {
let batch: Arc<[Record]> = vec![
("record-1", Keys::Single(1)).into(),
("record-2", Keys::Single(2)).into(),
("record-3", Keys::Single(3)).into(),
]
.into();
let offset = env.loglet.enqueue_batch(batch.clone()).await?.await?;
assert_that!(offset, eq(LogletOffset::new(3)));
let offset = env.loglet.enqueue_batch(batch.clone()).await?.await?;
assert_that!(offset, eq(LogletOffset::new(6)));
let tail = env.loglet.find_tail().await?;
assert_that!(tail, eq(TailState::Open(LogletOffset::new(7))));

env.loglet.seal().await?;
let batch: Arc<[Record]> = vec![
("record-4", Keys::Single(4)).into(),
("record-5", Keys::Single(5)).into(),
]
.into();
let not_appended = env.loglet.enqueue_batch(batch).await?.await;
assert_that!(not_appended, err(pat!(AppendError::Sealed)));
let tail = env.loglet.find_tail().await?;
assert_that!(tail, eq(TailState::Sealed(LogletOffset::new(7))));

Ok(())
})
.await
}
}
1 change: 1 addition & 0 deletions crates/bifrost/src/providers/replicated_loglet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub mod replication;
mod rpc_routers;
#[allow(dead_code)]
pub mod sequencer;
mod tasks;
#[cfg(any(test, feature = "test-util"))]
pub mod test_util;

Expand Down
13 changes: 13 additions & 0 deletions crates/bifrost/src/providers/replicated_loglet/tasks/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

mod seal;

pub use seal::*;
176 changes: 176 additions & 0 deletions crates/bifrost/src/providers/replicated_loglet/tasks/seal.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use tokio::sync::mpsc;
use tracing::trace;

use restate_core::network::rpc_router::{RpcError, RpcRouter};
use restate_core::network::{Incoming, Networking, TransportConnect};
use restate_core::{TaskCenter, TaskKind};
use restate_types::config::Configuration;
use restate_types::logs::{LogletOffset, SequenceNumber};
use restate_types::net::log_server::{Seal, Sealed, Status};
use restate_types::replicated_loglet::{
EffectiveNodeSet, ReplicatedLogletId, ReplicatedLogletParams,
};
use restate_types::retries::RetryPolicy;
use restate_types::{GenerationalNodeId, PlainNodeId};

use crate::loglet::util::TailOffsetWatch;
use crate::providers::replicated_loglet::error::ReplicatedLogletError;
use crate::providers::replicated_loglet::replication::NodeSetChecker;

/// Sends a seal request to as many log-servers in the nodeset
///
/// We broadcast the seal to all nodes that we can, but only wait for f-majority
/// responses before acknowleding the seal.
///
/// The seal operation is idempotent. It's safe to seal a loglet if it's already partially or fully
/// sealed. Note that the seal task ignores the "seal" state in the input known_global_tail watch,
/// but it will set it to `true` after the seal.
pub struct SealTask {
task_center: TaskCenter,
my_params: ReplicatedLogletParams,
seal_router: RpcRouter<Seal>,
known_global_tail: TailOffsetWatch,
}

impl SealTask {
pub fn new(
task_center: TaskCenter,
my_params: ReplicatedLogletParams,
seal_router: RpcRouter<Seal>,
known_global_tail: TailOffsetWatch,
) -> Self {
Self {
task_center,
my_params,
seal_router,
known_global_tail,
}
}

pub async fn run<T: TransportConnect>(
self,
networking: Networking<T>,
) -> Result<LogletOffset, ReplicatedLogletError> {
// Use the entire nodeset except for StorageState::Disabled.
let effective_nodeset = EffectiveNodeSet::new(
&self.my_params.nodeset,
&networking.metadata().nodes_config_ref(),
);

let (tx, mut rx) = mpsc::unbounded_channel();

let mut nodeset_checker = NodeSetChecker::<'_, bool>::new(
&effective_nodeset,
&networking.metadata().nodes_config_ref(),
&self.my_params.replication,
);

let retry_policy = Configuration::pinned()
.bifrost
.replicated_loglet
.log_server_retry_policy
.clone();

for node in effective_nodeset.iter() {
let task = SealSingleNode {
node_id: *node,
loglet_id: self.my_params.loglet_id,
sequencer: self.my_params.sequencer,
seal_router: self.seal_router.clone(),
networking: networking.clone(),
known_global_tail: self.known_global_tail.clone(),
};
self.task_center.spawn_child(
TaskKind::Disposable,
"send-seal-request",
None,
task.run(tx.clone(), retry_policy.clone()),
)?;
}
drop(tx);

// Max observed local-tail from sealed nodes
let mut max_tail = LogletOffset::INVALID;
while let Some((node_id, local_tail)) = rx.recv().await {
max_tail = std::cmp::max(max_tail, local_tail);
nodeset_checker.set_attribute(node_id, true);

// Do we have f-majority responses?
if nodeset_checker.check_fmajority(|sealed| *sealed).passed() {
self.known_global_tail.notify_seal();
// note that the rest of seal requests will continue in the background
return Ok(max_tail);
}
}

// no more tasks left. We this means that we failed to seal
Err(ReplicatedLogletError::SealFailed(self.my_params.loglet_id))
}
}

struct SealSingleNode<T> {
node_id: PlainNodeId,
loglet_id: ReplicatedLogletId,
sequencer: GenerationalNodeId,
seal_router: RpcRouter<Seal>,
networking: Networking<T>,
known_global_tail: TailOffsetWatch,
}

impl<T: TransportConnect> SealSingleNode<T> {
/// Returns local-tail. Note that this will _only_ return if seal was successful, otherwise,
/// it'll continue to retry.
pub async fn run(
self,
tx: mpsc::UnboundedSender<(PlainNodeId, LogletOffset)>,
retry_policy: RetryPolicy,
) -> anyhow::Result<()> {
let mut retry_iter = retry_policy.into_iter();
loop {
match self.do_seal().await {
Ok(res) if res.body().sealed || res.body().status == Status::Ok => {
let _ = tx.send((self.node_id, res.body().local_tail));
return Ok(());
}
// not sealed, or seal has failed
Ok(res) => {
// Sent, but sealing not successful
trace!(loglet_id = %self.loglet_id, "Seal failed on node {} with status {:?}", self.node_id, res.body().status);
}
Err(_) => {
trace!(loglet_id = %self.loglet_id, "Failed to send seal message to node {}", self.node_id);
}
}
if let Some(pause) = retry_iter.next() {
tokio::time::sleep(pause).await;
} else {
return Err(anyhow::anyhow!(format!(
"Exhausted retries while attempting to seal the loglet {} on node {}",
self.loglet_id, self.node_id
)));
}
}
}

async fn do_seal(&self) -> Result<Incoming<Sealed>, RpcError<Seal>> {
let request = Seal {
loglet_id: self.loglet_id,
sequencer: self.sequencer.clone(),
known_global_tail: self.known_global_tail.latest_offset(),
};
trace!(loglet_id = %self.loglet_id, "Sending seal message to node {}", self.node_id);
self.seal_router
.call(&self.networking, self.node_id, request)
.await
}
}
13 changes: 12 additions & 1 deletion crates/types/src/config/bifrost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,11 @@ pub struct ReplicatedLogletOptions {
///
/// Timeout waiting on log server response
pub log_server_timeout: Duration,

/// log_server RPC retry policy
///
/// Retry policy for log server RPCs
pub log_server_retry_policy: RetryPolicy,
}

impl Default for ReplicatedLogletOptions {
Expand All @@ -241,11 +246,17 @@ impl Default for ReplicatedLogletOptions {

sequencer_backoff_strategy: RetryPolicy::exponential(
Duration::from_millis(100),
0.1,
2.0,
None,
Some(Duration::from_millis(2000)),
),
log_server_timeout: Duration::from_millis(500),
log_server_retry_policy: RetryPolicy::exponential(
Duration::from_millis(250),
2.0,
Some(10),
Some(Duration::from_millis(2000)),
),
}
}
}

0 comments on commit 9cf2d2b

Please sign in to comment.