Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions app/buck2_re_configuration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,8 @@ pub struct Buck2OssReConfiguration {
pub grpc_keepalive_timeout_secs: Option<u64>,
/// Whether to send HTTP/2 pings when connection is idle.
pub grpc_keepalive_while_idle: Option<bool>,
/// Maximum retries for network requests.
pub max_retries: usize,
}

#[derive(Clone, Debug, Default, Allocative)]
Expand Down Expand Up @@ -562,6 +564,12 @@ impl Buck2OssReConfiguration {
section: BUCK2_RE_CLIENT_CFG_SECTION,
property: "grpc_keepalive_while_idle",
})?,
max_retries: legacy_config
.parse(BuckconfigKeyRef {
section: BUCK2_RE_CLIENT_CFG_SECTION,
property: "max_retries",
})?
.unwrap_or(0),
})
}
}
Expand Down
55 changes: 53 additions & 2 deletions remote_execution/oss/re_grpc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,8 @@ pub struct RERuntimeOpts {
max_concurrent_uploads_per_action: Option<usize>,
/// Time that digests are assumed to live in CAS after being touched.
cas_ttl_secs: i64,
/// Maximum retries for network requests.
max_retries: usize,
}

struct InstanceName(Option<String>);
Expand Down Expand Up @@ -373,6 +375,7 @@ impl REClientBuilder {
// NOTE: This is an arbitrary number because RBE does not return information
// on the TTL of the remote blob.
cas_ttl_secs: opts.cas_ttl_secs.unwrap_or(60),
max_retries: opts.max_retries,
},
grpc_clients,
capabilities,
Expand Down Expand Up @@ -871,6 +874,7 @@ impl REClient {
request: DownloadRequest,
) -> anyhow::Result<DownloadResponse> {
download_impl(
&self.runtime_opts,
&self.instance_name,
request,
self.capabilities.max_total_batch_size,
Expand Down Expand Up @@ -1248,10 +1252,11 @@ fn convert_t_action_result2(t_action_result: TActionResult2) -> anyhow::Result<A
}

async fn download_impl<Byt, BytRet, Cas>(
opts: &RERuntimeOpts,
instance_name: &InstanceName,
request: DownloadRequest,
max_total_batch_size: usize,
cas_f: impl Fn(BatchReadBlobsRequest) -> Cas,
cas_f: impl Clone + Fn(BatchReadBlobsRequest) -> Cas,
bystream_fut: impl Fn(ReadRequest) -> Byt + Sync + Send + Copy,
) -> anyhow::Result<DownloadResponse>
where
Expand Down Expand Up @@ -1321,9 +1326,10 @@ where

let mut batched_blobs_response = HashMap::new();
for read_blob_req in requests {
let resp = cas_f(read_blob_req)
let resp = batch_read_blobs(opts, read_blob_req, cas_f.clone())
.await
.context("Failed to make BatchReadBlobs request")?;

for r in resp.responses.into_iter() {
let digest = tdigest_from(r.digest.context("Response digest not found.")?);
check_status(r.status.unwrap_or_default())?;
Expand Down Expand Up @@ -1421,6 +1427,35 @@ where
})
}

async fn batch_read_blobs<Cas>(
opts: &RERuntimeOpts,
read_blobs_request: BatchReadBlobsRequest,
cas_f: impl Clone + Fn(BatchReadBlobsRequest) -> Cas,
) -> anyhow::Result<BatchReadBlobsResponse>
where
Cas: Future<Output = anyhow::Result<BatchReadBlobsResponse>>,
{
for i in 1..=opts.max_retries + 1 {
// TODO: Hopefully this isn't too expensive? Can we take a reference to the request
// instead?
match cas_f(read_blobs_request.clone()).await {
Ok(resp) => {
return Ok(resp);
}
Err(e) => {
tracing::warn!(
"Failed to make BatchReadBlobs request, retrying after sleeping {} seconds: {:#?}",
i,
e
);
tokio::time::sleep(Duration::from_secs(i as u64)).await;
}
}
}

cas_f(read_blobs_request).await
}

async fn upload_impl<Byt, Cas>(
instance_name: &InstanceName,
request: UploadRequest,
Expand Down Expand Up @@ -1741,6 +1776,15 @@ mod tests {

use super::*;

fn test_re_runtime_opts() -> RERuntimeOpts {
RERuntimeOpts {
use_fbcode_metadata: false,
max_concurrent_uploads_per_action: None,
cas_ttl_secs: 60,
max_retries: 0,
}
}

#[tokio::test]
async fn test_download_named() -> anyhow::Result<()> {
let work = tempfile::tempdir()?;
Expand Down Expand Up @@ -1804,6 +1848,7 @@ mod tests {
};

download_impl(
&test_re_runtime_opts(),
&InstanceName(None),
req,
10000,
Expand Down Expand Up @@ -1910,6 +1955,7 @@ mod tests {
};

download_impl(
&test_re_runtime_opts(),
&InstanceName(None),
req,
10, // kept small to simulate a large file download
Expand Down Expand Up @@ -1991,6 +2037,7 @@ mod tests {
};

let res = download_impl(
&test_re_runtime_opts(),
&InstanceName(None),
req,
100000,
Expand Down Expand Up @@ -2077,6 +2124,7 @@ mod tests {
let counter = AtomicU16::new(0);

let res = download_impl(
&test_re_runtime_opts(),
&InstanceName(None),
req,
7,
Expand Down Expand Up @@ -2145,6 +2193,7 @@ mod tests {
};

let res = download_impl(
&test_re_runtime_opts(),
&InstanceName(None),
req,
10, // intentionally small value to keep data in the test blobs small
Expand Down Expand Up @@ -2200,6 +2249,7 @@ mod tests {
let res = BatchReadBlobsResponse { responses: vec![] };

let res = download_impl(
&test_re_runtime_opts(),
&InstanceName(None),
req,
100000,
Expand Down Expand Up @@ -2238,6 +2288,7 @@ mod tests {
};

download_impl(
&test_re_runtime_opts(),
&InstanceName(Some("instance".to_owned())),
req,
0,
Expand Down
Loading