Skip to content

Implement CustomLogPipes for Spin Logging/IO: The Return #482

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ wasi-cap-std-sync = "0.34"
wasi-common = "0.34"
wasmtime = "0.34"
wasmtime-wasi = "0.34"
cap-std = "0.24.1"

[dev-dependencies]
wit-bindgen-wasmtime = { git = "https://github.com/bytecodealliance/wit-bindgen", rev = "2f46ce4cc072107153da0cefe15bdc69aa5b84d0" }
231 changes: 186 additions & 45 deletions crates/engine/src/io.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,32 @@
use cap_std::fs::File as CapFile;
use std::{
collections::HashSet,
fmt::Debug,
fs::{File, OpenOptions},
io::{LineWriter, Write},
path::PathBuf,
sync::{Arc, RwLock, RwLockReadGuard},
};
use wasi_common::{
pipe::{ReadPipe, WritePipe},
WasiFile,
};
use wasmtime_wasi::sync::file::File as WasmtimeFile;

/// Prepares a WASI pipe which writes to a memory buffer, optionally
/// copying to the specified output stream.
pub fn redirect_to_mem_buffer(
follow: Follow,
) -> (WritePipe<WriteDestinations>, Arc<RwLock<WriteDestinations>>) {
let immediate = follow.writer();

let buffer: Vec<u8> = vec![];
let std_dests = WriteDestinations { buffer, immediate };
let lock = Arc::new(RwLock::new(std_dests));
let std_pipe = WritePipe::from_shared(lock.clone());

(std_pipe, lock)
}

/// Which components should have their logs followed on stdout/stderr.
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -38,15 +58,152 @@ pub trait OutputBuffers {
fn stderr(&self) -> &[u8];
}

/// Wrapper around File with a convenient PathBuf for cloning
pub struct PipeFile(pub File, pub PathBuf);

impl PipeFile {
/// Constructs an instance from a file, and the PathBuf to that file.
pub fn new(file: File, path: PathBuf) -> Self {
Self(file, path)
}
}

impl std::fmt::Debug for PipeFile {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PipeFile")
.field("File", &self.0)
.field("PathBuf", &self.1)
.finish()
}
}

impl Clone for PipeFile {
fn clone(&self) -> Self {
let f = OpenOptions::new()
.read(true)
.write(true)
.open(&self.1)
.unwrap();
Self(f, self.1.clone())
}
}

/// CustomIoPipes that can be passed to `ExecutionContextConfiguration`
/// to direct out and err
#[derive(Clone, Debug, Default)]
pub struct CustomLogPipes {
/// in pipe (file and pathbuf)
pub stdin_pipe: Option<PipeFile>,
/// out pipe (file and pathbuf)
pub stdout_pipe: Option<PipeFile>,
/// err pipe (file and pathbuf)
pub stderr_pipe: Option<PipeFile>,
}

impl CustomLogPipes {
/// Constructs an instance from a set of PipeFile objects.
pub fn new(
stdin_pipe: Option<PipeFile>,
stdout_pipe: Option<PipeFile>,
stderr_pipe: Option<PipeFile>,
) -> Self {
Self {
stdin_pipe,
stdout_pipe,
stderr_pipe,
}
}
}

/// Types of ModuleIoRedirects
#[derive(Clone, Debug)]
pub enum ModuleIoRedirectsTypes {
/// This will signal the executor to use `capture_io_to_memory_default()`
Default,
/// This will signal the executor to use `capture_io_to_memory_files()`
FromFiles(CustomLogPipes),
}

impl Default for ModuleIoRedirectsTypes {
fn default() -> Self {
Self::Default
}
}

/// A set of redirected standard I/O streams with which
/// a Wasm module is to be run.
pub struct ModuleIoRedirects {
/// pipes for ModuleIoRedirects
pub pipes: RedirectPipes,
/// read handles for ModuleIoRedirects
pub read_handles: RedirectReadHandles,
}

impl Default for ModuleIoRedirects {
fn default() -> Self {
Self::new(false)
}
}

