Skip to content

Commit

Permalink
cache uri as Uri
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Aug 21, 2023
1 parent 862b7d3 commit 95fb629
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 20 deletions.
16 changes: 13 additions & 3 deletions quickwit/quickwit-config/src/storage_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::ops::Deref;
use std::str::FromStr;
use std::{env, fmt};

use anyhow::ensure;
use byte_unit::Byte;
use itertools::Itertools;
use quickwit_common::uri::Uri;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, EnumMap};

Expand Down Expand Up @@ -406,22 +408,30 @@ pub struct RamStorageConfig;
#[serde(deny_unknown_fields)]
pub struct CacheStorageConfig {
#[serde(default)]
pub cache_uri: Option<String>,
cache_uri: Option<Uri>,
#[serde(default)]
pub max_cache_storage_disk_usage: Option<Byte>,
}

impl CacheStorageConfig {
pub fn cache_uri(&self) -> Option<String> {
pub fn cache_uri(&self) -> Option<Uri> {
// TODO do we really need an env variable?
// TODO shouldn't we have a reasonable default? (or even no config possible)
// TODO is this priority order good?
env::var("QW_CACHE_CACHE_URI")
.ok()
.map(|uri_str| {
Uri::from_str(&uri_str).expect("Invalid uri in QW_CACHE_CACHE_URI")
// TODO make sure the panic would happen at the startup of quickwit, or that we use
// errors.
})
.or_else(|| self.cache_uri.clone())
}

#[cfg(any(test, feature = "testsuite"))]
pub fn for_test() -> Self {
CacheStorageConfig {
cache_uri: Some("ram://cache".to_string()),
cache_uri: Some(Uri::for_test("ram://cache")),
max_cache_storage_disk_usage: None,
}
}
Expand Down
14 changes: 3 additions & 11 deletions quickwit/quickwit-storage/src/cache_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,17 +233,9 @@ impl StorageFactory for CacheStorageFactory {
) -> Result<Arc<dyn Storage>, StorageResolverError> {
if uri.protocol().is_cache() {
// TODO: Prevent stack overflow here if cache uri is also cache
let cache_uri = self
.storage_config
.cache_uri()
.ok_or_else(|| {
StorageResolverError::InvalidConfig("Expected cache uri in config.".to_string())
})?
.parse::<Uri>()
.map_err(|err| {
let message = format!("Cannot parse cache uri `{:?}`.", err.to_string());
StorageResolverError::InvalidConfig(message)
})?;
let cache_uri = self.storage_config.cache_uri().ok_or_else(|| {
StorageResolverError::InvalidConfig("Expected cache uri in config.".to_string())
})?;
let cache = storage_resolver.resolve(&cache_uri).await?;
let upstream_uri = uri
.scheme_specific_part()
Expand Down
10 changes: 4 additions & 6 deletions quickwit/quickwit-storage/src/cached_splits_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,7 @@ impl CachedSplitRegistry {
output_path: &Path,
) -> anyhow::Result<()> {
// TODO: Figure out an ealier way to handle these issues
if let Some(cache_uri_str) = self.inner.storage_config.cache_uri.clone() {
let cache_uri = cache_uri_str.parse()?;
if let Some(cache_uri) = self.inner.storage_config.cache_uri().clone() {
let storage = storage_resolver.resolve(storage_uri).await?;
let cache = storage_resolver.resolve(&cache_uri).await?;
storage.copy_to_storage(path, cache, output_path).await?;
Expand Down Expand Up @@ -219,8 +218,7 @@ impl CachedSplitRegistry {
) -> anyhow::Result<()> {
// TODO: This should've been handle earlier, but we need have a more graceful way of dealing
// with possible issues
if let Some(cache_uri_str) = self.inner.storage_config.cache_uri.clone() {
let cache_uri = cache_uri_str.parse()?;
if let Some(cache_uri) = self.inner.storage_config.cache_uri() {
let cache = storage_resolver.resolve(&cache_uri).await?;
cache.delete(path).await?;
}
Expand Down Expand Up @@ -311,13 +309,13 @@ mod tests {
let storage_resolver = StorageResolver::ram_for_test();
let config = CacheStorageConfig::for_test();
let counters = Arc::new(AtomicCacheStorageCounters::default());
let registry = CachedSplitRegistry::new(config.clone(), counters.clone());
let registry = CachedSplitRegistry::new(config.clone());
let storage = storage_resolver
.resolve(&Uri::for_test("ram://data"))
.await
.unwrap();
let cache = storage_resolver
.resolve(&Uri::for_test(&config.cache_uri.unwrap()))
.resolve(&config.cache_uri().unwrap())
.await
.unwrap();
let split_id = "abcd".to_string();
Expand Down

0 comments on commit 95fb629

Please sign in to comment.