Skip to content

Commit c648c0c

Browse files
committed
Rework ensure_coop to base itself on evaluation and scheduling properties
1 parent acc08b1 commit c648c0c

File tree

14 files changed

+117
-96
lines changed

14 files changed

+117
-96
lines changed

datafusion/core/tests/user_defined/insert_operation.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use datafusion::{
2626
use datafusion_catalog::{Session, TableProvider};
2727
use datafusion_expr::{dml::InsertOp, Expr, TableType};
2828
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
29+
use datafusion_physical_plan::execution_plan::SchedulingType;
2930
use datafusion_physical_plan::{
3031
execution_plan::{Boundedness, EmissionType},
3132
DisplayAs, ExecutionPlan, PlanProperties,
@@ -132,7 +133,8 @@ impl TestInsertExec {
132133
Partitioning::UnknownPartitioning(1),
133134
EmissionType::Incremental,
134135
Boundedness::Bounded,
135-
),
136+
)
137+
.with_scheduling_type(SchedulingType::Cooperative),
136138
}
137139
}
138140
}
@@ -179,10 +181,6 @@ impl ExecutionPlan for TestInsertExec {
179181
) -> Result<datafusion_execution::SendableRecordBatchStream> {
180182
unimplemented!("TestInsertExec is a stub for testing.")
181183
}
182-
183-
fn with_cooperative_yields(self: Arc<Self>) -> Option<Arc<dyn ExecutionPlan>> {
184-
Some(self)
185-
}
186184
}
187185

188186
fn make_count_schema() -> SchemaRef {

datafusion/datasource/src/sink.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ use datafusion_physical_plan::{
3636
};
3737

3838
use async_trait::async_trait;
39+
use datafusion_physical_plan::execution_plan::{EvaluationType, SchedulingType};
3940
use futures::StreamExt;
4041

4142
/// `DataSink` implements writing streams of [`RecordBatch`]es to
@@ -141,6 +142,8 @@ impl DataSinkExec {
141142
input.pipeline_behavior(),
142143
input.boundedness(),
143144
)
145+
.with_scheduling_type(SchedulingType::Cooperative)
146+
.with_evaluation_type(EvaluationType::Eager)
144147
}
145148
}
146149

@@ -246,10 +249,6 @@ impl ExecutionPlan for DataSinkExec {
246249
fn metrics(&self) -> Option<MetricsSet> {
247250
self.sink.metrics()
248251
}
249-
250-
fn with_cooperative_yields(self: Arc<Self>) -> Option<Arc<dyn ExecutionPlan>> {
251-
Some(self)
252-
}
253252
}
254253

255254
/// Create a output record batch with a count

datafusion/datasource/src/source.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ use std::fmt;
2222
use std::fmt::{Debug, Formatter};
2323
use std::sync::Arc;
2424

25-
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
25+
use datafusion_physical_plan::execution_plan::{
26+
Boundedness, EmissionType, SchedulingType,
27+
};
2628
use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
2729
use datafusion_physical_plan::projection::ProjectionExec;
2830
use datafusion_physical_plan::{
@@ -262,10 +264,6 @@ impl ExecutionPlan for DataSourceExec {
262264
.map(|stream| make_cooperative(stream))
263265
}
264266

265-
fn with_cooperative_yields(self: Arc<Self>) -> Option<Arc<dyn ExecutionPlan>> {
266-
Some(self)
267-
}
268-
269267
fn metrics(&self) -> Option<MetricsSet> {
270268
Some(self.data_source.metrics().clone_inner())
271269
}
@@ -380,6 +378,7 @@ impl DataSourceExec {
380378
EmissionType::Incremental,
381379
Boundedness::Bounded,
382380
)
381+
.with_scheduling_type(SchedulingType::Cooperative)
383382
}
384383

385384
/// Downcast the `DataSourceExec`'s `data_source` to a specific file source

datafusion/physical-optimizer/src/ensure_coop.rs

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,14 @@ use datafusion_common::config::ConfigOptions;
2929
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
3030
use datafusion_common::Result;
3131
use datafusion_physical_plan::coop::CooperativeExec;
32+
use datafusion_physical_plan::execution_plan::{EvaluationType, SchedulingType};
3233
use datafusion_physical_plan::ExecutionPlan;
3334

