rust: create multi-file run loader #4343

Merged · 10 commits · Nov 19, 2020
4 changes: 4 additions & 0 deletions tensorboard/data/server/BUILD
@@ -31,6 +31,7 @@ rust_library(
"event_file.rs",
"masked_crc.rs",
"reservoir.rs",
"run.rs",
"scripted_reader.rs",
"tf_record.rs",
"types.rs",
@@ -50,6 +51,9 @@ rust_library(
rust_test(
name = "rustboard_core_test",
crate = ":rustboard_core",
deps = [
"//third_party/rust:tempfile",
],
)

rust_doc_test(
2 changes: 1 addition & 1 deletion tensorboard/data/server/data_compat.rs
@@ -18,7 +18,7 @@ limitations under the License.
use crate::proto::tensorboard as pb;
use pb::{summary::value::Value, summary_metadata::PluginData};

const SCALARS_PLUGIN_NAME: &str = "scalars";
pub(crate) const SCALARS_PLUGIN_NAME: &str = "scalars";

/// Determines the metadata for a time series given its first event.
///
1 change: 1 addition & 0 deletions tensorboard/data/server/lib.rs
@@ -19,6 +19,7 @@ pub mod data_compat;
pub mod event_file;
pub mod masked_crc;
pub mod reservoir;
pub mod run;
pub mod tf_record;
pub mod types;

363 changes: 363 additions & 0 deletions tensorboard/data/server/run.rs
@@ -0,0 +1,363 @@
/* Copyright 2020 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

//! Loader for a single run, with one or more event files.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::fs::File;
use std::io::BufReader;
use std::path::PathBuf;

use crate::event_file::EventFileReader;
use crate::proto::tensorboard as pb;
use crate::reservoir::StageReservoir;
use crate::types::{Step, Tag, WallTime};

/// A loader to accumulate reservoir-sampled events in a single TensorBoard run.
///
/// For now, a run loader always reads from [`File`]s on disk. In the future, this may be
/// parameterized over a filesystem interface.
#[derive(Debug)]
pub struct RunLoader {
/// The earliest event `wall_time` seen in any event file in this run.
///
/// This is `None` if and only if no events have been seen. Its value may decrease as new
/// events are read, but in practice this is expected to be the wall time of the first
/// `file_version` event in the first event file.
start_time: Option<WallTime>,

/// The event files in this run.
///
/// Event files are sorted and read lexicographically by name, which is designed to coincide
/// with actual start time. See [`EventFile::Dead`] for conditions under which an event file
/// may be dead. Once an event file is added to this map, it may become dead, but it will not
/// be removed entirely. This way, we know not to re-open it at the next load cycle.
files: BTreeMap<PathBuf, EventFile<BufReader<File>>>,

/// Reservoir-sampled data and metadata for each time series.
time_series: HashMap<Tag, TimeSeries>,
}

#[derive(Debug)]
enum EventFile<R> {
/// An event file that may still have more valid data.
Active(EventFileReader<R>),
/// An event file that can no longer be read.
///
/// This can be because of a non-recoverable read error (e.g., a bad length checksum), due to
/// the last-read record being very old (note: not yet implemented), or due to the file being
/// deleted.
Dead,
}

#[derive(Debug)]
struct TimeSeries {
data_class: pb::DataClass,
metadata: Box<pb::SummaryMetadata>,
rsv: StageReservoir<StageValue>,
}

impl TimeSeries {
fn new(metadata: Box<pb::SummaryMetadata>) -> Self {
let data_class =
pb::DataClass::from_i32(metadata.data_class).unwrap_or(pb::DataClass::Unknown);
let capacity = match data_class {
pb::DataClass::Scalar => 1000,
pb::DataClass::Tensor => 100,
pb::DataClass::BlobSequence => 10,
_ => 0,
};
Self {
data_class,
metadata,
rsv: StageReservoir::new(capacity),
}
}
}

/// A value staged in the reservoir.
///
/// This is kept as close as possible to the on-disk event representation, since every record in
/// the stream is converted into this format.
#[derive(Debug)]
struct StageValue {
wall_time: WallTime,
payload: StagePayload,
}

#[derive(Debug)]
enum StagePayload {
#[allow(dead_code)]
GraphDef(Vec<u8>),
Summary(Box<pb::summary::value::Value>),
}

impl RunLoader {
pub fn new() -> Self {
Self {
start_time: None,
files: BTreeMap::new(),
time_series: HashMap::new(),
}
}

/// Loads new data given the current set of event files.
///
/// The provided filenames should correspond to the entire set of event files currently part of
/// this run.
pub fn reload(&mut self, filenames: Vec<PathBuf>) {
self.update_file_set(filenames);
self.reload_files();
}

/// Updates the active key set of `self.files` to match the given filenames.
///
/// After this function returns, `self.files` may still have keys not in `filenames`, but they
/// will all map to [`EventFile::Dead`].
fn update_file_set(&mut self, filenames: Vec<PathBuf>) {
// Remove any discarded files.
let new_file_set: HashSet<&PathBuf> = filenames.iter().collect();
Contributor Author:

Of some interest: you have probably seen us write .collect() before to
turn an iterator into a vector. But here we’re turning one into a hash
set! In fact, collect can generate any collection type, as long as
it implements FromIterator. The specific type in any instance is
determined by context, and the compiler will complain if there’s not
enough information to uniquely identify a type. This is sometimes called
target typing.
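
As a quick standalone illustration (not part of this diff), the same iterator pipeline can fill different collections depending on the annotated target type:

    use std::collections::HashSet;

    fn main() {
        let words = ["a", "b", "a"];
        // Identical right-hand sides; the annotation selects the FromIterator impl.
        let as_vec: Vec<&str> = words.iter().copied().collect();
        let as_set: HashSet<&str> = words.iter().copied().collect();
        assert_eq!(as_vec.len(), 3);
        assert_eq!(as_set.len(), 2); // the duplicate "a" collapses in the set
    }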

for (k, v) in self.files.iter_mut() {
if !new_file_set.contains(k) {
*v = EventFile::Dead;
Contributor:

Just confirming the obvious: what happens to the old value behind v? Does the compiler essentially deallocate it since the owner disavowed it?

Contributor Author:

Yep. We say that the old value is dropped here, and so the compiler
will call the appropriate destructors and free the associated memory.
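
A standalone sketch of that behavior, using a hypothetical type with a noisy destructor (not from this diff):

    struct Noisy(&'static str);

    impl Drop for Noisy {
        fn drop(&mut self) {
            println!("dropping {}", self.0);
        }
    }

    fn main() {
        let mut slot = Noisy("old");
        println!("slot holds {}", slot.0);
        slot = Noisy("new"); // the old value is dropped here: prints "dropping old"
        println!("slot holds {}", slot.0);
    } // "new" is dropped at end of scope: prints "dropping new"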

}
}

// Open readers for any new files.
for filename in filenames {
use std::collections::btree_map;
Contributor:

Is it kosher to "import" mid-module? If so, when is this pattern used? I suspect a good compiler will do the right thing no matter how you write this, but I'm still curious about when I should prefer this style.

Contributor Author:

Yeah, this is fine!

First, note that Rust imports are always pure. Modules do not have side
effects. So you can freely reorder imports (rustfmt sorts them) or
declare them at any scope that you like without worrying about bugs.

So, then, it’s all about scope. The general Rust convention is to import
most types and traits that you use at top level, and modules if you
want, too. Importing bare functions is rarer, I think, but not unheard
of. I like to be able to see data_compat::initial_metadata(...) to
clarify that it’s not just a helper function in this module.

So I could have imported btree_map::Entry at top level. But I think
that that would be a wee bit confusing, because if you see Entry in
some arbitrary place in the code, it’s not obvious what that refers to
(as opposed to, e.g., HashMap).

I also could have imported btree_map at top level, and that would also
have been reasonable imho. I don’t really have strong opinions here.

Some examples of my conventions:

// OK: types and traits that are clear from context
use std::collections::{HashMap, HashSet};
use std::io::{Read, Write};

// OK: modules, aliases
use std::io;
use std::path;
use crate::proto::tensorboard as pb;

// Avoid: types not clear from context
// use std::io::Error;  // a fn returns "Error"---what kind of error?

// Weak-avoid: standalone functions
// use std::iter::once;  // seeing "iter::once" would be clearer

match self.files.entry(filename) {
btree_map::Entry::Occupied(_) => {}
btree_map::Entry::Vacant(v) => {
let event_file = match File::open(v.key()) {
Ok(file) => {
let reader = EventFileReader::new(BufReader::new(file));
EventFile::Active(reader)
}
// TODO(@wchargin): Improve error handling?
Err(e) => {
eprintln!("failed to open event file {:?}: {:?}", v.key(), e);
EventFile::Dead
}
};
v.insert(event_file);
}
};
}
}

/// Reads data from all active event files.
fn reload_files(&mut self) {
for (filename, ef) in self.files.iter_mut() {
let reader = match ef {
EventFile::Dead => continue,
EventFile::Active(reader) => reader,
};

loop {
use crate::event_file::ReadEventError::ReadRecordError;
use crate::tf_record::ReadRecordError::Truncated;
let event = match reader.read_event() {
Ok(event) => event,
Err(ReadRecordError(Truncated)) => break,
Err(e) => {
// TODO(@wchargin): Improve error handling?
eprintln!("read error in {}: {:?}", filename.display(), e);
*ef = EventFile::Dead;
break;
}
};
read_event(&mut self.time_series, &mut self.start_time, event);
}
}
}
}

/// Reads a single event into the structures of a run loader.
///
/// This is a standalone function because it's called from `reload_files` in a context that already
/// has an exclusive reference into `self.files`, and so can't call methods on `&mut self`.
fn read_event(
time_series: &mut HashMap<Tag, TimeSeries>,
start_time: &mut Option<WallTime>,
e: pb::Event,
) {
let step = Step(e.step);
let wall_time = match WallTime::new(e.wall_time) {
None => {
// TODO(@wchargin): Improve error handling.
eprintln!(
"dropping event at step {} with invalid wall time {}",
e.step, e.wall_time
);
return;
}
Some(wt) => wt,
};
if start_time.map(|start| wall_time < start).unwrap_or(true) {
*start_time = Some(wall_time);
}
match e.what {
Some(pb::event::What::GraphDef(_)) => {
// TODO(@wchargin): Handle run graphs.
eprintln!("graph_def events not yet handled");
}
Some(pb::event::What::Summary(sum)) => {
for mut summary_value in sum.value {
let value = match summary_value.value {
None => continue,
Some(v) => v,
};

use std::collections::hash_map::Entry;
let ts = match time_series.entry(Tag(summary_value.tag)) {
Entry::Occupied(o) => o.into_mut(),
Entry::Vacant(v) => {
let metadata = crate::data_compat::initial_metadata(
summary_value.metadata.take(),
&value,
);
v.insert(TimeSeries::new(metadata))
}
};
let sv = StageValue {
wall_time,
payload: StagePayload::Summary(Box::new(value)),
};
ts.rsv.offer(step, sv);
}
}
_ => {}
}
}

impl Default for RunLoader {
fn default() -> Self {
Self::new()
}
}

#[cfg(test)]
mod test {
use super::*;
use std::fs::File;
use std::io::{BufWriter, Write};

/// Writes an event to the given writer, in TFRecord form.
fn write_event<W: Write>(writer: W, event: &pb::Event) -> std::io::Result<()> {
use prost::Message;
let mut data = Vec::new();
event.encode(&mut data)?;
crate::tf_record::TfRecord::from_data(data).write(writer)?;
Ok(())
}

/// Writes a TF 1.x scalar event (`simple_value`) to the given writer.
fn write_scalar<W: Write>(
writer: W,
tag: &Tag,
step: Step,
wt: WallTime,
value: f32,
) -> std::io::Result<()> {
let event = pb::Event {
step: step.0,
wall_time: wt.into(),
what: Some(pb::event::What::Summary(pb::Summary {
value: vec![pb::summary::Value {
tag: tag.0.clone(),
value: Some(pb::summary::value::Value::SimpleValue(value)),
..Default::default()
}],
..Default::default()
})),
..Default::default()
};
write_event(writer, &event)
}

#[test]
fn test() -> Result<(), Box<dyn std::error::Error>> {
Contributor Author:

On this line, dyn std::error::Error opts us into dynamic dispatch: the
error may be any value implementing Error, along with its vtable.
This isn’t useful for programmatic callers, since it’s hard to usefully
inspect the cause, but it’s really convenient for main functions or test
functions, like this one.
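
For context, a minimal standalone sketch (hypothetical function and file name, not from this diff) of how the ? operator converts any concrete error into the boxed trait object:

    use std::error::Error;

    fn demo() -> Result<(), Box<dyn Error>> {
        let n: i32 = "42".parse()?; // ParseIntError is boxed via From
        let bytes = std::fs::read("some_file.bin")?; // io::Error is boxed the same way
        println!("parsed {}, read {} bytes", n, bytes.len());
        Ok(())
    }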

let logdir = tempfile::tempdir()?;
let f1_name = logdir.path().join("tfevents.123");
let f2_name = logdir.path().join("tfevents.456");
let mut f1 = BufWriter::new(File::create(&f1_name)?);
let mut f2 = BufWriter::new(File::create(&f2_name)?);
Contributor, commenting on lines +294 to +297:

No action required.

Interesting; I thought the module would introduce an abstraction over the file system and operate on a testable EventFile or something akin to that.

I assume this is actually writing these events out to the temp directory (at the sync_all call at L315)?

Contributor Author:

That was certainly my plan, to parameterize the reader:

pub struct RunLoader<R> {
    files: HashMap<PathBuf, EventFile<R>>,
    // ...
}
impl<R: Read> RunLoader<R> { /* ... */ }

This works swimmingly for reading, but the problem is that Read is
just a stream type, and doesn’t actually give you a way to open a
file. So I would have needed to define something like:

trait Filesystem {
    type File: Read + Seek;
    fn open(p: &Path) -> io::Result<Self::File>;
}

struct RealFilesystem;
impl Filesystem for RealFilesystem {
    type File = std::fs::File;
    fn open(p: &Path) -> io::Result<Self::File> { std::fs::File::open(p) }
}

And this works perfectly fine, and we probably will want it
eventually, but I didn’t really want to bother implementing my own fake
in-memory file system when the real filesystem will do just fine.

I assume this is actually writing these events out to the temp
directory (at sync_all call in L315)?

Yep. The sync_all is an fsync(2). Without it, the files would still
be written when they’re dropped, but here we needed to keep a reference
to them later, so this flushes the BufWriter’s buffer.
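
As a standalone sketch of that flush-then-sync pattern (hypothetical helper, not part of this change):

    use std::fs::File;
    use std::io::{BufWriter, Write};
    use std::path::Path;

    fn write_durably(path: &Path, bytes: &[u8]) -> std::io::Result<()> {
        let mut writer = BufWriter::new(File::create(path)?);
        writer.write_all(bytes)?;
        // into_inner() flushes the buffer and returns the inner File;
        // sync_all() then asks the OS to persist it (fsync).
        writer.into_inner()?.sync_all()?;
        Ok(())
    }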


// Write file versions.
for (f, wall_time) in &mut [(&mut f1, 1234.0), (&mut f2, 2345.0)] {
let file_version = pb::Event {
wall_time: *wall_time,
what: Some(pb::event::What::FileVersion("brain.Event:2".to_string())),
..Default::default()
};
write_event(f, &file_version)?;
}

// Write some data points across both files.
let tag = Tag("accuracy".to_string());
write_scalar(&mut f1, &tag, Step(0), WallTime::new(1235.0).unwrap(), 0.25)?;
write_scalar(&mut f1, &tag, Step(1), WallTime::new(1236.0).unwrap(), 0.50)?;
write_scalar(&mut f2, &tag, Step(2), WallTime::new(2346.0).unwrap(), 0.75)?;
write_scalar(&mut f2, &tag, Step(3), WallTime::new(2347.0).unwrap(), 1.00)?;
f1.into_inner()?.sync_all()?;
f2.into_inner()?.sync_all()?;

let mut loader = RunLoader::new();
loader.reload(vec![f1_name, f2_name]);

// Start time should be that of the file version event, even though that didn't correspond
// to any time series.
assert_eq!(loader.start_time, Some(WallTime::new(1234.0).unwrap()));
assert_eq!(loader.time_series.keys().collect::<Vec<_>>(), vec![&tag]);
Contributor:

<_>! One day I will see code containing my favorite emoji, >_<

Contributor Author:

Hmm… I’m having trouble thinking of a piece of standard Rust syntax that
contains >_<, since _ is not a valid identifier by itself. :-(
But this time, macros can save the day:

macro_rules! swlmoji {
    (<_>) => { "meep" };
    (>_<) => { "mrrp" };
}
fn main() {
    println!("{}", swlmoji!(<_>));
    println!("{}", swlmoji!(>_<));
}

Contributor Author (@wchargin, Nov 19, 2020):

(oh, and the _ in Vec<_> means “please infer type parameter” :-) )

let ts = loader.time_series.get_mut(&tag).unwrap();
assert_eq!(
*ts.metadata,
pb::SummaryMetadata {
plugin_data: Some(pb::summary_metadata::PluginData {
plugin_name: crate::data_compat::SCALARS_PLUGIN_NAME.to_string(),
..Default::default()
}),
data_class: pb::DataClass::Scalar.into(),
..Default::default()
}
);
let mut basin = crate::reservoir::Basin::new();
ts.rsv.commit(&mut basin);

// Points should be as expected (no downsampling at these sizes).
let mut actual_points = Vec::new();
for (step, StageValue { wall_time, payload }) in basin.as_slice() {
if let StagePayload::Summary(value_box) = payload {
if let pb::summary::value::Value::SimpleValue(f) = **value_box {
actual_points.push((*step, *wall_time, f));
continue;
}
}
panic!("step {:?}: {:?}", step, payload);
}
assert_eq!(
actual_points,
vec![
(Step(0), WallTime::new(1235.0).unwrap(), 0.25),
(Step(1), WallTime::new(1236.0).unwrap(), 0.50),
(Step(2), WallTime::new(2346.0).unwrap(), 0.75),
(Step(3), WallTime::new(2347.0).unwrap(), 1.00),
Contributor:

Testing my understanding: I assume loader.time_series does not actually sort/order events by step, right? It just happens to be sorted since we wrote it that way in L311-L314. I don't think I've read the reservoir module thoroughly enough to know off the top of my head, but it looks like rsv.offer simply pushes onto the vec.

Contributor Author (@wchargin, Nov 19, 2020):

It is sorted because the reservoir preempts any non-preceding records
before pushing onto the vector, so when it pushes, the step of the
new record is always larger than the steps of all other records.

Including a preemption in this test is probably a pretty reasonable
idea. I’ll go ahead and do that.
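
A sketch of that invariant, assuming the preemption semantics described above and the StageReservoir/Basin API used elsewhere in this file (illustrative only):

    let mut rsv = crate::reservoir::StageReservoir::new(10);
    rsv.offer(Step(0), "a");
    rsv.offer(Step(5), "b");
    rsv.offer(Step(3), "c"); // preempts the record at Step(5)
    let mut basin = crate::reservoir::Basin::new();
    rsv.commit(&mut basin);
    // basin.as_slice() now holds (Step(0), "a") and (Step(3), "c"),
    // so steps are strictly increasing by construction.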

]
);

Ok(())
}
}