@@ -849,31 +849,31 @@ class NodesMonitor extends EventEmitter {
849849         });
850850     }
851851
852-     _run_node(item) {
853-         if (!this._started) return P.reject(new Error('monitor has not started'));
852+     async _run_node(item) {
853+         if (!this._started) throw new Error('monitor has not started');
854854         item._run_node_serial = item._run_node_serial || new semaphore.Semaphore(1);
855-         if (item.node.deleted) return P.reject(new Error(`node ${item.node.name} is deleted`));
856-         return item._run_node_serial.surround(() =>
857-             P.resolve()
858-             .then(() => dbg.log1('_run_node:', item.node.name))
859-             .then(() => this._get_agent_info(item))
860-             .then(() => { //If internal or cloud resource, cut down initializing time (in update_rpc_config)
861-                 if (!item.node_from_store && (item.node.is_mongo_node || item.node.is_cloud_node)) {
862-                     return this._update_nodes_store('force');
863-                 }
864-             })
865-             .then(() => this._uninstall_deleting_node(item))
866-             .then(() => this._remove_hideable_nodes(item))
867-             .then(() => this._update_node_service(item))
868-             .then(() => this._update_create_node_token(item))
869-             .then(() => this._update_rpc_config(item))
870-             .then(() => this._test_nodes_validity(item))
871-             .then(() => this._update_status(item))
872-             .then(() => this._handle_issues(item))
873-             .then(() => this._update_nodes_store())
874-             .catch(err => {
855+         if (item.node.deleted) throw new Error(`node ${item.node.name} is deleted`);
856+         return item._run_node_serial.surround(async () => {
857+             try {
858+                 dbg.log1('_run_node:', item.node.name);
859+                 await this._get_agent_info(item);
860+                 //If internal or cloud resource, cut down initializing time (in update_rpc_config)
861+                 if (!item.node_from_store && (item.node.is_mongo_node || item.node.is_cloud_node)) {
862+                     await this._update_nodes_store('force');
863+                 }
864+                 await this._uninstall_deleting_node(item);
865+                 this._remove_hideable_nodes(item);
866+                 await this._update_node_service(item);
867+                 await this._update_create_node_token(item);
868+                 await this._update_rpc_config(item);
869+                 await this._test_nodes_validity(item);
870+                 this._update_status(item);
871+                 this._handle_issues(item);
872+                 await this._update_nodes_store();
873+             } catch (err) {
875874                 dbg.warn('_run_node: ERROR', err.stack || err, 'node', item.node);
876-             }));
875+             }
876+         });
877877     }
878878
879879     _handle_issues(item) {
@@ -1681,71 +1681,71 @@ class NodesMonitor extends EventEmitter {
16811681     // This is why we are required to use a new variable by the name ready_to_be_deleted
16821682     // In order to mark the nodes that wait for their processes to be removed (cloud/mongo resource)
16831683     // If the node is not relevant to a cloud/mongo resouce it will be just marked as deleted
1684-     _update_deleted_nodes(deleted_nodes) {
1684+     async _update_deleted_nodes(deleted_nodes) {
16851685         if (!deleted_nodes.length) return;
16861686         const items_to_update = [];
1687-         return P.map_with_concurrency(10, deleted_nodes, item => {
1688-             dbg.log0('_update_nodes_store deleted_node:', item);
16891687
1690-             if (item.node.deleted) {
1691-                 if (!item.node_from_store.deleted) {
1692-                     items_to_update.push(item);
1693-                 }
1694-                 return;
1695-             }
1688+         await P.map_with_concurrency(10, deleted_nodes, async item => {
1689+             dbg.log0('_update_nodes_store deleted_node:', item);
16961690
1697-             // TODO handle deletion of normal nodes (uninstall?)
1698-             // Just mark the node as deleted and we will not scan it anymore
1699-             // This is done once the node's proccess is deleted (relevant to cloud/mongo resource)
1700-             // Or in a normal node it is done immediately
1701-             if (!item.node.is_cloud_node &&
1702-                 !item.node.is_mongo_node &&
1703-                 !item.node.is_internal_node) {
1704-                 item.node.deleted = Date.now();
1691+             if (item.node.deleted) {
1692+                 if (!item.node_from_store.deleted) {
17051693                     items_to_update.push(item);
1706-                 return;
17071694                 }
1695+                 return;
1696+             }
17081697
1709-             return P.resolve()
1710-                 .then(() => {
1711-                     if (item.node.is_internal_node) {
1712-                         return P.reject('Do not support internal_node deletion yet');
1713-                     }
1714-                     // Removing the internal node from the processes
1715-                     return server_rpc.client.hosted_agents.remove_pool_agent({
1716-                         node_name: item.node.name
1717-                     });
1718-                 })
1719-                 .then(() => {
1720-                     // Marking the node as deleted since we've removed it completely
1721-                     // If we did not succeed at removing the process we don't mark the deletion
1722-                     // This is done in order to cycle the node once again and attempt until
1723-                     // We succeed
1724-                     item.node.deleted = Date.now();
1725-                     items_to_update.push(item);
1726-                 })
1727-                 .catch(err => {
1728-                     // We will just wait another cycle and attempt to delete it fully again
1729-                     dbg.warn('delete_cloud_or_mongo_pool_node ERROR node', item.node, err);
1730-                 });
1731-         })
1732-         .then(() => NodesStore.instance().bulk_update(items_to_update))
1733-         .then(res => {
1734-             // mark failed updates to retry
1735-             if (res.failed) {
1736-                 for (const item of res.failed) {
1737-                     this._set_need_update.add(item);
1738-                 }
1698+             // TODO handle deletion of normal nodes (uninstall?)
1699+             // Just mark the node as deleted and we will not scan it anymore
1700+             // This is done once the node's process is deleted (relevant to cloud/mongo resource)
1701+             // Or in a normal node it is done immediately
1702+             if (!item.node.is_cloud_node &&
1703+                 !item.node.is_mongo_node &&
1704+                 !item.node.is_internal_node) {
1705+                 item.node.deleted = Date.now();
1706+                 items_to_update.push(item);
1707+                 return;
1708+             }
1709+
1710+             try {
1711+                 if (item.node.is_internal_node) {
1712+                     throw new Error('Do not support internal_node deletion yet');
17391713                 }
1740-             if (res.updated) {
1741-                 for (const item of res.updated) {
1742-                     this._remove_node_from_maps(item);
1743-                 }
1714+                 // Removing the internal node from the processes
1715+                 await server_rpc.client.hosted_agents.remove_pool_agent({
1716+                     node_name: item.node.name
1717+                 });
1718+
1719+                 // Marking the node as deleted since we've removed it completely
1720+                 // If we did not succeed at removing the process we don't mark the deletion
1721+                 // This is done in order to cycle the node once again and attempt until
1722+                 // We succeed
1723+                 item.node.deleted = Date.now();
1724+                 items_to_update.push(item);
1725+
1726+             } catch (err) {
1727+                 // We will just wait another cycle and attempt to delete it fully again
1728+                 dbg.warn('delete_cloud_or_mongo_pool_node ERROR node', item.node, err);
1729+             }
1730+         });
1731+
1732+         try {
1733+             const res = await NodesStore.instance().bulk_update(items_to_update);
1734+
1735+             // mark failed updates to retry
1736+             if (res.failed) {
1737+                 for (const item of res.failed) {
1738+                     this._set_need_update.add(item);
17441739                 }
1745-             })
1746-         .catch(err => {
1747-             dbg.warn('_update_deleted_nodes: ERROR', err.stack || err);
1748-         });
1740+             }
1741+             if (res.updated) {
1742+                 for (const item of res.updated) {
1743+                     this._remove_node_from_maps(item);
1744+                 }
1745+             }
1746+         } catch (err) {
1747+             dbg.warn('_update_deleted_nodes: ERROR', err.stack || err);
1748+         }
17491749     }
17501750
17511751     _should_enable_agent(info, agent_config) {
0 commit comments