Skip to content

Commit c854e25

Browse files
author
vikasrohit
authored
Merge pull request #9 from appirio-tech/dev
Release 1.1.0 for handling failed messages as scheduled worker
2 parents 6be4f05 + f646e94 commit c854e25

File tree

14 files changed

+434
-73
lines changed

14 files changed

+434
-73
lines changed

consumer/.ebextensions/01-environment-variables.config

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,15 @@ option_settings:
2626
- namespace: aws:elasticbeanstalk:application:environment
2727
option_name: RABBITMQ_PROJECTS_EXCHANGE
2828
value: dev.projects
29+
- namespace: aws:elasticbeanstalk:application:environment
30+
option_name: RABBITMQ_CONNECT2SF_EXCHANGE
31+
value: dev.tc.connect2sf
2932
- namespace: aws:elasticbeanstalk:application:environment
3033
option_name: QUEUE_PROJECTS
3134
value: dev.project.service
35+
- namespace: aws:elasticbeanstalk:application:environment
36+
option_name: QUEUE_CONNECT2SF
37+
value: dev.tc.connect2sf.exclusive
3238
- namespace: aws:elasticbeanstalk:application:environment
3339
option_name: IDENTITY_SERVICE_URL
3440
value: TBD

consumer/Dockerfile

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ LABEL description="Topcoder Salesforce Integration"
55
RUN apt-get update && \
66
apt-get upgrade -y
77

8+
#RUN apt-get install cron -y
9+
810

911
# Create app directory
1012
RUN mkdir -p /usr/src/app
@@ -15,10 +17,12 @@ COPY . /usr/src/app
1517
# Install app dependencies
1618
RUN npm install
1719

18-
RUN npm install -g forever
20+
RUN npm install -g forever babel-cli
1921

20-
EXPOSE 80
22+
#RUN crontab config/scheduler-cron
2123

22-
CMD forever -c "npm start" --uid "consumer" .
24+
#RUN service cron start
25+
26+
EXPOSE 80
2327

24-
#CMD npm start
28+
CMD forever -c "npm start" --uid "consumer" .

consumer/README.md

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ Env variable: `LOG_LEVEL`
2020
- **rabbitmqURL**
2121
The rabbitmq URL.
2222
Create a free account here https://www.cloudamqp.com/ and create a new instance in any region.
23-
You can get URL by clicking on queue details button.
23+
You can get URL by clicking on queue details button. For deployment in AWS, please make sure that this instance is launched in the VPC which target AWS server can communicate with.
2424
Env variable: `RABBITMQ_URL`
2525

2626
- **ownerId**
@@ -86,7 +86,7 @@ You can use the existing cert.pem from `config` directory.
8686
Or generate a new certificate and key using a command:
8787
`openssl req -newkey rsa:2048 -new -nodes -x509 -days 3650 -keyout key.pem -out cert.pem`
8888

89-
Private key of your certificate is read from environment variable, instead of reading from the config directory. So please make sure you replace all new line characters with `\n` before setting it in the environment variable. Application would add newline characters back to the key when using it to sign the requests.
89+
**Private key of your certificate is read from environment variable, instead of reading from the config directory. So please make sure you replace all new line characters with `\n` before setting it in the environment variable. Application would add newline characters back to the key when using it to sign the requests.**
9090

