From 8532ae872d3600ec21eded882be0fd8cfe6b1089 Mon Sep 17 00:00:00 2001
From: Kunshan Wang
Date: Fri, 4 Feb 2022 23:00:26 +0800
Subject: [PATCH] Refactor scheduler and worker creation

This commit eliminates the unnecessary Option fields, the two-phase
initialization, and the unsafe get_mut_unchecked call in
GCWorkScheduler.

We consider MMTK, GCWorkScheduler and GCWorkerShared to be data shared
between threads, and GCWorker and GCController to be the private data
of the worker threads and the controller thread, respectively.

We now create all shared data, including every GCWorkerShared struct,
before spawning any GC threads.  As a result, GC threads are never
spawned before GCWorkScheduler is fully initialized, which removes the
need for the unsafe operations.

We temporarily make Options::threads settable only via environment
variable, because GCWorkerShared instances are now created when
GCWorkScheduler is created, and that usually happens in a static
context, before command-line options can be processed.
---
 src/memory_manager.rs             |   2 +-
 src/mmtk.rs                       |   9 +-
 src/scheduler/controller.rs       |   2 +-
 src/scheduler/gc_work.rs          |   4 +-
 src/scheduler/scheduler.rs        | 177 +++++++++++++++---------------
 src/scheduler/work_bucket.rs      |  11 +-
 src/scheduler/worker.rs           |  75 +++----------
 src/util/options.rs               |  23 ++--
 src/util/sanity/sanity_checker.rs |   4 +-
 9 files changed, 138 insertions(+), 169 deletions(-)

diff --git a/src/memory_manager.rs b/src/memory_manager.rs
index 490ce6b899..e77a2ab002 100644
--- a/src/memory_manager.rs
+++ b/src/memory_manager.rs
@@ -196,7 +196,7 @@ pub fn initialize_collection<VM: VMBinding>(mmtk: &'static MMTK<VM>, tls: VMThre
         !mmtk.plan.is_initialized(),
         "MMTk collection has been initialized (was initialize_collection() already called before?)"
     );
-    mmtk.scheduler.initialize(*mmtk.options.threads, mmtk, tls);
+    mmtk.scheduler.spawn_gc_threads(mmtk, tls);
     mmtk.plan.base().initialized.store(true, Ordering::SeqCst);
 }
 
diff --git a/src/mmtk.rs b/src/mmtk.rs
index cc3bd2d31f..989fd85baf 100644
--- a/src/mmtk.rs
+++ b/src/mmtk.rs
@@ -57,8 +57,15 @@ impl<VM: VMBinding> MMTK<VM> {
         // The first call will initialize SFT map. Other calls will be blocked until SFT map is initialized.
         SFT_MAP.initialize_once();
 
-        let scheduler = GCWorkScheduler::new();
         let options = Arc::new(UnsafeOptionsWrapper::new(Options::default()));
+
+        let num_workers = if cfg!(feature = "single_worker") {
+            1
+        } else {
+            *options.threads
+        };
+
+        let scheduler = GCWorkScheduler::new(num_workers);
         let plan = crate::plan::create_plan(
             *options.plan,
             &VM_MAP,
diff --git a/src/scheduler/controller.rs b/src/scheduler/controller.rs
index 50c111b868..ceaae741d7 100644
--- a/src/scheduler/controller.rs
+++ b/src/scheduler/controller.rs
@@ -86,7 +86,7 @@ impl<VM: VMBinding> GCController<VM> {
             }
         }
         let _guard = self.scheduler.worker_monitor.0.lock().unwrap();
-        if self.scheduler.worker_group().all_parked() && self.scheduler.all_buckets_empty() {
+        if self.scheduler.all_workers_parked() && self.scheduler.all_buckets_empty() {
             break;
         }
     }
diff --git a/src/scheduler/gc_work.rs b/src/scheduler/gc_work.rs
index 13c060e89d..0def65f90a 100644
--- a/src/scheduler/gc_work.rs
+++ b/src/scheduler/gc_work.rs
@@ -49,7 +49,7 @@ impl<C: GCWorkContext + 'static> GCWork<C::VM> for Prepare<C> {
             mmtk.scheduler.work_buckets[WorkBucketStage::Prepare]
                 .add(PrepareMutator::<C::VM>::new(mutator));
         }
-        for w in &mmtk.scheduler.worker_group().workers_shared {
+        for w in &mmtk.scheduler.workers_shared {
             w.local_work_bucket.add(PrepareCollector);
         }
     }
@@ -117,7 +117,7 @@ impl<C: GCWorkContext + 'static> GCWork<C::VM> for Release<C> {
             mmtk.scheduler.work_buckets[WorkBucketStage::Release]
                 .add(ReleaseMutator::<C::VM>::new(mutator));
         }
-        for w in &mmtk.scheduler.worker_group().workers_shared {
+        for w in &mmtk.scheduler.workers_shared {
             w.local_work_bucket.add(ReleaseCollector);
         }
         // TODO: Process weak references properly
diff --git a/src/scheduler/scheduler.rs b/src/scheduler/scheduler.rs
index 44b1b98458..1ddefbe6cd 100644
--- a/src/scheduler/scheduler.rs
+++ b/src/scheduler/scheduler.rs
@@ -1,6 +1,7 @@
 use super::stat::SchedulerStat;
+use super::work_bucket::WorkBucketStage::*;
 use super::work_bucket::*;
-use super::worker::{GCWorker, GCWorkerShared, WorkerGroup};
+use super::worker::{GCWorker, GCWorkerShared};
 use super::*;
 use crate::mmtk::MMTK;
 use crate::util::opaque_pointer::*;
@@ -10,7 +11,7 @@ use enum_map::{enum_map, EnumMap};
 use std::collections::HashMap;
 use std::sync::atomic::Ordering;
 use std::sync::mpsc::channel;
-use std::sync::{Arc, Condvar, Mutex, RwLock};
+use std::sync::{Arc, Condvar, Mutex};
 
 pub enum CoordinatorMessage<VM: VMBinding> {
     Work(Box<dyn CoordinatorWork<VM>>),
@@ -18,15 +19,16 @@ pub enum CoordinatorMessage<VM: VMBinding> {
     BucketDrained,
 }
 
+/// The shared data structure for distributing work packets between worker threads and the coordinator thread.
 pub struct GCWorkScheduler<VM: VMBinding> {
-    /// Work buckets
+    /// Work buckets for worker threads
     pub work_buckets: EnumMap<WorkBucketStage, WorkBucket<VM>>,
     /// Work for the coordinator thread
     pub coordinator_work: WorkBucket<VM>,
     /// The shared parts of GC workers
-    worker_group: Option<WorkerGroup<VM>>,
+    pub workers_shared: Vec<Arc<GCWorkerShared<VM>>>,
     /// The shared part of the GC worker object of the controller thread
-    coordinator_worker_shared: Option<RwLock<Arc<GCWorkerShared<VM>>>>,
+    coordinator_worker_shared: Arc<GCWorkerShared<VM>>,
     /// Condition Variable for worker synchronization
     pub worker_monitor: Arc<(Mutex<()>, Condvar)>,
     /// A callback to be fired after the `Closure` bucket is drained.
@@ -41,66 +43,29 @@ pub struct GCWorkScheduler<VM: VMBinding> {
     closure_end: Mutex<Option<Box<dyn Send + Fn() -> bool>>>,
 }
 
-// The 'channel' inside Scheduler disallows Sync for Scheduler. We have to make sure we use channel properly:
-// 1. We should never directly use Sender. We clone the sender and let each worker have their own copy.
-// 2. Only the coordinator can use Receiver.
-// TODO: We should remove channel from Scheduler, and directly send Sender/Receiver when creating the coordinator and
-// the workers.
+// FIXME: GCWorkScheduler should be naturally Sync, but we cannot remove this `impl` yet.
+// Some subtle interaction between ObjectRememberingBarrier, Mutator and some GCWork instances
+// makes the compiler think WorkBucket is not Sync.
 unsafe impl<VM: VMBinding> Sync for GCWorkScheduler<VM> {}
 
 impl<VM: VMBinding> GCWorkScheduler<VM> {
-    pub fn new() -> Arc<Self> {
+    pub fn new(num_workers: usize) -> Arc<Self> {
         let worker_monitor: Arc<(Mutex<()>, Condvar)> = Default::default();
-        Arc::new(Self {
-            work_buckets: enum_map! {
-                WorkBucketStage::Unconstrained => WorkBucket::new(true, worker_monitor.clone()),
-                WorkBucketStage::Prepare => WorkBucket::new(false, worker_monitor.clone()),
-                WorkBucketStage::Closure => WorkBucket::new(false, worker_monitor.clone()),
-                WorkBucketStage::RefClosure => WorkBucket::new(false, worker_monitor.clone()),
-                WorkBucketStage::CalculateForwarding => WorkBucket::new(false, worker_monitor.clone()),
-                WorkBucketStage::RefForwarding => WorkBucket::new(false, worker_monitor.clone()),
-                WorkBucketStage::Compact => WorkBucket::new(false, worker_monitor.clone()),
-                WorkBucketStage::Release => WorkBucket::new(false, worker_monitor.clone()),
-                WorkBucketStage::Final => WorkBucket::new(false, worker_monitor.clone()),
-            },
-            coordinator_work: WorkBucket::new(true, worker_monitor.clone()),
-            worker_group: None,
-            coordinator_worker_shared: None,
-            worker_monitor,
-            closure_end: Mutex::new(None),
-        })
-    }
-
-    #[inline]
-    pub fn num_workers(&self) -> usize {
-        self.worker_group.as_ref().unwrap().worker_count()
-    }
-
-    pub fn initialize(
-        self: &'static Arc<Self>,
-        num_workers: usize,
-        mmtk: &'static MMTK<VM>,
-        tls: VMThread,
-    ) {
-        use crate::scheduler::work_bucket::WorkBucketStage::*;
-        let num_workers = if cfg!(feature = "single_worker") {
-            1
-        } else {
-            num_workers
+        // Create work buckets for workers.
+        let mut work_buckets = enum_map! {
+            WorkBucketStage::Unconstrained => WorkBucket::new(true, worker_monitor.clone()),
+            WorkBucketStage::Prepare => WorkBucket::new(false, worker_monitor.clone()),
+            WorkBucketStage::Closure => WorkBucket::new(false, worker_monitor.clone()),
+            WorkBucketStage::RefClosure => WorkBucket::new(false, worker_monitor.clone()),
+            WorkBucketStage::CalculateForwarding => WorkBucket::new(false, worker_monitor.clone()),
+            WorkBucketStage::RefForwarding => WorkBucket::new(false, worker_monitor.clone()),
+            WorkBucketStage::Compact => WorkBucket::new(false, worker_monitor.clone()),
+            WorkBucketStage::Release => WorkBucket::new(false, worker_monitor.clone()),
+            WorkBucketStage::Final => WorkBucket::new(false, worker_monitor.clone()),
         };
-        let (sender, receiver) = channel::<CoordinatorMessage<VM>>();
-
-        let mut self_mut = self.clone();
-        let self_mut = unsafe { Arc::get_mut_unchecked(&mut self_mut) };
-
-        let coordinator_worker = GCWorker::new(mmtk, 0, self.clone(), true, sender.clone());
-        self_mut.coordinator_worker_shared = Some(RwLock::new(coordinator_worker.shared.clone()));
-
-        let (worker_group, spawn_workers) =
-            WorkerGroup::new(mmtk, num_workers, self.clone(), sender);
-        self_mut.worker_group = Some(worker_group);
-
+        // Set the open condition of each bucket.
         {
             // Unconstrained is always open. Prepare will be opened at the beginning of a GC.
             // This vec will grow for each stage we call with open_next()
@@ -108,12 +73,12 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
             // The rest will open after the previous stage is done.
             let mut open_next = |s: WorkBucketStage| {
                 let cur_stages = open_stages.clone();
-                self_mut.work_buckets[s].set_open_condition(move || {
-                    let should_open =
-                        self.are_buckets_drained(&cur_stages) && self.worker_group().all_parked();
+                work_buckets[s].set_open_condition(move |scheduler: &GCWorkScheduler<VM>| {
+                    let should_open = scheduler.are_buckets_drained(&cur_stages)
+                        && scheduler.all_workers_parked();
                     // Additional check before the `RefClosure` bucket opens.
                     if should_open && s == WorkBucketStage::RefClosure {
-                        if let Some(closure_end) = self.closure_end.lock().unwrap().as_ref() {
+                        if let Some(closure_end) = scheduler.closure_end.lock().unwrap().as_ref() {
                             if closure_end() {
                                 // Don't open `RefClosure` if `closure_end` added more works to `Closure`.
                                 return false;
@@ -134,9 +99,51 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
             open_next(Final);
         }
 
-        // Now that the scheduler is initialized, we spawn the worker threads and the controller thread.
-        spawn_workers(tls);
-
+        // Create the work bucket for the controller.
+        let coordinator_work = WorkBucket::new(true, worker_monitor.clone());
+
+        // We prepare the shared part of workers, but do not create the actual workers now.
+        // The shared parts of workers are communication hubs between the controller and the workers.
+        let workers_shared = (0..num_workers)
+            .map(|_| Arc::new(GCWorkerShared::<VM>::new(worker_monitor.clone())))
+            .collect::<Vec<_>>();
+
+        // Similarly, we create the shared part of the GC worker of the controller, but not the controller itself.
+        let coordinator_worker_shared = Arc::new(GCWorkerShared::<VM>::new(worker_monitor.clone()));
+
+        Arc::new(Self {
+            work_buckets,
+            coordinator_work,
+            workers_shared,
+            coordinator_worker_shared,
+            worker_monitor,
+            closure_end: Mutex::new(None),
+        })
+    }
+
+    #[inline]
+    pub fn num_workers(&self) -> usize {
+        self.workers_shared.len()
+    }
+
+    pub fn all_workers_parked(&self) -> bool {
+        self.workers_shared.iter().all(|w| w.is_parked())
+    }
+
+    /// Create GC threads, including the controller thread and all workers.
+    pub fn spawn_gc_threads(self: &Arc<Self>, mmtk: &'static MMTK<VM>, tls: VMThread) {
+        // Create the communication channel.
+        let (sender, receiver) = channel::<CoordinatorMessage<VM>>();
+
+        // Spawn the controller thread.
+        let coordinator_worker = GCWorker::new(
+            mmtk,
+            0,
+            self.clone(),
+            true,
+            sender.clone(),
+            self.coordinator_worker_shared.clone(),
+        );
         let gc_controller = GCController::new(
             mmtk,
             mmtk.plan.base().gc_requester.clone(),
@@ -144,8 +151,20 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
             receiver,
             coordinator_worker,
         );
-
         VM::VMCollection::spawn_gc_thread(tls, GCThreadContext::<VM>::Controller(gc_controller));
+
+        // Spawn each worker thread.
+        for (ordinal, shared) in self.workers_shared.iter().enumerate() {
+            let worker = Box::new(GCWorker::new(
+                mmtk,
+                ordinal,
+                self.clone(),
+                false,
+                sender.clone(),
+                shared.clone(),
+            ));
+            VM::VMCollection::spawn_gc_thread(tls, GCThreadContext::<VM>::Worker(worker));
+        }
     }
 
     /// Schedule all the common work packets
@@ -206,10 +225,6 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
         *self.closure_end.lock().unwrap() = Some(f);
     }
 
-    pub fn worker_group(&self) -> &WorkerGroup<VM> {
-        self.worker_group.as_ref().unwrap()
-    }
-
     pub fn all_buckets_empty(&self) -> bool {
         self.work_buckets.values().all(|bucket| bucket.is_empty())
     }
@@ -221,7 +236,7 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
             if id == WorkBucketStage::Unconstrained {
                 continue;
             }
-            buckets_updated |= bucket.update();
+            buckets_updated |= bucket.update(self);
         }
         if buckets_updated {
             // Notify the workers for new work
@@ -317,7 +332,7 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
         }
         // Park this worker
         worker.shared.parked.store(true, Ordering::SeqCst);
-        if self.worker_group().all_parked() {
+        if self.all_workers_parked() {
             worker
                 .sender
                 .send(CoordinatorMessage::AllWorkerParked)
@@ -331,33 +346,21 @@ impl<VM: VMBinding> GCWorkScheduler<VM> {
     }
 
     pub fn enable_stat(&self) {
-        for worker in &self.worker_group().workers_shared {
+        for worker in &self.workers_shared {
             let worker_stat = worker.borrow_stat();
             worker_stat.enable();
         }
-        let coordinator_worker_shared = self
-            .coordinator_worker_shared
-            .as_ref()
-            .unwrap()
-            .read()
-            .unwrap();
-        let coordinator_worker_stat = coordinator_worker_shared.borrow_stat();
+        let coordinator_worker_stat = self.coordinator_worker_shared.borrow_stat();
         coordinator_worker_stat.enable();
     }
 
     pub fn statistics(&self) -> HashMap<String, String> {
         let mut summary = SchedulerStat::default();
-        for worker in &self.worker_group().workers_shared {
+        for worker in &self.workers_shared {
             let worker_stat = worker.borrow_stat();
             summary.merge(&worker_stat);
         }
-        let coordinator_worker_shared = self
-            .coordinator_worker_shared
-            .as_ref()
-            .unwrap()
-            .read()
-            .unwrap();
-        let coordinator_worker_stat = coordinator_worker_shared.borrow_stat();
+        let coordinator_worker_stat = self.coordinator_worker_shared.borrow_stat();
         summary.merge(&coordinator_worker_stat);
         summary.harness_stat()
     }
diff --git a/src/scheduler/work_bucket.rs b/src/scheduler/work_bucket.rs
index f5b9ad3150..cc6badd08b 100644
--- a/src/scheduler/work_bucket.rs
+++ b/src/scheduler/work_bucket.rs
@@ -59,7 +59,7 @@ pub struct WorkBucket<VM: VMBinding> {
     /// A priority queue
     queue: RwLock<BinaryHeap<PrioritizedWork<VM>>>,
     monitor: Arc<(Mutex<()>, Condvar)>,
-    can_open: Option<Box<(dyn Fn() -> bool) + Send>>,
+    can_open: Option<Box<(dyn Fn(&GCWorkScheduler<VM>) -> bool) + Send>>,
 }
 
 impl<VM: VMBinding> WorkBucket<VM> {
@@ -132,12 +132,15 @@ impl<VM: VMBinding> WorkBucket<VM> {
         }
         self.queue.write().pop().map(|v| v.work)
     }
-    pub fn set_open_condition(&mut self, pred: impl Fn() -> bool + Send + 'static) {
+    pub fn set_open_condition(
+        &mut self,
+        pred: impl Fn(&GCWorkScheduler<VM>) -> bool + Send + 'static,
+    ) {
         self.can_open = Some(box pred);
     }
-    pub fn update(&self) -> bool {
+    pub fn update(&self, scheduler: &GCWorkScheduler<VM>) -> bool {
         if let Some(can_open) = self.can_open.as_ref() {
-            if !self.is_activated() && can_open() {
+            if !self.is_activated() && can_open(scheduler) {
                 self.activate();
                 return true;
             }
diff --git a/src/scheduler/worker.rs b/src/scheduler/worker.rs
index 50a1701bd8..809c78eb27 100644
--- a/src/scheduler/worker.rs
+++ b/src/scheduler/worker.rs
@@ -4,11 +4,11 @@ use super::*;
 use crate::mmtk::MMTK;
 use crate::util::copy::GCWorkerCopyContext;
 use crate::util::opaque_pointer::*;
-use crate::vm::{Collection, GCThreadContext, VMBinding};
+use crate::vm::VMBinding;
 use atomic_refcell::{AtomicRef, AtomicRefCell, AtomicRefMut};
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::mpsc::Sender;
-use std::sync::Arc;
+use std::sync::{Arc, Condvar, Mutex};
 
 const LOCALLY_CACHED_WORKS: usize = 1;
 
@@ -23,6 +23,16 @@ pub struct GCWorkerShared<VM: VMBinding> {
     pub local_work_bucket: WorkBucket<VM>,
 }
 
+impl<VM: VMBinding> GCWorkerShared<VM> {
+    pub fn new(worker_monitor: Arc<(Mutex<()>, Condvar)>) -> Self {
+        Self {
+            parked: AtomicBool::new(true),
+            stat: Default::default(),
+            local_work_bucket: WorkBucket::new(true, worker_monitor),
+        }
+    }
+}
+
 /// A GC worker. This part is privately owned by a worker thread.
 /// The GC controller also has an embedded `GCWorker` because it may also execute work packets.
 pub struct GCWorker<VM: VMBinding> {
@@ -77,8 +87,8 @@ impl<VM: VMBinding> GCWorker<VM> {
         scheduler: Arc<GCWorkScheduler<VM>>,
         is_coordinator: bool,
         sender: Sender<CoordinatorMessage<VM>>,
+        shared: Arc<GCWorkerShared<VM>>,
     ) -> Self {
-        let worker_monitor = scheduler.worker_monitor.clone();
         Self {
             tls: VMWorkerThread(VMThread::UNINITIALIZED),
             ordinal,
@@ -89,11 +99,7 @@ impl<VM: VMBinding> GCWorker<VM> {
             mmtk,
             is_coordinator,
             local_work_buffer: Vec::with_capacity(LOCALLY_CACHED_WORKS),
-            shared: Arc::new(GCWorkerShared {
-                parked: AtomicBool::new(true),
-                stat: Default::default(),
-                local_work_bucket: WorkBucket::new(true, worker_monitor),
-            }),
+            shared,
         }
     }
 
@@ -149,56 +155,3 @@ impl<VM: VMBinding> GCWorker<VM> {
         }
     }
 }
-
-pub struct WorkerGroup<VM: VMBinding> {
-    pub workers_shared: Vec<Arc<GCWorkerShared<VM>>>,
-}
-
-impl<VM: VMBinding> WorkerGroup<VM> {
-    pub fn new(
-        mmtk: &'static MMTK<VM>,
-        workers: usize,
-        scheduler: Arc<GCWorkScheduler<VM>>,
-        sender: Sender<CoordinatorMessage<VM>>,
-    ) -> (Self, Box<dyn FnOnce(VMThread)>) {
-        let mut workers_shared = Vec::new();
-        let mut workers_to_spawn = Vec::new();
-
-        for ordinal in 0..workers {
-            let worker = Box::new(GCWorker::new(
-                mmtk,
-                ordinal,
-                scheduler.clone(),
-                false,
-                sender.clone(),
-            ));
-            let worker_shared = worker.shared.clone();
-            workers_shared.push(worker_shared);
-            workers_to_spawn.push(worker);
-        }
-
-        // NOTE: We cannot call spawn_gc_thread here,
-        // because the worker will access `Scheduler::worker_group` immediately after started,
-        // but that field will not be assigned to before this function returns.
-        // Therefore we defer the spawning operation later.
-        let deferred_spawn = Box::new(move |tls| {
-            for worker in workers_to_spawn.drain(..) {
-                VM::VMCollection::spawn_gc_thread(tls, GCThreadContext::<VM>::Worker(worker));
-            }
-        });
-
-        (Self { workers_shared }, deferred_spawn)
-    }
-
-    pub fn worker_count(&self) -> usize {
-        self.workers_shared.len()
-    }
-
-    pub fn parked_workers(&self) -> usize {
-        self.workers_shared.iter().filter(|w| w.is_parked()).count()
-    }
-
-    pub fn all_parked(&self) -> bool {
-        self.parked_workers() == self.worker_count()
-    }
-}
diff --git a/src/util/options.rs b/src/util/options.rs
index ae0ed7926b..b49c4cd654 100644
--- a/src/util/options.rs
+++ b/src/util/options.rs
@@ -148,9 +148,9 @@ mod process_tests {
     fn test_process_valid() {
         serial_test(|| {
            let options = UnsafeOptionsWrapper::new(Options::default());
-            let success = unsafe { options.process("threads", "1") };
+            let success = unsafe { options.process("no_finalizer", "true") };
             assert!(success);
-            assert_eq!(*options.threads, 1);
+            assert!(*options.no_finalizer);
         })
     }
 
@@ -158,10 +158,10 @@ mod process_tests {
     fn test_process_invalid() {
         serial_test(|| {
             let options = UnsafeOptionsWrapper::new(Options::default());
-            let default_threads = *options.threads;
-            let success = unsafe { options.process("threads", "a") };
+            let default_no_finalizer = *options.no_finalizer;
+            let success = unsafe { options.process("no_finalizer", "100") };
             assert!(!success);
-            assert_eq!(*options.threads, default_threads);
+            assert_eq!(*options.no_finalizer, default_no_finalizer);
         })
     }
 
@@ -178,9 +178,9 @@ mod process_tests {
     fn test_process_bulk_valid() {
         serial_test(|| {
             let options = UnsafeOptionsWrapper::new(Options::default());
-            let success = unsafe { options.process_bulk("threads=1 stress_factor=42") };
+            let success = unsafe { options.process_bulk("no_finalizer=true stress_factor=42") };
             assert!(success);
-            assert_eq!(*options.threads, 1);
+            assert!(*options.no_finalizer);
             assert_eq!(*options.stress_factor, 42);
         })
     }
@@ -189,7 +189,7 @@ mod process_tests {
     fn test_process_bulk_invalid() {
         serial_test(|| {
             let options = UnsafeOptionsWrapper::new(Options::default());
-            let success = unsafe { options.process_bulk("threads=a stress_factor=42") };
+            let success = unsafe { options.process_bulk("no_finalizer=true stress_factor=a") };
             assert!(!success);
         })
     }
@@ -307,8 +307,11 @@ macro_rules! options {
 options! {
     // The plan to use. This needs to be initialized before creating an MMTk instance (currently by setting env vars)
     plan: PlanSelector [env_var: true, command_line: false] [always_valid] = PlanSelector::NoGC,
-    // Number of GC threads.
-    threads: usize [env_var: true, command_line: true] [|v: &usize| *v > 0] = num_cpus::get(),
+    // Number of GC worker threads. (There is always one GC controller thread.)
+    // FIXME: Currently we create GCWorkScheduler when MMTK is created, which is usually static.
+    // To allow this as a command-line option, we need to refactor the creation of the `MMTK` instance.
+    // See: https://github.com/mmtk/mmtk-core/issues/532
+    threads: usize [env_var: true, command_line: false] [|v: &usize| *v > 0] = num_cpus::get(),
     // Enable an optimization that only scans the part of the stack that has changed since the last GC (not supported)
     use_short_stack_scans: bool [env_var: true, command_line: true] [always_valid] = false,
     // Enable a return barrier (not supported)
diff --git a/src/util/sanity/sanity_checker.rs b/src/util/sanity/sanity_checker.rs
index c7b7621150..e02e67c4d3 100644
--- a/src/util/sanity/sanity_checker.rs
+++ b/src/util/sanity/sanity_checker.rs
@@ -109,7 +109,7 @@ impl<P: Plan> GCWork<P::VM> for SanityPrepare<P> {
             mmtk.scheduler.work_buckets[WorkBucketStage::Prepare]
                 .add(PrepareMutator::<P::VM>::new(mutator));
         }
-        for w in &mmtk.scheduler.worker_group().workers_shared {
+        for w in &mmtk.scheduler.workers_shared {
             w.local_work_bucket.add(PrepareCollector);
         }
     }
@@ -133,7 +133,7 @@ impl<P: Plan> GCWork<P::VM> for SanityRelease<P> {
             mmtk.scheduler.work_buckets[WorkBucketStage::Release]
                 .add(ReleaseMutator::<P::VM>::new(mutator));
         }
-        for w in &mmtk.scheduler.worker_group().workers_shared {
+        for w in &mmtk.scheduler.workers_shared {
             w.local_work_bucket.add(ReleaseCollector);
         }
     }
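
Note (illustrative sketch, not part of the patch): after this change, a VM
binding exercises the new creation order in two separate steps, as sketched
below. Here `MyVM` is a hypothetical VMBinding implementation, `MMTK::new()`
is assumed to be the public constructor as in mmtk-core of this period, and
the worker count comes from Options::threads (settable via the MMTK_THREADS
environment variable).

    use mmtk::util::opaque_pointer::VMThread;

    fn start_mmtk(tls: VMThread) {
        // Step 1: constructing MMTK also runs GCWorkScheduler::new(num_workers),
        // which eagerly allocates one GCWorkerShared per future worker. No GC
        // thread exists yet, so nothing can observe a half-initialized scheduler.
        let mmtk: &'static mmtk::MMTK<MyVM> = Box::leak(Box::new(mmtk::MMTK::new()));

        // Step 2: spawn the controller and the workers. Each worker receives a
        // clone of an already-created Arc<GCWorkerShared<MyVM>>, so the former
        // deferred-spawn callback and the unsafe Arc::get_mut_unchecked call are
        // no longer needed.
        mmtk::memory_manager::initialize_collection(mmtk, tls);
    }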