Skip to content

Commit 6be4f05

Browse files
author
vikasrohit
authored
Merge pull request #8 from appirio-tech/dev
Requeue, failed messages, in different exchange
2 parents 02723a4 + 5a1222d commit 6be4f05

File tree

4 files changed

+35
-8
lines changed

4 files changed

+35
-8
lines changed

consumer/config/constants.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,6 @@ export const EVENT = {
44
PROJECT_DRAFT_CREATED: 'project.draft-created',
55
PROJECT_UPDATED: 'project.updated',
66
PROJECT_DELETED: 'project.deleted',
7+
CONNECT_TO_SF_FAILED: 'connect2sf.failed'
78
},
89
};

consumer/src/services/ConfigurationService.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ class ConfigurationService {
2424
},
2525
},
2626
}).promise();
27-
console.log('node env: ' + process.env.NODE_ENV);
2827
if (!result.Items.length) {
2928
throw new Error('Configuration for AppXpressConfig not found');
3029
}

consumer/src/worker.js

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,9 @@ export function initHandlers(handlers) {
3535
* @param {String} exchangeName the exchange name
3636
* @param {String} queue the queue name
3737
*/
38-
export async function consume(channel, exchangeName, queue) {
38+
export async function consume(channel, exchangeName, queue, publishChannel) {
3939
channel.assertExchange(exchangeName, 'topic', { durable: true });
40+
publishChannel.assertExchange(exchangeName, 'topic', { durable: true });
4041
channel.assertQueue(queue, { durable: true });
4142
const bindings = _.keys(EVENT_HANDLERS);
4243
const bindingPromises = _.map(bindings, rk =>
@@ -73,8 +74,22 @@ export async function consume(channel, exchangeName, queue) {
7374
if (e.shouldAck) {
7475
channel.ack(msg);
7576
} else {
76-
// acking for debugging issue on production. this would prevent log pile up
77+
// ack the message but copy it to other queue where no consumer is listening
78+
// we can listen to that queue on adhoc basis when we see error case like lead not created in SF
79+
// we can use cloudamqp console to check the messages and may be manually create SF lead
80+
// nacking here was causing flood of messages to the worker and it keep on consuming high resources
7781
channel.ack(msg);
82+
try {
83+
publishChannel.publish(
84+
exchangeName,
85+
EVENT.ROUTING_KEY.CONNECT_TO_SF_FAILED,
86+
new Buffer(msg.content.toString())
87+
);
88+
} catch(e) {
89+
// TODO decide if we want nack the original msg here
90+
// for now just ignoring the error in requeue
91+
logger.logFullError(e, `Error in publising Exchange to ${exchangeName}`);
92+
}
7893
}
7994
}
8095
});
@@ -91,7 +106,13 @@ async function start() {
91106
debug('created connection successfully with URL: ' + config.rabbitmqURL);
92107
const channel = await connection.createConfirmChannel();
93108
debug('Channel confirmed...');
94-
consume(channel, config.rabbitmq.projectsExchange, config.rabbitmq.queues.project);
109+
const publishChannel = await connection.createConfirmChannel();
110+
consume(
111+
channel,
112+
config.rabbitmq.projectsExchange,
113+
config.rabbitmq.queues.project,
114+
publishChannel
115+
);
95116
} catch (e) {
96117
debug('Unable to connect to RabbitMQ');
97118
}

consumer/test/worker.spec.js

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import './setup';
99
describe('worker', () => {
1010
describe('consume', () => {
1111
const queueName = 'sample-queue';
12-
const exchangeName = EVENT.ROUTING_KEY.PROJECT_DRAFT_CREATED//'sample-exchange';
12+
const exchangeName = 'sample-exchange';
1313
const validMessage = {
1414
content: JSON.stringify({ sampleData: 'foo' }),
1515
properties: { correlationId : 'unit-tests'},
@@ -24,6 +24,7 @@ describe('worker', () => {
2424
let rabbitConsume;
2525
let exchangeHandlerSpy = sinon.spy();
2626
let fakeExchangeHandlerSpy = sinon.spy();
27+
let channelPublishSpy = sinon.spy();
2728

2829
beforeEach(() => {
2930
handler = sinon.spy();
@@ -58,7 +59,11 @@ describe('worker', () => {
5859
done(e);
5960
}
6061
},
61-
}, exchangeName, queueName);
62+
}, exchangeName, queueName,
63+
{
64+
publish: channelPublishSpy,
65+
assertExchange
66+
});
6267
}
6368

6469
it('should consume and ack a message successfully', (done) => {
@@ -91,15 +96,16 @@ describe('worker', () => {
9196
invokeConsume(done);
9297
});
9398

94-
xit('should nack if error is thrown', (done) => {
99+
it('should ack, with message being copied to temp queue, if error is thrown', (done) => {
95100
initHandlers({
96101
[exchangeName] : () => {
97102
throw new Error('foo');
98103
}
99104
})
100105
rabbitConsume = async (queue, fn) => {
101106
await fn(validMessage);
102-
nack.should.have.been.calledWith(validMessage);
107+
ack.should.have.been.calledWith(validMessage);
108+
channelPublishSpy.should.have.been.calledWith(exchangeName, EVENT.ROUTING_KEY.CONNECT_TO_SF_FAILED, new Buffer(validMessage.content));
103109
};
104110
invokeConsume(done);
105111
});

0 commit comments

Comments
 (0)