Close shards on IO error
guilload committed Dec 19, 2023
1 parent 4e7764d commit 86a0c74
Showing 2 changed files with 316 additions and 45 deletions.
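
The diff below follows a single pattern in the persist path: an IO error while appending to a WAL queue closes the shard, while a missing queue marks the shard as dangling and deletes it, and the caller gets a matching `PersistFailureReason`. Here is a minimal, self-contained sketch of that mapping, using simplified stand-in types rather than the actual Quickwit definitions:

```rust
use std::collections::HashSet;

// Stand-in types for illustration only; not the actual Quickwit definitions.
type QueueId = String;

enum AppendError {
    Io(std::io::Error),
    QueueNotFound(QueueId),
}

#[derive(Debug, PartialEq)]
enum FailureReason {
    ShardClosed,
    ShardNotFound,
}

/// Maps a WAL append error to the action taken on the shard and the failure
/// reason reported back to the client, mirroring the pattern in the diff below.
fn handle_append_error(
    queue_id: QueueId,
    error: &AppendError,
    shards_to_close: &mut HashSet<QueueId>,
    shards_to_delete: &mut HashSet<QueueId>,
) -> FailureReason {
    match error {
        // The WAL can no longer be written reliably: stop accepting new records.
        AppendError::Io(_) => {
            shards_to_close.insert(queue_id);
            FailureReason::ShardClosed
        }
        // The queue is gone: the in-memory shard is dangling and must be removed.
        AppendError::QueueNotFound(_) => {
            shards_to_delete.insert(queue_id);
            FailureReason::ShardNotFound
        }
    }
}

fn main() {
    let mut shards_to_close = HashSet::new();
    let mut shards_to_delete = HashSet::new();
    let error = AppendError::QueueNotFound("test-index:0/test-source/1".to_string());
    let reason = handle_append_error(
        "test-index:0/test-source/1".to_string(),
        &error,
        &mut shards_to_close,
        &mut shards_to_delete,
    );
    assert_eq!(reason, FailureReason::ShardNotFound);
    assert!(shards_to_delete.contains("test-index:0/test-source/1"));
}
```

In the actual change, closing flips the shard to `ShardState::Closed` and notifies its status so writers back off, while deleting removes both the shard entry and its rate trackers.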
273 changes: 233 additions & 40 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
@@ -20,7 +20,6 @@
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::iter::once;
use std::path::Path;
use std::sync::{Arc, Weak};
use std::time::Duration;
@@ -56,8 +55,9 @@ use tracing::{debug, error, info, warn};
use super::fetch::FetchStreamTask;
use super::metrics::INGEST_V2_METRICS;
use super::models::IngesterShard;
use super::mrecord::MRecord;
use super::mrecordlog_utils::{check_enough_capacity, force_delete_queue};
use super::mrecordlog_utils::{
append_non_empty_doc_batch, check_enough_capacity, force_delete_queue, AppendDocBatchError,
};
use super::rate_meter::RateMeter;
use super::replication::{
ReplicationClient, ReplicationStreamTask, ReplicationStreamTaskHandle, ReplicationTask,
@@ -120,6 +120,10 @@ impl Ingester {
replication_factor: usize,
) -> IngestV2Result<Self> {
let self_node_id: NodeId = cluster.self_node_id().into();
info!(
"opening write-ahead log located at `{}`",
wal_dir_path.display()
);
let mrecordlog = MultiRecordLog::open_with_prefs(
wal_dir_path,
mrecordlog::SyncPolicy::OnDelay(Duration::from_secs(5)),
@@ -232,10 +236,12 @@ impl Ingester {
num_closed_shards += 1;
} else {
// The queue is empty: delete it.
force_delete_queue(&mut state_guard.mrecordlog, &queue_id)
.await
.expect("TODO: handle IO error");

if let Err(io_error) =
force_delete_queue(&mut state_guard.mrecordlog, &queue_id).await
{
error!("failed to delete shard `{queue_id}`: {io_error}");
continue;
}
num_deleted_shards += 1;
}
}
@@ -297,6 +303,7 @@ impl Ingester {
.await?;

if let Err(error) = replication_client.init_replica(shard).await {
// TODO: Remove dangling queue from the WAL.
error!("failed to initialize replica shard: {error}",);
return Err(IngestV2Error::Internal(format!(
"failed to initialize replica shard: {error}"
@@ -395,6 +402,13 @@ impl Ingester {
let mut persist_failures = Vec::new();
let mut replicate_subrequests: HashMap<NodeId, Vec<ReplicateSubrequest>> = HashMap::new();

// Keep track of the shards that need to be closed following an IO error.
let mut shards_to_close: HashSet<QueueId> = HashSet::new();

// Keep track of dangling shards, i.e., shards that no longer have a corresponding
// queue in the WAL and should be deleted.
let mut shards_to_delete: HashSet<QueueId> = HashSet::new();

let commit_type = persist_request.commit_type();
let force_commit = commit_type == CommitTypeV2::Force;
let leader_id: NodeId = persist_request.leader_id.into();
@@ -515,27 +529,43 @@

rate_meter.update(batch_num_bytes);

let current_position_inclusive: Position = if force_commit {
let encoded_mrecords = doc_batch
.docs()
.map(|doc| MRecord::Doc(doc).encode())
.chain(once(MRecord::Commit.encode()));
state_guard
.mrecordlog
.append_records(&queue_id, None, encoded_mrecords)
.await
.expect("TODO") // TODO: Io error, close shard?
} else {
let encoded_mrecords = doc_batch.docs().map(|doc| MRecord::Doc(doc).encode());
state_guard
.mrecordlog
.append_records(&queue_id, None, encoded_mrecords)
.await
.expect("TODO") // TODO: Io error, close shard?
}
.map(Position::offset)
.expect("records should not be empty");
let append_result = append_non_empty_doc_batch(
&mut state_guard.mrecordlog,
&queue_id,
&doc_batch,
force_commit,
)
.await;

let current_position_inclusive = match append_result {
Ok(current_position_inclusive) => current_position_inclusive,
Err(append_error) => {
let reason = match &append_error {
AppendDocBatchError::Io(io_error) => {
error!("failed to persist records to shard `{queue_id}`: {io_error}");
shards_to_close.insert(queue_id);
PersistFailureReason::ShardClosed
}
AppendDocBatchError::QueueNotFound(_) => {
error!(
"failed to persist records to shard `{queue_id}`: WAL queue not \
found"
);
shards_to_delete.insert(queue_id);
PersistFailureReason::ShardNotFound
}
};
let persist_failure = PersistFailure {
subrequest_id: subrequest.subrequest_id,
index_uid: subrequest.index_uid,
source_id: subrequest.source_id,
shard_id: subrequest.shard_id,
reason: reason as i32,
};
persist_failures.push(persist_failure);
continue;
}
};
// It's more precise to compute the new usage from the current usage + the requested
// capacity than from continuously summing up the requested capacities, which are
// approximations.
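
The inline append logic removed above encoded every document as an `MRecord::Doc` and, on a force commit, chained a trailing `MRecord::Commit` before appending to the WAL queue; that logic now lives behind `append_non_empty_doc_batch`, imported from `mrecordlog_utils` (presumably the second changed file, which is not expanded on this page). A small self-contained sketch of that encoding step, with stand-in types rather than the real `MRecord` and `DocBatchV2`:

```rust
// Stand-in record type for illustration; the real MRecord encodes to bytes.
#[derive(Debug, PartialEq)]
enum Record {
    Doc(String),
    Commit,
}

/// Encodes a batch of documents into WAL records, appending an explicit Commit
/// record when the batch must be force-committed (mirrors the removed inline logic).
fn encode_batch(docs: &[&str], force_commit: bool) -> Vec<Record> {
    let mut records: Vec<Record> = docs.iter().map(|doc| Record::Doc(doc.to_string())).collect();
    if force_commit {
        records.push(Record::Commit);
    }
    records
}

fn main() {
    let records = encode_batch(&["test-doc-foo", "test-doc-bar"], true);
    assert_eq!(records.len(), 3); // two documents plus one trailing Commit record
    assert_eq!(records.last(), Some(&Record::Commit));
}
```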
@@ -583,6 +613,28 @@ impl Ingester {
persist_successes.push(persist_success);
}
}
if !shards_to_close.is_empty() {
for queue_id in &shards_to_close {
let shard = state_guard
.shards
.get_mut(queue_id)
.expect("shard should exist");

shard.shard_state = ShardState::Closed;
shard.notify_shard_status();
}
info!(
"closed {} shard(s) following IO error(s)",
shards_to_close.len()
);
}
if !shards_to_delete.is_empty() {
for queue_id in &shards_to_delete {
state_guard.shards.remove(queue_id);
state_guard.rate_trackers.remove(queue_id);
}
info!("deleted {} dangling shard(s)", shards_to_delete.len());
}
if replicate_subrequests.is_empty() {
let leader_id = self.self_node_id.to_string();
let persist_response = PersistResponse {
@@ -1042,29 +1094,28 @@ impl IngesterState {
shard.truncation_position_inclusive = truncate_up_to_position_inclusive;
}
Err(TruncateError::MissingQueue(_)) => {
warn!("failed to truncate WAL queue `{queue_id}`: queue does not exist");
error!("failed to truncate shard `{queue_id}`: WAL queue not found");
self.shards.remove(queue_id);
self.rate_trackers.remove(queue_id);
info!("deleted dangling shard `{queue_id}`");
}
Err(error) => {
error!(%error, "failed to truncate WAL queue `{queue_id}`");
Err(TruncateError::IoError(io_error)) => {
error!("failed to truncate shard `{queue_id}`: {io_error}");
}
};
}

/// Deletes the shard identified by `queue_id` from the ingester state. It removes the
/// mrecordlog queue first and then, if the operation is successful, removes the shard.
/// mrecordlog queue first and then removes the associated in-memory shard and rate trackers.
async fn delete_shard(&mut self, queue_id: &QueueId) {
match self.mrecordlog.delete_queue(queue_id).await {
Ok(_) => {
Ok(_) | Err(DeleteQueueError::MissingQueue(_)) => {
self.shards.remove(queue_id);
self.rate_trackers.remove(queue_id);

info!("deleted shard `{queue_id}` from ingester");
info!("deleted shard `{queue_id}`");
}
Err(DeleteQueueError::MissingQueue(_)) => {
// The shard has already been deleted.
}
Err(DeleteQueueError::IoError(_)) => {
panic!("TODO: handle IO error")
Err(DeleteQueueError::IoError(io_error)) => {
error!("failed to delete shard `{queue_id}`: {io_error}");
}
};
}
@@ -1152,6 +1203,7 @@ mod tests {
use crate::ingest_v2::broadcast::ShardInfos;
use crate::ingest_v2::fetch::tests::{into_fetch_eof, into_fetch_payload};
use crate::ingest_v2::test_utils::MultiRecordLogTestExt;
use crate::MRecord;

pub(super) struct IngesterForTest {
node_id: NodeId,
@@ -1501,6 +1553,109 @@ mod tests {
);
}

#[tokio::test]
async fn test_ingester_persist_empty() {
let (ingester_ctx, mut ingester) = IngesterForTest::default().build().await;

let persist_request = PersistRequest {
leader_id: ingester_ctx.node_id.to_string(),
commit_type: CommitTypeV2::Force as i32,
subrequests: Vec::new(),
};
let persist_response = ingester.persist(persist_request).await.unwrap();
assert_eq!(persist_response.leader_id, "test-ingester");
assert_eq!(persist_response.successes.len(), 0);
assert_eq!(persist_response.failures.len(), 0);

let persist_request = PersistRequest {
leader_id: "test-ingester".to_string(),
commit_type: CommitTypeV2::Force as i32,
subrequests: vec![PersistSubrequest {
subrequest_id: 0,
index_uid: "test-index:0".to_string(),
source_id: "test-source".to_string(),
shard_id: 1,
doc_batch: None,
}],
};

let init_shards_request = InitShardsRequest {
shards: vec![Shard {
index_uid: "test-index:0".to_string(),
source_id: "test-source".to_string(),
shard_id: 1,
shard_state: ShardState::Open as i32,
leader_id: ingester_ctx.node_id.to_string(),
..Default::default()
}],
};
ingester.init_shards(init_shards_request).await.unwrap();

let persist_response = ingester.persist(persist_request).await.unwrap();
assert_eq!(persist_response.leader_id, "test-ingester");
assert_eq!(persist_response.successes.len(), 1);
assert_eq!(persist_response.failures.len(), 0);

let persist_success = &persist_response.successes[0];
assert_eq!(persist_success.subrequest_id, 0);
assert_eq!(persist_success.index_uid, "test-index:0");
assert_eq!(persist_success.source_id, "test-source");
assert_eq!(persist_success.shard_id, 1);
assert_eq!(
persist_success.replication_position_inclusive,
Some(Position::Beginning)
);
}

#[tokio::test]
async fn test_ingester_persist_deletes_dangling_shard() {
let (_ingester_ctx, mut ingester) = IngesterForTest::default().build().await;

let mut state_guard = ingester.state.write().await;
let queue_id = queue_id("test-index:0", "test-source", 1);
let solo_shard =
IngesterShard::new_solo(ShardState::Open, Position::Beginning, Position::Beginning);
state_guard.shards.insert(queue_id.clone(), solo_shard);

let rate_limiter = RateLimiter::from_settings(RateLimiterSettings::default());
let rate_meter = RateMeter::default();
state_guard
.rate_trackers
.insert(queue_id.clone(), (rate_limiter, rate_meter));

drop(state_guard);

let persist_request = PersistRequest {
leader_id: "test-ingester".to_string(),
commit_type: CommitTypeV2::Force as i32,
subrequests: vec![PersistSubrequest {
subrequest_id: 0,
index_uid: "test-index:0".to_string(),
source_id: "test-source".to_string(),
shard_id: 1,
doc_batch: Some(DocBatchV2::for_test(["test-doc-foo"])),
}],
};
let persist_response = ingester.persist(persist_request).await.unwrap();
assert_eq!(persist_response.leader_id, "test-ingester");
assert_eq!(persist_response.successes.len(), 0);
assert_eq!(persist_response.failures.len(), 1);

let persist_failure = &persist_response.failures[0];
assert_eq!(persist_failure.subrequest_id, 0);
assert_eq!(persist_failure.index_uid, "test-index:0");
assert_eq!(persist_failure.source_id, "test-source");
assert_eq!(persist_failure.shard_id, 1);
assert_eq!(
persist_failure.reason(),
PersistFailureReason::ShardNotFound
);

let state_guard = ingester.state.read().await;
assert_eq!(state_guard.shards.len(), 0);
assert_eq!(state_guard.rate_trackers.len(), 0);
}

#[tokio::test]
async fn test_ingester_persist_replicate() {
let (leader_ctx, mut leader) = IngesterForTest::default()
@@ -2144,7 +2299,7 @@ mod tests {
}

#[tokio::test]
async fn test_ingester_truncate() {
async fn test_ingester_truncate_shards() {
let (ingester_ctx, mut ingester) = IngesterForTest::default().build().await;

let shard_01 = Shard {
@@ -2245,6 +2400,44 @@
assert!(!state_guard.mrecordlog.queue_exists(&queue_id_02));
}

#[tokio::test]
async fn test_ingester_truncate_shards_deletes_dangling_shards() {
let (ingester_ctx, mut ingester) = IngesterForTest::default().build().await;

let queue_id = queue_id("test-index:0", "test-source", 1);

let mut state_guard = ingester.state.write().await;
let solo_shard =
IngesterShard::new_solo(ShardState::Open, Position::Beginning, Position::Beginning);
state_guard.shards.insert(queue_id.clone(), solo_shard);

let rate_limiter = RateLimiter::from_settings(RateLimiterSettings::default());
let rate_meter = RateMeter::default();
state_guard
.rate_trackers
.insert(queue_id.clone(), (rate_limiter, rate_meter));

drop(state_guard);

let truncate_shards_request = TruncateShardsRequest {
ingester_id: ingester_ctx.node_id.to_string(),
subrequests: vec![TruncateShardsSubrequest {
index_uid: "test-index:0".to_string(),
source_id: "test-source".to_string(),
shard_id: 1,
truncate_up_to_position_inclusive: Some(Position::offset(0u64)),
}],
};
ingester
.truncate_shards(truncate_shards_request.clone())
.await
.unwrap();

let state_guard = ingester.state.read().await;
assert_eq!(state_guard.shards.len(), 0);
assert_eq!(state_guard.rate_trackers.len(), 0);
}

#[tokio::test]
async fn test_ingester_retain_shards() {
let (_ingester_ctx, mut ingester) = IngesterForTest::default().build().await;
(diff of the second changed file not loaded on this page)
