Skip to content

Commit

Permalink
Merge branch 'main' into trinity/allow-concat-default-field
Browse files Browse the repository at this point in the history
  • Loading branch information
trinity-1686a authored Aug 21, 2024
2 parents ca7276b + 108ad21 commit c9d4d7f
Show file tree
Hide file tree
Showing 16 changed files with 645 additions and 33 deletions.
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ test-all: docker-compose-up
test-failpoints:
@$(MAKE) -C $(QUICKWIT_SRC) test-failpoints

test-lambda: DOCKER_SERVICES=localstack
test-lambda: docker-compose-up
@$(MAKE) -C $(QUICKWIT_SRC) test-lambda

# This will build and push all custom cross images for cross-compilation.
# You will need to login into Docker Hub with the `quickwit` account.
IMAGE_TAGS = x86_64-unknown-linux-gnu aarch64-unknown-linux-gnu x86_64-unknown-linux-musl aarch64-unknown-linux-musl
Expand Down Expand Up @@ -104,4 +108,3 @@ build-rustdoc:
.PHONY: build-ui
build-ui:
$(MAKE) -C $(QUICKWIT_SRC) build-ui

8 changes: 8 additions & 0 deletions quickwit/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ test-all:
test-failpoints:
cargo nextest run --test failpoints --features fail/failpoints

test-lambda:
AWS_ACCESS_KEY_ID=ignored \
AWS_SECRET_ACCESS_KEY=ignored \
AWS_REGION=us-east-1 \
QW_S3_ENDPOINT=http://localhost:4566 \
QW_S3_FORCE_PATH_STYLE_ACCESS=1 \
cargo nextest run --all-features -p quickwit-lambda --retries 1

# TODO: to be replaced by https://github.com/quickwit-oss/quickwit/issues/237
TARGET ?= x86_64-unknown-linux-gnu
.PHONY: build
Expand Down
39 changes: 39 additions & 0 deletions quickwit/quickwit-cli/tests/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,45 @@ async fn test_ingest_docs_cli() {
));
}

#[tokio::test]
async fn test_reingest_same_file_cli() {
quickwit_common::setup_logging_for_tests();
let index_id = append_random_suffix("test-index-simple");
let test_env = create_test_env(index_id.clone(), TestStorageType::LocalFileSystem)
.await
.unwrap();
test_env.start_server().await.unwrap();
create_logs_index(&test_env).await.unwrap();
let index_uid = test_env.index_metadata().await.unwrap().index_uid;

for _ in 0..2 {
let args = LocalIngestDocsArgs {
config_uri: test_env.resource_files.config.clone(),
index_id: index_id.clone(),
input_path_opt: Some(test_env.resource_files.log_docs.clone()),
input_format: SourceInputFormat::Json,
overwrite: false,
clear_cache: true,
vrl_script: None,
};

local_ingest_docs_cli(args).await.unwrap();
}

let splits_metadata: Vec<SplitMetadata> = test_env
.metastore()
.await
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid).unwrap())
.await
.unwrap()
.collect_splits_metadata()
.await
.unwrap();

assert_eq!(splits_metadata.len(), 1);
assert_eq!(splits_metadata[0].num_docs, 5);
}

