Skip to content

Commit

Permalink
Merge pull request #63 from Storm8719/assert-exchange-before-publish-…
Browse files Browse the repository at this point in the history
…opts

Add options for enable one-time assertExchange calling
  • Loading branch information
icebob authored Mar 1, 2023
2 parents 88e1cb8 + cba1805 commit 8f248f0
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 15 deletions.
63 changes: 48 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -255,20 +255,22 @@ Use the `broker.sendToChannel(channelName, payload, opts)` method to send a mess

### Method options

| Name | Type | Supported adapters | Description |
| --------------- | --------- | ----------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `raw` | `Boolean` | \* | If truthy, the payload won't be serialized. |
| `persistent` | `Boolean` | AMQP | If truthy, the message will survive broker restarts provided it’s in a queue that also survives restarts. |
| `ttl` | `Number` | AMQP | If supplied, the message will be discarded from a queue once it’s been there longer than the given number of milliseconds. |
| `priority` | `Number` | AMQP | Priority of the message. |
| `correlationId` | `String` | AMQP | Request identifier. |
| `headers` | `Object` | AMQP, JetStream, Kafka, Redis | Application specific headers to be carried along with the message content. |
| `routingKey` | `Object` | AMQP | The AMQP `publish` method's second argument. If you want to send the message into an external queue instead of exchange, set the `channelName` to `""` and set the queue name to `routingKey` |
| `key` | `String` | Kafka | Key of Kafka message. |
| `partition` | `String` | Kafka | Partition of Kafka message. |
| `acks` | `Number` | Kafka | Control the number of required acks. |
| `timeout` | `Number` | Kafka | The time to await a response in ms. Default: `30000` |
| `compression` | `any` | Kafka | Compression codec. Default: `CompressionTypes.None` |
| Name | Type | Supported adapters | Description |
| --------------------------------------- | -------------------- | ----------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `raw` | `Boolean` | \* | If truthy, the payload won't be serialized. |
| `persistent` | `Boolean` | AMQP | If truthy, the message will survive broker restarts provided it’s in a queue that also survives restarts. |
| `ttl` | `Number` | AMQP | If supplied, the message will be discarded from a queue once it’s been there longer than the given number of milliseconds. |
| `priority` | `Number` | AMQP | Priority of the message. |
| `correlationId` | `String` | AMQP | Request identifier. |
| `headers` | `Object` | AMQP, JetStream, Kafka, Redis | Application specific headers to be carried along with the message content. |
| `routingKey` | `Object` | AMQP | The AMQP `publish` method's second argument. If you want to send the message into an external queue instead of exchange, set the `channelName` to `""` and set the queue name to `routingKey` |
| `publishAssertExchange.enabled` | `Boolean` | AMQP | Enable/disable calling once `channel.assertExchange()` before first publishing in new exchange by `sendToChannel()` |
| `publishAssertExchange.exchangeOptions` | `Object` | AMQP | AMQP lib exchange configuration when publishAssertExchange enabled |
| `key` | `String` | Kafka | Key of Kafka message. |
| `partition` | `String` | Kafka | Partition of Kafka message. |
| `acks` | `Number` | Kafka | Control the number of required acks. |
| `timeout` | `Number` | Kafka | The time to await a response in ms. Default: `30000` |
| `compression` | `any` | Kafka | Compression codec. Default: `CompressionTypes.None` |

## Middleware hooks

