Skip to content

Commit

Permalink
revisit parsing of configuration (#3691)
Browse files Browse the repository at this point in the history
* initial work on serde_multikey

* parse DocMapping with serde_multikey

* parse field config with serde_multikey

* add more log level to kafka and always parse timezone from transform

* use serde_multikey in DefaultDocMapper
  • Loading branch information
trinity-1686a authored Aug 3, 2023
1 parent b60943d commit 06ac446
Show file tree
Hide file tree
Showing 24 changed files with 919 additions and 215 deletions.
1 change: 1 addition & 0 deletions docs/configuration/index-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ fast:
| ------------- | ------------- | ------------- |
| `description` | Optional description for the field. | `None` |
| `stored` | Whether value is stored in the document store | `true` |
| `indexed` | Whether value should be indexed so it can be searched | `true` |
| `tokenizer` | Name of the `Tokenizer`. ([See tokenizers](#description-of-available-tokenizers)) for a list of available tokenizers. | `default` |
| `record` | Describes the amount of information indexed, choices between `basic`, `freq` and `position` | `basic` |
| `fieldnorms` | Whether to store fieldnorms for the field. Fieldnorms are required to calculate the BM25 Score of the document. | `false` |
Expand Down
4 changes: 4 additions & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ sqlx = { version = "0.7", features = [
"migrate",
"time",
] }
syn = "2.0.11"
syn = { version = "2.0.11", features = [ "extra-traits", "full", "parsing" ]}
sync_wrapper = "0.1.2"
tabled = { version = "0.8", features = ["color"] }
tempfile = "3"
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ vrl-stdlib = { workspace = true, optional=true }

quickwit-common = { workspace = true }
quickwit-doc-mapper = { workspace = true }
quickwit-macros = { workspace = true }

[dev-dependencies]
tokio = { workspace = true }
Expand Down
33 changes: 21 additions & 12 deletions quickwit/quickwit-config/src/index_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use cron::Schedule;
use humantime::parse_duration;
use quickwit_common::uri::Uri;
use quickwit_doc_mapper::{
DefaultDocMapper, DefaultDocMapperBuilder, DocMapper, FieldMappingEntry, ModeType,
DefaultDocMapper, DefaultDocMapperBuilder, DocMapper, FieldMappingEntry, Mode, ModeType,
QuickwitJsonOptions, TokenizerEntry,
};
use serde::{Deserialize, Serialize};
Expand All @@ -44,9 +44,10 @@ use crate::TestableForRegression;

// Note(fmassot): `DocMapping` is a struct only used for
// serialization/deserialization of `DocMapper` parameters.
// This is partly a duplicate of the `DocMapper` and can
// be viewed as a temporary hack for 0.2 release before
// This is partly a duplicate of the `DefaultDocMapper` and
// can be viewed as a temporary hack for 0.2 release before
// refactoring.
#[quickwit_macros::serde_multikey]
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, utoipa::ToSchema)]
#[serde(deny_unknown_fields)]
pub struct DocMapping {
Expand All @@ -66,10 +67,17 @@ pub struct DocMapping {
pub store_source: bool,
#[serde(default)]
pub timestamp_field: Option<String>,
#[serde(default)]
pub mode: ModeType,
#[serde(skip_serializing_if = "Option::is_none")]
pub dynamic_mapping: Option<QuickwitJsonOptions>,
#[serde_multikey(
deserializer = Mode::from_parts,
serializer = Mode::into_parts,
fields = (
#[serde(default)]
mode: ModeType,
#[serde(skip_serializing_if = "Option::is_none")]
dynamic_mapping: Option<QuickwitJsonOptions>
),
)]
pub mode: Mode,
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub partition_key: Option<String>,
Expand Down Expand Up @@ -436,8 +444,7 @@ impl TestableForRegression for IndexConfig {
.map(|tag_field| tag_field.to_string())
.collect::<BTreeSet<String>>(),
store_source: true,
mode: ModeType::Dynamic,
dynamic_mapping: None,
mode: Mode::default(),
partition_key: Some("tenant_id".to_string()),
max_num_partitions: NonZeroU32::new(100).unwrap(),
timestamp_field: Some("timestamp".to_string()),
Expand Down Expand Up @@ -514,8 +521,7 @@ pub fn build_doc_mapper(
timestamp_field: doc_mapping.timestamp_field.clone(),
field_mappings: doc_mapping.field_mappings.clone(),
tag_fields: doc_mapping.tag_fields.iter().cloned().collect(),
mode: doc_mapping.mode,
dynamic_mapping: doc_mapping.dynamic_mapping.clone(),
mode: doc_mapping.mode.clone(),
partition_key: doc_mapping.partition_key.clone(),
max_num_partitions: doc_mapping.max_num_partitions,
tokenizers: doc_mapping.tokenizers.clone(),
Expand Down Expand Up @@ -713,7 +719,10 @@ mod tests {
&Uri::from_well_formed("s3://my-index"),
)
.unwrap();
assert_eq!(minimal_config.doc_mapping.mode, ModeType::Dynamic);
assert_eq!(
minimal_config.doc_mapping.mode.mode_type(),
ModeType::Dynamic
);
}

#[test]
Expand Down
45 changes: 24 additions & 21 deletions quickwit/quickwit-config/src/source_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ impl TestableForRegression for SourceConfig {
}),
transform_config: Some(TransformConfig {
vrl_script: ".message = downcase(string!(.message))".to_string(),
timezone_opt: None,
timezone: default_timezone(),
}),
input_format: SourceInputFormat::Json,
}
Expand Down Expand Up @@ -413,9 +413,12 @@ pub struct TransformConfig {

/// Timezone used in the VRL [`Program`](vrl::compiler::Program) for date and time
/// manipulations. Defaults to `UTC` if no timezone is specified.
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(rename = "timezone")]
timezone_opt: Option<String>,
#[serde(default = "default_timezone")]
timezone: String,
}

