
Commit 528e0a6

cijothomas and utpilla authored
Metric refactor - 2x perf and allocation free (#1989)
Co-authored-by: Utkarsh Umesan Pillai <[email protected]>
1 parent 76b40ba commit 528e0a6
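
In short: the sum aggregator's storage is re-keyed from `AttributeSet` to plain `Vec<KeyValue>`, and each attribute set is stored twice, once in incoming order and once in sorted order, with both keys sharing a single `Arc`-wrapped atomic tracker. The hot path can then probe the map directly with the caller's `&[KeyValue]` slice, so a measurement with previously seen attributes needs no sort and no allocation. Cardinality is tracked by a separate `AtomicUsize` so the limit counts logical attribute sets rather than map entries, and collection deduplicates the doubled entries by `Arc` pointer identity. Hence the roughly 2x improvement in the benchmark numbers below; the overflow case, which always falls through to the write path, gets slower.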

File tree

4 files changed
+160 -82 lines changed

opentelemetry-sdk/benches/metric_counter.rs

Lines changed: 3 additions & 3 deletions
@@ -6,9 +6,9 @@
 RAM: 64.0 GB
 | Test                           | Average time|
 |--------------------------------|-------------|
-| Counter_Add_Sorted             | 560 ns      |
-| Counter_Add_Unsorted           | 565 ns      |
-| Counter_Overflow               | 568 ns      |
+| Counter_Add_Sorted             | 193 ns      |
+| Counter_Add_Unsorted           | 209 ns      |
+| Counter_Overflow               | 898 ns      |
 | ThreadLocal_Random_Generator_5 | 37 ns       |
 */
 
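For context, the numbers above come from the crate's Criterion benchmarks in this file. A minimal sketch of how such a counter benchmark is typically wired up is shown below; the meter name, instrument name, and attribute values are illustrative rather than copied from `metric_counter.rs`, and depending on the SDK version the instrument builder is finished with `.init()` or `.build()`:

```rust
use criterion::{criterion_group, criterion_main, Criterion};
use opentelemetry::{metrics::MeterProvider as _, KeyValue};
use opentelemetry_sdk::metrics::SdkMeterProvider;

fn counter_add(c: &mut Criterion) {
    // A provider with no exporter is enough to exercise the aggregation hot path.
    let provider = SdkMeterProvider::builder().build();
    let meter = provider.meter("benchmarks");
    let counter = meter.u64_counter("counter_bench").init();

    // Attributes passed already sorted by key: after this commit, repeated
    // adds hit the ValueMap read path without sorting or allocating.
    let attrs = [
        KeyValue::new("attribute1", "value1"),
        KeyValue::new("attribute2", "value2"),
    ];
    c.bench_function("Counter_Add_Sorted", |b| b.iter(|| counter.add(1, &attrs)));
}

criterion_group!(benches, counter_add);
criterion_main!(benches);
```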

opentelemetry-sdk/src/metrics/internal/sum.rs

Lines changed: 98 additions & 79 deletions
@@ -1,4 +1,6 @@
-use std::sync::atomic::{AtomicBool, Ordering};
+use std::collections::HashSet;
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+use std::sync::Arc;
 use std::vec;
 use std::{
     collections::HashMap,
@@ -8,19 +10,21 @@ use std::{
 
 use crate::metrics::data::{self, Aggregation, DataPoint, Temporality};
 use crate::metrics::AttributeSet;
+use once_cell::sync::Lazy;
 use opentelemetry::KeyValue;
 use opentelemetry::{global, metrics::MetricsError};
 
-use super::{
-    aggregate::{is_under_cardinality_limit, STREAM_OVERFLOW_ATTRIBUTE_SET},
-    AtomicTracker, Number,
-};
+use super::{aggregate::is_under_cardinality_limit, AtomicTracker, Number};
+
+pub(crate) static STREAM_OVERFLOW_ATTRIBUTES: Lazy<Vec<KeyValue>> =
+    Lazy::new(|| vec![KeyValue::new("otel.metric.overflow", "true")]);
 
 /// The storage for sums.
 struct ValueMap<T: Number<T>> {
-    values: RwLock<HashMap<AttributeSet, T::AtomicTracker>>,
+    values: RwLock<HashMap<Vec<KeyValue>, Arc<T::AtomicTracker>>>,
     has_no_value_attribute_value: AtomicBool,
     no_attribute_value: T::AtomicTracker,
+    count: AtomicUsize,
 }
 
 impl<T: Number<T>> Default for ValueMap<T> {
@@ -35,42 +39,59 @@ impl<T: Number<T>> ValueMap<T> {
             values: RwLock::new(HashMap::new()),
             has_no_value_attribute_value: AtomicBool::new(false),
             no_attribute_value: T::new_atomic_tracker(),
+            count: AtomicUsize::new(0),
         }
     }
 }
 
 impl<T: Number<T>> ValueMap<T> {
-    fn measure(&self, measurement: T, attrs: AttributeSet) {
+    fn measure(&self, measurement: T, attrs: &[KeyValue]) {
         if attrs.is_empty() {
             self.no_attribute_value.add(measurement);
             self.has_no_value_attribute_value
                 .store(true, Ordering::Release);
         } else if let Ok(values) = self.values.read() {
-            if let Some(value_to_update) = values.get(&attrs) {
+            // Try incoming order first
+            if let Some(value_to_update) = values.get(attrs) {
                 value_to_update.add(measurement);
-                return;
             } else {
-                drop(values);
-                if let Ok(mut values) = self.values.write() {
-                    // Recheck after acquiring write lock, in case another
-                    // thread has added the value.
-                    if let Some(value_to_update) = values.get(&attrs) {
-                        value_to_update.add(measurement);
-                        return;
-                    } else if is_under_cardinality_limit(values.len()) {
-                        let new_value = T::new_atomic_tracker();
-                        new_value.add(measurement);
-                        values.insert(attrs, new_value);
-                    } else if let Some(overflow_value) =
-                        values.get_mut(&STREAM_OVERFLOW_ATTRIBUTE_SET)
-                    {
-                        overflow_value.add(measurement);
-                        return;
-                    } else {
-                        let new_value = T::new_atomic_tracker();
-                        new_value.add(measurement);
-                        values.insert(STREAM_OVERFLOW_ATTRIBUTE_SET.clone(), new_value);
-                        global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged.".into()));
+                // Then try sorted order.
+                let sorted_attrs = AttributeSet::from(attrs).into_vec();
+                if let Some(value_to_update) = values.get(sorted_attrs.as_slice()) {
+                    value_to_update.add(measurement);
+                } else {
+                    // Give up the lock, before acquiring write lock.
+                    drop(values);
+                    if let Ok(mut values) = self.values.write() {
+                        // Recheck both incoming and sorted after acquiring
+                        // write lock, in case another thread has added the
+                        // value.
+                        if let Some(value_to_update) = values.get(attrs) {
+                            value_to_update.add(measurement);
+                        } else if let Some(value_to_update) = values.get(sorted_attrs.as_slice()) {
+                            value_to_update.add(measurement);
+                        } else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) {
+                            let new_value = T::new_atomic_tracker();
+                            new_value.add(measurement);
+                            let new_value = Arc::new(new_value);
+
+                            // Insert original order
+                            values.insert(attrs.to_vec(), new_value.clone());
+
+                            // Insert sorted order
+                            values.insert(sorted_attrs, new_value);
+
+                            self.count.fetch_add(1, Ordering::SeqCst);
+                        } else if let Some(overflow_value) =
+                            values.get(STREAM_OVERFLOW_ATTRIBUTES.as_slice())
+                        {
+                            overflow_value.add(measurement);
+                        } else {
+                            let new_value = T::new_atomic_tracker();
+                            new_value.add(measurement);
+                            values.insert(STREAM_OVERFLOW_ATTRIBUTES.clone(), Arc::new(new_value));
+                            global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged.".into()));
+                        }
                     }
                 }
             }
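
The read-path lookups in the hunk above pass the caller's `&[KeyValue]` straight to a `HashMap` keyed by `Vec<KeyValue>`. This compiles because `Vec<T>` implements `Borrow<[T]>`, so `HashMap::get` accepts a borrowed slice and no owned key is allocated just to probe the map. A standalone sketch of the same trick, with plain string tuples standing in for `KeyValue`:

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

fn main() {
    // One logical attribute set, stored under two orderings of the same
    // key, both pointing at a single shared tracker.
    let mut values: HashMap<Vec<(String, String)>, Arc<AtomicU64>> = HashMap::new();

    let incoming = vec![
        ("b".to_string(), "2".to_string()),
        ("a".to_string(), "1".to_string()),
    ];
    let mut sorted = incoming.clone();
    sorted.sort();

    let tracker = Arc::new(AtomicU64::new(0));
    values.insert(incoming.clone(), tracker.clone());
    values.insert(sorted, tracker);

    // Hot path: Vec<T>: Borrow<[T]>, so get() takes a &[T] slice and no
    // Vec is built just to look up the entry.
    let probe: &[(String, String)] = &incoming;
    if let Some(tracker) = values.get(probe) {
        tracker.fetch_add(1, Ordering::Relaxed);
    }
    assert_eq!(values.len(), 2); // two map entries, one logical set
}
```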
@@ -100,7 +121,6 @@ impl<T: Number<T>> Sum<T> {
     }
 
     pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) {
-        let attrs: AttributeSet = attrs.into();
         self.value_map.measure(measurement, attrs)
     }
 
@@ -125,12 +145,9 @@ impl<T: Number<T>> Sum<T> {
         s_data.is_monotonic = self.monotonic;
         s_data.data_points.clear();
 
-        let mut values = match self.value_map.values.write() {
-            Ok(v) => v,
-            Err(_) => return (0, None),
-        };
-
-        let n = values.len() + 1;
+        // Max number of data points need to account for the special casing
+        // of the no attribute value + overflow attribute.
+        let n = self.value_map.count.load(Ordering::SeqCst) + 2;
         if n > s_data.data_points.capacity() {
             s_data
                 .data_points
@@ -152,23 +169,29 @@ impl<T: Number<T>> Sum<T> {
             });
         }
 
+        let mut values = match self.value_map.values.write() {
+            Ok(v) => v,
+            Err(_) => return (0, None),
+        };
+
+        let mut seen = HashSet::new();
         for (attrs, value) in values.drain() {
-            s_data.data_points.push(DataPoint {
-                attributes: attrs
-                    .iter()
-                    .map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
-                    .collect(),
-                start_time: Some(prev_start),
-                time: Some(t),
-                value: value.get_value(),
-                exemplars: vec![],
-            });
+            if seen.insert(Arc::as_ptr(&value)) {
+                s_data.data_points.push(DataPoint {
+                    attributes: attrs.clone(),
+                    start_time: Some(prev_start),
+                    time: Some(t),
+                    value: value.get_value(),
+                    exemplars: vec![],
+                });
+            }
         }
 
         // The delta collection cycle resets.
         if let Ok(mut start) = self.start.lock() {
             *start = t;
         }
+        self.value_map.count.store(0, Ordering::SeqCst);
 
         (
             s_data.data_points.len(),
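
Because each attribute set now occupies two map entries, the drain in the hunk above must emit each tracker only once. It does so by remembering `Arc::as_ptr` values in a `HashSet`: every clone of an `Arc` reports the same allocation pointer. A reduced sketch of that deduplication:

```rust
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

fn main() {
    let shared = Arc::new(42u64);
    let mut values: HashMap<&str, Arc<u64>> = HashMap::new();
    values.insert("incoming-order", shared.clone());
    values.insert("sorted-order", shared);

    // Arc::as_ptr is stable across clones, so a HashSet of raw pointers
    // keeps exactly one entry per underlying allocation.
    let mut seen = HashSet::new();
    let mut data_points = Vec::new();
    for (_attrs, value) in values.drain() {
        if seen.insert(Arc::as_ptr(&value)) {
            data_points.push(*value);
        }
    }
    assert_eq!(data_points, vec![42]); // one point despite two entries
}
```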
@@ -197,12 +220,9 @@ impl<T: Number<T>> Sum<T> {
         s_data.is_monotonic = self.monotonic;
         s_data.data_points.clear();
 
-        let values = match self.value_map.values.write() {
-            Ok(v) => v,
-            Err(_) => return (0, None),
-        };
-
-        let n = values.len() + 1;
+        // Max number of data points need to account for the special casing
+        // of the no attribute value + overflow attribute.
+        let n = self.value_map.count.load(Ordering::SeqCst) + 2;
         if n > s_data.data_points.capacity() {
             s_data
                 .data_points
@@ -225,16 +245,18 @@ impl<T: Number<T>> Sum<T> {
             });
         }
 
+        let values = match self.value_map.values.write() {
+            Ok(v) => v,
+            Err(_) => return (0, None),
+        };
+
         // TODO: This will use an unbounded amount of memory if there
         // are unbounded number of attribute sets being aggregated. Attribute
         // sets that become "stale" need to be forgotten so this will not
         // overload the system.
         for (attrs, value) in values.iter() {
             s_data.data_points.push(DataPoint {
-                attributes: attrs
-                    .iter()
-                    .map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
-                    .collect(),
+                attributes: attrs.clone(),
                 start_time: Some(prev_start),
                 time: Some(t),
                 value: value.get_value(),
@@ -254,7 +276,7 @@ pub(crate) struct PrecomputedSum<T: Number<T>> {
     value_map: ValueMap<T>,
     monotonic: bool,
     start: Mutex<SystemTime>,
-    reported: Mutex<HashMap<AttributeSet, T>>,
+    reported: Mutex<HashMap<Vec<KeyValue>, T>>,
 }
 
 impl<T: Number<T>> PrecomputedSum<T> {
@@ -268,7 +290,6 @@ impl<T: Number<T>> PrecomputedSum<T> {
     }
 
     pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) {
-        let attrs: AttributeSet = attrs.into();
         self.value_map.measure(measurement, attrs)
     }
 
@@ -294,12 +315,9 @@ impl<T: Number<T>> PrecomputedSum<T> {
         s_data.temporality = Temporality::Delta;
         s_data.is_monotonic = self.monotonic;
 
-        let mut values = match self.value_map.values.write() {
-            Ok(v) => v,
-            Err(_) => return (0, None),
-        };
-
-        let n = values.len() + 1;
+        // Max number of data points need to account for the special casing
+        // of the no attribute value + overflow attribute.
+        let n = self.value_map.count.load(Ordering::SeqCst) + 2;
         if n > s_data.data_points.capacity() {
             s_data
                 .data_points
@@ -325,17 +343,19 @@ impl<T: Number<T>> PrecomputedSum<T> {
             });
         }
 
+        let mut values = match self.value_map.values.write() {
+            Ok(v) => v,
+            Err(_) => return (0, None),
+        };
+
         let default = T::default();
         for (attrs, value) in values.drain() {
             let delta = value.get_value() - *reported.get(&attrs).unwrap_or(&default);
             if delta != default {
                 new_reported.insert(attrs.clone(), value.get_value());
             }
             s_data.data_points.push(DataPoint {
-                attributes: attrs
-                    .iter()
-                    .map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
-                    .collect(),
+                attributes: attrs.clone(),
                 start_time: Some(prev_start),
                 time: Some(t),
                 value: delta,
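
`PrecomputedSum` receives cumulative observations, so the delta path above exports the difference from the previously `reported` value per attribute set and then swaps in the new baseline. A minimal sketch of that bookkeeping, with integers standing in for the generic `T`:

```rust
use std::collections::HashMap;

fn main() {
    // Baseline: cumulative values exported in the previous cycle.
    let mut reported: HashMap<Vec<(&str, &str)>, u64> = HashMap::new();
    reported.insert(vec![("k", "v")], 7);

    // Current cumulative observations drained from the aggregator.
    let observed = vec![(vec![("k", "v")], 10u64), (vec![("k", "w")], 4u64)];

    let mut new_reported = HashMap::new();
    for (attrs, cumulative) in observed {
        // Export only the change since the last collection.
        let delta = cumulative - reported.get(&attrs).copied().unwrap_or(0);
        if delta != 0 {
            new_reported.insert(attrs.clone(), cumulative);
        }
        println!("{attrs:?} -> {delta}");
    }

    // The new cumulative values become the baseline for the next cycle.
    reported = new_reported;
    assert_eq!(reported.len(), 2);
}
```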
@@ -347,6 +367,7 @@ impl<T: Number<T>> PrecomputedSum<T> {
         if let Ok(mut start) = self.start.lock() {
             *start = t;
         }
+        self.value_map.count.store(0, Ordering::SeqCst);
 
         *reported = new_reported;
         drop(reported); // drop before values guard is dropped
@@ -379,12 +400,9 @@ impl<T: Number<T>> PrecomputedSum<T> {
         s_data.temporality = Temporality::Cumulative;
         s_data.is_monotonic = self.monotonic;
 
-        let values = match self.value_map.values.write() {
-            Ok(v) => v,
-            Err(_) => return (0, None),
-        };
-
-        let n = values.len() + 1;
+        // Max number of data points need to account for the special casing
+        // of the no attribute value + overflow attribute.
+        let n = self.value_map.count.load(Ordering::SeqCst) + 2;
         if n > s_data.data_points.capacity() {
             s_data
                 .data_points
@@ -410,17 +428,18 @@ impl<T: Number<T>> PrecomputedSum<T> {
             });
         }
 
+        let values = match self.value_map.values.write() {
+            Ok(v) => v,
+            Err(_) => return (0, None),
+        };
         let default = T::default();
         for (attrs, value) in values.iter() {
             let delta = value.get_value() - *reported.get(attrs).unwrap_or(&default);
             if delta != default {
                 new_reported.insert(attrs.clone(), value.get_value());
             }
             s_data.data_points.push(DataPoint {
-                attributes: attrs
-                    .iter()
-                    .map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
-                    .collect(),
+                attributes: attrs.clone(),
                 start_time: Some(prev_start),
                 time: Some(t),
                 value: delta,
