Skip to content

Commit f8da884

Browse files
committed
Morsel-driven Parallelism using rayon (apache#2199)
1 parent 8058fbb commit f8da884

File tree

11 files changed

+1199
-29
lines changed

11 files changed

+1199
-29
lines changed

CONTRIBUTING.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ The parquet SQL benchmarks can be run with
150150
cargo bench --bench parquet_query_sql
151151
```
152152

153-
These randomly generate a parquet file, and then benchmark queries sourced from [parquet_query_sql.sql](./datafusion/benches/parquet_query_sql.sql) against it. This can therefore be a quick way to add coverage of particular query and/or data paths.
153+
These randomly generate a parquet file, and then benchmark queries sourced from [parquet_query_sql.sql](./datafusion/scheduler/benches/parquet_query_sql.sql) against it. This can therefore be a quick way to add coverage of particular query and/or data paths.
154154

155155
If the environment variable `PARQUET_FILE` is set, the benchmark will run queries against this file instead of a randomly generated one. This can be useful for performing multiple runs, potentially with different code, against the same source data, or for testing against a custom dataset.
156156

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,13 @@
1717

1818
[workspace]
1919
members = [
20-
"datafusion/core",
2120
"datafusion/common",
21+
"datafusion/core",
2222
"datafusion/expr",
2323
"datafusion/jit",
2424
"datafusion/physical-expr",
2525
"datafusion/proto",
26+
"datafusion/scheduler",
2627
"datafusion-examples",
2728
"benchmarks",
2829
"ballista/rust/client",

datafusion/core/Cargo.toml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -117,10 +117,6 @@ name = "scalar"
117117
harness = false
118118
name = "physical_plan"
119119

120-
[[bench]]
121-
harness = false
122-
name = "parquet_query_sql"
123-
124120
[[bench]]
125121
harness = false
126122
name = "jit"

datafusion/core/src/datasource/memory.rs

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -71,25 +71,15 @@ impl MemTable {
7171
let exec = t.scan(&None, &[], None).await?;
7272
let partition_count = exec.output_partitioning().partition_count();
7373

74-
let tasks = (0..partition_count)
75-
.map(|part_i| {
76-
let context1 = context.clone();
77-
let exec = exec.clone();
78-
tokio::spawn(async move {
79-
let stream = exec.execute(part_i, context1.clone()).await?;
80-
common::collect(stream).await
81-
})
82-
})
83-
// this collect *is needed* so that the join below can
84-
// switch between tasks
85-
.collect::<Vec<_>>();
86-
87-
let mut data: Vec<Vec<RecordBatch>> =
88-
Vec::with_capacity(exec.output_partitioning().partition_count());
89-
for task in tasks {
90-
let result = task.await.expect("MemTable::load could not join task")?;
91-
data.push(result);
92-
}
74+
let data = futures::future::try_join_all((0..partition_count).map(|part_i| {
75+
let context1 = context.clone();
76+
let exec = exec.clone();
77+
async move {
78+
let stream = exec.execute(part_i, context1.clone()).await?;
79+
common::collect(stream).await
80+
}
81+
}))
82+
.await?;
9383

9484
let exec = MemoryExec::try_new(&data, schema.clone(), None)?;
9585

datafusion/scheduler/Cargo.toml

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
[package]
19+
name = "datafusion-scheduler"
20+
description = "Scheduling for DataFusion query engine"
21+
version = "7.0.0"
22+
homepage = "https://github.com/apache/arrow-datafusion"
23+
repository = "https://github.com/apache/arrow-datafusion"
24+
readme = "../README.md"
25+
authors = ["Apache Arrow <dev@arrow.apache.org>"]
26+
license = "Apache-2.0"
27+
keywords = ["arrow", "query", "sql"]
28+
edition = "2021"
29+
rust-version = "1.58"
30+
31+
[lib]
32+
name = "datafusion_scheduler"
33+
path = "src/lib.rs"
34+
35+
[features]
36+
37+
[dependencies]
38+
ahash = { version = "0.7", default-features = false }
39+
arrow = { version = "11" }
40+
async-trait = "0.1"
41+
datafusion = { path = "../core", version = "7.0.0" }
42+
futures = "0.3"
43+
log = "0.4"
44+
parking_lot = "0.12"
45+
rayon = "1.5"
46+
47+
[dev-dependencies]
48+
criterion = "0.3"
49+
rand = "0.8"
50+
tokio = { version = "1.0", features = ["macros", "rt"] }
51+
parquet = "11.0"
52+
tempfile = "3"
53+
54+
[[bench]]
55+
harness = false
56+
name = "parquet_query_sql"

datafusion/core/benches/parquet_query_sql.rs renamed to datafusion/scheduler/benches/parquet_query_sql.rs

Lines changed: 61 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,13 @@ use arrow::datatypes::{
2424
};
2525
use arrow::record_batch::RecordBatch;
2626
use criterion::{criterion_group, criterion_main, Criterion};
27+
<<<<<<<< HEAD:datafusion/core/benches/parquet_query_sql.rs
2728
use datafusion::prelude::{ParquetReadOptions, SessionContext};
29+
========
30+
use datafusion::prelude::{SessionConfig, SessionContext};
31+
use datafusion_scheduler::Scheduler;
32+
use futures::stream::StreamExt;
33+
>>>>>>>> scheduler-experiments:datafusion-scheduler/benches/parquet_query_sql.rs
2834
use parquet::arrow::ArrowWriter;
2935
use parquet::file::properties::{WriterProperties, WriterVersion};
3036
use rand::distributions::uniform::SampleUniform;
@@ -37,7 +43,6 @@ use std::path::Path;
3743
use std::sync::Arc;
3844
use std::time::Instant;
3945
use tempfile::NamedTempFile;
40-
use tokio_stream::StreamExt;
4146

4247
/// The number of batches to write
4348
const NUM_BATCHES: usize = 2048;
@@ -193,6 +198,7 @@ fn criterion_benchmark(c: &mut Criterion) {
193198
assert!(Path::new(&file_path).exists(), "path not found");
194199
println!("Using parquet file {}", file_path);
195200

201+
<<<<<<<< HEAD:datafusion/core/benches/parquet_query_sql.rs
196202
let context = SessionContext::new();
197203

198204
let rt = tokio::runtime::Builder::new_multi_thread().build().unwrap();
@@ -202,6 +208,26 @@ fn criterion_benchmark(c: &mut Criterion) {
202208
ParquetReadOptions::default(),
203209
))
204210
.unwrap();
211+
========
212+
let partitions = 4;
213+
let config = SessionConfig::new().with_target_partitions(partitions);
214+
let mut context = SessionContext::with_config(config);
215+
216+
let scheduler = Scheduler::new(partitions);
217+
218+
let local_rt = tokio::runtime::Builder::new_current_thread()
219+
.build()
220+
.unwrap();
221+
222+
let query_rt = tokio::runtime::Builder::new_multi_thread()
223+
.worker_threads(partitions)
224+
.build()
225+
.unwrap();
226+
227+
local_rt
228+
.block_on(context.register_parquet("t", file_path.as_str()))
229+
.unwrap();
230+
>>>>>>>> scheduler-experiments:datafusion-scheduler/benches/parquet_query_sql.rs
205231

206232
// We read the queries from a file so they can be changed without recompiling the benchmark
207233
let mut queries_file = File::open("benches/parquet_query_sql.sql").unwrap();
@@ -220,17 +246,48 @@ fn criterion_benchmark(c: &mut Criterion) {
220246
continue;
221247
}
222248

223-
let query = query.as_str();
224-
c.bench_function(query, |b| {
249+
c.bench_function(&format!("tokio: {}", query), |b| {
225250
b.iter(|| {
251+
<<<<<<<< HEAD:datafusion/core/benches/parquet_query_sql.rs
226252
let context = context.clone();
227253
rt.block_on(async move {
228254
let query = context.sql(query).await.unwrap();
255+
========
256+
let query = query.clone();
257+
let mut context = context.clone();
258+
let (sender, mut receiver) = futures::channel::mpsc::unbounded();
259+
260+
// Spawn work to a separate tokio thread pool
261+
query_rt.spawn(async move {
262+
let query = context.sql(&query).await.unwrap();
263+
>>>>>>>> scheduler-experiments:datafusion-scheduler/benches/parquet_query_sql.rs
229264
let mut stream = query.execute_stream().await.unwrap();
230-
while criterion::black_box(stream.next().await).is_some() {}
265+
266+
while let Some(next) = stream.next().await {
267+
sender.unbounded_send(next).unwrap();
268+
}
269+
});
270+
271+
local_rt.block_on(async {
272+
while receiver.next().await.transpose().unwrap().is_some() {}
231273
})
232274
});
233275
});
276+
277+
c.bench_function(&format!("scheduled: {}", query), |b| {
278+
b.iter(|| {
279+
let query = query.clone();
280+
let mut context = context.clone();
281+
282+
local_rt.block_on(async {
283+
let query = context.sql(&query).await.unwrap();
284+
let plan = query.create_physical_plan().await.unwrap();
285+
let mut stream =
286+
scheduler.schedule_plan(plan, context.task_ctx()).unwrap();
287+
while stream.next().await.transpose().unwrap().is_some() {}
288+
});
289+
});
290+
});
234291
}
235292

236293
// Temporary file must outlive the benchmarks, it is deleted when dropped

0 commit comments

Comments
 (0)