Skip to content

Commit 02f4d95

Browse files
committed
Morsel-driven Parallelism using rayon (apache#2199)
1 parent 8058fbb commit 02f4d95

File tree

10 files changed

+1180
-22
lines changed

10 files changed

+1180
-22
lines changed

CONTRIBUTING.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ The parquet SQL benchmarks can be run with
150150
cargo bench --bench parquet_query_sql
151151
```
152152

153-
These randomly generate a parquet file, and then benchmark queries sourced from [parquet_query_sql.sql](./datafusion/benches/parquet_query_sql.sql) against it. This can therefore be a quick way to add coverage of particular query and/or data paths.
153+
These randomly generate a parquet file, and then benchmark queries sourced from [parquet_query_sql.sql](./datafusion/scheduler/benches/parquet_query_sql.sql) against it. This can therefore be a quick way to add coverage of particular query and/or data paths.
154154

155155
If the environment variable `PARQUET_FILE` is set, the benchmark will run queries against this file instead of a randomly generated one. This can be useful for performing multiple runs, potentially with different code, against the same source data, or for testing against a custom dataset.
156156

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,13 @@
1717

1818
[workspace]
1919
members = [
20-
"datafusion/core",
2120
"datafusion/common",
21+
"datafusion/core",
2222
"datafusion/expr",
2323
"datafusion/jit",
2424
"datafusion/physical-expr",
2525
"datafusion/proto",
26+
"datafusion/scheduler",
2627
"datafusion-examples",
2728
"benchmarks",
2829
"ballista/rust/client",

datafusion/core/Cargo.toml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -117,10 +117,6 @@ name = "scalar"
117117
harness = false
118118
name = "physical_plan"
119119

120-
[[bench]]
121-
harness = false
122-
name = "parquet_query_sql"
123-
124120
[[bench]]
125121
harness = false
126122
name = "jit"

datafusion/scheduler/Cargo.toml

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
[package]
19+
name = "datafusion-scheduler"
20+
description = "Scheduling for DataFusion query engine"
21+
version = "7.0.0"
22+
homepage = "https://github.com/apache/arrow-datafusion"
23+
repository = "https://github.com/apache/arrow-datafusion"
24+
readme = "../README.md"
25+
authors = ["Apache Arrow <dev@arrow.apache.org>"]
26+
license = "Apache-2.0"
27+
keywords = ["arrow", "query", "sql"]
28+
edition = "2021"
29+
rust-version = "1.58"
30+
31+
[lib]
32+
name = "datafusion_scheduler"
33+
path = "src/lib.rs"
34+
35+
[features]
36+
37+
[dependencies]
38+
ahash = { version = "0.7", default-features = false }
39+
arrow = { version = "11" }
40+
async-trait = "0.1"
41+
datafusion = { path = "../core", version = "7.0.0" }
42+
futures = "0.3"
43+
log = "0.4"
44+
parking_lot = "0.12"
45+
rayon = "1.5"
46+
47+
[dev-dependencies]
48+
criterion = "0.3"
49+
rand = "0.8"
50+
tokio = { version = "1.0", features = ["macros", "rt"] }
51+
parquet = "11.0"
52+
tempfile = "3"
53+
54+
[[bench]]
55+
harness = false
56+
name = "parquet_query_sql"

datafusion/core/benches/parquet_query_sql.rs renamed to datafusion/scheduler/benches/parquet_query_sql.rs

Lines changed: 51 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ use arrow::datatypes::{
2424
};
2525
use arrow::record_batch::RecordBatch;
2626
use criterion::{criterion_group, criterion_main, Criterion};
27-
use datafusion::prelude::{ParquetReadOptions, SessionContext};
27+
use datafusion::prelude::{SessionConfig, SessionContext};
28+
use datafusion_scheduler::Scheduler;
29+
use futures::stream::StreamExt;
2830
use parquet::arrow::ArrowWriter;
2931
use parquet::file::properties::{WriterProperties, WriterVersion};
3032
use rand::distributions::uniform::SampleUniform;
@@ -37,7 +39,6 @@ use std::path::Path;
3739
use std::sync::Arc;
3840
use std::time::Instant;
3941
use tempfile::NamedTempFile;
40-
use tokio_stream::StreamExt;
4142

4243
/// The number of batches to write
4344
const NUM_BATCHES: usize = 2048;
@@ -193,15 +194,24 @@ fn criterion_benchmark(c: &mut Criterion) {
193194
assert!(Path::new(&file_path).exists(), "path not found");
194195
println!("Using parquet file {}", file_path);
195196

196-
let context = SessionContext::new();
197+
let partitions = 4;
198+
let config = SessionConfig::new().with_target_partitions(partitions);
199+
let mut context = SessionContext::with_config(config);
197200

198-
let rt = tokio::runtime::Builder::new_multi_thread().build().unwrap();
199-
rt.block_on(context.register_parquet(
200-
"t",
201-
file_path.as_str(),
202-
ParquetReadOptions::default(),
203-
))
204-
.unwrap();
201+
let scheduler = Scheduler::new(partitions);
202+
203+
let local_rt = tokio::runtime::Builder::new_current_thread()
204+
.build()
205+
.unwrap();
206+
207+
let query_rt = tokio::runtime::Builder::new_multi_thread()
208+
.worker_threads(partitions)
209+
.build()
210+
.unwrap();
211+
212+
local_rt
213+
.block_on(context.register_parquet("t", file_path.as_str()))
214+
.unwrap();
205215

206216
// We read the queries from a file so they can be changed without recompiling the benchmark
207217
let mut queries_file = File::open("benches/parquet_query_sql.sql").unwrap();
@@ -220,17 +230,42 @@ fn criterion_benchmark(c: &mut Criterion) {
220230
continue;
221231
}
222232

223-
let query = query.as_str();
224-
c.bench_function(query, |b| {
233+
c.bench_function(&format!("tokio: {}", query), |b| {
225234
b.iter(|| {
226-
let context = context.clone();
227-
rt.block_on(async move {
228-
let query = context.sql(query).await.unwrap();
235+
let query = query.clone();
236+
let mut context = context.clone();
237+
let (sender, mut receiver) = futures::channel::mpsc::unbounded();
238+
239+
// Spawn work to a separate tokio thread pool
240+
query_rt.spawn(async move {
241+
let query = context.sql(&query).await.unwrap();
229242
let mut stream = query.execute_stream().await.unwrap();
230-
while criterion::black_box(stream.next().await).is_some() {}
243+
244+
while let Some(next) = stream.next().await {
245+
sender.unbounded_send(next).unwrap();
246+
}
247+
});
248+
249+
local_rt.block_on(async {
250+
while receiver.next().await.transpose().unwrap().is_some() {}
231251
})
232252
});
233253
});
254+
255+
c.bench_function(&format!("scheduled: {}", query), |b| {
256+
b.iter(|| {
257+
let query = query.clone();
258+
let mut context = context.clone();
259+
260+
local_rt.block_on(async {
261+
let query = context.sql(&query).await.unwrap();
262+
let plan = query.create_physical_plan().await.unwrap();
263+
let mut stream =
264+
scheduler.schedule_plan(plan, context.task_ctx()).unwrap();
265+
while stream.next().await.transpose().unwrap().is_some() {}
266+
});
267+
});
268+
});
234269
}
235270

236271
// Temporary file must outlive the benchmarks, it is deleted when dropped

0 commit comments

Comments (0)