Expand Down Expand Up @@ -466,6 +468,8 @@ broker.createService({
| `amqp.exchangeOptions` | `Object` | `null` | AMQP | AMQP lib exchange configuration. More info [here](http://www.squaremobius.net/amqp.node/channel_api.html#channel_assertExchange). |
| `amqp.messageOptions` | `Object` | `null` | AMQP | AMQP lib message configuration. More info [here](http://www.squaremobius.net/amqp.node/channel_api.html#channel_publish). |
| `amqp.consumerOptions` | `Object` | `null` | AMQP | AMQP lib consume configuration. More info [here](http://www.squaremobius.net/amqp.node/channel_api.html#channel_consume). |
| `amqp.publishAssertExchange.enabled` | `Boolean` | `false` | AMQP | Enable/disable calling once `channel.assertExchange()` before first publishing in new exchange by `sendToChannel()`. More info [here](http://www.squaremobius.net/amqp.node/channel_api.html#channel_assertExchange). |
| `amqp.publishAssertExchange.exchangeOptions` | `Object` | `null` | AMQP | AMQP lib exchange configuration. More info [here](http://www.squaremobius.net/amqp.node/channel_api.html#channel_assertExchange). |
| `nats.streamConfig` | `Object` | `null` | NATS | NATS JetStream storage configuration. More info [here](https://docs.nats.io/jetstream/concepts/streams). |
| `nats.consumerOptions` | `Object` | `null` | NATS | NATS JetStream consumer configuration. More info [here](https://docs.nats.io/jetstream/concepts/consumers). |
| `kafka.brokers` | `String[]` | `null` | Kafka | Kafka bootstrap brokers. |
Expand Down Expand Up @@ -661,7 +665,14 @@ module.exports = {
// Options for `channel.publish()`
messageOptions: {},
// Options for `channel.consume()`
consumerOptions: {}
consumerOptions: {},
// Note: options for `channel.assertExchange()` before first publishing in new exchange
publishAssertExchange: {
// Enable/disable calling once `channel.assertExchange()` before first publishing in new exchange by `sendToChannel`
enabled: false,
// Options for `channel.assertExchange()` before publishing by `sendToChannel`
exchangeOptions: {}
},
},
maxInFlight: 10,
maxRetries: 3,
Expand All @@ -677,6 +688,28 @@ module.exports = {
};
```

**Example Producing messages with options**

```js
broker.sendToChannel("order.created", {
id: 1234,
items: [
/*...*/
]
},{
// Using specific `assertExchange()` options only for the current sending case
publishAssertExchange:{
// Enable/disable calling once `channel.assertExchange()` before first publishing in new exchange by `sendToChannel`
enabled: true,
// Options for `channel.assertExchange()` before publishing
exchangeOptions: {
/*...*/
}
},
});
```
> Note: If you know that the exchange will be created before `sendToChannel` is called by someone else, then it is better to skip `publishAssertExchange` option
### Kafka

The Kafka adapter uses Apache Kafka topics.
Expand Down
32 changes: 32 additions & 0 deletions src/adapters/amqp.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ let Amqplib;
* @property {Object} amqp.exchangeOptions AMQP lib exchange configuration
* @property {Object} amqp.messageOptions AMQP lib message configuration
* @property {Object} amqp.consumerOptions AMQP lib consume configuration
* @property {publishAssertExchange} amqp.publishAssertExchange AMQP lib exchange configuration for one-time calling assertExchange() before publishing in new exchange by sendToChannel
*/

/**
* @typedef {Object} publishAssertExchange
* @property {Boolean} enabled Enable/disable one-time calling channel.assertExchange() before publishing in new exchange by sendToChannel
* @property {Object} exchangeOptions AMQP lib exchange configuration https://amqp-node.github.io/amqplib/channel_api.html#channel_assertExchange
*/

/**
Expand Down Expand Up @@ -98,6 +105,10 @@ class AmqpAdapter extends BaseAdapter {
this.stopping = false;
this.connectAttempt = 0;
this.connectionCount = 0; // To detect reconnections
/**
* @type {Set<string>}
*/
this.assertedExchanges = new Set(); // For a collecting exchange names on which assetExchange() was called
}

/**
Expand Down Expand Up @@ -254,6 +265,7 @@ class AmqpAdapter extends BaseAdapter {
} catch (err) {
this.logger.error("Error while closing AMQP connection.", err);
}
this.assertedExchanges.clear();
this.stopping = false;
}

Expand Down Expand Up @@ -533,6 +545,26 @@ class AmqpAdapter extends BaseAdapter {
);

const data = opts.raw ? payload : this.serializer.serialize(payload);

const publishAssertExchange = _.defaultsDeep(
opts.publishAssertExchange,
this.opts.amqp.publishAssertExchange,
{
enabled: false,
exchangeOptions: {}
}
);

if (publishAssertExchange.enabled && !this.assertedExchanges.has(channelName)) {
this.logger.debug(`Asserting exchange ${channelName}`);
this.assertedExchanges.add(channelName);
await this.channel.assertExchange(
channelName,
"fanout",
publishAssertExchange.exchangeOptions
);
}

const res = this.channel.publish(channelName, opts.routingKey || "", data, messageOptions);
if (res === false) throw new MoleculerError("AMQP publish error. Write buffer is full.");
this.logger.debug(`Message was published at '${channelName}'`);
Expand Down

0 comments on commit 8f248f0

Please sign in to comment.