Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 61 additions & 49 deletions packages/server/src/checkin/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,40 @@ use {
super::state::{State, Variant},
crate::{Server, temp::Temp},
bytes::Bytes,
futures::{TryStreamExt, stream::FuturesUnordered},
std::{collections::BTreeSet, os::unix::fs::PermissionsExt as _, sync::Arc},
tangram_client as tg,
tangram_either::Either,
tangram_messenger::Messenger as _,
tangram_store::prelude::*,
tokio_util::task::AbortOnDropHandle,
};

impl Server {
pub(super) async fn checkin_cache(&self, state: Arc<State>) -> tg::Result<()> {
if state.arg.options.destructive {
self.checkin_cache_task_destructive(state).await?;
} else {
tokio::task::spawn_blocking({
let server = self.clone();
let state = state.clone();
move || server.checkin_cache_task_inner(&state)
})
.await
.unwrap()
.map_err(|source| tg::error!(!source, "the checkin cache task failed"))?;
// Collect all the file nodes that we're going to cache.
state
.graph
.nodes
.iter()
.filter_map(|node| {
if node.path.is_none() || node.path_metadata.as_ref().is_none_or(|m| !m.is_file()){
return None;
}
let task = AbortOnDropHandle::new(tokio::task::spawn_blocking({
let server = self.clone();
let node = node.clone();
move || server.checkin_cache_task_inner(&node)
}));
Some(task)
})
.collect::<FuturesUnordered<_>>()
.try_collect::<Vec<_>>()
.await
.map_err(|source| tg::error!(!source, "failed to cache artifacts"))?;
}
Ok(())
}
Expand Down Expand Up @@ -68,51 +82,49 @@ impl Server {
Ok(())
}

fn checkin_cache_task_inner(&self, state: &State) -> tg::Result<()> {
for node in &state.graph.nodes {
let Some(path) = node.path.as_ref() else {
continue;
};
let metadata = node.path_metadata.as_ref().unwrap();
if !metadata.is_file() {
continue;
}
let id = node.object_id.as_ref().unwrap();
fn checkin_cache_task_inner(&self, node: &crate::checkin::state::Node) -> tg::Result<()> {
let id = node.object_id.as_ref().unwrap();
let metadata = node.path_metadata.as_ref().unwrap();
let path = node.path.as_ref().unwrap();

// Copy the file to a temp.
let src = path;
let temp = Temp::new(self);
let dst = temp.path();
std::fs::copy(src, dst)
.map_err(|source| tg::error!(!source, "failed to copy the file"))?;
// Skip files that have already been cached.
let cache_path = self.cache_path().join(id.to_string());
if cache_path.exists() {
return Ok(());
}

// Set its permissions.
if !metadata.is_symlink() {
let executable = metadata.permissions().mode() & 0o111 != 0;
let mode = if executable { 0o555 } else { 0o444 };
let permissions = std::fs::Permissions::from_mode(mode);
std::fs::set_permissions(dst, permissions).map_err(
|source| tg::error!(!source, %path = dst.display(), "failed to set permissions"),
)?;
}
// Copy the file to a temp.
let src = path;
let temp = Temp::new(self);
let dst = temp.path();
std::fs::copy(src, dst).map_err(|source| tg::error!(!source, "failed to copy the file"))?;

// Set its permissions.
if !metadata.is_symlink() {
let executable = metadata.permissions().mode() & 0o111 != 0;
let mode = if executable { 0o555 } else { 0o444 };
let permissions = std::fs::Permissions::from_mode(mode);
std::fs::set_permissions(dst, permissions).map_err(
|source| tg::error!(!source, %path = dst.display(), "failed to set permissions"),
)?;
}

// Rename the temp to the cache directory.
let src = temp.path();
let dst = &self.cache_path().join(id.to_string());
let done = match std::fs::rename(src, dst) {
Ok(()) => false,
Err(source) => {
return Err(tg::error!(!source, "failed to rename the file"));
},
};
// Rename the temp to the cache directory.
let src = temp.path();
let dst = &cache_path;
let done = match std::fs::rename(src, dst) {
Ok(()) => false,
Err(source) => {
return Err(tg::error!(!source, "failed to rename the file"));
},
};

// Set the file times.
if !done {
let epoch = filetime::FileTime::from_system_time(std::time::SystemTime::UNIX_EPOCH);
filetime::set_symlink_file_times(dst, epoch, epoch).map_err(
|source| tg::error!(!source, %path = dst.display(), "failed to set the modified time"),
)?;
}
// Set the file times.
if !done {
let epoch = filetime::FileTime::from_system_time(std::time::SystemTime::UNIX_EPOCH);
filetime::set_symlink_file_times(dst, epoch, epoch).map_err(
|source| tg::error!(!source, %path = dst.display(), "failed to set the modified time"),
)?;
}

Ok(())
Expand Down