@@ -49,6 +49,8 @@ use datafusion::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
49
49
use datafusion:: datasource:: file_format:: parquet:: DEFAULT_PARQUET_EXTENSION ;
50
50
use datafusion:: datasource:: listing:: ListingTableUrl ;
51
51
use datafusion:: execution:: context:: SessionState ;
52
+ use datafusion:: scheduler:: Scheduler ;
53
+ use futures:: TryStreamExt ;
52
54
use serde:: Serialize ;
53
55
use structopt:: StructOpt ;
54
56
@@ -101,6 +103,10 @@ struct DataFusionBenchmarkOpt {
101
103
/// Whether to disable collection of statistics (and cost based optimizations) or not.
102
104
#[ structopt( short = "S" , long = "disable-statistics" ) ]
103
105
disable_statistics : bool ,
106
+
107
+ /// Enable scheduler
108
+ #[ structopt( short = "e" , long = "enable-scheduler" ) ]
109
+ enable_scheduler : bool ,
104
110
}
105
111
106
112
#[ derive( Debug , StructOpt ) ]
@@ -235,14 +241,16 @@ async fn benchmark_query(
235
241
if query_id == 15 {
236
242
for ( n, query) in sql. iter ( ) . enumerate ( ) {
237
243
if n == 1 {
238
- result = execute_query ( & ctx, query, opt. debug ) . await ?;
244
+ result = execute_query ( & ctx, query, opt. debug , opt. enable_scheduler )
245
+ . await ?;
239
246
} else {
240
- execute_query ( & ctx, query, opt. debug ) . await ?;
247
+ execute_query ( & ctx, query, opt. debug , opt . enable_scheduler ) . await ?;
241
248
}
242
249
}
243
250
} else {
244
251
for query in sql {
245
- result = execute_query ( & ctx, query, opt. debug ) . await ?;
252
+ result =
253
+ execute_query ( & ctx, query, opt. debug , opt. enable_scheduler ) . await ?;
246
254
}
247
255
}
248
256
@@ -317,6 +325,7 @@ async fn execute_query(
317
325
ctx : & SessionContext ,
318
326
sql : & str ,
319
327
debug : bool ,
328
+ enable_scheduler : bool ,
320
329
) -> Result < Vec < RecordBatch > > {
321
330
let plan = ctx. sql ( sql) . await ?;
322
331
let plan = plan. to_unoptimized_plan ( ) ;
@@ -337,7 +346,13 @@ async fn execute_query(
337
346
) ;
338
347
}
339
348
let task_ctx = ctx. task_ctx ( ) ;
340
- let result = collect ( physical_plan. clone ( ) , task_ctx) . await ?;
349
+ let result = if enable_scheduler {
350
+ let scheduler = Scheduler :: new ( num_cpus:: get ( ) ) ;
351
+ let results = scheduler. schedule ( physical_plan. clone ( ) , task_ctx) . unwrap ( ) ;
352
+ results. stream ( ) . try_collect ( ) . await ?
353
+ } else {
354
+ collect ( physical_plan. clone ( ) , task_ctx) . await ?
355
+ } ;
341
356
if debug {
342
357
println ! (
343
358
"=== Physical plan with metrics ===\n {}\n " ,
@@ -813,7 +828,7 @@ mod tests {
813
828
814
829
let sql = & get_query_sql ( n) ?;
815
830
for query in sql {
816
- execute_query ( & ctx, query, false ) . await ?;
831
+ execute_query ( & ctx, query, false , false ) . await ?;
817
832
}
818
833
819
834
Ok ( ( ) )
@@ -841,6 +856,7 @@ mod ci {
841
856
mem_table : false ,
842
857
output_path : None ,
843
858
disable_statistics : false ,
859
+ enable_scheduler : false ,
844
860
} ;
845
861
register_tables ( & opt, & ctx) . await ?;
846
862
let queries = get_query_sql ( query) ?;
@@ -1153,6 +1169,7 @@ mod ci {
1153
1169
mem_table : false ,
1154
1170
output_path : None ,
1155
1171
disable_statistics : false ,
1172
+ enable_scheduler : false ,
1156
1173
} ;
1157
1174
let mut results = benchmark_datafusion ( opt) . await ?;
1158
1175
assert_eq ! ( results. len( ) , 1 ) ;
0 commit comments