Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(sdk): update queue.push() to use variadic parameter for messages #3746

Merged
merged 11 commits into from
Aug 17, 2023
8 changes: 4 additions & 4 deletions docs/docs/04-standard-library/01-cloud/queue.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ new cloud.Queue(props?: QueueProps);
| <code><a href="#@winglang/sdk.cloud.IQueueClient.approxSize">approxSize</a></code> | Retrieve the approximate number of messages in the queue. |
| <code><a href="#@winglang/sdk.cloud.IQueueClient.pop">pop</a></code> | Pop a message from the queue. |
| <code><a href="#@winglang/sdk.cloud.IQueueClient.purge">purge</a></code> | Purge all of the messages in the queue. |
| <code><a href="#@winglang/sdk.cloud.IQueueClient.push">push</a></code> | Push a message to the queue. |
| <code><a href="#@winglang/sdk.cloud.IQueueClient.push">push</a></code> | Push one or more messages to the queue. |

---

Expand Down Expand Up @@ -172,12 +172,12 @@ Purge all of the messages in the queue.
##### `push` <a name="push" id="@winglang/sdk.cloud.IQueueClient.push"></a>

```wing
inflight push(message: str): void
inflight push(messages: str): void
garysassano marked this conversation as resolved.
Show resolved Hide resolved
```

Push a message to the queue.
Push one or more messages to the queue.

###### `message`<sup>Required</sup> <a name="message" id="@winglang/sdk.cloud.IQueueClient.push.parameter.message"></a>
###### `messages`<sup>Required</sup> <a name="messages" id="@winglang/sdk.cloud.IQueueClient.push.parameter.messages"></a>

