15
15
// specific language governing permissions and limitations
16
16
// under the License.
17
17
18
+ use arrow:: array:: { BooleanArray , Int32Array } ;
19
+ use arrow:: record_batch:: RecordBatch ;
18
20
use datafusion:: arrow:: datatypes:: { DataType , Field , Schema , TimeUnit } ;
19
21
use datafusion:: error:: Result ;
20
22
use datafusion:: optimizer:: simplify_expressions:: { ExprSimplifier , SimplifyContext } ;
21
23
use datafusion:: physical_expr:: execution_props:: ExecutionProps ;
24
+ use datafusion:: physical_expr:: {
25
+ analyze, create_physical_expr, AnalysisContext , ExprBoundaries , PhysicalExpr ,
26
+ } ;
22
27
use datafusion:: prelude:: * ;
23
28
use datafusion_common:: { ScalarValue , ToDFSchema } ;
24
29
use datafusion_expr:: expr:: BinaryExpr ;
25
- use datafusion_expr:: Operator ;
30
+ use datafusion_expr:: interval_arithmetic:: Interval ;
31
+ use datafusion_expr:: { ColumnarValue , ExprSchemable , Operator } ;
32
+ use std:: sync:: Arc ;
26
33
27
34
/// This example demonstrates the DataFusion [`Expr`] API.
28
35
///
29
36
/// DataFusion comes with a powerful and extensive system for
30
37
/// representing and manipulating expressions such as `A + 5` and `X
31
- /// IN ('foo', 'bar', 'baz')` and many other constructs.
38
+ /// IN ('foo', 'bar', 'baz')`.
39
+ ///
40
+ /// In addition to building and manipulating [`Expr`]s, DataFusion
41
+ /// also comes with APIs for evaluation, simplification, and analysis.
42
+ ///
43
+ /// The code in this example shows how to:
44
+ /// 1. Create [`Expr`]s using different APIs: [`main`]
45
+ /// 2. Evaluate [`Expr`]s against data: [`evaluate_demo`]
46
+ /// 3. Simplify expressions: [`simplify_demo`]
47
+ /// 4. Analyze predicates for boundary ranges: [`range_analysis_demo`]
32
48
#[ tokio:: main]
33
49
async fn main ( ) -> Result < ( ) > {
34
50
// The easiest way to create expressions is to use the
35
- // "fluent"-style API, like this :
51
+ // "fluent"-style API:
36
52
let expr = col ( "a" ) + lit ( 5 ) ;
37
53
38
- // this creates the same expression as the following though with
39
- // much less code,
54
+ // The same expression can be created directly, with much more code:
40
55
let expr2 = Expr :: BinaryExpr ( BinaryExpr :: new (
41
56
Box :: new ( col ( "a" ) ) ,
42
57
Operator :: Plus ,
43
58
Box :: new ( Expr :: Literal ( ScalarValue :: Int32 ( Some ( 5 ) ) ) ) ,
44
59
) ) ;
45
60
assert_eq ! ( expr, expr2) ;
46
61
62
+ // See how to evaluate expressions
63
+ evaluate_demo ( ) ?;
64
+
65
+ // See how to simplify expressions
47
66
simplify_demo ( ) ?;
48
67
68
+ // See how to analyze ranges in expressions
69
+ range_analysis_demo ( ) ?;
70
+
71
+ Ok ( ( ) )
72
+ }
73
+
74
+ /// DataFusion can also evaluate arbitrary expressions on Arrow arrays.
75
+ fn evaluate_demo ( ) -> Result < ( ) > {
76
+ // For example, let's say you have some integers in an array
77
+ let batch = RecordBatch :: try_from_iter ( [ (
78
+ "a" ,
79
+ Arc :: new ( Int32Array :: from ( vec ! [ 4 , 5 , 6 , 7 , 8 , 7 , 4 ] ) ) as _ ,
80
+ ) ] ) ?;
81
+
82
+ // If you want to find all rows where the expression `a < 5 OR a = 8` is true
83
+ let expr = col ( "a" ) . lt ( lit ( 5 ) ) . or ( col ( "a" ) . eq ( lit ( 8 ) ) ) ;
84
+
85
+ // First, you make a "physical expression" from the logical `Expr`
86
+ let physical_expr = physical_expr ( & batch. schema ( ) , expr) ?;
87
+
88
+ // Now, you can evaluate the expression against the RecordBatch
89
+ let result = physical_expr. evaluate ( & batch) ?;
90
+
91
+ // The result contains an array that is true only where `a < 5 OR a = 8`
92
+ let expected_result = Arc :: new ( BooleanArray :: from ( vec ! [
93
+ true , false , false , false , true , false , true ,
94
+ ] ) ) as _ ;
95
+ assert ! (
96
+ matches!( & result, ColumnarValue :: Array ( r) if r == & expected_result) ,
97
+ "result: {:?}" ,
98
+ result
99
+ ) ;
100
+
49
101
Ok ( ( ) )
50
102
}
51
103
52
- /// In addition to easy construction, DataFusion exposes APIs for
53
- /// working with and simplifying such expressions that call into the
54
- /// same powerful and extensive implementation used for the query
55
- /// engine.
104
+ /// In addition to easy construction, DataFusion exposes APIs for simplifying
105
+ /// such expressions so they are more efficient to evaluate. This code is also
106
+ /// used by the query engine to optimize queries.
56
107
fn simplify_demo ( ) -> Result < ( ) > {
57
108
// For example, let's say you have created an expression such as
58
109
// ts = to_timestamp("2020-09-08T12:00:00+00:00")
@@ -94,7 +145,7 @@ fn simplify_demo() -> Result<()> {
94
145
make_field( "b" , DataType :: Boolean ) ,
95
146
] )
96
147
. to_dfschema_ref ( ) ?;
97
- let context = SimplifyContext :: new ( & props) . with_schema ( schema) ;
148
+ let context = SimplifyContext :: new ( & props) . with_schema ( schema. clone ( ) ) ;
98
149
let simplifier = ExprSimplifier :: new ( context) ;
99
150
100
151
// basic arithmetic simplification
@@ -120,6 +171,64 @@ fn simplify_demo() -> Result<()> {
120
171
col( "i" ) . lt( lit( 10 ) )
121
172
) ;
122
173
174
+ // String --> Date simplification
175
+ // `cast('2020-09-01' as date)` --> 18506
176
+ assert_eq ! (
177
+ simplifier. simplify( lit( "2020-09-01" ) . cast_to( & DataType :: Date32 , & schema) ?) ?,
178
+ lit( ScalarValue :: Date32 ( Some ( 18506 ) ) )
179
+ ) ;
180
+
181
+ Ok ( ( ) )
182
+ }
183
+
184
+ /// DataFusion also has APIs for analyzing predicates (boolean expressions) to
185
+ /// determine any range restrictions on the inputs required for the predicate
186
+ /// to evaluate to true.
187
+ fn range_analysis_demo ( ) -> Result < ( ) > {
188
+ // For example, let's say you are interested in finding data for all days
189
+ // in the month of September, 2020
190
+ let september_1 = ScalarValue :: Date32 ( Some ( 18506 ) ) ; // 2020-09-01
191
+ let october_1 = ScalarValue :: Date32 ( Some ( 18536 ) ) ; // 2020-10-01
192
+
193
+ // The predicate to find all such days could be
194
+ // `date > '2020-09-01' AND date < '2020-10-01'`
195
+ let expr = col ( "date" )
196
+ . gt ( lit ( september_1. clone ( ) ) )
197
+ . and ( col ( "date" ) . lt ( lit ( october_1. clone ( ) ) ) ) ;
198
+
199
+ // Using the analysis API, DataFusion can determine that the value of `date`
200
+ // must be in the range `['2020-09-01', '2020-10-01']`. If your data is
201
+ // organized in files according to day, this information permits skipping
202
+ // entire files without reading them.
203
+ //
204
+ // While this simple example could be handled with a special case, the
205
+ // DataFusion API handles arbitrary expressions (so for example, you don't
206
+ // have to handle the case where the predicate clauses are reversed such as
207
+ // `date < '2020-10-01' AND date > '2020-09-01'`)
208
+
209
+ // As always, we need to tell DataFusion the type of column "date"
210
+ let schema = Schema :: new ( vec ! [ make_field( "date" , DataType :: Date32 ) ] ) ;
211
+
212
+ // You can provide DataFusion any known boundaries on the values of `date`
213
+ // (for example, maybe you know you only have data up to `2020-09-15`), but
214
+ // in this case, let's say we don't know any boundaries beforehand so we use
215
+ // `try_new_unbounded`
216
+ let boundaries = ExprBoundaries :: try_new_unbounded ( & schema) ?;
217
+
218
+ // Now, we invoke the analysis code to perform the range analysis
219
+ let physical_expr = physical_expr ( & schema, expr) ?;
220
+ let analysis_result =
221
+ analyze ( & physical_expr, AnalysisContext :: new ( boundaries) , & schema) ?;
222
+
223
+ // The result of the analysis is a range, encoded as an `Interval`, for
224
+ // each column in the schema, that must be true in order for the predicate
225
+ // to be true.
226
+ //
227
+ // In this case, we can see that, as expected, `analyze` has figured out
228
+ // that `date` must be in the range `['2020-09-01', '2020-10-01']`
229
+ let expected_range = Interval :: try_new ( september_1, october_1) ?;
230
+ assert_eq ! ( analysis_result. boundaries[ 0 ] . interval, expected_range) ;
231
+
123
232
Ok ( ( ) )
124
233
}
125
234
@@ -132,3 +241,18 @@ fn make_ts_field(name: &str) -> Field {
132
241
let tz = None ;
133
242
make_field ( name, DataType :: Timestamp ( TimeUnit :: Nanosecond , tz) )
134
243
}
244
+
245
+ /// Build a physical expression from a logical one, after applying type coercion (`simplify` is not called here)
246
+ pub fn physical_expr ( schema : & Schema , expr : Expr ) -> Result < Arc < dyn PhysicalExpr > > {
247
+ let df_schema = schema. clone ( ) . to_dfschema_ref ( ) ?;
248
+
249
+ // Set up an `ExprSimplifier`; only its `coerce` method is used below
250
+ let props = ExecutionProps :: new ( ) ;
251
+ let simplifier =
252
+ ExprSimplifier :: new ( SimplifyContext :: new ( & props) . with_schema ( df_schema. clone ( ) ) ) ;
253
+
254
+ // apply type coercion here to ensure types match
255
+ let expr = simplifier. coerce ( expr, df_schema. clone ( ) ) ?;
256
+
257
+ create_physical_expr ( & expr, df_schema. as_ref ( ) , schema, & props)
258
+ }
0 commit comments