Skip to content

Commit 91f54e3

Browse files
committed
Refactor scheduler and worker creation
This commit eliminates the unnecessary Option<T> fields, the two-phase initialization and the unsafe get_mut_unchecked call in GCWorkScheduler. We consider MMTK, GCWorkScheduler and GCWorkerShared to be data shared between threads, and GCWorker and GCController to be private data of the workers and the controller, respectively. We now create all shared data, including every GCWorkerShared struct, before spawning any GC threads. This means we no longer spawn GC threads before GCWorkScheduler is fully initialized, and we have eliminated some unsafe operations. We temporarily make Options::threads settable only via environment variable, because GCWorkerShared instances are now created when the GCWorkScheduler (which is usually a static) is created.
1 parent efc6b99 commit 91f54e3

File tree

9 files changed

+138
-169
lines changed

9 files changed

+138
-169
lines changed

src/memory_manager.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ pub fn initialize_collection<VM: VMBinding>(mmtk: &'static MMTK<VM>, tls: VMThre
196196
!mmtk.plan.is_initialized(),
197197
"MMTk collection has been initialized (was initialize_collection() already called before?)"
198198
);
199-
mmtk.scheduler.initialize(*mmtk.options.threads, mmtk, tls);
199+
mmtk.scheduler.spawn_gc_threads(mmtk, tls);
200200
mmtk.plan.base().initialized.store(true, Ordering::SeqCst);
201201
}
202202

