From 42837c18cf8d82bb9e5053d5d71f4fcb9f8124fb Mon Sep 17 00:00:00 2001 From: Icebob Date: Sun, 23 Apr 2023 17:25:17 +0200 Subject: [PATCH] support Redis capped streams --- .vscode/launch.json | 4 ++-- README.md | 18 ++++++++++++++++++ examples/simple/index.js | 4 ++-- src/adapters/redis.js | 22 +++++++++++++++++----- 4 files changed, 39 insertions(+), 9 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index 68b327f..27f86cf 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -8,11 +8,11 @@ { "type": "node", "request": "launch", - "name": "Launch selected demo", + "name": "Launch demo", "program": "examples/index.js", "cwd": "${workspaceRoot}", "args": [ - "context" + "simple" ], "console": "integratedTerminal", "env": { diff --git a/README.md b/README.md index 6b4609f..4178832 100644 --- a/README.md +++ b/README.md @@ -271,6 +271,7 @@ Use the `broker.sendToChannel(channelName, payload, opts)` method to send a mess | `acks` | `Number` | Kafka | Control the number of required acks. | | `timeout` | `Number` | Kafka | The time to await a response in ms. Default: `30000` | | `compression` | `any` | Kafka | Compression codec. Default: `CompressionTypes.None` | +| `xaddMaxLen` | `Number` or `String` | Redis | Define `MAXLEN` for `XADD` command | ## Middleware hooks @@ -621,6 +622,23 @@ module.exports = { }; ``` +#### Capped Streams + +To support Redis ["capped streams"](https://redis.io/docs/data-types/streams-tutorial/#capped-streams), you can define the `MAXLEN` value in `sendToChannel` options as `xaddMaxLen`. It can be a number or a string with `~` prefix like `~1000`. It will be transformed to `...MAXLEN ~ 1000 ...` + +**Example** + +```js +broker.sendToChannel("order.created", { + id: 1234, + items: [ + /*...*/ + ] +},{ + xaddMaxLen: "~1000" +}); +``` + ### AMQP (RabbitMQ) The AMQP adapter uses the exchange-queue logic of RabbitMQ for creating consumer groups. It means the `sendToChannel` method sends the message to the exchange and not for a queue. diff --git a/examples/simple/index.js b/examples/simple/index.js index 106ec36..7bbd20d 100644 --- a/examples/simple/index.js +++ b/examples/simple/index.js @@ -14,7 +14,7 @@ const broker = new ServiceBroker({ }, middlewares: [ ChannelsMiddleware({ - adapter: process.env.ADAPTER || "Fake" + adapter: process.env.ADAPTER || "redis://localhost:6379" }) ], replCommands: [ @@ -33,7 +33,7 @@ const broker = new ServiceBroker({ count: ++c, pid: process.pid }, - { key: "" + c, headers: { a: "something" } } + { key: "" + c, headers: { a: "something" }, xaddMaxLen: "~10" } ); } }, diff --git a/src/adapters/redis.js b/src/adapters/redis.js index d2956e9..3f24754 100644 --- a/src/adapters/redis.js +++ b/src/adapters/redis.js @@ -688,15 +688,27 @@ class RedisAdapter extends BaseAdapter { try { let args = [ - channelName, // Stream name - "*", // Auto ID - "payload", // Entry - opts.raw ? payload : this.serializer.serialize(payload) // Actual payload + channelName // Stream name ]; + // Support Capped Streams. More info: https://redis.io/docs/data-types/streams-tutorial/#capped-streams + if (opts && opts.xaddMaxLen) { + const maxLen = "" + opts.xaddMaxLen; + if (maxLen.startsWith("~")) { + args.push("MAXLEN", "~", maxLen.substring(1)); + } else { + args.push("MAXLEN", maxLen); + } + } + + // Auto ID + args.push("*"); + // Add payload + args.push("payload", opts.raw ? payload : this.serializer.serialize(payload)); + // Add headers if (opts.headers) { - args.push(...["headers", this.serializer.serialize(opts.headers)]); + args.push("headers", this.serializer.serialize(opts.headers)); } // https://redis.io/commands/XADD