Skip to content

Commit

Permalink
Refactor storage factory initialization
Browse files Browse the repository at this point in the history
For #3443 I need to be able to perform initialization on storage factory
level and in order to do that I need access to the config parameters
during initialization rather than during storage resolution. This PR
moves the storage config parameters to the storage initializer.
  • Loading branch information
imotov committed Aug 4, 2023
1 parent 8c2caf5 commit bfff12a
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 179 deletions.
9 changes: 1 addition & 8 deletions quickwit/quickwit-cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,14 +442,7 @@ mod tests {
};
let storage_configs = StorageConfigs::new(vec![s3_storage_config.into()]);
let metastore_configs = MetastoreConfigs::default();
let (storage_resolver, _metastore_resolver) =
let (_storage_resolver, _metastore_resolver) =
get_resolvers(&storage_configs, &metastore_configs);
assert!(
storage_resolver
.storage_configs()
.find_s3()
.unwrap()
.force_path_style_access
);
}
}
4 changes: 1 addition & 3 deletions quickwit/quickwit-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,12 @@ pub use for_test::storage_for_test;
mod tests {
use std::str::FromStr;

use quickwit_config::FileStorageConfig;

use super::*;

#[tokio::test]
async fn test_load_file() {
let storage_resolver = StorageResolver::builder()
.register(LocalFileStorageFactory, FileStorageConfig::default().into())
.register(LocalFileStorageFactory)
.build()
.unwrap();
let expected_bytes = tokio::fs::read_to_string("Cargo.toml").await.unwrap();
Expand Down
19 changes: 5 additions & 14 deletions quickwit/quickwit-storage/src/local_file_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use futures::future::{BoxFuture, FutureExt};
use futures::StreamExt;
use quickwit_common::ignore_error_kind;
use quickwit_common::uri::Uri;
use quickwit_config::{StorageBackend, StorageConfig};
use quickwit_config::StorageBackend;
use tokio::fs;
use tokio::io::AsyncWriteExt;
use tracing::warn;
Expand Down Expand Up @@ -345,11 +345,7 @@ impl StorageFactory for LocalFileStorageFactory {
StorageBackend::File
}

async fn resolve(
&self,
_storage_config: &StorageConfig,
uri: &Uri,
) -> Result<Arc<dyn Storage>, StorageResolverError> {
async fn resolve(&self, uri: &Uri) -> Result<Arc<dyn Storage>, StorageResolverError> {
let storage = LocalFileStorage::from_uri(uri)?;
Ok(Arc::new(DebouncedStorage::new(storage)))
}
Expand All @@ -360,8 +356,6 @@ mod tests {

use std::str::FromStr;

use quickwit_config::FileStorageConfig;

use super::*;
use crate::test_suite::storage_test_suite;

Expand Down Expand Up @@ -395,25 +389,22 @@ mod tests {

#[tokio::test]
async fn test_local_file_storage_factory() -> anyhow::Result<()> {
let storage_config = FileStorageConfig::default().into();
let temp_dir = tempfile::tempdir()?;
let index_uri =
Uri::from_well_formed(format!("file://{}/foo/bar", temp_dir.path().display()));
let local_file_storage_factory = LocalFileStorageFactory::default();
let local_file_storage = local_file_storage_factory
.resolve(&storage_config, &index_uri)
.await?;
let local_file_storage = local_file_storage_factory.resolve(&index_uri).await?;
assert_eq!(local_file_storage.uri(), &index_uri);

let err = local_file_storage_factory
.resolve(&storage_config, &Uri::from_well_formed("s3://foo/bar"))
.resolve(&Uri::from_well_formed("s3://foo/bar"))
.await
.err()
.unwrap();
assert!(matches!(err, StorageResolverError::InvalidUri { .. }));

let err = local_file_storage_factory
.resolve(&storage_config, &Uri::from_well_formed("s3://"))
.resolve(&Uri::from_well_formed("s3://"))
.await
.err()
.unwrap();
Expand Down
22 changes: 13 additions & 9 deletions quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,24 +55,28 @@ use crate::{
};

/// Azure object storage resolver.
#[derive(Default)]
pub struct AzureBlobStorageFactory;
pub struct AzureBlobStorageFactory {
storage_config: StorageConfig,
}

impl AzureBlobStorageFactory {
/// Create a new Azure blob storage factory
pub fn new(storage_config: StorageConfig) -> Self {
Self { storage_config }
}
}

#[async_trait]
impl StorageFactory for AzureBlobStorageFactory {
fn backend(&self) -> StorageBackend {
StorageBackend::Azure
}

async fn resolve(
&self,
storage_config: &StorageConfig,
uri: &Uri,
) -> Result<Arc<dyn Storage>, StorageResolverError> {
let azure_storage_config = storage_config.as_azure().ok_or_else(|| {
async fn resolve(&self, uri: &Uri) -> Result<Arc<dyn Storage>, StorageResolverError> {
let azure_storage_config = self.storage_config.as_azure().ok_or_else(|| {
let message = format!(
"Expected Azure storage config, got `{:?}`.",
storage_config.backend()
self.storage_config.backend()
);
StorageResolverError::InvalidConfig(message)
})?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,28 @@ use crate::{
};

/// S3 compatible object storage resolver.
#[derive(Default)]
pub struct S3CompatibleObjectStorageFactory;
pub struct S3CompatibleObjectStorageFactory {
storage_config: StorageConfig,
}

impl S3CompatibleObjectStorageFactory {
/// Create a new S3 compatible storage factory
pub fn new(storage_config: StorageConfig) -> Self {
Self { storage_config }
}
}

#[async_trait]
impl StorageFactory for S3CompatibleObjectStorageFactory {
fn backend(&self) -> StorageBackend {
StorageBackend::S3
}

async fn resolve(
&self,
storage_config: &StorageConfig,
uri: &Uri,
) -> Result<Arc<dyn Storage>, StorageResolverError> {
let s3_storage_config = storage_config.as_s3().ok_or_else(|| {
async fn resolve(&self, uri: &Uri) -> Result<Arc<dyn Storage>, StorageResolverError> {
let s3_storage_config = self.storage_config.as_s3().ok_or_else(|| {
let message = format!(
"Expected S3 storage config, got `{:?}`.",
storage_config.backend()
self.storage_config.backend()
);
StorageResolverError::InvalidConfig(message)
})?;
Expand Down
34 changes: 6 additions & 28 deletions quickwit/quickwit-storage/src/ram_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::sync::Arc;

use async_trait::async_trait;
use quickwit_common::uri::Uri;
use quickwit_config::{StorageBackend, StorageConfig};
use quickwit_config::StorageBackend;
use tokio::io::AsyncWriteExt;
use tokio::sync::RwLock;

Expand Down Expand Up @@ -194,11 +194,7 @@ impl StorageFactory for RamStorageFactory {
StorageBackend::Ram
}

async fn resolve(
&self,
_storage_config: &StorageConfig,
uri: &Uri,
) -> Result<Arc<dyn Storage>, StorageResolverError> {
async fn resolve(&self, uri: &Uri) -> Result<Arc<dyn Storage>, StorageResolverError> {
match uri.filepath() {
Some(prefix) if uri.protocol().is_ram() => Ok(add_prefix_to_storage(
self.ram_storage.clone(),
Expand All @@ -215,7 +211,6 @@ impl StorageFactory for RamStorageFactory {

#[cfg(test)]
mod tests {
use quickwit_config::RamStorageConfig;

use super::*;
use crate::test_suite::storage_test_suite;
Expand All @@ -230,34 +225,17 @@ mod tests {
#[tokio::test]
async fn test_ram_storage_factory() {
let ram_storage_factory = RamStorageFactory::default();
let storage_config = RamStorageConfig::default().into();
let ram_uri = Uri::from_well_formed("s3:///foo");
let err = ram_storage_factory
.resolve(&storage_config, &ram_uri)
.await
.err()
.unwrap();
let err = ram_storage_factory.resolve(&ram_uri).await.err().unwrap();
assert!(matches!(err, StorageResolverError::InvalidUri { .. }));

let data_uri = Uri::from_well_formed("ram:///data");
let data_storage = ram_storage_factory
.resolve(&storage_config, &data_uri)
.await
.ok()
.unwrap();
let data_storage = ram_storage_factory.resolve(&data_uri).await.ok().unwrap();
let home_uri = Uri::from_well_formed("ram:///home");
let home_storage = ram_storage_factory
.resolve(&storage_config, &home_uri)
.await
.ok()
.unwrap();
let home_storage = ram_storage_factory.resolve(&home_uri).await.ok().unwrap();
assert_ne!(data_storage.uri(), home_storage.uri());

let data_storage_two = ram_storage_factory
.resolve(&storage_config, &data_uri)
.await
.ok()
.unwrap();
let data_storage_two = ram_storage_factory.resolve(&data_uri).await.ok().unwrap();
assert_eq!(data_storage.uri(), data_storage_two.uri());
}

Expand Down
14 changes: 3 additions & 11 deletions quickwit/quickwit-storage/src/storage_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::sync::Arc;

use async_trait::async_trait;
use quickwit_common::uri::Uri;
use quickwit_config::{StorageBackend, StorageConfig};
use quickwit_config::StorageBackend;

use crate::{Storage, StorageResolverError};

Expand All @@ -34,11 +34,7 @@ pub trait StorageFactory: Send + Sync + 'static {
fn backend(&self) -> StorageBackend;

/// Returns the appropriate [`Storage`] object for the URI.
async fn resolve(
&self,
storage_config: &StorageConfig,
uri: &Uri,
) -> Result<Arc<dyn Storage>, StorageResolverError>;
async fn resolve(&self, uri: &Uri) -> Result<Arc<dyn Storage>, StorageResolverError>;
}

/// A storage factory for handling unsupported or unavailable storage backends.
Expand All @@ -61,11 +57,7 @@ impl StorageFactory for UnsupportedStorage {
self.backend
}

async fn resolve(
&self,
_storage_config: &StorageConfig,
_uri: &Uri,
) -> Result<Arc<dyn Storage>, StorageResolverError> {
async fn resolve(&self, _uri: &Uri) -> Result<Arc<dyn Storage>, StorageResolverError> {
Err(StorageResolverError::UnsupportedBackend(
self.message.to_string(),
))
Expand Down
Loading

0 comments on commit bfff12a

Please sign in to comment.