Skip to content

Commit

Permalink
Fix and extra tests
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Sep 5, 2024
1 parent c5c3caf commit b04f0de
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 37 deletions.
3 changes: 3 additions & 0 deletions quickwit/quickwit-indexing/src/source/ingest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,9 @@ impl Source for IngestSource {
num_millis=%now.elapsed().as_millis(),
"Sending doc batch to indexer."
);
if !self.fetch_stream.has_active_shard_subscriptions() {
batch_builder.force_commit();
}
let message = batch_builder.build();
ctx.send_message(doc_processor_mailbox, message).await?;
}
Expand Down
11 changes: 10 additions & 1 deletion quickwit/quickwit-ingest/src/ingest_v2/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,14 @@ impl MultiFetchStream {
self.fetch_message_tx.clone()
}

pub fn has_active_shard_subscriptions(&self) -> bool {
tracing::info!(
tx_count = self.fetch_message_tx.strong_count(),
"has_active_shard_subscriptions"
);
self.fetch_message_tx.strong_count() > 1
}

/// Subscribes to a shard and fails over to the replica if an error occurs.
#[allow(clippy::too_many_arguments)]
pub async fn subscribe(
Expand Down Expand Up @@ -1864,8 +1872,9 @@ pub(super) mod tests {
let client_id = "test-client".to_string();
let ingester_pool = IngesterPool::default();
let retry_params = RetryParams::for_test();
let _multi_fetch_stream =
let multi_fetch_stream =
MultiFetchStream::new(self_node_id, client_id, ingester_pool, retry_params);
assert!(!multi_fetch_stream.has_active_shard_subscriptions())
// TODO: Backport from original branch.
}
}
80 changes: 44 additions & 36 deletions quickwit/quickwit-integration-tests/src/tests/ingest_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::HashSet;
use std::time::Duration;

use futures_util::FutureExt;
Expand Down Expand Up @@ -602,7 +603,6 @@ async fn test_shutdown_single_node() {

sandbox.enable_ingest_v2();

// Create index
sandbox
.indexer_rest_client
.indexes()
Expand All @@ -625,41 +625,29 @@ async fn test_shutdown_single_node() {
.await
.unwrap();

// Ensure that the index is ready to accept records.
ingest_with_retry(
&sandbox.indexer_rest_client,
index_id,
ingest_json!({"body": "one"}),
CommitType::Force,
)
.await
.unwrap();

// Test force commit
sandbox
.indexer_rest_client
.ingest(
index_id,
ingest_json!({"body": "two"}),
ingest_json!({"body": "commit me before shutdown"}),
None,
None,
CommitType::Force,
CommitType::Auto,
)
.await
.unwrap();

// assert that we don't wait the commit timeout (60s)
tokio::time::timeout(std::time::Duration::from_secs(10), sandbox.shutdown())
.await
.unwrap()
.unwrap();
}

/// When the control plane is on a different node, it might be shutdown
/// before the ingest pipeline is scheduled on the indexer.
#[tokio::test]
async fn test_shutdown_control_plane_early_shutdown() {
async fn test_shutdown_indexer_first() {
initialize_tests();
let sandbox = ClusterSandboxBuilder::default()
let mut sandbox = ClusterSandboxBuilder::default()
.add_node([QuickwitService::Indexer])
.add_node([
QuickwitService::ControlPlane,
Expand All @@ -671,10 +659,8 @@ async fn test_shutdown_control_plane_early_shutdown() {
.await;
let index_id = "test_shutdown_separate_indexer";

// TODO: make this test work with ingest v2 (#5068)
// sandbox.enable_ingest_v2();
sandbox.enable_ingest_v2();

// Create index
sandbox
.indexer_rest_client
.indexes()
Expand All @@ -688,7 +674,7 @@ async fn test_shutdown_control_plane_early_shutdown() {
- name: body
type: text
indexing_settings:
commit_timeout_secs: 1
commit_timeout_secs: 60
"#
),
ConfigFormat::Yaml,
Expand All @@ -697,28 +683,40 @@ async fn test_shutdown_control_plane_early_shutdown() {
.await
.unwrap();

// Ensure that the index is ready to accept records.
ingest_with_retry(
&sandbox.indexer_rest_client,
index_id,
ingest_json!({"body": "one"}),
CommitType::Force,
sandbox
.indexer_rest_client
.ingest(
index_id,
ingest_json!({"body": "commit me before shutdown"}),
None,
None,
CommitType::Auto,
)
.await
.unwrap();

// assert that we don't wait the commit timeout (60s)
tokio::time::timeout(
std::time::Duration::from_secs(10),
sandbox.shutdown_services(&HashSet::from_iter([QuickwitService::Indexer])),
)
.await
.unwrap()
.unwrap();

// still, the doc should be committed during the graceful shutdown
sandbox.assert_hit_count(index_id, "body:before", 1).await;

tokio::time::timeout(std::time::Duration::from_secs(10), sandbox.shutdown())
.await
.unwrap()
.unwrap();
}

/// When the control plane/metastore are shutdown before the indexer, the
/// indexer shutdown should not hang indefinitely
#[tokio::test]
async fn test_shutdown_separate_indexer() {
async fn test_shutdown_metastore_first() {
initialize_tests();
let sandbox = ClusterSandboxBuilder::default()
let mut sandbox = ClusterSandboxBuilder::default()
.add_node([QuickwitService::Indexer])
.add_node([
QuickwitService::ControlPlane,
Expand All @@ -731,6 +729,8 @@ async fn test_shutdown_separate_indexer() {
let index_id = "test_shutdown_separate_indexer";

// TODO: make this test work with ingest v2 (#5068)
// When the control plane/metastore are shutdown before the indexer, the
// indexer shutdown should not hang indefinitely
// sandbox.enable_ingest_v2();

// Create index
Expand Down Expand Up @@ -766,10 +766,18 @@ async fn test_shutdown_separate_indexer() {
.await
.unwrap();

sandbox
.wait_for_splits(index_id, Some(vec![SplitState::Published]), 1)
.await
.unwrap();
tokio::time::timeout(
std::time::Duration::from_secs(10),
sandbox.shutdown_services(&HashSet::from_iter([
QuickwitService::ControlPlane,
QuickwitService::Searcher,
QuickwitService::Metastore,
QuickwitService::Janitor,
])),
)
.await
.unwrap()
.unwrap();

tokio::time::timeout(std::time::Duration::from_secs(10), sandbox.shutdown())
.await
Expand Down

0 comments on commit b04f0de

Please sign in to comment.