Skip to content

Commit 7c07948

Browse files
Complete moving PhysicalOptimizer into datafusion-physical-optimizer (#14300)
* consolidate tests and utils
* remove core/po
* fix docs
* Update mod.rs
* Update lib.rs
1 parent 4dc97b2 commit 7c07948

File tree

20 files changed

+1674
-1695
lines changed

20 files changed

+1674
-1695
lines changed

datafusion-examples/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -66,6 +66,7 @@ datafusion-expr = { workspace = true }
6666
datafusion-functions-window-common = { workspace = true }
6767
datafusion-optimizer = { workspace = true, default-features = true }
6868
datafusion-physical-expr = { workspace = true, default-features = true }
69+
datafusion-physical-optimizer = { workspace = true, default-features = true }
6970
datafusion-proto = { workspace = true }
7071
datafusion-sql = { workspace = true }
7172
env_logger = { workspace = true }

datafusion-examples/examples/advanced_parquet_index.rs

Lines changed: 12 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -15,10 +15,16 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use std::any::Any;
19+
use std::collections::{HashMap, HashSet};
20+
use std::fs::File;
21+
use std::ops::Range;
22+
use std::path::{Path, PathBuf};
23+
use std::sync::atomic::{AtomicBool, Ordering};
24+
use std::sync::Arc;
25+
1826
use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
1927
use arrow_schema::SchemaRef;
20-
use async_trait::async_trait;
21-
use bytes::Bytes;
2228
use datafusion::catalog::Session;
2329
use datafusion::datasource::listing::PartitionedFile;
2430
use datafusion::datasource::physical_plan::parquet::{
@@ -38,7 +44,6 @@ use datafusion::parquet::file::metadata::ParquetMetaData;
3844
use datafusion::parquet::file::properties::{EnabledStatistics, WriterProperties};
3945
use datafusion::parquet::schema::types::ColumnPath;
4046
use datafusion::physical_expr::PhysicalExpr;
41-
use datafusion::physical_optimizer::pruning::PruningPredicate;
4247
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
4348
use datafusion::physical_plan::ExecutionPlan;
4449
use datafusion::prelude::*;
@@ -48,16 +53,13 @@ use datafusion_common::{
4853
use datafusion_expr::utils::conjunction;
4954
use datafusion_expr::{TableProviderFilterPushDown, TableType};
5055
use datafusion_physical_expr::utils::{Guarantee, LiteralGuarantee};
56+
use datafusion_physical_optimizer::pruning::PruningPredicate;
57+
58+
use async_trait::async_trait;
59+
use bytes::Bytes;
5160
use futures::future::BoxFuture;
5261
use futures::FutureExt;
5362
use object_store::ObjectStore;
54-
use std::any::Any;
55-
use std::collections::{HashMap, HashSet};
56-
use std::fs::File;
57-
use std::ops::Range;
58-
use std::path::{Path, PathBuf};
59-
use std::sync::atomic::{AtomicBool, Ordering};
60-
use std::sync::Arc;
6163
use tempfile::TempDir;
6264
use url::Url;
6365

datafusion-examples/examples/parquet_index.rs

Lines changed: 14 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -15,14 +15,24 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use std::any::Any;
19+
use std::collections::HashSet;
20+
use std::fmt::Display;
21+
use std::fs::{self, DirEntry, File};
22+
use std::ops::Range;
23+
use std::path::{Path, PathBuf};
24+
use std::sync::{
25+
atomic::{AtomicUsize, Ordering},
26+
Arc,
27+
};
28+
1829
use arrow::array::{
1930
Array, ArrayRef, AsArray, BooleanArray, Int32Array, RecordBatch, StringArray,
2031
UInt64Array,
2132
};
2233
use arrow::datatypes::Int32Type;
2334
use arrow::util::pretty::pretty_format_batches;
2435
use arrow_schema::SchemaRef;
25-
use async_trait::async_trait;
2636
use datafusion::catalog::Session;
2737
use datafusion::datasource::listing::PartitionedFile;
2838
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
@@ -32,24 +42,16 @@ use datafusion::parquet::arrow::arrow_reader::statistics::StatisticsConverter;
3242
use datafusion::parquet::arrow::{
3343
arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter,
3444
};
35-
use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
3645
use datafusion::physical_plan::ExecutionPlan;
3746
use datafusion::prelude::*;
3847
use datafusion_common::{
3948
internal_datafusion_err, DFSchema, DataFusionError, Result, ScalarValue,
4049
};
4150
use datafusion_expr::{utils::conjunction, TableProviderFilterPushDown, TableType};
4251
use datafusion_physical_expr::PhysicalExpr;
43-
use std::any::Any;
44-
use std::collections::HashSet;
45-
use std::fmt::Display;
46-
use std::fs::{self, DirEntry, File};
47-
use std::ops::Range;
48-
use std::path::{Path, PathBuf};
49-
use std::sync::{
50-
atomic::{AtomicUsize, Ordering},
51-
Arc,
52-
};
52+
use datafusion_physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
53+
54+
use async_trait::async_trait;
5355
use tempfile::TempDir;
5456
use url::Url;
5557

datafusion-examples/examples/pruning.rs

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -15,15 +15,16 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use std::collections::HashSet;
19+
use std::sync::Arc;
20+
1821
use arrow::array::{ArrayRef, BooleanArray, Int32Array};
1922
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
2023
use datafusion::common::{DFSchema, ScalarValue};
2124
use datafusion::execution::context::ExecutionProps;
2225
use datafusion::physical_expr::create_physical_expr;
23-
use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
2426
use datafusion::prelude::*;
25-
use std::collections::HashSet;
26-
use std::sync::Arc;
27+
use datafusion_physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
2728

2829
/// This example shows how to use DataFusion's `PruningPredicate` to prove
2930
/// filter expressions can never be true based on statistics such as min/max

datafusion/core/src/datasource/physical_plan/parquet/mod.rs

Lines changed: 17 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,15 @@
1717

1818
//! [`ParquetExec`] Execution plan for reading Parquet files
1919
20+
mod access_plan;
21+
mod metrics;
22+
mod opener;
23+
mod page_filter;
24+
mod reader;
25+
mod row_filter;
26+
mod row_group_filter;
27+
mod writer;
28+
2029
use std::any::Any;
2130
use std::fmt::Debug;
2231
use std::sync::Arc;
@@ -27,45 +36,35 @@ use crate::datasource::physical_plan::{
2736
parquet::page_filter::PagePruningAccessPlanFilter, DisplayAs, FileGroupPartitioner,
2837
FileScanConfig,
2938
};
39+
use crate::datasource::schema_adapter::{
40+
DefaultSchemaAdapterFactory, SchemaAdapterFactory,
41+
};
3042
use crate::{
3143
config::{ConfigOptions, TableParquetOptions},
3244
error::Result,
3345
execution::context::TaskContext,
34-
physical_optimizer::pruning::PruningPredicate,
3546
physical_plan::{
3647
metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
3748
DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
3849
SendableRecordBatchStream, Statistics,
3950
},
4051
};
4152

53+
pub use access_plan::{ParquetAccessPlan, RowGroupAccess};
4254
use arrow::datatypes::SchemaRef;
4355
use datafusion_common::Constraints;
4456
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalExpr};
57+
use datafusion_physical_optimizer::pruning::PruningPredicate;
4558
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
46-
47-
use itertools::Itertools;
48-
use log::debug;
49-
50-
mod access_plan;
51-
mod metrics;
52-
mod opener;
53-
mod page_filter;
54-
mod reader;
55-
mod row_filter;
56-
mod row_group_filter;
57-
mod writer;
58-
59-
use crate::datasource::schema_adapter::{
60-
DefaultSchemaAdapterFactory, SchemaAdapterFactory,
61-
};
62-
pub use access_plan::{ParquetAccessPlan, RowGroupAccess};
6359
pub use metrics::ParquetFileMetrics;
6460
use opener::ParquetOpener;
6561
pub use reader::{DefaultParquetFileReaderFactory, ParquetFileReaderFactory};
6662
pub use row_filter::can_expr_be_pushed_down_with_schemas;
6763
pub use writer::plan_to_parquet;
6864

65+
use itertools::Itertools;
66+
use log::debug;
67+
6968
/// Execution plan for reading one or more Parquet files.
7069
///
7170
/// ```text

datafusion/core/src/datasource/physical_plan/parquet/opener.rs

Lines changed: 5 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,8 @@
1717

1818
//! [`ParquetOpener`] for opening Parquet files
1919
20+
use std::sync::Arc;
21+
2022
use crate::datasource::file_format::{
2123
coerce_file_schema_to_string_type, coerce_file_schema_to_view_type,
2224
};
@@ -29,17 +31,18 @@ use crate::datasource::physical_plan::{
2931
FileMeta, FileOpenFuture, FileOpener, ParquetFileMetrics, ParquetFileReaderFactory,
3032
};
3133
use crate::datasource::schema_adapter::SchemaAdapterFactory;
32-
use crate::physical_optimizer::pruning::PruningPredicate;
34+
3335
use arrow_schema::{ArrowError, SchemaRef};
3436
use datafusion_common::{exec_err, Result};
3537
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
38+
use datafusion_physical_optimizer::pruning::PruningPredicate;
3639
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
40+
3741
use futures::{StreamExt, TryStreamExt};
3842
use log::debug;
3943
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
4044
use parquet::arrow::async_reader::AsyncFileReader;
4145
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
42-
use std::sync::Arc;
4346

4447
/// Implements [`FileOpener`] for a parquet file
4548
pub(super) struct ParquetOpener {

datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs

Lines changed: 6 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -17,14 +17,19 @@
1717

1818
//! Contains code to filter entire pages
1919
20+
use std::collections::HashSet;
21+
use std::sync::Arc;
22+
2023
use super::metrics::ParquetFileMetrics;
2124
use crate::datasource::physical_plan::parquet::ParquetAccessPlan;
22-
use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
25+
2326
use arrow::array::BooleanArray;
2427
use arrow::{array::ArrayRef, datatypes::SchemaRef};
2528
use arrow_schema::Schema;
2629
use datafusion_common::ScalarValue;
2730
use datafusion_physical_expr::{split_conjunction, PhysicalExpr};
31+
use datafusion_physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
32+
2833
use log::{debug, trace};
2934
use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
3035
use parquet::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex};
@@ -34,8 +39,6 @@ use parquet::{
3439
arrow::arrow_reader::{RowSelection, RowSelector},
3540
file::metadata::{ParquetMetaData, RowGroupMetaData},
3641
};
37-
use std::collections::HashSet;
38-
use std::sync::Arc;
3942

4043
/// Filters a [`ParquetAccessPlan`] based on the [Parquet PageIndex], if present
4144
///

datafusion/core/src/datasource/physical_plan/parquet/row_group_filter.rs

Lines changed: 7 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -15,11 +15,17 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use std::collections::{HashMap, HashSet};
19+
use std::sync::Arc;
20+
21+
use super::{ParquetAccessPlan, ParquetFileMetrics};
1822
use crate::datasource::listing::FileRange;
19-
use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
23+
2024
use arrow::{array::ArrayRef, datatypes::Schema};
2125
use arrow_array::BooleanArray;
2226
use datafusion_common::{Column, Result, ScalarValue};
27+
use datafusion_physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
28+
2329
use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
2430
use parquet::arrow::parquet_column;
2531
use parquet::basic::Type;
@@ -30,10 +36,6 @@ use parquet::{
3036
bloom_filter::Sbbf,
3137
file::metadata::RowGroupMetaData,
3238
};
33-
use std::collections::{HashMap, HashSet};
34-
use std::sync::Arc;
35-
36-
use super::{ParquetAccessPlan, ParquetFileMetrics};
3739

3840
/// Reduces the [`ParquetAccessPlan`] based on row group level metadata.
3941
///

datafusion/core/src/execution/session_state.rs

Lines changed: 11 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,12 @@
1717

1818
//! [`SessionState`]: information required to run queries in a session
1919
20+
use std::any::Any;
21+
use std::collections::hash_map::Entry;
22+
use std::collections::{HashMap, HashSet};
23+
use std::fmt::Debug;
24+
use std::sync::Arc;
25+
2026
use crate::catalog::{CatalogProviderList, SchemaProvider, TableProviderFactory};
2127
use crate::catalog_common::information_schema::{
2228
InformationSchemaProvider, INFORMATION_SCHEMA,
@@ -27,11 +33,9 @@ use crate::datasource::file_format::{format_as_file_type, FileFormatFactory};
2733
use crate::datasource::provider_as_source;
2834
use crate::execution::context::{EmptySerializerRegistry, FunctionFactory, QueryPlanner};
2935
use crate::execution::SessionStateDefaults;
30-
use crate::physical_optimizer::optimizer::PhysicalOptimizer;
3136
use crate::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
37+
3238
use arrow_schema::{DataType, SchemaRef};
33-
use async_trait::async_trait;
34-
use chrono::{DateTime, Utc};
3539
use datafusion_catalog::{Session, TableFunction, TableFunctionImpl};
3640
use datafusion_common::alias::AliasGenerator;
3741
use datafusion_common::config::{ConfigExtension, ConfigOptions, TableOptions};
@@ -61,20 +65,19 @@ use datafusion_optimizer::{
6165
};
6266
use datafusion_physical_expr::create_physical_expr;
6367
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
68+
use datafusion_physical_optimizer::optimizer::PhysicalOptimizer;
6469
use datafusion_physical_optimizer::PhysicalOptimizerRule;
6570
use datafusion_physical_plan::ExecutionPlan;
6671
use datafusion_sql::parser::{DFParser, Statement};
6772
use datafusion_sql::planner::{ContextProvider, ParserOptions, PlannerContext, SqlToRel};
73+
74+
use async_trait::async_trait;
75+
use chrono::{DateTime, Utc};
6876
use itertools::Itertools;
6977
use log::{debug, info};
7078
use object_store::ObjectStore;
7179
use sqlparser::ast::{Expr as SQLExpr, ExprWithAlias as SQLExprWithAlias};
7280
use sqlparser::dialect::dialect_from_str;
73-
use std::any::Any;
74-
use std::collections::hash_map::Entry;
75-
use std::collections::{HashMap, HashSet};
76-
use std::fmt::Debug;
77-
use std::sync::Arc;
7881
use url::Url;
7982
use uuid::Uuid;
8083

datafusion/core/src/lib.rs

Lines changed: 3 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -207,7 +207,7 @@
207207
//! [`QueryPlanner`]: execution::context::QueryPlanner
208208
//! [`OptimizerRule`]: datafusion_optimizer::optimizer::OptimizerRule
209209
//! [`AnalyzerRule`]: datafusion_optimizer::analyzer::AnalyzerRule
210-
//! [`PhysicalOptimizerRule`]: crate::physical_optimizer::PhysicalOptimizerRule
210+
//! [`PhysicalOptimizerRule`]: datafusion_physical_optimizer::PhysicalOptimizerRule
211211
//!
212212
//! ## Query Planning and Execution Overview
213213
//!
@@ -349,7 +349,7 @@
349349
//! filtering can never be `true` using additional statistical information.
350350
//!
351351
//! [cp_solver]: crate::physical_expr::intervals::cp_solver
352-
//! [`PruningPredicate`]: crate::physical_optimizer::pruning::PruningPredicate
352+
//! [`PruningPredicate`]: datafusion_physical_optimizer::pruning::PruningPredicate
353353
//! [`PhysicalExpr`]: crate::physical_plan::PhysicalExpr
354354
//!
355355
//! ## Execution
@@ -659,7 +659,7 @@
659659
//! [`OptimizerRule`]: optimizer::optimizer::OptimizerRule
660660
//! [`ExecutionPlan`]: physical_plan::ExecutionPlan
661661
//! [`PhysicalPlanner`]: physical_planner::PhysicalPlanner
662-
//! [`PhysicalOptimizerRule`]: datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule
662+
//! [`PhysicalOptimizerRule`]: datafusion_physical_optimizer::PhysicalOptimizerRule
663663
//! [`Schema`]: arrow::datatypes::Schema
664664
//! [`PhysicalExpr`]: physical_plan::PhysicalExpr
665665
//! [`RecordBatch`]: arrow::record_batch::RecordBatch
@@ -677,7 +677,6 @@ pub mod dataframe;
677677
pub mod datasource;
678678
pub mod error;
679679
pub mod execution;
680-
pub mod physical_optimizer;
681680
pub mod physical_planner;
682681
pub mod prelude;
683682
pub mod scalar;

0 commit comments

Comments
 (0)