Skip to content

Commit

Permalink
Add multi indices search to elasticsearch + quickwit search endpoints.
Browse files Browse the repository at this point in the history
  • Loading branch information
fmassot committed Aug 13, 2023
1 parent bd6baf7 commit 4ea81ea
Show file tree
Hide file tree
Showing 64 changed files with 2,281 additions and 914 deletions.
4 changes: 2 additions & 2 deletions quickwit/quickwit-cli/tests/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,8 @@ async fn test_cmd_ingest_on_non_existing_index() {

assert_eq!(
error.root_cause().downcast_ref::<MetastoreError>().unwrap(),
&MetastoreError::IndexDoesNotExist {
index_id: "index-does-not-exist".to_string()
&MetastoreError::IndexesDoNotExist {
index_ids: vec!["index-does-not-exist".to_string()]
}
);
}
Expand Down
46 changes: 46 additions & 0 deletions quickwit/quickwit-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,38 @@ pub fn validate_identifier(label: &str, value: &str) -> anyhow::Result<()> {
);
}

/// Validates an index ID pattern against Quickwit's naming conventions.
///
/// A pattern accepts the same characters as an identifier and additionally
/// accepts `*` to allow for glob-like patterns. A pattern is rejected when:
/// - it does not match `^[a-zA-Z\*][a-zA-Z0-9-_\.\*]{0,254}$`;
/// - it contains two consecutive stars (`**`);
/// - it contains no star and is shorter than 3 characters.
///
/// # Errors
///
/// Returns an error that quotes the offending pattern and states which rule
/// it violates.
pub fn validate_index_id_pattern(value: &str) -> anyhow::Result<()> {
    static IDENTIFIER_REGEX_WITH_GLOB_PATTERN: OnceCell<Regex> = OnceCell::new();

    // The regex is built on the first call only; later calls reuse the cell's content.
    let glob_regex = IDENTIFIER_REGEX_WITH_GLOB_PATTERN.get_or_init(|| {
        Regex::new(r"^[a-zA-Z\*][a-zA-Z0-9-_\.\*]{0,254}$").expect("Failed to compile regular expression. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues.")
    });
    let uses_allowed_characters = glob_regex.is_match(value);
    if !uses_allowed_characters {
        bail!(
            "Index ID pattern `{value}` is invalid. Patterns must match the following regular \
             expression: `^[a-zA-Z\\*][a-zA-Z0-9-_\\.\\*]{{0,254}}$`."
        );
    }

    // Consecutive stars add no matching power over a single star, so they are
    // refused to keep user patterns simple.
    let has_consecutive_stars = value.contains("**");
    if has_consecutive_stars {
        bail!("Index ID pattern `{value}` is invalid. Patterns must not contain `**`.");
    }

    // A pattern without any star must be at least 3 characters long.
    let has_star = value.contains('*');
    if !has_star && value.len() < 3 {
        bail!(
            "Index ID pattern `{value}` is invalid. An index ID must have at least 3 characters."
        );
    }

    Ok(())
}

