diff --git a/.gitignore b/.gitignore index 912a20bd8e0..29c4c486604 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ **/flamegraph.svg local/** quickwit/quickwit-ui/package-lock.json +**/.DS_Store # Remove Cargo.lock from gitignore if creating an executable, leave it for libraries diff --git a/docs/internals/searcher-split-cache.md b/docs/internals/searcher-split-cache.md new file mode 100644 index 00000000000..b22190efd7a --- /dev/null +++ b/docs/internals/searcher-split-cache.md @@ -0,0 +1,24 @@ + +# Searcher split cache + +Quickwit includes a split cache. It can be useful for specific workloads: +- to improve performance +- to reduce the cost associated with GET requests. + +The split cache stores entire split files on disk. +It works under the following configurable constraints: +- number of concurrent downloads +- amount of disk space +- number of on-disk files. + +Searchers get tipped by indexers about the existence of splits (for which they have the best affinity). +They might also learn about split existence upon read requests. + +The searcher is then in charge of maintaining an in-memory data structure with a bounded list of splits it knows about and their score. +The current strategy for admission/eviction is a simple LRU logic. + +If the most recently accessed split is not already in cache, we consider downloading it. +If the limits have been reached, we only proceed to eviction if one of the splits currently +in cache has been less recently accessed. 
+ + diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 4eabf46cb23..3b6410b92f8 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -69,9 +69,9 @@ dependencies = [ [[package]] name = "aho-corasick" -version = "1.0.5" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c378d78423fdad8089616f827526ee33c19f2fddbd5de1629152c9593ba4783" +checksum = "0f2135563fb5c609d2b2b87c1e8ce7bc41b0b45430fa9661f457981503dd5bf0" dependencies = [ "memchr", ] @@ -290,7 +290,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.34", + "syn 2.0.37", ] [[package]] @@ -301,7 +301,7 @@ checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.34", + "syn 2.0.37", ] [[package]] @@ -1275,18 +1275,18 @@ dependencies = [ [[package]] name = "clap" -version = "4.4.3" +version = "4.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84ed82781cea27b43c9b106a979fe450a13a31aab0500595fb3fc06616de08e6" +checksum = "b1d7b8d5ec32af0fadc644bf1fd509a688c2103b185644bb1e29d164e0703136" dependencies = [ "clap_builder", ] [[package]] name = "clap_builder" -version = "4.4.2" +version = "4.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2bb9faaa7c2ef94b2743a21f5a29e6f0010dff4caa69ac8e9d6cf8b6fa74da08" +checksum = "5179bb514e4d7c2051749d8fcefa2ed6d06a9f4e6d69faf3805f5d80b8cf8d56" dependencies = [ "anstream", "anstyle", @@ -1669,7 +1669,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn 2.0.34", + "syn 2.0.37", ] [[package]] @@ -1691,7 +1691,7 @@ checksum = "836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5" dependencies = [ "darling_core 0.20.3", "quote", - "syn 2.0.34", + "syn 2.0.37", ] [[package]] @@ -1901,9 +1901,9 @@ dependencies = [ [[package]] name = "dyn-clone" -version = "1.0.13" +version = "1.0.14" 
source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbfc4744c1b8f2a09adc0e55242f60b1af195d88596bd8700be74418c056c555" +checksum = "23d2f3407d9a573d666de4b5bdf10569d73ca9478087346697dcbae6244bfbcd" [[package]] name = "either" @@ -1916,9 +1916,9 @@ dependencies = [ [[package]] name = "elasticsearch-dsl" -version = "0.4.18" +version = "0.4.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2769dbd50c1e0d74c0685a8b41f2dcc9cb0451fd27b095cab3d2c3301ced4ba8" +checksum = "7066057f05cce0d1c6c7cdbd1e99a83f903bad35faf070cd86e34aa5bc504feb" dependencies = [ "chrono", "num-traits", @@ -2040,7 +2040,7 @@ checksum = "eecf8589574ce9b895052fa12d69af7a233f99e6107f5cb8dd1044f2a17bfdcb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.34", + "syn 2.0.37", ] [[package]] @@ -2343,7 +2343,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 2.0.34", + "syn 2.0.37", ] [[package]] @@ -2718,9 +2718,9 @@ dependencies = [ [[package]] name = "hermit-abi" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "443144c8cdadd93ebf52ddb4056d257f5b52c04d3c804e657d19eb73fc33668b" +checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7" [[package]] name = "hex" @@ -3005,9 +3005,9 @@ dependencies = [ [[package]] name = "indoc" -version = "2.0.3" +version = "2.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c785eefb63ebd0e33416dfcb8d6da0bf27ce752843a45632a67bf10d4d4b5c4" +checksum = "1e186cfbae8084e513daff4240b4797e342f988cecda4fb6c939150f96315fd8" [[package]] name = "infer" @@ -4028,7 +4028,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.34", + "syn 2.0.37", ] [[package]] @@ -4039,9 +4039,9 @@ checksum = 
"ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "openssl-src" -version = "300.1.3+3.1.2" +version = "300.1.5+3.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd2c101a165fff9935e34def4669595ab1c7847943c42be86e21503e482be107" +checksum = "559068e4c12950d7dcaa1857a61725c0d38d4fc03ff8e070ab31a75d6e316491" dependencies = [ "cc", ] @@ -4188,7 +4188,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.34", + "syn 2.0.37", ] [[package]] @@ -4400,7 +4400,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", - "syn 2.0.34", + "syn 2.0.37", ] [[package]] @@ -4605,14 +4605,14 @@ dependencies = [ [[package]] name = "predicates" -version = "3.0.3" +version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09963355b9f467184c04017ced4a2ba2d75cbcb4e7462690d388233253d4b1a9" +checksum = "6dfc28575c2e3f19cb3c73b93af36460ae898d426eba6fc15b9bd2a5220758a0" dependencies = [ "anstyle", "difflib", "float-cmp", - "itertools 0.10.5", + "itertools 0.11.0", "normalize-line-endings", "predicates-core", "regex", @@ -4661,7 +4661,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae005bd773ab59b4725093fd7df83fd7892f7d8eafb48dbd7de6e024e4215f9d" dependencies = [ "proc-macro2", - "syn 2.0.34", + "syn 2.0.37", ] [[package]] @@ -4963,7 +4963,7 @@ dependencies = [ "openssl-probe", "opentelemetry", "opentelemetry-otlp", - "predicates 3.0.3", + "predicates 3.0.4", "quickwit-actors", "quickwit-cluster", "quickwit-common", @@ -5037,7 +5037,7 @@ dependencies = [ "prost-build", "quote", "serde", - "syn 2.0.34", + "syn 2.0.37", "tonic-build", ] @@ -5479,7 +5479,7 @@ dependencies = [ "proc-macro2", "quickwit-macros-impl", "quote", - "syn 2.0.34", + "syn 2.0.37", ] [[package]] @@ -5489,7 +5489,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.34", + "syn 2.0.37", 
] [[package]] @@ -5765,6 +5765,7 @@ dependencies = [ "azure_storage", "azure_storage_blobs", "base64 0.21.4", + "byte-unit", "bytes", "fnv", "futures", @@ -5777,6 +5778,7 @@ dependencies = [ "quickwit-aws", "quickwit-common", "quickwit-config", + "quickwit-proto", "rand 0.8.5", "regex", "serde", @@ -5789,6 +5791,7 @@ dependencies = [ "tokio-util", "tracing", "tracing-subscriber", + "ulid", ] [[package]] @@ -6211,7 +6214,7 @@ dependencies = [ "proc-macro2", "quote", "rust-embed-utils", - "syn 2.0.34", + "syn 2.0.37", "walkdir", ] @@ -6502,7 +6505,7 @@ checksum = "389894603bd18c46fa56231694f8d827779c0951a667087194cf9de94ed24682" dependencies = [ "proc-macro2", "quote", - "syn 2.0.34", + "syn 2.0.37", ] [[package]] @@ -6629,7 +6632,7 @@ dependencies = [ "darling 0.20.3", "proc-macro2", "quote", - "syn 2.0.34", + "syn 2.0.37", ] [[package]] @@ -7148,9 +7151,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.34" +version = "2.0.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "88ec6cdb6a4c16306eccf52ccd8d492e4ab64705a15a5016acb205251001bf72" +checksum = "7303ef2c05cd654186cb250d29049a24840ca25d2747c25c0381c8d9e2f582e8" dependencies = [ "proc-macro2", "quote", @@ -7406,7 +7409,7 @@ checksum = "49922ecae66cc8a249b77e68d1d0623c1b2c514f0060c27cdc68bd62a1219d35" dependencies = [ "proc-macro2", "quote", - "syn 2.0.34", + "syn 2.0.37", ] [[package]] @@ -7568,7 +7571,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.34", + "syn 2.0.37", ] [[package]] @@ -7850,7 +7853,7 @@ checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" dependencies = [ "proc-macro2", "quote", - "syn 2.0.34", + "syn 2.0.37", ] [[package]] @@ -7978,7 +7981,7 @@ checksum = "bfc13d450dc4a695200da3074dacf43d449b968baee95e341920e47f61a3b40f" dependencies = [ "proc-macro2", "quote", - "syn 2.0.34", + "syn 2.0.37", ] [[package]] @@ -8049,9 +8052,9 @@ checksum = 
"1dd624098567895118886609431a7c3b8f516e41d30e0643f03d94592a147e36" [[package]] name = "unicode-width" -version = "0.1.10" +version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0edd1e5b14653f783770bce4a4dabb4a5108a5370a5f5d8cfe8710c361f6c8b" +checksum = "e51733f11c9c4f72aa0c160008246859e340b00807569a0da0e7a1079b27ba85" [[package]] name = "unicode-xid" @@ -8150,7 +8153,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.34", + "syn 2.0.37", ] [[package]] @@ -8459,7 +8462,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.34", + "syn 2.0.37", "wasm-bindgen-shared", ] @@ -8493,7 +8496,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.34", + "syn 2.0.37", "wasm-bindgen-backend", "wasm-bindgen-shared", ] diff --git a/quickwit/quickwit-cli/src/tool.rs b/quickwit/quickwit-cli/src/tool.rs index c0061c6d8d1..dca008b9867 100644 --- a/quickwit/quickwit-cli/src/tool.rs +++ b/quickwit/quickwit-cli/src/tool.rs @@ -34,6 +34,7 @@ use colored::{ColoredString, Colorize}; use humantime::format_duration; use quickwit_actors::{ActorExitStatus, ActorHandle, Universe}; use quickwit_cluster::{Cluster, ClusterMember}; +use quickwit_common::pubsub::EventBroker; use quickwit_common::runtimes::RuntimesConfig; use quickwit_common::uri::Uri; use quickwit_config::service::QuickwitService; @@ -460,6 +461,7 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result< None, IngesterPool::default(), storage_resolver, + EventBroker::default(), ) .await?; let universe = Universe::new(); @@ -590,6 +592,7 @@ pub async fn merge_cli(args: MergeArgs) -> anyhow::Result<()> { None, IngesterPool::default(), storage_resolver, + EventBroker::default(), ) .await?; let (indexing_service_mailbox, indexing_service_handle) = diff --git a/quickwit/quickwit-common/src/fs.rs b/quickwit/quickwit-common/src/fs.rs index 
39975b3869e..8b22a662387 100644 --- a/quickwit/quickwit-common/src/fs.rs +++ b/quickwit/quickwit-common/src/fs.rs @@ -36,7 +36,7 @@ pub async fn empty_dir>(path: P) -> anyhow::Result<()> { /// Helper function to get the cache path. pub fn get_cache_directory_path(data_dir_path: &Path) -> PathBuf { - data_dir_path.join("cache").join("splits") + data_dir_path.join("indexer-split-cache").join("splits") } #[cfg(test)] diff --git a/quickwit/quickwit-common/src/lib.rs b/quickwit/quickwit-common/src/lib.rs index df054c685a3..2bc218cc0cf 100644 --- a/quickwit/quickwit-common/src/lib.rs +++ b/quickwit/quickwit-common/src/lib.rs @@ -46,7 +46,7 @@ pub mod type_map; pub mod uri; use std::env; -use std::fmt::Debug; +use std::fmt::{Debug, Display}; use std::ops::{Range, RangeInclusive}; use std::str::FromStr; @@ -72,7 +72,7 @@ pub fn setup_logging_for_tests() { let _ = env_logger::builder().format_timestamp(None).try_init(); } -pub fn split_file(split_id: &str) -> String { +pub fn split_file(split_id: impl Display) -> String { format!("{split_id}.split") } diff --git a/quickwit/quickwit-common/src/pubsub.rs b/quickwit/quickwit-common/src/pubsub.rs index c1736cf482b..9a187f64409 100644 --- a/quickwit/quickwit-common/src/pubsub.rs +++ b/quickwit/quickwit-common/src/pubsub.rs @@ -39,6 +39,14 @@ dyn_clone::clone_trait_object!( EventSubscriber); type EventSubscriptions = HashMap>; +/// The event broker makes it possible to +/// - emit specific local events +/// - subscribe to these local events +/// The event broker is not distributed in itself. Only events emitted +/// locally will be received by the subscribers. +/// +/// It is however possible to locally subscribe a handler to a kind of event, +/// that will in turn run a RPC to other nodes. 
#[derive(Debug, Clone, Default)] pub struct EventBroker { inner: Arc, diff --git a/quickwit/quickwit-common/src/rendezvous_hasher.rs b/quickwit/quickwit-common/src/rendezvous_hasher.rs index af284f56cfe..71e04593074 100644 --- a/quickwit/quickwit-common/src/rendezvous_hasher.rs +++ b/quickwit/quickwit-common/src/rendezvous_hasher.rs @@ -24,7 +24,7 @@ use std::hash::{Hash, Hasher}; /// Computes the affinity of a node for a given `key`. /// A higher value means a higher affinity. /// This is the `rendezvous hash`. -fn node_affinity(node: T, key: &U) -> u64 { +pub fn node_affinity(node: T, key: &U) -> u64 { let mut state = DefaultHasher::new(); key.hash(&mut state); node.hash(&mut state); diff --git a/quickwit/quickwit-config/src/lib.rs b/quickwit/quickwit-config/src/lib.rs index 4dc582de22a..6f5e0c77cca 100644 --- a/quickwit/quickwit-config/src/lib.rs +++ b/quickwit/quickwit-config/src/lib.rs @@ -64,7 +64,7 @@ pub use crate::metastore_config::{ MetastoreBackend, MetastoreConfig, MetastoreConfigs, PostgresMetastoreConfig, }; pub use crate::node_config::{ - IndexerConfig, IngestApiConfig, JaegerConfig, NodeConfig, SearcherConfig, + IndexerConfig, IngestApiConfig, JaegerConfig, NodeConfig, SearcherConfig, SplitCacheLimits, DEFAULT_QW_CONFIG_PATH, }; use crate::source_config::serialize::{SourceConfigV0_6, VersionedSourceConfig}; diff --git a/quickwit/quickwit-config/src/node_config/mod.rs b/quickwit/quickwit-config/src/node_config/mod.rs index edd8fe33b87..3c54d3970f5 100644 --- a/quickwit/quickwit-config/src/node_config/mod.rs +++ b/quickwit/quickwit-config/src/node_config/mod.rs @@ -22,7 +22,7 @@ mod serialize; use std::collections::{HashMap, HashSet}; use std::env; use std::net::SocketAddr; -use std::num::{NonZeroU64, NonZeroUsize}; +use std::num::{NonZeroU32, NonZeroU64, NonZeroUsize}; use std::path::PathBuf; use std::time::Duration; @@ -110,6 +110,36 @@ impl Default for IndexerConfig { } } +#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)] 
+#[serde(deny_unknown_fields)] +pub struct SplitCacheLimits { + pub max_num_bytes: Byte, + #[serde(default = "SplitCacheLimits::default_max_num_splits")] + pub max_num_splits: NonZeroU32, + #[serde(default = "SplitCacheLimits::default_num_concurrent_downloads")] + pub num_concurrent_downloads: NonZeroU32, +} + +impl SplitCacheLimits { + fn default_max_num_splits() -> NonZeroU32 { + NonZeroU32::new(10_000).unwrap() + } + + fn default_num_concurrent_downloads() -> NonZeroU32 { + NonZeroU32::new(1).unwrap() + } +} + +impl Default for SplitCacheLimits { + fn default() -> SplitCacheLimits { + SplitCacheLimits { + max_num_bytes: Byte::from_bytes(1_000_000_000), // 1 GB. + max_num_splits: NonZeroU32::new(100).unwrap(), + num_concurrent_downloads: NonZeroU32::new(1).unwrap(), + } + } +} + #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] #[serde(deny_unknown_fields, default)] pub struct SearcherConfig { @@ -120,6 +150,11 @@ pub struct SearcherConfig { pub partial_request_cache_capacity: Byte, pub max_num_concurrent_split_searches: usize, pub max_num_concurrent_split_streams: usize, + // Strangely, if None, this will also have the effect of not forwarding + // to searcher. + // TODO document and fix if necessary. 
+ #[serde(default, skip_serializing_if = "Option::is_none")] + pub split_cache: Option, } impl Default for SearcherConfig { @@ -132,6 +167,7 @@ impl Default for SearcherConfig { max_num_concurrent_split_searches: 100, aggregation_memory_limit: Byte::from_bytes(500_000_000), // 500M aggregation_bucket_limit: 65000, + split_cache: None, } } } diff --git a/quickwit/quickwit-config/src/node_config/serialize.rs b/quickwit/quickwit-config/src/node_config/serialize.rs index 6a8d42d5beb..b5b7bfe6db2 100644 --- a/quickwit/quickwit-config/src/node_config/serialize.rs +++ b/quickwit/quickwit-config/src/node_config/serialize.rs @@ -506,6 +506,7 @@ mod tests { partial_request_cache_capacity: Byte::from_str("64M").unwrap(), max_num_concurrent_split_searches: 150, max_num_concurrent_split_streams: 120, + split_cache: None, } ); assert_eq!( diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs index fbc73e78618..8b9a3592017 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs @@ -26,6 +26,7 @@ use quickwit_actors::{ Actor, ActorContext, ActorExitStatus, ActorHandle, Handler, Health, Mailbox, QueueCapacity, Supervisable, HEARTBEAT, }; +use quickwit_common::pubsub::EventBroker; use quickwit_common::temp_dir::TempDirectory; use quickwit_common::KillSwitch; use quickwit_config::{IndexingSettings, SourceConfig}; @@ -348,6 +349,7 @@ impl IndexingPipeline { self.params.split_store.clone(), SplitsUpdateMailbox::Sequencer(sequencer_mailbox), self.params.max_concurrent_split_uploads_index, + self.params.event_broker.clone(), ); let (uploader_mailbox, uploader_handle) = ctx .spawn_actor() @@ -567,6 +569,7 @@ pub struct IndexingPipelineParams { pub source_config: SourceConfig, pub ingester_pool: IngesterPool, pub queues_dir_path: PathBuf, + pub event_broker: EventBroker, } #[cfg(test)] @@ -668,8 +671,9 @@ mod tests { input_format: 
SourceInputFormat::Json, }; let storage = Arc::new(RamStorage::default()); - let split_store = IndexingSplitStore::create_without_local_store(storage.clone()); + let split_store = IndexingSplitStore::create_without_local_store_for_test(storage.clone()); let (merge_planner_mailbox, _) = universe.create_test_mailbox(); + let event_broker = EventBroker::default(); let pipeline_params = IndexingPipelineParams { pipeline_id, doc_mapper: Arc::new(default_doc_mapper_for_test()), @@ -686,6 +690,7 @@ mod tests { max_concurrent_split_uploads_merge: 5, cooperative_indexing_permits: None, merge_planner_mailbox, + event_broker, }; let pipeline = IndexingPipeline::new(pipeline_params); let (_pipeline_mailbox, pipeline_handle) = universe.spawn_builder().spawn(pipeline); @@ -769,7 +774,7 @@ mod tests { input_format: SourceInputFormat::Json, }; let storage = Arc::new(RamStorage::default()); - let split_store = IndexingSplitStore::create_without_local_store(storage.clone()); + let split_store = IndexingSplitStore::create_without_local_store_for_test(storage.clone()); let (merge_planner_mailbox, _) = universe.create_test_mailbox(); let pipeline_params = IndexingPipelineParams { pipeline_id, @@ -787,6 +792,7 @@ mod tests { max_concurrent_split_uploads_merge: 5, cooperative_indexing_permits: None, merge_planner_mailbox, + event_broker: Default::default(), }; let pipeline = IndexingPipeline::new(pipeline_params); let (_pipeline_mailbox, pipeline_handler) = universe.spawn_builder().spawn(pipeline); @@ -832,7 +838,7 @@ mod tests { input_format: SourceInputFormat::Json, }; let storage = Arc::new(RamStorage::default()); - let split_store = IndexingSplitStore::create_without_local_store(storage.clone()); + let split_store = IndexingSplitStore::create_without_local_store_for_test(storage.clone()); let merge_pipeline_params = MergePipelineParams { pipeline_id: pipeline_id.clone(), doc_mapper: doc_mapper.clone(), @@ -842,6 +848,7 @@ mod tests { merge_policy: default_merge_policy(), 
max_concurrent_split_uploads: 2, merge_max_io_num_bytes_per_sec: None, + event_broker: Default::default(), }; let merge_pipeline = MergePipeline::new(merge_pipeline_params, universe.spawn_ctx()); let merge_planner_mailbox = merge_pipeline.merge_planner_mailbox().clone(); @@ -863,6 +870,7 @@ mod tests { max_concurrent_split_uploads_merge: 5, cooperative_indexing_permits: None, merge_planner_mailbox: merge_planner_mailbox.clone(), + event_broker: Default::default(), }; let indexing_pipeline = IndexingPipeline::new(indexing_pipeline_params); let (_indexing_pipeline_mailbox, indexing_pipeline_handler) = @@ -950,7 +958,7 @@ mod tests { input_format: SourceInputFormat::Json, }; let storage = Arc::new(RamStorage::default()); - let split_store = IndexingSplitStore::create_without_local_store(storage.clone()); + let split_store = IndexingSplitStore::create_without_local_store_for_test(storage.clone()); let (merge_planner_mailbox, _) = universe.create_test_mailbox(); // Create a minimal mapper with wrong date format to ensure that all documents will fail let broken_mapper = serde_json::from_str::( @@ -986,6 +994,7 @@ mod tests { max_concurrent_split_uploads_merge: 5, cooperative_indexing_permits: None, merge_planner_mailbox, + event_broker: Default::default(), }; let pipeline = IndexingPipeline::new(pipeline_params); let (_pipeline_mailbox, pipeline_handler) = universe.spawn_builder().spawn(pipeline); diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs index 8ed548a93e9..05e4f75dae3 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs @@ -32,6 +32,7 @@ use quickwit_actors::{ }; use quickwit_cluster::Cluster; use quickwit_common::fs::get_cache_directory_path; +use quickwit_common::pubsub::EventBroker; use quickwit_common::temp_dir; use quickwit_config::{ build_doc_mapper, IndexConfig, IndexerConfig, SourceConfig, 
INGEST_API_SOURCE_ID, @@ -90,6 +91,12 @@ struct MergePipelineHandle { handle: ActorHandle, } +/// The indexing service is (single) actor service running on indexer and in charge +/// of executing the indexing plans received from the control plane. +/// +/// Concretely this means receiving new plans, comparing the current situation +/// with the target situation, and spawning/shutting down the indexing pipelines that +/// are respectively missing or extranumerous. pub struct IndexingService { node_id: String, indexing_root_directory: PathBuf, @@ -106,10 +113,11 @@ pub struct IndexingService { max_concurrent_split_uploads: usize, merge_pipeline_handles: HashMap, cooperative_indexing_permits: Option>, + event_broker: EventBroker, } impl Debug for IndexingService { - fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + fn fmt(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { formatter .debug_struct("IndexingService") .field("cluster_id", &self.cluster.cluster_id()) @@ -131,6 +139,7 @@ impl IndexingService { ingest_api_service_opt: Option>, ingester_pool: IngesterPool, storage_resolver: StorageResolver, + event_broker: EventBroker, ) -> anyhow::Result { let split_store_space_quota = SplitStoreQuota::new( indexer_config.split_store_max_num_splits, @@ -162,6 +171,7 @@ impl IndexingService { max_concurrent_split_uploads: indexer_config.max_concurrent_split_uploads, merge_pipeline_handles: HashMap::new(), cooperative_indexing_permits, + event_broker, }) } @@ -275,6 +285,7 @@ impl IndexingService { .resources .max_merge_write_throughput, max_concurrent_split_uploads: self.max_concurrent_split_uploads, + event_broker: self.event_broker.clone(), }; let merge_planner_mailbox = self @@ -301,10 +312,13 @@ impl IndexingService { merge_policy, max_concurrent_split_uploads_merge, merge_planner_mailbox, + // Source-related parameters source_config, ingester_pool: self.ingester_pool.clone(), queues_dir_path: self.queue_dir_path.clone(), + + 
event_broker: self.event_broker.clone(), }; let pipeline = IndexingPipeline::new(pipeline_params); let (pipeline_mailbox, pipeline_handle) = ctx.spawn_actor().spawn(pipeline); @@ -801,7 +815,7 @@ mod tests { use super::*; - async fn spawn_indexing_service( + async fn spawn_indexing_service_for_test( data_dir_path: &Path, universe: &Universe, metastore: Arc, @@ -825,6 +839,7 @@ mod tests { Some(ingest_api_service), IngesterPool::default(), storage_resolver.clone(), + EventBroker::default(), ) .await .unwrap(); @@ -852,7 +867,7 @@ mod tests { let universe = Universe::with_accelerated_time(); let temp_dir = tempfile::tempdir().unwrap(); let (indexing_service, indexing_service_handle) = - spawn_indexing_service(temp_dir.path(), &universe, metastore, cluster).await; + spawn_indexing_service_for_test(temp_dir.path(), &universe, metastore, cluster).await; let observation = indexing_service_handle.observe().await; assert_eq!(observation.num_running_pipelines, 0); assert_eq!(observation.num_failed_pipelines, 0); @@ -942,7 +957,7 @@ mod tests { let universe = Universe::new(); let temp_dir = tempfile::tempdir().unwrap(); let (indexing_service, indexing_server_handle) = - spawn_indexing_service(temp_dir.path(), &universe, metastore, cluster).await; + spawn_indexing_service_for_test(temp_dir.path(), &universe, metastore, cluster).await; // Test `supervise_pipelines` let source_config = SourceConfig { @@ -998,7 +1013,7 @@ mod tests { .unwrap(); let universe = Universe::new(); let temp_dir = tempfile::tempdir().unwrap(); - let (indexing_service, indexing_service_handle) = spawn_indexing_service( + let (indexing_service, indexing_service_handle) = spawn_indexing_service_for_test( temp_dir.path(), &universe, metastore.clone(), @@ -1228,6 +1243,7 @@ mod tests { Some(ingest_api_service), IngesterPool::default(), storage_resolver.clone(), + EventBroker::default(), ) .await .unwrap(); @@ -1335,8 +1351,13 @@ mod tests { metastore.expect_list_splits().returning(|_| Ok(Vec::new())); let 
universe = Universe::new(); let temp_dir = tempfile::tempdir().unwrap(); - let (indexing_service, indexing_service_handle) = - spawn_indexing_service(temp_dir.path(), &universe, Arc::new(metastore), cluster).await; + let (indexing_service, indexing_service_handle) = spawn_indexing_service_for_test( + temp_dir.path(), + &universe, + Arc::new(metastore), + cluster, + ) + .await; let _pipeline_id = indexing_service .ask_for_res(SpawnPipeline { index_id: index_id.clone(), @@ -1410,6 +1431,7 @@ mod tests { Some(ingest_api_service.clone()), IngesterPool::default(), storage_resolver.clone(), + EventBroker::default(), ) .await .unwrap(); diff --git a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs index aa2ea214e97..9254d4a9b3a 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs @@ -27,6 +27,7 @@ use quickwit_actors::{ SpawnContext, Supervisable, HEARTBEAT, }; use quickwit_common::io::IoControls; +use quickwit_common::pubsub::EventBroker; use quickwit_common::temp_dir::TempDirectory; use quickwit_common::KillSwitch; use quickwit_doc_mapper::DocMapper; @@ -244,6 +245,7 @@ impl MergePipeline { self.params.split_store.clone(), merge_publisher_mailbox.into(), self.params.max_concurrent_split_uploads, + self.params.event_broker.clone(), ); let (merge_uploader_mailbox, merge_uploader_handler) = ctx .spawn_actor() @@ -465,6 +467,7 @@ pub struct MergePipelineParams { pub merge_policy: Arc, pub max_concurrent_split_uploads: usize, //< TODO share with the indexing pipeline. 
pub merge_max_io_num_bytes_per_sec: Option, + pub event_broker: EventBroker, } #[cfg(test)] @@ -510,7 +513,7 @@ mod tests { }); let universe = Universe::with_accelerated_time(); let storage = Arc::new(RamStorage::default()); - let split_store = IndexingSplitStore::create_without_local_store(storage.clone()); + let split_store = IndexingSplitStore::create_without_local_store_for_test(storage.clone()); let pipeline_params = MergePipelineParams { pipeline_id, doc_mapper: Arc::new(default_doc_mapper_for_test()), @@ -520,6 +523,7 @@ mod tests { merge_policy: default_merge_policy(), max_concurrent_split_uploads: 2, merge_max_io_num_bytes_per_sec: None, + event_broker: Default::default(), }; let pipeline = MergePipeline::new(pipeline_params, universe.spawn_ctx()); let (_pipeline_mailbox, pipeline_handler) = universe.spawn_builder().spawn(pipeline); diff --git a/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs b/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs index 1557e6c166f..88c44722849 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs @@ -166,7 +166,7 @@ mod tests { storage_builder = storage_builder.put(&split_file(split.split_id()), &buffer); } let ram_storage = storage_builder.build(); - IndexingSplitStore::create_without_local_store(Arc::new(ram_storage)) + IndexingSplitStore::create_without_local_store_for_test(Arc::new(ram_storage)) }; let universe = Universe::with_accelerated_time(); diff --git a/quickwit/quickwit-indexing/src/actors/uploader.rs b/quickwit/quickwit-indexing/src/actors/uploader.rs index 670443a4d70..ab7d7629e5f 100644 --- a/quickwit/quickwit-indexing/src/actors/uploader.rs +++ b/quickwit/quickwit-indexing/src/actors/uploader.rs @@ -29,8 +29,10 @@ use fail::fail_point; use itertools::Itertools; use once_cell::sync::OnceCell; use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity}; +use 
quickwit_common::pubsub::EventBroker; use quickwit_metastore::checkpoint::IndexCheckpointDelta; use quickwit_metastore::{Metastore, SplitMetadata}; +use quickwit_proto::search::{ReportSplit, ReportSplitsRequest}; use quickwit_proto::{IndexUid, PublishToken}; use quickwit_storage::SplitPayloadBuilder; use serde::Serialize; @@ -166,6 +168,7 @@ pub struct Uploader { split_update_mailbox: SplitsUpdateMailbox, max_concurrent_split_uploads: usize, counters: UploaderCounters, + event_broker: EventBroker, } impl Uploader { @@ -176,6 +179,7 @@ impl Uploader { split_store: IndexingSplitStore, split_update_mailbox: SplitsUpdateMailbox, max_concurrent_split_uploads: usize, + event_broker: EventBroker, ) -> Uploader { Uploader { uploader_type, @@ -185,6 +189,7 @@ impl Uploader { split_update_mailbox, max_concurrent_split_uploads, counters: Default::default(), + event_broker, } } async fn acquire_semaphore( @@ -294,11 +299,14 @@ impl Handler for Uploader { let ctx_clone = ctx.clone(); let merge_policy = self.merge_policy.clone(); info!(split_ids=?split_ids, "start-stage-and-store-splits"); + let event_broker = self.event_broker.clone(); tokio::spawn( async move { fail_point!("uploader:intask:before"); let mut split_metadata_list = Vec::with_capacity(batch.splits.len()); + let mut report_splits: Vec = Vec::with_capacity(batch.splits.len()); + for packaged_split in batch.splits.iter() { if batch.publish_lock.is_dead() { // TODO: Remove the junk right away? 
@@ -318,7 +326,13 @@ impl Handler for Uploader { split_streamer.footer_range.start..split_streamer.footer_range.end, ); + report_splits.push(ReportSplit { + storage_uri: split_store.remote_uri().to_string(), + split_id: packaged_split.split_id().to_string(), + }); + split_metadata_list.push(split_metadata); + } metastore @@ -327,6 +341,9 @@ impl Handler for Uploader { counters.num_staged_splits.fetch_add(split_metadata_list.len() as u64, Ordering::SeqCst); let mut packaged_splits_and_metadata = Vec::with_capacity(batch.splits.len()); + + event_broker.publish(ReportSplitsRequest { report_splits }); + for (packaged_split, metadata) in batch.splits.into_iter().zip(split_metadata_list) { let upload_result = upload_split( &packaged_split, @@ -462,8 +479,10 @@ async fn upload_split( #[cfg(test)] mod tests { use std::path::PathBuf; + use std::time::Duration; use quickwit_actors::{ObservationType, Universe}; + use quickwit_common::pubsub::EventSubscriber; use quickwit_common::temp_dir::TempDirectory; use quickwit_metastore::checkpoint::{IndexCheckpointDelta, SourceCheckpointDelta}; use quickwit_metastore::MockMetastore; @@ -479,6 +498,7 @@ mod tests { #[tokio::test] async fn test_uploader_with_sequencer() -> anyhow::Result<()> { quickwit_common::setup_logging_for_tests(); + let event_broker = EventBroker::default(); let universe = Universe::new(); let pipeline_id = IndexingPipelineId { index_uid: IndexUid::new("test-index"), @@ -501,7 +521,7 @@ mod tests { .returning(|_, _| Ok(())); let ram_storage = RamStorage::default(); let split_store = - IndexingSplitStore::create_without_local_store(Arc::new(ram_storage.clone())); + IndexingSplitStore::create_without_local_store_for_test(Arc::new(ram_storage.clone())); let merge_policy = Arc::new(NopMergePolicy); let uploader = Uploader::new( UploaderType::IndexUploader, @@ -510,6 +530,7 @@ mod tests { split_store, SplitsUpdateMailbox::Sequencer(sequencer_mailbox), 4, + event_broker, ); let (uploader_mailbox, uploader_handle) = 
universe.spawn_builder().spawn(uploader); let split_scratch_directory = TempDirectory::for_test(); @@ -612,7 +633,7 @@ mod tests { .returning(|_, _| Ok(())); let ram_storage = RamStorage::default(); let split_store = - IndexingSplitStore::create_without_local_store(Arc::new(ram_storage.clone())); + IndexingSplitStore::create_without_local_store_for_test(Arc::new(ram_storage.clone())); let merge_policy = Arc::new(NopMergePolicy); let uploader = Uploader::new( UploaderType::IndexUploader, @@ -621,6 +642,7 @@ mod tests { split_store, SplitsUpdateMailbox::Sequencer(sequencer_mailbox), 4, + EventBroker::default(), ); let (uploader_mailbox, uploader_handle) = universe.spawn_builder().spawn(uploader); let split_scratch_directory_1 = TempDirectory::for_test(); @@ -751,7 +773,7 @@ mod tests { .returning(|_, _| Ok(())); let ram_storage = RamStorage::default(); let split_store = - IndexingSplitStore::create_without_local_store(Arc::new(ram_storage.clone())); + IndexingSplitStore::create_without_local_store_for_test(Arc::new(ram_storage.clone())); let merge_policy = Arc::new(NopMergePolicy); let uploader = Uploader::new( UploaderType::IndexUploader, @@ -760,6 +782,7 @@ mod tests { split_store, SplitsUpdateMailbox::Publisher(publisher_mailbox), 4, + EventBroker::default(), ); let (uploader_mailbox, uploader_handle) = universe.spawn_builder().spawn(uploader); let split_scratch_directory = TempDirectory::for_test(); @@ -820,7 +843,7 @@ mod tests { mock_metastore.expect_stage_splits().never(); let ram_storage = RamStorage::default(); let split_store = - IndexingSplitStore::create_without_local_store(Arc::new(ram_storage.clone())); + IndexingSplitStore::create_without_local_store_for_test(Arc::new(ram_storage.clone())); let uploader = Uploader::new( UploaderType::IndexUploader, Arc::new(mock_metastore), @@ -828,6 +851,7 @@ mod tests { split_store, SplitsUpdateMailbox::Sequencer(sequencer_mailbox), 4, + EventBroker::default(), ); let (uploader_mailbox, uploader_handle) = 
universe.spawn_builder().spawn(uploader); let checkpoint_delta = IndexCheckpointDelta { @@ -880,4 +904,110 @@ mod tests { universe.assert_quit().await; Ok(()) } + + #[derive(Clone)] + struct ReportSplitListener { + report_splits_tx: flume::Sender, + } + + impl std::fmt::Debug for ReportSplitListener { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + f.debug_struct("ReportSplitListener").finish() + } + } + + #[async_trait] + impl EventSubscriber for ReportSplitListener { + async fn handle_event(&mut self, event: ReportSplitsRequest) { + self.report_splits_tx.send(event).unwrap(); + } + } + + #[tokio::test] + async fn test_uploader_notifies_event_broker() -> anyhow::Result<()> { + quickwit_common::setup_logging_for_tests(); + const SPLIT_ULID_STR: &str = "01HAV29D4XY3D462FS3D8K5Q2H"; + let event_broker = EventBroker::default(); + let (report_splits_tx, report_splits_rx) = flume::unbounded(); + let report_splits_listener = ReportSplitListener { report_splits_tx }; + + // we need to keep the handle alive. 
+ let _subscribe_handle = event_broker.subscribe(report_splits_listener); + + let universe = Universe::new(); + let pipeline_id = IndexingPipelineId { + index_uid: IndexUid::new("test-index"), + source_id: "test-source".to_string(), + node_id: "test-node".to_string(), + pipeline_ord: 0, + }; + let mut mock_metastore = MockMetastore::default(); + mock_metastore + .expect_stage_splits() + .times(1) + .returning(|_, _| Ok(())); + let ram_storage = RamStorage::default(); + let split_store = + IndexingSplitStore::create_without_local_store_for_test(Arc::new(ram_storage.clone())); + let merge_policy = Arc::new(NopMergePolicy); + let (publisher_mailbox, _publisher_inbox) = universe.create_test_mailbox(); + let uploader = Uploader::new( + UploaderType::IndexUploader, + Arc::new(mock_metastore), + merge_policy, + split_store, + SplitsUpdateMailbox::Publisher(publisher_mailbox), + 4, + event_broker, + ); + let (uploader_mailbox, uploader_handle) = universe.spawn_builder().spawn(uploader); + let split_scratch_directory = TempDirectory::for_test(); + let checkpoint_delta_opt: Option = Some(IndexCheckpointDelta { + source_id: "test-source".to_string(), + source_delta: SourceCheckpointDelta::from_range(3..15), + }); + uploader_mailbox + .send_message(PackagedSplitBatch::new( + vec![PackagedSplit { + split_attrs: SplitAttrs { + partition_id: 3u64, + pipeline_id, + time_range: Some( + DateTime::from_timestamp_secs(1_628_203_589) + ..=DateTime::from_timestamp_secs(1_628_203_640), + ), + uncompressed_docs_size_in_bytes: 1_000, + num_docs: 10, + replaced_split_ids: Vec::new(), + split_id: SPLIT_ULID_STR.to_string(), + delete_opstamp: 10, + num_merge_ops: 0, + }, + split_scratch_directory, + tags: Default::default(), + hotcache_bytes: Vec::new(), + split_files: Vec::new(), + }], + checkpoint_delta_opt, + PublishLock::default(), + None, + None, + Span::none(), + )) + .await?; + assert_eq!( + uploader_handle.process_pending_and_observe().await.obs_type, + ObservationType::Alive + ); + 
mem::drop(uploader_mailbox); + let report_splits: ReportSplitsRequest = report_splits_rx + .recv_timeout(Duration::from_secs(1)) + .unwrap(); + assert_eq!(report_splits.report_splits.len(), 1); + let split = &report_splits.report_splits[0]; + assert_eq!(split.storage_uri, "ram:///"); + assert_eq!(split.split_id, SPLIT_ULID_STR); + universe.assert_quit().await; + Ok(()) + } } diff --git a/quickwit/quickwit-indexing/src/lib.rs b/quickwit/quickwit-indexing/src/lib.rs index b5467152ed1..482f992fb1b 100644 --- a/quickwit/quickwit-indexing/src/lib.rs +++ b/quickwit/quickwit-indexing/src/lib.rs @@ -23,6 +23,7 @@ use std::sync::Arc; use quickwit_actors::{Mailbox, Universe}; use quickwit_cluster::Cluster; +use quickwit_common::pubsub::EventBroker; use quickwit_config::NodeConfig; use quickwit_ingest::{IngestApiService, IngesterPool}; use quickwit_metastore::Metastore; @@ -72,6 +73,7 @@ pub async fn start_indexing_service( ingest_api_service: Mailbox, ingester_pool: IngesterPool, storage_resolver: StorageResolver, + event_broker: EventBroker, ) -> anyhow::Result> { info!("Starting indexer service."); @@ -86,6 +88,7 @@ pub async fn start_indexing_service( Some(ingest_api_service), ingester_pool, storage_resolver, + event_broker, ) .await?; let (indexing_service, _) = universe.spawn_builder().spawn(indexing_service); diff --git a/quickwit/quickwit-indexing/src/split_store/indexing_split_store.rs b/quickwit/quickwit-indexing/src/split_store/indexing_split_store.rs index f58238c79d4..50a849c9dac 100644 --- a/quickwit/quickwit-indexing/src/split_store/indexing_split_store.rs +++ b/quickwit/quickwit-indexing/src/split_store/indexing_split_store.rs @@ -27,6 +27,7 @@ use anyhow::Context; #[cfg(any(test, feature = "testsuite"))] use byte_unit::Byte; use quickwit_common::io::{IoControls, IoControlsAccess}; +use quickwit_common::uri::Uri; use quickwit_metastore::SplitMetadata; use quickwit_storage::{PutPayload, Storage, StorageResult}; use tantivy::directory::MmapDirectory; @@ -64,7 
+65,6 @@ pub struct IndexingSplitStore { struct InnerIndexingSplitStore { /// The remote storage. remote_storage: Arc, - local_split_store: Arc, } @@ -96,7 +96,7 @@ impl IndexingSplitStore { /// Helper function to create a indexing split store for tests. /// The resulting store does not have any local cache. - pub fn create_without_local_store(remote_storage: Arc) -> Self { + pub fn create_without_local_store_for_test(remote_storage: Arc) -> Self { let inner = InnerIndexingSplitStore { remote_storage, local_split_store: Arc::new(LocalSplitStore::no_caching()), @@ -106,6 +106,14 @@ impl IndexingSplitStore { } } + pub fn remote_uri(&self) -> &Uri { + self.inner.remote_storage.uri() + } + + fn split_path(&self, split_id: &str) -> PathBuf { + PathBuf::from(quickwit_common::split_file(split_id)) + } + /// Stores a split. /// /// If a split is identified as mature by the merge policy, @@ -125,7 +133,7 @@ impl IndexingSplitStore { let start = Instant::now(); let split_num_bytes = put_payload.len(); - let key = PathBuf::from(quickwit_common::split_file(split.split_id())); + let key = self.split_path(split.split_id()); let is_mature = split.is_mature(OffsetDateTime::now_utc()); self.inner .remote_storage @@ -176,7 +184,6 @@ impl IndexingSplitStore { /// /// If not, it will be fetched from the remote `Storage`. /// - /// /// # Implementation detail: /// /// Depending on whether the split was obtained from the `Storage` diff --git a/quickwit/quickwit-indexing/src/split_store/local_split_store.rs b/quickwit/quickwit-indexing/src/split_store/local_split_store.rs index 913564720d9..e436023d60a 100644 --- a/quickwit/quickwit-indexing/src/split_store/local_split_store.rs +++ b/quickwit/quickwit-indexing/src/split_store/local_split_store.rs @@ -183,7 +183,7 @@ impl InnerLocalSplitStore { /// Returns the directory filepath of a split in cache. 
fn split_path(&self, split_id: Ulid) -> PathBuf { - let split_file = split_file(&split_id.to_string()); + let split_file = split_file(split_id); self.split_store_folder.join(split_file) } diff --git a/quickwit/quickwit-indexing/src/test_utils.rs b/quickwit/quickwit-indexing/src/test_utils.rs index 9bba94bedb1..9a38d4e5a99 100644 --- a/quickwit/quickwit-indexing/src/test_utils.rs +++ b/quickwit/quickwit-indexing/src/test_utils.rs @@ -25,6 +25,7 @@ use bytes::Bytes; use chitchat::transport::ChannelTransport; use quickwit_actors::{Mailbox, Universe}; use quickwit_cluster::create_cluster_for_test; +use quickwit_common::pubsub::EventBroker; use quickwit_common::rand::append_random_suffix; use quickwit_common::uri::Uri; use quickwit_config::{ @@ -71,7 +72,7 @@ impl TestSandbox { doc_mapping_yaml: &str, indexing_settings_yaml: &str, search_fields: &[&str], - ) -> anyhow::Result { + ) -> anyhow::Result { let node_id = append_random_suffix("test-node"); let transport = ChannelTransport::default(); let cluster = create_cluster_for_test(Vec::new(), &["indexer"], &transport, true) @@ -113,6 +114,7 @@ impl TestSandbox { Some(ingest_api_service), IngesterPool::default(), storage_resolver.clone(), + EventBroker::default(), ) .await?; let (indexing_service, _indexing_service_handle) = diff --git a/quickwit/quickwit-jaeger/src/integration_tests.rs b/quickwit/quickwit-jaeger/src/integration_tests.rs index f250e392a82..d4163f918f0 100644 --- a/quickwit/quickwit-jaeger/src/integration_tests.rs +++ b/quickwit/quickwit-jaeger/src/integration_tests.rs @@ -24,6 +24,7 @@ use std::time::Duration; use quickwit_actors::{ActorHandle, Mailbox, Universe}; use quickwit_cluster::{create_cluster_for_test, ChannelTransport, Cluster}; +use quickwit_common::pubsub::EventBroker; use quickwit_common::uri::Uri; use quickwit_config::{IndexerConfig, IngestApiConfig, JaegerConfig, SearcherConfig, SourceConfig}; use quickwit_indexing::models::SpawnPipeline; @@ -51,7 +52,8 @@ use 
quickwit_proto::opentelemetry::proto::trace::v1::{ ResourceSpans, ScopeSpans, Span as OtlpSpan, Status as OtlpStatus, }; use quickwit_search::{ - start_searcher_service, SearchJobPlacer, SearchService, SearchServiceClient, SearcherPool, + start_searcher_service, SearchJobPlacer, SearchService, SearchServiceClient, SearcherContext, + SearcherPool, }; use quickwit_storage::StorageResolver; use tempfile::TempDir; @@ -352,6 +354,7 @@ async fn indexer_for_test( Some(ingester_service), ingester_pool, storage_resolver, + EventBroker::default(), ) .await .unwrap(); @@ -366,11 +369,12 @@ async fn searcher_for_test( let searcher_config = SearcherConfig::default(); let searcher_pool = SearcherPool::default(); let search_job_placer = SearchJobPlacer::new(searcher_pool.clone()); + let searcher_context = Arc::new(SearcherContext::new(searcher_config, None)); let searcher_service = start_searcher_service( - searcher_config, metastore, storage_resolver, search_job_placer, + searcher_context, ) .await .unwrap(); diff --git a/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs b/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs index 7a4cc08fd23..8b29d8ffe4d 100644 --- a/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs +++ b/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs @@ -26,6 +26,7 @@ use quickwit_actors::{ Actor, ActorContext, ActorExitStatus, ActorHandle, Handler, Supervisor, SupervisorState, }; use quickwit_common::io::IoControls; +use quickwit_common::pubsub::EventBroker; use quickwit_common::temp_dir::{self}; use quickwit_common::uri::Uri; use quickwit_config::build_doc_mapper; @@ -82,6 +83,7 @@ pub struct DeleteTaskPipeline { handles: Option, max_concurrent_split_uploads: usize, state: DeleteTaskPipelineState, + event_broker: EventBroker, } #[async_trait] @@ -129,6 +131,7 @@ impl DeleteTaskPipeline { index_storage: Arc, delete_service_task_dir: PathBuf, max_concurrent_split_uploads: usize, + event_broker: EventBroker, ) -> Self 
{ Self { index_uid, @@ -139,6 +142,7 @@ impl DeleteTaskPipeline { handles: Default::default(), max_concurrent_split_uploads, state: DeleteTaskPipelineState::default(), + event_broker, } } @@ -162,7 +166,7 @@ impl DeleteTaskPipeline { let (publisher_mailbox, publisher_supervisor_handler) = ctx.spawn_actor().supervise(publisher); let split_store = - IndexingSplitStore::create_without_local_store(self.index_storage.clone()); + IndexingSplitStore::create_without_local_store_for_test(self.index_storage.clone()); let merge_policy = merge_policy_from_settings(&index_config.indexing_settings); let uploader = Uploader::new( UploaderType::DeleteUploader, @@ -171,6 +175,7 @@ impl DeleteTaskPipeline { split_store.clone(), SplitsUpdateMailbox::Publisher(publisher_mailbox), self.max_concurrent_split_uploads, + self.event_broker.clone(), ); let (uploader_mailbox, uploader_supervisor_handler) = ctx.spawn_actor().supervise(uploader); @@ -279,6 +284,7 @@ impl Handler for DeleteTaskPipeline { mod tests { use async_trait::async_trait; use quickwit_actors::Handler; + use quickwit_common::pubsub::EventBroker; use quickwit_common::temp_dir::TempDirectory; use quickwit_indexing::TestSandbox; use quickwit_metastore::SplitState; @@ -384,6 +390,7 @@ mod tests { test_sandbox.storage(), delete_service_task_dir.path().into(), 4, + EventBroker::default(), ); let (pipeline_mailbox, pipeline_handler) = @@ -459,6 +466,7 @@ mod tests { test_sandbox.storage(), delete_service_task_dir.path().into(), 4, + EventBroker::default(), ); let (_pipeline_mailbox, pipeline_handler) = diff --git a/quickwit/quickwit-janitor/src/actors/delete_task_service.rs b/quickwit/quickwit-janitor/src/actors/delete_task_service.rs index 3dd5b8a34af..2273b33b3b9 100644 --- a/quickwit/quickwit-janitor/src/actors/delete_task_service.rs +++ b/quickwit/quickwit-janitor/src/actors/delete_task_service.rs @@ -24,6 +24,7 @@ use std::time::Duration; use async_trait::async_trait; use quickwit_actors::{Actor, ActorContext, 
ActorExitStatus, ActorHandle, Handler}; +use quickwit_common::pubsub::EventBroker; use quickwit_common::temp_dir::{self}; use quickwit_config::IndexConfig; use quickwit_metastore::{ListIndexesQuery, Metastore}; @@ -57,6 +58,7 @@ pub struct DeleteTaskService { delete_service_task_dir: PathBuf, pipeline_handles_by_index_uid: HashMap>, max_concurrent_split_uploads: usize, + event_broker: EventBroker, } impl DeleteTaskService { @@ -66,6 +68,7 @@ impl DeleteTaskService { storage_resolver: StorageResolver, data_dir_path: PathBuf, max_concurrent_split_uploads: usize, + event_broker: EventBroker, ) -> anyhow::Result { let delete_service_task_path = data_dir_path.join(DELETE_SERVICE_TASK_DIR_NAME); let delete_service_task_dir = @@ -77,6 +80,7 @@ impl DeleteTaskService { delete_service_task_dir, pipeline_handles_by_index_uid: Default::default(), max_concurrent_split_uploads, + event_broker, }) } } @@ -170,6 +174,7 @@ impl DeleteTaskService { index_storage, self.delete_service_task_dir.clone(), self.max_concurrent_split_uploads, + self.event_broker.clone(), ); let (_pipeline_mailbox, pipeline_handler) = ctx.spawn_actor().spawn(pipeline); self.pipeline_handles_by_index_uid @@ -202,6 +207,7 @@ impl Handler for DeleteTaskService { #[cfg(test)] mod tests { + use quickwit_common::pubsub::EventBroker; use quickwit_indexing::TestSandbox; use quickwit_proto::metastore::DeleteQuery; use quickwit_search::{searcher_pool_for_test, MockSearchService, SearchJobPlacer}; @@ -235,6 +241,7 @@ mod tests { StorageResolver::unconfigured(), data_dir_path, 4, + EventBroker::default(), ) .await .unwrap(); diff --git a/quickwit/quickwit-janitor/src/lib.rs b/quickwit/quickwit-janitor/src/lib.rs index ad0d0c3d43c..906a7902277 100644 --- a/quickwit/quickwit-janitor/src/lib.rs +++ b/quickwit/quickwit-janitor/src/lib.rs @@ -22,6 +22,7 @@ use std::sync::Arc; use quickwit_actors::{Mailbox, Universe}; +use quickwit_common::pubsub::EventBroker; use quickwit_config::NodeConfig; use 
quickwit_metastore::{Metastore, SplitInfo}; use quickwit_search::SearchJobPlacer; @@ -49,6 +50,7 @@ pub async fn start_janitor_service( metastore: Arc, search_job_placer: SearchJobPlacer, storage_resolver: StorageResolver, + event_broker: EventBroker, ) -> anyhow::Result> { info!("Starting janitor service."); let garbage_collector = GarbageCollector::new(metastore.clone(), storage_resolver.clone()); @@ -63,6 +65,7 @@ pub async fn start_janitor_service( storage_resolver, config.data_dir_path.clone(), config.indexer_config.max_concurrent_split_uploads, + event_broker, ) .await?; let (_, delete_task_service_handle) = universe.spawn_builder().spawn(delete_task_service); diff --git a/quickwit/quickwit-proto/protos/quickwit/search.proto b/quickwit/quickwit-proto/protos/quickwit/search.proto index 0375fbdef66..5a3561ffa27 100644 --- a/quickwit/quickwit-proto/protos/quickwit/search.proto +++ b/quickwit/quickwit-proto/protos/quickwit/search.proto @@ -70,6 +70,8 @@ service SearchService { // Gets a key from the local storage of the targetted node. // This RPC is used in the mini distributed immutable KV store embedded in quickwit. rpc GetKV(GetKVRequest) returns (GetKVResponse); + + rpc ReportSplits(ReportSplitsRequest) returns (ReportSplitsResponse); } /// Scroll Request @@ -95,6 +97,21 @@ message GetKVResponse { optional bytes payload = 1; } + +message ReportSplit { + // Split id (ULID format `01HAV29D4XY3D462FS3D8K5Q2H`) + string split_id = 2; + // The storage uri. This URI does NOT include the split id. 
+ string storage_uri = 1; +} + +message ReportSplitsRequest { + repeated ReportSplit report_splits = 1; +} + +message ReportSplitsResponse {} + + // -- Search ------------------- message SearchRequest { diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs index 18cbeea386f..0beb6e58d21 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs @@ -39,6 +39,28 @@ pub struct GetKvResponse { pub payload: ::core::option::Option<::prost::alloc::vec::Vec>, } #[derive(Serialize, Deserialize, utoipa::ToSchema)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ReportSplit { + /// Split id (ULID format `01HAV29D4XY3D462FS3D8K5Q2H`) + #[prost(string, tag = "2")] + pub split_id: ::prost::alloc::string::String, + /// The storage uri. This URI does NOT include the split id. 
+ #[prost(string, tag = "1")] + pub storage_uri: ::prost::alloc::string::String, +} +#[derive(Serialize, Deserialize, utoipa::ToSchema)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ReportSplitsRequest { + #[prost(message, repeated, tag = "1")] + pub report_splits: ::prost::alloc::vec::Vec, +} +#[derive(Serialize, Deserialize, utoipa::ToSchema)] +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ReportSplitsResponse {} +#[derive(Serialize, Deserialize, utoipa::ToSchema)] #[derive(Eq, Hash)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -867,6 +889,33 @@ pub mod search_service_client { .insert(GrpcMethod::new("quickwit.search.SearchService", "GetKV")); self.inner.unary(req, path, codec).await } + pub async fn report_splits( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/quickwit.search.SearchService/ReportSplits", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new("quickwit.search.SearchService", "ReportSplits"), + ); + self.inner.unary(req, path, codec).await + } } } /// Generated server implementations. 
@@ -967,6 +1016,13 @@ pub mod search_service_server { &self, request: tonic::Request, ) -> std::result::Result, tonic::Status>; + async fn report_splits( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; } #[derive(Debug)] pub struct SearchServiceServer { @@ -1449,6 +1505,52 @@ pub mod search_service_server { }; Box::pin(fut) } + "/quickwit.search.SearchService/ReportSplits" => { + #[allow(non_camel_case_types)] + struct ReportSplitsSvc(pub Arc); + impl< + T: SearchService, + > tonic::server::UnaryService + for ReportSplitsSvc { + type Response = super::ReportSplitsResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + (*inner).report_splits(request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = ReportSplitsSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } _ => { Box::pin(async move { Ok( diff --git a/quickwit/quickwit-proto/src/lib.rs b/quickwit/quickwit-proto/src/lib.rs index 3aa111a9d11..f2d142559a1 100644 --- a/quickwit/quickwit-proto/src/lib.rs +++ b/quickwit/quickwit-proto/src/lib.rs @@ -43,6 +43,8 @@ pub mod types; pub use error::{ServiceError, ServiceErrorCode}; pub use 
types::*; +use crate::search::ReportSplitsRequest; + pub mod jaeger { pub mod api_v2 { include!("codegen/jaeger/jaeger.api_v2.rs"); @@ -247,3 +249,5 @@ impl search::SortOrder { } } } + +impl quickwit_common::pubsub::Event for ReportSplitsRequest {} diff --git a/quickwit/quickwit-search/src/client.rs b/quickwit/quickwit-search/src/client.rs index 6d2b22bb6bb..0ed5c214e83 100644 --- a/quickwit/quickwit-search/src/client.rs +++ b/quickwit/quickwit-search/src/client.rs @@ -24,7 +24,9 @@ use std::time::Duration; use futures::{StreamExt, TryStreamExt}; use http::Uri; -use quickwit_proto::search::{GetKvRequest, LeafSearchStreamResponse, PutKvRequest}; +use quickwit_proto::search::{ + GetKvRequest, LeafSearchStreamResponse, PutKvRequest, ReportSplitsRequest, +}; use quickwit_proto::tonic::codegen::InterceptedService; use quickwit_proto::tonic::transport::{Channel, Endpoint}; use quickwit_proto::tonic::Request; @@ -265,6 +267,29 @@ impl SearchServiceClient { } Ok(()) } + + /// Indexers call report_splits to inform searchers node about the presence of a split, which + /// would then be considered as a candidate for the searcher split cache. + pub async fn report_splits(&mut self, report_splits_request: ReportSplitsRequest) { + match &mut self.client_impl { + SearchServiceClientImpl::Local(service) => { + let _ = service.report_splits(report_splits_request).await; + } + SearchServiceClientImpl::Grpc(search_client) => { + // Ignoring any error. + if search_client + .report_splits(report_splits_request) + .await + .is_err() + { + warn!( + "Failed to report splits. This is not critical as this message is only \ + used to identify caching opportunities." + ); + } + } + } + } } /// Creates a [`SearchServiceClient`] from a socket address. 
diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index 909194d9bc0..c90f43be3e7 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -34,7 +34,7 @@ use quickwit_proto::search::{ }; use quickwit_query::query_ast::QueryAst; use quickwit_storage::{ - wrap_storage_with_long_term_cache, BundleStorage, MemorySizedCache, OwnedBytes, Storage, + wrap_storage_with_cache, BundleStorage, MemorySizedCache, OwnedBytes, SplitCache, Storage, }; use tantivy::collector::Collector; use tantivy::directory::FileSlice; @@ -104,12 +104,21 @@ pub(crate) async fn open_index_with_caches( ) .await?; + // We wrap the top-level storage with the split cache. + // This is before the bundle storage: at this point, this storage is reading `.split` files. + let index_storage_with_split_cache = + if let Some(split_cache) = searcher_context.split_cache_opt.as_ref() { + SplitCache::wrap_storage(split_cache.clone(), index_storage.clone()) + } else { + index_storage.clone() + }; + let (hotcache_bytes, bundle_storage) = BundleStorage::open_from_split_data( - index_storage, + index_storage_with_split_cache, split_file, FileSlice::new(Arc::new(footer_data)), )?; - let bundle_storage_with_cache = wrap_storage_with_long_term_cache( + let bundle_storage_with_cache = wrap_storage_with_cache( searcher_context.fast_fields_cache.clone(), Arc::new(bundle_storage), ); @@ -435,13 +444,12 @@ pub(crate) fn rewrite_start_end_time_bounds( #[instrument(skip_all, fields(index = ?request.index_id_patterns))] pub async fn leaf_search( searcher_context: Arc, - request: &SearchRequest, + request: Arc, index_storage: Arc, splits: &[SplitIdAndFooterOffsets], doc_mapper: Arc, ) -> Result { info!(splits_num = splits.len(), split_offsets = ?PrettySample::new(splits, 5)); - let request = Arc::new(request.clone()); let mut leaf_search_single_split_futures: Vec<_> = Vec::with_capacity(splits.len()); for split in splits { let searcher_context_clone = 
searcher_context.clone(); @@ -468,9 +476,11 @@ pub async fn leaf_search( doc_mapper_clone, ) .await; - timer.observe_duration(); // We explicitly drop it, to highlight it to the reader and to force the move. - drop(leaf_split_search_permit); + if leaf_search_single_split_res.is_ok() { + timer.observe_duration(); + } + std::mem::drop(leaf_split_search_permit); leaf_search_single_split_res.map_err(|err| (split.split_id.clone(), err)) } .in_current_span(), diff --git a/quickwit/quickwit-search/src/lib.rs b/quickwit/quickwit-search/src/lib.rs index a6371d9ff3d..5ec4fa7c701 100644 --- a/quickwit/quickwit-search/src/lib.rs +++ b/quickwit/quickwit-search/src/lib.rs @@ -64,6 +64,7 @@ use quickwit_metastore::{ListSplitsQuery, Metastore, SplitMetadata, SplitState}; use quickwit_proto::search::{PartialHit, SearchRequest, SearchResponse, SplitIdAndFooterOffsets}; use quickwit_proto::IndexUid; use quickwit_storage::StorageResolver; +pub use service::SearcherContext; use tantivy::DocAddress; pub use crate::client::{ @@ -79,7 +80,6 @@ pub use crate::root::{ pub use crate::search_job_placer::{Job, SearchJobPlacer}; pub use crate::search_response_rest::SearchResponseRest; pub use crate::search_stream::root_search_stream; -use crate::service::SearcherContext; pub use crate::service::{MockSearchService, SearchService, SearchServiceImpl}; use crate::thread_pool::run_cpu_intensive; @@ -166,17 +166,17 @@ fn convert_document_to_json_string( /// Starts a search node, aka a `searcher`. 
pub async fn start_searcher_service( - searcher_config: SearcherConfig, metastore: Arc, storage_resolver: StorageResolver, search_job_placer: SearchJobPlacer, + searcher_context: Arc, ) -> anyhow::Result> { let cluster_client = ClusterClient::new(search_job_placer); let search_service = Arc::new(SearchServiceImpl::new( metastore, storage_resolver, cluster_client, - searcher_config, + searcher_context, )); Ok(search_service) } @@ -192,19 +192,19 @@ pub async fn single_node_search( let searcher_pool = SearcherPool::default(); let search_job_placer = SearchJobPlacer::new(searcher_pool.clone()); let cluster_client = ClusterClient::new(search_job_placer); + let searcher_config = SearcherConfig::default(); + let searcher_context = Arc::new(SearcherContext::new(searcher_config, None)); let search_service = Arc::new(SearchServiceImpl::new( metastore.clone(), storage_resolver, cluster_client.clone(), - SearcherConfig::default(), + searcher_context.clone(), )); let search_service_client = SearchServiceClient::from_service(search_service.clone(), socket_addr); searcher_pool .insert(socket_addr, search_service_client) .await; - let searcher_config = SearcherConfig::default(); - let searcher_context = SearcherContext::new(searcher_config); root_search( &searcher_context, search_request, diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index 7d630e7bb9d..2d061f79cf0 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -1104,7 +1104,7 @@ mod tests { use std::sync::{Arc, RwLock}; use quickwit_common::shared_consts::SCROLL_BATCH_LEN; - use quickwit_config::{DocMapping, IndexingSettings, SearchSettings, SearcherConfig}; + use quickwit_config::{DocMapping, IndexingSettings, SearchSettings}; use quickwit_indexing::MockSplitBuilder; use quickwit_metastore::{IndexMetadata, MockMetastore}; use quickwit_proto::search::{ScrollRequest, SortOrder, SortValue, SplitSearchError}; @@ -1398,7 +1398,7 @@ mod tests { 
let cluster_client = ClusterClient::new(search_job_placer.clone()); let search_response = root_search( - &Arc::new(SearcherContext::new(SearcherConfig::default())), + &SearcherContext::for_test(), search_request, &metastore, &cluster_client, @@ -1456,8 +1456,9 @@ mod tests { let search_job_placer = SearchJobPlacer::new(searcher_pool); let cluster_client = ClusterClient::new(search_job_placer.clone()); + let searcher_context = SearcherContext::for_test(); let search_response = root_search( - &SearcherContext::new(SearcherConfig::default()), + &searcher_context, search_request, &metastore, &cluster_client, @@ -1541,7 +1542,7 @@ mod tests { let search_job_placer = SearchJobPlacer::new(searcher_pool); let cluster_client = ClusterClient::new(search_job_placer.clone()); let search_response = root_search( - &SearcherContext::new(SearcherConfig::default()), + &SearcherContext::for_test(), search_request, &metastore, &cluster_client, @@ -1664,7 +1665,7 @@ mod tests { let search_job_placer = SearchJobPlacer::new(searcher_pool); let cluster_client = ClusterClient::new(search_job_placer.clone()); let search_response = root_search( - &SearcherContext::new(SearcherConfig::default()), + &SearcherContext::for_test(), search_request.clone(), &metastore, &cluster_client, @@ -1837,7 +1838,7 @@ mod tests { let search_job_placer = SearchJobPlacer::new(searcher_pool); let cluster_client = ClusterClient::new(search_job_placer.clone()); let search_response = root_search( - &SearcherContext::new(SearcherConfig::default()), + &SearcherContext::for_test(), search_request.clone(), &metastore, &cluster_client, @@ -2001,7 +2002,7 @@ mod tests { let search_job_placer = SearchJobPlacer::new(searcher_pool); let cluster_client = ClusterClient::new(search_job_placer.clone()); let search_response = root_search( - &SearcherContext::new(SearcherConfig::default()), + &SearcherContext::for_test(), search_request, &metastore, &cluster_client, @@ -2125,7 +2126,7 @@ mod tests { let search_job_placer = 
SearchJobPlacer::new(searcher_pool); let cluster_client = ClusterClient::new(search_job_placer.clone()); let search_response = root_search( - &SearcherContext::new(SearcherConfig::default()), + &SearcherContext::for_test(), search_request, &metastore, &cluster_client, @@ -2196,7 +2197,7 @@ mod tests { let search_job_placer = SearchJobPlacer::new(searcher_pool); let cluster_client = ClusterClient::new(search_job_placer.clone()); let search_response = root_search( - &SearcherContext::new(SearcherConfig::default()), + &SearcherContext::for_test(), search_request, &metastore, &cluster_client, @@ -2253,7 +2254,7 @@ mod tests { let search_job_placer = SearchJobPlacer::new(searcher_pool); let cluster_client = ClusterClient::new(search_job_placer.clone()); let search_response = root_search( - &SearcherContext::new(SearcherConfig::default()), + &SearcherContext::for_test(), search_request, &metastore, &cluster_client, @@ -2333,7 +2334,7 @@ mod tests { let search_job_placer = SearchJobPlacer::new(searcher_pool); let cluster_client = ClusterClient::new(search_job_placer.clone()); let search_response = root_search( - &SearcherContext::new(SearcherConfig::default()), + &SearcherContext::for_test(), search_request, &metastore, &cluster_client, @@ -2405,7 +2406,7 @@ mod tests { let search_job_placer = SearchJobPlacer::new(searcher_pool); let cluster_client = ClusterClient::new(search_job_placer.clone()); let search_response = root_search( - &SearcherContext::new(SearcherConfig::default()), + &SearcherContext::for_test(), search_request, &metastore, &cluster_client, @@ -2434,9 +2435,10 @@ mod tests { let searcher_pool = searcher_pool_for_test([("127.0.0.1:1001", MockSearchService::new())]); let search_job_placer = SearchJobPlacer::new(searcher_pool); let cluster_client = ClusterClient::new(search_job_placer.clone()); + let searcher_context = SearcherContext::for_test(); assert!(root_search( - &SearcherContext::new(SearcherConfig::default()), + &searcher_context, 
quickwit_proto::search::SearchRequest { index_id_patterns: vec!["test-index".to_string()], query_ast: qast_json_helper("invalid_field:\"test\"", &["body"]), @@ -2450,7 +2452,7 @@ mod tests { .is_err()); assert!(root_search( - &SearcherContext::new(SearcherConfig::default()), + &searcher_context, quickwit_proto::search::SearchRequest { index_id_patterns: vec!["test-index".to_string()], query_ast: qast_json_helper("test", &["invalid_field"]), @@ -2509,7 +2511,7 @@ mod tests { let search_job_placer = SearchJobPlacer::new(searcher_pool); let cluster_client = ClusterClient::new(search_job_placer.clone()); let search_response = root_search( - &SearcherContext::new(SearcherConfig::default()), + &SearcherContext::for_test(), search_request, &metastore, &cluster_client, @@ -2549,7 +2551,7 @@ mod tests { let search_job_placer = SearchJobPlacer::new(searcher_pool); let cluster_client = ClusterClient::new(search_job_placer.clone()); let search_response = root_search( - &SearcherContext::new(SearcherConfig::default()), + &SearcherContext::for_test(), search_request, &metastore, &cluster_client, @@ -2569,7 +2571,7 @@ mod tests { }; let search_response = root_search( - &SearcherContext::new(SearcherConfig::default()), + &SearcherContext::for_test(), search_request, &metastore, &cluster_client, @@ -2812,7 +2814,7 @@ mod tests { ); let searcher_pool = searcher_pool_for_test([("127.0.0.1:1001", mock_search_service)]); let search_job_placer = SearchJobPlacer::new(searcher_pool); - let searcher_context = SearcherContext::new(SearcherConfig::default()); + let searcher_context = SearcherContext::for_test(); let cluster_client = ClusterClient::new(search_job_placer.clone()); let mut count_seen_hits = 0; @@ -2990,7 +2992,7 @@ mod tests { let search_job_placer = SearchJobPlacer::new(searcher_pool); let cluster_client = ClusterClient::new(search_job_placer.clone()); let search_response = root_search( - &SearcherContext::new(SearcherConfig::default()), + &SearcherContext::for_test(), 
search_request, &metastore, &cluster_client, diff --git a/quickwit/quickwit-search/src/search_job_placer.rs b/quickwit/quickwit-search/src/search_job_placer.rs index f78798ed601..648daa06b16 100644 --- a/quickwit/quickwit-search/src/search_job_placer.rs +++ b/quickwit/quickwit-search/src/search_job_placer.rs @@ -19,11 +19,15 @@ use std::cmp::Ordering; use std::collections::{HashMap, HashSet}; +use std::fmt; use std::hash::{Hash, Hasher}; use std::net::SocketAddr; use anyhow::bail; -use quickwit_common::rendezvous_hasher::sort_by_rendez_vous_hash; +use async_trait::async_trait; +use quickwit_common::pubsub::EventSubscriber; +use quickwit_common::rendezvous_hasher::{node_affinity, sort_by_rendez_vous_hash}; +use quickwit_proto::search::{ReportSplit, ReportSplitsRequest}; use crate::{SearchServiceClient, SearcherPool}; @@ -59,6 +63,43 @@ pub struct SearchJobPlacer { searcher_pool: SearcherPool, } +#[async_trait] +impl EventSubscriber for SearchJobPlacer { + async fn handle_event(&mut self, evt: ReportSplitsRequest) { + let mut nodes: HashMap = + self.searcher_pool.all().await.into_iter().collect(); + if nodes.is_empty() { + return; + } + let mut splits_per_node: HashMap> = + HashMap::with_capacity(nodes.len().min(evt.report_splits.len())); + for report_split in evt.report_splits { + let node_addr = nodes + .keys() + .max_by_key(|node_addr| node_affinity(*node_addr, &report_split.split_id)) + // This actually never happens thanks to the if-condition at the + // top of this function. 
+ .expect("`nodes` should not be empty."); + splits_per_node + .entry(*node_addr) + .or_default() + .push(report_split); + } + for (node_addr, report_splits) in splits_per_node { + if let Some(search_client) = nodes.get_mut(&node_addr) { + let report_splits_req = ReportSplitsRequest { report_splits }; + let _ = search_client.report_splits(report_splits_req).await; + } + } + } +} + +impl fmt::Debug for SearchJobPlacer { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("SearchJobPlacer").finish() + } +} + impl SearchJobPlacer { /// Returns an [`SearchJobPlacer`] from a search service client pool. pub fn new(searcher_pool: SearcherPool) -> Self { @@ -197,11 +238,8 @@ impl Eq for CandidateNodes {} #[cfg(test)] mod tests { - use std::collections::HashSet; - use std::net::SocketAddr; - - use crate::root::SearchJob; - use crate::{searcher_pool_for_test, MockSearchService, SearchJobPlacer, SearcherPool}; + use super::*; + use crate::{searcher_pool_for_test, MockSearchService, SearchJob}; #[tokio::test] async fn test_search_job_placer() { diff --git a/quickwit/quickwit-search/src/search_stream/leaf.rs b/quickwit/quickwit-search/src/search_stream/leaf.rs index c551a47dff4..466889497df 100644 --- a/quickwit/quickwit-search/src/search_stream/leaf.rs +++ b/quickwit/quickwit-search/src/search_stream/leaf.rs @@ -450,7 +450,6 @@ mod tests { use std::str::from_utf8; use itertools::Itertools; - use quickwit_config::SearcherConfig; use quickwit_indexing::TestSandbox; use quickwit_query::query_ast::qast_json_helper; use serde_json::json; @@ -505,7 +504,7 @@ mod tests { .into_iter() .map(|split_meta| extract_split_and_footer_offsets(&split_meta.split_metadata)) .collect(); - let searcher_context = Arc::new(SearcherContext::new(SearcherConfig::default())); + let searcher_context = Arc::new(SearcherContext::for_test()); let mut single_node_stream = leaf_search_stream( searcher_context, request, @@ -581,7 +580,7 @@ mod tests { .into_iter() .map(|split_meta| 
extract_split_and_footer_offsets(&split_meta.split_metadata)) .collect(); - let searcher_context = Arc::new(SearcherContext::new(SearcherConfig::default())); + let searcher_context = Arc::new(SearcherContext::for_test()); let mut single_node_stream = leaf_search_stream( searcher_context, request, @@ -636,7 +635,7 @@ mod tests { .into_iter() .map(|split_meta| extract_split_and_footer_offsets(&split_meta.split_metadata)) .collect(); - let searcher_context = Arc::new(SearcherContext::new(SearcherConfig::default())); + let searcher_context = Arc::new(SearcherContext::for_test()); let mut single_node_stream = leaf_search_stream( searcher_context, request, @@ -724,7 +723,7 @@ mod tests { .into_iter() .map(|split_meta| extract_split_and_footer_offsets(&split_meta.split_metadata)) .collect(); - let searcher_context = Arc::new(SearcherContext::new(SearcherConfig::default())); + let searcher_context = Arc::new(SearcherContext::for_test()); let mut single_node_stream = leaf_search_stream( searcher_context, request, diff --git a/quickwit/quickwit-search/src/service.rs b/quickwit/quickwit-search/src/service.rs index d3d60b179ee..ec44c100516 100644 --- a/quickwit/quickwit-search/src/service.rs +++ b/quickwit/quickwit-search/src/service.rs @@ -31,10 +31,13 @@ use quickwit_metastore::Metastore; use quickwit_proto::search::{ FetchDocsRequest, FetchDocsResponse, GetKvRequest, Hit, LeafListTermsRequest, LeafListTermsResponse, LeafSearchRequest, LeafSearchResponse, LeafSearchStreamRequest, - LeafSearchStreamResponse, ListTermsRequest, ListTermsResponse, PutKvRequest, ScrollRequest, - SearchRequest, SearchResponse, SearchStreamRequest, SnippetRequest, + LeafSearchStreamResponse, ListTermsRequest, ListTermsResponse, PutKvRequest, + ReportSplitsRequest, ReportSplitsResponse, ScrollRequest, SearchRequest, SearchResponse, + SearchStreamRequest, SnippetRequest, +}; +use quickwit_storage::{ + MemorySizedCache, QuickwitCache, SplitCache, StorageCache, StorageResolver, }; -use 
quickwit_storage::{MemorySizedCache, QuickwitCache, StorageCache, StorageResolver}; use tantivy::aggregation::AggregationLimits; use tokio::sync::Semaphore; use tokio_stream::wrappers::UnboundedReceiverStream; @@ -129,6 +132,10 @@ pub trait SearchService: 'static + Send + Sync { /// Gets the payload associated to a key in the local cache. /// See also `put_kv(..)`. async fn get_kv(&self, get_kv: GetKvRequest) -> Option>; + + /// Indexers call report_splits to inform searchers node about the presence of a split, which + /// would then be considered as a candidate for the searcher split cache. + async fn report_splits(&self, report_splits: ReportSplitsRequest) -> ReportSplitsResponse; } impl SearchServiceImpl { @@ -137,9 +144,8 @@ impl SearchServiceImpl { metastore: Arc, storage_resolver: StorageResolver, cluster_client: ClusterClient, - searcher_config: SearcherConfig, + searcher_context: Arc, ) -> Self { - let searcher_context = Arc::new(SearcherContext::new(searcher_config)); SearchServiceImpl { metastore, storage_resolver, @@ -174,9 +180,10 @@ impl SearchService for SearchServiceImpl { &self, leaf_search_request: LeafSearchRequest, ) -> crate::Result { - let search_request = leaf_search_request + let search_request: Arc = leaf_search_request .search_request - .ok_or_else(|| SearchError::Internal("no search request".to_string()))?; + .ok_or_else(|| SearchError::Internal("no search request.".to_string()))? 
+ .into(); let storage = self .storage_resolver .resolve(&Uri::from_well_formed(leaf_search_request.index_uri)) @@ -186,7 +193,7 @@ impl SearchService for SearchServiceImpl { let leaf_search_response = leaf_search( self.searcher_context.clone(), - &search_request, + search_request, storage.clone(), &split_ids[..], doc_mapper, @@ -309,6 +316,13 @@ impl SearchService for SearchServiceImpl { let payload: Vec = self.search_after_cache.get(&get_request.key).await?; Some(payload) } + + async fn report_splits(&self, report_splits: ReportSplitsRequest) -> ReportSplitsResponse { + if let Some(split_cache) = self.searcher_context.split_cache_opt.as_ref() { + split_cache.report_splits(report_splits.report_splits); + } + ReportSplitsResponse {} + } } pub(crate) async fn scroll( @@ -400,6 +414,8 @@ pub struct SearcherContext { pub split_stream_semaphore: Semaphore, /// Recent sub-query cache. pub leaf_search_cache: LeafSearchCache, + /// Search split cache. `None` if no split cache is configured. + pub split_cache_opt: Option>, } impl std::fmt::Debug for SearcherContext { @@ -416,7 +432,14 @@ impl std::fmt::Debug for SearcherContext { } impl SearcherContext { - pub fn new(searcher_config: SearcherConfig) -> Self { + #[cfg(test)] + pub fn for_test() -> SearcherContext { + let searcher_config = SearcherConfig::default(); + SearcherContext::new(searcher_config, None) + } + + /// Creates a new searcher context, given a searcher config, and an optional `SplitCache`. 
+ pub fn new(searcher_config: SearcherConfig, split_cache_opt: Option>) -> Self { let capacity_in_bytes = searcher_config.split_footer_cache_capacity.get_bytes() as usize; let global_split_footer_cache = MemorySizedCache::with_capacity_in_bytes( capacity_in_bytes, @@ -433,6 +456,7 @@ impl SearcherContext { let leaf_search_cache = LeafSearchCache::new( searcher_config.partial_request_cache_capacity.get_bytes() as usize, ); + Self { searcher_config, fast_fields_cache: storage_long_term_cache, @@ -440,9 +464,11 @@ impl SearcherContext { split_footer_cache: global_split_footer_cache, split_stream_semaphore, leaf_search_cache, + split_cache_opt, } } - // Returns a new instance to track the aggregation memory usage. + + /// Returns a new instance to track the aggregation memory usage. pub fn get_aggregation_limits(&self) -> AggregationLimits { AggregationLimits::new( Some(self.searcher_config.aggregation_memory_limit.get_bytes()), diff --git a/quickwit/quickwit-search/src/tests.rs b/quickwit/quickwit-search/src/tests.rs index 4cf405fe135..7d1124fab35 100644 --- a/quickwit/quickwit-search/src/tests.rs +++ b/quickwit/quickwit-search/src/tests.rs @@ -974,16 +974,17 @@ async fn test_search_util(test_sandbox: &TestSandbox, query: &str) -> Vec { .into_iter() .map(|split_meta| extract_split_and_footer_offsets(&split_meta.split_metadata)) .collect(); - let request = SearchRequest { + let request = Arc::new(SearchRequest { index_id_patterns: vec![test_sandbox.index_uid().index_id().to_string()], query_ast: qast_json_helper(query, &[]), max_hits: 100, ..Default::default() - }; - let searcher_context = Arc::new(SearcherContext::new(SearcherConfig::default())); + }); + let searcher_context: Arc = + Arc::new(SearcherContext::new(SearcherConfig::default(), None)); let search_response = leaf_search( searcher_context, - &request, + request, test_sandbox.storage(), &splits_offsets, test_sandbox.doc_mapper(), @@ -1605,7 +1606,7 @@ async fn test_single_node_list_terms() -> 
anyhow::Result<()> { .into_iter() .map(|split_meta| extract_split_and_footer_offsets(&split_meta.split_metadata)) .collect(); - let searcher_context = Arc::new(SearcherContext::new(SearcherConfig::default())); + let searcher_context = Arc::new(SearcherContext::new(SearcherConfig::default(), None)); { let request = ListTermsRequest { diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index 0f61a86375f..f60c27ddac1 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -42,10 +42,11 @@ use std::convert::Infallible; use std::fs; use std::net::SocketAddr; use std::num::NonZeroUsize; +use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; -use anyhow::anyhow; +use anyhow::{anyhow, Context}; use byte_unit::n_mib_bytes; pub use format::BodyFormat; use futures::{Stream, StreamExt}; @@ -59,7 +60,7 @@ use quickwit_common::tower::{ Rate, RateLimitLayer, SmaRateEstimator, }; use quickwit_config::service::QuickwitService; -use quickwit_config::{NodeConfig, SearcherConfig}; +use quickwit_config::NodeConfig; use quickwit_control_plane::control_plane::ControlPlane; use quickwit_control_plane::{ControlPlaneEventSubscriber, IndexerNodeInfo, IndexerPool}; use quickwit_index_management::{IndexService as IndexManager, IndexServiceError}; @@ -83,12 +84,13 @@ use quickwit_proto::metastore::events::{ AddSourceEvent, DeleteIndexEvent, DeleteSourceEvent, ToggleSourceEvent, }; use quickwit_proto::metastore::{EntityKind, MetastoreError}; +use quickwit_proto::search::ReportSplitsRequest; use quickwit_proto::NodeId; use quickwit_search::{ create_search_client_from_channel, start_searcher_service, SearchJobPlacer, SearchService, - SearchServiceClient, SearcherPool, + SearchServiceClient, SearcherContext, SearcherPool, }; -use quickwit_storage::StorageResolver; +use quickwit_storage::{SplitCache, StorageResolver}; use tokio::sync::oneshot; use tower::timeout::Timeout; use tower::ServiceBuilder; @@ -115,11 +117,6 
@@ struct QuickwitServices { pub metastore_server_opt: Option>, pub metastore_client: Arc, pub control_plane_service: ControlPlaneServiceClient, - #[allow(dead_code)] - /// The control plane listens to metastore events. - /// We must maintain a reference to the subscription handles to continue receiving - /// notifcations. Otherwise, the subscriptions are dropped. - pub control_plane_event_subscription_handles_opt: Option, pub index_manager: IndexManager, pub indexing_service_opt: Option>, // Ingest v1 @@ -132,6 +129,12 @@ struct QuickwitServices { /// It is only used to serve the rest API calls and will only execute /// the root requests. pub search_service: Arc, + + /// The control plane listens to metastore events. + /// We must maintain a reference to the subscription handles to continue receiving + /// notifications. Otherwise, the subscriptions are dropped. + _control_plane_event_subscription_handles_opt: Option, + _report_splits_subscription_handle_opt: Option>, } fn has_node_with_metastore_service(members: &[ClusterMember]) -> bool { @@ -250,6 +253,7 @@ pub async fn serve_quickwit( balance_channel_for_service(&cluster, QuickwitService::ControlPlane).await; ControlPlaneServiceClient::from_channel(balance_channel) }; + // Setup control plane event subscriptions. 
let control_plane_event_subscription_handles_opt = setup_control_plane_event_subscriptions( &node_config, @@ -284,6 +288,7 @@ pub async fn serve_quickwit( ingest_api_service.clone(), ingester_pool.clone(), storage_resolver.clone(), + event_broker.clone(), ) .await?; let num_buckets = NonZeroUsize::new(60).expect("60 should be non-zero"); @@ -356,17 +361,46 @@ pub async fn serve_quickwit( } } - let searcher_config = node_config.searcher_config.clone(); let cluster_change_stream = cluster.ready_nodes_change_stream().await; + let split_cache_root_directory: PathBuf = + node_config.data_dir_path.join("searcher-split-cache"); + let split_cache_opt: Option> = + if let Some(split_cache_config) = node_config.searcher_config.split_cache { + let split_cache = SplitCache::with_root_path( + split_cache_root_directory, + storage_resolver.clone(), + split_cache_config, + ) + .context("failed to load searcher split cache")?; + Some(Arc::new(split_cache)) + } else { + None + }; + + let searcher_context = Arc::new(SearcherContext::new( + node_config.searcher_config.clone(), + split_cache_opt, + )); + let (search_job_placer, search_service) = setup_searcher( - searcher_config, cluster_change_stream, metastore_client.clone(), storage_resolver.clone(), + searcher_context, ) .await?; + let report_splits_subscription_handle_opt = + // DISCLAIMER: This is quirky here: We base our decision to forward the split report depending + // on the current searcher configuration. + if node_config.searcher_config.split_cache.is_some() { + // The searcher receive hints about new splits to populate their index. 
+ Some(event_broker.subscribe::(search_job_placer.clone())) + } else { + None + }; + let janitor_service_opt = if node_config .enabled_services .contains(&QuickwitService::Janitor) @@ -377,6 +411,7 @@ pub async fn serve_quickwit( metastore_client.clone(), search_job_placer, storage_resolver.clone(), + event_broker.clone(), ) .await?; Some(janitor_service) @@ -394,7 +429,8 @@ pub async fn serve_quickwit( metastore_server_opt, metastore_client: metastore_client.clone(), control_plane_service, - control_plane_event_subscription_handles_opt, + _control_plane_event_subscription_handles_opt: control_plane_event_subscription_handles_opt, + _report_splits_subscription_handle_opt: report_splits_subscription_handle_opt, index_manager, indexing_service_opt, ingest_router_service, @@ -648,18 +684,18 @@ async fn setup_ingest_v2( } async fn setup_searcher( - searcher_config: SearcherConfig, cluster_change_stream: impl Stream + Send + 'static, metastore: Arc, storage_resolver: StorageResolver, + searcher_context: Arc, ) -> anyhow::Result<(SearchJobPlacer, Arc)> { let searcher_pool = SearcherPool::default(); let search_job_placer = SearchJobPlacer::new(searcher_pool.clone()); let search_service = start_searcher_service( - searcher_config, metastore, storage_resolver, search_job_placer.clone(), + searcher_context, ) .await?; let search_service_clone = search_service.clone(); @@ -867,6 +903,7 @@ mod tests { use chitchat::transport::ChannelTransport; use quickwit_cluster::{create_cluster_for_test, ClusterNode}; use quickwit_common::uri::Uri; + use quickwit_config::SearcherConfig; use quickwit_metastore::{metastore_for_test, IndexMetadata, ListIndexesQuery, MockMetastore}; use quickwit_proto::indexing::IndexingTask; use quickwit_search::Job; @@ -1002,13 +1039,13 @@ mod tests { #[tokio::test] async fn test_setup_searcher() { - let searcher_config = SearcherConfig::default(); + let searcher_context = Arc::new(SearcherContext::new(SearcherConfig::default(), None)); let metastore = 
metastore_for_test(); let (change_stream_tx, change_stream_rx) = mpsc::unbounded_channel(); let change_stream = UnboundedReceiverStream::new(change_stream_rx); let storage_resolver = StorageResolver::unconfigured(); let (search_job_placer, _searcher_service) = - setup_searcher(searcher_config, change_stream, metastore, storage_resolver) + setup_searcher(change_stream, metastore, storage_resolver, searcher_context) .await .unwrap(); diff --git a/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs b/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs index 551366978c3..1bb2607f567 100644 --- a/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs +++ b/quickwit/quickwit-serve/src/search_api/grpc_adapter.rs @@ -24,7 +24,7 @@ use futures::TryStreamExt; use quickwit_proto::error::convert_to_grpc_result; use quickwit_proto::search::{ search_service_server as grpc, GetKvRequest, GetKvResponse, LeafSearchStreamRequest, - LeafSearchStreamResponse, + LeafSearchStreamResponse, ReportSplitsRequest, ReportSplitsResponse, }; use quickwit_proto::{set_parent_span_from_request_metadata, tonic, ServiceError}; use quickwit_search::SearchService; @@ -152,4 +152,15 @@ impl grpc::SearchService for GrpcSearchAdapter { let get_response = GetKvResponse { payload }; Ok(tonic::Response::new(get_response)) } + + #[instrument(skip(self, request))] + async fn report_splits( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + set_parent_span_from_request_metadata(request.metadata()); + let get_search_after_context_request = request.into_inner(); + self.0.report_splits(get_search_after_context_request).await; + Ok(tonic::Response::new(ReportSplitsResponse {})) + } } diff --git a/quickwit/quickwit-storage/Cargo.toml b/quickwit/quickwit-storage/Cargo.toml index 737114c6d01..2745c326e3a 100644 --- a/quickwit/quickwit-storage/Cargo.toml +++ b/quickwit/quickwit-storage/Cargo.toml @@ -13,6 +13,7 @@ documentation = "https://quickwit.io/docs/" anyhow = { workspace = true } 
async-trait = { workspace = true } base64 = { workspace = true } +byte-unit = { workspace = true } bytes = { workspace = true } fnv = { workspace = true } futures = { workspace = true } @@ -32,6 +33,7 @@ tokio = { workspace = true, features = ["test-util"] } tokio-stream = { workspace = true } tokio-util = { workspace = true } tracing = { workspace = true } +ulid = { workspace = true } aws-config = { workspace = true } aws-credential-types = { workspace = true } @@ -46,6 +48,7 @@ azure_storage_blobs = { workspace = true, optional = true } quickwit-aws = { workspace = true } quickwit-common = { workspace = true } quickwit-config = { workspace = true } +quickwit-proto = { workspace = true } [dev-dependencies] mockall = { workspace = true } diff --git a/quickwit/quickwit-storage/src/cache/mod.rs b/quickwit/quickwit-storage/src/cache/mod.rs index fe621b2f39a..4793703eefb 100644 --- a/quickwit/quickwit-storage/src/cache/mod.rs +++ b/quickwit/quickwit-storage/src/cache/mod.rs @@ -44,7 +44,7 @@ use crate::{OwnedBytes, Storage}; /// - it relies on the idea that all of the files we attempt to cache /// have universally unique names. It happens to be true today, but this might be very error prone /// in the future. 
-pub fn wrap_storage_with_long_term_cache( +pub fn wrap_storage_with_cache( long_term_cache: Arc, storage: Arc, ) -> Arc { diff --git a/quickwit/quickwit-storage/src/lib.rs b/quickwit/quickwit-storage/src/lib.rs index 1904a0e4a6a..9c5bec34b86 100644 --- a/quickwit/quickwit-storage/src/lib.rs +++ b/quickwit/quickwit-storage/src/lib.rs @@ -49,11 +49,13 @@ mod payload; mod prefix_storage; mod ram_storage; mod split; +mod split_cache; mod storage_factory; mod storage_resolver; mod versioned_component; use quickwit_common::uri::Uri; +pub use split_cache::SplitCache; pub use tantivy::directory::OwnedBytes; pub use versioned_component::VersionedComponent; @@ -61,8 +63,7 @@ pub use self::bundle_storage::{BundleStorage, BundleStorageFileOffsets}; #[cfg(any(test, feature = "testsuite"))] pub use self::cache::MockStorageCache; pub use self::cache::{ - wrap_storage_with_long_term_cache, ByteRangeCache, MemorySizedCache, QuickwitCache, - StorageCache, + wrap_storage_with_cache, ByteRangeCache, MemorySizedCache, QuickwitCache, StorageCache, }; pub use self::local_file_storage::{LocalFileStorage, LocalFileStorageFactory}; #[cfg(feature = "azure")] diff --git a/quickwit/quickwit-storage/src/metrics.rs b/quickwit/quickwit-storage/src/metrics.rs index bb0b29196b6..9002bddf0ae 100644 --- a/quickwit/quickwit-storage/src/metrics.rs +++ b/quickwit/quickwit-storage/src/metrics.rs @@ -28,6 +28,7 @@ pub struct StorageMetrics { pub partial_request_cache: CacheMetrics, pub fast_field_cache: CacheMetrics, pub split_footer_cache: CacheMetrics, + pub searcher_split_cache: CacheMetrics, pub object_storage_get_total: IntCounter, pub object_storage_put_total: IntCounter, pub object_storage_put_parts: IntCounter, @@ -41,6 +42,8 @@ impl Default for StorageMetrics { fast_field_cache: CacheMetrics::for_component("fastfields"), shortlived_cache: CacheMetrics::for_component("shortlived"), partial_request_cache: CacheMetrics::for_component("partial_request"), + searcher_split_cache: 
CacheMetrics::for_component("searcher_split"), + split_footer_cache: CacheMetrics::for_component("splitfooter"), object_storage_get_total: new_counter( "object_storage_gets_total", @@ -84,7 +87,7 @@ pub struct CacheMetrics { } impl CacheMetrics { - fn for_component(component_name: &str) -> Self { + pub fn for_component(component_name: &str) -> Self { let namespace = format!("quickwit_cache_{component_name}"); CacheMetrics { component_name: component_name.to_string(), diff --git a/quickwit/quickwit-storage/src/split_cache/download_task.rs b/quickwit/quickwit-storage/src/split_cache/download_task.rs new file mode 100644 index 00000000000..d0e6acd2ca0 --- /dev/null +++ b/quickwit/quickwit-storage/src/split_cache/download_task.rs @@ -0,0 +1,121 @@ +// Copyright (C) 2023 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . 
+ +use std::num::NonZeroU32; +use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +use quickwit_common::split_file; +use tokio::sync::{OwnedSemaphorePermit, Semaphore}; +use tracing::{error, instrument}; +use ulid::Ulid; + +use crate::split_cache::split_table::{CandidateSplit, DownloadOpportunity, SplitTable}; +use crate::StorageResolver; + +/// Removes the evicted split files from the file system. +/// This function just logs errors, and swallows them. +/// +/// At this point, the disk space is already accounted as released, +/// so the error could result in a "disk space leak". +#[instrument] +pub(crate) fn delete_evicted_splits(root_path: &Path, splits_to_delete: &[Ulid]) { + for &split_to_delete in splits_to_delete { + let split_file_path = root_path.join(split_file(split_to_delete)); + if let Err(_io_err) = std::fs::remove_file(&split_file_path) { + // This is an pretty critical error. The split size is not tracked anymore at this + // point. + error!(path=%split_file_path.display(), "Failed to remove split file from cache directory. 
This is critical as the file is now not taken in account in the cache size limits."); + } + } +} + +async fn download_split( + root_path: &Path, + candidate_split: &CandidateSplit, + storage_resolver: StorageResolver, +) -> anyhow::Result { + let CandidateSplit { + split_ulid, + storage_uri, + living_token: _, + } = candidate_split; + let split_filename = split_file(*split_ulid); + let target_filepath = root_path.join(&split_filename); + let storage = storage_resolver.resolve(storage_uri).await?; + let num_bytes = storage + .copy_to_file(Path::new(&split_filename), &target_filepath) + .await?; + Ok(num_bytes) +} + +async fn perform_eviction_and_download( + download_opportunity: DownloadOpportunity, + root_path: PathBuf, + storage_resolver: StorageResolver, + shared_split_table: Arc>, + _download_permit: OwnedSemaphorePermit, +) -> anyhow::Result<()> { + let DownloadOpportunity { + splits_to_delete, + split_to_download, + } = download_opportunity; + let split_ulid = split_to_download.split_ulid; + // tokio io runs on `spawn_blocking` threads anyway. 
+ let root_path_clone = root_path.clone(); + let _ = tokio::task::spawn_blocking(move || { + delete_evicted_splits(&root_path_clone, &splits_to_delete[..]); + }) + .await; + let num_bytes = download_split(&root_path, &split_to_download, storage_resolver).await?; + let mut shared_split_table_lock = shared_split_table.lock().unwrap(); + shared_split_table_lock.register_as_downloaded(split_ulid, num_bytes); + Ok(()) +} + +pub(crate) fn spawn_download_task( + root_path: PathBuf, + shared_split_table: Arc>, + storage_resolver: StorageResolver, + num_concurrent_downloads: NonZeroU32, +) { + let semaphore = Arc::new(Semaphore::new(num_concurrent_downloads.get() as usize)); + tokio::task::spawn(async move { + loop { + let download_permit = Semaphore::acquire_owned(semaphore.clone()).await.unwrap(); + let download_opportunity_opt = shared_split_table + .lock() + .unwrap() + .find_download_opportunity(); + if let Some(download_opportunity) = download_opportunity_opt { + tokio::task::spawn(perform_eviction_and_download( + download_opportunity, + root_path.clone(), + storage_resolver.clone(), + shared_split_table.clone(), + download_permit, + )); + } else { + // We wait 1 sec before retrying, to avoid wasting CPU. + tokio::time::sleep(Duration::from_secs(1)).await; + } + } + }); +} diff --git a/quickwit/quickwit-storage/src/split_cache/mod.rs b/quickwit/quickwit-storage/src/split_cache/mod.rs new file mode 100644 index 00000000000..59d4cde360c --- /dev/null +++ b/quickwit/quickwit-storage/src/split_cache/mod.rs @@ -0,0 +1,258 @@ +// Copyright (C) 2023 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. 
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+mod download_task;
+mod split_table;
+
+use std::collections::BTreeMap;
+use std::ffi::OsStr;
+use std::fs::File;
+use std::io::{self, Read, Seek, SeekFrom};
+use std::ops::Range;
+use std::path::{Path, PathBuf};
+use std::str::FromStr;
+use std::sync::{Arc, Mutex};
+
+use async_trait::async_trait;
+use quickwit_common::uri::Uri;
+use quickwit_config::SplitCacheLimits;
+use quickwit_proto::search::ReportSplit;
+use tantivy::directory::OwnedBytes;
+use tracing::{error, info, warn};
+use ulid::Ulid;
+
+use crate::split_cache::download_task::{delete_evicted_splits, spawn_download_task};
+use crate::split_cache::split_table::{SplitGuard, SplitTable};
+use crate::{wrap_storage_with_cache, Storage, StorageCache};
+
+/// On disk Cache of splits for searchers.
+///
+/// The searcher receives reports about the existence of splits.
+pub struct SplitCache {
+    // Directory containing the cached split files.
+    // Split ids are universally unique, so we put them all in the same directory.
+    root_path: PathBuf,
+    // In memory structure, listing the splits we know about regardless
+    // of whether they are in cache, being downloaded, or just available for download.
+    split_table: Arc<Mutex<SplitTable>>,
+}
+
+impl SplitCache {
+    /// Creates a new SplitCache and spawns the task that will continuously search for
+    /// download opportunities.
+    pub fn with_root_path(
+        root_path: PathBuf,
+        storage_resolver: crate::StorageResolver,
+        limits: SplitCacheLimits,
+    ) -> io::Result<SplitCache> {
+        std::fs::create_dir_all(&root_path)?;
+        let mut existing_splits: BTreeMap<Ulid, u64> = Default::default();
+        for dir_entry_res in std::fs::read_dir(&root_path)? 
{ + let dir_entry = dir_entry_res?; + let path = dir_entry.path(); + let meta = std::fs::metadata(&path)?; + if meta.is_dir() { + continue; + } + let ext = path.extension().and_then(OsStr::to_str).unwrap_or(""); + match ext { + "temp" => { + // This file is a temporary file that was being downloaded, when Quickwit was + // stopped (killed for instance) in a way that prevented + // their cleanup. It is important to remove it. + if let Err(io_err) = std::fs::remove_file(&path) { + if io_err.kind() != io::ErrorKind::NotFound { + error!(path=?path, "Failed to remove temporary file."); + } + } + } + "split" => { + if let Some(split_ulid) = split_id_from_path(&path) { + existing_splits.insert(split_ulid, meta.len()); + } else { + warn!(path=%path.display(), ".split file with invalid ulid in split cache directory. Ignoring."); + } + } + _ => { + warn!(path=%path.display(), "Unknown file in split cache directory. Ignoring."); + } + } + } + let mut split_table = SplitTable::with_limits_and_existing_splits(limits, existing_splits); + + // In case of a setting change, it could be useful to evict some splits on startup. + let splits_to_remove_opt = split_table.make_room_for_split_if_necessary(u64::MAX); + let root_path_clone = root_path.clone(); + if let Some(splits_to_remove) = splits_to_remove_opt { + info!( + num_splits = splits_to_remove.len(), + "Evicting splits from the searcher cache. Has the node configuration changed?" + ); + delete_evicted_splits(&root_path_clone, &splits_to_remove[..]); + } + let split_table_arc = Arc::new(Mutex::new(split_table)); + + spawn_download_task( + root_path.clone(), + split_table_arc.clone(), + storage_resolver, + limits.num_concurrent_downloads, + ); + + Ok(SplitCache { + root_path, + split_table: split_table_arc, + }) + } + + /// Wraps a storage with our split cache. 
+    pub fn wrap_storage(self_arc: Arc<SplitCache>, storage: Arc<dyn Storage>) -> Arc<dyn Storage> {
+        let cache = Arc::new(SplitCacheBackingStorage {
+            split_cache: self_arc,
+            storage_root_uri: storage.uri().clone(),
+        });
+        wrap_storage_with_cache(cache, storage)
+    }
+
+    /// Report the split cache about the existence of new splits.
+    pub fn report_splits(&self, report_splits: Vec<ReportSplit>) {
+        let mut split_table = self.split_table.lock().unwrap();
+        for report_split in report_splits {
+            let Ok(split_ulid) = Ulid::from_str(&report_split.split_id) else {
+                error!(split_id=%report_split.split_id, "Received invalid split ulid. Ignoring.");
+                continue;
+            };
+            let Ok(storage_uri) = Uri::from_str(&report_split.storage_uri) else {
+                error!(storage_uri=%report_split.storage_uri, "Received invalid storage uri. Ignoring.");
+                continue;
+            };
+            split_table.report(split_ulid, storage_uri);
+        }
+    }
+
+    fn cached_split_filepath(&self, split_id: Ulid) -> PathBuf {
+        let split_filename = quickwit_common::split_file(split_id);
+        self.root_path.join(split_filename)
+    }
+
+    // Returns a split guard object. As long as it is not dropped, the
+    // split won't be evicted from the cache. 
+ fn get_split_guard(&self, split_id: Ulid, storage_uri: &Uri) -> Option { + let split_guard = self + .split_table + .lock() + .unwrap() + .get_split_guard(split_id, storage_uri)?; + Some(SplitFilepath { + _split_guard: split_guard, + cached_split_file_path: self.cached_split_filepath(split_id), + }) + } +} + +pub struct SplitFilepath { + _split_guard: SplitGuard, + cached_split_file_path: PathBuf, +} + +impl AsRef for SplitFilepath { + fn as_ref(&self) -> &Path { + &self.cached_split_file_path + } +} + +fn split_id_from_path(split_path: &Path) -> Option { + let split_filename = split_path.file_name()?.to_str()?; + let split_id_str = split_filename.strip_suffix(".split")?; + Ulid::from_str(split_id_str).ok() +} + +struct SplitCacheBackingStorage { + split_cache: Arc, + storage_root_uri: Uri, +} + +impl SplitCacheBackingStorage { + async fn get_impl(&self, path: &Path, byte_range: Range) -> Option { + let split_id = split_id_from_path(path)?; + let split_guard = self + .split_cache + .get_split_guard(split_id, &self.storage_root_uri)?; + // TODO touch file in cache. + // We don't use async file io here because it spawn blocks anyway, and it feels dumb to + // spawn block 3 times in a row. + tokio::task::spawn_blocking(move || { + let mut file = File::open(&split_guard).ok()?; + file.seek(SeekFrom::Start(byte_range.start as u64)).ok()?; + let mut buf = Vec::with_capacity(byte_range.len()); + file.take(byte_range.len() as u64) + .read_to_end(&mut buf) + .ok()?; + Some(OwnedBytes::new(buf)) + }) + .await + // TODO Remove file from cache if io error? + .ok()? + } + + async fn get_all_impl(&self, path: &Path) -> Option { + let split_id = split_id_from_path(path)?; + let split_guard = self + .split_cache + .get_split_guard(split_id, &self.storage_root_uri)?; + // We don't use async file io here because it spawn blocks anyway, and it feels dumb to + // spawn block 3 times in a row. 
+ tokio::task::spawn_blocking(move || { + let mut file = File::open(split_guard).ok()?; + let mut buf = Vec::new(); + file.read_to_end(&mut buf).ok()?; + Some(OwnedBytes::new(buf)) + }) + .await + .ok()? + } + + fn record_hit_metrics(&self, result_opt: Option<&OwnedBytes>) { + let split_metrics = &crate::STORAGE_METRICS.searcher_split_cache; + if let Some(result) = result_opt { + split_metrics.hits_num_items.inc(); + split_metrics.hits_num_bytes.inc_by(result.len() as u64); + } else { + split_metrics.misses_num_items.inc(); + } + } +} + +#[async_trait] +impl StorageCache for SplitCacheBackingStorage { + async fn get(&self, path: &Path, byte_range: Range) -> Option { + let result = self.get_impl(path, byte_range).await; + self.record_hit_metrics(result.as_ref()); + result + } + + async fn get_all(&self, path: &Path) -> Option { + let result = self.get_all_impl(path).await; + self.record_hit_metrics(result.as_ref()); + result + } + + async fn put(&self, _path: PathBuf, _byte_range: Range, _bytes: OwnedBytes) {} + async fn put_all(&self, _path: PathBuf, _bytes: OwnedBytes) {} +} diff --git a/quickwit/quickwit-storage/src/split_cache/split_table.rs b/quickwit/quickwit-storage/src/split_cache/split_table.rs new file mode 100644 index 00000000000..3cf08635022 --- /dev/null +++ b/quickwit/quickwit-storage/src/split_cache/split_table.rs @@ -0,0 +1,652 @@ +// Copyright (C) 2023 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. 
+// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::cmp::Ordering; +use std::collections::{BTreeMap, BTreeSet, HashMap}; +use std::sync::{Arc, Weak}; +use std::time::Instant; + +use quickwit_common::uri::Uri; +use quickwit_config::SplitCacheLimits; +use ulid::Ulid; + +type LastAccessDate = u64; + +/// Maximum number of splits to track. +const MAX_NUM_CANDIDATES: usize = 1_000; + +#[derive(Clone, Copy)] +pub(crate) struct SplitKey { + pub last_accessed: LastAccessDate, + pub split_ulid: Ulid, +} + +impl PartialOrd for SplitKey { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for SplitKey { + fn cmp(&self, other: &Self) -> Ordering { + (self.last_accessed, &self.split_ulid).cmp(&(other.last_accessed, &other.split_ulid)) + } +} + +impl PartialEq for SplitKey { + fn eq(&self, other: &Self) -> bool { + (self.last_accessed, &self.split_ulid) == (other.last_accessed, &other.split_ulid) + } +} + +impl Eq for SplitKey {} + +#[derive(Clone, Debug)] +enum Status { + Candidate(CandidateSplit), + Downloading { alive_token: Weak<()> }, + OnDisk { num_bytes: u64 }, +} + +impl PartialEq for Status { + fn eq(&self, other: &Status) -> bool { + match (self, other) { + (Status::Candidate(candidate_split), Status::Candidate(other_candidate_split)) => { + candidate_split == other_candidate_split + } + (Status::Downloading { .. }, Status::Downloading { .. 
}) => true, + ( + Status::OnDisk { num_bytes }, + Status::OnDisk { + num_bytes: other_num_bytes, + }, + ) => num_bytes == other_num_bytes, + _ => false, + } + } +} + +pub struct SplitInfo { + pub(crate) split_key: SplitKey, + status: Status, +} + +/// The split table keeps track of splits we know about (regardless of whether they have already +/// been downloaded or not). +/// +/// Invariant: +/// Each split appearing into split_to_status, should be listed 1 and exactly once in the +/// either +/// - on_disk_splits +/// - downloading_splits +/// - candidate_splits. +/// +/// It is possible for the split table to exceed its limits, by at most one split. +pub struct SplitTable { + on_disk_splits: BTreeSet, + downloading_splits: BTreeSet, + candidate_splits: BTreeSet, + split_to_status: HashMap, + start_time: Instant, + limits: SplitCacheLimits, + on_disk_bytes: u64, +} + +impl SplitTable { + pub(crate) fn with_limits_and_existing_splits( + limits: SplitCacheLimits, + existing_filepaths: BTreeMap, + ) -> SplitTable { + let mut split_table = SplitTable { + on_disk_splits: BTreeSet::default(), + candidate_splits: BTreeSet::default(), + downloading_splits: BTreeSet::default(), + split_to_status: HashMap::default(), + start_time: Instant::now(), + limits, + on_disk_bytes: 0u64, + }; + split_table.acknowledge_on_disk_splits(existing_filepaths); + split_table + } + + fn acknowledge_on_disk_splits(&mut self, existing_filepaths: BTreeMap) { + for (split_ulid, num_bytes) in existing_filepaths { + let split_info = SplitInfo { + split_key: SplitKey { + last_accessed: 0, + split_ulid, + }, + status: Status::OnDisk { num_bytes }, + }; + self.insert(split_info); + } + } +} + +fn compute_timestamp(start: Instant) -> LastAccessDate { + start.elapsed().as_micros() as u64 +} + +// TODO improve SplitGuard with Atomic +// Right only touch is helping. 
+pub(super) struct SplitGuard; + +impl SplitTable { + pub(super) fn get_split_guard( + &mut self, + split_ulid: Ulid, + storage_uri: &Uri, + ) -> Option { + if let Status::OnDisk { .. } = self.touch(split_ulid, storage_uri) { + Some(SplitGuard) + } else { + None + } + } + + fn remove(&mut self, split_ulid: Ulid) -> Option { + let split_info = self.split_to_status.remove(&split_ulid)?; + let split_queue: &mut BTreeSet = match split_info.status { + Status::Candidate { .. } => &mut self.candidate_splits, + Status::Downloading { .. } => &mut self.downloading_splits, + Status::OnDisk { num_bytes } => { + self.on_disk_bytes -= num_bytes; + crate::metrics::STORAGE_METRICS + .searcher_split_cache + .in_cache_count + .dec(); + crate::metrics::STORAGE_METRICS + .searcher_split_cache + .in_cache_num_bytes + .sub(num_bytes as i64); + &mut self.on_disk_splits + } + }; + let is_in_queue = split_queue.remove(&split_info.split_key); + assert!(is_in_queue); + if let Status::Downloading { alive_token } = &split_info.status { + if alive_token.strong_count() == 0 { + return None; + } + } + Some(split_info) + } + + fn gc_downloading_splits_if_necessary(&mut self) { + if self.downloading_splits.len() + < (self.limits.num_concurrent_downloads.get() as usize + 10) + { + return; + } + let mut splits_to_remove = Vec::new(); + for split in &self.downloading_splits { + if let Some(split_info) = self.split_to_status.get(&split.split_ulid) { + if let Status::Downloading { alive_token } = &split_info.status { + if alive_token.strong_count() == 0 { + splits_to_remove.push(split.split_ulid); + } + } + } + } + for split in splits_to_remove { + self.remove(split); + } + } + + /// Insert a `split_info`. This methods assumes the split was not present in the split table + /// to begin with. It will panic if the split was already present. + /// + /// Keep this method private. + fn insert(&mut self, split_info: SplitInfo) { + let was_not_in_queue = match split_info.status { + Status::Candidate { .. 
} => { + let was_not_in_queue = self.candidate_splits.insert(split_info.split_key); + self.truncate_candidate_list(); + was_not_in_queue + } + Status::Downloading { .. } => self.downloading_splits.insert(split_info.split_key), + Status::OnDisk { num_bytes } => { + self.on_disk_bytes += num_bytes; + crate::metrics::STORAGE_METRICS + .searcher_split_cache + .in_cache_count + .inc(); + crate::metrics::STORAGE_METRICS + .searcher_split_cache + .in_cache_num_bytes + .add(num_bytes as i64); + self.on_disk_splits.insert(split_info.split_key) + } + }; + self.gc_downloading_splits_if_necessary(); + assert!(was_not_in_queue); + let split_ulid_was_absent = self + .split_to_status + .insert(split_info.split_key.split_ulid, split_info) + .is_none(); + assert!(split_ulid_was_absent); + } + + fn touch(&mut self, split_ulid: Ulid, storage_uri: &Uri) -> Status { + let timestamp = compute_timestamp(self.start_time); + self.mutate_split(split_ulid, |old_split_info| { + if let Some(mut split_info) = old_split_info { + split_info.split_key.last_accessed = timestamp; + split_info + } else { + SplitInfo { + split_key: SplitKey { + split_ulid, + last_accessed: timestamp, + }, + status: Status::Candidate(CandidateSplit { + storage_uri: storage_uri.clone(), + split_ulid, + living_token: Arc::new(()), + }), + } + } + }) + } + + /// Mutates a split ulid. + /// + /// By design this function maintains the invariant. 
+ /// It removes the split with the given ulid, modifies, and re + fn mutate_split( + &mut self, + split_ulid: Ulid, + mutate_fn: impl FnOnce(Option) -> SplitInfo, + ) -> Status { + let split_info_opt = self.remove(split_ulid); + let new_split: SplitInfo = mutate_fn(split_info_opt); + let new_status = new_split.status.clone(); + self.insert(new_split); + new_status + } + + fn change_split_status(&mut self, split_ulid: Ulid, status: Status) { + let start_time = self.start_time; + self.mutate_split(split_ulid, move |split_info_opt| { + if let Some(mut split_info) = split_info_opt { + split_info.status = status; + split_info + } else { + SplitInfo { + split_key: SplitKey { + last_accessed: compute_timestamp(start_time), + split_ulid, + }, + status, + } + } + }); + } + + pub(crate) fn report(&mut self, split_ulid: Ulid, storage_uri: Uri) { + self.mutate_split(split_ulid, move |split_info_opt| { + if let Some(split_info) = split_info_opt { + return split_info; + } + SplitInfo { + split_key: SplitKey { + last_accessed: 0u64, + split_ulid, + }, + status: Status::Candidate(CandidateSplit { + storage_uri, + split_ulid, + living_token: Arc::new(()), + }), + } + }); + } + + /// Make sure we have at most `MAX_CANDIDATES` candidate splits. + fn truncate_candidate_list(&mut self) { + while self.candidate_splits.len() > MAX_NUM_CANDIDATES { + let worst_candidate = self.candidate_splits.first().unwrap().split_ulid; + self.remove(worst_candidate); + } + } + + pub(crate) fn register_as_downloaded(&mut self, split_ulid: Ulid, num_bytes: u64) { + self.change_split_status(split_ulid, Status::OnDisk { num_bytes }); + } + + /// Change the state of the given split from candidate to downloading state, + /// and returns its URI. + /// + /// This function does NOT trigger the download itself. It is up to + /// the caller to actually initiate the download. 
+ pub(crate) fn start_download(&mut self, split_ulid: Ulid) -> Option { + let split_info = self.remove(split_ulid)?; + let Status::Candidate(candidate_split) = split_info.status else { + self.insert(split_info); + return None; + }; + let alive_token = Arc::downgrade(&candidate_split.living_token); + self.insert(SplitInfo { + split_key: split_info.split_key, + status: Status::Downloading { alive_token }, + }); + Some(candidate_split) + } + + fn best_candidate(&self) -> Option { + self.candidate_splits.last().copied() + } + + fn is_out_of_limits(&self) -> bool { + if self.on_disk_splits.is_empty() { + return false; + } + if self.on_disk_splits.len() + self.downloading_splits.len() + > self.limits.max_num_splits.get() as usize + { + return true; + } + if self.on_disk_bytes > self.limits.max_num_bytes.get_bytes() { + return true; + } + false + } + + /// Evicts splits to reach the target limits. + /// + /// Returns false if the first candidate for eviction is + /// fresher that the candidate split. (Note this is suboptimal. + /// + /// Returns `None` if this would mean evicting splits that + /// have been accessed more recently than the candidate split. + pub(crate) fn make_room_for_split_if_necessary( + &mut self, + last_access_date: LastAccessDate, + ) -> Option> { + let mut split_infos = Vec::new(); + while self.is_out_of_limits() { + if let Some(first_split) = self.on_disk_splits.first() { + if first_split.last_accessed > last_access_date { + // This is not worth doing the eviction. + break; + } + split_infos.extend(self.remove(first_split.split_ulid)); + } else { + break; + } + } + if self.is_out_of_limits() { + // We are still out of limits. + // Let's not go through with the eviction, and reinsert the splits. 
+ for split_info in split_infos { + self.insert(split_info); + } + None + } else { + Some( + split_infos + .into_iter() + .map(|split_info| split_info.split_key.split_ulid) + .collect(), + ) + } + } + + pub(crate) fn find_download_opportunity(&mut self) -> Option { + let best_candidate_split_key = self.best_candidate()?; + let splits_to_delete: Vec = + self.make_room_for_split_if_necessary(best_candidate_split_key.last_accessed)?; + let split_to_download: CandidateSplit = + self.start_download(best_candidate_split_key.split_ulid)?; + Some(DownloadOpportunity { + splits_to_delete, + split_to_download, + }) + } + + #[cfg(test)] + pub fn num_bytes(&self) -> u64 { + self.on_disk_bytes + } +} + +#[derive(Clone, Debug, Eq, PartialEq)] +pub(crate) struct CandidateSplit { + pub storage_uri: Uri, + pub split_ulid: Ulid, + pub living_token: Arc<()>, +} + +pub(crate) struct DownloadOpportunity { + // At this point, the split have already been removed from the split table. + // The file however need to be deleted. 
+ pub splits_to_delete: Vec, + pub split_to_download: CandidateSplit, +} + +#[cfg(test)] +mod tests { + use std::num::NonZeroU32; + + use byte_unit::Byte; + use quickwit_common::uri::Uri; + use quickwit_config::SplitCacheLimits; + use ulid::Ulid; + + use crate::split_cache::split_table::{DownloadOpportunity, SplitTable}; + + const TEST_STORAGE_URI: &str = "s3://test"; + + fn sorted_split_ulids(num_splits: usize) -> Vec { + let mut split_ulids: Vec = + std::iter::repeat_with(Ulid::new).take(num_splits).collect(); + split_ulids.sort(); + split_ulids + } + + #[test] + fn test_split_table() { + let mut split_table = SplitTable::with_limits_and_existing_splits( + SplitCacheLimits { + max_num_bytes: Byte::from_bytes(1000), + max_num_splits: NonZeroU32::new(1).unwrap(), + num_concurrent_downloads: NonZeroU32::new(1).unwrap(), + }, + Default::default(), + ); + let ulids = sorted_split_ulids(2); + let ulid1 = ulids[0]; + let ulid2 = ulids[1]; + split_table.report(ulid1, Uri::for_test(TEST_STORAGE_URI)); + split_table.report(ulid2, Uri::for_test(TEST_STORAGE_URI)); + let candidate = split_table.best_candidate().unwrap(); + assert_eq!(candidate.split_ulid, ulid2); + } + + #[test] + fn test_split_table_prefer_last_touched() { + let mut split_table = SplitTable::with_limits_and_existing_splits( + SplitCacheLimits { + max_num_bytes: Byte::from_bytes(1000), + max_num_splits: NonZeroU32::new(1).unwrap(), + num_concurrent_downloads: NonZeroU32::new(1).unwrap(), + }, + Default::default(), + ); + let ulids = sorted_split_ulids(2); + let ulid1 = ulids[0]; + let ulid2 = ulids[1]; + split_table.report(ulid1, Uri::for_test(TEST_STORAGE_URI)); + split_table.report(ulid2, Uri::for_test(TEST_STORAGE_URI)); + let split_guard_opt = split_table.get_split_guard(ulid1, &Uri::for_test("s3://test1/")); + assert!(split_guard_opt.is_none()); + let candidate = split_table.best_candidate().unwrap(); + assert_eq!(candidate.split_ulid, ulid1); + } + + #[test] + fn 
test_split_table_prefer_start_download_prevent_new_report() { + let mut split_table = SplitTable::with_limits_and_existing_splits( + SplitCacheLimits { + max_num_bytes: Byte::from_bytes(1000), + max_num_splits: NonZeroU32::new(1).unwrap(), + num_concurrent_downloads: NonZeroU32::new(1).unwrap(), + }, + Default::default(), + ); + let ulid1 = Ulid::new(); + split_table.report(ulid1, Uri::for_test(TEST_STORAGE_URI)); + assert_eq!(split_table.num_bytes(), 0); + let download = split_table.start_download(ulid1); + assert!(download.is_some()); + assert!(split_table.start_download(ulid1).is_none()); + split_table.register_as_downloaded(ulid1, 10_000_000); + assert_eq!(split_table.num_bytes(), 10_000_000); + split_table.get_split_guard(ulid1, &Uri::for_test(TEST_STORAGE_URI)); + let ulid2 = Ulid::new(); + split_table.report(ulid2, Uri::for_test("s3://test`/")); + let download = split_table.start_download(ulid2); + assert!(download.is_some()); + assert!(split_table.start_download(ulid2).is_none()); + assert_eq!(split_table.num_bytes(), 10_000_000); + split_table.register_as_downloaded(ulid2, 3_000_000); + assert_eq!(split_table.num_bytes(), 13_000_000); + } + + #[test] + fn test_eviction_due_to_size() { + let mut split_table = SplitTable::with_limits_and_existing_splits( + SplitCacheLimits { + max_num_bytes: Byte::from_bytes(1_000_000), + max_num_splits: NonZeroU32::new(30).unwrap(), + num_concurrent_downloads: NonZeroU32::new(1).unwrap(), + }, + Default::default(), + ); + let mut split_ulids: Vec = std::iter::repeat_with(Ulid::new).take(6).collect(); + split_ulids.sort(); + let splits = [ + (split_ulids[0], 10_000), + (split_ulids[1], 20_000), + (split_ulids[2], 300_000), + (split_ulids[3], 400_000), + (split_ulids[4], 100_000), + (split_ulids[5], 300_000), + ]; + for (split_ulid, num_bytes) in splits { + split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI)); + split_table.register_as_downloaded(split_ulid, num_bytes); + } + let new_ulid = Ulid::new(); + 
split_table.report(new_ulid, Uri::for_test(TEST_STORAGE_URI)); + let DownloadOpportunity { + splits_to_delete, + split_to_download, + } = split_table.find_download_opportunity().unwrap(); + assert_eq!( + &splits_to_delete[..], + &[splits[0].0, splits[1].0, splits[2].0][..] + ); + assert_eq!(split_to_download.split_ulid, new_ulid); + } + + #[test] + fn test_eviction_due_to_num_splits() { + let mut split_table = SplitTable::with_limits_and_existing_splits( + SplitCacheLimits { + max_num_bytes: Byte::from_bytes(10_000_000), + max_num_splits: NonZeroU32::new(5).unwrap(), + num_concurrent_downloads: NonZeroU32::new(1).unwrap(), + }, + Default::default(), + ); + let mut split_ulids: Vec = std::iter::repeat_with(Ulid::new).take(6).collect(); + split_ulids.sort(); + let splits = [ + (split_ulids[0], 10_000), + (split_ulids[1], 20_000), + (split_ulids[2], 300_000), + (split_ulids[3], 400_000), + (split_ulids[4], 100_000), + (split_ulids[5], 300_000), + ]; + for (split_ulid, num_bytes) in splits { + split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI)); + split_table.register_as_downloaded(split_ulid, num_bytes); + } + let new_ulid = Ulid::new(); + split_table.report(new_ulid, Uri::for_test(TEST_STORAGE_URI)); + let DownloadOpportunity { + splits_to_delete, + split_to_download, + } = split_table.find_download_opportunity().unwrap(); + assert_eq!(&splits_to_delete[..], &[splits[0].0][..]); + assert_eq!(split_to_download.split_ulid, new_ulid); + } + + #[test] + fn test_failed_download_can_be_re_reported() { + let mut split_table = SplitTable::with_limits_and_existing_splits( + SplitCacheLimits { + max_num_bytes: Byte::from_bytes(10_000_000), + max_num_splits: NonZeroU32::new(5).unwrap(), + num_concurrent_downloads: NonZeroU32::new(1).unwrap(), + }, + Default::default(), + ); + let split_ulid = Ulid::new(); + split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI)); + let candidate = split_table.start_download(split_ulid).unwrap(); + // This report should be 
cancelled as we have a download currently running. + split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI)); + + assert!(split_table.start_download(split_ulid).is_none()); + std::mem::drop(candidate); + + // Still not possible to start a download. + assert!(split_table.start_download(split_ulid).is_none()); + + // This report should be considered as our candidate (and its alive token has been dropped) + split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI)); + + let candidate2 = split_table.start_download(split_ulid).unwrap(); + assert_eq!(candidate2.split_ulid, split_ulid); + } + + #[test] + fn test_split_table_truncate_candidates() { + let mut split_table = SplitTable::with_limits_and_existing_splits( + SplitCacheLimits { + max_num_bytes: Byte::from_bytes(10_000_000), + max_num_splits: NonZeroU32::new(5).unwrap(), + num_concurrent_downloads: NonZeroU32::new(1).unwrap(), + }, + Default::default(), + ); + for i in 1..2_000 { + let split_ulid = Ulid::new(); + split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI)); + assert_eq!( + split_table.candidate_splits.len(), + i.min(super::MAX_NUM_CANDIDATES) + ); + } + } +} diff --git a/quickwit/quickwit-storage/src/split_cache/tests.rs b/quickwit/quickwit-storage/src/split_cache/tests.rs new file mode 100644 index 00000000000..9c85add52e8 --- /dev/null +++ b/quickwit/quickwit-storage/src/split_cache/tests.rs @@ -0,0 +1,178 @@ +// Copyright (C) 2023 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. 
+// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::num::NonZeroU32; + +use byte_unit::Byte; +use quickwit_common::uri::Uri; +use quickwit_config::SplitCacheLimits; +use ulid::Ulid; + +use crate::split_cache::split_table::{DownloadOpportunity, SplitTable}; + +const TEST_STORAGE_URI: &'static str = "s3://test"; + +#[test] +fn test_split_table() { + let mut split_table = SplitTable::with_limits(SplitCacheLimits { + max_num_bytes: Byte::from_bytes(1000), + max_num_splits: NonZeroU32::new(1).unwrap(), + num_concurrent_downloads: NonZeroU32::new(1).unwrap(), + }); + let ulid1 = Ulid::new(); + let ulid2 = Ulid::new(); + split_table.report(ulid1, Uri::for_test(TEST_STORAGE_URI)); + split_table.report(ulid2, Uri::for_test(TEST_STORAGE_URI)); + let candidate = split_table.best_candidate().unwrap(); + assert_eq!(candidate.split_ulid, ulid2); +} + +#[test] +fn test_split_table_prefer_last_touched() { + let mut split_table = SplitTable::with_limits(SplitCacheLimits { + max_num_bytes: Byte::from_bytes(1000), + max_num_splits: NonZeroU32::new(1).unwrap(), + num_concurrent_downloads: NonZeroU32::new(1).unwrap(), + }); + let ulid1 = Ulid::new(); + let ulid2 = Ulid::new(); + split_table.report(ulid1, Uri::for_test(TEST_STORAGE_URI)); + split_table.report(ulid2, Uri::for_test(TEST_STORAGE_URI)); + let split_guard_opt = split_table.get_split_guard(ulid1, &Uri::for_test("s3://test1/")); + assert!(split_guard_opt.is_none()); + let candidate = split_table.best_candidate().unwrap(); + assert_eq!(candidate.split_ulid, ulid1); +} + +#[test] +fn test_split_table_prefer_start_download_prevent_new_report() { + let mut split_table = 
SplitTable::with_limits(SplitCacheLimits {
+        max_num_bytes: Byte::from_bytes(1000),
+        max_num_splits: NonZeroU32::new(1).unwrap(),
+        num_concurrent_downloads: NonZeroU32::new(1).unwrap(),
+    });
+    let ulid1 = Ulid::new();
+    split_table.report(ulid1, Uri::for_test(TEST_STORAGE_URI));
+    assert_eq!(split_table.num_bytes(), 0);
+    let download = split_table.start_download(ulid1);
+    assert!(download.is_some());
+    assert!(split_table.start_download(ulid1).is_none());
+    split_table.register_as_downloaded(ulid1, 10_000_000);
+    assert_eq!(split_table.num_bytes(), 10_000_000);
+    split_table.get_split_guard(ulid1, &Uri::for_test(TEST_STORAGE_URI));
+    let ulid2 = Ulid::new();
+    split_table.report(ulid2, Uri::for_test("s3://test/"));
+    let download = split_table.start_download(ulid2);
+    assert!(download.is_some());
+    assert!(split_table.start_download(ulid2).is_none());
+    assert_eq!(split_table.num_bytes(), 10_000_000);
+    split_table.register_as_downloaded(ulid2, 3_000_000);
+    assert_eq!(split_table.num_bytes(), 13_000_000);
+}
+
+#[test]
+fn test_eviction_due_to_size() {
+    let mut split_table = SplitTable::with_limits(SplitCacheLimits {
+        max_num_bytes: Byte::from_bytes(1_000_000),
+        max_num_splits: NonZeroU32::new(30).unwrap(),
+        num_concurrent_downloads: NonZeroU32::new(1).unwrap(),
+    });
+    let mut split_ulids: Vec<Ulid> = std::iter::repeat_with(Ulid::new).take(6).collect();
+    split_ulids.sort();
+    let splits = [
+        (split_ulids[0], 10_000),
+        (split_ulids[1], 20_000),
+        (split_ulids[2], 300_000),
+        (split_ulids[3], 400_000),
+        (split_ulids[4], 100_000),
+        (split_ulids[5], 300_000),
+    ];
+    for (split_ulid, num_bytes) in splits {
+        split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI));
+        split_table.register_as_downloaded(split_ulid, num_bytes);
+    }
+    let new_ulid = Ulid::new();
+    split_table.report(new_ulid, Uri::for_test(TEST_STORAGE_URI));
+    let DownloadOpportunity {
+        splits_to_delete,
+        split_to_download,
+    } = split_table.find_download_opportunity().unwrap();
+    
assert_eq!(
+        &splits_to_delete[..],
+        &[splits[0].0, splits[1].0, splits[2].0][..]
+    );
+    assert_eq!(split_to_download.split_ulid, new_ulid);
+}
+
+#[test]
+fn test_eviction_due_to_num_splits() {
+    let mut split_table = SplitTable::with_limits(SplitCacheLimits {
+        max_num_bytes: Byte::from_bytes(10_000_000),
+        max_num_splits: NonZeroU32::new(5).unwrap(),
+        num_concurrent_downloads: NonZeroU32::new(1).unwrap(),
+    });
+    let mut split_ulids: Vec<Ulid> = std::iter::repeat_with(Ulid::new).take(6).collect();
+    split_ulids.sort();
+    let splits = [
+        (split_ulids[0], 10_000),
+        (split_ulids[1], 20_000),
+        (split_ulids[2], 300_000),
+        (split_ulids[3], 400_000),
+        (split_ulids[4], 100_000),
+        (split_ulids[5], 300_000),
+    ];
+    for (split_ulid, num_bytes) in splits {
+        split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI));
+        split_table.register_as_downloaded(split_ulid, num_bytes);
+    }
+    let new_ulid = Ulid::new();
+    split_table.report(new_ulid, Uri::for_test(TEST_STORAGE_URI));
+    let DownloadOpportunity {
+        splits_to_delete,
+        split_to_download,
+    } = split_table.find_download_opportunity().unwrap();
+    assert_eq!(&splits_to_delete[..], &[splits[0].0][..]);
+    assert_eq!(split_to_download.split_ulid, new_ulid);
+}
+
+#[test]
+fn test_failed_download_can_be_re_reported() {
+    let mut split_table = SplitTable::with_limits(SplitCacheLimits {
+        max_num_bytes: Byte::from_bytes(10_000_000),
+        max_num_splits: NonZeroU32::new(5).unwrap(),
+        num_concurrent_downloads: NonZeroU32::new(1).unwrap(),
+    });
+    let split_ulid = Ulid::new();
+    split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI));
+    let candidate = split_table.start_download(split_ulid).unwrap();
+    // This report should be cancelled as we have a download currently running.
+    split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI));
+
+    assert!(split_table.start_download(split_ulid).is_none());
+    std::mem::drop(candidate);
+
+    // Still not possible to start a download.
+    assert!(split_table.start_download(split_ulid).is_none());
+
+    // This report should be considered as our candidate (and its alive token has been dropped)
+    split_table.report(split_ulid, Uri::for_test(TEST_STORAGE_URI));
+
+    let candidate2 = split_table.start_download(split_ulid).unwrap();
+    assert_eq!(candidate2.split_ulid, split_ulid);
+}
diff --git a/quickwit/quickwit-storage/src/storage.rs b/quickwit/quickwit-storage/src/storage.rs
index 12ba5604a50..ea1b0aac9a9 100644
--- a/quickwit/quickwit-storage/src/storage.rs
+++ b/quickwit/quickwit-storage/src/storage.rs
@@ -18,12 +18,16 @@
 // along with this program. If not, see <https://www.gnu.org/licenses/>.
 
 use std::fmt;
+use std::io::{self, ErrorKind};
 use std::ops::Range;
-use std::path::Path;
+use std::path::{Path, PathBuf};
 
 use async_trait::async_trait;
 use quickwit_common::uri::Uri;
+use tempfile::TempPath;
+use tokio::fs::File;
 use tokio::io::AsyncWrite;
+use tracing::error;
 
 use crate::{BulkDeleteError, OwnedBytes, PutPayload, StorageErrorKind, StorageResult};
 
@@ -81,14 +85,19 @@ pub trait Storage: fmt::Debug + Send + Sync + 'static {
     /// `output_path` is expected to be a file path (not a directory path)
     /// without any existing file yet.
     ///
-    /// If the call is successful, the file will be created.
-    /// If not, the file may or may not have been created.
+    /// This function will attempt to download things to a temporary file
+    /// in the same directory as the `output_path`, and then atomically move it
+    /// to the actual `output_path`.
+    ///
+    /// In case of failure, `quickwit` (not the OS) will attempt to delete the file
+    /// using some `Drop` mechanic.
+    /// If quickwit is killed for instance, this may result in the temporary file not
+    /// being deleted. It is important, upon startup, to identify these ".temp"
+    /// files and delete them.
     ///
     /// See also `copy_to`.
-    async fn copy_to_file(&self, path: &Path, output_path: &Path) -> StorageResult<()> {
-        let mut file = tokio::fs::File::create(output_path).await?;
-        self.copy_to(path, &mut file).await?;
-        Ok(())
+    async fn copy_to_file(&self, path: &Path, output_path: &Path) -> StorageResult<u64> {
+        default_copy_to_file(self, path, output_path).await
     }
 
     /// Downloads a slice of a file from the storage, and returns an in memory buffer
@@ -125,3 +134,133 @@ pub trait Storage: fmt::Debug + Send + Sync + 'static {
     /// Returns an URI identifying the storage
     fn uri(&self) -> &Uri;
 }
+
+async fn default_copy_to_file<S: Storage + ?Sized>(
+    storage: &S,
+    path: &Path,
+    output_path: &Path,
+) -> StorageResult<u64> {
+    let mut download_temp_file =
+        DownloadTempFile::with_target_path(output_path.to_path_buf()).await?;
+    storage.copy_to(path, download_temp_file.as_mut()).await?;
+    let num_bytes = download_temp_file.persist().await?;
+    Ok(num_bytes)
+}
+
+struct DownloadTempFile {
+    target_filepath: PathBuf,
+    temp_filepath: PathBuf,
+    file: File,
+    has_attempted_deletion: bool,
+}
+
+impl DownloadTempFile {
+    /// Creates or truncate temp file.
+    pub async fn with_target_path(target_filepath: PathBuf) -> io::Result<DownloadTempFile> {
+        let Some(filename) = target_filepath.file_name() else {
+            return Err(io::Error::new(
+                ErrorKind::Other,
+                "Target filepath is not a directory path. 
Expected a filepath.",
+            ));
+        };
+        let filename: &str = filename.to_str().ok_or_else(|| {
+            io::Error::new(
+                ErrorKind::Other,
+                "Target filepath is not a valid UTF-8 string.",
+            )
+        })?;
+        let mut temp_filepath = target_filepath.clone();
+        temp_filepath.set_file_name(format!("{filename}.temp"));
+        let file = tokio::fs::File::create(temp_filepath.clone()).await?;
+        Ok(DownloadTempFile {
+            target_filepath,
+            temp_filepath,
+            file,
+            has_attempted_deletion: false,
+        })
+    }
+
+    pub async fn persist(mut self) -> io::Result<u64> {
+        TempPath::from_path(&self.temp_filepath).persist(&self.target_filepath)?;
+        self.has_attempted_deletion = true;
+        let num_bytes = std::fs::metadata(&self.target_filepath)?.len();
+        Ok(num_bytes)
+    }
+}
+
+impl Drop for DownloadTempFile {
+    fn drop(&mut self) {
+        if self.has_attempted_deletion {
+            return;
+        }
+        let temp_filepath = self.temp_filepath.clone();
+        self.has_attempted_deletion = true;
+        tokio::task::spawn_blocking(move || {
+            if let Err(io_error) = std::fs::remove_file(&temp_filepath) {
+                error!(temp_filepath=%temp_filepath.display(), io_error=?io_error, "Failed to remove temporary file");
+            }
+        });
+    }
+}
+
+impl AsMut<File> for DownloadTempFile {
+    fn as_mut(&mut self) -> &mut File {
+        &mut self.file
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::time::Duration;
+
+    use super::*;
+    use crate::{RamStorage, StorageError};
+
+    const CONTENT: &[u8] = b"hello world";
+
+    #[tokio::test]
+    async fn test_copy_to_file() {
+        let ram_storage = RamStorage::default();
+        let temp_dir = tempfile::tempdir().unwrap();
+        let dest_filepath = temp_dir.path().join("bar");
+        let path = Path::new("foo/bar");
+        ram_storage
+            .put(path, Box::new(CONTENT.to_owned()))
+            .await
+            .unwrap();
+        let num_bytes = ram_storage
+            .copy_to_file(path, &dest_filepath)
+            .await
+            .unwrap();
+        assert_eq!(num_bytes, 11);
+        let content = std::fs::read(&dest_filepath).unwrap();
+        assert_eq!(&content, CONTENT);
+    }
+
+    #[tokio::test]
+    async fn 
test_copy_to_file_deletes_tempfile_on_failure() { + let mut storage = MockStorage::default(); + storage.expect_copy_to().return_once(|_, _| { + Box::pin(futures::future::err(StorageError::from(io::Error::new( + io::ErrorKind::Other, + "fake storage error", + )))) + }); + let path = Path::new("foo/bar"); + let temp_dir = tempfile::tempdir().unwrap(); + let dest_filepath = temp_dir.path().join("bar"); + default_copy_to_file(&storage, path, &dest_filepath) + .await + .unwrap_err(); + tokio::time::sleep(Duration::from_millis(100)).await; + let mut read_dir = tokio::fs::read_dir(dest_filepath.parent().unwrap()) + .await + .unwrap(); + let entry_opt = read_dir + .next_entry() + .await + .unwrap() + .map(|dir_entry| dir_entry.path()); + assert_eq!(entry_opt, None); + } +} diff --git a/quickwit/quickwit-storage/tests/s3_storage.rs b/quickwit/quickwit-storage/tests/s3_storage.rs index cecb80936aa..40ba7c792c5 100644 --- a/quickwit/quickwit-storage/tests/s3_storage.rs +++ b/quickwit/quickwit-storage/tests/s3_storage.rs @@ -48,6 +48,7 @@ fn test_runtime_singleton() -> &'static Runtime { }) } +#[cfg(feature = "testsuite")] async fn run_s3_storage_test_suite(s3_storage_config: S3StorageConfig, bucket_uri: &str) { setup_logging_for_tests();