Skip to content

Commit 27dc8ad

Browse files
Better manage parallel prime caches
To make best use of available cores, and don't waste time waiting for other tasks. See the comments in the code for explanation.
1 parent d7e977a commit 27dc8ad

File tree

2 files changed

+167
-245
lines changed

2 files changed

+167
-245
lines changed

crates/ide-db/src/prime_caches.rs

+167-141
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,10 @@
22
//! sometimes is counter productive when, for example, the first goto definition
33
//! request takes longer to compute. This module implements prepopulation of
44
//! various caches, it's not really advanced at the moment.
5-
mod topologic_sort;
6-
7-
use std::time::Duration;
5+
use std::panic::AssertUnwindSafe;
86

97
use hir::{Symbol, db::DefDatabase};
10-
use itertools::Itertools;
8+
use rustc_hash::FxHashMap;
119
use salsa::{Cancelled, Database};
1210

1311
use crate::{
@@ -35,58 +33,112 @@ pub fn parallel_prime_caches(
3533
) {
3634
let _p = tracing::info_span!("parallel_prime_caches").entered();
3735

38-
let mut crates_to_prime = {
39-
// FIXME: We already have the crate list topologically sorted (but without the things
40-
// `TopologicalSortIter` gives us). Maybe there is a way to avoid using it and rip it out
41-
// of the codebase?
42-
let mut builder = topologic_sort::TopologicalSortIter::builder();
43-
44-
for &crate_id in db.all_crates().iter() {
45-
builder.add(crate_id, crate_id.data(db).dependencies.iter().map(|d| d.crate_id));
46-
}
47-
48-
builder.build()
49-
};
50-
5136
enum ParallelPrimeCacheWorkerProgress {
52-
BeginCrate { crate_id: Crate, crate_name: Symbol },
53-
EndCrate { crate_id: Crate },
37+
BeginCrateDefMap { crate_id: Crate, crate_name: Symbol },
38+
EndCrateDefMap { crate_id: Crate },
39+
EndCrateImportMap,
40+
EndModuleSymbols,
5441
Cancelled(Cancelled),
5542
}
5643

57-
// We split off def map computation from other work,
58-
// as the def map is the relevant one. Once the defmaps are computed
59-
// the project is ready to go, the other indices are just nice to have for some IDE features.
60-
#[derive(PartialOrd, Ord, PartialEq, Eq, Copy, Clone)]
61-
enum PrimingPhase {
62-
DefMap,
63-
ImportMap,
64-
CrateSymbols,
65-
}
44+
// The setup here is a bit complicated. We try to make best use of compute resources.
45+
// The idea is that if we have a def map available to compute, we should do that first.
46+
// This is because def map is a dependency of both import map and symbols. So if we have
47+
// e.g. a def map and a symbols, if we compute the def map we can, after it completes,
48+
// compute the def maps of dependencies, the existing symbols and the symbols of the
49+
// new crate, all in parallel. But if we compute the symbols, after that we will only
50+
// have the def map to compute, and the rest of the CPU cores will rest, which is not
51+
// good.
52+
// However, it's better to compute symbols/import map than to compute a def map that
53+
// isn't ready yet, because one of its dependencies hasn't yet completed its def map.
54+
// Such def map will just block on the dependency, which is just wasted time. So better
55+
// to compute the symbols/import map of an already computed def map in that time.
56+
57+
let (reverse_deps, mut to_be_done_deps) = {
58+
let all_crates = db.all_crates();
59+
let mut reverse_deps =
60+
all_crates.iter().map(|&krate| (krate, Vec::new())).collect::<FxHashMap<_, _>>();
61+
let mut to_be_done_deps =
62+
all_crates.iter().map(|&krate| (krate, 0u32)).collect::<FxHashMap<_, _>>();
63+
for &krate in &*all_crates {
64+
for dep in &krate.data(db).dependencies {
65+
reverse_deps.get_mut(&dep.crate_id).unwrap().push(krate);
66+
*to_be_done_deps.get_mut(&krate).unwrap() += 1;
67+
}
68+
}
69+
(reverse_deps, to_be_done_deps)
70+
};
6671

67-
let (work_sender, progress_receiver) = {
72+
let (def_map_work_sender, import_map_work_sender, symbols_work_sender, progress_receiver) = {
6873
let (progress_sender, progress_receiver) = crossbeam_channel::unbounded();
69-
let (work_sender, work_receiver) = crossbeam_channel::unbounded();
74+
let (def_map_work_sender, def_map_work_receiver) = crossbeam_channel::unbounded();
75+
let (import_map_work_sender, import_map_work_receiver) = crossbeam_channel::unbounded();
76+
let (symbols_work_sender, symbols_work_receiver) = crossbeam_channel::unbounded();
7077
let prime_caches_worker = move |db: RootDatabase| {
71-
while let Ok((crate_id, crate_name, kind)) = work_receiver.recv() {
72-
progress_sender
73-
.send(ParallelPrimeCacheWorkerProgress::BeginCrate { crate_id, crate_name })?;
74-
75-
let cancelled = Cancelled::catch(|| match kind {
76-
PrimingPhase::DefMap => _ = hir::crate_def_map(&db, crate_id),
77-
PrimingPhase::ImportMap => _ = db.import_map(crate_id),
78-
PrimingPhase::CrateSymbols => _ = db.crate_symbols(crate_id.into()),
79-
});
78+
let handle_def_map = |crate_id, crate_name| {
79+
progress_sender.send(ParallelPrimeCacheWorkerProgress::BeginCrateDefMap {
80+
crate_id,
81+
crate_name,
82+
})?;
83+
84+
let cancelled = Cancelled::catch(|| _ = hir::crate_def_map(&db, crate_id));
8085

8186
match cancelled {
8287
Ok(()) => progress_sender
83-
.send(ParallelPrimeCacheWorkerProgress::EndCrate { crate_id })?,
88+
.send(ParallelPrimeCacheWorkerProgress::EndCrateDefMap { crate_id })?,
89+
Err(cancelled) => progress_sender
90+
.send(ParallelPrimeCacheWorkerProgress::Cancelled(cancelled))?,
91+
}
92+
93+
Ok::<_, crossbeam_channel::SendError<_>>(())
94+
};
95+
let handle_import_map = |crate_id| {
96+
let cancelled = Cancelled::catch(|| _ = db.import_map(crate_id));
97+
98+
match cancelled {
99+
Ok(()) => {
100+
progress_sender.send(ParallelPrimeCacheWorkerProgress::EndCrateImportMap)?
101+
}
84102
Err(cancelled) => progress_sender
85103
.send(ParallelPrimeCacheWorkerProgress::Cancelled(cancelled))?,
86104
}
87-
}
88105

89-
Ok::<_, crossbeam_channel::SendError<_>>(())
106+
Ok::<_, crossbeam_channel::SendError<_>>(())
107+
};
108+
let handle_symbols = |module| {
109+
let cancelled =
110+
Cancelled::catch(AssertUnwindSafe(|| _ = db.module_symbols(module)));
111+
112+
match cancelled {
113+
Ok(()) => {
114+
progress_sender.send(ParallelPrimeCacheWorkerProgress::EndModuleSymbols)?
115+
}
116+
Err(cancelled) => progress_sender
117+
.send(ParallelPrimeCacheWorkerProgress::Cancelled(cancelled))?,
118+
}
119+
120+
Ok::<_, crossbeam_channel::SendError<_>>(())
121+
};
122+
123+
loop {
124+
db.unwind_if_revision_cancelled();
125+
126+
// Biased because we want to prefer def maps.
127+
crossbeam_channel::select_biased! {
128+
recv(def_map_work_receiver) -> work => {
129+
let Ok((crate_id, crate_name)) = work else { return Ok::<_, crossbeam_channel::SendError<_>>(()) };
130+
handle_def_map(crate_id, crate_name)?;
131+
}
132+
recv(import_map_work_receiver) -> work => {
133+
let Ok(crate_id) = work else { return Ok(()) };
134+
handle_import_map(crate_id)?;
135+
}
136+
recv(symbols_work_receiver) -> work => {
137+
let Ok(module) = work else { return Ok(()) };
138+
handle_symbols(module)?;
139+
}
140+
}
141+
}
90142
};
91143

92144
for id in 0..num_worker_threads {
@@ -103,138 +155,112 @@ pub fn parallel_prime_caches(
103155
.expect("failed to spawn thread");
104156
}
105157

106-
(work_sender, progress_receiver)
158+
(def_map_work_sender, import_map_work_sender, symbols_work_sender, progress_receiver)
107159
};
108160

109-
let crates_total = crates_to_prime.pending();
110-
let mut crates_done = 0;
161+
let crate_def_maps_total = db.all_crates().len();
162+
let mut crate_def_maps_done = 0;
163+
let (mut crate_import_maps_total, mut crate_import_maps_done) = (0usize, 0usize);
164+
let (mut module_symbols_total, mut module_symbols_done) = (0usize, 0usize);
111165

112166
// an index map is used to preserve ordering so we can sort the progress report in order of
113167
// "longest crate to index" first
114168
let mut crates_currently_indexing =
115169
FxIndexMap::with_capacity_and_hasher(num_worker_threads, Default::default());
116170

117-
let mut additional_phases = vec![];
118-
119-
while crates_done < crates_total {
120-
db.unwind_if_revision_cancelled();
121-
122-
for krate in &mut crates_to_prime {
123-
let name = krate.extra_data(db).display_name.as_deref().cloned().unwrap_or_else(|| {
124-
Symbol::integer(salsa::plumbing::AsId::as_id(&krate).as_u32() as usize)
125-
});
126-
let origin = &krate.data(db).origin;
127-
if origin.is_lang() {
128-
additional_phases.push((krate, name.clone(), PrimingPhase::ImportMap));
129-
} else if origin.is_local() {
130-
// Compute the symbol search index.
131-
// This primes the cache for `ide_db::symbol_index::world_symbols()`.
132-
//
133-
// We do this for workspace crates only (members of local_roots), because doing it
134-
// for all dependencies could be *very* unnecessarily slow in a large project.
135-
//
136-
// FIXME: We should do it unconditionally if the configuration is set to default to
137-
// searching dependencies (rust-analyzer.workspace.symbol.search.scope), but we
138-
// would need to pipe that configuration information down here.
139-
additional_phases.push((krate, name.clone(), PrimingPhase::CrateSymbols));
140-
}
141-
142-
work_sender.send((krate, name, PrimingPhase::DefMap)).ok();
171+
for (&krate, &to_be_done_deps) in &to_be_done_deps {
172+
if to_be_done_deps != 0 {
173+
continue;
143174
}
144175

145-
// recv_timeout is somewhat a hack, we need a way to from this thread check to see if the current salsa revision
146-
// is cancelled on a regular basis. workers will only exit if they are processing a task that is cancelled, or
147-
// if this thread exits, and closes the work channel.
148-
let worker_progress = match progress_receiver.recv_timeout(Duration::from_millis(10)) {
149-
Ok(p) => p,
150-
Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
151-
continue;
152-
}
153-
Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
154-
// all our workers have exited, mark us as finished and exit
155-
cb(ParallelPrimeCachesProgress {
156-
crates_currently_indexing: vec![],
157-
crates_done,
158-
crates_total: crates_done,
159-
work_type: "Indexing",
160-
});
161-
return;
162-
}
163-
};
164-
match worker_progress {
165-
ParallelPrimeCacheWorkerProgress::BeginCrate { crate_id, crate_name } => {
166-
crates_currently_indexing.insert(crate_id, crate_name);
167-
}
168-
ParallelPrimeCacheWorkerProgress::EndCrate { crate_id } => {
169-
crates_currently_indexing.swap_remove(&crate_id);
170-
crates_to_prime.mark_done(crate_id);
171-
crates_done += 1;
172-
}
173-
ParallelPrimeCacheWorkerProgress::Cancelled(cancelled) => {
174-
// Cancelled::throw should probably be public
175-
std::panic::resume_unwind(Box::new(cancelled));
176-
}
177-
};
176+
let name = crate_name(db, krate);
177+
def_map_work_sender.send((krate, name)).ok();
178+
}
179+
180+
while crate_def_maps_done < crate_def_maps_total
181+
|| crate_import_maps_done < crate_import_maps_total
182+
|| module_symbols_done < module_symbols_total
183+
{
184+
db.unwind_if_revision_cancelled();
178185

179186
let progress = ParallelPrimeCachesProgress {
180187
crates_currently_indexing: crates_currently_indexing.values().cloned().collect(),
181-
crates_done,
182-
crates_total,
188+
crates_done: crate_def_maps_done,
189+
crates_total: crate_def_maps_total,
183190
work_type: "Indexing",
184191
};
185192

186193
cb(progress);
187-
}
188-
189-
let mut crates_done = 0;
190-
let crates_total = additional_phases.len();
191-
for w in additional_phases.into_iter().sorted_by_key(|&(_, _, phase)| phase) {
192-
work_sender.send(w).ok();
193-
}
194194

195-
while crates_done < crates_total {
196-
db.unwind_if_revision_cancelled();
197-
198-
// recv_timeout is somewhat a hack, we need a way to from this thread check to see if the current salsa revision
199-
// is cancelled on a regular basis. workers will only exit if they are processing a task that is cancelled, or
200-
// if this thread exits, and closes the work channel.
201-
let worker_progress = match progress_receiver.recv_timeout(Duration::from_millis(10)) {
195+
// Biased to prefer progress updates (and because it's faster).
196+
let progress = match progress_receiver.recv() {
202197
Ok(p) => p,
203-
Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
204-
continue;
205-
}
206-
Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
198+
Err(crossbeam_channel::RecvError) => {
207199
// all our workers have exited, mark us as finished and exit
208200
cb(ParallelPrimeCachesProgress {
209201
crates_currently_indexing: vec![],
210-
crates_done,
211-
crates_total: crates_done,
212-
work_type: "Populating symbols",
202+
crates_done: crate_def_maps_done,
203+
crates_total: crate_def_maps_done,
204+
work_type: "Indexing",
213205
});
214206
return;
215207
}
216208
};
217-
match worker_progress {
218-
ParallelPrimeCacheWorkerProgress::BeginCrate { crate_id, crate_name } => {
209+
210+
match progress {
211+
ParallelPrimeCacheWorkerProgress::BeginCrateDefMap { crate_id, crate_name } => {
219212
crates_currently_indexing.insert(crate_id, crate_name);
220213
}
221-
ParallelPrimeCacheWorkerProgress::EndCrate { crate_id } => {
214+
ParallelPrimeCacheWorkerProgress::EndCrateDefMap { crate_id } => {
222215
crates_currently_indexing.swap_remove(&crate_id);
223-
crates_done += 1;
216+
crate_def_maps_done += 1;
217+
218+
// Fire ready dependencies.
219+
for &dep in &reverse_deps[&crate_id] {
220+
let to_be_done = to_be_done_deps.get_mut(&dep).unwrap();
221+
*to_be_done -= 1;
222+
if *to_be_done == 0 {
223+
let dep_name = crate_name(db, dep);
224+
def_map_work_sender.send((dep, dep_name)).ok();
225+
}
226+
}
227+
228+
let origin = &crate_id.data(db).origin;
229+
if origin.is_lang() {
230+
crate_import_maps_total += 1;
231+
import_map_work_sender.send(crate_id).ok();
232+
} else if origin.is_local() {
233+
// Compute the symbol search index.
234+
// This primes the cache for `ide_db::symbol_index::world_symbols()`.
235+
//
236+
// We do this for workspace crates only (members of local_roots), because doing it
237+
// for all dependencies could be *very* unnecessarily slow in a large project.
238+
//
239+
// FIXME: We should do it unconditionally if the configuration is set to default to
240+
// searching dependencies (rust-analyzer.workspace.symbol.search.scope), but we
241+
// would need to pipe that configuration information down here.
242+
let modules = hir::Crate::from(crate_id).modules(db);
243+
module_symbols_total += modules.len();
244+
for module in modules {
245+
symbols_work_sender.send(module).ok();
246+
}
247+
}
224248
}
249+
ParallelPrimeCacheWorkerProgress::EndCrateImportMap => crate_import_maps_done += 1,
250+
ParallelPrimeCacheWorkerProgress::EndModuleSymbols => module_symbols_done += 1,
225251
ParallelPrimeCacheWorkerProgress::Cancelled(cancelled) => {
226252
// Cancelled::throw should probably be public
227253
std::panic::resume_unwind(Box::new(cancelled));
228254
}
229-
};
230-
231-
let progress = ParallelPrimeCachesProgress {
232-
crates_currently_indexing: crates_currently_indexing.values().cloned().collect(),
233-
crates_done,
234-
crates_total,
235-
work_type: "Populating symbols",
236-
};
237-
238-
cb(progress);
255+
}
239256
}
240257
}
258+
259+
fn crate_name(db: &RootDatabase, krate: Crate) -> Symbol {
260+
krate
261+
.extra_data(db)
262+
.display_name
263+
.as_deref()
264+
.cloned()
265+
.unwrap_or_else(|| Symbol::integer(salsa::plumbing::AsId::as_id(&krate).as_u32() as usize))
266+
}

0 commit comments

Comments
 (0)