diff --git a/crates/rattler_cache/src/package_cache/cache_lock.rs b/crates/rattler_cache/src/package_cache/cache_lock.rs
index 3ffaffa1b..abcbd8158 100644
--- a/crates/rattler_cache/src/package_cache/cache_lock.rs
+++ b/crates/rattler_cache/src/package_cache/cache_lock.rs
@@ -11,7 +11,7 @@
 use fs4::fs_std::FileExt;
 use parking_lot::Mutex;
 use rattler_digest::Sha256Hash;
 
-use crate::package_cache::PackageCacheError;
+use crate::package_cache::PackageCacheLayerError;
 
 /// A lock on the cache entry. As long as this lock is held, no other process is
 /// allowed to modify the cache entry. This however, does not guarantee that the
@@ -60,7 +60,7 @@ impl Drop for CacheRwLock {
 }
 
 impl CacheRwLock {
-    pub async fn acquire_read(path: &Path) -> Result<Self, PackageCacheError> {
+    pub async fn acquire_read(path: &Path) -> Result<Self, PackageCacheLayerError> {
         let lock_file_path = path.to_path_buf();
 
         let acquire_lock_fut = simple_spawn_blocking::tokio::run_blocking_task(move || {
@@ -71,7 +71,7 @@
                 .write(true)
                 .open(&lock_file_path)
                 .map_err(|e| {
-                    PackageCacheError::LockError(
+                    PackageCacheLayerError::LockError(
                         format!(
                             "failed to open cache lock for reading: '{}'",
                             lock_file_path.display()
                         ),
                         e,
                     )
                 })?;
@@ -81,7 +81,7 @@
             fs4::fs_std::FileExt::lock_shared(&file).map_err(move |e| {
-                PackageCacheError::LockError(
+                PackageCacheLayerError::LockError(
                     format!(
                         "failed to acquire read lock on cache lock file: '{}'",
                         lock_file_path.display()
                     ),
                     e,
                 )
             })?;
@@ -108,7 +108,7 @@
 }
 
 impl CacheRwLock {
-    pub async fn acquire_write(path: &Path) -> Result<Self, PackageCacheError> {
+    pub async fn acquire_write(path: &Path) -> Result<Self, PackageCacheLayerError> {
         let lock_file_path = path.to_path_buf();
         let acquire_lock_fut = simple_spawn_blocking::tokio::run_blocking_task(move || {
             let file = std::fs::OpenOptions::new()
@@ -118,7 +118,7 @@
                 .read(true)
                 .open(&lock_file_path)
                 .map_err(|e| {
-                    PackageCacheError::LockError(
+                    PackageCacheLayerError::LockError(
                         format!(
                             "failed to open cache lock for writing: '{}'",
                             lock_file_path.display()
@@ -128,7 +128,7 @@
                 })?;
 
             file.lock_exclusive().map_err(move |e| {
-                PackageCacheError::LockError(
+                PackageCacheLayerError::LockError(
                     format!(
                         "failed to acquire write lock on cache lock file: '{}'",
                         lock_file_path.display()
@@ -159,7 +159,7 @@ impl CacheRwLock {
         &mut self,
         revision: u64,
         sha256: Option<&Sha256Hash>,
-    ) -> Result<(), PackageCacheError> {
+    ) -> Result<(), PackageCacheLayerError> {
         let file = self.file.clone();
         let sha256 = sha256.cloned();
         simple_spawn_blocking::tokio::run_blocking_task(move || {
@@ -167,7 +167,7 @@
 
             // Ensure we write from the start of the file
             file.rewind().map_err(|e| {
-                PackageCacheError::LockError(
+                PackageCacheLayerError::LockError(
                     "failed to rewind cache lock for writing revision".to_string(),
                     e,
                 )
@@ -176,7 +176,7 @@
             // Write the bytes of the revision
             let revision_bytes = revision.to_be_bytes();
             file.write_all(&revision_bytes).map_err(|e| {
-                PackageCacheError::LockError(
+                PackageCacheLayerError::LockError(
                     "failed to write revision to cache lock".to_string(),
                     e,
                 )
@@ -187,7 +187,7 @@
                 let len = sha.len();
                 let sha = &sha[..];
                 file.write_all(sha).map_err(|e| {
-                    PackageCacheError::LockError(
+                    PackageCacheLayerError::LockError(
                         "failed to write sha256 to cache lock".to_string(),
                         e,
                     )
@@ -199,7 +199,7 @@
 
             // Ensure all bytes are written to disk
             file.flush().map_err(|e| {
-                PackageCacheError::LockError(
+                PackageCacheLayerError::LockError(
revision".to_string(), e, ) @@ -208,7 +208,7 @@ impl CacheRwLock { // Update the length of the file let file_length = revision_bytes.len() + sha_bytes; file.set_len(file_length as u64).map_err(|e| { - PackageCacheError::LockError( + PackageCacheLayerError::LockError( "failed to truncate cache lock after writing revision".to_string(), e, ) @@ -222,10 +222,10 @@ impl CacheRwLock { impl CacheRwLock { /// Reads the revision from the cache lock file. - pub fn read_revision(&mut self) -> Result { + pub fn read_revision(&mut self) -> Result { let mut file = self.file.lock(); file.rewind().map_err(|e| { - PackageCacheError::LockError( + PackageCacheLayerError::LockError( "failed to rewind cache lock for reading revision".to_string(), e, ) @@ -237,7 +237,7 @@ impl CacheRwLock { return Ok(0); } Err(e) => { - return Err(PackageCacheError::LockError( + return Err(PackageCacheLayerError::LockError( "failed to read revision from cache lock".to_string(), e, )); @@ -247,19 +247,22 @@ impl CacheRwLock { } /// Reads the sha256 hash from the cache lock file. - pub fn read_sha256(&mut self) -> Result, PackageCacheError> { + pub fn read_sha256(&mut self) -> Result, PackageCacheLayerError> { const SHA256_LEN: usize = 32; const REVISION_LEN: u64 = 8; let mut file = self.file.lock(); file.rewind().map_err(|e| { - PackageCacheError::LockError( + PackageCacheLayerError::LockError( "failed to rewind cache lock for reading sha256".to_string(), e, ) })?; let mut buf = [0; SHA256_LEN]; let _ = file.seek(SeekFrom::Start(REVISION_LEN)).map_err(|e| { - PackageCacheError::LockError("failed to seek to sha256 in cache lock".to_string(), e) + PackageCacheLayerError::LockError( + "failed to seek to sha256 in cache lock".to_string(), + e, + ) })?; match file.read_exact(&mut buf) { Ok(_) => {} @@ -267,7 +270,7 @@ impl CacheRwLock { return Ok(None); } Err(e) => { - return Err(PackageCacheError::LockError( + return Err(PackageCacheLayerError::LockError( "failed to read sha256 from cache lock".to_string(), e, )); diff --git a/crates/rattler_cache/src/package_cache/mod.rs b/crates/rattler_cache/src/package_cache/mod.rs index dae8e79ec..1b2bb612b 100644 --- a/crates/rattler_cache/src/package_cache/mod.rs +++ b/crates/rattler_cache/src/package_cache/mod.rs @@ -6,6 +6,7 @@ use std::{ fmt::Debug, future::Future, path::{Path, PathBuf}, + pin::Pin, sync::Arc, time::{Duration, SystemTime}, }; @@ -39,7 +40,7 @@ mod reporter; /// Instead, this is left up to the user when the package is requested. If the /// package is found in the cache it is returned immediately. However, if the /// cache is stale a user defined function is called to populate the cache. This -/// separates the corners between caching and fetching of the content. +/// separates the concerns between caching and fetching of the content. #[derive(Clone)] pub struct PackageCache { inner: Arc, @@ -48,6 +49,10 @@ pub struct PackageCache { #[derive(Default)] struct PackageCacheInner { + layers: Vec, +} + +pub struct PackageCacheLayer { path: PathBuf, packages: DashMap>>, } @@ -72,27 +77,55 @@ impl From for BucketKey { } } -#[derive(Default)] +#[derive(Default, Debug)] struct Entry { last_revision: Option, last_sha256: Option, } -/// An error that might be returned from one of the caching function of the -/// [`PackageCache`]. +/// Errors specific to the `PackageCache` interface #[derive(Debug, thiserror::Error)] +#[non_exhaustive] pub enum PackageCacheError { - /// An error occurred while fetching the package. 
diff --git a/crates/rattler_cache/src/package_cache/mod.rs b/crates/rattler_cache/src/package_cache/mod.rs
index dae8e79ec..1b2bb612b 100644
--- a/crates/rattler_cache/src/package_cache/mod.rs
+++ b/crates/rattler_cache/src/package_cache/mod.rs
@@ -6,6 +6,7 @@ use std::{
     fmt::Debug,
     future::Future,
     path::{Path, PathBuf},
+    pin::Pin,
     sync::Arc,
     time::{Duration, SystemTime},
 };
@@ -39,7 +40,7 @@ mod reporter;
 /// Instead, this is left up to the user when the package is requested. If the
 /// package is found in the cache it is returned immediately. However, if the
 /// cache is stale a user defined function is called to populate the cache. This
-/// separates the corners between caching and fetching of the content.
+/// separates the concerns between caching and fetching of the content.
 #[derive(Clone)]
 pub struct PackageCache {
     inner: Arc<PackageCacheInner>,
@@ -48,6 +49,10 @@ pub struct PackageCache {
 }
 
 #[derive(Default)]
 struct PackageCacheInner {
+    layers: Vec<PackageCacheLayer>,
+}
+
+/// A single layer of the package cache.
+pub struct PackageCacheLayer {
     path: PathBuf,
     packages: DashMap<BucketKey, Arc<Mutex<Entry>>>,
 }
@@ -72,27 +77,55 @@ impl From<CacheKey> for BucketKey {
     }
 }
 
-#[derive(Default)]
+#[derive(Default, Debug)]
 struct Entry {
     last_revision: Option<u64>,
     last_sha256: Option<Sha256Hash>,
 }
 
-/// An error that might be returned from one of the caching functions of the
-/// [`PackageCache`].
+/// Errors specific to the [`PackageCache`] interface.
 #[derive(Debug, thiserror::Error)]
+#[non_exhaustive]
 pub enum PackageCacheError {
-    /// An error occurred while fetching the package.
-    #[error(transparent)]
-    FetchError(#[from] Arc<dyn std::error::Error + Send + Sync + 'static>),
+    /// The operation was cancelled
+    #[error("the operation was cancelled")]
+    Cancelled,
+
+    /// An error occurred in a cache layer
+    #[error("failed to interact with the package cache layer")]
+    LayerError(#[source] Box<PackageCacheLayerError>), // Wraps layer-specific errors
+
+    /// There are no writable layers to install the package to
+    #[error("no writable layers to install package to")]
+    NoWritableLayers,
+}
+
+/// Errors specific to individual layers in the [`PackageCache`].
+#[derive(Debug, thiserror::Error)]
+#[non_exhaustive]
+pub enum PackageCacheLayerError {
+    /// The package is invalid
+    #[error("package is invalid")]
+    InvalidPackage,
+
+    /// The package was not found in this layer
+    #[error("package not found in this layer")]
+    PackageNotFound,
 
     /// A locking error occurred
     #[error("{0}")]
     LockError(String, #[source] std::io::Error),
 
     /// The operation was cancelled
-    #[error("operation was cancelled")]
+    #[error("the operation was cancelled")]
     Cancelled,
+
+    /// An error occurred while fetching the package.
+    #[error(transparent)]
+    FetchError(#[from] Arc<dyn std::error::Error + Send + Sync + 'static>),
+
+    /// Any other error that occurred in the layer
+    #[error("package cache layer error: {0}")]
+    OtherError(#[source] Box<dyn std::error::Error + Send + Sync>),
 }
 
 impl From<Cancelled> for PackageCacheError {
@@ -101,16 +134,107 @@ impl From<Cancelled> for PackageCacheError {
     fn from(_value: Cancelled) -> Self {
         Self::Cancelled
     }
 }
 
+impl From<Cancelled> for PackageCacheLayerError {
+    fn from(_value: Cancelled) -> Self {
+        Self::Cancelled
+    }
+}
+
+impl From<PackageCacheLayerError> for PackageCacheError {
+    fn from(err: PackageCacheLayerError) -> Self {
+        // Convert the PackageCacheLayerError to a LayerError by boxing it
+        PackageCacheError::LayerError(Box::new(err))
+    }
+}
+
+impl PackageCacheLayer {
+    /// Determine if the layer is read-only in the filesystem
+    pub fn is_readonly(&self) -> bool {
+        self.path
+            .metadata()
+            .map(|m| m.permissions().readonly())
+            .unwrap_or(false)
+    }
+
+    /// Validates a package in this layer without fetching it.
+    pub async fn try_validate(
+        &self,
+        cache_key: &CacheKey,
+    ) -> Result<CacheLock, PackageCacheLayerError> {
+        let cache_entry = self
+            .packages
+            .get(&cache_key.clone().into())
+            .ok_or(PackageCacheLayerError::PackageNotFound)?
+            .clone();
+        let mut cache_entry = cache_entry.lock().await;
+        let cache_path = self.path.join(cache_key.to_string());
+
+        match validate_package_common::<
+            fn(PathBuf) -> _,
+            Pin<Box<dyn Future<Output = Result<(), std::io::Error>> + Send>>,
+            std::io::Error,
+        >(
+            cache_path,
+            cache_entry.last_revision,
+            cache_key.sha256.as_ref(),
+            None,
+            None,
+        )
+        .await
+        {
+            Ok(cache_lock) => {
+                cache_entry.last_revision = Some(cache_lock.revision);
+                cache_entry.last_sha256 = cache_lock.sha256;
+                Ok(cache_lock)
+            }
+            Err(err) => Err(err),
+        }
+    }
+
+    /// Validates the package, and fetches it if invalid.
+    pub async fn validate_or_fetch<F, Fut, E>(
+        &self,
+        fetch: F,
+        cache_key: &CacheKey,
+        reporter: Option<Arc<dyn CacheReporter>>,
+    ) -> Result<CacheLock, PackageCacheLayerError>
+    where
+        F: (Fn(PathBuf) -> Fut) + Send + 'static,
+        Fut: Future<Output = Result<(), E>> + Send + 'static,
+        E: std::error::Error + Send + Sync + 'static,
+    {
+        let entry = self
+            .packages
+            .entry(cache_key.clone().into())
+            .or_default()
+            .clone();
+
+        let mut cache_entry = entry.lock().await;
+        let cache_path = self.path.join(cache_key.to_string());
+
+        match validate_package_common(
+            cache_path,
+            cache_entry.last_revision,
+            cache_key.sha256.as_ref(),
+            Some(fetch),
+            reporter,
+        )
+        .await
+        {
+            Ok(cache_lock) => {
+                cache_entry.last_revision = Some(cache_lock.revision);
+                cache_entry.last_sha256 = cache_lock.sha256;
+                Ok(cache_lock)
+            }
+            Err(e) => Err(e),
+        }
+    }
+}
+
 impl PackageCache {
-    /// Constructs a new [`PackageCache`] located at the specified path.
+    /// Constructs a new [`PackageCache`] with only one layer.
     pub fn new(path: impl Into<PathBuf>) -> Self {
-        Self {
-            inner: Arc::new(PackageCacheInner {
-                path: path.into(),
-                packages: DashMap::default(),
-            }),
-            cache_origin: false,
-        }
+        Self::new_layered(std::iter::once(path.into()), false)
     }
 
     /// Adds the origin (url or path) to the cache key to avoid unwanted cache hits of packages
@@ -122,6 +246,40 @@ impl PackageCache {
         }
     }
 
+    /// Constructs a new [`PackageCache`] located at the specified paths.
+    /// Layers are queried in the order they are provided.
+    /// The first writable layer is written to.
+    pub fn new_layered<I>(paths: I, cache_origin: bool) -> Self
+    where
+        I: IntoIterator,
+        I::Item: Into<PathBuf>,
+    {
+        let layers = paths
+            .into_iter()
+            .map(|path| PackageCacheLayer {
+                path: path.into(),
+                packages: DashMap::default(),
+            })
+            .collect();
+
+        Self {
+            inner: Arc::new(PackageCacheInner { layers }),
+            cache_origin,
+        }
+    }
+
+    /// Returns a tuple containing two sets of layers:
+    /// - A collection of read-only layers.
+    /// - A collection of writable layers.
+    ///
+    /// The permissions are checked at the time of the function call.
+    pub fn split_layers(&self) -> (Vec<&PackageCacheLayer>, Vec<&PackageCacheLayer>) {
+        self.inner
+            .layers
+            .iter()
+            .partition(|layer| layer.is_readonly())
+    }
+
     /// Returns the directory that contains the specified package.
     ///
     /// If the package was previously successfully fetched and stored in the
@@ -143,36 +301,46 @@
         Fut: Future<Output = Result<(), E>> + Send + 'static,
         E: std::error::Error + Send + Sync + 'static,
     {
-        let cache_key: CacheKey = pkg.into();
-        let cache_path = self.inner.path.join(cache_key.to_string());
-        let cache_entry = self
-            .inner
-            .packages
-            .entry(cache_key.clone().into())
-            .or_default()
-            .clone();
+        let cache_key = pkg.into();
+        let (_, writable_layers) = self.split_layers();
 
-        // Acquire the entry. From this point on we can be sure that only one task is
-        // accessing the cache entry.
-        let mut cache_entry = cache_entry.lock().await;
+        for layer in self.inner.layers.iter() {
+            let cache_path = layer.path.join(cache_key.to_string());
 
-        // Validate the cache entry or fetch the package if it is not valid.
-        let cache_lock = validate_or_fetch_to_cache(
-            cache_path,
-            fetch,
-            cache_entry.last_revision,
-            cache_key.sha256.as_ref(),
-            reporter,
-        )
-        .await?;
+            if cache_path.exists() {
+                match layer.try_validate(&cache_key).await {
+                    Ok(lock) => {
+                        return Ok(lock);
+                    }
+                    Err(PackageCacheLayerError::InvalidPackage) => {
+                        // Log and continue to the next layer
+                        tracing::warn!(
+                            "invalid package in layer at path {:?}, trying next layer",
+                            layer.path
+                        );
+                    }
+                    Err(PackageCacheLayerError::PackageNotFound) => {
+                        // Log and continue to the next layer
+                        tracing::debug!(
+                            "package not found in layer at path {:?}, trying next layer",
+                            layer.path
+                        );
+                    }
+                    Err(err) => return Err(err.into()),
+                }
+            }
+        }
 
-        // Store the current revision stored in the cache. If any other task tries to
-        // read the cache and the revision stayed the same, we can assume that the cache
-        // is still valid.
-        cache_entry.last_revision = Some(cache_lock.revision);
-        cache_entry.last_sha256 = cache_lock.sha256;
+        // No match in any layer, so write to the first writable layer
+        tracing::debug!("no match in any layer, writing to first writable layer");
+        if let Some(layer) = writable_layers.first() {
+            return match layer.validate_or_fetch(fetch, &cache_key, reporter).await {
+                Ok(cache_lock) => Ok(cache_lock),
+                Err(e) => Err(e.into()),
+            };
+        }
 
-        Ok(cache_lock)
+        Err(PackageCacheError::NoWritableLayers)
     }
 
     /// Returns the directory that contains the specified package.
@@ -339,15 +507,14 @@ impl PackageCache {
     }
 }
 
-/// Validates that the package that is currently stored is a valid package and
-/// otherwise calls the `fetch` method to populate the cache.
-async fn validate_or_fetch_to_cache<F, Fut, E>(
+/// Shared logic for validating a package and, if a `fetch` function is
+/// provided, populating the cache when validation fails.
+async fn validate_package_common<F, Fut, E>(
     path: PathBuf,
-    fetch: F,
     known_valid_revision: Option<u64>,
     given_sha: Option<&Sha256Hash>,
+    fetch: Option<F>,
     reporter: Option<Arc<dyn CacheReporter>>,
-) -> Result<CacheLock, PackageCacheError>
+) -> Result<CacheLock, PackageCacheLayerError>
 where
     F: Fn(PathBuf) -> Fut + Send,
     Fut: Future<Output = Result<(), E>> + 'static,
     E: std::error::Error + Send + Sync + 'static,
 {
     // Acquire a read lock on the cache entry. This ensures that no other process is
     // currently writing to the cache.
     let lock_file_path = {
         // Append the `.lock` extension to the cache path to create the lock file path.
-        // `Path::with_extension` strips too much from the filename if it contains one
-        // or more dots.
         let mut path_str = path.as_os_str().to_owned();
         path_str.push(".lock");
         PathBuf::from(path_str)
     };
 
     if let Some(root_dir) = lock_file_path.parent() {
         tokio_fs::create_dir_all(root_dir)
             .map_err(|e| {
-                PackageCacheError::LockError(
-                    format!("failed to create cache directory: '{}", root_dir.display()),
+                PackageCacheLayerError::LockError(
+                    format!("failed to create cache directory: '{}'", root_dir.display()),
                     e,
                 )
             })
             .await?;
     }
 
-    // The revision of the cache entry that we already know is valid.
     let mut validated_revision = known_valid_revision;
 
     loop {
@@ -445,7 +609,7 @@ where
             }
         }
     } else if !cache_dir_exists {
-        tracing::debug!("cache directory does not exist, fetching package");
+        tracing::debug!("cache directory does not exist");
     } else if hash_mismatch {
         tracing::warn!(
             "hash mismatch, wanted a package with hash {} but the cached package has hash {}, fetching package",
@@ -463,32 +627,35 @@ where
         // lock and check if the revision has changed. If it has, we assume that
         // another process has already fetched the package and we restart the
         // validation process.
-        drop(read_lock);
-
-        let mut write_lock = CacheRwLock::acquire_write(&lock_file_path).await?;
-
-        let read_revision = write_lock.read_revision()?;
-        if read_revision != cache_revision {
-            tracing::debug!(
-                "cache revisions dont match '{}', retrying to acquire lock file.",
-                lock_file_path.display()
-            );
-            // The cache has been modified since we last checked. We need to re-validate.
-            continue;
-        }
+        if let Some(ref fetch_fn) = fetch {
+            drop(read_lock);
+
+            let mut write_lock = CacheRwLock::acquire_write(&lock_file_path).await?;
+
+            let read_revision = write_lock.read_revision()?;
+            if read_revision != cache_revision {
+                tracing::debug!(
+                    "cache revisions don't match '{}', retrying to acquire lock file.",
+                    lock_file_path.display()
+                );
+                continue;
+            }
 
-        // Write the new revision
-        let new_revision = cache_revision + 1;
-        write_lock
-            .write_revision_and_sha(new_revision, given_sha)
-            .await?;
+            // Write the new revision
+            let new_revision = cache_revision + 1;
+            write_lock
+                .write_revision_and_sha(new_revision, given_sha)
+                .await?;
 
-        // Otherwise, defer to populate method to fill our cache.
-        fetch(path.clone())
-            .await
-            .map_err(|e| PackageCacheError::FetchError(Arc::new(e)))?;
+            // Fetch the package.
+            fetch_fn(path.clone())
+                .await
+                .map_err(|e| PackageCacheLayerError::FetchError(Arc::new(e)))?;
 
-        validated_revision = Some(new_revision);
+            validated_revision = Some(new_revision);
+        } else {
+            return Err(PackageCacheLayerError::InvalidPackage);
+        }
     }
 }
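
To make the new layered control flow concrete, here is a hypothetical consumer of the API added above. This is a sketch, not code from this patch: the directory names and the `example` function are invented, and it assumes `CacheLock::path()` as exposed by the existing `CacheLock` type:

    use std::path::{Path, PathBuf};

    async fn example(layer_dirs: Vec<PathBuf>) -> Result<(), PackageCacheError> {
        // Layers are probed in the order given; the first writable layer
        // receives packages that no layer already holds.
        let cache = PackageCache::new_layered(layer_dirs, false);

        // Hypothetical package filename, only used to derive a cache key.
        let key: CacheKey = ArchiveIdentifier::try_from_path(Path::new(
            "mamba-1.1.0-py39hb3d9227_2.conda",
        ))
        .unwrap()
        .into();

        let lock = cache
            .get_or_fetch(
                key,
                |destination| async move {
                    // Download or extract the package into `destination` here.
                    let _ = destination;
                    Ok::<_, std::io::Error>(())
                },
                None,
            )
            .await?;

        println!("package available at {}", lock.path().display());
        Ok(())
    }

If every layer turns out to be read-only, the caller observes the new `PackageCacheError::NoWritableLayers` instead of a silent failure.
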
@@ -530,7 +697,10 @@ mod test {
         future::IntoFuture,
         net::SocketAddr,
         path::{Path, PathBuf},
-        sync::{atomic::AtomicBool, Arc},
+        sync::{
+            atomic::{AtomicBool, Ordering},
+            Arc,
+        },
     };
 
     use assert_matches::assert_matches;
@@ -552,14 +722,14 @@ mod test {
     use reqwest::Client;
     use reqwest_middleware::ClientBuilder;
     use reqwest_retry::RetryTransientMiddleware;
-    use tempfile::tempdir;
+    use tempfile::{tempdir, TempDir};
     use tokio::sync::Mutex;
     use tokio_stream::StreamExt;
     use url::Url;
 
     use super::PackageCache;
     use crate::{
-        package_cache::CacheKey,
+        package_cache::{CacheKey, PackageCacheError},
         validation::{validate_package_directory, ValidationMode},
     };
 
@@ -915,7 +1085,7 @@ mod test {
             key.clone(),
             move |destination| {
                 let tar_archive_path = tar_archive_path.clone();
-                cloned.store(true, std::sync::atomic::Ordering::Relaxed);
+                cloned.store(true, Ordering::Release);
                 async move {
                     rattler_package_streaming::tokio::fs::extract(
                         &tar_archive_path,
@@ -930,7 +1100,7 @@ mod test {
         .await
         .unwrap();
         assert!(
-            should_run.load(std::sync::atomic::Ordering::Relaxed),
+            should_run.load(Ordering::Relaxed),
             "fetch function should run again"
         );
         assert_ne!(
@@ -939,4 +1109,243 @@ mod test {
             "expected sha256 to be different"
         );
     }
+
+    #[derive(Debug)]
+    pub struct PackageInstallInfo {
+        pub url: Url,
+        // is_readonly=true and layer_num=0 means this package will be
+        // installed to the first readonly cache layer
+        pub is_readonly: bool,
+        pub layer_num: usize,
+        pub expected_sha: String,
+    }
+
+    /// A helper function to create a layered cache, and install packages to
+    /// specific layers
+    async fn create_layered_cache(
+        readonly_layer_count: usize,
+        writable_layer_count: usize,
+        packages: Vec<PackageInstallInfo>,
+    ) -> (PackageCache, Vec<TempDir>) {
+        let mut readonly_dirs = Vec::new();
+        let mut writable_dirs = Vec::new();
+
+        for _ in 0..readonly_layer_count {
+            readonly_dirs.push(tempdir().unwrap());
+        }
+
+        for _ in 0..writable_layer_count {
+            writable_dirs.push(tempdir().unwrap());
+        }
+
+        let all_layers_paths: Vec<TempDir> = readonly_dirs
+            .into_iter()
+            .chain(writable_dirs.into_iter())
+            .collect();
+
+        let cache = PackageCache::new_layered(
+            all_layers_paths.iter().map(|dir| dir.path().to_path_buf()),
+            false,
+        );
+
+        let (readonly_layers, writable_layers) = cache.inner.layers.split_at(readonly_layer_count);
+
+        // Install the packages to the appropriate layers
+        for package in packages {
+            let layer = if package.is_readonly {
+                &readonly_layers[package.layer_num]
+            } else {
+                &writable_layers[package.layer_num]
+            };
+            let tar_archive_path =
+                tools::download_and_cache_file_async(package.url, &package.expected_sha)
+                    .await
+                    .unwrap();
+
+            let key: CacheKey = ArchiveIdentifier::try_from_path(&tar_archive_path)
+                .unwrap()
+                .into();
+            let key = key
+                .with_sha256(parse_digest_from_hex::<Sha256>(&package.expected_sha).unwrap());
+
+            layer
+                .validate_or_fetch(
+                    move |destination| {
+                        let tar_archive_path = tar_archive_path.clone();
+                        async move {
+                            rattler_package_streaming::tokio::fs::extract(
+                                &tar_archive_path,
+                                &destination,
+                            )
+                            .await
+                            .map(|_| ())
+                        }
+                    },
+                    &key,
+                    None,
+                )
+                .await
+                .unwrap();
+        }
+
+        for layer in readonly_layers {
+            #[cfg(unix)]
+            std::fs::set_permissions(
+                &layer.path,
+                std::os::unix::fs::PermissionsExt::from_mode(0o555), // r-xr-xr-x
+            )
+            .unwrap();
+            #[cfg(windows)]
+            {
+                let mut perms = std::fs::metadata(&layer.path).unwrap().permissions();
+                perms.set_readonly(true); // Remove write permissions
+                std::fs::set_permissions(&layer.path, perms).unwrap();
+            }
+        }
+        (cache, all_layers_paths)
+    }
+
+    #[tokio::test]
+    async fn test_package_only_in_readonly() {
+        // Create one readonly layer and one writable layer, and install the
+        // package to the readonly layer
+        let url: Url = "https://conda.anaconda.org/robostack/linux-64/ros-noetic-rosbridge-suite-0.11.14-py39h6fdeb60_14.tar.bz2".parse().unwrap();
+        let sha = "4dd9893f1eee45e1579d1a4f5533ef67a84b5e4b7515de7ed0db1dd47adc6bc8".to_string();
+        let (cache, _dirs) = create_layered_cache(
+            1,
+            1,
+            vec![PackageInstallInfo {
+                url: url.clone(),
+                is_readonly: true,
+                layer_num: 0,
+                expected_sha: sha.clone(),
+            }],
+        )
+        .await;
+
+        let cache_key = CacheKey::from(ArchiveIdentifier::try_from_url(&url).unwrap());
+        let cache_key = cache_key.with_sha256(parse_digest_from_hex::<Sha256>(&sha).unwrap());
+
+        let should_run = Arc::new(AtomicBool::new(false));
+        let cloned = should_run.clone();
+
+        // The fetch function shouldn't run
+        cache
+            .get_or_fetch(
+                cache_key.clone(),
+                move |_destination| {
+                    cloned.store(true, Ordering::Relaxed);
+                    async { Ok::<_, PackageCacheError>(()) }
+                },
+                None,
+            )
+            .await
+            .unwrap();
+
+        assert!(
+            !should_run.load(Ordering::Relaxed),
+            "fetch function should not be run"
+        );
+    }
+
+    #[tokio::test]
+    async fn test_package_only_in_writable() {
+        // Create one readonly layer and one writable layer, and install the
+        // package to the writable layer
+        let url: Url = "https://conda.anaconda.org/robostack/linux-64/ros-noetic-rosbridge-suite-0.11.14-py39h6fdeb60_14.tar.bz2".parse().unwrap();
+        let sha = "4dd9893f1eee45e1579d1a4f5533ef67a84b5e4b7515de7ed0db1dd47adc6bc8".to_string();
+        let (cache, _dirs) = create_layered_cache(
+            1,
+            1,
+            vec![PackageInstallInfo {
+                url: url.clone(),
+                is_readonly: false,
+                layer_num: 0,
+                expected_sha: sha.clone(),
+            }],
+        )
+        .await;
+
+        let cache_key = CacheKey::from(ArchiveIdentifier::try_from_url(&url).unwrap());
+        let cache_key = cache_key.with_sha256(parse_digest_from_hex::<Sha256>(&sha).unwrap());
+
+        let should_run = Arc::new(AtomicBool::new(false));
+        let cloned = should_run.clone();
+
+        // The fetch function shouldn't run
+        cache
+            .get_or_fetch(
+                cache_key.clone(),
+                move |_destination| {
+                    cloned.store(true, Ordering::Relaxed);
+                    async { Ok::<_, PackageCacheError>(()) }
+                },
+                None,
+            )
+            .await
+            .unwrap();
+
+        assert!(
+            !should_run.load(Ordering::Relaxed),
+            "fetch function should not be run"
+        );
+    }
+
+    #[tokio::test]
+    async fn test_package_not_in_any_layer() {
+        // Create one readonly layer and one writable layer, and install a
+        // package to the readonly layer
+        let url: Url = "https://conda.anaconda.org/robostack/linux-64/ros-noetic-rosbridge-suite-0.11.14-py39h6fdeb60_14.tar.bz2".parse().unwrap();
+        let sha = "4dd9893f1eee45e1579d1a4f5533ef67a84b5e4b7515de7ed0db1dd47adc6bc8".to_string();
+        let (cache, _dirs) = create_layered_cache(
+            1,
+            1,
+            vec![PackageInstallInfo {
+                url: url.clone(),
+                is_readonly: true,
+                layer_num: 0,
+                expected_sha: sha.clone(),
+            }],
+        )
+        .await;
+
+        // Request a different package, not installed in any layer
+        let other_url: Url =
+            "https://conda.anaconda.org/conda-forge/win-64/mamba-1.1.0-py39hb3d9227_2.conda"
+                .parse()
+                .unwrap();
+        let other_sha =
+            "c172acdf9cb7655dd224879b30361a657b09bb084b65f151e36a2b51e51a080a".to_string();
+
+        let cache_key = CacheKey::from(ArchiveIdentifier::try_from_url(&other_url).unwrap());
+        let cache_key =
+            cache_key.with_sha256(parse_digest_from_hex::<Sha256>(&other_sha).unwrap());
+
+        let should_run = Arc::new(AtomicBool::new(false));
+        let cloned = should_run.clone();
+
+        let tar_archive_path = tools::download_and_cache_file_async(other_url, &other_sha)
+            .await
+            .unwrap();
+
+        // The fetch function should run
+        cache
+            .get_or_fetch(
+                cache_key.clone(),
+                move |destination: PathBuf| {
+                    let tar_archive_path = tar_archive_path.clone();
+                    cloned.store(true, Ordering::Release);
+                    async move {
+                        rattler_package_streaming::tokio::fs::extract(
+                            &tar_archive_path,
+                            &destination,
+                        )
+                        .await
+                        .map(|_| ())
+                    }
+                },
+                None,
+            )
+            .await
+            .unwrap();
+
+        assert!(
+            should_run.load(Ordering::Relaxed),
+            "fetch function should run"
+        );
+    }
 }
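
A closing note on the two-level error design introduced here: failures inside a single layer surface as `PackageCacheLayerError` and are boxed into `PackageCacheError::LayerError` at the `PackageCache` boundary, so callers only ever match on the outer enum. A hypothetical downstream handler (illustrative only; the catch-all arm is required outside this crate because both enums are `#[non_exhaustive]`):

    fn explain(err: &PackageCacheError) -> String {
        match err {
            PackageCacheError::Cancelled => "the cache operation was cancelled".to_string(),
            PackageCacheError::NoWritableLayers => {
                "all cache layers are read-only; nowhere to install the package".to_string()
            }
            // The boxed layer error keeps `PackageCacheError` small while
            // preserving the full source chain for diagnostics.
            PackageCacheError::LayerError(layer_err) => {
                format!("a cache layer failed: {layer_err}")
            }
            _ => format!("unexpected package cache error: {err}"),
        }
    }
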