Skip to content

Commit

Permalink
Do not ignore storage errors silently when deleting splits
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload committed Jul 14, 2023
1 parent ad28cfb commit a806a9f
Show file tree
Hide file tree
Showing 24 changed files with 464 additions and 323 deletions.
4 changes: 2 additions & 2 deletions quickwit/quickwit-cli/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -891,8 +891,8 @@ pub async fn delete_index_cli(args: DeleteIndexArgs) -> anyhow::Result<()> {
"The following files will be removed from the index `{}`",
args.index_id
);
for file_entry in affected_files {
println!(" - {}", file_entry.file_name);
for split_info in affected_files {
println!(" - {}", split_info.file_name.display());
}
return Ok(());
}
Expand Down
20 changes: 10 additions & 10 deletions quickwit/quickwit-cli/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,47 +510,47 @@ pub async fn garbage_collect_index_cli(args: GarbageCollectIndexArgs) -> anyhow:
let removal_info = index_service
.garbage_collect_index(&args.index_id, args.grace_period, args.dry_run)
.await?;
if removal_info.removed_split_entries.is_empty() && removal_info.failed_split_ids.is_empty() {
if removal_info.removed_split_entries.is_empty() && removal_info.failed_splits.is_empty() {
println!("No dangling files to garbage collect.");
return Ok(());
}

if args.dry_run {
println!("The following files will be garbage collected.");
for file_entry in removal_info.removed_split_entries {
println!(" - {}", file_entry.file_name);
for split_info in removal_info.removed_split_entries {
println!(" - {}", split_info.file_name.display());
}
return Ok(());
}

if !removal_info.failed_split_ids.is_empty() {
if !removal_info.failed_splits.is_empty() {
println!("The following splits were attempted to be removed, but failed.");
for split_id in removal_info.failed_split_ids.iter() {
println!(" - {split_id}");
for split_info in &removal_info.failed_splits {
println!(" - {}", split_info.split_id);
}
println!(
"{} Splits were unable to be removed.",
removal_info.failed_split_ids.len()
removal_info.failed_splits.len()
);
}

let deleted_bytes: u64 = removal_info
.removed_split_entries
.iter()
.map(|entry| entry.file_size_in_bytes)
.map(|split_info| split_info.file_size_bytes.get_bytes())
.sum();
println!(
"{}MB of storage garbage collected.",
deleted_bytes / 1_000_000
);

if removal_info.failed_split_ids.is_empty() {
if removal_info.failed_splits.is_empty() {
println!(
"{} Index successfully garbage collected.",
"✔".color(GREEN_COLOR)
);
} else if removal_info.removed_split_entries.is_empty()
&& !removal_info.failed_split_ids.is_empty()
&& !removal_info.failed_splits.is_empty()
{
println!("{} Failed to garbage collect index.", "✘".color(RED_COLOR));
} else {
Expand Down
29 changes: 0 additions & 29 deletions quickwit/quickwit-common/src/file_entry.rs

This file was deleted.

2 changes: 0 additions & 2 deletions quickwit/quickwit-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
mod coolid;

pub mod binary_heap;
mod file_entry;
pub mod fs;
pub mod io;
mod kill_switch;
Expand All @@ -49,7 +48,6 @@ use std::ops::{Range, RangeInclusive};
use std::str::FromStr;

pub use coolid::new_coolid;
pub use file_entry::FileEntry;
pub use kill_switch::KillSwitch;
pub use progress::{Progress, ProtectedZoneGuard};
pub use stream_utils::{BoxStream, ServiceStream};
Expand Down
31 changes: 14 additions & 17 deletions quickwit/quickwit-core/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ use std::sync::Arc;
use std::time::Duration;

use quickwit_common::fs::{empty_dir, get_cache_directory_path};
use quickwit_common::FileEntry;
use quickwit_config::{validate_identifier, IndexConfig, SourceConfig};
use quickwit_indexing::check_source_connectivity;
use quickwit_janitor::{
delete_splits_with_files, run_garbage_collect, SplitDeletionError, SplitRemovalInfo,
delete_splits_from_storage_and_metastore, run_garbage_collect, DeleteSplitsError,
SplitRemovalInfo,
};
use quickwit_metastore::{
IndexMetadata, ListSplitsQuery, Metastore, MetastoreError, SplitMetadata, SplitState,
IndexMetadata, ListSplitsQuery, Metastore, MetastoreError, SplitInfo, SplitMetadata, SplitState,
};
use quickwit_proto::{IndexUid, ServiceError, ServiceErrorCode};
use quickwit_storage::{StorageResolver, StorageResolverError};
Expand All @@ -43,7 +43,7 @@ pub enum IndexServiceError {
#[error("Metastore error `{0}`.")]
MetastoreError(#[from] MetastoreError),
#[error("Split deletion error `{0}`.")]
SplitDeletionError(#[from] SplitDeletionError),
SplitDeletionError(#[from] DeleteSplitsError),
#[error("Invalid config: {0:#}.")]
InvalidConfig(anyhow::Error),
#[error("Invalid identifier: {0}.")]
Expand Down Expand Up @@ -135,33 +135,29 @@ impl IndexService {
&self,
index_id: &str,
dry_run: bool,
) -> Result<Vec<FileEntry>, IndexServiceError> {
) -> Result<Vec<SplitInfo>, IndexServiceError> {
let index_metadata = self.metastore.index_metadata(index_id).await?;
let index_uid = index_metadata.index_uid.clone();
let index_uri = index_metadata.into_index_config().index_uri.clone();
let storage = self.storage_resolver.resolve(&index_uri).await?;

if dry_run {
let all_splits = self
let splits_to_delete = self
.metastore
.list_all_splits(index_uid.clone())
.await?
.into_iter()
.map(|metadata| metadata.split_metadata)
.map(|split| split.split_metadata.as_split_info())
.collect::<Vec<_>>();

let file_entries_to_delete: Vec<FileEntry> =
all_splits.iter().map(FileEntry::from).collect();
return Ok(file_entries_to_delete);
return Ok(splits_to_delete);
}

// Schedule staged and published splits for deletion.
let query = ListSplitsQuery::for_index(index_uid.clone())
.with_split_states([SplitState::Staged, SplitState::Published]);
let splits = self.metastore.list_splits(query).await?;
let split_ids = splits
.iter()
.map(|meta| meta.split_id())
.map(|split| split.split_id())
.collect::<Vec<_>>();
self.metastore
.mark_splits_for_deletion(index_uid.clone(), &split_ids)
Expand All @@ -175,10 +171,10 @@ impl IndexService {
.list_splits(query)
.await?
.into_iter()
.map(|metadata| metadata.split_metadata)
.map(|split| split.split_metadata)
.collect::<Vec<_>>();

let deleted_entries = delete_splits_with_files(
let deleted_splits = delete_splits_from_storage_and_metastore(
index_uid.clone(),
storage,
self.metastore.clone(),
Expand All @@ -187,7 +183,8 @@ impl IndexService {
)
.await?;
self.metastore.delete_index(index_uid).await?;
Ok(deleted_entries)

Ok(deleted_splits)
}

/// Detect all dangling splits and associated files from the index and removes them.
Expand Down Expand Up @@ -251,7 +248,7 @@ impl IndexService {
.map(|split| split.split_metadata)
.collect();
// FIXME: return an error.
if let Err(err) = delete_splits_with_files(
if let Err(err) = delete_splits_from_storage_and_metastore(
index_uid.clone(),
storage,
self.metastore.clone(),
Expand Down
55 changes: 0 additions & 55 deletions quickwit/quickwit-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,61 +17,6 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

#![deny(clippy::disallowed_methods)]

mod index;

pub use index::{clear_cache_directory, validate_storage_uri, IndexService, IndexServiceError};

#[cfg(test)]
mod tests {
use std::path::Path;

use quickwit_common::FileEntry;
use quickwit_indexing::TestSandbox;
use quickwit_storage::StorageResolver;

use crate::IndexService;

#[tokio::test]
async fn test_file_entry_from_split_and_index_delete() -> anyhow::Result<()> {
let index_id = "test-index";
let doc_mapping_yaml = r#"
field_mappings:
- name: title
type: text
- name: body
type: text
- name: url
type: text
"#;
let test_sandbox =
TestSandbox::create(index_id, doc_mapping_yaml, "{}", &["title", "body"]).await?;
test_sandbox.add_documents(vec![
serde_json::json!({"title": "snoopy", "body": "Snoopy is an anthropomorphic beagle[5] in the comic strip...", "url": "http://snoopy"}),
]).await?;
let splits = test_sandbox
.metastore()
.list_all_splits(test_sandbox.index_uid())
.await?
.into_iter()
.map(|metadata| metadata.split_metadata)
.collect::<Vec<_>>();
let file_entries: Vec<FileEntry> = splits.iter().map(FileEntry::from).collect();
assert_eq!(file_entries.len(), 1);
for file_entry in file_entries {
let split_num_bytes = test_sandbox
.storage()
.file_num_bytes(Path::new(file_entry.file_name.as_str()))
.await?;
assert_eq!(split_num_bytes, file_entry.file_size_in_bytes);
}
// Now delete the index.
let index_service =
IndexService::new(test_sandbox.metastore(), StorageResolver::unconfigured());
let deleted_file_entries = index_service.delete_index(index_id, false).await?;
assert_eq!(deleted_file_entries.len(), 1);
test_sandbox.assert_quit().await;
Ok(())
}
}
9 changes: 5 additions & 4 deletions quickwit/quickwit-janitor/src/actors/garbage_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::HashSet;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;

Expand Down Expand Up @@ -134,7 +135,7 @@ impl GarbageCollector {
let deleted_file_entries = match gc_res {
Ok(removal_info) => {
self.counters.num_successful_gc_run_on_index += 1;
self.counters.num_failed_splits += removal_info.failed_split_ids.len();
self.counters.num_failed_splits += removal_info.failed_splits.len();
removal_info.removed_split_entries
}
Err(error) => {
Expand All @@ -145,9 +146,9 @@ impl GarbageCollector {
};
if !deleted_file_entries.is_empty() {
let num_deleted_splits = deleted_file_entries.len();
let deleted_files: HashSet<&str> = deleted_file_entries
let deleted_files: HashSet<&Path> = deleted_file_entries
.iter()
.map(|deleted_entry| deleted_entry.file_name.as_str())
.map(|deleted_entry| deleted_entry.file_name.as_path())
.take(5)
.collect();
info!(
Expand All @@ -160,7 +161,7 @@ impl GarbageCollector {
self.counters.num_deleted_files += deleted_file_entries.len();
self.counters.num_deleted_bytes += deleted_file_entries
.iter()
.map(|entry| entry.file_size_in_bytes as usize)
.map(|entry| entry.file_size_bytes.get_bytes() as usize)
.sum::<usize>();
}
}
Expand Down
Loading

0 comments on commit a806a9f

Please sign in to comment.