/// Helper function to compare a json payload.
///
/// It will serialize and deserialize the value in order
Expand Down
137 changes: 136 additions & 1 deletion quickwit/quickwit-doc-mapper/src/default_doc_mapper/default_mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -620,13 +620,52 @@ impl DocMapper for DefaultDocMapper {
let mut field_path: Vec<&str> = Vec::new();
self.field_mappings
.populate_json(&mut named_doc, &mut field_path, &mut doc_json);

if let Some(source_json) = extract_single_obj(&mut named_doc, SOURCE_FIELD_NAME)? {
doc_json.insert(
SOURCE_FIELD_NAME.to_string(),
JsonValue::Object(source_json),
);
}
if matches!(
self.mode,
Mode::Dynamic(ref opt) if opt.stored
) {
// if we are in dynamic mode and there are other fields lefts, we should print them.
// They probably come from older schemas when these fields had a dedicated entry
'field: for (key, mut value) in named_doc {
if key.starts_with('_') {
// this is an internal field, not meant to be shown
continue 'field;
}
let Ok(path) = crate::routing_expression::parse_field_name(&key) else {
continue 'field;
};
let Some((last_segment, path)) = path.split_last() else {
continue 'field;
};
let mut map = &mut doc_json;
for segment in path {
let obj = if map.contains_key(&**segment) {
// we have to do this strange dance to please the borrowchecker
map.get_mut(&**segment).unwrap()
} else {
map.insert(segment.to_string(), serde_json::Map::new().into());
map.get_mut(&**segment).unwrap()
};
let JsonValue::Object(ref mut inner_map) = obj else {
continue 'field;
};
map = inner_map;
}
map.entry(&**last_segment).or_insert_with(|| {
if value.len() == 1 {
tantivy_value_to_json(value.pop().unwrap())
} else {
JsonValue::Array(value.into_iter().map(tantivy_value_to_json).collect())
}
});
}
}

Ok(doc_json)
}
Expand Down Expand Up @@ -2375,4 +2414,100 @@ mod tests {
);
}
}

#[test]
fn test_deserialize_doc_after_mapping_change_json_to_obj() {
use serde::Deserialize;
use tantivy::Document;

let old_mapper = json!({
"field_mappings": [
{"name": "body", "type": "json"}
]
});

let builder = DefaultDocMapperBuilder::deserialize(old_mapper.clone()).unwrap();
let old_mapper = builder.try_build().unwrap();

let JsonValue::Object(doc) = json!({
"body": {
"field.1": "hola",
"field2": {
"key": "val",
"arr": [1,"abc", {"k": "v"}],
},
"field3": ["a", "b"]
}
}) else {
panic!();
};
let tantivy_doc = old_mapper.doc_from_json_obj(doc.clone(), 0).unwrap().1;
let named_doc = tantivy_doc.to_named_doc(&old_mapper.schema());

let new_mapper = json!({
"field_mappings": [
{
"name": "body",
"type": "object",
"field_mappings": [
{"name": "field.1", "type": "text"},
{"name": "field2", "type": "json"},
{"name": "field3", "type": "array<text>"},
]
}
]
});
let builder = DefaultDocMapperBuilder::deserialize(new_mapper).unwrap();
let new_mapper = builder.try_build().unwrap();

assert_eq!(new_mapper.doc_to_json(named_doc.0).unwrap(), doc);
}

#[test]
fn test_deserialize_doc_after_mapping_change_obj_to_json() {
use serde::Deserialize;
use tantivy::Document;

let old_mapper = json!({
"field_mappings": [
{
"name": "body",
"type": "object",
"field_mappings": [
{"name": "field.1", "type": "text"},
{"name": "field2", "type": "json"},
{"name": "field3", "type": "array<text>"},
]
}
]
});

let builder = DefaultDocMapperBuilder::deserialize(old_mapper.clone()).unwrap();
let old_mapper = builder.try_build().unwrap();

let JsonValue::Object(doc) = json!({
"body": {
"field.1": "hola",
"field2": {
"key": "val",
"arr": [1,"abc", {"k": "v"}],
},
"field3": ["a", "b"]
}
}) else {
panic!();
};
let tantivy_doc = old_mapper.doc_from_json_obj(doc.clone(), 0).unwrap().1;
let named_doc = tantivy_doc.to_named_doc(&old_mapper.schema());

let new_mapper = json!({
"field_mappings": [
{"name": "body", "type": "json"}
]
});
let builder = DefaultDocMapperBuilder::deserialize(new_mapper).unwrap();
let new_mapper = builder.try_build().unwrap();

assert_eq!(new_mapper.doc_to_json(named_doc.0).unwrap(), doc);
}
}
Loading

0 comments on commit c9d4d7f

Please sign in to comment.