9191
![Alt text](https://monosnap.com/file/tT9ZZXUH1aa1j7cFzYxaV9RjmHWCum.png)
9292
Click Save
@@ -224,11 +224,19 @@ Check the Lead details in Saleforce
224224
![Alt text](https://monosnap.com/file/PdMF97k18cBGeZjR9qOkkBe1AjYw2n.png)
225225
Lead is removed from the campaign
226226

227+
## Deployment Checklist
228+
1. AppXpressConfig table exists in dynamodb with dripcampaignId
229+
2. Make sure configured rabbitmq exchange and queue are created appropriately in cloumamqp
230+
3. There should be proper mapping between exchange and queue specified in the conifguration
231+
4. Grant permission, with user conifgured, for the app once using url https://login.salesforce.com/services/oauth2/authorize?client_id=[clientId]&redirect_uri=https://login.salesforce.com&response_type=code
227232

228-
Notes on Error Handling.
229-
UnprocessableError is thrown if operation cannot be completed.
233+
## CI
234+
* All changes into dev will be built and deployed to AWS beanstalk environment `tc-connect2sf-dev`
235+
* All changes into master will be built and deployed to AWS beanstalk environment `tc-connect2sf-prod`
236+
237+
## Notes on Error Handling.
238+
`UnprocessableError` is thrown if operation cannot be completed.
230239
For example: duplicated project id added to the queue, Lead cannot be found etc.
231240
In such situation, the message from rabbitmq will be marked as ACK (removed).
232-
If we won't remove it from queue, the message will be stuck forever.
233-
234-
241+
If we won't remove it from queue, the message will be stuck forever.
242+
For any other type of error the message from the rabbitmq will me marked as ACK as well, however, it would requeued into another queue for later inspection. It right now publishes the message content to the same rabbitmq exchange (configured as mentioned in Configuration section) with routing key being `connect2sf.failed`. So, we have to map the exchange and routing key comibation to a queue to which no consumer is listeting e.g. `tc-connect2sf.failed` is used in dev environment. Now we can see messages, via rabbitmq manager UI, in this queue to check if any of the messages failed and what was id of the project which failed. We can either remove those messages from the queue, if we are going to add those leads manually in saleforce or move them again to the original queue after fixing the deployed environment.

consumer/config/constants.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,6 @@ export const EVENT = {
44
PROJECT_DRAFT_CREATED: 'project.draft-created',
55
PROJECT_UPDATED: 'project.updated',
66
PROJECT_DELETED: 'project.deleted',
7-
CONNECT_TO_SF_FAILED: 'connect2sf.failed'
7+
FAILED_SUFFIX: '.failed'
88
},
99
};

consumer/config/custom-environment-variables.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@
2121
},
2222
"rabbitmq" : {
2323
"projectsExchange" : "RABBITMQ_PROJECTS_EXCHANGE",
24+
"connect2sfExchange" : "RABBITMQ_CONNECT2SF_EXCHANGE",
2425
"queues": {
25-
"project": "QUEUE_PROJECTS"
26+
"project": "QUEUE_PROJECTS",
27+
"connect2sf": "QUEUE_CONNECT2SF"
2628
}
2729
}
2830
}

consumer/config/scheduler-cron

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
*/5 * * * * babel-node /usr/src/app/src/scheduled-worker.js

consumer/config/test.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@
2020
},
2121
"rabbitmq" : {
2222
"projectsExchange" : "dev.projects",
23+
"connect2sfExchange": "dev.tc.connect2sf",
2324
"queues": {
24-
"project": "dev.project.service"
25+
"project": "dev.project.service",
26+
"connect2sf": "dev.tc.connect2sf.exclusive"
2527
}
2628
}
2729
}

consumer/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
"joi": "^9.0.4",
4747
"jsonwebtoken": "^7.1.7",
4848
"lodash": "^4.14.2",
49+
"node-cron": "^1.1.3",
4950
"superagent": "^2.1.0",
5051
"superagent-promise": "^1.1.0",
5152
"winston": "^2.2.0"

