diff --git a/examples/metrics-advanced/src/main.rs b/examples/metrics-advanced/src/main.rs
index 27150b522c..1b08b74655 100644
--- a/examples/metrics-advanced/src/main.rs
+++ b/examples/metrics-advanced/src/main.rs
@@ -1,7 +1,7 @@
-use opentelemetry::global;
+use opentelemetry::{global, Context};
 use opentelemetry::Key;
 use opentelemetry::KeyValue;
-use opentelemetry_sdk::metrics::{Aggregation, Instrument, SdkMeterProvider, Stream, Temporality};
+use opentelemetry_sdk::metrics::{Aggregation, Instrument, MeasurementProcessor, SdkMeterProvider, Stream, Temporality};
 use opentelemetry_sdk::Resource;
 use std::error::Error;
 
@@ -57,6 +57,7 @@ fn init_meter_provider() -> opentelemetry_sdk::metrics::SdkMeterProvider {
         .with_view(my_view_rename_and_unit)
         .with_view(my_view_drop_attributes)
         .with_view(my_view_change_aggregation)
+        .with_measurement_processor(UserTypeMeasurementProcessor)
         .build();
     global::set_meter_provider(provider.clone());
     provider
@@ -128,6 +129,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
         ],
     );
 
+    // Enrich the next measurement with the user type from the current Context.
+    let guard = Context::current_with_value(UserType::Admin).attach();
     histogram2.record(
         1.2,
         &[
@@ -137,6 +140,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
             KeyValue::new("mykey4", "myvalue4"),
         ],
     );
+    drop(guard);
 
     histogram2.record(
         1.23,
@@ -154,3 +158,31 @@ async fn main() -> Result<(), Box<dyn Error>> {
     meter_provider.shutdown()?;
     Ok(())
 }
+
+
+
+enum UserType {
+    User,
+    Admin,
+}
+
+impl UserType {
+    fn as_str(&self) -> &'static str {
+        match self {
+            UserType::User => "user",
+            UserType::Admin => "admin",
+        }
+    }
+}
+
+struct UserTypeMeasurementProcessor;
+
+impl MeasurementProcessor for UserTypeMeasurementProcessor {
+    fn process<'a>(&self, attributes: &[KeyValue]) -> Option<Vec<KeyValue>> {
+        Context::current().get::<UserType>().map(|user_type| {
+            let mut attrs = attributes.to_vec();
+            attrs.push(KeyValue::new("user_type", user_type.as_str()));
+            attrs
+        })
+    }
+}
\ No newline at end of file
diff --git a/opentelemetry-sdk/src/metrics/internal/aggregate.rs b/opentelemetry-sdk/src/metrics/internal/aggregate.rs
index 4497ac2fd3..7cdc02f33a 100644
--- a/opentelemetry-sdk/src/metrics/internal/aggregate.rs
+++ b/opentelemetry-sdk/src/metrics/internal/aggregate.rs
@@ -5,11 +5,11 @@ use std::{
     sync::{Arc, Mutex},
     time::SystemTime,
 };
-
+use std::borrow::Cow;
 use crate::metrics::{data::Aggregation, Temporality};
 use opentelemetry::time::now;
 use opentelemetry::KeyValue;
-
+use crate::metrics::measurement_processor::MeasurementProcessor;
 use super::{
     exponential_histogram::ExpoHistogram, histogram::Histogram, last_value::LastValue,
     precomputed_sum::PrecomputedSum, sum::Sum, Number,
@@ -106,21 +106,51 @@ type Filter = Arc<dyn Fn(&KeyValue) -> bool + Send + Sync>;
 #[derive(Clone)]
 pub(crate) struct AttributeSetFilter {
     filter: Option<Filter>,
+    processor: Option<Arc<dyn MeasurementProcessor>>,
 }
 
 impl AttributeSetFilter {
-    pub(crate) fn new(filter: Option<Filter>) -> Self {
-        Self { filter }
+    pub(crate) fn new(filter: Option<Filter>, processors: Vec<Arc<dyn MeasurementProcessor>>) -> Self {
+        Self { filter, processor: AggregateProcessor::try_create(processors) }
     }
 
     pub(crate) fn apply(&self, attrs: &[KeyValue], run: impl FnOnce(&[KeyValue])) {
-        if let Some(filter) = &self.filter {
-            let filtered_attrs: Vec<KeyValue> =
-                attrs.iter().filter(|kv| filter(kv)).cloned().collect();
-            run(&filtered_attrs);
-        } else {
-            run(attrs);
-        };
+        match (&self.filter, &self.processor) {
+            (None, None) => {
+                run(attrs);
+            },
+            (Some(filter), None) => {
+                let filtered_attrs: Vec<KeyValue> =
+                    attrs.iter().filter(|kv| filter(kv)).cloned().collect();
+
+                run(&filtered_attrs);
+            },
+            (None, Some(processor)) => {
+                let attributes = Cow::Borrowed(attrs);
+
+                match processor.process(&attributes) {
+                    Some(attributes) => {
+                        run(&attributes);
+                    }
+                    None => {
+                        run(attrs);
+                    }
+                }
+            },
+            (Some(filter), Some(processor)) => {
+                let filtered_attrs: Vec<KeyValue> =
+                    attrs.iter().filter(|kv| filter(kv)).cloned().collect();
+
+                match processor.process(&filtered_attrs) {
+                    Some(attributes) => {
+                        run(&attributes);
+                    }
+                    None => {
+                        run(&filtered_attrs);
+                    }
+                }
+            }
+        }
     }
 }
@@ -137,10 +167,10 @@ pub(crate) struct AggregateBuilder<T> {
 }
 
 impl<T: Number> AggregateBuilder<T> {
-    pub(crate) fn new(temporality: Temporality, filter: Option<Filter>) -> Self {
+    pub(crate) fn new(temporality: Temporality, filter: Option<Filter>, processors: Vec<Arc<dyn MeasurementProcessor>>) -> Self {
         AggregateBuilder {
             temporality,
-            filter: AttributeSetFilter::new(filter),
+            filter: AttributeSetFilter::new(filter, processors),
             _marker: marker::PhantomData,
         }
     }
@@ -201,6 +231,43 @@ impl<T: Number> AggregateBuilder<T> {
     }
 }
 
+
+#[derive(Clone)]
+struct AggregateProcessor(Arc<Vec<Arc<dyn MeasurementProcessor>>>);
+
+impl AggregateProcessor {
+    fn try_create(
+        processors: Vec<Arc<dyn MeasurementProcessor>>,
+    ) -> Option<Arc<dyn MeasurementProcessor>> {
+
+        match processors.len() {
+            0 => None,
+            1 => Some(processors[0].clone()),
+            _ => Some(Arc::new(AggregateProcessor(Arc::new(processors)))),
+        }
+    }
+}
+
+impl MeasurementProcessor for AggregateProcessor {
+    fn process<'a>(&self, attributes: &[KeyValue]) -> Option<Vec<KeyValue>> {
+        // Do not allocate if not necessary.
+        let mut new_attributes: Option<Vec<KeyValue>> = None;
+
+        for processor in self.0.iter() {
+            let existing_or_new: &[KeyValue] = match &new_attributes {
+                Some(new) => new,
+                None => attributes,
+            };
+
+            if let Some(new) = processor.process(existing_or_new) {
+                new_attributes = Some(new);
+            }
+        }
+
+        new_attributes
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use crate::metrics::data::{
@@ -214,7 +281,7 @@ mod tests {
     #[test]
     fn last_value_aggregation() {
         let AggregateFns { measure, collect } =
-            AggregateBuilder::<u64>::new(Temporality::Cumulative, None).last_value(None);
+            AggregateBuilder::<u64>::new(Temporality::Cumulative, None, vec![]).last_value(None);
         let mut a = Gauge {
             data_points: vec![GaugeDataPoint {
                 attributes: vec![KeyValue::new("a", 1)],
@@ -240,7 +307,7 @@ mod tests {
     fn precomputed_sum_aggregation() {
         for temporality in [Temporality::Delta, Temporality::Cumulative] {
             let AggregateFns { measure, collect } =
-                AggregateBuilder::<u64>::new(temporality, None).precomputed_sum(true);
+                AggregateBuilder::<u64>::new(temporality, None, vec![]).precomputed_sum(true);
             let mut a = Sum {
                 data_points: vec![
                     SumDataPoint {
@@ -282,7 +349,7 @@ mod tests {
     fn sum_aggregation() {
         for temporality in [Temporality::Delta, Temporality::Cumulative] {
             let AggregateFns { measure, collect } =
-                AggregateBuilder::<u64>::new(temporality, None).sum(true);
+                AggregateBuilder::<u64>::new(temporality, None, vec![]).sum(true);
             let mut a = Sum {
                 data_points: vec![
                     SumDataPoint {
@@ -323,7 +390,7 @@ mod tests {
     #[test]
     fn explicit_bucket_histogram_aggregation() {
         for temporality in [Temporality::Delta, Temporality::Cumulative] {
-            let AggregateFns { measure, collect } = AggregateBuilder::<u64>::new(temporality, None)
+            let AggregateFns { measure, collect } = AggregateBuilder::<u64>::new(temporality, None, vec![])
                 .explicit_bucket_histogram(vec![1.0], true, true);
             let mut a = Histogram {
                 data_points: vec![HistogramDataPoint {
@@ -366,7 +433,7 @@ mod tests {
     #[test]
     fn exponential_histogram_aggregation() {
         for temporality in [Temporality::Delta, Temporality::Cumulative] {
-            let AggregateFns { measure, collect } = AggregateBuilder::<u64>::new(temporality, None)
+            let AggregateFns { measure, collect } = AggregateBuilder::<u64>::new(temporality, None, vec![])
                 .exponential_bucket_histogram(4, 20, true, true);
             let mut a = ExponentialHistogram {
                 data_points: vec![ExponentialHistogramDataPoint {
diff --git a/opentelemetry-sdk/src/metrics/measurement_processor.rs b/opentelemetry-sdk/src/metrics/measurement_processor.rs
new file mode 100644
index 0000000000..013e061d91
--- /dev/null
+++ b/opentelemetry-sdk/src/metrics/measurement_processor.rs
@@ -0,0 +1,13 @@
+use opentelemetry::KeyValue;
+
+/// A trait for processing measurement attributes.
+pub trait MeasurementProcessor: Send + Sync + 'static {
+
+    /// Processes the attributes of a measurement.
+    ///
+    /// The processor might decide to modify the attributes. In that case, it returns
+    /// `Some` with the modified attributes. If no attribute modification is needed,
+    /// it returns `None`.
+    fn process<'a>(&self, attributes: &[KeyValue]) -> Option<Vec<KeyValue>>;
+}
+
diff --git a/opentelemetry-sdk/src/metrics/meter_provider.rs b/opentelemetry-sdk/src/metrics/meter_provider.rs
index a1cf2f9dec..3a73e582ad 100644
--- a/opentelemetry-sdk/src/metrics/meter_provider.rs
+++ b/opentelemetry-sdk/src/metrics/meter_provider.rs
@@ -15,10 +15,7 @@ use opentelemetry::{
 use crate::error::OTelSdkResult;
 use crate::Resource;
 
-use super::{
-    exporter::PushMetricExporter, meter::SdkMeter, noop::NoopMeter, pipeline::Pipelines,
-    reader::MetricReader, view::View, PeriodicReader,
-};
+use super::{exporter::PushMetricExporter, meter::SdkMeter, noop::NoopMeter, pipeline::Pipelines, reader::MetricReader, view::View, MeasurementProcessor, PeriodicReader};
 
 /// Handles the creation and coordination of [Meter]s.
 ///
@@ -223,6 +220,7 @@ pub struct MeterProviderBuilder {
     resource: Option<Resource>,
    readers: Vec<Box<dyn MetricReader>>,
     views: Vec<Arc<dyn View>>,
+    measurement_processors: Vec<Arc<dyn MeasurementProcessor>>,
 }
 
 impl MeterProviderBuilder {
@@ -291,6 +289,12 @@ impl MeterProviderBuilder {
         self
     }
 
+    /// Associates a [MeasurementProcessor] with a [MeterProvider].
+    pub fn with_measurement_processor<T: MeasurementProcessor>(mut self, processor: T) -> Self {
+        self.measurement_processors.push(Arc::new(processor));
+        self
+    }
+
     /// Construct a new [MeterProvider] with this configuration.
     pub fn build(self) -> SdkMeterProvider {
         otel_debug!(
@@ -304,6 +308,7 @@ impl MeterProviderBuilder {
                 self.resource.unwrap_or(Resource::builder().build()),
                 self.readers,
                 self.views,
+                self.measurement_processors,
             )),
             meters: Default::default(),
             shutdown_invoked: AtomicBool::new(false),
diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs
index 25a7389d92..fd30d8c705 100644
--- a/opentelemetry-sdk/src/metrics/mod.rs
+++ b/opentelemetry-sdk/src/metrics/mod.rs
@@ -61,6 +61,10 @@ pub(crate) mod view;
 #[cfg(any(feature = "testing", test))]
 #[cfg_attr(docsrs, doc(cfg(any(feature = "testing", test))))]
 pub mod in_memory_exporter;
+
+mod measurement_processor;
+pub use measurement_processor::MeasurementProcessor;
+
 #[cfg(any(feature = "testing", test))]
 #[cfg_attr(docsrs, doc(cfg(any(feature = "testing", test))))]
 pub use in_memory_exporter::{InMemoryMetricExporter, InMemoryMetricExporterBuilder};
diff --git a/opentelemetry-sdk/src/metrics/pipeline.rs b/opentelemetry-sdk/src/metrics/pipeline.rs
index d8c9429c51..86fe9145be 100644
--- a/opentelemetry-sdk/src/metrics/pipeline.rs
+++ b/opentelemetry-sdk/src/metrics/pipeline.rs
@@ -23,7 +23,7 @@ use crate::{
 
 use self::internal::AggregateFns;
 
-use super::{Aggregation, Temporality};
+use super::{Aggregation, MeasurementProcessor, Temporality};
 
 /// Connects all of the instruments created by a meter provider to a [MetricReader].
 ///
@@ -39,6 +39,7 @@ pub struct Pipeline {
     reader: Box<dyn MetricReader>,
     views: Vec<Arc<dyn View>>,
     inner: Mutex<PipelineInner>,
+    processors: Vec<Arc<dyn MeasurementProcessor>>,
 }
 
 impl fmt::Debug for Pipeline {
@@ -385,7 +386,7 @@ where
             .clone()
            .map(|allowed| Arc::new(move |kv: &KeyValue| allowed.contains(&kv.key)) as Arc<_>);
 
-        let b = AggregateBuilder::new(self.pipeline.reader.temporality(kind), filter);
+        let b = AggregateBuilder::new(self.pipeline.reader.temporality(kind), filter, self.pipeline.processors.clone());
         let AggregateFns { measure, collect } = match aggregate_fn(b, &agg, kind) {
             Ok(Some(inst)) => inst,
             other => return other.map(|fs| fs.map(|inst| inst.measure)), // Drop aggregator or error
@@ -621,6 +622,7 @@ impl Pipelines {
         res: Resource,
         readers: Vec<Box<dyn MetricReader>>,
         views: Vec<Arc<dyn View>>,
+        processors: Vec<Arc<dyn MeasurementProcessor>>,
     ) -> Self {
         let mut pipes = Vec::with_capacity(readers.len());
         for r in readers {
@@ -629,6 +631,7 @@ impl Pipelines {
                reader: r,
                views: views.clone(),
                inner: Default::default(),
+                processors: processors.clone(),
            });
            p.reader.register_pipeline(Arc::downgrade(&p));
            pipes.push(p);
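
For reference, a minimal sketch of how the pieces introduced by this diff compose end to end. Only `MeasurementProcessor`, `with_measurement_processor`, and the Context-based enrichment come from the changes above; the `Tenant` type, processor name, meter, and instrument are illustrative, and a real setup would also register a reader/exporter on the builder.

use opentelemetry::{Context, KeyValue};
use opentelemetry::metrics::MeterProvider as _;
use opentelemetry_sdk::metrics::{MeasurementProcessor, SdkMeterProvider};

// Illustrative context value; any `Send + Sync + 'static` type works.
struct Tenant(String);

struct TenantProcessor;

impl MeasurementProcessor for TenantProcessor {
    fn process<'a>(&self, attributes: &[KeyValue]) -> Option<Vec<KeyValue>> {
        // Only allocate a new attribute set when the current Context carries a Tenant.
        Context::current().get::<Tenant>().map(|tenant| {
            let mut attrs = attributes.to_vec();
            attrs.push(KeyValue::new("tenant", tenant.0.clone()));
            attrs
        })
    }
}

fn main() {
    // A real setup would also add a reader/exporter here.
    let provider = SdkMeterProvider::builder()
        .with_measurement_processor(TenantProcessor)
        .build();
    let histogram = provider.meter("example").f64_histogram("latency").build();

    // Measurements recorded while the guard is live pick up the extra attribute.
    let _guard = Context::current_with_value(Tenant("tenant-a".into())).attach();
    histogram.record(1.2, &[KeyValue::new("endpoint", "/users")]);
}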