Skip to content

Make all diskio functions and structs crate-private #2753

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/diskio/immediate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::{
use super::{CompletedIo, Executor, FileBuffer, Item};

#[derive(Debug)]
pub struct _IncrementalFileState {
pub(crate) struct _IncrementalFileState {
completed_chunks: Vec<usize>,
err: Option<io::Result<()>>,
item: Option<Item>,
Expand All @@ -24,12 +24,12 @@ pub struct _IncrementalFileState {
pub(super) type IncrementalFileState = Arc<Mutex<Option<_IncrementalFileState>>>;

#[derive(Default, Debug)]
pub struct ImmediateUnpacker {
pub(crate) struct ImmediateUnpacker {
incremental_state: IncrementalFileState,
}

impl ImmediateUnpacker {
pub fn new() -> Self {
pub(crate) fn new() -> Self {
Self {
..Default::default()
}
Expand Down Expand Up @@ -148,7 +148,7 @@ pub(super) struct IncrementalFileWriter {

impl IncrementalFileWriter {
#[allow(unused_variables)]
pub fn new<P: AsRef<Path>>(
pub(crate) fn new<P: AsRef<Path>>(
path: P,
mode: u32,
state: IncrementalFileState,
Expand All @@ -172,7 +172,7 @@ impl IncrementalFileWriter {
})
}

pub fn chunk_submit(&mut self, chunk: FileBuffer) -> bool {
pub(crate) fn chunk_submit(&mut self, chunk: FileBuffer) -> bool {
if (self.state.lock().unwrap()).is_none() {
return false;
}
Expand Down
48 changes: 24 additions & 24 deletions src/diskio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@
// loss or errors in this model.
// f) data gathering: record (name, bytes, start, duration)
// write to disk afterwards as a csv file?
pub mod immediate;
pub(crate) mod immediate;
#[cfg(test)]
mod test;
pub mod threaded;
pub(crate) mod threaded;

use std::io::{self, Write};
use std::ops::{Deref, DerefMut};
Expand All @@ -71,7 +71,7 @@ use threaded::PoolReference;

/// Carries the implementation specific data for complete file transfers into the executor.
#[derive(Debug)]
pub enum FileBuffer {
pub(crate) enum FileBuffer {
Immediate(Vec<u8>),
// A reference to the object in the pool, and a handle to write to it
Threaded(PoolReference),
Expand Down Expand Up @@ -131,7 +131,7 @@ pub(crate) const IO_CHUNK_SIZE: usize = 16_777_216;

/// Carries the implementation specific channel data into the executor.
#[derive(Debug)]
pub enum IncrementalFile {
pub(crate) enum IncrementalFile {
ImmediateReceiver,
ThreadedReceiver(Receiver<FileBuffer>),
}
Expand Down Expand Up @@ -176,41 +176,41 @@ pub enum IncrementalFile {

/// What kind of IO operation to perform
#[derive(Debug)]
pub enum Kind {
pub(crate) enum Kind {
Directory,
File(FileBuffer),
IncrementalFile(IncrementalFile),
}

/// The details of the IO operation
#[derive(Debug)]
pub struct Item {
pub(crate) struct Item {
/// The path to operate on
pub full_path: PathBuf,
pub(crate) full_path: PathBuf,
/// The operation to perform
pub kind: Kind,
pub(crate) kind: Kind,
/// When the operation started
pub start: Option<Instant>,
start: Option<Instant>,
/// Amount of time the operation took to finish
pub finish: Option<Duration>,
finish: Option<Duration>,
/// The length of the file, for files (for stats)
pub size: Option<usize>,
size: Option<usize>,
/// The result of the operation (could now be factored into CompletedIO...)
pub result: io::Result<()>,
pub(crate) result: io::Result<()>,
/// The mode to apply
pub mode: u32,
mode: u32,
}

#[derive(Debug)]
pub enum CompletedIo {
pub(crate) enum CompletedIo {
/// A submitted Item has completed
Item(Item),
/// An IncrementalFile has completed a single chunk
Chunk(usize),
}

impl Item {
pub fn make_dir(full_path: PathBuf, mode: u32) -> Self {
pub(crate) fn make_dir(full_path: PathBuf, mode: u32) -> Self {
Self {
full_path,
kind: Kind::Directory,
Expand All @@ -222,7 +222,7 @@ impl Item {
}
}

pub fn write_file(full_path: PathBuf, mode: u32, content: FileBuffer) -> Self {
pub(crate) fn write_file(full_path: PathBuf, mode: u32, content: FileBuffer) -> Self {
let len = content.len();
Self {
full_path,
Expand All @@ -235,7 +235,7 @@ impl Item {
}
}

pub fn write_file_segmented<'a>(
pub(crate) fn write_file_segmented<'a>(
full_path: PathBuf,
mode: u32,
state: IncrementalFileState,
Expand All @@ -261,7 +261,7 @@ impl Item {
/// just allows the immediate codepath to get access to the Arc referenced state
/// without holding a lifetime reference to the executor, as the threaded code
/// path is all message passing.
pub enum IncrementalFileState {
pub(crate) enum IncrementalFileState {
Threaded,
Immediate(immediate::IncrementalFileState),
}
Expand Down Expand Up @@ -294,7 +294,7 @@ impl IncrementalFileState {
/// Trait object for performing IO. At this point the overhead
/// of trait invocation is not a bottleneck, but if it becomes
/// one we could consider an enum variant based approach instead.
pub trait Executor {
pub(crate) trait Executor {
/// Perform a single operation.
/// During overload situations previously queued items may
/// need to be completed before the item is accepted:
Expand Down Expand Up @@ -332,7 +332,7 @@ pub trait Executor {

/// Trivial single threaded IO to be used from executors.
/// (Crazy sophisticated ones can obviously ignore this)
pub fn perform<F: Fn(usize)>(item: &mut Item, chunk_complete_callback: F) {
pub(crate) fn perform<F: Fn(usize)>(item: &mut Item, chunk_complete_callback: F) {
// directories: make them, TODO: register with the dir existence cache.
// Files, write them.
item.result = match &mut item.kind {
Expand Down Expand Up @@ -361,7 +361,7 @@ pub fn perform<F: Fn(usize)>(item: &mut Item, chunk_complete_callback: F) {
}

#[allow(unused_variables)]
pub fn write_file<P: AsRef<Path>, C: AsRef<[u8]>>(
pub(crate) fn write_file<P: AsRef<Path>, C: AsRef<[u8]>>(
path: P,
contents: C,
mode: u32,
Expand Down Expand Up @@ -392,7 +392,7 @@ pub fn write_file<P: AsRef<Path>, C: AsRef<[u8]>>(
}

#[allow(unused_variables)]
pub fn write_file_incremental<P: AsRef<Path>, F: Fn(usize)>(
pub(crate) fn write_file_incremental<P: AsRef<Path>, F: Fn(usize)>(
path: P,
content_callback: &mut IncrementalFile,
mode: u32,
Expand Down Expand Up @@ -437,15 +437,15 @@ pub fn write_file_incremental<P: AsRef<Path>, F: Fn(usize)>(
Ok(())
}

pub fn create_dir<P: AsRef<Path>>(path: P) -> io::Result<()> {
pub(crate) fn create_dir<P: AsRef<Path>>(path: P) -> io::Result<()> {
let path = path.as_ref();
let path_display = format!("{}", path.display());
trace_scoped!("create_dir", "name": path_display);
std::fs::create_dir(path)
}

/// Get the executor for disk IO.
pub fn get_executor<'a>(
pub(crate) fn get_executor<'a>(
notify_handler: Option<&'a dyn Fn(Notification<'_>)>,
ram_budget: usize,
) -> Result<Box<dyn Executor + 'a>> {
Expand Down
10 changes: 5 additions & 5 deletions src/diskio/threaded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::utils::notifications::Notification;
use crate::utils::units::Unit;

#[derive(Copy, Clone, Debug, Enum)]
pub enum Bucket {
pub(crate) enum Bucket {
FourK,
EightK,
OneM,
Expand All @@ -27,13 +27,13 @@ pub enum Bucket {
}

#[derive(Debug)]
pub enum PoolReference {
pub(crate) enum PoolReference {
Owned(OwnedRef<Vec<u8>>, Arc<sharded_slab::Pool<Vec<u8>>>),
Mut(OwnedRefMut<Vec<u8>>, Arc<sharded_slab::Pool<Vec<u8>>>),
}

impl PoolReference {
pub fn clear(&mut self) {
pub(crate) fn clear(&mut self) {
match self {
PoolReference::Mut(orm, pool) => {
pool.clear(orm.key());
Expand Down Expand Up @@ -96,7 +96,7 @@ impl fmt::Debug for Pool {
}
}

pub struct Threaded<'a> {
pub(crate) struct Threaded<'a> {
n_files: Arc<AtomicUsize>,
pool: threadpool::ThreadPool,
notify_handler: Option<&'a dyn Fn(Notification<'_>)>,
Expand All @@ -108,7 +108,7 @@ pub struct Threaded<'a> {

impl<'a> Threaded<'a> {
/// Construct a new Threaded executor.
pub fn new(
pub(crate) fn new(
notify_handler: Option<&'a dyn Fn(Notification<'_>)>,
thread_count: usize,
ram_budget: usize,
Expand Down