diff --git a/quickwit/quickwit-indexing/src/source/queue_sources/coordinator.rs b/quickwit/quickwit-indexing/src/source/queue_sources/coordinator.rs index bd00840f657..0f92e27ffa3 100644 --- a/quickwit/quickwit-indexing/src/source/queue_sources/coordinator.rs +++ b/quickwit/quickwit-indexing/src/source/queue_sources/coordinator.rs @@ -78,7 +78,7 @@ pub struct QueueCoordinator { shared_state: QueueSharedState, local_state: QueueLocalState, publish_token: String, - visible_settings: VisibilitySettings, + visibility_settings: VisibilitySettings, } impl fmt::Debug for QueueCoordinator { @@ -102,6 +102,9 @@ impl QueueCoordinator { metastore: source_runtime.metastore, source_id: source_runtime.pipeline_id.source_id.clone(), index_uid: source_runtime.pipeline_id.index_uid.clone(), + reacquire_grace_period: Duration::from_secs( + 2 * source_runtime.indexing_setting.commit_timeout_secs as u64, + ), }, local_state: QueueLocalState::default(), pipeline_id: source_runtime.pipeline_id, @@ -113,7 +116,7 @@ impl QueueCoordinator { message_type, publish_lock: PublishLock::default(), publish_token: Ulid::new().to_string(), - visible_settings: VisibilitySettings::from_commit_timeout( + visibility_settings: VisibilitySettings::from_commit_timeout( source_runtime.indexing_setting.commit_timeout_secs, ), } @@ -157,7 +160,7 @@ impl QueueCoordinator { async fn poll_messages(&mut self, ctx: &SourceContext) -> Result<(), ActorExitStatus> { let raw_messages = self .queue_receiver - .receive(1, self.visible_settings.deadline_for_receive) + .receive(1, self.visibility_settings.deadline_for_receive) .await?; let mut format_errors = Vec::new(); @@ -215,7 +218,7 @@ impl QueueCoordinator { self.queue.clone(), message.metadata.ack_id.clone(), message.metadata.initial_deadline, - self.visible_settings.clone(), + self.visibility_settings.clone(), ), content: message, position, @@ -254,7 +257,7 @@ impl QueueCoordinator { .await?; if in_progress_ref.batch_reader.is_eof() { self.local_state - 
.drop_currently_read(self.visible_settings.deadline_for_last_extension) + .drop_currently_read(self.visibility_settings.deadline_for_last_extension) .await?; self.observable_state.num_messages_processed += 1; } @@ -319,7 +322,7 @@ mod tests { use crate::source::doc_file_reader::file_test_helpers::{generate_dummy_doc_file, DUMMY_DOC}; use crate::source::queue_sources::memory_queue::MemoryQueueForTests; use crate::source::queue_sources::message::PreProcessedPayload; - use crate::source::queue_sources::shared_state::shared_state_for_tests::shared_state_for_tests; + use crate::source::queue_sources::shared_state::shared_state_for_tests::init_state; use crate::source::{SourceActor, BATCH_NUM_BYTES_LIMIT}; fn setup_coordinator( @@ -347,7 +350,7 @@ mod tests { source_type: SourceType::Unspecified, storage_resolver: StorageResolver::for_test(), publish_token: Ulid::new().to_string(), - visible_settings: VisibilitySettings::from_commit_timeout(5), + visibility_settings: VisibilitySettings::from_commit_timeout(5), } } @@ -401,7 +404,7 @@ mod tests { #[tokio::test] async fn test_process_empty_queue() { let queue = Arc::new(MemoryQueueForTests::new()); - let shared_state = shared_state_for_tests("test-index", Default::default()); + let shared_state = init_state("test-index", Default::default()); let mut coordinator = setup_coordinator(queue.clone(), shared_state); let batches = process_messages(&mut coordinator, queue, &[]).await; assert_eq!(batches.len(), 0); @@ -410,7 +413,7 @@ mod tests { #[tokio::test] async fn test_process_one_small_message() { let queue = Arc::new(MemoryQueueForTests::new()); - let shared_state = shared_state_for_tests("test-index", Default::default()); + let shared_state = init_state("test-index", Default::default()); let mut coordinator = setup_coordinator(queue.clone(), shared_state.clone()); let (dummy_doc_file, _) = generate_dummy_doc_file(false, 10).await; let test_uri = Uri::from_str(dummy_doc_file.path().to_str().unwrap()).unwrap(); @@ -424,7 
+427,7 @@ mod tests { #[tokio::test] async fn test_process_one_big_message() { let queue = Arc::new(MemoryQueueForTests::new()); - let shared_state = shared_state_for_tests("test-index", Default::default()); + let shared_state = init_state("test-index", Default::default()); let mut coordinator = setup_coordinator(queue.clone(), shared_state); let lines = BATCH_NUM_BYTES_LIMIT as usize / DUMMY_DOC.len() + 1; let (dummy_doc_file, _) = generate_dummy_doc_file(true, lines).await; @@ -437,7 +440,7 @@ mod tests { #[tokio::test] async fn test_process_two_messages_different_compression() { let queue = Arc::new(MemoryQueueForTests::new()); - let shared_state = shared_state_for_tests("test-index", Default::default()); + let shared_state = init_state("test-index", Default::default()); let mut coordinator = setup_coordinator(queue.clone(), shared_state); let (dummy_doc_file_1, _) = generate_dummy_doc_file(false, 10).await; let test_uri_1 = Uri::from_str(dummy_doc_file_1.path().to_str().unwrap()).unwrap(); @@ -456,7 +459,7 @@ mod tests { #[tokio::test] async fn test_process_local_duplicate_message() { let queue = Arc::new(MemoryQueueForTests::new()); - let shared_state = shared_state_for_tests("test-index", Default::default()); + let shared_state = init_state("test-index", Default::default()); let mut coordinator = setup_coordinator(queue.clone(), shared_state); let (dummy_doc_file, _) = generate_dummy_doc_file(false, 10).await; let test_uri = Uri::from_str(dummy_doc_file.path().to_str().unwrap()).unwrap(); @@ -477,11 +480,15 @@ mod tests { let partition_id = PreProcessedPayload::ObjectUri(test_uri.clone()).partition_id(); let queue = Arc::new(MemoryQueueForTests::new()); - let shared_state = shared_state_for_tests( + let shared_state = init_state( "test-index", &[( partition_id.clone(), - ("existing_token".to_string(), Position::eof(file_size)), + ( + "existing_token".to_string(), + Position::eof(file_size), + false, + ), )], ); let mut coordinator = 
setup_coordinator(queue.clone(), shared_state.clone()); @@ -492,30 +499,82 @@ mod tests { assert!(coordinator.local_state.is_completed(&partition_id)); } + #[tokio::test] + async fn test_process_existing_messages() { + let (dummy_doc_file_1, _) = generate_dummy_doc_file(false, 10).await; + let test_uri_1 = Uri::from_str(dummy_doc_file_1.path().to_str().unwrap()).unwrap(); + let partition_id_1 = PreProcessedPayload::ObjectUri(test_uri_1.clone()).partition_id(); + + let (dummy_doc_file_2, _) = generate_dummy_doc_file(false, 10).await; + let test_uri_2 = Uri::from_str(dummy_doc_file_2.path().to_str().unwrap()).unwrap(); + let partition_id_2 = PreProcessedPayload::ObjectUri(test_uri_2.clone()).partition_id(); + + let (dummy_doc_file_3, _) = generate_dummy_doc_file(false, 10).await; + let test_uri_3 = Uri::from_str(dummy_doc_file_3.path().to_str().unwrap()).unwrap(); + let partition_id_3 = PreProcessedPayload::ObjectUri(test_uri_3.clone()).partition_id(); + + let queue = Arc::new(MemoryQueueForTests::new()); + let shared_state = init_state( + "test-index", + &[ + ( + partition_id_1.clone(), + ("existing_token_1".to_string(), Position::Beginning, true), + ), + ( + partition_id_2.clone(), + ( + "existing_token_2".to_string(), + Position::offset((DUMMY_DOC.len() + 1) * 2), + true, + ), + ), + ( + partition_id_3.clone(), + ( + "existing_token_3".to_string(), + Position::offset((DUMMY_DOC.len() + 1) * 6), + false, // should not be processed because not stale yet + ), + ), + ], + ); + let mut coordinator = setup_coordinator(queue.clone(), shared_state.clone()); + let batches = process_messages( + &mut coordinator, + queue, + &[ + (&test_uri_1, "ack-id-1"), + (&test_uri_2, "ack-id-2"), + (&test_uri_3, "ack-id-3"), + ], + ) + .await; + assert_eq!(batches.len(), 2); + assert_eq!(batches.iter().map(|b| b.docs.len()).sum::<usize>(), 18); + assert!(coordinator.local_state.is_awaiting_commit(&partition_id_1)); + assert!(coordinator.local_state.is_awaiting_commit(&partition_id_2)); + } + 
#[tokio::test] async fn test_process_multiple_coordinator() { let queue = Arc::new(MemoryQueueForTests::new()); - let shared_state = shared_state_for_tests("test-index", Default::default()); - let mut proc_1 = setup_coordinator(queue.clone(), shared_state.clone()); - let mut proc_2 = setup_coordinator(queue.clone(), shared_state.clone()); + let shared_state = init_state("test-index", Default::default()); + let mut coord_1 = setup_coordinator(queue.clone(), shared_state.clone()); + let mut coord_2 = setup_coordinator(queue.clone(), shared_state.clone()); let (dummy_doc_file, _) = generate_dummy_doc_file(false, 10).await; let test_uri = Uri::from_str(dummy_doc_file.path().to_str().unwrap()).unwrap(); let partition_id = PreProcessedPayload::ObjectUri(test_uri.clone()).partition_id(); - let batches_1 = process_messages(&mut proc_1, queue.clone(), &[(&test_uri, "ack1")]).await; - let batches_2 = process_messages(&mut proc_2, queue, &[(&test_uri, "ack2")]).await; + let batches_1 = process_messages(&mut coord_1, queue.clone(), &[(&test_uri, "ack1")]).await; + let batches_2 = process_messages(&mut coord_2, queue, &[(&test_uri, "ack2")]).await; assert_eq!(batches_1.len(), 1); assert_eq!(batches_1[0].docs.len(), 10); - assert!(proc_1.local_state.is_awaiting_commit(&partition_id)); - // proc_2 doesn't know for sure what is happening with the message - // (proc_1 might have crashed), so it just acquires it and takes over - // processing - // - // TODO: this test should fail once we implement the grace - // period before a partition can be re-acquired - assert_eq!(batches_2.len(), 1); - assert_eq!(batches_2[0].docs.len(), 10); - assert!(proc_2.local_state.is_awaiting_commit(&partition_id)); + assert!(coord_1.local_state.is_awaiting_commit(&partition_id)); + // coord_2 learns from shared state that the message is likely still + // being processed and skips it + assert_eq!(batches_2.len(), 0); + assert!(!coord_2.local_state.is_tracked(&partition_id)); } } diff --git 
a/quickwit/quickwit-indexing/src/source/queue_sources/shared_state.rs b/quickwit/quickwit-indexing/src/source/queue_sources/shared_state.rs index 04be2806282..e8ae1dae8f3 100644 --- a/quickwit/quickwit-indexing/src/source/queue_sources/shared_state.rs +++ b/quickwit/quickwit-indexing/src/source/queue_sources/shared_state.rs @@ -18,6 +18,7 @@ // along with this program. If not, see <http://www.gnu.org/licenses/>. use std::collections::BTreeMap; +use std::time::Duration; use anyhow::{bail, Context}; use quickwit_metastore::checkpoint::PartitionId; @@ -26,6 +27,7 @@ use quickwit_proto::metastore::{ OpenShardsRequest, }; use quickwit_proto::types::{DocMappingUid, IndexUid, Position, ShardId}; +use time::OffsetDateTime; use tracing::info; use super::message::PreProcessedMessage; @@ -35,6 +37,9 @@ pub struct QueueSharedState { pub metastore: MetastoreServiceClient, pub index_uid: IndexUid, pub source_id: String, + /// Duration after which the processing of a shard is considered stale and + /// should be reacquired + pub reacquire_grace_period: Duration, } impl QueueSharedState { @@ -78,11 +83,13 @@ impl QueueSharedState { let shard = sub.open_shard(); let position = shard.publish_position_inclusive.clone().unwrap_or_default(); let is_owned = sub.open_shard().publish_token.as_deref() == Some(publish_token); + let update_datetime = OffsetDateTime::from_unix_timestamp(shard.update_timestamp) + .context("Invalid shard update timestamp")?; + let is_stale = + OffsetDateTime::now_utc() - update_datetime > self.reacquire_grace_period; if position.is_eof() || (is_owned && position.is_beginning()) { shards.push((partition_id, position)); - } else if !is_owned { - // TODO: Add logic to only re-acquire shards that have a token that is not - // the local token when they haven't been updated recently + } else if !is_owned && is_stale { info!(previous_token = shard.publish_token, "shard re-acquired"); re_acquired_shards.push(shard.shard_id().clone()); } else if is_owned && !position.is_beginning() { @@ -146,6 
+153,7 @@ pub async fn checkpoint_messages( pub mod shared_state_for_tests { use std::sync::{Arc, Mutex}; + use itertools::Itertools; use quickwit_proto::ingest::{Shard, ShardState}; use quickwit_proto::metastore::{ AcquireShardsResponse, MockMetastoreService, OpenShardSubresponse, OpenShardsResponse, @@ -153,9 +161,14 @@ pub mod shared_state_for_tests { use super::*; + /// Creates a metastore that mocks the behavior of the Shard API on the open + /// and acquire methods using a simplified in-memory state. pub(super) fn mock_metastore( - initial_state: &[(PartitionId, (String, Position))], + // Shards (token, position, update_timestamp) in the initial state + initial_state: &[(PartitionId, (String, Position, i64))], + // Times open_shards is expected to be called (None <=> no expectation) open_shard_times: Option, + // Times acquire_shards is expected to be called (None <=> no expectation) acquire_times: Option, ) -> MetastoreServiceClient { let mut mock_metastore = MockMetastoreService::new(); @@ -172,16 +185,22 @@ pub mod shared_state_for_tests { .into_iter() .map(|sub_req| { let partition_id: PartitionId = sub_req.shard_id().to_string().into(); - let (token, position) = inner_state_ref + let req_token = sub_req.publish_token.unwrap(); + let (token, position, update_timestamp) = inner_state_ref .lock() .unwrap() .get(&partition_id) .cloned() - .unwrap_or((sub_req.publish_token.unwrap(), Position::Beginning)); - inner_state_ref - .lock() - .unwrap() - .insert(partition_id, (token.clone(), position.clone())); + .unwrap_or(( + req_token.clone(), + Position::Beginning, + OffsetDateTime::now_utc().unix_timestamp(), + )); + + inner_state_ref.lock().unwrap().insert( + partition_id, + (token.clone(), position.clone(), update_timestamp), + ); OpenShardSubresponse { subrequest_id: sub_req.subrequest_id, open_shard: Some(Shard { @@ -194,7 +213,7 @@ pub mod shared_state_for_tests { doc_mapping_uid: sub_req.doc_mapping_uid, publish_position_inclusive: Some(position), 
shard_state: ShardState::Open as i32, - update_timestamp: 1724158996, + update_timestamp, }), } }) @@ -204,58 +223,77 @@ pub mod shared_state_for_tests { if let Some(times) = open_shard_times { open_shards_expectation.times(times); } - let acquire_shards_expectation = mock_metastore - .expect_acquire_shards() - // .times(acquire_times) - .returning(move |request| { - let acquired_shards = request - .shard_ids - .into_iter() - .map(|shard_id| { - let partition_id: PartitionId = shard_id.to_string().into(); - let (existing_token, position) = inner_state - .lock() - .unwrap() - .get(&partition_id) - .cloned() - .expect("we should never try to acquire a shard that doesn't exist"); - inner_state.lock().unwrap().insert( - partition_id, - (request.publish_token.clone(), position.clone()), - ); - assert_ne!(existing_token, request.publish_token); - Shard { - shard_id: Some(shard_id), - source_id: "dummy".to_string(), - publish_token: Some(request.publish_token.clone()), - index_uid: None, - follower_id: None, - leader_id: "dummy".to_string(), - doc_mapping_uid: None, - publish_position_inclusive: Some(position), - shard_state: ShardState::Open as i32, - update_timestamp: 1724158996, - } - }) - .collect(); - Ok(AcquireShardsResponse { acquired_shards }) - }); + let acquire_shards_expectation = + mock_metastore + .expect_acquire_shards() + .returning(move |request| { + let acquired_shards = request + .shard_ids + .into_iter() + .map(|shard_id| { + let partition_id: PartitionId = shard_id.to_string().into(); + let (existing_token, position, update_timestamp) = inner_state + .lock() + .unwrap() + .get(&partition_id) + .cloned() + .expect( + "we should never try to acquire a shard that doesn't exist", + ); + inner_state.lock().unwrap().insert( + partition_id, + ( + request.publish_token.clone(), + position.clone(), + update_timestamp, + ), + ); + assert_ne!(existing_token, request.publish_token); + Shard { + shard_id: Some(shard_id), + source_id: "dummy".to_string(), + 
publish_token: Some(request.publish_token.clone()), + index_uid: None, + follower_id: None, + leader_id: "dummy".to_string(), + doc_mapping_uid: None, + publish_position_inclusive: Some(position), + shard_state: ShardState::Open as i32, + update_timestamp, + } + }) + .collect(); + Ok(AcquireShardsResponse { acquired_shards }) + }); if let Some(times) = acquire_times { acquire_shards_expectation.times(times); } MetastoreServiceClient::from_mock(mock_metastore) } - pub fn shared_state_for_tests( + pub fn init_state( index_id: &str, - initial_state: &[(PartitionId, (String, Position))], + // Shards (token, position, is_stale) in the initial state + initial_state: &[(PartitionId, (String, Position, bool))], ) -> QueueSharedState { let index_uid = IndexUid::new_with_random_ulid(index_id); - let metastore = mock_metastore(initial_state, None, None); + let metastore_state = initial_state + .iter() + .map(|(pid, (token, pos, is_stale))| { + let update_timestamp = if *is_stale { + OffsetDateTime::now_utc().unix_timestamp() - 100 + } else { + OffsetDateTime::now_utc().unix_timestamp() + }; + (pid.clone(), (token.clone(), pos.clone(), update_timestamp)) + }) + .collect_vec(); + let metastore = mock_metastore(&metastore_state, None, None); QueueSharedState { metastore, index_uid, source_id: "test-queue-src".to_string(), + reacquire_grace_period: Duration::from_secs(10), } } } @@ -292,13 +330,21 @@ mod tests { async fn test_acquire_shards_with_completed() { let index_id = "test-sqs-index"; let index_uid = IndexUid::new_with_random_ulid(index_id); - let init_state = &[("p1".into(), ("token2".to_string(), Position::eof(100usize)))]; + let init_state = &[( + "p1".into(), + ( + "token2".to_string(), + Position::eof(100usize), + OffsetDateTime::now_utc().unix_timestamp(), + ), + )]; let metastore = mock_metastore(init_state, Some(1), Some(0)); let shared_state = QueueSharedState { metastore, index_uid, source_id: "test-sqs-source".to_string(), + reacquire_grace_period: 
Duration::from_secs(10), }; let aquired = shared_state @@ -310,12 +356,45 @@ mod tests { } #[tokio::test] - async fn test_re_acquire_shards() { + async fn test_re_acquire_shards_within_grace_period() { + let index_id = "test-sqs-index"; + let index_uid = IndexUid::new_with_random_ulid(index_id); + let init_state = &[( + "p1".into(), + ( + "token2".to_string(), + Position::offset(100usize), + OffsetDateTime::now_utc().unix_timestamp(), + ), + )]; + let metastore = mock_metastore(init_state, Some(1), Some(0)); + + let shared_state = QueueSharedState { + metastore, + index_uid, + source_id: "test-sqs-source".to_string(), + reacquire_grace_period: Duration::from_secs(10), + }; + + let acquired = shared_state + .acquire_partitions("token1", vec!["p1".into(), "p2".into()]) + .await + .unwrap(); + assert_eq!(acquired.len(), 1); + assert!(acquired.contains(&("p2".into(), Position::Beginning))); + } + + #[tokio::test] + async fn test_re_acquire_shards_after_grace_period() { let index_id = "test-sqs-index"; let index_uid = IndexUid::new_with_random_ulid(index_id); let init_state = &[( "p1".into(), - ("token2".to_string(), Position::offset(100usize)), + ( + "token2".to_string(), + Position::offset(100usize), + OffsetDateTime::now_utc().unix_timestamp() - 100, + ), )]; let metastore = mock_metastore(init_state, Some(1), Some(1)); @@ -323,14 +402,13 @@ mod tests { metastore, index_uid, source_id: "test-sqs-source".to_string(), + reacquire_grace_period: Duration::from_secs(10), }; let aquired = shared_state .acquire_partitions("token1", vec!["p1".into(), "p2".into()]) .await .unwrap(); - // TODO: this test should fail once we implement the grace - // period before a partition can be re-acquired assert!(aquired.contains(&("p1".into(), Position::offset(100usize)))); assert!(aquired.contains(&("p2".into(), Position::Beginning))); } @@ -346,13 +424,18 @@ mod tests { let init_state = &[( completed_partition_id.clone(), - ("token2".to_string(), Position::eof(100usize)), + ( + 
"token2".to_string(), + Position::eof(100usize), + OffsetDateTime::now_utc().unix_timestamp(), + ), )]; let metastore = mock_metastore(init_state, Some(1), Some(0)); let shared_state = QueueSharedState { metastore, index_uid, source_id: "test-sqs-source".to_string(), + reacquire_grace_period: Duration::from_secs(10), }; let checkpointed_msg = checkpoint_messages(&shared_state, "token1", source_messages) diff --git a/quickwit/quickwit-indexing/src/source/queue_sources/visibility.rs b/quickwit/quickwit-indexing/src/source/queue_sources/visibility.rs index 340a6c05b95..7230186137f 100644 --- a/quickwit/quickwit-indexing/src/source/queue_sources/visibility.rs +++ b/quickwit/quickwit-indexing/src/source/queue_sources/visibility.rs @@ -40,7 +40,7 @@ pub(super) struct VisibilitySettings { pub deadline_for_default_extension: Duration, /// The timeout for the visibility extension request pub request_timeout: Duration, - /// an extra margin that is substracted from the expected deadline when + /// an extra margin that is subtracted from the expected deadline when /// asserting whether we are still in time to extend the visibility pub request_margin: Duration, }