From 667f54d41e4cc89f68073da6f81b2fd4af296448 Mon Sep 17 00:00:00 2001
From: Adrien Guillo
Date: Fri, 14 Jul 2023 07:29:07 +0900
Subject: [PATCH] Do not ignore storage errors silently when deleting splits

---
 docs/reference/rest-api.md                    |  21 +-
 quickwit/quickwit-cli/src/index.rs            |   4 +-
 quickwit/quickwit-cli/src/tool.rs             |  20 +-
 quickwit/quickwit-common/src/file_entry.rs    |  29 --
 quickwit/quickwit-common/src/lib.rs           |   2 -
 quickwit/quickwit-core/Cargo.toml             |   3 +
 quickwit/quickwit-core/src/index.rs           | 127 +++++-
 quickwit/quickwit-core/src/lib.rs             |  55 ---
 .../src/actors/garbage_collector.rs           |   9 +-
 .../src/garbage_collection.rs                 | 409 +++++++++++++-----
 quickwit/quickwit-janitor/src/lib.rs          |   8 +-
 quickwit/quickwit-metastore/src/lib.rs        |   2 +-
 .../quickwit-metastore/src/split_metadata.rs  |  37 +-
 .../quickwit-rest-client/src/rest_client.rs   |  16 +-
 .../src/index_api/rest_handler.rs             |   9 +-
 .../quickwit-storage/src/bundle_storage.rs    |   6 +-
 .../src/cache/storage_with_cache.rs           |   4 +-
 quickwit/quickwit-storage/src/debouncer.rs    |   4 +-
 quickwit/quickwit-storage/src/error.rs        |  57 +++
 quickwit/quickwit-storage/src/lib.rs          |   5 +-
 .../src/local_file_storage.rs                 |   6 +-
 .../src/object_storage/azure_blob_storage.rs  |   6 +-
 .../object_storage/s3_compatible_storage.rs   |   6 +-
 .../quickwit-storage/src/prefix_storage.rs    |   6 +-
 quickwit/quickwit-storage/src/ram_storage.rs  |   5 +-
 quickwit/quickwit-storage/src/storage.rs      |  53 +--
 26 files changed, 581 insertions(+), 328 deletions(-)
 delete mode 100644 quickwit/quickwit-common/src/file_entry.rs

diff --git a/docs/reference/rest-api.md b/docs/reference/rest-api.md
index 3f847477b94..1af3556d00f 100644
--- a/docs/reference/rest-api.md
+++ b/docs/reference/rest-api.md
@@ -24,7 +24,7 @@ GET [..]/search?query=barack%20obama
 
 Successful requests return a 2xx HTTP status code.
 
-Failed requests return a 4xx HTTP status code. The response body of failed requests holds a JSON object containing an `message` field that describes the error.
+Failed requests return a 4xx HTTP status code. The response body of failed requests holds a JSON object containing a `message` field that describes the error.
 
 ```json
 {
@@ -37,7 +37,7 @@ Failed requests return a 4xx HTTP status code. The response body of failed reque
 ### Search in an index
 
 Search for documents matching a query in the given index `api/v1/<index id>/search`. This endpoint is available as long as you have at least one node running a searcher service in the cluster.
-The search endpoint accepts `GET` and `POST` requests. The [parameters](#get-parameters) are URL parameters in case of `GET` or JSON key value pairs in case of `POST`.
+The search endpoint accepts `GET` and `POST` requests. The [parameters](#get-parameters) are URL parameters for `GET` requests or JSON key-value pairs for `POST` requests.
 
 ```
 GET api/v1/<index id>/search?query=searchterm
 ```
@@ -91,10 +91,10 @@ The response is a JSON object, and the content type is `application/json; charse
 ### Search stream in an index
 
 ```
 GET api/v1/<index id>/search/stream?query=searchterm&fast_field=my_id
 ```
 
-Streams field values from ALL documents matching a search query in the given index `<index id>`, in a specified output format among the following:
+Streams field values from ALL documents matching a search query in the target index `<index id>`, in a specified output format among the following:
 
 - [CSV](https://datatracker.ietf.org/doc/html/rfc4180)
-- [ClickHouse RowBinary](https://clickhouse.tech/docs/en/interfaces/formats/#rowbinary). If `partition_by_field` is set, Quickwit returns chunks of data for a each partition field value. Each chunk starts with 16 bytes being partition value and content length and then the `fast_field` values in `RowBinary` format.
+- [ClickHouse RowBinary](https://clickhouse.tech/docs/en/interfaces/formats/#rowbinary). If `partition_by_field` is set, Quickwit returns chunks of data for each partition field value. Each chunk starts with 16 bytes encoding the partition value and the content length, followed by the `fast_field` values in `RowBinary` format.
 
 `fast_field` and `partition_by_field` must be fast fields of type `i64` or `u64`.
 
@@ -104,7 +104,7 @@ This endpoint is available as long as you have at least one node running a searc
 
 :::note
 
-The endpoint will return 10 million values if 10 million documents match the query. This is expected, this endpoint is made to support queries matching millions of document and return field values in a reasonable response time.
+The endpoint will return 10 million values if 10 million documents match the query. This is expected: this endpoint is designed to support queries matching millions of documents and to return field values within a reasonable response time.
 
 :::
 
@@ -345,24 +345,27 @@ Delete index of ID `index id`.
 
 #### Response
 
-The response is the list of delete split files, and the content type is `application/json; charset=UTF-8.`
+The response is the list of deleted split files; the content type is `application/json; charset=UTF-8`.
 
 ```json
 [
   {
+    "split_id": "01GK1XNAECH7P14850S9VV6P94",
+    "num_docs": 1337,
+    "uncompressed_docs_size_bytes": 23933408,
     "file_name": "01GK1XNAECH7P14850S9VV6P94.split",
-    "file_size_in_bytes": 2991676
+    "file_size_bytes": 2991676
   }
 ]
 ```
 
-### Get all indexes metadatas
+### Get all indexes metadata
 
 ```
 GET api/v1/indexes
 ```
 
-Get the indexes metadatas of all indexes present in the metastore.
+Retrieve the metadata of all indexes present in the metastore.
#### Response diff --git a/quickwit/quickwit-cli/src/index.rs b/quickwit/quickwit-cli/src/index.rs index 69c621a4b28..feff585db26 100644 --- a/quickwit/quickwit-cli/src/index.rs +++ b/quickwit/quickwit-cli/src/index.rs @@ -891,8 +891,8 @@ pub async fn delete_index_cli(args: DeleteIndexArgs) -> anyhow::Result<()> { "The following files will be removed from the index `{}`", args.index_id ); - for file_entry in affected_files { - println!(" - {}", file_entry.file_name); + for split_info in affected_files { + println!(" - {}", split_info.file_name.display()); } return Ok(()); } diff --git a/quickwit/quickwit-cli/src/tool.rs b/quickwit/quickwit-cli/src/tool.rs index c3cd2a6b141..529fbe010b0 100644 --- a/quickwit/quickwit-cli/src/tool.rs +++ b/quickwit/quickwit-cli/src/tool.rs @@ -510,47 +510,47 @@ pub async fn garbage_collect_index_cli(args: GarbageCollectIndexArgs) -> anyhow: let removal_info = index_service .garbage_collect_index(&args.index_id, args.grace_period, args.dry_run) .await?; - if removal_info.removed_split_entries.is_empty() && removal_info.failed_split_ids.is_empty() { + if removal_info.removed_split_entries.is_empty() && removal_info.failed_splits.is_empty() { println!("No dangling files to garbage collect."); return Ok(()); } if args.dry_run { println!("The following files will be garbage collected."); - for file_entry in removal_info.removed_split_entries { - println!(" - {}", file_entry.file_name); + for split_info in removal_info.removed_split_entries { + println!(" - {}", split_info.file_name.display()); } return Ok(()); } - if !removal_info.failed_split_ids.is_empty() { + if !removal_info.failed_splits.is_empty() { println!("The following splits were attempted to be removed, but failed."); - for split_id in removal_info.failed_split_ids.iter() { - println!(" - {split_id}"); + for split_info in &removal_info.failed_splits { + println!(" - {}", split_info.split_id); } println!( "{} Splits were unable to be removed.", - removal_info.failed_split_ids.len() + removal_info.failed_splits.len() ); } let deleted_bytes: u64 = removal_info .removed_split_entries .iter() - .map(|entry| entry.file_size_in_bytes) + .map(|split_info| split_info.file_size_bytes.get_bytes()) .sum(); println!( "{}MB of storage garbage collected.", deleted_bytes / 1_000_000 ); - if removal_info.failed_split_ids.is_empty() { + if removal_info.failed_splits.is_empty() { println!( "{} Index successfully garbage collected.", "✔".color(GREEN_COLOR) ); } else if removal_info.removed_split_entries.is_empty() - && !removal_info.failed_split_ids.is_empty() + && !removal_info.failed_splits.is_empty() { println!("{} Failed to garbage collect index.", "✘".color(RED_COLOR)); } else { diff --git a/quickwit/quickwit-common/src/file_entry.rs b/quickwit/quickwit-common/src/file_entry.rs deleted file mode 100644 index 445c0a5c0fb..00000000000 --- a/quickwit/quickwit-common/src/file_entry.rs +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright (C) 2023 Quickwit, Inc. -// -// Quickwit is offered under the AGPL v3.0 and as commercial software. -// For commercial licensing, contact us at hello@quickwit.io. -// -// AGPL: -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. 
-// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -use serde::{Deserialize, Serialize}; - -#[allow(missing_docs)] -#[derive(Clone, Debug, Deserialize, Serialize, utoipa::ToSchema)] -pub struct FileEntry { - /// The file_name is a file name, within an index directory. - pub file_name: String, - /// File size in bytes. - pub file_size_in_bytes: u64, //< TODO switch to `byte_unit::Byte`. -} diff --git a/quickwit/quickwit-common/src/lib.rs b/quickwit/quickwit-common/src/lib.rs index 9ee3d285aa8..5b390432416 100644 --- a/quickwit/quickwit-common/src/lib.rs +++ b/quickwit/quickwit-common/src/lib.rs @@ -22,7 +22,6 @@ mod coolid; pub mod binary_heap; -mod file_entry; pub mod fs; pub mod io; mod kill_switch; @@ -49,7 +48,6 @@ use std::ops::{Range, RangeInclusive}; use std::str::FromStr; pub use coolid::new_coolid; -pub use file_entry::FileEntry; pub use kill_switch::KillSwitch; pub use progress::{Progress, ProtectedZoneGuard}; pub use stream_utils::{BoxStream, ServiceStream}; diff --git a/quickwit/quickwit-core/Cargo.toml b/quickwit/quickwit-core/Cargo.toml index a76bc0cb7ad..bf00ea9fc97 100644 --- a/quickwit/quickwit-core/Cargo.toml +++ b/quickwit/quickwit-core/Cargo.toml @@ -39,3 +39,6 @@ quickwit-storage = { workspace = true } [dev-dependencies] mockall = { workspace = true } serde_yaml = { workspace = true } + +quickwit-common = { workspace = true, features = ["testsuite"] } +quickwit-metastore = { workspace = true, features = ["testsuite"] } diff --git a/quickwit/quickwit-core/src/index.rs b/quickwit/quickwit-core/src/index.rs index 3b18ad8de3a..8047ef40981 100644 --- a/quickwit/quickwit-core/src/index.rs +++ b/quickwit/quickwit-core/src/index.rs @@ -22,14 +22,14 @@ use std::sync::Arc; use std::time::Duration; use quickwit_common::fs::{empty_dir, get_cache_directory_path}; -use quickwit_common::FileEntry; use quickwit_config::{validate_identifier, IndexConfig, SourceConfig}; use quickwit_indexing::check_source_connectivity; use quickwit_janitor::{ - delete_splits_with_files, run_garbage_collect, SplitDeletionError, SplitRemovalInfo, + delete_splits_from_storage_and_metastore, run_garbage_collect, DeleteSplitsError, + SplitRemovalInfo, }; use quickwit_metastore::{ - IndexMetadata, ListSplitsQuery, Metastore, MetastoreError, SplitMetadata, SplitState, + IndexMetadata, ListSplitsQuery, Metastore, MetastoreError, SplitInfo, SplitMetadata, SplitState, }; use quickwit_proto::{IndexUid, ServiceError, ServiceErrorCode}; use quickwit_storage::{StorageResolver, StorageResolverError}; @@ -43,7 +43,7 @@ pub enum IndexServiceError { #[error("Metastore error `{0}`.")] MetastoreError(#[from] MetastoreError), #[error("Split deletion error `{0}`.")] - SplitDeletionError(#[from] SplitDeletionError), + SplitDeletionError(#[from] DeleteSplitsError), #[error("Invalid config: {0:#}.")] InvalidConfig(anyhow::Error), #[error("Invalid identifier: {0}.")] @@ -135,33 +135,29 @@ impl IndexService { &self, index_id: &str, dry_run: bool, - ) -> Result, IndexServiceError> { + ) -> Result, IndexServiceError> { let index_metadata = self.metastore.index_metadata(index_id).await?; let index_uid = index_metadata.index_uid.clone(); let index_uri = index_metadata.into_index_config().index_uri.clone(); 
let storage = self.storage_resolver.resolve(&index_uri).await?; if dry_run { - let all_splits = self + let splits_to_delete = self .metastore .list_all_splits(index_uid.clone()) .await? .into_iter() - .map(|metadata| metadata.split_metadata) + .map(|split| split.split_metadata.as_split_info()) .collect::>(); - - let file_entries_to_delete: Vec = - all_splits.iter().map(FileEntry::from).collect(); - return Ok(file_entries_to_delete); + return Ok(splits_to_delete); } - // Schedule staged and published splits for deletion. let query = ListSplitsQuery::for_index(index_uid.clone()) .with_split_states([SplitState::Staged, SplitState::Published]); let splits = self.metastore.list_splits(query).await?; let split_ids = splits .iter() - .map(|meta| meta.split_id()) + .map(|split| split.split_id()) .collect::>(); self.metastore .mark_splits_for_deletion(index_uid.clone(), &split_ids) @@ -175,10 +171,10 @@ impl IndexService { .list_splits(query) .await? .into_iter() - .map(|metadata| metadata.split_metadata) + .map(|split| split.split_metadata) .collect::>(); - let deleted_entries = delete_splits_with_files( + let deleted_splits = delete_splits_from_storage_and_metastore( index_uid.clone(), storage, self.metastore.clone(), @@ -187,7 +183,8 @@ impl IndexService { ) .await?; self.metastore.delete_index(index_uid).await?; - Ok(deleted_entries) + + Ok(deleted_splits) } /// Detect all dangling splits and associated files from the index and removes them. @@ -251,7 +248,7 @@ impl IndexService { .map(|split| split.split_metadata) .collect(); // FIXME: return an error. - if let Err(err) = delete_splits_with_files( + if let Err(err) = delete_splits_from_storage_and_metastore( index_uid.clone(), storage, self.metastore.clone(), @@ -351,3 +348,99 @@ pub async fn validate_storage_uri( storage_resolver.resolve(&index_config.index_uri).await?; Ok(()) } + +#[cfg(test)] +mod tests { + use quickwit_common::uri::Uri; + use quickwit_metastore::metastore_for_test; + use quickwit_storage::PutPayload; + + use super::*; + + #[tokio::test] + async fn test_create_index() { + let metastore = metastore_for_test(); + let storage_resolver = StorageResolver::ram_for_test(); + let index_service = IndexService::new(metastore.clone(), storage_resolver); + let index_id = "test-index"; + let index_uri = "ram://indexes/test-index"; + let index_config = IndexConfig::for_test(index_id, index_uri); + let index_metadata_0 = index_service + .create_index(index_config.clone(), false) + .await + .unwrap(); + assert_eq!(index_metadata_0.index_id(), index_id); + assert_eq!(index_metadata_0.index_uri(), &index_uri); + assert!(metastore.index_exists(index_id).await.unwrap()); + + let error = index_service + .create_index(index_config.clone(), false) + .await + .unwrap_err(); + let IndexServiceError::MetastoreError(inner_error) = error else { + panic!("Expected `MetastoreError` variant, got {:?}", error) + }; + assert!( + matches!(inner_error, MetastoreError::IndexAlreadyExists { index_id } if index_id == index_metadata_0.index_id()) + ); + + let index_metadata_1 = index_service + .create_index(index_config, true) + .await + .unwrap(); + assert_eq!(index_metadata_1.index_id(), index_id); + assert_eq!(index_metadata_1.index_uri(), &index_uri); + assert!(index_metadata_0.index_uid != index_metadata_1.index_uid); + } + + #[tokio::test] + async fn test_delete_index() { + let metastore = metastore_for_test(); + let storage_resolver = StorageResolver::ram_for_test(); + let storage = storage_resolver + .resolve(&Uri::for_test("ram://indexes/test-index")) + 
.await + .unwrap(); + let index_service = IndexService::new(metastore.clone(), storage_resolver); + let index_id = "test-index"; + let index_uri = "ram://indexes/test-index"; + let index_config = IndexConfig::for_test(index_id, index_uri); + let index_uid = index_service + .create_index(index_config.clone(), false) + .await + .unwrap() + .index_uid; + + let split_id = "test-split"; + let split_metadata = SplitMetadata { + split_id: split_id.to_string(), + index_uid: index_uid.clone(), + ..Default::default() + }; + metastore + .stage_splits(index_uid.clone(), vec![split_metadata.clone()]) + .await + .unwrap(); + + let splits = metastore.list_all_splits(index_uid.clone()).await.unwrap(); + assert_eq!(splits.len(), 1); + + let split_path_str = format!("{}.split", split_id); + let split_path = Path::new(&split_path_str); + let payload: Box = Box::new(vec![0]); + storage.put(split_path, payload).await.unwrap(); + assert!(storage.exists(split_path).await.unwrap()); + + let split_infos = index_service.delete_index(index_id, false).await.unwrap(); + assert_eq!(split_infos.len(), 1); + + let error = metastore + .list_all_splits(index_uid.clone()) + .await + .unwrap_err(); + assert!( + matches!(error, MetastoreError::IndexDoesNotExist { index_id } if index_id == index_uid.index_id()) + ); + assert!(!storage.exists(split_path).await.unwrap()); + } +} diff --git a/quickwit/quickwit-core/src/lib.rs b/quickwit/quickwit-core/src/lib.rs index fb6e0c81f12..b22e7abecfb 100644 --- a/quickwit/quickwit-core/src/lib.rs +++ b/quickwit/quickwit-core/src/lib.rs @@ -17,61 +17,6 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -#![deny(clippy::disallowed_methods)] - mod index; pub use index::{clear_cache_directory, validate_storage_uri, IndexService, IndexServiceError}; - -#[cfg(test)] -mod tests { - use std::path::Path; - - use quickwit_common::FileEntry; - use quickwit_indexing::TestSandbox; - use quickwit_storage::StorageResolver; - - use crate::IndexService; - - #[tokio::test] - async fn test_file_entry_from_split_and_index_delete() -> anyhow::Result<()> { - let index_id = "test-index"; - let doc_mapping_yaml = r#" - field_mappings: - - name: title - type: text - - name: body - type: text - - name: url - type: text - "#; - let test_sandbox = - TestSandbox::create(index_id, doc_mapping_yaml, "{}", &["title", "body"]).await?; - test_sandbox.add_documents(vec![ - serde_json::json!({"title": "snoopy", "body": "Snoopy is an anthropomorphic beagle[5] in the comic strip...", "url": "http://snoopy"}), - ]).await?; - let splits = test_sandbox - .metastore() - .list_all_splits(test_sandbox.index_uid()) - .await? - .into_iter() - .map(|metadata| metadata.split_metadata) - .collect::>(); - let file_entries: Vec = splits.iter().map(FileEntry::from).collect(); - assert_eq!(file_entries.len(), 1); - for file_entry in file_entries { - let split_num_bytes = test_sandbox - .storage() - .file_num_bytes(Path::new(file_entry.file_name.as_str())) - .await?; - assert_eq!(split_num_bytes, file_entry.file_size_in_bytes); - } - // Now delete the index. 
- let index_service = - IndexService::new(test_sandbox.metastore(), StorageResolver::unconfigured()); - let deleted_file_entries = index_service.delete_index(index_id, false).await?; - assert_eq!(deleted_file_entries.len(), 1); - test_sandbox.assert_quit().await; - Ok(()) - } -} diff --git a/quickwit/quickwit-janitor/src/actors/garbage_collector.rs b/quickwit/quickwit-janitor/src/actors/garbage_collector.rs index 56afad8b896..ddfe2b79504 100644 --- a/quickwit/quickwit-janitor/src/actors/garbage_collector.rs +++ b/quickwit/quickwit-janitor/src/actors/garbage_collector.rs @@ -18,6 +18,7 @@ // along with this program. If not, see . use std::collections::HashSet; +use std::path::Path; use std::sync::Arc; use std::time::Duration; @@ -134,7 +135,7 @@ impl GarbageCollector { let deleted_file_entries = match gc_res { Ok(removal_info) => { self.counters.num_successful_gc_run_on_index += 1; - self.counters.num_failed_splits += removal_info.failed_split_ids.len(); + self.counters.num_failed_splits += removal_info.failed_splits.len(); removal_info.removed_split_entries } Err(error) => { @@ -145,9 +146,9 @@ impl GarbageCollector { }; if !deleted_file_entries.is_empty() { let num_deleted_splits = deleted_file_entries.len(); - let deleted_files: HashSet<&str> = deleted_file_entries + let deleted_files: HashSet<&Path> = deleted_file_entries .iter() - .map(|deleted_entry| deleted_entry.file_name.as_str()) + .map(|deleted_entry| deleted_entry.file_name.as_path()) .take(5) .collect(); info!( @@ -160,7 +161,7 @@ impl GarbageCollector { self.counters.num_deleted_files += deleted_file_entries.len(); self.counters.num_deleted_bytes += deleted_file_entries .iter() - .map(|entry| entry.file_size_in_bytes as usize) + .map(|entry| entry.file_size_bytes.get_bytes() as usize) .sum::(); } } diff --git a/quickwit/quickwit-janitor/src/garbage_collection.rs b/quickwit/quickwit-janitor/src/garbage_collection.rs index 2f09c185aa8..95e90d7dbef 100644 --- a/quickwit/quickwit-janitor/src/garbage_collection.rs +++ b/quickwit/quickwit-janitor/src/garbage_collection.rs @@ -17,35 +17,38 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::collections::HashMap; -use std::path::Path; +use std::collections::{HashMap, HashSet}; +use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; use futures::Future; use quickwit_actors::ActorContext; -use quickwit_common::{FileEntry, PrettySample}; -use quickwit_metastore::{ListSplitsQuery, Metastore, MetastoreError, SplitMetadata, SplitState}; +use quickwit_common::PrettySample; +use quickwit_metastore::{ + ListSplitsQuery, Metastore, MetastoreError, SplitInfo, SplitMetadata, SplitState, +}; use quickwit_proto::IndexUid; -use quickwit_storage::Storage; +use quickwit_storage::{BulkDeleteError, Storage}; use thiserror::Error; use time::OffsetDateTime; use tracing::{error, instrument}; use crate::actors::GarbageCollector; -/// The maximum number of splits that should be deleted in one go by the GC. +/// The maximum number of splits that the GC should delete per attempt. const DELETE_SPLITS_BATCH_SIZE: usize = 1000; -/// SplitDeletionError denotes error that can happen when deleting split -/// during garbage collection. +/// [`DeleteSplitsError`] describes the errors that occurred during the deletion of splits from +/// storage and metastore. 
 #[derive(Error, Debug)]
-pub enum SplitDeletionError {
-    #[error("Failed to delete splits from metastore: '{error:?}'.")]
-    MetastoreFailure {
-        error: MetastoreError,
-        failed_split_ids: Vec<String>,
-    },
+#[error("Failed to delete splits from storage and/or metastore.")]
+pub struct DeleteSplitsError {
+    successes: Vec<SplitInfo>,
+    storage_error: Option<BulkDeleteError>,
+    storage_failures: Vec<SplitInfo>,
+    metastore_error: Option<MetastoreError>,
+    metastore_failures: Vec<SplitInfo>,
 }
 
 async fn protect_future<Fut, T>(
@@ -65,9 +68,9 @@ where
 /// Information on what splits have and have not been cleaned up by the GC.
 pub struct SplitRemovalInfo {
     /// The set of splits that have been removed.
-    pub removed_split_entries: Vec<FileEntry>,
+    pub removed_split_entries: Vec<SplitInfo>,
     /// The set of splits that were attempted to be removed, but were unsuccessful.
-    pub failed_split_ids: Vec<String>,
+    pub failed_splits: Vec<SplitInfo>,
 }
 
 /// Detect all dangling splits and associated files from the index and removes them.
@@ -112,24 +115,24 @@ pub async fn run_garbage_collect(
     let mut splits_marked_for_deletion = protect_future(ctx_opt, metastore.list_splits(query))
         .await?
         .into_iter()
-        .map(|meta| meta.split_metadata)
+        .map(|split| split.split_metadata)
         .collect::<Vec<_>>();
     splits_marked_for_deletion.extend(deletable_staged_splits);
 
-    let candidate_entries: Vec<FileEntry> = splits_marked_for_deletion
-        .iter()
-        .map(FileEntry::from)
+    let candidate_entries: Vec<SplitInfo> = splits_marked_for_deletion
+        .into_iter()
+        .map(|split| split.as_split_info())
         .collect();
     return Ok(SplitRemovalInfo {
         removed_split_entries: candidate_entries,
-        failed_split_ids: Vec::new(),
+        failed_splits: Vec::new(),
     });
 }
 
 // Schedule all eligible staged splits for delete
 let split_ids: Vec<&str> = deletable_staged_splits
     .iter()
-    .map(|meta| meta.split_id())
+    .map(|split| split.split_id())
     .collect();
 if !split_ids.is_empty() {
     protect_future(
@@ -144,7 +147,7 @@ pub async fn run_garbage_collect(
     let updated_before_timestamp =
         OffsetDateTime::now_utc().unix_timestamp() - deletion_grace_period.as_secs() as i64;
 
-    let deleted_files = delete_splits_marked_for_deletion(
+    let deleted_splits = delete_splits_marked_for_deletion(
         index_uid,
         updated_before_timestamp,
         storage,
@@ -153,7 +156,7 @@ pub async fn run_garbage_collect(
     )
     .await;
 
-    Ok(deleted_files)
+    Ok(deleted_splits)
 }
 
 #[instrument(skip(storage, metastore, ctx_opt))]
@@ -169,8 +172,9 @@ async fn delete_splits_marked_for_deletion(
     metastore: Arc<dyn Metastore>,
     ctx_opt: Option<&ActorContext<GarbageCollector>>,
 ) -> SplitRemovalInfo {
-    let mut failed_split_ids = Vec::new();
-    let mut removed_split_files = Vec::new();
+    let mut removed_splits = Vec::new();
+    let mut failed_splits = Vec::new();
+
     loop {
         let query = ListSplitsQuery::for_index(index_uid.clone())
             .with_split_state(SplitState::MarkedForDeletion)
@@ -186,18 +190,17 @@ async fn delete_splits_marked_for_deletion(
                 break;
             }
         };
-
         let splits_to_delete = splits_to_delete
             .into_iter()
             .map(|split| split.split_metadata)
             .collect::<Vec<_>>();
 
         let num_splits_to_delete = splits_to_delete.len();
+
         if num_splits_to_delete == 0 {
             break;
         }
-
-        let delete_splits_result = delete_splits_with_files(
+        let delete_splits_result = delete_splits_from_storage_and_metastore(
             index_uid.clone(),
             storage.clone(),
             metastore.clone(),
@@ -207,31 +210,20 @@ async fn delete_splits_marked_for_deletion(
         .await;
 
         match delete_splits_result {
-            Ok(entries) => removed_split_files.extend(entries),
-            Err(SplitDeletionError::MetastoreFailure {
-                error,
-                failed_split_ids: failed_split_ids_inner,
-            }) => {
-                error!(
-                    error=?error,
-                    index_id=%index_uid.index_id(),
-                    split_ids=?PrettySample::new(&failed_split_ids_inner, 5),
-                    "Failed to delete {} splits.",
-                    failed_split_ids_inner.len()
-                );
-                failed_split_ids.extend(failed_split_ids_inner);
+            Ok(entries) => removed_splits.extend(entries),
+            Err(delete_splits_error) => {
+                failed_splits.extend(delete_splits_error.storage_failures);
+                failed_splits.extend(delete_splits_error.metastore_failures);
                 break;
             }
         }
-
         if num_splits_to_delete < DELETE_SPLITS_BATCH_SIZE {
             break;
         }
     }
-
     SplitRemovalInfo {
-        removed_split_entries: removed_split_files,
-        failed_split_ids,
+        removed_split_entries: removed_splits,
+        failed_splits,
     }
 }
 
@@ -243,91 +235,100 @@ async fn delete_splits_marked_for_deletion(
 /// * `metastore` - The metastore managing the target index.
 /// * `splits` - The list of splits to delete.
 /// * `ctx_opt` - A context for reporting progress (only useful within quickwit actor).
-pub async fn delete_splits_with_files(
+pub async fn delete_splits_from_storage_and_metastore(
     index_uid: IndexUid,
     storage: Arc<dyn Storage>,
     metastore: Arc<dyn Metastore>,
     splits: Vec<SplitMetadata>,
     ctx_opt: Option<&ActorContext<GarbageCollector>>,
-) -> anyhow::Result<Vec<FileEntry>, SplitDeletionError> {
-    let mut paths_to_splits = HashMap::with_capacity(splits.len());
+) -> anyhow::Result<Vec<SplitInfo>, DeleteSplitsError> {
+    let mut split_infos: HashMap<PathBuf, SplitInfo> = HashMap::with_capacity(splits.len());
     for split in splits {
-        let file_entry = FileEntry::from(&split);
-        let split_filename = quickwit_common::split_file(split.split_id());
-        let split_path = Path::new(&split_filename);
-
-        paths_to_splits.insert(
-            split_path.to_path_buf(),
-            (split.split_id().to_string(), file_entry),
-        );
+        let split_info = split.as_split_info();
+        split_infos.insert(split_info.file_name.clone(), split_info);
     }
-
-    let paths = paths_to_splits
+    let split_paths = split_infos
        .keys()
-        .map(|key| key.as_path())
+        .map(|split_path_buf| split_path_buf.as_path())
         .collect::<Vec<_>>();
-    let delete_result = storage.bulk_delete(&paths).await;
+    let delete_result = protect_future(ctx_opt, storage.bulk_delete(&split_paths)).await;
 
     if let Some(ctx) = ctx_opt {
         ctx.record_progress();
     }
-
-    let mut deleted_split_ids = Vec::new();
-    let mut deleted_file_entries = Vec::new();
+    let mut successes = Vec::with_capacity(split_infos.len());
+    let mut storage_error: Option<BulkDeleteError> = None;
+    let mut storage_failures = Vec::new();
 
     match delete_result {
-        Ok(()) => {
-            for (split_id, entry) in paths_to_splits.into_values() {
-                deleted_split_ids.push(split_id);
-                deleted_file_entries.push(entry);
-            }
-        }
+        Ok(_) => successes.extend(split_infos.into_values()),
         Err(bulk_delete_error) => {
-            let num_failed_splits =
-                bulk_delete_error.failures.len() + bulk_delete_error.unattempted.len();
-            let truncated_split_ids = bulk_delete_error
-                .failures
-                .keys()
-                .chain(bulk_delete_error.unattempted.iter())
-                .take(5)
+            let success_split_paths: HashSet<&PathBuf> =
+                bulk_delete_error.successes.iter().collect();
+            for (split_path, split_info) in split_infos {
+                if success_split_paths.contains(&split_path) {
+                    successes.push(split_info);
+                } else {
+                    storage_failures.push(split_info);
+                }
+            }
+            let failed_split_paths = storage_failures
+                .iter()
+                .map(|split_info| split_info.file_name.as_path())
                 .collect::<Vec<_>>();
             error!(
-                error = ?bulk_delete_error.error,
-                index_id = ?index_uid.index_id(),
-                num_failed_splits = num_failed_splits,
-                "Failed to delete {:?} and {} other splits.",
-                truncated_split_ids, num_failed_splits,
+                error=?bulk_delete_error.error,
+                index_id=index_uid.index_id(),
+                "Failed to delete split file(s) {:?} from storage.",
+                PrettySample::new(&failed_split_paths, 5),
             );
-
-            for split_path in bulk_delete_error.successes {
-                let (split_id, entry) = paths_to_splits
-                    .remove(&split_path)
-                    .expect("The successful split path should be present within the lookup table.");
-
-                deleted_split_ids.push(split_id);
-                deleted_file_entries.push(entry);
-            }
+            storage_error = Some(bulk_delete_error);
         }
     };
+    if !successes.is_empty() {
+        let split_ids: Vec<&str> = successes
+            .iter()
+            .map(|split_info| split_info.split_id.as_str())
+            .collect();
+        let metastore_result = protect_future(
+            ctx_opt,
+            metastore.delete_splits(index_uid.clone(), &split_ids),
+        )
+        .await;
 
-    if !deleted_split_ids.is_empty() {
-        let split_ids: Vec<&str> = deleted_split_ids.iter().map(String::as_str).collect();
-        protect_future(ctx_opt, metastore.delete_splits(index_uid, &split_ids))
-            .await
-            .map_err(|error| SplitDeletionError::MetastoreFailure {
-                error,
-                failed_split_ids: split_ids.into_iter().map(str::to_string).collect(),
-            })?;
+        if let Err(metastore_error) = metastore_result {
+            error!(
+                error=?metastore_error,
+                index_id=index_uid.index_id(),
+                "Failed to delete split(s) {:?} from metastore.",
+                PrettySample::new(&split_ids, 5),
+            );
+            let delete_splits_error = DeleteSplitsError {
+                successes: Vec::new(),
+                storage_error,
+                storage_failures,
+                metastore_error: Some(metastore_error),
+                metastore_failures: successes,
+            };
+            return Err(delete_splits_error);
+        }
     }
-
-    Ok(deleted_file_entries)
+    if !storage_failures.is_empty() {
+        let delete_splits_error = DeleteSplitsError {
+            successes,
+            storage_error,
+            storage_failures,
+            metastore_error: None,
+            metastore_failures: Vec::new(),
+        };
+        return Err(delete_splits_error);
+    }
+    Ok(successes)
 }
 
 #[cfg(test)]
 mod tests {
-    use std::sync::Arc;
     use std::time::Duration;
 
     use quickwit_config::IndexConfig;
@@ -335,8 +336,11 @@ mod tests {
         metastore_for_test, ListSplitsQuery, MockMetastore, SplitMetadata, SplitState,
     };
     use quickwit_proto::IndexUid;
-    use quickwit_storage::storage_for_test;
+    use quickwit_storage::{
+        storage_for_test, BulkDeleteError, DeleteFailure, MockStorage, PutPayload,
+    };
 
+    use super::*;
     use crate::run_garbage_collect;
 
     #[tokio::test]
@@ -483,4 +487,201 @@ mod tests {
         .await
         .unwrap();
     }
+
+    #[tokio::test]
+    async fn test_delete_splits_from_storage_and_metastore_happy_path() {
+        let storage = storage_for_test();
+        let metastore = metastore_for_test();
+
+        let index_id = "test-delete-splits-happy--index";
+        let index_uri = format!("ram:///indexes/{index_id}");
+        let index_config = IndexConfig::for_test(index_id, &index_uri);
+        let index_uid = metastore.create_index(index_config).await.unwrap();
+
+        let split_id = "test-delete-splits-happy--split";
+        let split_metadata = SplitMetadata {
+            split_id: split_id.to_string(),
+            index_uid: IndexUid::new(index_id),
+            ..Default::default()
+        };
+        metastore
+            .stage_splits(index_uid.clone(), vec![split_metadata.clone()])
+            .await
+            .unwrap();
+        metastore
+            .mark_splits_for_deletion(index_uid.clone(), &[split_id])
+            .await
+            .unwrap();
+
+        let split_path_str = format!("{}.split", split_id);
+        let split_path = Path::new(&split_path_str);
+        let payload: Box<dyn PutPayload> = Box::new(vec![0]);
+        storage.put(split_path, payload).await.unwrap();
+        assert!(storage.exists(split_path).await.unwrap());
+
+        let splits = metastore.list_all_splits(index_uid.clone()).await.unwrap();
+        assert_eq!(splits.len(), 1);
+
+        let deleted_split_infos = delete_splits_from_storage_and_metastore(
+            index_uid.clone(),
+            storage.clone(),
+            metastore.clone(),
+            vec![split_metadata],
+            None,
+        )
+        .await
+        .unwrap();
+
+        assert_eq!(deleted_split_infos.len(), 1);
assert_eq!(deleted_split_infos[0].split_id, split_id,); + assert_eq!( + deleted_split_infos[0].file_name, + Path::new(&format!("{split_id}.split")) + ); + assert!(!storage.exists(split_path).await.unwrap()); + assert!(metastore + .list_all_splits(index_uid) + .await + .unwrap() + .is_empty()); + } + + #[tokio::test] + async fn test_delete_splits_from_storage_and_metastore_storage_error() { + let mut mock_storage = MockStorage::new(); + mock_storage + .expect_bulk_delete() + .return_once(|split_paths| { + assert_eq!(split_paths.len(), 2); + + let split_path = split_paths[0].to_path_buf(); + let successes = vec![split_path]; + + let split_path = split_paths[1].to_path_buf(); + let delete_failure = DeleteFailure { + code: Some("AccessDenied".to_string()), + ..Default::default() + }; + let failures = HashMap::from_iter([(split_path, delete_failure)]); + let bulk_delete_error = BulkDeleteError { + successes, + failures, + ..Default::default() + }; + Err(bulk_delete_error) + }); + let storage = Arc::new(mock_storage); + let metastore = metastore_for_test(); + + let index_id = "test-delete-splits-storage-error--index"; + let index_uri = format!("ram:///indexes/{index_id}"); + let index_config = IndexConfig::for_test(index_id, &index_uri); + let index_uid = metastore.create_index(index_config).await.unwrap(); + + let split_id_0 = "test-delete-splits-storage-error--split-0"; + let split_metadata_0 = SplitMetadata { + split_id: split_id_0.to_string(), + index_uid: index_uid.clone(), + ..Default::default() + }; + let split_id_1 = "test-delete-splits-storage-error--split-1"; + let split_metadata_1 = SplitMetadata { + split_id: split_id_1.to_string(), + index_uid: index_uid.clone(), + ..Default::default() + }; + metastore + .stage_splits( + index_uid.clone(), + vec![split_metadata_0.clone(), split_metadata_1.clone()], + ) + .await + .unwrap(); + metastore + .mark_splits_for_deletion(index_uid.clone(), &[split_id_0, split_id_1]) + .await + .unwrap(); + + let error = delete_splits_from_storage_and_metastore( + index_uid.clone(), + storage.clone(), + metastore.clone(), + vec![split_metadata_0, split_metadata_1], + None, + ) + .await + .unwrap_err(); + + assert_eq!(error.successes.len(), 1); + assert_eq!(error.storage_failures.len(), 1); + assert_eq!(error.metastore_failures.len(), 0); + + let splits = metastore.list_all_splits(index_uid.clone()).await.unwrap(); + assert_eq!(splits.len(), 1); + assert_eq!(splits[0].split_id(), split_id_1); + } + + #[tokio::test] + async fn test_delete_splits_from_storage_and_metastore_metastore_error() { + let mut mock_storage = MockStorage::new(); + mock_storage + .expect_bulk_delete() + .return_once(|split_paths| { + assert_eq!(split_paths.len(), 2); + + let split_path = split_paths[0].to_path_buf(); + let successes = vec![split_path]; + + let split_path = split_paths[1].to_path_buf(); + let delete_failure = DeleteFailure { + code: Some("AccessDenied".to_string()), + ..Default::default() + }; + let failures = HashMap::from_iter([(split_path, delete_failure)]); + let bulk_delete_error = BulkDeleteError { + successes, + failures, + ..Default::default() + }; + Err(bulk_delete_error) + }); + let storage = Arc::new(mock_storage); + + let index_id = "test-delete-splits-storage-error--index"; + let index_uid = IndexUid::new(index_id.to_string()); + + let mut mock_metastore = MockMetastore::new(); + mock_metastore.expect_delete_splits().return_once(|_, _| { + Err(MetastoreError::IndexDoesNotExist { + index_id: index_id.to_string(), + }) + }); + let metastore = 
Arc::new(mock_metastore);
+
+        let split_id_0 = "test-delete-splits-storage-error--split-0";
+        let split_metadata_0 = SplitMetadata {
+            split_id: split_id_0.to_string(),
+            index_uid: index_uid.clone(),
+            ..Default::default()
+        };
+        let split_id_1 = "test-delete-splits-storage-error--split-1";
+        let split_metadata_1 = SplitMetadata {
+            split_id: split_id_1.to_string(),
+            index_uid: index_uid.clone(),
+            ..Default::default()
+        };
+        let error = delete_splits_from_storage_and_metastore(
+            index_uid.clone(),
+            storage.clone(),
+            metastore.clone(),
+            vec![split_metadata_0, split_metadata_1],
+            None,
+        )
+        .await
+        .unwrap_err();
+
+        assert!(error.successes.is_empty());
+        assert_eq!(error.storage_failures.len(), 1);
+        assert_eq!(error.metastore_failures.len(), 1);
+    }
 }
diff --git a/quickwit/quickwit-janitor/src/lib.rs b/quickwit/quickwit-janitor/src/lib.rs
index 0cc157f2c7e..ed62101a01b 100644
--- a/quickwit/quickwit-janitor/src/lib.rs
+++ b/quickwit/quickwit-janitor/src/lib.rs
@@ -22,9 +22,8 @@ use std::sync::Arc;
 
 use quickwit_actors::{Mailbox, Universe};
-use quickwit_common::FileEntry;
 use quickwit_config::NodeConfig;
-use quickwit_metastore::Metastore;
+use quickwit_metastore::{Metastore, SplitInfo};
 use quickwit_search::SearchJobPlacer;
 use quickwit_storage::StorageResolver;
 use tracing::info;
@@ -39,12 +38,13 @@ mod retention_policy_execution;
 pub use janitor_service::JanitorService;
 
 pub use self::garbage_collection::{
-    delete_splits_with_files, run_garbage_collect, SplitDeletionError, SplitRemovalInfo,
+    delete_splits_from_storage_and_metastore, run_garbage_collect, DeleteSplitsError,
+    SplitRemovalInfo,
 };
 use crate::actors::{DeleteTaskService, GarbageCollector, RetentionPolicyExecutor};
 
 #[derive(utoipa::OpenApi)]
-#[openapi(components(schemas(FileEntry)))]
+#[openapi(components(schemas(SplitInfo)))]
 /// Schemas used for the OpenAPI generation that are part of this crate.
pub struct JanitorApiSchemas; diff --git a/quickwit/quickwit-metastore/src/lib.rs b/quickwit/quickwit-metastore/src/lib.rs index d648bc29801..788fea5df8f 100644 --- a/quickwit/quickwit-metastore/src/lib.rs +++ b/quickwit/quickwit-metastore/src/lib.rs @@ -56,7 +56,7 @@ pub use metastore_factory::{MetastoreFactory, UnsupportedMetastore}; pub use metastore_resolver::MetastoreResolver; use quickwit_common::is_disjoint; use quickwit_doc_mapper::tag_pruning::TagFilterAst; -pub use split_metadata::{Split, SplitMaturity, SplitMetadata, SplitState}; +pub use split_metadata::{Split, SplitInfo, SplitMaturity, SplitMetadata, SplitState}; pub(crate) use split_metadata_version::{SplitMetadataV0_6, VersionedSplitMetadata}; #[derive(utoipa::OpenApi)] diff --git a/quickwit/quickwit-metastore/src/split_metadata.rs b/quickwit/quickwit-metastore/src/split_metadata.rs index 8f4b5060d89..99b91f2fccd 100644 --- a/quickwit/quickwit-metastore/src/split_metadata.rs +++ b/quickwit/quickwit-metastore/src/split_metadata.rs @@ -20,10 +20,11 @@ use std::collections::BTreeSet; use std::fmt; use std::ops::{Range, RangeInclusive}; +use std::path::PathBuf; use std::str::FromStr; use std::time::Duration; -use quickwit_common::FileEntry; +use byte_unit::Byte; use quickwit_proto::IndexUid; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DurationMilliSeconds}; @@ -179,17 +180,39 @@ impl SplitMetadata { ..Default::default() } } -} -impl From<&SplitMetadata> for FileEntry { - fn from(split: &SplitMetadata) -> Self { - FileEntry { - file_name: quickwit_common::split_file(split.split_id()), - file_size_in_bytes: split.footer_offsets.end, + /// Converts the split metadata into a [`SplitInfo`]. + pub fn as_split_info(&self) -> SplitInfo { + let file_name = quickwit_common::split_file(self.split_id()); + + SplitInfo { + uncompressed_docs_size_bytes: Byte::from_bytes(self.uncompressed_docs_size_in_bytes), + file_name: PathBuf::from(file_name), + file_size_bytes: Byte::from_bytes(self.footer_offsets.end), + split_id: self.split_id.clone(), + num_docs: self.num_docs, } } } +/// A summarized version of the split metadata for display purposes. +#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, utoipa::ToSchema)] +pub struct SplitInfo { + /// The split ID. + pub split_id: String, + /// The number of documents in the split. + pub num_docs: usize, + /// The sum of the sizes of the original JSON payloads in bytes. + #[schema(value_type = u64)] + pub uncompressed_docs_size_bytes: Byte, + /// The name of the split file on disk. + #[schema(value_type = String)] + pub file_name: PathBuf, + /// The size of the split file on disk in bytes. 
+ #[schema(value_type = u64)] + pub file_size_bytes: Byte, +} + #[cfg(any(test, feature = "testsuite"))] impl quickwit_config::TestableForRegression for SplitMetadata { fn sample_for_regression() -> Self { diff --git a/quickwit/quickwit-rest-client/src/rest_client.rs b/quickwit/quickwit-rest-client/src/rest_client.rs index 5ea52f1141b..e4b02f27cc6 100644 --- a/quickwit/quickwit-rest-client/src/rest_client.rs +++ b/quickwit/quickwit-rest-client/src/rest_client.rs @@ -21,11 +21,10 @@ use std::time::Duration; use bytes::Bytes; use quickwit_cluster::ClusterSnapshot; -use quickwit_common::FileEntry; use quickwit_config::{ConfigFormat, SourceConfig}; use quickwit_indexing::actors::IndexingServiceCounters; pub use quickwit_ingest::CommitType; -use quickwit_metastore::{IndexMetadata, Split}; +use quickwit_metastore::{IndexMetadata, Split, SplitInfo}; use quickwit_search::SearchResponseRest; use quickwit_serve::{ListSplitsQueryParams, SearchRequestQueryString}; use reqwest::header::{HeaderMap, HeaderValue, CONTENT_TYPE}; @@ -374,7 +373,7 @@ impl<'a> IndexClient<'a> { Ok(()) } - pub async fn delete(&self, index_id: &str, dry_run: bool) -> Result, Error> { + pub async fn delete(&self, index_id: &str, dry_run: bool) -> Result, Error> { let path = format!("indexes/{index_id}"); let response = self .transport @@ -933,10 +932,13 @@ mod test { Mock::given(method("DELETE")) .and(path("/api/v1/indexes/my-index")) .and(query_param("dry_run", "true")) - .respond_with( - ResponseTemplate::new(StatusCode::OK) - .set_body_json(json!([{"file_name": "filename", "file_size_in_bytes": 100}])), - ) + .respond_with(ResponseTemplate::new(StatusCode::OK).set_body_json(json!([{ + "split_id": "my-split", + "num_docs": 1, + "uncompressed_docs_size_bytes": 1024, + "file_name": "my-split.split", + "file_size_bytes": 128, + }]))) .up_to_n_times(1) .mount(&mock_server) .await; diff --git a/quickwit/quickwit-serve/src/index_api/rest_handler.rs b/quickwit/quickwit-serve/src/index_api/rest_handler.rs index 22ae6e5dffc..3205111552e 100644 --- a/quickwit/quickwit-serve/src/index_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/index_api/rest_handler.rs @@ -22,7 +22,6 @@ use std::sync::Arc; use bytes::Bytes; use hyper::header::CONTENT_TYPE; use quickwit_common::uri::Uri; -use quickwit_common::FileEntry; use quickwit_config::{ load_source_config_from_user_config, ConfigFormat, NodeConfig, SourceConfig, SourceParams, CLI_INGEST_SOURCE_ID, INGEST_API_SOURCE_ID, @@ -30,7 +29,7 @@ use quickwit_config::{ use quickwit_core::{IndexService, IndexServiceError}; use quickwit_doc_mapper::{analyze_text, TokenizerConfig}; use quickwit_metastore::{ - IndexMetadata, ListSplitsQuery, Metastore, MetastoreError, Split, SplitState, + IndexMetadata, ListSplitsQuery, Metastore, MetastoreError, Split, SplitInfo, SplitState, }; use quickwit_proto::IndexUid; use serde::de::DeserializeOwned; @@ -503,7 +502,7 @@ async fn delete_index( index_id: String, delete_index_query_param: DeleteIndexQueryParam, index_service: Arc, -) -> Result, IndexServiceError> { +) -> Result, IndexServiceError> { info!(index_id = %index_id, dry_run = delete_index_query_param.dry_run, "delete-index"); index_service .delete_index(&index_id, delete_index_query_param.dry_run) @@ -1162,7 +1161,7 @@ mod tests { let resp_json: serde_json::Value = serde_json::from_slice(resp.body()).unwrap(); let expected_response_json = serde_json::json!([{ "file_name": "split_1.split", - "file_size_in_bytes": 800, + "file_size_bytes": 800, }]); assert_json_include!(actual: resp_json, expected: 
expected_response_json);
     }
 
@@ -1176,7 +1175,7 @@ mod tests {
         let resp_json: serde_json::Value = serde_json::from_slice(resp.body()).unwrap();
         let expected_response_json = serde_json::json!([{
             "file_name": "split_1.split",
-            "file_size_in_bytes": 800,
+            "file_size_bytes": 800,
         }]);
         assert_json_include!(actual: resp_json, expected: expected_response_json);
     }
diff --git a/quickwit/quickwit-storage/src/bundle_storage.rs b/quickwit/quickwit-storage/src/bundle_storage.rs
index 9966a0efbda..cf02cf2b94b 100644
--- a/quickwit/quickwit-storage/src/bundle_storage.rs
+++ b/quickwit/quickwit-storage/src/bundle_storage.rs
@@ -36,8 +36,10 @@ use thiserror::Error;
 use tokio::io::AsyncWriteExt;
 use tracing::error;
 
-use crate::storage::{BulkDeleteError, SendableAsync};
-use crate::{OwnedBytes, Storage, StorageError, StorageResult, VersionedComponent};
+use crate::storage::SendableAsync;
+use crate::{
+    BulkDeleteError, OwnedBytes, Storage, StorageError, StorageResult, VersionedComponent,
+};
 
 /// BundleStorage bundles together multiple files into a single file,
 /// with some metadata.
diff --git a/quickwit/quickwit-storage/src/cache/storage_with_cache.rs b/quickwit/quickwit-storage/src/cache/storage_with_cache.rs
index 849964bf4fb..5ca2b1a427f 100644
--- a/quickwit/quickwit-storage/src/cache/storage_with_cache.rs
+++ b/quickwit/quickwit-storage/src/cache/storage_with_cache.rs
@@ -26,8 +26,8 @@ use async_trait::async_trait;
 use quickwit_common::uri::Uri;
 
 use crate::cache::Cache;
-use crate::storage::{BulkDeleteError, SendableAsync};
-use crate::{OwnedBytes, Storage, StorageResult};
+use crate::storage::SendableAsync;
+use crate::{BulkDeleteError, OwnedBytes, Storage, StorageResult};
 
 /// Use with care, StorageWithCache is read-only.
 pub struct StorageWithCache {
diff --git a/quickwit/quickwit-storage/src/debouncer.rs b/quickwit/quickwit-storage/src/debouncer.rs
index 4a3b5484e98..035662cbe33 100644
--- a/quickwit/quickwit-storage/src/debouncer.rs
+++ b/quickwit/quickwit-storage/src/debouncer.rs
@@ -30,8 +30,8 @@ use futures::{Future, FutureExt};
 use quickwit_common::uri::Uri;
 use tantivy::directory::OwnedBytes;
 
-use crate::storage::{BulkDeleteError, SendableAsync};
-use crate::{Storage, StorageResult};
+use crate::storage::SendableAsync;
+use crate::{BulkDeleteError, Storage, StorageResult};
 
 /// The AsyncDebouncer debounces inflight Futures, so that concurrent async requests to the same data
 /// source can be deduplicated.
diff --git a/quickwit/quickwit-storage/src/error.rs b/quickwit/quickwit-storage/src/error.rs
index dbe23a0183c..f5a0b6f7a95 100644
--- a/quickwit/quickwit-storage/src/error.rs
+++ b/quickwit/quickwit-storage/src/error.rs
@@ -17,6 +17,8 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program. If not, see <http://www.gnu.org/licenses/>.
 
+use std::collections::HashMap;
+use std::path::PathBuf;
 use std::sync::Arc;
 use std::{fmt, io};
 
@@ -144,3 +146,58 @@ impl From<io::Error> for StorageError {
         }
     }
 }
+
+/// Error returned by `bulk_delete`. Under the hood, `bulk_delete` groups the files to
+/// delete into multiple batches of fixed size and issues one delete objects request per batch. The
+/// whole operation can fail in multiple ways, which is reflected by the quirkiness of the API of
+/// [`BulkDeleteError`]. First, a batch can fail partially, i.e. some objects are deleted while
+/// others are not. The `successes` and `failures` attributes of the error will be populated
+/// accordingly. Second, a batch can fail completely, in which case the `error` field will be set.
+/// Because a batch failing entirely usually indicates a systemic error, for instance, a connection
+/// or credentials issue, `bulk_delete` does not attempt to delete the remaining batches and
+/// populates the `unattempted` attribute. Consequently, the attributes of this error are not
+/// "mutually exclusive": there exists a path where all those fields are not empty. The caller is
+/// expected to handle this error carefully and inspect the instance thoroughly before any retry
+/// attempt.
+#[must_use]
+#[derive(Debug, Default, thiserror::Error)]
+pub struct BulkDeleteError {
+    /// Error that occurred for a whole batch and caused the entire deletion operation to be
+    /// aborted.
+    pub error: Option<StorageError>,
+    /// List of files that were successfully deleted, including non-existing files.
+    pub successes: Vec<PathBuf>,
+    /// List of files that failed to be deleted along with the corresponding failure descriptions.
+    pub failures: HashMap<PathBuf, DeleteFailure>,
+    /// List of remaining files to delete before the operation was aborted.
+    pub unattempted: Vec<PathBuf>,
+}
+
+/// Describes the failure for an individual file in a batch delete operation.
+#[derive(Debug, Default)]
+pub struct DeleteFailure {
+    /// The error that occurred for this file.
+    pub error: Option<StorageError>,
+    /// The failure code is a string that uniquely identifies an error condition. It is meant to be
+    /// read and understood by programs that detect and handle errors by type.
+    pub code: Option<String>,
+    /// The error message contains a generic description of the error condition in English. It is
+    /// intended for a human audience. Simple programs display the message directly to the end user
+    /// if they encounter an error condition they don't know how or don't care to handle.
+    /// Sophisticated programs with more exhaustive error handling and proper internationalization
+    /// are more likely to ignore the error message.
+    pub message: Option<String>,
+}
+
+impl fmt::Display for BulkDeleteError {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(
+            f,
+            "Bulk delete error ({} success(es), {} failure(s), {} unattempted)",
+            self.successes.len(),
+            self.failures.len(),
+            self.unattempted.len()
+        )?;
+        Ok(())
+    }
+}
diff --git a/quickwit/quickwit-storage/src/lib.rs b/quickwit/quickwit-storage/src/lib.rs
index cc4fa32b1aa..90ba89d3fca 100644
--- a/quickwit/quickwit-storage/src/lib.rs
+++ b/quickwit/quickwit-storage/src/lib.rs
@@ -82,7 +82,10 @@ pub use self::test_suite::{
     storage_test_multi_part_upload, storage_test_single_part_upload, storage_test_suite,
     test_write_and_bulk_delete,
 };
-pub use crate::error::{StorageError, StorageErrorKind, StorageResolverError, StorageResult};
+pub use crate::error::{
+    BulkDeleteError, DeleteFailure, StorageError, StorageErrorKind, StorageResolverError,
+    StorageResult,
+};
 
 /// Loads an entire local or remote file into memory.
pub async fn load_file( diff --git a/quickwit/quickwit-storage/src/local_file_storage.rs b/quickwit/quickwit-storage/src/local_file_storage.rs index 025a6e1fb2a..4a88a42a0ca 100644 --- a/quickwit/quickwit-storage/src/local_file_storage.rs +++ b/quickwit/quickwit-storage/src/local_file_storage.rs @@ -34,10 +34,10 @@ use tokio::fs; use tokio::io::AsyncWriteExt; use tracing::warn; -use crate::storage::{BulkDeleteError, DeleteFailure, SendableAsync}; +use crate::storage::SendableAsync; use crate::{ - DebouncedStorage, OwnedBytes, Storage, StorageError, StorageErrorKind, StorageFactory, - StorageResolverError, StorageResult, + BulkDeleteError, DebouncedStorage, DeleteFailure, OwnedBytes, Storage, StorageError, + StorageErrorKind, StorageFactory, StorageResolverError, StorageResult, }; /// File system compatible storage implementation. diff --git a/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs b/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs index 0bee6afa2bc..8b529f242d6 100644 --- a/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs @@ -48,10 +48,10 @@ use tokio_util::compat::FuturesAsyncReadCompatExt; use tracing::{instrument, warn}; use crate::debouncer::DebouncedStorage; -use crate::storage::{BulkDeleteError, DeleteFailure, SendableAsync}; +use crate::storage::SendableAsync; use crate::{ - MultiPartPolicy, PutPayload, Storage, StorageError, StorageErrorKind, StorageFactory, - StorageResolverError, StorageResult, STORAGE_METRICS, + BulkDeleteError, DeleteFailure, MultiPartPolicy, PutPayload, Storage, StorageError, + StorageErrorKind, StorageFactory, StorageResolverError, StorageResult, STORAGE_METRICS, }; /// Azure object storage resolver. diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs index 726bae47874..e22ee01dfef 100644 --- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs @@ -45,10 +45,10 @@ use tokio::sync::Semaphore; use tracing::{info, instrument, warn}; use crate::object_storage::MultiPartPolicy; -use crate::storage::{BulkDeleteError, DeleteFailure, SendableAsync}; +use crate::storage::SendableAsync; use crate::{ - OwnedBytes, Storage, StorageError, StorageErrorKind, StorageResolverError, StorageResult, - STORAGE_METRICS, + BulkDeleteError, DeleteFailure, OwnedBytes, Storage, StorageError, StorageErrorKind, + StorageResolverError, StorageResult, STORAGE_METRICS, }; /// Semaphore to limit the number of concurent requests to the object store. Some object stores diff --git a/quickwit/quickwit-storage/src/prefix_storage.rs b/quickwit/quickwit-storage/src/prefix_storage.rs index 7955353bf38..37084361897 100644 --- a/quickwit/quickwit-storage/src/prefix_storage.rs +++ b/quickwit/quickwit-storage/src/prefix_storage.rs @@ -25,8 +25,8 @@ use std::sync::Arc; use async_trait::async_trait; use quickwit_common::uri::Uri; -use crate::storage::{BulkDeleteError, SendableAsync}; -use crate::{OwnedBytes, Storage}; +use crate::storage::SendableAsync; +use crate::{BulkDeleteError, OwnedBytes, Storage}; /// This storage acts as a proxy to another storage that simply modifies each API call /// by preceding each path with a given a prefix. 
@@ -180,7 +180,7 @@ mod tests { use std::collections::HashMap; use super::*; - use crate::storage::DeleteFailure; + use crate::DeleteFailure; #[test] fn test_strip_prefix_from_error() { diff --git a/quickwit/quickwit-storage/src/ram_storage.rs b/quickwit/quickwit-storage/src/ram_storage.rs index c2bd99ab8b7..d1d88794e13 100644 --- a/quickwit/quickwit-storage/src/ram_storage.rs +++ b/quickwit/quickwit-storage/src/ram_storage.rs @@ -30,9 +30,10 @@ use tokio::io::AsyncWriteExt; use tokio::sync::RwLock; use crate::prefix_storage::add_prefix_to_storage; -use crate::storage::{BulkDeleteError, SendableAsync}; +use crate::storage::SendableAsync; use crate::{ - OwnedBytes, Storage, StorageErrorKind, StorageFactory, StorageResolverError, StorageResult, + BulkDeleteError, OwnedBytes, Storage, StorageErrorKind, StorageFactory, StorageResolverError, + StorageResult, }; /// In Ram implementation of quickwit's storage. diff --git a/quickwit/quickwit-storage/src/storage.rs b/quickwit/quickwit-storage/src/storage.rs index 3f9057fb506..12ba5604a50 100644 --- a/quickwit/quickwit-storage/src/storage.rs +++ b/quickwit/quickwit-storage/src/storage.rs @@ -17,16 +17,15 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::collections::HashMap; use std::fmt; use std::ops::Range; -use std::path::{Path, PathBuf}; +use std::path::Path; use async_trait::async_trait; use quickwit_common::uri::Uri; use tokio::io::AsyncWrite; -use crate::{OwnedBytes, PutPayload, StorageError, StorageErrorKind, StorageResult}; +use crate::{BulkDeleteError, OwnedBytes, PutPayload, StorageErrorKind, StorageResult}; /// This trait is only used to make it build trait object with `AsyncWrite + Send + Unpin`. pub trait SendableAsync: AsyncWrite + Send + Unpin {} @@ -126,51 +125,3 @@ pub trait Storage: fmt::Debug + Send + Sync + 'static { /// Returns an URI identifying the storage fn uri(&self) -> &Uri; } - -/// Error returned by `bulk_delete`. Under the hood, `bulk_delete` groups the files to -/// delete into multiple batches of fixed size and issues one delete objects request per batch. The -/// whole operation can fail in multiples ways, which is reflected by the quirckiness of the API of -/// [`BulkDeleteError`]. First, a batch can fail partially, i.e. some objects are deleted while -/// others are not. The `successes` and `failures` attributes of the error will be populated -/// accordingly. Second, a batch can fail completely, in which case the `error` field will be set. -/// Because a batch failing entirely usually indicates a systemic error, for instance, a connection -/// or credentials issue, `bulk_delete` does not attempt to delete the remaining batches and -/// populates the `unattempted` attribute. Consequently, the attributes of this error are not -/// "mutually exclusive": there exists a path where all those fields are not empty. The caller is -/// expected to handle this error carefully and inspect the instance thoroughly before any retry -/// attempt. -#[must_use] -#[derive(Debug, Default, thiserror::Error)] -pub struct BulkDeleteError { - /// Error that occurred for a whole batch and caused the entire deletion operation to be - /// aborted. - pub error: Option, - /// List of files that were successfully deleted, including non-existing files. - pub successes: Vec, - /// List of files that failed to be deleted along with the corresponding failure descriptions. 
-    pub failures: HashMap<PathBuf, DeleteFailure>,
-    /// List of remaining files to delete before the operation was aborted.
-    pub unattempted: Vec<PathBuf>,
-}
-
-#[derive(Debug, Default)]
-pub struct DeleteFailure {
-    pub error: Option<StorageError>,
-    /// The failure code is a string that uniquely identifies an error condition. It is meant to be
-    /// read and understood by programs that detect and handle errors by type.
-    pub code: Option<String>,
-    pub message: Option<String>,
-}
-
-impl fmt::Display for BulkDeleteError {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        write!(
-            f,
-            "Bulk delete error ({} success(es), {} failure(s), {} unattempted)",
-            self.successes.len(),
-            self.failures.len(),
-            self.unattempted.len()
-        )?;
-        Ok(())
-    }
-}
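
---

The `BulkDeleteError` doc comment above prescribes how callers should consume the error before any retry. The following is a minimal sketch of that consumption pattern, not part of the patch itself: it assumes the `Storage` trait signature used throughout this diff, `async fn bulk_delete(&self, paths: &[&Path]) -> Result<(), BulkDeleteError>`, and `delete_with_one_retry` is a hypothetical helper name. A real caller would also bound the number of retries and inspect `error.error` to decide whether retrying makes sense at all.

```rust
use std::path::{Path, PathBuf};

use quickwit_storage::{BulkDeleteError, Storage};

/// Retries exactly once, and only the files known not to have been deleted:
/// the explicit `failures` plus the `unattempted` remainder. Files listed in
/// `successes` are dropped from the retry list, so no work is repeated.
async fn delete_with_one_retry(
    storage: &dyn Storage,
    paths: &[&Path],
) -> Result<(), BulkDeleteError> {
    match storage.bulk_delete(paths).await {
        Ok(()) => Ok(()),
        Err(error) => {
            // `failures` were attempted and rejected; `unattempted` were never
            // sent because an entire batch failed (e.g. a connection or
            // credentials issue). Per the doc comment, the two sets are
            // disjoint, so chaining them covers every file left to delete.
            let retry_paths: Vec<PathBuf> = error
                .failures
                .keys()
                .cloned()
                .chain(error.unattempted.iter().cloned())
                .collect();
            let retry_refs: Vec<&Path> = retry_paths.iter().map(PathBuf::as_path).collect();
            storage.bulk_delete(&retry_refs).await
        }
    }
}
```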