Skip to content

Commit

Permalink
feat(sdk): dead letter queue support for queues (#6060)
Browse files Browse the repository at this point in the history
⛔ As the dead-letter queue linked to the function is a specific case of AWS, I will remove it from the scope of this PR.

- [x] dlq (with retries) for queue (tf-aws / awscdk)
- [x] dlq (with retries) for queue (sim)
~dlq for function (tf-aws / awscdk)~
~dlq for function (sim)~

Closes #6033

## Checklist

- [x] Title matches [Winglang's style guide](https://www.winglang.io/contributing/start-here/pull_requests#how-are-pull-request-titles-formatted)
- [x] Description explains motivation and solution
- [x] Tests added (always)
- [ ] Docs updated (only required for features)
- [ ] Added `pr/e2e-full` label if this feature requires end-to-end testing

*By submitting this pull request, I confirm that my contribution is made under the terms of the [Wing Cloud Contribution License](https://github.com/winglang/wing/blob/main/CONTRIBUTION_LICENSE.md)*.
  • Loading branch information
marciocadev authored Apr 20, 2024
1 parent 8d9d77e commit 2dea835
Show file tree
Hide file tree
Showing 33 changed files with 777 additions and 46 deletions.
76 changes: 76 additions & 0 deletions docs/docs/04-standard-library/cloud/queue.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,22 @@ new cloud.Function(inflight () => {
});
```

### Adding a dead-letter queue

Creating a queue and adding a dead-letter queue with the maximum number of attempts configured

```ts playground
bring cloud;

let dlq = new cloud.Queue() as "dead-letter queue";
let q = new cloud.Queue(
dlq: {
queue: dlq,
maxDeliveryAttempts: 2
}
);
```

### Referencing an external queue

If you would like to reference an existing queue from within your application you can use the
Expand Down Expand Up @@ -270,6 +286,52 @@ The tree node.

## Structs <a name="Structs" id="Structs"></a>

### DeadLetterQueueProps <a name="DeadLetterQueueProps" id="@winglang/sdk.cloud.DeadLetterQueueProps"></a>

Dead letter queue options.

#### Initializer <a name="Initializer" id="@winglang/sdk.cloud.DeadLetterQueueProps.Initializer"></a>

```wing
bring cloud;
let DeadLetterQueueProps = cloud.DeadLetterQueueProps{ ... };
```

#### Properties <a name="Properties" id="Properties"></a>

| **Name** | **Type** | **Description** |
| --- | --- | --- |
| <code><a href="#@winglang/sdk.cloud.DeadLetterQueueProps.property.queue">queue</a></code> | <code><a href="#@winglang/sdk.cloud.Queue">Queue</a></code> | Queue to receive messages that failed processing. |
| <code><a href="#@winglang/sdk.cloud.DeadLetterQueueProps.property.maxDeliveryAttempts">maxDeliveryAttempts</a></code> | <code>num</code> | Number of times a message will be processed before being sent to the dead-letter queue. |

---

##### `queue`<sup>Required</sup> <a name="queue" id="@winglang/sdk.cloud.DeadLetterQueueProps.property.queue"></a>

```wing
queue: Queue;
```

- *Type:* <a href="#@winglang/sdk.cloud.Queue">Queue</a>

Queue to receive messages that failed processing.

---

##### `maxDeliveryAttempts`<sup>Optional</sup> <a name="maxDeliveryAttempts" id="@winglang/sdk.cloud.DeadLetterQueueProps.property.maxDeliveryAttempts"></a>

```wing
maxDeliveryAttempts: num;
```

- *Type:* num
- *Default:* 1

Number of times a message will be processed before being sent to the dead-letter queue.

---

### QueueProps <a name="QueueProps" id="@winglang/sdk.cloud.QueueProps"></a>

Options for `Queue`.
Expand All @@ -286,11 +348,25 @@ let QueueProps = cloud.QueueProps{ ... };

| **Name** | **Type** | **Description** |
| --- | --- | --- |
| <code><a href="#@winglang/sdk.cloud.QueueProps.property.dlq">dlq</a></code> | <code><a href="#@winglang/sdk.cloud.DeadLetterQueueProps">DeadLetterQueueProps</a></code> | A dead-letter queue. |
| <code><a href="#@winglang/sdk.cloud.QueueProps.property.retentionPeriod">retentionPeriod</a></code> | <code><a href="#@winglang/sdk.std.Duration">duration</a></code> | How long a queue retains a message. |
| <code><a href="#@winglang/sdk.cloud.QueueProps.property.timeout">timeout</a></code> | <code><a href="#@winglang/sdk.std.Duration">duration</a></code> | How long a queue's consumers have to process a message. |

---

##### `dlq`<sup>Optional</sup> <a name="dlq" id="@winglang/sdk.cloud.QueueProps.property.dlq"></a>

```wing
dlq: DeadLetterQueueProps;
```

- *Type:* <a href="#@winglang/sdk.cloud.DeadLetterQueueProps">DeadLetterQueueProps</a>
- *Default:* no dead letter queue

A dead-letter queue.

---

##### `retentionPeriod`<sup>Optional</sup> <a name="retentionPeriod" id="@winglang/sdk.cloud.QueueProps.property.retentionPeriod"></a>

```wing
Expand Down
14 changes: 14 additions & 0 deletions docs/docs/04-standard-library/cloud/topic.md
Original file line number Diff line number Diff line change
Expand Up @@ -395,11 +395,25 @@ let TopicSubscribeQueueOptions = cloud.TopicSubscribeQueueOptions{ ... };

| **Name** | **Type** | **Description** |
| --- | --- | --- |
| <code><a href="#@winglang/sdk.cloud.TopicSubscribeQueueOptions.property.dlq">dlq</a></code> | <code><a href="#@winglang/sdk.cloud.DeadLetterQueueProps">DeadLetterQueueProps</a></code> | A dead-letter queue. |
| <code><a href="#@winglang/sdk.cloud.TopicSubscribeQueueOptions.property.retentionPeriod">retentionPeriod</a></code> | <code><a href="#@winglang/sdk.std.Duration">duration</a></code> | How long a queue retains a message. |
| <code><a href="#@winglang/sdk.cloud.TopicSubscribeQueueOptions.property.timeout">timeout</a></code> | <code><a href="#@winglang/sdk.std.Duration">duration</a></code> | How long a queue's consumers have to process a message. |

---

##### `dlq`<sup>Optional</sup> <a name="dlq" id="@winglang/sdk.cloud.TopicSubscribeQueueOptions.property.dlq"></a>

```wing
dlq: DeadLetterQueueProps;
```

- *Type:* <a href="#@winglang/sdk.cloud.DeadLetterQueueProps">DeadLetterQueueProps</a>
- *Default:* no dead letter queue

A dead-letter queue.

---

##### `retentionPeriod`<sup>Optional</sup> <a name="retentionPeriod" id="@winglang/sdk.cloud.TopicSubscribeQueueOptions.property.retentionPeriod"></a>

```wing
Expand Down
63 changes: 63 additions & 0 deletions examples/tests/sdk_tests/queue/dead-letter-queue.test.w
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
bring cloud;
bring util;

let counter_received_messages = new cloud.Counter();

let dlq_without_retries = new cloud.Queue() as "dlq without retries";
let queue_without_retries = new cloud.Queue(
dlq: { queue: dlq_without_retries }
) as "queue without retries";
queue_without_retries.setConsumer(inflight (msg: str) => {
counter_received_messages.inc(1, msg);
if msg == "fail" {
throw "error";
}
});


new std.Test(
inflight () => {
queue_without_retries.push("Hello");
queue_without_retries.push("fail");
queue_without_retries.push("World!");

// wait until it executes once.
assert(util.waitUntil(inflight () => { return counter_received_messages.peek("Hello") == 1; }));
assert(util.waitUntil(inflight () => { return counter_received_messages.peek("World!") == 1; }));
assert(util.waitUntil(inflight () => { return counter_received_messages.peek("fail") == 1; }));

// check if the "fail" message has arrived at the dead-letter queue
assert(util.waitUntil(inflight () => { return dlq_without_retries.pop() == "fail"; }));
},
// To make this test work on AWS, it's necessary to set a high timeout
// because if the message fails, we have to wait for the visibility timeout
// to expire in order to retrieve the same message from the queue again.
timeout: 5m) as "one execution and send fail message to dead-letter queue";

let dlq_with_retries = new cloud.Queue() as "dlq with retries";
let queue_with_retries = new cloud.Queue(
dlq: {
queue: dlq_with_retries,
maxDeliveryAttempts: 2
}
) as "queue with retries";
queue_with_retries.setConsumer(inflight (msg: str) => {
counter_received_messages.inc(1, msg);
if msg == "fail" {
throw "error";
}
});

new std.Test(inflight () => {
queue_with_retries.push("Hello");
queue_with_retries.push("fail");
queue_with_retries.push("World!");

// wait until it executes once and retry one more times.
assert(util.waitUntil(inflight () => { return counter_received_messages.peek("Hello") == 1; }));
assert(util.waitUntil(inflight () => { return counter_received_messages.peek("World!") == 1; }));
assert(util.waitUntil(inflight () => { return counter_received_messages.peek("fail") == 2; }));

// check if the "fail" message has arrived at the dead-letter queue
assert(util.waitUntil(inflight () => { return dlq_with_retries.pop() == "fail"; }));
}, timeout: 5m) as "one execution, two retries and send the fail message to dead-letter queue";
20 changes: 17 additions & 3 deletions libs/awscdk/src/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { App } from "./app";
import { std, core, cloud } from "@winglang/sdk";
import { convertBetweenHandlers } from "@winglang/sdk/lib/shared/convert";
import { calculateQueuePermissions } from "@winglang/sdk/lib/shared-aws/permissions";
import { IAwsQueue } from "@winglang/sdk/lib/shared-aws/queue";
import { IAwsQueue, Queue as AwsQueue } from "@winglang/sdk/lib/shared-aws/queue";
import { addPolicyStatements, isAwsCdkFunction } from "./function";

/**
Expand All @@ -23,14 +23,27 @@ export class Queue extends cloud.Queue implements IAwsQueue {
super(scope, id, props);
this.timeout = props.timeout ?? std.Duration.fromSeconds(30);

this.queue = new SQSQueue(this, "Default", {
const queueOpt = props.dlq ? {
visibilityTimeout: props.timeout
? Duration.seconds(props.timeout?.seconds)
: Duration.seconds(30),
retentionPeriod: props.retentionPeriod
? Duration.seconds(props.retentionPeriod?.seconds)
: Duration.hours(1),
});
deadLetterQueue: {
queue: SQSQueue.fromQueueArn(this, "DeadLetterQueue", AwsQueue.from(props.dlq.queue)?.queueArn!),
maxReceiveCount: props.dlq.maxDeliveryAttempts ?? cloud.DEFAULT_DELIVERY_ATTEMPTS,
}
} : {
visibilityTimeout: props.timeout
? Duration.seconds(props.timeout?.seconds)
: Duration.seconds(30),
retentionPeriod: props.retentionPeriod
? Duration.seconds(props.retentionPeriod?.seconds)
: Duration.hours(1),
}

this.queue = new SQSQueue(this, "Default", queueOpt);
}

public setConsumer(
Expand Down Expand Up @@ -63,6 +76,7 @@ export class Queue extends cloud.Queue implements IAwsQueue {

const eventSource = new SqsEventSource(this.queue, {
batchSize: props.batchSize ?? 1,
reportBatchItemFailures: true,
});

fn.awscdkFunction.addEventSource(eventSource);
Expand Down
3 changes: 3 additions & 0 deletions libs/awscdk/test/__snapshots__/queue.test.ts.snap
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ exports[`queue with a consumer function 2`] = `
"FunctionName": {
"Ref": "QueueSetConsumer06749388A",
},
"FunctionResponseTypes": [
"ReportBatchItemFailures",
],
},
"Type": "AWS::Lambda::EventSourceMapping",
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ source: libs/wingc/src/lsp/completions.rs
kind: 7
documentation:
kind: markdown
value: "```wing\nclass Queue\n```\n---\nA queue.\n\n### Initializer\n- `...props` — `QueueProps?`\n \n - `retentionPeriod?` — `duration?` — How long a queue retains a message.\n - `timeout?` — `duration?` — How long a queue's consumers have to process a message.\n### Fields\n- `node` — `Node` — The tree node.\n### Methods\n- `approxSize` — `inflight (): num` — Retrieve the approximate number of messages in the queue.\n- `isConstruct` — `preflight (x: any): bool` — Checks if `x` is a construct.\n- `onLift` — `preflight (host: IInflightHost, ops: Array<str>): void` — A hook called by the Wing compiler once for each inflight host that needs to use this resource inflight.\n- `onLiftType` — `preflight (host: IInflightHost, ops: Array<str>): void` — A hook called by the Wing compiler once for each inflight host that needs to use this type inflight.\n- `pop` — `inflight (): str?` — Pop a message from the queue.\n- `purge` — `inflight (): void` — Purge all of the messages in the queue.\n- `push` — `inflight (...messages: Array<str>?): void` — Push one or more messages to the queue.\n- `setConsumer` — `preflight (handler: inflight (message: str): void, props: QueueSetConsumerOptions?): Function` — Create a function to consume messages from this queue.\n- `toString` — `preflight (): str` — Returns a string representation of this construct."
value: "```wing\nclass Queue\n```\n---\nA queue.\n\n### Initializer\n- `...props` — `QueueProps?`\n \n - `dlq?` — `DeadLetterQueueProps?` — A dead-letter queue.\n - `retentionPeriod?` — `duration?` — How long a queue retains a message.\n - `timeout?` — `duration?` — How long a queue's consumers have to process a message.\n### Fields\n- `node` — `Node` — The tree node.\n### Methods\n- `approxSize` — `inflight (): num` — Retrieve the approximate number of messages in the queue.\n- `isConstruct` — `preflight (x: any): bool` — Checks if `x` is a construct.\n- `onLift` — `preflight (host: IInflightHost, ops: Array<str>): void` — A hook called by the Wing compiler once for each inflight host that needs to use this resource inflight.\n- `onLiftType` — `preflight (host: IInflightHost, ops: Array<str>): void` — A hook called by the Wing compiler once for each inflight host that needs to use this type inflight.\n- `pop` — `inflight (): str?` — Pop a message from the queue.\n- `purge` — `inflight (): void` — Purge all of the messages in the queue.\n- `push` — `inflight (...messages: Array<str>?): void` — Push one or more messages to the queue.\n- `setConsumer` — `preflight (handler: inflight (message: str): void, props: QueueSetConsumerOptions?): Function` — Create a function to consume messages from this queue.\n- `toString` — `preflight (): str` — Returns a string representation of this construct."
sortText: gg|Queue
- label: Schedule
kind: 7
Expand Down Expand Up @@ -229,6 +229,12 @@ source: libs/wingc/src/lsp/completions.rs
kind: markdown
value: "```wing\nstruct CounterProps\n```\n---\nOptions for `Counter`.\n### Fields\n- `initial?` — `num?` — The initial value of the counter."
sortText: hh|CounterProps
- label: DeadLetterQueueProps
kind: 22
documentation:
kind: markdown
value: "```wing\nstruct DeadLetterQueueProps\n```\n---\nDead letter queue options.\n### Fields\n- `queue` — `Queue` — Queue to receive messages that failed processing.\n- `maxDeliveryAttempts?` — `num?` — Number of times a message will be processed before being sent to the dead-letter queue."
sortText: hh|DeadLetterQueueProps
- label: DomainProps
kind: 22
documentation:
Expand Down Expand Up @@ -269,7 +275,7 @@ source: libs/wingc/src/lsp/completions.rs
kind: 22
documentation:
kind: markdown
value: "```wing\nstruct QueueProps\n```\n---\nOptions for `Queue`.\n### Fields\n- `retentionPeriod?` — `duration?` — How long a queue retains a message.\n- `timeout?` — `duration?` — How long a queue's consumers have to process a message."
value: "```wing\nstruct QueueProps\n```\n---\nOptions for `Queue`.\n### Fields\n- `dlq?` — `DeadLetterQueueProps?` — A dead-letter queue.\n- `retentionPeriod?` — `duration?` — How long a queue retains a message.\n- `timeout?` — `duration?` — How long a queue's consumers have to process a message."
sortText: hh|QueueProps
- label: QueueSetConsumerOptions
kind: 22
Expand Down Expand Up @@ -323,7 +329,7 @@ source: libs/wingc/src/lsp/completions.rs
kind: 22
documentation:
kind: markdown
value: "```wing\nstruct TopicSubscribeQueueOptions extends QueueProps\n```\n---\nOptions for `Topic.subscribeQueue`.\n### Fields\n- `retentionPeriod?` — `duration?`\n- `timeout?` — `duration?`"
value: "```wing\nstruct TopicSubscribeQueueOptions extends QueueProps\n```\n---\nOptions for `Topic.subscribeQueue`.\n### Fields\n- `dlq?` — `DeadLetterQueueProps?`\n- `retentionPeriod?` — `duration?`\n- `timeout?` — `duration?`"
sortText: hh|TopicSubscribeQueueOptions
- label: WebsiteDomainOptions
kind: 22
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ source: libs/wingc/src/lsp/completions.rs
kind: 7
documentation:
kind: markdown
value: "```wing\nclass Queue\n```\n---\nA queue.\n\n### Initializer\n- `...props` — `QueueProps?`\n \n - `retentionPeriod?` — `duration?` — How long a queue retains a message.\n - `timeout?` — `duration?` — How long a queue's consumers have to process a message.\n### Fields\n- `node` — `Node` — The tree node.\n### Methods\n- `approxSize` — `inflight (): num` — Retrieve the approximate number of messages in the queue.\n- `isConstruct` — `preflight (x: any): bool` — Checks if `x` is a construct.\n- `onLift` — `preflight (host: IInflightHost, ops: Array<str>): void` — A hook called by the Wing compiler once for each inflight host that needs to use this resource inflight.\n- `onLiftType` — `preflight (host: IInflightHost, ops: Array<str>): void` — A hook called by the Wing compiler once for each inflight host that needs to use this type inflight.\n- `pop` — `inflight (): str?` — Pop a message from the queue.\n- `purge` — `inflight (): void` — Purge all of the messages in the queue.\n- `push` — `inflight (...messages: Array<str>?): void` — Push one or more messages to the queue.\n- `setConsumer` — `preflight (handler: inflight (message: str): void, props: QueueSetConsumerOptions?): Function` — Create a function to consume messages from this queue.\n- `toString` — `preflight (): str` — Returns a string representation of this construct."
value: "```wing\nclass Queue\n```\n---\nA queue.\n\n### Initializer\n- `...props` — `QueueProps?`\n \n - `dlq?` — `DeadLetterQueueProps?` — A dead-letter queue.\n - `retentionPeriod?` — `duration?` — How long a queue retains a message.\n - `timeout?` — `duration?` — How long a queue's consumers have to process a message.\n### Fields\n- `node` — `Node` — The tree node.\n### Methods\n- `approxSize` — `inflight (): num` — Retrieve the approximate number of messages in the queue.\n- `isConstruct` — `preflight (x: any): bool` — Checks if `x` is a construct.\n- `onLift` — `preflight (host: IInflightHost, ops: Array<str>): void` — A hook called by the Wing compiler once for each inflight host that needs to use this resource inflight.\n- `onLiftType` — `preflight (host: IInflightHost, ops: Array<str>): void` — A hook called by the Wing compiler once for each inflight host that needs to use this type inflight.\n- `pop` — `inflight (): str?` — Pop a message from the queue.\n- `purge` — `inflight (): void` — Purge all of the messages in the queue.\n- `push` — `inflight (...messages: Array<str>?): void` — Push one or more messages to the queue.\n- `setConsumer` — `preflight (handler: inflight (message: str): void, props: QueueSetConsumerOptions?): Function` — Create a function to consume messages from this queue.\n- `toString` — `preflight (): str` — Returns a string representation of this construct."
sortText: gg|Queue
- label: Schedule
kind: 7
Expand Down Expand Up @@ -229,6 +229,12 @@ source: libs/wingc/src/lsp/completions.rs
kind: markdown
value: "```wing\nstruct CounterProps\n```\n---\nOptions for `Counter`.\n### Fields\n- `initial?` — `num?` — The initial value of the counter."
sortText: hh|CounterProps
- label: DeadLetterQueueProps
kind: 22
documentation:
kind: markdown
value: "```wing\nstruct DeadLetterQueueProps\n```\n---\nDead letter queue options.\n### Fields\n- `queue` — `Queue` — Queue to receive messages that failed processing.\n- `maxDeliveryAttempts?` — `num?` — Number of times a message will be processed before being sent to the dead-letter queue."
sortText: hh|DeadLetterQueueProps
- label: DomainProps
kind: 22
documentation:
Expand Down Expand Up @@ -269,7 +275,7 @@ source: libs/wingc/src/lsp/completions.rs
kind: 22
documentation:
kind: markdown
value: "```wing\nstruct QueueProps\n```\n---\nOptions for `Queue`.\n### Fields\n- `retentionPeriod?` — `duration?` — How long a queue retains a message.\n- `timeout?` — `duration?` — How long a queue's consumers have to process a message."
value: "```wing\nstruct QueueProps\n```\n---\nOptions for `Queue`.\n### Fields\n- `dlq?` — `DeadLetterQueueProps?` — A dead-letter queue.\n- `retentionPeriod?` — `duration?` — How long a queue retains a message.\n- `timeout?` — `duration?` — How long a queue's consumers have to process a message."
sortText: hh|QueueProps
- label: QueueSetConsumerOptions
kind: 22
Expand Down Expand Up @@ -323,7 +329,7 @@ source: libs/wingc/src/lsp/completions.rs
kind: 22
documentation:
kind: markdown
value: "```wing\nstruct TopicSubscribeQueueOptions extends QueueProps\n```\n---\nOptions for `Topic.subscribeQueue`.\n### Fields\n- `retentionPeriod?` — `duration?`\n- `timeout?` — `duration?`"
value: "```wing\nstruct TopicSubscribeQueueOptions extends QueueProps\n```\n---\nOptions for `Topic.subscribeQueue`.\n### Fields\n- `dlq?` — `DeadLetterQueueProps?`\n- `retentionPeriod?` — `duration?`\n- `timeout?` — `duration?`"
sortText: hh|TopicSubscribeQueueOptions
- label: WebsiteDomainOptions
kind: 22
Expand Down
Loading

0 comments on commit 2dea835

Please sign in to comment.