Commit 4bd03f6

WIP

guilload committed Dec 18, 2023
1 parent 35eba19 commit 4bd03f6

Showing 2 changed files with 127 additions and 41 deletions.
111 changes: 75 additions & 36 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
@@ -20,7 +20,6 @@
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::iter::once;
use std::path::Path;
use std::sync::{Arc, Weak};
use std::time::Duration;
@@ -56,8 +55,9 @@ use tracing::{debug, error, info, warn};
use super::fetch::FetchStreamTask;
use super::metrics::INGEST_V2_METRICS;
use super::models::IngesterShard;
use super::mrecord::MRecord;
use super::mrecordlog_utils::{check_enough_capacity, force_delete_queue};
use super::mrecordlog_utils::{
append_non_empty_doc_batch, check_enough_capacity, force_delete_queue, AppendDocBatchError,
};
use super::rate_meter::RateMeter;
use super::replication::{
ReplicationClient, ReplicationStreamTask, ReplicationStreamTaskHandle, ReplicationTask,
@@ -232,10 +232,12 @@ impl Ingester {
num_closed_shards += 1;
} else {
// The queue is empty: delete it.
force_delete_queue(&mut state_guard.mrecordlog, &queue_id)
.await
.expect("TODO: handle IO error");

if let Err(io_error) =
force_delete_queue(&mut state_guard.mrecordlog, &queue_id).await
{
error!("failed to delete WAL queue `{queue_id}`: {io_error}");
continue;
}
num_deleted_shards += 1;
}
}
@@ -297,6 +299,7 @@ impl Ingester {
.await?;

if let Err(error) = replication_client.init_replica(shard).await {
// TODO: Remove the dangling queue from the WAL.
error!("failed to initialize replica shard: {error}");
return Err(IngestV2Error::Internal(format!(
"failed to initialize replica shard: {error}"
@@ -395,6 +398,9 @@ impl Ingester {
let mut persist_failures = Vec::new();
let mut replicate_subrequests: HashMap<NodeId, Vec<ReplicateSubrequest>> = HashMap::new();

// Keep track of the shards that need to be closed after an IO error.
let mut shards_to_close: HashSet<QueueId> = HashSet::new();

let commit_type = persist_request.commit_type();
let force_commit = commit_type == CommitTypeV2::Force;
let leader_id: NodeId = persist_request.leader_id.into();
@@ -515,27 +521,42 @@

rate_meter.update(batch_num_bytes);

let current_position_inclusive: Position = if force_commit {
let encoded_mrecords = doc_batch
.docs()
.map(|doc| MRecord::Doc(doc).encode())
.chain(once(MRecord::Commit.encode()));
state_guard
.mrecordlog
.append_records(&queue_id, None, encoded_mrecords)
.await
.expect("TODO") // TODO: Io error, close shard?
} else {
let encoded_mrecords = doc_batch.docs().map(|doc| MRecord::Doc(doc).encode());
state_guard
.mrecordlog
.append_records(&queue_id, None, encoded_mrecords)
.await
.expect("TODO") // TODO: Io error, close shard?
}
.map(Position::offset)
.expect("records should not be empty");
let append_result = append_non_empty_doc_batch(
&mut state_guard.mrecordlog,
&queue_id,
&doc_batch,
force_commit,
)
.await;

let current_position_inclusive = match append_result {
Ok(current_position_inclusive) => current_position_inclusive,
Err(append_error) => {
let reason = match &append_error {
AppendDocBatchError::Io(io_error) => {
error!("failed to append records to shard `{queue_id}`: {io_error}");
shards_to_close.insert(queue_id);
PersistFailureReason::ShardClosed
}
AppendDocBatchError::QueueNotFound(_) => {
error!(
"WAL queue `{queue_id}` does not exist: this should never happen, \
please report this issue on GitHub",
);
PersistFailureReason::ShardNotFound
}
};
let persist_failure = PersistFailure {
subrequest_id: subrequest.subrequest_id,
index_uid: subrequest.index_uid,
source_id: subrequest.source_id,
shard_id: subrequest.shard_id,
reason: reason as i32,
};
persist_failures.push(persist_failure);
continue;
}
};
// It's more precise to compute the new usage from the current usage + the requested
// capacity than from continuously summing up the requested capacities, which are
// approximations.
@@ -583,6 +604,16 @@ impl Ingester {
persist_successes.push(persist_success);
}
}
for queue_id in shards_to_close {
    let shard = state_guard
        .shards
        .get_mut(&queue_id)
        .expect("shard should exist");

    shard.shard_state = ShardState::Closed;
    shard.notify_shard_status();
}
if replicate_subrequests.is_empty() {
let leader_id = self.self_node_id.to_string();
let persist_response = PersistResponse {
@@ -1042,10 +1073,13 @@ impl IngesterState {
shard.truncation_position_inclusive = truncate_up_to_position_inclusive;
}
Err(TruncateError::MissingQueue(_)) => {
warn!("failed to truncate WAL queue `{queue_id}`: queue does not exist");
error!(
"WAL queue `{queue_id}` does not exist: this should never happen, please \
report this issue on GitHub",
);
}
Err(error) => {
error!(%error, "failed to truncate WAL queue `{queue_id}`");
Err(TruncateError::IoError(io_error)) => {
error!("failed to truncate shard `{queue_id}`: {io_error}");
}
};
}
@@ -1055,16 +1089,20 @@
async fn delete_shard(&mut self, queue_id: &QueueId) {
match self.mrecordlog.delete_queue(queue_id).await {
Ok(_) => {
self.shards.remove(queue_id);
self.rate_trackers.remove(queue_id);

info!("deleted shard `{queue_id}` from ingester");
let removed_shard_opt = self.shards.remove(queue_id);
let removed_rate_track_opt = self.rate_trackers.remove(queue_id);
debug_assert!(removed_shard_opt.is_some() && removed_rate_track_opt.is_some());
info!("deleted shard `{queue_id}`");
}
Err(DeleteQueueError::MissingQueue(_)) => {
// The shard has already been deleted.
let removed_shard_opt = self.shards.remove(queue_id);
let removed_rate_track_opt = self.rate_trackers.remove(queue_id);
debug_assert!(removed_shard_opt.is_none() && removed_rate_track_opt.is_none());
info!("deleted shard `{queue_id}`");
}
Err(DeleteQueueError::IoError(_)) => {
panic!("TODO: handle IO error")
Err(DeleteQueueError::IoError(io_error)) => {
error!("failed to delete WAL queue `{queue_id}`: {io_error}");
}
};
}
@@ -1152,6 +1190,7 @@ mod tests {
use crate::ingest_v2::broadcast::ShardInfos;
use crate::ingest_v2::fetch::tests::{into_fetch_eof, into_fetch_payload};
use crate::ingest_v2::test_utils::MultiRecordLogTestExt;
use crate::MRecord;

pub(super) struct IngesterForTest {
node_id: NodeId,
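A note on the persist changes above: shards that hit an IO error are only recorded in `shards_to_close` during the subrequest loop and transitioned to `Closed` once the loop completes. The commit does not spell out why, but deferring the state change keeps the loop's borrows of the ingester state simple and applies all close notifications in one place. A simplified, self-contained sketch of that deferral pattern, using stand-in types rather than Quickwit's own:

use std::collections::{HashMap, HashSet};

#[derive(Debug, PartialEq, Eq)]
enum ShardState {
    Open,
    Closed,
}

struct Shard {
    shard_state: ShardState,
}

// Stand-in for the persist path: the real `IngesterState`, failure types,
// and WAL append calls are elided.
fn persist(
    shards: &mut HashMap<String, Shard>,
    append_results: Vec<(String, Result<u64, std::io::Error>)>,
) {
    // Keep track of the shards that need to be closed after an IO error.
    let mut shards_to_close: HashSet<String> = HashSet::new();

    for (queue_id, append_result) in append_results {
        if append_result.is_err() {
            // Record the failure now; close the shard once the loop is done.
            shards_to_close.insert(queue_id);
            continue;
        }
        // ... handle the successful append ...
    }
    // Apply all shard state transitions in one place, after the main loop.
    for queue_id in shards_to_close {
        if let Some(shard) = shards.get_mut(&queue_id) {
            shard.shard_state = ShardState::Closed;
        }
    }
}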
57 changes: 52 additions & 5 deletions quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs
@@ -18,22 +18,69 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::io;
use std::iter::once;
use std::ops::RangeInclusive;

use bytesize::ByteSize;
use mrecordlog::error::DeleteQueueError;
use mrecordlog::error::{AppendError, DeleteQueueError};
use mrecordlog::MultiRecordLog;
use quickwit_proto::types::QueueId;
use quickwit_proto::ingest::DocBatchV2;
use quickwit_proto::types::{Position, QueueId};

use crate::MRecord;

#[derive(Debug, thiserror::Error)]
pub(super) enum AppendDocBatchError {
#[error("IO error: {0}")]
Io(#[from] io::Error),
#[error("WAL queue `{0}` not found")]
QueueNotFound(QueueId),
}

/// Appends a non-empty document batch to the WAL queue `queue_id`.
///
/// # Panics
///
/// Panics if `doc_batch` is empty.
pub(super) async fn append_non_empty_doc_batch(
mrecordlog: &mut MultiRecordLog,
queue_id: &QueueId,
doc_batch: &DocBatchV2,
force_commit: bool,
) -> Result<Position, AppendDocBatchError> {
let append_result = if force_commit {
let encoded_mrecords = doc_batch
.docs()
.map(|doc| MRecord::Doc(doc).encode())
.chain(once(MRecord::Commit.encode()));
mrecordlog
.append_records(&queue_id, None, encoded_mrecords)
.await
} else {
let encoded_mrecords = doc_batch.docs().map(|doc| MRecord::Doc(doc).encode());
mrecordlog
.append_records(&queue_id, None, encoded_mrecords)
.await
};
match append_result {
Ok(Some(offset)) => Ok(Position::offset(offset)),
Ok(None) => panic!("`doc_batch` should not be empty"),
Err(AppendError::IoError(io_error)) => Err(AppendDocBatchError::Io(io_error)),
Err(AppendError::MissingQueue(queue_id)) => {
Err(AppendDocBatchError::QueueNotFound(queue_id))
}
Err(AppendError::Past) => {
panic!("`append_records` should be called with `position_opt: None`")
}
}
}

#[derive(Debug, Clone, Copy)]
pub(super) struct MRecordLogUsage {
pub disk: ByteSize,
pub memory: ByteSize,
}

#[derive(Debug, Clone, Copy)]
pub(super) struct MemoryUsage(ByteSize);

/// Error returned when the mrecordlog does not have enough capacity to store some records.
#[derive(Debug, Clone, Copy, thiserror::Error)]
pub(super) enum NotEnoughCapacityError {
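For orientation, here is a minimal sketch of how a caller consumes the new helper. `append_non_empty_doc_batch` and `AppendDocBatchError` come from this diff; the wrapper function and its body are assumptions for illustration, and since the helper is `pub(super)`, a real caller sits next to this module, as `persist` does in this commit:

use mrecordlog::MultiRecordLog;
use quickwit_proto::ingest::DocBatchV2;
use quickwit_proto::types::QueueId;

// Hypothetical caller, mirroring how `persist` uses the helper.
async fn append_batch_sketch(
    mrecordlog: &mut MultiRecordLog,
    queue_id: &QueueId,
    doc_batch: &DocBatchV2,
) {
    // Passing `force_commit: true` would chain a trailing `MRecord::Commit`.
    match append_non_empty_doc_batch(mrecordlog, queue_id, doc_batch, false).await {
        Ok(_position) => {
            // Inclusive offset of the last record written to the queue.
        }
        Err(AppendDocBatchError::Io(_io_error)) => {
            // Unrecoverable for this shard: `persist` records a `ShardClosed`
            // failure and closes the shard after its main loop.
        }
        Err(AppendDocBatchError::QueueNotFound(_queue_id)) => {
            // The WAL queue is gone: `persist` reports `ShardNotFound`.
        }
    }
}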
