Skip to content

Commit

Permalink
Simplification
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Aug 21, 2023
1 parent 95fb629 commit 7ce7cb4
Showing 1 changed file with 23 additions and 6 deletions.
29 changes: 23 additions & 6 deletions quickwit/quickwit-storage/src/cached_splits_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ struct InnerCachedSplitRegistry {
storage_config: CacheStorageConfig,
counters: AtomicCacheStorageCounters,
splits: RwLock<HashMap<String, SplitInfo>>,
cache_storage: tokio::sync::OnceCell<Option<Arc<dyn Storage>>>,
}

impl CachedSplitRegistry {
Expand All @@ -63,6 +64,7 @@ impl CachedSplitRegistry {
storage_config,
counters: Default::default(),
splits: RwLock::new(HashMap::new()),
cache_storage: Default::default(),
}),
}
}
Expand All @@ -71,6 +73,23 @@ impl CachedSplitRegistry {
&self.inner.counters
}

// TODO why `Option`
async fn cache_storage(&self, storage_resolver: &StorageResolver) -> Option<Arc<dyn Storage>> {
self.inner.cache_storage.get_or_init(|| async {
let cache_uri = self.inner.storage_config.cache_uri().clone()?;
match storage_resolver.resolve(&cache_uri).await {
Ok(cache) => {
Some(cache)
}
Err(err) => {
error!("Failed to resolve cache storage. {:?}", err);
None
}
}
}).await.clone()

}

#[allow(dead_code)]
pub async fn load(
&self,
Expand Down Expand Up @@ -165,10 +184,9 @@ impl CachedSplitRegistry {
output_path: &Path,
) -> anyhow::Result<()> {
// TODO: Figure out an ealier way to handle these issues
if let Some(cache_uri) = self.inner.storage_config.cache_uri().clone() {
if let Some(cache_storage) = self.cache_storage(storage_resolver).await {
let storage = storage_resolver.resolve(storage_uri).await?;
let cache = storage_resolver.resolve(&cache_uri).await?;
storage.copy_to_storage(path, cache, output_path).await?;
storage.copy_to_storage(path, cache_storage, output_path).await?;
}
Ok(())
}
Expand Down Expand Up @@ -218,9 +236,8 @@ impl CachedSplitRegistry {
) -> anyhow::Result<()> {
// TODO: This should've been handle earlier, but we need have a more graceful way of dealing
// with possible issues
if let Some(cache_uri) = self.inner.storage_config.cache_uri() {
let cache = storage_resolver.resolve(&cache_uri).await?;
cache.delete(path).await?;
if let Some(cache_storage) = self.cache_storage(storage_resolver).await {
cache_storage.delete(path).await?;
}
Ok(())
}
Expand Down

0 comments on commit 7ce7cb4

Please sign in to comment.