Readability delete chunk2 (#4969)
* attempt to simplify the code for bulk delete requests

* CR comments
fulmicoton authored May 10, 2024
1 parent 8b91fad commit 894188f
Showing 1 changed file with 81 additions and 64 deletions.
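In short, the commit factors the batching of bulk deletes out of bulk_delete_multi into a dedicated helper, build_delete_batch_requests. The pattern itself is dictated by the S3 DeleteObjects API, which accepts at most 1,000 keys per request. Below is a minimal, std-only sketch of that pattern; the names are illustrative and not the exact Quickwit code, which pairs each chunk with an aws_sdk_s3 Delete payload rather than a plain Vec of keys:

use std::path::{Path, PathBuf};

// S3's DeleteObjects call accepts at most 1,000 keys, so a bulk delete
// over N paths turns into ceil(N / 1,000) batch requests.
const MAX_NUM_KEYS: usize = 1_000;

// Keep each chunk of paths next to the keys derived from it, mirroring
// the (&[&Path], Delete) pairs returned by the new helper.
fn build_delete_batches<'a>(paths: &'a [&'a Path]) -> Vec<(&'a [&'a Path], Vec<String>)> {
    paths
        .chunks(MAX_NUM_KEYS)
        .map(|chunk| {
            let keys: Vec<String> = chunk
                .iter()
                .map(|path| path.to_string_lossy().into_owned())
                .collect();
            (chunk, keys)
        })
        .collect()
}

fn main() {
    let owned: Vec<PathBuf> = (0..2_500).map(|i| PathBuf::from(format!("split-{i}"))).collect();
    let paths: Vec<&Path> = owned.iter().map(PathBuf::as_path).collect();
    assert_eq!(build_delete_batches(&paths).len(), 3); // 1,000 + 1,000 + 500
}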
quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs (145 changes: 81 additions & 64 deletions)
@@ -24,13 +24,15 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use std::{env, fmt, io};

use anyhow::anyhow;
use anyhow::{anyhow, Context as AnyhhowContext};
use async_trait::async_trait;
use aws_credential_types::provider::SharedCredentialsProvider;
use aws_sdk_s3::config::{BehaviorVersion, Credentials, Region};
use aws_sdk_s3::error::{ProvideErrorMetadata, SdkError};
use aws_sdk_s3::operation::delete_objects::DeleteObjectsOutput;
use aws_sdk_s3::operation::get_object::{GetObjectError, GetObjectOutput};
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::types::builders::ObjectIdentifierBuilder;
use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart, Delete, ObjectIdentifier};
use aws_sdk_s3::Client as S3Client;
use base64::prelude::{Engine, BASE64_STANDARD};
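One small but load-bearing import change above: anyhow's Context trait is brought in under an alias (AnyhhowContext) because std::task::Context is already imported at the top of the file. A trait only needs to be in scope for its methods to resolve; its local name is irrelevant. A minimal illustration of the same trick, assuming the anyhow crate (parse_port is a made-up example, not Quickwit code):

// The alias avoids a clash with std::task::Context; .context(...) still
// resolves because the trait is in scope, whatever it is locally called.
use anyhow::Context as AnyhowContext;

fn parse_port(input: &str) -> anyhow::Result<u16> {
    input
        .trim()
        .parse::<u16>()
        .context("failed to parse port number")
}

fn main() -> anyhow::Result<()> {
    assert_eq!(parse_port(" 8080 ")?, 8080);
    Ok(())
}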
@@ -378,6 +380,40 @@ impl S3CompatibleObjectStorage {
Ok(parts)
}

fn build_delete_batch_requests<'a>(
&self,
delete_paths: &'a [&'a Path],
) -> anyhow::Result<Vec<(&'a [&'a Path], Delete)>> {
#[cfg(test)]
const MAX_NUM_KEYS: usize = 3;

#[cfg(not(test))]
const MAX_NUM_KEYS: usize = 1_000;

let path_chunks = delete_paths.chunks(MAX_NUM_KEYS);
let num_delete_requests = path_chunks.len();
let mut delete_requests: Vec<(&[&Path], Delete)> = Vec::with_capacity(num_delete_requests);

for path_chunk in path_chunks {
let object_ids: Vec<ObjectIdentifier> = path_chunk
.iter()
.map(|path| {
let key = self.key(path);
ObjectIdentifierBuilder::default()
.key(key)
.build()
.context("failed to build object identifier")
})
.collect::<anyhow::Result<_>>()?;
let delete = Delete::builder()
.set_objects(Some(object_ids))
.build()
.context("failed to build delete request")?;
delete_requests.push((path_chunk, delete));
}
Ok(delete_requests)
}

async fn upload_part<'a>(
&'a self,
upload_id: MultipartUploadId,
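Worth noting in the new helper: MAX_NUM_KEYS is 3 under cfg(test) and 1,000 otherwise, so a unit test can exercise the multi-batch code path with a handful of paths instead of thousands of objects. A self-contained sketch of the same compile-time trick (num_batches is a hypothetical function, not part of the Quickwit test suite):

// Shrinking the batch size under cfg(test) lets a small test cover the
// code path that spans several DeleteObjects requests.
#[cfg(test)]
const MAX_NUM_KEYS: usize = 3;
#[cfg(not(test))]
const MAX_NUM_KEYS: usize = 1_000;

pub fn num_batches(num_paths: usize) -> usize {
    num_paths.div_ceil(MAX_NUM_KEYS)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn ten_paths_need_four_requests() {
        // With MAX_NUM_KEYS == 3: chunks of 3 + 3 + 3 + 1.
        assert_eq!(num_batches(10), 4);
    }
}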
Expand Down Expand Up @@ -583,64 +619,38 @@ impl S3CompatibleObjectStorage {
/// API: <https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html>
async fn bulk_delete_multi<'a>(&self, paths: &[&'a Path]) -> Result<(), BulkDeleteError> {
let _permit = REQUEST_SEMAPHORE.acquire().await;

let delete_requests: Vec<(&[&Path], Delete)> = self
.build_delete_batch_requests(paths)
.map_err(|error: anyhow::Error| {
let unattempted = paths.iter().copied().map(Path::to_path_buf).collect();
BulkDeleteError {
error: Some(StorageErrorKind::Internal.with_error(error)),
successes: Default::default(),
failures: Default::default(),
unattempted,
}
})?;

let mut error = None;
let mut successes = Vec::with_capacity(paths.len());
let mut failures = HashMap::new();
let mut unattempted = Vec::new();

#[cfg(test)]
const MAX_NUM_KEYS: usize = 3;

#[cfg(not(test))]
const MAX_NUM_KEYS: usize = 1_000;

let Ok(path_and_oid) = paths
.iter()
.map(|path| {
ObjectIdentifier::builder()
.key(self.key(path))
.build()
.map(|oid| (path, oid))
})
.collect::<Result<Vec<_>, _>>()
else {
error = Some(
StorageErrorKind::Internal.with_error(anyhow!("failed to build object identifier")),
);
unattempted.extend(paths.iter().map(|path| path.to_path_buf()));
return Err(BulkDeleteError {
error,
successes,
failures,
unattempted,
});
};

for chunk in path_and_oid.chunks(MAX_NUM_KEYS) {
if error.is_some() {
unattempted.extend(chunk.iter().map(|(path, _oid)| path.to_path_buf()));
continue;
}
let Ok(delete) = Delete::builder()
.set_objects(Some(chunk.iter().map(|(_path, oid)| oid.clone()).collect()))
.build()
else {
error = Some(
StorageErrorKind::Internal
.with_error(anyhow!("failed to build delete request")),
);
unattempted.extend(chunk.iter().map(|(path, _oid)| path.to_path_buf()));
continue;
};
let delete_objects_res = aws_retry(&self.retry_params, || async {
self.s3_client
.delete_objects()
.bucket(self.bucket.clone())
.delete(delete.clone())
.send()
.await
})
.await;
let mut delete_requests_it = delete_requests.iter();

for (path_chunk, delete) in &mut delete_requests_it {
let delete_objects_res: StorageResult<DeleteObjectsOutput> =
aws_retry(&self.retry_params, || async {
self.s3_client
.delete_objects()
.bucket(self.bucket.clone())
.delete(delete.clone())
.send()
.await
})
.await
.map_err(Into::into);

match delete_objects_res {
Ok(delete_objects_output) => {
@@ -674,21 +684,28 @@ impl S3CompatibleObjectStorage {
}
}
Err(delete_objects_error) => {
error = Some(delete_objects_error.into());
unattempted.extend(chunk.iter().map(|(path, _oid)| path.to_path_buf()));
error = Some(delete_objects_error);
unattempted.extend(path_chunk.iter().copied().map(PathBuf::from));
break;
}
}
}

if error.is_none() && failures.is_empty() {
Ok(())
} else {
Err(BulkDeleteError {
error,
successes,
failures,
unattempted,
})
return Ok(());
}

// Do we have remaining requests?
for (path_chunk, _) in delete_requests_it {
unattempted.extend(path_chunk.iter().copied().map(PathBuf::from));
}

Err(BulkDeleteError {
error,
successes,
failures,
unattempted,
})
}
}
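The main readability device in the rewritten loop is the named iterator delete_requests_it: the for loop advances it, a hard transport error breaks out, and whatever is still left on the iterator afterwards is, by construction, the set of batches that were never sent, which the trailing loop records as unattempted. A distilled, std-only version of that control flow (process_batches and send are illustrative names; per-key failures reported inside a successful response are deliberately ignored here):

// Send batches in order; on the first transport-level error, stop and
// report every item that was never applied.
fn process_batches<T: Clone, E>(
    batches: &[Vec<T>],
    mut send: impl FnMut(&[T]) -> Result<(), E>,
) -> Result<(), (E, Vec<T>)> {
    let mut batches_it = batches.iter();
    let mut error = None;
    let mut unattempted = Vec::new();

    for batch in &mut batches_it {
        if let Err(send_error) = send(batch) {
            // The failing batch was not applied: count its items too.
            error = Some(send_error);
            unattempted.extend(batch.iter().cloned());
            break;
        }
    }
    let Some(error) = error else {
        return Ok(());
    };
    // Anything still left on the iterator was never sent at all.
    for batch in batches_it {
        unattempted.extend(batch.iter().cloned());
    }
    Err((error, unattempted))
}

fn main() {
    let batches = vec![vec![1, 2], vec![3], vec![4, 5]];
    // Fail on the second batch: items 3, 4 and 5 end up unattempted.
    let outcome = process_batches(&batches, |batch| {
        if batch.contains(&3) { Err("boom") } else { Ok(()) }
    });
    assert_eq!(outcome.unwrap_err().1, vec![3, 4, 5]);
}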

