Skip to content

Commit 5e43c1e

Browse files
authored
Merge pull request #2756 from rbtcollins/slabs
Slabs
2 parents 7d8b14d + 74f4727 commit 5e43c1e

15 files changed

+525
-153
lines changed

Cargo.lock

+31
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ chrono = "0.4"
3232
clap = "2"
3333
download = {path = "download", default-features = false}
3434
effective-limits = "0.5.2"
35+
enum-map = "1.1.0"
3536
flate2 = "1"
3637
git-testament = "0.1.4"
3738
home = {git = "https://github.com/rbtcollins/home", rev = "a243ee2fbee6022c57d56f5aa79aefe194eabe53"}
@@ -52,6 +53,7 @@ scopeguard = "1"
5253
semver = "0.11"
5354
serde = {version = "1.0", features = ["derive"]}
5455
sha2 = "0.9"
56+
sharded-slab = "0.1.1"
5557
strsim = "0.10"
5658
tar = "0.4.26"
5759
tempfile = "3.1"

src/cli/common.rs

+3
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,9 @@ impl NotifyOnConsole {
148148
NotificationLevel::Error => {
149149
err!("{}", n);
150150
}
151+
NotificationLevel::Debug => {
152+
debug!("{}", n);
153+
}
151154
}
152155
}
153156
}

src/diskio/immediate.rs

+20-3
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use std::{
1111
time::Instant,
1212
};
1313

14-
use super::{CompletedIo, Executor, Item};
14+
use super::{CompletedIo, Executor, FileBuffer, Item};
1515

