Skip to content

feat: Support parsing subqueries with OuterReferenceColumn belongs to non-adjacent outer relations #16186

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
17 changes: 11 additions & 6 deletions datafusion/expr/src/logical_plan/invariants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,21 +200,26 @@ pub fn check_subquery_expr(
}
}?;
match outer_plan {
LogicalPlan::Projection(_)
| LogicalPlan::Filter(_) => Ok(()),
LogicalPlan::Aggregate(Aggregate { group_expr, aggr_expr, .. }) => {
LogicalPlan::Projection(_) | LogicalPlan::Filter(_) => Ok(()),
LogicalPlan::Aggregate(Aggregate {
group_expr,
aggr_expr,
..
}) => {
if group_expr.contains(expr) && !aggr_expr.contains(expr) {
// TODO revisit this validation logic
plan_err!(
"Correlated scalar subquery in the GROUP BY clause must also be in the aggregate expressions"
"Correlated scalar subquery in the GROUP BY clause must \
also be in the aggregate expressions"
)
} else {
Ok(())
}
}
_ => plan_err!(
"Correlated scalar subquery can only be used in Projection, Filter, Aggregate plan nodes"
)
"Correlated scalar subquery can only be used in Projection, \
Filter, Aggregate plan nodes"
),
}?;
}
check_correlations_in_subquery(inner_plan)
Expand Down
16 changes: 15 additions & 1 deletion datafusion/optimizer/src/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1089,7 +1089,13 @@ impl OptimizerRule for PushDownFilter {
let (volatile_filters, non_volatile_filters): (Vec<&Expr>, Vec<&Expr>) =
filter_predicates
.into_iter()
.partition(|pred| pred.is_volatile());
// TODO: subquery decorrelation sometimes cannot decorrelate all the exprs
// (i.e. in the case of recursive subquery), so this function may
// accidentally push down the subquery expr as well.
// Until then, we have to exclude these exprs here.
.partition(|pred| {
pred.is_volatile() || has_scalar_subquery(pred)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we allow nested subqueries, the final plan reaches this optimizer and the predicate on scalar_subquery can be accidentally pushed down.

});

// Check which non-volatile filters are supported by source
let supported_filters = scan
Expand Down Expand Up @@ -1382,6 +1388,14 @@ fn contain(e: &Expr, check_map: &HashMap<String, Expr>) -> bool {
is_contain
}

/// Reports whether `expr` contains an `Expr::ScalarSubquery`, as determined
/// by `Expr::exists` with a predicate that matches only that variant.
fn has_scalar_subquery(expr: &Expr) -> bool {
    // The predicate itself never returns `Err`.
    expr.exists(|node| Ok(matches!(node, Expr::ScalarSubquery(_))))
        .unwrap()
}

#[cfg(test)]
mod tests {
use std::any::Any;
Expand Down
62 changes: 35 additions & 27 deletions datafusion/sql/src/expr/identifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
}

// Check the outer query schema
if let Some(outer) = planner_context.outer_query_schema() {
for outer in planner_context.outer_queries_schemas() {
if let Ok((qualifier, field)) =
outer.qualified_field_with_unqualified_name(normalize_ident.as_str())
{
Expand Down Expand Up @@ -165,35 +165,43 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
not_impl_err!("compound identifier: {ids:?}")
} else {
// Check the outer_query_schema and try to find a match
if let Some(outer) = planner_context.outer_query_schema() {
let search_result = search_dfschema(&ids, outer);
match search_result {
// Found matching field with spare identifier(s) for nested field(s) in structure
Some((field, qualifier, nested_names))
if !nested_names.is_empty() =>
{
// TODO: remove when can support nested identifiers for OuterReferenceColumn
not_impl_err!(
let outer_schemas = planner_context.outer_queries_schemas();
let mut maybe_result = None;
if !outer_schemas.is_empty() {
for outer in planner_context.outer_queries_schemas() {
let search_result = search_dfschema(&ids, &outer);
let result = match search_result {
// Found matching field with spare identifier(s) for nested field(s) in structure
Some((field, qualifier, nested_names))
if !nested_names.is_empty() =>
{
// TODO: remove when can support nested identifiers for OuterReferenceColumn
not_impl_err!(
"Nested identifiers are not yet supported for OuterReferenceColumn {}",
Column::from((qualifier, field)).quoted_flat_name()
)
}
// Found matching field with no spare identifier(s)
Some((field, qualifier, _nested_names)) => {
// Found an exact match on a qualified name in the outer plan schema, so this is an outer reference column
Ok(Expr::OuterReferenceColumn(
field.data_type().clone(),
Column::from((qualifier, field)),
))
}
// Found no matching field, will return a default
None => {
let s = &ids[0..ids.len()];
// safe unwrap as s can never be empty or exceed the bounds
let (relation, column_name) =
form_identifier(s).unwrap();
Ok(Expr::Column(Column::new(relation, column_name)))
}
}
// Found matching field with no spare identifier(s)
Some((field, qualifier, _nested_names)) => {
// Found an exact match on a qualified name in the outer plan schema, so this is an outer reference column
Ok(Expr::OuterReferenceColumn(
field.data_type().clone(),
Column::from((qualifier, field)),
))
}
// Found no matching field, will return a default
None => continue,
};
maybe_result = Some(result);
break;
}
if let Some(result) = maybe_result {
result
} else {
let s = &ids[0..ids.len()];
// safe unwrap as s can never be empty or exceed the bounds
let (relation, column_name) = form_identifier(s).unwrap();
Ok(Expr::Column(Column::new(relation, column_name)))
}
} else {
let s = &ids[0..ids.len()];
Expand Down
16 changes: 7 additions & 9 deletions datafusion/sql/src/expr/subquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,10 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
input_schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let old_outer_query_schema =
planner_context.set_outer_query_schema(Some(input_schema.clone().into()));
planner_context.append_outer_query_schema(input_schema.clone().into());
let sub_plan = self.query_to_plan(subquery, planner_context)?;
let outer_ref_columns = sub_plan.all_out_ref_exprs();
planner_context.set_outer_query_schema(old_outer_query_schema);
planner_context.pop_outer_query_schema();
Ok(Expr::Exists(Exists {
subquery: Subquery {
subquery: Arc::new(sub_plan),
Expand All @@ -54,8 +53,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
input_schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let old_outer_query_schema =
planner_context.set_outer_query_schema(Some(input_schema.clone().into()));
planner_context.append_outer_query_schema(input_schema.clone().into());

let mut spans = Spans::new();
if let SetExpr::Select(select) = subquery.body.as_ref() {
Expand All @@ -70,7 +68,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {

let sub_plan = self.query_to_plan(subquery, planner_context)?;
let outer_ref_columns = sub_plan.all_out_ref_exprs();
planner_context.set_outer_query_schema(old_outer_query_schema);
planner_context.pop_outer_query_schema();

self.validate_single_column(
&sub_plan,
Expand Down Expand Up @@ -98,8 +96,8 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
input_schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let old_outer_query_schema =
planner_context.set_outer_query_schema(Some(input_schema.clone().into()));
planner_context.append_outer_query_schema(input_schema.clone().into());

let mut spans = Spans::new();
if let SetExpr::Select(select) = subquery.body.as_ref() {
for item in &select.projection {
Expand All @@ -112,7 +110,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
}
let sub_plan = self.query_to_plan(subquery, planner_context)?;
let outer_ref_columns = sub_plan.all_out_ref_exprs();
planner_context.set_outer_query_schema(old_outer_query_schema);
planner_context.pop_outer_query_schema();

self.validate_single_column(
&sub_plan,
Expand Down
44 changes: 42 additions & 2 deletions datafusion/sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,15 @@ pub struct PlannerContext {
/// Map of CTE name to logical plan of the WITH clause.
/// Use `Arc<LogicalPlan>` to allow cheap cloning
ctes: HashMap<String, Arc<LogicalPlan>>,

/// The schemas of the outer query relations, used to resolve outer-referenced
/// columns in a subquery (aware of nested, non-adjacent outer relations)
outer_queries_schemas_stack: Vec<DFSchemaRef>,

/// The query schema of the outer query plan, used to resolve the columns in subquery
/// This field is maintained to support deprecated functions
/// `outer_query_schema` and `set_outer_query_schema`
/// which are only aware of the adjacent outer relation
outer_query_schema: Option<DFSchemaRef>,
/// The joined schemas of all FROM clauses planned so far. When planning LATERAL
/// FROM clauses, this should become a suffix of the `outer_query_schema`.
Expand All @@ -220,6 +228,7 @@ impl PlannerContext {
prepare_param_data_types: Arc::new(vec![]),
ctes: HashMap::new(),
outer_query_schema: None,
outer_queries_schemas_stack: vec![],
outer_from_schema: None,
create_table_schema: None,
}
Expand All @@ -234,13 +243,22 @@ impl PlannerContext {
self
}

// Return a reference to the outer query's schema
/// Return a reference to the outer query's schema
/// This function should not be used together with
/// `outer_queries_schemas`, `append_outer_query_schema`
/// `latest_outer_query_schema` and `pop_outer_query_schema`
#[deprecated(note = "Use outer_queries_schemas instead")]
pub fn outer_query_schema(&self) -> Option<&DFSchema> {
Copy link
Contributor

@logan-keede logan-keede May 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a breaking change. You can either make a new function or have the PR marked with the API change label, with the first option being preferable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh right 👍

self.outer_query_schema.as_ref().map(|s| s.as_ref())
}

/// Sets the outer query schema, returning the existing one, if
/// any
/// any, this function should not be used together with
/// `outer_queries_schemas`, `append_outer_query_schema`
/// `latest_outer_query_schema` and `pop_outer_query_schema`
#[deprecated(
note = "This struct is now aware of a stack of schemas, check pop_outer_query_schema"
)]
pub fn set_outer_query_schema(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above, but this can simply be deprecated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, old methods were deprecated

&mut self,
mut schema: Option<DFSchemaRef>,
Expand All @@ -249,6 +267,28 @@ impl PlannerContext {
schema
}

/// Returns a copy of the stack of outer relations' schemas; the outermost
/// relation is the first entry.
pub fn outer_queries_schemas(&self) -> Vec<DFSchemaRef> {
    self.outer_queries_schemas_stack.clone()
}

/// Pushes `schema` onto the stack of outer query schemas, making it the
/// most recently added entry. Nothing is returned and no existing entry
/// is replaced.
pub fn append_outer_query_schema(&mut self, schema: DFSchemaRef) {
self.outer_queries_schemas_stack.push(schema);
}

/// Returns the schema of the adjacent outer relation, i.e. the last entry of
/// the outer-schema stack, or `None` when the stack is empty.
///
/// This only reads the stack, so a shared borrow of `self` is sufficient.
pub fn latest_outer_query_schema(&self) -> Option<DFSchemaRef> {
    self.outer_queries_schemas_stack.last().cloned()
}

/// Removes and returns the schema of the adjacent outer relation (the last
/// entry of the outer-schema stack), or `None` if the stack is empty.
pub fn pop_outer_query_schema(&mut self) -> Option<DFSchemaRef> {
self.outer_queries_schemas_stack.pop()
}

pub fn set_table_schema(
&mut self,
mut schema: Option<DFSchemaRef>,
Expand Down
18 changes: 11 additions & 7 deletions datafusion/sql/src/relation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,20 +184,24 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
let old_from_schema = planner_context
.set_outer_from_schema(None)
.unwrap_or_else(|| Arc::new(DFSchema::empty()));
let new_query_schema = match planner_context.outer_query_schema() {
Some(old_query_schema) => {
let outer_query_schema = planner_context.pop_outer_query_schema();
let new_query_schema = match outer_query_schema {
Some(ref old_query_schema) => {
let mut new_query_schema = old_from_schema.as_ref().clone();
new_query_schema.merge(old_query_schema);
Some(Arc::new(new_query_schema))
new_query_schema.merge(old_query_schema.as_ref());
Arc::new(new_query_schema)
}
None => Some(Arc::clone(&old_from_schema)),
None => Arc::clone(&old_from_schema),
};
let old_query_schema = planner_context.set_outer_query_schema(new_query_schema);
planner_context.append_outer_query_schema(new_query_schema);

let plan = self.create_relation(subquery, planner_context)?;
let outer_ref_columns = plan.all_out_ref_exprs();

planner_context.set_outer_query_schema(old_query_schema);
planner_context.pop_outer_query_schema();
if let Some(schema) = outer_query_schema {
planner_context.append_outer_query_schema(schema);
}
planner_context.set_outer_from_schema(Some(old_from_schema));

// We can omit the subquery wrapper if there are no columns
Expand Down
20 changes: 13 additions & 7 deletions datafusion/sql/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::utils::{

use datafusion_common::error::DataFusionErrorBuilder;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::{not_impl_err, plan_err, Result};
use datafusion_common::{not_impl_err, plan_err, DFSchema, Result};
use datafusion_common::{RecursionUnnestOption, UnnestOptions};
use datafusion_expr::expr::{Alias, PlannedReplaceSelectItem, WildcardOptions};
use datafusion_expr::expr_rewriter::{
Expand Down Expand Up @@ -506,12 +506,8 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
match selection {
Some(predicate_expr) => {
let fallback_schemas = plan.fallback_normalize_schemas();
let outer_query_schema = planner_context.outer_query_schema().cloned();
let outer_query_schema_vec = outer_query_schema
.as_ref()
.map(|schema| vec![schema])
.unwrap_or_else(Vec::new);

let outer_query_schema_vec = planner_context.outer_queries_schemas();
let filter_expr =
self.sql_to_expr(predicate_expr, plan.schema(), planner_context)?;

Expand All @@ -526,9 +522,19 @@ impl<S: ContextProvider> SqlToRel<'_, S> {

let mut using_columns = HashSet::new();
expr_to_columns(&filter_expr, &mut using_columns)?;
let mut schema_stack: Vec<Vec<&DFSchema>> =
vec![vec![plan.schema()], fallback_schemas];
for sc in outer_query_schema_vec.iter().rev() {
schema_stack.push(vec![sc.as_ref()]);
}

let filter_expr = normalize_col_with_schemas_and_ambiguity_check(
filter_expr,
&[&[plan.schema()], &fallback_schemas, &outer_query_schema_vec],
schema_stack
.iter()
.map(|sc| sc.as_slice())
.collect::<Vec<&[&DFSchema]>>()
.as_slice(),
&[using_columns],
)?;

Expand Down
24 changes: 24 additions & 0 deletions datafusion/sqllogictest/test_files/joins.slt
Original file line number Diff line number Diff line change
Expand Up @@ -4689,6 +4689,30 @@ logical_plan
08)----------TableScan: j3 projection=[j3_string, j3_id]
physical_plan_error This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(Int32, Column { relation: Some(Bare { table: "j1" }), name: "j1_id" })

# Two nested lateral joins, with the deepest join referencing the outermost relation
query TT
explain SELECT * FROM j1 j1_outer, LATERAL (
SELECT * FROM j1 j1_inner, LATERAL (
SELECT * FROM j2 WHERE j1_inner.j1_id = j2_id and j1_outer.j1_id=j2_id
) as j2
) as j2;
----
logical_plan
01)Cross Join:
02)--SubqueryAlias: j1_outer
03)----TableScan: j1 projection=[j1_string, j1_id]
04)--SubqueryAlias: j2
05)----Subquery:
06)------Cross Join:
07)--------SubqueryAlias: j1_inner
08)----------TableScan: j1 projection=[j1_string, j1_id]
09)--------SubqueryAlias: j2
10)----------Subquery:
11)------------Filter: outer_ref(j1_inner.j1_id) = j2.j2_id AND outer_ref(j1_outer.j1_id) = j2.j2_id
12)--------------TableScan: j2 projection=[j2_string, j2_id]
physical_plan_error This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(Int32, Column { relation: Some(Bare { table: "j1_inner" }), name: "j1_id" })


query TT
explain SELECT * FROM j1, LATERAL (SELECT 1) AS j2;
----
Expand Down
Loading