From 16dd2c8a3b972752a6fc6c2da1b6d938018d3aeb Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Wed, 6 Nov 2024 18:16:04 -0500 Subject: [PATCH 1/2] move coalesce batches --- datafusion/core/src/physical_optimizer/mod.rs | 2 +- .../src}/coalesce_batches.rs | 23 ++++++++++--------- datafusion/physical-optimizer/src/lib.rs | 1 + 3 files changed, 14 insertions(+), 12 deletions(-) rename datafusion/{core/src/physical_optimizer => physical-optimizer/src}/coalesce_batches.rs (85%) diff --git a/datafusion/core/src/physical_optimizer/mod.rs b/datafusion/core/src/physical_optimizer/mod.rs index efdd3148d03f..af2c8d9cbdf1 100644 --- a/datafusion/core/src/physical_optimizer/mod.rs +++ b/datafusion/core/src/physical_optimizer/mod.rs @@ -21,7 +21,7 @@ //! "Repartition" or "Sortedness" //! //! [`ExecutionPlan`]: crate::physical_plan::ExecutionPlan -pub mod coalesce_batches; + pub mod enforce_distribution; pub mod enforce_sorting; pub mod join_selection; diff --git a/datafusion/core/src/physical_optimizer/coalesce_batches.rs b/datafusion/physical-optimizer/src/coalesce_batches.rs similarity index 85% rename from datafusion/core/src/physical_optimizer/coalesce_batches.rs rename to datafusion/physical-optimizer/src/coalesce_batches.rs index 2f834813ede9..8ed44ba578ca 100644 --- a/datafusion/core/src/physical_optimizer/coalesce_batches.rs +++ b/datafusion/physical-optimizer/src/coalesce_batches.rs @@ -18,19 +18,20 @@ //! CoalesceBatches optimizer that groups batches together rows //! in bigger batches to avoid overhead with small batches +use crate::PhysicalOptimizerRule; + use std::sync::Arc; -use crate::{ - config::ConfigOptions, - error::Result, - physical_plan::{ - coalesce_batches::CoalesceBatchesExec, filter::FilterExec, joins::HashJoinExec, - repartition::RepartitionExec, Partitioning, - }, -}; +use datafusion_common::config::ConfigOptions; +use datafusion_common::error::Result; +use datafusion_physical_expr::Partitioning; +use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec; +use datafusion_physical_plan::filter::FilterExec; +use datafusion_physical_plan::joins::HashJoinExec; +use datafusion_physical_plan::repartition::RepartitionExec; +use datafusion_physical_plan::ExecutionPlan; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; -use datafusion_physical_optimizer::PhysicalOptimizerRule; /// Optimizer rule that introduces CoalesceBatchesExec to avoid overhead with small batches that /// are produced by highly selective filters @@ -46,9 +47,9 @@ impl CoalesceBatches { impl PhysicalOptimizerRule for CoalesceBatches { fn optimize( &self, - plan: Arc, + plan: Arc, config: &ConfigOptions, - ) -> Result> { + ) -> Result> { if !config.execution.coalesce_batches { return Ok(plan); } diff --git a/datafusion/physical-optimizer/src/lib.rs b/datafusion/physical-optimizer/src/lib.rs index 439f1dc873d1..22f06f3d29fc 100644 --- a/datafusion/physical-optimizer/src/lib.rs +++ b/datafusion/physical-optimizer/src/lib.rs @@ -18,6 +18,7 @@ #![deny(clippy::clone_on_ref_ptr)] pub mod aggregate_statistics; +pub mod coalesce_batches; pub mod combine_partial_final_agg; pub mod limit_pushdown; pub mod limited_distinct_aggregation; From d06cce8d810afde1d255a9bf8a314cbd36d8fc24 Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Thu, 7 Nov 2024 13:15:32 -0500 Subject: [PATCH 2/2] small fix --- datafusion/physical-optimizer/src/coalesce_batches.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/datafusion/physical-optimizer/src/coalesce_batches.rs b/datafusion/physical-optimizer/src/coalesce_batches.rs index 8ed44ba578ca..5cf2c877c61a 100644 --- a/datafusion/physical-optimizer/src/coalesce_batches.rs +++ b/datafusion/physical-optimizer/src/coalesce_batches.rs @@ -25,11 +25,10 @@ use std::sync::Arc; use datafusion_common::config::ConfigOptions; use datafusion_common::error::Result; use datafusion_physical_expr::Partitioning; -use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec; -use datafusion_physical_plan::filter::FilterExec; -use datafusion_physical_plan::joins::HashJoinExec; -use datafusion_physical_plan::repartition::RepartitionExec; -use datafusion_physical_plan::ExecutionPlan; +use datafusion_physical_plan::{ + coalesce_batches::CoalesceBatchesExec, filter::FilterExec, joins::HashJoinExec, + repartition::RepartitionExec, ExecutionPlan, +}; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};