Commit c060730

adriangb, alamb, and berkaysynnada authored and Nirnay Roy committed
Introduce DynamicFilterSource and DynamicPhysicalExpr (apache#15568)
* update
* Add file
* fix
* Add remap children test
* fmt
* more comments
* fmt
* Apply suggestions from code review

  Co-authored-by: Andrew Lamb <[email protected]>
* Add some more comments
* Update datafusion/physical-expr-common/src/physical_expr.rs

  Co-authored-by: Berkay Şahin <[email protected]>
* Simplify trait to concrete impl
* clippy
* remap children in update()
* better test

---------

Co-authored-by: Andrew Lamb <[email protected]>
Co-authored-by: Berkay Şahin <[email protected]>
1 parent a6fdfff commit c060730

9 files changed: +593 -2 lines changed


Cargo.lock

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default.

datafusion/physical-expr-common/src/physical_expr.rs

Lines changed: 77 additions & 0 deletions
@@ -27,6 +27,7 @@ use arrow::array::BooleanArray;
 use arrow::compute::filter_record_batch;
 use arrow::datatypes::{DataType, Schema};
 use arrow::record_batch::RecordBatch;
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_common::{internal_err, not_impl_err, Result, ScalarValue};
 use datafusion_expr_common::columnar_value::ColumnarValue;
 use datafusion_expr_common::interval_arithmetic::Interval;
@@ -283,6 +284,55 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug + DynEq + DynHash {
     /// See the [`fmt_sql`] function for an example of printing `PhysicalExpr`s as SQL.
     ///
     fn fmt_sql(&self, f: &mut Formatter<'_>) -> fmt::Result;
+
+    /// Take a snapshot of this `PhysicalExpr`, if it is dynamic.
+    ///
+    /// "Dynamic" in this case means containing references to structures that may change
+    /// during plan execution, such as hash tables.
+    ///
+    /// This method is used to capture the current state of `PhysicalExpr`s that may contain
+    /// dynamic references to other operators in order to serialize it over the wire
+    /// or treat it via downcast matching.
+    ///
+    /// You should not call this method directly as it does not handle recursion.
+    /// Instead use [`snapshot_physical_expr`] to handle recursion and capture the
+    /// full state of the `PhysicalExpr`.
+    ///
+    /// This is expected to return "simple" expressions that do not have mutable state
+    /// and are composed of DataFusion's built-in `PhysicalExpr` implementations.
+    /// Callers however should *not* assume anything about the returned expressions
+    /// since callers and implementers may not agree on what "simple" or "built-in"
+    /// means.
+    /// In other words, if you need to serialize a `PhysicalExpr` across the wire
+    /// you should call this method and then try to serialize the result,
+    /// but you should handle unknown or unexpected `PhysicalExpr` implementations gracefully
+    /// just as if you had not called this method at all.
+    ///
+    /// In particular, consider:
+    /// * A `PhysicalExpr` that references the current state of a `datafusion::physical_plan::TopK`
+    ///   that is involved in a query with `SELECT * FROM t1 ORDER BY a LIMIT 10`.
+    ///   This function may return something like `a >= 12`.
+    /// * A `PhysicalExpr` that references the current state of a `datafusion::physical_plan::joins::HashJoinExec`
+    ///   from a query such as `SELECT * FROM t1 JOIN t2 ON t1.a = t2.b`.
+    ///   This function may return something like `t2.b IN (1, 5, 7)`.
+    ///
+    /// A system or function that can only deal with a hardcoded set of `PhysicalExpr` implementations
+    /// or needs to serialize this state to bytes may not be able to handle these dynamic references.
+    /// In such cases, we should return a simplified version of the `PhysicalExpr` that does not
+    /// contain these dynamic references.
+    ///
+    /// Systems that implement remote execution of plans, e.g. serialize a portion of the query plan
+    /// and send it across the wire to a remote executor may want to call this method after
+    /// every batch on the source side and broadcast / update the current snapshot to the remote executor.
+    ///
+    /// Note for implementers: this method should *not* handle recursion.
+    /// Recursion is handled in [`snapshot_physical_expr`].
+    fn snapshot(&self) -> Result<Option<Arc<dyn PhysicalExpr>>> {
+        // By default, we return None to indicate that this PhysicalExpr does not
+        // have any dynamic references or state.
+        // This is a safe default behavior.
+        Ok(None)
+    }
 }
 
 /// [`PhysicalExpr`] can't be constrained by [`Eq`] directly because it must remain object
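To illustrate the contract documented in this hunk, here is a minimal, self-contained sketch of how an implementer might override `snapshot`. It deliberately does not use DataFusion: the `Expr` trait, `GtEq`, and `DynamicThreshold` are hypothetical stand-ins, and the `Result` wrapper of the real signature is dropped for brevity. Modeling the TopK-like threshold as a shared `RwLock<i64>` is an assumption of this sketch, not a statement about how DataFusion stores it.

```rust
use std::fmt::Debug;
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for `PhysicalExpr`; NOT the DataFusion trait.
trait Expr: Debug + Send + Sync {
    /// Mirrors the default in the diff: `None` means "nothing dynamic here".
    /// Like the real method, this does not recurse into children.
    fn snapshot(&self) -> Option<Arc<dyn Expr>> {
        None
    }

    /// Human-readable rendering, used here only to inspect results.
    fn describe(&self) -> String;
}

/// A static predicate `column >= bound` with no mutable state.
#[derive(Debug)]
struct GtEq {
    column: String,
    bound: i64,
}

impl Expr for GtEq {
    fn describe(&self) -> String {
        format!("{} >= {}", self.column, self.bound)
    }
}

/// A dynamic predicate whose bound is shared with another operator
/// (think of a TopK heap) and may change while the plan executes.
#[derive(Debug)]
struct DynamicThreshold {
    column: String,
    bound: Arc<RwLock<i64>>,
}

impl Expr for DynamicThreshold {
    fn snapshot(&self) -> Option<Arc<dyn Expr>> {
        // Copy the current bound out of the shared state and freeze it
        // into a plain, state-free expression.
        let bound = *self.bound.read().expect("lock poisoned");
        Some(Arc::new(GtEq {
            column: self.column.clone(),
            bound,
        }))
    }

    fn describe(&self) -> String {
        format!("{} >= <dynamic>", self.column)
    }
}
```

In this sketch, calling `snapshot()` on a `DynamicThreshold` for column `a` whose shared bound is 12 yields a plain `GtEq` that describes itself as `a >= 12`. Later writes to the shared bound do not affect a snapshot that was already taken, which is the property a serializer or remote executor would rely on.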
@@ -446,3 +496,30 @@ pub fn fmt_sql(expr: &dyn PhysicalExpr) -> impl Display + '_ {
 
     Wrapper { expr }
 }
+
+/// Take a snapshot of the given `PhysicalExpr` if it is dynamic.
+///
+/// Take a snapshot of this `PhysicalExpr` if it is dynamic.
+/// This is used to capture the current state of `PhysicalExpr`s that may contain
+/// dynamic references to other operators in order to serialize it over the wire
+/// or treat it via downcast matching.
+///
+/// See the documentation of [`PhysicalExpr::snapshot`] for more details.
+///
+/// # Returns
+///
+/// Returns an `Option<Arc<dyn PhysicalExpr>>` which is the snapshot of the
+/// `PhysicalExpr` if it is dynamic. If the `PhysicalExpr` does not have
+/// any dynamic references or state, it returns `None`.
+pub fn snapshot_physical_expr(
+    expr: Arc<dyn PhysicalExpr>,
+) -> Result<Arc<dyn PhysicalExpr>> {
+    expr.transform_up(|e| {
+        if let Some(snapshot) = e.snapshot()? {
+            Ok(Transformed::yes(snapshot))
+        } else {
+            Ok(Transformed::no(Arc::clone(&e)))
+        }
+    })
+    .data()
+}
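The bottom-up recursion that `snapshot_physical_expr` delegates to `transform_up` can be sketched without DataFusion as follows. Everything here is a hypothetical stand-in introduced for illustration: the `Expr` trait, its `children` / `with_new_children` / `to_sql` methods, the `Literal`, `And`, and `Dynamic` types, and the `snapshot_tree` function. Error handling is omitted, and the dynamic state is modeled as a shared `Mutex<String>`, which is an assumption of the sketch.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for `PhysicalExpr`; NOT the DataFusion trait.
trait Expr: Send + Sync {
    fn children(&self) -> Vec<Arc<dyn Expr>>;
    fn with_new_children(&self, children: Vec<Arc<dyn Expr>>) -> Arc<dyn Expr>;
    /// Non-recursive, like `PhysicalExpr::snapshot` in the diff.
    fn snapshot(&self) -> Option<Arc<dyn Expr>> {
        None
    }
    fn to_sql(&self) -> String;
}

/// A leaf holding already-rendered SQL text.
struct Literal(String);

impl Expr for Literal {
    fn children(&self) -> Vec<Arc<dyn Expr>> {
        vec![]
    }
    fn with_new_children(&self, _children: Vec<Arc<dyn Expr>>) -> Arc<dyn Expr> {
        Arc::new(Literal(self.0.clone()))
    }
    fn to_sql(&self) -> String {
        self.0.clone()
    }
}

/// A binary conjunction of two child expressions.
struct And {
    left: Arc<dyn Expr>,
    right: Arc<dyn Expr>,
}

impl Expr for And {
    fn children(&self) -> Vec<Arc<dyn Expr>> {
        vec![Arc::clone(&self.left), Arc::clone(&self.right)]
    }
    fn with_new_children(&self, mut children: Vec<Arc<dyn Expr>>) -> Arc<dyn Expr> {
        let right = children.pop().expect("And needs two children");
        let left = children.pop().expect("And needs two children");
        Arc::new(And { left, right })
    }
    fn to_sql(&self) -> String {
        format!("{} AND {}", self.left.to_sql(), self.right.to_sql())
    }
}

/// A leaf whose predicate text is owned by some other operator.
struct Dynamic {
    current: Arc<Mutex<String>>,
}

impl Expr for Dynamic {
    fn children(&self) -> Vec<Arc<dyn Expr>> {
        vec![]
    }
    fn with_new_children(&self, _children: Vec<Arc<dyn Expr>>) -> Arc<dyn Expr> {
        Arc::new(Dynamic { current: Arc::clone(&self.current) })
    }
    fn snapshot(&self) -> Option<Arc<dyn Expr>> {
        // Freeze the current predicate text into a static literal.
        Some(Arc::new(Literal(self.current.lock().unwrap().clone())))
    }
    fn to_sql(&self) -> String {
        "<dynamic>".to_string()
    }
}

/// Bottom-up walk: rewrite the children first, then ask the rebuilt
/// node for its own snapshot and keep the node when it has none.
fn snapshot_tree(expr: Arc<dyn Expr>) -> Arc<dyn Expr> {
    let children = expr.children();
    let node = if children.is_empty() {
        expr
    } else {
        let new_children: Vec<Arc<dyn Expr>> =
            children.into_iter().map(snapshot_tree).collect();
        expr.with_new_children(new_children)
    };
    let snapshot = node.snapshot();
    snapshot.unwrap_or(node)
}
```

Because each node's `snapshot` stays non-recursive, a single traversal function owns the tree walk. That separation is why the doc comment tells implementers not to recurse and tells callers to go through the helper rather than calling `snapshot` directly.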

datafusion/physical-expr/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -57,6 +57,7 @@ petgraph = "0.7.1"
 arrow = { workspace = true, features = ["test_utils"] }
 criterion = { workspace = true }
 datafusion-functions = { workspace = true }
+insta = { workspace = true }
 rand = { workspace = true }
 rstest = { workspace = true }
 
