diff --git a/Cargo.lock b/Cargo.lock index cc771331ebb3..50412bca5bca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2421,6 +2421,7 @@ dependencies = [ "half", "hashbrown 0.14.5", "indexmap 2.8.0", + "insta", "itertools 0.14.0", "log", "paste", diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index 43f214607f9f..3bc41d2652d9 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -27,6 +27,7 @@ use arrow::array::BooleanArray; use arrow::compute::filter_record_batch; use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{internal_err, not_impl_err, Result, ScalarValue}; use datafusion_expr_common::columnar_value::ColumnarValue; use datafusion_expr_common::interval_arithmetic::Interval; @@ -283,6 +284,55 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug + DynEq + DynHash { /// See the [`fmt_sql`] function for an example of printing `PhysicalExpr`s as SQL. /// fn fmt_sql(&self, f: &mut Formatter<'_>) -> fmt::Result; + + /// Take a snapshot of this `PhysicalExpr`, if it is dynamic. + /// + /// "Dynamic" in this case means containing references to structures that may change + /// during plan execution, such as hash tables. + /// + /// This method is used to capture the current state of `PhysicalExpr`s that may contain + /// dynamic references to other operators in order to serialize it over the wire + /// or treat it via downcast matching. + /// + /// You should not call this method directly as it does not handle recursion. + /// Instead use [`snapshot_physical_expr`] to handle recursion and capture the + /// full state of the `PhysicalExpr`. 
+ /// + /// This is expected to return "simple" expressions that do not have mutable state + /// and are composed of DataFusion's built-in `PhysicalExpr` implementations. + /// Callers however should *not* assume anything about the returned expressions + /// since callers and implementers may not agree on what "simple" or "built-in" + /// means. + /// In other words, if you need to serialize a `PhysicalExpr` across the wire + /// you should call this method and then try to serialize the result, + /// but you should handle unknown or unexpected `PhysicalExpr` implementations gracefully + /// just as if you had not called this method at all. + /// + /// In particular, consider: + /// * A `PhysicalExpr` that references the current state of a `datafusion::physical_plan::TopK` + /// that is involved in a query with `SELECT * FROM t1 ORDER BY a LIMIT 10`. + /// This function may return something like `a >= 12`. + /// * A `PhysicalExpr` that references the current state of a `datafusion::physical_plan::joins::HashJoinExec` + /// from a query such as `SELECT * FROM t1 JOIN t2 ON t1.a = t2.b`. + /// This function may return something like `t2.b IN (1, 5, 7)`. + /// + /// A system or function that can only deal with a hardcoded set of `PhysicalExpr` implementations + /// or needs to serialize this state to bytes may not be able to handle these dynamic references. + /// In such cases, we should return a simplified version of the `PhysicalExpr` that does not + /// contain these dynamic references. + /// + /// Systems that implement remote execution of plans, e.g. serialize a portion of the query plan + /// and send it across the wire to a remote executor may want to call this method after + /// every batch on the source side and broadcast / update the current snapshot to the remote executor. + /// + /// Note for implementers: this method should *not* handle recursion. + /// Recursion is handled in [`snapshot_physical_expr`]. 
+ fn snapshot(&self) -> Result>> { + // By default, we return None to indicate that this PhysicalExpr does not + // have any dynamic references or state. + // This is a safe default behavior. + Ok(None) + } } /// [`PhysicalExpr`] can't be constrained by [`Eq`] directly because it must remain object @@ -446,3 +496,30 @@ pub fn fmt_sql(expr: &dyn PhysicalExpr) -> impl Display + '_ { Wrapper { expr } } + +/// Take a snapshot of the given `PhysicalExpr` if it is dynamic. +/// +/// Every node in the expression tree is visited and replaced by its snapshot, if it has one. +/// This is used to capture the current state of `PhysicalExpr`s that may contain +/// dynamic references to other operators in order to serialize it over the wire +/// or treat it via downcast matching. +/// +/// See the documentation of [`PhysicalExpr::snapshot`] for more details. +/// +/// # Returns +/// +/// Returns the snapshot of the `PhysicalExpr` if it (or any of its children) is dynamic. +/// If the `PhysicalExpr` does not have any dynamic references or state, +/// the expression is returned unchanged. +pub fn snapshot_physical_expr( + expr: Arc, +) -> Result> { + expr.transform_up(|e| { + if let Some(snapshot) = e.snapshot()? 
{ + Ok(Transformed::yes(snapshot)) + } else { + Ok(Transformed::no(Arc::clone(&e))) + } + }) + .data() +} diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml index 97d028897b0b..47e3291e5cb4 100644 --- a/datafusion/physical-expr/Cargo.toml +++ b/datafusion/physical-expr/Cargo.toml @@ -57,6 +57,7 @@ petgraph = "0.7.1" arrow = { workspace = true, features = ["test_utils"] } criterion = { workspace = true } datafusion-functions = { workspace = true } +insta = { workspace = true } rand = { workspace = true } rstest = { workspace = true } diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs new file mode 100644 index 000000000000..c0a3285f0e78 --- /dev/null +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -0,0 +1,474 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::{ + any::Any, + fmt::Display, + hash::Hash, + sync::{Arc, RwLock}, +}; + +use crate::PhysicalExpr; +use arrow::datatypes::{DataType, Schema}; +use datafusion_common::{ + tree_node::{Transformed, TransformedResult, TreeNode}, + Result, +}; +use datafusion_expr::ColumnarValue; +use datafusion_physical_expr_common::physical_expr::{DynEq, DynHash}; + +/// A dynamic [`PhysicalExpr`] that can be updated by anyone with a reference to it. +#[derive(Debug)] +pub struct DynamicFilterPhysicalExpr { + /// The original children of this PhysicalExpr, if any. + /// This is necessary because the dynamic filter may be initialized with a placeholder (e.g. `lit(true)`) + /// and later remapped to the actual expressions that are being filtered. + /// But we need to know the children (e.g. columns referenced in the expression) ahead of time to evaluate the expression correctly. + children: Vec>, + /// If any of the children were remapped / modified (e.g. to adjust for projections) we need to keep track of the new children + /// so that when we update `current()` in subsequent iterations we can re-apply the replacements. + remapped_children: Option>>, + /// The source of dynamic filters. + inner: Arc>>, + /// For testing purposes track the data type and nullability to make sure they don't change. + /// If they do, there's a bug in the implementation. + /// But this can have overhead in production, so it's only included in our tests. 
+ data_type: Arc>>, + nullable: Arc>>, +} + +impl Hash for DynamicFilterPhysicalExpr { + fn hash(&self, state: &mut H) { + let inner = self.current().expect("Failed to get current expression"); + inner.dyn_hash(state); + self.children.dyn_hash(state); + self.remapped_children.dyn_hash(state); + } +} + +impl PartialEq for DynamicFilterPhysicalExpr { + fn eq(&self, other: &Self) -> bool { + let inner = self.current().expect("Failed to get current expression"); + let our_children = self.remapped_children.as_ref().unwrap_or(&self.children); + let other_children = other.remapped_children.as_ref().unwrap_or(&other.children); + let other = other.current().expect("Failed to get current expression"); + inner.dyn_eq(other.as_any()) && our_children == other_children + } +} + +impl Eq for DynamicFilterPhysicalExpr {} + +impl Display for DynamicFilterPhysicalExpr { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let inner = self.current().expect("Failed to get current expression"); + write!(f, "DynamicFilterPhysicalExpr [ {} ]", inner) + } +} + +impl DynamicFilterPhysicalExpr { + /// Create a new [`DynamicFilterPhysicalExpr`] + /// from an initial expression and a list of children. + /// The list of children is provided separately because + /// the initial expression may not have the same children. + /// For example, if the initial expression is just `true` + /// it will not reference any columns, but we may know that + /// we are going to replace this expression with a real one + /// that does reference certain columns. + /// In this case you **must** pass in the columns that will be + /// used in the final expression as children to this function + /// since DataFusion is generally not compatible with dynamic + /// *children* in expressions. + /// + /// To determine the children you can: + /// + /// - Use [`collect_columns`] to collect the columns from the expression. + /// - Use existing information, such as the sort columns in a `SortExec`. 
+ /// + /// Generally the important bit is that the *leaf children that reference columns + /// do not change* since those will be used to determine what columns need to read or projected + /// when evaluating the expression. + /// + /// [`collect_columns`]: crate::utils::collect_columns + #[allow(dead_code)] // Only used in tests for now + pub fn new( + children: Vec>, + inner: Arc, + ) -> Self { + Self { + children, + remapped_children: None, // Initially no remapped children + inner: Arc::new(RwLock::new(inner)), + data_type: Arc::new(RwLock::new(None)), + nullable: Arc::new(RwLock::new(None)), + } + } + + fn remap_children( + children: &[Arc], + remapped_children: Option<&Vec>>, + expr: Arc, + ) -> Result> { + if let Some(remapped_children) = remapped_children { + // Remap the children to the new children + // of the expression. + expr.transform_up(|child| { + // Check if this is any of our original children + if let Some(pos) = + children.iter().position(|c| c.as_ref() == child.as_ref()) + { + // If so, remap it to the current children + // of the expression. + let new_child = Arc::clone(&remapped_children[pos]); + Ok(Transformed::yes(new_child)) + } else { + // Otherwise, just return the expression + Ok(Transformed::no(child)) + } + }) + .data() + } else { + // If we don't have any remapped children, just return the expression + Ok(Arc::clone(&expr)) + } + } + + /// Get the current expression. + /// This will return the current expression with any children + /// remapped to match calls to [`PhysicalExpr::with_new_children`]. + pub fn current(&self) -> Result> { + let inner = self + .inner + .read() + .map_err(|_| { + datafusion_common::DataFusionError::Execution( + "Failed to acquire read lock for inner".to_string(), + ) + })? + .clone(); + let inner = + Self::remap_children(&self.children, self.remapped_children.as_ref(), inner)?; + Ok(inner) + } + + /// Update the current expression. 
+ /// Any children of this expression must be a subset of the original children + /// passed to the constructor. + /// This should be called e.g.: + /// - When we've computed the probe side's hash table in a HashJoinExec + /// - After every batch is processed if we update the TopK heap in a SortExec using a TopK approach. + #[allow(dead_code)] // Only used in tests for now + pub fn update(&self, new_expr: Arc) -> Result<()> { + let mut current = self.inner.write().map_err(|_| { + datafusion_common::DataFusionError::Execution( + "Failed to acquire write lock for inner".to_string(), + ) + })?; + // Remap the children of the new expression to match the original children + // We still do this again in `current()` but doing it preventively here + // reduces the work needed in some cases if `current()` is called multiple times + // and the same externally facing `PhysicalExpr` is used for both `with_new_children` and `update()`. + let new_expr = Self::remap_children( + &self.children, + self.remapped_children.as_ref(), + new_expr, + )?; + *current = new_expr; + Ok(()) + } +} + +impl PhysicalExpr for DynamicFilterPhysicalExpr { + fn as_any(&self) -> &dyn Any { + self + } + + fn children(&self) -> Vec<&Arc> { + self.remapped_children + .as_ref() + .unwrap_or(&self.children) + .iter() + .collect() + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result> { + Ok(Arc::new(Self { + children: self.children.clone(), + remapped_children: Some(children), + inner: Arc::clone(&self.inner), + data_type: Arc::clone(&self.data_type), + nullable: Arc::clone(&self.nullable), + })) + } + + fn data_type(&self, input_schema: &Schema) -> Result { + let res = self.current()?.data_type(input_schema)?; + #[cfg(test)] + { + use datafusion_common::internal_err; + // Check if the data type has changed. 
+ let mut data_type_lock = self + .data_type + .write() + .expect("Failed to acquire write lock for data_type"); + if let Some(existing) = &*data_type_lock { + if existing != &res { + // If the data type has changed, we have a bug. + return internal_err!( + "DynamicFilterPhysicalExpr data type has changed unexpectedly. \ + Expected: {existing:?}, Actual: {res:?}" + ); + } + } else { + *data_type_lock = Some(res.clone()); + } + } + Ok(res) + } + + fn nullable(&self, input_schema: &Schema) -> Result { + let res = self.current()?.nullable(input_schema)?; + #[cfg(test)] + { + use datafusion_common::internal_err; + // Check if the nullability has changed. + let mut nullable_lock = self + .nullable + .write() + .expect("Failed to acquire write lock for nullable"); + if let Some(existing) = *nullable_lock { + if existing != res { + // If the nullability has changed, we have a bug. + return internal_err!( + "DynamicFilterPhysicalExpr nullability has changed unexpectedly. \ + Expected: {existing}, Actual: {res}" + ); + } + } else { + *nullable_lock = Some(res); + } + } + Ok(res) + } + + fn evaluate( + &self, + batch: &arrow::record_batch::RecordBatch, + ) -> Result { + let current = self.current()?; + #[cfg(test)] + { + // Ensure that we are not evaluating after the expression has changed. + let schema = batch.schema(); + self.nullable(&schema)?; + self.data_type(&schema)?; + }; + current.evaluate(batch) + } + + fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let inner = self.current().map_err(|_| std::fmt::Error)?; + inner.fmt_sql(f) + } + + fn snapshot(&self) -> Result>> { + // Return the current expression as a snapshot. 
+ Ok(Some(self.current()?)) + } +} + +#[cfg(test)] +mod test { + use crate::{ + expressions::{col, lit, BinaryExpr}, + utils::reassign_predicate_columns, + }; + use arrow::{ + array::RecordBatch, + datatypes::{DataType, Field, Schema}, + }; + use datafusion_common::ScalarValue; + + use super::*; + + #[test] + fn test_remap_children() { + let table_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ])); + let expr = Arc::new(BinaryExpr::new( + col("a", &table_schema).unwrap(), + datafusion_expr::Operator::Eq, + lit(42) as Arc, + )); + let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new( + vec![col("a", &table_schema).unwrap()], + expr as Arc, + )); + // Simulate two `ParquetSource` files with different filter schemas + // Both of these should hit the same inner `PhysicalExpr` even after `update()` is called + // and be able to remap children independently. + let filter_schema_1 = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ])); + let filter_schema_2 = Arc::new(Schema::new(vec![ + Field::new("b", DataType::Int32, false), + Field::new("a", DataType::Int32, false), + ])); + // Each ParquetExec calls `with_new_children` on the DynamicFilterPhysicalExpr + // and remaps the children to the file schema. 
+ let dynamic_filter_1 = reassign_predicate_columns( + Arc::clone(&dynamic_filter) as Arc, + &filter_schema_1, + false, + ) + .unwrap(); + let snap = dynamic_filter_1.snapshot().unwrap().unwrap(); + insta::assert_snapshot!(format!("{snap:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 0 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#); + let dynamic_filter_2 = reassign_predicate_columns( + Arc::clone(&dynamic_filter) as Arc, + &filter_schema_2, + false, + ) + .unwrap(); + let snap = dynamic_filter_2.snapshot().unwrap().unwrap(); + insta::assert_snapshot!(format!("{snap:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 1 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#); + // Both filters allow evaluating the same expression + let batch_1 = RecordBatch::try_new( + Arc::clone(&filter_schema_1), + vec![ + // a + ScalarValue::Int32(Some(42)).to_array_of_size(1).unwrap(), + // b + ScalarValue::Int32(Some(43)).to_array_of_size(1).unwrap(), + ], + ) + .unwrap(); + let batch_2 = RecordBatch::try_new( + Arc::clone(&filter_schema_2), + vec![ + // b + ScalarValue::Int32(Some(43)).to_array_of_size(1).unwrap(), + // a + ScalarValue::Int32(Some(42)).to_array_of_size(1).unwrap(), + ], + ) + .unwrap(); + // Evaluate the expression on both batches + let result_1 = dynamic_filter_1.evaluate(&batch_1).unwrap(); + let result_2 = dynamic_filter_2.evaluate(&batch_2).unwrap(); + // Check that the results are the same + let ColumnarValue::Array(arr_1) = result_1 else { + panic!("Expected ColumnarValue::Array"); + }; + let ColumnarValue::Array(arr_2) = result_2 else { + panic!("Expected ColumnarValue::Array"); + }; + assert!(arr_1.eq(&arr_2)); + let expected = ScalarValue::Boolean(Some(true)) + .to_array_of_size(1) + .unwrap(); + assert!(arr_1.eq(&expected)); + // Now lets update the expression + // Note that we update the *original* expression and that should be reflected in both the derived expressions + let new_expr = 
Arc::new(BinaryExpr::new( + col("a", &table_schema).unwrap(), + datafusion_expr::Operator::Gt, + lit(43) as Arc, + )); + dynamic_filter + .update(Arc::clone(&new_expr) as Arc) + .expect("Failed to update expression"); + // Now we should be able to evaluate the new expression on both batches + let result_1 = dynamic_filter_1.evaluate(&batch_1).unwrap(); + let result_2 = dynamic_filter_2.evaluate(&batch_2).unwrap(); + // Check that the results are the same + let ColumnarValue::Array(arr_1) = result_1 else { + panic!("Expected ColumnarValue::Array"); + }; + let ColumnarValue::Array(arr_2) = result_2 else { + panic!("Expected ColumnarValue::Array"); + }; + assert!(arr_1.eq(&arr_2)); + let expected = ScalarValue::Boolean(Some(false)) + .to_array_of_size(1) + .unwrap(); + assert!(arr_1.eq(&expected)); + } + + #[test] + fn test_snapshot() { + let expr = lit(42) as Arc; + let dynamic_filter = DynamicFilterPhysicalExpr::new(vec![], Arc::clone(&expr)); + + // Take a snapshot of the current expression + let snapshot = dynamic_filter.snapshot().unwrap(); + assert_eq!(snapshot, Some(expr)); + + // Update the current expression + let new_expr = lit(100) as Arc; + dynamic_filter.update(Arc::clone(&new_expr)).unwrap(); + // Take another snapshot + let snapshot = dynamic_filter.snapshot().unwrap(); + assert_eq!(snapshot, Some(new_expr)); + } + + #[test] + fn test_dynamic_filter_physical_expr_misbehaves_data_type_nullable() { + let dynamic_filter = + DynamicFilterPhysicalExpr::new(vec![], lit(42) as Arc); + + // First call to data_type and nullable should set the initial values. + let initial_data_type = dynamic_filter.data_type(&Schema::empty()).unwrap(); + let initial_nullable = dynamic_filter.nullable(&Schema::empty()).unwrap(); + + // Call again and expect no change. 
+ let second_data_type = dynamic_filter.data_type(&Schema::empty()).unwrap(); + let second_nullable = dynamic_filter.nullable(&Schema::empty()).unwrap(); + assert_eq!( + initial_data_type, second_data_type, + "Data type should not change on second call." + ); + assert_eq!( + initial_nullable, second_nullable, + "Nullability should not change on second call." + ); + + // Now change the current expression to something else. + dynamic_filter + .update(lit(ScalarValue::Utf8(None)) as Arc) + .expect("Failed to update expression"); + // Check that we error if we call data_type, nullable or evaluate after changing the expression. + assert!( + dynamic_filter.data_type(&Schema::empty()).is_err(), + "Expected err when data_type is called after changing the expression." + ); + assert!( + dynamic_filter.nullable(&Schema::empty()).is_err(), + "Expected err when nullable is called after changing the expression." + ); + let batch = RecordBatch::new_empty(Arc::new(Schema::empty())); + assert!( + dynamic_filter.evaluate(&batch).is_err(), + "Expected err when evaluate is called after changing the expression." 
+ ); + } +} diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs index f00b49f50314..d77207fbbcd7 100644 --- a/datafusion/physical-expr/src/expressions/mod.rs +++ b/datafusion/physical-expr/src/expressions/mod.rs @@ -22,6 +22,7 @@ mod binary; mod case; mod cast; mod column; +mod dynamic_filters; mod in_list; mod is_not_null; mod is_null; diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index 93ced2eb628d..9f795c81fa48 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -68,7 +68,7 @@ pub use planner::{create_physical_expr, create_physical_exprs}; pub use scalar_function::ScalarFunctionExpr; pub use datafusion_physical_expr_common::utils::reverse_order_bys; -pub use utils::split_conjunction; +pub use utils::{conjunction, conjunction_opt, split_conjunction}; // For backwards compatibility pub mod tree_node { diff --git a/datafusion/physical-expr/src/utils/mod.rs b/datafusion/physical-expr/src/utils/mod.rs index 7e4c7f0e10ba..b4d0758fd2e8 100644 --- a/datafusion/physical-expr/src/utils/mod.rs +++ b/datafusion/physical-expr/src/utils/mod.rs @@ -47,6 +47,31 @@ pub fn split_conjunction( split_impl(Operator::And, predicate, vec![]) } +/// Create a conjunction of the given predicates. +/// If the input is empty, return a literal true. +/// If the input contains a single predicate, return the predicate. +/// Otherwise, return a conjunction of the predicates (e.g. `a AND b AND c`). +pub fn conjunction( + predicates: impl IntoIterator>, +) -> Arc { + conjunction_opt(predicates).unwrap_or_else(|| crate::expressions::lit(true)) +} + +/// Create a conjunction of the given predicates. +/// If the input is empty, return None. +/// If the input contains a single predicate, return Some(predicate). +/// Otherwise, return Some(..) of a conjunction of the predicates (e.g. `Some(a AND b AND c)`). 
+pub fn conjunction_opt( + predicates: impl IntoIterator>, +) -> Option> { + predicates + .into_iter() + .fold(None, |acc, predicate| match acc { + None => Some(predicate), + Some(acc) => Some(Arc::new(BinaryExpr::new(acc, Operator::And, predicate))), + }) +} + /// Assume the predicate is in the form of DNF, split the predicate to a Vec of PhysicalExprs. /// /// For example, split "a1 = a2 OR b1 <= b2 OR c1 != c2" into ["a1 = a2", "b1 <= b2", "c1 != c2"] diff --git a/datafusion/physical-optimizer/src/pruning.rs b/datafusion/physical-optimizer/src/pruning.rs index 42d08da20bfd..1dd168f18167 100644 --- a/datafusion/physical-optimizer/src/pruning.rs +++ b/datafusion/physical-optimizer/src/pruning.rs @@ -41,6 +41,7 @@ use datafusion_common::{Column, DFSchema}; use datafusion_expr_common::operator::Operator; use datafusion_physical_expr::utils::{collect_columns, Guarantee, LiteralGuarantee}; use datafusion_physical_expr::{expressions as phys_expr, PhysicalExprRef}; +use datafusion_physical_expr_common::physical_expr::snapshot_physical_expr; use datafusion_physical_plan::{ColumnarValue, PhysicalExpr}; /// A source of runtime statistical information to [`PruningPredicate`]s. @@ -527,6 +528,9 @@ impl PruningPredicate { /// See the struct level documentation on [`PruningPredicate`] for more /// details. 
pub fn try_new(expr: Arc, schema: SchemaRef) -> Result { + // Get a (simpler) snapshot of the physical expr here to use with `PruningPredicate` + // which does not handle dynamic exprs in general + let expr = snapshot_physical_expr(expr)?; let unhandled_hook = Arc::new(ConstantUnhandledPredicateHook::default()) as _; // build predicate expression once diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index f6546ff3f2a6..af7800d6febe 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -22,6 +22,7 @@ use datafusion::datasource::file_format::parquet::ParquetSink; use datafusion::datasource::physical_plan::FileSink; use datafusion::physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr}; use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr, ScalarFunctionExpr}; +use datafusion::physical_expr_common::physical_expr::snapshot_physical_expr; use datafusion::physical_plan::expressions::{ BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr, Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn, @@ -210,6 +211,9 @@ pub fn serialize_physical_expr( value: &Arc, codec: &dyn PhysicalExtensionCodec, ) -> Result { + // Snapshot the expr in case it has dynamic predicate state so + // it can be serialized + let value = snapshot_physical_expr(Arc::clone(value))?; let expr = value.as_any(); if let Some(expr) = expr.downcast_ref::() { @@ -368,7 +372,7 @@ pub fn serialize_physical_expr( }) } else { let mut buf: Vec = vec![]; - match codec.try_encode_expr(value, &mut buf) { + match codec.try_encode_expr(&value, &mut buf) { Ok(_) => { let inputs: Vec = value .children()