Skip to content

Commit 74f4727

Browse files
committed
Reuse the same memory buffers during unpacking
We think we have memory fragmentation causing failed extraction in Windows containers and smaller Unix devices. Writes of both full objects and streamed objects now re-use the Vec via a sharded-slab implementation. To facilitate the more complicated memory logic, buffer limit management is now integrated into the IO Executor: the immediate executor doesn't limit at all as no outstanding buffers occur, and the threaded executor tracks both the total allocated buffers as well as whether a reusable buffer is available.
1 parent cbd1c84 commit 74f4727

15 files changed

+520
-153
lines changed

Cargo.lock

+31
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ chrono = "0.4"
3232
clap = "2"
3333
download = {path = "download", default-features = false}
3434
effective-limits = "0.5.2"
35+
enum-map = "1.1.0"
3536
flate2 = "1"
3637
git-testament = "0.1.4"
3738
home = {git = "https://github.com/rbtcollins/home", rev = "a243ee2fbee6022c57d56f5aa79aefe194eabe53"}
@@ -52,6 +53,7 @@ scopeguard = "1"
5253
semver = "0.11"
5354
serde = {version = "1.0", features = ["derive"]}
5455
sha2 = "0.9"
56+
sharded-slab = "0.1.1"
5557
strsim = "0.10"
5658
tar = "0.4.26"
5759
tempfile = "3.1"

src/cli/common.rs

+3
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,9 @@ impl NotifyOnConsole {
148148
NotificationLevel::Error => {
149149
err!("{}", n);
150150
}
151+
NotificationLevel::Debug => {
152+
debug!("{}", n);
153+
}
151154
}
152155
}
153156
}

src/diskio/immediate.rs

+20-3
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use std::{
1111
time::Instant,
1212
};
1313

14-
use super::{CompletedIo, Executor, Item};
14+
use super::{CompletedIo, Executor, FileBuffer, Item};
1515

