Skip to content

Commit d12d01a

Browse files
authored
consolidate physical_optimizer tests into core/tests/physical_optimizer (#14244)
* consolidate physical_optimizer tests into core/tests/physical_optimizer Move tests * cleanu
1 parent f6b0cc5 commit d12d01a

14 files changed

+1548
-1638
lines changed
Lines changed: 322 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,322 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use datafusion_common::config::ConfigOptions;
19+
use datafusion_execution::TaskContext;
20+
use datafusion_physical_optimizer::aggregate_statistics::AggregateStatistics;
21+
use datafusion_physical_optimizer::PhysicalOptimizerRule;
22+
use datafusion_physical_plan::aggregates::AggregateExec;
23+
use datafusion_physical_plan::projection::ProjectionExec;
24+
use datafusion_physical_plan::ExecutionPlan;
25+
use std::sync::Arc;
26+
27+
use datafusion_common::Result;
28+
29+
use datafusion_physical_plan::aggregates::PhysicalGroupBy;
30+
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
31+
use datafusion_physical_plan::common;
32+
use datafusion_physical_plan::filter::FilterExec;
33+
use datafusion_physical_plan::memory::MemoryExec;
34+
35+
use arrow::array::Int32Array;
36+
use arrow::datatypes::{DataType, Field, Schema};
37+
use arrow::record_batch::RecordBatch;
38+
use datafusion_common::cast::as_int64_array;
39+
use datafusion_expr::Operator;
40+
use datafusion_physical_expr::expressions::{self, cast};
41+
use datafusion_physical_optimizer::test_utils::TestAggregate;
42+
use datafusion_physical_plan::aggregates::AggregateMode;
43+
44+
/// Mock data using a MemoryExec which has an exact count statistic
45+
fn mock_data() -> Result<Arc<MemoryExec>> {
46+
let schema = Arc::new(Schema::new(vec![
47+
Field::new("a", DataType::Int32, true),
48+
Field::new("b", DataType::Int32, true),
49+
]));
50+
51+
let batch = RecordBatch::try_new(
52+
Arc::clone(&schema),
53+
vec![
54+
Arc::new(Int32Array::from(vec![Some(1), Some(2), None])),
55+
Arc::new(Int32Array::from(vec![Some(4), None, Some(6)])),
56+
],
57+
)?;
58+
59+
Ok(Arc::new(MemoryExec::try_new(
60+
&[vec![batch]],
61+
Arc::clone(&schema),
62+
None,
63+
)?))
64+
}
65+
66+
/// Checks that the count optimization was applied and we still get the right result
67+
async fn assert_count_optim_success(
68+
plan: AggregateExec,
69+
agg: TestAggregate,
70+
) -> Result<()> {
71+
let task_ctx = Arc::new(TaskContext::default());
72+
let plan: Arc<dyn ExecutionPlan> = Arc::new(plan);
73+
74+
let config = ConfigOptions::new();
75+
let optimized = AggregateStatistics::new().optimize(Arc::clone(&plan), &config)?;
76+
77+
// A ProjectionExec is a sign that the count optimization was applied
78+
assert!(optimized.as_any().is::<ProjectionExec>());
79+
80+
// run both the optimized and nonoptimized plan
81+
let optimized_result =
82+
common::collect(optimized.execute(0, Arc::clone(&task_ctx))?).await?;
83+
let nonoptimized_result = common::collect(plan.execute(0, task_ctx)?).await?;
84+
assert_eq!(optimized_result.len(), nonoptimized_result.len());
85+
86+
// and validate the results are the same and expected
87+
assert_eq!(optimized_result.len(), 1);
88+
check_batch(optimized_result.into_iter().next().unwrap(), &agg);
89+
// check the non optimized one too to ensure types and names remain the same
90+
assert_eq!(nonoptimized_result.len(), 1);
91+
check_batch(nonoptimized_result.into_iter().next().unwrap(), &agg);
92+
93+
Ok(())
94+
}
95+
96+
fn check_batch(batch: RecordBatch, agg: &TestAggregate) {
97+
let schema = batch.schema();
98+
let fields = schema.fields();
99+
assert_eq!(fields.len(), 1);
100+
101+
let field = &fields[0];
102+
assert_eq!(field.name(), agg.column_name());
103+
assert_eq!(field.data_type(), &DataType::Int64);
104+
// note that nullability differs
105+
106+
assert_eq!(
107+
as_int64_array(batch.column(0)).unwrap().values(),
108+
&[agg.expected_count()]
109+
);
110+
}
111+
112+
#[tokio::test]
113+
async fn test_count_partial_direct_child() -> Result<()> {
114+
// basic test case with the aggregation applied on a source with exact statistics
115+
let source = mock_data()?;
116+
let schema = source.schema();
117+
let agg = TestAggregate::new_count_star();
118+
119+
let partial_agg = AggregateExec::try_new(
120+
AggregateMode::Partial,
121+
PhysicalGroupBy::default(),
122+
vec![Arc::new(agg.count_expr(&schema))],
123+
vec![None],
124+
source,
125+
Arc::clone(&schema),
126+
)?;
127+
128+
let final_agg = AggregateExec::try_new(
129+
AggregateMode::Final,
130+
PhysicalGroupBy::default(),
131+
vec![Arc::new(agg.count_expr(&schema))],
132+
vec![None],
133+
Arc::new(partial_agg),
134+
Arc::clone(&schema),
135+
)?;
136+
137+
assert_count_optim_success(final_agg, agg).await?;
138+
139+
Ok(())
140+
}
141+
142+
#[tokio::test]
143+
async fn test_count_partial_with_nulls_direct_child() -> Result<()> {
144+
// basic test case with the aggregation applied on a source with exact statistics
145+
let source = mock_data()?;
146+
let schema = source.schema();
147+
let agg = TestAggregate::new_count_column(&schema);
148+
149+
let partial_agg = AggregateExec::try_new(
150+
AggregateMode::Partial,
151+
PhysicalGroupBy::default(),
152+
vec![Arc::new(agg.count_expr(&schema))],
153+
vec![None],
154+
source,
155+
Arc::clone(&schema),
156+
)?;
157+
158+
let final_agg = AggregateExec::try_new(
159+
AggregateMode::Final,
160+
PhysicalGroupBy::default(),
161+
vec![Arc::new(agg.count_expr(&schema))],
162+
vec![None],
163+
Arc::new(partial_agg),
164+
Arc::clone(&schema),
165+
)?;
166+
167+
assert_count_optim_success(final_agg, agg).await?;
168+
169+
Ok(())
170+
}
171+
172+
#[tokio::test]
173+
async fn test_count_partial_indirect_child() -> Result<()> {
174+
let source = mock_data()?;
175+
let schema = source.schema();
176+
let agg = TestAggregate::new_count_star();
177+
178+
let partial_agg = AggregateExec::try_new(
179+
AggregateMode::Partial,
180+
PhysicalGroupBy::default(),
181+
vec![Arc::new(agg.count_expr(&schema))],
182+
vec![None],
183+
source,
184+
Arc::clone(&schema),
185+
)?;
186+
187+
// We introduce an intermediate optimization step between the partial and final aggregator
188+
let coalesce = CoalescePartitionsExec::new(Arc::new(partial_agg));
189+
190+
let final_agg = AggregateExec::try_new(
191+
AggregateMode::Final,
192+
PhysicalGroupBy::default(),
193+
vec![Arc::new(agg.count_expr(&schema))],
194+
vec![None],
195+
Arc::new(coalesce),
196+
Arc::clone(&schema),
197+
)?;
198+
199+
assert_count_optim_success(final_agg, agg).await?;
200+
201+
Ok(())
202+
}
203+
204+
#[tokio::test]
205+
async fn test_count_partial_with_nulls_indirect_child() -> Result<()> {
206+
let source = mock_data()?;
207+
let schema = source.schema();
208+
let agg = TestAggregate::new_count_column(&schema);
209+
210+
let partial_agg = AggregateExec::try_new(
211+
AggregateMode::Partial,
212+
PhysicalGroupBy::default(),
213+
vec![Arc::new(agg.count_expr(&schema))],
214+
vec![None],
215+
source,
216+
Arc::clone(&schema),
217+
)?;
218+
219+
// We introduce an intermediate optimization step between the partial and final aggregator
220+
let coalesce = CoalescePartitionsExec::new(Arc::new(partial_agg));
221+
222+
let final_agg = AggregateExec::try_new(
223+
AggregateMode::Final,
224+
PhysicalGroupBy::default(),
225+
vec![Arc::new(agg.count_expr(&schema))],
226+
vec![None],
227+
Arc::new(coalesce),
228+
Arc::clone(&schema),
229+
)?;
230+
231+
assert_count_optim_success(final_agg, agg).await?;
232+
233+
Ok(())
234+
}
235+
236+
#[tokio::test]
237+
async fn test_count_inexact_stat() -> Result<()> {
238+
let source = mock_data()?;
239+
let schema = source.schema();
240+
let agg = TestAggregate::new_count_star();
241+
242+
// adding a filter makes the statistics inexact
243+
let filter = Arc::new(FilterExec::try_new(
244+
expressions::binary(
245+
expressions::col("a", &schema)?,
246+
Operator::Gt,
247+
cast(expressions::lit(1u32), &schema, DataType::Int32)?,
248+
&schema,
249+
)?,
250+
source,
251+
)?);
252+
253+
let partial_agg = AggregateExec::try_new(
254+
AggregateMode::Partial,
255+
PhysicalGroupBy::default(),
256+
vec![Arc::new(agg.count_expr(&schema))],
257+
vec![None],
258+
filter,
259+
Arc::clone(&schema),
260+
)?;
261+
262+
let final_agg = AggregateExec::try_new(
263+
AggregateMode::Final,
264+
PhysicalGroupBy::default(),
265+
vec![Arc::new(agg.count_expr(&schema))],
266+
vec![None],
267+
Arc::new(partial_agg),
268+
Arc::clone(&schema),
269+
)?;
270+
271+
let conf = ConfigOptions::new();
272+
let optimized = AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?;
273+
274+
// check that the original ExecutionPlan was not replaced
275+
assert!(optimized.as_any().is::<AggregateExec>());
276+
277+
Ok(())
278+
}
279+
280+
#[tokio::test]
281+
async fn test_count_with_nulls_inexact_stat() -> Result<()> {
282+
let source = mock_data()?;
283+
let schema = source.schema();
284+
let agg = TestAggregate::new_count_column(&schema);
285+
286+
// adding a filter makes the statistics inexact
287+
let filter = Arc::new(FilterExec::try_new(
288+
expressions::binary(
289+
expressions::col("a", &schema)?,
290+
Operator::Gt,
291+
cast(expressions::lit(1u32), &schema, DataType::Int32)?,
292+
&schema,
293+
)?,
294+
source,
295+
)?);
296+
297+
let partial_agg = AggregateExec::try_new(
298+
AggregateMode::Partial,
299+
PhysicalGroupBy::default(),
300+
vec![Arc::new(agg.count_expr(&schema))],
301+
vec![None],
302+
filter,
303+
Arc::clone(&schema),
304+
)?;
305+
306+
let final_agg = AggregateExec::try_new(
307+
AggregateMode::Final,
308+
PhysicalGroupBy::default(),
309+
vec![Arc::new(agg.count_expr(&schema))],
310+
vec![None],
311+
Arc::new(partial_agg),
312+
Arc::clone(&schema),
313+
)?;
314+
315+
let conf = ConfigOptions::new();
316+
let optimized = AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?;
317+
318+
// check that the original ExecutionPlan was not replaced
319+
assert!(optimized.as_any().is::<AggregateExec>());
320+
321+
Ok(())
322+
}

0 commit comments

Comments
 (0)