diff --git a/quickwit/quickwit-cli/src/index.rs b/quickwit/quickwit-cli/src/index.rs
index 69c621a4b28..feff585db26 100644
--- a/quickwit/quickwit-cli/src/index.rs
+++ b/quickwit/quickwit-cli/src/index.rs
@@ -891,8 +891,8 @@ pub async fn delete_index_cli(args: DeleteIndexArgs) -> anyhow::Result<()> {
"The following files will be removed from the index `{}`",
args.index_id
);
- for file_entry in affected_files {
- println!(" - {}", file_entry.file_name);
+ for split_info in affected_files {
+ println!(" - {}", split_info.file_name.display());
}
return Ok(());
}
diff --git a/quickwit/quickwit-cli/src/tool.rs b/quickwit/quickwit-cli/src/tool.rs
index 96b88706ca7..4709e00df08 100644
--- a/quickwit/quickwit-cli/src/tool.rs
+++ b/quickwit/quickwit-cli/src/tool.rs
@@ -510,47 +510,47 @@ pub async fn garbage_collect_index_cli(args: GarbageCollectIndexArgs) -> anyhow:
let removal_info = index_service
.garbage_collect_index(&args.index_id, args.grace_period, args.dry_run)
.await?;
- if removal_info.removed_split_entries.is_empty() && removal_info.failed_split_ids.is_empty() {
+ if removal_info.removed_split_entries.is_empty() && removal_info.failed_splits.is_empty() {
println!("No dangling files to garbage collect.");
return Ok(());
}
if args.dry_run {
println!("The following files will be garbage collected.");
- for file_entry in removal_info.removed_split_entries {
- println!(" - {}", file_entry.file_name);
+ for split_info in removal_info.removed_split_entries {
+ println!(" - {}", split_info.file_name.display());
}
return Ok(());
}
- if !removal_info.failed_split_ids.is_empty() {
+ if !removal_info.failed_splits.is_empty() {
println!("The following splits were attempted to be removed, but failed.");
- for split_id in removal_info.failed_split_ids.iter() {
- println!(" - {split_id}");
+ for split_info in &removal_info.failed_splits {
+ println!(" - {}", split_info.split_id);
}
println!(
"{} Splits were unable to be removed.",
- removal_info.failed_split_ids.len()
+ removal_info.failed_splits.len()
);
}
let deleted_bytes: u64 = removal_info
.removed_split_entries
.iter()
- .map(|entry| entry.file_size_in_bytes)
+ .map(|split_info| split_info.file_size_bytes.get_bytes())
.sum();
println!(
"{}MB of storage garbage collected.",
deleted_bytes / 1_000_000
);
- if removal_info.failed_split_ids.is_empty() {
+ if removal_info.failed_splits.is_empty() {
println!(
"{} Index successfully garbage collected.",
"✔".color(GREEN_COLOR)
);
} else if removal_info.removed_split_entries.is_empty()
- && !removal_info.failed_split_ids.is_empty()
+ && !removal_info.failed_splits.is_empty()
{
println!("{} Failed to garbage collect index.", "✘".color(RED_COLOR));
} else {
diff --git a/quickwit/quickwit-common/src/file_entry.rs b/quickwit/quickwit-common/src/file_entry.rs
deleted file mode 100644
index 445c0a5c0fb..00000000000
--- a/quickwit/quickwit-common/src/file_entry.rs
+++ /dev/null
@@ -1,29 +0,0 @@
-// Copyright (C) 2023 Quickwit, Inc.
-//
-// Quickwit is offered under the AGPL v3.0 and as commercial software.
-// For commercial licensing, contact us at hello@quickwit.io.
-//
-// AGPL:
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as
-// published by the Free Software Foundation, either version 3 of the
-// License, or (at your option) any later version.
-//
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-//
-// You should have received a copy of the GNU Affero General Public License
-// along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-use serde::{Deserialize, Serialize};
-
-#[allow(missing_docs)]
-#[derive(Clone, Debug, Deserialize, Serialize, utoipa::ToSchema)]
-pub struct FileEntry {
- /// The file_name is a file name, within an index directory.
- pub file_name: String,
- /// File size in bytes.
- pub file_size_in_bytes: u64, //< TODO switch to `byte_unit::Byte`.
-}
diff --git a/quickwit/quickwit-common/src/lib.rs b/quickwit/quickwit-common/src/lib.rs
index 9ee3d285aa8..5b390432416 100644
--- a/quickwit/quickwit-common/src/lib.rs
+++ b/quickwit/quickwit-common/src/lib.rs
@@ -22,7 +22,6 @@
mod coolid;
pub mod binary_heap;
-mod file_entry;
pub mod fs;
pub mod io;
mod kill_switch;
@@ -49,7 +48,6 @@ use std::ops::{Range, RangeInclusive};
use std::str::FromStr;
pub use coolid::new_coolid;
-pub use file_entry::FileEntry;
pub use kill_switch::KillSwitch;
pub use progress::{Progress, ProtectedZoneGuard};
pub use stream_utils::{BoxStream, ServiceStream};
diff --git a/quickwit/quickwit-core/src/index.rs b/quickwit/quickwit-core/src/index.rs
index 3b18ad8de3a..356156b13ae 100644
--- a/quickwit/quickwit-core/src/index.rs
+++ b/quickwit/quickwit-core/src/index.rs
@@ -22,14 +22,14 @@ use std::sync::Arc;
use std::time::Duration;
use quickwit_common::fs::{empty_dir, get_cache_directory_path};
-use quickwit_common::FileEntry;
use quickwit_config::{validate_identifier, IndexConfig, SourceConfig};
use quickwit_indexing::check_source_connectivity;
use quickwit_janitor::{
- delete_splits_with_files, run_garbage_collect, SplitDeletionError, SplitRemovalInfo,
+ delete_splits_from_storage_and_metastore, run_garbage_collect, DeleteSplitsError,
+ SplitRemovalInfo,
};
use quickwit_metastore::{
- IndexMetadata, ListSplitsQuery, Metastore, MetastoreError, SplitMetadata, SplitState,
+ IndexMetadata, ListSplitsQuery, Metastore, MetastoreError, SplitInfo, SplitMetadata, SplitState,
};
use quickwit_proto::{IndexUid, ServiceError, ServiceErrorCode};
use quickwit_storage::{StorageResolver, StorageResolverError};
@@ -43,7 +43,7 @@ pub enum IndexServiceError {
#[error("Metastore error `{0}`.")]
MetastoreError(#[from] MetastoreError),
#[error("Split deletion error `{0}`.")]
- SplitDeletionError(#[from] SplitDeletionError),
+ SplitDeletionError(#[from] DeleteSplitsError),
#[error("Invalid config: {0:#}.")]
InvalidConfig(anyhow::Error),
#[error("Invalid identifier: {0}.")]
@@ -135,33 +135,29 @@ impl IndexService {
&self,
index_id: &str,
dry_run: bool,
- ) -> Result<Vec<FileEntry>, IndexServiceError> {
+ ) -> Result<Vec<SplitInfo>, IndexServiceError> {
let index_metadata = self.metastore.index_metadata(index_id).await?;
let index_uid = index_metadata.index_uid.clone();
let index_uri = index_metadata.into_index_config().index_uri.clone();
let storage = self.storage_resolver.resolve(&index_uri).await?;
if dry_run {
- let all_splits = self
+ let splits_to_delete = self
.metastore
.list_all_splits(index_uid.clone())
.await?
.into_iter()
- .map(|metadata| metadata.split_metadata)
+ .map(|split| split.split_metadata.as_split_info())
.collect::<Vec<_>>();
-
- let file_entries_to_delete: Vec<FileEntry> =
- all_splits.iter().map(FileEntry::from).collect();
- return Ok(file_entries_to_delete);
+ return Ok(splits_to_delete);
}
-
// Schedule staged and published splits for deletion.
let query = ListSplitsQuery::for_index(index_uid.clone())
.with_split_states([SplitState::Staged, SplitState::Published]);
let splits = self.metastore.list_splits(query).await?;
let split_ids = splits
.iter()
- .map(|meta| meta.split_id())
+ .map(|split| split.split_id())
.collect::<Vec<_>>();
self.metastore
.mark_splits_for_deletion(index_uid.clone(), &split_ids)
@@ -175,10 +171,10 @@ impl IndexService {
.list_splits(query)
.await?
.into_iter()
- .map(|metadata| metadata.split_metadata)
+ .map(|split| split.split_metadata)
.collect::<Vec<_>>();
- let deleted_entries = delete_splits_with_files(
+ let deleted_splits = delete_splits_from_storage_and_metastore(
index_uid.clone(),
storage,
self.metastore.clone(),
@@ -187,7 +183,8 @@ impl IndexService {
)
.await?;
self.metastore.delete_index(index_uid).await?;
- Ok(deleted_entries)
+
+ Ok(deleted_splits)
}
/// Detect all dangling splits and associated files from the index and removes them.
@@ -251,7 +248,7 @@ impl IndexService {
.map(|split| split.split_metadata)
.collect();
// FIXME: return an error.
- if let Err(err) = delete_splits_with_files(
+ if let Err(err) = delete_splits_from_storage_and_metastore(
index_uid.clone(),
storage,
self.metastore.clone(),
diff --git a/quickwit/quickwit-core/src/lib.rs b/quickwit/quickwit-core/src/lib.rs
index fb6e0c81f12..b22e7abecfb 100644
--- a/quickwit/quickwit-core/src/lib.rs
+++ b/quickwit/quickwit-core/src/lib.rs
@@ -17,61 +17,6 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
-#![deny(clippy::disallowed_methods)]
-
mod index;
pub use index::{clear_cache_directory, validate_storage_uri, IndexService, IndexServiceError};
-
-#[cfg(test)]
-mod tests {
- use std::path::Path;
-
- use quickwit_common::FileEntry;
- use quickwit_indexing::TestSandbox;
- use quickwit_storage::StorageResolver;
-
- use crate::IndexService;
-
- #[tokio::test]
- async fn test_file_entry_from_split_and_index_delete() -> anyhow::Result<()> {
- let index_id = "test-index";
- let doc_mapping_yaml = r#"
- field_mappings:
- - name: title
- type: text
- - name: body
- type: text
- - name: url
- type: text
- "#;
- let test_sandbox =
- TestSandbox::create(index_id, doc_mapping_yaml, "{}", &["title", "body"]).await?;
- test_sandbox.add_documents(vec![
- serde_json::json!({"title": "snoopy", "body": "Snoopy is an anthropomorphic beagle[5] in the comic strip...", "url": "http://snoopy"}),
- ]).await?;
- let splits = test_sandbox
- .metastore()
- .list_all_splits(test_sandbox.index_uid())
- .await?
- .into_iter()
- .map(|metadata| metadata.split_metadata)
- .collect::<Vec<_>>();
- let file_entries: Vec<FileEntry> = splits.iter().map(FileEntry::from).collect();
- assert_eq!(file_entries.len(), 1);
- for file_entry in file_entries {
- let split_num_bytes = test_sandbox
- .storage()
- .file_num_bytes(Path::new(file_entry.file_name.as_str()))
- .await?;
- assert_eq!(split_num_bytes, file_entry.file_size_in_bytes);
- }
- // Now delete the index.
- let index_service =
- IndexService::new(test_sandbox.metastore(), StorageResolver::unconfigured());
- let deleted_file_entries = index_service.delete_index(index_id, false).await?;
- assert_eq!(deleted_file_entries.len(), 1);
- test_sandbox.assert_quit().await;
- Ok(())
- }
-}
diff --git a/quickwit/quickwit-janitor/src/actors/garbage_collector.rs b/quickwit/quickwit-janitor/src/actors/garbage_collector.rs
index 56afad8b896..ddfe2b79504 100644
--- a/quickwit/quickwit-janitor/src/actors/garbage_collector.rs
+++ b/quickwit/quickwit-janitor/src/actors/garbage_collector.rs
@@ -18,6 +18,7 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.
use std::collections::HashSet;
+use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
@@ -134,7 +135,7 @@ impl GarbageCollector {
let deleted_file_entries = match gc_res {
Ok(removal_info) => {
self.counters.num_successful_gc_run_on_index += 1;
- self.counters.num_failed_splits += removal_info.failed_split_ids.len();
+ self.counters.num_failed_splits += removal_info.failed_splits.len();
removal_info.removed_split_entries
}
Err(error) => {
@@ -145,9 +146,9 @@ impl GarbageCollector {
};
if !deleted_file_entries.is_empty() {
let num_deleted_splits = deleted_file_entries.len();
- let deleted_files: HashSet<&str> = deleted_file_entries
+ let deleted_files: HashSet<&Path> = deleted_file_entries
.iter()
- .map(|deleted_entry| deleted_entry.file_name.as_str())
+ .map(|deleted_entry| deleted_entry.file_name.as_path())
.take(5)
.collect();
info!(
@@ -160,7 +161,7 @@ impl GarbageCollector {
self.counters.num_deleted_files += deleted_file_entries.len();
self.counters.num_deleted_bytes += deleted_file_entries
.iter()
- .map(|entry| entry.file_size_in_bytes as usize)
+ .map(|entry| entry.file_size_bytes.get_bytes() as usize)
.sum::<usize>();
}
}
diff --git a/quickwit/quickwit-janitor/src/garbage_collection.rs b/quickwit/quickwit-janitor/src/garbage_collection.rs
index 2f09c185aa8..faecd89380e 100644
--- a/quickwit/quickwit-janitor/src/garbage_collection.rs
+++ b/quickwit/quickwit-janitor/src/garbage_collection.rs
@@ -18,34 +18,37 @@
// along with this program. If not, see .
use std::collections::HashMap;
-use std::path::Path;
+use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use futures::Future;
use quickwit_actors::ActorContext;
-use quickwit_common::{FileEntry, PrettySample};
-use quickwit_metastore::{ListSplitsQuery, Metastore, MetastoreError, SplitMetadata, SplitState};
+use quickwit_common::PrettySample;
+use quickwit_metastore::{
+ ListSplitsQuery, Metastore, MetastoreError, SplitInfo, SplitMetadata, SplitState,
+};
use quickwit_proto::IndexUid;
-use quickwit_storage::Storage;
+use quickwit_storage::{BulkDeleteError, Storage};
use thiserror::Error;
use time::OffsetDateTime;
use tracing::{error, instrument};
use crate::actors::GarbageCollector;
-/// The maximum number of splits that should be deleted in one go by the GC.
+/// The maximum number of splits that the GC should delete per attempt.
const DELETE_SPLITS_BATCH_SIZE: usize = 1000;
-/// SplitDeletionError denotes error that can happen when deleting split
-/// during garbage collection.
+/// [`DeleteSplitsError`] describes the errors that occurred during the deletion of splits from
+/// storage and metastore.
#[derive(Error, Debug)]
-pub enum SplitDeletionError {
- #[error("Failed to delete splits from metastore: '{error:?}'.")]
- MetastoreFailure {
- error: MetastoreError,
- failed_split_ids: Vec<String>,
- },
+#[error("Failed to delete splits from storage and/or metastore.")]
+pub struct DeleteSplitsError {
+ successes: Vec<SplitInfo>,
+ storage_error: Option<BulkDeleteError>,
+ storage_failures: Vec<SplitInfo>,
+ metastore_error: Option<MetastoreError>,
+ metastore_failures: Vec<SplitInfo>,
}
async fn protect_future(
@@ -65,9 +68,9 @@ where
/// Information on what splits have and have not been cleaned up by the GC.
pub struct SplitRemovalInfo {
/// The set of splits that have been removed.
- pub removed_split_entries: Vec<FileEntry>,
+ pub removed_split_entries: Vec<SplitInfo>,
/// The set of split ids that were attempted to be removed, but were unsuccessful.
- pub failed_split_ids: Vec<String>,
+ pub failed_splits: Vec<SplitInfo>,
}
/// Detect all dangling splits and associated files from the index and removes them.
@@ -112,24 +115,24 @@ pub async fn run_garbage_collect(
let mut splits_marked_for_deletion = protect_future(ctx_opt, metastore.list_splits(query))
.await?
.into_iter()
- .map(|meta| meta.split_metadata)
+ .map(|split| split.split_metadata)
.collect::<Vec<_>>();
splits_marked_for_deletion.extend(deletable_staged_splits);
- let candidate_entries: Vec<FileEntry> = splits_marked_for_deletion
- .iter()
- .map(FileEntry::from)
+ let candidate_entries: Vec<SplitInfo> = splits_marked_for_deletion
+ .into_iter()
+ .map(|split| split.as_split_info())
.collect();
return Ok(SplitRemovalInfo {
removed_split_entries: candidate_entries,
- failed_split_ids: Vec::new(),
+ failed_splits: Vec::new(),
});
}
// Schedule all eligible staged splits for delete
let split_ids: Vec<&str> = deletable_staged_splits
.iter()
- .map(|meta| meta.split_id())
+ .map(|split| split.split_id())
.collect();
if !split_ids.is_empty() {
protect_future(
@@ -144,7 +147,7 @@ pub async fn run_garbage_collect(
let updated_before_timestamp =
OffsetDateTime::now_utc().unix_timestamp() - deletion_grace_period.as_secs() as i64;
- let deleted_files = delete_splits_marked_for_deletion(
+ let deleted_splits = delete_splits_marked_for_deletion(
index_uid,
updated_before_timestamp,
storage,
@@ -153,7 +156,7 @@ pub async fn run_garbage_collect(
)
.await;
- Ok(deleted_files)
+ Ok(deleted_splits)
}
#[instrument(skip(storage, metastore, ctx_opt))]
@@ -169,8 +172,9 @@ async fn delete_splits_marked_for_deletion(
metastore: Arc<dyn Metastore>,
ctx_opt: Option<&ActorContext<GarbageCollector>>,
) -> SplitRemovalInfo {
- let mut failed_split_ids = Vec::new();
- let mut removed_split_files = Vec::new();
+ let mut removed_splits = Vec::new();
+ let mut failed_splits = Vec::new();
+
loop {
let query = ListSplitsQuery::for_index(index_uid.clone())
.with_split_state(SplitState::MarkedForDeletion)
@@ -186,18 +190,17 @@ async fn delete_splits_marked_for_deletion(
break;
}
};
-
let splits_to_delete = splits_to_delete
.into_iter()
.map(|split| split.split_metadata)
.collect::<Vec<_>>();
let num_splits_to_delete = splits_to_delete.len();
+
if num_splits_to_delete == 0 {
break;
}
-
- let delete_splits_result = delete_splits_with_files(
+ let delete_splits_result = delete_splits_from_storage_and_metastore(
index_uid.clone(),
storage.clone(),
metastore.clone(),
@@ -207,31 +210,20 @@ async fn delete_splits_marked_for_deletion(
.await;
match delete_splits_result {
- Ok(entries) => removed_split_files.extend(entries),
- Err(SplitDeletionError::MetastoreFailure {
- error,
- failed_split_ids: failed_split_ids_inner,
- }) => {
- error!(
- error=?error,
- index_id=%index_uid.index_id(),
- split_ids=?PrettySample::new(&failed_split_ids_inner, 5),
- "Failed to delete {} splits.",
- failed_split_ids_inner.len()
- );
- failed_split_ids.extend(failed_split_ids_inner);
+ Ok(entries) => removed_splits.extend(entries),
+ Err(delete_splits_error) => {
+ failed_splits.extend(delete_splits_error.storage_failures);
+ failed_splits.extend(delete_splits_error.metastore_failures);
break;
}
}
-
if num_splits_to_delete < DELETE_SPLITS_BATCH_SIZE {
break;
}
}
-
SplitRemovalInfo {
- removed_split_entries: removed_split_files,
- failed_split_ids,
+ removed_split_entries: removed_splits,
+ failed_splits,
}
}
@@ -243,91 +235,92 @@ async fn delete_splits_marked_for_deletion(
/// * `metastore` - The metastore managing the target index.
/// * `splits` - The list of splits to delete.
/// * `ctx_opt` - A context for reporting progress (only useful within quickwit actor).
-pub async fn delete_splits_with_files(
+pub async fn delete_splits_from_storage_and_metastore(
index_uid: IndexUid,
storage: Arc<dyn Storage>,
metastore: Arc<dyn Metastore>,
splits: Vec<SplitMetadata>,
ctx_opt: Option<&ActorContext<GarbageCollector>>,
-) -> anyhow::Result<Vec<FileEntry>, SplitDeletionError> {
- let mut paths_to_splits = HashMap::with_capacity(splits.len());
+) -> anyhow::Result<Vec<SplitInfo>, DeleteSplitsError> {
+ let mut split_infos: HashMap<PathBuf, SplitInfo> = HashMap::with_capacity(splits.len());
for split in splits {
- let file_entry = FileEntry::from(&split);
- let split_filename = quickwit_common::split_file(split.split_id());
- let split_path = Path::new(&split_filename);
-
- paths_to_splits.insert(
- split_path.to_path_buf(),
- (split.split_id().to_string(), file_entry),
- );
+ let split_info = split.as_split_info();
+ split_infos.insert(split_info.file_name.clone(), split_info);
}
-
- let paths = paths_to_splits
+ let split_paths = split_infos
.keys()
- .map(|key| key.as_path())
+ .map(|split_path_buf| split_path_buf.as_path())
.collect::<Vec<&Path>>();
- let delete_result = storage.bulk_delete(&paths).await;
+ let delete_result = protect_future(ctx_opt, storage.bulk_delete(&split_paths)).await;
if let Some(ctx) = ctx_opt {
ctx.record_progress();
}
+ let mut storage_error: Option<BulkDeleteError> = None;
+ let mut storage_failures = Vec::new();
+
+ if let Err(bulk_delete_error) = delete_result {
+ storage_failures = bulk_delete_error
+ .failures
+ .keys()
+ .chain(&bulk_delete_error.unattempted)
+ .flat_map(|split_path| split_infos.remove(split_path))
+ .collect::<Vec<_>>();
+ error!(
+ error=?bulk_delete_error.error,
+ index_id=index_uid.index_id(),
+ "Failed to delete split files {:?} from storage.",
+ PrettySample::new(&storage_failures, 5),
+ );
+ storage_error = Some(bulk_delete_error);
+ }
+ if !split_infos.is_empty() {
+ let split_ids: Vec<&str> = split_infos
+ .values()
+ .map(|split_info| split_info.split_id.as_str())
+ .collect();
+ let metastore_result = protect_future(
+ ctx_opt,
+ metastore.delete_splits(index_uid.clone(), &split_ids),
+ )
+ .await;
- let mut deleted_split_ids = Vec::new();
- let mut deleted_file_entries = Vec::new();
-
- match delete_result {
- Ok(()) => {
- for (split_id, entry) in paths_to_splits.into_values() {
- deleted_split_ids.push(split_id);
- deleted_file_entries.push(entry);
- }
- }
- Err(bulk_delete_error) => {
- let num_failed_splits =
- bulk_delete_error.failures.len() + bulk_delete_error.unattempted.len();
- let truncated_split_ids = bulk_delete_error
- .failures
- .keys()
- .chain(bulk_delete_error.unattempted.iter())
- .take(5)
- .collect::>();
-
+ if let Err(metastore_error) = metastore_result {
error!(
- error = ?bulk_delete_error.error,
- index_id = ?index_uid.index_id(),
- num_failed_splits = num_failed_splits,
- "Failed to delete {:?} and {} other splits.",
- truncated_split_ids, num_failed_splits,
+ error=?metastore_error,
+ index_id=index_uid.index_id(),
+ "Failed to delete split {:?} from the metastore.",
+ PrettySample::new(&split_ids, 5),
);
-
- for split_path in bulk_delete_error.successes {
- let (split_id, entry) = paths_to_splits
- .remove(&split_path)
- .expect("The successful split path should be present within the lookup table.");
-
- deleted_split_ids.push(split_id);
- deleted_file_entries.push(entry);
- }
+ let metastore_failures = split_infos.into_values().collect::<Vec<_>>();
+ let delete_splits_error = DeleteSplitsError {
+ successes: Vec::new(),
+ storage_error,
+ storage_failures,
+ metastore_error: Some(metastore_error),
+ metastore_failures,
+ };
+ return Err(delete_splits_error);
}
- };
-
- if !deleted_split_ids.is_empty() {
- let split_ids: Vec<&str> = deleted_split_ids.iter().map(String::as_str).collect();
- protect_future(ctx_opt, metastore.delete_splits(index_uid, &split_ids))
- .await
- .map_err(|error| SplitDeletionError::MetastoreFailure {
- error,
- failed_split_ids: split_ids.into_iter().map(str::to_string).collect(),
- })?;
}
-
- Ok(deleted_file_entries)
+ let successes = split_infos.into_values().collect::<Vec<_>>();
+
+ if !storage_failures.is_empty() {
+ let delete_splits_error = DeleteSplitsError {
+ successes,
+ storage_error,
+ storage_failures,
+ metastore_error: None,
+ metastore_failures: Vec::new(),
+ };
+ return Err(delete_splits_error);
+ }
+ Ok(successes)
}
#[cfg(test)]
mod tests {
- use std::sync::Arc;
use std::time::Duration;
use quickwit_config::IndexConfig;
@@ -335,8 +328,11 @@ mod tests {
metastore_for_test, ListSplitsQuery, MockMetastore, SplitMetadata, SplitState,
};
use quickwit_proto::IndexUid;
- use quickwit_storage::storage_for_test;
+ use quickwit_storage::{
+ storage_for_test, BulkDeleteError, DeleteFailure, MockStorage, PutPayload,
+ };
+ use super::*;
use crate::run_garbage_collect;
#[tokio::test]
@@ -483,4 +479,200 @@ mod tests {
.await
.unwrap();
}
+
+ #[tokio::test]
+ async fn test_delete_splits_from_storage_and_metastore_happy_path() {
+ let storage = storage_for_test();
+ let metastore = metastore_for_test();
+
+ let index_id = "test-delete-splits-happy--index";
+ let index_uri = format!("ram:///indexes/{index_id}");
+ let index_config = IndexConfig::for_test(index_id, &index_uri);
+ let index_uid = metastore.create_index(index_config).await.unwrap();
+
+ let split_id = "test-delete-splits-happy--split";
+ let split_metadata = SplitMetadata {
+ split_id: split_id.to_string(),
+ index_uid: IndexUid::new(index_id),
+ ..Default::default()
+ };
+ metastore
+ .stage_splits(index_uid.clone(), vec![split_metadata.clone()])
+ .await
+ .unwrap();
+ metastore
+ .mark_splits_for_deletion(index_uid.clone(), &[split_id])
+ .await
+ .unwrap();
+
+ let split_path_str = format!("{}.split", split_id);
+ let split_path = Path::new(&split_path_str);
+ let payload: Box<dyn PutPayload> = Box::new(vec![0]);
+ storage.put(split_path, payload).await.unwrap();
+ assert!(storage.exists(split_path).await.unwrap());
+
+ let splits = metastore.list_all_splits(index_uid.clone()).await.unwrap();
+ assert_eq!(splits.len(), 1);
+
+ let deleted_split_infos = delete_splits_from_storage_and_metastore(
+ index_uid.clone(),
+ storage.clone(),
+ metastore.clone(),
+ vec![split_metadata],
+ None,
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(deleted_split_infos.len(), 1);
+ assert_eq!(deleted_split_infos[0].split_id, split_id,);
+ assert_eq!(
+ deleted_split_infos[0].file_name,
+ Path::new(&format!("{split_id}.split"))
+ );
+ assert!(!storage.exists(split_path).await.unwrap());
+ assert!(metastore
+ .list_all_splits(index_uid)
+ .await
+ .unwrap()
+ .is_empty());
+ }
+
+ #[tokio::test]
+ async fn test_delete_splits_from_storage_and_metastore_storage_error() {
+ let mut mock_storage = MockStorage::new();
+ mock_storage
+ .expect_bulk_delete()
+ .return_once(|split_paths| {
+ assert_eq!(split_paths.len(), 2);
+
+ let split_path = split_paths[0].to_path_buf();
+ let successes = vec![split_path];
+
+ let split_path = split_paths[1].to_path_buf();
+ let delete_failure = DeleteFailure {
+ code: Some("AccessDenied".to_string()),
+ ..Default::default()
+ };
+ let failures = HashMap::from_iter([(split_path, delete_failure)]);
+ let bulk_delete_error = BulkDeleteError {
+ successes,
+ failures,
+ ..Default::default()
+ };
+ Err(bulk_delete_error)
+ });
+ let storage = Arc::new(mock_storage);
+ let metastore = metastore_for_test();
+
+ let index_id = "test-delete-splits-storage-error--index";
+ let index_uri = format!("ram:///indexes/{index_id}");
+ let index_config = IndexConfig::for_test(index_id, &index_uri);
+ let index_uid = metastore.create_index(index_config).await.unwrap();
+
+ let split_id_0 = "test-delete-splits-storage-error--split-0";
+ let split_metadata_0 = SplitMetadata {
+ split_id: split_id_0.to_string(),
+ index_uid: index_uid.clone(),
+ ..Default::default()
+ };
+ let split_id_1 = "test-delete-splits-storage-error--split-1";
+ let split_metadata_1 = SplitMetadata {
+ split_id: split_id_1.to_string(),
+ index_uid: index_uid.clone(),
+ ..Default::default()
+ };
+ metastore
+ .stage_splits(
+ index_uid.clone(),
+ vec![split_metadata_0.clone(), split_metadata_1.clone()],
+ )
+ .await
+ .unwrap();
+ metastore
+ .mark_splits_for_deletion(index_uid.clone(), &[split_id_0, split_id_1])
+ .await
+ .unwrap();
+
+ let error = delete_splits_from_storage_and_metastore(
+ index_uid.clone(),
+ storage.clone(),
+ metastore.clone(),
+ vec![split_metadata_0, split_metadata_1],
+ None,
+ )
+ .await
+ .unwrap_err();
+
+ assert_eq!(error.successes.len(), 1);
+ assert_eq!(error.storage_failures.len(), 1);
+ assert_eq!(error.metastore_failures.len(), 0);
+
+ let splits = metastore.list_all_splits(index_uid.clone()).await.unwrap();
+ assert_eq!(splits.len(), 1);
+ }
+
+ #[tokio::test]
+ async fn test_delete_splits_from_storage_and_metastore_metastore_error() {
+ let mut mock_storage = MockStorage::new();
+ mock_storage
+ .expect_bulk_delete()
+ .return_once(|split_paths| {
+ assert_eq!(split_paths.len(), 2);
+
+ let split_path = split_paths[0].to_path_buf();
+ let successes = vec![split_path];
+
+ let split_path = split_paths[1].to_path_buf();
+ let delete_failure = DeleteFailure {
+ code: Some("AccessDenied".to_string()),
+ ..Default::default()
+ };
+ let failures = HashMap::from_iter([(split_path, delete_failure)]);
+ let bulk_delete_error = BulkDeleteError {
+ successes,
+ failures,
+ ..Default::default()
+ };
+ Err(bulk_delete_error)
+ });
+ let storage = Arc::new(mock_storage);
+
+ let index_id = "test-delete-splits-storage-error--index";
+ let index_uid = IndexUid::new(index_id.to_string());
+
+ let mut mock_metastore = MockMetastore::new();
+ mock_metastore.expect_delete_splits().return_once(|_, _| {
+ Err(MetastoreError::IndexDoesNotExist {
+ index_id: index_id.to_string(),
+ })
+ });
+ let metastore = Arc::new(mock_metastore);
+
+ let split_id_0 = "test-delete-splits-storage-error--split-0";
+ let split_metadata_0 = SplitMetadata {
+ split_id: split_id_0.to_string(),
+ index_uid: index_uid.clone(),
+ ..Default::default()
+ };
+ let split_id_1 = "test-delete-splits-storage-error--split-1";
+ let split_metadata_1 = SplitMetadata {
+ split_id: split_id_1.to_string(),
+ index_uid: index_uid.clone(),
+ ..Default::default()
+ };
+ let error = delete_splits_from_storage_and_metastore(
+ index_uid.clone(),
+ storage.clone(),
+ metastore.clone(),
+ vec![split_metadata_0, split_metadata_1],
+ None,
+ )
+ .await
+ .unwrap_err();
+
+ assert!(error.successes.is_empty());
+ assert_eq!(error.storage_failures.len(), 1);
+ assert_eq!(error.metastore_failures.len(), 1);
+ }
}
diff --git a/quickwit/quickwit-janitor/src/lib.rs b/quickwit/quickwit-janitor/src/lib.rs
index 8dd576f9ec6..9f91b1ce6da 100644
--- a/quickwit/quickwit-janitor/src/lib.rs
+++ b/quickwit/quickwit-janitor/src/lib.rs
@@ -22,9 +22,8 @@
use std::sync::Arc;
use quickwit_actors::{Mailbox, Universe};
-use quickwit_common::FileEntry;
use quickwit_config::QuickwitConfig;
-use quickwit_metastore::Metastore;
+use quickwit_metastore::{Metastore, SplitInfo};
use quickwit_search::SearchJobPlacer;
use quickwit_storage::StorageResolver;
use tracing::info;
@@ -39,12 +38,13 @@ mod retention_policy_execution;
pub use janitor_service::JanitorService;
pub use self::garbage_collection::{
- delete_splits_with_files, run_garbage_collect, SplitDeletionError, SplitRemovalInfo,
+ delete_splits_from_storage_and_metastore, run_garbage_collect, DeleteSplitsError,
+ SplitRemovalInfo,
};
use crate::actors::{DeleteTaskService, GarbageCollector, RetentionPolicyExecutor};
#[derive(utoipa::OpenApi)]
-#[openapi(components(schemas(FileEntry)))]
+#[openapi(components(schemas(SplitInfo)))]
/// Schema used for the OpenAPI generation which are apart of this crate.
pub struct JanitorApiSchemas;
diff --git a/quickwit/quickwit-metastore/src/lib.rs b/quickwit/quickwit-metastore/src/lib.rs
index d648bc29801..788fea5df8f 100644
--- a/quickwit/quickwit-metastore/src/lib.rs
+++ b/quickwit/quickwit-metastore/src/lib.rs
@@ -56,7 +56,7 @@ pub use metastore_factory::{MetastoreFactory, UnsupportedMetastore};
pub use metastore_resolver::MetastoreResolver;
use quickwit_common::is_disjoint;
use quickwit_doc_mapper::tag_pruning::TagFilterAst;
-pub use split_metadata::{Split, SplitMaturity, SplitMetadata, SplitState};
+pub use split_metadata::{Split, SplitInfo, SplitMaturity, SplitMetadata, SplitState};
pub(crate) use split_metadata_version::{SplitMetadataV0_6, VersionedSplitMetadata};
#[derive(utoipa::OpenApi)]
diff --git a/quickwit/quickwit-metastore/src/split_metadata.rs b/quickwit/quickwit-metastore/src/split_metadata.rs
index 8f4b5060d89..8d5e653db85 100644
--- a/quickwit/quickwit-metastore/src/split_metadata.rs
+++ b/quickwit/quickwit-metastore/src/split_metadata.rs
@@ -20,10 +20,11 @@
use std::collections::BTreeSet;
use std::fmt;
use std::ops::{Range, RangeInclusive};
+use std::path::PathBuf;
use std::str::FromStr;
use std::time::Duration;
-use quickwit_common::FileEntry;
+use byte_unit::Byte;
use quickwit_proto::IndexUid;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DurationMilliSeconds};
@@ -179,17 +180,38 @@ impl SplitMetadata {
..Default::default()
}
}
-}
-impl From<&SplitMetadata> for FileEntry {
- fn from(split: &SplitMetadata) -> Self {
- FileEntry {
- file_name: quickwit_common::split_file(split.split_id()),
- file_size_in_bytes: split.footer_offsets.end,
+ /// Converts the split metadata into a [`SplitInfo`].
+ pub fn as_split_info(&self) -> SplitInfo {
+ let file_name = quickwit_common::split_file(self.split_id());
+
+ SplitInfo {
+ uncompressed_docs_size_bytes: Byte::from_bytes(self.uncompressed_docs_size_in_bytes),
+ file_name: PathBuf::from(file_name),
+ file_size_bytes: Byte::from_bytes(self.footer_offsets.end),
+ split_id: self.split_id.clone(),
+ num_docs: self.num_docs,
}
}
}
+/// A summarized version of the split metadata for display purposes.
+#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, utoipa::ToSchema)]
+pub struct SplitInfo {
+ /// The split ID.
+ pub split_id: String,
+ /// The number of documents in the split.
+ pub num_docs: usize,
+ /// The sum of the sizes of the original JSON payloads in bytes.
+ #[schema(value_type = u64)]
+ pub uncompressed_docs_size_bytes: Byte,
+ /// The name of the split file on disk.
+ pub file_name: PathBuf,
+ /// The size of the split file on disk in bytes.
+ #[schema(value_type = u64)]
+ pub file_size_bytes: Byte,
+}
+
#[cfg(any(test, feature = "testsuite"))]
impl quickwit_config::TestableForRegression for SplitMetadata {
fn sample_for_regression() -> Self {
diff --git a/quickwit/quickwit-rest-client/src/rest_client.rs b/quickwit/quickwit-rest-client/src/rest_client.rs
index 5ea52f1141b..e4b02f27cc6 100644
--- a/quickwit/quickwit-rest-client/src/rest_client.rs
+++ b/quickwit/quickwit-rest-client/src/rest_client.rs
@@ -21,11 +21,10 @@ use std::time::Duration;
use bytes::Bytes;
use quickwit_cluster::ClusterSnapshot;
-use quickwit_common::FileEntry;
use quickwit_config::{ConfigFormat, SourceConfig};
use quickwit_indexing::actors::IndexingServiceCounters;
pub use quickwit_ingest::CommitType;
-use quickwit_metastore::{IndexMetadata, Split};
+use quickwit_metastore::{IndexMetadata, Split, SplitInfo};
use quickwit_search::SearchResponseRest;
use quickwit_serve::{ListSplitsQueryParams, SearchRequestQueryString};
use reqwest::header::{HeaderMap, HeaderValue, CONTENT_TYPE};
@@ -374,7 +373,7 @@ impl<'a> IndexClient<'a> {
Ok(())
}
- pub async fn delete(&self, index_id: &str, dry_run: bool) -> Result<Vec<FileEntry>, Error> {
+ pub async fn delete(&self, index_id: &str, dry_run: bool) -> Result<Vec<SplitInfo>, Error> {
let path = format!("indexes/{index_id}");
let response = self
.transport
@@ -933,10 +932,13 @@ mod test {
Mock::given(method("DELETE"))
.and(path("/api/v1/indexes/my-index"))
.and(query_param("dry_run", "true"))
- .respond_with(
- ResponseTemplate::new(StatusCode::OK)
- .set_body_json(json!([{"file_name": "filename", "file_size_in_bytes": 100}])),
- )
+ .respond_with(ResponseTemplate::new(StatusCode::OK).set_body_json(json!([{
+ "split_id": "my-split",
+ "num_docs": 1,
+ "uncompressed_docs_size_bytes": 1024,
+ "file_name": "my-split.split",
+ "file_size_bytes": 128,
+ }])))
.up_to_n_times(1)
.mount(&mock_server)
.await;
diff --git a/quickwit/quickwit-serve/src/index_api/rest_handler.rs b/quickwit/quickwit-serve/src/index_api/rest_handler.rs
index 093d2c39a20..bb7497ed771 100644
--- a/quickwit/quickwit-serve/src/index_api/rest_handler.rs
+++ b/quickwit/quickwit-serve/src/index_api/rest_handler.rs
@@ -22,7 +22,6 @@ use std::sync::Arc;
use bytes::Bytes;
use hyper::header::CONTENT_TYPE;
use quickwit_common::uri::Uri;
-use quickwit_common::FileEntry;
use quickwit_config::{
load_source_config_from_user_config, ConfigFormat, QuickwitConfig, SourceConfig, SourceParams,
CLI_INGEST_SOURCE_ID, INGEST_API_SOURCE_ID,
@@ -30,7 +29,7 @@ use quickwit_config::{
use quickwit_core::{IndexService, IndexServiceError};
use quickwit_doc_mapper::{analyze_text, TokenizerConfig};
use quickwit_metastore::{
- IndexMetadata, ListSplitsQuery, Metastore, MetastoreError, Split, SplitState,
+ IndexMetadata, ListSplitsQuery, Metastore, MetastoreError, Split, SplitInfo, SplitState,
};
use quickwit_proto::IndexUid;
use serde::de::DeserializeOwned;
@@ -503,7 +502,7 @@ async fn delete_index(
index_id: String,
delete_index_query_param: DeleteIndexQueryParam,
 index_service: Arc<IndexService>,
-) -> Result<Vec<FileEntry>, IndexServiceError> {
+) -> Result<Vec<SplitInfo>, IndexServiceError> {
info!(index_id = %index_id, dry_run = delete_index_query_param.dry_run, "delete-index");
index_service
.delete_index(&index_id, delete_index_query_param.dry_run)
@@ -1162,7 +1161,7 @@ mod tests {
let resp_json: serde_json::Value = serde_json::from_slice(resp.body()).unwrap();
let expected_response_json = serde_json::json!([{
"file_name": "split_1.split",
- "file_size_in_bytes": 800,
+ "file_size_bytes": 800,
}]);
assert_json_include!(actual: resp_json, expected: expected_response_json);
}
@@ -1176,7 +1175,7 @@ mod tests {
let resp_json: serde_json::Value = serde_json::from_slice(resp.body()).unwrap();
let expected_response_json = serde_json::json!([{
"file_name": "split_1.split",
- "file_size_in_bytes": 800,
+ "file_size_bytes": 800,
}]);
assert_json_include!(actual: resp_json, expected: expected_response_json);
}
diff --git a/quickwit/quickwit-storage/src/bundle_storage.rs b/quickwit/quickwit-storage/src/bundle_storage.rs
index 9966a0efbda..cf02cf2b94b 100644
--- a/quickwit/quickwit-storage/src/bundle_storage.rs
+++ b/quickwit/quickwit-storage/src/bundle_storage.rs
@@ -36,8 +36,10 @@ use thiserror::Error;
use tokio::io::AsyncWriteExt;
use tracing::error;
-use crate::storage::{BulkDeleteError, SendableAsync};
-use crate::{OwnedBytes, Storage, StorageError, StorageResult, VersionedComponent};
+use crate::storage::SendableAsync;
+use crate::{
+ BulkDeleteError, OwnedBytes, Storage, StorageError, StorageResult, VersionedComponent,
+};
/// BundleStorage bundles together multiple files into a single file.
/// with some metadata
diff --git a/quickwit/quickwit-storage/src/cache/storage_with_cache.rs b/quickwit/quickwit-storage/src/cache/storage_with_cache.rs
index 849964bf4fb..5ca2b1a427f 100644
--- a/quickwit/quickwit-storage/src/cache/storage_with_cache.rs
+++ b/quickwit/quickwit-storage/src/cache/storage_with_cache.rs
@@ -26,8 +26,8 @@ use async_trait::async_trait;
use quickwit_common::uri::Uri;
use crate::cache::Cache;
-use crate::storage::{BulkDeleteError, SendableAsync};
-use crate::{OwnedBytes, Storage, StorageResult};
+use crate::storage::SendableAsync;
+use crate::{BulkDeleteError, OwnedBytes, Storage, StorageResult};
/// Use with care, StorageWithCache is read-only.
pub struct StorageWithCache {
diff --git a/quickwit/quickwit-storage/src/debouncer.rs b/quickwit/quickwit-storage/src/debouncer.rs
index 4a3b5484e98..035662cbe33 100644
--- a/quickwit/quickwit-storage/src/debouncer.rs
+++ b/quickwit/quickwit-storage/src/debouncer.rs
@@ -30,8 +30,8 @@ use futures::{Future, FutureExt};
use quickwit_common::uri::Uri;
use tantivy::directory::OwnedBytes;
-use crate::storage::{BulkDeleteError, SendableAsync};
-use crate::{Storage, StorageResult};
+use crate::storage::SendableAsync;
+use crate::{BulkDeleteError, Storage, StorageResult};
/// The AsyncDebouncer debounces inflight Futures, so that concurrent async request to the same data
/// source can be deduplicated.
diff --git a/quickwit/quickwit-storage/src/error.rs b/quickwit/quickwit-storage/src/error.rs
index dbe23a0183c..f5a0b6f7a95 100644
--- a/quickwit/quickwit-storage/src/error.rs
+++ b/quickwit/quickwit-storage/src/error.rs
@@ -17,6 +17,8 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
+use std::collections::HashMap;
+use std::path::PathBuf;
use std::sync::Arc;
use std::{fmt, io};
@@ -144,3 +146,58 @@ impl From<io::Error> for StorageError {
}
}
}
+
+/// Error returned by `bulk_delete`. Under the hood, `bulk_delete` groups the files to
+/// delete into multiple batches of fixed size and issues one delete objects request per batch. The
+/// whole operation can fail in multiple ways, which is reflected by the quirkiness of the API of
+/// [`BulkDeleteError`]. First, a batch can fail partially, i.e. some objects are deleted while
+/// others are not. The `successes` and `failures` attributes of the error will be populated
+/// accordingly. Second, a batch can fail completely, in which case the `error` field will be set.
+/// Because a batch failing entirely usually indicates a systemic error, for instance, a connection
+/// or credentials issue, `bulk_delete` does not attempt to delete the remaining batches and
+/// populates the `unattempted` attribute. Consequently, the attributes of this error are not
+/// "mutually exclusive": there exists a path where all those fields are not empty. The caller is
+/// expected to handle this error carefully and inspect the instance thoroughly before any retry
+/// attempt.
+#[must_use]
+#[derive(Debug, Default, thiserror::Error)]
+pub struct BulkDeleteError {
+ /// Error that occurred for a whole batch and caused the entire deletion operation to be
+ /// aborted.
+ pub error: Option<StorageError>,
+ /// List of files that were successfully deleted, including non-existing files.
+ pub successes: Vec<PathBuf>,
+ /// List of files that failed to be deleted along with the corresponding failure descriptions.
+ pub failures: HashMap<PathBuf, DeleteFailure>,
+ /// List of remaining files to delete before the operation was aborted.
+ pub unattempted: Vec<PathBuf>,
+}
+
+/// Describes the failure for an individual file in a batch delete operation.
+#[derive(Debug, Default)]
+pub struct DeleteFailure {
+ /// The error that occurred for this file.
+ pub error: Option<StorageError>,
+ /// The failure code is a string that uniquely identifies an error condition. It is meant to be
+ /// read and understood by programs that detect and handle errors by type.
+ pub code: Option<String>,
+ /// The error message contains a generic description of the error condition in English. It is
+ /// intended for a human audience. Simple programs display the message directly to the end user
+ /// if they encounter an error condition they don't know how or don't care to handle.
+ /// Sophisticated programs with more exhaustive error handling and proper internationalization
+ /// are more likely to ignore the error message.
+ pub message: Option<String>,
+}
+
+impl fmt::Display for BulkDeleteError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(
+ f,
+ "Bulk delete error ({} success(es), {} failure(s), {} unattempted)",
+ self.successes.len(),
+ self.failures.len(),
+ self.unattempted.len()
+ )?;
+ Ok(())
+ }
+}
diff --git a/quickwit/quickwit-storage/src/lib.rs b/quickwit/quickwit-storage/src/lib.rs
index cc4fa32b1aa..90ba89d3fca 100644
--- a/quickwit/quickwit-storage/src/lib.rs
+++ b/quickwit/quickwit-storage/src/lib.rs
@@ -82,7 +82,10 @@ pub use self::test_suite::{
storage_test_multi_part_upload, storage_test_single_part_upload, storage_test_suite,
test_write_and_bulk_delete,
};
-pub use crate::error::{StorageError, StorageErrorKind, StorageResolverError, StorageResult};
+pub use crate::error::{
+ BulkDeleteError, DeleteFailure, StorageError, StorageErrorKind, StorageResolverError,
+ StorageResult,
+};
/// Loads an entire local or remote file into memory.
pub async fn load_file(
diff --git a/quickwit/quickwit-storage/src/local_file_storage.rs b/quickwit/quickwit-storage/src/local_file_storage.rs
index 025a6e1fb2a..4a88a42a0ca 100644
--- a/quickwit/quickwit-storage/src/local_file_storage.rs
+++ b/quickwit/quickwit-storage/src/local_file_storage.rs
@@ -34,10 +34,10 @@ use tokio::fs;
use tokio::io::AsyncWriteExt;
use tracing::warn;
-use crate::storage::{BulkDeleteError, DeleteFailure, SendableAsync};
+use crate::storage::SendableAsync;
use crate::{
- DebouncedStorage, OwnedBytes, Storage, StorageError, StorageErrorKind, StorageFactory,
- StorageResolverError, StorageResult,
+ BulkDeleteError, DebouncedStorage, DeleteFailure, OwnedBytes, Storage, StorageError,
+ StorageErrorKind, StorageFactory, StorageResolverError, StorageResult,
};
/// File system compatible storage implementation.
diff --git a/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs b/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs
index 99c215a3c1a..2a6c8e42e23 100644
--- a/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs
+++ b/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs
@@ -48,10 +48,10 @@ use tokio_util::compat::FuturesAsyncReadCompatExt;
use tracing::{instrument, warn};
use crate::debouncer::DebouncedStorage;
-use crate::storage::{BulkDeleteError, DeleteFailure, SendableAsync};
+use crate::storage::SendableAsync;
use crate::{
- MultiPartPolicy, PutPayload, Storage, StorageError, StorageErrorKind, StorageFactory,
- StorageResolverError, StorageResult, STORAGE_METRICS,
+ BulkDeleteError, DeleteFailure, MultiPartPolicy, PutPayload, Storage, StorageError,
+ StorageErrorKind, StorageFactory, StorageResolverError, StorageResult, STORAGE_METRICS,
};
/// Azure object storage resolver.
diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs
index 726bae47874..e22ee01dfef 100644
--- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs
+++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs
@@ -45,10 +45,10 @@ use tokio::sync::Semaphore;
use tracing::{info, instrument, warn};
use crate::object_storage::MultiPartPolicy;
-use crate::storage::{BulkDeleteError, DeleteFailure, SendableAsync};
+use crate::storage::SendableAsync;
use crate::{
- OwnedBytes, Storage, StorageError, StorageErrorKind, StorageResolverError, StorageResult,
- STORAGE_METRICS,
+ BulkDeleteError, DeleteFailure, OwnedBytes, Storage, StorageError, StorageErrorKind,
+ StorageResolverError, StorageResult, STORAGE_METRICS,
};
/// Semaphore to limit the number of concurent requests to the object store. Some object stores
diff --git a/quickwit/quickwit-storage/src/prefix_storage.rs b/quickwit/quickwit-storage/src/prefix_storage.rs
index 7955353bf38..37084361897 100644
--- a/quickwit/quickwit-storage/src/prefix_storage.rs
+++ b/quickwit/quickwit-storage/src/prefix_storage.rs
@@ -25,8 +25,8 @@ use std::sync::Arc;
use async_trait::async_trait;
use quickwit_common::uri::Uri;
-use crate::storage::{BulkDeleteError, SendableAsync};
-use crate::{OwnedBytes, Storage};
+use crate::storage::SendableAsync;
+use crate::{BulkDeleteError, OwnedBytes, Storage};
/// This storage acts as a proxy to another storage that simply modifies each API call
/// by preceding each path with a given a prefix.
@@ -180,7 +180,7 @@ mod tests {
use std::collections::HashMap;
use super::*;
- use crate::storage::DeleteFailure;
+ use crate::DeleteFailure;
#[test]
fn test_strip_prefix_from_error() {
diff --git a/quickwit/quickwit-storage/src/ram_storage.rs b/quickwit/quickwit-storage/src/ram_storage.rs
index c2bd99ab8b7..d1d88794e13 100644
--- a/quickwit/quickwit-storage/src/ram_storage.rs
+++ b/quickwit/quickwit-storage/src/ram_storage.rs
@@ -30,9 +30,10 @@ use tokio::io::AsyncWriteExt;
use tokio::sync::RwLock;
use crate::prefix_storage::add_prefix_to_storage;
-use crate::storage::{BulkDeleteError, SendableAsync};
+use crate::storage::SendableAsync;
use crate::{
- OwnedBytes, Storage, StorageErrorKind, StorageFactory, StorageResolverError, StorageResult,
+ BulkDeleteError, OwnedBytes, Storage, StorageErrorKind, StorageFactory, StorageResolverError,
+ StorageResult,
};
/// In Ram implementation of quickwit's storage.
diff --git a/quickwit/quickwit-storage/src/storage.rs b/quickwit/quickwit-storage/src/storage.rs
index 3f9057fb506..12ba5604a50 100644
--- a/quickwit/quickwit-storage/src/storage.rs
+++ b/quickwit/quickwit-storage/src/storage.rs
@@ -17,16 +17,15 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
-use std::collections::HashMap;
use std::fmt;
use std::ops::Range;
-use std::path::{Path, PathBuf};
+use std::path::Path;
use async_trait::async_trait;
use quickwit_common::uri::Uri;
use tokio::io::AsyncWrite;
-use crate::{OwnedBytes, PutPayload, StorageError, StorageErrorKind, StorageResult};
+use crate::{BulkDeleteError, OwnedBytes, PutPayload, StorageErrorKind, StorageResult};
/// This trait is only used to make it build trait object with `AsyncWrite + Send + Unpin`.
pub trait SendableAsync: AsyncWrite + Send + Unpin {}
@@ -126,51 +125,3 @@ pub trait Storage: fmt::Debug + Send + Sync + 'static {
/// Returns an URI identifying the storage
fn uri(&self) -> &Uri;
}
-
-/// Error returned by `bulk_delete`. Under the hood, `bulk_delete` groups the files to
-/// delete into multiple batches of fixed size and issues one delete objects request per batch. The
-/// whole operation can fail in multiples ways, which is reflected by the quirckiness of the API of
-/// [`BulkDeleteError`]. First, a batch can fail partially, i.e. some objects are deleted while
-/// others are not. The `successes` and `failures` attributes of the error will be populated
-/// accordingly. Second, a batch can fail completely, in which case the `error` field will be set.
-/// Because a batch failing entirely usually indicates a systemic error, for instance, a connection
-/// or credentials issue, `bulk_delete` does not attempt to delete the remaining batches and
-/// populates the `unattempted` attribute. Consequently, the attributes of this error are not
-/// "mutually exclusive": there exists a path where all those fields are not empty. The caller is
-/// expected to handle this error carefully and inspect the instance thoroughly before any retry
-/// attempt.
-#[must_use]
-#[derive(Debug, Default, thiserror::Error)]
-pub struct BulkDeleteError {
- /// Error that occurred for a whole batch and caused the entire deletion operation to be
- /// aborted.
- pub error: Option<StorageError>,
- /// List of files that were successfully deleted, including non-existing files.
- pub successes: Vec<PathBuf>,
- /// List of files that failed to be deleted along with the corresponding failure descriptions.
- pub failures: HashMap<PathBuf, DeleteFailure>,
- /// List of remaining files to delete before the operation was aborted.
- pub unattempted: Vec<PathBuf>,
-}
-
-#[derive(Debug, Default)]
-pub struct DeleteFailure {
- pub error: Option<StorageError>,
- /// The failure code is a string that uniquely identifies an error condition. It is meant to be
- /// read and understood by programs that detect and handle errors by type.
- pub code: Option<String>,
- pub message: Option<String>,
-}
-
-impl fmt::Display for BulkDeleteError {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(
- f,
- "Bulk delete error ({} success(es), {} failure(s), {} unattempted)",
- self.successes.len(),
- self.failures.len(),
- self.unattempted.len()
- )?;
- Ok(())
- }
-}