Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor storage factory initialization #3709

Merged
merged 8 commits into from
Aug 5, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 1 addition & 8 deletions quickwit/quickwit-cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,14 +442,7 @@ mod tests {
};
let storage_configs = StorageConfigs::new(vec![s3_storage_config.into()]);
let metastore_configs = MetastoreConfigs::default();
let (storage_resolver, _metastore_resolver) =
let (_storage_resolver, _metastore_resolver) =
get_resolvers(&storage_configs, &metastore_configs);
assert!(
storage_resolver
.storage_configs()
.find_s3()
.unwrap()
.force_path_style_access
);
}
}
4 changes: 1 addition & 3 deletions quickwit/quickwit-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,12 @@ pub use for_test::storage_for_test;
mod tests {
use std::str::FromStr;

use quickwit_config::FileStorageConfig;

use super::*;

#[tokio::test]
async fn test_load_file() {
let storage_resolver = StorageResolver::builder()
.register(LocalFileStorageFactory, FileStorageConfig::default().into())
.register(LocalFileStorageFactory)
.build()
.unwrap();
let expected_bytes = tokio::fs::read_to_string("Cargo.toml").await.unwrap();
Expand Down
19 changes: 5 additions & 14 deletions quickwit/quickwit-storage/src/local_file_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use futures::future::{BoxFuture, FutureExt};
use futures::StreamExt;
use quickwit_common::ignore_error_kind;
use quickwit_common::uri::Uri;
use quickwit_config::{StorageBackend, StorageConfig};
use quickwit_config::StorageBackend;
use tokio::fs;
use tokio::io::AsyncWriteExt;
use tracing::warn;
Expand Down Expand Up @@ -345,11 +345,7 @@ impl StorageFactory for LocalFileStorageFactory {
StorageBackend::File
}

async fn resolve(
&self,
_storage_config: &StorageConfig,
uri: &Uri,
) -> Result<Arc<dyn Storage>, StorageResolverError> {
async fn resolve(&self, uri: &Uri) -> Result<Arc<dyn Storage>, StorageResolverError> {
let storage = LocalFileStorage::from_uri(uri)?;
Ok(Arc::new(DebouncedStorage::new(storage)))
}
Expand All @@ -360,8 +356,6 @@ mod tests {

use std::str::FromStr;

use quickwit_config::FileStorageConfig;

use super::*;
use crate::test_suite::storage_test_suite;

Expand Down Expand Up @@ -395,25 +389,22 @@ mod tests {

#[tokio::test]
async fn test_local_file_storage_factory() -> anyhow::Result<()> {
let storage_config = FileStorageConfig::default().into();
let temp_dir = tempfile::tempdir()?;
let index_uri =
Uri::from_well_formed(format!("file://{}/foo/bar", temp_dir.path().display()));
let local_file_storage_factory = LocalFileStorageFactory::default();
let local_file_storage = local_file_storage_factory
.resolve(&storage_config, &index_uri)
.await?;
let local_file_storage = local_file_storage_factory.resolve(&index_uri).await?;
assert_eq!(local_file_storage.uri(), &index_uri);

let err = local_file_storage_factory
.resolve(&storage_config, &Uri::from_well_formed("s3://foo/bar"))
.resolve(&Uri::from_well_formed("s3://foo/bar"))
.await
.err()
.unwrap();
assert!(matches!(err, StorageResolverError::InvalidUri { .. }));

let err = local_file_storage_factory
.resolve(&storage_config, &Uri::from_well_formed("s3://"))
.resolve(&Uri::from_well_formed("s3://"))
.await
.err()
.unwrap();
Expand Down
22 changes: 13 additions & 9 deletions quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,24 +55,28 @@ use crate::{
};

/// Azure object storage resolver.
#[derive(Default)]
pub struct AzureBlobStorageFactory;
pub struct AzureBlobStorageFactory {
storage_config: StorageConfig,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use an AzureStorageConfig directly?

}

impl AzureBlobStorageFactory {
/// Create a new Azure blob storage factory
imotov marked this conversation as resolved.
Show resolved Hide resolved
pub fn new(storage_config: StorageConfig) -> Self {
Self { storage_config }
}
}

#[async_trait]
impl StorageFactory for AzureBlobStorageFactory {
fn backend(&self) -> StorageBackend {
StorageBackend::Azure
}

async fn resolve(
&self,
storage_config: &StorageConfig,
uri: &Uri,
) -> Result<Arc<dyn Storage>, StorageResolverError> {
let azure_storage_config = storage_config.as_azure().ok_or_else(|| {
async fn resolve(&self, uri: &Uri) -> Result<Arc<dyn Storage>, StorageResolverError> {
let azure_storage_config = self.storage_config.as_azure().ok_or_else(|| {
let message = format!(
"Expected Azure storage config, got `{:?}`.",
storage_config.backend()
self.storage_config.backend()
);
StorageResolverError::InvalidConfig(message)
})?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,28 @@ use crate::{
};

/// S3 compatible object storage resolver.
#[derive(Default)]
pub struct S3CompatibleObjectStorageFactory;
pub struct S3CompatibleObjectStorageFactory {
storage_config: StorageConfig,
}

impl S3CompatibleObjectStorageFactory {
/// Create a new S3 compatible storage factory
imotov marked this conversation as resolved.
Show resolved Hide resolved
pub fn new(storage_config: StorageConfig) -> Self {
Self { storage_config }
}
}

#[async_trait]
impl StorageFactory for S3CompatibleObjectStorageFactory {
fn backend(&self) -> StorageBackend {
StorageBackend::S3
}

async fn resolve(
&self,
storage_config: &StorageConfig,
uri: &Uri,
) -> Result<Arc<dyn Storage>, StorageResolverError> {
let s3_storage_config = storage_config.as_s3().ok_or_else(|| {
async fn resolve(&self, uri: &Uri) -> Result<Arc<dyn Storage>, StorageResolverError> {
let s3_storage_config = self.storage_config.as_s3().ok_or_else(|| {
let message = format!(
"Expected S3 storage config, got `{:?}`.",
storage_config.backend()
self.storage_config.backend()
);
StorageResolverError::InvalidConfig(message)
})?;
Expand Down
34 changes: 6 additions & 28 deletions quickwit/quickwit-storage/src/ram_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::sync::Arc;

use async_trait::async_trait;
use quickwit_common::uri::Uri;
use quickwit_config::{StorageBackend, StorageConfig};
use quickwit_config::StorageBackend;
use tokio::io::AsyncWriteExt;
use tokio::sync::RwLock;

Expand Down Expand Up @@ -194,11 +194,7 @@ impl StorageFactory for RamStorageFactory {
StorageBackend::Ram
}

async fn resolve(
&self,
_storage_config: &StorageConfig,
uri: &Uri,
) -> Result<Arc<dyn Storage>, StorageResolverError> {
async fn resolve(&self, uri: &Uri) -> Result<Arc<dyn Storage>, StorageResolverError> {
match uri.filepath() {
Some(prefix) if uri.protocol().is_ram() => Ok(add_prefix_to_storage(
self.ram_storage.clone(),
Expand All @@ -215,7 +211,6 @@ impl StorageFactory for RamStorageFactory {

#[cfg(test)]
mod tests {
use quickwit_config::RamStorageConfig;

use super::*;
use crate::test_suite::storage_test_suite;
Expand All @@ -230,34 +225,17 @@ mod tests {
#[tokio::test]
async fn test_ram_storage_factory() {
let ram_storage_factory = RamStorageFactory::default();
let storage_config = RamStorageConfig::default().into();
let ram_uri = Uri::from_well_formed("s3:///foo");
let err = ram_storage_factory
.resolve(&storage_config, &ram_uri)
.await
.err()
.unwrap();
let err = ram_storage_factory.resolve(&ram_uri).await.err().unwrap();
assert!(matches!(err, StorageResolverError::InvalidUri { .. }));

let data_uri = Uri::from_well_formed("ram:///data");
let data_storage = ram_storage_factory
.resolve(&storage_config, &data_uri)
.await
.ok()
.unwrap();
let data_storage = ram_storage_factory.resolve(&data_uri).await.ok().unwrap();
let home_uri = Uri::from_well_formed("ram:///home");
let home_storage = ram_storage_factory
.resolve(&storage_config, &home_uri)
.await
.ok()
.unwrap();
let home_storage = ram_storage_factory.resolve(&home_uri).await.ok().unwrap();
assert_ne!(data_storage.uri(), home_storage.uri());

let data_storage_two = ram_storage_factory
.resolve(&storage_config, &data_uri)
.await
.ok()
.unwrap();
let data_storage_two = ram_storage_factory.resolve(&data_uri).await.ok().unwrap();
assert_eq!(data_storage.uri(), data_storage_two.uri());
}

Expand Down
16 changes: 4 additions & 12 deletions quickwit/quickwit-storage/src/storage_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,20 @@ use std::sync::Arc;

use async_trait::async_trait;
use quickwit_common::uri::Uri;
use quickwit_config::{StorageBackend, StorageConfig};
use quickwit_config::StorageBackend;

use crate::{Storage, StorageResolverError};

/// A storage factory builds a [`Storage`] object for a target [`StorageBackend`] from a
/// [`StorageConfig`] and a [`Uri`].
/// [`Uri`].
#[cfg_attr(any(test, feature = "testsuite"), mockall::automock)]
#[async_trait]
pub trait StorageFactory: Send + Sync + 'static {
/// Returns the storage backend targeted by the factory.
fn backend(&self) -> StorageBackend;

/// Returns the appropriate [`Storage`] object for the URI.
async fn resolve(
&self,
storage_config: &StorageConfig,
uri: &Uri,
) -> Result<Arc<dyn Storage>, StorageResolverError>;
async fn resolve(&self, uri: &Uri) -> Result<Arc<dyn Storage>, StorageResolverError>;
}

/// A storage factory for handling unsupported or unavailable storage backends.
Expand All @@ -61,11 +57,7 @@ impl StorageFactory for UnsupportedStorage {
self.backend
}

async fn resolve(
&self,
_storage_config: &StorageConfig,
_uri: &Uri,
) -> Result<Arc<dyn Storage>, StorageResolverError> {
async fn resolve(&self, _uri: &Uri) -> Result<Arc<dyn Storage>, StorageResolverError> {
Err(StorageResolverError::UnsupportedBackend(
self.message.to_string(),
))
Expand Down
Loading
Loading