1616
#[derive(Debug)]
1717
pub struct _IncrementalFileState {
@@ -70,7 +70,11 @@ impl Executor for ImmediateUnpacker {
7070
item.result = match &mut item.kind {
7171
super::Kind::Directory => super::create_dir(&item.full_path),
7272
super::Kind::File(ref contents) => {
73-
super::write_file(&item.full_path, &contents, item.mode)
73+
if let super::FileBuffer::Immediate(ref contents) = &contents {
74+
super::write_file(&item.full_path, contents, item.mode)
75+
} else {
76+
unreachable!()
77+
}
7478
}
7579
super::Kind::IncrementalFile(_incremental_file) => {
7680
return {
@@ -124,6 +128,14 @@ impl Executor for ImmediateUnpacker {
124128
super::IncrementalFileState::Immediate(self.incremental_state.clone())
125129
}
126130
}
131+
132+
fn get_buffer(&mut self, capacity: usize) -> super::FileBuffer {
133+
super::FileBuffer::Immediate(Vec::with_capacity(capacity))
134+
}
135+
136+
fn buffer_available(&self, _len: usize) -> bool {
137+
true
138+
}
127139
}
128140

129141
/// The non-shared state for writing a file incrementally
@@ -160,10 +172,15 @@ impl IncrementalFileWriter {
160172
})
161173
}
162174

163-
pub fn chunk_submit(&mut self, chunk: Vec<u8>) -> bool {
175+
pub fn chunk_submit(&mut self, chunk: FileBuffer) -> bool {
164176
if (self.state.lock().unwrap()).is_none() {
165177
return false;
166178
}
179+
let chunk = if let FileBuffer::Immediate(v) = chunk {
180+
v
181+
} else {
182+
unreachable!()
183+
};
167184
match self.write(chunk) {
168185
Ok(v) => v,
169186
Err(e) => {

src/diskio/mod.rs

+91-10
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ mod test;
5757
pub mod threaded;
5858

5959
use std::io::{self, Write};
60+
use std::ops::{Deref, DerefMut};
6061
use std::path::{Path, PathBuf};
6162
use std::sync::mpsc::Receiver;
6263
use std::time::{Duration, Instant};
@@ -66,12 +67,73 @@ use anyhow::{Context, Result};
6667

6768
use crate::process;
6869
use crate::utils::notifications::Notification;
70+
use threaded::PoolReference;
71+
72+
/// Carries the implementation specific data for complete file transfers into the executor.
73+
#[derive(Debug)]
74+
pub enum FileBuffer {
75+
Immediate(Vec<u8>),
76+
// A reference to the object in the pool, and a handle to write to it
77+
Threaded(PoolReference),
78+
}
79+
80+
impl FileBuffer {
81+
/// All the buffers space to be re-used when the last reference to it is dropped.
82+
pub(crate) fn clear(&mut self) {
83+
if let FileBuffer::Threaded(ref mut contents) = self {
84+
contents.clear()
85+
}
86+
}
87+
88+
pub(crate) fn len(&self) -> usize {
89+
match self {
90+
FileBuffer::Immediate(ref vec) => vec.len(),
91+
FileBuffer::Threaded(PoolReference::Owned(owned, _)) => owned.len(),
92+
FileBuffer::Threaded(PoolReference::Mut(mutable, _)) => mutable.len(),
93+
}
94+
}
95+
96+
pub(crate) fn finished(self) -> Self {
97+
match self {
98+
FileBuffer::Threaded(PoolReference::Mut(mutable, pool)) => {
99+
FileBuffer::Threaded(PoolReference::Owned(mutable.downgrade(), pool))
100+
}
101+
_ => self,
102+
}
103+
}
104+
}
105+
106+
impl Deref for FileBuffer {
107+
type Target = Vec<u8>;
108+
109+
fn deref(&self) -> &Self::Target {
110+
match self {
111+
FileBuffer::Immediate(ref vec) => &vec,
112+
FileBuffer::Threaded(PoolReference::Owned(owned, _)) => owned,
113+
FileBuffer::Threaded(PoolReference::Mut(mutable, _)) => mutable,
114+
}
115+
}
116+
}
117+
118+
impl DerefMut for FileBuffer {
119+
fn deref_mut(&mut self) -> &mut Self::Target {
120+
match self {
121+
FileBuffer::Immediate(ref mut vec) => vec,
122+
FileBuffer::Threaded(PoolReference::Owned(_, _)) => {
123+
unimplemented!()
124+
}
125+
FileBuffer::Threaded(PoolReference::Mut(mutable, _)) => mutable,
126+
}
127+
}
128+
}
129+
130+
pub(crate) const IO_CHUNK_SIZE: usize = 16_777_216;
69131

70132
/// Carries the implementation specific channel data into the executor.
71133
#[derive(Debug)]
72134
pub enum IncrementalFile {
73135
ImmediateReceiver,
74-
ThreadedReceiver(Receiver<Vec<u8>>),
136+
ThreadedReceiver(Receiver<FileBuffer>),
75137
}
76138

77139
// The basic idea is that in single threaded mode we get this pattern:
@@ -116,7 +178,7 @@ pub enum IncrementalFile {
116178
#[derive(Debug)]
117179
pub enum Kind {
118180
Directory,
119-
File(Vec<u8>),
181+
File(FileBuffer),
120182
IncrementalFile(IncrementalFile),
121183
}
122184

@@ -160,7 +222,7 @@ impl Item {
160222
}
161223
}
162224

163-
pub fn write_file(full_path: PathBuf, content: Vec<u8>, mode: u32) -> Self {
225+
pub fn write_file(full_path: PathBuf, mode: u32, content: FileBuffer) -> Self {
164226
let len = content.len();
165227
Self {
166228
full_path,
@@ -177,7 +239,7 @@ impl Item {
177239
full_path: PathBuf,
178240
mode: u32,
179241
state: IncrementalFileState,
180-
) -> Result<(Self, Box<dyn FnMut(Vec<u8>) -> bool + 'a>)> {
242+
) -> Result<(Self, Box<dyn FnMut(FileBuffer) -> bool + 'a>)> {
181243
let (chunk_submit, content_callback) = state.incremental_file_channel(&full_path, mode)?;
182244
let result = Self {
183245
full_path,
@@ -210,19 +272,19 @@ impl IncrementalFileState {
210272
&self,
211273
path: &Path,
212274
mode: u32,
213-
) -> Result<(Box<dyn FnMut(Vec<u8>) -> bool>, IncrementalFile)> {
275+
) -> Result<(Box<dyn FnMut(FileBuffer) -> bool>, IncrementalFile)> {
214276
use std::sync::mpsc::channel;
215277
match *self {
216278
IncrementalFileState::Threaded => {
217-
let (tx, rx) = channel::<Vec<u8>>();
279+
let (tx, rx) = channel::<FileBuffer>();
218280
let content_callback = IncrementalFile::ThreadedReceiver(rx);
219-
let chunk_submit = move |chunk: Vec<u8>| tx.send(chunk).is_ok();
281+
let chunk_submit = move |chunk: FileBuffer| tx.send(chunk).is_ok();
220282
Ok((Box::new(chunk_submit), content_callback))
221283
}
222284
IncrementalFileState::Immediate(ref state) => {
223285
let content_callback = IncrementalFile::ImmediateReceiver;
224286
let mut writer = immediate::IncrementalFileWriter::new(path, mode, state.clone())?;
225-
let chunk_submit = move |chunk: Vec<u8>| writer.chunk_submit(chunk);
287+
let chunk_submit = move |chunk: FileBuffer| writer.chunk_submit(chunk);
226288
Ok((Box::new(chunk_submit), content_callback))
227289
}
228290
}
@@ -258,6 +320,14 @@ pub trait Executor {
258320

259321
/// Get any state needed for incremental file processing
260322
fn incremental_file_state(&self) -> IncrementalFileState;
323+
324+
/// Get a disk buffer E.g. this gets the right sized pool object for
325+
/// optimized situations, or just a malloc when optimisations are off etc
326+
/// etc.
327+
fn get_buffer(&mut self, len: usize) -> FileBuffer;
328+
329+
/// Query the memory budget to see if a particular size buffer is available
330+
fn buffer_available(&self, len: usize) -> bool;
261331
}
262332

263333
/// Trivial single threaded IO to be used from executors.
@@ -267,7 +337,17 @@ pub fn perform<F: Fn(usize)>(item: &mut Item, chunk_complete_callback: F) {
267337
// Files, write them.
268338
item.result = match &mut item.kind {
269339
Kind::Directory => create_dir(&item.full_path),
270-
Kind::File(ref contents) => write_file(&item.full_path, &contents, item.mode),
340+
Kind::File(ref mut contents) => {
341+
contents.clear();
342+
match contents {
343+
FileBuffer::Immediate(ref contents) => {
344+
write_file(&item.full_path, &contents, item.mode)
345+
}
346+
FileBuffer::Threaded(ref mut contents) => {
347+
write_file(&item.full_path, &contents, item.mode)
348+
}
349+
}
350+
}
271351
Kind::IncrementalFile(incremental_file) => write_file_incremental(
272352
&item.full_path,
273353
incremental_file,
@@ -367,6 +447,7 @@ pub fn create_dir<P: AsRef<Path>>(path: P) -> io::Result<()> {
367447
/// Get the executor for disk IO.
368448
pub fn get_executor<'a>(
369449
notify_handler: Option<&'a dyn Fn(Notification<'_>)>,
450+
ram_budget: usize,
370451
) -> Result<Box<dyn Executor + 'a>> {
371452
// If this gets lots of use, consider exposing via the config file.
372453
let thread_count = match process().var("RUSTUP_IO_THREADS") {
@@ -377,6 +458,6 @@ pub fn get_executor<'a>(
377458
};
378459
Ok(match thread_count {
379460
0 | 1 => Box::new(immediate::ImmediateUnpacker::new()),
380-
n => Box::new(threaded::Threaded::new(notify_handler, n)),
461+
n => Box::new(threaded::Threaded::new(notify_handler, n, ram_budget)),
381462
})
382463
}

0 commit comments

Comments
 (0)