Skip to content

Commit 246dff2

Browse files
committed
Collect and export with zero allocations and clones
1 parent e2280f7 commit 246dff2

File tree

6 files changed

+141
-119
lines changed

6 files changed

+141
-119
lines changed

opentelemetry-sdk/src/metrics/exporter.rs

+45-26
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,15 @@
33
use opentelemetry::InstrumentationScope;
44

55
use crate::{error::OTelSdkResult, Resource};
6-
use std::{fmt::Debug, slice::Iter, time::Duration};
6+
use std::{
7+
fmt::Debug,
8+
slice::Iter,
9+
time::{Duration, SystemTime},
10+
};
711

812
use super::{
9-
data::AggregatedMetrics,
10-
reader::{MetricsData, ResourceMetricsData, ScopeMetricsData},
13+
data::{AggregatedMetrics, Sum},
14+
pipeline::InstrumentSync,
1115
InstrumentInfo, Temporality,
1216
};
1317

@@ -23,7 +27,7 @@ pub struct ResourceMetrics<'a> {
2327
/// Iterator over libraries instrumentation scopes ([`InstrumentationScope`]) together with metrics.
2428
/// Doesn't implement standard [`Iterator`], because it returns borrowed items. AKA "LendingIterator".
2529
pub struct ScopeMetricsLendingIter<'a> {
26-
iter: Iter<'a, ScopeMetricsData>,
30+
iter: std::collections::hash_map::Iter<'a, InstrumentationScope, Vec<InstrumentSync>>,
2731
}
2832

2933
/// A collection of metrics produced by a [`InstrumentationScope`] meter.
@@ -38,7 +42,9 @@ pub struct ScopeMetrics<'a> {
3842
/// Iterator over aggregations created by the meter.
3943
/// Doesn't implement standard [`Iterator`], because it returns borrowed items. AKA "LendingIterator".
4044
pub struct MetricsLendingIter<'a> {
41-
iter: Iter<'a, MetricsData>,
45+
// for optimization purposes
46+
aggr: AggregatedMetrics,
47+
iter: Iter<'a, InstrumentSync>,
4248
}
4349

4450
/// A collection of one or more aggregated time series from an [Instrument].
@@ -53,23 +59,13 @@ pub struct Metric<'a> {
5359
}
5460

5561
impl<'a> ResourceMetrics<'a> {
56-
pub(crate) fn new(rm: &'a ResourceMetricsData) -> Self {
57-
Self {
58-
resource: &rm.resource,
59-
scope_metrics: ScopeMetricsLendingIter {
60-
iter: rm.scope_metrics.iter(),
61-
},
62-
}
63-
}
64-
}
65-
66-
impl<'a> ScopeMetrics<'a> {
67-
fn new(sm: &'a ScopeMetricsData) -> Self {
62+
pub(crate) fn new(
63+
resource: &'a Resource,
64+
iter: std::collections::hash_map::Iter<'a, InstrumentationScope, Vec<InstrumentSync>>,
65+
) -> Self {
6866
Self {
69-
scope: &sm.scope,
70-
metrics: MetricsLendingIter {
71-
iter: sm.metrics.iter(),
72-
},
67+
resource,
68+
scope_metrics: ScopeMetricsLendingIter { iter },
7369
}
7470
}
7571
}
@@ -83,17 +79,40 @@ impl Debug for ScopeMetricsLendingIter<'_> {
8379
impl ScopeMetricsLendingIter<'_> {
8480
/// Advances the iterator and returns the next value.
8581
pub fn next(&mut self) -> Option<ScopeMetrics<'_>> {
86-
self.iter.next().map(ScopeMetrics::new)
82+
self.iter.next().map(|(scope, instruments)| ScopeMetrics {
83+
scope,
84+
metrics: MetricsLendingIter {
85+
// doesn't matter what we initialize this with,
86+
// it's purpose is to be reused between collections
87+
aggr: AggregatedMetrics::F64(super::data::MetricData::Sum(Sum {
88+
is_monotonic: true,
89+
data_points: Vec::new(),
90+
start_time: SystemTime::now(),
91+
time: SystemTime::now(),
92+
temporality: Temporality::Cumulative,
93+
})),
94+
iter: instruments.iter(),
95+
},
96+
})
8797
}
8898
}
8999

