From bfff12ac2d8b19df997a218dcab16263ce4e7aa6 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Thu, 3 Aug 2023 15:11:29 -1000 Subject: [PATCH] Refactor storage factory initialization For #3443, I need to be able to perform initialization at the storage factory level, and in order to do that, I need access to the config parameters during initialization rather than during storage resolution. This PR moves the storage config parameters to the storage initializer. --- quickwit/quickwit-cli/src/lib.rs | 9 +- quickwit/quickwit-storage/src/lib.rs | 4 +- .../src/local_file_storage.rs | 19 +-- .../src/object_storage/azure_blob_storage.rs | 22 +-- .../s3_compatible_storage_resolver.rs | 22 +-- quickwit/quickwit-storage/src/ram_storage.rs | 34 +---- .../quickwit-storage/src/storage_factory.rs | 14 +- .../quickwit-storage/src/storage_resolver.rs | 131 +++++------------- 8 files changed, 76 insertions(+), 179 deletions(-) diff --git a/quickwit/quickwit-cli/src/lib.rs b/quickwit/quickwit-cli/src/lib.rs index de45b37d934..0f35012e4e2 100644 --- a/quickwit/quickwit-cli/src/lib.rs +++ b/quickwit/quickwit-cli/src/lib.rs @@ -442,14 +442,7 @@ mod tests { }; let storage_configs = StorageConfigs::new(vec![s3_storage_config.into()]); let metastore_configs = MetastoreConfigs::default(); - let (storage_resolver, _metastore_resolver) = + let (_storage_resolver, _metastore_resolver) = get_resolvers(&storage_configs, &metastore_configs); - assert!( - storage_resolver - .storage_configs() - .find_s3() - .unwrap() - .force_path_style_access - ); } } diff --git a/quickwit/quickwit-storage/src/lib.rs b/quickwit/quickwit-storage/src/lib.rs index 90ba89d3fca..baeefcb1866 100644 --- a/quickwit/quickwit-storage/src/lib.rs +++ b/quickwit/quickwit-storage/src/lib.rs @@ -122,14 +122,12 @@ pub use for_test::storage_for_test; mod tests { use std::str::FromStr; - use quickwit_config::FileStorageConfig; - use super::*; #[tokio::test] async fn test_load_file() { let storage_resolver = StorageResolver::builder() - 
.register(LocalFileStorageFactory, FileStorageConfig::default().into()) + .register(LocalFileStorageFactory) .build() .unwrap(); let expected_bytes = tokio::fs::read_to_string("Cargo.toml").await.unwrap(); diff --git a/quickwit/quickwit-storage/src/local_file_storage.rs b/quickwit/quickwit-storage/src/local_file_storage.rs index 4a88a42a0ca..3b617e3a977 100644 --- a/quickwit/quickwit-storage/src/local_file_storage.rs +++ b/quickwit/quickwit-storage/src/local_file_storage.rs @@ -29,7 +29,7 @@ use futures::future::{BoxFuture, FutureExt}; use futures::StreamExt; use quickwit_common::ignore_error_kind; use quickwit_common::uri::Uri; -use quickwit_config::{StorageBackend, StorageConfig}; +use quickwit_config::StorageBackend; use tokio::fs; use tokio::io::AsyncWriteExt; use tracing::warn; @@ -345,11 +345,7 @@ impl StorageFactory for LocalFileStorageFactory { StorageBackend::File } - async fn resolve( - &self, - _storage_config: &StorageConfig, - uri: &Uri, - ) -> Result, StorageResolverError> { + async fn resolve(&self, uri: &Uri) -> Result, StorageResolverError> { let storage = LocalFileStorage::from_uri(uri)?; Ok(Arc::new(DebouncedStorage::new(storage))) } @@ -360,8 +356,6 @@ mod tests { use std::str::FromStr; - use quickwit_config::FileStorageConfig; - use super::*; use crate::test_suite::storage_test_suite; @@ -395,25 +389,22 @@ mod tests { #[tokio::test] async fn test_local_file_storage_factory() -> anyhow::Result<()> { - let storage_config = FileStorageConfig::default().into(); let temp_dir = tempfile::tempdir()?; let index_uri = Uri::from_well_formed(format!("file://{}/foo/bar", temp_dir.path().display())); let local_file_storage_factory = LocalFileStorageFactory::default(); - let local_file_storage = local_file_storage_factory - .resolve(&storage_config, &index_uri) - .await?; + let local_file_storage = local_file_storage_factory.resolve(&index_uri).await?; assert_eq!(local_file_storage.uri(), &index_uri); let err = local_file_storage_factory - 
.resolve(&storage_config, &Uri::from_well_formed("s3://foo/bar")) + .resolve(&Uri::from_well_formed("s3://foo/bar")) .await .err() .unwrap(); assert!(matches!(err, StorageResolverError::InvalidUri { .. })); let err = local_file_storage_factory - .resolve(&storage_config, &Uri::from_well_formed("s3://")) + .resolve(&Uri::from_well_formed("s3://")) .await .err() .unwrap(); diff --git a/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs b/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs index 8b529f242d6..79744cf1b2e 100644 --- a/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs @@ -55,8 +55,16 @@ use crate::{ }; /// Azure object storage resolver. -#[derive(Default)] -pub struct AzureBlobStorageFactory; +pub struct AzureBlobStorageFactory { + storage_config: StorageConfig, +} + +impl AzureBlobStorageFactory { + /// Create a new Azure blob storage factory + pub fn new(storage_config: StorageConfig) -> Self { + Self { storage_config } + } +} #[async_trait] impl StorageFactory for AzureBlobStorageFactory { @@ -64,15 +72,11 @@ impl StorageFactory for AzureBlobStorageFactory { StorageBackend::Azure } - async fn resolve( - &self, - storage_config: &StorageConfig, - uri: &Uri, - ) -> Result, StorageResolverError> { - let azure_storage_config = storage_config.as_azure().ok_or_else(|| { + async fn resolve(&self, uri: &Uri) -> Result, StorageResolverError> { + let azure_storage_config = self.storage_config.as_azure().ok_or_else(|| { let message = format!( "Expected Azure storage config, got `{:?}`.", - storage_config.backend() + self.storage_config.backend() ); StorageResolverError::InvalidConfig(message) })?; diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage_resolver.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage_resolver.rs index f59d62f2096..052eb4af1c8 100644 --- 
a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage_resolver.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage_resolver.rs @@ -28,8 +28,16 @@ use crate::{ }; /// S3 compatible object storage resolver. -#[derive(Default)] -pub struct S3CompatibleObjectStorageFactory; +pub struct S3CompatibleObjectStorageFactory { + storage_config: StorageConfig, +} + +impl S3CompatibleObjectStorageFactory { + /// Create a new S3 compatible storage factory + pub fn new(storage_config: StorageConfig) -> Self { + Self { storage_config } + } +} #[async_trait] impl StorageFactory for S3CompatibleObjectStorageFactory { @@ -37,15 +45,11 @@ impl StorageFactory for S3CompatibleObjectStorageFactory { StorageBackend::S3 } - async fn resolve( - &self, - storage_config: &StorageConfig, - uri: &Uri, - ) -> Result, StorageResolverError> { - let s3_storage_config = storage_config.as_s3().ok_or_else(|| { + async fn resolve(&self, uri: &Uri) -> Result, StorageResolverError> { + let s3_storage_config = self.storage_config.as_s3().ok_or_else(|| { let message = format!( "Expected S3 storage config, got `{:?}`.", - storage_config.backend() + self.storage_config.backend() ); StorageResolverError::InvalidConfig(message) })?; diff --git a/quickwit/quickwit-storage/src/ram_storage.rs b/quickwit/quickwit-storage/src/ram_storage.rs index d1d88794e13..8816356a7b1 100644 --- a/quickwit/quickwit-storage/src/ram_storage.rs +++ b/quickwit/quickwit-storage/src/ram_storage.rs @@ -25,7 +25,7 @@ use std::sync::Arc; use async_trait::async_trait; use quickwit_common::uri::Uri; -use quickwit_config::{StorageBackend, StorageConfig}; +use quickwit_config::StorageBackend; use tokio::io::AsyncWriteExt; use tokio::sync::RwLock; @@ -194,11 +194,7 @@ impl StorageFactory for RamStorageFactory { StorageBackend::Ram } - async fn resolve( - &self, - _storage_config: &StorageConfig, - uri: &Uri, - ) -> Result, StorageResolverError> { + async fn resolve(&self, uri: &Uri) -> Result, 
StorageResolverError> { match uri.filepath() { Some(prefix) if uri.protocol().is_ram() => Ok(add_prefix_to_storage( self.ram_storage.clone(), @@ -215,7 +211,6 @@ impl StorageFactory for RamStorageFactory { #[cfg(test)] mod tests { - use quickwit_config::RamStorageConfig; use super::*; use crate::test_suite::storage_test_suite; @@ -230,34 +225,17 @@ mod tests { #[tokio::test] async fn test_ram_storage_factory() { let ram_storage_factory = RamStorageFactory::default(); - let storage_config = RamStorageConfig::default().into(); let ram_uri = Uri::from_well_formed("s3:///foo"); - let err = ram_storage_factory - .resolve(&storage_config, &ram_uri) - .await - .err() - .unwrap(); + let err = ram_storage_factory.resolve(&ram_uri).await.err().unwrap(); assert!(matches!(err, StorageResolverError::InvalidUri { .. })); let data_uri = Uri::from_well_formed("ram:///data"); - let data_storage = ram_storage_factory - .resolve(&storage_config, &data_uri) - .await - .ok() - .unwrap(); + let data_storage = ram_storage_factory.resolve(&data_uri).await.ok().unwrap(); let home_uri = Uri::from_well_formed("ram:///home"); - let home_storage = ram_storage_factory - .resolve(&storage_config, &home_uri) - .await - .ok() - .unwrap(); + let home_storage = ram_storage_factory.resolve(&home_uri).await.ok().unwrap(); assert_ne!(data_storage.uri(), home_storage.uri()); - let data_storage_two = ram_storage_factory - .resolve(&storage_config, &data_uri) - .await - .ok() - .unwrap(); + let data_storage_two = ram_storage_factory.resolve(&data_uri).await.ok().unwrap(); assert_eq!(data_storage.uri(), data_storage_two.uri()); } diff --git a/quickwit/quickwit-storage/src/storage_factory.rs b/quickwit/quickwit-storage/src/storage_factory.rs index 12f8d9d861f..e4cf74b7b7b 100644 --- a/quickwit/quickwit-storage/src/storage_factory.rs +++ b/quickwit/quickwit-storage/src/storage_factory.rs @@ -21,7 +21,7 @@ use std::sync::Arc; use async_trait::async_trait; use quickwit_common::uri::Uri; -use 
quickwit_config::{StorageBackend, StorageConfig}; +use quickwit_config::StorageBackend; use crate::{Storage, StorageResolverError}; @@ -34,11 +34,7 @@ pub trait StorageFactory: Send + Sync + 'static { fn backend(&self) -> StorageBackend; /// Returns the appropriate [`Storage`] object for the URI. - async fn resolve( - &self, - storage_config: &StorageConfig, - uri: &Uri, - ) -> Result, StorageResolverError>; + async fn resolve(&self, uri: &Uri) -> Result, StorageResolverError>; } /// A storage factory for handling unsupported or unavailable storage backends. @@ -61,11 +57,7 @@ impl StorageFactory for UnsupportedStorage { self.backend } - async fn resolve( - &self, - _storage_config: &StorageConfig, - _uri: &Uri, - ) -> Result, StorageResolverError> { + async fn resolve(&self, _uri: &Uri) -> Result, StorageResolverError> { Err(StorageResolverError::UnsupportedBackend( self.message.to_string(), )) diff --git a/quickwit/quickwit-storage/src/storage_resolver.rs b/quickwit/quickwit-storage/src/storage_resolver.rs index 3afda9d34ef..ec3588dcb4b 100644 --- a/quickwit/quickwit-storage/src/storage_resolver.rs +++ b/quickwit/quickwit-storage/src/storage_resolver.rs @@ -21,10 +21,9 @@ use std::collections::HashMap; use std::fmt; use std::sync::Arc; -use anyhow::ensure; use once_cell::sync::Lazy; use quickwit_common::uri::{Protocol, Uri}; -use quickwit_config::{StorageBackend, StorageConfig, StorageConfigs}; +use quickwit_config::{StorageBackend, StorageConfigs}; use crate::local_file_storage::LocalFileStorageFactory; use crate::ram_storage::RamStorageFactory; @@ -32,14 +31,12 @@ use crate::ram_storage::RamStorageFactory; use crate::AzureBlobStorageFactory; use crate::{S3CompatibleObjectStorageFactory, Storage, StorageFactory, StorageResolverError}; -type FactoryAndConfig = (Box, StorageConfig); - /// Returns the [`Storage`] instance associated with the protocol of a URI. The actual creation of /// storage objects is delegated to pre-registered [`StorageFactory`]. 
The resolver is only /// responsible for dispatching to the appropriate factory. #[derive(Clone)] pub struct StorageResolver { - per_backend_factories: Arc>, + per_backend_factories: Arc>>, } impl fmt::Debug for StorageResolver { @@ -69,12 +66,11 @@ impl StorageResolver { return Err(StorageResolverError::UnsupportedBackend(message)); } }; - let (storage_factory, storage_config) = - self.per_backend_factories.get(&backend).ok_or({ - let message = format!("no storage factory is registered for {}.", uri.protocol()); - StorageResolverError::UnsupportedBackend(message) - })?; - let storage = storage_factory.resolve(storage_config, uri).await?; + let storage_factory = self.per_backend_factories.get(&backend).ok_or({ + let message = format!("no storage factory is registered for {}.", uri.protocol()); + StorageResolverError::UnsupportedBackend(message) + })?; + let storage = storage_factory.resolve(uri).await?; Ok(storage) } @@ -93,40 +89,24 @@ impl StorageResolver { /// Creates and returns a [`StorageResolver`]. 
pub fn configured(storage_configs: &StorageConfigs) -> Self { let mut builder = StorageResolver::builder() - .register( - LocalFileStorageFactory, - storage_configs - .find_file() - .cloned() - .unwrap_or_default() - .into(), - ) - .register( - RamStorageFactory::default(), - storage_configs - .find_ram() - .cloned() - .unwrap_or_default() - .into(), - ) - .register( - S3CompatibleObjectStorageFactory, + .register(LocalFileStorageFactory) + .register(RamStorageFactory::default()) + .register(S3CompatibleObjectStorageFactory::new( storage_configs .find_s3() .cloned() .unwrap_or_default() .into(), - ); + )); #[cfg(feature = "azure")] { - builder = builder.register( - AzureBlobStorageFactory::default(), + builder = builder.register(AzureBlobStorageFactory::new( storage_configs .find_azure() .cloned() .unwrap_or_default() .into(), - ); + )); } #[cfg(not(feature = "azure"))] { @@ -134,13 +114,10 @@ impl StorageResolver { use crate::storage_factory::UnsupportedStorage; - builder = builder.register( - UnsupportedStorage::new( - StorageBackend::Azure, - "Quickwit was compiled without the `azure` feature.", - ), - AzureStorageConfig::default().into(), - ) + builder = builder.register(UnsupportedStorage::new( + StorageBackend::Azure, + "Quickwit was compiled without the `azure` feature.", + )) } builder .build() @@ -150,58 +127,28 @@ impl StorageResolver { /// Returns a [`StorageResolver`] for testing purposes. Unlike /// [`StorageResolver::unconfigured`], this resolver does not return a singleton. pub fn ram_for_test() -> Self { - use quickwit_config::RamStorageConfig; - StorageResolver::builder() - .register( - RamStorageFactory::default(), - RamStorageConfig::default().into(), - ) + .register(RamStorageFactory::default()) .build() .expect("Storage factory and config backends should match.") } - - /// Returns the [`StorageConfigs`] associated with this resolver for testing purposes only. 
- #[cfg(any(test, feature = "testsuite"))] - pub fn storage_configs(&self) -> StorageConfigs { - let storage_configs = self - .per_backend_factories - .values() - .map(|(_, storage_config)| storage_config.clone()) - .collect(); - StorageConfigs::new(storage_configs) - } } #[derive(Default)] pub struct StorageResolverBuilder { - per_backend_factories: HashMap, StorageConfig)>, + per_backend_factories: HashMap>, } impl StorageResolverBuilder { /// Registers a [`StorageFactory`] and a [`StorageConfig`]. - pub fn register( - mut self, - storage_factory: S, - storage_config: StorageConfig, - ) -> Self { - self.per_backend_factories.insert( - storage_factory.backend(), - (Box::new(storage_factory), storage_config), - ); + pub fn register(mut self, storage_factory: S) -> Self { + self.per_backend_factories + .insert(storage_factory.backend(), Box::new(storage_factory)); self } /// Builds the [`StorageResolver`]. pub fn build(self) -> anyhow::Result { - for (storage_factory, storage_config) in self.per_backend_factories.values() { - ensure!( - storage_factory.backend() == storage_config.backend(), - "Storage factory and config backends do not match: {:?} vs. 
{:?}.", - storage_factory.backend(), - storage_config.backend(), - ); - } let storage_resolver = StorageResolver { per_backend_factories: Arc::new(self.per_backend_factories), }; @@ -213,8 +160,6 @@ impl StorageResolverBuilder { mod tests { use std::path::Path; - use quickwit_config::{FileStorageConfig, RamStorageConfig}; - use super::*; use crate::{MockStorageFactory, RamStorage}; @@ -229,18 +174,16 @@ mod tests { ram_storage_factory .expect_backend() .returning(|| StorageBackend::Ram); - ram_storage_factory - .expect_resolve() - .returning(|_storage_config, _uri| { - Ok(Arc::new( - RamStorage::builder() - .put("hello", b"hello_content_second") - .build(), - )) - }); + ram_storage_factory.expect_resolve().returning(|_uri| { + Ok(Arc::new( + RamStorage::builder() + .put("hello", b"hello_content_second") + .build(), + )) + }); let storage_resolver = StorageResolver::builder() - .register(file_storage_factory, FileStorageConfig::default().into()) - .register(ram_storage_factory, RamStorageConfig::default().into()) + .register(file_storage_factory) + .register(ram_storage_factory) .build() .unwrap(); let storage = storage_resolver @@ -264,7 +207,7 @@ mod tests { .returning(|| StorageBackend::Ram); second_ram_storage_factory .expect_resolve() - .returning(|_storage_config, uri| { + .returning(|uri| { assert_eq!(uri.as_str(), "ram:///home"); Ok(Arc::new( RamStorage::builder() @@ -273,14 +216,8 @@ mod tests { )) }); let storage_resolver = StorageResolver::builder() - .register( - first_ram_storage_factory, - RamStorageConfig::default().into(), - ) - .register( - second_ram_storage_factory, - RamStorageConfig::default().into(), - ) + .register(first_ram_storage_factory) + .register(second_ram_storage_factory) .build() .unwrap(); let storage = storage_resolver