@@ -13,11 +13,11 @@ const map_deleter = require('./map_deleter');
1313const mongo_utils = require ( '../../util/mongo_utils' ) ;
1414const system_store = require ( '../system_services/system_store' ) . get_instance ( ) ;
1515const node_allocator = require ( '../node_services/node_allocator' ) ;
16- const system_server_utils = require ( '../utils/system_server_utils' ) ;
17- // const promise_utils = require('../../util/promise_utils');
16+ const KeysLock = require ( '../../util/keys_lock' ) ;
1817
1918
2019const replicate_block_sem = new Semaphore ( config . IO_REPLICATE_CONCURRENCY ) ;
20+ const builder_lock = new KeysLock ( ) ;
2121
2222
2323/**
@@ -31,35 +31,54 @@ const replicate_block_sem = new Semaphore(config.IO_REPLICATE_CONCURRENCY);
3131 */
3232class MapBuilder {
3333
34- constructor ( chunks ) {
35- this . chunks = chunks ;
36- this . system_id = chunks [ 0 ] && chunks [ 0 ] . system ;
34+ constructor ( chunk_ids ) {
35+ this . chunk_ids = chunk_ids ;
3736 }
3837
3938 run ( ) {
40- dbg . log1 ( 'MapBuilder.run:' , 'batch start' , this . chunks . length , 'chunks' ) ;
41- if ( ! this . chunks . length ) return ;
42- if ( system_server_utils . system_in_maintenance ( this . system_id ) ) return ;
43- return P . resolve ( )
44- . then ( ( ) => P . join (
45- system_store . refresh ( ) ,
46- md_store . load_blocks_for_chunks ( this . chunks ) ,
47- md_store . load_parts_objects_for_chunks ( this . chunks ) ,
48- this . mark_building ( )
49- ) )
50- . then ( ( ) => this . refresh_alloc ( ) )
51- . then ( ( ) => this . analyze_chunks ( ) )
52- // .then(() => this.refresh_alloc())
53- . then ( ( ) => this . allocate_blocks ( ) )
54- . then ( ( ) => this . replicate_blocks ( ) )
55- . then ( ( ) => this . update_db ( ) )
56- . then ( ( ) => {
57- // return error from the promise if any replication failed,
58- // so that caller will know the build isn't really complete,
59- // although it might partially succeeded
60- if ( this . had_errors ) {
61- throw new Error ( 'MapBuilder had errors' ) ;
62- }
39+ dbg . log1 ( 'MapBuilder.run:' , 'batch start' , this . chunk_ids . length , 'chunks' ) ;
40+ if ( ! this . chunk_ids . length ) return ;
41+
42+ return builder_lock . surround_keys ( _ . map ( this . chunk_ids , String ) , ( ) => {
43+ return P . resolve ( this . reload_chunks ( this . chunk_ids ) )
44+ . then ( ( ) => P . join (
45+ system_store . refresh ( ) ,
46+ md_store . load_blocks_for_chunks ( this . chunks ) ,
47+ md_store . load_parts_objects_for_chunks ( this . chunks ) ,
48+ this . mark_building ( )
49+ ) )
50+ . then ( ( ) => this . refresh_alloc ( ) )
51+ . then ( ( ) => this . analyze_chunks ( ) )
52+ . then ( ( ) => this . allocate_blocks ( ) )
53+ . then ( ( ) => this . replicate_blocks ( ) )
54+ . then ( ( ) => this . update_db ( ) )
55+ . then ( ( ) => {
56+ // return error from the promise if any replication failed,
57+ // so that caller will know the build isn't really complete,
58+ // although it might partially succeeded
59+ if ( this . had_errors ) {
60+ throw new Error ( 'MapBuilder had errors' ) ;
61+ }
62+ } ) ;
63+ } ) ;
64+ }
65+
66+
67+ // In order to get the most relevant data regarding the chunks
68+ // Note that there is always a possibility that the chunks will cease to exist
69+ reload_chunks ( chunk_ids ) {
70+ var query = {
71+ _id : {
72+ $in : chunk_ids
73+ } ,
74+ deleted : null
75+ } ;
76+
77+ return P . resolve ( md_store . DataChunk . find ( query )
78+ . lean ( )
79+ . exec ( ) )
80+ . then ( chunks => {
81+ this . chunks = chunks ;
6382 } ) ;
6483 }
6584
0 commit comments