diff --git a/quickwit/quickwit-config/src/node_config/mod.rs b/quickwit/quickwit-config/src/node_config/mod.rs index 4f788ea7c48..944fa09c67d 100644 --- a/quickwit/quickwit-config/src/node_config/mod.rs +++ b/quickwit/quickwit-config/src/node_config/mod.rs @@ -186,6 +186,8 @@ pub struct SplitCacheLimits { pub max_num_splits: NonZeroU32, #[serde(default = "SplitCacheLimits::default_num_concurrent_downloads")] pub num_concurrent_downloads: NonZeroU32, + #[serde(default = "SplitCacheLimits::default_max_file_descriptors")] + pub max_file_descriptors: NonZeroU32, } impl SplitCacheLimits { @@ -196,15 +198,9 @@ impl SplitCacheLimits { fn default_num_concurrent_downloads() -> NonZeroU32 { NonZeroU32::new(1).unwrap() } -} -impl Default for SplitCacheLimits { - fn default() -> SplitCacheLimits { - SplitCacheLimits { - max_num_bytes: ByteSize::gb(1), - max_num_splits: NonZeroU32::new(100).unwrap(), - num_concurrent_downloads: NonZeroU32::new(1).unwrap(), - } + fn default_max_file_descriptors() -> NonZeroU32 { + NonZeroU32::new(100).unwrap() } } @@ -240,6 +236,34 @@ impl Default for SearcherConfig { } } +impl SearcherConfig { + fn validate(&self) -> anyhow::Result<()> { + if let Some(split_cache_limits) = self.split_cache { + if self.max_num_concurrent_split_searches + > split_cache_limits.max_file_descriptors.get() as usize + { + anyhow::bail!( + "max_num_concurrent_split_searches ({}) must be lower or equal to \ + split_cache.max_file_descriptors ({})", + self.max_num_concurrent_split_searches, + split_cache_limits.max_file_descriptors + ); + } + if self.max_num_concurrent_split_streams + > split_cache_limits.max_file_descriptors.get() as usize + { + anyhow::bail!( + "max_num_concurrent_split_streams ({}) must be lower or equal to \ + split_cache.max_file_descriptors ({})", + self.max_num_concurrent_split_streams, + split_cache_limits.max_file_descriptors + ); + } + } + Ok(()) + } +} + #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] 
#[serde(deny_unknown_fields, default)] pub struct IngestApiConfig { diff --git a/quickwit/quickwit-config/src/node_config/serialize.rs b/quickwit/quickwit-config/src/node_config/serialize.rs index b315d949a51..200fe49c91b 100644 --- a/quickwit/quickwit-config/src/node_config/serialize.rs +++ b/quickwit/quickwit-config/src/node_config/serialize.rs @@ -288,6 +288,7 @@ impl NodeConfigBuilder { self.storage_configs.validate()?; self.storage_configs.apply_flavors(); self.ingest_api_config.validate()?; + self.searcher_config.validate()?; let node_config = NodeConfig { cluster_id: self.cluster_id.resolve(env_vars)?, diff --git a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs index 0662f3f5d73..9e600d8afdd 100644 --- a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs +++ b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs @@ -142,7 +142,6 @@ impl IngestController { warn!("failed to sync with ingester `{ingester}`: not available"); return wait_handle; }; - let mut retain_shards_req = RetainShardsRequest::default(); for (source_uid, shard_ids) in &*model.list_shards_for_node(ingester) { let shards_for_source = RetainShardsForSource { diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index 9b6e154366e..f2b29f386fe 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -459,14 +459,14 @@ pub async fn serve_quickwit( let split_cache_root_directory: PathBuf = node_config.data_dir_path.join("searcher-split-cache"); let split_cache_opt: Option> = - if let Some(split_cache_config) = node_config.searcher_config.split_cache { + if let Some(split_cache_limits) = node_config.searcher_config.split_cache { let split_cache = SplitCache::with_root_path( split_cache_root_directory, storage_resolver.clone(), - split_cache_config, + split_cache_limits, ) .context("failed to load searcher split cache")?; - 
Some(Arc::new(split_cache)) + Some(split_cache) } else { None }; diff --git a/quickwit/quickwit-storage/src/file_descriptor_cache.rs b/quickwit/quickwit-storage/src/file_descriptor_cache.rs new file mode 100644 index 00000000000..909455eac99 --- /dev/null +++ b/quickwit/quickwit-storage/src/file_descriptor_cache.rs @@ -0,0 +1,280 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::fs::File; +use std::io; +use std::num::{NonZeroU32, NonZeroUsize}; +use std::ops::Range; +use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex}; + +use tantivy::directory::OwnedBytes; +use tokio::sync::{OwnedSemaphorePermit, Semaphore}; +use ulid::Ulid; + +use crate::metrics::CacheMetrics; + +pub struct FileDescriptorCache { + fd_cache: Mutex>, + fd_semaphore: Arc, + fd_cache_metrics: CacheMetrics, +} + +#[derive(Clone)] +pub struct SplitFile(Arc); + +struct SplitFileInner { + num_bytes: u64, + // Order matters here. We want file to be dropped (closed) before the semaphore. 
+ file: File, + _fd_semaphore_guard: OwnedSemaphorePermit, +} + +fn get_split_file_path(root_path: &Path, split_id: Ulid) -> PathBuf { + let split_filename = quickwit_common::split_file(split_id); + root_path.join(split_filename) +} + +impl FileDescriptorCache { + /// Creates a new file descriptor cache. + /// `max_fd_limit` is the total number of file descriptors that can be open at the same time. + /// `fd_cache_capacity` is the number of file descriptors that can be cached. It is required to + /// be less than `max_fd_limit`. + /// + /// # Warning + /// + /// The file descriptor cache can be prone to deadlocks. + /// Currently the risk is only avoided due to the split search concurrency limit. + /// + /// When setting the two limit, ensure the max_fd_limit is higher than the split search + /// concurrency limit and that you have set some margin between the two, and also make sure + /// the `max_fd_limit` is sufficient to avoid deadlocks. + /// + /// TODO It would be good to refactor this to enforce this with a bit of a refactoring. + /// For instance, client could be forced to declare upfront the number of file descriptors they + /// will need. In Quickwit however, one task is hitting one split at a time, so the risk is + /// absent. 
+ fn new( + max_fd_limit: NonZeroU32, + fd_cache_capacity: NonZeroU32, + fd_cache_metrics: CacheMetrics, + ) -> FileDescriptorCache { + assert!(max_fd_limit.get() > fd_cache_capacity.get()); + let fd_cache = Mutex::new(lru::LruCache::new( + NonZeroUsize::new(fd_cache_capacity.get() as usize).unwrap(), + )); + let fd_semaphore = Arc::new(Semaphore::new(max_fd_limit.get() as usize)); + FileDescriptorCache { + fd_cache, + fd_semaphore, + fd_cache_metrics, + } + } + + pub fn with_fd_cache_capacity(fd_cache_capacity: NonZeroU32) -> FileDescriptorCache { + let max_fd_limit = (fd_cache_capacity.get() * 2) + .clamp(fd_cache_capacity.get() + 100, fd_cache_capacity.get() + 200); + Self::new( + NonZeroU32::new(max_fd_limit).unwrap(), + fd_cache_capacity, + crate::STORAGE_METRICS.fd_cache_metrics.clone(), + ) + } + + fn get_split_file(&self, split_id: Ulid) -> Option { + self.fd_cache.lock().unwrap().get(&split_id).cloned() + } + + fn put_split_file(&self, split_id: Ulid, split_file: SplitFile) { + let mut fd_cache_lock = self.fd_cache.lock().unwrap(); + fd_cache_lock.push(split_id, split_file); + self.fd_cache_metrics + .in_cache_count + .set(fd_cache_lock.len() as i64); + } + + /// Evicts the given list of split ids from the file descriptor cache. + /// This method does NOT remove the actual files. 
+ pub fn evict_split_files(&self, split_ids: &[Ulid]) { + let mut fd_cache_lock = self.fd_cache.lock().unwrap(); + for split_id in split_ids { + fd_cache_lock.pop(split_id); + } + self.fd_cache_metrics + .in_cache_count + .set(fd_cache_lock.len() as i64); + } + + pub async fn get_or_open_split_file( + &self, + root_path: &Path, + split_id: Ulid, + num_bytes: u64, + ) -> std::io::Result { + if let Some(split_file) = self.get_split_file(split_id) { + self.fd_cache_metrics.hits_num_items.inc(); + return Ok(split_file); + } else { + self.fd_cache_metrics.misses_num_items.inc(); + } + let split_path = get_split_file_path(root_path, split_id); + let fd_semaphore_guard = Semaphore::acquire_owned(self.fd_semaphore.clone()) + .await + .expect("fd_semaphore acquire failed. please report"); + let file: File = tokio::task::spawn_blocking(move || std::fs::File::open(split_path)) + .await + .map_err(|join_error| { + io::Error::new( + io::ErrorKind::Other, + format!("Failed to open file: {:?}", join_error), + ) + })??; + let split_file = SplitFile(Arc::new(SplitFileInner { + num_bytes, + file, + _fd_semaphore_guard: fd_semaphore_guard, + })); + self.put_split_file(split_id, split_file.clone()); + Ok(split_file) + } +} + +impl SplitFile { + pub async fn get_range(&self, range: Range) -> io::Result { + use std::os::unix::fs::FileExt; + let file = self.clone(); + let buf = tokio::task::spawn_blocking(move || { + let mut buf = vec![0u8; range.len()]; + file.0.file.read_exact_at(&mut buf, range.start as u64)?; + io::Result::Ok(buf) + }) + .await + .unwrap()?; + Ok(OwnedBytes::new(buf)) + } + + pub async fn get_all(&self) -> io::Result { + self.get_range(0..self.0.num_bytes as usize).await + } +} + +#[cfg(test)] +mod tests { + use std::num::NonZeroU32; + + use tokio::fs; + use ulid::Ulid; + + use super::FileDescriptorCache; + use crate::metrics::CacheMetrics; + + #[tokio::test] + async fn test_fd_cache_big_cache() { + let cache_metrics = CacheMetrics::for_component("fdtest"); + let 
fd_cache = FileDescriptorCache::new( + NonZeroU32::new(20).unwrap(), + NonZeroU32::new(10).unwrap(), + cache_metrics.clone(), + ); + let tempdir = tempfile::tempdir().unwrap(); + let split_ids: Vec = std::iter::repeat_with(Ulid::new).take(100).collect(); + for &split_id in &split_ids { + let split_filepath = super::get_split_file_path(tempdir.path(), split_id); + let content = split_id.to_string(); + assert_eq!(content.len(), 26); + fs::write(split_filepath, content.as_bytes()).await.unwrap(); + } + for &split_id in &split_ids[0..10] { + fd_cache + .get_or_open_split_file(tempdir.path(), split_id, 26) + .await + .unwrap(); + } + for &split_id in &split_ids[0..10] { + fd_cache + .get_or_open_split_file(tempdir.path(), split_id, 26) + .await + .unwrap(); + } + for &split_id in &split_ids[0..10] { + fd_cache + .get_or_open_split_file(tempdir.path(), split_id, 26) + .await + .unwrap(); + } + assert_eq!(cache_metrics.in_cache_count.get(), 10); + assert_eq!(cache_metrics.hits_num_items.get(), 20); + assert_eq!(cache_metrics.misses_num_items.get(), 10); + } + + // This mimicks Quickwit's workload where the fd cache is much smaller than the number of + // splits. Each search will read from the same split file, and the cache will help avoid + // opening the file several times. 
+ #[tokio::test] + async fn test_fd_cache_small_cache() { + let cache_metrics = CacheMetrics::for_component("fdtest2"); + let fd_cache = FileDescriptorCache::new( + NonZeroU32::new(20).unwrap(), + NonZeroU32::new(10).unwrap(), + cache_metrics.clone(), + ); + let tempdir = tempfile::tempdir().unwrap(); + let split_ids: Vec = std::iter::repeat_with(Ulid::new).take(100).collect(); + for &split_id in &split_ids { + let split_filepath = super::get_split_file_path(tempdir.path(), split_id); + let content = split_id.to_string(); + assert_eq!(content.len(), 26); + fs::write(split_filepath, content.as_bytes()).await.unwrap(); + } + for &split_id in &split_ids[0..100] { + for _ in 0..10 { + fd_cache + .get_or_open_split_file(tempdir.path(), split_id, 26) + .await + .unwrap(); + } + } + assert_eq!(cache_metrics.in_cache_count.get(), 10); + assert_eq!(cache_metrics.hits_num_items.get(), 100 * 9); + assert_eq!(cache_metrics.misses_num_items.get(), 100); + } + + #[tokio::test] + async fn test_split_file() { + let fd_cache = FileDescriptorCache::with_fd_cache_capacity(NonZeroU32::new(20).unwrap()); + let tempdir = tempfile::tempdir().unwrap(); + let split_id: Ulid = Ulid::new(); + let split_filepath = super::get_split_file_path(tempdir.path(), split_id); + let content = split_id.to_string(); + assert_eq!(content.len(), 26); + fs::write(split_filepath, content.as_bytes()).await.unwrap(); + let split_file = fd_cache + .get_or_open_split_file(tempdir.path(), split_id, 26) + .await + .unwrap(); + { + let bytes = split_file.get_all().await.unwrap(); + assert_eq!(bytes.as_slice(), content.as_bytes()); + } + { + let bytes = split_file.get_range(1..3).await.unwrap(); + assert_eq!(bytes.as_slice(), &content.as_bytes()[1..3]); + } + } +} diff --git a/quickwit/quickwit-storage/src/lib.rs b/quickwit/quickwit-storage/src/lib.rs index 5027be9cd76..f082a41b9b0 100644 --- a/quickwit/quickwit-storage/src/lib.rs +++ b/quickwit/quickwit-storage/src/lib.rs @@ -32,6 +32,7 @@ //! 
- The `BundleStorage` bundles together multiple files into a single file. mod cache; mod debouncer; +mod file_descriptor_cache; mod metrics; mod storage; pub use debouncer::AsyncDebouncer; diff --git a/quickwit/quickwit-storage/src/metrics.rs b/quickwit/quickwit-storage/src/metrics.rs index 2e56be0e4fe..e442f765f1a 100644 --- a/quickwit/quickwit-storage/src/metrics.rs +++ b/quickwit/quickwit-storage/src/metrics.rs @@ -26,6 +26,7 @@ use quickwit_common::metrics::{new_counter, new_gauge, IntCounter, IntGauge}; pub struct StorageMetrics { pub shortlived_cache: CacheMetrics, pub partial_request_cache: CacheMetrics, + pub fd_cache_metrics: CacheMetrics, pub fast_field_cache: CacheMetrics, pub split_footer_cache: CacheMetrics, pub searcher_split_cache: CacheMetrics, @@ -40,10 +41,10 @@ impl Default for StorageMetrics { fn default() -> Self { StorageMetrics { fast_field_cache: CacheMetrics::for_component("fastfields"), + fd_cache_metrics: CacheMetrics::for_component("fd"), shortlived_cache: CacheMetrics::for_component("shortlived"), partial_request_cache: CacheMetrics::for_component("partial_request"), searcher_split_cache: CacheMetrics::for_component("searcher_split"), - split_footer_cache: CacheMetrics::for_component("splitfooter"), object_storage_get_total: new_counter( "object_storage_gets_total", diff --git a/quickwit/quickwit-storage/src/split_cache/download_task.rs b/quickwit/quickwit-storage/src/split_cache/download_task.rs index 75065469697..ae32ba096aa 100644 --- a/quickwit/quickwit-storage/src/split_cache/download_task.rs +++ b/quickwit/quickwit-storage/src/split_cache/download_task.rs @@ -18,34 +18,15 @@ // along with this program. If not, see . 
use std::num::NonZeroU32; -use std::path::{Path, PathBuf}; -use std::sync::{Arc, Mutex}; +use std::path::Path; +use std::sync::Arc; use std::time::Duration; use quickwit_common::split_file; use tokio::sync::{OwnedSemaphorePermit, Semaphore}; -use tracing::{error, instrument}; -use ulid::Ulid; -use crate::split_cache::split_table::{CandidateSplit, DownloadOpportunity, SplitTable}; -use crate::StorageResolver; - -/// Removes the evicted split files from the file system. -/// This function just logs errors, and swallows them. -/// -/// At this point, the disk space is already accounted as released, -/// so the error could result in a "disk space leak". -#[instrument] -pub(crate) fn delete_evicted_splits(root_path: &Path, splits_to_delete: &[Ulid]) { - for &split_to_delete in splits_to_delete { - let split_file_path = root_path.join(split_file(split_to_delete)); - if let Err(_io_err) = std::fs::remove_file(&split_file_path) { - // This is an pretty critical error. The split size is not tracked anymore at this - // point. - error!(path=%split_file_path.display(), "failed to remove split file from cache directory. This is critical as the file is now not taken in account in the cache size limits"); - } - } -} +use crate::split_cache::split_table::{CandidateSplit, DownloadOpportunity}; +use crate::{SplitCache, StorageResolver}; async fn download_split( root_path: &Path, @@ -68,9 +49,8 @@ async fn download_split( async fn perform_eviction_and_download( download_opportunity: DownloadOpportunity, - root_path: PathBuf, + split_cache: Arc, storage_resolver: StorageResolver, - shared_split_table: Arc>, _download_permit: OwnedSemaphorePermit, ) -> anyhow::Result<()> { let DownloadOpportunity { @@ -79,20 +59,20 @@ async fn perform_eviction_and_download( } = download_opportunity; let split_ulid = split_to_download.split_ulid; // tokio io runs on `spawn_blocking` threads anyway. 
- let root_path_clone = root_path.clone(); + let split_cache_clone = split_cache.clone(); let _ = tokio::task::spawn_blocking(move || { - delete_evicted_splits(&root_path_clone, &splits_to_delete[..]); + split_cache_clone.evict(&splits_to_delete[..]); }) .await; - let num_bytes = download_split(&root_path, &split_to_download, storage_resolver).await?; - let mut shared_split_table_lock = shared_split_table.lock().unwrap(); + let num_bytes = + download_split(&split_cache.root_path, &split_to_download, storage_resolver).await?; + let mut shared_split_table_lock = split_cache.split_table.lock().unwrap(); shared_split_table_lock.register_as_downloaded(split_ulid, num_bytes); Ok(()) } pub(crate) fn spawn_download_task( - root_path: PathBuf, - shared_split_table: Arc>, + split_cache: Arc, storage_resolver: StorageResolver, num_concurrent_downloads: NonZeroU32, ) { @@ -100,16 +80,17 @@ pub(crate) fn spawn_download_task( tokio::task::spawn(async move { loop { let download_permit = Semaphore::acquire_owned(semaphore.clone()).await.unwrap(); - let download_opportunity_opt = shared_split_table + let download_opportunity_opt = split_cache + .split_table .lock() .unwrap() .find_download_opportunity(); if let Some(download_opportunity) = download_opportunity_opt { + let split_cache_clone = split_cache.clone(); tokio::task::spawn(perform_eviction_and_download( download_opportunity, - root_path.clone(), + split_cache_clone, storage_resolver.clone(), - shared_split_table.clone(), download_permit, )); } else { diff --git a/quickwit/quickwit-storage/src/split_cache/mod.rs b/quickwit/quickwit-storage/src/split_cache/mod.rs index 9b996d77ab7..d2fbacd9727 100644 --- a/quickwit/quickwit-storage/src/split_cache/mod.rs +++ b/quickwit/quickwit-storage/src/split_cache/mod.rs @@ -22,23 +22,24 @@ mod split_table; use std::collections::BTreeMap; use std::ffi::OsStr; -use std::fs::File; -use std::io::{self, Read, Seek, SeekFrom}; +use std::io; use std::ops::Range; use std::path::{Path, PathBuf}; 
use std::str::FromStr; use std::sync::{Arc, Mutex}; use async_trait::async_trait; +use quickwit_common::split_file; use quickwit_common::uri::Uri; use quickwit_config::SplitCacheLimits; use quickwit_proto::search::ReportSplit; use tantivy::directory::OwnedBytes; -use tracing::{error, info, warn}; +use tracing::{error, info, instrument, warn}; use ulid::Ulid; -use crate::split_cache::download_task::{delete_evicted_splits, spawn_download_task}; -use crate::split_cache::split_table::{SplitGuard, SplitTable}; +use crate::file_descriptor_cache::{FileDescriptorCache, SplitFile}; +use crate::split_cache::download_task::spawn_download_task; +use crate::split_cache::split_table::SplitTable; use crate::{wrap_storage_with_cache, Storage, StorageCache}; /// On disk Cache of splits for searchers. @@ -50,7 +51,8 @@ pub struct SplitCache { root_path: PathBuf, // In memory structure, listing the splits we know about regardless // of whether they are in cache, being downloaded, or just available for download. - split_table: Arc>, + split_table: Mutex, + fd_cache: FileDescriptorCache, } impl SplitCache { @@ -60,7 +62,7 @@ impl SplitCache { root_path: PathBuf, storage_resolver: crate::StorageResolver, limits: SplitCacheLimits, - ) -> io::Result { + ) -> io::Result> { std::fs::create_dir_all(&root_path)?; let mut existing_splits: BTreeMap = Default::default(); for dir_entry_res in std::fs::read_dir(&root_path)? { @@ -97,28 +99,35 @@ impl SplitCache { let mut split_table = SplitTable::with_limits_and_existing_splits(limits, existing_splits); // In case of a setting change, it could be useful to evict some splits on startup. 
- let splits_to_remove_opt = split_table.make_room_for_split_if_necessary(u64::MAX); - let root_path_clone = root_path.clone(); - if let Some(splits_to_remove) = splits_to_remove_opt { + let splits_to_remove_res = split_table.make_room_for_split_if_necessary(u64::MAX); + if let Ok(splits_to_remove) = splits_to_remove_res { info!( num_splits = splits_to_remove.len(), "Evicting splits from the searcher cache. Has the node configuration changed?" ); - delete_evicted_splits(&root_path_clone, &splits_to_remove[..]); + delete_evicted_splits(&root_path, &splits_to_remove[..]); } - let split_table_arc = Arc::new(Mutex::new(split_table)); + let fd_cache = FileDescriptorCache::with_fd_cache_capacity(limits.max_file_descriptors); + let split_cache = Arc::new(SplitCache { + root_path, + split_table: Mutex::new(split_table), + fd_cache, + }); spawn_download_task( - root_path.clone(), - split_table_arc.clone(), + split_cache.clone(), storage_resolver, limits.num_concurrent_downloads, ); - Ok(SplitCache { - root_path, - split_table: split_table_arc, - }) + Ok(split_cache) + } + + /// Remove splits from both the fd cache and the split cache. + /// This method does NOT update the split table. + pub(crate) fn evict(&self, splits_to_evict: &[Ulid]) { + self.fd_cache.evict_split_files(splits_to_evict); + delete_evicted_splits(&self.root_path, splits_to_evict); } /// Wraps a storage with our split cache. @@ -146,34 +155,39 @@ impl SplitCache { } } - fn cached_split_filepath(&self, split_id: Ulid) -> PathBuf { - let split_filename = quickwit_common::split_file(split_id); - self.root_path.join(split_filename) - } - // Returns a split guard object. As long as it is not dropped, the // split won't be evinced from the cache. 
- fn get_split_guard(&self, split_id: Ulid, storage_uri: &Uri) -> Option { - let split_guard = self + async fn get_split_file(&self, split_id: Ulid, storage_uri: &Uri) -> Option { + // We touch before even checking the fd cache in order to update the file's last access time + // for the file cache. + let num_bytes_opt: Option = self .split_table .lock() .unwrap() - .get_split_guard(split_id, storage_uri)?; - Some(SplitFilepath { - _split_guard: split_guard, - cached_split_file_path: self.cached_split_filepath(split_id), - }) - } -} + .touch(split_id, storage_uri); -pub struct SplitFilepath { - _split_guard: SplitGuard, - cached_split_file_path: PathBuf, + let num_bytes = num_bytes_opt?; + self.fd_cache + .get_or_open_split_file(&self.root_path, split_id, num_bytes) + .await + .ok() + } } -impl AsRef for SplitFilepath { - fn as_ref(&self) -> &Path { - &self.cached_split_file_path +/// Removes the evicted split files from the file system. +/// This function just logs errors, and swallows them. +/// +/// At this point, the disk space is already accounted as released, +/// so the error could result in a "disk space leak". +#[instrument] +fn delete_evicted_splits(root_path: &Path, splits_to_delete: &[Ulid]) { + for &split_to_delete in splits_to_delete { + let split_file_path = root_path.join(split_file(split_to_delete)); + if let Err(_io_err) = std::fs::remove_file(&split_file_path) { + // This is an pretty critical error. The split size is not tracked anymore at this + // point. + error!(path=%split_file_path.display(), "failed to remove split file from cache directory. 
This is critical as the file is now not taken in account in the cache size limits"); + } } } @@ -191,41 +205,22 @@ struct SplitCacheBackingStorage { impl SplitCacheBackingStorage { async fn get_impl(&self, path: &Path, byte_range: Range) -> Option { let split_id = split_id_from_path(path)?; - let split_guard = self + let split_file: SplitFile = self .split_cache - .get_split_guard(split_id, &self.storage_root_uri)?; - // TODO touch file in cache. + .get_split_file(split_id, &self.storage_root_uri) + .await?; // We don't use async file io here because it spawn blocks anyway, and it feels dumb to // spawn block 3 times in a row. - tokio::task::spawn_blocking(move || { - let mut file = File::open(&split_guard).ok()?; - file.seek(SeekFrom::Start(byte_range.start as u64)).ok()?; - let mut buf = Vec::with_capacity(byte_range.len()); - file.take(byte_range.len() as u64) - .read_to_end(&mut buf) - .ok()?; - Some(OwnedBytes::new(buf)) - }) - .await - // TODO Remove file from cache if io error? - .ok()? + split_file.get_range(byte_range).await.ok() } async fn get_all_impl(&self, path: &Path) -> Option { let split_id = split_id_from_path(path)?; - let split_guard = self + let split_file = self .split_cache - .get_split_guard(split_id, &self.storage_root_uri)?; - // We don't use async file io here because it spawn blocks anyway, and it feels dumb to - // spawn block 3 times in a row. - tokio::task::spawn_blocking(move || { - let mut file = File::open(split_guard).ok()?; - let mut buf = Vec::new(); - file.read_to_end(&mut buf).ok()?; - Some(OwnedBytes::new(buf)) - }) - .await - .ok()? 
+ .get_split_file(split_id, &self.storage_root_uri) + .await?; + split_file.get_all().await.ok() } fn record_hit_metrics(&self, result_opt: Option<&OwnedBytes>) { diff --git a/quickwit/quickwit-storage/src/split_cache/split_table.rs b/quickwit/quickwit-storage/src/split_cache/split_table.rs index f99e6def63d..86a63e1fbce 100644 --- a/quickwit/quickwit-storage/src/split_cache/split_table.rs +++ b/quickwit/quickwit-storage/src/split_cache/split_table.rs @@ -148,23 +148,7 @@ fn compute_timestamp(start: Instant) -> LastAccessDate { start.elapsed().as_micros() as u64 } -// TODO improve SplitGuard with Atomic -// Right only touch is helping. -pub(super) struct SplitGuard; - impl SplitTable { - pub(super) fn get_split_guard( - &mut self, - split_ulid: Ulid, - storage_uri: &Uri, - ) -> Option { - if let Status::OnDisk { .. } = self.touch(split_ulid, storage_uri) { - Some(SplitGuard) - } else { - None - } - } - fn remove(&mut self, split_ulid: Ulid) -> Option { let split_info = self.split_to_status.remove(&split_ulid)?; let split_queue: &mut BTreeSet = match split_info.status { @@ -248,9 +232,15 @@ impl SplitTable { assert!(split_ulid_was_absent); } - fn touch(&mut self, split_ulid: Ulid, storage_uri: &Uri) -> Status { + /// Touch the file, updating its last access time, possibly extending its life in the + /// cache (if in cache). + /// + /// If the file is already on the disk cache, return `Some(num_bytes)`. + /// If the file is not in cache, return `None`, and register the file in the candidate for + /// download list. 
pub fn touch(&mut self, split_ulid: Ulid, storage_uri: &Uri) -> Option<u64> {
NonZeroU32::new(1).unwrap(), + max_file_descriptors: NonZeroU32::new(100).unwrap(), }, Default::default(), ); @@ -493,6 +491,7 @@ mod tests { max_num_bytes: ByteSize::kb(1), max_num_splits: NonZeroU32::new(1).unwrap(), num_concurrent_downloads: NonZeroU32::new(1).unwrap(), + max_file_descriptors: NonZeroU32::new(100).unwrap(), }, Default::default(), ); @@ -501,8 +500,8 @@ mod tests { let ulid2 = ulids[1]; split_table.report(ulid1, Uri::for_test(TEST_STORAGE_URI)); split_table.report(ulid2, Uri::for_test(TEST_STORAGE_URI)); - let split_guard_opt = split_table.get_split_guard(ulid1, &Uri::for_test("s3://test1/")); - assert!(split_guard_opt.is_none()); + let num_bytes_opt = split_table.touch(ulid1, &Uri::for_test("s3://test1/")); + assert!(num_bytes_opt.is_none()); let candidate = split_table.best_candidate().unwrap(); assert_eq!(candidate.split_ulid, ulid1); } @@ -514,6 +513,7 @@ mod tests { max_num_bytes: ByteSize::kb(1), max_num_splits: NonZeroU32::new(1).unwrap(), num_concurrent_downloads: NonZeroU32::new(1).unwrap(), + max_file_descriptors: NonZeroU32::new(100).unwrap(), }, Default::default(), ); @@ -525,7 +525,10 @@ mod tests { assert!(split_table.start_download(ulid1).is_none()); split_table.register_as_downloaded(ulid1, 10_000_000); assert_eq!(split_table.num_bytes(), 10_000_000); - split_table.get_split_guard(ulid1, &Uri::for_test(TEST_STORAGE_URI)); + assert_eq!( + split_table.touch(ulid1, &Uri::for_test(TEST_STORAGE_URI)), + Some(10_000_000) + ); let ulid2 = Ulid::new(); split_table.report(ulid2, Uri::for_test("s3://test`/")); let download = split_table.start_download(ulid2); @@ -543,6 +546,7 @@ mod tests { max_num_bytes: ByteSize::mb(1), max_num_splits: NonZeroU32::new(30).unwrap(), num_concurrent_downloads: NonZeroU32::new(1).unwrap(), + max_file_descriptors: NonZeroU32::new(100).unwrap(), }, Default::default(), ); @@ -580,6 +584,7 @@ mod tests { max_num_bytes: ByteSize::mb(10), max_num_splits: NonZeroU32::new(5).unwrap(), num_concurrent_downloads: 
NonZeroU32::new(1).unwrap(), + max_file_descriptors: NonZeroU32::new(100).unwrap(), }, Default::default(), ); @@ -614,6 +619,7 @@ mod tests { max_num_bytes: ByteSize::mb(10), max_num_splits: NonZeroU32::new(5).unwrap(), num_concurrent_downloads: NonZeroU32::new(1).unwrap(), + max_file_descriptors: NonZeroU32::new(100).unwrap(), }, Default::default(), ); @@ -643,6 +649,7 @@ mod tests { max_num_bytes: ByteSize::mb(10), max_num_splits: NonZeroU32::new(5).unwrap(), num_concurrent_downloads: NonZeroU32::new(1).unwrap(), + max_file_descriptors: NonZeroU32::new(100).unwrap(), }, Default::default(), );