diff --git a/crates/resolver-tests/src/lib.rs b/crates/resolver-tests/src/lib.rs index f47017d1d2a..eae6469545d 100644 --- a/crates/resolver-tests/src/lib.rs +++ b/crates/resolver-tests/src/lib.rs @@ -7,6 +7,7 @@ use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use std::fmt; use std::fmt::Write; use std::rc::Rc; +use std::task::Poll; use std::time::Instant; use cargo::core::dependency::DepKind; @@ -129,14 +130,14 @@ pub fn resolve_with_config_raw( dep: &Dependency, f: &mut dyn FnMut(Summary), fuzzy: bool, - ) -> CargoResult<()> { + ) -> Poll> { for summary in self.list.iter() { if fuzzy || dep.matches(summary) { self.used.insert(summary.package_id()); f(summary.clone()); } } - Ok(()) + Poll::Ready(Ok(())) } fn describe_source(&self, _src: SourceId) -> String { @@ -146,6 +147,10 @@ pub fn resolve_with_config_raw( fn is_replaced(&self, _src: SourceId) -> bool { false } + + fn block_until_ready(&mut self) -> CargoResult<()> { + Ok(()) + } } impl<'a> Drop for MyRegistry<'a> { fn drop(&mut self) { diff --git a/src/cargo/core/compiler/future_incompat.rs b/src/cargo/core/compiler/future_incompat.rs index e13d331e0b8..23538b8656d 100644 --- a/src/cargo/core/compiler/future_incompat.rs +++ b/src/cargo/core/compiler/future_incompat.rs @@ -9,6 +9,7 @@ use serde::{Deserialize, Serialize}; use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use std::fmt::Write as _; use std::io::{Read, Write}; +use std::task::Poll; pub const REPORT_PREAMBLE: &str = "\ The following warnings were discovered during the build. These warnings are an @@ -264,7 +265,7 @@ fn get_updates(ws: &Workspace<'_>, package_ids: &BTreeSet) -> Option< let _lock = ws.config().acquire_package_cache_lock().ok()?; // Create a set of updated registry sources. let map = SourceConfigMap::new(ws.config()).ok()?; - let package_ids: BTreeSet<_> = package_ids + let mut package_ids: BTreeSet<_> = package_ids .iter() .filter(|pkg_id| pkg_id.source_id().is_registry()) .collect(); @@ -279,15 +280,35 @@ fn get_updates(ws: &Workspace<'_>, package_ids: &BTreeSet) -> Option< Some((sid, source)) }) .collect(); - // Query the sources for new versions. + + // Query the sources for new versions, mapping `package_ids` into `summaries`. + let mut summaries = Vec::new(); + while !package_ids.is_empty() { + package_ids.retain(|&pkg_id| { + let source = match sources.get_mut(&pkg_id.source_id()) { + Some(s) => s, + None => return false, + }; + let dep = match Dependency::parse(pkg_id.name(), None, pkg_id.source_id()) { + Ok(dep) => dep, + Err(_) => return false, + }; + match source.query_vec(&dep) { + Poll::Ready(Ok(sum)) => { + summaries.push((pkg_id, sum)); + false + } + Poll::Ready(Err(_)) => false, + Poll::Pending => true, + } + }); + for (_, source) in sources.iter_mut() { + source.block_until_ready().ok()?; + } + } + let mut updates = String::new(); - for pkg_id in package_ids { - let source = match sources.get_mut(&pkg_id.source_id()) { - Some(s) => s, - None => continue, - }; - let dep = Dependency::parse(pkg_id.name(), None, pkg_id.source_id()).ok()?; - let summaries = source.query_vec(&dep).ok()?; + for (pkg_id, summaries) in summaries { let mut updated_versions: Vec<_> = summaries .iter() .map(|summary| summary.version()) diff --git a/src/cargo/core/registry.rs b/src/cargo/core/registry.rs index 47a11796701..fa94ef81552 100644 --- a/src/cargo/core/registry.rs +++ b/src/cargo/core/registry.rs @@ -1,11 +1,12 @@ use std::collections::{HashMap, HashSet}; +use std::task::Poll; use crate::core::PackageSet; use crate::core::{Dependency, PackageId, Source, SourceId, SourceMap, Summary}; use crate::sources::config::SourceConfigMap; use crate::util::errors::CargoResult; use crate::util::interning::InternedString; -use crate::util::{profile, CanonicalUrl, Config}; +use crate::util::{CanonicalUrl, Config}; use anyhow::{bail, Context as _}; use log::{debug, trace}; use url::Url; @@ -20,16 +21,19 @@ pub trait Registry { dep: &Dependency, f: &mut dyn FnMut(Summary), fuzzy: bool, - ) -> CargoResult<()>; + ) -> Poll>; - fn query_vec(&mut self, dep: &Dependency, fuzzy: bool) -> CargoResult> { + fn query_vec(&mut self, dep: &Dependency, fuzzy: bool) -> Poll>> { let mut ret = Vec::new(); - self.query(dep, &mut |s| ret.push(s), fuzzy)?; - Ok(ret) + self.query(dep, &mut |s| ret.push(s), fuzzy) + .map_ok(|()| ret) } fn describe_source(&self, source: SourceId) -> String; fn is_replaced(&self, source: SourceId) -> bool; + + /// Block until all outstanding Poll::Pending requests are Poll::Ready. + fn block_until_ready(&mut self) -> CargoResult<()>; } /// This structure represents a registry of known packages. It internally @@ -176,6 +180,7 @@ impl<'cfg> PackageRegistry<'cfg> { } self.load(namespace, kind)?; + self.block_until_ready()?; Ok(()) } @@ -274,16 +279,18 @@ impl<'cfg> PackageRegistry<'cfg> { // Remember that each dependency listed in `[patch]` has to resolve to // precisely one package, so that's why we're just creating a flat list // of summaries which should be the same length as `deps` above. - let unlocked_summaries = deps - .iter() - .map(|(orig_patch, locked)| { - // Remove double reference in orig_patch. Is there maybe a - // magic pattern that could avoid this? - let orig_patch = *orig_patch; + + let mut deps_remaining: Vec<_> = deps.iter().collect(); + let mut unlocked_summaries = Vec::new(); + while !deps_remaining.is_empty() { + let mut deps_pending = Vec::new(); + for dep_remaining in deps_remaining { + let (orig_patch, locked) = dep_remaining; + // Use the locked patch if it exists, otherwise use the original. let dep = match locked { Some(lock) => &lock.dependency, - None => orig_patch, + None => *orig_patch, }; debug!( "registering a patch for `{}` with `{}`", @@ -314,37 +321,55 @@ impl<'cfg> PackageRegistry<'cfg> { .sources .get_mut(dep.source_id()) .expect("loaded source not present"); - let summaries = source.query_vec(dep)?; - let (summary, should_unlock) = summary_for_patch( - orig_patch, locked, summaries, source, - ) - .with_context(|| { - format!( - "patch for `{}` in `{}` failed to resolve", - orig_patch.package_name(), - url, - ) - })?; + + let summaries = match source.query_vec(dep)? { + Poll::Ready(deps) => deps, + Poll::Pending => { + deps_pending.push(dep_remaining); + continue; + } + }; + + let (summary, should_unlock) = + match summary_for_patch(orig_patch, &locked, summaries, source) { + Poll::Ready(x) => x, + Poll::Pending => { + deps_pending.push(dep_remaining); + continue; + } + } + .with_context(|| { + format!( + "patch for `{}` in `{}` failed to resolve", + orig_patch.package_name(), + url, + ) + }) + .with_context(|| format!("failed to resolve patches for `{}`", url))?; + debug!( "patch summary is {:?} should_unlock={:?}", summary, should_unlock ); if let Some(unlock_id) = should_unlock { - unlock_patches.push((orig_patch.clone(), unlock_id)); + unlock_patches.push(((*orig_patch).clone(), unlock_id)); } if *summary.package_id().source_id().canonical_url() == canonical { - anyhow::bail!( + return Err(anyhow::anyhow!( "patch for `{}` in `{}` points to the same source, but \ - patches must point to different sources", + patches must point to different sources", dep.package_name(), url - ); + )) + .context(format!("failed to resolve patches for `{}`", url)); } - Ok(summary) - }) - .collect::>>() - .with_context(|| format!("failed to resolve patches for `{}`", url))?; + unlocked_summaries.push(summary); + } + + deps_remaining = deps_pending; + self.block_until_ready()?; + } let mut name_and_version = HashSet::new(); for summary in unlocked_summaries.iter() { @@ -422,34 +447,46 @@ impl<'cfg> PackageRegistry<'cfg> { } fn load(&mut self, source_id: SourceId, kind: Kind) -> CargoResult<()> { - (|| { - debug!("loading source {}", source_id); - let source = self.source_config.load(source_id, &self.yanked_whitelist)?; - assert_eq!(source.source_id(), source_id); - - if kind == Kind::Override { - self.overrides.push(source_id); - } - self.add_source(source, kind); + debug!("loading source {}", source_id); + let source = self + .source_config + .load(source_id, &self.yanked_whitelist) + .with_context(|| format!("Unable to update {}", source_id))?; + assert_eq!(source.source_id(), source_id); + + if kind == Kind::Override { + self.overrides.push(source_id); + } + self.add_source(source, kind); - // Ensure the source has fetched all necessary remote data. - let _p = profile::start(format!("updating: {}", source_id)); - self.sources.get_mut(source_id).unwrap().update() - })() - .with_context(|| format!("Unable to update {}", source_id))?; + // If we have an imprecise version then we don't know what we're going + // to look for, so we always attempt to perform an update here. + // + // If we have a precise version, then we'll update lazily during the + // querying phase. Note that precise in this case is only + // `Some("locked")` as other `Some` values indicate a `cargo update + // --precise` request + if source_id.precise() != Some("locked") { + self.sources.get_mut(source_id).unwrap().invalidate_cache(); + } else { + debug!("skipping update due to locked registry"); + } Ok(()) } - fn query_overrides(&mut self, dep: &Dependency) -> CargoResult> { + fn query_overrides(&mut self, dep: &Dependency) -> Poll>> { for &s in self.overrides.iter() { let src = self.sources.get_mut(s).unwrap(); let dep = Dependency::new_override(dep.package_name(), s); - let mut results = src.query_vec(&dep)?; + let mut results = match src.query_vec(&dep) { + Poll::Ready(results) => results?, + Poll::Pending => return Poll::Pending, + }; if !results.is_empty() { - return Ok(Some(results.remove(0))); + return Poll::Ready(Ok(Some(results.remove(0)))); } } - Ok(None) + Poll::Ready(Ok(None)) } /// This function is used to transform a summary to another locked summary @@ -535,11 +572,14 @@ impl<'cfg> Registry for PackageRegistry<'cfg> { dep: &Dependency, f: &mut dyn FnMut(Summary), fuzzy: bool, - ) -> CargoResult<()> { + ) -> Poll> { assert!(self.patches_locked); let (override_summary, n, to_warn) = { // Look for an override and get ready to query the real source. - let override_summary = self.query_overrides(dep)?; + let override_summary = match self.query_overrides(dep) { + Poll::Ready(override_summary) => override_summary?, + Poll::Pending => return Poll::Pending, + }; // Next up on our list of candidates is to check the `[patch]` // section of the manifest. Here we look through all patches @@ -569,7 +609,7 @@ impl<'cfg> Registry for PackageRegistry<'cfg> { Some(summary) => (summary, 1, Some(patch)), None => { f(patch); - return Ok(()); + return Poll::Ready(Ok(())); } } } else { @@ -596,8 +636,10 @@ impl<'cfg> Registry for PackageRegistry<'cfg> { let source = self.sources.get_mut(dep.source_id()); match (override_summary, source) { - (Some(_), None) => anyhow::bail!("override found but no real ones"), - (None, None) => return Ok(()), + (Some(_), None) => { + return Poll::Ready(Err(anyhow::anyhow!("override found but no real ones"))) + } + (None, None) => return Poll::Ready(Ok(())), // If we don't have an override then we just ship // everything upstairs after locking the summary @@ -636,7 +678,9 @@ impl<'cfg> Registry for PackageRegistry<'cfg> { // the summaries it gives us though. (Some(override_summary), Some(source)) => { if !patches.is_empty() { - anyhow::bail!("found patches and a path override") + return Poll::Ready(Err(anyhow::anyhow!( + "found patches and a path override" + ))); } let mut n = 0; let mut to_warn = None; @@ -645,10 +689,13 @@ impl<'cfg> Registry for PackageRegistry<'cfg> { n += 1; to_warn = Some(summary); }; - if fuzzy { - source.fuzzy_query(dep, callback)?; + let pend = if fuzzy { + source.fuzzy_query(dep, callback)? } else { - source.query(dep, callback)?; + source.query(dep, callback)? + }; + if pend.is_pending() { + return Poll::Pending; } } (override_summary, n, to_warn) @@ -658,12 +705,14 @@ impl<'cfg> Registry for PackageRegistry<'cfg> { }; if n > 1 { - anyhow::bail!("found an override with a non-locked list"); + return Poll::Ready(Err(anyhow::anyhow!( + "found an override with a non-locked list" + ))); } else if let Some(summary) = to_warn { self.warn_bad_override(&override_summary, &summary)?; } f(self.lock(override_summary)); - Ok(()) + Poll::Ready(Ok(())) } fn describe_source(&self, id: SourceId) -> String { @@ -679,6 +728,15 @@ impl<'cfg> Registry for PackageRegistry<'cfg> { None => false, } } + + fn block_until_ready(&mut self) -> CargoResult<()> { + for (source_id, source) in self.sources.sources_mut() { + source + .block_until_ready() + .with_context(|| format!("Unable to update {}", source_id))?; + } + Ok(()) + } } fn lock( @@ -795,9 +853,9 @@ fn summary_for_patch( locked: &Option, mut summaries: Vec, source: &mut dyn Source, -) -> CargoResult<(Summary, Option)> { +) -> Poll)>> { if summaries.len() == 1 { - return Ok((summaries.pop().unwrap(), None)); + return Poll::Ready(Ok((summaries.pop().unwrap(), None))); } if summaries.len() > 1 { // TODO: In the future, it might be nice to add all of these @@ -810,7 +868,7 @@ fn summary_for_patch( let mut vers: Vec<_> = summaries.iter().map(|summary| summary.version()).collect(); vers.sort(); let versions: Vec<_> = vers.into_iter().map(|v| v.to_string()).collect(); - anyhow::bail!( + return Poll::Ready(Err(anyhow::anyhow!( "patch for `{}` in `{}` resolved to more than one candidate\n\ Found versions: {}\n\ Update the patch definition to select only one package.\n\ @@ -820,13 +878,17 @@ fn summary_for_patch( orig_patch.source_id(), versions.join(", "), versions.last().unwrap() - ); + ))); } assert!(summaries.is_empty()); // No summaries found, try to help the user figure out what is wrong. if let Some(locked) = locked { // Since the locked patch did not match anything, try the unlocked one. - let orig_matches = source.query_vec(orig_patch).unwrap_or_else(|e| { + let orig_matches = match source.query_vec(orig_patch) { + Poll::Pending => return Poll::Pending, + Poll::Ready(deps) => deps, + } + .unwrap_or_else(|e| { log::warn!( "could not determine unlocked summaries for dep {:?}: {:?}", orig_patch, @@ -834,14 +896,24 @@ fn summary_for_patch( ); Vec::new() }); - let (summary, _) = summary_for_patch(orig_patch, &None, orig_matches, source)?; + + let summary = match summary_for_patch(orig_patch, &None, orig_matches, source) { + Poll::Pending => return Poll::Pending, + Poll::Ready(summary) => summary?, + }; + // The unlocked version found a match. This returns a value to // indicate that this entry should be unlocked. - return Ok((summary, Some(locked.package_id))); + return Poll::Ready(Ok((summary.0, Some(locked.package_id)))); } // Try checking if there are *any* packages that match this by name. let name_only_dep = Dependency::new_override(orig_patch.package_name(), orig_patch.source_id()); - let name_summaries = source.query_vec(&name_only_dep).unwrap_or_else(|e| { + + let name_summaries = match source.query_vec(&name_only_dep) { + Poll::Pending => return Poll::Pending, + Poll::Ready(deps) => deps, + } + .unwrap_or_else(|e| { log::warn!( "failed to do name-only summary query for {:?}: {:?}", name_only_dep, @@ -862,15 +934,15 @@ fn summary_for_patch( format!("versions `{}`", strs.join(", ")) } }; - if found.is_empty() { - anyhow::bail!( + Poll::Ready(Err(if found.is_empty() { + anyhow::anyhow!( "The patch location `{}` does not appear to contain any packages \ matching the name `{}`.", orig_patch.source_id(), orig_patch.package_name() - ); + ) } else { - anyhow::bail!( + anyhow::anyhow!( "The patch location `{}` contains a `{}` package with {}, but the patch \ definition requires `{}`.\n\ Check that the version in the patch location is what you expect, \ @@ -879,6 +951,6 @@ fn summary_for_patch( orig_patch.package_name(), found, orig_patch.version_req() - ); - } + ) + })) } diff --git a/src/cargo/core/resolver/dep_cache.rs b/src/cargo/core/resolver/dep_cache.rs index d3ad787d589..067f8f74f3e 100644 --- a/src/cargo/core/resolver/dep_cache.rs +++ b/src/cargo/core/resolver/dep_cache.rs @@ -24,6 +24,7 @@ use anyhow::Context as _; use log::debug; use std::collections::{BTreeSet, HashMap, HashSet}; use std::rc::Rc; +use std::task::Poll; pub struct RegistryQueryer<'a> { pub registry: &'a mut (dyn Registry + 'a), @@ -34,11 +35,11 @@ pub struct RegistryQueryer<'a> { /// specify minimum dependency versions to be used. minimal_versions: bool, /// a cache of `Candidate`s that fulfil a `Dependency` - registry_cache: HashMap>>, + registry_cache: HashMap>>>, /// a cache of `Dependency`s that are required for a `Summary` summary_cache: HashMap< (Option, Summary, ResolveOpts), - Rc<(HashSet, Rc>)>, + (Rc<(HashSet, Rc>)>, bool), >, /// all the cases we ended up using a supplied replacement used_replacements: HashMap, @@ -62,6 +63,23 @@ impl<'a> RegistryQueryer<'a> { } } + pub fn reset_pending(&mut self) -> bool { + let mut all_ready = true; + self.registry_cache.retain(|_, r| { + if !r.is_ready() { + all_ready = false; + } + r.is_ready() + }); + self.summary_cache.retain(|_, (_, r)| { + if !*r { + all_ready = false; + } + *r + }); + all_ready + } + pub fn used_replacement_for(&self, p: PackageId) -> Option<(PackageId, PackageId)> { self.used_replacements.get(&p).map(|r| (p, r.package_id())) } @@ -76,19 +94,23 @@ impl<'a> RegistryQueryer<'a> { /// any candidates are returned which match an override then the override is /// applied by performing a second query for what the override should /// return. - pub fn query(&mut self, dep: &Dependency) -> CargoResult>> { + pub fn query(&mut self, dep: &Dependency) -> Poll>>> { if let Some(out) = self.registry_cache.get(dep).cloned() { - return Ok(out); + return out.map(Result::Ok); } let mut ret = Vec::new(); - self.registry.query( + let ready = self.registry.query( dep, &mut |s| { ret.push(s); }, false, )?; + if ready.is_pending() { + self.registry_cache.insert(dep.clone(), Poll::Pending); + return Poll::Pending; + } for summary in ret.iter_mut() { let mut potential_matches = self .replacements @@ -105,7 +127,13 @@ impl<'a> RegistryQueryer<'a> { dep.version_req() ); - let mut summaries = self.registry.query_vec(dep, false)?.into_iter(); + let mut summaries = match self.registry.query_vec(dep, false)? { + Poll::Ready(s) => s.into_iter(), + Poll::Pending => { + self.registry_cache.insert(dep.clone(), Poll::Pending); + return Poll::Pending; + } + }; let s = summaries.next().ok_or_else(|| { anyhow::format_err!( "no matching package for override `{}` found\n\ @@ -122,13 +150,13 @@ impl<'a> RegistryQueryer<'a> { .iter() .map(|s| format!(" * {}", s.package_id())) .collect::>(); - anyhow::bail!( + return Poll::Ready(Err(anyhow::anyhow!( "the replacement specification `{}` matched \ multiple packages:\n * {}\n{}", spec, s.package_id(), bullets.join("\n") - ); + ))); } // The dependency should be hard-coded to have the same name and an @@ -147,13 +175,13 @@ impl<'a> RegistryQueryer<'a> { // Make sure no duplicates if let Some(&(ref spec, _)) = potential_matches.next() { - anyhow::bail!( + return Poll::Ready(Err(anyhow::anyhow!( "overlapping replacement specifications found:\n\n \ * {}\n * {}\n\nboth specifications match: {}", matched_spec, spec, summary.package_id() - ); + ))); } for dep in summary.dependencies() { @@ -175,11 +203,11 @@ impl<'a> RegistryQueryer<'a> { }, ); - let out = Rc::new(ret); + let out = Poll::Ready(Rc::new(ret)); self.registry_cache.insert(dep.clone(), out.clone()); - Ok(out) + out.map(Result::Ok) } /// Find out what dependencies will be added by activating `candidate`, @@ -198,9 +226,8 @@ impl<'a> RegistryQueryer<'a> { if let Some(out) = self .summary_cache .get(&(parent, candidate.clone(), opts.clone())) - .cloned() { - return Ok(out); + return Ok(out.0.clone()); } // First, figure out our set of dependencies based on the requested set // of features. This also calculates what features we're going to enable @@ -209,17 +236,24 @@ impl<'a> RegistryQueryer<'a> { // Next, transform all dependencies into a list of possible candidates // which can satisfy that dependency. + let mut all_ready = true; let mut deps = deps .into_iter() - .map(|(dep, features)| { - let candidates = self.query(&dep).with_context(|| { + .filter_map(|(dep, features)| match self.query(&dep) { + Poll::Ready(Ok(candidates)) => Some(Ok((dep, candidates, features))), + Poll::Pending => { + all_ready = false; + // we can ignore Pending deps, resolve will be repeatedly called + // until there are none to ignore + None + } + Poll::Ready(Err(e)) => Some(Err(e).with_context(|| { format!( "failed to get `{}` as a dependency of {}", dep.package_name(), describe_path_in_context(cx, &candidate.package_id()), ) - })?; - Ok((dep, candidates, features)) + })), }) .collect::>>()?; @@ -233,8 +267,10 @@ impl<'a> RegistryQueryer<'a> { // If we succeed we add the result to the cache so we can use it again next time. // We don't cache the failure cases as they don't impl Clone. - self.summary_cache - .insert((parent, candidate.clone(), opts.clone()), out.clone()); + self.summary_cache.insert( + (parent, candidate.clone(), opts.clone()), + (out.clone(), all_ready), + ); Ok(out) } diff --git a/src/cargo/core/resolver/errors.rs b/src/cargo/core/resolver/errors.rs index 5cabd01bae4..89b98a2e8d4 100644 --- a/src/cargo/core/resolver/errors.rs +++ b/src/cargo/core/resolver/errors.rs @@ -1,4 +1,5 @@ use std::fmt; +use std::task::Poll; use crate::core::{Dependency, PackageId, Registry, Summary}; use crate::util::lev_distance::lev_distance; @@ -212,15 +213,23 @@ pub(super) fn activation_error( // give an error message that nothing was found. // // Maybe the user mistyped the ver_req? Like `dep="2"` when `dep="0.2"` - // was meant. So we re-query the registry with `deb="*"` so we can + // was meant. So we re-query the registry with `dep="*"` so we can // list a few versions that were actually found. let all_req = semver::VersionReq::parse("*").unwrap(); let mut new_dep = dep.clone(); new_dep.set_version_req(all_req); - let mut candidates = match registry.query_vec(&new_dep, false) { - Ok(candidates) => candidates, - Err(e) => return to_resolve_err(e), + + let mut candidates = loop { + match registry.query_vec(&new_dep, false) { + Poll::Ready(Ok(candidates)) => break candidates, + Poll::Ready(Err(e)) => return to_resolve_err(e), + Poll::Pending => match registry.block_until_ready() { + Ok(()) => continue, + Err(e) => return to_resolve_err(e), + }, + } }; + candidates.sort_unstable_by(|a, b| b.version().cmp(a.version())); let mut msg = @@ -269,10 +278,17 @@ pub(super) fn activation_error( } else { // Maybe the user mistyped the name? Like `dep-thing` when `Dep_Thing` // was meant. So we try asking the registry for a `fuzzy` search for suggestions. - let mut candidates = Vec::new(); - if let Err(e) = registry.query(&new_dep, &mut |s| candidates.push(s), true) { - return to_resolve_err(e); + let mut candidates = loop { + match registry.query_vec(&new_dep, true) { + Poll::Ready(Ok(candidates)) => break candidates, + Poll::Ready(Err(e)) => return to_resolve_err(e), + Poll::Pending => match registry.block_until_ready() { + Ok(()) => continue, + Err(e) => return to_resolve_err(e), + }, + } }; + candidates.sort_unstable_by_key(|a| a.name()); candidates.dedup_by(|a, b| a.name() == b.name()); let mut candidates: Vec<_> = candidates diff --git a/src/cargo/core/resolver/mod.rs b/src/cargo/core/resolver/mod.rs index 28b328132d2..000ded24652 100644 --- a/src/cargo/core/resolver/mod.rs +++ b/src/cargo/core/resolver/mod.rs @@ -58,6 +58,7 @@ use crate::core::PackageIdSpec; use crate::core::{Dependency, PackageId, Registry, Summary}; use crate::util::config::Config; use crate::util::errors::CargoResult; +use crate::util::network::PollExt; use crate::util::profile; use self::context::Context; @@ -127,7 +128,6 @@ pub fn resolve( config: Option<&Config>, check_public_visible_dependencies: bool, ) -> CargoResult { - let cx = Context::new(check_public_visible_dependencies); let _p = profile::start("resolving"); let minimal_versions = match config { Some(config) => config.cli_unstable().minimal_versions, @@ -135,7 +135,15 @@ pub fn resolve( }; let mut registry = RegistryQueryer::new(registry, replacements, version_prefs, minimal_versions); - let cx = activate_deps_loop(cx, &mut registry, summaries, config)?; + let cx = loop { + let cx = Context::new(check_public_visible_dependencies); + let cx = activate_deps_loop(cx, &mut registry, summaries, config)?; + if registry.reset_pending() { + break cx; + } else { + registry.registry.block_until_ready()?; + } + }; let mut cksums = HashMap::new(); for (summary, _) in cx.activations.values() { @@ -854,6 +862,7 @@ fn generalize_conflicting( if let Some(others) = registry .query(critical_parents_dep) .expect("an already used dep now error!?") + .expect("an already used dep now pending!?") .iter() .rev() // the last one to be tried is the least likely to be in the cache, so start with that. .map(|other| { diff --git a/src/cargo/core/source/mod.rs b/src/cargo/core/source/mod.rs index 81009ea6529..3f31d9e6dab 100644 --- a/src/cargo/core/source/mod.rs +++ b/src/cargo/core/source/mod.rs @@ -1,5 +1,6 @@ use std::collections::hash_map::HashMap; use std::fmt; +use std::task::Poll; use crate::core::package::PackageSet; use crate::core::{Dependency, Package, PackageId, Summary}; @@ -28,23 +29,25 @@ pub trait Source { fn requires_precise(&self) -> bool; /// Attempts to find the packages that match a dependency request. - fn query(&mut self, dep: &Dependency, f: &mut dyn FnMut(Summary)) -> CargoResult<()>; + fn query(&mut self, dep: &Dependency, f: &mut dyn FnMut(Summary)) -> Poll>; /// Attempts to find the packages that are close to a dependency request. /// Each source gets to define what `close` means for it. /// Path/Git sources may return all dependencies that are at that URI, /// whereas an `Index` source may return dependencies that have the same canonicalization. - fn fuzzy_query(&mut self, dep: &Dependency, f: &mut dyn FnMut(Summary)) -> CargoResult<()>; + fn fuzzy_query( + &mut self, + dep: &Dependency, + f: &mut dyn FnMut(Summary), + ) -> Poll>; - fn query_vec(&mut self, dep: &Dependency) -> CargoResult> { + fn query_vec(&mut self, dep: &Dependency) -> Poll>> { let mut ret = Vec::new(); - self.query(dep, &mut |s| ret.push(s))?; - Ok(ret) + self.query(dep, &mut |s| ret.push(s)).map_ok(|_| ret) } - /// Performs any network operations required to get the entire list of all names, - /// versions and dependencies of packages managed by the `Source`. - fn update(&mut self) -> CargoResult<()>; + /// Ensure that the source is fully up-to-date for the current session on the next query. + fn invalidate_cache(&mut self); /// Fetches the full package for each name and version specified. fn download(&mut self, package: PackageId) -> CargoResult; @@ -101,6 +104,15 @@ pub trait Source { /// Query if a package is yanked. Only registry sources can mark packages /// as yanked. This ignores the yanked whitelist. fn is_yanked(&mut self, _pkg: PackageId) -> CargoResult; + + /// Block until all outstanding Poll::Pending requests are `Poll::Ready`. + /// + /// After calling this function, the source should return `Poll::Ready` for + /// any queries that previously returned `Poll::Pending`. + /// + /// If no queries previously returned `Poll::Pending`, and `invalidate_cache` + /// was not called, this function should be a no-op. + fn block_until_ready(&mut self) -> CargoResult<()>; } pub enum MaybePackage { @@ -130,18 +142,21 @@ impl<'a, T: Source + ?Sized + 'a> Source for Box { } /// Forwards to `Source::query`. - fn query(&mut self, dep: &Dependency, f: &mut dyn FnMut(Summary)) -> CargoResult<()> { + fn query(&mut self, dep: &Dependency, f: &mut dyn FnMut(Summary)) -> Poll> { (**self).query(dep, f) } /// Forwards to `Source::query`. - fn fuzzy_query(&mut self, dep: &Dependency, f: &mut dyn FnMut(Summary)) -> CargoResult<()> { + fn fuzzy_query( + &mut self, + dep: &Dependency, + f: &mut dyn FnMut(Summary), + ) -> Poll> { (**self).fuzzy_query(dep, f) } - /// Forwards to `Source::update`. - fn update(&mut self) -> CargoResult<()> { - (**self).update() + fn invalidate_cache(&mut self) { + (**self).invalidate_cache() } /// Forwards to `Source::download`. @@ -178,6 +193,10 @@ impl<'a, T: Source + ?Sized + 'a> Source for Box { fn is_yanked(&mut self, pkg: PackageId) -> CargoResult { (**self).is_yanked(pkg) } + + fn block_until_ready(&mut self) -> CargoResult<()> { + (**self).block_until_ready() + } } impl<'a, T: Source + ?Sized + 'a> Source for &'a mut T { @@ -197,16 +216,20 @@ impl<'a, T: Source + ?Sized + 'a> Source for &'a mut T { (**self).requires_precise() } - fn query(&mut self, dep: &Dependency, f: &mut dyn FnMut(Summary)) -> CargoResult<()> { + fn query(&mut self, dep: &Dependency, f: &mut dyn FnMut(Summary)) -> Poll> { (**self).query(dep, f) } - fn fuzzy_query(&mut self, dep: &Dependency, f: &mut dyn FnMut(Summary)) -> CargoResult<()> { + fn fuzzy_query( + &mut self, + dep: &Dependency, + f: &mut dyn FnMut(Summary), + ) -> Poll> { (**self).fuzzy_query(dep, f) } - fn update(&mut self) -> CargoResult<()> { - (**self).update() + fn invalidate_cache(&mut self) { + (**self).invalidate_cache() } fn download(&mut self, id: PackageId) -> CargoResult { @@ -240,6 +263,10 @@ impl<'a, T: Source + ?Sized + 'a> Source for &'a mut T { fn is_yanked(&mut self, pkg: PackageId) -> CargoResult { (**self).is_yanked(pkg) } + + fn block_until_ready(&mut self) -> CargoResult<()> { + (**self).block_until_ready() + } } /// A `HashMap` of `SourceId` -> `Box`. diff --git a/src/cargo/ops/cargo_package.rs b/src/cargo/ops/cargo_package.rs index ed264079bec..ab1a6180cb3 100644 --- a/src/cargo/ops/cargo_package.rs +++ b/src/cargo/ops/cargo_package.rs @@ -9,7 +9,7 @@ use std::sync::Arc; use crate::core::compiler::{BuildConfig, CompileMode, DefaultExecutor, Executor}; use crate::core::resolver::CliFeatures; use crate::core::{Feature, Shell, Verbosity, Workspace}; -use crate::core::{Package, PackageId, PackageSet, Resolve, Source, SourceId}; +use crate::core::{Package, PackageId, PackageSet, Resolve, SourceId}; use crate::sources::PathSource; use crate::util::errors::CargoResult; use crate::util::toml::TomlManifest; diff --git a/src/cargo/ops/common_for_install_and_uninstall.rs b/src/cargo/ops/common_for_install_and_uninstall.rs index 834137715e1..fd456ae90ad 100644 --- a/src/cargo/ops/common_for_install_and_uninstall.rs +++ b/src/cargo/ops/common_for_install_and_uninstall.rs @@ -4,6 +4,7 @@ use std::io::prelude::*; use std::io::SeekFrom; use std::path::{Path, PathBuf}; use std::rc::Rc; +use std::task::Poll; use anyhow::{bail, format_err, Context as _}; use serde::{Deserialize, Serialize}; @@ -535,10 +536,15 @@ where let _lock = config.acquire_package_cache_lock()?; if needs_update { - source.update()?; + source.invalidate_cache(); } - let deps = source.query_vec(&dep)?; + let deps = loop { + match source.query_vec(&dep)? { + Poll::Ready(deps) => break deps, + Poll::Pending => source.block_until_ready()?, + } + }; match deps.iter().map(|p| p.package_id()).max() { Some(pkgid) => { let pkg = Box::new(source).download_now(pkgid, config)?; @@ -585,7 +591,7 @@ where // with other global Cargos let _lock = config.acquire_package_cache_lock()?; - source.update()?; + source.invalidate_cache(); return if let Some(dep) = dep { select_dep_pkg(source, dep, config, false) diff --git a/src/cargo/ops/registry.rs b/src/cargo/ops/registry.rs index 43dfe4dd605..cf29e1a8a31 100644 --- a/src/cargo/ops/registry.rs +++ b/src/cargo/ops/registry.rs @@ -4,6 +4,7 @@ use std::io::{self, BufRead}; use std::iter::repeat; use std::path::PathBuf; use std::str; +use std::task::Poll; use std::time::Duration; use std::{cmp, env}; @@ -431,17 +432,16 @@ fn registry( let _lock = config.acquire_package_cache_lock()?; let mut src = RegistrySource::remote(sid, &HashSet::new(), config); // Only update the index if the config is not available or `force` is set. - let cfg = src.config(); - let mut updated_cfg = || { - src.update() - .with_context(|| format!("failed to update {}", sid))?; - src.config() - }; - - let cfg = if force_update { - updated_cfg()? - } else { - cfg.or_else(|_| updated_cfg())? + if force_update { + src.invalidate_cache() + } + let cfg = loop { + match src.config()? { + Poll::Pending => src + .block_until_ready() + .with_context(|| format!("failed to update {}", sid))?, + Poll::Ready(cfg) => break cfg, + } }; cfg.and_then(|cfg| cfg.api) diff --git a/src/cargo/ops/resolve.rs b/src/cargo/ops/resolve.rs index 3f96e262934..d4c53910162 100644 --- a/src/cargo/ops/resolve.rs +++ b/src/cargo/ops/resolve.rs @@ -20,9 +20,7 @@ use crate::core::resolver::{ }; use crate::core::summary::Summary; use crate::core::Feature; -use crate::core::{ - GitReference, PackageId, PackageIdSpec, PackageSet, Source, SourceId, Workspace, -}; +use crate::core::{GitReference, PackageId, PackageIdSpec, PackageSet, SourceId, Workspace}; use crate::ops; use crate::sources::PathSource; use crate::util::errors::CargoResult; diff --git a/src/cargo/sources/directory.rs b/src/cargo/sources/directory.rs index 7a00b560f87..20eea5fad85 100644 --- a/src/cargo/sources/directory.rs +++ b/src/cargo/sources/directory.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use std::fmt::{self, Debug, Formatter}; use std::path::{Path, PathBuf}; +use std::task::Poll; use crate::core::source::MaybePackage; use crate::core::{Dependency, Package, PackageId, Source, SourceId, Summary}; @@ -17,6 +18,7 @@ pub struct DirectorySource<'cfg> { root: PathBuf, packages: HashMap, config: &'cfg Config, + updated: bool, } #[derive(Deserialize)] @@ -32,6 +34,7 @@ impl<'cfg> DirectorySource<'cfg> { root: path.to_path_buf(), config, packages: HashMap::new(), + updated: false, } } } @@ -43,21 +46,31 @@ impl<'cfg> Debug for DirectorySource<'cfg> { } impl<'cfg> Source for DirectorySource<'cfg> { - fn query(&mut self, dep: &Dependency, f: &mut dyn FnMut(Summary)) -> CargoResult<()> { + fn query(&mut self, dep: &Dependency, f: &mut dyn FnMut(Summary)) -> Poll> { + if !self.updated { + return Poll::Pending; + } let packages = self.packages.values().map(|p| &p.0); let matches = packages.filter(|pkg| dep.matches(pkg.summary())); for summary in matches.map(|pkg| pkg.summary().clone()) { f(summary); } - Ok(()) + Poll::Ready(Ok(())) } - fn fuzzy_query(&mut self, _dep: &Dependency, f: &mut dyn FnMut(Summary)) -> CargoResult<()> { + fn fuzzy_query( + &mut self, + _dep: &Dependency, + f: &mut dyn FnMut(Summary), + ) -> Poll> { + if !self.updated { + return Poll::Pending; + } let packages = self.packages.values().map(|p| &p.0); for summary in packages.map(|pkg| pkg.summary().clone()) { f(summary); } - Ok(()) + Poll::Ready(Ok(())) } fn supports_checksums(&self) -> bool { @@ -72,7 +85,10 @@ impl<'cfg> Source for DirectorySource<'cfg> { self.source_id } - fn update(&mut self) -> CargoResult<()> { + fn block_until_ready(&mut self) -> CargoResult<()> { + if self.updated { + return Ok(()); + } self.packages.clear(); let entries = self.root.read_dir().with_context(|| { format!( @@ -143,6 +159,7 @@ impl<'cfg> Source for DirectorySource<'cfg> { self.packages.insert(pkg.package_id(), (pkg, cksum)); } + self.updated = true; Ok(()) } @@ -205,4 +222,8 @@ impl<'cfg> Source for DirectorySource<'cfg> { fn is_yanked(&mut self, _pkg: PackageId) -> CargoResult { Ok(false) } + + fn invalidate_cache(&mut self) { + // Path source has no local cache. + } } diff --git a/src/cargo/sources/git/source.rs b/src/cargo/sources/git/source.rs index 9d7c42b829f..b166aff340d 100644 --- a/src/cargo/sources/git/source.rs +++ b/src/cargo/sources/git/source.rs @@ -9,6 +9,7 @@ use crate::util::Config; use anyhow::Context; use log::trace; use std::fmt::{self, Debug, Formatter}; +use std::task::Poll; use url::Url; pub struct GitSource<'cfg> { @@ -52,7 +53,8 @@ impl<'cfg> GitSource<'cfg> { pub fn read_packages(&mut self) -> CargoResult> { if self.path_source.is_none() { - self.update()?; + self.invalidate_cache(); + self.block_until_ready()?; } self.path_source.as_mut().unwrap().read_packages() } @@ -83,20 +85,24 @@ impl<'cfg> Debug for GitSource<'cfg> { } impl<'cfg> Source for GitSource<'cfg> { - fn query(&mut self, dep: &Dependency, f: &mut dyn FnMut(Summary)) -> CargoResult<()> { - let src = self - .path_source - .as_mut() - .expect("BUG: `update()` must be called before `query()`"); - src.query(dep, f) + fn query(&mut self, dep: &Dependency, f: &mut dyn FnMut(Summary)) -> Poll> { + if let Some(src) = self.path_source.as_mut() { + src.query(dep, f) + } else { + Poll::Pending + } } - fn fuzzy_query(&mut self, dep: &Dependency, f: &mut dyn FnMut(Summary)) -> CargoResult<()> { - let src = self - .path_source - .as_mut() - .expect("BUG: `update()` must be called before `query()`"); - src.fuzzy_query(dep, f) + fn fuzzy_query( + &mut self, + dep: &Dependency, + f: &mut dyn FnMut(Summary), + ) -> Poll> { + if let Some(src) = self.path_source.as_mut() { + src.fuzzy_query(dep, f) + } else { + Poll::Pending + } } fn supports_checksums(&self) -> bool { @@ -111,7 +117,11 @@ impl<'cfg> Source for GitSource<'cfg> { self.source_id } - fn update(&mut self) -> CargoResult<()> { + fn block_until_ready(&mut self) -> CargoResult<()> { + if self.path_source.is_some() { + return Ok(()); + } + let git_path = self.config.git_path(); let git_path = self.config.assert_package_cache_locked(&git_path); let db_path = git_path.join("db").join(&self.ident); @@ -212,6 +222,8 @@ impl<'cfg> Source for GitSource<'cfg> { fn is_yanked(&mut self, _pkg: PackageId) -> CargoResult { Ok(false) } + + fn invalidate_cache(&mut self) {} } #[cfg(test)] diff --git a/src/cargo/sources/path.rs b/src/cargo/sources/path.rs index cc1c9874179..c0cdd79a9c4 100644 --- a/src/cargo/sources/path.rs +++ b/src/cargo/sources/path.rs @@ -1,6 +1,7 @@ use std::collections::HashSet; use std::fmt::{self, Debug, Formatter}; use std::path::{Path, PathBuf}; +use std::task::Poll; use crate::core::source::MaybePackage; use crate::core::{Dependency, Package, PackageId, Source, SourceId, Summary}; @@ -477,6 +478,16 @@ impl<'cfg> PathSource<'cfg> { pub fn path(&self) -> &Path { &self.path } + + pub fn update(&mut self) -> CargoResult<()> { + if !self.updated { + let packages = self.read_packages()?; + self.packages.extend(packages.into_iter()); + self.updated = true; + } + + Ok(()) + } } impl<'cfg> Debug for PathSource<'cfg> { @@ -486,20 +497,30 @@ impl<'cfg> Debug for PathSource<'cfg> { } impl<'cfg> Source for PathSource<'cfg> { - fn query(&mut self, dep: &Dependency, f: &mut dyn FnMut(Summary)) -> CargoResult<()> { + fn query(&mut self, dep: &Dependency, f: &mut dyn FnMut(Summary)) -> Poll> { + if !self.updated { + return Poll::Pending; + } for s in self.packages.iter().map(|p| p.summary()) { if dep.matches(s) { f(s.clone()) } } - Ok(()) + Poll::Ready(Ok(())) } - fn fuzzy_query(&mut self, _dep: &Dependency, f: &mut dyn FnMut(Summary)) -> CargoResult<()> { + fn fuzzy_query( + &mut self, + _dep: &Dependency, + f: &mut dyn FnMut(Summary), + ) -> Poll> { + if !self.updated { + return Poll::Pending; + } for s in self.packages.iter().map(|p| p.summary()) { f(s.clone()) } - Ok(()) + Poll::Ready(Ok(())) } fn supports_checksums(&self) -> bool { @@ -514,16 +535,6 @@ impl<'cfg> Source for PathSource<'cfg> { self.source_id } - fn update(&mut self) -> CargoResult<()> { - if !self.updated { - let packages = self.read_packages()?; - self.packages.extend(packages.into_iter()); - self.updated = true; - } - - Ok(()) - } - fn download(&mut self, id: PackageId) -> CargoResult { trace!("getting packages; id={}", id); @@ -558,4 +569,12 @@ impl<'cfg> Source for PathSource<'cfg> { fn is_yanked(&mut self, _pkg: PackageId) -> CargoResult { Ok(false) } + + fn block_until_ready(&mut self) -> CargoResult<()> { + self.update() + } + + fn invalidate_cache(&mut self) { + // Path source has no local cache. + } } diff --git a/src/cargo/sources/registry/index.rs b/src/cargo/sources/registry/index.rs index 1394d9c1ab6..dd6819eef16 100644 --- a/src/cargo/sources/registry/index.rs +++ b/src/cargo/sources/registry/index.rs @@ -80,6 +80,7 @@ use std::convert::TryInto; use std::fs; use std::path::Path; use std::str; +use std::task::Poll; /// Crates.io treats hyphen and underscores as interchangeable, but the index and old Cargo do not. /// Therefore, the index must store uncanonicalized version of the name so old Cargo's can find it. @@ -263,16 +264,18 @@ impl<'cfg> RegistryIndex<'cfg> { } /// Returns the hash listed for a specified `PackageId`. - pub fn hash(&mut self, pkg: PackageId, load: &mut dyn RegistryData) -> CargoResult<&str> { + pub fn hash(&mut self, pkg: PackageId, load: &mut dyn RegistryData) -> Poll> { let req = OptVersionReq::exact(pkg.version()); - let summary = self - .summaries(pkg.name(), &req, load)? - .next() - .ok_or_else(|| internal(format!("no hash listed for {}", pkg)))?; - summary + let summary = self.summaries(pkg.name(), &req, load)?; + let summary = match summary { + Poll::Ready(mut summary) => summary.next(), + Poll::Pending => return Poll::Pending, + }; + Poll::Ready(Ok(summary + .ok_or_else(|| internal(format!("no hash listed for {}", pkg)))? .summary .checksum() - .ok_or_else(|| internal(format!("no hash listed for {}", pkg))) + .ok_or_else(|| internal(format!("no hash listed for {}", pkg)))?)) } /// Load a list of summaries for `name` package in this registry which @@ -287,7 +290,7 @@ impl<'cfg> RegistryIndex<'cfg> { name: InternedString, req: &'b OptVersionReq, load: &mut dyn RegistryData, - ) -> CargoResult + 'b> + ) -> Poll + 'b>> where 'a: 'b, { @@ -298,7 +301,10 @@ impl<'cfg> RegistryIndex<'cfg> { // has run previously this will parse a Cargo-specific cache file rather // than the registry itself. In effect this is intended to be a quite // cheap operation. - let summaries = self.load_summaries(name, load)?; + let summaries = match self.load_summaries(name, load)? { + Poll::Ready(summaries) => summaries, + Poll::Pending => return Poll::Pending, + }; // Iterate over our summaries, extract all relevant ones which match our // version requirement, and then parse all corresponding rows in the @@ -307,7 +313,7 @@ impl<'cfg> RegistryIndex<'cfg> { // minimize the amount of work being done here and parse as little as // necessary. let raw_data = &summaries.raw_data; - Ok(summaries + Poll::Ready(Ok(summaries .versions .iter_mut() .filter_map(move |(k, v)| if req.matches(k) { Some(v) } else { None }) @@ -332,25 +338,24 @@ impl<'cfg> RegistryIndex<'cfg> { } else { true } - })) + }))) } fn load_summaries( &mut self, name: InternedString, load: &mut dyn RegistryData, - ) -> CargoResult<&mut Summaries> { + ) -> Poll> { // If we've previously loaded what versions are present for `name`, just // return that since our cache should still be valid. if self.summaries_cache.contains_key(&name) { - return Ok(self.summaries_cache.get_mut(&name).unwrap()); + return Poll::Ready(Ok(self.summaries_cache.get_mut(&name).unwrap())); } // Prepare the `RegistryData` which will lazily initialize internal data // structures. load.prepare()?; - // let root = self.config.assert_package_cache_locked(&self.path); let root = load.assert_index_locked(&self.path); let cache_root = root.join(".cache"); let index_version = load.current_version(); @@ -363,12 +368,13 @@ impl<'cfg> RegistryIndex<'cfg> { .collect::(); let raw_path = make_dep_path(&fs_name, false); + let mut any_pending = false; // Attempt to handle misspellings by searching for a chain of related // names to the original `raw_path` name. Only return summaries // associated with the first hit, however. The resolver will later // reject any candidates that have the wrong name, and with this it'll // along the way produce helpful "did you mean?" suggestions. - for path in UncanonicalizedIter::new(&raw_path).take(1024) { + for (i, path) in UncanonicalizedIter::new(&raw_path).take(1024).enumerate() { let summaries = Summaries::parse( index_version.as_deref(), root, @@ -378,16 +384,35 @@ impl<'cfg> RegistryIndex<'cfg> { load, self.config, )?; - if let Some(summaries) = summaries { + if summaries.is_pending() { + if i == 0 { + // If we have not herd back about the name as requested + // then don't ask about other spellings yet. + // This prevents us spamming all the variations in the + // case where we have the correct spelling. + return Poll::Pending; + } + any_pending = true; + } + if let Poll::Ready(Some(summaries)) = summaries { self.summaries_cache.insert(name, summaries); - return Ok(self.summaries_cache.get_mut(&name).unwrap()); + return Poll::Ready(Ok(self.summaries_cache.get_mut(&name).unwrap())); } } + if any_pending { + return Poll::Pending; + } + // If nothing was found then this crate doesn't exists, so just use an // empty `Summaries` list. self.summaries_cache.insert(name, Summaries::default()); - Ok(self.summaries_cache.get_mut(&name).unwrap()) + Poll::Ready(Ok(self.summaries_cache.get_mut(&name).unwrap())) + } + + /// Clears the in-memory summmaries cache. + pub fn clear_summaries_cache(&mut self) { + self.summaries_cache.clear(); } pub fn query_inner( @@ -396,11 +421,12 @@ impl<'cfg> RegistryIndex<'cfg> { load: &mut dyn RegistryData, yanked_whitelist: &HashSet, f: &mut dyn FnMut(Summary), - ) -> CargoResult<()> { + ) -> Poll> { if self.config.offline() - && self.query_inner_with_online(dep, load, yanked_whitelist, f, false)? != 0 + && self.query_inner_with_online(dep, load, yanked_whitelist, f, false)? + != Poll::Ready(0) { - return Ok(()); + return Poll::Ready(Ok(())); // If offline, and there are no matches, try again with online. // This is necessary for dependencies that are not used (such as // target-cfg or optional), but are not downloaded. Normally the @@ -410,8 +436,8 @@ impl<'cfg> RegistryIndex<'cfg> { // indicating that the required dependency is unavailable while // offline will be displayed. } - self.query_inner_with_online(dep, load, yanked_whitelist, f, true)?; - Ok(()) + self.query_inner_with_online(dep, load, yanked_whitelist, f, true) + .map_ok(|_| ()) } fn query_inner_with_online( @@ -421,10 +447,15 @@ impl<'cfg> RegistryIndex<'cfg> { yanked_whitelist: &HashSet, f: &mut dyn FnMut(Summary), online: bool, - ) -> CargoResult { + ) -> Poll> { let source_id = self.source_id; - let summaries = self - .summaries(dep.package_name(), dep.version_req(), load)? + + let summaries = match self.summaries(dep.package_name(), dep.version_req(), load)? { + Poll::Ready(summaries) => summaries, + Poll::Pending => return Poll::Pending, + }; + + let summaries = summaries // First filter summaries for `--offline`. If we're online then // everything is a candidate, otherwise if we're offline we're only // going to consider candidates which are actually present on disk. @@ -489,15 +520,19 @@ impl<'cfg> RegistryIndex<'cfg> { f(summary); count += 1; } - Ok(count) + Poll::Ready(Ok(count)) } - pub fn is_yanked(&mut self, pkg: PackageId, load: &mut dyn RegistryData) -> CargoResult { + pub fn is_yanked( + &mut self, + pkg: PackageId, + load: &mut dyn RegistryData, + ) -> Poll> { let req = OptVersionReq::exact(pkg.version()); let found = self - .summaries(pkg.name(), &req, load)? - .any(|summary| summary.yanked); - Ok(found) + .summaries(pkg.name(), &req, load) + .map_ok(|mut p| p.any(|summary| summary.yanked)); + found } } @@ -531,7 +566,7 @@ impl Summaries { source_id: SourceId, load: &mut dyn RegistryData, config: &Config, - ) -> CargoResult> { + ) -> Poll>> { // First up, attempt to load the cache. This could fail for all manner // of reasons, but consider all of them non-fatal and just log their // occurrence in case anyone is debugging anything. @@ -545,7 +580,7 @@ impl Summaries { if cfg!(debug_assertions) { cache_contents = Some(s.raw_data); } else { - return Ok(Some(s)); + return Poll::Ready(Ok(Some(s))); } } Err(e) => { @@ -556,14 +591,14 @@ impl Summaries { } } - // This is the fallback path where we actually talk to libgit2 to load + // This is the fallback path where we actually talk to the registry backend to load // information. Here we parse every single line in the index (as we need // to find the versions) log::debug!("slow path for {:?}", relative); let mut ret = Summaries::default(); let mut hit_closure = false; let mut cache_bytes = None; - let err = load.load(root, relative, &mut |contents| { + let result = load.load(root, relative, &mut |contents| { ret.raw_data = contents.to_vec(); let mut cache = SummariesCache::default(); hit_closure = true; @@ -598,14 +633,15 @@ impl Summaries { Ok(()) }); - // We ignore lookup failures as those are just crates which don't exist - // or we haven't updated the registry yet. If we actually ran the - // closure though then we care about those errors. + if result?.is_pending() { + assert!(!hit_closure); + return Poll::Pending; + } + if !hit_closure { debug_assert!(cache_contents.is_none()); - return Ok(None); + return Poll::Ready(Ok(None)); } - err?; // If we've got debug assertions enabled and the cache was previously // present and considered fresh this is where the debug assertions @@ -636,7 +672,7 @@ impl Summaries { } } - Ok(Some(ret)) + Poll::Ready(Ok(Some(ret))) } /// Parses an open `File` which represents information previously cached by diff --git a/src/cargo/sources/registry/local.rs b/src/cargo/sources/registry/local.rs index cccc553ee9e..4f813a7b412 100644 --- a/src/cargo/sources/registry/local.rs +++ b/src/cargo/sources/registry/local.rs @@ -8,6 +8,7 @@ use std::fs::File; use std::io::prelude::*; use std::io::SeekFrom; use std::path::Path; +use std::task::Poll; /// A local registry is a registry that lives on the filesystem as a set of /// `.crate` files with an `index` directory in the same format as a remote @@ -17,6 +18,7 @@ pub struct LocalRegistry<'cfg> { root: Filesystem, src_path: Filesystem, config: &'cfg Config, + updated: bool, } impl<'cfg> LocalRegistry<'cfg> { @@ -26,6 +28,7 @@ impl<'cfg> LocalRegistry<'cfg> { index_path: Filesystem::new(root.join("index")), root: Filesystem::new(root.to_path_buf()), config, + updated: false, } } } @@ -54,34 +57,50 @@ impl<'cfg> RegistryData for LocalRegistry<'cfg> { root: &Path, path: &Path, data: &mut dyn FnMut(&[u8]) -> CargoResult<()>, - ) -> CargoResult<()> { - data(&paths::read_bytes(&root.join(path))?) + ) -> Poll> { + if self.updated { + Poll::Ready(Ok(data(&paths::read_bytes(&root.join(path))?)?)) + } else { + Poll::Pending + } } - fn config(&mut self) -> CargoResult> { + fn config(&mut self) -> Poll>> { // Local registries don't have configuration for remote APIs or anything // like that - Ok(None) + Poll::Ready(Ok(None)) } - fn update_index(&mut self) -> CargoResult<()> { + fn block_until_ready(&mut self) -> CargoResult<()> { + if self.updated { + return Ok(()); + } // Nothing to update, we just use what's on disk. Verify it actually // exists though. We don't use any locks as we're just checking whether // these directories exist. let root = self.root.clone().into_path_unlocked(); if !root.is_dir() { - anyhow::bail!("local registry path is not a directory: {}", root.display()) + anyhow::bail!("local registry path is not a directory: {}", root.display()); } let index_path = self.index_path.clone().into_path_unlocked(); if !index_path.is_dir() { anyhow::bail!( "local registry index path is not a directory: {}", index_path.display() - ) + ); } + self.updated = true; Ok(()) } + fn invalidate_cache(&mut self) { + // Local registry has no cache - just reads from disk. + } + + fn is_updated(&self) -> bool { + self.updated + } + fn download(&mut self, pkg: PackageId, checksum: &str) -> CargoResult { let crate_file = format!("{}-{}.crate", pkg.name(), pkg.version()); diff --git a/src/cargo/sources/registry/mod.rs b/src/cargo/sources/registry/mod.rs index d9df11bbfd2..902c9349649 100644 --- a/src/cargo/sources/registry/mod.rs +++ b/src/cargo/sources/registry/mod.rs @@ -164,6 +164,7 @@ use std::collections::HashSet; use std::fs::{File, OpenOptions}; use std::io::Write; use std::path::{Path, PathBuf}; +use std::task::Poll; use anyhow::Context as _; use flate2::read::GzDecoder; @@ -179,6 +180,7 @@ use crate::sources::PathSource; use crate::util::hex; use crate::util::interning::InternedString; use crate::util::into_url::IntoUrl; +use crate::util::network::PollExt; use crate::util::{restricted_names, CargoResult, Config, Filesystem, OptVersionReq}; const PACKAGE_SOURCE_LOCK: &str = ".cargo-ok"; @@ -203,15 +205,6 @@ pub struct RegistrySource<'cfg> { src_path: Filesystem, /// Local reference to [`Config`] for convenience. config: &'cfg Config, - /// Whether or not the index has been updated. - /// - /// This is used as an optimization to avoid updating if not needed, such - /// as `Cargo.lock` already exists and the index already contains the - /// locked entries. Or, to avoid updating multiple times. - /// - /// Only remote registries really need to update. Local registries only - /// check that the index exists. - updated: bool, /// Abstraction for interfacing to the different registry kinds. ops: Box, /// Interface for managing the on-disk index. @@ -440,23 +433,25 @@ pub trait RegistryData { /// * `root` is the root path to the index. /// * `path` is the relative path to the package to load (like `ca/rg/cargo`). /// * `data` is a callback that will receive the raw bytes of the index JSON file. + /// + /// If `load` returns a `Poll::Pending` then it must not have called data. fn load( &self, root: &Path, path: &Path, data: &mut dyn FnMut(&[u8]) -> CargoResult<()>, - ) -> CargoResult<()>; + ) -> Poll>; /// Loads the `config.json` file and returns it. /// /// Local registries don't have a config, and return `None`. - fn config(&mut self) -> CargoResult>; + fn config(&mut self) -> Poll>>; - /// Updates the index. - /// - /// For a remote registry, this updates the index over the network. Local - /// registries only check that the index exists. - fn update_index(&mut self) -> CargoResult<()>; + /// Invalidates locally cached data. + fn invalidate_cache(&mut self); + + /// Is the local cached data up-to-date? + fn is_updated(&self) -> bool; /// Prepare to start downloading a `.crate` file. /// @@ -508,6 +503,9 @@ pub trait RegistryData { /// /// This is used by index caching to check if the cache is out of date. fn current_version(&self) -> Option; + + /// Block until all outstanding Poll::Pending requests are Poll::Ready. + fn block_until_ready(&mut self) -> CargoResult<()>; } /// The status of [`RegistryData::download`] which indicates if a `.crate` @@ -566,7 +564,6 @@ impl<'cfg> RegistrySource<'cfg> { src_path: config.registry_source_path().join(name), config, source_id, - updated: false, index: index::RegistryIndex::new(source_id, ops.index_path(), config), yanked_whitelist: yanked_whitelist.clone(), ops, @@ -576,7 +573,7 @@ impl<'cfg> RegistrySource<'cfg> { /// Decode the configuration stored within the registry. /// /// This requires that the index has been at least checked out. - pub fn config(&mut self) -> CargoResult> { + pub fn config(&mut self) -> Poll>> { self.ops.config() } @@ -653,14 +650,6 @@ impl<'cfg> RegistrySource<'cfg> { Ok(unpack_dir.to_path_buf()) } - fn do_update(&mut self) -> CargoResult<()> { - self.ops.update_index()?; - let path = self.ops.index_path(); - self.index = index::RegistryIndex::new(self.source_id, path, self.config); - self.updated = true; - Ok(()) - } - fn get_pkg(&mut self, package: PackageId, path: &File) -> CargoResult { let path = self .unpack_package(package, path) @@ -678,6 +667,7 @@ impl<'cfg> RegistrySource<'cfg> { let summary_with_cksum = self .index .summaries(package.name(), &req, &mut *self.ops)? + .expect("a downloaded dep now pending!?") .map(|s| s.summary.clone()) .next() .expect("summary not found"); @@ -692,26 +682,31 @@ impl<'cfg> RegistrySource<'cfg> { } impl<'cfg> Source for RegistrySource<'cfg> { - fn query(&mut self, dep: &Dependency, f: &mut dyn FnMut(Summary)) -> CargoResult<()> { + fn query(&mut self, dep: &Dependency, f: &mut dyn FnMut(Summary)) -> Poll> { // If this is a precise dependency, then it came from a lock file and in // theory the registry is known to contain this version. If, however, we // come back with no summaries, then our registry may need to be // updated, so we fall back to performing a lazy update. - if dep.source_id().precise().is_some() && !self.updated { + if dep.source_id().precise().is_some() && !self.ops.is_updated() { debug!("attempting query without update"); let mut called = false; - self.index - .query_inner(dep, &mut *self.ops, &self.yanked_whitelist, &mut |s| { - if dep.matches(&s) { - called = true; - f(s); - } - })?; + let pend = + self.index + .query_inner(dep, &mut *self.ops, &self.yanked_whitelist, &mut |s| { + if dep.matches(&s) { + called = true; + f(s); + } + })?; + if pend.is_pending() { + return Poll::Pending; + } if called { - return Ok(()); + return Poll::Ready(Ok(())); } else { debug!("falling back to an update"); - self.do_update()?; + self.invalidate_cache(); + return Poll::Pending; } } @@ -723,7 +718,11 @@ impl<'cfg> Source for RegistrySource<'cfg> { }) } - fn fuzzy_query(&mut self, dep: &Dependency, f: &mut dyn FnMut(Summary)) -> CargoResult<()> { + fn fuzzy_query( + &mut self, + dep: &Dependency, + f: &mut dyn FnMut(Summary), + ) -> Poll> { self.index .query_inner(dep, &mut *self.ops, &self.yanked_whitelist, f) } @@ -740,24 +739,16 @@ impl<'cfg> Source for RegistrySource<'cfg> { self.source_id } - fn update(&mut self) -> CargoResult<()> { - // If we have an imprecise version then we don't know what we're going - // to look for, so we always attempt to perform an update here. - // - // If we have a precise version, then we'll update lazily during the - // querying phase. Note that precise in this case is only - // `Some("locked")` as other `Some` values indicate a `cargo update - // --precise` request - if self.source_id.precise() != Some("locked") { - self.do_update()?; - } else { - debug!("skipping update due to locked registry"); - } - Ok(()) + fn invalidate_cache(&mut self) { + self.index.clear_summaries_cache(); + self.ops.invalidate_cache(); } fn download(&mut self, package: PackageId) -> CargoResult { - let hash = self.index.hash(package, &mut *self.ops)?; + let hash = self + .index + .hash(package, &mut *self.ops)? + .expect("we got to downloading a dep while pending!?"); match self.ops.download(package, hash)? { MaybeLock::Ready(file) => self.get_pkg(package, &file).map(MaybePackage::Ready), MaybeLock::Download { url, descriptor } => { @@ -767,7 +758,10 @@ impl<'cfg> Source for RegistrySource<'cfg> { } fn finish_download(&mut self, package: PackageId, data: Vec) -> CargoResult { - let hash = self.index.hash(package, &mut *self.ops)?; + let hash = self + .index + .hash(package, &mut *self.ops)? + .expect("we got to downloading a dep while pending!?"); let file = self.ops.finish_download(package, hash, &data)?; self.get_pkg(package, &file) } @@ -785,9 +779,16 @@ impl<'cfg> Source for RegistrySource<'cfg> { } fn is_yanked(&mut self, pkg: PackageId) -> CargoResult { - if !self.updated { - self.do_update()?; + self.invalidate_cache(); + loop { + match self.index.is_yanked(pkg, &mut *self.ops)? { + Poll::Ready(yanked) => return Ok(yanked), + Poll::Pending => self.block_until_ready()?, + } } - self.index.is_yanked(pkg, &mut *self.ops) + } + + fn block_until_ready(&mut self) -> CargoResult<()> { + self.ops.block_until_ready() } } diff --git a/src/cargo/sources/registry/remote.rs b/src/cargo/sources/registry/remote.rs index f3bc0edb53a..190b4695623 100644 --- a/src/cargo/sources/registry/remote.rs +++ b/src/cargo/sources/registry/remote.rs @@ -20,6 +20,7 @@ use std::io::SeekFrom; use std::mem; use std::path::Path; use std::str; +use std::task::Poll; /// A remote registry is a registry that lives at a remote URL (such as /// crates.io). The git index is cloned locally, and `.crate` files are @@ -35,6 +36,8 @@ pub struct RemoteRegistry<'cfg> { repo: LazyCell, head: Cell>, current_sha: Cell>, + needs_update: Cell, // Does this registry need to be updated? + updated: bool, // Has this registry been updated this session? } impl<'cfg> RemoteRegistry<'cfg> { @@ -50,6 +53,8 @@ impl<'cfg> RemoteRegistry<'cfg> { repo: LazyCell::new(), head: Cell::new(None), current_sha: Cell::new(None), + needs_update: Cell::new(false), + updated: false, } } @@ -168,35 +173,76 @@ impl<'cfg> RegistryData for RemoteRegistry<'cfg> { _root: &Path, path: &Path, data: &mut dyn FnMut(&[u8]) -> CargoResult<()>, - ) -> CargoResult<()> { + ) -> Poll> { + if self.needs_update.get() { + return Poll::Pending; + } + // Note that the index calls this method and the filesystem is locked // in the index, so we don't need to worry about an `update_index` // happening in a different process. - let repo = self.repo()?; - let tree = self.tree()?; - let entry = tree.get_path(path)?; - let object = entry.to_object(repo)?; - let blob = match object.as_blob() { - Some(blob) => blob, - None => anyhow::bail!("path `{}` is not a blob in the git repo", path.display()), - }; - data(blob.content()) + fn load_helper( + registry: &RemoteRegistry<'_>, + path: &Path, + data: &mut dyn FnMut(&[u8]) -> CargoResult<()>, + ) -> CargoResult> { + let repo = registry.repo()?; + let tree = registry.tree()?; + let entry = tree.get_path(path); + let entry = entry?; + let object = entry.to_object(repo)?; + let blob = match object.as_blob() { + Some(blob) => blob, + None => anyhow::bail!("path `{}` is not a blob in the git repo", path.display()), + }; + Ok(data(blob.content())) + } + + match load_helper(&self, path, data) { + Ok(result) => Poll::Ready(result), + Err(_) if !self.updated => { + // If git returns an error and we haven't updated the repo, return + // pending to allow an update to try again. + self.needs_update.set(true); + Poll::Pending + } + Err(e) + if e.downcast_ref::() + .map(|e| e.code() == git2::ErrorCode::NotFound) + .unwrap_or_default() => + { + // The repo has been updated and the file does not exist. + Poll::Ready(Ok(())) + } + Err(e) => Poll::Ready(Err(e)), + } } - fn config(&mut self) -> CargoResult> { + fn config(&mut self) -> Poll>> { debug!("loading config"); self.prepare()?; self.config.assert_package_cache_locked(&self.index_path); let mut config = None; - self.load(Path::new(""), Path::new("config.json"), &mut |json| { + match self.load(Path::new(""), Path::new("config.json"), &mut |json| { config = Some(serde_json::from_slice(json)?); Ok(()) - })?; - trace!("config loaded"); - Ok(config) + })? { + Poll::Ready(()) => { + trace!("config loaded"); + Poll::Ready(Ok(config)) + } + Poll::Pending => Poll::Pending, + } } - fn update_index(&mut self) -> CargoResult<()> { + fn block_until_ready(&mut self) -> CargoResult<()> { + if !self.needs_update.get() { + return Ok(()); + } + + self.updated = true; + self.needs_update.set(false); + if self.config.offline() { return Ok(()); } @@ -244,6 +290,16 @@ impl<'cfg> RegistryData for RemoteRegistry<'cfg> { Ok(()) } + fn invalidate_cache(&mut self) { + if !self.updated { + self.needs_update.set(true); + } + } + + fn is_updated(&self) -> bool { + self.updated + } + fn download(&mut self, pkg: PackageId, checksum: &str) -> CargoResult { let filename = self.filename(pkg); @@ -262,7 +318,13 @@ impl<'cfg> RegistryData for RemoteRegistry<'cfg> { } } - let config = self.config()?.unwrap(); + let config = loop { + match self.config()? { + Poll::Pending => self.block_until_ready()?, + Poll::Ready(cfg) => break cfg.unwrap(), + } + }; + let mut url = config.dl; if !url.contains(CRATE_TEMPLATE) && !url.contains(VERSION_TEMPLATE) diff --git a/src/cargo/sources/replaced.rs b/src/cargo/sources/replaced.rs index 468df095cd6..29b0b6dfdc5 100644 --- a/src/cargo/sources/replaced.rs +++ b/src/cargo/sources/replaced.rs @@ -1,6 +1,7 @@ use crate::core::source::MaybePackage; use crate::core::{Dependency, Package, PackageId, Source, SourceId, Summary}; use crate::util::errors::CargoResult; +use std::task::Poll; use anyhow::Context as _; @@ -41,7 +42,7 @@ impl<'cfg> Source for ReplacedSource<'cfg> { self.inner.requires_precise() } - fn query(&mut self, dep: &Dependency, f: &mut dyn FnMut(Summary)) -> CargoResult<()> { + fn query(&mut self, dep: &Dependency, f: &mut dyn FnMut(Summary)) -> Poll> { let (replace_with, to_replace) = (self.replace_with, self.to_replace); let dep = dep.clone().map_source(to_replace, replace_with); @@ -49,11 +50,19 @@ impl<'cfg> Source for ReplacedSource<'cfg> { .query(&dep, &mut |summary| { f(summary.map_source(replace_with, to_replace)) }) - .with_context(|| format!("failed to query replaced source {}", self.to_replace))?; - Ok(()) + .map_err(|e| { + e.context(format!( + "failed to query replaced source {}", + self.to_replace + )) + }) } - fn fuzzy_query(&mut self, dep: &Dependency, f: &mut dyn FnMut(Summary)) -> CargoResult<()> { + fn fuzzy_query( + &mut self, + dep: &Dependency, + f: &mut dyn FnMut(Summary), + ) -> Poll> { let (replace_with, to_replace) = (self.replace_with, self.to_replace); let dep = dep.clone().map_source(to_replace, replace_with); @@ -61,15 +70,16 @@ impl<'cfg> Source for ReplacedSource<'cfg> { .fuzzy_query(&dep, &mut |summary| { f(summary.map_source(replace_with, to_replace)) }) - .with_context(|| format!("failed to query replaced source {}", self.to_replace))?; - Ok(()) + .map_err(|e| { + e.context(format!( + "failed to query replaced source {}", + self.to_replace + )) + }) } - fn update(&mut self) -> CargoResult<()> { - self.inner - .update() - .with_context(|| format!("failed to update replaced source {}", self.to_replace))?; - Ok(()) + fn invalidate_cache(&mut self) { + self.inner.invalidate_cache() } fn download(&mut self, id: PackageId) -> CargoResult { @@ -127,4 +137,10 @@ impl<'cfg> Source for ReplacedSource<'cfg> { fn is_yanked(&mut self, pkg: PackageId) -> CargoResult { self.inner.is_yanked(pkg) } + + fn block_until_ready(&mut self) -> CargoResult<()> { + self.inner + .block_until_ready() + .with_context(|| format!("failed to update replaced source {}", self.to_replace)) + } } diff --git a/src/cargo/util/network.rs b/src/cargo/util/network.rs index 2a590bc1364..07c7ceae1ab 100644 --- a/src/cargo/util/network.rs +++ b/src/cargo/util/network.rs @@ -2,6 +2,21 @@ use anyhow::Error; use crate::util::errors::{CargoResult, HttpNot200}; use crate::util::Config; +use std::task::Poll; + +pub trait PollExt { + fn expect(self, msg: &str) -> T; +} + +impl PollExt for Poll { + #[track_caller] + fn expect(self, msg: &str) -> T { + match self { + Poll::Ready(val) => val, + Poll::Pending => panic!("{}", msg), + } + } +} pub struct Retry<'a> { config: &'a Config, diff --git a/tests/testsuite/search.rs b/tests/testsuite/search.rs index eadc46c8b91..0b5228b97b1 100644 --- a/tests/testsuite/search.rs +++ b/tests/testsuite/search.rs @@ -151,7 +151,8 @@ fn not_update() { ); let lock = cfg.acquire_package_cache_lock().unwrap(); let mut regsrc = RegistrySource::remote(sid, &HashSet::new(), &cfg); - regsrc.update().unwrap(); + regsrc.invalidate_cache(); + regsrc.block_until_ready().unwrap(); drop(lock); cargo_process("search postgres")