diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 726015d17149..2bda99a2d3d6 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -413,6 +413,12 @@ config_namespace! { /// written, it may be necessary to increase this size to avoid errors from /// the remote end point. pub objectstore_writer_buffer_size: usize, default = 10 * 1024 * 1024 + + /// The maximum number of times pipeline breaking operators may poll + /// their children in a loop before yielding. Setting this to a value + /// greater than zero ensures running queries can be cancelled in a + /// timely fashion. + pub poll_budget: Option<usize>, default = Some(128) } } diff --git a/datafusion/core/tests/execution/mod.rs b/datafusion/core/tests/execution/mod.rs index 8169db1a4611..48b1d5cef4bc 100644 --- a/datafusion/core/tests/execution/mod.rs +++ b/datafusion/core/tests/execution/mod.rs @@ -16,3 +16,4 @@ // under the License. mod logical_plan; +mod yielding; diff --git a/datafusion/core/tests/execution/yielding.rs b/datafusion/core/tests/execution/yielding.rs new file mode 100644 index 000000000000..c77b2ab8921a --- /dev/null +++ b/datafusion/core/tests/execution/yielding.rs @@ -0,0 +1,436 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +use arrow::array::{Int64Array, RecordBatch}; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use arrow_schema::SortOptions; +use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; +use datafusion::functions_aggregate::sum; +use datafusion::physical_expr::aggregate::AggregateExprBuilder; +use datafusion::physical_expr::{EquivalenceProperties, Partitioning}; +use datafusion::physical_plan::aggregates::{ + AggregateExec, AggregateMode, PhysicalGroupBy, +}; +use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; +use datafusion::physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, +}; +use datafusion::prelude::SessionContext; +use datafusion::{common, physical_plan}; +use datafusion_common::JoinType; +use datafusion_execution::config::SessionConfig; +use datafusion_expr_common::operator::Operator; +use datafusion_functions_aggregate::min_max; +use datafusion_physical_expr::expressions::{binary, col, lit, Column}; +use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; +use datafusion_physical_plan::filter::FilterExec; +use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode, SortMergeJoinExec}; +use datafusion_physical_plan::sorts::sort::SortExec; +use futures::{Stream, StreamExt}; +use std::any::Any; +use std::error::Error; +use std::fmt::Formatter; +use std::ops::Range; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; +use tokio::runtime::{Handle, Runtime}; +use tokio::select; + +struct RangeStream { + schema: SchemaRef, + value_range: Range, + batch_size: usize, + poll_count: usize, +} + +impl RecordBatchStream for RangeStream { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} + +impl Stream for RangeStream { + type Item = common::Result; + + fn poll_next( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + self.poll_count += 1; + + let mut builder = Int64Array::builder(self.batch_size); + for _ in 0..self.batch_size { + match self.value_range.next() { + None => break, + Some(v) => builder.append_value(v), + } + } + let array = builder.finish(); + + if array.is_empty() { + return Poll::Ready(None); + } + + let batch = RecordBatch::try_new(self.schema(), vec![Arc::new(array)])?; + Poll::Ready(Some(Ok(batch))) + } +} + +#[derive(Debug)] +struct RangeExec { + value_range: Range, + properties: PlanProperties, +} + +impl RangeExec { + fn new(column_name: &str) -> Self { + Self::new_with_range(column_name, i64::MIN..i64::MAX) + } + + fn new_with_range(column_name: &str, values: Range) -> Self { + let schema: SchemaRef = Arc::new(Schema::new(vec![Field::new( + column_name, + DataType::Int64, + false, + )])); + + let orderings = vec![LexOrdering::new(vec![PhysicalSortExpr::new( + Arc::new(Column::new(column_name, 0)), + SortOptions::new(false, true), + )])]; + RangeExec { + value_range: values, + properties: PlanProperties::new( + EquivalenceProperties::new_with_orderings(schema.clone(), &orderings), + Partitioning::UnknownPartitioning(1), + EmissionType::Incremental, + Boundedness::Bounded, + ), + } + } +} + +impl DisplayAs for RangeExec { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { + write!(f, "RangeExec") + } +} + +impl ExecutionPlan for RangeExec { + fn name(&self) -> &str { + "RangeExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &PlanProperties { + &self.properties + } + + fn children(&self) -> Vec<&Arc> { + vec![] + } + + fn 
with_new_children( + self: Arc, + _children: Vec>, + ) -> common::Result> { + Ok(self.clone()) + } + + fn execute( + &self, + _partition: usize, + context: Arc, + ) -> common::Result { + Ok(Box::pin(RangeStream { + schema: self.schema(), + value_range: self.value_range.clone(), + batch_size: context.session_config().batch_size(), + poll_count: 0, + })) + } +} + +#[tokio::test] +async fn test_agg_no_grouping_yields() -> Result<(), Box> { + // 1) build session & schema & sample batch + let config = SessionConfig::new(); + // config.options_mut().execution.poll_budget = None; + let session_ctx = SessionContext::new_with_config(config); + + // 2) set up an aggregation without grouping + let inf = Arc::new(RangeExec::new("value")); + let aggr = Arc::new(AggregateExec::try_new( + AggregateMode::Single, + PhysicalGroupBy::new(vec![], vec![], vec![]), + vec![Arc::new( + AggregateExprBuilder::new( + sum::sum_udaf(), + vec![col("value", &inf.schema())?], + ) + .schema(inf.schema()) + .alias("sum") + .build()?, + )], + vec![None], + inf.clone(), + inf.schema(), + )?); + + // 3) get the stream + let stream = physical_plan::execute_stream(aggr, session_ctx.task_ctx())?; + + test_stream_next_yields(stream).await +} + +#[tokio::test] +async fn test_agg_grouping_yields() -> Result<(), Box> { + // 1) build session & schema & sample batch + let config = SessionConfig::new(); + // config.options_mut().execution.poll_budget = None; + let session_ctx = SessionContext::new_with_config(config); + + // 2) set up an aggregation with grouping + let inf = Arc::new(RangeExec::new("value")); + + let value_col = col("value", &inf.schema())?; + let group = binary( + value_col.clone(), + Operator::Divide, + lit(1000000i64), + &inf.schema(), + )?; + + let aggr = Arc::new(AggregateExec::try_new( + AggregateMode::Single, + PhysicalGroupBy::new(vec![(group, "group".to_string())], vec![], vec![]), + vec![Arc::new( + AggregateExprBuilder::new(sum::sum_udaf(), vec![value_col.clone()]) + .schema(inf.schema()) + .alias("sum") + .build()?, + )], + vec![None], + inf.clone(), + inf.schema(), + )?); + + // 3) get the stream + let stream = physical_plan::execute_stream(aggr, session_ctx.task_ctx())?; + + test_stream_next_yields(stream).await +} + +#[tokio::test] +async fn test_agg_grouped_topk_yields() -> Result<(), Box> { + // 1) build session & schema & sample batch + let config = SessionConfig::new(); + // config.options_mut().execution.poll_budget = None; + let session_ctx = SessionContext::new_with_config(config); + + // 2) set up a top-k aggregation + let inf = Arc::new(RangeExec::new("value")); + + let value_col = col("value", &inf.schema())?; + let group = binary( + value_col.clone(), + Operator::Divide, + lit(1000000i64), + &inf.schema(), + )?; + + let aggr = Arc::new( + AggregateExec::try_new( + AggregateMode::Single, + PhysicalGroupBy::new( + vec![(group, "group".to_string())], + vec![], + vec![vec![false]], + ), + vec![Arc::new( + AggregateExprBuilder::new(min_max::max_udaf(), vec![value_col.clone()]) + .schema(inf.schema()) + .alias("max") + .build()?, + )], + vec![None], + inf.clone(), + inf.schema(), + )? 
+ .with_limit(Some(100)), + ); + + // 3) get the stream + let stream = physical_plan::execute_stream(aggr, session_ctx.task_ctx())?; + + test_stream_next_yields(stream).await +} + +#[tokio::test] +async fn test_sort_yields() -> Result<(), Box> { + // 1) build session & schema & sample batch + let config = SessionConfig::new(); + // config.options_mut().execution.poll_budget = None; + let session_ctx = SessionContext::new_with_config(config); + + // 2) set up the infinite source + let inf = Arc::new(RangeExec::new("value")); + + // 3) set up a SortExec that will not be able to finish in time because input is very large + let sort_expr = PhysicalSortExpr::new( + col("value", &inf.schema())?, + SortOptions { + descending: true, + nulls_first: true, + }, + ); + // LexOrdering is just Vec + let lex_ordering: LexOrdering = vec![sort_expr].into(); + let sort_exec = Arc::new(SortExec::new(lex_ordering, inf.clone())); + + // 4) get the stream + let stream = physical_plan::execute_stream(sort_exec, session_ctx.task_ctx())?; + + test_stream_next_yields(stream).await +} + +#[tokio::test] +async fn test_filter_yields() -> Result<(), Box> { + // 1) build session & schema & sample batch + let config = SessionConfig::new(); + // config.options_mut().execution.poll_budget = None; + let session_ctx = SessionContext::new_with_config(config); + + // 2) set up the infinite source + let inf = Arc::new(RangeExec::new("value")); + + // 3) set up a FilterExec that will filter out entire batches + let filter_expr = binary( + col("value", &inf.schema())?, + Operator::Lt, + lit(i64::MIN), + &inf.schema(), + )?; + let filter = Arc::new(FilterExec::try_new(filter_expr, inf.clone())?); + + // 4) get the stream + let stream = physical_plan::execute_stream(filter, session_ctx.task_ctx())?; + + test_stream_next_yields(stream).await +} + +#[tokio::test] +async fn test_hash_join_yields() -> Result<(), Box> { + // 1) build session & schema & sample batch + let config = SessionConfig::new(); + // config.options_mut().execution.poll_budget = None; + let session_ctx = SessionContext::new_with_config(config); + + // 2) set up the join sources + let inf1 = Arc::new(RangeExec::new("value1")); + let inf2 = Arc::new(RangeExec::new("value2")); + + // 3) set up a HashJoinExec that will take a long time in the build phase + let join = Arc::new(HashJoinExec::try_new( + inf1.clone(), + inf2.clone(), + vec![( + col("value1", &inf1.schema())?, + col("value2", &inf2.schema())?, + )], + None, + &JoinType::Left, + None, + PartitionMode::CollectLeft, + true, + )?); + + // 4) get the stream + let stream = physical_plan::execute_stream(join, session_ctx.task_ctx())?; + + test_stream_next_yields(stream).await +} + +#[tokio::test] +async fn test_sort_merge_join_yields() -> Result<(), Box> { + // 1) build session & schema & sample batch + let config = SessionConfig::new(); + // config.options_mut().execution.poll_budget = None; + let session_ctx = SessionContext::new_with_config(config); + + // 2) set up the join sources + let inf1 = Arc::new(RangeExec::new_with_range("value1", i64::MIN..0)); + let inf2 = Arc::new(RangeExec::new_with_range("value2", 0..i64::MAX)); + + // 3) set up a SortMergeJoinExec that will take a long time skipping left side content to find + // the first right side match + let join = Arc::new(SortMergeJoinExec::try_new( + inf1.clone(), + inf2.clone(), + vec![( + col("value1", &inf1.schema())?, + col("value2", &inf2.schema())?, + )], + None, + JoinType::Inner, + 
vec![inf1.properties.eq_properties.output_ordering().unwrap()[0].options], + true, + )?); + + // 4) get the stream + let stream = physical_plan::execute_stream(join, session_ctx.task_ctx())?; + + test_stream_next_yields(stream).await +} + +async fn test_stream_next_yields( + mut stream: SendableRecordBatchStream, +) -> Result<(), Box> { + // Create an independent executor pool + let child_runtime = Runtime::new()?; + + // Spawn a task that tries to poll the stream + // The task returns Ready when the stream yielded with either Ready or Pending + let join_handle = child_runtime.spawn(std::future::poll_fn(move |cx| { + match stream.poll_next_unpin(cx) { + Poll::Ready(_) => Poll::Ready(Poll::Ready(())), + Poll::Pending => Poll::Ready(Poll::Pending), + } + })); + + let abort_handle = join_handle.abort_handle(); + + // Now select on the join handle of the task running in the child executor with a timeout + let yielded = select! { + _ = join_handle => true, + _ = tokio::time::sleep(tokio::time::Duration::from_secs(10)) => false + }; + + // Try to abort the poll task and shutdown the child runtime + abort_handle.abort(); + Handle::current().spawn_blocking(move || { + drop(child_runtime); + }); + + // Finally, check if poll_next yielded + assert!(yielded, "Task did not yield in a timely fashion"); + Ok(()) +} diff --git a/datafusion/physical-plan/src/aggregates/no_grouping.rs b/datafusion/physical-plan/src/aggregates/no_grouping.rs index 9474a5f88c92..31ba3a3e49fb 100644 --- a/datafusion/physical-plan/src/aggregates/no_grouping.rs +++ b/datafusion/physical-plan/src/aggregates/no_grouping.rs @@ -22,7 +22,7 @@ use crate::aggregates::{ AggregateMode, }; use crate::metrics::{BaselineMetrics, RecordOutput}; -use crate::{RecordBatchStream, SendableRecordBatchStream}; +use crate::RecordBatchStream; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use datafusion_common::Result; @@ -33,12 +33,12 @@ use std::borrow::Cow; use std::sync::Arc; use std::task::{Context, Poll}; +use super::AggregateExec; use crate::filter::batch_filter; +use crate::r#yield::{PollBudget, YieldStream}; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use futures::stream::{Stream, StreamExt}; -use super::AggregateExec; - /// stream struct for aggregation without grouping columns pub(crate) struct AggregateStream { stream: BoxStream<'static, Result>, @@ -55,7 +55,7 @@ pub(crate) struct AggregateStream { struct AggregateStreamInner { schema: SchemaRef, mode: AggregateMode, - input: SendableRecordBatchStream, + input: YieldStream, baseline_metrics: BaselineMetrics, aggregate_expressions: Vec>>, filter_expressions: Vec>>, @@ -94,7 +94,7 @@ impl AggregateStream { let inner = AggregateStreamInner { schema: Arc::clone(&agg.schema), mode: agg.mode, - input, + input: PollBudget::from(context.as_ref()).wrap_stream(input), baseline_metrics, aggregate_expressions, filter_expressions, diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 62f541443068..263b39f15867 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -49,10 +49,11 @@ use datafusion_physical_expr::{GroupsAccumulatorAdapter, PhysicalSortExpr}; use super::order::GroupOrdering; use super::AggregateExec; +use crate::r#yield::PollBudget; use datafusion_physical_expr::aggregate::AggregateFunctionExpr; use datafusion_physical_expr_common::sort_expr::LexOrdering; -use futures::ready; use 
futures::stream::{Stream, StreamExt}; +use futures::{ready, FutureExt}; use log::debug; #[derive(Debug, Clone)] @@ -432,6 +433,8 @@ pub(crate) struct GroupedHashAggregateStream { /// Execution metrics baseline_metrics: BaselineMetrics, + + poll_budget: PollBudget, } impl GroupedHashAggregateStream { @@ -620,6 +623,7 @@ impl GroupedHashAggregateStream { spill_state, group_values_soft_limit: agg.limit, skip_aggregation_probe, + poll_budget: PollBudget::from(context.as_ref()), }) } } @@ -653,6 +657,7 @@ impl Stream for GroupedHashAggregateStream { ) -> Poll> { let elapsed_compute = self.baseline_metrics.elapsed_compute().clone(); + let mut consume_budget = self.poll_budget.consume_budget(); loop { match &self.exec_state { ExecutionState::ReadingInput => 'reading_input: { @@ -815,6 +820,7 @@ impl Stream for GroupedHashAggregateStream { return Poll::Ready(None); } } + ready!(consume_budget.poll_unpin(cx)); } } } diff --git a/datafusion/physical-plan/src/aggregates/topk_stream.rs b/datafusion/physical-plan/src/aggregates/topk_stream.rs index bf02692486cc..4f92812aa623 100644 --- a/datafusion/physical-plan/src/aggregates/topk_stream.rs +++ b/datafusion/physical-plan/src/aggregates/topk_stream.rs @@ -22,6 +22,7 @@ use crate::aggregates::{ aggregate_expressions, evaluate_group_by, evaluate_many, AggregateExec, PhysicalGroupBy, }; +use crate::r#yield::PollBudget; use crate::{RecordBatchStream, SendableRecordBatchStream}; use arrow::array::{Array, ArrayRef, RecordBatch}; use arrow::datatypes::SchemaRef; @@ -31,10 +32,11 @@ use datafusion_common::Result; use datafusion_execution::TaskContext; use datafusion_physical_expr::PhysicalExpr; use futures::stream::{Stream, StreamExt}; +use futures::FutureExt; use log::{trace, Level}; use std::pin::Pin; use std::sync::Arc; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; pub struct GroupedTopKAggregateStream { partition: usize, @@ -42,6 +44,7 @@ pub struct GroupedTopKAggregateStream { started: bool, schema: SchemaRef, input: SendableRecordBatchStream, + poll_budget: PollBudget, aggregate_arguments: Vec>>, group_by: PhysicalGroupBy, priority_map: PriorityMap, @@ -75,6 +78,7 @@ impl GroupedTopKAggregateStream { row_count: 0, schema: agg_schema, input, + poll_budget: PollBudget::from(context.as_ref()), aggregate_arguments, group_by, priority_map, @@ -111,7 +115,10 @@ impl Stream for GroupedTopKAggregateStream { mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - while let Poll::Ready(res) = self.input.poll_next_unpin(cx) { + let mut consume_budget = self.poll_budget.consume_budget(); + + loop { + let res = ready!(self.input.poll_next_unpin(cx)); match res { // got a batch, convert to rows and append to our TreeMap Some(Ok(batch)) => { @@ -174,7 +181,8 @@ impl Stream for GroupedTopKAggregateStream { return Poll::Ready(Some(Err(e))); } } + + ready!(consume_budget.poll_unpin(cx)); } - Poll::Pending } } diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 13129e382dec..99fe960f2488 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -61,7 +61,9 @@ use datafusion_physical_expr::{ analyze, conjunction, split_conjunction, AcrossPartitions, AnalysisContext, ConstExpr, ExprBoundaries, PhysicalExpr, }; +use futures::FutureExt; +use crate::r#yield::PollBudget; use datafusion_physical_expr_common::physical_expr::fmt_sql; use futures::stream::{Stream, StreamExt}; use itertools::Itertools; @@ -395,10 +397,12 @@ impl ExecutionPlan for FilterExec { ) 
-> Result { trace!("Start FilterExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id()); let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); + let poll_budget = PollBudget::from(context.as_ref()); Ok(Box::pin(FilterExecStream { schema: self.schema(), predicate: Arc::clone(&self.predicate), input: self.input.execute(partition, context)?, + poll_budget, baseline_metrics, projection: self.projection.clone(), })) @@ -643,6 +647,7 @@ struct FilterExecStream { predicate: Arc, /// The input partition to filter. input: SendableRecordBatchStream, + poll_budget: PollBudget, /// Runtime metrics recording baseline_metrics: BaselineMetrics, /// The projection indices of the columns in the input schema @@ -696,6 +701,7 @@ impl Stream for FilterExecStream { mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { + let mut consume_budget = self.poll_budget.consume_budget(); let poll; loop { match ready!(self.input.poll_next_unpin(cx)) { @@ -710,6 +716,7 @@ impl Stream for FilterExecStream { timer.done(); // Skip entirely filtered batches if filtered_batch.num_rows() == 0 { + ready!(consume_budget.poll_unpin(cx)); continue; } poll = Poll::Ready(Some(Ok(filtered_batch))); diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index 4d8c48c659ef..432d98ec08b2 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -46,6 +46,7 @@ use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_execution::TaskContext; use datafusion_physical_expr::equivalence::join_equivalence_properties; +use crate::r#yield::{PollBudget, YieldStream}; use async_trait::async_trait; use futures::{ready, Stream, StreamExt, TryStreamExt}; @@ -188,7 +189,7 @@ impl CrossJoinExec { /// Asynchronously collect the result of the left child async fn load_left_input( - stream: SendableRecordBatchStream, + stream: YieldStream, metrics: BuildProbeJoinMetrics, reservation: MemoryReservation, ) -> Result { @@ -302,10 +303,10 @@ impl ExecutionPlan for CrossJoinExec { context.session_config().enforce_batch_size_in_joins(); let left_fut = self.left_fut.try_once(|| { + let poll_budget = PollBudget::from(context.as_ref()); let left_stream = self.left.execute(0, context)?; - Ok(load_left_input( - left_stream, + poll_budget.wrap_stream(left_stream), join_metrics.clone(), reservation, )) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 398c2fed7cdf..e0ea89887419 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -81,6 +81,7 @@ use datafusion_physical_expr::equivalence::{ use datafusion_physical_expr::PhysicalExprRef; use datafusion_physical_expr_common::datum::compare_op_for_nested; +use crate::r#yield::{PollBudget, YieldStream}; use ahash::RandomState; use datafusion_physical_expr_common::physical_expr::fmt_sql; use futures::{ready, Stream, StreamExt, TryStreamExt}; @@ -805,6 +806,7 @@ impl ExecutionPlan for HashJoinExec { let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics); let left_fut = match self.mode { PartitionMode::CollectLeft => self.left_fut.try_once(|| { + let poll_budget = PollBudget::from(context.as_ref()); let left_stream = self.left.execute(0, Arc::clone(&context))?; let reservation = @@ -812,7 +814,7 @@ impl ExecutionPlan for HashJoinExec { 
Ok(collect_left_input( self.random_state.clone(), - left_stream, + poll_budget.wrap_stream(left_stream), on_left.clone(), join_metrics.clone(), reservation, @@ -821,6 +823,7 @@ impl ExecutionPlan for HashJoinExec { )) })?, PartitionMode::Partitioned => { + let poll_budget = PollBudget::from(context.as_ref()); let left_stream = self.left.execute(partition, Arc::clone(&context))?; let reservation = @@ -829,7 +832,7 @@ impl ExecutionPlan for HashJoinExec { OnceFut::new(collect_left_input( self.random_state.clone(), - left_stream, + poll_budget.wrap_stream(left_stream), on_left.clone(), join_metrics.clone(), reservation, @@ -950,7 +953,7 @@ impl ExecutionPlan for HashJoinExec { /// hash table (`LeftJoinData`) async fn collect_left_input( random_state: RandomState, - left_stream: SendableRecordBatchStream, + left_stream: YieldStream, on_left: Vec, metrics: BuildProbeJoinMetrics, reservation: MemoryReservation, diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index f87cf3d8864c..5029774f0dde 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -61,6 +61,7 @@ use datafusion_physical_expr::equivalence::{ join_equivalence_properties, ProjectionMapping, }; +use crate::r#yield::{PollBudget, YieldStream}; use futures::{ready, Stream, StreamExt, TryStreamExt}; use parking_lot::Mutex; @@ -497,10 +498,11 @@ impl ExecutionPlan for NestedLoopJoinExec { .register(context.memory_pool()); let inner_table = self.inner_table.try_once(|| { + let poll_budget = PollBudget::from(context.as_ref()); let stream = self.left.execute(0, Arc::clone(&context))?; Ok(collect_left_input( - stream, + poll_budget.wrap_stream(stream), join_metrics.clone(), load_reservation, need_produce_result_in_final(self.join_type), @@ -624,7 +626,7 @@ impl ExecutionPlan for NestedLoopJoinExec { /// Asynchronously collect input into a single batch, and creates `JoinLeftData` from it async fn collect_left_input( - stream: SendableRecordBatchStream, + stream: YieldStream, join_metrics: BuildProbeJoinMetrics, reservation: MemoryReservation, with_visited_left_side: bool, diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs index cadd2b53ab11..4463935a8a91 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs @@ -31,7 +31,7 @@ use std::pin::Pin; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::Relaxed; use std::sync::Arc; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; use crate::execution_plan::{boundedness_from_children, EmissionType}; use crate::expressions::PhysicalSortExpr; @@ -74,7 +74,8 @@ use datafusion_physical_expr::PhysicalExprRef; use datafusion_physical_expr_common::physical_expr::fmt_sql; use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement}; -use futures::{Stream, StreamExt}; +use crate::r#yield::PollBudget; +use futures::{FutureExt, Stream, StreamExt}; /// Join execution plan that executes equi-join predicates on multiple partitions using Sort-Merge /// join algorithm and applies an optional filter post join. 
Can be used to join arbitrarily large @@ -499,6 +500,7 @@ impl ExecutionPlan for SortMergeJoinExec { SortMergeJoinMetrics::new(partition, &self.metrics), reservation, context.runtime_env(), + PollBudget::from(context.as_ref()), )?)) } @@ -908,6 +910,7 @@ struct SortMergeJoinStream { pub runtime_env: Arc, /// A unique number for each batch pub streamed_batch_counter: AtomicUsize, + pub poll_budget: PollBudget, } /// Joined batches with attached join filter information @@ -1125,6 +1128,9 @@ impl Stream for SortMergeJoinStream { ) -> Poll> { let join_time = self.join_metrics.join_time.clone(); let _timer = join_time.timer(); + + let mut consume_budget = self.poll_budget.consume_budget(); + loop { match &self.state { SortMergeJoinState::Init => { @@ -1204,7 +1210,9 @@ impl Stream for SortMergeJoinStream { .contains(&self.streamed_state) { match self.poll_streamed_row(cx)? { - Poll::Ready(_) => {} + Poll::Ready(_) => { + ready!(consume_budget.poll_unpin(cx)); + } Poll::Pending => return Poll::Pending, } } @@ -1213,7 +1221,9 @@ impl Stream for SortMergeJoinStream { .contains(&self.buffered_state) { match self.poll_buffered_batches(cx)? { - Poll::Ready(_) => {} + Poll::Ready(_) => { + ready!(consume_budget.poll_unpin(cx)); + } Poll::Pending => return Poll::Pending, } } @@ -1323,6 +1333,7 @@ impl SortMergeJoinStream { join_metrics: SortMergeJoinMetrics, reservation: MemoryReservation, runtime_env: Arc, + poll_budget: PollBudget, ) -> Result { let streamed_schema = streamed.schema(); let buffered_schema = buffered.schema(); @@ -1365,6 +1376,7 @@ impl SortMergeJoinStream { runtime_env, spill_manager, streamed_batch_counter: AtomicUsize::new(0), + poll_budget, }) } diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index ba423f958c78..f490d736e3ac 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -86,6 +86,7 @@ pub mod unnest; pub mod values; pub mod windows; pub mod work_table; +pub mod r#yield; pub mod udaf { pub use datafusion_expr::StatisticsArgs; pub use datafusion_physical_expr::aggregate::AggregateFunctionExpr; diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 683983d9e697..3b92657a26cd 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -19,11 +19,6 @@ //! It will do in-memory sorting if it has enough memory budget //! but spills to disk if needed. 
-use std::any::Any; -use std::fmt; -use std::fmt::{Debug, Formatter}; -use std::sync::Arc; - use crate::common::spawn_buffered; use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType}; use crate::expressions::PhysicalSortExpr; @@ -43,6 +38,10 @@ use crate::{ ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream, Statistics, }; +use std::any::Any; +use std::fmt; +use std::fmt::{Debug, Formatter}; +use std::sync::Arc; use arrow::array::{Array, RecordBatch, RecordBatchOptions, StringViewArray}; use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays}; @@ -55,6 +54,7 @@ use datafusion_execution::TaskContext; use datafusion_physical_expr::LexOrdering; use datafusion_physical_expr_common::sort_expr::LexRequirement; +use crate::r#yield::PollBudget; use futures::{StreamExt, TryStreamExt}; use log::{debug, trace}; @@ -1093,7 +1093,7 @@ impl ExecutionPlan for SortExec { ) -> Result { trace!("Start SortExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id()); - let mut input = self.input.execute(partition, Arc::clone(&context))?; + let input = self.input.execute(partition, Arc::clone(&context))?; let execution_options = &context.session_config().options().execution; @@ -1108,6 +1108,7 @@ impl ExecutionPlan for SortExec { match (sort_satisfied, self.fetch.as_ref()) { (true, Some(fetch)) => Ok(Box::pin(LimitStream::new( + // limit is not a pipeline breaking stream, so poll budget is not required input, 0, Some(*fetch), @@ -1125,6 +1126,7 @@ impl ExecutionPlan for SortExec { context.runtime_env(), &self.metrics_set, )?; + let mut input = PollBudget::from(context.as_ref()).wrap_stream(input); Ok(Box::pin(RecordBatchStreamAdapter::new( self.schema(), futures::stream::once(async move { @@ -1151,6 +1153,7 @@ impl ExecutionPlan for SortExec { &self.metrics_set, context.runtime_env(), )?; + let mut input = PollBudget::from(context.as_ref()).wrap_stream(input); Ok(Box::pin(RecordBatchStreamAdapter::new( self.schema(), futures::stream::once(async move { diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index 6751f9b20240..7963b4926878 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -63,8 +63,9 @@ use datafusion_physical_expr::window::{ use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement}; +use crate::r#yield::PollBudget; use futures::stream::Stream; -use futures::{ready, StreamExt}; +use futures::{ready, FutureExt, StreamExt}; use hashbrown::hash_table::HashTable; use indexmap::IndexMap; use log::debug; @@ -343,6 +344,7 @@ impl ExecutionPlan for BoundedWindowAggExec { partition: usize, context: Arc, ) -> Result { + let poll_budget = PollBudget::from(context.as_ref()); let input = self.input.execute(partition, context)?; let search_mode = self.get_search_algo()?; let stream = Box::pin(BoundedWindowAggStream::new( @@ -351,6 +353,7 @@ impl ExecutionPlan for BoundedWindowAggExec { input, BaselineMetrics::new(&self.metrics, partition), search_mode, + poll_budget, )?); Ok(stream) } @@ -916,6 +919,7 @@ pub struct BoundedWindowAggStream { /// Search mode for partition columns. This determines the algorithm with /// which we group each partition. 
search_mode: Box, + poll_budget: PollBudget, } impl BoundedWindowAggStream { @@ -959,6 +963,7 @@ impl BoundedWindowAggStream { input: SendableRecordBatchStream, baseline_metrics: BaselineMetrics, search_mode: Box, + poll_budget: PollBudget, ) -> Result { let state = window_expr.iter().map(|_| IndexMap::new()).collect(); let empty_batch = RecordBatch::new_empty(Arc::clone(&schema)); @@ -972,6 +977,7 @@ impl BoundedWindowAggStream { window_expr, baseline_metrics, search_mode, + poll_budget, }) } @@ -1017,36 +1023,41 @@ impl BoundedWindowAggStream { return Poll::Ready(None); } - let elapsed_compute = self.baseline_metrics.elapsed_compute().clone(); - match ready!(self.input.poll_next_unpin(cx)) { - Some(Ok(batch)) => { - // Start the timer for compute time within this operator. It will be - // stopped when dropped. - let _timer = elapsed_compute.timer(); - - self.search_mode.update_partition_batch( - &mut self.input_buffer, - batch, - &self.window_expr, - &mut self.partition_buffers, - )?; - if let Some(batch) = self.compute_aggregates()? { - return Poll::Ready(Some(Ok(batch))); - } - self.poll_next_inner(cx) - } - Some(Err(e)) => Poll::Ready(Some(Err(e))), - None => { - let _timer = elapsed_compute.timer(); + let mut consume_budget = self.poll_budget.consume_budget(); + + loop { + let elapsed_compute = self.baseline_metrics.elapsed_compute().clone(); + match ready!(self.input.poll_next_unpin(cx)) { + Some(Ok(batch)) => { + // Start the timer for compute time within this operator. It will be + // stopped when dropped. + let _timer = elapsed_compute.timer(); + + self.search_mode.update_partition_batch( + &mut self.input_buffer, + batch, + &self.window_expr, + &mut self.partition_buffers, + )?; + if let Some(batch) = self.compute_aggregates()? { + return Poll::Ready(Some(Ok(batch))); + } - self.finished = true; - for (_, partition_batch_state) in self.partition_buffers.iter_mut() { - partition_batch_state.is_end = true; + ready!(consume_budget.poll_unpin(cx)); } - if let Some(batch) = self.compute_aggregates()? { - return Poll::Ready(Some(Ok(batch))); + Some(Err(e)) => return Poll::Ready(Some(Err(e))), + None => { + let _timer = elapsed_compute.timer(); + + self.finished = true; + for (_, partition_batch_state) in self.partition_buffers.iter_mut() { + partition_batch_state.is_end = true; + } + if let Some(batch) = self.compute_aggregates()? 
{ + return Poll::Ready(Some(Ok(batch))); + } + return Poll::Ready(None); } - Poll::Ready(None) } } } diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs index 4c76e2230875..0816352bba77 100644 --- a/datafusion/physical-plan/src/windows/window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs @@ -46,7 +46,8 @@ use datafusion_common::{internal_err, Result}; use datafusion_execution::TaskContext; use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement}; -use futures::{ready, Stream, StreamExt}; +use crate::r#yield::PollBudget; +use futures::{ready, FutureExt, Stream, StreamExt}; /// Window execution plan #[derive(Debug, Clone)] @@ -272,6 +273,7 @@ impl ExecutionPlan for WindowAggExec { partition: usize, context: Arc, ) -> Result { + let poll_budget = PollBudget::from(context.as_ref()); let input = self.input.execute(partition, context)?; let stream = Box::pin(WindowAggStream::new( Arc::clone(&self.schema), @@ -280,6 +282,7 @@ impl ExecutionPlan for WindowAggExec { BaselineMetrics::new(&self.metrics, partition), self.partition_by_sort_keys()?, self.ordered_partition_by_indices.clone(), + poll_budget, )?); Ok(stream) } @@ -322,6 +325,7 @@ pub struct WindowAggStream { partition_by_sort_keys: LexOrdering, baseline_metrics: BaselineMetrics, ordered_partition_by_indices: Vec, + poll_budget: PollBudget, } impl WindowAggStream { @@ -333,6 +337,7 @@ impl WindowAggStream { baseline_metrics: BaselineMetrics, partition_by_sort_keys: LexOrdering, ordered_partition_by_indices: Vec, + poll_budget: PollBudget, ) -> Result { // In WindowAggExec all partition by columns should be ordered. if window_expr[0].partition_by().len() != ordered_partition_by_indices.len() { @@ -347,6 +352,7 @@ impl WindowAggStream { baseline_metrics, partition_by_sort_keys, ordered_partition_by_indices, + poll_budget, }) } @@ -418,10 +424,13 @@ impl WindowAggStream { return Poll::Ready(None); } + let mut consume_budget = self.poll_budget.consume_budget(); + loop { return Poll::Ready(Some(match ready!(self.input.poll_next_unpin(cx)) { Some(Ok(batch)) => { self.batches.push(batch); + ready!(consume_budget.poll_unpin(cx)); continue; } Some(Err(e)) => Err(e), diff --git a/datafusion/physical-plan/src/yield.rs b/datafusion/physical-plan/src/yield.rs new file mode 100644 index 000000000000..f01f1211209e --- /dev/null +++ b/datafusion/physical-plan/src/yield.rs @@ -0,0 +1,283 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Support types for to help physical operators implement cooperative yielding. +//! +//! This module provides utilities to help physical operators implement cooperative yielding. +//! 
This can be necessary to prevent blocking Tokio executor threads for an extended period of +//! time when an operator polls child operators in a loop. + +use arrow::array::RecordBatch; +use arrow_schema::Schema; +use datafusion_execution::config::SessionConfig; +use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; +use futures::{Stream, StreamExt}; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; +use std::task::Poll::{Pending, Ready}; +use std::task::{Context, Poll}; + +/// A value describing the number of times an operator may poll a child operator without yielding. +/// Yielding means returning control to the calling code by returning any Poll value. +/// +/// When the budget is `None`, polling is unconstrained. +#[derive(Copy, Clone)] +pub struct PollBudget { + /// The maximum number of consecutive polls allowed, or `None` for unconstrained polling. + /// This value is guaranteed to never be zero. + budget: Option, +} + +impl PollBudget { + /// Creates a new `PollBudget` with the specified budget. + /// + /// # Arguments + /// + /// * `budget` - The maximum number of consecutive polls allowed, or `None` for unconstrained polling. + /// A value of `Some(0)` is treated as unconstrained. + /// + /// # Returns + /// + /// A new `PollBudget` instance. + pub fn new(budget: Option) -> Self { + match budget { + None => Self::unconstrained(), + Some(0) => Self::unconstrained(), + budget @ Some(_) => Self { budget }, + } + } + + /// Creates an unconstrained `PollBudget` that permits unlimited consecutive polls. + /// + /// # Returns + /// + /// A new unconstrained `PollBudget` instance. + pub fn unconstrained() -> Self { + Self { budget: None } + } + + /// Checks if this `PollBudget` is unconstrained. + /// + /// # Returns + /// + /// `true` if this budget is unconstrained, `false` otherwise. + pub fn is_unconstrained(&self) -> bool { + self.budget.is_none() + } + + /// Creates a `ConsumeBudget` future that can be used to track budget consumption. + /// This is typically used in `Stream` implementations that do not use `async`. + /// + /// # Examples + /// + /// ``` + /// use std::pin::Pin; + /// use std::task::{ready, Context, Poll}; + /// use arrow::record_batch::RecordBatch; + /// use futures::{FutureExt, Stream, StreamExt}; + /// use datafusion_execution::SendableRecordBatchStream; + /// use datafusion_physical_plan::r#yield::PollBudget; + /// + /// struct SkipEmptyStream { + /// input: SendableRecordBatchStream, + /// poll_budget: PollBudget, + /// } + /// + /// impl Stream for SkipEmptyStream { + /// type Item = datafusion_common::Result; + /// + /// fn poll_next( + /// mut self: Pin<&mut Self>, + /// cx: &mut Context<'_>, + /// ) -> Poll> { + /// // Create a new budget tracker on poll_next entry + /// let mut consume_budget = self.poll_budget.consume_budget(); + /// loop { + /// return match ready!(self.input.poll_next_unpin(cx)) { + /// Some(Ok(batch)) => { + /// if batch.num_rows() == 0 { + /// // Skip empty batches + /// // Consume budget since we're looping + /// // If the budget is depleted Pending will be returned + /// ready!(consume_budget.poll_unpin(cx)); + /// continue; + /// } + /// Poll::Ready(Some(Ok(batch))) + /// } + /// other @ _ => Poll::Ready(other), + /// } + /// } + /// } + /// } + /// ``` + /// + /// # Returns + /// + /// A new `ConsumeBudget` instance. 
+ pub fn consume_budget(&self) -> ConsumeBudget { + ConsumeBudget { + remaining: self.budget, + } + } + + /// Wraps a record batch stream with a `YieldStream` that respects this budget. + /// + /// # Arguments + /// + /// * `inner` - The stream to wrap. + /// + /// # Returns + /// + /// A new `YieldStream` that wraps the input stream and respects this budget. + pub fn wrap_stream(self, inner: SendableRecordBatchStream) -> YieldStream { + YieldStream::new(inner, self) + } +} + +/// Creates a `PollBudget` from a session configuration. +/// +/// This implementation extracts the poll budget from the session configuration's +/// execution options. +impl From<&SessionConfig> for PollBudget { + fn from(session_config: &SessionConfig) -> Self { + Self::new(session_config.options().execution.poll_budget) + } +} + +/// Creates a `PollBudget` from a task context. +/// +/// This implementation extracts the poll budget from the task context's session +/// configuration. +impl From<&TaskContext> for PollBudget { + fn from(context: &TaskContext) -> Self { + Self::from(context.session_config()) + } +} + +/// A future that consumes a poll budget. +/// +/// When polled, this future will either: +/// - Complete immediately if the budget is unconstrained or has remaining capacity +/// - Yield to the executor if the budget has been exhausted +/// +/// This allows code to respect the poll budget by awaiting this future at appropriate points. +pub struct ConsumeBudget { + /// The remaining budget, or `None` if unconstrained. + remaining: Option, +} + +impl Future for ConsumeBudget { + type Output = (); + + /// Polls this future to completion. + /// + /// If the budget is unconstrained (`None`), this future completes immediately. + /// If the budget has remaining capacity, it decrements the budget and completes. + /// If the budget is exhausted (0), it wakes the task and returns `Pending`, + /// effectively yielding to the executor. + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.remaining { + None => Ready(()), + Some(remaining) => { + if remaining == 0 { + cx.waker().wake_by_ref(); + Pending + } else { + self.remaining = Some(remaining - 1); + Ready(()) + } + } + } + } +} + +/// A stream wrapper that respects a poll budget. +/// +/// This stream wraps another record batch stream and limits the number of consecutive +/// polls based on the configured budget. When the budget is exhausted, the stream +/// yields to the executor before continuing. +pub struct YieldStream { + /// The inner stream being wrapped. + inner: SendableRecordBatchStream, + /// The maximum number of consecutive polls allowed, or `None` for unconstrained polling. + budget: Option, + /// The remaining budget for the current polling cycle, or `None` if unconstrained. + remaining: Option, +} + +impl YieldStream { + /// Creates a new `YieldStream` that wraps the given stream with the specified budget. + /// + /// # Arguments + /// + /// * `inner` - The stream to wrap. + /// * `poll_budget` - The poll budget the stream should respect. + /// + /// # Returns + /// + /// A new `YieldStream` instance. + pub fn new(inner: SendableRecordBatchStream, poll_budget: PollBudget) -> Self { + let budget = poll_budget.budget; + Self { + inner, + budget, + remaining: budget, + } + } +} + +impl Stream for YieldStream { + type Item = datafusion_common::Result; + + /// Polls the stream for the next item, respecting the configured budget. 
+ /// + /// If the budget is unconstrained (`None`), this delegates directly to the inner stream. + /// If the budget is exhausted (0), it resets the budget, wakes the task, and returns `Pending`, + /// effectively yielding to the executor. + /// Otherwise, it decrements the budget for each successful poll that returns an item. + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + match self.remaining { + None => self.inner.poll_next_unpin(cx), + Some(0) => { + self.remaining = self.budget; + cx.waker().wake_by_ref(); + Pending + } + Some(remaining) => match self.inner.poll_next_unpin(cx) { + ready @ Ready(Some(_)) => { + self.remaining = Some(remaining - 1); + ready + } + other => { + self.remaining = self.budget; + other + } + }, + } + } +} + +impl RecordBatchStream for YieldStream { + /// Returns the schema of the inner stream. + fn schema(&self) -> Arc { + self.inner.schema() + } +} diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 108c844f20b4..d4f00cd420bf 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -257,6 +257,7 @@ datafusion.execution.parquet.statistics_truncate_length NULL datafusion.execution.parquet.write_batch_size 1024 datafusion.execution.parquet.writer_version 1.0 datafusion.execution.planning_concurrency 13 +datafusion.execution.poll_budget 128 datafusion.execution.skip_partial_aggregation_probe_ratio_threshold 0.8 datafusion.execution.skip_partial_aggregation_probe_rows_threshold 100000 datafusion.execution.skip_physical_aggregate_schema_check false @@ -367,6 +368,7 @@ datafusion.execution.parquet.statistics_truncate_length NULL (writing) Sets stat datafusion.execution.parquet.write_batch_size 1024 (writing) Sets write_batch_size in bytes datafusion.execution.parquet.writer_version 1.0 (writing) Sets parquet writer version valid values are "1.0" and "2.0" datafusion.execution.planning_concurrency 13 Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system +datafusion.execution.poll_budget 128 The maximum number of times pipeline breaking operators may poll their children in a loop before yielding. Setting this to a value greater than zero ensures running queries can be cancelled in a timely fashion. datafusion.execution.skip_partial_aggregation_probe_ratio_threshold 0.8 Aggregation ratio (number of distinct groups / number of input rows) threshold for skipping partial aggregation. If the value is greater then partial aggregation will skip aggregation for further input datafusion.execution.skip_partial_aggregation_probe_rows_threshold 100000 Number of input rows partial aggregation partition should process, before aggregation ratio check and trying to switch to skipping aggregation mode datafusion.execution.skip_physical_aggregate_schema_check false When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step. 
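Reviewer note (not part of the patch): since the option is surfaced through `information_schema.df_settings`, as the slt expectations above show, its effective value can also be inspected from a running session. A minimal sketch, assuming the usual `with_information_schema(true)` setup:

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Enable the information_schema tables so df_settings is queryable
    let config = SessionConfig::new().with_information_schema(true);
    let ctx = SessionContext::new_with_config(config);

    // With the defaults this prints `datafusion.execution.poll_budget 128`
    ctx.sql(
        "SELECT name, value FROM information_schema.df_settings \
         WHERE name = 'datafusion.execution.poll_budget'",
    )
    .await?
    .show()
    .await?;
    Ok(())
}
```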
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 6d65f54e228d..e6110f53e4ca 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -98,6 +98,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.execution.use_row_number_estimates_to_optimize_partitioning | false | Should DataFusion use row number estimates at the input to decide whether increasing parallelism is beneficial or not. By default, only exact row numbers (not estimates) are used for this decision. Setting this flag to `true` will likely produce better plans. if the source of statistics is accurate. We plan to make this the default in the future. | | datafusion.execution.enforce_batch_size_in_joins | false | Should DataFusion enforce batch size in joins or not. By default, DataFusion will not enforce batch size in joins. Enforcing batch size in joins can reduce memory usage when joining large tables with a highly-selective join filter, but is also slightly slower. | | datafusion.execution.objectstore_writer_buffer_size | 10485760 | Size (bytes) of data buffer DataFusion uses when writing output files. This affects the size of the data chunks that are uploaded to remote object stores (e.g. AWS S3). If very large (>= 100 GiB) output files are being written, it may be necessary to increase this size to avoid errors from the remote end point. | +| datafusion.execution.poll_budget | 128 | The maximum number of times pipeline breaking operators may poll their children in a loop before yielding. Setting this to a value greater than zero ensures running queries can be cancelled in a timely fashion. | | datafusion.optimizer.enable_distinct_aggregation_soft_limit | true | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. | | datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores | | datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible |
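For callers that want to tune or disable the budget programmatically, here is a minimal sketch following the accessors used (commented out) in the new tests; the value `32` is purely illustrative:

```rust
use datafusion::prelude::SessionContext;
use datafusion_execution::config::SessionConfig;

fn main() {
    let mut config = SessionConfig::new();
    // Yield after at most 32 consecutive child polls instead of the default 128
    config.options_mut().execution.poll_budget = Some(32);
    // Or opt out of cooperative yielding entirely (pre-patch behavior):
    // config.options_mut().execution.poll_budget = None;
    let _ctx = SessionContext::new_with_config(config);
}
```

Setting the option to `None` appears to be what the commented-out lines in `yielding.rs` toggle in order to reproduce the original non-yielding behavior.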