From 6ff35e0a7424d615fcb0b635e4e81a0bd0568b03 Mon Sep 17 00:00:00 2001 From: pavle-martinovic_data Date: Wed, 9 Apr 2025 09:15:58 +0200 Subject: [PATCH 01/11] Fix CTE within rCTE referencing rCTE --- .../catalyst/analysis/CTESubstitution.scala | 51 ++++++---- .../analyzer-results/cte-recursion.sql.out | 92 +++++++++++++------ .../sql-tests/results/cte-recursion.sql.out | 42 ++++----- 3 files changed, 112 insertions(+), 73 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala index 4bc3300b75747..adcdf504c86a4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala @@ -87,11 +87,11 @@ object CTESubstitution extends Rule[LogicalPlan] { LegacyBehaviorPolicy.withName(conf.getConf(LEGACY_CTE_PRECEDENCE_POLICY)) match { case LegacyBehaviorPolicy.EXCEPTION => assertNoNameConflictsInCTE(plan) - traverseAndSubstituteCTE(plan, forceInline, Seq.empty, cteDefs) + traverseAndSubstituteCTE(plan, forceInline, Seq.empty, cteDefs, None) case LegacyBehaviorPolicy.LEGACY => (legacyTraverseAndSubstituteCTE(plan, cteDefs), None) case LegacyBehaviorPolicy.CORRECTED => - traverseAndSubstituteCTE(plan, forceInline, Seq.empty, cteDefs) + traverseAndSubstituteCTE(plan, forceInline, Seq.empty, cteDefs, None) } if (cteDefs.isEmpty) { substituted @@ -162,7 +162,7 @@ object CTESubstitution extends Rule[LogicalPlan] { messageParameters = Map.empty) } val resolvedCTERelations = resolveCTERelations(relations, isLegacy = true, - forceInline = false, Seq.empty, cteDefs, allowRecursion) + forceInline = false, Seq.empty, cteDefs, None, allowRecursion) substituteCTE(child, alwaysInline = true, resolvedCTERelations, None) } } @@ -202,6 +202,7 @@ object CTESubstitution extends Rule[LogicalPlan] { * @param forceInline always inline the CTE relations if this is true * @param outerCTEDefs already resolved outer CTE definitions with names * @param cteDefs all accumulated CTE definitions + * @param recursiveCTERelation contains information of whether we are inside of a recursive CTE * @return the plan where CTE substitution is applied and optionally the last substituted `With` * where CTE definitions will be gathered to */ @@ -209,7 +210,9 @@ object CTESubstitution extends Rule[LogicalPlan] { plan: LogicalPlan, forceInline: Boolean, outerCTEDefs: Seq[(String, CTERelationDef)], - cteDefs: ArrayBuffer[CTERelationDef]): (LogicalPlan, Option[LogicalPlan]) = { + cteDefs: ArrayBuffer[CTERelationDef], + recursiveCTERelation: Option[(String, CTERelationDef)]): + (LogicalPlan, Option[LogicalPlan]) = { var firstSubstituted: Option[LogicalPlan] = None val newPlan = plan.resolveOperatorsDownWithPruning( _.containsAnyPattern(UNRESOLVED_WITH, PLAN_EXPRESSION)) { @@ -222,10 +225,11 @@ object CTESubstitution extends Rule[LogicalPlan] { } val resolvedCTERelations = resolveCTERelations(relations, isLegacy = false, forceInline, outerCTEDefs, cteDefs, - allowRecursion) ++ outerCTEDefs + recursiveCTERelation, allowRecursion) ++ outerCTEDefs val substituted = substituteCTE( - traverseAndSubstituteCTE(child, forceInline, resolvedCTERelations, cteDefs)._1, - forceInline, + traverseAndSubstituteCTE(child, forceInline, resolvedCTERelations, cteDefs, + recursiveCTERelation)._1, + forceInline || recursiveCTERelation.isDefined, resolvedCTERelations, None) if (firstSubstituted.isEmpty) { @@ -247,6 +251,7 @@ object CTESubstitution extends Rule[LogicalPlan] { forceInline: Boolean, outerCTEDefs: Seq[(String, CTERelationDef)], cteDefs: ArrayBuffer[CTERelationDef], + recursiveCTERelationAncestor: Option[(String, CTERelationDef)], allowRecursion: Boolean): Seq[(String, CTERelationDef)] = { val alwaysInline = isLegacy || forceInline var resolvedCTERelations = if (alwaysInline) { @@ -255,6 +260,18 @@ object CTESubstitution extends Rule[LogicalPlan] { outerCTEDefs } for ((name, relation) <- relations) { + // If recursion is allowed (RECURSIVE keyword specified) + // then it has higher priority than outer or previous relations. + // Therefore, we construct a `CTERelationDef` for the current relation. + // Later if we encounter unresolved relation which we need to find which CTE Def it is + // referencing to, we first check if it is a reference to this one. If yes, then we set the + // reference as being recursive. + val recursiveCTERelation = if (allowRecursion) { + Some(name -> CTERelationDef(relation)) + } else { + recursiveCTERelationAncestor + } + val innerCTEResolved = if (isLegacy) { // In legacy mode, outer CTE relations take precedence. Here we don't resolve the inner // `With` nodes, later we will substitute `UnresolvedRelation`s with outer CTE relations. @@ -305,26 +322,20 @@ object CTESubstitution extends Rule[LogicalPlan] { } else { resolvedCTERelations } - traverseAndSubstituteCTE(relation, forceInline, nonConflictingCTERelations, cteDefs)._1 + traverseAndSubstituteCTE(relation, forceInline, nonConflictingCTERelations, + cteDefs, recursiveCTERelation)._1 } - // If recursion is allowed (RECURSIVE keyword specified) - // then it has higher priority than outer or previous relations. - // Therefore, we construct a `CTERelationDef` for the current relation. - // Later if we encounter unresolved relation which we need to find which CTE Def it is - // referencing to, we first check if it is a reference to this one. If yes, then we set the - // reference as being recursive. - val recursiveCTERelation = if (allowRecursion) { - Some(name -> CTERelationDef(relation)) - } else { - None - } // CTE definition can reference a previous one or itself if recursion allowed. val substituted = substituteCTE(innerCTEResolved, alwaysInline, resolvedCTERelations, recursiveCTERelation) - val cteRelation = recursiveCTERelation + val cteRelation = if (allowRecursion) { + recursiveCTERelation .map(_._2.copy(child = substituted)) .getOrElse(CTERelationDef(substituted)) + } else { + CTERelationDef(substituted) + } if (!alwaysInline) { cteDefs += cteRelation } diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/cte-recursion.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/cte-recursion.sql.out index 88f3f675c87b2..2d1137f2ee56d 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/cte-recursion.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/cte-recursion.sql.out @@ -385,19 +385,16 @@ WITH RECURSIVE ) SELECT * FROM t1 -- !query analysis -org.apache.spark.sql.catalyst.ExtendedAnalysisException +org.apache.spark.sql.AnalysisException { - "errorClass" : "TABLE_OR_VIEW_NOT_FOUND", - "sqlState" : "42P01", - "messageParameters" : { - "relationName" : "`t1`" - }, + "errorClass" : "UNION_NOT_SUPPORTED_IN_RECURSIVE_CTE", + "sqlState" : "42836", "queryContext" : [ { "objectType" : "", "objectName" : "", - "startIndex" : 100, - "stopIndex" : 101, - "fragment" : "t1" + "startIndex" : 1, + "stopIndex" : 169, + "fragment" : "WITH RECURSIVE\n t1 AS (\n SELECT 1 AS level\n UNION (\n WITH t2 AS (SELECT level + 1 FROM t1 WHERE level < 10)\n SELECT * FROM t2\n )\n )\nSELECT * FROM t1" } ] } @@ -424,21 +421,36 @@ WITH ) SELECT * FROM t2 -- !query analysis -org.apache.spark.sql.catalyst.ExtendedAnalysisException -{ - "errorClass" : "TABLE_OR_VIEW_NOT_FOUND", - "sqlState" : "42P01", - "messageParameters" : { - "relationName" : "`t1`" - }, - "queryContext" : [ { - "objectType" : "", - "objectName" : "", - "startIndex" : 159, - "stopIndex" : 160, - "fragment" : "t1" - } ] -} +WithCTE +:- CTERelationDef xxxx, false +: +- SubqueryAlias t1 +: +- Project [1 AS 1#x] +: +- OneRowRelation +:- CTERelationDef xxxx, false +: +- SubqueryAlias t3 +: +- Project [(level#x + 1) AS (level + 1)#x] +: +- Filter (level#x < 10) +: +- SubqueryAlias t1 +: +- CTERelationRef xxxx, true, [level#x], false, false +:- CTERelationDef xxxx, false +: +- SubqueryAlias t1 +: +- UnionLoop xxxx +: :- Project [1 AS level#x] +: : +- OneRowRelation +: +- Project [(level + 1)#x] +: +- SubqueryAlias t3 +: +- Project [(level#x + 1) AS (level + 1)#x] +: +- Filter (level#x < 10) +: +- SubqueryAlias t1 +: +- UnionLoopRef xxxx, [level#x], false +:- CTERelationDef xxxx, false +: +- SubqueryAlias t2 +: +- Project [level#x] +: +- SubqueryAlias t1 +: +- CTERelationRef xxxx, true, [level#x], false, false ++- Project [level#x] + +- SubqueryAlias t2 + +- CTERelationRef xxxx, true, [level#x], false, false -- !query @@ -975,7 +987,13 @@ WithCTE : +- UnionLoop xxxx : :- Project [0 AS outerlevel#x, innerlevel#x] : : +- SubqueryAlias r1 -: : +- CTERelationRef xxxx, true, [innerlevel#x], false, false +: : +- Union false, false +: : :- Project [0 AS innerlevel#x] +: : : +- OneRowRelation +: : +- Project [(innerlevel#x + 1) AS (innerlevel + 1)#x] +: : +- Filter (innerlevel#x < 3) +: : +- SubqueryAlias r1 +: : +- CTERelationRef xxxx, true, [innerlevel#x], false, false : +- Project [(outerlevel#x + 1) AS (outerlevel + 1)#x, innerlevel#x] : +- Filter (outerlevel#x < 3) : +- SubqueryAlias r2 @@ -1015,11 +1033,23 @@ WithCTE : +- Union false, false : :- Project [level#x] : : +- SubqueryAlias r -: : +- CTERelationRef xxxx, true, [level#x], false, false +: : +- Project [col1#x AS level#x] +: : +- Union false, false +: : :- LocalRelation [col1#x] +: : +- Project [(level#x + 1) AS (level + 1)#x] +: : +- Filter (level#x < 3) +: : +- SubqueryAlias r +: : +- CTERelationRef xxxx, true, [level#x], false, false : +- Project [(level#x + 1) AS (level + 1)#x] : +- Filter (level#x < 3) : +- SubqueryAlias r -: +- CTERelationRef xxxx, true, [level#x], false, false +: +- Project [col1#x AS level#x] +: +- Union false, false +: :- LocalRelation [col1#x] +: +- Project [(level#x + 1) AS (level + 1)#x] +: +- Filter (level#x < 3) +: +- SubqueryAlias r +: +- CTERelationRef xxxx, true, [level#x], false, false +- Project [level#x] +- SubqueryAlias r +- CTERelationRef xxxx, true, [level#x], false, false @@ -1055,7 +1085,13 @@ WithCTE : +- UnionLoop xxxx : :- Project [level#x] : : +- SubqueryAlias r -: : +- CTERelationRef xxxx, true, [level#x], false, false +: : +- Project [col1#x AS level#x] +: : +- Union false, false +: : :- LocalRelation [col1#x] +: : +- Project [(level#x + 1) AS (level + 1)#x] +: : +- Filter (level#x < 3) +: : +- SubqueryAlias r +: : +- CTERelationRef xxxx, true, [level#x], false, false : +- Project [(level#x + 1) AS (level + 1)#x] : +- Filter (level#x < 3) : +- SubqueryAlias r diff --git a/sql/core/src/test/resources/sql-tests/results/cte-recursion.sql.out b/sql/core/src/test/resources/sql-tests/results/cte-recursion.sql.out index 90762f81fd51b..9dc21d1760909 100644 --- a/sql/core/src/test/resources/sql-tests/results/cte-recursion.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/cte-recursion.sql.out @@ -367,19 +367,16 @@ SELECT * FROM t1 -- !query schema struct<> -- !query output -org.apache.spark.sql.catalyst.ExtendedAnalysisException +org.apache.spark.sql.AnalysisException { - "errorClass" : "TABLE_OR_VIEW_NOT_FOUND", - "sqlState" : "42P01", - "messageParameters" : { - "relationName" : "`t1`" - }, + "errorClass" : "UNION_NOT_SUPPORTED_IN_RECURSIVE_CTE", + "sqlState" : "42836", "queryContext" : [ { "objectType" : "", "objectName" : "", - "startIndex" : 100, - "stopIndex" : 101, - "fragment" : "t1" + "startIndex" : 1, + "stopIndex" : 169, + "fragment" : "WITH RECURSIVE\n t1 AS (\n SELECT 1 AS level\n UNION (\n WITH t2 AS (SELECT level + 1 FROM t1 WHERE level < 10)\n SELECT * FROM t2\n )\n )\nSELECT * FROM t1" } ] } @@ -408,23 +405,18 @@ WITH ) SELECT * FROM t2 -- !query schema -struct<> +struct -- !query output -org.apache.spark.sql.catalyst.ExtendedAnalysisException -{ - "errorClass" : "TABLE_OR_VIEW_NOT_FOUND", - "sqlState" : "42P01", - "messageParameters" : { - "relationName" : "`t1`" - }, - "queryContext" : [ { - "objectType" : "", - "objectName" : "", - "startIndex" : 159, - "stopIndex" : 160, - "fragment" : "t1" - } ] -} +1 +10 +2 +3 +4 +5 +6 +7 +8 +9 -- !query From 0d80b72cf9f7a2f428b3cec2b245ce7d50b4f705 Mon Sep 17 00:00:00 2001 From: pavle-martinovic_data Date: Wed, 9 Apr 2025 12:55:00 +0200 Subject: [PATCH 02/11] Do not unravel recursive CTEs --- .../catalyst/analysis/CTESubstitution.scala | 6 +++- .../analyzer-results/cte-recursion.sql.out | 32 +++---------------- 2 files changed, 9 insertions(+), 29 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala index adcdf504c86a4..ae441cfd89c50 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala @@ -369,7 +369,11 @@ object CTESubstitution extends Rule[LogicalPlan] { .find(r => conf.resolver(r._1, table)) .map { case (_, d) => - if (alwaysInline) { + val hasSelfReference = d.exists { + case CTERelationRef(d.id, _, _, _, _, true, _) => true + case _ => false + } + if (alwaysInline && !hasSelfReference) { d.child } else { // Add a `SubqueryAlias` for hint-resolving rules to match relation names. diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/cte-recursion.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/cte-recursion.sql.out index 2d1137f2ee56d..511c2564f11d9 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/cte-recursion.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/cte-recursion.sql.out @@ -987,13 +987,7 @@ WithCTE : +- UnionLoop xxxx : :- Project [0 AS outerlevel#x, innerlevel#x] : : +- SubqueryAlias r1 -: : +- Union false, false -: : :- Project [0 AS innerlevel#x] -: : : +- OneRowRelation -: : +- Project [(innerlevel#x + 1) AS (innerlevel + 1)#x] -: : +- Filter (innerlevel#x < 3) -: : +- SubqueryAlias r1 -: : +- CTERelationRef xxxx, true, [innerlevel#x], false, false +: : +- CTERelationRef xxxx, true, [innerlevel#x], false, false : +- Project [(outerlevel#x + 1) AS (outerlevel + 1)#x, innerlevel#x] : +- Filter (outerlevel#x < 3) : +- SubqueryAlias r2 @@ -1033,23 +1027,11 @@ WithCTE : +- Union false, false : :- Project [level#x] : : +- SubqueryAlias r -: : +- Project [col1#x AS level#x] -: : +- Union false, false -: : :- LocalRelation [col1#x] -: : +- Project [(level#x + 1) AS (level + 1)#x] -: : +- Filter (level#x < 3) -: : +- SubqueryAlias r -: : +- CTERelationRef xxxx, true, [level#x], false, false +: : +- CTERelationRef xxxx, true, [level#x], false, false : +- Project [(level#x + 1) AS (level + 1)#x] : +- Filter (level#x < 3) : +- SubqueryAlias r -: +- Project [col1#x AS level#x] -: +- Union false, false -: :- LocalRelation [col1#x] -: +- Project [(level#x + 1) AS (level + 1)#x] -: +- Filter (level#x < 3) -: +- SubqueryAlias r -: +- CTERelationRef xxxx, true, [level#x], false, false +: +- CTERelationRef xxxx, true, [level#x], false, false +- Project [level#x] +- SubqueryAlias r +- CTERelationRef xxxx, true, [level#x], false, false @@ -1085,13 +1067,7 @@ WithCTE : +- UnionLoop xxxx : :- Project [level#x] : : +- SubqueryAlias r -: : +- Project [col1#x AS level#x] -: : +- Union false, false -: : :- LocalRelation [col1#x] -: : +- Project [(level#x + 1) AS (level + 1)#x] -: : +- Filter (level#x < 3) -: : +- SubqueryAlias r -: : +- CTERelationRef xxxx, true, [level#x], false, false +: : +- CTERelationRef xxxx, true, [level#x], false, false : +- Project [(level#x + 1) AS (level + 1)#x] : +- Filter (level#x < 3) : +- SubqueryAlias r From b67fc23dc5cf4300bc11229fe070c58fbc8b58eb Mon Sep 17 00:00:00 2001 From: pavle-martinovic_data Date: Wed, 9 Apr 2025 14:30:35 +0200 Subject: [PATCH 03/11] Add comments, rename variables and regen postgreSQL/with golden file --- .../sql/catalyst/analysis/CTESubstitution.scala | 17 ++++++++++++----- .../analyzer-results/postgreSQL/with.sql.out | 2 +- .../sql-tests/results/postgreSQL/with.sql.out | 2 +- 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala index ae441cfd89c50..895ea9c2c975b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala @@ -202,7 +202,8 @@ object CTESubstitution extends Rule[LogicalPlan] { * @param forceInline always inline the CTE relations if this is true * @param outerCTEDefs already resolved outer CTE definitions with names * @param cteDefs all accumulated CTE definitions - * @param recursiveCTERelation contains information of whether we are inside of a recursive CTE + * @param recursiveCTERelationAncestor contains information of whether we are in a recursive CTE, + * as well as what CTE that is. * @return the plan where CTE substitution is applied and optionally the last substituted `With` * where CTE definitions will be gathered to */ @@ -211,7 +212,7 @@ object CTESubstitution extends Rule[LogicalPlan] { forceInline: Boolean, outerCTEDefs: Seq[(String, CTERelationDef)], cteDefs: ArrayBuffer[CTERelationDef], - recursiveCTERelation: Option[(String, CTERelationDef)]): + recursiveCTERelationAncestor: Option[(String, CTERelationDef)]): (LogicalPlan, Option[LogicalPlan]) = { var firstSubstituted: Option[LogicalPlan] = None val newPlan = plan.resolveOperatorsDownWithPruning( @@ -225,11 +226,13 @@ object CTESubstitution extends Rule[LogicalPlan] { } val resolvedCTERelations = resolveCTERelations(relations, isLegacy = false, forceInline, outerCTEDefs, cteDefs, - recursiveCTERelation, allowRecursion) ++ outerCTEDefs + recursiveCTERelationAncestor, allowRecursion) ++ outerCTEDefs val substituted = substituteCTE( traverseAndSubstituteCTE(child, forceInline, resolvedCTERelations, cteDefs, - recursiveCTERelation)._1, - forceInline || recursiveCTERelation.isDefined, + recursiveCTERelationAncestor)._1, + // If we are resolving CTEs in a recursive CTE, we need to inline it in case the + // CTE contains the self reference. + forceInline || recursiveCTERelationAncestor.isDefined, resolvedCTERelations, None) if (firstSubstituted.isEmpty) { @@ -269,6 +272,9 @@ object CTESubstitution extends Rule[LogicalPlan] { val recursiveCTERelation = if (allowRecursion) { Some(name -> CTERelationDef(relation)) } else { + // If there is an outer recursive CTE relative to this one, and this one isn't recursive, + // then the self reference with the first-check priority is going to be the CteRelationDef + // of this recursive ancestor. recursiveCTERelationAncestor } @@ -369,6 +375,7 @@ object CTESubstitution extends Rule[LogicalPlan] { .find(r => conf.resolver(r._1, table)) .map { case (_, d) => + // We don't ever want to inline rCTEs. val hasSelfReference = d.exists { case CTERelationRef(d.id, _, _, _, _, true, _) => true case _ => false diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/with.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/with.sql.out index 2fdced93217df..faf02a862d066 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/with.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/with.sql.out @@ -1622,7 +1622,7 @@ SELECT * FROM outermost ORDER BY 1 -- !query analysis org.apache.spark.sql.AnalysisException { - "errorClass" : "UNION_NOT_SUPPORTED_IN_RECURSIVE_CTE", + "errorClass" : "INVALID_RECURSIVE_REFERENCE.NUMBER", "sqlState" : "42836", "queryContext" : [ { "objectType" : "", diff --git a/sql/core/src/test/resources/sql-tests/results/postgreSQL/with.sql.out b/sql/core/src/test/resources/sql-tests/results/postgreSQL/with.sql.out index 53d1be3b14473..cfb021aeca75e 100644 --- a/sql/core/src/test/resources/sql-tests/results/postgreSQL/with.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/postgreSQL/with.sql.out @@ -1381,7 +1381,7 @@ struct<> -- !query output org.apache.spark.sql.AnalysisException { - "errorClass" : "UNION_NOT_SUPPORTED_IN_RECURSIVE_CTE", + "errorClass" : "INVALID_RECURSIVE_REFERENCE.NUMBER", "sqlState" : "42836", "queryContext" : [ { "objectType" : "", From dff286e21ca26895e1432edfbaa84a55b35801d1 Mon Sep 17 00:00:00 2001 From: pavle-martinovic_data Date: Fri, 11 Apr 2025 10:01:41 +0200 Subject: [PATCH 04/11] replace inlining with inplace CTE substitution --- .../catalyst/analysis/CTESubstitution.scala | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala index 895ea9c2c975b..d19f0baf4cfba 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala @@ -224,21 +224,31 @@ object CTESubstitution extends Rule[LogicalPlan] { errorClass = "RECURSIVE_CTE_WHEN_INLINING_IS_FORCED", messageParameters = Map.empty) } - val resolvedCTERelations = + + val tempCteDefs = ArrayBuffer.empty[CTERelationDef] + val resolvedCTERelations = if (recursiveCTERelationAncestor.isDefined) { + resolveCTERelations(relations, isLegacy = false, forceInline, outerCTEDefs, tempCteDefs, + recursiveCTERelationAncestor, allowRecursion) ++ outerCTEDefs + } else { resolveCTERelations(relations, isLegacy = false, forceInline, outerCTEDefs, cteDefs, recursiveCTERelationAncestor, allowRecursion) ++ outerCTEDefs + } val substituted = substituteCTE( traverseAndSubstituteCTE(child, forceInline, resolvedCTERelations, cteDefs, recursiveCTERelationAncestor)._1, // If we are resolving CTEs in a recursive CTE, we need to inline it in case the // CTE contains the self reference. - forceInline || recursiveCTERelationAncestor.isDefined, + forceInline, resolvedCTERelations, None) if (firstSubstituted.isEmpty) { firstSubstituted = Some(substituted) } - substituted + if (recursiveCTERelationAncestor.isDefined) { + withCTEDefs(substituted, tempCteDefs.toSeq) + } else { + substituted + } case other => other.transformExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) { @@ -375,12 +385,7 @@ object CTESubstitution extends Rule[LogicalPlan] { .find(r => conf.resolver(r._1, table)) .map { case (_, d) => - // We don't ever want to inline rCTEs. - val hasSelfReference = d.exists { - case CTERelationRef(d.id, _, _, _, _, true, _) => true - case _ => false - } - if (alwaysInline && !hasSelfReference) { + if (alwaysInline) { d.child } else { // Add a `SubqueryAlias` for hint-resolving rules to match relation names. From 75ef4c9177e58339485fe17d8f14dcbfd0cddc19 Mon Sep 17 00:00:00 2001 From: pavle-martinovic_data Date: Fri, 11 Apr 2025 11:48:45 +0200 Subject: [PATCH 05/11] catch cases for inline ctes in resolvewithcte --- .../catalyst/analysis/ResolveWithCTE.scala | 81 +++++- .../analyzer-results/cte-recursion.sql.out | 236 ++++++++++++------ .../analyzer-results/postgreSQL/with.sql.out | 37 +-- .../sql-tests/inputs/cte-recursion.sql | 39 ++- .../sql-tests/results/cte-recursion.sql.out | 84 +++++-- 5 files changed, 366 insertions(+), 111 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala index 5a5acb24c6971..42dd665cd6671 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala @@ -83,8 +83,21 @@ object ResolveWithCTE extends Rule[LogicalPlan] { cteDef.copy(child = alias.copy(child = loop)) } + // Simple case of duplicating (UNION ALL) clause, with CTEs inside. + case alias @ SubqueryAlias(_, withCTE @ WithCTE( + Union(Seq(anchor, recursion), false, false), _)) => + if (!anchor.resolved) { + cteDef + } else { + val loop = UnionLoop( + cteDef.id, + anchor, + rewriteRecursiveCTERefs(recursion, anchor, cteDef.id, None)) + cteDef.copy(child = alias.copy(child = withCTE.copy(plan = loop))) + } + // The case of CTE name followed by a parenthesized list of column name(s), eg. - // WITH RECURSIVE t(n). + // WITH RECURSIVE t(n), with CTEs inside. case alias @ SubqueryAlias(_, columnAlias @ UnresolvedSubqueryColumnAliases( colNames, @@ -100,7 +113,25 @@ object ResolveWithCTE extends Rule[LogicalPlan] { cteDef.copy(child = alias.copy(child = columnAlias.copy(child = loop))) } - // If the recursion is described with an UNION (deduplicating) clause then the + // The case of CTE name followed by a parenthesized list of column name(s), eg. + // WITH RECURSIVE t(n). + case alias @ SubqueryAlias(_, + columnAlias @ UnresolvedSubqueryColumnAliases( + colNames, + withCTE @ WithCTE(Union(Seq(anchor, recursion), false, false), _) + )) => + if (!anchor.resolved) { + cteDef + } else { + val loop = UnionLoop( + cteDef.id, + anchor, + rewriteRecursiveCTERefs(recursion, anchor, cteDef.id, Some(colNames))) + cteDef.copy(child = alias.copy(child = columnAlias.copy( + child = withCTE.copy(plan = loop)))) + } + + // If the recursion is described with a UNION (deduplicating) clause then the // recursive term should not return those rows that have been calculated previously, // and we exclude those rows from the current iteration result. case alias @ SubqueryAlias(_, @@ -123,6 +154,27 @@ object ResolveWithCTE extends Rule[LogicalPlan] { cteDef.copy(child = alias.copy(child = loop)) } + // UNION case with CTEs inside. + case alias @ SubqueryAlias(_, withCTE @ WithCTE( + Distinct(Union(Seq(anchor, recursion), false, false)), _)) => + cteDef.failAnalysis( + errorClass = "UNION_NOT_SUPPORTED_IN_RECURSIVE_CTE", + messageParameters = Map.empty) + if (!anchor.resolved) { + cteDef + } else { + val loop = UnionLoop( + cteDef.id, + Distinct(anchor), + Except( + rewriteRecursiveCTERefs(recursion, anchor, cteDef.id, None), + UnionLoopRef(cteDef.id, anchor.output, true), + isAll = false + ) + ) + cteDef.copy(child = alias.copy(child = withCTE.copy(plan = loop))) + } + // The case of CTE name followed by a parenthesized list of column name(s). case alias @ SubqueryAlias(_, columnAlias@UnresolvedSubqueryColumnAliases( @@ -147,6 +199,31 @@ object ResolveWithCTE extends Rule[LogicalPlan] { cteDef.copy(child = alias.copy(child = columnAlias.copy(child = loop))) } + // The case of CTE name followed by a parenthesized list of column name(s). + case alias @ SubqueryAlias(_, + columnAlias@UnresolvedSubqueryColumnAliases( + colNames, + withCTE @ WithCTE(Distinct(Union(Seq(anchor, recursion), false, false)), _) + )) => + cteDef.failAnalysis( + errorClass = "UNION_NOT_SUPPORTED_IN_RECURSIVE_CTE", + messageParameters = Map.empty) + if (!anchor.resolved) { + cteDef + } else { + val loop = UnionLoop( + cteDef.id, + Distinct(anchor), + Except( + rewriteRecursiveCTERefs(recursion, anchor, cteDef.id, Some(colNames)), + UnionLoopRef(cteDef.id, anchor.output, true), + isAll = false + ) + ) + cteDef.copy(child = alias.copy(child = + columnAlias.copy(child = withCTE.copy(plan = loop)))) + } + case other => // We do not support cases of sole Union (needs a SubqueryAlias above it), nor // Project (as UnresolvedSubqueryColumnAliases have not been substituted with the diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/cte-recursion.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/cte-recursion.sql.out index f37eac3bdea6a..272517aa36145 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/cte-recursion.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/cte-recursion.sql.out @@ -378,25 +378,32 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException WITH RECURSIVE t1 AS ( SELECT 1 AS level - UNION ( + UNION ALL ( WITH t2 AS (SELECT level + 1 FROM t1 WHERE level < 10) SELECT * FROM t2 ) ) SELECT * FROM t1 -- !query analysis -org.apache.spark.sql.AnalysisException -{ - "errorClass" : "UNION_NOT_SUPPORTED_IN_RECURSIVE_CTE", - "sqlState" : "42836", - "queryContext" : [ { - "objectType" : "", - "objectName" : "", - "startIndex" : 1, - "stopIndex" : 169, - "fragment" : "WITH RECURSIVE\n t1 AS (\n SELECT 1 AS level\n UNION (\n WITH t2 AS (SELECT level + 1 FROM t1 WHERE level < 10)\n SELECT * FROM t2\n )\n )\nSELECT * FROM t1" - } ] -} +WithCTE +:- CTERelationDef xxxx, false +: +- SubqueryAlias t1 +: +- UnionLoop xxxx +: :- Project [1 AS level#x] +: : +- OneRowRelation +: +- WithCTE +: :- CTERelationDef xxxx, false +: : +- SubqueryAlias t2 +: : +- Project [(level#x + 1) AS (level + 1)#x] +: : +- Filter (level#x < 10) +: : +- SubqueryAlias t1 +: : +- UnionLoopRef xxxx, [level#x], false +: +- Project [(level + 1)#x] +: +- SubqueryAlias t2 +: +- CTERelationRef xxxx, true, [(level + 1)#x], false, false ++- Project [level#x] + +- SubqueryAlias t1 + +- CTERelationRef xxxx, true, [level#x], false, false -- !query @@ -427,22 +434,20 @@ WithCTE : +- Project [1 AS 1#x] : +- OneRowRelation :- CTERelationDef xxxx, false -: +- SubqueryAlias t3 -: +- Project [(level#x + 1) AS (level + 1)#x] -: +- Filter (level#x < 10) -: +- SubqueryAlias t1 -: +- CTERelationRef xxxx, true, [level#x], false, false -:- CTERelationDef xxxx, false : +- SubqueryAlias t1 : +- UnionLoop xxxx : :- Project [1 AS level#x] : : +- OneRowRelation -: +- Project [(level + 1)#x] -: +- SubqueryAlias t3 -: +- Project [(level#x + 1) AS (level + 1)#x] -: +- Filter (level#x < 10) -: +- SubqueryAlias t1 -: +- UnionLoopRef xxxx, [level#x], false +: +- WithCTE +: :- CTERelationDef xxxx, false +: : +- SubqueryAlias t3 +: : +- Project [(level#x + 1) AS (level + 1)#x] +: : +- Filter (level#x < 10) +: : +- SubqueryAlias t1 +: : +- UnionLoopRef xxxx, [level#x], false +: +- Project [(level + 1)#x] +: +- SubqueryAlias t3 +: +- CTERelationRef xxxx, true, [(level + 1)#x], false, false :- CTERelationDef xxxx, false : +- SubqueryAlias t2 : +- Project [level#x] @@ -974,24 +979,25 @@ SELECT * FROM r2 -- !query analysis WithCTE :- CTERelationDef xxxx, false -: +- SubqueryAlias r1 -: +- UnionLoop xxxx -: :- Project [0 AS innerlevel#x] -: : +- OneRowRelation -: +- Project [(innerlevel#x + 1) AS (innerlevel + 1)#x] -: +- Filter (innerlevel#x < 3) -: +- SubqueryAlias r1 -: +- UnionLoopRef xxxx, [innerlevel#x], false -:- CTERelationDef xxxx, false : +- SubqueryAlias r2 -: +- UnionLoop xxxx -: :- Project [0 AS outerlevel#x, innerlevel#x] +: +- WithCTE +: :- CTERelationDef xxxx, false : : +- SubqueryAlias r1 -: : +- CTERelationRef xxxx, true, [innerlevel#x], false, false -: +- Project [(outerlevel#x + 1) AS (outerlevel + 1)#x, innerlevel#x] -: +- Filter (outerlevel#x < 3) -: +- SubqueryAlias r2 -: +- UnionLoopRef xxxx, [outerlevel#x, innerlevel#x], false +: : +- UnionLoop xxxx +: : :- Project [0 AS innerlevel#x] +: : : +- OneRowRelation +: : +- Project [(innerlevel#x + 1) AS (innerlevel + 1)#x] +: : +- Filter (innerlevel#x < 3) +: : +- SubqueryAlias r1 +: : +- UnionLoopRef xxxx, [innerlevel#x], false +: +- UnionLoop xxxx +: :- Project [0 AS outerlevel#x, innerlevel#x] +: : +- SubqueryAlias r1 +: : +- CTERelationRef xxxx, true, [innerlevel#x], false, false +: +- Project [(outerlevel#x + 1) AS (outerlevel + 1)#x, innerlevel#x] +: +- Filter (outerlevel#x < 3) +: +- SubqueryAlias r2 +: +- UnionLoopRef xxxx, [outerlevel#x, innerlevel#x], false +- Project [outerlevel#x, innerlevel#x] +- SubqueryAlias r2 +- CTERelationRef xxxx, true, [outerlevel#x, innerlevel#x], false, false @@ -1013,25 +1019,26 @@ SELECT * FROM r WithCTE :- CTERelationDef xxxx, false : +- SubqueryAlias r -: +- Project [col1#x AS level#x] -: +- UnionLoop xxxx -: :- LocalRelation [col1#x] -: +- Project [(level#x + 1) AS (level + 1)#x] -: +- Filter (level#x < 3) -: +- SubqueryAlias r -: +- Project [col1#x AS level#x] -: +- UnionLoopRef xxxx, [col1#x], false -:- CTERelationDef xxxx, false -: +- SubqueryAlias r : +- Project [level#x AS level#x] -: +- Union false, false -: :- Project [level#x] +: +- WithCTE +: :- CTERelationDef xxxx, false : : +- SubqueryAlias r -: : +- CTERelationRef xxxx, true, [level#x], false, false -: +- Project [(level#x + 1) AS (level + 1)#x] -: +- Filter (level#x < 3) -: +- SubqueryAlias r -: +- CTERelationRef xxxx, true, [level#x], false, false +: : +- Project [col1#x AS level#x] +: : +- UnionLoop xxxx +: : :- LocalRelation [col1#x] +: : +- Project [(level#x + 1) AS (level + 1)#x] +: : +- Filter (level#x < 3) +: : +- SubqueryAlias r +: : +- Project [col1#x AS level#x] +: : +- UnionLoopRef xxxx, [col1#x], false +: +- Union false, false +: :- Project [level#x] +: : +- SubqueryAlias r +: : +- CTERelationRef xxxx, true, [level#x], false, false +: +- Project [(level#x + 1) AS (level + 1)#x] +: +- Filter (level#x < 3) +: +- SubqueryAlias r +: +- CTERelationRef xxxx, true, [level#x], false, false +- Project [level#x] +- SubqueryAlias r +- CTERelationRef xxxx, true, [level#x], false, false @@ -1053,21 +1060,22 @@ SELECT * FROM r WithCTE :- CTERelationDef xxxx, false : +- SubqueryAlias r -: +- Project [col1#x AS level#x] -: +- UnionLoop xxxx -: :- LocalRelation [col1#x] -: +- Project [(level#x + 1) AS (level + 1)#x] -: +- Filter (level#x < 3) -: +- SubqueryAlias r -: +- Project [col1#x AS level#x] -: +- UnionLoopRef xxxx, [col1#x], false -:- CTERelationDef xxxx, false -: +- SubqueryAlias r : +- Project [level#x AS level#x] : +- UnionLoop xxxx -: :- Project [level#x] -: : +- SubqueryAlias r -: : +- CTERelationRef xxxx, true, [level#x], false, false +: :- WithCTE +: : :- CTERelationDef xxxx, false +: : : +- SubqueryAlias r +: : : +- Project [col1#x AS level#x] +: : : +- UnionLoop xxxx +: : : :- LocalRelation [col1#x] +: : : +- Project [(level#x + 1) AS (level + 1)#x] +: : : +- Filter (level#x < 3) +: : : +- SubqueryAlias r +: : : +- Project [col1#x AS level#x] +: : : +- UnionLoopRef xxxx, [col1#x], false +: : +- Project [level#x] +: : +- SubqueryAlias r +: : +- CTERelationRef xxxx, true, [level#x], false, false : +- Project [(level#x + 1) AS (level + 1)#x] : +- Filter (level#x < 3) : +- SubqueryAlias r @@ -1312,3 +1320,89 @@ WithCTE +- Project [n#x] +- SubqueryAlias t2 +- CTERelationRef xxxx, true, [n#x], false, false + + +-- !query +WITH RECURSIVE t1 (n) AS ( + VALUES(1) + UNION ALL + ( + WITH t2(j) AS ( + SELECT n + 1 FROM t1 + ), + t3(k) AS ( + SELECT j FROM t2 + ) + SELECT k FROM t3 WHERE k <= 5 + ) +) +SELECT n FROM t1 +-- !query analysis +WithCTE +:- CTERelationDef xxxx, false +: +- SubqueryAlias t1 +: +- Project [col1#x AS n#x] +: +- UnionLoop xxxx +: :- LocalRelation [col1#x] +: +- WithCTE +: :- CTERelationDef xxxx, false +: : +- SubqueryAlias t2 +: : +- Project [(n + 1)#x AS j#x] +: : +- Project [(n#x + 1) AS (n + 1)#x] +: : +- SubqueryAlias t1 +: : +- Project [col1#x AS n#x] +: : +- UnionLoopRef xxxx, [col1#x], false +: :- CTERelationDef xxxx, false +: : +- SubqueryAlias t3 +: : +- Project [j#x AS k#x] +: : +- Project [j#x] +: : +- SubqueryAlias t2 +: : +- CTERelationRef xxxx, true, [j#x], false, false +: +- Project [k#x] +: +- Filter (k#x <= 5) +: +- SubqueryAlias t3 +: +- CTERelationRef xxxx, true, [k#x], false, false ++- Project [n#x] + +- SubqueryAlias t1 + +- CTERelationRef xxxx, true, [n#x], false, false + + +-- !query +WITH RECURSIVE r2(outerlevel1, innerlevel1) AS ( + WITH RECURSIVE r1 AS ( + SELECT 0 AS innerlevel + UNION ALL + SELECT innerlevel + 1 FROM r1 WHERE innerlevel < 3 + ) + SELECT 0 AS outerlevel, innerlevel FROM r1 + UNION ALL + SELECT outerlevel1 + 1, innerlevel1 FROM r2 WHERE outerlevel1 < 3 +) +SELECT * FROM r2 +-- !query analysis +WithCTE +:- CTERelationDef xxxx, false +: +- SubqueryAlias r2 +: +- Project [outerlevel#x AS outerlevel1#x, innerlevel#x AS innerlevel1#x] +: +- WithCTE +: :- CTERelationDef xxxx, false +: : +- SubqueryAlias r1 +: : +- UnionLoop xxxx +: : :- Project [0 AS innerlevel#x] +: : : +- OneRowRelation +: : +- Project [(innerlevel#x + 1) AS (innerlevel + 1)#x] +: : +- Filter (innerlevel#x < 3) +: : +- SubqueryAlias r1 +: : +- UnionLoopRef xxxx, [innerlevel#x], false +: +- UnionLoop xxxx +: :- Project [0 AS outerlevel#x, innerlevel#x] +: : +- SubqueryAlias r1 +: : +- CTERelationRef xxxx, true, [innerlevel#x], false, false +: +- Project [(outerlevel1#x + 1) AS (outerlevel1 + 1)#x, innerlevel1#x] +: +- Filter (outerlevel1#x < 3) +: +- SubqueryAlias r2 +: +- Project [outerlevel#x AS outerlevel1#x, innerlevel#x AS innerlevel1#x] +: +- UnionLoopRef xxxx, [outerlevel#x, innerlevel#x], false ++- Project [outerlevel1#x, innerlevel1#x] + +- SubqueryAlias r2 + +- CTERelationRef xxxx, true, [outerlevel1#x, innerlevel1#x], false, false diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/with.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/with.sql.out index faf02a862d066..32388ed10e735 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/with.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/with.sql.out @@ -1503,27 +1503,28 @@ SELECT * FROM t -- !query analysis WithCTE :- CTERelationDef xxxx, false -: +- SubqueryAlias s -: +- Project [col1#x AS i#x] -: +- UnionLoop xxxx -: :- LocalRelation [col1#x] -: +- Project [(i#x + 1) AS (i + 1)#x] -: +- Filter (i#x < 10) -: +- SubqueryAlias s -: +- Project [col1#x AS i#x] -: +- UnionLoopRef xxxx, [col1#x], false -:- CTERelationDef xxxx, false : +- SubqueryAlias t : +- Project [i#x AS j#x] -: +- UnionLoop xxxx -: :- Project [i#x] +: +- WithCTE +: :- CTERelationDef xxxx, false : : +- SubqueryAlias s -: : +- CTERelationRef xxxx, true, [i#x], false, false -: +- Project [(j#x + 1) AS (j + 1)#x] -: +- Filter (j#x < 10) -: +- SubqueryAlias t -: +- Project [i#x AS j#x] -: +- UnionLoopRef xxxx, [i#x], false +: : +- Project [col1#x AS i#x] +: : +- UnionLoop xxxx +: : :- LocalRelation [col1#x] +: : +- Project [(i#x + 1) AS (i + 1)#x] +: : +- Filter (i#x < 10) +: : +- SubqueryAlias s +: : +- Project [col1#x AS i#x] +: : +- UnionLoopRef xxxx, [col1#x], false +: +- UnionLoop xxxx +: :- Project [i#x] +: : +- SubqueryAlias s +: : +- CTERelationRef xxxx, true, [i#x], false, false +: +- Project [(j#x + 1) AS (j + 1)#x] +: +- Filter (j#x < 10) +: +- SubqueryAlias t +: +- Project [i#x AS j#x] +: +- UnionLoopRef xxxx, [i#x], false +- Project [j#x] +- SubqueryAlias t +- CTERelationRef xxxx, true, [j#x], false, false diff --git a/sql/core/src/test/resources/sql-tests/inputs/cte-recursion.sql b/sql/core/src/test/resources/sql-tests/inputs/cte-recursion.sql index c45f62196430a..6f8c8c137da3a 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/cte-recursion.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/cte-recursion.sql @@ -143,20 +143,18 @@ WITH SELECT * FROM t2; --- recursive reference is not allowed in a nested CTE --- TABLE_OR_VIEW_NOT_FOUND is thrown now, although it some check should be added to exactly inform --- that this is not allowed +-- recursive reference in a nested CTE WITH RECURSIVE t1 AS ( SELECT 1 AS level - UNION ( + UNION ALL ( WITH t2 AS (SELECT level + 1 FROM t1 WHERE level < 10) SELECT * FROM t2 ) ) SELECT * FROM t1; --- recursive reference and conflicting outer CTEs are not allowed in a nested CTE +-- recursive reference and conflicting outer CTEs in a nested CTE SET spark.sql.legacy.ctePrecedencePolicy=CORRECTED; WITH t1 AS (SELECT 1), @@ -499,4 +497,33 @@ t2(n) AS ( UNION ALL SELECT n + 1 FROM t2, t1 WHERE n + 1 = a ) -SELECT * FROM t2; \ No newline at end of file +SELECT * FROM t2; + +-- Multiple CTEs within rCTE +WITH RECURSIVE t1 (n) AS ( + VALUES(1) + UNION ALL + ( + WITH t2(j) AS ( + SELECT n + 1 FROM t1 + ), + t3(k) AS ( + SELECT j FROM t2 + ) + SELECT k FROM t3 WHERE k <= 5 + ) +) +SELECT n FROM t1; + +-- Column aliases with CTEs inside rCTEs +WITH RECURSIVE r2(outerlevel1, innerlevel1) AS ( + WITH RECURSIVE r1 AS ( + SELECT 0 AS innerlevel + UNION ALL + SELECT innerlevel + 1 FROM r1 WHERE innerlevel < 3 + ) + SELECT 0 AS outerlevel, innerlevel FROM r1 + UNION ALL + SELECT outerlevel1 + 1, innerlevel1 FROM r2 WHERE outerlevel1 < 3 +) +SELECT * FROM r2; \ No newline at end of file diff --git a/sql/core/src/test/resources/sql-tests/results/cte-recursion.sql.out b/sql/core/src/test/resources/sql-tests/results/cte-recursion.sql.out index f860f897c99ce..8e1310c254eba 100644 --- a/sql/core/src/test/resources/sql-tests/results/cte-recursion.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/cte-recursion.sql.out @@ -358,27 +358,25 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException WITH RECURSIVE t1 AS ( SELECT 1 AS level - UNION ( + UNION ALL ( WITH t2 AS (SELECT level + 1 FROM t1 WHERE level < 10) SELECT * FROM t2 ) ) SELECT * FROM t1 -- !query schema -struct<> +struct -- !query output -org.apache.spark.sql.AnalysisException -{ - "errorClass" : "UNION_NOT_SUPPORTED_IN_RECURSIVE_CTE", - "sqlState" : "42836", - "queryContext" : [ { - "objectType" : "", - "objectName" : "", - "startIndex" : 1, - "stopIndex" : 169, - "fragment" : "WITH RECURSIVE\n t1 AS (\n SELECT 1 AS level\n UNION (\n WITH t2 AS (SELECT level + 1 FROM t1 WHERE level < 10)\n SELECT * FROM t2\n )\n )\nSELECT * FROM t1" - } ] -} +1 +10 +2 +3 +4 +5 +6 +7 +8 +9 -- !query @@ -1190,3 +1188,61 @@ struct 1 2 3 + + +-- !query +WITH RECURSIVE t1 (n) AS ( + VALUES(1) + UNION ALL + ( + WITH t2(j) AS ( + SELECT n + 1 FROM t1 + ), + t3(k) AS ( + SELECT j FROM t2 + ) + SELECT k FROM t3 WHERE k <= 5 + ) +) +SELECT n FROM t1 +-- !query schema +struct +-- !query output +1 +2 +3 +4 +5 + + +-- !query +WITH RECURSIVE r2(outerlevel1, innerlevel1) AS ( + WITH RECURSIVE r1 AS ( + SELECT 0 AS innerlevel + UNION ALL + SELECT innerlevel + 1 FROM r1 WHERE innerlevel < 3 + ) + SELECT 0 AS outerlevel, innerlevel FROM r1 + UNION ALL + SELECT outerlevel1 + 1, innerlevel1 FROM r2 WHERE outerlevel1 < 3 +) +SELECT * FROM r2 +-- !query schema +struct +-- !query output +0 0 +0 1 +0 2 +0 3 +1 0 +1 1 +1 2 +1 3 +2 0 +2 1 +2 2 +2 3 +3 0 +3 1 +3 2 +3 3 From a968531cf22f4546766d36f9388edc9f66a40535 Mon Sep 17 00:00:00 2001 From: Pavle Martinovic <34302662+Pajaraja@users.noreply.github.com> Date: Fri, 11 Apr 2025 14:34:03 +0200 Subject: [PATCH 06/11] Update sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala Co-authored-by: Wenchen Fan --- .../org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala index 42dd665cd6671..a51591c7d6560 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala @@ -97,7 +97,7 @@ object ResolveWithCTE extends Rule[LogicalPlan] { } // The case of CTE name followed by a parenthesized list of column name(s), eg. - // WITH RECURSIVE t(n), with CTEs inside. + // WITH RECURSIVE t(n). case alias @ SubqueryAlias(_, columnAlias @ UnresolvedSubqueryColumnAliases( colNames, From 039beeebfa7dfc6586ff513ac1747a688f25870b Mon Sep 17 00:00:00 2001 From: Pavle Martinovic <34302662+Pajaraja@users.noreply.github.com> Date: Fri, 11 Apr 2025 14:34:42 +0200 Subject: [PATCH 07/11] Update sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala Co-authored-by: Wenchen Fan --- .../apache/spark/sql/catalyst/analysis/CTESubstitution.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala index d19f0baf4cfba..ac95f3bd23a9b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala @@ -212,8 +212,8 @@ object CTESubstitution extends Rule[LogicalPlan] { forceInline: Boolean, outerCTEDefs: Seq[(String, CTERelationDef)], cteDefs: ArrayBuffer[CTERelationDef], - recursiveCTERelationAncestor: Option[(String, CTERelationDef)]): - (LogicalPlan, Option[LogicalPlan]) = { + recursiveCTERelationAncestor: Option[(String, CTERelationDef)] + ): (LogicalPlan, Option[LogicalPlan]) = { var firstSubstituted: Option[LogicalPlan] = None val newPlan = plan.resolveOperatorsDownWithPruning( _.containsAnyPattern(UNRESOLVED_WITH, PLAN_EXPRESSION)) { From c9bf2d53073579b81d32aae48db8440eb36ad25a Mon Sep 17 00:00:00 2001 From: pavle-martinovic_data Date: Fri, 11 Apr 2025 16:25:58 +0200 Subject: [PATCH 08/11] prevent rCTEs from inlining --- .../apache/spark/sql/catalyst/analysis/CTESubstitution.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala index ac95f3bd23a9b..0858ec60d0faf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala @@ -385,7 +385,7 @@ object CTESubstitution extends Rule[LogicalPlan] { .find(r => conf.resolver(r._1, table)) .map { case (_, d) => - if (alwaysInline) { + if (alwaysInline && !d.hasSelfReferenceAsCTERef) { d.child } else { // Add a `SubqueryAlias` for hint-resolving rules to match relation names. From 3a7fcc8b5f80a20eb165e159b7262dcacf1b7519 Mon Sep 17 00:00:00 2001 From: pavle-martinovic_data Date: Fri, 11 Apr 2025 19:12:02 +0200 Subject: [PATCH 09/11] Fix case when outer CTE makes recursive reference --- .../catalyst/analysis/ResolveWithCTE.scala | 46 +++++++++++--- .../analyzer-results/cte-recursion.sql.out | 62 +++++++++++++++++++ .../sql-tests/inputs/cte-recursion.sql | 20 +++++- .../sql-tests/results/cte-recursion.sql.out | 34 ++++++++++ 4 files changed, 151 insertions(+), 11 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala index a51591c7d6560..6ec465f7ffe76 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala @@ -83,17 +83,24 @@ object ResolveWithCTE extends Rule[LogicalPlan] { cteDef.copy(child = alias.copy(child = loop)) } - // Simple case of duplicating (UNION ALL) clause, with CTEs inside. + // Simple case of duplicating (UNION ALL) clause. case alias @ SubqueryAlias(_, withCTE @ WithCTE( - Union(Seq(anchor, recursion), false, false), _)) => + Union(Seq(anchor, recursion), false, false), innerCteDefs)) => if (!anchor.resolved) { cteDef } else { + // We need to check whether any of the inner CTEs has a self reference and replace + // it if needed + val newInnerCteDefs = innerCteDefs.map { innerCteDef => + innerCteDef.copy(child = rewriteRecursiveCTERefs( + innerCteDef.child, anchor, cteDef.id, None)) + } val loop = UnionLoop( cteDef.id, anchor, rewriteRecursiveCTERefs(recursion, anchor, cteDef.id, None)) - cteDef.copy(child = alias.copy(child = withCTE.copy(plan = loop))) + cteDef.copy(child = alias.copy(child = withCTE.copy( + plan = loop, cteDefs = newInnerCteDefs))) } // The case of CTE name followed by a parenthesized list of column name(s), eg. @@ -118,17 +125,23 @@ object ResolveWithCTE extends Rule[LogicalPlan] { case alias @ SubqueryAlias(_, columnAlias @ UnresolvedSubqueryColumnAliases( colNames, - withCTE @ WithCTE(Union(Seq(anchor, recursion), false, false), _) + withCTE @ WithCTE(Union(Seq(anchor, recursion), false, false), innerCteDefs) )) => if (!anchor.resolved) { cteDef } else { + // We need to check whether any of the inner CTEs has a self reference and replace + // it if needed + val newInnerCteDefs = innerCteDefs.map { innerCteDef => + innerCteDef.copy(child = rewriteRecursiveCTERefs( + innerCteDef.child, anchor, cteDef.id, Some(colNames))) + } val loop = UnionLoop( cteDef.id, anchor, rewriteRecursiveCTERefs(recursion, anchor, cteDef.id, Some(colNames))) cteDef.copy(child = alias.copy(child = columnAlias.copy( - child = withCTE.copy(plan = loop)))) + child = withCTE.copy(plan = loop, cteDefs = newInnerCteDefs)))) } // If the recursion is described with a UNION (deduplicating) clause then the @@ -156,13 +169,19 @@ object ResolveWithCTE extends Rule[LogicalPlan] { // UNION case with CTEs inside. case alias @ SubqueryAlias(_, withCTE @ WithCTE( - Distinct(Union(Seq(anchor, recursion), false, false)), _)) => + Distinct(Union(Seq(anchor, recursion), false, false)), innerCteDefs)) => cteDef.failAnalysis( errorClass = "UNION_NOT_SUPPORTED_IN_RECURSIVE_CTE", messageParameters = Map.empty) if (!anchor.resolved) { cteDef } else { + // We need to check whether any of the inner CTEs has a self reference and replace + // it if needed + val newInnerCteDefs = innerCteDefs.map { innerCteDef => + innerCteDef.copy(child = rewriteRecursiveCTERefs( + innerCteDef.child, anchor, cteDef.id, None)) + } val loop = UnionLoop( cteDef.id, Distinct(anchor), @@ -172,7 +191,8 @@ object ResolveWithCTE extends Rule[LogicalPlan] { isAll = false ) ) - cteDef.copy(child = alias.copy(child = withCTE.copy(plan = loop))) + cteDef.copy(child = alias.copy(child = withCTE.copy( + plan = loop, cteDefs = newInnerCteDefs))) } // The case of CTE name followed by a parenthesized list of column name(s). @@ -203,7 +223,7 @@ object ResolveWithCTE extends Rule[LogicalPlan] { case alias @ SubqueryAlias(_, columnAlias@UnresolvedSubqueryColumnAliases( colNames, - withCTE @ WithCTE(Distinct(Union(Seq(anchor, recursion), false, false)), _) + WithCTE(Distinct(Union(Seq(anchor, recursion), false, false)), innerCteDefs) )) => cteDef.failAnalysis( errorClass = "UNION_NOT_SUPPORTED_IN_RECURSIVE_CTE", @@ -211,6 +231,12 @@ object ResolveWithCTE extends Rule[LogicalPlan] { if (!anchor.resolved) { cteDef } else { + // We need to check whether any of the inner CTEs has a self reference and replace + // it if needed + val newInnerCteDefs = innerCteDefs.map { innerCteDef => + innerCteDef.copy(child = rewriteRecursiveCTERefs( + innerCteDef.child, anchor, cteDef.id, Some(colNames))) + } val loop = UnionLoop( cteDef.id, Distinct(anchor), @@ -220,8 +246,8 @@ object ResolveWithCTE extends Rule[LogicalPlan] { isAll = false ) ) - cteDef.copy(child = alias.copy(child = - columnAlias.copy(child = withCTE.copy(plan = loop)))) + cteDef.copy(child = alias.copy(child = columnAlias.copy( + child = withCTE.copy(plan = loop, cteDefs = newInnerCteDefs)))) } case other => diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/cte-recursion.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/cte-recursion.sql.out index 272517aa36145..f95f7a00bb4c1 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/cte-recursion.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/cte-recursion.sql.out @@ -1406,3 +1406,65 @@ WithCTE +- Project [outerlevel1#x, innerlevel1#x] +- SubqueryAlias r2 +- CTERelationRef xxxx, true, [outerlevel1#x, innerlevel1#x], false, false + + +-- !query +WITH RECURSIVE t1(n) AS ( + WITH t2(n) AS (SELECT * FROM t1) + SELECT 1 + UNION ALL + SELECT n+1 FROM t2 WHERE n < 5) +SELECT * FROM t1 +-- !query analysis +WithCTE +:- CTERelationDef xxxx, false +: +- SubqueryAlias t1 +: +- Project [1#x AS n#x] +: +- WithCTE +: :- CTERelationDef xxxx, false +: : +- SubqueryAlias t2 +: : +- Project [n#x AS n#x] +: : +- Project [n#x] +: : +- SubqueryAlias t1 +: : +- Project [1#x AS n#x] +: : +- UnionLoopRef xxxx, [1#x], false +: +- UnionLoop xxxx +: :- Project [1 AS 1#x] +: : +- OneRowRelation +: +- Project [(n#x + 1) AS (n + 1)#x] +: +- Filter (n#x < 5) +: +- SubqueryAlias t2 +: +- CTERelationRef xxxx, true, [n#x], false, false ++- Project [n#x] + +- SubqueryAlias t1 + +- CTERelationRef xxxx, true, [n#x], false, false + + +-- !query +WITH RECURSIVE t1 AS ( + WITH t2(n) AS (SELECT * FROM t1) + SELECT 1 AS n + UNION ALL + SELECT n+1 FROM t2 WHERE n < 5) +SELECT * FROM t1 +-- !query analysis +WithCTE +:- CTERelationDef xxxx, false +: +- SubqueryAlias t1 +: +- WithCTE +: :- CTERelationDef xxxx, false +: : +- SubqueryAlias t2 +: : +- Project [n#x AS n#x] +: : +- Project [n#x] +: : +- SubqueryAlias t1 +: : +- UnionLoopRef xxxx, [n#x], false +: +- UnionLoop xxxx +: :- Project [1 AS n#x] +: : +- OneRowRelation +: +- Project [(n#x + 1) AS (n + 1)#x] +: +- Filter (n#x < 5) +: +- SubqueryAlias t2 +: +- CTERelationRef xxxx, true, [n#x], false, false ++- Project [n#x] + +- SubqueryAlias t1 + +- CTERelationRef xxxx, true, [n#x], false, false diff --git a/sql/core/src/test/resources/sql-tests/inputs/cte-recursion.sql b/sql/core/src/test/resources/sql-tests/inputs/cte-recursion.sql index 6f8c8c137da3a..97638b9424645 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/cte-recursion.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/cte-recursion.sql @@ -526,4 +526,22 @@ WITH RECURSIVE r2(outerlevel1, innerlevel1) AS ( UNION ALL SELECT outerlevel1 + 1, innerlevel1 FROM r2 WHERE outerlevel1 < 3 ) -SELECT * FROM r2; \ No newline at end of file +SELECT * FROM r2; + +-- An inner cte that is defined for both the anchor and recursion but called only in the recursion +-- with subquery alias +WITH RECURSIVE t1(n) AS ( + WITH t2(n) AS (SELECT * FROM t1) + SELECT 1 + UNION ALL + SELECT n+1 FROM t2 WHERE n < 5) +SELECT * FROM t1; + +-- An inner cte that is defined for both the anchor and recursion but called only in the recursion +-- without query alias +WITH RECURSIVE t1 AS ( + WITH t2(n) AS (SELECT * FROM t1) + SELECT 1 AS n + UNION ALL + SELECT n+1 FROM t2 WHERE n < 5) +SELECT * FROM t1; \ No newline at end of file diff --git a/sql/core/src/test/resources/sql-tests/results/cte-recursion.sql.out b/sql/core/src/test/resources/sql-tests/results/cte-recursion.sql.out index 8e1310c254eba..6b12d6a159bc9 100644 --- a/sql/core/src/test/resources/sql-tests/results/cte-recursion.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/cte-recursion.sql.out @@ -1246,3 +1246,37 @@ struct 3 1 3 2 3 3 + + +-- !query +WITH RECURSIVE t1(n) AS ( + WITH t2(n) AS (SELECT * FROM t1) + SELECT 1 + UNION ALL + SELECT n+1 FROM t2 WHERE n < 5) +SELECT * FROM t1 +-- !query schema +struct +-- !query output +1 +2 +3 +4 +5 + + +-- !query +WITH RECURSIVE t1 AS ( + WITH t2(n) AS (SELECT * FROM t1) + SELECT 1 AS n + UNION ALL + SELECT n+1 FROM t2 WHERE n < 5) +SELECT * FROM t1 +-- !query schema +struct +-- !query output +1 +2 +3 +4 +5 From c4a28b20578ebb037c571fd3adfbbc494fae566e Mon Sep 17 00:00:00 2001 From: Pavle Martinovic <34302662+Pajaraja@users.noreply.github.com> Date: Sun, 13 Apr 2025 01:19:37 +0200 Subject: [PATCH 10/11] Update sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala Co-authored-by: Wenchen Fan --- .../apache/spark/sql/catalyst/analysis/CTESubstitution.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala index 0858ec60d0faf..9a0ec48ca15d4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala @@ -227,7 +227,7 @@ object CTESubstitution extends Rule[LogicalPlan] { val tempCteDefs = ArrayBuffer.empty[CTERelationDef] val resolvedCTERelations = if (recursiveCTERelationAncestor.isDefined) { - resolveCTERelations(relations, isLegacy = false, forceInline, outerCTEDefs, tempCteDefs, + resolveCTERelations(relations, isLegacy = false, forceInline = false, outerCTEDefs, tempCteDefs, recursiveCTERelationAncestor, allowRecursion) ++ outerCTEDefs } else { resolveCTERelations(relations, isLegacy = false, forceInline, outerCTEDefs, cteDefs, From 14771bdffddfd3e005062d92b127a1312e7c210f Mon Sep 17 00:00:00 2001 From: pavle-martinovic_data Date: Sun, 13 Apr 2025 10:58:20 +0200 Subject: [PATCH 11/11] fix scala lint and remove redundant inline check --- .../spark/sql/catalyst/analysis/CTESubstitution.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala index 9a0ec48ca15d4..5bbe85705ac18 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala @@ -227,8 +227,8 @@ object CTESubstitution extends Rule[LogicalPlan] { val tempCteDefs = ArrayBuffer.empty[CTERelationDef] val resolvedCTERelations = if (recursiveCTERelationAncestor.isDefined) { - resolveCTERelations(relations, isLegacy = false, forceInline = false, outerCTEDefs, tempCteDefs, - recursiveCTERelationAncestor, allowRecursion) ++ outerCTEDefs + resolveCTERelations(relations, isLegacy = false, forceInline = false, outerCTEDefs, + tempCteDefs, recursiveCTERelationAncestor, allowRecursion) ++ outerCTEDefs } else { resolveCTERelations(relations, isLegacy = false, forceInline, outerCTEDefs, cteDefs, recursiveCTERelationAncestor, allowRecursion) ++ outerCTEDefs @@ -385,7 +385,7 @@ object CTESubstitution extends Rule[LogicalPlan] { .find(r => conf.resolver(r._1, table)) .map { case (_, d) => - if (alwaysInline && !d.hasSelfReferenceAsCTERef) { + if (alwaysInline) { d.child } else { // Add a `SubqueryAlias` for hint-resolving rules to match relation names.