From 56b44755e0263b4967270a86349fd0911bf307db Mon Sep 17 00:00:00 2001 From: Utkarsh Umesan Pillai <66651184+utpilla@users.noreply.github.com> Date: Tue, 13 Aug 2024 07:12:28 +0000 Subject: [PATCH 1/2] Use ValueMap for Gauge --- .../src/metrics/internal/last_value.rs | 93 ++++++++----------- 1 file changed, 41 insertions(+), 52 deletions(-) diff --git a/opentelemetry-sdk/src/metrics/internal/last_value.rs b/opentelemetry-sdk/src/metrics/internal/last_value.rs index 527efee47f..23e174b075 100644 --- a/opentelemetry-sdk/src/metrics/internal/last_value.rs +++ b/opentelemetry-sdk/src/metrics/internal/last_value.rs @@ -1,82 +1,71 @@ use std::{ - collections::{hash_map::Entry, HashMap}, - sync::Mutex, + collections::HashSet, + sync::{atomic::Ordering, Arc}, time::SystemTime, }; -use crate::{metrics::data::DataPoint, metrics::AttributeSet}; -use opentelemetry::{global, metrics::MetricsError, KeyValue}; +use crate::metrics::data::DataPoint; +use opentelemetry::KeyValue; -use super::{ - aggregate::{is_under_cardinality_limit, STREAM_OVERFLOW_ATTRIBUTE_SET}, - Number, -}; - -/// Timestamped measurement data. -struct DataPointValue { - timestamp: SystemTime, - value: T, -} +use super::{Assign, AtomicTracker, Number, ValueMap}; /// Summarizes a set of measurements as the last one made. -#[derive(Default)] -pub(crate) struct LastValue { - values: Mutex>>, +pub(crate) struct LastValue> { + value_map: ValueMap, } impl> LastValue { pub(crate) fn new() -> Self { - Self::default() + LastValue { + value_map: ValueMap::new(), + } } pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) { - let d: DataPointValue = DataPointValue { - timestamp: SystemTime::now(), - value: measurement, - }; - - let attrs: AttributeSet = attrs.into(); - if let Ok(mut values) = self.values.lock() { - let size = values.len(); - match values.entry(attrs) { - Entry::Occupied(mut occupied_entry) => { - occupied_entry.insert(d); - } - Entry::Vacant(vacant_entry) => { - if is_under_cardinality_limit(size) { - vacant_entry.insert(d); - } else { - values.insert(STREAM_OVERFLOW_ATTRIBUTE_SET.clone(), d); - global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow.".into())); - } - } - } - } + self.value_map.measure(measurement, attrs); } pub(crate) fn compute_aggregation(&self, dest: &mut Vec>) { + let t = SystemTime::now(); dest.clear(); - let mut values = match self.values.lock() { - Ok(guard) if !guard.is_empty() => guard, - _ => return, - }; - let n = values.len(); + // Max number of data points need to account for the special casing + // of the no attribute value + overflow attribute. + let n = self.value_map.count.load(Ordering::SeqCst) + 2; if n > dest.capacity() { dest.reserve_exact(n - dest.capacity()); } - for (attrs, value) in values.drain() { + if self + .value_map + .has_no_attribute_value + .swap(false, Ordering::AcqRel) + { dest.push(DataPoint { - attributes: attrs - .iter() - .map(|(k, v)| KeyValue::new(k.clone(), v.clone())) - .collect(), - time: Some(value.timestamp), - value: value.value, + attributes: vec![], start_time: None, + time: Some(t), + value: self.value_map.no_attribute_tracker.get_and_reset_value(), exemplars: vec![], }); } + + let mut trackers = match self.value_map.trackers.write() { + Ok(v) => v, + _ => return, + }; + + let mut seen = HashSet::new(); + for (attrs, tracker) in trackers.drain() { + if seen.insert(Arc::as_ptr(&tracker)) { + dest.push(DataPoint { + attributes: attrs.clone(), + start_time: None, + time: Some(t), + value: tracker.get_value(), + exemplars: vec![], + }); + } + } } } From ba287785b7558791d071daf62078486c9a6e5d8b Mon Sep 17 00:00:00 2001 From: Utkarsh Umesan Pillai <66651184+utpilla@users.noreply.github.com> Date: Tue, 13 Aug 2024 07:18:49 +0000 Subject: [PATCH 2/2] Code changes --- opentelemetry-sdk/benches/metric_gauge.rs | 2 +- stress/src/metrics_gauge.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/opentelemetry-sdk/benches/metric_gauge.rs b/opentelemetry-sdk/benches/metric_gauge.rs index 4fe2a4ead7..b4d5686180 100644 --- a/opentelemetry-sdk/benches/metric_gauge.rs +++ b/opentelemetry-sdk/benches/metric_gauge.rs @@ -6,7 +6,7 @@ RAM: 64.0 GB | Test | Average time| |--------------------------------|-------------| - | Gauge_Add | 483.78 ns | + | Gauge_Add | 178.37 ns | */ use criterion::{criterion_group, criterion_main, Criterion}; diff --git a/stress/src/metrics_gauge.rs b/stress/src/metrics_gauge.rs index 0ecdea5d00..9f01dabb16 100644 --- a/stress/src/metrics_gauge.rs +++ b/stress/src/metrics_gauge.rs @@ -3,7 +3,7 @@ OS: Ubuntu 22.04.4 LTS (5.15.153.1-microsoft-standard-WSL2) Hardware: Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz, 16vCPUs, RAM: 64.0 GB - ~1.5 M/sec + ~11.5 M/sec */ use lazy_static::lazy_static;