Commit 2cb85c1

bors[bot] and jhgg authored
Merge #11281
11281: ide: parallel prime caches r=jonas-schievink a=jhgg

cache priming goes brrrr... the successor to #10149

---

this PR implements a parallel cache priming strategy that uses a topological work queue to feed a pool of worker threads the crates to index in parallel.

## todo

- [x] should we keep the old prime caches?
- [x] we should use num_cpus to detect how many cpus to use to prime caches. should we also expose a config for # of worker CPU threads to use?
- [x] something is wonky with cancellation, need to figure it out before this can merge.

Co-authored-by: Jake Heinz <[email protected]>
2 parents 5f13d6a + 25f67b6 commit 2cb85c1
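
The description above sums up the approach: a topological work queue releases a crate only once all of its dependencies have been indexed, and the free crates are handed to a pool of worker threads. The following is a minimal, self-contained sketch of that scheduling idea, not rust-analyzer code: the toy crate graph, the two-thread pool, and the println! stand-in for indexing are invented for illustration, and it assumes the crossbeam-channel crate that this PR adds as a dependency.

// Toy scheduler: `deps` maps each crate to its direct dependencies. Crates with no
// unfinished dependencies are sent over a work channel to worker threads; each
// completion comes back on a second channel and may release further crates.
use std::collections::HashMap;
use std::thread;

fn main() {
    // invented crate graph, purely for illustration
    let deps: HashMap<&str, Vec<&str>> = HashMap::from([
        ("core", vec![]),
        ("alloc", vec!["core"]),
        ("std", vec!["core", "alloc"]),
        ("my_crate", vec!["std"]),
    ]);

    // number of unfinished dependencies per crate, and the reverse edges
    let mut pending: HashMap<&str, usize> = deps.iter().map(|(&k, v)| (k, v.len())).collect();
    let mut dependents: HashMap<&str, Vec<&str>> = HashMap::new();
    for (&krate, ds) in &deps {
        for &d in ds {
            dependents.entry(d).or_default().push(krate);
        }
    }

    let (work_tx, work_rx) = crossbeam_channel::unbounded::<&'static str>();
    let (done_tx, done_rx) = crossbeam_channel::unbounded::<&'static str>();

    // worker pool: each worker pulls crates off the shared queue and "indexes" them
    let workers: Vec<_> = (0..2)
        .map(|_| {
            let work_rx = work_rx.clone();
            let done_tx = done_tx.clone();
            thread::spawn(move || {
                while let Ok(krate) = work_rx.recv() {
                    println!("indexing {krate}"); // stand-in for db.import_map(crate_id)
                    done_tx.send(krate).unwrap();
                }
            })
        })
        .collect();

    // seed the queue with dependency-free crates
    for (&krate, &n) in &pending {
        if n == 0 {
            work_tx.send(krate).unwrap();
        }
    }

    // scheduler loop: every completion may unlock dependents
    let mut done = 0;
    while done < pending.len() {
        let finished = done_rx.recv().unwrap();
        done += 1;
        if let Some(successors) = dependents.get(finished) {
            for &succ in successors {
                let n = pending.get_mut(succ).unwrap();
                *n -= 1;
                if *n == 0 {
                    work_tx.send(succ).unwrap();
                }
            }
        }
    }

    drop(work_tx); // close the queue so the workers exit
    for w in workers {
        w.join().unwrap();
    }
}

The real change below does the same thing against salsa database snapshots, reports progress through a callback, and additionally has to poll for cancellation (see prime_caches.rs).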

File tree: 11 files changed, +289 −33 lines


Cargo.lock

Lines changed: 2 additions & 0 deletions
Generated file; diff not rendered by default.

crates/ide/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -11,6 +11,7 @@ doctest = false
 
 [dependencies]
 cov-mark = "2.0.0-pre.1"
+crossbeam-channel = "0.5.0"
 either = "1.5.3"
 itertools = "0.10.0"
 tracing = "0.1"

crates/ide/src/lib.rs

Lines changed: 4 additions & 4 deletions
@@ -87,7 +87,7 @@ pub use crate::{
     moniker::{MonikerKind, MonikerResult, PackageInformation},
     move_item::Direction,
     navigation_target::NavigationTarget,
-    prime_caches::PrimeCachesProgress,
+    prime_caches::ParallelPrimeCachesProgress,
     references::ReferenceSearchResult,
     rename::RenameError,
     runnables::{Runnable, RunnableKind, TestId},
@@ -244,11 +244,11 @@ impl Analysis {
         self.with_db(|db| status::status(&*db, file_id))
     }
 
-    pub fn prime_caches<F>(&self, cb: F) -> Cancellable<()>
+    pub fn parallel_prime_caches<F>(&self, num_worker_threads: u8, cb: F) -> Cancellable<()>
     where
-        F: Fn(PrimeCachesProgress) + Sync + std::panic::UnwindSafe,
+        F: Fn(ParallelPrimeCachesProgress) + Sync + std::panic::UnwindSafe,
     {
-        self.with_db(move |db| prime_caches::prime_caches(db, &cb))
+        self.with_db(move |db| prime_caches::parallel_prime_caches(db, num_worker_threads, &cb))
     }
 
     /// Gets the text of the source file.
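
The new `Analysis::parallel_prime_caches` entry point takes a worker-thread count plus a progress callback and returns `Cancellable<()>`. A hedged usage sketch: `host` is assumed to be an already-populated `ide::AnalysisHost` (as in the load_cargo.rs change further down), and the eprintln! calls stand in for the editor-facing progress reporting rust-analyzer actually does.

fn report_priming_progress(host: &ide::AnalysisHost, num_threads: u8) {
    // the callback receives a ParallelPrimeCachesProgress snapshot after each event
    let result = host.analysis().parallel_prime_caches(num_threads, |progress| {
        eprintln!(
            "primed {}/{} crates; currently indexing: {}",
            progress.crates_done,
            progress.crates_total,
            progress.crates_currently_indexing.join(", "),
        );
    });
    if result.is_err() {
        // a salsa revision bump cancelled the run; callers are expected to retry later
        eprintln!("cache priming was cancelled");
    }
}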

crates/ide/src/prime_caches.rs

Lines changed: 132 additions & 21 deletions
@@ -2,47 +2,158 @@
 //! sometimes is counter productive when, for example, the first goto definition
 //! request takes longer to compute. This modules implemented prepopulation of
 //! various caches, it's not really advanced at the moment.
+mod topologic_sort;
+
+use std::time::Duration;
 
 use hir::db::DefDatabase;
-use ide_db::base_db::{SourceDatabase, SourceDatabaseExt};
+use ide_db::{
+    base_db::{
+        salsa::{Database, ParallelDatabase, Snapshot},
+        Cancelled, CrateGraph, CrateId, SourceDatabase, SourceDatabaseExt,
+    },
+    FxIndexMap,
+};
 use rustc_hash::FxHashSet;
 
 use crate::RootDatabase;
 
-/// We started indexing a crate.
+/// We're indexing many crates.
 #[derive(Debug)]
-pub struct PrimeCachesProgress {
-    pub on_crate: String,
-    pub n_done: usize,
-    pub n_total: usize,
+pub struct ParallelPrimeCachesProgress {
+    /// the crates that we are currently priming.
+    pub crates_currently_indexing: Vec<String>,
+    /// the total number of crates we want to prime.
+    pub crates_total: usize,
+    /// the total number of crates that have finished priming
+    pub crates_done: usize,
 }
 
-pub(crate) fn prime_caches(db: &RootDatabase, cb: &(dyn Fn(PrimeCachesProgress) + Sync)) {
+pub(crate) fn parallel_prime_caches(
+    db: &RootDatabase,
+    num_worker_threads: u8,
+    cb: &(dyn Fn(ParallelPrimeCachesProgress) + Sync),
+) {
     let _p = profile::span("prime_caches");
+
     let graph = db.crate_graph();
+    let mut crates_to_prime = {
+        let crate_ids = compute_crates_to_prime(db, &graph);
+
+        let mut builder = topologic_sort::TopologicalSortIter::builder();
+
+        for &crate_id in &crate_ids {
+            let crate_data = &graph[crate_id];
+            let dependencies = crate_data
+                .dependencies
+                .iter()
+                .map(|d| d.crate_id)
+                .filter(|i| crate_ids.contains(i));
+
+            builder.add(crate_id, dependencies);
+        }
+
+        builder.build()
+    };
+
+    enum ParallelPrimeCacheWorkerProgress {
+        BeginCrate { crate_id: CrateId, crate_name: String },
+        EndCrate { crate_id: CrateId },
+    }
+
+    let (work_sender, progress_receiver) = {
+        let (progress_sender, progress_receiver) = crossbeam_channel::unbounded();
+        let (work_sender, work_receiver) = crossbeam_channel::unbounded();
+        let prime_caches_worker = move |db: Snapshot<RootDatabase>| {
+            while let Ok((crate_id, crate_name)) = work_receiver.recv() {
+                progress_sender
+                    .send(ParallelPrimeCacheWorkerProgress::BeginCrate { crate_id, crate_name })?;
+
+                // This also computes the DefMap
+                db.import_map(crate_id);
+
+                progress_sender.send(ParallelPrimeCacheWorkerProgress::EndCrate { crate_id })?;
+            }
+
+            Ok::<_, crossbeam_channel::SendError<_>>(())
+        };
+
+        for _ in 0..num_worker_threads {
+            let worker = prime_caches_worker.clone();
+            let db = db.snapshot();
+            std::thread::spawn(move || Cancelled::catch(|| worker(db)));
+        }
+
+        (work_sender, progress_receiver)
+    };
+
+    let crates_total = crates_to_prime.pending();
+    let mut crates_done = 0;
+
+    // an index map is used to preserve ordering so we can sort the progress report in order of
+    // "longest crate to index" first
+    let mut crates_currently_indexing =
+        FxIndexMap::with_capacity_and_hasher(num_worker_threads as _, Default::default());
+
+    while crates_done < crates_total {
+        db.unwind_if_cancelled();
+
+        for crate_id in &mut crates_to_prime {
+            work_sender
+                .send((
+                    crate_id,
+                    graph[crate_id].display_name.as_deref().unwrap_or_default().to_string(),
+                ))
+                .ok();
+        }
+
+        // recv_timeout is somewhat a hack, we need a way to from this thread check to see if the current salsa revision
+        // is cancelled on a regular basis. workers will only exit if they are processing a task that is cancelled, or
+        // if this thread exits, and closes the work channel.
+        let worker_progress = match progress_receiver.recv_timeout(Duration::from_millis(10)) {
+            Ok(p) => p,
+            Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
+                continue;
+            }
+            Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
+                // our workers may have died from a cancelled task, so we'll check and re-raise here.
+                db.unwind_if_cancelled();
+                break;
+            }
+        };
+        match worker_progress {
+            ParallelPrimeCacheWorkerProgress::BeginCrate { crate_id, crate_name } => {
+                crates_currently_indexing.insert(crate_id, crate_name);
+            }
+            ParallelPrimeCacheWorkerProgress::EndCrate { crate_id } => {
+                crates_currently_indexing.remove(&crate_id);
+                crates_to_prime.mark_done(crate_id);
+                crates_done += 1;
+            }
+        };
+
+        let progress = ParallelPrimeCachesProgress {
+            crates_currently_indexing: crates_currently_indexing.values().cloned().collect(),
+            crates_done,
+            crates_total,
+        };
+
+        cb(progress);
+    }
+}
+
+fn compute_crates_to_prime(db: &RootDatabase, graph: &CrateGraph) -> FxHashSet<CrateId> {
     // We're only interested in the workspace crates and the `ImportMap`s of their direct
     // dependencies, though in practice the latter also compute the `DefMap`s.
     // We don't prime transitive dependencies because they're generally not visible in
     // the current workspace.
-    let to_prime: FxHashSet<_> = graph
+    graph
        .iter()
        .filter(|&id| {
            let file_id = graph[id].root_file_id;
            let root_id = db.file_source_root(file_id);
            !db.source_root(root_id).is_library
        })
        .flat_map(|id| graph[id].dependencies.iter().map(|krate| krate.crate_id))
-        .collect();
-
-    // FIXME: This would be easy to parallelize, since it's in the ideal ordering for that.
-    // Unfortunately rayon prevents panics from propagation out of a `scope`, which breaks
-    // cancellation, so we cannot use rayon.
-    let n_total = to_prime.len();
-    for (n_done, &crate_id) in to_prime.iter().enumerate() {
-        let crate_name = graph[crate_id].display_name.as_deref().unwrap_or_default().to_string();
-
-        cb(PrimeCachesProgress { on_crate: crate_name, n_done, n_total });
-        // This also computes the DefMap
-        db.import_map(crate_id);
-    }
+        .collect()
 }
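
One non-obvious piece of the scheduler loop above is the `recv_timeout` call, which the in-code comment itself labels a hack: the coordinating thread has to wake up regularly to notice salsa cancellation, because workers only observe cancellation while they are actually indexing. Below is an isolated sketch of just that polling pattern; `poll_progress` and `is_cancelled` are hypothetical names, and `is_cancelled` is a boolean stand-in for `db.unwind_if_cancelled()`, which in the real code unwinds rather than returning.

use std::time::Duration;

// Returns the next progress event, or None on cancellation or if all workers are gone.
fn poll_progress<T>(
    progress_receiver: &crossbeam_channel::Receiver<T>,
    is_cancelled: impl Fn() -> bool,
) -> Option<T> {
    loop {
        if is_cancelled() {
            // the real code reaches this point via db.unwind_if_cancelled(), which
            // unwinds with salsa's Cancelled payload instead of returning
            return None;
        }
        match progress_receiver.recv_timeout(Duration::from_millis(10)) {
            Ok(progress) => return Some(progress),
            // nothing arrived within 10ms: loop around and re-check cancellation
            Err(crossbeam_channel::RecvTimeoutError::Timeout) => continue,
            // every worker hung up (e.g. they hit a cancelled task), so give up
            Err(crossbeam_channel::RecvTimeoutError::Disconnected) => return None,
        }
    }
}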

crates/ide/src/prime_caches/topologic_sort.rs

Lines changed: 98 additions & 0 deletions
@@ -0,0 +1,98 @@
+//! helper data structure to schedule work for parallel prime caches.
+use std::{collections::VecDeque, hash::Hash};
+
+use rustc_hash::FxHashMap;
+
+pub(crate) struct TopologicSortIterBuilder<T> {
+    nodes: FxHashMap<T, Entry<T>>,
+}
+
+impl<T> TopologicSortIterBuilder<T>
+where
+    T: Copy + Eq + PartialEq + Hash,
+{
+    fn new() -> Self {
+        Self { nodes: Default::default() }
+    }
+
+    fn get_or_create_entry(&mut self, item: T) -> &mut Entry<T> {
+        self.nodes.entry(item).or_default()
+    }
+
+    pub(crate) fn add(&mut self, item: T, predecessors: impl IntoIterator<Item = T>) {
+        let mut num_predecessors = 0;
+
+        for predecessor in predecessors.into_iter() {
+            self.get_or_create_entry(predecessor).successors.push(item);
+            num_predecessors += 1;
+        }
+
+        let entry = self.get_or_create_entry(item);
+        entry.num_predecessors += num_predecessors;
+    }
+
+    pub(crate) fn build(self) -> TopologicalSortIter<T> {
+        let ready = self
+            .nodes
+            .iter()
+            .filter_map(
+                |(item, entry)| if entry.num_predecessors == 0 { Some(*item) } else { None },
+            )
+            .collect();
+
+        TopologicalSortIter { nodes: self.nodes, ready }
+    }
+}
+
+pub(crate) struct TopologicalSortIter<T> {
+    ready: VecDeque<T>,
+    nodes: FxHashMap<T, Entry<T>>,
+}
+
+impl<T> TopologicalSortIter<T>
+where
+    T: Copy + Eq + PartialEq + Hash,
+{
+    pub(crate) fn builder() -> TopologicSortIterBuilder<T> {
+        TopologicSortIterBuilder::new()
+    }
+
+    pub(crate) fn pending(&self) -> usize {
+        self.nodes.len()
+    }
+
+    pub(crate) fn mark_done(&mut self, item: T) {
+        let entry = self.nodes.remove(&item).expect("invariant: unknown item marked as done");
+
+        for successor in entry.successors {
+            let succ_entry = self
+                .nodes
+                .get_mut(&successor)
+                .expect("invariant: unknown successor referenced by entry");
+
+            succ_entry.num_predecessors -= 1;
+            if succ_entry.num_predecessors == 0 {
+                self.ready.push_back(successor);
+            }
+        }
+    }
+}
+
+impl<T> Iterator for TopologicalSortIter<T> {
+    type Item = T;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.ready.pop_front()
+    }
+}
+
+struct Entry<T> {
+    successors: Vec<T>,
+    num_predecessors: usize,
+}
+
+impl<T> Default for Entry<T> {
+    fn default() -> Self {
+        Self { successors: Default::default(), num_predecessors: 0 }
+    }
+}
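
The new module lands without unit tests in this commit. Purely as a usage illustration, here is a hypothetical test (not part of the PR) of the contract that parallel_prime_caches relies on: an item is only yielded once `mark_done` has been called on every one of its predecessors, mirroring how the scheduler loop feeds the work channel and calls `mark_done` on each `EndCrate` event.

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn yields_items_in_dependency_order() {
        let mut builder = TopologicalSortIter::builder();
        builder.add(1u32, []); // item 1 has no predecessors
        builder.add(2u32, [1]); // item 2 depends on 1
        builder.add(3u32, [1, 2]); // item 3 depends on 1 and 2
        let mut topo = builder.build();

        assert_eq!(topo.pending(), 3);
        assert_eq!(topo.next(), Some(1)); // only the dependency-free item is ready
        assert_eq!(topo.next(), None); // 2 stays blocked until 1 is marked done
        topo.mark_done(1);
        assert_eq!(topo.next(), Some(2));
        topo.mark_done(2);
        assert_eq!(topo.next(), Some(3));
    }
}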

crates/rust-analyzer/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -31,6 +31,7 @@ serde = { version = "1.0.106", features = ["derive"] }
 serde_json = { version = "1.0.48", features = ["preserve_order"] }
 threadpool = "1.7.1"
 rayon = "1.5"
+num_cpus = "1.13.1"
 mimalloc = { version = "0.1.19", default-features = false, optional = true }
 lsp-server = "0.5.1"
 tracing = "0.1"

crates/rust-analyzer/src/cli/load_cargo.rs

Lines changed: 1 addition & 1 deletion
@@ -88,7 +88,7 @@ pub fn load_workspace(
     load_crate_graph(crate_graph, project_folders.source_root_config, &mut vfs, &receiver);
 
     if load_config.prefill_caches {
-        host.analysis().prime_caches(|_| {})?;
+        host.analysis().parallel_prime_caches(1, |_| {})?;
     }
     Ok((host, vfs, proc_macro_client))
 }

crates/rust-analyzer/src/config.rs

Lines changed: 17 additions & 0 deletions
@@ -298,6 +298,9 @@
     /// Whether to show `can't find Cargo.toml` error message.
     notifications_cargoTomlNotFound: bool = "true",
 
+    /// How many worker threads to to handle priming caches. The default `0` means to pick automatically.
+    primeCaches_numThreads: ParallelPrimeCachesNumThreads = "0",
+
     /// Enable support for procedural macros, implies `#rust-analyzer.cargo.runBuildScripts#`.
     procMacro_enable: bool = "true",
     /// Internal config, path to proc-macro server executable (typically,
@@ -1016,6 +1019,13 @@
             yield_points: self.data.highlightRelated_yieldPoints,
         }
     }
+
+    pub fn prime_caches_num_threads(&self) -> u8 {
+        match self.data.primeCaches_numThreads {
+            0 => num_cpus::get_physical().try_into().unwrap_or(u8::MAX),
+            n => n,
+        }
+    }
 }
 
 #[derive(Deserialize, Debug, Clone, Copy)]
@@ -1130,6 +1140,8 @@ enum WorkspaceSymbolSearchKindDef {
     AllSymbols,
 }
 
+type ParallelPrimeCachesNumThreads = u8;
+
 macro_rules! _config_data {
     (struct $name:ident {
         $(
@@ -1351,6 +1363,11 @@ fn field_props(field: &str, ty: &str, doc: &[&str], default: &str) -> serde_json
                 "Search for all symbols kinds"
             ],
         },
+        "ParallelPrimeCachesNumThreads" => set! {
+            "type": "number",
+            "minimum": 0,
+            "maximum": 255
+        },
         _ => panic!("{}: {}", ty, default),
     }
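
For reference, the "0 means automatic" resolution added in `Config::prime_caches_num_threads` can be exercised on its own; this standalone sketch assumes only the num_cpus crate added in the Cargo.toml change above, and the printed count is machine-dependent.

fn resolve_prime_caches_threads(configured: u8) -> u8 {
    match configured {
        // 0 means "pick automatically": use the physical core count, clamped into the
        // 0..=255 range that the ParallelPrimeCachesNumThreads schema advertises
        0 => num_cpus::get_physical().try_into().unwrap_or(u8::MAX),
        n => n,
    }
}

fn main() {
    // e.g. "priming caches with 8 worker threads" on an 8-core machine
    println!("priming caches with {} worker threads", resolve_prime_caches_threads(0));
}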
