From c841e53b82c81ff3d3422ffe0499345055e74efd Mon Sep 17 00:00:00 2001
From: Lalit Kumar Bhasin
Date: Fri, 17 Jan 2025 15:19:32 -0800
Subject: [PATCH 1/4] initial commit

---
 opentelemetry-sdk/src/trace/span_processor.rs | 55 ++++++++++++++++++-
 1 file changed, 52 insertions(+), 3 deletions(-)

diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs
index 53ee2f9bc0..305371cfc6 100644
--- a/opentelemetry-sdk/src/trace/span_processor.rs
+++ b/opentelemetry-sdk/src/trace/span_processor.rs
@@ -221,12 +221,15 @@ use futures_executor::block_on;
 use std::sync::mpsc::sync_channel;
 use std::sync::mpsc::RecvTimeoutError;
 use std::sync::mpsc::SyncSender;
+use std::sync::mpsc::Receiver;
+use crate::export::trace::ExportResult;
 
 /// Messages exchanged between the main thread and the background thread.
 #[allow(clippy::large_enum_variant)]
 #[derive(Debug)]
 enum BatchMessage {
-    ExportSpan(SpanData),
+    //ExportSpan(SpanData),
+    ExportSpan(Arc<AtomicBool>),
     ForceFlush(SyncSender<TraceResult<()>>),
     Shutdown(SyncSender<TraceResult<()>>),
     SetResource(Arc<Resource>),
@@ -235,12 +238,16 @@ enum BatchMessage {
 /// A batch span processor with a dedicated background thread.
 #[derive(Debug)]
 pub struct BatchSpanProcessor {
-    message_sender: SyncSender<BatchMessage>,
+    span_sender: SyncSender<SpanData>, // Data channel to store spans
+    message_sender: SyncSender<BatchMessage>, // Control channel to store control messages.
     handle: Mutex<Option<thread::JoinHandle<()>>>,
     forceflush_timeout: Duration,
     shutdown_timeout: Duration,
     is_shutdown: AtomicBool,
     dropped_span_count: Arc<AtomicUsize>,
+    export_span_message_sent: Arc<AtomicBool>,
+    current_batch_size: Arc<AtomicUsize>,
+    max_export_batch_size: usize,
 }
 
 impl BatchSpanProcessor {
@@ -255,7 +262,12 @@ impl BatchSpanProcessor {
     where
         E: SpanExporter + Send + 'static,
     {
-        let (message_sender, message_receiver) = sync_channel(config.max_queue_size);
+        let (message_sender, message_receiver) = sync_channel::<SpanData>(config.max_queue_size);
+        let (message_sender, message_receiver) = sync_channel::<BatchMessage>(64); // Is this a reasonable bound?
+        let max_queue_size = config.max_queue_size;
+        let max_export_batch_size = config.max_export_batch_size;
+        let current_batch_size = Arc::new(AtomicUsize::new(0));
+        let current_batch_size_for_thread = current_batch_size.clone();
 
         let handle = thread::Builder::new()
             .name("OpenTelemetry.Traces.BatchProcessor".to_string())
@@ -268,7 +280,41 @@ impl BatchSpanProcessor {
                 );
                 let mut spans = Vec::with_capacity(config.max_export_batch_size);
                 let mut last_export_time = Instant::now();
+                let current_batch_size = current_batch_size_for_thread;
+                // This method gets up to `max_export_batch_size` amount of spans from the channel and exports them.
+                // It returns the result of the export operation.
+                // It expects the span vec to be empty when it's called.
+                #[inline]
+                fn get_spans_and_export<E>(
+                    spans_receiver: &Receiver<SpanData>,
+                    exporter: &E,
+                    spans: &mut Vec<SpanData>,
+                    last_export_time: &mut Instant,
+                    current_batch_size: &AtomicUsize,
+                    config: &BatchConfig,
+                ) -> ExportResult
+                where
+                    E: SpanExporter + Send + Sync + 'static,
+                {
+                    // Get up to `max_export_batch_size` amount of logs log records from the channel and push them to the logs vec
+                    while let Ok(log) = spans_receiver.try_recv() {
+                        spans.push(log);
+                        if spans.len() == config.max_export_batch_size {
+                            break;
+                        }
+                    }
+
+                    let count_of_logs = spans.len(); // Count of logs that will be exported
+                    let result = export_with_timeout_sync(
+                        config.max_export_timeout,
+                        exporter,
+                        spans,
+                        last_export_time,
+                    ); // This method clears the logs vec after exporting
+                    current_batch_size.fetch_sub(count_of_logs, Ordering::Relaxed);
+                    result
+                }
                 loop {
                     let remaining_time_option = config
                         .scheduled_delay
@@ -280,6 +326,9 @@
                     };
                     match message_receiver.recv_timeout(remaining_time) {
                         Ok(message) => match message {
                             BatchMessage::ExportSpan(span) => {
+                                otel_debug!(
+                                    name: "BatchSpanProcessor.ExportingDueToBatchSize",
+                                );
                                 spans.push(span);
                                 if spans.len() >= config.max_queue_size
                                     || last_export_time.elapsed() >= config.scheduled_delay
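Patch 1 starts carving the processor into the shape the rest of the series completes: finished spans flow over a bounded data channel sized by `max_queue_size`, while export/flush/shutdown requests travel over a small, separate control channel, and a dedicated thread services both. A minimal, self-contained sketch of that two-channel worker pattern, using plain `std` with `String` standing in for `SpanData` (all names here are illustrative, not the SDK's API):

```rust
use std::sync::mpsc::{sync_channel, RecvTimeoutError};
use std::thread;
use std::time::Duration;

enum Control {
    Flush,
    Shutdown,
}

fn main() {
    let (span_tx, span_rx) = sync_channel::<String>(2048); // data channel (max_queue_size)
    let (ctrl_tx, ctrl_rx) = sync_channel::<Control>(64); // control channel (small bound)

    let worker = thread::spawn(move || loop {
        // Wake up on a control message or on the scheduled-delay timer.
        match ctrl_rx.recv_timeout(Duration::from_millis(500)) {
            Ok(Control::Flush) | Err(RecvTimeoutError::Timeout) => {
                // Drain whatever is buffered and "export" it.
                let batch: Vec<String> = span_rx.try_iter().collect();
                if !batch.is_empty() {
                    println!("exporting {} spans", batch.len());
                }
            }
            // The real processor drains one final batch before exiting.
            Ok(Control::Shutdown) | Err(RecvTimeoutError::Disconnected) => break,
        }
    });

    span_tx.send("span-1".to_string()).unwrap();
    span_tx.send("span-2".to_string()).unwrap();
    ctrl_tx.send(Control::Flush).unwrap();
    ctrl_tx.send(Control::Shutdown).unwrap();
    worker.join().unwrap();
}
```

Keeping span delivery off the control channel means a burst of spans can never starve a `ForceFlush` or `Shutdown` request, which is the motivation for the split.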
From 0f38bd4a9ab50ed70cd748fef428d023871604c8 Mon Sep 17 00:00:00 2001
From: Lalit
Date: Mon, 20 Jan 2025 20:06:24 -0800
Subject: [PATCH 2/4] initial commit

---
 opentelemetry-sdk/src/trace/span_processor.rs | 230 +++++++++++++-----
 1 file changed, 163 insertions(+), 67 deletions(-)
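Patch 2 moves `get_spans_and_export` out of the worker closure into an associated method and routes every wake-up path (batch-size trigger, force flush, shutdown, timer) through it. Its core is a non-blocking drain of the data channel, capped at `max_export_batch_size`. A sketch of that drain loop in isolation (`String` again stands in for `SpanData`; `drain_up_to` is a hypothetical helper name):

```rust
use std::sync::mpsc::{sync_channel, Receiver};

// Pull at most `max` buffered items off the channel without blocking.
fn drain_up_to(rx: &Receiver<String>, max: usize, buf: &mut Vec<String>) {
    while let Ok(span) = rx.try_recv() {
        buf.push(span);
        if buf.len() == max {
            break; // leave the rest for the next export cycle
        }
    }
}

fn main() {
    let (tx, rx) = sync_channel(16);
    for i in 0..10 {
        tx.send(format!("span-{i}")).unwrap();
    }
    let mut batch = Vec::with_capacity(4);
    drain_up_to(&rx, 4, &mut batch);
    assert_eq!(batch.len(), 4); // capped at the batch size; 6 spans stay queued
}
```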
diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs
index 305371cfc6..83a8f49ff2 100644
--- a/opentelemetry-sdk/src/trace/span_processor.rs
+++ b/opentelemetry-sdk/src/trace/span_processor.rs
@@ -85,6 +85,7 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug {
     /// `on_end` is called after a `Span` is ended (i.e., the end timestamp is
     /// already set). This method is called synchronously within the `Span::end`
     /// API, therefore it should not block or throw an exception.
+    /// TODO - This method should take reference to `SpanData`
     fn on_end(&self, span: SpanData);
     /// Force the spans lying in the cache to be exported.
     fn force_flush(&self) -> TraceResult<()>;
@@ -163,6 +164,7 @@ impl SpanProcessor for SimpleSpanProcessor {
     }
 }
 
+use crate::export::trace::ExportResult;
 /// The `BatchSpanProcessor` collects finished spans in a buffer and exports them
 /// in batches to the configured `SpanExporter`. This processor is ideal for
 /// high-throughput environments, as it minimizes the overhead of exporting spans
@@ -217,12 +219,10 @@ impl SpanProcessor for SimpleSpanProcessor {
 /// provider.shutdown();
 /// }
 /// ```
-use futures_executor::block_on;
 use std::sync::mpsc::sync_channel;
+use std::sync::mpsc::Receiver;
 use std::sync::mpsc::RecvTimeoutError;
 use std::sync::mpsc::SyncSender;
-use std::sync::mpsc::Receiver;
-use crate::export::trace::ExportResult;
 
 /// Messages exchanged between the main thread and the background thread.
 #[allow(clippy::large_enum_variant)]
@@ -248,6 +248,7 @@ pub struct BatchSpanProcessor {
     export_span_message_sent: Arc<AtomicBool>,
     current_batch_size: Arc<AtomicUsize>,
     max_export_batch_size: usize,
+    max_queue_size: usize,
 }
 
 impl BatchSpanProcessor {
@@ -262,7 +263,7 @@ impl BatchSpanProcessor {
     where
         E: SpanExporter + Send + 'static,
     {
-        let (message_sender, message_receiver) = sync_channel::<SpanData>(config.max_queue_size);
+        let (span_sender, span_receiver) = sync_channel::<SpanData>(config.max_queue_size);
         let (message_sender, message_receiver) = sync_channel::<BatchMessage>(64); // Is this a reasonable bound?
         let max_queue_size = config.max_queue_size;
         let max_export_batch_size = config.max_export_batch_size;
@@ -281,40 +282,6 @@ impl BatchSpanProcessor {
                 let mut spans = Vec::with_capacity(config.max_export_batch_size);
                 let mut last_export_time = Instant::now();
                 let current_batch_size = current_batch_size_for_thread;
-                // This method gets up to `max_export_batch_size` amount of spans from the channel and exports them.
-                // It returns the result of the export operation.
-                // It expects the span vec to be empty when it's called.
-                #[inline]
-                fn get_spans_and_export<E>(
-                    spans_receiver: &Receiver<SpanData>,
-                    exporter: &E,
-                    spans: &mut Vec<SpanData>,
-                    last_export_time: &mut Instant,
-                    current_batch_size: &AtomicUsize,
-                    config: &BatchConfig,
-                ) -> ExportResult
-                where
-                    E: SpanExporter + Send + Sync + 'static,
-                {
-                    // Get up to `max_export_batch_size` amount of logs log records from the channel and push them to the logs vec
-                    while let Ok(log) = spans_receiver.try_recv() {
-                        spans.push(log);
-                        if spans.len() == config.max_export_batch_size {
-                            break;
-                        }
-                    }
-
-                    let count_of_logs = spans.len(); // Count of logs that will be exported
-                    let result = export_with_timeout_sync(
-                        config.max_export_timeout,
-                        exporter,
-                        spans,
-                        last_export_time,
-                    ); // This method clears the logs vec after exporting
-                    current_batch_size.fetch_sub(count_of_logs, Ordering::Relaxed);
-                    result
-                }
                 loop {
                     let remaining_time_option = config
                         .scheduled_delay
@@ -325,31 +292,52 @@ impl BatchSpanProcessor {
                     };
                     match message_receiver.recv_timeout(remaining_time) {
                         Ok(message) => match message {
-                            BatchMessage::ExportSpan(span) => {
+                            BatchMessage::ExportSpan(export_span_message_sent) => {
                                 otel_debug!(
                                     name: "BatchSpanProcessor.ExportingDueToBatchSize",
                                 );
-                                spans.push(span);
-                                if spans.len() >= config.max_queue_size
-                                    || last_export_time.elapsed() >= config.scheduled_delay
-                                {
-                                    if let Err(err) = block_on(exporter.export(spans.split_off(0)))
-                                    {
-                                        otel_error!(
-                                            name: "BatchSpanProcessor.ExportError",
-                                            error = format!("{}", err)
-                                        );
-                                    }
-                                    last_export_time = Instant::now();
-                                }
+                                let _ = Self::get_spans_and_export(
+                                    &span_receiver,
+                                    &mut exporter,
+                                    &mut spans,
+                                    &mut last_export_time,
+                                    &current_batch_size,
+                                    &config,
+                                );
+                                // Reset the export span message sent flag now that it has been processed.
+                                export_span_message_sent.store(false, Ordering::Relaxed);
                             }
                             BatchMessage::ForceFlush(sender) => {
-                                let result = block_on(exporter.export(spans.split_off(0)));
+                                otel_debug!(name: "BatchSpanProcessor.ExportingDueToForceFlush");
+                                let result = Self::get_spans_and_export(
+                                    &span_receiver,
+                                    &mut exporter,
+                                    &mut spans,
+                                    &mut last_export_time,
+                                    &current_batch_size,
+                                    &config,
+                                );
                                 let _ = sender.send(result);
                             }
                             BatchMessage::Shutdown(sender) => {
-                                let result = block_on(exporter.export(spans.split_off(0)));
+                                otel_debug!(name: "BatchSpanProcessor.ExportingDueToShutdown");
+                                let result = Self::get_spans_and_export(
+                                    &span_receiver,
+                                    &mut exporter,
+                                    &mut spans,
+                                    &mut last_export_time,
+                                    &current_batch_size,
+                                    &config,
+                                );
                                 let _ = sender.send(result);
+
+                                otel_debug!(
+                                    name: "BatchSpanProcessor.ThreadExiting",
+                                    reason = "ShutdownRequested"
+                                );
+                                //
+                                // break out of the loop and return from the current background thread.
+                                //
                                 break;
                             }
                             BatchMessage::SetResource(resource) => {
@@ -357,15 +345,18 @@ impl BatchSpanProcessor {
                             }
                         },
                         Err(RecvTimeoutError::Timeout) => {
-                            if last_export_time.elapsed() >= config.scheduled_delay {
-                                if let Err(err) = block_on(exporter.export(spans.split_off(0))) {
-                                    otel_error!(
-                                        name: "BatchSpanProcessor.ExportError",
-                                        error = format!("{}", err)
-                                    );
-                                }
-                                last_export_time = Instant::now();
-                            }
+                            otel_debug!(
+                                name: "BatchLogProcessor.ExportingDueToTimer",
+                            );
+
+                            let _ = Self::get_spans_and_export(
+                                &span_receiver,
+                                &mut exporter,
+                                &mut spans,
+                                &mut last_export_time,
+                                &current_batch_size,
+                                &config,
+                            );
                         }
                         Err(RecvTimeoutError::Disconnected) => {
                             // Channel disconnected, only thing to do is break
@@ -385,12 +376,17 @@ impl BatchSpanProcessor {
             .expect("Failed to spawn thread"); //TODO: Handle thread spawn failure
 
         Self {
+            span_sender,
             message_sender,
             handle: Mutex::new(Some(handle)),
             forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable
             shutdown_timeout: Duration::from_secs(5), // TODO: make this configurable
             is_shutdown: AtomicBool::new(false),
             dropped_span_count: Arc::new(AtomicUsize::new(0)),
+            max_queue_size,
+            export_span_message_sent: Arc::new(AtomicBool::new(false)),
+            current_batch_size,
+            max_export_batch_size,
         }
     }
 
@@ -404,6 +400,72 @@ impl BatchSpanProcessor {
             config: BatchConfig::default(),
         }
     }
+
+    // This method gets up to `max_export_batch_size` amount of spans from the channel and exports them.
+    // It returns the result of the export operation.
+    // It expects the span vec to be empty when it's called.
+    #[inline]
+    fn get_spans_and_export<E>(
+        spans_receiver: &Receiver<SpanData>,
+        exporter: &mut E,
+        spans: &mut Vec<SpanData>,
+        last_export_time: &mut Instant,
+        current_batch_size: &AtomicUsize,
+        config: &BatchConfig,
+    ) -> ExportResult
+    where
+        E: SpanExporter + Send + Sync + 'static,
+    {
+        // Get up to `max_export_batch_size` amount of logs log records from the channel and push them to the logs vec
+        while let Ok(log) = spans_receiver.try_recv() {
+            spans.push(log);
+            if spans.len() == config.max_export_batch_size {
+                break;
+            }
+        }
+
+        let count_of_logs = spans.len(); // Count of logs that will be exported
+        let result = Self::export_with_timeout_sync(
+            config.max_export_timeout,
+            exporter,
+            spans,
+            last_export_time,
+        ); // This method clears the logs vec after exporting
+
+        current_batch_size.fetch_sub(count_of_logs, Ordering::Relaxed);
+        result
+    }
+
+    #[allow(clippy::vec_box)]
+    fn export_with_timeout_sync<E>(
+        _: Duration, // TODO, enforcing timeout in exporter.
+        exporter: &mut E,
+        batch: &mut Vec<SpanData>,
+        last_export_time: &mut Instant,
+    ) -> ExportResult
+    where
+        E: SpanExporter + Send + Sync + 'static,
+    {
+        *last_export_time = Instant::now();
+
+        if batch.is_empty() {
+            return TraceResult::Ok(());
+        }
+
+        let export = exporter.export(batch.split_off(0));
+        let export_result = futures_executor::block_on(export);
+
+        match export_result {
+            Ok(_) => TraceResult::Ok(()),
+            Err(err) => {
+                otel_error!(
+                    name: "BatchLogProcessor.ExportError",
+                    error = format!("{}", err)
+                );
+                TraceResult::Err(err)
+            }
+        }
+    }
 }
 
 impl SpanProcessor for BatchSpanProcessor {
@@ -418,10 +480,11 @@ impl SpanProcessor for BatchSpanProcessor {
             // this is a warning, as the user is trying to emit after the processor has been shutdown
             otel_warn!(
                 name: "BatchSpanProcessor.Emit.ProcessorShutdown",
+                message = "BatchSpanProcessor has been shutdown. No further spans will be emitted."
             );
             return;
         }
-        let result = self.message_sender.try_send(BatchMessage::ExportSpan(span));
+        let result = self.span_sender.try_send(span);
 
         if result.is_err() {
             // Increment dropped span count. The first time we have to drop a span,
@@ -431,6 +494,36 @@ impl SpanProcessor for BatchSpanProcessor {
                 message = "BatchSpanProcessor dropped a Span due to queue full/internal errors. No further internal log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total Spans dropped.");
             }
         }
+        // At this point, sending the log record to the data channel was successful.
+        // Increment the current batch size and check if it has reached the max export batch size.
+        if self.current_batch_size.fetch_add(1, Ordering::Relaxed) + 1 >= self.max_export_batch_size
+        {
+            // Check if a control message for exporting logs is already sent to the worker thread.
+            // If not, send a control message to export logs.
+            // `export_span_message_sent` is set to false ONLY when the worker thread has processed the control message.
+
+            if !self.export_span_message_sent.load(Ordering::Relaxed) {
+                // This is a cost-efficient check as atomic load operations do not require exclusive access to cache line.
+                // Perform atomic swap to `export_log_message_sent` ONLY when the atomic load operation above returns false.
+                // Atomic swap/compare_exchange operations require exclusive access to cache line on most processor architectures.
+                // We could have used compare_exchange as well here, but it's more verbose than swap.
+                if !self.export_span_message_sent.swap(true, Ordering::Relaxed) {
+                    match self.message_sender.try_send(BatchMessage::ExportSpan(
+                        self.export_span_message_sent.clone(),
+                    )) {
+                        Ok(_) => {
+                            // Control message sent successfully.
+                        }
+                        Err(_err) => {
+                            // TODO: Log error
+                            // If the control message could not be sent, reset the `export_log_message_sent` flag.
+                            self.export_span_message_sent
+                                .store(false, Ordering::Relaxed);
+                        }
+                    }
+                }
+            }
+        }
     }
 
     /// Flushes all pending spans.
@@ -450,17 +543,20 @@ impl SpanProcessor for BatchSpanProcessor {
     /// Shuts down the processor.
     fn shutdown(&self) -> TraceResult<()> {
+        if self.is_shutdown.swap(true, Ordering::Relaxed) {
+            return Err(TraceError::Other("Processor already shutdown".into()));
+        }
         let dropped_spans = self.dropped_span_count.load(Ordering::Relaxed);
+        let max_queue_size = self.max_queue_size;
         if dropped_spans > 0 {
             otel_warn!(
                 name: "BatchSpanProcessor.LogsDropped",
                 dropped_span_count = dropped_spans,
+                max_queue_size = max_queue_size,
                 message = "Spans were dropped due to a queue being full or other error. The count represents the total count of spans dropped in the lifetime of this BatchSpanProcessor. Consider increasing the queue size and/or decreasing the delay between intervals."
             );
         }
-        if self.is_shutdown.swap(true, Ordering::Relaxed) {
-            return Err(TraceError::Other("Processor already shutdown".into()));
-        }
+
         let (sender, receiver) = sync_channel(1);
         self.message_sender
            .try_send(BatchMessage::Shutdown(sender))
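Patch 2's `export_span_message_sent` flag keeps `on_end` from flooding the small control channel once the batch threshold is crossed: callers do a cheap relaxed load first, and only the one that then wins the swap sends the `ExportSpan` message; the worker clears the flag when it services that message. A standalone sketch of the gate (the `maybe_notify` helper is a hypothetical name, not SDK API):

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// Returns true only for the single caller that should enqueue the
// ExportSpan control message; everyone else sees the gate already armed.
fn maybe_notify(flag: &AtomicBool) -> bool {
    // Cheap first check: a relaxed load needs no exclusive cache-line access.
    if !flag.load(Ordering::Relaxed) {
        // The swap is the expensive, exclusive operation; only the winner
        // (the caller that flips false -> true) gets to send the message.
        return !flag.swap(true, Ordering::Relaxed);
    }
    false
}

fn main() {
    let flag = AtomicBool::new(false);
    assert!(maybe_notify(&flag)); // first caller past the threshold sends
    assert!(!maybe_notify(&flag)); // duplicates are suppressed while armed
    flag.store(false, Ordering::Relaxed); // worker resets after processing
    assert!(maybe_notify(&flag)); // the gate can fire again
}
```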
From 46aaac9021b8e854610d23d7ffd246566bb3d592 Mon Sep 17 00:00:00 2001
From: Lalit Kumar Bhasin
Date: Tue, 21 Jan 2025 10:47:17 -0800
Subject: [PATCH 3/4] replace logs with span where relevant

---
 opentelemetry-sdk/src/trace/span_processor.rs | 28 +++++++++----------
 1 file changed, 14 insertions(+), 14 deletions(-)

diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs
index 83a8f49ff2..97ce39bb0f 100644
--- a/opentelemetry-sdk/src/trace/span_processor.rs
+++ b/opentelemetry-sdk/src/trace/span_processor.rs
@@ -346,7 +346,7 @@ impl BatchSpanProcessor {
                         },
                         Err(RecvTimeoutError::Timeout) => {
                             otel_debug!(
-                                name: "BatchLogProcessor.ExportingDueToTimer",
+                                name: "BatchSpanProcessor.ExportingDueToTimer",
                             );
 
                             let _ = Self::get_spans_and_export(
@@ -416,23 +416,23 @@ impl BatchSpanProcessor {
     where
         E: SpanExporter + Send + Sync + 'static,
     {
-        // Get up to `max_export_batch_size` amount of logs log records from the channel and push them to the logs vec
-        while let Ok(log) = spans_receiver.try_recv() {
-            spans.push(log);
+        // Get up to `max_export_batch_size` amount of spans from the channel and push them to the span vec
+        while let Ok(span) = spans_receiver.try_recv() {
+            spans.push(span);
             if spans.len() == config.max_export_batch_size {
                 break;
             }
         }
 
-        let count_of_logs = spans.len(); // Count of logs that will be exported
+        let count_of_spans = spans.len(); // Count of spans that will be exported
         let result = Self::export_with_timeout_sync(
             config.max_export_timeout,
             exporter,
             spans,
             last_export_time,
-        ); // This method clears the logs vec after exporting
+        ); // This method clears the spans vec after exporting
 
-        current_batch_size.fetch_sub(count_of_logs, Ordering::Relaxed);
+        current_batch_size.fetch_sub(count_of_spans, Ordering::Relaxed);
         result
     }
 
@@ -459,7 +459,7 @@ impl BatchSpanProcessor {
             Ok(_) => TraceResult::Ok(()),
             Err(err) => {
                 otel_error!(
-                    name: "BatchLogProcessor.ExportError",
+                    name: "BatchSpanProcessor.ExportError",
                     error = format!("{}", err)
                 );
                 TraceResult::Err(err)
@@ -494,17 +494,17 @@ impl SpanProcessor for BatchSpanProcessor {
                 message = "BatchSpanProcessor dropped a Span due to queue full/internal errors. No further internal log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total Spans dropped.");
             }
         }
-        // At this point, sending the log record to the data channel was successful.
+        // At this point, sending the span to the data channel was successful.
         // Increment the current batch size and check if it has reached the max export batch size.
         if self.current_batch_size.fetch_add(1, Ordering::Relaxed) + 1 >= self.max_export_batch_size
         {
-            // Check if a control message for exporting logs is already sent to the worker thread.
-            // If not, send a control message to export logs.
+            // Check if a control message for exporting spans is already sent to the worker thread.
+            // If not, send a control message to export spans.
             // `export_span_message_sent` is set to false ONLY when the worker thread has processed the control message.
 
             if !self.export_span_message_sent.load(Ordering::Relaxed) {
                 // This is a cost-efficient check as atomic load operations do not require exclusive access to cache line.
-                // Perform atomic swap to `export_log_message_sent` ONLY when the atomic load operation above returns false.
+                // Perform atomic swap to `export_span_message_sent` ONLY when the atomic load operation above returns false.
                 // Atomic swap/compare_exchange operations require exclusive access to cache line on most processor architectures.
                 // We could have used compare_exchange as well here, but it's more verbose than swap.
                 if !self.export_span_message_sent.swap(true, Ordering::Relaxed) {
@@ -516,7 +516,7 @@ impl SpanProcessor for BatchSpanProcessor {
                         }
                         Err(_err) => {
                             // TODO: Log error
-                            // If the control message could not be sent, reset the `export_log_message_sent` flag.
+                            // If the control message could not be sent, reset the `export_span_message_sent` flag.
                             self.export_span_message_sent
                                 .store(false, Ordering::Relaxed);
                         }
@@ -550,7 +550,7 @@ impl SpanProcessor for BatchSpanProcessor {
         let max_queue_size = self.max_queue_size;
         if dropped_spans > 0 {
             otel_warn!(
-                name: "BatchSpanProcessor.LogsDropped",
+                name: "BatchSpanProcessor.SpansDropped",
                 dropped_span_count = dropped_spans,
                 max_queue_size = max_queue_size,
                 message = "Spans were dropped due to a queue being full or other error. The count represents the total count of spans dropped in the lifetime of this BatchSpanProcessor. Consider increasing the queue size and/or decreasing the delay between intervals."
             );
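Batch triggering rests on the shared `current_batch_size` counter renamed-through above: `on_end` does a relaxed `fetch_add` for every span it enqueues, the worker does a `fetch_sub` for every batch it exports, and an export is requested the moment the count crosses the threshold. A sketch of that arithmetic, assuming a threshold of 512 (the SDK's usual default for `max_export_batch_size`):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

const MAX_EXPORT_BATCH_SIZE: usize = 512; // assumed default; configurable in the SDK

// `fetch_add` returns the *previous* value, hence the `+ 1` in the comparison.
fn should_trigger_export(current: &AtomicUsize) -> bool {
    current.fetch_add(1, Ordering::Relaxed) + 1 >= MAX_EXPORT_BATCH_SIZE
}

fn main() {
    let current = AtomicUsize::new(0);
    let mut triggers = 0;
    for _ in 0..MAX_EXPORT_BATCH_SIZE {
        if should_trigger_export(&current) {
            triggers += 1;
            // The worker drains up to MAX_EXPORT_BATCH_SIZE spans, then
            // subtracts exactly the number it exported:
            current.fetch_sub(MAX_EXPORT_BATCH_SIZE, Ordering::Relaxed);
        }
    }
    assert_eq!(triggers, 1); // only the 512th span trips the threshold
}
```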
From 5fce0a5cfc258acaa954ac29dc86748bd9b84042 Mon Sep 17 00:00:00 2001
From: Lalit Kumar Bhasin
Date: Tue, 21 Jan 2025 13:39:01 -0800
Subject: [PATCH 4/4] review comment to move export_span_message_sent before actual export

---
 opentelemetry-sdk/src/trace/span_processor.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs
index 97ce39bb0f..87bf76b9c2 100644
--- a/opentelemetry-sdk/src/trace/span_processor.rs
+++ b/opentelemetry-sdk/src/trace/span_processor.rs
@@ -293,6 +293,8 @@ impl BatchSpanProcessor {
                     match message_receiver.recv_timeout(remaining_time) {
                         Ok(message) => match message {
                             BatchMessage::ExportSpan(export_span_message_sent) => {
+                                // Reset the export span message sent flag now that it has been processed.
+                                export_span_message_sent.store(false, Ordering::Relaxed);
                                 otel_debug!(
                                     name: "BatchSpanProcessor.ExportingDueToBatchSize",
                                 );
@@ -304,8 +306,6 @@ impl BatchSpanProcessor {
                                     &current_batch_size,
                                     &config,
                                 );
-                                // Reset the export span message sent flag now that it has been processed.
-                                export_span_message_sent.store(false, Ordering::Relaxed);
                             }
                             BatchMessage::ForceFlush(sender) => {
                                 otel_debug!(name: "BatchSpanProcessor.ExportingDueToForceFlush");
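Patch 4 moves the flag reset ahead of the export itself. Resetting first means a producer that crosses the batch threshold while the worker is still exporting can win the swap again and queue the next `ExportSpan` message immediately, rather than leaving a full batch to wait for the timer. A single-threaded illustration of the window this closes (the real scenario involves the worker and a producer racing):

```rust
use std::sync::atomic::{AtomicBool, Ordering};

fn main() {
    // State just after a producer has triggered an export:
    let flag = AtomicBool::new(true);

    // Worker, patch-4 order: reset *before* running the export.
    flag.store(false, Ordering::Relaxed);

    // While the export is still in flight, another producer fills a batch.
    // It sees `false`, wins the swap, and may enqueue the next ExportSpan:
    let producer_rearmed = !flag.swap(true, Ordering::Relaxed);
    assert!(producer_rearmed);

    // With the old order (reset *after* export), this producer would have
    // observed `true`, skipped the notification, and its full batch would
    // have waited for the next scheduled-delay tick instead.
}
```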