1616
#[derive(Debug)]
1717
pub struct _IncrementalFileState {
@@ -70,7 +70,11 @@ impl Executor for ImmediateUnpacker {
7070
item.result = match &mut item.kind {
7171
super::Kind::Directory => super::create_dir(&item.full_path),
7272
super::Kind::File(ref contents) => {
73-
super::write_file(&item.full_path, &contents, item.mode)
73+
if let super::FileBuffer::Immediate(ref contents) = &contents {
74+
super::write_file(&item.full_path, contents, item.mode)
75+
} else {
76+
unreachable!()
77+
}
7478
}
7579
super::Kind::IncrementalFile(_incremental_file) => {
7680
return {
@@ -124,6 +128,14 @@ impl Executor for ImmediateUnpacker {
124128
super::IncrementalFileState::Immediate(self.incremental_state.clone())
125129
}
126130
}
131+
132+
fn get_buffer(&mut self, capacity: usize) -> super::FileBuffer {
133+
super::FileBuffer::Immediate(Vec::with_capacity(capacity))
134+
}
135+
136+
fn buffer_available(&self, _len: usize) -> bool {
137+
true
138+
}
127139
}
128140

129141
/// The non-shared state for writing a file incrementally
@@ -160,10 +172,15 @@ impl IncrementalFileWriter {
160172
})
161173
}
162174

163-
pub fn chunk_submit(&mut self, chunk: Vec<u8>) -> bool {
175+
pub fn chunk_submit(&mut self, chunk: FileBuffer) -> bool {
164176
if (self.state.lock().unwrap()).is_none() {
165177
return false;
166178
}
179+
let chunk = if let FileBuffer::Immediate(v) = chunk {
180+
v
181+
} else {
182+
unreachable!()
183+
};
167184
match self.write(chunk) {
168185
Ok(v) => v,
169186
Err(e) => {

src/diskio/mod.rs

+91-10
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ mod test;
5757
pub mod threaded;
5858

5959
use std::io::{self, Write};
60+
use std::ops::{Deref, DerefMut};
6061
use std::path::{Path, PathBuf};
6162
use std::sync::mpsc::Receiver;
6263
use std::time::{Duration, Instant};
@@ -66,12 +67,73 @@ use anyhow::{Context, Result};
6667

6768
use crate::process;
6869
use crate::utils::notifications::Notification;
70+
use threaded::PoolReference;
71+
72+
/// Carries the implementation specific data for complete file transfers into the executor.
73+
#[derive(Debug)]
74+
pub enum FileBuffer {
75+
Immediate(Vec<u8>),
76+
// A reference to the object in the pool, and a handle to write to it
77+
Threaded(PoolReference),
78+
}
79+
80+
impl FileBuffer {
81+
/// All the buffers space to be re-used when the last reference to it is dropped.
82+
pub(crate) fn clear(&mut self) {
83+
if let FileBuffer::Threaded(ref mut contents) = self {
84+
contents.clear()
85+
}
86+
}
87+
88+
pub(crate) fn len(&self) -> usize {
89+
match self {
90+
FileBuffer::Immediate(ref vec) => vec.len(),
91+
FileBuffer::Threaded(PoolReference::Owned(owned, _)) => owned.len(),
92+
FileBuffer::Threaded(PoolReference::Mut(mutable, _)) => mutable.len(),
93+
}
94+
}
95+
96+
pub(crate) fn finished(self) -> Self {
97+
match self {
98+
FileBuffer::Threaded(PoolReference::Mut(mutable, pool)) => {
99+
FileBuffer::Threaded(PoolReference::Owned(mutable.downgrade(), pool))
100+
}
101+
_ => self,
102+
}
103+
}
104+
}
105+
106+
impl Deref for FileBuffer {
107+
type Target = Vec<u8>;
108+
109+
fn deref(&self) -> &Self::Target {
110+
match self {
111+
FileBuffer::Immediate(ref vec) => &vec,
112+
FileBuffer::Threaded(PoolReference::Owned(owned, _)) => owned,
113+
FileBuffer::Threaded(PoolReference::Mut(mutable, _)) => mutable,
114+
}
115+
}
116+
}
117+
118+
impl DerefMut for FileBuffer {
119+
fn deref_mut(&mut self) -> &mut Self::Target {
120+
match self {
121+
FileBuffer::Immediate(ref mut vec) => vec,
122+
FileBuffer::Threaded(PoolReference::Owned(_, _)) => {
123+
unimplemented!()
124+
}
125+
FileBuffer::Threaded(PoolReference::Mut(mutable, _)) => mutable,
126+
}
127+
}
128+
}
129+
130+
pub(crate) const IO_CHUNK_SIZE: usize = 16_777_216;
69131

70132
/// Carries the implementation specific channel data into the executor.
71133
#[derive(Debug)]
72134
pub enum IncrementalFile {
73135
ImmediateReceiver,
74-
ThreadedReceiver(Receiver<Vec<u8>>),
136+
ThreadedReceiver(Receiver<FileBuffer>),
75137
}
76138

77139
// The basic idea is that in single threaded mode we get this pattern:
@@ -116,7 +178,7 @@ pub enum IncrementalFile {
116178
#[derive(Debug)]
117179
pub enum Kind {
118180
Directory,
119-
File(Vec<u8>),
181+
File(FileBuffer),
120182
IncrementalFile(IncrementalFile),
121183
}
122184

@@ -160,7 +222,7 @@ impl Item {
160222
}
161223
}
162224

163-
pub fn write_file(full_path: PathBuf, content: Vec<u8>, mode: u32) -> Self {
225+
pub fn write_file(full_path: PathBuf, mode: u32, content: FileBuffer) -> Self {
164226
let len = content.len();
165227
Self {
166228
full_path,
@@ -177,7 +239,7 @@ impl Item {
177239
full_path: PathBuf,
178240
mode: u32,
179241
state: IncrementalFileState,
180-
) -> Result<(Self, Box<dyn FnMut(Vec<u8>) -> bool + 'a>)> {
242+
) -> Result<(Self, Box<dyn FnMut(FileBuffer) -> bool + 'a>)> {
181243
let (chunk_submit, content_callback) = state.incremental_file_channel(&full_path, mode)?;
182244
let result = Self {
183245
full_path,
@@ -210,19 +272,19 @@ impl IncrementalFileState {
210272
&self,
211273
path: &Path,
212274
mode: u32,
213-
) -> Result<(Box<dyn FnMut(Vec<u8>) -> bool>, IncrementalFile)> {
275+
) -> Result<(Box<dyn FnMut(FileBuffer) -> bool>, IncrementalFile)> {
214276
use std::sync::mpsc::channel;
215277
match *self {
216278
IncrementalFileState::Threaded => {
217-
let (tx, rx) = channel::<Vec<u8>>();
279+
let (tx, rx) = channel::<FileBuffer>();
218280
let content_callback = IncrementalFile::ThreadedReceiver(rx);
219-
let chunk_submit = move |chunk: Vec<u8>| tx.send(chunk).is_ok();
281+
let chunk_submit = move |chunk: FileBuffer| tx.send(chunk).is_ok();
220282
Ok((Box::new(chunk_submit), content_callback))
221283
}
222284
IncrementalFileState::Immediate(ref state) => {
223285
let content_callback = IncrementalFile::ImmediateReceiver;
224286
let mut writer = immediate::IncrementalFileWriter::new(path, mode, state.clone())?;
225-
let chunk_submit = move |chunk: Vec<u8>| writer.chunk_submit(chunk);
287+
let chunk_submit = move |chunk: FileBuffer| writer.chunk_submit(chunk);
226288
Ok((Box::new(chunk_submit), content_callback))
227289
}
228290
}
@@ -258,6 +320,14 @@ pub trait Executor {
258320

259321
/// Get any state needed for incremental file processing
260322
fn incremental_file_state(&self) -> IncrementalFileState;
323+
324+
/// Get a disk buffer E.g. this gets the right sized pool object for
325+
/// optimized situations, or just a malloc when optimisations are off etc
326+
/// etc.
327+
fn get_buffer(&mut self, len: usize) -> FileBuffer;
328+
329+
/// Query the memory budget to see if a particular size buffer is available
330+
fn buffer_available(&self, len: usize) -> bool;
261331
}
262332

263333
/// Trivial single threaded IO to be used from executors.
@@ -267,7 +337,17 @@ pub fn perform<F: Fn(usize)>(item: &mut Item, chunk_complete_callback: F) {
267337
// Files, write them.
268338
item.result = match &mut item.kind {
269339
Kind::Directory => create_dir(&item.full_path),
270-
Kind::File(ref contents) => write_file(&item.full_path, &contents, item.mode),
340+
Kind::File(ref mut contents) => {
341+
contents.clear();
342+
match contents {
343+
FileBuffer::Immediate(ref contents) => {
344+
write_file(&item.full_path, &contents, item.mode)
345+
}
346+
FileBuffer::Threaded(ref mut contents) => {
347+
write_file(&item.full_path, &contents, item.mode)
348+
}
349+
}
350+
}
271351
Kind::IncrementalFile(incremental_file) => write_file_incremental(
272352
&item.full_path,
273353
incremental_file,
@@ -367,6 +447,7 @@ pub fn create_dir<P: AsRef<Path>>(path: P) -> io::Result<()> {
367447
/// Get the executor for disk IO.
368448
pub fn get_executor<'a>(
369449
notify_handler: Option<&'a dyn Fn(Notification<'_>)>,
450+
ram_budget: usize,
370451
) -> Result<Box<dyn Executor + 'a>> {
371452
// If this gets lots of use, consider exposing via the config file.
372453
let thread_count = match process().var("RUSTUP_IO_THREADS") {
@@ -377,6 +458,6 @@ pub fn get_executor<'a>(
377458
};
378459
Ok(match thread_count {
379460
0 | 1 => Box::new(immediate::ImmediateUnpacker::new()),
380-
n => Box::new(threaded::Threaded::new(notify_handler, n)),
461+
n => Box::new(threaded::Threaded::new(notify_handler, n, ram_budget)),
381462
})
382463
}

0 commit comments

Comments
 (0)