@@ -24,10 +24,12 @@ mod data_utils;
24
24
25
25
use crate :: criterion:: Criterion ;
26
26
use arrow:: datatypes:: { DataType , Field , Fields , Schema } ;
27
+ use arrow_array:: { ArrayRef , RecordBatch } ;
27
28
use criterion:: Bencher ;
28
29
use datafusion:: datasource:: MemTable ;
29
30
use datafusion:: execution:: context:: SessionContext ;
30
31
use datafusion_common:: ScalarValue ;
32
+ use datafusion_expr:: col;
31
33
use itertools:: Itertools ;
32
34
use std:: fs:: File ;
33
35
use std:: io:: { BufRead , BufReader } ;
@@ -147,6 +149,77 @@ fn benchmark_with_param_values_many_columns(ctx: &SessionContext, b: &mut Benche
147
149
} ) ;
148
150
}
149
151
152
+ /// Registers a table like this:
153
+ /// c0,c1,c2...,c99
154
+ /// 0,100...9900
155
+ /// 0,200...19800
156
+ /// 0,300...29700
157
+ fn register_union_order_table ( ctx : & SessionContext , num_columns : usize , num_rows : usize ) {
158
+ // ("c0", [0, 0, ...])
159
+ // ("c1": [100, 200, ...])
160
+ // etc
161
+ let iter = ( 0 ..num_columns) . map ( |i| i as u64 ) . map ( |i| {
162
+ let array: ArrayRef = Arc :: new ( arrow:: array:: UInt64Array :: from_iter_values (
163
+ ( 0 ..num_rows)
164
+ . map ( |j| j as u64 * 100 + i)
165
+ . collect :: < Vec < _ > > ( ) ,
166
+ ) ) ;
167
+ ( format ! ( "c{}" , i) , array)
168
+ } ) ;
169
+ let batch = RecordBatch :: try_from_iter ( iter) . unwrap ( ) ;
170
+ let schema = batch. schema ( ) ;
171
+ let partitions = vec ! [ vec![ batch] ] ;
172
+
173
+ // tell DataFusion that the table is sorted by all columns
174
+ let sort_order = ( 0 ..num_columns)
175
+ . map ( |i| col ( format ! ( "c{}" , i) ) . sort ( true , true ) )
176
+ . collect :: < Vec < _ > > ( ) ;
177
+
178
+ // create the table
179
+ let table = MemTable :: try_new ( schema, partitions)
180
+ . unwrap ( )
181
+ . with_sort_order ( vec ! [ sort_order] ) ;
182
+
183
+ ctx. register_table ( "t" , Arc :: new ( table) ) . unwrap ( ) ;
184
+ }
185
+
186
/// Returns a query that unions `n` individually sorted branches and then
/// sorts the result by all `n` columns.
///
/// Branch `i` selects column `c{i}` and `null` for every other column, and is
/// ordered by `c{i}`. Columns are numbered from `c0` to `c{n-1}`:
///
/// ```sql
/// (SELECT c0, null as c1, ... null as c{n-1} FROM t ORDER BY c0)
///  UNION ALL
///  (SELECT null as c0, c1, ... null as c{n-1} FROM t ORDER BY c1)
///  ...
///  (SELECT null as c0, null as c1, ... c{n-1} FROM t ORDER BY c{n-1})
///  ORDER BY c0, c1, ... c{n-1}
/// ```
///
/// For `n == 0` there are no branches and the result is only a dangling
/// `ORDER BY` clause, which is not a usable query; callers should pass
/// `n >= 1`.
fn union_orderby_query(n: usize) -> String {
    // one parenthesized, individually ordered SELECT per column
    let branches = (0..n)
        .map(|i| {
            let select_list = (0..n)
                .map(|j| {
                    if i == j {
                        format!("c{j}")
                    } else {
                        format!("null as c{j}")
                    }
                })
                .collect::<Vec<_>>()
                .join(", ");
            format!("(SELECT {select_list} FROM t ORDER BY c{i})")
        })
        .collect::<Vec<_>>()
        .join("\n UNION ALL \n ");

    // final sort over every column of the unioned result
    let order_by = (0..n)
        .map(|i| format!("c{i}"))
        .collect::<Vec<_>>()
        .join(", ");

    format!("{branches}\n ORDER BY {order_by}")
}
222
+
150
223
fn criterion_benchmark ( c : & mut Criterion ) {
151
224
// verify that we can load the clickbench data prior to running the benchmark
152
225
if !PathBuf :: from ( format ! ( "{BENCHMARKS_PATH_1}{CLICKBENCH_DATA_PATH}" ) ) . exists ( )
@@ -289,6 +362,17 @@ fn criterion_benchmark(c: &mut Criterion) {
289
362
} ) ;
290
363
} ) ;
291
364
365
+ // -- Sorted Queries --
366
+ register_union_order_table ( & ctx, 100 , 1000 ) ;
367
+
368
+ // this query has many expressions in its sort order so stresses
369
+ // order equivalence validation
370
+ c. bench_function ( "physical_sorted_union_orderby" , |b| {
371
+ // SELECT ... UNION ALL ...
372
+ let query = union_orderby_query ( 20 ) ;
373
+ b. iter ( || physical_plan ( & ctx, & query) )
374
+ } ) ;
375
+
292
376
// --- TPC-H ---
293
377
294
378
let tpch_ctx = register_defs ( SessionContext :: new ( ) , tpch_schemas ( ) ) ;
0 commit comments