
Commit 8c43442

Merge pull request #19721 from ChayimFriedman2/more-parallel
Better handle parallelism in cache priming
2 parents d30deb5 + f23af92 commit 8c43442

File tree: 4 files changed, +188 −256 lines


crates/ide-db/src/prime_caches.rs

+184 −148
@@ -2,12 +2,10 @@
 //! sometimes is counter productive when, for example, the first goto definition
 //! request takes longer to compute. This module implements prepopulation of
 //! various caches, it's not really advanced at the moment.
-mod topologic_sort;
-
-use std::time::Duration;
+use std::panic::AssertUnwindSafe;

 use hir::{Symbol, db::DefDatabase};
-use itertools::Itertools;
+use rustc_hash::FxHashMap;
 use salsa::{Cancelled, Database};

 use crate::{
@@ -35,59 +33,114 @@ pub fn parallel_prime_caches(
 ) {
     let _p = tracing::info_span!("parallel_prime_caches").entered();

-    let mut crates_to_prime = {
-        // FIXME: We already have the crate list topologically sorted (but without the things
-        // `TopologicalSortIter` gives us). Maybe there is a way to avoid using it and rip it out
-        // of the codebase?
-        let mut builder = topologic_sort::TopologicalSortIter::builder();
-
-        for &crate_id in db.all_crates().iter() {
-            builder.add(crate_id, crate_id.data(db).dependencies.iter().map(|d| d.crate_id));
-        }
-
-        builder.build()
-    };
-
     enum ParallelPrimeCacheWorkerProgress {
-        BeginCrate { crate_id: Crate, crate_name: Symbol },
-        EndCrate { crate_id: Crate },
+        BeginCrateDefMap { crate_id: Crate, crate_name: Symbol },
+        EndCrateDefMap { crate_id: Crate },
+        EndCrateImportMap,
+        EndModuleSymbols,
         Cancelled(Cancelled),
     }

-    // We split off def map computation from other work,
-    // as the def map is the relevant one. Once the defmaps are computed
-    // the project is ready to go, the other indices are just nice to have for some IDE features.
-    #[derive(PartialOrd, Ord, PartialEq, Eq, Copy, Clone)]
-    enum PrimingPhase {
-        DefMap,
-        ImportMap,
-        CrateSymbols,
-    }
+    // The setup here is a bit complicated. We try to make best use of compute resources.
+    // The idea is that if we have a def map available to compute, we should do that first.
+    // This is because def map is a dependency of both import map and symbols. So if we have
+    // e.g. a def map and a symbols, if we compute the def map we can, after it completes,
+    // compute the def maps of dependencies, the existing symbols and the symbols of the
+    // new crate, all in parallel. But if we compute the symbols, after that we will only
+    // have the def map to compute, and the rest of the CPU cores will rest, which is not
+    // good.
+    // However, it's better to compute symbols/import map than to compute a def map that
+    // isn't ready yet, because one of its dependencies hasn't yet completed its def map.
+    // Such def map will just block on the dependency, which is just wasted time. So better
+    // to compute the symbols/import map of an already computed def map in that time.
+
+    let (reverse_deps, mut to_be_done_deps) = {
+        let all_crates = db.all_crates();
+        let to_be_done_deps = all_crates
+            .iter()
+            .map(|&krate| (krate, krate.data(db).dependencies.len() as u32))
+            .collect::<FxHashMap<_, _>>();
+        let mut reverse_deps =
+            all_crates.iter().map(|&krate| (krate, Vec::new())).collect::<FxHashMap<_, _>>();
+        for &krate in &*all_crates {
+            for dep in &krate.data(db).dependencies {
+                reverse_deps.get_mut(&dep.crate_id).unwrap().push(krate);
+            }
+        }
+        (reverse_deps, to_be_done_deps)
+    };

-    let (work_sender, progress_receiver) = {
+    let (def_map_work_sender, import_map_work_sender, symbols_work_sender, progress_receiver) = {
         let (progress_sender, progress_receiver) = crossbeam_channel::unbounded();
-        let (work_sender, work_receiver) = crossbeam_channel::unbounded();
-        let prime_caches_worker = move |db: RootDatabase| {
-            while let Ok((crate_id, crate_name, kind)) = work_receiver.recv() {
-                progress_sender
-                    .send(ParallelPrimeCacheWorkerProgress::BeginCrate { crate_id, crate_name })?;
-
-                let cancelled = Cancelled::catch(|| match kind {
-                    PrimingPhase::DefMap => _ = hir::crate_def_map(&db, crate_id),
-                    PrimingPhase::ImportMap => _ = db.import_map(crate_id),
-                    PrimingPhase::CrateSymbols => _ = db.crate_symbols(crate_id.into()),
-                });
+        let (def_map_work_sender, def_map_work_receiver) = crossbeam_channel::unbounded();
+        let (import_map_work_sender, import_map_work_receiver) = crossbeam_channel::unbounded();
+        let (symbols_work_sender, symbols_work_receiver) = crossbeam_channel::unbounded();
+        let prime_caches_worker =
+            move |db: RootDatabase| {
+                let handle_def_map = |crate_id, crate_name| {
+                    progress_sender.send(ParallelPrimeCacheWorkerProgress::BeginCrateDefMap {
+                        crate_id,
+                        crate_name,
+                    })?;

-                match cancelled {
-                    Ok(()) => progress_sender
-                        .send(ParallelPrimeCacheWorkerProgress::EndCrate { crate_id })?,
-                    Err(cancelled) => progress_sender
-                        .send(ParallelPrimeCacheWorkerProgress::Cancelled(cancelled))?,
-                }
-            }
+                    let cancelled = Cancelled::catch(|| _ = hir::crate_def_map(&db, crate_id));

-            Ok::<_, crossbeam_channel::SendError<_>>(())
-        };
+                    match cancelled {
+                        Ok(()) => progress_sender
+                            .send(ParallelPrimeCacheWorkerProgress::EndCrateDefMap { crate_id })?,
+                        Err(cancelled) => progress_sender
+                            .send(ParallelPrimeCacheWorkerProgress::Cancelled(cancelled))?,
+                    }
+
+                    Ok::<_, crossbeam_channel::SendError<_>>(())
+                };
+                let handle_import_map = |crate_id| {
+                    let cancelled = Cancelled::catch(|| _ = db.import_map(crate_id));
+
+                    match cancelled {
+                        Ok(()) => progress_sender
+                            .send(ParallelPrimeCacheWorkerProgress::EndCrateImportMap)?,
+                        Err(cancelled) => progress_sender
+                            .send(ParallelPrimeCacheWorkerProgress::Cancelled(cancelled))?,
+                    }
+
+                    Ok::<_, crossbeam_channel::SendError<_>>(())
+                };
+                let handle_symbols = |module| {
+                    let cancelled =
+                        Cancelled::catch(AssertUnwindSafe(|| _ = db.module_symbols(module)));
+
+                    match cancelled {
+                        Ok(()) => progress_sender
+                            .send(ParallelPrimeCacheWorkerProgress::EndModuleSymbols)?,
+                        Err(cancelled) => progress_sender
+                            .send(ParallelPrimeCacheWorkerProgress::Cancelled(cancelled))?,
+                    }
+
+                    Ok::<_, crossbeam_channel::SendError<_>>(())
+                };
+
+                loop {
+                    db.unwind_if_revision_cancelled();
+
+                    // Biased because we want to prefer def maps.
+                    crossbeam_channel::select_biased! {
+                        recv(def_map_work_receiver) -> work => {
+                            let Ok((crate_id, crate_name)) = work else { break };
+                            handle_def_map(crate_id, crate_name)?;
+                        }
+                        recv(import_map_work_receiver) -> work => {
+                            let Ok(crate_id) = work else { break };
+                            handle_import_map(crate_id)?;
+                        }
+                        recv(symbols_work_receiver) -> work => {
+                            let Ok(module) = work else { break };
+                            handle_symbols(module)?;
+                        }
+                    }
+                }
+                Ok::<_, crossbeam_channel::SendError<_>>(())
+            };

     for id in 0..num_worker_threads {
         stdx::thread::Builder::new(
@@ -103,138 +156,121 @@ pub fn parallel_prime_caches(
             .expect("failed to spawn thread");
         }

-        (work_sender, progress_receiver)
+        (def_map_work_sender, import_map_work_sender, symbols_work_sender, progress_receiver)
     };

-    let crates_total = crates_to_prime.pending();
-    let mut crates_done = 0;
+    let crate_def_maps_total = db.all_crates().len();
+    let mut crate_def_maps_done = 0;
+    let (mut crate_import_maps_total, mut crate_import_maps_done) = (0usize, 0usize);
+    let (mut module_symbols_total, mut module_symbols_done) = (0usize, 0usize);

     // an index map is used to preserve ordering so we can sort the progress report in order of
     // "longest crate to index" first
     let mut crates_currently_indexing =
         FxIndexMap::with_capacity_and_hasher(num_worker_threads, Default::default());

-    let mut additional_phases = vec![];
-
-    while crates_done < crates_total {
-        db.unwind_if_revision_cancelled();
-
-        for krate in &mut crates_to_prime {
-            let name = krate.extra_data(db).display_name.as_deref().cloned().unwrap_or_else(|| {
-                Symbol::integer(salsa::plumbing::AsId::as_id(&krate).as_u32() as usize)
-            });
-            let origin = &krate.data(db).origin;
-            if origin.is_lang() {
-                additional_phases.push((krate, name.clone(), PrimingPhase::ImportMap));
-            } else if origin.is_local() {
-                // Compute the symbol search index.
-                // This primes the cache for `ide_db::symbol_index::world_symbols()`.
-                //
-                // We do this for workspace crates only (members of local_roots), because doing it
-                // for all dependencies could be *very* unnecessarily slow in a large project.
-                //
-                // FIXME: We should do it unconditionally if the configuration is set to default to
-                // searching dependencies (rust-analyzer.workspace.symbol.search.scope), but we
-                // would need to pipe that configuration information down here.
-                additional_phases.push((krate, name.clone(), PrimingPhase::CrateSymbols));
-            }
-
-            work_sender.send((krate, name, PrimingPhase::DefMap)).ok();
+    for (&krate, &to_be_done_deps) in &to_be_done_deps {
+        if to_be_done_deps != 0 {
+            continue;
         }

-        // recv_timeout is somewhat a hack, we need a way to from this thread check to see if the current salsa revision
-        // is cancelled on a regular basis. workers will only exit if they are processing a task that is cancelled, or
-        // if this thread exits, and closes the work channel.
-        let worker_progress = match progress_receiver.recv_timeout(Duration::from_millis(10)) {
-            Ok(p) => p,
-            Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
-                continue;
-            }
-            Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
-                // all our workers have exited, mark us as finished and exit
-                cb(ParallelPrimeCachesProgress {
-                    crates_currently_indexing: vec![],
-                    crates_done,
-                    crates_total: crates_done,
-                    work_type: "Indexing",
-                });
-                return;
-            }
-        };
-        match worker_progress {
-            ParallelPrimeCacheWorkerProgress::BeginCrate { crate_id, crate_name } => {
-                crates_currently_indexing.insert(crate_id, crate_name);
-            }
-            ParallelPrimeCacheWorkerProgress::EndCrate { crate_id } => {
-                crates_currently_indexing.swap_remove(&crate_id);
-                crates_to_prime.mark_done(crate_id);
-                crates_done += 1;
-            }
-            ParallelPrimeCacheWorkerProgress::Cancelled(cancelled) => {
-                // Cancelled::throw should probably be public
-                std::panic::resume_unwind(Box::new(cancelled));
-            }
-        };
+        let name = crate_name(db, krate);
+        def_map_work_sender.send((krate, name)).ok();
+    }
+
+    while crate_def_maps_done < crate_def_maps_total
+        || crate_import_maps_done < crate_import_maps_total
+        || module_symbols_done < module_symbols_total
+    {
+        db.unwind_if_revision_cancelled();

         let progress = ParallelPrimeCachesProgress {
             crates_currently_indexing: crates_currently_indexing.values().cloned().collect(),
-            crates_done,
-            crates_total,
+            crates_done: crate_def_maps_done,
+            crates_total: crate_def_maps_total,
             work_type: "Indexing",
         };

         cb(progress);
-    }
-
-    let mut crates_done = 0;
-    let crates_total = additional_phases.len();
-    for w in additional_phases.into_iter().sorted_by_key(|&(_, _, phase)| phase) {
-        work_sender.send(w).ok();
-    }
-
-    while crates_done < crates_total {
-        db.unwind_if_revision_cancelled();

-        // recv_timeout is somewhat a hack, we need a way to from this thread check to see if the current salsa revision
-        // is cancelled on a regular basis. workers will only exit if they are processing a task that is cancelled, or
-        // if this thread exits, and closes the work channel.
-        let worker_progress = match progress_receiver.recv_timeout(Duration::from_millis(10)) {
+        // Biased to prefer progress updates (and because it's faster).
+        let progress = match progress_receiver.recv() {
             Ok(p) => p,
-            Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
-                continue;
-            }
-            Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
+            Err(crossbeam_channel::RecvError) => {
                 // all our workers have exited, mark us as finished and exit
                 cb(ParallelPrimeCachesProgress {
                     crates_currently_indexing: vec![],
-                    crates_done,
-                    crates_total: crates_done,
-                    work_type: "Populating symbols",
+                    crates_done: crate_def_maps_done,
+                    crates_total: crate_def_maps_done,
+                    work_type: "Done",
                 });
                 return;
             }
         };
-        match worker_progress {
-            ParallelPrimeCacheWorkerProgress::BeginCrate { crate_id, crate_name } => {
+
+        match progress {
+            ParallelPrimeCacheWorkerProgress::BeginCrateDefMap { crate_id, crate_name } => {
                 crates_currently_indexing.insert(crate_id, crate_name);
             }
-            ParallelPrimeCacheWorkerProgress::EndCrate { crate_id } => {
+            ParallelPrimeCacheWorkerProgress::EndCrateDefMap { crate_id } => {
                 crates_currently_indexing.swap_remove(&crate_id);
-                crates_done += 1;
+                crate_def_maps_done += 1;
+
+                // Fire ready dependencies.
+                for &dep in &reverse_deps[&crate_id] {
+                    let to_be_done = to_be_done_deps.get_mut(&dep).unwrap();
+                    *to_be_done -= 1;
+                    if *to_be_done == 0 {
+                        let dep_name = crate_name(db, dep);
+                        def_map_work_sender.send((dep, dep_name)).ok();
+                    }
+                }
+
+                if crate_def_maps_done == crate_def_maps_total {
+                    cb(ParallelPrimeCachesProgress {
+                        crates_currently_indexing: vec![],
+                        crates_done: crate_def_maps_done,
+                        crates_total: crate_def_maps_done,
+                        work_type: "Collecting Symbols",
+                    });
+                }
+
+                let origin = &crate_id.data(db).origin;
+                if origin.is_lang() {
+                    crate_import_maps_total += 1;
+                    import_map_work_sender.send(crate_id).ok();
+                } else if origin.is_local() {
+                    // Compute the symbol search index.
+                    // This primes the cache for `ide_db::symbol_index::world_symbols()`.
+                    //
+                    // We do this for workspace crates only (members of local_roots), because doing it
+                    // for all dependencies could be *very* unnecessarily slow in a large project.
+                    //
+                    // FIXME: We should do it unconditionally if the configuration is set to default to
+                    // searching dependencies (rust-analyzer.workspace.symbol.search.scope), but we
+                    // would need to pipe that configuration information down here.
+                    let modules = hir::Crate::from(crate_id).modules(db);
+                    module_symbols_total += modules.len();
+                    for module in modules {
+                        symbols_work_sender.send(module).ok();
+                    }
+                }
             }
+            ParallelPrimeCacheWorkerProgress::EndCrateImportMap => crate_import_maps_done += 1,
+            ParallelPrimeCacheWorkerProgress::EndModuleSymbols => module_symbols_done += 1,
             ParallelPrimeCacheWorkerProgress::Cancelled(cancelled) => {
                 // Cancelled::throw should probably be public
                 std::panic::resume_unwind(Box::new(cancelled));
             }
-        };
-
-        let progress = ParallelPrimeCachesProgress {
-            crates_currently_indexing: crates_currently_indexing.values().cloned().collect(),
-            crates_done,
-            crates_total,
-            work_type: "Populating symbols",
-        };
-
-        cb(progress);
+        }
     }
 }
+
+fn crate_name(db: &RootDatabase, krate: Crate) -> Symbol {
+    krate
+        .extra_data(db)
+        .display_name
+        .as_deref()
+        .cloned()
+        .unwrap_or_else(|| Symbol::integer(salsa::plumbing::AsId::as_id(&krate).as_u32() as usize))
+}
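
For readers unfamiliar with the scheduling scheme the new code adopts, the sketch below is a minimal, standalone illustration of the same two ideas: a crate's def-map job becomes ready only once all of its dependencies have finished (tracked with a reverse-dependency map and per-crate pending counters), and workers prefer def-map work over secondary work via a biased channel select. This is not rust-analyzer code; the crate IDs, channel names, and toy dependency graph are invented for the example, and it assumes the crossbeam-channel crate (which the real code also uses).

// Standalone sketch of dependency-aware work scheduling with a biased worker.
// Assumes crossbeam-channel as a dependency; all names here are illustrative.
use std::collections::HashMap;
use std::thread;

use crossbeam_channel::{select_biased, unbounded};

type CrateId = u32;

fn main() {
    // Toy dependency graph: crate -> its dependencies.
    let deps: HashMap<CrateId, Vec<CrateId>> =
        HashMap::from([(0, vec![]), (1, vec![0]), (2, vec![0, 1])]);

    // Per-crate count of unfinished dependencies, plus reverse edges (dependents).
    let mut to_be_done: HashMap<CrateId, usize> =
        deps.iter().map(|(&k, v)| (k, v.len())).collect();
    let mut reverse: HashMap<CrateId, Vec<CrateId>> =
        deps.keys().map(|&k| (k, Vec::new())).collect();
    for (&krate, krate_deps) in &deps {
        for &dep in krate_deps {
            reverse.get_mut(&dep).unwrap().push(krate);
        }
    }

    let (primary_tx, primary_rx) = unbounded::<CrateId>(); // "def map" work
    let (secondary_tx, secondary_rx) = unbounded::<CrateId>(); // "symbols / import map" work
    let (done_tx, done_rx) = unbounded::<CrateId>();

    // Worker: always drain primary work before touching secondary work.
    let worker = thread::spawn(move || loop {
        select_biased! {
            recv(primary_rx) -> msg => {
                let Ok(krate) = msg else { break };
                // ... compute the def map for `krate` here ...
                done_tx.send(krate).unwrap();
            }
            recv(secondary_rx) -> msg => {
                let Ok(_krate) = msg else { break };
                // ... compute symbols / import map here ...
            }
        }
    });

    // Seed the queue with crates whose dependencies are already satisfied.
    let mut remaining = to_be_done.len();
    for (&krate, &pending) in &to_be_done {
        if pending == 0 {
            primary_tx.send(krate).unwrap();
        }
    }

    // When a crate's def map finishes, unlock its dependents and queue secondary work for it.
    while remaining > 0 {
        let finished = done_rx.recv().unwrap();
        remaining -= 1;
        secondary_tx.send(finished).unwrap();
        for &dependent in &reverse[&finished] {
            let pending = to_be_done.get_mut(&dependent).unwrap();
            *pending -= 1;
            if *pending == 0 {
                primary_tx.send(dependent).unwrap();
            }
        }
    }

    // Closing the work channels lets the worker exit.
    // (Shutdown is simplified; pending secondary work may be dropped.)
    drop(primary_tx);
    drop(secondary_tx);
    worker.join().unwrap();
}

The same shape appears in the diff above: to_be_done_deps plays the role of the pending counters, reverse_deps holds the reverse edges, and crossbeam_channel::select_biased! in the worker loop is what makes def maps win over import-map and symbol work.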
