Skip to content

Commit

Permalink
Do not ignore storage errors silently when deleting splits (#3636)
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload authored Jul 18, 2023
1 parent 436616c commit e45bf58
Show file tree
Hide file tree
Showing 26 changed files with 587 additions and 328 deletions.
21 changes: 12 additions & 9 deletions docs/reference/rest-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ GET [..]/search?query=barack%20obama

Successful requests return a 2xx HTTP status code.

Failed requests return a 4xx HTTP status code. The response body of failed requests holds a JSON object containing an `message` field that describes the error.
Failed requests return a 4xx HTTP status code. The response body of failed requests holds a JSON object containing a `message` field that describes the error.

```json
{
Expand All @@ -37,7 +37,7 @@ Failed requests return a 4xx HTTP status code. The response body of failed reque
### Search in an index

Search for documents matching a query in the given index `api/v1/<index id>/search`. This endpoint is available as long as you have at least one node running a searcher service in the cluster.
The search endpoint accepts `GET` and `POST` requests. The [parameters](#get-parameters) are URL parameters in case of `GET` or JSON key value pairs in case of `POST`.
The search endpoint accepts `GET` and `POST` requests. The [parameters](#get-parameters) are URL parameters for `GET` requests or JSON key-value pairs for `POST` requests.

```
GET api/v1/<index id>/search?query=searchterm
Expand Down Expand Up @@ -91,10 +91,10 @@ The response is a JSON object, and the content type is `application/json; charse
GET api/v1/<index id>/search/stream?query=searchterm&fast_field=my_id
```

Streams field values from ALL documents matching a search query in the given index `<index id>`, in a specified output format among the following:
Streams field values from ALL documents matching a search query in the target index `<index id>`, in a specified output format among the following:

- [CSV](https://datatracker.ietf.org/doc/html/rfc4180)
- [ClickHouse RowBinary](https://clickhouse.tech/docs/en/interfaces/formats/#rowbinary). If `partition_by_field` is set, Quickwit returns chunks of data for a each partition field value. Each chunk starts with 16 bytes being partition value and content length and then the `fast_field` values in `RowBinary` format.
- [ClickHouse RowBinary](https://clickhouse.tech/docs/en/interfaces/formats/#rowbinary). If `partition_by_field` is set, Quickwit returns chunks of data for each partition field value. Each chunk starts with 16 bytes encoding the partition value and the content length, followed by the `fast_field` values in `RowBinary` format.

`fast_field` and `partition_by_field` must be fast fields of type `i64` or `u64`.

Expand All @@ -104,7 +104,7 @@ This endpoint is available as long as you have at least one node running a searc

:::note

The endpoint will return 10 million values if 10 million documents match the query. This is expected, this endpoint is made to support queries matching millions of document and return field values in a reasonable response time.
The endpoint will return 10 million values if 10 million documents match the query. This is expected; this endpoint is designed to support queries matching millions of documents and to return field values in a reasonable response time.

:::

Expand Down Expand Up @@ -345,24 +345,27 @@ Delete index of ID `index id`.

#### Response

The response is the list of delete split files, and the content type is `application/json; charset=UTF-8.`
The response is the list of deleted split files; the content type is `application/json; charset=UTF-8`.

```json
[
{
"split_id": "01GK1XNAECH7P14850S9VV6P94",
"num_docs": 1337,
"uncompressed_docs_size_bytes": 23933408,
"file_name": "01GK1XNAECH7P14850S9VV6P94.split",
"file_size_in_bytes": 2991676
"file_size_bytes": 2991676
}
]
```

### Get all indexes metadatas
### Get all indexes metadata

```
GET api/v1/indexes
```

Get the indexes metadatas of all indexes present in the metastore.
Retrieve the metadata of all indexes present in the metastore.

#### Response

Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-cli/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -891,8 +891,8 @@ pub async fn delete_index_cli(args: DeleteIndexArgs) -> anyhow::Result<()> {
"The following files will be removed from the index `{}`",
args.index_id
);
for file_entry in affected_files {
println!(" - {}", file_entry.file_name);
for split_info in affected_files {
println!(" - {}", split_info.file_name.display());
}
return Ok(());
}
Expand Down
20 changes: 10 additions & 10 deletions quickwit/quickwit-cli/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,47 +509,47 @@ pub async fn garbage_collect_index_cli(args: GarbageCollectIndexArgs) -> anyhow:
let removal_info = index_service
.garbage_collect_index(&args.index_id, args.grace_period, args.dry_run)
.await?;
if removal_info.removed_split_entries.is_empty() && removal_info.failed_split_ids.is_empty() {
if removal_info.removed_split_entries.is_empty() && removal_info.failed_splits.is_empty() {
println!("No dangling files to garbage collect.");
return Ok(());
}

if args.dry_run {
println!("The following files will be garbage collected.");
for file_entry in removal_info.removed_split_entries {
println!(" - {}", file_entry.file_name);
for split_info in removal_info.removed_split_entries {
println!(" - {}", split_info.file_name.display());
}
return Ok(());
}

if !removal_info.failed_split_ids.is_empty() {
if !removal_info.failed_splits.is_empty() {
println!("The following splits were attempted to be removed, but failed.");
for split_id in removal_info.failed_split_ids.iter() {
println!(" - {split_id}");
for split_info in &removal_info.failed_splits {
println!(" - {}", split_info.split_id);
}
println!(
"{} Splits were unable to be removed.",
removal_info.failed_split_ids.len()
removal_info.failed_splits.len()
);
}

let deleted_bytes: u64 = removal_info
.removed_split_entries
.iter()
.map(|entry| entry.file_size_in_bytes)
.map(|split_info| split_info.file_size_bytes.get_bytes())
.sum();
println!(
"{}MB of storage garbage collected.",
deleted_bytes / 1_000_000
);

if removal_info.failed_split_ids.is_empty() {
if removal_info.failed_splits.is_empty() {
println!(
"{} Index successfully garbage collected.",
"✔".color(GREEN_COLOR)
);
} else if removal_info.removed_split_entries.is_empty()
&& !removal_info.failed_split_ids.is_empty()
&& !removal_info.failed_splits.is_empty()
{
println!("{} Failed to garbage collect index.", "✘".color(RED_COLOR));
} else {
Expand Down
29 changes: 0 additions & 29 deletions quickwit/quickwit-common/src/file_entry.rs

This file was deleted.

2 changes: 0 additions & 2 deletions quickwit/quickwit-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
mod coolid;

pub mod binary_heap;
mod file_entry;
pub mod fs;
pub mod io;
mod kill_switch;
Expand All @@ -49,7 +48,6 @@ use std::ops::{Range, RangeInclusive};
use std::str::FromStr;

pub use coolid::new_coolid;
pub use file_entry::FileEntry;
pub use kill_switch::KillSwitch;
pub use progress::{Progress, ProtectedZoneGuard};
pub use stream_utils::{BoxStream, ServiceStream};
Expand Down
3 changes: 3 additions & 0 deletions quickwit/quickwit-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,6 @@ quickwit-storage = { workspace = true }
[dev-dependencies]
mockall = { workspace = true }
serde_yaml = { workspace = true }

quickwit-common = { workspace = true, features = ["testsuite"] }
quickwit-metastore = { workspace = true, features = ["testsuite"] }
127 changes: 110 additions & 17 deletions quickwit/quickwit-core/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ use std::sync::Arc;
use std::time::Duration;

use quickwit_common::fs::{empty_dir, get_cache_directory_path};
use quickwit_common::FileEntry;
use quickwit_config::{validate_identifier, IndexConfig, SourceConfig};
use quickwit_indexing::check_source_connectivity;
use quickwit_janitor::{
delete_splits_with_files, run_garbage_collect, SplitDeletionError, SplitRemovalInfo,
delete_splits_from_storage_and_metastore, run_garbage_collect, DeleteSplitsError,
SplitRemovalInfo,
};
use quickwit_metastore::{
IndexMetadata, ListSplitsQuery, Metastore, MetastoreError, SplitMetadata, SplitState,
IndexMetadata, ListSplitsQuery, Metastore, MetastoreError, SplitInfo, SplitMetadata, SplitState,
};
use quickwit_proto::{IndexUid, ServiceError, ServiceErrorCode};
use quickwit_storage::{StorageResolver, StorageResolverError};
Expand All @@ -43,7 +43,7 @@ pub enum IndexServiceError {
#[error("Metastore error `{0}`.")]
MetastoreError(#[from] MetastoreError),
#[error("Split deletion error `{0}`.")]
SplitDeletionError(#[from] SplitDeletionError),
SplitDeletionError(#[from] DeleteSplitsError),
#[error("Invalid config: {0:#}.")]
InvalidConfig(anyhow::Error),
#[error("Invalid identifier: {0}.")]
Expand Down Expand Up @@ -135,33 +135,29 @@ impl IndexService {
&self,
index_id: &str,
dry_run: bool,
) -> Result<Vec<FileEntry>, IndexServiceError> {
) -> Result<Vec<SplitInfo>, IndexServiceError> {
let index_metadata = self.metastore.index_metadata(index_id).await?;
let index_uid = index_metadata.index_uid.clone();
let index_uri = index_metadata.into_index_config().index_uri.clone();
let storage = self.storage_resolver.resolve(&index_uri).await?;

if dry_run {
let all_splits = self
let splits_to_delete = self
.metastore
.list_all_splits(index_uid.clone())
.await?
.into_iter()
.map(|metadata| metadata.split_metadata)
.map(|split| split.split_metadata.as_split_info())
.collect::<Vec<_>>();

let file_entries_to_delete: Vec<FileEntry> =
all_splits.iter().map(FileEntry::from).collect();
return Ok(file_entries_to_delete);
return Ok(splits_to_delete);
}

// Schedule staged and published splits for deletion.
let query = ListSplitsQuery::for_index(index_uid.clone())
.with_split_states([SplitState::Staged, SplitState::Published]);
let splits = self.metastore.list_splits(query).await?;
let split_ids = splits
.iter()
.map(|meta| meta.split_id())
.map(|split| split.split_id())
.collect::<Vec<_>>();
self.metastore
.mark_splits_for_deletion(index_uid.clone(), &split_ids)
Expand All @@ -175,10 +171,10 @@ impl IndexService {
.list_splits(query)
.await?
.into_iter()
.map(|metadata| metadata.split_metadata)
.map(|split| split.split_metadata)
.collect::<Vec<_>>();

let deleted_entries = delete_splits_with_files(
let deleted_splits = delete_splits_from_storage_and_metastore(
index_uid.clone(),
storage,
self.metastore.clone(),
Expand All @@ -187,7 +183,8 @@ impl IndexService {
)
.await?;
self.metastore.delete_index(index_uid).await?;
Ok(deleted_entries)

Ok(deleted_splits)
}

/// Detect all dangling splits and associated files from the index and removes them.
Expand Down Expand Up @@ -251,7 +248,7 @@ impl IndexService {
.map(|split| split.split_metadata)
.collect();
// FIXME: return an error.
if let Err(err) = delete_splits_with_files(
if let Err(err) = delete_splits_from_storage_and_metastore(
index_uid.clone(),
storage,
self.metastore.clone(),
Expand Down Expand Up @@ -351,3 +348,99 @@ pub async fn validate_storage_uri(
storage_resolver.resolve(&index_config.index_uri).await?;
Ok(())
}

#[cfg(test)]
mod tests {
use quickwit_common::uri::Uri;
use quickwit_metastore::metastore_for_test;
use quickwit_storage::PutPayload;

use super::*;

#[tokio::test]
async fn test_create_index() {
    // Exercises index creation against an in-memory metastore and RAM
    // storage: create, duplicate-create failure, then overwrite.
    let metastore = metastore_for_test();
    let storage_resolver = StorageResolver::ram_for_test();
    let index_service = IndexService::new(metastore.clone(), storage_resolver);
    let index_id = "test-index";
    let index_uri = "ram://indexes/test-index";
    let index_config = IndexConfig::for_test(index_id, index_uri);

    // First creation succeeds and the index becomes visible in the metastore.
    let first_metadata = index_service
        .create_index(index_config.clone(), false)
        .await
        .unwrap();
    assert_eq!(first_metadata.index_id(), index_id);
    assert_eq!(first_metadata.index_uri(), &index_uri);
    assert!(metastore.index_exists(index_id).await.unwrap());

    // Creating the same index again without overwrite fails with
    // `IndexAlreadyExists`, surfaced as a `MetastoreError`.
    let error = index_service
        .create_index(index_config.clone(), false)
        .await
        .unwrap_err();
    let IndexServiceError::MetastoreError(inner_error) = error else {
        panic!("Expected `MetastoreError` variant, got {:?}", error)
    };
    assert!(
        matches!(inner_error, MetastoreError::IndexAlreadyExists { index_id } if index_id == first_metadata.index_id())
    );

    // With the overwrite flag set, creation succeeds again and the index
    // is recreated under a fresh UID.
    let second_metadata = index_service
        .create_index(index_config, true)
        .await
        .unwrap();
    assert_eq!(second_metadata.index_id(), index_id);
    assert_eq!(second_metadata.index_uri(), &index_uri);
    assert!(first_metadata.index_uid != second_metadata.index_uid);
}

#[tokio::test]
async fn test_delete_index() {
    // End-to-end check that deleting an index removes both the metastore
    // records and the split files on storage.
    let metastore = metastore_for_test();
    let storage_resolver = StorageResolver::ram_for_test();
    // Resolve the same RAM storage the index will use so we can inspect it
    // directly after the deletion.
    let storage = storage_resolver
        .resolve(&Uri::for_test("ram://indexes/test-index"))
        .await
        .unwrap();
    let index_service = IndexService::new(metastore.clone(), storage_resolver);
    let index_id = "test-index";
    let index_uri = "ram://indexes/test-index";
    let index_config = IndexConfig::for_test(index_id, index_uri);
    let index_uid = index_service
        .create_index(index_config.clone(), false)
        .await
        .unwrap()
        .index_uid;

    // Stage a single split so the index has something to delete.
    let split_id = "test-split";
    let split_metadata = SplitMetadata {
        split_id: split_id.to_string(),
        index_uid: index_uid.clone(),
        ..Default::default()
    };
    metastore
        .stage_splits(index_uid.clone(), vec![split_metadata.clone()])
        .await
        .unwrap();

    // Sanity check: the staged split is visible in the metastore.
    let splits = metastore.list_all_splits(index_uid.clone()).await.unwrap();
    assert_eq!(splits.len(), 1);

    // Materialize the split file on storage so the deletion has a real
    // object to remove (a 1-byte placeholder payload is enough).
    let split_path_str = format!("{}.split", split_id);
    let split_path = Path::new(&split_path_str);
    let payload: Box<dyn PutPayload> = Box::new(vec![0]);
    storage.put(split_path, payload).await.unwrap();
    assert!(storage.exists(split_path).await.unwrap());

    // Delete the index for real (dry_run = false); exactly the one staged
    // split should be reported as deleted.
    let split_infos = index_service.delete_index(index_id, false).await.unwrap();
    assert_eq!(split_infos.len(), 1);

    // Listing splits now fails because the index itself is gone from the
    // metastore.
    let error = metastore
        .list_all_splits(index_uid.clone())
        .await
        .unwrap_err();
    assert!(
        matches!(error, MetastoreError::IndexDoesNotExist { index_id } if index_id == index_uid.index_id())
    );
    // The split file must have been removed from storage as well.
    assert!(!storage.exists(split_path).await.unwrap());
}
}
Loading

0 comments on commit e45bf58

Please sign in to comment.