Skip to content

Commit 6a025d8

Browse files
feat: instrument spawned tasks with current tracing span when tracing feature is enabled
1 parent 873b5f7 commit 6a025d8

File tree

23 files changed

+445
-20
lines changed

23 files changed

+445
-20
lines changed

Cargo.lock

Lines changed: 81 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ Optional features:
129129
- `backtrace`: include backtrace information in error messages
130130
- `pyarrow`: conversions between PyArrow and DataFusion types
131131
- `serde`: enable arrow-schema's `serde` feature
132+
- `tracing`: propagates the current span across thread boundaries
132133

133134
[apache avro]: https://avro.apache.org/
134135
[apache parquet]: https://parquet.apache.org/

datafusion-examples/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ async-trait = { workspace = true }
6161
bytes = { workspace = true }
6262
dashmap = { workspace = true }
6363
# note only use main datafusion crate for examples
64-
datafusion = { workspace = true, default-features = true, features = ["avro"] }
64+
datafusion = { workspace = true, default-features = true, features = ["avro", "tracing"] }
6565
datafusion-proto = { workspace = true }
6666
env_logger = { workspace = true }
6767
futures = { workspace = true }
@@ -73,6 +73,8 @@ tempfile = { workspace = true }
7373
test-utils = { path = "../test-utils" }
7474
tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] }
7575
tonic = "0.12.1"
76+
tracing = { version = "0.1" }
77+
tracing-subscriber = { version = "0.3" }
7678
url = { workspace = true }
7779
uuid = "1.7"
7880

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! This example demonstrates the trace feature in DataFusion’s runtime.
19+
//! When the `trace` feature is enabled, spawned tasks in DataFusion (such as those
20+
//! created during repartitioning or when reading Parquet files) are instrumented
21+
//! with the current tracing span, allowing to propagate any existing tracing context.
22+
//!
23+
//! In this example we create a session configured to use multiple partitions,
24+
//! register a Parquet table (based on the `alltypes_tiny_pages_plain.parquet` file),
25+
//! and run a query that should trigger parallel execution on multiple threads.
26+
//! We wrap the entire query execution within a custom span and log messages.
27+
//! By inspecting the tracing output, we should see that the tasks spawned
28+
//! internally inherit the span context.
29+
30+
use arrow::util::pretty::pretty_format_batches;
31+
use datafusion::arrow::record_batch::RecordBatch;
32+
use datafusion::datasource::file_format::parquet::ParquetFormat;
33+
use datafusion::datasource::listing::ListingOptions;
34+
use datafusion::error::Result;
35+
use datafusion::prelude::*;
36+
use datafusion::test_util::parquet_test_data;
37+
use std::sync::Arc;
38+
use tracing::{info, Level, instrument};
39+
40+
#[tokio::main]
41+
async fn main() -> Result<()> {
42+
// Initialize a tracing subscriber that prints to stdout.
43+
tracing_subscriber::fmt()
44+
.with_thread_ids(true)
45+
.with_thread_names(true)
46+
.with_max_level(Level::DEBUG)
47+
.init();
48+
49+
log::info!("Starting example, this log is not captured by tracing");
50+
51+
// execute the query within a tracing span
52+
let result = run_instrumented_query().await;
53+
54+
info!(
55+
"Finished example. Check the logs above for tracing span details showing \
56+
that tasks were spawned within the 'run_instrumented_query' span on different threads."
57+
);
58+
59+
result
60+
}
61+
62+
#[instrument(level = "info")]
63+
async fn run_instrumented_query() -> Result<()> {
64+
info!("Starting query execution within the custom tracing span");
65+
66+
// The default session will set the number of partitions to `std::thread::available_parallelism()`.
67+
let ctx = SessionContext::new();
68+
69+
// Get the path to the test parquet data.
70+
let test_data = parquet_test_data();
71+
// Build listing options that pick up only the "alltypes_tiny_pages_plain.parquet" file.
72+
let file_format = ParquetFormat::default().with_enable_pruning(true);
73+
let listing_options = ListingOptions::new(Arc::new(file_format))
74+
.with_file_extension("alltypes_tiny_pages_plain.parquet");
75+
76+
info!("Registering Parquet table 'alltypes' from {test_data} in {listing_options:?}");
77+
78+
// Register a listing table using an absolute URL.
79+
let table_path = format!("file://{test_data}/");
80+
ctx.register_listing_table(
81+
"alltypes",
82+
&table_path,
83+
listing_options.clone(),
84+
None,
85+
None,
86+
)
87+
.await
88+
.expect("register_listing_table failed");
89+
90+
info!("Registered Parquet table 'alltypes' from {table_path}");
91+
92+
// Run a query that will trigger parallel execution on multiple threads.
93+
let sql = "SELECT COUNT(*), bool_col, date_string_col, string_col
94+
FROM (
95+
SELECT bool_col, date_string_col, string_col FROM alltypes
96+
UNION ALL
97+
SELECT bool_col, date_string_col, string_col FROM alltypes
98+
) AS t
99+
GROUP BY bool_col, date_string_col, string_col
100+
ORDER BY 1,2,3,4 DESC
101+
LIMIT 5;";
102+
info!(%sql, "Executing SQL query");
103+
let df = ctx.sql(sql).await?;
104+
105+
let results: Vec<RecordBatch> = df.collect().await?;
106+
info!("Query execution complete");
107+
108+
// Print out the results and tracing output.
109+
datafusion::common::assert_batches_eq!(
110+
[
111+
"+----------+----------+-----------------+------------+",
112+
"| count(*) | bool_col | date_string_col | string_col |",
113+
"+----------+----------+-----------------+------------+",
114+
"| 2 | false | 01/01/09 | 9 |",
115+
"| 2 | false | 01/01/09 | 7 |",
116+
"| 2 | false | 01/01/09 | 5 |",
117+
"| 2 | false | 01/01/09 | 3 |",
118+
"| 2 | false | 01/01/09 | 1 |",
119+
"+----------+----------+-----------------+------------+",
120+
],
121+
&results
122+
);
123+
124+
info!("Query results:\n{}", pretty_format_batches(&results)?);
125+
126+
Ok(())
127+
}

datafusion/common-runtime/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,18 @@ rust-version = { workspace = true }
3131
[lints]
3232
workspace = true
3333

34+
[features]
35+
tracing = ["dep:tracing", "dep:tracing-futures"]
36+
3437
[lib]
3538
name = "datafusion_common_runtime"
3639
path = "src/lib.rs"
3740

3841
[dependencies]
3942
log = { workspace = true }
4043
tokio = { workspace = true }
44+
tracing = { version = "0.1", optional = true }
45+
tracing-futures = { version = "0.2", optional = true }
4146

4247
[dev-dependencies]
4348
tokio = { version = "1.36", features = ["rt", "rt-multi-thread", "time"] }

datafusion/common-runtime/src/common.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717

1818
use std::future::Future;
1919

20-
use tokio::task::{JoinError, JoinSet};
20+
use tokio::task::{JoinError};
21+
use crate::JoinSet;
2122

2223
/// Helper that provides a simple API to spawn a single task and join it.
2324
/// Provides guarantees of aborting on `Drop` to keep it cancel-safe.
@@ -36,6 +37,7 @@ impl<R: 'static> SpawnedTask<R> {
3637
R: Send,
3738
{
3839
let mut inner = JoinSet::new();
40+
3941
inner.spawn(task);
4042
Self { inner }
4143
}

0 commit comments

Comments
 (0)