experiment: prototype measurement processor #2797

Draft · wants to merge 3 commits into base: main
36 changes: 34 additions & 2 deletions examples/metrics-advanced/src/main.rs
@@ -1,7 +1,7 @@
use opentelemetry::global;
use opentelemetry::{global, Context};
use opentelemetry::Key;
use opentelemetry::KeyValue;
use opentelemetry_sdk::metrics::{Aggregation, Instrument, SdkMeterProvider, Stream, Temporality};
use opentelemetry_sdk::metrics::{Aggregation, Instrument, MeasurementProcessor, SdkMeterProvider, Stream, Temporality};
use opentelemetry_sdk::Resource;
use std::error::Error;

@@ -57,6 +57,7 @@ fn init_meter_provider() -> opentelemetry_sdk::metrics::SdkMeterProvider {
.with_view(my_view_rename_and_unit)
.with_view(my_view_drop_attributes)
.with_view(my_view_change_aggregation)
.with_measurement_processor(UserTypeMeasurementProcessor)
.build();
global::set_meter_provider(provider.clone());
provider
@@ -128,6 +129,8 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
],
);

// Enrich the next measurement with user type
let guard = Context::current_with_value(UserType::Admin).attach();
histogram2.record(
1.2,
&[
@@ -137,6 +140,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
KeyValue::new("mykey4", "myvalue4"),
],
);
drop(guard);

histogram2.record(
1.23,
@@ -154,3 +158,31 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
meter_provider.shutdown()?;
Ok(())
}



enum UserType {
User,
Admin,
}

impl UserType {
fn as_str(&self) -> &'static str {
match self {
UserType::User => "user",
UserType::Admin => "admin",
}
}
}

struct UserTypeMeasurementProcessor;

impl MeasurementProcessor for UserTypeMeasurementProcessor {
fn process<'a>(&self, attributes: &[KeyValue]) -> Option<Vec<KeyValue>> {
Context::current().get::<UserType>().map(|user_type| {
let mut attrs = attributes.to_vec();
attrs.push(KeyValue::new("user_type", user_type.as_str()));
attrs
})
}
}
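
The processor above adds the user_type attribute only while a UserType value is attached to the current Context, so just the measurement recorded under the guard is enriched. The same trait can also strip attributes; a minimal sketch follows (illustrative only, not part of this change; the "user_id" key is hypothetical):

use opentelemetry::KeyValue;
use opentelemetry_sdk::metrics::MeasurementProcessor;

/// Drops a hypothetical high-cardinality attribute from every measurement.
struct DropUserIdProcessor;

impl MeasurementProcessor for DropUserIdProcessor {
    fn process<'a>(&self, attributes: &[KeyValue]) -> Option<Vec<KeyValue>> {
        if attributes.iter().any(|kv| kv.key.as_str() == "user_id") {
            // Return a copy with the offending attribute removed.
            Some(
                attributes
                    .iter()
                    .filter(|kv| kv.key.as_str() != "user_id")
                    .cloned()
                    .collect(),
            )
        } else {
            // Nothing to change; keep the original attributes.
            None
        }
    }
}
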
103 changes: 85 additions & 18 deletions opentelemetry-sdk/src/metrics/internal/aggregate.rs
@@ -5,11 +5,11 @@ use std::{
sync::{Arc, Mutex},
time::SystemTime,
};

use std::borrow::Cow;
use crate::metrics::{data::Aggregation, Temporality};
use opentelemetry::time::now;
use opentelemetry::KeyValue;

use crate::metrics::measurement_processor::MeasurementProcessor;
use super::{
exponential_histogram::ExpoHistogram, histogram::Histogram, last_value::LastValue,
precomputed_sum::PrecomputedSum, sum::Sum, Number,
@@ -106,21 +106,51 @@ type Filter = Arc<dyn Fn(&KeyValue) -> bool + Send + Sync>;
#[derive(Clone)]
pub(crate) struct AttributeSetFilter {
filter: Option<Filter>,
processor: Option<Arc<dyn MeasurementProcessor>>
}

impl AttributeSetFilter {
pub(crate) fn new(filter: Option<Filter>) -> Self {
Self { filter }
pub(crate) fn new(filter: Option<Filter>, processors: Vec<Arc<dyn MeasurementProcessor>>) -> Self {
Self { filter, processor: AggregateProcessor::try_create(processors) }
}

pub(crate) fn apply(&self, attrs: &[KeyValue], run: impl FnOnce(&[KeyValue])) {
if let Some(filter) = &self.filter {
let filtered_attrs: Vec<KeyValue> =
attrs.iter().filter(|kv| filter(kv)).cloned().collect();
run(&filtered_attrs);
} else {
run(attrs);
};
match (&self.filter, &self.processor) {
(None, None) => {
run(attrs);
},
(Some(filter), None) => {
let filtered_attrs: Vec<KeyValue> =
attrs.iter().filter(|kv| filter(kv)).cloned().collect();

run(&filtered_attrs);
},
(None, Some(processor)) => {
let attributes = Cow::Borrowed(attrs);

match processor.process(&attributes) {
Some(attributes) => {
run(&attributes);
}
None => {
run(attrs);
}
}
},
(Some(filter), Some(processor)) => {
let filtered_attrs: Vec<KeyValue> =
attrs.iter().filter(|kv| filter(kv)).cloned().collect();

match processor.process(&filtered_attrs) {
Some(attributes) => {
run(&attributes);
}
                    None => {
                        // The processor made no changes; still honor the view's attribute filter.
                        run(&filtered_attrs);
                    }
}
}
}
}
}

@@ -137,10 +167,10 @@ pub(crate) struct AggregateBuilder<T> {
}

impl<T: Number> AggregateBuilder<T> {
pub(crate) fn new(temporality: Temporality, filter: Option<Filter>) -> Self {
pub(crate) fn new(temporality: Temporality, filter: Option<Filter>, processors: Vec<Arc<dyn MeasurementProcessor>>) -> Self {
AggregateBuilder {
temporality,
filter: AttributeSetFilter::new(filter),
filter: AttributeSetFilter::new(filter, processors),
_marker: marker::PhantomData,
}
}
@@ -201,6 +231,43 @@ impl<T: Number> AggregateBuilder<T> {
}
}


#[derive(Clone)]
struct AggregateProcessor(Arc<Vec<Arc<dyn MeasurementProcessor>>>);

impl AggregateProcessor {
fn try_create(
processors: Vec<Arc<dyn MeasurementProcessor>>,
) -> Option<Arc<dyn MeasurementProcessor>> {

match processors.len() {
0 => return None,
1 => Some(processors[0].clone()),
_ => Some(Arc::new(AggregateProcessor(Arc::new(processors)))),
}
}
}

impl MeasurementProcessor for AggregateProcessor {
fn process<'a>(&self, attributes: &[KeyValue]) -> Option<Vec<KeyValue>> {
// Do not allocate if not necessary.
let mut new_attributes: Option<Vec<KeyValue>> = None;

for processor in self.0.iter() {
let existing_or_new = match &new_attributes {
Some(new) => new,
None => attributes
};

if let Some(new) = processor.process(existing_or_new) {
new_attributes = Some(new);
}
}

new_attributes
}
}

#[cfg(test)]
mod tests {
use crate::metrics::data::{
@@ -214,7 +281,7 @@ mod tests {
#[test]
fn last_value_aggregation() {
let AggregateFns { measure, collect } =
AggregateBuilder::<u64>::new(Temporality::Cumulative, None).last_value(None);
AggregateBuilder::<u64>::new(Temporality::Cumulative, None, vec![]).last_value(None);
let mut a = Gauge {
data_points: vec![GaugeDataPoint {
attributes: vec![KeyValue::new("a", 1)],
@@ -240,7 +307,7 @@
fn precomputed_sum_aggregation() {
for temporality in [Temporality::Delta, Temporality::Cumulative] {
let AggregateFns { measure, collect } =
AggregateBuilder::<u64>::new(temporality, None).precomputed_sum(true);
AggregateBuilder::<u64>::new(temporality, None, vec![]).precomputed_sum(true);
let mut a = Sum {
data_points: vec![
SumDataPoint {
@@ -282,7 +349,7 @@ mod tests {
fn sum_aggregation() {
for temporality in [Temporality::Delta, Temporality::Cumulative] {
let AggregateFns { measure, collect } =
AggregateBuilder::<u64>::new(temporality, None).sum(true);
AggregateBuilder::<u64>::new(temporality, None, vec![]).sum(true);
let mut a = Sum {
data_points: vec![
SumDataPoint {
@@ -323,7 +390,7 @@ mod tests {
#[test]
fn explicit_bucket_histogram_aggregation() {
for temporality in [Temporality::Delta, Temporality::Cumulative] {
let AggregateFns { measure, collect } = AggregateBuilder::<u64>::new(temporality, None)
let AggregateFns { measure, collect } = AggregateBuilder::<u64>::new(temporality, None, vec![])
.explicit_bucket_histogram(vec![1.0], true, true);
let mut a = Histogram {
data_points: vec![HistogramDataPoint {
@@ -366,7 +433,7 @@ mod tests {
#[test]
fn exponential_histogram_aggregation() {
for temporality in [Temporality::Delta, Temporality::Cumulative] {
let AggregateFns { measure, collect } = AggregateBuilder::<u64>::new(temporality, None)
let AggregateFns { measure, collect } = AggregateBuilder::<u64>::new(temporality, None, vec![])
.exponential_bucket_histogram(4, 20, true, true);
let mut a = ExponentialHistogram {
data_points: vec![ExponentialHistogramDataPoint {
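
When several processors are registered, AggregateProcessor applies them in registration order: each processor receives the previous processor's output whenever that output was Some, and the original attribute slice otherwise. A minimal sketch of that composition with two illustrative processors (the names and attribute values are hypothetical, not part of this change):

use opentelemetry::KeyValue;
use opentelemetry_sdk::metrics::MeasurementProcessor;

struct AddRegion;
struct AddTier;

impl MeasurementProcessor for AddRegion {
    fn process<'a>(&self, attributes: &[KeyValue]) -> Option<Vec<KeyValue>> {
        let mut attrs = attributes.to_vec();
        attrs.push(KeyValue::new("region", "eu-west-1"));
        Some(attrs)
    }
}

impl MeasurementProcessor for AddTier {
    fn process<'a>(&self, attributes: &[KeyValue]) -> Option<Vec<KeyValue>> {
        // Registered second, so it already sees the "region" attribute added above.
        let mut attrs = attributes.to_vec();
        attrs.push(KeyValue::new("tier", "premium"));
        Some(attrs)
    }
}

Registering AddRegion and then AddTier therefore yields measurements carrying both attributes, with AddTier observing AddRegion's output.
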
13 changes: 13 additions & 0 deletions opentelemetry-sdk/src/metrics/measurement_processor.rs
@@ -0,0 +1,13 @@
use opentelemetry::KeyValue;

/// A trait for processing measurement attributes.
pub trait MeasurementProcessor: Send + Sync + 'static {

/// Processes the attributes of a measurement.
///
    /// Returns `Some` with the modified attributes when the processor changes them,
    /// or `None` when the attributes should be used unchanged.
fn process<'a>(&self, attributes: &[KeyValue]) -> Option<Vec<KeyValue>>;
}
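
On the caller side, None means "use the attributes exactly as recorded"; only a Some result replaces them. A hypothetical helper (not part of this change) that spells out the convention:

use opentelemetry::KeyValue;
use opentelemetry_sdk::metrics::MeasurementProcessor;

// Apply a processor and fall back to the original attributes
// when it reports no changes.
fn processed_attributes(p: &dyn MeasurementProcessor, attrs: &[KeyValue]) -> Vec<KeyValue> {
    match p.process(attrs) {
        Some(changed) => changed,
        None => attrs.to_vec(),
    }
}
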

13 changes: 9 additions & 4 deletions opentelemetry-sdk/src/metrics/meter_provider.rs
@@ -15,10 +15,7 @@ use opentelemetry::{
use crate::error::OTelSdkResult;
use crate::Resource;

use super::{
exporter::PushMetricExporter, meter::SdkMeter, noop::NoopMeter, pipeline::Pipelines,
reader::MetricReader, view::View, PeriodicReader,
};
use super::{exporter::PushMetricExporter, meter::SdkMeter, noop::NoopMeter, pipeline::Pipelines, reader::MetricReader, view::View, MeasurementProcessor, PeriodicReader};

/// Handles the creation and coordination of [Meter]s.
///
@@ -223,6 +220,7 @@ pub struct MeterProviderBuilder {
resource: Option<Resource>,
readers: Vec<Box<dyn MetricReader>>,
views: Vec<Arc<dyn View>>,
measurement_processors: Vec<Arc<dyn MeasurementProcessor>>,
}

impl MeterProviderBuilder {
@@ -291,6 +289,12 @@ impl MeterProviderBuilder {
self
}

/// Associates a [MeasurementProcessor] with a [MeterProvider].
pub fn with_measurement_processor<T: MeasurementProcessor>(mut self, processor: T) -> Self {
self.measurement_processors.push(Arc::new(processor));
self
}

/// Construct a new [MeterProvider] with this configuration.
pub fn build(self) -> SdkMeterProvider {
otel_debug!(
@@ -304,6 +308,7 @@
self.resource.unwrap_or(Resource::builder().build()),
self.readers,
self.views,
self.measurement_processors,
)),
meters: Default::default(),
shutdown_invoked: AtomicBool::new(false),
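
With the builder hook above, attaching a processor to a provider is a single call. A minimal sketch, assuming the UserTypeMeasurementProcessor from the metrics-advanced example is in scope (reader and exporter setup elided):

use opentelemetry::global;
use opentelemetry_sdk::metrics::SdkMeterProvider;

fn install_provider() -> SdkMeterProvider {
    // UserTypeMeasurementProcessor is the example processor defined in
    // examples/metrics-advanced/src/main.rs above.
    let provider = SdkMeterProvider::builder()
        .with_measurement_processor(UserTypeMeasurementProcessor)
        .build();
    global::set_meter_provider(provider.clone());
    provider
}
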
4 changes: 4 additions & 0 deletions opentelemetry-sdk/src/metrics/mod.rs
@@ -61,6 +61,10 @@ pub(crate) mod view;
#[cfg(any(feature = "testing", test))]
#[cfg_attr(docsrs, doc(cfg(any(feature = "testing", test))))]
pub mod in_memory_exporter;

mod measurement_processor;
pub use measurement_processor::MeasurementProcessor;

#[cfg(any(feature = "testing", test))]
#[cfg_attr(docsrs, doc(cfg(any(feature = "testing", test))))]
pub use in_memory_exporter::{InMemoryMetricExporter, InMemoryMetricExporterBuilder};
7 changes: 5 additions & 2 deletions opentelemetry-sdk/src/metrics/pipeline.rs
@@ -23,7 +23,7 @@ use crate::{

use self::internal::AggregateFns;

use super::{Aggregation, Temporality};
use super::{Aggregation, MeasurementProcessor, Temporality};

/// Connects all of the instruments created by a meter provider to a [MetricReader].
///
@@ -39,6 +39,7 @@ pub struct Pipeline {
reader: Box<dyn MetricReader>,
views: Vec<Arc<dyn View>>,
inner: Mutex<PipelineInner>,
processors: Vec<Arc<dyn MeasurementProcessor>>,
}

impl fmt::Debug for Pipeline {
@@ -385,7 +386,7 @@
.clone()
.map(|allowed| Arc::new(move |kv: &KeyValue| allowed.contains(&kv.key)) as Arc<_>);

let b = AggregateBuilder::new(self.pipeline.reader.temporality(kind), filter);
let b = AggregateBuilder::new(self.pipeline.reader.temporality(kind), filter, self.pipeline.processors.clone());
let AggregateFns { measure, collect } = match aggregate_fn(b, &agg, kind) {
Ok(Some(inst)) => inst,
other => return other.map(|fs| fs.map(|inst| inst.measure)), // Drop aggregator or error
Expand Down Expand Up @@ -621,6 +622,7 @@ impl Pipelines {
res: Resource,
readers: Vec<Box<dyn MetricReader>>,
views: Vec<Arc<dyn View>>,
processors: Vec<Arc<dyn MeasurementProcessor>>,
) -> Self {
let mut pipes = Vec::with_capacity(readers.len());
for r in readers {
Expand All @@ -629,6 +631,7 @@ impl Pipelines {
reader: r,
views: views.clone(),
inner: Default::default(),
processors: processors.clone(),
});
p.reader.register_pipeline(Arc::downgrade(&p));
pipes.push(p);