diff --git a/docs/reference/es_compatible_api.md b/docs/reference/es_compatible_api.md index 99b4a08c2aa..f1fffa587a1 100644 --- a/docs/reference/es_compatible_api.md +++ b/docs/reference/es_compatible_api.md @@ -135,6 +135,7 @@ If a parameter appears both as a query string parameter and in the JSON payload, | `size` | `Integer` | Number of hits to return. | 10 | | `sort` | `String` | Describes how documents should be ranked. See [Sort order](#sort-order) | (Optional) | | `scroll` | `Duration` | Creates a scroll context for "time to live". See [Scroll](#_scroll--scroll-api). | (Optional) | +| `allow_partial_search_results` | `Boolean` | Returns a partial response if some (but not all) of the split searches were unsuccessful. | `true` | #### Supported Request Body parameters @@ -301,7 +302,7 @@ GET api/v1/_elastic/_cat/indices Use the [cat indices API](https://www.elastic.co/guide/en/elasticsearch/reference/current/cat-indices.html) to get the following information for each index in a cluster: * Shard count -* Document count +* Document count * Deleted document count * Primary store size * Total store size diff --git a/monitoring/grafana/dashboards/searchers.json b/monitoring/grafana/dashboards/searchers.json index 55a321da7e0..b6f7ade938b 100644 --- a/monitoring/grafana/dashboards/searchers.json +++ b/monitoring/grafana/dashboards/searchers.json @@ -853,6 +853,10 @@ "text": "All", "value": "$__all" }, + "datasource": { + "type": "prometheus", + "uid": "${datasource}" + }, "definition": "label_values(quickwit_search_leaf_searches_splits_total,instance)", "hide": 0, "includeAll": true, @@ -861,9 +865,8 @@ "name": "instance", "options": [], "query": { - "qryType": 1, "query": "label_values(quickwit_search_leaf_searches_splits_total,instance)", - "refId": "PrometheusVariableQueryEditor-VariableQuery" + "refId": "StandardVariableQuery" }, "refresh": 1, "regex": "", @@ -871,6 +874,7 @@ "sort": 0, "type": "query" } + ] }, "time": { diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 233fa27d139..3ce5526bd7a 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -5894,7 +5894,6 @@ version = "0.8.0" dependencies = [ "anyhow", "itertools 0.13.0", - "ouroboros", "serde", "serde_json", "tantivy", diff --git a/quickwit/quickwit-cli/src/tool.rs b/quickwit/quickwit-cli/src/tool.rs index c7ab1911205..db22ff1743e 100644 --- a/quickwit/quickwit-cli/src/tool.rs +++ b/quickwit/quickwit-cli/src/tool.rs @@ -550,6 +550,7 @@ pub async fn local_search_cli(args: LocalSearchArgs) -> anyhow::Result<()> { format: BodyFormat::Json, sort_by, count_all: CountHits::CountAll, + allow_failed_splits: false, }; let search_request = search_request_from_api_request(vec![args.index_id], search_request_query_string)?; diff --git a/quickwit/quickwit-cluster/src/cluster.rs b/quickwit/quickwit-cluster/src/cluster.rs index 9305d6485d7..00d42985bda 100644 --- a/quickwit/quickwit-cluster/src/cluster.rs +++ b/quickwit/quickwit-cluster/src/cluster.rs @@ -521,7 +521,8 @@ fn indexing_task_to_chitchat_kv(indexing_task: &IndexingTask) -> (String, String let index_uid = indexing_task.index_uid(); let key = format!("{INDEXING_TASK_PREFIX}{}", indexing_task.pipeline_uid()); let shard_ids_str = shard_ids.iter().sorted().join(","); - let value = format!("{index_uid}:{source_id}:{shard_ids_str}"); + let fingerprint = indexing_task.params_fingerprint; + let value = format!("{index_uid}:{source_id}:{fingerprint}:{shard_ids_str}"); (key, value) } @@ -536,8 +537,12 @@ fn parse_shard_ids_str(shard_ids_str: &str) -> Vec { fn 
chitchat_kv_to_indexing_task(key: &str, value: &str) -> Option { let pipeline_uid_str = key.strip_prefix(INDEXING_TASK_PREFIX)?; let pipeline_uid = PipelineUid::from_str(pipeline_uid_str).ok()?; - let (source_uid, shards_str) = value.rsplit_once(':')?; - let (index_uid, source_id) = source_uid.rsplit_once(':')?; + let mut field_iterator = value.rsplitn(4, ':'); + let shards_str = field_iterator.next()?; + let fingerprint_str = field_iterator.next()?; + let source_id = field_iterator.next()?; + let index_uid = field_iterator.next()?; + let params_fingerprint: u64 = fingerprint_str.parse().ok()?; let index_uid = index_uid.parse().ok()?; let shard_ids = parse_shard_ids_str(shards_str); Some(IndexingTask { @@ -545,7 +550,7 @@ fn chitchat_kv_to_indexing_task(key: &str, value: &str) -> Option source_id: source_id.to_string(), pipeline_uid: Some(pipeline_uid), shard_ids, - params_fingerprint: 0, + params_fingerprint, }) } @@ -1143,11 +1148,11 @@ mod tests { let mut chitchat_guard = chitchat_handle.lock().await; chitchat_guard.self_node_state().set( format!("{INDEXING_TASK_PREFIX}01BX5ZZKBKACTAV9WEVGEMMVS0"), - "my_index:00000000000000000000000000:my_source:1,3".to_string(), + "my_index:00000000000000000000000000:my_source:41:1,3".to_string(), ); chitchat_guard.self_node_state().set( format!("{INDEXING_TASK_PREFIX}01BX5ZZKBKACTAV9WEVGEMMVS1"), - "my_index-00000000000000000000000000-my_source:3,5".to_string(), + "my_index-00000000000000000000000000-my_source:53:3,5".to_string(), ); } node.wait_for_ready_members(|members| members.len() == 1, Duration::from_secs(5)) @@ -1358,14 +1363,15 @@ mod tests { #[test] fn test_parse_chitchat_kv() { assert!( - chitchat_kv_to_indexing_task("invalidulid", "my_index:uid:my_source:1,3").is_none() + chitchat_kv_to_indexing_task("invalidulid", "my_index:uid:my_source:42:1,3").is_none() ); let task = super::chitchat_kv_to_indexing_task( "indexer.task:01BX5ZZKBKACTAV9WEVGEMMVS0", - "my_index:00000000000000000000000000:my_source:00000000000000000001,\ + "my_index:00000000000000000000000000:my_source:42:00000000000000000001,\ 00000000000000000003", ) .unwrap(); + assert_eq!(task.params_fingerprint, 42); assert_eq!( task.pipeline_uid(), PipelineUid::from_str("01BX5ZZKBKACTAV9WEVGEMMVS0").unwrap() diff --git a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs index 6cfbdcfc7ac..cc5f2b33fc5 100644 --- a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs +++ b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs @@ -676,10 +676,8 @@ impl IngestController { model: &mut ControlPlaneModel, progress: &Progress, ) -> MetastoreResult<()> { - const NUM_PERMITS: u64 = 1; - if !model - .acquire_scaling_permits(&source_uid, ScalingMode::Up, NUM_PERMITS) + .acquire_scaling_permits(&source_uid, ScalingMode::Up) .unwrap_or(false) { return Ok(()); @@ -698,7 +696,7 @@ impl IngestController { if successful_source_uids.is_empty() { // We did not manage to create the shard. // We can release our permit. 
- model.release_scaling_permits(&source_uid, ScalingMode::Up, NUM_PERMITS); + model.release_scaling_permits(&source_uid, ScalingMode::Up); warn!( index_uid=%source_uid.index_uid, source_id=%source_uid.source_id, @@ -722,7 +720,7 @@ impl IngestController { source_id=%source_uid.source_id, "scaling up number of shards to {new_num_open_shards} failed: {metastore_error:?}" ); - model.release_scaling_permits(&source_uid, ScalingMode::Up, NUM_PERMITS); + model.release_scaling_permits(&source_uid, ScalingMode::Up); Err(metastore_error) } } @@ -860,10 +858,12 @@ impl IngestController { model: &mut ControlPlaneModel, progress: &Progress, ) -> MetastoreResult<()> { - const NUM_PERMITS: u64 = 1; + if shard_stats.num_open_shards == 0 { + return Ok(()); + } if !model - .acquire_scaling_permits(&source_uid, ScalingMode::Down, NUM_PERMITS) + .acquire_scaling_permits(&source_uid, ScalingMode::Down) .unwrap_or(false) { return Ok(()); @@ -876,12 +876,12 @@ impl IngestController { "scaling down number of shards to {new_num_open_shards}" ); let Some((leader_id, shard_id)) = find_scale_down_candidate(&source_uid, model) else { - model.release_scaling_permits(&source_uid, ScalingMode::Down, NUM_PERMITS); + model.release_scaling_permits(&source_uid, ScalingMode::Down); return Ok(()); }; info!("scaling down shard {shard_id} from {leader_id}"); let Some(ingester) = self.ingester_pool.get(&leader_id) else { - model.release_scaling_permits(&source_uid, ScalingMode::Down, NUM_PERMITS); + model.release_scaling_permits(&source_uid, ScalingMode::Down); return Ok(()); }; let shard_pkeys = vec![ShardPKey { @@ -896,7 +896,7 @@ impl IngestController { .await { warn!("failed to scale down number of shards: {error}"); - model.release_scaling_permits(&source_uid, ScalingMode::Down, NUM_PERMITS); + model.release_scaling_permits(&source_uid, ScalingMode::Down); return Ok(()); } model.close_shards(&source_uid, &[shard_id]); diff --git a/quickwit/quickwit-control-plane/src/model/mod.rs b/quickwit/quickwit-control-plane/src/model/mod.rs index ca314233f6a..d4e02f67c2c 100644 --- a/quickwit/quickwit-control-plane/src/model/mod.rs +++ b/quickwit/quickwit-control-plane/src/model/mod.rs @@ -378,10 +378,9 @@ impl ControlPlaneModel { &mut self, source_uid: &SourceUid, scaling_mode: ScalingMode, - num_permits: u64, ) -> Option { self.shard_table - .acquire_scaling_permits(source_uid, scaling_mode, num_permits) + .acquire_scaling_permits(source_uid, scaling_mode) } pub fn drain_scaling_permits(&mut self, source_uid: &SourceUid, scaling_mode: ScalingMode) { @@ -389,14 +388,9 @@ impl ControlPlaneModel { .drain_scaling_permits(source_uid, scaling_mode) } - pub fn release_scaling_permits( - &mut self, - source_uid: &SourceUid, - scaling_mode: ScalingMode, - num_permits: u64, - ) { + pub fn release_scaling_permits(&mut self, source_uid: &SourceUid, scaling_mode: ScalingMode) { self.shard_table - .release_scaling_permits(source_uid, scaling_mode, num_permits) + .release_scaling_permits(source_uid, scaling_mode) } } diff --git a/quickwit/quickwit-control-plane/src/model/shard_table.rs b/quickwit/quickwit-control-plane/src/model/shard_table.rs index 29c579cddcd..00b440dec50 100644 --- a/quickwit/quickwit-control-plane/src/model/shard_table.rs +++ b/quickwit/quickwit-control-plane/src/model/shard_table.rs @@ -544,14 +544,13 @@ impl ShardTable { &mut self, source_uid: &SourceUid, scaling_mode: ScalingMode, - num_permits: u64, ) -> Option { let table_entry = self.table_entries.get_mut(source_uid)?; let scaling_rate_limiter = match scaling_mode { 
ScalingMode::Up => &mut table_entry.scaling_up_rate_limiter, ScalingMode::Down => &mut table_entry.scaling_down_rate_limiter, }; - Some(scaling_rate_limiter.acquire(num_permits)) + Some(scaling_rate_limiter.acquire(1)) } pub fn drain_scaling_permits(&mut self, source_uid: &SourceUid, scaling_mode: ScalingMode) { @@ -564,18 +563,13 @@ impl ShardTable { } } - pub fn release_scaling_permits( - &mut self, - source_uid: &SourceUid, - scaling_mode: ScalingMode, - num_permits: u64, - ) { + pub fn release_scaling_permits(&mut self, source_uid: &SourceUid, scaling_mode: ScalingMode) { if let Some(table_entry) = self.table_entries.get_mut(source_uid) { let scaling_rate_limiter = match scaling_mode { ScalingMode::Up => &mut table_entry.scaling_up_rate_limiter, ScalingMode::Down => &mut table_entry.scaling_down_rate_limiter, }; - scaling_rate_limiter.release(num_permits); + scaling_rate_limiter.release(1); } } } @@ -1058,7 +1052,7 @@ mod tests { source_id: source_id.clone(), }; assert!(shard_table - .acquire_scaling_permits(&source_uid, ScalingMode::Up, 1) + .acquire_scaling_permits(&source_uid, ScalingMode::Up) .is_none()); shard_table.add_source(&index_uid, &source_id); @@ -1071,7 +1065,7 @@ mod tests { .available_permits(); assert!(shard_table - .acquire_scaling_permits(&source_uid, ScalingMode::Up, 1) + .acquire_scaling_permits(&source_uid, ScalingMode::Up) .unwrap()); let new_available_permits = shard_table @@ -1096,7 +1090,7 @@ mod tests { source_id: source_id.clone(), }; assert!(shard_table - .acquire_scaling_permits(&source_uid, ScalingMode::Down, 1) + .acquire_scaling_permits(&source_uid, ScalingMode::Down) .is_none()); shard_table.add_source(&index_uid, &source_id); @@ -1109,7 +1103,7 @@ mod tests { .available_permits(); assert!(shard_table - .acquire_scaling_permits(&source_uid, ScalingMode::Down, 1) + .acquire_scaling_permits(&source_uid, ScalingMode::Down) .unwrap()); let new_available_permits = shard_table @@ -1143,10 +1137,10 @@ mod tests { .available_permits(); assert!(shard_table - .acquire_scaling_permits(&source_uid, ScalingMode::Up, 1) + .acquire_scaling_permits(&source_uid, ScalingMode::Up) .unwrap()); - shard_table.release_scaling_permits(&source_uid, ScalingMode::Up, 1); + shard_table.release_scaling_permits(&source_uid, ScalingMode::Up); let new_available_permits = shard_table .table_entries @@ -1179,10 +1173,10 @@ mod tests { .available_permits(); assert!(shard_table - .acquire_scaling_permits(&source_uid, ScalingMode::Down, 1) + .acquire_scaling_permits(&source_uid, ScalingMode::Down) .unwrap()); - shard_table.release_scaling_permits(&source_uid, ScalingMode::Down, 1); + shard_table.release_scaling_permits(&source_uid, ScalingMode::Down); let new_available_permits = shard_table .table_entries diff --git a/quickwit/quickwit-datetime/Cargo.toml b/quickwit/quickwit-datetime/Cargo.toml index c30e6b029e1..004e959a348 100644 --- a/quickwit/quickwit-datetime/Cargo.toml +++ b/quickwit/quickwit-datetime/Cargo.toml @@ -13,7 +13,6 @@ license.workspace = true [dependencies] anyhow = { workspace = true } itertools = { workspace = true } -ouroboros = "0.18.0" serde = { workspace = true } serde_json = { workspace = true } tantivy = { workspace = true } diff --git a/quickwit/quickwit-datetime/src/date_time_format.rs b/quickwit/quickwit-datetime/src/date_time_format.rs index 42b282ef6db..1758e289113 100644 --- a/quickwit/quickwit-datetime/src/date_time_format.rs +++ b/quickwit/quickwit-datetime/src/date_time_format.rs @@ -20,138 +20,14 @@ use std::fmt::Display; use std::str::FromStr; -use 
ouroboros::self_referencing; use serde::de::Error; use serde::{Deserialize, Deserializer, Serialize}; use serde_json::Value as JsonValue; -use time::error::Format; use time::format_description::well_known::{Iso8601, Rfc2822, Rfc3339}; -use time::format_description::FormatItem; -use time::parsing::Parsed; -use time::{Month, OffsetDateTime, PrimitiveDateTime}; -use time_fmt::parse::time_format_item::parse_to_format_item; - -use crate::TantivyDateTime; - -/// A date time parser that holds the format specification `Vec`. -#[self_referencing] -pub struct StrptimeParser { - strptime_format: String, - with_timezone: bool, - #[borrows(strptime_format)] - #[covariant] - items: Vec>, -} - -impl FromStr for StrptimeParser { - type Err = String; - - fn from_str(strptime_format: &str) -> Result { - StrptimeParser::try_new( - strptime_format.to_string(), - strptime_format.to_lowercase().contains("%z"), - |strptime_format: &String| { - parse_to_format_item(strptime_format).map_err(|error| { - format!("invalid strptime format `{strptime_format}`: {error}") - }) - }, - ) - } -} - -impl StrptimeParser { - /// Parse a given date according to the datetime format specified during the StrptimeParser - /// creation. If the date format does not provide a specific a time, the time will be set to - /// 00:00:00. - fn parse_primitive_date_time(&self, date_time_str: &str) -> anyhow::Result { - let mut parsed = Parsed::new(); - if !parsed - .parse_items(date_time_str.as_bytes(), self.borrow_items())? - .is_empty() - { - anyhow::bail!( - "datetime string `{}` does not match strptime format `{}`", - date_time_str, - self.borrow_strptime_format() - ); - } - // The parsed datetime contains a date but seems to be missing "time". - // We complete it artificially with 00:00:00. - if parsed.hour_24().is_none() - && !(parsed.hour_12().is_some() && parsed.hour_12_is_pm().is_some()) - { - parsed.set_hour_24(0u8); - parsed.set_minute(0u8); - parsed.set_second(0u8); - } - if parsed.year().is_none() { - let now = OffsetDateTime::now_utc(); - let year = infer_year(parsed.month(), now.month(), now.year()); - parsed.set_year(year); - } - let date_time = parsed.try_into()?; - Ok(date_time) - } - - pub fn parse_date_time(&self, date_time_str: &str) -> Result { - if *self.borrow_with_timezone() { - OffsetDateTime::parse(date_time_str, self.borrow_items()).map_err(|err| err.to_string()) - } else { - self.parse_primitive_date_time(date_time_str) - .map(|date_time| date_time.assume_utc()) - .map_err(|err| err.to_string()) - } - } - - pub fn format_date_time(&self, date_time: &OffsetDateTime) -> Result { - date_time.format(self.borrow_items()) - } -} +use time::Month; -impl Clone for StrptimeParser { - fn clone(&self) -> Self { - // `self.format` is already known to be a valid format. - Self::from_str(self.borrow_strptime_format().as_str()).unwrap() - } -} - -impl PartialEq for StrptimeParser { - fn eq(&self, other: &Self) -> bool { - self.borrow_strptime_format() == other.borrow_strptime_format() - } -} - -impl Eq for StrptimeParser {} - -impl std::fmt::Debug for StrptimeParser { - fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - formatter - .debug_struct("StrptimeParser") - .field("format", &self.borrow_strptime_format()) - .finish() - } -} - -impl std::hash::Hash for StrptimeParser { - fn hash(&self, state: &mut H) { - self.borrow_strptime_format().hash(state); - } -} - -// `Strftime` format special characters. -// These characters are taken from the parsing crate we use for compatibility. 
-const STRFTIME_FORMAT_MARKERS: [&str; 36] = [ - "%a", "%A", "%b", "%B", "%c", "%C", "%d", "%D", "%e", "%f", "%F", "%h", "%H", "%I", "%j", "%k", - "%l", "%m", "%M", "%n", "%p", "%P", "%r", "%R", "%S", "%t", "%T", "%U", "%w", "%W", "%x", "%X", - "%y", "%Y", "%z", "%Z", -]; - -// Checks if a format contains `strftime` special characters. -fn is_strftime_formatting(format_str: &str) -> bool { - STRFTIME_FORMAT_MARKERS - .iter() - .any(|marker| format_str.contains(marker)) -} +use crate::java_date_time_format::is_strftime_formatting; +use crate::{StrptimeParser, TantivyDateTime}; /// Specifies the datetime and unix timestamp formats to use when parsing date strings. #[derive(Clone, Debug, Eq, PartialEq, Hash, Default)] @@ -170,7 +46,7 @@ impl DateTimeInputFormat { DateTimeInputFormat::Iso8601 => "iso8601", DateTimeInputFormat::Rfc2822 => "rfc2822", DateTimeInputFormat::Rfc3339 => "rfc3339", - DateTimeInputFormat::Strptime(parser) => parser.borrow_strptime_format(), + DateTimeInputFormat::Strptime(parser) => parser.strptime_format.as_str(), DateTimeInputFormat::Timestamp => "unix_timestamp", } } @@ -198,7 +74,7 @@ impl FromStr for DateTimeInputFormat { format must contain at least one `strftime` special characters" )); } - DateTimeInputFormat::Strptime(StrptimeParser::from_str(date_time_format_str)?) + DateTimeInputFormat::Strptime(StrptimeParser::from_strptime(date_time_format_str)?) } }; Ok(date_time_format) @@ -241,7 +117,7 @@ impl DateTimeOutputFormat { DateTimeOutputFormat::Iso8601 => "iso8601", DateTimeOutputFormat::Rfc2822 => "rfc2822", DateTimeOutputFormat::Rfc3339 => "rfc3339", - DateTimeOutputFormat::Strptime(parser) => parser.borrow_strptime_format(), + DateTimeOutputFormat::Strptime(parser) => parser.strptime_format.as_str(), DateTimeOutputFormat::TimestampSecs => "unix_timestamp_secs", DateTimeOutputFormat::TimestampMillis => "unix_timestamp_millis", DateTimeOutputFormat::TimestampMicros => "unix_timestamp_micros", @@ -300,7 +176,7 @@ impl FromStr for DateTimeOutputFormat { format must contain at least one `strftime` special characters" )); } - DateTimeOutputFormat::Strptime(StrptimeParser::from_str(date_time_format_str)?) + DateTimeOutputFormat::Strptime(StrptimeParser::from_strptime(date_time_format_str)?) 
} }; Ok(date_time_format) @@ -341,7 +217,6 @@ pub(super) fn infer_year( #[cfg(test)] mod tests { - use time::macros::datetime; use time::Month; use super::*; @@ -462,20 +337,6 @@ mod tests { } } - #[test] - fn test_strictly_parse_datetime_format() { - let parser = StrptimeParser::from_str("%Y-%m-%d").unwrap(); - assert_eq!( - parser.parse_date_time("2021-01-01").unwrap(), - datetime!(2021-01-01 00:00:00 UTC) - ); - let error = parser.parse_date_time("2021-01-01TABC").unwrap_err(); - assert_eq!( - error, - "datetime string `2021-01-01TABC` does not match strptime format `%Y-%m-%d`" - ); - } - #[test] fn test_infer_year() { let inferred_year = infer_year(None, Month::January, 2024); diff --git a/quickwit/quickwit-datetime/src/date_time_parsing.rs b/quickwit/quickwit-datetime/src/date_time_parsing.rs index 14c1fa9be90..54e8d4b88bb 100644 --- a/quickwit/quickwit-datetime/src/date_time_parsing.rs +++ b/quickwit/quickwit-datetime/src/date_time_parsing.rs @@ -179,8 +179,6 @@ pub fn parse_timestamp(timestamp: i64) -> Result { #[cfg(test)] mod tests { - use std::str::FromStr; - use time::macros::datetime; use time::Month; @@ -262,7 +260,7 @@ mod tests { ), ]; for (fmt, date_time_str, expected) in test_data { - let parser = StrptimeParser::from_str(fmt).unwrap(); + let parser = StrptimeParser::from_strptime(fmt).unwrap(); let result = parser.parse_date_time(date_time_str); if let Err(error) = &result { panic!( @@ -276,14 +274,14 @@ mod tests { #[test] fn test_parse_date_without_time() { - let strptime_parser = StrptimeParser::from_str("%Y-%m-%d").unwrap(); + let strptime_parser = StrptimeParser::from_strptime("%Y-%m-%d").unwrap(); let date = strptime_parser.parse_date_time("2012-05-21").unwrap(); assert_eq!(date, datetime!(2012-05-21 00:00:00 UTC)); } #[test] fn test_parse_date_am_pm_hour_not_zeroed() { - let strptime_parser = StrptimeParser::from_str("%Y-%m-%d %I:%M:%S %p").unwrap(); + let strptime_parser = StrptimeParser::from_strptime("%Y-%m-%d %I:%M:%S %p").unwrap(); let date = strptime_parser .parse_date_time("2012-05-21 10:05:12 pm") .unwrap(); @@ -309,13 +307,13 @@ mod tests { DateTimeInputFormat::Rfc2822, DateTimeInputFormat::Rfc3339, DateTimeInputFormat::Strptime( - StrptimeParser::from_str("%Y-%m-%d %H:%M:%S").unwrap(), + StrptimeParser::from_strptime("%Y-%m-%d %H:%M:%S").unwrap(), ), DateTimeInputFormat::Strptime( - StrptimeParser::from_str("%Y/%m/%d %H:%M:%S").unwrap(), + StrptimeParser::from_strptime("%Y/%m/%d %H:%M:%S").unwrap(), ), DateTimeInputFormat::Strptime( - StrptimeParser::from_str("%Y/%m/%d %H:%M:%S %z").unwrap(), + StrptimeParser::from_strptime("%Y/%m/%d %H:%M:%S %z").unwrap(), ), DateTimeInputFormat::Timestamp, ], @@ -452,7 +450,7 @@ mod tests { DateTimeInputFormat::Iso8601, DateTimeInputFormat::Rfc3339, DateTimeInputFormat::Strptime( - StrptimeParser::from_str("%Y-%m-%d %H:%M:%S.%f").unwrap(), + StrptimeParser::from_strptime("%Y-%m-%d %H:%M:%S.%f").unwrap(), ), ], ) diff --git a/quickwit/quickwit-datetime/src/java_date_time_format.rs b/quickwit/quickwit-datetime/src/java_date_time_format.rs new file mode 100644 index 00000000000..1cc035c90f3 --- /dev/null +++ b/quickwit/quickwit-datetime/src/java_date_time_format.rs @@ -0,0 +1,817 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. 
+// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::collections::HashMap; +use std::num::NonZeroU8; +use std::sync::OnceLock; + +use time::error::{Format, TryFromParsed}; +use time::format_description::modifier::{ + Day, Hour, Minute, Month as MonthModifier, Padding, Second, Subsecond, SubsecondDigits, + WeekNumber, WeekNumberRepr, Weekday, WeekdayRepr, Year, YearRepr, +}; +use time::format_description::{Component, OwnedFormatItem}; +use time::parsing::Parsed; +use time::{Month, OffsetDateTime, PrimitiveDateTime, UtcOffset}; +use time_fmt::parse::time_format_item::parse_to_format_item; + +use crate::date_time_format; + +const JAVA_DATE_FORMAT_TOKENS: &[&str] = &[ + "yyyy", + "xxxx", + "xx[xx]", + "SSSSSSSSS", // For nanoseconds + "SSSSSSS", // For microseconds + "SSSSSS", // For fractional seconds up to six digits + "SSSSS", + "SSSS", + "SSS", + "SS", + "ZZ", + "xx", + "ww", + "w[w]", + "yy", + "MM", + "dd", + "HH", + "hh", + "kk", + "mm", + "ss", + "aa", + "a", + "w", + "M", + "d", + "H", + "h", + "k", + "m", + "s", + "S", + "Z", + "e", +]; + +fn literal(s: &[u8]) -> OwnedFormatItem { + // builds a boxed slice from a slice + let boxed_slice: Box<[u8]> = s.to_vec().into_boxed_slice(); + OwnedFormatItem::Literal(boxed_slice) +} + +#[inline] +fn get_padding(ptn: &str) -> Padding { + if ptn.len() == 2 { + Padding::Zero + } else { + Padding::None + } +} + +fn build_zone_offset(_: &str) -> Option { + // 'Z' literal to represent UTC offset + let z_literal = OwnedFormatItem::Literal(Box::from(b"Z".as_ref())); + + // Offset in '+/-HH:MM' format + let offset_with_delimiter_items: Box<[OwnedFormatItem]> = vec![ + OwnedFormatItem::Component(Component::OffsetHour(Default::default())), + OwnedFormatItem::Literal(Box::from(b":".as_ref())), + OwnedFormatItem::Component(Component::OffsetMinute(Default::default())), + ] + .into_boxed_slice(); + let offset_with_delimiter_compound = OwnedFormatItem::Compound(offset_with_delimiter_items); + + // Offset in '+/-HHMM' format + let offset_items: Box<[OwnedFormatItem]> = vec![ + OwnedFormatItem::Component(Component::OffsetHour(Default::default())), + OwnedFormatItem::Component(Component::OffsetMinute(Default::default())), + ] + .into_boxed_slice(); + let offset_compound = OwnedFormatItem::Compound(offset_items); + + Some(OwnedFormatItem::First( + vec![z_literal, offset_with_delimiter_compound, offset_compound].into_boxed_slice(), + )) +} + +fn build_year_item(ptn: &str) -> Option { + let mut full_year = Year::default(); + full_year.repr = YearRepr::Full; + let full_year_component = OwnedFormatItem::Component(Component::Year(full_year)); + + let mut short_year = Year::default(); + short_year.repr = YearRepr::LastTwo; + let short_year_component = OwnedFormatItem::Component(Component::Year(short_year)); + + if ptn.len() == 4 { + Some(full_year_component) + } else if ptn.len() == 2 { + Some(short_year_component) + } else { + Some(OwnedFormatItem::First( + 
vec![full_year_component, short_year_component].into_boxed_slice(), + )) + } +} + +fn build_week_based_year_item(ptn: &str) -> Option { + // TODO no `Component` for that + build_year_item(ptn) +} + +fn build_month_item(ptn: &str) -> Option { + let mut month: MonthModifier = Default::default(); + month.padding = get_padding(ptn); + Some(OwnedFormatItem::Component(Component::Month(month))) +} + +fn build_day_item(ptn: &str) -> Option { + let mut day = Day::default(); + day.padding = get_padding(ptn); + Some(OwnedFormatItem::Component(Component::Day(day))) +} + +fn build_day_of_week_item(_: &str) -> Option { + let mut weekday = Weekday::default(); + weekday.repr = WeekdayRepr::Monday; + weekday.one_indexed = false; + Some(OwnedFormatItem::Component(Component::Weekday(weekday))) +} + +fn build_week_of_year_item(ptn: &str) -> Option { + let mut week_number = WeekNumber::default(); + week_number.repr = WeekNumberRepr::Monday; + week_number.padding = get_padding(ptn); + Some(OwnedFormatItem::Component(Component::WeekNumber( + week_number, + ))) +} + +fn build_hour_item(ptn: &str) -> Option { + let mut hour = Hour::default(); + hour.padding = get_padding(ptn); + hour.is_12_hour_clock = false; + Some(OwnedFormatItem::Component(Component::Hour(hour))) +} + +fn build_minute_item(ptn: &str) -> Option { + let mut minute: Minute = Default::default(); + minute.padding = get_padding(ptn); + Some(OwnedFormatItem::Component(Component::Minute(minute))) +} + +fn build_second_item(ptn: &str) -> Option { + let mut second: Second = Default::default(); + second.padding = get_padding(ptn); + Some(OwnedFormatItem::Component(Component::Second(second))) +} + +fn build_fraction_of_second_item(_ptn: &str) -> Option { + let mut subsecond: Subsecond = Default::default(); + subsecond.digits = SubsecondDigits::OneOrMore; + Some(OwnedFormatItem::Component(Component::Subsecond(subsecond))) +} + +fn parse_java_datetime_format_items_recursive( + chars: &mut std::iter::Peekable, +) -> Result, String> { + let mut items = Vec::new(); + + while let Some(&c) = chars.peek() { + match c { + '[' => { + chars.next(); + let optional_items = parse_java_datetime_format_items_recursive(chars)?; + items.push(OwnedFormatItem::Optional(Box::new( + OwnedFormatItem::Compound(optional_items.into_boxed_slice()), + ))); + } + ']' => { + chars.next(); + break; + } + '\'' => { + chars.next(); + let mut literal_str = String::new(); + while let Some(&next_c) = chars.peek() { + if next_c == '\'' { + chars.next(); + break; + } else { + literal_str.push(next_c); + chars.next(); + } + } + items.push(literal(literal_str.as_bytes())); + } + _ => { + if let Some(format_item) = match_java_date_format_token(chars)? 
{
+                    items.push(format_item);
+                } else {
+                    // Treat as a literal character
+                    items.push(literal(c.to_string().as_bytes()));
+                    chars.next();
+                }
+            }
+        }
+    }
+
+    Ok(items)
+}
+
+// Elasticsearch/OpenSearch uses a set of preconfigured formats; more information can be found
+// here: https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-date-format.html
+fn match_java_date_format_token(
+    chars: &mut std::iter::Peekable<std::str::Chars<'_>>,
+) -> Result<Option<OwnedFormatItem>, String> {
+    if chars.peek().is_none() {
+        return Ok(None);
+    }
+
+    let remaining: String = chars.clone().collect();
+
+    // Try to match the longest possible token
+    for token in JAVA_DATE_FORMAT_TOKENS {
+        if remaining.starts_with(token) {
+            for _ in 0..token.len() {
+                chars.next();
+            }
+
+            let format_item = match *token {
+                "yyyy" | "yy" => build_year_item(token),
+                "xxxx" | "xx[xx]" | "xx" => build_week_based_year_item(token),
+                "MM" | "M" => build_month_item(token),
+                "dd" | "d" => build_day_item(token),
+                "HH" | "H" => build_hour_item(token),
+                "mm" | "m" => build_minute_item(token),
+                "ss" | "s" => build_second_item(token),
+                "SSSSSSSSS" | "SSSSSSS" | "SSSSSS" | "SSSSS" | "SSSS" | "SSS" | "SS" | "S" => {
+                    build_fraction_of_second_item(token)
+                }
+                "Z" => build_zone_offset(token),
+                "ww" | "w[w]" | "w" => build_week_of_year_item(token),
+                "e" => build_day_of_week_item(token),
+                _ => return Err(format!("Unrecognized token '{}'", token)),
+            };
+            return Ok(format_item);
+        }
+    }
+
+    Ok(None)
+}
+
+// Check if the given date time format is a common alias and replace it with the
+// Java date format it is mapped to, if any.
+// If the java_datetime_format is not an alias, it is expected to be a
+// Java date time format and is returned as is.
+fn resolve_java_datetime_format_alias(java_datetime_format: &str) -> &str {
+    static JAVA_DATE_FORMAT_ALIASES: OnceLock<HashMap<&'static str, &'static str>> =
+        OnceLock::new();
+    let java_datetime_format_map = JAVA_DATE_FORMAT_ALIASES.get_or_init(|| {
+        let mut m = HashMap::new();
+        m.insert("date_optional_time", "yyyy-MM-dd['T'HH:mm:ss.SSSZ]");
+        m.insert(
+            "strict_date_optional_time",
+            "yyyy[-MM[-dd['T'HH[:mm[:ss[.SSS[Z]]]]]]]",
+        );
+        m.insert(
+            "strict_date_optional_time_nanos",
+            "yyyy[-MM[-dd['T'HH:mm:ss.SSSSSSZ]]]",
+        );
+        m.insert("basic_date", "yyyyMMdd");
+
+        m.insert("strict_basic_week_date", "xxxx'W'wwe");
+        m.insert("basic_week_date", "xx[xx]'W'wwe");
+
+        m.insert("strict_basic_week_date_time", "xxxx'W'wwe'T'HHmmss.SSSZ");
+        m.insert("basic_week_date_time", "xx[xx]'W'wwe'T'HHmmss.SSSZ");
+
+        m.insert(
+            "strict_basic_week_date_time_no_millis",
+            "xxxx'W'wwe'T'HHmmssZ",
+        );
+        m.insert("basic_week_date_time_no_millis", "xx[xx]'W'wwe'T'HHmmssZ");
+
+        m.insert("strict_week_date", "xxxx-'W'ww-e");
+        m.insert("week_date", "xxxx-'W'w[w]-e");
+        m
+    });
+    java_datetime_format_map
+        .get(java_datetime_format)
+        .copied()
+        .unwrap_or(java_datetime_format)
+}
+
+/// A date time parser that holds the format specification `Vec<OwnedFormatItem>`.
+#[derive(Clone)]
+pub struct StrptimeParser {
+    pub(crate) strptime_format: String,
+    items: Box<[OwnedFormatItem]>,
+}
+
+pub fn parse_java_datetime_format_items(
+    java_datetime_format: &str,
+) -> Result<Box<[OwnedFormatItem]>, String> {
+    let mut chars = java_datetime_format.chars().peekable();
+    let items = parse_java_datetime_format_items_recursive(&mut chars)?;
+    Ok(items.into_boxed_slice())
+}
+
+impl StrptimeParser {
+    /// Parse a date, assuming UTC if no timezone is specified.
+    /// See `parse_date_time_with_default_timezone` for more details.
+    pub fn parse_date_time(&self, date_time_str: &str) -> Result<OffsetDateTime, String> {
+        self.parse_date_time_with_default_timezone(date_time_str, UtcOffset::UTC)
+    }
+
+    /// Parse a date. If no timezone is specified, we will assume the timezone passed as
+    /// `default_offset`. If the time is missing, it will be automatically set to 00:00:00.
+    pub fn parse_date_time_with_default_timezone(
+        &self,
+        date_time_str: &str,
+        default_offset: UtcOffset,
+    ) -> Result<OffsetDateTime, String> {
+        let mut parsed = Parsed::new();
+        if !parsed
+            .parse_items(date_time_str.as_bytes(), &self.items)
+            .map_err(|err| err.to_string())?
+            .is_empty()
+        {
+            return Err(format!(
+                "datetime string `{}` does not match strptime format `{}`",
+                date_time_str, &self.strptime_format
+            ));
+        }
+
+        // The parsed datetime contains a date but seems to be missing "time".
+        // We complete it artificially with 00:00:00.
+        if parsed.hour_24().is_none()
+            && !(parsed.hour_12().is_some() && parsed.hour_12_is_pm().is_some())
+        {
+            parsed.set_hour_24(0u8);
+            parsed.set_minute(0u8);
+            parsed.set_second(0u8);
+        }
+
+        if parsed.year().is_none() {
+            let now = OffsetDateTime::now_utc();
+            let year = date_time_format::infer_year(parsed.month(), now.month(), now.year());
+            parsed.set_year(year);
+        }
+
+        if parsed.day().is_none() && parsed.monday_week_number().is_none() {
+            parsed.set_day(NonZeroU8::try_from(1u8).unwrap());
+        }
+
+        if parsed.month().is_none() && parsed.monday_week_number().is_none() {
+            parsed.set_month(Month::January);
+        }
+
+        if parsed.offset_hour().is_some() {
+            let offset_datetime: OffsetDateTime = parsed
+                .try_into()
+                .map_err(|err: TryFromParsed| err.to_string())?;
+            return Ok(offset_datetime);
+        }
+        let primitive_date_time: PrimitiveDateTime = parsed
+            .try_into()
+            .map_err(|err: TryFromParsed| err.to_string())?;
+        Ok(primitive_date_time.assume_offset(default_offset))
+    }
+
+    pub fn format_date_time(&self, date_time: &OffsetDateTime) -> Result<String, Format> {
+        date_time.format(&self.items)
+    }
+
+    pub fn from_strptime(strptime_format: &str) -> Result<StrptimeParser, String> {
+        let items: Box<[OwnedFormatItem]> = parse_to_format_item(strptime_format)
+            .map_err(|err| format!("invalid strptime format `{strptime_format}`: {err}"))?
+            .into_iter()
+            .map(|item| item.into())
+            .collect::<Vec<_>>()
+            .into_boxed_slice();
+        Ok(StrptimeParser::new(strptime_format.to_string(), items))
+    }
+
+    pub fn from_java_datetime_format(java_datetime_format: &str) -> Result<StrptimeParser, String> {
+        let java_datetime_format_resolved =
+            resolve_java_datetime_format_alias(java_datetime_format);
+        let items: Box<[OwnedFormatItem]> =
+            parse_java_datetime_format_items(java_datetime_format_resolved)?;
+        Ok(StrptimeParser::new(java_datetime_format.to_string(), items))
+    }
+
+    fn new(strptime_format: String, items: Box<[OwnedFormatItem]>) -> Self {
+        StrptimeParser {
+            strptime_format,
+            items,
+        }
+    }
+}
+
+impl PartialEq for StrptimeParser {
+    fn eq(&self, other: &Self) -> bool {
+        self.strptime_format == other.strptime_format
+    }
+}
+
+impl Eq for StrptimeParser {}
+
+impl std::fmt::Debug for StrptimeParser {
+    fn fmt(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
+        formatter
+            .debug_struct("StrptimeParser")
+            .field("format", &self.strptime_format)
+            .finish()
+    }
+}
+
+impl std::hash::Hash for StrptimeParser {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.strptime_format.hash(state);
+    }
+}
+
+// `Strftime` format special characters.
+// These characters are taken from the parsing crate we use for compatibility.
+const STRFTIME_FORMAT_MARKERS: [&str; 36] = [ + "%a", "%A", "%b", "%B", "%c", "%C", "%d", "%D", "%e", "%f", "%F", "%h", "%H", "%I", "%j", "%k", + "%l", "%m", "%M", "%n", "%p", "%P", "%r", "%R", "%S", "%t", "%T", "%U", "%w", "%W", "%x", "%X", + "%y", "%Y", "%z", "%Z", +]; + +// Checks if a format contains `strftime` special characters. +pub fn is_strftime_formatting(format_str: &str) -> bool { + STRFTIME_FORMAT_MARKERS + .iter() + .any(|marker| format_str.contains(marker)) +} + +#[cfg(test)] +mod tests { + use time::macros::datetime; + + use super::*; + use crate::java_date_time_format::parse_java_datetime_format_items; + + #[test] + fn test_parse_datetime_format_missing_time() { + let parser = StrptimeParser::from_strptime("%Y-%m-%d").unwrap(); + assert_eq!( + parser.parse_date_time("2021-01-01").unwrap(), + datetime!(2021-01-01 00:00:00 UTC) + ); + } + + #[test] + fn test_parse_datetime_format_strict_on_trailing_data() { + let parser = StrptimeParser::from_strptime("%Y-%m-%d").unwrap(); + let error = parser.parse_date_time("2021-01-01TABC").unwrap_err(); + assert_eq!( + error, + "datetime string `2021-01-01TABC` does not match strptime format `%Y-%m-%d`" + ); + } + + #[test] + fn test_parse_strptime_with_timezone() { + let parser = StrptimeParser::from_strptime("%Y-%m-%dT%H:%M:%S %z").unwrap(); + let offset_datetime = parser + .parse_date_time("2021-01-01T11:00:03 +07:00") + .unwrap(); + assert_eq!(offset_datetime, datetime!(2021-01-01 11:00:03 +7)); + } + + #[track_caller] + fn test_parse_java_datetime_aux( + java_date_time_format: &str, + date_str: &str, + expected_datetime: OffsetDateTime, + ) { + let parser = StrptimeParser::from_java_datetime_format(java_date_time_format).unwrap(); + let datetime = parser.parse_date_time(date_str).unwrap(); + assert_eq!(datetime, expected_datetime); + } + + #[test] + fn test_parse_java_datetime_format() { + test_parse_java_datetime_aux("yyyyMMdd", "20210101", datetime!(2021-01-01 00:00:00 UTC)); + test_parse_java_datetime_aux( + "yyyy MM dd", + "2021 01 01", + datetime!(2021-01-01 00:00:00 UTC), + ); + test_parse_java_datetime_aux( + "yyyy!MM?dd", + "2021!01?01", + datetime!(2021-01-01 00:00:00 UTC), + ); + test_parse_java_datetime_aux( + "yyyy!MM?dd'T'HH:", + "2021!01?01T13:", + datetime!(2021-01-01 13:00:00 UTC), + ); + test_parse_java_datetime_aux( + "yyyy!MM?dd['T'[HH:]]", + "2021!01?01", + datetime!(2021-01-01 00:00:00 UTC), + ); + test_parse_java_datetime_aux( + "yyyy!MM?dd['T'[HH:]", + "2021!01?01T", + datetime!(2021-01-01 00:00:00 UTC), + ); + test_parse_java_datetime_aux( + "yyyy!MM?dd['T'[HH:]]", + "2021!01?01T13:", + datetime!(2021-01-01 13:00:00 UTC), + ); + } + + #[test] + fn test_parse_java_missing_time() { + test_parse_java_datetime_aux( + "yyyy-MM-dd", + "2021-01-01", + datetime!(2021-01-01 00:00:00 UTC), + ); + } + + #[test] + fn test_parse_java_optional_missing_time() { + test_parse_java_datetime_aux( + "yyyy-MM-dd[ HH:mm:ss]", + "2021-01-01", + datetime!(2021-01-01 00:00:00 UTC), + ); + test_parse_java_datetime_aux( + "yyyy-MM-dd[ HH:mm:ss]", + "2021-01-01 12:34:56", + datetime!(2021-01-01 12:34:56 UTC), + ); + } + + #[test] + fn test_parse_java_datetime_format_aliases() { + test_parse_java_datetime_aux( + "date_optional_time", + "2021-01-01", + datetime!(2021-01-01 00:00:00 UTC), + ); + test_parse_java_datetime_aux( + "date_optional_time", + "2021-01-21T03:01:22.312+01:00", + datetime!(2021-01-21 03:01:22.312 +1), + ); + } + + #[test] + fn test_parse_java_week_formats() { + test_parse_java_datetime_aux( + "basic_week_date", + 
"2024W313", + datetime!(2024-08-01 0:00:00.0 +00:00:00), + ); + test_parse_java_datetime_aux( + "basic_week_date", + "24W313", + datetime!(2024-08-01 0:00:00.0 +00:00:00), + ); + // // ❌ 'the 'year' component could not be parsed' + // test_parse_java_datetime_aux( + // "basic_week_date", + // "1W313", + // datetime!(2018-08-02 0:00:00.0 +00:00:00), + // ); + test_parse_java_datetime_aux( + "basic_week_date_time", + "2018W313T121212.1Z", + datetime!(2018-08-02 12:12:12.1 +00:00:00), + ); + test_parse_java_datetime_aux( + "basic_week_date_time", + "2018W313T121212.123Z", + datetime!(2018-08-02 12:12:12.123 +00:00:00), + ); + test_parse_java_datetime_aux( + "basic_week_date_time", + "2018W313T121212.123456789Z", + datetime!(2018-08-02 12:12:12.123456789 +00:00:00), + ); + test_parse_java_datetime_aux( + "basic_week_date_time", + "2018W313T121212.123+0100", + datetime!(2018-08-02 12:12:12.123 +01:00:00), + ); + test_parse_java_datetime_aux( + "basic_week_date_time_no_millis", + "2018W313T121212Z", + datetime!(2018-08-02 12:12:12.0 +00:00:00), + ); + test_parse_java_datetime_aux( + "basic_week_date_time_no_millis", + "2018W313T121212+0100", + datetime!(2018-08-02 12:12:12.0 +01:00:00), + ); + test_parse_java_datetime_aux( + "basic_week_date_time_no_millis", + "2018W313T121212+01:00", + datetime!(2018-08-02 12:12:12.0 +01:00:00), + ); + + test_parse_java_datetime_aux( + "week_date", + "2012-W48-6", + datetime!(2012-12-02 0:00:00.0 +00:00:00), + ); + + test_parse_java_datetime_aux( + "week_date", + "2012-W01-6", + datetime!(2012-01-08 0:00:00.0 +00:00:00), + ); + + test_parse_java_datetime_aux( + "week_date", + "2012-W1-6", + datetime!(2012-01-08 0:00:00.0 +00:00:00), + ); + } + + #[test] + fn test_parse_java_strict_week_formats() { + test_parse_java_datetime_aux( + "strict_basic_week_date", + "2024W313", + datetime!(2024-08-01 0:00:00.0 +00:00:00), + ); + + test_parse_java_datetime_aux( + "strict_week_date", + "2012-W48-6", + datetime!(2012-12-02 0:00:00.0 +00:00:00), + ); + + test_parse_java_datetime_aux( + "strict_week_date", + "2012-W01-6", + datetime!(2012-01-08 0:00:00.0 +00:00:00), + ); + } + + #[test] + fn test_parse_strict_date_optional_time() { + let parser = + StrptimeParser::from_java_datetime_format("strict_date_optional_time").unwrap(); + let dates = [ + "2019", + "2019-03", + "2019-03-23", + "2019-03-23T21:34", + "2019-03-23T21:34:46", + "2019-03-23T21:34:46.123Z", + "2019-03-23T21:35:46.123+00:00", + "2019-03-23T21:36:46.123+03:00", + "2019-03-23T21:37:46.123+0300", + ]; + let expected = [ + datetime!(2019-01-01 00:00:00 UTC), + datetime!(2019-03-01 00:00:00 UTC), + datetime!(2019-03-23 00:00:00 UTC), + datetime!(2019-03-23 21:34 UTC), + datetime!(2019-03-23 21:34:46 UTC), + datetime!(2019-03-23 21:34:46.123 UTC), + datetime!(2019-03-23 21:35:46.123 UTC), + datetime!(2019-03-23 21:36:46.123 +03:00:00), + datetime!(2019-03-23 21:37:46.123 +03:00:00), + ]; + for (date_str, &expected_dt) in dates.iter().zip(expected.iter()) { + let parsed_dt = parser + .parse_date_time(date_str) + .unwrap_or_else(|e| panic!("Failed to parse {}: {}", date_str, e)); + assert_eq!(parsed_dt, expected_dt); + } + } + + #[test] + fn test_parse_strict_date_optional_time_nanos() { + let parser = + StrptimeParser::from_java_datetime_format("strict_date_optional_time_nanos").unwrap(); + let dates = [ + "2019", + "2019-03", + "2019-03-23", + "2019-03-23T21:34:46.123456789Z", + "2019-03-23T21:35:46.123456789+00:00", + "2019-03-23T21:36:46.123456789+03:00", + "2019-03-23T21:37:46.123456789+0300", + ]; + let 
expected = [ + datetime!(2019-01-01 00:00:00 UTC), + datetime!(2019-03-01 00:00:00 UTC), + datetime!(2019-03-23 00:00:00 UTC), + datetime!(2019-03-23 21:34:46.123456789 UTC), + datetime!(2019-03-23 21:35:46.123456789 UTC), + datetime!(2019-03-23 21:36:46.123456789 +03:00:00), + datetime!(2019-03-23 21:37:46.123456789 +03:00:00), + ]; + for (date_str, &expected_dt) in dates.iter().zip(expected.iter()) { + let parsed_dt = parser + .parse_date_time(date_str) + .unwrap_or_else(|e| panic!("Failed to parse {}: {}", date_str, e)); + assert_eq!(parsed_dt, expected_dt); + } + } + + #[test] + fn test_parse_java_datetime_format_items() { + let format_str = "xx[xx]'W'wwe"; + let result = parse_java_datetime_format_items(format_str).unwrap(); + + // We expect the tokens to be parsed as: + // - 'xx[xx]' (week-based year) with optional length + // - 'W' (literal) + // - 'ww' (week of year) + // - 'e' (day of week) + + assert_eq!(result.len(), 4); + + // Verify each token + match &result[0] { + OwnedFormatItem::First(boxed_slice) => { + assert_eq!(boxed_slice.len(), 2); + match (&boxed_slice[0], &boxed_slice[1]) { + ( + OwnedFormatItem::Component(Component::Year(_)), + OwnedFormatItem::Component(Component::Year(_)), + ) => {} + unexpected => { + panic!("Expected two Year components, but found: {:?}", unexpected) + } + } + } + unexpected => panic!( + "Expected First with two Year components, but found: {:?}", + unexpected + ), + } + + match &result[1] { + OwnedFormatItem::Literal(lit) => assert_eq!(lit.as_ref(), b"W"), + unexpected => panic!("Expected literal 'W', but found: {:?}", unexpected), + } + + match &result[2] { + OwnedFormatItem::Component(Component::WeekNumber(_)) => {} + unexpected => panic!("Expected WeekNumber component, but found: {:?}", unexpected), + } + + match &result[3] { + OwnedFormatItem::Component(Component::Weekday(_)) => {} + unexpected => panic!("Expected Weekday component, but found: {:?}", unexpected), + } + } + + #[test] + fn test_parse_java_datetime_format_with_literals() { + let format = "yyyy'T'Z-HHuu"; + let parser = StrptimeParser::from_java_datetime_format(format).unwrap(); + + let test_cases = [ + ("2023TZ-14uu", datetime!(2023-01-01 14:00:00 UTC)), + ("2024TZ-05uu", datetime!(2024-01-01 05:00:00 UTC)), + ("2025TZ-23uu", datetime!(2025-01-01 23:00:00 UTC)), + ]; + + for (input, expected) in test_cases.iter() { + let result = parser.parse_date_time(input).unwrap(); + assert_eq!(result, *expected, "Failed to parse {}", input); + } + + // Test error case + let error_case = "2023-1430"; + assert!( + parser.parse_date_time(error_case).is_err(), + "Expected error for input: {}", + error_case + ); + } +} diff --git a/quickwit/quickwit-datetime/src/lib.rs b/quickwit/quickwit-datetime/src/lib.rs index eb4d8c940ba..03003641dcc 100644 --- a/quickwit/quickwit-datetime/src/lib.rs +++ b/quickwit/quickwit-datetime/src/lib.rs @@ -19,9 +19,11 @@ mod date_time_format; mod date_time_parsing; +pub mod java_date_time_format; -pub use date_time_format::{DateTimeInputFormat, DateTimeOutputFormat, StrptimeParser}; +pub use date_time_format::{DateTimeInputFormat, DateTimeOutputFormat}; pub use date_time_parsing::{ parse_date_time_str, parse_timestamp, parse_timestamp_float, parse_timestamp_int, }; +pub use java_date_time_format::StrptimeParser; pub use tantivy::DateTime as TantivyDateTime; diff --git a/quickwit/quickwit-index-management/src/garbage_collection.rs b/quickwit/quickwit-index-management/src/garbage_collection.rs index 4a24aedc012..74971077b3b 100644 --- 
a/quickwit/quickwit-index-management/src/garbage_collection.rs
+++ b/quickwit/quickwit-index-management/src/garbage_collection.rs
@@ -25,6 +25,7 @@ use std::time::Duration;
 use anyhow::Context;
 use futures::{Future, StreamExt};
 use itertools::Itertools;
+use quickwit_common::metrics::IntCounter;
 use quickwit_common::pretty::PrettySample;
 use quickwit_common::Progress;
 use quickwit_metastore::{
@@ -44,6 +45,26 @@ use tracing::{error, instrument};
 /// The maximum number of splits that the GC should delete per attempt.
 const DELETE_SPLITS_BATCH_SIZE: usize = 10_000;
 
+pub struct GcMetrics {
+    pub deleted_splits: IntCounter,
+    pub deleted_bytes: IntCounter,
+    pub failed_splits: IntCounter,
+}
+
+trait RecordGcMetrics {
+    fn record(&self, num_deleted_splits: usize, num_deleted_bytes: u64, num_failed_splits: usize);
+}
+
+impl RecordGcMetrics for Option<GcMetrics> {
+    fn record(&self, num_deleted_splits: usize, num_deleted_bytes: u64, num_failed_splits: usize) {
+        if let Some(metrics) = self {
+            metrics.deleted_splits.inc_by(num_deleted_splits as u64);
+            metrics.deleted_bytes.inc_by(num_deleted_bytes);
+            metrics.failed_splits.inc_by(num_failed_splits as u64);
+        }
+    }
+}
+
 /// [`DeleteSplitsError`] describes the errors that occurred during the deletion of splits from
 /// storage and metastore.
 #[derive(Error, Debug)]
@@ -94,6 +115,7 @@ pub async fn run_garbage_collect(
     deletion_grace_period: Duration,
     dry_run: bool,
     progress_opt: Option<&Progress>,
+    metrics: Option<GcMetrics>,
 ) -> anyhow::Result<SplitRemovalInfo> {
     let grace_period_timestamp =
         OffsetDateTime::now_utc().unix_timestamp() - staged_grace_period.as_secs() as i64;
@@ -170,6 +192,7 @@ pub async fn run_garbage_collect(
         metastore,
         indexes,
         progress_opt,
+        metrics,
     )
     .await)
 }
@@ -179,6 +202,7 @@ async fn delete_splits(
     storages: &HashMap<IndexUid, Arc<dyn Storage>>,
     metastore: MetastoreServiceClient,
     progress_opt: Option<&Progress>,
+    metrics: &Option<GcMetrics>,
     split_removal_info: &mut SplitRemovalInfo,
 ) -> Result<(), ()> {
     let mut delete_split_from_index_res_stream =
@@ -219,9 +243,26 @@ async fn delete_splits(
     while let Some(delete_split_result) = delete_split_from_index_res_stream.next().await {
         match delete_split_result {
             Ok(entries) => {
+                let deleted_bytes = entries
+                    .iter()
+                    .map(|entry| entry.file_size_bytes.as_u64())
+                    .sum::<u64>();
+                let deleted_splits_count = entries.len();
+
+                metrics.record(deleted_splits_count, deleted_bytes, 0);
                 split_removal_info.removed_split_entries.extend(entries);
             }
             Err(delete_split_error) => {
+                let deleted_bytes = delete_split_error
+                    .successes
+                    .iter()
+                    .map(|entry| entry.file_size_bytes.as_u64())
+                    .sum::<u64>();
+                let deleted_splits_count = delete_split_error.successes.len();
+                let failed_splits_count = delete_split_error.storage_failures.len()
+                    + delete_split_error.metastore_failures.len();
+
+                metrics.record(deleted_splits_count, deleted_bytes, failed_splits_count);
                 split_removal_info
                     .removed_split_entries
                     .extend(delete_split_error.successes);
@@ -265,13 +306,14 @@ async fn list_splits_metadata(
 ///
 /// The aim of this is to spread the load out across a longer period
 /// rather than short, heavy bursts on the metastore and storage system itself.
-#[instrument(skip(index_uids, storages, metastore, progress_opt), fields(num_indexes=%index_uids.len()))]
+#[instrument(skip(index_uids, storages, metastore, progress_opt, metrics), fields(num_indexes=%index_uids.len()))]
 async fn delete_splits_marked_for_deletion_several_indexes(
     index_uids: Vec<IndexUid>,
     updated_before_timestamp: i64,
     metastore: MetastoreServiceClient,
     storages: HashMap<IndexUid, Arc<dyn Storage>>,
     progress_opt: Option<&Progress>,
+    metrics: Option<GcMetrics>,
 ) -> SplitRemovalInfo {
     let mut split_removal_info = SplitRemovalInfo::default();
@@ -280,7 +322,7 @@ async fn delete_splits_marked_for_deletion_several_indexes(
         return split_removal_info;
     };
 
-    let list_splits_query = list_splits_query
+    let mut list_splits_query = list_splits_query
         .with_split_state(SplitState::MarkedForDeletion)
         .with_update_timestamp_lte(updated_before_timestamp)
         .with_limit(DELETE_SPLITS_BATCH_SIZE)
@@ -300,11 +342,13 @@ async fn delete_splits_marked_for_deletion_several_indexes(
             }
         };
 
-        let num_splits_to_delete = splits_metadata_to_delete.len();
-
-        if num_splits_to_delete == 0 {
+        // set the split after which to search in the next iteration of the loop
+        let Some(last_split_metadata) = splits_metadata_to_delete.last() else {
             break;
-        }
+        };
+        list_splits_query = list_splits_query.after_split(last_split_metadata);
+
+        let num_splits_to_delete = splits_metadata_to_delete.len();
 
         let splits_metadata_to_delete_per_index: HashMap<IndexUid, Vec<SplitMetadata>> =
             splits_metadata_to_delete
                 .into_iter()
                 .map(|meta| (meta.index_uid.clone(), meta))
                 .into_group_map();
 
-        let delete_split_res = delete_splits(
+        // ignore the return value; we continue either way
+        let _: Result<(), ()> = delete_splits(
             splits_metadata_to_delete_per_index,
             &storages,
             metastore.clone(),
             progress_opt,
+            &metrics,
             &mut split_removal_info,
         )
         .await;
 
-        if num_splits_to_delete < DELETE_SPLITS_BATCH_SIZE || delete_split_res.is_err() {
-            // stop the gc if this was the last batch or we encountered an error
-            // (otherwise we might try deleting the same splits in an endless loop)
+        if num_splits_to_delete < DELETE_SPLITS_BATCH_SIZE {
+            // stop the gc if this was the last batch
+            // we are guaranteed to make progress due to .after_split()
             break;
         }
     }
@@ -345,7 +391,7 @@ pub async fn delete_splits_from_storage_and_metastore(
     metastore: MetastoreServiceClient,
     splits: Vec<SplitMetadata>,
     progress_opt: Option<&Progress>,
-) -> anyhow::Result<Vec<SplitInfo>, DeleteSplitsError> {
+) -> Result<Vec<SplitInfo>, DeleteSplitsError> {
     let mut split_infos: HashMap<PathBuf, SplitInfo> = HashMap::with_capacity(splits.len());
 
     for split in splits {
@@ -511,6 +557,7 @@ mod tests {
             Duration::from_secs(30),
             false,
             None,
+            None,
         )
         .await
         .unwrap();
@@ -538,6 +585,7 @@ mod tests {
             Duration::from_secs(30),
             false,
             None,
+            None,
        )
        .await
        .unwrap();
@@ -615,6 +663,7 @@ mod tests {
             Duration::from_secs(30),
             false,
             None,
+            None,
        )
        .await
        .unwrap();
@@ -642,6 +691,7 @@ mod tests {
             Duration::from_secs(0),
             false,
             None,
+            None,
        )
        .await
        .unwrap();
@@ -680,6 +730,7 @@ mod tests {
             Duration::from_secs(30),
             false,
             None,
+            None,
        )
        .await
        .unwrap();
diff --git a/quickwit/quickwit-index-management/src/index.rs b/quickwit/quickwit-index-management/src/index.rs
index 5d4dc5ec149..0fe5c77cc2b 100644
--- a/quickwit/quickwit-index-management/src/index.rs
+++ b/quickwit/quickwit-index-management/src/index.rs
@@ -373,6 +373,7 @@ impl IndexService {
             Duration::ZERO,
             dry_run,
             None,
+            None,
         )
         .await?;
diff --git a/quickwit/quickwit-index-management/src/lib.rs b/quickwit/quickwit-index-management/src/lib.rs
index 93b6ee6d1c3..65a7ef861ce 100644
--- 
a/quickwit/quickwit-index-management/src/lib.rs +++ b/quickwit/quickwit-index-management/src/lib.rs @@ -20,5 +20,5 @@ mod garbage_collection; mod index; -pub use garbage_collection::run_garbage_collect; +pub use garbage_collection::{run_garbage_collect, GcMetrics}; pub use index::{clear_cache_directory, validate_storage_uri, IndexService, IndexServiceError}; diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs index 4087f2ed230..7ab58bb873f 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs @@ -120,6 +120,7 @@ pub struct IndexingPipeline { handles_opt: Option, // Killswitch used for the actors in the pipeline. This is not the supervisor killswitch. kill_switch: KillSwitch, + // The set of shard is something that can change dynamically without necessarily // requiring a respawn of the pipeline. // We keep the list of shards here however, to reassign them after a respawn. @@ -158,12 +159,16 @@ impl Actor for IndexingPipeline { impl IndexingPipeline { pub fn new(params: IndexingPipelineParams) -> Self { + let params_fingerprint = params.params_fingerprint; IndexingPipeline { params, previous_generations_statistics: Default::default(), handles_opt: None, kill_switch: KillSwitch::default(), - statistics: IndexingStatistics::default(), + statistics: IndexingStatistics { + params_fingerprint, + ..Default::default() + }, shard_ids: Default::default(), } } @@ -264,6 +269,7 @@ impl IndexingPipeline { .set_num_spawn_attempts(self.statistics.num_spawn_attempts); let pipeline_metrics_opt = handles.indexer.last_observation().pipeline_metrics_opt; self.statistics.pipeline_metrics_opt = pipeline_metrics_opt; + self.statistics.params_fingerprint = self.params.params_fingerprint; self.statistics.shard_ids.clone_from(&self.shard_ids); ctx.observe(self); } @@ -587,6 +593,7 @@ pub struct IndexingPipelineParams { pub source_storage_resolver: StorageResolver, pub ingester_pool: IngesterPool, pub queues_dir_path: PathBuf, + pub params_fingerprint: u64, pub event_broker: EventBroker, } @@ -716,6 +723,7 @@ mod tests { cooperative_indexing_permits: None, merge_planner_mailbox, event_broker: EventBroker::default(), + params_fingerprint: 42u64, }; let pipeline = IndexingPipeline::new(pipeline_params); let (_pipeline_mailbox, pipeline_handle) = universe.spawn_builder().spawn(pipeline); @@ -828,6 +836,7 @@ mod tests { cooperative_indexing_permits: None, merge_planner_mailbox, event_broker: Default::default(), + params_fingerprint: 42u64, }; let pipeline = IndexingPipeline::new(pipeline_params); let (_pipeline_mailbox, pipeline_handler) = universe.spawn_builder().spawn(pipeline); @@ -926,6 +935,7 @@ mod tests { cooperative_indexing_permits: None, merge_planner_mailbox: merge_planner_mailbox.clone(), event_broker: Default::default(), + params_fingerprint: 42u64, }; let indexing_pipeline = IndexingPipeline::new(indexing_pipeline_params); let (_indexing_pipeline_mailbox, indexing_pipeline_handler) = @@ -1051,6 +1061,7 @@ mod tests { max_concurrent_split_uploads_merge: 5, cooperative_indexing_permits: None, merge_planner_mailbox, + params_fingerprint: 42u64, event_broker: Default::default(), }; let pipeline = IndexingPipeline::new(pipeline_params); diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs index 757d434adca..df71cc92ea4 100644 --- 
a/quickwit/quickwit-indexing/src/actors/indexing_service.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs @@ -312,10 +312,12 @@ impl IndexingService { let max_concurrent_split_uploads_merge = (self.max_concurrent_split_uploads - max_concurrent_split_uploads_index).max(1); + let params_fingerprint = index_config.indexing_params_fingerprint(); let pipeline_params = IndexingPipelineParams { pipeline_id: indexing_pipeline_id.clone(), metastore: self.metastore.clone(), storage, + // Indexing-related parameters doc_mapper, indexing_directory, @@ -323,6 +325,7 @@ impl IndexingService { split_store, max_concurrent_split_uploads_index, cooperative_indexing_permits: self.cooperative_indexing_permits.clone(), + // Merge-related parameters merge_policy, max_concurrent_split_uploads_merge, @@ -333,6 +336,7 @@ impl IndexingService { ingester_pool: self.ingester_pool.clone(), queues_dir_path: self.queue_dir_path.clone(), source_storage_resolver: self.storage_resolver.clone(), + params_fingerprint, event_broker: self.event_broker.clone(), }; @@ -755,20 +759,14 @@ impl IndexingService { .indexing_pipelines .values() .map(|pipeline_handle| { - let shard_ids: Vec = pipeline_handle - .handle - .last_observation() - .shard_ids - .iter() - .cloned() - .collect(); - + let assignment = pipeline_handle.handle.last_observation(); + let shard_ids: Vec = assignment.shard_ids.iter().cloned().collect(); IndexingTask { index_uid: Some(pipeline_handle.indexing_pipeline_id.index_uid.clone()), source_id: pipeline_handle.indexing_pipeline_id.source_id.clone(), pipeline_uid: Some(pipeline_handle.indexing_pipeline_id.pipeline_uid), shard_ids, - params_fingerprint: 0, + params_fingerprint: assignment.params_fingerprint, } }) .collect(); @@ -1192,6 +1190,8 @@ mod tests { #[tokio::test] async fn test_indexing_service_apply_plan() { + const PARAMS_FINGERPRINT: u64 = 3865067856550546352u64; + quickwit_common::setup_logging_for_tests(); let transport = ChannelTransport::default(); let cluster = create_cluster_for_test(Vec::new(), &["indexer"], &transport, true) @@ -1251,14 +1251,14 @@ mod tests { source_id: "test-indexing-service--source-1".to_string(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(0u128)), - params_fingerprint: 0, + params_fingerprint: PARAMS_FINGERPRINT, }, IndexingTask { index_uid: Some(metadata.index_uid.clone()), source_id: "test-indexing-service--source-1".to_string(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(1u128)), - params_fingerprint: 0, + params_fingerprint: PARAMS_FINGERPRINT, }, ]; indexing_service @@ -1297,28 +1297,28 @@ mod tests { source_id: INGEST_API_SOURCE_ID.to_string(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(3u128)), - params_fingerprint: 0, + params_fingerprint: PARAMS_FINGERPRINT, }, IndexingTask { index_uid: Some(metadata.index_uid.clone()), source_id: "test-indexing-service--source-1".to_string(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(1u128)), - params_fingerprint: 0, + params_fingerprint: PARAMS_FINGERPRINT, }, IndexingTask { index_uid: Some(metadata.index_uid.clone()), source_id: "test-indexing-service--source-1".to_string(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(2u128)), - params_fingerprint: 0, + params_fingerprint: PARAMS_FINGERPRINT, }, IndexingTask { index_uid: Some(metadata.index_uid.clone()), source_id: source_config_2.source_id.clone(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(4u128)), - params_fingerprint: 0, + 
params_fingerprint: PARAMS_FINGERPRINT, }, ]; indexing_service @@ -1359,21 +1359,21 @@ source_id: INGEST_API_SOURCE_ID.to_string(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(3u128)), - params_fingerprint: 0, + params_fingerprint: PARAMS_FINGERPRINT, }, IndexingTask { index_uid: Some(metadata.index_uid.clone()), source_id: "test-indexing-service--source-1".to_string(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(1u128)), - params_fingerprint: 0, + params_fingerprint: PARAMS_FINGERPRINT, }, IndexingTask { index_uid: Some(metadata.index_uid.clone()), source_id: source_config_2.source_id.clone(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(4u128)), - params_fingerprint: 0, + params_fingerprint: PARAMS_FINGERPRINT, }, ]; indexing_service diff --git a/quickwit/quickwit-indexing/src/models/indexing_statistics.rs b/quickwit/quickwit-indexing/src/models/indexing_statistics.rs index 21f84e678da..68b44a9744b 100644 --- a/quickwit/quickwit-indexing/src/models/indexing_statistics.rs +++ b/quickwit/quickwit-indexing/src/models/indexing_statistics.rs @@ -56,6 +56,7 @@ pub struct IndexingStatistics { // List of shard ids. #[schema(value_type = Vec<String>)] pub shard_ids: BTreeSet<ShardId>, + pub params_fingerprint: u64, } impl IndexingStatistics { diff --git a/quickwit/quickwit-janitor/src/actors/garbage_collector.rs b/quickwit/quickwit-janitor/src/actors/garbage_collector.rs index fbfdeb2b1e1..bb82e55d856 100644 --- a/quickwit/quickwit-janitor/src/actors/garbage_collector.rs +++ b/quickwit/quickwit-janitor/src/actors/garbage_collector.rs @@ -20,13 +20,13 @@ use std::collections::{HashMap, HashSet}; use std::path::Path; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use async_trait::async_trait; use futures::{stream, StreamExt}; use quickwit_actors::{Actor, ActorContext, Handler}; use quickwit_common::shared_consts::split_deletion_grace_period; -use quickwit_index_management::run_garbage_collect; +use quickwit_index_management::{run_garbage_collect, GcMetrics}; use quickwit_metastore::ListIndexesMetadataResponseExt; use quickwit_proto::metastore::{ ListIndexesMetadataRequest, MetastoreService, MetastoreServiceClient, @@ -36,6 +36,8 @@ use quickwit_storage::{Storage, StorageResolver}; use serde::Serialize; use tracing::{debug, error, info}; +use crate::metrics::JANITOR_METRICS; + const RUN_INTERVAL: Duration = Duration::from_secs(10 * 60); // 10 minutes /// Staged files needs to be deleted if there was a failure. @@ -51,10 +53,10 @@ pub struct GarbageCollectorCounters { pub num_deleted_files: usize, /// The number of bytes deleted. pub num_deleted_bytes: usize, - /// The number of failed garbage collection run on an index. - pub num_failed_gc_run_on_index: usize, - /// The number of successful garbage collection run on an index. - pub num_successful_gc_run_on_index: usize, + /// The number of failed garbage collection runs. + pub num_failed_gc_run: usize, + /// The number of successful garbage collection runs. + pub num_successful_gc_run: usize, /// The number or failed storage resolution. pub num_failed_storage_resolution: usize, /// The number of splits that were unable to be removed.
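The `GcMetrics` struct newly exported above is defined in `quickwit-index-management`, and its definition is not part of this diff; judging from how the janitor wires it up below, a plausible shape is the following sketch (field names are taken from the diff, the exact types are assumed):

```rust
// Plausible shape of GcMetrics, inferred from the janitor wiring below; the
// real definition lives in quickwit-index-management and may differ.
use quickwit_common::metrics::IntCounter;

pub struct GcMetrics {
    // Counts splits successfully deleted by garbage collection.
    pub deleted_splits: IntCounter,
    // Counts bytes reclaimed by deleting split files.
    pub deleted_bytes: IntCounter,
    // Counts splits that garbage collection failed to delete.
    pub failed_splits: IntCounter,
}
```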
@@ -86,6 +88,8 @@ impl GarbageCollector { debug!("loading indexes from the metastore"); self.counters.num_passes += 1; + let start = Instant::now(); + let response = match self .metastore .list_indexes_metadata(ListIndexesMetadataRequest::all()) @@ -137,23 +141,43 @@ impl GarbageCollector { split_deletion_grace_period(), false, Some(ctx.progress()), + Some(GcMetrics { + deleted_splits: JANITOR_METRICS + .gc_deleted_splits + .with_label_values(["success"]) + .clone(), + deleted_bytes: JANITOR_METRICS.gc_deleted_bytes.clone(), + failed_splits: JANITOR_METRICS + .gc_deleted_splits + .with_label_values(["error"]) + .clone(), + }), ) .await; + let run_duration = start.elapsed().as_secs(); + JANITOR_METRICS.gc_seconds_total.inc_by(run_duration); + let deleted_file_entries = match gc_res { Ok(removal_info) => { - self.counters.num_successful_gc_run_on_index += 1; + self.counters.num_successful_gc_run += 1; + JANITOR_METRICS.gc_runs.with_label_values(["success"]).inc(); self.counters.num_failed_splits += removal_info.failed_splits.len(); removal_info.removed_split_entries } Err(error) => { - self.counters.num_failed_gc_run_on_index += 1; + self.counters.num_failed_gc_run += 1; + JANITOR_METRICS.gc_runs.with_label_values(["error"]).inc(); error!(error=?error, "failed to run garbage collection"); return; } }; if !deleted_file_entries.is_empty() { let num_deleted_splits = deleted_file_entries.len(); + let num_deleted_bytes = deleted_file_entries + .iter() + .map(|entry| entry.file_size_bytes.as_u64() as usize) + .sum::(); let deleted_files: HashSet<&Path> = deleted_file_entries .iter() .map(|deleted_entry| deleted_entry.file_name.as_path()) @@ -163,11 +187,8 @@ impl GarbageCollector { num_deleted_splits = num_deleted_splits, "Janitor deleted {:?} and {} other splits.", deleted_files, num_deleted_splits, ); - self.counters.num_deleted_files += deleted_file_entries.len(); - self.counters.num_deleted_bytes += deleted_file_entries - .iter() - .map(|entry| entry.file_size_bytes.as_u64() as usize) - .sum::(); + self.counters.num_deleted_files += num_deleted_splits; + self.counters.num_deleted_bytes += num_deleted_bytes; } } } @@ -348,6 +369,7 @@ mod tests { split_deletion_grace_period(), false, None, + None, ) .await; assert!(result.is_ok()); @@ -497,9 +519,9 @@ mod tests { assert_eq!(counters.num_passes, 1); assert_eq!(counters.num_deleted_files, 2); assert_eq!(counters.num_deleted_bytes, 40); - assert_eq!(counters.num_successful_gc_run_on_index, 1); + assert_eq!(counters.num_successful_gc_run, 1); assert_eq!(counters.num_failed_storage_resolution, 0); - assert_eq!(counters.num_failed_gc_run_on_index, 0); + assert_eq!(counters.num_failed_gc_run, 0); assert_eq!(counters.num_failed_splits, 0); // 30 secs later @@ -508,9 +530,9 @@ mod tests { assert_eq!(counters.num_passes, 1); assert_eq!(counters.num_deleted_files, 2); assert_eq!(counters.num_deleted_bytes, 40); - assert_eq!(counters.num_successful_gc_run_on_index, 1); + assert_eq!(counters.num_successful_gc_run, 1); assert_eq!(counters.num_failed_storage_resolution, 0); - assert_eq!(counters.num_failed_gc_run_on_index, 0); + assert_eq!(counters.num_failed_gc_run, 0); assert_eq!(counters.num_failed_splits, 0); // 60 secs later @@ -519,9 +541,9 @@ mod tests { assert_eq!(counters.num_passes, 2); assert_eq!(counters.num_deleted_files, 4); assert_eq!(counters.num_deleted_bytes, 80); - assert_eq!(counters.num_successful_gc_run_on_index, 2); + assert_eq!(counters.num_successful_gc_run, 2); assert_eq!(counters.num_failed_storage_resolution, 0); - 
assert_eq!(counters.num_failed_gc_run_on_index, 0); + assert_eq!(counters.num_failed_gc_run, 0); assert_eq!(counters.num_failed_splits, 0); universe.assert_quit().await; } @@ -585,9 +607,9 @@ mod tests { assert_eq!(counters.num_passes, 1); assert_eq!(counters.num_deleted_files, 0); assert_eq!(counters.num_deleted_bytes, 0); - assert_eq!(counters.num_successful_gc_run_on_index, 0); + assert_eq!(counters.num_successful_gc_run, 0); assert_eq!(counters.num_failed_storage_resolution, 1); - assert_eq!(counters.num_failed_gc_run_on_index, 0); + assert_eq!(counters.num_failed_gc_run, 0); assert_eq!(counters.num_failed_splits, 0); universe.assert_quit().await; } @@ -608,7 +630,7 @@ mod tests { }); mock_metastore .expect_list_splits() - .times(2) + .times(3) .returning(|list_splits_request| { let query = list_splits_request.deserialize_list_splits_query().unwrap(); assert_eq!(query.index_uids.len(), 2); @@ -616,24 +638,40 @@ mod tests { .contains(&query.index_uids[0].index_id.as_ref())); assert!(["test-index-1", "test-index-2"] .contains(&query.index_uids[1].index_id.as_ref())); - let splits = match query.split_states[0] { + let splits_ids_string: Vec = + (0..8000).map(|seq| format!("split-{seq:04}")).collect(); + let splits_ids: Vec<&str> = splits_ids_string + .iter() + .map(|string| string.as_str()) + .collect(); + let mut splits = match query.split_states[0] { SplitState::Staged => { let mut splits = make_splits("test-index-1", &["a"], SplitState::Staged); splits.append(&mut make_splits("test-index-2", &["a"], SplitState::Staged)); splits } SplitState::MarkedForDeletion => { + assert_eq!(query.limit, Some(10_000)); let mut splits = - make_splits("test-index-1", &["a", "b"], SplitState::MarkedForDeletion); + make_splits("test-index-1", &splits_ids, SplitState::MarkedForDeletion); splits.append(&mut make_splits( "test-index-2", - &["a", "b"], + &splits_ids, SplitState::MarkedForDeletion, )); splits } _ => panic!("only Staged and MarkedForDeletion expected."), }; + if let Some((index_uid, split_id)) = query.after_split { + splits.retain(|split| { + ( + &split.split_metadata.index_uid, + &split.split_metadata.split_id, + ) > (&index_uid, &split_id) + }); + } + splits.truncate(10_000); let splits = ListSplitsResponse::try_from_splits(splits).unwrap(); Ok(ServiceStream::from(vec![Ok(splits)])) }); @@ -648,7 +686,7 @@ mod tests { }); mock_metastore .expect_delete_splits() - .times(2) + .times(3) .returning(|delete_splits_request| { let index_uid: IndexUid = delete_splits_request.index_uid().clone(); let split_ids = HashSet::<&str>::from_iter( @@ -657,14 +695,30 @@ mod tests { .iter() .map(|split_id| split_id.as_str()), ); - let expected_split_ids = HashSet::<&str>::from_iter(["a", "b"]); - - assert_eq!(split_ids, expected_split_ids); + if index_uid.index_id == "test-index-1" { + assert_eq!(split_ids.len(), 8000); + for seq in 0..8000 { + let split_id = format!("split-{seq:04}"); + assert!(split_ids.contains(&*split_id)); + } + } else if split_ids.len() == 2000 { + for seq in 0..2000 { + let split_id = format!("split-{seq:04}"); + assert!(split_ids.contains(&*split_id)); + } + } else if split_ids.len() == 6000 { + for seq in 2000..8000 { + let split_id = format!("split-{seq:04}"); + assert!(split_ids.contains(&*split_id)); + } + } else { + panic!(); + } // This should not cause the whole run to fail and return an error, // instead this should simply get logged and return the list of splits // which have successfully been deleted. 
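The reworked mock below expects the garbage collector to page through `MarkedForDeletion` splits in batches of 10,000, ordered by `(index_uid, split_id)` and resumed with `after_split`. A minimal sketch of such a keyset-pagination loop, assuming `with_split_state` and `with_limit` exist on `ListSplitsQuery` as in the current metastore API (illustrative, not the actual `run_garbage_collect` code):

```rust
// Keyset pagination over splits marked for deletion, in pages of 10_000.
async fn list_splits_to_delete_in_pages(
    metastore: &mut MetastoreServiceClient,
    index_uids: Vec<IndexUid>,
) -> anyhow::Result<Vec<Split>> {
    let mut all_splits = Vec::new();
    let mut after_split_opt: Option<SplitMetadata> = None;
    loop {
        let mut query = ListSplitsQuery::try_from_index_uids(index_uids.clone())
            .expect("at least one index uid")
            .with_split_state(SplitState::MarkedForDeletion)
            .sort_by_index_uid()
            .with_limit(10_000);
        if let Some(after_split) = &after_split_opt {
            query = query.after_split(after_split);
        }
        let request = ListSplitsRequest::try_from_list_splits_query(&query)?;
        let page = metastore.list_splits(request).await?.collect_splits().await?;
        // An empty page means the previous page was the last one.
        let Some(last_split) = page.last() else {
            break;
        };
        // Resume the next page strictly after the last (index_uid, split_id) seen.
        after_split_opt = Some(last_split.split_metadata.clone());
        all_splits.extend(page);
    }
    Ok(all_splits)
}
```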
- if index_uid.index_id == "test-index-2" { + if index_uid.index_id == "test-index-2" && split_ids.len() == 2000 { Err(MetastoreError::Db { message: "fail to delete".to_string(), }) @@ -682,12 +736,12 @@ let counters = handle.process_pending_and_observe().await.state; assert_eq!(counters.num_passes, 1); - assert_eq!(counters.num_deleted_files, 2); - assert_eq!(counters.num_deleted_bytes, 40); - assert_eq!(counters.num_successful_gc_run_on_index, 1); + assert_eq!(counters.num_deleted_files, 14000); + assert_eq!(counters.num_deleted_bytes, 20 * 14000); + assert_eq!(counters.num_successful_gc_run, 1); assert_eq!(counters.num_failed_storage_resolution, 0); - assert_eq!(counters.num_failed_gc_run_on_index, 0); - assert_eq!(counters.num_failed_splits, 2); + assert_eq!(counters.num_failed_gc_run, 0); + assert_eq!(counters.num_failed_splits, 2000); universe.assert_quit().await; } } diff --git a/quickwit/quickwit-janitor/src/metrics.rs b/quickwit/quickwit-janitor/src/metrics.rs index d3392af7b3f..0f3760e6d87 100644 --- a/quickwit/quickwit-janitor/src/metrics.rs +++ b/quickwit/quickwit-janitor/src/metrics.rs @@ -18,10 +18,18 @@ // along with this program. If not, see <http://www.gnu.org/licenses/>. use once_cell::sync::Lazy; -use quickwit_common::metrics::{new_gauge_vec, IntGaugeVec}; +use quickwit_common::metrics::{ + new_counter, new_counter_vec, new_gauge_vec, IntCounter, IntCounterVec, IntGaugeVec, +}; pub struct JanitorMetrics { pub ongoing_num_delete_operations_total: IntGaugeVec<1>, + pub gc_deleted_splits: IntCounterVec<1>, + pub gc_deleted_bytes: IntCounter, + pub gc_runs: IntCounterVec<1>, + pub gc_seconds_total: IntCounter, + // TODO having a current run duration which is 0|undefined out of run, and returns `now - + // start_time` during a run would be nice } impl Default for JanitorMetrics { @@ -34,6 +42,32 @@ &[], ["index"], ), + gc_deleted_splits: new_counter_vec( + "gc_deleted_splits_total", + "Total number of splits deleted by the garbage collector.", + "quickwit_janitor", + &[], + ["result"], + ), + gc_deleted_bytes: new_counter( + "gc_deleted_bytes_total", + "Total number of bytes deleted by the garbage collector.", + "quickwit_janitor", + &[], + ), + gc_runs: new_counter_vec( + "gc_runs_total", + "Total number of garbage collector executions.", + "quickwit_janitor", + &[], + ["result"], + ), + gc_seconds_total: new_counter( + "gc_seconds_total", + "Total time spent running the garbage collector.", + "quickwit_janitor", + &[], + ), } } } diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs index 265697b0e81..e4d6799cefa 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs @@ -424,48 +424,32 @@ impl FileBackedIndex { /// Lists splits. pub(crate) fn list_splits(&self, query: &ListSplitsQuery) -> MetastoreResult<Vec<Split>> { - let limit = query.limit.unwrap_or(usize::MAX); - let offset = query.offset.unwrap_or_default(); - - let splits: Vec<Split> = match query.sort_by { - SortBy::Staleness => self - .splits + let limit = query + .limit + .map(|limit| limit + query.offset.unwrap_or_default()) + .unwrap_or(usize::MAX); + // skip is done at a higher layer in case other indexes give splits that would go before + // ours + + let results = if query.sort_by == SortBy::None { + // internally, sorted_unstable_by collects everything into an intermediary vec.
When not + // sorting at all, skip that. + self.splits .values() .filter(|split| split_query_predicate(split, query)) - .sorted_unstable_by(|left_split, right_split| { - left_split - .split_metadata - .delete_opstamp - .cmp(&right_split.split_metadata.delete_opstamp) - .then_with(|| { - left_split - .publish_timestamp - .cmp(&right_split.publish_timestamp) - }) - }) - .skip(offset) .take(limit) .cloned() - .collect(), - SortBy::IndexUid => self - .splits - .values() - .filter(|split| split_query_predicate(split, query)) - .sorted_unstable_by_key(|split| &split.split_metadata.index_uid) - .skip(offset) - .take(limit) - .cloned() - .collect(), - SortBy::None => self - .splits + .collect() + } else { + self.splits .values() .filter(|split| split_query_predicate(split, query)) - .skip(offset) + .sorted_unstable_by(|lhs, rhs| query.sort_by.compare(lhs, rhs)) .take(limit) .cloned() - .collect(), + .collect() }; - Ok(splits) + Ok(results) } /// Deletes a split. @@ -762,6 +746,17 @@ fn split_query_predicate(split: &&Split, query: &ListSplitsQuery) -> bool { } } + if let Some((index_uid, split_id)) = &query.after_split { + if *index_uid > split.split_metadata.index_uid { + return false; + } + if *index_uid == split.split_metadata.index_uid + && *split_id >= split.split_metadata.split_id + { + return false; + } + } + true } diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs index 729e10b6fe0..10bbd814949 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs @@ -380,7 +380,7 @@ impl FileBackedMetastore { /// No error is returned if any of the requested `index_uid` does not exist. async fn list_splits_inner(&self, request: ListSplitsRequest) -> MetastoreResult> { let list_splits_query = request.deserialize_list_splits_query()?; - let mut all_splits = Vec::new(); + let mut splits_per_index = Vec::with_capacity(list_splits_query.index_uids.len()); for index_uid in &list_splits_query.index_uids { let splits = match self .read(index_uid, |index| index.list_splits(&list_splits_query)) @@ -393,9 +393,19 @@ impl FileBackedMetastore { } Err(error) => return Err(error), }; - all_splits.extend(splits); + splits_per_index.push(splits); } - Ok(all_splits) + + let limit = list_splits_query.limit.unwrap_or(usize::MAX); + let offset = list_splits_query.offset.unwrap_or_default(); + + let merged_results = splits_per_index + .into_iter() + .kmerge_by(|lhs, rhs| list_splits_query.sort_by.compare(lhs, rhs).is_lt()) + .skip(offset) + .take(limit) + .collect(); + Ok(merged_results) } /// Helper used for testing to obtain the data associated with the given index. diff --git a/quickwit/quickwit-metastore/src/metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/mod.rs index 06211e1f63a..56dc8b84abf 100644 --- a/quickwit/quickwit-metastore/src/metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/mod.rs @@ -24,6 +24,7 @@ pub mod postgres; pub mod control_plane_metastore; +use std::cmp::Ordering; use std::ops::{Bound, RangeInclusive}; use async_trait::async_trait; @@ -632,6 +633,9 @@ pub struct ListSplitsQuery { /// Sorts the splits by staleness, i.e. by delete opstamp and publish timestamp in ascending /// order. 
pub sort_by: SortBy, + + /// Only return splits whose (index_uid, split_id) are lexicographically after this split + pub after_split: Option<(IndexUid, SplitId)>, } #[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)] @@ -641,6 +645,33 @@ pub enum SortBy { IndexUid, } +impl SortBy { + fn compare(&self, left_split: &Split, right_split: &Split) -> Ordering { + match self { + SortBy::None => Ordering::Equal, + SortBy::Staleness => left_split + .split_metadata + .delete_opstamp + .cmp(&right_split.split_metadata.delete_opstamp) + .then_with(|| { + left_split + .publish_timestamp + .cmp(&right_split.publish_timestamp) + }), + SortBy::IndexUid => left_split + .split_metadata + .index_uid + .cmp(&right_split.split_metadata.index_uid) + .then_with(|| { + left_split + .split_metadata + .split_id + .cmp(&right_split.split_metadata.split_id) + }), + } + } +} + #[allow(unused_attributes)] impl ListSplitsQuery { /// Creates a new [`ListSplitsQuery`] for the designated index. @@ -658,6 +689,7 @@ impl ListSplitsQuery { create_timestamp: Default::default(), mature: Bound::Unbounded, sort_by: SortBy::None, + after_split: None, } } @@ -680,6 +712,7 @@ impl ListSplitsQuery { create_timestamp: Default::default(), mature: Bound::Unbounded, sort_by: SortBy::None, + after_split: None, }) } @@ -850,11 +883,18 @@ impl ListSplitsQuery { self } - /// Sorts the splits by index_uid. + /// Sorts the splits by index_uid and split_id. pub fn sort_by_index_uid(mut self) -> Self { self.sort_by = SortBy::IndexUid; self } + + /// Only return splits whose (index_uid, split_id) are lexicographically after this split. + /// This is only useful if results are sorted by index_uid and split_id. + pub fn after_split(mut self, split_meta: &SplitMetadata) -> Self { + self.after_split = Some((split_meta.index_uid.clone(), split_meta.split_id.clone())); + self + } } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index 008b611a36b..ce0d84468e5 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -2076,7 +2076,25 @@ mod tests { assert_eq!( sql.to_string(PostgresQueryBuilder), format!( - r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') ORDER BY "index_uid" ASC"# + r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') ORDER BY "index_uid" ASC, "split_id" ASC"# + ) + ); + + let mut select_statement = Query::select(); + let sql = select_statement.column(Asterisk).from(Splits::Table); + + let query = + ListSplitsQuery::for_index(index_uid.clone()).after_split(&crate::SplitMetadata { + index_uid: index_uid.clone(), + split_id: "my_split".to_string(), + ..Default::default() + }); + append_query_filters(sql, &query); + + assert_eq!( + sql.to_string(PostgresQueryBuilder), + format!( + r#"SELECT * FROM "splits" WHERE "index_uid" IN ('{index_uid}') AND ("index_uid", "split_id") > ('{index_uid}', 'my_split')"# ) ); } diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs b/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs index 6e850ae0fed..65ef9ce1df6 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs @@ -187,15 +187,24 @@ pub(super) fn append_query_filters(sql: &mut SelectStatement, query: &ListSplits Expr::expr(val) }); + if let 
Some((index_uid, split_id)) = &query.after_split { + sql.cond_where( + Expr::tuple([ + Expr::col(Splits::IndexUid).into(), + Expr::col(Splits::SplitId).into(), + ]) + .gt(Expr::tuple([Expr::value(index_uid), Expr::value(split_id)])), + ); + } + match query.sort_by { SortBy::Staleness => { - sql.order_by( - (Splits::DeleteOpstamp, Splits::PublishTimestamp), - Order::Asc, - ); + sql.order_by(Splits::DeleteOpstamp, Order::Asc) + .order_by(Splits::PublishTimestamp, Order::Asc); } SortBy::IndexUid => { - sql.order_by(Splits::IndexUid, Order::Asc); + sql.order_by(Splits::IndexUid, Order::Asc) + .order_by(Splits::SplitId, Order::Asc); } SortBy::None => (), } diff --git a/quickwit/quickwit-metastore/src/tests/list_splits.rs b/quickwit/quickwit-metastore/src/tests/list_splits.rs index cd1cc1712f3..de9c43b7e01 100644 --- a/quickwit/quickwit-metastore/src/tests/list_splits.rs +++ b/quickwit/quickwit-metastore/src/tests/list_splits.rs @@ -1155,3 +1155,356 @@ pub async fn test_metastore_list_stale_splits< cleanup_index(&mut metastore, index_uid).await; } } + +pub async fn test_metastore_list_sorted_splits< + MetastoreToTest: MetastoreServiceExt + DefaultForTest, +>() { + let mut metastore = MetastoreToTest::default_for_test().await; + + let split_id = append_random_suffix("test-list-sorted-splits-"); + let index_id_1 = append_random_suffix("test-list-sorted-splits-1"); + let index_uid_1 = IndexUid::new_with_random_ulid(&index_id_1); + let index_uri_1 = format!("ram:///indexes/{index_id_1}"); + let index_config_1 = IndexConfig::for_test(&index_id_1, &index_uri_1); + + let index_id_2 = append_random_suffix("test-list-sorted-splits-2"); + let index_uid_2 = IndexUid::new_with_random_ulid(&index_id_2); + let index_uri_2 = format!("ram:///indexes/{index_id_2}"); + let index_config_2 = IndexConfig::for_test(&index_id_2, &index_uri_2); + + let split_id_1 = format!("{split_id}--split-1"); + let split_metadata_1 = SplitMetadata { + split_id: split_id_1.clone(), + index_uid: index_uid_1.clone(), + delete_opstamp: 5, + ..Default::default() + }; + let split_id_2 = format!("{split_id}--split-2"); + let split_metadata_2 = SplitMetadata { + split_id: split_id_2.clone(), + index_uid: index_uid_2.clone(), + delete_opstamp: 3, + ..Default::default() + }; + let split_id_3 = format!("{split_id}--split-3"); + let split_metadata_3 = SplitMetadata { + split_id: split_id_3.clone(), + index_uid: index_uid_1.clone(), + delete_opstamp: 1, + ..Default::default() + }; + let split_id_4 = format!("{split_id}--split-4"); + let split_metadata_4 = SplitMetadata { + split_id: split_id_4.clone(), + index_uid: index_uid_2.clone(), + delete_opstamp: 0, + ..Default::default() + }; + let split_id_5 = format!("{split_id}--split-5"); + let split_metadata_5 = SplitMetadata { + split_id: split_id_5.clone(), + index_uid: index_uid_1.clone(), + delete_opstamp: 2, + ..Default::default() + }; + let split_id_6 = format!("{split_id}--split-6"); + let split_metadata_6 = SplitMetadata { + split_id: split_id_6.clone(), + index_uid: index_uid_2.clone(), + delete_opstamp: 4, + ..Default::default() + }; + + let create_index_request = CreateIndexRequest::try_from_index_config(&index_config_1).unwrap(); + let index_uid_1: IndexUid = metastore + .create_index(create_index_request) + .await + .unwrap() + .index_uid() + .clone(); + let create_index_request = CreateIndexRequest::try_from_index_config(&index_config_2).unwrap(); + let index_uid_2: IndexUid = metastore + .create_index(create_index_request) + .await + .unwrap() + .index_uid() + .clone(); + + { + let 
stage_splits_request = StageSplitsRequest::try_from_splits_metadata( + index_uid_1.clone(), + vec![split_metadata_1, split_metadata_3, split_metadata_5], + ) + .unwrap(); + metastore.stage_splits(stage_splits_request).await.unwrap(); + + let publish_splits_request = PublishSplitsRequest { + index_uid: Some(index_uid_1.clone()), + staged_split_ids: vec![split_id_1.clone()], + ..Default::default() + }; + metastore + .publish_splits(publish_splits_request) + .await + .unwrap(); + + let mark_splits_for_deletion = + MarkSplitsForDeletionRequest::new(index_uid_1.clone(), vec![split_id_3.clone()]); + metastore + .mark_splits_for_deletion(mark_splits_for_deletion) + .await + .unwrap(); + + let stage_splits_request = StageSplitsRequest::try_from_splits_metadata( + index_uid_2.clone(), + vec![split_metadata_2, split_metadata_4, split_metadata_6], + ) + .unwrap(); + metastore.stage_splits(stage_splits_request).await.unwrap(); + + let publish_splits_request = PublishSplitsRequest { + index_uid: Some(index_uid_2.clone()), + staged_split_ids: vec![split_id_2.clone()], + ..Default::default() + }; + metastore + .publish_splits(publish_splits_request) + .await + .unwrap(); + + let mark_splits_for_deletion = + MarkSplitsForDeletionRequest::new(index_uid_2.clone(), vec![split_id_4.clone()]); + metastore + .mark_splits_for_deletion(mark_splits_for_deletion) + .await + .unwrap(); + } + + let query = + ListSplitsQuery::try_from_index_uids(vec![index_uid_1.clone(), index_uid_2.clone()]) + .unwrap() + .sort_by_staleness(); + let splits = metastore + .list_splits(ListSplitsRequest::try_from_list_splits_query(&query).unwrap()) + .await + .unwrap() + .collect_splits() + .await + .unwrap(); + // we don't use collect_split_ids because it sorts splits internally + let split_ids = splits + .iter() + .map(|split| split.split_id()) + .collect::>(); + assert_eq!( + split_ids, + &[ + &split_id_4, + &split_id_3, + &split_id_5, + &split_id_2, + &split_id_6, + &split_id_1, + ] + ); + + let query = + ListSplitsQuery::try_from_index_uids(vec![index_uid_1.clone(), index_uid_2.clone()]) + .unwrap() + .sort_by_index_uid(); + let splits = metastore + .list_splits(ListSplitsRequest::try_from_list_splits_query(&query).unwrap()) + .await + .unwrap() + .collect_splits() + .await + .unwrap(); + // we don't use collect_split_ids because it sorts splits internally + let split_ids = splits + .iter() + .map(|split| split.split_id()) + .collect::>(); + assert_eq!( + split_ids, + &[ + &split_id_1, + &split_id_3, + &split_id_5, + &split_id_2, + &split_id_4, + &split_id_6, + ] + ); + + cleanup_index(&mut metastore, index_uid_1.clone()).await; + cleanup_index(&mut metastore, index_uid_2.clone()).await; +} + +pub async fn test_metastore_list_after_split< + MetastoreToTest: MetastoreServiceExt + DefaultForTest, +>() { + let mut metastore = MetastoreToTest::default_for_test().await; + + let split_id = append_random_suffix("test-list-sorted-splits-"); + let index_id_1 = append_random_suffix("test-list-sorted-splits-1"); + let index_uri_1 = format!("ram:///indexes/{index_id_1}"); + let index_config_1 = IndexConfig::for_test(&index_id_1, &index_uri_1); + + let index_id_2 = append_random_suffix("test-list-sorted-splits-2"); + let index_uri_2 = format!("ram:///indexes/{index_id_2}"); + let index_config_2 = IndexConfig::for_test(&index_id_2, &index_uri_2); + + let create_index_request = CreateIndexRequest::try_from_index_config(&index_config_1).unwrap(); + let index_uid_1: IndexUid = metastore + .create_index(create_index_request) + .await + .unwrap() + 
.index_uid() + .clone(); + let create_index_request = CreateIndexRequest::try_from_index_config(&index_config_2).unwrap(); + let index_uid_2: IndexUid = metastore + .create_index(create_index_request) + .await + .unwrap() + .index_uid() + .clone(); + + let split_id_1 = format!("{split_id}--split-1"); + let split_metadata_1 = SplitMetadata { + split_id: split_id_1.clone(), + index_uid: index_uid_1.clone(), + ..Default::default() + }; + let split_id_2 = format!("{split_id}--split-2"); + let split_metadata_2 = SplitMetadata { + split_id: split_id_2.clone(), + index_uid: index_uid_2.clone(), + ..Default::default() + }; + let split_id_3 = format!("{split_id}--split-3"); + let split_metadata_3 = SplitMetadata { + split_id: split_id_3.clone(), + index_uid: index_uid_1.clone(), + ..Default::default() + }; + let split_id_4 = format!("{split_id}--split-4"); + let split_metadata_4 = SplitMetadata { + split_id: split_id_4.clone(), + index_uid: index_uid_2.clone(), + ..Default::default() + }; + let split_id_5 = format!("{split_id}--split-5"); + let split_metadata_5 = SplitMetadata { + split_id: split_id_5.clone(), + index_uid: index_uid_1.clone(), + ..Default::default() + }; + let split_id_6 = format!("{split_id}--split-6"); + let split_metadata_6 = SplitMetadata { + split_id: split_id_6.clone(), + index_uid: index_uid_2.clone(), + ..Default::default() + }; + + { + let stage_splits_request = StageSplitsRequest::try_from_splits_metadata( + index_uid_1.clone(), + vec![ + split_metadata_1.clone(), + split_metadata_3.clone(), + split_metadata_5.clone(), + ], + ) + .unwrap(); + metastore.stage_splits(stage_splits_request).await.unwrap(); + + let publish_splits_request = PublishSplitsRequest { + index_uid: Some(index_uid_1.clone()), + staged_split_ids: vec![split_id_1.clone()], + ..Default::default() + }; + metastore + .publish_splits(publish_splits_request) + .await + .unwrap(); + + let mark_splits_for_deletion = + MarkSplitsForDeletionRequest::new(index_uid_1.clone(), vec![split_id_3.clone()]); + metastore + .mark_splits_for_deletion(mark_splits_for_deletion) + .await + .unwrap(); + + let stage_splits_request = StageSplitsRequest::try_from_splits_metadata( + index_uid_2.clone(), + vec![ + split_metadata_2.clone(), + split_metadata_4.clone(), + split_metadata_6.clone(), + ], + ) + .unwrap(); + metastore.stage_splits(stage_splits_request).await.unwrap(); + + let publish_splits_request = PublishSplitsRequest { + index_uid: Some(index_uid_2.clone()), + staged_split_ids: vec![split_id_2.clone()], + ..Default::default() + }; + metastore + .publish_splits(publish_splits_request) + .await + .unwrap(); + + let mark_splits_for_deletion = + MarkSplitsForDeletionRequest::new(index_uid_2.clone(), vec![split_id_4.clone()]); + metastore + .mark_splits_for_deletion(mark_splits_for_deletion) + .await + .unwrap(); + } + + let expected_all = [ + &split_metadata_1, + &split_metadata_3, + &split_metadata_5, + &split_metadata_2, + &split_metadata_4, + &split_metadata_6, + ]; + + for i in 0..expected_all.len() { + let after = expected_all[i]; + let expected_res = expected_all[(i + 1)..] 
+ .iter() + .map(|split| (&split.index_uid, &split.split_id)) + .collect::>(); + + let query = + ListSplitsQuery::try_from_index_uids(vec![index_uid_1.clone(), index_uid_2.clone()]) + .unwrap() + .sort_by_index_uid() + .after_split(after); + let splits = metastore + .list_splits(ListSplitsRequest::try_from_list_splits_query(&query).unwrap()) + .await + .unwrap() + .collect_splits() + .await + .unwrap(); + // we don't use collect_split_ids because it sorts splits internally + let split_ids = splits + .iter() + .map(|split| { + ( + &split.split_metadata.index_uid, + &split.split_metadata.split_id, + ) + }) + .collect::>(); + assert_eq!(split_ids, expected_res,); + } + + cleanup_index(&mut metastore, index_uid_1.clone()).await; + cleanup_index(&mut metastore, index_uid_2.clone()).await; +} diff --git a/quickwit/quickwit-metastore/src/tests/mod.rs b/quickwit/quickwit-metastore/src/tests/mod.rs index 3e0add028df..7699c3eb11f 100644 --- a/quickwit/quickwit-metastore/src/tests/mod.rs +++ b/quickwit/quickwit-metastore/src/tests/mod.rs @@ -431,6 +431,20 @@ macro_rules! metastore_test_suite { $crate::tests::list_splits::test_metastore_list_stale_splits::<$metastore_type>().await; } + #[tokio::test] + #[serial_test::file_serial] + async fn test_metastore_list_sorted_splits() { + let _ = tracing_subscriber::fmt::try_init(); + $crate::tests::list_splits::test_metastore_list_sorted_splits::<$metastore_type>().await; + } + + #[tokio::test] + #[serial_test::file_serial] + async fn test_metastore_list_after_split() { + let _ = tracing_subscriber::fmt::try_init(); + $crate::tests::list_splits::test_metastore_list_after_split::<$metastore_type>().await; + } + #[tokio::test] #[serial_test::file_serial] async fn test_metastore_update_splits_delete_opstamp() { diff --git a/quickwit/quickwit-proto/protos/quickwit/search.proto b/quickwit/quickwit-proto/protos/quickwit/search.proto index 50ad8aec3c1..60671239ecc 100644 --- a/quickwit/quickwit-proto/protos/quickwit/search.proto +++ b/quickwit/quickwit-proto/protos/quickwit/search.proto @@ -126,14 +126,14 @@ message ListFieldsRequest { // Optional limit query to a list of fields // Wildcard expressions are supported. repeated string fields = 2; - + // Time filter, expressed in seconds since epoch. // That filter is to be interpreted as the semi-open interval: // [start_timestamp, end_timestamp). optional int64 start_timestamp = 3; optional int64 end_timestamp = 4; - // Control if the the request will fail if split_ids contains a split that does not exist. + // Control if the the request will fail if split_ids contains a split that does not exist. // optional bool fail_on_missing_index = 6; } @@ -149,7 +149,7 @@ message LeafListFieldsRequest { // Optional limit query to a list of fields // Wildcard expressions are supported. repeated string fields = 4; - + } message ListFieldsResponse { @@ -299,6 +299,17 @@ message SearchResponse { // Scroll Id (only set if scroll_secs was set in the request) optional string scroll_id = 6; + + // Returns the list of splits for which search failed. + // For the moment, the cause is unknown. + // + // It is up to the caller to decide whether to interpret + // this as an overall failure or to present the partial results + // to the end user. + repeated SplitSearchError failed_splits = 7; + + // Total number of successful splits searched. 
+ uint64 num_successful_splits = 8; } message SearchPlanResponse { @@ -340,7 +351,7 @@ message LeafSearchRequest { message LeafRequestRef { // The ordinal of the doc_mapper in `LeafSearchRequest.doc_mappers` uint32 doc_mapper_ord = 1; - + // The ordinal of the index uri in LeafSearchRequest.index_uris uint32 index_uri_ord = 2; @@ -453,10 +464,16 @@ message LeafSearchResponse { // The list of splits that failed. LeafSearchResponse can be an aggregation of results, so there may be multiple. repeated SplitSearchError failed_splits = 3; - // Total number of splits the leaf(s) were in charge of. - // num_attempted_splits = num_successful_splits + num_failed_splits. + // Total number of attempts to search into splits. + // We do have: + // `num_splits_requested == num_successful_splits + num_failed_splits.len()` + // But we do not necessarily have: + // `num_splits_requested == num_attempted_splits` because of retries. uint64 num_attempted_splits = 4; + // Total number of successful splits searched. + uint64 num_successful_splits = 7; + // Deprecated json serialized intermediate aggregation_result. reserved 5; @@ -550,8 +567,7 @@ message LeafListTermsResponse { // The list of splits that failed. LeafSearchResponse can be an aggregation of results, so there may be multiple. repeated SplitSearchError failed_splits = 3; - // Total number of splits the leaf(s) were in charge of. - // num_attempted_splits = num_successful_splits + num_failed_splits. + // Total number of single split searches attempted. uint64 num_attempted_splits = 4; } diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs index ed0219d0a7f..189019162f8 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs @@ -230,6 +230,17 @@ pub struct SearchResponse { /// Scroll Id (only set if scroll_secs was set in the request) #[prost(string, optional, tag = "6")] pub scroll_id: ::core::option::Option<::prost::alloc::string::String>, + /// Returns the list of splits for which search failed. + /// For the moment, the cause is unknown. + /// + /// It is up to the caller to decide whether to interpret + /// this as an overall failure or to present the partial results + /// to the end user. + #[prost(message, repeated, tag = "7")] + pub failed_splits: ::prost::alloc::vec::Vec<SplitSearchError>, + /// Total number of successful splits searched. + #[prost(uint64, tag = "8")] + pub num_successful_splits: u64, } #[derive(Serialize, Deserialize, utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)] @@ -431,10 +442,16 @@ pub struct LeafSearchResponse { /// The list of splits that failed. LeafSearchResponse can be an aggregation of results, so there may be multiple. #[prost(message, repeated, tag = "3")] pub failed_splits: ::prost::alloc::vec::Vec<SplitSearchError>, - /// Total number of splits the leaf(s) were in charge of. - /// num_attempted_splits = num_successful_splits + num_failed_splits. + /// Total number of attempts to search into splits. + /// We do have: + /// `num_splits_requested == num_successful_splits + num_failed_splits.len()` + /// But we do not necessarily have: + /// `num_splits_requested == num_attempted_splits` because of retries. #[prost(uint64, tag = "4")] pub num_attempted_splits: u64, + /// Total number of successful splits searched. + #[prost(uint64, tag = "7")] + pub num_successful_splits: u64,
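The accounting described by the comments above can be restated as a small invariant check; `num_splits_requested` is a hypothetical local name for the number of splits the leaf was asked to search (illustrative, not part of the codebase):

```rust
// Illustrative invariant check, not actual code: every requested split ends
// up either successful or failed exactly once, while retries can push the
// number of attempts above the number of requested splits.
fn check_leaf_accounting(resp: &LeafSearchResponse, num_splits_requested: u64) {
    assert_eq!(
        num_splits_requested,
        resp.num_successful_splits + resp.failed_splits.len() as u64
    );
    assert!(resp.num_attempted_splits >= num_splits_requested);
}
```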
/// postcard serialized intermediate aggregation_result. #[prost(bytes = "vec", optional, tag = "6")] pub intermediate_aggregation_result: ::core::option::Option< ::prost::alloc::vec::Vec<u8>, >, @@ -551,8 +568,7 @@ pub struct LeafListTermsResponse { /// The list of splits that failed. LeafSearchResponse can be an aggregation of results, so there may be multiple. #[prost(message, repeated, tag = "3")] pub failed_splits: ::prost::alloc::vec::Vec<SplitSearchError>, - /// Total number of splits the leaf(s) were in charge of. - /// num_attempted_splits = num_successful_splits + num_failed_splits. + /// Total number of single split searches attempted. #[prost(uint64, tag = "4")] pub num_attempted_splits: u64, } diff --git a/quickwit/quickwit-query/Cargo.toml b/quickwit/quickwit-query/Cargo.toml index e1229da8e8b..bee650198c8 100644 --- a/quickwit/quickwit-query/Cargo.toml +++ b/quickwit/quickwit-query/Cargo.toml @@ -22,6 +22,7 @@ serde = { workspace = true } serde_json = { workspace = true } serde_with = { workspace = true } tantivy = { workspace = true } +time = { workspace = true } thiserror = { workspace = true } whichlang = { workspace = true, optional = true } diff --git a/quickwit/quickwit-query/src/elastic_query_dsl/range_query.rs b/quickwit/quickwit-query/src/elastic_query_dsl/range_query.rs index 9e7d07e23da..337ec019e9d 100644 --- a/quickwit/quickwit-query/src/elastic_query_dsl/range_query.rs +++ b/quickwit/quickwit-query/src/elastic_query_dsl/range_query.rs @@ -18,10 +18,10 @@ // along with this program. If not, see <http://www.gnu.org/licenses/>. use std::ops::Bound; -use std::str::FromStr; use quickwit_datetime::StrptimeParser; use serde::Deserialize; +use time::format_description::well_known::Rfc3339; use crate::elastic_query_dsl::one_field_map::OneFieldMap; use crate::elastic_query_dsl::ConvertibleToQueryAst; @@ -59,10 +59,9 @@ impl ConvertibleToQueryAst for RangeQuery { boost, format, } = self.value; - let (gt, gte, lt, lte) = if let Some(JsonLiteral::String(fmt)) = format { - let parser = StrptimeParser::from_str(&fmt).map_err(|reason| { - anyhow::anyhow!("failed to create parser from : {}; reason: {}", fmt, reason) - })?; + let (gt, gte, lt, lte) = if let Some(JsonLiteral::String(java_date_format)) = format { + let parser = StrptimeParser::from_java_datetime_format(&java_date_format) + .map_err(|err| anyhow::anyhow!("failed to parse range query date format.
{err}"))?; ( gt.map(|v| parse_and_convert(v, &parser)).transpose()?, gte.map(|v| parse_and_convert(v, &parser)).transpose()?, @@ -102,7 +101,8 @@ fn parse_and_convert(literal: JsonLiteral, parser: &StrptimeParser) -> anyhow::R let parsed_date_time = parser .parse_date_time(&date_time_str) .map_err(|reason| anyhow::anyhow!("Failed to parse date time: {}", reason))?; - Ok(JsonLiteral::String(parsed_date_time.to_string())) + let parsed_date_time_rfc3339 = parsed_date_time.format(&Rfc3339)?; + Ok(JsonLiteral::String(parsed_date_time_rfc3339)) } else { Ok(literal) } @@ -110,39 +110,62 @@ fn parse_and_convert(literal: JsonLiteral, parser: &StrptimeParser) -> anyhow::R #[cfg(test)] mod tests { - use std::str::FromStr; + use std::ops::Bound; - use quickwit_datetime::StrptimeParser; - - use crate::elastic_query_dsl::range_query::parse_and_convert; + use super::{RangeQuery as ElasticRangeQuery, RangeQueryParams as ElasticRangeQueryParams}; + use crate::elastic_query_dsl::ConvertibleToQueryAst; + use crate::query_ast::{QueryAst, RangeQuery}; use crate::JsonLiteral; #[test] - fn test_parse_and_convert() -> anyhow::Result<()> { - let parser = StrptimeParser::from_str("%Y-%m-%d %H:%M:%S").unwrap(); - - // valid datetime - let input = JsonLiteral::String("2022-12-30 05:45:00".to_string()); - let result = parse_and_convert(input, &parser)?; - assert_eq!( - result, - JsonLiteral::String("2022-12-30 5:45:00.0 +00:00:00".to_string()) - ); - - // invalid datetime - let input = JsonLiteral::String("invalid datetime".to_string()); - let result = parse_and_convert(input, &parser); - assert!(result.is_err()); - assert!(result - .unwrap_err() - .to_string() - .contains("Failed to parse date time")); - - // non_string(number) input - let input = JsonLiteral::Number(27.into()); - let result = parse_and_convert(input.clone(), &parser)?; - assert_eq!(result, input); + fn test_date_range_query_with_format() { + let range_query_params = ElasticRangeQueryParams { + gt: Some(JsonLiteral::String("2021-01-03T13:32:43".to_string())), + gte: None, + lt: None, + lte: None, + boost: None, + format: JsonLiteral::String("yyyy-MM-dd['T'HH:mm:ss]".to_string()).into(), + }; + let range_query: ElasticRangeQuery = ElasticRangeQuery { + field: "date".to_string(), + value: range_query_params, + }; + let range_query_ast = range_query.convert_to_query_ast().unwrap(); + assert!(matches!( + range_query_ast, + QueryAst::Range(RangeQuery { + field, + lower_bound: Bound::Excluded(lower_bound), + upper_bound: Bound::Unbounded, + }) + if field == "date" && lower_bound == JsonLiteral::String("2021-01-03T13:32:43Z".to_string()) + )); + } - Ok(()) + #[test] + fn test_date_range_query_with_strict_date_optional_time_format() { + let range_query_params = ElasticRangeQueryParams { + gt: None, + gte: None, + lt: None, + lte: Some(JsonLiteral::String("2024-09-28T10:22:55.797Z".to_string())), + boost: None, + format: JsonLiteral::String("strict_date_optional_time".to_string()).into(), + }; + let range_query: ElasticRangeQuery = ElasticRangeQuery { + field: "timestamp".to_string(), + value: range_query_params, + }; + let range_query_ast = range_query.convert_to_query_ast().unwrap(); + assert!(matches!( + range_query_ast, + QueryAst::Range(RangeQuery { + field, + lower_bound: Bound::Unbounded, + upper_bound: Bound::Included(upper_bound), + }) + if field == "timestamp" && upper_bound == JsonLiteral::String("2024-09-28T10:22:55.797Z".to_string()) + )); } } diff --git a/quickwit/quickwit-search/src/cluster_client.rs 
b/quickwit/quickwit-search/src/cluster_client.rs index 25d53ca7554..d32ad92327c 100644 --- a/quickwit/quickwit-search/src/cluster_client.rs +++ b/quickwit/quickwit-search/src/cluster_client.rs @@ -94,21 +94,36 @@ impl ClusterClient { ) -> crate::Result { let mut response_res = client.leaf_search(request.clone()).await; let retry_policy = LeafSearchRetryPolicy {}; - if let Some(retry_request) = retry_policy.retry_request(request, &response_res) { - assert!(!retry_request.leaf_requests.is_empty()); - client = retry_client( - &self.search_job_placer, - client.grpc_addr(), - &retry_request.leaf_requests[0].split_offsets[0].split_id, - ) - .await?; - debug!( - "Leaf search response error: `{:?}`. Retry once to execute {:?} with {:?}", - response_res, retry_request, client + // We retry only once. + let Some(retry_request) = retry_policy.retry_request(request, &response_res) else { + return response_res; + }; + let Some(first_split) = retry_request + .leaf_requests + .iter() + .flat_map(|leaf_req| leaf_req.split_offsets.iter()) + .next() + else { + warn!( + "the retry request did not contain any split to retry. this should never happen, \ + please report" ); - let retry_result = client.leaf_search(retry_request).await; - response_res = merge_leaf_search_results(response_res, retry_result); - } + return response_res; + }; + // There could be more than one split in the retry request. We pick a single client + // arbitrarily only considering the affinity of the first split. + client = retry_client( + &self.search_job_placer, + client.grpc_addr(), + &first_split.split_id, + ) + .await?; + debug!( + "Leaf search response error: `{:?}`. Retry once to execute {:?} with {:?}", + response_res, retry_request, client + ); + let retry_result = client.leaf_search(retry_request).await; + response_res = merge_original_with_retry_leaf_search_results(response_res, retry_result); response_res } @@ -274,16 +289,24 @@ fn merge_intermediate_aggregation(left: &[u8], right: &[u8]) -> crate::Result crate::Result { - left_response + original_response .partial_hits - .extend(right_response.partial_hits); + .extend(retry_response.partial_hits); let intermediate_aggregation_result: Option> = match ( - left_response.intermediate_aggregation_result, - right_response.intermediate_aggregation_result, + original_response.intermediate_aggregation_result, + retry_response.intermediate_aggregation_result, ) { (Some(left_agg_bytes), Some(right_agg_bytes)) => { let intermediate_aggregation_bytes: Vec = @@ -296,22 +319,24 @@ fn merge_leaf_search_response( }; Ok(LeafSearchResponse { intermediate_aggregation_result, - num_hits: left_response.num_hits + right_response.num_hits, - num_attempted_splits: left_response.num_attempted_splits - + right_response.num_attempted_splits, - failed_splits: right_response.failed_splits, - partial_hits: left_response.partial_hits, + num_hits: original_response.num_hits + retry_response.num_hits, + num_attempted_splits: original_response.num_attempted_splits + + retry_response.num_attempted_splits, + failed_splits: retry_response.failed_splits, + partial_hits: original_response.partial_hits, + num_successful_splits: original_response.num_successful_splits + + retry_response.num_successful_splits, }) } // Merge initial leaf search results with results obtained from a retry. 
-fn merge_leaf_search_results( +fn merge_original_with_retry_leaf_search_results( left_search_response_result: crate::Result, right_search_response_result: crate::Result, ) -> crate::Result { match (left_search_response_result, right_search_response_result) { (Ok(left_response), Ok(right_response)) => { - merge_leaf_search_response(left_response, right_response) + merge_original_with_retry_leaf_search_response(left_response, right_response) } (Ok(single_valid_response), Err(_)) => Ok(single_valid_response), (Err(_), Ok(single_valid_response)) => Ok(single_valid_response), @@ -626,8 +651,11 @@ mod tests { num_attempted_splits: 1, ..Default::default() }; - let merged_leaf_search_response = - merge_leaf_search_results(Ok(leaf_response), Ok(leaf_response_retry)).unwrap(); + let merged_leaf_search_response = merge_original_with_retry_leaf_search_results( + Ok(leaf_response), + Ok(leaf_response_retry), + ) + .unwrap(); assert_eq!(merged_leaf_search_response.num_attempted_splits, 2); assert_eq!(merged_leaf_search_response.num_hits, 2); assert_eq!(merged_leaf_search_response.partial_hits.len(), 2); @@ -649,7 +677,7 @@ mod tests { num_attempted_splits: 1, ..Default::default() }; - let merged_result = merge_leaf_search_results( + let merged_result = merge_original_with_retry_leaf_search_results( Err(SearchError::Internal("error".to_string())), Ok(leaf_response), ) @@ -663,7 +691,7 @@ mod tests { #[test] fn test_merge_leaf_search_retry_error_on_error() -> anyhow::Result<()> { - let merge_error = merge_leaf_search_results( + let merge_error = merge_original_with_retry_leaf_search_results( Err(SearchError::Internal("error".to_string())), Err(SearchError::Internal("retry error".to_string())), ) diff --git a/quickwit/quickwit-search/src/collector.rs b/quickwit/quickwit-search/src/collector.rs index afeb2e0a4b9..221bcdb1392 100644 --- a/quickwit/quickwit-search/src/collector.rs +++ b/quickwit/quickwit-search/src/collector.rs @@ -592,6 +592,7 @@ impl SegmentCollector for QuickwitSegmentCollector { partial_hits, failed_splits: Vec::new(), num_attempted_splits: 1, + num_successful_splits: 1, }) } } @@ -608,7 +609,8 @@ pub enum QuickwitAggregations { } impl QuickwitAggregations { - fn fast_field_names(&self) -> HashSet { + /// Returns the list of fast fields that should be loaded for the aggregation. 
+ pub fn fast_field_names(&self) -> HashSet { match self { QuickwitAggregations::FindTraceIdsAggregation(collector) => { collector.fast_field_names() @@ -927,6 +929,10 @@ fn merge_leaf_responses( .iter() .map(|leaf_response| leaf_response.num_attempted_splits) .sum(); + let num_successful_splits = leaf_responses + .iter() + .map(|leaf_response| leaf_response.num_successful_splits) + .sum::(); let num_hits: u64 = leaf_responses .iter() .map(|leaf_response| leaf_response.num_hits) @@ -952,6 +958,7 @@ fn merge_leaf_responses( partial_hits: top_k_partial_hits, failed_splits, num_attempted_splits, + num_successful_splits, }) } @@ -1173,6 +1180,7 @@ pub(crate) struct IncrementalCollector { num_hits: u64, failed_splits: Vec, num_attempted_splits: u64, + num_successful_splits: u64, start_offset: usize, } @@ -1193,6 +1201,7 @@ impl IncrementalCollector { num_hits: 0, failed_splits: Vec::new(), num_attempted_splits: 0, + num_successful_splits: 0, } } @@ -1204,12 +1213,14 @@ impl IncrementalCollector { failed_splits, num_attempted_splits, intermediate_aggregation_result, + num_successful_splits, } = leaf_response; self.num_hits += num_hits; self.top_k_hits.add_entries(partial_hits.into_iter()); self.failed_splits.extend(failed_splits); self.num_attempted_splits += num_attempted_splits; + self.num_successful_splits += num_successful_splits; if let Some(intermediate_aggregation_result) = intermediate_aggregation_result { self.incremental_aggregation .add(intermediate_aggregation_result)?; @@ -1252,6 +1263,7 @@ impl IncrementalCollector { partial_hits, failed_splits: self.failed_splits, num_attempted_splits: self.num_attempted_splits, + num_successful_splits: self.num_successful_splits, intermediate_aggregation_result, }) } @@ -1814,6 +1826,7 @@ mod tests { }], failed_splits: Vec::new(), num_attempted_splits: 3, + num_successful_splits: 3, intermediate_aggregation_result: None, }], ); @@ -1831,6 +1844,7 @@ mod tests { }], failed_splits: Vec::new(), num_attempted_splits: 3, + num_successful_splits: 3, intermediate_aggregation_result: None } ); @@ -1868,6 +1882,7 @@ mod tests { ], failed_splits: Vec::new(), num_attempted_splits: 3, + num_successful_splits: 3, intermediate_aggregation_result: None, }, LeafSearchResponse { @@ -1885,6 +1900,7 @@ mod tests { retryable_error: true, }], num_attempted_splits: 2, + num_successful_splits: 1, intermediate_aggregation_result: None, }, ], @@ -1916,6 +1932,7 @@ mod tests { retryable_error: true, }], num_attempted_splits: 5, + num_successful_splits: 4, intermediate_aggregation_result: None } ); @@ -1954,6 +1971,7 @@ mod tests { ], failed_splits: Vec::new(), num_attempted_splits: 3, + num_successful_splits: 3, intermediate_aggregation_result: None, }, LeafSearchResponse { @@ -1971,6 +1989,7 @@ mod tests { retryable_error: true, }], num_attempted_splits: 2, + num_successful_splits: 1, intermediate_aggregation_result: None, }, ], @@ -2002,6 +2021,7 @@ mod tests { retryable_error: true, }], num_attempted_splits: 5, + num_successful_splits: 4, intermediate_aggregation_result: None } ); diff --git a/quickwit/quickwit-search/src/error.rs b/quickwit/quickwit-search/src/error.rs index 17f44f49e1e..76671545e71 100644 --- a/quickwit/quickwit-search/src/error.rs +++ b/quickwit/quickwit-search/src/error.rs @@ -17,10 +17,12 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . 
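For context, the `SearchError::from_split_errors` helper introduced in the hunk below collapses per-split failures into a single internal error; a hedged usage sketch with made-up values:

```rust
// Hedged usage sketch; the split id and error message are illustrative.
fn fail_on_any_split_error(failed_splits: &[SplitSearchError]) -> Result<(), SearchError> {
    // from_split_errors returns None when the slice is empty,
    // Some(SearchError::Internal(..)) otherwise.
    if let Some(search_error) = SearchError::from_split_errors(failed_splits) {
        return Err(search_error);
    }
    Ok(())
}
```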
+use itertools::Itertools; use quickwit_common::rate_limited_error; use quickwit_doc_mapper::QueryParserError; use quickwit_proto::error::grpc_error_to_grpc_status; use quickwit_proto::metastore::{EntityKind, MetastoreError}; +use quickwit_proto::search::SplitSearchError; use quickwit_proto::{tonic, GrpcServiceError, ServiceError, ServiceErrorCode}; use quickwit_storage::StorageResolverError; use serde::{Deserialize, Serialize}; @@ -53,6 +55,23 @@ pub enum SearchError { Unavailable(String), } +impl SearchError { + /// Creates an internal `SearchError` from a list of split search errors. + pub fn from_split_errors(failed_splits: &[SplitSearchError]) -> Option<SearchError> { + let first_failing_split = failed_splits.first()?; + let failed_splits = failed_splits + .iter() + .map(|failed_split| &failed_split.split_id) + .join(", "); + let error_msg = format!( + "search failed for the following splits: {failed_splits:}. For instance, split {} \ failed with the following error message: {}", + first_failing_split.split_id, first_failing_split.error, + ); + Some(SearchError::Internal(error_msg)) + } +} + impl ServiceError for SearchError { fn error_code(&self) -> ServiceErrorCode { match self { diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index d60259065e1..22e4b5551d1 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -338,6 +338,7 @@ fn get_leaf_resp_from_count(count: u64) -> LeafSearchResponse { partial_hits: Vec::new(), failed_splits: Vec::new(), num_attempted_splits: 1, + num_successful_splits: 1, intermediate_aggregation_result: None, } } @@ -1210,7 +1211,13 @@ pub async fn leaf_search( doc_mapper: Arc<DocMapper>, aggregations_limits: AggregationLimitsGuard, ) -> Result<LeafSearchResponse> { - info!(splits_num = splits.len(), split_offsets = ?PrettySample::new(&splits, 5)); + let num_docs: u64 = splits.iter().map(|split| split.num_docs).sum(); + let num_splits = splits.len(); + let current_span = tracing::Span::current(); + current_span.record("num_docs", num_docs); + current_span.record("num_splits", num_splits); + + info!(num_docs, num_splits, split_offsets = ?PrettySample::new(&splits, 5)); let split_filter = CanSplitDoBetter::from_request(&request, doc_mapper.timestamp_field_name()); let split_with_req = split_filter.optimize(request.clone(), splits)?; @@ -1307,7 +1314,7 @@ pub async fn leaf_search( } #[allow(clippy::too_many_arguments)] -#[instrument(skip_all, fields(split_id = split.split_id))] +#[instrument(skip_all, fields(split_id = split.split_id, num_docs = split.num_docs))] async fn leaf_search_single_split_wrapper( request: SearchRequest, searcher_context: Arc<SearcherContext>, diff --git a/quickwit/quickwit-search/src/leaf_cache.rs b/quickwit/quickwit-search/src/leaf_cache.rs index 86c5496240e..491f66f3aee 100644 --- a/quickwit/quickwit-search/src/leaf_cache.rs +++ b/quickwit/quickwit-search/src/leaf_cache.rs @@ -242,7 +242,8 @@ mod tests { let result = LeafSearchResponse { failed_splits: Vec::new(), intermediate_aggregation_result: None, - num_attempted_splits: 0, + num_attempted_splits: 1, + num_successful_splits: 1, num_hits: 1234, partial_hits: vec![PartialHit { doc_id: 1, @@ -331,7 +332,8 @@ mod tests { let result = LeafSearchResponse { failed_splits: Vec::new(), intermediate_aggregation_result: None, - num_attempted_splits: 0, + num_attempted_splits: 1, + num_successful_splits: 1, num_hits: 1234, partial_hits: vec![PartialHit { doc_id: 1, diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index
f2c636c1037..3565f9f68f8 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -47,9 +47,9 @@ use serde::{Deserialize, Serialize}; use tantivy::aggregation::agg_result::AggregationResults; use tantivy::aggregation::intermediate_agg_result::IntermediateAggregationResults; use tantivy::collector::Collector; -use tantivy::schema::{FieldEntry, FieldType, Schema}; +use tantivy::schema::{Field, FieldEntry, FieldType, Schema}; use tantivy::TantivyError; -use tracing::{debug, error, info, info_span, instrument}; +use tracing::{debug, info, info_span, instrument}; use crate::cluster_client::ClusterClient; use crate::collector::{make_merge_collector, QuickwitAggregations}; @@ -473,6 +473,27 @@ fn validate_sort_by_field_type( Ok(()) } +fn check_is_fast_field( + schema: &Schema, + fast_field_name: &str, + dynamic_fast_field: Option, +) -> crate::Result<()> { + let Some((field, _path)): Option<(Field, &str)> = + schema.find_field_with_default(fast_field_name, dynamic_fast_field) + else { + return Err(SearchError::InvalidArgument(format!( + "Field \"{fast_field_name}\" does not exist" + ))); + }; + let field_entry: &FieldEntry = schema.get_field_entry(field); + if !field_entry.is_fast() { + return Err(SearchError::InvalidArgument(format!( + "Field \"{fast_field_name}\" is not configured as a fast field" + ))); + } + Ok(()) +} + fn validate_request( schema: &Schema, timestamp_field_name: &Option<&str>, @@ -491,11 +512,18 @@ fn validate_request( validate_requested_snippet_fields(schema, &search_request.snippet_fields)?; if let Some(agg) = search_request.aggregation_request.as_ref() { - let _aggs: QuickwitAggregations = serde_json::from_str(agg).map_err(|_err| { + let aggs: QuickwitAggregations = serde_json::from_str(agg).map_err(|_err| { let err = serde_json::from_str::(agg) .unwrap_err(); SearchError::InvalidAggregationRequest(err.to_string()) })?; + + // ensure that the required fast fields are indeed configured as fast fields. + let fast_field_names = aggs.fast_field_names(); + let dynamic_field = schema.get_field(DYNAMIC_FIELD_NAME).ok(); + for fast_field_name in &fast_field_names { + check_is_fast_field(schema, fast_field_name, dynamic_field)?; + } }; if search_request.start_offset > 10_000 { @@ -575,6 +603,8 @@ async fn search_partial_hits_phase_with_scroll( max_hits_per_page: max_hits, cached_partial_hits_start_offset: search_request.start_offset, cached_partial_hits, + failed_splits: leaf_search_resp.failed_splits.clone(), + num_successful_splits: leaf_search_resp.num_successful_splits, }; let scroll_key_and_start_offset: ScrollKeyAndStartOffset = ScrollKeyAndStartOffset::new_with_start_offset( @@ -650,11 +680,14 @@ pub fn get_count_from_metadata(split_metadatas: &[SplitMetadata]) -> Vec> = + let leaf_search_results: Vec> = leaf_search_responses.into_iter().map(Ok).collect_vec(); let span = info_span!("merge_fruits"); let leaf_search_response = crate::search_thread_pool() .run_cpu_intensive(move || { let _span_guard = span.enter(); - merge_collector.merge_fruits(leaf_search_responses) + merge_collector.merge_fruits(leaf_search_results) }) .await .context("failed to merge leaf search responses")? @@ -711,9 +744,7 @@ pub(crate) async fn search_partial_hits_phase( "Merged leaf search response." 
); if !leaf_search_response.failed_splits.is_empty() { - error!(failed_splits = ?leaf_search_response.failed_splits, "leaf search response contains at least one failed split"); - let errors: String = leaf_search_response.failed_splits.iter().join(", "); - return Err(SearchError::Internal(errors)); + quickwit_common::rate_limited_error!(limit_per_min=6, failed_splits = ?leaf_search_response.failed_splits, "leaf search response contains at least one failed split"); } Ok(leaf_search_response) } @@ -931,6 +962,8 @@ async fn root_search_aux( scroll_id: scroll_key_and_start_offset_opt .as_ref() .map(ToString::to_string), + failed_splits: first_phase_result.failed_splits, + num_successful_splits: first_phase_result.num_successful_splits, }) } @@ -1120,6 +1153,12 @@ pub async fn root_search( ) .await?; + let num_docs: usize = split_metadatas.iter().map(|split| split.num_docs).sum(); + let num_splits = split_metadatas.len(); + let current_span = tracing::Span::current(); + current_span.record("num_docs", num_docs); + current_span.record("num_splits", num_splits); + let mut search_response = root_search_aux( searcher_context, &request_metadata.indexes_meta_for_leaf_search, @@ -3481,7 +3520,7 @@ mod tests { } #[tokio::test] - async fn test_root_search_single_split_retry_single_node_fails() -> anyhow::Result<()> { + async fn test_root_search_single_split_retry_single_node_fails() { let search_request = quickwit_proto::search::SearchRequest { index_id_patterns: vec!["test-index".to_string()], query_ast: qast_json_helper("test", &["body"]), @@ -3538,9 +3577,9 @@ mod tests { MetastoreServiceClient::from_mock(mock_metastore), &cluster_client, ) - .await; - assert!(search_response.is_err()); - Ok(()) + .await + .unwrap(); + assert_eq!(search_response.failed_splits.len(), 1); } #[tokio::test] @@ -4760,12 +4799,17 @@ mod tests { .map(|split_offset| mock_partial_hit(&split_offset.split_id, 3, 1)) .collect_vec(); partial_hits.extend_from_slice(&partial_hits2); + let num_attempted_splits: u64 = leaf_search_req + .leaf_requests + .iter() + .map(|leaf_req| leaf_req.split_offsets.len() as u64) + .sum::(); Ok(quickwit_proto::search::LeafSearchResponse { num_hits: leaf_search_req.leaf_requests[0].split_offsets.len() as u64 + leaf_search_req.leaf_requests[1].split_offsets.len() as u64, partial_hits, failed_splits: Vec::new(), - num_attempted_splits: 1, + num_attempted_splits, ..Default::default() }) }, @@ -4807,4 +4851,118 @@ mod tests { ); Ok(()) } + + #[tokio::test] + async fn test_root_search_split_failures() -> anyhow::Result<()> { + let search_request = quickwit_proto::search::SearchRequest { + index_id_patterns: vec!["test-index-1".to_string()], + query_ast: qast_json_helper("test", &["body"]), + max_hits: 10, + ..Default::default() + }; + let mut mock_metastore = MockMetastoreService::new(); + let index_metadata_1 = IndexMetadata::for_test("test-index-1", "ram:///test-index-1"); + let index_uid_1 = index_metadata_1.index_uid.clone(); + mock_metastore.expect_list_indexes_metadata().return_once( + move |_list_indexes_metadata_request: ListIndexesMetadataRequest| { + Ok(ListIndexesMetadataResponse::for_test(vec![ + index_metadata_1, + ])) + }, + ); + mock_metastore + .expect_list_splits() + .return_once(move |list_splits_request| { + let list_splits_query = + list_splits_request.deserialize_list_splits_query().unwrap(); + assert!(list_splits_query.index_uids == vec![index_uid_1.clone(),]); + let splits = vec![ + MockSplitBuilder::new("index-1-split-1") + .with_index_uid(&index_uid_1) + .build(), + 
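Both `root_search` and `leaf_search` now record `num_docs` and `num_splits` on their current tracing span once the split list is known. A minimal sketch of this record-after-entry pattern, assuming the `tracing` and `tracing-subscriber` crates; the fields must be declared on the span (here as `Empty`) before `record` can fill them:

```rust
use tracing::info;

// Declare the fields empty so they can be recorded later in the function body.
#[tracing::instrument(
    skip_all,
    fields(num_docs = tracing::field::Empty, num_splits = tracing::field::Empty)
)]
fn search(splits: &[(String, u64)]) {
    let num_docs: u64 = splits.iter().map(|(_, num_docs)| *num_docs).sum();
    let num_splits = splits.len();
    let current_span = tracing::Span::current();
    current_span.record("num_docs", num_docs);
    current_span.record("num_splits", num_splits as u64);
    info!(num_docs, num_splits = num_splits as u64, "running search");
}

fn main() {
    // Assumes a `tracing-subscriber` dev dependency for the demo output.
    tracing_subscriber::fmt::init();
    search(&[("split-1".to_string(), 100), ("split-2".to_string(), 250)]);
}
```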
MockSplitBuilder::new("index-1-split-2") + .with_index_uid(&index_uid_1) + .build(), + ]; + let splits_response = ListSplitsResponse::try_from_splits(splits).unwrap(); + Ok(ServiceStream::from(vec![Ok(splits_response)])) + }); + let mut mock_search_service_1 = MockSearchService::new(); + mock_search_service_1 + .expect_leaf_search() + .withf( + |leaf_search_req: &quickwit_proto::search::LeafSearchRequest| { + leaf_search_req.leaf_requests.len() == 1 + && leaf_search_req.leaf_requests[0].split_offsets.len() == 2 + }, + ) + .times(1) + .returning( + |_leaf_search_req: quickwit_proto::search::LeafSearchRequest| { + let partial_hits = vec![mock_partial_hit("index-1-split-1", 0u64, 1u32)]; + Ok(quickwit_proto::search::LeafSearchResponse { + num_hits: 1, + partial_hits, + failed_splits: vec![{ + SplitSearchError { + error: "some error".to_string(), + split_id: "index-1-split-1".to_string(), + retryable_error: true, + } + }], + num_attempted_splits: 3, + ..Default::default() + }) + }, + ); + mock_search_service_1 + .expect_leaf_search() + .withf( + |leaf_search_req: &quickwit_proto::search::LeafSearchRequest| { + leaf_search_req.leaf_requests.len() == 1 + && leaf_search_req.leaf_requests[0].split_offsets.len() == 1 + }, + ) + .times(1) + .returning( + |_leaf_search_req: quickwit_proto::search::LeafSearchRequest| { + Ok(quickwit_proto::search::LeafSearchResponse { + num_hits: 0, + partial_hits: Vec::new(), + failed_splits: vec![{ + SplitSearchError { + error: "some error".to_string(), + split_id: "index-1-split-1".to_string(), + retryable_error: true, + } + }], + num_attempted_splits: 1, + ..Default::default() + }) + }, + ); + mock_search_service_1 + .expect_fetch_docs() + .times(1) + .returning(|fetch_docs_req| { + Ok(quickwit_proto::search::FetchDocsResponse { + hits: get_doc_for_fetch_req(fetch_docs_req), + }) + }); + let searcher_pool = searcher_pool_for_test([("127.0.0.1:1001", mock_search_service_1)]); + let search_job_placer = SearchJobPlacer::new(searcher_pool); + let cluster_client = ClusterClient::new(search_job_placer.clone()); + let search_response = root_search( + &SearcherContext::for_test(), + search_request, + MetastoreServiceClient::from_mock(mock_metastore), + &cluster_client, + ) + .await + .unwrap(); + assert_eq!(search_response.num_hits, 1); + assert_eq!(search_response.hits.len(), 1); + assert_eq!(search_response.failed_splits.len(), 1); + Ok(()) + } } diff --git a/quickwit/quickwit-search/src/scroll_context.rs b/quickwit/quickwit-search/src/scroll_context.rs index bb21cf6db9b..c62e80084e4 100644 --- a/quickwit/quickwit-search/src/scroll_context.rs +++ b/quickwit/quickwit-search/src/scroll_context.rs @@ -28,7 +28,7 @@ use anyhow::Context; use base64::prelude::BASE64_STANDARD; use base64::Engine; use quickwit_metastore::SplitMetadata; -use quickwit_proto::search::{LeafSearchResponse, PartialHit, SearchRequest}; +use quickwit_proto::search::{LeafSearchResponse, PartialHit, SearchRequest, SplitSearchError}; use quickwit_proto::types::IndexUid; use serde::{Deserialize, Serialize}; use tokio::sync::RwLock; @@ -57,6 +57,8 @@ pub(crate) struct ScrollContext { pub max_hits_per_page: u64, pub cached_partial_hits_start_offset: u64, pub cached_partial_hits: Vec, + pub failed_splits: Vec, + pub num_successful_splits: u64, } impl ScrollContext { diff --git a/quickwit/quickwit-search/src/service.rs b/quickwit/quickwit-search/src/service.rs index 2a57a7ab65b..65516a99e76 100644 --- a/quickwit/quickwit-search/src/service.rs +++ b/quickwit/quickwit-search/src/service.rs @@ -436,6 +436,8 @@ 
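The new `failed_splits` and `num_successful_splits` fields make the scroll context remember the first page's split failures, so later `_scroll` pages can keep reporting the same statistics. A hypothetical trimmed-down context illustrating the serde round trip (the real struct also caches partial hits and the original request; `serde` with the derive feature and `serde_json` are assumed):

```rust
use serde::{Deserialize, Serialize};

// Hypothetical, trimmed-down scroll context: the first page stores its split
// failures so that follow-up scroll pages keep reporting them.
#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct ScrollContextLite {
    cached_partial_hits_start_offset: u64,
    failed_split_ids: Vec<String>,
    num_successful_splits: u64,
}

fn main() {
    let ctx = ScrollContextLite {
        cached_partial_hits_start_offset: 0,
        failed_split_ids: vec!["index-1-split-1".to_string()],
        num_successful_splits: 1,
    };
    // The real context is serialized and stored under the scroll id; JSON is
    // used here only to demonstrate the round trip.
    let bytes = serde_json::to_vec(&ctx).unwrap();
    let restored: ScrollContextLite = serde_json::from_slice(&bytes).unwrap();
    assert_eq!(restored, ctx);
}
```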
pub(crate) async fn scroll( scroll_id: Some(next_scroll_id.to_string()), errors: Vec::new(), aggregation: None, + failed_splits: scroll_context.failed_splits, + num_successful_splits: scroll_context.num_successful_splits, }) } /// [`SearcherContext`] provides a common set of variables diff --git a/quickwit/quickwit-search/src/tests.rs b/quickwit/quickwit-search/src/tests.rs index 7f313558b50..ea255e9688d 100644 --- a/quickwit/quickwit-search/src/tests.rs +++ b/quickwit/quickwit-search/src/tests.rs @@ -1461,10 +1461,10 @@ async fn test_single_node_aggregation_missing_fast_field() { ) .await .unwrap_err(); - let SearchError::Internal(error_msg) = single_node_error else { + let SearchError::InvalidArgument(error_msg) = single_node_error else { panic!(); }; - assert!(error_msg.contains("Field \"color\" is not configured as fast field")); + assert!(error_msg.contains("Field \"color\" is not configured as a fast field")); test_sandbox.assert_quit().await; } diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/model/search_query_params.rs b/quickwit/quickwit-serve/src/elasticsearch_api/model/search_query_params.rs index 70409754353..903dfd4ed9c 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/model/search_query_params.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/model/search_query_params.rs @@ -287,6 +287,11 @@ impl SearchQueryParams { })?; Ok(Some(duration)) } + + pub fn allow_partial_search_results(&self) -> bool { + // By default, Elasticsearch allows partial results. + self.allow_partial_search_results.unwrap_or(true) + } } #[doc = "Whether to expand wildcard expression to concrete indices that are open, closed or both."] diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs b/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs index e5caa8703bc..433dec8f487 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs @@ -24,7 +24,7 @@ use std::time::{Duration, Instant}; use bytes::Bytes; use elasticsearch_dsl::search::{Hit as ElasticHit, SearchResponse as ElasticsearchResponse}; -use elasticsearch_dsl::{HitsMetadata, Source, TotalHits, TotalHitsRelation}; +use elasticsearch_dsl::{HitsMetadata, ShardStatistics, Source, TotalHits, TotalHitsRelation}; use futures_util::StreamExt; use hyper::StatusCode; use itertools::Itertools; @@ -441,9 +441,16 @@ async fn es_compat_index_search( search_body: SearchBody, search_service: Arc, ) -> Result { + if search_params.scroll.is_some() && !search_params.allow_partial_search_results() { + return Err(ElasticsearchError::from(SearchError::InvalidArgument( + "Quickwit only supports scroll API with allow_partial_search_results set to true" + .to_string(), + ))); + } let _source_excludes = search_params._source_excludes.clone(); let _source_includes = search_params._source_includes.clone(); let start_instant = Instant::now(); + let allow_partial_search_results = search_params.allow_partial_search_results(); let (search_request, append_shard_doc) = build_request_for_es_api(index_id_patterns, search_params, search_body)?; let search_response: SearchResponse = search_service.root_search(search_request).await?; @@ -453,7 +460,8 @@ async fn es_compat_index_search( append_shard_doc, _source_excludes, _source_includes, - ); + allow_partial_search_results, + )?; search_response_rest.took = elapsed.as_millis() as u32; Ok(search_response_rest) }
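`allow_partial_search_results` defaults to `true` when the parameter is absent, matching Elasticsearch, and the handler rejects scroll requests that explicitly disable partial results. A self-contained sketch of both rules, with a simplified stand-in for the real `SearchQueryParams`:

```rust
// Simplified stand-in for the query-parameter struct.
struct SearchQueryParamsLite {
    allow_partial_search_results: Option<bool>,
    scroll: Option<String>,
}

impl SearchQueryParamsLite {
    fn allow_partial_search_results(&self) -> bool {
        // An absent parameter behaves like `true`, as in Elasticsearch.
        self.allow_partial_search_results.unwrap_or(true)
    }
}

fn validate(params: &SearchQueryParamsLite) -> Result<(), String> {
    // Mirrors the guard in `es_compat_index_search`: scrolling is only
    // supported with partial results allowed.
    if params.scroll.is_some() && !params.allow_partial_search_results() {
        return Err(
            "Quickwit only supports scroll API with allow_partial_search_results set to true"
                .to_string(),
        );
    }
    Ok(())
}

fn main() {
    let default_params =
        SearchQueryParamsLite { allow_partial_search_results: None, scroll: None };
    assert!(default_params.allow_partial_search_results());
    let strict_scroll = SearchQueryParamsLite {
        allow_partial_search_results: Some(false),
        scroll: Some("30m".to_string()),
    };
    assert!(validate(&strict_scroll).is_err());
}
```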
@@ -791,6 +799,7 @@ async fn es_compat_index_multi_search( build_request_for_es_api(index_ids_patterns, search_query_params, search_body)?; search_requests.push(es_request); } + // TODO: forced to do weird referencing to work around https://github.com/rust-lang/rust/issues/100905 // otherwise append_shard_doc is captured by ref, and we get lifetime issues let futures = search_requests .map(|search_request| { let search_service = &search_service; async move { let start_instant = Instant::now(); let search_response: SearchResponse = search_service.clone().root_search(search_request).await?; let elapsed = start_instant.elapsed(); - let mut search_response_rest: ElasticsearchResponse = convert_to_es_search_response( - search_response, - append_shard_doc, - _source_excludes, - _source_includes, - ); + let mut search_response_rest: ElasticsearchResponse = + convert_to_es_search_response( + search_response, + append_shard_doc, + _source_excludes, + _source_includes, + true, //< allow_partial_results. Set to true to match ES's behavior. + )?; search_response_rest.took = elapsed.as_millis() as u32; Ok::<_, ElasticsearchError>(search_response_rest) } @@ -852,8 +863,13 @@ async fn es_scroll( }; let search_response: SearchResponse = search_service.scroll(scroll_request).await?; // TODO append_shard_doc depends on the initial request, but we don't have access to it + + // Ideally, we would have wanted to reuse the setting from the initial search request. + // However, passing that parameter is cumbersome, so we cut a corner and forbid the + // use of scroll requests in combination with allow_partial_results set to false. + let allow_failed_splits = true; let mut search_response_rest: ElasticsearchResponse = - convert_to_es_search_response(search_response, false, None, None); + convert_to_es_search_response(search_response, false, None, None, allow_failed_splits)?; search_response_rest.took = start_instant.elapsed().as_millis() as u32; Ok(search_response_rest) } @@ -918,7 +934,13 @@ fn convert_to_es_search_response( append_shard_doc: bool, _source_excludes: Option>, _source_includes: Option>, -) -> ElasticsearchResponse { + allow_partial_results: bool, +) -> Result { + if !allow_partial_results || resp.num_successful_splits == 0 { + if let Some(search_error) = SearchError::from_split_errors(&resp.failed_splits) { + return Err(ElasticsearchError::from(search_error)); + } + } let hits: Vec = resp .hits .into_iter() @@ -929,7 +951,10 @@ fn convert_to_es_search_response( } else { None }; - ElasticsearchResponse { + let num_failed_splits = resp.failed_splits.len() as u32; + let num_successful_splits = resp.num_successful_splits as u32; + let num_total_splits = num_successful_splits + num_failed_splits; + Ok(ElasticsearchResponse { timed_out: false, hits: HitsMetadata { total: Some(TotalHits { @@ -941,8 +966,16 @@ fn convert_to_es_search_response( }, aggregations, scroll_id: resp.scroll_id, + // There is no concept of shards here, but we use this field to convey split search failures. + shards: ShardStatistics { + total: num_total_splits, + successful: num_successful_splits, + skipped: 0u32, + failed: num_failed_splits, + failures: Vec::new(), + }, ..Default::default() - } + }) } pub(crate) fn str_lines(body: &str) -> impl Iterator { @@ -954,6 +987,7 @@ #[cfg(test)] mod tests { use hyper::StatusCode; + use quickwit_proto::search::SplitSearchError; use super::{partial_hit_from_search_after_param, *}; @@ -1133,4 +1167,58 @@ assert_eq!(fields, expected); }
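Since Quickwit has splits rather than shards, `convert_to_es_search_response` repurposes the ES `_shards` section of the response: successful splits are counted as successful shards and failed splits as failed ones. A sketch of that mapping, with a hypothetical local struct in place of `elasticsearch_dsl::ShardStatistics`:

```rust
// Hypothetical mirror of the `_shards` statistics block.
#[derive(Debug, PartialEq)]
struct ShardStatsLite {
    total: u32,
    successful: u32,
    skipped: u32,
    failed: u32,
}

fn shard_stats(num_successful_splits: u64, num_failed_splits: usize) -> ShardStatsLite {
    let successful = num_successful_splits as u32;
    let failed = num_failed_splits as u32;
    ShardStatsLite {
        // Splits play the role of shards: total = successful + failed.
        total: successful + failed,
        successful,
        skipped: 0,
        failed,
    }
}

fn main() {
    let stats = shard_stats(3, 1);
    assert_eq!(stats, ShardStatsLite { total: 4, successful: 3, skipped: 0, failed: 1 });
}
```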
+ + // We test the behavior of allow_partial_search_results. + #[test] + fn test_convert_to_es_search_response_allow_partial() { + let split_error = SplitSearchError { + error: "some-error".to_string(), + split_id: "some-split-id".to_string(), + retryable_error: true, + }; + { + let search_response = SearchResponse { + num_successful_splits: 1, + failed_splits: vec![split_error.clone()], + ..Default::default() + }; + convert_to_es_search_response(search_response, false, None, None, false).unwrap_err(); + } + { + let search_response = SearchResponse { + num_successful_splits: 1, + failed_splits: vec![split_error.clone()], + ..Default::default() + }; + // If we allow partial search results, this should not fail, but we report the presence + // of failed splits in the failed shard count of the response. + let es_search_resp = + convert_to_es_search_response(search_response, false, None, None, true).unwrap(); + assert_eq!(es_search_resp.shards.failed, 1); + } + { + let search_response = SearchResponse { + failed_splits: vec![split_error.clone()], + ..Default::default() + }; + // Even if we allow partial search results, a failure with no success at all is + // an error. + convert_to_es_search_response(search_response, false, None, None, true).unwrap_err(); + } + { + // Not having any splits (no failure + no success) is not considered a failure. + for allow_partial in [true, false] { + let search_response = SearchResponse::default(); + let es_search_resp = convert_to_es_search_response( + search_response, + false, + None, + None, + allow_partial, + ) + .unwrap(); + assert_eq!(es_search_resp.shards.failed, 0); + } + } + } } diff --git a/quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs b/quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs index 1525c4d2b8b..15d76413f47 100644 --- a/quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/jaeger_api/rest_handler.rs @@ -463,6 +463,8 @@ mod tests { errors: Vec::new(), aggregation: None, scroll_id: None, + failed_splits: Vec::new(), + num_successful_splits: 1, }) }); let mock_search_service = Arc::new(mock_search_service); @@ -494,6 +496,8 @@ mod tests { errors: Vec::new(), aggregation: None, scroll_id: None, + failed_splits: Vec::new(), + num_successful_splits: 1, }) }); let mock_search_service = Arc::new(mock_search_service); diff --git a/quickwit/quickwit-serve/src/search_api/rest_handler.rs b/quickwit/quickwit-serve/src/search_api/rest_handler.rs index cf9b5d40c84..42a9a0ff44b 100644 --- a/quickwit/quickwit-serve/src/search_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/search_api/rest_handler.rs @@ -239,6 +239,10 @@ pub struct SearchRequestQueryString { #[serde(with = "count_hits_from_bool")] #[serde(default = "count_hits_from_bool::default")] pub count_all: CountHits, + #[param(value_type = bool)] + #[schema(value_type = bool)] + #[serde(default)] + pub allow_failed_splits: bool, } mod count_hits_from_bool { @@ -302,8 +306,22 @@ async fn search_endpoint( search_request: SearchRequestQueryString, search_service: &dyn SearchService, ) -> Result { + let allow_failed_splits = search_request.allow_failed_splits; let search_request = search_request_from_api_request(index_id_patterns, search_request)?; - let search_response = search_service.root_search(search_request).await?; + let search_response = + search_service
 .root_search(search_request) + .await + .and_then(|search_response| { + if !allow_failed_splits || search_response.num_successful_splits == 0 { + if let Some(search_error) = + SearchError::from_split_errors(&search_response.failed_splits[..]) + { + return
Err(search_error); + } + } + Ok(search_response) + })?; let search_response_rest = SearchResponseRest::try_from(search_response)?; Ok(search_response_rest) } diff --git a/quickwit/rest-api-tests/scenarii/es_compatibility/0007-range_queries.yaml b/quickwit/rest-api-tests/scenarii/es_compatibility/0007-range_queries.yaml index 5337325c229..bbedea70e0d 100644 --- a/quickwit/rest-api-tests/scenarii/es_compatibility/0007-range_queries.yaml +++ b/quickwit/rest-api-tests/scenarii/es_compatibility/0007-range_queries.yaml @@ -243,5 +243,18 @@ expected: total: value: 68 relation: "eq" - - +--- +# Timestamp field with a custom format. +json: + query: + range: + created_at: + gte: "2015|02|01 T00:00:00.001999Z" + lte: "2015|02|01 T00:00:00.001999Z" + # Elasticsearch date format requires text to be escaped with single quotes + format: yyyy|MM|dd 'T'HH:mm:ss.SSSSSS'Z' +expected: + hits: + total: + value: 1 + relation: "eq" diff --git a/quickwit/rest-api-tests/scenarii/es_compatibility/0012-scroll-api.yaml b/quickwit/rest-api-tests/scenarii/es_compatibility/0012-scroll-api.yaml index 58e425f07e5..0f6b845b7a0 100644 --- a/quickwit/rest-api-tests/scenarii/es_compatibility/0012-scroll-api.yaml +++ b/quickwit/rest-api-tests/scenarii/es_compatibility/0012-scroll-api.yaml @@ -1,3 +1,17 @@ +--- +engines: ["quickwit"] +params: + size: 1 + scroll: 30m + allow_partial_search_results: "false" +json: + query: + match_all: {} +status_code: 400 +expected: + error: + reason: "Invalid argument: Quickwit only supports scroll API with allow_partial_search_results set to true" +--- params: size: 1 scroll: 30m
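The scenarios above exercise the rule shared by `search_endpoint` and `convert_to_es_search_response`: split failures become a hard error only when the caller disallows partial results, or when not a single split succeeded. A minimal sketch of that decision, using hypothetical lightweight types in place of the real response and error types:

```rust
// Hypothetical trimmed-down response carrying only the fields the rule needs.
struct ResponseLite {
    num_successful_splits: u64,
    failed_split_errors: Vec<String>,
}

fn finalize(resp: ResponseLite, allow_failed_splits: bool) -> Result<ResponseLite, String> {
    // Failures are fatal if the caller forbids them, or if nothing succeeded.
    if !allow_failed_splits || resp.num_successful_splits == 0 {
        if let Some(first_error) = resp.failed_split_errors.first() {
            return Err(format!("search failed: {first_error}"));
        }
    }
    Ok(resp)
}

fn main() {
    // Partial failure tolerated: one split succeeded and failures are allowed.
    assert!(finalize(
        ResponseLite { num_successful_splits: 1, failed_split_errors: vec!["boom".into()] },
        true,
    )
    .is_ok());
    // Same failure, but the caller asked for strict behavior.
    assert!(finalize(
        ResponseLite { num_successful_splits: 1, failed_split_errors: vec!["boom".into()] },
        false,
    )
    .is_err());
    // No split succeeded at all: always an error, even in lenient mode.
    assert!(finalize(
        ResponseLite { num_successful_splits: 0, failed_split_errors: vec!["boom".into()] },
        true,
    )
    .is_err());
}
```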