Skip to content

Introduce DynamicFilterSource and DynamicPhysicalExpr #15568

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Apr 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

77 changes: 77 additions & 0 deletions datafusion/physical-expr-common/src/physical_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use arrow::array::BooleanArray;
use arrow::compute::filter_record_batch;
use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{internal_err, not_impl_err, Result, ScalarValue};
use datafusion_expr_common::columnar_value::ColumnarValue;
use datafusion_expr_common::interval_arithmetic::Interval;
Expand Down Expand Up @@ -283,6 +284,55 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug + DynEq + DynHash {
/// See the [`fmt_sql`] function for an example of printing `PhysicalExpr`s as SQL.
///
fn fmt_sql(&self, f: &mut Formatter<'_>) -> fmt::Result;

/// Take a snapshot of this `PhysicalExpr`, if it is dynamic.
///
/// "Dynamic" in this case means containing references to structures that may change
/// during plan execution, such as hash tables.
///
/// This method is used to capture the current state of `PhysicalExpr`s that may contain
/// dynamic references to other operators in order to serialize it over the wire
/// or treat it via downcast matching.
///
/// You should not call this method directly as it does not handle recursion.
/// Instead use [`snapshot_physical_expr`] to handle recursion and capture the
/// full state of the `PhysicalExpr`.
///
/// This is expected to return "simple" expressions that do not have mutable state
/// and are composed of DataFusion's built-in `PhysicalExpr` implementations.
/// Callers however should *not* assume anything about the returned expressions
/// since callers and implementers may not agree on what "simple" or "built-in"
/// means.
/// In other words, if you need to serialize a `PhysicalExpr` across the wire
/// you should call this method and then try to serialize the result,
/// but you should handle unknown or unexpected `PhysicalExpr` implementations gracefully
/// just as if you had not called this method at all.
///
/// In particular, consider:
/// * A `PhysicalExpr` that references the current state of a `datafusion::physical_plan::TopK`
/// that is involved in a query with `SELECT * FROM t1 ORDER BY a LIMIT 10`.
/// This function may return something like `a >= 12`.
/// * A `PhysicalExpr` that references the current state of a `datafusion::physical_plan::joins::HashJoinExec`
/// from a query such as `SELECT * FROM t1 JOIN t2 ON t1.a = t2.b`.
/// This function may return something like `t2.b IN (1, 5, 7)`.
///
/// A system or function that can only deal with a hardcoded set of `PhysicalExpr` implementations
/// or needs to serialize this state to bytes may not be able to handle these dynamic references.
/// In such cases, we should return a simplified version of the `PhysicalExpr` that does not
/// contain these dynamic references.
///
/// Systems that implement remote execution of plans, e.g. serialize a portion of the query plan
/// and send it across the wire to a remote executor may want to call this method after
/// every batch on the source side and brodcast / update the current snaphot to the remote executor.
///
/// Note for implementers: this method should *not* handle recursion.
/// Recursion is handled in [`snapshot_physical_expr`].
fn snapshot(&self) -> Result<Option<Arc<dyn PhysicalExpr>>> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All existing expressions no need to implement this, right? Even no need another structure implementing this other than DynamicFilterPhysicalExpr, as it seems to be designed to handle all tricks

Copy link
Contributor Author

@adriangb adriangb Apr 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I think so. No existing expressions need to implement this, unless that would be helpful to somehow simplify themselves into something that PruningPredicate or serialization can handle, but I don't know of any cases where that would make sense.

But we leave the door open for someone to implement another PhysicalExpr that does not use DynamicFilterPhysicalExpr.

// By default, we return None to indicate that this PhysicalExpr does not
// have any dynamic references or state.
// This is a safe default behavior.
Ok(None)
}
Comment on lines +330 to +335
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if it would be reasonable to inject the schema here so that this can be used for the use case in #15057 as well: we could model a predicate like variant_get(col, 'foo') = 5 by making variant_get(col, 'foo') a dynamic physicalexpr that snapshots itself to "col.typed_value.foo.typed_value if the schema/types permit otherwise it snapshots itself to _variant_get_value(col.metadata, col.value).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean "inject" the schema?

Like make this something like?

    fn snapshot(&self, &Schema) -> Result<Option<Arc<dyn PhysicalExpr>>> {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes precisely.

So instead of being just dynamic on some internal state it will be dynamic on the actual physical schema of the file it is being applied to. But maybe that should be its own API:

fn with_physical_schema(&self, schema: &Schema) -> Option<Arc<dyn PhysicalExpr>>

Then if something wants a snapshot that's specialized for a specific file schema it can call both methods.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in theory the schema of the input to a PhysicaExpr shouldn't change so any expr that needs it could hold a reference 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is that true? Where would it hold onto the reference from, data_type()? Can't the same PhysicalExpr be used in multiple contexts, e.g. with the same columns but different projections / different physical column orders? Put another way, is it guaranteed that data_type() can only be called once or that evaluate() is always called with the same schema?

To be clear: I don't think this is important for this PR. After reflection in https://github.com/apache/datafusion/pull/15568/files#r2033710338 I think if we want that we can add it as a new method in #15057

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is that true? Where would it hold onto the reference from, data_type()? Can't the same PhysicalExpr be used in multiple contexts, e.g. with the same columns but different projections / different physical column orders?

Not in my understanding -- while it could in theory work this way I don't think they are typically shared.

Put another way, is it guaranteed that data_type() can only be called once or that evaluate() is always called with the same schema?

In my mind what happens is that the PhysicalExpr is created with the schema information: https://docs.rs/datafusion/latest/datafusion/physical_expr/fn.create_physical_expr.html

But it doesn't hold a reference to the schema internally 🤔

}

/// [`PhysicalExpr`] can't be constrained by [`Eq`] directly because it must remain object
Expand Down Expand Up @@ -446,3 +496,30 @@ pub fn fmt_sql(expr: &dyn PhysicalExpr) -> impl Display + '_ {

Wrapper { expr }
}

/// Take a snapshot of the given `PhysicalExpr` if it is dynamic.
///
/// Take a snapshot of this `PhysicalExpr` if it is dynamic.
/// This is used to capture the current state of `PhysicalExpr`s that may contain
/// dynamic references to other operators in order to serialize it over the wire
/// or treat it via downcast matching.
///
/// See the documentation of [`PhysicalExpr::snapshot`] for more details.
///
/// # Returns
///
/// Returns an `Option<Arc<dyn PhysicalExpr>>` which is the snapshot of the
/// `PhysicalExpr` if it is dynamic. If the `PhysicalExpr` does not have
/// any dynamic references or state, it returns `None`.
pub fn snapshot_physical_expr(
expr: Arc<dyn PhysicalExpr>,
) -> Result<Arc<dyn PhysicalExpr>> {
expr.transform_up(|e| {
if let Some(snapshot) = e.snapshot()? {
Ok(Transformed::yes(snapshot))
} else {
Ok(Transformed::no(Arc::clone(&e)))
}
})
.data()
}
1 change: 1 addition & 0 deletions datafusion/physical-expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ petgraph = "0.7.1"
arrow = { workspace = true, features = ["test_utils"] }
criterion = { workspace = true }
datafusion-functions = { workspace = true }
insta = { workspace = true }
rand = { workspace = true }
rstest = { workspace = true }

Expand Down
Loading
Loading