Garbage collect shards in SQS Filesource (#5339)
* Garbage collect shards in SQS Filesource

* Run shard pruning in a background task

* Expose deduplication window to users

* Add integration test

* Rename cleanup interval config

* Address smaller review comments

* Change strong_count to Weak

* High level design

* Remove impure iterators

* Rewrite time operation to rule out underflow

* Remove inappropriate unwrap and fix typo

* Refactor unnecessary deadline_for_last_extension parameter

* Add more details to design document

* Clarify what checkpoint_messages does
rdettai authored Sep 19, 2024
1 parent c9dfb6d commit 565becd
Showing 18 changed files with 515 additions and 199 deletions.
4 changes: 4 additions & 0 deletions docs/configuration/source-config.md
@@ -56,6 +56,9 @@ Required fields for the SQS `notifications` parameter items:
- `message_type`: format of the message payload, either
- `s3_notification`: an [S3 event notification](https://docs.aws.amazon.com/AmazonS3/latest/userguide/EventNotifications.html)
- `raw_uri`: a message containing just the file object URI (e.g. `s3://mybucket/mykey`)
- `deduplication_window_duration_secs`: maximum duration for which ingested file checkpoints are kept (default: 3600)
- `deduplication_window_max_messages`: maximum number of ingested file checkpoints kept (default: 100,000)
- `deduplication_cleanup_interval_secs`: frequency at which outdated file checkpoints are cleaned up (default: 60), as illustrated in the example below
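
For illustration, the three `deduplication_*` parameters could be set in a source config along these lines. This is a sketch using the default values; the queue URL is a placeholder and the surrounding fields follow the usual file source config shape:

```yaml
# Illustrative sketch: the deduplication values shown are the defaults.
version: 0.8
source_id: my-sqs-file-source
source_type: file
params:
  notifications:
    - type: sqs
      queue_url: https://sqs.us-east-1.amazonaws.com/123456789012/queue-name
      message_type: s3_notification
      deduplication_window_duration_secs: 3600
      deduplication_window_max_messages: 100000
      deduplication_cleanup_interval_secs: 60
```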

*Adding a file source with SQS notifications to an index with the [CLI](../reference/cli.md#source)*

@@ -82,6 +85,7 @@ EOF
- the notification message could not be parsed (e.g. it is not a valid S3 notification)
- the file was not found
- the file is corrupted (e.g. unexpected compression)
- AWS S3 notifications and AWS SQS provide "at least once" delivery guarantees. To avoid duplicates, the file source includes a mechanism that prevents the same file from being ingested twice. It works by storing checkpoints in the metastore that track the indexing progress of each file. You can decrease the `deduplication_window_*` parameters or increase `deduplication_cleanup_interval_secs` to reduce the load on the metastore.

:::

1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default.

32 changes: 31 additions & 1 deletion quickwit/quickwit-config/src/source_config/mod.rs
@@ -264,6 +264,24 @@ pub enum FileSourceMessageType {
pub struct FileSourceSqs {
pub queue_url: String,
pub message_type: FileSourceMessageType,
#[serde(default = "default_deduplication_window_duration_secs")]
pub deduplication_window_duration_secs: u32,
#[serde(default = "default_deduplication_window_max_messages")]
pub deduplication_window_max_messages: u32,
#[serde(default = "default_deduplication_cleanup_interval_secs")]
pub deduplication_cleanup_interval_secs: u32,
}

fn default_deduplication_window_duration_secs() -> u32 {
3600
}

fn default_deduplication_window_max_messages() -> u32 {
100_000
}

fn default_deduplication_cleanup_interval_secs() -> u32 {
60
}

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, utoipa::ToSchema)]
@@ -891,12 +909,24 @@ mod tests {
queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/queue-name"
.to_string(),
message_type: FileSourceMessageType::S3Notification,
deduplication_window_duration_secs: default_deduplication_window_duration_secs(
),
deduplication_window_max_messages: default_deduplication_window_max_messages(),
deduplication_cleanup_interval_secs:
default_deduplication_cleanup_interval_secs()
})),
);
let file_params_reserialized = serde_json::to_value(&file_params_deserialized).unwrap();
assert_eq!(
file_params_reserialized,
json!({"notifications": [{"type": "sqs", "queue_url": "https://sqs.us-east-1.amazonaws.com/123456789012/queue-name", "message_type": "s3_notification"}]})
json!({"notifications": [{
"type": "sqs",
"queue_url": "https://sqs.us-east-1.amazonaws.com/123456789012/queue-name",
"message_type": "s3_notification",
"deduplication_window_duration_secs": default_deduplication_window_duration_secs(),
"deduplication_window_max_messages": default_deduplication_window_max_messages(),
"deduplication_cleanup_interval_secs": default_deduplication_cleanup_interval_secs(),
}]})
);
}
{
3 changes: 3 additions & 0 deletions quickwit/quickwit-indexing/src/source/file_source.rs
@@ -435,6 +435,9 @@ mod localstack_tests {
FileSourceParams::Notifications(FileSourceNotification::Sqs(FileSourceSqs {
queue_url,
message_type: FileSourceMessageType::RawUri,
deduplication_window_duration_secs: 100,
deduplication_window_max_messages: 100,
deduplication_cleanup_interval_secs: 60,
}));
let source_config = SourceConfig::for_test(
"test-file-source-sqs-notifications",
43 changes: 28 additions & 15 deletions quickwit/quickwit-indexing/src/source/queue_sources/coordinator.rs
@@ -28,6 +28,7 @@ use quickwit_config::{FileSourceMessageType, FileSourceSqs};
use quickwit_metastore::checkpoint::SourceCheckpoint;
use quickwit_proto::indexing::IndexingPipelineId;
use quickwit_proto::metastore::SourceType;
use quickwit_proto::types::SourceUid;
use quickwit_storage::StorageResolver;
use serde::Serialize;
use ulid::Ulid;
@@ -96,16 +97,22 @@ impl QueueCoordinator {
source_runtime: SourceRuntime,
queue: Arc<dyn Queue>,
message_type: MessageType,
shard_max_age: Option<Duration>,
shard_max_count: Option<u32>,
shard_pruning_interval: Duration,
) -> Self {
Self {
shared_state: QueueSharedState {
metastore: source_runtime.metastore,
source_id: source_runtime.pipeline_id.source_id.clone(),
index_uid: source_runtime.pipeline_id.index_uid.clone(),
reacquire_grace_period: Duration::from_secs(
2 * source_runtime.indexing_setting.commit_timeout_secs as u64,
),
},
shared_state: QueueSharedState::new(
source_runtime.metastore,
SourceUid {
index_uid: source_runtime.pipeline_id.index_uid.clone(),
source_id: source_runtime.pipeline_id.source_id.clone(),
},
Duration::from_secs(2 * source_runtime.indexing_setting.commit_timeout_secs as u64),
shard_max_age,
shard_max_count,
shard_pruning_interval,
),
local_state: QueueLocalState::default(),
pipeline_id: source_runtime.pipeline_id,
source_type: source_runtime.source_config.source_type(),
@@ -133,10 +140,14 @@ impl QueueCoordinator {
FileSourceMessageType::S3Notification => MessageType::S3Notification,
FileSourceMessageType::RawUri => MessageType::RawUri,
};
let shard_max_age = Duration::from_secs(config.deduplication_window_duration_secs as u64);
Ok(QueueCoordinator::new(
source_runtime,
Arc::new(queue),
message_type,
Some(shard_max_age),
Some(config.deduplication_window_max_messages),
Duration::from_secs(config.deduplication_cleanup_interval_secs as u64),
))
}

@@ -203,8 +214,12 @@
}
}

let checkpointed_messages =
checkpoint_messages(&self.shared_state, &self.publish_token, untracked_locally).await?;
let checkpointed_messages = checkpoint_messages(
&mut self.shared_state,
&self.publish_token,
untracked_locally,
)
.await?;

let mut ready_messages = Vec::new();
for (message, position) in checkpointed_messages {
@@ -256,9 +271,7 @@
.send_message(batch_builder.build())
.await?;
if in_progress_ref.batch_reader.is_eof() {
self.local_state
.drop_currently_read(self.visibility_settings.deadline_for_last_extension)
.await?;
self.local_state.drop_currently_read().await?;
self.observable_state.num_messages_processed += 1;
}
} else if let Some(ready_message) = self.local_state.get_ready_for_read() {
@@ -331,8 +344,8 @@ mod tests {
) -> QueueCoordinator {
let pipeline_id = IndexingPipelineId {
node_id: NodeId::from_str("test-node").unwrap(),
index_uid: shared_state.index_uid.clone(),
source_id: shared_state.source_id.clone(),
index_uid: shared_state.source_uid.index_uid.clone(),
source_id: shared_state.source_uid.source_id.clone(),
pipeline_uid: PipelineUid::random(),
};

59 changes: 59 additions & 0 deletions quickwit/quickwit-indexing/src/source/queue_sources/design.md
@@ -0,0 +1,59 @@
# Queue source design

## Exactly once

Besides the usual failures that can happen during indexing, most queues are also subject to duplicates. To ensure that all object files are indexed exactly once, we track the progress of their indexing using the shard table:
- each file object is tracked as a shard, with the file URI as the shard ID
- progress made on the indexing of a given shard is committed in the shard table in the same transaction as the split publishing
- after some time (called the deduplication window), shards are garbage collected to keep the size of the shard table small

## Visibility extension task

To keep messages invisible to other pipelines while they are being processed, each received message spawns a visibility extension task. This task is responsible for extending the visibility timeout each time the visibility deadline approaches. When the last batch is read for the message and sent to the indexing pipeline (a sketch of the extension loop follows this list):
- a last visibility extension is requested to give the indexing time to complete (typically twice the commit timeout)
- the visibility extension task is stopped
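
A minimal sketch of such an extension loop, assuming a generic `extend_visibility` callback rather than the actual Quickwit task structure:

```rust
use std::future::Future;
use std::time::Duration;

/// Minimal sketch of a visibility extension loop. `extend_visibility`
/// stands for the queue call that pushes the message deadline back;
/// the real task also handles shutdown and the final extension.
async fn keep_invisible<F, Fut>(visibility_timeout: Duration, extend_visibility: F)
where
    F: Fn(Duration) -> Fut,
    Fut: Future<Output = anyhow::Result<()>>,
{
    loop {
        // Wake up well before the current deadline expires...
        tokio::time::sleep(visibility_timeout / 2).await;
        // ...and push the deadline back so other consumers keep
        // ignoring the message while it is being processed.
        if extend_visibility(visibility_timeout).await.is_err() {
            // On failure, stop extending: the message becomes visible
            // again and will be redelivered to another pipeline.
            break;
        }
    }
}
```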

## Cleanup of old shards

Garbage collection is owned by the queue-based sources. Each pipeline with a queue source spawns a garbage collection task. To avoid increasing the load on the metastore as the number of pipelines scales, garbage collection calls are debounced by the control plane.

## Onboarding new queues

This module is meant to be generic enough to:
- use other queue implementations, such as GCP Pub/Sub
- source the data from other places than object storage, e.g. directly from the message

Note that because every single message is tracked by the metastore, this design will not behave well with high message rates. For instance, it is not meant to be efficient with a data stream where every message contains a single event. As a rule of thumb, to protect the metastore, it is discouraged to process more than 50 messages per second with this design. High throughput can therefore only be achieved with larger contents for each message (e.g. larger files when using the file source with queue notifications): at 50 messages per second, for example, sustaining 500MB/s of ingest requires each message to reference at least 10MB of data.

## Implementation

The `QueueCoordinator` is a concrete implementation of the machinery necessary to consume data from a queue, from message reception to acknowledgment after indexing. The `QueueCoordinator` interacts with 3 main components.

### The `Queue`

The `Queue` is an abstract interface that can represent any queue implementation (AWS SQS, Google Pub/Sub...). It is sufficient that the queue guarantees at-least-once delivery of its messages. The abstraction reduces the actual queue's API surface to 3 main functions, sketched after this list:
- receive messages that are ready to be processed
- extend their visibility timeout, i.e. delay the time at which a message becomes visible again to other consumers
- acknowledge messages, i.e. delete them definitively from the queue after successful indexing
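
As an illustration, this surface could be captured by a trait along these lines; the method names and signatures are assumptions, not the actual Quickwit definitions:

```rust
use std::time::Duration;

/// Illustrative message type: an opaque payload plus the handle
/// needed to extend or acknowledge it.
struct QueueMessage {
    ack_id: String,
    payload: Vec<u8>,
}

/// Sketch of the queue abstraction described above.
#[async_trait::async_trait]
trait Queue: Send + Sync {
    /// Receive messages that are ready to be processed.
    async fn receive(&self, max_messages: usize) -> anyhow::Result<Vec<QueueMessage>>;
    /// Delay the time at which a message becomes visible to other consumers.
    async fn extend_visibility(&self, ack_id: &str, extension: Duration) -> anyhow::Result<()>;
    /// Delete messages definitively from the queue after successful indexing.
    async fn acknowledge(&self, ack_ids: &[String]) -> anyhow::Result<()>;
}
```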

### The `QueueLocalState`

The local state is an in-memory data structure that keeps track of the knowledge that the current source has of recently received messages. It manages the transitions of messages between 4 states (see the sketch after this list):
- ready for read
- read in progress
- awaiting commit
- completed
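
A schematic view of this lifecycle; the enum is illustrative, as the actual implementation keeps separate collections per state rather than a per-message enum:

```rust
/// Illustrative lifecycle of a message inside the local state.
/// Transitions only move forward: ReadyForRead -> ReadInProgress
/// -> AwaitingCommit -> Completed.
enum MessageState {
    /// Checkpointed and owned by this pipeline, waiting to be read.
    ReadyForRead,
    /// Batches are currently being sent to the indexing pipeline.
    ReadInProgress,
    /// Fully read (EOF); waiting for the split to be published.
    AwaitingCommit,
    /// Published; the message can be acknowledged and forgotten.
    Completed,
}
```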


### The `QueueSharedState`

The shared state is a client of the Shard API, a metastore API that was mainly designed to serve ingest V2. The Shard API improves on the previous checkpoint API, which was stored as a blob in one of the fields of the index model. The flow is the following:

The queue source opens a shard, using an ID that uniquely identifies the content of the message as the shard ID. For the file source, the shard ID is the file URI. Each source has a unique publish token that is provided in the `OpenShards` metastore request. The `OpenShards` response returns the token of the caller that called the API first (the decision logic is sketched after this list). Either:
- The returned token matches the current pipeline's token. This means that we have ownership of this message content and can proceed with its indexing.
- The returned token does not match the current pipeline's token. This means that another pipeline has ownership. In that case, we look at the content of the shard:
  - if it is already completely processed (EOF), we acknowledge the message and drop it
  - if its last update timestamp is old (e.g. twice the commit timeout), we assume the processing of the content to be stale (e.g. the owning pipeline failed). We perform an `AcquireShards` call to update the shard's token in the metastore with the local one. This indicates to subsequent attempts to process the shard that this pipeline now has ownership. Note though that this is subject to a race condition: 2 pipelines might acquire the shard concurrently. In that case, both pipelines will assume ownership of the shard, and one of them will fail at commit time.
  - if its last update timestamp is recent, we assume that the processing of the content is still in progress in another pipeline. We just drop the message (without any acknowledgment) and let it be re-processed once its visibility timeout expires.
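
The decision logic can be summarized by the following sketch; the types and names are illustrative, not the actual Quickwit code:

```rust
use std::time::{Duration, SystemTime};

/// Illustrative outcome of the `OpenShards` token comparison.
enum Ownership {
    /// We own the shard: proceed with indexing.
    Owned,
    /// Shard already fully indexed (EOF): acknowledge the message and drop it.
    AlreadyCompleted,
    /// Another pipeline is still working: drop without acknowledging.
    InProgressElsewhere,
}

fn resolve_ownership(
    returned_token: &str,
    local_token: &str,
    shard_is_eof: bool,
    last_update: SystemTime,
    grace_period: Duration, // e.g. twice the commit timeout
) -> Ownership {
    if returned_token == local_token {
        return Ownership::Owned;
    }
    if shard_is_eof {
        return Ownership::AlreadyCompleted;
    }
    let is_stale = SystemTime::now()
        .duration_since(last_update)
        .map(|age| age > grace_period)
        .unwrap_or(false);
    if is_stale {
        // An `AcquireShards` call would transfer the shard's token to this
        // pipeline here (racy: a concurrent acquirer fails at commit time).
        Ownership::Owned
    } else {
        Ownership::InProgressElsewhere
    }
}
```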

The `QueueSharedState` also owns the background task that periodically initiates a call to `PruneShards` to garbage collect old shards.
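
A minimal sketch of that background task, assuming a generic `prune_shards` callback in place of the actual metastore client call; per the commit history, the real task holds only a `Weak` reference to the shared state so it stops when the source is dropped:

```rust
use std::future::Future;
use std::time::Duration;

/// Illustrative periodic garbage collection task. `prune_shards` stands
/// for the metastore `PruneShards` call, parameterized by the
/// deduplication window settings (max age, max count).
fn spawn_prune_task<F, Fut>(
    pruning_interval: Duration,
    prune_shards: F,
) -> tokio::task::JoinHandle<()>
where
    F: Fn() -> Fut + Send + 'static,
    Fut: Future<Output = anyhow::Result<()>> + Send,
{
    tokio::spawn(async move {
        let mut ticker = tokio::time::interval(pruning_interval);
        loop {
            ticker.tick().await;
            if let Err(error) = prune_shards().await {
                // GC failures are not fatal: outdated shards simply
                // linger until the next successful run.
                tracing::warn!(%error, "shard pruning failed");
            }
        }
    })
}
```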
8 changes: 2 additions & 6 deletions quickwit/quickwit-indexing/src/source/queue_sources/local_state.rs
@@ -18,7 +18,6 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::time::Duration;

use anyhow::bail;
use quickwit_metastore::checkpoint::PartitionId;
@@ -95,18 +94,15 @@ impl QueueLocalState {
self.read_in_progress.as_mut()
}

pub async fn drop_currently_read(
&mut self,
deadline_for_last_extension: Duration,
) -> anyhow::Result<()> {
pub async fn drop_currently_read(&mut self) -> anyhow::Result<()> {
if let Some(in_progress) = self.read_in_progress.take() {
self.awaiting_commit.insert(
in_progress.partition_id.clone(),
in_progress.visibility_handle.ack_id().to_string(),
);
in_progress
.visibility_handle
.request_last_extension(deadline_for_last_extension)
.request_last_extension()
.await?;
}
Ok(())