Skip to content

Commit

Permalink
Added an integration test for ingest v2 (#4071)
Browse files Browse the repository at this point in the history
- Adds an ingest_v2 method to the cluster sandbox.
- Adds two simple integration tests.
- Converts the ingest v2 error into rest api errors.
- This PR suffers from the deadlock described in #4070

Closes #4065
  • Loading branch information
fulmicoton authored Nov 10, 2023
1 parent a43ec93 commit 9e95ec9
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ impl ClusterSandbox {
})
}

/// Enables ingest v2 on the REST clients held by the sandbox.
///
/// The flag is set on both clients: the ingest v2 integration tests send their ingest
/// requests through `indexer_rest_client`, so setting it on `searcher_rest_client` alone
/// would leave the flag unset on the client that actually issues those requests.
pub fn enable_ingest_v2(&mut self) {
    self.indexer_rest_client.enable_ingest_v2();
    self.searcher_rest_client.enable_ingest_v2();
}

// Starts one node that runs all the services.
pub async fn start_standalone_node() -> anyhow::Result<Self> {
let temp_dir = tempfile::tempdir()?;
Expand Down Expand Up @@ -286,13 +290,13 @@ impl ClusterSandbox {
pub async fn wait_for_splits(
&self,
index_id: &str,
split_states: Option<Vec<SplitState>>,
split_states_filter: Option<Vec<SplitState>>,
required_splits_num: usize,
) -> anyhow::Result<()> {
wait_until_predicate(
|| {
let splits_query_params = ListSplitsQueryParams {
split_states: split_states.clone(),
split_states: split_states_filter.clone(),
..Default::default()
};
async move {
Expand Down Expand Up @@ -322,7 +326,7 @@ impl ClusterSandbox {
}
},
Duration::from_secs(10),
Duration::from_millis(100),
Duration::from_millis(500),
)
.await?;
Ok(())
Expand Down
146 changes: 116 additions & 30 deletions quickwit/quickwit-integration-tests/src/tests/index_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ use std::time::Duration;
use bytes::Bytes;
use quickwit_common::test_utils::wait_until_predicate;
use quickwit_config::service::QuickwitService;
use quickwit_config::ConfigFormat;
use quickwit_indexing::actors::INDEXING_DIR_NAME;
use quickwit_janitor::actors::DELETE_SERVICE_TASK_DIR_NAME;
use quickwit_metastore::SplitState;
use quickwit_rest_client::error::{ApiError, Error};
use quickwit_rest_client::rest_client::CommitType;
use quickwit_serve::SearchRequestQueryString;
use serde_json::json;
Expand Down Expand Up @@ -60,11 +62,7 @@ async fn test_restarting_standalone_server() {
sandbox
.indexer_rest_client
.indexes()
.create(
index_config.clone(),
quickwit_config::ConfigFormat::Yaml,
false,
)
.create(index_config.clone(), ConfigFormat::Yaml, false)
.await
.unwrap();

Expand Down Expand Up @@ -102,7 +100,7 @@ async fn test_restarting_standalone_server() {
sandbox
.indexer_rest_client
.indexes()
.create(index_config, quickwit_config::ConfigFormat::Yaml, false)
.create(index_config, ConfigFormat::Yaml, false)
.await
.unwrap();

Expand Down Expand Up @@ -215,36 +213,124 @@ async fn test_restarting_standalone_server() {
sandbox.shutdown().await.unwrap();
}

/// Index configuration for the `test_index` index, shared by the tests in this file that
/// create it through `indexes().create(..)` with `ConfigFormat::Yaml`.
const TEST_INDEX_CONFIG: &str = r#"
version: 0.6
index_id: test_index
doc_mapping:
field_mappings:
- name: body
type: text
indexing_settings:
commit_timeout_secs: 1
merge_policy:
type: stable_log
merge_factor: 4
max_merge_factor: 4
"#;

#[tokio::test]
async fn test_ingest_v2_index_not_found() {
// This tests checks what happens when we try to ingest into a non-existing index.
quickwit_common::setup_logging_for_tests();
let nodes_services = &[
HashSet::from_iter([QuickwitService::Indexer, QuickwitService::Janitor]),
HashSet::from_iter([QuickwitService::Indexer, QuickwitService::Janitor]),
HashSet::from_iter([
QuickwitService::ControlPlane,
QuickwitService::Metastore,
QuickwitService::Searcher,
]),
];
let mut sandbox = ClusterSandbox::start_cluster_nodes(&nodes_services[..])
.await
.unwrap();
sandbox.enable_ingest_v2();
sandbox.wait_for_cluster_num_ready_nodes(3).await.unwrap();
let missing_index_err: Error = sandbox
.indexer_rest_client
.ingest(
"missing_index",
ingest_json!({"body": "doc1"}),
None,
None,
CommitType::WaitFor,
)
.await
.unwrap_err();
let Error::Api(ApiError { message, code }) = missing_index_err else {
panic!("Expected an API error.");
};
assert_eq!(code, 404u16);
let error_message = message.unwrap();
assert_eq!(error_message, "index `missing_index` not found");
sandbox.shutdown().await.unwrap();
}

#[tokio::test]
async fn test_ingest_v2_happy_path() {
    // Happy path: create an index, ingest a single document, then find it with a search.
    quickwit_common::setup_logging_for_tests();

    // Two indexer/janitor nodes, plus one node running the control plane, the metastore
    // and the searcher.
    let services_per_node = [
        HashSet::from_iter([QuickwitService::Indexer, QuickwitService::Janitor]),
        HashSet::from_iter([QuickwitService::Indexer, QuickwitService::Janitor]),
        HashSet::from_iter([
            QuickwitService::ControlPlane,
            QuickwitService::Metastore,
            QuickwitService::Searcher,
        ]),
    ];
    let mut sandbox = ClusterSandbox::start_cluster_nodes(&services_per_node[..])
        .await
        .unwrap();
    sandbox.enable_ingest_v2();
    sandbox.wait_for_cluster_num_ready_nodes(3).await.unwrap();

    // Must match the `index_id` declared in `TEST_INDEX_CONFIG`.
    let index_id = "test_index";

    // Create the index.
    sandbox
        .indexer_rest_client
        .indexes()
        .create(TEST_INDEX_CONFIG.into(), ConfigFormat::Yaml, false)
        .await
        .unwrap();

    // Toggle the `_ingest-source` source on for the index.
    sandbox
        .indexer_rest_client
        .sources(index_id)
        .toggle("_ingest-source", true)
        .await
        .unwrap();

    // Ingest one document using the `WaitFor` commit type.
    sandbox
        .indexer_rest_client
        .ingest(
            index_id,
            ingest_json!({"body": "doc1"}),
            None,
            None,
            CommitType::WaitFor,
        )
        .await
        .unwrap();

    // A match-all query must return exactly the document ingested above.
    let match_all_request = SearchRequestQueryString {
        query: "*".to_string(),
        ..Default::default()
    };
    let search_response = sandbox
        .indexer_rest_client
        .search(index_id, match_all_request)
        .await
        .unwrap();
    let num_hits = search_response.num_hits;
    assert_eq!(num_hits, 1);

    sandbox.shutdown().await.unwrap();
}

#[tokio::test]
async fn test_commit_modes() {
quickwit_common::setup_logging_for_tests();
let sandbox = ClusterSandbox::start_standalone_node().await.unwrap();
let index_id = "test_commit_modes_index";
let index_id = "test_index";

// Create index
sandbox
.indexer_rest_client
.indexes()
.create(
r#"
version: 0.6
index_id: test_commit_modes_index
doc_mapping:
field_mappings:
- name: body
type: text
indexing_settings:
commit_timeout_secs: 1
merge_policy:
type: stable_log
merge_factor: 4
max_merge_factor: 4
"#
.into(),
quickwit_config::ConfigFormat::Yaml,
false,
)
.create(TEST_INDEX_CONFIG.into(), ConfigFormat::Yaml, false)
.await
.unwrap();

Expand Down Expand Up @@ -391,7 +477,7 @@ async fn test_very_large_index_name() {
"#,
)
.into(),
quickwit_config::ConfigFormat::Yaml,
ConfigFormat::Yaml,
false,
)
.await
Expand Down Expand Up @@ -447,7 +533,7 @@ async fn test_very_large_index_name() {
"#,
)
.into(),
quickwit_config::ConfigFormat::Yaml,
ConfigFormat::Yaml,
false,
)
.await
Expand Down Expand Up @@ -484,7 +570,7 @@ async fn test_shutdown() {
commit_timeout_secs: 1
"#
.into(),
quickwit_config::ConfigFormat::Yaml,
ConfigFormat::Yaml,
false,
)
.await
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-rest-client/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use serde::de::DeserializeOwned;

use crate::error::{ApiError, Error, ErrorResponsePayload};

/// Wrapper around a raw `reqwest::Response`.
///
/// `Debug` is derived so that values holding an `ApiResponse` can be debug-formatted.
#[derive(Debug)]
pub struct ApiResponse {
// The underlying HTTP response.
inner: reqwest::Response,
}
Expand Down
5 changes: 4 additions & 1 deletion quickwit/quickwit-rest-client/src/rest_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,10 @@ pub struct QuickwitClient {
}

impl QuickwitClient {
/// Sets this client's `ingest_v2` flag to `true`.
///
/// NOTE(review): presumably this makes `ingest` use the ingest v2 API; confirm against the
/// `ingest` implementation, which is not visible here.
pub fn enable_ingest_v2(&mut self) {
self.ingest_v2 = true;
}

pub async fn search(
&self,
index_id: &str,
Expand Down Expand Up @@ -288,7 +292,6 @@ impl QuickwitClient {
timeout,
)
.await?;

if response.status_code() == StatusCode::TOO_MANY_REQUESTS {
if let Some(event_fn) = &on_ingest_event {
event_fn(IngestEvent::Sleep)
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-search/src/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ fn get_scroll_ttl_duration(search_request: &SearchRequest) -> crate::Result<Opti
Ok(Some(scroll_ttl))
}

#[instrument(skip_all)]
#[instrument(level = "debug", skip_all)]
async fn search_partial_hits_phase_with_scroll(
searcher_context: &SearcherContext,
indexes_metas_for_leaf_search: &IndexesMetasForLeafSearch,
Expand Down Expand Up @@ -457,7 +457,7 @@ async fn search_partial_hits_phase_with_scroll(
}
}

#[instrument(skip_all)]
#[instrument(level = "debug", skip_all)]
pub(crate) async fn search_partial_hits_phase(
searcher_context: &SearcherContext,
indexes_metas_for_leaf_search: &IndexesMetasForLeafSearch,
Expand Down
1 change: 0 additions & 1 deletion quickwit/quickwit-serve/src/index_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,6 @@ fn mark_splits_for_deletion_handler(
async fn get_indexes_metadatas(
mut metastore: MetastoreServiceClient,
) -> MetastoreResult<Vec<IndexMetadata>> {
info!("get-indexes-metadatas");
metastore
.list_indexes_metadata(ListIndexesMetadataRequest::all())
.await
Expand Down
49 changes: 44 additions & 5 deletions quickwit/quickwit-serve/src/ingest_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use quickwit_ingest::{
IngestServiceClient, IngestServiceError, TailRequest,
};
use quickwit_proto::ingest::router::{
IngestRequestV2, IngestResponseV2, IngestRouterService, IngestRouterServiceClient,
IngestSubrequest,
IngestFailureReason, IngestRequestV2, IngestResponseV2, IngestRouterService,
IngestRouterServiceClient, IngestSubrequest,
};
use quickwit_proto::ingest::{DocBatchV2, IngestV2Error};
use quickwit_proto::types::IndexId;
Expand Down Expand Up @@ -127,14 +127,15 @@ async fn ingest_v2(
body: Bytes,
ingest_options: IngestOptions,
mut ingest_router: IngestRouterServiceClient,
) -> Result<IngestResponseV2, IngestV2Error> {
) -> Result<IngestResponse, IngestServiceError> {
let mut doc_buffer = BytesMut::new();
let mut doc_lengths = Vec::new();

for line in lines(&body) {
doc_lengths.push(line.len() as u32);
doc_buffer.put(line);
}
let num_docs = doc_lengths.len();
let doc_batch = DocBatchV2 {
doc_buffer: doc_buffer.freeze(),
doc_lengths,
Expand All @@ -149,8 +150,46 @@ async fn ingest_v2(
commit_type: ingest_options.commit_type as i32,
subrequests: vec![subrequest],
};
let response = ingest_router.ingest(request).await?;
Ok(response)
let response = ingest_router
.ingest(request)
.await
.map_err(|err: IngestV2Error| IngestServiceError::Internal(err.to_string()))?;
convert_ingest_response_v2(response, num_docs)
}

fn convert_ingest_response_v2(
mut response: IngestResponseV2,
num_docs: usize,
) -> Result<IngestResponse, IngestServiceError> {
let num_responses = response.successes.len() + response.failures.len();
if num_responses != 1 {
return Err(IngestServiceError::Internal(format!(
"Expected a single failure/success, got {}.",
num_responses
)));
}
if response.successes.pop().is_some() {
return Ok(IngestResponse {
num_docs_for_processing: num_docs as u64,
});
}
let ingest_failure = response.failures.pop().unwrap();
Err(match ingest_failure.reason() {
IngestFailureReason::Unspecified => {
IngestServiceError::Internal("Unknown reason".to_string())
}
IngestFailureReason::IndexNotFound => IngestServiceError::IndexNotFound {
index_id: ingest_failure.index_id,
},
IngestFailureReason::SourceNotFound => IngestServiceError::Internal(format!(
"Ingest v2 source not found for index {}",
ingest_failure.index_id
)),
IngestFailureReason::Internal => IngestServiceError::Internal("Internal error".to_string()),
IngestFailureReason::NoShardsAvailable => IngestServiceError::Unavailable,
IngestFailureReason::RateLimited => IngestServiceError::RateLimited,
IngestFailureReason::ResourceExhausted => IngestServiceError::Unavailable,
})
}

#[utoipa::path(
Expand Down

0 comments on commit 9e95ec9

Please sign in to comment.