Skip to content

Commit 9ad981d

Browse files
authored
program: add gRPC data ingester (#4411)
Summary: TensorBoard can now start RustBoard if `--load_fast` is given at runtime and `--define=link_data_server=true` was given at build time. The default configuration still has no Rust code, so our Pip packages are still portable. When we actually deploy this, we can distribute the Rust binary in a separate Pip package that TensorBoard imports, but the UX can stay the same: “just add `--load_fast`.” Test Plan: Test in the following configurations: - with just a `--logdir`, everything works as normal; - with `--define=link_data_server=true -- --logdir ... --load_fast`, the subprocess is spawned and cleaned up on a successful exit or a SIGKILL to TensorBoard, and INFO logs are shown iff `--verbosity 0` is specified; - with `--logdir ... --load_fast` but no `--define`, TensorBoard fails to start the server and prints a message before exiting; and - with `--grpc_data_provider localhost:6806`, TensorBoard connects to an existing server without needing `--logdir` or `--define`. To test the “data server died” case, comment out the `--logdir=%s` flag, which will cause the server to fail with a usage message. That message should appear in the logs. To test the polling, add `thread::sleep(Duration::from_secs(3))` before the server writes its port file, and run with `--verbosity 0` to note the “Polling for data server port” messages. This also works after syncing into Google, in all relevant configurations; see <http://cl/344900410> and <http://cl/344955833>. wchargin-branch: grpc-ingester
1 parent 83ce2a2 commit 9ad981d

File tree

12 files changed

+494
-35
lines changed

12 files changed

+494
-35
lines changed

tensorboard/BUILD

+1-2
Original file line numberDiff line numberDiff line change
@@ -216,11 +216,10 @@ py_library(
216216
":version",
217217
"//tensorboard:expect_absl_flags_installed",
218218
"//tensorboard:expect_absl_logging_installed",
219-
"//tensorboard:expect_grpc_installed",
220219
"//tensorboard/backend:application",
221220
"//tensorboard/backend/event_processing:data_ingester",
222221
"//tensorboard/backend/event_processing:event_file_inspector",
223-
"//tensorboard/data:grpc_provider",
222+
"//tensorboard/data:server_ingester",
224223
"//tensorboard/util:argparse_util",
225224
"@org_pocoo_werkzeug",
226225
"@org_pythonhosted_six",

tensorboard/backend/event_processing/BUILD

+1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ py_library(
3737
":data_provider",
3838
":event_multiplexer",
3939
":tag_types",
40+
"//tensorboard/data:ingester",
4041
"//tensorboard/plugins/audio:metadata",
4142
"//tensorboard/plugins/histogram:metadata",
4243
"//tensorboard/plugins/image:metadata",

tensorboard/backend/event_processing/data_ingester.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from tensorboard.backend.event_processing import data_provider
2525
from tensorboard.backend.event_processing import plugin_event_multiplexer
2626
from tensorboard.backend.event_processing import tag_types
27+
from tensorboard.data import ingester
2728
from tensorboard.plugins.audio import metadata as audio_metadata
2829
from tensorboard.plugins.histogram import metadata as histogram_metadata
2930
from tensorboard.plugins.image import metadata as image_metadata
@@ -48,7 +49,7 @@
4849
logger = tb_logging.get_logger()
4950

5051

51-
class LocalDataIngester(object):
52+
class LocalDataIngester(ingester.DataIngester):
5253
"""Data ingestion implementation to use when running locally."""
5354

5455
def __init__(self, flags):

tensorboard/data/BUILD

+53
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,59 @@ py_test(
6464
],
6565
)
6666

67+
py_library(
68+
name = "ingester",
69+
srcs = ["ingester.py"],
70+
srcs_version = "PY3",
71+
)
72+
73+
py_test(
74+
name = "ingester_test",
75+
size = "small",
76+
srcs = ["ingester_test.py"],
77+
srcs_version = "PY3",
78+
tags = ["support_notf"],
79+
deps = [
80+
":ingester",
81+
],
82+
)
83+
84+
config_setting(
85+
name = "link_data_server",
86+
define_values = {"link_data_server": "true"},
87+
)
88+
89+
py_library(
90+
name = "server_ingester",
91+
srcs = ["server_ingester.py"],
92+
data = select({
93+
":link_data_server": ["//tensorboard/data/server"],
94+
"//conditions:default": [],
95+
}),
96+
srcs_version = "PY3",
97+
deps = [
98+
":grpc_provider",
99+
":ingester",
100+
"//tensorboard:expect_grpc_installed",
101+
"//tensorboard/util:tb_logging",
102+
],
103+
)
104+
105+
py_test(
106+
name = "server_ingester_test",
107+
size = "medium", # time.sleep
108+
timeout = "short",
109+
srcs = ["server_ingester_test.py"],
110+
srcs_version = "PY3",
111+
tags = ["support_notf"],
112+
deps = [
113+
":grpc_provider",
114+
":server_ingester",
115+
"//tensorboard:expect_grpc_installed",
116+
"//tensorboard:test",
117+
],
118+
)
119+
67120
py_library(
68121
name = "grpc_provider",
69122
srcs = ["grpc_provider.py"],

tensorboard/data/ingester.py

+46
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
# ==============================================================================
15+
"""Abstraction for data ingestion logic."""
16+
17+
import abc
18+
19+
20+
class DataIngester(metaclass=abc.ABCMeta):
21+
"""Link between a data source and a data provider.
22+
23+
A data ingester starts a reload operation in the background and
24+
provides a data provider as a view.
25+
"""
26+
27+
@property
28+
@abc.abstractmethod
29+
def data_provider(self):
30+
"""Returns a `DataProvider`.
31+
32+
It may be an error to dereference this before `start` is called.
33+
"""
34+
pass
35+
36+
@abc.abstractmethod
37+
def start(self):
38+
"""Starts ingesting data.
39+
40+
This may start a background thread or process, and will return
41+
once communication with that task is established. It won't block
42+
forever as data is reloaded.
43+
44+
Must only be called once.
45+
"""
46+
pass

tensorboard/data/ingester_test.py

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
# ==============================================================================
15+
"""Unit tests for `tensorboard.ingester`."""
16+
17+
from tensorboard.data import ingester
18+
19+
# This is a pure abstract class. There's really nothing to test other than that
20+
# it executes successfully.
21+
del ingester

tensorboard/data/server/DEVELOPMENT.md

+41-2
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,45 @@ Rust source files in your editor. For editor setup, consult
3636

3737
[ra-docs]: https://rust-analyzer.github.io/
3838

39+
## Running under TensorBoard
40+
41+
You can point TensorBoard at a data server in two ways: start the server
42+
yourself and give TensorBoard an address, or tell TensorBoard to start the
43+
server as a subprocess.
44+
45+
To connect to an existing server, pass `--grpc_data_provider ADDRESS`, where the
46+
address is like `localhost:6806`. Thus:
47+
48+
```
49+
bazel run -c opt //tensorboard -- --grpc_data_provider localhost:6806
50+
```
51+
52+
You don’t have to pass a `--logdir` if you do this, but you do have to
53+
concurrently run `//tensorboard/data/server` (say, in the background, or in a
54+
separate shell). You can also swap out the data server whenever you want without
55+
restarting TensorBoard; new RPCs will transparently reconnect. The server
56+
doesn’t have to be running when TensorBoard starts.
57+
58+
To tell TensorBoard to start the server as a subprocess, build with
59+
`--define=link_data_server=true` and pass `--load_fast` to TensorBoard along
60+
with a normal `--logdir`. Thus:
61+
62+
```
63+
bazel run -c opt --define=link_data_server=true //tensorboard -- \
64+
--load_fast --logdir ~/tensorboard_data/mnist/ --bind_all --verbosity 0
65+
```
66+
67+
This is an easier one-shot solution, but requires a `--define` flag, offers less
68+
flexibility over the flags to the data server, and requires restarting
69+
TensorBoard if you want to restart the data server (though that’s not usually a
70+
big deal). The data server will automatically shut down when TensorBoard exits
71+
for any reason.
72+
73+
As an alternative to `--define=link_data_server=true`, you can set the
74+
`TENSORBOARD_DATA_SERVER_BINARY` environment variable to the path to a data
75+
server binary, and pass `--load_fast`. If running with `bazel run`, this should
76+
be an absolute path.
77+
3978
## Adding third-party dependencies
4079

4180
Rust dependencies are usually hosted on [crates.io]. We use [`cargo-raze`][raze]
@@ -51,8 +90,8 @@ dependency:
5190
package on <https://crates.io/>.
5291
3. Change into the `tensorboard/data/server/` directory.
5392
4. Run `cargo fetch` to update `Cargo.lock`. Running this before `cargo raze`
54-
ensures that the `http_archive` workspace rules in the generated build
55-
files will have `sha256` checksums.
93+
ensures that the `http_archive` workspace rules in the generated build files
94+
will have `sha256` checksums.
5695
5. Run `cargo raze` to update `third_party/rust/...`.
5796

5897
This will add a new target like `//third_party/rust:rand`. Manually build it

tensorboard/data/server/main.rs

+48-12
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,11 @@ limitations under the License.
1414
==============================================================================*/
1515

1616
use clap::Clap;
17-
use log::{debug, info, LevelFilter};
18-
use std::io::Read;
17+
use log::{debug, error, info, LevelFilter};
18+
use std::fs::File;
19+
use std::io::{Read, Write};
1920
use std::net::{IpAddr, SocketAddr};
20-
use std::path::PathBuf;
21+
use std::path::{Path, PathBuf};
2122
use std::str::FromStr;
2223
use std::thread;
2324
use std::time::{Duration, Instant};
@@ -74,6 +75,15 @@ struct Opts {
7475
/// sense) but not killed.
7576
#[clap(long)]
7677
die_after_stdin: bool,
78+
79+
/// Write bound port to this file
80+
///
81+
/// Once a server socket is opened, write the port on which it's listening to the file at this
82+
/// path. Useful with `--port 0`. Port will be written as ASCII decimal followed by a newline
83+
/// (e.g., "6806\n"). If the server fails to start, this file may not be written at all. If the
84+
/// port file is specified but cannot be written, the server will die.
85+
#[clap(long)]
86+
port_file: Option<PathBuf>,
7787
}
7888

7989
/// A duration in seconds.
@@ -113,21 +123,39 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
113123
let bound = listener.local_addr()?;
114124
eprintln!("listening on {:?}", bound);
115125

126+
if let Some(port_file) = opts.port_file {
127+
let port = bound.port();
128+
if let Err(e) = write_port_file(&port_file, port) {
129+
error!(
130+
"Failed to write port \"{}\" to {}: {}",
131+
port,
132+
port_file.display(),
133+
e
134+
);
135+
std::process::exit(1);
136+
}
137+
info!("Wrote port \"{}\" to {}", port, port_file.display());
138+
}
139+
116140
// Leak the commit object, since the Tonic server must have only 'static references. This only
117141
// leaks the outer commit structure (of constant size), not the pointers to the actual data.
118142
let commit: &'static Commit = Box::leak(Box::new(Commit::new()));
119143

120144
thread::Builder::new()
121145
.name("Reloader".to_string())
122-
.spawn(move || {
123-
let mut loader = LogdirLoader::new(commit, opts.logdir);
124-
loop {
125-
info!("Starting load cycle");
126-
let start = Instant::now();
127-
loader.reload();
128-
let end = Instant::now();
129-
info!("Finished load cycle ({:?})", end - start);
130-
thread::sleep(opts.reload_interval.duration());
146+
.spawn({
147+
let logdir = opts.logdir;
148+
let reload_interval = opts.reload_interval;
149+
move || {
150+
let mut loader = LogdirLoader::new(commit, logdir);
151+
loop {
152+
info!("Starting load cycle");
153+
let start = Instant::now();
154+
loader.reload();
155+
let end = Instant::now();
156+
info!("Finished load cycle ({:?})", end - start);
157+
thread::sleep(reload_interval.duration());
158+
}
131159
}
132160
})
133161
.expect("failed to spawn reloader thread");
@@ -156,3 +184,11 @@ fn die_after_stdin() {
156184
info!("Stdin closed; exiting");
157185
std::process::exit(0);
158186
}
187+
188+
/// Writes `port` to file `path` as an ASCII decimal followed by newline.
189+
fn write_port_file(path: &Path, port: u16) -> std::io::Result<()> {
190+
let mut f = File::create(path)?;
191+
writeln!(f, "{}", port)?;
192+
f.sync_all()?;
193+
Ok(())
194+
}

0 commit comments

Comments
 (0)