A MongoDB-backed job queueing mechanism.
- Concurrency handling
- Throttling inputs
- Persistence of all input/output
- FIFO
- Exits the process gracefully
Dispatcher:
var Queue = require('mkue');
var queue = new Queue();
// set a db
queue.collection = db.collection('stuff');
// make sure indexes are set
queue.ensureIndexes();
queue.dispatch('this function', {
these: 'inputs'
})
Worker:
var Queue = require('mkue');
var queue = new Queue();
// set a db
queue.collection = db.collection('stuff');
// make sure indexes are set
queue.ensureIndexes();
// define a namespaced function
queue.define('this function', function (options) {
return new Promise(function (resolve) {
resolve(options.these);
});
});
// set the concurrency
queue.concurrency(5);
// start listening
queue.run();
The options are:
concurrency <1>
- number of jobs to be processed in parallel in this processdelay <1000>
- delay to query the next batch of jobs on draincollection
- the MongoDB collection for this queue
You are required to set the collection for this worker queue manually.
Set the maximum number of concurrent, local jobs.
Set the delay after draining the queue to start looking for jobs again.
Set the indexes for queues and currently processing jobs. Assumes that the queue is always short.
Get the current number of jobs being processed.
Get the current number of jobs in the queue.
Waits ms
to start a new job.
Add a job to the queue.
queue.get([name ], options ).then( job => )
Get the latest job with name
and options
.
May or may not be completed yet.
queue.getById().then( job => )
Get a job by its ID.
queue.poll([name ], options , [ms | ]).then( job => )
Poll the latest job at interval ms
with name
and options
until it's complete.
queue.define([name ], fn )
Define a function.
name
defaults to 'default'
if not set.
fn
's API should be:
fn([options]).then( result => )
You only need to define this on a worker process.
queue.run()
Start running a new job. Call this on a worker process.
queue.close()
Stop creating new jobs.