Skip to content

Commit

Permalink
Polish bulk delete
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo committed Jan 9, 2024
1 parent 1fd4fcc commit be447f3
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 13 deletions.
11 changes: 3 additions & 8 deletions quickwit/quickwit-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,14 +327,9 @@ pub(crate) mod test_suite {
test_write_and_delete(storage)
.await
.context("write_and_delete")?;
// Fake GCS Server doesn't support bulk delete correctly.
// ref: <https://github.com/fsouza/fake-gcs-server/issues/1443>
#[cfg(not(feature = "google"))]
{
crate::test_suite::test_write_and_bulk_delete(storage)
.await
.context("write_and_bulk_delete")?;
}
test_write_and_bulk_delete(storage)
.await
.context("write_and_bulk_delete")?;
test_exists(storage).await.context("exists")?;
test_write_and_delete_with_dir_separator(storage)
.await
Expand Down
50 changes: 46 additions & 4 deletions quickwit/quickwit-storage/src/opendal_storage/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

use std::fmt;
use std::ops::Range;
use std::path::Path;
use std::path::{Path, PathBuf};

use async_trait::async_trait;
use bytesize::ByteSize;
Expand All @@ -29,8 +29,8 @@ use tokio::io::{AsyncRead, AsyncWriteExt};

use crate::storage::SendableAsync;
use crate::{
BulkDeleteError, OwnedBytes, PutPayload, Storage, StorageError, StorageErrorKind,
StorageResolverError, StorageResult,
BulkDeleteError, DeleteFailure, OwnedBytes, PutPayload, Storage, StorageError,
StorageErrorKind, StorageResolverError, StorageResult,
};

/// OpenDAL based storage implementation.
Expand Down Expand Up @@ -134,10 +134,52 @@ impl Storage for OpendalStorage {
}

async fn bulk_delete<'a>(&self, paths: &[&'a Path]) -> Result<(), BulkDeleteError> {
let paths = paths
// The mock service used in the integration test suite doesn't support bulk delete.
// Fall back to deleting the paths one by one in this case.
#[cfg(feature = "integration-testsuite")]
{
let storage_info = self.op.info();
if storage_info.name() == "sample-bucket"
&& storage_info.scheme() == opendal::Scheme::Gcs
{
let mut bulk_error = BulkDeleteError::default();
for (index, path) in paths.iter().enumerate() {
let result = self.op.delete(&path.as_os_str().to_string_lossy()).await;
if let Err(err) = result {
let storage_error_kind = err.kind();
let storage_error: StorageError = err.into();
bulk_error.failures.insert(
PathBuf::from(&path),
DeleteFailure {
code: Some(storage_error_kind.to_string()),
message: Some(storage_error.to_string()),
error: Some(storage_error.clone()),
},
);
bulk_error.error = Some(storage_error);
for path in paths[index..].iter() {
bulk_error.unattempted.push(PathBuf::from(&path))
}
break;
} else {
bulk_error.successes.push(PathBuf::from(&path))
}
}

return if bulk_error.error.is_some() {
Err(bulk_error)
} else {
Ok(())
};
}
}

let paths: Vec<String> = paths
.iter()
.map(|path| path.as_os_str().to_string_lossy().to_string())
.collect();

// OpenDAL will check the services' capability internally.
self.op.remove(paths).await.map_err(|err| BulkDeleteError {
error: Some(err.into()),
..BulkDeleteError::default()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl StorageFactory for GoogleCloudStorageFactory {
pub fn new_emulated_google_cloud_storage(
uri: &Uri,
) -> Result<OpendalStorage, StorageResolverError> {
let (bucket, root) = parse_google_uri(&uri).expect("must be valid google uri");
let (bucket, root) = parse_google_uri(uri).expect("must be valid google uri");

let mut cfg = opendal::services::Gcs::default();
cfg.bucket(&bucket);
Expand Down

0 comments on commit be447f3

Please sign in to comment.