Skip to content

Commit

Permalink
[ReplicatedLoglet] Remote append
Browse files Browse the repository at this point in the history
Summary:
Implements a remote loglet append calls to leader sequencer
  • Loading branch information
muhamadazmy committed Oct 3, 2024
1 parent f53e2ea commit 845d464
Show file tree
Hide file tree
Showing 8 changed files with 667 additions and 14 deletions.
12 changes: 11 additions & 1 deletion crates/bifrost/src/loglet/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
use std::fmt::Debug;
use std::sync::Arc;

use restate_core::ShutdownError;
use restate_core::{network::NetworkError, ShutdownError};
use restate_types::errors::{IntoMaybeRetryable, MaybeRetryableError};

#[derive(Debug, Clone, thiserror::Error)]
Expand Down Expand Up @@ -68,3 +68,13 @@ impl From<OperationError> for AppendError {
}
}
}

impl From<NetworkError> for OperationError {
fn from(value: NetworkError) -> Self {
match value {
NetworkError::Shutdown(err) => OperationError::Shutdown(err),
// todo(azmy): are all network errors retryable?
_ => OperationError::retryable(value),
}
}
}
4 changes: 4 additions & 0 deletions crates/bifrost/src/loglet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ impl LogletCommitResolver {
pub fn offset(self, offset: LogletOffset) {
let _ = self.tx.send(Ok(offset));
}

pub fn error(self, err: AppendError) {
let _ = self.tx.send(Err(err));
}
}

pub struct LogletCommit {
Expand Down
16 changes: 11 additions & 5 deletions crates/bifrost/src/providers/replicated_loglet/loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::providers::replicated_loglet::tasks::SealTask;

use super::log_server_manager::RemoteLogServerManager;
use super::record_cache::RecordCache;
use super::remote_sequencer::RemoteSequencer;
use super::rpc_routers::{LogServersRpc, SequencersRpc};

#[derive(derive_more::Debug)]
Expand Down Expand Up @@ -95,7 +96,14 @@ impl<T: TransportConnect> ReplicatedLoglet<T> {
}
} else {
SequencerAccess::Remote {
sequencers_rpc: sequencers_rpc.clone(),
handle: RemoteSequencer::new(
log_id,
segment_index,
my_params.clone(),
networking.clone(),
known_global_tail.clone(),
sequencers_rpc.clone(),
),
}
};
Ok(Self {
Expand All @@ -116,7 +124,7 @@ impl<T: TransportConnect> ReplicatedLoglet<T> {
pub enum SequencerAccess<T> {
/// The sequencer is remote (or retired/preempted)
#[debug("Remote")]
Remote { sequencers_rpc: SequencersRpc },
Remote { handle: RemoteSequencer<T> },
/// We are the loglet leaders
#[debug("Local")]
Local { handle: Sequencer<T> },
Expand All @@ -143,9 +151,7 @@ impl<T: TransportConnect> Loglet for ReplicatedLoglet<T> {
async fn enqueue_batch(&self, payloads: Arc<[Record]>) -> Result<LogletCommit, OperationError> {
match self.sequencer {
SequencerAccess::Local { ref handle } => handle.enqueue_batch(payloads).await,
SequencerAccess::Remote { .. } => {
todo!("Access to remote sequencers is not implemented yet")
}
SequencerAccess::Remote { ref handle } => handle.append(payloads).await,
}
}

Expand Down
2 changes: 2 additions & 0 deletions crates/bifrost/src/providers/replicated_loglet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ mod network;
mod provider;
#[allow(dead_code)]
mod record_cache;
#[allow(dead_code)]
mod remote_sequencer;
pub mod replication;
mod rpc_routers;
#[allow(dead_code)]
Expand Down
Loading

0 comments on commit 845d464

Please sign in to comment.