Skip to content

Commit 55ba10d

Browse files
authored
Merge pull request #2750 from rbtcollins/fillthebuffers
Fixes #2748 by filling the streaming buffers fully
2 parents 8e632bb + ad3ce8b commit 55ba10d

File tree

2 files changed

+19
-9
lines changed

2 files changed

+19
-9
lines changed

src/diskio/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,7 @@ pub fn write_file_incremental<P: AsRef<Path>, F: Fn(usize)>(
343343
{
344344
trace_scoped!("write_segment", "name": path_display, "len": len);
345345
f.write_all(&contents)?;
346+
drop(contents);
346347
chunk_complete_callback(len);
347348
}
348349
}

src/dist/component/package.rs

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -364,16 +364,18 @@ fn unpack_without_first_dir<'a, R: Read>(
364364
continue;
365365
}
366366

367+
struct SenderEntry<'a, 'b, R: std::io::Read> {
368+
sender: Box<dyn FnMut(Vec<u8>) -> bool + 'a>,
369+
entry: tar::Entry<'b, R>,
370+
}
371+
367372
/// true if either no sender_entry was provided, or the incremental file
368373
/// has been fully dispatched.
369374
fn flush_ios<'a, R: std::io::Read, P: AsRef<Path>>(
370375
mut budget: &mut MemoryBudget,
371376
io_executor: &dyn Executor,
372377
mut directories: &mut HashMap<PathBuf, DirStatus>,
373-
mut sender_entry: Option<&mut (
374-
Box<dyn FnMut(Vec<u8>) -> bool + 'a>,
375-
&mut tar::Entry<'_, R>,
376-
)>,
378+
mut sender_entry: Option<&mut SenderEntry<'a, '_, R>>,
377379
full_path: P,
378380
) -> Result<bool> {
379381
let mut result = sender_entry.is_none();
@@ -384,16 +386,20 @@ fn unpack_without_first_dir<'a, R: Read>(
384386
trigger_children(&*io_executor, &mut directories, &mut budget, op)?;
385387
}
386388
// Maybe stream a file incrementally
387-
if let Some((sender, entry)) = sender_entry.as_mut() {
389+
if let Some(sender) = sender_entry.as_mut() {
388390
if budget.available() as u64 >= IO_CHUNK_SIZE {
389391
let mut v = vec![0; IO_CHUNK_SIZE as usize];
390-
let len = entry.read(&mut v)?;
392+
let len = sender
393+
.entry
394+
.by_ref()
395+
.take(IO_CHUNK_SIZE)
396+
.read_to_end(&mut v)?;
391397
if len == 0 {
392398
result = true;
393399
}
394400
v.resize(len, 0);
395401
budget.claim_chunk(len);
396-
if !sender(v) {
402+
if !(sender.sender)(v) {
397403
bail!(format!(
398404
"IO receiver for '{}' disconnected",
399405
full_path.as_ref().display()
@@ -519,8 +525,11 @@ fn unpack_without_first_dir<'a, R: Read>(
519525
}
520526
}
521527

522-
let mut incremental_file_sender = incremental_file_sender
523-
.map(|incremental_file_sender| (incremental_file_sender, &mut entry));
528+
let mut incremental_file_sender =
529+
incremental_file_sender.map(|incremental_file_sender| SenderEntry {
530+
sender: incremental_file_sender,
531+
entry,
532+
});
524533

525534
// monitor io queue and feed in the content of the file (if needed)
526535
while !flush_ios(

0 commit comments

Comments
 (0)