Queue.process

Grant Carthew edited this page May 7, 2017 · 43 revisions

Method Signature

Queue.process(Handler)

Parameter: Handler Function

  • A job processing Function supporting three parameters, job, next, and onCancel.
  • The Handler signature is function (job, next, onCancel) { }
  • The onCancel argument is a function with this signature function (job, callback) { }
  • The callback is a user function that will gracefully stop job processing.

Returns: Promise => true

  • This Promise is from Queue.ready() ensuring the Queue is in a ready state.
  • You can safely ignore this Promise; it only initiates a Queue Master review.

Example:

// The onCancel argument is optional
q.process((job, next, onCancel) => {
  // Process the job here, call next when done, passing null for the error
  next(null, 'Job finished successfully')
  // Initialize onCancel as follows
  onCancel(job, () => {
    // Gracefully cancel your job processing here
  })
})

Description

This is the business end of rethinkdb-job-queue where your jobs get processed. Any type of job can be used with the Queue, from sending e-mails to processing files to training machine learning models.

You do not need to call the Queue.process method to use the Queue object. By not calling Queue.process you are leaving the Queue object as a publisher for the queue. It will not be a subscriber or process jobs. See the Queue PubSub document for more detail.

Handler Callback Function

Signature: function (job, next, onCancel) { }

The handler callback arguments job, next, and onCancel are described below.

job

The function you pass into Queue.process() will carry out your job processing. The details required for your job can be obtained from the Job object that is passed into your process handler. For example, if you populated your jobs with a recipient property, you can read job.recipient in your handler function.

Whilst processing your job you can call the Job.updateProgress method on the Job object to update the progress in the database. Be careful about how many times you call Job.updateProgress because each call will touch the database. Another major feature of Job.updateProgress is that it will reset the job timeout process and the dateEnable property for the job in the database. This prevents a long running job from timing out and being classified as 'failed' by the Queue Master database review process.
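One way to limit how often Job.updateProgress touches the database is to only report when progress has moved by a minimum step. The following is a minimal sketch, not part of rethinkdb-job-queue: createProgressReporter and minStep are hypothetical names, and it assumes job.updateProgress accepts a percentage number.

```javascript
// Hypothetical helper, not part of rethinkdb-job-queue.
// Assumes job.updateProgress(percent) accepts a number from 0 to 100.
function createProgressReporter (job, minStep = 10) {
  let lastReported = 0
  return function report (percent) {
    // Always let the first report of 100 percent through
    const isFinal = percent >= 100 && lastReported < 100
    // Skip the database call when progress has not advanced by minStep
    if (!isFinal && percent - lastReported < minStep) return null
    lastReported = percent
    // Pass back whatever updateProgress returns so the caller can handle it
    return job.updateProgress(percent)
  }
}

// Usage inside a process handler might look like:
// const report = createProgressReporter(job, 25)
// report(10) // skipped
// report(30) // calls job.updateProgress(30)
```

Because each call also resets the job timeout, keep the step small enough that updates still arrive before a long running job would time out.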

On completion of your processing, call the next() function to let the Queue know whether the job completed or failed.

next(null, jobResult)

Returns: Promise => Number

  • The number of running jobs for the Queue object.

When your job has finished being processed you inform the Queue that the job has completed, updated, or failed by calling the next() function within your process handler. You can catch errors returned by the next() Promise. They will also be raised as error events.

The argument you pass to next() tells the Queue object how job processing finished. The next() function follows the Node.js callback convention: the Error object, if any, is passed as the first argument, and when no error was raised the first argument is null. The options for calling the next() callback are:

  • Any object or value to indicate job success.
      • next(null, 'Success Message')
  • An undefined value to indicate job success.
      • next()
  • The Job object to finish saving the job back to the queue for future processing.
      • next(null, Job)
  • An Error object to indicate job failure.
      • next(new Error('Something went wrong'))
  • An Error object to indicate job failure containing a custom property cancelJob that will cause the Queue object to cancel the job preventing further retries.
      • next(err) <= err object must have an err.cancelJob property that is not null.

For information only: the next() Promise resolves to the total number of currently running jobs in the Queue object.
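To keep the success and failure calls to next() in one place, you could wrap your job logic in a small helper. This is only a sketch: runAndReport and task are hypothetical names, and next is whatever function the Queue passes to your handler.

```javascript
// Hypothetical helper: runs a synchronous task and reports the outcome via next().
function runAndReport (job, next, task) {
  let result
  try {
    result = task(job)
  } catch (thrown) {
    // Make sure next() receives an Error object as its only argument
    const err = thrown instanceof Error ? thrown : new Error(String(thrown))
    return next(err)
  }
  // null as the first argument means no error; the result is passed second
  return next(null, result)
}

// Usage inside a process handler might look like:
// q.process((job, next) => runAndReport(job, next, (j) => 'Processed ' + j.id))
```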

Completed Job

If your job processing completed successfully, you need to call the next() function to inform the queue. When you call the next() function you can pass in the job result data or message, but there is no need to. The string or object you pass into the next(null, result) call is used for logging purposes. The job result will be added as extra data for the completed log entry for the job.

next()
// or
next(null, 'Radiation levels are nominal')

Partly Completed or Delayed Job

New since v2.2.0

For some complex, multi-part, or FIFO jobs, you can partly process them using this feature.

For more detail on how to process complex jobs see the Complex Job document. The following describes how saving partly completed jobs works with the next() callback function.

If you have finished processing a job and you have a reason to delay it, you can call the next() callback passing in the Job object itself. When you do this the following internal steps take place within the processing Queue object:

  1. Job timeout processing and onCancel handlers are removed.
  2. If the Job object has a Job Status of active, its status is changed to waiting.
  3. A new log entry is added to the Job with the message "Job has not been processed to completion and is being placed back into the queue". Log entry new in v3.1.1
  4. The Job object is saved as is back to the queue database table.

With this in mind, you have the ability to add or change any data associated with the job whilst the job is within the process handler, saving it on the next() call.

job.ComplexStepsCompleted = 2
// Some date in the future, in this example one hour from now
job.dateEnable = new Date(Date.now() + 60 * 60 * 1000)
next(null, job)
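Putting the steps above together, a handler could process one part of a job per run and hand the job back to the queue until every part is done. This is a sketch under assumptions: createStepHandler, steps, and delayMs are hypothetical names, and ComplexStepsCompleted is just a custom property stored on the job.

```javascript
// Hypothetical helper: steps is an array of functions, each handling one part of the job.
function createStepHandler (steps, delayMs = 0) {
  return function handler (job, next) {
    // Custom property on the job recording how many steps have already run
    const index = job.ComplexStepsCompleted || 0
    try {
      steps[index](job)
    } catch (err) {
      // Fail the job so the normal retry process applies
      return next(err)
    }
    job.ComplexStepsCompleted = index + 1
    if (job.ComplexStepsCompleted < steps.length) {
      // More steps remain: delay the job and save it back to the queue
      job.dateEnable = new Date(Date.now() + delayMs)
      return next(null, job)
    }
    // All steps are done: complete the job with a result message
    return next(null, `Completed ${steps.length} steps`)
  }
}

// Usage with a real queue might look like:
// q.process(createStepHandler([downloadFile, convertFile], 60000))
```

Here downloadFile and convertFile stand in for your own step functions.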

Failed Job

If the job processing has failed, create a new error object and pass it in as the only argument. The error.message and error.stack values will be logged against the job.

const err = new Error('Watch out for them gamma rays')
next(err)

This will fail the job in the database and begin the Job Retry process.

Failed Job with Cancel

If the job is unrecoverable and you wish to cancel it preventing any retry attempt, populate the Error object with a cancelJob property containing a cancel reason message. If you don't want to supply a reason message, just set cancelJob to true.

const err = new Error('Watch out for them gamma rays')
err.cancelJob = 'Turned into the big green guy'
next(err)

The Error.cancelJob property is detected by the Queue and will change the job's status from 'failed' to 'terminated', preventing retries.
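If several handlers need to decide between a retryable failure and a cancel, the decision can live in one helper. This is a minimal sketch: failJob and isUnrecoverable are hypothetical names, and the predicate is whatever rule suits your jobs.

```javascript
// Hypothetical helper: isUnrecoverable is a caller-supplied predicate.
function failJob (next, err, isUnrecoverable) {
  if (isUnrecoverable(err)) {
    // Use the error message as the cancel reason, or true when there is none
    err.cancelJob = err.message || true
  }
  // Without cancelJob the job fails and the retry process begins
  return next(err)
}

// Usage inside a process handler might look like:
// failJob(next, err, (e) => e.message === 'Invalid recipient address')
```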

onCancel

Signature: onCancel(job, function () { // Code to gracefully stop your job })

If a job is currently being processed and the Queue.cancelJob method is invoked, the job will be globally cancelled. When this occurs the callback you pass to onCancel will be invoked. You need to pass the onCancel function both the job so the Queue knows which job to invoke the callback for, and the callback function.

The callback function you pass to onCancel should gracefully stop your job processing. Do not call next() from within the onCancel callback function.
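As an illustration of a graceful stop, the sketch below schedules some work with a timer and clears that timer in the onCancel callback. The names createCancellableHandler, doWork, and delayMs are hypothetical, and the timer is only a stand-in for your real asynchronous processing.

```javascript
// Sketch only: createCancellableHandler and doWork are hypothetical names.
function createCancellableHandler (doWork, delayMs) {
  return function handler (job, next, onCancel) {
    // Stand-in for real asynchronous work: run doWork after a delay
    const timer = setTimeout(() => {
      let result
      try {
        result = doWork(job)
      } catch (err) {
        return next(err)
      }
      next(null, result)
    }, delayMs)

    // Register the cleanup callback for this job.
    // It stops the pending work and deliberately does not call next().
    onCancel(job, () => {
      clearTimeout(timer)
    })
  }
}

// Usage with a real queue might look like:
// q.process(createCancellableHandler(sendReport, 5000))
```

Here sendReport stands in for your own job function.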

Examples

E-Mail Job Processing

This example shows the full functionality of the rethinkdb-job-queue for processing an email job. It is just an example that was copied from the mailgun-js README with Queue functionality added. It may not work even with your api_key.

const api_key = 'key-XXXXXXXXXXXXXXXXXXXXXXX'
const domain = 'mydomain.mailgun.org'
const mailgun = require('mailgun-js')({apiKey: api_key, domain: domain});
const Queue = require('rethinkdb-job-queue')
const q = new Queue()
const job = q.createJob()

// Here we attach the job's payload. In this case mailgun data.
job.data = {
  from: 'Super Heros <[email protected]>',
  to: '[email protected]',
  subject: 'Registration for superheros.com',
  text: 'Click this link to activate your account on superheros.com'
}

q.addJob(job).catch(err => console.error(err))

// Queue.process can be placed before or after adding jobs
q.process((job, next, onCancel) => {
  onCancel(job, () => {
    // mailgun doesn't seem to have a graceful way
    // to stop sending mail. This is where you would stop it
    // if the job was cancelled elsewhere
  })
  mailgun.messages().send(job.data, function (err, body) {
    if (err && err.message === 'something really bad') {
      err.cancelJob = 'something really bad happened'
    }
    if (err) { return next(err) }
    console.log(body)
    return next(null, 'email sent successfully')
  })
})
