Skip to content

Commit

Permalink
Refactor: joined failed error messages into a single message
Browse files Browse the repository at this point in the history
  • Loading branch information
nick spragg committed Dec 16, 2014
1 parent c7150f6 commit d338a83
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 43 deletions.
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,25 @@ Enqueues messages onto a given SQS queue

## Installation

```
npm install BBC/sqs-producer --save
```
## Usage

```js
var Producer = require('sqs-producer');

var producer = Producer.create({
queueUrl: 'https://sqs.eu-west-1.amazonaws.com/account-id/queue-name',
region: 'eu-west-1'
});

producer.send(['msg1', 'msg2'], function(err) {
if (err) console.log(err);
});

```

## Test

```
Expand Down
64 changes: 32 additions & 32 deletions lib/producer.js
Original file line number Diff line number Diff line change
@@ -1,59 +1,59 @@
var AWS = require('aws-sdk');
//var stats = require('ibl-stats');
var BATCH_SIZE = 10;

// TODO: add event emitter
// TODO: add stats
var BATCH_SIZE = 10;

function Producer(options) {
this.url = options.url;
this.sqs = options.sqs;
this.queueUrl = options.queueUrl;
this.sqs = options.sqs || new AWS.SQS({region: options.region});
}

Producer.prototype._createEntries = function(batch) {
return batch.map(function (message) {
return {
Id: message,
MessageBody: message
};
});
};
function entryId(entry) {
return entry.Id;
}

Producer.prototype._sendBatch = function(errors, messages, startIndex, cb) {
function entryFromMessage(message) {
return {
Id: message,
MessageBody: message
};
}

var producer = this;
function createError(failedMessages) {
if (failedMessages.length === 0) {
return null;
}

return new Error('Failed to send messages: ' + failedMessages.join(', '));
}

Producer.prototype._sendBatch = function(failedMessages, messages, startIndex, cb) {

var producer = this;
var endIndex = startIndex + BATCH_SIZE;
var batch = messages.slice(startIndex, endIndex);
var params = {
Entries: this._createEntries(batch),
QueueUrl: this.url
Entries: batch.map(entryFromMessage),
QueueUrl: this.queueUrl
};

//stats.increment('sqs.sentBatches');

this.sqs.sendMessageBatch(params, function (err, result) {
if (err) {
errors.push(err);

} else {
result.Failed.forEach(function (entry) {
errors.push(new Error("Failed to send message: " + entry.Id));
});
}
if (err) return cb(err);

failedMessages = failedMessages.concat(result.Failed.map(entryId));

if (endIndex < messages.length) {
return producer._sendBatch(errors, messages, endIndex, cb);
return producer._sendBatch(failedMessages, messages, endIndex, cb);
}

cb(errors);
cb(createError(failedMessages));
});
};

Producer.prototype.send = function(messages, cb) {
var errors = [];
var failedMessages = [];
var startIndex = 0;
this._sendBatch(errors, messages, startIndex, cb);

this._sendBatch(failedMessages, messages, startIndex, cb);
};

module.exports.create = function(options) {
Expand Down
22 changes: 21 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
"description": "Enqueues messages onto a given SQS queue",
"main": "index.js",
"scripts": {
"test": "mocha"
"test": "mocha",
"coverage": "jscoverage lib lib-cov && NODE_ENV=dev COVERAGE=1 mocha -R html-cov > coverage.html && open coverage.html",
"lint" : "jshint ."
},
"repository": {
"type": "git",
Expand All @@ -19,5 +21,23 @@
"dependencies": {
"aws-sdk": "^2.1.2",
"sinon": "^1.12.2"
},
"devDependencies": {
"jscoverage": "^0.5.9",
"jshint": "^2.5.10"
},
"jshintConfig": {
"quotmark": "single",
"unused": true,
"undef": true,
"node": true,
"globals": {
"describe": false,
"it": false,
"before": false,
"beforeEach": false,
"after": false,
"afterEach": false
}
}
}
34 changes: 24 additions & 10 deletions test/producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ describe('Producer', function () {
sinon.stub(sqs, 'sendMessageBatch').yields(null, {Failed: []});

producer = Producer.create({
url: queueUrl,
queueUrl: queueUrl,
sqs: sqs
});

Expand All @@ -24,7 +24,7 @@ describe('Producer', function () {
sqs.sendMessageBatch.restore();
});

it('send pids as batch', function (done) {
it('sends pids as a batch', function (done) {
var expectedParams = {
Entries: [
{ Id: 'pid1', MessageBody: 'pid1' },
Expand All @@ -33,28 +33,42 @@ describe('Producer', function () {
QueueUrl: queueUrl
};

producer.send(['pid1', 'pid2'], function (errors) {
producer.send(['pid1', 'pid2'], function (err) {
assert.ifError(err);
sinon.assert.calledOnce(sqs.sendMessageBatch);
sinon.assert.calledWith(sqs.sendMessageBatch, expectedParams);
assert.equal(errors.length, 0);
done();
});
});

it('makes multiple batch request when the pids length is larger than 10', function (done) {
producer.send(['1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11'], function (errors) {
it('makes multiple batch requests when the number of messages is larger than 10', function (done) {
producer.send(['1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11'], function (err) {
assert.ifError(err);
sinon.assert.calledTwice(sqs.sendMessageBatch);
assert.equal(errors.length, 0);
done();
});
});

it('returns error when sqs fail', function (done) {
it('returns an error when SQS fails', function (done) {
var sqsError = new Error('sqs failed');

sqs.sendMessageBatch.restore();
sinon.stub(sqs, 'sendMessageBatch').yields(sqsError);
producer.send([], function (errors) {
assert.equal(errors[0], sqsError);

producer.send(['foo'], function (err) {
assert.equal(err, sqsError);
done();
});
});

it('returns an error identifting the message that failed', function (done) {
sqs.sendMessageBatch.restore();

var failedMessages = [{Id: 'pid1'}, {Id: 'pid2'}, {Id: 'pid3'}];
sinon.stub(sqs, 'sendMessageBatch').yields(null, {Failed: failedMessages});

producer.send(['pid1', 'pid2', 'pid3'], function (err) {
assert.equal(err.message, 'Failed to send messages: pid1, pid2, pid3');
done();
});
});
Expand Down

0 comments on commit d338a83

Please sign in to comment.