From f5fdc071d300bd6616c5dcc64a78dd7a8f4b78d0 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Tue, 20 Aug 2024 14:47:09 +0200 Subject: [PATCH] fix conversion from old to new mapping for json and object (#5281) * fix conversion from old to new mapping for json and object * add more unit tests --- .../src/default_doc_mapper/default_mapper.rs | 137 ++++++++- .../src/default_doc_mapper/mapping_tree.rs | 267 +++++++++++++++++- .../src/routing_expression/mod.rs | 1 + .../src/actors/doc_processor.rs | 6 +- .../tests/update_tests/doc_mapping_tests.rs | 50 +++- 5 files changed, 452 insertions(+), 9 deletions(-) diff --git a/quickwit/quickwit-doc-mapper/src/default_doc_mapper/default_mapper.rs b/quickwit/quickwit-doc-mapper/src/default_doc_mapper/default_mapper.rs index 6f05700fd92..f94cb39340d 100644 --- a/quickwit/quickwit-doc-mapper/src/default_doc_mapper/default_mapper.rs +++ b/quickwit/quickwit-doc-mapper/src/default_doc_mapper/default_mapper.rs @@ -620,13 +620,52 @@ impl DocMapper for DefaultDocMapper { let mut field_path: Vec<&str> = Vec::new(); self.field_mappings .populate_json(&mut named_doc, &mut field_path, &mut doc_json); - if let Some(source_json) = extract_single_obj(&mut named_doc, SOURCE_FIELD_NAME)? { doc_json.insert( SOURCE_FIELD_NAME.to_string(), JsonValue::Object(source_json), ); } + if matches!( + self.mode, + Mode::Dynamic(ref opt) if opt.stored + ) { + // if we are in dynamic mode and there are other fields left, we should print them. 
+ // They probably come from older schemas when these fields had a dedicated entry + 'field: for (key, mut value) in named_doc { + if key.starts_with('_') { + // this is an internal field, not meant to be shown + continue 'field; + } + let Ok(path) = crate::routing_expression::parse_field_name(&key) else { + continue 'field; + }; + let Some((last_segment, path)) = path.split_last() else { + continue 'field; + }; + let mut map = &mut doc_json; + for segment in path { + let obj = if map.contains_key(&**segment) { + // we have to do this strange dance to please the borrowchecker + map.get_mut(&**segment).unwrap() + } else { + map.insert(segment.to_string(), serde_json::Map::new().into()); + map.get_mut(&**segment).unwrap() + }; + let JsonValue::Object(ref mut inner_map) = obj else { + continue 'field; + }; + map = inner_map; + } + map.entry(&**last_segment).or_insert_with(|| { + if value.len() == 1 { + tantivy_value_to_json(value.pop().unwrap()) + } else { + JsonValue::Array(value.into_iter().map(tantivy_value_to_json).collect()) + } + }); + } + } Ok(doc_json) } @@ -2353,4 +2392,100 @@ mod tests { ); } } + + #[test] + fn test_deserialize_doc_after_mapping_change_json_to_obj() { + use serde::Deserialize; + use tantivy::Document; + + let old_mapper = json!({ + "field_mappings": [ + {"name": "body", "type": "json"} + ] + }); + + let builder = DefaultDocMapperBuilder::deserialize(old_mapper.clone()).unwrap(); + let old_mapper = builder.try_build().unwrap(); + + let JsonValue::Object(doc) = json!({ + "body": { + "field.1": "hola", + "field2": { + "key": "val", + "arr": [1,"abc", {"k": "v"}], + }, + "field3": ["a", "b"] + } + }) else { + panic!(); + }; + let tantivy_doc = old_mapper.doc_from_json_obj(doc.clone(), 0).unwrap().1; + let named_doc = tantivy_doc.to_named_doc(&old_mapper.schema()); + + let new_mapper = json!({ + "field_mappings": [ + { + "name": "body", + "type": "object", + "field_mappings": [ + {"name": "field.1", "type": "text"}, + {"name": "field2", "type": 
"json"}, + {"name": "field3", "type": "array"}, + ] + } + ] + }); + let builder = DefaultDocMapperBuilder::deserialize(new_mapper).unwrap(); + let new_mapper = builder.try_build().unwrap(); + + assert_eq!(new_mapper.doc_to_json(named_doc.0).unwrap(), doc); + } + + #[test] + fn test_deserialize_doc_after_mapping_change_obj_to_json() { + use serde::Deserialize; + use tantivy::Document; + + let old_mapper = json!({ + "field_mappings": [ + { + "name": "body", + "type": "object", + "field_mappings": [ + {"name": "field.1", "type": "text"}, + {"name": "field2", "type": "json"}, + {"name": "field3", "type": "array"}, + ] + } + ] + }); + + let builder = DefaultDocMapperBuilder::deserialize(old_mapper.clone()).unwrap(); + let old_mapper = builder.try_build().unwrap(); + + let JsonValue::Object(doc) = json!({ + "body": { + "field.1": "hola", + "field2": { + "key": "val", + "arr": [1,"abc", {"k": "v"}], + }, + "field3": ["a", "b"] + } + }) else { + panic!(); + }; + let tantivy_doc = old_mapper.doc_from_json_obj(doc.clone(), 0).unwrap().1; + let named_doc = tantivy_doc.to_named_doc(&old_mapper.schema()); + + let new_mapper = json!({ + "field_mappings": [ + {"name": "body", "type": "json"} + ] + }); + let builder = DefaultDocMapperBuilder::deserialize(new_mapper).unwrap(); + let new_mapper = builder.try_build().unwrap(); + + assert_eq!(new_mapper.doc_to_json(named_doc.0).unwrap(), doc); + } } diff --git a/quickwit/quickwit-doc-mapper/src/default_doc_mapper/mapping_tree.rs b/quickwit/quickwit-doc-mapper/src/default_doc_mapper/mapping_tree.rs index 920e1cc0c55..204b124ee8c 100644 --- a/quickwit/quickwit-doc-mapper/src/default_doc_mapper/mapping_tree.rs +++ b/quickwit/quickwit-doc-mapper/src/default_doc_mapper/mapping_tree.rs @@ -450,8 +450,52 @@ fn extract_json_val( field_path: &[&str], cardinality: Cardinality, ) -> Option { - let full_path = field_path.join("."); - let vals = named_doc.remove(&full_path)?; + let mut full_path = field_path.join("."); + let vals: Vec = if let 
Some(vals) = named_doc.remove(&full_path) { + // we have our value directly + vals + } else { + let mut end_range = full_path.clone(); + full_path.push('.'); + // '/' is the character directly after . lexicographically + end_range.push('/'); + + // TODO use BTreeMap::drain once it exists and is stable + let matches = named_doc + .range::(&full_path..&end_range) + .map(|(k, _)| k.clone()) + .collect::>(); + + if !matches.is_empty() { + let mut map = Vec::new(); + for match_ in matches { + let Some(suffix) = match_.strip_prefix(&full_path) else { + // this should never happen + continue; + }; + let Some(tantivy_values) = named_doc.remove(&match_) else { + continue; + }; + + add_key_to_vec_map(&mut map, suffix, tantivy_values); + } + vec![TantivyValue::Object(map)] + } else { + // we didn't find our value, or any child of it, but maybe what we search is actually a + // json field closer to the root? + let mut split_point_iter = (1..(field_path.len())).rev(); + loop { + let split_point = split_point_iter.next()?; + let (doc_path, json_path) = field_path.split_at(split_point); + let prefix_path = doc_path.join("."); + if let Some(vals) = named_doc.get_mut(&prefix_path) { + // if we found a possible json field, there is no point in searching higher, our + // result would have been in it. + break extract_val_from_tantivy_val(json_path, vals); + } + } + } + }; let mut vals_with_correct_type_it = vals .into_iter() .flat_map(|value| value_to_json(value, leaf_type)); @@ -461,6 +505,109 @@ fn extract_json_val( } } +/// extract a subfield from a TantivyValue. 
The path must be non-empty +fn extract_val_from_tantivy_val( + full_path: &[&str], + tantivy_values: &mut [TantivyValue], +) -> Vec { + // return *objects* matching path + fn extract_val_aux<'a>( + path: &[&str], + tantivy_values: &'a mut [TantivyValue], + ) -> Vec<&'a mut Vec<(String, TantivyValue)>> { + let mut maps: Vec<&'a mut Vec<(String, TantivyValue)>> = tantivy_values + .iter_mut() + .filter_map(|value| { + if let TantivyValue::Object(map) = value { + Some(map) + } else { + None + } + }) + .collect(); + let mut scratch_buffer = Vec::new(); + for path_segment in path { + scratch_buffer.extend( + maps.drain(..) + .flatten() + .filter(|(key, _)| key == path_segment) + .filter_map(|(_, value)| { + if let TantivyValue::Object(map) = value { + Some(map) + } else { + None + } + }), + ); + std::mem::swap(&mut maps, &mut scratch_buffer); + } + maps + } + + let Some((last_segment, path)) = full_path.split_last() else { + return Vec::new(); + }; + + let mut results = Vec::new(); + for object in extract_val_aux(path, tantivy_values) { + // TODO use extract_if once it's stable + let mut i = 0; + while i < object.len() { + if object[i].0 == *last_segment { + let (_, val) = object.swap_remove(i); + match val { + TantivyValue::Array(mut vals) => results.append(&mut vals), + _ => results.push(val), + } + } else { + i += 1; + } + } + } + + results +} + +fn add_key_to_vec_map( + mut map: &mut Vec<(String, TantivyValue)>, + suffix: &str, + mut tantivy_value: Vec, +) { + let Ok(full_inner_path) = crate::routing_expression::parse_field_name(suffix) else { + return; + }; + let Some((last_segment, inner_path)) = full_inner_path.split_last() else { + return; + }; + for path_segment in inner_path { + // there is a cleaner way with find(), but the borrow checker is unhappy for no real reason + // thinking there are lifetime issues between two exclusive branches + map = if let Some(pos) = map.iter().position(|(key, _)| key == path_segment) { + if let (_, TantivyValue::Object(ref mut 
value)) = map[pos] { + value + } else { + // there is already a key before the end of the path ?! + return; + } + } else { + map.push((path_segment.to_string(), TantivyValue::Object(Vec::new()))); + let TantivyValue::Object(ref mut new_map) = map.last_mut().unwrap().1 else { + unreachable!(); + }; + new_map + } + } + // if we are here the doc mapping was changed from obj to json. We don't really know if the + // field of that obj was multivalued or not. As a best effort, we say it was multivalued + // if we have !=1 value. We could always return a vec, but then *every* field would be + // transformed into an array of itself. + if tantivy_value.len() == 1 { + map.push((last_segment.to_string(), tantivy_value.pop().unwrap())); + } else { + map.push((last_segment.to_string(), TantivyValue::Array(tantivy_value))); + } +} + fn value_to_string(value: TantivyValue) -> Result { match value { TantivyValue::Str(s) => return Ok(JsonValue::String(s)), @@ -1436,7 +1583,10 @@ mod tests { use time::macros::datetime; use time::OffsetDateTime; - use super::{value_to_json, JsonValueIterator, LeafType, MapOrArrayIter, MappingLeaf}; + use super::{ + add_key_to_vec_map, extract_val_from_tantivy_val, value_to_json, JsonValueIterator, + LeafType, MapOrArrayIter, MappingLeaf, + }; use crate::default_doc_mapper::date_time_type::QuickwitDateTimeOptions; use crate::default_doc_mapper::field_mapping_entry::{ BinaryFormat, NumericOutputFormat, QuickwitBoolOptions, QuickwitBytesOptions, @@ -2093,4 +2243,115 @@ mod tests { vec![json!(1), json!(2), json!(3), json!(4)] ); } + + #[test] + fn test_extract_val_from_tantivy_val() { + let obj = TantivyValue::Object; + fn array(val: impl IntoIterator>) -> TantivyValue { + TantivyValue::Array(val.into_iter().map(Into::into).collect()) + } + + let mut sample = vec![obj(vec![ + ( + "some".to_string(), + obj(vec![ + ( + "path".to_string(), + obj(vec![("with.dots".to_string(), 1u64.into())]), + ), + ( + "other".to_string(), + obj(vec![("path".to_string(), 
array([2u64, 3]))]), + ), + ]), + ), + ("short".to_string(), 4u64.into()), + ])]; + + assert_eq!( + extract_val_from_tantivy_val(&["some", "other"], &mut sample), + vec![obj(vec![("path".to_string(), array([2u64, 3]))])] + ); + assert_eq!( + extract_val_from_tantivy_val(&["some", "other"], &mut sample), + Vec::new() + ); + assert_eq!( + extract_val_from_tantivy_val(&["some", "path", "with.dots"], &mut sample), + vec![1u64.into()] + ); + assert_eq!( + extract_val_from_tantivy_val(&["some", "path", "with.dots"], &mut sample), + Vec::new() + ); + assert_eq!( + extract_val_from_tantivy_val(&["short"], &mut sample), + vec![4u64.into()] + ); + assert_eq!( + extract_val_from_tantivy_val(&["short"], &mut sample), + Vec::new() + ); + } + + #[test] + fn test_add_key_to_vec_map() { + let obj = TantivyValue::Object; + fn array(val: impl IntoIterator>) -> TantivyValue { + TantivyValue::Array(val.into_iter().map(Into::into).collect()) + } + + let mut map = Vec::new(); + + add_key_to_vec_map(&mut map, "some.path.with\\.dots", vec![1u64.into()]); + assert_eq!( + map, + &[( + "some".to_string(), + obj(vec![( + "path".to_string(), + obj(vec![("with.dots".to_string(), 1u64.into())]) + )]) + )] + ); + + add_key_to_vec_map(&mut map, "some.other.path", vec![2u64.into(), 3u64.into()]); + assert_eq!( + map, + &[( + "some".to_string(), + obj(vec![ + ( + "path".to_string(), + obj(vec![("with.dots".to_string(), 1u64.into())]) + ), + ( + "other".to_string(), + obj(vec![("path".to_string(), array([2u64, 3]))]) + ), + ]) + )] + ); + + add_key_to_vec_map(&mut map, "short", vec![4u64.into()]); + assert_eq!( + map, + &[ + ( + "some".to_string(), + obj(vec![ + ( + "path".to_string(), + obj(vec![("with.dots".to_string(), 1u64.into())]) + ), + ( + "other".to_string(), + obj(vec![("path".to_string(), array([2u64, 3]))]) + ), + ]) + ), + ("short".to_string(), 4u64.into()) + ] + ); + } } diff --git a/quickwit/quickwit-doc-mapper/src/routing_expression/mod.rs 
b/quickwit/quickwit-doc-mapper/src/routing_expression/mod.rs index b72636d9beb..651d59b2ce6 100644 --- a/quickwit/quickwit-doc-mapper/src/routing_expression/mod.rs +++ b/quickwit/quickwit-doc-mapper/src/routing_expression/mod.rs @@ -23,6 +23,7 @@ use std::hash::{Hash, Hasher}; use std::str::FromStr; use std::sync::Arc; +pub(crate) use expression_dsl::parse_field_name; use serde_json::Value as JsonValue; use siphasher::sip::SipHasher; diff --git a/quickwit/quickwit-indexing/src/actors/doc_processor.rs b/quickwit/quickwit-indexing/src/actors/doc_processor.rs index 31d6f25efd6..a41576a686b 100644 --- a/quickwit/quickwit-indexing/src/actors/doc_processor.rs +++ b/quickwit/quickwit-indexing/src/actors/doc_processor.rs @@ -719,9 +719,9 @@ mod tests { }, "body": "happy", "response_date": "2021-12-19T16:39:59Z", - "response_payload": "YWJj", - "response_time": 2.0, - "timestamp": 1628837062 + "response_payload": "YWJj", + "response_time": 2.0, + "timestamp": 1628837062 }) ); universe.assert_quit().await; diff --git a/quickwit/quickwit-integration-tests/src/tests/update_tests/doc_mapping_tests.rs b/quickwit/quickwit-integration-tests/src/tests/update_tests/doc_mapping_tests.rs index 2df523d54c1..8d9725ab7f0 100644 --- a/quickwit/quickwit-integration-tests/src/tests/update_tests/doc_mapping_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/update_tests/doc_mapping_tests.rs @@ -229,7 +229,6 @@ async fn test_update_doc_mapping_json_to_text() { } #[tokio::test] -#[ignore] async fn test_update_doc_mapping_json_to_object() { let index_id = "update-json-to-object"; let original_doc_mappings = json!({ @@ -277,6 +276,54 @@ async fn test_update_doc_mapping_json_to_object() { .await; } +#[tokio::test] +async fn test_update_doc_mapping_object_to_json() { + let index_id = "update-json-to-object"; + let original_doc_mappings = json!({ + "field_mappings": [ + { + "name": "body", + "type": "object", + "field_mappings": [ + {"name": "field1", "type": "text"}, + {"name": "field2", 
"type": "text"}, + ] + } + ] + }); + let ingest_before_update = &[ + json!({"body": {"field1": "hello"}}), + json!({"body": {"field2": "world"}}), + ]; + let updated_doc_mappings = json!({ + "field_mappings": [ + {"name": "body", "type": "json"} + ] + }); + let ingest_after_update = &[ + json!({"body": {"field1": "hola"}}), + json!({"body": {"field2": "mundo"}}), + ]; + validate_search_across_doc_mapping_updates( + index_id, + original_doc_mappings, + ingest_before_update, + updated_doc_mappings, + ingest_after_update, + &[ + ( + "body.field1:hello", + Ok(&[json!({"body": {"field1": "hello"}})]), + ), + ( + "body.field1:hola", + Ok(&[json!({"body": {"field1": "hola"}})]), + ), + ], + ) + .await; +} + #[tokio::test] async fn test_update_doc_mapping_tokenizer_default_to_raw() { let index_id = "update-tokenizer-default-to-raw"; @@ -448,7 +495,6 @@ async fn test_update_doc_mapping_unindexed_to_indexed() { } #[tokio::test] -#[ignore] async fn test_update_doc_mapping_strict_to_dynamic() { let index_id = "update-strict-to-dynamic"; let original_doc_mappings = json!({