Skip to content

Commit 615dc39

Browse files
feat: instrument spawned tasks with current tracing span when tracing feature is enabled
1 parent 94d2baf commit 615dc39

File tree

21 files changed

+359
-19
lines changed

21 files changed

+359
-19
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ Optional features:
126126
- `backtrace`: include backtrace information in error messages
127127
- `pyarrow`: conversions between PyArrow and DataFusion types
128128
- `serde`: enable arrow-schema's `serde` feature
129+
- `tracing`: propagates the current span across thread boundaries
129130

130131
[apache avro]: https://avro.apache.org/
131132
[apache parquet]: https://parquet.apache.org/

datafusion-examples/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ async-trait = { workspace = true }
6161
bytes = { workspace = true }
6262
dashmap = { workspace = true }
6363
# note only use main datafusion crate for examples
64-
datafusion = { workspace = true, default-features = true, features = ["avro"] }
64+
datafusion = { workspace = true, default-features = true, features = ["avro", "tracing"] }
6565
datafusion-proto = { workspace = true }
6666
env_logger = { workspace = true }
6767
futures = { workspace = true }
@@ -73,6 +73,8 @@ tempfile = { workspace = true }
7373
test-utils = { path = "../test-utils" }
7474
tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] }
7575
tonic = "0.12.1"
76+
tracing = { version = "0.1" }
77+
tracing-subscriber = { version = "0.3" }
7678
url = { workspace = true }
7779
uuid = "1.7"
7880

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! This example demonstrates the trace feature in DataFusion’s runtime.
19+
//! When the `trace` feature is enabled, spawned tasks in DataFusion (such as those
20+
//! created during repartitioning or when reading Parquet files) are instrumented
21+
//! with the current tracing span, allowing to propagate any existing tracing context.
22+
//!
23+
//! In this example we create a session configured to use multiple partitions,
24+
//! register a Parquet table (based on the `alltypes_tiny_pages_plain.parquet` file),
25+
//! and run a query that should trigger parallel execution on multiple threads.
26+
//! We wrap the entire query execution within a custom span and log messages.
27+
//! By inspecting the tracing output, we should see that the tasks spawned
28+
//! internally inherit the span context.
29+
30+
use arrow::util::pretty::pretty_format_batches;
31+
use datafusion::arrow::record_batch::RecordBatch;
32+
use datafusion::datasource::file_format::parquet::ParquetFormat;
33+
use datafusion::datasource::listing::ListingOptions;
34+
use datafusion::error::Result;
35+
use datafusion::prelude::*;
36+
use datafusion::test_util::parquet_test_data;
37+
use std::sync::Arc;
38+
39+
// Bring in tracing and a subscriber to capture log output.
40+
use tracing::{info, Level, instrument};
41+
use tracing_subscriber;
42+
43+
#[tokio::main]
44+
async fn main() -> Result<()> {
45+
// Initialize a tracing subscriber that prints to stdout.
46+
tracing_subscriber::fmt()
47+
.with_thread_ids(true)
48+
.with_thread_names(true)
49+
.with_max_level(Level::DEBUG)
50+
.init();
51+
52+
log::info!("Starting example, this log is not captured by tracing");
53+
54+
// execute the query within a tracing span
55+
let result = run_instrumented_query().await;
56+
57+
info!(
58+
"Finished example. Check the logs above for tracing span details showing \
59+
that tasks were spawned within the 'run_instrumented_query' span on different threads."
60+
);
61+
62+
result
63+
}
64+
65+
#[instrument(level = "info")]
66+
async fn run_instrumented_query() -> Result<()> {
67+
info!("Starting query execution within the custom tracing span");
68+
69+
// The default session will set the number of partitions to `std::thread::available_parallelism()`.
70+
let ctx = SessionContext::new();
71+
72+
// Get the path to the test parquet data.
73+
let test_data = parquet_test_data();
74+
// Build listing options that pick up only the "alltypes_tiny_pages_plain.parquet" file.
75+
let file_format = ParquetFormat::default().with_enable_pruning(true);
76+
let listing_options = ListingOptions::new(Arc::new(file_format))
77+
.with_file_extension("alltypes_tiny_pages_plain.parquet");
78+
79+
info!("Registering Parquet table 'alltypes' from {test_data} in {listing_options:?}");
80+
81+
// Register a listing table using an absolute URL.
82+
let table_path = format!("file://{test_data}/");
83+
ctx.register_listing_table(
84+
"alltypes",
85+
&table_path,
86+
listing_options.clone(),
87+
None,
88+
None,
89+
)
90+
.await
91+
.expect("register_listing_table failed");
92+
93+
info!("Registered Parquet table 'alltypes' from {table_path}");
94+
95+
// Run a query that will trigger parallel execution on multiple threads.
96+
let sql = "SELECT COUNT(*), bool_col, date_string_col, string_col
97+
FROM (
98+
SELECT bool_col, date_string_col, string_col FROM alltypes
99+
UNION ALL
100+
SELECT bool_col, date_string_col, string_col FROM alltypes
101+
) AS t
102+
GROUP BY bool_col, date_string_col, string_col
103+
ORDER BY 1,2,3,4 DESC
104+
LIMIT 5;";
105+
info!(%sql, "Executing SQL query");
106+
let df = ctx.sql(sql).await?;
107+
108+
let results: Vec<RecordBatch> = df.collect().await?;
109+
info!("Query execution complete");
110+
111+
// Print out the results and tracing output.
112+
datafusion::common::assert_batches_eq!(
113+
[
114+
"+----------+----------+-----------------+------------+",
115+
"| count(*) | bool_col | date_string_col | string_col |",
116+
"+----------+----------+-----------------+------------+",
117+
"| 2 | false | 01/01/09 | 9 |",
118+
"| 2 | false | 01/01/09 | 7 |",
119+
"| 2 | false | 01/01/09 | 5 |",
120+
"| 2 | false | 01/01/09 | 3 |",
121+
"| 2 | false | 01/01/09 | 1 |",
122+
"+----------+----------+-----------------+------------+",
123+
],
124+
&results
125+
);
126+
127+
info!("Query results:\n{}", pretty_format_batches(&results)?);
128+
129+
Ok(())
130+
}

