Skip to content

Commit

Permalink
First step in refactoring of storage configuration
Browse files Browse the repository at this point in the history
In this step we just change the way CacheStorageService is getting
access to CachedSplitRegistry.
  • Loading branch information
imotov committed Aug 22, 2023
1 parent 03b4328 commit 664e339
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 84 deletions.
29 changes: 19 additions & 10 deletions quickwit/quickwit-cache-storage/src/cache_storage_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::fmt;
use anyhow::anyhow;
use async_trait::async_trait;
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Universe};
use quickwit_common::uri::Uri;
use quickwit_config::NodeConfig;
use quickwit_proto::cache_storage::{
CacheStorageServiceClient, NotifySplitsChangeRequest, NotifySplitsChangeResponse,
Expand All @@ -30,7 +31,7 @@ use quickwit_proto::cache_storage::{
use quickwit_proto::metastore::MetastoreError;
use quickwit_proto::{ServiceError, ServiceErrorCode};
use quickwit_storage::{
CacheStorageCounters, CacheStorageFactory, StorageError, StorageResolver, StorageResolverError,
CacheStorageCounters, CachedSplitRegistry, StorageError, StorageResolver, StorageResolverError,
};
use thiserror::Error;
use tracing::{debug, error, info};
Expand Down Expand Up @@ -62,19 +63,19 @@ impl ServiceError for CacheStorageServiceError {
pub struct CacheStorageService {
node_id: String,
storage_resolver: StorageResolver,
cache_storage_factory: CacheStorageFactory,
cached_split_registry: CachedSplitRegistry,
}

impl CacheStorageService {
pub async fn new(
node_id: String,
storage_resolver: StorageResolver,
) -> anyhow::Result<CacheStorageService> {
if let Some(cache_storage_factory) = storage_resolver.cache_storage_factory() {
if let Some(cached_split_registry) = storage_resolver.cached_split_registry() {
Ok(Self {
node_id,
storage_resolver,
cache_storage_factory,
cached_split_registry,
})
} else {
Err(anyhow!(CacheStorageServiceError::InvalidParams(
Expand All @@ -89,12 +90,20 @@ impl CacheStorageService {

pub async fn update_split_cache(
&self,
splits: Vec<SplitsChangeNotification>,
notifications: Vec<SplitsChangeNotification>,
) -> Result<(), ActorExitStatus> {
self.cache_storage_factory
.update_split_cache(&self.storage_resolver, splits)
.await
.map_err(|e| ActorExitStatus::from(anyhow!("Failed to update split cache: {:?}", e)))
let mut splits: Vec<(String, String, Uri)> = Vec::new();
for notification in notifications {
splits.push((
notification.split_id,
notification.index_id,
notification.storage_uri.parse()?,
));
}
self.cached_split_registry
.bulk_update(&self.storage_resolver, &splits)
.await;
Ok(())
}
}

Expand Down Expand Up @@ -146,7 +155,7 @@ impl Actor for CacheStorageService {
type ObservableState = CacheStorageCounters;

fn observable_state(&self) -> Self::ObservableState {
self.cache_storage_factory.counters()
self.cached_split_registry.counters()
}

async fn initialize(&mut self, ctx: &ActorContext<Self>) -> Result<(), ActorExitStatus> {
Expand Down
78 changes: 45 additions & 33 deletions quickwit/quickwit-storage/src/cache_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl CacheStorage {
}

fn counters(&self) -> &AtomicCacheStorageCounters {
self.cache_split_registry.counters()
self.cache_split_registry.atomic_counters()
}
}

Expand Down Expand Up @@ -182,28 +182,37 @@ impl AtomicCacheStorageCounters {
/// Storage resolver for [`CacheStorage`].
#[derive(Clone)]
pub struct CacheStorageFactory {
inner: Arc<InnerCacheStorageFactory>,
}

struct InnerCacheStorageFactory {
storage_config: CacheStorageConfig,
cache_split_registry: CachedSplitRegistry,
cached_split_registry: Option<CachedSplitRegistry>,
}

impl CacheStorageFactory {
/// Create a new storage factory
pub fn new(storage_config: CacheStorageConfig) -> Self {
// the split registry is only needed if we have cache configured on this node
let cached_split_registry = if storage_config.max_cache_storage_disk_usage.is_some() {
Some(CachedSplitRegistry::new(storage_config.clone()))
} else {
None
};
Self {
inner: Arc::new(InnerCacheStorageFactory {
storage_config: storage_config.clone(),
cache_split_registry: CachedSplitRegistry::new(storage_config),
}),
storage_config,
cached_split_registry,
}
}

/// Returns the cache storage stats
pub fn counters(&self) -> CacheStorageCounters {
self.inner.cache_split_registry.counters().as_counters()
if let Some(cached_split_registry) = &self.cached_split_registry {
cached_split_registry.counters()
} else {
CacheStorageCounters::default()
}
}

/// Returns the cached split registry or None on the nodes where cache is not configured
pub fn cached_split_registry(&self) -> Option<CachedSplitRegistry> {
self.cached_split_registry.clone()
}

/// Update all split caches on the node
Expand All @@ -212,18 +221,19 @@ impl CacheStorageFactory {
storage_resolver: &StorageResolver,
notifications: Vec<SplitsChangeNotification>,
) -> anyhow::Result<()> {
let mut splits: Vec<(String, String, Uri)> = Vec::new();
for notification in notifications {
splits.push((
notification.split_id,
notification.index_id,
notification.storage_uri.parse()?,
));
if let Some(cached_split_registry) = &self.cached_split_registry {
let mut splits: Vec<(String, String, Uri)> = Vec::new();
for notification in notifications {
splits.push((
notification.split_id,
notification.index_id,
notification.storage_uri.parse()?,
));
}
cached_split_registry
.bulk_update(storage_resolver, &splits)
.await;
}
self.inner
.cache_split_registry
.bulk_update(storage_resolver, &splits)
.await;
Ok(())
}
}
Expand All @@ -241,7 +251,7 @@ impl StorageFactory for CacheStorageFactory {
) -> Result<Arc<dyn Storage>, StorageResolverError> {
if uri.protocol().is_cache() {
// TODO: Prevent stack overflow here if cache uri is also cache
let cache_uri = self.inner.storage_config.cache_uri().ok_or_else(|| {
let cache_uri = self.storage_config.cache_uri().ok_or_else(|| {
StorageResolverError::InvalidConfig("Expected cache uri in config.".to_string())
})?;
let cache = storage_resolver.resolve(&cache_uri).await?;
Expand All @@ -259,22 +269,24 @@ impl StorageFactory for CacheStorageFactory {
StorageResolverError::InvalidConfig(message)
})?;
let storage = storage_resolver.resolve(&upstream_uri).await?;
let cache_storage = CacheStorage {
uri: uri.clone(),
storage,
cache,
cache_split_registry: self.inner.cache_split_registry.clone(),
let cache_storage = if let Some(cached_split_registry) = &self.cached_split_registry {
Arc::new(CacheStorage {
uri: uri.clone(),
storage,
cache,
cache_split_registry: cached_split_registry.clone(),
})
} else {
// We don't have cache configured on this node - we can just return the underlying
// storage
storage
};
Ok(Arc::new(cache_storage))
Ok(cache_storage)
} else {
let message = format!("URI `{uri}` is not a valid Cache URI.");
Err(StorageResolverError::InvalidUri(message))
}
}

fn as_cache_storage_factory(&self) -> Option<CacheStorageFactory> {
Some(self.clone())
}
}

#[cfg(test)]
Expand Down
26 changes: 17 additions & 9 deletions quickwit/quickwit-storage/src/cached_splits_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use tokio::sync::{RwLock, RwLockWriteGuard};
use tracing::error;

use crate::cache_storage::AtomicCacheStorageCounters;
use crate::{Storage, StorageResolver, StorageResult};
use crate::{CacheStorageCounters, Storage, StorageResolver, StorageResult};

enum SplitState {
Initializing,
Expand All @@ -45,6 +45,7 @@ struct SplitInfo {
state_lock: Arc<RwLock<SplitState>>,
}

/// Internal structure that maintains a list of cached splits
#[derive(Clone)]
pub struct CachedSplitRegistry {
inner: Arc<InnerCachedSplitRegistry>,
Expand All @@ -69,10 +70,15 @@ impl CachedSplitRegistry {
}
}

pub(crate) fn counters(&self) -> &AtomicCacheStorageCounters {
pub(crate) fn atomic_counters(&self) -> &AtomicCacheStorageCounters {
&self.inner.counters
}

/// Returns the stats
pub fn counters(&self) -> CacheStorageCounters {
self.inner.counters.as_counters()
}

// TODO why `Option`
async fn cache_storage(&self, storage_resolver: &StorageResolver) -> Option<Arc<dyn Storage>> {
self.inner
Expand All @@ -92,7 +98,7 @@ impl CachedSplitRegistry {
}

#[allow(dead_code)]
pub async fn load(
pub(crate) async fn load(
&self,
storage_resolver: &StorageResolver,
split_id: &str,
Expand Down Expand Up @@ -195,7 +201,7 @@ impl CachedSplitRegistry {
}

#[allow(dead_code)]
pub async fn delete(&self, storage_resolver: &StorageResolver, split_id: &String) {
pub(crate) async fn delete(&self, storage_resolver: &StorageResolver, split_id: &String) {
let splits_guard = self.inner.splits.write().await;
self.inner_delete(&splits_guard, storage_resolver, split_id)
.await;
Expand Down Expand Up @@ -245,6 +251,8 @@ impl CachedSplitRegistry {
Ok(())
}

/// Start an async process that updates the stored splits to match the list provided to the
/// method.
pub async fn bulk_update(
&self,
storage_resolver: &StorageResolver,
Expand Down Expand Up @@ -274,7 +282,7 @@ impl CachedSplitRegistry {
}
}

pub async fn get_slice(
pub(crate) async fn get_slice(
&self,
cache: Arc<dyn Storage>,
path: &Path,
Expand All @@ -294,7 +302,7 @@ impl CachedSplitRegistry {
None
}

pub async fn get_all(
pub(crate) async fn get_all(
&self,
cache: Arc<dyn Storage>,
path: &Path,
Expand Down Expand Up @@ -340,7 +348,7 @@ mod tests {
let split_id = "abcd".to_string();
let index_id = "my_index".to_string();
let path = PathBuf::new().join("abcd.split");
assert_eq!(registry.counters().as_counters().num_downloaded_splits, 0);
assert_eq!(registry.counters().num_downloaded_splits, 0);
storage
.put(&path, Box::new(b"abcdefg"[..].to_vec()))
.await
Expand Down Expand Up @@ -370,7 +378,7 @@ mod tests {
.await
.unwrap();

assert_eq!(registry.counters().as_counters().num_downloaded_splits, 1);
assert_eq!(registry.counters().num_downloaded_splits, 1);

registry.delete(&storage_resolver, &split_id).await;

Expand All @@ -394,7 +402,7 @@ mod tests {
.await
.is_err());

assert_eq!(registry.counters().as_counters().num_downloaded_splits, 0);
assert_eq!(registry.counters().num_downloaded_splits, 0);
}

async fn test_get_all(
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ pub use self::cache::{
wrap_storage_with_long_term_cache, ByteRangeCache, Cache, MemorySizedCache, QuickwitCache,
};
pub use self::cache_storage::{CacheStorageCounters, CacheStorageFactory};
pub use self::cached_splits_registry::CachedSplitRegistry;
pub use self::local_file_storage::{LocalFileStorage, LocalFileStorageFactory};
#[cfg(feature = "azure")]
pub use self::object_storage::{AzureBlobStorage, AzureBlobStorageFactory};
Expand Down
6 changes: 0 additions & 6 deletions quickwit/quickwit-storage/src/storage_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use async_trait::async_trait;
use quickwit_common::uri::Uri;
use quickwit_config::StorageBackend;

use crate::cache_storage::CacheStorageFactory;
use crate::{Storage, StorageResolver, StorageResolverError};

/// A storage factory builds a [`Storage`] object for a target [`StorageBackend`] from a
Expand All @@ -40,11 +39,6 @@ pub trait StorageFactory: Send + Sync + 'static {
storage_resolver: &StorageResolver,
uri: &Uri,
) -> Result<Arc<dyn Storage>, StorageResolverError>;

/// TODO: This is ugly. I will need to replace it when I refactor storage.
fn as_cache_storage_factory(&self) -> Option<CacheStorageFactory> {
None
}
}

/// A storage factory for handling unsupported or unavailable storage backends.
Expand Down
Loading

0 comments on commit 664e339

Please sign in to comment.