new: Support Depot for remote caching. #1792

Open · wants to merge 2 commits into master
36 changes: 22 additions & 14 deletions .moon/workspace.yml
@@ -35,17 +35,25 @@ docker:
include:
- '*.config.js'
- '*.json'
# unstable_remote:
# host: 'http://0.0.0.0:8080'
# # host: 'grpc://0.0.0.0:9092'
# cache:
# compression: 'zstd'
# mtls:
# caCert: 'crates/remote/tests/__fixtures__/certs-local/ca.pem'
# clientCert: 'crates/remote/tests/__fixtures__/certs-local/client.pem'
# clientKey: 'crates/remote/tests/__fixtures__/certs-local/client.key'
# domain: 'localhost'
# tls:
# # assumeHttp2: true
# cert: 'crates/remote/tests/__fixtures__/certs-local/ca.pem'
# # domain: 'localhost'
unstable_remote:
# host: 'http://0.0.0.0:8080'
# host: 'grpcs://0.0.0.0:9092'
host: 'grpcs://cache.depot.dev'
auth:
token: 'DEPOT_TOKEN'
headers:
Authorization: 'Bearer depot_org_ece2365d877c7f2f437f46de5bf03bd54862e3ca7fde67f079264e971ecb0789'
'X-Depot-Org': '1xtpjd084j'
'X-Depot-Project': '90xxfkst9n'
# cache:
# compression: 'zstd'
# mtls:
# caCert: 'crates/remote/tests/__fixtures__/certs-local/ca.pem'
# clientCert: 'crates/remote/tests/__fixtures__/certs-local/client.pem'
# clientKey: 'crates/remote/tests/__fixtures__/certs-local/client.key'
# domain: 'localhost'
# tls:
# # assumeHttp2: true
# cert: 'crates/remote/tests/__fixtures__/certs-local/client.pem'
# domain: 'localhost'
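
The `auth` block above introduces two mechanisms: entries under `headers` are attached verbatim to every request, while `token` names an environment variable whose value is sent as an `Authorization: Bearer …` header at request time (see the Rust changes below). The following is a minimal sketch of how that block maps onto the new config fields, using plain serde/serde_yaml as a stand-in for moon's schematic-based loader; the struct and the org/project values are illustrative placeholders, not moon's real types or IDs.

```rust
use serde::Deserialize;
use std::collections::HashMap;

/// Simplified mirror of the new `RemoteAuthConfig` (not moon's real type).
#[derive(Debug, Deserialize)]
struct AuthSketch {
    /// Headers sent verbatim with every request.
    #[serde(default)]
    headers: HashMap<String, String>,
    /// Name of an environment variable whose value becomes `Authorization: Bearer <value>`.
    token: Option<String>,
}

fn main() {
    // Mirrors the shape of the `unstable_remote.auth` block above,
    // with placeholder Depot org/project IDs.
    let yaml = r#"
headers:
  'X-Depot-Org': 'example-org'
  'X-Depot-Project': 'example-project'
token: 'DEPOT_TOKEN'
"#;

    let auth: AuthSketch = serde_yaml::from_str(yaml).expect("valid YAML");

    // The env var named by `token` is resolved when requests are sent,
    // not at config-load time.
    assert_eq!(auth.token.as_deref(), Some("DEPOT_TOKEN"));
    println!("{:?}", auth.headers);
}
```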
21 changes: 20 additions & 1 deletion crates/config/src/workspace/remote_config.rs
@@ -1,4 +1,5 @@
use crate::portable_path::FilePath;
use rustc_hash::FxHashMap;
use schematic::{derive_enum, validate, Config, ConfigEnum, ValidateError, ValidateResult};

fn path_is_required<D, C>(
@@ -14,6 +15,16 @@ fn path_is_required<D, C>(
Ok(())
}

/// Configures basic HTTP authentication.
#[derive(Clone, Config, Debug)]
pub struct RemoteAuthConfig {
/// HTTP headers to inject into every request.
pub headers: FxHashMap<String, String>,

/// The name of an environment variable to use as a bearer token.
pub token: Option<String>,
}

derive_enum!(
#[derive(Copy, ConfigEnum, Default)]
pub enum RemoteCompression {
@@ -81,6 +92,10 @@ pub struct RemoteMtlsConfig {
/// Configures the remote service, powered by the Bazel Remote Execution API.
#[derive(Clone, Config, Debug)]
pub struct RemoteConfig {
/// Connect to the host using basic HTTP authentication.
#[setting(nested)]
pub auth: Option<RemoteAuthConfig>,

/// Configures the action cache (AC) and content addressable cache (CAS).
#[setting(nested)]
pub cache: RemoteCacheConfig,
@@ -101,11 +116,15 @@ pub struct RemoteConfig {
}

impl RemoteConfig {
pub fn is_bearer_auth(&self) -> bool {
self.auth.as_ref().is_some_and(|auth| auth.token.is_some())
}

pub fn is_localhost(&self) -> bool {
self.host.contains("localhost") || self.host.contains("0.0.0.0")
}

pub fn is_secure(&self) -> bool {
self.tls.is_some() || self.mtls.is_some()
self.is_bearer_auth() || self.tls.is_some() || self.mtls.is_some()
}
}
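
The new `is_bearer_auth()` short-circuits `is_secure()`, so a bearer token alone now counts as a secure setup even without TLS or mTLS. A standalone sketch of that decision logic, using simplified stand-in structs rather than moon's actual config types:

```rust
use std::collections::HashMap;

// Stand-ins for the real config types; only the fields relevant to the
// auth logic are mirrored here.
struct AuthSketch {
    headers: HashMap<String, String>,
    token: Option<String>, // name of an env var holding the bearer token
}

struct RemoteConfigSketch {
    auth: Option<AuthSketch>,
    tls: Option<()>,  // placeholder for the real TLS config
    mtls: Option<()>, // placeholder for the real mTLS config
}

impl RemoteConfigSketch {
    fn is_bearer_auth(&self) -> bool {
        self.auth.as_ref().is_some_and(|auth| auth.token.is_some())
    }

    fn is_secure(&self) -> bool {
        // Bearer auth now counts as "secure" alongside TLS/mTLS.
        self.is_bearer_auth() || self.tls.is_some() || self.mtls.is_some()
    }
}

fn main() {
    let config = RemoteConfigSketch {
        auth: Some(AuthSketch {
            headers: HashMap::new(),
            token: Some("DEPOT_TOKEN".into()),
        }),
        tls: None,
        mtls: None,
    };

    assert!(config.is_bearer_auth());
    assert!(config.is_secure());
}
```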
116 changes: 92 additions & 24 deletions crates/remote/src/grpc_remote_client.rs
@@ -11,11 +11,12 @@ use bazel_remote_apis::build::bazel::remote::execution::v2::{
GetCapabilitiesRequest, ServerCapabilities, UpdateActionResultRequest,
};
use moon_common::color;
use moon_config::{RemoteCompression, RemoteConfig};
use std::{error::Error, path::Path};
use moon_config::RemoteConfig;
use std::{env, error::Error, path::Path};
use tonic::{
metadata::{MetadataKey, MetadataValue},
transport::{Channel, Endpoint},
Code,
Code, Request, Status,
};
use tracing::{trace, warn};

@@ -26,6 +27,8 @@ fn map_transport_error(error: tonic::transport::Error) -> RemoteError {
}

fn map_status_error(error: tonic::Status) -> RemoteError {
dbg!(&error);

match error.source() {
Some(src) => RemoteError::GrpcCallFailedViaSource {
error: src.to_string(),
@@ -39,8 +42,42 @@ fn map_status_error(error: tonic::Status) -> RemoteError {
#[derive(Default)]
pub struct GrpcRemoteClient {
channel: Option<Channel>,
compression: RemoteCompression,
instance_name: String,
config: RemoteConfig,
}

impl GrpcRemoteClient {
fn inject_auth_headers(&self, mut req: Request<()>) -> Result<Request<()>, Status> {
if let Some(auth) = &self.config.auth {
let headers = req.metadata_mut();

for (key, value) in &auth.headers {
headers.insert(
MetadataKey::from_bytes(key.as_bytes()).unwrap(),
MetadataValue::try_from(value).unwrap(),
);
}

if let Some(token_name) = &auth.token {
let token = env::var(token_name).unwrap_or_default();

if token.is_empty() {
warn!(
"Remote service auth token {} does not exist, unable to authorize",
color::property(token_name)
);
} else {
headers.insert(
"Authorization",
MetadataValue::try_from(format!("Bearer {token}")).unwrap(),
);
}
}
}

dbg!(&req);

Ok(req)
}
}

#[async_trait::async_trait]
@@ -60,11 +97,21 @@ impl RemoteClient for GrpcRemoteClient {
"(with mTLS)"
} else if config.tls.is_some() {
"(with TLS)"
} else if config.is_bearer_auth() {
"(with auth)"
} else {
"(insecure)"
}
);

// Although we use a grpc(s) protocol for the host,
// tonic only supports http(s), so change it
// let url = if let Some(suffix) = host.strip_prefix("grpc") {
// format!("http{suffix}")
// } else {
// host.to_owned()
// };

let mut endpoint = Endpoint::from_shared(host.to_owned())
.map_err(map_transport_error)?
.user_agent("moon")
@@ -92,22 +139,31 @@ impl RemoteClient for GrpcRemoteClient {
);
}

self.channel = Some(endpoint.connect().await.map_err(map_transport_error)?);
self.compression = config.cache.compression;
self.instance_name = config.cache.instance_name.clone();
self.config = config.to_owned();

// We can't inject auth headers into this initial connection,
// so defer the connection until a client is used
if self.config.is_bearer_auth() {
self.channel = Some(endpoint.connect_lazy());
} else {
self.channel = Some(endpoint.connect().await.map_err(map_transport_error)?);
}

Ok(())
}

// https://github.com/bazelbuild/remote-apis/blob/main/build/bazel/remote/execution/v2/remote_execution.proto#L452
async fn load_capabilities(&self) -> miette::Result<ServerCapabilities> {
let mut client = CapabilitiesClient::new(self.channel.clone().unwrap());
let mut client =
CapabilitiesClient::with_interceptor(self.channel.clone().unwrap(), |req| {
self.inject_auth_headers(req)
});

trace!("Loading remote execution API capabilities from gRPC server");

let response = client
.get_capabilities(GetCapabilitiesRequest {
instance_name: self.instance_name.clone(),
instance_name: self.config.cache.instance_name.clone(),
})
.await
.map_err(map_status_error)?;
@@ -117,13 +173,16 @@ impl RemoteClient for GrpcRemoteClient {

// https://github.com/bazelbuild/remote-apis/blob/main/build/bazel/remote/execution/v2/remote_execution.proto#L170
async fn get_action_result(&self, digest: &Digest) -> miette::Result<Option<ActionResult>> {
let mut client = ActionCacheClient::new(self.channel.clone().unwrap());
let mut client =
ActionCacheClient::with_interceptor(self.channel.clone().unwrap(), |req| {
self.inject_auth_headers(req)
});

trace!(hash = &digest.hash, "Checking for a cached action result");

match client
.get_action_result(GetActionResultRequest {
instance_name: self.instance_name.clone(),
instance_name: self.config.cache.instance_name.clone(),
action_digest: Some(digest.to_owned()),
inline_stderr: true,
inline_stdout: true,
@@ -164,7 +223,10 @@ impl RemoteClient for GrpcRemoteClient {
digest: &Digest,
result: ActionResult,
) -> miette::Result<Option<ActionResult>> {
let mut client = ActionCacheClient::new(self.channel.clone().unwrap());
let mut client =
ActionCacheClient::with_interceptor(self.channel.clone().unwrap(), |req| {
self.inject_auth_headers(req)
});

trace!(
hash = &digest.hash,
@@ -177,7 +239,7 @@ impl RemoteClient for GrpcRemoteClient {

match client
.update_action_result(UpdateActionResultRequest {
instance_name: self.instance_name.clone(),
instance_name: self.config.cache.instance_name.clone(),
action_digest: Some(digest.to_owned()),
action_result: Some(result),
digest_function: digest_function::Value::Sha256 as i32,
@@ -222,19 +284,22 @@ impl RemoteClient for GrpcRemoteClient {
digest: &Digest,
blob_digests: Vec<Digest>,
) -> miette::Result<Vec<Blob>> {
let mut client = ContentAddressableStorageClient::new(self.channel.clone().unwrap());
let mut client = ContentAddressableStorageClient::with_interceptor(
self.channel.clone().unwrap(),
|req| self.inject_auth_headers(req),
);

trace!(
hash = &digest.hash,
compression = self.compression.to_string(),
compression = self.config.cache.compression.to_string(),
"Downloading {} output blobs",
blob_digests.len()
);

let response = match client
.batch_read_blobs(BatchReadBlobsRequest {
acceptable_compressors: get_acceptable_compressors(self.compression),
instance_name: self.instance_name.clone(),
acceptable_compressors: get_acceptable_compressors(self.config.cache.compression),
instance_name: self.config.cache.instance_name.clone(),
digests: blob_digests,
digest_function: digest_function::Value::Sha256 as i32,
})
@@ -272,7 +337,7 @@ impl RemoteClient for GrpcRemoteClient {
if let Some(digest) = download.digest {
blobs.push(Blob {
digest,
bytes: decompress_blob(self.compression, download.data)?,
bytes: decompress_blob(self.config.cache.compression, download.data)?,
});
}

@@ -295,11 +360,14 @@ impl RemoteClient for GrpcRemoteClient {
digest: &Digest,
blobs: Vec<Blob>,
) -> miette::Result<Vec<Option<Digest>>> {
let mut client = ContentAddressableStorageClient::new(self.channel.clone().unwrap());
let mut client = ContentAddressableStorageClient::with_interceptor(
self.channel.clone().unwrap(),
|req| self.inject_auth_headers(req),
);

trace!(
hash = &digest.hash,
compression = self.compression.to_string(),
compression = self.config.cache.compression.to_string(),
"Uploading {} output blobs",
blobs.len()
);
@@ -309,14 +377,14 @@ impl RemoteClient for GrpcRemoteClient {
for blob in blobs {
requests.push(batch_update_blobs_request::Request {
digest: Some(blob.digest),
data: compress_blob(self.compression, blob.bytes)?,
compressor: get_compressor(self.compression),
data: compress_blob(self.config.cache.compression, blob.bytes)?,
compressor: get_compressor(self.config.cache.compression),
});
}

let response = match client
.batch_update_blobs(BatchUpdateBlobsRequest {
instance_name: self.instance_name.clone(),
instance_name: self.config.cache.instance_name.clone(),
requests,
digest_function: digest_function::Value::Sha256 as i32,
})
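
The client changes above switch every generated client from `new(channel)` to `with_interceptor(channel, …)` so the auth headers can be attached to each request, and use `connect_lazy()` instead of `connect()` when bearer auth is enabled, since headers cannot be injected into the initial connection. A minimal sketch of that interceptor pattern in isolation; the env var and client names are illustrative, not moon's:

```rust
use std::env;
use tonic::{metadata::MetadataValue, Request, Status};

/// Read a bearer token from a (hypothetical) EXAMPLE_TOKEN env var and
/// attach it to every outgoing request's metadata.
fn add_bearer_token(mut req: Request<()>) -> Result<Request<()>, Status> {
    if let Ok(token) = env::var("EXAMPLE_TOKEN") {
        let value = MetadataValue::try_from(format!("Bearer {token}"))
            .map_err(|_| Status::invalid_argument("invalid token value"))?;
        // gRPC metadata keys must be lowercase.
        req.metadata_mut().insert("authorization", value);
    }
    Ok(req)
}

// Usage with any tonic-generated client (`FooClient` is hypothetical):
//
// let channel = Endpoint::from_shared("https://cache.example.dev".to_string())?
//     .connect_lazy(); // defer connecting until the first request
// let mut client = FooClient::with_interceptor(channel, add_bearer_token);
```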
2 changes: 2 additions & 0 deletions packages/types/src/common.ts
@@ -14,3 +14,5 @@ export interface Runtime {
}

export type ExtendsFrom = string[] | string;

// 123