diff --git a/tensorboard/data/server/main.rs b/tensorboard/data/server/main.rs index a75b256a39..8a6748f482 100644 --- a/tensorboard/data/server/main.rs +++ b/tensorboard/data/server/main.rs @@ -13,15 +13,44 @@ See the License for the specific language governing permissions and limitations under the License. ==============================================================================*/ +use std::path::PathBuf; +use std::time::{Duration, Instant}; use tonic::transport::Server; +use rustboard_core::commit::Commit; +use rustboard_core::logdir::LogdirLoader; use rustboard_core::proto::tensorboard::data::tensor_board_data_provider_server::TensorBoardDataProviderServer; use rustboard_core::server::DataProviderHandler; +const RELOAD_INTERVAL: Duration = Duration::from_secs(5); + #[tokio::main] async fn main() -> Result<(), Box> { let addr = "[::0]:6806".parse::()?; - let handler = DataProviderHandler; + let logdir = match std::env::args_os().nth(1) { + Some(d) => PathBuf::from(d), + None => { + eprintln!("fatal: specify logdir as first command-line argument"); + std::process::exit(1); + } + }; + + // Leak the commit object, since the Tonic server must have only 'static references. This only + // leaks the outer commit structure (of constant size), not the pointers to the actual data. + let commit: &'static Commit = Box::leak(Box::new(Commit::new())); + std::thread::spawn(move || { + let mut loader = LogdirLoader::new(commit, logdir); + loop { + eprintln!("beginning load cycle"); + let start = Instant::now(); + loader.reload(); + let end = Instant::now(); + eprintln!("finished load cycle ({:?})", end - start); + std::thread::sleep(RELOAD_INTERVAL); + } + }); + + let handler = DataProviderHandler { commit }; Server::builder() .add_service(TensorBoardDataProviderServer::new(handler)) .serve(addr) diff --git a/tensorboard/data/server/run.rs b/tensorboard/data/server/run.rs index ccfc83d7fb..edb3e95ac1 100644 --- a/tensorboard/data/server/run.rs +++ b/tensorboard/data/server/run.rs @@ -216,7 +216,8 @@ impl RunLoader { } fn commit_all(&mut self, run_data: &RwLock) { - let mut run = run_data.write().expect("acquiring run data lock"); + let mut run = run_data.write().expect("acquiring tags lock"); + run.start_time = self.start_time; for (tag, ts) in &mut self.time_series { ts.commit(tag, &mut *run); } diff --git a/tensorboard/data/server/server.rs b/tensorboard/data/server/server.rs index b3169df15e..a643d8899b 100644 --- a/tensorboard/data/server/server.rs +++ b/tensorboard/data/server/server.rs @@ -14,15 +14,31 @@ limitations under the License. ==============================================================================*/ use futures_core::Stream; +use std::collections::HashMap; use std::pin::Pin; +use std::sync::{RwLock, RwLockReadGuard}; use tonic::{Request, Response, Status}; +use crate::commit::{self, Commit}; use crate::proto::tensorboard::data; +use crate::types::{Run, WallTime}; use data::tensor_board_data_provider_server::TensorBoardDataProvider; /// Data provider gRPC service implementation. #[derive(Debug)] -pub struct DataProviderHandler; +pub struct DataProviderHandler { + pub commit: &'static Commit, +} + +impl DataProviderHandler { + /// Obtains a read-lock to `self.commit.runs`, or fails with `Status::internal`. + fn read_runs(&self) -> Result>>, Status> { + self.commit + .runs + .read() + .map_err(|_| Status::internal("failed to read commit.runs")) + } +} const FAKE_START_TIME: f64 = 1605752017.0; @@ -44,12 +60,33 @@ impl TensorBoardDataProvider for DataProviderHandler { &self, _request: Request, ) -> Result, Status> { - let mut res: data::ListRunsResponse = Default::default(); - res.runs.push(data::Run { - name: "train".to_string(), - start_time: FAKE_START_TIME, + let runs = self.read_runs()?; + + // Buffer up started runs to sort by wall time. Keep `WallTime` rather than projecting down + // to f64 so that we're guaranteed that they're non-NaN and can sort them. + let mut results: Vec<(Run, WallTime)> = Vec::with_capacity(runs.len()); + for (run, data) in runs.iter() { + let data = data + .read() + .map_err(|_| Status::internal(format!("failed to read run data for {:?}", run)))?; + if let Some(start_time) = data.start_time { + results.push((run.clone(), start_time)); + } + } + results.sort_by_key(|&(_, start_time)| start_time); + drop(runs); // release lock a bit earlier + + let res = data::ListRunsResponse { + runs: results + .into_iter() + .map(|(Run(name), start_time)| data::Run { + name, + start_time: start_time.into(), + ..Default::default() + }) + .collect(), ..Default::default() - }); + }; Ok(Response::new(res)) } @@ -139,9 +176,80 @@ impl TensorBoardDataProvider for DataProviderHandler { mod tests { use super::*; + use crate::commit::{ScalarValue, TimeSeries}; + use crate::data_compat; + use crate::proto::tensorboard as pb; + use crate::reservoir::StageReservoir; + use crate::types::{Run, Step, Tag, WallTime}; + + /// Creates a commit with some test data. + fn sample_commit() -> Commit { + let commit = Commit::new(); + + let mut runs = commit.runs.write().unwrap(); + + fn scalar_series(points: Vec<(Step, WallTime, f64)>) -> TimeSeries { + use pb::summary::value::Value::SimpleValue; + let mut ts = commit::TimeSeries::new( + data_compat::SummaryValue(Box::new(SimpleValue(0.0))).initial_metadata(None), + ); + let mut rsv = StageReservoir::new(points.len()); + for (step, wall_time, value) in points { + rsv.offer(step, (wall_time, Ok(commit::ScalarValue(value)))); + } + rsv.commit(&mut ts.basin); + ts + } + + let mut train = runs + .entry(Run("train".to_string())) + .or_default() + .write() + .unwrap(); + train.start_time = Some(WallTime::new(1234.0).unwrap()); + train.scalars.insert( + Tag("xent".to_string()), + scalar_series(vec![ + (Step(0), WallTime::new(1235.0).unwrap(), 0.5), + (Step(1), WallTime::new(1236.0).unwrap(), 0.25), + (Step(2), WallTime::new(1237.0).unwrap(), 0.125), + ]), + ); + drop(train); + + let mut test = runs + .entry(Run("test".to_string())) + .or_default() + .write() + .unwrap(); + test.start_time = Some(WallTime::new(6234.0).unwrap()); + test.scalars.insert( + Tag("accuracy".to_string()), + scalar_series(vec![ + (Step(0), WallTime::new(6235.0).unwrap(), 0.125), + (Step(1), WallTime::new(6236.0).unwrap(), 0.25), + (Step(2), WallTime::new(6237.0).unwrap(), 0.5), + ]), + ); + drop(test); + + // An run with no start time or data: should not show up in results. + runs.entry(Run("empty".to_string())).or_default(); + + drop(runs); + commit + } + + fn sample_handler() -> DataProviderHandler { + DataProviderHandler { + // Leak the commit object, since the Tonic server must have only 'static references. + commit: Box::leak(Box::new(sample_commit())), + } + } + #[tokio::test] async fn test_list_plugins() { - let handler = DataProviderHandler; + let handler = sample_handler(); let req = Request::new(data::ListPluginsRequest { experiment_id: "123".to_string(), ..Default::default() @@ -155,21 +263,32 @@ mod tests { #[tokio::test] async fn test_list_runs() { - let handler = DataProviderHandler; + let handler = sample_handler(); let req = Request::new(data::ListRunsRequest { experiment_id: "123".to_string(), ..Default::default() }); let res = handler.list_runs(req).await.unwrap().into_inner(); - assert_eq!(res.runs.len(), 1); - let run = &res.runs[0]; - assert_eq!(run.start_time, FAKE_START_TIME); - assert_eq!(run.name, "train"); + assert_eq!( + res.runs, + vec![ + data::Run { + name: "train".to_string(), + start_time: 1234.0, + ..Default::default() + }, + data::Run { + name: "test".to_string(), + start_time: 6234.0, + ..Default::default() + }, + ] + ); } #[tokio::test] async fn test_list_scalars() { - let handler = DataProviderHandler; + let handler = sample_handler(); let req = Request::new(data::ListScalarsRequest { experiment_id: "123".to_string(), plugin_filter: Some(data::PluginFilter { @@ -186,7 +305,7 @@ mod tests { #[tokio::test] async fn test_read_scalars() { - let handler = DataProviderHandler; + let handler = sample_handler(); let req = Request::new(data::ReadScalarsRequest { experiment_id: "123".to_string(), plugin_filter: Some(data::PluginFilter {