Skip to content

Commit ae10754

Browse files
peter-toth and alamb authored
Add reference visitor TreeNode APIs, change ExecutionPlan::children() and PhysicalExpr::children() return references (#10543)
* add reference visitor APIs
* use stricter references in apply() and visit()
* avoid where clause
* remove NO_OP
* remove assert after removing NO_OP

---------

Co-authored-by: Andrew Lamb <[email protected]>
1 parent 7c08a6f commit ae10754

File tree

84 files changed

+338
-252
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below for content that may be hidden.

84 files changed

+338
-252
lines changed

datafusion-examples/examples/custom_datasource.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ impl ExecutionPlan for CustomExec {
237237
&self.cache
238238
}
239239

240-
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
240+
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
241241
vec![]
242242
}
243243

datafusion/common/src/tree_node.rs

Lines changed: 111 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,8 @@ pub trait TreeNode: Sized {
123123
/// TreeNodeVisitor::f_up(ChildNode2)
124124
/// TreeNodeVisitor::f_up(ParentNode)
125125
/// ```
126-
fn visit<V: TreeNodeVisitor<Node = Self>>(
127-
&self,
126+
fn visit<'n, V: TreeNodeVisitor<'n, Node = Self>>(
127+
&'n self,
128128
visitor: &mut V,
129129
) -> Result<TreeNodeRecursion> {
130130
visitor
@@ -190,12 +190,12 @@ pub trait TreeNode: Sized {
190190
/// # See Also
191191
/// * [`Self::transform_down`] for the equivalent transformation API.
192192
/// * [`Self::visit`] for both top-down and bottom up traversal.
193-
fn apply<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
194-
&self,
193+
fn apply<'n, F: FnMut(&'n Self) -> Result<TreeNodeRecursion>>(
194+
&'n self,
195195
mut f: F,
196196
) -> Result<TreeNodeRecursion> {
197-
fn apply_impl<N: TreeNode, F: FnMut(&N) -> Result<TreeNodeRecursion>>(
198-
node: &N,
197+
fn apply_impl<'n, N: TreeNode, F: FnMut(&'n N) -> Result<TreeNodeRecursion>>(
198+
node: &'n N,
199199
f: &mut F,
200200
) -> Result<TreeNodeRecursion> {
201201
f(node)?.visit_children(|| node.apply_children(|c| apply_impl(c, f)))
@@ -427,8 +427,8 @@ pub trait TreeNode: Sized {
427427
///
428428
/// Description: Apply `f` to inspect node's children (but not the node
429429
/// itself).
430-
fn apply_children<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
431-
&self,
430+
fn apply_children<'n, F: FnMut(&'n Self) -> Result<TreeNodeRecursion>>(
431+
&'n self,
432432
f: F,
433433
) -> Result<TreeNodeRecursion>;
434434

@@ -466,19 +466,19 @@ pub trait TreeNode: Sized {
466466
///
467467
/// # See Also:
468468
/// * [`TreeNode::rewrite`] to rewrite owned `TreeNode`s
469-
pub trait TreeNodeVisitor: Sized {
469+
pub trait TreeNodeVisitor<'n>: Sized {
470470
/// The node type which is visitable.
471471
type Node: TreeNode;
472472

473473
/// Invoked while traversing down the tree, before any children are visited.
474474
/// Default implementation continues the recursion.
475-
fn f_down(&mut self, _node: &Self::Node) -> Result<TreeNodeRecursion> {
475+
fn f_down(&mut self, _node: &'n Self::Node) -> Result<TreeNodeRecursion> {
476476
Ok(TreeNodeRecursion::Continue)
477477
}
478478

479479
/// Invoked while traversing up the tree after children are visited. Default
480480
/// implementation continues the recursion.
481-
fn f_up(&mut self, _node: &Self::Node) -> Result<TreeNodeRecursion> {
481+
fn f_up(&mut self, _node: &'n Self::Node) -> Result<TreeNodeRecursion> {
482482
Ok(TreeNodeRecursion::Continue)
483483
}
484484
}
@@ -855,7 +855,7 @@ impl<T> TransformedResult<T> for Result<Transformed<T>> {
855855
/// its related `Arc<dyn T>` will automatically implement [`TreeNode`].
856856
pub trait DynTreeNode {
857857
/// Returns all children of the specified `TreeNode`.
858-
fn arc_children(&self) -> Vec<Arc<Self>>;
858+
fn arc_children(&self) -> Vec<&Arc<Self>>;
859859

860860
/// Constructs a new node with the specified children.
861861
fn with_new_arc_children(
@@ -868,11 +868,11 @@ pub trait DynTreeNode {
868868
/// Blanket implementation for any `Arc<T>` where `T` implements [`DynTreeNode`]
869869
/// (such as [`Arc<dyn PhysicalExpr>`]).
870870
impl<T: DynTreeNode + ?Sized> TreeNode for Arc<T> {
871-
fn apply_children<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
872-
&self,
871+
fn apply_children<'n, F: FnMut(&'n Self) -> Result<TreeNodeRecursion>>(
872+
&'n self,
873873
f: F,
874874
) -> Result<TreeNodeRecursion> {
875-
self.arc_children().iter().apply_until_stop(f)
875+
self.arc_children().into_iter().apply_until_stop(f)
876876
}
877877

878878
fn map_children<F: FnMut(Self) -> Result<Transformed<Self>>>(
@@ -881,7 +881,10 @@ impl<T: DynTreeNode + ?Sized> TreeNode for Arc<T> {
881881
) -> Result<Transformed<Self>> {
882882
let children = self.arc_children();
883883
if !children.is_empty() {
884-
let new_children = children.into_iter().map_until_stop_and_collect(f)?;
884+
let new_children = children
885+
.into_iter()
886+
.cloned()
887+
.map_until_stop_and_collect(f)?;
885888
// Propagate up `new_children.transformed` and `new_children.tnr`
886889
// along with the node containing transformed children.
887890
if new_children.transformed {
@@ -913,8 +916,8 @@ pub trait ConcreteTreeNode: Sized {
913916
}
914917

915918
impl<T: ConcreteTreeNode> TreeNode for T {
916-
fn apply_children<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
917-
&self,
919+
fn apply_children<'n, F: FnMut(&'n Self) -> Result<TreeNodeRecursion>>(
920+
&'n self,
918921
f: F,
919922
) -> Result<TreeNodeRecursion> {
920923
self.children().iter().apply_until_stop(f)
@@ -938,6 +941,7 @@ impl<T: ConcreteTreeNode> TreeNode for T {
938941

939942
#[cfg(test)]
940943
mod tests {
944+
use std::collections::HashMap;
941945
use std::fmt::Display;
942946

943947
use crate::tree_node::{
@@ -946,7 +950,7 @@ mod tests {
946950
};
947951
use crate::Result;
948952

949-
#[derive(PartialEq, Debug)]
953+
#[derive(Debug, Eq, Hash, PartialEq)]
950954
struct TestTreeNode<T> {
951955
children: Vec<TestTreeNode<T>>,
952956
data: T,
@@ -959,8 +963,8 @@ mod tests {
959963
}
960964

961965
impl<T> TreeNode for TestTreeNode<T> {
962-
fn apply_children<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
963-
&self,
966+
fn apply_children<'n, F: FnMut(&'n Self) -> Result<TreeNodeRecursion>>(
967+
&'n self,
964968
f: F,
965969
) -> Result<TreeNodeRecursion> {
966970
self.children.iter().apply_until_stop(f)
@@ -1459,15 +1463,15 @@ mod tests {
14591463
}
14601464
}
14611465

1462-
impl<T: Display> TreeNodeVisitor for TestVisitor<T> {
1466+
impl<'n, T: Display> TreeNodeVisitor<'n> for TestVisitor<T> {
14631467
type Node = TestTreeNode<T>;
14641468

1465-
fn f_down(&mut self, node: &Self::Node) -> Result<TreeNodeRecursion> {
1469+
fn f_down(&mut self, node: &'n Self::Node) -> Result<TreeNodeRecursion> {
14661470
self.visits.push(format!("f_down({})", node.data));
14671471
(*self.f_down)(node)
14681472
}
14691473

1470-
fn f_up(&mut self, node: &Self::Node) -> Result<TreeNodeRecursion> {
1474+
fn f_up(&mut self, node: &'n Self::Node) -> Result<TreeNodeRecursion> {
14711475
self.visits.push(format!("f_up({})", node.data));
14721476
(*self.f_up)(node)
14731477
}
@@ -1912,4 +1916,87 @@ mod tests {
19121916
TreeNodeRecursion::Stop
19131917
)
19141918
);
1919+
1920+
//                    F
1921+
//                  / | \
1922+
//               /    |     \
1923+
//            E       C       A
1924+
//            |      / \
1925+
//            C     B   D
1926+
//          /   \        |
1927+
//         B     D       A
1928+
//               |
1929+
//               A
1930+
#[test]
1931+
fn test_apply_and_visit_references() -> Result<()> {
1932+
let node_a = TestTreeNode::new(vec![], "a".to_string());
1933+
let node_b = TestTreeNode::new(vec![], "b".to_string());
1934+
let node_d = TestTreeNode::new(vec![node_a], "d".to_string());
1935+
let node_c = TestTreeNode::new(vec![node_b, node_d], "c".to_string());
1936+
let node_e = TestTreeNode::new(vec![node_c], "e".to_string());
1937+
let node_a_2 = TestTreeNode::new(vec![], "a".to_string());
1938+
let node_b_2 = TestTreeNode::new(vec![], "b".to_string());
1939+
let node_d_2 = TestTreeNode::new(vec![node_a_2], "d".to_string());
1940+
let node_c_2 = TestTreeNode::new(vec![node_b_2, node_d_2], "c".to_string());
1941+
let node_a_3 = TestTreeNode::new(vec![], "a".to_string());
1942+
let tree = TestTreeNode::new(vec![node_e, node_c_2, node_a_3], "f".to_string());
1943+
1944+
let node_f_ref = &tree;
1945+
let node_e_ref = &node_f_ref.children[0];
1946+
let node_c_ref = &node_e_ref.children[0];
1947+
let node_b_ref = &node_c_ref.children[0];
1948+
let node_d_ref = &node_c_ref.children[1];
1949+
let node_a_ref = &node_d_ref.children[0];
1950+
1951+
let mut m: HashMap<&TestTreeNode<String>, usize> = HashMap::new();
1952+
tree.apply(|e| {
1953+
*m.entry(e).or_insert(0) += 1;
1954+
Ok(TreeNodeRecursion::Continue)
1955+
})?;
1956+
1957+
let expected = HashMap::from([
1958+
(node_f_ref, 1),
1959+
(node_e_ref, 1),
1960+
(node_c_ref, 2),
1961+
(node_d_ref, 2),
1962+
(node_b_ref, 2),
1963+
(node_a_ref, 3),
1964+
]);
1965+
assert_eq!(m, expected);
1966+
1967+
struct TestVisitor<'n> {
1968+
m: HashMap<&'n TestTreeNode<String>, (usize, usize)>,
1969+
}
1970+
1971+
impl<'n> TreeNodeVisitor<'n> for TestVisitor<'n> {
1972+
type Node = TestTreeNode<String>;
1973+
1974+
fn f_down(&mut self, node: &'n Self::Node) -> Result<TreeNodeRecursion> {
1975+
let (down_count, _) = self.m.entry(node).or_insert((0, 0));
1976+
*down_count += 1;
1977+
Ok(TreeNodeRecursion::Continue)
1978+
}
1979+
1980+
fn f_up(&mut self, node: &'n Self::Node) -> Result<TreeNodeRecursion> {
1981+
let (_, up_count) = self.m.entry(node).or_insert((0, 0));
1982+
*up_count += 1;
1983+
Ok(TreeNodeRecursion::Continue)
1984+
}
1985+
}
1986+
1987+
let mut visitor = TestVisitor { m: HashMap::new() };
1988+
tree.visit(&mut visitor)?;
1989+
1990+
let expected = HashMap::from([
1991+
(node_f_ref, (1, 1)),
1992+
(node_e_ref, (1, 1)),
1993+
(node_c_ref, (2, 2)),
1994+
(node_d_ref, (2, 2)),
1995+
(node_b_ref, (2, 2)),
1996+
(node_a_ref, (3, 3)),
1997+
]);
1998+
assert_eq!(visitor.m, expected);
1999+
2000+
Ok(())
2001+
}
19152002
}

datafusion/core/src/datasource/physical_plan/arrow_file.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ impl ExecutionPlan for ArrowExec {
134134
&self.cache
135135
}
136136

137-
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
137+
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
138138
Vec::new()
139139
}
140140

datafusion/core/src/datasource/physical_plan/avro.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ impl ExecutionPlan for AvroExec {
111111
&self.cache
112112
}
113113

114-
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
114+
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
115115
Vec::new()
116116
}
117117

datafusion/core/src/datasource/physical_plan/csv.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ impl ExecutionPlan for CsvExec {
173173
&self.cache
174174
}
175175

176-
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
176+
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
177177
// this is a leaf node and has no children
178178
vec![]
179179
}

datafusion/core/src/datasource/physical_plan/json.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ impl ExecutionPlan for NdJsonExec {
138138
&self.cache
139139
}
140140

141-
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
141+
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
142142
Vec::new()
143143
}
144144

datafusion/core/src/datasource/physical_plan/parquet/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,7 @@ impl ExecutionPlan for ParquetExec {
365365
&self.cache
366366
}
367367

368-
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
368+
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
369369
// this is a leaf node and has no children
370370
vec![]
371371
}

datafusion/core/src/execution/context/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2465,10 +2465,10 @@ impl<'a> BadPlanVisitor<'a> {
24652465
}
24662466
}
24672467

2468-
impl<'a> TreeNodeVisitor for BadPlanVisitor<'a> {
2468+
impl<'n, 'a> TreeNodeVisitor<'n> for BadPlanVisitor<'a> {
24692469
type Node = LogicalPlan;
24702470

2471-
fn f_down(&mut self, node: &Self::Node) -> Result<TreeNodeRecursion> {
2471+
fn f_down(&mut self, node: &'n Self::Node) -> Result<TreeNodeRecursion> {
24722472
match node {
24732473
LogicalPlan::Ddl(ddl) if !self.options.allow_ddl => {
24742474
plan_err!("DDL not supported: {}", ddl.name())

datafusion/core/src/physical_optimizer/aggregate_statistics.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ fn take_optimizable(node: &dyn ExecutionPlan) -> Option<Arc<dyn ExecutionPlan>>
123123
return Some(child);
124124
}
125125
}
126-
if let [ref childrens_child] = child.children().as_slice() {
126+
if let [childrens_child] = child.children().as_slice() {
127127
child = Arc::clone(childrens_child);
128128
} else {
129129
break;

datafusion/core/src/physical_optimizer/enforce_distribution.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1375,8 +1375,8 @@ pub(crate) mod tests {
13751375
vec![false]
13761376
}
13771377

1378-
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
1379-
vec![self.input.clone()]
1378+
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1379+
vec![&self.input]
13801380
}
13811381

13821382
// model that it requires the output ordering of its input

datafusion/core/src/physical_optimizer/enforce_sorting.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -567,7 +567,7 @@ fn remove_corresponding_sort_from_sub_plan(
567567
// Replace with variants that do not preserve order.
568568
if is_sort_preserving_merge(&node.plan) {
569569
node.children = node.children.swap_remove(0).children;
570-
node.plan = node.plan.children().swap_remove(0);
570+
node.plan = node.plan.children().swap_remove(0).clone();
571571
} else if let Some(repartition) =
572572
node.plan.as_any().downcast_ref::<RepartitionExec>()
573573
{

datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,15 +78,15 @@ impl LimitedDistinctAggregation {
7878
let mut is_global_limit = false;
7979
if let Some(local_limit) = plan.as_any().downcast_ref::<LocalLimitExec>() {
8080
limit = local_limit.fetch();
81-
children = local_limit.children();
81+
children = local_limit.children().into_iter().cloned().collect();
8282
} else if let Some(global_limit) = plan.as_any().downcast_ref::<GlobalLimitExec>()
8383
{
8484
global_fetch = global_limit.fetch();
8585
global_fetch?;
8686
global_skip = global_limit.skip();
8787
// the aggregate must read at least fetch+skip number of rows
8888
limit = global_fetch.unwrap() + global_skip;
89-
children = global_limit.children();
89+
children = global_limit.children().into_iter().cloned().collect();
9090
is_global_limit = true
9191
} else {
9292
return None;

datafusion/core/src/physical_optimizer/output_requirements.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -157,8 +157,8 @@ impl ExecutionPlan for OutputRequirementExec {
157157
vec![true]
158158
}
159159

160-
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
161-
vec![self.input.clone()]
160+
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
161+
vec![&self.input]
162162
}
163163

164164
fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
@@ -273,7 +273,7 @@ fn require_top_ordering_helper(
273273
// When an operator requires an ordering, any `SortExec` below can not
274274
// be responsible for (i.e. the originator of) the global ordering.
275275
let (new_child, is_changed) =
276-
require_top_ordering_helper(children.swap_remove(0))?;
276+
require_top_ordering_helper(children.swap_remove(0).clone())?;
277277
Ok((plan.with_new_children(vec![new_child])?, is_changed))
278278
} else {
279279
// Stop searching, there is no global ordering desired for the query.

0 commit comments

Comments
 (0)