diff --git a/index.js b/index.js index 478f660..ecf9185 100644 --- a/index.js +++ b/index.js @@ -136,6 +136,8 @@ Client.prototype.pollQueue = function pollQueue (opts = {}, handler) { */ Client.prototype.handleMessage = function handleMessage (message, handler) { + const { preventVisibilityTimeoutRemoval, visibilityTimeoutOnError } = this.options; + const messagePromise = Promise.resolve().then(function () { const body = JSON.parse(message.Body); @@ -145,10 +147,14 @@ Client.prototype.handleMessage = function handleMessage (message, handler) { return messagePromise.then(() => { return this.deleteMessage(message.ReceiptHandle); }).catch(() => { - if (this.options.preventVisibilityTimeoutRemoval === true) { + if (preventVisibilityTimeoutRemoval === true) { return Promise.resolve(); } + if (visibilityTimeoutOnError) { + return this.changeVisibilityTimeout(message, visibilityTimeoutOnError); + } + return this.removeVisibilityTimeout(message); }); }; diff --git a/package.json b/package.json index 774d5f0..40d24f1 100644 --- a/package.json +++ b/package.json @@ -40,6 +40,7 @@ "babel-eslint": "^6.1.2", "codecov": "^3.7.1", "eslint": "^3.1.1", + "lodash.get": "^4.4.2", "nyc": "^7.0.0", "sinon": "^1.17.4" }, diff --git a/tests/poll-queue.js b/tests/poll-queue.js index e1b7e99..1a9f027 100644 --- a/tests/poll-queue.js +++ b/tests/poll-queue.js @@ -129,6 +129,23 @@ test.cb('should call removeVisibilityTimeout after handler throws', (t) => { }); }); +test.cb('should call removeVisibilityTimeout after handler throws', (t) => { + t.plan(1); + + t.context.data = { + Messages: [{ Body: '{ "foobar": "bazqux" }' }] + }; + + t.context.client.pollQueue({}, function () { + setImmediate(function () { + t.is(t.context.client.removeVisibilityTimeout.callCount, 1); + t.end(); + }); + + throw new Error('bar'); + }); +}); + test.cb('should call poll queue multiple times', (t) => { t.plan(1); diff --git a/tests/visibility-timeout-on-error.js b/tests/visibility-timeout-on-error.js new file mode 100644 index 0000000..cef0970 --- /dev/null +++ b/tests/visibility-timeout-on-error.js @@ -0,0 +1,76 @@ +import test from 'ava'; +import _get from 'lodash.get'; +import sinon from 'sinon'; + +import sqs from '../index'; + +test.beforeEach((t) => { + t.context.client = sqs({ accessKeyId: 'foo', secretAccessKey: 'bar', queue: 'baz', visibilityTimeoutOnError: 1234 }); + + sinon.stub(t.context.client.sqs, 'receiveMessage', function (params, callback) { + setImmediate(function () { + callback(null, t.context.data); + }); + }); + + sinon.stub(t.context.client, 'deleteMessage', function () { + return Promise.resolve(); + }); + + sinon.stub(t.context.client, 'changeVisibilityTimeout', function () { + return Promise.resolve(); + }); + + sinon.stub(t.context.client, 'removeVisibilityTimeout', function () { + return Promise.resolve(); + }); +}); + +test.afterEach((t) => { + t.context.client.sqs.receiveMessage.restore(); + t.context.client.deleteMessage.restore(); + t.context.client.changeVisibilityTimeout.restore(); + t.context.client.removeVisibilityTimeout.restore(); +}); + +test.cb('should use `visibilityTimeoutOnError` option', (t) => { + t.plan(2); + + t.context.data = { + Messages: [{ Body: '{ "foobar": "bazqux" }' }] + }; + + t.context.client.pollQueue({}, function () { + const promise = Promise.resolve(); + + setImmediate(function () { + promise.then(function () { + t.is(_get(t.context.client.changeVisibilityTimeout, 'firstCall.args[1]'), 1234); + t.is(t.context.client.changeVisibilityTimeout.callCount, 1); + t.end(); + }); + }); + + return promise.then(function () { + throw new Error('bar'); + }); + }); +}); + +test.cb('should call removeVisibilityTimeout after handler throws', (t) => { + t.plan(2); + + t.context.data = { + Messages: [{ Body: '{ "foobar": "bazqux" }' }] + }; + + t.context.client.pollQueue({}, function () { + setImmediate(function () { + t.is(_get(t.context.client.changeVisibilityTimeout, 'firstCall.args[1]'), 1234); + t.is(t.context.client.changeVisibilityTimeout.callCount, 1); + t.end(); + }); + + throw new Error('bar'); + }); +});