From d3aca168acb37a3692de406664046ed7c2c48d7f Mon Sep 17 00:00:00 2001 From: Mike Hilgendorf Date: Tue, 21 Oct 2025 15:40:40 -0500 Subject: [PATCH 1/2] fix(server::checkin): only copy files that do not exist in cache, and execute all copies concurrently --- packages/server/src/checkin/output.rs | 110 ++++++++++++++------------ 1 file changed, 61 insertions(+), 49 deletions(-) diff --git a/packages/server/src/checkin/output.rs b/packages/server/src/checkin/output.rs index 0453516d1..f8c9c8638 100644 --- a/packages/server/src/checkin/output.rs +++ b/packages/server/src/checkin/output.rs @@ -2,11 +2,13 @@ use { super::state::{State, Variant}, crate::{Server, temp::Temp}, bytes::Bytes, + futures::{TryStreamExt, stream::FuturesUnordered}, std::{collections::BTreeSet, os::unix::fs::PermissionsExt as _, sync::Arc}, tangram_client as tg, tangram_either::Either, tangram_messenger::Messenger as _, tangram_store::prelude::*, + tokio_util::task::AbortOnDropHandle, }; impl Server { @@ -14,14 +16,26 @@ impl Server { if state.arg.options.destructive { self.checkin_cache_task_destructive(state).await?; } else { - tokio::task::spawn_blocking({ - let server = self.clone(); - let state = state.clone(); - move || server.checkin_cache_task_inner(&state) - }) - .await - .unwrap() - .map_err(|source| tg::error!(!source, "the checkin cache task failed"))?; + // Collect all the file nodes that we're going to cache. + state + .graph + .nodes + .iter() + .filter_map(|node| { + if !node.variant.is_file() { + return None; + }; + let task = AbortOnDropHandle::new(tokio::task::spawn_blocking({ + let server = self.clone(); + let node = node.clone(); + move || server.checkin_cache_task_inner(&node) + })); + Some(task) + }) + .collect::>() + .try_collect::>() + .await + .map_err(|source| tg::error!(!source, "failed to cache artifacts"))?; } Ok(()) } @@ -68,51 +82,49 @@ impl Server { Ok(()) } - fn checkin_cache_task_inner(&self, state: &State) -> tg::Result<()> { - for node in &state.graph.nodes { - let Some(path) = node.path.as_ref() else { - continue; - }; - let metadata = node.path_metadata.as_ref().unwrap(); - if !metadata.is_file() { - continue; - } - let id = node.object_id.as_ref().unwrap(); + fn checkin_cache_task_inner(&self, node: &crate::checkin::state::Node) -> tg::Result<()> { + let id = node.object_id.as_ref().unwrap(); + let metadata = node.path_metadata.as_ref().unwrap(); + let path = node.path.as_ref().unwrap(); - // Copy the file to a temp. - let src = path; - let temp = Temp::new(self); - let dst = temp.path(); - std::fs::copy(src, dst) - .map_err(|source| tg::error!(!source, "failed to copy the file"))?; + // Skip files that have already been cached. + let cache_path = self.cache_path().join(id.to_string()); + if cache_path.exists() { + return Ok(()); + } - // Set its permissions. - if !metadata.is_symlink() { - let executable = metadata.permissions().mode() & 0o111 != 0; - let mode = if executable { 0o555 } else { 0o444 }; - let permissions = std::fs::Permissions::from_mode(mode); - std::fs::set_permissions(dst, permissions).map_err( - |source| tg::error!(!source, %path = dst.display(), "failed to set permissions"), - )?; - } + // Copy the file to a temp. + let src = path; + let temp = Temp::new(self); + let dst = temp.path(); + std::fs::copy(src, dst).map_err(|source| tg::error!(!source, "failed to copy the file"))?; + + // Set its permissions. + if !metadata.is_symlink() { + let executable = metadata.permissions().mode() & 0o111 != 0; + let mode = if executable { 0o555 } else { 0o444 }; + let permissions = std::fs::Permissions::from_mode(mode); + std::fs::set_permissions(dst, permissions).map_err( + |source| tg::error!(!source, %path = dst.display(), "failed to set permissions"), + )?; + } - // Rename the temp to the cache directory. - let src = temp.path(); - let dst = &self.cache_path().join(id.to_string()); - let done = match std::fs::rename(src, dst) { - Ok(()) => false, - Err(source) => { - return Err(tg::error!(!source, "failed to rename the file")); - }, - }; + // Rename the temp to the cache directory. + let src = temp.path(); + let dst = &cache_path; + let done = match std::fs::rename(src, dst) { + Ok(()) => false, + Err(source) => { + return Err(tg::error!(!source, "failed to rename the file")); + }, + }; - // Set the file times. - if !done { - let epoch = filetime::FileTime::from_system_time(std::time::SystemTime::UNIX_EPOCH); - filetime::set_symlink_file_times(dst, epoch, epoch).map_err( - |source| tg::error!(!source, %path = dst.display(), "failed to set the modified time"), - )?; - } + // Set the file times. + if !done { + let epoch = filetime::FileTime::from_system_time(std::time::SystemTime::UNIX_EPOCH); + filetime::set_symlink_file_times(dst, epoch, epoch).map_err( + |source| tg::error!(!source, %path = dst.display(), "failed to set the modified time"), + )?; } Ok(()) From add668f292bf6947eec83d229aaca3597b0247f0 Mon Sep 17 00:00:00 2001 From: Mike Hilgendorf Date: Tue, 21 Oct 2025 16:01:10 -0500 Subject: [PATCH 2/2] . --- packages/server/src/checkin/output.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/server/src/checkin/output.rs b/packages/server/src/checkin/output.rs index f8c9c8638..640a02987 100644 --- a/packages/server/src/checkin/output.rs +++ b/packages/server/src/checkin/output.rs @@ -22,9 +22,9 @@ impl Server { .nodes .iter() .filter_map(|node| { - if !node.variant.is_file() { + if node.path.is_none() || node.path_metadata.as_ref().is_none_or(|m| !m.is_file()){ return None; - }; + } let task = AbortOnDropHandle::new(tokio::task::spawn_blocking({ let server = self.clone(); let node = node.clone();