diff --git a/opentelemetry-otlp/src/exporter/http/mod.rs b/opentelemetry-otlp/src/exporter/http/mod.rs index b70fecfeb5..4b5fde6d3e 100644 --- a/opentelemetry-otlp/src/exporter/http/mod.rs +++ b/opentelemetry-otlp/src/exporter/http/mod.rs @@ -322,7 +322,7 @@ impl OtlpHttpClient { logs: LogBatch<'_>, ) -> opentelemetry_sdk::logs::LogResult<(Vec, &'static str)> { use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; - let resource_logs = group_logs_by_resource_and_scope(logs, &self.resource); + let resource_logs = group_logs_by_resource_and_scope(&logs, &self.resource); let req = ExportLogsServiceRequest { resource_logs }; match self.protocol { diff --git a/opentelemetry-otlp/src/exporter/tonic/logs.rs b/opentelemetry-otlp/src/exporter/tonic/logs.rs index cbcf5284b3..21042f1070 100644 --- a/opentelemetry-otlp/src/exporter/tonic/logs.rs +++ b/opentelemetry-otlp/src/exporter/tonic/logs.rs @@ -12,6 +12,8 @@ use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scop use super::BoxInterceptor; +use super::retry::{retry_with_exponential_backoff, RetryPolicy}; + pub(crate) struct TonicLogsClient { inner: Mutex>, #[allow(dead_code)] @@ -69,19 +71,56 @@ impl LogExporter for TonicLogsClient { None => return Err(OTelSdkError::AlreadyShutdown), }; - let resource_logs = group_logs_by_resource_and_scope(batch, &self.resource); + let resource_logs = group_logs_by_resource_and_scope(&batch, &self.resource); otel_debug!(name: "TonicsLogsClient.CallingExport"); - client + // First attempt without retry + let result = client .export(Request::from_parts( - metadata, - extensions, - ExportLogsServiceRequest { resource_logs }, + metadata.clone(), + extensions.clone(), + ExportLogsServiceRequest { + resource_logs: resource_logs.clone() + }, )) - .await - .map_err(|e| OTelSdkError::InternalFailure(format!("export error: {:?}", e)))?; - Ok(()) + .await; + + // If the first attempt succeeds, return success + if result.is_ok() { + return Ok(()); + 
} + + // If the first attempt fails, try with retry + otel_debug!(name: "TonicsLogsClient.FirstAttemptFailed.Retrying"); + + let policy = RetryPolicy { + max_retries: 10, + initial_delay_ms: 100, + max_delay_ms: 1600, + jitter_ms: 100, + }; + + // Now use retry_with_exponential_backoff for subsequent attempts + retry_with_exponential_backoff( + policy, + "TonicsLogsClient.export", + || async { + client + .clone() + .export(Request::from_parts( + metadata.clone(), + extensions.clone(), + ExportLogsServiceRequest { + resource_logs: resource_logs.clone() + }, + )) + .await + .map_err(|e| OTelSdkError::InternalFailure(format!("export error: {:?}", e))) + } + ) + .await + .map(|_| ()) // Convert successful response to () as required by OTelSdkResult } fn shutdown(&self) -> OTelSdkResult { diff --git a/opentelemetry-otlp/src/exporter/tonic/mod.rs b/opentelemetry-otlp/src/exporter/tonic/mod.rs index 140c17d534..3d5ff5459a 100644 --- a/opentelemetry-otlp/src/exporter/tonic/mod.rs +++ b/opentelemetry-otlp/src/exporter/tonic/mod.rs @@ -28,6 +28,11 @@ pub(crate) mod metrics; #[cfg(feature = "trace")] pub(crate) mod trace; +// For now, we are not exposing the retry policy. Only work with grpc-tonic since retry takes a hard dependency on tokio +// while we sort out an abstraction for the async runtime which can be used by all exporters. 
+#[cfg(feature = "grpc-tonic")] +mod retry; + /// Configuration for [tonic] /// /// [tonic]: https://github.com/hyperium/tonic @@ -498,9 +503,6 @@ mod tests { #[test] #[cfg(feature = "gzip-tonic")] fn test_with_gzip_compression() { - // metadata should merge with the current one with priority instead of just replacing it - let mut metadata = MetadataMap::new(); - metadata.insert("foo", "bar".parse().unwrap()); let builder = TonicExporterBuilder::default().with_compression(Compression::Gzip); assert_eq!(builder.tonic_config.compression.unwrap(), Compression::Gzip); } diff --git a/opentelemetry-otlp/src/exporter/tonic/retry.rs b/opentelemetry-otlp/src/exporter/tonic/retry.rs new file mode 100644 index 0000000000..d9a283a463 --- /dev/null +++ b/opentelemetry-otlp/src/exporter/tonic/retry.rs @@ -0,0 +1,229 @@ +//! This module provides functionality for retrying operations with exponential backoff and jitter. +//! +//! The `RetryPolicy` struct defines the configuration for the retry behavior, including the maximum +//! number of retries, initial delay, maximum delay, and jitter. +//! +//! Delays between retries are currently implemented with `tokio::time::sleep` only. A runtime-agnostic +//! `Sleep` abstraction (for async-std and synchronous use) exists below only as commented-out code. +//! +//! The `retry_with_exponential_backoff` function retries the given operation according to the +//! specified retry policy, using exponential backoff and jitter to determine the delay between +//! retries. The function logs errors and retries the operation until it succeeds or the maximum +//! number of retries is reached. + +use std::future::Future; +use std::time::{Duration, SystemTime}; +use opentelemetry::otel_warn; + +/// Configuration for retry policy. +#[derive(Debug)] +pub(super) struct RetryPolicy { + /// Maximum number of retry attempts. + pub max_retries: usize, + /// Initial delay in milliseconds before the first retry. 
+ pub initial_delay_ms: u64, + /// Maximum delay in milliseconds between retries. + pub max_delay_ms: u64, + /// Maximum jitter in milliseconds to add to the delay. + pub jitter_ms: u64, +} + +// Generates a pseudo-random jitter value in the inclusive range 0..=max_jitter, derived from the sub-second nanoseconds of the current system time (not a true RNG) +fn generate_jitter(max_jitter: u64) -> u64 { + let now = SystemTime::now(); + let nanos = now.duration_since(SystemTime::UNIX_EPOCH).unwrap().subsec_nanos(); + nanos as u64 % (max_jitter + 1) +} + +// /// Trait to abstract the sleep functionality. +// pub trait Sleep { +// /// The future returned by the sleep function. +// type SleepFuture: Future; + +// /// Sleeps for the specified duration. +// fn sleep(duration: Duration) -> Self::SleepFuture; +// } + +// /// Implementation of the Sleep trait for tokio::time::Sleep +// #[cfg(feature = "rt-tokio")] +// impl Sleep for tokio::time::Sleep { +// type SleepFuture = tokio::time::Sleep; + +// fn sleep(duration: Duration) -> Self::SleepFuture { +// } +// } + +// #[cfg(feature = "rt-async-std")] +// /// There is no direct equivalent to `tokio::time::Sleep` in `async-std`. +// /// Instead, we create a new struct `AsyncStdSleep` and implement the `Sleep` +// /// trait for it, boxing the future returned by `async_std::task::sleep` to fit +// /// the trait's associated type requirements. 
+// #[derive(Debug)] +// pub struct AsyncStdSleep; + +// /// Implementation of the Sleep trait for async-std +// #[cfg(feature = "rt-async-std")] +// impl Sleep for AsyncStdSleep { +// type SleepFuture = Pin + Send>>; + +// fn sleep(duration: Duration) -> Self::SleepFuture { +// Box::pin(async_std::task::sleep(duration)) +// } +// } + +// /// Implement the Sleep trait for synchronous sleep +// #[derive(Debug)] +// pub struct StdSleep; + +// impl Sleep for StdSleep { +// type SleepFuture = std::future::Ready<()>; + +// fn sleep(duration: Duration) -> Self::SleepFuture { +// std::thread::sleep(duration); +// std::future::ready(()) +// } +// } + +/// Retries the given operation with exponential backoff and jitter. +/// +/// # Arguments +/// +/// * `policy` - The retry policy configuration. +/// * `operation_name` - The name of the operation being retried. +/// * `operation` - The operation to be retried. +/// +/// # Returns +/// +/// A `Result` containing the operation's result or an error if the maximum retries are reached. +pub(super) async fn retry_with_exponential_backoff( + policy: RetryPolicy, + operation_name: &str, + mut operation: F, +) -> Result +where + F: FnMut() -> Fut, + E: std::fmt::Debug, + Fut: Future>, +{ + let mut attempt = 0; + let mut delay = policy.initial_delay_ms; + + loop { + match operation().await { + Ok(result) => return Ok(result), // Return the result if the operation succeeds + Err(err) if attempt < policy.max_retries => { + attempt += 1; + // Log the error and retry after a delay with jitter + otel_warn!(name: "OtlpRetry", message = format!("Retrying operation {:?} due to error: {:?}", operation_name, err)); + let jitter = generate_jitter(policy.jitter_ms); + let delay_with_jitter = std::cmp::min(delay + jitter, policy.max_delay_ms); + + // Retry currently only supports tokio::time::sleep (for use with gRPC/tonic). 
This + // should be replaced with a more generic sleep function that works with async-std + // and a synchronous runtime in the future. + tokio::time::sleep(Duration::from_millis(delay_with_jitter)).await; + + delay = std::cmp::min(delay * 2, policy.max_delay_ms); // Exponential backoff + } + Err(err) => return Err(err), // Return the error if max retries are reached + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tokio::time::timeout; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::time::Duration; + + // Test to ensure generate_jitter returns a value within the expected range + #[tokio::test] + async fn test_generate_jitter() { + let max_jitter = 100; + let jitter = generate_jitter(max_jitter); + assert!(jitter <= max_jitter); + } + + // Test to ensure retry_with_exponential_backoff succeeds on the first attempt + #[tokio::test] + async fn test_retry_with_exponential_backoff_success() { + let policy = RetryPolicy { + max_retries: 3, + initial_delay_ms: 100, + max_delay_ms: 1600, + jitter_ms: 100, + }; + + let result = retry_with_exponential_backoff(policy, "test_operation", || { + Box::pin(async { Ok::<_, ()>("success") }) + }).await; + + assert_eq!(result, Ok("success")); + } + + // Test to ensure retry_with_exponential_backoff retries the operation and eventually succeeds + #[tokio::test] + async fn test_retry_with_exponential_backoff_retries() { + let policy = RetryPolicy { + max_retries: 3, + initial_delay_ms: 100, + max_delay_ms: 1600, + jitter_ms: 100, + }; + + let attempts = AtomicUsize::new(0); + + let result = retry_with_exponential_backoff(policy, "test_operation", || { + let attempt = attempts.fetch_add(1, Ordering::SeqCst); + Box::pin(async move { + if attempt < 2 { + Err::<&str, &str>("error") // Fail the first two attempts + } else { + Ok::<&str, &str>("success") // Succeed on the third attempt + } + }) + }).await; + + assert_eq!(result, Ok("success")); + assert_eq!(attempts.load(Ordering::SeqCst), 3); // Ensure there were 
3 attempts + } + + // Test to ensure retry_with_exponential_backoff fails after max retries + #[tokio::test] + async fn test_retry_with_exponential_backoff_failure() { + let policy = RetryPolicy { + max_retries: 3, + initial_delay_ms: 100, + max_delay_ms: 1600, + jitter_ms: 100, + }; + + let attempts = AtomicUsize::new(0); + + let result = retry_with_exponential_backoff(policy, "test_operation", || { + attempts.fetch_add(1, Ordering::SeqCst); + Box::pin(async { Err::<(), _>("error") }) // Always fail + }).await; + + assert_eq!(result, Err("error")); + assert_eq!(attempts.load(Ordering::SeqCst), 4); // Ensure there were 4 attempts (initial + 3 retries) + } + + // Test to ensure retry_with_exponential_backoff respects the timeout + #[tokio::test] + async fn test_retry_with_exponential_backoff_timeout() { + let policy = RetryPolicy { + max_retries: 12, // Increase the number of retries + initial_delay_ms: 100, + max_delay_ms: 1600, + jitter_ms: 100, + }; + + let result = timeout(Duration::from_secs(1), retry_with_exponential_backoff(policy, "test_operation", || { + Box::pin(async { Err::<(), _>("error") }) // Always fail + })).await; + + assert!(result.is_err()); // Ensure the operation times out + } +} \ No newline at end of file diff --git a/opentelemetry-otlp/tests/integration_test/src/logs_asserter.rs b/opentelemetry-otlp/tests/integration_test/src/logs_asserter.rs index 2c9339c6b4..9daccbea66 100644 --- a/opentelemetry-otlp/tests/integration_test/src/logs_asserter.rs +++ b/opentelemetry-otlp/tests/integration_test/src/logs_asserter.rs @@ -1,8 +1,5 @@ use anyhow::Result; -use opentelemetry_proto::tonic::{ - common::v1::KeyValue, - logs::v1::{LogRecord, LogsData, ResourceLogs}, -}; +use opentelemetry_proto::tonic::logs::v1::{LogRecord, LogsData, ResourceLogs}; use std::fs::File; // Given two ResourceLogs, assert that they are equal except for the timestamps diff --git a/opentelemetry-otlp/tests/integration_test/src/test_utils.rs 
b/opentelemetry-otlp/tests/integration_test/src/test_utils.rs index 5dd8ac5dd4..cecab22974 100644 --- a/opentelemetry-otlp/tests/integration_test/src/test_utils.rs +++ b/opentelemetry-otlp/tests/integration_test/src/test_utils.rs @@ -13,7 +13,7 @@ //! Only a single test suite can run at once, as each container has statically mapped ports, but //! this works nicely with the way cargo executes the suite. //! -//! To skip integration tests with cargo, you can run `cargo test --mod`, which will run unit tests +//! To skip integration tests with cargo, you can run `cargo test --lib`, which will run unit tests //! only. //! #![cfg(unix)] diff --git a/opentelemetry-proto/src/transform/logs.rs b/opentelemetry-proto/src/transform/logs.rs index f1a992fc9a..458979cd86 100644 --- a/opentelemetry-proto/src/transform/logs.rs +++ b/opentelemetry-proto/src/transform/logs.rs @@ -164,8 +164,8 @@ pub mod tonic { } } - pub fn group_logs_by_resource_and_scope( - logs: LogBatch<'_>, + pub fn group_logs_by_resource_and_scope<'a>( + logs: &'a LogBatch<'a>, resource: &ResourceAttributesWithSchema, ) -> Vec { // Group logs by target or instrumentation name @@ -273,7 +273,7 @@ mod tests { let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema let grouped_logs = - crate::transform::logs::tonic::group_logs_by_resource_and_scope(log_batch, &resource); + crate::transform::logs::tonic::group_logs_by_resource_and_scope(&log_batch, &resource); assert_eq!(grouped_logs.len(), 1); let resource_logs = &grouped_logs[0]; @@ -293,7 +293,7 @@ mod tests { let log_batch = LogBatch::new(&logs); let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema let grouped_logs = - crate::transform::logs::tonic::group_logs_by_resource_and_scope(log_batch, &resource); + crate::transform::logs::tonic::group_logs_by_resource_and_scope(&log_batch, &resource); assert_eq!(grouped_logs.len(), 1); let 
resource_logs = &grouped_logs[0];