Skip to content

Commit

Permalink
ingestv2 doc validation improvements (#5375)
Browse files Browse the repository at this point in the history
* validate timestamp field

* let control-plane decide whether shards should validate documents
  • Loading branch information
trinity-1686a authored Sep 3, 2024
1 parent fc5ef16 commit 707f22a
Show file tree
Hide file tree
Showing 12 changed files with 461 additions and 19 deletions.
135 changes: 132 additions & 3 deletions quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,11 @@ impl IngestController {
let index_metadata = model
.index_metadata(&source_uid.index_uid)
.expect("index should exist");
let validate_docs = model
.source_metadata(source_uid)
.expect("source should exist")
.transform_config
.is_none();
let doc_mapping = &index_metadata.index_config.doc_mapping;
let doc_mapping_uid = doc_mapping.doc_mapping_uid;
let doc_mapping_json = serde_utils::to_json_str(doc_mapping)?;
Expand All @@ -799,6 +804,7 @@ impl IngestController {
subrequest_id: subrequest_id as u32,
shard: Some(shard),
doc_mapping_json,
validate_docs,
};
init_shard_subrequests.push(init_shard_subrequest);
}
Expand Down Expand Up @@ -1324,7 +1330,7 @@ mod tests {

let shard = subrequest.shard();
assert_eq!(shard.index_uid(), &index_uid_1_clone);
assert_eq!(shard.source_id, "test-source");
assert_eq!(shard.source_id, source_id);
assert_eq!(shard.leader_id, "test-ingester-2");

let successes = vec![InitShardSuccess {
Expand Down Expand Up @@ -1507,7 +1513,7 @@ mod tests {

let shard = subrequest.shard();
assert_eq!(shard.index_uid(), &index_uid_0);
assert_eq!(shard.source_id, "test-source");
assert_eq!(shard.source_id, source_id);
assert_eq!(shard.leader_id, "test-ingester-1");

let successes = vec![InitShardSuccess {
Expand Down Expand Up @@ -1911,6 +1917,7 @@ mod tests {
..Default::default()
}),
doc_mapping_json: "{}".to_string(),
validate_docs: false,
},
InitShardSubrequest {
subrequest_id: 1,
Expand All @@ -1923,6 +1930,7 @@ mod tests {
..Default::default()
}),
doc_mapping_json: "{}".to_string(),
validate_docs: false,
},
InitShardSubrequest {
subrequest_id: 2,
Expand All @@ -1935,6 +1943,7 @@ mod tests {
..Default::default()
}),
doc_mapping_json: "{}".to_string(),
validate_docs: false,
},
InitShardSubrequest {
subrequest_id: 3,
Expand All @@ -1947,6 +1956,7 @@ mod tests {
..Default::default()
}),
doc_mapping_json: "{}".to_string(),
validate_docs: false,
},
InitShardSubrequest {
subrequest_id: 4,
Expand All @@ -1959,6 +1969,7 @@ mod tests {
..Default::default()
}),
doc_mapping_json: "{}".to_string(),
validate_docs: false,
},
];
let init_shards_response = controller
Expand Down Expand Up @@ -2033,6 +2044,10 @@ mod tests {
source_id: source_id.clone(),
};
let mut index_metadata = IndexMetadata::for_test("test-index", "ram://indexes/test-index");
index_metadata.sources.insert(
source_id.clone(),
SourceConfig::for_test(&source_id, quickwit_config::SourceParams::void()),
);

