diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2af7e3acb58..b6da350bbc9 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -52,10 +52,10 @@ jobs: with: filters: | rust_src: - - quickwit/**.rs - - quickwit/**.toml - - quickwit/**.proto - - ./quickwit/rest-api-tests/** + - quickwit/**/*.rs + - quickwit/**/*.toml + - quickwit/**/*.proto + - quickwit/rest-api-tests/** # The following step is just meant to install rustup actually. # The next one installs the correct toolchain. - name: Install rustup @@ -103,11 +103,11 @@ jobs: with: filters: | rust_src: - - quickwit/**.rs - - quickwit/**.toml - - quickwit/**.proto + - quickwit/**/*.rs + - quickwit/**/*.toml + - quickwit/**/*.proto - name: Install Ubuntu packages - if: steps.modified.outputs.rust_src == 'true' + if: always() && steps.modified.outputs.rust_src == 'true' run: sudo apt-get -y install protobuf-compiler python3 python3-pip - name: Setup nightly Rust Toolchain (for rustfmt) uses: actions-rs/toolchain@v1 diff --git a/quickwit/quickwit-search/src/lib.rs b/quickwit/quickwit-search/src/lib.rs index cf3da3404bd..33d03fdaaa7 100644 --- a/quickwit/quickwit-search/src/lib.rs +++ b/quickwit/quickwit-search/src/lib.rs @@ -49,8 +49,6 @@ use metrics::SEARCH_METRICS; use quickwit_common::tower::Pool; use quickwit_doc_mapper::DocMapper; use quickwit_query::query_ast::QueryAst; -use root::{finalize_aggregation, validate_request}; -use service::SearcherContext; use tantivy::schema::NamedFieldDocument; /// Refer to this as `crate::Result`. @@ -59,15 +57,11 @@ pub type Result = std::result::Result; use std::net::SocketAddr; use std::sync::Arc; -use anyhow::Context; pub use find_trace_ids_collector::FindTraceIdsCollector; -use itertools::Itertools; -use quickwit_config::{build_doc_mapper, SearcherConfig}; +use quickwit_config::SearcherConfig; use quickwit_doc_mapper::tag_pruning::extract_tags_from_query; use quickwit_metastore::{ListSplitsQuery, Metastore, SplitMetadata, SplitState}; -use quickwit_proto::{ - Hit, IndexUid, PartialHit, SearchRequest, SearchResponse, SplitIdAndFooterOffsets, -}; +use quickwit_proto::{IndexUid, PartialHit, SearchRequest, SplitIdAndFooterOffsets}; use quickwit_storage::StorageResolver; use tantivy::DocAddress; @@ -173,102 +167,6 @@ fn convert_document_to_json_string( Ok(content_json) } -/// Performs a search on the current node. -/// See also `[distributed_search]`. -pub async fn single_node_search( - mut search_request: SearchRequest, - metastore: &dyn Metastore, - storage_resolver: StorageResolver, -) -> crate::Result { - let start_instant = tokio::time::Instant::now(); - let index_metadata = metastore.index_metadata(&search_request.index_id).await?; - let index_uid = index_metadata.index_uid.clone(); - let index_config = index_metadata.into_index_config(); - - let doc_mapper = build_doc_mapper(&index_config.doc_mapping, &index_config.search_settings) - .map_err(|err| { - SearchError::InternalError(format!("Failed to build doc mapper. Cause: {err}")) - })?; - - let query_ast: QueryAst = serde_json::from_str(&search_request.query_ast)?; - let query_ast_resolved: QueryAst = - query_ast.parse_user_query(doc_mapper.default_search_fields())?; - search_request.query_ast = serde_json::to_string(&query_ast_resolved)?; - - let index_storage = storage_resolver.resolve(&index_config.index_uri).await?; - let metas = list_relevant_splits(index_uid, &search_request, metastore).await?; - let split_metadata: Vec = - metas.iter().map(extract_split_and_footer_offsets).collect(); - validate_request(&*doc_mapper, &search_request)?; - - // Verifying that the query is valid. - doc_mapper - .query(doc_mapper.schema(), &query_ast_resolved, true) - .map_err(|err| SearchError::InvalidQuery(err.to_string()))?; - - let searcher_context = Arc::new(SearcherContext::new(SearcherConfig::default())); - - let leaf_search_response = leaf_search( - searcher_context.clone(), - &search_request, - index_storage.clone(), - &split_metadata[..], - doc_mapper.clone(), - ) - .await - .context("Failed to perform leaf search.")?; - - let search_request_opt = if !search_request.snippet_fields.is_empty() { - Some(&search_request) - } else { - None - }; - - let fetch_docs_response = fetch_docs( - searcher_context.clone(), - leaf_search_response.partial_hits, - index_storage, - &split_metadata, - doc_mapper, - search_request_opt, - ) - .await - .context("Failed to perform fetch docs.")?; - let hits: Vec = fetch_docs_response - .hits - .into_iter() - .map(|leaf_hit| Hit { - json: leaf_hit.leaf_json, - partial_hit: leaf_hit.partial_hit, - snippet: leaf_hit.leaf_snippet_json, - }) - .collect(); - let elapsed = start_instant.elapsed(); - - let aggregations: Option = search_request - .aggregation_request - .as_ref() - .map(|agg| serde_json::from_str(agg)) - .transpose()?; - - let aggregation = finalize_aggregation( - leaf_search_response.intermediate_aggregation_result, - aggregations, - &searcher_context, - )?; - Ok(SearchResponse { - aggregation, - num_hits: leaf_search_response.num_hits, - hits, - elapsed_time_micros: elapsed.as_micros() as u64, - errors: leaf_search_response - .failed_splits - .iter() - .map(|error| format!("{error:?}")) - .collect_vec(), - }) -} - /// Starts a search node, aka a `searcher`. pub async fn start_searcher_service( searcher_config: SearcherConfig, diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index 2d8d475ea89..e875cec2237 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -95,9 +95,9 @@ impl Job for SearchJob { } } -pub(crate) struct FetchDocsJob { +struct FetchDocsJob { offsets: SplitIdAndFooterOffsets, - pub partial_hits: Vec, + partial_hits: Vec, } impl Job for FetchDocsJob { @@ -186,7 +186,7 @@ fn validate_sort_by_field(field_name: &str, schema: &Schema) -> crate::Result<() Ok(()) } -pub(crate) fn validate_request( +fn validate_request( doc_mapper: &dyn DocMapper, search_request: &SearchRequest, ) -> crate::Result<()> { @@ -566,7 +566,7 @@ impl<'a, 'b> QueryAstVisitor<'b> for ExtractTimestampRange<'a> { } } -pub fn finalize_aggregation( +fn finalize_aggregation( intermediate_aggregation_result: Option>, aggregations: Option, searcher_context: &SearcherContext, diff --git a/quickwit/quickwit-search/src/tests.rs b/quickwit/quickwit-search/src/tests.rs index 89d02ce3bf2..3018b433108 100644 --- a/quickwit/quickwit-search/src/tests.rs +++ b/quickwit/quickwit-search/src/tests.rs @@ -18,6 +18,7 @@ // along with this program. If not, see . use std::collections::{BTreeMap, BTreeSet}; +use std::net::Ipv4Addr; use assert_json_diff::{assert_json_eq, assert_json_include}; use quickwit_config::SearcherConfig; @@ -25,7 +26,8 @@ use quickwit_doc_mapper::DefaultDocMapper; use quickwit_indexing::TestSandbox; use quickwit_opentelemetry::otlp::TraceId; use quickwit_proto::{ - LeafListTermsResponse, SearchRequest, SortByValue, SortField, SortOrder, SortValue, + LeafListTermsResponse, SearchRequest, SearchResponse, SortByValue, SortField, SortOrder, + SortValue, }; use quickwit_query::query_ast::{qast_helper, query_ast_from_user_text}; use serde_json::{json, Value as JsonValue}; @@ -35,7 +37,7 @@ use tantivy::Term; use super::*; use crate::find_trace_ids_collector::Span; -use crate::single_node_search; +use crate::service::SearcherContext; #[tokio::test] async fn test_single_node_simple() -> anyhow::Result<()> { @@ -65,7 +67,7 @@ async fn test_single_node_simple() -> anyhow::Result<()> { }; let single_node_result = single_node_search( search_request, - &*test_sandbox.metastore(), + test_sandbox.metastore(), test_sandbox.storage_resolver(), ) .await?; @@ -111,7 +113,7 @@ async fn test_single_node_termset() -> anyhow::Result<()> { }; let single_node_result = single_node_search( search_request, - &*test_sandbox.metastore(), + test_sandbox.metastore(), test_sandbox.storage_resolver(), ) .await?; @@ -152,7 +154,7 @@ async fn test_single_search_with_snippet() -> anyhow::Result<()> { }; let single_node_result = single_node_search( search_request, - &*test_sandbox.metastore(), + test_sandbox.metastore(), test_sandbox.storage_resolver(), ) .await?; @@ -191,7 +193,7 @@ async fn slop_search_and_check( }; let single_node_result = single_node_search( search_request, - &*test_sandbox.metastore(), + test_sandbox.metastore(), test_sandbox.storage_resolver(), ) .await?; @@ -276,6 +278,39 @@ where E: Ord { true } +/// Performs a search on the current node. +/// See also `[distributed_search]`. +async fn single_node_search( + search_request: SearchRequest, + metastore: Arc, + storage_resolver: StorageResolver, +) -> crate::Result { + let socket_addr = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 7280u16); + let searcher_pool = SearcherPool::default(); + let search_job_placer = SearchJobPlacer::new(searcher_pool.clone()); + let cluster_client = ClusterClient::new(search_job_placer); + let search_service = Arc::new(SearchServiceImpl::new( + metastore.clone(), + storage_resolver, + cluster_client.clone(), + SearcherConfig::default(), + )); + let search_service_client = + SearchServiceClient::from_service(search_service.clone(), socket_addr); + searcher_pool + .insert(socket_addr, search_service_client) + .await; + let searcher_config = SearcherConfig::default(); + let searcher_context = SearcherContext::new(searcher_config); + root_search( + &searcher_context, + search_request, + &*metastore, + &cluster_client, + ) + .await +} + #[tokio::test] #[cfg_attr(not(feature = "ci-test"), ignore)] async fn test_single_node_several_splits() -> anyhow::Result<()> { @@ -311,7 +346,7 @@ async fn test_single_node_several_splits() -> anyhow::Result<()> { }; let single_node_result = single_node_search( search_request, - &*test_sandbox.metastore(), + test_sandbox.metastore(), test_sandbox.storage_resolver(), ) .await?; @@ -386,7 +421,7 @@ async fn test_single_node_filtering() -> anyhow::Result<()> { }; let single_node_response = single_node_search( search_request, - &*test_sandbox.metastore(), + test_sandbox.metastore(), test_sandbox.storage_resolver(), ) .await?; @@ -409,7 +444,7 @@ async fn test_single_node_filtering() -> anyhow::Result<()> { }; let single_node_response = single_node_search( search_request, - &*test_sandbox.metastore(), + test_sandbox.metastore(), test_sandbox.storage_resolver(), ) .await?; @@ -431,7 +466,7 @@ async fn test_single_node_filtering() -> anyhow::Result<()> { }; let single_node_response = single_node_search( search_request, - &*test_sandbox.metastore(), + test_sandbox.metastore(), test_sandbox.storage_resolver(), ) .await; @@ -517,7 +552,7 @@ async fn single_node_search_sort_by_field( match single_node_search( search_request, - &*test_sandbox.metastore(), + test_sandbox.metastore(), test_sandbox.storage_resolver(), ) .await @@ -602,7 +637,7 @@ async fn test_sort_bm25() { let metastore = test_sandbox.metastore(); let storage_resolver = test_sandbox.storage_resolver(); async move { - single_node_search(search_request, &*metastore, storage_resolver) + single_node_search(search_request, metastore, storage_resolver) .await .unwrap() .hits @@ -694,7 +729,7 @@ async fn test_sort_by_static_and_dynamic_field() { let metastore = test_sandbox.metastore(); let storage_resolver = test_sandbox.storage_resolver(); async move { - let search_resp = single_node_search(search_request, &*metastore, storage_resolver) + let search_resp = single_node_search(search_request, metastore, storage_resolver) .await .unwrap(); assert_eq!(search_resp.num_hits, 4); @@ -797,7 +832,7 @@ async fn test_sort_by_2_field() { let metastore = test_sandbox.metastore(); let storage_resolver = test_sandbox.storage_resolver(); async move { - let search_resp = single_node_search(search_request, &*metastore, storage_resolver) + let search_resp = single_node_search(search_request, metastore, storage_resolver) .await .unwrap(); assert_eq!(search_resp.num_hits, 5); @@ -873,7 +908,7 @@ async fn test_single_node_invalid_sorting_with_query() { }; let single_node_response = single_node_search( search_request, - &*test_sandbox.metastore(), + test_sandbox.metastore(), test_sandbox.storage_resolver(), ) .await; @@ -1304,7 +1339,7 @@ async fn test_single_node_aggregation() -> anyhow::Result<()> { }; let single_node_result = single_node_search( search_request, - &*test_sandbox.metastore(), + test_sandbox.metastore(), test_sandbox.storage_resolver(), ) .await?; @@ -1375,18 +1410,17 @@ async fn test_single_node_aggregation_missing_fast_field() { aggregation_request: Some(agg_req.to_string()), ..Default::default() }; - let single_node_result = single_node_search( + let single_node_error = single_node_search( search_request, - &*test_sandbox.metastore(), + test_sandbox.metastore(), test_sandbox.storage_resolver(), ) .await - .unwrap(); - assert_eq!(single_node_result.num_hits, 0); - assert_eq!(single_node_result.errors.len(), 1); - assert!(single_node_result.errors[0].contains("color")); - assert!(single_node_result.errors[0].contains("is not configured as")); - assert!(single_node_result.errors[0].contains("fast field")); + .unwrap_err(); + let SearchError::InternalError(error_msg) = single_node_error else { + panic!(); + }; + assert!(error_msg.contains("Field \"color\" is not configured as fast field")); test_sandbox.assert_quit().await; } @@ -1419,7 +1453,7 @@ async fn test_single_node_with_ip_field() -> anyhow::Result<()> { }; let single_node_result = single_node_search( search_request, - &*test_sandbox.metastore(), + test_sandbox.metastore(), test_sandbox.storage_resolver(), ) .await?; @@ -1435,7 +1469,7 @@ async fn test_single_node_with_ip_field() -> anyhow::Result<()> { }; let single_node_result = single_node_search( search_request, - &*test_sandbox.metastore(), + test_sandbox.metastore(), test_sandbox.storage_resolver(), ) .await?; @@ -1494,7 +1528,7 @@ async fn test_single_node_range_queries() -> anyhow::Result<()> { }; let single_node_result = single_node_search( search_request, - &*test_sandbox.metastore(), + test_sandbox.metastore(), test_sandbox.storage_resolver(), ) .await?; @@ -1510,7 +1544,7 @@ async fn test_single_node_range_queries() -> anyhow::Result<()> { }; let single_node_result = single_node_search( search_request, - &*test_sandbox.metastore(), + test_sandbox.metastore(), test_sandbox.storage_resolver(), ) .await?; @@ -1526,7 +1560,7 @@ async fn test_single_node_range_queries() -> anyhow::Result<()> { }; let single_node_result = single_node_search( search_request, - &*test_sandbox.metastore(), + test_sandbox.metastore(), test_sandbox.storage_resolver(), ) .await?; @@ -1542,7 +1576,7 @@ async fn test_single_node_range_queries() -> anyhow::Result<()> { }; let single_node_result = single_node_search( search_request, - &*test_sandbox.metastore(), + test_sandbox.metastore(), test_sandbox.storage_resolver(), ) .await?; @@ -1558,7 +1592,7 @@ async fn test_single_node_range_queries() -> anyhow::Result<()> { }; let single_node_result = single_node_search( search_request, - &*test_sandbox.metastore(), + test_sandbox.metastore(), test_sandbox.storage_resolver(), ) .await?; @@ -1748,7 +1782,7 @@ async fn test_single_node_find_trace_ids_collector() { }; let single_node_result = single_node_search( search_request, - &*test_sandbox.metastore(), + test_sandbox.metastore(), test_sandbox.storage_resolver(), ) .await