Skip to content

Commit

Permalink
Add multi indices search to elasticsearch + quickwit search endpoints.
Browse files Browse the repository at this point in the history
  • Loading branch information
fmassot committed Aug 10, 2023
1 parent 5d2c66e commit ed486e8
Show file tree
Hide file tree
Showing 57 changed files with 1,885 additions and 749 deletions.
25 changes: 25 additions & 0 deletions quickwit/quickwit-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,31 @@ pub fn validate_identifier(label: &str, value: &str) -> anyhow::Result<()> {
);
}

/// Checks whether an index ID pattern conforms to Elasticsearch/Quickwit object naming conventions.
/// Index id patterns accepts the same characters as identifiers, but also accepts `*` as a
/// wildcard.
pub fn validate_index_id_pattern(value: &str) -> anyhow::Result<()> {
static IDENTIFIER_REGEX_WITH_GLOB_PATTERN: OnceCell<Regex> = OnceCell::new();

if !IDENTIFIER_REGEX_WITH_GLOB_PATTERN
.get_or_init(|| Regex::new(r"^[a-zA-Z\*][a-zA-Z0-9-_\.\*]{2,254}$").expect("Failed to compile regular expression. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues."))
.is_match(value)
{
bail!(
"Index ID pattern `{value}` is invalid. Patterns must match the following regular \
expression: `^[a-zA-Z\\*][a-zA-Z0-9-_\\.\\*]{{2,254}}$`."
);
}

// Forbid multiple stars in the pattern to force the user making simpler patterns
// as multiple stars does not bring any value.
if value.contains("**") {
bail!("Index ID pattern `{value}` is invalid. Patterns must not contain `**`.");
}

Ok(())
}

