Skip to content

Commit c0f2b3f

Browse files
committed
Option to follow component logs at console
Signed-off-by: itowlson <[email protected]>
1 parent d4b8f98 commit c0f2b3f

File tree

13 files changed

+361
-87
lines changed

13 files changed

+361
-87
lines changed

crates/engine/src/io.rs

Lines changed: 205 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,44 +1,215 @@
1-
use std::sync::{Arc, RwLock};
2-
use wasi_common::pipe::{ReadPipe, WritePipe};
1+
use std::{
2+
collections::HashSet,
3+
io::{LineWriter, Write},
4+
sync::{Arc, RwLock, RwLockReadGuard},
5+
};
6+
use wasi_common::{
7+
pipe::{ReadPipe, WritePipe},
8+
WasiFile,
9+
};
310

4-
/// Input/Output stream redirects
5-
#[derive(Clone)]
6-
pub struct IoStreamRedirects {
7-
/// Standard input redirect.
8-
pub stdin: ReadPipe<std::io::Cursor<Vec<u8>>>,
9-
/// Standard output redirect.
10-
pub stdout: OutRedirect,
11-
/// Standard error redirect.
12-
pub stderr: OutRedirect,
11+
/// Which components should have their logs followed on stdout/stderr.
12+
#[derive(Clone, Debug)]
13+
pub enum FollowComponents {
14+
/// No components should have their logs followed.
15+
None,
16+
/// Only the specified components should have their logs followed.
17+
Named(HashSet<String>),
18+
/// All components should have their logs followed.
19+
All,
1320
}
1421

15-
/// Output redirect and lock.
16-
#[derive(Clone)]
17-
pub struct OutRedirect {
18-
/// Output redirect.
19-
pub out: WritePipe<Vec<u8>>,
20-
/// Lock for writing.
21-
pub lock: Arc<RwLock<Vec<u8>>>,
22+
impl FollowComponents {
23+
/// Whether a given component should have its logs followed on stdout/stderr.
24+
pub fn should_follow(&self, component_id: &str) -> bool {
25+
match self {
26+
Self::None => false,
27+
Self::All => true,
28+
Self::Named(ids) => ids.contains(component_id),
29+
}
30+
}
31+
}
32+
33+
/// The buffers in which Wasm module output has been saved.
34+
pub trait OutputBuffers {
35+
/// The buffer in which stdout has been saved.
36+
fn stdout(&self) -> &[u8];
37+
/// The buffer in which stderr has been saved.
38+
fn stderr(&self) -> &[u8];
39+
}
40+
41+
/// A set of redirected standard I/O streams with which
42+
/// a Wasm module is to be run.
43+
pub struct ModuleIoRedirects {
44+
pub(crate) stdin: Box<dyn WasiFile>,
45+
pub(crate) stdout: Box<dyn WasiFile>,
46+
pub(crate) stderr: Box<dyn WasiFile>,
47+
}
48+
49+
impl ModuleIoRedirects {
50+
/// Constructs an instance from a set of WasiFile objects.
51+
pub fn new(
52+
stdin: Box<dyn WasiFile>,
53+
stdout: Box<dyn WasiFile>,
54+
stderr: Box<dyn WasiFile>,
55+
) -> Self {
56+
Self {
57+
stdin,
58+
stdout,
59+
stderr,
60+
}
61+
}
62+
}
63+
64+
/// The destinations to which redirected module output will be written.
65+
/// Used for subsequently reading back the output.
66+
pub struct RedirectReadHandles {
67+
stdout: Arc<RwLock<WriteDestinations>>,
68+
stderr: Arc<RwLock<WriteDestinations>>,
69+
}
70+
71+
impl RedirectReadHandles {
72+
/// Acquires a read lock for the in-memory output buffers.
73+
pub fn read(&self) -> impl OutputBuffers + '_ {
74+
RedirectReadHandlesLock {
75+
stdout: self.stdout.read().unwrap(),
76+
stderr: self.stderr.read().unwrap(),
77+
}
78+
}
79+
}
80+
81+
struct RedirectReadHandlesLock<'a> {
82+
stdout: RwLockReadGuard<'a, WriteDestinations>,
83+
stderr: RwLockReadGuard<'a, WriteDestinations>,
84+
}
85+
86+
impl<'a> OutputBuffers for RedirectReadHandlesLock<'a> {
87+
fn stdout(&self) -> &[u8] {
88+
self.stdout.buffer()
89+
}
90+
fn stderr(&self) -> &[u8] {
91+
self.stderr.buffer()
92+
}
2293
}
2394