pub fn validate_node_id(node_id: &str) -> anyhow::Result<()> {
if !is_valid_hostname(node_id) {
bail!(
Expand Down Expand Up @@ -216,6 +248,7 @@ pub trait TestableForRegression: Serialize + DeserializeOwned {
#[cfg(test)]
mod tests {
use super::validate_identifier;
use crate::validate_index_id_pattern;

#[test]
fn test_validate_identifier() {
Expand All @@ -236,4 +269,17 @@ mod tests {
.to_string()
.contains("Cluster ID identifier `foo!` is invalid."));
}

#[test]
fn test_validate_index_id_pattern() {
    // Patterns that must be accepted: a lone star and a dotted prefix glob.
    for accepted_pattern in ["*", "abc.*"] {
        validate_index_id_pattern(accepted_pattern).unwrap();
    }

    // Patterns that must be rejected: too short without a star, empty, and
    // consecutive stars.
    for rejected_pattern in ["ab", "", "**"] {
        validate_index_id_pattern(rejected_pattern).unwrap_err();
    }

    // The error message must quote the offending pattern.
    let error_message = validate_index_id_pattern("foo!")
        .unwrap_err()
        .to_string();
    assert!(error_message.contains("Index ID pattern `foo!` is invalid."));
}
}
17 changes: 11 additions & 6 deletions quickwit/quickwit-control-plane/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use async_trait::async_trait;
use itertools::Itertools;
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler};
use quickwit_config::SourceConfig;
use quickwit_metastore::Metastore;
use quickwit_metastore::{ListIndexesQuery, Metastore};
use quickwit_proto::control_plane::{NotifyIndexChangeRequest, NotifyIndexChangeResponse};
use quickwit_proto::indexing::{ApplyIndexingPlanRequest, IndexingService, IndexingTask};
use serde::Serialize;
Expand Down Expand Up @@ -191,7 +191,10 @@ impl IndexingScheduler {
}

async fn fetch_source_configs(&self) -> anyhow::Result<HashMap<IndexSourceId, SourceConfig>> {
let indexes_metadatas = self.metastore.list_indexes_metadatas().await?;
let indexes_metadatas = self
.metastore
.list_indexes_metadatas(ListIndexesQuery::All)
.await?;
let source_configs: HashMap<IndexSourceId, SourceConfig> = indexes_metadatas
.into_iter()
.flat_map(|index_metadata| {
Expand Down Expand Up @@ -530,7 +533,7 @@ mod tests {
use quickwit_config::service::QuickwitService;
use quickwit_config::{KafkaSourceParams, SourceConfig, SourceInputFormat, SourceParams};
use quickwit_indexing::IndexingService;
use quickwit_metastore::{IndexMetadata, MockMetastore};
use quickwit_metastore::{IndexMetadata, ListIndexesQuery, MockMetastore};
use quickwit_proto::indexing::{ApplyIndexingPlanRequest, IndexingServiceClient, IndexingTask};
use serde_json::json;

Expand Down Expand Up @@ -613,9 +616,11 @@ mod tests {
let mut index_metadata_2 = index_metadata_for_test(index_2, source_2, 1, 1);
index_metadata_2.create_timestamp = index_metadata_1.create_timestamp + 1;
let mut metastore = MockMetastore::default();
metastore
.expect_list_indexes_metadatas()
.returning(move || Ok(vec![index_metadata_2.clone(), index_metadata_1.clone()]));
metastore.expect_list_indexes_metadatas().returning(
move |_list_indexes_query: ListIndexesQuery| {
Ok(vec![index_metadata_2.clone(), index_metadata_1.clone()])
},
);
let mut indexer_inboxes = Vec::new();
let indexing_client_pool = Pool::default();
let change_stream = cluster.ready_nodes_change_stream().await;
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-index-management/src/garbage_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -652,8 +652,8 @@ mod tests {

let mut mock_metastore = MockMetastore::new();
mock_metastore.expect_delete_splits().return_once(|_, _| {
Err(MetastoreError::IndexDoesNotExist {
index_id: index_id.to_string(),
Err(MetastoreError::IndexesDoNotExist {
index_ids: vec![index_id.to_string()],
})
});
let metastore = Arc::new(mock_metastore);
Expand Down
8 changes: 4 additions & 4 deletions quickwit/quickwit-index-management/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,10 @@ impl IndexService {
if overwrite {
match self.delete_index(&index_config.index_id, false).await {
Ok(_)
| Err(IndexServiceError::MetastoreError(MetastoreError::IndexDoesNotExist {
index_id: _,
| Err(IndexServiceError::MetastoreError(MetastoreError::IndexesDoNotExist {
index_ids: _,
})) => {
// Ignore IndexDoesNotExist error.
// Ignore IndexesDoNotExist error.
}
Err(error) => {
return Err(error);
Expand Down Expand Up @@ -442,7 +442,7 @@ mod tests {
.await
.unwrap_err();
assert!(
matches!(error, MetastoreError::IndexDoesNotExist { index_id } if index_id == index_uid.index_id())
matches!(error, MetastoreError::IndexesDoNotExist { index_ids } if index_ids == vec![index_uid.index_id().to_string()])
);
assert!(!storage.exists(split_path).await.unwrap());
}
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ impl Handler<Spawn> for IndexingPipeline {
}
self.previous_generations_statistics.num_spawn_attempts = 1 + spawn.retry_count;
if let Err(spawn_error) = self.spawn_pipeline(ctx).await {
if let Some(MetastoreError::IndexDoesNotExist { .. }) =
if let Some(MetastoreError::IndexesDoNotExist { .. }) =
spawn_error.downcast_ref::<MetastoreError>()
{
info!(error = ?spawn_error, "Could not spawn pipeline, index might have been deleted.");
Expand Down
10 changes: 5 additions & 5 deletions quickwit/quickwit-indexing/src/actors/indexing_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use quickwit_config::{
build_doc_mapper, IndexConfig, IndexerConfig, SourceConfig, INGEST_API_SOURCE_ID,
};
use quickwit_ingest::{DropQueueRequest, IngestApiService, ListQueuesRequest, QUEUES_DIR_NAME};
use quickwit_metastore::{IndexMetadata, Metastore};
use quickwit_metastore::{IndexMetadata, ListIndexesQuery, Metastore};
use quickwit_proto::indexing::{
ApplyIndexingPlanRequest, ApplyIndexingPlanResponse, IndexingError, IndexingPipelineId,
IndexingTask,
Expand Down Expand Up @@ -602,7 +602,7 @@ impl IndexingService {

let index_ids: HashSet<String> = self
.metastore
.list_indexes_metadatas()
.list_indexes_metadatas(ListIndexesQuery::All)
.await
.context("Failed to list queues")?
.into_iter()
Expand Down Expand Up @@ -1293,9 +1293,9 @@ mod tests {
.insert(source_config.source_id.clone(), source_config.clone());
let mut metastore = MockMetastore::default();
let index_metadata_clone = index_metadata.clone();
metastore
.expect_list_indexes_metadatas()
.returning(move || Ok(vec![index_metadata_clone.clone()]));
metastore.expect_list_indexes_metadatas().returning(
move |_list_indexes_query: ListIndexesQuery| Ok(vec![index_metadata_clone.clone()]),
);
metastore
.expect_index_metadata()
.returning(move |_| Ok(index_metadata.clone()));
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/actors/merge_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@ mod tests {
index_uid: index_uid.to_string(),
start_timestamp: None,
end_timestamp: None,
query_ast: quickwit_query::query_ast::qast_helper(delete_query, &["body"]),
query_ast: quickwit_query::query_ast::qast_string_helper(delete_query, &["body"]),
})
.await?;
let split_metadata = metastore
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ impl Handler<Spawn> for MergePipeline {
}
self.previous_generations_statistics.num_spawn_attempts = 1 + spawn.retry_count;
if let Err(spawn_error) = self.spawn_pipeline(ctx).await {
if let Some(MetastoreError::IndexDoesNotExist { .. }) =
if let Some(MetastoreError::IndexesDoNotExist { .. }) =
spawn_error.downcast_ref::<MetastoreError>()
{
info!(error = ?spawn_error, "Could not spawn pipeline, index might have been deleted.");
Expand Down Expand Up @@ -471,7 +471,7 @@ mod tests {
.expect_list_splits()
.times(1)
.returning(move |list_split_query| {
assert_eq!(list_split_query.index_uid, index_uid);
assert_eq!(list_split_query.index_uids, vec![index_uid.clone()]);
assert_eq!(
list_split_query.split_states,
vec![quickwit_metastore::SplitState::Published]
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ mod split_store;
mod test_utils;

#[cfg(any(test, feature = "testsuite"))]
pub use test_utils::{mock_split, mock_split_meta, TestSandbox};
pub use test_utils::{mock_split, mock_split_meta, MockSplitBuilder, TestSandbox};

use self::merge_policy::MergePolicy;
pub use self::source::check_source_connectivity;
Expand Down
37 changes: 30 additions & 7 deletions quickwit/quickwit-indexing/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,19 +218,42 @@ impl TestSandbox {
}
}

/// Mock split builder.
///
/// Holds the [`SplitMetadata`] that ends up inside the [`Split`] returned by
/// `build`, so that callers can adjust it (for instance its index UID) first.
pub struct MockSplitBuilder {
// Metadata embedded as-is into the split produced by `build`.
split_metadata: SplitMetadata,
}

impl MockSplitBuilder {
    /// Creates a builder for a split named `split_id`, whose metadata comes from
    /// `mock_split_meta` with the index UID built from `"test-index"`.
    pub fn new(split_id: &str) -> Self {
        let default_index_uid = IndexUid::new("test-index");
        let split_metadata = mock_split_meta(split_id, &default_index_uid);
        Self { split_metadata }
    }

    /// Replaces the index UID stored in the split metadata with a clone of `index_uid`.
    pub fn with_index_uid(self, index_uid: &IndexUid) -> Self {
        let mut split_metadata = self.split_metadata;
        split_metadata.index_uid = index_uid.clone();
        Self { split_metadata }
    }

    /// Consumes the builder and returns a `Published` split with an update
    /// timestamp of 0 and no publish timestamp.
    pub fn build(self) -> Split {
        let Self { split_metadata } = self;
        Split {
            split_metadata,
            split_state: SplitState::Published,
            update_timestamp: 0,
            publish_timestamp: None,
        }
    }
}

/// Mock split helper.
pub fn mock_split(split_id: &str) -> Split {
Split {
split_state: SplitState::Published,
split_metadata: mock_split_meta(split_id),
update_timestamp: 0,
publish_timestamp: None,
}
MockSplitBuilder::new(split_id).build()
}

/// Mock split meta helper.
pub fn mock_split_meta(split_id: &str) -> SplitMetadata {
pub fn mock_split_meta(split_id: &str, index_uid: &IndexUid) -> SplitMetadata {
SplitMetadata {
index_uid: index_uid.clone(),
split_id: split_id.to_string(),
partition_id: 13u64,
num_docs: 10,
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-jaeger/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ impl JaegerService {
let aggregation_query = build_aggregations_query(trace_query.num_traces as usize);
let max_hits = 0;
let search_request = SearchRequest {
index_id,
index_id_patterns: vec![index_id],
query_ast,
aggregation_request: Some(aggregation_query),
max_hits,
Expand Down Expand Up @@ -303,7 +303,7 @@ impl JaegerService {
serde_json::to_string(&query_ast).map_err(|err| Status::internal(err.to_string()))?;

let search_request = SearchRequest {
index_id: OTEL_TRACES_INDEX_ID.to_string(),
index_id_patterns: vec![OTEL_TRACES_INDEX_ID.to_string()],
query_ast,
start_timestamp: Some(*search_window.start()),
end_timestamp: Some(*search_window.end()),
Expand Down
18 changes: 13 additions & 5 deletions quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ mod tests {
index_uid: index_uid.to_string(),
start_timestamp: None,
end_timestamp: None,
query_ast: quickwit_query::query_ast::qast_helper("body:delete", &[]),
query_ast: quickwit_query::query_ast::qast_string_helper("body:delete", &[]),
})
.await
.unwrap();
Expand All @@ -365,8 +365,12 @@ mod tests {
mock_search_service
.expect_leaf_search()
.withf(|leaf_request| -> bool {
leaf_request.search_request.as_ref().unwrap().index_id
== "test-delete-pipeline-simple"
leaf_request
.search_request
.as_ref()
.unwrap()
.index_id_patterns
== vec!["test-delete-pipeline-simple".to_string()]
})
.times(2)
.returning(move |_: LeafSearchRequest| {
Expand Down Expand Up @@ -441,8 +445,12 @@ mod tests {
mock_search_service
.expect_leaf_search()
.withf(|leaf_request| -> bool {
leaf_request.search_request.as_ref().unwrap().index_id
== "test-delete-pipeline-shut-down"
leaf_request
.search_request
.as_ref()
.unwrap()
.index_id_patterns
== vec!["test-delete-pipeline-shut-down".to_string()]
})
.returning(move |_: LeafSearchRequest| {
Ok(LeafSearchResponse {
Expand Down
Loading

0 comments on commit 4ea81ea

Please sign in to comment.