Skip to content

Commit 89d270d

Browse files
fraillt (Mindaugas Vinkelis)
authored and
Mindaugas Vinkelis
committed
Fix issue where duplicate attributes are not always removed
1 parent 38f7fd5 commit 89d270d

File tree

9 files changed

+448
-506
lines changed

9 files changed

+448
-506
lines changed

Diff for: opentelemetry-otlp/tests/integration_test/expected/serialized_traces.json

+6
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,12 @@
112112
"value": {
113113
"intValue": "100"
114114
}
115+
},
116+
{
117+
"key": "number/int",
118+
"value": {
119+
"intValue": "100"
120+
}
115121
}
116122
],
117123
"droppedAttributesCount": 0

Diff for: opentelemetry-sdk/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ futures-util = { workspace = true, features = ["std", "sink", "async-await-macro
2020
once_cell = { workspace = true }
2121
percent-encoding = { version = "2.0", optional = true }
2222
rand = { workspace = true, features = ["std", "std_rng","small_rng"], optional = true }
23+
rustc-hash = "2.0"
2324
glob = { version = "0.3.1", optional =true}
2425
serde = { workspace = true, features = ["derive", "rc"], optional = true }
2526
serde_json = { workspace = true, optional = true }

Diff for: opentelemetry-sdk/src/metrics/attribute_set.rs

+55
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
use std::hash::{Hash, Hasher};
2+
3+
use opentelemetry::{Key, KeyValue, Value};
4+
use rustc_hash::FxHasher;
5+
6+
/// A unique set of attributes that can be used as instrument identifiers.
7+
///
8+
/// This must implement [Hash], [PartialEq], and [Eq] so it may be used as
9+
/// HashMap keys and other de-duplication methods.
10+
#[derive(Clone, Default, Debug, PartialEq, Eq)]
11+
pub(crate) struct AttributeSet(pub(crate) Vec<KeyValue>, u64);
12+
13+
impl From<&[KeyValue]> for AttributeSet {
14+
fn from(values: &[KeyValue]) -> Self {
15+
let mut vec = Vec::from_iter(values.into_iter().cloned());
16+
vec.sort_by(|a, b| a.key.cmp(&b.key));
17+
18+
// we cannot use vec.dedup_by because it will remove last duplicate not first
19+
if vec.len() > 1 {
20+
let mut i = vec.len() - 1;
21+
while i != 0 {
22+
let is_same = unsafe { vec.get_unchecked(i - 1).key == vec.get_unchecked(i).key };
23+
if is_same {
24+
vec.remove(i - 1);
25+
}
26+
i -= 1;
27+
}
28+
}
29+
30+
let hash = calculate_hash(&vec);
31+
AttributeSet(vec, hash)
32+
}
33+
}
34+
35+
fn calculate_hash(values: &[KeyValue]) -> u64 {
36+
let mut hasher = FxHasher::default();
37+
values.iter().fold(&mut hasher, |mut hasher, item| {
38+
item.hash(&mut hasher);
39+
hasher
40+
});
41+
hasher.finish()
42+
}
43+
44+
impl AttributeSet {
45+
/// Iterate over key value pairs in the set
46+
pub(crate) fn iter(&self) -> impl Iterator<Item = (&Key, &Value)> {
47+
self.0.iter().map(|kv| (&kv.key, &kv.value))
48+
}
49+
}
50+
51+
impl Hash for AttributeSet {
52+
fn hash<H: Hasher>(&self, state: &mut H) {
53+
state.write_u64(self.1)
54+
}
55+
}

Diff for: opentelemetry-sdk/src/metrics/internal/histogram.rs

+49-128
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
1-
use std::collections::HashSet;
2-
use std::sync::atomic::Ordering;
3-
use std::sync::Arc;
1+
use std::mem::take;
42
use std::{sync::Mutex, time::SystemTime};
53

64
use crate::metrics::data::HistogramDataPoint;
75
use crate::metrics::data::{self, Aggregation, Temporality};
86
use opentelemetry::KeyValue;
97

10-
use super::Number;
8+
use super::{collect_data_points_readonly, collect_data_points_reset, Number};
119
use super::{AtomicTracker, AtomicallyUpdate, Operation, ValueMap};
1210

1311
struct HistogramUpdate;
@@ -45,7 +43,6 @@ impl<T: Number<T>> AtomicallyUpdate<T> for HistogramTracker<T> {
4543
}
4644
}
4745

48-
#[derive(Default)]
4946
struct Buckets<T> {
5047
counts: Vec<u64>,
5148
count: u64,
@@ -61,7 +58,8 @@ impl<T: Number<T>> Buckets<T> {
6158
counts: vec![0; n],
6259
min: T::max(),
6360
max: T::min(),
64-
..Default::default()
61+
count: 0,
62+
total: T::default(),
6563
}
6664
}
6765

@@ -80,14 +78,17 @@ impl<T: Number<T>> Buckets<T> {
8078
}
8179
}
8280

