Skip to content

Commit

Permalink
Release v1.0.0
Browse files Browse the repository at this point in the history
  • Loading branch information
Tai NA committed Apr 15, 2021
2 parents 8238fc1 + 085b7dc commit 4412174
Show file tree
Hide file tree
Showing 14 changed files with 2,075 additions and 5,231 deletions.
15 changes: 15 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
## v1.0.0
- [Breaking Change]
- Queues in RabbitMQ created by v0.1.0 must be re-create because retry/deduplication feature cause queue configuration changed
- Rename configuration to make it more relevant with amqplib configuration, please follow README to validate your configuration
- queue.channel => queue.amqp
- queue.channel.assert => queue.amqp.queueAssert
- queue.consume => queue.amqp.consume
- [Add]
- Add retry feature by RabbitMQ requeue or using [rabbitmq-delayed-message-exchange](https://github.com/rabbitmq/rabbitmq-delayed-message-exchange) plugin
- Add deduplication feature using [rabbitmq-message-deduplication](https://github.com/noxdafox/rabbitmq-message-deduplication) plugin
- [Fix]
- Fix bug not apply queue configuration when assert queue

## v0.1.0
- First release
118 changes: 107 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ const QueueMixin = require("moleculer-rabbitmq");
const queueMixin = QueueMixin({
connection: "amqp://localhost",
asyncActions: true, // Enable auto generate .async version for actions
localPublisher: false, // Enable/Disable call this.actions.callAsync to call remote async
});

broker.createService({
Expand All @@ -37,15 +36,15 @@ broker.createService({
// Enable queue for this action
queue: {
// Options for AMQP queue
channel: {
assert: {
amqp: {
queueAssert: {
durable: true,
},
consume: {
noAck: false,
},
prefetch: 0,
},
consume: {
noAck: false,
},
},
params: {
name: "string|convert:true|empty:false",
Expand Down Expand Up @@ -76,7 +75,6 @@ const QueueMixin = require("moleculer-rabbitmq");
const queueMixin = QueueMixin({
connection: "amqp://localhost",
asyncActions: true, // Enable auto generate .async version for actions
localPublisher: false, // Enable/Disable call this.actions.callAsync to call remote async
});

broker.createService({
Expand Down Expand Up @@ -114,17 +112,115 @@ broker.createService({
});
```

# Retry failed jobs
By default, this plugin will not retry failed job. There are two option to enable retry logic: *RabbitMQ requeue* and using *[rabbitmq-delayed-message-exchange](https://github.com/rabbitmq/rabbitmq-delayed-message-exchange) plugin*

## RabbitMQ requeue
Set retry option to true when declare queue to enable Rabbitmq requeue.
Please note that the message will re requeue forever because *max_retry* is not available
```javascript
actions: {
hello: {
queue: {
retry: true, // Using rabbitmq default requeue logic
},
// ...
},
},
```

## Retry using *rabbitmq-delayed-message-exchange* plugins
**REQUIRE [rabbitmq-delayed-message-exchange](https://github.com/rabbitmq/rabbitmq-delayed-message-exchange) plugin to be install and enabled in RabbitMQ**

[Example RabbitMQ dockerfile](examples/rabbitmq/Dockerfile)

Example:
```javascript
actions: {
hello: {
queue: {
retryExchangeAssert: {
durable: true, // (boolean) if true, the exchange will survive broker restarts. Defaults to true.
autoDelete: false, // (boolean) if true, the exchange will be destroyed once the number of bindings for which it is the source drop to zero. Defaults to false.
alternateExchange: null, // (string) an exchange to send messages to if this exchange can’t route them to any queues.
arguments: { // additional arguments, usually parameters for some kind of broker-specific extension e.g., high availability, TTL
},
},
retry: {
max_retry: 3, // Max retry count, 3 mean if the first time failed, it will try 3 more times
delay: (retry_count) => { // Number of miliseconds delay between each retry, could be a number or a function(retry_count) that return a number
return retry_count * 1000;
},
},
},
// ...
},
},
```

# Deduplicate jobs
This plugin allow you to avoid dupplicate task using `dedupHash` option that can be number, string or function(ctx) that return a number or string
To get this feature to work, you have to install [rabbitmq-message-deduplication](https://github.com/noxdafox/rabbitmq-message-deduplication) plugin.
And please aware that using this feature may cause decrease in queue performance

[Example RabbitMQ dockerfile](examples/rabbitmq/Dockerfile)
[Example plugin usage with deduplication](examples/deduplication)

# Plugin Configuration

## Mixin configuration
```javascript
connection: "amqp://localhost", // (String|Object) Required. connection string or object, passed to amqplib.connect (You can also set this on broker.createService settings.amqp.connection parameter)
asyncActions: true, // (Boolean) Optional, default: false. Enable auto generate .async version for actions
```

## Action configuration
```javascript
queue: {
amqp: {
queueAssert: { // Options for job queue (Ref: http://www.squaremobius.net/amqp.node/channel_api.html#channel_assertQueue)
exclusive: false, // (boolean) if true, scopes the queue to the connection (defaults to false)
durable: true, // (boolean) if true, the queue will survive broker restarts, modulo the effects of exclusive and autoDelete; this defaults to true if not supplied, unlike the others
autoDelete: false, // (boolean) if true, the queue will be deleted when the number of consumers drops to zero (defaults to false)
arguments: { // additional arguments, usually parameters for some kind of broker-specific extension e.g., high availability, TTL
"x-message-deduplication": true, // Preserve for deduplication feature
},
},
retryExchangeAssert: { // Options for retry exchange (Ref: http://www.squaremobius.net/amqp.node/channel_api.html#channel_assertExchange)
durable: true, // (boolean) if true, the exchange will survive broker restarts. Defaults to true.
autoDelete: false, // (boolean) if true, the exchange will be destroyed once the number of bindings for which it is the source drop to zero. Defaults to false.
alternateExchange: null, // (string) an exchange to send messages to if this exchange can’t route them to any queues.
arguments: { // additional arguments, usually parameters for some kind of broker-specific extension e.g., high availability, TTL
"arguments.x-delayed-type": "direct", // Set by this plugin
"x-message-deduplication": true, // Preserve for deduplication feature
},
},
consume: { // Options for consumer (Ref: http://www.squaremobius.net/amqp.node/channel_api.html#channel_consume)
noAck: false,
},
prefetch: 0, // Set the prefetch count for this channel
},
retry: { // (Boolean|Object) : Enable or disable retry option
max_retry: 0, // Max retry count
delay: 0, // Delay in ms each retry
},
dedupHash: null, // (Number|String|Function(ctx)) Hash or function to calculate hash to deduplicate task
}
```

# Examples

Take a look at [examples](examples) folder for more examples
- [Simple example](examples/simple) : Basic example
- [Local publisher example](examples/localPublisher) : Example with local publisher (allow publisher to create task event when consumer services is offline). Warning: if there are queue configuration difference between publisher and consumer, the queue configuration will be set follow the first one started. Will improve this in future update or please make a PR if you wanna.
- [Simple example](examples/simple) : Example for basic usage
- [Retry example](examples/retry) : Example with retry logic
- [Deduplication example](examples/deduplication) : Example with deduplicate message feature

# Roadmap

- [x] Implement retry logic for rabbitmq queue
- [x] Allow deduplicate message
- [ ] Graceful shutdown queue
- [ ] Allow deduplicate message
- [ ] Implement retry logic for rabbitmq queue
- [ ] Test & Coverage

# License

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ let broker = new ServiceBroker({
const queueMixin = QueueMixin({
connection: "amqp://localhost",
asyncActions: true, // Enable auto generate .async version for actions
localPublisher: false, // Enable/Disable call this.actions.callAsync to call remote async
});

broker.createService({
Expand All @@ -30,14 +29,17 @@ broker.createService({
hello: {
queue: { // Enable queue for this action
// Options for AMQP queue
channel: {
assert: {
amqp: {
queueAssert: {
durable: true,
},
consume: {
noAck: false,
},
prefetch: 0,
},
consume: {
noAck: false,
dedupHash: (ctx) => {
return ctx.params.name;
},
},
params: {
Expand All @@ -49,7 +51,7 @@ broker.createService({
setTimeout(() => {
this.logger.info(`[CONSUMER] PID: ${process.pid} Processed job with name=${ctx.params.name}`);
return resolve(`hello ${ctx.params.name}`);
}, 1000);
}, 10000);
});
},
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ let broker = new ServiceBroker({
const queueMixin = QueueMixin({
connection: "amqp://localhost",
asyncActions: true, // Enable auto generate .async version for actions
localPublisher: true, // Enable/Disable call this.actions.callAsync to call remote async
});

broker.createService({
Expand All @@ -27,25 +26,22 @@ broker.createService({
},

async started() {
// await broker.waitForServices({ name: "consumer", version: 1 });
await broker.waitForServices({ name: "consumer", version: 1 });

let name = 1;
let name = "repeat_name";
setInterval(async () => {
const response = await this.actions.callAsync({
// remote async action name
action: "v1.consumer.hello",
const response = await broker.call("v1.consumer.hello.async", {
// `params` is the real param will be passed to original action
params: {
name,
},
// `options` is the real options will be passed to original action
options: {
timeout: 2000,
timeout: 12000,
},
});
this.logger.info(`[PUBLISHER] PID: ${process.pid} Called job with name=${name} response=${JSON.stringify(response)}`);
name++;
}, 2000);
}, 200);
}
});

Expand Down
10 changes: 10 additions & 0 deletions examples/rabbitmq/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
FROM rabbitmq:3-management

ADD https://github.com/noxdafox/rabbitmq-message-deduplication/releases/download/0.5.0/elixir-1.10.4.ez /opt/rabbitmq/plugins/
ADD https://github.com/noxdafox/rabbitmq-message-deduplication/releases/download/0.5.0/rabbitmq_message_deduplication-0.5.0.ez /opt/rabbitmq/plugins/

ADD https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.8.9/rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez /opt/rabbitmq/plugins/

RUN chown rabbitmq:rabbitmq /opt/rabbitmq/plugins/*.ez \
&& rabbitmq-plugins enable --offline rabbitmq_message_deduplication \
&& rabbitmq-plugins enable --offline rabbitmq_delayed_message_exchange
10 changes: 10 additions & 0 deletions examples/rabbitmq/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Build image

docker build -t moleculer-rabbitmq/rabbitmq:latest .

# Push image

docker push moleculer-rabbitmq/rabbitmq:latest

# Run container
docker run -d --name rabbitmq -p 15672:15672 -p 5672:5672 taina/rabbitmq:latest
75 changes: 75 additions & 0 deletions examples/retry/consumer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
const { ServiceBroker } = require("moleculer");
const QueueMixin = require("../../index");

let broker = new ServiceBroker({
logger: console,
transporter: "TCP",
});

const queueMixin = QueueMixin({
connection: "amqp://localhost",
asyncActions: true, // Enable auto generate .async version for actions
});

broker.createService({
name: "consumer",
version: 1,

mixins: [
queueMixin,
],

settings: {
amqp: {
connection: "amqp://localhost", // You can also override setting from service setting
},
},

actions: {
hello: {
queue: { // Enable queue for this action
// Options for AMQP queue
amqp: {
queueAssert: {
exclusive: false, // (boolean) if true, scopes the queue to the connection (defaults to false)
durable: true, // (boolean) if true, the queue will survive broker restarts, modulo the effects of exclusive and autoDelete; this defaults to true if not supplied, unlike the others
autoDelete: false, // (boolean) if true, the queue will be deleted when the number of consumers drops to zero (defaults to false)
arguments: { // additional arguments, usually parameters for some kind of broker-specific extension e.g., high availability, TTL
},
},
prefetch: 0,
},
retryExchangeAssert: {
durable: true, // (boolean) if true, the exchange will survive broker restarts. Defaults to true.
autoDelete: false, // (boolean) if true, the exchange will be destroyed once the number of bindings for which it is the source drop to zero. Defaults to false.
alternateExchange: null, // (string) an exchange to send messages to if this exchange can’t route them to any queues.
arguments: { // additional arguments, usually parameters for some kind of broker-specific extension e.g., high availability, TTL
},
},
retry: true, // Using rabbitmq default requeue logic (retry forever)
// retry: {
// max_retry: 3,
// delay: (retry_count) => {
// return retry_count * 1000;
// },
// },
},
params: {
name: "string|convert:true|empty:false",
},
async handler(ctx) {
this.logger.info(`[CONSUMER] PID: ${process.pid} Received job with name=${ctx.params.name}`);
return new Promise((resolve, reject) => {
setTimeout(() => {
this.logger.info(`[CONSUMER] PID: ${process.pid} Processed job with name=${ctx.params.name}`);
return reject(new Error("TEST"));
}, 1000);
});
},
},
},
});

broker.start().then(() => {
broker.repl();
});
49 changes: 49 additions & 0 deletions examples/retry/publisher.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
const { ServiceBroker } = require("moleculer");
const QueueMixin = require("../../index");

let broker = new ServiceBroker({
logger: console,
transporter: "TCP",
});

const queueMixin = QueueMixin({
connection: "amqp://localhost",
asyncActions: true, // Enable auto generate .async version for actions
});

broker.createService({
name: "publisher",
version: 1,

mixins: [
queueMixin,
],

settings: {
amqp: {
connection: "amqp://localhost", // You can also override setting from service setting
},
},

async started() {
await broker.waitForServices({ name: "consumer", version: 1 });

let name = "buggy_message";
const response = await broker.call("v1.consumer.hello.async", {
// `params` is the real param will be passed to original action
params: {
name,
},
// `options` is the real options will be passed to original action
options: {
timeout: 2000,
},
});
this.logger.info(`[PUBLISHER] PID: ${process.pid} Called job with name=${name} response=${JSON.stringify(response)}`);
name++;
}
});

broker.start().then(() => {
broker.repl();
});
Loading

0 comments on commit 4412174

Please sign in to comment.