Skip to content

Commit dd9c3a8

Browse files
feat: introduce JoinSetTracer trait for tracing context propagation in spawned tasks (#14547)
* feat: instrument spawned tasks with current tracing span when `tracing` feature is enabled * feat: allowing injecting custom join_set tracer to avoid dependency on `tracing` crate * feat: move the `join_set` tracing utils to a dedicated file
1 parent 11838be commit dd9c3a8

File tree

21 files changed

+592
-19
lines changed

21 files changed

+592
-19
lines changed

Cargo.lock

Lines changed: 70 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion-examples/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ tempfile = { workspace = true }
7373
test-utils = { path = "../test-utils" }
7474
tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] }
7575
tonic = "0.12.1"
76+
tracing = { version = "0.1" }
77+
tracing-subscriber = { version = "0.3" }
7678
url = { workspace = true }
7779
uuid = "1.16"
7880

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! This example demonstrates the tracing injection feature for the DataFusion runtime.
19+
//! Tasks spawned on new threads behave differently depending on whether a tracer is injected.
20+
//! The log output clearly distinguishes the two cases.
21+
//!
22+
//! # Expected Log Output
23+
//!
24+
//! When **no tracer** is injected, logs from tasks running on `tokio-runtime-worker` threads
25+
//! will _not_ include the `run_instrumented_query` span:
26+
//!
27+
//! ```text
28+
//! 10:29:40.714 INFO main ThreadId(01) tracing: ***** RUNNING WITHOUT INJECTED TRACER *****
29+
//! 10:29:40.714 INFO main ThreadId(01) run_instrumented_query: tracing: Starting query execution
30+
//! 10:29:40.728 INFO main ThreadId(01) run_instrumented_query: tracing: Executing SQL query sql="SELECT COUNT(*), string_col FROM alltypes GROUP BY string_col"
31+
//! 10:29:40.743 DEBUG main ThreadId(01) run_instrumented_query: datafusion_optimizer::optimizer: Optimizer took 6 ms
32+
//! 10:29:40.759 DEBUG tokio-runtime-worker ThreadId(03) datafusion_physical_plan::aggregates::row_hash: Creating GroupedHashAggregateStream
33+
//! 10:29:40.758 DEBUG tokio-runtime-worker ThreadId(04) datafusion_physical_plan::aggregates::row_hash: Creating GroupedHashAggregateStream
34+
//! 10:29:40.771 INFO main ThreadId(01) run_instrumented_query: tracing: Query complete: 6 batches returned
35+
//! 10:29:40.772 INFO main ThreadId(01) tracing: ***** WITHOUT tracer: Non-main tasks did NOT inherit the `run_instrumented_query` span *****
36+
//! ```
37+
//!
38+
//! When a tracer **is** injected, tasks spawned on non‑main threads _do_ inherit the span:
39+
//!
40+
//! ```text
41+
//! 10:29:40.772 INFO main ThreadId(01) tracing: Injecting custom tracer...
42+
//! 10:29:40.772 INFO main ThreadId(01) tracing: ***** RUNNING WITH INJECTED TRACER *****
43+
//! 10:29:40.772 INFO main ThreadId(01) run_instrumented_query: tracing: Starting query execution
44+
//! 10:29:40.775 INFO main ThreadId(01) run_instrumented_query: tracing: Executing SQL query sql="SELECT COUNT(*), string_col FROM alltypes GROUP BY string_col"
45+
//! 10:29:40.784 DEBUG main ThreadId(01) run_instrumented_query: datafusion_optimizer::optimizer: Optimizer took 7 ms
46+
//! 10:29:40.801 DEBUG tokio-runtime-worker ThreadId(03) run_instrumented_query: datafusion_physical_plan::aggregates::row_hash: Creating GroupedHashAggregateStream
47+
//! 10:29:40.801 DEBUG tokio-runtime-worker ThreadId(04) run_instrumented_query: datafusion_physical_plan::aggregates::row_hash: Creating GroupedHashAggregateStream
48+
//! 10:29:40.809 INFO main ThreadId(01) run_instrumented_query: tracing: Query complete: 6 batches returned
49+
//! 10:29:40.809 INFO main ThreadId(01) tracing: ***** WITH tracer: Non-main tasks DID inherit the `run_instrumented_query` span *****
50+
//! ```
51+
52+
use datafusion::common::runtime::{set_join_set_tracer, JoinSetTracer};
53+
use datafusion::datasource::file_format::parquet::ParquetFormat;
54+
use datafusion::datasource::listing::ListingOptions;
55+
use datafusion::error::Result;
56+
use datafusion::prelude::*;
57+
use datafusion::test_util::parquet_test_data;
58+
use futures::future::BoxFuture;
59+
use futures::FutureExt;
60+
use std::any::Any;
61+
use std::sync::Arc;
62+
use tracing::{info, instrument, Instrument, Level, Span};
63+
64+
#[tokio::main]
65+
async fn main() -> Result<()> {
66+
// Initialize tracing subscriber with thread info.
67+
tracing_subscriber::fmt()
68+
.with_thread_ids(true)
69+
.with_thread_names(true)
70+
.with_max_level(Level::DEBUG)
71+
.init();
72+
73+
// Run query WITHOUT tracer injection.
74+
info!("***** RUNNING WITHOUT INJECTED TRACER *****");
75+
run_instrumented_query().await?;
76+
info!("***** WITHOUT tracer: `tokio-runtime-worker` tasks did NOT inherit the `run_instrumented_query` span *****");
77+
78+
// Inject custom tracer so tasks run in the current span.
79+
info!("Injecting custom tracer...");
80+
set_join_set_tracer(&SpanTracer).expect("Failed to set tracer");
81+
82+
// Run query WITH tracer injection.
83+
info!("***** RUNNING WITH INJECTED TRACER *****");
84+
run_instrumented_query().await?;
85+
info!("***** WITH tracer: `tokio-runtime-worker` tasks DID inherit the `run_instrumented_query` span *****");
86+
87+
Ok(())
88+
}
89+
90+
/// A simple tracer that ensures any spawned task or blocking closure
91+
/// inherits the current span via `in_current_span`.
92+
struct SpanTracer;
93+
94+
/// Implement the `JoinSetTracer` trait so we can inject instrumentation
95+
/// for both async futures and blocking closures.
96+
impl JoinSetTracer for SpanTracer {
97+
/// Instruments a boxed future to run in the current span. The future's
98+
/// return type is erased to `Box<dyn Any + Send>`, which we simply
99+
/// run inside the `Span::current()` context.
100+
fn trace_future(
101+
&self,
102+
fut: BoxFuture<'static, Box<dyn Any + Send>>,
103+
) -> BoxFuture<'static, Box<dyn Any + Send>> {
104+
fut.in_current_span().boxed()
105+
}
106+
107+
/// Instruments a boxed blocking closure by running it inside the
108+
/// `Span::current()` context.
109+
fn trace_block(
110+
&self,
111+
f: Box<dyn FnOnce() -> Box<dyn Any + Send> + Send>,
112+
) -> Box<dyn FnOnce() -> Box<dyn Any + Send> + Send> {
113+
let span = Span::current();
114+
Box::new(move || span.in_scope(f))
115+
}
116+
}
117+
118+
#[instrument(level = "info")]
119+
async fn run_instrumented_query() -> Result<()> {
120+
info!("Starting query execution");
121+
122+
let ctx = SessionContext::new();
123+
let test_data = parquet_test_data();
124+
let file_format = ParquetFormat::default().with_enable_pruning(true);
125+
let listing_options = ListingOptions::new(Arc::new(file_format))
126+
.with_file_extension("alltypes_tiny_pages_plain.parquet");
127+
128+
let table_path = format!("file://{test_data}/");
129+
info!("Registering table 'alltypes' from {}", table_path);
130+
ctx.register_listing_table("alltypes", &table_path, listing_options, None, None)
131+
.await
132+
.expect("Failed to register table");
133+
134+
let sql = "SELECT COUNT(*), string_col FROM alltypes GROUP BY string_col";
135+
info!(sql, "Executing SQL query");
136+
let result = ctx.sql(sql).await?.collect().await?;
137+
info!("Query complete: {} batches returned", result.len());
138+
Ok(())
139+
}

datafusion/common-runtime/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ workspace = true
3838
name = "datafusion_common_runtime"
3939

4040
[dependencies]
41+
futures = { workspace = true }
4142
log = { workspace = true }
4243
tokio = { workspace = true }
4344

datafusion/common-runtime/src/common.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717

1818
use std::future::Future;
1919

20-
use tokio::task::{JoinError, JoinSet};
20+
use crate::JoinSet;
21+
use tokio::task::JoinError;
2122

2223
/// Helper that provides a simple API to spawn a single task and join it.
2324
/// Provides guarantees of aborting on `Drop` to keep it cancel-safe.

0 commit comments

Comments
 (0)