diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 72d51cb15a88..68b9ec9dab94 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -296,9 +296,7 @@ config_namespace! {
         pub listing_table_ignore_subdirectory: bool, default = true

         /// Should DataFusion support recursive CTEs
-        /// Defaults to false since this feature is a work in progress and may not
-        /// behave as expected
-        pub enable_recursive_ctes: bool, default = false
+        pub enable_recursive_ctes: bool, default = true
     }
 }

diff --git a/datafusion/core/tests/memory_limit.rs b/datafusion/core/tests/memory_limit.rs
index 4735a97fee0c..ebc2456224da 100644
--- a/datafusion/core/tests/memory_limit.rs
+++ b/datafusion/core/tests/memory_limit.rs
@@ -301,6 +301,28 @@ async fn sort_spill_reservation() {
     test.with_config(config).with_expected_success().run().await;
 }

+#[tokio::test]
+async fn oom_recursive_cte() {
+    TestCase::new()
+        .with_query(
+            "WITH RECURSIVE nodes AS (
+            SELECT 1 as id
+            UNION ALL
+            SELECT UNNEST(RANGE(id+1, id+1000)) as id
+            FROM nodes
+            WHERE id < 10
+        )
+        SELECT * FROM nodes;",
+        )
+        .with_expected_errors(vec![
+            "Resources exhausted: Failed to allocate additional",
+            "RecursiveQuery",
+        ])
+        .with_memory_limit(2_000)
+        .run()
+        .await
+}
+
 /// Run the query with the specified memory limit,
 /// and verifies the expected errors are returned
 #[derive(Clone, Debug)]
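Note: `oom_recursive_cte` exercises the reservation machinery added in the hunks below through the test harness. The same failure mode can be reproduced against the public API with a memory-limited runtime; a minimal sketch, not part of the patch (the pool size and query are taken from the test, and `GreedyMemoryPool` is one concrete pool choice):

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::memory_pool::GreedyMemoryPool;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // Cap the memory pool at 2 kB, mirroring with_memory_limit(2_000).
    let runtime = RuntimeEnv::new(
        RuntimeConfig::new().with_memory_pool(Arc::new(GreedyMemoryPool::new(2_000))),
    )?;
    let ctx = SessionContext::new_with_config_rt(SessionConfig::new(), Arc::new(runtime));

    let result = ctx
        .sql(
            "WITH RECURSIVE nodes AS (
                SELECT 1 as id
                UNION ALL
                SELECT UNNEST(RANGE(id+1, id+1000)) as id
                FROM nodes
                WHERE id < 10
            )
            SELECT * FROM nodes;",
        )
        .await?
        .collect()
        .await;

    // The recursive buffer cannot fit in the pool, so the query fails with
    // a "Resources exhausted" error instead of taking down the process.
    assert!(result.unwrap_err().to_string().contains("Resources exhausted"));
    Ok(())
}
```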
diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs
index ae3692ce1b03..795ec3c7315e 100644
--- a/datafusion/physical-plan/src/memory.rs
+++ b/datafusion/physical-plan/src/memory.rs
@@ -31,6 +31,7 @@ use super::{
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
 use datafusion_common::{internal_err, project_schema, Result};
+use datafusion_execution::memory_pool::MemoryReservation;
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};

@@ -236,6 +237,8 @@ impl MemoryExec {
 pub struct MemoryStream {
     /// Vector of record batches
     data: Vec<RecordBatch>,
+    /// Optional memory reservation bound to the data, freed on drop
+    reservation: Option<MemoryReservation>,
     /// Schema representing the data
     schema: SchemaRef,
     /// Optional projection for which columns to load
@@ -253,11 +256,18 @@ impl MemoryStream {
     ) -> Result<Self> {
         Ok(Self {
             data,
+            reservation: None,
             schema,
             projection,
             index: 0,
         })
     }
+
+    /// Set the memory reservation for the data
+    pub(super) fn with_reservation(mut self, reservation: MemoryReservation) -> Self {
+        self.reservation = Some(reservation);
+        self
+    }
 }

 impl Stream for MemoryStream {
diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs
index 2e4b97bc224b..68abc9653a8b 100644
--- a/datafusion/physical-plan/src/recursive_query.rs
+++ b/datafusion/physical-plan/src/recursive_query.rs
@@ -23,7 +23,7 @@ use std::task::{Context, Poll};

 use super::{
     metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
-    work_table::{WorkTable, WorkTableExec},
+    work_table::{ReservedBatches, WorkTable, WorkTableExec},
     PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics,
 };
 use crate::{DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan};
@@ -32,6 +32,7 @@ use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_common::{not_impl_err, DataFusionError, Result};
+use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::{EquivalenceProperties, Partitioning};

@@ -236,6 +237,8 @@ struct RecursiveQueryStream {
     /// In-memory buffer for storing a copy of the current results. Will be
     /// cleared after each iteration.
     buffer: Vec<RecordBatch>,
+    /// Tracks the memory used by the buffer
+    reservation: MemoryReservation,
     // /// Metrics.
     _baseline_metrics: BaselineMetrics,
 }
@@ -250,6 +253,8 @@ impl RecursiveQueryStream {
         baseline_metrics: BaselineMetrics,
     ) -> Self {
         let schema = static_stream.schema();
+        let reservation =
+            MemoryConsumer::new("RecursiveQuery").register(task_context.memory_pool());
         Self {
             task_context,
             work_table,
@@ -258,6 +263,7 @@ impl RecursiveQueryStream {
             recursive_stream: None,
             schema,
             buffer: vec![],
+            reservation,
             _baseline_metrics: baseline_metrics,
         }
     }
@@ -268,6 +274,10 @@ impl RecursiveQueryStream {
         mut self: std::pin::Pin<&mut Self>,
         batch: RecordBatch,
     ) -> Poll<Option<Result<RecordBatch>>> {
+        if let Err(e) = self.reservation.try_grow(batch.get_array_memory_size()) {
+            return Poll::Ready(Some(Err(e)));
+        }
+
         self.buffer.push(batch.clone());
         Poll::Ready(Some(Ok(batch)))
     }
@@ -289,8 +299,11 @@ impl RecursiveQueryStream {
         }

         // Update the work table with the current buffer
-        let batches = self.buffer.drain(..).collect();
-        self.work_table.write(batches);
+        let reserved_batches = ReservedBatches::new(
+            std::mem::take(&mut self.buffer),
+            self.reservation.take(),
+        );
+        self.work_table.update(reserved_batches);

         // We always execute (and re-execute iteratively) the first partition.
         // Downstream plans should not expect any partitioning.
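The `poll_next_inner` hunk above hands the buffered bytes to the work table via `MemoryReservation::take`, which moves all reserved bytes into a fresh reservation and leaves the original empty but still registered, so the stream can keep accounting for the next iteration while the work table owns the current one. A standalone sketch of those semantics (consumer name borrowed from the code above):

```rust
use std::sync::Arc;

use datafusion_common::Result;
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, UnboundedMemoryPool};

fn main() -> Result<()> {
    let pool: Arc<dyn MemoryPool> = Arc::new(UnboundedMemoryPool::default());
    let mut reservation = MemoryConsumer::new("RecursiveQuery").register(&pool);
    reservation.try_grow(100)?;

    // take() transfers the 100 bytes to a new reservation; the original is
    // emptied but can grow again for the next iteration's buffer.
    let handed_off = reservation.take();
    assert_eq!(reservation.size(), 0);
    assert_eq!(handed_off.size(), 100);

    // The pool keeps accounting for the bytes until the new owner is dropped.
    assert_eq!(pool.reserved(), 100);
    drop(handed_off);
    assert_eq!(pool.reserved(), 0);
    Ok(())
}
```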
diff --git a/datafusion/physical-plan/src/work_table.rs b/datafusion/physical-plan/src/work_table.rs
index 0e138b8f74a7..f6fc0334dfc5 100644
--- a/datafusion/physical-plan/src/work_table.rs
+++ b/datafusion/physical-plan/src/work_table.rs
@@ -29,16 +29,34 @@ use crate::{DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanProp

 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
-use datafusion_common::{internal_err, Result};
+use datafusion_common::{internal_datafusion_err, internal_err, Result};
+use datafusion_execution::memory_pool::MemoryReservation;
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::{EquivalenceProperties, Partitioning};

+/// A vector of record batches with a memory reservation.
+#[derive(Debug)]
+pub(super) struct ReservedBatches {
+    batches: Vec<RecordBatch>,
+    #[allow(dead_code)]
+    reservation: MemoryReservation,
+}
+
+impl ReservedBatches {
+    pub(super) fn new(batches: Vec<RecordBatch>, reservation: MemoryReservation) -> Self {
+        ReservedBatches {
+            batches,
+            reservation,
+        }
+    }
+}
+
 /// The name is from PostgreSQL's terminology.
 /// See <https://wiki.postgresql.org/wiki/CTEReadme#How_Recursion_Works>
 /// This table serves as a mirror or buffer between each iteration of a recursive query.
 #[derive(Debug)]
 pub(super) struct WorkTable {
-    batches: Mutex<Option<Vec<RecordBatch>>>,
+    batches: Mutex<Option<ReservedBatches>>,
 }

 impl WorkTable {
@@ -51,14 +69,17 @@ impl WorkTable {

     /// Take the previously written batches from the work table.
     /// This will be called by the [`WorkTableExec`] when it is executed.
-    fn take(&self) -> Vec<RecordBatch> {
-        let batches = self.batches.lock().unwrap().take().unwrap_or_default();
-        batches
+    fn take(&self) -> Result<ReservedBatches> {
+        self.batches
+            .lock()
+            .unwrap()
+            .take()
+            .ok_or_else(|| internal_datafusion_err!("Unexpected empty work table"))
     }

-    /// Write the results of a recursive query iteration to the work table.
-    pub(super) fn write(&self, input: Vec<RecordBatch>) {
-        self.batches.lock().unwrap().replace(input);
+    /// Update the work table with the results of a recursive query iteration.
+    pub(super) fn update(&self, batches: ReservedBatches) {
+        self.batches.lock().unwrap().replace(batches);
     }
 }

@@ -175,13 +196,11 @@ impl ExecutionPlan for WorkTableExec {
                 "WorkTableExec got an invalid partition {partition} (expected 0)"
             );
         }
-
-        let batches = self.work_table.take();
-        Ok(Box::pin(MemoryStream::try_new(
-            batches,
-            self.schema.clone(),
-            None,
-        )?))
+        let batch = self.work_table.take()?;
+        Ok(Box::pin(
+            MemoryStream::try_new(batch.batches, self.schema.clone(), None)?
+                .with_reservation(batch.reservation),
+        ))
     }

     fn metrics(&self) -> Option<MetricsSet> {
@@ -194,4 +213,41 @@ impl ExecutionPlan for WorkTableExec {
 }

 #[cfg(test)]
-mod tests {}
+mod tests {
+    use super::*;
+    use arrow_array::{ArrayRef, Int32Array, RecordBatch};
+    use datafusion_execution::memory_pool::{MemoryConsumer, UnboundedMemoryPool};
+    use std::sync::Arc;
+
+    #[test]
+    fn test_work_table() {
+        let work_table = WorkTable::new();
+        // can't take from an empty work_table
+        assert!(work_table.take().is_err());
+
+        let pool = Arc::new(UnboundedMemoryPool::default()) as _;
+        let mut reservation = MemoryConsumer::new("test_work_table").register(&pool);
+
+        // write a batch to the work_table
+        let array: ArrayRef = Arc::new((0..5).collect::<Int32Array>());
+        let batch = RecordBatch::try_from_iter(vec![("col", array)]).unwrap();
+        reservation.try_grow(100).unwrap();
+        work_table.update(ReservedBatches::new(vec![batch.clone()], reservation));
+        // take from the work_table
+        let reserved_batches = work_table.take().unwrap();
+        assert_eq!(reserved_batches.batches, vec![batch.clone()]);
+
+        // consume the batches with a MemoryStream
+        let memory_stream =
+            MemoryStream::try_new(reserved_batches.batches, batch.schema(), None)
+                .unwrap()
+                .with_reservation(reserved_batches.reservation);
+
+        // the memory should still be reserved
+        assert_eq!(pool.reserved(), 100);
+
+        // the reservation is freed after dropping the memory_stream
+        drop(memory_stream);
+        assert_eq!(pool.reserved(), 0);
+    }
+}
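The unit test above uses an `UnboundedMemoryPool`, where `try_grow` never fails. Under a bounded pool the same call is what turns buffer growth into the "Resources exhausted" error surfaced by `push_batch`; a sketch against the public memory-pool API (sizes chosen for illustration):

```rust
use std::sync::Arc;

use datafusion_execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};

fn main() {
    // A pool with a 2 kB hard limit, like the one in the OOM test.
    let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(2_000));
    let mut reservation = MemoryConsumer::new("RecursiveQuery").register(&pool);

    // Within budget: succeeds and is tracked by the pool.
    reservation.try_grow(1_500).unwrap();
    assert_eq!(pool.reserved(), 1_500);

    // Past the budget: fails with the error the new test asserts on.
    let err = reservation.try_grow(1_000).unwrap_err();
    assert!(err.to_string().contains("Resources exhausted"));

    // Dropping the reservation frees everything, which is exactly what
    // happens when the MemoryStream holding a ReservedBatches reservation
    // goes away.
    drop(reservation);
    assert_eq!(pool.reserved(), 0);
}
```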
diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs
index a6ea22db9651..92302ae00518 100644
--- a/datafusion/sql/tests/sql_integration.rs
+++ b/datafusion/sql/tests/sql_integration.rs
@@ -1387,15 +1387,21 @@ fn recursive_ctes() {
         select n + 1 FROM numbers WHERE N < 10
     )
     select * from numbers;";
-    let err = logical_plan(sql).expect_err("query should have failed");
-    assert_eq!(
-        "This feature is not implemented: Recursive CTEs are not enabled",
-        err.strip_backtrace()
-    );
+    quick_test(
+        sql,
+        "Projection: numbers.n\
+        \n  SubqueryAlias: numbers\
+        \n    RecursiveQuery: is_distinct=false\
+        \n      Projection: Int64(1) AS n\
+        \n        EmptyRelation\
+        \n      Projection: numbers.n + Int64(1)\
+        \n        Filter: numbers.n < Int64(10)\
+        \n          TableScan: numbers",
+    )
 }

 #[test]
-fn recursive_ctes_enabled() {
+fn recursive_ctes_disabled() {
     let sql = "
     WITH RECURSIVE numbers AS (
         select 1 as n
         union all
         select n + 1 from numbers where n < 10
     )
     select * from numbers;";

-    // manually setting up test here so that we can enable recursive ctes
+    // manually setting up test here so that we can disable recursive ctes
     let mut context = MockContextProvider::default();
-    context.options_mut().execution.enable_recursive_ctes = true;
+    context.options_mut().execution.enable_recursive_ctes = false;
     let planner = SqlToRel::new_with_options(&context, ParserOptions::default());

     let result = DFParser::parse_sql_with_dialect(sql, &GenericDialect {});
     let mut ast = result.unwrap();

-    let plan = planner
+    let err = planner
         .statement_to_plan(ast.pop_front().unwrap())
-        .expect("recursive cte plan creation failed");
-
+        .expect_err("query should have failed");
     assert_eq!(
-        format!("{plan:?}"),
-        "Projection: numbers.n\
-        \n  SubqueryAlias: numbers\
-        \n    RecursiveQuery: is_distinct=false\
-        \n      Projection: Int64(1) AS n\
-        \n        EmptyRelation\
-        \n      Projection: numbers.n + Int64(1)\
-        \n        Filter: numbers.n < Int64(10)\
-        \n          TableScan: numbers"
+        "This feature is not implemented: Recursive CTEs are not enabled",
+        err.strip_backtrace()
     );
 }
diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt
index 8d7f7d96e42a..456ddef650ba 100644
--- a/datafusion/sqllogictest/test_files/information_schema.slt
+++ b/datafusion/sqllogictest/test_files/information_schema.slt
@@ -166,7 +166,7 @@ datafusion.execution.aggregate.scalar_update_factor 10
 datafusion.execution.batch_size 8192
 datafusion.execution.coalesce_batches true
 datafusion.execution.collect_statistics false
-datafusion.execution.enable_recursive_ctes false
+datafusion.execution.enable_recursive_ctes true
 datafusion.execution.listing_table_ignore_subdirectory true
 datafusion.execution.max_buffered_batches_per_output_file 2
 datafusion.execution.meta_fetch_concurrency 32
@@ -244,7 +244,7 @@ datafusion.execution.aggregate.scalar_update_factor 10 Specifies the threshold f
 datafusion.execution.batch_size 8192 Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would result in too much metadata memory consumption
 datafusion.execution.coalesce_batches true When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting
 datafusion.execution.collect_statistics false Should DataFusion collect statistics after listing files
-datafusion.execution.enable_recursive_ctes false Should DataFusion support recursive CTEs Defaults to false since this feature is a work in progress and may not behave as expected
+datafusion.execution.enable_recursive_ctes true Should DataFusion support recursive CTEs
 datafusion.execution.listing_table_ignore_subdirectory true Should sub directories be ignored when scanning directories for data files. Defaults to true (ignores subdirectories), consistent with Hive. Note that this setting does not affect reading partitioned tables (e.g. `/table/year=2021/month=01/data.parquet`).
 datafusion.execution.max_buffered_batches_per_output_file 2 This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption
 datafusion.execution.meta_fetch_concurrency 32 Number of files to read in parallel when inferring schema and statistics
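Since the default flipped, the planner test now exercises the opt-out path instead, and the sqllogictest expectations change from `false` to `true`. The same opt-out is available per session; a sketch using a SQL `SET` statement (query and error text taken from the tests above):

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    ctx.sql("SET datafusion.execution.enable_recursive_ctes = false")
        .await?;

    // With the flag off, planning a recursive CTE fails up front.
    let err = ctx
        .sql(
            "WITH RECURSIVE numbers AS (
                 select 1 as n
                 union all
                 select n + 1 from numbers where n < 10
             )
             select * from numbers;",
        )
        .await
        .unwrap_err();
    assert!(err.to_string().contains("Recursive CTEs are not enabled"));
    Ok(())
}
```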
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 4b0fee063737..492be93caf0c 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -83,7 +83,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
 | datafusion.execution.soft_max_rows_per_output_file | 50000000 | Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max |
 | datafusion.execution.max_buffered_batches_per_output_file | 2 | This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption |
 | datafusion.execution.listing_table_ignore_subdirectory | true | Should sub directories be ignored when scanning directories for data files. Defaults to true (ignores subdirectories), consistent with Hive. Note that this setting does not affect reading partitioned tables (e.g. `/table/year=2021/month=01/data.parquet`). |
-| datafusion.execution.enable_recursive_ctes | false | Should DataFusion support recursive CTEs Defaults to false since this feature is a work in progress and may not behave as expected |
+| datafusion.execution.enable_recursive_ctes | true | Should DataFusion support recursive CTEs |
 | datafusion.optimizer.enable_distinct_aggregation_soft_limit | true | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. |
 | datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores |
 | datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible |
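And with the flag on by default, a recursive CTE runs with no configuration at all; a minimal end-to-end sketch (query borrowed from the planner test above):

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // No configuration needed: enable_recursive_ctes now defaults to true.
    let ctx = SessionContext::new();
    let df = ctx
        .sql(
            "WITH RECURSIVE numbers AS (
                 select 1 as n
                 union all
                 select n + 1 from numbers where n < 10
             )
             select * from numbers;",
        )
        .await?;
    df.show().await?; // prints n = 1 through 10
    Ok(())
}
```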