Skip to content

Commit

Permalink
Removing single node search. (#3689)
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton authored Jul 26, 2023
1 parent 800a108 commit 90c59a2
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 148 deletions.
16 changes: 8 additions & 8 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ jobs:
with:
filters: |
rust_src:
- quickwit/**.rs
- quickwit/**.toml
- quickwit/**.proto
- ./quickwit/rest-api-tests/**
- quickwit/**/*.rs
- quickwit/**/*.toml
- quickwit/**/*.proto
- quickwit/rest-api-tests/**
# The following step is just meant to install rustup actually.
# The next one installs the correct toolchain.
- name: Install rustup
Expand Down Expand Up @@ -103,11 +103,11 @@ jobs:
with:
filters: |
rust_src:
- quickwit/**.rs
- quickwit/**.toml
- quickwit/**.proto
- quickwit/**/*.rs
- quickwit/**/*.toml
- quickwit/**/*.proto
- name: Install Ubuntu packages
if: steps.modified.outputs.rust_src == 'true'
if: always() && steps.modified.outputs.rust_src == 'true'
run: sudo apt-get -y install protobuf-compiler python3 python3-pip
- name: Setup nightly Rust Toolchain (for rustfmt)
uses: actions-rs/toolchain@v1
Expand Down
106 changes: 2 additions & 104 deletions quickwit/quickwit-search/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ use metrics::SEARCH_METRICS;
use quickwit_common::tower::Pool;
use quickwit_doc_mapper::DocMapper;
use quickwit_query::query_ast::QueryAst;
use root::{finalize_aggregation, validate_request};
use service::SearcherContext;
use tantivy::schema::NamedFieldDocument;

/// Refer to this as `crate::Result<T>`.
Expand All @@ -59,15 +57,11 @@ pub type Result<T> = std::result::Result<T, SearchError>;
use std::net::SocketAddr;
use std::sync::Arc;

use anyhow::Context;
pub use find_trace_ids_collector::FindTraceIdsCollector;
use itertools::Itertools;
use quickwit_config::{build_doc_mapper, SearcherConfig};
use quickwit_config::SearcherConfig;
use quickwit_doc_mapper::tag_pruning::extract_tags_from_query;
use quickwit_metastore::{ListSplitsQuery, Metastore, SplitMetadata, SplitState};
use quickwit_proto::{
Hit, IndexUid, PartialHit, SearchRequest, SearchResponse, SplitIdAndFooterOffsets,
};
use quickwit_proto::{IndexUid, PartialHit, SearchRequest, SplitIdAndFooterOffsets};
use quickwit_storage::StorageResolver;
use tantivy::DocAddress;

Expand Down Expand Up @@ -173,102 +167,6 @@ fn convert_document_to_json_string(
Ok(content_json)
}

/// Performs a search on the current node.
/// See also `[distributed_search]`.
pub async fn single_node_search(
mut search_request: SearchRequest,
metastore: &dyn Metastore,
storage_resolver: StorageResolver,
) -> crate::Result<SearchResponse> {
let start_instant = tokio::time::Instant::now();
let index_metadata = metastore.index_metadata(&search_request.index_id).await?;
let index_uid = index_metadata.index_uid.clone();
let index_config = index_metadata.into_index_config();

let doc_mapper = build_doc_mapper(&index_config.doc_mapping, &index_config.search_settings)
.map_err(|err| {
SearchError::InternalError(format!("Failed to build doc mapper. Cause: {err}"))
})?;

let query_ast: QueryAst = serde_json::from_str(&search_request.query_ast)?;
let query_ast_resolved: QueryAst =
query_ast.parse_user_query(doc_mapper.default_search_fields())?;
search_request.query_ast = serde_json::to_string(&query_ast_resolved)?;

let index_storage = storage_resolver.resolve(&index_config.index_uri).await?;
let metas = list_relevant_splits(index_uid, &search_request, metastore).await?;
let split_metadata: Vec<SplitIdAndFooterOffsets> =
metas.iter().map(extract_split_and_footer_offsets).collect();
validate_request(&*doc_mapper, &search_request)?;

// Verifying that the query is valid.
doc_mapper
.query(doc_mapper.schema(), &query_ast_resolved, true)
.map_err(|err| SearchError::InvalidQuery(err.to_string()))?;

let searcher_context = Arc::new(SearcherContext::new(SearcherConfig::default()));

let leaf_search_response = leaf_search(
searcher_context.clone(),
&search_request,
index_storage.clone(),
&split_metadata[..],
doc_mapper.clone(),
)
.await
.context("Failed to perform leaf search.")?;

let search_request_opt = if !search_request.snippet_fields.is_empty() {
Some(&search_request)
} else {
None
};

let fetch_docs_response = fetch_docs(
searcher_context.clone(),
leaf_search_response.partial_hits,
index_storage,
&split_metadata,
doc_mapper,
search_request_opt,
)
.await
.context("Failed to perform fetch docs.")?;
let hits: Vec<Hit> = fetch_docs_response
.hits
.into_iter()
.map(|leaf_hit| Hit {
json: leaf_hit.leaf_json,
partial_hit: leaf_hit.partial_hit,
snippet: leaf_hit.leaf_snippet_json,
})
.collect();
let elapsed = start_instant.elapsed();

let aggregations: Option<QuickwitAggregations> = search_request
.aggregation_request
.as_ref()
.map(|agg| serde_json::from_str(agg))
.transpose()?;

let aggregation = finalize_aggregation(
leaf_search_response.intermediate_aggregation_result,
aggregations,
&searcher_context,
)?;
Ok(SearchResponse {
aggregation,
num_hits: leaf_search_response.num_hits,
hits,
elapsed_time_micros: elapsed.as_micros() as u64,
errors: leaf_search_response
.failed_splits
.iter()
.map(|error| format!("{error:?}"))
.collect_vec(),
})
}

/// Starts a search node, aka a `searcher`.
pub async fn start_searcher_service(
searcher_config: SearcherConfig,
Expand Down
8 changes: 4 additions & 4 deletions quickwit/quickwit-search/src/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,9 @@ impl Job for SearchJob {
}
}

pub(crate) struct FetchDocsJob {
struct FetchDocsJob {
offsets: SplitIdAndFooterOffsets,
pub partial_hits: Vec<PartialHit>,
partial_hits: Vec<PartialHit>,
}

impl Job for FetchDocsJob {
Expand Down Expand Up @@ -186,7 +186,7 @@ fn validate_sort_by_field(field_name: &str, schema: &Schema) -> crate::Result<()
Ok(())
}

pub(crate) fn validate_request(
fn validate_request(
doc_mapper: &dyn DocMapper,
search_request: &SearchRequest,
) -> crate::Result<()> {
Expand Down Expand Up @@ -566,7 +566,7 @@ impl<'a, 'b> QueryAstVisitor<'b> for ExtractTimestampRange<'a> {
}
}

pub fn finalize_aggregation(
fn finalize_aggregation(
intermediate_aggregation_result: Option<Vec<u8>>,
aggregations: Option<QuickwitAggregations>,
searcher_context: &SearcherContext,
Expand Down
Loading

0 comments on commit 90c59a2

Please sign in to comment.