From b7697a840569535bccb863998e248ed0285e91b4 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Mon, 19 Aug 2024 09:18:18 -0700 Subject: [PATCH 1/7] initial commit --- opentelemetry-otlp/src/logs.rs | 2 +- opentelemetry-proto/src/transform/common.rs | 35 ++++++++++++ opentelemetry-proto/src/transform/logs.rs | 53 ++++++++++++------- opentelemetry-sdk/benches/log_exporter.rs | 6 +-- opentelemetry-sdk/benches/log_processor.rs | 14 +++-- opentelemetry-sdk/src/export/logs/mod.rs | 10 ++-- opentelemetry-sdk/src/logs/log_emitter.rs | 8 +-- opentelemetry-sdk/src/logs/log_processor.rs | 47 ++++++++++------ .../src/testing/logs/in_memory_exporter.rs | 20 +++++-- opentelemetry-stdout/src/logs/exporter.rs | 2 +- opentelemetry-stdout/src/logs/transform.rs | 9 ++-- 11 files changed, 145 insertions(+), 61 deletions(-) diff --git a/opentelemetry-otlp/src/logs.rs b/opentelemetry-otlp/src/logs.rs index 3f21697fb0..6c936403c1 100644 --- a/opentelemetry-otlp/src/logs.rs +++ b/opentelemetry-otlp/src/logs.rs @@ -100,7 +100,7 @@ impl LogExporter { impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { async fn export<'a>( &mut self, - batch: Vec>, + batch: Vec>>, ) -> opentelemetry::logs::LogResult<()> { self.client.export(batch).await } diff --git a/opentelemetry-proto/src/transform/common.rs b/opentelemetry-proto/src/transform/common.rs index ff42479288..57abff6a11 100644 --- a/opentelemetry-proto/src/transform/common.rs +++ b/opentelemetry-proto/src/transform/common.rs @@ -108,6 +108,41 @@ pub mod tonic { } } + impl + From<( + Cow<'_, opentelemetry_sdk::InstrumentationLibrary>, + Option>, + )> for InstrumentationScope + { + fn from( + data: ( + Cow<'_, opentelemetry_sdk::InstrumentationLibrary>, + Option>, + ), + ) -> Self { + let (library, target) = data; + if let Some(t) = target { + InstrumentationScope { + name: t.to_string(), + version: String::new(), + attributes: vec![], + ..Default::default() + } + } else { + InstrumentationScope { + name: library.name.clone().into_owned(), + version: library + .version + .as_ref() + .map(ToString::to_string) + .unwrap_or_default(), + attributes: Attributes::from(library.attributes.clone()).0, + ..Default::default() + } + } + } + } + /// Wrapper type for Vec<`KeyValue`> #[derive(Default, Debug)] pub struct Attributes(pub ::std::vec::Vec); diff --git a/opentelemetry-proto/src/transform/logs.rs b/opentelemetry-proto/src/transform/logs.rs index dfd845c5d8..d959194364 100644 --- a/opentelemetry-proto/src/transform/logs.rs +++ b/opentelemetry-proto/src/transform/logs.rs @@ -139,7 +139,7 @@ pub mod tonic { impl From<( - opentelemetry_sdk::export::logs::LogData, + opentelemetry_sdk::export::logs::LogData<'_>, &ResourceAttributesWithSchema, )> for ResourceLogs { @@ -164,15 +164,21 @@ pub mod tonic { .clone() .map(Into::into) .unwrap_or_default(), - scope: Some((log_data.instrumentation, log_data.record.target.clone()).into()), - log_records: vec![log_data.record.into()], + scope: Some( + ( + log_data.instrumentation.into_owned(), + log_data.record.target.clone(), + ) + .into(), + ), + log_records: vec![log_data.record.into_owned().into()], }], } } } pub fn group_logs_by_resource_and_scope( - logs: Vec, + logs: Vec>, resource: &ResourceAttributesWithSchema, ) -> Vec { // Group logs by target or instrumentation name @@ -180,14 +186,13 @@ pub mod tonic { HashMap::new(), |mut scope_map: HashMap< Cow<'static, str>, - Vec<&opentelemetry_sdk::export::logs::LogData>, + Vec<&opentelemetry_sdk::export::logs::LogData<'_>>, >, log| { - let key = log - .record - .target - .clone() - .unwrap_or_else(|| log.instrumentation.name.clone()); + let key = + log.record.target.clone().unwrap_or_else(|| { + Cow::Owned(log.instrumentation.name.clone().into_owned()) + }); scope_map.entry(key).or_default().push(log); scope_map }, @@ -197,13 +202,20 @@ pub mod tonic { .into_iter() .map(|(key, log_data)| ScopeLogs { scope: Some(InstrumentationScope::from(( - &log_data.first().unwrap().instrumentation, - Some(key), + Cow::Owned( + log_data + .first() + .unwrap() + .instrumentation + .clone() + .into_owned(), + ), + Some(key.into_owned().into()), ))), schema_url: resource.schema_url.clone().unwrap_or_default(), log_records: log_data .into_iter() - .map(|log_data| log_data.record.clone().into()) + .map(|log_data| log_data.record.clone().into_owned().into()) .collect(), }) .collect(); @@ -225,18 +237,21 @@ mod tests { use opentelemetry::logs::LogRecord as _; use opentelemetry_sdk::export::logs::LogData; use opentelemetry_sdk::{logs::LogRecord, Resource}; + use std::borrow::Cow; use std::time::SystemTime; - fn create_test_log_data(instrumentation_name: &str, _message: &str) -> LogData { + fn create_test_log_data<'a>(instrumentation_name: &str, _message: &str) -> LogData<'a> { let mut logrecord = LogRecord::default(); logrecord.set_timestamp(SystemTime::now()); logrecord.set_observed_timestamp(SystemTime::now()); LogData { - instrumentation: opentelemetry_sdk::InstrumentationLibrary::builder( - instrumentation_name.to_string(), - ) - .build(), - record: logrecord, + instrumentation: Cow::Owned( + opentelemetry_sdk::InstrumentationLibrary::builder( + instrumentation_name.to_string(), + ) + .build(), + ), + record: Cow::Owned(logrecord), } } diff --git a/opentelemetry-sdk/benches/log_exporter.rs b/opentelemetry-sdk/benches/log_exporter.rs index 97069db21c..a382f4985a 100644 --- a/opentelemetry-sdk/benches/log_exporter.rs +++ b/opentelemetry-sdk/benches/log_exporter.rs @@ -28,11 +28,11 @@ use std::fmt::Debug; // cargo bench --bench log_exporter #[async_trait] pub trait LogExporterWithFuture: Send + Sync + Debug { - async fn export(&mut self, batch: Vec); + async fn export<'a>(&mut self, batch: Vec>); } pub trait LogExporterWithoutFuture: Send + Sync + Debug { - fn export(&mut self, batch: Vec); + fn export<'a>(&mut self, batch: Vec>); } #[derive(Debug)] @@ -40,7 +40,7 @@ struct NoOpExporterWithFuture {} #[async_trait] impl LogExporterWithFuture for NoOpExporterWithFuture { - async fn export(&mut self, _batch: Vec) {} + async fn export<'a>(&mut self, _batch: Vec>) {} } #[derive(Debug)] diff --git a/opentelemetry-sdk/benches/log_processor.rs b/opentelemetry-sdk/benches/log_processor.rs index c75dee65c1..fcb729318c 100644 --- a/opentelemetry-sdk/benches/log_processor.rs +++ b/opentelemetry-sdk/benches/log_processor.rs @@ -23,6 +23,7 @@ use opentelemetry_sdk::{ export::logs::LogData, logs::{LogProcessor, LogRecord, Logger, LoggerProvider}, }; +use std::borrow::Cow; // Run this benchmark with: // cargo bench --bench log_processor @@ -45,7 +46,7 @@ fn create_log_record(logger: &Logger) -> LogRecord { struct NoopProcessor; impl LogProcessor for NoopProcessor { - fn emit(&self, _data: &mut LogData) {} + fn emit(&self, _data: &mut LogData<'_>) {} fn force_flush(&self) -> LogResult<()> { Ok(()) @@ -60,7 +61,7 @@ impl LogProcessor for NoopProcessor { struct CloningProcessor; impl LogProcessor for CloningProcessor { - fn emit(&self, data: &mut LogData) { + fn emit(&self, data: &mut LogData<'_>) { let _data_cloned = data.clone(); } @@ -75,8 +76,8 @@ impl LogProcessor for CloningProcessor { #[derive(Debug)] struct SendToChannelProcessor { - sender: std::sync::mpsc::Sender, - receiver: Arc>>, + sender: std::sync::mpsc::Sender>, + receiver: Arc>>>, } impl SendToChannelProcessor { @@ -104,7 +105,10 @@ impl SendToChannelProcessor { impl LogProcessor for SendToChannelProcessor { fn emit(&self, data: &mut LogData) { - let data_cloned = data.clone(); + let data_cloned = LogData { + record: Cow::Owned(data.record.clone().into_owned()), + instrumentation: Cow::Owned(data.instrumentation.clone().into_owned()), + }; let res = self.sender.send(data_cloned); if res.is_err() { println!("Error sending log data to channel {0}", res.err().unwrap()); diff --git a/opentelemetry-sdk/src/export/logs/mod.rs b/opentelemetry-sdk/src/export/logs/mod.rs index e1426553a1..932f38dbf1 100644 --- a/opentelemetry-sdk/src/export/logs/mod.rs +++ b/opentelemetry-sdk/src/export/logs/mod.rs @@ -15,7 +15,7 @@ use std::fmt::Debug; #[async_trait] pub trait LogExporter: Send + Sync + Debug { /// Exports a batch of [`LogData`]. - async fn export<'a>(&mut self, batch: Vec>) -> LogResult<()>; + async fn export<'a>(&mut self, batch: Vec>>) -> LogResult<()>; /// Shuts down the exporter. fn shutdown(&mut self) {} #[cfg(feature = "logs_level_enabled")] @@ -30,11 +30,11 @@ pub trait LogExporter: Send + Sync + Debug { /// `LogData` represents a single log event without resource context. #[derive(Clone, Debug)] -pub struct LogData { - /// Log record - pub record: LogRecord, +pub struct LogData<'a> { + /// Log record, which can be borrowed or owned. + pub record: Cow<'a, LogRecord>, /// Instrumentation details for the emitter who produced this `LogEvent`. - pub instrumentation: InstrumentationLibrary, + pub instrumentation: Cow<'a, InstrumentationLibrary>, } /// Describes the result of an export. diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index c9d3e5a828..604a1799e5 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -274,8 +274,8 @@ impl opentelemetry::logs::Logger for Logger { } let mut data = LogData { - record: log_record, - instrumentation: self.instrumentation_library().clone(), + record: Cow::Borrowed(&log_record), + instrumentation: Cow::Borrowed(self.instrumentation_library()), }; for p in processors { @@ -336,7 +336,7 @@ mod tests { } impl LogProcessor for ShutdownTestLogProcessor { - fn emit(&self, _data: &mut LogData) { + fn emit(&self, _data: &mut LogData<'_>) { self.is_shutdown .lock() .map(|is_shutdown| { @@ -566,7 +566,7 @@ mod tests { } impl LogProcessor for LazyLogProcessor { - fn emit(&self, _data: &mut LogData) { + fn emit(&self, _data: &mut LogData<'_>) { // nothing to do. } diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 7366f19791..ad90086357 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -55,7 +55,7 @@ pub trait LogProcessor: Send + Sync + Debug { /// /// # Parameters /// - `data`: A mutable reference to `LogData` representing the log record. - fn emit(&self, data: &mut LogData); + fn emit(&self, data: &mut LogData<'_>); /// Force the logs lying in the cache to be exported. fn force_flush(&self) -> LogResult<()>; /// Shuts down the processor. @@ -93,7 +93,7 @@ impl SimpleLogProcessor { } impl LogProcessor for SimpleLogProcessor { - fn emit(&self, data: &mut LogData) { + fn emit(&self, data: &mut LogData<'_>) { // noop after shutdown if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) { return; @@ -150,10 +150,14 @@ impl Debug for BatchLogProcessor { } impl LogProcessor for BatchLogProcessor { - fn emit(&self, data: &mut LogData) { + fn emit(&self, data: &mut LogData<'_>) { + let owned_data = LogData { + record: Cow::Owned(data.record.clone().into_owned()), + instrumentation: Cow::Owned(data.instrumentation.clone().into_owned()), + }; let result = self .message_sender - .try_send(BatchMessage::ExportLog(data.clone())); + .try_send(BatchMessage::ExportLog(owned_data)); if let Err(err) = result { global::handle_error(LogError::Other(err.into())); @@ -300,7 +304,7 @@ async fn export_with_timeout<'a, R, E>( time_out: Duration, exporter: &mut E, runtime: &R, - batch: Vec>, + batch: Vec>>, ) -> ExportResult where R: RuntimeChannel, @@ -490,7 +494,7 @@ where #[derive(Debug)] enum BatchMessage { /// Export logs, usually called when the log is emitted. - ExportLog(LogData), + ExportLog(LogData<'static>), /// Flush the current buffer to the backend, it can be triggered by /// pre configured interval or a call to `force_push` function. Flush(Option>), @@ -536,7 +540,7 @@ mod tests { #[async_trait] impl LogExporter for MockLogExporter { - async fn export<'a>(&mut self, _batch: Vec>) -> LogResult<()> { + async fn export<'a>(&mut self, _batch: Vec>>) -> LogResult<()> { Ok(()) } @@ -805,20 +809,26 @@ mod tests { #[derive(Debug)] struct FirstProcessor { - pub(crate) logs: Arc>>, + pub(crate) logs: Arc>>>, } impl LogProcessor for FirstProcessor { - fn emit(&self, data: &mut LogData) { + fn emit(&self, data: &mut LogData<'_>) { // add attribute - data.record.attributes.get_or_insert(vec![]).push(( + let record = data.record.to_mut(); + record.attributes.get_or_insert(vec![]).push(( Key::from_static_str("processed_by"), AnyValue::String("FirstProcessor".into()), )); // update body - data.record.body = Some("Updated by FirstProcessor".into()); - - self.logs.lock().unwrap().push(data.clone()); //clone as the LogProcessor is storing the data. + record.body = Some("Updated by FirstProcessor".into()); + // Convert the modified LogData to an owned version + let owned_data = LogData { + record: Cow::Owned(record.clone()), // Since record is already owned, no need to clone deeply + instrumentation: Cow::Owned(data.instrumentation.clone().into_owned()), + }; + + self.logs.lock().unwrap().push(owned_data); //clone as the LogProcessor is storing the data. } fn force_flush(&self) -> LogResult<()> { @@ -832,11 +842,11 @@ mod tests { #[derive(Debug)] struct SecondProcessor { - pub(crate) logs: Arc>>, + pub(crate) logs: Arc>>>, } impl LogProcessor for SecondProcessor { - fn emit(&self, data: &mut LogData) { + fn emit(&self, data: &mut LogData<'_>) { assert!(data.record.attributes.as_ref().map_or(false, |attrs| { attrs.iter().any(|(key, value)| { key.as_str() == "processed_by" @@ -847,7 +857,12 @@ mod tests { data.record.body.clone().unwrap() == AnyValue::String("Updated by FirstProcessor".into()) ); - self.logs.lock().unwrap().push(data.clone()); + let record = data.record.to_mut(); + let owned_data = LogData { + record: Cow::Owned(record.clone()), // Convert the record to owned + instrumentation: Cow::Owned(data.instrumentation.clone().into_owned()), + }; + self.logs.lock().unwrap().push(owned_data); } fn force_flush(&self) -> LogResult<()> { diff --git a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs index 8068fafaec..75fec68e98 100644 --- a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs @@ -37,13 +37,23 @@ use std::sync::{Arc, Mutex}; ///# } /// ``` /// +/// #[derive(Clone, Debug)] pub struct InMemoryLogsExporter { - logs: Arc>>, + logs: Arc>>, resource: Arc>, should_reset_on_shutdown: bool, } +/// `OwnedLogData` represents a single log event without resource context. +#[derive(Debug, Clone)] +pub struct OwnedLogData { + /// Log record, which can be borrowed or owned. + pub record: LogRecord, + /// Instrumentation details for the emitter who produced this `LogEvent`. + pub instrumentation: InstrumentationLibrary, +} + impl Default for InMemoryLogsExporter { fn default() -> Self { InMemoryLogsExporterBuilder::new().build() @@ -175,10 +185,14 @@ impl InMemoryLogsExporter { #[async_trait] impl LogExporter for InMemoryLogsExporter { - async fn export<'a>(&mut self, batch: Vec>) -> LogResult<()> { + async fn export<'a>(&mut self, batch: Vec>>) -> LogResult<()> { let mut logs_guard = self.logs.lock().map_err(LogError::from)?; for log in batch.into_iter() { - logs_guard.push(log.into_owned()); + let owned_log = OwnedLogData { + record: log.record.clone().into_owned(), + instrumentation: log.instrumentation.clone().into_owned(), + }; + logs_guard.push(owned_log); } Ok(()) } diff --git a/opentelemetry-stdout/src/logs/exporter.rs b/opentelemetry-stdout/src/logs/exporter.rs index dacefa3d8b..6befdf0fa7 100644 --- a/opentelemetry-stdout/src/logs/exporter.rs +++ b/opentelemetry-stdout/src/logs/exporter.rs @@ -45,7 +45,7 @@ impl fmt::Debug for LogExporter { #[async_trait] impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { /// Export spans to stdout - async fn export<'a>(&mut self, batch: Vec>) -> ExportResult { + async fn export<'a>(&mut self, batch: Vec>>) -> ExportResult { if let Some(writer) = &mut self.writer { // TODO - Avoid cloning logdata if it is borrowed. let log_data = crate::logs::transform::LogData::from(( diff --git a/opentelemetry-stdout/src/logs/transform.rs b/opentelemetry-stdout/src/logs/transform.rs index 0560e0c064..2a72cfc9df 100644 --- a/opentelemetry-stdout/src/logs/transform.rs +++ b/opentelemetry-stdout/src/logs/transform.rs @@ -16,7 +16,7 @@ pub struct LogData { impl From<( - Vec, + Vec>, &opentelemetry_sdk::Resource, )> for LogData { @@ -31,7 +31,7 @@ impl for sdk_log in sdk_logs { let resource_schema_url = sdk_resource.schema_url().map(|s| s.to_string().into()); let schema_url = sdk_log.instrumentation.schema_url.clone(); - let scope: Scope = sdk_log.instrumentation.clone().into(); + let scope: Scope = sdk_log.instrumentation.clone().into_owned().into(); let resource: Resource = sdk_resource.into(); let rl = resource_logs @@ -104,7 +104,7 @@ struct LogRecord { trace_id: Option, } -impl From for LogRecord { +impl From> for LogRecord { fn from(value: opentelemetry_sdk::export::logs::LogData) -> Self { LogRecord { attributes: { @@ -142,6 +142,7 @@ impl From for LogRecord { flags: value .record .trace_context + .as_ref() .map(|c| c.trace_flags.map(|f| f.to_u8())) .unwrap_or_default(), time_unix_nano: value.record.timestamp, @@ -155,7 +156,7 @@ impl From for LogRecord { .unwrap_or_default(), dropped_attributes_count: 0, severity_text: value.record.severity_text, - body: value.record.body.map(|a| a.into()), + body: value.record.body.clone().map(|a| a.into()), } } } From 29ab4b988345c59a653df67d43d5a4dec1ce95a7 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Tue, 20 Aug 2024 00:13:17 -0700 Subject: [PATCH 2/7] directly use logrecord and instrumentationlib in exporter interface --- .../benches/logs.rs | 8 +- opentelemetry-otlp/src/exporter/http/logs.rs | 17 ++- opentelemetry-otlp/src/exporter/http/mod.rs | 10 +- opentelemetry-otlp/src/exporter/tonic/logs.rs | 18 ++-- opentelemetry-otlp/src/logs.rs | 6 +- opentelemetry-proto/src/transform/common.rs | 35 ------ opentelemetry-proto/src/transform/logs.rs | 101 +++++++++--------- opentelemetry-sdk/benches/log_exporter.rs | 16 +-- opentelemetry-sdk/src/export/logs/mod.rs | 5 +- opentelemetry-sdk/src/logs/log_processor.rs | 19 +++- .../src/testing/logs/in_memory_exporter.rs | 13 ++- opentelemetry-stdout/src/logs/exporter.rs | 16 ++- opentelemetry-stdout/src/logs/transform.rs | 57 +++++----- 13 files changed, 154 insertions(+), 167 deletions(-) diff --git a/opentelemetry-appender-tracing/benches/logs.rs b/opentelemetry-appender-tracing/benches/logs.rs index ba229419d4..763b02a0cb 100644 --- a/opentelemetry-appender-tracing/benches/logs.rs +++ b/opentelemetry-appender-tracing/benches/logs.rs @@ -34,7 +34,13 @@ struct NoopExporter { #[async_trait] impl LogExporter for NoopExporter { - async fn export<'a>(&mut self, _: Vec>) -> LogResult<()> { + async fn export<'a>( + &mut self, + _: Vec<( + &'a opentelemetry_sdk::logs::LogRecord, + &'a opentelemetry::InstrumentationLibrary, + )>, + ) -> LogResult<()> { LogResult::Ok(()) } diff --git a/opentelemetry-otlp/src/exporter/http/logs.rs b/opentelemetry-otlp/src/exporter/http/logs.rs index 396dec680d..09e35c6804 100644 --- a/opentelemetry-otlp/src/exporter/http/logs.rs +++ b/opentelemetry-otlp/src/exporter/http/logs.rs @@ -3,13 +3,18 @@ use std::sync::Arc; use async_trait::async_trait; use http::{header::CONTENT_TYPE, Method}; use opentelemetry::logs::{LogError, LogResult}; -use opentelemetry_sdk::export::logs::{LogData, LogExporter}; +use opentelemetry::InstrumentationLibrary; +use opentelemetry_sdk::export::logs::LogExporter; +use opentelemetry_sdk::logs::LogRecord; use super::OtlpHttpClient; #[async_trait] impl LogExporter for OtlpHttpClient { - async fn export<'a>(&mut self, batch: Vec>) -> LogResult<()> { + async fn export<'a>( + &mut self, + batch: Vec<(&'a LogRecord, &'a InstrumentationLibrary)>, + ) -> LogResult<()> { let client = self .client .lock() @@ -19,13 +24,7 @@ impl LogExporter for OtlpHttpClient { _ => Err(LogError::Other("exporter is already shut down".into())), })?; - //TODO: avoid cloning here. - let owned_batch = batch - .into_iter() - .map(|cow_log_data| cow_log_data.into_owned()) // Converts Cow to owned LogData - .collect::>(); - - let (body, content_type) = { self.build_logs_export_body(owned_batch)? }; + let (body, content_type) = { self.build_logs_export_body(batch)? }; let mut request = http::Request::builder() .method(Method::POST) .uri(&self.collector_endpoint) diff --git a/opentelemetry-otlp/src/exporter/http/mod.rs b/opentelemetry-otlp/src/exporter/http/mod.rs index 2fa3ff851b..aa50019e5c 100644 --- a/opentelemetry-otlp/src/exporter/http/mod.rs +++ b/opentelemetry-otlp/src/exporter/http/mod.rs @@ -7,16 +7,18 @@ use crate::{ OTEL_EXPORTER_OTLP_TIMEOUT, }; use http::{HeaderName, HeaderValue, Uri}; +#[cfg(feature = "logs")] +use opentelemetry::InstrumentationLibrary; use opentelemetry_http::HttpClient; use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema; #[cfg(feature = "logs")] use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope; #[cfg(feature = "trace")] use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope; -#[cfg(feature = "logs")] -use opentelemetry_sdk::export::logs::LogData; #[cfg(feature = "trace")] use opentelemetry_sdk::export::trace::SpanData; +#[cfg(feature = "logs")] +use opentelemetry_sdk::logs::LogRecord; #[cfg(feature = "metrics")] use opentelemetry_sdk::metrics::data::ResourceMetrics; use prost::Message; @@ -326,9 +328,9 @@ impl OtlpHttpClient { } #[cfg(feature = "logs")] - fn build_logs_export_body( + fn build_logs_export_body<'a>( &self, - logs: Vec, + logs: Vec<(&'a LogRecord, &'a InstrumentationLibrary)>, ) -> opentelemetry::logs::LogResult<(Vec, &'static str)> { use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; let resource_logs = group_logs_by_resource_and_scope(logs, &self.resource); diff --git a/opentelemetry-otlp/src/exporter/tonic/logs.rs b/opentelemetry-otlp/src/exporter/tonic/logs.rs index b529eda511..f14a4c3d11 100644 --- a/opentelemetry-otlp/src/exporter/tonic/logs.rs +++ b/opentelemetry-otlp/src/exporter/tonic/logs.rs @@ -4,13 +4,16 @@ use opentelemetry::logs::{LogError, LogResult}; use opentelemetry_proto::tonic::collector::logs::v1::{ logs_service_client::LogsServiceClient, ExportLogsServiceRequest, }; -use opentelemetry_sdk::export::logs::{LogData, LogExporter}; +use opentelemetry_sdk::export::logs::LogExporter; use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request}; use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope; use super::BoxInterceptor; +use opentelemetry::InstrumentationLibrary; +use opentelemetry_sdk::logs::LogRecord; + pub(crate) struct TonicLogsClient { inner: Option, #[allow(dead_code)] @@ -54,7 +57,10 @@ impl TonicLogsClient { #[async_trait] impl LogExporter for TonicLogsClient { - async fn export<'a>(&mut self, batch: Vec>) -> LogResult<()> { + async fn export<'a>( + &mut self, + batch: Vec<(&'a LogRecord, &'a InstrumentationLibrary)>, + ) -> LogResult<()> { let (mut client, metadata, extensions) = match &mut self.inner { Some(inner) => { let (m, e, _) = inner @@ -67,13 +73,7 @@ impl LogExporter for TonicLogsClient { None => return Err(LogError::Other("exporter is already shut down".into())), }; - //TODO: avoid cloning here. - let owned_batch = batch - .into_iter() - .map(|cow_log_data| cow_log_data.into_owned()) // Converts Cow to owned LogData - .collect::>(); - - let resource_logs = group_logs_by_resource_and_scope(owned_batch, &self.resource); + let resource_logs = group_logs_by_resource_and_scope(batch, &self.resource); client .export(Request::from_parts( diff --git a/opentelemetry-otlp/src/logs.rs b/opentelemetry-otlp/src/logs.rs index 6c936403c1..27d6756252 100644 --- a/opentelemetry-otlp/src/logs.rs +++ b/opentelemetry-otlp/src/logs.rs @@ -14,7 +14,9 @@ use std::fmt::Debug; use opentelemetry::logs::LogError; -use opentelemetry_sdk::{export::logs::LogData, runtime::RuntimeChannel, Resource}; +use opentelemetry::InstrumentationLibrary; +use opentelemetry_sdk::logs::LogRecord; +use opentelemetry_sdk::{runtime::RuntimeChannel, Resource}; /// Compression algorithm to use, defaults to none. pub const OTEL_EXPORTER_OTLP_LOGS_COMPRESSION: &str = "OTEL_EXPORTER_OTLP_LOGS_COMPRESSION"; @@ -100,7 +102,7 @@ impl LogExporter { impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { async fn export<'a>( &mut self, - batch: Vec>>, + batch: Vec<(&'a LogRecord, &'a InstrumentationLibrary)>, ) -> opentelemetry::logs::LogResult<()> { self.client.export(batch).await } diff --git a/opentelemetry-proto/src/transform/common.rs b/opentelemetry-proto/src/transform/common.rs index 57abff6a11..ff42479288 100644 --- a/opentelemetry-proto/src/transform/common.rs +++ b/opentelemetry-proto/src/transform/common.rs @@ -108,41 +108,6 @@ pub mod tonic { } } - impl - From<( - Cow<'_, opentelemetry_sdk::InstrumentationLibrary>, - Option>, - )> for InstrumentationScope - { - fn from( - data: ( - Cow<'_, opentelemetry_sdk::InstrumentationLibrary>, - Option>, - ), - ) -> Self { - let (library, target) = data; - if let Some(t) = target { - InstrumentationScope { - name: t.to_string(), - version: String::new(), - attributes: vec![], - ..Default::default() - } - } else { - InstrumentationScope { - name: library.name.clone().into_owned(), - version: library - .version - .as_ref() - .map(ToString::to_string) - .unwrap_or_default(), - attributes: Attributes::from(library.attributes.clone()).0, - ..Default::default() - } - } - } - } - /// Wrapper type for Vec<`KeyValue`> #[derive(Default, Debug)] pub struct Attributes(pub ::std::vec::Vec); diff --git a/opentelemetry-proto/src/transform/logs.rs b/opentelemetry-proto/src/transform/logs.rs index d959194364..808dd1463b 100644 --- a/opentelemetry-proto/src/transform/logs.rs +++ b/opentelemetry-proto/src/transform/logs.rs @@ -54,8 +54,8 @@ pub mod tonic { } } - impl From for LogRecord { - fn from(log_record: opentelemetry_sdk::logs::LogRecord) -> Self { + impl From<&opentelemetry_sdk::logs::LogRecord> for LogRecord { + fn from(log_record: &opentelemetry_sdk::logs::LogRecord) -> Self { let trace_context = log_record.trace_context.as_ref(); let severity_number = match log_record.severity_number { Some(Severity::Trace) => SeverityNumber::Trace, @@ -118,7 +118,7 @@ pub mod tonic { }, severity_number: severity_number.into(), severity_text: log_record.severity_text.map(Into::into).unwrap_or_default(), - body: log_record.body.map(Into::into), + body: log_record.body.clone().map(Into::into), dropped_attributes_count: 0, flags: trace_context .map(|ctx| { @@ -139,17 +139,23 @@ pub mod tonic { impl From<( - opentelemetry_sdk::export::logs::LogData<'_>, + ( + &opentelemetry_sdk::logs::LogRecord, + &opentelemetry::InstrumentationLibrary, + ), &ResourceAttributesWithSchema, )> for ResourceLogs { fn from( data: ( - opentelemetry_sdk::export::logs::LogData, + ( + &opentelemetry_sdk::logs::LogRecord, + &opentelemetry::InstrumentationLibrary, + ), &ResourceAttributesWithSchema, ), ) -> Self { - let (log_data, resource) = data; + let ((log_record, instrumentation), resource) = data; ResourceLogs { resource: Some(Resource { @@ -158,27 +164,23 @@ pub mod tonic { }), schema_url: resource.schema_url.clone().unwrap_or_default(), scope_logs: vec![ScopeLogs { - schema_url: log_data - .instrumentation + schema_url: instrumentation .schema_url .clone() .map(Into::into) .unwrap_or_default(), - scope: Some( - ( - log_data.instrumentation.into_owned(), - log_data.record.target.clone(), - ) - .into(), - ), - log_records: vec![log_data.record.into_owned().into()], + scope: Some((instrumentation, log_record.target.clone()).into()), + log_records: vec![log_record.into()], }], } } } - pub fn group_logs_by_resource_and_scope( - logs: Vec>, + pub fn group_logs_by_resource_and_scope<'a>( + logs: Vec<( + &'a opentelemetry_sdk::logs::LogRecord, + &'a opentelemetry::InstrumentationLibrary, + )>, resource: &ResourceAttributesWithSchema, ) -> Vec { // Group logs by target or instrumentation name @@ -186,14 +188,20 @@ pub mod tonic { HashMap::new(), |mut scope_map: HashMap< Cow<'static, str>, - Vec<&opentelemetry_sdk::export::logs::LogData<'_>>, + Vec<( + &'a opentelemetry_sdk::logs::LogRecord, + &'a opentelemetry::InstrumentationLibrary, + )>, >, - log| { - let key = - log.record.target.clone().unwrap_or_else(|| { - Cow::Owned(log.instrumentation.name.clone().into_owned()) - }); - scope_map.entry(key).or_default().push(log); + (log_record, instrumentation)| { + let key = log_record + .target + .clone() + .unwrap_or_else(|| Cow::Owned(instrumentation.name.clone().into_owned())); + scope_map + .entry(key) + .or_default() + .push((log_record, instrumentation)); scope_map }, ); @@ -202,20 +210,13 @@ pub mod tonic { .into_iter() .map(|(key, log_data)| ScopeLogs { scope: Some(InstrumentationScope::from(( - Cow::Owned( - log_data - .first() - .unwrap() - .instrumentation - .clone() - .into_owned(), - ), + log_data.first().unwrap().1, Some(key.into_owned().into()), ))), schema_url: resource.schema_url.clone().unwrap_or_default(), log_records: log_data .into_iter() - .map(|log_data| log_data.record.clone().into_owned().into()) + .map(|(log_record, _)| log_record.into()) .collect(), }) .collect(); @@ -235,33 +236,29 @@ pub mod tonic { mod tests { use crate::transform::common::tonic::ResourceAttributesWithSchema; use opentelemetry::logs::LogRecord as _; - use opentelemetry_sdk::export::logs::LogData; + use opentelemetry::InstrumentationLibrary; use opentelemetry_sdk::{logs::LogRecord, Resource}; - use std::borrow::Cow; use std::time::SystemTime; - fn create_test_log_data<'a>(instrumentation_name: &str, _message: &str) -> LogData<'a> { + fn create_test_log_data( + instrumentation_name: &str, + _message: &str, + ) -> (LogRecord, InstrumentationLibrary) { let mut logrecord = LogRecord::default(); logrecord.set_timestamp(SystemTime::now()); logrecord.set_observed_timestamp(SystemTime::now()); - LogData { - instrumentation: Cow::Owned( - opentelemetry_sdk::InstrumentationLibrary::builder( - instrumentation_name.to_string(), - ) - .build(), - ), - record: Cow::Owned(logrecord), - } + let instrumentation = + InstrumentationLibrary::builder(instrumentation_name.to_string()).build(); + (logrecord, instrumentation) } #[test] fn test_group_logs_by_resource_and_scope_single_scope() { let resource = Resource::default(); - let log1 = create_test_log_data("test-lib", "Log 1"); - let log2 = create_test_log_data("test-lib", "Log 2"); + let log_data1 = create_test_log_data("test-lib", "Log 1"); + let log_data2 = create_test_log_data("test-lib", "Log 2"); - let logs = vec![log1, log2]; + let logs = vec![(&log_data1.0, &log_data1.1), (&log_data2.0, &log_data2.1)]; let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema let grouped_logs = @@ -278,10 +275,10 @@ mod tests { #[test] fn test_group_logs_by_resource_and_scope_multiple_scopes() { let resource = Resource::default(); - let log1 = create_test_log_data("lib1", "Log 1"); - let log2 = create_test_log_data("lib2", "Log 2"); + let log_data1 = create_test_log_data("lib1", "Log 1"); + let log_data2 = create_test_log_data("lib2", "Log 2"); - let logs = vec![log1, log2]; + let logs = vec![(&log_data1.0, &log_data1.1), (&log_data2.0, &log_data2.1)]; let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema let grouped_logs = crate::transform::logs::tonic::group_logs_by_resource_and_scope(logs, &resource); diff --git a/opentelemetry-sdk/benches/log_exporter.rs b/opentelemetry-sdk/benches/log_exporter.rs index a382f4985a..434fc11ae6 100644 --- a/opentelemetry-sdk/benches/log_exporter.rs +++ b/opentelemetry-sdk/benches/log_exporter.rs @@ -18,8 +18,10 @@ use criterion::{criterion_group, criterion_main, Criterion}; use opentelemetry::logs::{LogRecord as _, LogResult, Logger as _, LoggerProvider as _, Severity}; +use opentelemetry::InstrumentationLibrary; use opentelemetry_sdk::export::logs::LogData; use opentelemetry_sdk::logs::LogProcessor; +use opentelemetry_sdk::logs::LogRecord; use opentelemetry_sdk::logs::LoggerProvider; use pprof::criterion::{Output, PProfProfiler}; use std::fmt::Debug; @@ -28,11 +30,11 @@ use std::fmt::Debug; // cargo bench --bench log_exporter #[async_trait] pub trait LogExporterWithFuture: Send + Sync + Debug { - async fn export<'a>(&mut self, batch: Vec>); + async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>); } pub trait LogExporterWithoutFuture: Send + Sync + Debug { - fn export<'a>(&mut self, batch: Vec>); + fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>); } #[derive(Debug)] @@ -40,13 +42,13 @@ struct NoOpExporterWithFuture {} #[async_trait] impl LogExporterWithFuture for NoOpExporterWithFuture { - async fn export<'a>(&mut self, _batch: Vec>) {} + async fn export(&mut self, _batch: Vec<(&LogRecord, &InstrumentationLibrary)>) {} } #[derive(Debug)] struct NoOpExporterWithoutFuture {} impl LogExporterWithoutFuture for NoOpExporterWithoutFuture { - fn export(&mut self, _batch: Vec) {} + fn export(&mut self, _batch: Vec<(&LogRecord, &InstrumentationLibrary)>) {} } #[derive(Debug)] @@ -65,7 +67,9 @@ impl ExportingProcessorWithFuture { impl LogProcessor for ExportingProcessorWithFuture { fn emit(&self, data: &mut LogData) { let mut exporter = self.exporter.lock().expect("lock error"); - futures_executor::block_on(exporter.export(vec![data.clone()])); + futures_executor::block_on( + exporter.export(vec![(data.record.as_ref(), data.instrumentation.as_ref())]), + ); } fn force_flush(&self) -> LogResult<()> { @@ -95,7 +99,7 @@ impl LogProcessor for ExportingProcessorWithoutFuture { self.exporter .lock() .expect("lock error") - .export(vec![data.clone()]); + .export(vec![(data.record.as_ref(), data.instrumentation.as_ref())]); } fn force_flush(&self) -> LogResult<()> { diff --git a/opentelemetry-sdk/src/export/logs/mod.rs b/opentelemetry-sdk/src/export/logs/mod.rs index 932f38dbf1..ed8ba56e6a 100644 --- a/opentelemetry-sdk/src/export/logs/mod.rs +++ b/opentelemetry-sdk/src/export/logs/mod.rs @@ -15,7 +15,10 @@ use std::fmt::Debug; #[async_trait] pub trait LogExporter: Send + Sync + Debug { /// Exports a batch of [`LogData`]. - async fn export<'a>(&mut self, batch: Vec>>) -> LogResult<()>; + async fn export<'a>( + &mut self, + records: Vec<(&'a LogRecord, &'a InstrumentationLibrary)>, + ) -> LogResult<()>; /// Shuts down the exporter. fn shutdown(&mut self) {} #[cfg(feature = "logs_level_enabled")] diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index ad90086357..b4df3aa631 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -104,7 +104,10 @@ impl LogProcessor for SimpleLogProcessor { .lock() .map_err(|_| LogError::Other("simple logprocessor mutex poison".into())) .and_then(|mut exporter| { - futures_executor::block_on(exporter.export(vec![Cow::Borrowed(data)])) + // Extract references to LogRecord and InstrumentationLibrary + let log_record = data.record.as_ref(); + let instrumentation = data.instrumentation.as_ref(); + futures_executor::block_on(exporter.export(vec![(log_record, instrumentation)])) }); if let Err(err) = result { global::handle_error(err); @@ -313,8 +316,13 @@ where if batch.is_empty() { return Ok(()); } + // Convert the Vec<&LogData> to Vec<(&LogRecord, &InstrumentationLibrary)> + let export_batch = batch + .iter() + .map(|log_data| (log_data.record.as_ref(), log_data.instrumentation.as_ref())) + .collect(); - let export = exporter.export(batch); + let export = exporter.export(export_batch); let timeout = runtime.delay(time_out); pin_mut!(export); pin_mut!(timeout); @@ -510,6 +518,7 @@ mod tests { BatchLogProcessor, OTEL_BLRP_EXPORT_TIMEOUT, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, OTEL_BLRP_MAX_QUEUE_SIZE, OTEL_BLRP_SCHEDULE_DELAY, }; + use crate::logs::LogRecord; use crate::testing::logs::InMemoryLogsExporterBuilder; use crate::{ export::logs::{LogData, LogExporter}, @@ -527,6 +536,7 @@ mod tests { use async_trait::async_trait; use opentelemetry::logs::AnyValue; use opentelemetry::logs::{Logger, LoggerProvider as _}; + use opentelemetry::InstrumentationLibrary; use opentelemetry::Key; use opentelemetry::{logs::LogResult, KeyValue}; use std::borrow::Cow; @@ -540,7 +550,10 @@ mod tests { #[async_trait] impl LogExporter for MockLogExporter { - async fn export<'a>(&mut self, _batch: Vec>>) -> LogResult<()> { + async fn export<'a>( + &mut self, + _batch: Vec<(&'a LogRecord, &'a InstrumentationLibrary)>, + ) -> LogResult<()> { Ok(()) } diff --git a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs index 75fec68e98..393b15ddce 100644 --- a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs @@ -1,4 +1,4 @@ -use crate::export::logs::{LogData, LogExporter}; +use crate::export::logs::LogExporter; use crate::logs::LogRecord; use crate::Resource; use async_trait::async_trait; @@ -185,12 +185,15 @@ impl InMemoryLogsExporter { #[async_trait] impl LogExporter for InMemoryLogsExporter { - async fn export<'a>(&mut self, batch: Vec>>) -> LogResult<()> { + async fn export<'a>( + &mut self, + batch: Vec<(&'a LogRecord, &'a InstrumentationLibrary)>, + ) -> LogResult<()> { let mut logs_guard = self.logs.lock().map_err(LogError::from)?; - for log in batch.into_iter() { + for (log_record, instrumentation) in batch.into_iter() { let owned_log = OwnedLogData { - record: log.record.clone().into_owned(), - instrumentation: log.instrumentation.clone().into_owned(), + record: log_record.clone(), + instrumentation: instrumentation.clone(), }; logs_guard.push(owned_log); } diff --git a/opentelemetry-stdout/src/logs/exporter.rs b/opentelemetry-stdout/src/logs/exporter.rs index 6befdf0fa7..6436e7529a 100644 --- a/opentelemetry-stdout/src/logs/exporter.rs +++ b/opentelemetry-stdout/src/logs/exporter.rs @@ -1,12 +1,12 @@ use async_trait::async_trait; use core::fmt; +use opentelemetry::InstrumentationLibrary; use opentelemetry::{ logs::{LogError, LogResult}, ExportError, }; -use opentelemetry_sdk::export::logs::{ExportResult, LogData}; +use opentelemetry_sdk::logs::LogRecord; use opentelemetry_sdk::Resource; -use std::borrow::Cow; use std::io::{stdout, Write}; type Encoder = @@ -45,14 +45,12 @@ impl fmt::Debug for LogExporter { #[async_trait] impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { /// Export spans to stdout - async fn export<'a>(&mut self, batch: Vec>>) -> ExportResult { + async fn export<'a>( + &mut self, + batch: Vec<(&'a LogRecord, &'a InstrumentationLibrary)>, + ) -> LogResult<()> { if let Some(writer) = &mut self.writer { - // TODO - Avoid cloning logdata if it is borrowed. - let log_data = crate::logs::transform::LogData::from(( - batch.into_iter().map(Cow::into_owned).collect(), - &self.resource, - )); - let result = (self.encoder)(writer, log_data) as LogResult<()>; + let result = (self.encoder)(writer, (batch, &self.resource).into()) as LogResult<()>; result.and_then(|_| writer.write_all(b"\n").map_err(|e| Error(e).into())) } else { Err("exporter is shut down".into()) diff --git a/opentelemetry-stdout/src/logs/transform.rs b/opentelemetry-stdout/src/logs/transform.rs index 2a72cfc9df..84e864f469 100644 --- a/opentelemetry-stdout/src/logs/transform.rs +++ b/opentelemetry-stdout/src/logs/transform.rs @@ -16,13 +16,19 @@ pub struct LogData { impl From<( - Vec>, + Vec<( + &opentelemetry_sdk::logs::LogRecord, + &opentelemetry::InstrumentationLibrary, + )>, &opentelemetry_sdk::Resource, )> for LogData { fn from( (sdk_logs, sdk_resource): ( - Vec, + Vec<( + &opentelemetry_sdk::logs::LogRecord, + &opentelemetry::InstrumentationLibrary, + )>, &opentelemetry_sdk::Resource, ), ) -> Self { @@ -30,8 +36,8 @@ impl for sdk_log in sdk_logs { let resource_schema_url = sdk_resource.schema_url().map(|s| s.to_string().into()); - let schema_url = sdk_log.instrumentation.schema_url.clone(); - let scope: Scope = sdk_log.instrumentation.clone().into_owned().into(); + let schema_url = sdk_log.1.schema_url.clone(); + let scope: Scope = sdk_log.1.clone().into(); let resource: Resource = sdk_resource.into(); let rl = resource_logs @@ -43,10 +49,10 @@ impl }); match rl.scope_logs.iter_mut().find(|sl| sl.scope == scope) { - Some(sl) => sl.log_records.push(sdk_log.into()), + Some(sl) => sl.log_records.push(sdk_log.0.into()), None => rl.scope_logs.push(ScopeLogs { scope, - log_records: vec![sdk_log.into()], + log_records: vec![sdk_log.0.into()], schema_url, }), } @@ -104,18 +110,17 @@ struct LogRecord { trace_id: Option, } -impl From> for LogRecord { - fn from(value: opentelemetry_sdk::export::logs::LogData) -> Self { +impl From<&opentelemetry_sdk::logs::LogRecord> for LogRecord { + fn from(record: &opentelemetry_sdk::logs::LogRecord) -> Self { LogRecord { attributes: { - let attributes = value - .record + let attributes = record .attributes_iter() .map(|(k, v)| KeyValue::from((k.clone(), v.clone()))) // Map each pair to a KeyValue .collect::>(); // Collect into a Vecs #[cfg(feature = "populate-logs-event-name")] - if let Some(event_name) = value.record.event_name { + if let Some(event_name) = record.event_name { let mut attributes_with_name = attributes; attributes_with_name.push(KeyValue::from(( "name".into(), @@ -129,34 +134,24 @@ impl From> for LogRecord { #[cfg(not(feature = "populate-logs-event-name"))] attributes }, - trace_id: value - .record + trace_id: record .trace_context .as_ref() .map(|c| c.trace_id.to_string()), - span_id: value - .record - .trace_context - .as_ref() - .map(|c| c.span_id.to_string()), - flags: value - .record + span_id: record.trace_context.as_ref().map(|c| c.span_id.to_string()), + flags: record .trace_context .as_ref() .map(|c| c.trace_flags.map(|f| f.to_u8())) .unwrap_or_default(), - time_unix_nano: value.record.timestamp, - time: value.record.timestamp, - observed_time_unix_nano: value.record.observed_timestamp.unwrap(), - observed_time: value.record.observed_timestamp.unwrap(), - severity_number: value - .record - .severity_number - .map(|u| u as u32) - .unwrap_or_default(), + time_unix_nano: record.timestamp, + time: record.timestamp, + observed_time_unix_nano: record.observed_timestamp.unwrap(), + observed_time: record.observed_timestamp.unwrap(), + severity_number: record.severity_number.map(|u| u as u32).unwrap_or_default(), dropped_attributes_count: 0, - severity_text: value.record.severity_text, - body: value.record.body.clone().map(|a| a.into()), + severity_text: record.severity_text, + body: record.body.clone().map(|a| a.into()), } } } From 37b2a267f29fa1c664ca6f377ad0a44353cab89e Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Tue, 20 Aug 2024 00:27:35 -0700 Subject: [PATCH 3/7] remove explicit lifetime specifier for LogRecord and InstrumentationLib --- opentelemetry-otlp/src/exporter/http/logs.rs | 5 +---- opentelemetry-otlp/src/exporter/tonic/logs.rs | 5 +---- opentelemetry-otlp/src/logs.rs | 4 ++-- opentelemetry-proto/src/transform/logs.rs | 10 +++++----- opentelemetry-sdk/src/export/logs/mod.rs | 4 ++-- opentelemetry-sdk/src/logs/log_processor.rs | 4 ++-- .../src/testing/logs/in_memory_exporter.rs | 5 +---- opentelemetry-stdout/src/logs/exporter.rs | 5 +---- 8 files changed, 15 insertions(+), 27 deletions(-) diff --git a/opentelemetry-otlp/src/exporter/http/logs.rs b/opentelemetry-otlp/src/exporter/http/logs.rs index 09e35c6804..83f25c1f9f 100644 --- a/opentelemetry-otlp/src/exporter/http/logs.rs +++ b/opentelemetry-otlp/src/exporter/http/logs.rs @@ -11,10 +11,7 @@ use super::OtlpHttpClient; #[async_trait] impl LogExporter for OtlpHttpClient { - async fn export<'a>( - &mut self, - batch: Vec<(&'a LogRecord, &'a InstrumentationLibrary)>, - ) -> LogResult<()> { + async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()> { let client = self .client .lock() diff --git a/opentelemetry-otlp/src/exporter/tonic/logs.rs b/opentelemetry-otlp/src/exporter/tonic/logs.rs index f14a4c3d11..5a8d04b97e 100644 --- a/opentelemetry-otlp/src/exporter/tonic/logs.rs +++ b/opentelemetry-otlp/src/exporter/tonic/logs.rs @@ -57,10 +57,7 @@ impl TonicLogsClient { #[async_trait] impl LogExporter for TonicLogsClient { - async fn export<'a>( - &mut self, - batch: Vec<(&'a LogRecord, &'a InstrumentationLibrary)>, - ) -> LogResult<()> { + async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()> { let (mut client, metadata, extensions) = match &mut self.inner { Some(inner) => { let (m, e, _) = inner diff --git a/opentelemetry-otlp/src/logs.rs b/opentelemetry-otlp/src/logs.rs index 27d6756252..9a67bf66d9 100644 --- a/opentelemetry-otlp/src/logs.rs +++ b/opentelemetry-otlp/src/logs.rs @@ -100,9 +100,9 @@ impl LogExporter { #[async_trait] impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { - async fn export<'a>( + async fn export( &mut self, - batch: Vec<(&'a LogRecord, &'a InstrumentationLibrary)>, + batch: Vec<(&LogRecord, &InstrumentationLibrary)>, ) -> opentelemetry::logs::LogResult<()> { self.client.export(batch).await } diff --git a/opentelemetry-proto/src/transform/logs.rs b/opentelemetry-proto/src/transform/logs.rs index 808dd1463b..9ab688bac2 100644 --- a/opentelemetry-proto/src/transform/logs.rs +++ b/opentelemetry-proto/src/transform/logs.rs @@ -176,10 +176,10 @@ pub mod tonic { } } - pub fn group_logs_by_resource_and_scope<'a>( + pub fn group_logs_by_resource_and_scope( logs: Vec<( - &'a opentelemetry_sdk::logs::LogRecord, - &'a opentelemetry::InstrumentationLibrary, + &opentelemetry_sdk::logs::LogRecord, + &opentelemetry::InstrumentationLibrary, )>, resource: &ResourceAttributesWithSchema, ) -> Vec { @@ -189,8 +189,8 @@ pub mod tonic { |mut scope_map: HashMap< Cow<'static, str>, Vec<( - &'a opentelemetry_sdk::logs::LogRecord, - &'a opentelemetry::InstrumentationLibrary, + &opentelemetry_sdk::logs::LogRecord, + &opentelemetry::InstrumentationLibrary, )>, >, (log_record, instrumentation)| { diff --git a/opentelemetry-sdk/src/export/logs/mod.rs b/opentelemetry-sdk/src/export/logs/mod.rs index ed8ba56e6a..216d813bbf 100644 --- a/opentelemetry-sdk/src/export/logs/mod.rs +++ b/opentelemetry-sdk/src/export/logs/mod.rs @@ -15,9 +15,9 @@ use std::fmt::Debug; #[async_trait] pub trait LogExporter: Send + Sync + Debug { /// Exports a batch of [`LogData`]. - async fn export<'a>( + async fn export( &mut self, - records: Vec<(&'a LogRecord, &'a InstrumentationLibrary)>, + records: Vec<(&LogRecord, &InstrumentationLibrary)>, ) -> LogResult<()>; /// Shuts down the exporter. fn shutdown(&mut self) {} diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index b4df3aa631..9bdd493b8a 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -550,9 +550,9 @@ mod tests { #[async_trait] impl LogExporter for MockLogExporter { - async fn export<'a>( + async fn export( &mut self, - _batch: Vec<(&'a LogRecord, &'a InstrumentationLibrary)>, + _batch: Vec<(&LogRecord, &InstrumentationLibrary)>, ) -> LogResult<()> { Ok(()) } diff --git a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs index 393b15ddce..346ef9861a 100644 --- a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs @@ -185,10 +185,7 @@ impl InMemoryLogsExporter { #[async_trait] impl LogExporter for InMemoryLogsExporter { - async fn export<'a>( - &mut self, - batch: Vec<(&'a LogRecord, &'a InstrumentationLibrary)>, - ) -> LogResult<()> { + async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()> { let mut logs_guard = self.logs.lock().map_err(LogError::from)?; for (log_record, instrumentation) in batch.into_iter() { let owned_log = OwnedLogData { diff --git a/opentelemetry-stdout/src/logs/exporter.rs b/opentelemetry-stdout/src/logs/exporter.rs index 6436e7529a..fd59701c0b 100644 --- a/opentelemetry-stdout/src/logs/exporter.rs +++ b/opentelemetry-stdout/src/logs/exporter.rs @@ -45,10 +45,7 @@ impl fmt::Debug for LogExporter { #[async_trait] impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { /// Export spans to stdout - async fn export<'a>( - &mut self, - batch: Vec<(&'a LogRecord, &'a InstrumentationLibrary)>, - ) -> LogResult<()> { + async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()> { if let Some(writer) = &mut self.writer { let result = (self.encoder)(writer, (batch, &self.resource).into()) as LogResult<()>; result.and_then(|_| writer.write_all(b"\n").map_err(|e| Error(e).into())) From d14416fe0414636f81695074fa47d62bbd6ca78c Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Tue, 20 Aug 2024 00:34:21 -0700 Subject: [PATCH 4/7] fix leftover specifier removal --- opentelemetry-appender-tracing/benches/logs.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/opentelemetry-appender-tracing/benches/logs.rs b/opentelemetry-appender-tracing/benches/logs.rs index 763b02a0cb..3aa0100e47 100644 --- a/opentelemetry-appender-tracing/benches/logs.rs +++ b/opentelemetry-appender-tracing/benches/logs.rs @@ -34,11 +34,11 @@ struct NoopExporter { #[async_trait] impl LogExporter for NoopExporter { - async fn export<'a>( + async fn export( &mut self, _: Vec<( - &'a opentelemetry_sdk::logs::LogRecord, - &'a opentelemetry::InstrumentationLibrary, + &opentelemetry_sdk::logs::LogRecord, + &opentelemetry::InstrumentationLibrary, )>, ) -> LogResult<()> { LogResult::Ok(()) From d323e0b6998a9a4d6d179e529b813e186c97867d Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Tue, 20 Aug 2024 00:41:40 -0700 Subject: [PATCH 5/7] more explicit lifetime specifier removal --- opentelemetry-otlp/src/exporter/http/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/opentelemetry-otlp/src/exporter/http/mod.rs b/opentelemetry-otlp/src/exporter/http/mod.rs index aa50019e5c..1b60971d76 100644 --- a/opentelemetry-otlp/src/exporter/http/mod.rs +++ b/opentelemetry-otlp/src/exporter/http/mod.rs @@ -328,9 +328,9 @@ impl OtlpHttpClient { } #[cfg(feature = "logs")] - fn build_logs_export_body<'a>( + fn build_logs_export_body( &self, - logs: Vec<(&'a LogRecord, &'a InstrumentationLibrary)>, + logs: Vec<(&LogRecord, &InstrumentationLibrary)>, ) -> opentelemetry::logs::LogResult<(Vec, &'static str)> { use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; let resource_logs = group_logs_by_resource_and_scope(logs, &self.resource); From 4f6dde1dd42420098fd523603910960595108011 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Tue, 20 Aug 2024 01:25:42 -0700 Subject: [PATCH 6/7] move LogData from opentelemetry_sdk::exporter::logs to opentelemetry_sdk::logs namespace --- opentelemetry-appender-tracing/benches/logs.rs | 4 ++-- opentelemetry-sdk/benches/log.rs | 2 +- opentelemetry-sdk/benches/log_exporter.rs | 2 +- opentelemetry-sdk/benches/log_processor.rs | 5 +---- opentelemetry-sdk/src/export/logs/mod.rs | 10 ---------- opentelemetry-sdk/src/logs/log_emitter.rs | 7 ++----- opentelemetry-sdk/src/logs/log_processor.rs | 6 ++++-- opentelemetry-sdk/src/logs/mod.rs | 11 +++++++++++ stress/src/logs.rs | 2 +- 9 files changed, 23 insertions(+), 26 deletions(-) diff --git a/opentelemetry-appender-tracing/benches/logs.rs b/opentelemetry-appender-tracing/benches/logs.rs index 3aa0100e47..2a81cb9b1d 100644 --- a/opentelemetry-appender-tracing/benches/logs.rs +++ b/opentelemetry-appender-tracing/benches/logs.rs @@ -18,8 +18,8 @@ use criterion::{criterion_group, criterion_main, Criterion}; use opentelemetry::logs::LogResult; use opentelemetry::KeyValue; use opentelemetry_appender_tracing::layer as tracing_layer; -use opentelemetry_sdk::export::logs::{LogData, LogExporter}; -use opentelemetry_sdk::logs::{LogProcessor, LoggerProvider}; +use opentelemetry_sdk::export::logs::LogExporter; +use opentelemetry_sdk::logs::{LogData, LogProcessor, LoggerProvider}; use opentelemetry_sdk::Resource; use pprof::criterion::{Output, PProfProfiler}; use tracing::error; diff --git a/opentelemetry-sdk/benches/log.rs b/opentelemetry-sdk/benches/log.rs index 71d5fc699f..840560a1f4 100644 --- a/opentelemetry-sdk/benches/log.rs +++ b/opentelemetry-sdk/benches/log.rs @@ -26,7 +26,7 @@ use opentelemetry::logs::{ use opentelemetry::trace::Tracer; use opentelemetry::trace::TracerProvider as _; use opentelemetry::Key; -use opentelemetry_sdk::export::logs::LogData; +use opentelemetry_sdk::logs::LogData; use opentelemetry_sdk::logs::LogProcessor; use opentelemetry_sdk::logs::{Logger, LoggerProvider}; use opentelemetry_sdk::trace; diff --git a/opentelemetry-sdk/benches/log_exporter.rs b/opentelemetry-sdk/benches/log_exporter.rs index 434fc11ae6..73fde7d61d 100644 --- a/opentelemetry-sdk/benches/log_exporter.rs +++ b/opentelemetry-sdk/benches/log_exporter.rs @@ -19,7 +19,7 @@ use criterion::{criterion_group, criterion_main, Criterion}; use opentelemetry::logs::{LogRecord as _, LogResult, Logger as _, LoggerProvider as _, Severity}; use opentelemetry::InstrumentationLibrary; -use opentelemetry_sdk::export::logs::LogData; +use opentelemetry_sdk::logs::LogData; use opentelemetry_sdk::logs::LogProcessor; use opentelemetry_sdk::logs::LogRecord; use opentelemetry_sdk::logs::LoggerProvider; diff --git a/opentelemetry-sdk/benches/log_processor.rs b/opentelemetry-sdk/benches/log_processor.rs index fcb729318c..7e78897669 100644 --- a/opentelemetry-sdk/benches/log_processor.rs +++ b/opentelemetry-sdk/benches/log_processor.rs @@ -19,10 +19,7 @@ use std::{ use criterion::{criterion_group, criterion_main, Criterion}; use opentelemetry::logs::{LogRecord as _, LogResult, Logger as _, LoggerProvider as _, Severity}; -use opentelemetry_sdk::{ - export::logs::LogData, - logs::{LogProcessor, LogRecord, Logger, LoggerProvider}, -}; +use opentelemetry_sdk::logs::{LogData, LogProcessor, LogRecord, Logger, LoggerProvider}; use std::borrow::Cow; // Run this benchmark with: diff --git a/opentelemetry-sdk/src/export/logs/mod.rs b/opentelemetry-sdk/src/export/logs/mod.rs index 216d813bbf..b254bd3e55 100644 --- a/opentelemetry-sdk/src/export/logs/mod.rs +++ b/opentelemetry-sdk/src/export/logs/mod.rs @@ -8,7 +8,6 @@ use opentelemetry::{ logs::{LogError, LogResult}, InstrumentationLibrary, }; -use std::borrow::Cow; use std::fmt::Debug; /// `LogExporter` defines the interface that log exporters should implement. @@ -31,14 +30,5 @@ pub trait LogExporter: Send + Sync + Debug { fn set_resource(&mut self, _resource: &Resource) {} } -/// `LogData` represents a single log event without resource context. -#[derive(Clone, Debug)] -pub struct LogData<'a> { - /// Log record, which can be borrowed or owned. - pub record: Cow<'a, LogRecord>, - /// Instrumentation details for the emitter who produced this `LogEvent`. - pub instrumentation: Cow<'a, InstrumentationLibrary>, -} - /// Describes the result of an export. pub type ExportResult = Result<(), LogError>; diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index 604a1799e5..ff4f7b57ba 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -1,9 +1,6 @@ use super::{BatchLogProcessor, LogProcessor, LogRecord, SimpleLogProcessor, TraceContext}; -use crate::{ - export::logs::{LogData, LogExporter}, - runtime::RuntimeChannel, - Resource, -}; +use crate::logs::LogData; +use crate::{export::logs::LogExporter, runtime::RuntimeChannel, Resource}; use opentelemetry::{ global, logs::{LogError, LogResult}, diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 9bdd493b8a..072e350ca4 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -1,5 +1,6 @@ +use crate::logs::LogData; use crate::{ - export::logs::{ExportResult, LogData, LogExporter}, + export::logs::{ExportResult, LogExporter}, runtime::{RuntimeChannel, TrySend}, Resource, }; @@ -518,10 +519,11 @@ mod tests { BatchLogProcessor, OTEL_BLRP_EXPORT_TIMEOUT, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, OTEL_BLRP_MAX_QUEUE_SIZE, OTEL_BLRP_SCHEDULE_DELAY, }; + use crate::logs::LogData; use crate::logs::LogRecord; use crate::testing::logs::InMemoryLogsExporterBuilder; use crate::{ - export::logs::{LogData, LogExporter}, + export::logs::LogExporter, logs::{ log_processor::{ OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT, diff --git a/opentelemetry-sdk/src/logs/mod.rs b/opentelemetry-sdk/src/logs/mod.rs index 5d2e72719b..92e384ee41 100644 --- a/opentelemetry-sdk/src/logs/mod.rs +++ b/opentelemetry-sdk/src/logs/mod.rs @@ -4,12 +4,23 @@ mod log_emitter; mod log_processor; mod record; +use crate::InstrumentationLibrary; pub use log_emitter::{Builder, Logger, LoggerProvider}; pub use log_processor::{ BatchConfig, BatchConfigBuilder, BatchLogProcessor, BatchLogProcessorBuilder, LogProcessor, SimpleLogProcessor, }; pub use record::{LogRecord, TraceContext}; +use std::borrow::Cow; + +/// `LogData` represents a single log event without resource context. +#[derive(Clone, Debug)] +pub struct LogData<'a> { + /// Log record, which can be borrowed or owned. + pub record: Cow<'a, LogRecord>, + /// Instrumentation details for the emitter who produced this `LogEvent`. + pub instrumentation: Cow<'a, InstrumentationLibrary>, +} #[cfg(all(test, feature = "testing"))] mod tests { diff --git a/stress/src/logs.rs b/stress/src/logs.rs index 6cec97463c..1798401e32 100644 --- a/stress/src/logs.rs +++ b/stress/src/logs.rs @@ -20,7 +20,7 @@ mod throughput; pub struct NoOpLogProcessor; impl LogProcessor for NoOpLogProcessor { - fn emit(&self, _data: &mut opentelemetry_sdk::export::logs::LogData) {} + fn emit(&self, _data: &mut opentelemetry_sdk::logs::LogData) {} fn force_flush(&self) -> opentelemetry::logs::LogResult<()> { Ok(()) From 864aa544b54321168a921780599756148446bd79 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Tue, 20 Aug 2024 02:00:49 -0700 Subject: [PATCH 7/7] add changelog, fix doc --- opentelemetry-sdk/CHANGELOG.md | 38 ++++++++++++++++++++++++ opentelemetry-sdk/src/export/logs/mod.rs | 2 +- 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 496cc3543f..c674857dd2 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -25,6 +25,44 @@ [#2021](https://github.com/open-telemetry/opentelemetry-rust/pull/2021) - Provide default implementation for `event_enabled` method in `LogProcessor` trait that returns `true` always. +- **Breaking** [#2035](https://github.com/open-telemetry/opentelemetry-rust/pull/2035) + - The Exporter::export() interface is modified as below: + Previous Signature: + ```rust + async fn export<'a>(&mut self, batch: Vec>) -> LogResult<()>; + ``` + + Updated Signature: + ```rust + async fn export(&mut self, batch: Vec<(&LogRecord, &InstrumentationLibrary)>) -> LogResult<()>; + ``` + This change simplifies the processing required by exporters. Exporters no longer need to determine if the LogData is borrowed or owned, as they now work directly with references. As a result, exporters must explicitly create a copy of LogRecord and/or InstrumentationLibrary when needed, as the new interface only provides references to these structures. + + - The LogData structure is NO longer the part of the export interface. So it has been moved from `opentelemetry_sdk::export::logs` to `opentelemetry_sdk::logs` namespace. The custom implementations of `LogProcessor` need to update the imports accordindgly. + + - The LogData structure has been changed as below: + Previous Signature + ```rust + #[derive(Clone, Debug)] + pub struct LogData { + /// Log record + pub record: LogRecord, + /// Instrumentation details for the emitter who produced this `LogEvent`. + pub instrumentation: InstrumentationLibrary, + ``` + + Updated Signature: + ```rust + #[derive(Clone, Debug)] + pub struct LogData<'a> { + /// Log record, which can be borrowed or owned. + pub record: Cow<'a, LogRecord>, + /// Instrumentation details for the emitter who produced this `LogEvent`. + pub instrumentation: Cow<'a, InstrumentationLibrary>, + } + ``` + The custom implementation of `LogProcessor` need to accordingly modify the handling of LogData + received through LogProcessor::emit() interface. ## v0.24.1 diff --git a/opentelemetry-sdk/src/export/logs/mod.rs b/opentelemetry-sdk/src/export/logs/mod.rs index b254bd3e55..353c89042c 100644 --- a/opentelemetry-sdk/src/export/logs/mod.rs +++ b/opentelemetry-sdk/src/export/logs/mod.rs @@ -13,7 +13,7 @@ use std::fmt::Debug; /// `LogExporter` defines the interface that log exporters should implement. #[async_trait] pub trait LogExporter: Send + Sync + Debug { - /// Exports a batch of [`LogData`]. + /// Exports a batch of [`LogRecord`, `InstrumentationLibrary`]. async fn export( &mut self, records: Vec<(&LogRecord, &InstrumentationLibrary)>,