Add support for recursive CTEs #7581
Started adding tests, will continue to add more.
I've been out for the last couple weeks because life got busy. I'll be back on this soon to push it over the finish line.
Rebased, plan to finish this out today/tomorrow.
@alamb Finally got around to cleaning this up. I think this is ready for initial review. Happy to hear what other tests/safeguards you'd like to see in the initial version. It would be great to get this in so that I can keep layering on improvements without the PR growing into too large a change.
wip: fixes after rebase but tpcds_physical_q54 keeps overflowing its stack
@@ -1387,22 +1387,6 @@ fn select_interval_out_of_range() {
);
}

#[test]
fn recursive_ctes() {
🥳
@@ -112,6 +112,8 @@ pub enum LogicalPlan {
/// produces 0 or 1 row. This is used to implement SQL `SELECT`
/// that has no values in the `FROM` clause.
EmptyRelation(EmptyRelation),
/// A named temporary relation with a schema.
NamedRelation(NamedRelation),
I am considering whether the `NamedRelation` and `RecursiveQuery` could be implemented as two `TableSource`s, one being `CTESelfRefTable` and the other being `CTERecursiveTable`, and then use TableScan to read them.
Use `CTESelfRefTable` within the recursive term and `CTERecursiveTable` in the outer query.
But this idea is in its early stages and may be wrong.
@jonahgao, could you provide the rationale for your suggested strategy? I'm interested in understanding why it might be more effective than the current implementation. Performance is critical to our use case. And the implementation for recursion is very sensitive to performance considerations, as the setup for execution and stream management isn't amortized over all input record batches. Instead, it's incurred with each iteration. For instance, we've observed a substantial performance boost—up to 30 times faster—by eliminating certain intermediate nodes, like coalesce, from our plan (as evidenced in this PR). I've drafted another PR that appears to again double the speed of execution merely by omitting metric collection in recursive sub-graphs.
One rationale might be to make the implementation simpler -- if we could implement the recursive relation as a table provider, it would likely allow the changes to be more localized / smaller (e.g. maybe we could reuse `MemTable::load` to update the batches on each iteration)
Basically I understand the need to have `LogicalPlan::RecursiveQuery` but I don't (yet) understand the need to have the `NamedRelation`
`NamedRelation` is primarily a way to mirror batches back to the `RecursiveQuery` via its physical counterpart, `ContinuanceExec`.
@matthewgapp Another rationale might be to support pushing down filters to the working table, which may be useful if we support spilling the working table to disk in the future. I think the performance should not be affected, since the execution of physical plans is almost the same as it is now.
I implemented a demo on this branch and in this commit. GitHub does not allow forking a repository twice, so I directly pushed it to another repository for convenience.
In this demo, I attempted to replace the `NamedRelation` with a `TableProvider`, namely `CteWorkTable`. The benefit of this is that it can avoid maintaining a new logical plan.
Another change is that I used a structure called `WorkTable` to connect the `RecursiveQueryExec` and the `WorkTableExec` (it was previously `ContinuanceExec`). The advantage of this is that it avoids maintaining some external context information, such as `relation_handlers` in `TaskContext`, and the `ctx` in `create_initial_plan`.
The `WorkTable` is a shared table, it will be scanned by the `WorkTableExec` during the execution of the recursive term, and after the execution is completed, the results will be written back to it.
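The shared-table idea described above can be illustrated with a minimal, standard-library-only sketch. It does not use DataFusion's real types: `Batch`, `WorkTable`, `WorkTableScan`, and `run_recursive` are hypothetical stand-ins chosen for illustration, and the actual demo branch may be structured quite differently.

```rust
use std::sync::{Arc, Mutex};

/// Stand-in for a record batch (assumption: a real engine would use Arrow batches).
type Batch = Vec<i64>;

/// Hypothetical shared buffer linking the recursive driver and the scan node.
#[derive(Default)]
pub struct WorkTable {
    batches: Mutex<Option<Vec<Batch>>>,
}

impl WorkTable {
    /// Called by the driver: store the output of the previous iteration.
    pub fn write(&self, batches: Vec<Batch>) {
        *self.batches.lock().unwrap() = Some(batches);
    }

    /// Called by the scan node: take the stored batches, leaving the table empty.
    pub fn take(&self) -> Vec<Batch> {
        self.batches.lock().unwrap().take().unwrap_or_default()
    }
}

/// Hypothetical scan node that reads whatever the driver last wrote.
pub struct WorkTableScan {
    table: Arc<WorkTable>,
}

impl WorkTableScan {
    pub fn new(table: Arc<WorkTable>) -> Self {
        Self { table }
    }

    pub fn execute(&self) -> Vec<Batch> {
        self.table.take()
    }
}

/// Hypothetical driver: seeds the work table with the static term, then runs
/// the recursive term repeatedly until it produces no rows.
pub fn run_recursive<F>(seed: Vec<Batch>, recursive_term: F) -> Vec<Batch>
where
    F: Fn(&WorkTableScan) -> Vec<Batch>,
{
    let table = Arc::new(WorkTable::default());
    let scan = WorkTableScan::new(Arc::clone(&table));

    let mut output = seed.clone();
    table.write(seed);
    loop {
        let produced = recursive_term(&scan);
        if produced.iter().all(|batch| batch.is_empty()) {
            break;
        }
        output.extend(produced.clone());
        // Write the new rows back so the next iteration scans them.
        table.write(produced);
    }
    output
}
```

In this sketch the driver and the scan node only share an `Arc<WorkTable>`, so no extra state has to travel through a task context. Passing a recursive term that maps each scanned value `n < 5` to `n + 1`, starting from a seed batch containing `1`, accumulates the values 1 through 5.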
Wow, tyty! I was in the process of implementing the shared table and my implementation turned out very similar to yours although I ended up working around the crate dependency graph constraints a bit differently by introducing a couple new traits. But I did end up exposing a method on the context to generate a table. I like your approach better.
I tested out your poc and performance remains about the same between my previous implementation and your new worktable approach! (which makes sense).
I'm going to go ahead and work based on your POC toward the list of PRs that Andrew wants to get this landed.
Thank you for your work and for the upcoming contributions! @matthewgapp
Thank you for your patience @matthewgapp -- I was finally able to find the few hours I needed to load all the context into my head to review this PR properly.
I left some high level comments on the design (specifically how to simplify it / potentially keep the implementation more isolated). However, overall I found this PR easy to read and follow. Nice work!
BTW if anyone else needs a reminder how recursive CTEs work, here is the postgres documentation: https://www.postgresql.org/docs/11/queries-with.html#id-1.5.6.12.5.4
// Downstream plans should not expect any partitioning.
let partition = 0;

self.recursive_stream = Some(
Since this calls `execute` again recursively, if we used a `TableProvider` the underlying `TableProvider::execute` would be called again too
cc @Dandandan for your thoughts
I thought more about this PR last night and in the shower. First of all, I think the direction it is headed is great and would make a good addition to DataFusion. Thank you @matthewgapp both for the code as well as for your patience. In terms of implementation, I think there are a few items that we need prior to making this feature available for everyone
Some nice to have (but not strictly required) items:
In order to avoid a massive PR, my suggestion is to implement this feature in stages:
Thanks @alamb! Think this impl plan makes sense. I'll start knocking out the PRs. Also going to have a go at reworking the existing PR to use MemTable in the two plans (as @jonahgao mentioned here #7581 (comment)). I'll link the resulting PRs to this one.
I think this PR has been superseded by other PRs on #462 (comment). I think all that remains for CTE work is to make sure it doesn't explode with memory and to turn it on by default 🎉 (that is tracked by #462)
This PR implements support for recursive CTEs based on @isidentical's design here #462 (comment)
Caveats:
It does not attempt to place safeguards against infinite recursion. That could be implemented as a follow-up PR. It would be great to have some syntax like OPTION (MAXRECURSION 100) to prevent infinite recursion. Since the SQL parser crate doesn't support OPTION (MAXRECURSION), I might add a max-recursion parameter to the logical and physical plans that can be set manually once the logical plan is produced via SQL, as a near-term workaround to protect against infinite recursion. I'll do that in a separate follow-on PR though.
It's slower than it needs to be because the execution plan inserts coalesce and repartition execution steps within the recursive term. This is addressed by this follow-on PR. There is also room to omit gathering of statistics for each execution iteration, because setting up statistics on each iteration is expensive.
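As an illustration of the recursion safeguard mentioned in the first caveat, here is a small sketch of an iteration cap. It is plain Rust with no DataFusion types; `evaluate_with_limit`, `RecursionLimitExceeded`, and the `max_recursion` parameter are hypothetical names, not part of this PR or of any existing API.

```rust
/// Error returned when the recursion cap is hit (hypothetical type).
#[derive(Debug, PartialEq)]
pub struct RecursionLimitExceeded {
    pub limit: usize,
}

/// Evaluate `seed UNION ALL step(previous rows)` until `step` yields no rows,
/// failing once more than `max_recursion` productive iterations have run.
pub fn evaluate_with_limit<T, F>(
    seed: Vec<T>,
    step: F,
    max_recursion: usize,
) -> Result<Vec<T>, RecursionLimitExceeded>
where
    T: Clone,
    F: Fn(&[T]) -> Vec<T>,
{
    let mut output = seed.clone();
    let mut working = seed;
    let mut iterations = 0;

    loop {
        let next = step(&working);
        if next.is_empty() {
            // Fixed point reached: the recursive term produced nothing new.
            return Ok(output);
        }
        iterations += 1;
        if iterations > max_recursion {
            return Err(RecursionLimitExceeded { limit: max_recursion });
        }
        output.extend(next.iter().cloned());
        working = next;
    }
}
```

A step function that never stops producing rows returns the error once the cap is exceeded, while a bounded step (for example, incrementing values below 5) completes normally. Whether the cap should count iterations, rows, or elapsed time is a design choice this sketch does not settle.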
Which issue does this PR close?
Closes #462
Rationale for this change
DataFusion should support recursive CTEs so that it can express calculations that depend on the results of previous iterations.
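To make "calculations that depend on the results of previous iterations" concrete, the sketch below computes graph reachability the way a recursive CTE with duplicate-eliminating `UNION` semantics is commonly described: a static term seeds the result, and a recursive term is re-run against only the rows produced by the previous iteration. The function name `reachable` and the edge-list representation are assumptions for illustration; this is not how the PR evaluates plans.

```rust
use std::collections::BTreeSet;

/// Nodes reachable from `start` by following `edges`, computed iteratively:
/// each round joins only the previous round's rows against `edges`.
pub fn reachable(edges: &[(u32, u32)], start: u32) -> BTreeSet<u32> {
    // Static term: the seed row.
    let mut result: BTreeSet<u32> = BTreeSet::from([start]);
    // Working table: rows produced by the most recent iteration.
    let mut working: BTreeSet<u32> = result.clone();

    while !working.is_empty() {
        // Recursive term: join the working table against `edges`.
        let next: BTreeSet<u32> = edges
            .iter()
            .filter(|(from, _)| working.contains(from))
            .map(|(_, to)| *to)
            // Drop rows already emitted, so cycles cannot loop forever.
            .filter(|to| !result.contains(to))
            .collect();
        result.extend(next.iter().copied());
        working = next;
    }
    result
}
```

With edges `1 -> 2`, `2 -> 3`, `3 -> 1`, and `4 -> 5`, starting from node 1 yields the set `{1, 2, 3}`. The cycle back to 1 terminates because already-seen rows are discarded; under `UNION ALL` semantics that filter would not exist and the same query would need an explicit termination condition.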
What changes are included in this PR?
Are these changes tested?
Yes, they're tested via the SQL tests. The SQL tests use CSVs introduced by this PR apache/arrow-testing#93
Are there any user-facing changes?