-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-51752][SQL] Enable rCTE referencing from within a CTE #50546
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
6ff35e0
0d80b72
b67fc23
dff286e
efaae85
75ef4c9
a968531
039beee
c9bf2d5
3a7fcc8
c4a28b2
14771bd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -87,11 +87,11 @@ object CTESubstitution extends Rule[LogicalPlan] { | |
LegacyBehaviorPolicy.withName(conf.getConf(LEGACY_CTE_PRECEDENCE_POLICY)) match { | ||
case LegacyBehaviorPolicy.EXCEPTION => | ||
assertNoNameConflictsInCTE(plan) | ||
traverseAndSubstituteCTE(plan, forceInline, Seq.empty, cteDefs) | ||
traverseAndSubstituteCTE(plan, forceInline, Seq.empty, cteDefs, None) | ||
case LegacyBehaviorPolicy.LEGACY => | ||
(legacyTraverseAndSubstituteCTE(plan, cteDefs), None) | ||
case LegacyBehaviorPolicy.CORRECTED => | ||
traverseAndSubstituteCTE(plan, forceInline, Seq.empty, cteDefs) | ||
traverseAndSubstituteCTE(plan, forceInline, Seq.empty, cteDefs, None) | ||
} | ||
if (cteDefs.isEmpty) { | ||
substituted | ||
|
@@ -162,7 +162,7 @@ object CTESubstitution extends Rule[LogicalPlan] { | |
messageParameters = Map.empty) | ||
} | ||
val resolvedCTERelations = resolveCTERelations(relations, isLegacy = true, | ||
forceInline = false, Seq.empty, cteDefs, allowRecursion) | ||
forceInline = false, Seq.empty, cteDefs, None, allowRecursion) | ||
substituteCTE(child, alwaysInline = true, resolvedCTERelations, None) | ||
} | ||
} | ||
|
@@ -202,14 +202,18 @@ object CTESubstitution extends Rule[LogicalPlan] { | |
* @param forceInline always inline the CTE relations if this is true | ||
* @param outerCTEDefs already resolved outer CTE definitions with names | ||
* @param cteDefs all accumulated CTE definitions | ||
* @param recursiveCTERelationAncestor contains information of whether we are in a recursive CTE, | ||
* as well as what CTE that is. | ||
* @return the plan where CTE substitution is applied and optionally the last substituted `With` | ||
* where CTE definitions will be gathered to | ||
*/ | ||
private def traverseAndSubstituteCTE( | ||
plan: LogicalPlan, | ||
forceInline: Boolean, | ||
outerCTEDefs: Seq[(String, CTERelationDef)], | ||
cteDefs: ArrayBuffer[CTERelationDef]): (LogicalPlan, Option[LogicalPlan]) = { | ||
cteDefs: ArrayBuffer[CTERelationDef], | ||
recursiveCTERelationAncestor: Option[(String, CTERelationDef)] | ||
): (LogicalPlan, Option[LogicalPlan]) = { | ||
var firstSubstituted: Option[LogicalPlan] = None | ||
val newPlan = plan.resolveOperatorsDownWithPruning( | ||
_.containsAnyPattern(UNRESOLVED_WITH, PLAN_EXPRESSION)) { | ||
|
@@ -220,18 +224,31 @@ object CTESubstitution extends Rule[LogicalPlan] { | |
errorClass = "RECURSIVE_CTE_WHEN_INLINING_IS_FORCED", | ||
messageParameters = Map.empty) | ||
} | ||
val resolvedCTERelations = | ||
|
||
val tempCteDefs = ArrayBuffer.empty[CTERelationDef] | ||
val resolvedCTERelations = if (recursiveCTERelationAncestor.isDefined) { | ||
resolveCTERelations(relations, isLegacy = false, forceInline = false, outerCTEDefs, | ||
tempCteDefs, recursiveCTERelationAncestor, allowRecursion) ++ outerCTEDefs | ||
} else { | ||
resolveCTERelations(relations, isLegacy = false, forceInline, outerCTEDefs, cteDefs, | ||
allowRecursion) ++ outerCTEDefs | ||
recursiveCTERelationAncestor, allowRecursion) ++ outerCTEDefs | ||
} | ||
val substituted = substituteCTE( | ||
traverseAndSubstituteCTE(child, forceInline, resolvedCTERelations, cteDefs)._1, | ||
traverseAndSubstituteCTE(child, forceInline, resolvedCTERelations, cteDefs, | ||
recursiveCTERelationAncestor)._1, | ||
// If we are resolving CTEs in a recursive CTE, we need to inline it in case the | ||
// CTE contains the self reference. | ||
forceInline, | ||
resolvedCTERelations, | ||
None) | ||
if (firstSubstituted.isEmpty) { | ||
firstSubstituted = Some(substituted) | ||
} | ||
substituted | ||
if (recursiveCTERelationAncestor.isDefined) { | ||
withCTEDefs(substituted, tempCteDefs.toSeq) | ||
} else { | ||
substituted | ||
} | ||
|
||
case other => | ||
other.transformExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) { | ||
|
@@ -247,6 +264,7 @@ object CTESubstitution extends Rule[LogicalPlan] { | |
forceInline: Boolean, | ||
outerCTEDefs: Seq[(String, CTERelationDef)], | ||
cteDefs: ArrayBuffer[CTERelationDef], | ||
recursiveCTERelationAncestor: Option[(String, CTERelationDef)], | ||
allowRecursion: Boolean): Seq[(String, CTERelationDef)] = { | ||
val alwaysInline = isLegacy || forceInline | ||
var resolvedCTERelations = if (alwaysInline) { | ||
|
@@ -255,6 +273,21 @@ object CTESubstitution extends Rule[LogicalPlan] { | |
outerCTEDefs | ||
} | ||
for ((name, relation) <- relations) { | ||
// If recursion is allowed (RECURSIVE keyword specified) | ||
// then it has higher priority than outer or previous relations. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have a question about semantics here. Let's see an example
The There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It references the inner non recursive one (just checked). The comment just says that the recursive reference has a higher priority than the outer ones or the ones within in the same WITH statement. |
||
// Therefore, we construct a `CTERelationDef` for the current relation. | ||
// Later if we encounter unresolved relation which we need to find which CTE Def it is | ||
// referencing to, we first check if it is a reference to this one. If yes, then we set the | ||
// reference as being recursive. | ||
val recursiveCTERelation = if (allowRecursion) { | ||
Some(name -> CTERelationDef(relation)) | ||
} else { | ||
// If there is an outer recursive CTE relative to this one, and this one isn't recursive, | ||
// then the self reference with the first-check priority is going to be the CteRelationDef | ||
// of this recursive ancestor. | ||
recursiveCTERelationAncestor | ||
} | ||
|
||
val innerCTEResolved = if (isLegacy) { | ||
// In legacy mode, outer CTE relations take precedence. Here we don't resolve the inner | ||
// `With` nodes, later we will substitute `UnresolvedRelation`s with outer CTE relations. | ||
|
@@ -305,26 +338,20 @@ object CTESubstitution extends Rule[LogicalPlan] { | |
} else { | ||
resolvedCTERelations | ||
} | ||
traverseAndSubstituteCTE(relation, forceInline, nonConflictingCTERelations, cteDefs)._1 | ||
traverseAndSubstituteCTE(relation, forceInline, nonConflictingCTERelations, | ||
cteDefs, recursiveCTERelation)._1 | ||
} | ||
|
||
// If recursion is allowed (RECURSIVE keyword specified) | ||
// then it has higher priority than outer or previous relations. | ||
// Therefore, we construct a `CTERelationDef` for the current relation. | ||
// Later if we encounter unresolved relation which we need to find which CTE Def it is | ||
// referencing to, we first check if it is a reference to this one. If yes, then we set the | ||
// reference as being recursive. | ||
val recursiveCTERelation = if (allowRecursion) { | ||
Some(name -> CTERelationDef(relation)) | ||
} else { | ||
None | ||
} | ||
// CTE definition can reference a previous one or itself if recursion allowed. | ||
val substituted = substituteCTE(innerCTEResolved, alwaysInline, | ||
resolvedCTERelations, recursiveCTERelation) | ||
val cteRelation = recursiveCTERelation | ||
val cteRelation = if (allowRecursion) { | ||
recursiveCTERelation | ||
.map(_._2.copy(child = substituted)) | ||
.getOrElse(CTERelationDef(substituted)) | ||
} else { | ||
CTERelationDef(substituted) | ||
} | ||
if (!alwaysInline) { | ||
cteDefs += cteRelation | ||
} | ||
|
Uh oh!
There was an error while loading. Please reload this page.