diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
index 762cff87cff..52029fecee4 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
@@ -20,7 +20,6 @@
 use std::collections::hash_map::Entry;
 use std::collections::{HashMap, HashSet};
 use std::fmt;
-use std::iter::once;
 use std::path::Path;
 use std::sync::{Arc, Weak};
 use std::time::Duration;
@@ -56,8 +55,9 @@ use tracing::{debug, error, info, warn};
 use super::fetch::FetchStreamTask;
 use super::metrics::INGEST_V2_METRICS;
 use super::models::IngesterShard;
-use super::mrecord::MRecord;
-use super::mrecordlog_utils::{check_enough_capacity, force_delete_queue};
+use super::mrecordlog_utils::{
+    append_non_empty_doc_batch, check_enough_capacity, force_delete_queue, AppendDocBatchError,
+};
 use super::rate_meter::RateMeter;
 use super::replication::{
     ReplicationClient, ReplicationStreamTask, ReplicationStreamTaskHandle, ReplicationTask,
@@ -232,10 +232,12 @@ impl Ingester {
                 num_closed_shards += 1;
             } else {
                 // The queue is empty: delete it.
-                force_delete_queue(&mut state_guard.mrecordlog, &queue_id)
-                    .await
-                    .expect("TODO: handle IO error");
-
+                if let Err(io_error) =
+                    force_delete_queue(&mut state_guard.mrecordlog, &queue_id).await
+                {
+                    error!("failed to delete WAL queue `{queue_id}`: {io_error}",);
+                    continue;
+                }
                 num_deleted_shards += 1;
             }
         }
@@ -297,6 +299,7 @@ impl Ingester {
             .await?;
 
         if let Err(error) = replication_client.init_replica(shard).await {
+            // TODO: Remove the dangling queue from the WAL.
             error!("failed to initialize replica shard: {error}",);
             return Err(IngestV2Error::Internal(format!(
                 "failed to initialize replica shard: {error}"
@@ -395,6 +398,9 @@ impl Ingester {
         let mut persist_failures = Vec::new();
         let mut replicate_subrequests: HashMap<NodeId, Vec<ReplicateSubrequest>> = HashMap::new();
 
+        // Keep track of the shards that need to be closed after an IO error.
+        let mut shards_to_close: HashSet<QueueId> = HashSet::new();
+
         let commit_type = persist_request.commit_type();
         let force_commit = commit_type == CommitTypeV2::Force;
         let leader_id: NodeId = persist_request.leader_id.into();
@@ -515,27 +521,42 @@ impl Ingester {
 
             rate_meter.update(batch_num_bytes);
 
-            let current_position_inclusive: Position = if force_commit {
-                let encoded_mrecords = doc_batch
-                    .docs()
-                    .map(|doc| MRecord::Doc(doc).encode())
-                    .chain(once(MRecord::Commit.encode()));
-                state_guard
-                    .mrecordlog
-                    .append_records(&queue_id, None, encoded_mrecords)
-                    .await
-                    .expect("TODO") // TODO: Io error, close shard?
-            } else {
-                let encoded_mrecords = doc_batch.docs().map(|doc| MRecord::Doc(doc).encode());
-                state_guard
-                    .mrecordlog
-                    .append_records(&queue_id, None, encoded_mrecords)
-                    .await
-                    .expect("TODO") // TODO: Io error, close shard?
-            }
-            .map(Position::offset)
-            .expect("records should not be empty");
+            let append_result = append_non_empty_doc_batch(
+                &mut state_guard.mrecordlog,
+                &queue_id,
+                &doc_batch,
+                force_commit,
+            )
+            .await;
+            let current_position_inclusive = match append_result {
+                Ok(current_position_inclusive) => current_position_inclusive,
+                Err(append_error) => {
+                    let reason = match &append_error {
+                        AppendDocBatchError::Io(io_error) => {
+                            error!("failed to append records to shard `{queue_id}`: {io_error}");
+                            shards_to_close.insert(queue_id);
+                            PersistFailureReason::ShardClosed
+                        }
+                        AppendDocBatchError::QueueNotFound(_) => {
+                            error!(
+                                "WAL queue `{queue_id}` does not exist: this should never happen, \
+                                 please report this issue on GitHub",
+                            );
+                            PersistFailureReason::ShardNotFound
+                        }
+                    };
+                    let persist_failure = PersistFailure {
+                        subrequest_id: subrequest.subrequest_id,
+                        index_uid: subrequest.index_uid,
+                        source_id: subrequest.source_id,
+                        shard_id: subrequest.shard_id,
+                        reason: reason as i32,
+                    };
+                    persist_failures.push(persist_failure);
+                    continue;
+                }
+            };
             // It's more precise the compute the new usage from the current usage + the requested
             // capacity than from continuously summing up the requested capacities, which are
             // approximations.
@@ -583,6 +604,16 @@ impl Ingester {
                 persist_successes.push(persist_success);
             }
         }
+        for queue_id in shards_to_close {
+            let shard = state_guard
+                .shards
+                .get_mut(&queue_id)
+                .expect("shard should exist");
+            {
+                shard.shard_state = ShardState::Closed;
+                shard.notify_shard_status();
+            }
+        }
         if replicate_subrequests.is_empty() {
             let leader_id = self.self_node_id.to_string();
             let persist_response = PersistResponse {
@@ -1042,10 +1073,13 @@ impl IngesterState {
                     shard.truncation_position_inclusive = truncate_up_to_position_inclusive;
                 }
                 Err(TruncateError::MissingQueue(_)) => {
-                    warn!("failed to truncate WAL queue `{queue_id}`: queue does not exist");
+                    error!(
+                        "WAL queue `{queue_id}` does not exist: this should never happen, please \
+                         report this issue on GitHub",
+                    );
                 }
-                Err(error) => {
-                    error!(%error, "failed to truncate WAL queue `{queue_id}`");
+                Err(TruncateError::IoError(io_error)) => {
+                    error!("failed to truncate shard `{queue_id}`: {io_error}");
                 }
             };
         }
@@ -1055,16 +1089,20 @@
     async fn delete_shard(&mut self, queue_id: &QueueId) {
         match self.mrecordlog.delete_queue(queue_id).await {
             Ok(_) => {
-                self.shards.remove(queue_id);
-                self.rate_trackers.remove(queue_id);
-
-                info!("deleted shard `{queue_id}` from ingester");
+                let removed_shard_opt = self.shards.remove(queue_id);
+                let removed_rate_track_opt = self.rate_trackers.remove(queue_id);
+                debug_assert!(removed_shard_opt.is_some() && removed_rate_track_opt.is_some());
+                info!("deleted shard `{queue_id}`");
             }
             Err(DeleteQueueError::MissingQueue(_)) => {
                 // The shard has already been deleted.
+                let removed_shard_opt = self.shards.remove(queue_id);
+                let removed_rate_track_opt = self.rate_trackers.remove(queue_id);
+                debug_assert!(removed_shard_opt.is_none() && removed_rate_track_opt.is_none());
+                info!("deleted shard `{queue_id}`");
             }
-            Err(DeleteQueueError::IoError(_)) => {
-                panic!("TODO: handle IO error")
+            Err(DeleteQueueError::IoError(io_error)) => {
+                error!("failed to delete WAL queue `{queue_id}`: {io_error}");
             }
         };
     }
@@ -1152,6 +1190,7 @@ mod tests {
     use crate::ingest_v2::broadcast::ShardInfos;
     use crate::ingest_v2::fetch::tests::{into_fetch_eof, into_fetch_payload};
     use crate::ingest_v2::test_utils::MultiRecordLogTestExt;
+    use crate::MRecord;
 
     pub(super) struct IngesterForTest {
         node_id: NodeId,
diff --git a/quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs b/quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs
index d32022d47c0..ec0ce811ef3 100644
--- a/quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs
+++ b/quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs
@@ -18,12 +18,62 @@
 // along with this program. If not, see <http://www.gnu.org/licenses/>.
 
 use std::io;
+use std::iter::once;
 use std::ops::RangeInclusive;
 
 use bytesize::ByteSize;
-use mrecordlog::error::DeleteQueueError;
+use mrecordlog::error::{AppendError, DeleteQueueError};
 use mrecordlog::MultiRecordLog;
-use quickwit_proto::types::QueueId;
+use quickwit_proto::ingest::DocBatchV2;
+use quickwit_proto::types::{Position, QueueId};
+
+use crate::MRecord;
+
+#[derive(Debug, thiserror::Error)]
+pub(super) enum AppendDocBatchError {
+    #[error("IO error: {0}")]
+    Io(#[from] io::Error),
+    #[error("WAL queue `{0}` not found")]
+    QueueNotFound(QueueId),
+}
+
+/// Appends a non-empty document batch to the WAL queue `queue_id`.
+///
+/// # Panics
+///
+/// Panics if `doc_batch` is empty.
+pub(super) async fn append_non_empty_doc_batch(
+    mrecordlog: &mut MultiRecordLog,
+    queue_id: &QueueId,
+    doc_batch: &DocBatchV2,
+    force_commit: bool,
+) -> Result<Position, AppendDocBatchError> {
+    let append_result = if force_commit {
+        let encoded_mrecords = doc_batch
+            .docs()
+            .map(|doc| MRecord::Doc(doc).encode())
+            .chain(once(MRecord::Commit.encode()));
+        mrecordlog
+            .append_records(&queue_id, None, encoded_mrecords)
+            .await
+    } else {
+        let encoded_mrecords = doc_batch.docs().map(|doc| MRecord::Doc(doc).encode());
+        mrecordlog
+            .append_records(&queue_id, None, encoded_mrecords)
+            .await
+    };
+    match append_result {
+        Ok(Some(offset)) => Ok(Position::offset(offset)),
+        Ok(None) => panic!("`doc_batch` should not be empty"),
+        Err(AppendError::IoError(io_error)) => Err(AppendDocBatchError::Io(io_error)),
+        Err(AppendError::MissingQueue(queue_id)) => {
+            Err(AppendDocBatchError::QueueNotFound(queue_id))
+        }
+        Err(AppendError::Past) => {
+            panic!("`append_records` should be called with `position_opt: None`")
+        }
+    }
+}
 
 #[derive(Debug, Clone, Copy)]
 pub(super) struct MRecordLogUsage {
@@ -31,9 +81,6 @@ pub(super) struct MRecordLogUsage {
     pub memory: ByteSize,
 }
 
-#[derive(Debug, Clone, Copy)]
-pub(super) struct MemoryUsage(ByteSize);
-
 /// Error returned when the mrecordlog does not have enough capacity to store some records.
 #[derive(Debug, Clone, Copy, thiserror::Error)]
 pub(super) enum NotEnoughCapacityError {
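For reference, the error-narrowing shape introduced by `append_non_empty_doc_batch` can be sketched in isolation. The snippet below is illustrative only: the enums are stand-ins for `mrecordlog::error::AppendError` and the new `AppendDocBatchError`, not the real definitions, and offsets are plain `u64`s rather than `Position`s.

use std::io;

// Stand-in for `mrecordlog::error::AppendError` (illustrative, not the real type).
#[allow(dead_code)]
#[derive(Debug)]
enum AppendError {
    IoError(io::Error),
    MissingQueue(String),
    Past,
}

// Stand-in for the narrowed error surfaced to the persist path.
#[allow(dead_code)]
#[derive(Debug)]
enum AppendDocBatchError {
    Io(io::Error),
    QueueNotFound(String),
}

// Narrow the low-level append outcome: callers only ever see an IO error (the shard
// gets closed) or a missing queue (shard not found). `Ok(None)` and `AppendError::Past`
// cannot happen for a non-empty batch appended without an explicit position, so they panic.
fn narrow(append_result: Result<Option<u64>, AppendError>) -> Result<u64, AppendDocBatchError> {
    match append_result {
        Ok(Some(offset)) => Ok(offset),
        Ok(None) => panic!("`doc_batch` should not be empty"),
        Err(AppendError::IoError(io_error)) => Err(AppendDocBatchError::Io(io_error)),
        Err(AppendError::MissingQueue(queue_id)) => {
            Err(AppendDocBatchError::QueueNotFound(queue_id))
        }
        Err(AppendError::Past) => {
            panic!("`append_records` should be called with `position_opt: None`")
        }
    }
}

fn main() {
    // A successful append yields the offset of the last written record.
    assert_eq!(narrow(Ok(Some(42))).unwrap(), 42);
    // A missing queue surfaces as `QueueNotFound` instead of a panic.
    assert!(matches!(
        narrow(Err(AppendError::MissingQueue("queue-1".to_string()))),
        Err(AppendDocBatchError::QueueNotFound(_))
    ));
}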