Skip to content

Commit d99135b

Browse files
Ngone51cloud-fan
authored andcommitted
[SPARK-34741][SQL] MergeIntoTable should avoid ambiguous reference in UpdateAction
### What changes were proposed in this pull request? This PR proposes to deduplicate the source table when there're conflicting attributes between the target table and the source table. ### Why are the changes needed? When resolving the `UpdateAction`, which could reference attributes from both target and source tables, Spark should know clearly where the attribute comes from when there're conflicting attributes instead of picking up a random one. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added a unit test and updated existing tests. Closes apache#31835 from Ngone51/dedup-MergeIntoTable. Authored-by: yi.wu <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent 86ea520 commit d99135b

File tree

4 files changed

+30
-6
lines changed

4 files changed

+30
-6
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

+3
Original file line numberDiff line numberDiff line change
@@ -1669,6 +1669,9 @@ class Analyzer(override val catalogManager: CatalogManager)
16691669
// implementation and should be resolved based on the table schema.
16701670
o.copy(deleteExpr = resolveExpressionByPlanOutput(o.deleteExpr, o.table))
16711671

1672+
case m @ MergeIntoTable(targetTable, sourceTable, _, _, _) if !m.duplicateResolved =>
1673+
m.copy(sourceTable = dedupRight(targetTable, sourceTable))
1674+
16721675
case m @ MergeIntoTable(targetTable, sourceTable, _, _, _)
16731676
if !m.resolved && targetTable.resolved && sourceTable.resolved =>
16741677

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala

+1
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,7 @@ case class MergeIntoTable(
404404
matchedActions: Seq[MergeAction],
405405
notMatchedActions: Seq[MergeAction]) extends Command with SupportsSubquery {
406406
override def children: Seq[LogicalPlan] = Seq(targetTable, sourceTable)
407+
def duplicateResolved: Boolean = targetTable.outputSet.intersect(sourceTable.outputSet).isEmpty
407408
}
408409

409410
sealed abstract class MergeAction extends Expression with Unevaluable {

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala

+13
Original file line numberDiff line numberDiff line change
@@ -671,6 +671,19 @@ class AnalysisSuite extends AnalysisTest with Matchers {
671671
Project(Seq(UnresolvedAttribute("temp0.a"), UnresolvedAttribute("temp1.a")), join))
672672
}
673673

674+
test("SPARK-34741: Avoid ambiguous reference in MergeIntoTable") {
675+
val cond = 'a > 1
676+
assertAnalysisError(
677+
MergeIntoTable(
678+
testRelation,
679+
testRelation,
680+
cond,
681+
UpdateAction(Some(cond), Assignment('a, 'a) :: Nil) :: Nil,
682+
Nil
683+
),
684+
"Reference 'a' is ambiguous" :: Nil)
685+
}
686+
674687
test("SPARK-24488 Generator with multiple aliases") {
675688
assertAnalysisSuccess(
676689
listRelation.select(Explode($"list").as("first_alias").as("second_alias")))

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicateSuite.scala

+13-6
Original file line numberDiff line numberDiff line change
@@ -455,18 +455,25 @@ class ReplaceNullWithFalseInPredicateSuite extends PlanTest {
455455
}
456456

457457
private def testMerge(originalCond: Expression, expectedCond: Expression): Unit = {
458-
val func = (rel: LogicalPlan, expr: Expression) => {
459-
val assignments = Seq(
458+
val func = (target: LogicalPlan, source: LogicalPlan, expr: Expression) => {
459+
val matchedAssignments = Seq(
460460
Assignment('i, 'i),
461461
Assignment('b, 'b),
462462
Assignment('a, 'a),
463463
Assignment('m, 'm)
464464
)
465-
val matchedActions = UpdateAction(Some(expr), assignments) :: DeleteAction(Some(expr)) :: Nil
466-
val notMatchedActions = InsertAction(Some(expr), assignments) :: Nil
467-
MergeIntoTable(rel, rel, mergeCondition = expr, matchedActions, notMatchedActions)
465+
val notMatchedAssignments = Seq(
466+
Assignment('i, 'd)
467+
)
468+
val matchedActions = UpdateAction(Some(expr), matchedAssignments) ::
469+
DeleteAction(Some(expr)) :: Nil
470+
val notMatchedActions = InsertAction(None, notMatchedAssignments) :: Nil
471+
MergeIntoTable(target, source, mergeCondition = expr, matchedActions, notMatchedActions)
468472
}
469-
test(func, originalCond, expectedCond)
473+
val originalPlan = func(testRelation, anotherTestRelation, originalCond).analyze
474+
val optimizedPlan = Optimize.execute(originalPlan)
475+
val expectedPlan = func(testRelation, anotherTestRelation, expectedCond).analyze
476+
comparePlans(optimizedPlan, expectedPlan)
470477
}
471478

472479
private def testHigherOrderFunc(

0 commit comments

Comments
 (0)