diff --git a/Cargo.lock b/Cargo.lock index 3615a2d84d..849b41f999 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3488,6 +3488,7 @@ version = "0.2.0" dependencies = [ "anyhow", "bytes 1.1.0", + "cap-std", "dirs 4.0.0", "sanitize-filename", "spin-config", diff --git a/crates/engine/Cargo.toml b/crates/engine/Cargo.toml index 188370861a..222890141e 100644 --- a/crates/engine/Cargo.toml +++ b/crates/engine/Cargo.toml @@ -19,6 +19,7 @@ wasi-cap-std-sync = "0.34" wasi-common = "0.34" wasmtime = "0.34" wasmtime-wasi = "0.34" +cap-std = "0.24.1" [dev-dependencies] wit-bindgen-wasmtime = { git = "https://github.com/bytecodealliance/wit-bindgen", rev = "2f46ce4cc072107153da0cefe15bdc69aa5b84d0" } diff --git a/crates/engine/src/io.rs b/crates/engine/src/io.rs index cfa1d091d3..ee96d42ef0 100644 --- a/crates/engine/src/io.rs +++ b/crates/engine/src/io.rs @@ -1,12 +1,32 @@ +use cap_std::fs::File as CapFile; use std::{ collections::HashSet, + fmt::Debug, + fs::{File, OpenOptions}, io::{LineWriter, Write}, + path::PathBuf, sync::{Arc, RwLock, RwLockReadGuard}, }; use wasi_common::{ pipe::{ReadPipe, WritePipe}, WasiFile, }; +use wasmtime_wasi::sync::file::File as WasmtimeFile; + +/// Prepares a WASI pipe which writes to a memory buffer, optionally +/// copying to the specified output stream. +pub fn redirect_to_mem_buffer( + follow: Follow, +) -> (WritePipe, Arc>) { + let immediate = follow.writer(); + + let buffer: Vec = vec![]; + let std_dests = WriteDestinations { buffer, immediate }; + let lock = Arc::new(RwLock::new(std_dests)); + let std_pipe = WritePipe::from_shared(lock.clone()); + + (std_pipe, lock) +} /// Which components should have their logs followed on stdout/stderr. #[derive(Clone, Debug)] @@ -38,15 +58,152 @@ pub trait OutputBuffers { fn stderr(&self) -> &[u8]; } +/// Wrapper around File with a convenient PathBuf for cloning +pub struct PipeFile(pub File, pub PathBuf); + +impl PipeFile { + /// Constructs an instance from a file, and the PathBuf to that file. + pub fn new(file: File, path: PathBuf) -> Self { + Self(file, path) + } +} + +impl std::fmt::Debug for PipeFile { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PipeFile") + .field("File", &self.0) + .field("PathBuf", &self.1) + .finish() + } +} + +impl Clone for PipeFile { + fn clone(&self) -> Self { + let f = OpenOptions::new() + .read(true) + .write(true) + .open(&self.1) + .unwrap(); + Self(f, self.1.clone()) + } +} + +/// CustomIoPipes that can be passed to `ExecutionContextConfiguration` +/// to direct out and err +#[derive(Clone, Debug, Default)] +pub struct CustomLogPipes { + /// in pipe (file and pathbuf) + pub stdin_pipe: Option, + /// out pipe (file and pathbuf) + pub stdout_pipe: Option, + /// err pipe (file and pathbuf) + pub stderr_pipe: Option, +} + +impl CustomLogPipes { + /// Constructs an instance from a set of PipeFile objects. + pub fn new( + stdin_pipe: Option, + stdout_pipe: Option, + stderr_pipe: Option, + ) -> Self { + Self { + stdin_pipe, + stdout_pipe, + stderr_pipe, + } + } +} + +/// Types of ModuleIoRedirects +#[derive(Clone, Debug)] +pub enum ModuleIoRedirectsTypes { + /// This will signal the executor to use `capture_io_to_memory_default()` + Default, + /// This will signal the executor to use `capture_io_to_memory_files()` + FromFiles(CustomLogPipes), +} + +impl Default for ModuleIoRedirectsTypes { + fn default() -> Self { + Self::Default + } +} + /// A set of redirected standard I/O streams with which /// a Wasm module is to be run. pub struct ModuleIoRedirects { + /// pipes for ModuleIoRedirects + pub pipes: RedirectPipes, + /// read handles for ModuleIoRedirects + pub read_handles: RedirectReadHandles, +} + +impl Default for ModuleIoRedirects { + fn default() -> Self { + Self::new(false) + } +} + +impl ModuleIoRedirects { + /// Constructs the ModuleIoRedirects, and RedirectReadHandles instances the default way + pub fn new(follow: bool) -> Self { + let rrh = RedirectReadHandles::new(follow); + + let in_stdpipe: Box = Box::new(ReadPipe::from(vec![])); + let out_stdpipe: Box = Box::new(WritePipe::from_shared(rrh.stdout.clone())); + let err_stdpipe: Box = Box::new(WritePipe::from_shared(rrh.stderr.clone())); + + Self { + pipes: RedirectPipes { + stdin: in_stdpipe, + stdout: out_stdpipe, + stderr: err_stdpipe, + }, + read_handles: rrh, + } + } + + /// Constructs the ModuleIoRedirects, and RedirectReadHandles instances from `File`s directly + pub fn new_from_files( + stdin_file: Option, + stdout_file: Option, + stderr_file: Option, + ) -> Self { + let rrh = RedirectReadHandles::new(true); + + let in_stdpipe: Box = match stdin_file { + Some(inf) => Box::new(WasmtimeFile::from_cap_std(CapFile::from_std(inf))), + None => Box::new(ReadPipe::from(vec![])), + }; + let out_stdpipe: Box = match stdout_file { + Some(ouf) => Box::new(WasmtimeFile::from_cap_std(CapFile::from_std(ouf))), + None => Box::new(WritePipe::from_shared(rrh.stdout.clone())), + }; + let err_stdpipe: Box = match stderr_file { + Some(erf) => Box::new(WasmtimeFile::from_cap_std(CapFile::from_std(erf))), + None => Box::new(WritePipe::from_shared(rrh.stderr.clone())), + }; + + Self { + pipes: RedirectPipes { + stdin: in_stdpipe, + stdout: out_stdpipe, + stderr: err_stdpipe, + }, + read_handles: rrh, + } + } +} + +/// Pipes from `ModuleIoRedirects` +pub struct RedirectPipes { pub(crate) stdin: Box, pub(crate) stdout: Box, pub(crate) stderr: Box, } -impl ModuleIoRedirects { +impl RedirectPipes { /// Constructs an instance from a set of WasiFile objects. pub fn new( stdin: Box, @@ -68,7 +225,35 @@ pub struct RedirectReadHandles { stderr: Arc>, } +impl Default for RedirectReadHandles { + fn default() -> Self { + Self::new(false) + } +} + impl RedirectReadHandles { + /// Creates a new RedirectReadHandles instance + pub fn new(follow: bool) -> Self { + let out_immediate = Follow::stdout(follow).writer(); + let err_immediate = Follow::stderr(follow).writer(); + + let out_buffer: Vec = vec![]; + let err_buffer: Vec = vec![]; + + let out_std_dests = WriteDestinations { + buffer: out_buffer, + immediate: out_immediate, + }; + let err_std_dests = WriteDestinations { + buffer: err_buffer, + immediate: err_immediate, + }; + + Self { + stdout: Arc::new(RwLock::new(out_std_dests)), + stderr: Arc::new(RwLock::new(err_std_dests)), + } + } /// Acquires a read lock for the in-memory output buffers. pub fn read(&self) -> impl OutputBuffers + '_ { RedirectReadHandlesLock { @@ -92,35 +277,6 @@ impl<'a> OutputBuffers for RedirectReadHandlesLock<'a> { } } -/// Prepares WASI pipes which redirect a component's output to -/// memory buffers. -pub fn capture_io_to_memory( - follow_on_stdout: bool, - follow_on_stderr: bool, -) -> (ModuleIoRedirects, RedirectReadHandles) { - let stdout_follow = Follow::stdout(follow_on_stdout); - let stderr_follow = Follow::stderr(follow_on_stderr); - - let stdin = ReadPipe::from(vec![]); - - let (stdout_pipe, stdout_lock) = redirect_to_mem_buffer(stdout_follow); - - let (stderr_pipe, stderr_lock) = redirect_to_mem_buffer(stderr_follow); - - let redirects = ModuleIoRedirects { - stdin: Box::new(stdin), - stdout: Box::new(stdout_pipe), - stderr: Box::new(stderr_pipe), - }; - - let outputs = RedirectReadHandles { - stdout: stdout_lock, - stderr: stderr_lock, - }; - - (redirects, outputs) -} - /// Indicates whether a memory redirect should also pipe the output to /// the console so it can be followed live. pub enum Follow { @@ -160,21 +316,6 @@ impl Follow { } } -/// Prepares a WASI pipe which writes to a memory buffer, optionally -/// copying to the specified output stream. -pub fn redirect_to_mem_buffer( - follow: Follow, -) -> (WritePipe, Arc>) { - let immediate = follow.writer(); - - let buffer: Vec = vec![]; - let std_dests = WriteDestinations { buffer, immediate }; - let lock = Arc::new(RwLock::new(std_dests)); - let std_pipe = WritePipe::from_shared(lock.clone()); - - (std_pipe, lock) -} - /// The destinations to which a component writes an output stream. pub struct WriteDestinations { buffer: Vec, diff --git a/crates/engine/src/lib.rs b/crates/engine/src/lib.rs index ab9e03e50f..c704c03651 100644 --- a/crates/engine/src/lib.rs +++ b/crates/engine/src/lib.rs @@ -9,7 +9,7 @@ pub mod io; use anyhow::{bail, Context, Result}; use host_component::{HostComponent, HostComponents, HostComponentsState}; -use io::{ModuleIoRedirects, OutputBuffers}; +use io::{ModuleIoRedirectsTypes, OutputBuffers, RedirectPipes}; use spin_config::{host_component::ComponentConfig, Resolver}; use spin_manifest::{Application, CoreComponent, DirectoryMount, ModuleSource}; use std::{collections::HashMap, io::Write, path::PathBuf, sync::Arc}; @@ -35,6 +35,8 @@ pub struct ExecutionContextConfiguration { pub log_dir: Option, /// Application configuration resolver. pub config_resolver: Option>, + /// The type of io redirects for the module (default, or files) + pub module_io_redirects: ModuleIoRedirectsTypes, } impl From for ExecutionContextConfiguration { @@ -236,7 +238,7 @@ impl ExecutionContext { &self, component: &str, data: Option, - io: Option, + io: Option, env: Option>, args: Option>, ) -> Result<(Store>, Instance)> { @@ -255,7 +257,7 @@ impl ExecutionContext { /// Save logs for a given component in the log directory on the host pub fn save_output_to_logs( &self, - io_redirects: impl OutputBuffers, + ior: impl OutputBuffers, component: &str, save_stdout: bool, save_stderr: bool, @@ -291,7 +293,7 @@ impl ExecutionContext { .append(true) .create(true) .open(stdout_filename)?; - let contents = io_redirects.stdout(); + let contents = ior.stdout(); file.write_all(contents)?; } @@ -301,7 +303,7 @@ impl ExecutionContext { .append(true) .create(true) .open(stderr_filename)?; - let contents = io_redirects.stderr(); + let contents = ior.stderr(); file.write_all(contents)?; } @@ -312,7 +314,7 @@ impl ExecutionContext { &self, component: &Component, data: Option, - io: Option, + io: Option, env: Option>, args: Option>, ) -> Result>> { diff --git a/crates/http/benches/spin-http-benchmark/Cargo.lock b/crates/http/benches/spin-http-benchmark/Cargo.lock index 23b6fbc243..13958bc935 100644 --- a/crates/http/benches/spin-http-benchmark/Cargo.lock +++ b/crates/http/benches/spin-http-benchmark/Cargo.lock @@ -77,7 +77,7 @@ dependencies = [ [[package]] name = "spin-http-benchmark" -version = "0.1.0" +version = "0.2.0" dependencies = [ "wit-bindgen-rust", ] diff --git a/crates/http/src/spin.rs b/crates/http/src/spin.rs index 524edb32f1..07517ffb65 100644 --- a/crates/http/src/spin.rs +++ b/crates/http/src/spin.rs @@ -6,7 +6,7 @@ use anyhow::Result; use async_trait::async_trait; use http::Uri; use hyper::{Body, Request, Response}; -use spin_engine::io::capture_io_to_memory; +use spin_engine::io::{ModuleIoRedirects, ModuleIoRedirectsTypes}; use std::{net::SocketAddr, str, str::FromStr}; use tokio::task::spawn_blocking; use tracing::log; @@ -32,16 +32,27 @@ impl HttpExecutor for SpinHttpExecutor { component ); - let (redirects, outputs) = capture_io_to_memory(follow, follow); + let mior = if let ModuleIoRedirectsTypes::FromFiles(clp) = + engine.config.module_io_redirects.clone() + { + ModuleIoRedirects::new_from_files( + clp.stdin_pipe.map(|inp| inp.0), + clp.stdout_pipe.map(|oup| oup.0), + clp.stderr_pipe.map(|erp| erp.0), + ) + } else { + ModuleIoRedirects::new(follow) + }; let (store, instance) = - engine.prepare_component(component, None, Some(redirects), None, None)?; + engine.prepare_component(component, None, Some(mior.pipes), None, None)?; let resp_result = Self::execute_impl(store, instance, base, raw_route, req) .await .map_err(contextualise_err); - let log_result = engine.save_output_to_logs(outputs.read(), component, true, true); + let log_result = + engine.save_output_to_logs(mior.read_handles.read(), component, true, true); // Defer checking for failures until here so that the logging runs // even if the guest code fails. (And when checking, check the guest diff --git a/crates/http/src/wagi.rs b/crates/http/src/wagi.rs index 5f3003970f..86a5dc76fb 100644 --- a/crates/http/src/wagi.rs +++ b/crates/http/src/wagi.rs @@ -3,7 +3,7 @@ use anyhow::Result; use async_trait::async_trait; use hyper::{body, Body, Request, Response}; use spin_engine::io::{ - redirect_to_mem_buffer, Follow, ModuleIoRedirects, OutputBuffers, WriteDestinations, + redirect_to_mem_buffer, Follow, OutputBuffers, RedirectPipes, WriteDestinations, }; use spin_manifest::WagiConfig; use std::{ @@ -124,7 +124,7 @@ impl WagiHttpExecutor { fn streams_from_body( body: Vec, follow_on_stderr: bool, - ) -> (ModuleIoRedirects, WagiRedirectReadHandles) { + ) -> (RedirectPipes, WagiRedirectReadHandles) { let stdin = ReadPipe::from(body); let stdout_buf = vec![]; @@ -133,7 +133,7 @@ impl WagiHttpExecutor { let (stderr_pipe, stderr_lock) = redirect_to_mem_buffer(Follow::stderr(follow_on_stderr)); - let rd = ModuleIoRedirects::new( + let rd = RedirectPipes::new( Box::new(stdin), Box::new(stdout_pipe), Box::new(stderr_pipe), diff --git a/crates/redis/src/spin.rs b/crates/redis/src/spin.rs index 3b258151c8..381e8393a2 100644 --- a/crates/redis/src/spin.rs +++ b/crates/redis/src/spin.rs @@ -1,7 +1,7 @@ use crate::{spin_redis::SpinRedis, ExecutionContext, RedisExecutor, RuntimeContext}; use anyhow::Result; use async_trait::async_trait; -use spin_engine::io::capture_io_to_memory; +use spin_engine::io::{ModuleIoRedirects, ModuleIoRedirectsTypes}; use tokio::task::spawn_blocking; use wasmtime::{Instance, Store}; @@ -23,10 +23,20 @@ impl RedisExecutor for SpinRedisExecutor { component ); - let (redirects, outputs) = capture_io_to_memory(follow, follow); + let mior = if let ModuleIoRedirectsTypes::FromFiles(clp) = + engine.config.module_io_redirects.clone() + { + ModuleIoRedirects::new_from_files( + clp.stdin_pipe.map(|inp| inp.0), + clp.stdout_pipe.map(|oup| oup.0), + clp.stderr_pipe.map(|erp| erp.0), + ) + } else { + ModuleIoRedirects::new(follow) + }; let (store, instance) = - engine.prepare_component(component, None, Some(redirects), None, None)?; + engine.prepare_component(component, None, Some(mior.pipes), None, None)?; let result = match Self::execute_impl(store, instance, channel, payload.to_vec()).await { Ok(()) => { @@ -39,7 +49,8 @@ impl RedisExecutor for SpinRedisExecutor { } }; - let log_result = engine.save_output_to_logs(outputs.read(), component, true, true); + let log_result = + engine.save_output_to_logs(mior.read_handles.read(), component, true, true); result.and(log_result) } diff --git a/crates/trigger/src/lib.rs b/crates/trigger/src/lib.rs index 786f9b1a4b..eef9cbfa02 100644 --- a/crates/trigger/src/lib.rs +++ b/crates/trigger/src/lib.rs @@ -3,7 +3,8 @@ use std::{error::Error, path::PathBuf}; use anyhow::{Context, Result}; use async_trait::async_trait; use spin_engine::{ - io::FollowComponents, Builder, Engine, ExecutionContext, ExecutionContextConfiguration, + io::{FollowComponents, ModuleIoRedirectsTypes}, + Builder, Engine, ExecutionContext, ExecutionContextConfiguration, }; use spin_manifest::{Application, ApplicationTrigger, ComponentMap, TriggerConfig}; @@ -73,6 +74,7 @@ where label: app.info.name, log_dir, config_resolver: app.config_resolver, + module_io_redirects: ModuleIoRedirectsTypes::default(), }; let mut builder = match wasmtime_config { Some(wasmtime_config) => {