Skip to content

Commit f2e9df2

Browse files
utpillacijothomas
andauthored
Fix metrics aggregation bug for Sum and Precomputed Sum to avoid duplicate export (#2018)
Co-authored-by: Cijo Thomas <[email protected]>
1 parent 07b918d commit f2e9df2

File tree

2 files changed

+169
-149
lines changed

2 files changed

+169
-149
lines changed

opentelemetry-sdk/src/metrics/internal/sum.rs

Lines changed: 34 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -165,14 +165,17 @@ impl<T: Number<T>> Sum<T> {
165165
// are unbounded number of attribute sets being aggregated. Attribute
166166
// sets that become "stale" need to be forgotten so this will not
167167
// overload the system.
168+
let mut seen = HashSet::new();
168169
for (attrs, tracker) in trackers.iter() {
169-
s_data.data_points.push(DataPoint {
170-
attributes: attrs.clone(),
171-
start_time: Some(prev_start),
172-
time: Some(t),
173-
value: tracker.get_value(),
174-
exemplars: vec![],
175-
});
170+
if seen.insert(Arc::as_ptr(tracker)) {
171+
s_data.data_points.push(DataPoint {
172+
attributes: attrs.clone(),
173+
start_time: Some(prev_start),
174+
time: Some(t),
175+
value: tracker.get_value(),
176+
exemplars: vec![],
177+
});
178+
}
176179
}
177180

178181
(
@@ -263,17 +266,20 @@ impl<T: Number<T>> PrecomputedSum<T> {
263266
Err(_) => return (0, None),
264267
};
265268

269+
let mut seen = HashSet::new();
266270
for (attrs, tracker) in trackers.drain() {
267-
let value = tracker.get_value();
268-
let delta = value - *reported.get(&attrs).unwrap_or(&T::default());
269-
new_reported.insert(attrs.clone(), value);
270-
s_data.data_points.push(DataPoint {
271-
attributes: attrs.clone(),
272-
start_time: Some(prev_start),
273-
time: Some(t),
274-
value: delta,
275-
exemplars: vec![],
276-
});
271+
if seen.insert(Arc::as_ptr(&tracker)) {
272+
let value = tracker.get_value();
273+
let delta = value - *reported.get(&attrs).unwrap_or(&T::default());
274+
new_reported.insert(attrs.clone(), value);
275+
s_data.data_points.push(DataPoint {
276+
attributes: attrs.clone(),
277+
start_time: Some(prev_start),
278+
time: Some(t),
279+
value: delta,
280+
exemplars: vec![],
281+
});
282+
}
277283
}
278284

279285
// The delta collection cycle resets.
@@ -340,14 +346,18 @@ impl<T: Number<T>> PrecomputedSum<T> {
340346
Ok(v) => v,
341347
Err(_) => return (0, None),
342348
};
349+
350+
let mut seen = HashSet::new();
343351
for (attrs, tracker) in trackers.iter() {
344-
s_data.data_points.push(DataPoint {
345-
attributes: attrs.clone(),
346-
start_time: Some(prev_start),
347-
time: Some(t),
348-
value: tracker.get_value(),
349-
exemplars: vec![],
350-
});
352+
if seen.insert(Arc::as_ptr(tracker)) {
353+
s_data.data_points.push(DataPoint {
354+
attributes: attrs.clone(),
355+
start_time: Some(prev_start),
356+
time: Some(t),
357+
value: tracker.get_value(),
358+
exemplars: vec![],
359+
});
360+
}
351361
}
352362

353363
(

opentelemetry-sdk/src/metrics/mod.rs

Lines changed: 135 additions & 125 deletions
Original file line numberDiff line numberDiff line change
@@ -835,145 +835,155 @@ mod tests {
835835
// Run this test with stdout enabled to see output.
836836
// cargo test counter_aggregation_attribute_order_sorted_first --features=testing -- --nocapture
837837

838-
// Arrange
839-
let mut test_context = TestContext::new(Temporality::Delta);
840-
let counter = test_context.u64_counter("test", "my_counter", None);
841-
842-
// Act
843-
// Add the same set of attributes in different order. (they are expected
844-
// to be treated as same attributes)
845-
// start with sorted order
846-
counter.add(
847-
1,
848-
&[
849-
KeyValue::new("A", "a"),
850-
KeyValue::new("B", "b"),
851-
KeyValue::new("C", "c"),
852-
],
853-
);
854-
counter.add(
855-
1,
856-
&[
857-
KeyValue::new("A", "a"),
858-
KeyValue::new("C", "c"),
859-
KeyValue::new("B", "b"),
860-
],
861-
);
862-
counter.add(
863-
1,
864-
&[
865-
KeyValue::new("B", "b"),
866-
KeyValue::new("A", "a"),
867-
KeyValue::new("C", "c"),
868-
],
869-
);
870-
counter.add(
871-
1,
872-
&[
873-
KeyValue::new("B", "b"),
874-
KeyValue::new("C", "c"),
875-
KeyValue::new("A", "a"),
876-
],
877-
);
878-
counter.add(
879-
1,
880-
&[
881-
KeyValue::new("C", "c"),
882-
KeyValue::new("B", "b"),
883-
KeyValue::new("A", "a"),
884-
],
885-
);
886-
counter.add(
887-
1,
888-
&[
889-
KeyValue::new("C", "c"),
890-
KeyValue::new("A", "a"),
891-
KeyValue::new("B", "b"),
892-
],
893-
);
894-
test_context.flush_metrics();
838+
counter_aggregation_attribute_order_sorted_first_helper(Temporality::Delta);
839+
counter_aggregation_attribute_order_sorted_first_helper(Temporality::Cumulative);
840+
841+
fn counter_aggregation_attribute_order_sorted_first_helper(temporality: Temporality) {
842+
// Arrange
843+
let mut test_context = TestContext::new(temporality);
844+
let counter = test_context.u64_counter("test", "my_counter", None);
845+
846+
// Act
847+
// Add the same set of attributes in different order. (they are expected
848+
// to be treated as same attributes)
849+
// start with sorted order
850+
counter.add(
851+
1,
852+
&[
853+
KeyValue::new("A", "a"),
854+
KeyValue::new("B", "b"),
855+
KeyValue::new("C", "c"),
856+
],
857+
);
858+
counter.add(
859+
1,
860+
&[
861+
KeyValue::new("A", "a"),
862+
KeyValue::new("C", "c"),
863+
KeyValue::new("B", "b"),
864+
],
865+
);
866+
counter.add(
867+
1,
868+
&[
869+
KeyValue::new("B", "b"),
870+
KeyValue::new("A", "a"),
871+
KeyValue::new("C", "c"),
872+
],
873+
);
874+
counter.add(
875+
1,
876+
&[
877+
KeyValue::new("B", "b"),
878+
KeyValue::new("C", "c"),
879+
KeyValue::new("A", "a"),
880+
],
881+
);
882+
counter.add(
883+
1,
884+
&[
885+
KeyValue::new("C", "c"),
886+
KeyValue::new("B", "b"),
887+
KeyValue::new("A", "a"),
888+
],
889+
);
890+
counter.add(
891+
1,
892+
&[
893+
KeyValue::new("C", "c"),
894+
KeyValue::new("A", "a"),
895+
KeyValue::new("B", "b"),
896+
],
897+
);
898+
test_context.flush_metrics();
895899

896-
let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
900+
let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
897901

898-
// Expecting 1 time-series.
899-
assert_eq!(sum.data_points.len(), 1);
902+
// Expecting 1 time-series.
903+
assert_eq!(sum.data_points.len(), 1);
900904

901-
// validate the sole datapoint
902-
let data_point1 = &sum.data_points[0];
903-
assert_eq!(data_point1.value, 6);
905+
// validate the sole datapoint
906+
let data_point1 = &sum.data_points[0];
907+
assert_eq!(data_point1.value, 6);
908+
}
904909
}
905910

906911
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
907912
async fn counter_aggregation_attribute_order_unsorted_first() {
908913
// Run this test with stdout enabled to see output.
909914
// cargo test counter_aggregation_attribute_order_unsorted_first --features=testing -- --nocapture
910915

911-
// Arrange
912-
let mut test_context = TestContext::new(Temporality::Delta);
913-
let counter = test_context.u64_counter("test", "my_counter", None);
916+
counter_aggregation_attribute_order_unsorted_first_helper(Temporality::Delta);
917+
counter_aggregation_attribute_order_unsorted_first_helper(Temporality::Cumulative);
914918

915-
// Act
916-
// Add the same set of attributes in different order. (they are expected
917-
// to be treated as same attributes)
918-
// start with unsorted order
919-
counter.add(
920-
1,
921-
&[
922-
KeyValue::new("A", "a"),
923-
KeyValue::new("C", "c"),
924-
KeyValue::new("B", "b"),
925-
],
926-
);
927-
counter.add(
928-
1,
929-
&[
930-
KeyValue::new("A", "a"),
931-
KeyValue::new("B", "b"),
932-
KeyValue::new("C", "c"),
933-
],
934-
);
935-
counter.add(
936-
1,
937-
&[
938-
KeyValue::new("B", "b"),
939-
KeyValue::new("A", "a"),
940-
KeyValue::new("C", "c"),
941-
],
942-
);
943-
counter.add(
944-
1,
945-
&[
946-
KeyValue::new("B", "b"),
947-
KeyValue::new("C", "c"),
948-
KeyValue::new("A", "a"),
949-
],
950-
);
951-
counter.add(
952-
1,
953-
&[
954-
KeyValue::new("C", "c"),
955-
KeyValue::new("B", "b"),
956-
KeyValue::new("A", "a"),
957-
],
958-
);
959-
counter.add(
960-
1,
961-
&[
962-
KeyValue::new("C", "c"),
963-
KeyValue::new("A", "a"),
964-
KeyValue::new("B", "b"),
965-
],
966-
);
967-
test_context.flush_metrics();
919+
fn counter_aggregation_attribute_order_unsorted_first_helper(temporality: Temporality) {
920+
// Arrange
921+
let mut test_context = TestContext::new(temporality);
922+
let counter = test_context.u64_counter("test", "my_counter", None);
968923

969-
let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
924+
// Act
925+
// Add the same set of attributes in different order. (they are expected
926+
// to be treated as same attributes)
927+
// start with unsorted order
928+
counter.add(
929+
1,
930+
&[
931+
KeyValue::new("A", "a"),
932+
KeyValue::new("C", "c"),
933+
KeyValue::new("B", "b"),
934+
],
935+
);
936+
counter.add(
937+
1,
938+
&[
939+
KeyValue::new("A", "a"),
940+
KeyValue::new("B", "b"),
941+
KeyValue::new("C", "c"),
942+
],
943+
);
944+
counter.add(
945+
1,
946+
&[
947+
KeyValue::new("B", "b"),
948+
KeyValue::new("A", "a"),
949+
KeyValue::new("C", "c"),
950+
],
951+
);
952+
counter.add(
953+
1,
954+
&[
955+
KeyValue::new("B", "b"),
956+
KeyValue::new("C", "c"),
957+
KeyValue::new("A", "a"),
958+
],
959+
);
960+
counter.add(
961+
1,
962+
&[
963+
KeyValue::new("C", "c"),
964+
KeyValue::new("B", "b"),
965+
KeyValue::new("A", "a"),
966+
],
967+
);
968+
counter.add(
969+
1,
970+
&[
971+
KeyValue::new("C", "c"),
972+
KeyValue::new("A", "a"),
973+
KeyValue::new("B", "b"),
974+
],
975+
);
976+
test_context.flush_metrics();
970977

971-
// Expecting 1 time-series.
972-
assert_eq!(sum.data_points.len(), 1);
978+
let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
973979

974-
// validate the sole datapoint
975-
let data_point1 = &sum.data_points[0];
976-
assert_eq!(data_point1.value, 6);
980+
// Expecting 1 time-series.
981+
assert_eq!(sum.data_points.len(), 1);
982+
983+
// validate the sole datapoint
984+
let data_point1 = &sum.data_points[0];
985+
assert_eq!(data_point1.value, 6);
986+
}
977987
}
978988

979989
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]

0 commit comments

Comments
 (0)