diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index 762cff87cff..ab1199ddc16 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -20,7 +20,6 @@ use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; use std::fmt; -use std::iter::once; use std::path::Path; use std::sync::{Arc, Weak}; use std::time::Duration; @@ -56,8 +55,9 @@ use tracing::{debug, error, info, warn}; use super::fetch::FetchStreamTask; use super::metrics::INGEST_V2_METRICS; use super::models::IngesterShard; -use super::mrecord::MRecord; -use super::mrecordlog_utils::{check_enough_capacity, force_delete_queue}; +use super::mrecordlog_utils::{ + append_non_empty_doc_batch, check_enough_capacity, force_delete_queue, AppendDocBatchError, +}; use super::rate_meter::RateMeter; use super::replication::{ ReplicationClient, ReplicationStreamTask, ReplicationStreamTaskHandle, ReplicationTask, @@ -120,6 +120,10 @@ impl Ingester { replication_factor: usize, ) -> IngestV2Result { let self_node_id: NodeId = cluster.self_node_id().into(); + info!( + "opening write-ahead log located at `{}`", + wal_dir_path.display() + ); let mrecordlog = MultiRecordLog::open_with_prefs( wal_dir_path, mrecordlog::SyncPolicy::OnDelay(Duration::from_secs(5)), @@ -232,10 +236,12 @@ impl Ingester { num_closed_shards += 1; } else { // The queue is empty: delete it. - force_delete_queue(&mut state_guard.mrecordlog, &queue_id) - .await - .expect("TODO: handle IO error"); - + if let Err(io_error) = + force_delete_queue(&mut state_guard.mrecordlog, &queue_id).await + { + error!("failed to delete shard `{queue_id}`: {io_error}"); + continue; + } num_deleted_shards += 1; } } @@ -297,6 +303,7 @@ impl Ingester { .await?; if let Err(error) = replication_client.init_replica(shard).await { + // TODO: Remove dangling queue from the WAL. error!("failed to initialize replica shard: {error}",); return Err(IngestV2Error::Internal(format!( "failed to initialize replica shard: {error}" @@ -395,6 +402,13 @@ impl Ingester { let mut persist_failures = Vec::new(); let mut replicate_subrequests: HashMap> = HashMap::new(); + // Keep track of the shards that need to be closed following an IO error. + let mut shards_to_close: HashSet = HashSet::new(); + + // Keep track of dangling shards, i.e., shards for which there is no longer a corresponding + // queue in the WAL and should be deleted. + let mut shards_to_delete: HashSet = HashSet::new(); + let commit_type = persist_request.commit_type(); let force_commit = commit_type == CommitTypeV2::Force; let leader_id: NodeId = persist_request.leader_id.into(); @@ -515,27 +529,43 @@ impl Ingester { rate_meter.update(batch_num_bytes); - let current_position_inclusive: Position = if force_commit { - let encoded_mrecords = doc_batch - .docs() - .map(|doc| MRecord::Doc(doc).encode()) - .chain(once(MRecord::Commit.encode())); - state_guard - .mrecordlog - .append_records(&queue_id, None, encoded_mrecords) - .await - .expect("TODO") // TODO: Io error, close shard? - } else { - let encoded_mrecords = doc_batch.docs().map(|doc| MRecord::Doc(doc).encode()); - state_guard - .mrecordlog - .append_records(&queue_id, None, encoded_mrecords) - .await - .expect("TODO") // TODO: Io error, close shard? - } - .map(Position::offset) - .expect("records should not be empty"); + let append_result = append_non_empty_doc_batch( + &mut state_guard.mrecordlog, + &queue_id, + &doc_batch, + force_commit, + ) + .await; + let current_position_inclusive = match append_result { + Ok(current_position_inclusive) => current_position_inclusive, + Err(append_error) => { + let reason = match &append_error { + AppendDocBatchError::Io(io_error) => { + error!("failed to persist records to shard `{queue_id}`: {io_error}"); + shards_to_close.insert(queue_id); + PersistFailureReason::ShardClosed + } + AppendDocBatchError::QueueNotFound(_) => { + error!( + "failed to persist records to shard `{queue_id}`: WAL queue not \ + found" + ); + shards_to_delete.insert(queue_id); + PersistFailureReason::ShardNotFound + } + }; + let persist_failure = PersistFailure { + subrequest_id: subrequest.subrequest_id, + index_uid: subrequest.index_uid, + source_id: subrequest.source_id, + shard_id: subrequest.shard_id, + reason: reason as i32, + }; + persist_failures.push(persist_failure); + continue; + } + }; // It's more precise the compute the new usage from the current usage + the requested // capacity than from continuously summing up the requested capacities, which are // approximations. @@ -583,6 +613,28 @@ impl Ingester { persist_successes.push(persist_success); } } + if !shards_to_close.is_empty() { + for queue_id in &shards_to_close { + let shard = state_guard + .shards + .get_mut(queue_id) + .expect("shard should exist"); + + shard.shard_state = ShardState::Closed; + shard.notify_shard_status(); + } + info!( + "closed {} shard(s) following IO error(s)", + shards_to_close.len() + ); + } + if !shards_to_delete.is_empty() { + for queue_id in &shards_to_delete { + state_guard.shards.remove(queue_id); + state_guard.rate_trackers.remove(queue_id); + } + info!("deleted {} dangling shard(s)", shards_to_delete.len()); + } if replicate_subrequests.is_empty() { let leader_id = self.self_node_id.to_string(); let persist_response = PersistResponse { @@ -1042,29 +1094,28 @@ impl IngesterState { shard.truncation_position_inclusive = truncate_up_to_position_inclusive; } Err(TruncateError::MissingQueue(_)) => { - warn!("failed to truncate WAL queue `{queue_id}`: queue does not exist"); + error!("failed to truncate shard `{queue_id}`: WAL queue not found"); + self.shards.remove(queue_id); + self.rate_trackers.remove(queue_id); + info!("deleted dangling shard `{queue_id}`"); } - Err(error) => { - error!(%error, "failed to truncate WAL queue `{queue_id}`"); + Err(TruncateError::IoError(io_error)) => { + error!("failed to truncate shard `{queue_id}`: {io_error}"); } }; } /// Deletes the shard identified by `queue_id` from the ingester state. It removes the - /// mrecordlog queue first and then, if the operation is successful, removes the shard. + /// mrecordlog queue first and then removes the associated in-memory shard and rate trackers. async fn delete_shard(&mut self, queue_id: &QueueId) { match self.mrecordlog.delete_queue(queue_id).await { - Ok(_) => { + Ok(_) | Err(DeleteQueueError::MissingQueue(_)) => { self.shards.remove(queue_id); self.rate_trackers.remove(queue_id); - - info!("deleted shard `{queue_id}` from ingester"); + info!("deleted shard `{queue_id}`"); } - Err(DeleteQueueError::MissingQueue(_)) => { - // The shard has already been deleted. - } - Err(DeleteQueueError::IoError(_)) => { - panic!("TODO: handle IO error") + Err(DeleteQueueError::IoError(io_error)) => { + error!("failed to delete shard `{queue_id}`: {io_error}"); } }; } @@ -1152,6 +1203,7 @@ mod tests { use crate::ingest_v2::broadcast::ShardInfos; use crate::ingest_v2::fetch::tests::{into_fetch_eof, into_fetch_payload}; use crate::ingest_v2::test_utils::MultiRecordLogTestExt; + use crate::MRecord; pub(super) struct IngesterForTest { node_id: NodeId, @@ -1501,6 +1553,109 @@ mod tests { ); } + #[tokio::test] + async fn test_ingester_persist_empty() { + let (ingester_ctx, mut ingester) = IngesterForTest::default().build().await; + + let persist_request = PersistRequest { + leader_id: ingester_ctx.node_id.to_string(), + commit_type: CommitTypeV2::Force as i32, + subrequests: Vec::new(), + }; + let persist_response = ingester.persist(persist_request).await.unwrap(); + assert_eq!(persist_response.leader_id, "test-ingester"); + assert_eq!(persist_response.successes.len(), 0); + assert_eq!(persist_response.failures.len(), 0); + + let persist_request = PersistRequest { + leader_id: "test-ingester".to_string(), + commit_type: CommitTypeV2::Force as i32, + subrequests: vec![PersistSubrequest { + subrequest_id: 0, + index_uid: "test-index:0".to_string(), + source_id: "test-source".to_string(), + shard_id: 1, + doc_batch: None, + }], + }; + + let init_shards_request = InitShardsRequest { + shards: vec![Shard { + index_uid: "test-index:0".to_string(), + source_id: "test-source".to_string(), + shard_id: 1, + shard_state: ShardState::Open as i32, + leader_id: ingester_ctx.node_id.to_string(), + ..Default::default() + }], + }; + ingester.init_shards(init_shards_request).await.unwrap(); + + let persist_response = ingester.persist(persist_request).await.unwrap(); + assert_eq!(persist_response.leader_id, "test-ingester"); + assert_eq!(persist_response.successes.len(), 1); + assert_eq!(persist_response.failures.len(), 0); + + let persist_success = &persist_response.successes[0]; + assert_eq!(persist_success.subrequest_id, 0); + assert_eq!(persist_success.index_uid, "test-index:0"); + assert_eq!(persist_success.source_id, "test-source"); + assert_eq!(persist_success.shard_id, 1); + assert_eq!( + persist_success.replication_position_inclusive, + Some(Position::Beginning) + ); + } + + #[tokio::test] + async fn test_ingester_persist_deletes_dangling_shard() { + let (_ingester_ctx, mut ingester) = IngesterForTest::default().build().await; + + let mut state_guard = ingester.state.write().await; + let queue_id = queue_id("test-index:0", "test-source", 1); + let solo_shard = + IngesterShard::new_solo(ShardState::Open, Position::Beginning, Position::Beginning); + state_guard.shards.insert(queue_id.clone(), solo_shard); + + let rate_limiter = RateLimiter::from_settings(RateLimiterSettings::default()); + let rate_meter = RateMeter::default(); + state_guard + .rate_trackers + .insert(queue_id.clone(), (rate_limiter, rate_meter)); + + drop(state_guard); + + let persist_request = PersistRequest { + leader_id: "test-ingester".to_string(), + commit_type: CommitTypeV2::Force as i32, + subrequests: vec![PersistSubrequest { + subrequest_id: 0, + index_uid: "test-index:0".to_string(), + source_id: "test-source".to_string(), + shard_id: 1, + doc_batch: Some(DocBatchV2::for_test(["test-doc-foo"])), + }], + }; + let persist_response = ingester.persist(persist_request).await.unwrap(); + assert_eq!(persist_response.leader_id, "test-ingester"); + assert_eq!(persist_response.successes.len(), 0); + assert_eq!(persist_response.failures.len(), 1); + + let persist_failure = &persist_response.failures[0]; + assert_eq!(persist_failure.subrequest_id, 0); + assert_eq!(persist_failure.index_uid, "test-index:0"); + assert_eq!(persist_failure.source_id, "test-source"); + assert_eq!(persist_failure.shard_id, 1); + assert_eq!( + persist_failure.reason(), + PersistFailureReason::ShardNotFound + ); + + let state_guard = ingester.state.read().await; + assert_eq!(state_guard.shards.len(), 0); + assert_eq!(state_guard.rate_trackers.len(), 0); + } + #[tokio::test] async fn test_ingester_persist_replicate() { let (leader_ctx, mut leader) = IngesterForTest::default() @@ -2144,7 +2299,7 @@ mod tests { } #[tokio::test] - async fn test_ingester_truncate() { + async fn test_ingester_truncate_shards() { let (ingester_ctx, mut ingester) = IngesterForTest::default().build().await; let shard_01 = Shard { @@ -2245,6 +2400,44 @@ mod tests { assert!(!state_guard.mrecordlog.queue_exists(&queue_id_02)); } + #[tokio::test] + async fn test_ingester_truncate_shards_deletes_dangling_shards() { + let (ingester_ctx, mut ingester) = IngesterForTest::default().build().await; + + let queue_id = queue_id("test-index:0", "test-source", 1); + + let mut state_guard = ingester.state.write().await; + let solo_shard = + IngesterShard::new_solo(ShardState::Open, Position::Beginning, Position::Beginning); + state_guard.shards.insert(queue_id.clone(), solo_shard); + + let rate_limiter = RateLimiter::from_settings(RateLimiterSettings::default()); + let rate_meter = RateMeter::default(); + state_guard + .rate_trackers + .insert(queue_id.clone(), (rate_limiter, rate_meter)); + + drop(state_guard); + + let truncate_shards_request = TruncateShardsRequest { + ingester_id: ingester_ctx.node_id.to_string(), + subrequests: vec![TruncateShardsSubrequest { + index_uid: "test-index:0".to_string(), + source_id: "test-source".to_string(), + shard_id: 1, + truncate_up_to_position_inclusive: Some(Position::offset(0u64)), + }], + }; + ingester + .truncate_shards(truncate_shards_request.clone()) + .await + .unwrap(); + + let state_guard = ingester.state.read().await; + assert_eq!(state_guard.shards.len(), 0); + assert_eq!(state_guard.rate_trackers.len(), 0); + } + #[tokio::test] async fn test_ingester_retain_shards() { let (_ingester_ctx, mut ingester) = IngesterForTest::default().build().await; diff --git a/quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs b/quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs index d32022d47c0..b20f7effcf2 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs @@ -18,12 +18,62 @@ // along with this program. If not, see . use std::io; +use std::iter::once; use std::ops::RangeInclusive; use bytesize::ByteSize; -use mrecordlog::error::DeleteQueueError; +use mrecordlog::error::{AppendError, DeleteQueueError}; use mrecordlog::MultiRecordLog; -use quickwit_proto::types::QueueId; +use quickwit_proto::ingest::DocBatchV2; +use quickwit_proto::types::{Position, QueueId}; + +use crate::MRecord; + +#[derive(Debug, thiserror::Error)] +pub(super) enum AppendDocBatchError { + #[error("IO error: {0}")] + Io(#[from] io::Error), + #[error("WAL queue `{0}` not found")] + QueueNotFound(QueueId), +} + +/// Appends a non-empty document batch to the WAL queue `queue_id`. +/// +/// # Panics +/// +/// Panics if `doc_batch` is empty. +pub(super) async fn append_non_empty_doc_batch( + mrecordlog: &mut MultiRecordLog, + queue_id: &QueueId, + doc_batch: &DocBatchV2, + force_commit: bool, +) -> Result { + let append_result = if force_commit { + let encoded_mrecords = doc_batch + .docs() + .map(|doc| MRecord::Doc(doc).encode()) + .chain(once(MRecord::Commit.encode())); + mrecordlog + .append_records(queue_id, None, encoded_mrecords) + .await + } else { + let encoded_mrecords = doc_batch.docs().map(|doc| MRecord::Doc(doc).encode()); + mrecordlog + .append_records(queue_id, None, encoded_mrecords) + .await + }; + match append_result { + Ok(Some(offset)) => Ok(Position::offset(offset)), + Ok(None) => panic!("`doc_batch` should not be empty"), + Err(AppendError::IoError(io_error)) => Err(AppendDocBatchError::Io(io_error)), + Err(AppendError::MissingQueue(queue_id)) => { + Err(AppendDocBatchError::QueueNotFound(queue_id)) + } + Err(AppendError::Past) => { + panic!("`append_records` should be called with `position_opt: None`") + } + } +} #[derive(Debug, Clone, Copy)] pub(super) struct MRecordLogUsage { @@ -31,9 +81,6 @@ pub(super) struct MRecordLogUsage { pub memory: ByteSize, } -#[derive(Debug, Clone, Copy)] -pub(super) struct MemoryUsage(ByteSize); - /// Error returned when the mrecordlog does not have enough capacity to store some records. #[derive(Debug, Clone, Copy, thiserror::Error)] pub(super) enum NotEnoughCapacityError { @@ -123,6 +170,37 @@ pub(super) fn queue_position_range( mod tests { use super::*; + #[tokio::test] + async fn test_append_non_empty_doc_batch() { + let tempdir = tempfile::tempdir().unwrap(); + let mut mrecordlog = MultiRecordLog::open(tempdir.path()).await.unwrap(); + + let queue_id = "test-queue".to_string(); + let doc_batch = DocBatchV2::for_test(["test-doc-foo"]); + + let append_error = + append_non_empty_doc_batch(&mut mrecordlog, &queue_id, &doc_batch, false) + .await + .unwrap_err(); + + assert!(matches!( + append_error, + AppendDocBatchError::QueueNotFound(..) + )); + + mrecordlog.create_queue(&queue_id).await.unwrap(); + + let position = append_non_empty_doc_batch(&mut mrecordlog, &queue_id, &doc_batch, false) + .await + .unwrap(); + assert_eq!(position, Position::offset(0u64)); + + let position = append_non_empty_doc_batch(&mut mrecordlog, &queue_id, &doc_batch, true) + .await + .unwrap(); + assert_eq!(position, Position::offset(2u64)); + } + #[tokio::test] async fn test_check_enough_capacity() { let tempdir = tempfile::tempdir().unwrap();