83-
fn reset(&mut self) {
84-
for item in &mut self.counts {
85-
*item = 0;
86-
}
87-
self.count = Default::default();
88-
self.total = Default::default();
89-
self.min = T::max();
90-
self.max = T::min();
81+
fn clone_and_reset(&mut self) -> Self {
82+
let n = self.counts.len();
83+
let res = Buckets {
84+
counts: take(&mut self.counts),
85+
count: self.count,
86+
total: self.total,
87+
min: self.min,
88+
max: self.max,
89+
};
90+
*self = Buckets::new(n);
91+
res
9192
}
9293
}
9394

@@ -155,26 +156,27 @@ impl<T: Number<T>> Histogram<T> {
155156
h.temporality = Temporality::Delta;
156157
h.data_points.clear();
157158

158-
// Max number of data points need to account for the special casing
159-
// of the no attribute value + overflow attribute.
160-
let n = self.value_map.count.load(Ordering::SeqCst) + 2;
161-
if n > h.data_points.capacity() {
162-
h.data_points.reserve_exact(n - h.data_points.capacity());
163-
}
159+
let Ok(mut trackers) = self.value_map.trackers.write() else {
160+
return (0, None);
161+
};
164162

165-
if self
166-
.value_map
167-
.has_no_attribute_value
168-
.swap(false, Ordering::AcqRel)
169-
{
170-
if let Ok(ref mut b) = self.value_map.no_attribute_tracker.buckets.lock() {
171-
h.data_points.push(HistogramDataPoint {
172-
attributes: vec![],
163+
collect_data_points_reset(
164+
&self.value_map.no_attribs_tracker,
165+
&mut trackers,
166+
&mut h.data_points,
167+
|attributes, tracker| {
168+
let b = tracker
169+
.buckets
170+
.lock()
171+
.unwrap_or_else(|err| err.into_inner())
172+
.clone_and_reset();
173+
HistogramDataPoint {
174+
attributes,
173175
start_time: start,
174176
time: t,
175177
count: b.count,
176178
bounds: self.bounds.clone(),
177-
bucket_counts: b.counts.clone(),
179+
bucket_counts: b.counts,
178180
sum: if self.record_sum {
179181
b.total
180182
} else {
@@ -191,54 +193,14 @@ impl<T: Number<T>> Histogram<T> {
191193
None
192194
},
193195
exemplars: vec![],
194-
});
195-
196-
b.reset();
197-
}
198-
}
199-
200-
let mut trackers = match self.value_map.trackers.write() {
201-
Ok(v) => v,
202-
Err(_) => return (0, None),
203-
};
204-
205-
let mut seen = HashSet::new();
206-
for (attrs, tracker) in trackers.drain() {
207-
if seen.insert(Arc::as_ptr(&tracker)) {
208-
if let Ok(b) = tracker.buckets.lock() {
209-
h.data_points.push(HistogramDataPoint {
210-
attributes: attrs.clone(),
211-
start_time: start,
212-
time: t,
213-
count: b.count,
214-
bounds: self.bounds.clone(),
215-
bucket_counts: b.counts.clone(),
216-
sum: if self.record_sum {
217-
b.total
218-
} else {
219-
T::default()
220-
},
221-
min: if self.record_min_max {
222-
Some(b.min)
223-
} else {
224-
None
225-
},
226-
max: if self.record_min_max {
227-
Some(b.max)
228-
} else {
229-
None
230-
},
231-
exemplars: vec![],
232-
});
233196
}
234-
}
235-
}
197+
},
198+
);
236199

237200
// The delta collection cycle resets.
238201
if let Ok(mut start) = self.start.lock() {
239202
*start = t;
240203
}
241-
self.value_map.count.store(0, Ordering::SeqCst);
242204

243205
(h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>))
244206
}
@@ -266,21 +228,21 @@ impl<T: Number<T>> Histogram<T> {
266228
h.temporality = Temporality::Cumulative;
267229
h.data_points.clear();
268230

269-
// Max number of data points need to account for the special casing
270-
// of the no attribute value + overflow attribute.
271-
let n = self.value_map.count.load(Ordering::SeqCst) + 2;
272-
if n > h.data_points.capacity() {
273-
h.data_points.reserve_exact(n - h.data_points.capacity());
274-
}
231+
let Ok(trackers) = self.value_map.trackers.read() else {
232+
return (0, None);
233+
};
275234

276-
if self
277-
.value_map
278-
.has_no_attribute_value
279-
.load(Ordering::Acquire)
280-
{
281-
if let Ok(b) = &self.value_map.no_attribute_tracker.buckets.lock() {
282-
h.data_points.push(HistogramDataPoint {
283-
attributes: vec![],
235+
collect_data_points_readonly(
236+
&self.value_map.no_attribs_tracker,
237+
&trackers,
238+
&mut h.data_points,
239+
|attributes, tracker| {
240+
let b = tracker
241+
.buckets
242+
.lock()
243+
.unwrap_or_else(|err| err.into_inner());
244+
HistogramDataPoint {
245+
attributes,
284246
start_time: start,
285247
time: t,
286248
count: b.count,
@@ -302,50 +264,9 @@ impl<T: Number<T>> Histogram<T> {
302264
None
303265
},
304266
exemplars: vec![],
305-
});
306-
}
307-
}
308-
309-
let trackers = match self.value_map.trackers.write() {
310-
Ok(v) => v,
311-
Err(_) => return (0, None),
312-
};
313-
314-
// TODO: This will use an unbounded amount of memory if there
315-
// are unbounded number of attribute sets being aggregated. Attribute
316-
// sets that become "stale" need to be forgotten so this will not
317-
// overload the system.
318-
let mut seen = HashSet::new();
319-
for (attrs, tracker) in trackers.iter() {
320-
if seen.insert(Arc::as_ptr(tracker)) {
321-
if let Ok(b) = tracker.buckets.lock() {
322-
h.data_points.push(HistogramDataPoint {
323-
attributes: attrs.clone(),
324-
start_time: start,
325-
time: t,
326-
count: b.count,
327-
bounds: self.bounds.clone(),
328-
bucket_counts: b.counts.clone(),
329-
sum: if self.record_sum {
330-
b.total
331-
} else {
332-
T::default()
333-
},
334-
min: if self.record_min_max {
335-
Some(b.min)
336-
} else {
337-
None
338-
},
339-
max: if self.record_min_max {
340-
Some(b.max)
341-
} else {
342-
None
343-
},
344-
exemplars: vec![],
345-
});
346267
}
347-
}
348-
}
268+
},
269+
);
349270

350271
(h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>))
351272
}

0 commit comments

Comments
 (0)