Skip to content

Only recompute schema in TypeCoercion when necessary #10369

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ fn field_for_index(

/// Casts the subquery of an `InSubquery`/`ScalarSubquery` expression to the given type.
pub fn cast_subquery(subquery: Subquery, cast_to_type: &DataType) -> Result<Subquery> {
if subquery.subquery.schema().field(0).data_type() == cast_to_type {
if subquery.data_type() == cast_to_type {
return Ok(subquery);
}

Expand Down
200 changes: 200 additions & 0 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use datafusion_common::{

// backwards compatibility
use crate::display::PgJsonVisitor;
use crate::logical_plan::tree_node::unwrap_arc;
pub use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
pub use datafusion_common::{JoinConstraint, JoinType};

Expand Down Expand Up @@ -467,6 +468,200 @@ impl LogicalPlan {
self.with_new_exprs(self.expressions(), inputs.to_vec())
}

/// Recomputes schema and type information for this LogicalPlan if needed.
///
/// Some `LogicalPlan`s may need to recompute their schema if the number or
/// type of expressions have been changed (for example due to type
/// coercion). For example, a [`LogicalPlan::Projection`]'s schema depends on
/// its expressions.
///
/// Some `LogicalPlan`s' schemas are unaffected by any changes to their
/// expressions. For example, a [`LogicalPlan::Filter`]'s schema is always the
/// same as its input schema.
///
/// Variants handled by an `Ok(self)` arm below are returned unchanged; the
/// remaining variants are rebuilt from their current expressions and inputs,
/// discarding any previously stored schema.
///
/// # Return value
/// Returns an error if there is some issue recomputing the schema.
///
/// # Notes
///
/// * Does not recursively recompute schema for input (child) plans.
/// * Consumes `self`; the plan is returned by value inside the `Result`.
pub fn recompute_schema(self) -> Result<Self> {
match self {
// Since expr may be different than the previous expr, schema of the projection
// may change. We need to use try_new method instead of try_new_with_schema method.
LogicalPlan::Projection(Projection {
expr,
input,
schema: _,
}) => Projection::try_new(expr, input).map(LogicalPlan::Projection),
LogicalPlan::Dml(_) => Ok(self),
LogicalPlan::Copy(_) => Ok(self),
LogicalPlan::Values(Values { schema, values }) => {
// todo it isn't clear why the schema is not recomputed here
// The existing schema is carried over as-is together with the values.
Ok(LogicalPlan::Values(Values { schema, values }))
}
LogicalPlan::Filter(Filter { predicate, input }) => {
// todo: should this logic be moved to Filter::try_new?

// filter predicates should not contain aliased expressions so we remove any aliases
// before this logic was added we would have aliases within filters such as for
// benchmark q6:
//
// lineitem.l_shipdate >= Date32(\"8766\")
// AND lineitem.l_shipdate < Date32(\"9131\")
// AND CAST(lineitem.l_discount AS Decimal128(30, 15)) AS lineitem.l_discount >=
// Decimal128(Some(49999999999999),30,15)
// AND CAST(lineitem.l_discount AS Decimal128(30, 15)) AS lineitem.l_discount <=
// Decimal128(Some(69999999999999),30,15)
// AND lineitem.l_quantity < Decimal128(Some(2400),15,2)

// Walk the predicate top-down, replacing each alias node with the
// result of `unalias()` and leaving subquery expressions untouched.
let predicate = predicate
.transform_down(|expr| {
match expr {
Expr::Exists { .. }
| Expr::ScalarSubquery(_)
| Expr::InSubquery(_) => {
// subqueries could contain aliases so we don't recurse into those
// (reported as "not transformed"; `Jump` is presumably what
// stops the descent -- confirm against `TreeNodeRecursion`)
Ok(Transformed::new(expr, false, TreeNodeRecursion::Jump))
}
// Replace the alias node with `expr.unalias()` and flag the node
// as transformed.
// NOTE(review): `Jump` here presumably means the unaliased
// expression's own children are not visited -- confirm.
Expr::Alias(_) => Ok(Transformed::new(
expr.unalias(),
true,
TreeNodeRecursion::Jump,
)),
// Every other expression is kept and traversal continues.
_ => Ok(Transformed::no(expr)),
}
})
.data()?;

// Rebuild the filter from the rewritten predicate and the original input.
Filter::try_new(predicate, input).map(LogicalPlan::Filter)
}
LogicalPlan::Repartition(_) => Ok(self),
// The stored window schema is discarded; `Window::try_new` builds a new
// node from the window expressions and the input.
LogicalPlan::Window(Window {
input,
window_expr,
schema: _,
}) => Window::try_new(window_expr, input).map(LogicalPlan::Window),
// The stored aggregate schema is discarded; `Aggregate::try_new` builds a
// new node from the input, group expressions and aggregate expressions.
LogicalPlan::Aggregate(Aggregate {
input,
group_expr,
aggr_expr,
schema: _,
}) => Aggregate::try_new(input, group_expr, aggr_expr)
.map(LogicalPlan::Aggregate),
LogicalPlan::Sort(_) => Ok(self),
LogicalPlan::Join(Join {
left,
right,
filter,
join_type,
join_constraint,
on,
schema: _,
null_equals_null,
}) => {
// The join schema is rebuilt from the current schemas of both inputs
// and the join type; the previously stored schema is ignored.
let schema =
build_join_schema(left.schema(), right.schema(), &join_type)?;

// Strip aliases from both sides of every equi-join key pair.
let new_on: Vec<_> = on
.into_iter()
.map(|equi_expr| {
// SimplifyExpression rule may add alias to the equi_expr.
(equi_expr.0.unalias(), equi_expr.1.unalias())
})
.collect();

// All other join fields (filter, type, constraint, null handling)
// are moved into the new node unchanged.
Ok(LogicalPlan::Join(Join {
left,
right,
join_type,
join_constraint,
on: new_on,
filter,
schema: DFSchemaRef::new(schema),
null_equals_null,
}))
}
LogicalPlan::CrossJoin(CrossJoin {
left,
right,
schema: _,
}) => {
// The cross join schema is built with the same helper as `Join`,
// passing `JoinType::Inner`.
let join_schema =
build_join_schema(left.schema(), right.schema(), &JoinType::Inner)?;

Ok(LogicalPlan::CrossJoin(CrossJoin {
left,
right,
schema: join_schema.into(),
}))
}
LogicalPlan::Subquery(_) => Ok(self),
// The stored schema is discarded; `SubqueryAlias::try_new` builds a new
// node from the input and the alias.
LogicalPlan::SubqueryAlias(SubqueryAlias {
input,
alias,
schema: _,
}) => SubqueryAlias::try_new(input, alias).map(LogicalPlan::SubqueryAlias),
LogicalPlan::Limit(_) => Ok(self),
LogicalPlan::Ddl(_) => Ok(self),
LogicalPlan::Extension(Extension { node }) => {
// todo make an API that does not require cloning
// This requires a copy of the extension nodes expressions and inputs
let expr = node.expressions();
let inputs: Vec<_> = node.inputs().into_iter().cloned().collect();
// The node is recreated from its own expressions and (cloned) inputs.
Ok(LogicalPlan::Extension(Extension {
node: node.from_template(&expr, &inputs),
}))
}
LogicalPlan::Union(Union { inputs, schema }) => {
// NOTE(review): indexing `inputs[0]` assumes the union has at least
// one input -- confirm that an empty `inputs` cannot reach this point.
let input_schema = inputs[0].schema();
// If inputs are not pruned do not change schema
// TODO this seems wrong (shouldn't we always use the schema of the input?)
// Only the number of fields is compared; field names and types are not.
let schema = if schema.fields().len() == input_schema.fields().len() {
schema.clone()
} else {
input_schema.clone()
};
Ok(LogicalPlan::Union(Union { inputs, schema }))
}
LogicalPlan::Distinct(distinct) => {
let distinct = match distinct {
// `Distinct::All` is rewrapped around the same input.
Distinct::All(input) => Distinct::All(input),
// `Distinct::On` discards its stored schema and is rebuilt via
// `DistinctOn::try_new`, which may fail.
Distinct::On(DistinctOn {
on_expr,
select_expr,
sort_expr,
input,
schema: _,
}) => Distinct::On(DistinctOn::try_new(
on_expr,
select_expr,
sort_expr,
input,
)?),
};
Ok(LogicalPlan::Distinct(distinct))
}
LogicalPlan::RecursiveQuery(_) => Ok(self),
LogicalPlan::Analyze(_) => Ok(self),
LogicalPlan::Explain(_) => Ok(self),
LogicalPlan::Prepare(_) => Ok(self),
LogicalPlan::TableScan(_) => Ok(self),
LogicalPlan::EmptyRelation(_) => Ok(self),
LogicalPlan::Statement(_) => Ok(self),
LogicalPlan::DescribeTable(_) => Ok(self),
LogicalPlan::Unnest(Unnest {
input,
columns,
schema: _,
options,
}) => {
// Update schema with unnested column type.
// The result of `unnest_with_options` is returned directly as this
// arm's value; the stored schema is discarded.
// NOTE(review): `unwrap_arc` presumably yields an owned `LogicalPlan`
// from the shared input -- confirm against its definition.
unnest_with_options(unwrap_arc(input), columns, options)
}
}
}

/// Returns a new `LogicalPlan` based on `self` with inputs and
/// expressions replaced.
///
Expand Down Expand Up @@ -2490,6 +2685,11 @@ impl Subquery {
outer_ref_columns: self.outer_ref_columns.clone(),
}
}

/// Returns the data type of the subquery's first output column.
///
/// The first field of the subquery plan's schema is read by index, so this
/// presumably panics if that schema has no fields.
pub fn data_type(&self) -> &DataType {
    let schema = self.subquery.schema();
    let first_field = &schema.fields()[0];
    first_field.data_type()
}
}

impl Debug for Subquery {
Expand Down
13 changes: 12 additions & 1 deletion datafusion/optimizer/src/analyzer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use log::debug;

use datafusion_common::config::ConfigOptions;
use datafusion_common::instant::Instant;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::expr::Exists;
use datafusion_expr::expr::InSubquery;
Expand Down Expand Up @@ -62,6 +62,17 @@ pub trait AnalyzerRule {
/// Rewrite `plan`
fn analyze(&self, plan: LogicalPlan, config: &ConfigOptions) -> Result<LogicalPlan>;

/// Rewrite `plan`, reporting whether the plan was modified.
///
/// The default implementation delegates to `analyze` and always wraps a
/// successful result in `Transformed::yes`; any error from `analyze` is
/// returned unchanged.
fn rewrite(
    &self,
    plan: LogicalPlan,
    config: &ConfigOptions,
) -> Result<Transformed<LogicalPlan>> {
    self.analyze(plan, config).map(Transformed::yes)
}

/// A human readable name for this analyzer rule
fn name(&self) -> &str;
}
Expand Down
Loading
Loading