2495
/// Prepares WASI pipes which redirect a component's output to
2596
/// memory buffers.
26-
pub fn prepare_io_redirects() -> anyhow::Result<IoStreamRedirects> {
97+
pub fn capture_io_to_memory(
98+
follow_on_stdout: bool,
99+
follow_on_stderr: bool,
100+
) -> (ModuleIoRedirects, RedirectReadHandles) {
101+
let stdout_follow = Follow::stdout(follow_on_stdout);
102+
let stderr_follow = Follow::stderr(follow_on_stderr);
103+
27104
let stdin = ReadPipe::from(vec![]);
28105

29-
let stdout_buf: Vec<u8> = vec![];
30-
let lock = Arc::new(RwLock::new(stdout_buf));
31-
let stdout = WritePipe::from_shared(lock.clone());
32-
let stdout = OutRedirect { out: stdout, lock };
33-
34-
let stderr_buf: Vec<u8> = vec![];
35-
let lock = Arc::new(RwLock::new(stderr_buf));
36-
let stderr = WritePipe::from_shared(lock.clone());
37-
let stderr = OutRedirect { out: stderr, lock };
38-
39-
Ok(IoStreamRedirects {
40-
stdin,
41-
stdout,
42-
stderr,
43-
})
106+
let (stdout_pipe, stdout_lock) = redirect_to_mem_buffer(stdout_follow);
107+
108+
let (stderr_pipe, stderr_lock) = redirect_to_mem_buffer(stderr_follow);
109+
110+
let redirects = ModuleIoRedirects {
111+
stdin: Box::new(stdin),
112+
stdout: Box::new(stdout_pipe),
113+
stderr: Box::new(stderr_pipe),
114+
};
115+
116+
let outputs = RedirectReadHandles {
117+
stdout: stdout_lock,
118+
stderr: stderr_lock,
119+
};
120+
121+
(redirects, outputs)
122+
}
123+
124+
/// Indicates whether a memory redirect should also pipe the output to
125+
/// the console so it can be followed live.
126+
pub enum Follow {
127+
/// Do not pipe to console - only write to memory.
128+
None,
129+
/// Also pipe to stdout.
130+
Stdout,
131+
/// Also pipe to stderr.
132+
Stderr,
133+
}
134+
135+
impl Follow {
136+
pub(crate) fn writer(&self) -> Box<dyn Write + Send + Sync> {
137+
match self {
138+
Self::None => Box::new(DiscardingWriter),
139+
Self::Stdout => Box::new(LineWriter::new(std::io::stdout())),
140+
Self::Stderr => Box::new(LineWriter::new(std::io::stderr())),
141+
}
142+
}
143+
144+
/// Follow on stdout if so specified.
145+
pub fn stdout(follow_on_stdout: bool) -> Self {
146+
if follow_on_stdout {
147+
Self::Stdout
148+
} else {
149+
Self::None
150+
}
151+
}
152+
153+
/// Follow on stderr if so specified.
154+
pub fn stderr(follow_on_stderr: bool) -> Self {
155+
if follow_on_stderr {
156+
Self::Stderr
157+
} else {
158+
Self::None
159+
}
160+
}
161+
}
162+
163+
/// Prepares a WASI pipe which writes to a memory buffer, optionally
164+
/// copying to the specified output stream.
165+
pub fn redirect_to_mem_buffer(
166+
follow: Follow,
167+
) -> (WritePipe<WriteDestinations>, Arc<RwLock<WriteDestinations>>) {
168+
let immediate = follow.writer();
169+
170+
let buffer: Vec<u8> = vec![];
171+
let std_dests = WriteDestinations { buffer, immediate };
172+
let lock = Arc::new(RwLock::new(std_dests));
173+
let std_pipe = WritePipe::from_shared(lock.clone());
174+
175+
(std_pipe, lock)
176+
}
177+
178+
/// The destinations to which a component writes an output stream.
179+
pub struct WriteDestinations {
180+
buffer: Vec<u8>,
181+
immediate: Box<dyn Write + Send + Sync>,
182+
}
183+
184+
impl WriteDestinations {
185+
/// The memory buffer to which a component writes an output stream.
186+
pub fn buffer(&self) -> &[u8] {
187+
&self.buffer
188+
}
189+
}
190+
191+
impl Write for WriteDestinations {
192+
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
193+
let written = self.buffer.write(buf)?;
194+
self.immediate.write_all(&buf[0..written])?;
195+
Ok(written)
196+
}
197+
198+
fn flush(&mut self) -> std::io::Result<()> {
199+
self.buffer.flush()?;
200+
self.immediate.flush()?;
201+
Ok(())
202+
}
203+
}
204+
205+
struct DiscardingWriter;
206+
207+
impl Write for DiscardingWriter {
208+
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
209+
Ok(buf.len())
210+
}
211+
212+
fn flush(&mut self) -> std::io::Result<()> {
213+
Ok(())
214+
}
44215
}

