Skip to content

Commit

Permalink
Adds pollQueue method for long polling SQS.
Browse files Browse the repository at this point in the history
  • Loading branch information
Rick Harrison committed Jul 20, 2016
1 parent bfa8e82 commit 9492579
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 3 deletions.
86 changes: 84 additions & 2 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
'use strict';

const assign = require('lodash.assign');
const aws = require('aws-sdk');

const DEFAULT_OPTS = {
Expand All @@ -14,7 +15,9 @@ const Client = function Client (opts = DEFAULT_OPTS) {

const { region, accessKeyId, secretAccessKey, queue } = opts;

console.log(opts);
if (!accessKeyId || !secretAccessKey || !queue) {
throw new Error('Missing a required parameter: accessKeyId, secretAccessKey, or queue');
}

this.sqs = new aws.SQS({
region,
Expand All @@ -32,7 +35,7 @@ const Client = function Client (opts = DEFAULT_OPTS) {
* @param payload - An object containing the message data
*/

Client.prototype.sendMessage = function sendMessage(payload) {
Client.prototype.sendMessage = function sendMessage (payload) {
return new Promise((resolve, reject) => {
this.sqs.sendMessage({
MessageBody: JSON.stringify(payload)
Expand All @@ -46,4 +49,83 @@ Client.prototype.sendMessage = function sendMessage(payload) {
});
};

/*
* Poll the SQS queue for new messages
*
* @param opts - An object to pass directly to the aws `receiveMessage` method
* @param handler - A function that should return a promise when passed a message
*/

Client.prototype.pollQueue = function pollQueue (opts = {}, handler) {
if (!this.receiveOptions) {
this.receiveOptions = assign({
AttributeNames: ['All'],
MessageAttributeNames: ['All'],
WaitTimeSeconds: 20
}, opts);
}

if (!this.handler) {
this.handler = handler;
}

const self = this;

this.sqs.receiveMessage(this.receiveOptions, (err, data) => {
const promises = [];

if (data && data.Messages) {
data.Messages.forEach(function (message) {
promises.push(this.handleMessage(message, this.handler));
}, this);
}

Promise.all(promises).then(function () {
setImmediate(function () {
self.pollQueue();
});
}, function () {
setImmediate(function () {
self.pollQueue();
});
});
});
};

/*
* Interacts with the handler to process a message
*
* @param message - A message returned from SQS
* @param handler - The message handler that will return a promise
*/

Client.prototype.handleMessage = function handleMessage (message, handler) {
const body = JSON.parse(message.Body);
const messagePromise = handler(body, message);

if (!messagePromise) {
return Promise.resolve();
}

return messagePromise.then(() => {
return this.deleteMessage(message.ReceiptHandle);
})
};

/*
* Deletes a message from the SQS queue
*
* @param receipt - A message receipt returned from SQS on `receiveMessage`
*/

Client.prototype.deleteMessage = function deleteMessage (receipt) {
return new Promise((resolve) => {
this.sqs.deleteMessage({
ReceiptHandle: receipt
}, function () {
resolve();
});
});
};

module.exports = Client;
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
},
"homepage": "https://github.com/meadow/sqs",
"dependencies": {
"aws-sdk": "^2.4.7"
"aws-sdk": "^2.4.7",
"lodash.assign": "^4.0.9"
}
}

0 comments on commit 9492579

Please sign in to comment.