Skip to content

Commit

Permalink
memoize s3 client (#5377)
Browse files Browse the repository at this point in the history
* cache S3Client in factory

* remove unused constructor for s3 storage
  • Loading branch information
trinity-1686a authored Sep 4, 2024
1 parent aac8b49 commit 0820c90
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ fn get_region(s3_storage_config: &S3StorageConfig) -> Option<Region> {
})
}

async fn create_s3_client(s3_storage_config: &S3StorageConfig) -> S3Client {
pub async fn create_s3_client(s3_storage_config: &S3StorageConfig) -> S3Client {
let aws_config = get_aws_config().await;
let credentials_provider =
get_credentials_provider(s3_storage_config).or(aws_config.credentials_provider());
Expand Down Expand Up @@ -155,41 +155,40 @@ async fn create_s3_client(s3_storage_config: &S3StorageConfig) -> S3Client {
}

impl S3CompatibleObjectStorage {
/// Creates an object storage from an S3 storage configuration and a URI.
///
/// A new S3 client is built with `create_s3_client` for this storage. Callers that
/// already hold a client should use `from_uri_and_client` instead so the client is
/// reused.
///
/// # Errors
///
/// Returns `StorageResolverError::InvalidUri` if no bucket name can be extracted
/// from `uri`.
pub async fn from_uri(
    s3_storage_config: &S3StorageConfig,
    uri: &Uri,
) -> Result<Self, StorageResolverError> {
    let s3_client = create_s3_client(s3_storage_config).await;
    Self::from_uri_and_client(s3_storage_config, uri, s3_client).await
}

/// Creates an object storage from an S3 storage configuration, a URI and an
/// already-built S3 client.
///
/// The bucket and the key prefix are both taken from `uri`. The
/// `disable_multi_object_delete` and `disable_multipart_upload` flags are copied
/// from `s3_storage_config`.
///
/// # Errors
///
/// Returns `StorageResolverError::InvalidUri` if no bucket name can be extracted
/// from `uri`.
pub async fn from_uri_and_client(
    s3_storage_config: &S3StorageConfig,
    uri: &Uri,
    s3_client: S3Client,
) -> Result<Self, StorageResolverError> {
    let (bucket, prefix) = parse_s3_uri(uri).ok_or_else(|| {
        let message = format!("failed to extract bucket name from S3 URI: {uri}");
        StorageResolverError::InvalidUri(message)
    })?;
    let retry_params = RetryParams::aggressive();
    let disable_multi_object_delete = s3_storage_config.disable_multi_object_delete;
    let disable_multipart_upload = s3_storage_config.disable_multipart_upload;
    Ok(Self {
        s3_client,
        // The storage owns its URI, so the borrowed one is cloned.
        uri: uri.clone(),
        bucket,
        // Use the prefix parsed from the URI directly instead of starting from an
        // empty path and overriding it afterwards.
        prefix,
        multipart_policy: MultiPartPolicy::default(),
        retry_params,
        disable_multi_object_delete,
        disable_multipart_upload,
    })
}

/// Sets a specific prefix for all buckets.
///
/// This method overrides any existing prefix. (It does NOT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,34 @@
use std::sync::Arc;

use async_trait::async_trait;
use aws_sdk_s3::Client as S3Client;
use quickwit_common::uri::Uri;
use quickwit_config::{S3StorageConfig, StorageBackend};
use tokio::sync::OnceCell;

use super::s3_compatible_storage::create_s3_client;
use crate::{
DebouncedStorage, S3CompatibleObjectStorage, Storage, StorageFactory, StorageResolverError,
};

/// S3 compatible object storage resolver.
///
/// Holds the S3 storage configuration together with a lazily initialized S3 client.
pub struct S3CompatibleObjectStorageFactory {
// Configuration used to build the S3 client and passed along when a storage is resolved.
storage_config: S3StorageConfig,
// We cache the `S3Client` so we don't rebuild one every time we build a new storage (for
// every search query).
// We don't build it in advance because we don't know whether this factory is the one that
// will end up being used, or whether something like Azure, GCS, or even local files will be
// used instead.
s3_client: OnceCell<S3Client>,
}

impl S3CompatibleObjectStorageFactory {
    /// Creates a new S3-compatible storage factory.
    ///
    /// No S3 client is built here: the client cell starts out empty.
    pub fn new(storage_config: S3StorageConfig) -> Self {
        Self {
            storage_config,
            s3_client: OnceCell::new(),
        }
    }
}

Expand All @@ -46,7 +58,14 @@ impl StorageFactory for S3CompatibleObjectStorageFactory {
}

/// Resolves `uri` into an S3-compatible storage wrapped in a `DebouncedStorage`.
///
/// # Errors
///
/// Propagates the error returned by `S3CompatibleObjectStorage::from_uri_and_client`.
async fn resolve(&self, uri: &Uri) -> Result<Arc<dyn Storage>, StorageResolverError> {
    // Initialize the client the first time it is needed, then hand out a clone of the
    // cached client instead of building a new one for every resolved storage.
    let s3_client = self
        .s3_client
        .get_or_init(|| create_s3_client(&self.storage_config))
        .await
        .clone();
    let storage =
        S3CompatibleObjectStorage::from_uri_and_client(&self.storage_config, uri, s3_client)
            .await?;
    Ok(Arc::new(DebouncedStorage::new(storage)))
}
}

0 comments on commit 0820c90

Please sign in to comment.