@@ -41,7 +41,7 @@ use once_cell::sync::OnceCell;
41
41
use parquet:: errors:: ParquetError ;
42
42
use relative_path:: RelativePathBuf ;
43
43
use std:: time:: Duration ;
44
- use sysinfo:: { Disks , System } ;
44
+ use sysinfo:: Disks ;
45
45
use tokio:: fs:: { self , DirEntry } ;
46
46
use tokio:: io:: AsyncWriteExt ;
47
47
use tokio_stream:: wrappers:: ReadDirStream ;
@@ -135,24 +135,24 @@ impl HotTierManager {
135
135
}
136
136
}
137
137
138
- let ( total_disk_space, available_disk_space, used_disk_space) = get_disk_usage ( ) ;
139
-
140
- if let ( Some ( total_disk_space) , _, Some ( used_disk_space) ) =
141
- ( total_disk_space, available_disk_space, used_disk_space)
138
+ if let Some ( DiskUtil {
139
+ total_space,
140
+ used_space,
141
+ ..
142
+ } ) = get_disk_usage ( )
142
143
{
143
144
let ( total_hot_tier_size, total_hot_tier_used_size) =
144
145
self . get_hot_tiers_size ( stream) . await ?;
145
- let disk_threshold =
146
- ( CONFIG . parseable . max_disk_usage * total_disk_space as f64 ) / 100.0 ;
146
+ let disk_threshold = ( CONFIG . parseable . max_disk_usage * total_space as f64 ) / 100.0 ;
147
147
let max_allowed_hot_tier_size = disk_threshold
148
148
- total_hot_tier_size as f64
149
- - ( used_disk_space as f64
149
+ - ( used_space as f64
150
150
- total_hot_tier_used_size as f64
151
151
- existing_hot_tier_used_size as f64 ) ;
152
152
153
153
if stream_hot_tier_size as f64 > max_allowed_hot_tier_size {
154
154
log:: error!( "disk_threshold: {}, used_disk_space: {}, total_hot_tier_used_size: {}, existing_hot_tier_used_size: {}, total_hot_tier_size: {}" ,
155
- bytes_to_human_size( disk_threshold as u64 ) , bytes_to_human_size( used_disk_space ) , bytes_to_human_size( total_hot_tier_used_size) , bytes_to_human_size( existing_hot_tier_used_size) , bytes_to_human_size( total_hot_tier_size) ) ;
155
+ bytes_to_human_size( disk_threshold as u64 ) , bytes_to_human_size( used_space ) , bytes_to_human_size( total_hot_tier_used_size) , bytes_to_human_size( existing_hot_tier_used_size) , bytes_to_human_size( total_hot_tier_size) ) ;
156
156
return Err ( HotTierError :: ObjectStorageError ( ObjectStorageError :: Custom ( format ! (
157
157
"{} is the total usable disk space for hot tier, cannot set a bigger value." , bytes_to_human_size( max_allowed_hot_tier_size as u64 )
158
158
) ) ) ) ;
@@ -677,16 +677,17 @@ impl HotTierManager {
677
677
///check if the disk is available to download the parquet file
678
678
/// check if the disk usage is above the threshold
679
679
pub async fn is_disk_available ( & self , size_to_download : u64 ) -> Result < bool , HotTierError > {
680
- let ( total_disk_space, available_disk_space, used_disk_space) = get_disk_usage ( ) ;
681
-
682
- if let ( Some ( total_disk_space) , Some ( available_disk_space) , Some ( used_disk_space) ) =
683
- ( total_disk_space, available_disk_space, used_disk_space)
680
+ if let Some ( DiskUtil {
681
+ total_space,
682
+ available_space,
683
+ used_space,
684
+ } ) = get_disk_usage ( )
684
685
{
685
- if available_disk_space < size_to_download {
686
+ if available_space < size_to_download {
686
687
return Ok ( false ) ;
687
688
}
688
689
689
- if ( ( used_disk_space + size_to_download) as f64 * 100.0 / total_disk_space as f64 )
690
+ if ( ( used_space + size_to_download) as f64 * 100.0 / total_space as f64 )
690
691
> CONFIG . parseable . max_disk_usage
691
692
{
692
693
return Ok ( false ) ;
@@ -775,30 +776,31 @@ pub fn hot_tier_file_path(
775
776
object_store:: path:: Path :: from_absolute_path ( path)
776
777
}
777
778
778
- ///get the disk usage for the hot tier storage path
779
- pub fn get_disk_usage ( ) -> ( Option < u64 > , Option < u64 > , Option < u64 > ) {
780
- let mut sys = System :: new_all ( ) ;
781
- sys . refresh_all ( ) ;
782
- let path = CONFIG . parseable . hot_tier_storage_path . as_ref ( ) . unwrap ( ) ;
779
+ struct DiskUtil {
780
+ total_space : u64 ,
781
+ available_space : u64 ,
782
+ used_space : u64 ,
783
+ }
783
784
785
+ ///get the disk usage for the hot tier storage path
786
+ fn get_disk_usage ( ) -> Option < DiskUtil > {
787
+ let path = CONFIG . parseable . hot_tier_storage_path . as_ref ( ) ?;
784
788
let mut disks = Disks :: new_with_refreshed_list ( ) ;
789
+ // TODO: figure out why we sort
785
790
disks. sort_by_key ( |disk| disk. mount_point ( ) . to_str ( ) . unwrap ( ) . len ( ) ) ;
786
791
disks. reverse ( ) ;
787
792
788
793
for disk in disks. iter ( ) {
789
794
if path. starts_with ( disk. mount_point ( ) . to_str ( ) . unwrap ( ) ) {
790
- let total_disk_space = disk. total_space ( ) ;
791
- let available_disk_space = disk. available_space ( ) ;
792
- let used_disk_space = total_disk_space - available_disk_space;
793
- return (
794
- Some ( total_disk_space) ,
795
- Some ( available_disk_space) ,
796
- Some ( used_disk_space) ,
797
- ) ;
795
+ return Some ( DiskUtil {
796
+ total_space : disk. total_space ( ) ,
797
+ available_space : disk. available_space ( ) ,
798
+ used_space : disk. total_space ( ) - disk. available_space ( ) ,
799
+ } ) ;
798
800
}
799
801
}
800
802
801
- ( None , None , None )
803
+ None
802
804
}
803
805
804
806
async fn delete_empty_directory_hot_tier ( path : & Path ) -> io:: Result < ( ) > {
0 commit comments