diff --git a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs index 022886db938..6cfbdcfc7ac 100644 --- a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs +++ b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs @@ -779,6 +779,11 @@ impl IngestController { let index_metadata = model .index_metadata(&source_uid.index_uid) .expect("index should exist"); + let validate_docs = model + .source_metadata(source_uid) + .expect("source should exist") + .transform_config + .is_none(); let doc_mapping = &index_metadata.index_config.doc_mapping; let doc_mapping_uid = doc_mapping.doc_mapping_uid; let doc_mapping_json = serde_utils::to_json_str(doc_mapping)?; @@ -799,6 +804,7 @@ impl IngestController { subrequest_id: subrequest_id as u32, shard: Some(shard), doc_mapping_json, + validate_docs, }; init_shard_subrequests.push(init_shard_subrequest); } @@ -1324,7 +1330,7 @@ mod tests { let shard = subrequest.shard(); assert_eq!(shard.index_uid(), &index_uid_1_clone); - assert_eq!(shard.source_id, "test-source"); + assert_eq!(shard.source_id, source_id); assert_eq!(shard.leader_id, "test-ingester-2"); let successes = vec![InitShardSuccess { @@ -1507,7 +1513,7 @@ mod tests { let shard = subrequest.shard(); assert_eq!(shard.index_uid(), &index_uid_0); - assert_eq!(shard.source_id, "test-source"); + assert_eq!(shard.source_id, source_id); assert_eq!(shard.leader_id, "test-ingester-1"); let successes = vec![InitShardSuccess { @@ -1911,6 +1917,7 @@ mod tests { ..Default::default() }), doc_mapping_json: "{}".to_string(), + validate_docs: false, }, InitShardSubrequest { subrequest_id: 1, @@ -1923,6 +1930,7 @@ mod tests { ..Default::default() }), doc_mapping_json: "{}".to_string(), + validate_docs: false, }, InitShardSubrequest { subrequest_id: 2, @@ -1935,6 +1943,7 @@ mod tests { ..Default::default() }), doc_mapping_json: "{}".to_string(), + validate_docs: false, }, InitShardSubrequest { subrequest_id: 3, @@ -1947,6 +1956,7 @@ mod tests { ..Default::default() }), doc_mapping_json: "{}".to_string(), + validate_docs: false, }, InitShardSubrequest { subrequest_id: 4, @@ -1959,6 +1969,7 @@ mod tests { ..Default::default() }), doc_mapping_json: "{}".to_string(), + validate_docs: false, }, ]; let init_shards_response = controller @@ -2033,6 +2044,10 @@ mod tests { source_id: source_id.clone(), }; let mut index_metadata = IndexMetadata::for_test("test-index", "ram://indexes/test-index"); + index_metadata.sources.insert( + source_id.clone(), + SourceConfig::for_test(&source_id, quickwit_config::SourceParams::void()), + ); let doc_mapping_json = format!( r#"{{ @@ -2159,8 +2174,12 @@ mod tests { ); let index_uid = IndexUid::for_test("test-index", 0); - let index_metadata = IndexMetadata::for_test("test-index", "ram://indexes/test-index"); + let mut index_metadata = IndexMetadata::for_test("test-index", "ram://indexes/test-index"); let source_id: SourceId = "test-source".to_string(); + index_metadata.sources.insert( + source_id.clone(), + SourceConfig::for_test(&source_id, quickwit_config::SourceParams::void()), + ); let source_uid = SourceUid { index_uid: index_uid.clone(), @@ -2228,6 +2247,7 @@ mod tests { assert_eq!(init_shard_request.subrequests.len(), 1); let init_shard_subrequest: &InitShardSubrequest = &init_shard_request.subrequests[0]; + assert!(init_shard_subrequest.validate_docs); Ok(InitShardsResponse { successes: vec![InitShardSuccess { subrequest_id: init_shard_subrequest.subrequest_id, @@ -2311,6 +2331,115 @@ mod tests { .unwrap(); } + #[tokio::test] + async fn test_ingest_controller_disable_validation_when_vrl() { + let mut mock_metastore = MockMetastoreService::new(); + mock_metastore + .expect_open_shards() + .once() + .returning(|request| { + let subrequest: &OpenShardSubrequest = &request.subrequests[0]; + let shard = Shard { + index_uid: subrequest.index_uid.clone(), + source_id: subrequest.source_id.clone(), + shard_id: subrequest.shard_id.clone(), + shard_state: ShardState::Open as i32, + leader_id: subrequest.leader_id.clone(), + follower_id: subrequest.follower_id.clone(), + doc_mapping_uid: subrequest.doc_mapping_uid, + publish_position_inclusive: Some(Position::Beginning), + publish_token: None, + update_timestamp: 1724158996, + }; + let response = OpenShardsResponse { + subresponses: vec![OpenShardSubresponse { + subrequest_id: subrequest.subrequest_id, + open_shard: Some(shard), + }], + }; + Ok(response) + }); + let metastore = MetastoreServiceClient::from_mock(mock_metastore); + let ingester_pool = IngesterPool::default(); + let replication_factor = 1; + + let mut controller = IngestController::new( + metastore, + ingester_pool.clone(), + replication_factor, + TEST_SHARD_THROUGHPUT_LIMIT_MIB, + ); + + let index_uid = IndexUid::for_test("test-index", 0); + let mut index_metadata = IndexMetadata::for_test("test-index", "ram://indexes/test-index"); + let source_id: SourceId = "test-source".to_string(); + let mut source_config = + SourceConfig::for_test(&source_id, quickwit_config::SourceParams::void()); + // set a vrl script + source_config.transform_config = + Some(quickwit_config::TransformConfig::new("".to_string(), None)); + index_metadata + .sources + .insert(source_id.clone(), source_config); + + let source_uid = SourceUid { + index_uid: index_uid.clone(), + source_id: source_id.clone(), + }; + let mut model = ControlPlaneModel::default(); + model.add_index(index_metadata); + let progress = Progress::default(); + + let shards = vec![Shard { + index_uid: Some(index_uid.clone()), + source_id: source_id.clone(), + shard_id: Some(ShardId::from(1)), + leader_id: "test-ingester".to_string(), + shard_state: ShardState::Open as i32, + ..Default::default() + }]; + model.insert_shards(&index_uid, &source_id, shards); + + let mut mock_ingester = MockIngesterService::new(); + + mock_ingester.expect_init_shards().returning( + move |init_shard_request: InitShardsRequest| { + assert_eq!(init_shard_request.subrequests.len(), 1); + let init_shard_subrequest: &InitShardSubrequest = + &init_shard_request.subrequests[0]; + // we have vrl, so no validation + assert!(!init_shard_subrequest.validate_docs); + Ok(InitShardsResponse { + successes: vec![InitShardSuccess { + subrequest_id: init_shard_subrequest.subrequest_id, + shard: init_shard_subrequest.shard.clone(), + }], + failures: Vec::new(), + }) + }, + ); + + let ingester = IngesterServiceClient::from_mock(mock_ingester); + ingester_pool.insert("test-ingester".into(), ingester); + + let shard_infos = BTreeSet::from_iter([ShardInfo { + shard_id: ShardId::from(1), + shard_state: ShardState::Open, + short_term_ingestion_rate: RateMibPerSec(4), + long_term_ingestion_rate: RateMibPerSec(4), + }]); + let local_shards_update = LocalShardsUpdate { + leader_id: "test-ingester".into(), + source_uid: source_uid.clone(), + shard_infos, + }; + + controller + .handle_local_shards_update(local_shards_update, &mut model, &progress) + .await + .unwrap(); + } + #[tokio::test] async fn test_ingest_controller_try_scale_up_shards() { let mut mock_metastore = MockMetastoreService::new(); diff --git a/quickwit/quickwit-control-plane/src/model/mod.rs b/quickwit/quickwit-control-plane/src/model/mod.rs index 4883a9bed25..b378931c5eb 100644 --- a/quickwit/quickwit-control-plane/src/model/mod.rs +++ b/quickwit/quickwit-control-plane/src/model/mod.rs @@ -160,6 +160,12 @@ impl ControlPlaneModel { self.index_table.get(index_uid) } + pub fn source_metadata(&self, source_uid: &SourceUid) -> Option<&SourceConfig> { + self.index_metadata(&source_uid.index_uid)? + .sources + .get(&source_uid.source_id) + } + fn update_metrics(&self) { crate::metrics::CONTROL_PLANE_METRICS .indexes_total diff --git a/quickwit/quickwit-doc-mapper/src/default_doc_mapper/default_mapper.rs b/quickwit/quickwit-doc-mapper/src/default_doc_mapper/default_mapper.rs index 512a35e1ab7..a6930b632f3 100644 --- a/quickwit/quickwit-doc-mapper/src/default_doc_mapper/default_mapper.rs +++ b/quickwit/quickwit-doc-mapper/src/default_doc_mapper/default_mapper.rs @@ -40,8 +40,8 @@ use tantivy::TantivyDocument as Document; use super::field_mapping_entry::RAW_TOKENIZER_NAME; use super::DefaultDocMapperBuilder; use crate::default_doc_mapper::mapping_tree::{ - build_mapping_tree, map_primitive_json_to_tantivy, JsonValueIterator, MappingNode, - MappingNodeRoot, + build_field_path_from_str, build_mapping_tree, map_primitive_json_to_tantivy, + JsonValueIterator, MappingNode, MappingNodeRoot, }; use crate::default_doc_mapper::FieldMappingType; use crate::doc_mapper::{JsonObject, Partition}; @@ -81,6 +81,8 @@ pub struct DefaultDocMapper { default_search_field_names: Vec, /// Timestamp field name. timestamp_field_name: Option, + /// Timestamp field path (name parsed) + timestamp_field_path: Option>, /// Root node of the field mapping tree. /// See [`MappingNode`]. field_mappings: MappingNode, @@ -197,8 +199,12 @@ impl TryFrom for DefaultDocMapper { if !concatenate_dynamic_fields.is_empty() && dynamic_field.is_none() { bail!("concatenate field has `include_dynamic_fields` set, but index isn't dynamic"); } - if let Some(timestamp_field_path) = &doc_mapping.timestamp_field { - validate_timestamp_field(timestamp_field_path, &field_mappings)?; + let timestamp_field_path = if let Some(timestamp_field_name) = &doc_mapping.timestamp_field + { + validate_timestamp_field(timestamp_field_name, &field_mappings)?; + Some(build_field_path_from_str(timestamp_field_name)) + } else { + None }; let schema = schema_builder.build(); @@ -288,6 +294,7 @@ impl TryFrom for DefaultDocMapper { document_size_field, default_search_field_names, timestamp_field_name: doc_mapping.timestamp_field, + timestamp_field_path, field_mappings, concatenate_dynamic_fields, tag_field_names, @@ -513,6 +520,32 @@ impl DocMapper for DefaultDocMapper { let mut field_path = Vec::new(); self.field_mappings .validate_from_json(json_obj, is_strict, &mut field_path)?; + if let Some(timestamp_field_path) = &self.timestamp_field_path { + let missing_ts_field = + || DocParsingError::RequiredField("timestamp field is required".to_string()); + match ×tamp_field_path[..] { + [] => (), // ? + [single_part] => { + let obj = json_obj.get(single_part).ok_or_else(missing_ts_field)?; + if !(obj.is_string() || obj.is_number()) { + return Err(missing_ts_field()); + } + } + [first_part, more_part @ ..] => { + let mut obj = json_obj.get(first_part).ok_or_else(missing_ts_field)?; + for part in more_part { + obj = obj + .as_object() + .ok_or_else(missing_ts_field)? + .get(part) + .ok_or_else(missing_ts_field)?; + } + if !(obj.is_string() || obj.is_number()) { + return Err(missing_ts_field()); + } + } + }; + } Ok(()) } diff --git a/quickwit/quickwit-doc-mapper/src/default_doc_mapper/mapping_tree.rs b/quickwit/quickwit-doc-mapper/src/default_doc_mapper/mapping_tree.rs index 4439268eb3c..7c82ae0ff95 100644 --- a/quickwit/quickwit-doc-mapper/src/default_doc_mapper/mapping_tree.rs +++ b/quickwit/quickwit-doc-mapper/src/default_doc_mapper/mapping_tree.rs @@ -1410,7 +1410,7 @@ fn field_name_for_field_path(field_path: &[&str]) -> String { /// starting from the root of the document. /// Dots '.' define the boundaries between field names. /// If a dot is part of a field name, it must be escaped with '\'. -fn build_field_path_from_str(field_path_as_str: &str) -> Vec { +pub(crate) fn build_field_path_from_str(field_path_as_str: &str) -> Vec { let mut field_path = Vec::new(); let mut current_path_fragment = String::new(); let mut escaped = false; diff --git a/quickwit/quickwit-doc-mapper/src/doc_mapper.rs b/quickwit/quickwit-doc-mapper/src/doc_mapper.rs index b58d3edaa7c..e8c2704f41e 100644 --- a/quickwit/quickwit-doc-mapper/src/doc_mapper.rs +++ b/quickwit/quickwit-doc-mapper/src/doc_mapper.rs @@ -463,35 +463,187 @@ mod tests { }"#; let doc_mapper = serde_json::from_str::(JSON_CONFIG_VALUE).unwrap(); { - assert!(test_validate_doc_aux(&doc_mapper, r#"{ "body": "toto"}"#).is_ok()); + assert!(test_validate_doc_aux( + &doc_mapper, + r#"{ "body": "toto", "timestamp": "2024-01-01T01:01:01Z"}"# + ) + .is_ok()); } { assert!(matches!( - test_validate_doc_aux(&doc_mapper, r#"{ "response_time": "toto"}"#).unwrap_err(), + test_validate_doc_aux( + &doc_mapper, + r#"{ "response_time": "toto", "timestamp": "2024-01-01T01:01:01Z"}"# + ) + .unwrap_err(), DocParsingError::ValueError(_, _) )); } { - assert!(test_validate_doc_aux(&doc_mapper, r#"{ "response_time": "2.3"}"#).is_ok(),); + assert!(test_validate_doc_aux( + &doc_mapper, + r#"{ "response_time": "2.3", "timestamp": "2024-01-01T01:01:01Z"}"# + ) + .is_ok(),); } { // coercion disabled assert!(matches!( - test_validate_doc_aux(&doc_mapper, r#"{"response_time_no_coercion": "2.3"}"#) - .unwrap_err(), + test_validate_doc_aux( + &doc_mapper, + r#"{"response_time_no_coercion": "2.3", "timestamp": "2024-01-01T01:01:01Z"}"# + ) + .unwrap_err(), DocParsingError::ValueError(_, _) )); } { assert!(matches!( - test_validate_doc_aux(&doc_mapper, r#"{"response_time": [2.3]}"#).unwrap_err(), + test_validate_doc_aux( + &doc_mapper, + r#"{"response_time": [2.3], "timestamp": "2024-01-01T01:01:01Z"}"# + ) + .unwrap_err(), DocParsingError::MultiValuesNotSupported(_) )); } { - assert!( - test_validate_doc_aux(&doc_mapper, r#"{"attributes": {"numbers": [-2]}}"#).is_ok() - ); + assert!(test_validate_doc_aux( + &doc_mapper, + r#"{"attributes": {"numbers": [-2]}, "timestamp": "2024-01-01T01:01:01Z"}"# + ) + .is_ok()); + } + } + + #[test] + fn test_validate_doc_timestamp() { + const JSON_CONFIG_TS_AT_ROOT: &str = r#"{ + "timestamp_field": "timestamp", + "field_mappings": [ + { + "name": "timestamp", + "type": "datetime", + "fast": true + }, + { + "name": "body", + "type": "text" + } + ] + }"#; + const JSON_CONFIG_TS_WITH_DOT: &str = r#"{ + "timestamp_field": "timestamp\\.now", + "field_mappings": [ + { + "name": "timestamp.now", + "type": "datetime", + "fast": true + }, + { + "name": "body", + "type": "text" + } + ] + }"#; + const JSON_CONFIG_TS_NESTED: &str = r#"{ + "timestamp_field": "doc.timestamp", + "field_mappings": [ + { + "name": "doc", + "type": "object", + "field_mappings": [ + { + "name": "timestamp", + "type": "datetime", + "fast": true + } + ] + }, + { + "name": "body", + "type": "text" + } + ] + }"#; + let doc_mapper = serde_json::from_str::(JSON_CONFIG_TS_AT_ROOT).unwrap(); + { + assert!(test_validate_doc_aux( + &doc_mapper, + r#"{ "body": "toto", "timestamp": "2024-01-01T01:01:01Z"}"# + ) + .is_ok()); + } + { + assert!(matches!( + test_validate_doc_aux( + &doc_mapper, + r#"{ "body": "toto", "timestamp": "invalid timestamp"}"# + ) + .unwrap_err(), + DocParsingError::ValueError(_, _), + )); + } + { + assert!(matches!( + test_validate_doc_aux(&doc_mapper, r#"{ "body": "toto", "timestamp": null}"#) + .unwrap_err(), + DocParsingError::RequiredField(_), + )); + } + { + assert!(matches!( + test_validate_doc_aux(&doc_mapper, r#"{ "body": "toto"}"#).unwrap_err(), + DocParsingError::RequiredField(_), + )); + } + + let doc_mapper = serde_json::from_str::(JSON_CONFIG_TS_WITH_DOT).unwrap(); + { + assert!(test_validate_doc_aux( + &doc_mapper, + r#"{ "body": "toto", "timestamp.now": "2024-01-01T01:01:01Z"}"# + ) + .is_ok()); + } + { + assert!(matches!( + test_validate_doc_aux( + &doc_mapper, + r#"{ "body": "toto", "timestamp.now": "invalid timestamp"}"# + ) + .unwrap_err(), + DocParsingError::ValueError(_, _), + )); + } + { + assert!(matches!( + test_validate_doc_aux( + &doc_mapper, + r#"{ "body": "toto", "timestamp": {"now": "2024-01-01T01:01:01Z"}}"# + ) + .unwrap_err(), + DocParsingError::RequiredField(_), + )); + } + + let doc_mapper = serde_json::from_str::(JSON_CONFIG_TS_NESTED).unwrap(); + { + assert!(test_validate_doc_aux( + &doc_mapper, + r#"{ "body": "toto", "doc":{"timestamp": "2024-01-01T01:01:01Z"}}"# + ) + .is_ok()); + } + { + assert!(matches!( + test_validate_doc_aux( + &doc_mapper, + r#"{ "body": "toto", "doc.timestamp": "2024-01-01T01:01:01Z"}"# + ) + .unwrap_err(), + DocParsingError::RequiredField(_), + )); } } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs index 565598ae2b9..1d2a639b66c 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs @@ -606,6 +606,7 @@ mod tests { Position::Beginning, None, Instant::now(), + false, ); state_guard.shards.insert(queue_id_00.clone(), shard_00); @@ -616,6 +617,7 @@ mod tests { Position::Beginning, None, Instant::now(), + false, ); shard_01.is_advertisable = true; state_guard.shards.insert(queue_id_01.clone(), shard_01); diff --git a/quickwit/quickwit-ingest/src/ingest_v2/idle.rs b/quickwit/quickwit-ingest/src/ingest_v2/idle.rs index 0263e23d194..a94a6168598 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/idle.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/idle.rs @@ -107,6 +107,7 @@ mod tests { Position::Beginning, None, now - idle_shard_timeout, + false, ); let queue_id_01 = queue_id(&index_uid, "test-source", &ShardId::from(1)); state_guard.shards.insert(queue_id_01.clone(), shard_01); @@ -117,6 +118,7 @@ mod tests { Position::Beginning, None, now - idle_shard_timeout / 2, + false, ); let queue_id_02 = queue_id(&index_uid, "test-source", &ShardId::from(2)); state_guard.shards.insert(queue_id_02.clone(), shard_02); diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index fb9a9980c8b..7412a25cf28 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -192,6 +192,7 @@ impl Ingester { shard: Shard, doc_mapping_json: &str, now: Instant, + validate: bool, ) -> IngestV2Result<()> { let queue_id = shard.queue_id(); info!( @@ -252,6 +253,7 @@ impl Ingester { Position::Beginning, doc_mapper, now, + validate, ) } else { IngesterShard::new_solo( @@ -260,6 +262,7 @@ impl Ingester { Position::Beginning, Some(doc_mapper), now, + validate, ) }; entry.insert(primary_shard); @@ -516,6 +519,7 @@ impl Ingester { continue; } let doc_mapper = shard.doc_mapper_opt.clone().expect("shard should be open"); + let validate_shard = shard.validate; let follower_id_opt = shard.follower_id_opt().cloned(); let from_position_exclusive = shard.replication_position_inclusive.clone(); @@ -570,8 +574,12 @@ impl Ingester { // Total number of bytes (valid and invalid documents) let original_batch_num_bytes = doc_batch.num_bytes() as u64; - let (valid_doc_batch, parse_failures) = - validate_doc_batch(doc_batch, doc_mapper).await?; + + let (valid_doc_batch, parse_failures) = if validate_shard { + validate_doc_batch(doc_batch, doc_mapper).await? + } else { + (doc_batch, Vec::new()) + }; if valid_doc_batch.is_empty() { crate::metrics::INGEST_METRICS @@ -947,6 +955,7 @@ impl Ingester { subrequest.shard().clone(), &subrequest.doc_mapping_json, now, + subrequest.validate_docs, ) .await; if init_primary_shard_result.is_ok() { @@ -1551,6 +1560,7 @@ mod tests { Position::Beginning, None, Instant::now(), + false, ); state_guard.shards.insert(queue_id_00.clone(), shard_00); @@ -1561,6 +1571,7 @@ mod tests { Position::Beginning, None, Instant::now(), + false, ); shard_01.is_advertisable = true; state_guard.shards.insert(queue_id_01.clone(), shard_01); @@ -1652,6 +1663,7 @@ mod tests { primary_shard, &doc_mapping_json, Instant::now(), + true, ) .await .unwrap(); @@ -1694,6 +1706,7 @@ mod tests { subrequest_id: 0, shard: Some(shard.clone()), doc_mapping_json, + validate_docs: true, }], }; let response = ingester.init_shards(init_shards_request).await.unwrap(); @@ -1744,6 +1757,7 @@ mod tests { ..Default::default() }), doc_mapping_json: doc_mapping_json.clone(), + validate_docs: true, }, InitShardSubrequest { subrequest_id: 1, @@ -1757,6 +1771,7 @@ mod tests { ..Default::default() }), doc_mapping_json, + validate_docs: true, }, ], }; @@ -1867,6 +1882,7 @@ mod tests { ..Default::default() }), doc_mapping_json, + validate_docs: true, }], }; let response = ingester.init_shards(init_shards_request).await.unwrap(); @@ -1937,6 +1953,7 @@ mod tests { ..Default::default() }), doc_mapping_json, + validate_docs: true, }], }; let response = ingester.init_shards(init_shards_request).await.unwrap(); @@ -1984,6 +2001,66 @@ mod tests { assert!(parse_failure_2.message.contains("not declared")); } + #[tokio::test] + async fn test_ingester_persist_doesnt_validates_docs_when_requested() { + let (ingester_ctx, ingester) = IngesterForTest::default().build().await; + + let index_uid: IndexUid = IndexUid::for_test("test-index", 0); + + let doc_mapping_uid = DocMappingUid::random(); + let doc_mapping_json = format!( + r#"{{ + "doc_mapping_uid": "{doc_mapping_uid}", + "mode": "strict", + "field_mappings": [{{"name": "doc", "type": "text"}}] + }}"# + ); + let init_shards_request = InitShardsRequest { + subrequests: vec![InitShardSubrequest { + subrequest_id: 0, + shard: Some(Shard { + index_uid: Some(index_uid.clone()), + source_id: "test-source".to_string(), + shard_id: Some(ShardId::from(0)), + shard_state: ShardState::Open as i32, + leader_id: ingester_ctx.node_id.to_string(), + doc_mapping_uid: Some(doc_mapping_uid), + ..Default::default() + }), + doc_mapping_json, + validate_docs: false, + }], + }; + let response = ingester.init_shards(init_shards_request).await.unwrap(); + assert_eq!(response.successes.len(), 1); + assert_eq!(response.failures.len(), 0); + + let persist_request = PersistRequest { + leader_id: ingester_ctx.node_id.to_string(), + commit_type: CommitTypeV2::Force as i32, + subrequests: vec![PersistSubrequest { + subrequest_id: 0, + index_uid: Some(index_uid.clone()), + source_id: "test-source".to_string(), + shard_id: Some(ShardId::from(0)), + doc_batch: Some(DocBatchV2::for_test([ + "", // invalid + "[]", // invalid + r#"{"foo": "bar"}"#, // invalid + r#"{"doc": "test-doc-000"}"#, // valid + ])), + }], + }; + let persist_response = ingester.persist(persist_request).await.unwrap(); + assert_eq!(persist_response.leader_id, "test-ingester"); + assert_eq!(persist_response.successes.len(), 1); + assert_eq!(persist_response.failures.len(), 0); + + let persist_success = &persist_response.successes[0]; + assert_eq!(persist_success.num_persisted_docs, 4); + assert_eq!(persist_success.parse_failures.len(), 0); + } + #[tokio::test] async fn test_ingester_persist_checks_capacity_before_validating_docs() { let (ingester_ctx, ingester) = IngesterForTest::default() @@ -2014,6 +2091,7 @@ mod tests { ..Default::default() }), doc_mapping_json, + validate_docs: true, }], }; let response = ingester.init_shards(init_shards_request).await.unwrap(); @@ -2074,6 +2152,7 @@ mod tests { ..Default::default() }), doc_mapping_json, + validate_docs: true, }], }; let response = ingester.init_shards(init_shards_request).await.unwrap(); @@ -2125,6 +2204,7 @@ mod tests { Position::Beginning, None, Instant::now(), + false, ); state_guard.shards.insert(queue_id.clone(), solo_shard); @@ -2190,6 +2270,7 @@ mod tests { Position::Beginning, Some(doc_mapper), Instant::now(), + false, ); state_guard.shards.insert(queue_id.clone(), solo_shard); @@ -2276,6 +2357,7 @@ mod tests { ..Default::default() }), doc_mapping_json: doc_mapping_json.clone(), + validate_docs: true, }, InitShardSubrequest { subrequest_id: 1, @@ -2290,6 +2372,7 @@ mod tests { ..Default::default() }), doc_mapping_json, + validate_docs: true, }, ], }; @@ -2481,6 +2564,7 @@ mod tests { ..Default::default() }), doc_mapping_json: doc_mapping_json.clone(), + validate_docs: true, }, InitShardSubrequest { subrequest_id: 1, @@ -2495,6 +2579,7 @@ mod tests { ..Default::default() }), doc_mapping_json, + validate_docs: true, }, ], }; @@ -2618,6 +2703,7 @@ mod tests { Position::Beginning, None, Instant::now(), + false, ); ingester .state @@ -2696,6 +2782,7 @@ mod tests { primary_shard, &doc_mapping_json, Instant::now(), + true, ) .await .unwrap(); @@ -2776,6 +2863,7 @@ mod tests { primary_shard, &doc_mapping_json, Instant::now(), + true, ) .await .unwrap(); @@ -2898,6 +2986,7 @@ mod tests { shard, &doc_mapping_json, Instant::now(), + true, ) .await .unwrap(); @@ -3022,6 +3111,7 @@ mod tests { shard_01, &doc_mapping_json_01, now, + true, ) .await .unwrap(); @@ -3032,6 +3122,7 @@ mod tests { shard_02, &doc_mapping_json_02, now, + true, ) .await .unwrap(); @@ -3122,6 +3213,7 @@ mod tests { Position::Beginning, None, Instant::now(), + false, ); state_guard.shards.insert(queue_id.clone(), solo_shard); @@ -3232,6 +3324,7 @@ mod tests { shard_01, &doc_mapping_json, now, + true, ) .await .unwrap(); @@ -3242,6 +3335,7 @@ mod tests { shard_02, &doc_mapping_json, now, + true, ) .await .unwrap(); @@ -3314,6 +3408,7 @@ mod tests { shard_17, &doc_mapping_json, now, + true, ) .await .unwrap(); @@ -3325,6 +3420,7 @@ mod tests { shard_18, &doc_mapping_json, now, + true, ) .await .unwrap(); @@ -3382,6 +3478,7 @@ mod tests { shard, &doc_mapping_json, Instant::now(), + true, ) .await .unwrap(); @@ -3495,6 +3592,7 @@ mod tests { Position::Beginning, None, Instant::now(), + false, ), ); ingester.check_decommissioning_status(&mut state_guard); @@ -3551,6 +3649,7 @@ mod tests { shard_01, &doc_mapping_json, now, + true, ) .await .unwrap(); @@ -3561,6 +3660,7 @@ mod tests { shard_02, &doc_mapping_json, now, + true, ) .await .unwrap(); @@ -3656,6 +3756,7 @@ mod tests { shard_01, &doc_mapping_json, now - idle_shard_timeout, + true, ) .await .unwrap(); @@ -3723,6 +3824,7 @@ mod tests { shard_01, &doc_mapping_json, now, + true, ) .await .unwrap(); @@ -3733,6 +3835,7 @@ mod tests { shard_02, &doc_mapping_json, now, + true, ) .await .unwrap(); @@ -3743,6 +3846,7 @@ mod tests { shard_03, &doc_mapping_json, now, + true, ) .await .unwrap(); diff --git a/quickwit/quickwit-ingest/src/ingest_v2/models.rs b/quickwit/quickwit-ingest/src/ingest_v2/models.rs index 8d93954f9f9..79d2932c9af 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/models.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/models.rs @@ -55,6 +55,9 @@ pub(super) struct IngesterShard { pub is_advertisable: bool, /// Document mapper for the shard. Replica shards and closed solo shards do not have one. pub doc_mapper_opt: Option>, + /// Whether to validate documents in this shard. True if no preprocessing (VRL) will happen + /// before indexing. + pub validate: bool, pub shard_status_tx: watch::Sender, pub shard_status_rx: watch::Receiver, /// Instant at which the shard was last written to. @@ -69,6 +72,7 @@ impl IngesterShard { truncation_position_inclusive: Position, doc_mapper: Arc, now: Instant, + validate: bool, ) -> Self { let shard_status = (shard_state, replication_position_inclusive.clone()); let (shard_status_tx, shard_status_rx) = watch::channel(shard_status); @@ -79,6 +83,7 @@ impl IngesterShard { truncation_position_inclusive, is_advertisable: false, doc_mapper_opt: Some(doc_mapper), + validate, shard_status_tx, shard_status_rx, last_write_instant: now, @@ -103,6 +108,7 @@ impl IngesterShard { // anyway. is_advertisable: false, doc_mapper_opt: None, + validate: false, shard_status_tx, shard_status_rx, last_write_instant: now, @@ -115,6 +121,7 @@ impl IngesterShard { truncation_position_inclusive: Position, doc_mapper_opt: Option>, now: Instant, + validate: bool, ) -> Self { let shard_status = (shard_state, replication_position_inclusive.clone()); let (shard_status_tx, shard_status_rx) = watch::channel(shard_status); @@ -125,6 +132,7 @@ impl IngesterShard { truncation_position_inclusive, is_advertisable: false, doc_mapper_opt, + validate, shard_status_tx, shard_status_rx, last_write_instant: now, @@ -253,6 +261,7 @@ mod tests { Position::Beginning, doc_mapper, Instant::now(), + true, ); assert!(matches!( &primary_shard.shard_type, @@ -305,6 +314,7 @@ mod tests { Position::Beginning, None, Instant::now(), + false, ); solo_shard.assert_is_solo(); assert!(!solo_shard.is_replica()); diff --git a/quickwit/quickwit-ingest/src/ingest_v2/state.rs b/quickwit/quickwit-ingest/src/ingest_v2/state.rs index 050155919d7..11993a6cad4 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/state.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/state.rs @@ -190,6 +190,7 @@ impl IngesterState { truncation_position_inclusive, None, now, + false, ); // We want to advertise the shard as read-only right away. solo_shard.is_advertisable = true; diff --git a/quickwit/quickwit-proto/protos/quickwit/ingester.proto b/quickwit/quickwit-proto/protos/quickwit/ingester.proto index a65d7d9fcfd..8874176b941 100644 --- a/quickwit/quickwit-proto/protos/quickwit/ingester.proto +++ b/quickwit/quickwit-proto/protos/quickwit/ingester.proto @@ -260,6 +260,7 @@ message InitShardSubrequest { uint32 subrequest_id = 1; quickwit.ingest.Shard shard = 2; string doc_mapping_json = 3; + bool validate_docs = 4; } message InitShardsResponse { diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs index 0b169811727..ccb13a5e44d 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs @@ -360,6 +360,8 @@ pub struct InitShardSubrequest { pub shard: ::core::option::Option, #[prost(string, tag = "3")] pub doc_mapping_json: ::prost::alloc::string::String, + #[prost(bool, tag = "4")] + pub validate_docs: bool, } #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)]