Use Tokio's task budget consistently, better APIs to support task cancellation #16398
Conversation
75fd648 to 935db91
Thanks for the draft -- this is in line with my understanding from your description. I think it will inch us closer to a good, lasting solution (especially after your upstream Tokio PR also merges). Feel free to ping me for a more detailed review once you are done with it.
@ozankabak I've pushed the optimizer rule changes I had in mind. This introduces two new execution plan properties that capture the evaluation type (how children are evaluated: eager vs lazy) and the scheduling type (how poll_next will behave with respect to scheduling: blocking vs cooperative). With those two combined, the tree can be rewritten in a bottom-up fashion. Every leaf that is not cooperative gets wrapped as before. Additionally, any eagerly evaluating nodes (i.e. exchanges) that are not cooperative are wrapped. This should ensure the entire plan participates in cooperative scheduling.

The only caveat that remains is dynamic stream creation. Operators that do that need to take the necessary precautions themselves. I already updated the spill manager for this in the previous commit.

While I was writing this I started wondering if evaluation type should be a per-child thing. In my spawn experiment branch, for instance, hash join is eager for the build side but lazy for the probe side. Perhaps it would be best to leave room for that.
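To make the discussion concrete, here is a minimal sketch of what two such properties could look like. The names and variants mirror the description above and are illustrative, not necessarily the final DataFusion API:

```rust
/// How an operator produces its output relative to its children (illustrative).
pub enum EvaluationType {
    /// Pull-based: batches are computed on demand when the output is polled.
    Lazy,
    /// The operator drives its children itself, e.g. an exchange that spawns tasks.
    Eager,
}

/// How poll_next behaves with respect to the scheduler (illustrative).
pub enum SchedulingType {
    /// May keep returning Ready for a long time without ever yielding.
    Blocking,
    /// Consumes Tokio's task budget and yields when it is depleted.
    Cooperative,
}
```

Under these assumptions, the bottom-up rule described above would wrap any Blocking leaf plus any Eager node that is itself Blocking, so every task driving part of the plan contains at least one budget-aware operator.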
b593bfa to c648c0c
This is in alignment with what I was thinking, let's do it that way.
Thinking about it some more: the evaluation type is intended to describe how the operator computes record batches itself, either lazily on demand or by driving things itself. I'm trying to refer to the terminology from the Volcano paper, which talks about demand-driven and data-driven operators. I had first called this 'drive type' with values 'demand' and 'data', but that felt a bit awkward. Since this is actually a property of how the operator prepares its output, one value per operator is probably fine after all.

What I'm trying to do with this is find the exchanges in the plan. The current set that's present in DataFusion is all fine, but if you were to implement one using
Open to suggestions on better names for these properties.
What puzzles me about the performance regressions is the results from #16398 (comment). At that point in time, the default behavior was what's now

I'll have a look at the query plans for some of the queries to make sure the optimizer rule is not injecting coop operators needlessly. The only other difference I can think of is that in this PR
Zooming in on query 4 (it would be super convenient if these had 1-based indices, BTW, to match line numbers in the file). The plan is:
No redundant coop, so that's good. The only element that changed in this plan is
I added some output to be able to see what the coop logic was doing. Ignore the times; this is dev profile. Yield output:
vs
The next thing I can think of is the access to the budget thread-local. But that doesn't explain why we got a very similar benchmark delta with the non-thread-local counter.
vs the last result in #16398 (comment) with tokio_fallback
Could the shared budget count affect performance? I can see it only shows a regression for clickbench partition; I am not sure whether it might improve performance when we yield more in multi-partition cases.
I'll experiment with the multi-file variant as well. FWIW I'm running this on a 10-core MacBook, so clickbench1 was running with 10 partitions but all from the same file.
🤔 Now that I write '10-core MacBook', I'm wondering if the 10-core part is where my variability is coming from. That's 6 performance and 4 efficiency cores. Ideally DataFusion keeps the CPU-bound work on the performance cores and uses the efficiency ones for IO. I had been wondering about that and NUMA effects already. A topic for a different thread though.
@zhuqi-lucas the situation where this can have an impact is when you have an operator that switches to a different input stream when the polled input stream returns pending. Interleave is a simple example of that. With the per-stream budget, the second poll may end up returning ready. With the shared task budget, the second poll will get 'budget depleted' and return pending as well. We may incur some overhead by attempting to poll the second stream even though you could already know that it will return pending.

Besides that type of situation, and disregarding any non-obvious side effects of using thread locals for a moment, I don't see how shared vs non-shared budget would make a difference. There shouldn't be competition between tasks for the same budget; the counter is distinct per task and is reset on each task 'tick'.
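As a rough illustration of that overhead, here is a conceptual polling loop over several inputs (illustrative names, not DataFusion's actual Interleave code):

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::Stream;

/// Poll each input in turn and return the first item that is ready.
/// Inputs that have already finished are ignored for brevity.
fn poll_round_robin<S>(inputs: &mut [S], cx: &mut Context<'_>) -> Poll<Option<S::Item>>
where
    S: Stream + Unpin,
{
    for input in inputs.iter_mut() {
        // With a per-stream budget, input N+1 can still be Ready after input N
        // returned Pending. With a shared per-task budget, a Pending caused by
        // budget depletion applies to every input, so these extra polls are
        // wasted work (though each individual poll is cheap).
        if let Poll::Ready(Some(item)) = Pin::new(input).poll_next(cx) {
            return Poll::Ready(Some(item));
        }
    }
    Poll::Pending
}
```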
Got it, so we add Cooperative to some operators such as sort_preserving_merge when partition > 1; that looks similar to interleave, which has many inputs? I am not sure if it affects performance. And in the original solution we only added yield to leaf nodes, and a leaf node only has one input, I believe.
No, that hasn't changed (or at least it shouldn't have). When SPM has more than one input, it acts as an exchange and drives the inputs itself. This is advertised using

Note that each exchange is also a task boundary, so task budget consumption does not leak from below the exchange to above it or vice versa. Since all the inputs to SPM are driven as distinct tasks, there also shouldn't be any competition amongst the inputs.

(I'm stating things as fact above, but this only reflects my current understanding. Working on double checking everything.)
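A minimal sketch of the "exchange as task boundary" point, assuming a Tokio runtime and tokio::sync::mpsc; this is illustrative, not SPM's actual implementation:

```rust
use futures::StreamExt;
use tokio::sync::mpsc;

/// Spawn one task per input and return the receiving ends. Each spawned task has
/// its own coop budget, so budget consumed while driving an input does not leak
/// into the task that consumes the merged output (or vice versa).
fn drive_inputs<S>(inputs: Vec<S>) -> Vec<mpsc::Receiver<S::Item>>
where
    S: futures::Stream + Unpin + Send + 'static,
    S::Item: Send + 'static,
{
    inputs
        .into_iter()
        .map(|mut input| {
            let (tx, rx) = mpsc::channel(2);
            tokio::spawn(async move {
                while let Some(item) = input.next().await {
                    // Stop early if the consuming side has gone away.
                    if tx.send(item).await.is_err() {
                        break;
                    }
                }
            });
            rx
        })
        .collect()
}
```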
OK, I see MPSC is already used for SPM; we only add the
@alamb @Dandandan I'm starting to get the feeling this is a wild goose chase. I adapted

I'm not sure you can draw any meaningful conclusion based on these timings alone. I'll try to have a look at performance counters next; maybe that paints a clearer picture.
@alamb @Dandandan Another update. The realization that Tokio is not P/E-core aware out of the box, and that you have very little to no explicit control over thread affinity on macOS (QoS remains a hint), led me to try and find some other machine to run the perf tests on. We still had some Intel desktop machines lying around in our server room, so I installed a fresh Ubuntu Server 24.04 on one of them and ran the test. Results below. This strengthens my belief that I'm measuring noise.

Additionally, I ran some tests with Instruments yesterday comparing the total number of retired instructions, branch mispredictions, etc. There's some variability across runs, but it's more or less the same (as you would expect).

The hardware specs this ran on are:
Yes, I also find the ClickHouse numbering schemes confusing (we follow the https://github.com/ClickHouse/ClickBench convention).

Thank you @pepijnve -- I have reviewed your analysis and agree with your conclusion (that the reported differences are likely just noise).

To double check, I tried to reproduce the reported results manually locally:

│ QQuery 4 │ 614.68 ms │ 702.11 ms │ 1.14x slower │

I used a decidedly unscientific approach:

$ cat q4.sql
SELECT COUNT(DISTINCT "UserID") FROM hits;
$ datafusion-cli -f q4.sql | grep Elapsed

Results on
Results on this PR
Thus I conclude there is no appreciable difference and we should merge this PR. I'll plan to do so after we get a clean CI run (I'll merge up to fix conflicts too).

The Performance vs Efficiency cores point is a great (and fascinating) observation -- and one that I think deserves further study. I'll file another ticket to discuss that.
Thank you also @zhuqi-lucas for all your help and attention to this PR.
Actually, @zhuqi-lucas it wasn't 100% clear to me from your comments -- are you OK with this PR being merged?
@alamb I agree with merging this PR; the change should not affect performance since we already use MPSC for SPM, etc. Thanks!
I think the CI failures are due to

Merging up to get a clean run.
🚀
Oh my, it landed 🎉. Thanks @alamb, great start of my weekend! Still working on the Tokio PR. I'll create a follow-up issue referring to the TODO I left in the code.
Yeah, I think the bias towards action is the name of the game in DataFusion to keep things moving forward (rather than stalling due to apathy). Basically I like to think: "if there is nothing actively stopping a PR from merging, we should merge it!"
Which issue does this PR close?
Rationale for this change
RecordBatchStreamReceiver supports cooperative scheduling implicitly by using Tokio's task budget. YieldStream currently uses a custom mechanism. It would be better to use a single mechanism consistently.

What changes are included in this PR?
Note that the implementation of CooperativeStream in this PR is suboptimal. The final implementation requires tokio-rs/tokio#7405 which I'm trying to move along as best I can.
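For illustration, here is roughly how a budget-based wrapper can be expressed as a plain stream adapter until the richer Tokio API lands. This is a sketch under the assumption that tokio::task::consume_budget is available (with the runtime feature enabled), not the PR's actual CooperativeStream implementation:

```rust
use futures::{Stream, StreamExt};

/// Wrap a stream so that every emitted item consumes one unit of Tokio's task
/// budget. When the budget is depleted, the await below returns Pending once,
/// handing control back to the scheduler before the next item is produced.
fn cooperative<S: Stream>(inner: S) -> impl Stream<Item = S::Item> {
    inner.then(|item| async move {
        tokio::task::consume_budget().await;
        item
    })
}
```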
Are these changes tested?
Covered by the infinite_cancel and coop tests.

Are there any user-facing changes?
Yes, the datafusion.optimizer.yield_period configuration option is removed, but at the time of writing this has not been released yet.