34-
/// `EnsureCooperative` is a [`PhysicalOptimizerRule`] that finds every leaf node in
35-
/// the plan and replaces it with a variant that yields cooperatively if supported.
36-
/// If the node does not provide a built-in yielding variant via
37-
/// [`ExecutionPlan::with_cooperative_yields`], it is wrapped in a [`CooperativeExec`] parent.
35+
/// `EnsureCooperative` is a [`PhysicalOptimizerRule`] that inspects the physical plan for
36+
/// sub plans that do not participate in cooperative scheduling. The plan is subdivided into sub
37+
/// plans on eager evaluation boundaries. Leaf nodes and eager evaluation roots are checked
38+
/// to see if they participate in cooperative scheduling. Those that do no are wrapped in
39+
/// a [`CooperativeExec`] parent.
3840
pub struct EnsureCooperative {}
3941

4042
impl EnsureCooperative {
@@ -65,20 +67,22 @@ impl PhysicalOptimizerRule for EnsureCooperative {
6567
plan: Arc<dyn ExecutionPlan>,
6668
_config: &ConfigOptions,
6769
) -> Result<Arc<dyn ExecutionPlan>> {
68-
plan.transform_down(|plan| {
69-
if !plan.children().is_empty() {
70-
// Not a leaf, keep recursing down.
71-
return Ok(Transformed::no(plan));
70+
plan.transform_up(|plan| {
71+
let is_leaf = plan.children().is_empty();
72+
let is_exchange = plan.properties().evaluation_type == EvaluationType::Eager;
73+
if (is_leaf || is_exchange)
74+
&& plan.properties().scheduling_type != SchedulingType::Cooperative
75+
{
76+
// Wrap non-cooperative leaves or eager evaluation roots in a cooperative exec to
77+
// ensure the plans they participate in are properly cooperative.
78+
Ok(Transformed::new(
79+
Arc::new(CooperativeExec::new(Arc::clone(&plan))),
80+
true,
81+
TreeNodeRecursion::Continue,
82+
))
83+
} else {
84+
Ok(Transformed::no(plan))
7285
}
73-
// For leaf nodes, try to get a built-in cooperative-yielding variant.
74-
let new_plan =
75-
Arc::clone(&plan)
76-
.with_cooperative_yields()
77-
.unwrap_or_else(|| {
78-
// Only if no built-in variant exists, insert a `CooperativeExec`.
79-
Arc::new(CooperativeExec::new(plan))
80-
});
81-
Ok(Transformed::new(new_plan, true, TreeNodeRecursion::Jump))
8286
})
8387
.map(|t| t.data)
8488
}
@@ -107,7 +111,7 @@ mod tests {
107111
let display = displayable(optimized.as_ref()).indent(true).to_string();
108112
// Use insta snapshot to ensure full plan structure
109113
assert_snapshot!(display, @r###"
110-
YieldStreamExec frequency=64
114+
CooperativeExec
111115
DataSourceExec: partitions=1, partition_sizes=[1]
112116
"###);
113117
}

datafusion/physical-plan/src/coalesce_partitions.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use super::{
2727
DisplayAs, ExecutionPlanProperties, PlanProperties, SendableRecordBatchStream,
2828
Statistics,
2929
};
30-
use crate::execution_plan::CardinalityEffect;
30+
use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType};
3131
use crate::projection::{make_with_child, ProjectionExec};
3232
use crate::{DisplayFormatType, ExecutionPlan, Partitioning};
3333

@@ -72,6 +72,16 @@ impl CoalescePartitionsExec {
7272

7373
/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
7474
fn compute_properties(input: &Arc<dyn ExecutionPlan>) -> PlanProperties {
75+
let input_partitions = input.output_partitioning().partition_count();
76+
let (drive, scheduling) = if input_partitions > 1 {
77+
(EvaluationType::Eager, SchedulingType::Cooperative)
78+
} else {
79+
(
80+
input.properties().evaluation_type,
81+
input.properties().scheduling_type,
82+
)
83+
};
84+
7585
// Coalescing partitions loses existing orderings:
7686
let mut eq_properties = input.equivalence_properties().clone();
7787
eq_properties.clear_orderings();
@@ -82,6 +92,8 @@ impl CoalescePartitionsExec {
8292
input.pipeline_behavior(),
8393
input.boundedness(),
8494
)
95+
.with_evaluation_type(drive)
96+
.with_scheduling_type(scheduling)
8597
}
8698
}
8799

datafusion/physical-plan/src/coop.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use arrow_schema::Schema;
3030
use datafusion_common::{internal_err, Result, Statistics};
3131
use datafusion_execution::TaskContext;
3232

33+
use crate::execution_plan::SchedulingType;
3334
use crate::stream::RecordBatchStreamAdapter;
3435
use futures::{FutureExt, Stream};
3536
use pin_project_lite::pin_project;
@@ -102,13 +103,19 @@ pub struct CooperativeExec {
102103
/// The child execution plan that this operator "wraps" to make it
103104
/// cooperate with the runtime.
104105
input: Arc<dyn ExecutionPlan>,
106+
properties: PlanProperties,
105107
}
106108

107109
impl CooperativeExec {
108110
/// Creates a new `CooperativeExec` operator that wraps the given child
109111
/// execution plan.
110112
pub fn new(input: Arc<dyn ExecutionPlan>) -> Self {
111-
Self { input }
113+
let properties = input
114+
.properties()
115+
.clone()
116+
.with_scheduling_type(SchedulingType::Cooperative);
117+
118+
Self { input, properties }
112119
}
113120

114121
/// Returns the child execution plan this operator "wraps" to make it
@@ -142,7 +149,7 @@ impl ExecutionPlan for CooperativeExec {
142149
}
143150

144151
fn properties(&self) -> &PlanProperties {
145-
self.input.properties()
152+
&self.properties
146153
}
147154

148155
fn maintains_input_order(&self) -> Vec<bool> {

datafusion/physical-plan/src/empty.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use datafusion_common::{internal_err, Result};
3333
use datafusion_execution::TaskContext;
3434
use datafusion_physical_expr::EquivalenceProperties;
3535

36+
use crate::execution_plan::SchedulingType;
3637
use log::trace;
3738

3839
/// Execution plan for empty relation with produce_one_row=false
@@ -81,6 +82,7 @@ impl EmptyExec {
8182
EmissionType::Incremental,
8283
Boundedness::Bounded,
8384
)
85+
.with_scheduling_type(SchedulingType::Cooperative)
8486
}
8587
}
8688

@@ -173,10 +175,6 @@ impl ExecutionPlan for EmptyExec {
173175
None,
174176
))
175177
}
176-
177-
fn with_cooperative_yields(self: Arc<Self>) -> Option<Arc<dyn ExecutionPlan>> {
178-
Some(self)
179-
}
180178
}
181179

182180
#[cfg(test)]

datafusion/physical-plan/src/execution_plan.rs

Lines changed: 35 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,6 @@ use crate::coalesce_partitions::CoalescePartitionsExec;
4141
use crate::display::DisplayableExecutionPlan;
4242
use crate::metrics::MetricsSet;
4343
use crate::projection::ProjectionExec;
44-
use crate::repartition::RepartitionExec;
45-
use crate::sorts::sort_preserving_merge::SortPreservingMergeExec;
4644
use crate::stream::RecordBatchStreamAdapter;
4745

4846
use arrow::array::{Array, RecordBatch};
@@ -559,16 +557,6 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
559557
child_pushdown_result,
560558
))
561559
}
562-
563-
/// Returns a version of this plan that cooperates with the runtime via
564-
/// built‐in yielding. If such a version doesn't exist, returns `None`.
565-
/// You do not need to do provide such a version of a custom operator,
566-
/// but DataFusion will utilize it while optimizing the plan if it exists.
567-
fn with_cooperative_yields(self: Arc<Self>) -> Option<Arc<dyn ExecutionPlan>> {
568-
// Conservative default implementation assumes that a leaf does not
569-
// cooperate with yielding.
570-
None
571-
}
572560
}
573561

