Skip to content

Commit dd48c63

Browse files
committed
update
Signed-off-by: Weizhen Wang <[email protected]>
1 parent 2624692 commit dd48c63

File tree

7 files changed

+160
-37
lines changed

7 files changed

+160
-37
lines changed

Diff for: pkg/planner/core/casetest/tpch/BUILD.bazel

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ go_test(
99
],
1010
data = glob(["testdata/**"]),
1111
flaky = True,
12-
shard_count = 3,
12+
shard_count = 4,
1313
deps = [
1414
"//pkg/config",
1515
"//pkg/testkit",

Diff for: pkg/planner/core/casetest/tpch/testdata/tpch_suite_in.json

+6
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,12 @@
77
"explain format='brief' SELECT /*+ SHUFFLE_JOIN(orders, lineitem) */ o.o_orderdate, SUM(l.l_extendedprice * (1 - l.l_discount)) AS revenue FROM orders AS o JOIN lineitem AS l ON o.o_orderkey = l.l_orderkey WHERE o.o_orderdate BETWEEN '1994-01-01' AND '1994-12-31' GROUP BY o.o_orderdate ORDER BY revenue DESC LIMIT 10;"
88
]
99
},
10+
{
11+
"name": "TestQ9",
12+
"cases": [
13+
"explain format='brief' SELECT nation, o_year, SUM(amount) AS sum_profit FROM (SELECT n_name AS nation, EXTRACT(YEAR FROM o_orderdate) AS o_year, l_extendedprice * (1 - l_discount) - ps_supplycost * l_quantity AS amount FROM part, supplier, lineitem, partsupp, orders, nation WHERE s_suppkey = l_suppkey AND ps_suppkey = l_suppkey AND ps_partkey = l_partkey AND p_partkey = l_partkey AND o_orderkey = l_orderkey AND s_nationkey = n_nationkey AND p_name LIKE '%dim%') AS profit GROUP BY nation, o_year ORDER BY nation, o_year DESC;"
14+
]
15+
},
1016
{
1117
"name": "TestQ13",
1218
"cases": [

Diff for: pkg/planner/core/casetest/tpch/testdata/tpch_suite_out.json

+58
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,64 @@
7373
}
7474
]
7575
},
76+
{
77+
"Name": "TestQ9",
78+
"Cases": [
79+
{
80+
"SQL": "explain format='brief' SELECT nation, o_year, SUM(amount) AS sum_profit FROM (SELECT n_name AS nation, EXTRACT(YEAR FROM o_orderdate) AS o_year, l_extendedprice * (1 - l_discount) - ps_supplycost * l_quantity AS amount FROM part, supplier, lineitem, partsupp, orders, nation WHERE s_suppkey = l_suppkey AND ps_suppkey = l_suppkey AND ps_partkey = l_partkey AND p_partkey = l_partkey AND o_orderkey = l_orderkey AND s_nationkey = n_nationkey AND p_name LIKE '%dim%') AS profit GROUP BY nation, o_year ORDER BY nation, o_year DESC;",
81+
"Result": [
82+
"Sort 8000.00 root test.nation.n_name, Column#52:desc",
83+
"└─TableReader 8000.00 root MppVersion: 3, data:ExchangeSender",
84+
" └─ExchangeSender 8000.00 mpp[tiflash] ExchangeType: PassThrough",
85+
" └─Projection 8000.00 mpp[tiflash] test.nation.n_name, Column#52, Column#54",
86+
" └─Projection 8000.00 mpp[tiflash] Column#54, test.nation.n_name, Column#52",
87+
" └─HashAgg 8000.00 mpp[tiflash] group by:Column#72, test.nation.n_name, funcs:sum(Column#73)->Column#54, funcs:firstrow(test.nation.n_name)->test.nation.n_name, funcs:firstrow(Column#72)->Column#52",
88+
" └─ExchangeReceiver 8000.00 mpp[tiflash] ",
89+
" └─ExchangeSender 8000.00 mpp[tiflash] ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: test.nation.n_name, collate: utf8mb4_bin]",
90+
" └─HashAgg 8000.00 mpp[tiflash] group by:Column#77, Column#78, funcs:sum(Column#76)->Column#73",
91+
" └─Projection 24414.06 mpp[tiflash] minus(mul(test.lineitem.l_extendedprice, minus(1, test.lineitem.l_discount)), mul(test.partsupp.ps_supplycost, test.lineitem.l_quantity))->Column#76, test.nation.n_name->Column#77, extract(YEAR, test.orders.o_orderdate)->Column#78",
92+
" └─Projection 24414.06 mpp[tiflash] test.lineitem.l_quantity, test.lineitem.l_extendedprice, test.lineitem.l_discount, test.partsupp.ps_supplycost, test.orders.o_orderdate, test.nation.n_name",
93+
" └─Projection 24414.06 mpp[tiflash] test.lineitem.l_quantity, test.lineitem.l_extendedprice, test.lineitem.l_discount, test.partsupp.ps_supplycost, test.orders.o_orderdate, test.nation.n_name, test.supplier.s_nationkey",
94+
" └─HashJoin 24414.06 mpp[tiflash] inner join, equal:[eq(test.supplier.s_nationkey, test.nation.n_nationkey)]",
95+
" ├─ExchangeReceiver(Build) 10000.00 mpp[tiflash] ",
96+
" │ └─ExchangeSender 10000.00 mpp[tiflash] ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: test.nation.n_nationkey, collate: binary]",
97+
" │ └─TableFullScan 10000.00 mpp[tiflash] table:nation keep order:false, stats:pseudo",
98+
" └─ExchangeReceiver(Probe) 19531.25 mpp[tiflash] ",
99+
" └─ExchangeSender 19531.25 mpp[tiflash] ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: test.supplier.s_nationkey, collate: binary]",
100+
" └─Projection 19531.25 mpp[tiflash] test.lineitem.l_quantity, test.lineitem.l_extendedprice, test.lineitem.l_discount, test.supplier.s_nationkey, test.partsupp.ps_supplycost, test.orders.o_orderdate, test.lineitem.l_orderkey",
101+
" └─HashJoin 19531.25 mpp[tiflash] inner join, equal:[eq(test.lineitem.l_orderkey, test.orders.o_orderkey)]",
102+
" ├─ExchangeReceiver(Build) 10000.00 mpp[tiflash] ",
103+
" │ └─ExchangeSender 10000.00 mpp[tiflash] ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: test.orders.o_orderkey, collate: binary]",
104+
" │ └─TableFullScan 10000.00 mpp[tiflash] table:orders keep order:false, stats:pseudo",
105+
" └─ExchangeReceiver(Probe) 15625.00 mpp[tiflash] ",
106+
" └─ExchangeSender 15625.00 mpp[tiflash] ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: test.lineitem.l_orderkey, collate: binary]",
107+
" └─Projection 15625.00 mpp[tiflash] test.lineitem.l_orderkey, test.lineitem.l_quantity, test.lineitem.l_extendedprice, test.lineitem.l_discount, test.supplier.s_nationkey, test.partsupp.ps_supplycost, test.lineitem.l_suppkey, test.lineitem.l_partkey",
108+
" └─HashJoin 15625.00 mpp[tiflash] inner join, equal:[eq(test.lineitem.l_suppkey, test.partsupp.ps_suppkey) eq(test.lineitem.l_partkey, test.partsupp.ps_partkey)]",
109+
" ├─ExchangeReceiver(Build) 10000.00 mpp[tiflash] ",
110+
" │ └─ExchangeSender 10000.00 mpp[tiflash] ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: test.partsupp.ps_suppkey, collate: binary], [name: test.partsupp.ps_partkey, collate: binary]",
111+
" │ └─TableFullScan 10000.00 mpp[tiflash] table:partsupp keep order:false, stats:pseudo",
112+
" └─ExchangeReceiver(Probe) 12500.00 mpp[tiflash] ",
113+
" └─ExchangeSender 12500.00 mpp[tiflash] ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: test.lineitem.l_suppkey, collate: binary], [name: test.lineitem.l_partkey, collate: binary]",
114+
" └─Projection 12500.00 mpp[tiflash] test.lineitem.l_orderkey, test.lineitem.l_partkey, test.lineitem.l_suppkey, test.lineitem.l_quantity, test.lineitem.l_extendedprice, test.lineitem.l_discount, test.supplier.s_nationkey, test.supplier.s_suppkey",
115+
" └─HashJoin 12500.00 mpp[tiflash] inner join, equal:[eq(test.lineitem.l_suppkey, test.supplier.s_suppkey)]",
116+
" ├─ExchangeReceiver(Build) 10000.00 mpp[tiflash] ",
117+
" │ └─ExchangeSender 10000.00 mpp[tiflash] ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: test.lineitem.l_suppkey, collate: binary]",
118+
" │ └─Projection 10000.00 mpp[tiflash] test.lineitem.l_orderkey, test.lineitem.l_partkey, test.lineitem.l_suppkey, test.lineitem.l_quantity, test.lineitem.l_extendedprice, test.lineitem.l_discount",
119+
" │ └─HashJoin 10000.00 mpp[tiflash] inner join, equal:[eq(test.part.p_partkey, test.lineitem.l_partkey)]",
120+
" │ ├─ExchangeReceiver(Build) 8000.00 mpp[tiflash] ",
121+
" │ │ └─ExchangeSender 8000.00 mpp[tiflash] ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: test.part.p_partkey, collate: binary]",
122+
" │ │ └─Selection 8000.00 mpp[tiflash] like(test.part.p_name, \"%dim%\", 92)",
123+
" │ │ └─TableFullScan 10000.00 mpp[tiflash] table:part pushed down filter:empty, keep order:false, stats:pseudo",
124+
" │ └─ExchangeReceiver(Probe) 10000.00 mpp[tiflash] ",
125+
" │ └─ExchangeSender 10000.00 mpp[tiflash] ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: test.lineitem.l_partkey, collate: binary]",
126+
" │ └─TableFullScan 10000.00 mpp[tiflash] table:lineitem keep order:false, stats:pseudo",
127+
" └─ExchangeReceiver(Probe) 10000.00 mpp[tiflash] ",
128+
" └─ExchangeSender 10000.00 mpp[tiflash] ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: test.supplier.s_suppkey, collate: binary]",
129+
" └─TableFullScan 10000.00 mpp[tiflash] table:supplier keep order:false, stats:pseudo"
130+
]
131+
}
132+
]
133+
},
76134
{
77135
"Name": "TestQ13",
78136
"Cases": [

Diff for: pkg/planner/core/casetest/tpch/tpch_test.go

+19-32
Original file line numberDiff line numberDiff line change
@@ -178,39 +178,26 @@ CREATE TABLE orders (
178178
testkit.SetTiFlashReplica(t, dom, "test", "part")
179179
testkit.SetTiFlashReplica(t, dom, "test", "partsupp")
180180
testkit.SetTiFlashReplica(t, dom, "test", "supplier")
181-
tk.MustQuery(`explain select
182-
nation,
183-
o_year,
184-
sum(amount) as sum_profit
185-
from
186-
(
187-
select
188-
n_name as nation,
189-
extract(year from o_orderdate) as o_year,
190-
l_extendedprice * (1 - l_discount) - ps_supplycost * l_quantity as amount
191-
from
192-
part,
193-
supplier,
194-
lineitem,
195-
partsupp,
196-
orders,
197-
nation
198-
where
199-
s_suppkey = l_suppkey
200-
and ps_suppkey = l_suppkey
201-
and ps_partkey = l_partkey
202-
and p_partkey = l_partkey
203-
and o_orderkey = l_orderkey
204-
and s_nationkey = n_nationkey
205-
and p_name like '%dim%'
206-
) as profit
207-
group by
208-
nation,
209-
o_year
210-
order by
211-
nation,
212-
o_year desc;`).Check(testkit.Rows())
181+
integrationSuiteData := GetTPCHSuiteData()
182+
var (
183+
input []string
184+
output []struct {
185+
SQL string
186+
Result []string
187+
}
188+
)
189+
integrationSuiteData.LoadTestCases(t, &input, &output)
190+
for i := 0; i < len(input); i++ {
191+
testdata.OnRecord(func() {
192+
output[i].SQL = input[i]
193+
})
194+
testdata.OnRecord(func() {
195+
output[i].Result = testdata.ConvertRowsToStrings(tk.MustQuery(input[i]).Rows())
196+
})
197+
tk.MustQuery(input[i]).Check(testkit.Rows(output[i].Result...))
198+
}
213199
}
200+
214201
func TestQ13(t *testing.T) {
215202
store, dom := testkit.CreateMockStoreAndDomain(t)
216203
tk := testkit.NewTestKit(t, store)

Diff for: pkg/planner/core/initialize.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ func (base basePhysicalAgg) Init(ctx base.PlanContext, stats *property.StatsInfo
218218
}
219219

220220
func (base basePhysicalAgg) initForHash(ctx base.PlanContext, stats *property.StatsInfo, offset int, props ...*property.PhysicalProperty) *PhysicalHashAgg {
221-
p := &PhysicalHashAgg{base, ""}
221+
p := &PhysicalHashAgg{base, "", nil}
222222
p.BasePhysicalPlan = physicalop.NewBasePhysicalPlan(ctx, plancodec.TypeHashAgg, p, offset)
223223
p.SetChildrenReqProps(props)
224224
p.SetStats(stats)

Diff for: pkg/planner/core/physical_plans.go

+12
Original file line numberDiff line numberDiff line change
@@ -1503,6 +1503,12 @@ type PhysicalHashJoin struct {
15031503

15041504
// for runtime filter
15051505
runtimeFilterList []*RuntimeFilter `plan-cache-clone:"must-nil"` // plan with runtime filter is not cached
1506+
1507+
// backupHashProp is used to store the hash partitioning information before `NeedMPPExchangeByEquivalence`.
1508+
// because `NeedMPPExchangeByEquivalence` will change the hash partitioning information.
1509+
// Having different numbers of MPP partitions in the child nodes of a join can lead to invalid tasks,
1510+
// which ultimately result in incorrect plan.
1511+
backupHashProp *property.PhysicalProperty `plan-cache-clone:"shallow"`
15061512
}
15071513

15081514
// CanUseHashJoinV2 returns true if current join is supported by hash join v2
@@ -2221,6 +2227,12 @@ func (p *basePhysicalAgg) MemoryUsage() (sum int64) {
22212227
type PhysicalHashAgg struct {
22222228
basePhysicalAgg
22232229
tiflashPreAggMode string
2230+
2231+
// backupHashProp is used to store the hash partitioning information before `NeedMPPExchangeByEquivalence`.
2232+
// because `NeedMPPExchangeByEquivalence` will change the hash partitioning information.
2233+
// Having different numbers of MPP partitions in the child nodes of a join can lead to invalid tasks,
2234+
// which ultimately result in incorrect plan.
2235+
backupHashProp *property.PhysicalProperty `plan-cache-clone:"shallow"`
22242236
}
22252237

22262238
func (p *PhysicalHashAgg) getPointer() *basePhysicalAgg {

Diff for: pkg/planner/core/task.go

+63-3
Original file line numberDiff line numberDiff line change
@@ -431,17 +431,40 @@ func (p *PhysicalHashJoin) convertPartitionKeysIfNeed(lTask, rTask *MppTask) (_,
431431
return lTask, rTask
432432
}
433433

434+
func enforceExchangerIfValid(task *MppTask, prop *property.PhysicalProperty, expectedCols int) *MppTask {
435+
if prop != nil && len(prop.MPPPartitionCols) == expectedCols {
436+
return task.enforceExchangerImpl(prop)
437+
}
438+
return nil
439+
}
440+
434441
func (p *PhysicalHashJoin) attach2TaskForMpp(tasks ...base.Task) base.Task {
435442
rTask, rok := tasks[1].(*MppTask)
436443
lTask, lok := tasks[0].(*MppTask)
437444
if !lok || !rok {
438445
return base.InvalidTask
439446
}
440447
if p.mppShuffleJoin {
441-
// protection check is case of some bugs
442-
if len(lTask.hashCols) != len(rTask.hashCols) || len(lTask.hashCols) == 0 {
448+
if len(lTask.hashCols) == 0 || len(rTask.hashCols) == 0 {
449+
// if the hash columns are empty, this is very likely a bug.
443450
return base.InvalidTask
444451
}
452+
if len(lTask.hashCols) != len(rTask.hashCols) {
453+
// if the hash columns are not the same, The most likely scenario is that
454+
// they have undergone exchange optimization, removing some hash columns.
455+
// In this case, we need to restore them.
456+
var backupHashProp *property.PhysicalProperty
457+
if len(lTask.hashCols) < len(rTask.hashCols) {
458+
backupHashProp = lTask.getBackupHashProp()
459+
lTask = enforceExchangerIfValid(lTask, backupHashProp, len(rTask.hashCols))
460+
} else {
461+
backupHashProp = rTask.getBackupHashProp()
462+
rTask = enforceExchangerIfValid(rTask, backupHashProp, len(lTask.hashCols))
463+
}
464+
if lTask == nil || rTask == nil {
465+
return base.InvalidTask
466+
}
467+
}
445468
lTask, rTask = p.convertPartitionKeysIfNeed(lTask, rTask)
446469
}
447470
p.SetChildren(lTask.Plan(), rTask.Plan())
@@ -2680,6 +2703,19 @@ func tryExpandVirtualColumn(p base.PhysicalPlan) {
26802703
}
26812704
}
26822705

2706+
func setBackupHashProp(prop *property.PhysicalProperty, p base.PhysicalPlan) {
2707+
switch pp := p.(type) {
2708+
case *PhysicalHashJoin:
2709+
pp.backupHashProp = prop
2710+
case *PhysicalHashAgg:
2711+
pp.backupHashProp = prop
2712+
default:
2713+
if child := p.Children(); len(child) == 1 {
2714+
setBackupHashProp(prop, child[0])
2715+
}
2716+
}
2717+
}
2718+
26832719
func (t *MppTask) needEnforceExchanger(prop *property.PhysicalProperty, fd *funcdep.FDSet) bool {
26842720
switch prop.MPPPartitionTp {
26852721
case property.AnyType:
@@ -2694,7 +2730,11 @@ func (t *MppTask) needEnforceExchanger(prop *property.PhysicalProperty, fd *func
26942730
}
26952731
// for example, if already partitioned by hash(B,C), then same (A,B,C) must distribute on a same node.
26962732
if fd != nil && len(t.hashCols) != 0 {
2697-
return prop.NeedMPPExchangeByEquivalence(t.hashCols, fd)
2733+
if prop.NeedMPPExchangeByEquivalence(t.hashCols, fd) {
2734+
return true
2735+
}
2736+
setBackupHashProp(prop, t.p)
2737+
return false
26982738
}
26992739
if len(prop.MPPPartitionCols) != len(t.hashCols) {
27002740
return true
@@ -2708,6 +2748,26 @@ func (t *MppTask) needEnforceExchanger(prop *property.PhysicalProperty, fd *func
27082748
}
27092749
}
27102750

2751+
func (t *MppTask) getBackupHashProp() *property.PhysicalProperty {
2752+
return getBackupHashProp(t.p)
2753+
}
2754+
2755+
func getBackupHashProp(p base.PhysicalPlan) *property.PhysicalProperty {
2756+
if p == nil {
2757+
return nil
2758+
}
2759+
switch pp := p.(type) {
2760+
case *PhysicalHashJoin:
2761+
return pp.backupHashProp
2762+
case *PhysicalHashAgg:
2763+
return pp.backupHashProp
2764+
}
2765+
for _, child := range p.Children() {
2766+
return getBackupHashProp(child)
2767+
}
2768+
return nil
2769+
}
2770+
27112771
func (t *MppTask) enforceExchanger(prop *property.PhysicalProperty, fd *funcdep.FDSet) *MppTask {
27122772
if !t.needEnforceExchanger(prop, fd) {
27132773
return t

0 commit comments

Comments
 (0)