Add support for recursive CTEs #7581
Changes from all commits
4418ad2
dac7f22
daa995c
aa6d74a
5839dd1
d8af7fb
515312c
f9faa05
38e95dd
219de0c
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -112,6 +112,8 @@ pub enum LogicalPlan { | |
/// produces 0 or 1 row. This is used to implement SQL `SELECT` | ||
/// that has no values in the `FROM` clause. | ||
EmptyRelation(EmptyRelation), | ||
/// A named temporary relation with a schema. | ||
NamedRelation(NamedRelation), | ||
I am considering whether the Use But this idea is in its early stages and may be wrong.
@jonahgao, could you provide the rationale for your suggested strategy? I'm interested in understanding why it might be more effective than the current implementation. Performance is critical to our use case, and the implementation for recursion is very sensitive to performance considerations, because the setup for execution and stream management isn't amortized over all input record batches. Instead, it's incurred with each iteration. For instance, we've observed a substantial performance boost (up to 30 times faster) by eliminating certain intermediate nodes, like coalesce, from our plan (as evidenced in this PR). I've drafted another PR that appears to again double the speed of execution merely by omitting metric collection in recursive sub-graphs.
One rationale might be to make the implementation simpler -- if we could implement the recursive relation as a table provider, it would likely allow the changes to be more localized / smaller (e.g. maybe we could reuse
Basically I understand the need to have
@matthewgapp Another rationale might be to support pushing down filters to the working table, which may be useful if we support spilling the working table to disk in the future. I think the performance should not be affected; the execution of physical plans is almost the same as it is now. I implemented a demo on this branch and in this commit. GitHub does not allow forking a repository twice, so I directly pushed it to another repository for convenience. In this demo, I attempted to replace the Another change is that I used a structure called The
Wow, tyty! I was in the process of implementing the shared table, and my implementation turned out very similar to yours, although I ended up working around the crate dependency graph constraints a bit differently by introducing a couple of new traits. But I did end up exposing a method on the context to generate a table. I like your approach better. I tested out your POC, and performance remains about the same between my previous implementation and your new worktable approach (which makes sense). I'm going to go ahead and work based on your POC toward the list of PRs that Andrew wants to get this landed.
Thank you for your work and for the upcoming contributions! @matthewgapp |
||
/// Produces the output of running another query. This is used to | ||
/// implement SQL subqueries | ||
Subquery(Subquery), | ||
|
@@ -154,6 +156,8 @@ pub enum LogicalPlan { | |
/// Unnest a column that contains a nested list type such as an | ||
/// ARRAY. This is used to implement SQL `UNNEST` | ||
Unnest(Unnest), | ||
/// A recursive query (e.g. a recursive CTE), made of a static term and a recursive term | ||
RecursiveQuery(RecursiveQuery), | ||
} | ||
|
||
impl LogicalPlan { | ||
|
@@ -191,6 +195,11 @@ impl LogicalPlan { | |
LogicalPlan::Copy(CopyTo { input, .. }) => input.schema(), | ||
LogicalPlan::Ddl(ddl) => ddl.schema(), | ||
LogicalPlan::Unnest(Unnest { schema, .. }) => schema, | ||
LogicalPlan::NamedRelation(NamedRelation { schema, .. }) => schema, | ||
LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => { | ||
// we take the schema of the static term as the schema of the entire recursive query | ||
static_term.schema() | ||
} | ||
} | ||
} | ||
|
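The hunk above makes the schema of a `RecursiveQuery` the schema of its static term. A minimal sketch of that delegation follows, assuming a toy `Plan` enum and a `Schema` that is just a list of column names; both are hypothetical stand-ins for illustration, not DataFusion's `LogicalPlan` or `DFSchema`.

```rust
use std::sync::Arc;

/// Toy schema: an ordered list of column names (hypothetical, not `DFSchema`).
type Schema = Vec<String>;

/// Toy plan with only the variants needed for this illustration.
#[allow(dead_code)] // `recursive_term` is stored but not read in this sketch
enum Plan {
    /// Leaf node that carries its own schema.
    Values { schema: Schema },
    /// Mirrors the shape of `RecursiveQuery`: two child plans.
    Recursive {
        static_term: Arc<Plan>,
        recursive_term: Arc<Plan>,
    },
}

impl Plan {
    /// The schema of a recursive node is taken from its static term;
    /// the recursive term's own schema is not consulted.
    fn schema(&self) -> &Schema {
        match self {
            Plan::Values { schema } => schema,
            Plan::Recursive { static_term, .. } => static_term.schema(),
        }
    }
}
```

In this sketch nothing checks that the recursive term produces compatible columns; whether and where such a check belongs is outside what the diff shows.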
||
|
@@ -233,6 +242,7 @@ impl LogicalPlan { | |
LogicalPlan::Explain(_) | ||
| LogicalPlan::Analyze(_) | ||
| LogicalPlan::EmptyRelation(_) | ||
| LogicalPlan::NamedRelation(_) | ||
| LogicalPlan::Ddl(_) | ||
| LogicalPlan::Dml(_) | ||
| LogicalPlan::Copy(_) | ||
|
@@ -243,6 +253,10 @@ impl LogicalPlan { | |
| LogicalPlan::TableScan(_) => { | ||
vec![self.schema()] | ||
} | ||
LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => { | ||
// return only the schemas of the static term | ||
static_term.all_schemas() | ||
} | ||
// return children schemas | ||
LogicalPlan::Limit(_) | ||
| LogicalPlan::Subquery(_) | ||
|
@@ -384,6 +398,9 @@ impl LogicalPlan { | |
.try_for_each(f), | ||
// plans without expressions | ||
LogicalPlan::EmptyRelation(_) | ||
| LogicalPlan::NamedRelation(_) | ||
// TODO: not sure if this should go here | ||
| LogicalPlan::RecursiveQuery(_) | ||
| LogicalPlan::Subquery(_) | ||
| LogicalPlan::SubqueryAlias(_) | ||
| LogicalPlan::Limit(_) | ||
|
@@ -430,8 +447,14 @@ impl LogicalPlan { | |
LogicalPlan::Ddl(ddl) => ddl.inputs(), | ||
LogicalPlan::Unnest(Unnest { input, .. }) => vec![input], | ||
LogicalPlan::Prepare(Prepare { input, .. }) => vec![input], | ||
LogicalPlan::RecursiveQuery(RecursiveQuery { | ||
static_term, | ||
recursive_term, | ||
.. | ||
}) => vec![static_term, recursive_term], | ||
// plans without inputs | ||
LogicalPlan::TableScan { .. } | ||
| LogicalPlan::NamedRelation(_) | ||
| LogicalPlan::Statement { .. } | ||
| LogicalPlan::EmptyRelation { .. } | ||
| LogicalPlan::Values { .. } | ||
|
@@ -510,6 +533,9 @@ impl LogicalPlan { | |
cross.left.head_output_expr() | ||
} | ||
} | ||
LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => { | ||
static_term.head_output_expr() | ||
} | ||
LogicalPlan::Union(union) => Ok(Some(Expr::Column( | ||
union.schema.fields()[0].qualified_column(), | ||
))), | ||
|
@@ -529,6 +555,7 @@ impl LogicalPlan { | |
} | ||
LogicalPlan::Subquery(_) => Ok(None), | ||
LogicalPlan::EmptyRelation(_) | ||
| LogicalPlan::NamedRelation(_) | ||
| LogicalPlan::Prepare(_) | ||
| LogicalPlan::Statement(_) | ||
| LogicalPlan::Values(_) | ||
|
@@ -835,6 +862,14 @@ impl LogicalPlan { | |
}; | ||
Ok(LogicalPlan::Distinct(distinct)) | ||
} | ||
LogicalPlan::RecursiveQuery(RecursiveQuery { | ||
name, is_distinct, .. | ||
}) => Ok(LogicalPlan::RecursiveQuery(RecursiveQuery { | ||
name: name.clone(), | ||
static_term: Arc::new(inputs[0].clone()), | ||
recursive_term: Arc::new(inputs[1].clone()), | ||
is_distinct: *is_distinct, | ||
})), | ||
LogicalPlan::Analyze(a) => { | ||
assert!(expr.is_empty()); | ||
assert_eq!(inputs.len(), 1); | ||
|
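The two hunks above rely on a positional contract: `inputs()` returns the static term first and the recursive term second, and the rebuild step reads them back as `inputs[0]` and `inputs[1]`. The sketch below illustrates that round trip on a toy `Plan` enum (hypothetical, not DataFusion's `LogicalPlan`). The explicit length check is an addition of this sketch; the diff indexes the slice directly.

```rust
use std::sync::Arc;

/// Toy plan type used only for this illustration.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Leaf(String),
    Recursive {
        name: String,
        static_term: Arc<Plan>,
        recursive_term: Arc<Plan>,
        is_distinct: bool,
    },
}

impl Plan {
    /// Children in a fixed order: static term first, recursive term second.
    fn inputs(&self) -> Vec<&Plan> {
        match self {
            Plan::Leaf(_) => vec![],
            Plan::Recursive { static_term, recursive_term, .. } => {
                vec![static_term.as_ref(), recursive_term.as_ref()]
            }
        }
    }

    /// Rebuild the node from new children, keeping `name` and `is_distinct`.
    /// Returns an error instead of panicking when the input count is wrong.
    fn with_new_inputs(&self, inputs: &[Plan]) -> Result<Plan, String> {
        match self {
            Plan::Leaf(_) if inputs.is_empty() => Ok(self.clone()),
            Plan::Leaf(_) => Err(format!("expected 0 inputs, got {}", inputs.len())),
            Plan::Recursive { name, is_distinct, .. } => match inputs {
                [static_term, recursive_term] => Ok(Plan::Recursive {
                    name: name.clone(),
                    static_term: Arc::new(static_term.clone()),
                    recursive_term: Arc::new(recursive_term.clone()),
                    is_distinct: *is_distinct,
                }),
                _ => Err(format!("expected 2 inputs, got {}", inputs.len())),
            },
        }
    }
}
```

Matching on the slice pattern `[static_term, recursive_term]` keeps the ordering assumption visible in one place.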
@@ -873,6 +908,7 @@ impl LogicalPlan { | |
})) | ||
} | ||
LogicalPlan::EmptyRelation(_) | ||
| LogicalPlan::NamedRelation(_) | ||
| LogicalPlan::Ddl(_) | ||
| LogicalPlan::Statement(_) => { | ||
// All of these plan types have no inputs / exprs so should not be called | ||
|
@@ -1073,6 +1109,9 @@ impl LogicalPlan { | |
}), | ||
LogicalPlan::TableScan(TableScan { fetch, .. }) => *fetch, | ||
LogicalPlan::EmptyRelation(_) => Some(0), | ||
// TODO: not sure if this is correct | ||
LogicalPlan::NamedRelation(_) => None, | ||
LogicalPlan::RecursiveQuery(_) => None, | ||
LogicalPlan::Subquery(_) => None, | ||
LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => input.max_rows(), | ||
LogicalPlan::Limit(Limit { fetch, .. }) => *fetch, | ||
|
@@ -1408,6 +1447,14 @@ impl LogicalPlan { | |
fn fmt(&self, f: &mut Formatter) -> fmt::Result { | ||
match self.0 { | ||
LogicalPlan::EmptyRelation(_) => write!(f, "EmptyRelation"), | ||
LogicalPlan::NamedRelation(NamedRelation { name, .. }) => { | ||
write!(f, "NamedRelation: {}", name) | ||
} | ||
LogicalPlan::RecursiveQuery(RecursiveQuery { | ||
is_distinct, .. | ||
}) => { | ||
write!(f, "RecursiveQuery: is_distinct={}", is_distinct) | ||
} | ||
LogicalPlan::Values(Values { ref values, .. }) => { | ||
let str_values: Vec<_> = values | ||
.iter() | ||
|
@@ -1718,6 +1765,28 @@ pub struct EmptyRelation { | |
pub schema: DFSchemaRef, | ||
} | ||
|
||
/// A named temporary relation with a known schema. | ||
#[derive(Clone, PartialEq, Eq, Hash)] | ||
pub struct NamedRelation { | ||
/// The relation name | ||
pub name: String, | ||
/// The schema description | ||
pub schema: DFSchemaRef, | ||
} | ||
|
||
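/// A recursive query operation (e.g. a recursive CTE) | ||
#[derive(Clone, PartialEq, Eq, Hash)] | ||
pub struct RecursiveQuery { | ||
/// Name of the query | ||
pub name: String, | ||
/// The static term (initial contents of the working table) | ||
pub static_term: Arc<LogicalPlan>, | ||
/// The recursive term (evaluated against the working table until it | ||
/// returns no rows) | ||
pub recursive_term: Arc<LogicalPlan>, | ||
/// Whether the output is deduplicated (`UNION`) or not (`UNION ALL`) | ||
pub is_distinct: bool, | ||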
} | ||
|
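To make the roles of `static_term`, `recursive_term`, and `is_distinct` concrete, here is a rough sketch of the usual iterative evaluation of a recursive CTE over a working table. It is not the execution code from this PR: rows are plain `i64` values, the recursive term is a closure, and `eval_recursive` and its `max_iterations` guard are hypothetical names introduced only for this illustration.

```rust
use std::collections::HashSet;

/// Evaluate a toy recursive query.
///
/// * `static_term` seeds both the output and the working table.
/// * `recursive_term` is applied to the current working table on each iteration.
/// * `is_distinct` drops rows that were already produced (`UNION` semantics);
///   otherwise all rows are kept (`UNION ALL` semantics).
/// * `max_iterations` is a safety guard added for this sketch only.
fn eval_recursive<F>(
    static_term: Vec<i64>,
    recursive_term: F,
    is_distinct: bool,
    max_iterations: usize,
) -> Vec<i64>
where
    F: Fn(&[i64]) -> Vec<i64>,
{
    let mut seen: HashSet<i64> = HashSet::new();
    let mut output = Vec::new();
    let mut working = Vec::new();

    // Seed the output and the working table from the static term.
    for row in static_term {
        if !is_distinct || seen.insert(row) {
            output.push(row);
            working.push(row);
        }
    }

    // Re-run the recursive term until it yields no new rows.
    let mut iterations = 0;
    while !working.is_empty() && iterations < max_iterations {
        let mut next = Vec::new();
        for row in recursive_term(&working) {
            if !is_distinct || seen.insert(row) {
                next.push(row);
            }
        }
        output.extend_from_slice(&next);
        working = next; // the new rows become the next working table
        iterations += 1;
    }
    output
}
```

For example, a static term of `vec![1]` with a recursive term that maps each `n < 5` to `n + 1` counts from 1 to 5 and then stops, because the last iteration produces no rows. Since the recursive term runs once per iteration, any per-execution setup cost is paid on every pass, which is the performance concern raised in the review thread.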
||
/// Values expression. See | ||
/// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html) | ||
/// documentation for more details. | ||
|