Skip to content

Commit ebe0935

Browse files
author
Vikas Agarwal
committed
Instead of nacking messages on service call failures, queued them in separate exchange with different routing queue. So that they don't overflow the server resources by continuously trying with failed attempts. It will further allow manually fixing the failed lead creation by looking at the error queue.
1 parent a352499 commit ebe0935

File tree

4 files changed

+27
-8
lines changed

4 files changed

+27
-8
lines changed

consumer/config/constants.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,6 @@ export const EVENT = {
44
PROJECT_DRAFT_CREATED: 'project.draft-created',
55
PROJECT_UPDATED: 'project.updated',
66
PROJECT_DELETED: 'project.deleted',
7+
CONNECT_TO_SF_FAILED: 'connect2sf.failed'
78
},
89
};

consumer/src/services/ConfigurationService.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ class ConfigurationService {
2424
},
2525
},
2626
}).promise();
27-
console.log('node env: ' + process.env.NODE_ENV);
2827
if (!result.Items.length) {
2928
throw new Error('Configuration for AppXpressConfig not found');
3029
}

consumer/src/worker.js

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ export function initHandlers(handlers) {
3535
* @param {String} exchangeName the exchange name
3636
* @param {String} queue the queue name
3737
*/
38-
export async function consume(channel, exchangeName, queue) {
38+
export async function consume(channel, exchangeName, queue, publishChannel) {
3939
channel.assertExchange(exchangeName, 'topic', { durable: true });
4040
channel.assertQueue(queue, { durable: true });
4141
const bindings = _.keys(EVENT_HANDLERS);
@@ -73,8 +73,16 @@ export async function consume(channel, exchangeName, queue) {
7373
if (e.shouldAck) {
7474
channel.ack(msg);
7575
} else {
76-
// acking for debugging issue on production. this would prevent log pile up
76+
// ack the message but copy it to other queue where no consumer is listening
77+
// we can listen to that queue on adhoc basis when we see error case like lead not created in SF
78+
// we can use cloudamqp console to check the messages and may be manually create SF lead
79+
// nacking here was causing flood of messages to the worker and it keep on consuming high resources
7780
channel.ack(msg);
81+
publishChannel.publish(
82+
exchangeName,
83+
EVENT.ROUTING_KEY.CONNECT_TO_SF_FAILED,
84+
new Buffer(msg.content.toString())
85+
);
7886
}
7987
}
8088
});
@@ -91,7 +99,13 @@ async function start() {
9199
debug('created connection successfully with URL: ' + config.rabbitmqURL);
92100
const channel = await connection.createConfirmChannel();
93101
debug('Channel confirmed...');
94-
consume(channel, config.rabbitmq.projectsExchange, config.rabbitmq.queues.project);
102+
const publishChannel = await connection.createConfirmChannel();
103+
consume(
104+
channel,
105+
config.rabbitmq.projectsExchange,
106+
config.rabbitmq.queues.project,
107+
publishChannel
108+
);
95109
} catch (e) {
96110
debug('Unable to connect to RabbitMQ');
97111
}

consumer/test/worker.spec.js

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import './setup';
99
describe('worker', () => {
1010
describe('consume', () => {
1111
const queueName = 'sample-queue';
12-
const exchangeName = EVENT.ROUTING_KEY.PROJECT_DRAFT_CREATED//'sample-exchange';
12+
const exchangeName = 'sample-exchange';
1313
const validMessage = {
1414
content: JSON.stringify({ sampleData: 'foo' }),
1515
properties: { correlationId : 'unit-tests'},
@@ -24,6 +24,7 @@ describe('worker', () => {
2424
let rabbitConsume;
2525
let exchangeHandlerSpy = sinon.spy();
2626
let fakeExchangeHandlerSpy = sinon.spy();
27+
let channelPublishSpy = sinon.spy();
2728

2829
beforeEach(() => {
2930
handler = sinon.spy();
@@ -58,7 +59,10 @@ describe('worker', () => {
5859
done(e);
5960
}
6061
},
61-
}, exchangeName, queueName);
62+
}, exchangeName, queueName,
63+
{
64+
publish: channelPublishSpy
65+
});
6266
}
6367

6468
it('should consume and ack a message successfully', (done) => {
@@ -91,15 +95,16 @@ describe('worker', () => {
9195
invokeConsume(done);
9296
});
9397

94-
xit('should nack if error is thrown', (done) => {
98+
it('should ack, with message being copied to temp queue, if error is thrown', (done) => {
9599
initHandlers({
96100
[exchangeName] : () => {
97101
throw new Error('foo');
98102
}
99103
})
100104
rabbitConsume = async (queue, fn) => {
101105
await fn(validMessage);
102-
nack.should.have.been.calledWith(validMessage);
106+
ack.should.have.been.calledWith(validMessage);
107+
channelPublishSpy.should.have.been.calledWith(exchangeName, EVENT.ROUTING_KEY.CONNECT_TO_SF_FAILED, new Buffer(validMessage.content));
103108
};
104109
invokeConsume(done);
105110
});

0 commit comments

Comments
 (0)