Skip to content

Commit 21bce33

Browse files
authored
Improve UserDefinedLogicalNode::from_template API to return Result (#10575)
* UserDefinedLogicalNode::from_template return Result * Rename from_template to with_exprs_and_inputs * Resolve review comments
1 parent 37b0112 commit 21bce33

File tree

5 files changed

+48
-47
lines changed

5 files changed

+48
-47
lines changed

datafusion/expr/src/logical_plan/extension.rs

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
//! This module defines the interface for logical nodes
1919
use crate::{Expr, LogicalPlan};
20-
use datafusion_common::{DFSchema, DFSchemaRef};
20+
use datafusion_common::{DFSchema, DFSchemaRef, Result};
2121
use std::hash::{Hash, Hasher};
2222
use std::{any::Any, collections::HashSet, fmt, sync::Arc};
2323

@@ -76,27 +76,31 @@ pub trait UserDefinedLogicalNode: fmt::Debug + Send + Sync {
7676
/// For example: `TopK: k=10`
7777
fn fmt_for_explain(&self, f: &mut fmt::Formatter) -> fmt::Result;
7878

79-
/// Create a new `ExtensionPlanNode` with the specified children
79+
#[deprecated(since = "39.0.0", note = "use with_exprs_and_inputs instead")]
80+
#[allow(clippy::wrong_self_convention)]
81+
fn from_template(
82+
&self,
83+
exprs: &[Expr],
84+
inputs: &[LogicalPlan],
85+
) -> Arc<dyn UserDefinedLogicalNode> {
86+
self.with_exprs_and_inputs(exprs.to_vec(), inputs.to_vec())
87+
.unwrap()
88+
}
89+
90+
/// Create a new `UserDefinedLogicalNode` with the specified children
8091
/// and expressions. This function is used during optimization
8192
/// when the plan is being rewritten and a new instance of the
82-
/// `ExtensionPlanNode` must be created.
93+
/// `UserDefinedLogicalNode` must be created.
8394
///
8495
/// Note that exprs and inputs are in the same order as the result
8596
/// of self.inputs and self.exprs.
8697
///
87-
/// So, `self.from_template(exprs, ..).expressions() == exprs
88-
//
89-
// TODO(clippy): This should probably be renamed to use a `with_*` prefix. Something
90-
// like `with_template`, or `with_exprs_and_inputs`.
91-
//
92-
// Also, I think `ExtensionPlanNode` has been renamed to `UserDefinedLogicalNode`
93-
// but the doc comments have not been updated.
94-
#[allow(clippy::wrong_self_convention)]
95-
fn from_template(
98+
/// So, `self.with_exprs_and_inputs(exprs, ..).expressions() == exprs
99+
fn with_exprs_and_inputs(
96100
&self,
97-
exprs: &[Expr],
98-
inputs: &[LogicalPlan],
99-
) -> Arc<dyn UserDefinedLogicalNode>;
101+
exprs: Vec<Expr>,
102+
inputs: Vec<LogicalPlan>,
103+
) -> Result<Arc<dyn UserDefinedLogicalNode>>;
100104

101105
/// Returns the necessary input columns for this node required to compute
102106
/// the columns in the output schema
@@ -312,12 +316,12 @@ impl<T: UserDefinedLogicalNodeCore> UserDefinedLogicalNode for T {
312316
self.fmt_for_explain(f)
313317
}
314318

315-
fn from_template(
319+
fn with_exprs_and_inputs(
316320
&self,
317-
exprs: &[Expr],
318-
inputs: &[LogicalPlan],
319-
) -> Arc<dyn UserDefinedLogicalNode> {
320-
Arc::new(self.from_template(exprs, inputs))
321+
exprs: Vec<Expr>,
322+
inputs: Vec<LogicalPlan>,
323+
) -> Result<Arc<dyn UserDefinedLogicalNode>> {
324+
Ok(Arc::new(self.from_template(&exprs, &inputs)))
321325
}
322326

323327
fn necessary_children_exprs(

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -625,7 +625,7 @@ impl LogicalPlan {
625625
let expr = node.expressions();
626626
let inputs: Vec<_> = node.inputs().into_iter().cloned().collect();
627627
Ok(LogicalPlan::Extension(Extension {
628-
node: node.from_template(&expr, &inputs),
628+
node: node.with_exprs_and_inputs(expr, inputs)?,
629629
}))
630630
}
631631
LogicalPlan::Union(Union { inputs, schema }) => {
@@ -923,7 +923,7 @@ impl LogicalPlan {
923923
definition: definition.clone(),
924924
}))),
925925
LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension {
926-
node: e.node.from_template(&expr, &inputs),
926+
node: e.node.with_exprs_and_inputs(expr, inputs)?,
927927
})),
928928
LogicalPlan::Union(Union { schema, .. }) => {
929929
let input_schema = inputs[0].schema();

datafusion/expr/src/logical_plan/tree_node.rs

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -417,7 +417,7 @@ where
417417
.map_data(|new_inputs| {
418418
let exprs = node.expressions();
419419
Ok(Extension {
420-
node: node.from_template(&exprs, &new_inputs),
420+
node: node.with_exprs_and_inputs(exprs, new_inputs)?,
421421
})
422422
})
423423
}
@@ -658,22 +658,18 @@ impl LogicalPlan {
658658
LogicalPlan::Extension(Extension { node }) => {
659659
// would be nice to avoid this copy -- maybe can
660660
// update extension to just observer Exprs
661-
node.expressions()
661+
let exprs = node
662+
.expressions()
662663
.into_iter()
663-
.map_until_stop_and_collect(f)?
664-
.update_data(|exprs| {
665-
LogicalPlan::Extension(Extension {
666-
node: UserDefinedLogicalNode::from_template(
667-
node.as_ref(),
668-
exprs.as_slice(),
669-
node.inputs()
670-
.into_iter()
671-
.cloned()
672-
.collect::<Vec<_>>()
673-
.as_slice(),
674-
),
675-
})
676-
})
664+
.map_until_stop_and_collect(f)?;
665+
let plan = LogicalPlan::Extension(Extension {
666+
node: UserDefinedLogicalNode::with_exprs_and_inputs(
667+
node.as_ref(),
668+
exprs.data,
669+
node.inputs().into_iter().cloned().collect::<Vec<_>>(),
670+
)?,
671+
});
672+
Transformed::new(plan, exprs.transformed, exprs.tnr)
677673
}
678674
LogicalPlan::TableScan(TableScan {
679675
table_name,

datafusion/substrait/src/logical_plan/consumer.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -551,7 +551,8 @@ pub async fn from_substrait_rel(
551551
);
552552
};
553553
let input_plan = from_substrait_rel(ctx, input_rel, extensions).await?;
554-
let plan = plan.from_template(&plan.expressions(), &[input_plan]);
554+
let plan =
555+
plan.with_exprs_and_inputs(plan.expressions(), vec![input_plan])?;
555556
Ok(LogicalPlan::Extension(Extension { node: plan }))
556557
}
557558
Some(RelType::ExtensionMulti(extension)) => {
@@ -567,7 +568,7 @@ pub async fn from_substrait_rel(
567568
let input_plan = from_substrait_rel(ctx, input, extensions).await?;
568569
inputs.push(input_plan);
569570
}
570-
let plan = plan.from_template(&plan.expressions(), &inputs);
571+
let plan = plan.with_exprs_and_inputs(plan.expressions(), inputs)?;
571572
Ok(LogicalPlan::Extension(Extension { node: plan }))
572573
}
573574
Some(RelType::Exchange(exchange)) => {

datafusion/substrait/tests/cases/roundtrip_logical_plan.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -110,16 +110,16 @@ impl UserDefinedLogicalNode for MockUserDefinedLogicalPlan {
110110
)
111111
}
112112

113-
fn from_template(
113+
fn with_exprs_and_inputs(
114114
&self,
115-
_: &[Expr],
116-
inputs: &[LogicalPlan],
117-
) -> Arc<dyn UserDefinedLogicalNode> {
118-
Arc::new(Self {
115+
_: Vec<Expr>,
116+
inputs: Vec<LogicalPlan>,
117+
) -> Result<Arc<dyn UserDefinedLogicalNode>> {
118+
Ok(Arc::new(Self {
119119
validation_bytes: self.validation_bytes.clone(),
120-
inputs: inputs.to_vec(),
120+
inputs,
121121
empty_schema: Arc::new(DFSchema::empty()),
122-
})
122+
}))
123123
}
124124

125125
fn dyn_hash(&self, _: &mut dyn std::hash::Hasher) {

0 commit comments

Comments
 (0)