diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 28fcd4f757a..941825aa6b7 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -6170,6 +6170,7 @@ version = "0.8.0" dependencies = [ "anyhow", "aws_lambda_events 0.15.1", + "bytesize", "chitchat", "chrono", "flate2", @@ -6199,6 +6200,7 @@ dependencies = [ "reqwest", "serde", "serde_json", + "serial_test", "time", "tokio", "tracing", diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index 0b742d87912..546031e574f 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -215,6 +215,7 @@ serde_json_borrow = "0.5" serde_qs = { version = "0.12", features = ["warp"] } serde_with = "3.9.0" serde_yaml = "0.9" +serial_test = { version = "3.1.1", features = ["file_locks"] } siphasher = "0.3" smallvec = "1" sqlx = { version = "0.7", features = [ diff --git a/quickwit/quickwit-actors/src/actor.rs b/quickwit/quickwit-actors/src/actor.rs index c5caf6bcf0b..b71de73c5b3 100644 --- a/quickwit/quickwit-actors/src/actor.rs +++ b/quickwit/quickwit-actors/src/actor.rs @@ -36,11 +36,10 @@ pub enum ActorExitStatus { /// The actor successfully exited. /// /// It happens either because: - /// - all of the existing mailboxes were dropped and the actor message queue was exhausted. - /// No new message could ever arrive to the actor. (This exit is triggered by the framework.) - /// or - /// - the actor `process_message` method returned `Err(ExitStatusCode::Success)`. - /// (This exit is triggered by the actor implementer.) + /// - all of the existing mailboxes were dropped and the actor message queue was exhausted. No + /// new message could ever arrive to the actor. (This exit is triggered by the framework.) or + /// - the actor `process_message` method returned `Err(ExitStatusCode::Success)`. (This exit is + /// triggered by the actor implementer.) /// /// (This is equivalent to exit status code 0.) /// Note that this is not really an error. diff --git a/quickwit/quickwit-actors/src/scheduler.rs b/quickwit/quickwit-actors/src/scheduler.rs index 723faddf5b2..527973e249c 100644 --- a/quickwit/quickwit-actors/src/scheduler.rs +++ b/quickwit/quickwit-actors/src/scheduler.rs @@ -323,10 +323,9 @@ impl Scheduler { /// Updates the simulated time shift, if appropriate. /// /// We advance time if: - /// - someone is actually requesting for a simulated fast forward in time. - /// (if Universe::simulate_time_shift(..) has been called). - /// - no message is queued for processing, no initialize or no finalize - /// is being processed. + /// - someone is actually requesting for a simulated fast forward in time. (if + /// Universe::simulate_time_shift(..) has been called). + /// - no message is queued for processing, no initialize or no finalize is being processed. fn advance_time_if_necessary(&mut self) { let Some(scheduler_client) = self.scheduler_client() else { return; diff --git a/quickwit/quickwit-cli/tests/helpers.rs b/quickwit/quickwit-cli/tests/helpers.rs index 67839f7368b..0d5987a3f88 100644 --- a/quickwit/quickwit-cli/tests/helpers.rs +++ b/quickwit/quickwit-cli/tests/helpers.rs @@ -114,7 +114,6 @@ pub struct TestResourceFiles { pub index_config_without_uri: Uri, pub index_config_with_retention: Uri, pub log_docs: Uri, - pub wikipedia_docs: Uri, } /// A struct to hold few info about the test environment. @@ -130,7 +129,6 @@ pub struct TestEnv { /// The metastore URI. pub metastore_uri: Uri, pub metastore_resolver: MetastoreResolver, - pub metastore: MetastoreServiceClient, pub cluster_endpoint: Url, @@ -219,7 +217,6 @@ pub async fn create_test_env( let storage_resolver = StorageResolver::unconfigured(); let storage = storage_resolver.resolve(&metastore_uri).await?; let metastore_resolver = MetastoreResolver::unconfigured(); - let metastore = metastore_resolver.resolve(&metastore_uri).await?; let index_uri = metastore_uri.join(&index_id).unwrap(); let index_config_path = resources_dir_path.join("index_config.yaml"); fs::write( @@ -258,7 +255,7 @@ pub async fn create_test_env( let log_docs_path = resources_dir_path.join("logs.json"); fs::write(&log_docs_path, LOGS_JSON_DOCS)?; let wikipedia_docs_path = resources_dir_path.join("wikis.json"); - fs::write(&wikipedia_docs_path, WIKI_JSON_DOCS)?; + fs::write(wikipedia_docs_path, WIKI_JSON_DOCS)?; let cluster_endpoint = Url::parse(&format!("http://localhost:{rest_listen_port}")) .context("failed to parse cluster endpoint")?; @@ -269,7 +266,6 @@ pub async fn create_test_env( index_config_without_uri: uri_from_path(&index_config_without_uri_path), index_config_with_retention: uri_from_path(&index_config_with_retention_path), log_docs: uri_from_path(&log_docs_path), - wikipedia_docs: uri_from_path(&wikipedia_docs_path), }; Ok(TestEnv { @@ -279,7 +275,6 @@ pub async fn create_test_env( resource_files, metastore_uri, metastore_resolver, - metastore, cluster_endpoint, index_id, index_uri, diff --git a/quickwit/quickwit-cluster/src/cluster.rs b/quickwit/quickwit-cluster/src/cluster.rs index b34fe47ab29..9305d6485d7 100644 --- a/quickwit/quickwit-cluster/src/cluster.rs +++ b/quickwit/quickwit-cluster/src/cluster.rs @@ -416,6 +416,7 @@ impl Cluster { /// Tasks are grouped by (index_id, source_id), each group is stored in a key as follows: /// - key: `{INDEXING_TASK_PREFIX}{index_id}{INDEXING_TASK_SEPARATOR}{source_id}` /// - value: Number of indexing tasks in the group. + /// /// Keys present in chitchat state but not in the given `indexing_tasks` are marked for /// deletion. pub async fn update_self_node_indexing_tasks(&self, indexing_tasks: &[IndexingTask]) { diff --git a/quickwit/quickwit-codegen/example/Cargo.toml b/quickwit/quickwit-codegen/example/Cargo.toml index 06c7f138c87..38b03f51f6a 100644 --- a/quickwit/quickwit-codegen/example/Cargo.toml +++ b/quickwit/quickwit-codegen/example/Cargo.toml @@ -36,3 +36,6 @@ quickwit-actors = { workspace = true, features = ["testsuite"] } [build-dependencies] quickwit-codegen = { workspace = true } + +[features] +testsuite = [] diff --git a/quickwit/quickwit-common/build.rs b/quickwit/quickwit-common/build.rs new file mode 100644 index 00000000000..f3cc5be67be --- /dev/null +++ b/quickwit/quickwit-common/build.rs @@ -0,0 +1,22 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +fn main() { + println!("cargo::rustc-check-cfg=cfg(tokio_unstable)"); +} diff --git a/quickwit/quickwit-common/src/pubsub.rs b/quickwit/quickwit-common/src/pubsub.rs index 70683167169..57ad7e1cce3 100644 --- a/quickwit/quickwit-common/src/pubsub.rs +++ b/quickwit/quickwit-common/src/pubsub.rs @@ -54,6 +54,7 @@ type EventSubscriptions = HashMap>; /// The event broker makes it possible to /// - emit specific local events /// - subscribe to these local events +/// /// The event broker is not distributed in itself. Only events emitted /// locally will be received by the subscribers. /// diff --git a/quickwit/quickwit-common/src/thread_pool.rs b/quickwit/quickwit-common/src/thread_pool.rs index 229007e20d4..52cd622ff51 100644 --- a/quickwit/quickwit-common/src/thread_pool.rs +++ b/quickwit/quickwit-common/src/thread_pool.rs @@ -69,12 +69,11 @@ impl ThreadPool { /// /// Here are two important differences however: /// - /// 1) The task runs on a rayon thread pool managed by Quickwit. - /// This pool is specifically used only to run CPU-intensive work - /// and is configured to contain `num_cpus` cores. + /// 1) The task runs on a rayon thread pool managed by Quickwit. This pool is specifically used + /// only to run CPU-intensive work and is configured to contain `num_cpus` cores. /// - /// 2) Before the task is effectively scheduled, we check that - /// the spawner is still interested in its result. + /// 2) Before the task is effectively scheduled, we check that the spawner is still interested + /// in its result. /// /// It is therefore required to `await` the result of this /// function to get any work done. diff --git a/quickwit/quickwit-config/src/source_config/serialize.rs b/quickwit/quickwit-config/src/source_config/serialize.rs index a99f9371eed..c747387b0f2 100644 --- a/quickwit/quickwit-config/src/source_config/serialize.rs +++ b/quickwit/quickwit-config/src/source_config/serialize.rs @@ -68,8 +68,8 @@ impl SourceConfigForSerialization { /// Checks the validity of the `SourceConfig` as a "deserializable source". /// /// Two remarks: - /// - This does not check connectivity, it just validate configuration, - /// without performing any IO. See `check_connectivity(..)`. + /// - This does not check connectivity, it just validate configuration, without performing any + /// IO. See `check_connectivity(..)`. /// - This is used each time the `SourceConfig` is deserialized (at creation but also during /// communications with the metastore). When ingesting from stdin, we programmatically create /// an invalid `SourceConfig` and only use it locally. diff --git a/quickwit/quickwit-config/src/storage_config.rs b/quickwit/quickwit-config/src/storage_config.rs index a2485a9cf79..651271d0c61 100644 --- a/quickwit/quickwit-config/src/storage_config.rs +++ b/quickwit/quickwit-config/src/storage_config.rs @@ -18,6 +18,7 @@ // along with this program. If not, see . use std::ops::Deref; +use std::sync::OnceLock; use std::{env, fmt}; use anyhow::ensure; @@ -370,11 +371,14 @@ impl S3StorageConfig { } pub fn force_path_style_access(&self) -> Option { - let force_path_style_access = get_bool_from_env( - "QW_S3_FORCE_PATH_STYLE_ACCESS", - self.force_path_style_access, - ); - Some(force_path_style_access) + static FORCE_PATH_STYLE: OnceLock> = OnceLock::new(); + *FORCE_PATH_STYLE.get_or_init(|| { + let force_path_style_access = get_bool_from_env( + "QW_S3_FORCE_PATH_STYLE_ACCESS", + self.force_path_style_access, + ); + Some(force_path_style_access) + }) } } diff --git a/quickwit/quickwit-config/src/templating.rs b/quickwit/quickwit-config/src/templating.rs index 86ad5503365..827d06608f0 100644 --- a/quickwit/quickwit-config/src/templating.rs +++ b/quickwit/quickwit-config/src/templating.rs @@ -30,7 +30,7 @@ use tracing::debug; // `ENV_VAR` or `ENV_VAR:DEFAULT` // Ignores whitespaces in curly braces static TEMPLATE_ENV_VAR_CAPTURE: Lazy = Lazy::new(|| { - Regex::new(r"\$\{\s*([A-Za-z0-9_]+)\s*(?::\-\s*([\S]+)\s*)?}") + Regex::new(r"\$\{\s*([A-Za-z0-9_]+)\s*(?::\-\s*([^\s\}]+)\s*)?}") .expect("regular expression should compile") }); @@ -158,6 +158,23 @@ mod test { assert_eq!(rendered, "metastore_uri: s3://test-bucket/metastore"); } + #[test] + fn test_template_render_with_multiple_vars_per_line() { + let config_content = + b"metastore_uri: s3://${RENDER_MULTIPLE_BUCKET}/${RENDER_MULTIPLE_PREFIX:-index}#polling_interval=${RENDER_MULTIPLE_INTERVAL}s"; + env::set_var("RENDER_MULTIPLE_BUCKET", "test-bucket"); + env::set_var("RENDER_MULTIPLE_PREFIX", "metastore"); + env::set_var("RENDER_MULTIPLE_INTERVAL", "30"); + let rendered = render_config(config_content).unwrap(); + std::env::remove_var("RENDER_MULTIPLE_BUCKET"); + std::env::remove_var("RENDER_MULTIPLE_PREFIX"); + std::env::remove_var("RENDER_MULTIPLE_INTERVAL"); + assert_eq!( + rendered, + "metastore_uri: s3://test-bucket/metastore#polling_interval=30s" + ); + } + #[test] fn test_template_render_ignores_commented_lines() { { diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs index 35a14d7f316..a9bb01d37e4 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs @@ -633,14 +633,13 @@ fn inflate_node_capacities_if_necessary(problem: &mut SchedulingProblem) { /// to transform scheduling into a math problem. /// /// This function implementation therefore goes -/// - 1) transform our problem into a scheduling problem. Something closer to a well-defined -/// optimization problem. In particular this step removes: -/// - the notion of shard ids, and only considers a number of shards being allocated. -/// - node_ids and shard ids. These are replaced by integers. -/// - 2) convert the current situation of the cluster into something a previous scheduling -/// solution. -/// - 3) compute the new scheduling solution. -/// - 4) convert the new scheduling solution back to the real world by reallocating the shard ids. +/// 1) transform our problem into a scheduling problem. Something closer to a well-defined +/// optimization problem. In particular this step removes: +/// - the notion of shard ids, and only considers a number of shards being allocated. +/// - node_ids and shard ids. These are replaced by integers. +/// 2) convert the current situation of the cluster into something a previous scheduling solution. +/// 3) compute the new scheduling solution. +/// 4) convert the new scheduling solution back to the real world by reallocating the shard ids. /// /// TODO cut into pipelines. /// Panics if any sources has no shards. diff --git a/quickwit/quickwit-directories/src/debug_proxy_directory.rs b/quickwit/quickwit-directories/src/debug_proxy_directory.rs index ff5fe4e3bc4..f080f8aafd3 100644 --- a/quickwit/quickwit-directories/src/debug_proxy_directory.rs +++ b/quickwit/quickwit-directories/src/debug_proxy_directory.rs @@ -111,8 +111,8 @@ impl ReadOperationBuilder { /// recording all of its read operations. /// /// It has two purpose -/// - It is used when building our hotcache, to identify the file sections that -/// should be in the hotcache. +/// - It is used when building our hotcache, to identify the file sections that should be in the +/// hotcache. /// - It is used in the search-api to provide debugging/performance information. #[derive(Debug)] pub struct DebugProxyDirectory { diff --git a/quickwit/quickwit-doc-mapper/src/default_doc_mapper/mapping_tree.rs b/quickwit/quickwit-doc-mapper/src/default_doc_mapper/mapping_tree.rs index 7c82ae0ff95..28851fd37c3 100644 --- a/quickwit/quickwit-doc-mapper/src/default_doc_mapper/mapping_tree.rs +++ b/quickwit/quickwit-doc-mapper/src/default_doc_mapper/mapping_tree.rs @@ -1735,7 +1735,7 @@ mod tests { #[test] fn test_parse_i64_too_large() { let leaf = LeafType::I64(QuickwitNumericOptions::default()); - let err = leaf.value_from_json(json!(u64::max_value())).err().unwrap(); + let err = leaf.value_from_json(json!(u64::MAX)).err().unwrap(); assert_eq!( err, "expected i64, got inconvertible JSON number `18446744073709551615`" diff --git a/quickwit/quickwit-indexing/Cargo.toml b/quickwit/quickwit-indexing/Cargo.toml index 7b005301ea9..381bb1151e4 100644 --- a/quickwit/quickwit-indexing/Cargo.toml +++ b/quickwit/quickwit-indexing/Cargo.toml @@ -101,6 +101,7 @@ testsuite = [ "quickwit-storage/testsuite" ] vrl = ["dep:vrl", "quickwit-config/vrl"] +ci-test = [] [dev-dependencies] bytes = { workspace = true } diff --git a/quickwit/quickwit-indexing/src/actors/cooperative_indexing.rs b/quickwit/quickwit-indexing/src/actors/cooperative_indexing.rs index b036b66f3e7..09e86ae2ebc 100644 --- a/quickwit/quickwit-indexing/src/actors/cooperative_indexing.rs +++ b/quickwit/quickwit-indexing/src/actors/cooperative_indexing.rs @@ -36,18 +36,17 @@ static ORIGIN_OF_TIME: Lazy = Lazy::new(Instant::now); /// Cooperative indexing is a mechanism to deal with a large amount of pipelines. /// /// Instead of having all pipelines index concurrently, cooperative indexing: -/// - have them take turn, making sure that at most only N pipelines are indexing -/// at the same time. This has the benefit is reducing RAM using (by having a limited number -/// of `IndexWriter` at the same time), reducing context switching. -/// - keeps the different pipelines work uniformously spread in time. If the system is not -/// at capacity, we prefer to have the indexing pipeline as desynchronized as possible -/// to make sure they don't all use the same resources (disk/cpu/network) at the -/// same time. +/// - have them take turn, making sure that at most only N pipelines are indexing at the same time. +/// This has the benefit is reducing RAM using (by having a limited number of `IndexWriter` at the +/// same time), reducing context switching. +/// - keeps the different pipelines work uniformously spread in time. If the system is not at +/// capacity, we prefer to have the indexing pipeline as desynchronized as possible to make sure +/// they don't all use the same resources (disk/cpu/network) at the same time. /// /// It works by: /// - a semaphore is used to restrict the number of pipelines indexing at the same time. -/// - in the indexer when `on_drain` is called, the indexer will cut a split and -/// "go to sleep" for a given amount of time. +/// - in the indexer when `on_drain` is called, the indexer will cut a split and "go to sleep" for a +/// given amount of time. /// /// The key logic is in the computation of that sleep time. /// diff --git a/quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs b/quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs index 91f29e16927..597b9839f5c 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs @@ -114,9 +114,9 @@ impl Ord for ScheduledMerge { /// This actor is not supervised and should stay as simple as possible. /// In particular, /// - the `ScheduleMerge` handler should reply in microseconds. -/// - the task should never be dropped before reaching its `split_downloader_mailbox` destination -/// as it would break the consistency of `MergePlanner` with the metastore (ie: several splits will -/// never be merged). +/// - the task should never be dropped before reaching its `split_downloader_mailbox` destination as +/// it would break the consistency of `MergePlanner` with the metastore (ie: several splits will +/// never be merged). pub struct MergeSchedulerService { merge_semaphore: Arc, merge_concurrency: usize, diff --git a/quickwit/quickwit-indexing/src/merge_policy/stable_log_merge_policy.rs b/quickwit/quickwit-indexing/src/merge_policy/stable_log_merge_policy.rs index e971abb71d4..c3858f55dca 100644 --- a/quickwit/quickwit-indexing/src/merge_policy/stable_log_merge_policy.rs +++ b/quickwit/quickwit-indexing/src/merge_policy/stable_log_merge_policy.rs @@ -40,7 +40,7 @@ use crate::merge_policy::{splits_short_debug, MergeOperation, MergePolicy}; /// /// The policy first builds the merge operations /// -/// 1. Build merge operations +/// ### Build merge operations /// We start by sorting the splits by reverse date so that the most recent splits are /// coming first. /// We iterate through the splits and assign them to increasing levels. @@ -157,8 +157,8 @@ enum MergeCandidateSize { /// We should not add an extra split in this candidate. /// This can happen for any of the two following reasons: /// - the number of splits involved already reached `merge_factor_max`. - /// - the overall number of docs that will end up in the merged segment already - /// exceeds `max_merge_docs`. + /// - the overall number of docs that will end up in the merged segment already exceeds + /// `max_merge_docs`. OneMoreSplitWouldBeTooBig, } diff --git a/quickwit/quickwit-indexing/src/metrics.rs b/quickwit/quickwit-indexing/src/metrics.rs index e0d0c8cc236..8e34a0e0f5c 100644 --- a/quickwit/quickwit-indexing/src/metrics.rs +++ b/quickwit/quickwit-indexing/src/metrics.rs @@ -33,6 +33,7 @@ pub struct IndexerMetrics { pub pending_merge_operations: IntGauge, pub pending_merge_bytes: IntGauge, // We use a lazy counter, as most users do not use Kafka. + #[cfg_attr(not(feature = "kafka"), allow(dead_code))] pub kafka_rebalance_total: Lazy, } diff --git a/quickwit/quickwit-indexing/src/source/kafka_source.rs b/quickwit/quickwit-indexing/src/source/kafka_source.rs index c5c055fc10a..55ff8ceb1b6 100644 --- a/quickwit/quickwit-indexing/src/source/kafka_source.rs +++ b/quickwit/quickwit-indexing/src/source/kafka_source.rs @@ -125,7 +125,7 @@ macro_rules! return_if_err { /// The rebalance protocol at a very high level: /// - A consumer joins or leaves a consumer group. /// - Consumers receive a revoke partitions notification, which gives them the opportunity to commit -/// the work in progress. +/// the work in progress. /// - Broker waits for ALL the consumers to ack the revoke notification (synchronization barrier). /// - Consumers receive new partition assignmennts. /// diff --git a/quickwit/quickwit-indexing/src/source/kinesis/api.rs b/quickwit/quickwit-indexing/src/source/kinesis/api.rs index 6357997f8d4..affe11396ac 100644 --- a/quickwit/quickwit-indexing/src/source/kinesis/api.rs +++ b/quickwit/quickwit-indexing/src/source/kinesis/api.rs @@ -119,7 +119,7 @@ pub(crate) async fn list_shards( } } -#[cfg(test)] +#[cfg(all(test, feature = "kinesis-localstack-tests"))] pub(crate) mod tests { use std::collections::BTreeSet; use std::time::Duration; diff --git a/quickwit/quickwit-indexing/src/source/kinesis/helpers.rs b/quickwit/quickwit-indexing/src/source/kinesis/helpers.rs index 18ee020b944..079a6f3be8f 100644 --- a/quickwit/quickwit-indexing/src/source/kinesis/helpers.rs +++ b/quickwit/quickwit-indexing/src/source/kinesis/helpers.rs @@ -50,7 +50,7 @@ pub async fn get_kinesis_client(region_or_endpoint: RegionOrEndpoint) -> anyhow: Ok(Client::from_conf(kinesis_config.build())) } -#[cfg(test)] +#[cfg(all(test, feature = "kinesis-localstack-tests"))] pub(crate) mod tests { use std::collections::HashMap; use std::time::Duration; diff --git a/quickwit/quickwit-indexing/src/source/mod.rs b/quickwit/quickwit-indexing/src/source/mod.rs index 97c41f80179..9d06992c802 100644 --- a/quickwit/quickwit-indexing/src/source/mod.rs +++ b/quickwit/quickwit-indexing/src/source/mod.rs @@ -627,7 +627,10 @@ mod tests { } } - #[cfg(any(feature = "kafka", feature = "sqs"))] + #[cfg(all( + test, + any(feature = "kafka-broker-tests", feature = "sqs-localstack-tests") + ))] pub fn with_metastore(mut self, metastore: MetastoreServiceClient) -> Self { self.metastore_opt = Some(metastore); self diff --git a/quickwit/quickwit-ingest/src/ingest_v2/publish_tracker.rs b/quickwit/quickwit-ingest/src/ingest_v2/publish_tracker.rs index 64f9c907722..eb8f7ef8c32 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/publish_tracker.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/publish_tracker.rs @@ -33,8 +33,8 @@ use tracing::error; /// events to assert when all the persisted events have been published. To /// ensure that no events are missed: /// - create the tracker before any persist requests is sent -/// - call `register_requested_shards` before each persist request to ensure that -/// the associated publish events are recorded +/// - call `register_requested_shards` before each persist request to ensure that the associated +/// publish events are recorded /// - call `track_persisted_shard_position` after each successful persist subrequests pub struct PublishTracker { state: Arc>, diff --git a/quickwit/quickwit-ingest/src/metrics.rs b/quickwit/quickwit-ingest/src/metrics.rs index 0f0f9a8f3e5..f611e9a8611 100644 --- a/quickwit/quickwit-ingest/src/metrics.rs +++ b/quickwit/quickwit-ingest/src/metrics.rs @@ -28,6 +28,7 @@ pub struct IngestMetrics { pub replicated_num_bytes_total: IntCounter, pub replicated_num_docs_total: IntCounter, + #[allow(dead_code)] // this really shouldn't be dead, it needs to be used somewhere pub queue_count: IntGauge, } diff --git a/quickwit/quickwit-lambda/Cargo.toml b/quickwit/quickwit-lambda/Cargo.toml index 621ade40703..e94e3baf107 100644 --- a/quickwit/quickwit-lambda/Cargo.toml +++ b/quickwit/quickwit-lambda/Cargo.toml @@ -24,6 +24,7 @@ s3-localstack-tests = [] [dependencies] anyhow = { workspace = true } aws_lambda_events = "0.15.0" +bytesize = { workspace = true } chitchat = { workspace = true } chrono = { workspace = true } flate2 = { workspace = true } @@ -65,3 +66,6 @@ quickwit-search = { workspace = true } quickwit-serve = { workspace = true } quickwit-storage = { workspace = true } quickwit-telemetry = { workspace = true } + +[dev-dependencies] +serial_test = { workspace = true } diff --git a/quickwit/quickwit-lambda/src/indexer/environment.rs b/quickwit/quickwit-lambda/src/indexer/environment.rs index 606d5c6ae5f..faa10ff4a17 100644 --- a/quickwit/quickwit-lambda/src/indexer/environment.rs +++ b/quickwit/quickwit-lambda/src/indexer/environment.rs @@ -48,3 +48,47 @@ pub static MAX_CHECKPOINTS: Lazy = Lazy::new(|| { .expect("QW_LAMBDA_MAX_CHECKPOINTS must be a positive integer") }) }); + +#[cfg(test)] +mod tests { + + use quickwit_config::{ConfigFormat, NodeConfig}; + + use super::*; + + #[tokio::test] + #[serial_test::file_serial(with_env)] + async fn test_load_config() { + let bucket = "mock-test-bucket"; + std::env::set_var("QW_LAMBDA_METASTORE_BUCKET", bucket); + std::env::set_var("QW_LAMBDA_INDEX_BUCKET", bucket); + std::env::set_var( + "QW_LAMBDA_INDEX_CONFIG_URI", + "s3://mock-index-config-bucket", + ); + std::env::set_var("QW_LAMBDA_INDEX_ID", "lambda-test"); + + let node_config = NodeConfig::load(ConfigFormat::Yaml, CONFIGURATION_TEMPLATE.as_bytes()) + .await + .unwrap(); + // + assert_eq!( + node_config.data_dir_path.to_string_lossy(), + "/tmp", + "only `/tmp` is writeable in AWS Lambda" + ); + assert_eq!( + node_config.default_index_root_uri, + "s3://mock-test-bucket/index" + ); + assert_eq!( + node_config.metastore_uri.to_string(), + "s3://mock-test-bucket/index" + ); + + std::env::remove_var("QW_LAMBDA_METASTORE_BUCKET"); + std::env::remove_var("QW_LAMBDA_INDEX_BUCKET"); + std::env::remove_var("QW_LAMBDA_INDEX_CONFIG_URI"); + std::env::remove_var("QW_LAMBDA_INDEX_ID"); + } +} diff --git a/quickwit/quickwit-lambda/src/indexer/ingest/mod.rs b/quickwit/quickwit-lambda/src/indexer/ingest/mod.rs index f380c78eee6..c68b82713b7 100644 --- a/quickwit/quickwit-lambda/src/indexer/ingest/mod.rs +++ b/quickwit/quickwit-lambda/src/indexer/ingest/mod.rs @@ -150,6 +150,7 @@ mod tests { } #[tokio::test] + #[serial_test::file_serial(with_env)] async fn test_ingest() -> anyhow::Result<()> { quickwit_common::setup_logging_for_tests(); let bucket = "quickwit-integration-tests"; @@ -246,6 +247,13 @@ mod tests { assert_eq!(stats.num_docs, 1); } + std::env::remove_var("QW_LAMBDA_METASTORE_BUCKET"); + std::env::remove_var("QW_LAMBDA_INDEX_BUCKET"); + std::env::remove_var("QW_LAMBDA_METASTORE_PREFIX"); + std::env::remove_var("QW_LAMBDA_INDEX_PREFIX"); + std::env::remove_var("QW_LAMBDA_INDEX_CONFIG_URI"); + std::env::remove_var("QW_LAMBDA_INDEX_ID"); + Ok(()) } } diff --git a/quickwit/quickwit-lambda/src/searcher/environment.rs b/quickwit/quickwit-lambda/src/searcher/environment.rs index 027810dad23..1ead2e672ca 100644 --- a/quickwit/quickwit-lambda/src/searcher/environment.rs +++ b/quickwit/quickwit-lambda/src/searcher/environment.rs @@ -26,3 +26,51 @@ data_dir: /tmp searcher: partial_request_cache_capacity: ${QW_LAMBDA_PARTIAL_REQUEST_CACHE_CAPACITY:-64M} "#; + +#[cfg(test)] +mod tests { + + use bytesize::ByteSize; + use quickwit_config::{ConfigFormat, NodeConfig}; + + use super::*; + + #[tokio::test] + #[serial_test::file_serial(with_env)] + async fn test_load_config() { + let bucket = "mock-test-bucket"; + std::env::set_var("QW_LAMBDA_METASTORE_BUCKET", bucket); + std::env::set_var("QW_LAMBDA_INDEX_BUCKET", bucket); + std::env::set_var( + "QW_LAMBDA_INDEX_CONFIG_URI", + "s3://mock-index-config-bucket", + ); + std::env::set_var("QW_LAMBDA_INDEX_ID", "lambda-test"); + + let node_config = NodeConfig::load(ConfigFormat::Yaml, CONFIGURATION_TEMPLATE.as_bytes()) + .await + .unwrap(); + assert_eq!( + node_config.data_dir_path.to_string_lossy(), + "/tmp", + "only `/tmp` is writeable in AWS Lambda" + ); + assert_eq!( + node_config.default_index_root_uri, + "s3://mock-test-bucket/index" + ); + assert_eq!( + node_config.metastore_uri.to_string(), + "s3://mock-test-bucket/index#polling_interval=60s" + ); + assert_eq!( + node_config.searcher_config.partial_request_cache_capacity, + ByteSize::mb(64) + ); + + std::env::remove_var("QW_LAMBDA_METASTORE_BUCKET"); + std::env::remove_var("QW_LAMBDA_INDEX_BUCKET"); + std::env::remove_var("QW_LAMBDA_INDEX_CONFIG_URI"); + std::env::remove_var("QW_LAMBDA_INDEX_ID"); + } +} diff --git a/quickwit/quickwit-metastore/Cargo.toml b/quickwit/quickwit-metastore/Cargo.toml index 68b7eb76f1a..a2ff2470cbc 100644 --- a/quickwit/quickwit-metastore/Cargo.toml +++ b/quickwit/quickwit-metastore/Cargo.toml @@ -52,7 +52,7 @@ futures = { workspace = true } md5 = { workspace = true } mockall = { workspace = true } rand = { workspace = true } -serial_test = { version = "3.1.1", features = ["file_locks"] } +serial_test = { workspace = true } tempfile = { workspace = true } tracing-subscriber = { workspace = true } diff --git a/quickwit/quickwit-metastore/src/backward_compatibility_tests/mod.rs b/quickwit/quickwit-metastore/src/backward_compatibility_tests/mod.rs index 53ad17346ee..6c14d0e359c 100644 --- a/quickwit/quickwit-metastore/src/backward_compatibility_tests/mod.rs +++ b/quickwit/quickwit-metastore/src/backward_compatibility_tests/mod.rs @@ -192,8 +192,8 @@ where for<'a> T: Serialize { /// for JSON deserialization regression tests and runs them sequentially. /// /// - `test_name` is just the subdirectory name, for the type being test. -/// - `test` is a function asserting the equality of the deserialized version -/// and the expected version. +/// - `test` is a function asserting the equality of the deserialized version and the expected +/// version. pub(crate) fn test_json_backward_compatibility_helper(test_name: &str) -> anyhow::Result<()> where T: TestableForRegression + std::fmt::Debug { let sample_instance: T = T::sample_for_regression(); diff --git a/quickwit/quickwit-metastore/src/lib.rs b/quickwit/quickwit-metastore/src/lib.rs index 6d17ae0ebcf..78a9bae981c 100644 --- a/quickwit/quickwit-metastore/src/lib.rs +++ b/quickwit/quickwit-metastore/src/lib.rs @@ -26,7 +26,7 @@ //! metastore: //! - file-backed metastore //! - PostgreSQL metastore -//! etc. +//! - etc. #[allow(missing_docs)] pub mod checkpoint; diff --git a/quickwit/quickwit-proto/protos/quickwit/search.proto b/quickwit/quickwit-proto/protos/quickwit/search.proto index b08d8b32016..50ad8aec3c1 100644 --- a/quickwit/quickwit-proto/protos/quickwit/search.proto +++ b/quickwit/quickwit-proto/protos/quickwit/search.proto @@ -33,7 +33,7 @@ service SearchService { // // It is like a regular search except that: // - the node should perform the search locally instead of dispatching - // it to other nodes. + // it to other nodes. // - it should be applied on the given subset of splits // - Hit content is not fetched, and we instead return so called `PartialHit`. rpc LeafSearch(LeafSearchRequest) returns (LeafSearchResponse); @@ -56,7 +56,7 @@ service SearchService { // // It is like a regular list term except that: // - the node should perform the listing locally instead of dispatching - // it to other nodes. + // it to other nodes. // - it should be applied on the given subset of splits rpc LeafListTerms(LeafListTermsRequest) returns (LeafListTermsResponse); @@ -319,8 +319,9 @@ message SplitSearchError { /// A LeafSearchRequest can span multiple indices. /// message LeafSearchRequest { - // Search request. This is a perfect copy of the original search request, - // that was sent to root apart from the start_offset & max_hits params. + // Search request. This is a perfect copy of the original search request + // that was sent to root apart from the start_offset, max_hits params and index_id_patterns. + // index_id_patterns contains the actual index ids queried on that leaf. SearchRequest search_request = 1; // List of leaf requests, one per index. @@ -372,9 +373,9 @@ message SplitIdAndFooterOffsets { // For instance: // - it may contain a _source and a _dynamic field. // - since tantivy has no notion of cardinality, -// all fields is are arrays. +// all fields are arrays. // - since tantivy has no notion of object, the object is -// flattened by concatenating the path to the root. +// flattened by concatenating the path to the root. // // See `quickwit_search::convert_leaf_hit` message LeafHit { diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs index a2c90de2ae0..ed0219d0a7f 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs @@ -258,8 +258,9 @@ pub struct SplitSearchError { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct LeafSearchRequest { - /// Search request. This is a perfect copy of the original search request, - /// that was sent to root apart from the start_offset & max_hits params. + /// Search request. This is a perfect copy of the original search request + /// that was sent to root apart from the start_offset, max_hits params and index_id_patterns. + /// index_id_patterns contains the actual index ids queried on that leaf. #[prost(message, optional, tag = "1")] pub search_request: ::core::option::Option, /// List of leaf requests, one per index. @@ -322,9 +323,9 @@ pub struct SplitIdAndFooterOffsets { /// For instance: /// - it may contain a _source and a _dynamic field. /// - since tantivy has no notion of cardinality, -/// all fields is are arrays. +/// all fields are arrays. /// - since tantivy has no notion of object, the object is -/// flattened by concatenating the path to the root. +/// flattened by concatenating the path to the root. /// /// See `quickwit_search::convert_leaf_hit` #[derive(Serialize, Deserialize, utoipa::ToSchema)] @@ -910,7 +911,7 @@ pub mod search_service_client { /// /// It is like a regular search except that: /// - the node should perform the search locally instead of dispatching - /// it to other nodes. + /// it to other nodes. /// - it should be applied on the given subset of splits /// - Hit content is not fetched, and we instead return so called `PartialHit`. pub async fn leaf_search( @@ -1029,7 +1030,7 @@ pub mod search_service_client { /// /// It is like a regular list term except that: /// - the node should perform the listing locally instead of dispatching - /// it to other nodes. + /// it to other nodes. /// - it should be applied on the given subset of splits pub async fn leaf_list_terms( &mut self, @@ -1256,7 +1257,7 @@ pub mod search_service_server { /// /// It is like a regular search except that: /// - the node should perform the search locally instead of dispatching - /// it to other nodes. + /// it to other nodes. /// - it should be applied on the given subset of splits /// - Hit content is not fetched, and we instead return so called `PartialHit`. async fn leaf_search( @@ -1308,7 +1309,7 @@ pub mod search_service_server { /// /// It is like a regular list term except that: /// - the node should perform the listing locally instead of dispatching - /// it to other nodes. + /// it to other nodes. /// - it should be applied on the given subset of splits async fn leaf_list_terms( &self, diff --git a/quickwit/quickwit-proto/src/types/doc_mapping_uid.rs b/quickwit/quickwit-proto/src/types/doc_mapping_uid.rs index 7f2690067cc..fc45358922c 100644 --- a/quickwit/quickwit-proto/src/types/doc_mapping_uid.rs +++ b/quickwit/quickwit-proto/src/types/doc_mapping_uid.rs @@ -161,7 +161,7 @@ impl sqlx::Type for DocMappingUid { #[cfg(feature = "postgres")] impl sqlx::Encode<'_, sqlx::Postgres> for DocMappingUid { fn encode_by_ref(&self, buf: &mut sqlx::postgres::PgArgumentBuffer) -> sqlx::encode::IsNull { - sqlx::Encode::::encode(&self.0.to_string(), buf) + sqlx::Encode::::encode(self.0.to_string(), buf) } } diff --git a/quickwit/quickwit-proto/src/types/index_uid.rs b/quickwit/quickwit-proto/src/types/index_uid.rs index 2a5d5f098d3..9cee0796625 100644 --- a/quickwit/quickwit-proto/src/types/index_uid.rs +++ b/quickwit/quickwit-proto/src/types/index_uid.rs @@ -199,7 +199,7 @@ impl sqlx::Encode<'_, sqlx::Postgres> for IndexUid { fn encode_by_ref(&self, buf: &mut sqlx::postgres::PgArgumentBuffer) -> sqlx::encode::IsNull { let _ = sqlx::Encode::::encode(&self.index_id, buf); let _ = sqlx::Encode::::encode(":", buf); - sqlx::Encode::::encode(&self.incarnation_id.to_string(), buf) + sqlx::Encode::::encode(self.incarnation_id.to_string(), buf) } } diff --git a/quickwit/quickwit-query/src/tokenizers/multilang.rs b/quickwit/quickwit-query/src/tokenizers/multilang.rs index 72580e47c6a..cf6d4d87373 100644 --- a/quickwit/quickwit-query/src/tokenizers/multilang.rs +++ b/quickwit/quickwit-query/src/tokenizers/multilang.rs @@ -66,8 +66,9 @@ static KOR_TOKENIZER: Lazy = Lazy::new(|| { /// and uses the appropriate tokenizer for the detected language: /// - lindera for Chinese, Japanese, and Korean. /// - Quickwit's default tokenizer for other languages. +/// /// It is possible to bypass the language detection by prefixing the text with the language code -/// followed by a colon. For example, `KOR:일본입니다` will be tokenized by the english tokenizer. +/// followed by a colon. For example, `KOR:일본입니다` will be tokenized by the korean tokenizer. /// Current supported prefix are: /// - `KOR:` for Korean tokenizer /// - `JPN:` for Japanese tokenizer diff --git a/quickwit/quickwit-search/Cargo.toml b/quickwit/quickwit-search/Cargo.toml index a1ced46e1fd..fa815f47b56 100644 --- a/quickwit/quickwit-search/Cargo.toml +++ b/quickwit/quickwit-search/Cargo.toml @@ -60,3 +60,4 @@ quickwit-storage = { workspace = true, features = ["testsuite"] } [features] testsuite = [] +ci-test = [] diff --git a/quickwit/quickwit-search/src/collector.rs b/quickwit/quickwit-search/src/collector.rs index 9b775e1aa88..74c2cb8b331 100644 --- a/quickwit/quickwit-search/src/collector.rs +++ b/quickwit/quickwit-search/src/collector.rs @@ -343,7 +343,7 @@ impl SortingFieldExtractorComponent { return None; } // clamp value for comparison - val = val.min(1).max(0); + val = val.clamp(0, 1); (val == 1).to_u64() } (SortValue::I64(mut val), SortFieldType::Bool) => { @@ -353,7 +353,7 @@ impl SortingFieldExtractorComponent { return None; } // clamp value for comparison - val = val.min(1).max(0); + val = val.clamp(0, 1); (val == 1).to_u64() } (SortValue::F64(mut val), SortFieldType::Bool) => { @@ -362,7 +362,7 @@ impl SortingFieldExtractorComponent { if all_values_ahead1 || all_values_ahead2 { return None; } - val = val.min(1.0).max(0.0); + val = val.clamp(0.0, 1.0); (val >= 0.5).to_u64() // Is this correct? } }; diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index 25bc6baf115..f7aa7395172 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -177,6 +177,7 @@ struct RequestMetadata { /// - resolved query ASTs are the same across indexes. /// - if a sort field is of type datetime, it must be a datetime field on all indexes. This /// constraint come from the need to support datetime formatting on sort values. +/// /// Returns the timestamp field, the resolved query AST and the indexes metadatas /// needed for leaf search requests. /// Note: the requirements on timestamp fields and resolved query ASTs can be lifted @@ -1563,6 +1564,7 @@ pub fn jobs_to_leaf_request( let mut search_request_for_leaf = request.clone(); search_request_for_leaf.start_offset = 0; search_request_for_leaf.max_hits += request.start_offset; + search_request_for_leaf.index_id_patterns = Vec::new(); let mut leaf_search_request = LeafSearchRequest { search_request: Some(search_request_for_leaf), @@ -1575,6 +1577,12 @@ pub fn jobs_to_leaf_request( // Group jobs by index uid, as the split offsets are relative to the index. group_jobs_by_index_id(jobs, |job_group| { let index_uid = &job_group[0].index_uid; + leaf_search_request + .search_request + .as_mut() + .unwrap() + .index_id_patterns + .push(index_uid.index_id.to_string()); let search_index_meta = search_indexes_metadatas.get(index_uid).ok_or_else(|| { SearchError::Internal(format!( "received job for an unknown index {index_uid}. it should never happen" diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/bulk_v2.rs b/quickwit/quickwit-serve/src/elasticsearch_api/bulk_v2.rs index 1220fcc415c..b979a8d4592 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/bulk_v2.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/bulk_v2.rs @@ -172,9 +172,8 @@ fn make_elastic_bulk_response_v2( &mut per_subrequest_doc_handles, success.subrequest_id, ) - .map_err(|err| { + .inspect_err(|_| { rate_limited_error!(limit_per_min=6, index_id=%index_id, "could not find subrequest id"); - err })?; doc_handles.sort_unstable_by(|left, right| left.doc_uid.cmp(&right.doc_uid)); @@ -236,16 +235,14 @@ fn make_elastic_bulk_response_v2( // Find the doc handles for the subrequest. let doc_handles = - remove_doc_handles(&mut per_subrequest_doc_handles, failure.subrequest_id).map_err( - |err| { + remove_doc_handles(&mut per_subrequest_doc_handles, failure.subrequest_id) + .inspect_err(|_| { rate_limited_error!( limit_per_min = 6, subrequest = failure.subrequest_id, "failed to find error subrequest" ); - err - }, - )?; + })?; // Populate the response items with one error per doc handle. let (exception, reason, status) = match failure.reason() { diff --git a/quickwit/quickwit-serve/src/jaeger_api/parse_duration.rs b/quickwit/quickwit-serve/src/jaeger_api/parse_duration.rs index 876e5876e6d..c1d444d3d7a 100644 --- a/quickwit/quickwit-serve/src/jaeger_api/parse_duration.rs +++ b/quickwit/quickwit-serve/src/jaeger_api/parse_duration.rs @@ -59,7 +59,7 @@ fn parse_duration_nanos(input: &str) -> anyhow::Result { "h" => num * 3600.0 * 1_000_000_000.0, _ => anyhow::bail!("Invalid time unit: {}", unit), }; - if num < std::i64::MIN as f64 || num > std::i64::MAX as f64 { + if num < i64::MIN as f64 || num > i64::MAX as f64 { anyhow::bail!("Invalid duration: {}", num_str) } return Ok(duration.round() as i64); diff --git a/quickwit/quickwit-serve/src/simple_list.rs b/quickwit/quickwit-serve/src/simple_list.rs index a9ad5bfe05d..f184a96a215 100644 --- a/quickwit/quickwit-serve/src/simple_list.rs +++ b/quickwit/quickwit-serve/src/simple_list.rs @@ -17,29 +17,10 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::convert::Infallible; use std::str::FromStr; use serde::{Deserialize, Deserializer, Serializer}; -/// A helper struct to serialize/deserialize a comma separated list. -/// Used by the Elasticsearch and Quickwit REST API. -#[derive(Debug, Deserialize)] -pub struct SimpleList(pub Vec); - -impl FromStr for SimpleList { - type Err = Infallible; - - fn from_str(str_sequence: &str) -> Result { - let items = str_sequence - .trim_matches(',') - .split(',') - .map(|item| item.to_owned()) - .collect::>(); - Ok(Self(items)) - } -} - /// Serializes an `Option<&[Serialize]>` with /// `Some(value)` to a comma separated string of values. /// Used to serialize values within the query string diff --git a/quickwit/quickwit-storage/src/cache/mod.rs b/quickwit/quickwit-storage/src/cache/mod.rs index e4869a846f3..d77ecd51a5c 100644 --- a/quickwit/quickwit-storage/src/cache/mod.rs +++ b/quickwit/quickwit-storage/src/cache/mod.rs @@ -41,9 +41,8 @@ use crate::{OwnedBytes, Storage}; /// /// FIXME The current approach is quite horrible in that: /// - it uses a global -/// - it relies on the idea that all of the files we attempt to cache -/// have universally unique names. It happens to be true today, but this might be very error prone -/// in the future. +/// - it relies on the idea that all of the files we attempt to cache have universally unique names. +/// It happens to be true today, but this might be very error prone in the future. pub fn wrap_storage_with_cache( long_term_cache: Arc, storage: Arc, diff --git a/quickwit/quickwit-storage/src/lib.rs b/quickwit/quickwit-storage/src/lib.rs index 1f8f9f3f156..02a2e2389b6 100644 --- a/quickwit/quickwit-storage/src/lib.rs +++ b/quickwit/quickwit-storage/src/lib.rs @@ -27,9 +27,9 @@ //! - object storages (S3) //! - local filesystem //! - distributed filesystems. -//! etc. +//! - etc. //! -//! - The `BundleStorage` bundles together multiple files into a single file. +//! The `BundleStorage` bundles together multiple files into a single file. mod cache; mod debouncer; mod file_descriptor_cache;