Skip to content

mongodb-utils/mkue

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

22 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

mkue

NPM version Build status Test coverage Dependency Status License Downloads Gittip

A MongoDB-backed job queueing mechanism.

  • Concurrency handling
  • Throttling inputs
  • Persistence of all input/output
  • FIFO
  • Exits the process gracefully

Example

Dispatcher:

var Queue = require('mkue');

var queue = new Queue();
// set a db
queue.collection = db.collection('stuff');
// make sure indexes are set
queue.ensureIndexes();

queue.dispatch('this function', {
  these: 'inputs'
})

Worker:

var Queue = require('mkue');

var queue = new Queue();
// set a db
queue.collection = db.collection('stuff');
// make sure indexes are set
queue.ensureIndexes();

// define a namespaced function
queue.define('this function', function (options) {
  return new Promise(function (resolve) {
    resolve(options.these);
  });
});

// set the concurrency
queue.concurrency(5);

// start listening
queue.run();

API

var queue = new Queue([options])

The options are:

  • concurrency <1> - number of jobs to be processed in parallel in this process
  • delay <1000> - delay to query the next batch of jobs on drain
  • collection - the MongoDB collection for this queue

queue.collection =

You are required to set the collection for this worker queue manually.

queue.concurrency(count )

Set the maximum number of concurrent, local jobs.

queue.delay(ms | )

Set the delay after draining the queue to start looking for jobs again.

queue.ensureIndexes().then( => )

Set the indexes for queues and currently processing jobs. Assumes that the queue is always short.

queue.processing().then( count => )

Get the current number of jobs being processed.

queue.queued().then( count => )

Get the current number of jobs in the queue.

queue.queue([ms | ])

Waits ms to start a new job.

queue.dispatch([name ], fn ).then( job => )

Add a job to the queue.

queue.get([name ], options ).then( job => )

Get the latest job with name and options. May or may not be completed yet.

queue.getById().then( job => )

Get a job by its ID.

queue.poll([name ], options , [ms | ]).then( job => )

Poll the latest job at interval ms with name and options until it's complete.

queue.define([name ], fn )

Define a function. name defaults to 'default' if not set. fn's API should be:

fn([options]).then( result => )

You only need to define this on a worker process.

queue.run()

Start running a new job. Call this on a worker process.

queue.close()

Stop creating new jobs.

About

MongoDB-based job queuing system

Resources

License

Stars

Watchers

Forks

Packages

No packages published