574562
/// [`ExecutionPlan`] Invariant Level
@@ -743,6 +731,26 @@ pub enum EmissionType {
743731
Both,
744732
}
745733

734+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
735+
pub enum SchedulingType {
736+
/// The stream generated by [`execute`](ExecutionPlan::execute) does not participate in cooperative scheduling
737+
Blocking,
738+
/// The stream generated by [`execute`](ExecutionPlan::execute) actively participates in cooperative scheduling
739+
/// by consuming task budget
740+
Cooperative,
741+
}
742+
743+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
744+
pub enum EvaluationType {
745+
/// The stream generated by [`execute`](ExecutionPlan::execute) only generates `RecordBatch`
746+
/// instances when it is demanded by invoking `Stream::poll_next`.
747+
Lazy,
748+
/// The stream generated by [`execute`](ExecutionPlan::execute) eagerly generates `RecordBatch`
749+
/// in one or more spawned Tokio tasks. Eager evaluation is only started the first time
750+
/// `Stream::poll_next` is called.
751+
Eager,
752+
}
753+
746754
/// Utility to determine an operator's boundedness based on its children's boundedness.
747755
///
748756
/// Assumes boundedness can be inferred from child operators:
@@ -831,6 +839,8 @@ pub struct PlanProperties {
831839
pub emission_type: EmissionType,
832840
/// See [ExecutionPlanProperties::boundedness]
833841
pub boundedness: Boundedness,
842+
pub evaluation_type: EvaluationType,
843+
pub scheduling_type: SchedulingType,
834844
/// See [ExecutionPlanProperties::output_ordering]
835845
output_ordering: Option<LexOrdering>,
836846
}
@@ -850,6 +860,8 @@ impl PlanProperties {
850860
partitioning,
851861
emission_type,
852862
boundedness,
863+
evaluation_type: EvaluationType::Lazy,
864+
scheduling_type: SchedulingType::Blocking,
853865
output_ordering,
854866
}
855867
}
@@ -881,6 +893,16 @@ impl PlanProperties {
881893
self
882894
}
883895

