Skip to content

Commit

Permalink
fix(sdk): retentionPeriod doesn't work for Queue in target sim (#4273)
Browse files Browse the repository at this point in the history
This fixes the issue #4233

Fixed comparing Duration instances, so the constructor throws a compilation error when the `retentionPeriod` is lower than the `timeout`.
Added iterating over the messages and removing expired ones on each `processMessages` call.

## Checklist

- [x] Title matches [Winglang's style guide](https://www.winglang.io/contributing/start-here/pull_requests#how-are-pull-request-titles-formatted)
- [x] Description explains motivation and solution
- [x] Tests added (always)
- [x] Docs updated (only required for features)
- [ ] Added `pr/e2e-full` label if this feature requires end-to-end testing

*By submitting this pull request, I confirm that my contribution is made under the terms of the [Wing Cloud Contribution License](https://github.com/winglang/wing/blob/main/CONTRIBUTION_LICENSE.md)*.
  • Loading branch information
gcfbn authored Sep 26, 2023
1 parent 9be4c9d commit 6a18060
Show file tree
Hide file tree
Showing 9 changed files with 204 additions and 2 deletions.
15 changes: 15 additions & 0 deletions examples/tests/sdk_tests/queue/retention_period.main.w
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
bring cloud;
bring util;

let var timeout = 100ms;
let var retentionPeriod = 1s;

let q = new cloud.Queue(timeout: timeout, retentionPeriod: retentionPeriod);

test "retentionPeriod" {
q.push("hello", "world");

assert(util.waitUntil(() => {
return q.approxSize() == 0;
}));
}
10 changes: 10 additions & 0 deletions examples/tests/sdk_tests/std/duration.test.w
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ test "duration" {
assert(1s.days == 1 / (60 * 60 * 24));
assert(1s.months == 1 / ((60 * 60 * 24 * 365) / 12));
assert(1s.years == 1 / (60 * 60 * 24 * 365));

assert(1s == 1000ms);
assert(60s == 1m);
assert(60m == 1h);
assert(24h == 1d);
assert(365d == 1y);
assert(12mo == 1y);
assert(3600s == 1h);
assert(86400s == 1d);
assert(31536000s == 1y);
}

// TODO: https://github.com/winglang/wing/issues/2785
10 changes: 10 additions & 0 deletions libs/wingsdk/src/target-sim/queue.inflight.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,16 @@ export class Queue
let processedMessages = false;
do {
processedMessages = false;
// Remove messages that have expired
const currentTime = new Date();
this.messages.forEach(async (message, index) => {
if (message.retentionTimeout < currentTime) {
await this.context.withTrace({
activity: async () => this.messages.splice(index, 1),
message: `Removing expired message (message=${message.payload}).`,
});
}
});
// Randomize the order of subscribers to avoid user code making
// assumptions on the order that subscribers process messages.
for (const subscriber of new RandomArrayIterator(this.subscribers)) {
Expand Down
2 changes: 1 addition & 1 deletion libs/wingsdk/src/target-sim/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export class Queue extends cloud.Queue implements ISimulatorResource {
this.timeout = props.timeout ?? Duration.fromSeconds(10);
this.retentionPeriod = props.retentionPeriod ?? Duration.fromHours(1);

if (this.retentionPeriod < this.timeout) {
if (this.retentionPeriod.seconds < this.timeout.seconds) {
throw new Error(
"Retention period must be greater than or equal to timeout"
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ exports[`messages are not requeued if the function fails after retention timeout
"Sending messages (messages=[\\"BAD MESSAGE\\"], subscriber=sim-1).",
"Invoke (payload=\\"{\\\\\\"messages\\\\\\":[\\\\\\"BAD MESSAGE\\\\\\"]}\\").",
"Subscriber error - returning 1 messages to queue: ERROR",
"1 messages pushed back to queue after visibility timeout.",
"wingsdk.sim.EventMapping deleted.",
"wingsdk.cloud.Queue deleted.",
"wingsdk.cloud.Function deleted.",
Expand Down Expand Up @@ -153,7 +154,7 @@ async handle(message) {
"path": "root/my_queue",
"props": {
"retentionPeriod": 1,
"timeout": 10,
"timeout": 0.1,
},
"type": "wingsdk.cloud.Queue",
},
Expand Down
17 changes: 17 additions & 0 deletions libs/wingsdk/test/target-sim/queue.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,21 @@ async handle(message) {
}
}`;

test("try to create a queue with invalid retention period", async () => {
// GIVEN
const app = new SimApp();
const retentionPeriod = Duration.fromSeconds(5);
const timeout = Duration.fromSeconds(10);

// THEN
expect(() => {
cloud.Queue._newQueue(app, "my_queue", {
retentionPeriod,
timeout,
});
}).toThrowError("Retention period must be greater than or equal to timeout");
});

test("create a queue", async () => {
// GIVEN
const app = new SimApp();
Expand Down Expand Up @@ -232,6 +247,7 @@ test("messages are not requeued if the function fails after retention timeout",
const handler = Testing.makeHandler(app, "Handler", INFLIGHT_CODE);
const queue = cloud.Queue._newQueue(app, "my_queue", {
retentionPeriod: Duration.fromSeconds(1),
timeout: Duration.fromMilliseconds(100),
});
queue.setConsumer(handler);
const s = await app.startSimulator();
Expand Down Expand Up @@ -263,6 +279,7 @@ test("messages are not requeued if the function fails after retention timeout",
"Push (messages=BAD MESSAGE).",
"Sending messages (messages=[\\"BAD MESSAGE\\"], subscriber=sim-1).",
"Subscriber error - returning 1 messages to queue: ERROR",
"1 messages pushed back to queue after visibility timeout.",
"wingsdk.cloud.Queue deleted.",
]
`);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
# [retention_period.main.w](../../../../../../examples/tests/sdk_tests/queue/retention_period.main.w) | compile | tf-aws

## inflight.$Closure1-1.js
```js
module.exports = function({ $q, $util_Util }) {
class $Closure1 {
constructor({ }) {
const $obj = (...args) => this.handle(...args);
Object.setPrototypeOf($obj, this);
return $obj;
}
async handle() {
(await $q.push("hello","world"));
{((cond) => {if (!cond) throw new Error("assertion failed: util.waitUntil(() => {\n return q.approxSize() == 0;\n })")})((await $util_Util.waitUntil(async () => {
return (((a,b) => { try { return require('assert').deepStrictEqual(a,b) === undefined; } catch { return false; } })((await $q.approxSize()),0));
}
)))};
}
}
return $Closure1;
}

```

## main.tf.json
```json
{
"//": {
"metadata": {
"backend": "local",
"stackName": "root",
"version": "0.17.0"
},
"outputs": {
"root": {
"Default": {
"cloud.TestRunner": {
"TestFunctionArns": "WING_TEST_RUNNER_FUNCTION_ARNS"
}
}
}
}
},
"output": {
"WING_TEST_RUNNER_FUNCTION_ARNS": {
"value": "[]"
}
},
"provider": {
"aws": [
{}
]
},
"resource": {
"aws_sqs_queue": {
"cloudQueue": {
"//": {
"metadata": {
"path": "root/Default/Default/cloud.Queue/Default",
"uniqueId": "cloudQueue"
}
},
"message_retention_seconds": 1,
"name": "cloud-Queue-c86e03d8",
"visibility_timeout_seconds": 0.1
}
}
}
}
```

## preflight.js
```js
const $stdlib = require('@winglang/sdk');
const $plugins = ((s) => !s ? [] : s.split(';'))(process.env.WING_PLUGIN_PATHS);
const $outdir = process.env.WING_SYNTH_DIR ?? ".";
const $wing_is_test = process.env.WING_IS_TEST === "true";
const std = $stdlib.std;
const cloud = $stdlib.cloud;
const util = $stdlib.util;
class $Root extends $stdlib.std.Resource {
constructor(scope, id) {
super(scope, id);
class $Closure1 extends $stdlib.std.Resource {
constructor(scope, id, ) {
super(scope, id);
(std.Node.of(this)).hidden = true;
}
static _toInflightType(context) {
return `
require("./inflight.$Closure1-1.js")({
$q: ${context._lift(q)},
$util_Util: ${context._lift($stdlib.core.toLiftableModuleType(util.Util, "@winglang/sdk/util", "Util"))},
})
`;
}
_toInflight() {
return `
(await (async () => {
const $Closure1Client = ${$Closure1._toInflightType(this)};
const client = new $Closure1Client({
});
if (client.$inflight_init) { await client.$inflight_init(); }
return client;
})())
`;
}
_getInflightOps() {
return ["handle", "$inflight_init"];
}
_registerBind(host, ops) {
if (ops.includes("handle")) {
$Closure1._registerBindObject(q, host, ["approxSize", "push"]);
}
super._registerBind(host, ops);
}
}
let timeout = (std.Duration.fromSeconds(0.1));
let retentionPeriod = (std.Duration.fromSeconds(1));
const q = this.node.root.newAbstract("@winglang/sdk.cloud.Queue",this,"cloud.Queue",{ timeout: timeout, retentionPeriod: retentionPeriod });
this.node.root.new("@winglang/sdk.std.Test",std.Test,this,"test:retentionPeriod",new $Closure1(this,"$Closure1"));
}
}
const $App = $stdlib.core.App.for(process.env.WING_TARGET);
new $App({ outdir: $outdir, name: "retention_period.main", rootConstruct: $Root, plugins: $plugins, isTestEnvironment: $wing_is_test, entrypointDir: process.env['WING_SOURCE_DIR'], rootId: process.env['WING_ROOT_ID'] }).synth();

```
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# [retention_period.main.w](../../../../../../examples/tests/sdk_tests/queue/retention_period.main.w) | test | sim

## stdout.log
```log
pass ─ retention_period.main.wsim » root/env0/test:retentionPeriod
Tests 1 passed (1)
Test Files 1 passed (1)
Duration <DURATION>
```

Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ module.exports = function({ $std_Duration }) {
{((cond) => {if (!cond) throw new Error("assertion failed: 1s.days == 1 / (60 * 60 * 24)")})((((a,b) => { try { return require('assert').deepStrictEqual(a,b) === undefined; } catch { return false; } })((await $std_Duration.fromSeconds(1)).days,(1 / ((60 * 60) * 24)))))};
{((cond) => {if (!cond) throw new Error("assertion failed: 1s.months == 1 / ((60 * 60 * 24 * 365) / 12)")})((((a,b) => { try { return require('assert').deepStrictEqual(a,b) === undefined; } catch { return false; } })((await $std_Duration.fromSeconds(1)).months,(1 / ((((60 * 60) * 24) * 365) / 12)))))};
{((cond) => {if (!cond) throw new Error("assertion failed: 1s.years == 1 / (60 * 60 * 24 * 365)")})((((a,b) => { try { return require('assert').deepStrictEqual(a,b) === undefined; } catch { return false; } })((await $std_Duration.fromSeconds(1)).years,(1 / (((60 * 60) * 24) * 365)))))};
{((cond) => {if (!cond) throw new Error("assertion failed: 1s == 1000ms")})((((a,b) => { try { return require('assert').deepStrictEqual(a,b) === undefined; } catch { return false; } })((await $std_Duration.fromSeconds(1)),(await $std_Duration.fromSeconds(1)))))};
{((cond) => {if (!cond) throw new Error("assertion failed: 60s == 1m")})((((a,b) => { try { return require('assert').deepStrictEqual(a,b) === undefined; } catch { return false; } })((await $std_Duration.fromSeconds(60)),(await $std_Duration.fromSeconds(60)))))};
{((cond) => {if (!cond) throw new Error("assertion failed: 60m == 1h")})((((a,b) => { try { return require('assert').deepStrictEqual(a,b) === undefined; } catch { return false; } })((await $std_Duration.fromSeconds(3600)),(await $std_Duration.fromSeconds(3600)))))};
{((cond) => {if (!cond) throw new Error("assertion failed: 24h == 1d")})((((a,b) => { try { return require('assert').deepStrictEqual(a,b) === undefined; } catch { return false; } })((await $std_Duration.fromSeconds(86400)),(await $std_Duration.fromSeconds(86400)))))};
{((cond) => {if (!cond) throw new Error("assertion failed: 365d == 1y")})((((a,b) => { try { return require('assert').deepStrictEqual(a,b) === undefined; } catch { return false; } })((await $std_Duration.fromSeconds(31536000)),(await $std_Duration.fromSeconds(31536000)))))};
{((cond) => {if (!cond) throw new Error("assertion failed: 12mo == 1y")})((((a,b) => { try { return require('assert').deepStrictEqual(a,b) === undefined; } catch { return false; } })((await $std_Duration.fromSeconds(31536000)),(await $std_Duration.fromSeconds(31536000)))))};
{((cond) => {if (!cond) throw new Error("assertion failed: 3600s == 1h")})((((a,b) => { try { return require('assert').deepStrictEqual(a,b) === undefined; } catch { return false; } })((await $std_Duration.fromSeconds(3600)),(await $std_Duration.fromSeconds(3600)))))};
{((cond) => {if (!cond) throw new Error("assertion failed: 86400s == 1d")})((((a,b) => { try { return require('assert').deepStrictEqual(a,b) === undefined; } catch { return false; } })((await $std_Duration.fromSeconds(86400)),(await $std_Duration.fromSeconds(86400)))))};
{((cond) => {if (!cond) throw new Error("assertion failed: 31536000s == 1y")})((((a,b) => { try { return require('assert').deepStrictEqual(a,b) === undefined; } catch { return false; } })((await $std_Duration.fromSeconds(31536000)),(await $std_Duration.fromSeconds(31536000)))))};
}
}
return $Closure1;
Expand Down

0 comments on commit 6a18060

Please sign in to comment.