pub fn validate_node_id(node_id: &str) -> anyhow::Result<()> {
if !is_valid_hostname(node_id) {
bail!(
Expand Down
17 changes: 11 additions & 6 deletions quickwit/quickwit-control-plane/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use async_trait::async_trait;
use itertools::Itertools;
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler};
use quickwit_config::SourceConfig;
use quickwit_metastore::Metastore;
use quickwit_metastore::{ListIndexesQuery, Metastore};
use quickwit_proto::control_plane::{NotifyIndexChangeRequest, NotifyIndexChangeResponse};
use quickwit_proto::indexing::{ApplyIndexingPlanRequest, IndexingService, IndexingTask};
use serde::Serialize;
Expand Down Expand Up @@ -191,7 +191,10 @@ impl IndexingScheduler {
}

async fn fetch_source_configs(&self) -> anyhow::Result<HashMap<IndexSourceId, SourceConfig>> {
let indexes_metadatas = self.metastore.list_indexes_metadatas().await?;
let indexes_metadatas = self
.metastore
.list_indexes_metadatas(ListIndexesQuery::All)
.await?;
let source_configs: HashMap<IndexSourceId, SourceConfig> = indexes_metadatas
.into_iter()
.flat_map(|index_metadata| {
Expand Down Expand Up @@ -530,7 +533,7 @@ mod tests {
use quickwit_config::service::QuickwitService;
use quickwit_config::{KafkaSourceParams, SourceConfig, SourceInputFormat, SourceParams};
use quickwit_indexing::IndexingService;
use quickwit_metastore::{IndexMetadata, MockMetastore};
use quickwit_metastore::{IndexMetadata, ListIndexesQuery, MockMetastore};
use quickwit_proto::indexing::{ApplyIndexingPlanRequest, IndexingServiceClient, IndexingTask};
use serde_json::json;

Expand Down Expand Up @@ -613,9 +616,11 @@ mod tests {
let mut index_metadata_2 = index_metadata_for_test(index_2, source_2, 1, 1);
index_metadata_2.create_timestamp = index_metadata_1.create_timestamp + 1;
let mut metastore = MockMetastore::default();
metastore
.expect_list_indexes_metadatas()
.returning(move || Ok(vec![index_metadata_2.clone(), index_metadata_1.clone()]));
metastore.expect_list_indexes_metadatas().returning(
move |_list_indexes_query: ListIndexesQuery| {
Ok(vec![index_metadata_2.clone(), index_metadata_1.clone()])
},
);
let mut indexer_inboxes = Vec::new();
let indexing_client_pool = Pool::default();
let change_stream = cluster.ready_nodes_change_stream().await;
Expand Down
10 changes: 5 additions & 5 deletions quickwit/quickwit-indexing/src/actors/indexing_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use quickwit_config::{
build_doc_mapper, IndexConfig, IndexerConfig, SourceConfig, INGEST_API_SOURCE_ID,
};
use quickwit_ingest::{DropQueueRequest, IngestApiService, ListQueuesRequest, QUEUES_DIR_NAME};
use quickwit_metastore::{IndexMetadata, Metastore};
use quickwit_metastore::{IndexMetadata, ListIndexesQuery, Metastore};
use quickwit_proto::indexing::{
ApplyIndexingPlanRequest, ApplyIndexingPlanResponse, IndexingError, IndexingPipelineId,
IndexingTask,
Expand Down Expand Up @@ -602,7 +602,7 @@ impl IndexingService {

let index_ids: HashSet<String> = self
.metastore
.list_indexes_metadatas()
.list_indexes_metadatas(ListIndexesQuery::All)
.await
.context("Failed to list queues")?
.into_iter()
Expand Down Expand Up @@ -1293,9 +1293,9 @@ mod tests {
.insert(source_config.source_id.clone(), source_config.clone());
let mut metastore = MockMetastore::default();
let index_metadata_clone = index_metadata.clone();
metastore
.expect_list_indexes_metadatas()
.returning(move || Ok(vec![index_metadata_clone.clone()]));
metastore.expect_list_indexes_metadatas().returning(
move |_list_indexes_query: ListIndexesQuery| Ok(vec![index_metadata_clone.clone()]),
);
metastore
.expect_index_metadata()
.returning(move |_| Ok(index_metadata.clone()));
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/actors/merge_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@ mod tests {
index_uid: index_uid.to_string(),
start_timestamp: None,
end_timestamp: None,
query_ast: quickwit_query::query_ast::qast_helper(delete_query, &["body"]),
query_ast: quickwit_query::query_ast::qast_string_helper(delete_query, &["body"]),
})
.await?;
let split_metadata = metastore
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ mod tests {
.expect_list_splits()
.times(1)
.returning(move |list_split_query| {
assert_eq!(list_split_query.index_uid, index_uid);
assert_eq!(list_split_query.index_uids, vec![index_uid.clone()]);
assert_eq!(
list_split_query.split_states,
vec![quickwit_metastore::SplitState::Published]
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ mod split_store;
mod test_utils;

#[cfg(any(test, feature = "testsuite"))]
pub use test_utils::{mock_split, mock_split_meta, TestSandbox};
pub use test_utils::{mock_split, mock_split_meta, MockSplitBuilder, TestSandbox};

use self::merge_policy::MergePolicy;
pub use self::source::check_source_connectivity;
Expand Down
37 changes: 30 additions & 7 deletions quickwit/quickwit-indexing/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,19 +218,42 @@ impl TestSandbox {
}
}

/// Mock split builder.
pub struct MockSplitBuilder {
split_metadata: SplitMetadata,
}

impl MockSplitBuilder {
pub fn new(split_id: &str) -> Self {
Self {
split_metadata: mock_split_meta(split_id, &IndexUid::new("test-index")),
}
}

pub fn with_index_uid(mut self, index_uid: &IndexUid) -> Self {
self.split_metadata.index_uid = index_uid.clone();
self
}

pub fn build(self) -> Split {
Split {
split_state: SplitState::Published,
split_metadata: self.split_metadata,
update_timestamp: 0,
publish_timestamp: None,
}
}
}

/// Mock split helper.
pub fn mock_split(split_id: &str) -> Split {
Split {
split_state: SplitState::Published,
split_metadata: mock_split_meta(split_id),
update_timestamp: 0,
publish_timestamp: None,
}
MockSplitBuilder::new(split_id).build()
}

/// Mock split meta helper.
pub fn mock_split_meta(split_id: &str) -> SplitMetadata {
pub fn mock_split_meta(split_id: &str, index_uid: &IndexUid) -> SplitMetadata {
SplitMetadata {
index_uid: index_uid.clone(),
split_id: split_id.to_string(),
partition_id: 13u64,
num_docs: 10,
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-jaeger/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ impl JaegerService {
let aggregation_query = build_aggregations_query(trace_query.num_traces as usize);
let max_hits = 0;
let search_request = SearchRequest {
index_id,
index_id_patterns: vec![index_id],
query_ast,
aggregation_request: Some(aggregation_query),
max_hits,
Expand Down Expand Up @@ -303,7 +303,7 @@ impl JaegerService {
serde_json::to_string(&query_ast).map_err(|err| Status::internal(err.to_string()))?;

let search_request = SearchRequest {
index_id: OTEL_TRACES_INDEX_ID.to_string(),
index_id_patterns: vec![OTEL_TRACES_INDEX_ID.to_string()],
query_ast,
start_timestamp: Some(*search_window.start()),
end_timestamp: Some(*search_window.end()),
Expand Down
18 changes: 13 additions & 5 deletions quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ mod tests {
index_uid: index_uid.to_string(),
start_timestamp: None,
end_timestamp: None,
query_ast: quickwit_query::query_ast::qast_helper("body:delete", &[]),
query_ast: quickwit_query::query_ast::qast_string_helper("body:delete", &[]),
})
.await
.unwrap();
Expand All @@ -365,8 +365,12 @@ mod tests {
mock_search_service
.expect_leaf_search()
.withf(|leaf_request| -> bool {
leaf_request.search_request.as_ref().unwrap().index_id
== "test-delete-pipeline-simple"
leaf_request
.search_request
.as_ref()
.unwrap()
.index_id_patterns
== vec!["test-delete-pipeline-simple".to_string()]
})
.times(2)
.returning(move |_: LeafSearchRequest| {
Expand Down Expand Up @@ -441,8 +445,12 @@ mod tests {
mock_search_service
.expect_leaf_search()
.withf(|leaf_request| -> bool {
leaf_request.search_request.as_ref().unwrap().index_id
== "test-delete-pipeline-shut-down"
leaf_request
.search_request
.as_ref()
.unwrap()
.index_id_patterns
== vec!["test-delete-pipeline-shut-down".to_string()]
})
.returning(move |_: LeafSearchRequest| {
Ok(LeafSearchResponse {
Expand Down
43 changes: 29 additions & 14 deletions quickwit/quickwit-janitor/src/actors/delete_task_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;

Expand All @@ -34,7 +34,10 @@ use quickwit_metastore::{
};
use quickwit_proto::metastore::DeleteTask;
use quickwit_proto::{IndexUid, SearchRequest};
use quickwit_search::{jobs_to_leaf_request, SearchJob, SearchJobPlacer};
use quickwit_query::query_ast::QueryAst;
use quickwit_search::{
jobs_to_leaf_requests, IndexMetadataForLeafSearch, SearchJob, SearchJobPlacer,
};
use serde::Serialize;
use tantivy::Inventory;
use tracing::{debug, info};
Expand Down Expand Up @@ -291,25 +294,36 @@ impl DeleteTaskPlanner {
.delete_query
.as_ref()
.expect("Delete task must have a delete query.");
// TODO: resolve with the default fields.
let _query_ast: QueryAst = serde_json::from_str(&delete_query.query_ast)?;
let search_request = SearchRequest {
index_id: IndexUid::from(delete_query.index_uid.clone())
index_id_patterns: vec![IndexUid::from(delete_query.index_uid.clone())
.index_id()
.to_string(),
.to_string()],
query_ast: delete_query.query_ast.clone(),
start_timestamp: delete_query.start_timestamp,
end_timestamp: delete_query.end_timestamp,
..Default::default()
};
let leaf_search_request = jobs_to_leaf_request(
let mut search_indexes_metas = HashMap::new();
search_indexes_metas.insert(
IndexUid::from(delete_query.index_uid.clone()),
IndexMetadataForLeafSearch {
doc_mapper_str: doc_mapper_str.to_string(),
index_uri: Uri::from_well_formed(index_uri),
},
);
let leaf_search_request = jobs_to_leaf_requests(
&search_request,
doc_mapper_str,
index_uri,
&search_indexes_metas,
vec![search_job.clone()],
);
let response = search_client.leaf_search(leaf_search_request).await?;
ctx.record_progress();
if response.num_hits > 0 {
return Ok(true);
)?;
for leaf_request in leaf_search_request {
let response = search_client.leaf_search(leaf_request).await?;
ctx.record_progress();
if response.num_hits > 0 {
return Ok(true);
}
}
}
Ok(false)
Expand Down Expand Up @@ -457,8 +471,9 @@ mod tests {
// Creates 2 delete tasks, one that will match 1 document,
// the other that will match no document.

let body_delete_ast = quickwit_query::query_ast::qast_helper("body:delete", &[]);
let match_nothing_ast = quickwit_query::query_ast::qast_helper("body:matchnothing", &[]);
let body_delete_ast = quickwit_query::query_ast::qast_string_helper("body:delete", &[]);
let match_nothing_ast =
quickwit_query::query_ast::qast_string_helper("body:matchnothing", &[]);
metastore
.create_delete_task(DeleteQuery {
index_uid: index_uid.to_string(),
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-janitor/src/actors/delete_task_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use async_trait::async_trait;
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, ActorHandle, Handler};
use quickwit_common::temp_dir::{self};
use quickwit_config::IndexConfig;
use quickwit_metastore::Metastore;
use quickwit_metastore::{ListIndexesQuery, Metastore};
use quickwit_proto::IndexUid;
use quickwit_search::SearchJobPlacer;
use quickwit_storage::StorageResolver;
Expand Down Expand Up @@ -108,7 +108,7 @@ impl DeleteTaskService {
) -> anyhow::Result<()> {
let mut index_config_by_index_id: HashMap<IndexUid, IndexConfig> = self
.metastore
.list_indexes_metadatas()
.list_indexes_metadatas(ListIndexesQuery::All)
.await?
.into_iter()
.map(|index_metadata| {
Expand Down
Loading

0 comments on commit ed486e8

Please sign in to comment.