Skip to content

Commit 7a7c8cb

Browse files
authored
refactor: move PREPARE/EXECUTE into LogicalPlan::Statement (#13311)
* refactor: move PREPARE/EXECUTE into `LogicalPlan::Statement` * Fix cargo doc * Restore TableScan * Fix cargo doc
1 parent d77af59 commit 7a7c8cb

File tree

17 files changed

+210
-251
lines changed

17 files changed

+210
-251
lines changed

datafusion/core/src/execution/context/mod.rs

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -688,11 +688,11 @@ impl SessionContext {
688688
LogicalPlan::Statement(Statement::SetVariable(stmt)) => {
689689
self.set_variable(stmt).await
690690
}
691-
LogicalPlan::Prepare(Prepare {
691+
LogicalPlan::Statement(Statement::Prepare(Prepare {
692692
name,
693693
input,
694694
data_types,
695-
}) => {
695+
})) => {
696696
// The number of parameters must match the specified data types length.
697697
if !data_types.is_empty() {
698698
let param_names = input.get_parameter_names()?;
@@ -712,7 +712,9 @@ impl SessionContext {
712712
self.state.write().store_prepared(name, data_types, input)?;
713713
self.return_empty_dataframe()
714714
}
715-
LogicalPlan::Execute(execute) => self.execute_prepared(execute),
715+
LogicalPlan::Statement(Statement::Execute(execute)) => {
716+
self.execute_prepared(execute)
717+
}
716718
plan => Ok(DataFrame::new(self.state(), plan)),
717719
}
718720
}
@@ -1773,14 +1775,6 @@ impl<'n, 'a> TreeNodeVisitor<'n> for BadPlanVisitor<'a> {
17731775
LogicalPlan::Statement(stmt) if !self.options.allow_statements => {
17741776
plan_err!("Statement not supported: {}", stmt.name())
17751777
}
1776-
// TODO: Implement PREPARE as a LogicalPlan::Statement
1777-
LogicalPlan::Prepare(_) if !self.options.allow_statements => {
1778-
plan_err!("Statement not supported: PREPARE")
1779-
}
1780-
// TODO: Implement EXECUTE as a LogicalPlan::Statement
1781-
LogicalPlan::Execute(_) if !self.options.allow_statements => {
1782-
plan_err!("Statement not supported: EXECUTE")
1783-
}
17841778
_ => Ok(TreeNodeRecursion::Continue),
17851779
}
17861780
}

datafusion/core/src/physical_planner.rs

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1195,15 +1195,6 @@ impl DefaultPhysicalPlanner {
11951195
let name = statement.name();
11961196
return not_impl_err!("Unsupported logical plan: Statement({name})");
11971197
}
1198-
LogicalPlan::Prepare(_) => {
1199-
// There is no default plan for "PREPARE" -- it must be
1200-
// handled at a higher level (so that the appropriate
1201-
// statement can be prepared)
1202-
return not_impl_err!("Unsupported logical plan: Prepare");
1203-
}
1204-
LogicalPlan::Execute(_) => {
1205-
return not_impl_err!("Unsupported logical plan: Execute");
1206-
}
12071198
LogicalPlan::Dml(dml) => {
12081199
// DataFusion is a read-only query engine, but also a library, so consumers may implement this
12091200
return not_impl_err!("Unsupported logical plan: Dml({0})", dml.op);

datafusion/core/tests/sql/sql_api.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,12 +124,12 @@ async fn disable_prepare_and_execute_statement() {
124124
let df = ctx.sql_with_options(prepare_sql, options).await;
125125
assert_eq!(
126126
df.unwrap_err().strip_backtrace(),
127-
"Error during planning: Statement not supported: PREPARE"
127+
"Error during planning: Statement not supported: Prepare"
128128
);
129129
let df = ctx.sql_with_options(execute_sql, options).await;
130130
assert_eq!(
131131
df.unwrap_err().strip_backtrace(),
132-
"Error during planning: Statement not supported: EXECUTE"
132+
"Error during planning: Statement not supported: Execute"
133133
);
134134

135135
let options = options.with_allow_statements(true);

datafusion/expr/src/expr_rewriter/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,7 @@ impl NamePreserver {
314314
| LogicalPlan::Join(_)
315315
| LogicalPlan::TableScan(_)
316316
| LogicalPlan::Limit(_)
317-
| LogicalPlan::Execute(_)
317+
| LogicalPlan::Statement(_)
318318
),
319319
}
320320
}

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ use crate::utils::{
4242
};
4343
use crate::{
4444
and, binary_expr, lit, DmlStatement, Expr, ExprSchemable, Operator, RecursiveQuery,
45-
TableProviderFilterPushDown, TableSource, WriteOp,
45+
Statement, TableProviderFilterPushDown, TableSource, WriteOp,
4646
};
4747

4848
use super::dml::InsertOp;
@@ -500,11 +500,13 @@ impl LogicalPlanBuilder {
500500

501501
/// Make a builder for a prepare logical plan from the builder's plan
502502
pub fn prepare(self, name: String, data_types: Vec<DataType>) -> Result<Self> {
503-
Ok(Self::new(LogicalPlan::Prepare(Prepare {
504-
name,
505-
data_types,
506-
input: self.plan,
507-
})))
503+
Ok(Self::new(LogicalPlan::Statement(Statement::Prepare(
504+
Prepare {
505+
name,
506+
data_types,
507+
input: self.plan,
508+
},
509+
))))
508510
}
509511

510512
/// Limit the number of rows returned

datafusion/expr/src/logical_plan/display.rs

Lines changed: 4 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@ use std::collections::HashMap;
2121
use std::fmt;
2222

2323
use crate::{
24-
expr_vec_fmt, Aggregate, DescribeTable, Distinct, DistinctOn, DmlStatement, Execute,
25-
Expr, Filter, Join, Limit, LogicalPlan, Partitioning, Prepare, Projection,
26-
RecursiveQuery, Repartition, Sort, Subquery, SubqueryAlias,
27-
TableProviderFilterPushDown, TableScan, Unnest, Values, Window,
24+
expr_vec_fmt, Aggregate, DescribeTable, Distinct, DistinctOn, DmlStatement, Expr,
25+
Filter, Join, Limit, LogicalPlan, Partitioning, Projection, RecursiveQuery,
26+
Repartition, Sort, Subquery, SubqueryAlias, TableProviderFilterPushDown, TableScan,
27+
Unnest, Values, Window,
2828
};
2929

3030
use crate::dml::CopyTo;
@@ -618,24 +618,6 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> {
618618
"Detail": format!("{:?}", e.node)
619619
})
620620
}
621-
LogicalPlan::Prepare(Prepare {
622-
name, data_types, ..
623-
}) => {
624-
json!({
625-
"Node Type": "Prepare",
626-
"Name": name,
627-
"Data Types": format!("{:?}", data_types)
628-
})
629-
}
630-
LogicalPlan::Execute(Execute {
631-
name, parameters, ..
632-
}) => {
633-
json!({
634-
"Node Type": "Execute",
635-
"Name": name,
636-
"Parameters": expr_vec_fmt!(parameters),
637-
})
638-
}
639621
LogicalPlan::DescribeTable(DescribeTable { .. }) => {
640622
json!({
641623
"Node Type": "DescribeTable"

datafusion/expr/src/logical_plan/mod.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,14 @@ pub use ddl::{
3636
pub use dml::{DmlStatement, WriteOp};
3737
pub use plan::{
3838
projection_schema, Aggregate, Analyze, ColumnUnnestList, DescribeTable, Distinct,
39-
DistinctOn, EmptyRelation, Execute, Explain, Extension, FetchType, Filter, Join,
40-
JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare,
41-
Projection, RecursiveQuery, Repartition, SkipType, Sort, StringifiedPlan, Subquery,
39+
DistinctOn, EmptyRelation, Explain, Extension, FetchType, Filter, Join,
40+
JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Projection,
41+
RecursiveQuery, Repartition, SkipType, Sort, StringifiedPlan, Subquery,
4242
SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window,
4343
};
4444
pub use statement::{
45-
SetVariable, Statement, TransactionAccessMode, TransactionConclusion, TransactionEnd,
46-
TransactionIsolationLevel, TransactionStart,
45+
Execute, Prepare, SetVariable, Statement, TransactionAccessMode,
46+
TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart,
4747
};
4848

4949
pub use display::display_schema;

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 27 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,9 @@ use crate::utils::{
3939
split_conjunction,
4040
};
4141
use crate::{
42-
build_join_schema, expr_vec_fmt, BinaryExpr, CreateMemoryTable, CreateView, Expr,
43-
ExprSchemable, LogicalPlanBuilder, Operator, TableProviderFilterPushDown,
44-
TableSource, WindowFunctionDefinition,
42+
build_join_schema, expr_vec_fmt, BinaryExpr, CreateMemoryTable, CreateView, Execute,
43+
Expr, ExprSchemable, LogicalPlanBuilder, Operator, Prepare,
44+
TableProviderFilterPushDown, TableSource, WindowFunctionDefinition,
4545
};
4646

4747
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
@@ -262,11 +262,6 @@ pub enum LogicalPlan {
262262
/// Remove duplicate rows from the input. This is used to
263263
/// implement SQL `SELECT DISTINCT ...`.
264264
Distinct(Distinct),
265-
/// Prepare a statement and find any bind parameters
266-
/// (e.g. `?`). This is used to implement SQL-prepared statements.
267-
Prepare(Prepare),
268-
/// Execute a prepared statement. This is used to implement SQL 'EXECUTE'.
269-
Execute(Execute),
270265
/// Data Manipulation Language (DML): Insert / Update / Delete
271266
Dml(DmlStatement),
272267
/// Data Definition Language (DDL): CREATE / DROP TABLES / VIEWS / SCHEMAS
@@ -314,8 +309,6 @@ impl LogicalPlan {
314309
LogicalPlan::Statement(statement) => statement.schema(),
315310
LogicalPlan::Subquery(Subquery { subquery, .. }) => subquery.schema(),
316311
LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema,
317-
LogicalPlan::Prepare(Prepare { input, .. }) => input.schema(),
318-
LogicalPlan::Execute(Execute { schema, .. }) => schema,
319312
LogicalPlan::Explain(explain) => &explain.schema,
320313
LogicalPlan::Analyze(analyze) => &analyze.schema,
321314
LogicalPlan::Extension(extension) => extension.node.schema(),
@@ -448,18 +441,16 @@ impl LogicalPlan {
448441
LogicalPlan::Copy(copy) => vec![&copy.input],
449442
LogicalPlan::Ddl(ddl) => ddl.inputs(),
450443
LogicalPlan::Unnest(Unnest { input, .. }) => vec![input],
451-
LogicalPlan::Prepare(Prepare { input, .. }) => vec![input],
452444
LogicalPlan::RecursiveQuery(RecursiveQuery {
453445
static_term,
454446
recursive_term,
455447
..
456448
}) => vec![static_term, recursive_term],
449+
LogicalPlan::Statement(stmt) => stmt.inputs(),
457450
// plans without inputs
458451
LogicalPlan::TableScan { .. }
459-
| LogicalPlan::Statement { .. }
460452
| LogicalPlan::EmptyRelation { .. }
461453
| LogicalPlan::Values { .. }
462-
| LogicalPlan::Execute { .. }
463454
| LogicalPlan::DescribeTable(_) => vec![],
464455
}
465456
}
@@ -562,8 +553,6 @@ impl LogicalPlan {
562553
}
563554
LogicalPlan::Subquery(_) => Ok(None),
564555
LogicalPlan::EmptyRelation(_)
565-
| LogicalPlan::Prepare(_)
566-
| LogicalPlan::Execute(_)
567556
| LogicalPlan::Statement(_)
568557
| LogicalPlan::Values(_)
569558
| LogicalPlan::Explain(_)
@@ -715,8 +704,6 @@ impl LogicalPlan {
715704
LogicalPlan::RecursiveQuery(_) => Ok(self),
716705
LogicalPlan::Analyze(_) => Ok(self),
717706
LogicalPlan::Explain(_) => Ok(self),
718-
LogicalPlan::Prepare(_) => Ok(self),
719-
LogicalPlan::Execute(_) => Ok(self),
720707
LogicalPlan::TableScan(_) => Ok(self),
721708
LogicalPlan::EmptyRelation(_) => Ok(self),
722709
LogicalPlan::Statement(_) => Ok(self),
@@ -1070,24 +1057,25 @@ impl LogicalPlan {
10701057
logical_optimization_succeeded: e.logical_optimization_succeeded,
10711058
}))
10721059
}
1073-
LogicalPlan::Prepare(Prepare {
1074-
name, data_types, ..
1075-
}) => {
1060+
LogicalPlan::Statement(Statement::Prepare(Prepare {
1061+
name,
1062+
data_types,
1063+
..
1064+
})) => {
10761065
self.assert_no_expressions(expr)?;
10771066
let input = self.only_input(inputs)?;
1078-
Ok(LogicalPlan::Prepare(Prepare {
1067+
Ok(LogicalPlan::Statement(Statement::Prepare(Prepare {
10791068
name: name.clone(),
10801069
data_types: data_types.clone(),
10811070
input: Arc::new(input),
1082-
}))
1071+
})))
10831072
}
1084-
LogicalPlan::Execute(Execute { name, schema, .. }) => {
1073+
LogicalPlan::Statement(Statement::Execute(Execute { name, .. })) => {
10851074
self.assert_no_inputs(inputs)?;
1086-
Ok(LogicalPlan::Execute(Execute {
1075+
Ok(LogicalPlan::Statement(Statement::Execute(Execute {
10871076
name: name.clone(),
1088-
schema: Arc::clone(schema),
10891077
parameters: expr,
1090-
}))
1078+
})))
10911079
}
10921080
LogicalPlan::TableScan(ts) => {
10931081
self.assert_no_inputs(inputs)?;
@@ -1184,8 +1172,8 @@ impl LogicalPlan {
11841172
/// Replaces placeholder param values (like `$1`, `$2`) in [`LogicalPlan`]
11851173
/// with the specified `param_values`.
11861174
///
1187-
/// [`LogicalPlan::Prepare`] are
1188-
/// converted to their inner logical plan for execution.
1175+
/// [`Prepare`] statements are converted to
1176+
/// their inner logical plan for execution.
11891177
///
11901178
/// # Example
11911179
/// ```
@@ -1242,13 +1230,17 @@ impl LogicalPlan {
12421230
let plan_with_values = self.replace_params_with_values(&param_values)?;
12431231

12441232
// unwrap Prepare
1245-
Ok(if let LogicalPlan::Prepare(prepare_lp) = plan_with_values {
1246-
param_values.verify(&prepare_lp.data_types)?;
1247-
// try and take ownership of the input if is not shared, clone otherwise
1248-
Arc::unwrap_or_clone(prepare_lp.input)
1249-
} else {
1250-
plan_with_values
1251-
})
1233+
Ok(
1234+
if let LogicalPlan::Statement(Statement::Prepare(prepare_lp)) =
1235+
plan_with_values
1236+
{
1237+
param_values.verify(&prepare_lp.data_types)?;
1238+
// try and take ownership of the input if is not shared, clone otherwise
1239+
Arc::unwrap_or_clone(prepare_lp.input)
1240+
} else {
1241+
plan_with_values
1242+
},
1243+
)
12521244
}
12531245

12541246
/// Returns the maximum number of rows that this plan can output, if known.
@@ -1346,8 +1338,6 @@ impl LogicalPlan {
13461338
| LogicalPlan::Dml(_)
13471339
| LogicalPlan::Copy(_)
13481340
| LogicalPlan::DescribeTable(_)
1349-
| LogicalPlan::Prepare(_)
1350-
| LogicalPlan::Execute(_)
13511341
| LogicalPlan::Statement(_)
13521342
| LogicalPlan::Extension(_) => None,
13531343
}
@@ -1962,14 +1952,6 @@ impl LogicalPlan {
19621952
LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
19631953
LogicalPlan::Union(_) => write!(f, "Union"),
19641954
LogicalPlan::Extension(e) => e.node.fmt_for_explain(f),
1965-
LogicalPlan::Prepare(Prepare {
1966-
name, data_types, ..
1967-
}) => {
1968-
write!(f, "Prepare: {name:?} {data_types:?} ")
1969-
}
1970-
LogicalPlan::Execute(Execute { name, parameters, .. }) => {
1971-
write!(f, "Execute: {} params=[{}]", name, expr_vec_fmt!(parameters))
1972-
}
19731955
LogicalPlan::DescribeTable(DescribeTable { .. }) => {
19741956
write!(f, "DescribeTable")
19751957
}
@@ -2624,39 +2606,6 @@ impl PartialOrd for Union {
26242606
}
26252607
}
26262608

2627-
/// Prepare a statement but do not execute it. Prepare statements can have 0 or more
2628-
/// `Expr::Placeholder` expressions that are filled in during execution
2629-
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
2630-
pub struct Prepare {
2631-
/// The name of the statement
2632-
pub name: String,
2633-
/// Data types of the parameters ([`Expr::Placeholder`])
2634-
pub data_types: Vec<DataType>,
2635-
/// The logical plan of the statements
2636-
pub input: Arc<LogicalPlan>,
2637-
}
2638-
2639-
/// Execute a prepared statement.
2640-
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
2641-
pub struct Execute {
2642-
/// The name of the prepared statement to execute
2643-
pub name: String,
2644-
/// The execute parameters
2645-
pub parameters: Vec<Expr>,
2646-
/// Dummy schema
2647-
pub schema: DFSchemaRef,
2648-
}
2649-
2650-
// Comparison excludes the `schema` field.
2651-
impl PartialOrd for Execute {
2652-
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
2653-
match self.name.partial_cmp(&other.name) {
2654-
Some(Ordering::Equal) => self.parameters.partial_cmp(&other.parameters),
2655-
cmp => cmp,
2656-
}
2657-
}
2658-
}
2659-
26602609
/// Describe the schema of table
26612610
///
26622611
/// # Example output:

0 commit comments

Comments
 (0)