Skip to content

Commit fc7154e

Browse files
razeghi71 authored and alamb committed
Split EmptyExec into PlaceholderRowExec (#8446)
* add PlaceHolderRowExec
* Change produce_one_row=true calls to use PlaceHolderRowExec
* remove produce_one_row from EmptyExec, changes in proto serializer, working tests
* PlaceHolder => Placeholder

---------

Co-authored-by: Andrew Lamb <[email protected]>
1 parent 834da74 commit fc7154e

File tree

23 files changed

+459
-183
lines changed

23 files changed

+459
-183
lines changed

datafusion/core/src/datasource/empty.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ impl TableProvider for EmptyTable {
7777
// even though there is no data, projections apply
7878
let projected_schema = project_schema(&self.schema, projection)?;
7979
Ok(Arc::new(
80-
EmptyExec::new(false, projected_schema).with_partitions(self.partitions),
80+
EmptyExec::new(projected_schema).with_partitions(self.partitions),
8181
))
8282
}
8383
}

datafusion/core/src/datasource/listing/table.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -685,7 +685,7 @@ impl TableProvider for ListingTable {
685685
if partitioned_file_lists.is_empty() {
686686
let schema = self.schema();
687687
let projected_schema = project_schema(&schema, projection)?;
688-
return Ok(Arc::new(EmptyExec::new(false, projected_schema)));
688+
return Ok(Arc::new(EmptyExec::new(projected_schema)));
689689
}
690690

691691
// extract types of partition columns
@@ -713,7 +713,7 @@ impl TableProvider for ListingTable {
713713
let object_store_url = if let Some(url) = self.table_paths.first() {
714714
url.object_store()
715715
} else {
716-
return Ok(Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))));
716+
return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty()))));
717717
};
718718
// create the execution plan
719719
self.options

datafusion/core/src/physical_optimizer/aggregate_statistics.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,14 @@ use super::optimizer::PhysicalOptimizerRule;
2222
use crate::config::ConfigOptions;
2323
use crate::error::Result;
2424
use crate::physical_plan::aggregates::AggregateExec;
25-
use crate::physical_plan::empty::EmptyExec;
2625
use crate::physical_plan::projection::ProjectionExec;
2726
use crate::physical_plan::{expressions, AggregateExpr, ExecutionPlan, Statistics};
2827
use crate::scalar::ScalarValue;
2928

3029
use datafusion_common::stats::Precision;
3130
use datafusion_common::tree_node::TreeNode;
3231
use datafusion_expr::utils::COUNT_STAR_EXPANSION;
32+
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
3333

3434
/// Optimizer that uses available statistics for aggregate functions
3535
#[derive(Default)]
@@ -82,7 +82,7 @@ impl PhysicalOptimizerRule for AggregateStatistics {
8282
// input can be entirely removed
8383
Ok(Arc::new(ProjectionExec::try_new(
8484
projections,
85-
Arc::new(EmptyExec::new(true, plan.schema())),
85+
Arc::new(PlaceholderRowExec::new(plan.schema())),
8686
)?))
8787
} else {
8888
plan.map_children(|child| self.optimize(child, _config))

datafusion/core/src/physical_optimizer/join_selection.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1623,12 +1623,12 @@ mod hash_join_tests {
16231623

16241624
let children = vec![
16251625
PipelineStatePropagator {
1626-
plan: Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))),
1626+
plan: Arc::new(EmptyExec::new(Arc::new(Schema::empty()))),
16271627
unbounded: left_unbounded,
16281628
children: vec![],
16291629
},
16301630
PipelineStatePropagator {
1631-
plan: Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))),
1631+
plan: Arc::new(EmptyExec::new(Arc::new(Schema::empty()))),
16321632
unbounded: right_unbounded,
16331633
children: vec![],
16341634
},

datafusion/core/src/physical_planner.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ use datafusion_expr::{
9191
WindowFrameBound, WriteOp,
9292
};
9393
use datafusion_physical_expr::expressions::Literal;
94+
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
9495
use datafusion_sql::utils::window_expr_common_partition_keys;
9596

