diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index 3ae8bb05ce..9fd45e30cc 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -15,6 +15,7 @@ pub(crate) use scheduler::MMTkScheduler; pub(self) use scheduler::Scheduler; mod stat; +mod work_counter; mod work; pub use work::CoordinatorWork; diff --git a/src/scheduler/stat.rs b/src/scheduler/stat.rs index 9b5a8ca7ac..2cc21da41f 100644 --- a/src/scheduler/stat.rs +++ b/src/scheduler/stat.rs @@ -1,13 +1,28 @@ +//! Statistics for work packets +use super::work_counter::{WorkCounter, WorkCounterBase, WorkDuration}; use std::any::TypeId; use std::collections::HashMap; use std::sync::atomic::{AtomicBool, Ordering}; -use std::time::{Duration, SystemTime}; +/// Merge and print the work-packet level statistics from all worker threads #[derive(Default)] pub struct SchedulerStat { + /// Map work packet type IDs to work packet names work_id_name_map: HashMap, + /// Count the number of work packets executed for different types work_counts: HashMap, - work_durations: HashMap>, + /// Collect work counters from work threads. + /// Two dimensional vectors are used, e.g. + /// `[[foo_0, ..., foo_n], ..., [bar_0, ..., bar_n]]`. + /// The first dimension is for different types of work counters, + /// (`foo` and `bar` in the above example). + /// The second dimension if for work counters of the same type but from + /// different threads (`foo_0` and `bar_0` are from the same thread). + /// The order of insertion is determined by when [`SchedulerStat::merge`] is + /// called for each [`WorkerLocalStat`]. + /// We assume different threads have the same set of work counters + /// (in the same order). + work_counters: HashMap>>>, } impl SchedulerStat { @@ -22,37 +37,7 @@ impl SchedulerStat { } } - fn geomean(&self, values: &[f64]) -> f64 { - // Geomean(xs, N=xs.len()) = (PI(xs))^(1/N) = e^{log{PI(xs)^(1/N)}} = e^{ (1/N) * sum_{x \in xs}{ log(x) } } - let logs = values.iter().map(|v| v.ln()); - let sum_logs = logs.sum::(); - (sum_logs / values.len() as f64).exp() - } - - fn min(&self, values: &[f64]) -> f64 { - let mut min = values[0]; - for v in values { - if *v < min { - min = *v - } - } - min - } - - fn max(&self, values: &[f64]) -> f64 { - let mut max = values[0]; - for v in values { - if *v > max { - max = *v - } - } - max - } - - fn sum(&self, values: &[f64]) -> f64 { - values.iter().sum() - } - + /// Used during statistics printing at [`crate::memory_manager::harness_end`] pub fn harness_stat(&self) -> HashMap { let mut stat = HashMap::new(); // Work counts @@ -67,59 +52,59 @@ impl SchedulerStat { } stat.insert("total-work.count".to_owned(), format!("{}", total_count)); // Work execution times - let mut total_durations = vec![]; - for (t, durations) in &self.work_durations { - for d in durations { - total_durations.push(*d); - } + let mut duration_overall: WorkCounterBase = Default::default(); + for (t, vs) in &self.work_counters { + // Name of the work packet type let n = self.work_id_name_map[t]; - let geomean = self.geomean( - &durations - .iter() - .map(|d| d.as_nanos() as f64) - .collect::>(), - ); - stat.insert( - format!("work.{}.time.geomean", self.work_name(n)), - format!("{:.2}", geomean), - ); - let sum = self.sum( - &durations + // Iterate through different types of work counters + for v in vs.iter() { + // Aggregate work counters of the same type but from different + // worker threads + let fold = v .iter() - .map(|d| d.as_nanos() as f64) - .collect::>(), - ); - stat.insert( - format!("work.{}.time.sum", self.work_name(n)), - format!("{:.2}", sum), - ); - } - let durations = total_durations - .iter() - .map(|d| d.as_nanos() as f64) - .collect::>(); - if !durations.is_empty() { - stat.insert( - "total-work.time.geomean".to_owned(), - format!("{:.2}", self.geomean(&durations)), - ); - stat.insert( - "total-work.time.min".to_owned(), - format!("{:.2}", self.min(&durations)), - ); - stat.insert( - "total-work.time.max".to_owned(), - format!("{:.2}", self.max(&durations)), - ); + .fold(Default::default(), |acc: WorkCounterBase, x| { + acc.merge(x.get_base()) + }); + // Update the overall execution time + duration_overall.merge_inplace(&fold); + let name = v.first().unwrap().name(); + stat.insert( + format!("work.{}.{}.total", self.work_name(n), name), + format!("{:.2}", fold.total), + ); + stat.insert( + format!("work.{}.{}.min", self.work_name(n), name), + format!("{:.2}", fold.min), + ); + stat.insert( + format!("work.{}.{}.max", self.work_name(n), name), + format!("{:.2}", fold.max), + ); + } } + // Print out overall execution time + stat.insert( + "total-work.time.total".to_owned(), + format!("{:.2}", duration_overall.total), + ); + stat.insert( + "total-work.time.min".to_owned(), + format!("{:.2}", duration_overall.min), + ); + stat.insert( + "total-work.time.max".to_owned(), + format!("{:.2}", duration_overall.max), + ); stat } - + /// Merge work counters from different worker threads pub fn merge(&mut self, stat: &WorkerLocalStat) { + // Merge work packet type ID to work packet name mapping for (id, name) in &stat.work_id_name_map { self.work_id_name_map.insert(*id, *name); } + // Merge work count for different work packet types for (id, count) in &stat.work_counts { if self.work_counts.contains_key(id) { *self.work_counts.get_mut(id).unwrap() += *count; @@ -127,49 +112,64 @@ impl SchedulerStat { self.work_counts.insert(*id, *count); } } - for (id, durations) in &stat.work_durations { - if self.work_durations.contains_key(id) { - let work_durations = self.work_durations.get_mut(id).unwrap(); - for d in durations { - work_durations.push(*d); - } - } else { - self.work_durations.insert(*id, durations.clone()); + // Merge work counter for different work packet types + for (id, counters) in &stat.work_counters { + // Initialize the two dimensional vector + // [ + // [], // foo counter + // [], // bar counter + // ] + let vs = self + .work_counters + .entry(*id) + .or_insert_with(|| vec![vec![]; counters.len()]); + // [ + // [counters[0] of type foo], + // [counters[1] of type bar] + // ] + for (v, c) in vs.iter_mut().zip(counters.iter()) { + v.push(c.clone()); } } } } +/// Describing a single work packet pub struct WorkStat { type_id: TypeId, type_name: &'static str, - start_time: SystemTime, } impl WorkStat { + /// Stop all work counters for the work packet type of the just executed + /// work packet #[inline(always)] pub fn end_of_work(&self, worker_stat: &mut WorkerLocalStat) { if !worker_stat.is_enabled() { return; }; + // Insert type ID, name pair worker_stat .work_id_name_map .insert(self.type_id, self.type_name); + // Increment work count *worker_stat.work_counts.entry(self.type_id).or_insert(0) += 1; - let duration = self.start_time.elapsed().unwrap(); + // Stop counters worker_stat - .work_durations + .work_counters .entry(self.type_id) - .or_insert_with(Vec::new) - .push(duration); + .and_modify(|v| { + v.iter_mut().for_each(|c| c.stop()); + }); } } +/// Worker thread local counterpart of [`SchedulerStat`] #[derive(Default)] pub struct WorkerLocalStat { work_id_name_map: HashMap, work_counts: HashMap, - work_durations: HashMap>, + work_counters: HashMap>>, enabled: AtomicBool, } @@ -182,12 +182,26 @@ impl WorkerLocalStat { pub fn enable(&self) { self.enabled.store(true, Ordering::SeqCst); } + /// Measure the execution of a work packet by starting all counters for that + /// type #[inline] pub fn measure_work(&mut self, work_id: TypeId, work_name: &'static str) -> WorkStat { - WorkStat { + let stat = WorkStat { type_id: work_id, type_name: work_name, - start_time: SystemTime::now(), + }; + if self.is_enabled() { + self.work_counters + .entry(work_id) + .or_insert_with(WorkerLocalStat::counter_set) + .iter_mut() + .for_each(|c| c.start()); } + stat + } + + // The set of work counters for all work packet types + fn counter_set() -> Vec> { + vec![Box::new(WorkDuration::new())] } } diff --git a/src/scheduler/work_counter.rs b/src/scheduler/work_counter.rs new file mode 100644 index 0000000000..eea7c08b1b --- /dev/null +++ b/src/scheduler/work_counter.rs @@ -0,0 +1,135 @@ +//! Counter for work packets +//! +//! Provides an abstraction and implementations of counters for collecting +//! work-packet level statistics +//! +//! See [`crate::util::statistics`] for collecting statistics over a GC cycle +use std::time::SystemTime; + +/// Common struct for different work counters +/// +/// Stores the total, min and max of counter readings +#[derive(Copy, Clone, Debug)] +pub(super) struct WorkCounterBase { + pub(super) total: f64, + pub(super) min: f64, + pub(super) max: f64, +} + +/// Make [`WorkCounter`] trait objects cloneable +pub(super) trait WorkCounterClone { + /// Clone the object + fn clone_box(&self) -> Box; +} + +impl WorkCounterClone for T { + fn clone_box(&self) -> Box { + Box::new(self.clone()) + } +} + +/// An abstraction of work counters +/// +/// Use for trait objects, as we have might have types of work counters for +/// the same work packet and the types are not statically known. +/// The overhead should be negligible compared with the cost of executing +/// a work packet. +pub(super) trait WorkCounter: WorkCounterClone + std::fmt::Debug { + // TODO: consolidate with crate::util::statistics::counter::Counter; + /// Start the counter + fn start(&mut self); + /// Stop the counter + fn stop(&mut self); + /// Name of counter + fn name(&self) -> String; + /// Return a reference to [`WorkCounterBase`] + fn get_base(&self) -> &WorkCounterBase; + /// Return a mutatable reference to [`WorkCounterBase`] + fn get_base_mut(&mut self) -> &mut WorkCounterBase; +} + +impl Clone for Box { + fn clone(&self) -> Box { + self.clone_box() + } +} + +impl Default for WorkCounterBase { + fn default() -> Self { + WorkCounterBase { + total: 0.0, + min: f64::INFINITY, + max: f64::NEG_INFINITY, + } + } +} + +impl WorkCounterBase { + /// Merge two [`WorkCounterBase`], keep the semantics of the fields, + /// and return a new object + pub(super) fn merge(&self, other: &Self) -> Self { + let min = self.min.min(other.min); + let max = self.max.max(other.max); + let total = self.total + other.total; + WorkCounterBase { total, min, max } + } + + /// Merge two [`WorkCounterBase`], modify the current object in place, + /// and keep the semantics of the fields + pub(super) fn merge_inplace(&mut self, other: &Self) { + self.min = self.min.min(other.min); + self.max = self.max.max(other.max); + self.total += other.total; + } + + /// Update the object based on a single value + pub(super) fn merge_val(&mut self, val: f64) { + self.min = self.min.min(val); + self.max = self.max.max(val); + self.total += val; + } +} + +/// Measure the durations of work packets +/// +/// Timing is based on [`SystemTime`] +#[derive(Copy, Clone, Debug)] +pub(super) struct WorkDuration { + base: WorkCounterBase, + start_value: Option, + running: bool, +} + +impl WorkDuration { + pub(super) fn new() -> Self { + WorkDuration { + base: Default::default(), + start_value: None, + running: false, + } + } +} + +impl WorkCounter for WorkDuration { + fn start(&mut self) { + self.start_value = Some(SystemTime::now()); + self.running = true; + } + + fn stop(&mut self) { + let duration = self.start_value.unwrap().elapsed().unwrap().as_nanos() as f64; + self.base.merge_val(duration); + } + + fn name(&self) -> String { + "time".to_owned() + } + + fn get_base(&self) -> &WorkCounterBase { + &self.base + } + + fn get_base_mut(&mut self) -> &mut WorkCounterBase { + &mut self.base + } +}