src/mmtk.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,15 @@ impl<VM: VMBinding> MMTK<VM> {
5757
// The first call will initialize SFT map. Other calls will be blocked until SFT map is initialized.
5858
SFT_MAP.initialize_once();
5959

60-
let scheduler = GCWorkScheduler::new();
6160
let options = Arc::new(UnsafeOptionsWrapper::new(Options::default()));
61+
62+
let num_workers = if cfg!(feature = "single_worker") {
63+
1
64+
} else {
65+
*options.threads
66+
};
67+
68+
let scheduler = GCWorkScheduler::new(num_workers);
6269
let plan = crate::plan::create_plan(
6370
*options.plan,
6471
&VM_MAP,

src/scheduler/controller.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ impl<VM: VMBinding> GCController<VM> {
8686
}
8787
}
8888
let _guard = self.scheduler.worker_monitor.0.lock().unwrap();
89-
if self.scheduler.worker_group().all_parked() && self.scheduler.all_buckets_empty() {
89+
if self.scheduler.all_workers_parked() && self.scheduler.all_buckets_empty() {
9090
break;
9191
}
9292
}

src/scheduler/gc_work.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ impl<C: GCWorkContext + 'static> GCWork<C::VM> for Prepare<C> {
4949
mmtk.scheduler.work_buckets[WorkBucketStage::Prepare]
5050
.add(PrepareMutator::<C::VM>::new(mutator));
5151
}
52-
for w in &mmtk.scheduler.worker_group().workers_shared {
52+
for w in &mmtk.scheduler.workers_shared {
5353
w.local_work_bucket.add(PrepareCollector);
5454
}
5555
}
@@ -117,7 +117,7 @@ impl<C: GCWorkContext + 'static> GCWork<C::VM> for Release<C> {
117117
mmtk.scheduler.work_buckets[WorkBucketStage::Release]
118118
.add(ReleaseMutator::<C::VM>::new(mutator));
119119
}
120-
for w in &mmtk.scheduler.worker_group().workers_shared {
120+
for w in &mmtk.scheduler.workers_shared {
121121
w.local_work_bucket.add(ReleaseCollector);
122122
}
123123
// TODO: Process weak references properly

src/scheduler/scheduler.rs

Lines changed: 90 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use super::stat::SchedulerStat;
2+
use super::work_bucket::WorkBucketStage::*;
23
use super::work_bucket::*;
3-
use super::worker::{GCWorker, GCWorkerShared, WorkerGroup};
4+
use super::worker::{GCWorker, GCWorkerShared};
45
use super::*;
56
use crate::mmtk::MMTK;
67
use crate::util::opaque_pointer::*;
@@ -10,23 +11,24 @@ use enum_map::{enum_map, EnumMap};
1011
use std::collections::HashMap;
1112
use std::sync::atomic::Ordering;
1213
use std::sync::mpsc::channel;
13-
use std::sync::{Arc, Condvar, Mutex, RwLock};
14+
use std::sync::{Arc, Condvar, Mutex};
1415

1516
pub enum CoordinatorMessage<VM: VMBinding> {
1617
Work(Box<dyn CoordinatorWork<VM>>),
1718
AllWorkerParked,
1819
BucketDrained,
1920
}
2021

22+
/// The shared data structure for distributing work packets between worker threads and the coordinator thread.
2123
pub struct GCWorkScheduler<VM: VMBinding> {
22-
/// Work buckets
24+
/// Work buckets for worker threads
2325
pub work_buckets: EnumMap<WorkBucketStage, WorkBucket<VM>>,
2426
/// Work for the coordinator thread
2527
pub coordinator_work: WorkBucket<VM>,
2628
/// The shared parts of GC workers
27-
worker_group: Option<WorkerGroup<VM>>,
29+
pub workers_shared: Vec<Arc<GCWorkerShared<VM>>>,
2830
/// The shared part of the GC worker object of the controller thread
29-
coordinator_worker_shared: Option<RwLock<Arc<GCWorkerShared<VM>>>>,
31+
coordinator_worker_shared: Arc<GCWorkerShared<VM>>,
3032
/// Condition Variable for worker synchronization
3133
pub worker_monitor: Arc<(Mutex<()>, Condvar)>,
3234
/// A callback to be fired after the `Closure` bucket is drained.
@@ -41,79 +43,42 @@ pub struct GCWorkScheduler<VM: VMBinding> {
4143
closure_end: Mutex<Option<Box<dyn Send + Fn() -> bool>>>,
4244
}
4345

44-
// The 'channel' inside Scheduler disallows Sync for Scheduler. We have to make sure we use channel properly:
45-
// 1. We should never directly use Sender. We clone the sender and let each worker have their own copy.
46-
// 2. Only the coordinator can use Receiver.
47-
// TODO: We should remove channel from Scheduler, and directly send Sender/Receiver when creating the coordinator and
48-
// the workers.
46+
// FIXME: GCWorkScheduler should be naturally Sync, but we cannot remove this `impl` yet.
47+
// Some subtle interaction between ObjectRememberingBarrier, Mutator and some GCWork instances
48+
// makes the compiler think WorkBucket is not Sync.
4949
unsafe impl<VM: VMBinding> Sync for GCWorkScheduler<VM> {}
5050

5151
impl<VM: VMBinding> GCWorkScheduler<VM> {
52-
pub fn new() -> Arc<Self> {
52+
pub fn new(num_workers: usize) -> Arc<Self> {
5353
let worker_monitor: Arc<(Mutex<()>, Condvar)> = Default::default();
54-
Arc::new(Self {
55-
work_buckets: enum_map! {
56-
WorkBucketStage::Unconstrained => WorkBucket::new(true, worker_monitor.clone()),
57-
WorkBucketStage::Prepare => WorkBucket::new(false, worker_monitor.clone()),
58-
WorkBucketStage::Closure => WorkBucket::new(false, worker_monitor.clone()),
59-
WorkBucketStage::RefClosure => WorkBucket::new(false, worker_monitor.clone()),
60-
WorkBucketStage::CalculateForwarding => WorkBucket::new(false, worker_monitor.clone()),
61-
WorkBucketStage::RefForwarding => WorkBucket::new(false, worker_monitor.clone()),
62-
WorkBucketStage::Compact => WorkBucket::new(false, worker_monitor.clone()),
63-
WorkBucketStage::Release => WorkBucket::new(false, worker_monitor.clone()),
64-
WorkBucketStage::Final => WorkBucket::new(false, worker_monitor.clone()),
65-
},
66-
coordinator_work: WorkBucket::new(true, worker_monitor.clone()),
67-
worker_group: None,
68-
coordinator_worker_shared: None,
69-
worker_monitor,
70-
closure_end: Mutex::new(None),
71-
})
72-
}
7354

74-
#[inline]
75-
pub fn num_workers(&self) -> usize {
76-
self.worker_group.as_ref().unwrap().worker_count()
77-
}
78-
79-
pub fn initialize(
80-
self: &'static Arc<Self>,
81-
num_workers: usize,
82-
mmtk: &'static MMTK<VM>,
83-
tls: VMThread,
84-
) {
85-
use crate::scheduler::work_bucket::WorkBucketStage::*;
86-
let num_workers = if cfg!(feature = "single_worker") {
87-
1
88-
} else {
89-
num_workers
55+
// Create work buckets for workers.
56+
let mut work_buckets = enum_map! {
57+
WorkBucketStage::Unconstrained => WorkBucket::new(true, worker_monitor.clone()),
58+
WorkBucketStage::Prepare => WorkBucket::new(false, worker_monitor.clone()),
59+
WorkBucketStage::Closure => WorkBucket::new(false, worker_monitor.clone()),
60+
WorkBucketStage::RefClosure => WorkBucket::new(false, worker_monitor.clone()),
61+
WorkBucketStage::CalculateForwarding => WorkBucket::new(false, worker_monitor.clone()),
62+
WorkBucketStage::RefForwarding => WorkBucket::new(false, worker_monitor.clone()),
63+
WorkBucketStage::Compact => WorkBucket::new(false, worker_monitor.clone()),
64+
WorkBucketStage::Release => WorkBucket::new(false, worker_monitor.clone()),
65+
WorkBucketStage::Final => WorkBucket::new(false, worker_monitor.clone()),
9066
};
9167

92-
let (sender, receiver) = channel::<CoordinatorMessage<VM>>();
93-
94-
let mut self_mut = self.clone();
95-
let self_mut = unsafe { Arc::get_mut_unchecked(&mut self_mut) };
96-
97-
let coordinator_worker = GCWorker::new(mmtk, 0, self.clone(), true, sender.clone());
98-
self_mut.coordinator_worker_shared = Some(RwLock::new(coordinator_worker.shared.clone()));
99-
100-
let (worker_group, spawn_workers) =
101-
WorkerGroup::new(mmtk, num_workers, self.clone(), sender);
102-
self_mut.worker_group = Some(worker_group);
103-
68+
// Set the open condition of each bucket.
10469
{
10570
// Unconstrained is always open. Prepare will be opened at the beginning of a GC.
10671
// This vec will grow for each stage we call with open_next()
10772
let mut open_stages: Vec<WorkBucketStage> = vec![Unconstrained, Prepare];
10873
// The rest will open after the previous stage is done.
10974
let mut open_next = |s: WorkBucketStage| {
11075
let cur_stages = open_stages.clone();
111-
self_mut.work_buckets[s].set_open_condition(move || {
112-
let should_open =
113-
self.are_buckets_drained(&cur_stages) && self.worker_group().all_parked();
76+
work_buckets[s].set_open_condition(move |scheduler: &GCWorkScheduler<VM>| {
77+
let should_open = scheduler.are_buckets_drained(&cur_stages)
78+
&& scheduler.all_workers_parked();
11479
// Additional check before the `RefClosure` bucket opens.
11580
if should_open && s == WorkBucketStage::RefClosure {
116-
if let Some(closure_end) = self.closure_end.lock().unwrap().as_ref() {
81+
if let Some(closure_end) = scheduler.closure_end.lock().unwrap().as_ref() {
11782
if closure_end() {
11883
// Don't open `RefClosure` if `closure_end` added more work to `Closure`.
11984
return false;
@@ -134,18 +99,72 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
13499
open_next(Final);
135100
}
136101

137-
// Now that the scheduler is initialized, we spawn the worker threads and the controller thread.
138-
spawn_workers(tls);
102+
// Create the work bucket for the controller.
103+
let coordinator_work = WorkBucket::new(true, worker_monitor.clone());
104+
105+
// We prepare the shared part of workers, but do not create the actual workers now.
106+
// The shared parts of workers are communication hubs between controller and workers.
107+
let workers_shared = (0..num_workers)
108+
.map(|_| Arc::new(GCWorkerShared::<VM>::new(worker_monitor.clone())))
109+
.collect::<Vec<_>>();
110+
111+
// Similarly, we create the shared part of the controller's worker, but not the controller itself.
112+
let coordinator_worker_shared = Arc::new(GCWorkerShared::<VM>::new(worker_monitor.clone()));
113+
114+
Arc::new(Self {
115+
work_buckets,
116+
coordinator_work,
117+
workers_shared,
118+
coordinator_worker_shared,
119+
worker_monitor,
120+
closure_end: Mutex::new(None),
121+
})
122+
}
123+
124+
#[inline]
125+
pub fn num_workers(&self) -> usize {
126+
self.workers_shared.len()
127+
}
128+
129+
pub fn all_workers_parked(&self) -> bool {
130+
self.workers_shared.iter().all(|w| w.is_parked())
131+
}
132+
133+
/// Create GC threads, including the controller thread and all workers.
134+
pub fn spawn_gc_threads(self: &Arc<Self>, mmtk: &'static MMTK<VM>, tls: VMThread) {
135+
// Create the communication channel.
136+
let (sender, receiver) = channel::<CoordinatorMessage<VM>>();
139137

138+
// Spawn the controller thread.
139+
let coordinator_worker = GCWorker::new(
140+
mmtk,
141+
0,
142+
self.clone(),
143+
true,
144+
sender.clone(),
145+
self.coordinator_worker_shared.clone(),
146+
);
140147
let gc_controller = GCController::new(
141148
mmtk,
142149
mmtk.plan.base().gc_requester.clone(),
143150
self.clone(),
144151
receiver,
145152
coordinator_worker,
146153
);
147-
148154
VM::VMCollection::spawn_gc_thread(tls, GCThreadContext::<VM>::Controller(gc_controller));
155+
156+
// Spawn each worker thread.
157+
for (ordinal, shared) in self.workers_shared.iter().enumerate() {
158+
let worker = Box::new(GCWorker::new(
159+
mmtk,
160+
ordinal,
161+
self.clone(),
162+
false,
163+
sender.clone(),
164+
shared.clone(),
165+
));
166+
VM::VMCollection::spawn_gc_thread(tls, GCThreadContext::<VM>::Worker(worker));
167+
}
149168
}
150169

151170
/// Schedule all the common work packets
@@ -206,10 +225,6 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
206225
*self.closure_end.lock().unwrap() = Some(f);
207226
}
208227

209-
pub fn worker_group(&self) -> &WorkerGroup<VM> {
210-
self.worker_group.as_ref().unwrap()
211-
}
212-
213228
pub fn all_buckets_empty(&self) -> bool {
214229
self.work_buckets.values().all(|bucket| bucket.is_empty())
215230
}
@@ -221,7 +236,7 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
221236
if id == WorkBucketStage::Unconstrained {
222237
continue;
223238
}
224-
buckets_updated |= bucket.update();
239+
buckets_updated |= bucket.update(self);
225240
}
226241
if buckets_updated {
227242
// Notify the workers for new work
@@ -317,7 +332,7 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
317332
}
318333
// Park this worker
319334
worker.shared.parked.store(true, Ordering::SeqCst);
320-
if self.worker_group().all_parked() {
335+
if self.all_workers_parked() {
321336
worker
322337
.sender
323338
.send(CoordinatorMessage::AllWorkerParked)
@@ -331,33 +346,21 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
331346
}
332347

333348
pub fn enable_stat(&self) {
334-
for worker in &self.worker_group().workers_shared {
349+
for worker in &self.workers_shared {
335350
let worker_stat = worker.borrow_stat();
336351
worker_stat.enable();
337352
}
338-
let coordinator_worker_shared = self
339-
.coordinator_worker_shared
340-
.as_ref()
341-
.unwrap()
342-
.read()
343-
.unwrap();
344-
let coordinator_worker_stat = coordinator_worker_shared.borrow_stat();
353+
let coordinator_worker_stat = self.coordinator_worker_shared.borrow_stat();
345354
coordinator_worker_stat.enable();
346355
}
347356

348357
pub fn statistics(&self) -> HashMap<String, String> {
349358
let mut summary = SchedulerStat::default();
350-
for worker in &self.worker_group().workers_shared {
359+
for worker in &self.workers_shared {
351360
let worker_stat = worker.borrow_stat();
352361
summary.merge(&worker_stat);
353362
}
354-
let coordinator_worker_shared = self
355-
.coordinator_worker_shared
356-
.as_ref()
357-
.unwrap()
358-
.read()
359-
.unwrap();
360-
let coordinator_worker_stat = coordinator_worker_shared.borrow_stat();
363+
let coordinator_worker_stat = self.coordinator_worker_shared.borrow_stat();
361364
summary.merge(&coordinator_worker_stat);
362365
summary.harness_stat()
363366
}

src/scheduler/work_bucket.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ pub struct WorkBucket<VM: VMBinding> {
5959
/// A priority queue
6060
queue: RwLock<BinaryHeap<PrioritizedWork<VM>>>,
6161
monitor: Arc<(Mutex<()>, Condvar)>,
62-
can_open: Option<Box<dyn (Fn() -> bool) + Send>>,
62+
can_open: Option<Box<dyn (Fn(&GCWorkScheduler<VM>) -> bool) + Send>>,
6363
}
6464

6565
impl<VM: VMBinding> WorkBucket<VM> {
@@ -132,12 +132,15 @@ impl<VM: VMBinding> WorkBucket<VM> {
132132
}
133133
self.queue.write().pop().map(|v| v.work)
134134
}
135-
pub fn set_open_condition(&mut self, pred: impl Fn() -> bool + Send + 'static) {
135+
pub fn set_open_condition(
136+
&mut self,
137+
pred: impl Fn(&GCWorkScheduler<VM>) -> bool + Send + 'static,
138+
) {
136139
self.can_open = Some(box pred);
137140
}
138-
pub fn update(&self) -> bool {
141+
pub fn update(&self, scheduler: &GCWorkScheduler<VM>) -> bool {
139142
if let Some(can_open) = self.can_open.as_ref() {
140-
if !self.is_activated() && can_open() {
143+
if !self.is_activated() && can_open(scheduler) {
141144
self.activate();
142145
return true;
143146
}

0 commit comments

Comments
 (0)