fn default_timezone() -> String {
"UTC".to_string()
}

impl TransformConfig {
Expand All @@ -424,7 +427,7 @@ impl TransformConfig {
pub fn new(vrl_script: String, timezone_opt: Option<String>) -> Self {
Self {
vrl_script,
timezone_opt,
timezone: timezone_opt.unwrap_or_else(default_timezone),
}
}

Expand All @@ -450,11 +453,11 @@ impl TransformConfig {
&self,
) -> anyhow::Result<(vrl::compiler::Program, vrl::compiler::TimeZone)> {
use anyhow::Context;
let timezone_str = self.timezone_opt.as_deref().unwrap_or("UTC");
let timezone = vrl::compiler::TimeZone::parse(timezone_str).with_context(|| {
let timezone = vrl::compiler::TimeZone::parse(&self.timezone).with_context(|| {
format!(
"Failed to parse timezone: `{timezone_str}`. Timezone must be a valid name \
in the TZ database: https://en.wikipedia.org/wiki/List_of_tz_database_time_zones"
"Failed to parse timezone: `{}`. Timezone must be a valid name \
in the TZ database: https://en.wikipedia.org/wiki/List_of_tz_database_time_zones",
self.timezone,
)
})?;
// Append "\n." to the script to return the entire document and not only the modified
Expand Down Expand Up @@ -487,7 +490,7 @@ impl TransformConfig {
pub fn for_test(vrl_script: &str) -> Self {
Self {
vrl_script: vrl_script.to_string(),
timezone_opt: None,
timezone: default_timezone(),
}
}
}
Expand Down Expand Up @@ -532,7 +535,7 @@ mod tests {
}),
transform_config: Some(TransformConfig {
vrl_script: ".message = downcase(string!(.message))".to_string(),
timezone_opt: Some("local".to_string()),
timezone: "local".to_string(),
}),
input_format: SourceInputFormat::Json,
};
Expand Down Expand Up @@ -628,7 +631,7 @@ mod tests {
}),
transform_config: Some(TransformConfig {
vrl_script: ".message = downcase(string!(.message))".to_string(),
timezone_opt: Some("local".to_string()),
timezone: "local".to_string(),
}),
input_format: SourceInputFormat::Json,
};
Expand Down Expand Up @@ -1032,7 +1035,7 @@ mod tests {
source_params: SourceParams::IngestApi,
transform_config: Some(TransformConfig {
vrl_script: ".message = downcase(string!(.message))".to_string(),
timezone_opt: None,
timezone: default_timezone(),
}),
input_format: SourceInputFormat::Json,
};
Expand All @@ -1045,7 +1048,7 @@ mod tests {
{
let transform_config = TransformConfig {
vrl_script: ".message = downcase(string!(.message))".to_string(),
timezone_opt: Some("local".to_string()),
timezone: "local".to_string(),
};
let transform_config_yaml = serde_yaml::to_string(&transform_config).unwrap();
assert_eq!(
Expand All @@ -1056,7 +1059,7 @@ mod tests {
{
let transform_config = TransformConfig {
vrl_script: ".message = downcase(string!(.message))".to_string(),
timezone_opt: None,
timezone: default_timezone(),
};
let transform_config_yaml = serde_yaml::to_string(&transform_config).unwrap();
assert_eq!(
Expand All @@ -1077,7 +1080,7 @@ mod tests {

let expected_transform_config = TransformConfig {
vrl_script: ".message = downcase(string!(.message))".to_string(),
timezone_opt: None,
timezone: default_timezone(),
};
assert_eq!(transform_config, expected_transform_config);
}
Expand All @@ -1091,7 +1094,7 @@ mod tests {

let expected_transform_config = TransformConfig {
vrl_script: ".message = downcase(string!(.message))".to_string(),
timezone_opt: Some("Turkey".to_string()),
timezone: "Turkey".to_string(),
};
assert_eq!(transform_config, expected_transform_config);
}
Expand All @@ -1103,7 +1106,7 @@ mod tests {
{
let transform_config = TransformConfig {
vrl_script: ".message = downcase(string!(.message))".to_string(),
timezone_opt: Some("Turkey".to_string()),
timezone: "Turkey".to_string(),
};
transform_config.compile_vrl_script().unwrap();
}
Expand All @@ -1116,22 +1119,22 @@ mod tests {
.message = downcase(string!(.message))
"#
.to_string(),
timezone_opt: None,
timezone: default_timezone(),
};
transform_config.compile_vrl_script().unwrap();
}
{
let transform_config = TransformConfig {
vrl_script: ".message = downcase(string!(.message))".to_string(),
timezone_opt: Some("foo".to_string()),
timezone: "foo".to_string(),
};
let error = transform_config.compile_vrl_script().unwrap_err();
assert!(error.to_string().starts_with("Failed to parse timezone"));
}
{
let transform_config = TransformConfig {
vrl_script: "foo".to_string(),
timezone_opt: Some("Turkey".to_string()),
timezone: "Turkey".to_string(),
};
let error = transform_config.compile_vrl_script().unwrap_err();
assert!(error.to_string().starts_with("Failed to compile"));
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-doc-mapper/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ typetag = { workspace = true }
utoipa = { workspace = true }

quickwit-datetime = { workspace = true }
quickwit-macros = { workspace = true }
quickwit-query = { workspace = true }

[dev-dependencies]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,29 +39,10 @@ use crate::doc_mapper::{JsonObject, Partition};
use crate::query_builder::build_query;
use crate::routing_expression::RoutingExpr;
use crate::{
Cardinality, DocMapper, DocParsingError, ModeType, QueryParserError, TokenizerEntry,
WarmupInfo, DYNAMIC_FIELD_NAME, SOURCE_FIELD_NAME,
Cardinality, DocMapper, DocParsingError, Mode, QueryParserError, TokenizerEntry, WarmupInfo,
DYNAMIC_FIELD_NAME, SOURCE_FIELD_NAME,
};

/// Defines how an unmapped field should be handled.
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub(crate) enum Mode {
#[default]
Lenient,
Strict,
Dynamic(QuickwitJsonOptions),
}

impl Mode {
pub fn mode_type(&self) -> ModeType {
match self {
Mode::Lenient => ModeType::Lenient,
Mode::Strict => ModeType::Strict,
Mode::Dynamic(_) => ModeType::Dynamic,
}
}
}

/// Default [`DocMapper`] implementation
/// which defines a set of rules to map json fields
/// to tantivy index fields.
Expand Down Expand Up @@ -149,7 +130,6 @@ impl TryFrom<DefaultDocMapperBuilder> for DefaultDocMapper {
type Error = anyhow::Error;

fn try_from(builder: DefaultDocMapperBuilder) -> anyhow::Result<DefaultDocMapper> {
let mode = builder.mode()?;
let mut schema_builder = Schema::builder();
let field_mappings = build_mapping_tree(&builder.field_mappings, &mut schema_builder)?;
let source_field = if builder.store_source {
Expand All @@ -162,7 +142,7 @@ impl TryFrom<DefaultDocMapperBuilder> for DefaultDocMapper {
validate_timestamp_field(timestamp_field_path, &field_mappings)?;
};

let dynamic_field = if let Mode::Dynamic(json_options) = &mode {
let dynamic_field = if let Mode::Dynamic(json_options) = &builder.mode {
Some(schema_builder.add_json_field(DYNAMIC_FIELD_NAME, json_options.clone()))
} else {
None
Expand Down Expand Up @@ -255,7 +235,7 @@ impl TryFrom<DefaultDocMapperBuilder> for DefaultDocMapper {
required_fields,
partition_key,
max_num_partitions: builder.max_num_partitions,
mode,
mode: builder.mode,
tokenizer_entries: builder.tokenizers,
tokenizer_manager,
})
Expand Down Expand Up @@ -338,11 +318,6 @@ fn validate_fields_tokenizers(

impl From<DefaultDocMapper> for DefaultDocMapperBuilder {
fn from(default_doc_mapper: DefaultDocMapper) -> Self {
let mode = default_doc_mapper.mode.mode_type();
let dynamic_mapping: Option<QuickwitJsonOptions> = match &default_doc_mapper.mode {
Mode::Dynamic(mapping_options) => Some(mapping_options.clone()),
_ => None,
};
let partition_key_str = default_doc_mapper.partition_key.to_string();
let partition_key_opt: Option<String> = if partition_key_str.is_empty() {
None
Expand All @@ -357,8 +332,7 @@ impl From<DefaultDocMapper> for DefaultDocMapperBuilder {
field_mappings: default_doc_mapper.field_mappings.into(),
tag_fields: default_doc_mapper.tag_field_names.into_iter().collect(),
default_search_fields: default_doc_mapper.default_search_field_names,
mode,
dynamic_mapping,
mode: default_doc_mapper.mode,
partition_key: partition_key_opt,
max_num_partitions: default_doc_mapper.max_num_partitions,
tokenizers: default_doc_mapper.tokenizer_entries,
Expand Down Expand Up @@ -1545,8 +1519,8 @@ mod tests {
.unwrap();
match &field_mapping_type {
super::FieldMappingType::Text(options, _) => {
assert!(options.tokenizer.is_some());
let tokenizer = options.tokenizer.as_ref().unwrap();
assert!(options.indexing_options.is_some());
let tokenizer = &options.indexing_options.as_ref().unwrap().tokenizer;
assert_eq!(tokenizer.name(), "my_tokenizer");
}
_ => panic!("Expected a text field"),
Expand Down
Loading

0 comments on commit 06ac446

Please sign in to comment.