Skip to content

Commit

Permalink
chore(sdk): update queue.push() to use variadic parameter for messa…
Browse files Browse the repository at this point in the history
…ges (#3746)

Closes #2278

*By submitting this pull request, I confirm that my contribution is made under the terms of the [Wing Cloud Contribution License](https://github.com/winglang/wing/blob/main/CONTRIBUTION_LICENSE.md)*.
  • Loading branch information
garysassano authored Aug 17, 2023
1 parent d980a25 commit 37f65dc
Show file tree
Hide file tree
Showing 10 changed files with 83 additions and 47 deletions.
8 changes: 4 additions & 4 deletions docs/docs/04-standard-library/01-cloud/queue.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ new cloud.Queue(props?: QueueProps);
| <code><a href="#@winglang/sdk.cloud.IQueueClient.approxSize">approxSize</a></code> | Retrieve the approximate number of messages in the queue. |
| <code><a href="#@winglang/sdk.cloud.IQueueClient.pop">pop</a></code> | Pop a message from the queue. |
| <code><a href="#@winglang/sdk.cloud.IQueueClient.purge">purge</a></code> | Purge all of the messages in the queue. |
| <code><a href="#@winglang/sdk.cloud.IQueueClient.push">push</a></code> | Push a message to the queue. |
| <code><a href="#@winglang/sdk.cloud.IQueueClient.push">push</a></code> | Push one or more messages to the queue. |

---

Expand Down Expand Up @@ -172,12 +172,12 @@ Purge all of the messages in the queue.
##### `push` <a name="push" id="@winglang/sdk.cloud.IQueueClient.push"></a>

```wing
inflight push(message: str): void
inflight push(messages: str): void
```

Push a message to the queue.
Push one or more messages to the queue.

###### `message`<sup>Required</sup> <a name="message" id="@winglang/sdk.cloud.IQueueClient.push.parameter.message"></a>
###### `messages`<sup>Required</sup> <a name="messages" id="@winglang/sdk.cloud.IQueueClient.push.parameter.messages"></a>

- *Type:* str

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ source: libs/wingc/src/lsp/completions.rs
kind: 8
documentation:
kind: markdown
value: "```wing\ninterface IQueueClient\n```\n---\nInflight interface for `Queue`.\n### Methods\n- `approxSize` — Retrieve the approximate number of messages in the queue.\n- `pop` — Pop a message from the queue.\n- `purge` — Purge all of the messages in the queue.\n- `push` — Push a message to the queue."
value: "```wing\ninterface IQueueClient\n```\n---\nInflight interface for `Queue`.\n### Methods\n- `approxSize` — Retrieve the approximate number of messages in the queue.\n- `pop` — Pop a message from the queue.\n- `purge` — Purge all of the messages in the queue.\n- `push` — Push one or more messages to the queue."
sortText: ii|IQueueClient
- label: IQueueSetConsumerHandler
kind: 8
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ source: libs/wingc/src/lsp/completions.rs
kind: 8
documentation:
kind: markdown
value: "```wing\ninterface IQueueClient\n```\n---\nInflight interface for `Queue`.\n### Methods\n- `approxSize` — Retrieve the approximate number of messages in the queue.\n- `pop` — Pop a message from the queue.\n- `purge` — Purge all of the messages in the queue.\n- `push` — Push a message to the queue."
value: "```wing\ninterface IQueueClient\n```\n---\nInflight interface for `Queue`.\n### Methods\n- `approxSize` — Retrieve the approximate number of messages in the queue.\n- `pop` — Pop a message from the queue.\n- `purge` — Purge all of the messages in the queue.\n- `push` — Push one or more messages to the queue."
sortText: ii|IQueueClient
- label: IQueueSetConsumerHandler
kind: 8
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ source: libs/wingc/src/lsp/completions.rs
kind: 8
documentation:
kind: markdown
value: "```wing\ninterface IQueueClient\n```\n---\nInflight interface for `Queue`.\n### Methods\n- `approxSize` — Retrieve the approximate number of messages in the queue.\n- `pop` — Pop a message from the queue.\n- `purge` — Purge all of the messages in the queue.\n- `push` — Push a message to the queue."
value: "```wing\ninterface IQueueClient\n```\n---\nInflight interface for `Queue`.\n### Methods\n- `approxSize` — Retrieve the approximate number of messages in the queue.\n- `pop` — Pop a message from the queue.\n- `purge` — Purge all of the messages in the queue.\n- `push` — Push one or more messages to the queue."
sortText: ii|IQueueClient
- label: IQueueSetConsumerHandler
kind: 8
Expand Down
6 changes: 3 additions & 3 deletions libs/wingsdk/src/cloud/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,11 @@ export interface QueueSetConsumerProps extends FunctionProps {
*/
export interface IQueueClient {
/**
* Push a message to the queue.
* @param message Payload to send to the queue.
* Push one or more messages to the queue.
* @param messages Payload to send to the queue.
* @inflight
*/
push(message: string): Promise<void>;
push(...messages: string[]): Promise<void>;

/**
* Purge all of the messages in the queue.
Expand Down
36 changes: 20 additions & 16 deletions libs/wingsdk/src/shared-aws/queue.inflight.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,27 @@ export class QueueClient implements IQueueClient {
private readonly client: SQSClient = new SQSClient({})
) {}

public async push(message: string): Promise<void> {
try {
const command = new SendMessageCommand({
QueueUrl: this.queueUrl,
MessageBody: message,
});
await this.client.send(command);
} catch (e) {
if (e instanceof InvalidMessageContents) {
throw new Error(
`The message contains characters outside the allowed set (message=${message}): ${
(e as Error).stack
})}`
);
public async push(...messages: string[]): Promise<void> {
const messagePromises = messages.map(async (message) => {
try {
const command = new SendMessageCommand({
QueueUrl: this.queueUrl,
MessageBody: message,
});
await this.client.send(command);
} catch (e) {
if (e instanceof InvalidMessageContents) {
throw new Error(
`The message contains characters outside the allowed set (message=${message}): ${
(e as Error).stack
})}`
);
}
throw new Error((e as Error).stack);
}
throw new Error((e as Error).stack);
}
});

await Promise.all(messagePromises);
}

public async purge(): Promise<void> {
Expand Down
10 changes: 6 additions & 4 deletions libs/wingsdk/src/target-sim/queue.inflight.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,14 @@ export class Queue
this.subscribers.push(s);
}

public async push(message: string): Promise<void> {
// TODO: enforce maximum queue message size?
// TODO: enforce maximum queue message size?
public async push(...messages: string[]): Promise<void> {
return this.context.withTrace({
message: `Push (message=${message}).`,
message: `Push (messages=${messages}).`,
activity: async () => {
this.messages.push(new QueueMessage(this.retentionPeriod, message));
for (const message of messages) {
this.messages.push(new QueueMessage(this.retentionPeriod, message));
}
},
});
}
Expand Down
32 changes: 32 additions & 0 deletions libs/wingsdk/test/shared-aws/queue.inflight.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
import { mockClient } from "aws-sdk-client-mock";
import { test, expect, beforeEach } from "vitest";
import { QueueClient } from "../../src/shared-aws/queue.inflight";
import "aws-sdk-client-mock-jest";

const sqsMock = mockClient(SQSClient);

Expand Down Expand Up @@ -36,6 +37,37 @@ test("push - happy path", async () => {
expect(response).toEqual(undefined);
});

test("push batch - happy path", async () => {
// GIVEN
const QUEUE_URL = "QUEUE_URL";
const MESSAGES = ["MESSAGE1", "MESSAGE2", "MESSAGE3"];
const RESPONSE = {
MessageId: "MESSAGE_ID",
};

sqsMock.on(SendMessageCommand).resolves(RESPONSE);

// WHEN
const client = new QueueClient(QUEUE_URL);
const response = await client.push(...MESSAGES);

// THEN
expect(response).toEqual(undefined);
expect(sqsMock).toHaveReceivedCommandTimes(SendMessageCommand, 3);
expect(sqsMock).toHaveReceivedNthCommandWith(1, SendMessageCommand, {
QueueUrl: QUEUE_URL,
MessageBody: MESSAGES[0],
});
expect(sqsMock).toHaveReceivedNthCommandWith(2, SendMessageCommand, {
QueueUrl: QUEUE_URL,
MessageBody: MESSAGES[1],
});
expect(sqsMock).toHaveReceivedNthCommandWith(3, SendMessageCommand, {
QueueUrl: QUEUE_URL,
MessageBody: MESSAGES[2],
});
});

test("push - sad path invalid message", async () => {
// GIVEN
const QUEUE_URL = "QUEUE_URL";
Expand Down
25 changes: 12 additions & 13 deletions libs/wingsdk/test/target-sim/__snapshots__/queue.test.ts.snap
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ exports[`messages are not requeued if the function fails after retention timeout
"wingsdk.cloud.Queue created.",
"wingsdk.sim.EventMapping created.",
"Invoke (payload=\\"{\\\\\\"messages\\\\\\":[]}\\").",
"Push (message=BAD MESSAGE).",
"Push (messages=BAD MESSAGE).",
"Sending messages (messages=[\\"BAD MESSAGE\\"], subscriber=sim-1).",
"Invoke (payload=\\"{\\\\\\"messages\\\\\\":[\\\\\\"BAD MESSAGE\\\\\\"]}\\").",
"Subscriber error - returning 1 messages to queue: ERROR",
Expand Down Expand Up @@ -271,7 +271,7 @@ exports[`messages are not requeued if the function fails before timeout 1`] = `
"wingsdk.cloud.Queue created.",
"wingsdk.sim.EventMapping created.",
"Invoke (payload=\\"{\\\\\\"messages\\\\\\":[]}\\").",
"Push (message=BAD MESSAGE).",
"Push (messages=BAD MESSAGE).",
"Sending messages (messages=[\\"BAD MESSAGE\\"], subscriber=sim-1).",
"Invoke (payload=\\"{\\\\\\"messages\\\\\\":[\\\\\\"BAD MESSAGE\\\\\\"]}\\").",
"Subscriber error - returning 1 messages to queue: ERROR",
Expand Down Expand Up @@ -466,7 +466,7 @@ exports[`messages are requeued if the function fails after timeout 1`] = `
"wingsdk.cloud.Queue created.",
"wingsdk.sim.EventMapping created.",
"Invoke (payload=\\"{\\\\\\"messages\\\\\\":[]}\\").",
"Push (message=BAD MESSAGE).",
"Push (messages=BAD MESSAGE).",
"Sending messages (messages=[\\"BAD MESSAGE\\"], subscriber=sim-1).",
"Invoke (payload=\\"{\\\\\\"messages\\\\\\":[\\\\\\"BAD MESSAGE\\\\\\"]}\\").",
"Subscriber error - returning 1 messages to queue: ERROR",
Expand Down Expand Up @@ -662,8 +662,8 @@ exports[`queue batch size of 2, purge the queue 1`] = `
[
"wingsdk.cloud.TestRunner created.",
"wingsdk.cloud.Queue created.",
"Push (message=A).",
"Push (message=B).",
"Push (messages=A).",
"Push (messages=B).",
"ApproxSize ().",
"Purge ().",
"ApproxSize ().",
Expand Down Expand Up @@ -748,12 +748,12 @@ exports[`queue with one subscriber, batch size of 5 1`] = `
"wingsdk.cloud.Queue created.",
"wingsdk.sim.EventMapping created.",
"wingsdk.cloud.Function created.",
"Push (message=A).",
"Push (message=B).",
"Push (message=C).",
"Push (message=D).",
"Push (message=E).",
"Push (message=F).",
"Push (messages=A).",
"Push (messages=B).",
"Push (messages=C).",
"Push (messages=D).",
"Push (messages=E).",
"Push (messages=F).",
"Invoke (payload=\\"\\").",
"OnDeploy invoked.",
"wingsdk.cloud.OnDeploy created.",
Expand Down Expand Up @@ -1049,8 +1049,7 @@ exports[`queue with one subscriber, default batch size of 1 1`] = `
"wingsdk.cloud.Function created.",
"wingsdk.cloud.Queue created.",
"wingsdk.sim.EventMapping created.",
"Push (message=A).",
"Push (message=B).",
"Push (messages=A,B).",
"Sending messages (messages=[\\"A\\"], subscriber=sim-1).",
"Sending messages (messages=[\\"B\\"], subscriber=sim-1).",
"wingsdk.sim.EventMapping deleted.",
Expand Down
7 changes: 3 additions & 4 deletions libs/wingsdk/test/target-sim/queue.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ test("queue with one subscriber, default batch size of 1", async () => {
const queueClient = s.getResource("/my_queue") as cloud.IQueueClient;

// WHEN
await queueClient.push("A");
await queueClient.push("B");
await queueClient.push("A", "B");

// TODO: queueClient.awaitMessages(2) or queueClient.untilEmpty() or something
await sleep(200);
Expand Down Expand Up @@ -204,7 +203,7 @@ test("messages are not requeued if the function fails before timeout", async ()
.map((trace) => trace.data.message)
).toEqual([
"wingsdk.cloud.Queue created.",
"Push (message=BAD MESSAGE).",
"Push (messages=BAD MESSAGE).",
'Sending messages (messages=["BAD MESSAGE"], subscriber=sim-1).',
"Subscriber error - returning 1 messages to queue: ERROR",
"wingsdk.cloud.Queue deleted.",
Expand Down Expand Up @@ -247,7 +246,7 @@ test("messages are not requeued if the function fails after retention timeout",
.map((trace) => trace.data.message)
).toEqual([
"wingsdk.cloud.Queue created.",
"Push (message=BAD MESSAGE).",
"Push (messages=BAD MESSAGE).",
'Sending messages (messages=["BAD MESSAGE"], subscriber=sim-1).',
"Subscriber error - returning 1 messages to queue: ERROR",
"wingsdk.cloud.Queue deleted.",
Expand Down

0 comments on commit 37f65dc

Please sign in to comment.