From b0f9a794cc647dd7aafbeac1deb2747d0db324b1 Mon Sep 17 00:00:00 2001 From: Muhamad Awad Date: Thu, 3 Oct 2024 14:40:48 +0200 Subject: [PATCH] [ReplicatedLoglet] Implement remote sequencer find tail --- .../src/providers/replicated_loglet/loglet.rs | 4 +- .../replicated_loglet/remote_sequencer.rs | 78 ++++++++++++++++++- .../replicated_loglet/rpc_routers.rs | 6 +- crates/types/protobuf/restate/common.proto | 2 + crates/types/src/net/replicated_loglet.rs | 29 ++++++- 5 files changed, 110 insertions(+), 9 deletions(-) diff --git a/crates/bifrost/src/providers/replicated_loglet/loglet.rs b/crates/bifrost/src/providers/replicated_loglet/loglet.rs index 06f42da66..cadc85695 100644 --- a/crates/bifrost/src/providers/replicated_loglet/loglet.rs +++ b/crates/bifrost/src/providers/replicated_loglet/loglet.rs @@ -158,9 +158,7 @@ impl Loglet for ReplicatedLoglet { async fn find_tail(&self) -> Result, OperationError> { match self.sequencer { SequencerAccess::Local { .. } => Ok(*self.known_global_tail.get()), - SequencerAccess::Remote { .. } => { - todo!("find_tail() is not implemented yet") - } + SequencerAccess::Remote { ref handle } => handle.find_tail().await, } } diff --git a/crates/bifrost/src/providers/replicated_loglet/remote_sequencer.rs b/crates/bifrost/src/providers/replicated_loglet/remote_sequencer.rs index bf96571cd..d5831e355 100644 --- a/crates/bifrost/src/providers/replicated_loglet/remote_sequencer.rs +++ b/crates/bifrost/src/providers/replicated_loglet/remote_sequencer.rs @@ -20,15 +20,17 @@ use tokio::sync::{mpsc, Mutex, OwnedSemaphorePermit, Semaphore}; use restate_core::{ network::{ - rpc_router::{RpcRouter, RpcToken}, + rpc_router::{RpcError, RpcRouter, RpcToken}, NetworkError, NetworkSendError, Networking, Outgoing, TransportConnect, WeakConnection, }, task_center, ShutdownError, TaskKind, }; use restate_types::{ config::Configuration, - logs::{metadata::SegmentIndex, LogId, LogletOffset, Record}, - net::replicated_loglet::{Append, Appended, CommonRequestHeader, SequencerStatus}, + logs::{metadata::SegmentIndex, LogId, LogletOffset, Record, SequenceNumber, TailState}, + net::replicated_loglet::{ + Append, Appended, CommonRequestHeader, GetSequencerInfo, SequencerStatus, + }, replicated_loglet::ReplicatedLogletParams, GenerationalNodeId, }; @@ -205,6 +207,76 @@ where Ok(connection) } + + /// Attempts to find tail. + /// + /// This first tries to find tail by synchronizing with sequencer. If this failed + /// duo to sequencer not reachable, it will immediately try to find tail by querying + /// fmajority of loglet servers + pub async fn find_tail(&self) -> Result, OperationError> { + // try to sync with sequencer + if self.sync_sequencer_tail().await.is_ok() { + return Ok(*self.known_global_tail.get()); + } + + // otherwise we need to try to fetch this from the log servers. + self.sync_log_servers_tail().await?; + Ok(*self.known_global_tail.get()) + } + + /// Synchronize known_global_tail with the sequencer + async fn sync_sequencer_tail(&self) -> Result<(), NetworkError> { + let result = self + .sequencers_rpc + .info + .call( + &self.networking, + self.params.sequencer, + GetSequencerInfo { + header: CommonRequestHeader { + log_id: self.log_id, + loglet_id: self.params.loglet_id, + segment_index: self.segment_index, + }, + }, + ) + .await + .map(|incoming| incoming.into_body()); + + let info = match result { + Ok(info) => info, + Err(RpcError::Shutdown(shutdown)) => return Err(NetworkError::Shutdown(shutdown)), + Err(RpcError::SendError(err)) => return Err(err.source), + }; + + match info.header.status { + SequencerStatus::Ok => { + // update header info + if let Some(offset) = info.header.known_global_tail { + self.known_global_tail.notify_offset_update(offset); + } + } + SequencerStatus::Sealed => { + self.known_global_tail.notify( + true, + info.header + .known_global_tail + .unwrap_or(LogletOffset::INVALID), + ); + } + _ => { + unreachable!() + } + }; + + Ok(()) + } + + /// A fallback mechanism in case sequencer is not available + /// to try and sync known_global_tail with fmajority of LogServers + async fn sync_log_servers_tail(&self) -> Result<(), OperationError> { + todo!() + } } /// RemoteSequencerConnection represents a single open connection diff --git a/crates/bifrost/src/providers/replicated_loglet/rpc_routers.rs b/crates/bifrost/src/providers/replicated_loglet/rpc_routers.rs index 23f09ba60..bd214cd38 100644 --- a/crates/bifrost/src/providers/replicated_loglet/rpc_routers.rs +++ b/crates/bifrost/src/providers/replicated_loglet/rpc_routers.rs @@ -14,7 +14,7 @@ use restate_core::network::rpc_router::RpcRouter; use restate_core::network::MessageRouterBuilder; use restate_types::net::log_server::{GetLogletInfo, GetRecords, Release, Seal, Store, Trim}; -use restate_types::net::replicated_loglet::Append; +use restate_types::net::replicated_loglet::{Append, GetSequencerInfo}; /// Used by replicated loglets to send requests and receive responses from log-servers /// Cloning this is cheap and all clones will share the same internal trackers. @@ -56,6 +56,7 @@ impl LogServersRpc { #[derive(Clone)] pub struct SequencersRpc { pub append: RpcRouter, + pub info: RpcRouter, } impl SequencersRpc { @@ -63,7 +64,8 @@ impl SequencersRpc { /// responses are routed correctly. pub fn new(router_builder: &mut MessageRouterBuilder) -> Self { let append = RpcRouter::new(router_builder); + let info = RpcRouter::new(router_builder); - Self { append } + Self { append, info } } } diff --git a/crates/types/protobuf/restate/common.proto b/crates/types/protobuf/restate/common.proto index 517acb8f1..b66de7253 100644 --- a/crates/types/protobuf/restate/common.proto +++ b/crates/types/protobuf/restate/common.proto @@ -63,6 +63,8 @@ enum TargetName { // ReplicatedLoglet REPLICATED_LOGLET_APPEND = 40; REPLICATED_LOGLET_APPENDED = 41; + REPLICATED_LOGLET_GET_INFO = 42; + REPLICATED_LOGLET_INFO = 43; } enum NodeStatus { diff --git a/crates/types/src/net/replicated_loglet.rs b/crates/types/src/net/replicated_loglet.rs index 85a3bcea1..b0c87f308 100644 --- a/crates/types/src/net/replicated_loglet.rs +++ b/crates/types/src/net/replicated_loglet.rs @@ -91,7 +91,8 @@ impl Append { self.payloads .iter() .map(|p| p.estimated_encode_size()) - .sum() + .sum::() + + size_of::() } } @@ -137,3 +138,29 @@ impl Appended { self } } + +define_rpc! { + @request = GetSequencerInfo, + @response = SequencerInfo, + @request_target = TargetName::ReplicatedLogletGetInfo, + @response_target = TargetName::ReplicatedLogletInfo, +} + +// ** APPEND +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GetSequencerInfo { + #[serde(flatten)] + pub header: CommonRequestHeader, +} + +impl GetSequencerInfo { + pub fn estimated_encode_size(&self) -> usize { + size_of::() + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SequencerInfo { + #[serde(flatten)] + pub header: CommonResponseHeader, +}