Skip to content

Commit c7027de

Browse files
authored
chore: Remove redundant RowToColumnar from CometShuffleExchangeExec for columnar shuffle (apache#944)
* chore: Remove redundant RowToColumnar from Comet query
* fix
* Update
1 parent 2108180 commit c7027de

File tree

35 files changed

+2744
-2819
lines changed

35 files changed

+2744
-2819
lines changed

spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala

+10
Original file line numberDiff line numberDiff line change
@@ -990,6 +990,16 @@ class CometSparkSessionExtensions
990990
val eliminatedPlan = plan transformUp {
991991
case ColumnarToRowExec(sparkToColumnar: CometSparkToColumnarExec) => sparkToColumnar.child
992992
case CometSparkToColumnarExec(child: CometSparkToColumnarExec) => child
993+
// Spark adds `RowToColumnar` under Comet columnar shuffle. But it's redundant as the
994+
// shuffle takes row-based input.
995+
case s @ CometShuffleExchangeExec(
996+
_,
997+
RowToColumnarExec(child),
998+
_,
999+
_,
1000+
CometColumnarShuffle,
1001+
_) =>
1002+
s.withNewChildren(Seq(child))
9931003
}
9941004

9951005
eliminatedPlan match {

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q98/explain.txt

+39-43
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,29 @@
11
== Physical Plan ==
2-
* ColumnarToRow (26)
3-
+- CometProject (25)
4-
+- CometSort (24)
5-
+- CometColumnarExchange (23)
6-
+- RowToColumnar (22)
7-
+- * Project (21)
8-
+- Window (20)
9-
+- * ColumnarToRow (19)
10-
+- CometSort (18)
11-
+- CometExchange (17)
12-
+- CometHashAggregate (16)
13-
+- CometExchange (15)
14-
+- CometHashAggregate (14)
15-
+- CometProject (13)
16-
+- CometBroadcastHashJoin (12)
17-
:- CometProject (7)
18-
: +- CometBroadcastHashJoin (6)
19-
: :- CometFilter (2)
20-
: : +- CometScan parquet spark_catalog.default.store_sales (1)
21-
: +- CometBroadcastExchange (5)
22-
: +- CometFilter (4)
23-
: +- CometScan parquet spark_catalog.default.item (3)
24-
+- CometBroadcastExchange (11)
25-
+- CometProject (10)
26-
+- CometFilter (9)
27-
+- CometScan parquet spark_catalog.default.date_dim (8)
2+
* ColumnarToRow (25)
3+
+- CometProject (24)
4+
+- CometSort (23)
5+
+- CometColumnarExchange (22)
6+
+- * Project (21)
7+
+- Window (20)
8+
+- * ColumnarToRow (19)
9+
+- CometSort (18)
10+
+- CometExchange (17)
11+
+- CometHashAggregate (16)
12+
+- CometExchange (15)
13+
+- CometHashAggregate (14)
14+
+- CometProject (13)
15+
+- CometBroadcastHashJoin (12)
16+
:- CometProject (7)
17+
: +- CometBroadcastHashJoin (6)
18+
: :- CometFilter (2)
19+
: : +- CometScan parquet spark_catalog.default.store_sales (1)
20+
: +- CometBroadcastExchange (5)
21+
: +- CometFilter (4)
22+
: +- CometScan parquet spark_catalog.default.item (3)
23+
+- CometBroadcastExchange (11)
24+
+- CometProject (10)
25+
+- CometFilter (9)
26+
+- CometScan parquet spark_catalog.default.date_dim (8)
2827

2928

3029
(1) Scan parquet spark_catalog.default.store_sales
@@ -124,53 +123,50 @@ Arguments: [sum(_w0#15) windowspecdefinition(i_class#9, specifiedwindowframe(Row
124123
Output [7]: [i_item_desc#7, i_category#10, i_class#9, i_current_price#8, itemrevenue#14, ((_w0#15 * 100) / _we0#16) AS revenueratio#17, i_item_id#6]
125124
Input [8]: [i_item_desc#7, i_category#10, i_class#9, i_current_price#8, itemrevenue#14, _w0#15, i_item_id#6, _we0#16]
126125

127-
(22) RowToColumnar
128-
Input [7]: [i_item_desc#7, i_category#10, i_class#9, i_current_price#8, itemrevenue#14, revenueratio#17, i_item_id#6]
129-
130-
(23) CometColumnarExchange
126+
(22) CometColumnarExchange
131127
Input [7]: [i_item_desc#7, i_category#10, i_class#9, i_current_price#8, itemrevenue#14, revenueratio#17, i_item_id#6]
132128
Arguments: rangepartitioning(i_category#10 ASC NULLS FIRST, i_class#9 ASC NULLS FIRST, i_item_id#6 ASC NULLS FIRST, i_item_desc#7 ASC NULLS FIRST, revenueratio#17 ASC NULLS FIRST, 5), ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=3]
133129

134-
(24) CometSort
130+
(23) CometSort
135131
Input [7]: [i_item_desc#7, i_category#10, i_class#9, i_current_price#8, itemrevenue#14, revenueratio#17, i_item_id#6]
136132
Arguments: [i_item_desc#7, i_category#10, i_class#9, i_current_price#8, itemrevenue#14, revenueratio#17, i_item_id#6], [i_category#10 ASC NULLS FIRST, i_class#9 ASC NULLS FIRST, i_item_id#6 ASC NULLS FIRST, i_item_desc#7 ASC NULLS FIRST, revenueratio#17 ASC NULLS FIRST]
137133

138-
(25) CometProject
134+
(24) CometProject
139135
Input [7]: [i_item_desc#7, i_category#10, i_class#9, i_current_price#8, itemrevenue#14, revenueratio#17, i_item_id#6]
140136
Arguments: [i_item_desc#7, i_category#10, i_class#9, i_current_price#8, itemrevenue#14, revenueratio#17], [i_item_desc#7, i_category#10, i_class#9, i_current_price#8, itemrevenue#14, revenueratio#17]
141137

142-
(26) ColumnarToRow [codegen id : 3]
138+
(25) ColumnarToRow [codegen id : 3]
143139
Input [6]: [i_item_desc#7, i_category#10, i_class#9, i_current_price#8, itemrevenue#14, revenueratio#17]
144140

145141
===== Subqueries =====
146142

147143
Subquery:1 Hosting operator id = 1 Hosting Expression = ss_sold_date_sk#3 IN dynamicpruning#4
148-
BroadcastExchange (31)
149-
+- * ColumnarToRow (30)
150-
+- CometProject (29)
151-
+- CometFilter (28)
152-
+- CometScan parquet spark_catalog.default.date_dim (27)
144+
BroadcastExchange (30)
145+
+- * ColumnarToRow (29)
146+
+- CometProject (28)
147+
+- CometFilter (27)
148+
+- CometScan parquet spark_catalog.default.date_dim (26)
153149

154150

155-
(27) Scan parquet spark_catalog.default.date_dim
151+
(26) Scan parquet spark_catalog.default.date_dim
156152
Output [2]: [d_date_sk#11, d_date#12]
157153
Batched: true
158154
Location [not included in comparison]/{warehouse_dir}/date_dim]
159155
PushedFilters: [IsNotNull(d_date), GreaterThanOrEqual(d_date,1999-02-22), LessThanOrEqual(d_date,1999-03-24), IsNotNull(d_date_sk)]
160156
ReadSchema: struct<d_date_sk:int,d_date:date>
161157

162-
(28) CometFilter
158+
(27) CometFilter
163159
Input [2]: [d_date_sk#11, d_date#12]
164160
Condition : (((isnotnull(d_date#12) AND (d_date#12 >= 1999-02-22)) AND (d_date#12 <= 1999-03-24)) AND isnotnull(d_date_sk#11))
165161

166-
(29) CometProject
162+
(28) CometProject
167163
Input [2]: [d_date_sk#11, d_date#12]
168164
Arguments: [d_date_sk#11], [d_date_sk#11]
169165

170-
(30) ColumnarToRow [codegen id : 1]
166+
(29) ColumnarToRow [codegen id : 1]
171167
Input [1]: [d_date_sk#11]
172168

173-
(31) BroadcastExchange
169+
(30) BroadcastExchange
174170
Input [1]: [d_date_sk#11]
175171
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=4]
176172

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q98/simplified.txt

+33-34
Original file line numberDiff line numberDiff line change
@@ -4,37 +4,36 @@ WholeStageCodegen (3)
44
CometProject [i_item_desc,i_category,i_class,i_current_price,itemrevenue,revenueratio]
55
CometSort [i_item_desc,i_category,i_class,i_current_price,itemrevenue,revenueratio,i_item_id]
66
CometColumnarExchange [i_category,i_class,i_item_id,i_item_desc,revenueratio] #1
7-
RowToColumnar
8-
WholeStageCodegen (2)
9-
Project [i_item_desc,i_category,i_class,i_current_price,itemrevenue,_w0,_we0,i_item_id]
10-
InputAdapter
11-
Window [_w0,i_class]
12-
WholeStageCodegen (1)
13-
ColumnarToRow
14-
InputAdapter
15-
CometSort [i_item_desc,i_category,i_class,i_current_price,itemrevenue,_w0,i_item_id]
16-
CometExchange [i_class] #2
17-
CometHashAggregate [i_item_desc,i_category,i_class,i_current_price,itemrevenue,_w0,i_item_id,sum,sum(UnscaledValue(ss_ext_sales_price))]
18-
CometExchange [i_item_id,i_item_desc,i_category,i_class,i_current_price] #3
19-
CometHashAggregate [i_item_id,i_item_desc,i_category,i_class,i_current_price,sum,ss_ext_sales_price]
20-
CometProject [ss_ext_sales_price,i_item_id,i_item_desc,i_current_price,i_class,i_category]
21-
CometBroadcastHashJoin [ss_ext_sales_price,ss_sold_date_sk,i_item_id,i_item_desc,i_current_price,i_class,i_category,d_date_sk]
22-
CometProject [ss_ext_sales_price,ss_sold_date_sk,i_item_id,i_item_desc,i_current_price,i_class,i_category]
23-
CometBroadcastHashJoin [ss_item_sk,ss_ext_sales_price,ss_sold_date_sk,i_item_sk,i_item_id,i_item_desc,i_current_price,i_class,i_category]
24-
CometFilter [ss_item_sk,ss_ext_sales_price,ss_sold_date_sk]
25-
CometScan parquet spark_catalog.default.store_sales [ss_item_sk,ss_ext_sales_price,ss_sold_date_sk]
26-
SubqueryBroadcast [d_date_sk] #1
27-
BroadcastExchange #4
28-
WholeStageCodegen (1)
29-
ColumnarToRow
30-
InputAdapter
31-
CometProject [d_date_sk]
32-
CometFilter [d_date_sk,d_date]
33-
CometScan parquet spark_catalog.default.date_dim [d_date_sk,d_date]
34-
CometBroadcastExchange [i_item_sk,i_item_id,i_item_desc,i_current_price,i_class,i_category] #5
35-
CometFilter [i_item_sk,i_item_id,i_item_desc,i_current_price,i_class,i_category]
36-
CometScan parquet spark_catalog.default.item [i_item_sk,i_item_id,i_item_desc,i_current_price,i_class,i_category]
37-
CometBroadcastExchange [d_date_sk] #6
38-
CometProject [d_date_sk]
39-
CometFilter [d_date_sk,d_date]
40-
CometScan parquet spark_catalog.default.date_dim [d_date_sk,d_date]
7+
WholeStageCodegen (2)
8+
Project [i_item_desc,i_category,i_class,i_current_price,itemrevenue,_w0,_we0,i_item_id]
9+
InputAdapter
10+
Window [_w0,i_class]
11+
WholeStageCodegen (1)
12+
ColumnarToRow
13+
InputAdapter
14+
CometSort [i_item_desc,i_category,i_class,i_current_price,itemrevenue,_w0,i_item_id]
15+
CometExchange [i_class] #2
16+
CometHashAggregate [i_item_desc,i_category,i_class,i_current_price,itemrevenue,_w0,i_item_id,sum,sum(UnscaledValue(ss_ext_sales_price))]
17+
CometExchange [i_item_id,i_item_desc,i_category,i_class,i_current_price] #3
18+
CometHashAggregate [i_item_id,i_item_desc,i_category,i_class,i_current_price,sum,ss_ext_sales_price]
19+
CometProject [ss_ext_sales_price,i_item_id,i_item_desc,i_current_price,i_class,i_category]
20+
CometBroadcastHashJoin [ss_ext_sales_price,ss_sold_date_sk,i_item_id,i_item_desc,i_current_price,i_class,i_category,d_date_sk]
21+
CometProject [ss_ext_sales_price,ss_sold_date_sk,i_item_id,i_item_desc,i_current_price,i_class,i_category]
22+
CometBroadcastHashJoin [ss_item_sk,ss_ext_sales_price,ss_sold_date_sk,i_item_sk,i_item_id,i_item_desc,i_current_price,i_class,i_category]
23+
CometFilter [ss_item_sk,ss_ext_sales_price,ss_sold_date_sk]
24+
CometScan parquet spark_catalog.default.store_sales [ss_item_sk,ss_ext_sales_price,ss_sold_date_sk]
25+
SubqueryBroadcast [d_date_sk] #1
26+
BroadcastExchange #4
27+
WholeStageCodegen (1)
28+
ColumnarToRow
29+
InputAdapter
30+
CometProject [d_date_sk]
31+
CometFilter [d_date_sk,d_date]
32+
CometScan parquet spark_catalog.default.date_dim [d_date_sk,d_date]
33+
CometBroadcastExchange [i_item_sk,i_item_id,i_item_desc,i_current_price,i_class,i_category] #5
34+
CometFilter [i_item_sk,i_item_id,i_item_desc,i_current_price,i_class,i_category]
35+
CometScan parquet spark_catalog.default.item [i_item_sk,i_item_id,i_item_desc,i_current_price,i_class,i_category]
36+
CometBroadcastExchange [d_date_sk] #6
37+
CometProject [d_date_sk]
38+
CometFilter [d_date_sk,d_date]
39+
CometScan parquet spark_catalog.default.date_dim [d_date_sk,d_date]

0 commit comments

Comments
 (0)