Skip to content

Commit

Permalink
Merge branch 'develop' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
Tai NA committed Apr 2, 2021
2 parents f26902e + 00c52e7 commit 8238fc1
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 12 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ broker.createService({
# Examples

Take a look at [examples](examples) folder for more examples
- [Simple example](examples/simple) : Basic example
- [Local publisher example](examples/localPublisher) : Example with local publisher (allow publisher to create task event when consumer services is offline). Warning: if there are queue configuration difference between publisher and consumer, the queue configuration will be set follow the first one started. Will improve this in future update or please make a PR if you wanna.

# Roadmap

Expand Down
61 changes: 61 additions & 0 deletions examples/localPublisher/consumer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
const { ServiceBroker } = require("moleculer");
const QueueMixin = require("../../index");

let broker = new ServiceBroker({
logger: console,
transporter: "TCP",
});

const queueMixin = QueueMixin({
connection: "amqp://localhost",
asyncActions: true, // Enable auto generate .async version for actions
localPublisher: false, // Enable/Disable call this.actions.callAsync to call remote async
});

broker.createService({
name: "consumer",
version: 1,

mixins: [
queueMixin,
],

settings: {
amqp: {
connection: "amqp://localhost", // You can also override setting from service setting
},
},

actions: {
hello: {
queue: { // Enable queue for this action
// Options for AMQP queue
channel: {
assert: {
durable: true,
},
prefetch: 0,
},
consume: {
noAck: false,
},
},
params: {
name: "string|convert:true|empty:false",
},
async handler(ctx) {
this.logger.info(`[CONSUMER] PID: ${process.pid} Received job with name=${ctx.params.name}`);
return new Promise((resolve) => {
setTimeout(() => {
this.logger.info(`[CONSUMER] PID: ${process.pid} Processed job with name=${ctx.params.name}`);
return resolve(`hello ${ctx.params.name}`);
}, 1000);
});
},
},
},
});

broker.start().then(() => {
broker.repl();
});
54 changes: 54 additions & 0 deletions examples/localPublisher/publisher.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
const { ServiceBroker } = require("moleculer");
const QueueMixin = require("../../index");

let broker = new ServiceBroker({
logger: console,
transporter: "TCP",
});

const queueMixin = QueueMixin({
connection: "amqp://localhost",
asyncActions: true, // Enable auto generate .async version for actions
localPublisher: true, // Enable/Disable call this.actions.callAsync to call remote async
});

broker.createService({
name: "publisher",
version: 1,

mixins: [
queueMixin,
],

settings: {
amqp: {
connection: "amqp://localhost", // You can also override setting from service setting
},
},

async started() {
// await broker.waitForServices({ name: "consumer", version: 1 });

let name = 1;
setInterval(async () => {
const response = await this.actions.callAsync({
// remote async action name
action: "v1.consumer.hello",
// `params` is the real param will be passed to original action
params: {
name,
},
// `options` is the real options will be passed to original action
options: {
timeout: 2000,
},
});
this.logger.info(`[PUBLISHER] PID: ${process.pid} Called job with name=${name} response=${JSON.stringify(response)}`);
name++;
}, 2000);
}
});

broker.start().then(() => {
broker.repl();
});
1 change: 1 addition & 0 deletions examples/simple/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const queueMixin = QueueMixin({

broker.createService({
name: "consumer",
version: 1,

mixins: [
queueMixin,
Expand Down
5 changes: 3 additions & 2 deletions examples/simple/publisher.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const queueMixin = QueueMixin({

broker.createService({
name: "publisher",
version: 1,

mixins: [
queueMixin,
Expand All @@ -26,11 +27,11 @@ broker.createService({
},

async started() {
await broker.waitForServices("consumer");
await broker.waitForServices({ name: "consumer", version: 1 });

let name = 1;
setInterval(async () => {
const response = await broker.call("consumer.hello.async", {
const response = await broker.call("v1.consumer.hello.async", {
// `params` is the real param will be passed to original action
params: {
name,
Expand Down
6 changes: 3 additions & 3 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "moleculer-rabbitmq",
"version": "0.1.0",
"version": "0.1.1",
"description": "Moleculer RabbitMQ queue plugin",
"main": "index.js",
"scripts": {
Expand Down
18 changes: 12 additions & 6 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ const ACTION_OPTIONS_VALIDATOR = {
const initAMQPQueues = function (schema) {
Object.keys(schema.actions || {}).forEach((originActionName) => {
if (schema.actions[originActionName] && schema.actions[originActionName].queue) {
const queueName = `amqp.${schema.name}.${originActionName}`;
const queueName = `amqp.${schema.version ? `v${schema.version}.` : ""}${schema.name}.${originActionName}`;

const queueOption = Lodash.defaultsDeep({}, schema.actions[originActionName].queue, DEFAULT_QUEUE_OPTIONS);

Expand All @@ -55,7 +55,7 @@ const initAMQPQueues = function (schema) {
return channel.reject(msg, false);
}

const actionName = `${this.name}.${originActionName}`;
const actionName = `${this.version ? `v${this.version}.` : ""}${this.name}.${originActionName}`;
const actionParams = messageData.params || {};
const actionOptions = Lodash.defaultsDeep({}, messageData.options);

Expand All @@ -79,6 +79,10 @@ const initAMQPQueues = function (schema) {
};

const initAMQPActions = function (schema) {
if (!schema.actions) {
schema.actions = {};
}

if (schema.settings.amqp.localPublisher) {
schema.actions.callAsync = {
visibility: "private",
Expand All @@ -94,7 +98,9 @@ const initAMQPActions = function (schema) {
retries: 2,
},
async handler(ctx) {
return this.sendAMQPMessage(ctx.action, {
const queueName = `amqp.${ctx.params.action}`;

return this.sendAMQPMessage(queueName, {
params: ctx.params.params,
options: ctx.params.options,
}, {
Expand All @@ -108,9 +114,9 @@ const initAMQPActions = function (schema) {
}

if (schema.settings.amqp.asyncActions) {
Object.keys(schema.actions || {}).forEach((actionName) => {
Object.keys(schema.actions).forEach((actionName) => {
if (schema.actions[actionName] && schema.actions[actionName].queue) {
const queueName = `amqp.${schema.name}.${actionName}`;
const queueName = `amqp.${schema.version ? `v${schema.version}.` : ""}${schema.name}.${actionName}`;

const asyncParams = {
options: ACTION_OPTIONS_VALIDATOR,
Expand Down Expand Up @@ -225,7 +231,7 @@ module.exports = (options) => ({

async started() {
if (!this.settings.amqp || !this.settings.amqp.connection) {
this.logger.warn(`${this.name} is disabled because of empty "amqp.connection" setting`);
this.logger.warn(`${this.version ? `v${this.version}.` : ""}${this.name} is disabled because of empty "amqp.connection" setting`);
}

if (!["string", "object"].includes(typeof this.settings.amqp.connection)) {
Expand Down

0 comments on commit 8238fc1

Please sign in to comment.