-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Introduce DynamicFilterSource and DynamicPhysicalExpr #15568
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
fn snapshot(&self) -> Result<Option<Arc<dyn PhysicalExpr>>> { | ||
// By default, we return None to indicate that this PhysicalExpr does not | ||
// have any dynamic references or state. | ||
// This is a safe default behavior. | ||
Ok(None) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering if it would be reasonable to inject the schema here so that this can be used for the use case in #15057 as well: we could model a predicate like variant_get(col, 'foo') = 5
by making variant_get(col, 'foo')
a dynamic physicalexpr that snapshots itself to "col.typed_value.foo.typed_value
if the schema/types permit otherwise it snapshots itself to _variant_get_value(col.metadata, col.value)
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you mean "inject" the schema?
Like make this something like?
fn snapshot(&self, &Schema) -> Result<Option<Arc<dyn PhysicalExpr>>> {
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes precisely.
So instead of being just dynamic on some internal state it will be dynamic on the actual physical schema of the file it is being applied to. But maybe that should be its own API:
fn with_physical_schema(&self, schema: &Schema) -> Option<Arc<dyn PhysicalExpr>>
Then if something wants a snapshot that's specialized for a specific file schema it can call both methods.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in theory the schema of the input to a PhysicaExpr shouldn't change so any expr that needs it could hold a reference 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is that true? Where would it hold onto the reference from, data_type()
? Can't the same PhysicalExpr
be used in multiple contexts, e.g. with the same columns but different projections / different physical column orders? Put another way, is it guaranteed that data_type()
can only be called once or that evaluate()
is always called with the same schema?
To be clear: I don't think this is important for this PR. After reflection in https://github.com/apache/datafusion/pull/15568/files#r2033710338 I think if we want that we can add it as a new method in #15057
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is that true? Where would it hold onto the reference from, data_type()? Can't the same PhysicalExpr be used in multiple contexts, e.g. with the same columns but different projections / different physical column orders?
Not in my understanding -- while it could in theory work this way I don't think they are typically shared.
Put another way, is it guaranteed that data_type() can only be called once or that evaluate() is always called with the same schema?
In my mind what happens is that the PhysicalExpr is created with the schema information: https://docs.rs/datafusion/latest/datafusion/physical_expr/fn.create_physical_expr.html
But it doesn't hold a reference to the schema internally 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After the API in #15566 I wonder if we still need a DynamicFilterSource
anymore 🤔 It seems like any nodes with dynamic filters could just pass the predicate as part of ExecutionPlan::try_pushdown_filters
I do think we'll likely still need snapshot_filter
though
My view for I realize this PR doesn't make that clear or structure the exports correctly atm. Probably makes sense for it to be a private module in the same crate as |
Okay nope this PR is set up correctly: So really this entire PR is just:
|
b43c39d
to
f13729c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @adriangb -- I really like this API
Not only will it help with topk dynamic filtering, I think it is exactly the API someone would need if they wanted to implement dynamic filters for distributed joins -- for example 'snapshot' would make / create a bloom filter to send.
ALl in all I am super excited about where this is heading.
/// Callers however should *not* assume anything about the returned expressions | ||
/// since callers and implementers may not agree on what "simple" or "built-in" | ||
/// means. | ||
/// In other words, if you need to searlize a `PhysicalExpr` across the wire |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
fn snapshot(&self) -> Result<Option<Arc<dyn PhysicalExpr>>> { | ||
// By default, we return None to indicate that this PhysicalExpr does not | ||
// have any dynamic references or state. | ||
// This is a safe default behavior. | ||
Ok(None) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you mean "inject" the schema?
Like make this something like?
fn snapshot(&self, &Schema) -> Result<Option<Arc<dyn PhysicalExpr>>> {
@@ -283,6 +284,51 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug + DynEq + DynHash { | |||
/// See the [`fmt_sql`] function for an example of printing `PhysicalExpr`s as SQL. | |||
/// | |||
fn fmt_sql(&self, f: &mut Formatter<'_>) -> fmt::Result; | |||
|
|||
/// Take a snapshot of this `PhysicalExpr` if it is dynamic. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/// Take a snapshot of this `PhysicalExpr` if it is dynamic. | |
/// Take a snapshot of this `PhysicalExpr`, if it is dynamic. | |
/// |
/// This is used to capture the current state of `PhysicalExpr`s that may contain | ||
/// dynamic references to other operators in order to serialize it over the wire | ||
/// or treat it via downcast matching. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/// This is used to capture the current state of `PhysicalExpr`s that may contain | |
/// dynamic references to other operators in order to serialize it over the wire | |
/// or treat it via downcast matching. | |
/// "Dynamic" in this case means containing references to structures that may change | |
/// during plan execution, such as hash tables. | |
/// | |
/// This method is used to capture the current state of `PhysicalExpr`s that may contain | |
/// dynamic references to other operators in order to serialize it over the wire | |
/// or treat it via downcast matching. |
This might be a good thing to note in the comments of |
Thank you for the review, I'll address comments this afternoon |
Done in 75f4e40 😄 |
Yep my goal was to create a flexible interface that can be used for joins, distributed things, etc. I hope it works out in the end! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is looking very good, thank you again @adriangb. I just need a bit help to understand some points
/// [`PhysicalExpr::evaluate`], [`PhysicalExpr::data_type`], and [`PhysicalExpr::nullable`]. | ||
/// It also implements [`PhysicalExpr::snapshot`] by forwarding the call to [`DynamicFilterSource::snapshot_current_filters`]. | ||
#[derive(Debug)] | ||
pub struct DynamicFilterPhysicalExpr { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this going to be the shared object between sources and dynamic filter introducing operators?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. This becomes an Arc<dyn PhysicalExpr>
and is what gets pushed down into the source but holds onto a reference to the operator.
/// | ||
/// Note for implementers: this method should *not* handle recursion. | ||
/// Recursion is handled in [`snapshot_physical_expr`]. | ||
fn snapshot(&self) -> Result<Option<Arc<dyn PhysicalExpr>>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All existing expressions no need to implement this, right? Even no need another structure implementing this other than DynamicFilterPhysicalExpr, as it seems to be designed to handle all tricks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes I think so. No existing expressions need to implement this, unless that would be helpful to somehow simplify themselves into something that PruningPredicate
or serialization can handle, but I don't know of any cases where that would make sense.
But we leave the door open for someone to implement another PhysicalExpr
that does not use DynamicFilterPhysicalExpr
.
/// accessible within the crate. | ||
/// If you would like to use this trait in your own code, please open an issue | ||
/// to discuss the use case and we can consider making it public. | ||
pub trait DynamicFilterSource: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There will be different structs for different filter introducing streams or they will utilize a common struct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The idea was that there would be different ones. For example TopK will have something like:
struct TopKDynamicFilterSource { ... }
impl DynamicFilterSource for TopKDynamicFilterSource { ... }
But now that I think about it it will probably be more helpful to provide one concrete implementation of:
struct DynamicPhysicalExpr {
inner: Arc<RwLock<Arc<dyn PhysicalExpr>>>
}
impl PhysicalExpr for DynamicPhysicalExpr {
// similar to current
}
I think that would work for at least some use cases and is a good place to start.
Wdyt @berkaysynnada ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@berkaysynnada please take a look at bdbd438, I think it makes things simpler 😄
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@berkaysynnada please take a look at bdbd438, I think it makes things simpler 😄
It seems better to me as well :D
|
||
impl PartialEq for DynamicFilterPhysicalExpr { | ||
fn eq(&self, other: &Self) -> bool { | ||
let inner = self.current().expect("Failed to get current expression"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For equality I wonder if this would be simpler just to do a pointer check -- I think the instance has to be the same 🤔
) -> Result<Arc<dyn PhysicalExpr>> { | ||
Ok(Arc::new(Self { | ||
children: self.children.clone(), | ||
remapped_children: Some(children), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't look correct to me.
with_new_children
is the function used when we rewrite the plan with the new children.
children
and other properties i.e. cache is updated with the given children: Vec<Arc<dyn PhysicalExpr>>
.
I think you should find another way to handle the remapped_children
logic. PlanContext
with the custom payload may helps
It seems what you need is to update with new children in with_new_children
then you don't need remapped_children
anymore
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jayzhan211 is right, I didn't notice that. All children
, remapped_children
and inner
fields should be updated here accordingly. Maybe we can left this as not_implemented!()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Something like
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
self.children.iter().collect()
}
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn PhysicalExpr>> {
let current = self
.inner
.read()
.map_err(|_| {
datafusion_common::DataFusionError::Execution(
"Failed to acquire read lock for inner".to_string(),
)
})?
.clone();
let new_cur = current
.transform_up(|expr| {
if let Some(pos) = self
.children
.iter()
.position(|c| c.as_ref() == expr.as_ref())
{
let new_child = Arc::clone(&children[pos]);
Ok(Transformed::yes(new_child))
} else {
// Otherwise, just return the expression
Ok(Transformed::no(expr))
}
});
Ok(Arc::new(Self {
children,
remapped_children: None,
inner: Arc::new(RwLock::new(new_cur.data().unwrap())),
data_type: Arc::clone(&self.data_type),
nullable: Arc::clone(&self.nullable),
}))
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and I think we don't need Arc<RwLock<T>>
for inner
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We would have to also keep track of remapped_children
to re-apply the transformation in update
/ whenever the expression changes. But yes what you are suggestion is to do the update immediately instead of when curent()
is called, which makes sense.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
update is equivalent to with_new_children
No it is not. update()
replaces the inner PhysicalExpr
while preserving the outer PhysicalExpr
. new_with_children
is the exact opposite: it replaces the outer PhysicalExpr
(because the children changed) while keeping the same inner PhysicalExpr
. update()
is intended to be called by the producer of the DynamicPhysicalExpr
only while with_new_children
is called by the consumers
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do they need to be updated here? Children get remapped dynamically when you call current().
What if we are doing plan rewrite and want a completely different children
in DynamicFilterPhysicalExpr
?
impl<T: ConcreteTreeNode> TreeNode for T {
fn apply_children<'n, F: FnMut(&'n Self) -> Result<TreeNodeRecursion>>(
&'n self,
f: F,
) -> Result<TreeNodeRecursion> {
self.children().iter().apply_until_stop(f)
}
fn map_children<F: FnMut(Self) -> Result<Transformed<Self>>>(
self,
f: F,
) -> Result<Transformed<Self>> {
let (new_self, children) = self.take_children();
if !children.is_empty() {
let new_children = children.into_iter().map_until_stop_and_collect(f)?;
// Propagate up `new_children.transformed` and `new_children.tnr` along with
// the node containing transformed children.
new_children.map_data(|new_children| new_self.with_new_children(new_children))
} else {
Ok(Transformed::no(new_self))
}
}
}
with_new_children
is the function called when you do the plan rewrite, but
in your code the children
isn't changed but remapped_children
is updated instead. IMO this seems like you rely on the with_new_children
function for DynamicFilterPhysicalExpr
requirement, and didn't actually have the ability to update the source of filters with the new one.
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn PhysicalExpr>> {
Ok(Arc::new(Self {
children: self.children.clone(),
remapped_children: Some(children),
inner: Arc::clone(&self.inner),
data_type: Arc::clone(&self.data_type),
nullable: Arc::clone(&self.nullable),
}))
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The result of with_new_children(children: Vec<Arc<dyn PhysicalExpr>>)
should be logically equivalent to DynamicFilterPhysicalExpr::new(children)
pub struct DynamicFilterPhysicalExpr {
/// The source of dynamic filters. `inner` in your design.
children: Arc<dyn PhysicalExpr>,
/// `children` in your design.
original_columns: Vec<Arc<dyn PhysicalExpr>>,
/// `remapped_children` in your design.
remapped_columns: Option<Vec<Arc<dyn PhysicalExpr>>>,
}
impl PhysicalExpr for DynamicFilterPhysicalExpr {
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn PhysicalExpr>> {
// the source is updated
Ok(Arc::new(Self {
children,
...
}))
}
}
impl DynamicFilterPhysicalExpr {
// use this instead of `reassign_predicate_columns` to avoid the `with_new_children`
fn set_remapped_children(
&mut self,
remapped_children: Vec<Arc<dyn PhysicalExpr>>,
) {
self.remapped_children = Some(remapped_children);
}
fn update(new_expr: Arc<dyn PhysicalExpr>) -> Result<()> {
}
// Update the columns directly without transform API that call `with_new_children`
fn remap_children() {}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I must be missing something. Maybe you can make a PR that keeps the tests passing with your design?
The point is that:
- DynamicFilterPhysicalExpr gets initialized at planning time with a known set of children but a placeholder expression (
lit(true)
) with_new_children
is called making a newDynamicFilterPhysicalExpr
but with the children replaced (let's ignore how that happens internally for now)update
is called on the original reference with an expression that references the original children. This is propagated to all references, including those with new children, because of theArc<RwLock<...>>
.evaluate
is called on one of the references that previously hadwith_new_children
called on it. Sinceupdate
was called, which swapped outinner
, the children of this newinner
need to be remapped to the children that we currently expose externally.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DynamicFilterPhysicalExpr(Vec<Arc>) is enough
Again I'm sorry if I'm missing something, but how would this work to share a single updatable reference across multiple copies of the PhysicalExpr (eg because with_new_children
was called)?
children, | ||
remapped_children: None, // Initially no remapped children | ||
inner: Arc::new(RwLock::new(inner)), | ||
data_type: Arc::new(RwLock::new(None)), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this is for test purpose, I think we can use with_data_type
, and gated with cfg(test)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure what you mean. Are you wanting to avoid initializing these fields if we're not in tests?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay will look into that in a follow up PR. My thought was that it's pretty cheap to initialize them.
@berkaysynnada @jayzhan211 I added a test in f59577c that shows how this interacts with |
Co-authored-by: Andrew Lamb <[email protected]>
Co-authored-by: Berkay Şahin <[email protected]>
f59577c
to
3cbdd2b
Compare
SInce this is an internal API that is not yet hooked up, let's merge merge it in and keep iterating on main (so we can hook it up and start testing it asap). If we can find simpler ways to implement some of the functions as we go forward, such as suggested by @jayzhan211 that will be great. |
Thanks everyone! |
I couldn't fully follow but, do we have remaining issues known with |
I think @jayzhan211 still has concerns but I'm having trouble understanding what they translate to in practice |
I think some of the concerns will become more clear as we connect this code all up together and see it in action. As it is currently not yet hooked up I think it is hard to reason about some of the potential concerns. |
Upd: @adriangb, DynamicFilterPhysicalExpr is PhysicalExprRef, instead of carrying
|
* update * Add file * fix * Add remap children test * fmt * more comments * fmt * Apply suggestions from code review Co-authored-by: Andrew Lamb <[email protected]> * Add some more comments * Update datafusion/physical-expr-common/src/physical_expr.rs Co-authored-by: Berkay Şahin <[email protected]> * Simplify trait to concrete impl * clippy * remap children in update() * better test --------- Co-authored-by: Andrew Lamb <[email protected]> Co-authored-by: Berkay Şahin <[email protected]>
This PR introduces:
PhysicalExpr::snapshot
to capture the current state of dynamic PhyscalExpr's for feeding into systems that match on concrete trait impls (namely PruningPredicate and protobuf serialization)