Metric refactor - 2x perf and allocation free #1989

Merged

6 changes: 3 additions & 3 deletions opentelemetry-sdk/benches/metric_counter.rs
@@ -6,9 +6,9 @@
RAM: 64.0 GB
| Test | Average time|
|--------------------------------|-------------|
-| Counter_Add_Sorted | 560 ns |
-| Counter_Add_Unsorted | 565 ns |
-| Counter_Overflow | 568 ns |
+| Counter_Add_Sorted | 193 ns |
+| Counter_Add_Unsorted | 209 ns |
+| Counter_Overflow | 898 ns |
| ThreadLocal_Random_Generator_5 | 37 ns |
*/
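
For context on how numbers like these are produced: a minimal Criterion sketch of a counter-add benchmark, not the repository's actual bench file. The provider and builder names (`SdkMeterProvider`, `u64_counter(...).init()`) follow the 0.24-era API and may differ in other versions, and a real setup would also attach a metric reader (the actual bench uses a manual reader), omitted here for brevity.

```rust
use criterion::{criterion_group, criterion_main, Criterion};
use opentelemetry::{metrics::MeterProvider as _, KeyValue};
use opentelemetry_sdk::metrics::SdkMeterProvider;

fn counter_add(c: &mut Criterion) {
    let provider = SdkMeterProvider::default();
    let meter = provider.meter("bench");
    let counter = meter.u64_counter("counter_bench").init();

    // Attributes passed in a fixed, already-sorted key order: after this PR,
    // the first HashMap lookup should hit, with no allocation per `add`.
    c.bench_function("Counter_Add_Sorted", |b| {
        b.iter(|| {
            counter.add(
                1,
                &[
                    KeyValue::new("attribute1", "value1"),
                    KeyValue::new("attribute2", "value2"),
                ],
            );
        })
    });
}

criterion_group!(benches, counter_add);
criterion_main!(benches);
```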

177 changes: 98 additions & 79 deletions opentelemetry-sdk/src/metrics/internal/sum.rs
@@ -1,4 +1,6 @@
-use std::sync::atomic::{AtomicBool, Ordering};
+use std::collections::HashSet;
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+use std::sync::Arc;
use std::vec;
use std::{
collections::HashMap,
@@ -8,19 +10,21 @@

use crate::metrics::data::{self, Aggregation, DataPoint, Temporality};
use crate::metrics::AttributeSet;
+use once_cell::sync::Lazy;
use opentelemetry::KeyValue;
use opentelemetry::{global, metrics::MetricsError};

-use super::{
-    aggregate::{is_under_cardinality_limit, STREAM_OVERFLOW_ATTRIBUTE_SET},
-    AtomicTracker, Number,
-};
+use super::{aggregate::is_under_cardinality_limit, AtomicTracker, Number};

+pub(crate) static STREAM_OVERFLOW_ATTRIBUTES: Lazy<Vec<KeyValue>> =
+    Lazy::new(|| vec![KeyValue::new("otel.metric.overflow", "true")]);

/// The storage for sums.
struct ValueMap<T: Number<T>> {
-    values: RwLock<HashMap<AttributeSet, T::AtomicTracker>>,
+    values: RwLock<HashMap<Vec<KeyValue>, Arc<T::AtomicTracker>>>,

Comment from @cijothomas (Member, Author), Aug 6, 2024:
> The metric value is Arc-shared to ensure that attributes provided in any order contribute to the same timeseries. Arc equality is also leveraged at export time to de-dup entries.
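
To make that concrete, a minimal standalone sketch of the sharing scheme, with a plain `AtomicU64` and string keys standing in for the SDK's `T::AtomicTracker` and `KeyValue` (illustrative only):

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

fn main() {
    let mut values: HashMap<Vec<&str>, Arc<AtomicU64>> = HashMap::new();

    // One tracker, registered under both the caller's order and sorted order.
    let tracker = Arc::new(AtomicU64::new(0));
    values.insert(vec!["k2=b", "k1=a"], tracker.clone()); // incoming order
    values.insert(vec!["k1=a", "k2=b"], tracker); // sorted order

    // An update through either key lands in the same timeseries.
    values[["k2=b", "k1=a"].as_slice()].fetch_add(5, Ordering::Relaxed);
    assert_eq!(values[["k1=a", "k2=b"].as_slice()].load(Ordering::Relaxed), 5);
}
```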

has_no_value_attribute_value: AtomicBool,
no_attribute_value: T::AtomicTracker,
+    count: AtomicUsize,

Comment from @cijothomas (Member, Author):
> count is tracked separately because the number of entries in the HashMap is no longer accurate: both the original and the sorted attribute orders are inserted into the map.
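
A sketch of why `values.len()` over-counts once both orders are inserted, and what the separate counter tracks instead (`CARDINALITY_LIMIT` here is an illustrative stand-in; the SDK has its own internal limit):

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;

const CARDINALITY_LIMIT: usize = 2000; // illustrative stand-in

fn main() {
    let mut values: HashMap<Vec<&str>, Arc<AtomicU64>> = HashMap::new();
    let count = AtomicUsize::new(0); // logical number of timeseries

    let tracker = Arc::new(AtomicU64::new(0));
    values.insert(vec!["k2=b", "k1=a"], tracker.clone()); // incoming order
    values.insert(vec!["k1=a", "k2=b"], tracker); // sorted order
    count.fetch_add(1, Ordering::SeqCst); // one new timeseries, two map entries

    // The limit check must use `count`, not the inflated map length.
    assert_eq!(values.len(), 2);
    assert_eq!(count.load(Ordering::SeqCst), 1);
    assert!(count.load(Ordering::SeqCst) < CARDINALITY_LIMIT);
}
```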

}

impl<T: Number<T>> Default for ValueMap<T> {
@@ -35,42 +39,59 @@
values: RwLock::new(HashMap::new()),
has_no_value_attribute_value: AtomicBool::new(false),
no_attribute_value: T::new_atomic_tracker(),
+            count: AtomicUsize::new(0),
}
}
}

impl<T: Number<T>> ValueMap<T> {
-    fn measure(&self, measurement: T, attrs: AttributeSet) {
+    fn measure(&self, measurement: T, attrs: &[KeyValue]) {

Comment from @cijothomas (Member, Author):
> This is the key change: we operate on the incoming ref/slice itself, without allocating, copying, sorting, or de-duplicating. If the lookup fails, the "slow" path does everything as before. The net effect is that users who use the metrics API normally (i.e., without duplicate keys and without changing the order of attributes) get the highest performance and zero heap allocations.
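
The locking pattern in isolation: a double-checked read-then-write sketch, independent of the SDK types (the `add` helper is hypothetical):

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};

fn add(map: &RwLock<HashMap<Vec<String>, Arc<AtomicU64>>>, key: &[String], value: u64) {
    if let Ok(values) = map.read() {
        if let Some(tracker) = values.get(key) {
            // Fast path: shared read lock, update in place, no allocation.
            tracker.fetch_add(value, Ordering::Relaxed);
            return;
        }
    } // read guard dropped here
    if let Ok(mut values) = map.write() {
        // Recheck under the write lock: another thread may have inserted
        // the key between the two lock acquisitions.
        values
            .entry(key.to_vec()) // allocates, but only on this slow path
            .or_insert_with(|| Arc::new(AtomicU64::new(0)))
            .fetch_add(value, Ordering::Relaxed);
    }
}

fn main() {
    let map = RwLock::new(HashMap::new());
    let key = vec!["k1=a".to_string()];
    add(&map, &key, 1); // slow path: first sighting inserts
    add(&map, &key, 2); // fast path: read lock only
    assert_eq!(map.read().unwrap()[key.as_slice()].load(Ordering::Relaxed), 3);
}
```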

if attrs.is_empty() {
self.no_attribute_value.add(measurement);
self.has_no_value_attribute_value
.store(true, Ordering::Release);
} else if let Ok(values) = self.values.read() {
-            if let Some(value_to_update) = values.get(&attrs) {
+            // Try incoming order first
+            if let Some(value_to_update) = values.get(attrs) {
                 value_to_update.add(measurement);
-                return;
             } else {
-                drop(values);
-                if let Ok(mut values) = self.values.write() {
-                    // Recheck after acquiring write lock, in case another
-                    // thread has added the value.
-                    if let Some(value_to_update) = values.get(&attrs) {
-                        value_to_update.add(measurement);
-                        return;
-                    } else if is_under_cardinality_limit(values.len()) {
-                        let new_value = T::new_atomic_tracker();
-                        new_value.add(measurement);
-                        values.insert(attrs, new_value);
-                    } else if let Some(overflow_value) =
-                        values.get_mut(&STREAM_OVERFLOW_ATTRIBUTE_SET)
-                    {
-                        overflow_value.add(measurement);
-                        return;
-                    } else {
-                        let new_value = T::new_atomic_tracker();
-                        new_value.add(measurement);
-                        values.insert(STREAM_OVERFLOW_ATTRIBUTE_SET.clone(), new_value);
-                        global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged.".into()));
-                    }
-                }
+                // Then try sorted order.
+                let sorted_attrs = AttributeSet::from(attrs).into_vec();
+                if let Some(value_to_update) = values.get(sorted_attrs.as_slice()) {
+                    value_to_update.add(measurement);
+                } else {
+                    // Give up the lock, before acquiring write lock.
+                    drop(values);
+                    if let Ok(mut values) = self.values.write() {
+                        // Recheck both incoming and sorted after acquiring
+                        // write lock, in case another thread has added the
+                        // value.
+                        if let Some(value_to_update) = values.get(attrs) {
+                            value_to_update.add(measurement);
+                        } else if let Some(value_to_update) = values.get(sorted_attrs.as_slice()) {
+                            value_to_update.add(measurement);
+                        } else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) {
+                            let new_value = T::new_atomic_tracker();
+                            new_value.add(measurement);
+                            let new_value = Arc::new(new_value);
+
+                            // Insert original order
+                            values.insert(attrs.to_vec(), new_value.clone());
+
+                            // Insert sorted order
+                            values.insert(sorted_attrs, new_value);
+
+                            self.count.fetch_add(1, Ordering::SeqCst);
+                        } else if let Some(overflow_value) =
+                            values.get(STREAM_OVERFLOW_ATTRIBUTES.as_slice())
+                        {
+                            overflow_value.add(measurement);
+                        } else {
+                            let new_value = T::new_atomic_tracker();
+                            new_value.add(measurement);
+                            values.insert(STREAM_OVERFLOW_ATTRIBUTES.clone(), Arc::new(new_value));
+                            global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged.".into()));
+                        }
+                    }
+                }
}
}
}
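
From the user's side, the overflow branch above behaves like this sketch: once a stream sees more distinct attribute sets than the SDK's internal cardinality limit (commonly 2000; an SDK internal, not configurable here), further new sets are folded into a single series tagged `otel.metric.overflow="true"`. Builder names again follow the 0.24-era API, and a metric reader would be needed to actually observe the exported points:

```rust
use opentelemetry::{metrics::MeterProvider as _, KeyValue};
use opentelemetry_sdk::metrics::SdkMeterProvider;

fn main() {
    let provider = SdkMeterProvider::default();
    let meter = provider.meter("overflow-demo");
    let counter = meter.u64_counter("my_counter").init();

    // Every iteration uses a distinct attribute set, i.e. a distinct
    // timeseries. Once the stream cardinality limit is crossed, new sets
    // are all routed to the single overflow series instead.
    for i in 0..10_000 {
        counter.add(1, &[KeyValue::new("id", i.to_string())]);
    }

    // On collect, the exported data points would include one series with
    // otel.metric.overflow="true" aggregating the excess.
}
```
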
@@ -100,7 +121,6 @@
}

pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) {
-        let attrs: AttributeSet = attrs.into();
self.value_map.measure(measurement, attrs)
}

@@ -125,12 +145,9 @@
s_data.is_monotonic = self.monotonic;
s_data.data_points.clear();

-        let mut values = match self.value_map.values.write() {
-            Ok(v) => v,
-            Err(_) => return (0, None),
-        };
-
-        let n = values.len() + 1;
+        // Max number of data points need to account for the special casing
+        // of the no attribute value + overflow attribute.
+        let n = self.value_map.count.load(Ordering::SeqCst) + 2;

Comment from a Contributor:
> I'm not sure if this calculation is important for the correctness of the export. If it is, then we should do it under the self.value_map.values.write() lock. Otherwise, we read a certain value here, and before we reserve capacity for data points, some update thread(s) could enter the write lock and increment the count further.

Comment from @cijothomas (Member, Author):
> It does not look important for correctness. Even if it is off by a few, it just results in the vec resizing in the middle of collect. We can revisit this.
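
The thread above hinges on `n` being only a capacity hint. A small sketch of why a stale count is benign for correctness (a racing thread may grow the map after the load):

```rust
fn main() {
    // Pretend the count read said 4 live series, but racing updates made it
    // 6 before the points are pushed.
    let stale_count = 4;
    let mut data_points: Vec<u64> = Vec::new();

    // Same shape as the SDK code: hint = count + no-attribute + overflow.
    let n = stale_count + 2;
    if n > data_points.capacity() {
        data_points.reserve_exact(n - data_points.capacity());
    }

    // Vec::push grows the buffer on demand regardless, so an undersized
    // hint only costs a reallocation mid-collect, never lost data.
    for v in 0..8u64 {
        data_points.push(v);
    }
    assert_eq!(data_points.len(), 8);
}
```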

if n > s_data.data_points.capacity() {
s_data
.data_points
@@ -152,23 +169,29 @@
});
}

+        let mut values = match self.value_map.values.write() {
+            Ok(v) => v,
+            Err(_) => return (0, None),
+        };

+        let mut seen = HashSet::new();
for (attrs, value) in values.drain() {
-            s_data.data_points.push(DataPoint {
-                attributes: attrs
-                    .iter()
-                    .map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
-                    .collect(),
-                start_time: Some(prev_start),
-                time: Some(t),
-                value: value.get_value(),
-                exemplars: vec![],
-            });
+            if seen.insert(Arc::as_ptr(&value)) {

Comment from @cijothomas (Member, Author), Aug 6, 2024:
> Dedup of sorted+original attributes! @utpilla I leveraged the Arc pointer to de-dup. Storing metric points in a separate array was tried as well, but this looks easier to me.
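
The pointer-based dedup in miniature (plain `AtomicU64` standing in for the tracker):

```rust
use std::collections::{HashMap, HashSet};
use std::sync::atomic::AtomicU64;
use std::sync::Arc;

fn main() {
    // Two map entries (incoming + sorted key order) share one Arc.
    let mut values: HashMap<Vec<&str>, Arc<AtomicU64>> = HashMap::new();
    let tracker = Arc::new(AtomicU64::new(5));
    values.insert(vec!["k2=b", "k1=a"], tracker.clone());
    values.insert(vec!["k1=a", "k2=b"], tracker);

    // Deduping on the Arc's pointer exports each timeseries exactly once.
    let mut seen = HashSet::new();
    let mut exported = 0;
    for (_attrs, value) in values.drain() {
        if seen.insert(Arc::as_ptr(&value)) {
            exported += 1; // the real code pushes a DataPoint here
        }
    }
    assert_eq!(exported, 1);
}
```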

+                s_data.data_points.push(DataPoint {
+                    attributes: attrs.clone(),
+                    start_time: Some(prev_start),
+                    time: Some(t),
+                    value: value.get_value(),
+                    exemplars: vec![],
+                });
+            }
}

// The delta collection cycle resets.
if let Ok(mut start) = self.start.lock() {
*start = t;
}
+        self.value_map.count.store(0, Ordering::SeqCst);

(
s_data.data_points.len(),
@@ -197,12 +220,9 @@
s_data.is_monotonic = self.monotonic;
s_data.data_points.clear();

-        let values = match self.value_map.values.write() {
-            Ok(v) => v,
-            Err(_) => return (0, None),
-        };
-
-        let n = values.len() + 1;
+        // Max number of data points need to account for the special casing
+        // of the no attribute value + overflow attribute.
+        let n = self.value_map.count.load(Ordering::SeqCst) + 2;
if n > s_data.data_points.capacity() {
s_data
.data_points
@@ -225,16 +245,18 @@
});
}

+        let values = match self.value_map.values.write() {
+            Ok(v) => v,
+            Err(_) => return (0, None),
+        };

// TODO: This will use an unbounded amount of memory if there
// are unbounded number of attribute sets being aggregated. Attribute
// sets that become "stale" need to be forgotten so this will not
// overload the system.
for (attrs, value) in values.iter() {
s_data.data_points.push(DataPoint {
-                attributes: attrs
-                    .iter()
-                    .map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
-                    .collect(),
+                attributes: attrs.clone(),
start_time: Some(prev_start),
time: Some(t),
value: value.get_value(),
@@ -254,7 +276,7 @@
value_map: ValueMap<T>,
monotonic: bool,
start: Mutex<SystemTime>,
-    reported: Mutex<HashMap<AttributeSet, T>>,
+    reported: Mutex<HashMap<Vec<KeyValue>, T>>,
}

impl<T: Number<T>> PrecomputedSum<T> {
@@ -268,7 +290,6 @@
}

pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) {
-        let attrs: AttributeSet = attrs.into();
self.value_map.measure(measurement, attrs)
}

Expand All @@ -294,12 +315,9 @@
s_data.temporality = Temporality::Delta;
s_data.is_monotonic = self.monotonic;

-        let mut values = match self.value_map.values.write() {
-            Ok(v) => v,
-            Err(_) => return (0, None),
-        };
-
-        let n = values.len() + 1;
+        // Max number of data points need to account for the special casing
+        // of the no attribute value + overflow attribute.
+        let n = self.value_map.count.load(Ordering::SeqCst) + 2;
if n > s_data.data_points.capacity() {
s_data
.data_points
@@ -325,17 +343,19 @@
});
}

+        let mut values = match self.value_map.values.write() {
+            Ok(v) => v,
+            Err(_) => return (0, None),
+        };

let default = T::default();
for (attrs, value) in values.drain() {
let delta = value.get_value() - *reported.get(&attrs).unwrap_or(&default);
if delta != default {
new_reported.insert(attrs.clone(), value.get_value());
}
s_data.data_points.push(DataPoint {
-                attributes: attrs
-                    .iter()
-                    .map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
-                    .collect(),
+                attributes: attrs.clone(),
start_time: Some(prev_start),
time: Some(t),
value: delta,
@@ -347,6 +367,7 @@
if let Ok(mut start) = self.start.lock() {
*start = t;
}
+        self.value_map.count.store(0, Ordering::SeqCst);

*reported = new_reported;
drop(reported); // drop before values guard is dropped
@@ -379,12 +400,9 @@
s_data.temporality = Temporality::Cumulative;
s_data.is_monotonic = self.monotonic;

-        let values = match self.value_map.values.write() {
-            Ok(v) => v,
-            Err(_) => return (0, None),
-        };
-
-        let n = values.len() + 1;
+        // Max number of data points need to account for the special casing
+        // of the no attribute value + overflow attribute.
+        let n = self.value_map.count.load(Ordering::SeqCst) + 2;
if n > s_data.data_points.capacity() {
s_data
.data_points
@@ -410,17 +428,18 @@
});
}

+        let values = match self.value_map.values.write() {
+            Ok(v) => v,
+            Err(_) => return (0, None),
+        };
let default = T::default();
for (attrs, value) in values.iter() {
let delta = value.get_value() - *reported.get(attrs).unwrap_or(&default);
if delta != default {
new_reported.insert(attrs.clone(), value.get_value());
}
s_data.data_points.push(DataPoint {
-                attributes: attrs
-                    .iter()
-                    .map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
-                    .collect(),
+                attributes: attrs.clone(),
start_time: Some(prev_start),
time: Some(t),
value: delta,