diff --git a/Cargo.toml b/Cargo.toml index 62b56c0939a7..758bdfb510bd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -218,5 +218,9 @@ unnecessary_lazy_evaluations = "warn" uninlined_format_args = "warn" [workspace.lints.rust] -unexpected_cfgs = { level = "warn", check-cfg = ["cfg(tarpaulin)", "cfg(tarpaulin_include)"] } +unexpected_cfgs = { level = "warn", check-cfg = [ + 'cfg(datafusion_coop, values("tokio", "tokio_fallback", "per_stream"))', + "cfg(tarpaulin)", + "cfg(tarpaulin_include)", +] } unused_qualifications = "deny" diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index def64517ce40..f16e30969b4c 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -796,15 +796,6 @@ config_namespace! { /// then the output will be coerced to a non-view. /// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`. pub expand_views_at_output: bool, default = false - - /// When DataFusion detects that a plan might not be promply cancellable - /// due to the presence of tight-looping operators, it will attempt to - /// mitigate this by inserting explicit yielding (in as few places as - /// possible to avoid performance degradation). This value represents the - /// yielding period (in batches) at such explicit yielding points. The - /// default value is 64. If set to 0, no DataFusion will not perform - /// any explicit yielding. - pub yield_period: usize, default = 64 } } diff --git a/datafusion/core/tests/execution/coop.rs b/datafusion/core/tests/execution/coop.rs new file mode 100644 index 000000000000..d8aceadcec66 --- /dev/null +++ b/datafusion/core/tests/execution/coop.rs @@ -0,0 +1,755 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
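+
+//! Tests verifying that potentially tight-looping physical plans yield
+//! cooperatively. Each test builds a plan over an effectively infinite (or
+//! pretend-infinite) source, runs it through the `EnsureCooperative`
+//! optimizer rule, and asserts that polling the resulting stream returns
+//! (either `Ready` or `Pending`) within a timeout instead of spinning
+//! forever on the executor thread.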
+
+use arrow::array::{Int64Array, RecordBatch};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use arrow_schema::SortOptions;
+use datafusion::common::NullEquality;
+use datafusion::functions_aggregate::sum;
+use datafusion::physical_expr::aggregate::AggregateExprBuilder;
+use datafusion::physical_plan;
+use datafusion::physical_plan::aggregates::{
+    AggregateExec, AggregateMode, PhysicalGroupBy,
+};
+use datafusion::physical_plan::execution_plan::Boundedness;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::SessionContext;
+use datafusion_common::{DataFusionError, JoinType, ScalarValue};
+use datafusion_execution::TaskContext;
+use datafusion_expr_common::operator::Operator;
+use datafusion_expr_common::operator::Operator::{Divide, Eq, Gt, Modulo};
+use datafusion_functions_aggregate::min_max;
+use datafusion_physical_expr::expressions::{
+    binary, col, lit, BinaryExpr, Column, Literal,
+};
+use datafusion_physical_expr::Partitioning;
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
+use datafusion_physical_optimizer::ensure_coop::EnsureCooperative;
+use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
+use datafusion_physical_plan::filter::FilterExec;
+use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode, SortMergeJoinExec};
+use datafusion_physical_plan::memory::{LazyBatchGenerator, LazyMemoryExec};
+use datafusion_physical_plan::projection::ProjectionExec;
+use datafusion_physical_plan::repartition::RepartitionExec;
+use datafusion_physical_plan::sorts::sort::SortExec;
+use datafusion_physical_plan::union::InterleaveExec;
+use futures::StreamExt;
+use parking_lot::RwLock;
+use rstest::rstest;
+use std::error::Error;
+use std::fmt::Formatter;
+use std::ops::Range;
+use std::sync::Arc;
+use std::task::Poll;
+use std::time::Duration;
+use tokio::runtime::{Handle, Runtime};
+use tokio::select;
+
+#[derive(Debug)]
+struct RangeBatchGenerator {
+    schema: SchemaRef,
+    value_range: Range<i64>,
+    boundedness: Boundedness,
+    batch_size: usize,
+    poll_count: usize,
+}
+
+impl std::fmt::Display for RangeBatchGenerator {
+    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
+        // Display the current poll count
+        write!(f, "RangeBatchGenerator(poll_count={})", self.poll_count)
+    }
+}
+
+impl LazyBatchGenerator for RangeBatchGenerator {
+    fn boundedness(&self) -> Boundedness {
+        self.boundedness
+    }
+
+    /// Generate the next RecordBatch.
+    fn generate_next_batch(&mut self) -> datafusion_common::Result<Option<RecordBatch>> {
+        self.poll_count += 1;
+
+        let mut builder = Int64Array::builder(self.batch_size);
+        for _ in 0..self.batch_size {
+            match self.value_range.next() {
+                None => break,
+                Some(v) => builder.append_value(v),
+            }
+        }
+        let array = builder.finish();
+
+        if array.is_empty() {
+            return Ok(None);
+        }
+
+        let batch =
+            RecordBatch::try_new(Arc::clone(&self.schema), vec![Arc::new(array)])?;
+        Ok(Some(batch))
+    }
+}
+
+fn make_lazy_exec(column_name: &str, pretend_infinite: bool) -> LazyMemoryExec {
+    make_lazy_exec_with_range(column_name, i64::MIN..i64::MAX, pretend_infinite)
+}
+
+fn make_lazy_exec_with_range(
+    column_name: &str,
+    range: Range<i64>,
+    pretend_infinite: bool,
+) -> LazyMemoryExec {
+    let schema = Arc::new(Schema::new(vec![Field::new(
+        column_name,
+        DataType::Int64,
+        false,
+    )]));
+
+    let boundedness = if pretend_infinite {
+        Boundedness::Unbounded {
+            requires_infinite_memory: false,
+        }
+    } else {
+        Boundedness::Bounded
+    };
+
+    // Instantiate the generator with the requested range and batch size
+    let gen = RangeBatchGenerator {
+        schema: Arc::clone(&schema),
+        boundedness,
+        value_range: range,
+        batch_size: 8192,
+        poll_count: 0,
+    };
+
+    // Wrap the generator in a trait object behind Arc<RwLock<_>>
+    let generator: Arc<RwLock<dyn LazyBatchGenerator>> = Arc::new(RwLock::new(gen));
+
+    // Create a LazyMemoryExec with one partition using our generator
+    let mut exec = LazyMemoryExec::try_new(schema, vec![generator]).unwrap();
+
+    exec.add_ordering(vec![PhysicalSortExpr::new(
+        Arc::new(Column::new(column_name, 0)),
+        SortOptions::new(false, true),
+    )]);
+
+    exec
+}
+
+#[rstest]
+#[tokio::test]
+async fn agg_no_grouping_yields(
+    #[values(false, true)] pretend_infinite: bool,
+) -> Result<(), Box<dyn Error>> {
+    // build session
+    let session_ctx = SessionContext::new();
+
+    // set up an aggregation without grouping
+    let inf = Arc::new(make_lazy_exec("value", pretend_infinite));
+    let aggr = Arc::new(AggregateExec::try_new(
+        AggregateMode::Single,
+        PhysicalGroupBy::new(vec![], vec![], vec![]),
+        vec![Arc::new(
+            AggregateExprBuilder::new(
+                sum::sum_udaf(),
+                vec![col("value", &inf.schema())?],
+            )
+            .schema(inf.schema())
+            .alias("sum")
+            .build()?,
+        )],
+        vec![None],
+        inf.clone(),
+        inf.schema(),
+    )?);
+
+    query_yields(aggr, session_ctx.task_ctx()).await
+}
+
+#[rstest]
+#[tokio::test]
+async fn agg_grouping_yields(
+    #[values(false, true)] pretend_infinite: bool,
+) -> Result<(), Box<dyn Error>> {
+    // build session
+    let session_ctx = SessionContext::new();
+
+    // set up an aggregation with grouping
+    let inf = Arc::new(make_lazy_exec("value", pretend_infinite));
+
+    let value_col = col("value", &inf.schema())?;
+    let group = binary(value_col.clone(), Divide, lit(1000000i64), &inf.schema())?;
+
+    let aggr = Arc::new(AggregateExec::try_new(
+        AggregateMode::Single,
+        PhysicalGroupBy::new(vec![(group, "group".to_string())], vec![], vec![]),
+        vec![Arc::new(
+            AggregateExprBuilder::new(sum::sum_udaf(), vec![value_col.clone()])
+                .schema(inf.schema())
+                .alias("sum")
+                .build()?,
+        )],
+        vec![None],
+        inf.clone(),
+        inf.schema(),
+    )?);
+
+    query_yields(aggr, session_ctx.task_ctx()).await
+}
+
+#[rstest]
+#[tokio::test]
+async fn agg_grouped_topk_yields(
+    #[values(false, true)] pretend_infinite: bool,
+) -> Result<(), Box<dyn Error>> {
+    // build session
+    let session_ctx = SessionContext::new();
+
+    // set up a top-k aggregation
+    let inf = Arc::new(make_lazy_exec("value", pretend_infinite));
+
+    let value_col = col("value", &inf.schema())?;
+    let group = binary(value_col.clone(), Divide, lit(1000000i64), &inf.schema())?;
+
+    let aggr = Arc::new(
+        AggregateExec::try_new(
+            AggregateMode::Single,
+            PhysicalGroupBy::new(
+                vec![(group, "group".to_string())],
+                vec![],
+                vec![vec![false]],
+            ),
+            vec![Arc::new(
+                AggregateExprBuilder::new(min_max::max_udaf(), vec![value_col.clone()])
+                    .schema(inf.schema())
+                    .alias("max")
+                    .build()?,
+            )],
+            vec![None],
+            inf.clone(),
+            inf.schema(),
+        )?
+        .with_limit(Some(100)),
+    );
+
+    query_yields(aggr, session_ctx.task_ctx()).await
+}
+
+#[rstest]
+#[tokio::test]
+async fn sort_yields(
+    #[values(false, true)] pretend_infinite: bool,
+) -> Result<(), Box<dyn Error>> {
+    // build session
+    let session_ctx = SessionContext::new();
+
+    // set up the infinite source
+    let inf = Arc::new(make_lazy_exec("value", pretend_infinite));
+
+    // set up a SortExec that will not be able to finish in time because input is very large
+    let sort_expr = PhysicalSortExpr::new(
+        col("value", &inf.schema())?,
+        SortOptions {
+            descending: true,
+            nulls_first: true,
+        },
+    );
+
+    let lex_ordering = LexOrdering::new(vec![sort_expr]).unwrap();
+    let sort_exec = Arc::new(SortExec::new(lex_ordering, inf.clone()));
+
+    query_yields(sort_exec, session_ctx.task_ctx()).await
+}
+
+#[rstest]
+#[tokio::test]
+async fn sort_merge_join_yields(
+    #[values(false, true)] pretend_infinite: bool,
+) -> Result<(), Box<dyn Error>> {
+    // build session
+    let session_ctx = SessionContext::new();
+
+    // set up the join sources
+    let inf1 = Arc::new(make_lazy_exec_with_range(
+        "value1",
+        i64::MIN..0,
+        pretend_infinite,
+    ));
+    let inf2 = Arc::new(make_lazy_exec_with_range(
+        "value2",
+        0..i64::MAX,
+        pretend_infinite,
+    ));
+
+    // set up a SortMergeJoinExec that will take a long time skipping left side content to find
+    // the first right side match
+    let join = Arc::new(SortMergeJoinExec::try_new(
+        inf1.clone(),
+        inf2.clone(),
+        vec![(
+            col("value1", &inf1.schema())?,
+            col("value2", &inf2.schema())?,
+        )],
+        None,
+        JoinType::Inner,
+        vec![inf1.properties().eq_properties.output_ordering().unwrap()[0].options],
+        NullEquality::NullEqualsNull,
+    )?);
+
+    query_yields(join, session_ctx.task_ctx()).await
+}
+
+#[rstest]
+#[tokio::test]
+async fn filter_yields(
+    #[values(false, true)] pretend_infinite: bool,
+) -> Result<(), Box<dyn Error>> {
+    // build session
+    let session_ctx = SessionContext::new();
+
+    // set up the infinite source
+    let inf = Arc::new(make_lazy_exec("value", pretend_infinite));
+
+    // set up a FilterExec that will filter out entire batches
+    let filter_expr = binary(
+        col("value", &inf.schema())?,
+        Operator::Lt,
+        lit(i64::MIN),
+        &inf.schema(),
+    )?;
+    let filter = Arc::new(FilterExec::try_new(filter_expr, inf.clone())?);
+
+    query_yields(filter, session_ctx.task_ctx()).await
+}
+
+#[rstest]
+#[tokio::test]
+async fn filter_reject_all_batches_yields(
+    #[values(false, true)] pretend_infinite: bool,
+) -> Result<(), Box<dyn Error>> {
+    // build session
+    let session_ctx = SessionContext::new();
+
+    // set up a lazy source that only produces negative values
+    let infinite = make_lazy_exec_with_range("value", i64::MIN..0, pretend_infinite);
+
+    // construct a FilterExec that is always false: "value > 0" (no rows pass)
+    let false_predicate = Arc::new(BinaryExpr::new(
+        Arc::new(Column::new("value", 0)),
+        Gt,
+        Arc::new(Literal::new(ScalarValue::Int64(Some(0)))),
+    ));
+    let filtered = Arc::new(FilterExec::try_new(false_predicate, Arc::new(infinite))?);
+
+    // use CoalesceBatchesExec so each pull from the filter tries to fill an 8192-row batch
+    let coalesced = Arc::new(CoalesceBatchesExec::new(filtered, 8_192));
+
+    query_yields(coalesced, session_ctx.task_ctx()).await
+}
+
+#[rstest]
+#[tokio::test]
+async fn interleave_then_filter_all_yields(
+    #[values(false, true)] pretend_infinite: bool,
+) -> Result<(), Box<dyn Error>> {
+    // Build a session.
+    let session_ctx = SessionContext::new();
+
+    // Create multiple infinite sources, each filtered by a different predicate.
+    // This ensures InterleaveExec has many children.
+    let mut infinite_children = vec![];
+
+    // Use 31 distinct divisors (1..32) to force 31 infinite inputs
+    for thr in 1..32 {
+        // One infinite exec:
+        let mut inf = make_lazy_exec_with_range("value", 0..i64::MAX, pretend_infinite);
+
+        // Set identical Hash partitioning on "value" into 1 bucket for every
+        // child. This is required for InterleaveExec::try_new.
+        let exprs = vec![Arc::new(Column::new("value", 0)) as _];
+        let partitioning = Partitioning::Hash(exprs, 1);
+        inf.try_set_partitioning(partitioning)?;
+
+        // Apply a FilterExec: "(value / 8192) % thr == 0".
+        let filter_expr = binary(
+            binary(
+                binary(
+                    col("value", &inf.schema())?,
+                    Divide,
+                    lit(8192i64),
+                    &inf.schema(),
+                )?,
+                Modulo,
+                lit(thr as i64),
+                &inf.schema(),
+            )?,
+            Eq,
+            lit(0i64),
+            &inf.schema(),
+        )?;
+        let filtered = Arc::new(FilterExec::try_new(filter_expr, Arc::new(inf))?);
+
+        infinite_children.push(filtered as _);
+    }
+
+    // Build an InterleaveExec over all infinite children.
+    let interleave = Arc::new(InterleaveExec::try_new(infinite_children)?);
+
+    // Wrap the InterleaveExec in a FilterExec that always returns false,
+    // ensuring that no rows are ever emitted.
+    let always_false = Arc::new(Literal::new(ScalarValue::Boolean(Some(false))));
+    let filtered_interleave = Arc::new(FilterExec::try_new(always_false, interleave)?);
+
+    query_yields(filtered_interleave, session_ctx.task_ctx()).await
+}
+
+#[rstest]
+#[tokio::test]
+async fn interleave_then_aggregate_yields(
+    #[values(false, true)] pretend_infinite: bool,
+) -> Result<(), Box<dyn Error>> {
+    // Build a session.
+    let session_ctx = SessionContext::new();
+
+    // Create N infinite sources, each filtered by a different predicate.
+    // That way, the InterleaveExec will have multiple children.
+    let mut infinite_children = vec![];
+
+    // Use 31 distinct divisors (1..32) to force 31 infinite inputs
+    for thr in 1..32 {
+        // One infinite exec:
+        let mut inf = make_lazy_exec_with_range("value", 0..i64::MAX, pretend_infinite);
+
+        // Set identical Hash partitioning on "value" into 1 bucket for every
+        // child. This is required for InterleaveExec::try_new.
+        let exprs = vec![Arc::new(Column::new("value", 0)) as _];
+        let partitioning = Partitioning::Hash(exprs, 1);
+        inf.try_set_partitioning(partitioning)?;
+
+        // Apply a FilterExec: "(value / 8192) % thr == 0".
+        let filter_expr = binary(
+            binary(
+                binary(
+                    col("value", &inf.schema())?,
+                    Divide,
+                    lit(8192i64),
+                    &inf.schema(),
+                )?,
+                Modulo,
+                lit(thr as i64),
+                &inf.schema(),
+            )?,
+            Eq,
+            lit(0i64),
+            &inf.schema(),
+        )?;
+        let filtered = Arc::new(FilterExec::try_new(filter_expr, Arc::new(inf))?);
+
+        infinite_children.push(filtered as _);
+    }
+
+    // Build an InterleaveExec over all N children.
+    // Since each child now has Partitioning::Hash([col "value"], 1), InterleaveExec::try_new succeeds.
+    let interleave = Arc::new(InterleaveExec::try_new(infinite_children)?);
+    let interleave_schema = interleave.schema();
+
+    // Build a global AggregateExec that sums "value" over all rows.
+    // Because we use `AggregateMode::Single` with no GROUP BY columns, this plan will
+    // only produce one "final" row once all inputs finish. But our inputs never finish,
+    // so we should never get any output.
+    let aggregate_expr = AggregateExprBuilder::new(
+        sum::sum_udaf(),
+        vec![Arc::new(Column::new("value", 0))],
+    )
+    .schema(interleave_schema.clone())
+    .alias("total")
+    .build()?;
+
+    let aggr = Arc::new(AggregateExec::try_new(
+        AggregateMode::Single,
+        PhysicalGroupBy::new(
+            vec![], // no GROUP BY columns
+            vec![], // no GROUP BY expressions
+            vec![], // no GROUP BY physical expressions
+        ),
+        vec![Arc::new(aggregate_expr)],
+        vec![None], // no "distinct" flags
+        interleave,
+        interleave_schema,
+    )?);
+
+    query_yields(aggr, session_ctx.task_ctx()).await
+}
+
+#[rstest]
+#[tokio::test]
+async fn join_yields(
+    #[values(false, true)] pretend_infinite: bool,
+) -> Result<(), Box<dyn Error>> {
+    // build session
+    let session_ctx = SessionContext::new();
+
+    // the left side is a small finite range while the right side is huge, so the
+    // join finds a few matching keys (0..10) but the right side keeps producing
+    let infinite_left = make_lazy_exec_with_range("value", -10..10, false);
+    let infinite_right =
+        make_lazy_exec_with_range("value", 0..i64::MAX, pretend_infinite);
+
+    // Create Join keys → join on "value" = "value"
+    let left_keys: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(Column::new("value", 0))];
+    let right_keys: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(Column::new("value", 0))];
+
+    // Wrap each side in CoalesceBatches + Repartition so they are both hashed into 1 partition
+    let coalesced_left =
+        Arc::new(CoalesceBatchesExec::new(Arc::new(infinite_left), 8_192));
+    let coalesced_right =
+        Arc::new(CoalesceBatchesExec::new(Arc::new(infinite_right), 8_192));
+
+    let part_left = Partitioning::Hash(left_keys, 1);
+    let part_right = Partitioning::Hash(right_keys, 1);
+
+    let hashed_left = Arc::new(RepartitionExec::try_new(coalesced_left, part_left)?);
+    let hashed_right = Arc::new(RepartitionExec::try_new(coalesced_right, part_right)?);
+
+    // Build an Inner HashJoinExec → left.value = right.value
+    let join = Arc::new(HashJoinExec::try_new(
+        hashed_left,
+        hashed_right,
+        vec![(
+            Arc::new(Column::new("value", 0)),
+            Arc::new(Column::new("value", 0)),
+        )],
+        None,
+        &JoinType::Inner,
+        None,
+        PartitionMode::CollectLeft,
+        NullEquality::NullEqualsNull,
+    )?);
+
+    query_yields(join, session_ctx.task_ctx()).await
+}
+
+#[rstest]
+#[tokio::test]
+async fn join_agg_yields(
+    #[values(false, true)] pretend_infinite: bool,
+) -> Result<(), Box<dyn Error>> {
+    // build session
+    let session_ctx = SessionContext::new();
+
+    // the left side is a small finite range while the right side is huge, so the
+    // join finds a few matching keys (0..10) but the right side keeps producing
+    let infinite_left = make_lazy_exec_with_range("value", -10..10, false);
+    let infinite_right =
+        make_lazy_exec_with_range("value", 0..i64::MAX, pretend_infinite);
+
+    // Create Join keys → join on "value" = "value"
+    let left_keys: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(Column::new("value", 0))];
+    let right_keys: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(Column::new("value", 0))];
+
+    // Wrap each side in CoalesceBatches + Repartition so they are both hashed into 1 partition
+    let coalesced_left =
+        Arc::new(CoalesceBatchesExec::new(Arc::new(infinite_left), 8_192));
+    let coalesced_right =
+        Arc::new(CoalesceBatchesExec::new(Arc::new(infinite_right), 8_192));
+
+    let part_left = Partitioning::Hash(left_keys, 1);
+    let part_right = Partitioning::Hash(right_keys, 1);
+
+    let hashed_left = Arc::new(RepartitionExec::try_new(coalesced_left, part_left)?);
+    let hashed_right = Arc::new(RepartitionExec::try_new(coalesced_right, part_right)?);
+
+    // Build an Inner HashJoinExec → left.value = right.value
+    let join = Arc::new(HashJoinExec::try_new(
+        hashed_left,
+        hashed_right,
+        vec![(
+            Arc::new(Column::new("value", 0)),
+            Arc::new(Column::new("value", 0)),
+        )],
+        None,
+        &JoinType::Inner,
+        None,
+        PartitionMode::CollectLeft,
+        NullEquality::NullEqualsNull,
+    )?);
+
+    // Project only one column ("value" from the left side) because we just want to sum that
+    let input_schema = join.schema();
+
+    let proj_expr = vec![(
+        Arc::new(Column::new_with_schema("value", &input_schema)?) as _,
+        "value".to_string(),
+    )];
+
+    let projection = Arc::new(ProjectionExec::try_new(proj_expr, join)?);
+    let projection_schema = projection.schema();
+
+    let output_fields = vec![Field::new("total", DataType::Int64, true)];
+    let output_schema = Arc::new(Schema::new(output_fields));
+
+    // Global aggregate (Single) over "value"
+    let aggregate_expr = AggregateExprBuilder::new(
+        sum::sum_udaf(),
+        vec![Arc::new(Column::new_with_schema(
+            "value",
+            &projection.schema(),
+        )?)],
+    )
+    .schema(output_schema)
+    .alias("total")
+    .build()?;
+
+    let aggr = Arc::new(AggregateExec::try_new(
+        AggregateMode::Single,
+        PhysicalGroupBy::new(vec![], vec![], vec![]),
+        vec![Arc::new(aggregate_expr)],
+        vec![None],
+        projection,
+        projection_schema,
+    )?);
+
+    query_yields(aggr, session_ctx.task_ctx()).await
+}
+
+#[rstest]
+#[tokio::test]
+async fn hash_join_yields(
+    #[values(false, true)] pretend_infinite: bool,
+) -> Result<(), Box<dyn Error>> {
+    // build session
+    let session_ctx = SessionContext::new();
+
+    // set up the join sources
+    let inf1 = Arc::new(make_lazy_exec("value1", pretend_infinite));
+    let inf2 = Arc::new(make_lazy_exec("value2", pretend_infinite));
+
+    // set up a HashJoinExec that will take a long time in the build phase
+    let join = Arc::new(HashJoinExec::try_new(
+        inf1.clone(),
+        inf2.clone(),
+        vec![(
+            col("value1", &inf1.schema())?,
+            col("value2", &inf2.schema())?,
+        )],
+        None,
+        &JoinType::Left,
+        None,
+        PartitionMode::CollectLeft,
+        NullEquality::NullEqualsNull,
+    )?);
+
+    query_yields(join, session_ctx.task_ctx()).await
+}
+
+#[rstest]
+#[tokio::test]
+async fn hash_join_without_repartition_and_no_agg(
+    #[values(false, true)] pretend_infinite: bool,
+) -> Result<(), Box<dyn Error>> {
+    // build session
+    let session_ctx = SessionContext::new();
+
+    // the left side is a small finite range while the right side is huge, so the
+    // join finds a few matching keys (0..10) but the right side keeps producing
+    let infinite_left = make_lazy_exec_with_range("value", -10..10, false);
+    let infinite_right =
+        make_lazy_exec_with_range("value", 0..i64::MAX, pretend_infinite);
+
+    // Directly feed `infinite_left` and `infinite_right` into HashJoinExec.
+    // Do not use aggregation or repartition.
+    let join = Arc::new(HashJoinExec::try_new(
+        Arc::new(infinite_left),
+        Arc::new(infinite_right),
+        vec![(
+            Arc::new(Column::new("value", 0)),
+            Arc::new(Column::new("value", 0)),
+        )],
+        /* filter */ None,
+        &JoinType::Inner,
+        /* projection */ None,
+        // Using CollectLeft is fine—just avoid RepartitionExec’s partitioned channels.
+        PartitionMode::CollectLeft,
+        NullEquality::NullEqualsNull,
+    )?);
+
+    query_yields(join, session_ctx.task_ctx()).await
+}
+
+#[derive(Debug)]
+enum Yielded {
+    ReadyOrPending,
+    Err(#[allow(dead_code)] DataFusionError),
+    Timeout,
+}
+
+async fn query_yields(
+    plan: Arc<dyn ExecutionPlan>,
+    task_ctx: Arc<TaskContext>,
+) -> Result<(), Box<dyn Error>> {
+    // Run plan through EnsureCooperative
+    let optimized =
+        EnsureCooperative::new().optimize(plan, task_ctx.session_config().options())?;
+
+    // Get the stream
+    let mut stream = physical_plan::execute_stream(optimized, task_ctx)?;
+
+    // Create an independent executor pool
+    let child_runtime = Runtime::new()?;
+
+    // Spawn a task that tries to poll the stream.
+    // The task completes once the stream has yielded with either Ready or Pending.
+    let join_handle = child_runtime.spawn(std::future::poll_fn(move |cx| {
+        match stream.poll_next_unpin(cx) {
+            Poll::Ready(Some(Ok(_))) => Poll::Ready(Poll::Ready(Ok(()))),
+            Poll::Ready(Some(Err(e))) => Poll::Ready(Poll::Ready(Err(e))),
+            Poll::Ready(None) => Poll::Ready(Poll::Ready(Ok(()))),
+            Poll::Pending => Poll::Ready(Poll::Pending),
+        }
+    }));
+
+    let abort_handle = join_handle.abort_handle();
+
+    // Now select on the join handle of the task running in the child executor with a timeout
+    let yielded = select! {
+        result = join_handle => {
+            match result {
+                Ok(Poll::Pending) => Yielded::ReadyOrPending,
+                Ok(Poll::Ready(Ok(_))) => Yielded::ReadyOrPending,
+                Ok(Poll::Ready(Err(e))) => Yielded::Err(e),
+                Err(_) => Yielded::Err(DataFusionError::Execution("join error".into())),
+            }
+        },
+        _ = tokio::time::sleep(Duration::from_secs(10)) => {
+            Yielded::Timeout
+        }
+    };
+
+    // Try to abort the poll task and shut down the child runtime
+    abort_handle.abort();
+    Handle::current().spawn_blocking(move || {
+        child_runtime.shutdown_timeout(Duration::from_secs(5));
+    });
+
+    // Finally, check if poll_next yielded
+    assert!(
+        matches!(yielded, Yielded::ReadyOrPending),
+        "Result is not Ready or Pending: {yielded:?}"
+    );
+    Ok(())
+}
diff --git a/datafusion/core/tests/execution/infinite_cancel.rs b/datafusion/core/tests/execution/infinite_cancel.rs
deleted file mode 100644
index ea35dd367b99..000000000000
--- a/datafusion/core/tests/execution/infinite_cancel.rs
+++ /dev/null
@@ -1,820 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::error::Error;
-use std::fmt::Formatter;
-use std::sync::Arc;
-
-use arrow::array::{Array, Int64Array, RecordBatch};
-use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
-use arrow_schema::SortOptions;
-use datafusion::functions_aggregate::sum;
-use datafusion::physical_expr::aggregate::AggregateExprBuilder;
-use datafusion::physical_expr::Partitioning;
-use datafusion::physical_plan;
-use datafusion::physical_plan::aggregates::{
-    AggregateExec, AggregateMode, PhysicalGroupBy,
-};
-use datafusion::physical_plan::execution_plan::Boundedness;
-use datafusion::physical_plan::ExecutionPlan;
-use datafusion::prelude::SessionContext;
-use datafusion_common::config::ConfigOptions;
-use datafusion_common::{JoinType, NullEquality, ScalarValue};
-use datafusion_expr_common::operator::Operator::Gt;
-use datafusion_physical_expr::expressions::{col, BinaryExpr, Column, Literal};
-use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
-use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
-use datafusion_physical_optimizer::insert_yield_exec::InsertYieldExec;
-use datafusion_physical_optimizer::PhysicalOptimizerRule;
-use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
-use datafusion_physical_plan::filter::FilterExec;
-use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode, SortMergeJoinExec};
-use datafusion_physical_plan::projection::ProjectionExec;
-use datafusion_physical_plan::repartition::RepartitionExec;
-use datafusion_physical_plan::sorts::sort::SortExec;
-use datafusion_physical_plan::union::InterleaveExec;
-
-use datafusion_physical_plan::memory::{LazyBatchGenerator, LazyMemoryExec};
-use futures::StreamExt;
-use parking_lot::RwLock;
-use rstest::rstest;
-use tokio::select;
-
-#[derive(Debug)]
-/// A batch generator that can produce either bounded or boundless infinite stream of the same RecordBatch.
-struct InfiniteGenerator {
-    /// The RecordBatch to return on each call.
-    batch: RecordBatch,
-    /// How many batches have already been generated.
-    counter: usize,
-}
-
-impl std::fmt::Display for InfiniteGenerator {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        // Display current counter
-        write!(f, "InfiniteGenerator(counter={})", self.counter)
-    }
-}
-
-impl LazyBatchGenerator for InfiniteGenerator {
-    /// Generate the next RecordBatch.
-    fn generate_next_batch(&mut self) -> datafusion_common::Result<Option<RecordBatch>> {
-        // Increment the counter and return a clone of the batch
-        self.counter += 1;
-        Ok(Some(self.batch.clone()))
-    }
-}
-
-/// Build a LazyMemoryExec that yields either a finite or infinite stream depending on `pretend_finite`.
-fn make_lazy_exec(
-    batch: RecordBatch,
-    schema: SchemaRef,
-    pretend_finite: bool,
-) -> Arc<dyn ExecutionPlan> {
-    let boundedness = if pretend_finite {
-        Boundedness::Bounded
-    } else {
-        Boundedness::Unbounded {
-            requires_infinite_memory: false,
-        }
-    };
-
-    // Instantiate the generator with the batch and limit
-    let gen = InfiniteGenerator { batch, counter: 0 };
-
-    // Wrap the generator in a trait object behind Arc<RwLock<_>>
-    let generator: Arc<RwLock<dyn LazyBatchGenerator>> = Arc::new(RwLock::new(gen));
-
-    // Create a LazyMemoryExec with one partition using our generator
-    let mut exec = LazyMemoryExec::try_new(schema, vec![generator]).unwrap();
-    exec.set_boundedness(boundedness);
-
-    // Erase concrete type into a generic ExecutionPlan handle
-    Arc::new(exec) as Arc<dyn ExecutionPlan>
-}
-
-#[rstest]
-#[tokio::test]
-async fn test_infinite_agg_cancel(
-    #[values(false, true)] pretend_finite: bool,
-) -> Result<(), Box<dyn Error>> {
-    // 1) build session & schema & sample batch
-    let session_ctx = SessionContext::new();
-    let schema = Arc::new(Schema::new(Fields::from(vec![Field::new(
-        "value",
-        DataType::Int64,
-        false,
-    )])));
-    let mut builder = Int64Array::builder(8192);
-    for v in 0..8192 {
-        builder.append_value(v);
-    }
-    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(builder.finish())])?;
-
-    // 2) set up the infinite source + aggregation
-    let inf = make_lazy_exec(batch.clone(), schema.clone(), pretend_finite);
-    let aggr = Arc::new(AggregateExec::try_new(
-        AggregateMode::Single,
-        PhysicalGroupBy::new(vec![], vec![], vec![]),
-        vec![Arc::new(
-            AggregateExprBuilder::new(
-                sum::sum_udaf(),
-                vec![Arc::new(Column::new_with_schema("value", &schema)?)],
-            )
-            .schema(inf.schema())
-            .alias("sum")
-            .build()?,
-        )],
-        vec![None],
-        inf,
-        schema,
-    )?);
-
-    // 3) optimize the plan with InsertYieldExec to auto-insert Yield
-    let config = ConfigOptions::new();
-    let optimized = InsertYieldExec::new().optimize(aggr, &config)?;
-
-    // 4) get the stream
-    let mut stream = physical_plan::execute_stream(optimized, session_ctx.task_ctx())?;
-    const TIMEOUT: u64 = 1;
-
-    // 5) drive the stream inline in select!
-    let result = select! {
-        batch_opt = stream.next() => batch_opt,
-        _ = tokio::time::sleep(tokio::time::Duration::from_secs(TIMEOUT)) => {
-            None
-        }
-    };
-
-    assert!(result.is_none(), "Expected timeout, but got a result");
-    Ok(())
-}
-
-#[rstest]
-#[tokio::test]
-async fn test_infinite_sort_cancel(
-    #[values(false, true)] pretend_finite: bool,
-) -> Result<(), Box<dyn Error>> {
-    // 1) build session & schema & sample batch
-    let session_ctx = SessionContext::new();
-    let schema = Arc::new(Schema::new(vec![Field::new(
-        "value",
-        DataType::Int64,
-        false,
-    )]));
-    let mut builder = Int64Array::builder(8192);
-    for v in 0..8192 {
-        builder.append_value(v);
-    }
-    let array = builder.finish();
-    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)])?;
-
-    // 2) set up the infinite source
-    let inf = make_lazy_exec(batch.clone(), schema.clone(), pretend_finite);
-
-    // 3) set up a SortExec that will never finish because input is infinite
-    let sort_options = SortOptions {
-        descending: false,
-        nulls_first: true,
-    };
-    let sort_expr = PhysicalSortExpr::new(
-        Arc::new(Column::new_with_schema("value", &schema)?),
-        sort_options,
-    );
-    let sort_exec = Arc::new(SortExec::new([sort_expr].into(), inf));
-
-    // 4) optimize the plan with InsertYieldExec to auto-insert Yield
-    let config = ConfigOptions::new();
-    let optimized = InsertYieldExec::new().optimize(sort_exec, &config)?;
-
-    // 5) get the stream
-    let mut stream = physical_plan::execute_stream(optimized, session_ctx.task_ctx())?;
-    const TIMEOUT: u64 = 1;
-
-    // 6) drive the stream inline in select!
-    let result = select! {
-        batch_opt = stream.next() => batch_opt,
-        _ = tokio::time::sleep(tokio::time::Duration::from_secs(TIMEOUT)) => {
-            None
-        }
-    };
-
-    assert!(
-        result.is_none(),
-        "Expected timeout for sort, but got a result"
-    );
-    Ok(())
-}
-
-#[rstest]
-#[tokio::test]
-async fn test_infinite_interleave_cancel(
-    #[values(false, true)] pretend_finite: bool,
-) -> Result<(), Box<dyn Error>> {
-    // 1) Build a session and a schema with one i64 column.
-    let session_ctx = SessionContext::new();
-    let schema = Arc::new(Schema::new(Fields::from(vec![Field::new(
-        "value",
-        DataType::Int64,
-        false,
-    )])));
-    let mut builder = Int64Array::builder(8192);
-    for v in 0..8192 {
-        builder.append_value(v);
-    }
-    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(builder.finish())])?;
-
-    // 2) Create multiple infinite sources, each filtered by a different threshold.
-    //    This ensures InterleaveExec has many children.
-    let mut infinite_children = vec![];
-    // Use 32 distinct thresholds (each > 0 and < 8192) for 32 infinite inputs.
-    let thresholds = (0..32).map(|i| 8191 - (i * 256) as i64);
-
-    for thr in thresholds {
-        // 2a) Set up the infinite source
-        let inf = make_lazy_exec(batch.clone(), schema.clone(), pretend_finite);
-
-        // 2b) Apply a FilterExec with predicate "value > thr".
-        let filter_expr = Arc::new(BinaryExpr::new(
-            Arc::new(Column::new_with_schema("value", &schema)?),
-            Gt,
-            Arc::new(Literal::new(ScalarValue::Int64(Some(thr)))),
-        ));
-        let filtered = Arc::new(FilterExec::try_new(filter_expr, inf)?);
-
-        // 2c) Wrap the filtered stream in CoalesceBatchesExec so it emits
-        //     one 8192-row batch at a time.
-        let coalesced = Arc::new(CoalesceBatchesExec::new(filtered, 8192));
-
-        // 2d) Repartition each coalesced stream by hashing on "value" into 1 partition.
-        //     Required for InterleaveExec::try_new to succeed.
-        let exprs =
-            vec![Arc::new(Column::new_with_schema("value", &schema)?) as _];
-        let partitioning = Partitioning::Hash(exprs, 1);
-        let hashed = Arc::new(RepartitionExec::try_new(coalesced, partitioning)?);
-
-        infinite_children.push(hashed as Arc<dyn ExecutionPlan>);
-    }
-
-    // 3) Build an InterleaveExec over all infinite children.
-    let interleave = Arc::new(InterleaveExec::try_new(infinite_children)?);
-
-    // 4) Wrap the InterleaveExec in a FilterExec that always returns false,
-    //    ensuring that no rows are ever emitted.
-    let always_false = Arc::new(Literal::new(ScalarValue::Boolean(Some(false))));
-    let filtered_interleave = Arc::new(FilterExec::try_new(always_false, interleave)?);
-
-    // 5) Coalesce the filtered interleave into 8192-row batches.
-    //    This lets InsertYieldExec insert YieldStreamExec at each batch boundary.
-    let coalesced_top = Arc::new(CoalesceBatchesExec::new(filtered_interleave, 8192));
-
-    // 6) Apply InsertYieldExec to insert YieldStreamExec under every leaf.
-    //    Each InfiniteExec → FilterExec → CoalesceBatchesExec chain will yield periodically.
-    let config = ConfigOptions::new();
-    let optimized = InsertYieldExec::new().optimize(coalesced_top, &config)?;
-
-    // 7) Execute the optimized plan with a 1-second timeout.
-    //    Because the top-level FilterExec always discards rows and the inputs are infinite,
-    //    no batch will be returned within 1 second, causing result to be None.
-    let mut stream = physical_plan::execute_stream(optimized, session_ctx.task_ctx())?;
-    const TIMEOUT: u64 = 1;
-    let result = select! {
-        batch_opt = stream.next() => batch_opt,
-        _ = tokio::time::sleep(tokio::time::Duration::from_secs(TIMEOUT)) => None,
-    };
-
-    assert!(
-        result.is_none(),
-        "Expected no output for infinite interleave aggregate, but got a batch"
-    );
-    Ok(())
-}
-
-#[rstest]
-#[tokio::test]
-async fn test_infinite_interleave_agg_cancel(
-    #[values(false, true)] pretend_finite: bool,
-) -> Result<(), Box<dyn Error>> {
-    // 1) Build session, schema, and a sample batch.
-    let session_ctx = SessionContext::new();
-    let schema = Arc::new(Schema::new(Fields::from(vec![Field::new(
-        "value",
-        DataType::Int64,
-        false,
-    )])));
-    let mut builder = Int64Array::builder(8192);
-    for v in 0..8192 {
-        builder.append_value(v);
-    }
-    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(builder.finish())])?;
-
-    // 2) Create N infinite sources, each filtered by a different predicate.
-    //    That way, the InterleaveExec will have multiple children.
-    let mut infinite_children = vec![];
-    // Use 32 distinct thresholds (each >0 and <8 192) to force 32 infinite inputs
-    let thresholds = (0..32).map(|i| 8_192 - 1 - (i * 256) as i64);
-
-    for thr in thresholds {
-        // 2a) One infinite exec:
-        let inf = make_lazy_exec(batch.clone(), schema.clone(), pretend_finite);
-
-        // 2b) Apply a FilterExec: “value > thr”.
-        let filter_expr = Arc::new(BinaryExpr::new(
-            Arc::new(Column::new_with_schema("value", &schema)?),
-            Gt,
-            Arc::new(Literal::new(ScalarValue::Int64(Some(thr)))),
-        ));
-        let filtered = Arc::new(FilterExec::try_new(filter_expr, inf)?);
-
-        // 2c) Wrap in CoalesceBatchesExec so the upstream yields are batched.
-        let coalesced = Arc::new(CoalesceBatchesExec::new(filtered, 8192));
-
-        // 2d) Now repartition so that all children share identical Hash partitioning
-        //     on “value” into 1 bucket. This is required for InterleaveExec::try_new.
-        let exprs =
-            vec![Arc::new(Column::new_with_schema("value", &schema)?) as _];
-        let partitioning = Partitioning::Hash(exprs, 1);
-        let hashed = Arc::new(RepartitionExec::try_new(coalesced, partitioning)?);
-
-        infinite_children.push(hashed as _);
-    }
-
-    // 3) Build an InterleaveExec over all N children.
-    //    Since each child now has Partitioning::Hash([col "value"], 1), InterleaveExec::try_new succeeds.
-    let interleave = Arc::new(InterleaveExec::try_new(infinite_children)?);
-    let interleave_schema = interleave.schema();
-
-    // 4) Build a global AggregateExec that sums “value” over all rows.
-    //    Because we use `AggregateMode::Single` with no GROUP BY columns, this plan will
-    //    only produce one “final” row once all inputs finish. But our inputs never finish,
-    //    so we should never get any output.
-    let aggregate_expr = AggregateExprBuilder::new(
-        sum::sum_udaf(),
-        vec![Arc::new(Column::new_with_schema("value", &schema)?)],
-    )
-    .schema(interleave_schema.clone())
-    .alias("total")
-    .build()?;
-
-    let aggr = Arc::new(AggregateExec::try_new(
-        AggregateMode::Single,
-        PhysicalGroupBy::new(
-            vec![], // no GROUP BY columns
-            vec![], // no GROUP BY expressions
-            vec![], // no GROUP BY physical expressions
-        ),
-        vec![Arc::new(aggregate_expr)],
-        vec![None], // no “distinct” flags
-        interleave,
-        interleave_schema,
-    )?);
-
-    // 5) InsertYieldExec will automatically insert YieldStreams beneath each “infinite” leaf.
-    //    That way, each InfiniteExec (through the FilterExec/CoalesceBatchesExec/RepartitionExec chain)
-    //    yields to the runtime periodically instead of spinning CPU.
-    let config = ConfigOptions::new();
-    let optimized = InsertYieldExec::new().optimize(aggr, &config)?;
-
-    // 6) Execute the stream. Because AggregateExec(mode=Single) only emits a final batch
-    //    after all inputs finish—and those inputs are infinite—we expect no output
-    //    within 1 second (timeout → None).
-    let mut stream = physical_plan::execute_stream(optimized, session_ctx.task_ctx())?;
-    const TIMEOUT: u64 = 1;
-    let result = select! {
-        batch_opt = stream.next() => batch_opt,
-        _ = tokio::time::sleep(tokio::time::Duration::from_secs(TIMEOUT)) => {
-            None
-        }
-    };
-
-    assert!(
-        result.is_none(),
-        "Expected no output for aggregate over infinite interleave, but got some batch"
-    );
-
-    Ok(())
-}
-
-#[rstest]
-#[tokio::test]
-async fn test_infinite_join_cancel(
-    #[values(false, true)] pretend_finite: bool,
-) -> Result<(), Box<dyn Error>> {
-    // 1) Session, schema, and a single 8 K‐row batch for each side
-    let session_ctx = SessionContext::new();
-    let schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
-        "value",
-        DataType::Int64,
-        false,
-    )]));
-
-    let mut builder_left = Int64Array::builder(8_192);
-    let mut builder_right = Int64Array::builder(8_192);
-    for v in 0..8_192 {
-        builder_left.append_value(v);
-        // on the right side, we’ll shift each value by +1 so that not everything joins,
-        // but plenty of matching keys exist (e.g. 0 on left matches 1 on right, etc.)
-        builder_right.append_value(v + 1);
-    }
-    let batch_left =
-        RecordBatch::try_new(schema.clone(), vec![Arc::new(builder_left.finish())])?;
-    let batch_right =
-        RecordBatch::try_new(schema.clone(), vec![Arc::new(builder_right.finish())])?;
-
-    // 2a) Build two InfiniteExecs (left and right)
-    let infinite_left =
-        make_lazy_exec(batch_left.clone(), schema.clone(), pretend_finite);
-    let infinite_right =
-        make_lazy_exec(batch_right.clone(), schema.clone(), pretend_finite);
-
-    // 2b) Create Join keys → join on “value” = “value”
-    let left_keys: Vec<Arc<dyn PhysicalExpr>> =
-        vec![Arc::new(Column::new_with_schema("value", &schema)?)];
-    let right_keys: Vec<Arc<dyn PhysicalExpr>> =
-        vec![Arc::new(Column::new_with_schema("value", &schema)?)];
-
-    // 2c) Wrap each side in CoalesceBatches + Repartition so they are both hashed into 1 partition
-    let coalesced_left = Arc::new(CoalesceBatchesExec::new(infinite_left, 8_192));
-    let coalesced_right = Arc::new(CoalesceBatchesExec::new(infinite_right, 8_192));
-
-    let part_left = Partitioning::Hash(left_keys, 1);
-    let part_right = Partitioning::Hash(right_keys, 1);
-
-    let hashed_left = Arc::new(RepartitionExec::try_new(coalesced_left, part_left)?);
-    let hashed_right = Arc::new(RepartitionExec::try_new(coalesced_right, part_right)?);
-
-    // 2d) Build an Inner HashJoinExec → left.value = right.value
-    let join = Arc::new(HashJoinExec::try_new(
-        hashed_left,
-        hashed_right,
-        vec![(
-            Arc::new(Column::new_with_schema("value", &schema)?),
-            Arc::new(Column::new_with_schema("value", &schema)?),
-        )],
-        None,
-        &JoinType::Inner,
-        None,
-        PartitionMode::CollectLeft,
-        NullEquality::NullEqualsNull,
-    )?);
-
-    // 3) Wrap yields under each infinite leaf
-    let config = ConfigOptions::new();
-    let optimized = InsertYieldExec::new().optimize(join, &config)?;
-
-    // 4) Execute + 1 sec timeout
-    let mut stream = physical_plan::execute_stream(optimized, session_ctx.task_ctx())?;
-    const TIMEOUT: u64 = 1;
-    let result = select! {
-        batch_opt = stream.next() => batch_opt,
-        _ = tokio::time::sleep(tokio::time::Duration::from_secs(TIMEOUT)) => {
-            None
-        }
-    };
-    assert!(
-        result.is_none(),
-        "Expected no output for aggregate over infinite + join, but got a batch"
-    );
-    Ok(())
-}
-
-#[rstest]
-#[tokio::test]
-async fn test_infinite_join_agg_cancel(
-    #[values(false, true)] pretend_finite: bool,
-) -> Result<(), Box<dyn Error>> {
-    // 1) Session, schema, and a single 8 K‐row batch for each side
-    let session_ctx = SessionContext::new();
-    let schema = Arc::new(Schema::new(vec![Field::new(
-        "value",
-        DataType::Int64,
-        false,
-    )]));
-
-    let mut builder_left = Int64Array::builder(8_192);
-    let mut builder_right = Int64Array::builder(8_192);
-    for v in 0..8_192 {
-        builder_left.append_value(v);
-        // on the right side, we’ll shift each value by +1 so that not everything joins,
-        // but plenty of matching keys exist (e.g. 0 on left matches 1 on right, etc.)
-        builder_right.append_value(v + 1);
-    }
-    let batch_left =
-        RecordBatch::try_new(schema.clone(), vec![Arc::new(builder_left.finish())])?;
-    let batch_right =
-        RecordBatch::try_new(schema.clone(), vec![Arc::new(builder_right.finish())])?;
-
-    // 2a) Build two InfiniteExecs (left and right)
-    let infinite_left =
-        make_lazy_exec(batch_left.clone(), schema.clone(), pretend_finite);
-    let infinite_right =
-        make_lazy_exec(batch_right.clone(), schema.clone(), pretend_finite);
-
-    // 2b) Create Join keys → join on “value” = “value”
-    let left_keys: Vec<Arc<dyn PhysicalExpr>> =
-        vec![Arc::new(Column::new_with_schema("value", &schema)?)];
-    let right_keys: Vec<Arc<dyn PhysicalExpr>> =
-        vec![Arc::new(Column::new_with_schema("value", &schema)?)];
-
-    // 2c) Wrap each side in CoalesceBatches + Repartition so they are both hashed into 1 partition
-    let coalesced_left = Arc::new(CoalesceBatchesExec::new(infinite_left, 8_192));
-    let coalesced_right = Arc::new(CoalesceBatchesExec::new(infinite_right, 8_192));
-
-    let part_left = Partitioning::Hash(left_keys, 1);
-    let part_right = Partitioning::Hash(right_keys, 1);
-
-    let hashed_left = Arc::new(RepartitionExec::try_new(coalesced_left, part_left)?);
-    let hashed_right = Arc::new(RepartitionExec::try_new(coalesced_right, part_right)?);
-
-    // 2d) Build an Inner HashJoinExec → left.value = right.value
-    let join = Arc::new(HashJoinExec::try_new(
-        hashed_left,
-        hashed_right,
-        vec![(
-            Arc::new(Column::new_with_schema("value", &schema)?),
-            Arc::new(Column::new_with_schema("value", &schema)?),
-        )],
-        None,
-        &JoinType::Inner,
-        None,
-        PartitionMode::CollectLeft,
-        NullEquality::NullEqualsNull,
-    )?);
-
-    // 3) Project only one column (“value” from the left side) because we just want to sum that
-    let input_schema = join.schema();
-
-    let proj_expr = vec![(
-        Arc::new(Column::new_with_schema("value", &input_schema)?) as _,
-        "value".to_string(),
-    )];
-
-    let projection = Arc::new(ProjectionExec::try_new(proj_expr, join)?);
-    let projection_schema = projection.schema();
-
-    let output_fields = vec![Field::new("total", DataType::Int64, true)];
-    let output_schema = Arc::new(Schema::new(output_fields));
-
-    // 4) Global aggregate (Single) over “value”
-    let aggregate_expr = AggregateExprBuilder::new(
-        sum::sum_udaf(),
-        vec![Arc::new(Column::new_with_schema(
-            "value",
-            &projection.schema(),
-        )?)],
-    )
-    .schema(output_schema)
-    .alias("total")
-    .build()?;
-
-    let aggr = Arc::new(AggregateExec::try_new(
-        AggregateMode::Single,
-        PhysicalGroupBy::new(vec![], vec![], vec![]),
-        vec![Arc::new(aggregate_expr)],
-        vec![None],
-        projection,
-        projection_schema,
-    )?);
-
-    // 5) Wrap yields under each infinite leaf
-    let config = ConfigOptions::new();
-    let optimized = InsertYieldExec::new().optimize(aggr, &config)?;
-
-    // 6) Execute + 1 sec timeout
-    let mut stream = physical_plan::execute_stream(optimized, session_ctx.task_ctx())?;
-    const TIMEOUT: u64 = 1;
-    let result = select! {
-        batch_opt = stream.next() => batch_opt,
-        _ = tokio::time::sleep(tokio::time::Duration::from_secs(TIMEOUT)) => {
-            None
-        }
-    };
-    assert!(
-        result.is_none(),
-        "Expected no output for aggregate over infinite + join, but got a batch"
-    );
-    Ok(())
-}
-
-#[rstest]
-#[tokio::test]
-async fn test_filter_reject_all_batches_cancel(
-    #[values(false, true)] pretend_finite: bool,
-) -> Result<(), Box<dyn Error>> {
-    // 1) Create a Session, Schema, and an 8K-row RecordBatch
-    let session_ctx = SessionContext::new();
-    let schema = Arc::new(Schema::new(vec![Field::new(
-        "value",
-        DataType::Int64,
-        false,
-    )]));
-
-    // Build a batch with values 0..8191
-    let mut builder = Int64Array::builder(8_192);
-    for v in 0..8_192 {
-        builder.append_value(v);
-    }
-    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(builder.finish())])?;
-
-    // 2a) Wrap this batch in an InfiniteExec
-    let infinite = make_lazy_exec(batch.clone(), schema.clone(), pretend_finite);
-
-    // 2b) Construct a FilterExec that is always false: “value > 10000” (no rows pass)
-    let false_predicate = Arc::new(BinaryExpr::new(
-        Arc::new(Column::new_with_schema("value", &schema)?),
-        Gt,
-        Arc::new(Literal::new(ScalarValue::Int64(Some(10_000)))),
-    ));
-    let filtered = Arc::new(FilterExec::try_new(false_predicate, infinite)?);
-
-    // 2c) Use CoalesceBatchesExec to guarantee each Filter pull always yields an 8192-row batch
-    let coalesced = Arc::new(CoalesceBatchesExec::new(filtered, 8_192));
-
-    // 3) InsertYieldExec to insert YieldExec—so that the InfiniteExec yields control between batches
-    let config = ConfigOptions::new();
-    let optimized = InsertYieldExec::new().optimize(coalesced, &config)?;
-
-    // 4) Execute with a 1-second timeout. Because Filter discards all 8192 rows each time
-    //    without ever producing output, no batch will arrive within 1 second. And since
-    //    emission type is not Final, we never see an end‐of‐stream marker.
-    let mut stream = physical_plan::execute_stream(optimized, session_ctx.task_ctx())?;
-    const TIMEOUT: u64 = 1;
-    let result = select! {
-        batch_opt = stream.next() => batch_opt,
-        _ = tokio::time::sleep(tokio::time::Duration::from_secs(TIMEOUT)) => {
-            None
-        }
-    };
-    assert!(
-        result.is_none(),
-        "Expected no output for infinite + filter(all-false) + aggregate, but got a batch"
-    );
-    Ok(())
-}
-
-#[rstest]
-#[tokio::test]
-async fn test_infinite_hash_join_without_repartition_and_no_agg(
-    #[values(false, true)] pretend_finite: bool,
-) -> Result<(), Box<dyn Error>> {
-    // 1) Create Session, schema, and an 8K-row RecordBatch for each side
-    let session_ctx = SessionContext::new();
-    let schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
-        "value",
-        DataType::Int64,
-        false,
-    )]));
-
-    let mut builder_left = Int64Array::builder(8_192);
-    let mut builder_right = Int64Array::builder(8_192);
-    for v in 0..8_192 {
-        builder_left.append_value(v);
-        builder_right.append_value(v + 1);
-    }
-    let batch_left =
-        RecordBatch::try_new(schema.clone(), vec![Arc::new(builder_left.finish())])?;
-    let batch_right =
-        RecordBatch::try_new(schema.clone(), vec![Arc::new(builder_right.finish())])?;
-
-    // 2a) Unlike the test with aggregation, keep this as a pure join—
-    //     use InfiniteExec to simulate an infinite stream
-    let infinite_left =
-        make_lazy_exec(batch_left.clone(), schema.clone(), pretend_finite);
-    let infinite_right =
-        make_lazy_exec(batch_right.clone(), schema.clone(), pretend_finite);
-
-    // 2b) To feed a single batch into the Join, we can still use CoalesceBatchesExec,
-    //     but do NOT wrap it in a RepartitionExec
-    let coalesced_left = Arc::new(CoalesceBatchesExec::new(infinite_left, 8_192));
-    let coalesced_right = Arc::new(CoalesceBatchesExec::new(infinite_right, 8_192));
-
-    // 2c) Directly feed `coalesced_left` and `coalesced_right` into HashJoinExec.
-    //     Do not use aggregation or repartition.
-    let join = Arc::new(HashJoinExec::try_new(
-        coalesced_left,
-        coalesced_right,
-        vec![(
-            Arc::new(Column::new_with_schema("value", &schema)?),
-            Arc::new(Column::new_with_schema("value", &schema)?),
-        )],
-        /* filter */ None,
-        &JoinType::Inner,
-        /* output64 */ None,
-        // Using CollectLeft is fine—just avoid RepartitionExec’s partitioned channels.
-        PartitionMode::CollectLeft,
-        /* build_left */ NullEquality::NullEqualsNull,
-    )?);
-
-    // 3) Do not apply InsertYieldExec—since there is no aggregation, InsertYieldExec would
-    //    not insert a 'final' yield wrapper for the Join. If you want to skip InsertYieldExec
-    //    entirely, comment out the next line; however, not calling it is equivalent
-    //    because there is no aggregation so no wrapper is inserted. Here we simply do
-    //    not call InsertYieldExec, ensuring the plan has neither aggregation nor repartition.
-    let config = ConfigOptions::new();
-    let optimized = InsertYieldExec::new().optimize(join, &config)?;
-
-    // 4) Execute with a 1 second timeout
-    let mut stream = physical_plan::execute_stream(optimized, session_ctx.task_ctx())?;
-    const TIMEOUT_SEC: u64 = 1;
-    let result = select! {
-        batch_opt = stream.next() => batch_opt,
-        _ = tokio::time::sleep(tokio::time::Duration::from_secs(TIMEOUT_SEC)) => {
-            None
-        }
-    };
-
-    assert!(
-        result.is_none(),
-        "Expected no output for infinite + filter(all-false) + aggregate, but got a batch"
-    );
-    Ok(())
-}
-
-#[rstest]
-#[tokio::test]
-async fn test_infinite_sort_merge_join_without_repartition_and_no_agg(
-    #[values(false, true)] pretend_finite: bool,
-) -> Result<(), Box<dyn Error>> {
-    // 1) Create Session, schema, and two small RecordBatches that never overlap:
-    //    Left = [-3, -2, -1], Right = [0, 1, 2]
-    let session_ctx = SessionContext::new();
-    let schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
-        "value",
-        DataType::Int64,
-        false,
-    )]));
-
-    let left_array = {
-        let mut b = Int64Array::builder(3);
-        b.append_value(-3);
-        b.append_value(-2);
-        b.append_value(-1);
-        Arc::new(b.finish()) as Arc<dyn Array>
-    };
-    let right_array = {
-        let mut b = Int64Array::builder(3);
-        b.append_value(0);
-        b.append_value(1);
-        b.append_value(2);
-        Arc::new(b.finish()) as Arc<dyn Array>
-    };
-    let batch_left = RecordBatch::try_new(schema.clone(), vec![left_array])?;
-    let batch_right = RecordBatch::try_new(schema.clone(), vec![right_array])?;
-
-    // 2a) Wrap each small batch in an InfiniteExec (pretend_finite toggles finite vs infinite)
-    let infinite_left =
-        make_lazy_exec(batch_left.clone(), schema.clone(), pretend_finite);
-    let infinite_right =
-        make_lazy_exec(batch_right.clone(), schema.clone(), pretend_finite);
-
-    // 2b) Coalesce each InfiniteExec into a single 3-row batch at a time.
-    //     (Do NOT wrap in RepartitionExec.)
-    let coalesced_left = Arc::new(CoalesceBatchesExec::new(infinite_left, 3));
-    let coalesced_right = Arc::new(CoalesceBatchesExec::new(infinite_right, 3));
-
-    // 2c) Build a SortMergeJoinExec on “value”. Since left values < 0 and
-    //     right values ≥ 0, they never match. No aggregation or repartition.
-    //
-    //     We need a Vec<SortOptions> for the join key. Any consistent SortOptions works,
-    //     because data is already in ascending order on “value.”
-    let join = Arc::new(SortMergeJoinExec::try_new(
-        coalesced_left,
-        coalesced_right,
-        vec![(col("value", &schema)?, col("value", &schema)?)],
-        /* filter */ None,
-        JoinType::Inner,
-        vec![SortOptions::new(true, false)], // ascending, nulls last
-        /* null_equality */ NullEquality::NullEqualsNull,
-    )?);
-
-    // 3) Do not apply InsertYieldExec (no aggregation, no repartition → no built-in yields).
-    let config = ConfigOptions::new();
-    let optimized = InsertYieldExec::new().optimize(join, &config)?;
-
-    // 4) Execute with a 1-second timeout. Because both sides are infinite and never match,
-    //    the SortMergeJoin will never produce output within 1s.
-    let mut stream = physical_plan::execute_stream(optimized, session_ctx.task_ctx())?;
-    const TIMEOUT_SEC: u64 = 1;
-    let result = select! {
-        batch_opt = stream.next() => batch_opt,
-        _ = tokio::time::sleep(tokio::time::Duration::from_secs(TIMEOUT_SEC)) => None,
-    };
-
-    assert!(
-        result.is_none(),
-        "Expected no output for infinite SortMergeJoin (no repartition & no aggregation), but got a batch",
-    );
-    Ok(())
-}
diff --git a/datafusion/core/tests/execution/mod.rs b/datafusion/core/tests/execution/mod.rs
index 333a695dca8e..f367a29017a3 100644
--- a/datafusion/core/tests/execution/mod.rs
+++ b/datafusion/core/tests/execution/mod.rs
@@ -15,5 +15,5 @@
 // specific language governing permissions and limitations
 // under the License.
-mod infinite_cancel;
+mod coop;
 mod logical_plan;
diff --git a/datafusion/core/tests/user_defined/insert_operation.rs b/datafusion/core/tests/user_defined/insert_operation.rs
index 2c90abeb8047..c8a4279a4211 100644
--- a/datafusion/core/tests/user_defined/insert_operation.rs
+++ b/datafusion/core/tests/user_defined/insert_operation.rs
@@ -26,6 +26,7 @@ use datafusion::{
 use datafusion_catalog::{Session, TableProvider};
 use datafusion_expr::{dml::InsertOp, Expr, TableType};
 use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
+use datafusion_physical_plan::execution_plan::SchedulingType;
 use datafusion_physical_plan::{
     execution_plan::{Boundedness, EmissionType},
     DisplayAs, ExecutionPlan, PlanProperties,
@@ -132,7 +133,8 @@ impl TestInsertExec {
                 Partitioning::UnknownPartitioning(1),
                 EmissionType::Incremental,
                 Boundedness::Bounded,
-            ),
+            )
+            .with_scheduling_type(SchedulingType::Cooperative),
         }
     }
 }
@@ -179,10 +181,6 @@ impl ExecutionPlan for TestInsertExec {
     ) -> Result<SendableRecordBatchStream> {
         unimplemented!("TestInsertExec is a stub for testing.")
     }
-
-    fn with_cooperative_yields(self: Arc<Self>) -> Option<Arc<dyn ExecutionPlan>> {
-        Some(self)
-    }
 }
 
 fn make_count_schema() -> SchemaRef {
diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs
index 5dca952d0c60..431b6ab0bcf0 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -64,6 +64,8 @@ use datafusion_physical_plan::{
     DisplayAs, DisplayFormatType, ExecutionPlan,
 };
 
+use datafusion_physical_plan::coop::cooperative;
+use datafusion_physical_plan::execution_plan::SchedulingType;
 use log::{debug, warn};
 
 /// The base configurations for a [`DataSourceExec`], the a physical plan for
@@ -487,7 +489,7 @@ impl DataSource for FileScanConfig {
         let opener = source.create_file_opener(object_store, self, partition);
 
         let stream = FileStream::new(self, partition, opener, source.metrics())?;
-        Ok(Box::pin(stream))
+        Ok(Box::pin(cooperative(stream)))
     }
 
     fn as_any(&self) -> &dyn Any {
@@ -556,6 +558,10 @@ impl DataSource for FileScanConfig {
             .with_constraints(constraints)
     }
 
+    fn scheduling_type(&self) -> SchedulingType {
+        SchedulingType::Cooperative
+    }
+
     fn statistics(&self) -> Result<Statistics> {
         Ok(self.projected_stats())
     }
diff --git a/datafusion/datasource/src/memory.rs b/datafusion/datasource/src/memory.rs
index 98c7bd273fdf..f5eb354ea13f 100644
--- a/datafusion/datasource/src/memory.rs
+++ b/datafusion/datasource/src/memory.rs
@@ -45,6 +45,8 @@ use datafusion_physical_plan::{
 };
 
 use async_trait::async_trait;
+use datafusion_physical_plan::coop::cooperative;
+use datafusion_physical_plan::execution_plan::SchedulingType;
 use futures::StreamExt;
 use itertools::Itertools;
 use tokio::sync::RwLock;
@@ -77,14 +79,14 @@ impl DataSource for MemorySourceConfig {
         partition: usize,
         _context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
-        Ok(Box::pin(
+        Ok(Box::pin(cooperative(
             MemoryStream::try_new(
                 self.partitions[partition].clone(),
                 Arc::clone(&self.projected_schema),
                 self.projection.clone(),
             )?
.with_fetch(self.fetch), - )) + ))) } fn as_any(&self) -> &dyn Any { @@ -188,6 +190,10 @@ impl DataSource for MemorySourceConfig { ) } + fn scheduling_type(&self) -> SchedulingType { + SchedulingType::Cooperative + } + fn statistics(&self) -> Result { Ok(common::compute_record_batch_statistics( &self.partitions, diff --git a/datafusion/datasource/src/sink.rs b/datafusion/datasource/src/sink.rs index d89e2ea5fc38..b8c5b42bf767 100644 --- a/datafusion/datasource/src/sink.rs +++ b/datafusion/datasource/src/sink.rs @@ -36,6 +36,7 @@ use datafusion_physical_plan::{ }; use async_trait::async_trait; +use datafusion_physical_plan::execution_plan::{EvaluationType, SchedulingType}; use futures::StreamExt; /// `DataSink` implements writing streams of [`RecordBatch`]es to @@ -141,6 +142,8 @@ impl DataSinkExec { input.pipeline_behavior(), input.boundedness(), ) + .with_scheduling_type(SchedulingType::Cooperative) + .with_evaluation_type(EvaluationType::Eager) } } @@ -246,10 +249,6 @@ impl ExecutionPlan for DataSinkExec { fn metrics(&self) -> Option { self.sink.metrics() } - - fn with_cooperative_yields(self: Arc) -> Option> { - Some(self) - } } /// Create a output record batch with a count diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index a0f49ad7b16c..4dda95b0856b 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -22,7 +22,9 @@ use std::fmt; use std::fmt::{Debug, Formatter}; use std::sync::Arc; -use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; +use datafusion_physical_plan::execution_plan::{ + Boundedness, EmissionType, SchedulingType, +}; use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use datafusion_physical_plan::projection::ProjectionExec; use datafusion_physical_plan::{ @@ -38,7 +40,6 @@ use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::filter_pushdown::{ ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, }; -use datafusion_physical_plan::yield_stream::wrap_yield_stream; /// A source of data, typically a list of files or memory /// @@ -144,6 +145,9 @@ pub trait DataSource: Send + Sync + Debug { fn output_partitioning(&self) -> Partitioning; fn eq_properties(&self) -> EquivalenceProperties; + fn scheduling_type(&self) -> SchedulingType { + SchedulingType::NonCooperative + } fn statistics(&self) -> Result; /// Return a copy of this DataSource with a new fetch limit fn with_fetch(&self, _limit: Option) -> Option>; @@ -186,8 +190,6 @@ pub struct DataSourceExec { data_source: Arc, /// Cached plan properties such as sort order cache: PlanProperties, - /// Indicates whether to enable cooperative yielding mode. 
-    cooperative: bool,
 }

 impl DisplayAs for DataSourceExec {
@@ -259,13 +261,7 @@ impl ExecutionPlan for DataSourceExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
-        self.data_source
-            .open(partition, Arc::clone(&context))
-            .map(|stream| wrap_yield_stream(stream, &context, self.cooperative))
-    }
-
-    fn with_cooperative_yields(self: Arc<Self>) -> Option<Arc<dyn ExecutionPlan>> {
-        self.cooperative.then_some(self)
+        self.data_source.open(partition, Arc::clone(&context))
     }

     fn metrics(&self) -> Option<MetricsSet> {
@@ -298,11 +294,7 @@ impl ExecutionPlan for DataSourceExec {
         let data_source = self.data_source.with_fetch(limit)?;
         let cache = self.cache.clone();

-        Some(Arc::new(Self {
-            data_source,
-            cache,
-            cooperative: self.cooperative,
-        }))
+        Some(Arc::new(Self { data_source, cache }))
     }

     fn fetch(&self) -> Option<usize> {
@@ -354,11 +346,7 @@ impl DataSourceExec {
-    // Default constructor for `DataSourceExec`, setting the `cooperative` flag to `true`.
+    // Default constructor for `DataSourceExec`.
     pub fn new(data_source: Arc<dyn DataSource>) -> Self {
         let cache = Self::compute_properties(Arc::clone(&data_source));
-        Self {
-            data_source,
-            cache,
-            cooperative: true,
-        }
+        Self { data_source, cache }
     }

     /// Return the source object
@@ -384,12 +372,6 @@ impl DataSourceExec {
         self
     }

-    /// Assign yielding mode
-    pub fn with_cooperative(mut self, cooperative: bool) -> Self {
-        self.cooperative = cooperative;
-        self
-    }
-
     fn compute_properties(data_source: Arc<dyn DataSource>) -> PlanProperties {
         PlanProperties::new(
             data_source.eq_properties(),
@@ -397,6 +379,7 @@
             EmissionType::Incremental,
             Boundedness::Bounded,
         )
+        .with_scheduling_type(data_source.scheduling_type())
     }

     /// Downcast the `DataSourceExec`'s `data_source` to a specific file source
diff --git a/datafusion/physical-optimizer/src/ensure_coop.rs b/datafusion/physical-optimizer/src/ensure_coop.rs
new file mode 100644
index 000000000000..0c0b63c0b3e7
--- /dev/null
+++ b/datafusion/physical-optimizer/src/ensure_coop.rs
@@ -0,0 +1,118 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! The [`EnsureCooperative`] optimizer rule inspects the physical plan to find all
+//! portions of the plan that will not yield cooperatively.
+//! It will insert `CooperativeExec` nodes where appropriate to ensure execution plans
+//! always yield cooperatively.
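+//!
+//! For example (mirroring the snapshot test at the bottom of this file), a plan whose
+//! leaf does not participate in cooperative scheduling, displayed as
+//!
+//! ```text
+//! DataSourceExec: partitions=1, partition_sizes=[1]
+//! ```
+//!
+//! is rewritten to
+//!
+//! ```text
+//! CooperativeExec
+//!   DataSourceExec: partitions=1, partition_sizes=[1]
+//! ```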
+
+use std::fmt::{Debug, Formatter};
+use std::sync::Arc;
+
+use crate::PhysicalOptimizerRule;
+
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
+use datafusion_common::Result;
+use datafusion_physical_plan::coop::CooperativeExec;
+use datafusion_physical_plan::execution_plan::{EvaluationType, SchedulingType};
+use datafusion_physical_plan::ExecutionPlan;
+
+/// `EnsureCooperative` is a [`PhysicalOptimizerRule`] that inspects the physical plan for
+/// sub-plans that do not participate in cooperative scheduling. The plan is subdivided into
+/// sub-plans on eager evaluation boundaries. Leaf nodes and eager evaluation roots are checked
+/// to see if they participate in cooperative scheduling. Those that do not are wrapped in
+/// a [`CooperativeExec`] parent.
+pub struct EnsureCooperative {}
+
+impl EnsureCooperative {
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl Default for EnsureCooperative {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl Debug for EnsureCooperative {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct(self.name()).finish()
+    }
+}
+
+impl PhysicalOptimizerRule for EnsureCooperative {
+    fn name(&self) -> &str {
+        "EnsureCooperative"
+    }
+
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        _config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        plan.transform_up(|plan| {
+            let is_leaf = plan.children().is_empty();
+            let is_exchange = plan.properties().evaluation_type == EvaluationType::Eager;
+            if (is_leaf || is_exchange)
+                && plan.properties().scheduling_type != SchedulingType::Cooperative
+            {
+                // Wrap non-cooperative leaves or eager evaluation roots in a cooperative exec to
+                // ensure the plans they participate in are properly cooperative.
+                Ok(Transformed::new(
+                    Arc::new(CooperativeExec::new(Arc::clone(&plan))),
+                    true,
+                    TreeNodeRecursion::Continue,
+                ))
+            } else {
+                Ok(Transformed::no(plan))
+            }
+        })
+        .map(|t| t.data)
+    }
+
+    fn schema_check(&self) -> bool {
+        // Wrapping a node in CooperativeExec preserves the schema, so it is safe.
+        true
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use datafusion_common::config::ConfigOptions;
+    use datafusion_physical_plan::{displayable, test::scan_partitioned};
+    use insta::assert_snapshot;
+
+    #[tokio::test]
+    async fn test_cooperative_exec_for_custom_exec() {
+        let test_custom_exec = scan_partitioned(1);
+        let config = ConfigOptions::new();
+        let optimized = EnsureCooperative::new()
+            .optimize(test_custom_exec, &config)
+            .unwrap();
+
+        let display = displayable(optimized.as_ref()).indent(true).to_string();
+        // Use insta snapshot to ensure full plan structure
+        assert_snapshot!(display, @r###"
+        CooperativeExec
+          DataSourceExec: partitions=1, partition_sizes=[1]
+        "###);
+    }
+}
diff --git a/datafusion/physical-optimizer/src/insert_yield_exec.rs b/datafusion/physical-optimizer/src/insert_yield_exec.rs
deleted file mode 100644
index 8ce893866d22..000000000000
--- a/datafusion/physical-optimizer/src/insert_yield_exec.rs
+++ /dev/null
@@ -1,122 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! The [`InsertYieldExec`] optimizer rule inspects the physical plan to find all leaf -//! nodes corresponding to tight-looping operators. It first attempts to replace -//! each leaf with a cooperative-yielding variant via `with_cooperative_yields`, -//! and only if no built-in variant exists does it wrap the node in a -//! [`YieldStreamExec`] operator to enforce periodic yielding, ensuring the plan -//! remains cancellation-friendly. - -use std::fmt::{Debug, Formatter}; -use std::sync::Arc; - -use crate::PhysicalOptimizerRule; - -use datafusion_common::config::ConfigOptions; -use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion}; -use datafusion_common::Result; -use datafusion_physical_plan::yield_stream::YieldStreamExec; -use datafusion_physical_plan::ExecutionPlan; - -/// `InsertYieldExec` is a [`PhysicalOptimizerRule`] that finds every leaf node in -/// the plan and replaces it with a variant that yields cooperatively if supported. -/// If the node does not provide a built-in yielding variant via -/// [`ExecutionPlan::with_cooperative_yields`], it is wrapped in a [`YieldStreamExec`] parent to -/// enforce a configured yield frequency. -pub struct InsertYieldExec {} - -impl InsertYieldExec { - pub fn new() -> Self { - Self {} - } -} - -impl Default for InsertYieldExec { - fn default() -> Self { - Self::new() - } -} - -impl Debug for InsertYieldExec { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("InsertYieldExec").finish() - } -} - -impl PhysicalOptimizerRule for InsertYieldExec { - fn name(&self) -> &str { - "insert_yield_exec" - } - - fn optimize( - &self, - plan: Arc, - config: &ConfigOptions, - ) -> Result> { - // Only activate if user has configured a non-zero yield frequency. - let yield_period = config.optimizer.yield_period; - if yield_period != 0 { - plan.transform_down(|plan| { - if !plan.children().is_empty() { - // Not a leaf, keep recursing down. - return Ok(Transformed::no(plan)); - } - // For leaf nodes, try to get a built-in cooperative-yielding variant. - let new_plan = Arc::clone(&plan) - .with_cooperative_yields() - .unwrap_or_else(|| { - // Only if no built-in variant exists, insert a `YieldStreamExec`. - Arc::new(YieldStreamExec::new(plan, yield_period)) - }); - Ok(Transformed::new(new_plan, true, TreeNodeRecursion::Jump)) - }) - .map(|t| t.data) - } else { - Ok(plan) - } - } - - fn schema_check(&self) -> bool { - // Wrapping a leaf in YieldStreamExec preserves the schema, so it is safe. 
- true - } -} - -#[cfg(test)] -mod tests { - use super::*; - use datafusion_common::config::ConfigOptions; - use datafusion_physical_plan::{displayable, test::scan_partitioned}; - use insta::assert_snapshot; - - #[tokio::test] - async fn test_yield_stream_exec_for_custom_exec() { - let test_custom_exec = scan_partitioned(1); - let config = ConfigOptions::new(); - let optimized = InsertYieldExec::new() - .optimize(test_custom_exec, &config) - .unwrap(); - - let display = displayable(optimized.as_ref()).indent(true).to_string(); - // Use insta snapshot to ensure full plan structure - assert_snapshot!(display, @r###" - YieldStreamExec frequency=64 - DataSourceExec: partitions=1, partition_sizes=[1] - "###); - } -} diff --git a/datafusion/physical-optimizer/src/lib.rs b/datafusion/physical-optimizer/src/lib.rs index f7b5bd584351..f8d7b3b74614 100644 --- a/datafusion/physical-optimizer/src/lib.rs +++ b/datafusion/physical-optimizer/src/lib.rs @@ -29,8 +29,8 @@ pub mod coalesce_batches; pub mod combine_partial_final_agg; pub mod enforce_distribution; pub mod enforce_sorting; +pub mod ensure_coop; pub mod filter_pushdown; -pub mod insert_yield_exec; pub mod join_selection; pub mod limit_pushdown; pub mod limited_distinct_aggregation; diff --git a/datafusion/physical-optimizer/src/optimizer.rs b/datafusion/physical-optimizer/src/optimizer.rs index aed81606919e..38ec92b7d116 100644 --- a/datafusion/physical-optimizer/src/optimizer.rs +++ b/datafusion/physical-optimizer/src/optimizer.rs @@ -25,8 +25,8 @@ use crate::coalesce_batches::CoalesceBatches; use crate::combine_partial_final_agg::CombinePartialFinalAggregate; use crate::enforce_distribution::EnforceDistribution; use crate::enforce_sorting::EnforceSorting; +use crate::ensure_coop::EnsureCooperative; use crate::filter_pushdown::FilterPushdown; -use crate::insert_yield_exec::InsertYieldExec; use crate::join_selection::JoinSelection; use crate::limit_pushdown::LimitPushdown; use crate::limited_distinct_aggregation::LimitedDistinctAggregation; @@ -140,7 +140,7 @@ impl PhysicalOptimizer { // are not present, the load of executors such as join or union will be // reduced by narrowing their input tables. Arc::new(ProjectionPushdown::new()), - Arc::new(InsertYieldExec::new()), + Arc::new(EnsureCooperative::new()), // This FilterPushdown handles dynamic filters that may have references to the source ExecutionPlan. // Therefore it should be run at the end of the optimization process since any changes to the plan may break the dynamic filter's references. // See `FilterPushdownPhase` for more details. 
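// Illustrative sketch (not part of this diff): `EnsureCooperative` is an ordinary
// `PhysicalOptimizerRule`, so a custom optimizer pipeline can also apply it directly.
// `apply_ensure_coop` is a hypothetical helper; `plan` is an assumed input plan.
use datafusion_common::config::ConfigOptions;
use datafusion_physical_optimizer::ensure_coop::EnsureCooperative;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::ExecutionPlan;
use std::sync::Arc;

fn apply_ensure_coop(
    plan: Arc<dyn ExecutionPlan>,
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
    // Unlike the removed InsertYieldExec rule, no `yield_period` configuration is
    // consulted; wrapping is driven purely by the plan's scheduling properties.
    EnsureCooperative::new().optimize(plan, &ConfigOptions::new())
}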
diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index 0fc68ae49775..095ee78cd0d6 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -36,6 +36,8 @@ workspace = true [features] force_hash_collisions = [] +tokio_coop = [] +tokio_coop_fallback = [] [lib] name = "datafusion_physical_plan" diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs index 114f830688c9..976ff70502b7 100644 --- a/datafusion/physical-plan/src/coalesce_partitions.rs +++ b/datafusion/physical-plan/src/coalesce_partitions.rs @@ -27,7 +27,7 @@ use super::{ DisplayAs, ExecutionPlanProperties, PlanProperties, SendableRecordBatchStream, Statistics, }; -use crate::execution_plan::CardinalityEffect; +use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType}; use crate::projection::{make_with_child, ProjectionExec}; use crate::{DisplayFormatType, ExecutionPlan, Partitioning}; @@ -72,6 +72,16 @@ impl CoalescePartitionsExec { /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. fn compute_properties(input: &Arc) -> PlanProperties { + let input_partitions = input.output_partitioning().partition_count(); + let (drive, scheduling) = if input_partitions > 1 { + (EvaluationType::Eager, SchedulingType::Cooperative) + } else { + ( + input.properties().evaluation_type, + input.properties().scheduling_type, + ) + }; + // Coalescing partitions loses existing orderings: let mut eq_properties = input.equivalence_properties().clone(); eq_properties.clear_orderings(); @@ -82,6 +92,8 @@ impl CoalescePartitionsExec { input.pipeline_behavior(), input.boundedness(), ) + .with_evaluation_type(drive) + .with_scheduling_type(scheduling) } } diff --git a/datafusion/physical-plan/src/coop.rs b/datafusion/physical-plan/src/coop.rs new file mode 100644 index 000000000000..d55c7b8c97af --- /dev/null +++ b/datafusion/physical-plan/src/coop.rs @@ -0,0 +1,370 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Utilities for improved cooperative scheduling. +//! +//! # Cooperative scheduling +//! +//! A single call to `poll_next` on a top-level `Stream` may potentially perform a lot of work +//! before it returns a `Poll::Pending`. Think for instance of calculating an aggregation over a +//! large dataset. +//! If a `Stream` runs for a long period of time without yielding back to the Tokio executor, +//! it can starve other tasks waiting on that executor to execute them. +//! Additionally, this prevents the query execution from being cancelled. +//! +//! To ensure that `Stream` implementations yield regularly, operators can insert explicit yield +//! 
points using the utilities in this module. For most operators this is **not** necessary. The
+//! `Stream`s of the built-in DataFusion operators that generate (rather than manipulate)
+//! `RecordBatch`es, such as `DataSourceExec`, and those that eagerly consume `RecordBatch`es
+//! (for instance, `RepartitionExec`) contain yield points that will make most query `Stream`s yield
+//! periodically.
+//!
+//! There are a couple of types of operators that _should_ insert yield points:
+//! - New source operators that do not make use of Tokio resources
+//! - Exchange-like operators that do not use Tokio's `Channel` implementation to pass data between
+//!   tasks
+//!
+//! ## Adding yield points
+//!
+//! Yield points can be inserted manually using the facilities provided by the
+//! [Tokio coop module](https://docs.rs/tokio/latest/tokio/task/coop/index.html) such as
+//! [`tokio::task::coop::consume_budget`](https://docs.rs/tokio/latest/tokio/task/coop/fn.consume_budget.html).
+//!
+//! Another option is to use the wrapper `Stream` implementation provided by this module which will
+//! consume a unit of task budget every time a `RecordBatch` is produced.
+//! Wrapper `Stream`s can be created using the [`cooperative`] and [`make_cooperative`] functions.
+//!
+//! [`cooperative`] is a generic function that takes ownership of the wrapped [`RecordBatchStream`].
+//! This function has the benefit of not requiring an additional heap allocation and can avoid
+//! dynamic dispatch.
+//!
+//! [`make_cooperative`] is a non-generic function that wraps a [`SendableRecordBatchStream`]. This
+//! can be used to wrap dynamically typed, heap allocated [`RecordBatchStream`]s.
+//!
+//! ## Automatic cooperation
+//!
+//! The `EnsureCooperative` physical optimizer rule, which is included in the default set of
+//! optimizer rules, inspects query plans for potential cooperative scheduling issues.
+//! It injects the [`CooperativeExec`] wrapper `ExecutionPlan` into the query plan where necessary.
+//! This `ExecutionPlan` uses [`make_cooperative`] to wrap the `Stream` of its input.
+//!
+//! The optimizer rule currently checks the plan for exchange-like operators and leaf operators
+//! that report [`SchedulingType::NonCooperative`] in their [plan properties](ExecutionPlan::properties).

+#[cfg(any(
+    datafusion_coop = "tokio_fallback",
+    not(any(datafusion_coop = "tokio", datafusion_coop = "per_stream"))
+))]
+use futures::Future;
+use std::any::Any;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use crate::execution_plan::CardinalityEffect::{self, Equal};
+use crate::{
+    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, RecordBatchStream,
+    SendableRecordBatchStream,
+};
+use arrow::record_batch::RecordBatch;
+use arrow_schema::Schema;
+use datafusion_common::{internal_err, Result, Statistics};
+use datafusion_execution::TaskContext;
+
+use crate::execution_plan::SchedulingType;
+use crate::stream::RecordBatchStreamAdapter;
+use futures::{Stream, StreamExt};
+
+/// A stream that passes record batches through unchanged while cooperating with the Tokio runtime.
+/// It consumes cooperative scheduling budget for each returned [`RecordBatch`],
+/// allowing other tasks to execute when the budget is exhausted.
+///
+/// See the [module level documentation](crate::coop) for an in-depth discussion.
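+///
+/// A minimal usage sketch (illustrative; `my_stream` stands for any type implementing
+/// `RecordBatchStream + Unpin` and is not part of this change):
+///
+/// ```ignore
+/// use datafusion_physical_plan::coop::cooperative;
+///
+/// // Each `RecordBatch` produced by the wrapper consumes one unit of Tokio task budget.
+/// let stream = cooperative(my_stream);
+/// ```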
+pub struct CooperativeStream +where + T: RecordBatchStream + Unpin, +{ + inner: T, + #[cfg(datafusion_coop = "per_stream")] + budget: u8, +} + +#[cfg(datafusion_coop = "per_stream")] +// Magic value that matches Tokio's task budget value +const YIELD_FREQUENCY: u8 = 128; + +impl CooperativeStream +where + T: RecordBatchStream + Unpin, +{ + /// Creates a new `CooperativeStream` that wraps the provided stream. + /// The resulting stream will cooperate with the Tokio scheduler by consuming a unit of + /// scheduling budget when the wrapped `Stream` returns a record batch. + pub fn new(inner: T) -> Self { + Self { + inner, + #[cfg(datafusion_coop = "per_stream")] + budget: YIELD_FREQUENCY, + } + } +} + +impl Stream for CooperativeStream +where + T: RecordBatchStream + Unpin, +{ + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + #[cfg(datafusion_coop = "tokio")] + { + // TODO this should be the default implementation + // Enable once https://github.com/tokio-rs/tokio/issues/7403 is merged and released + let coop = std::task::ready!(tokio::task::coop::poll_proceed(cx)); + let value = self.inner.poll_next_unpin(cx); + if value.is_ready() { + coop.made_progress(); + } + value + } + + #[cfg(any( + datafusion_coop = "tokio_fallback", + not(any(datafusion_coop = "tokio", datafusion_coop = "per_stream")) + ))] + { + // This is a temporary placeholder implementation that may have slightly + // worse performance compared to `poll_proceed` + if !tokio::task::coop::has_budget_remaining() { + cx.waker().wake_by_ref(); + return Poll::Pending; + } + + let value = self.inner.poll_next_unpin(cx); + if value.is_ready() { + // In contrast to `poll_proceed` we are not able to consume + // budget before proceeding to do work. Instead, we try to consume budget + // after the work has been done and just assume that that succeeded. + // The poll result is ignored because we don't want to discard + // or buffer the Ready result we got from the inner stream. + let consume = tokio::task::coop::consume_budget(); + let consume_ref = std::pin::pin!(consume); + let _ = consume_ref.poll(cx); + } + value + } + + #[cfg(datafusion_coop = "per_stream")] + { + if self.budget == 0 { + self.budget = YIELD_FREQUENCY; + cx.waker().wake_by_ref(); + return Poll::Pending; + } + + let value = { self.inner.poll_next_unpin(cx) }; + + if value.is_ready() { + self.budget -= 1; + } else { + self.budget = YIELD_FREQUENCY; + } + value + } + } +} + +impl RecordBatchStream for CooperativeStream +where + T: RecordBatchStream + Unpin, +{ + fn schema(&self) -> Arc { + self.inner.schema() + } +} + +/// An execution plan decorator that enables cooperative multitasking. +/// It wraps the streams produced by its input execution plan using the [`make_cooperative`] function, +/// which makes the stream participate in Tokio cooperative scheduling. +#[derive(Debug)] +pub struct CooperativeExec { + input: Arc, + properties: PlanProperties, +} + +impl CooperativeExec { + /// Creates a new `CooperativeExec` operator that wraps the given input execution plan. + pub fn new(input: Arc) -> Self { + let properties = input + .properties() + .clone() + .with_scheduling_type(SchedulingType::Cooperative); + + Self { input, properties } + } + + /// Returns a reference to the wrapped input execution plan. 
+    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
+        &self.input
+    }
+}
+
+impl DisplayAs for CooperativeExec {
+    fn fmt_as(
+        &self,
+        _t: DisplayFormatType,
+        f: &mut std::fmt::Formatter<'_>,
+    ) -> std::fmt::Result {
+        write!(f, "CooperativeExec")
+    }
+}
+
+impl ExecutionPlan for CooperativeExec {
+    fn name(&self) -> &str {
+        "CooperativeExec"
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> Arc<Schema> {
+        self.input.schema()
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        &self.properties
+    }
+
+    fn maintains_input_order(&self) -> Vec<bool> {
+        self.input.maintains_input_order()
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.input]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        mut children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if children.len() != 1 {
+            return internal_err!("CooperativeExec requires exactly one child");
+        }
+        Ok(Arc::new(CooperativeExec::new(children.swap_remove(0))))
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        task_ctx: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        let child_stream = self.input.execute(partition, task_ctx)?;
+        Ok(make_cooperative(child_stream))
+    }
+
+    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
+        self.input.partition_statistics(partition)
+    }
+
+    fn supports_limit_pushdown(&self) -> bool {
+        true
+    }
+
+    fn cardinality_effect(&self) -> CardinalityEffect {
+        Equal
+    }
+}
+
+/// Creates a [`CooperativeStream`] wrapper around the given [`RecordBatchStream`].
+/// This wrapper collaborates with the Tokio cooperative scheduler by consuming a unit of
+/// scheduling budget for each returned record batch.
+pub fn cooperative<T>(stream: T) -> CooperativeStream<T>
+where
+    T: RecordBatchStream + Unpin + Send + 'static,
+{
+    CooperativeStream::new(stream)
+}
+
+/// Wraps a `SendableRecordBatchStream` inside a [`CooperativeStream`] to enable cooperative multitasking.
+/// Since `SendableRecordBatchStream` is a `dyn RecordBatchStream` this requires the use of dynamic
+/// method dispatch.
+/// When the stream type is statically known, consider using the generic [`cooperative`] function
+/// to allow static method dispatch.
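+///
+/// For example (illustrative), inside an `ExecutionPlan::execute` implementation:
+///
+/// ```ignore
+/// // Wrap the dynamically typed child stream so it participates in Tokio's
+/// // cooperative scheduling, exactly as `CooperativeExec::execute` does above.
+/// let stream = make_cooperative(child.execute(partition, task_ctx)?);
+/// ```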
+pub fn make_cooperative(stream: SendableRecordBatchStream) -> SendableRecordBatchStream { + // TODO is there a more elegant way to overload cooperative + Box::pin(cooperative(RecordBatchStreamAdapter::new( + stream.schema(), + stream, + ))) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::stream::RecordBatchStreamAdapter; + + use arrow_schema::SchemaRef; + + use futures::{stream, StreamExt}; + + // This is the hardcoded value Tokio uses + const TASK_BUDGET: usize = 128; + + /// Helper: construct a SendableRecordBatchStream containing `n` empty batches + fn make_empty_batches(n: usize) -> SendableRecordBatchStream { + let schema: SchemaRef = Arc::new(Schema::empty()); + let schema_for_stream = Arc::clone(&schema); + + let s = + stream::iter((0..n).map(move |_| { + Ok(RecordBatch::new_empty(Arc::clone(&schema_for_stream))) + })); + + Box::pin(RecordBatchStreamAdapter::new(schema, s)) + } + + #[tokio::test] + async fn yield_less_than_threshold() -> Result<()> { + let count = TASK_BUDGET - 10; + let inner = make_empty_batches(count); + let out = make_cooperative(inner).collect::>().await; + assert_eq!(out.len(), count); + Ok(()) + } + + #[tokio::test] + async fn yield_equal_to_threshold() -> Result<()> { + let count = TASK_BUDGET; + let inner = make_empty_batches(count); + let out = make_cooperative(inner).collect::>().await; + assert_eq!(out.len(), count); + Ok(()) + } + + #[tokio::test] + async fn yield_more_than_threshold() -> Result<()> { + let count = TASK_BUDGET + 20; + let inner = make_empty_batches(count); + let out = make_cooperative(inner).collect::>().await; + assert_eq!(out.len(), count); + Ok(()) + } +} diff --git a/datafusion/physical-plan/src/empty.rs b/datafusion/physical-plan/src/empty.rs index 68f9eb22e330..40b4ec61dc10 100644 --- a/datafusion/physical-plan/src/empty.rs +++ b/datafusion/physical-plan/src/empty.rs @@ -33,6 +33,7 @@ use datafusion_common::{internal_err, Result}; use datafusion_execution::TaskContext; use datafusion_physical_expr::EquivalenceProperties; +use crate::execution_plan::SchedulingType; use log::trace; /// Execution plan for empty relation with produce_one_row=false @@ -81,6 +82,7 @@ impl EmptyExec { EmissionType::Incremental, Boundedness::Bounded, ) + .with_scheduling_type(SchedulingType::Cooperative) } } @@ -173,10 +175,6 @@ impl ExecutionPlan for EmptyExec { None, )) } - - fn with_cooperative_yields(self: Arc) -> Option> { - Some(self) - } } #[cfg(test)] diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 75f1c90820e1..2605e26c3c7f 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -42,8 +42,6 @@ use crate::coalesce_partitions::CoalescePartitionsExec; use crate::display::DisplayableExecutionPlan; use crate::metrics::MetricsSet; use crate::projection::ProjectionExec; -use crate::repartition::RepartitionExec; -use crate::sorts::sort_preserving_merge::SortPreservingMergeExec; use crate::stream::RecordBatchStreamAdapter; use arrow::array::{Array, RecordBatch}; @@ -271,11 +269,13 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// batch is superlinear. See this [general guideline][async-guideline] for more context /// on this point, which explains why one should avoid spending a long time without /// reaching an `await`/yield point in asynchronous runtimes. 
- /// This can be achieved by manually returning [`Poll::Pending`] and setting up wakers - /// appropriately, or the use of [`tokio::task::yield_now()`] when appropriate. + /// This can be achieved by using the utilities from the [`coop`](crate::coop) module, by + /// manually returning [`Poll::Pending`] and setting up wakers appropriately, or by calling + /// [`tokio::task::yield_now()`] when appropriate. /// In special cases that warrant manual yielding, determination for "regularly" may be - /// made using a timer (being careful with the overhead-heavy system call needed to - /// take the time), or by counting rows or batches. + /// made using the [Tokio task budget](https://docs.rs/tokio/latest/tokio/task/coop/index.html), + /// a timer (being careful with the overhead-heavy system call needed to take the time), or by + /// counting rows or batches. /// /// The [cancellation benchmark] tracks some cases of how quickly queries can /// be cancelled. @@ -570,16 +570,6 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { child_pushdown_result, )) } - - /// Returns a version of this plan that cooperates with the runtime via - /// built‐in yielding. If such a version doesn't exist, returns `None`. - /// You do not need to do provide such a version of a custom operator, - /// but DataFusion will utilize it while optimizing the plan if it exists. - fn with_cooperative_yields(self: Arc) -> Option> { - // Conservative default implementation assumes that a leaf does not - // cooperate with yielding. - None - } } /// [`ExecutionPlan`] Invariant Level @@ -754,6 +744,49 @@ pub enum EmissionType { Both, } +/// Represents whether an operator's `Stream` has been implemented to actively cooperate with the +/// Tokio scheduler or not. Please refer to the [`coop`](crate::coop) module for more details. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum SchedulingType { + /// The stream generated by [`execute`](ExecutionPlan::execute) does not actively participate in + /// cooperative scheduling. This means the implementation of the `Stream` returned by + /// [`ExecutionPlan::execute`] does not contain explicit task budget consumption such as + /// [`tokio::task::coop::consume_budget`]. + /// + /// `NonCooperative` is the default value and is acceptable for most operators. Please refer to + /// the [`coop`](crate::coop) module for details on when it may be useful to use + /// `Cooperative` instead. + NonCooperative, + /// The stream generated by [`execute`](ExecutionPlan::execute) actively participates in + /// cooperative scheduling by consuming task budget when it was able to produce a + /// [`RecordBatch`]. + Cooperative, +} + +/// Represents how an operator's `Stream` implementation generates `RecordBatch`es. +/// +/// Most operators in DataFusion generate `RecordBatch`es when asked to do so by a call to +/// `Stream::poll_next`. This is known as demand-driven or lazy evaluation. +/// +/// Some operators like `Repartition` need to drive `RecordBatch` generation themselves though. This +/// is known as data-driven or eager evaluation. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum EvaluationType { + /// The stream generated by [`execute`](ExecutionPlan::execute) only generates `RecordBatch` + /// instances when it is demanded by invoking `Stream::poll_next`. + /// Filter, projection, and join are examples of such lazy operators. + /// + /// Lazy operators are also known as demand-driven operators. 
+    Lazy,
+    /// The stream generated by [`execute`](ExecutionPlan::execute) eagerly generates `RecordBatch`
+    /// instances in one or more spawned Tokio tasks. Eager evaluation is only started the first time
+    /// `Stream::poll_next` is called.
+    /// Examples of eager operators are repartition, coalesce partitions, and sort preserving merge.
+    ///
+    /// Eager operators are also known as data-driven operators.
+    Eager,
+}
+
 /// Utility to determine an operator's boundedness based on its children's boundedness.
 ///
 /// Assumes boundedness can be inferred from child operators:
@@ -842,6 +875,8 @@ pub struct PlanProperties {
     pub emission_type: EmissionType,
     /// See [ExecutionPlanProperties::boundedness]
     pub boundedness: Boundedness,
+    pub evaluation_type: EvaluationType,
+    pub scheduling_type: SchedulingType,
     /// See [ExecutionPlanProperties::output_ordering]
     output_ordering: Option<LexOrdering>,
 }
@@ -861,6 +896,8 @@ impl PlanProperties {
             partitioning,
             emission_type,
             boundedness,
+            evaluation_type: EvaluationType::Lazy,
+            scheduling_type: SchedulingType::NonCooperative,
             output_ordering,
         }
     }
@@ -892,6 +929,22 @@
         self
     }

+    /// Set the [`SchedulingType`].
+    ///
+    /// Defaults to [`SchedulingType::NonCooperative`]
+    pub fn with_scheduling_type(mut self, scheduling_type: SchedulingType) -> Self {
+        self.scheduling_type = scheduling_type;
+        self
+    }
+
+    /// Set the [`EvaluationType`].
+    ///
+    /// Defaults to [`EvaluationType::Lazy`]
+    pub fn with_evaluation_type(mut self, drive_type: EvaluationType) -> Self {
+        self.evaluation_type = drive_type;
+        self
+    }
+
     /// Overwrite constraints with its new value.
     pub fn with_constraints(mut self, constraints: Constraints) -> Self {
         self.eq_properties = self.eq_properties.with_constraints(constraints);
@@ -918,30 +971,12 @@ impl PlanProperties {

 /// Indicate whether a data exchange is needed for the input of `plan`, which will be very helpful
 /// especially for the distributed engine to judge whether need to deal with shuffling.
-/// Currently there are 3 kinds of execution plan which needs data exchange
+/// Currently, there are 3 kinds of execution plans that need data exchange:
 /// 1. RepartitionExec for changing the partition number between two `ExecutionPlan`s
 /// 2. CoalescePartitionsExec for collapsing all of the partitions into one without ordering guarantee
 /// 3. SortPreservingMergeExec for collapsing all of the sorted partitions into one with ordering guarantee
 pub fn need_data_exchange(plan: Arc<dyn ExecutionPlan>) -> bool {
-    if let Some(repartition) = plan.as_any().downcast_ref::<RepartitionExec>() {
-        !matches!(
-            repartition.properties().output_partitioning(),
-            Partitioning::RoundRobinBatch(_)
-        )
-    } else if let Some(coalesce) = plan.as_any().downcast_ref::<CoalescePartitionsExec>()
-    {
-        coalesce.input().output_partitioning().partition_count() > 1
-    } else if let Some(sort_preserving_merge) =
-        plan.as_any().downcast_ref::<SortPreservingMergeExec>()
-    {
-        sort_preserving_merge
-            .input()
-            .output_partitioning()
-            .partition_count()
-            > 1
-    } else {
-        false
-    }
+    plan.properties().evaluation_type == EvaluationType::Eager
 }

 /// Returns a copy of this plan if we change any child according to the pointer comparison.
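// Illustrative sketch (not part of this diff): with the new `PlanProperties` fields,
// a custom exchange-like operator advertises eager, cooperative behavior declaratively
// instead of being special-cased by downcasting in `need_data_exchange`.
// `exchange_like_properties` is a hypothetical helper; `schema` and `partition_count`
// are assumed inputs.
fn exchange_like_properties(
    schema: arrow_schema::SchemaRef,
    partition_count: usize,
) -> datafusion_physical_plan::PlanProperties {
    use datafusion_physical_plan::execution_plan::{
        Boundedness, EmissionType, EvaluationType, SchedulingType,
    };

    datafusion_physical_plan::PlanProperties::new(
        datafusion_physical_expr::EquivalenceProperties::new(schema),
        datafusion_physical_expr::Partitioning::RoundRobinBatch(partition_count),
        EmissionType::Incremental,
        Boundedness::Bounded,
    )
    // Batches are produced by spawned tasks (data-driven evaluation)...
    .with_evaluation_type(EvaluationType::Eager)
    // ...and the operator's streams consume Tokio task budget as they run.
    .with_scheduling_type(SchedulingType::Cooperative)
}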
diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index 9e703ced1fc2..22ae859e8c5b 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -59,9 +59,11 @@ mod visitor; pub mod aggregates; pub mod analyze; +pub mod coalesce; pub mod coalesce_batches; pub mod coalesce_partitions; pub mod common; +pub mod coop; pub mod display; pub mod empty; pub mod execution_plan; @@ -91,6 +93,4 @@ pub mod udaf { pub use datafusion_physical_expr::aggregate::AggregateFunctionExpr; } -pub mod coalesce; pub mod test; -pub mod yield_stream; diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs index 91af03bf46df..3e5ea32a4cab 100644 --- a/datafusion/physical-plan/src/memory.rs +++ b/datafusion/physical-plan/src/memory.rs @@ -22,9 +22,9 @@ use std::fmt; use std::sync::Arc; use std::task::{Context, Poll}; -use crate::execution_plan::{Boundedness, EmissionType}; +use crate::coop::cooperative; +use crate::execution_plan::{Boundedness, EmissionType, SchedulingType}; use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; -use crate::yield_stream::wrap_yield_stream; use crate::{ DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, @@ -37,6 +37,7 @@ use datafusion_execution::memory_pool::MemoryReservation; use datafusion_execution::TaskContext; use datafusion_physical_expr::EquivalenceProperties; +use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use futures::Stream; use parking_lot::RwLock; @@ -133,6 +134,10 @@ impl RecordBatchStream for MemoryStream { } pub trait LazyBatchGenerator: Send + Sync + fmt::Debug + fmt::Display { + fn boundedness(&self) -> Boundedness { + Boundedness::Bounded + } + /// Generate the next batch, return `None` when no more batches are available fn generate_next_batch(&mut self) -> Result>; } @@ -148,8 +153,6 @@ pub struct LazyMemoryExec { batch_generators: Vec>>, /// Plan properties cache storing equivalence properties, partitioning, and execution mode cache: PlanProperties, - /// Indicates whether to enable cooperative yielding mode (defaults to `true`). - cooperative: bool, /// Execution metrics metrics: ExecutionPlanMetricsSet, } @@ -160,30 +163,61 @@ impl LazyMemoryExec { schema: SchemaRef, generators: Vec>>, ) -> Result { + let boundedness = generators + .iter() + .map(|g| g.read().boundedness()) + .reduce(|acc, b| match acc { + Boundedness::Bounded => b, + Boundedness::Unbounded { + requires_infinite_memory, + } => { + let acc_infinite_memory = requires_infinite_memory; + match b { + Boundedness::Bounded => acc, + Boundedness::Unbounded { + requires_infinite_memory, + } => Boundedness::Unbounded { + requires_infinite_memory: requires_infinite_memory + || acc_infinite_memory, + }, + } + } + }) + .unwrap_or(Boundedness::Bounded); + let cache = PlanProperties::new( EquivalenceProperties::new(Arc::clone(&schema)), Partitioning::RoundRobinBatch(generators.len()), EmissionType::Incremental, - Boundedness::Bounded, - ); + boundedness, + ) + .with_scheduling_type(SchedulingType::Cooperative); + Ok(Self { schema, batch_generators: generators, cache, - cooperative: true, // Cooperative yielding mode defaults to true metrics: ExecutionPlanMetricsSet::new(), }) } - /// Set the Yielding mode for the execution plan - /// It defaults to `true`, meaning it will yield back to the runtime for cooperative scheduling. 
- pub fn with_cooperative_yielding(mut self, cooperative: bool) -> Self { - self.cooperative = cooperative; - self + pub fn try_set_partitioning(&mut self, partitioning: Partitioning) -> Result<()> { + if partitioning.partition_count() != self.batch_generators.len() { + internal_err!( + "Partition count must match generator count: {} != {}", + partitioning.partition_count(), + self.batch_generators.len() + ) + } else { + self.cache.partitioning = partitioning; + Ok(()) + } } - pub fn set_boundedness(&mut self, boundedness: Boundedness) { - self.cache.boundedness = boundedness; + pub fn add_ordering(&mut self, ordering: impl IntoIterator) { + self.cache + .eq_properties + .add_orderings(std::iter::once(ordering)); } } @@ -263,7 +297,7 @@ impl ExecutionPlan for LazyMemoryExec { fn execute( &self, partition: usize, - context: Arc, + _context: Arc, ) -> Result { if partition >= self.batch_generators.len() { return internal_err!( @@ -275,16 +309,12 @@ impl ExecutionPlan for LazyMemoryExec { let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); - let stream = Box::pin(LazyMemoryStream { + let stream = LazyMemoryStream { schema: Arc::clone(&self.schema), generator: Arc::clone(&self.batch_generators[partition]), baseline_metrics, - }); - Ok(wrap_yield_stream(stream, &context, self.cooperative)) - } - - fn with_cooperative_yields(self: Arc) -> Option> { - self.cooperative.then_some(self) + }; + Ok(Box::pin(cooperative(stream))) } fn metrics(&self) -> Option { diff --git a/datafusion/physical-plan/src/placeholder_row.rs b/datafusion/physical-plan/src/placeholder_row.rs index a5c80438e774..6cd581700a88 100644 --- a/datafusion/physical-plan/src/placeholder_row.rs +++ b/datafusion/physical-plan/src/placeholder_row.rs @@ -20,9 +20,9 @@ use std::any::Any; use std::sync::Arc; -use crate::execution_plan::{Boundedness, EmissionType}; +use crate::coop::cooperative; +use crate::execution_plan::{Boundedness, EmissionType, SchedulingType}; use crate::memory::MemoryStream; -use crate::yield_stream::wrap_yield_stream; use crate::{ common, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, SendableRecordBatchStream, Statistics, @@ -44,8 +44,6 @@ pub struct PlaceholderRowExec { /// Number of partitions partitions: usize, cache: PlanProperties, - /// Indicates whether to enable cooperative yielding mode. 
- cooperative: bool, } impl PlaceholderRowExec { @@ -57,7 +55,6 @@ impl PlaceholderRowExec { schema, partitions, cache, - cooperative: true, } } @@ -105,6 +102,7 @@ impl PlaceholderRowExec { EmissionType::Incremental, Boundedness::Bounded, ) + .with_scheduling_type(SchedulingType::Cooperative) } } @@ -164,8 +162,8 @@ impl ExecutionPlan for PlaceholderRowExec { ); } - MemoryStream::try_new(self.data()?, Arc::clone(&self.schema), None) - .map(|ms| wrap_yield_stream(Box::pin(ms), &context, self.cooperative)) + let ms = MemoryStream::try_new(self.data()?, Arc::clone(&self.schema), None)?; + Ok(Box::pin(cooperative(ms))) } fn statistics(&self) -> Result { @@ -185,10 +183,6 @@ impl ExecutionPlan for PlaceholderRowExec { None, )) } - - fn with_cooperative_yields(self: Arc) -> Option> { - self.cooperative.then_some(self) - } } #[cfg(test)] diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index d872a84d7285..620bfa2809a9 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -30,7 +30,7 @@ use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}; use super::{ DisplayAs, ExecutionPlanProperties, RecordBatchStream, SendableRecordBatchStream, }; -use crate::execution_plan::CardinalityEffect; +use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType}; use crate::hash_utils::create_hashes; use crate::metrics::BaselineMetrics; use crate::projection::{all_columns, make_with_child, update_expr, ProjectionExec}; @@ -886,6 +886,8 @@ impl RepartitionExec { input.pipeline_behavior(), input.boundedness(), ) + .with_scheduling_type(SchedulingType::Cooperative) + .with_evaluation_type(EvaluationType::Eager) } /// Specify if this repartitioning operation should preserve the order of diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 2944ac230f38..09ad71974e6c 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -35,6 +35,7 @@ use datafusion_execution::memory_pool::MemoryConsumer; use datafusion_execution::TaskContext; use datafusion_physical_expr_common::sort_expr::{LexOrdering, OrderingRequirements}; +use crate::execution_plan::{EvaluationType, SchedulingType}; use log::{debug, trace}; /// Sort preserving merge execution plan @@ -157,6 +158,16 @@ impl SortPreservingMergeExec { input: &Arc, ordering: LexOrdering, ) -> PlanProperties { + let input_partitions = input.output_partitioning().partition_count(); + let (drive, scheduling) = if input_partitions > 1 { + (EvaluationType::Eager, SchedulingType::Cooperative) + } else { + ( + input.properties().evaluation_type, + input.properties().scheduling_type, + ) + }; + let mut eq_properties = input.equivalence_properties().clone(); eq_properties.clear_per_partition_constants(); eq_properties.add_ordering(ordering); @@ -166,6 +177,8 @@ impl SortPreservingMergeExec { input.pipeline_behavior(), // Pipeline Behavior input.boundedness(), // Boundedness ) + .with_evaluation_type(drive) + .with_scheduling_type(scheduling) } } diff --git a/datafusion/physical-plan/src/spill/spill_manager.rs b/datafusion/physical-plan/src/spill/spill_manager.rs index 89fd12d39836..4f3afc5d1220 100644 --- a/datafusion/physical-plan/src/spill/spill_manager.rs +++ b/datafusion/physical-plan/src/spill/spill_manager.rs @@ -27,9 +27,9 @@ use 
datafusion_common::{config::SpillCompression, Result}; use datafusion_execution::disk_manager::RefCountedTempFile; use datafusion_execution::SendableRecordBatchStream; -use crate::{common::spawn_buffered, metrics::SpillMetrics}; - use super::{in_progress_spill_file::InProgressSpillFile, SpillReaderStream}; +use crate::coop::cooperative; +use crate::{common::spawn_buffered, metrics::SpillMetrics}; /// The `SpillManager` is responsible for the following tasks: /// - Reading and writing `RecordBatch`es to raw files based on the provided configurations. @@ -132,10 +132,10 @@ impl SpillManager { &self, spill_file_path: RefCountedTempFile, ) -> Result { - let stream = Box::pin(SpillReaderStream::new( + let stream = Box::pin(cooperative(SpillReaderStream::new( Arc::clone(&self.schema), spill_file_path, - )); + ))); Ok(spawn_buffered(stream, self.batch_read_buffer_capacity)) } diff --git a/datafusion/physical-plan/src/streaming.rs b/datafusion/physical-plan/src/streaming.rs index a3f593a06d72..d4e6ba4c96c7 100644 --- a/datafusion/physical-plan/src/streaming.rs +++ b/datafusion/physical-plan/src/streaming.rs @@ -22,15 +22,15 @@ use std::fmt::Debug; use std::sync::Arc; use super::{DisplayAs, DisplayFormatType, PlanProperties}; +use crate::coop::make_cooperative; use crate::display::{display_orderings, ProjectSchemaDisplay}; -use crate::execution_plan::{Boundedness, EmissionType}; +use crate::execution_plan::{Boundedness, EmissionType, SchedulingType}; use crate::limit::LimitStream; use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use crate::projection::{ all_alias_free_columns, new_projections_for_columns, update_ordering, ProjectionExec, }; use crate::stream::RecordBatchStreamAdapter; -use crate::yield_stream::wrap_yield_stream; use crate::{ExecutionPlan, Partitioning, SendableRecordBatchStream}; use arrow::datatypes::{Schema, SchemaRef}; @@ -69,8 +69,6 @@ pub struct StreamingTableExec { limit: Option, cache: PlanProperties, metrics: ExecutionPlanMetricsSet, - /// Indicates whether to enable cooperative yielding mode. 
- cooperative: bool, } impl StreamingTableExec { @@ -115,7 +113,6 @@ impl StreamingTableExec { limit, cache, metrics: ExecutionPlanMetricsSet::new(), - cooperative: true, }) } @@ -172,6 +169,7 @@ impl StreamingTableExec { EmissionType::Incremental, boundedness, ) + .with_scheduling_type(SchedulingType::Cooperative) } } @@ -276,7 +274,7 @@ impl ExecutionPlan for StreamingTableExec { )), None => stream, }; - let stream = wrap_yield_stream(projected_stream, &ctx, self.cooperative); + let stream = make_cooperative(projected_stream); Ok(match self.limit { None => stream, @@ -339,13 +337,8 @@ impl ExecutionPlan for StreamingTableExec { limit, cache: self.cache.clone(), metrics: self.metrics.clone(), - cooperative: self.cooperative, })) } - - fn with_cooperative_yields(self: Arc) -> Option> { - self.cooperative.then_some(self) - } } #[cfg(test)] diff --git a/datafusion/physical-plan/src/work_table.rs b/datafusion/physical-plan/src/work_table.rs index a5f094ffaf04..e6179cd75ffb 100644 --- a/datafusion/physical-plan/src/work_table.rs +++ b/datafusion/physical-plan/src/work_table.rs @@ -20,10 +20,10 @@ use std::any::Any; use std::sync::{Arc, Mutex}; -use crate::execution_plan::{Boundedness, EmissionType}; +use crate::coop::cooperative; +use crate::execution_plan::{Boundedness, EmissionType, SchedulingType}; use crate::memory::MemoryStream; use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet}; -use crate::yield_stream::wrap_yield_stream; use crate::{ DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream, Statistics, @@ -107,8 +107,6 @@ pub struct WorkTableExec { metrics: ExecutionPlanMetricsSet, /// Cache holding plan properties like equivalences, output partitioning etc. cache: PlanProperties, - /// Indicates whether to enable cooperative yielding mode. - cooperative: bool, } impl WorkTableExec { @@ -121,7 +119,6 @@ impl WorkTableExec { metrics: ExecutionPlanMetricsSet::new(), work_table: Arc::new(WorkTable::new()), cache, - cooperative: true, } } @@ -142,7 +139,6 @@ impl WorkTableExec { metrics: ExecutionPlanMetricsSet::new(), work_table, cache: self.cache.clone(), - cooperative: self.cooperative, } } @@ -154,6 +150,7 @@ impl WorkTableExec { EmissionType::Incremental, Boundedness::Bounded, ) + .with_scheduling_type(SchedulingType::Cooperative) } } @@ -210,7 +207,7 @@ impl ExecutionPlan for WorkTableExec { fn execute( &self, partition: usize, - context: Arc, + _context: Arc, ) -> Result { // WorkTable streams must be the plan base. if partition != 0 { @@ -220,12 +217,10 @@ impl ExecutionPlan for WorkTableExec { } let batch = self.work_table.take()?; - let stream = Box::pin( + let stream = MemoryStream::try_new(batch.batches, Arc::clone(&self.schema), None)? 
- .with_reservation(batch.reservation), - ); - // Cooperatively yield if asked to do so: - Ok(wrap_yield_stream(stream, &context, self.cooperative)) + .with_reservation(batch.reservation); + Ok(Box::pin(cooperative(stream))) } fn metrics(&self) -> Option { @@ -239,10 +234,6 @@ impl ExecutionPlan for WorkTableExec { fn partition_statistics(&self, _partition: Option) -> Result { Ok(Statistics::new_unknown(&self.schema())) } - - fn with_cooperative_yields(self: Arc) -> Option> { - self.cooperative.then_some(self) - } } #[cfg(test)] diff --git a/datafusion/physical-plan/src/yield_stream.rs b/datafusion/physical-plan/src/yield_stream.rs deleted file mode 100644 index 0069b4b64d38..000000000000 --- a/datafusion/physical-plan/src/yield_stream.rs +++ /dev/null @@ -1,276 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::any::Any; -use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; - -use crate::execution_plan::CardinalityEffect::{self, Equal}; -use crate::{ - DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, RecordBatchStream, - SendableRecordBatchStream, -}; -use arrow::record_batch::RecordBatch; -use arrow_schema::Schema; -use datafusion_common::{internal_err, Result, Statistics}; -use datafusion_execution::TaskContext; - -use futures::{Stream, StreamExt}; - -/// An identity stream that passes batches through as is, but yields control -/// back to the runtime every `period` batches. This stream is useful to -/// construct a mechanism that allows operators that do not directly cooperate -/// with the runtime to check/support cancellation. -pub struct YieldStream { - inner: SendableRecordBatchStream, - batches_processed: usize, - period: usize, -} - -impl YieldStream { - pub fn new(inner: SendableRecordBatchStream, mut period: usize) -> Self { - if period == 0 { - period = usize::MAX; - } - Self { - inner, - batches_processed: 0, - period, - } - } -} - -impl Stream for YieldStream { - type Item = Result; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - if self.batches_processed >= self.period { - self.batches_processed = 0; - cx.waker().wake_by_ref(); - return Poll::Pending; - } - - let value = self.inner.poll_next_unpin(cx); - match value { - Poll::Ready(Some(Ok(_))) => { - self.batches_processed += 1; - } - Poll::Pending => { - self.batches_processed = 0; - } - _ => {} - } - value - } -} - -impl RecordBatchStream for YieldStream { - fn schema(&self) -> Arc { - self.inner.schema() - } -} - -/// This operator wraps any other execution plan and to "adapt" it to cooperate -/// with the runtime by yielding control back to the runtime every `frequency` -/// batches. 
This is useful for operators that do not natively support yielding -/// control, allowing them to be used in a runtime that requires yielding for -/// cancellation or other purposes. -/// -/// # Note -/// If your ExecutionPlan periodically yields control back to the scheduler -/// implement [`ExecutionPlan::with_cooperative_yields`] to avoid the need for this -/// node. -#[derive(Debug)] -pub struct YieldStreamExec { - /// The child execution plan that this operator "wraps" to make it - /// cooperate with the runtime. - child: Arc, - /// The frequency at which the operator yields control back to the runtime. - frequency: usize, -} - -impl YieldStreamExec { - /// Create a new `YieldStreamExec` operator that wraps the given child - /// execution plan and yields control back to the runtime every `frequency` - /// batches. - pub fn new(child: Arc, frequency: usize) -> Self { - Self { frequency, child } - } - - /// Returns the child execution plan this operator "wraps" to make it - /// cooperate with the runtime. - pub fn input(&self) -> &Arc { - &self.child - } - - /// Returns the period at which the operator yields control back to the - /// runtime. - pub fn yield_period(&self) -> usize { - self.frequency - } -} - -impl DisplayAs for YieldStreamExec { - fn fmt_as( - &self, - _t: DisplayFormatType, - f: &mut std::fmt::Formatter<'_>, - ) -> std::fmt::Result { - write!(f, "YieldStreamExec frequency={}", self.frequency) - } -} - -impl ExecutionPlan for YieldStreamExec { - fn name(&self) -> &str { - "YieldStreamExec" - } - - fn as_any(&self) -> &dyn Any { - self - } - - fn schema(&self) -> Arc { - self.child.schema() - } - - fn properties(&self) -> &PlanProperties { - self.child.properties() - } - - fn children(&self) -> Vec<&Arc> { - vec![&self.child] - } - - fn with_new_children( - self: Arc, - mut children: Vec>, - ) -> Result> { - if children.len() != 1 { - return internal_err!("YieldStreamExec requires exactly one child"); - } - Ok(Arc::new(YieldStreamExec::new( - children.swap_remove(0), - self.frequency, - ))) - } - - fn execute( - &self, - partition: usize, - task_ctx: Arc, - ) -> Result { - let child_stream = self.child.execute(partition, task_ctx)?; - let yield_stream = YieldStream::new(child_stream, self.frequency); - Ok(Box::pin(yield_stream)) - } - - fn partition_statistics(&self, partition: Option) -> Result { - self.child.partition_statistics(partition) - } - - fn maintains_input_order(&self) -> Vec { - self.child.maintains_input_order() - } - - fn supports_limit_pushdown(&self) -> bool { - true - } - - fn cardinality_effect(&self) -> CardinalityEffect { - Equal - } -} - -/// Wraps `stream` inside a `YieldStream` depending on the `cooperative` flag. -/// Yielding period is extracted from `context`. 
-pub fn wrap_yield_stream(
-    mut stream: SendableRecordBatchStream,
-    context: &TaskContext,
-    cooperative: bool,
-) -> SendableRecordBatchStream {
-    if cooperative {
-        let period = context.session_config().options().optimizer.yield_period;
-        if period > 0 {
-            stream = Box::pin(YieldStream::new(stream, period));
-        }
-    }
-    stream
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::stream::RecordBatchStreamAdapter;
-
-    use arrow_schema::SchemaRef;
-
-    use futures::{stream, StreamExt};
-
-    // Frequency testing:
-    // Number of batches to yield before yielding control back to the executor
-    const YIELD_BATCHES: usize = 64;
-
-    /// Helper: construct a SendableRecordBatchStream containing `n` empty batches
-    fn make_empty_batches(n: usize) -> SendableRecordBatchStream {
-        let schema: SchemaRef = Arc::new(Schema::empty());
-        let schema_for_stream = Arc::clone(&schema);
-
-        let s =
-            stream::iter((0..n).map(move |_| {
-                Ok(RecordBatch::new_empty(Arc::clone(&schema_for_stream)))
-            }));
-
-        Box::pin(RecordBatchStreamAdapter::new(schema, s))
-    }
-
-    #[tokio::test]
-    async fn yield_less_than_threshold() -> Result<()> {
-        let count = YIELD_BATCHES - 10;
-        let inner = make_empty_batches(count);
-        let out = YieldStream::new(inner, YIELD_BATCHES)
-            .collect::<Vec<_>>()
-            .await;
-        assert_eq!(out.len(), count);
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn yield_equal_to_threshold() -> Result<()> {
-        let count = YIELD_BATCHES;
-        let inner = make_empty_batches(count);
-        let out = YieldStream::new(inner, YIELD_BATCHES)
-            .collect::<Vec<_>>()
-            .await;
-        assert_eq!(out.len(), count);
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn yield_more_than_threshold() -> Result<()> {
-        let count = YIELD_BATCHES + 20;
-        let inner = make_empty_batches(count);
-        let out = YieldStream::new(inner, YIELD_BATCHES)
-            .collect::<Vec<_>>()
-            .await;
-        assert_eq!(out.len(), count);
-        Ok(())
-    }
-}
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index 1e1f91e07e29..64789f5de0d2 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -726,7 +726,7 @@ message PhysicalPlanNode {
     ParquetSinkExecNode parquet_sink = 29;
     UnnestExecNode unnest = 30;
     JsonScanExecNode json_scan = 31;
-    YieldStreamExecNode yield_stream = 32;
+    CooperativeExecNode cooperative = 32;
   }
 }
@@ -1034,9 +1034,8 @@ message AvroScanExecNode {
   FileScanExecConf base_conf = 1;
 }
 
-message YieldStreamExecNode {
+message CooperativeExecNode {
   PhysicalPlanNode input = 1;
-  uint32 frequency = 2;
 }
 
 enum PartitionMode {
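With `YieldStream` gone, the batch-counting unit tests above have no direct equivalent; cancellation is instead verified behaviorally, by racing a stream against a deadline. A sketch of that style of check, with an invented query and timeout (it assumes the `generate_series` table function is registered by default):

```rust
use std::time::Duration;

use datafusion::prelude::SessionContext;
use datafusion_common::Result;
use futures::StreamExt;

/// Returns Ok(()) if the stream can be abandoned within the deadline; a
/// non-cooperative tight loop would starve the timer instead.
async fn cancels_promptly() -> Result<()> {
    let ctx = SessionContext::new();
    // Hypothetical long-running query; any CPU-bound plan works here.
    let df = ctx
        .sql("SELECT sum(value) FROM generate_series(1, 100000000)")
        .await?;
    let mut stream = df.execute_stream().await?;
    tokio::select! {
        // Fires only if the plan polls cooperatively and hits an await point.
        _ = tokio::time::sleep(Duration::from_secs(1)) => Ok(()),
        _ = stream.next() => Ok(()),
    }
}
```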
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 02a1cc70eeb9..92309ea6a5cb 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -2574,6 +2574,97 @@ impl<'de> serde::Deserialize<'de> for ColumnUnnestListRecursions {
         deserializer.deserialize_struct("datafusion.ColumnUnnestListRecursions", FIELDS, GeneratedVisitor)
     }
 }
+impl serde::Serialize for CooperativeExecNode {
+    #[allow(deprecated)]
+    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        use serde::ser::SerializeStruct;
+        let mut len = 0;
+        if self.input.is_some() {
+            len += 1;
+        }
+        let mut struct_ser = serializer.serialize_struct("datafusion.CooperativeExecNode", len)?;
+        if let Some(v) = self.input.as_ref() {
+            struct_ser.serialize_field("input", v)?;
+        }
+        struct_ser.end()
+    }
+}
+impl<'de> serde::Deserialize<'de> for CooperativeExecNode {
+    #[allow(deprecated)]
+    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        const FIELDS: &[&str] = &[
+            "input",
+        ];
+
+        #[allow(clippy::enum_variant_names)]
+        enum GeneratedField {
+            Input,
+        }
+        impl<'de> serde::Deserialize<'de> for GeneratedField {
+            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
+            where
+                D: serde::Deserializer<'de>,
+            {
+                struct GeneratedVisitor;
+
+                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+                    type Value = GeneratedField;
+
+                    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+                        write!(formatter, "expected one of: {:?}", &FIELDS)
+                    }
+
+                    #[allow(unused_variables)]
+                    fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
+                    where
+                        E: serde::de::Error,
+                    {
+                        match value {
+                            "input" => Ok(GeneratedField::Input),
+                            _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
+                        }
+                    }
+                }
+                deserializer.deserialize_identifier(GeneratedVisitor)
+            }
+        }
+        struct GeneratedVisitor;
+        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+            type Value = CooperativeExecNode;
+
+            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+                formatter.write_str("struct datafusion.CooperativeExecNode")
+            }
+
+            fn visit_map<V>(self, mut map_: V) -> std::result::Result<CooperativeExecNode, V::Error>
+                where
+                    V: serde::de::MapAccess<'de>,
+            {
+                let mut input__ = None;
+                while let Some(k) = map_.next_key()? {
+                    match k {
+                        GeneratedField::Input => {
+                            if input__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("input"));
+                            }
+                            input__ = map_.next_value()?;
+                        }
+                    }
+                }
+                Ok(CooperativeExecNode {
+                    input: input__,
+                })
+            }
+        }
+        deserializer.deserialize_struct("datafusion.CooperativeExecNode", FIELDS, GeneratedVisitor)
+    }
+}
 impl serde::Serialize for CopyToNode {
     #[allow(deprecated)]
     fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
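The generated serde impls above now describe a message with a single optional field. A quick JSON round-trip, not part of the patch, illustrates the resulting shape (assuming the generated types are exported under `datafusion_proto::protobuf`):

```rust
use datafusion_proto::protobuf::CooperativeExecNode;

fn json_shape() -> serde_json::Result<()> {
    // With no child set, every field is skipped, so the node serializes to `{}`.
    let node = CooperativeExecNode { input: None };
    let text = serde_json::to_string(&node)?;
    assert_eq!(text, "{}");

    // Round-trip back through the generated Deserialize impl above.
    let back: CooperativeExecNode = serde_json::from_str(&text)?;
    assert!(back.input.is_none());
    Ok(())
}
```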
@@ -15800,8 +15891,8 @@ impl serde::Serialize for PhysicalPlanNode {
             physical_plan_node::PhysicalPlanType::JsonScan(v) => {
                 struct_ser.serialize_field("jsonScan", v)?;
             }
-            physical_plan_node::PhysicalPlanType::YieldStream(v) => {
-                struct_ser.serialize_field("yieldStream", v)?;
+            physical_plan_node::PhysicalPlanType::Cooperative(v) => {
+                struct_ser.serialize_field("cooperative", v)?;
             }
         }
     }
@@ -15861,8 +15952,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode {
             "unnest",
             "json_scan",
             "jsonScan",
-            "yield_stream",
-            "yieldStream",
+            "cooperative",
         ];
 
         #[allow(clippy::enum_variant_names)]
@@ -15897,7 +15987,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode {
             ParquetSink,
             Unnest,
             JsonScan,
-            YieldStream,
+            Cooperative,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
             fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
@@ -15949,7 +16039,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode {
                             "parquetSink" | "parquet_sink" => Ok(GeneratedField::ParquetSink),
                             "unnest" => Ok(GeneratedField::Unnest),
                             "jsonScan" | "json_scan" => Ok(GeneratedField::JsonScan),
-                            "yieldStream" | "yield_stream" => Ok(GeneratedField::YieldStream),
+                            "cooperative" => Ok(GeneratedField::Cooperative),
                             _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
                         }
                     }
@@ -16182,11 +16272,11 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode {
                             physical_plan_type__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_plan_node::PhysicalPlanType::JsonScan)
;
                         }
-                        GeneratedField::YieldStream => {
+                        GeneratedField::Cooperative => {
                             if physical_plan_type__.is_some() {
-                                return Err(serde::de::Error::duplicate_field("yieldStream"));
+                                return Err(serde::de::Error::duplicate_field("cooperative"));
                             }
-                            physical_plan_type__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_plan_node::PhysicalPlanType::YieldStream)
+                            physical_plan_type__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_plan_node::PhysicalPlanType::Cooperative)
;
                         }
                     }
@@ -22685,113 +22775,3 @@ impl<'de> serde::Deserialize<'de> for WindowNode {
         deserializer.deserialize_struct("datafusion.WindowNode", FIELDS, GeneratedVisitor)
     }
 }
-impl serde::Serialize for YieldStreamExecNode {
-    #[allow(deprecated)]
-    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
-    where
-        S: serde::Serializer,
-    {
-        use serde::ser::SerializeStruct;
-        let mut len = 0;
-        if self.input.is_some() {
-            len += 1;
-        }
-        if self.frequency != 0 {
-            len += 1;
-        }
-        let mut struct_ser = serializer.serialize_struct("datafusion.YieldStreamExecNode", len)?;
-        if let Some(v) = self.input.as_ref() {
-            struct_ser.serialize_field("input", v)?;
-        }
-        if self.frequency != 0 {
-            struct_ser.serialize_field("frequency", &self.frequency)?;
-        }
-        struct_ser.end()
-    }
-}
-impl<'de> serde::Deserialize<'de> for YieldStreamExecNode {
-    #[allow(deprecated)]
-    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
-    where
-        D: serde::Deserializer<'de>,
-    {
-        const FIELDS: &[&str] = &[
-            "input",
-            "frequency",
-        ];
-
-        #[allow(clippy::enum_variant_names)]
-        enum GeneratedField {
-            Input,
-            Frequency,
-        }
-        impl<'de> serde::Deserialize<'de> for GeneratedField {
-            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
-            where
-                D: serde::Deserializer<'de>,
-            {
-                struct GeneratedVisitor;
-
-                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
-                    type Value = GeneratedField;
-
-                    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-                        write!(formatter, "expected one of: {:?}", &FIELDS)
-                    }
-
-                    #[allow(unused_variables)]
-                    fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
-                    where
-                        E: serde::de::Error,
-                    {
-                        match value {
-                            "input" => Ok(GeneratedField::Input),
-                            "frequency" => Ok(GeneratedField::Frequency),
-                            _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
-                        }
-                    }
-                }
-                deserializer.deserialize_identifier(GeneratedVisitor)
-            }
-        }
-        struct GeneratedVisitor;
-        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
-            type Value = YieldStreamExecNode;
-
-            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-                formatter.write_str("struct datafusion.YieldStreamExecNode")
-            }
-
-            fn visit_map<V>(self, mut map_: V) -> std::result::Result<YieldStreamExecNode, V::Error>
-                where
-                    V: serde::de::MapAccess<'de>,
-            {
-                let mut input__ = None;
-                let mut frequency__ = None;
-                while let Some(k) = map_.next_key()? {
-                    match k {
-                        GeneratedField::Input => {
-                            if input__.is_some() {
-                                return Err(serde::de::Error::duplicate_field("input"));
-                            }
-                            input__ = map_.next_value()?;
-                        }
-                        GeneratedField::Frequency => {
-                            if frequency__.is_some() {
-                                return Err(serde::de::Error::duplicate_field("frequency"));
-                            }
-                            frequency__ =
-                                Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
-                            ;
-                        }
-                    }
-                }
-                Ok(YieldStreamExecNode {
-                    input: input__,
-                    frequency: frequency__.unwrap_or_default(),
-                })
-            }
-        }
-        deserializer.deserialize_struct("datafusion.YieldStreamExecNode", FIELDS, GeneratedVisitor)
-    }
-}
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index c1f8fa61f3b3..b0fc0ce60436 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1119,7 +1119,7 @@ pub mod physical_plan_node {
         #[prost(message, tag = "31")]
         JsonScan(super::JsonScanExecNode),
         #[prost(message, tag = "32")]
-        YieldStream(::prost::alloc::boxed::Box<super::YieldStreamExecNode>),
+        Cooperative(::prost::alloc::boxed::Box<super::CooperativeExecNode>),
     }
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
@@ -1574,11 +1574,9 @@ pub struct AvroScanExecNode {
     pub base_conf: ::core::option::Option<FileScanExecConf>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
-pub struct YieldStreamExecNode {
+pub struct CooperativeExecNode {
     #[prost(message, optional, boxed, tag = "1")]
     pub input: ::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
-    #[prost(uint32, tag = "2")]
-    pub frequency: u32,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct HashJoinExecNode {
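On the binary side, dropping `frequency` (field 2) is wire-compatible in the usual proto3 sense: prost skips unknown fields when decoding. A hedged sketch with a hand-encoded legacy payload:

```rust
use datafusion_proto::protobuf::CooperativeExecNode;
use prost::Message;

fn wire_compat() -> Result<(), prost::DecodeError> {
    // proto3 omits unset fields, so an empty node encodes to zero bytes.
    let node = CooperativeExecNode { input: None };
    assert!(node.encode_to_vec().is_empty());

    // A payload written by the old YieldStreamExecNode with `frequency = 64`
    // (tag 0x10 = field 2, varint wire type): prost silently skips the
    // now-unknown field when decoding into the new message.
    let legacy = [0x10u8, 0x40];
    let decoded = CooperativeExecNode::decode(&legacy[..])?;
    assert!(decoded.input.is_none());
    Ok(())
}
```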
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index 3d541f54fe10..242b36786d07 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -64,6 +64,7 @@ use datafusion::physical_plan::aggregates::{AggregateExec, PhysicalGroupBy};
 use datafusion::physical_plan::analyze::AnalyzeExec;
 use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use datafusion::physical_plan::coop::CooperativeExec;
 use datafusion::physical_plan::empty::EmptyExec;
 use datafusion::physical_plan::explain::ExplainExec;
 use datafusion::physical_plan::expressions::PhysicalSortExpr;
@@ -82,7 +83,6 @@ use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMerge
 use datafusion::physical_plan::union::{InterleaveExec, UnionExec};
 use datafusion::physical_plan::unnest::{ListUnnest, UnnestExec};
 use datafusion::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
-use datafusion::physical_plan::yield_stream::YieldStreamExec;
 use datafusion::physical_plan::{
     ExecutionPlan, InputOrderMode, PhysicalExpr, WindowExpr,
 };
@@ -324,9 +324,9 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
                 runtime,
                 extension_codec,
             ),
-            PhysicalPlanType::YieldStream(yield_stream) => self
-                .try_into_yield_stream_physical_plan(
-                    yield_stream,
+            PhysicalPlanType::Cooperative(cooperative) => self
+                .try_into_cooperative_physical_plan(
+                    cooperative,
                     registry,
                     runtime,
                     extension_codec,
@@ -520,8 +520,8 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
             );
         }
 
-        if let Some(exec) = plan.downcast_ref::<YieldStreamExec>() {
-            return protobuf::PhysicalPlanNode::try_from_yield_stream_exec(
+        if let Some(exec) = plan.downcast_ref::<CooperativeExec>() {
+            return protobuf::PhysicalPlanNode::try_from_cooperative_exec(
                 exec,
                 extension_codec,
             );
@@ -1794,19 +1794,16 @@ impl protobuf::PhysicalPlanNode {
         )))
     }
 
-    fn try_into_yield_stream_physical_plan(
+    fn try_into_cooperative_physical_plan(
         &self,
-        field_stream: &protobuf::YieldStreamExecNode,
+        field_stream: &protobuf::CooperativeExecNode,
         registry: &dyn FunctionRegistry,
         runtime: &RuntimeEnv,
         extension_codec: &dyn PhysicalExtensionCodec,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let input =
             into_physical_plan(&field_stream.input, registry, runtime, extension_codec)?;
-        Ok(Arc::new(YieldStreamExec::new(
-            input,
-            field_stream.frequency as _,
-        )))
+        Ok(Arc::new(CooperativeExec::new(input)))
     }
 
     fn try_from_explain_exec(
@@ -2791,8 +2788,8 @@ impl protobuf::PhysicalPlanNode {
         })
     }
 
-    fn try_from_yield_stream_exec(
-        exec: &YieldStreamExec,
+    fn try_from_cooperative_exec(
+        exec: &CooperativeExec,
         extension_codec: &dyn PhysicalExtensionCodec,
     ) -> Result<Self> {
         let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
             exec.input().to_owned(),
             extension_codec,
         )?;
 
         Ok(protobuf::PhysicalPlanNode {
-            physical_plan_type: Some(PhysicalPlanType::YieldStream(Box::new(
-                protobuf::YieldStreamExecNode {
+            physical_plan_type: Some(PhysicalPlanType::Cooperative(Box::new(
+                protobuf::CooperativeExecNode {
                     input: Some(Box::new(input)),
-                    frequency: exec.yield_period() as _,
                 },
             ))),
         })
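The renamed conversion functions can be exercised end to end with the existing `datafusion-proto` byte helpers (these predate this patch; the `EmptyExec` child is an arbitrary stand-in):

```rust
use std::sync::Arc;

use arrow_schema::Schema;
use datafusion::physical_plan::coop::CooperativeExec;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use datafusion_common::Result;
use datafusion_proto::bytes::{physical_plan_from_bytes, physical_plan_to_bytes};

fn roundtrip_cooperative() -> Result<()> {
    let ctx = SessionContext::new();
    let child = Arc::new(EmptyExec::new(Arc::new(Schema::empty())));
    let plan: Arc<dyn ExecutionPlan> = Arc::new(CooperativeExec::new(child));

    // The wire format now carries only the child; there is no `frequency`
    // to lose or default, so decode(encode(plan)) is structurally identical.
    let bytes = physical_plan_to_bytes(Arc::clone(&plan))?;
    let decoded = physical_plan_from_bytes(&bytes, &ctx)?;
    assert_eq!(decoded.name(), "CooperativeExec");
    Ok(())
}
```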
diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt
index e65225084548..ae2ef67c041c 100644
--- a/datafusion/sqllogictest/test_files/explain.slt
+++ b/datafusion/sqllogictest/test_files/explain.slt
@@ -242,7 +242,7 @@ physical_plan after OutputRequirements DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], file_type=csv, has_header=true
 physical_plan after LimitAggregation SAME TEXT AS ABOVE
 physical_plan after LimitPushdown SAME TEXT AS ABOVE
 physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
-physical_plan after insert_yield_exec SAME TEXT AS ABOVE
+physical_plan after EnsureCooperative SAME TEXT AS ABOVE
 physical_plan after FilterPushdown(Post) SAME TEXT AS ABOVE
 physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
 physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], file_type=csv, has_header=true
@@ -321,7 +321,7 @@ physical_plan after OutputRequirements
 physical_plan after LimitAggregation SAME TEXT AS ABOVE
 physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
 physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
-physical_plan after insert_yield_exec SAME TEXT AS ABOVE
+physical_plan after EnsureCooperative SAME TEXT AS ABOVE
 physical_plan after FilterPushdown(Post) SAME TEXT AS ABOVE
 physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
 physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
@@ -364,7 +364,7 @@ physical_plan after OutputRequirements
 physical_plan after LimitAggregation SAME TEXT AS ABOVE
 physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet
 physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
-physical_plan after insert_yield_exec SAME TEXT AS ABOVE
+physical_plan after EnsureCooperative SAME TEXT AS ABOVE
 physical_plan after FilterPushdown(Post) SAME TEXT AS ABOVE
 physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
 physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet
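The EXPLAIN traces above show the pass renamed from `insert_yield_exec` to `EnsureCooperative`. The rule can also be applied directly; a sketch, assuming it keeps the conventional parameterless `new()` constructor (the `EmptyExec` input is arbitrary):

```rust
use std::sync::Arc;

use arrow_schema::Schema;
use datafusion::physical_plan::{displayable, empty::EmptyExec};
use datafusion_common::config::ConfigOptions;
use datafusion_common::Result;
use datafusion_physical_optimizer::ensure_coop::EnsureCooperative;
use datafusion_physical_optimizer::PhysicalOptimizerRule;

fn wrap_leaves() -> Result<()> {
    let plan = Arc::new(EmptyExec::new(Arc::new(Schema::empty())));
    // Unlike the old insert_yield_exec pass there is no period to configure;
    // the rule only decides where a cooperative wrapper is needed.
    let optimized = EnsureCooperative::new().optimize(plan, &ConfigOptions::new())?;
    println!("{}", displayable(optimized.as_ref()).indent(false));
    Ok(())
}
```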
diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt
index bc9336dcc689..f76e436e0ad3 100644
--- a/datafusion/sqllogictest/test_files/information_schema.slt
+++ b/datafusion/sqllogictest/test_files/information_schema.slt
@@ -305,7 +305,6 @@ datafusion.optimizer.repartition_sorts true
 datafusion.optimizer.repartition_windows true
 datafusion.optimizer.skip_failed_rules false
 datafusion.optimizer.top_down_join_key_reordering true
-datafusion.optimizer.yield_period 64
 datafusion.sql_parser.collect_spans false
 datafusion.sql_parser.dialect generic
 datafusion.sql_parser.enable_ident_normalization true
@@ -418,7 +417,6 @@ datafusion.optimizer.repartition_sorts true Should DataFusion execute sorts in a
 datafusion.optimizer.repartition_windows true Should DataFusion repartition data using the partitions keys to execute window functions in parallel using the provided `target_partitions` level
 datafusion.optimizer.skip_failed_rules false When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail
 datafusion.optimizer.top_down_join_key_reordering true When set to true, the physical plan optimizer will run a top down process to reorder the join keys
-datafusion.optimizer.yield_period 64 When DataFusion detects that a plan might not be promply cancellable due to the presence of tight-looping operators, it will attempt to mitigate this by inserting explicit yielding (in as few places as possible to avoid performance degradation). This value represents the yielding period (in batches) at such explicit yielding points. The default value is 64. If set to 0, no DataFusion will not perform any explicit yielding.
 datafusion.sql_parser.collect_spans false When set to true, the source locations relative to the original SQL query (i.e. [`Span`](https://docs.rs/sqlparser/latest/sqlparser/tokenizer/struct.Span.html)) will be collected and recorded in the logical plan nodes.
 datafusion.sql_parser.dialect generic Configure the SQL dialect used by DataFusion's parser; supported values include: Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB and Databricks.
 datafusion.sql_parser.enable_ident_normalization true When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted)
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 23a35c896dc5..ea5ce7ddb079 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -121,7 +121,6 @@ Environment variables are read during `SessionConfig` initialisation so they mus
 | datafusion.optimizer.default_filter_selectivity | 20 | The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected). |
 | datafusion.optimizer.prefer_existing_union | false | When set to true, the optimizer will not attempt to convert Union to Interleave |
 | datafusion.optimizer.expand_views_at_output | false | When set to true, if the returned type is a view type then the output will be coerced to a non-view. Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`. |
-| datafusion.optimizer.yield_period | 64 | When DataFusion detects that a plan might not be promply cancellable due to the presence of tight-looping operators, it will attempt to mitigate this by inserting explicit yielding (in as few places as possible to avoid performance degradation). This value represents the yielding period (in batches) at such explicit yielding points. The default value is 64. If set to 0, no DataFusion will not perform any explicit yielding. |
 | datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans |
 | datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans |
 | datafusion.explain.show_statistics | false | When set to true, the explain statement will print operator statistics for physical plans |
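Finally, downstream code that still sets the removed option now fails at configuration time, like any other unknown key. A minimal check (the exact error text is not asserted):

```rust
use datafusion_common::config::ConfigOptions;

fn yield_period_is_gone() {
    let mut options = ConfigOptions::new();
    // Before this patch this returned Ok(()); the key no longer exists, so
    // it is rejected as an unknown configuration entry.
    let result = options.set("datafusion.optimizer.yield_period", "64");
    assert!(result.is_err());
}
```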