diff --git a/config/tutorials/hdfs-logs/index-config-partitioned.yaml b/config/tutorials/hdfs-logs/index-config-partitioned.yaml index 5a40a9f6764..95c53d29423 100644 --- a/config/tutorials/hdfs-logs/index-config-partitioned.yaml +++ b/config/tutorials/hdfs-logs/index-config-partitioned.yaml @@ -43,5 +43,3 @@ indexing_settings: merge_factor: 10 max_merge_ops: 3 maturation_period: 48 hours - resources: - max_merge_write_throughput: 100mb diff --git a/docs/reference/cli.md b/docs/reference/cli.md index 651e162013d..e97e9effb59 100644 --- a/docs/reference/cli.md +++ b/docs/reference/cli.md @@ -182,39 +182,15 @@ quickwit index create --endpoint=http://127.0.0.1:7280 --index-config wikipedia_ ### index update +Updates an index using an index config file. `quickwit index update [args]` -#### index update search-settings - -Updates default search settings. -`quickwit index update search-settings [args]` *Synopsis* ```bash -quickwit index update search-settings +quickwit index update --index - --default-search-fields -``` - -*Options* - -| Option | Description | -|-----------------|-------------| -| `--index` | ID of the target index | -| `--default-search-fields` | List of fields that Quickwit will search into if the user query does not explicitly target a field. Space-separated list, e.g. "field1 field2". If no value is provided, existing defaults are removed and queries without target field will fail. | -#### index update retention-policy - -Configure or disable the retention policy. -`quickwit index update retention-policy [args]` - -*Synopsis* - -```bash -quickwit index update retention-policy - --index - [--period ] - [--schedule ] - [--disable] + --index-config ``` *Options* @@ -222,9 +198,7 @@ quickwit index update retention-policy | Option | Description | |-----------------|-------------| | `--index` | ID of the target index | -| `--period` | Duration after which splits are dropped. Expressed in a human-readable way (`1 day`, `2 hours`, `1 week`, ...) 
| -| `--schedule` | Frequency at which the retention policy is evaluated and applied. Expressed as a cron expression (0 0 * * * *) or human-readable form (hourly, daily, weekly, ...). | -| `--disable` | Disable the retention policy. Old indexed data will not be cleaned up anymore. | +| `--index-config` | Location of the index config file. | ### index clear Clears an index: deletes all splits and resets checkpoint. @@ -381,7 +355,7 @@ quickwit index ingest | `--batch-size-limit` | Size limit of each submitted document batch. | | `--wait` | Wait for all documents to be commited and available for search before exiting | | `--force` | Force a commit after the last document is sent, and wait for all documents to be committed and available for search before exiting | -| `--commit-timeout` | Timeout for ingest operations that require waiting for the final commit (`--wait` or `--force`). This is different from the `commit_timeout_secs` indexing setting which sets the maximum time before commiting splits after their creation. | +| `--commit-timeout` | Timeout for ingest operations that require waiting for the final commit (`--wait` or `--force`). This is different from the `commit_timeout_secs` indexing setting, which sets the maximum time before committing splits after their creation. | *Examples* diff --git a/docs/reference/rest-api.md b/docs/reference/rest-api.md index d0606b74372..7835b1b1d8a 100644 --- a/docs/reference/rest-api.md +++ b/docs/reference/rest-api.md @@ -58,18 +58,18 @@ POST api/v1//search #### Parameters -| Variable | Type | Description | Default value | -|---------------------|------------|--------------------------------------------------------------------------------------------------------------------------------------------------------|----------------------------------------------------| -| `query` | `String` | Query text. 
See the [query language doc](query-language.md) (mandatory) | | -| `start_timestamp` | `i64` | If set, restrict search to documents with a `timestamp >= start_timestamp`, taking advantage of potential time pruning oportunities. The value must be in seconds. | | -| `end_timestamp` | `i64` | If set, restrict search to documents with a `timestamp < end_timestamp`, taking advantage of potential time pruning oportunities. The value must be in seconds. | | -| `start_offset` | `Integer` | Number of documents to skip | `0` | -| `max_hits` | `Integer` | Maximum number of hits to return (by default 20) | `20` | -| `search_field` | `[String]` | Fields to search on if no field name is specified in the query. Comma-separated list, e.g. "field1,field2" | index_config.search_settings.default_search_fields | -| `snippet_fields` | `[String]` | Fields to extract snippet on. Comma-separated list, e.g. "field1,field2" | | -| `sort_by` | `[String]` | Fields to sort the query results on. You can sort by one or two fast fields or by BM25 `_score` (requires fieldnorms). By default, hits are sorted by their document ID. | | -| `format` | `Enum` | The output format. Allowed values are "json" or "pretty_json" | `pretty_json` | -| `aggs` | `JSON` | The aggregations request. See the [aggregations doc](aggregation.md) for supported aggregations. | | +| Variable | Type | Description | Default value | +|---------------------|------------|-----------------|-----------------| +| `query` | `String` | Query text. See the [query language doc](query-language.md) | _required_ | +| `start_timestamp` | `i64` | If set, restrict search to documents with a `timestamp >= start_timestamp`, taking advantage of potential time pruning opportunities. The value must be in seconds. | | +| `end_timestamp` | `i64` | If set, restrict search to documents with a `timestamp < end_timestamp`, taking advantage of potential time pruning opportunities. The value must be in seconds. 
| | +| `start_offset` | `Integer` | Number of documents to skip | `0` | +| `max_hits` | `Integer` | Maximum number of hits to return (by default 20) | `20` | +| `search_field` | `[String]` | Fields to search on if no field name is specified in the query. Comma-separated list, e.g. "field1,field2" | index_config.search_settings.default_search_fields | +| `snippet_fields` | `[String]` | Fields to extract snippet on. Comma-separated list, e.g. "field1,field2" | | +| `sort_by` | `[String]` | Fields to sort the query results on. You can sort by one or two fast fields or by BM25 `_score` (requires fieldnorms). By default, hits are sorted by their document ID. | | +| `format` | `Enum` | The output format. Allowed values are "json" or "pretty_json" | `pretty_json` | +| `aggs` | `JSON` | The aggregations request. See the [aggregations doc](aggregation.md) for supported aggregations. | | :::info The `start_timestamp` and `end_timestamp` should be specified in seconds regardless of the timestamp field precision. @@ -144,14 +144,14 @@ The endpoint will return 10 million values if 10 million documents match the que #### Get parameters -| Variable | Type | Description | Default value | -|---------------------|------------|------------------------------------------------------------------------------------------------------------------|----------------------------------------------------| -| `query` | `String` | Query text. See the [query language doc](query-language.md) (mandatory) | | -| `fast_field` | `String` | Name of a field to retrieve from documents. This field must be a fast field of type `i64` or `u64`. (mandatory) | | -| `search_field` | `[String]` | Fields to search on. Comma-separated list, e.g. "field1,field2" | index_config.search_settings.default_search_fields | -| `start_timestamp` | `i64` | If set, restrict search to documents with a `timestamp >= start_timestamp`. The value must be in seconds. 
| | -| `end_timestamp` | `i64` | If set, restrict search to documents with a `timestamp < end_timestamp`. The value must be in seconds. | | -| `partition_by_field` | `String` | If set, the endpoint returns chunks of data for each partition field value. This field must be a fast field of type `i64` or `u64`. | | +| Variable | Type | Description | Default value | +|---------------------|------------|----------------------------------------------------------------------------------------------------------|----------------------------------------------------| +| `query` | `String` | Query text. See the [query language doc](query-language.md) | _required_ | +| `fast_field` | `String` | Name of a field to retrieve from documents. This field must be a fast field of type `i64` or `u64`. | _required_ | +| `search_field` | `[String]` | Fields to search on. Comma-separated list, e.g. "field1,field2" | index_config.search_settings.default_search_fields | +| `start_timestamp` | `i64` | If set, restrict search to documents with a `timestamp >= start_timestamp`. The value must be in seconds. | | +| `end_timestamp` | `i64` | If set, restrict search to documents with a `timestamp < end_timestamp`. The value must be in seconds. | | +| `partition_by_field` | `String` | If set, the endpoint returns chunks of data for each partition field value. This field must be a fast field of type `i64` or `u64`. | | | `output_format` | `String` | Response output format. `csv` or `clickHouseRowBinary` | `csv` | :::info @@ -223,16 +223,16 @@ The response is a JSON object, and the content type is `application/json; charse POST api/v1/indexes ``` -Create an index by posting an `IndexConfig` payload. The API accepts JSON with `content-type: application/json` and YAML `content-type: application/yaml`. +Create an index by posting an `IndexConfig` payload. The API accepts JSON with `content-type: application/json` and YAML with `content-type: application/yaml`. 
#### POST payload | Variable | Type | Description | Default value | |---------------------|--------------------|-----------------------------------------------------------------------------------------------------------------------|---------------------------------------| -| `version` | `String` | Config format version, use the same as your Quickwit version. (mandatory) | | -| `index_id` | `String` | Index ID, see its [validation rules](../configuration/index-config.md#index-id) on identifiers. (mandatory) | | +| `version` | `String` | Config format version, use the same as your Quickwit version. | _required_ | +| `index_id` | `String` | Index ID, see its [validation rules](../configuration/index-config.md#index-id) on identifiers. | _required_ | | `index_uri` | `String` | Defines where the index files are stored. This parameter expects a [storage URI](../configuration/storage-config.md#storage-uris). | `{default_index_root_uri}/{index_id}` | -| `doc_mapping` | `DocMapping` | Doc mapping object as specified in the [index config docs](../configuration/index-config.md#doc-mapping) (mandatory) | | +| `doc_mapping` | `DocMapping` | Doc mapping object as specified in the [index config docs](../configuration/index-config.md#doc-mapping). | _required_ | | `indexing_settings` | `IndexingSettings` | Indexing settings object as specified in the [index config docs](../configuration/index-config.md#indexing-settings). | | | `search_settings` | `SearchSettings` | Search settings object as specified in the [index config docs](../configuration/index-config.md#search-settings). | | | `retention` | `Retention` | Retention policy object as specified in the [index config docs](../configuration/index-config.md#retention-policy). | | @@ -240,11 +240,11 @@ Create an index by posting an `IndexConfig` payload. 
The API accepts JSON with ` **Payload Example** -curl -XPOST http://0.0.0.0:8080/api/v1/indexes --data @index_config.json -H "Content-Type: application/json" +curl -XPOST http://localhost:7280/api/v1/indexes --data @index_config.json -H "Content-Type: application/json" ```json title="index_config.json { - "version": "0.7", + "version": "0.8", "index_id": "hdfs-logs", "doc_mapping": { "field_mappings": [ @@ -285,9 +285,6 @@ curl -XPOST http://0.0.0.0:8080/api/v1/indexes --data @index_config.json -H "Con "max_merge_ops": 3, "merge_factor": 10, "max_merge_factor": 12 - }, - "resources": { - "max_merge_write_throughput": "80mb" } }, "retention": { @@ -301,62 +298,105 @@ curl -XPOST http://0.0.0.0:8080/api/v1/indexes --data @index_config.json -H "Con The response is the index metadata of the created index, and the content type is `application/json; charset=UTF-8.` -| Field | Description | Type | -|----------------------|-----------------------------------------|:---------------------:| -| `index_config` | The posted index config. | `IndexConfig` | -| `checkpoint` | Map of checkpoints by source. | `IndexCheckpoint` | -| `create_timestamp` | Index creation timestamp | `number` | -| `sources` | List of the index sources configurations. | `Array` | +| Field | Description | Type | +|----------------------|-----------------------------------------------|:---------------------:| +| `version` | The current index configuration format version. | `string` | +| `index_uid` | The server-generated index UID. | `string` | +| `index_config` | The posted index config. | `IndexConfig` | +| `checkpoint` | Map of checkpoints by source. | `IndexCheckpoint` | +| `create_timestamp` | Index creation timestamp | `number` | +| `sources` | List of the index sources configurations. | `Array` | -### Update an index (search settings and retention policy only) +### Update an index ``` PUT api/v1/indexes/ ``` -Updates the search settings and retention policy of an index. 
This endpoint follows PUT semantics (not PATCH), which means that all the updatable fields of the index configuration are replaced by the values specified in this request. In particular, omitting an optional field like retention_policy will delete the associated configuration. Unlike the create endpoint, this API only accepts JSON payloads. +Updates the configuration of an index. This endpoint follows PUT semantics, which means that all the fields of the current configuration are replaced by the values specified in this request or the associated defaults. In particular, if the field is optional (e.g. `retention_policy`), omitting it will delete the associated configuration. If the new configuration file contains updates that cannot be applied, the request fails, and none of the updates are applied. The API accepts JSON with `content-type: application/json` and YAML with `content-type: application/yaml`. + +- The retention policy update is automatically picked up by the janitor service on its next state refresh. +- The search settings update is automatically picked up by searcher nodes when the next query is executed. +- The indexing settings update is not automatically picked up by the indexer nodes; they need to be manually restarted. #### PUT payload | Variable | Type | Description | Default value | |---------------------|--------------------|-----------------------------------------------------------------------------------------------------------------------|---------------------------------------| +| `version` | `String` | Config format version, use the same as your Quickwit version. | _required_ | +| `index_id` | `String` | Index ID, must be the same index as in the request URI. | _required_ | +| `index_uri` | `String` | Defines where the index files are stored. Cannot be updated. | `{current_index_uri}` | +| `doc_mapping` | `DocMapping` | Doc mapping object as specified in the [index config docs](../configuration/index-config.md#doc-mapping). 
Cannot be updated. | _required_ | +| `indexing_settings` | `IndexingSettings` | Indexing settings object as specified in the [index config docs](../configuration/index-config.md#indexing-settings). | | | `search_settings` | `SearchSettings` | Search settings object as specified in the [index config docs](../configuration/index-config.md#search-settings). | | | `retention` | `Retention` | Retention policy object as specified in the [index config docs](../configuration/index-config.md#retention-policy). | | **Payload Example** -curl -XPUT http://0.0.0.0:8080/api/v1/indexes --data @index_update.json -H "Content-Type: application/json" +curl -XPUT http://localhost:7280/api/v1/indexes/hdfs-logs --data @updated_index_update.json -H "Content-Type: application/json" -```json title="index_update.json +```json title="updated_index_update.json { + "version": "0.8", + "index_id": "hdfs-logs", + "doc_mapping": { + "field_mappings": [ + { + "name": "tenant_id", + "type": "u64", + "fast": true + }, + { + "name": "app_id", + "type": "u64", + "fast": true + }, + { + "name": "timestamp", + "type": "datetime", + "input_formats": ["unix_timestamp"], + "fast_precision": "seconds", + "fast": true + }, + { + "name": "body", + "type": "text", + "record": "position" + } + ], + "partition_key": "tenant_id", + "max_num_partitions": 200, + "tag_fields": ["tenant_id"], + "timestamp_field": "timestamp" + }, "search_settings": { "default_search_fields": ["body"] }, + "indexing_settings": { + "merge_policy": { + "type": "limit_merge", + "max_merge_ops": 3, + "merge_factor": 10, + "max_merge_factor": 12 + } + }, "retention": { - "period": "3 days", + "period": "30 days", "schedule": "@daily" } } ``` -:::warning -Calling the update endpoint with the following payload will remove the current retention policy. 
-```json -{ - "search_settings": { - "default_search_fields": ["body"] - } -} -``` - #### Response The response is the index metadata of the updated index, and the content type is `application/json; charset=UTF-8.` | Field | Description | Type | |----------------------|-----------------------------------------|:---------------------:| +| `version` | The current server configuration version. | `string` | +| `index_uid` | The server-generated index UID. | `string` | | `index_config` | The posted index config. | `IndexConfig` | | `checkpoint` | Map of checkpoints by source. | `IndexCheckpoint` | | `create_timestamp` | Index creation timestamp | `number` | @@ -377,6 +417,8 @@ The response is the index metadata of the requested index, and the content type | Field | Description | Type | |----------------------|-------------------------------------------|:---------------------:| +| `version` | The current server configuration version. | `string` | +| `index_uid` | The server-generated index UID. | `string` | | `index_config` | The posted index config. | `IndexConfig` | | `checkpoint` | Map of checkpoints by source. | `IndexCheckpoint` | | `create_timestamp` | Index creation timestamp. | `number` | @@ -543,22 +585,22 @@ Create source by posting a source config JSON payload. #### POST payload -| Variable | Type | Description | Default value | -|-------------------|----------|------------------------------------------------------------------------------------------------------|---------------| -| `version** | `String` | Config format version, put your current Quickwit version. (mandatory) | | -| `source_id` | `String` | Source ID. See ID [validation rules](../configuration/source-config.md)(mandatory) | | -| `source_type` | `String` | Source type: `kafka`, `kinesis` or `pulsar` (mandatory) | | -| `num_pipelines` | `usize` | Number of running indexing pipelines per node for this source. 
| 1 | -| `params` | `object` | Source parameters as defined in [source config docs](../configuration/source-config.md). (mandatory) | | +| Variable | Type | Description | Default value | +|-------------------|----------|----------------------------------------------------------------------------------------|---------------| +| `version` | `String` | Config format version, put your current Quickwit version. | _required_ | +| `source_id` | `String` | Source ID. See ID [validation rules](../configuration/source-config.md). | _required_ | +| `source_type` | `String` | Source type: `kafka`, `kinesis` or `pulsar`. | _required_ | +| `num_pipelines` | `usize` | Number of running indexing pipelines per node for this source. | 1 | +| `params` | `object` | Source parameters as defined in [source config docs](../configuration/source-config.md). | _required_ | **Payload Example** -curl -XPOST http://0.0.0.0:8080/api/v1/indexes/my-index/sources --data @source_config.json -H "Content-Type: application/json" +curl -XPOST http://localhost:7280/api/v1/indexes/my-index/sources --data @source_config.json -H "Content-Type: application/json" ```json title="source_config.json { - "version": "0.7", + "version": "0.8", "source_id": "kafka-source", "source_type": "kafka", "params": { @@ -647,9 +689,9 @@ The endpoint simply appends your delete task to the delete task queue in the met #### POST payload `DeleteQuery` -| Variable | Type | Description | Default value | -|---------------------|------------|-----------------------------------------------------------------------------------------------------------|----------------------------------------------------| -| `query` | `String` | Query text. 
See the [query language doc](query-language.md) (mandatory) | | +| Variable | Type | Description | Default value | +|---------------------|------------|---------------------------------------------------------------------------------------------------------|----------------------------------------------------| +| `query` | `String` | Query text. See the [query language doc](query-language.md) | _required_ | | `search_field` | `[String]` | Fields to search on. Comma-separated list, e.g. "field1,field2" | index_config.search_settings.default_search_fields | | `start_timestamp` | `i64` | If set, restrict search to documents with a `timestamp >= start_timestamp`. The value must be in seconds. | | | `end_timestamp` | `i64` | If set, restrict search to documents with a `timestamp < end_timestamp`. The value must be in seconds. | | diff --git a/quickwit/quickwit-cli/src/index/mod.rs b/quickwit/quickwit-cli/src/index.rs similarity index 94% rename from quickwit/quickwit-cli/src/index/mod.rs rename to quickwit/quickwit-cli/src/index.rs index efc253b9667..9028fa26c56 100644 --- a/quickwit/quickwit-cli/src/index/mod.rs +++ b/quickwit/quickwit-cli/src/index.rs @@ -57,13 +57,10 @@ use tabled::{Table, Tabled}; use thousands::Separable; use tracing::{debug, Level}; -use self::update::{build_index_update_command, IndexUpdateCliCommand}; use crate::checklist::GREEN_COLOR; use crate::stats::{mean, percentile, std_deviation}; use crate::{client_args, make_table, prompt_confirmation, ClientArgs, THROUGHPUT_WINDOW_SIZE}; -pub mod update; - pub fn build_index_command() -> Command { Command::new("index") .about("Manages indexes: creates, updates, deletes, ingests, searches, describes...") @@ -81,7 +78,18 @@ pub fn build_index_command() -> Command { ]) ) .subcommand( - build_index_update_command().display_order(2) + Command::new("update") + .display_order(1) + .about("Updates an index using an index config file.") + .long_about("This command follows PUT semantics, which means that all the 
fields of the current configuration are replaced by the values specified in this request or the associated defaults. In particular, if the field is optional (e.g. `retention_policy`), omitting it will delete the associated configuration. If the new configuration file contains updates that cannot be applied, the request fails, and none of the updates are applied.") + .args(&[ + arg!(--index "ID of the target index") + .display_order(1) + .required(true), + arg!(--"index-config" "Location of the index config file.") + .display_order(2) + .required(true), + ]) ) .subcommand( Command::new("clear") @@ -213,6 +221,14 @@ pub struct CreateIndexArgs { pub assume_yes: bool, } +#[derive(Debug, Eq, PartialEq)] +pub struct UpdateIndexArgs { + pub client_args: ClientArgs, + pub index_id: IndexId, + pub index_config_uri: Uri, + pub assume_yes: bool, +} + #[derive(Debug, Eq, PartialEq)] pub struct DescribeIndexArgs { pub client_args: ClientArgs, @@ -260,12 +276,12 @@ pub struct ListIndexesArgs { pub enum IndexCliCommand { Clear(ClearIndexArgs), Create(CreateIndexArgs), + Update(UpdateIndexArgs), Delete(DeleteIndexArgs), Describe(DescribeIndexArgs), Ingest(IngestDocsArgs), List(ListIndexesArgs), Search(SearchIndexArgs), - Update(IndexUpdateCliCommand), } impl IndexCliCommand { @@ -288,7 +304,7 @@ impl IndexCliCommand { "ingest" => Self::parse_ingest_args(submatches), "list" => Self::parse_list_args(submatches), "search" => Self::parse_search_args(submatches), - "update" => Ok(Self::Update(IndexUpdateCliCommand::parse_args(submatches)?)), + "update" => Self::parse_update_args(submatches), _ => bail!("unknown index subcommand `{subcommand}`"), } } @@ -323,6 +339,25 @@ impl IndexCliCommand { })) } + fn parse_update_args(mut matches: ArgMatches) -> anyhow::Result { + let client_args = ClientArgs::parse(&mut matches)?; + let index_id = matches + .remove_one::("index") + .expect("`index` should be a required arg."); + let index_config_uri = matches + .remove_one::("index-config") + 
.map(|uri| Uri::from_str(&uri)) + .expect("`index-config` should be a required arg.")?; + let assume_yes = matches.get_flag("yes"); + + Ok(Self::Update(UpdateIndexArgs { + index_id, + client_args, + index_config_uri, + assume_yes, + })) + } + fn parse_describe_args(mut matches: ArgMatches) -> anyhow::Result { let client_args = ClientArgs::parse(&mut matches)?; let index_id = matches @@ -449,7 +484,7 @@ impl IndexCliCommand { Self::Ingest(args) => ingest_docs_cli(args).await, Self::List(args) => list_index_cli(args).await, Self::Search(args) => search_index_cli(args).await, - Self::Update(args) => args.execute().await, + Self::Update(args) => update_index_cli(args).await, } } } @@ -501,6 +536,35 @@ pub async fn create_index_cli(args: CreateIndexArgs) -> anyhow::Result<()> { Ok(()) } +pub async fn update_index_cli(args: UpdateIndexArgs) -> anyhow::Result<()> { + debug!(args=?args, "update-index"); + println!("❯ Updating index..."); + let storage_resolver = StorageResolver::unconfigured(); + let file_content = load_file(&storage_resolver, &args.index_config_uri).await?; + let index_config_str = std::str::from_utf8(&file_content) + .with_context(|| { + format!( + "index config file `{}` contains some invalid UTF-8 characters", + args.index_config_uri + ) + })? + .to_string(); + let config_format = ConfigFormat::sniff_from_uri(&args.index_config_uri)?; + let qw_client = args.client_args.client(); + if !args.assume_yes { + let prompt = "This operation will update the index configuration. 
Do you want to proceed?"; + if !prompt_confirmation(prompt, false) { + return Ok(()); + } + } + qw_client + .indexes() + .update(&args.index_id, &index_config_str, config_format) + .await?; + println!("{} Index successfully updated.", "✔".color(GREEN_COLOR)); + Ok(()) +} + pub async fn list_index_cli(args: ListIndexesArgs) -> anyhow::Result<()> { debug!(args=?args, "list-index"); let qw_client = args.client_args.client(); diff --git a/quickwit/quickwit-cli/src/index/update.rs b/quickwit/quickwit-cli/src/index/update.rs deleted file mode 100644 index f417dbb0216..00000000000 --- a/quickwit/quickwit-cli/src/index/update.rs +++ /dev/null @@ -1,257 +0,0 @@ -// Copyright (C) 2024 Quickwit, Inc. -// -// Quickwit is offered under the AGPL v3.0 and as commercial software. -// For commercial licensing, contact us at hello@quickwit.io. -// -// AGPL: -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . 
- -use anyhow::{bail, Context}; -use clap::{arg, ArgMatches, Command}; -use colored::Colorize; -use quickwit_config::{RetentionPolicy, SearchSettings}; -use quickwit_proto::types::IndexId; -use quickwit_serve::IndexUpdates; -use tracing::debug; - -use crate::checklist::GREEN_COLOR; -use crate::ClientArgs; - -pub fn build_index_update_command() -> Command { - Command::new("update") - .subcommand_required(true) - .subcommand( - Command::new("search-settings") - .about("Updates default search settings.") - .args(&[ - arg!(--index "ID of the target index") - .display_order(1) - .required(true), - arg!(--"default-search-fields" "List of fields that Quickwit will search into if the user query does not explicitly target a field. Space-separated list, e.g. \"field1 field2\". If no value is provided, existing defaults are removed and queries without target field will fail.") - .display_order(2) - .num_args(0..) - .required(true), - ])) - .subcommand( - Command::new("retention-policy") - .about("Configures or disables the retention policy.") - .args(&[ - arg!(--index "ID of the target index") - .display_order(1) - .required(true), - arg!(--"period" "Duration after which splits are dropped. Expressed in a human-readable way (`1 day`, `2 hours`, `1 week`, ...)") - .display_order(2) - .required(false), - arg!(--"schedule" "Frequency at which the retention policy is evaluated and applied. Expressed as a cron expression (0 0 * * * *) or human-readable form (hourly, daily, weekly, ...).") - .display_order(3) - .required(false), - arg!(--"disable" "Disables the retention policy. 
Old indexed data will not be cleaned up anymore.") - .display_order(4) - .required(false), - ]) - ) -} - -#[derive(Debug, Eq, PartialEq)] -pub struct RetentionPolicyArgs { - pub client_args: ClientArgs, - pub index_id: IndexId, - pub disable: bool, - pub period: Option, - pub schedule: Option, -} - -#[derive(Debug, Eq, PartialEq)] -pub struct SearchSettingsArgs { - pub client_args: ClientArgs, - pub index_id: IndexId, - pub default_search_fields: Vec, -} - -#[derive(Debug, Eq, PartialEq)] -pub enum IndexUpdateCliCommand { - RetentionPolicy(RetentionPolicyArgs), - SearchSettings(SearchSettingsArgs), -} - -impl IndexUpdateCliCommand { - pub fn parse_args(mut matches: ArgMatches) -> anyhow::Result { - let (subcommand, submatches) = matches - .remove_subcommand() - .context("failed to parse index update subcommand")?; - match subcommand.as_str() { - "retention-policy" => Self::parse_update_retention_policy_args(submatches), - "search-settings" => Self::parse_update_search_settings_args(submatches), - _ => bail!("unknown index update subcommand `{subcommand}`"), - } - } - - fn parse_update_retention_policy_args(mut matches: ArgMatches) -> anyhow::Result { - let client_args = ClientArgs::parse(&mut matches)?; - let index_id = matches - .remove_one::("index") - .expect("`index` should be a required arg."); - let disable = matches.get_flag("disable"); - let period = matches.remove_one::("period"); - let schedule = matches.remove_one::("schedule"); - Ok(Self::RetentionPolicy(RetentionPolicyArgs { - client_args, - index_id, - disable, - period, - schedule, - })) - } - - fn parse_update_search_settings_args(mut matches: ArgMatches) -> anyhow::Result { - let client_args = ClientArgs::parse(&mut matches)?; - let index_id = matches - .remove_one::("index") - .expect("`index` should be a required arg."); - let default_search_fields = matches - .remove_many::("default-search-fields") - .map(|values| values.collect()) - // --default-search-fields should be made optional if other 
fields - // are added to SearchSettings - .expect("`default-search-fields` should be a required arg."); - Ok(Self::SearchSettings(SearchSettingsArgs { - client_args, - index_id, - default_search_fields, - })) - } - - pub async fn execute(self) -> anyhow::Result<()> { - match self { - Self::RetentionPolicy(args) => update_retention_policy_cli(args).await, - Self::SearchSettings(args) => update_search_settings_cli(args).await, - } - } -} - -pub async fn update_retention_policy_cli(args: RetentionPolicyArgs) -> anyhow::Result<()> { - debug!(args=?args, "update-index-retention-policy"); - println!("❯ Updating index retention policy..."); - let qw_client = args.client_args.client(); - let metadata = qw_client.indexes().get(&args.index_id).await?; - let new_retention_policy_opt = match ( - args.disable, - args.period, - args.schedule, - metadata.index_config.retention_policy_opt, - ) { - (true, Some(_), Some(_), _) | (true, None, Some(_), _) | (true, Some(_), None, _) => { - bail!("`--period` and `--schedule` cannot be used together with `--disable`") - } - (false, None, None, _) => bail!("either `--period` or `--disable` must be specified"), - (false, None, Some(_), None) => { - bail!("`--period` is required when creating a retention policy") - } - (true, None, None, _) => None, - (false, None, Some(schedule), Some(policy)) => Some(RetentionPolicy { - retention_period: policy.retention_period, - evaluation_schedule: schedule, - }), - (false, Some(period), schedule_opt, None) => Some(RetentionPolicy { - retention_period: period, - evaluation_schedule: schedule_opt.unwrap_or(RetentionPolicy::default_schedule()), - }), - (false, Some(period), schedule_opt, Some(policy)) => Some(RetentionPolicy { - retention_period: period, - evaluation_schedule: schedule_opt.unwrap_or(policy.evaluation_schedule.clone()), - }), - }; - if let Some(new_retention_policy) = new_retention_policy_opt.as_ref() { - println!( - "New retention policy: {}", - 
serde_json::to_string(&new_retention_policy)? - ); - } else { - println!("Disable retention policy."); - } - qw_client - .indexes() - .update( - &args.index_id, - IndexUpdates { - retention_policy_opt: new_retention_policy_opt, - search_settings: metadata.index_config.search_settings, - }, - ) - .await?; - println!("{} Index successfully updated.", "✔".color(GREEN_COLOR)); - Ok(()) -} - -pub async fn update_search_settings_cli(args: SearchSettingsArgs) -> anyhow::Result<()> { - debug!(args=?args, "update-index-search-settings"); - println!("❯ Updating index search settings..."); - let qw_client = args.client_args.client(); - let metadata = qw_client.indexes().get(&args.index_id).await?; - let search_settings = SearchSettings { - default_search_fields: args.default_search_fields, - }; - println!( - "New search settings: {}", - serde_json::to_string(&search_settings)? - ); - qw_client - .indexes() - .update( - &args.index_id, - IndexUpdates { - retention_policy_opt: metadata.index_config.retention_policy_opt, - search_settings, - }, - ) - .await?; - println!("{} Index successfully updated.", "✔".color(GREEN_COLOR)); - Ok(()) -} - -#[cfg(test)] -mod test { - use super::*; - use crate::cli::{build_cli, CliCommand}; - use crate::index::IndexCliCommand; - - #[test] - fn test_cmd_update_subsubcommand() { - let app = build_cli().no_binary_name(true); - let matches = app - .try_get_matches_from([ - "index", - "update", - "retention-policy", - "--index", - "my-index", - "--period", - "1 day", - ]) - .unwrap(); - let command = CliCommand::parse_cli_args(matches).unwrap(); - assert!(matches!( - command, - CliCommand::Index(IndexCliCommand::Update( - IndexUpdateCliCommand::RetentionPolicy(RetentionPolicyArgs { - client_args: _, - index_id, - disable: false, - period: Some(period), - schedule: None, - }) - )) if &index_id == "my-index" && &period == "1 day" - )); - } -} diff --git a/quickwit/quickwit-cli/tests/cli.rs b/quickwit/quickwit-cli/tests/cli.rs index 
848934c6a28..25f840fd1e7 100644 --- a/quickwit/quickwit-cli/tests/cli.rs +++ b/quickwit/quickwit-cli/tests/cli.rs @@ -22,25 +22,21 @@ mod helpers; use std::path::Path; -use std::str::FromStr; use anyhow::Result; use clap::error::ErrorKind; use helpers::{TestEnv, TestStorageType}; use quickwit_cli::checklist::ChecklistError; use quickwit_cli::cli::build_cli; -use quickwit_cli::index::update::{update_retention_policy_cli, RetentionPolicyArgs}; use quickwit_cli::index::{ - create_index_cli, delete_index_cli, search_index, CreateIndexArgs, DeleteIndexArgs, - SearchIndexArgs, + create_index_cli, delete_index_cli, search_index, update_index_cli, CreateIndexArgs, + DeleteIndexArgs, SearchIndexArgs, UpdateIndexArgs, }; use quickwit_cli::tool::{ garbage_collect_index_cli, local_ingest_docs_cli, GarbageCollectIndexArgs, LocalIngestDocsArgs, }; -use quickwit_cli::ClientArgs; use quickwit_common::fs::get_cache_directory_path; use quickwit_common::rand::append_random_suffix; -use quickwit_common::uri::Uri; use quickwit_config::{RetentionPolicy, SourceInputFormat, CLI_SOURCE_ID}; use quickwit_metastore::{ ListSplitsRequestExt, MetastoreResolver, MetastoreServiceExt, MetastoreServiceStreamSplitsExt, @@ -57,11 +53,8 @@ use crate::helpers::{create_test_env, upload_test_file, PACKAGE_BIN_NAME}; async fn create_logs_index(test_env: &TestEnv) -> anyhow::Result<()> { let args = CreateIndexArgs { - client_args: ClientArgs { - cluster_endpoint: test_env.cluster_endpoint.clone(), - ..Default::default() - }, - index_config_uri: test_env.index_config_uri.clone(), + client_args: test_env.default_client_args(), + index_config_uri: test_env.resource_files.index_config.clone(), overwrite: false, assume_yes: true, }; @@ -70,7 +63,7 @@ async fn create_logs_index(test_env: &TestEnv) -> anyhow::Result<()> { async fn local_ingest_docs(input_path: &Path, test_env: &TestEnv) -> anyhow::Result<()> { let args = LocalIngestDocsArgs { - config_uri: test_env.config_uri.clone(), + config_uri: 
test_env.resource_files.config.clone(), index_id: test_env.index_id.clone(), input_path_opt: Some(input_path.to_path_buf()), input_format: SourceInputFormat::Json, @@ -118,12 +111,9 @@ async fn test_cmd_create_no_index_uri() { .unwrap(); test_env.start_server().await.unwrap(); - let index_config_without_uri = Uri::from_str(&test_env.index_config_without_uri()).unwrap(); + let index_config_without_uri = test_env.resource_files.index_config_without_uri.clone(); let args = CreateIndexArgs { - client_args: ClientArgs { - cluster_endpoint: test_env.cluster_endpoint.clone(), - ..Default::default() - }, + client_args: test_env.default_client_args(), index_config_uri: index_config_without_uri, overwrite: false, assume_yes: true, @@ -146,12 +136,9 @@ async fn test_cmd_create_overwrite() { .unwrap(); test_env.start_server().await.unwrap(); - let index_config_without_uri = Uri::from_str(&test_env.index_config_without_uri()).unwrap(); + let index_config_without_uri = test_env.resource_files.index_config_without_uri.clone(); let args = CreateIndexArgs { - client_args: ClientArgs { - cluster_endpoint: test_env.cluster_endpoint.clone(), - ..Default::default() - }, + client_args: test_env.default_client_args(), index_config_uri: index_config_without_uri, overwrite: true, assume_yes: true, @@ -182,9 +169,9 @@ async fn test_cmd_ingest_on_non_existing_index() { .unwrap(); let args = LocalIngestDocsArgs { - config_uri: test_env.config_uri, + config_uri: test_env.resource_files.config, index_id: "index-does-not-exist".to_string(), - input_path_opt: Some(test_env.resource_files["logs"].clone()), + input_path_opt: Some(test_env.resource_files.log_docs.clone()), input_format: SourceInputFormat::Json, overwrite: false, clear_cache: true, @@ -212,9 +199,9 @@ async fn test_ingest_docs_cli_keep_cache() { create_logs_index(&test_env).await.unwrap(); let args = LocalIngestDocsArgs { - config_uri: test_env.config_uri, + config_uri: test_env.resource_files.config, index_id, - input_path_opt: 
Some(test_env.resource_files["logs"].clone()), + input_path_opt: Some(test_env.resource_files.log_docs.clone()), input_format: SourceInputFormat::Json, overwrite: false, clear_cache: false, @@ -239,9 +226,9 @@ async fn test_ingest_docs_cli() { let index_uid = test_env.index_metadata().await.unwrap().index_uid; let args = LocalIngestDocsArgs { - config_uri: test_env.config_uri.clone(), + config_uri: test_env.resource_files.config.clone(), index_id: index_id.clone(), - input_path_opt: Some(test_env.resource_files["logs"].clone()), + input_path_opt: Some(test_env.resource_files.log_docs.clone()), input_format: SourceInputFormat::Json, overwrite: false, clear_cache: true, @@ -270,7 +257,7 @@ async fn test_ingest_docs_cli() { // Ingest a non-existing file should fail. let args = LocalIngestDocsArgs { - config_uri: test_env.config_uri, + config_uri: test_env.resource_files.config, index_id: test_env.index_id, input_path_opt: Some(test_env.data_dir_path.join("file-does-not-exist.json")), input_format: SourceInputFormat::Json, @@ -345,7 +332,7 @@ async fn test_cmd_search_aggregation() { test_env.start_server().await.unwrap(); create_logs_index(&test_env).await.unwrap(); - local_ingest_docs(test_env.resource_files["logs"].as_path(), &test_env) + local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env) .await .unwrap(); @@ -371,7 +358,7 @@ async fn test_cmd_search_aggregation() { // search with aggregation let args = SearchIndexArgs { - index_id: test_env.index_id, + index_id: test_env.index_id.clone(), query: "paris OR tokio OR london".to_string(), aggregation: Some(serde_json::to_string(&aggregation).unwrap()), max_hits: 10, @@ -380,10 +367,7 @@ async fn test_cmd_search_aggregation() { snippet_fields: None, start_timestamp: None, end_timestamp: None, - client_args: ClientArgs { - cluster_endpoint: test_env.cluster_endpoint, - ..Default::default() - }, + client_args: test_env.default_client_args(), sort_by_score: false, }; let search_response = 
search_index(args).await.unwrap(); @@ -448,13 +432,13 @@ async fn test_cmd_search_with_snippets() -> Result<()> { test_env.start_server().await.unwrap(); create_logs_index(&test_env).await.unwrap(); - local_ingest_docs(test_env.resource_files["logs"].as_path(), &test_env) + local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env) .await .unwrap(); // search with snippets let args = SearchIndexArgs { - index_id: test_env.index_id, + index_id: test_env.index_id.clone(), query: "event:baz".to_string(), aggregation: None, max_hits: 10, @@ -463,10 +447,7 @@ async fn test_cmd_search_with_snippets() -> Result<()> { snippet_fields: Some(vec!["event".to_string()]), start_timestamp: None, end_timestamp: None, - client_args: ClientArgs { - cluster_endpoint: test_env.cluster_endpoint, - ..Default::default() - }, + client_args: test_env.default_client_args(), sort_by_score: false, }; let search_response = search_index(args).await.unwrap(); @@ -493,10 +474,7 @@ async fn test_search_index_cli() { create_logs_index(&test_env).await.unwrap(); let create_search_args = |query: &str| SearchIndexArgs { - client_args: ClientArgs { - cluster_endpoint: test_env.cluster_endpoint.clone(), - ..Default::default() - }, + client_args: test_env.default_client_args(), index_id: index_id.clone(), query: query.to_string(), aggregation: None, @@ -509,7 +487,7 @@ async fn test_search_index_cli() { sort_by_score: false, }; - local_ingest_docs(test_env.resource_files["logs"].as_path(), &test_env) + local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env) .await .unwrap(); @@ -544,20 +522,16 @@ async fn test_cmd_update_index() { test_env.start_server().await.unwrap(); create_logs_index(&test_env).await.unwrap(); - // add a policy - update_retention_policy_cli(RetentionPolicyArgs { + // add retention policy + let args = UpdateIndexArgs { + client_args: test_env.default_client_args(), index_id: index_id.clone(), - client_args: ClientArgs { - cluster_endpoint: 
test_env.cluster_endpoint.clone(), - ..Default::default() - }, - disable: false, - period: Some(String::from("1 week")), - schedule: Some(String::from("daily")), - }) - .await - .unwrap(); + index_config_uri: test_env.resource_files.index_config_with_retention.clone(), + assume_yes: true, + }; + update_index_cli(args).await.unwrap(); let index_metadata = test_env.index_metadata().await.unwrap(); + assert_eq!(index_metadata.index_id(), test_env.index_id); assert_eq!( index_metadata.index_config.retention_policy_opt, Some(RetentionPolicy { @@ -566,34 +540,16 @@ async fn test_cmd_update_index() { }) ); - // invalid args - update_retention_policy_cli(RetentionPolicyArgs { - index_id: index_id.clone(), - client_args: ClientArgs { - cluster_endpoint: test_env.cluster_endpoint.clone(), - ..Default::default() - }, - disable: true, - period: Some(String::from("a week")), - schedule: Some(String::from("daily")), - }) - .await - .unwrap_err(); - - // remove the policy - update_retention_policy_cli(RetentionPolicyArgs { + // remove retention policy + let args = UpdateIndexArgs { + client_args: test_env.default_client_args(), index_id, - client_args: ClientArgs { - cluster_endpoint: test_env.cluster_endpoint.clone(), - ..Default::default() - }, - disable: true, - period: None, - schedule: None, - }) - .await - .unwrap(); + index_config_uri: test_env.resource_files.index_config.clone(), + assume_yes: true, + }; + update_index_cli(args).await.unwrap(); let index_metadata = test_env.index_metadata().await.unwrap(); + assert_eq!(index_metadata.index_id(), test_env.index_id); assert_eq!(index_metadata.index_config.retention_policy_opt, None); } @@ -620,10 +576,7 @@ async fn test_delete_index_cli_dry_run() { }; let create_delete_args = |dry_run| DeleteIndexArgs { - client_args: ClientArgs { - cluster_endpoint: test_env.cluster_endpoint.clone(), - ..Default::default() - }, + client_args: test_env.default_client_args(), index_id: index_id.clone(), dry_run, assume_yes: true, @@ -647,7 
+600,7 @@ async fn test_delete_index_cli_dry_run() { .unwrap(); assert!(metastore.index_exists(&index_id).await.unwrap()); - local_ingest_docs(test_env.resource_files["logs"].as_path(), &test_env) + local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env) .await .unwrap(); @@ -673,15 +626,12 @@ async fn test_delete_index_cli() { test_env.start_server().await.unwrap(); create_logs_index(&test_env).await.unwrap(); - local_ingest_docs(test_env.resource_files["logs"].as_path(), &test_env) + local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env) .await .unwrap(); let args = DeleteIndexArgs { - client_args: ClientArgs { - cluster_endpoint: test_env.cluster_endpoint.clone(), - ..Default::default() - }, + client_args: test_env.default_client_args(), index_id: index_id.clone(), assume_yes: true, dry_run: false, @@ -702,7 +652,7 @@ async fn test_garbage_collect_cli_no_grace() { test_env.start_server().await.unwrap(); create_logs_index(&test_env).await.unwrap(); let index_uid = test_env.index_metadata().await.unwrap().index_uid; - local_ingest_docs(test_env.resource_files["logs"].as_path(), &test_env) + local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env) .await .unwrap(); @@ -724,7 +674,7 @@ async fn test_garbage_collect_cli_no_grace() { }; let create_gc_args = |dry_run| GarbageCollectIndexArgs { - config_uri: test_env.config_uri.clone(), + config_uri: test_env.resource_files.config.clone(), index_id: index_id.clone(), grace_period: Duration::from_secs(3600), dry_run, @@ -792,10 +742,7 @@ async fn test_garbage_collect_cli_no_grace() { ); let args = DeleteIndexArgs { - client_args: ClientArgs { - cluster_endpoint: test_env.cluster_endpoint.clone(), - ..Default::default() - }, + client_args: test_env.default_client_args(), index_id, dry_run: false, assume_yes: true, @@ -815,7 +762,7 @@ async fn test_garbage_collect_index_cli() { test_env.start_server().await.unwrap(); create_logs_index(&test_env).await.unwrap(); let 
index_uid = test_env.index_metadata().await.unwrap().index_uid; - local_ingest_docs(test_env.resource_files["logs"].as_path(), &test_env) + local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env) .await .unwrap(); @@ -832,7 +779,7 @@ async fn test_garbage_collect_index_cli() { }; let create_gc_args = |grace_period_secs| GarbageCollectIndexArgs { - config_uri: test_env.config_uri.clone(), + config_uri: test_env.resource_files.config.clone(), index_id: index_id.clone(), grace_period: Duration::from_secs(grace_period_secs), dry_run: false, @@ -967,7 +914,7 @@ async fn test_all_local_index() { .unwrap(); assert!(metadata_file_exists); - local_ingest_docs(test_env.resource_files["logs"].as_path(), &test_env) + local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env) .await .unwrap(); @@ -997,10 +944,7 @@ async fn test_all_local_index() { assert_eq!(search_stream_response, "72057597000000\n72057608000000\n"); let args = DeleteIndexArgs { - client_args: ClientArgs { - cluster_endpoint: test_env.cluster_endpoint.clone(), - ..Default::default() - }, + client_args: test_env.default_client_args(), index_id, dry_run: false, assume_yes: true, @@ -1028,7 +972,7 @@ async fn test_all_with_s3_localstack_cli() { let s3_path = upload_test_file( test_env.storage_resolver.clone(), - test_env.resource_files["logs"].clone(), + test_env.resource_files.log_docs.clone(), "quickwit-integration-tests", "sources/", &append_random_suffix("test-all--cli-s3-localstack"), @@ -1039,10 +983,7 @@ async fn test_all_with_s3_localstack_cli() { // Cli search let args = SearchIndexArgs { - client_args: ClientArgs { - cluster_endpoint: test_env.cluster_endpoint.clone(), - ..Default::default() - }, + client_args: test_env.default_client_args(), index_id: index_id.clone(), query: "level:info".to_string(), aggregation: None, @@ -1072,10 +1013,7 @@ async fn test_all_with_s3_localstack_cli() { assert_eq!(result["num_hits"], Value::Number(Number::from(2i64))); let args = 
DeleteIndexArgs { - client_args: ClientArgs { - cluster_endpoint: test_env.cluster_endpoint.clone(), - ..Default::default() - }, + client_args: test_env.default_client_args(), index_id: index_id.clone(), dry_run: false, assume_yes: true, diff --git a/quickwit/quickwit-cli/tests/helpers.rs b/quickwit/quickwit-cli/tests/helpers.rs index 1ff7fb8234e..0a52f6b9792 100644 --- a/quickwit/quickwit-cli/tests/helpers.rs +++ b/quickwit/quickwit-cli/tests/helpers.rs @@ -18,7 +18,6 @@ // along with this program. If not, see . use std::borrow::Borrow; -use std::collections::HashMap; use std::fs; use std::path::PathBuf; use std::str::FromStr; @@ -27,6 +26,7 @@ use std::sync::Arc; use anyhow::Context; use predicates::str; use quickwit_cli::service::RunCliCommand; +use quickwit_cli::ClientArgs; use quickwit_common::net::find_available_tcp_port; use quickwit_common::test_utils::wait_for_server_ready; use quickwit_common::uri::Uri; @@ -81,6 +81,12 @@ const DEFAULT_INDEX_CONFIG: &str = r#" default_search_fields: [event] "#; +const RETENTION_CONFIG: &str = r#" + retention: + period: 1 week + schedule: daily +"#; + const DEFAULT_QUICKWIT_CONFIG: &str = r#" version: 0.8 metastore_uri: #metastore_uri @@ -103,6 +109,15 @@ const WIKI_JSON_DOCS: &str = r#"{"body": "foo", "title": "shimroy", "url": "http {"body": "biz", "title": "modern", "url": "https://wiki.com?id=13"} "#; +pub struct TestResourceFiles { + pub config: Uri, + pub index_config: Uri, + pub index_config_without_uri: Uri, + pub index_config_with_retention: Uri, + pub log_docs: PathBuf, + pub wikipedia_docs: PathBuf, +} + /// A struct to hold few info about the test environment. pub struct TestEnv { /// The temporary directory of the test. @@ -112,14 +127,14 @@ pub struct TestEnv { /// Path of the directory where indexes are stored. pub indexes_dir_path: PathBuf, /// Resource files needed for the test. - pub resource_files: HashMap<&'static str, PathBuf>, + pub resource_files: TestResourceFiles, /// The metastore URI. 
pub metastore_uri: Uri, pub metastore_resolver: MetastoreResolver, pub metastore: MetastoreServiceClient, - pub config_uri: Uri, + pub cluster_endpoint: Url, - pub index_config_uri: Uri, + /// The index ID. pub index_id: IndexId, pub index_uri: Uri, @@ -137,12 +152,6 @@ impl TestEnv { .unwrap() } - pub fn index_config_without_uri(&self) -> String { - self.resource_files["index_config_without_uri"] - .display() - .to_string() - } - pub async fn index_metadata(&self) -> anyhow::Result { let index_metadata = self .metastore() @@ -155,7 +164,7 @@ impl TestEnv { pub async fn start_server(&self) -> anyhow::Result<()> { let run_command = RunCliCommand { - config_uri: self.config_uri.clone(), + config_uri: self.resource_files.config.clone(), services: Some(QuickwitService::supported_services()), }; tokio::spawn(async move { @@ -169,6 +178,13 @@ impl TestEnv { wait_for_server_ready(([127, 0, 0, 1], self.rest_listen_port).into()).await?; Ok(()) } + + pub fn default_client_args(&self) -> ClientArgs { + ClientArgs { + cluster_endpoint: self.cluster_endpoint.clone(), + ..Default::default() + } + } } pub enum TestStorageType { @@ -176,6 +192,10 @@ pub enum TestStorageType { LocalFileSystem, } +fn uri_from_path(path: PathBuf) -> Uri { + Uri::from_str(&format!("file://{}", path.display())).unwrap() +} + /// Creates all necessary artifacts in a test environment. 
pub async fn create_test_env( index_id: IndexId, @@ -216,6 +236,14 @@ pub async fn create_test_env( .replace("#index_id", &index_id) .replace("index_uri: #index_uri\n", ""), )?; + let index_config_with_retention_path = + resources_dir_path.join("index_config_with_retention.yaml"); + fs::write( + &index_config_with_retention_path, + format!("{DEFAULT_INDEX_CONFIG}{RETENTION_CONFIG}") + .replace("#index_id", &index_id) + .replace("#index_uri", index_uri.as_str()), + )?; let node_config_path = resources_dir_path.join("config.yaml"); let rest_listen_port = find_available_tcp_port()?; let grpc_listen_port = find_available_tcp_port()?; @@ -233,23 +261,18 @@ pub async fn create_test_env( let wikipedia_docs_path = resources_dir_path.join("wikis.json"); fs::write(&wikipedia_docs_path, WIKI_JSON_DOCS)?; - let mut resource_files = HashMap::new(); - resource_files.insert("config", node_config_path); - resource_files.insert("index_config", index_config_path); - resource_files.insert("index_config_without_uri", index_config_without_uri_path); - resource_files.insert("logs", log_docs_path); - resource_files.insert("wiki", wikipedia_docs_path); - - let config_uri = - Uri::from_str(&format!("file://{}", resource_files["config"].display())).unwrap(); - let index_config_uri = Uri::from_str(&format!( - "file://{}", - resource_files["index_config"].display() - )) - .unwrap(); let cluster_endpoint = Url::parse(&format!("http://localhost:{rest_listen_port}")) .context("failed to parse cluster endpoint")?; + let resource_files = TestResourceFiles { + config: uri_from_path(node_config_path), + index_config: uri_from_path(index_config_path), + index_config_without_uri: uri_from_path(index_config_without_uri_path), + index_config_with_retention: uri_from_path(index_config_with_retention_path), + log_docs: log_docs_path, + wikipedia_docs: wikipedia_docs_path, + }; + Ok(TestEnv { _temp_dir: temp_dir, data_dir_path, @@ -258,9 +281,7 @@ pub async fn create_test_env( metastore_uri, 
metastore_resolver, metastore, - config_uri, cluster_endpoint, - index_config_uri, index_id, index_uri, rest_listen_port, diff --git a/quickwit/quickwit-config/src/index_config/mod.rs b/quickwit/quickwit-config/src/index_config/mod.rs index 574c68dea45..48adf301b75 100644 --- a/quickwit/quickwit-config/src/index_config/mod.rs +++ b/quickwit/quickwit-config/src/index_config/mod.rs @@ -34,7 +34,7 @@ use quickwit_common::uri::Uri; use quickwit_doc_mapper::{DefaultDocMapperBuilder, DocMapper, DocMapping, Mode}; use quickwit_proto::types::IndexId; use serde::{Deserialize, Serialize}; -pub use serialize::load_index_config_from_user_config; +pub use serialize::{load_index_config_from_user_config, load_index_config_update}; use tracing::warn; use crate::index_config::serialize::VersionedIndexConfig; diff --git a/quickwit/quickwit-config/src/index_config/serialize.rs b/quickwit/quickwit-config/src/index_config/serialize.rs index 07885867962..b02322d5339 100644 --- a/quickwit/quickwit-config/src/index_config/serialize.rs +++ b/quickwit/quickwit-config/src/index_config/serialize.rs @@ -17,7 +17,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use anyhow::Context; +use anyhow::{ensure, Context}; use quickwit_common::uri::Uri; use quickwit_proto::types::IndexId; use serde::{Deserialize, Serialize}; @@ -61,6 +61,45 @@ pub fn load_index_config_from_user_config( index_config_for_serialization.build_and_validate(Some(default_index_root_uri)) } +/// Parses and validates an [`IndexConfig`] update. +/// +/// Ensures that the new configuration is valid in itself and compared to the +/// current index config. If the new configuration omits some fields, the +/// default values will be used, not those of the current index config. The only +/// exception is the index_uri because it cannot be updated. 
+pub fn load_index_config_update( + config_format: ConfigFormat, + index_config_bytes: &[u8], + current_index_config: &IndexConfig, +) -> anyhow::Result<IndexConfig> { + let current_index_parent_dir = &current_index_config + .index_uri + .parent() + .context("Unexpected `index_uri` format on current configuration")?; + let new_index_config = load_index_config_from_user_config( + config_format, + index_config_bytes, + current_index_parent_dir, + )?; + ensure!( + current_index_config.index_id == new_index_config.index_id, + "`index_id` in config file {} does not match updated `index_id` {}", + new_index_config.index_id, + current_index_config.index_id + ); + ensure!( + current_index_config.index_uri == new_index_config.index_uri, + "`index_uri` cannot be updated, current value {}, new expected value {}", + current_index_config.index_uri, + new_index_config.index_uri + ); + ensure!( + current_index_config.doc_mapping == new_index_config.doc_mapping, + "`doc_mapping` cannot be updated" + ); + Ok(new_index_config) +} + impl IndexConfigForSerialization { fn index_uri_or_fallback_to_default( &self, @@ -246,4 +285,155 @@ mod test { assert_eq!(index_config.index_uri.as_str(), "s3://mybucket/hdfs-logs"); } } + + #[test] + fn test_update_index_root_uri() { + let original_config_yaml = r#" + version: 0.8 + index_id: hdfs-logs + doc_mapping: {} + "#; + let original_config: IndexConfig = load_index_config_from_user_config( + ConfigFormat::Yaml, + original_config_yaml.as_bytes(), + &Uri::for_test("s3://mybucket"), + ) + .unwrap(); + { + // use default in update + let updated_config_yaml = r#" + version: 0.8 + index_id: hdfs-logs + doc_mapping: {} + "#; + let updated_config = load_index_config_update( + ConfigFormat::Yaml, + updated_config_yaml.as_bytes(), + &original_config, + ) + .unwrap(); + assert_eq!(updated_config.index_uri.as_str(), "s3://mybucket/hdfs-logs"); + } + { + // use the current index_uri explicitly + let updated_config_yaml = r#" + version: 0.8 + index_id: hdfs-logs + index_uri:
s3://mybucket/hdfs-logs + doc_mapping: {} + "#; + let updated_config = load_index_config_update( + ConfigFormat::Yaml, + updated_config_yaml.as_bytes(), + &original_config, + ) + .unwrap(); + assert_eq!(updated_config.index_uri.as_str(), "s3://mybucket/hdfs-logs"); + } + { + // try using a different index_uri + let updated_config_yaml = r#" + version: 0.8 + index_id: hdfs-logs + index_uri: s3://mybucket/new-directory/ + doc_mapping: {} + "#; + let load_error = load_index_config_update( + ConfigFormat::Yaml, + updated_config_yaml.as_bytes(), + &original_config, + ) + .unwrap_err(); + assert!(format!("{:?}", load_error).contains("`index_uri` cannot be updated")); + } + } + + #[test] + fn test_update_reset_defaults() { + let original_config_yaml = r#" + version: 0.8 + index_id: hdfs-logs + doc_mapping: + field_mappings: + - name: timestamp + type: datetime + fast: true + timestamp_field: timestamp + + search_settings: + default_search_fields: [body] + + indexing_settings: + commit_timeout_secs: 10 + + retention: + period: 90 days + schedule: daily + "#; + let original_config: IndexConfig = load_index_config_from_user_config( + ConfigFormat::Yaml, + original_config_yaml.as_bytes(), + &Uri::for_test("s3://mybucket"), + ) + .unwrap(); + + let updated_config_yaml = r#" + version: 0.8 + index_id: hdfs-logs + doc_mapping: + field_mappings: + - name: timestamp + type: datetime + fast: true + timestamp_field: timestamp + "#; + let updated_config = load_index_config_update( + ConfigFormat::Yaml, + updated_config_yaml.as_bytes(), + &original_config, + ) + .unwrap(); + assert_eq!( + updated_config.search_settings.default_search_fields, + Vec::<String>::default(), + ); + assert_eq!( + updated_config.indexing_settings.commit_timeout_secs, + IndexingSettings::default_commit_timeout_secs() + ); + assert_eq!(updated_config.retention_policy_opt, None); + } + + #[test] + fn test_update_doc_mappings() { + let original_config_yaml = r#" + version: 0.8 + index_id: hdfs-logs + doc_mapping: {} +
"#; + let original_config: IndexConfig = load_index_config_from_user_config( + ConfigFormat::Yaml, + original_config_yaml.as_bytes(), + &Uri::for_test("s3://mybucket"), + ) + .unwrap(); + + let updated_config_yaml = r#" + version: 0.8 + index_id: hdfs-logs + doc_mapping: + field_mappings: + - name: body + type: text + tokenizer: default + record: position + "#; + let load_error = load_index_config_update( + ConfigFormat::Yaml, + updated_config_yaml.as_bytes(), + &original_config, + ) + .unwrap_err(); + assert!(format!("{:?}", load_error).contains("`doc_mapping` cannot be updated")); + } } diff --git a/quickwit/quickwit-config/src/lib.rs b/quickwit/quickwit-config/src/lib.rs index 9a3f9263aa1..e41c46dce5b 100644 --- a/quickwit/quickwit-config/src/lib.rs +++ b/quickwit/quickwit-config/src/lib.rs @@ -48,8 +48,8 @@ pub use cluster_config::ClusterConfig; // See #2048 use index_config::serialize::{IndexConfigV0_8, VersionedIndexConfig}; pub use index_config::{ - build_doc_mapper, load_index_config_from_user_config, IndexConfig, IndexingResources, - IndexingSettings, RetentionPolicy, SearchSettings, + build_doc_mapper, load_index_config_from_user_config, load_index_config_update, IndexConfig, + IndexingResources, IndexingSettings, RetentionPolicy, SearchSettings, }; pub use quickwit_doc_mapper::DocMapping; use serde::de::DeserializeOwned; diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs index 33c9aa0dfcd..8cf93ceb8cd 100644 --- a/quickwit/quickwit-control-plane/src/control_plane.rs +++ b/quickwit/quickwit-control-plane/src/control_plane.rs @@ -39,7 +39,7 @@ use quickwit_common::Progress; use quickwit_config::service::QuickwitService; use quickwit_config::{ClusterConfig, IndexConfig, IndexTemplate, SourceConfig}; use quickwit_ingest::{IngesterPool, LocalShardsUpdate}; -use quickwit_metastore::{CreateIndexRequestExt, CreateIndexResponseExt}; +use quickwit_metastore::{CreateIndexRequestExt, 
CreateIndexResponseExt, IndexMetadataResponseExt}; use quickwit_proto::control_plane::{ AdviseResetShardsRequest, AdviseResetShardsResponse, ControlPlaneError, ControlPlaneResult, GetOrCreateOpenShardsRequest, GetOrCreateOpenShardsResponse, GetOrCreateOpenShardsSubrequest, @@ -582,6 +582,15 @@ impl Handler for ControlPlane { return convert_metastore_error(metastore_error); } }; + let index_metadata = match response.deserialize_index_metadata() { + Ok(index_metadata) => index_metadata, + Err(serde_error) => { + error!(error=?serde_error, "failed to deserialize index metadata"); + return Err(ActorExitStatus::from(anyhow::anyhow!(serde_error))); + } + }; + self.model + .update_index_config(&index_uid, index_metadata.index_config)?; // TODO: Handle doc mapping and/or indexing settings update here. info!(%index_uid, "updated index"); Ok(Ok(response)) diff --git a/quickwit/quickwit-control-plane/src/model/mod.rs b/quickwit/quickwit-control-plane/src/model/mod.rs index 229f9275558..8948eb0cfca 100644 --- a/quickwit/quickwit-control-plane/src/model/mod.rs +++ b/quickwit/quickwit-control-plane/src/model/mod.rs @@ -29,7 +29,7 @@ use anyhow::bail; use fnv::{FnvHashMap, FnvHashSet}; use quickwit_common::pretty::PrettyDisplay; use quickwit_common::Progress; -use quickwit_config::SourceConfig; +use quickwit_config::{IndexConfig, SourceConfig}; use quickwit_ingest::ShardInfos; use quickwit_metastore::{IndexMetadata, ListIndexesMetadataResponseExt}; use quickwit_proto::control_plane::ControlPlaneResult; @@ -194,6 +194,21 @@ impl ControlPlaneModel { self.update_metrics(); } + /// Updates the configuration of the specified index, returning an error if + /// the index didn't exist. 
+ pub(crate) fn update_index_config( + &mut self, + index_uid: &IndexUid, + index_config: IndexConfig, + ) -> anyhow::Result<()> { + let Some(index_model) = self.index_table.get_mut(index_uid) else { + bail!("index `{}` not found", index_uid.index_id); + }; + index_model.index_config = index_config; + self.update_metrics(); + Ok(()) + } + pub(crate) fn delete_index(&mut self, index_uid: &IndexUid) { self.index_table.remove(index_uid); self.index_uid_table.remove(&index_uid.index_id); @@ -529,6 +544,27 @@ mod tests { assert_eq!(model.shard_table.get_shards(&source_uid).unwrap().len(), 0); } + #[test] + fn test_control_plane_model_update_index_config() { + let mut model = ControlPlaneModel::default(); + let index_metadata = IndexMetadata::for_test("test-index", "ram:///indexes"); + let index_uid = index_metadata.index_uid.clone(); + model.add_index(index_metadata.clone()); + + // Update the index config + let mut index_config = index_metadata.index_config.clone(); + index_config.search_settings.default_search_fields = vec!["myfield".to_string()]; + model + .update_index_config(&index_uid, index_config.clone()) + .unwrap(); + + assert_eq!(model.index_table.len(), 1); + assert_eq!( + model.index_table.get(&index_uid).unwrap().index_config, + index_config + ); + } + #[test] fn test_control_plane_model_delete_index() { let mut model = ControlPlaneModel::default(); diff --git a/quickwit/quickwit-integration-tests/src/tests/index_update_tests.rs b/quickwit/quickwit-integration-tests/src/tests/index_update_tests.rs index 5ddc8a034ec..d9bfdbac5eb 100644 --- a/quickwit/quickwit-integration-tests/src/tests/index_update_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/index_update_tests.rs @@ -21,9 +21,8 @@ use std::collections::HashSet; use std::time::Duration; use quickwit_config::service::QuickwitService; -use quickwit_config::SearchSettings; use quickwit_rest_client::rest_client::CommitType; -use quickwit_serve::{IndexUpdates, SearchRequestQueryString}; +use 
quickwit_serve::SearchRequestQueryString; use serde_json::json; use crate::ingest_json; @@ -121,12 +120,21 @@ async fn test_update_on_multi_nodes_cluster() { .indexes() .update( "my-updatable-index", - IndexUpdates { - search_settings: SearchSettings { - default_search_fields: vec!["title".to_string(), "body".to_string()], - }, - retention_policy_opt: None, - }, + r#" + version: 0.8 + index_id: my-updatable-index + doc_mapping: + field_mappings: + - name: title + type: text + - name: body + type: text + indexing_settings: + commit_timeout_secs: 1 + search_settings: + default_search_fields: [title, body] + "#, + quickwit_config::ConfigFormat::Yaml, ) .await .unwrap(); diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs index 2a480e3d716..68a6dc56a77 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs @@ -30,7 +30,9 @@ use std::ops::Bound; use itertools::Itertools; use quickwit_common::pretty::PrettySample; -use quickwit_config::{RetentionPolicy, SearchSettings, SourceConfig, INGEST_V2_SOURCE_ID}; +use quickwit_config::{ + IndexingSettings, RetentionPolicy, SearchSettings, SourceConfig, INGEST_V2_SOURCE_ID, +}; use quickwit_proto::metastore::{ AcquireShardsRequest, AcquireShardsResponse, DeleteQuery, DeleteShardsRequest, DeleteShardsResponse, DeleteTask, EntityKind, ListShardsSubrequest, ListShardsSubresponse, @@ -223,6 +225,11 @@ impl FileBackedIndex { self.metadata.set_search_settings(search_settings) } + /// Replaces the indexing settings in the index config, returning whether a mutation occurred. + pub fn set_indexing_settings(&mut self, indexing_settings: IndexingSettings) -> bool { + self.metadata.set_indexing_settings(indexing_settings) + } + /// Stages a single split.
/// /// If a split already exists and is in the [SplitState::Staged] state, diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs index 244033f8177..57f2091f7af 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs @@ -504,12 +504,14 @@ impl MetastoreService for FileBackedMetastore { ) -> MetastoreResult { let retention_policy_opt = request.deserialize_retention_policy()?; let search_settings = request.deserialize_search_settings()?; + let indexing_settings = request.deserialize_indexing_settings()?; let index_uid = request.index_uid(); let index_metadata = self .mutate(index_uid, |index| { let mut mutation_occurred = index.set_retention_policy(retention_policy_opt); mutation_occurred |= index.set_search_settings(search_settings); + mutation_occurred |= index.set_indexing_settings(indexing_settings); let index_metadata = index.metadata().clone(); diff --git a/quickwit/quickwit-metastore/src/metastore/index_metadata/mod.rs b/quickwit/quickwit-metastore/src/metastore/index_metadata/mod.rs index f3e59fe90b6..4176be081d3 100644 --- a/quickwit/quickwit-metastore/src/metastore/index_metadata/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/index_metadata/mod.rs @@ -23,7 +23,9 @@ use std::collections::hash_map::Entry; use std::collections::HashMap; use quickwit_common::uri::Uri; -use quickwit_config::{IndexConfig, RetentionPolicy, SearchSettings, SourceConfig}; +use quickwit_config::{ + IndexConfig, IndexingSettings, RetentionPolicy, SearchSettings, SourceConfig, +}; use quickwit_proto::metastore::{EntityKind, MetastoreError, MetastoreResult}; use quickwit_proto::types::{IndexUid, SourceId}; use serde::{Deserialize, Serialize}; @@ -108,7 +110,7 @@ impl IndexMetadata { } } - /// Replaces or removes the current search settings, returning whether a mutation occurred. 
+ /// Replaces the current search settings, returning whether a mutation occurred. pub fn set_search_settings(&mut self, search_settings: SearchSettings) -> bool { if self.index_config.search_settings != search_settings { self.index_config.search_settings = search_settings; @@ -118,6 +120,16 @@ impl IndexMetadata { } } + /// Replaces the current indexing settings, returning whether a mutation occurred. + pub fn set_indexing_settings(&mut self, indexing_settings: IndexingSettings) -> bool { + if self.index_config.indexing_settings != indexing_settings { + self.index_config.indexing_settings = indexing_settings; + true + } else { + false + } + } + /// Adds a source to the index. Returns an error if the source already exists. pub fn add_source(&mut self, source_config: SourceConfig) -> MetastoreResult<()> { match self.sources.entry(source_config.source_id.clone()) { diff --git a/quickwit/quickwit-metastore/src/metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/mod.rs index 3ca762f6068..7378faa94fc 100644 --- a/quickwit/quickwit-metastore/src/metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/mod.rs @@ -32,7 +32,9 @@ use futures::TryStreamExt; pub use index_metadata::IndexMetadata; use itertools::Itertools; use quickwit_common::thread_pool::run_cpu_intensive; -use quickwit_config::{IndexConfig, RetentionPolicy, SearchSettings, SourceConfig}; +use quickwit_config::{ + IndexConfig, IndexingSettings, RetentionPolicy, SearchSettings, SourceConfig, +}; use quickwit_doc_mapper::tag_pruning::TagFilterAst; use quickwit_proto::metastore::{ serde_utils, AddSourceRequest, CreateIndexRequest, CreateIndexResponse, DeleteTask, @@ -188,6 +190,7 @@ pub trait UpdateIndexRequestExt { index_uid: impl Into, search_settings: &SearchSettings, retention_policy_opt: &Option, + indexing_settings: &IndexingSettings, ) -> MetastoreResult; /// Deserializes the `search_settings_json` field of an [`UpdateIndexRequest`] into a @@ -197,6 +200,10 @@ pub trait 
UpdateIndexRequestExt { /// Deserializes the `retention_policy_json` field of an [`UpdateIndexRequest`] into a /// [`RetentionPolicy`] object. fn deserialize_retention_policy(&self) -> MetastoreResult<Option<RetentionPolicy>>; + + /// Deserializes the `indexing_settings_json` field of an [`UpdateIndexRequest`] into a + /// [`IndexingSettings`] object. + fn deserialize_indexing_settings(&self) -> MetastoreResult<IndexingSettings>; } impl UpdateIndexRequestExt for UpdateIndexRequest { @@ -204,17 +211,20 @@ impl UpdateIndexRequestExt for UpdateIndexRequest { index_uid: impl Into<IndexUid>, search_settings: &SearchSettings, retention_policy_opt: &Option<RetentionPolicy>, + indexing_settings: &IndexingSettings, ) -> MetastoreResult<UpdateIndexRequest> { - let search_settings_json = serde_utils::to_json_str(&search_settings)?; + let search_settings_json = serde_utils::to_json_str(search_settings)?; let retention_policy_json = retention_policy_opt .as_ref() .map(serde_utils::to_json_str) .transpose()?; + let indexing_settings_json = serde_utils::to_json_str(indexing_settings)?; let update_request = UpdateIndexRequest { index_uid: Some(index_uid.into()), search_settings_json, retention_policy_json, + indexing_settings_json, }; Ok(update_request) } @@ -229,6 +239,10 @@ impl UpdateIndexRequestExt for UpdateIndexRequest { .map(|policy| serde_utils::from_json_str(policy)) .transpose() } + + fn deserialize_indexing_settings(&self) -> MetastoreResult<IndexingSettings> { + serde_utils::from_json_str(&self.indexing_settings_json) + } } /// Helper trait to build a [`IndexMetadataResponse`] and deserialize its payload. 
diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index c6fcfb45ada..6efeeec02be 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -398,12 +398,14 @@ impl MetastoreService for PostgresqlMetastore { ) -> MetastoreResult { let retention_policy_opt = request.deserialize_retention_policy()?; let search_settings = request.deserialize_search_settings()?; + let indexing_settings = request.deserialize_indexing_settings()?; let index_uid: IndexUid = request.index_uid().clone(); let updated_index_metadata = run_with_tx!(self.connection_pool, tx, { mutate_index_metadata::(tx, index_uid, |index_metadata| { let mut mutation_occurred = index_metadata.set_retention_policy(retention_policy_opt); mutation_occurred |= index_metadata.set_search_settings(search_settings); + mutation_occurred |= index_metadata.set_indexing_settings(indexing_settings); Ok(MutationOccurred::from(mutation_occurred)) }) .await diff --git a/quickwit/quickwit-metastore/src/tests/index.rs b/quickwit/quickwit-metastore/src/tests/index.rs index 48dbcd14655..2e2a991406e 100644 --- a/quickwit/quickwit-metastore/src/tests/index.rs +++ b/quickwit/quickwit-metastore/src/tests/index.rs @@ -25,13 +25,12 @@ // - list_indexes // - delete_index -use std::collections::BTreeSet; - use quickwit_common::rand::append_random_suffix; +use quickwit_config::merge_policy_config::{MergePolicyConfig, StableLogMergePolicyConfig}; use quickwit_config::{ - IndexConfig, RetentionPolicy, SearchSettings, SourceConfig, CLI_SOURCE_ID, INGEST_V2_SOURCE_ID, + IndexConfig, IndexingSettings, RetentionPolicy, SearchSettings, SourceConfig, CLI_SOURCE_ID, + INGEST_V2_SOURCE_ID, }; -use quickwit_doc_mapper::FieldMappingType; use quickwit_proto::metastore::{ CreateIndexRequest, DeleteIndexRequest, EntityKind, IndexMetadataFailure, IndexMetadataFailureReason, 
IndexMetadataRequest, IndexMetadataSubrequest, @@ -86,15 +85,14 @@ pub async fn test_metastore_create_index< cleanup_index(&mut metastore, index_uid).await; } -pub async fn test_metastore_update_index< +async fn setup_metastore_for_update< MetastoreToTest: MetastoreService + MetastoreServiceExt + DefaultForTest, ->() { +>() -> (MetastoreToTest, IndexUid, IndexConfig) { let mut metastore = MetastoreToTest::default_for_test().await; let index_id = append_random_suffix("test-update-index"); let index_uri = format!("ram:///indexes/{index_id}"); let index_config = IndexConfig::for_test(&index_id, &index_uri); - let create_index_request = CreateIndexRequest::try_from_index_config(&index_config).unwrap(); let index_uid = metastore .create_index(create_index_request.clone()) @@ -103,51 +101,31 @@ pub async fn test_metastore_update_index< .index_uid() .clone(); - let index_metadata = metastore - .index_metadata(IndexMetadataRequest::for_index_id(index_id.to_string())) - .await - .unwrap() - .deserialize_index_metadata() - .unwrap(); - - // use all fields that are currently not set as default - let current_defaults = BTreeSet::from_iter( - index_metadata - .index_config - .search_settings - .default_search_fields, - ); - let new_search_setting = SearchSettings { - default_search_fields: index_metadata - .index_config - .doc_mapping - .field_mappings - .iter() - .filter(|f| matches!(f.mapping_type, FieldMappingType::Text(..))) - .filter(|f| !current_defaults.contains(&f.name)) - .map(|f| f.name.clone()) - .collect(), - }; + (metastore, index_uid, index_config) +} +pub async fn test_metastore_update_retention_policy< + MetastoreToTest: MetastoreService + MetastoreServiceExt + DefaultForTest, +>() { + let (mut metastore, index_uid, index_config) = + setup_metastore_for_update::<MetastoreToTest>().await; let new_retention_policy_opt = Some(RetentionPolicy { retention_period: String::from("3 days"), evaluation_schedule: String::from("daily"), }); - assert_ne!( - 
index_metadata.index_config.retention_policy_opt, new_retention_policy_opt, - "original and updated value are the same, test became inefficient" - ); - // run same update twice to check idempotence, then None as a corner case check + // set and unset retention policy multiple times for loop_retention_policy_opt in [ + None, new_retention_policy_opt.clone(), new_retention_policy_opt.clone(), None, ] { let index_update = UpdateIndexRequest::try_from_updates( index_uid.clone(), - &new_search_setting, + &index_config.search_settings, &loop_retention_policy_opt, + &index_config.indexing_settings, ) .unwrap(); let response_metadata = metastore @@ -157,23 +135,124 @@ pub async fn test_metastore_update_index< .deserialize_index_metadata() .unwrap(); assert_eq!(response_metadata.index_uid, index_uid); - assert_eq!( - response_metadata.index_config.search_settings, - new_search_setting - ); assert_eq!( response_metadata.index_config.retention_policy_opt, loop_retention_policy_opt ); let updated_metadata = metastore - .index_metadata(IndexMetadataRequest::for_index_id(index_id.to_string())) + .index_metadata(IndexMetadataRequest::for_index_id( + index_uid.index_id.to_string(), + )) .await .unwrap() .deserialize_index_metadata() .unwrap(); assert_eq!(response_metadata, updated_metadata); } + cleanup_index(&mut metastore, index_uid).await; +} +pub async fn test_metastore_update_search_settings< + MetastoreToTest: MetastoreService + MetastoreServiceExt + DefaultForTest, +>() { + let (mut metastore, index_uid, index_config) = + setup_metastore_for_update::().await; + + for loop_search_settings in [ + Vec::new(), + vec!["body".to_string()], + vec!["body".to_string()], + vec!["body".to_string(), "owner".to_string()], + Vec::new(), + ] { + let index_update = UpdateIndexRequest::try_from_updates( + index_uid.clone(), + &SearchSettings { + default_search_fields: loop_search_settings.clone(), + }, + &index_config.retention_policy_opt, + &index_config.indexing_settings, + ) + .unwrap(); 
+ let response_metadata = metastore + .update_index(index_update) + .await + .unwrap() + .deserialize_index_metadata() + .unwrap(); + assert_eq!( + response_metadata + .index_config + .search_settings + .default_search_fields, + loop_search_settings + ); + let updated_metadata = metastore + .index_metadata(IndexMetadataRequest::for_index_id( + index_uid.index_id.to_string(), + )) + .await + .unwrap() + .deserialize_index_metadata() + .unwrap(); + assert_eq!( + updated_metadata + .index_config + .search_settings + .default_search_fields, + loop_search_settings + ); + } + cleanup_index(&mut metastore, index_uid).await; +} + +pub async fn test_metastore_update_indexing_settings< + MetastoreToTest: MetastoreService + MetastoreServiceExt + DefaultForTest, +>() { + let (mut metastore, index_uid, index_config) = + setup_metastore_for_update::().await; + + for loop_indexing_settings in [ + MergePolicyConfig::Nop, + MergePolicyConfig::Nop, + MergePolicyConfig::StableLog(StableLogMergePolicyConfig { + merge_factor: 5, + ..Default::default() + }), + ] { + let index_update = UpdateIndexRequest::try_from_updates( + index_uid.clone(), + &index_config.search_settings, + &index_config.retention_policy_opt, + &IndexingSettings { + merge_policy: loop_indexing_settings.clone(), + ..Default::default() + }, + ) + .unwrap(); + let resp_metadata = metastore + .update_index(index_update) + .await + .unwrap() + .deserialize_index_metadata() + .unwrap(); + assert_eq!( + resp_metadata.index_config.indexing_settings.merge_policy, + loop_indexing_settings + ); + let updated_metadata = metastore + .index_metadata(IndexMetadataRequest::for_index_id( + index_uid.index_id.to_string(), + )) + .await + .unwrap() + .deserialize_index_metadata() + .unwrap(); + assert_eq!( + updated_metadata.index_config.indexing_settings.merge_policy, + loop_indexing_settings + ); + } cleanup_index(&mut metastore, index_uid).await; } diff --git a/quickwit/quickwit-metastore/src/tests/mod.rs 
b/quickwit/quickwit-metastore/src/tests/mod.rs index a8d6820548b..4a579f12eac 100644 --- a/quickwit/quickwit-metastore/src/tests/mod.rs +++ b/quickwit/quickwit-metastore/src/tests/mod.rs @@ -195,9 +195,21 @@ macro_rules! metastore_test_suite { } #[tokio::test] - async fn test_metastore_update_index() { + async fn test_metastore_update_retention_policy() { let _ = tracing_subscriber::fmt::try_init(); - $crate::tests::index::test_metastore_update_index::<$metastore_type>().await; + $crate::tests::index::test_metastore_update_retention_policy::<$metastore_type>().await; + } + + #[tokio::test] + async fn test_metastore_update_search_settings() { + let _ = tracing_subscriber::fmt::try_init(); + $crate::tests::index::test_metastore_update_search_settings::<$metastore_type>().await; + } + + #[tokio::test] + async fn test_metastore_update_indexing_settings() { + let _ = tracing_subscriber::fmt::try_init(); + $crate::tests::index::test_metastore_update_indexing_settings::<$metastore_type>().await; } #[tokio::test] diff --git a/quickwit/quickwit-proto/protos/quickwit/metastore.proto b/quickwit/quickwit-proto/protos/quickwit/metastore.proto index ac4e35b0817..084dccad417 100644 --- a/quickwit/quickwit-proto/protos/quickwit/metastore.proto +++ b/quickwit/quickwit-proto/protos/quickwit/metastore.proto @@ -215,6 +215,7 @@ message UpdateIndexRequest { quickwit.common.IndexUid index_uid = 1; string search_settings_json = 2; optional string retention_policy_json = 3; + string indexing_settings_json = 4; } message ListIndexesMetadataRequest { diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs index d3537e02799..76abf454b70 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs @@ -30,6 +30,8 @@ pub struct UpdateIndexRequest { pub search_settings_json: ::prost::alloc::string::String, 
#[prost(string, optional, tag = "3")] pub retention_policy_json: ::core::option::Option<::prost::alloc::string::String>, + #[prost(string, tag = "4")] + pub indexing_settings_json: ::prost::alloc::string::String, } #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[allow(clippy::derive_partial_eq_without_eq)] diff --git a/quickwit/quickwit-rest-client/src/rest_client.rs b/quickwit/quickwit-rest-client/src/rest_client.rs index fe2f796a38c..062b5389f92 100644 --- a/quickwit/quickwit-rest-client/src/rest_client.rs +++ b/quickwit/quickwit-rest-client/src/rest_client.rs @@ -26,9 +26,7 @@ use quickwit_indexing::actors::IndexingServiceCounters; pub use quickwit_ingest::CommitType; use quickwit_metastore::{IndexMetadata, Split, SplitInfo}; use quickwit_search::SearchResponseRest; -use quickwit_serve::{ - IndexUpdates, ListSplitsQueryParams, ListSplitsResponse, SearchRequestQueryString, -}; +use quickwit_serve::{ListSplitsQueryParams, ListSplitsResponse, SearchRequestQueryString}; use reqwest::header::{HeaderMap, HeaderValue, CONTENT_TYPE}; use reqwest::{Client, ClientBuilder, Method, StatusCode, Url}; use serde::Serialize; @@ -333,12 +331,12 @@ impl<'a> IndexClient<'a> { pub async fn create( &self, - index_config: impl ToString, + index_config: impl AsRef<[u8]>, config_format: ConfigFormat, overwrite: bool, ) -> Result { let header_map = header_from_config_format(config_format); - let body = Bytes::from(index_config.to_string()); + let body = Bytes::copy_from_slice(index_config.as_ref()); let response = self .transport .send( @@ -357,13 +355,22 @@ impl<'a> IndexClient<'a> { pub async fn update( &self, index_id: &str, - index_updates: IndexUpdates, + index_config: impl AsRef<[u8]>, + config_format: ConfigFormat, ) -> Result { - let body = Bytes::from(serde_json::to_string(&index_updates)?); + let header_map = header_from_config_format(config_format); + let body = Bytes::copy_from_slice(index_config.as_ref()); let path = format!("indexes/{index_id}"); let 
response = self .transport - .send::<()>(Method::PUT, &path, None, None, Some(body), self.timeout) + .send::<()>( + Method::PUT, + &path, + Some(header_map), + None, + Some(body), + self.timeout, + ) .await?; let index_metadata = response.deserialize().await?; Ok(index_metadata) @@ -490,11 +497,11 @@ impl<'a> SourceClient<'a> { pub async fn create( &self, - source_config_input: impl ToString, + source_config_input: impl AsRef<[u8]>, config_format: ConfigFormat, ) -> Result { let header_map = header_from_config_format(config_format); - let source_config_bytes: Bytes = Bytes::from(source_config_input.to_string()); + let source_config_bytes = Bytes::copy_from_slice(source_config_input.as_ref()); let response = self .transport .send::<()>( diff --git a/quickwit/quickwit-serve/src/index_api/mod.rs b/quickwit/quickwit-serve/src/index_api/mod.rs index 64d58cddb94..a11878ac264 100644 --- a/quickwit/quickwit-serve/src/index_api/mod.rs +++ b/quickwit/quickwit-serve/src/index_api/mod.rs @@ -20,6 +20,6 @@ mod rest_handler; pub use self::rest_handler::{ - get_index_metadata_handler, index_management_handlers, IndexApi, IndexUpdates, - ListSplitsQueryParams, ListSplitsResponse, + get_index_metadata_handler, index_management_handlers, IndexApi, ListSplitsQueryParams, + ListSplitsResponse, }; diff --git a/quickwit/quickwit-serve/src/index_api/rest_handler.rs b/quickwit/quickwit-serve/src/index_api/rest_handler.rs index 12e2f22b367..22f9e38e0ac 100644 --- a/quickwit/quickwit-serve/src/index_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/index_api/rest_handler.rs @@ -22,9 +22,8 @@ use std::sync::Arc; use bytes::Bytes; use quickwit_common::uri::Uri; use quickwit_config::{ - load_source_config_from_user_config, validate_index_id_pattern, ConfigFormat, NodeConfig, - RetentionPolicy, SearchSettings, SourceConfig, SourceParams, CLI_SOURCE_ID, - INGEST_API_SOURCE_ID, + load_index_config_update, load_source_config_from_user_config, validate_index_id_pattern, + ConfigFormat, 
NodeConfig, SourceConfig, SourceParams, CLI_SOURCE_ID, INGEST_API_SOURCE_ID, }; use quickwit_doc_mapper::{analyze_text, TokenizerConfig}; use quickwit_index_management::{IndexService, IndexServiceError}; @@ -68,7 +67,7 @@ use crate::with_arg; toggle_source, delete_source, ), - components(schemas(ToggleSource, SplitsForDeletion, IndexStats, IndexUpdates)) + components(schemas(ToggleSource, SplitsForDeletion, IndexStats)) )] pub struct IndexApi; @@ -529,22 +528,14 @@ async fn create_index( .await } -/// The body of the index update request. All fields will be replaced in the -/// existing configuration. -#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Default, utoipa::ToSchema)] -#[serde(deny_unknown_fields)] // Remove when adding new fields to allow to ensure forward compatibility -pub struct IndexUpdates { - pub search_settings: SearchSettings, - #[serde(rename = "retention_policy")] - pub retention_policy_opt: Option, -} - fn update_index_handler( metastore: MetastoreServiceClient, ) -> impl Filter + Clone { warp::path!("indexes" / String) .and(warp::put()) - .and(json_body()) + .and(extract_config_format()) + .and(warp::body::content_length_limit(1024 * 1024)) + .and(warp::filters::body::bytes()) .and(with_arg(metastore)) .then(update_index) .map(log_failure("failed to update index")) @@ -556,9 +547,9 @@ fn update_index_handler( put, tag = "Indexes", path = "/indexes/{index_id}", - request_body = IndexUpdates, + request_body = VersionedIndexConfig, responses( - (status = 200, description = "Successfully updated the index configuration.") + (status = 200, description = "Successfully updated the index configuration.", body = VersionedIndexMetadata) ), params( ("index_id" = String, Path, description = "The index ID to update."), @@ -566,26 +557,37 @@ fn update_index_handler( )] /// Updates an existing index. 
/// -/// This endpoint has PUT semantics, which means that all the updatable fields of the index -/// configuration are replaced by the values specified in the request. In particular, omitting an -/// optional field like `retention_policy` will delete the associated configuration. +/// This endpoint follows PUT semantics, which means that all the fields of the +/// current configuration are replaced by the values specified in this request +/// or the associated defaults. In particular, if the field is optional (e.g. +/// `retention_policy`), omitting it will delete the associated configuration. +/// If the new configuration file contains updates that cannot be applied, the +/// request fails, and none of the updates are applied. async fn update_index( - index_id: IndexId, - request: IndexUpdates, + target_index_id: IndexId, + config_format: ConfigFormat, + index_config_bytes: Bytes, mut metastore: MetastoreServiceClient, ) -> Result { - info!(index_id = %index_id, "update-index"); - let index_metadata_request = IndexMetadataRequest::for_index_id(index_id.to_string()); - let index_uid: IndexUid = metastore + info!(index_id = %target_index_id, "update-index"); + + let index_metadata_request = IndexMetadataRequest::for_index_id(target_index_id.to_string()); + let current_index_metadata = metastore .index_metadata(index_metadata_request) .await? - .deserialize_index_metadata()? 
- .index_uid; + let index_uid = current_index_metadata.index_uid.clone(); + let current_index_config = current_index_metadata.into_index_config(); + + let new_index_config = + load_index_config_update(config_format, &index_config_bytes, &current_index_config) + .map_err(IndexServiceError::InvalidConfig)?; let update_request = UpdateIndexRequest::try_from_updates( index_uid, - &request.search_settings, - &request.retention_policy_opt, + &new_index_config.search_settings, + &new_index_config.retention_policy_opt, + &new_index_config.indexing_settings, )?; let update_resp = metastore.update_index(update_request).await?; Ok(update_resp.deserialize_index_metadata()?) @@ -1813,7 +1815,7 @@ mod tests { .path("/indexes/hdfs-logs") .method("PUT") .json(&true) - .body(r#"{"search_settings":{"default_search_fields":["severity_text","body"]}}"#) + .body(r#"{"version": "0.7", "index_id": "hdfs-logs", "doc_mapping": {"field_mappings":[{"name": "timestamp", "type": "i64", "fast": true, "indexed": true}]},"search_settings":{"default_search_fields":["severity_text", "body"]}}"#) .reply(&index_management_handler) .await; assert_eq!(resp.status(), 200); diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index c361e7b9153..0a9157f4ff4 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -120,7 +120,7 @@ use tracing::{debug, error, info, warn}; use warp::{Filter, Rejection}; pub use crate::build_info::{BuildInfo, RuntimeInfo}; -pub use crate::index_api::{IndexUpdates, ListSplitsQueryParams, ListSplitsResponse}; +pub use crate::index_api::{ListSplitsQueryParams, ListSplitsResponse}; pub use crate::metrics::SERVE_METRICS; use crate::rate_modulator::RateModulator; #[cfg(test)]