Skip to content

Commit

Permalink
Added an integration test for ingest v2
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Nov 2, 2023
1 parent 3cb75b9 commit 5a30c48
Show file tree
Hide file tree
Showing 6 changed files with 169 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -286,13 +286,13 @@ impl ClusterSandbox {
pub async fn wait_for_splits(
&self,
index_id: &str,
split_states: Option<Vec<SplitState>>,
split_states_filter: Option<Vec<SplitState>>,
required_splits_num: usize,
) -> anyhow::Result<()> {
wait_until_predicate(
|| {
let splits_query_params = ListSplitsQueryParams {
split_states: split_states.clone(),
split_states: split_states_filter.clone(),
..Default::default()
};
async move {
Expand Down Expand Up @@ -322,7 +322,7 @@ impl ClusterSandbox {
}
},
Duration::from_secs(10),
Duration::from_millis(100),
Duration::from_millis(500),
)
.await?;
Ok(())
Expand Down
149 changes: 119 additions & 30 deletions quickwit/quickwit-integration-tests/src/tests/index_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ use std::time::Duration;
use bytes::Bytes;
use quickwit_common::test_utils::wait_until_predicate;
use quickwit_config::service::QuickwitService;
use quickwit_config::ConfigFormat;
use quickwit_indexing::actors::INDEXING_DIR_NAME;
use quickwit_janitor::actors::DELETE_SERVICE_TASK_DIR_NAME;
use quickwit_metastore::SplitState;
use quickwit_rest_client::error::{ApiError, Error};
use quickwit_rest_client::rest_client::CommitType;
use quickwit_serve::SearchRequestQueryString;
use serde_json::json;
Expand Down Expand Up @@ -60,11 +62,7 @@ async fn test_restarting_standalone_server() {
sandbox
.indexer_rest_client
.indexes()
.create(
index_config.clone(),
quickwit_config::ConfigFormat::Yaml,
false,
)
.create(index_config.clone(), ConfigFormat::Yaml, false)
.await
.unwrap();

Expand Down Expand Up @@ -102,7 +100,7 @@ async fn test_restarting_standalone_server() {
sandbox
.indexer_rest_client
.indexes()
.create(index_config, quickwit_config::ConfigFormat::Yaml, false)
.create(index_config, ConfigFormat::Yaml, false)
.await
.unwrap();

Expand Down Expand Up @@ -215,36 +213,127 @@ async fn test_restarting_standalone_server() {
sandbox.shutdown().await.unwrap();
}

const TEST_INDEX_CONFIG: &'static str = r#"
version: 0.6
index_id: test_index
doc_mapping:
field_mappings:
- name: body
type: text
indexing_settings:
commit_timeout_secs: 1
merge_policy:
type: stable_log
merge_factor: 4
max_merge_factor: 4
"#;

#[tokio::test]
async fn test_ingest_v2_index_not_found() {
// This tests checks what happens when we try to ingest into a non-existing index.
quickwit_common::setup_logging_for_tests();
let nodes_services = &[
HashSet::from_iter([QuickwitService::Indexer, QuickwitService::Janitor]),
HashSet::from_iter([QuickwitService::Indexer, QuickwitService::Janitor]),
HashSet::from_iter([
QuickwitService::ControlPlane,
QuickwitService::Metastore,
QuickwitService::Searcher,
]),
];
let sandbox = ClusterSandbox::start_cluster_nodes(&nodes_services[..])
.await
.unwrap();
sandbox.wait_for_cluster_num_ready_nodes(3).await.unwrap();
let missing_index_err: Error = sandbox
.indexer_rest_client
.ingest_v2(
"missing_index",
ingest_json!({"body": "doc1"}),
None,
None,
CommitType::WaitFor,
)
.await
.unwrap_err();
let Error::Api(ApiError { message, code }) = missing_index_err else {
panic!("Expected an API error.");
};
assert_eq!(code, 404u16);
let error_message = message.unwrap();
assert_eq!(error_message, "index `missing_index` not found");
sandbox.shutdown().await.unwrap();
}

#[tokio::test]
async fn test_ingest_v2_happy_path() {
// This tests checks our happy path for ingesting one doc.
quickwit_common::setup_logging_for_tests();
let nodes_services = &[
HashSet::from_iter([QuickwitService::Indexer, QuickwitService::Janitor]),
HashSet::from_iter([QuickwitService::Indexer, QuickwitService::Janitor]),
HashSet::from_iter([
QuickwitService::ControlPlane,
QuickwitService::Metastore,
QuickwitService::Searcher,
]),
];
let sandbox = ClusterSandbox::start_cluster_nodes(&nodes_services[..])
.await
.unwrap();
sandbox.wait_for_cluster_num_ready_nodes(3).await.unwrap();
sandbox
.indexer_rest_client
.indexes()
.create(TEST_INDEX_CONFIG.into(), ConfigFormat::Yaml, false)
.await
.unwrap();
sandbox
.indexer_rest_client
.sources("test_index")
.toggle("_ingest-source", true)
.await
.unwrap();
sandbox
.indexer_rest_client
.ingest_v2(
"test_index",
ingest_json!({"body": "doc1"}),
None,
None,
CommitType::WaitFor,
)
.await
.unwrap();
let search_req = SearchRequestQueryString {
query: "*".to_string(),
..Default::default()
};
// TODO Ideally wait for should wait before returning.
sandbox
.wait_for_splits("test_index", None, 1)
.await
.unwrap();
let search_result = sandbox
.indexer_rest_client
.search("test_index", search_req)
.await
.unwrap();
assert_eq!(search_result.num_hits, 1);
sandbox.shutdown().await.unwrap();
}

#[tokio::test]
async fn test_commit_modes() {
quickwit_common::setup_logging_for_tests();
let sandbox = ClusterSandbox::start_standalone_node().await.unwrap();
let index_id = "test_commit_modes_index";
let index_id = "test_index";

// Create index
sandbox
.indexer_rest_client
.indexes()
.create(
r#"
version: 0.6
index_id: test_commit_modes_index
doc_mapping:
field_mappings:
- name: body
type: text
indexing_settings:
commit_timeout_secs: 1
merge_policy:
type: stable_log
merge_factor: 4
max_merge_factor: 4
"#
.into(),
quickwit_config::ConfigFormat::Yaml,
false,
)
.create(TEST_INDEX_CONFIG.into(), ConfigFormat::Yaml, false)
.await
.unwrap();

Expand Down Expand Up @@ -391,7 +480,7 @@ async fn test_very_large_index_name() {
"#,
)
.into(),
quickwit_config::ConfigFormat::Yaml,
ConfigFormat::Yaml,
false,
)
.await
Expand Down Expand Up @@ -447,7 +536,7 @@ async fn test_very_large_index_name() {
"#,
)
.into(),
quickwit_config::ConfigFormat::Yaml,
ConfigFormat::Yaml,
false,
)
.await
Expand Down Expand Up @@ -484,7 +573,7 @@ async fn test_shutdown() {
commit_timeout_secs: 1
"#
.into(),
quickwit_config::ConfigFormat::Yaml,
ConfigFormat::Yaml,
false,
)
.await
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-rest-client/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use serde::de::DeserializeOwned;

use crate::error::{ApiError, Error, ErrorResponsePayload};

#[derive(Debug)]
pub struct ApiResponse {
inner: reqwest::Response,
}
Expand Down
46 changes: 44 additions & 2 deletions quickwit/quickwit-rest-client/src/rest_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,50 @@ impl QuickwitClient {
on_ingest_event: Option<&(dyn Fn(IngestEvent) + Sync)>,
last_block_commit: CommitType,
) -> Result<(), Error> {
let ingest_path = format!("{index_id}/ingest");
self.ingest_aux(
index_id,
ingest_source,
batch_size_limit_opt,
on_ingest_event,
last_block_commit,
false,
)
.await
}

pub async fn ingest_v2(
&self,
index_id: &str,
ingest_source: IngestSource,
batch_size_limit_opt: Option<usize>,
on_ingest_event: Option<&(dyn Fn(IngestEvent) + Sync)>,
last_block_commit: CommitType,
) -> Result<(), Error> {
self.ingest_aux(
index_id,
ingest_source,
batch_size_limit_opt,
on_ingest_event,
last_block_commit,
true,
)
.await
}

pub async fn ingest_aux(
&self,
index_id: &str,
ingest_source: IngestSource,
batch_size_limit_opt: Option<usize>,
on_ingest_event: Option<&(dyn Fn(IngestEvent) + Sync)>,
last_block_commit: CommitType,
ingest_v2: bool,
) -> Result<(), Error> {
let ingest_path = if ingest_v2 {
format!("{index_id}/ingest-v2")
} else {
format!("{index_id}/ingest")
};
let batch_size_limit = batch_size_limit_opt.unwrap_or(INGEST_CONTENT_LENGTH_LIMIT);
let mut batch_reader = match ingest_source {
IngestSource::File(filepath) => {
Expand Down Expand Up @@ -286,7 +329,6 @@ impl QuickwitClient {
timeout,
)
.await?;

if response.status_code() == StatusCode::TOO_MANY_REQUESTS {
if let Some(event_fn) = &on_ingest_event {
event_fn(IngestEvent::Sleep)
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-search/src/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ fn get_scroll_ttl_duration(search_request: &SearchRequest) -> crate::Result<Opti
Ok(Some(scroll_ttl))
}

#[instrument(skip_all)]
#[instrument(level = "debug", skip_all)]
async fn search_partial_hits_phase_with_scroll(
searcher_context: &SearcherContext,
indexes_metas_for_leaf_search: &IndexesMetasForLeafSearch,
Expand Down Expand Up @@ -457,7 +457,7 @@ async fn search_partial_hits_phase_with_scroll(
}
}

#[instrument(skip_all)]
#[instrument(level = "debug", skip_all)]
pub(crate) async fn search_partial_hits_phase(
searcher_context: &SearcherContext,
indexes_metas_for_leaf_search: &IndexesMetasForLeafSearch,
Expand Down
1 change: 0 additions & 1 deletion quickwit/quickwit-serve/src/index_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,6 @@ fn mark_splits_for_deletion_handler(
async fn get_indexes_metadatas(
mut metastore: MetastoreServiceClient,
) -> MetastoreResult<Vec<IndexMetadata>> {
info!("get-indexes-metadatas");
metastore
.list_indexes_metadata(ListIndexesMetadataRequest::all())
.await
Expand Down

0 comments on commit 5a30c48

Please sign in to comment.