90100
impl MetricsLendingIter<'_> {
91101
/// Advances the iterator and returns the next value.
92102
pub fn next(&mut self) -> Option<Metric<'_>> {
93-
self.iter.next().map(|metric| Metric {
94-
instrument: &metric.instrument,
95-
data: &metric.data,
96-
})
103+
loop {
104+
let inst = self.iter.next()?;
105+
let (len, data) = inst.comp_agg.call(Some(&mut self.aggr));
106+
if len > 0 {
107+
if let Some(new_aggr) = data {
108+
self.aggr = new_aggr;
109+
}
110+
return Some(Metric {
111+
instrument: &inst.info,
112+
data: &self.aggr,
113+
});
114+
}
115+
}
97116
}
98117
}
99118

opentelemetry-sdk/src/metrics/in_memory_exporter.rs

+15-9
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ impl InMemoryMetricExporter {
194194
.map(|mut metrics_guard| metrics_guard.clear());
195195
}
196196

197-
fn clone_metrics(mut metric: ResourceMetrics<'_>) -> ResourceMetricsData {
197+
fn clone_metrics(mut metric: ResourceMetrics<'_>) -> Option<ResourceMetricsData> {
198198
let mut scope_metrics = Vec::new();
199199
while let Some(mut scope_metric) = metric.scope_metrics.next() {
200200
let mut metrics = Vec::new();
@@ -204,14 +204,20 @@ impl InMemoryMetricExporter {
204204
data: Self::clone_data(&metric.data),
205205
});
206206
}
207-
scope_metrics.push(ScopeMetricsData {
208-
scope: scope_metric.scope.clone(),
209-
metrics,
210-
});
207+
if !metrics.is_empty() {
208+
scope_metrics.push(ScopeMetricsData {
209+
scope: scope_metric.scope.clone(),
210+
metrics,
211+
});
212+
}
211213
}
212-
ResourceMetricsData {
213-
resource: metric.resource.clone(),
214-
scope_metrics,
214+
if !scope_metrics.is_empty() {
215+
Some(ResourceMetricsData {
216+
resource: metric.resource.clone(),
217+
scope_metrics,
218+
})
219+
} else {
220+
None
215221
}
216222
}
217223

@@ -261,7 +267,7 @@ impl PushMetricExporter for InMemoryMetricExporter {
261267
self.metrics
262268
.lock()
263269
.map(|mut metrics_guard| {
264-
metrics_guard.push_back(InMemoryMetricExporter::clone_metrics(metrics))
270+
metrics_guard.extend(InMemoryMetricExporter::clone_metrics(metrics))
265271
})
266272
.map_err(|_| OTelSdkError::InternalFailure("Failed to lock metrics".to_string()))
267273
}

opentelemetry-sdk/src/metrics/mod.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,7 @@ pub use periodic_reader::*;
8282
#[cfg(feature = "experimental_metrics_custom_reader")]
8383
pub use pipeline::Pipeline;
8484

85-
#[cfg(feature = "experimental_metrics_custom_reader")]
86-
pub use instrument::InstrumentKind;
85+
pub use instrument::{InstrumentInfo, InstrumentKind};
8786

8887
#[cfg(feature = "spec_unstable_metrics_views")]
8988
pub use instrument::*;

opentelemetry-sdk/src/metrics/periodic_reader.rs

+37-30
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ use crate::{
1616
exporter::{PushMetricExporter, ResourceMetrics},
1717
reader::SdkProducer,
1818
},
19-
Resource,
2019
};
2120