datafusion/common-runtime/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,18 @@ rust-version = { workspace = true }
3131
[lints]
3232
workspace = true
3333

34+
[features]
35+
tracing = ["dep:tracing", "dep:tracing-futures"]
36+
3437
[lib]
3538
name = "datafusion_common_runtime"
3639
path = "src/lib.rs"
3740

3841
[dependencies]
3942
log = { workspace = true }
4043
tokio = { workspace = true }
44+
tracing = { version = "0.1", optional = true }
45+
tracing-futures = { version = "0.2", optional = true }
4146

4247
[dev-dependencies]
4348
tokio = { version = "1.36", features = ["rt", "rt-multi-thread", "time"] }

datafusion/common-runtime/src/common.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717

1818
use std::future::Future;
1919

20-
use tokio::task::{JoinError, JoinSet};
20+
use tokio::task::{JoinError};
21+
use crate::JoinSet;
2122

2223
/// Helper that provides a simple API to spawn a single task and join it.
2324
/// Provides guarantees of aborting on `Drop` to keep it cancel-safe.
@@ -36,6 +37,7 @@ impl<R: 'static> SpawnedTask<R> {
3637
R: Send,
3738
{
3839
let mut inner = JoinSet::new();
40+
3941
inner.spawn(task);
4042
Self { inner }
4143
}
Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::future::Future;
19+
use std::task::{Context, Poll};
20+
use tokio::runtime::Handle;
21+
use tokio::task::JoinSet as TokioJoinSet;
22+
use tokio::task::{AbortHandle, Id, JoinError, LocalSet};
23+
24+
#[cfg(feature = "tracing")]
25+
mod trace_utils {
26+
use std::future::Future;
27+
use tracing_futures::Instrument;
28+
29+
/// Instruments the provided future with the current tracing span.
30+
pub fn trace_future<F, T>(future: F) -> impl Future<Output = T> + Send
31+
where
32+
F: Future<Output = T> + Send + 'static,
33+
T: Send,
34+
{
35+
future.in_current_span()
36+
}
37+
38+
/// Wraps the provided blocking function to execute within the current tracing span.
39+
pub fn trace_block<F, T>(f: F) -> impl FnOnce() -> T + Send + 'static
40+
where
41+
F: FnOnce() -> T + Send + 'static,
42+
T: Send,
43+
{
44+
let span = tracing::Span::current();
45+
move || span.in_scope(|| f())
46+
}
47+
}
48+
49+
/// A wrapper around Tokio's [`JoinSet`](tokio::task::JoinSet) that transparently forwards all public API calls
50+
/// while optionally instrumenting spawned tasks and blocking closures with the current tracing span
51+
/// when the `trace` feature is enabled.
52+
#[derive(Debug)]
53+
pub struct JoinSet<T> {
54+
inner: TokioJoinSet<T>,
55+
}
56+
57+
impl<T> JoinSet<T> {
58+
/// [JoinSet::new](tokio::task::JoinSet::new) - Create a new JoinSet.
59+
pub fn new() -> Self {
60+
Self {
61+
inner: TokioJoinSet::new(),
62+
}
63+
}
64+
65+
/// [JoinSet::len](tokio::task::JoinSet::len) - Return the number of tasks.
66+
pub fn len(&self) -> usize {
67+
self.inner.len()
68+
}
69+
70+
/// [JoinSet::is_empty](tokio::task::JoinSet::is_empty) - Check if the JoinSet is empty.
71+
pub fn is_empty(&self) -> bool {
72+
self.inner.is_empty()
73+
}
74+
}
75+
impl<T: 'static> JoinSet<T> {
76+
/// [JoinSet::spawn](tokio::task::JoinSet::spawn) - Spawn a new task.
77+
pub fn spawn<F>(&mut self, task: F) -> AbortHandle
78+
where
79+
F: Future<Output = T>,
80+
F: Send + 'static,
81+
T: Send,
82+
{
83+
#[cfg(feature = "tracing")]
84+
let task = trace_utils::trace_future(task);
85+
86+
self.inner.spawn(task)
87+
}
88+
89+
/// [JoinSet::spawn_on](tokio::task::JoinSet::spawn_on) - Spawn a task on a provided runtime.
90+
pub fn spawn_on<F>(&mut self, task: F, handle: &Handle) -> AbortHandle
91+
where
92+
F: Future<Output = T>,
93+
F: Send + 'static,
94+
T: Send,
95+
{
96+
#[cfg(feature = "tracing")]
97+
let task = trace_utils::trace_future(task);
98+
99+
self.inner.spawn_on(task, handle)
100+
}
101+
102+
/// [JoinSet::spawn_local](tokio::task::JoinSet::spawn_local) - Spawn a local task.
103+
pub fn spawn_local<F>(&mut self, task: F) -> AbortHandle
104+
where
105+
F: Future<Output = T>,
106+
F: 'static,
107+
{
108+
self.inner.spawn_local(task)
109+
}
110+
111+
/// [JoinSet::spawn_local_on](tokio::task::JoinSet::spawn_local_on) - Spawn a local task on a provided LocalSet.
112+
pub fn spawn_local_on<F>(&mut self, task: F, local_set: &LocalSet) -> AbortHandle
113+
where
114+
F: Future<Output = T>,
115+
F: 'static,
116+
{
117+
self.inner.spawn_local_on(task, local_set)
118+
}
119+
120+
/// [JoinSet::spawn_blocking](tokio::task::JoinSet::spawn_blocking) - Spawn a blocking task.
121+
pub fn spawn_blocking<F>(&mut self, f: F) -> AbortHandle
122+
where
123+
F: FnOnce() -> T,
124+
F: Send + 'static,
125+
T: Send,
126+
{
127+
#[cfg(feature = "tracing")]
128+
let f = trace_utils::trace_block(f);
129+
130+
self.inner.spawn_blocking(f)
131+
}
132+
133+
/// [JoinSet::spawn_blocking_on](tokio::task::JoinSet::spawn_blocking_on) - Spawn a blocking task on a provided runtime.
134+
pub fn spawn_blocking_on<F>(&mut self, f: F, handle: &Handle) -> AbortHandle
135+
where
136+
F: FnOnce() -> T,
137+
F: Send + 'static,
138+
T: Send,
139+
{
140+
#[cfg(feature = "tracing")]
141+
let f = trace_utils::trace_block(f);
142+
143+
self.inner.spawn_blocking_on(f, handle)
144+
}
145+
146+
/// [JoinSet::join_next](tokio::task::JoinSet::join_next) - Await the next completed task.
147+
pub async fn join_next(&mut self) -> Option<Result<T, JoinError>> {
148+
self.inner.join_next().await
149+
}
150+
151+
/// [JoinSet::try_join_next](tokio::task::JoinSet::try_join_next) - Try to join the next completed task.
152+
pub fn try_join_next(&mut self) -> Option<Result<T, JoinError>> {
153+
self.inner.try_join_next()
154+
}
155+
156+
/// [JoinSet::abort_all](tokio::task::JoinSet::abort_all) - Abort all tasks.
157+
pub fn abort_all(&mut self) {
158+
self.inner.abort_all()
159+
}
160+
161+
/// [JoinSet::detach_all](tokio::task::JoinSet::detach_all) - Detach all tasks.
162+
pub fn detach_all(&mut self) {
163+
self.inner.detach_all()
164+
}
165+
166+
/// [JoinSet::poll_join_next](tokio::task::JoinSet::poll_join_next) - Poll for the next completed task.
167+
pub fn poll_join_next(
168+
&mut self,
169+
cx: &mut Context<'_>,
170+
) -> Poll<Option<Result<T, JoinError>>> {
171+
self.inner.poll_join_next(cx)
172+
}
173+
174+
/// [JoinSet::join_next_with_id](tokio::task::JoinSet::join_next_with_id) - Await the next completed task with its ID.
175+
pub async fn join_next_with_id(&mut self) -> Option<Result<(Id, T), JoinError>> {
176+
self.inner.join_next_with_id().await
177+
}
178+
179+
/// [JoinSet::try_join_next_with_id](tokio::task::JoinSet::try_join_next_with_id) - Try to join the next completed task with its ID.
180+
pub fn try_join_next_with_id(&mut self) -> Option<Result<(Id, T), JoinError>> {
181+
self.inner.try_join_next_with_id()
182+
}
183+
184+
/// [JoinSet::poll_join_next_with_id](tokio::task::JoinSet::poll_join_next_with_id) - Poll for the next completed task with its ID.
185+
pub fn poll_join_next_with_id(
186+
&mut self,
187+
cx: &mut Context<'_>,
188+
) -> Poll<Option<Result<(Id, T), JoinError>>> {
189+
self.inner.poll_join_next_with_id(cx)
190+
}
191+
192+
/// [JoinSet::shutdown](tokio::task::JoinSet::shutdown) - Abort all tasks and wait for shutdown.
193+
pub async fn shutdown(&mut self) {
194+
self.inner.shutdown().await
195+
}
196+
197+
/// [JoinSet::join_all](tokio::task::JoinSet::join_all) - Await all tasks.
198+
pub async fn join_all(self) -> Vec<T> {
199+
self.inner.join_all().await
200+
}
201+
}

0 commit comments

Comments
 (0)