@@ -6,6 +6,12 @@ const helper = require('../common/helper')
66const appConst = require ( '../consts' )
77const esClient = require ( './es-client' ) . getESClient ( )
88
9+ const {
10+ TopResources,
11+ UserResources,
12+ OrganizationResources
13+ } = appConst
14+
915const DOCUMENTS = config . ES . DOCUMENTS
1016const RESOURCES = Object . keys ( DOCUMENTS )
1117
@@ -283,21 +289,213 @@ function escapeRegex (str) {
283289 /* eslint-enable no-useless-escape */
284290}
285291
292+ /**
293+ * Function to get user from es
294+ * @param {String } userId
295+ * @returns {Object } user
296+ */
297+ async function getUser ( userId ) {
298+ const { body : user } = await esClient . get ( { index : TopResources . user . index , type : TopResources . user . type , id : userId } )
299+ return { seqNo : user . _seq_no , primaryTerm : user . _primary_term , user : user . _source }
300+ }
301+
302+ /**
303+ * Function to update es user
304+ * @param {String } userId
305+ * @param {Number } seqNo
306+ * @param {Number } primaryTerm
307+ * @param {Object } body
308+ */
309+ async function updateUser ( userId , body , seqNo , primaryTerm ) {
310+ try {
311+ await esClient . index ( {
312+ index : TopResources . user . index ,
313+ type : TopResources . user . type ,
314+ id : userId ,
315+ body,
316+ if_seq_no : seqNo ,
317+ if_primary_term : primaryTerm ,
318+ pipeline : TopResources . user . ingest . pipeline . id ,
319+ refresh : 'wait_for'
320+ } )
321+ logger . debug ( 'Update user completed' )
322+ } catch ( err ) {
323+ if ( err && err . meta && err . meta . body && err . meta . body . error ) {
324+ logger . debug ( JSON . stringify ( err . meta . body . error , null , 4 ) )
325+ }
326+ logger . debug ( JSON . stringify ( err ) )
327+ throw err
328+ }
329+ }
330+
331+ /**
332+ * Function to get org from es
333+ * @param {String } organizationId
334+ * @returns {Object } organization
335+ */
336+ async function getOrg ( organizationId ) {
337+ const { body : org } = await esClient . get ( { index : TopResources . organization . index , type : TopResources . organization . type , id : organizationId } )
338+ return { seqNo : org . _seq_no , primaryTerm : org . _primary_term , org : org . _source }
339+ }
340+
341+ /**
342+ * Function to update es organization
343+ * @param {String } organizationId
344+ * @param {Number } seqNo
345+ * @param {Number } primaryTerm
346+ * @param {Object } body
347+ */
348+ async function updateOrg ( organizationId , body , seqNo , primaryTerm ) {
349+ await esClient . index ( {
350+ index : TopResources . organization . index ,
351+ type : TopResources . organization . type ,
352+ id : organizationId ,
353+ body,
354+ if_seq_no : seqNo ,
355+ if_primary_term : primaryTerm ,
356+ refresh : 'wait_for'
357+ } )
358+ await esClient . enrich . executePolicy ( { name : TopResources . organization . enrich . policyName } )
359+ }
360+
286361/**
287362 * Process create entity
288363 * @param {String } resource resource name
289364 * @param {Object } entity entity object
290365 */
291366async function processCreate ( resource , entity ) {
292- helper . validProperties ( entity , [ 'id' ] )
293- await esClient . index ( {
294- index : DOCUMENTS [ resource ] . index ,
295- type : DOCUMENTS [ resource ] . type ,
296- id : entity . id ,
297- body : entity ,
298- refresh : 'true'
299- } )
300- logger . info ( `Insert in Elasticsearch resource ${ resource } entity, , ${ JSON . stringify ( entity , null , 2 ) } ` )
367+ if ( _ . includes ( _ . keys ( TopResources ) , resource ) ) {
368+ // process the top resources such as user, skill...
369+ helper . validProperties ( entity , [ 'id' ] )
370+ await esClient . index ( {
371+ index : TopResources [ resource ] . index ,
372+ type : TopResources [ resource ] . type ,
373+ id : entity . id ,
374+ body : _ . omit ( entity , [ 'resource' , 'originalTopic' ] ) ,
375+ pipeline : TopResources [ resource ] . ingest ? TopResources [ resource ] . ingest . pipeline . id : undefined ,
376+ refresh : 'true'
377+ } )
378+ if ( TopResources [ resource ] . enrich ) {
379+ await esClient . enrich . executePolicy ( {
380+ name : TopResources [ resource ] . enrich . policyName
381+ } )
382+ }
383+ } else if ( _ . includes ( _ . keys ( UserResources ) , resource ) ) {
384+ // process user resources such as userSkill, userAttribute...
385+ const userResource = UserResources [ resource ]
386+ userResource . validate ( entity )
387+ const { seqNo, primaryTerm, user } = await getUser ( entity . userId )
388+ const relateId = entity [ userResource . relateKey ]
389+ if ( ! user [ userResource . propertyName ] ) {
390+ user [ userResource . propertyName ] = [ ]
391+ }
392+
393+ // import groups for a new user
394+ if ( resource === 'externalprofile' && entity . externalId ) {
395+ const userGroups = await helper . getUserGroup ( entity . externalId )
396+ user [ config . get ( 'ES.USER_GROUP_PROPERTY_NAME' ) ] = _ . unionBy ( user [ config . get ( 'ES.USER_GROUP_PROPERTY_NAME' ) ] , userGroups , 'id' )
397+ }
398+
399+ // check the resource does not exist
400+ if ( _ . some ( user [ userResource . propertyName ] , [ userResource . relateKey , relateId ] ) ) {
401+ logger . error ( `Can't create existed ${ resource } with the ${ userResource . relateKey } : ${ relateId } , userId: ${ entity . userId } ` )
402+ throw helper . getErrorWithStatus ( '[version_conflict_engine_exception]' , 409 )
403+ } else {
404+ user [ userResource . propertyName ] . push ( entity )
405+ await updateUser ( entity . userId , user , seqNo , primaryTerm )
406+ }
407+ } else if ( _ . includes ( _ . keys ( OrganizationResources ) , resource ) ) {
408+ // process org resources such as org skill provider
409+ const orgResources = OrganizationResources [ resource ]
410+ orgResources . validate ( entity )
411+ const { seqNo, primaryTerm, org } = await getOrg ( entity . organizationId )
412+ const relateId = entity [ orgResources . relateKey ]
413+ if ( ! org [ orgResources . propertyName ] ) {
414+ org [ orgResources . propertyName ] = [ ]
415+ }
416+
417+ // check the resource does not exist
418+ if ( _ . some ( org [ orgResources . propertyName ] , [ orgResources . relateKey , relateId ] ) ) {
419+ logger . error ( `Can't create existing ${ resource } with the ${ orgResources . relateKey } : ${ relateId } , organizationId: ${ entity . organizationId } ` )
420+ throw helper . getErrorWithStatus ( '[version_conflict_engine_exception]' , 409 )
421+ } else {
422+ org [ orgResources . propertyName ] . push ( entity )
423+ await updateOrg ( entity . organizationId , org , seqNo , primaryTerm )
424+ }
425+ } else {
426+ logger . info ( `Ignore this message since resource is not in [${ _ . union ( _ . values ( TopResources ) , _ . keys ( UserResources ) , _ . keys ( OrganizationResources ) ) } ]` )
427+ }
428+ }
429+
430+ /**
431+ * Process update entity
432+ * @param {String } resource resource name
433+ * @param {Object } entity entity object
434+ */
435+ async function processUpdate ( resource , entity ) {
436+ if ( _ . includes ( _ . keys ( TopResources ) , resource ) ) {
437+ logger . info ( `Processing top level resource: ${ resource } ` )
438+ // process the top resources such as user, skill...
439+ helper . validProperties ( entity , [ 'id' ] )
440+ const { index, type } = TopResources [ resource ]
441+ const id = entity . id
442+ const { body : source } = await esClient . get ( { index, type, id } )
443+ await esClient . index ( {
444+ index,
445+ type,
446+ id,
447+ body : _ . assign ( source . _source , _ . omit ( entity , [ 'resource' , 'originalTopic' ] ) ) ,
448+ pipeline : TopResources [ resource ] . ingest ? TopResources [ resource ] . ingest . pipeline . id : undefined ,
449+ if_seq_no : source . _seq_no ,
450+ if_primary_term : source . _primary_term ,
451+ refresh : 'true'
452+ } )
453+ if ( TopResources [ resource ] . enrich ) {
454+ await esClient . enrich . executePolicy ( {
455+ name : TopResources [ resource ] . enrich . policyName
456+ } )
457+ }
458+ } else if ( _ . includes ( _ . keys ( UserResources ) , resource ) ) {
459+ // process user resources such as userSkill, userAttribute...
460+ const userResource = UserResources [ resource ]
461+ const relateId = entity [ userResource . relateKey ]
462+ logger . info ( `Processing user level resource: ${ resource } :${ relateId } ` )
463+ userResource . validate ( entity )
464+ logger . info ( `Resource validated for ${ relateId } ` )
465+ const { seqNo, primaryTerm, user } = await getUser ( entity . userId )
466+ logger . info ( `User fetched ${ user . id } and ${ relateId } ` )
467+
468+ // check the resource exist
469+ if ( ! user [ userResource . propertyName ] || ! _ . some ( user [ userResource . propertyName ] , [ userResource . relateKey , relateId ] ) ) {
470+ logger . error ( `The ${ resource } with the ${ userResource . relateKey } : ${ relateId } , userId: ${ entity . userId } not exist` )
471+ throw helper . getErrorWithStatus ( '[resource_not_found_exception]' , 404 )
472+ } else {
473+ const updateIndex = _ . findIndex ( user [ userResource . propertyName ] , [ userResource . relateKey , relateId ] )
474+ user [ userResource . propertyName ] . splice ( updateIndex , 1 , entity )
475+ logger . info ( `Updating ${ user . id } and ${ relateId } ` )
476+ await updateUser ( entity . userId , user , seqNo , primaryTerm )
477+ logger . info ( `Updated ${ user . id } and ${ relateId } ` )
478+ }
479+ } else if ( _ . includes ( _ . keys ( OrganizationResources ) , resource ) ) {
480+ logger . info ( `Processing org level resource: ${ resource } ` )
481+ // process org resources such as org skill providers
482+ const orgResource = OrganizationResources [ resource ]
483+ orgResource . validate ( entity )
484+ const { seqNo, primaryTerm, org } = await getOrg ( entity . organizationId )
485+ const relateId = entity [ orgResource . relateKey ]
486+
487+ // check the resource exist
488+ if ( ! org [ orgResource . propertyName ] || ! _ . some ( org [ orgResource . propertyName ] , [ orgResource . relateKey , relateId ] ) ) {
489+ logger . error ( `The ${ resource } with the ${ orgResource . relateKey } : ${ relateId } , organizationId: ${ entity . organizationId } not exist` )
490+ throw helper . getErrorWithStatus ( '[resource_not_found_exception]' , 404 )
491+ } else {
492+ const updateIndex = _ . findIndex ( org [ orgResource . propertyName ] , [ orgResource . relateKey , relateId ] )
493+ org [ orgResource . propertyName ] . splice ( updateIndex , 1 , entity )
494+ await updateOrg ( entity . organizationId , org , seqNo , primaryTerm )
495+ }
496+ } else {
497+ logger . info ( `Ignore this message since resource is not in [${ _ . union ( _ . values ( TopResources ) , _ . keys ( UserResources ) , _ . keys ( OrganizationResources ) ) } ]` )
498+ }
301499}
302500
303501/**
@@ -306,13 +504,53 @@ async function processCreate (resource, entity) {
306504 * @param {Object } entity entity object
307505 */
308506async function processDelete ( resource , entity ) {
309- helper . validProperties ( entity , [ 'id' ] )
310- await esClient . delete ( {
311- index : DOCUMENTS [ resource ] . index ,
312- type : DOCUMENTS [ resource ] . type ,
313- id : entity . id ,
314- refresh : 'wait_for'
315- } )
507+ if ( _ . includes ( _ . keys ( TopResources ) , resource ) ) {
508+ // process the top resources such as user, skill...
509+ helper . validProperties ( entity , [ 'id' ] )
510+ await esClient . delete ( {
511+ index : TopResources [ resource ] . index ,
512+ type : TopResources [ resource ] . type ,
513+ id : entity . id ,
514+ refresh : 'wait_for'
515+ } )
516+ if ( TopResources [ resource ] . enrich ) {
517+ await esClient . enrich . executePolicy ( {
518+ name : TopResources [ resource ] . enrich . policyName
519+ } )
520+ }
521+ } else if ( _ . includes ( _ . keys ( UserResources ) , resource ) ) {
522+ // process user resources such as userSkill, userAttribute...
523+ const userResource = UserResources [ resource ]
524+ userResource . validate ( entity )
525+ const { seqNo, primaryTerm, user } = await getUser ( entity . userId )
526+ const relateId = entity [ userResource . relateKey ]
527+
528+ // check the resource exist
529+ if ( ! user [ userResource . propertyName ] || ! _ . some ( user [ userResource . propertyName ] , [ userResource . relateKey , relateId ] ) ) {
530+ logger . error ( `The ${ resource } with the ${ userResource . relateKey } : ${ relateId } , userId: ${ entity . userId } not exist` )
531+ throw helper . getErrorWithStatus ( '[resource_not_found_exception]' , 404 )
532+ } else {
533+ _ . remove ( user [ userResource . propertyName ] , [ userResource . relateKey , relateId ] )
534+ await updateUser ( entity . userId , user , seqNo , primaryTerm )
535+ }
536+ } else if ( _ . includes ( _ . keys ( OrganizationResources ) , resource ) ) {
537+ // process user resources such as org skill provider
538+ const orgResource = OrganizationResources [ resource ]
539+ orgResource . validate ( entity )
540+ const { seqNo, primaryTerm, org } = await getOrg ( entity . organizationId )
541+ const relateId = entity [ orgResource . relateKey ]
542+
543+ // check the resource exist
544+ if ( ! org [ orgResource . propertyName ] || ! _ . some ( org [ orgResource . propertyName ] , [ orgResource . relateKey , relateId ] ) ) {
545+ logger . error ( `The ${ resource } with the ${ orgResource . relateKey } : ${ relateId } , organizationId: ${ entity . organizationId } not exist` )
546+ throw helper . getErrorWithStatus ( '[resource_not_found_exception]' , 404 )
547+ } else {
548+ _ . remove ( org [ orgResource . propertyName ] , [ orgResource . relateKey , relateId ] )
549+ await updateOrg ( entity . organizationId , org , seqNo , primaryTerm )
550+ }
551+ } else {
552+ logger . info ( `Ignore this message since resource is not in [${ _ . union ( _ . keys ( TopResources ) , _ . keys ( UserResources ) , _ . keys ( OrganizationResources ) ) } ]` )
553+ }
316554}
317555
318556async function getOrganizationId ( handle ) {
@@ -1487,7 +1725,7 @@ async function searchAchievementValues ({ organizationId, keyword }) {
14871725
14881726module . exports = {
14891727 processCreate,
1490- processUpdate : processCreate ,
1728+ processUpdate,
14911729 processDelete,
14921730 searchElasticSearch,
14931731 getFromElasticSearch,
0 commit comments