impl ModuleIoRedirects {
/// Constructs the ModuleIoRedirects, and RedirectReadHandles instances the default way
pub fn new(follow: bool) -> Self {
let rrh = RedirectReadHandles::new(follow);

let in_stdpipe: Box<dyn WasiFile> = Box::new(ReadPipe::from(vec![]));
let out_stdpipe: Box<dyn WasiFile> = Box::new(WritePipe::from_shared(rrh.stdout.clone()));
let err_stdpipe: Box<dyn WasiFile> = Box::new(WritePipe::from_shared(rrh.stderr.clone()));

Self {
pipes: RedirectPipes {
stdin: in_stdpipe,
stdout: out_stdpipe,
stderr: err_stdpipe,
},
read_handles: rrh,
}
}

/// Constructs the ModuleIoRedirects, and RedirectReadHandles instances from `File`s directly
pub fn new_from_files(
stdin_file: Option<File>,
stdout_file: Option<File>,
stderr_file: Option<File>,
) -> Self {
let rrh = RedirectReadHandles::new(true);

let in_stdpipe: Box<dyn WasiFile> = match stdin_file {
Some(inf) => Box::new(WasmtimeFile::from_cap_std(CapFile::from_std(inf))),
None => Box::new(ReadPipe::from(vec![])),
};
let out_stdpipe: Box<dyn WasiFile> = match stdout_file {
Some(ouf) => Box::new(WasmtimeFile::from_cap_std(CapFile::from_std(ouf))),
None => Box::new(WritePipe::from_shared(rrh.stdout.clone())),
};
let err_stdpipe: Box<dyn WasiFile> = match stderr_file {
Some(erf) => Box::new(WasmtimeFile::from_cap_std(CapFile::from_std(erf))),
None => Box::new(WritePipe::from_shared(rrh.stderr.clone())),
};

Self {
pipes: RedirectPipes {
stdin: in_stdpipe,
stdout: out_stdpipe,
stderr: err_stdpipe,
},
read_handles: rrh,
}
}
}

/// Pipes from `ModuleIoRedirects`
pub struct RedirectPipes {
pub(crate) stdin: Box<dyn WasiFile>,
pub(crate) stdout: Box<dyn WasiFile>,
pub(crate) stderr: Box<dyn WasiFile>,
}

