diff --git a/examples/actix-http/src/main.rs b/examples/actix-http/src/main.rs index 77673f500d..e7ee6f08e8 100644 --- a/examples/actix-http/src/main.rs +++ b/examples/actix-http/src/main.rs @@ -10,8 +10,8 @@ use opentelemetry::{ }; fn init_tracer() -> Result { - opentelemetry_jaeger::new_pipeline() - .with_collector_endpoint("http://127.0.0.1:14268/api/traces") + opentelemetry_jaeger::new_collector_pipeline() + .with_endpoint("http://127.0.0.1:14268/api/traces") .with_service_name("trace-http-demo") .install_batch(opentelemetry::runtime::TokioCurrentThread) } diff --git a/examples/actix-udp/src/main.rs b/examples/actix-udp/src/main.rs index 02a0348b5c..2a741b92a1 100644 --- a/examples/actix-udp/src/main.rs +++ b/examples/actix-udp/src/main.rs @@ -9,8 +9,8 @@ use opentelemetry::{ }; fn init_tracer() -> Result { - opentelemetry_jaeger::new_pipeline() - .with_agent_endpoint("localhost:6831") + opentelemetry_jaeger::new_agent_pipeline() + .with_endpoint("localhost:6831") .with_service_name("trace-udp-demo") .with_trace_config(opentelemetry::sdk::trace::config().with_resource( opentelemetry::sdk::Resource::new(vec![ diff --git a/examples/async/src/main.rs b/examples/async/src/main.rs index 776f456fd3..fdc892e28b 100644 --- a/examples/async/src/main.rs +++ b/examples/async/src/main.rs @@ -54,7 +54,7 @@ async fn run(addr: &SocketAddr) -> io::Result { } fn init_tracer() -> Result { - opentelemetry_jaeger::new_pipeline() + opentelemetry_jaeger::new_agent_pipeline() .with_service_name("trace-demo") .install_batch(opentelemetry::runtime::Tokio) } diff --git a/examples/basic/src/main.rs b/examples/basic/src/main.rs index 73aeed8747..1ef5e22bce 100644 --- a/examples/basic/src/main.rs +++ b/examples/basic/src/main.rs @@ -14,7 +14,7 @@ use std::error::Error; use std::time::Duration; fn init_tracer() -> Result { - opentelemetry_jaeger::new_pipeline() + opentelemetry_jaeger::new_agent_pipeline() .with_service_name("trace-demo") .with_trace_config(Config::default().with_resource(Resource::new(vec![ KeyValue::new("service.name", "new_service"), diff --git a/examples/grpc/src/client.rs b/examples/grpc/src/client.rs index 18bab3c3d3..df2e65ab14 100644 --- a/examples/grpc/src/client.rs +++ b/examples/grpc/src/client.rs @@ -30,7 +30,7 @@ pub mod hello_world { fn tracing_init() -> TraceResult { global::set_text_map_propagator(TraceContextPropagator::new()); - opentelemetry_jaeger::new_pipeline() + opentelemetry_jaeger::new_agent_pipeline() .with_service_name("grpc-client") .install_simple() } diff --git a/examples/grpc/src/server.rs b/examples/grpc/src/server.rs index d48057fff1..b4f02b5e2e 100644 --- a/examples/grpc/src/server.rs +++ b/examples/grpc/src/server.rs @@ -61,7 +61,7 @@ impl Greeter for MyGreeter { fn tracing_init() -> Result { global::set_text_map_propagator(TraceContextPropagator::new()); - opentelemetry_jaeger::new_pipeline() + opentelemetry_jaeger::new_agent_pipeline() .with_service_name("grpc-server") .install_simple() } diff --git a/examples/multiple-span-processors/src/main.rs b/examples/multiple-span-processors/src/main.rs index e0a502124c..354ec010f3 100644 --- a/examples/multiple-span-processors/src/main.rs +++ b/examples/multiple-span-processors/src/main.rs @@ -10,13 +10,13 @@ use std::time::Duration; fn init_tracer() -> Result<(), TraceError> { // build a jaeger batch span processor let jaeger_processor = BatchSpanProcessor::builder( - opentelemetry_jaeger::new_pipeline() + opentelemetry_jaeger::new_agent_pipeline() .with_service_name("trace-demo") .with_trace_config( Config::default() .with_resource(Resource::new(vec![KeyValue::new("exporter", "jaeger")])), ) - .init_async_exporter(opentelemetry::runtime::Tokio)?, + .build_async_agent_exporter(opentelemetry::runtime::Tokio)?, opentelemetry::runtime::Tokio, ) .build(); diff --git a/opentelemetry-jaeger/README.md b/opentelemetry-jaeger/README.md index 03e281db1a..9124c7c5b3 100644 --- a/opentelemetry-jaeger/README.md +++ b/opentelemetry-jaeger/README.md @@ -47,7 +47,7 @@ use opentelemetry::trace::Tracer; fn main() -> Result<(), Box> { global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); - let tracer = opentelemetry_jaeger::new_pipeline().install_simple()?; + let tracer = opentelemetry_jaeger::new_agent_pipeline().install_simple()?; tracer.in_span("doing_work", |cx| { // Traced app logic here... @@ -76,7 +76,7 @@ opentelemetry-jaeger = { version = "*", features = ["rt-tokio"] } ``` ```rust -let tracer = opentelemetry_jaeger::new_pipeline() +let tracer = opentelemetry_jaeger::new_agent_pipeline() .install_batch(opentelemetry::runtime::Tokio)?; ``` @@ -120,11 +120,11 @@ Then you can use the [`with_collector_endpoint`] method to specify the endpoint: use opentelemetry::trace::Tracer; fn main() -> Result<(), Box> { - let tracer = opentelemetry_jaeger::new_pipeline() - .with_collector_endpoint("http://localhost:14268/api/traces") + let tracer = opentelemetry_jaeger::new_collector_pipeline() + .with_endpoint("http://localhost:14268/api/traces") // optionally set username and password as well. - .with_collector_username("username") - .with_collector_password("s3cr3t") + .with_username("username") + .with_password("s3cr3t") .install_batch()?; tracer.in_span("doing_work", |cx| { diff --git a/opentelemetry-jaeger/src/exporter/agent.rs b/opentelemetry-jaeger/src/exporter/agent.rs index d0842f31fb..aeba755fdd 100644 --- a/opentelemetry-jaeger/src/exporter/agent.rs +++ b/opentelemetry-jaeger/src/exporter/agent.rs @@ -12,9 +12,6 @@ use thrift::{ transport::{ReadHalf, TIoChannel, WriteHalf}, }; -/// The max size of UDP packet we want to send, synced with jaeger-agent -const UDP_PACKET_MAX_LENGTH: usize = 65_000; - struct BufferClient { buffer: ReadHalf, client: agent::AgentSyncClient< @@ -47,10 +44,9 @@ impl AgentSyncClientUdp { /// Create a new UDP agent client pub(crate) fn new( host_port: T, - max_packet_size: Option, + max_packet_size: usize, auto_split: bool, ) -> thrift::Result { - let max_packet_size = max_packet_size.unwrap_or(UDP_PACKET_MAX_LENGTH); let (buffer, write) = TBufferChannel::with_capacity(max_packet_size).split()?; let client = agent::AgentSyncClient::new( TCompactInputProtocol::new(TNoopChannel), @@ -106,11 +102,10 @@ impl AgentAsyncClientUdp { /// Create a new UDP agent client pub(crate) fn new( host_port: T, - max_packet_size: Option, + max_packet_size: usize, runtime: R, auto_split: bool, ) -> thrift::Result { - let max_packet_size = max_packet_size.unwrap_or(UDP_PACKET_MAX_LENGTH); let (buffer, write) = TBufferChannel::with_capacity(max_packet_size).split()?; let client = agent::AgentSyncClient::new( TCompactInputProtocol::new(TNoopChannel), diff --git a/opentelemetry-jaeger/src/exporter/collector.rs b/opentelemetry-jaeger/src/exporter/collector.rs index 09feceb31a..3c483d81a3 100644 --- a/opentelemetry-jaeger/src/exporter/collector.rs +++ b/opentelemetry-jaeger/src/exporter/collector.rs @@ -1,26 +1,14 @@ //! # HTTP Jaeger Collector Client +//! +#[cfg(feature = "collector_client")] use http::Uri; #[cfg(feature = "collector_client")] use opentelemetry_http::{HttpClient, ResponseExt as _}; -use std::sync::atomic::AtomicUsize; - -/// `CollectorAsyncClientHttp` implements an async version of the -/// `TCollectorSyncClient` interface over HTTP -#[derive(Debug)] -pub(crate) struct CollectorAsyncClientHttp { - endpoint: Uri, - #[cfg(feature = "collector_client")] - client: Box, - #[cfg(all(feature = "wasm_collector_client", not(feature = "collector_client")))] - client: WasmHttpClient, - payload_size_estimate: AtomicUsize, -} +#[cfg(feature = "collector_client")] +pub(crate) use collector_client::AsyncHttpClient; #[cfg(feature = "wasm_collector_client")] -#[derive(Debug)] -struct WasmHttpClient { - _auth: Option, -} +pub(crate) use wasm_collector_client::WasmCollector; #[cfg(feature = "collector_client")] mod collector_client { @@ -31,14 +19,23 @@ mod collector_client { use std::sync::atomic::{AtomicUsize, Ordering}; use thrift::protocol::TBinaryOutputProtocol; - impl CollectorAsyncClientHttp { + /// `AsyncHttpClient` implements an async version of the + /// `TCollectorSyncClient` interface over HTTP + #[derive(Debug)] + pub(crate) struct AsyncHttpClient { + endpoint: Uri, + http_client: Box, + payload_size_estimate: AtomicUsize, + } + + impl AsyncHttpClient { /// Create a new HTTP collector client pub(crate) fn new(endpoint: Uri, client: Box) -> Self { let payload_size_estimate = AtomicUsize::new(512); - CollectorAsyncClientHttp { + AsyncHttpClient { endpoint, - client, + http_client: client, payload_size_estimate, } } @@ -68,15 +65,14 @@ mod collector_client { .expect("request should always be valid"); // Send request to collector - let _ = self.client.send(req).await?.error_for_status()?; + let _ = self.http_client.send(req).await?.error_for_status()?; Ok(()) } } } -#[cfg(all(feature = "wasm_collector_client", not(feature = "collector_client")))] +#[cfg(feature = "wasm_collector_client")] mod wasm_collector_client { - use super::*; use crate::exporter::thrift::jaeger; use futures_util::future; use http::Uri; @@ -91,7 +87,19 @@ mod wasm_collector_client { use wasm_bindgen_futures::JsFuture; use web_sys::{Request, RequestCredentials, RequestInit, RequestMode, Response}; - impl CollectorAsyncClientHttp { + #[derive(Debug)] + pub(crate) struct WasmCollector { + endpoint: Uri, + payload_size_estimate: AtomicUsize, + client: WasmHttpClient, + } + + #[derive(Debug, Default)] + struct WasmHttpClient { + auth: Option, + } + + impl WasmCollector { /// Create a new HTTP collector client pub(crate) fn new( endpoint: Uri, @@ -111,7 +119,7 @@ mod wasm_collector_client { Ok(Self { endpoint, - client: WasmHttpClient { _auth: auth }, + client: WasmHttpClient { auth }, payload_size_estimate, }) } diff --git a/opentelemetry-jaeger/src/exporter/config/agent.rs b/opentelemetry-jaeger/src/exporter/config/agent.rs new file mode 100644 index 0000000000..ba6c6fba63 --- /dev/null +++ b/opentelemetry-jaeger/src/exporter/config/agent.rs @@ -0,0 +1,342 @@ +use crate::exporter::agent::{AgentAsyncClientUdp, AgentSyncClientUdp}; +use crate::exporter::config::{ + build_config_and_process, install_tracer_provider_and_get_tracer, HasRequiredConfig, + TransformationConfig, +}; +use crate::exporter::uploader::{AsyncUploader, SyncUploader, Uploader}; +use crate::{Error, Exporter, JaegerTraceRuntime}; +use opentelemetry::sdk; +use opentelemetry::sdk::trace::{Config, TracerProvider}; +use opentelemetry::trace::TraceError; +use std::borrow::BorrowMut; +use std::{env, net}; + +/// The max size of UDP packet we want to send, synced with jaeger-agent +const UDP_PACKET_MAX_LENGTH: usize = 65_000; + +/// The hostname for the Jaeger agent. +/// e.g. "localhost" +const ENV_AGENT_HOST: &str = "OTEL_EXPORTER_JAEGER_AGENT_HOST"; + +/// The port for the Jaeger agent. +/// e.g. 6832 +const ENV_AGENT_PORT: &str = "OTEL_EXPORTER_JAEGER_AGENT_PORT"; + +/// Default agent endpoint if none is provided +const DEFAULT_AGENT_ENDPOINT: &str = "127.0.0.1:6831"; + +/// AgentPipeline config and build a exporter targeting a jaeger agent using UDP as transport layer protocol. +/// +/// ## UDP packet max length +/// The exporter will use UDP to communicate with the agent. Depends on your platform, UDP protocol +/// may cut off if the packet is too long. See [UDP packet size] for details. Users can utilise [`with_max_packet_size`] +/// and [`with_auto_split_batch`] to avoid spans don't lose because the packet is too long. +/// +/// [`with_auto_split_batch`]: AgentPipeline::with_auto_split_batch +/// [`with_max_packet_size`]: AgentPipeline::with_max_packet_size +/// [UDP packet size]: https://stackoverflow.com/questions/1098897/what-is-the-largest-safe-udp-packet-size-on-the-internet +/// +/// ## Environment variables +/// The following environment variables are available to configure the agent exporter. +/// +/// - `OTEL_EXPORTER_JAEGER_AGENT_HOST`, set the host of the agent. If the `OTEL_EXPORTER_JAEGER_AGENT_HOST` +/// is not set, the value will be ignored. +/// - `OTEL_EXPORTER_JAEGER_AGENT_PORT`, set the port of the agent. If the `OTEL_EXPORTER_JAEGER_AGENT_HOST` +/// is not set, the exporter will use 127.0.0.1 as the host. +#[derive(Debug)] +pub struct AgentPipeline { + transformation_config: TransformationConfig, + trace_config: Option, + agent_endpoint: Result, crate::Error>, + max_packet_size: usize, + auto_split_batch: bool, +} + +impl Default for AgentPipeline { + fn default() -> Self { + let mut pipeline = AgentPipeline { + transformation_config: Default::default(), + trace_config: Default::default(), + agent_endpoint: Ok(vec![DEFAULT_AGENT_ENDPOINT.parse().unwrap()]), + max_packet_size: UDP_PACKET_MAX_LENGTH, + auto_split_batch: false, + }; + + if let (Ok(host), Ok(port)) = (env::var(ENV_AGENT_HOST), env::var(ENV_AGENT_PORT)) { + pipeline = pipeline.with_endpoint(format!("{}:{}", host.trim(), port.trim())); + } else if let Ok(port) = env::var(ENV_AGENT_PORT) { + pipeline = pipeline.with_endpoint(format!("127.0.0.1:{}", port.trim())) + } + pipeline + } +} + +// implement the seal trait +impl HasRequiredConfig for AgentPipeline { + fn set_transformation_config(&mut self, f: T) + where + T: FnOnce(&mut TransformationConfig), + { + f(self.transformation_config.borrow_mut()) + } + + fn set_trace_config(&mut self, config: Config) { + self.trace_config = Some(config) + } +} + +/// Start a new pipeline to configure a exporter that target a jaeger agent. +/// +/// See details for each configurations at [`AgentPipeline`] +/// +/// [`AgentPipeline`]: crate::config::agent::AgentPipeline +pub fn new_agent_pipeline() -> AgentPipeline { + AgentPipeline::default() +} + +impl AgentPipeline { + /// set the endpoint of the agent. + /// + /// It usually composed by host ip and the port number. + /// Any valid socket address can be used. + /// + /// Default to be `127.0.0.1:6831`. + pub fn with_endpoint(self, agent_endpoint: T) -> Self { + AgentPipeline { + agent_endpoint: agent_endpoint + .to_socket_addrs() + .map(|addrs| addrs.collect()) + .map_err(|io_err| crate::Error::ConfigError { + pipeline_name: "agent", + config_name: "endpoint", + reason: io_err.to_string(), + }), + ..self + } + } + + /// Assign the max packet size in bytes. + /// + /// If the application is generating a lot of spans or each spans contains a lot of events/tags + /// it can result in spans loss because of the UDP size limit. Increase the `max_packet_size` can medicate the problem. + /// + /// Default to be `65000`. + pub fn with_max_packet_size(self, max_packet_size: usize) -> Self { + AgentPipeline { + max_packet_size, + ..self + } + } + + /// Config whether to auto split batches. + /// + /// When auto split is set to `true`, the exporter will try to split the + /// batch into smaller ones so that there will be minimal data loss. It + /// will impact the performance. + /// + /// Note that if one span is too large to export, other spans within the + /// same batch may or may not be exported. In this case, exporter will + /// return errors as we cannot split spans. + /// + /// Default to be `false`. + pub fn with_auto_split_batch(mut self, should_auto_split: bool) -> Self { + self.auto_split_batch = should_auto_split; + self + } + + /// Set the service name of the application. It generally is the name of application. + /// Critically, Jaeger backend depends on `Span.Process.ServiceName` to identify the service + /// that produced the spans. + /// + /// Opentelemetry allows set the service name using multiple methods. + /// This functions takes priority over all other methods. + /// + /// If the service name is not set. It will default to be `unknown_service`. + pub fn with_service_name>(mut self, service_name: T) -> Self { + self.set_transformation_config(|mut config| { + config.service_name = Some(service_name.into()); + }); + self + } + + /// Config whether to export information of instrumentation library. + /// + /// It's required to [report instrumentation library as span tags]. + /// However it does have a overhead on performance, performance sensitive applications can + /// use this function to opt out reporting instrumentation library. + /// + /// Default to be `true`. + /// + /// [report instrumentation library as span tags]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk_exporters/non-otlp.md#instrumentationscope + pub fn with_instrumentation_library_tags(mut self, should_export: bool) -> Self { + self.set_transformation_config(|mut config| { + config.export_instrument_library = should_export; + }); + self + } + + /// Assign the opentelemetry SDK configurations for the exporter pipeline. + /// + /// For mapping between opentelemetry configurations and Jaeger spans. Please refer [the spec]. + /// + /// [the spec]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk_exporters/jaeger.md#mappings + /// # Examples + /// Set service name via resource. + /// ```rust + /// use opentelemetry::{sdk::{self, Resource}, KeyValue}; + /// + /// let pipeline = opentelemetry_jaeger::new_agent_pipeline() + /// .with_trace_config( + /// sdk::trace::Config::default() + /// .with_resource(Resource::new(vec![KeyValue::new("service.name", "my-service")])) + /// ); + /// + /// ``` + pub fn with_trace_config(mut self, config: sdk::trace::Config) -> Self { + self.set_trace_config(config); + self + } + + /// Build a `TracerProvider` using a blocking exporter and configurations from the pipeline. + /// + /// The exporter will send each span to the agent upon the span ends. + pub fn build_simple(mut self) -> Result { + let mut builder = sdk::trace::TracerProvider::builder(); + + let (config, process) = build_config_and_process( + builder.sdk_provided_resource(), + self.trace_config.take(), + self.transformation_config.service_name.take(), + ); + let exporter = Exporter::new( + process.into(), + self.transformation_config.export_instrument_library, + self.build_sync_agent_uploader()?, + ); + + builder = builder.with_simple_exporter(exporter); + builder = builder.with_config(config); + + Ok(builder.build()) + } + + /// Build a `TracerProvider` using a async exporter and configurations from the pipeline. + /// + /// The exporter will collect spans in a batch and send them to the agent. + /// + /// It's possible to lose spans up to a batch when the application shuts down. So users should + /// use [`shut_down_tracer_provider`] to block the shut down process until + /// all remaining spans have been sent. + /// + /// Commonly used runtime are provided via `rt-tokio`, `rt-tokio-current-thread`, `rt-async-std` + /// features. + /// + /// [`shut_down_tracer_provider`]: opentelemetry::global::shutdown_tracer_provider + pub fn build_batch(mut self, runtime: R) -> Result + where + R: JaegerTraceRuntime, + { + let mut builder = sdk::trace::TracerProvider::builder(); + + let export_instrument_library = self.transformation_config.export_instrument_library; + // build sdk trace config and jaeger process. + // some attributes like service name has attributes like service name + let (config, process) = build_config_and_process( + builder.sdk_provided_resource(), + self.trace_config.take(), + self.transformation_config.service_name.take(), + ); + let uploader = self.build_async_agent_uploader(runtime.clone())?; + let exporter = Exporter::new(process.into(), export_instrument_library, uploader); + + builder = builder.with_batch_exporter(exporter, runtime); + builder = builder.with_config(config); + + Ok(builder.build()) + } + + /// Similar to [`build_simple`][AgentPipeline::build_simple] but also returns a tracer from the + /// tracer provider. + /// + /// The tracer name is `opentelemetry-jaeger`. The tracer version will be the version of this crate. + pub fn install_simple(self) -> Result { + let tracer_provider = self.build_simple()?; + install_tracer_provider_and_get_tracer(tracer_provider) + } + + /// Similar to [`build_batch`][AgentPipeline::build_batch] but also returns a tracer from the + /// tracer provider. + /// + /// The tracer name is `opentelemetry-jaeger`. The tracer version will be the version of this crate. + pub fn install_batch(self, runtime: R) -> Result + where + R: JaegerTraceRuntime, + { + let tracer_provider = self.build_batch(runtime)?; + install_tracer_provider_and_get_tracer(tracer_provider) + } + + /// Build an jaeger exporter targeting a jaeger agent and running on the async runtime. + pub fn build_async_agent_exporter( + mut self, + runtime: R, + ) -> Result + where + R: JaegerTraceRuntime, + { + let builder = sdk::trace::TracerProvider::builder(); + let export_instrument_library = self.transformation_config.export_instrument_library; + // build sdk trace config and jaeger process. + // some attributes like service name has attributes like service name + let (_, process) = build_config_and_process( + builder.sdk_provided_resource(), + self.trace_config.take(), + self.transformation_config.service_name.take(), + ); + let uploader = self.build_async_agent_uploader(runtime)?; + Ok(Exporter::new( + process.into(), + export_instrument_library, + uploader, + )) + } + + /// Build an jaeger exporter targeting a jaeger agent and running on the sync runtime. + pub fn build_sync_agent_exporter(mut self) -> Result { + let builder = sdk::trace::TracerProvider::builder(); + let (_, process) = build_config_and_process( + builder.sdk_provided_resource(), + self.trace_config.take(), + self.transformation_config.service_name.take(), + ); + Ok(Exporter::new( + process.into(), + self.transformation_config.export_instrument_library, + self.build_sync_agent_uploader()?, + )) + } + + fn build_async_agent_uploader(self, runtime: R) -> Result, TraceError> + where + R: JaegerTraceRuntime, + { + let agent = AgentAsyncClientUdp::new( + self.agent_endpoint?.as_slice(), + self.max_packet_size, + runtime, + self.auto_split_batch, + ) + .map_err::(Into::into)?; + Ok(Box::new(AsyncUploader::Agent(agent))) + } + + fn build_sync_agent_uploader(self) -> Result, TraceError> { + let agent = AgentSyncClientUdp::new( + self.agent_endpoint?.as_slice(), + self.max_packet_size, + self.auto_split_batch, + ) + .map_err::(Into::into)?; + Ok(Box::new(SyncUploader::Agent(agent))) + } +} diff --git a/opentelemetry-jaeger/src/exporter/config/collector/http_client.rs b/opentelemetry-jaeger/src/exporter/config/collector/http_client.rs new file mode 100644 index 0000000000..0fba8667cf --- /dev/null +++ b/opentelemetry-jaeger/src/exporter/config/collector/http_client.rs @@ -0,0 +1,194 @@ +use crate::Error::NoHttpClient; +#[cfg(feature = "surf_collector_client")] +use async_trait::async_trait; +#[cfg(any( + feature = "reqwest_blocking_collector_client", + feature = "reqwest_collector_client" +))] +use headers::authorization::Credentials; +#[cfg(feature = "isahc_collector_client")] +use isahc::config::Configurable; +use opentelemetry_http::HttpClient as OtelHttpClient; +#[cfg(feature = "surf_collector_client")] +use std::convert::TryInto; +use std::time::Duration; + +#[derive(Debug)] +#[cfg(feature = "surf_collector_client")] +struct BasicAuthMiddleware(surf::http::auth::BasicAuth); + +#[async_trait] +#[cfg(feature = "surf_collector_client")] +impl surf::middleware::Middleware for BasicAuthMiddleware { + async fn handle( + &self, + mut req: surf::Request, + client: surf::Client, + next: surf::middleware::Next<'_>, + ) -> surf::Result { + req.insert_header(self.0.name(), self.0.value()); + next.run(req, client).await + } +} + +#[derive(Debug)] +pub(crate) enum CollectorHttpClient { + None, + Custom(Box), + #[cfg(feature = "isahc_collector_client")] + Isahc, + #[cfg(feature = "surf_collector_client")] + Surf, + #[cfg(feature = "reqwest_collector_client")] + Reqwest, + #[cfg(feature = "reqwest_blocking_collector_client")] + ReqwestBlocking, +} + +impl CollectorHttpClient { + // try to build a build in http client if users chose one. If none available return NoHttpClient error + #[allow(unused_variables)] // if the user enabled no build in client features. all parameters are unsed. + pub(crate) fn build_client( + self, + collector_username: Option, + collector_password: Option, + collector_timeout: Duration, + ) -> Result, crate::Error> { + match self { + CollectorHttpClient::Custom(client) => Ok(client), + CollectorHttpClient::None => Err(NoHttpClient), + #[cfg(feature = "isahc_collector_client")] + CollectorHttpClient::Isahc => { + let mut builder = isahc::HttpClient::builder().timeout(collector_timeout); + + if let (Some(username), Some(password)) = (collector_username, collector_password) { + builder = builder + .authentication(isahc::auth::Authentication::basic()) + .credentials(isahc::auth::Credentials::new(username, password)); + } + + Ok(Box::new(builder.build().map_err(|err| { + crate::Error::ThriftAgentError(::thrift::Error::from(std::io::Error::new( + std::io::ErrorKind::Other, + err.to_string(), + ))) + })?)) + } + #[cfg(feature = "surf_collector_client")] + CollectorHttpClient::Surf => { + let client: surf::Client = surf::Config::new() + .set_timeout(Some(collector_timeout)) + .try_into() + .map_err(|err| crate::Error::ConfigError { + pipeline_name: "collector", + config_name: "http_client", + reason: format!("cannot set timeout for surf client. {}", err), + })?; + + let client = if let (Some(username), Some(password)) = + (collector_username, collector_password) + { + let auth = surf::http::auth::BasicAuth::new(username, password); + client.with(BasicAuthMiddleware(auth)) + } else { + client + }; + + Ok(Box::new(client)) + } + #[cfg(feature = "reqwest_blocking_collector_client")] + CollectorHttpClient::ReqwestBlocking => { + let mut builder = + reqwest::blocking::ClientBuilder::new().timeout(collector_timeout); + if let (Some(username), Some(password)) = (collector_username, collector_password) { + let mut map = http::HeaderMap::with_capacity(1); + let auth_header_val = + headers::Authorization::basic(username.as_str(), password.as_str()); + map.insert(http::header::AUTHORIZATION, auth_header_val.0.encode()); + builder = builder.default_headers(map); + } + let client: Box = + Box::new(builder.build().map_err::(Into::into)?); + Ok(client) + } + #[cfg(feature = "reqwest_collector_client")] + CollectorHttpClient::Reqwest => { + let mut builder = reqwest::ClientBuilder::new().timeout(collector_timeout); + if let (Some(username), Some(password)) = (collector_username, collector_password) { + let mut map = http::HeaderMap::with_capacity(1); + let auth_header_val = + headers::Authorization::basic(username.as_str(), password.as_str()); + map.insert(http::header::AUTHORIZATION, auth_header_val.0.encode()); + builder = builder.default_headers(map); + } + let client: Box = + Box::new(builder.build().map_err::(Into::into)?); + Ok(client) + } + } + } +} + +#[cfg(test)] +pub(crate) mod test_http_client { + use async_trait::async_trait; + use bytes::Bytes; + use http::{Request, Response}; + use opentelemetry_http::{HttpClient, HttpError}; + use std::fmt::Debug; + + pub(crate) struct TestHttpClient; + + impl Debug for TestHttpClient { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("test http client") + } + } + + #[async_trait] + impl HttpClient for TestHttpClient { + async fn send(&self, _request: Request>) -> Result, HttpError> { + Err("wrong uri set in http client".into()) + } + } +} + +#[cfg(test)] +#[cfg(all(feature = "collector_client", feature = "rt-tokio"))] +mod collector_client_tests { + use crate::config::build_config_and_process; + use crate::config::collector::http_client::test_http_client; + use crate::exporter::thrift::jaeger::Batch; + use crate::new_collector_pipeline; + use opentelemetry::runtime::Tokio; + use opentelemetry::sdk::Resource; + use opentelemetry::trace::TraceError; + use opentelemetry::KeyValue; + + #[test] + fn test_bring_your_own_client() -> Result<(), TraceError> { + let invalid_uri_builder = new_collector_pipeline() + .with_endpoint("localhost:6831") + .with_http_client(test_http_client::TestHttpClient); + let sdk_provided_resource = + Resource::new(vec![KeyValue::new("service.name", "unknown_service")]); + let (_, process) = build_config_and_process(sdk_provided_resource, None, None); + let mut uploader = invalid_uri_builder.build_uploader::()?; + let res = futures_executor::block_on(async { + uploader + .upload(Batch::new(process.into(), Vec::new())) + .await + }); + assert_eq!( + format!("{:?}", res.err().unwrap()), + "Other(\"wrong uri set in http client\")" + ); + + let valid_uri_builder = new_collector_pipeline() + .with_http_client(test_http_client::TestHttpClient) + .build_uploader::(); + + assert!(valid_uri_builder.is_ok()); + Ok(()) + } +} diff --git a/opentelemetry-jaeger/src/exporter/config/collector/mod.rs b/opentelemetry-jaeger/src/exporter/config/collector/mod.rs new file mode 100644 index 0000000000..23408b6e8b --- /dev/null +++ b/opentelemetry-jaeger/src/exporter/config/collector/mod.rs @@ -0,0 +1,515 @@ +use crate::exporter::config::{ + build_config_and_process, install_tracer_provider_and_get_tracer, HasRequiredConfig, + TransformationConfig, +}; +use crate::exporter::uploader::{AsyncUploader, Uploader}; +use crate::{Exporter, JaegerTraceRuntime}; +use http::Uri; +use opentelemetry::{sdk, sdk::trace::Config as TraceConfig, trace::TraceError}; +use std::borrow::BorrowMut; +use std::convert::TryFrom; +use std::env; +#[cfg(feature = "collector_client")] +use std::time::Duration; + +#[cfg(feature = "collector_client")] +use opentelemetry_http::HttpClient; + +#[cfg(feature = "collector_client")] +use crate::config::collector::http_client::CollectorHttpClient; + +#[cfg(feature = "collector_client")] +use crate::exporter::collector::AsyncHttpClient; +#[cfg(feature = "wasm_collector_client")] +use crate::exporter::collector::WasmCollector; + +#[cfg(feature = "collector_client")] +mod http_client; + +/// HTTP endpoint for Jaeger collector. +/// e.g. "http://localhost:14250" +const ENV_ENDPOINT: &str = "OTEL_EXPORTER_JAEGER_ENDPOINT"; + +const DEFAULT_ENDPOINT: &str = "http://localhost:14250/api/trace"; + +/// Timeout for Jaeger collector. +#[cfg(feature = "collector_client")] +const ENV_TIMEOUT: &str = "OTEL_EXPORTER_JAEGER_TIMEOUT"; + +/// Default of 10s +#[cfg(feature = "collector_client")] +const DEFAULT_COLLECTOR_TIMEOUT: Duration = Duration::from_secs(10); + +/// Username to send as part of "Basic" authentication to the collector endpoint. +const ENV_USER: &str = "OTEL_EXPORTER_JAEGER_USER"; + +/// Password to send as part of "Basic" authentication to the collector endpoint. +const ENV_PASSWORD: &str = "OTEL_EXPORTER_JAEGER_PASSWORD"; + +/// CollectorPipeline config and build a exporter targeting a jaeger collector using HTTP protocol. +/// +/// ## Environment variables +/// +/// - `OTEL_EXPORTER_JAEGER_ENDPOINT`: set the endpoint of the collector. Usually starts with `http://` or `https://` +/// +/// - `OTEL_EXPORTER_JAEGER_TIMEOUT`: set the timeout of the http client timeout. It only applies to build in http clients. +/// +/// - `OTEL_EXPORTER_JAEGER_USER`: set the username. Part of the authentication for the collector. It only applies to build in http clients. +/// +/// - `OTEL_EXPORTER_JAEGER_PASSWORD`: set the password. Part of the authentication for the collector. It only applies to build in http clients. +/// +/// ## Build in http clients +/// To help user setup the exporter, `opentelemetry-jaeger` provides the following build in http client +/// implementation and relative configurations. +/// +/// - [surf], requires `surf_collector_client` feature enabled, use [`with_surf`][CollectorPipeline::with_surf] function to setup. +/// - [isahc], requires `isahc_collector_client` feature enabled, use [`with_isahc`][CollectorPipeline::with_isahc] function to setup. +/// - [reqwest], requires `reqwest_collector_client` feature enabled, use [`with_reqwest`][CollectorPipeline::with_reqwest] function to setup. +/// - [reqwest blocking client], requires `reqwest_blocking_collector_client` feature enabled, use [`with_reqwest_blocking`][CollectorPipeline::with_surf] function to setup. +/// +/// Note that the functions to setup build in http clients override each other. That means if you have a pipeline with the following setup +/// +/// ```ignore +/// # use opentelemetry::trace::TraceError; +/// let tracer = opentelemetry_jaeger::new_collector_pipeline() +/// .with_surf() +/// .with_reqwest() +/// .install_batch(opentelemetry::runtime::Tokio)?; +/// ``` +/// +/// The pipeline will use [reqwest] http client. +/// +/// [surf]:https://docs.rs/surf/latest/surf/ +/// [isahc]: https://docs.rs/isahc/latest/isahc/ +/// [reqwest]: reqwest::Client +/// [reqwest blocking client]: reqwest::blocking::Client +#[derive(Debug)] +pub struct CollectorPipeline { + transformation_config: TransformationConfig, + trace_config: Option, + + #[cfg(feature = "collector_client")] + collector_timeout: Duration, + // only used by buildin http clients. + collector_endpoint: Option>, + collector_username: Option, + collector_password: Option, + + client_config: ClientConfig, +} + +impl Default for CollectorPipeline { + fn default() -> Self { + let mut pipeline = Self { + #[cfg(feature = "collector_client")] + collector_timeout: DEFAULT_COLLECTOR_TIMEOUT, + collector_endpoint: None, + collector_username: None, + collector_password: None, + client_config: ClientConfig::default(), + transformation_config: Default::default(), + trace_config: Default::default(), + }; + + #[cfg(feature = "collector_client")] + if let Some(timeout) = env::var(ENV_TIMEOUT).ok().filter(|var| !var.is_empty()) { + let timeout = match timeout.parse() { + Ok(timeout) => Duration::from_millis(timeout), + Err(e) => { + eprintln!("{} malformed defaulting to 10000: {}", ENV_TIMEOUT, e); + DEFAULT_COLLECTOR_TIMEOUT + } + }; + pipeline = pipeline.with_timeout(timeout); + } + + if let Some(endpoint) = env::var(ENV_ENDPOINT).ok().filter(|var| !var.is_empty()) { + pipeline = pipeline.with_endpoint(endpoint); + } + + if let Some(user) = env::var(ENV_USER).ok().filter(|var| !var.is_empty()) { + pipeline = pipeline.with_username(user); + } + + if let Some(password) = env::var(ENV_PASSWORD).ok().filter(|var| !var.is_empty()) { + pipeline = pipeline.with_password(password); + } + + pipeline + } +} + +// implement the seal trait +impl HasRequiredConfig for CollectorPipeline { + fn set_transformation_config(&mut self, f: T) + where + T: FnOnce(&mut TransformationConfig), + { + f(self.transformation_config.borrow_mut()) + } + + fn set_trace_config(&mut self, config: TraceConfig) { + self.trace_config = Some(config) + } +} + +#[derive(Debug)] +enum ClientConfig { + #[cfg(feature = "collector_client")] + Http { client_type: CollectorHttpClient }, + #[cfg(feature = "wasm_collector_client")] + Wasm, // no config is available for wasm for now. But we can add in the future +} + +impl Default for ClientConfig { + fn default() -> Self { + // as long as collector is enabled, we will in favor of it + #[cfg(feature = "collector_client")] + { + ClientConfig::Http { + client_type: CollectorHttpClient::None, + } + } + // when collector_client is disabled and wasm_collector_client is enabled + #[cfg(not(feature = "collector_client"))] + ClientConfig::Wasm + } +} + +/// Start a new pipeline to configure a exporter that target a jaeger collector. +/// +/// See details for each configurations at [`CollectorPipeline`]. +/// +/// [`CollectorPipeline`]: crate::config::collector::CollectorPipeline +#[cfg(feature = "collector_client")] +pub fn new_collector_pipeline() -> CollectorPipeline { + CollectorPipeline::default() +} + +/// Similar to [`new_collector_pipeline`] but the exporter is configured to run with wasm. +#[cfg(feature = "wasm_collector_client")] +#[allow(clippy::field_reassign_with_default)] // make sure when collector_cilent and wasm_collector_client are both set. We will create a wasm type client +pub fn new_wasm_collector_pipeline() -> CollectorPipeline { + let mut pipeline = CollectorPipeline::default(); + pipeline.client_config = ClientConfig::Wasm; + pipeline +} + +impl CollectorPipeline { + /// Set the http client timeout. + /// + /// This function only applies to build in http clients. + /// + /// Default to be 10s. + #[cfg(feature = "collector_client")] + pub fn with_timeout(self, collector_timeout: Duration) -> Self { + Self { + collector_timeout, + ..self + } + } + + /// Set the collector endpoint. + /// + /// E.g. "http://localhost:14268/api/traces" + pub fn with_endpoint(self, collector_endpoint: T) -> Self + where + http::Uri: core::convert::TryFrom, + >::Error: Into, + { + Self { + collector_endpoint: Some( + core::convert::TryFrom::try_from(collector_endpoint).map_err(Into::into), + ), + ..self + } + } + + /// Set the username used in authentication to communicate with the collector. + /// + /// *Note* that if the password is not set by calling `with_password` or set `OTEL_EXPORTER_JAEGER_PASSWORD` + /// environment variables. The username will be ignored. + /// + /// This function only applies to build in http clients. + pub fn with_username>(self, collector_username: S) -> Self { + Self { + collector_username: Some(collector_username.into()), + ..self + } + } + + /// Set the password used in authentication to communicate with the collector. + /// + /// *Note* that if the username is not set by calling `with_username` or set `OTEL_EXPORTER_JAEGER_USER` + /// environment variables. The username will be ignored. + /// + /// This function only applies to build in http clients. + pub fn with_password>(self, collector_password: S) -> Self { + Self { + collector_password: Some(collector_password.into()), + ..self + } + } + + /// Get collector's username set in the builder. Default to be the value of + /// `OTEL_EXPORTER_JAEGER_USER` environment variable. + /// + /// If users uses custom http client. This function can help retrieve the value of + /// `OTEL_EXPORTER_JAEGER_USER` environment variable. + pub fn collector_username(&self) -> Option { + (&self.collector_username).clone() + } + + /// Get the collector's password set in the builder. Default to be the value of + /// `OTEL_EXPORTER_JAEGER_PASSWORD` environment variable. + /// + /// If users uses custom http client. This function can help retrieve the value of + /// `OTEL_EXPORTER_JAEGER_PASSWORD` environment variable. + pub fn collector_password(self) -> Option { + (&self.collector_password).clone() + } + + /// Custom the http client used to send spans. + /// + /// **Note** that all configuration other than the [`endpoint`][CollectorPipeline::with_endpoint] are not + /// applicable to custom clients. + #[cfg(feature = "collector_client")] + pub fn with_http_client(mut self, client: T) -> Self { + self.client_config = match self.client_config { + ClientConfig::Http { .. } => ClientConfig::Http { + client_type: CollectorHttpClient::Custom(Box::new(client)), + }, + // noop for wasm + #[cfg(feature = "wasm_collector_client")] + ClientConfig::Wasm => ClientConfig::Wasm, + }; + self + } + + /// Use isahc http client in the exporter. + #[cfg(feature = "isahc_collector_client")] + pub fn with_isahc(self) -> Self { + Self { + client_config: ClientConfig::Http { + client_type: CollectorHttpClient::Isahc, + }, + ..self + } + } + + /// Use surf http client in the exporter. + #[cfg(feature = "surf_collector_client")] + pub fn with_surf(self) -> Self { + Self { + client_config: ClientConfig::Http { + client_type: CollectorHttpClient::Surf, + }, + ..self + } + } + + /// Use reqwest http client in the exporter. + #[cfg(feature = "reqwest_collector_client")] + pub fn with_reqwest(self) -> Self { + Self { + client_config: ClientConfig::Http { + client_type: CollectorHttpClient::Reqwest, + }, + ..self + } + } + + /// Use reqwest blocking http client in the exporter. + #[cfg(feature = "reqwest_blocking_collector_client")] + pub fn with_reqwest_blocking(self) -> Self { + Self { + client_config: ClientConfig::Http { + client_type: CollectorHttpClient::ReqwestBlocking, + }, + ..self + } + } + + /// Set the service name of the application. It generally is the name of application. + /// Critically, Jaeger backend depends on `Span.Process.ServiceName` to identify the service + /// that produced the spans. + /// + /// Opentelemetry allows set the service name using multiple methods. + /// This functions takes priority over all other methods. + /// + /// If the service name is not set. It will default to be `unknown_service`. + pub fn with_service_name>(mut self, service_name: T) -> Self { + self.set_transformation_config(|mut config| { + config.service_name = Some(service_name.into()); + }); + self + } + + /// Config whether to export information of instrumentation library. + /// + /// It's required to [report instrumentation library as span tags]. + /// However it does have a overhead on performance, performance sensitive applications can + /// use this function to opt out reporting instrumentation library. + /// + /// Default to be `true`. + /// + /// [report instrumentation library as span tags]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk_exporters/non-otlp.md#instrumentationscope + pub fn with_instrumentation_library_tags(mut self, should_export: bool) -> Self { + self.set_transformation_config(|mut config| { + config.export_instrument_library = should_export; + }); + self + } + + /// Assign the opentelemetry SDK configurations for the exporter pipeline. + /// + /// For mapping between opentelemetry configurations and Jaeger spans. Please refer [the spec]. + /// + /// [the spec]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk_exporters/jaeger.md#mappings + /// # Examples + /// Set service name via resource. + /// ```rust + /// use opentelemetry::{sdk::{self, Resource}, KeyValue}; + /// + /// let pipeline = opentelemetry_jaeger::new_collector_pipeline() + /// .with_trace_config( + /// sdk::trace::Config::default() + /// .with_resource(Resource::new(vec![KeyValue::new("service.name", "my-service")])) + /// ); + /// + /// ``` + pub fn with_trace_config(mut self, config: sdk::trace::Config) -> Self { + self.set_trace_config(config); + self + } + + /// Build a `TracerProvider` using a async exporter and configurations from the pipeline. + /// + /// The exporter will collect spans in a batch and send them to the agent. + /// + /// It's possible to lose spans up to a batch when the application shuts down. So users should + /// use [`shut_down_tracer_provider`] to block the shut down process until + /// all remaining spans have been sent. + /// + /// Commonly used runtime are provided via `rt-tokio`, `rt-tokio-current-thread`, `rt-async-std` + /// features. + /// + /// [`shut_down_tracer_provider`]: opentelemetry::global::shutdown_tracer_provider + // todo: we don't need JaegerTraceRuntime, we only need otel runtime + pub fn build_batch( + mut self, + runtime: R, + ) -> Result { + let mut builder = sdk::trace::TracerProvider::builder(); + // build sdk trace config and jaeger process. + // some attributes like service name has attributes like service name + let export_instrument_library = self.transformation_config.export_instrument_library; + let (config, process) = build_config_and_process( + builder.sdk_provided_resource(), + self.trace_config.take(), + self.transformation_config.service_name.take(), + ); + let uploader = self.build_uploader::()?; + let exporter = Exporter::new(process.into(), export_instrument_library, uploader); + + builder = builder.with_batch_exporter(exporter, runtime); + builder = builder.with_config(config); + + Ok(builder.build()) + } + + /// Similar to [`build_batch`][CollectorPipeline::build_batch] but also returns a tracer from the + /// tracer provider. + /// + /// The tracer name is `opentelemetry-jaeger`. The tracer version will be the version of this crate. + pub fn install_batch( + self, + runtime: R, + ) -> Result { + let tracer_provider = self.build_batch(runtime)?; + install_tracer_provider_and_get_tracer(tracer_provider) + } + + fn build_uploader(self) -> Result, crate::Error> + where + R: JaegerTraceRuntime, + { + let endpoint = self + .collector_endpoint + .transpose() + .map_err::(|err| crate::Error::ConfigError { + pipeline_name: "collector", + config_name: "collector_endpoint", + reason: err.to_string(), + })? + .unwrap_or_else(|| { + Uri::try_from(DEFAULT_ENDPOINT).unwrap() // default endpoint should always valid + }); + match self.client_config { + #[cfg(feature = "collector_client")] + ClientConfig::Http { client_type } => { + let client = client_type.build_client( + self.collector_username, + self.collector_password, + self.collector_timeout, + )?; + + let collector = AsyncHttpClient::new(endpoint, client); + Ok(Box::new(AsyncUploader::::Collector(collector))) + } + #[cfg(feature = "wasm_collector_client")] + ClientConfig::Wasm => { + let collector = + WasmCollector::new(endpoint, self.collector_username, self.collector_password) + .map_err::(Into::into)?; + Ok(Box::new(AsyncUploader::::WasmCollector(collector))) + } + } + } +} + +#[cfg(test)] +#[cfg(feature = "rt-tokio")] +mod tests { + use super::*; + use crate::config::collector::http_client::test_http_client; + use opentelemetry::runtime::Tokio; + + #[test] + fn test_collector_defaults() { + // No Env Variable + std::env::remove_var(ENV_TIMEOUT); + let builder = CollectorPipeline::default(); + assert_eq!(DEFAULT_COLLECTOR_TIMEOUT, builder.collector_timeout); + + // Bad Env Variable + std::env::set_var(ENV_TIMEOUT, "a"); + let builder = CollectorPipeline::default(); + assert_eq!(DEFAULT_COLLECTOR_TIMEOUT, builder.collector_timeout); + + // Good Env Variable + std::env::set_var(ENV_TIMEOUT, "777"); + let builder = CollectorPipeline::default(); + assert_eq!(Duration::from_millis(777), builder.collector_timeout); + } + + #[test] + fn test_set_collector_endpoint() { + let invalid_uri = new_collector_pipeline() + .with_endpoint("127.0.0.1:14268/api/traces") + .with_http_client(test_http_client::TestHttpClient) + .build_uploader::(); + assert!(invalid_uri.is_err()); + assert_eq!( + format!("{:?}", invalid_uri.err().unwrap()), + "ConfigError { pipeline_name: \"collector\", config_name: \"collector_endpoint\", reason: \"invalid format\" }", + ); + + let valid_uri = new_collector_pipeline() + .with_http_client(test_http_client::TestHttpClient) + .with_endpoint("http://127.0.0.1:14268/api/traces") + .build_uploader::(); + + assert!(valid_uri.is_ok()); + } +} diff --git a/opentelemetry-jaeger/src/exporter/config/mod.rs b/opentelemetry-jaeger/src/exporter/config/mod.rs new file mode 100644 index 0000000000..a363c0270a --- /dev/null +++ b/opentelemetry-jaeger/src/exporter/config/mod.rs @@ -0,0 +1,172 @@ +//! Configurations to build a jaeger exporter. +//! +//! The jaeger exporter can send spans to [jaeger agent] or [jaeger collector]. The agent is usually +//! deployed along with the application like a sidecar. The collector is usually deployed a stand alone +//! application and receive spans from multiple sources. The exporter will use UDP to send spans to +//! agents and use HTTP/TCP to send spans to collectors. See [jaeger deployment guide] for more details. +//! +//! [jaeger agent]: https://www.jaegertracing.io/docs/1.31/deployment/#agent +//! [jaeger collector]: https://www.jaegertracing.io/docs/1.31/deployment/#collector +//! [jaeger deployment guide]: https://www.jaegertracing.io/docs/1.31/deployment + +use crate::Process; +use opentelemetry::sdk::trace::Config; +use opentelemetry::sdk::Resource; +use opentelemetry::trace::{TraceError, TracerProvider}; +use opentelemetry::{global, sdk, KeyValue}; +use opentelemetry_semantic_conventions as semcov; +use std::sync::Arc; + +/// Config a exporter that sends the spans to a [jaeger agent](https://www.jaegertracing.io/docs/1.31/deployment/#agent). +pub mod agent; +/// Config a exporter that bypass the agent and send spans directly to [jaeger collector](https://www.jaegertracing.io/docs/1.31/deployment/#collector). +#[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] +pub mod collector; + +// configurations and overrides on how to transform OTLP spans to Jaeger spans. +#[derive(Debug)] +struct TransformationConfig { + export_instrument_library: bool, + service_name: Option, +} + +impl Default for TransformationConfig { + fn default() -> Self { + TransformationConfig { + export_instrument_library: true, + service_name: None, + } + } +} + +// pipeline must have transformation config and trace config. +trait HasRequiredConfig { + fn set_transformation_config(&mut self, f: T) + where + T: FnOnce(&mut TransformationConfig); + + fn set_trace_config(&mut self, config: sdk::trace::Config); +} + +// To reduce the overhead of copying service name in every spans. We convert resource into jaeger tags +// and store them into process. And set the resource in trace config to empty. +// +// There are multiple ways to set the service name. A `service.name` tag will be always added +// to the process tags. +fn build_config_and_process( + sdk_resource: sdk::Resource, + mut config: Option, + service_name_opt: Option, +) -> (sdk::trace::Config, Process) { + let (config, resource) = if let Some(mut config) = config.take() { + let resource = if let Some(resource) = config.resource.replace(Arc::new(Resource::empty())) + { + sdk_resource.merge(resource) + } else { + sdk_resource + }; + + (config, resource) + } else { + (Config::default(), sdk_resource) + }; + + let service_name = service_name_opt.unwrap_or_else(|| { + resource + .get(semcov::resource::SERVICE_NAME) + .map(|v| v.to_string()) + .unwrap_or_else(|| "unknown_service".to_string()) + }); + + // merge the tags and resource. Resources take priority. + let mut tags = resource + .into_iter() + .filter(|(key, _)| *key != semcov::resource::SERVICE_NAME) + .map(|(key, value)| KeyValue::new(key, value)) + .collect::>(); + + tags.push(KeyValue::new( + semcov::resource::SERVICE_NAME, + service_name.clone(), + )); + + (config, Process { service_name, tags }) +} + +#[cfg(test)] +mod tests { + use crate::exporter::config::build_config_and_process; + use crate::new_agent_pipeline; + use opentelemetry::sdk::trace::Config; + use opentelemetry::sdk::Resource; + use opentelemetry::KeyValue; + use std::env; + use std::sync::Arc; + + #[test] + fn test_set_service_name() { + let service_name = "halloween_service".to_string(); + + // set via builder's service name, it has highest priority + let (_, process) = + build_config_and_process(Resource::empty(), None, Some(service_name.clone())); + assert_eq!(process.service_name, service_name); + + // make sure the tags in resource are moved to process + let trace_config = Config::default() + .with_resource(Resource::new(vec![KeyValue::new("test-key", "test-value")])); + let (config, process) = + build_config_and_process(Resource::empty(), Some(trace_config), Some(service_name)); + assert_eq!(config.resource, Some(Arc::new(Resource::empty()))); + assert_eq!(process.tags.len(), 2); + + // sdk provided resource can override service name if users didn't provided service name to builder + let (_, process) = build_config_and_process( + Resource::new(vec![KeyValue::new("service.name", "halloween_service")]), + None, + None, + ); + assert_eq!(process.service_name, "halloween_service"); + + // users can also provided service.name from config's resource, in this case, it will override the + // sdk provided service name + let trace_config = Config::default().with_resource(Resource::new(vec![KeyValue::new( + "service.name", + "override_service", + )])); + let (_, process) = build_config_and_process( + Resource::new(vec![KeyValue::new("service.name", "halloween_service")]), + Some(trace_config), + None, + ); + + assert_eq!(process.service_name, "override_service"); + assert_eq!(process.tags.len(), 1); + assert_eq!( + process.tags[0], + KeyValue::new("service.name", "override_service") + ); + } + + #[test] + fn test_read_from_env() { + // OTEL_SERVICE_NAME env var also works + env::set_var("OTEL_SERVICE_NAME", "test service"); + let builder = new_agent_pipeline(); + let exporter = builder.build_sync_agent_exporter().unwrap(); + assert_eq!(exporter.process.service_name, "test service"); + env::set_var("OTEL_SERVICE_NAME", "") + } +} + +pub(crate) fn install_tracer_provider_and_get_tracer( + tracer_provider: sdk::trace::TracerProvider, +) -> Result { + let tracer = tracer_provider.versioned_tracer( + "opentelemetry-jaeger", + Some(env!("CARGO_PKG_VERSION")), + None, + ); + let _ = global::set_tracer_provider(tracer_provider); + Ok(tracer) +} diff --git a/opentelemetry-jaeger/src/exporter/env.rs b/opentelemetry-jaeger/src/exporter/env.rs deleted file mode 100644 index f9bf4ed9ca..0000000000 --- a/opentelemetry-jaeger/src/exporter/env.rs +++ /dev/null @@ -1,66 +0,0 @@ -use crate::PipelineBuilder; -use std::env; -#[cfg(feature = "collector_client")] -use std::time::Duration; - -/// The hostname for the Jaeger agent. -/// e.g. "localhost" -const ENV_AGENT_HOST: &str = "OTEL_EXPORTER_JAEGER_AGENT_HOST"; - -/// The port for the Jaeger agent. -/// e.g. 6832 -const ENV_AGENT_PORT: &str = "OTEL_EXPORTER_JAEGER_AGENT_PORT"; - -/// HTTP endpoint for Jaeger collector. -/// e.g. "http://localhost:14250" -#[cfg(feature = "collector_client")] -const ENV_ENDPOINT: &str = "OTEL_EXPORTER_JAEGER_ENDPOINT"; - -/// Timeout for Jaeger collector. -#[cfg(feature = "collector_client")] -pub(crate) const ENV_TIMEOUT: &str = "OTEL_EXPORTER_JAEGER_TIMEOUT"; - -/// Default of 10s -#[cfg(feature = "collector_client")] -pub(crate) const DEFAULT_COLLECTOR_TIMEOUT: Duration = Duration::from_secs(10); - -/// Username to send as part of "Basic" authentication to the collector endpoint. -#[cfg(feature = "collector_client")] -const ENV_USER: &str = "OTEL_EXPORTER_JAEGER_USER"; - -/// Password to send as part of "Basic" authentication to the collector endpoint. -#[cfg(feature = "collector_client")] -const ENV_PASSWORD: &str = "OTEL_EXPORTER_JAEGER_PASSWORD"; - -/// Assign builder attributes from env -pub(crate) fn assign_attrs(mut builder: PipelineBuilder) -> PipelineBuilder { - if let (Ok(host), Ok(port)) = (env::var(ENV_AGENT_HOST), env::var(ENV_AGENT_PORT)) { - builder = builder.with_agent_endpoint(format!("{}:{}", host.trim(), port.trim())); - } - - #[cfg(feature = "collector_client")] - { - if let Some(timeout) = env::var(ENV_TIMEOUT).ok().filter(|var| !var.is_empty()) { - let timeout = match timeout.parse() { - Ok(timeout) => Duration::from_millis(timeout), - Err(e) => { - eprintln!("{} malformed defaulting to 10000: {}", ENV_TIMEOUT, e); - DEFAULT_COLLECTOR_TIMEOUT - } - }; - builder = builder.with_collector_timeout(timeout); - } - if let Some(endpoint) = env::var(ENV_ENDPOINT).ok().filter(|var| !var.is_empty()) { - builder = builder.with_collector_endpoint(endpoint); - } - - if let Some(user) = env::var(ENV_USER).ok().filter(|var| !var.is_empty()) { - builder = builder.with_collector_username(user); - } - if let Some(password) = env::var(ENV_PASSWORD).ok().filter(|var| !var.is_empty()) { - builder = builder.with_collector_password(password); - } - } - - builder -} diff --git a/opentelemetry-jaeger/src/exporter/mod.rs b/opentelemetry-jaeger/src/exporter/mod.rs index d304148083..ee09651e7c 100644 --- a/opentelemetry-jaeger/src/exporter/mod.rs +++ b/opentelemetry-jaeger/src/exporter/mod.rs @@ -5,9 +5,9 @@ mod agent; mod collector; pub(crate) mod runtime; #[allow(clippy::all, unreachable_pub, dead_code)] -#[rustfmt::skip] +#[rustfmt::skip] // don't format generated files mod thrift; -mod env; +pub mod config; pub(crate) mod transport; mod uploader; @@ -18,11 +18,7 @@ use std::convert::TryFrom; use self::runtime::JaegerTraceRuntime; use self::thrift::jaeger; -use agent::AgentAsyncClientUdp; use async_trait::async_trait; -#[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] -use collector::CollectorAsyncClientHttp; -use opentelemetry_semantic_conventions as semcov; use std::convert::TryInto; #[cfg(feature = "isahc_collector_client")] @@ -30,37 +26,15 @@ use std::convert::TryInto; use isahc::prelude::Configurable; use opentelemetry::sdk::export::ExportError; -use opentelemetry::trace::TraceError; use opentelemetry::{ - global, sdk, + sdk, sdk::export::trace, - trace::{Event, Link, SpanKind, StatusCode, TracerProvider}, + trace::{Event, Link, SpanKind, StatusCode}, Key, KeyValue, }; -#[cfg(feature = "collector_client")] -use opentelemetry_http::HttpClient; -use std::collections::HashSet; -use std::{ - net, - time::{Duration, SystemTime}, -}; -use uploader::{AsyncUploader, SyncUploader, Uploader}; -#[cfg(all( - any( - feature = "reqwest_collector_client", - feature = "reqwest_blocking_collector_client" - ), - not(feature = "surf_collector_client"), - not(feature = "isahc_collector_client") -))] -use headers::authorization::Credentials; -use opentelemetry::sdk::trace::Config; -use opentelemetry::sdk::Resource; -use std::sync::Arc; - -/// Default agent endpoint if none is provided -const DEFAULT_AGENT_ENDPOINT: &str = "127.0.0.1:6831"; +use crate::exporter::uploader::Uploader; +use std::time::{Duration, SystemTime}; /// Instrument Library name MUST be reported in Jaeger Span tags with the following key const INSTRUMENTATION_LIBRARY_NAME: &str = "otel.library.name"; @@ -68,11 +42,6 @@ const INSTRUMENTATION_LIBRARY_NAME: &str = "otel.library.name"; /// Instrument Library version MUST be reported in Jaeger Span tags with the following key const INSTRUMENTATION_LIBRARY_VERSION: &str = "otel.library.version"; -/// Create a new Jaeger exporter pipeline builder. -pub fn new_pipeline() -> PipelineBuilder { - PipelineBuilder::default() -} - /// Jaeger span exporter #[derive(Debug)] pub struct Exporter { @@ -82,6 +51,20 @@ pub struct Exporter { uploader: Box, } +impl Exporter { + fn new( + process: jaeger::Process, + export_instrumentation_lib: bool, + uploader: Box, + ) -> Exporter { + Exporter { + process, + export_instrumentation_lib, + uploader, + } + } +} + /// Jaeger process configuration #[derive(Debug, Default)] pub struct Process { @@ -111,566 +94,6 @@ impl trace::SpanExporter for Exporter { } } -/// Jaeger exporter builder -#[derive(Debug)] -pub struct PipelineBuilder { - agent_endpoint: Vec, - // There are many variations in which it's read unclear which is causing not to be. - #[allow(dead_code)] - #[cfg(feature = "collector_client")] - collector_timeout: Duration, - #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] - collector_endpoint: Option>, - #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] - collector_username: Option, - #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] - collector_password: Option, - #[cfg(feature = "collector_client")] - client: Option>, - export_instrument_library: bool, - service_name: Option, - tags: Option>, - max_packet_size: Option, - auto_split: bool, - config: Option, -} - -impl Default for PipelineBuilder { - /// Return the default Exporter Builder. - fn default() -> Self { - let builder_defaults = PipelineBuilder { - agent_endpoint: vec![DEFAULT_AGENT_ENDPOINT.parse().unwrap()], - #[cfg(feature = "collector_client")] - collector_timeout: env::DEFAULT_COLLECTOR_TIMEOUT, - #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] - collector_endpoint: None, - #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] - collector_username: None, - #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] - collector_password: None, - #[cfg(feature = "collector_client")] - client: None, - export_instrument_library: true, - service_name: None, - tags: None, - max_packet_size: None, - auto_split: false, - config: None, - }; - - // Override above defaults with env vars if set - env::assign_attrs(builder_defaults) - } -} - -impl PipelineBuilder { - /// Assign the agent endpoint. - pub fn with_agent_endpoint(self, agent_endpoint: T) -> Self { - PipelineBuilder { - agent_endpoint: agent_endpoint - .to_socket_addrs() - .map(|addrs| addrs.collect()) - .unwrap_or_default(), - - ..self - } - } - - /// Config whether to export information of instrumentation library. - pub fn with_instrumentation_library_tags(self, export: bool) -> Self { - PipelineBuilder { - export_instrument_library: export, - ..self - } - } - - /// Assign the collector timeout - /// - /// E.g. "10s" - #[cfg(feature = "collector_client")] - #[cfg_attr(docsrs, doc(cfg(feature = "collector_client")))] - pub fn with_collector_timeout(self, collector_timeout: Duration) -> Self { - PipelineBuilder { - collector_timeout, - ..self - } - } - - /// Assign the collector endpoint. - /// - /// E.g. "http://localhost:14268/api/traces" - #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] - #[cfg_attr( - docsrs, - doc(cfg(any(feature = "collector_client", feature = "wasm_collector_client"))) - )] - pub fn with_collector_endpoint(self, collector_endpoint: T) -> Self - where - http::Uri: core::convert::TryFrom, - >::Error: Into, - { - PipelineBuilder { - collector_endpoint: Some( - core::convert::TryFrom::try_from(collector_endpoint).map_err(Into::into), - ), - ..self - } - } - - /// Assign the collector username - #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] - #[cfg_attr( - docsrs, - doc(any(feature = "collector_client", feature = "wasm_collector_client")) - )] - pub fn with_collector_username>(self, collector_username: S) -> Self { - PipelineBuilder { - collector_username: Some(collector_username.into()), - ..self - } - } - - /// Get collector's username set in the builder. Default to be the value of - /// `OTEL_EXPORTER_JAEGER_USER` environment variable. - /// - /// If users uses custom http client. This function can help retrieve the value of - /// `OTEL_EXPORTER_JAEGER_USER` environment variable. - #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] - #[cfg_attr( - docsrs, - doc(any(feature = "collector_client", feature = "wasm_collector_client")) - )] - pub fn collector_username(&self) -> Option { - (&self.collector_username).clone() - } - - /// Assign the collector password - #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] - #[cfg_attr( - docsrs, - doc(any(feature = "collector_client", feature = "wasm_collector_client")) - )] - pub fn with_collector_password>(self, collector_password: S) -> Self { - PipelineBuilder { - collector_password: Some(collector_password.into()), - ..self - } - } - - /// Get the collector's password set in the builder. Default to be the value of - /// `OTEL_EXPORTER_JAEGER_PASSWORD` environment variable. - /// - /// If users uses custom http client. This function can help retrieve the value of - /// `OTEL_EXPORTER_JAEGER_PASSWORD` environment variable. - #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] - #[cfg_attr( - docsrs, - doc(any(feature = "collector_client", feature = "wasm_collector_client")) - )] - pub fn collector_password(self) -> Option { - (&self.collector_password).clone() - } - - /// Assign the process service name. - pub fn with_service_name>(mut self, service_name: T) -> Self { - self.service_name = Some(service_name.into()); - self - } - - /// Assign the process tags. - /// - /// Note that resource in trace [Config](sdk::trace::Config) is also reported as process tags - /// in jaeger. If there is duplicate tags between resource and tags. Resource's value take - /// priority even if it's empty. - #[deprecated( - since = "0.16.0", - note = "please pass those tags as resource in sdk::trace::Config. Then use with_trace_config \ - method to pass the config. All key value pairs in resources will be reported as process tags" - )] - pub fn with_tags>(mut self, tags: T) -> Self { - self.tags = Some(tags.into_iter().collect()); - self - } - - /// Assign the max packet size in bytes. Jaeger defaults is 65000. - pub fn with_max_packet_size(mut self, max_packet_size: usize) -> Self { - self.max_packet_size = Some(max_packet_size); - self - } - - /// Config whether to auto split batches. - /// - /// When auto split is set to true, the exporter will try to split the - /// batch into smaller ones so that there will be minimal data loss. It - /// will impact the performance. - /// - /// Note that if one span is too large to export, other spans within the - /// same batch may or may not be exported. In this case, exporter will - /// return errors as we cannot split spans. - pub fn with_auto_split_batch(mut self, auto_split: bool) -> Self { - self.auto_split = auto_split; - self - } - - /// Assign the SDK config for the exporter pipeline. - /// - /// # Examples - /// Set service name via resource. - /// ```rust - /// use opentelemetry_jaeger::PipelineBuilder; - /// use opentelemetry::sdk; - /// use opentelemetry::sdk::Resource; - /// use opentelemetry::KeyValue; - /// - /// let pipeline = PipelineBuilder::default() - /// .with_trace_config( - /// sdk::trace::Config::default() - /// .with_resource(Resource::new(vec![KeyValue::new("service.name", "my-service")])) - /// ); - /// - /// ``` - pub fn with_trace_config(self, config: sdk::trace::Config) -> Self { - PipelineBuilder { - config: Some(config), - ..self - } - } - - /// Assign the http client to use - #[cfg(feature = "collector_client")] - pub fn with_http_client(mut self, client: T) -> Self { - self.client = Some(Box::new(client)); - self - } - - /// Install a Jaeger pipeline with a simple span processor. - pub fn install_simple(self) -> Result { - let tracer_provider = self.build_simple()?; - let tracer = tracer_provider.versioned_tracer( - "opentelemetry-jaeger", - Some(env!("CARGO_PKG_VERSION")), - None, - ); - let _ = global::set_tracer_provider(tracer_provider); - Ok(tracer) - } - - /// Install a Jaeger pipeline with a batch span processor using the specified runtime. - pub fn install_batch( - self, - runtime: R, - ) -> Result { - let tracer_provider = self.build_batch(runtime)?; - let tracer = tracer_provider.versioned_tracer( - "opentelemetry-jaeger", - Some(env!("CARGO_PKG_VERSION")), - None, - ); - let _ = global::set_tracer_provider(tracer_provider); - Ok(tracer) - } - - // To reduce the overhead of copying service name in every spans. We convert resource into jaeger tags - // and store them into process. And set the resource in trace config to empty. - // - // There are multiple ways to set the service name. A `service.name` tag will be always added - // to the process tags. - fn build_config_and_process(&mut self, sdk_provided_resource: Resource) -> (Config, Process) { - let (config, resource) = if let Some(mut config) = self.config.take() { - let resource = - if let Some(resource) = config.resource.replace(Arc::new(Resource::empty())) { - sdk_provided_resource.merge(resource) - } else { - sdk_provided_resource - }; - - (config, resource) - } else { - (Config::default(), sdk_provided_resource) - }; - - let service_name = self.service_name.clone().unwrap_or_else(|| { - resource - .get(semcov::resource::SERVICE_NAME) - .map(|v| v.to_string()) - .unwrap_or_else(|| "unknown_service".to_string()) - }); - - // merge the tags and resource. Resources take priority. - let mut tags = resource - .into_iter() - .filter(|(key, _)| *key != semcov::resource::SERVICE_NAME) - .map(|(key, value)| KeyValue::new(key, value)) - .collect::>(); - - tags.push(KeyValue::new( - semcov::resource::SERVICE_NAME, - service_name.clone(), - )); - - // if users provide key list - if let Some(provided_tags) = self.tags.take() { - let key_set: HashSet = tags - .iter() - .map(|key_value| key_value.key.clone()) - .collect::>(); - for tag in provided_tags.into_iter() { - if !key_set.contains(&tag.key) { - tags.push(tag) - } - } - } - - (config, Process { service_name, tags }) - } - - /// Build a configured `sdk::trace::TracerProvider` with a simple span processor. - pub fn build_simple(mut self) -> Result { - let mut builder = sdk::trace::TracerProvider::builder(); - let (config, process) = self.build_config_and_process(builder.sdk_provided_resource()); - let exporter = self.init_sync_exporter_with_process(process)?; - builder = builder.with_simple_exporter(exporter); - builder = builder.with_config(config); - - Ok(builder.build()) - } - - /// Build a configured `sdk::trace::TracerProvider` with a batch span processor using the - /// specified runtime. - pub fn build_batch( - mut self, - runtime: R, - ) -> Result { - let mut builder = sdk::trace::TracerProvider::builder(); - let (config, process) = self.build_config_and_process(builder.sdk_provided_resource()); - let exporter = self.init_async_exporter_with_process(process, runtime.clone())?; - builder = builder.with_batch_exporter(exporter, runtime); - builder = builder.with_config(config); - - Ok(builder.build()) - } - - /// Initialize a new simple exporter. - /// - /// This is useful if you are manually constructing a pipeline. - pub fn init_sync_exporter(mut self) -> Result { - let builder = sdk::trace::TracerProvider::builder(); - let (_, process) = self.build_config_and_process(builder.sdk_provided_resource()); - self.init_sync_exporter_with_process(process) - } - - fn init_sync_exporter_with_process(self, process: Process) -> Result { - let export_instrumentation_lib = self.export_instrument_library; - let uploader = self.init_sync_uploader()?; - - Ok(Exporter { - process: process.into(), - export_instrumentation_lib, - uploader, - }) - } - - /// Initialize a new exporter. - /// - /// This is useful if you are manually constructing a pipeline. - pub fn init_async_exporter( - mut self, - runtime: R, - ) -> Result { - let builder = sdk::trace::TracerProvider::builder(); - let (_, process) = self.build_config_and_process(builder.sdk_provided_resource()); - self.init_async_exporter_with_process(process, runtime) - } - - fn init_async_exporter_with_process( - self, - process: Process, - runtime: R, - ) -> Result { - let export_instrumentation_lib = self.export_instrument_library; - let uploader = self.init_async_uploader(runtime)?; - - Ok(Exporter { - process: process.into(), - export_instrumentation_lib, - uploader, - }) - } - - fn init_sync_uploader(self) -> Result, TraceError> { - let agent = agent::AgentSyncClientUdp::new( - self.agent_endpoint.as_slice(), - self.max_packet_size, - self.auto_split, - ) - .map_err::(Into::into)?; - Ok(Box::new(SyncUploader::Agent(agent))) - } - - #[cfg(not(any(feature = "collector_client", feature = "wasm_collector_client")))] - fn init_async_uploader( - self, - runtime: R, - ) -> Result, TraceError> { - let agent = AgentAsyncClientUdp::new( - self.agent_endpoint.as_slice(), - self.max_packet_size, - runtime, - self.auto_split, - ) - .map_err::(Into::into)?; - Ok(Box::new(AsyncUploader::Agent(agent))) - } - - #[cfg(feature = "collector_client")] - fn init_async_uploader( - self, - runtime: R, - ) -> Result, TraceError> { - if let Some(collector_endpoint) = self - .collector_endpoint - .transpose() - .map_err::(Into::into)? - { - #[cfg(all( - not(feature = "isahc_collector_client"), - not(feature = "surf_collector_client"), - not(feature = "reqwest_collector_client"), - not(feature = "reqwest_blocking_collector_client") - ))] - let client = self.client.ok_or(crate::Error::NoHttpClient)?; - - #[cfg(feature = "isahc_collector_client")] - let client = self.client.unwrap_or({ - let mut builder = isahc::HttpClient::builder().timeout(self.collector_timeout); - - if let (Some(username), Some(password)) = - (self.collector_username, self.collector_password) - { - builder = builder - .authentication(isahc::auth::Authentication::basic()) - .credentials(isahc::auth::Credentials::new(username, password)); - } - - Box::new(builder.build().map_err(|err| { - crate::Error::ThriftAgentError(::thrift::Error::from(std::io::Error::new( - std::io::ErrorKind::Other, - err.to_string(), - ))) - })?) - }); - - #[cfg(all( - not(feature = "isahc_collector_client"), - not(feature = "surf_collector_client"), - any( - feature = "reqwest_collector_client", - feature = "reqwest_blocking_collector_client" - ) - ))] - let client = self.client.unwrap_or({ - #[cfg(feature = "reqwest_collector_client")] - let mut builder = reqwest::ClientBuilder::new().timeout(self.collector_timeout); - #[cfg(all( - not(feature = "reqwest_collector_client"), - feature = "reqwest_blocking_collector_client" - ))] - let mut builder = - reqwest::blocking::ClientBuilder::new().timeout(self.collector_timeout); - if let (Some(username), Some(password)) = - (self.collector_username, self.collector_password) - { - let mut map = http::HeaderMap::with_capacity(1); - let auth_header_val = - headers::Authorization::basic(username.as_str(), password.as_str()); - map.insert(http::header::AUTHORIZATION, auth_header_val.0.encode()); - builder = builder.default_headers(map); - } - let client: Box = - Box::new(builder.build().map_err::(Into::into)?); - client - }); - - #[cfg(all( - not(feature = "isahc_collector_client"), - feature = "surf_collector_client", - not(feature = "reqwest_collector_client"), - not(feature = "reqwest_blocking_collector_client") - ))] - let client = self.client.unwrap_or({ - let client = surf::Client::try_from( - surf::Config::new().set_timeout(Some(self.collector_timeout)), - ) - .unwrap_or_else(|_| surf::Client::new()); - - let client = if let (Some(username), Some(password)) = - (self.collector_username, self.collector_password) - { - let auth = surf::http::auth::BasicAuth::new(username, password); - client.with(BasicAuthMiddleware(auth)) - } else { - client - }; - - Box::new(client) - }); - - let collector = CollectorAsyncClientHttp::new(collector_endpoint, client); - let uploader: AsyncUploader = AsyncUploader::Collector(collector); - Ok(Box::new(uploader)) - } else { - let endpoint = self.agent_endpoint.as_slice(); - let agent = - AgentAsyncClientUdp::new(endpoint, self.max_packet_size, runtime, self.auto_split) - .map_err::(Into::into)?; - Ok(Box::new(AsyncUploader::Agent(agent))) - } - } - - #[cfg(all(feature = "wasm_collector_client", not(feature = "collector_client")))] - fn init_async_uploader( - self, - runtime: R, - ) -> Result, TraceError> { - if let Some(collector_endpoint) = self - .collector_endpoint - .transpose() - .map_err::(Into::into)? - { - let collector = CollectorAsyncClientHttp::new( - collector_endpoint, - self.collector_username, - self.collector_password, - ) - .map_err::(Into::into)?; - Ok(Box::new(AsyncUploader::Collector(collector))) - } else { - let endpoint = self.agent_endpoint.as_slice(); - let agent = AgentAsyncClientUdp::new(endpoint, self.max_packet_size, self.auto_split) - .map_err::(Into::into)?; - Ok(Box::new(AsyncUploader::Agent(agent))) - } - } -} - -#[derive(Debug)] -#[cfg(feature = "surf_collector_client")] -struct BasicAuthMiddleware(surf::http::auth::BasicAuth); - -#[async_trait] -#[cfg(feature = "surf_collector_client")] -impl surf::middleware::Middleware for BasicAuthMiddleware { - async fn handle( - &self, - mut req: surf::Request, - client: surf::Client, - next: surf::middleware::Next<'_>, - ) -> surf::Result { - req.insert_header(self.0.name(), self.0.value()); - next.run(req, client).await - } -} - fn links_to_references(links: sdk::trace::EvictedQueue) -> Option> { if !links.is_empty() { let refs = links @@ -852,6 +275,17 @@ pub enum Error { #[error("collector uri is invalid, {0}")] #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] InvalidUri(#[from] http::uri::InvalidUri), + + /// Pipeline fails because one of the configurations is invalid. + #[error("{pipeline_name} pipeline fails because one of the configuration, {config_name}, is invalid. {reason}")] + ConfigError { + /// the name of the pipeline. It can be `agent`, `collector` or `wasm collector` + pipeline_name: &'static str, + /// config name that has the error. + config_name: &'static str, + /// the underlying error message. + reason: String, + }, } impl ExportError for Error { @@ -860,123 +294,14 @@ impl ExportError for Error { } } -#[cfg(test)] -#[cfg(all(feature = "collector_client"))] -mod timeout_env_tests { - use crate::exporter::env; - use crate::exporter::PipelineBuilder; - use std::time::Duration; - - #[test] - fn test_collector_defaults() { - // No Env Variable - std::env::remove_var(env::ENV_TIMEOUT); - let builder = PipelineBuilder::default(); - assert_eq!(env::DEFAULT_COLLECTOR_TIMEOUT, builder.collector_timeout); - - // Bad Env Variable - std::env::set_var(env::ENV_TIMEOUT, "a"); - let builder = PipelineBuilder::default(); - assert_eq!(env::DEFAULT_COLLECTOR_TIMEOUT, builder.collector_timeout); - - // Good Env Variable - std::env::set_var(env::ENV_TIMEOUT, "777"); - let builder = PipelineBuilder::default(); - assert_eq!(Duration::from_millis(777), builder.collector_timeout); - } -} - -#[cfg(test)] -#[cfg(all(feature = "collector_client", feature = "rt-tokio"))] -mod collector_client_tests { - use crate::exporter::thrift::jaeger::Batch; - use crate::new_pipeline; - use opentelemetry::runtime::Tokio; - use opentelemetry::sdk::Resource; - use opentelemetry::trace::TraceError; - use opentelemetry::KeyValue; - - mod test_http_client { - use async_trait::async_trait; - use bytes::Bytes; - use http::{Request, Response}; - use opentelemetry_http::{HttpClient, HttpError}; - use std::fmt::Debug; - - pub(crate) struct TestHttpClient; - - impl Debug for TestHttpClient { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_str("test http client") - } - } - - #[async_trait] - impl HttpClient for TestHttpClient { - async fn send(&self, _request: Request>) -> Result, HttpError> { - Err("wrong uri set in http client".into()) - } - } - } - - #[test] - fn test_bring_your_own_client() -> Result<(), TraceError> { - let mut builder = new_pipeline() - .with_collector_endpoint("localhost:6831") - .with_http_client(test_http_client::TestHttpClient); - let sdk_provided_resource = - Resource::new(vec![KeyValue::new("service.name", "unknown_service")]); - let (_, process) = builder.build_config_and_process(sdk_provided_resource); - let mut uploader = builder.init_async_uploader(Tokio)?; - let res = futures_executor::block_on(async { - uploader - .upload(Batch::new(process.into(), Vec::new())) - .await - }); - assert_eq!( - format!("{:?}", res.err().unwrap()), - "Other(\"wrong uri set in http client\")" - ); - - Ok(()) - } - - #[test] - #[cfg(any( - feature = "isahc_collector_client", - feature = "surf_collector_client", - feature = "reqwest_collector_client", - feature = "reqwest_blocking_collector_client" - ))] - fn test_set_collector_endpoint() { - let invalid_uri = new_pipeline() - .with_collector_endpoint("127.0.0.1:14268/api/traces") - .init_async_uploader(Tokio); - assert!(invalid_uri.is_err()); - assert_eq!( - format!("{:?}", invalid_uri.err().unwrap()), - "ExportFailed(InvalidUri(InvalidUri(InvalidFormat)))" - ); - - let valid_uri = new_pipeline() - .with_collector_endpoint("http://127.0.0.1:14268/api/traces") - .init_async_uploader(Tokio); - - assert!(valid_uri.is_ok()); - } -} - #[cfg(test)] mod tests { use super::SPAN_KIND; use crate::exporter::thrift::jaeger::Tag; use crate::exporter::{build_span_tags, OTEL_STATUS_CODE, OTEL_STATUS_DESCRIPTION}; - use opentelemetry::sdk::trace::{Config, EvictedHashMap}; - use opentelemetry::sdk::Resource; + use opentelemetry::sdk::trace::EvictedHashMap; use opentelemetry::trace::{SpanKind, StatusCode}; use opentelemetry::KeyValue; - use std::env; - use std::sync::Arc; fn assert_tag_contains(tags: Vec, key: &'static str, expect_val: &'static str) { assert_eq!( @@ -1078,59 +403,4 @@ mod tests { assert_tag_contains(tags.clone(), OTEL_STATUS_CODE, "ERROR"); assert_tag_contains(tags, OTEL_STATUS_DESCRIPTION, user_status_description); } - - #[test] - fn test_set_service_name() { - let service_name = "halloween_service"; - - // set via builder's service name, it has highest priority - let mut builder = crate::PipelineBuilder::default(); - builder = builder.with_service_name(service_name); - let (_, process) = builder.build_config_and_process(Resource::empty()); - assert_eq!(process.service_name, service_name); - - // make sure the tags in resource are moved to process - builder = crate::PipelineBuilder::default(); - builder = builder.with_service_name(service_name); - builder = builder.with_trace_config( - Config::default() - .with_resource(Resource::new(vec![KeyValue::new("test-key", "test-value")])), - ); - let (config, process) = builder.build_config_and_process(Resource::empty()); - assert_eq!(config.resource, Some(Arc::new(Resource::empty()))); - assert_eq!(process.tags.len(), 2); - - // sdk provided resource can override service name if users didn't provided service name to builder - builder = crate::PipelineBuilder::default(); - let (_, process) = builder.build_config_and_process(Resource::new(vec![KeyValue::new( - "service.name", - "halloween_service", - )])); - assert_eq!(process.service_name, "halloween_service"); - - // users can also provided service.name from config's resource, in this case, it will override the - // sdk provided service name - builder = crate::PipelineBuilder::default(); - builder = builder.with_trace_config(Config::default().with_resource(Resource::new(vec![ - KeyValue::new("service.name", "override_service"), - ]))); - let (_, process) = builder.build_config_and_process(Resource::new(vec![KeyValue::new( - "service.name", - "halloween_service", - )])); - - assert_eq!(process.service_name, "override_service"); - assert_eq!(process.tags.len(), 1); - assert_eq!( - process.tags[0], - KeyValue::new("service.name", "override_service") - ); - - // OTEL_SERVICE_NAME env var also works - env::set_var("OTEL_SERVICE_NAME", "test service"); - builder = crate::PipelineBuilder::default(); - let exporter = builder.init_sync_exporter().unwrap(); - assert_eq!(exporter.process.service_name, "test service"); - env::set_var("OTEL_SERVICE_NAME", "") - } } diff --git a/opentelemetry-jaeger/src/exporter/runtime.rs b/opentelemetry-jaeger/src/exporter/runtime.rs index 7bc9b16a49..d5eaee43a2 100644 --- a/opentelemetry-jaeger/src/exporter/runtime.rs +++ b/opentelemetry-jaeger/src/exporter/runtime.rs @@ -18,7 +18,6 @@ pub trait JaegerTraceRuntime: TraceRuntime + std::fmt::Debug { } #[cfg(feature = "rt-tokio")] -#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))] #[async_trait] impl JaegerTraceRuntime for opentelemetry::runtime::Tokio { type Socket = tokio::net::UdpSocket; @@ -37,7 +36,6 @@ impl JaegerTraceRuntime for opentelemetry::runtime::Tokio { } #[cfg(feature = "rt-tokio-current-thread")] -#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio-current-thread")))] #[async_trait] impl JaegerTraceRuntime for opentelemetry::runtime::TokioCurrentThread { type Socket = tokio::net::UdpSocket; @@ -56,7 +54,6 @@ impl JaegerTraceRuntime for opentelemetry::runtime::TokioCurrentThread { } #[cfg(feature = "rt-async-std")] -#[cfg_attr(docrs, doc(cfg(feature = "rt-async-std")))] #[async_trait] impl JaegerTraceRuntime for opentelemetry::runtime::AsyncStd { type Socket = async_std::net::UdpSocket; diff --git a/opentelemetry-jaeger/src/exporter/uploader.rs b/opentelemetry-jaeger/src/exporter/uploader.rs index 505ea8b914..fd17aab72f 100644 --- a/opentelemetry-jaeger/src/exporter/uploader.rs +++ b/opentelemetry-jaeger/src/exporter/uploader.rs @@ -4,7 +4,10 @@ use crate::exporter::collector; use crate::exporter::{agent, jaeger}; use async_trait::async_trait; use opentelemetry::sdk::export::trace; +use opentelemetry::sdk::export::trace::ExportResult; +use std::fmt::Debug; +use crate::exporter::thrift::jaeger::Batch; use crate::exporter::JaegerTraceRuntime; #[async_trait] @@ -38,16 +41,17 @@ pub(crate) enum AsyncUploader { /// Agent async client Agent(agent::AgentAsyncClientUdp), /// Collector sync client - #[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))] - Collector(collector::CollectorAsyncClientHttp), + #[cfg(feature = "collector_client")] + Collector(collector::AsyncHttpClient), + #[cfg(feature = "wasm_collector_client")] + WasmCollector(collector::WasmCollector), } #[async_trait] impl Uploader for AsyncUploader { - /// Emit a jaeger batch for the given uploader - async fn upload(&mut self, batch: jaeger::Batch) -> trace::ExportResult { + async fn upload(&mut self, batch: Batch) -> ExportResult { match self { - AsyncUploader::Agent(client) => { + Self::Agent(client) => { // TODO Implement retry behaviour client .emit_batch(batch) @@ -55,13 +59,12 @@ impl Uploader for AsyncUploader { .map_err::(Into::into)?; } #[cfg(feature = "collector_client")] - AsyncUploader::Collector(collector) => { + Self::Collector(collector) => { // TODO Implement retry behaviour collector.submit_batch(batch).await?; } - #[cfg(all(not(feature = "collector_client"), feature = "wasm_collector_client"))] - AsyncUploader::Collector(collector) => { - // TODO Implement retry behaviour + #[cfg(feature = "wasm_collector_client")] + Self::WasmCollector(collector) => { collector .submit_batch(batch) .await diff --git a/opentelemetry-jaeger/src/lib.rs b/opentelemetry-jaeger/src/lib.rs index bdbae7a1d8..ec968750a4 100644 --- a/opentelemetry-jaeger/src/lib.rs +++ b/opentelemetry-jaeger/src/lib.rs @@ -25,7 +25,7 @@ //! //! fn main() -> Result<(), opentelemetry::trace::TraceError> { //! global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); -//! let tracer = opentelemetry_jaeger::new_pipeline().install_simple()?; +//! let tracer = opentelemetry_jaeger::new_agent_pipeline().install_simple()?; //! //! tracer.in_span("doing_work", |cx| { //! // Traced app logic here... @@ -37,6 +37,25 @@ //! } //! ``` //! +//! Or if you are running on an async runtime like Tokio and want to report spans in batches +//! ```no_run +//! use opentelemetry::trace::Tracer; +//! use opentelemetry::global; +//! use opentelemetry::runtime::Tokio; +//! +//! fn main() -> Result<(), opentelemetry::trace::TraceError> { +//! global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); +//! let tracer = opentelemetry_jaeger::new_agent_pipeline().install_batch(Tokio)?; +//! +//! tracer.in_span("doing_work", |cx| { +//! // Traced app logic here... +//! }); +//! +//! global::shutdown_tracer_provider(); // export remaining spans +//! +//! Ok(()) +//! } +//! ``` //! ## Performance //! //! For optimal performance, a batch exporter is recommended as the simple exporter @@ -53,7 +72,7 @@ //! //! ```no_run //! # fn main() -> Result<(), opentelemetry::trace::TraceError> { -//! let tracer = opentelemetry_jaeger::new_pipeline() +//! let tracer = opentelemetry_jaeger::new_agent_pipeline() //! .install_batch(opentelemetry::runtime::Tokio)?; //! # Ok(()) //! # } @@ -78,25 +97,27 @@ //! //! ```toml //! [dependencies] -//! opentelemetry-jaeger = { version = "..", features = ["collector_client", "isahc"] } +//! opentelemetry-jaeger = { version = "..", features = ["collector_client", "isahc_collector_client"] } //! ``` //! -//! Then you can use the [`with_collector_endpoint`] method to specify the endpoint: +//! Then you can use the [`with_endpoint`] method to specify the endpoint: //! -//! [`with_collector_endpoint`]: PipelineBuilder::with_collector_endpoint() +//! [`with_endpoint`]: exporter::config::collector::CollectorPipeline::with_endpoint //! //! ```ignore //! // Note that this requires the `collector_client` feature. -//! // We enabled the `isahc` feature for a default isahc http client. -//! // You can also provide your own implementation via new_pipeline().with_http_client() method. +//! // We enabled the `isahc_collector_client` feature for a default isahc http client. +//! // You can also provide your own implementation via .with_http_client() method. //! use opentelemetry::trace::{Tracer, TraceError}; //! //! fn main() -> Result<(), TraceError> { -//! let tracer = opentelemetry_jaeger::new_pipeline() -//! .with_collector_endpoint("http://localhost:14268/api/traces") -//! // optionally set username and password as well. -//! .with_collector_username("username") -//! .with_collector_password("s3cr3t") +//! let tracer = opentelemetry_jaeger::new_collector_pipeline() +//! .with_endpoint("http://localhost:14268/api/traces") +//! // optionally set username and password for authentication of the exporter. +//! .with_username("username") +//! .with_password("s3cr3t") +//! .with_isahc() +//! //.with_http_client() provide custom http client implementation //! .install_batch(opentelemetry::runtime::Tokio)?; //! //! tracer.in_span("doing_work", |cx| { @@ -112,7 +133,7 @@ //! The full list of this mapping can be found in [OpenTelemetry to Jaeger Transformation]. //! //! The **process tags** in jaeger spans will be mapped as resource in opentelemetry. You can -//! set it through `OTEL_RESOURCE_ATTRIBUTES` environment variable or using [`PipelineBuilder::with_trace_config`]. +//! set it through `OTEL_RESOURCE_ATTRIBUTES` environment variable or using [`with_trace_config`]. //! //! Note that to avoid copying data multiple times. Jaeger exporter will uses resource stored in [`Exporter`]. //! @@ -121,35 +142,39 @@ //! //! Each jaeger span requires a **service name**. This will be mapped as a resource with `service.name` key. //! You can set it using one of the following methods from highest priority to lowest priority. -//! 1. [`PipelineBuilder::with_service_name`]. -//! 2. include a `service.name` key value pairs when configure resource using [`PipelineBuilder::with_trace_config`]. +//! 1. [`with_service_name`]. +//! 2. include a `service.name` key value pairs when configure resource using [`with_trace_config`]. //! 3. set the service name as `OTEL_SERVCE_NAME` environment variable. //! 4. set the `service.name` attributes in `OTEL_RESOURCE_ATTRIBUTES`. //! 5. if the service name is not provided by the above method. `unknown_service` will be used. //! //! Based on the service name, we update/append the `service.name` process tags in jaeger spans. //! -//! [`set_attribute`]: https://docs.rs/opentelemetry/0.16.0/opentelemetry/trace/trait.Span.html#tymethod.set_attribute -//! +//! [`with_service_name`]: crate::exporter::config::agent::AgentPipeline::with_service_name +//! [`with_trace_config`]: crate::exporter::config::agent::AgentPipeline::with_trace_config +//! [`set_attribute`]: opentelemetry::trace::Span::set_attribute //! [OpenTelemetry to Jaeger Transformation]:https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk_exporters/jaeger.md //! //! ## Kitchen Sink Full Configuration //! //! Example showing how to override all configuration options. See the -//! [`PipelineBuilder`] docs for details of each option. +//! [`CollectorPipeline`] and [`AgentPipeline`] docs for details of each option. //! +//! [`CollectorPipeline`]: config::collector::CollectorPipeline +//! [`AgentPipeline`]: config::agent::AgentPipeline //! +//! ### Export to agents //! ```no_run -//! use opentelemetry::{KeyValue, trace::{Tracer, TraceError}}; -//! use opentelemetry::sdk::{trace::{self, RandomIdGenerator, Sampler}, Resource}; -//! use opentelemetry::global; +//! use opentelemetry::{sdk::{trace::{self, RandomIdGenerator, Sampler}, Resource}, global, KeyValue, trace::{Tracer, TraceError}}; //! //! fn main() -> Result<(), TraceError> { //! global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); -//! let tracer = opentelemetry_jaeger::new_pipeline() -//! .with_agent_endpoint("localhost:6831") +//! let tracer = opentelemetry_jaeger::new_agent_pipeline() +//! .with_endpoint("localhost:6831") //! .with_service_name("my_app") //! .with_max_packet_size(9_216) +//! .with_auto_split_batch(true) +//! .with_instrumentation_library_tags(false) //! .with_trace_config( //! trace::config() //! .with_sampler(Sampler::AlwaysOn) @@ -157,6 +182,7 @@ //! .with_max_events_per_span(64) //! .with_max_attributes_per_span(16) //! .with_max_events_per_span(16) +//! // resources will translated to tags in jaeger spans //! .with_resource(Resource::new(vec![KeyValue::new("key", "value"), //! KeyValue::new("process_key", "process_value")])), //! ) @@ -166,7 +192,48 @@ //! // Traced app logic here... //! }); //! -//! global::shutdown_tracer_provider(); // export remaining spans +//! // export remaining spans. It's optional if you can accept spans loss for the last batch. +//! global::shutdown_tracer_provider(); +//! +//! Ok(()) +//! } +//! ``` +//! +//! ### Export to collectors +//! Note that this example requires `collecotr_client` and `isahc_collector_client` feature. +//! ```ignore +//! use opentelemetry::{sdk::{trace::{self, RandomIdGenerator, Sampler}, Resource}, global, KeyValue, trace::{Tracer, TraceError}}; +//! +//! fn main() -> Result<(), TraceError> { +//! global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); +//! let tracer = opentelemetry_jaeger::new_collector_pipeline() +//! .with_endpoint("http://localhost:14250/api/trace") // set collector endpoint +//! .with_service_name("my_app") // the name of the application +//! .with_trace_config( +//! trace::config() +//! .with_sampler(Sampler::AlwaysOn) +//! .with_id_generator(RandomIdGenerator::default()) +//! .with_max_events_per_span(64) +//! .with_max_attributes_per_span(16) +//! .with_max_events_per_span(16) +//! // resources will translated to tags in jaeger spans +//! .with_resource(Resource::new(vec![KeyValue::new("key", "value"), +//! KeyValue::new("process_key", "process_value")])), +//! ) +//! // we config a surf http client with 2 seconds timeout +//! // and have basic authentication header with username=username, password=s3cr3t +//! .with_isahc() // requires `isahc_collector_client` feature +//! .with_username("username") +//! .with_password("s3cr3t") +//! .with_timeout(std::time::Duration::from_secs(2)) +//! .install_batch(opentelemetry::runtime::Tokio)?; +//! +//! tracer.in_span("doing_work", |cx| { +//! // Traced app logic here... +//! }); +//! +//! // export remaining spans. It's optional if you can accept spans loss for the last batch. +//! global::shutdown_tracer_provider(); //! //! Ok(()) //! } @@ -237,6 +304,16 @@ )] #![cfg_attr(test, deny(warnings))] +pub use exporter::config; +#[cfg(feature = "collector_client")] +pub use exporter::config::collector::new_collector_pipeline; +#[cfg(feature = "wasm_collector_client")] +pub use exporter::config::collector::new_wasm_collector_pipeline; +pub use exporter::{ + config::agent::new_agent_pipeline, runtime::JaegerTraceRuntime, Error, Exporter, Process, +}; +pub use propagator::Propagator; + mod exporter; #[cfg(feature = "integration_test")] @@ -619,8 +696,3 @@ mod propagator { } } } - -pub use exporter::{ - new_pipeline, runtime::JaegerTraceRuntime, Error, Exporter, PipelineBuilder, Process, -}; -pub use propagator::Propagator; diff --git a/opentelemetry-jaeger/tests/integration_test.rs b/opentelemetry-jaeger/tests/integration_test.rs index 251833c93e..c6ed611f17 100644 --- a/opentelemetry-jaeger/tests/integration_test.rs +++ b/opentelemetry-jaeger/tests/integration_test.rs @@ -65,8 +65,8 @@ mod tests { println!("{}, {}", agent_endpoint, query_api_endpoint); runtime.block_on(async { - let tracer = opentelemetry_jaeger::new_pipeline() - .with_agent_endpoint(agent_endpoint) + let tracer = opentelemetry_jaeger::new_agent_pipeline() + .with_endpoint(agent_endpoint) .with_service_name(SERVICE_NAME) .install_batch(opentelemetry::runtime::Tokio) .expect("cannot create tracer using default configuration"); diff --git a/scripts/lint.sh b/scripts/lint.sh index 81331b555d..43ed731cec 100755 --- a/scripts/lint.sh +++ b/scripts/lint.sh @@ -34,6 +34,9 @@ if rustup component add clippy; then cargo_feature opentelemetry-jaeger "reqwest_blocking_collector_client" cargo_feature opentelemetry-jaeger "reqwest_collector_client" cargo_feature opentelemetry-jaeger "collector_client" + cargo_feature opentelemetry-jaeger "wasm_collector_client" + cargo_feature opentelemetry-jaeger "collector_client, wasm_collector_client" + cargo_feature opentelemetry-jaeger "default" cargo_feature opentelemetry-dynatrace "default" cargo_feature opentelemetry-dynatrace "metrics,rt-tokio,reqwest-client"