896+
pub fn with_scheduling_type(mut self, scheduling_type: SchedulingType) -> Self {
897+
self.scheduling_type = scheduling_type;
898+
self
899+
}
900+
901+
pub fn with_evaluation_type(mut self, drive_type: EvaluationType) -> Self {
902+
self.evaluation_type = drive_type;
903+
self
904+
}
905+
884906
/// Overwrite constraints with its new value.
885907
pub fn with_constraints(mut self, constraints: Constraints) -> Self {
886908
self.eq_properties = self.eq_properties.with_constraints(constraints);
@@ -912,25 +934,7 @@ impl PlanProperties {
912934
/// 2. CoalescePartitionsExec for collapsing all of the partitions into one without ordering guarantee
913935
/// 3. SortPreservingMergeExec for collapsing all of the sorted partitions into one with ordering guarantee
914936
pub fn need_data_exchange(plan: Arc<dyn ExecutionPlan>) -> bool {
915-
if let Some(repartition) = plan.as_any().downcast_ref::<RepartitionExec>() {
916-
!matches!(
917-
repartition.properties().output_partitioning(),
918-
Partitioning::RoundRobinBatch(_)
919-
)
920-
} else if let Some(coalesce) = plan.as_any().downcast_ref::<CoalescePartitionsExec>()
921-
{
922-
coalesce.input().output_partitioning().partition_count() > 1
923-
} else if let Some(sort_preserving_merge) =
924-
plan.as_any().downcast_ref::<SortPreservingMergeExec>()
925-
{
926-
sort_preserving_merge
927-
.input()
928-
.output_partitioning()
929-
.partition_count()
930-
> 1
931-
} else {
932-
false
933-
}
937+
plan.properties().evaluation_type == EvaluationType::Lazy
934938
}
935939

936940
/// Returns a copy of this plan if we change any child according to the pointer comparison.

datafusion/physical-plan/src/memory.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use std::sync::Arc;
2323
use std::task::{Context, Poll};
2424

2525
use crate::coop::cooperative;
26-
use crate::execution_plan::{Boundedness, EmissionType};
26+
use crate::execution_plan::{Boundedness, EmissionType, SchedulingType};
2727
use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
2828
use crate::{
2929
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
@@ -163,7 +163,9 @@ impl LazyMemoryExec {
163163
Partitioning::RoundRobinBatch(generators.len()),
164164
EmissionType::Incremental,
165165
Boundedness::Bounded,
166-
);
166+
)
167+
.with_scheduling_type(SchedulingType::Cooperative);
168+
167169
Ok(Self {
168170
schema,
169171
batch_generators: generators,
@@ -273,10 +275,6 @@ impl ExecutionPlan for LazyMemoryExec {
273275
Ok(Box::pin(cooperative(stream)))
274276
}
275277

276-
fn with_cooperative_yields(self: Arc<Self>) -> Option<Arc<dyn ExecutionPlan>> {
277-
Some(self)
278-
}
279-
280278
fn metrics(&self) -> Option<MetricsSet> {
281279
Some(self.metrics.clone_inner())
282280
}

0 commit comments

Comments
 (0)