Reacquire SQS shards only if they are stale #5338

Merged · 1 commit · Sep 2, 2024
117 changes: 88 additions & 29 deletions quickwit/quickwit-indexing/src/source/queue_sources/coordinator.rs
@@ -78,7 +78,7 @@ pub struct QueueCoordinator {
shared_state: QueueSharedState,
local_state: QueueLocalState,
publish_token: String,
visible_settings: VisibilitySettings,
visibility_settings: VisibilitySettings,
}

impl fmt::Debug for QueueCoordinator {
@@ -102,6 +102,9 @@ impl QueueCoordinator {
metastore: source_runtime.metastore,
source_id: source_runtime.pipeline_id.source_id.clone(),
index_uid: source_runtime.pipeline_id.index_uid.clone(),
reacquire_grace_period: Duration::from_secs(
2 * source_runtime.indexing_setting.commit_timeout_secs as u64,
),
},
local_state: QueueLocalState::default(),
pipeline_id: source_runtime.pipeline_id,
@@ -113,7 +116,7 @@ impl QueueCoordinator {
message_type,
publish_lock: PublishLock::default(),
publish_token: Ulid::new().to_string(),
visible_settings: VisibilitySettings::from_commit_timeout(
visibility_settings: VisibilitySettings::from_commit_timeout(
source_runtime.indexing_setting.commit_timeout_secs,
),
}
@@ -157,7 +160,7 @@ impl QueueCoordinator {
async fn poll_messages(&mut self, ctx: &SourceContext) -> Result<(), ActorExitStatus> {
let raw_messages = self
.queue_receiver
.receive(1, self.visible_settings.deadline_for_receive)
.receive(1, self.visibility_settings.deadline_for_receive)
.await?;

let mut format_errors = Vec::new();
@@ -215,7 +218,7 @@ impl QueueCoordinator {
self.queue.clone(),
message.metadata.ack_id.clone(),
message.metadata.initial_deadline,
self.visible_settings.clone(),
self.visibility_settings.clone(),
),
content: message,
position,
@@ -254,7 +257,7 @@ impl QueueCoordinator {
.await?;
if in_progress_ref.batch_reader.is_eof() {
self.local_state
.drop_currently_read(self.visible_settings.deadline_for_last_extension)
.drop_currently_read(self.visibility_settings.deadline_for_last_extension)
.await?;
self.observable_state.num_messages_processed += 1;
}
@@ -319,7 +322,7 @@ mod tests {
use crate::source::doc_file_reader::file_test_helpers::{generate_dummy_doc_file, DUMMY_DOC};
use crate::source::queue_sources::memory_queue::MemoryQueueForTests;
use crate::source::queue_sources::message::PreProcessedPayload;
use crate::source::queue_sources::shared_state::shared_state_for_tests::shared_state_for_tests;
use crate::source::queue_sources::shared_state::shared_state_for_tests::init_state;
use crate::source::{SourceActor, BATCH_NUM_BYTES_LIMIT};

fn setup_coordinator(
@@ -347,7 +350,7 @@ mod tests {
source_type: SourceType::Unspecified,
storage_resolver: StorageResolver::for_test(),
publish_token: Ulid::new().to_string(),
visible_settings: VisibilitySettings::from_commit_timeout(5),
visibility_settings: VisibilitySettings::from_commit_timeout(5),
}
}

@@ -401,7 +404,7 @@ mod tests {
#[tokio::test]
async fn test_process_empty_queue() {
let queue = Arc::new(MemoryQueueForTests::new());
let shared_state = shared_state_for_tests("test-index", Default::default());
let shared_state = init_state("test-index", Default::default());
let mut coordinator = setup_coordinator(queue.clone(), shared_state);
let batches = process_messages(&mut coordinator, queue, &[]).await;
assert_eq!(batches.len(), 0);
@@ -410,7 +413,7 @@ mod tests {
#[tokio::test]
async fn test_process_one_small_message() {
let queue = Arc::new(MemoryQueueForTests::new());
let shared_state = shared_state_for_tests("test-index", Default::default());
let shared_state = init_state("test-index", Default::default());
let mut coordinator = setup_coordinator(queue.clone(), shared_state.clone());
let (dummy_doc_file, _) = generate_dummy_doc_file(false, 10).await;
let test_uri = Uri::from_str(dummy_doc_file.path().to_str().unwrap()).unwrap();
@@ -424,7 +427,7 @@ mod tests {
#[tokio::test]
async fn test_process_one_big_message() {
let queue = Arc::new(MemoryQueueForTests::new());
let shared_state = shared_state_for_tests("test-index", Default::default());
let shared_state = init_state("test-index", Default::default());
let mut coordinator = setup_coordinator(queue.clone(), shared_state);
let lines = BATCH_NUM_BYTES_LIMIT as usize / DUMMY_DOC.len() + 1;
let (dummy_doc_file, _) = generate_dummy_doc_file(true, lines).await;
@@ -437,7 +440,7 @@ mod tests {
#[tokio::test]
async fn test_process_two_messages_different_compression() {
let queue = Arc::new(MemoryQueueForTests::new());
let shared_state = shared_state_for_tests("test-index", Default::default());
let shared_state = init_state("test-index", Default::default());
let mut coordinator = setup_coordinator(queue.clone(), shared_state);
let (dummy_doc_file_1, _) = generate_dummy_doc_file(false, 10).await;
let test_uri_1 = Uri::from_str(dummy_doc_file_1.path().to_str().unwrap()).unwrap();
@@ -456,7 +459,7 @@ mod tests {
#[tokio::test]
async fn test_process_local_duplicate_message() {
let queue = Arc::new(MemoryQueueForTests::new());
let shared_state = shared_state_for_tests("test-index", Default::default());
let shared_state = init_state("test-index", Default::default());
let mut coordinator = setup_coordinator(queue.clone(), shared_state);
let (dummy_doc_file, _) = generate_dummy_doc_file(false, 10).await;
let test_uri = Uri::from_str(dummy_doc_file.path().to_str().unwrap()).unwrap();
@@ -477,11 +480,15 @@ mod tests {
let partition_id = PreProcessedPayload::ObjectUri(test_uri.clone()).partition_id();

let queue = Arc::new(MemoryQueueForTests::new());
let shared_state = shared_state_for_tests(
let shared_state = init_state(
"test-index",
&[(
partition_id.clone(),
("existing_token".to_string(), Position::eof(file_size)),
(
"existing_token".to_string(),
Position::eof(file_size),
false,
),
)],
);
let mut coordinator = setup_coordinator(queue.clone(), shared_state.clone());
@@ -492,30 +499,82 @@ mod tests {
assert!(coordinator.local_state.is_completed(&partition_id));
}

#[tokio::test]
async fn test_process_existing_messages() {
let (dummy_doc_file_1, _) = generate_dummy_doc_file(false, 10).await;
let test_uri_1 = Uri::from_str(dummy_doc_file_1.path().to_str().unwrap()).unwrap();
let partition_id_1 = PreProcessedPayload::ObjectUri(test_uri_1.clone()).partition_id();

let (dummy_doc_file_2, _) = generate_dummy_doc_file(false, 10).await;
let test_uri_2 = Uri::from_str(dummy_doc_file_2.path().to_str().unwrap()).unwrap();
let partition_id_2 = PreProcessedPayload::ObjectUri(test_uri_2.clone()).partition_id();

let (dummy_doc_file_3, _) = generate_dummy_doc_file(false, 10).await;
let test_uri_3 = Uri::from_str(dummy_doc_file_3.path().to_str().unwrap()).unwrap();
let partition_id_3 = PreProcessedPayload::ObjectUri(test_uri_3.clone()).partition_id();

let queue = Arc::new(MemoryQueueForTests::new());
let shared_state = init_state(
"test-index",
&[
(
partition_id_1.clone(),
("existing_token_1".to_string(), Position::Beginning, true),
),
(
partition_id_2.clone(),
(
"existing_token_2".to_string(),
Position::offset((DUMMY_DOC.len() + 1) * 2),
true,
),
),
(
partition_id_3.clone(),
(
"existing_token_3".to_string(),
Position::offset((DUMMY_DOC.len() + 1) * 6),
false, // should not be processed because not stale yet
),
),
],
);
let mut coordinator = setup_coordinator(queue.clone(), shared_state.clone());
let batches = process_messages(
&mut coordinator,
queue,
&[
(&test_uri_1, "ack-id-1"),
(&test_uri_2, "ack-id-2"),
(&test_uri_3, "ack-id-3"),
],
)
.await;
assert_eq!(batches.len(), 2);
assert_eq!(batches.iter().map(|b| b.docs.len()).sum::<usize>(), 18);
assert!(coordinator.local_state.is_awaiting_commit(&partition_id_1));
assert!(coordinator.local_state.is_awaiting_commit(&partition_id_2));
}

#[tokio::test]
async fn test_process_multiple_coordinator() {
let queue = Arc::new(MemoryQueueForTests::new());
let shared_state = shared_state_for_tests("test-index", Default::default());
let mut proc_1 = setup_coordinator(queue.clone(), shared_state.clone());
let mut proc_2 = setup_coordinator(queue.clone(), shared_state.clone());
let shared_state = init_state("test-index", Default::default());
let mut coord_1 = setup_coordinator(queue.clone(), shared_state.clone());
let mut coord_2 = setup_coordinator(queue.clone(), shared_state.clone());
let (dummy_doc_file, _) = generate_dummy_doc_file(false, 10).await;
let test_uri = Uri::from_str(dummy_doc_file.path().to_str().unwrap()).unwrap();
let partition_id = PreProcessedPayload::ObjectUri(test_uri.clone()).partition_id();

let batches_1 = process_messages(&mut proc_1, queue.clone(), &[(&test_uri, "ack1")]).await;
let batches_2 = process_messages(&mut proc_2, queue, &[(&test_uri, "ack2")]).await;
let batches_1 = process_messages(&mut coord_1, queue.clone(), &[(&test_uri, "ack1")]).await;
let batches_2 = process_messages(&mut coord_2, queue, &[(&test_uri, "ack2")]).await;

assert_eq!(batches_1.len(), 1);
assert_eq!(batches_1[0].docs.len(), 10);
assert!(proc_1.local_state.is_awaiting_commit(&partition_id));
// proc_2 doesn't know for sure what is happening with the message
// (proc_1 might have crashed), so it just acquires it and takes over
// processing
//
// TODO: this test should fail once we implement the grace
// period before a partition can be re-acquired
assert_eq!(batches_2.len(), 1);
assert_eq!(batches_2[0].docs.len(), 10);
assert!(proc_2.local_state.is_awaiting_commit(&partition_id));
assert!(coord_1.local_state.is_awaiting_commit(&partition_id));
        // coord_2 learns from the shared state that the message is likely still
        // being processed and skips it
assert_eq!(batches_2.len(), 0);
assert!(!coord_2.local_state.is_tracked(&partition_id));
}
}
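
The diff above carries little prose, so here is a minimal sketch of the staleness rule it implies, using hypothetical names (AcquiredShard, last_acquired_at, is_stale) that are not part of the Quickwit codebase: a shard already acquired by another pipeline is only re-acquired once its last acquisition is older than the grace period, which this change sets to twice the commit timeout.

use std::time::{Duration, Instant};

// Hypothetical record of a shard acquisition kept in the shared state.
struct AcquiredShard {
    publish_token: String,
    last_acquired_at: Instant,
}

// A shard held by another pipeline may only be taken over once it is stale,
// i.e. its last acquisition is older than the reacquire grace period.
fn is_stale(shard: &AcquiredShard, reacquire_grace_period: Duration, now: Instant) -> bool {
    now.duration_since(shard.last_acquired_at) >= reacquire_grace_period
}

fn main() {
    // Mirrors the change above: grace period = 2 * commit_timeout_secs (here 30s).
    let grace = Duration::from_secs(2 * 30);
    let shard = AcquiredShard {
        publish_token: "existing_token".to_string(),
        last_acquired_at: Instant::now() - Duration::from_secs(10),
    };
    // Acquired 10s ago with a 60s grace period: not stale yet, so another
    // coordinator skips it instead of taking it over (the behavior asserted in
    // the updated test_process_multiple_coordinator test).
    assert!(!is_stale(&shard, grace, Instant::now()));
    println!(
        "token {} stale: {}",
        shard.publish_token,
        is_stale(&shard, grace, Instant::now())
    );
}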