From 9e95ec9e7a27bdb32c4addb5731d1c41bfd46caf Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Fri, 10 Nov 2023 21:52:43 +0900 Subject: [PATCH] Added an integration test for ingest v2 (#4071) - Adds an ingest_v2 method to the cluster sandbox. - Adds two simple integration tests. - Converts the ingest v2 error into rest api errors. - This PR suffers from the deadlock described in #4070 Closes #4065 --- .../src/test_utils/cluster_sandbox.rs | 10 +- .../src/tests/index_tests.rs | 146 ++++++++++++++---- quickwit/quickwit-rest-client/src/models.rs | 1 + .../quickwit-rest-client/src/rest_client.rs | 5 +- quickwit/quickwit-search/src/root.rs | 4 +- .../src/index_api/rest_handler.rs | 1 - .../src/ingest_api/rest_handler.rs | 49 +++++- 7 files changed, 174 insertions(+), 42 deletions(-) diff --git a/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs b/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs index dc08cbd2186..2d832c9d0b3 100644 --- a/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs +++ b/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs @@ -201,6 +201,10 @@ impl ClusterSandbox { }) } + pub fn enable_ingest_v2(&mut self) { + self.searcher_rest_client.enable_ingest_v2(); + } + // Starts one node that runs all the services. 
pub async fn start_standalone_node() -> anyhow::Result { let temp_dir = tempfile::tempdir()?; @@ -286,13 +290,13 @@ impl ClusterSandbox { pub async fn wait_for_splits( &self, index_id: &str, - split_states: Option>, + split_states_filter: Option>, required_splits_num: usize, ) -> anyhow::Result<()> { wait_until_predicate( || { let splits_query_params = ListSplitsQueryParams { - split_states: split_states.clone(), + split_states: split_states_filter.clone(), ..Default::default() }; async move { @@ -322,7 +326,7 @@ impl ClusterSandbox { } }, Duration::from_secs(10), - Duration::from_millis(100), + Duration::from_millis(500), ) .await?; Ok(()) diff --git a/quickwit/quickwit-integration-tests/src/tests/index_tests.rs b/quickwit/quickwit-integration-tests/src/tests/index_tests.rs index e887cb82281..688114cdb98 100644 --- a/quickwit/quickwit-integration-tests/src/tests/index_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/index_tests.rs @@ -23,9 +23,11 @@ use std::time::Duration; use bytes::Bytes; use quickwit_common::test_utils::wait_until_predicate; use quickwit_config::service::QuickwitService; +use quickwit_config::ConfigFormat; use quickwit_indexing::actors::INDEXING_DIR_NAME; use quickwit_janitor::actors::DELETE_SERVICE_TASK_DIR_NAME; use quickwit_metastore::SplitState; +use quickwit_rest_client::error::{ApiError, Error}; use quickwit_rest_client::rest_client::CommitType; use quickwit_serve::SearchRequestQueryString; use serde_json::json; @@ -60,11 +62,7 @@ async fn test_restarting_standalone_server() { sandbox .indexer_rest_client .indexes() - .create( - index_config.clone(), - quickwit_config::ConfigFormat::Yaml, - false, - ) + .create(index_config.clone(), ConfigFormat::Yaml, false) .await .unwrap(); @@ -102,7 +100,7 @@ async fn test_restarting_standalone_server() { sandbox .indexer_rest_client .indexes() - .create(index_config, quickwit_config::ConfigFormat::Yaml, false) + .create(index_config, ConfigFormat::Yaml, false) .await .unwrap(); @@ 
-215,36 +213,124 @@ async fn test_restarting_standalone_server() { sandbox.shutdown().await.unwrap(); } +const TEST_INDEX_CONFIG: &str = r#" + version: 0.6 + index_id: test_index + doc_mapping: + field_mappings: + - name: body + type: text + indexing_settings: + commit_timeout_secs: 1 + merge_policy: + type: stable_log + merge_factor: 4 + max_merge_factor: 4 +"#; + +#[tokio::test] +async fn test_ingest_v2_index_not_found() { + // This test checks what happens when we try to ingest into a non-existing index. + quickwit_common::setup_logging_for_tests(); + let nodes_services = &[ + HashSet::from_iter([QuickwitService::Indexer, QuickwitService::Janitor]), + HashSet::from_iter([QuickwitService::Indexer, QuickwitService::Janitor]), + HashSet::from_iter([ + QuickwitService::ControlPlane, + QuickwitService::Metastore, + QuickwitService::Searcher, + ]), + ]; + let mut sandbox = ClusterSandbox::start_cluster_nodes(&nodes_services[..]) + .await + .unwrap(); + sandbox.enable_ingest_v2(); + sandbox.wait_for_cluster_num_ready_nodes(3).await.unwrap(); + let missing_index_err: Error = sandbox + .indexer_rest_client + .ingest( + "missing_index", + ingest_json!({"body": "doc1"}), + None, + None, + CommitType::WaitFor, + ) + .await + .unwrap_err(); + let Error::Api(ApiError { message, code }) = missing_index_err else { + panic!("Expected an API error."); + }; + assert_eq!(code, 404u16); + let error_message = message.unwrap(); + assert_eq!(error_message, "index `missing_index` not found"); + sandbox.shutdown().await.unwrap(); +} + +#[tokio::test] +async fn test_ingest_v2_happy_path() { + // This test checks our happy path for ingesting one doc.
+ quickwit_common::setup_logging_for_tests(); + let nodes_services = &[ + HashSet::from_iter([QuickwitService::Indexer, QuickwitService::Janitor]), + HashSet::from_iter([QuickwitService::Indexer, QuickwitService::Janitor]), + HashSet::from_iter([ + QuickwitService::ControlPlane, + QuickwitService::Metastore, + QuickwitService::Searcher, + ]), + ]; + let mut sandbox = ClusterSandbox::start_cluster_nodes(&nodes_services[..]) + .await + .unwrap(); + sandbox.enable_ingest_v2(); + sandbox.wait_for_cluster_num_ready_nodes(3).await.unwrap(); + sandbox + .indexer_rest_client + .indexes() + .create(TEST_INDEX_CONFIG.into(), ConfigFormat::Yaml, false) + .await + .unwrap(); + sandbox + .indexer_rest_client + .sources("test_index") + .toggle("_ingest-source", true) + .await + .unwrap(); + sandbox + .indexer_rest_client + .ingest( + "test_index", + ingest_json!({"body": "doc1"}), + None, + None, + CommitType::WaitFor, + ) + .await + .unwrap(); + let search_req = SearchRequestQueryString { + query: "*".to_string(), + ..Default::default() + }; + let search_result = sandbox + .indexer_rest_client + .search("test_index", search_req) + .await + .unwrap(); + assert_eq!(search_result.num_hits, 1); + sandbox.shutdown().await.unwrap(); +} + #[tokio::test] async fn test_commit_modes() { quickwit_common::setup_logging_for_tests(); let sandbox = ClusterSandbox::start_standalone_node().await.unwrap(); - let index_id = "test_commit_modes_index"; + let index_id = "test_index"; // Create index sandbox .indexer_rest_client .indexes() - .create( - r#" - version: 0.6 - index_id: test_commit_modes_index - doc_mapping: - field_mappings: - - name: body - type: text - indexing_settings: - commit_timeout_secs: 1 - merge_policy: - type: stable_log - merge_factor: 4 - max_merge_factor: 4 - - "# - .into(), - quickwit_config::ConfigFormat::Yaml, - false, - ) + .create(TEST_INDEX_CONFIG.into(), ConfigFormat::Yaml, false) .await .unwrap(); @@ -391,7 +477,7 @@ async fn test_very_large_index_name() { "#, ) 
.into(), - quickwit_config::ConfigFormat::Yaml, + ConfigFormat::Yaml, false, ) .await @@ -447,7 +533,7 @@ async fn test_very_large_index_name() { "#, ) .into(), - quickwit_config::ConfigFormat::Yaml, + ConfigFormat::Yaml, false, ) .await @@ -484,7 +570,7 @@ async fn test_shutdown() { commit_timeout_secs: 1 "# .into(), - quickwit_config::ConfigFormat::Yaml, + ConfigFormat::Yaml, false, ) .await diff --git a/quickwit/quickwit-rest-client/src/models.rs b/quickwit/quickwit-rest-client/src/models.rs index 72e2fb1b804..6416a8ec286 100644 --- a/quickwit/quickwit-rest-client/src/models.rs +++ b/quickwit/quickwit-rest-client/src/models.rs @@ -26,6 +26,7 @@ use serde::de::DeserializeOwned; use crate::error::{ApiError, Error, ErrorResponsePayload}; +#[derive(Debug)] pub struct ApiResponse { inner: reqwest::Response, } diff --git a/quickwit/quickwit-rest-client/src/rest_client.rs b/quickwit/quickwit-rest-client/src/rest_client.rs index 52ffb9e00ee..6b333131b93 100644 --- a/quickwit/quickwit-rest-client/src/rest_client.rs +++ b/quickwit/quickwit-rest-client/src/rest_client.rs @@ -198,6 +198,10 @@ pub struct QuickwitClient { } impl QuickwitClient { + pub fn enable_ingest_v2(&mut self) { + self.ingest_v2 = true; + } + pub async fn search( &self, index_id: &str, @@ -288,7 +292,6 @@ impl QuickwitClient { timeout, ) .await?; - if response.status_code() == StatusCode::TOO_MANY_REQUESTS { if let Some(event_fn) = &on_ingest_event { event_fn(IngestEvent::Sleep) diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index 4d0f459e65a..2a916f00fa4 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -392,7 +392,7 @@ fn get_scroll_ttl_duration(search_request: &SearchRequest) -> crate::Result MetastoreResult> { - info!("get-indexes-metadatas"); metastore .list_indexes_metadata(ListIndexesMetadataRequest::all()) .await diff --git a/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs 
b/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs index 5a65411d262..9c0dd0ea912 100644 --- a/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs @@ -24,8 +24,8 @@ use quickwit_ingest::{ IngestServiceClient, IngestServiceError, TailRequest, }; use quickwit_proto::ingest::router::{ - IngestRequestV2, IngestResponseV2, IngestRouterService, IngestRouterServiceClient, - IngestSubrequest, + IngestFailureReason, IngestRequestV2, IngestResponseV2, IngestRouterService, + IngestRouterServiceClient, IngestSubrequest, }; use quickwit_proto::ingest::{DocBatchV2, IngestV2Error}; use quickwit_proto::types::IndexId; @@ -127,7 +127,7 @@ async fn ingest_v2( body: Bytes, ingest_options: IngestOptions, mut ingest_router: IngestRouterServiceClient, -) -> Result { +) -> Result { let mut doc_buffer = BytesMut::new(); let mut doc_lengths = Vec::new(); @@ -135,6 +135,7 @@ async fn ingest_v2( doc_lengths.push(line.len() as u32); doc_buffer.put(line); } + let num_docs = doc_lengths.len(); let doc_batch = DocBatchV2 { doc_buffer: doc_buffer.freeze(), doc_lengths, @@ -149,8 +150,46 @@ async fn ingest_v2( commit_type: ingest_options.commit_type as i32, subrequests: vec![subrequest], }; - let response = ingest_router.ingest(request).await?; - Ok(response) + let response = ingest_router + .ingest(request) + .await + .map_err(|err: IngestV2Error| IngestServiceError::Internal(err.to_string()))?; + convert_ingest_response_v2(response, num_docs) +} + +fn convert_ingest_response_v2( + mut response: IngestResponseV2, + num_docs: usize, +) -> Result { + let num_responses = response.successes.len() + response.failures.len(); + if num_responses != 1 { + return Err(IngestServiceError::Internal(format!( + "Expected a single failure/success, got {}.", + num_responses + ))); + } + if response.successes.pop().is_some() { + return Ok(IngestResponse { + num_docs_for_processing: num_docs as u64, + }); + } + let ingest_failure = 
response.failures.pop().unwrap(); + Err(match ingest_failure.reason() { + IngestFailureReason::Unspecified => { + IngestServiceError::Internal("Unknown reason".to_string()) + } + IngestFailureReason::IndexNotFound => IngestServiceError::IndexNotFound { + index_id: ingest_failure.index_id, + }, + IngestFailureReason::SourceNotFound => IngestServiceError::Internal(format!( + "Ingest v2 source not found for index {}", + ingest_failure.index_id + )), + IngestFailureReason::Internal => IngestServiceError::Internal("Internal error".to_string()), + IngestFailureReason::NoShardsAvailable => IngestServiceError::Unavailable, + IngestFailureReason::RateLimited => IngestServiceError::RateLimited, + IngestFailureReason::ResourceExhausted => IngestServiceError::Unavailable, + }) } #[utoipa::path(