Skip to content

Commit 185a02d

Browse files
gabotechs and alamb
authored
Factor out Substrait consumers into separate files (#15794)
* Factor out Substrait consumers into separate files
* Move relations and expressions into their own modules
* Refactor: rename rex to expr
* Refactor: move from_substrait_extended_expr to mod.rs
---------
Co-authored-by: Andrew Lamb <[email protected]>
1 parent 061ee09 commit 185a02d

28 files changed

+4159
-3452
lines changed

datafusion/substrait/src/logical_plan/consumer.rs

-3,452
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::logical_plan::consumer::{
19+
from_substrait_func_args, substrait_fun_name, SubstraitConsumer,
20+
};
21+
use datafusion::common::{not_impl_datafusion_err, plan_err, DFSchema, ScalarValue};
22+
use datafusion::execution::FunctionRegistry;
23+
use datafusion::logical_expr::{expr, Expr, SortExpr};
24+
use std::sync::Arc;
25+
use substrait::proto::AggregateFunction;
26+
27+
/// Convert Substrait AggregateFunction to DataFusion Expr
28+
pub async fn from_substrait_agg_func(
29+
consumer: &impl SubstraitConsumer,
30+
f: &AggregateFunction,
31+
input_schema: &DFSchema,
32+
filter: Option<Box<Expr>>,
33+
order_by: Option<Vec<SortExpr>>,
34+
distinct: bool,
35+
) -> datafusion::common::Result<Arc<Expr>> {
36+
let Some(fn_signature) = consumer
37+
.get_extensions()
38+
.functions
39+
.get(&f.function_reference)
40+
else {
41+
return plan_err!(
42+
"Aggregate function not registered: function anchor = {:?}",
43+
f.function_reference
44+
);
45+
};
46+
47+
let fn_name = substrait_fun_name(fn_signature);
48+
let udaf = consumer.get_function_registry().udaf(fn_name);
49+
let udaf = udaf.map_err(|_| {
50+
not_impl_datafusion_err!(
51+
"Aggregate function {} is not supported: function anchor = {:?}",
52+
fn_signature,
53+
f.function_reference
54+
)
55+
})?;
56+
57+
let args = from_substrait_func_args(consumer, &f.arguments, input_schema).await?;
58+
59+
// Datafusion does not support aggregate functions with no arguments, so
60+
// we inject a dummy argument that does not affect the query, but allows
61+
// us to bypass this limitation.
62+
let args = if udaf.name() == "count" && args.is_empty() {
63+
vec![Expr::Literal(ScalarValue::Int64(Some(1)))]
64+
} else {
65+
args
66+
};
67+
68+
Ok(Arc::new(Expr::AggregateFunction(
69+
expr::AggregateFunction::new_udf(udaf, args, distinct, filter, order_by, None),
70+
)))
71+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::logical_plan::consumer::types::from_substrait_type_without_names;
19+
use crate::logical_plan::consumer::SubstraitConsumer;
20+
use datafusion::common::{substrait_err, DFSchema};
21+
use datafusion::logical_expr::{Cast, Expr, TryCast};
22+
use substrait::proto::expression as substrait_expression;
23+
use substrait::proto::expression::cast::FailureBehavior::ReturnNull;
24+
25+
pub async fn from_cast(
26+
consumer: &impl SubstraitConsumer,
27+
cast: &substrait_expression::Cast,
28+
input_schema: &DFSchema,
29+
) -> datafusion::common::Result<Expr> {
30+
match cast.r#type.as_ref() {
31+
Some(output_type) => {
32+
let input_expr = Box::new(
33+
consumer
34+
.consume_expression(
35+
cast.input.as_ref().unwrap().as_ref(),
36+
input_schema,
37+
)
38+
.await?,
39+
);
40+
let data_type = from_substrait_type_without_names(consumer, output_type)?;
41+
if cast.failure_behavior() == ReturnNull {
42+
Ok(Expr::TryCast(TryCast::new(input_expr, data_type)))
43+
} else {
44+
Ok(Expr::Cast(Cast::new(input_expr, data_type)))
45+
}
46+
}
47+
None => substrait_err!("Cast expression without output type is not allowed"),
48+
}
49+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::logical_plan::consumer::SubstraitConsumer;
19+
use datafusion::common::{not_impl_err, Column, DFSchema};
20+
use datafusion::logical_expr::Expr;
21+
use substrait::proto::expression::field_reference::ReferenceType::DirectReference;
22+
use substrait::proto::expression::reference_segment::ReferenceType::StructField;
23+
use substrait::proto::expression::FieldReference;
24+
25+
pub async fn from_field_reference(
26+
_consumer: &impl SubstraitConsumer,
27+
field_ref: &FieldReference,
28+
input_schema: &DFSchema,
29+
) -> datafusion::common::Result<Expr> {
30+
from_substrait_field_reference(field_ref, input_schema)
31+
}
32+
33+
pub(crate) fn from_substrait_field_reference(
34+
field_ref: &FieldReference,
35+
input_schema: &DFSchema,
36+
) -> datafusion::common::Result<Expr> {
37+
match &field_ref.reference_type {
38+
Some(DirectReference(direct)) => match &direct.reference_type.as_ref() {
39+
Some(StructField(x)) => match &x.child.as_ref() {
40+
Some(_) => not_impl_err!(
41+
"Direct reference StructField with child is not supported"
42+
),
43+
None => Ok(Expr::Column(Column::from(
44+
input_schema.qualified_field(x.field as usize),
45+
))),
46+
},
47+
_ => not_impl_err!(
48+
"Direct reference with types other than StructField is not supported"
49+
),
50+
},
51+
_ => not_impl_err!("unsupported field ref type"),
52+
}
53+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::logical_plan::consumer::SubstraitConsumer;
19+
use datafusion::common::{not_impl_err, DFSchema};
20+
use datafusion::logical_expr::Expr;
21+
use substrait::proto::function_argument::ArgType;
22+
use substrait::proto::FunctionArgument;
23+
24+
/// Convert Substrait FunctionArguments to DataFusion Exprs
25+
pub async fn from_substrait_func_args(
26+
consumer: &impl SubstraitConsumer,
27+
arguments: &Vec<FunctionArgument>,
28+
input_schema: &DFSchema,
29+
) -> datafusion::common::Result<Vec<Expr>> {
30+
let mut args: Vec<Expr> = vec![];
31+
for arg in arguments {
32+
let arg_expr = match &arg.arg_type {
33+
Some(ArgType::Value(e)) => consumer.consume_expression(e, input_schema).await,
34+
_ => not_impl_err!("Function argument non-Value type not supported"),
35+
};
36+
args.push(arg_expr?);
37+
}
38+
Ok(args)
39+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::logical_plan::consumer::SubstraitConsumer;
19+
use datafusion::common::DFSchema;
20+
use datafusion::logical_expr::{Case, Expr};
21+
use substrait::proto::expression::IfThen;
22+
23+
pub async fn from_if_then(
24+
consumer: &impl SubstraitConsumer,
25+
if_then: &IfThen,
26+
input_schema: &DFSchema,
27+
) -> datafusion::common::Result<Expr> {
28+
// Parse `ifs`
29+
// If the first element does not have a `then` part, then we can assume it's a base expression
30+
let mut when_then_expr: Vec<(Box<Expr>, Box<Expr>)> = vec![];
31+
let mut expr = None;
32+
for (i, if_expr) in if_then.ifs.iter().enumerate() {
33+
if i == 0 {
34+
// Check if the first element is type base expression
35+
if if_expr.then.is_none() {
36+
expr = Some(Box::new(
37+
consumer
38+
.consume_expression(if_expr.r#if.as_ref().unwrap(), input_schema)
39+
.await?,
40+
));
41+
continue;
42+
}
43+
}
44+
when_then_expr.push((
45+
Box::new(
46+
consumer
47+
.consume_expression(if_expr.r#if.as_ref().unwrap(), input_schema)
48+
.await?,
49+
),
50+
Box::new(
51+
consumer
52+
.consume_expression(if_expr.then.as_ref().unwrap(), input_schema)
53+
.await?,
54+
),
55+
));
56+
}
57+
// Parse `else`
58+
let else_expr = match &if_then.r#else {
59+
Some(e) => Some(Box::new(
60+
consumer.consume_expression(e, input_schema).await?,
61+
)),
62+
None => None,
63+
};
64+
Ok(Expr::Case(Case {
65+
expr,
66+
when_then_expr,
67+
else_expr,
68+
}))
69+
}

0 commit comments

Comments
 (0)