Skip to content

Commit

Permalink
Loglet::enqueue_batch should return OperationError
Browse files Browse the repository at this point in the history
  • Loading branch information
muhamadazmy committed Oct 2, 2024
1 parent 9cf2d2b commit 89d0aa5
Show file tree
Hide file tree
Showing 6 changed files with 9 additions and 16 deletions.
2 changes: 1 addition & 1 deletion crates/bifrost/src/loglet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ pub trait Loglet: Send + Sync + std::fmt::Debug {
/// retry failing appends indefinitely until the loglet is sealed. In that case, such commits
/// might still appear to future readers but without returning the commit acknowledgement to
/// the original writer.
async fn enqueue_batch(&self, payloads: Arc<[Record]>) -> Result<LogletCommit, ShutdownError>;
async fn enqueue_batch(&self, payloads: Arc<[Record]>) -> Result<LogletCommit, OperationError>;

/// The tail is *the first unwritten position* in the loglet.
///
Expand Down
13 changes: 3 additions & 10 deletions crates/bifrost/src/loglet_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::sync::Arc;
use futures::{Stream, StreamExt};
use tracing::instrument;

use restate_core::ShutdownError;
use restate_types::logs::metadata::SegmentIndex;
use restate_types::logs::{KeyFilter, LogletOffset, Lsn, SequenceNumber};
use restate_types::logs::{Record, TailState};
Expand Down Expand Up @@ -140,10 +139,7 @@ impl LogletWrapper {
#[allow(unused)]
#[cfg(any(test, feature = "test-util"))]
pub async fn append(&self, payload: Record) -> Result<Lsn, AppendError> {
let commit = self
.enqueue_batch(Arc::new([payload]))
.await
.map_err(AppendError::Shutdown)?;
let commit = self.enqueue_batch(Arc::new([payload])).await?;
commit.await
}

Expand All @@ -166,13 +162,10 @@ impl LogletWrapper {
)
)]
pub async fn append_batch(&self, payloads: Arc<[Record]>) -> Result<Lsn, AppendError> {
self.enqueue_batch(payloads)
.await
.map_err(AppendError::Shutdown)?
.await
self.enqueue_batch(payloads).await?.await
}

pub async fn enqueue_batch(&self, payloads: Arc<[Record]>) -> Result<Commit, ShutdownError> {
pub async fn enqueue_batch(&self, payloads: Arc<[Record]>) -> Result<Commit, OperationError> {
if self.tail_lsn.is_some() {
return Ok(Commit::sealed());
}
Expand Down
2 changes: 1 addition & 1 deletion crates/bifrost/src/providers/local_loglet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ impl Loglet for LocalLoglet {
Box::pin(self.tail_watch.to_stream())
}

async fn enqueue_batch(&self, payloads: Arc<[Record]>) -> Result<LogletCommit, ShutdownError> {
async fn enqueue_batch(&self, payloads: Arc<[Record]>) -> Result<LogletCommit, OperationError> {
// NOTE: This implementation doesn't perform pipelined writes yet. This will block the caller
// while the underlying write is in progress and only return the Commit future as resolved.
// This is temporary until pipelined writes are fully supported.
Expand Down
2 changes: 1 addition & 1 deletion crates/bifrost/src/providers/memory_loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ impl Loglet for MemoryLoglet {
Box::pin(self.tail_watch.to_stream())
}

async fn enqueue_batch(&self, payloads: Arc<[Record]>) -> Result<LogletCommit, ShutdownError> {
async fn enqueue_batch(&self, payloads: Arc<[Record]>) -> Result<LogletCommit, OperationError> {
let mut log = self.log.lock().unwrap();
if self.sealed.load(Ordering::Relaxed) {
return Ok(LogletCommit::sealed());
Expand Down
2 changes: 1 addition & 1 deletion crates/bifrost/src/providers/replicated_loglet/loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl<T: TransportConnect> Loglet for ReplicatedLoglet<T> {
Box::pin(self.known_global_tail.to_stream())
}

async fn enqueue_batch(&self, payloads: Arc<[Record]>) -> Result<LogletCommit, ShutdownError> {
async fn enqueue_batch(&self, payloads: Arc<[Record]>) -> Result<LogletCommit, OperationError> {
match self.sequencer {
SequencerAccess::Local { ref handle } => handle.enqueue_batch(payloads).await,
SequencerAccess::Remote { .. } => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use super::{
record_cache::RecordCache,
replication::spread_selector::{SelectorStrategy, SpreadSelector},
};
use crate::loglet::{util::TailOffsetWatch, LogletCommit};
use crate::loglet::{util::TailOffsetWatch, LogletCommit, OperationError};
use appender::SequencerAppender;

#[derive(thiserror::Error, Debug)]
Expand Down Expand Up @@ -160,7 +160,7 @@ impl<T: TransportConnect> Sequencer<T> {
pub async fn enqueue_batch(
&self,
payloads: Arc<[Record]>,
) -> Result<LogletCommit, ShutdownError> {
) -> Result<LogletCommit, OperationError> {
if self
.sequencer_shared_state
.global_committed_tail()
Expand Down

0 comments on commit 89d0aa5

Please sign in to comment.