From be447f3b290d3f1798f77734780d1a8ac339b2c5 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 9 Jan 2024 16:12:36 +0800 Subject: [PATCH] Polish bulk delete Signed-off-by: Xuanwo --- quickwit/quickwit-storage/src/lib.rs | 11 ++-- .../src/opendal_storage/base.rs | 50 +++++++++++++++++-- .../opendal_storage/google_cloud_storage.rs | 2 +- 3 files changed, 50 insertions(+), 13 deletions(-) diff --git a/quickwit/quickwit-storage/src/lib.rs b/quickwit/quickwit-storage/src/lib.rs index 737f7a930c2..dbeee5134a1 100644 --- a/quickwit/quickwit-storage/src/lib.rs +++ b/quickwit/quickwit-storage/src/lib.rs @@ -327,14 +327,9 @@ pub(crate) mod test_suite { test_write_and_delete(storage) .await .context("write_and_delete")?; - // Fake GCS Server doesn't support bulk delete correctly. - // ref: - #[cfg(not(feature = "google"))] - { - crate::test_suite::test_write_and_bulk_delete(storage) - .await - .context("write_and_bulk_delete")?; - } + test_write_and_bulk_delete(storage) + .await + .context("write_and_bulk_delete")?; test_exists(storage).await.context("exists")?; test_write_and_delete_with_dir_separator(storage) .await diff --git a/quickwit/quickwit-storage/src/opendal_storage/base.rs b/quickwit/quickwit-storage/src/opendal_storage/base.rs index 41d34d7011d..ae3276ea5bb 100644 --- a/quickwit/quickwit-storage/src/opendal_storage/base.rs +++ b/quickwit/quickwit-storage/src/opendal_storage/base.rs @@ -19,7 +19,7 @@ use std::fmt; use std::ops::Range; -use std::path::Path; +use std::path::{Path, PathBuf}; use async_trait::async_trait; use bytesize::ByteSize; @@ -29,8 +29,8 @@ use tokio::io::{AsyncRead, AsyncWriteExt}; use crate::storage::SendableAsync; use crate::{ - BulkDeleteError, OwnedBytes, PutPayload, Storage, StorageError, StorageErrorKind, - StorageResolverError, StorageResult, + BulkDeleteError, DeleteFailure, OwnedBytes, PutPayload, Storage, StorageError, + StorageErrorKind, StorageResolverError, StorageResult, }; /// OpenDAL based storage implementation. 
@@ -134,10 +134,52 @@ impl Storage for OpendalStorage { } async fn bulk_delete<'a>(&self, paths: &[&'a Path]) -> Result<(), BulkDeleteError> { - let paths = paths + // The mock service we use in the integration testsuite doesn't support bulk delete. + // Let's fall back to deleting paths one by one in this case. + #[cfg(feature = "integration-testsuite")] + { + let storage_info = self.op.info(); + if storage_info.name() == "sample-bucket" + && storage_info.scheme() == opendal::Scheme::Gcs + { + let mut bulk_error = BulkDeleteError::default(); + for (index, path) in paths.iter().enumerate() { + let result = self.op.delete(&path.as_os_str().to_string_lossy()).await; + if let Err(err) = result { + let storage_error_kind = err.kind(); + let storage_error: StorageError = err.into(); + bulk_error.failures.insert( + PathBuf::from(&path), + DeleteFailure { + code: Some(storage_error_kind.to_string()), + message: Some(storage_error.to_string()), + error: Some(storage_error.clone()), + }, + ); + bulk_error.error = Some(storage_error); + for path in paths[index..].iter() { + bulk_error.unattempted.push(PathBuf::from(&path)) + } + break; + } else { + bulk_error.successes.push(PathBuf::from(&path)) + } + } + + return if bulk_error.error.is_some() { + Err(bulk_error) + } else { + Ok(()) + }; + } + } + + let paths: Vec = paths .iter() .map(|path| path.as_os_str().to_string_lossy().to_string()) .collect(); + + // OpenDAL will check the services' capability internally. 
self.op.remove(paths).await.map_err(|err| BulkDeleteError { error: Some(err.into()), ..BulkDeleteError::default() diff --git a/quickwit/quickwit-storage/src/opendal_storage/google_cloud_storage.rs b/quickwit/quickwit-storage/src/opendal_storage/google_cloud_storage.rs index 5377fa93ccf..2824f1b2a6a 100644 --- a/quickwit/quickwit-storage/src/opendal_storage/google_cloud_storage.rs +++ b/quickwit/quickwit-storage/src/opendal_storage/google_cloud_storage.rs @@ -59,7 +59,7 @@ impl StorageFactory for GoogleCloudStorageFactory { pub fn new_emulated_google_cloud_storage( uri: &Uri, ) -> Result { - let (bucket, root) = parse_google_uri(&uri).expect("must be valid google uri"); + let (bucket, root) = parse_google_uri(uri).expect("must be valid google uri"); let mut cfg = opendal::services::Gcs::default(); cfg.bucket(&bucket);