Skip to content

Commit

Permalink
fix(sdk): aws queue client does not delete popped messages (#4314)
Browse files Browse the repository at this point in the history
## Checklist

- [X] Title matches [Winglang's style guide](https://www.winglang.io/contributing/start-here/pull_requests#how-are-pull-request-titles-formatted)
- [ ] Description explains motivation and solution
- [X] Tests added (always)
- [ ] Docs updated (only required for features)
- [ ] Added `pr/e2e-full` label if this feature requires end-to-end testing

*By submitting this pull request, I confirm that my contribution is made under the terms of the [Wing Cloud Contribution License](https://github.com/winglang/wing/blob/main/CONTRIBUTION_LICENSE.md)*.
  • Loading branch information
flyingImer authored Sep 29, 2023
1 parent c6d50eb commit fac3f72
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 9 deletions.
8 changes: 7 additions & 1 deletion examples/tests/sdk_tests/queue/pop.test.w
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
bring cloud;
bring util;

let q = new cloud.Queue();
let timeout = 3s;
let q = new cloud.Queue(timeout: timeout);

test "pop" {
q.push("Foo", "Bar");

let first = q.pop();
let second = q.pop();

// ensure messages are deleted after timeout
util.sleep(timeout);

let third = q.pop();

// queue is not FIFO
Expand Down
2 changes: 1 addition & 1 deletion libs/wingsdk/src/shared-aws/permissions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ export function calculateQueuePermissions(

if (ops.includes(cloud.QueueInflightMethods.POP)) {
policies.push({
actions: ["sqs:ReceiveMessage"],
actions: ["sqs:ReceiveMessage", "sqs:DeleteMessage"],
resources: [arn],
});
}
Expand Down
24 changes: 21 additions & 3 deletions libs/wingsdk/src/shared-aws/queue.inflight.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
GetQueueAttributesCommand,
ReceiveMessageCommand,
InvalidMessageContents,
DeleteMessageCommand,
} from "@aws-sdk/client-sqs";
import { IQueueClient } from "../cloud";

Expand Down Expand Up @@ -57,14 +58,31 @@ export class QueueClient implements IQueueClient {
}

public async pop(): Promise<string | undefined> {
const command = new ReceiveMessageCommand({
const receiveCommand = new ReceiveMessageCommand({
QueueUrl: this.queueUrl,
MaxNumberOfMessages: 1,
});
const data = await this.client.send(command);
const data = await this.client.send(receiveCommand);
if (!data.Messages) {
return undefined;
}
return data.Messages[0].Body;

const message = data.Messages[0];

if (message.ReceiptHandle) {
const deleteCommand = new DeleteMessageCommand({
QueueUrl: this.queueUrl,
ReceiptHandle: message.ReceiptHandle,
});
await this.client.send(deleteCommand);
} else {
console.warn(
`No receipt handle found, message not deleted. Message: ${JSON.stringify(
message
)}`
);
}

return message.Body;
}
}
61 changes: 60 additions & 1 deletion libs/wingsdk/test/shared-aws/queue.inflight.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import {
SQSClient,
ReceiveMessageCommand,
InvalidMessageContents,
DeleteMessageCommand,
ReceiveMessageCommandOutput,
} from "@aws-sdk/client-sqs";
import { mockClient } from "aws-sdk-client-mock";
import { test, expect, beforeEach } from "vitest";
Expand Down Expand Up @@ -166,12 +168,14 @@ test("pop - happy path", async () => {
// GIVEN
const QUEUE_URL = "QUEUE_URL";
const MESSAGE = "MESSAGE";
const ONE_MSG_RESPONSE = {
const ONE_MSG_RESPONSE: ReceiveMessageCommandOutput = {
Messages: [
{
Body: MESSAGE,
ReceiptHandle: "RECEIPT_HANDLE",
},
],
$metadata: {},
};
const NO_MSG_RESPONSE = {};

Expand All @@ -188,4 +192,59 @@ test("pop - happy path", async () => {
// THEN
expect(firstPopResponse).toEqual(MESSAGE);
expect(secondPopResponse).toBeUndefined();
expect(sqsMock).toHaveReceivedCommandTimes(ReceiveMessageCommand, 2);
expect(sqsMock).toHaveReceivedCommandTimes(DeleteMessageCommand, 1);
});

test("pop - happy path w/o message receipt", async () => {
// GIVEN
const QUEUE_URL = "QUEUE_URL";
const MESSAGE = "MESSAGE";
const ONE_MSG_RESPONSE: ReceiveMessageCommandOutput = {
Messages: [
{
Body: MESSAGE,
ReceiptHandle: undefined, // <- no receipt handle, unusual but it's fine
},
],
$metadata: {},
};
const NO_MSG_RESPONSE = {};

sqsMock
.on(ReceiveMessageCommand, { QueueUrl: QUEUE_URL })
.resolvesOnce(ONE_MSG_RESPONSE)
.resolves(NO_MSG_RESPONSE);

// WHEN
const client = new QueueClient(QUEUE_URL);
const firstPopResponse = await client.pop();
const secondPopResponse = await client.pop();

// THEN
expect(firstPopResponse).toEqual(MESSAGE);
expect(secondPopResponse).toBeUndefined();
expect(sqsMock).toHaveReceivedCommandTimes(ReceiveMessageCommand, 2);
expect(sqsMock).toHaveReceivedCommandTimes(DeleteMessageCommand, 0); // <- delete message command skipped
});

test("pop - happy path w/ no message in the queue", async () => {
// GIVEN
const QUEUE_URL = "QUEUE_URL";
const NO_MSG_RESPONSE = {};

sqsMock
.on(ReceiveMessageCommand, { QueueUrl: QUEUE_URL })
.resolves(NO_MSG_RESPONSE);

// WHEN
const client = new QueueClient(QUEUE_URL);
const firstPopResponse = await client.pop();
const secondPopResponse = await client.pop();

// THEN
expect(firstPopResponse).toBeUndefined();
expect(secondPopResponse).toBeUndefined();
expect(sqsMock).toHaveReceivedCommandTimes(ReceiveMessageCommand, 2);
expect(sqsMock).toHaveReceivedCommandTimes(DeleteMessageCommand, 0);
});
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

## inflight.$Closure1-1.js
```js
module.exports = function({ $q }) {
module.exports = function({ $q, $timeout, $util_Util }) {
class $Closure1 {
constructor({ }) {
const $obj = (...args) => this.handle(...args);
Expand All @@ -13,6 +13,7 @@ module.exports = function({ $q }) {
(await $q.push("Foo","Bar"));
const first = (await $q.pop());
const second = (await $q.pop());
(await $util_Util.sleep($timeout));
const third = (await $q.pop());
{((cond) => {if (!cond) throw new Error("assertion failed: first == \"Foo\" || first == \"Bar\"")})(((((a,b) => { try { return require('assert').deepStrictEqual(a,b) === undefined; } catch { return false; } })(first,"Foo")) || (((a,b) => { try { return require('assert').deepStrictEqual(a,b) === undefined; } catch { return false; } })(first,"Bar"))))};
{((cond) => {if (!cond) throw new Error("assertion failed: second == \"Foo\" || second == \"Bar\"")})(((((a,b) => { try { return require('assert').deepStrictEqual(a,b) === undefined; } catch { return false; } })(second,"Foo")) || (((a,b) => { try { return require('assert').deepStrictEqual(a,b) === undefined; } catch { return false; } })(second,"Bar"))))};
Expand Down Expand Up @@ -62,7 +63,8 @@ module.exports = function({ $q }) {
"uniqueId": "cloudQueue"
}
},
"name": "cloud-Queue-c86e03d8"
"name": "cloud-Queue-c86e03d8",
"visibility_timeout_seconds": 3
}
}
}
Expand All @@ -77,6 +79,7 @@ const $outdir = process.env.WING_SYNTH_DIR ?? ".";
const $wing_is_test = process.env.WING_IS_TEST === "true";
const std = $stdlib.std;
const cloud = $stdlib.cloud;
const util = $stdlib.util;
class $Root extends $stdlib.std.Resource {
constructor(scope, id) {
super(scope, id);
Expand All @@ -89,6 +92,8 @@ class $Root extends $stdlib.std.Resource {
return `
require("./inflight.$Closure1-1.js")({
$q: ${context._lift(q)},
$timeout: ${context._lift(timeout)},
$util_Util: ${context._lift($stdlib.core.toLiftableModuleType(util.Util, "@winglang/sdk/util", "Util"))},
})
`;
}
Expand All @@ -109,11 +114,13 @@ class $Root extends $stdlib.std.Resource {
_registerBind(host, ops) {
if (ops.includes("handle")) {
$Closure1._registerBindObject(q, host, ["pop", "push"]);
$Closure1._registerBindObject(timeout, host, []);
}
super._registerBind(host, ops);
}
}
const q = this.node.root.newAbstract("@winglang/sdk.cloud.Queue",this,"cloud.Queue");
const timeout = (std.Duration.fromSeconds(3));
const q = this.node.root.newAbstract("@winglang/sdk.cloud.Queue",this,"cloud.Queue",{ timeout: timeout });
this.node.root.new("@winglang/sdk.std.Test",std.Test,this,"test:pop",new $Closure1(this,"$Closure1"));
}
}
Expand Down

0 comments on commit fac3f72

Please sign in to comment.