- *Type:* str

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ source: libs/wingc/src/lsp/completions.rs
kind: 8
documentation:
kind: markdown
value: "```wing\ninterface IQueueClient\n```\n---\nInflight interface for `Queue`.\n### Methods\n- `approxSize` — Retrieve the approximate number of messages in the queue.\n- `pop` — Pop a message from the queue.\n- `purge` — Purge all of the messages in the queue.\n- `push` — Push a message to the queue."
value: "```wing\ninterface IQueueClient\n```\n---\nInflight interface for `Queue`.\n### Methods\n- `approxSize` — Retrieve the approximate number of messages in the queue.\n- `pop` — Pop a message from the queue.\n- `purge` — Purge all of the messages in the queue.\n- `push` — Push one or more messages to the queue."
sortText: ii|IQueueClient
- label: IQueueSetConsumerHandler
kind: 8
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ source: libs/wingc/src/lsp/completions.rs
kind: 8
documentation:
kind: markdown
value: "```wing\ninterface IQueueClient\n```\n---\nInflight interface for `Queue`.\n### Methods\n- `approxSize` — Retrieve the approximate number of messages in the queue.\n- `pop` — Pop a message from the queue.\n- `purge` — Purge all of the messages in the queue.\n- `push` — Push a message to the queue."
value: "```wing\ninterface IQueueClient\n```\n---\nInflight interface for `Queue`.\n### Methods\n- `approxSize` — Retrieve the approximate number of messages in the queue.\n- `pop` — Pop a message from the queue.\n- `purge` — Purge all of the messages in the queue.\n- `push` — Push one or more messages to the queue."
sortText: ii|IQueueClient
- label: IQueueSetConsumerHandler
kind: 8
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ source: libs/wingc/src/lsp/completions.rs
kind: 8
documentation:
kind: markdown
value: "```wing\ninterface IQueueClient\n```\n---\nInflight interface for `Queue`.\n### Methods\n- `approxSize` — Retrieve the approximate number of messages in the queue.\n- `pop` — Pop a message from the queue.\n- `purge` — Purge all of the messages in the queue.\n- `push` — Push a message to the queue."
value: "```wing\ninterface IQueueClient\n```\n---\nInflight interface for `Queue`.\n### Methods\n- `approxSize` — Retrieve the approximate number of messages in the queue.\n- `pop` — Pop a message from the queue.\n- `purge` — Purge all of the messages in the queue.\n- `push` — Push one or more messages to the queue."
sortText: ii|IQueueClient
- label: IQueueSetConsumerHandler
kind: 8
Expand Down
6 changes: 3 additions & 3 deletions libs/wingsdk/src/cloud/queue.ts
garysassano marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,11 @@ export interface QueueSetConsumerProps extends FunctionProps {
*/
export interface IQueueClient {
/**
* Push a message to the queue.
* @param message Payload to send to the queue.
* Push one or more messages to the queue.
* @param messages Payload to send to the queue.
* @inflight
*/
push(message: string): Promise<void>;
push(...messages: string[]): Promise<void>;

/**
* Purge all of the messages in the queue.
Expand Down
14 changes: 9 additions & 5 deletions libs/wingsdk/src/shared-aws/queue.inflight.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,16 @@ export class QueueClient implements IQueueClient {
private readonly client: SQSClient = new SQSClient({})
) {}

public async push(message: string): Promise<void> {
const command = new SendMessageCommand({
QueueUrl: this.queueUrl,
MessageBody: message,
public async push(...messages: string[]): Promise<void> {
const messagePromises = messages.map(async (message) => {
const command = new SendMessageCommand({
QueueUrl: this.queueUrl,
MessageBody: message,
});
await this.client.send(command);
});
await this.client.send(command);

await Promise.all(messagePromises);
}

public async purge(): Promise<void> {
Expand Down
10 changes: 6 additions & 4 deletions libs/wingsdk/src/target-sim/queue.inflight.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,14 @@ export class Queue
this.subscribers.push(s);
}

public async push(message: string): Promise<void> {
// TODO: enforce maximum queue message size?
// TODO: enforce maximum queue message size?
public async push(...messages: string[]): Promise<void> {
return this.context.withTrace({
message: `Push (message=${message}).`,
message: `Push (messages=${messages}).`,
activity: async () => {
this.messages.push(new QueueMessage(this.retentionPeriod, message));
for (const message of messages) {
this.messages.push(new QueueMessage(this.retentionPeriod, message));
}
},
});
}
Expand Down
32 changes: 32 additions & 0 deletions libs/wingsdk/test/shared-aws/queue.inflight.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
import { mockClient } from "aws-sdk-client-mock";
import { test, expect, beforeEach } from "vitest";
import { QueueClient } from "../../src/shared-aws/queue.inflight";
import "aws-sdk-client-mock-jest";

const sqsMock = mockClient(SQSClient);

Expand Down Expand Up @@ -35,6 +36,37 @@ test("push - happy path", async () => {
expect(response).toEqual(undefined);
});

test("push batch - happy path", async () => {
// GIVEN
const QUEUE_URL = "QUEUE_URL";
const MESSAGES = ["MESSAGE1", "MESSAGE2", "MESSAGE3"];
const RESPONSE = {
MessageId: "MESSAGE_ID",
};

sqsMock.on(SendMessageCommand).resolves(RESPONSE);

// WHEN
const client = new QueueClient(QUEUE_URL);
const response = await client.push(...MESSAGES);

// THEN
expect(response).toEqual(undefined);
expect(sqsMock).toHaveReceivedCommandTimes(SendMessageCommand, 3);
expect(sqsMock).toHaveReceivedNthCommandWith(1, SendMessageCommand, {
QueueUrl: QUEUE_URL,
MessageBody: MESSAGES[0],
});
expect(sqsMock).toHaveReceivedNthCommandWith(2, SendMessageCommand, {
QueueUrl: QUEUE_URL,
MessageBody: MESSAGES[1],
});
expect(sqsMock).toHaveReceivedNthCommandWith(3, SendMessageCommand, {
QueueUrl: QUEUE_URL,
MessageBody: MESSAGES[2],
});
});

test("purge - happy path", async () => {
// GIVEN
const QUEUE_URL = "QUEUE_URL";
Expand Down
13 changes: 6 additions & 7 deletions libs/wingsdk/test/target-sim/__snapshots__/queue.test.ts.snap
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ exports[`messages are not requeued if the function fails after retention timeout
"wingsdk.cloud.Queue created.",
"wingsdk.sim.EventMapping created.",
"Invoke (payload=\\"{\\\\\\"messages\\\\\\":[]}\\").",
"Push (message=BAD MESSAGE).",
"Push (messages=BAD MESSAGE).",
"Sending messages (messages=[\\"BAD MESSAGE\\"], subscriber=sim-1).",
"Invoke (payload=\\"{\\\\\\"messages\\\\\\":[\\\\\\"BAD MESSAGE\\\\\\"]}\\").",
"Subscriber error - returning 1 messages to queue: ERROR",
Expand Down Expand Up @@ -310,7 +310,7 @@ exports[`messages are not requeued if the function fails before timeout 1`] = `
"wingsdk.cloud.Queue created.",
"wingsdk.sim.EventMapping created.",
"Invoke (payload=\\"{\\\\\\"messages\\\\\\":[]}\\").",
"Push (message=BAD MESSAGE).",
"Push (messages=BAD MESSAGE).",
"Sending messages (messages=[\\"BAD MESSAGE\\"], subscriber=sim-1).",
"Invoke (payload=\\"{\\\\\\"messages\\\\\\":[\\\\\\"BAD MESSAGE\\\\\\"]}\\").",
"Subscriber error - returning 1 messages to queue: ERROR",
Expand Down Expand Up @@ -542,7 +542,7 @@ exports[`messages are requeued if the function fails after timeout 1`] = `
"wingsdk.cloud.Queue created.",
"wingsdk.sim.EventMapping created.",
"Invoke (payload=\\"{\\\\\\"messages\\\\\\":[]}\\").",
"Push (message=BAD MESSAGE).",
"Push (messages=BAD MESSAGE).",
"Sending messages (messages=[\\"BAD MESSAGE\\"], subscriber=sim-1).",
"Invoke (payload=\\"{\\\\\\"messages\\\\\\":[\\\\\\"BAD MESSAGE\\\\\\"]}\\").",
"Subscriber error - returning 1 messages to queue: ERROR",
Expand Down Expand Up @@ -775,8 +775,8 @@ exports[`queue batch size of 2, purge the queue 1`] = `
[
"wingsdk.cloud.TestRunner created.",
"wingsdk.cloud.Queue created.",
"Push (message=A).",
"Push (message=B).",
"Push (messages=A).",
"Push (messages=B).",
"ApproxSize ().",
"Purge ().",
"ApproxSize ().",
Expand Down Expand Up @@ -1225,8 +1225,7 @@ exports[`queue with one subscriber, default batch size of 1 1`] = `
"wingsdk.cloud.Function created.",
"wingsdk.cloud.Queue created.",
"wingsdk.sim.EventMapping created.",
"Push (message=A).",
"Push (message=B).",
"Push (messages=A,B).",
"Sending messages (messages=[\\"A\\"], subscriber=sim-1).",
"Sending messages (messages=[\\"B\\"], subscriber=sim-1).",
"wingsdk.sim.EventMapping deleted.",
Expand Down
7 changes: 3 additions & 4 deletions libs/wingsdk/test/target-sim/queue.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ test("queue with one subscriber, default batch size of 1", async () => {
const queueClient = s.getResource("/my_queue") as cloud.IQueueClient;

// WHEN
await queueClient.push("A");
await queueClient.push("B");
await queueClient.push("A", "B");

// TODO: queueClient.awaitMessages(2) or queueClient.untilEmpty() or something
await sleep(200);
Expand Down Expand Up @@ -204,7 +203,7 @@ test("messages are not requeued if the function fails before timeout", async ()
.map((trace) => trace.data.message)
).toEqual([
"wingsdk.cloud.Queue created.",
"Push (message=BAD MESSAGE).",
"Push (messages=BAD MESSAGE).",
'Sending messages (messages=["BAD MESSAGE"], subscriber=sim-1).',
"Subscriber error - returning 1 messages to queue: ERROR",
"wingsdk.cloud.Queue deleted.",
Expand Down Expand Up @@ -247,7 +246,7 @@ test("messages are not requeued if the function fails after retention timeout",
.map((trace) => trace.data.message)
).toEqual([
"wingsdk.cloud.Queue created.",
"Push (message=BAD MESSAGE).",
"Push (messages=BAD MESSAGE).",
'Sending messages (messages=["BAD MESSAGE"], subscriber=sim-1).',
"Subscriber error - returning 1 messages to queue: ERROR",
"wingsdk.cloud.Queue deleted.",
Expand Down
Loading