9697
use async_trait::async_trait;
@@ -1196,10 +1197,15 @@ impl DefaultPhysicalPlanner {
11961197
}
11971198
LogicalPlan::Subquery(_) => todo!(),
11981199
LogicalPlan::EmptyRelation(EmptyRelation {
1199-
produce_one_row,
1200+
produce_one_row: false,
12001201
schema,
12011202
}) => Ok(Arc::new(EmptyExec::new(
1202-
*produce_one_row,
1203+
SchemaRef::new(schema.as_ref().to_owned().into()),
1204+
))),
1205+
LogicalPlan::EmptyRelation(EmptyRelation {
1206+
produce_one_row: true,
1207+
schema,
1208+
}) => Ok(Arc::new(PlaceholderRowExec::new(
12031209
SchemaRef::new(schema.as_ref().to_owned().into()),
12041210
))),
12051211
LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => {
@@ -2767,7 +2773,7 @@ mod tests {
27672773
27682774
digraph {
27692775
1[shape=box label="ProjectionExec: expr=[id@0 + 2 as employee.id + Int32(2)]", tooltip=""]
2770-
2[shape=box label="EmptyExec: produce_one_row=false", tooltip=""]
2776+
2[shape=box label="EmptyExec", tooltip=""]
27712777
1 -> 2 [arrowhead=none, arrowtail=normal, dir=back]
27722778
}
27732779
// End DataFusion GraphViz Plan

datafusion/core/tests/custom_sources.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ use datafusion::execution::context::{SessionContext, SessionState, TaskContext};
3030
use datafusion::logical_expr::{
3131
col, Expr, LogicalPlan, LogicalPlanBuilder, TableScan, UNNAMED_TABLE,
3232
};
33-
use datafusion::physical_plan::empty::EmptyExec;
3433
use datafusion::physical_plan::expressions::PhysicalSortExpr;
3534
use datafusion::physical_plan::{
3635
collect, ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
@@ -42,6 +41,7 @@ use datafusion_common::project_schema;
4241
use datafusion_common::stats::Precision;
4342

4443
use async_trait::async_trait;
44+
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
4545
use futures::stream::Stream;
4646

4747
/// Also run all tests that are found in the `custom_sources_cases` directory
@@ -256,9 +256,9 @@ async fn optimizers_catch_all_statistics() {
256256

257257
let physical_plan = df.create_physical_plan().await.unwrap();
258258

259-
// when the optimization kicks in, the source is replaced by an EmptyExec
259+
// when the optimization kicks in, the source is replaced by a PlaceholderRowExec
260260
assert!(
261-
contains_empty_exec(Arc::clone(&physical_plan)),
261+
contains_place_holder_exec(Arc::clone(&physical_plan)),
262262
"Expected aggregate_statistics optimizations missing: {physical_plan:?}"
263263
);
264264

@@ -283,12 +283,12 @@ async fn optimizers_catch_all_statistics() {
283283
assert_eq!(format!("{:?}", actual[0]), format!("{expected:?}"));
284284
}
285285

286-
fn contains_empty_exec(plan: Arc<dyn ExecutionPlan>) -> bool {
287-
if plan.as_any().is::<EmptyExec>() {
286+
fn contains_place_holder_exec(plan: Arc<dyn ExecutionPlan>) -> bool {
287+
if plan.as_any().is::<PlaceholderRowExec>() {
288288
true
289289
} else if plan.children().len() != 1 {
290290
false
291291
} else {
292-
contains_empty_exec(Arc::clone(&plan.children()[0]))
292+
contains_place_holder_exec(Arc::clone(&plan.children()[0]))
293293
}
294294
}

datafusion/core/tests/sql/explain_analyze.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -575,7 +575,7 @@ async fn explain_analyze_runs_optimizers() {
575575

576576
// This happens as an optimization pass where count(*) can be
577577
// answered using statistics only.
578-
let expected = "EmptyExec: produce_one_row=true";
578+
let expected = "PlaceholderRowExec";
579579

580580
let sql = "EXPLAIN SELECT count(*) from alltypes_plain";
581581
let actual = execute_to_batches(&ctx, sql).await;
@@ -806,7 +806,7 @@ async fn explain_physical_plan_only() {
806806
let expected = vec![vec![
807807
"physical_plan",
808808
"ProjectionExec: expr=[2 as COUNT(*)]\
809-
\n EmptyExec: produce_one_row=true\
809+
\n PlaceholderRowExec\
810810
\n",
811811
]];
812812
assert_eq!(expected, actual);

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1208,7 +1208,7 @@ impl LogicalPlan {
12081208
self.with_new_exprs(new_exprs, &new_inputs_with_values)
12091209
}
12101210

1211-
/// Walk the logical plan, find any `PlaceHolder` tokens, and return a map of their IDs and DataTypes
1211+
/// Walk the logical plan, find any `Placeholder` tokens, and return a map of their IDs and DataTypes
12121212
pub fn get_parameter_types(
12131213
&self,
12141214
) -> Result<HashMap<String, Option<DataType>>, DataFusionError> {

datafusion/optimizer/README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ Looking at the `EXPLAIN` output we can see that the optimizer has effectively re
153153
| logical_plan | Projection: Int64(3) AS Int64(1) + Int64(2) |
154154
| | EmptyRelation |
155155
| physical_plan | ProjectionExec: expr=[3 as Int64(1) + Int64(2)] |
156-
| | EmptyExec: produce_one_row=true |
156+
| | PlaceholderRowExec |
157157
| | |
158158
+---------------+-------------------------------------------------+
159159
```
@@ -318,15 +318,15 @@ In the following example, the `type_coercion` and `simplify_expressions` passes
318318
| logical_plan | Projection: Utf8("3.2") AS foo |
319319
| | EmptyRelation |
320320
| initial_physical_plan | ProjectionExec: expr=[3.2 as foo] |
321-
| | EmptyExec: produce_one_row=true |
321+
| | PlaceholderRowExec |
322322
| | |
323323
| physical_plan after aggregate_statistics | SAME TEXT AS ABOVE |
324324
| physical_plan after join_selection | SAME TEXT AS ABOVE |
325325
| physical_plan after coalesce_batches | SAME TEXT AS ABOVE |
326326
| physical_plan after repartition | SAME TEXT AS ABOVE |
327327
| physical_plan after add_merge_exec | SAME TEXT AS ABOVE |
328328
| physical_plan | ProjectionExec: expr=[3.2 as foo] |
329-
| | EmptyExec: produce_one_row=true |
329+
| | PlaceholderRowExec |
330330
| | |
331331
+------------------------------------------------------------+---------------------------------------------------------------------------+
332332
```

datafusion/physical-plan/src/display.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ impl<'a> DisplayableExecutionPlan<'a> {
132132
/// ```dot
133133
/// strict digraph dot_plan {
134134
// 0[label="ProjectionExec: expr=[id@0 + 2 as employee.id + Int32(2)]",tooltip=""]
135-
// 1[label="EmptyExec: produce_one_row=false",tooltip=""]
135+
// 1[label="EmptyExec",tooltip=""]
136136
// 0 -> 1
137137
// }
138138
/// ```

datafusion/physical-plan/src/empty.rs

Lines changed: 10 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
//! EmptyRelation execution plan
18+
//! EmptyRelation with produce_one_row=false execution plan
1919
2020
use std::any::Any;
2121
use std::sync::Arc;
@@ -24,19 +24,16 @@ use super::expressions::PhysicalSortExpr;
2424
use super::{common, DisplayAs, SendableRecordBatchStream, Statistics};
2525
use crate::{memory::MemoryStream, DisplayFormatType, ExecutionPlan, Partitioning};
2626

27-
use arrow::array::{ArrayRef, NullArray};
28-
use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
27+
use arrow::datatypes::SchemaRef;
2928
use arrow::record_batch::RecordBatch;
3029
use datafusion_common::{internal_err, DataFusionError, Result};
3130
use datafusion_execution::TaskContext;
3231

3332
use log::trace;
3433

35-
/// Execution plan for empty relation (produces no rows)
34+
/// Execution plan for empty relation with produce_one_row=false
3635
#[derive(Debug)]
3736
pub struct EmptyExec {
38-
/// Specifies whether this exec produces a row or not
39-
produce_one_row: bool,
4037
/// The schema for the produced row
4138
schema: SchemaRef,
4239
/// Number of partitions
@@ -45,9 +42,8 @@ pub struct EmptyExec {
4542

4643
impl EmptyExec {
4744
/// Create a new EmptyExec
48-
pub fn new(produce_one_row: bool, schema: SchemaRef) -> Self {
45+
pub fn new(schema: SchemaRef) -> Self {
4946
EmptyExec {
50-
produce_one_row,
5147
schema,
5248
partitions: 1,
5349
}
@@ -59,36 +55,8 @@ impl EmptyExec {
5955
self
6056
}
6157

62-
/// Specifies whether this exec produces a row or not
63-
pub fn produce_one_row(&self) -> bool {
64-
self.produce_one_row
65-
}
66-
6758
fn data(&self) -> Result<Vec<RecordBatch>> {
68-
let batch = if self.produce_one_row {
69-
let n_field = self.schema.fields.len();
70-
// hack for https://github.com/apache/arrow-datafusion/pull/3242
71-
let n_field = if n_field == 0 { 1 } else { n_field };
72-
vec![RecordBatch::try_new(
73-
Arc::new(Schema::new(
74-
(0..n_field)
75-
.map(|i| {
76-
Field::new(format!("placeholder_{i}"), DataType::Null, true)
77-
})
78-
.collect::<Fields>(),
79-
)),
80-
(0..n_field)
81-
.map(|_i| {
82-
let ret: ArrayRef = Arc::new(NullArray::new(1));
83-
ret
84-
})
85-
.collect(),
86-
)?]
87-
} else {
88-
vec![]
89-
};
90-
91-
Ok(batch)
59+
Ok(vec![])
9260
}
9361
}
9462

@@ -100,7 +68,7 @@ impl DisplayAs for EmptyExec {
10068
) -> std::fmt::Result {
10169
match t {
10270
DisplayFormatType::Default | DisplayFormatType::Verbose => {
103-
write!(f, "EmptyExec: produce_one_row={}", self.produce_one_row)
71+
write!(f, "EmptyExec")
10472
}
10573
}
10674
}
@@ -133,10 +101,7 @@ impl ExecutionPlan for EmptyExec {
133101
self: Arc<Self>,
134102
_: Vec<Arc<dyn ExecutionPlan>>,
135103
) -> Result<Arc<dyn ExecutionPlan>> {
136-
Ok(Arc::new(EmptyExec::new(
137-
self.produce_one_row,
138-
self.schema.clone(),
139-
)))
104+
Ok(Arc::new(EmptyExec::new(self.schema.clone())))
140105
}
141106

142107
fn execute(
@@ -184,7 +149,7 @@ mod tests {
184149
let task_ctx = Arc::new(TaskContext::default());
185150
let schema = test::aggr_test_schema();
186151

187-
let empty = EmptyExec::new(false, schema.clone());
152+
let empty = EmptyExec::new(schema.clone());
188153
assert_eq!(empty.schema(), schema);
189154

190155
// we should have no results
@@ -198,16 +163,11 @@ mod tests {
198163
#[test]
199164
fn with_new_children() -> Result<()> {
200165
let schema = test::aggr_test_schema();
201-
let empty = Arc::new(EmptyExec::new(false, schema.clone()));
202-
let empty_with_row = Arc::new(EmptyExec::new(true, schema));
166+
let empty = Arc::new(EmptyExec::new(schema.clone()));
203167

204168
let empty2 = with_new_children_if_necessary(empty.clone(), vec![])?.into();
205169
assert_eq!(empty.schema(), empty2.schema());
206170

207-
let empty_with_row_2 =
208-
with_new_children_if_necessary(empty_with_row.clone(), vec![])?.into();
209-
assert_eq!(empty_with_row.schema(), empty_with_row_2.schema());
210-
211171
let too_many_kids = vec![empty2];
212172
assert!(
213173
with_new_children_if_necessary(empty, too_many_kids).is_err(),
@@ -220,44 +180,11 @@ mod tests {
220180
async fn invalid_execute() -> Result<()> {
221181
let task_ctx = Arc::new(TaskContext::default());
222182
let schema = test::aggr_test_schema();
223-
let empty = EmptyExec::new(false, schema);
183+
let empty = EmptyExec::new(schema);
224184

225185
// ask for the wrong partition
226186
assert!(empty.execute(1, task_ctx.clone()).is_err());
227187
assert!(empty.execute(20, task_ctx).is_err());
228188
Ok(())
229189
}
230-
231-
#[tokio::test]
232-
async fn produce_one_row() -> Result<()> {
233-
let task_ctx = Arc::new(TaskContext::default());
234-
let schema = test::aggr_test_schema();
235-
let empty = EmptyExec::new(true, schema);
236-
237-
let iter = empty.execute(0, task_ctx)?;
238-
let batches = common::collect(iter).await?;
239-
240-
// should have one item
241-
assert_eq!(batches.len(), 1);
242-
243-
Ok(())
244-
}
245-
246-
#[tokio::test]
247-
async fn produce_one_row_multiple_partition() -> Result<()> {
248-
let task_ctx = Arc::new(TaskContext::default());
249-
let schema = test::aggr_test_schema();
250-
let partitions = 3;
251-
let empty = EmptyExec::new(true, schema).with_partitions(partitions);
252-
253-
for n in 0..partitions {
254-
let iter = empty.execute(n, task_ctx.clone())?;
255-
let batches = common::collect(iter).await?;
256-
257-
// should have one item
258-
assert_eq!(batches.len(), 1);
259-
}
260-
261-
Ok(())
262-
}
263190
}

datafusion/physical-plan/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ pub mod limit;
5959
pub mod memory;
6060
pub mod metrics;
6161
mod ordering;
62+
pub mod placeholder_row;
6263
pub mod projection;
6364
pub mod repartition;
6465
pub mod sorts;

0 commit comments

Comments
 (0)