2221
use super::{
@@ -350,11 +349,11 @@ impl<E: PushMetricExporter> fmt::Debug for PeriodicReader<E> {
350349
struct PeriodicReaderInner<E: PushMetricExporter> {
351350
exporter: Arc<E>,
352351
message_sender: mpsc::Sender<Message>,
353-
producer: Mutex<Option<Weak<dyn SdkProducer>>>,
352+
producer: Mutex<Option<Weak<Pipeline>>>,
354353
}
355354

356355
impl<E: PushMetricExporter> PeriodicReaderInner<E> {
357-
fn register_pipeline(&self, producer: Weak<dyn SdkProducer>) {
356+
fn register_pipeline(&self, producer: Weak<Pipeline>) {
358357
let mut inner = self.producer.lock().expect("lock poisoned");
359358
*inner = Some(producer);
360359
}
@@ -384,39 +383,47 @@ impl<E: PushMetricExporter> PeriodicReaderInner<E> {
384383
}
385384

386385
fn collect_and_export(&self) -> OTelSdkResult {
387-
// TODO: Reuse the internal vectors. Or refactor to avoid needing any
388-
// owned data structures to be passed to exporters.
389-
let mut rm = ResourceMetricsData {
390-
resource: Resource::empty(),
391-
scope_metrics: Vec::new(),
386+
let producer = self.producer.lock().expect("lock poisoned");
387+
let pipeline = if let Some(p) = producer.as_ref() {
388+
p.upgrade().ok_or(OTelSdkError::AlreadyShutdown)?
389+
} else {
390+
otel_warn!(
391+
name: "PeriodReader.MeterProviderNotRegistered",
392+
message = "PeriodicReader is not registered with MeterProvider. Metrics will not be collected. \
393+
This occurs when a periodic reader is created but not associated with a MeterProvider \
394+
by calling `.with_reader(reader)` on MeterProviderBuilder."
395+
);
396+
return Err(OTelSdkError::InternalFailure(
397+
"MeterProvider is not registered".into(),
398+
));
392399
};
393-
400+
drop(producer);
401+
let Ok(inner) = pipeline.inner.lock() else {
402+
otel_warn!(
403+
name: "PeriodReader.PipelineLockPoisoned",
404+
message = "Failed to acquire lock for collect and export"
405+
);
406+
return Err(OTelSdkError::InternalFailure(
407+
"Paniced while holding a pipeline lock".into(),
408+
));
409+
};
410+
for cb in &inner.callbacks {
411+
cb();
412+
}
394413
let current_time = Instant::now();
395-
let collect_result = self.collect(&mut rm);
396-
let time_taken_for_collect = current_time.elapsed();
414+
// Relying on futures executor to execute async call.
415+
let res = futures_executor::block_on(self.exporter.export(ResourceMetrics::new(
416+
&pipeline.resource,
417+
inner.aggregations.iter(),
418+
)));
419+
otel_debug!(name: "PeriodicReaderMetricsCollected", time_taken_in_millis = current_time.elapsed().as_millis());
397420

398-
#[allow(clippy::question_mark)]
399-
if let Err(e) = collect_result {
421+
res.inspect_err(|err| {
400422
otel_warn!(
401423
name: "PeriodReaderCollectError",
402-
error = format!("{:?}", e)
424+
error = format!("{:?}", err)
403425
);
404-
return Err(OTelSdkError::InternalFailure(e.to_string()));
405-
}
406-
407-
if rm.scope_metrics.is_empty() {
408-
otel_debug!(name: "NoMetricsCollected");
409-
return Ok(());
410-
}
411-
412-
let metrics_count = rm.scope_metrics.iter().fold(0, |count, scope_metrics| {
413-
count + scope_metrics.metrics.len()
414-
});
415-
otel_debug!(name: "PeriodicReaderMetricsCollected", count = metrics_count, time_taken_in_millis = time_taken_for_collect.as_millis());
416-
417-
// Relying on futures executor to execute async call.
418-
// TODO: Pass timeout to exporter
419-
futures_executor::block_on(self.exporter.export(ResourceMetrics::new(&rm)))
426+
})
420427
}
421428

422429
fn force_flush(&self) -> OTelSdkResult {

opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs

+34-44
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@ use std::{
66

77
use futures_channel::{mpsc, oneshot};
88
use futures_util::{
9-
future::{self, Either},
10-
pin_mut,
119
stream::{self, FusedStream},
1210
StreamExt,
1311
};
@@ -19,7 +17,6 @@ use crate::runtime::{to_interval_stream, Runtime};
1917
use crate::{
2018
error::{OTelSdkError, OTelSdkResult},
2119
metrics::{exporter::PushMetricExporter, reader::SdkProducer},
22-
Resource,
2320
};
2421

2522
use super::{instrument::InstrumentKind, pipeline::Pipeline, reader::MetricReader};
@@ -116,17 +113,7 @@ where
116113
.skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval.
117114
.map(|_| Message::Export);
118115
let messages = Box::pin(stream::select(message_receiver, ticker));
119-
PeriodicReaderWorker {
120-
reader,
121-
timeout: self.timeout,
122-
runtime,
123-
rm: ResourceMetricsData {
124-
resource: Resource::empty(),
125-
scope_metrics: Vec::new(),
126-
},
127-
}
128-
.run(messages)
129-
.await
116+
PeriodicReaderWorker { reader }.run(messages).await
130117
});
131118
};
132119

@@ -229,47 +216,50 @@ enum Message {
229216
}
230217

231218
enum ProducerOrWorker<E: PushMetricExporter> {
232-
Producer(Weak<dyn SdkProducer>),
219+
Producer(Weak<Pipeline>),
233220
#[allow(clippy::type_complexity)]
234221
Worker(Box<dyn FnOnce(&PeriodicReader<E>) + Send + Sync>),
235222
}
236223

237-
struct PeriodicReaderWorker<E: PushMetricExporter, RT: Runtime> {
224+
struct PeriodicReaderWorker<E: PushMetricExporter> {
238225
reader: PeriodicReader<E>,
239-
timeout: Duration,
240-
runtime: RT,
241-
rm: ResourceMetricsData,
242226
}
243227

244-
impl<E: PushMetricExporter, RT: Runtime> PeriodicReaderWorker<E, RT> {
228+
impl<E: PushMetricExporter> PeriodicReaderWorker<E> {
245229
async fn collect_and_export(&mut self) -> OTelSdkResult {
246-
self.reader
247-
.collect(&mut self.rm)
248-
.map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;
249-
if self.rm.scope_metrics.is_empty() {
250-
otel_debug!(
251-
name: "PeriodicReaderWorker.NoMetricsToExport",
252-
);
253-
// No metrics to export.
254-
return Ok(());
230+
let inner = self
231+
.reader
232+
.inner
233+
.lock()
234+
.map_err(|_| OTelSdkError::InternalFailure("Failed to lock pipeline".into()))?;
235+
236+
if inner.is_shutdown {
237+
return Err(OTelSdkError::AlreadyShutdown);
255238
}
256239

257-
otel_debug!(
258-
name: "PeriodicReaderWorker.InvokeExporter",
259-
message = "Calling exporter's export method with collected metrics.",
260-
count = self.rm.scope_metrics.len(),
261-
);
262-
let export = self.reader.exporter.export(ResourceMetrics::new(&self.rm));
263-
let timeout = self.runtime.delay(self.timeout);
264-
pin_mut!(export);
265-
pin_mut!(timeout);
266-
267-
match future::select(export, timeout).await {
268-
Either::Left((res, _)) => {
269-
res // return the status of export.
270-
}
271-
Either::Right(_) => Err(OTelSdkError::Timeout(self.timeout)),
240+
let producer = match &inner.sdk_producer_or_worker {
241+
ProducerOrWorker::Producer(sdk_producer) => sdk_producer.upgrade(),
242+
ProducerOrWorker::Worker(_) => None,
243+
}
244+
.ok_or(OTelSdkError::InternalFailure(
245+
"reader is not registered".into(),
246+
))?;
247+
drop(inner);
248+
249+
let Ok(producer_inner) = producer.inner.lock() else {
250+
return Err(OTelSdkError::InternalFailure(
251+
"Paniced while holding a pipeline lock".into(),
252+
));
253+
};
254+
for cb in &producer_inner.callbacks {
255+
cb();
272256
}
257+
// unfortunatelly we need to block here, because runtime require future to be "Send",
258+
// but we hold a lock for PipelineInner.
259+
futures_executor::block_on(self.reader.exporter.export(ResourceMetrics::new(
260+
&producer.resource,
261+
producer_inner.aggregations.iter(),
262+
)))
273263
}
274264

275265
async fn process_message(&mut self, message: Message) -> bool {

0 commit comments

Comments
 (0)