impl ModuleIoRedirects {
impl RedirectPipes {
/// Constructs an instance from a set of WasiFile objects.
pub fn new(
stdin: Box<dyn WasiFile>,
Expand All @@ -68,7 +225,35 @@ pub struct RedirectReadHandles {
stderr: Arc<RwLock<WriteDestinations>>,
}

impl Default for RedirectReadHandles {
fn default() -> Self {
Self::new(false)
}
}

impl RedirectReadHandles {
/// Creates a new RedirectReadHandles instance
pub fn new(follow: bool) -> Self {
let out_immediate = Follow::stdout(follow).writer();
let err_immediate = Follow::stderr(follow).writer();

let out_buffer: Vec<u8> = vec![];
let err_buffer: Vec<u8> = vec![];

let out_std_dests = WriteDestinations {
buffer: out_buffer,
immediate: out_immediate,
};
let err_std_dests = WriteDestinations {
buffer: err_buffer,
immediate: err_immediate,
};

Self {
stdout: Arc::new(RwLock::new(out_std_dests)),
stderr: Arc::new(RwLock::new(err_std_dests)),
}
}
/// Acquires a read lock for the in-memory output buffers.
pub fn read(&self) -> impl OutputBuffers + '_ {
RedirectReadHandlesLock {
Expand All @@ -92,35 +277,6 @@ impl<'a> OutputBuffers for RedirectReadHandlesLock<'a> {
}
}

/// Prepares WASI pipes which redirect a component's output to
/// memory buffers.
pub fn capture_io_to_memory(
follow_on_stdout: bool,
follow_on_stderr: bool,
) -> (ModuleIoRedirects, RedirectReadHandles) {
let stdout_follow = Follow::stdout(follow_on_stdout);
let stderr_follow = Follow::stderr(follow_on_stderr);

let stdin = ReadPipe::from(vec![]);

let (stdout_pipe, stdout_lock) = redirect_to_mem_buffer(stdout_follow);

let (stderr_pipe, stderr_lock) = redirect_to_mem_buffer(stderr_follow);

let redirects = ModuleIoRedirects {
stdin: Box::new(stdin),
stdout: Box::new(stdout_pipe),
stderr: Box::new(stderr_pipe),
};

let outputs = RedirectReadHandles {
stdout: stdout_lock,
stderr: stderr_lock,
};

(redirects, outputs)
}

/// Indicates whether a memory redirect should also pipe the output to
/// the console so it can be followed live.
pub enum Follow {
Expand Down Expand Up @@ -160,21 +316,6 @@ impl Follow {
}
}

/// Prepares a WASI pipe which writes to a memory buffer, optionally
/// copying to the specified output stream.
pub fn redirect_to_mem_buffer(
follow: Follow,
) -> (WritePipe<WriteDestinations>, Arc<RwLock<WriteDestinations>>) {
let immediate = follow.writer();

let buffer: Vec<u8> = vec![];
let std_dests = WriteDestinations { buffer, immediate };
let lock = Arc::new(RwLock::new(std_dests));
let std_pipe = WritePipe::from_shared(lock.clone());

(std_pipe, lock)
}

/// The destinations to which a component writes an output stream.
pub struct WriteDestinations {
buffer: Vec<u8>,
Expand Down
14 changes: 8 additions & 6 deletions crates/engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub mod io;

use anyhow::{bail, Context, Result};
use host_component::{HostComponent, HostComponents, HostComponentsState};
use io::{ModuleIoRedirects, OutputBuffers};
use io::{ModuleIoRedirectsTypes, OutputBuffers, RedirectPipes};
use spin_config::{host_component::ComponentConfig, Resolver};
use spin_manifest::{Application, CoreComponent, DirectoryMount, ModuleSource};
use std::{collections::HashMap, io::Write, path::PathBuf, sync::Arc};
Expand All @@ -35,6 +35,8 @@ pub struct ExecutionContextConfiguration {
pub log_dir: Option<PathBuf>,
/// Application configuration resolver.
pub config_resolver: Option<Arc<Resolver>>,
/// The type of io redirects for the module (default, or files)
pub module_io_redirects: ModuleIoRedirectsTypes,
}

impl From<Application> for ExecutionContextConfiguration {
Expand Down Expand Up @@ -236,7 +238,7 @@ impl<T: Default> ExecutionContext<T> {
&self,
component: &str,
data: Option<T>,
io: Option<ModuleIoRedirects>,
io: Option<RedirectPipes>,
env: Option<HashMap<String, String>>,
args: Option<Vec<String>>,
) -> Result<(Store<RuntimeContext<T>>, Instance)> {
Expand All @@ -255,7 +257,7 @@ impl<T: Default> ExecutionContext<T> {
/// Save logs for a given component in the log directory on the host
pub fn save_output_to_logs(
&self,
io_redirects: impl OutputBuffers,
ior: impl OutputBuffers,
component: &str,
save_stdout: bool,
save_stderr: bool,
Expand Down Expand Up @@ -291,7 +293,7 @@ impl<T: Default> ExecutionContext<T> {
.append(true)
.create(true)
.open(stdout_filename)?;
let contents = io_redirects.stdout();
let contents = ior.stdout();
file.write_all(contents)?;
}

Expand All @@ -301,7 +303,7 @@ impl<T: Default> ExecutionContext<T> {
.append(true)
.create(true)
.open(stderr_filename)?;
let contents = io_redirects.stderr();
let contents = ior.stderr();
file.write_all(contents)?;
}

Expand All @@ -312,7 +314,7 @@ impl<T: Default> ExecutionContext<T> {
&self,
component: &Component<T>,
data: Option<T>,
io: Option<ModuleIoRedirects>,
io: Option<RedirectPipes>,
env: Option<HashMap<String, String>>,
args: Option<Vec<String>>,
) -> Result<Store<RuntimeContext<T>>> {
Expand Down
2 changes: 1 addition & 1 deletion crates/http/benches/spin-http-benchmark/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading