@@ -159,6 +159,7 @@ mod tests {
     use opentelemetry::{metrics::MeterProvider as _, KeyValue};
     use rand::{rngs, Rng, SeedableRng};
     use std::borrow::Cow;
+    use std::cmp::{max, min};
     use std::sync::atomic::{AtomicBool, Ordering};
     use std::sync::{Arc, Mutex};
     use std::thread;
@@ -1215,6 +1216,14 @@ mod tests {
         counter_f64_multithreaded_aggregation_helper(Temporality::Cumulative);
     }

+    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+    async fn histogram_multithreaded() {
+        // Run this test with stdout enabled to see output.
+        // cargo test histogram_multithreaded --features=testing -- --nocapture
+
+        histogram_multithreaded_aggregation_helper(Temporality::Delta);
+        histogram_multithreaded_aggregation_helper(Temporality::Cumulative);
+    }
     #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
     async fn synchronous_instruments_cumulative_with_gap_in_measurements() {
         // Run this test with stdout enabled to see output.
@@ -1577,6 +1586,143 @@ mod tests {
         assert!(f64::abs(61.5 - sum_key1_value1) < 0.0001); // Each of the 10 update threads records measurements 5 times: 10 * 5 * 1.23 = 61.5
     }

+    fn histogram_multithreaded_aggregation_helper(temporality: Temporality) {
+        // Arrange
+        let mut test_context = TestContext::new(temporality);
+        let histogram = Arc::new(test_context.meter().u64_histogram("test_histogram").init());
+
+        for i in 0..10 {
+            thread::scope(|s| {
+                s.spawn(|| {
+                    histogram.record(1, &[]);
+                    histogram.record(4, &[]);
+
+                    histogram.record(5, &[KeyValue::new("key1", "value1")]);
+                    histogram.record(7, &[KeyValue::new("key1", "value1")]);
+                    histogram.record(18, &[KeyValue::new("key1", "value1")]);
+
+                    // Test concurrent collection by forcing half of the update threads to `force_flush` metrics and sleep for some time.
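+                    // Flushing here, between records, exercises collection while this iteration still has measurements left to record.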
+                    if i % 2 == 0 {
+                        test_context.flush_metrics();
+                        thread::sleep(Duration::from_millis(i)); // Vary the sleep duration per iteration to cover more interleavings.
+                    }
+
+                    histogram.record(35, &[KeyValue::new("key1", "value1")]);
+                    histogram.record(35, &[KeyValue::new("key1", "value1")]);
+                });
+            });
+        }
+
+        test_context.flush_metrics();
+
+        // Assert
+        // We invoke `test_context.flush_metrics()` six times: once in each even loop iteration (i = 0, 2, 4, 6, 8) and once above, after the loop.
+        let histograms = test_context.get_from_multiple_aggregations::<data::Histogram<u64>>(
+            "test_histogram",
+            None,
+            6,
+        );
+
+        let (
+            mut sum_zero_attributes,
+            mut count_zero_attributes,
+            mut min_zero_attributes,
+            mut max_zero_attributes,
+        ) = (0, 0, u64::MAX, u64::MIN);
+        let (mut sum_key1_value1, mut count_key1_value1, mut min_key1_value1, mut max_key1_value1) =
+            (0, 0, u64::MAX, u64::MIN);
+
+        let mut bucket_counts_zero_attributes = vec![0; 16]; // There are 16 buckets for the default configuration
+        let mut bucket_counts_key1_value1 = vec![0; 16];
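+        // (The 15 default bucket boundaries, listed in the comment before the asserts below, produce 16 buckets.)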
+
+        histograms.iter().for_each(|histogram| {
+            assert_eq!(histogram.data_points.len(), 2); // Expecting 2 time-series: zero attributes and key1=value1.
+            assert_eq!(histogram.temporality, temporality);
+
+            let data_point_zero_attributes =
+                find_histogram_datapoint_with_no_attributes(&histogram.data_points).unwrap();
+            let data_point_key1_value1 =
+                find_histogram_datapoint_with_key_value(&histogram.data_points, "key1", "value1")
+                    .unwrap();
+
+            if temporality == Temporality::Delta {
+                sum_zero_attributes += data_point_zero_attributes.sum;
+                sum_key1_value1 += data_point_key1_value1.sum;
+
+                count_zero_attributes += data_point_zero_attributes.count;
+                count_key1_value1 += data_point_key1_value1.count;
+
+                min_zero_attributes =
+                    min(min_zero_attributes, data_point_zero_attributes.min.unwrap());
+                min_key1_value1 = min(min_key1_value1, data_point_key1_value1.min.unwrap());
+
+                max_zero_attributes =
+                    max(max_zero_attributes, data_point_zero_attributes.max.unwrap());
+                max_key1_value1 = max(max_key1_value1, data_point_key1_value1.max.unwrap());
+
+                assert_eq!(data_point_zero_attributes.bucket_counts.len(), 16);
+                assert_eq!(data_point_key1_value1.bucket_counts.len(), 16);
+
+                for (i, _) in data_point_zero_attributes.bucket_counts.iter().enumerate() {
+                    bucket_counts_zero_attributes[i] += data_point_zero_attributes.bucket_counts[i];
+                }
+
+                for (i, _) in data_point_key1_value1.bucket_counts.iter().enumerate() {
+                    bucket_counts_key1_value1[i] += data_point_key1_value1.bucket_counts[i];
+                }
+            } else {
+                sum_zero_attributes = data_point_zero_attributes.sum;
+                sum_key1_value1 = data_point_key1_value1.sum;
+
+                count_zero_attributes = data_point_zero_attributes.count;
+                count_key1_value1 = data_point_key1_value1.count;
+
+                min_zero_attributes = data_point_zero_attributes.min.unwrap();
+                min_key1_value1 = data_point_key1_value1.min.unwrap();
+
+                max_zero_attributes = data_point_zero_attributes.max.unwrap();
+                max_key1_value1 = data_point_key1_value1.max.unwrap();
+
+                assert_eq!(data_point_zero_attributes.bucket_counts.len(), 16);
+                assert_eq!(data_point_key1_value1.bucket_counts.len(), 16);
+
+                bucket_counts_zero_attributes.clone_from(&data_point_zero_attributes.bucket_counts);
+                bucket_counts_key1_value1.clone_from(&data_point_key1_value1.bucket_counts);
+            };
+        });
+
+        // Default buckets:
+        // (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, 25.0], (25.0, 50.0], (50.0, 75.0], (75.0, 100.0], (100.0, 250.0], (250.0, 500.0],
+        // (500.0, 750.0], (750.0, 1000.0], (1000.0, 2500.0], (2500.0, 5000.0], (5000.0, 7500.0], (7500.0, 10000.0], (10000.0, +∞).
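+        // Mapping of recorded values to bucket indices: 1, 4, and 5 → (0, 5] (index 1); 7 → (5, 10] (index 2); 18 → (10, 25] (index 3); 35 → (25, 50] (index 4).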
+
+        assert_eq!(count_zero_attributes, 20); // Each of the 10 update threads records two measurements.
+        assert_eq!(sum_zero_attributes, 50); // Each of the 10 update threads records measurements summing to 5 (1 + 4).
+        assert_eq!(min_zero_attributes, 1);
+        assert_eq!(max_zero_attributes, 4);
+
+        for (i, count) in bucket_counts_zero_attributes.iter().enumerate() {
+            match i {
+                1 => assert_eq!(*count, 20), // For each of the 10 update threads, both the recorded values 1 and 4 fall under the bucket (0, 5].
+                _ => assert_eq!(*count, 0),
+            }
+        }
+
+        assert_eq!(count_key1_value1, 50); // Each of the 10 update threads records 5 measurements.
+        assert_eq!(sum_key1_value1, 1000); // Each of the 10 update threads records measurements summing to 100 (5 + 7 + 18 + 35 + 35).
+        assert_eq!(min_key1_value1, 5);
+        assert_eq!(max_key1_value1, 35);
+
+        for (i, count) in bucket_counts_key1_value1.iter().enumerate() {
+            match i {
+                1 => assert_eq!(*count, 10), // For each of the 10 update threads, the recorded value 5 falls under the bucket (0, 5].
+                2 => assert_eq!(*count, 10), // For each of the 10 update threads, the recorded value 7 falls under the bucket (5, 10].
+                3 => assert_eq!(*count, 10), // For each of the 10 update threads, the recorded value 18 falls under the bucket (10, 25].
+                4 => assert_eq!(*count, 20), // For each of the 10 update threads, the recorded value 35 (recorded twice) falls under the bucket (25, 50].
+                _ => assert_eq!(*count, 0),
+            }
+        }
+    }
+
     fn histogram_aggregation_helper(temporality: Temporality) {
         // Arrange
         let mut test_context = TestContext::new(temporality);