-
Notifications
You must be signed in to change notification settings - Fork 1.5k
fix: External sort failing on an edge case #15017
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @2010YOUY01
While this likely will result in slightly slower performance in some cases (as there is additional spilling) making sure the queries won't error seems like a very valuable property.
Change the final sort-preserving merge logic of sorting: when it has spilled before, always spill all in-mem batches first, then start the merging phase.
Thank you for the super clear writeup and code. this PR was a pleasure to read.
@@ -468,6 +468,31 @@ async fn test_stringview_external_sort() { | |||
let _ = df.collect().await.expect("Query execution failed"); | |||
} | |||
|
|||
/// This test case is for the regression case: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand the reference here to "regression" (which refers normally to something that stopped working when it worked before)
Maybe a better description would be "test_in_mem_buffer_almost_full" or something 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree, updated.
// Release the memory reserved for merge back to the pool so | ||
// there is some left when `in_mem_sort_stream` requests an | ||
// allocation. | ||
self.merge_reservation.free(); | ||
|
||
if self.spilled_before() { | ||
let mut streams = vec![]; | ||
|
||
// Sort `in_mem_batches` and spill it first. If there are many |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
Thank you for the review. |
Thanks @2010YOUY01 @andygrove as it might be also related to OOMs in Comet on Sort phase |
* fix external sort failure * clippy * review
Which issue does this PR close?
Rationale for this change
I came across one sorting query with memory limit fail indefinitely, here is a simpler reproducer (running in
datafusion-cli
with commit 7597769)# Compile datafusion-cli cargo run --profile release-nonlto -- --mem-pool-type fair -m 10M
2 sort queries are executed: Q1 get executed with no issue, Q2 has smaller input size than Q1, but it failed.
Query failure reason
At the final stage of sorting, all buffered in-memory batches and all the spilled files will be sort-preserving merged at the same time, and this caused the issue.
datafusion/datafusion/physical-plan/src/sorts/sort.rs
Lines 342 to 355 in 7597769
For example, there is one workload, let's say it's executing in a single partition. It's memory limit can hold 10 batches.
A common workaround I believe is to set
datafusion.execution.sort_spill_reservation_bytes
to larger, its used for extra space for merging. However, workloads' row/batch size can vary drastically, also it's possible to see the case in-memory batches has almost reached the memory limit but not yet triggered on spilling, so this parameter is very tricky to configure it correct.To make this simpler, we can always spill the in-memory batches (if it has spilled previously) at the final stage.
What changes are included in this PR?
Change the final sort-preserving merge logic of sorting: when it has spilled before, always spill all in-mem batches first, then start the merging phase.
Are these changes tested?
Regression test is added
Are there any user-facing changes?
No