Skip to content

Commit 4405c21

Browse files
author
Parth Shah
committed
more event related fixes
1 parent f14b8eb commit 4405c21

File tree

11 files changed

+45
-16
lines changed

11 files changed

+45
-16
lines changed

src/constants.js

+2-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ export const EVENT = {
3939
PROJECT_LAUNCHED: 'project.launched',
4040
PROJECT_UPDATED: 'project.updated',
4141
PROJECT_CANCELLED: 'project.cancelled',
42-
PROJECT_COMPLETED: 'project.completed'
42+
PROJECT_COMPLETED: 'project.completed',
43+
PROJECT_DELETED: 'project.deleted'
4344
}
4445
}
4546
EVENT.INTERNAL = _.mapValues(EVENT.ROUTING_KEY, (a) => { return `internal.${a}` })

src/events/projectMembers/index.js

+11-3
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,18 @@ module.exports = (app, logger) => {
1616

1717
// Publish messages to the queue
1818
_.map(internalEvents, (evt) => {
19-
app.on(evt, (member) => {
19+
app.on(evt, ({payload, props}) => {
2020
logger.debug('handling ', evt)
2121
let key = evt.substring(evt.indexOf('.') + 1)
22-
return app.services.pubsub.publish(key, member)
22+
return app.services.pubsub.publish(key, payload, props)
2323
})
2424
})
2525

2626

2727
// EXTERNAL events
2828
app.on(EVENT.EXTERNAL.PROJECT_MEMBER_ADDED, (msg, next) => {
29+
const origRequestId = msg.properties.correlationId
30+
logger = logger.child({requestId: origRequestId})
2931
let newMember = JSON.parse(msg.content.toString())
3032
logger.debug(`received msg '${EVENT.EXTERNAL.PROJECT_MEMBER_ADDED}'`, newMember)
3133

@@ -38,13 +40,16 @@ module.exports = (app, logger) => {
3840
return util.getSystemUserToken(logger)
3941
.then(token => {
4042
const req = {
41-
id: 1,
43+
id: origRequestId,
4244
log: logger,
4345
headers: { authorization: `Bearer ${token}` }
4446
}
4547
return directProject.addCopilot(req, directProjectId, {
4648
copilotUserId: newMember.userId
4749
})
50+
.then((resp) => {
51+
next()
52+
})
4853
})
4954
.catch(err => {
5055
logger.error('Error caught while adding co-pilot from direct', err)
@@ -62,7 +67,9 @@ module.exports = (app, logger) => {
6267
})
6368

6469
app.on(EVENT.EXTERNAL.PROJECT_MEMBER_REMOVED, (msg, next) => {
70+
const origRequestId = msg.properties.correlationId
6571
const member = JSON.parse(msg.content.toString())
72+
logger = logger.child({requestId: origRequestId})
6673
logger.debug(`received msg '${EVENT.EXTERNAL.PROJECT_MEMBER_REMOVED}'`, member)
6774

6875
if (member.role === PROJECT_MEMBER_ROLE.COPILOT) {
@@ -74,6 +81,7 @@ module.exports = (app, logger) => {
7481
return util.getSystemUserToken(logger)
7582
.then(token => {
7683
const req = {
84+
id: origRequestId,
7785
log: logger,
7886
headers: { authorization: `Bearer ${token}` }
7987
}

src/events/projects/index.js

+6-3
Original file line numberDiff line numberDiff line change
@@ -11,19 +11,22 @@ module.exports = (app, logger) => {
1111
EVENT.INTERNAL.PROJECT_LAUNCHED,
1212
EVENT.INTERNAL.PROJECT_UPDATED,
1313
EVENT.INTERNAL.PROJECT_CANCELLED,
14-
EVENT.INTERNAL.PROJECT_COMPLETED
14+
EVENT.INTERNAL.PROJECT_COMPLETED,
15+
EVENT.INTERNAL.PROJECT_DELETED
1516
]
1617

1718
// Publish messages to the queue
1819
_.map(internalEvents, (evt) => {
19-
app.on(evt, (project) => {
20+
app.on(evt, ({payload, props}) => {
2021
logger.debug('handling ' + evt)
2122
let key = evt.substring(evt.indexOf('.') + 1)
22-
return app.services.pubsub.publish(key, project)
23+
return app.services.pubsub.publish(key, payload, props)
2324
})
2425
})
2526

2627
app.on(EVENT.EXTERNAL.PROJECT_DRAFT_CREATED, (msg, next) => {
28+
// log with correlationId (request Id)
29+
logger = logger.child({requestId: msg.properties.correlationId})
2730
let project = JSON.parse(msg.content.toString())
2831
logger.debug('received msg \'project.draft-created\'', project.id)
2932

src/routes/projectMembers/create.js

+4-1
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,10 @@ module.exports = [
6464
.then(_newMember => {
6565
newMember = _newMember.get({plain: true})
6666
// fire event
67-
req.app.emit(EVENT.INTERNAL.PROJECT_MEMBER_ADDED, newMember)
67+
req.app.emit(EVENT.INTERNAL.PROJECT_MEMBER_ADDED, {
68+
payload: newMember,
69+
props: { correlationId: req.id }
70+
})
6871
res.status(201).json(util.wrapResponse(req.id, newMember))
6972
})
7073
.catch((err) => {

src/routes/projectMembers/delete.js

+4-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,10 @@ module.exports = [
3636
})
3737
.then(member => {
3838
// fire event
39-
req.app.emit(EVENT.INTERNAL.PROJECT_MEMBER_REMOVED, member.get({plain:true}))
39+
req.app.emit(EVENT.INTERNAL.PROJECT_MEMBER_REMOVED, {
40+
payload: member.get({plain:true}),
41+
props: { correlationId: req.id }
42+
})
4043
res.status(204).json({})
4144
})
4245
.catch(err => next(err))

src/routes/projectMembers/update.js

+1
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ module.exports = [
8181
return Promise.all(operations)
8282
})
8383
.then(() => {
84+
// TODO move this to an event
8485
// if copilot role is added or removed should invoke related direct project service
8586
if(previousValue.role !== newValue.role && (previousValue.role === PROJECT_MEMBER_ROLE.COPILOT
8687
|| newValue.role === PROJECT_MEMBER_ROLE.COPILOT)) {

src/routes/projects/create.js

+4-1
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,10 @@ module.exports = [
125125
newProject = _.omit(newProject, ['deletedAt', 'utm'])
126126
// add an empty attachments array
127127
newProject.attachments = []
128-
req.app.emit(EVENT.INTERNAL.PROJECT_DRAFT_CREATED, newProject)
128+
req.app.emit(EVENT.INTERNAL.PROJECT_DRAFT_CREATED, {
129+
payload: newProject,
130+
props: { correlationId: req.id }
131+
})
129132
res.status(201).json(util.wrapResponse(req.id, newProject))
130133
})
131134
.catch((err) => {

src/routes/projects/delete.js

+4
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ module.exports = [
3232
err.status = 404
3333
next(err)
3434
} else {
35+
req.app.emit(EVENT.INTERNAL.PROJECT_DELETED, {
36+
payload: { id: projectId },
37+
props: { correlationId: req.id }
38+
})
3539
res.status(204).json({})
3640
}
3741
})

src/routes/projects/update.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -165,8 +165,8 @@ module.exports = [
165165
previousValue = _.omit(previousValue, ['deletedAt'])
166166
// emit original and updated project information
167167
req.app.emit(EVENT.INTERNAL.PROJECT_UPDATED, {
168-
original: previousValue,
169-
updated: project
168+
payload: { original: previousValue, updated: project },
169+
props: { correlationId: req.id }
170170
})
171171
// check context for project members
172172
project.members = req.context.currentProjectMembers

src/services/directProject.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ function _getHttpClient(req){
1818
httpClient.defaults.baseURL = config.get('directProjectServiceEndpoint')
1919
httpClient.defaults.timeout = 4000
2020
httpClient.interceptors.response.use((resp) => {
21-
req.log.debug('resp: ', JSON.stringify(resp.data, null, 2))
21+
// req.log.debug('resp: ', JSON.stringify(resp.data, null, 2))
2222
if (resp.status !== 200 || resp.data.result.status !== 200) {
2323
req.log.error('error resp: ', JSON.stringify(resp.data, null, 2))
2424
return Promise.reject(new Error(resp.data.result.content.message))

src/services/rabbitmq.js

+6-3
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ module.exports = class RabbitMQService extends EventEmitter{
138138
* @param {string} key routing key
139139
* @param {object} payload message payload
140140
*/
141-
publish(key, payload) {
141+
publish(key, payload, props={}) {
142142
var channel = null
143143
var self = this
144144
// first create a channel - this is a lightweight connection
@@ -151,9 +151,12 @@ module.exports = class RabbitMQService extends EventEmitter{
151151
})
152152
}).then(() => {
153153
// publish the message
154-
channel.publish(self.exchangeName, key,
154+
props = _.defaults(props, { contentType: 'application/json'})
155+
channel.publish(
156+
self.exchangeName,
157+
key,
155158
new Buffer(JSON.stringify(payload)),
156-
{ contentType: 'application/json'}
159+
props
157160
)
158161
self.logger.debug('Sent %s: %s', self.exchangeName, payload)
159162
return channel.close()

0 commit comments

Comments
 (0)