diff --git a/.moon/workspace.yml b/.moon/workspace.yml
index a463d2809ea..a4c8de7dbaa 100644
--- a/.moon/workspace.yml
+++ b/.moon/workspace.yml
@@ -35,17 +35,23 @@ docker:
     include:
       - '*.config.js'
       - '*.json'
-# unstable_remote:
-#   host: 'http://0.0.0.0:8080'
-#   # host: 'grpc://0.0.0.0:9092'
-#   cache:
-#     compression: 'zstd'
-#   mtls:
-#     caCert: 'crates/remote/tests/__fixtures__/certs-local/ca.pem'
-#     clientCert: 'crates/remote/tests/__fixtures__/certs-local/client.pem'
-#     clientKey: 'crates/remote/tests/__fixtures__/certs-local/client.key'
-#     domain: 'localhost'
-#   tls:
-#     # assumeHttp2: true
-#     cert: 'crates/remote/tests/__fixtures__/certs-local/ca.pem'
-#     # domain: 'localhost'
+unstable_remote:
+  # host: 'http://0.0.0.0:8080'
+  # host: 'grpcs://0.0.0.0:9092'
+  host: 'grpcs://cache.depot.dev'
+  auth:
+    token: 'DEPOT_TOKEN'
+    headers:
+      'X-Depot-Org': '1xtpjd084j'
+      'X-Depot-Project': '90xxfkst9n'
+  # cache:
+  #   compression: 'zstd'
+  # mtls:
+  #   caCert: 'crates/remote/tests/__fixtures__/certs-local/ca.pem'
+  #   clientCert: 'crates/remote/tests/__fixtures__/certs-local/client.pem'
+  #   clientKey: 'crates/remote/tests/__fixtures__/certs-local/client.key'
+  #   domain: 'localhost'
+  # tls:
+  #   # assumeHttp2: true
+  #   cert: 'crates/remote/tests/__fixtures__/certs-local/client.pem'
+  #   domain: 'localhost'
diff --git a/crates/config/src/workspace/remote_config.rs b/crates/config/src/workspace/remote_config.rs
index 2faf86cf228..b3fed94b615 100644
--- a/crates/config/src/workspace/remote_config.rs
+++ b/crates/config/src/workspace/remote_config.rs
@@ -1,4 +1,5 @@
 use crate::portable_path::FilePath;
+use rustc_hash::FxHashMap;
 use schematic::{derive_enum, validate, Config, ConfigEnum, ValidateError, ValidateResult};
 
 fn path_is_required(
@@ -14,6 +15,16 @@ fn path_is_required(
     Ok(())
 }
 
+/// Configures basic HTTP authentication.
+#[derive(Clone, Config, Debug)]
+pub struct RemoteAuthConfig {
+    /// HTTP headers to inject into every request.
+    pub headers: FxHashMap<String, String>,
+
+    /// The name of an environment variable to use as a bearer token.
+    pub token: Option<String>,
+}
+
 derive_enum!(
     #[derive(Copy, ConfigEnum, Default)]
     pub enum RemoteCompression {
@@ -81,6 +92,10 @@ pub struct RemoteMtlsConfig {
 /// Configures the remote service, powered by the Bazel Remote Execution API.
 #[derive(Clone, Config, Debug)]
 pub struct RemoteConfig {
+    /// Connect to the host using basic HTTP authentication.
+    #[setting(nested)]
+    pub auth: Option<RemoteAuthConfig>,
+
     /// Configures the action cache (AC) and content addressable cache (CAS).
     #[setting(nested)]
     pub cache: RemoteCacheConfig,
@@ -106,6 +121,6 @@ impl RemoteConfig {
     }
 
     pub fn is_secure(&self) -> bool {
-        self.tls.is_some() || self.mtls.is_some()
+        self.auth.is_some() || self.tls.is_some() || self.mtls.is_some()
     }
 }
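A note on the new `RemoteAuthConfig`: the `token` setting names an environment variable rather than holding the secret itself. The following is a minimal standalone sketch (not part of this diff; `resolve_bearer_token` is a hypothetical helper) of how that value is resolved into an `Authorization` header at runtime:

```rust
use std::env;

// Hypothetical helper: turn the configured env var name (e.g. "DEPOT_TOKEN")
// into a bearer header value, or None if the variable is unset or empty.
fn resolve_bearer_token(token_env_name: &str) -> Option<String> {
    match env::var(token_env_name) {
        Ok(token) if !token.is_empty() => Some(format!("Bearer {token}")),
        _ => None,
    }
}

fn main() {
    // With `token: 'DEPOT_TOKEN'` configured, the client reads $DEPOT_TOKEN.
    match resolve_bearer_token("DEPOT_TOKEN") {
        Some(value) => println!("Authorization: {value}"),
        None => eprintln!("DEPOT_TOKEN is not set; requests stay unauthenticated"),
    }
}
```

This mirrors the `env::var` lookup and empty-string check that `inject_auth_headers` performs in the client change that follows.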
diff --git a/crates/remote/src/grpc_remote_client.rs b/crates/remote/src/grpc_remote_client.rs
index 456855c9ec9..8191a2fcbc2 100644
--- a/crates/remote/src/grpc_remote_client.rs
+++ b/crates/remote/src/grpc_remote_client.rs
@@ -11,11 +11,12 @@ use bazel_remote_apis::build::bazel::remote::execution::v2::{
     GetCapabilitiesRequest, ServerCapabilities, UpdateActionResultRequest,
 };
 use moon_common::color;
-use moon_config::{RemoteCompression, RemoteConfig};
-use std::{error::Error, path::Path};
+use moon_config::RemoteConfig;
+use std::{env, error::Error, path::Path};
 use tonic::{
+    metadata::{MetadataKey, MetadataValue},
     transport::{Channel, Endpoint},
-    Code,
+    Code, Request, Status,
 };
 use tracing::{trace, warn};
 
@@ -26,6 +27,8 @@ fn map_transport_error(error: tonic::transport::Error) -> RemoteError {
 }
 
 fn map_status_error(error: tonic::Status) -> RemoteError {
+    dbg!(&error);
+
     match error.source() {
         Some(src) => RemoteError::GrpcCallFailedViaSource {
             error: src.to_string(),
@@ -39,8 +42,46 @@
 #[derive(Default)]
 pub struct GrpcRemoteClient {
     channel: Option<Channel>,
-    compression: RemoteCompression,
-    instance_name: String,
+    config: RemoteConfig,
+}
+
+impl GrpcRemoteClient {
+    fn inject_auth_headers(&self, mut req: Request<()>) -> Result<Request<()>, Status> {
+        if self.config.mtls.is_some() || self.config.tls.is_some() {
+            return Ok(req);
+        }
+
+        if let Some(auth) = &self.config.auth {
+            let headers = req.metadata_mut();
+
+            for (key, value) in &auth.headers {
+                headers.insert(
+                    MetadataKey::from_bytes(key.as_bytes()).unwrap(),
+                    MetadataValue::try_from(value).unwrap(),
+                );
+            }
+
+            if let Some(token_name) = &auth.token {
+                let token = env::var(token_name).unwrap_or_default();
+
+                if token.is_empty() {
+                    warn!(
+                        "Remote service auth token {} does not exist, unable to authorize",
+                        color::property(token_name)
+                    );
+                } else {
+                    headers.insert(
+                        "Authorization",
+                        MetadataValue::try_from(format!("Bearer {token}")).unwrap(),
+                    );
+                }
+            }
+        }
+
+        dbg!(&req);
+
+        Ok(req)
+    }
 }
 
 #[async_trait::async_trait]
@@ -60,12 +101,22 @@ impl RemoteClient for GrpcRemoteClient {
                 "(with mTLS)"
             } else if config.tls.is_some() {
                 "(with TLS)"
+            } else if config.auth.is_some() {
+                "(with auth)"
             } else {
                 "(insecure)"
            }
         );
 
-        let mut endpoint = Endpoint::from_shared(host.to_owned())
+        // Although we use a grpc(s) protocol for the host,
+        // tonic only supports http(s), so change it
+        let url = if let Some(suffix) = host.strip_prefix("grpc") {
+            format!("http{suffix}")
+        } else {
+            host.to_owned()
+        };
+
+        let mut endpoint = Endpoint::from_shared(url)
            .map_err(map_transport_error)?
            .user_agent("moon")
            .map_err(map_transport_error)?
@@ -92,22 +143,31 @@ impl RemoteClient for GrpcRemoteClient {
             );
         }
 
-        self.channel = Some(endpoint.connect().await.map_err(map_transport_error)?);
-        self.compression = config.cache.compression;
-        self.instance_name = config.cache.instance_name.clone();
+        self.config = config.to_owned();
+
+        // We can't inject auth headers into this initial connection,
+        // so defer the connection until a client is used
+        if self.config.auth.is_some() {
+            self.channel = Some(endpoint.connect_lazy());
+        } else {
+            self.channel = Some(endpoint.connect().await.map_err(map_transport_error)?);
+        }
 
         Ok(())
     }
 
     // https://github.com/bazelbuild/remote-apis/blob/main/build/bazel/remote/execution/v2/remote_execution.proto#L452
     async fn load_capabilities(&self) -> miette::Result<ServerCapabilities> {
-        let mut client = CapabilitiesClient::new(self.channel.clone().unwrap());
+        let mut client =
+            CapabilitiesClient::with_interceptor(self.channel.clone().unwrap(), |req| {
+                self.inject_auth_headers(req)
+            });
 
         trace!("Loading remote execution API capabilities from gRPC server");
 
         let response = client
             .get_capabilities(GetCapabilitiesRequest {
-                instance_name: self.instance_name.clone(),
+                instance_name: self.config.cache.instance_name.clone(),
             })
             .await
             .map_err(map_status_error)?;
@@ -117,13 +177,16 @@
 
     // https://github.com/bazelbuild/remote-apis/blob/main/build/bazel/remote/execution/v2/remote_execution.proto#L170
     async fn get_action_result(&self, digest: &Digest) -> miette::Result<Option<ActionResult>> {
-        let mut client = ActionCacheClient::new(self.channel.clone().unwrap());
+        let mut client =
+            ActionCacheClient::with_interceptor(self.channel.clone().unwrap(), |req| {
+                self.inject_auth_headers(req)
+            });
 
         trace!(hash = &digest.hash, "Checking for a cached action result");
 
         match client
             .get_action_result(GetActionResultRequest {
-                instance_name: self.instance_name.clone(),
+                instance_name: self.config.cache.instance_name.clone(),
                 action_digest: Some(digest.to_owned()),
                 inline_stderr: true,
                 inline_stdout: true,
@@ -164,7 +227,10 @@
         digest: &Digest,
         result: ActionResult,
     ) -> miette::Result<Option<ActionResult>> {
-        let mut client = ActionCacheClient::new(self.channel.clone().unwrap());
+        let mut client =
+            ActionCacheClient::with_interceptor(self.channel.clone().unwrap(), |req| {
+                self.inject_auth_headers(req)
+            });
 
         trace!(
             hash = &digest.hash,
@@ -177,7 +243,7 @@
 
         match client
             .update_action_result(UpdateActionResultRequest {
-                instance_name: self.instance_name.clone(),
+                instance_name: self.config.cache.instance_name.clone(),
                 action_digest: Some(digest.to_owned()),
                 action_result: Some(result),
                 digest_function: digest_function::Value::Sha256 as i32,
@@ -222,19 +288,22 @@
         digest: &Digest,
         blob_digests: Vec<Digest>,
     ) -> miette::Result<Vec<Blob>> {
-        let mut client = ContentAddressableStorageClient::new(self.channel.clone().unwrap());
+        let mut client = ContentAddressableStorageClient::with_interceptor(
+            self.channel.clone().unwrap(),
+            |req| self.inject_auth_headers(req),
+        );
 
         trace!(
             hash = &digest.hash,
-            compression = self.compression.to_string(),
+            compression = self.config.cache.compression.to_string(),
             "Downloading {} output blobs",
             blob_digests.len()
         );
 
         let response = match client
             .batch_read_blobs(BatchReadBlobsRequest {
-                acceptable_compressors: get_acceptable_compressors(self.compression),
-                instance_name: self.instance_name.clone(),
+                acceptable_compressors: get_acceptable_compressors(self.config.cache.compression),
+                instance_name: self.config.cache.instance_name.clone(),
                 digests: blob_digests,
                 digest_function: digest_function::Value::Sha256 as i32,
             })
             .await
@@ -272,7 +341,7 @@
             if let Some(digest) = download.digest {
                 blobs.push(Blob {
                     digest,
-                    bytes: decompress_blob(self.compression, download.data)?,
+                    bytes: decompress_blob(self.config.cache.compression, download.data)?,
                 });
             }
 
@@ -295,11 +364,14 @@
         digest: &Digest,
         blobs: Vec<Blob>,
     ) -> miette::Result<Vec<Option<Digest>>> {
-        let mut client = ContentAddressableStorageClient::new(self.channel.clone().unwrap());
+        let mut client = ContentAddressableStorageClient::with_interceptor(
+            self.channel.clone().unwrap(),
+            |req| self.inject_auth_headers(req),
+        );
 
         trace!(
             hash = &digest.hash,
-            compression = self.compression.to_string(),
+            compression = self.config.cache.compression.to_string(),
             "Uploading {} output blobs",
             blobs.len()
         );
@@ -309,14 +381,14 @@
         for blob in blobs {
             requests.push(batch_update_blobs_request::Request {
                 digest: Some(blob.digest),
-                data: compress_blob(self.compression, blob.bytes)?,
-                compressor: get_compressor(self.compression),
+                data: compress_blob(self.config.cache.compression, blob.bytes)?,
+                compressor: get_compressor(self.config.cache.compression),
             });
         }
 
         let response = match client
             .batch_update_blobs(BatchUpdateBlobsRequest {
-                instance_name: self.instance_name.clone(),
+                instance_name: self.config.cache.instance_name.clone(),
                 requests,
                 digest_function: digest_function::Value::Sha256 as i32,
             })
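Pulling the client-side changes together: the host scheme is rewritten from `grpc(s)` to `http(s)` because tonic only accepts HTTP URIs, the connection is deferred with `connect_lazy()` when auth is configured, and each generated client is wrapped so headers are injected per request. Below is a minimal standalone sketch under those assumptions; the host, header name, and `add_auth` function are placeholders, not moon APIs:

```rust
use tonic::{metadata::MetadataValue, transport::Endpoint, Request, Status};

// Placeholder interceptor with the Fn(Request<()>) -> Result<Request<()>, Status>
// shape that `with_interceptor` expects on tonic-generated clients.
fn add_auth(mut req: Request<()>) -> Result<Request<()>, Status> {
    // gRPC metadata keys are lowercase by convention.
    req.metadata_mut().insert(
        "x-example-org",
        MetadataValue::try_from("example-value")
            .map_err(|_| Status::invalid_argument("invalid header value"))?,
    );
    Ok(req)
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // tonic only understands http(s), so rewrite a grpcs:// host first.
    let host = "grpcs://cache.example.dev";
    let url = host
        .strip_prefix("grpc")
        .map(|suffix| format!("http{suffix}"))
        .unwrap_or_else(|| host.to_owned());

    // connect_lazy() defers the handshake until the first RPC, which is when
    // the interceptor gets a chance to attach the auth metadata.
    let channel = Endpoint::from_shared(url)?.connect_lazy();

    // A generated client would then be created as:
    //   let client = SomeClient::with_interceptor(channel, add_auth);
    let _ = (channel, add_auth);
    Ok(())
}
```

The clients in the diff above follow the same pattern, passing `inject_auth_headers` as the closure to `CapabilitiesClient`, `ActionCacheClient`, and `ContentAddressableStorageClient::with_interceptor`.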