Skip to content

fix: Processor now gets passed in reference #2726 #2895

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion examples/tracing-http-propagator/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ impl SpanProcessor for EnrichWithBaggageSpanProcessor {
}
}

fn on_end(&self, _span: opentelemetry_sdk::trace::SpanData) {}
fn on_end(&self, _span: &mut opentelemetry_sdk::trace::SpanData) {}
}

fn init_tracer() -> SdkTracerProvider {
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-sdk/benches/batch_span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ fn criterion_benchmark(c: &mut Criterion) {
let span_processor = shared_span_processor.clone();
let spans = get_span_data();
handles.push(tokio::spawn(async move {
for span in spans {
span_processor.on_end(span);
for mut span in spans {
span_processor.on_end(&mut span);
tokio::task::yield_now().await;
}
}));
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ mod tests {
}
}

fn on_end(&self, _span: SpanData) {
fn on_end(&self, _span: &mut SpanData) {
// TODO: Accessing Context::current() will panic today and hence commented out.
// See https://github.com/open-telemetry/opentelemetry-rust/issues/2871
// let _c = Context::current();
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-sdk/src/trace/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ mod tests {
.fetch_add(1, Ordering::SeqCst);
}

fn on_end(&self, _span: SpanData) {
fn on_end(&self, _span: &mut SpanData) {
// ignore
}

Expand Down Expand Up @@ -779,7 +779,7 @@ mod tests {
// No operation needed for this processor
}

fn on_end(&self, _span: SpanData) {
fn on_end(&self, _span: &mut SpanData) {
// No operation needed for this processor
}

Expand Down
29 changes: 16 additions & 13 deletions opentelemetry-sdk/src/trace/span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,20 @@
fn ensure_ended_and_exported(&mut self, timestamp: Option<SystemTime>) {
// skip if data has already been exported
let mut data = match self.data.take() {
Some(data) => data,
Some(data) => crate::trace::SpanData {
span_context: self.span_context.clone(),
parent_span_id: data.parent_span_id,
span_kind: data.span_kind,
name: data.name,
start_time: data.start_time,
end_time: data.end_time,
attributes: data.attributes,
dropped_attributes_count: data.dropped_attributes_count,
events: data.events,
links: data.links,
status: data.status,
instrumentation_scope: self.tracer.instrumentation_scope().clone(),
},
None => return,
};

Expand All @@ -219,20 +232,10 @@

match provider.span_processors() {
[] => {}
[processor] => {
processor.on_end(build_export_data(
data,
self.span_context.clone(),
&self.tracer,
));
}
[processor] => processor.on_end(&mut data),
processors => {
for processor in processors {
processor.on_end(build_export_data(
data.clone(),
self.span_context.clone(),
&self.tracer,
));
processor.on_end(&mut data);

Check warning on line 238 in opentelemetry-sdk/src/trace/span.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span.rs#L238

Added line #L238 was not covered by tests
}
}
}
Expand Down
65 changes: 32 additions & 33 deletions opentelemetry-sdk/src/trace/span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,7 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug {
/// `on_end` is called after a `Span` is ended (i.e., the end timestamp is
/// already set). This method is called synchronously within the `Span::end`
/// API, therefore it should not block or throw an exception.
/// TODO - This method should take reference to `SpanData`
fn on_end(&self, span: SpanData);
fn on_end(&self, span: &mut SpanData);
/// Force the spans lying in the cache to be exported.
fn force_flush(&self) -> OTelSdkResult;
/// Shuts down the processor. Called when SDK is shut down. This is an
Expand Down Expand Up @@ -129,7 +128,7 @@ impl<T: SpanExporter> SpanProcessor for SimpleSpanProcessor<T> {
// Ignored
}

fn on_end(&self, span: SpanData) {
fn on_end(&self, span: &mut SpanData) {
if !span.span_context.is_sampled() {
return;
}
Expand All @@ -138,7 +137,7 @@ impl<T: SpanExporter> SpanProcessor for SimpleSpanProcessor<T> {
.exporter
.lock()
.map_err(|_| OTelSdkError::InternalFailure("SimpleSpanProcessor mutex poison".into()))
.and_then(|exporter| futures_executor::block_on(exporter.export(vec![span])));
.and_then(|exporter| futures_executor::block_on(exporter.export(vec![span.clone()])));

if let Err(err) = result {
// TODO: check error type, and log `error` only if the error is user-actionable, else log `debug`
Expand Down Expand Up @@ -513,7 +512,7 @@ impl SpanProcessor for BatchSpanProcessor {
}

/// Handles span end.
fn on_end(&self, span: SpanData) {
fn on_end(&self, span: &mut SpanData) {
if self.is_shutdown.load(Ordering::Relaxed) {
// this is a warning, as the user is trying to emit after the processor has been shutdown
otel_warn!(
Expand All @@ -522,7 +521,7 @@ impl SpanProcessor for BatchSpanProcessor {
);
return;
}
let result = self.span_sender.try_send(span);
let result = self.span_sender.try_send(span.clone());

if result.is_err() {
// Increment dropped span count. The first time we have to drop a span,
Expand Down Expand Up @@ -876,8 +875,8 @@ mod tests {
fn simple_span_processor_on_end_calls_export() {
let exporter = InMemorySpanExporterBuilder::new().build();
let processor = SimpleSpanProcessor::new(exporter.clone());
let span_data = new_test_export_span_data();
processor.on_end(span_data.clone());
let mut span_data = new_test_export_span_data();
processor.on_end(&mut span_data);
assert_eq!(exporter.get_finished_spans().unwrap()[0], span_data);
let _result = processor.shutdown();
}
Expand All @@ -886,7 +885,7 @@ mod tests {
fn simple_span_processor_on_end_skips_export_if_not_sampled() {
let exporter = InMemorySpanExporterBuilder::new().build();
let processor = SimpleSpanProcessor::new(exporter.clone());
let unsampled = SpanData {
let mut unsampled = SpanData {
span_context: SpanContext::empty_context(),
parent_span_id: SpanId::INVALID,
span_kind: SpanKind::Internal,
Expand All @@ -900,16 +899,16 @@ mod tests {
status: Status::Unset,
instrumentation_scope: Default::default(),
};
processor.on_end(unsampled);
processor.on_end(&mut unsampled);
assert!(exporter.get_finished_spans().unwrap().is_empty());
}

#[test]
fn simple_span_processor_shutdown_calls_shutdown() {
let exporter = InMemorySpanExporterBuilder::new().build();
let processor = SimpleSpanProcessor::new(exporter.clone());
let span_data = new_test_export_span_data();
processor.on_end(span_data.clone());
let mut span_data = new_test_export_span_data();
processor.on_end(&mut span_data);
assert!(!exporter.get_finished_spans().unwrap().is_empty());
let _result = processor.shutdown();
// Assume shutdown is called by ensuring spans are empty in the exporter
Expand Down Expand Up @@ -1110,8 +1109,8 @@ mod tests {
.build();
let processor = BatchSpanProcessor::new(exporter, config);

let test_span = create_test_span("test_span");
processor.on_end(test_span.clone());
let mut test_span = create_test_span("test_span");
processor.on_end(&mut test_span);

// Wait for flush interval to ensure the span is processed
std::thread::sleep(Duration::from_secs(6));
Expand All @@ -1133,8 +1132,8 @@ mod tests {
let processor = BatchSpanProcessor::new(exporter, config);

// Create a test span and send it to the processor
let test_span = create_test_span("force_flush_span");
processor.on_end(test_span.clone());
let mut test_span = create_test_span("force_flush_span");
processor.on_end(&mut test_span);

// Call force_flush to immediately export the spans
let flush_result = processor.force_flush();
Expand Down Expand Up @@ -1162,8 +1161,8 @@ mod tests {
let processor = BatchSpanProcessor::new(exporter, config);

// Create a test span and send it to the processor
let test_span = create_test_span("shutdown_span");
processor.on_end(test_span.clone());
let mut test_span = create_test_span("shutdown_span");
processor.on_end(&mut test_span);

// Call shutdown to flush and export all pending spans
let shutdown_result = processor.shutdown();
Expand Down Expand Up @@ -1197,13 +1196,13 @@ mod tests {
let processor = BatchSpanProcessor::new(exporter, config);

// Create test spans and send them to the processor
let span1 = create_test_span("span1");
let span2 = create_test_span("span2");
let span3 = create_test_span("span3"); // This span should be dropped
let mut span1 = create_test_span("span1");
let mut span2 = create_test_span("span2");
let mut span3 = create_test_span("span3"); // This span should be dropped

processor.on_end(span1.clone());
processor.on_end(span2.clone());
processor.on_end(span3.clone()); // This span exceeds the queue size
processor.on_end(&mut span1);
processor.on_end(&mut span2);
processor.on_end(&mut span3); // This span exceeds the queue size

// Wait for the scheduled delay to expire
std::thread::sleep(Duration::from_secs(3));
Expand Down Expand Up @@ -1243,7 +1242,7 @@ mod tests {
KeyValue::new("key1", "value1"),
KeyValue::new("key2", "value2"),
];
processor.on_end(span_data.clone());
processor.on_end(&mut span_data);

// Force flush to export the span
let _ = processor.force_flush();
Expand Down Expand Up @@ -1273,8 +1272,8 @@ mod tests {
processor.set_resource(&resource);

// Create a span and send it to the processor
let test_span = create_test_span("resource_test");
processor.on_end(test_span.clone());
let mut test_span = create_test_span("resource_test");
processor.on_end(&mut test_span);

// Force flush to ensure the span is exported
let _ = processor.force_flush();
Expand Down Expand Up @@ -1308,8 +1307,8 @@ mod tests {
let processor = BatchSpanProcessor::new(exporter, config);

for _ in 0..4 {
let span = new_test_export_span_data();
processor.on_end(span);
let mut span = new_test_export_span_data();
processor.on_end(&mut span);
}

processor.force_flush().unwrap();
Expand All @@ -1331,8 +1330,8 @@ mod tests {
let processor = BatchSpanProcessor::new(exporter, config);

for _ in 0..4 {
let span = new_test_export_span_data();
processor.on_end(span);
let mut span = new_test_export_span_data();
processor.on_end(&mut span);
}

processor.force_flush().unwrap();
Expand All @@ -1358,8 +1357,8 @@ mod tests {
for _ in 0..10 {
let processor_clone = Arc::clone(&processor);
let handle = tokio::spawn(async move {
let span = new_test_export_span_data();
processor_clone.on_end(span);
let mut span = new_test_export_span_data();
processor_clone.on_end(&mut span);
});
handles.push(handle);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,14 @@ impl<R: RuntimeChannel> SpanProcessor for BatchSpanProcessor<R> {
// Ignored
}

fn on_end(&self, span: SpanData) {
fn on_end(&self, span: &mut SpanData) {
if !span.span_context.is_sampled() {
return;
}

let result = self.message_sender.try_send(BatchMessage::ExportSpan(span));
let result = self
.message_sender
.try_send(BatchMessage::ExportSpan(span.clone()));

// If the queue is full, and we can't buffer a span
if result.is_err() {
Expand Down Expand Up @@ -518,7 +520,7 @@ mod tests {
}
});
tokio::time::sleep(Duration::from_secs(1)).await; // skip the first
processor.on_end(new_test_export_span_data());
processor.on_end(&mut new_test_export_span_data());
let flush_res = processor.force_flush();
assert!(flush_res.is_ok());
let _shutdown_result = processor.shutdown();
Expand All @@ -545,7 +547,7 @@ mod tests {
};
let processor = BatchSpanProcessor::new(exporter, config, runtime::TokioCurrentThread);
tokio::time::sleep(Duration::from_secs(1)).await; // skip the first
processor.on_end(new_test_export_span_data());
processor.on_end(&mut new_test_export_span_data());
let flush_res = processor.force_flush();
if time_out {
assert!(flush_res.is_err());
Expand Down
2 changes: 1 addition & 1 deletion stress/src/traces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl SpanProcessor for NoOpSpanProcessor {
// No-op
}

fn on_end(&self, _span: SpanData) {
fn on_end(&self, _span: &mut SpanData) {
// No-op
}

Expand Down