1313// limitations under the License.
1414
1515use crate :: master:: fs:: MasterFilesystem ;
16+ use crate :: master:: quota:: QuotaManager ;
1617use crate :: master:: replication:: master_replication_manager:: MasterReplicationManager ;
1718use crate :: master:: MasterMonitor ;
1819use curvine_common:: error:: FsError ;
@@ -30,6 +31,7 @@ pub struct HeartbeatChecker {
3031 worker_blacklist_ms : u64 ,
3132 worker_lost_ms : u64 ,
3233 replication_manager : Arc < MasterReplicationManager > ,
34+ quota_manager : Arc < QuotaManager > ,
3335}
3436
3537impl HeartbeatChecker {
@@ -38,6 +40,7 @@ impl HeartbeatChecker {
3840 monitor : MasterMonitor ,
3941 executor : Arc < GroupExecutor > ,
4042 replication_manager : Arc < MasterReplicationManager > ,
43+ quota_manager : Arc < QuotaManager > ,
4144 ) -> Self {
4245 let worker_blacklist_ms = fs. conf . worker_blacklist_interval_ms ( ) ;
4346 let worker_lost_ms = fs. conf . worker_lost_interval_ms ( ) ;
@@ -48,6 +51,7 @@ impl HeartbeatChecker {
4851 worker_blacklist_ms,
4952 worker_lost_ms,
5053 replication_manager,
54+ quota_manager,
5155 }
5256 }
5357}
@@ -60,50 +64,56 @@ impl LoopTask for HeartbeatChecker {
6064 return Ok ( ( ) ) ;
6165 }
6266
63- let mut wm = self . fs . worker_manager . write ( ) ;
64- let workers = wm. get_last_heartbeat ( ) ;
65- let now = LocalTime :: mills ( ) ;
67+ {
68+ let mut wm = self . fs . worker_manager . write ( ) ;
69+ let workers = wm. get_last_heartbeat ( ) ;
70+ let now = LocalTime :: mills ( ) ;
6671
67- for ( id, last_update) in workers {
68- if now > last_update + self . worker_blacklist_ms {
69- // Worker blacklist timeout
70- let worker = wm. add_blacklist_worker ( id) ;
71- warn ! (
72- "Worker {:?} has no heartbeat for more than {} ms and will be blacklisted" ,
73- worker, self . worker_blacklist_ms
74- ) ;
75- }
72+ for ( id, last_update) in workers {
73+ if now > last_update + self . worker_blacklist_ms {
74+ // Worker blacklist timeout
75+ let worker = wm. add_blacklist_worker ( id) ;
76+ warn ! (
77+ "Worker {:?} has no heartbeat for more than {} ms and will be blacklisted" ,
78+ worker, self . worker_blacklist_ms
79+ ) ;
80+ }
7681
77- if now > last_update + self . worker_lost_ms {
78- // Heartbeat timeout
79- let removed = wm. remove_expired_worker ( id) ;
80- warn ! (
81- "Worker {:?} has no heartbeat for more than {} ms and will be removed" ,
82- removed, self . worker_lost_ms
83- ) ;
84- // Asynchronously delete all block location data.
85- let fs = self . fs . clone ( ) ;
86- let rm = self . replication_manager . clone ( ) ;
87- let res = self . executor . spawn ( move || {
88- let spend = TimeSpent :: new ( ) ;
89- let block_ids = try_log ! ( fs. delete_locations( id) , vec![ ] ) ;
90- let block_num = block_ids. len ( ) ;
91- if let Err ( e) = rm. report_under_replicated_blocks ( id, block_ids) {
92- error ! (
93- "Errors on reporting under-replicated {} blocks. err: {:?}" ,
94- block_num, e
95- ) ;
96- }
97- info ! (
98- "Delete worker {} all locations used {} ms" ,
99- id,
100- spend. used_ms( )
82+ if now > last_update + self . worker_lost_ms {
83+ // Heartbeat timeout
84+ let removed = wm. remove_expired_worker ( id) ;
85+ warn ! (
86+ "Worker {:?} has no heartbeat for more than {} ms and will be removed" ,
87+ removed, self . worker_lost_ms
10188 ) ;
102- } ) ;
103- let _ = try_log ! ( res) ;
89+ // Asynchronously delete all block location data.
90+ let fs = self . fs . clone ( ) ;
91+ let rm = self . replication_manager . clone ( ) ;
92+ let res = self . executor . spawn ( move || {
93+ let spend = TimeSpent :: new ( ) ;
94+ let block_ids = try_log ! ( fs. delete_locations( id) , vec![ ] ) ;
95+ let block_num = block_ids. len ( ) ;
96+ if let Err ( e) = rm. report_under_replicated_blocks ( id, block_ids) {
97+ error ! (
98+ "Errors on reporting under-replicated {} blocks. err: {:?}" ,
99+ block_num, e
100+ ) ;
101+ }
102+ info ! (
103+ "Delete worker {} all locations used {} ms" ,
104+ id,
105+ spend. used_ms( )
106+ ) ;
107+ } ) ;
108+ let _ = try_log ! ( res) ;
109+ }
104110 }
105111 }
106112
113+ if let Ok ( info) = self . fs . master_info ( ) {
114+ self . quota_manager . detector ( Some ( info) ) ;
115+ } ;
116+
107117 Ok ( ( ) )
108118 }
109119
0 commit comments