diff --git a/opentelemetry-sdk/benches/metric_counter.rs b/opentelemetry-sdk/benches/metric_counter.rs
index 8ae8a34f6f..34afa4defc 100644
--- a/opentelemetry-sdk/benches/metric_counter.rs
+++ b/opentelemetry-sdk/benches/metric_counter.rs
@@ -6,9 +6,9 @@
     RAM: 64.0 GB
     | Test                           | Average time|
     |--------------------------------|-------------|
-    | Counter_Add_Sorted             | 560 ns      |
-    | Counter_Add_Unsorted           | 565 ns      |
-    | Counter_Overflow               | 568 ns      |
+    | Counter_Add_Sorted             | 193 ns      |
+    | Counter_Add_Unsorted           | 209 ns      |
+    | Counter_Overflow               | 898 ns      |
     | ThreadLocal_Random_Generator_5 | 37 ns       |
 */

diff --git a/opentelemetry-sdk/src/metrics/internal/sum.rs b/opentelemetry-sdk/src/metrics/internal/sum.rs
index 61ceadc21c..912fbacd58 100644
--- a/opentelemetry-sdk/src/metrics/internal/sum.rs
+++ b/opentelemetry-sdk/src/metrics/internal/sum.rs
@@ -1,4 +1,6 @@
-use std::sync::atomic::{AtomicBool, Ordering};
+use std::collections::HashSet;
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+use std::sync::Arc;
 use std::vec;
 use std::{
     collections::HashMap,
@@ -8,19 +10,21 @@ use std::{

 use crate::metrics::data::{self, Aggregation, DataPoint, Temporality};
 use crate::metrics::AttributeSet;
+use once_cell::sync::Lazy;
 use opentelemetry::KeyValue;
 use opentelemetry::{global, metrics::MetricsError};

-use super::{
-    aggregate::{is_under_cardinality_limit, STREAM_OVERFLOW_ATTRIBUTE_SET},
-    AtomicTracker, Number,
-};
+use super::{aggregate::is_under_cardinality_limit, AtomicTracker, Number};
+
+pub(crate) static STREAM_OVERFLOW_ATTRIBUTES: Lazy<Vec<KeyValue>> =
+    Lazy::new(|| vec![KeyValue::new("otel.metric.overflow", "true")]);

 /// The storage for sums.
 struct ValueMap<T: Number<T>> {
-    values: RwLock<HashMap<AttributeSet, T::AtomicTracker>>,
+    values: RwLock<HashMap<Vec<KeyValue>, Arc<T::AtomicTracker>>>,
     has_no_value_attribute_value: AtomicBool,
     no_attribute_value: T::AtomicTracker,
+    count: AtomicUsize,
 }

 impl<T: Number<T>> Default for ValueMap<T> {
@@ -35,42 +39,59 @@ impl<T: Number<T>> ValueMap<T> {
             values: RwLock::new(HashMap::new()),
             has_no_value_attribute_value: AtomicBool::new(false),
             no_attribute_value: T::new_atomic_tracker(),
+            count: AtomicUsize::new(0),
         }
     }
 }

 impl<T: Number<T>> ValueMap<T> {
-    fn measure(&self, measurement: T, attrs: AttributeSet) {
+    fn measure(&self, measurement: T, attrs: &[KeyValue]) {
         if attrs.is_empty() {
             self.no_attribute_value.add(measurement);
             self.has_no_value_attribute_value
                 .store(true, Ordering::Release);
         } else if let Ok(values) = self.values.read() {
-            if let Some(value_to_update) = values.get(&attrs) {
+            // Try incoming order first
+            if let Some(value_to_update) = values.get(attrs) {
                 value_to_update.add(measurement);
-                return;
             } else {
-                drop(values);
-                if let Ok(mut values) = self.values.write() {
-                    // Recheck after acquiring write lock, in case another
-                    // thread has added the value.
-                    if let Some(value_to_update) = values.get(&attrs) {
-                        value_to_update.add(measurement);
-                        return;
-                    } else if is_under_cardinality_limit(values.len()) {
-                        let new_value = T::new_atomic_tracker();
-                        new_value.add(measurement);
-                        values.insert(attrs, new_value);
-                    } else if let Some(overflow_value) =
-                        values.get_mut(&STREAM_OVERFLOW_ATTRIBUTE_SET)
-                    {
-                        overflow_value.add(measurement);
-                        return;
-                    } else {
-                        let new_value = T::new_atomic_tracker();
-                        new_value.add(measurement);
-                        values.insert(STREAM_OVERFLOW_ATTRIBUTE_SET.clone(), new_value);
-                        global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged.".into()));
+                // Then try sorted order.
+                let sorted_attrs = AttributeSet::from(attrs).into_vec();
+                if let Some(value_to_update) = values.get(sorted_attrs.as_slice()) {
+                    value_to_update.add(measurement);
+                } else {
+                    // Give up the lock, before acquiring write lock.
+                    drop(values);
+                    if let Ok(mut values) = self.values.write() {
+                        // Recheck both incoming and sorted after acquiring
+                        // write lock, in case another thread has added the
+                        // value.
+                        if let Some(value_to_update) = values.get(attrs) {
+                            value_to_update.add(measurement);
+                        } else if let Some(value_to_update) = values.get(sorted_attrs.as_slice()) {
+                            value_to_update.add(measurement);
+                        } else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) {
+                            let new_value = T::new_atomic_tracker();
+                            new_value.add(measurement);
+                            let new_value = Arc::new(new_value);
+
+                            // Insert original order
+                            values.insert(attrs.to_vec(), new_value.clone());
+
+                            // Insert sorted order
+                            values.insert(sorted_attrs, new_value);
+
+                            self.count.fetch_add(1, Ordering::SeqCst);
+                        } else if let Some(overflow_value) =
+                            values.get(STREAM_OVERFLOW_ATTRIBUTES.as_slice())
+                        {
+                            overflow_value.add(measurement);
+                        } else {
+                            let new_value = T::new_atomic_tracker();
+                            new_value.add(measurement);
+                            values.insert(STREAM_OVERFLOW_ATTRIBUTES.clone(), Arc::new(new_value));
+                            global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged.".into()));
+                        }
                     }
                 }
             }
@@ -100,7 +121,6 @@ impl<T: Number<T>> Sum<T> {
     }

     pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) {
-        let attrs: AttributeSet = attrs.into();
         self.value_map.measure(measurement, attrs)
     }

@@ -125,12 +145,9 @@ impl<T: Number<T>> Sum<T> {
         s_data.is_monotonic = self.monotonic;
         s_data.data_points.clear();

-        let mut values = match self.value_map.values.write() {
-            Ok(v) => v,
-            Err(_) => return (0, None),
-        };
-
-        let n = values.len() + 1;
+        // Max number of data points need to account for the special casing
+        // of the no attribute value + overflow attribute.
+        let n = self.value_map.count.load(Ordering::SeqCst) + 2;
         if n > s_data.data_points.capacity() {
             s_data
                 .data_points
@@ -152,23 +169,29 @@ impl<T: Number<T>> Sum<T> {
             });
         }

+        let mut values = match self.value_map.values.write() {
+            Ok(v) => v,
+            Err(_) => return (0, None),
+        };
+
+        let mut seen = HashSet::new();
         for (attrs, value) in values.drain() {
-            s_data.data_points.push(DataPoint {
-                attributes: attrs
-                    .iter()
-                    .map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
-                    .collect(),
-                start_time: Some(prev_start),
-                time: Some(t),
-                value: value.get_value(),
-                exemplars: vec![],
-            });
+            if seen.insert(Arc::as_ptr(&value)) {
+                s_data.data_points.push(DataPoint {
+                    attributes: attrs.clone(),
+                    start_time: Some(prev_start),
+                    time: Some(t),
+                    value: value.get_value(),
+                    exemplars: vec![],
+                });
+            }
         }

         // The delta collection cycle resets.
         if let Ok(mut start) = self.start.lock() {
             *start = t;
         }
+        self.value_map.count.store(0, Ordering::SeqCst);

         (
             s_data.data_points.len(),
@@ -197,12 +220,9 @@ impl<T: Number<T>> Sum<T> {
         s_data.is_monotonic = self.monotonic;
         s_data.data_points.clear();

-        let values = match self.value_map.values.write() {
-            Ok(v) => v,
-            Err(_) => return (0, None),
-        };
-
-        let n = values.len() + 1;
+        // Max number of data points need to account for the special casing
+        // of the no attribute value + overflow attribute.
+        let n = self.value_map.count.load(Ordering::SeqCst) + 2;
         if n > s_data.data_points.capacity() {
             s_data
                 .data_points
@@ -225,16 +245,18 @@ impl<T: Number<T>> Sum<T> {
             });
         }

+        let values = match self.value_map.values.write() {
+            Ok(v) => v,
+            Err(_) => return (0, None),
+        };
+
         // TODO: This will use an unbounded amount of memory if there
         // are unbounded number of attribute sets being aggregated. Attribute
         // sets that become "stale" need to be forgotten so this will not
         // overload the system.
         for (attrs, value) in values.iter() {
             s_data.data_points.push(DataPoint {
-                attributes: attrs
-                    .iter()
-                    .map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
-                    .collect(),
+                attributes: attrs.clone(),
                 start_time: Some(prev_start),
                 time: Some(t),
                 value: value.get_value(),
@@ -254,7 +276,7 @@ pub(crate) struct PrecomputedSum<T: Number<T>> {
     value_map: ValueMap<T>,
     monotonic: bool,
     start: Mutex<SystemTime>,
-    reported: Mutex<HashMap<AttributeSet, T>>,
+    reported: Mutex<HashMap<Vec<KeyValue>, T>>,
 }

 impl<T: Number<T>> PrecomputedSum<T> {
@@ -268,7 +290,6 @@ impl<T: Number<T>> PrecomputedSum<T> {
     }

     pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) {
-        let attrs: AttributeSet = attrs.into();
         self.value_map.measure(measurement, attrs)
     }

@@ -294,12 +315,9 @@ impl<T: Number<T>> PrecomputedSum<T> {
         s_data.temporality = Temporality::Delta;
         s_data.is_monotonic = self.monotonic;

-        let mut values = match self.value_map.values.write() {
-            Ok(v) => v,
-            Err(_) => return (0, None),
-        };
-
-        let n = values.len() + 1;
+        // Max number of data points need to account for the special casing
+        // of the no attribute value + overflow attribute.
+        let n = self.value_map.count.load(Ordering::SeqCst) + 2;
         if n > s_data.data_points.capacity() {
             s_data
                 .data_points
@@ -325,6 +343,11 @@ impl<T: Number<T>> PrecomputedSum<T> {
             });
         }

+        let mut values = match self.value_map.values.write() {
+            Ok(v) => v,
+            Err(_) => return (0, None),
+        };
+
         let default = T::default();
         for (attrs, value) in values.drain() {
             let delta = value.get_value() - *reported.get(&attrs).unwrap_or(&default);
@@ -332,10 +355,7 @@ impl<T: Number<T>> PrecomputedSum<T> {
                 new_reported.insert(attrs.clone(), value.get_value());
             }
             s_data.data_points.push(DataPoint {
-                attributes: attrs
-                    .iter()
-                    .map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
-                    .collect(),
+                attributes: attrs.clone(),
                 start_time: Some(prev_start),
                 time: Some(t),
                 value: delta,
@@ -347,6 +367,7 @@ impl<T: Number<T>> PrecomputedSum<T> {
         if let Ok(mut start) = self.start.lock() {
             *start = t;
         }
+        self.value_map.count.store(0, Ordering::SeqCst);

         *reported = new_reported;
         drop(reported); // drop before values guard is dropped
@@ -379,12 +400,9 @@ impl<T: Number<T>> PrecomputedSum<T> {
         s_data.temporality = Temporality::Cumulative;
         s_data.is_monotonic = self.monotonic;

-        let values = match self.value_map.values.write() {
-            Ok(v) => v,
-            Err(_) => return (0, None),
-        };
-
-        let n = values.len() + 1;
+        // Max number of data points need to account for the special casing
+        // of the no attribute value + overflow attribute.
+        let n = self.value_map.count.load(Ordering::SeqCst) + 2;
         if n > s_data.data_points.capacity() {
             s_data
                 .data_points
@@ -410,6 +428,10 @@ impl<T: Number<T>> PrecomputedSum<T> {
             });
         }

+        let values = match self.value_map.values.write() {
+            Ok(v) => v,
+            Err(_) => return (0, None),
+        };
         let default = T::default();
         for (attrs, value) in values.iter() {
             let delta = value.get_value() - *reported.get(attrs).unwrap_or(&default);
@@ -417,10 +439,7 @@ impl<T: Number<T>> PrecomputedSum<T> {
                 new_reported.insert(attrs.clone(), value.get_value());
             }
             s_data.data_points.push(DataPoint {
-                attributes: attrs
-                    .iter()
-                    .map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
-                    .collect(),
+                attributes: attrs.clone(),
                 start_time: Some(prev_start),
                 time: Some(t),
                 value: delta,
diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs
index 2a65b83e9c..46904095e8 100644
--- a/opentelemetry-sdk/src/metrics/mod.rs
+++ b/opentelemetry-sdk/src/metrics/mod.rs
@@ -925,6 +925,62 @@ mod tests {
         assert_eq!(data_point1.value, 6);
     }

+    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+    async fn counter_aggregation_attribute_ordering() {
+        // Run this test with stdout enabled to see output.
+        // cargo test counter_aggregation_attribute_ordering --features=testing -- --nocapture
+
+        // Arrange
+        let mut test_context = TestContext::new(Temporality::Delta);
+        let counter = test_context.u64_counter("test", "my_counter", None);
+
+        // Act
+        // Add the same set of attributes in different order. (they are expected
+        // to be treated as same attributes)
+        // start with unsorted order
+
+        let attribute_values = [
+            "value1", "value2", "value3", "value4", "value5", "value6", "value7", "value8",
+            "value9", "value10",
+        ];
+        let mut rng = rngs::SmallRng::from_entropy();
+
+        for _ in 0..100000 {
+            let mut rands: [usize; 4] = [0; 4];
+            // 4X4X10X10 = 1600 time-series.
+            rands[0] = rng.gen_range(0..4);
+            rands[1] = rng.gen_range(0..4);
+            rands[2] = rng.gen_range(0..10);
+            rands[3] = rng.gen_range(0..10);
+            let index_first_attribute = rands[0];
+            let index_second_attribute = rands[1];
+            let index_third_attribute = rands[2];
+            let index_fourth_attribute = rands[3];
+            counter.add(
+                1,
+                &[
+                    KeyValue::new("attribute1", attribute_values[index_first_attribute]),
+                    KeyValue::new("attribute2", attribute_values[index_second_attribute]),
+                    KeyValue::new("attribute3", attribute_values[index_third_attribute]),
+                    KeyValue::new("attribute4", attribute_values[index_fourth_attribute]),
+                ],
+            );
+        }
+
+        test_context.flush_metrics();
+
+        let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
+
+        // Expecting 1600 time-series.
+        assert_eq!(sum.data_points.len(), 1600);
+
+        // validate that overflow data point is not present.
+        let overflow_point =
+            find_datapoint_with_key_value(&sum.data_points, "otel.metric.overflow", "true");
+
+        assert!(overflow_point.is_none());
+    }
+
     #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
     async fn no_attr_cumulative_counter() {
         let mut test_context = TestContext::new(Temporality::Cumulative);
diff --git a/stress/src/metrics_counter.rs b/stress/src/metrics_counter.rs
index 6cde5ed9cb..452907f2bf 100644
--- a/stress/src/metrics_counter.rs
+++ b/stress/src/metrics_counter.rs
@@ -4,6 +4,9 @@
     Hardware: Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz, 16vCPUs,
     RAM: 64.0 GB
     ~9.5 M /sec
+
+    Hardware: AMD EPYC 7763 64-Core Processor - 2.44 GHz, 16vCPUs,
+    ~20 M /sec
 */

 use lazy_static::lazy_static;
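
Illustrative sketch (not part of the patch): the core idea of the change above is to store each logical attribute set under two map keys, the caller's incoming order and the canonical sorted order, both pointing at one shared Arc'd tracker, so the hot path never sorts; at collection time duplicates are removed by comparing Arc pointer identity. The standalone Rust sketch below shows just that pattern; the DualKeyMap, record, and drain_unique names are hypothetical, and the cardinality limit, overflow series, and count field from the patch are deliberately omitted.

use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

// Hypothetical stand-in for the SDK's per-series atomic tracker.
type Tracker = AtomicU64;

#[derive(Default)]
struct DualKeyMap {
    // Both the incoming-order key and the sorted key point at the same Arc.
    values: HashMap<Vec<(String, String)>, Arc<Tracker>>,
}

impl DualKeyMap {
    fn record(&mut self, attrs: &[(String, String)], delta: u64) {
        // Hot path: the incoming order has been seen before, no sort needed.
        if let Some(tracker) = self.values.get(attrs) {
            tracker.fetch_add(delta, Ordering::Relaxed);
            return;
        }
        // Fall back to the canonical (sorted) order.
        let mut sorted = attrs.to_vec();
        sorted.sort();
        if let Some(tracker) = self.values.get(&sorted).cloned() {
            tracker.fetch_add(delta, Ordering::Relaxed);
            // Remember the incoming order too, so the next call is a hot-path hit.
            self.values.insert(attrs.to_vec(), tracker);
            return;
        }
        // First sighting: insert under both orders, sharing one tracker.
        let tracker = Arc::new(Tracker::new(delta));
        self.values.insert(attrs.to_vec(), tracker.clone());
        self.values.insert(sorted, tracker);
    }

    // Emit each logical series once, even though it may sit under two keys,
    // by deduplicating on the Arc's pointer identity.
    fn drain_unique(&mut self) -> Vec<(Vec<(String, String)>, u64)> {
        let mut seen = HashSet::new();
        self.values
            .drain()
            .filter(|(_, tracker)| seen.insert(Arc::as_ptr(tracker)))
            .map(|(attrs, tracker)| (attrs, tracker.load(Ordering::Relaxed)))
            .collect()
    }
}

fn main() {
    let mut map = DualKeyMap::default();
    let a = ("key1".to_string(), "value1".to_string());
    let b = ("key2".to_string(), "value2".to_string());
    map.record(&[a.clone(), b.clone()], 1);
    map.record(&[b, a], 1); // same logical series, different attribute order
    let points = map.drain_unique();
    assert_eq!(points.len(), 1);
    assert_eq!(points[0].1, 2);
}

Doubling the map entries trades memory for skipping the sort on every repeat observation, which is consistent with the Counter_Add_Sorted/Counter_Add_Unsorted improvements in the benchmark table above; in the actual patch the separate count field is what keeps the cardinality limit tied to logical series rather than raw map entries.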