17
17
18
18
mod aggregate_function;
19
19
mod cast;
20
- mod extended_expression;
21
20
mod field_reference;
22
21
mod function_arguments;
23
22
mod if_then;
@@ -29,7 +28,6 @@ mod window_function;
29
28
30
29
pub use aggregate_function:: * ;
31
30
pub use cast:: * ;
32
- pub use extended_expression:: * ;
33
31
pub use field_reference:: * ;
34
32
pub use function_arguments:: * ;
35
33
pub use if_then:: * ;
@@ -39,11 +37,18 @@ pub use singular_or_list::*;
39
37
pub use subquery:: * ;
40
38
pub use window_function:: * ;
41
39
42
- use crate :: logical_plan:: consumer:: SubstraitConsumer ;
43
- use datafusion:: common:: { substrait_err, DFSchema } ;
44
- use datafusion:: logical_expr:: Expr ;
40
+ use crate :: extensions:: Extensions ;
41
+ use crate :: logical_plan:: consumer:: utils:: rename_field;
42
+ use crate :: logical_plan:: consumer:: {
43
+ from_substrait_named_struct, DefaultSubstraitConsumer , SubstraitConsumer ,
44
+ } ;
45
+ use datafusion:: arrow:: datatypes:: Field ;
46
+ use datafusion:: common:: { not_impl_err, plan_err, substrait_err, DFSchema , DFSchemaRef } ;
47
+ use datafusion:: execution:: SessionState ;
48
+ use datafusion:: logical_expr:: { Expr , ExprSchemable } ;
45
49
use substrait:: proto:: expression:: RexType ;
46
- use substrait:: proto:: Expression ;
50
+ use substrait:: proto:: expression_reference:: ExprType ;
51
+ use substrait:: proto:: { Expression , ExtendedExpression } ;
47
52
48
53
/// Convert Substrait Rex to DataFusion Expr
49
54
pub async fn from_substrait_rex (
@@ -92,6 +97,87 @@ pub async fn from_substrait_rex(
92
97
}
93
98
}
94
99
100
+ /// Convert Substrait ExtendedExpression to ExprContainer
101
+ ///
102
+ /// A Substrait ExtendedExpression message contains one or more expressions,
103
+ /// with names for the outputs, and an input schema. These pieces are all included
104
+ /// in the ExprContainer.
105
+ ///
106
+ /// This is a top-level message and can be used to send expressions (not plans)
107
+ /// between systems. This is often useful for scenarios like pushdown where filter
108
+ /// expressions need to be sent to remote systems.
109
+ pub async fn from_substrait_extended_expr (
110
+ state : & SessionState ,
111
+ extended_expr : & ExtendedExpression ,
112
+ ) -> datafusion:: common:: Result < ExprContainer > {
113
+ // Register function extension
114
+ let extensions = Extensions :: try_from ( & extended_expr. extensions ) ?;
115
+ if !extensions. type_variations . is_empty ( ) {
116
+ return not_impl_err ! ( "Type variation extensions are not supported" ) ;
117
+ }
118
+
119
+ let consumer = DefaultSubstraitConsumer {
120
+ extensions : & extensions,
121
+ state,
122
+ } ;
123
+
124
+ let input_schema = DFSchemaRef :: new ( match & extended_expr. base_schema {
125
+ Some ( base_schema) => from_substrait_named_struct ( & consumer, base_schema) ,
126
+ None => {
127
+ plan_err ! ( "required property `base_schema` missing from Substrait ExtendedExpression message" )
128
+ }
129
+ } ?) ;
130
+
131
+ // Parse expressions
132
+ let mut exprs = Vec :: with_capacity ( extended_expr. referred_expr . len ( ) ) ;
133
+ for ( expr_idx, substrait_expr) in extended_expr. referred_expr . iter ( ) . enumerate ( ) {
134
+ let scalar_expr = match & substrait_expr. expr_type {
135
+ Some ( ExprType :: Expression ( scalar_expr) ) => Ok ( scalar_expr) ,
136
+ Some ( ExprType :: Measure ( _) ) => {
137
+ not_impl_err ! ( "Measure expressions are not yet supported" )
138
+ }
139
+ None => {
140
+ plan_err ! ( "required property `expr_type` missing from Substrait ExpressionReference message" )
141
+ }
142
+ } ?;
143
+ let expr = consumer
144
+ . consume_expression ( scalar_expr, & input_schema)
145
+ . await ?;
146
+ let ( output_type, expected_nullability) =
147
+ expr. data_type_and_nullable ( & input_schema) ?;
148
+ let output_field = Field :: new ( "" , output_type, expected_nullability) ;
149
+ let mut names_idx = 0 ;
150
+ let output_field = rename_field (
151
+ & output_field,
152
+ & substrait_expr. output_names ,
153
+ expr_idx,
154
+ & mut names_idx,
155
+ /*rename_self=*/ true ,
156
+ ) ?;
157
+ exprs. push ( ( expr, output_field) ) ;
158
+ }
159
+
160
+ Ok ( ExprContainer {
161
+ input_schema,
162
+ exprs,
163
+ } )
164
+ }
165
+
166
+ /// An ExprContainer is a container for a collection of expressions with a common input schema
167
+ ///
168
+ /// In addition, each expression is associated with a field, which defines the
169
+ /// expression's output. The data type and nullability of the field are calculated from the
170
+ /// expression and the input schema. However the names of the field (and its nested fields) are
171
+ /// derived from the Substrait message.
172
+ pub struct ExprContainer {
173
+ /// The input schema for the expressions
174
+ pub input_schema : DFSchemaRef ,
175
+ /// The expressions
176
+ ///
177
+ /// Each item contains an expression and the field that defines the expected nullability and name of the expr's output
178
+ pub exprs : Vec < ( Expr , Field ) > ,
179
+ }
180
+
95
181
/// Convert Substrait Expressions to DataFusion Exprs
96
182
pub async fn from_substrait_rex_vec (
97
183
consumer : & impl SubstraitConsumer ,
0 commit comments