From 500c7590aa00457a6287c511f82d51dae04815b1 Mon Sep 17 00:00:00 2001
From: Mindaugas Vinkelis
Date: Tue, 10 Sep 2024 12:46:59 +0300
Subject: [PATCH] Fix issue where duplicate attributes are not always removed

---
 .../expected/serialized_traces.json           |   6 +
 opentelemetry-sdk/Cargo.toml                  |   1 +
 .../src/metrics/attribute_set.rs              |  63 ++++
 .../src/metrics/internal/histogram.rs         | 177 +++-------
 .../src/metrics/internal/last_value.rs        | 104 ++----
 opentelemetry-sdk/src/metrics/internal/mod.rs | 308 ++++++++++++++----
 .../src/metrics/internal/precomputed_sum.rs   | 119 ++-----
 opentelemetry-sdk/src/metrics/internal/sum.rs | 117 ++-----
 opentelemetry-sdk/src/metrics/mod.rs          |  67 +---
 9 files changed, 456 insertions(+), 506 deletions(-)
 create mode 100644 opentelemetry-sdk/src/metrics/attribute_set.rs

diff --git a/opentelemetry-otlp/tests/integration_test/expected/serialized_traces.json b/opentelemetry-otlp/tests/integration_test/expected/serialized_traces.json
index 849e66dd7b..e5982877cf 100644
--- a/opentelemetry-otlp/tests/integration_test/expected/serialized_traces.json
+++ b/opentelemetry-otlp/tests/integration_test/expected/serialized_traces.json
@@ -112,6 +112,12 @@
           "value": {
             "intValue": "100"
           }
+        },
+        {
+          "key": "number/int",
+          "value": {
+            "intValue": "100"
+          }
         }
       ],
       "droppedAttributesCount": 0
diff --git a/opentelemetry-sdk/Cargo.toml b/opentelemetry-sdk/Cargo.toml
index 8761a6a669..24848ce3a6 100644
--- a/opentelemetry-sdk/Cargo.toml
+++ b/opentelemetry-sdk/Cargo.toml
@@ -20,6 +20,7 @@ futures-util = { workspace = true, features = ["std", "sink", "async-await-macro"] }
 once_cell = { workspace = true }
 percent-encoding = { version = "2.0", optional = true }
 rand = { workspace = true, features = ["std", "std_rng","small_rng"], optional = true }
+rustc-hash = "2.0"
 glob = { version = "0.3.1", optional =true}
 serde = { workspace = true, features = ["derive", "rc"], optional = true }
 serde_json = { workspace = true, optional = true }
diff --git a/opentelemetry-sdk/src/metrics/attribute_set.rs b/opentelemetry-sdk/src/metrics/attribute_set.rs
new file mode 100644
index 0000000000..e05a86563e
--- /dev/null
+++ b/opentelemetry-sdk/src/metrics/attribute_set.rs
@@ -0,0 +1,63 @@
+use std::hash::{Hash, Hasher};
+
+use opentelemetry::{Key, KeyValue, Value};
+use rustc_hash::FxHasher;
+
+/// A unique set of attributes that can be used as instrument identifiers.
+///
+/// This must implement [Hash], [PartialEq], and [Eq] so it may be used as
+/// HashMap keys and other de-duplication methods.
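+///
+/// The `From<&[KeyValue]>` conversion below sorts attributes by key and keeps
+/// only the last value supplied for a duplicated key, so (illustrative values,
+/// not from this change) `[key1=a, key3=c, key1=b]` canonicalizes to
+/// `[key1=b, key3=c]`.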
+#[derive(Clone, Default, Debug, PartialEq, Eq)]
+pub(crate) struct AttributeSet(Vec<KeyValue>, u64);
+
+impl From<&[KeyValue]> for AttributeSet {
+    fn from(values: &[KeyValue]) -> Self {
+        let mut vec = Vec::from_iter(values.into_iter().cloned());
+        vec.sort_by(|a, b| a.key.cmp(&b.key));
+
+        // cannot use vec.dedup_by here: it keeps the first of equal elements, but the last value set for a key must win
+        if vec.len() > 1 {
+            let mut i = vec.len() - 1;
+            while i != 0 {
+                let is_same = unsafe { vec.get_unchecked(i - 1).key == vec.get_unchecked(i).key };
+                if is_same {
+                    vec.remove(i - 1);
+                }
+                i -= 1;
+            }
+        }
+
+        let hash = calculate_hash(&vec);
+        AttributeSet(vec, hash)
+    }
+}
+
+fn calculate_hash(values: &[KeyValue]) -> u64 {
+    let mut hasher = FxHasher::default();
+    values.iter().fold(&mut hasher, |mut hasher, item| {
+        item.hash(&mut hasher);
+        hasher
+    });
+    hasher.finish()
+}
+
+impl AttributeSet {
+    /// Iterate over key value pairs in the set
+    pub(crate) fn iter(&self) -> impl Iterator<Item = (&Key, &Value)> {
+        self.0.iter().map(|kv| (&kv.key, &kv.value))
+    }
+
+    pub(crate) fn into_inner(self) -> Vec<KeyValue> {
+        self.0
+    }
+
+    pub(crate) fn as_ref(&self) -> &Vec<KeyValue> {
+        &self.0
+    }
+}
+
+impl Hash for AttributeSet {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        state.write_u64(self.1)
+    }
+}
diff --git a/opentelemetry-sdk/src/metrics/internal/histogram.rs b/opentelemetry-sdk/src/metrics/internal/histogram.rs
index 8fc5bf59c1..e08d34edf3 100644
--- a/opentelemetry-sdk/src/metrics/internal/histogram.rs
+++ b/opentelemetry-sdk/src/metrics/internal/histogram.rs
@@ -1,13 +1,11 @@
-use std::collections::HashSet;
-use std::sync::atomic::Ordering;
-use std::sync::Arc;
+use std::mem::take;
 use std::{sync::Mutex, time::SystemTime};
 
 use crate::metrics::data::HistogramDataPoint;
 use crate::metrics::data::{self, Aggregation, Temporality};
 use opentelemetry::KeyValue;
 
-use super::Number;
+use super::{collect_data_points_readonly, collect_data_points_reset, Number};
 use super::{AtomicTracker, AtomicallyUpdate, Operation, ValueMap};
 
 struct HistogramUpdate;
@@ -45,7 +43,6 @@ impl<T: Number<T>> AtomicallyUpdate<T> for HistogramTracker<T> {
     }
 }
 
-#[derive(Default)]
 struct Buckets<T> {
     counts: Vec<u64>,
     count: u64,
@@ -61,7 +58,8 @@ impl<T: Number<T>> Buckets<T> {
             counts: vec![0; n],
             min: T::max(),
             max: T::min(),
-            ..Default::default()
+            count: 0,
+            total: T::default(),
         }
     }
 
@@ -80,14 +78,17 @@ impl<T: Number<T>> Buckets<T> {
         }
     }
 
-    fn reset(&mut self) {
-        for item in &mut self.counts {
-            *item = 0;
-        }
-        self.count = Default::default();
-        self.total = Default::default();
-        self.min = T::max();
-        self.max = T::min();
+    fn clone_and_reset(&mut self) -> Self {
+        let n = self.counts.len();
+        let res = Buckets {
+            counts: take(&mut self.counts),
+            count: self.count,
+            total: self.total,
+            min: self.min,
+            max: self.max,
+        };
+        *self = Buckets::new(n);
+        res
     }
 }
 
@@ -155,26 +156,27 @@ impl<T: Number<T>> Histogram<T> {
         h.temporality = Temporality::Delta;
         h.data_points.clear();
 
-        // Max number of data points need to account for the special casing
-        // of the no attribute value + overflow attribute.
-        let n = self.value_map.count.load(Ordering::SeqCst) + 2;
-        if n > h.data_points.capacity() {
-            h.data_points.reserve_exact(n - h.data_points.capacity());
-        }
+        let Ok(mut trackers) = self.value_map.trackers.write() else {
+            return (0, None);
+        };
 
-        if self
-            .value_map
-            .has_no_attribute_value
-            .swap(false, Ordering::AcqRel)
-        {
-            if let Ok(ref mut b) = self.value_map.no_attribute_tracker.buckets.lock() {
-                h.data_points.push(HistogramDataPoint {
-                    attributes: vec![],
+        collect_data_points_reset(
+            &self.value_map.no_attribs_tracker,
+            &mut trackers,
+            &mut h.data_points,
+            |attributes, tracker| {
+                let b = tracker
+                    .buckets
+                    .lock()
+                    .unwrap_or_else(|err| err.into_inner())
+                    .clone_and_reset();
+                HistogramDataPoint {
+                    attributes,
                     start_time: start,
                     time: t,
                     count: b.count,
                     bounds: self.bounds.clone(),
-                    bucket_counts: b.counts.clone(),
+                    bucket_counts: b.counts,
                     sum: if self.record_sum {
                         b.total
                     } else {
@@ -191,54 +193,14 @@ impl<T: Number<T>> Histogram<T> {
                         None
                     },
                     exemplars: vec![],
-                });
-
-                b.reset();
-            }
-        }
-
-        let mut trackers = match self.value_map.trackers.write() {
-            Ok(v) => v,
-            Err(_) => return (0, None),
-        };
-
-        let mut seen = HashSet::new();
-        for (attrs, tracker) in trackers.drain() {
-            if seen.insert(Arc::as_ptr(&tracker)) {
-                if let Ok(b) = tracker.buckets.lock() {
-                    h.data_points.push(HistogramDataPoint {
-                        attributes: attrs.clone(),
-                        start_time: start,
-                        time: t,
-                        count: b.count,
-                        bounds: self.bounds.clone(),
-                        bucket_counts: b.counts.clone(),
-                        sum: if self.record_sum {
-                            b.total
-                        } else {
-                            T::default()
-                        },
-                        min: if self.record_min_max {
-                            Some(b.min)
-                        } else {
-                            None
-                        },
-                        max: if self.record_min_max {
-                            Some(b.max)
-                        } else {
-                            None
-                        },
-                        exemplars: vec![],
-                    });
                 }
-            }
-        }
+            },
+        );
 
         // The delta collection cycle resets.
         if let Ok(mut start) = self.start.lock() {
             *start = t;
         }
-        self.value_map.count.store(0, Ordering::SeqCst);
 
         (h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>))
     }
@@ -266,21 +228,21 @@ impl<T: Number<T>> Histogram<T> {
         h.temporality = Temporality::Cumulative;
         h.data_points.clear();
 
-        // Max number of data points need to account for the special casing
-        // of the no attribute value + overflow attribute.
-        let n = self.value_map.count.load(Ordering::SeqCst) + 2;
-        if n > h.data_points.capacity() {
-            h.data_points.reserve_exact(n - h.data_points.capacity());
-        }
+        let Ok(trackers) = self.value_map.trackers.read() else {
+            return (0, None);
+        };
 
-        if self
-            .value_map
-            .has_no_attribute_value
-            .load(Ordering::Acquire)
-        {
-            if let Ok(b) = &self.value_map.no_attribute_tracker.buckets.lock() {
-                h.data_points.push(HistogramDataPoint {
-                    attributes: vec![],
+        collect_data_points_readonly(
+            &self.value_map.no_attribs_tracker,
+            &trackers,
+            &mut h.data_points,
+            |attributes, tracker| {
+                let b = tracker
+                    .buckets
+                    .lock()
+                    .unwrap_or_else(|err| err.into_inner());
+                HistogramDataPoint {
+                    attributes,
                     start_time: start,
                     time: t,
                     count: b.count,
@@ -302,50 +264,9 @@ impl<T: Number<T>> Histogram<T> {
                         None
                     },
                     exemplars: vec![],
-                });
-            }
-        }
-
-        let trackers = match self.value_map.trackers.write() {
-            Ok(v) => v,
-            Err(_) => return (0, None),
-        };
-
-        // TODO: This will use an unbounded amount of memory if there
-        // are unbounded number of attribute sets being aggregated. Attribute
-        // sets that become "stale" need to be forgotten so this will not
-        // overload the system.
-        let mut seen = HashSet::new();
-        for (attrs, tracker) in trackers.iter() {
-            if seen.insert(Arc::as_ptr(tracker)) {
-                if let Ok(b) = tracker.buckets.lock() {
-                    h.data_points.push(HistogramDataPoint {
-                        attributes: attrs.clone(),
-                        start_time: start,
-                        time: t,
-                        count: b.count,
-                        bounds: self.bounds.clone(),
-                        bucket_counts: b.counts.clone(),
-                        sum: if self.record_sum {
-                            b.total
-                        } else {
-                            T::default()
-                        },
-                        min: if self.record_min_max {
-                            Some(b.min)
-                        } else {
-                            None
-                        },
-                        max: if self.record_min_max {
-                            Some(b.max)
-                        } else {
-                            None
-                        },
-                        exemplars: vec![],
-                    });
                 }
-            }
-        }
+            },
+        );
 
         (h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>))
     }
diff --git a/opentelemetry-sdk/src/metrics/internal/last_value.rs b/opentelemetry-sdk/src/metrics/internal/last_value.rs
index cce14c5c42..a9d73ab495 100644
--- a/opentelemetry-sdk/src/metrics/internal/last_value.rs
+++ b/opentelemetry-sdk/src/metrics/internal/last_value.rs
@@ -1,13 +1,12 @@
-use std::{
-    collections::HashSet,
-    sync::{atomic::Ordering, Arc, Mutex},
-    time::SystemTime,
-};
+use std::{sync::Mutex, time::SystemTime};
 
 use crate::metrics::data::DataPoint;
 use opentelemetry::KeyValue;
 
-use super::{Assign, AtomicTracker, Number, ValueMap};
+use super::{
+    collect_data_points_readonly, collect_data_points_reset, Assign, AtomicTracker, Number,
+    ValueMap,
+};
 
 /// Summarizes a set of measurements as the last one made.
 pub(crate) struct LastValue<T: Number<T>> {
@@ -33,50 +32,27 @@ impl<T: Number<T>> LastValue<T> {
         let prev_start = self.start.lock().map(|start| *start).unwrap_or(t);
         dest.clear();
 
-        // Max number of data points need to account for the special casing
-        // of the no attribute value + overflow attribute.
-        let n = self.value_map.count.load(Ordering::SeqCst) + 2;
-        if n > dest.capacity() {
-            dest.reserve_exact(n - dest.capacity());
-        }
+        let Ok(mut trackers) = self.value_map.trackers.write() else {
+            return;
+        };
 
-        if self
-            .value_map
-            .has_no_attribute_value
-            .swap(false, Ordering::AcqRel)
-        {
-            dest.push(DataPoint {
-                attributes: vec![],
+        collect_data_points_reset(
+            &self.value_map.no_attribs_tracker,
+            &mut trackers,
+            dest,
+            |attributes, tracker| DataPoint {
+                attributes,
                 start_time: Some(prev_start),
                 time: Some(t),
-                value: self.value_map.no_attribute_tracker.get_and_reset_value(),
+                value: tracker.get_and_reset_value(),
                 exemplars: vec![],
-            });
-        }
-
-        let mut trackers = match self.value_map.trackers.write() {
-            Ok(v) => v,
-            _ => return,
-        };
-
-        let mut seen = HashSet::new();
-        for (attrs, tracker) in trackers.drain() {
-            if seen.insert(Arc::as_ptr(&tracker)) {
-                dest.push(DataPoint {
-                    attributes: attrs.clone(),
-                    start_time: Some(prev_start),
-                    time: Some(t),
-                    value: tracker.get_value(),
-                    exemplars: vec![],
-                });
-            }
-        }
+            },
+        );
 
         // The delta collection cycle resets.
         if let Ok(mut start) = self.start.lock() {
             *start = t;
         }
-        self.value_map.count.store(0, Ordering::SeqCst);
     }
 
     pub(crate) fn compute_aggregation_cumulative(&self, dest: &mut Vec<DataPoint<T>>) {
@@ -85,43 +61,21 @@ impl<T: Number<T>> LastValue<T> {
         let t = SystemTime::now();
         let prev_start = self.start.lock().map(|start| *start).unwrap_or(t);
 
         dest.clear();
 
-        // Max number of data points need to account for the special casing
-        // of the no attribute value + overflow attribute.
-        let n = self.value_map.count.load(Ordering::SeqCst) + 2;
-        if n > dest.capacity() {
-            dest.reserve_exact(n - dest.capacity());
-        }
+        let Ok(trackers) = self.value_map.trackers.read() else {
+            return;
+        };
 
-        if self
-            .value_map
-            .has_no_attribute_value
-            .load(Ordering::Acquire)
-        {
-            dest.push(DataPoint {
-                attributes: vec![],
+        collect_data_points_readonly(
+            &self.value_map.no_attribs_tracker,
+            &trackers,
+            dest,
+            |attributes, tracker| DataPoint {
+                attributes,
                 start_time: Some(prev_start),
                 time: Some(t),
-                value: self.value_map.no_attribute_tracker.get_value(),
+                value: tracker.get_value(),
                 exemplars: vec![],
-            });
-        }
-
-        let trackers = match self.value_map.trackers.write() {
-            Ok(v) => v,
-            _ => return,
-        };
-
-        let mut seen = HashSet::new();
-        for (attrs, tracker) in trackers.iter() {
-            if seen.insert(Arc::as_ptr(tracker)) {
-                dest.push(DataPoint {
-                    attributes: attrs.clone(),
-                    start_time: Some(prev_start),
-                    time: Some(t),
-                    value: tracker.get_value(),
-                    exemplars: vec![],
-                });
-            }
-        }
+            },
+        );
     }
 }
diff --git a/opentelemetry-sdk/src/metrics/internal/mod.rs b/opentelemetry-sdk/src/metrics/internal/mod.rs
index 41f97aa20b..fe39328bf1 100644
--- a/opentelemetry-sdk/src/metrics/internal/mod.rs
+++ b/opentelemetry-sdk/src/metrics/internal/mod.rs
@@ -9,7 +9,7 @@ use core::fmt;
 use std::collections::HashMap;
 use std::marker::PhantomData;
 use std::ops::{Add, AddAssign, Sub};
-use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering};
+use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
 use std::sync::{Arc, RwLock};
 
 use aggregate::is_under_cardinality_limit;
@@ -21,8 +21,11 @@ use opentelemetry::{global, KeyValue};
 
 use crate::metrics::AttributeSet;
 
-pub(crate) static STREAM_OVERFLOW_ATTRIBUTES: Lazy<Vec<KeyValue>> =
-    Lazy::new(|| vec![KeyValue::new("otel.metric.overflow", "true")]);
+const STREAM_OVERFLOW_MSG: &str = "Warning: Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged.";
+
+pub(crate) static STREAM_OVERFLOW_ATTRIBUTES: Lazy<AttributeSet> = Lazy::new(|| {
+    AttributeSet::from(&[KeyValue::new("otel.metric.overflow", "true")] as &[KeyValue])
+});
 
 /// Abstracts the update operation for a measurement.
 pub(crate) trait Operation {
     fn update_tracker<AT: AtomicTracker<T>, T: Number<T>>(tracker: &AT, value: T, index: usize);
 }
@@ -45,21 +48,161 @@ impl Operation for Assign {
     fn update_tracker<AT: AtomicTracker<T>, T: Number<T>>(tracker: &AT, value: T, _: usize) {
         tracker.store(value)
     }
 }
 
+pub(crate) struct AttributeSetTracker<AU, T>
+where
+    AU: AtomicallyUpdate<T>,
+    T: Number<T>,
+{
+    list: HashMap<Vec<KeyValue>, Arc<AU::AtomicTracker>>,
+    sorted_deduped: HashMap<AttributeSet, Arc<AU::AtomicTracker>>,
+    /// Buckets Count is only used by Histogram.
+    buckets_count: Option<usize>,
+}
+
+impl<AU, T> AttributeSetTracker<AU, T>
+where
+    AU: AtomicallyUpdate<T>,
+    T: Number<T>,
+{
+    fn new(buckets_count: Option<usize>) -> Self {
+        Self {
+            list: Default::default(),
+            sorted_deduped: Default::default(),
+            buckets_count,
+        }
+    }
+
+    fn get_existing(&self, attributes: &[KeyValue]) -> Option<&Arc<AU::AtomicTracker>> {
+        self.list.get(attributes)
+    }
+
+    fn create_new(&mut self, attributes: &[KeyValue]) -> &mut Arc<AU::AtomicTracker> {
+        if is_under_cardinality_limit(self.list.len()) {
+            // `entry` re-checks in case another thread pushed an update while we waited for the write lock.
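+            // Note: `list` is keyed by the attribute order the caller supplied,
+            // while `sorted_deduped` is keyed by the canonical sorted and
+            // de-duplicated form; both entries share one tracker via `Arc`.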
+            self.list.entry(attributes.into()).or_insert_with(|| {
+                // return the tracker of an existing combination if we have one
+                self.sorted_deduped
+                    .entry(AttributeSet::from(attributes))
+                    .or_insert_with(|| Arc::new(AU::new_atomic_tracker(self.buckets_count)))
+                    .clone()
+            })
+        } else {
+            self.sorted_deduped
+                .entry(STREAM_OVERFLOW_ATTRIBUTES.clone())
+                .or_insert_with(|| {
+                    global::handle_error(MetricsError::Other(STREAM_OVERFLOW_MSG.into()));
+                    Arc::new(AU::new_atomic_tracker(self.buckets_count))
+                })
+        }
+    }
+}
+
+pub(crate) struct LockFreeNoAttribsTracker<AU, T>
+where
+    AU: AtomicallyUpdate<T>,
+    T: Number<T>,
+{
+    /// Indicates whether a value with no attributes has been stored.
+    is_set: AtomicBool,
+    /// Tracker for values with no attributes attached.
+    tracker: AU::AtomicTracker,
+}
+
+impl<AU, T> LockFreeNoAttribsTracker<AU, T>
+where
+    AU: AtomicallyUpdate<T>,
+    T: Number<T>,
+{
+    fn new(buckets_count: Option<usize>) -> Self {
+        Self {
+            is_set: AtomicBool::new(false),
+            tracker: AU::new_atomic_tracker(buckets_count),
+        }
+    }
+
+    fn update_tracker<O>(&self, measurement: T, index: usize)
+    where
+        O: Operation,
+    {
+        O::update_tracker(&self.tracker, measurement, index);
+        self.is_set.store(true, Ordering::Release);
+    }
+}
+
+/// Collect all data points from the various sources in read-only mode,
+/// without resetting anything.
+pub(crate) fn collect_data_points_readonly<AU, T, Res>(
+    no_attribs: &LockFreeNoAttribsTracker<AU, T>,
+    with_attribs: &AttributeSetTracker<AU, T>,
+    dest: &mut Vec<Res>,
+    f: impl Fn(Vec<KeyValue>, &AU::AtomicTracker) -> Res,
+) where
+    AU: AtomicallyUpdate<T>,
+    T: Number<T>,
+{
+    // Max number of data points needs to account for the special casing
+    // of the no-attribute value.
+    let n = with_attribs.list.len() + 1;
+    if n > dest.capacity() {
+        dest.reserve_exact(n - dest.capacity());
+    }
+    if no_attribs.is_set.load(Ordering::Acquire) {
+        dest.push(f(vec![], &no_attribs.tracker));
+    }
+    // TODO: This will use an unbounded amount of memory if there
+    // are unbounded number of attribute sets being aggregated. Attribute
+    // sets that become "stale" need to be forgotten so this will not
+    // overload the system.
+    dest.extend(
+        with_attribs
+            .sorted_deduped
+            .iter()
+            .map(|(attrs, tracker)| f(attrs.as_ref().clone(), tracker.as_ref())),
+    )
+}
+
+/// Collect all data points from the various sources, resetting them along the
+/// way; the caller is expected to reset each `AtomicTracker` as well.
+pub(crate) fn collect_data_points_reset<AU, T, Res>(
+    no_attribs: &LockFreeNoAttribsTracker<AU, T>,
+    with_attribs: &mut AttributeSetTracker<AU, T>,
+    dest: &mut Vec<Res>,
+    mut f: impl FnMut(Vec<KeyValue>, &AU::AtomicTracker) -> Res,
+) where
+    AU: AtomicallyUpdate<T>,
+    T: Number<T>,
+{
+    // Max number of data points needs to account for the special casing
+    // of the no-attribute value.
+    let n = with_attribs.list.len() + 1;
+    if n > dest.capacity() {
+        dest.reserve_exact(n - dest.capacity());
+    }
+    if no_attribs.is_set.swap(false, Ordering::AcqRel) {
+        dest.push(f(vec![], &no_attribs.tracker));
+    }
+
+    with_attribs.list.clear();
+    // TODO: This will use an unbounded amount of memory if there
+    // are unbounded number of attribute sets being aggregated. Attribute
+    // sets that become "stale" need to be forgotten so this will not
+    // overload the system.
+    dest.extend(
+        with_attribs
+            .sorted_deduped
+            .drain()
+            .map(|(attrs, tracker)| f(attrs.into_inner(), tracker.as_ref())),
+    );
+}
+
 /// The storage for sums.
 ///
 /// This structure is parametrized by an `Operation` that indicates how
 /// updates to the underlying value trackers should be performed.
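+///
+/// Values recorded with attributes go through the `AttributeSetTracker` behind
+/// the `RwLock`, while measurements without attributes take the lock-free
+/// `no_attribs_tracker` fast path.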
 pub(crate) struct ValueMap<AU: AtomicallyUpdate<T>, T: Number<T>, O> {
     /// Trackers store the values associated with different attribute sets.
-    trackers: RwLock<HashMap<Vec<KeyValue>, Arc<AU::AtomicTracker>>>,
-    /// Number of different attribute set stored in the `trackers` map.
-    count: AtomicUsize,
-    /// Indicates whether a value with no attributes has been stored.
-    has_no_attribute_value: AtomicBool,
-    /// Tracker for values with no attributes attached.
-    no_attribute_tracker: AU::AtomicTracker,
-    /// Buckets Count is only used by Histogram.
-    buckets_count: Option<usize>,
+    trackers: RwLock<AttributeSetTracker<AU, T>>,
+    no_attribs_tracker: LockFreeNoAttribsTracker<AU, T>,
     phantom: PhantomData<O>,
 }
 
@@ -72,22 +215,16 @@ impl<AU: AtomicallyUpdate<T>, T: Number<T>, O> Default for ValueMap<AU, T, O> {
 
 impl<AU: AtomicallyUpdate<T>, T: Number<T>, O> ValueMap<AU, T, O> {
     fn new() -> Self {
         ValueMap {
-            trackers: RwLock::new(HashMap::new()),
-            has_no_attribute_value: AtomicBool::new(false),
-            no_attribute_tracker: AU::new_atomic_tracker(None),
-            count: AtomicUsize::new(0),
-            buckets_count: None,
+            trackers: RwLock::new(AttributeSetTracker::new(None)),
+            no_attribs_tracker: LockFreeNoAttribsTracker::new(None),
             phantom: PhantomData,
         }
     }
 
     fn new_with_buckets_count(buckets_count: usize) -> Self {
         ValueMap {
-            trackers: RwLock::new(HashMap::new()),
-            has_no_attribute_value: AtomicBool::new(false),
-            no_attribute_tracker: AU::new_atomic_tracker(Some(buckets_count)),
-            count: AtomicUsize::new(0),
-            buckets_count: Some(buckets_count),
+            trackers: RwLock::new(AttributeSetTracker::new(Some(buckets_count))),
+            no_attribs_tracker: LockFreeNoAttribsTracker::new(Some(buckets_count)),
             phantom: PhantomData,
         }
     }
@@ -96,57 +233,24 @@ impl<AU: AtomicallyUpdate<T>, T: Number<T>, O: Operation> ValueMap<AU, T, O> {
 
 impl<AU: AtomicallyUpdate<T>, T: Number<T>, O: Operation> ValueMap<AU, T, O> {
     fn measure(&self, measurement: T, attributes: &[KeyValue], index: usize) {
         if attributes.is_empty() {
-            O::update_tracker(&self.no_attribute_tracker, measurement, index);
-            self.has_no_attribute_value.store(true, Ordering::Release);
+            self.no_attribs_tracker
+                .update_tracker::<O>(measurement, index);
             return;
         }
 
-        let Ok(trackers) = self.trackers.read() else {
-            return;
-        };
-
-        // Try to retrieve and update the tracker with the attributes in the provided order first
-        if let Some(tracker) = trackers.get(attributes) {
-            O::update_tracker(&**tracker, measurement, index);
-            return;
-        }
-
-        // Try to retrieve and update the tracker with the attributes sorted.
-        let sorted_attrs = AttributeSet::from(attributes).into_vec();
-        if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
-            O::update_tracker(&**tracker, measurement, index);
-            return;
+        match self.trackers.read() {
+            Ok(trackers) => {
+                if let Some(tracker) = trackers.get_existing(attributes) {
+                    O::update_tracker(tracker.as_ref(), measurement, index);
+                    return;
+                }
+            }
+            Err(_) => return,
         }
 
-        // Give up the read lock before acquiring the write lock.
-        drop(trackers);
-
-        let Ok(mut trackers) = self.trackers.write() else {
-            return;
-        };
-
-        // Recheck both the provided and sorted orders after acquiring the write lock
-        // in case another thread has pushed an update in the meantime.
-        if let Some(tracker) = trackers.get(attributes) {
-            O::update_tracker(&**tracker, measurement, index);
-        } else if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
-            O::update_tracker(&**tracker, measurement, index);
-        } else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) {
-            let new_tracker = Arc::new(AU::new_atomic_tracker(self.buckets_count));
-            O::update_tracker(&*new_tracker, measurement, index);
-
-            // Insert tracker with the attributes in the provided and sorted orders
-            trackers.insert(attributes.to_vec(), new_tracker.clone());
-            trackers.insert(sorted_attrs, new_tracker);
-
-            self.count.fetch_add(1, Ordering::SeqCst);
-        } else if let Some(overflow_value) = trackers.get(STREAM_OVERFLOW_ATTRIBUTES.as_slice()) {
-            O::update_tracker(&**overflow_value, measurement, index);
-        } else {
-            let new_tracker = AU::new_atomic_tracker(self.buckets_count);
-            O::update_tracker(&new_tracker, measurement, index);
-            trackers.insert(STREAM_OVERFLOW_ATTRIBUTES.clone(), Arc::new(new_tracker));
-            global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged.".into()));
+        if let Ok(mut trackers) = self.trackers.write() {
+            let tracker = trackers.create_new(attributes);
+            O::update_tracker(tracker.as_ref(), measurement, index);
         }
     }
 }
@@ -350,6 +454,10 @@ impl AtomicallyUpdate<f64> for f64 {
 
 #[cfg(test)]
 mod tests {
+    use std::collections::HashSet;
+
+    use sum::Sum;
+
     use super::*;
 
     #[test]
@@ -465,4 +573,68 @@
         assert!(f64::abs(15.5 - value) < 0.0001, "Incorrect first value");
         assert!(f64::abs(0.0 - value2) < 0.0001, "Incorrect second value");
     }
+
+    #[test]
+    fn attributes_should_be_sorted_and_deduplicated() {
+        let counter = Sum::<u64>::new(true);
+        let mut results = HashSet::new();
+        let attr1 = [
+            KeyValue::new("key1", "a1"),
+            KeyValue::new("key3", "c1"),
+            KeyValue::new("key1", "a2"),
+        ];
+        counter.measure(4, &attr1);
+        results.insert(AttributeSet::from(&attr1 as &[KeyValue]).into_inner());
+        assert!(
+            results.contains([KeyValue::new("key1", "a2"), KeyValue::new("key3", "c1")].as_slice())
+        );
+        // add more variations: the default hasher in Rust is randomized, so
+        // more cases should help to exercise different orderings
+        let attr2 = [
+            KeyValue::new("key3", "c1"),
+            KeyValue::new("key2", "b1"),
+            KeyValue::new("key2", "b2"),
+        ];
+        counter.measure(4, &attr2);
+        results.insert(AttributeSet::from(&attr2 as &[KeyValue]).into_inner());
+        assert!(
+            results.contains([KeyValue::new("key2", "b2"), KeyValue::new("key3", "c1")].as_slice())
+        );
+        let attr3 = [
+            KeyValue::new("key3", "c4"),
+            KeyValue::new("key2", "b1"),
+            KeyValue::new("key3", "c2"),
+        ];
+        counter.measure(4, &attr3);
+        results.insert(AttributeSet::from(&attr3 as &[KeyValue]).into_inner());
+        assert!(
+            results.contains([KeyValue::new("key2", "b1"), KeyValue::new("key3", "c2")].as_slice())
+        );
+
+        let attr4 = [
+            KeyValue::new("key3", "c4"),
+            KeyValue::new("key3", "c2"),
+            KeyValue::new("key1", "b3"),
+        ];
+        counter.measure(4, &attr4);
+        results.insert(AttributeSet::from(&attr4 as &[KeyValue]).into_inner());
+        assert!(
+            results.contains([KeyValue::new("key1", "b3"), KeyValue::new("key3", "c2")].as_slice())
+        );
+
+        let res = counter.cumulative(None).1.unwrap();
+        let data = res
+            .as_any()
+            .downcast_ref::<data::Sum<u64>>()
+            .unwrap();
+
+        assert_eq!(data.data_points.len(), 4);
+        for points in &data.data_points {
+            assert!(
+                results.contains(&points.attributes),
+                "unexpected attributes: {:?}",
+                points.attributes
+            );
+        }
+    }
 }
diff --git a/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs b/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs
index 14e9c19b25..ce16a75736 100644
--- a/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs
+++ b/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs
@@ -2,12 +2,11 @@ use opentelemetry::KeyValue;
 
 use crate::metrics::data::{self, Aggregation, DataPoint, Temporality};
 
-use super::{Assign, AtomicTracker, Number, ValueMap};
-use std::{
-    collections::{HashMap, HashSet},
-    sync::{atomic::Ordering, Arc, Mutex},
-    time::SystemTime,
+use super::{
+    collect_data_points_readonly, collect_data_points_reset, Assign, AtomicTracker, Number,
+    ValueMap,
 };
+use std::{collections::HashMap, sync::Mutex, time::SystemTime};
 
 /// Summarizes a set of pre-computed sums as their arithmetic sum.
 pub(crate) struct PrecomputedSum<T: Number<T>> {
@@ -54,64 +53,40 @@ impl<T: Number<T>> PrecomputedSum<T> {
         s_data.temporality = Temporality::Delta;
         s_data.is_monotonic = self.monotonic;
 
-        // Max number of data points need to account for the special casing
-        // of the no attribute value + overflow attribute.
-        let n = self.value_map.count.load(Ordering::SeqCst) + 2;
-        if n > s_data.data_points.capacity() {
-            s_data
-                .data_points
-                .reserve_exact(n - s_data.data_points.capacity());
-        }
-        let mut new_reported = HashMap::with_capacity(n);
         let mut reported = match self.reported.lock() {
             Ok(r) => r,
            Err(_) => return (0, None),
         };
 
-        if self
-            .value_map
-            .has_no_attribute_value
-            .swap(false, Ordering::AcqRel)
-        {
-            let value = self.value_map.no_attribute_tracker.get_value();
-            let delta = value - *reported.get(&vec![]).unwrap_or(&T::default());
-            new_reported.insert(vec![], value);
-
-            s_data.data_points.push(DataPoint {
-                attributes: vec![],
-                start_time: Some(prev_start),
-                time: Some(t),
-                value: delta,
-                exemplars: vec![],
-            });
-        }
-
-        let mut trackers = match self.value_map.trackers.write() {
-            Ok(v) => v,
-            Err(_) => return (0, None),
+        let Ok(mut trackers) = self.value_map.trackers.write() else {
+            return (0, None);
         };
 
-        let mut seen = HashSet::new();
-        for (attrs, tracker) in trackers.drain() {
-            if seen.insert(Arc::as_ptr(&tracker)) {
-                let value = tracker.get_value();
-                let delta = value - *reported.get(&attrs).unwrap_or(&T::default());
-                new_reported.insert(attrs.clone(), value);
-                s_data.data_points.push(DataPoint {
-                    attributes: attrs.clone(),
+        // capacity mirrors the reservation done inside `collect_data_points_reset`
+        let mut new_reported = HashMap::with_capacity(trackers.list.len() + 1);
+
+        collect_data_points_reset(
+            &self.value_map.no_attribs_tracker,
+            &mut trackers,
+            &mut s_data.data_points,
+            |attributes, tracker| {
+                let prev_value = *reported.get(&attributes).unwrap_or(&T::default());
+                let curr_value = tracker.get_and_reset_value();
+                new_reported.insert(attributes.clone(), curr_value);
+                DataPoint {
+                    attributes,
                     start_time: Some(prev_start),
                     time: Some(t),
-                    value: delta,
+                    value: curr_value - prev_value,
                     exemplars: vec![],
-                });
-            }
-        }
+                }
+            },
+        );
 
         // The delta collection cycle resets.
         if let Ok(mut start) = self.start.lock() {
             *start = t;
         }
-        self.value_map.count.store(0, Ordering::SeqCst);
         *reported = new_reported;
         drop(reported); // drop before values guard is dropped
@@ -144,46 +119,22 @@ impl<T: Number<T>> PrecomputedSum<T> {
         s_data.temporality = Temporality::Cumulative;
         s_data.is_monotonic = self.monotonic;
 
-        // Max number of data points need to account for the special casing
-        // of the no attribute value + overflow attribute.
-        let n = self.value_map.count.load(Ordering::SeqCst) + 2;
-        if n > s_data.data_points.capacity() {
-            s_data
-                .data_points
-                .reserve_exact(n - s_data.data_points.capacity());
-        }
+        let Ok(trackers) = self.value_map.trackers.read() else {
+            return (0, None);
+        };
 
-        if self
-            .value_map
-            .has_no_attribute_value
-            .load(Ordering::Acquire)
-        {
-            s_data.data_points.push(DataPoint {
-                attributes: vec![],
+        collect_data_points_readonly(
+            &self.value_map.no_attribs_tracker,
+            &trackers,
+            &mut s_data.data_points,
+            |attributes, tracker| DataPoint {
+                attributes,
                 start_time: Some(prev_start),
                 time: Some(t),
-                value: self.value_map.no_attribute_tracker.get_value(),
+                value: tracker.get_value(),
                 exemplars: vec![],
-            });
-        }
-
-        let trackers = match self.value_map.trackers.write() {
-            Ok(v) => v,
-            Err(_) => return (0, None),
-        };
-
-        let mut seen = HashSet::new();
-        for (attrs, tracker) in trackers.iter() {
-            if seen.insert(Arc::as_ptr(tracker)) {
-                s_data.data_points.push(DataPoint {
-                    attributes: attrs.clone(),
-                    start_time: Some(prev_start),
-                    time: Some(t),
-                    value: tracker.get_value(),
-                    exemplars: vec![],
-                });
-            }
-        }
+            },
+        );
 
         (
             s_data.data_points.len(),
diff --git a/opentelemetry-sdk/src/metrics/internal/sum.rs b/opentelemetry-sdk/src/metrics/internal/sum.rs
index 68a58d1e8d..c80e809ff3 100644
--- a/opentelemetry-sdk/src/metrics/internal/sum.rs
+++ b/opentelemetry-sdk/src/metrics/internal/sum.rs
@@ -1,13 +1,10 @@
-use std::collections::HashSet;
-use std::sync::atomic::Ordering;
-use std::sync::Arc;
 use std::vec;
 use std::{sync::Mutex, time::SystemTime};
 
 use crate::metrics::data::{self, Aggregation, DataPoint, Temporality};
 use opentelemetry::KeyValue;
 
-use super::{AtomicTracker, Number};
+use super::{collect_data_points_readonly, collect_data_points_reset, AtomicTracker, Number};
 use super::{Increment, ValueMap};
 
 /// Summarizes a set of measurements made as their arithmetic sum.
@@ -40,7 +37,7 @@ impl<T: Number<T>> Sum<T> {
         &self,
         dest: Option<&mut dyn Aggregation>,
     ) -> (usize, Option<Box<dyn Aggregation>>) {
-        let t = SystemTime::now();
+        let now = SystemTime::now();
 
         let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<data::Sum<T>>());
         let mut new_agg = if s_data.is_none() {
@@ -57,53 +54,29 @@ impl<T: Number<T>> Sum<T> {
         s_data.is_monotonic = self.monotonic;
         s_data.data_points.clear();
 
-        // Max number of data points need to account for the special casing
-        // of the no attribute value + overflow attribute.
-        let n = self.value_map.count.load(Ordering::SeqCst) + 2;
-        if n > s_data.data_points.capacity() {
-            s_data
-                .data_points
-                .reserve_exact(n - s_data.data_points.capacity());
-        }
-
-        let prev_start = self.start.lock().map(|start| *start).unwrap_or(t);
-        if self
-            .value_map
-            .has_no_attribute_value
-            .swap(false, Ordering::AcqRel)
-        {
-            s_data.data_points.push(DataPoint {
-                attributes: vec![],
-                start_time: Some(prev_start),
-                time: Some(t),
-                value: self.value_map.no_attribute_tracker.get_and_reset_value(),
-                exemplars: vec![],
-            });
-        }
+        let prev_start = self.start.lock().map(|start| *start).unwrap_or(now);
 
-        let mut trackers = match self.value_map.trackers.write() {
-            Ok(v) => v,
-            Err(_) => return (0, None),
+        let Ok(mut trackers) = self.value_map.trackers.write() else {
+            return (0, None);
         };
 
-        let mut seen = HashSet::new();
-        for (attrs, tracker) in trackers.drain() {
-            if seen.insert(Arc::as_ptr(&tracker)) {
-                s_data.data_points.push(DataPoint {
-                    attributes: attrs.clone(),
-                    start_time: Some(prev_start),
-                    time: Some(t),
-                    value: tracker.get_value(),
-                    exemplars: vec![],
-                });
-            }
-        }
+        collect_data_points_reset(
+            &self.value_map.no_attribs_tracker,
+            &mut trackers,
+            &mut s_data.data_points,
+            |attributes, tracker| DataPoint {
+                attributes,
+                start_time: Some(prev_start),
+                time: Some(now),
+                value: tracker.get_and_reset_value(),
+                exemplars: vec![],
+            },
+        );
 
         // The delta collection cycle resets.
         if let Ok(mut start) = self.start.lock() {
-            *start = t;
+            *start = now;
         }
-        self.value_map.count.store(0, Ordering::SeqCst);
 
         (
             s_data.data_points.len(),
@@ -132,52 +105,24 @@ impl<T: Number<T>> Sum<T> {
         s_data.is_monotonic = self.monotonic;
         s_data.data_points.clear();
 
-        // Max number of data points need to account for the special casing
-        // of the no attribute value + overflow attribute.
-        let n = self.value_map.count.load(Ordering::SeqCst) + 2;
-        if n > s_data.data_points.capacity() {
-            s_data
-                .data_points
-                .reserve_exact(n - s_data.data_points.capacity());
-        }
-
         let prev_start = self.start.lock().map(|start| *start).unwrap_or(t);
 
-        if self
-            .value_map
-            .has_no_attribute_value
-            .load(Ordering::Acquire)
-        {
-            s_data.data_points.push(DataPoint {
-                attributes: vec![],
+        let Ok(trackers) = self.value_map.trackers.read() else {
+            return (0, None);
+        };
+
+        collect_data_points_readonly(
+            &self.value_map.no_attribs_tracker,
+            &trackers,
+            &mut s_data.data_points,
+            |attributes, tracker| DataPoint {
+                attributes,
                 start_time: Some(prev_start),
                 time: Some(t),
-                value: self.value_map.no_attribute_tracker.get_value(),
+                value: tracker.get_value(),
                 exemplars: vec![],
-            });
-        }
-
-        let trackers = match self.value_map.trackers.write() {
-            Ok(v) => v,
-            Err(_) => return (0, None),
-        };
-
-        // TODO: This will use an unbounded amount of memory if there
-        // are unbounded number of attribute sets being aggregated. Attribute
-        // sets that become "stale" need to be forgotten so this will not
-        // overload the system.
-        let mut seen = HashSet::new();
-        for (attrs, tracker) in trackers.iter() {
-            if seen.insert(Arc::as_ptr(tracker)) {
-                s_data.data_points.push(DataPoint {
-                    attributes: attrs.clone(),
-                    start_time: Some(prev_start),
-                    time: Some(t),
-                    value: tracker.get_value(),
-                    exemplars: vec![],
-                });
-            }
-        }
+            },
+        );
 
         (
             s_data.data_points.len(),
diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs
index 0e1f978525..ea7f263582 100644
--- a/opentelemetry-sdk/src/metrics/mod.rs
+++ b/opentelemetry-sdk/src/metrics/mod.rs
@@ -40,6 +40,7 @@
 //! [Resource]: crate::Resource
 
 pub(crate) mod aggregation;
+pub(crate) mod attribute_set;
 pub mod data;
 pub mod exporter;
 pub(crate) mod instrument;
@@ -53,6 +54,7 @@ pub mod reader;
 pub(crate) mod view;
 
 pub use aggregation::*;
+pub(crate) use attribute_set::*;
 pub use instrument::*;
 pub use manual_reader::*;
 pub use meter::*;
@@ -61,71 +63,6 @@ pub use periodic_reader::*;
 pub use pipeline::Pipeline;
 pub use view::*;
 
-use std::collections::hash_map::DefaultHasher;
-use std::collections::HashSet;
-use std::hash::{Hash, Hasher};
-
-use opentelemetry::{Key, KeyValue, Value};
-
-/// A unique set of attributes that can be used as instrument identifiers.
-///
-/// This must implement [Hash], [PartialEq], and [Eq] so it may be used as
-/// HashMap keys and other de-duplication methods.
-#[derive(Clone, Default, Debug, PartialEq, Eq)]
-pub(crate) struct AttributeSet(Vec<KeyValue>, u64);
-
-impl From<&[KeyValue]> for AttributeSet {
-    fn from(values: &[KeyValue]) -> Self {
-        let mut seen_keys = HashSet::with_capacity(values.len());
-        let vec = values
-            .iter()
-            .rev()
-            .filter_map(|kv| {
-                if seen_keys.insert(kv.key.clone()) {
-                    Some(kv.clone())
-                } else {
-                    None
-                }
-            })
-            .collect::<Vec<_>>();
-
-        AttributeSet::new(vec)
-    }
-}
-
-fn calculate_hash(values: &[KeyValue]) -> u64 {
-    let mut hasher = DefaultHasher::new();
-    values.iter().fold(&mut hasher, |mut hasher, item| {
-        item.hash(&mut hasher);
-        hasher
-    });
-    hasher.finish()
-}
-
-impl AttributeSet {
-    fn new(mut values: Vec<KeyValue>) -> Self {
-        values.sort_unstable();
-        let hash = calculate_hash(&values);
-        AttributeSet(values, hash)
-    }
-
-    /// Iterate over key value pairs in the set
-    pub(crate) fn iter(&self) -> impl Iterator<Item = (&Key, &Value)> {
-        self.0.iter().map(|kv| (&kv.key, &kv.value))
-    }
-
-    /// Returns the underlying Vec of KeyValue pairs
-    pub(crate) fn into_vec(self) -> Vec<KeyValue> {
-        self.0
-    }
-}
-
-impl Hash for AttributeSet {
-    fn hash<H: Hasher>(&self, state: &mut H) {
-        state.write_u64(self.1)
-    }
-}
-
 #[cfg(all(test, feature = "testing"))]
 mod tests {
     use self::data::{DataPoint, HistogramDataPoint, ScopeMetrics};