Add File source with SQS notifications (#5148)
* Refactor file source

* More explicit response for read_batch

* Fix clippy

* Remove batch logic from DocFileReader

* Address styling comments

* Replace FileSourceParams path with URI

* Additional URI related cleanup

* Fix unit tests to now use the URI

* Add queue source with SQS implementation

* Fix the publish_token strategy

* Fix never-used warnings

* Fix unit tests

* Abort visibility task after acknowledging

* Address smaller review comments

* Shorten visibility extension task

* Fix pulsar tests

* Adjustments after rebase

* Move object backed source to file source

* Simpler flow for adhoc file processing

* Fix tests and refactor batch creation to BatchReader

* Add max_messages param to Queue.receive

* Move use_shard_api to the metastore crate

* Dedup within batches

* Improve visibility task

* Re-acquire partitions aggressively

* Address simpler review comments

* Add test for visibility actor (failing)

* Fix visibility actor drop

* Fix reader edge case

* Add end to end tests

* Improve integration test scenario

* Chunk receive future to avoid hanging actor

* Improve error handling

* Fix flaky test

* New SourceConfig format with notifications field

* Improvements to error handling

* Clarification of Queue contract

* Address new round of review comments

* Remove SqsSource for now

* Fix panic

* Revert to forbidding any adhoc file source through the API

* Add docs

* Fix panic on empty file

* Documentation improvements

* Improve documentation

* Improve error handling code and associated docs

* Nitpick and TODO cleanup

* Add tip about ingesting directly from object stores

* Ack notifications of undesired type

* Add docs about situations where messages require a DLQ

* Fix integ test after rebase
rdettai authored Jul 31, 2024
1 parent 0caaf07 commit 6fa3c29
Showing 62 changed files with 4,968 additions and 773 deletions.
4 changes: 2 additions & 2 deletions docker-compose.yml
@@ -27,7 +27,7 @@ networks:

services:
localstack:
-image: localstack/localstack:${LOCALSTACK_VERSION:-2.3.2}
+image: localstack/localstack:${LOCALSTACK_VERSION:-3.5.0}
container_name: localstack
ports:
- "${MAP_HOST_LOCALSTACK:-127.0.0.1}:4566:4566"
@@ -37,7 +37,7 @@ services:
- all
- localstack
environment:
-SERVICES: kinesis,s3
+SERVICES: kinesis,s3,sqs
PERSISTENCE: 1
volumes:
- .localstack:/etc/localstack/init/ready.d
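
With `sqs` added to the emulated services, local tests can point SQS API calls at Localstack. For instance, a queue can be created against the local endpoint (a sketch, assuming the AWS CLI is installed and the port mapping above is used; the queue name is illustrative):

```bash
# Create a test queue on the Localstack SQS emulator
aws --endpoint-url http://localhost:4566 sqs create-queue --queue-name qw-test-queue
```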
134 changes: 134 additions & 0 deletions docs/assets/sqs-file-source.tf
@@ -0,0 +1,134 @@
terraform {
required_version = "1.7.5"
required_providers {
aws = {
source = "hashicorp/aws"
version = "~> 5.39.1"
}
}
}

provider "aws" {
region = "us-east-1"
default_tags {
tags = {
provisioner = "terraform"
author = "Quickwit"
}
}
}

locals {
sqs_notification_queue_name = "qw-tuto-s3-event-notifications"
source_bucket_name = "qw-tuto-source-bucket"
}

resource "aws_s3_bucket" "file_source" {
bucket_prefix = local.source_bucket_name
force_destroy = true
}

data "aws_iam_policy_document" "sqs_notification" {
statement {
effect = "Allow"

principals {
type = "*"
identifiers = ["*"]
}

actions = ["sqs:SendMessage"]
resources = ["arn:aws:sqs:*:*:${local.sqs_notification_queue_name}"]

condition {
test = "ArnEquals"
variable = "aws:SourceArn"
values = [aws_s3_bucket.file_source.arn]
}
}
}


resource "aws_sqs_queue" "s3_events" {
name = local.sqs_notification_queue_name
policy = data.aws_iam_policy_document.sqs_notification.json

redrive_policy = jsonencode({
deadLetterTargetArn = aws_sqs_queue.s3_events_deadletter.arn
maxReceiveCount = 5
})
}

resource "aws_sqs_queue" "s3_events_deadletter" {
name = "${local.sqs_notification_queue_name}-deadletter"
}

resource "aws_sqs_queue_redrive_allow_policy" "s3_events_deadletter" {
queue_url = aws_sqs_queue.s3_events_deadletter.id

redrive_allow_policy = jsonencode({
redrivePermission = "byQueue",
sourceQueueArns = [aws_sqs_queue.s3_events.arn]
})
}

resource "aws_s3_bucket_notification" "bucket_notification" {
bucket = aws_s3_bucket.file_source.id

queue {
queue_arn = aws_sqs_queue.s3_events.arn
events = ["s3:ObjectCreated:*"]
}
}

data "aws_iam_policy_document" "quickwit_node" {
statement {
effect = "Allow"
actions = [
"sqs:ReceiveMessage",
"sqs:DeleteMessage",
"sqs:ChangeMessageVisibility",
"sqs:GetQueueAttributes",
]
resources = [aws_sqs_queue.s3_events.arn]
}
statement {
effect = "Allow"
actions = ["s3:GetObject"]
resources = ["${aws_s3_bucket.file_source.arn}/*"]
}
}

resource "aws_iam_user" "quickwit_node" {
name = "quickwit-filesource-tutorial"
path = "/system/"
}

resource "aws_iam_user_policy" "quickwit_node" {
name = "quickwit-filesource-tutorial"
user = aws_iam_user.quickwit_node.name
policy = data.aws_iam_policy_document.quickwit_node.json
}

resource "aws_iam_access_key" "quickwit_node" {
user = aws_iam_user.quickwit_node.name
}

output "source_bucket_name" {
value = aws_s3_bucket.file_source.bucket

}

output "notification_queue_url" {
value = aws_sqs_queue.s3_events.id
}

output "quickwit_node_access_key_id" {
value = aws_iam_access_key.quickwit_node.id
sensitive = true
}

output "quickwit_node_secret_access_key" {
value = aws_iam_access_key.quickwit_node.secret
sensitive = true
}
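
A sketch of how this stack might be applied and its outputs retrieved, assuming Terraform is installed and AWS credentials are available in the environment (the sensitive outputs must be requested explicitly with `-raw`):

```bash
terraform init
terraform apply

# Values needed to configure the Quickwit node
terraform output source_bucket_name
terraform output notification_queue_url
terraform output -raw quickwit_node_access_key_id
terraform output -raw quickwit_node_secret_access_key
```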
55 changes: 51 additions & 4 deletions docs/configuration/source-config.md
@@ -29,15 +29,62 @@ The source type designates the kind of source being configured. As of version 0.

The source parameters indicate how to connect to a data store and are specific to the source type.

-### File source (CLI only)
+### File source

-A file source reads data from a local file. The file must consist of JSON objects separated by a newline (NDJSON).
-As of version 0.5, a file source can only be ingested with the [CLI command](/docs/reference/cli.md#tool-local-ingest). Compressed files (bz2, gzip, ...) and remote files (Amazon S3, HTTP, ...) are not supported.
+A file source reads data from files containing JSON objects separated by newlines (NDJSON). Gzip compression is supported provided that the file name ends with the `.gz` suffix.
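
For illustration, a file in the expected format could be produced as follows (a sketch: the file name and fields are made up):

```bash
# Two NDJSON records; the .gz suffix is what signals gzip compression to the source
printf '%s\n' '{"ts": 1, "message": "hello"}' '{"ts": 2, "message": "world"}' > docs.json
gzip docs.json   # yields docs.json.gz
```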

#### Ingest a single file (CLI only)

To ingest a specific file, run the indexing directly in an ad hoc CLI process with:

```bash
./quickwit tool local-ingest --index <index> --input-path <input-path>
```

Both local files and object store URIs are supported, provided that the environment is configured with the appropriate permissions. A tutorial is available [here](/docs/ingest-data/ingest-local-file.md).

#### Notification based file ingestion (beta)

Quickwit can automatically ingest all new files that are uploaded to an S3 bucket. This requires creating and configuring an [SQS notification queue](https://docs.aws.amazon.com/AmazonS3/latest/userguide/ways-to-add-notification-config-to-bucket.html). A complete example can be found [in this tutorial](/docs/ingest-data/sqs-files.md).


The `notifications` parameter takes an array of notification settings. Currently, only one notifier can be configured per source, and only the SQS notification `type` is supported.

Required fields for the SQS `notifications` parameter items:
- `type`: `sqs`
- `queue_url`: complete URL of the SQS queue (e.g. `https://sqs.us-east-1.amazonaws.com/123456789012/queue-name`)
- `message_type`: format of the message payload, either
- `s3_notification`: an [S3 event notification](https://docs.aws.amazon.com/AmazonS3/latest/userguide/EventNotifications.html)
- `raw_uri`: a message containing just the file object URI (e.g. `s3://mybucket/mykey`)
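
For illustration, the two payload shapes look roughly as follows. A `raw_uri` message body is simply the URI itself, e.g. `s3://mybucket/mykey`. An `s3_notification` message body is the JSON event emitted by S3, which in abridged form looks like this (a sketch: the bucket and key are made up, and real events carry more fields):

```json
{
  "Records": [
    {
      "eventName": "ObjectCreated:Put",
      "s3": {
        "bucket": { "name": "mybucket" },
        "object": { "key": "mykey", "size": 1024 }
      }
    }
  ]
}
```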

*Adding a file source with SQS notifications to an index with the [CLI](../reference/cli.md#source)*

```bash
-./quickwit tool local-ingest --input-path <INPUT_PATH>
cat << EOF > source-config.yaml
version: 0.8
source_id: my-sqs-file-source
source_type: file
num_pipelines: 2
params:
notifications:
- type: sqs
queue_url: https://sqs.us-east-1.amazonaws.com/123456789012/queue-name
message_type: s3_notification
EOF
./quickwit source create --index my-index --source-config source-config.yaml
```
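
Presumably the same source can also be created through the [REST API](/docs/reference/rest-api.md) rather than the CLI. A sketch, assuming a node listening on the default port 7280 and that the sources endpoint accepts the YAML payload as-is (otherwise convert it to JSON first):

```bash
curl -XPOST http://localhost:7280/api/v1/indexes/my-index/sources \
  -H 'Content-Type: application/yaml' \
  --data-binary @source-config.yaml
```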

:::note

- Quickwit does not automatically delete the source files after a successful ingestion. You can use [S3 object expiration](https://docs.aws.amazon.com/AmazonS3/latest/userguide/lifecycle-expire-general-considerations.html) to configure how long they should be retained in the bucket.
- Configure the notification to only forward events of type `s3:ObjectCreated:*`. Other events are acknowledged by the source without further processing and a warning is logged.
- We strongly recommend using a [dead letter queue](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-dead-letter-queues.html) to receive all messages that couldn't be processed by the file source. A `maxReceiveCount` of 5 is a good default value. Here are some common situations where the notification message ends up in the dead letter queue:
  - the notification message could not be parsed (e.g. it is not a valid S3 notification)
  - the file was not found
  - the file is corrupted (e.g. unexpected compression)

:::
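
To inspect notifications that landed in the dead letter queue, something along these lines can be used (a sketch: substitute the URL of your own dead letter queue):

```bash
aws sqs receive-message \
  --queue-url https://sqs.us-east-1.amazonaws.com/123456789012/queue-name-deadletter \
  --max-number-of-messages 10
```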

### Ingest API source

An ingest API source reads data from the [Ingest API](/docs/reference/rest-api.md#ingest-data-into-an-index). This source is automatically created at the index creation and cannot be deleted nor disabled.
6 changes: 6 additions & 0 deletions docs/ingest-data/ingest-local-file.md
@@ -72,6 +72,12 @@ Clearing local cache directory...
✔ Documents successfully indexed.
```

:::tip

Object store URIs like `s3://mybucket/mykey.json` are also supported as `--input-path`, provided that your environment is configured with the appropriate permissions.

:::
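
For example (a sketch: the bucket, key, and index name are made up; credentials are picked up from the environment):

```bash
./quickwit tool local-ingest --index my-index --input-path s3://mybucket/mykey.json
```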

## Tear down resources (optional)

That's it! You can now tear down the resources you created. You can do so by running the following command:
Expand Down