diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs
index be98503b643..c030fffa4af 100644
--- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs
+++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs
@@ -24,13 +24,15 @@ use std::pin::Pin;
 use std::task::{Context, Poll};
 use std::{env, fmt, io};
 
-use anyhow::anyhow;
+use anyhow::{anyhow, Context as AnyhhowContext};
 use async_trait::async_trait;
 use aws_credential_types::provider::SharedCredentialsProvider;
 use aws_sdk_s3::config::{BehaviorVersion, Credentials, Region};
 use aws_sdk_s3::error::{ProvideErrorMetadata, SdkError};
+use aws_sdk_s3::operation::delete_objects::DeleteObjectsOutput;
 use aws_sdk_s3::operation::get_object::{GetObjectError, GetObjectOutput};
 use aws_sdk_s3::primitives::ByteStream;
+use aws_sdk_s3::types::builders::ObjectIdentifierBuilder;
 use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart, Delete, ObjectIdentifier};
 use aws_sdk_s3::Client as S3Client;
 use base64::prelude::{Engine, BASE64_STANDARD};
@@ -378,6 +380,40 @@ impl S3CompatibleObjectStorage {
         Ok(parts)
     }
 
+    fn build_delete_batch_requests<'a>(
+        &self,
+        delete_paths: &'a [&'a Path],
+    ) -> anyhow::Result<Vec<(&'a [&'a Path], Delete)>> {
+        #[cfg(test)]
+        const MAX_NUM_KEYS: usize = 3;
+
+        #[cfg(not(test))]
+        const MAX_NUM_KEYS: usize = 1_000;
+
+        let path_chunks = delete_paths.chunks(MAX_NUM_KEYS);
+        let num_delete_requests = path_chunks.len();
+        let mut delete_requests: Vec<(&[&Path], Delete)> = Vec::with_capacity(num_delete_requests);
+
+        for path_chunk in path_chunks {
+            let object_ids: Vec<ObjectIdentifier> = path_chunk
+                .iter()
+                .map(|path| {
+                    let key = self.key(path);
+                    ObjectIdentifierBuilder::default()
+                        .key(key)
+                        .build()
+                        .context("failed to build object identifier")
+                })
+                .collect::<anyhow::Result<_>>()?;
+            let delete = Delete::builder()
+                .set_objects(Some(object_ids))
+                .build()
+                .context("failed to build delete request")?;
+            delete_requests.push((path_chunk, delete));
+        }
+        Ok(delete_requests)
+    }
+
     async fn upload_part<'a>(
         &'a self,
         upload_id: MultipartUploadId,
@@ -583,64 +619,38 @@ impl S3CompatibleObjectStorage {
     /// API:
     async fn bulk_delete_multi<'a>(&self, paths: &[&'a Path]) -> Result<(), BulkDeleteError> {
         let _permit = REQUEST_SEMAPHORE.acquire().await;
+
+        let delete_requests: Vec<(&[&Path], Delete)> = self
+            .build_delete_batch_requests(paths)
+            .map_err(|error: anyhow::Error| {
+                let unattempted = paths.iter().copied().map(Path::to_path_buf).collect();
+                BulkDeleteError {
+                    error: Some(StorageErrorKind::Internal.with_error(error)),
+                    successes: Default::default(),
+                    failures: Default::default(),
+                    unattempted,
+                }
+            })?;
+
         let mut error = None;
         let mut successes = Vec::with_capacity(paths.len());
         let mut failures = HashMap::new();
         let mut unattempted = Vec::new();
 
-        #[cfg(test)]
-        const MAX_NUM_KEYS: usize = 3;
-
-        #[cfg(not(test))]
-        const MAX_NUM_KEYS: usize = 1_000;
-
-        let Ok(path_and_oid) = paths
-            .iter()
-            .map(|path| {
-                ObjectIdentifier::builder()
-                    .key(self.key(path))
-                    .build()
-                    .map(|oid| (path, oid))
-            })
-            .collect::<Result<Vec<_>, _>>()
-        else {
-            error = Some(
-                StorageErrorKind::Internal.with_error(anyhow!("failed to build object identifier")),
-            );
-            unattempted.extend(paths.iter().map(|path| path.to_path_buf()));
-            return Err(BulkDeleteError {
-                error,
-                successes,
-                failures,
-                unattempted,
-            });
-        };
-
-        for chunk in path_and_oid.chunks(MAX_NUM_KEYS) {
-            if error.is_some() {
-                unattempted.extend(chunk.iter().map(|(path, _oid)| path.to_path_buf()));
-                continue;
-            }
-            let Ok(delete) = Delete::builder()
-                .set_objects(Some(chunk.iter().map(|(_path, oid)| oid.clone()).collect()))
-                .build()
-            else {
-                error = Some(
-                    StorageErrorKind::Internal
-                        .with_error(anyhow!("failed to build delete request")),
-                );
-                unattempted.extend(chunk.iter().map(|(path, _oid)| path.to_path_buf()));
-                continue;
-            };
-            let delete_objects_res = aws_retry(&self.retry_params, || async {
-                self.s3_client
-                    .delete_objects()
-                    .bucket(self.bucket.clone())
-                    .delete(delete.clone())
-                    .send()
-                    .await
-            })
-            .await;
+        let mut delete_requests_it = delete_requests.iter();
+
+        for (path_chunk, delete) in &mut delete_requests_it {
+            let delete_objects_res: StorageResult<DeleteObjectsOutput> =
+                aws_retry(&self.retry_params, || async {
+                    self.s3_client
+                        .delete_objects()
+                        .bucket(self.bucket.clone())
+                        .delete(delete.clone())
+                        .send()
+                        .await
+                })
+                .await
+                .map_err(Into::into);
 
             match delete_objects_res {
                 Ok(delete_objects_output) => {
@@ -674,21 +684,28 @@ impl S3CompatibleObjectStorage {
                     }
                 }
                 Err(delete_objects_error) => {
-                    error = Some(delete_objects_error.into());
-                    unattempted.extend(chunk.iter().map(|(path, _oid)| path.to_path_buf()));
+                    error = Some(delete_objects_error);
+                    unattempted.extend(path_chunk.iter().copied().map(PathBuf::from));
+                    break;
                 }
             }
         }
+
         if error.is_none() && failures.is_empty() {
-            Ok(())
-        } else {
-            Err(BulkDeleteError {
-                error,
-                successes,
-                failures,
-                unattempted,
-            })
+            return Ok(());
+        }
+
+        // Do we have remaining requests?
+        for (path_chunk, _) in delete_requests_it {
+            unattempted.extend(path_chunk.iter().copied().map(PathBuf::from));
+        }
+
+        Err(BulkDeleteError {
+            error,
+            successes,
+            failures,
+            unattempted,
+        })
     }
 }
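Reviewer note, not part of the patch: a minimal, std-only sketch of the scheme the rewritten bulk_delete_multi follows, assuming the S3 DeleteObjects limit of 1,000 keys per request. Paths are split into chunks, each chunk becomes one request, and once a request fails, that chunk plus every chunk that was never sent is reported as unattempted. delete_in_batches and send_batch are illustrative names, not items from the crate.

// Sketch only: mirrors the chunking and early-exit bookkeeping above, without the AWS SDK.
use std::path::{Path, PathBuf};

const MAX_NUM_KEYS: usize = 1_000;

fn delete_in_batches(
    paths: &[&Path],
    mut send_batch: impl FnMut(&[&Path]) -> Result<(), String>,
) -> Result<(), (String, Vec<PathBuf>)> {
    let mut chunks = paths.chunks(MAX_NUM_KEYS);
    let mut unattempted: Vec<PathBuf> = Vec::new();
    let mut error = None;

    for chunk in &mut chunks {
        if let Err(request_error) = send_batch(chunk) {
            // First failed request: record the error, count this chunk as unattempted, stop.
            error = Some(request_error);
            unattempted.extend(chunk.iter().copied().map(PathBuf::from));
            break;
        }
    }
    // Chunks left in the iterator were never sent at all.
    for chunk in chunks {
        unattempted.extend(chunk.iter().copied().map(PathBuf::from));
    }
    match error {
        Some(request_error) => Err((request_error, unattempted)),
        None => Ok(()),
    }
}

fn main() {
    let owned: Vec<PathBuf> = (0..2_500).map(|i| PathBuf::from(format!("split-{i}"))).collect();
    let paths: Vec<&Path> = owned.iter().map(PathBuf::as_path).collect();

    let mut requests_sent = 0;
    let result = delete_in_batches(&paths, |_chunk| {
        requests_sent += 1;
        if requests_sent == 2 { Err("request failed".to_string()) } else { Ok(()) }
    });

    // Batch 1 (1,000 keys) succeeds, batch 2 (1,000 keys) fails, batch 3 (500 keys) is never
    // sent, so 1,500 paths end up reported as unattempted.
    let (_request_error, unattempted) = result.unwrap_err();
    assert_eq!(unattempted.len(), 1_500);
}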