2
2
//! for installing from a directory or tarball to an installation
3
3
//! prefix, represented by a `Components` instance.
4
4
5
+ use std:: cmp:: min;
5
6
use std:: collections:: { HashMap , HashSet } ;
6
7
use std:: fmt;
7
8
use std:: io:: { self , ErrorKind as IOErrorKind , Read } ;
@@ -364,16 +365,19 @@ fn unpack_without_first_dir<'a, R: Read>(
364
365
continue ;
365
366
}
366
367
368
+ struct SenderEntry < ' a , ' b , R : std:: io:: Read > {
369
+ sender : Box < dyn FnMut ( Vec < u8 > ) -> bool + ' a > ,
370
+ entry : tar:: Entry < ' b , R > ,
371
+ read : usize ,
372
+ }
373
+
367
374
/// true if either no sender_entry was provided, or the incremental file
368
375
/// has been fully dispatched.
369
376
fn flush_ios < ' a , R : std:: io:: Read , P : AsRef < Path > > (
370
377
mut budget : & mut MemoryBudget ,
371
378
io_executor : & dyn Executor ,
372
379
mut directories : & mut HashMap < PathBuf , DirStatus > ,
373
- mut sender_entry : Option < & mut (
374
- Box < dyn FnMut ( Vec < u8 > ) -> bool + ' a > ,
375
- & mut tar:: Entry < ' _ , R > ,
376
- ) > ,
380
+ mut sender_entry : Option < & mut SenderEntry < ' a , ' _ , R > > ,
377
381
full_path : P ,
378
382
) -> Result < bool > {
379
383
let mut result = sender_entry. is_none ( ) ;
@@ -384,16 +388,19 @@ fn unpack_without_first_dir<'a, R: Read>(
384
388
trigger_children ( & * io_executor, & mut directories, & mut budget, op) ?;
385
389
}
386
390
// Maybe stream a file incrementally
387
- if let Some ( ( sender, entry ) ) = sender_entry. as_mut ( ) {
391
+ if let Some ( sender) = sender_entry. as_mut ( ) {
388
392
if budget. available ( ) as u64 >= IO_CHUNK_SIZE {
389
- let mut v = vec ! [ 0 ; IO_CHUNK_SIZE as usize ] ;
390
- let len = entry. read ( & mut v) ?;
393
+ let chunk_size = min ( IO_CHUNK_SIZE , sender. entry . size ( ) - sender. read as u64 ) ;
394
+ let mut v = vec ! [ 0 ; chunk_size as usize ] ;
395
+ let len = v. len ( ) ;
391
396
if len == 0 {
392
397
result = true ;
398
+ } else {
399
+ sender. entry . read_exact ( & mut v) ?;
400
+ sender. read += v. len ( ) ;
393
401
}
394
- v. resize ( len, 0 ) ;
395
402
budget. claim_chunk ( len) ;
396
- if !sender ( v) {
403
+ if !( sender. sender ) ( v) {
397
404
bail ! ( format!(
398
405
"IO receiver for '{}' disconnected" ,
399
406
full_path. as_ref( ) . display( )
@@ -519,8 +526,13 @@ fn unpack_without_first_dir<'a, R: Read>(
519
526
}
520
527
}
521
528
522
- let mut incremental_file_sender = incremental_file_sender
523
- . map ( |incremental_file_sender| ( incremental_file_sender, & mut entry) ) ;
529
+ let mut incremental_file_sender = incremental_file_sender. map ( |incremental_file_sender| {
530
+ ( SenderEntry {
531
+ sender : incremental_file_sender,
532
+ entry,
533
+ read : 0 ,
534
+ } )
535
+ } ) ;
524
536
525
537
// monitor io queue and feed in the content of the file (if needed)
526
538
while !flush_ios (
0 commit comments