Skip to content

Commit 4ebb6b6

Browse files
authored
Fix GCWorker ownership. (#523)
This commit fixes an inconsistency in the VMCollection interface where a GCWorker is exposed to the binding via `Collection::spawn_worker_thread` as a `&GCWorker`, but later received from the binding via `memory_manager::start_worker` as a `&mut GCWorker`. The root cause is that GCWorker is wrongly owned by the scheduler. We make each GC worker thread the owner of its `GCWorker` struct, and we pass the `GCWorker` struct across the API boundary as an owning `Box`, fixing the interface. We isolate the part of `GCWorker` shared with the GC scheduler into a `GCWorkerShared` struct, and ensure the fields are properly synchronized. In particular, we now use `AtomicRefCell` for `WorkerLocalStat`. `AtomicRefCell` allows it to be borrowed mutably by the GC worker and the mutator (via `harness_begin/end` methods) at different times. However, it is a known issue that in concurrent GC, it is possible for GC to happen when `harness_begin/end` is called. In that case, the attempt to borrow the stat will panic. Fixes: #522
1 parent 749c0e7 commit 4ebb6b6

File tree

8 files changed

+114
-67
lines changed

8 files changed

+114
-67
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ atomic = "0.4.6"
3434
spin = "0.5.2"
3535
env_logger = "0.8.2"
3636
pfm = {version = "0.1.0-beta.1", optional = true}
37+
atomic_refcell = "0.1.7"
3738

3839
[dev-dependencies]
3940
crossbeam = "0.7.3"

src/scheduler/gc_work.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ impl<C: GCWorkContext + 'static> GCWork<C::VM> for Prepare<C> {
4949
mmtk.scheduler.work_buckets[WorkBucketStage::Prepare]
5050
.add(PrepareMutator::<C::VM>::new(mutator));
5151
}
52-
for w in &mmtk.scheduler.worker_group().workers {
52+
for w in &mmtk.scheduler.worker_group().workers_shared {
5353
w.local_work_bucket.add(PrepareCollector);
5454
}
5555
}
@@ -117,7 +117,7 @@ impl<C: GCWorkContext + 'static> GCWork<C::VM> for Release<C> {
117117
mmtk.scheduler.work_buckets[WorkBucketStage::Release]
118118
.add(ReleaseMutator::<C::VM>::new(mutator));
119119
}
120-
for w in &mmtk.scheduler.worker_group().workers {
120+
for w in &mmtk.scheduler.worker_group().workers_shared {
121121
w.local_work_bucket.add(ReleaseCollector);
122122
}
123123
// TODO: Process weak references properly

src/scheduler/scheduler.rs

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ pub struct GCWorkScheduler<VM: VMBinding> {
2222
/// Work for the coordinator thread
2323
pub coordinator_work: WorkBucket<VM>,
2424
/// workers
25-
worker_group: Option<Arc<WorkerGroup<VM>>>,
25+
worker_group: Option<WorkerGroup<VM>>,
2626
/// Condition Variable for worker synchronization
2727
pub worker_monitor: Arc<(Mutex<()>, Condvar)>,
2828
mmtk: Option<&'static MMTK<VM>>,
@@ -104,16 +104,16 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
104104
self_mut.mmtk = Some(mmtk);
105105
self_mut.coordinator_worker = Some(RwLock::new(GCWorker::new(
106106
0,
107-
Arc::downgrade(self),
107+
self.clone(),
108108
true,
109109
self.channel.0.clone(),
110110
)));
111-
self_mut.worker_group = Some(WorkerGroup::new(
112-
num_workers,
113-
Arc::downgrade(self),
114-
self.channel.0.clone(),
115-
));
116-
self.worker_group.as_ref().unwrap().spawn_workers(tls, mmtk);
111+
let (worker_group, spawn_workers) =
112+
WorkerGroup::new(num_workers, self.clone(), self.channel.0.clone());
113+
self_mut.worker_group = Some(worker_group);
114+
// FIXME: because of the `Arc::get_mut_unchecked` above, we are now mutating the scheduler
115+
// while the spawned workers already have access to the scheduler.
116+
spawn_workers(tls);
117117

118118
{
119119
// Unconstrained is always open. Prepare will be opened at the beginning of a GC.
@@ -223,8 +223,8 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
223223
*self.closure_end.lock().unwrap() = Some(f);
224224
}
225225

226-
pub fn worker_group(&self) -> Arc<WorkerGroup<VM>> {
227-
self.worker_group.as_ref().unwrap().clone()
226+
pub fn worker_group(&self) -> &WorkerGroup<VM> {
227+
self.worker_group.as_ref().unwrap()
228228
}
229229

230230
fn all_buckets_empty(&self) -> bool {
@@ -330,8 +330,8 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
330330

331331
#[inline]
332332
fn pop_scheduable_work(&self, worker: &GCWorker<VM>) -> Option<(Box<dyn GCWork<VM>>, bool)> {
333-
if let Some(work) = worker.local_work_bucket.poll() {
334-
return Some((work, worker.local_work_bucket.is_empty()));
333+
if let Some(work) = worker.shared.local_work_bucket.poll() {
334+
return Some((work, worker.shared.local_work_bucket.is_empty()));
335335
}
336336
for work_bucket in self.work_buckets.values() {
337337
if let Some(work) = work_bucket.poll() {
@@ -360,10 +360,10 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
360360

361361
#[cold]
362362
fn poll_slow(&self, worker: &GCWorker<VM>) -> Box<dyn GCWork<VM>> {
363-
debug_assert!(!worker.is_parked());
363+
debug_assert!(!worker.shared.is_parked());
364364
let mut guard = self.worker_monitor.0.lock().unwrap();
365365
loop {
366-
debug_assert!(!worker.is_parked());
366+
debug_assert!(!worker.shared.is_parked());
367367
if let Some((work, bucket_is_empty)) = self.pop_scheduable_work(worker) {
368368
if bucket_is_empty {
369369
worker
@@ -374,7 +374,7 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
374374
return work;
375375
}
376376
// Park this worker
377-
worker.parked.store(true, Ordering::SeqCst);
377+
worker.shared.parked.store(true, Ordering::SeqCst);
378378
if self.worker_group().all_parked() {
379379
worker
380380
.sender
@@ -384,25 +384,29 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
384384
// Wait
385385
guard = self.worker_monitor.1.wait(guard).unwrap();
386386
// Unpark this worker
387-
worker.parked.store(false, Ordering::SeqCst);
387+
worker.shared.parked.store(false, Ordering::SeqCst);
388388
}
389389
}
390390

391391
pub fn enable_stat(&self) {
392-
for worker in &self.worker_group().workers {
393-
worker.stat.enable();
392+
for worker in &self.worker_group().workers_shared {
393+
let worker_stat = worker.borrow_stat();
394+
worker_stat.enable();
394395
}
395396
let coordinator_worker = self.coordinator_worker.as_ref().unwrap().read().unwrap();
396-
coordinator_worker.stat.enable();
397+
let coordinator_worker_stat = coordinator_worker.shared.borrow_stat();
398+
coordinator_worker_stat.enable();
397399
}
398400

399401
pub fn statistics(&self) -> HashMap<String, String> {
400402
let mut summary = SchedulerStat::default();
401-
for worker in &self.worker_group().workers {
402-
summary.merge(&worker.stat);
403+
for worker in &self.worker_group().workers_shared {
404+
let worker_stat = worker.borrow_stat();
405+
summary.merge(&worker_stat);
403406
}
404407
let coordinator_worker = self.coordinator_worker.as_ref().unwrap().read().unwrap();
405-
summary.merge(&coordinator_worker.stat);
408+
let coordinator_worker_stat = coordinator_worker.shared.borrow_stat();
409+
summary.merge(&coordinator_worker_stat);
406410
summary.harness_stat()
407411
}
408412

src/scheduler/work.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,16 +28,20 @@ pub trait GCWork<VM: VMBinding>: 'static + Send {
2828

2929
#[cfg(feature = "work_packet_stats")]
3030
// Start collecting statistics
31-
let stat = worker
32-
.stat
33-
.measure_work(TypeId::of::<Self>(), type_name::<Self>(), mmtk);
31+
let stat = {
32+
let mut worker_stat = worker.shared.borrow_stat_mut();
33+
worker_stat.measure_work(TypeId::of::<Self>(), type_name::<Self>(), mmtk)
34+
};
3435

3536
// Do the actual work
3637
self.do_work(worker, mmtk);
3738

3839
#[cfg(feature = "work_packet_stats")]
3940
// Finish collecting statistics
40-
stat.end_of_work(&mut worker.stat);
41+
{
42+
let mut worker_stat = worker.shared.borrow_stat_mut();
43+
stat.end_of_work(&mut worker_stat);
44+
}
4145
}
4246
}
4347

src/scheduler/worker.rs

Lines changed: 72 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -5,50 +5,78 @@ use crate::mmtk::MMTK;
55
use crate::util::copy::GCWorkerCopyContext;
66
use crate::util::opaque_pointer::*;
77
use crate::vm::{Collection, VMBinding};
8+
use atomic_refcell::{AtomicRef, AtomicRefCell, AtomicRefMut};
89
use std::sync::atomic::{AtomicBool, Ordering};
910
use std::sync::mpsc::Sender;
10-
use std::sync::{Arc, Weak};
11+
use std::sync::Arc;
1112

1213
const LOCALLY_CACHED_WORKS: usize = 1;
1314

15+
/// The part shared between a GCWorker and the scheduler.
16+
/// This structure is used for communication, e.g. adding new work packets.
17+
pub struct GCWorkerShared<VM: VMBinding> {
18+
pub parked: AtomicBool,
19+
stat: AtomicRefCell<WorkerLocalStat<VM>>,
20+
pub local_work_bucket: WorkBucket<VM>,
21+
}
22+
23+
/// A GC worker. This part is privately owned by a worker thread.
1424
pub struct GCWorker<VM: VMBinding> {
1525
pub tls: VMWorkerThread,
1626
pub ordinal: usize,
17-
pub parked: AtomicBool,
1827
scheduler: Arc<GCWorkScheduler<VM>>,
1928
copy: GCWorkerCopyContext<VM>,
20-
pub local_work_bucket: WorkBucket<VM>,
2129
pub sender: Sender<CoordinatorMessage<VM>>,
22-
pub stat: WorkerLocalStat<VM>,
2330
mmtk: Option<&'static MMTK<VM>>,
2431
is_coordinator: bool,
2532
local_work_buffer: Vec<(WorkBucketStage, Box<dyn GCWork<VM>>)>,
33+
pub shared: Arc<GCWorkerShared<VM>>,
2634
}
2735

28-
unsafe impl<VM: VMBinding> Sync for GCWorker<VM> {}
29-
unsafe impl<VM: VMBinding> Send for GCWorker<VM> {}
36+
unsafe impl<VM: VMBinding> Sync for GCWorkerShared<VM> {}
37+
unsafe impl<VM: VMBinding> Send for GCWorkerShared<VM> {}
38+
39+
// Error message for borrowing `GCWorkerShared::stat`.
40+
const STAT_BORROWED_MSG: &str = "GCWorkerShared.stat is already borrowed. This may happen if \
41+
the mutator calls harness_begin or harness_end while the GC is running.";
42+
43+
impl<VM: VMBinding> GCWorkerShared<VM> {
44+
pub fn is_parked(&self) -> bool {
45+
self.parked.load(Ordering::SeqCst)
46+
}
47+
48+
pub fn borrow_stat(&self) -> AtomicRef<WorkerLocalStat<VM>> {
49+
self.stat.try_borrow().expect(STAT_BORROWED_MSG)
50+
}
51+
52+
pub fn borrow_stat_mut(&self) -> AtomicRefMut<WorkerLocalStat<VM>> {
53+
self.stat.try_borrow_mut().expect(STAT_BORROWED_MSG)
54+
}
55+
}
3056

3157
impl<VM: VMBinding> GCWorker<VM> {
3258
pub fn new(
3359
ordinal: usize,
34-
scheduler: Weak<GCWorkScheduler<VM>>,
60+
scheduler: Arc<GCWorkScheduler<VM>>,
3561
is_coordinator: bool,
3662
sender: Sender<CoordinatorMessage<VM>>,
3763
) -> Self {
38-
let scheduler = scheduler.upgrade().unwrap();
64+
let worker_monitor = scheduler.worker_monitor.clone();
3965
Self {
4066
tls: VMWorkerThread(VMThread::UNINITIALIZED),
4167
ordinal,
42-
parked: AtomicBool::new(true),
4368
// We will set this later
4469
copy: GCWorkerCopyContext::new_non_copy(),
45-
local_work_bucket: WorkBucket::new(true, scheduler.worker_monitor.clone()),
4670
sender,
4771
scheduler,
48-
stat: Default::default(),
4972
mmtk: None,
5073
is_coordinator,
5174
local_work_buffer: Vec::with_capacity(LOCALLY_CACHED_WORKS),
75+
shared: Arc::new(GCWorkerShared {
76+
parked: AtomicBool::new(true),
77+
stat: Default::default(),
78+
local_work_bucket: WorkBucket::new(true, worker_monitor),
79+
}),
5280
}
5381
}
5482

@@ -73,10 +101,6 @@ impl<VM: VMBinding> GCWorker<VM> {
73101
}
74102
}
75103

76-
pub fn is_parked(&self) -> bool {
77-
self.parked.load(Ordering::SeqCst)
78-
}
79-
80104
pub fn is_coordinator(&self) -> bool {
81105
self.is_coordinator
82106
}
@@ -97,52 +121,66 @@ impl<VM: VMBinding> GCWorker<VM> {
97121
self.tls = tls;
98122
self.copy = crate::plan::create_gc_worker_context(tls, mmtk);
99123
self.mmtk = Some(mmtk);
100-
self.parked.store(false, Ordering::SeqCst);
124+
self.shared.parked.store(false, Ordering::SeqCst);
101125
loop {
102126
while let Some((bucket, mut work)) = self.local_work_buffer.pop() {
103127
debug_assert!(self.scheduler.work_buckets[bucket].is_activated());
104128
work.do_work_with_stat(self, mmtk);
105129
}
106130
let mut work = self.scheduler().poll(self);
107-
debug_assert!(!self.is_parked());
131+
debug_assert!(!self.shared.is_parked());
108132
work.do_work_with_stat(self, mmtk);
109133
}
110134
}
111135
}
112136

113137
pub struct WorkerGroup<VM: VMBinding> {
114-
pub workers: Vec<GCWorker<VM>>,
138+
pub workers_shared: Vec<Arc<GCWorkerShared<VM>>>,
115139
}
116140

117141
impl<VM: VMBinding> WorkerGroup<VM> {
118142
pub fn new(
119143
workers: usize,
120-
scheduler: Weak<GCWorkScheduler<VM>>,
144+
scheduler: Arc<GCWorkScheduler<VM>>,
121145
sender: Sender<CoordinatorMessage<VM>>,
122-
) -> Arc<Self> {
123-
Arc::new(Self {
124-
workers: (0..workers)
125-
.map(|i| GCWorker::new(i, scheduler.clone(), false, sender.clone()))
126-
.collect(),
127-
})
146+
) -> (Self, Box<dyn FnOnce(VMThread)>) {
147+
let mut workers_shared = Vec::new();
148+
let mut workers_to_spawn = Vec::new();
149+
150+
for ordinal in 0..workers {
151+
let worker = Box::new(GCWorker::new(
152+
ordinal,
153+
scheduler.clone(),
154+
false,
155+
sender.clone(),
156+
));
157+
let worker_shared = worker.shared.clone();
158+
workers_shared.push(worker_shared);
159+
workers_to_spawn.push(worker);
160+
}
161+
162+
// NOTE: We cannot call spawn_worker_thread here,
163+
// because the worker will access `Scheduler::worker_group` immediately after it is started,
164+
// but that field will not be assigned to before this function returns.
165+
// Therefore we defer the spawning operation until later.
166+
let deferred_spawn = Box::new(move |tls| {
167+
for worker in workers_to_spawn.drain(..) {
168+
VM::VMCollection::spawn_worker_thread(tls, Some(worker));
169+
}
170+
});
171+
172+
(Self { workers_shared }, deferred_spawn)
128173
}
129174

130175
pub fn worker_count(&self) -> usize {
131-
self.workers.len()
176+
self.workers_shared.len()
132177
}
133178

134179
pub fn parked_workers(&self) -> usize {
135-
self.workers.iter().filter(|w| w.is_parked()).count()
180+
self.workers_shared.iter().filter(|w| w.is_parked()).count()
136181
}
137182

138183
pub fn all_parked(&self) -> bool {
139184
self.parked_workers() == self.worker_count()
140185
}
141-
142-
pub fn spawn_workers(&'static self, tls: VMThread, _mmtk: &'static MMTK<VM>) {
143-
for i in 0..self.worker_count() {
144-
let worker = &self.workers[i];
145-
VM::VMCollection::spawn_worker_thread(tls, Some(worker));
146-
}
147-
}
148186
}

src/util/sanity/sanity_checker.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ impl<P: Plan> GCWork<P::VM> for SanityPrepare<P> {
109109
mmtk.scheduler.work_buckets[WorkBucketStage::Prepare]
110110
.add(PrepareMutator::<P::VM>::new(mutator));
111111
}
112-
for w in &mmtk.scheduler.worker_group().workers {
112+
for w in &mmtk.scheduler.worker_group().workers_shared {
113113
w.local_work_bucket.add(PrepareCollector);
114114
}
115115
}
@@ -133,7 +133,7 @@ impl<P: Plan> GCWork<P::VM> for SanityRelease<P> {
133133
mmtk.scheduler.work_buckets[WorkBucketStage::Release]
134134
.add(ReleaseMutator::<P::VM>::new(mutator));
135135
}
136-
for w in &mmtk.scheduler.worker_group().workers {
136+
for w in &mmtk.scheduler.worker_group().workers_shared {
137137
w.local_work_bucket.add(ReleaseCollector);
138138
}
139139
}

src/vm/collection.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ pub trait Collection<VM: VMBinding> {
4949
/// calls `initialize_collection()` and passes as an argument.
5050
/// * `ctx`: The GC worker context for the GC thread. If `None` is passed, it means spawning a GC thread for the GC controller,
5151
/// which does not have a worker context.
52-
fn spawn_worker_thread(tls: VMThread, ctx: Option<&GCWorker<VM>>);
52+
fn spawn_worker_thread(tls: VMThread, ctx: Option<Box<GCWorker<VM>>>);
5353

5454
/// Allow VM-specific behaviors for a mutator after all the mutators are stopped and before any actual GC work starts.
5555
///

vmbindings/dummyvm/src/collection.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@ impl Collection<DummyVM> for VMCollection {
1919
panic!("block_for_gc is not implemented")
2020
}
2121

22-
fn spawn_worker_thread(_tls: VMThread, _ctx: Option<&GCWorker<DummyVM>>) {
22+
fn spawn_worker_thread(_tls: VMThread, _ctx: Option<Box<GCWorker<DummyVM>>>) {
2323

2424
}
2525

2626
fn prepare_mutator<T: MutatorContext<DummyVM>>(_tls_w: VMWorkerThread, _tls_m: VMMutatorThread, _mutator: &T) {
2727
unimplemented!()
2828
}
29-
}
29+
}

0 commit comments

Comments
 (0)