diff --git a/.github/workflows/moon.yml b/.github/workflows/moon.yml index 993ae74b08b..2851a81003d 100644 --- a/.github/workflows/moon.yml +++ b/.github/workflows/moon.yml @@ -37,9 +37,11 @@ jobs: cache-base: '^(master|develop-)' - run: cargo run -- --color --log trace ci --base ${{ github.base_ref || 'master' }} env: + DEPOT_TOKEN: ${{ secrets.DEPOT_TOKEN }} MOON_NODE_VERSION: ${{ matrix.node-version }} MOONBASE_SECRET_KEY: ${{ secrets.MOONBASE_SECRET_KEY }} MOONBASE_ACCESS_KEY: ${{ secrets.MOONBASE_ACCESS_KEY }} + RUST_BACKTRACE: '1' - uses: moonrepo/run-report-action@v1 if: success() || failure() with: diff --git a/.moon/workspace.yml b/.moon/workspace.yml index a463d2809ea..3ab758bac69 100644 --- a/.moon/workspace.yml +++ b/.moon/workspace.yml @@ -35,17 +35,22 @@ docker: include: - '*.config.js' - '*.json' -# unstable_remote: -# host: 'http://0.0.0.0:8080' -# # host: 'grpc://0.0.0.0:9092' -# cache: -# compression: 'zstd' -# mtls: -# caCert: 'crates/remote/tests/__fixtures__/certs-local/ca.pem' -# clientCert: 'crates/remote/tests/__fixtures__/certs-local/client.pem' -# clientKey: 'crates/remote/tests/__fixtures__/certs-local/client.key' -# domain: 'localhost' -# tls: -# # assumeHttp2: true -# cert: 'crates/remote/tests/__fixtures__/certs-local/ca.pem' -# # domain: 'localhost' + +unstable_remote: + host: 'grpcs://cache.depot.dev' + auth: + token: 'DEPOT_TOKEN' + headers: + 'X-Depot-Org': '1xtpjd084j' + 'X-Depot-Project': '90xxfkst9n' + # cache: + # compression: 'zstd' + # mtls: + # caCert: 'crates/remote/tests/__fixtures__/certs-local/ca.pem' + # clientCert: 'crates/remote/tests/__fixtures__/certs-local/client.pem' + # clientKey: 'crates/remote/tests/__fixtures__/certs-local/client.key' + # domain: 'localhost' + # tls: + # # assumeHttp2: true + # cert: 'crates/remote/tests/__fixtures__/certs-local/client.pem' + # domain: 'localhost' diff --git a/.yarn/versions/c89cc34c.yml b/.yarn/versions/c89cc34c.yml index 28fac241db0..0e53606a2e7 100644 --- a/.yarn/versions/c89cc34c.yml +++ b/.yarn/versions/c89cc34c.yml @@ -1,9 +1,16 @@ releases: - "@moonrepo/cli": minor - "@moonrepo/core-linux-arm64-gnu": minor - "@moonrepo/core-linux-arm64-musl": minor - "@moonrepo/core-linux-x64-gnu": minor - "@moonrepo/core-linux-x64-musl": minor - "@moonrepo/core-macos-arm64": minor - "@moonrepo/core-macos-x64": minor - "@moonrepo/core-windows-x64-msvc": minor + '@moonrepo/cli': minor + '@moonrepo/core-linux-arm64-gnu': minor + '@moonrepo/core-linux-arm64-musl': minor + '@moonrepo/core-linux-x64-gnu': minor + '@moonrepo/core-linux-x64-musl': minor + '@moonrepo/core-macos-arm64': minor + '@moonrepo/core-macos-x64': minor + '@moonrepo/core-windows-x64-msvc': minor + '@moonrepo/types': minor + +declined: + - '@moonrepo/nx-compat' + - '@moonrepo/report' + - '@moonrepo/runtime' + - website diff --git a/CHANGELOG.md b/CHANGELOG.md index 0ad00ae61c0..ff5fdea4a86 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,17 @@ # Changelog +## Unreleased + +#### 🚀 Updates + +- Updated our unstable remote service (Bazel RE API) with new functionality: + - You can now use `http(s)` protocols for gRPC servers, instead of just `grpc(s)`. + - Added an `unstable_remote.api` setting, which can be used to inform the server's API format. + Defaults to `grpc`. + - Added an `unstable_remote.auth` setting, which can be used for HTTP Bearer/token Authorization + based endpoints. Can also be used to set headers for all requests. 
+ - Added support for Depot cloud-based caching: https://depot.dev/docs/cache/overview + ## 1.31.3 #### 🐞 Fixes diff --git a/Cargo.lock b/Cargo.lock index 8544d16f453..26e4eb67fe1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4303,11 +4303,13 @@ version = "0.0.1" dependencies = [ "async-trait", "bazel-remote-apis", + "bincode", "chrono", "miette 7.4.0", "moon_action", "moon_common", "moon_config", + "moon_task", "reqwest", "rustc-hash", "scc", diff --git a/crates/config/src/workspace/remote_config.rs b/crates/config/src/workspace/remote_config.rs index 2faf86cf228..3642308d9e5 100644 --- a/crates/config/src/workspace/remote_config.rs +++ b/crates/config/src/workspace/remote_config.rs @@ -1,4 +1,5 @@ use crate::portable_path::FilePath; +use rustc_hash::FxHashMap; use schematic::{derive_enum, validate, Config, ConfigEnum, ValidateError, ValidateResult}; fn path_is_required( @@ -15,6 +16,27 @@ } derive_enum!( + /// The API format of the remote service. + #[derive(Copy, ConfigEnum, Default)] + pub enum RemoteApi { + /// gRPC endpoints. + #[default] + Grpc, + } +); + +/// Configures basic HTTP authentication. +#[derive(Clone, Config, Debug)] +pub struct RemoteAuthConfig { + /// HTTP headers to inject into every request. + pub headers: FxHashMap<String, String>, + + /// The name of an environment variable to use as a bearer token. + pub token: Option<String>, +} + +derive_enum!( + /// Supported blob compression levels. #[derive(Copy, ConfigEnum, Default)] pub enum RemoteCompression { /// No compression. @@ -81,6 +103,13 @@ pub struct RemoteMtlsConfig { /// Configures the remote service, powered by the Bazel Remote Execution API. #[derive(Clone, Config, Debug)] pub struct RemoteConfig { + /// The API format of the remote service. + pub api: RemoteApi, + + /// Connect to the host using basic HTTP authentication. + #[setting(nested)] + pub auth: Option<RemoteAuthConfig>, + /// Configures the action cache (AC) and content addressable cache (CAS).
#[setting(nested)] pub cache: RemoteCacheConfig, @@ -101,11 +130,19 @@ pub struct RemoteConfig { } impl RemoteConfig { + pub fn is_bearer_auth(&self) -> bool { + self.auth.as_ref().is_some_and(|auth| auth.token.is_some()) + } + pub fn is_localhost(&self) -> bool { self.host.contains("localhost") || self.host.contains("0.0.0.0") } pub fn is_secure(&self) -> bool { - self.tls.is_some() || self.mtls.is_some() + self.is_bearer_auth() || self.tls.is_some() || self.mtls.is_some() + } + + pub fn is_secure_protocol(&self) -> bool { + self.host.starts_with("https") || self.host.starts_with("grpcs") } } diff --git a/crates/remote/Cargo.toml b/crates/remote/Cargo.toml index 1e7f184ae07..868ef3df692 100644 --- a/crates/remote/Cargo.toml +++ b/crates/remote/Cargo.toml @@ -8,8 +8,10 @@ publish = false moon_action = { path = "../action" } moon_common = { path = "../common" } moon_config = { path = "../config" } +moon_task = { path = "../task" } async-trait = { workspace = true } bazel-remote-apis = { version = "0.12.0", features = ["serde"] } +bincode = "1.3.3" chrono = { workspace = true } miette = { workspace = true } reqwest = { workspace = true, features = ["json"] } diff --git a/crates/remote/src/action_state.rs b/crates/remote/src/action_state.rs new file mode 100644 index 00000000000..d01199551be --- /dev/null +++ b/crates/remote/src/action_state.rs @@ -0,0 +1,145 @@ +use crate::fs_digest::*; +use bazel_remote_apis::build::bazel::remote::execution::v2::{ + command, platform, Action, ActionResult, Command, Digest, ExecutedActionMetadata, +}; +use miette::IntoDiagnostic; +use moon_action::Operation; +use moon_task::Task; +use std::collections::BTreeMap; +use std::path::Path; + +pub struct ActionState<'task> { + task: &'task Task, + + // RE API + pub action: Option<Action>, + pub action_result: Option<ActionResult>, + pub command: Option<Command>, + pub digest: Digest, + + // To upload + pub blobs: Vec<Blob>, +} + +impl ActionState<'_> { + pub fn new(digest: Digest, task: &Task) -> ActionState<'_> { + ActionState { + task, + action: None, + action_result: None, + command: None, + digest, + blobs: vec![], + } + } + + pub fn create_action_from_task(&mut self) { + let mut action = Action { + command_digest: Some(self.digest.clone()), + do_not_cache: !self.task.options.cache, + input_root_digest: None, // TODO?
+ ..Default::default() + }; + + // https://github.com/bazelbuild/remote-apis/blob/main/build/bazel/remote/execution/v2/platform.md + if let Some(os_list) = &self.task.options.os { + let platform = action.platform.get_or_insert_default(); + + for os in os_list { + platform.properties.push(platform::Property { + name: "OSFamily".into(), + value: os.to_string(), + }); + } + } + + // Since we don't support (or plan to) remote execution, + // then we can ignore all the working directory logic + let mut command = Command { + arguments: vec![self.task.command.clone()], + output_paths: vec![], // TODO + ..Default::default() + }; + + command.arguments.extend(self.task.args.clone()); + + for (name, value) in BTreeMap::from_iter(self.task.env.clone()) { + command + .environment_variables + .push(command::EnvironmentVariable { name, value }); + } + + self.action = Some(action); + self.command = Some(command); + } + + pub fn create_action_result_from_operation( + &mut self, + operation: &Operation, + ) -> miette::Result<()> { + let mut result = ActionResult { + execution_metadata: Some(ExecutedActionMetadata { + worker: "moon".into(), + execution_start_timestamp: create_timestamp_from_naive(operation.started_at), + execution_completed_timestamp: operation + .finished_at + .and_then(create_timestamp_from_naive), + ..Default::default() + }), + ..Default::default() + }; + + if let Some(exec) = operation.get_output() { + result.exit_code = exec.exit_code.unwrap_or_default(); + + if let Some(stderr) = &exec.stderr { + let blob = Blob::new(stderr.as_bytes().to_owned()); + + result.stderr_digest = Some(blob.digest.clone()); + self.blobs.push(blob); + } + + if let Some(stdout) = &exec.stdout { + let blob = Blob::new(stdout.as_bytes().to_owned()); + + result.stdout_digest = Some(blob.digest.clone()); + self.blobs.push(blob); + } + } + + self.action_result = Some(result); + + Ok(()) + } + + pub fn set_action_result(&mut self, result: ActionResult) { + self.action_result = Some(result); + } + + pub fn compute_outputs(&mut self, workspace_root: &Path) -> miette::Result<()> { + let mut outputs = OutputDigests::default(); + + for path in self.task.get_output_files(workspace_root, true)? 
{ + outputs.insert_relative_path(path, workspace_root)?; + } + + if let Some(result) = &mut self.action_result { + result.output_files = outputs.files; + result.output_symlinks = outputs.symlinks; + result.output_directories = outputs.dirs; + self.blobs.extend(outputs.blobs); + } + + Ok(()) + } + + pub fn get_command_as_bytes(&self) -> miette::Result<Vec<u8>> { + bincode::serialize(&self.command).into_diagnostic() + } + + pub fn prepare_for_upload(&mut self) -> Option<(ActionResult, Vec<Blob>)> { + self.action_result + .take() + .map(|result| (result, self.blobs.drain(0..).collect::<Vec<_>>())) + } +} diff --git a/crates/remote/src/compression.rs b/crates/remote/src/compression.rs index 669328df00a..3e7441c6224 100644 --- a/crates/remote/src/compression.rs +++ b/crates/remote/src/compression.rs @@ -23,6 +23,21 @@ pub fn get_compressor(compression: RemoteCompression) -> i32 { } } +#[allow(dead_code)] +pub fn get_compression_from_value(compressor: compressor::Value) -> RemoteCompression { + match compressor { + compressor::Value::Zstd => RemoteCompression::Zstd, + _ => RemoteCompression::None, + } +} + +pub fn get_compression_from_code(compressor: i32) -> RemoteCompression { + match compressor { + zstd if zstd == compressor::Value::Zstd as i32 => RemoteCompression::Zstd, + _ => RemoteCompression::None, + } +} + pub fn compress_blob(compression: RemoteCompression, bytes: Vec<u8>) -> miette::Result<Vec<u8>> { let result = match compression { RemoteCompression::None => Ok(bytes), diff --git a/crates/remote/src/fs_digest.rs b/crates/remote/src/fs_digest.rs index 1687a124cc9..8b07edd3236 100644 --- a/crates/remote/src/fs_digest.rs +++ b/crates/remote/src/fs_digest.rs @@ -3,7 +3,7 @@ use bazel_remote_apis::build::bazel::remote::execution::v2::{ Digest, NodeProperties, OutputDirectory, OutputFile, OutputSymlink, }; -use bazel_remote_apis::google::protobuf::{Timestamp, UInt32Value}; +use bazel_remote_apis::google::protobuf::Timestamp; use chrono::NaiveDateTime; use moon_common::path::{PathExt, WorkspaceRelativePathBuf}; use sha2::{Digest as Sha256Digest, Sha256}; @@ -14,7 +14,6 @@ use std::{ fs::Metadata, io, path::{Path, PathBuf}, time::{Duration, SystemTime, UNIX_EPOCH}, }; -use tracing::instrument; pub struct Blob { pub bytes: Vec<u8>, @@ -77,6 +76,7 @@ pub fn compute_node_properties(metadata: &Metadata) -> NodeProperties { #[cfg(unix)] { + use bazel_remote_apis::google::protobuf::UInt32Value; use std::os::unix::fs::PermissionsExt; props.unix_mode = Some(UInt32Value { @@ -157,20 +157,6 @@ impl OutputDigests { } } -#[instrument] -pub fn compute_digests_for_outputs( - paths: Vec<WorkspaceRelativePathBuf>, - workspace_root: &Path, -) -> miette::Result<OutputDigests> { - let mut result = OutputDigests::default(); - - for path in paths { - result.insert_relative_path(path, workspace_root)?; - } - - Ok(result) -} - fn apply_node_properties(path: &Path, props: &NodeProperties) -> miette::Result<()> { if let Some(mtime) = &props.mtime { let modified = Duration::new(mtime.seconds as u64, mtime.nanos as u32); diff --git a/crates/remote/src/grpc_remote_client.rs b/crates/remote/src/grpc_remote_client.rs index 456855c9ec9..e3d8bc32fa2 100644 --- a/crates/remote/src/grpc_remote_client.rs +++ b/crates/remote/src/grpc_remote_client.rs @@ -7,40 +7,95 @@ use bazel_remote_apis::build::bazel::remote::execution::v2::{ action_cache_client::ActionCacheClient, batch_update_blobs_request, capabilities_client::CapabilitiesClient, content_addressable_storage_client::ContentAddressableStorageClient, digest_function, - ActionResult, BatchReadBlobsRequest, BatchUpdateBlobsRequest, Digest, GetActionResultRequest, -
GetCapabilitiesRequest, ServerCapabilities, UpdateActionResultRequest, + ActionResult, BatchReadBlobsRequest, BatchUpdateBlobsRequest, Digest, FindMissingBlobsRequest, + GetActionResultRequest, GetCapabilitiesRequest, ServerCapabilities, UpdateActionResultRequest, }; +use miette::IntoDiagnostic; use moon_common::color; -use moon_config::{RemoteCompression, RemoteConfig}; -use std::{error::Error, path::Path}; +use moon_config::RemoteConfig; +use std::{env, path::Path, str::FromStr}; use tonic::{ + metadata::{KeyAndValueRef, MetadataKey, MetadataMap, MetadataValue}, transport::{Channel, Endpoint}, - Code, + Code, Request, Status, }; use tracing::{trace, warn}; fn map_transport_error(error: tonic::transport::Error) -> RemoteError { + // dbg!(&error); RemoteError::GrpcConnectFailed { error: Box::new(error), } } fn map_status_error(error: tonic::Status) -> RemoteError { - match error.source() { - Some(src) => RemoteError::GrpcCallFailedViaSource { - error: src.to_string(), - }, - None => RemoteError::GrpcCallFailed { - error: Box::new(error), - }, + // dbg!(&error); + RemoteError::GrpcCallFailed { + error: Box::new(error), } } #[derive(Default)] pub struct GrpcRemoteClient { channel: Option<Channel>, - compression: RemoteCompression, - instance_name: String, + config: RemoteConfig, + headers: MetadataMap, +} + +impl GrpcRemoteClient { + fn extract_headers(&mut self) -> miette::Result<bool> { + let mut enabled = true; + + if let Some(auth) = &self.config.auth { + for (key, value) in &auth.headers { + self.headers.insert( + MetadataKey::from_str(key).into_diagnostic()?, + MetadataValue::from_str(value).into_diagnostic()?, + ); + } + + if let Some(token_name) = &auth.token { + let token = env::var(token_name).unwrap_or_default(); + + if token.is_empty() { + enabled = false; + + warn!( + "Auth token {} does not exist, unable to authorize for remote service", + color::property(token_name) + ); + } else { + self.headers.insert( + MetadataKey::from_str("Authorization").into_diagnostic()?, + MetadataValue::from_str(&format!("Bearer {token}")).into_diagnostic()?, + ); + } + } + } + + Ok(enabled) + } + + fn inject_auth_headers(&self, mut req: Request<()>) -> Result<Request<()>, Status> { + if self.headers.is_empty() { + return Ok(req); + } + + let headers = req.metadata_mut(); + + for entry in self.headers.iter() { + match entry { + KeyAndValueRef::Ascii(key, value) => { + headers.insert(key.clone(), value.clone()); + } + KeyAndValueRef::Binary(key, value) => { + headers.insert_bin(key.clone(), value.clone()); + } + }; + } + + Ok(req) + } }
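For reference, the bearer flow in `extract_headers` boils down to: read the env var named by `unstable_remote.auth.token`, and if it is non-empty, attach `authorization: Bearer <value>` metadata to every request. A minimal standalone sketch of that logic, using the same tonic APIs as this diff (the function name `build_bearer_metadata` is hypothetical):

```rust
use std::{env, str::FromStr};
use tonic::metadata::{MetadataKey, MetadataMap, MetadataValue};

// Builds the metadata that would be injected into each gRPC request.
// Returns None when the env var is missing or empty, mirroring how the
// client above disables itself instead of sending an empty token.
fn build_bearer_metadata(token_var: &str) -> Option<MetadataMap> {
    let token = env::var(token_var).unwrap_or_default();

    if token.is_empty() {
        return None;
    }

    let mut headers = MetadataMap::new();
    headers.insert(
        MetadataKey::from_str("authorization").ok()?,
        MetadataValue::from_str(&format!("Bearer {token}")).ok()?,
    );

    Some(headers)
}
```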
#[async_trait::async_trait] @@ -49,7 +104,7 @@ impl RemoteClient for GrpcRemoteClient { &mut self, config: &RemoteConfig, workspace_root: &Path, - ) -> miette::Result<()> { + ) -> miette::Result<bool> { let host = &config.host; trace!( @@ -60,12 +115,22 @@ "(with mTLS)" } else if config.tls.is_some() { "(with TLS)" + } else if config.is_bearer_auth() { + "(with auth)" } else { "(insecure)" } ); - let mut endpoint = Endpoint::from_shared(host.to_owned()) + // Although we use a grpc(s) protocol for the host, + // tonic only supports http(s), so change it + let url = if let Some(suffix) = host.strip_prefix("grpc") { + format!("http{suffix}") + } else { + host.to_owned() + }; + + let mut endpoint = Endpoint::from_shared(url) .map_err(map_transport_error)? .user_agent("moon") .map_err(map_transport_error)? @@ -79,6 +144,10 @@ endpoint = endpoint .tls_config(create_tls_config(tls, workspace_root)?) .map_err(map_transport_error)?; + } else if config.is_secure_protocol() { + endpoint = endpoint + .tls_config(create_native_tls_config()?) + .map_err(map_transport_error)?; } if config.is_localhost() { @@ -92,22 +161,32 @@ ); } - self.channel = Some(endpoint.connect().await.map_err(map_transport_error)?); - self.compression = config.cache.compression; - self.instance_name = config.cache.instance_name.clone(); + self.config = config.to_owned(); + let enabled = self.extract_headers()?; + + // We can't inject auth headers into this initial connection, + // so defer the connection until a client is used + if self.config.is_bearer_auth() { + self.channel = Some(endpoint.connect_lazy()); + } else { + self.channel = Some(endpoint.connect().await.map_err(map_transport_error)?); + } - Ok(()) + Ok(enabled) }
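The scheme rewrite above is worth calling out: moon configs use `grpc(s)://` hosts, but tonic's `Endpoint` only understands `http(s)://`, so the prefix is swapped before the channel is built. A minimal sketch of that mapping (the function name `to_tonic_url` is hypothetical):

```rust
// grpc:// becomes http://, grpcs:// becomes https://, and http(s)
// hosts pass through unchanged, matching connect_to_host above.
fn to_tonic_url(host: &str) -> String {
    match host.strip_prefix("grpc") {
        Some(suffix) => format!("http{suffix}"),
        None => host.to_owned(),
    }
}

fn main() {
    assert_eq!(to_tonic_url("grpcs://cache.depot.dev"), "https://cache.depot.dev");
    assert_eq!(to_tonic_url("grpc://0.0.0.0:9092"), "http://0.0.0.0:9092");
    assert_eq!(to_tonic_url("https://cache.depot.dev"), "https://cache.depot.dev");
}
```

Note that `grpcs` still implies TLS: when no explicit `tls`/`mtls` settings are given, `is_secure_protocol` triggers `create_native_tls_config`, so the rewritten `https` endpoint is still negotiated over TLS with native roots.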
// https://github.com/bazelbuild/remote-apis/blob/main/build/bazel/remote/execution/v2/remote_execution.proto#L452 async fn load_capabilities(&self) -> miette::Result<ServerCapabilities> { - let mut client = CapabilitiesClient::new(self.channel.clone().unwrap()); + let mut client = + CapabilitiesClient::with_interceptor(self.channel.clone().unwrap(), |req| { + self.inject_auth_headers(req) + }); trace!("Loading remote execution API capabilities from gRPC server"); let response = client .get_capabilities(GetCapabilitiesRequest { - instance_name: self.instance_name.clone(), + instance_name: self.config.cache.instance_name.clone(), }) .await .map_err(map_status_error)?; @@ -117,13 +196,16 @@ // https://github.com/bazelbuild/remote-apis/blob/main/build/bazel/remote/execution/v2/remote_execution.proto#L170 async fn get_action_result(&self, digest: &Digest) -> miette::Result<Option<ActionResult>> { - let mut client = ActionCacheClient::new(self.channel.clone().unwrap()); + let mut client = + ActionCacheClient::with_interceptor(self.channel.clone().unwrap(), |req| { + self.inject_auth_headers(req) + }); trace!(hash = &digest.hash, "Checking for a cached action result"); match client .get_action_result(GetActionResultRequest { - instance_name: self.instance_name.clone(), + instance_name: self.config.cache.instance_name.clone(), action_digest: Some(digest.to_owned()), inline_stderr: true, inline_stdout: true, @@ -150,8 +232,20 @@ if matches!(status.code(), Code::NotFound) { trace!(hash = &digest.hash, "Cache miss on action result"); + Ok(None) + } + // If we hit an out of range error, the payload is larger than the grpc + // limit, which will fail the entire pipeline. Instead of letting that + // happen, let's just do a cache miss instead... + else if matches!(status.code(), Code::OutOfRange) { + trace!( + hash = &digest.hash, + "Cache miss because the expected payload is too large" + ); + Ok(None) } else { + // dbg!("get_action_result", digest); Err(map_status_error(status).into()) } } @@ -164,7 +258,10 @@ digest: &Digest, result: ActionResult, ) -> miette::Result<Option<ActionResult>> { - let mut client = ActionCacheClient::new(self.channel.clone().unwrap()); + let mut client = + ActionCacheClient::with_interceptor(self.channel.clone().unwrap(), |req| { + self.inject_auth_headers(req) + }); trace!( hash = &digest.hash, @@ -177,7 +274,7 @@ match client .update_action_result(UpdateActionResultRequest { - instance_name: self.instance_name.clone(), + instance_name: self.config.cache.instance_name.clone(), action_digest: Some(digest.to_owned()), action_result: Some(result), digest_function: digest_function::Value::Sha256 as i32, @@ -210,31 +307,55 @@ Ok(None) } else { + // dbg!("update_action_result", digest); Err(map_status_error(status).into()) } } } } + // https://github.com/bazelbuild/remote-apis/blob/main/build/bazel/remote/execution/v2/remote_execution.proto#L351 + async fn find_missing_blobs(&self, blob_digests: Vec<Digest>) -> miette::Result<Vec<Digest>> { + let mut client = ContentAddressableStorageClient::with_interceptor( + self.channel.clone().unwrap(), + |req| self.inject_auth_headers(req), + ); + + match client + .find_missing_blobs(FindMissingBlobsRequest { + instance_name: self.config.cache.instance_name.clone(), + blob_digests, + digest_function: digest_function::Value::Sha256 as i32, + }) + .await + { + Ok(response) => Ok(response.into_inner().missing_blob_digests), + Err(status) => Err(map_status_error(status).into()), + } + } + // https://github.com/bazelbuild/remote-apis/blob/main/build/bazel/remote/execution/v2/remote_execution.proto#L403 async fn batch_read_blobs( &self, digest: &Digest, blob_digests: Vec<Digest>, ) -> miette::Result<Vec<Blob>> { - let mut client = ContentAddressableStorageClient::new(self.channel.clone().unwrap()); + let mut client = ContentAddressableStorageClient::with_interceptor( + self.channel.clone().unwrap(), + |req| self.inject_auth_headers(req), + ); trace!( hash = &digest.hash, - compression = self.compression.to_string(), + compression = self.config.cache.compression.to_string(), "Downloading {} output blobs", blob_digests.len() ); let response = match client .batch_read_blobs(BatchReadBlobsRequest { - acceptable_compressors: get_acceptable_compressors(self.compression), - instance_name: self.instance_name.clone(), + acceptable_compressors: get_acceptable_compressors(self.config.cache.compression), + instance_name: self.config.cache.instance_name.clone(), digests: blob_digests, digest_function: digest_function::Value::Sha256 as i32, }) @@ -250,6 +371,7 @@ Ok(vec![]) } else { + // dbg!("batch_read_blobs", digest); Err(map_status_error(status).into()) }; } @@ -260,11 +382,19 @@ for download in response.into_inner().responses { if let Some(status) = download.status { - if status.code != 0 { + let code = Code::from_i32(status.code); + + if !matches!(code, Code::Ok | Code::NotFound) { warn!( + hash = &digest.hash, details = ?status.details, + code = ?code, "Failed to download blob: {}", - status.message + if status.message.is_empty() { + code.to_string() + } else { + status.message + } ); } } @@ -272,7 +402,10 @@ impl RemoteClient for
GrpcRemoteClient { if let Some(digest) = download.digest { blobs.push(Blob { digest, - bytes: decompress_blob(self.compression, download.data)?, + bytes: decompress_blob( + get_compression_from_code(download.compressor), + download.data, + )?, }); } @@ -295,11 +428,14 @@ digest: &Digest, blobs: Vec<Blob>, ) -> miette::Result<Vec<Option<Digest>>> { - let mut client = ContentAddressableStorageClient::new(self.channel.clone().unwrap()); + let mut client = ContentAddressableStorageClient::with_interceptor( + self.channel.clone().unwrap(), + |req| self.inject_auth_headers(req), + ); trace!( hash = &digest.hash, - compression = self.compression.to_string(), + compression = self.config.cache.compression.to_string(), "Uploading {} output blobs", blobs.len() ); @@ -309,14 +445,14 @@ for blob in blobs { requests.push(batch_update_blobs_request::Request { digest: Some(blob.digest), - data: compress_blob(self.compression, blob.bytes)?, - compressor: get_compressor(self.compression), + data: compress_blob(self.config.cache.compression, blob.bytes)?, + compressor: get_compressor(self.config.cache.compression), }); } let response = match client .batch_update_blobs(BatchUpdateBlobsRequest { - instance_name: self.instance_name.clone(), + instance_name: self.config.cache.instance_name.clone(), requests, digest_function: digest_function::Value::Sha256 as i32, }) @@ -342,6 +478,7 @@ Ok(vec![]) } else { + // dbg!("batch_update_blobs", digest); Err(map_status_error(status).into()) }; } @@ -352,11 +489,19 @@ for upload in response.into_inner().responses { if let Some(status) = upload.status { - if status.code != 0 { + let code = Code::from_i32(status.code); + + if !matches!(code, Code::Ok) { warn!( + hash = &digest.hash, details = ?status.details, + code = ?code, "Failed to upload blob: {}", - status.message + if status.message.is_empty() { + code.to_string() + } else { + status.message + } ); } } diff --git a/crates/remote/src/grpc_tls.rs b/crates/remote/src/grpc_tls.rs index b0c04437018..e6ac31db790 100644 --- a/crates/remote/src/grpc_tls.rs +++ b/crates/remote/src/grpc_tls.rs @@ -16,6 +16,10 @@ use tracing::trace; // - client `*.key` file // - domain name +pub fn create_native_tls_config() -> miette::Result<ClientTlsConfig> { + Ok(ClientTlsConfig::new().with_enabled_roots()) +} + // https://github.com/hyperium/tonic/blob/master/examples/src/tls/client.rs pub fn create_tls_config( config: &RemoteTlsConfig, diff --git a/crates/remote/src/lib.rs b/crates/remote/src/lib.rs index 7042471680e..4afce306cd2 100644 --- a/crates/remote/src/lib.rs +++ b/crates/remote/src/lib.rs @@ -1,3 +1,4 @@ +mod action_state; mod compression; mod fs_digest; mod grpc_remote_client; @@ -8,6 +9,7 @@ mod remote_client; mod remote_error; mod remote_service; +pub use action_state::*; pub use bazel_remote_apis::build::bazel::remote::execution::v2::Digest; pub use fs_digest::*; pub use remote_error::*; diff --git a/crates/remote/src/remote_client.rs b/crates/remote/src/remote_client.rs index c83a9fbbd55..020011cf2b8 100644 --- a/crates/remote/src/remote_client.rs +++ b/crates/remote/src/remote_client.rs @@ -11,7 +11,7 @@ pub trait RemoteClient: Send + Sync { &mut self, config: &RemoteConfig, workspace_root: &Path, - ) -> miette::Result<()>; + ) -> miette::Result<bool>; async fn load_capabilities(&self) -> miette::Result<ServerCapabilities>; @@ -23,6 +23,8 @@ result: ActionResult, ) -> miette::Result<Option<ActionResult>>; + async
fn find_missing_blobs(&self, blob_digests: Vec<Digest>) -> miette::Result<Vec<Digest>>; + async fn batch_read_blobs( &self, digest: &Digest, diff --git a/crates/remote/src/remote_error.rs b/crates/remote/src/remote_error.rs index 51217b3fbe5..aa83dfa6d09 100644 --- a/crates/remote/src/remote_error.rs +++ b/crates/remote/src/remote_error.rs @@ -5,15 +5,8 @@ use thiserror::Error; #[derive(Error, Debug, Diagnostic)] pub enum RemoteError { #[diagnostic(code(remote::grpc::call_failed))] - #[error("Failed to make gRPC call.")] - GrpcCallFailed { - #[source] - error: Box<tonic::Status>, - }, - - #[diagnostic(code(remote::grpc::call_failed))] - #[error("Failed to make gRPC call: {error}")] - GrpcCallFailedViaSource { error: String }, + #[error("Failed to make gRPC call.\n{}: {}", .error.code(), .error.message())] + GrpcCallFailed { error: Box<tonic::Status> }, #[diagnostic(code(remote::grpc::connect_failed))] #[error("Failed to connect to gRPC host.")] diff --git a/crates/remote/src/remote_service.rs b/crates/remote/src/remote_service.rs index 02e9e7b6b47..f8f2d47af69 100644 --- a/crates/remote/src/remote_service.rs +++ b/crates/remote/src/remote_service.rs @@ -2,15 +2,14 @@ use crate::compression::*; use crate::fs_digest::*; use crate::grpc_remote_client::GrpcRemoteClient; // use crate::http_remote_client::HttpRemoteClient; +use crate::action_state::ActionState; use crate::remote_client::RemoteClient; -use crate::RemoteError; use bazel_remote_apis::build::bazel::remote::execution::v2::{ - digest_function, ActionResult, Digest, ExecutedActionMetadata, ServerCapabilities, + digest_function, ActionResult, Digest, ServerCapabilities, }; use miette::IntoDiagnostic; -use moon_action::Operation; use moon_common::{color, is_ci}; -use moon_config::RemoteConfig; +use moon_config::{RemoteApi, RemoteCompression, RemoteConfig}; use rustc_hash::FxHashMap; use std::collections::BTreeMap; use std::path::{Path, PathBuf}; @@ -26,7 +25,6 @@ pub struct RemoteService { pub config: RemoteConfig, pub workspace_root: PathBuf, - action_results: scc::HashMap<String, ActionResult>, cache_enabled: bool, capabilities: ServerCapabilities, client: Arc<Box<dyn RemoteClient>>, @@ -38,6 +36,10 @@ impl RemoteService { INSTANCE.get().cloned() } + pub fn is_enabled() -> bool { + INSTANCE.get().is_some_and(|remote| remote.cache_enabled) + } + #[instrument] pub async fn connect(config: &RemoteConfig, workspace_root: &Path) -> miette::Result<()> { if is_ci() && config.is_localhost() { @@ -55,38 +57,35 @@ ); info!("Please report any issues to GitHub or Discord"); - let mut client: Box<dyn RemoteClient> = - if config.host.starts_with("http://") || config.host.starts_with("https://") { - // Box::new(HttpRemoteClient::default()) - return Err(RemoteError::NoHttpClient.into()); - } else if config.host.starts_with("grpc://") || config.host.starts_with("grpcs://") { - Box::new(GrpcRemoteClient::default()) - } else { - return Err(RemoteError::UnknownHostProtocol.into()); - }; - - client.connect_to_host(config, workspace_root).await?; + let mut client: Box<dyn RemoteClient> = match config.api { + RemoteApi::Grpc => Box::new(GrpcRemoteClient::default()), + }; let mut instance = Self { - action_results: scc::HashMap::default(), - capabilities: client.load_capabilities().await?, - cache_enabled: false, + cache_enabled: client.connect_to_host(config, workspace_root).await?, + capabilities: ServerCapabilities::default(), client: Arc::new(client), config: config.to_owned(), upload_requests: Arc::new(RwLock::new(vec![])), workspace_root: workspace_root.to_owned(), }; - instance.validate_capabilities()?; + instance.validate_capabilities().await?; let _ =
INSTANCE.set(Arc::new(instance)); Ok(()) } - pub fn validate_capabilities(&mut self) -> miette::Result<()> { + pub async fn validate_capabilities(&mut self) -> miette::Result<()> { let host = &self.config.host; - let mut enabled = true; + let mut enabled = self.cache_enabled; + + if !enabled { + return Ok(()); + } + + self.capabilities = self.client.load_capabilities().await?; if let Some(cap) = &self.capabilities.cache_capabilities { let sha256_fn = digest_function::Value::Sha256 as i32; @@ -100,27 +99,32 @@ ); } - let compressor = get_compressor(self.config.cache.compression); + let compression = self.config.cache.compression; + let compressor = get_compressor(compression); - if !cap.supported_compressors.contains(&compressor) { + if compression != RemoteCompression::None + && !cap.supported_compressors.contains(&compressor) + { enabled = false; warn!( host, "Remote service does not support {} compression, but it has been configured and enabled through the {} setting", - compressor, - color::property("remote.cache.compression"), + compression, + color::property("unstable_remote.cache.compression"), ); } - if !cap.supported_batch_update_compressors.contains(&compressor) { + if compression != RemoteCompression::None + && !cap.supported_batch_update_compressors.contains(&compressor) + { enabled = false; warn!( host, "Remote service does not support {} compression for batching, but it has been configured and enabled through the {} setting", - compressor, - color::property("remote.cache.compression"), + compression, + color::property("unstable_remote.cache.compression"), ); } @@ -151,7 +155,8 @@ } pub fn get_max_batch_size(&self) -> i64 { - self.capabilities + let max = self + .capabilities .cache_capabilities .as_ref() .and_then(|cap| { Some(cap.max_batch_total_size_bytes) } }) - // grpc limit: 4mb - buffer - .unwrap_or(4194304 - (1024 * 10)) + // grpc limit: 4mb + .unwrap_or(4194304); + + // Subtract a chunk from the max size, because when down/uploading blobs, + // we need to account for the non-blob data in the request/response, like the + // compression level, digest strings, etc. All of these "add up" and can + // bump the total body size larger than the actual limit. Is there a better + // way to handle this? Probably... + max - (1024 * 10) } - #[instrument(skip(self))] - pub async fn is_operation_cached(&self, digest: &Digest) -> miette::Result<bool> { + #[instrument(skip(self, state))] + pub async fn is_action_cached( + &self, + state: &ActionState<'_>, + ) -> miette::Result<Option<ActionResult>> { if !self.cache_enabled { - return Ok(false); - } - - if self.action_results.contains_async(&digest.hash).await { - return Ok(true); - } - - if let Some(result) = self.client.get_action_result(digest).await?
{ - let _ = self - .action_results - .insert_async(digest.hash.clone(), result) - .await; - - return Ok(true); + return Ok(None); } - Ok(false) + self.client.get_action_result(&state.digest).await } - #[instrument(skip(self, operation))] - pub async fn save_operation( - &self, - digest: &Digest, - operation: &Operation, - ) -> miette::Result<()> { - if !self.cache_enabled || operation.has_failed() { + #[instrument(skip(self, state))] + pub async fn save_action(&self, state: &mut ActionState<'_>) -> miette::Result<()> { + if !self.cache_enabled { return Ok(()); } - let operation_label = operation.label().to_owned(); - - debug!( - hash = &digest.hash, - "Caching {} operation", - color::muted_light(&operation_label) - ); - - let result = self.create_action_result_from_operation(operation, None)?; - let digest = digest.to_owned(); - let client = Arc::clone(&self.client); - - self.upload_requests - .write() - .await - .push(tokio::spawn(async move { - if let Err(error) = client.update_action_result(&digest, result).await { - warn!( - hash = &digest.hash, - "Failed to cache {} operation: {}", - color::muted_light(operation_label), - color::muted_light(error.to_string()), - ); - } - })); + let missing = self + .client + .find_missing_blobs(vec![state.digest.clone()]) + .await?; + + if missing.contains(&state.digest) { + // Create on demand when needed, instead of always + state.create_action_from_task(); + + self.client + .batch_update_blobs( + &state.digest, + vec![Blob { + bytes: state.get_command_as_bytes()?, + digest: state.digest.clone(), + }], + ) + .await?; + } Ok(()) } - #[instrument(skip(self, operation, outputs))] - pub async fn save_operation_with_outputs( - &self, - digest: &Digest, - operation: &Operation, - mut outputs: OutputDigests, - ) -> miette::Result<()> { - if !self.cache_enabled || operation.has_failed() { + #[instrument(skip(self, state))] + pub async fn save_action_result(&self, state: &mut ActionState<'_>) -> miette::Result<()> { + if !self.cache_enabled { return Ok(()); } - let operation_label = operation.label().to_owned(); - - debug!( - hash = &digest.hash, - "Caching {} operation with outputs", - color::muted_light(&operation_label) - ); - - let mut result = self.create_action_result_from_operation(operation, Some(&mut outputs))?; - result.output_files = outputs.files; - result.output_symlinks = outputs.symlinks; - result.output_directories = outputs.dirs; + let Some((mut result, blobs)) = state.prepare_for_upload() else { + return Ok(()); + }; - let digest = digest.to_owned(); let client = Arc::clone(&self.client); + let digest = state.digest.clone(); let max_size = self.get_max_batch_size(); self.upload_requests .write() .await .push(tokio::spawn(async move { - if !outputs.blobs.is_empty() { + if !blobs.is_empty() { if let Some(metadata) = &mut result.execution_metadata { metadata.output_upload_start_timestamp = create_timestamp(SystemTime::now()); @@ -267,7 +245,7 @@ impl RemoteService { let upload_result = batch_upload_blobs( client.clone(), digest.clone(), - outputs.blobs, + blobs, max_size as usize, ) .await; @@ -285,8 +263,7 @@ impl RemoteService { if let Err(error) = client.update_action_result(&digest, result).await { warn!( hash = &digest.hash, - "Failed to cache {} operation: {}", - color::muted_light(operation_label), + "Failed to cache action result: {}", color::muted_light(error.to_string()), ); } @@ -295,65 +272,65 @@ impl RemoteService { Ok(()) } - #[instrument(skip(self, operation))] - pub async fn restore_operation( - &self, - digest: &Digest, - 
operation: &mut Operation, - ) -> miette::Result<()> { + #[instrument(skip(self, state))] + pub async fn restore_action_result(&self, state: &mut ActionState<'_>) -> miette::Result<()> { if !self.cache_enabled { return Ok(()); } - let Some(result) = self.action_results.get_async(&digest.hash).await else { + let Some(result) = &mut state.action_result else { return Ok(()); }; - let operation_label = operation.label().to_owned(); - let has_outputs = !result.output_files.is_empty() - || !result.output_symlinks.is_empty() - || !result.output_directories.is_empty(); - - if has_outputs { - debug!( - hash = &digest.hash, - "Restoring {} operation with outputs", - color::muted_light(&operation_label) - ); - } else { - debug!( - hash = &digest.hash, - "Restoring {} operation", - color::muted_light(&operation_label) - ); - } + batch_download_blobs( + self.client.clone(), + &state.digest, + result, + &self.workspace_root, + self.get_max_batch_size() as usize, + ) + .await?; - if let Some(output) = operation.get_output_mut() { - output.exit_code = Some(result.exit_code); + // The stderr/stdout blobs may not have been inlined, + // so we need to fetch them manually + let mut stdio_digests = vec![]; - if !result.stderr_raw.is_empty() { - output.set_stderr(String::from_utf8_lossy(&result.stderr_raw).into()); + if let Some(stderr_digest) = &result.stderr_digest { + if result.stderr_raw.is_empty() && stderr_digest.size_bytes > 0 { + stdio_digests.push(stderr_digest.to_owned()); } + } - if !result.stdout_raw.is_empty() { - output.set_stdout(String::from_utf8_lossy(&result.stdout_raw).into()); + if let Some(stdout_digest) = &result.stdout_digest { + if result.stdout_raw.is_empty() && stdout_digest.size_bytes > 0 { + stdio_digests.push(stdout_digest.to_owned()); } } - batch_download_blobs( - self.client.clone(), - digest, - &result, - &self.workspace_root, - self.get_max_batch_size() as usize, - ) - .await?; + if !stdio_digests.is_empty() { + for blob in self + .client + .batch_read_blobs(&state.digest, stdio_digests) + .await? 
{ + if result + .stderr_digest + .as_ref() + .is_some_and(|dig| dig == &blob.digest) + { + result.stderr_raw = blob.bytes; + continue; + } - debug!( - hash = &digest.hash, - "Restored {} operation", - color::muted_light(&operation_label) - ); + if result + .stdout_digest + .as_ref() + .is_some_and(|dig| dig == &blob.digest) + { + result.stdout_raw = blob.bytes; + } + } + } Ok(()) } @@ -368,46 +345,6 @@ impl RemoteService { let _ = future.await; } } - - fn create_action_result_from_operation( - &self, - operation: &Operation, - outputs: Option<&mut OutputDigests>, - ) -> miette::Result<ActionResult> { - let mut result = ActionResult { - execution_metadata: Some(ExecutedActionMetadata { - worker: "moon".into(), - execution_start_timestamp: create_timestamp_from_naive(operation.started_at), - execution_completed_timestamp: operation - .finished_at - .and_then(create_timestamp_from_naive), - ..Default::default() - }), - ..Default::default() - }; - - if let Some(exec) = operation.get_output() { - result.exit_code = exec.exit_code.unwrap_or_default(); - - if let Some(outputs) = outputs { - if let Some(stderr) = &exec.stderr { - let blob = Blob::new(stderr.as_bytes().to_owned()); - - result.stderr_digest = Some(blob.digest.clone()); - outputs.blobs.push(blob); - } - - if let Some(stdout) = &exec.stdout { - let blob = Blob::new(stdout.as_bytes().to_owned()); - - result.stdout_digest = Some(blob.digest.clone()); - outputs.blobs.push(blob); - } - } - } - - Ok(result) - } } async fn batch_upload_blobs( @@ -557,7 +494,7 @@ fn partition_into_groups( // Try and find a partition that this item can go into for (index, group) in &groups { - if group.size + item_size < max_size { + if group.size + item_size <= max_size { index_to_use = *index; break; } @@ -568,12 +505,12 @@ index_to_use = groups.len() as i32; } - let entry = groups.entry(index_to_use).or_insert_with(|| Partition { + let group = groups.entry(index_to_use).or_insert_with(|| Partition { items: vec![], size: 0, }); - entry.size += item_size; - entry.items.push(item); + group.size += item_size; + group.items.push(item); } groups }
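The `<` to `<=` change in `partition_into_groups` fixes an off-by-one: an item that exactly fills a group's remaining capacity was previously bumped into a new group. A simplified sketch of the greedy grouping (the function name `partition_sizes` is hypothetical, and a Vec of byte sizes stands in for the map of Partition structs used above):

```rust
// Greedily packs item sizes into groups no larger than max_size:
// an item joins the first group it fits in, otherwise opens a new one.
fn partition_sizes(sizes: &[usize], max_size: usize) -> Vec<Vec<usize>> {
    let mut groups: Vec<(usize, Vec<usize>)> = vec![];

    for &size in sizes {
        // `<=` lets an item exactly fill a group; `<` would not.
        let index = groups
            .iter()
            .position(|(total, _)| total + size <= max_size);

        match index {
            Some(i) => {
                groups[i].0 += size;
                groups[i].1.push(size);
            }
            None => groups.push((size, vec![size])),
        }
    }

    groups.into_iter().map(|(_, items)| items).collect()
}

fn main() {
    let mib = 1024 * 1024;
    let groups = partition_sizes(&[2 * mib, 2 * mib, mib], 4 * mib);

    // The second 2 MiB blob exactly fills the first 4 MiB group;
    // with the old `<` comparison it would have opened a new group.
    assert_eq!(groups, vec![vec![2 * mib, 2 * mib], vec![mib]]);
}
```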
diff --git a/crates/task-expander/tests/task_expander_test.rs b/crates/task-expander/tests/task_expander_test.rs index a5c55a08a69..76d3f6220ee 100644 --- a/crates/task-expander/tests/task_expander_test.rs +++ b/crates/task-expander/tests/task_expander_test.rs @@ -26,9 +26,7 @@ mod task_expander { task.input_files.insert("project/source/out".into()); let context = create_context(sandbox.path()); - let task = TaskExpander::new(&project, &context) - .expand(&mut task) - .unwrap(); + let task = TaskExpander::new(&project, &context).expand(&task).unwrap(); assert!(task.input_files.is_empty()); assert_eq!( @@ -48,9 +46,7 @@ task.input_globs.insert("project/source/out/**/*".into()); let context = create_context(sandbox.path()); - let task = TaskExpander::new(&project, &context) - .expand(&mut task) - .unwrap(); + let task = TaskExpander::new(&project, &context).expand(&task).unwrap(); assert!(task.input_globs.is_empty()); assert_eq!( @@ -72,9 +68,7 @@ task.inputs = vec![InputPath::ProjectFile("dir".into())]; let context = create_context(sandbox.path()); - let task = TaskExpander::new(&project, &context) - .expand(&mut task) - .unwrap(); + let task = TaskExpander::new(&project, &context).expand(&task).unwrap(); assert!(task.input_files.is_empty()); assert_eq!( diff --git a/crates/task-runner/src/output_archiver.rs b/crates/task-runner/src/output_archiver.rs index 2541cc0ee1f..3d500672963 100644 --- a/crates/task-runner/src/output_archiver.rs +++ b/crates/task-runner/src/output_archiver.rs @@ -1,10 +1,9 @@ use crate::task_runner_error::TaskRunnerError; -use moon_action::Operation; use moon_api::Moonbase; use moon_app_context::AppContext; use moon_common::color; use moon_project::Project; -use moon_remote::{compute_digests_for_outputs, Digest, RemoteService}; +use moon_remote::{ActionState, RemoteService}; use moon_task::{TargetError, TargetScope, Task}; use starbase_archive::tar::TarPacker; use starbase_archive::Archiver; @@ -21,11 +20,11 @@ pub struct OutputArchiver<'task> { } impl OutputArchiver<'_> { - #[instrument(skip(self, operation))] + #[instrument(skip(self, remote_state))] pub async fn archive( &self, - digest: &Digest, - operation: Option<&Operation>, + hash: &str, + remote_state: Option<&mut ActionState<'_>>, ) -> miette::Result<Option<PathBuf>> { if !self.is_archivable()? { return Ok(None); } @@ -40,34 +39,31 @@ } // If so, create and pack the archive! - let archive_file = self.app.cache_engine.hash.get_archive_path(&digest.hash); + let archive_file = self.app.cache_engine.hash.get_archive_path(hash); if !archive_file.exists() { if self.app.cache_engine.is_writable() { debug!( task_target = self.task.target.as_str(), - hash = &digest.hash, - "Archiving task outputs from project" + hash, "Archiving task outputs from project" ); - self.create_local_archive(&digest.hash, &archive_file)?; + self.create_local_archive(hash, &archive_file)?; if archive_file.exists() { - self.upload_to_remote_storage(&digest.hash, &archive_file) - .await?; + self.upload_to_remote_storage(hash, &archive_file).await?; } } else { debug!( task_target = self.task.target.as_str(), - hash = &digest.hash, - "Cache is not writable, skipping output archiving" + hash, "Cache is not writable, skipping output archiving" ); } } // Then cache the result in the remote service - if let Some(operation) = operation { - self.upload_to_remote_service(digest, operation).await?; + if let Some(state) = remote_state { + self.upload_to_remote_service(state).await?; } Ok(if archive_file.exists() { @@ -208,21 +204,13 @@ Ok(()) } - #[instrument(skip(self, operation))] - async fn upload_to_remote_service( - &self, - digest: &Digest, - operation: &Operation, - ) -> miette::Result<()> { + #[instrument(skip(self, state))] + async fn upload_to_remote_service(&self, state: &mut ActionState<'_>) -> miette::Result<()> { if let Some(remote) = RemoteService::session() { - let output_digests = compute_digests_for_outputs( - self.task.get_output_files(&self.app.workspace_root, true)?, - &self.app.workspace_root, - )?; + state.compute_outputs(&self.app.workspace_root)?; - remote - .save_operation_with_outputs(digest, operation, output_digests) - .await?; + remote.save_action(state).await?; + remote.save_action_result(state).await?; } Ok(()) diff --git a/crates/task-runner/src/output_hydrater.rs b/crates/task-runner/src/output_hydrater.rs index 209fa7b71a1..e4a4123cf85 100644 --- a/crates/task-runner/src/output_hydrater.rs +++ b/crates/task-runner/src/output_hydrater.rs @@ -1,9 +1,7 @@ -use crate::run_state::read_stdlog_state_files; -use moon_action::Operation; use moon_api::Moonbase; use moon_app_context::AppContext; use moon_common::color; -use moon_remote::{Digest, RemoteService}; +use moon_remote::{ActionState, RemoteService}; use moon_task::Task; use starbase_archive::tar::TarUnpacker; use starbase_archive::Archiver; @@ -25,75 +23,59 @@ pub struct
OutputHydrater<'task> { } impl OutputHydrater<'_> { - #[instrument(skip(self, operation))] + #[instrument(skip(self, remote_state))] pub async fn hydrate( &self, from: HydrateFrom, - digest: &Digest, - operation: &mut Operation, + hash: &str, + remote_state: Option<&mut ActionState<'_>>, ) -> miette::Result<bool> { - let result = match from { + match from { // Only hydrate when the hash is different from the previous build, // as we can assume the outputs from the previous build still exist? HydrateFrom::PreviousOutput => Ok(true), // Based on the remote execution APIs - HydrateFrom::RemoteCache => self.download_from_remote_service(digest, operation).await, + HydrateFrom::RemoteCache => { + if let Some(state) = remote_state { + self.download_from_remote_service(state).await + } else { + Ok(false) + } + } // Otherwise write to local cache, then download archive from moonbase HydrateFrom::LocalCache | HydrateFrom::Moonbase => { - let archive_file = self.app.cache_engine.hash.get_archive_path(&digest.hash); + let archive_file = self.app.cache_engine.hash.get_archive_path(hash); let mut hydrated = false; if self.app.cache_engine.is_readable() { debug!( task_target = self.task.target.as_str(), - hash = &digest.hash, - "Hydrating cached outputs into project" + hash, "Hydrating cached outputs into project" ); // Attempt to download from remote cache to `.moon/outputs/` if !archive_file.exists() && matches!(from, HydrateFrom::Moonbase) { - self.download_from_remote_storage(&digest.hash, &archive_file) + self.download_from_remote_storage(hash, &archive_file) .await?; } // Otherwise hydrate the cached archive into the task's outputs if archive_file.exists() { - self.unpack_local_archive(&digest.hash, &archive_file)?; + self.unpack_local_archive(hash, &archive_file)?; hydrated = true } } else { debug!( task_target = self.task.target.as_str(), - hash = &digest.hash, - "Cache is not readable, skipping output hydration" + hash, "Cache is not readable, skipping output hydration" ); } Ok(hydrated) } - }; - - match result { - Ok(hydrated) => { - // If not from the remote cache, we need to read the - // locally cached stdout/stderr into the operation - // so that it can be replayed in the console - if !matches!(from, HydrateFrom::RemoteCache) { - read_stdlog_state_files( - self.app - .cache_engine - .state - .get_target_dir(&self.task.target), - operation, - )?; - } - - Ok(hydrated) - } - Err(error) => Err(error), } } @@ -150,14 +132,13 @@ Ok(()) } - #[instrument(skip(self, operation))] + #[instrument(skip(self, state))] async fn download_from_remote_service( &self, - digest: &Digest, - operation: &mut Operation, + state: &mut ActionState<'_>, ) -> miette::Result<bool> { if let Some(remote) = RemoteService::session() { - remote.restore_operation(digest, operation).await?; + remote.restore_action_result(state).await?; return Ok(true); } diff --git a/crates/task-runner/src/run_state.rs b/crates/task-runner/src/run_state.rs index 07d2abbc0a8..6cb7d9d83e2 100644 --- a/crates/task-runner/src/run_state.rs +++ b/crates/task-runner/src/run_state.rs @@ -1,7 +1,4 @@ -use moon_action::Operation; use moon_cache_item::cache_item; -use starbase_utils::fs; -use std::path::PathBuf; cache_item!( pub struct TaskRunCacheState { @@ -11,54 +8,3 @@ pub target: String, } ); - -pub fn read_stdlog_state_files( - state_dir: PathBuf, - operation: &mut Operation, -) -> miette::Result<()> { - if let Some(output) = operation.get_output_mut() { - let err_path = state_dir.join("stderr.log"); - let out_path =
state_dir.join("stdout.log"); - - if err_path.exists() { - output.set_stderr(fs::read_file(err_path)?); - } - - if out_path.exists() { - output.set_stdout(fs::read_file(out_path)?); - } - } - - Ok(()) -} - -pub fn write_stdlog_state_files(state_dir: PathBuf, operation: &Operation) -> miette::Result<()> { - let err_path = state_dir.join("stderr.log"); - let out_path = state_dir.join("stdout.log"); - - if let Some(output) = operation.get_output() { - fs::write_file( - err_path, - output - .stderr - .as_ref() - .map(|log| log.as_bytes()) - .unwrap_or_default(), - )?; - - fs::write_file( - out_path, - output - .stdout - .as_ref() - .map(|log| log.as_bytes()) - .unwrap_or_default(), - )?; - } else { - // Ensure logs from a previous run are removed - fs::remove_file(err_path)?; - fs::remove_file(out_path)?; - } - - Ok(()) -} diff --git a/crates/task-runner/src/task_runner.rs b/crates/task-runner/src/task_runner.rs index 373293137c8..b01779a4dc7 100644 --- a/crates/task-runner/src/task_runner.rs +++ b/crates/task-runner/src/task_runner.rs @@ -13,7 +13,7 @@ use moon_console::TaskReportItem; use moon_platform::PlatformManager; use moon_process::ProcessError; use moon_project::Project; -use moon_remote::{Digest, RemoteService}; +use moon_remote::{ActionState, Digest, RemoteService}; use moon_task::Task; use moon_task_hasher::TaskHasher; use moon_time::{is_stale, now_millis}; @@ -39,10 +39,11 @@ pub struct TaskRunner<'task> { hydrater: OutputHydrater<'task>, // Public for testing - pub action_digest: Digest, pub cache: CacheItem<TaskRunCacheState>, pub operations: OperationList, + pub remote_state: Option<ActionState<'task>>, pub report_item: TaskReportItem, + pub target_state: Option<TargetState>, } impl<'task> TaskRunner<'task> { @@ -68,17 +69,15 @@ Ok(Self { cache, archiver: OutputArchiver { app, project, task }, - action_digest: Digest { - hash: String::new(), - size_bytes: 0, - }, hydrater: OutputHydrater { app, task }, platform_manager: PlatformManager::read(), project, + remote_state: None, report_item: TaskReportItem { output_style: task.options.output_style, ..Default::default() }, + target_state: None, task, app, operations: OperationList::default(), @@ -96,7 +95,7 @@ ) -> miette::Result<Option<String>> { // If a dependency has failed or been skipped, we should skip this task if !self.is_dependencies_complete(context)? { - self.skip(context)?; + self.skip()?; return Ok(None); } @@ -110,10 +109,8 @@ let hash = self.generate_hash(context, node).await?; - self.report_item.hash = Some(hash.clone()); - // Exit early if this build has already been cached/hashed - if self.hydrate(context, &hash).await? { + if self.hydrate(&hash).await? { return Ok(Some(hash)); } @@ -152,6 +149,11 @@ match result { Ok(maybe_hash) => { + context.set_target_state( + &self.task.target, + self.target_state.take().unwrap_or(TargetState::Passthrough), + ); + self.report_item.hash = maybe_hash.clone(); self.app.console.reporter.on_task_completed( @@ -168,6 +170,11 @@ }) } Err(error) => { + context.set_target_state( + &self.task.target, + self.target_state.take().unwrap_or(TargetState::Failed), + ); + self.inject_failed_task_execution(Some(&error))?; self.app.console.reporter.on_task_completed( @@ -308,13 +315,15 @@ } // Check if the outputs have been cached in the remote service - if let Some(remote) = RemoteService::session() { - if remote.is_operation_cached(&self.action_digest).await?
{ + if let (Some(state), Some(remote)) = (&mut self.remote_state, RemoteService::session()) { + if let Some(result) = remote.is_action_cached(state).await? { debug!( task_target = self.task.target.as_str(), hash, "Cache hit in remote service, will attempt to download output blobs" ); + state.set_action_result(result); + return Ok(Some(HydrateFrom::RemoteCache)); } } @@ -436,7 +445,7 @@ hasher.hash_content(task_hasher.hash())?; - // Hash platform fields + // Hash toolchain fields trace!( task_target = self.task.target.as_str(), toolchains = ?self.task.toolchains.iter().map(|tc| tc.as_str()).collect::<Vec<_>>(), @@ -459,11 +468,17 @@ operation.finish(ActionStatus::Passed); self.operations.push(operation); - - self.action_digest = Digest { - hash: hash.clone(), - size_bytes: size_bytes as i64, - }; + self.report_item.hash = Some(hash.clone()); + + if RemoteService::is_enabled() { + self.remote_state = Some(ActionState::new( + Digest { + hash: hash.clone(), + size_bytes: size_bytes as i64, + }, + self.task, + )); + } debug!( task_target = self.task.target.as_str(), @@ -482,7 +497,7 @@ ) -> miette::Result<()> { // If the task is a no-operation, we should exit early if self.task.is_no_op() { - self.skip_no_op(context)?; + self.skip_no_op()?; return Ok(()); } @@ -530,15 +545,20 @@ executor.execute(context, &mut self.report_item).await? }; + // Persist the state locally and for the remote service if let Some(last_attempt) = result.attempts.get_last_execution() { self.persist_state(last_attempt)?; + + if let Some(state) = &mut self.remote_state { + state.create_action_result_from_operation(last_attempt)?; + } } // Extract the attempts from the result self.operations.merge(result.attempts); // Update the action state based on the result - context.set_target_state(&self.task.target, result.run_state); + self.target_state = Some(result.run_state); // If the execution as a whole failed, return the error. // We do this here instead of in `execute` so that we can @@ -564,8 +584,8 @@ Ok(()) } - #[instrument(skip_all)] - pub fn skip(&mut self, context: &ActionContext) -> miette::Result<()> { + #[instrument(skip(self))] + pub fn skip(&mut self) -> miette::Result<()> { debug!(task_target = self.task.target.as_str(), "Skipping task"); self.operations.push(Operation::new_finished( @@ -573,13 +593,13 @@ ActionStatus::Skipped, )); - context.set_target_state(&self.task.target, TargetState::Skipped); + self.target_state = Some(TargetState::Skipped); Ok(()) } - #[instrument(skip(self, context))] - pub fn skip_no_op(&mut self, context: &ActionContext) -> miette::Result<()> { + #[instrument(skip(self))] + pub fn skip_no_op(&mut self) -> miette::Result<()> { debug!( task_target = self.task.target.as_str(), "Skipping task as it's a no-operation" ); @@ -590,10 +610,7 @@ ActionStatus::Passed, )); - context.set_target_state( - &self.task.target, - TargetState::from_hash(self.report_item.hash.as_deref()), - ); + self.target_state = Some(TargetState::from_hash(self.report_item.hash.as_deref())); Ok(()) } @@ -609,7 +626,7 @@ let archived = match self .archiver - .archive(&self.action_digest, self.operations.get_last_execution()) + .archive(hash, self.remote_state.as_mut()) .await?
{ Some(archive_file) => { @@ -640,8 +657,8 @@ Ok(archived) } - #[instrument(skip(self, context))] - pub async fn hydrate(&mut self, context: &ActionContext, hash: &str) -> miette::Result<bool> { + #[instrument(skip(self))] + pub async fn hydrate(&mut self, hash: &str) -> miette::Result<bool> { let mut operation = Operation::output_hydration(); // Not cached @@ -667,7 +684,7 @@ if !self .hydrater - .hydrate(from, &self.action_digest, &mut operation) + .hydrate(from, hash, self.remote_state.as_mut()) .await? { debug!(task_target = self.task.target.as_str(), "Did not hydrate"); @@ -688,7 +705,45 @@ // Fill in these values since the command executor does not run! if let Some(output) = operation.get_output_mut() { output.command = Some(self.task.get_command_line()); - output.exit_code = Some(self.cache.data.exit_code); + + // If we received an action result from the remote cache, + // extract the logs from it + if let Some(result) = self + .remote_state + .as_ref() + .and_then(|state| state.action_result.as_ref()) + { + output.exit_code = Some(result.exit_code); + + if !result.stderr_raw.is_empty() { + output.set_stderr(String::from_utf8_lossy(&result.stderr_raw).into()); + } + + if !result.stdout_raw.is_empty() { + output.set_stdout(String::from_utf8_lossy(&result.stdout_raw).into()); + } + } + // If not from the remote cache, we need to read the locally + // cached stdout/stderr log files + else { + output.exit_code = Some(self.cache.data.exit_code); + + let state_dir = self + .app + .cache_engine + .state + .get_target_dir(&self.task.target); + let err_path = state_dir.join("stderr.log"); + let out_path = state_dir.join("stdout.log"); + + if err_path.exists() { + output.set_stderr(fs::read_file(err_path)?); + } + + if out_path.exists() { + output.set_stdout(fs::read_file(out_path)?); + } + } } // Then finalize the operation and target state @@ -699,9 +754,8 @@ self.persist_state(&operation)?; - context.set_target_state(&self.task.target, TargetState::Passed(hash.to_owned())); - self.operations.push(operation); + self.operations.push(operation); + self.target_state = Some(TargetState::Passed(hash.to_owned())); Ok(true) } @@ -748,16 +802,38 @@ } fn persist_state(&mut self, operation: &Operation) -> miette::Result<()> { - write_stdlog_state_files( - self.app - .cache_engine - .state - .get_target_dir(&self.task.target), - operation, - )?; + let state_dir = self + .app + .cache_engine + .state + .get_target_dir(&self.task.target); + let err_path = state_dir.join("stderr.log"); + let out_path = state_dir.join("stdout.log"); if let Some(output) = operation.get_output() { self.cache.data.exit_code = output.get_exit_code(); + + fs::write_file( + err_path, + output + .stderr + .as_ref() + .map(|log| log.as_bytes()) + .unwrap_or_default(), + )?; + + fs::write_file( + out_path, + output + .stdout + .as_ref() + .map(|log| log.as_bytes()) + .unwrap_or_default(), + )?; + } else { + // Ensure logs from a previous run are removed + fs::remove_file(err_path)?; + fs::remove_file(out_path)?; } Ok(()) } diff --git a/crates/task-runner/tests/output_archiver_test.rs b/crates/task-runner/tests/output_archiver_test.rs index 318d253e134..3f0286b26ab 100644 --- a/crates/task-runner/tests/output_archiver_test.rs +++ b/crates/task-runner/tests/output_archiver_test.rs @@ -1,7 +1,6 @@ mod utils; use moon_cache::CacheMode; -use moon_remote::Digest; use moon_task::Target; use starbase_archive::Archiver; use
@@ -9,13 +8,6 @@
 use std::fs;
 use std::sync::Arc;
 use utils::*;
 
-fn stub_digest() -> Digest {
-    Digest {
-        hash: "hash123".into(),
-        size_bytes: 0,
-    }
-}
-
 mod output_archiver {
     use super::*;
 
@@ -26,9 +18,8 @@ async fn does_nothing_if_no_outputs_in_task() {
         let container = TaskRunnerContainer::new("archive", "no-outputs").await;
         let archiver = container.create_archiver();
-        let digest = stub_digest();
 
-        assert!(archiver.archive(&digest, None).await.unwrap().is_none());
+        assert!(archiver.archive("hash123", None).await.unwrap().is_none());
     }
 
     #[tokio::test]
@@ -36,9 +27,8 @@ async fn errors_if_outputs_not_created() {
         let container = TaskRunnerContainer::new("archive", "file-outputs").await;
         let archiver = container.create_archiver();
-        let digest = stub_digest();
 
-        archiver.archive(&digest, None).await.unwrap();
+        archiver.archive("hash123", None).await.unwrap();
     }
 
     #[tokio::test]
@@ -47,9 +37,8 @@
         container.sandbox.create_file("project/file.txt", "");
 
         let archiver = container.create_archiver();
-        let digest = stub_digest();
 
-        assert!(archiver.archive(&digest, None).await.unwrap().is_some());
+        assert!(archiver.archive("hash123", None).await.unwrap().is_some());
 
         assert!(container
             .sandbox
             .path()
@@ -66,9 +55,8 @@
             .create_file(".moon/cache/outputs/hash123.tar.gz", "");
 
         let archiver = container.create_archiver();
-        let digest = stub_digest();
 
-        let file = archiver.archive(&digest, None).await.unwrap().unwrap();
+        let file = archiver.archive("hash123", None).await.unwrap().unwrap();
 
         assert_eq!(fs::metadata(file).unwrap().len(), 0);
     }
 
@@ -79,14 +67,13 @@
         container.sandbox.create_file("project/file.txt", "");
 
         let archiver = container.create_archiver();
-        let digest = stub_digest();
 
         container
             .app_context
             .cache_engine
             .force_mode(CacheMode::Off);
 
-        assert!(archiver.archive(&digest, None).await.unwrap().is_none());
+        assert!(archiver.archive("hash123", None).await.unwrap().is_none());
 
         env::remove_var("MOON_CACHE");
     }
 
@@ -97,14 +84,13 @@
         container.sandbox.create_file("project/file.txt", "");
 
         let archiver = container.create_archiver();
-        let digest = stub_digest();
 
         container
             .app_context
             .cache_engine
             .force_mode(CacheMode::Read);
 
-        assert!(archiver.archive(&digest, None).await.unwrap().is_none());
+        assert!(archiver.archive("hash123", None).await.unwrap().is_none());
 
         env::remove_var("MOON_CACHE");
     }
 
@@ -115,9 +101,8 @@
         container.sandbox.create_file("project/file.txt", "");
 
         let archiver = container.create_archiver();
-        let digest = stub_digest();
 
-        let file = archiver.archive(&digest, None).await.unwrap().unwrap();
+        let file = archiver.archive("hash123", None).await.unwrap().unwrap();
 
         let dir = container.sandbox.path().join("out");
 
         Archiver::new(&dir, &file).unpack_from_ext().unwrap();
@@ -133,9 +118,8 @@
         container.sandbox.create_file("project/three.txt", "");
 
         let archiver = container.create_archiver();
-        let digest = stub_digest();
 
-        let file = archiver.archive(&digest, None).await.unwrap().unwrap();
+        let file = archiver.archive("hash123", None).await.unwrap().unwrap();
 
         let dir = container.sandbox.path().join("out");
 
         Archiver::new(&dir, &file).unpack_from_ext().unwrap();
@@ -157,9 +141,8 @@
         container.sandbox.create_file("project/file.txt", "");
 
         let archiver = container.create_archiver();
-        let digest = stub_digest();
 
-        let file = archiver.archive(&digest, None).await.unwrap().unwrap();
+        let file = archiver.archive("hash123", None).await.unwrap().unwrap();
 
         let dir = container.sandbox.path().join("out");
 
         Archiver::new(&dir, &file).unpack_from_ext().unwrap();
@@ -181,9 +164,8 @@
         container.sandbox.create_file("project/c.txt", "");
 
         let archiver = container.create_archiver();
-        let digest = stub_digest();
 
-        let file = archiver.archive(&digest, None).await.unwrap().unwrap();
+        let file = archiver.archive("hash123", None).await.unwrap().unwrap();
 
         let dir = container.sandbox.path().join("out");
 
         Archiver::new(&dir, &file).unpack_from_ext().unwrap();
@@ -201,9 +183,8 @@
         container.sandbox.create_file("project/c.txt", "");
 
         let archiver = container.create_archiver();
-        let digest = stub_digest();
 
-        let file = archiver.archive(&digest, None).await.unwrap().unwrap();
+        let file = archiver.archive("hash123", None).await.unwrap().unwrap();
 
         let dir = container.sandbox.path().join("out");
 
         Archiver::new(&dir, &file).unpack_from_ext().unwrap();
@@ -219,9 +200,8 @@
         container.sandbox.create_file("project/file.txt", "");
 
         let archiver = container.create_archiver();
-        let digest = stub_digest();
 
-        let file = archiver.archive(&digest, None).await.unwrap().unwrap();
+        let file = archiver.archive("hash123", None).await.unwrap().unwrap();
 
         let dir = container.sandbox.path().join("out");
 
         Archiver::new(&dir, &file).unpack_from_ext().unwrap();
@@ -237,9 +217,8 @@
         container.sandbox.create_file("project/c.txt", "");
 
         let archiver = container.create_archiver();
-        let digest = stub_digest();
 
-        let file = archiver.archive(&digest, None).await.unwrap().unwrap();
+        let file = archiver.archive("hash123", None).await.unwrap().unwrap();
 
         let dir = container.sandbox.path().join("out");
 
         Archiver::new(&dir, &file).unpack_from_ext().unwrap();
@@ -255,9 +234,8 @@
         container.sandbox.create_file("project/dir/file.txt", "");
 
         let archiver = container.create_archiver();
-        let digest = stub_digest();
 
-        let file = archiver.archive(&digest, None).await.unwrap().unwrap();
+        let file = archiver.archive("hash123", None).await.unwrap().unwrap();
 
         let dir = container.sandbox.path().join("out");
 
         Archiver::new(&dir, &file).unpack_from_ext().unwrap();
@@ -273,9 +251,8 @@
         container.sandbox.create_file("project/c/file.txt", "");
 
         let archiver = container.create_archiver();
-        let digest = stub_digest();
 
-        let file = archiver.archive(&digest, None).await.unwrap().unwrap();
+        let file = archiver.archive("hash123", None).await.unwrap().unwrap();
 
         let dir = container.sandbox.path().join("out");
 
         Archiver::new(&dir, &file).unpack_from_ext().unwrap();
@@ -292,9 +269,8 @@
         container.sandbox.create_file("project/dir/file.txt", "");
 
         let archiver = container.create_archiver();
-        let digest = stub_digest();
 
-        let file = archiver.archive(&digest, None).await.unwrap().unwrap();
+        let file = archiver.archive("hash123", None).await.unwrap().unwrap();
 
         let dir = container.sandbox.path().join("out");
 
         Archiver::new(&dir, &file).unpack_from_ext().unwrap();
@@ -311,9 +287,8 @@
         container.sandbox.create_file("shared/z.txt", "");
 
         let archiver = container.create_archiver();
-        let digest = stub_digest();
 
-        let file = archiver.archive(&digest, None).await.unwrap().unwrap();
+        let file = archiver.archive("hash123", None).await.unwrap().unwrap();
 
         let dir = container.sandbox.path().join("out");
 
         Archiver::new(&dir, &file).unpack_from_ext().unwrap();
@@ -331,9 +306,8 @@
         container.sandbox.create_file("project/file.txt", "");
 
         let archiver = container.create_archiver();
-        let digest = stub_digest();
 
-        let file = archiver.archive(&digest, None).await.unwrap().unwrap();
+        let file = archiver.archive("hash123", None).await.unwrap().unwrap();
 
         let dir = container.sandbox.path().join("out");
 
         Archiver::new(&dir, &file).unpack_from_ext().unwrap();
diff --git a/crates/task-runner/tests/output_hydrater_test.rs b/crates/task-runner/tests/output_hydrater_test.rs
index 82b8fe2b1fa..80b3150f230 100644
--- a/crates/task-runner/tests/output_hydrater_test.rs
+++ b/crates/task-runner/tests/output_hydrater_test.rs
@@ -1,23 +1,10 @@
 mod utils;
 
-use moon_action::Operation;
 use moon_cache::CacheMode;
-use moon_remote::Digest;
 use moon_task_runner::output_hydrater::HydrateFrom;
 use std::env;
 use utils::*;
 
-fn stub_digest() -> Digest {
-    Digest {
-        hash: "hash123".into(),
-        size_bytes: 0,
-    }
-}
-
-fn stub_operation() -> Operation {
-    Operation::output_hydration()
-}
-
 mod output_hydrater {
     use super::*;
 
@@ -36,11 +23,9 @@ async fn does_nothing_if_from_prev_outputs() {
         let container = TaskRunnerContainer::new("archive", "file-outputs").await;
         let hydrater = container.create_hydrator();
-        let digest = stub_digest();
-        let mut operation = stub_operation();
 
         assert!(hydrater
-            .hydrate(HydrateFrom::PreviousOutput, &digest, &mut operation)
+            .hydrate(HydrateFrom::PreviousOutput, "hash123", None)
             .await
             .unwrap());
     }
 
@@ -53,8 +38,6 @@
             .create_file(".moon/cache/outputs/hash123.tar.gz", "");
 
         let hydrater = container.create_hydrator();
-        let digest = stub_digest();
-        let mut operation = stub_operation();
 
         container
             .app_context
@@ -62,7 +45,7 @@
             .force_mode(CacheMode::Off);
 
         assert!(!hydrater
-            .hydrate(HydrateFrom::LocalCache, &digest, &mut operation)
+            .hydrate(HydrateFrom::LocalCache, "hash123", None)
            .await
            .unwrap());
 
@@ -77,8 +60,6 @@
             .create_file(".moon/cache/outputs/hash123.tar.gz", "");
 
         let hydrater = container.create_hydrator();
-        let digest = stub_digest();
-        let mut operation = stub_operation();
 
         container
             .app_context
@@ -86,7 +67,7 @@
             .force_mode(CacheMode::Write);
 
         assert!(!hydrater
-            .hydrate(HydrateFrom::LocalCache, &digest, &mut operation)
+            .hydrate(HydrateFrom::LocalCache, "hash123", None)
             .await
             .unwrap());
 
@@ -101,11 +82,9 @@
         assert!(!container.sandbox.path().join("project/file.txt").exists());
 
         let hydrater = container.create_hydrator();
-        let digest = stub_digest();
-        let mut operation = stub_operation();
 
         hydrater
-            .hydrate(HydrateFrom::LocalCache, &digest, &mut operation)
+            .hydrate(HydrateFrom::LocalCache, "hash123", None)
             .await
             .unwrap();
 
@@ -124,11 +103,9 @@
             .exists());
 
         let hydrater = container.create_hydrator();
-        let digest = stub_digest();
-        let mut operation = stub_operation();
 
         hydrater
-            .hydrate(HydrateFrom::LocalCache, &digest, &mut operation)
+            .hydrate(HydrateFrom::LocalCache, "hash123", None)
             .await
             .unwrap();
diff --git a/crates/task-runner/tests/task_runner_test.rs b/crates/task-runner/tests/task_runner_test.rs
index b4aa77a9dcd..45453ba0652 100644
--- a/crates/task-runner/tests/task_runner_test.rs
+++ b/crates/task-runner/tests/task_runner_test.rs
@@ -3,7 +3,6 @@ mod utils;
 use moon_action::ActionStatus;
 use moon_action_context::*;
 use moon_cache::CacheMode;
-use moon_remote::Digest;
 use moon_task::Target;
 use moon_task_runner::output_hydrater::HydrateFrom;
 use moon_task_runner::TaskRunner;
@@ -11,13 +10,6 @@ use moon_time::now_millis;
 use std::env;
 use utils::*;
 
-fn stub_digest() -> Digest {
-    Digest {
-        hash: "hash123".into(),
-        size_bytes: 0,
-    }
-}
-
 mod task_runner {
     use super::*;
 
@@ -709,7 +701,6 @@ fn setup_exec_state(runner: &mut TaskRunner) {
         runner.report_item.hash = Some("hash123".into());
-        runner.action_digest = stub_digest();
     }
 
     #[tokio::test]
@@ -726,11 +717,7 @@
         runner.execute(&context, &node).await.unwrap();
 
         assert_eq!(
-            context
-                .target_states
-                .get(&runner.task.target)
-                .unwrap()
-                .get(),
+            runner.target_state.as_ref().unwrap(),
             &TargetState::Passed("hash123".into())
         );
     }
 
@@ -747,11 +734,7 @@
         runner.execute(&context, &node).await.unwrap();
 
         assert_eq!(
-            context
-                .target_states
-                .get(&runner.task.target)
-                .unwrap()
-                .get(),
+            runner.target_state.as_ref().unwrap(),
             &TargetState::Passthrough
         );
     }
 
@@ -770,14 +753,7 @@
         // Swallow panic so we can check operations
         let _ = runner.execute(&context, &node).await;
 
-        assert_eq!(
-            context
-                .target_states
-                .get(&runner.task.target)
-                .unwrap()
-                .get(),
-            &TargetState::Failed
-        );
+        assert_eq!(runner.target_state.as_ref().unwrap(), &TargetState::Failed);
     }
 
     #[tokio::test]
@@ -895,9 +871,8 @@ async fn creates_an_operation() {
         let container = TaskRunnerContainer::new("runner", "base").await;
         let mut runner = container.create_runner();
-        let context = ActionContext::default();
 
-        runner.skip(&context).unwrap();
+        runner.skip().unwrap();
 
         let operation = runner.operations.last().unwrap();
 
@@ -909,18 +884,10 @@ async fn sets_skipped_state() {
         let container = TaskRunnerContainer::new("runner", "base").await;
         let mut runner = container.create_runner();
-        let context = ActionContext::default();
 
-        runner.skip(&context).unwrap();
+        runner.skip().unwrap();
 
-        assert_eq!(
-            context
-                .target_states
-                .get(&runner.task.target)
-                .unwrap()
-                .get(),
-            &TargetState::Skipped
-        );
+        assert_eq!(runner.target_state.as_ref().unwrap(), &TargetState::Skipped);
     }
 }
 
@@ -931,9 +898,8 @@ async fn creates_an_operation() {
         let container = TaskRunnerContainer::new("runner", "base").await;
         let mut runner = container.create_runner();
-        let context = ActionContext::default();
 
-        runner.skip_no_op(&context).unwrap();
+        runner.skip_no_op().unwrap();
 
         let operation = runner.operations.last().unwrap();
 
@@ -945,16 +911,11 @@ async fn sets_passthrough_state() {
         let container = TaskRunnerContainer::new("runner", "base").await;
         let mut runner = container.create_runner();
-        let context = ActionContext::default();
 
-        runner.skip_no_op(&context).unwrap();
+        runner.skip_no_op().unwrap();
 
         assert_eq!(
-            context
-                .target_states
-                .get(&runner.task.target)
-                .unwrap()
-                .get(),
+            runner.target_state.as_ref().unwrap(),
             &TargetState::Passthrough
         );
     }
 
     async fn sets_completed_state() {
         let container = TaskRunnerContainer::new("runner", "base").await;
         let mut runner = container.create_runner();
-        let context = ActionContext::default();
 
         runner.report_item.hash = Some("hash123".into());
 
-        runner.skip_no_op(&context).unwrap();
+        runner.skip_no_op().unwrap();
 
         assert_eq!(
-            context
-                .target_states
-                .get(&runner.task.target)
-                .unwrap()
-                .get(),
+            runner.target_state.as_ref().unwrap(),
             &TargetState::Passed("hash123".into())
         );
     }
@@ -1046,8 +1002,7 @@
         let mut runner = container.create_runner();
 
-        let context = ActionContext::default();
"hash123").await.unwrap(); + let result = runner.hydrate("hash123").await.unwrap(); assert!(!result); @@ -1067,7 +1022,6 @@ mod task_runner { runner.cache.data.exit_code = 0; runner.cache.data.hash = "hash123".into(); - runner.action_digest = stub_digest(); } #[tokio::test] @@ -1077,8 +1031,7 @@ mod task_runner { setup_previous_state(&container, &mut runner); - let context = ActionContext::default(); - let result = runner.hydrate(&context, "hash123").await.unwrap(); + let result = runner.hydrate("hash123").await.unwrap(); assert!(result); @@ -1095,15 +1048,10 @@ mod task_runner { setup_previous_state(&container, &mut runner); - let context = ActionContext::default(); - runner.hydrate(&context, "hash123").await.unwrap(); + runner.hydrate("hash123").await.unwrap(); assert_eq!( - context - .target_states - .get(&runner.task.target) - .unwrap() - .get(), + runner.target_state.as_ref().unwrap(), &TargetState::Passed("hash123".into()) ); } @@ -1113,11 +1061,9 @@ mod task_runner { use super::*; use std::fs; - fn setup_local_state(container: &TaskRunnerContainer, runner: &mut TaskRunner) { + fn setup_local_state(container: &TaskRunnerContainer, _runner: &mut TaskRunner) { container.sandbox.enable_git(); container.pack_archive(); - - runner.action_digest = stub_digest(); } #[tokio::test] @@ -1127,8 +1073,7 @@ mod task_runner { setup_local_state(&container, &mut runner); - let context = ActionContext::default(); - let result = runner.hydrate(&context, "hash123").await.unwrap(); + let result = runner.hydrate("hash123").await.unwrap(); assert!(result); @@ -1145,15 +1090,10 @@ mod task_runner { setup_local_state(&container, &mut runner); - let context = ActionContext::default(); - runner.hydrate(&context, "hash123").await.unwrap(); + runner.hydrate("hash123").await.unwrap(); assert_eq!( - context - .target_states - .get(&runner.task.target) - .unwrap() - .get(), + runner.target_state.as_ref().unwrap(), &TargetState::Passed("hash123".into()) ); } @@ -1165,8 +1105,7 @@ mod task_runner { setup_local_state(&container, &mut runner); - let context = ActionContext::default(); - runner.hydrate(&context, "hash123").await.unwrap(); + runner.hydrate("hash123").await.unwrap(); let output_file = container.sandbox.path().join("project/file.txt"); @@ -1181,8 +1120,7 @@ mod task_runner { setup_local_state(&container, &mut runner); - let context = ActionContext::default(); - let result = runner.hydrate(&context, "hash123").await.unwrap(); + let result = runner.hydrate("hash123").await.unwrap(); assert!(result); diff --git a/packages/types/src/workspace-config.ts b/packages/types/src/workspace-config.ts index dcd10a50991..a47e9186c1f 100644 --- a/packages/types/src/workspace-config.ts +++ b/packages/types/src/workspace-config.ts @@ -240,6 +240,18 @@ export interface RunnerConfig { logRunningCommand: boolean; } +/** The API format of the remote service. */ +export type RemoteApi = 'grpc'; + +/** Configures basic HTTP authentication. */ +export interface RemoteAuthConfig { + /** HTTP headers to inject into every request. */ + headers: Record; + /** The name of an environment variable to use as a bearer token. */ + token: string | null; +} + +/** Supported blob compression levels. */ export type RemoteCompression = 'none' | 'zstd'; /** Configures the action cache (AC) and content addressable cache (CAS). */ @@ -303,6 +315,15 @@ export interface RemoteTlsConfig { /** Configures the remote service, powered by the Bazel Remote Execution API. */ export interface RemoteConfig { + /** + * The API format of the remote service. 
+
+#### `headers`
+
+A mapping of HTTP headers to include in all requests to the remote server. These headers are
+applied to all [API formats and protocols](#api), not just HTTP.
+
+```yaml title=".moon/workspace.yml" {2-4}
+unstable_remote:
+  auth:
+    headers:
+      'X-Custom-Header': 'value'
+```
+
+#### `token`
+
+The name of an environment variable from which to extract a token for
+[Bearer HTTP authorization](https://swagger.io/docs/specification/v3_0/authentication/bearer-authentication/).
+An `Authorization` HTTP header will be included in all requests to the remote server.
+
+If the environment variable does not exist, or the token is empty, remote caching will be disabled.
+
+```yaml title=".moon/workspace.yml" {2-4}
+unstable_remote:
+  auth:
+    token: 'ENV_VAR_NAME'
+```
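+
+In CI, the token is typically provided through a secret. As a minimal sketch for GitHub Actions
+(the step layout and secret name are illustrative, not something moon prescribes):
+
+```yaml
+# Hypothetical workflow step that exposes the token to moon
+- run: moon ci
+  env:
+    ENV_VAR_NAME: ${{ secrets.ENV_VAR_NAME }}
+```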
+
 ### `cache`
diff --git a/website/docs/guides/remote-cache.mdx b/website/docs/guides/remote-cache.mdx
index 5246f49c35f..68e49465815 100644
--- a/website/docs/guides/remote-cache.mdx
+++ b/website/docs/guides/remote-cache.mdx
@@ -97,6 +97,31 @@ unstable_remote:
   domain: 'your-host.com'
 ```
 
+## Cloud-hosted: Depot
+
+If you'd prefer not to host your own solution, you could use
+[Depot Cache](https://depot.dev/products/cache), a cloud-based caching service. To make use of
+Depot, follow these steps:
+
+- Create an account on [depot.dev](https://depot.dev)
+- Create an organization
+- Go to organization settings -> API tokens
+- Create a new API token
+- Add the token as a `DEPOT_TOKEN` environment variable to your moon pipelines
+
+Once these steps have been completed, you can enable remote caching in moon with the following
+configuration. If your Depot account has more than one organization, you'll need to set the
+`X-Depot-Org` header.
+
+```yaml title=".moon/workspace.yml"
+unstable_remote:
+  host: 'grpcs://cache.depot.dev'
+  auth:
+    token: 'DEPOT_TOKEN'
+    headers:
+      'X-Depot-Org': '<your-org-id>'
+```
+
 ## Cloud-hosted: moonbase
 
 This solution utilizes our [moonbase](/moonbase) service, which is a SaaS cloud offering hosted and
diff --git a/website/static/schemas/workspace.json b/website/static/schemas/workspace.json
index 6b034145f9c..58a61c5443a 100644
--- a/website/static/schemas/workspace.json
+++ b/website/static/schemas/workspace.json
@@ -513,6 +513,43 @@
       "description": "Strategies and protocols for locating plugins.",
       "type": "string"
     },
+    "RemoteApi": {
+      "description": "The API format of the remote service.",
+      "type": "string",
+      "enum": [
+        "grpc"
+      ]
+    },
+    "RemoteAuthConfig": {
+      "description": "Configures basic HTTP authentication.",
+      "type": "object",
+      "properties": {
+        "headers": {
+          "title": "headers",
+          "description": "HTTP headers to inject into every request.",
+          "type": "object",
+          "additionalProperties": {
+            "type": "string"
+          },
+          "propertyNames": {
+            "type": "string"
+          }
+        },
+        "token": {
+          "title": "token",
+          "description": "The name of an environment variable to use as a bearer token.",
+          "anyOf": [
+            {
+              "type": "string"
+            },
+            {
+              "type": "null"
+            }
+          ]
+        }
+      },
+      "additionalProperties": false
+    },
     "RemoteCacheConfig": {
       "description": "Configures the action cache (AC) and content addressable cache (CAS).",
       "type": "object",
@@ -537,6 +574,7 @@
       "additionalProperties": false
     },
     "RemoteCompression": {
+      "description": "Supported blob compression levels.",
       "type": "string",
       "enum": [
         "none",
@@ -547,6 +585,28 @@
       "description": "Configures the remote service, powered by the Bazel Remote Execution API.",
       "type": "object",
       "properties": {
+        "api": {
+          "title": "api",
+          "description": "The API format of the remote service.",
+          "default": "grpc",
+          "allOf": [
+            {
+              "$ref": "#/definitions/RemoteApi"
+            }
+          ]
+        },
+        "auth": {
+          "title": "auth",
+          "description": "Connect to the host using basic HTTP authentication.",
+          "anyOf": [
+            {
+              "$ref": "#/definitions/RemoteAuthConfig"
+            },
+            {
+              "type": "null"
+            }
+          ]
+        },
         "cache": {
           "title": "cache",
           "description": "Configures the action cache (AC) and content addressable cache (CAS).",