let doc_mapping_json = format!(
r#"{{
Expand Down Expand Up @@ -2159,8 +2174,12 @@ mod tests {
);

let index_uid = IndexUid::for_test("test-index", 0);
let index_metadata = IndexMetadata::for_test("test-index", "ram://indexes/test-index");
let mut index_metadata = IndexMetadata::for_test("test-index", "ram://indexes/test-index");
let source_id: SourceId = "test-source".to_string();
index_metadata.sources.insert(
source_id.clone(),
SourceConfig::for_test(&source_id, quickwit_config::SourceParams::void()),
);

let source_uid = SourceUid {
index_uid: index_uid.clone(),
Expand Down Expand Up @@ -2228,6 +2247,7 @@ mod tests {
assert_eq!(init_shard_request.subrequests.len(), 1);
let init_shard_subrequest: &InitShardSubrequest =
&init_shard_request.subrequests[0];
assert!(init_shard_subrequest.validate_docs);
Ok(InitShardsResponse {
successes: vec![InitShardSuccess {
subrequest_id: init_shard_subrequest.subrequest_id,
Expand Down Expand Up @@ -2311,6 +2331,115 @@ mod tests {
.unwrap();
}

#[tokio::test]
async fn test_ingest_controller_disable_validation_when_vrl() {
let mut mock_metastore = MockMetastoreService::new();
mock_metastore
.expect_open_shards()
.once()
.returning(|request| {
let subrequest: &OpenShardSubrequest = &request.subrequests[0];
let shard = Shard {
index_uid: subrequest.index_uid.clone(),
source_id: subrequest.source_id.clone(),
shard_id: subrequest.shard_id.clone(),
shard_state: ShardState::Open as i32,
leader_id: subrequest.leader_id.clone(),
follower_id: subrequest.follower_id.clone(),
doc_mapping_uid: subrequest.doc_mapping_uid,
publish_position_inclusive: Some(Position::Beginning),
publish_token: None,
update_timestamp: 1724158996,
};
let response = OpenShardsResponse {
subresponses: vec![OpenShardSubresponse {
subrequest_id: subrequest.subrequest_id,
open_shard: Some(shard),
}],
};
Ok(response)
});
let metastore = MetastoreServiceClient::from_mock(mock_metastore);
let ingester_pool = IngesterPool::default();
let replication_factor = 1;

let mut controller = IngestController::new(
metastore,
ingester_pool.clone(),
replication_factor,
TEST_SHARD_THROUGHPUT_LIMIT_MIB,
);

let index_uid = IndexUid::for_test("test-index", 0);
let mut index_metadata = IndexMetadata::for_test("test-index", "ram://indexes/test-index");
let source_id: SourceId = "test-source".to_string();
let mut source_config =
SourceConfig::for_test(&source_id, quickwit_config::SourceParams::void());
// set a vrl script
source_config.transform_config =
Some(quickwit_config::TransformConfig::new("".to_string(), None));
index_metadata
.sources
.insert(source_id.clone(), source_config);

let source_uid = SourceUid {
index_uid: index_uid.clone(),
source_id: source_id.clone(),
};
let mut model = ControlPlaneModel::default();
model.add_index(index_metadata);
let progress = Progress::default();

let shards = vec![Shard {
index_uid: Some(index_uid.clone()),
source_id: source_id.clone(),
shard_id: Some(ShardId::from(1)),
leader_id: "test-ingester".to_string(),
shard_state: ShardState::Open as i32,
..Default::default()
}];
model.insert_shards(&index_uid, &source_id, shards);

let mut mock_ingester = MockIngesterService::new();

mock_ingester.expect_init_shards().returning(
move |init_shard_request: InitShardsRequest| {
assert_eq!(init_shard_request.subrequests.len(), 1);
let init_shard_subrequest: &InitShardSubrequest =
&init_shard_request.subrequests[0];
// we have vrl, so no validation
assert!(!init_shard_subrequest.validate_docs);
Ok(InitShardsResponse {
successes: vec![InitShardSuccess {
subrequest_id: init_shard_subrequest.subrequest_id,
shard: init_shard_subrequest.shard.clone(),
}],
failures: Vec::new(),
})
},
);

let ingester = IngesterServiceClient::from_mock(mock_ingester);
ingester_pool.insert("test-ingester".into(), ingester);

let shard_infos = BTreeSet::from_iter([ShardInfo {
shard_id: ShardId::from(1),
shard_state: ShardState::Open,
short_term_ingestion_rate: RateMibPerSec(4),
long_term_ingestion_rate: RateMibPerSec(4),
}]);
let local_shards_update = LocalShardsUpdate {
leader_id: "test-ingester".into(),
source_uid: source_uid.clone(),
shard_infos,
};

controller
.handle_local_shards_update(local_shards_update, &mut model, &progress)
.await
.unwrap();
}

#[tokio::test]
async fn test_ingest_controller_try_scale_up_shards() {
let mut mock_metastore = MockMetastoreService::new();
Expand Down
6 changes: 6 additions & 0 deletions quickwit/quickwit-control-plane/src/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,12 @@ impl ControlPlaneModel {
self.index_table.get(index_uid)
}

pub fn source_metadata(&self, source_uid: &SourceUid) -> Option<&SourceConfig> {
self.index_metadata(&source_uid.index_uid)?
.sources
.get(&source_uid.source_id)
}

fn update_metrics(&self) {
crate::metrics::CONTROL_PLANE_METRICS
.indexes_total
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ use tantivy::TantivyDocument as Document;
use super::field_mapping_entry::RAW_TOKENIZER_NAME;
use super::DefaultDocMapperBuilder;
use crate::default_doc_mapper::mapping_tree::{
build_mapping_tree, map_primitive_json_to_tantivy, JsonValueIterator, MappingNode,
MappingNodeRoot,
build_field_path_from_str, build_mapping_tree, map_primitive_json_to_tantivy,
JsonValueIterator, MappingNode, MappingNodeRoot,
};
use crate::default_doc_mapper::FieldMappingType;
use crate::doc_mapper::{JsonObject, Partition};
Expand Down Expand Up @@ -81,6 +81,8 @@ pub struct DefaultDocMapper {
default_search_field_names: Vec<String>,
/// Timestamp field name.
timestamp_field_name: Option<String>,
/// Timestamp field path (name parsed)
timestamp_field_path: Option<Vec<String>>,
/// Root node of the field mapping tree.
/// See [`MappingNode`].
field_mappings: MappingNode,
Expand Down Expand Up @@ -197,8 +199,12 @@ impl TryFrom<DefaultDocMapperBuilder> for DefaultDocMapper {
if !concatenate_dynamic_fields.is_empty() && dynamic_field.is_none() {
bail!("concatenate field has `include_dynamic_fields` set, but index isn't dynamic");
}
if let Some(timestamp_field_path) = &doc_mapping.timestamp_field {
validate_timestamp_field(timestamp_field_path, &field_mappings)?;
let timestamp_field_path = if let Some(timestamp_field_name) = &doc_mapping.timestamp_field
{
validate_timestamp_field(timestamp_field_name, &field_mappings)?;
Some(build_field_path_from_str(timestamp_field_name))
} else {
None
};
let schema = schema_builder.build();

Expand Down Expand Up @@ -288,6 +294,7 @@ impl TryFrom<DefaultDocMapperBuilder> for DefaultDocMapper {
document_size_field,
default_search_field_names,
timestamp_field_name: doc_mapping.timestamp_field,
timestamp_field_path,
field_mappings,
concatenate_dynamic_fields,
tag_field_names,
Expand Down Expand Up @@ -513,6 +520,32 @@ impl DocMapper for DefaultDocMapper {
let mut field_path = Vec::new();
self.field_mappings
.validate_from_json(json_obj, is_strict, &mut field_path)?;
if let Some(timestamp_field_path) = &self.timestamp_field_path {
let missing_ts_field =
|| DocParsingError::RequiredField("timestamp field is required".to_string());
match &timestamp_field_path[..] {
[] => (), // ?
[single_part] => {
let obj = json_obj.get(single_part).ok_or_else(missing_ts_field)?;
if !(obj.is_string() || obj.is_number()) {
return Err(missing_ts_field());
}
}
[first_part, more_part @ ..] => {
let mut obj = json_obj.get(first_part).ok_or_else(missing_ts_field)?;
for part in more_part {
obj = obj
.as_object()
.ok_or_else(missing_ts_field)?
.get(part)
.ok_or_else(missing_ts_field)?;
}
if !(obj.is_string() || obj.is_number()) {
return Err(missing_ts_field());
}
}
};
}
Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1410,7 +1410,7 @@ fn field_name_for_field_path(field_path: &[&str]) -> String {
/// starting from the root of the document.
/// Dots '.' define the boundaries between field names.
/// If a dot is part of a field name, it must be escaped with '\'.
fn build_field_path_from_str(field_path_as_str: &str) -> Vec<String> {
pub(crate) fn build_field_path_from_str(field_path_as_str: &str) -> Vec<String> {
let mut field_path = Vec::new();
let mut current_path_fragment = String::new();
let mut escaped = false;
Expand Down
Loading

0 comments on commit 707f22a

Please sign in to comment.