crates/engine/src/lib.rs

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ pub mod io;
99

1010
use anyhow::{bail, Context, Result};
1111
use host_component::{HostComponent, HostComponents, HostComponentsState};
12-
use io::IoStreamRedirects;
12+
use io::{ModuleIoRedirects, OutputBuffers};
1313
use spin_config::{host_component::ComponentConfig, Resolver};
1414
use spin_manifest::{Application, CoreComponent, DirectoryMount, ModuleSource};
1515
use std::{collections::HashMap, io::Write, path::PathBuf, sync::Arc};
@@ -219,7 +219,7 @@ impl<T: Default> ExecutionContext<T> {
219219
&self,
220220
component: &str,
221221
data: Option<T>,
222-
io: Option<IoStreamRedirects>,
222+
io: Option<ModuleIoRedirects>,
223223
env: Option<HashMap<String, String>>,
224224
args: Option<Vec<String>>,
225225
) -> Result<(Store<RuntimeContext<T>>, Instance)> {
@@ -238,7 +238,7 @@ impl<T: Default> ExecutionContext<T> {
238238
/// Save logs for a given component in the log directory on the host
239239
pub fn save_output_to_logs(
240240
&self,
241-
io_redirects: IoStreamRedirects,
241+
io_redirects: impl OutputBuffers,
242242
component: &str,
243243
save_stdout: bool,
244244
save_stderr: bool,
@@ -274,18 +274,18 @@ impl<T: Default> ExecutionContext<T> {
274274
.append(true)
275275
.create(true)
276276
.open(stdout_filename)?;
277-
let contents = io_redirects.stdout.lock.read().unwrap();
278-
file.write_all(&contents)?;
277+
let contents = io_redirects.stdout();
278+
file.write_all(contents)?;
279279
}
280280

281281
if save_stderr {
282-
let contents = io_redirects.stderr.lock.read().unwrap();
283282
let mut file = std::fs::OpenOptions::new()
284283
.write(true)
285284
.append(true)
286285
.create(true)
287286
.open(stderr_filename)?;
288-
file.write_all(&contents)?;
287+
let contents = io_redirects.stderr();
288+
file.write_all(contents)?;
289289
}
290290

291291
Ok(())
@@ -295,7 +295,7 @@ impl<T: Default> ExecutionContext<T> {
295295
&self,
296296
component: &Component<T>,
297297
data: Option<T>,
298-
io: Option<IoStreamRedirects>,
298+
io: Option<ModuleIoRedirects>,
299299
env: Option<HashMap<String, String>>,
300300
args: Option<Vec<String>>,
301301
) -> Result<Store<RuntimeContext<T>>> {
@@ -307,10 +307,7 @@ impl<T: Default> ExecutionContext<T> {
307307
.envs(&env)?;
308308
match io {
309309
Some(r) => {
310-
wasi_ctx = wasi_ctx
311-
.stderr(Box::new(r.stderr.out))
312-
.stdout(Box::new(r.stdout.out))
313-
.stdin(Box::new(r.stdin));
310+
wasi_ctx = wasi_ctx.stderr(r.stderr).stdout(r.stdout).stdin(r.stdin);
314311
}
315312
None => wasi_ctx = wasi_ctx.inherit_stdio(),
316313
};

0 commit comments

Comments
 (0)