From 0820c90c3af8192d09adbb3a1a851b3c4fc59587 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Wed, 4 Sep 2024 15:18:43 +0200 Subject: [PATCH] memoize s3 client (#5377) * cache S3Client in factory * remove unused constructor for s3 storage --- .../object_storage/s3_compatible_storage.rs | 39 +++++++++---------- .../s3_compatible_storage_resolver.rs | 23 ++++++++++- 2 files changed, 40 insertions(+), 22 deletions(-) diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs index 335747fe638..95a030e560f 100644 --- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs @@ -127,7 +127,7 @@ fn get_region(s3_storage_config: &S3StorageConfig) -> Option { }) } -async fn create_s3_client(s3_storage_config: &S3StorageConfig) -> S3Client { +pub async fn create_s3_client(s3_storage_config: &S3StorageConfig) -> S3Client { let aws_config = get_aws_config().await; let credentials_provider = get_credentials_provider(s3_storage_config).or(aws_config.credentials_provider()); @@ -155,21 +155,33 @@ async fn create_s3_client(s3_storage_config: &S3StorageConfig) -> S3Client { } impl S3CompatibleObjectStorage { - /// Creates an object storage given a region and a bucket name. - pub async fn new( + /// Creates an object storage given a region and an uri. + pub async fn from_uri( s3_storage_config: &S3StorageConfig, - uri: Uri, - bucket: String, + uri: &Uri, ) -> Result { let s3_client = create_s3_client(s3_storage_config).await; + Self::from_uri_and_client(s3_storage_config, uri, s3_client).await + } + + /// Creates an object storage given a region, an uri and an S3 client. + pub async fn from_uri_and_client( + s3_storage_config: &S3StorageConfig, + uri: &Uri, + s3_client: S3Client, + ) -> Result { + let (bucket, prefix) = parse_s3_uri(uri).ok_or_else(|| { + let message = format!("failed to extract bucket name from S3 URI: {uri}"); + StorageResolverError::InvalidUri(message) + })?; let retry_params = RetryParams::aggressive(); let disable_multi_object_delete = s3_storage_config.disable_multi_object_delete; let disable_multipart_upload = s3_storage_config.disable_multipart_upload; Ok(Self { s3_client, - uri, + uri: uri.clone(), bucket, - prefix: PathBuf::new(), + prefix, multipart_policy: MultiPartPolicy::default(), retry_params, disable_multi_object_delete, @@ -177,19 +189,6 @@ impl S3CompatibleObjectStorage { }) } - /// Creates an object storage given a region and an uri. - pub async fn from_uri( - s3_storage_config: &S3StorageConfig, - uri: &Uri, - ) -> Result { - let (bucket, prefix) = parse_s3_uri(uri).ok_or_else(|| { - let message = format!("failed to extract bucket name from S3 URI: {uri}"); - StorageResolverError::InvalidUri(message) - })?; - let storage = Self::new(s3_storage_config, uri.clone(), bucket).await?; - Ok(storage.with_prefix(prefix)) - } - /// Sets a specific for all buckets. /// /// This method overrides any existing prefix. (It does NOT diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage_resolver.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage_resolver.rs index 580f4bf89c4..ae746f0a26b 100644 --- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage_resolver.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage_resolver.rs @@ -20,9 +20,12 @@ use std::sync::Arc; use async_trait::async_trait; +use aws_sdk_s3::Client as S3Client; use quickwit_common::uri::Uri; use quickwit_config::{S3StorageConfig, StorageBackend}; +use tokio::sync::OnceCell; +use super::s3_compatible_storage::create_s3_client; use crate::{ DebouncedStorage, S3CompatibleObjectStorage, Storage, StorageFactory, StorageResolverError, }; @@ -30,12 +33,21 @@ use crate::{ /// S3 compatible object storage resolver. pub struct S3CompatibleObjectStorageFactory { storage_config: S3StorageConfig, + // we cache the S3Client so we don't rebuild one every time we build a new Storage (for + // every search query). + // We don't build it in advance because we don't know if this factory is one that will + // end up being used, or if something like azure, gcs, or even local files, will be used + // instead. + s3_client: OnceCell, } impl S3CompatibleObjectStorageFactory { /// Creates a new S3-compatible storage factory. pub fn new(storage_config: S3StorageConfig) -> Self { - Self { storage_config } + Self { + storage_config, + s3_client: OnceCell::new(), + } } } @@ -46,7 +58,14 @@ impl StorageFactory for S3CompatibleObjectStorageFactory { } async fn resolve(&self, uri: &Uri) -> Result, StorageResolverError> { - let storage = S3CompatibleObjectStorage::from_uri(&self.storage_config, uri).await?; + let s3_client = self + .s3_client + .get_or_init(|| create_s3_client(&self.storage_config)) + .await + .clone(); + let storage = + S3CompatibleObjectStorage::from_uri_and_client(&self.storage_config, uri, s3_client) + .await?; Ok(Arc::new(DebouncedStorage::new(storage))) } }