consumer/src/scheduled-worker.js

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
/**
2+
* The main app entry
3+
*/
4+
5+
import config from 'config';
6+
import amqp from 'amqplib';
7+
import _ from 'lodash';
8+
import logger from './common/logger';
9+
import ConsumerService from './services/ConsumerService';
10+
import { EVENT } from '../config/constants';
11+
12+
const debug = require('debug')('app:worker');
13+
14+
const FETCH_LIMIT = 10;
15+
16+
let connection;
17+
process.once('SIGINT', () => {
18+
debug('Received SIGINT...closing connection...')
19+
try {
20+
connection.close();
21+
} catch (ignore) { // eslint-ignore-line
22+
logger.logFullError(ignore)
23+
}
24+
process.exit();
25+
});
26+
27+
let EVENT_HANDLERS = {
28+
[EVENT.ROUTING_KEY.PROJECT_DRAFT_CREATED]: ConsumerService.processProjectCreated
29+
// [EVENT.ROUTING_KEY.PROJECT_UPDATED]: ConsumerService.processProjectUpdated
30+
}
31+
32+
function close() {
33+
console.log('closing self after processing messages...')
34+
try {
35+
setTimeout(connection.close.bind(connection), 30000);
36+
} catch (ignore) { // eslint-ignore-line
37+
logger.logFullError(ignore)
38+
}
39+
}
40+
41+
export function initHandlers(handlers) {
42+
EVENT_HANDLERS = handlers;
43+
}
44+
45+
/**
46+
* Processes the given message and acks/nacks the channel
47+
* @param {Object} channel the target channel
48+
* @param {Object} msg the message to be processed
49+
*/
50+
export function processMessage(channel, msg) {
51+
return new Promise((resolve, reject) => {
52+
if (!msg) {
53+
reject(new Error('Empty message. Ignoring'));
54+
return;
55+
}
56+
debug(`Consuming message in \n${msg.content}`);
57+
const key = _.get(msg, 'fields.routingKey');
58+
debug('Received Message', key, msg.fields);
59+
60+
let handler;
61+
let data;
62+
try {
63+
handler = EVENT_HANDLERS[key];
64+
if (!_.isFunction(handler)) {
65+
logger.error(`Unknown message type: ${key}, NACKing... `);
66+
reject(new Error(`Unknown message type: ${key}`));
67+
return;
68+
}
69+
data = JSON.parse(msg.content.toString());
70+
} catch (ignore) {
71+
logger.info(ignore);
72+
logger.error('Invalid message. Ignoring');
73+
resolve('Invalid message. Ignoring');
74+
return;
75+
}
76+
return handler(logger, data).then(() => {
77+
resolve(msg);
78+
return;
79+
})
80+
.catch((e) => {
81+
// logger.logFullError(e, `Error processing message`);
82+
if (e.shouldAck) {
83+
debug("Resolving for Unprocessable Error in handler...");
84+
resolve(msg);
85+
} else {
86+
debug("Rejecting promise for error in msg processing...")
87+
reject(new Error('Error processing message'));
88+
}
89+
});
90+
})
91+
}
92+
93+
function assertExchangeQueues(channel, exchangeName, queue) {
94+
channel.assertExchange(exchangeName, 'topic', { durable: true });
95+
channel.assertQueue(queue, { durable: true });
96+
const bindings = _.keys(EVENT_HANDLERS);
97+
const bindingPromises = _.map(bindings, rk =>
98+
channel.bindQueue(queue, exchangeName, rk));
99+
debug('binding queue ' + queue + ' to exchange: ' + exchangeName);
100+
return Promise.all(bindingPromises);
101+
}
102+
103+
/**
104+
* Start the worker
105+
*/
106+
export async function start() {
107+
try {
108+
console.log("Scheduled Worker Connecting to RabbitMQ: " + config.rabbitmqURL.substr(-5));
109+
connection = await amqp.connect(config.rabbitmqURL);
110+
connection.on('error', (e) => {
111+
logger.logFullError(e, `ERROR IN CONNECTION`);
112+
})
113+
connection.on('close', () => {
114+
debug('Before closing connection...')
115+
})
116+
debug('created connection successfully with URL: ' + config.rabbitmqURL);
117+
const connect2sfChannel = await connection.createConfirmChannel();
118+
debug('Channel created for consuming failed messages ...');
119+
connect2sfChannel.prefetch(FETCH_LIMIT);
120+
assertExchangeQueues(
121+
connect2sfChannel,
122+
config.rabbitmq.connect2sfExchange,
123+
config.rabbitmq.queues.connect2sf
124+
).then(() => {
125+
debug('Asserted all required exchanges and queues');
126+
let counter = 0;
127+
_.range(1, 11).forEach(() => {
128+
return connect2sfChannel.get(config.rabbitmq.queues.connect2sf).
129+
then((msg) => {
130+
if (msg) {
131+
return processMessage(
132+
connect2sfChannel,
133+
msg
134+
).then((responses) => {
135+
counter++;
136+
debug('Processed message');
137+
connect2sfChannel.ack(msg);
138+
if (counter >= FETCH_LIMIT) {
139+
close();
140+
}
141+
}).catch((e) => {
142+
counter++;
143+
debug('Processed message with Error');
144+
connect2sfChannel.nack(msg);
145+
logger.logFullError(e, `Unable to process one of the messages`);
146+
if (counter >= FETCH_LIMIT) {
147+
close();
148+
}
149+
})
150+
} else {
151+
counter++;
152+
debug('Processed Empty message');
153+
if (counter >= FETCH_LIMIT) {
154+
close();
155+
}
156+
}
157+
}).catch(() => {
158+
console.log('get failed to consume')
159+
})
160+
})
161+
})
162+
163+
} catch (e) {
164+
logger.logFullError(e, `Unable to connect to RabbitMQ`);
165+
}
166+
}
167+
168+
if (!module.parent) {
169+
start();
170+
}

0 commit comments

Comments
 (0)