
Improve ability to cancel queries quickly #16301


Closed
wants to merge 3 commits

Conversation

pepijnve
Contributor

@pepijnve pepijnve commented Jun 6, 2025

Which issue does this PR close?

This PR is an alternative 'operator intrusive' solution based on the work initially done in #16196. This should not be considered for merging until the dust has settled on #16196, but feedback would already be appreciated.

Rationale for this change

Cancelling tasks in Tokio is a cooperative endeavor. The documentation on cooperative scheduling describes a problematic situation when a task polls a stream in a loop and that stream happens to be always ready. In that situation the task effectively becomes a busy loop and will block the Tokio executor thread for an extended period of time. This prevents the task from being cancelled and also prevents other tasks from being scheduled.

Since DataFusion queries are executed as Tokio tasks, the above applies to queries as well. The most common example of this type of code in DataFusion is pipeline-breaking operators that try to drain their input completely before emitting results themselves. There are, however, other examples that may loop for extended periods of time: filters that reject many full batches, the build phase of join operators, a sort-merge join that takes a long time finding inner join matches, etc.

This style of looping in operator implementations causes issues when the operator in question polls from an always-ready input stream, since the operator then effectively becomes a busy loop. This in turn prevents the query from being cancelled. Even if the task is aborted via its JoinHandle, the operator will keep running until completion.
Note that (in my opinion at least) the always-ready input stream is not the issue. Viewed in isolation it is a perfectly fine implementation that always returns control in a timely fashion. The problem is the consumer looping in an uncontrolled fashion and not returning control.
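
For illustration, here is a minimal sketch of the problematic pattern. The `AccumulatingStream` type and its fields are hypothetical and not part of this PR:

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::DataFusionError;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::{Stream, StreamExt};

/// Hypothetical pipeline-breaking operator stream, used only for illustration.
struct AccumulatingStream {
    input: SendableRecordBatchStream,
    batches: Vec<RecordBatch>,
}

impl Stream for AccumulatingStream {
    type Item = Result<RecordBatch, DataFusionError>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Drain the input completely before emitting anything.
        loop {
            match self.input.poll_next_unpin(cx) {
                // If the input is always Ready, this loop never returns control
                // to the Tokio scheduler, so the task cannot be cancelled.
                Poll::Ready(Some(Ok(batch))) => self.batches.push(batch),
                Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
                // A real operator would start emitting its results here.
                Poll::Ready(None) => return Poll::Ready(None),
                Poll::Pending => return Poll::Pending,
            }
        }
    }
}
```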

In order to give the Tokio scheduler time to do its work, operators should ensure they yield to the runtime every now and then. Unfortunately, yielding to the runtime means the poll_next call of each stream in the chain needs to return and then be invoked again. This is not a zero-cost operation, so we should be careful not to yield too often. Additionally, the overhead of checking whether we should yield, and of unwinding and rebuilding the call chain, should be kept to a minimum.

Proposed implementation

Implement something similar to coop::consume_budget

Provide a mechanism similar in spirit to coop::consume_budget, but tailored to the needs of this project. This PR implements that approach; details on the implementation are provided below.

This is the suggestion made in the current documentation at https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#cancellation--aborting-execution and there is ad hoc prior art for this in RepartitionExec. The idea would be to make following the guideline as convenient as possible.

Possible alternatives

Use tokio::task::coop::consume_budget

Tokio's consume_budget provides a runtime managed cooperative scheduling budget. Operators could use this to insert yield points where necessary. The benefit of this approach is that the tokio runtime manages the scheduling budget. There's very little work to do in each operator.
The downside may be (though I have not measured this yet) that the task budget is stored in thread-local storage, which may make accessing it slower than desired. consume_budget is also an async function, making its use in state-machine-style stream implementations rather cumbersome.
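
As a sketch of what this alternative could look like (assuming a recent Tokio with the rt feature enabled; `drain_input` and its return value are made up for illustration):

```rust
use datafusion::error::Result;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;

/// Hypothetical async draining loop that leans on Tokio's cooperative budget.
async fn drain_input(mut input: SendableRecordBatchStream) -> Result<usize> {
    let mut rows = 0;
    while let Some(batch) = input.next().await {
        // Consumes one unit of the runtime-managed budget. Once the budget is
        // exhausted this await returns Pending, yielding back to the scheduler
        // and giving cancellation a chance to take effect.
        tokio::task::consume_budget().await;
        rows += batch?.num_rows();
    }
    Ok(rows)
}
```

The single consume_budget().await call is the only change needed, but it requires the surrounding code to be an async fn and routes the budget bookkeeping through Tokio's thread-local task context.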

Inject yield points externally

PR #16196 takes the approach of injecting yield operators externally using a physical optimizer rule. The rule uses the EmissionType of operators to determine where yield points are required. When it determines an operator requires yield points it ensures all leaf nodes under the operator in question periodically return a Pending result. This is done either by asking the leaf node to do so or by wrapping the leaf node in an operator that creates a wrapper Stream.

This is a nice solution because it does not necessarily require cooperation from operator implementations, but there are downsides as well.

Describing precisely when yield points are required in a declarative fashion is not trivial. EmissionType::Final is a good first heuristic, but for some operators like filter and join build phase this is not sufficient. Because of this difficulty PR #16196 now always injects yield points even when unnecessary.

Determining where yield points are required based on the static ExecutionPlan tree alone is not always sufficient. As an example, a grouped aggregation that spills large amounts of data to disk will emit data from a stream that is created after the original input has been drained. This takes the original yielding wrapper stream out of the picture. If the new stream is then consumed by another 'final' operator, the query may become uncancellable again depending on the newly created stream. This could be solved by ensuring operators wrap streams they produce themselves as well, but that requires operator cooperation again, which somewhat negates the benefits of this approach.
In practice this works out OK today because SortPreservingMergeStream has a tendency to return Pending a lot for some reason. I still need to study why that's the case.

When evaluating this design from an assignment-of-responsibilities perspective, it is not ideal. It is not the responsibility of a producer of data that happens to always be ready to return Pending. Looking at the producer Stream implementation in isolation, it's not obvious why it would do this, since it is already returning control and is not itself waiting for data. The only reason it would do this is because it assumes the consumer is not well behaved and needs help to force a yield.

From a performance perspective, injecting yield points at the leaves is theoretically the worst-case scenario when looking at the depth of the call stack that needs to be unwound. By definition the call stack is deepest at the leaf streams, so yielding from there incurs the highest possible cost.

What changes are included in this PR?

A new PollBudget support type is introduced. This is a type safe wrapper around an Option<u8> that represents the number of times a single operator is allowed to poll its input (or loop in general) without itself returning a Poll value.
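
To make this concrete, here is a rough sketch of the shape these support types might take, including the ConsumeBudget helper described in the next paragraph. The actual definitions in this PR may differ in naming and detail:

```rust
use std::task::{Context, Poll};

/// Number of times an operator may poll its input (or loop in general) within
/// a single poll_next call before it must return a Poll value itself.
/// `None` disables the budget entirely.
#[derive(Clone, Copy, Debug)]
pub struct PollBudget(Option<u8>);

impl PollBudget {
    pub fn new(budget: Option<u8>) -> Self {
        Self(budget)
    }

    /// Creates a fresh per-poll_next budget tracker.
    pub fn consume(&self) -> ConsumeBudget {
        ConsumeBudget { remaining: self.0 }
    }
}

/// Future-like helper that is Ready while budget remains and Pending once it
/// is depleted.
pub struct ConsumeBudget {
    remaining: Option<u8>,
}

impl ConsumeBudget {
    pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        match &mut self.remaining {
            // No budget configured: never force a yield.
            None => Poll::Ready(()),
            Some(0) => {
                // Budget depleted: request a wake-up and unwind the call chain
                // back to the Tokio scheduler.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
            Some(n) => {
                *n -= 1;
                Poll::Ready(())
            }
        }
    }
}
```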

Operators that manually implement Stream state machines make use of this type by creating a ConsumeBudget future on entry to poll_next. When polled, this future will return Ready(()) as many times as the configured poll budget allows. When the budget is depleted it returns Pending. In the Stream::poll_next implementations, ready!(consume_budget.poll(cx)) has been inserted at points where the implementation would loop.
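
As an example, a hypothetical operator loop using the sketch above could look roughly like this (the AccumulatingStream from the rationale section, now with a yield point added; again not code from this PR):

```rust
use std::pin::Pin;
use std::task::{ready, Context, Poll};

use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::DataFusionError;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::{Stream, StreamExt};

/// The hypothetical accumulating stream from earlier, now budget-aware.
struct AccumulatingStream {
    input: SendableRecordBatchStream,
    batches: Vec<RecordBatch>,
    poll_budget: PollBudget, // PollBudget/ConsumeBudget from the sketch above
}

impl Stream for AccumulatingStream {
    type Item = Result<RecordBatch, DataFusionError>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // A fresh budget for this poll_next invocation.
        let mut budget = self.poll_budget.consume();
        loop {
            match ready!(self.input.poll_next_unpin(cx)) {
                Some(Ok(batch)) => {
                    self.batches.push(batch);
                    // Budget is only consumed when we are about to loop again;
                    // Pending here unwinds the call chain back to the scheduler.
                    ready!(budget.poll(cx));
                }
                Some(Err(e)) => return Poll::Ready(Some(Err(e))),
                // A real operator would start emitting its results here.
                None => return Poll::Ready(None),
            }
        }
    }
}
```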

An (admittedly very amateur) analysis of the optimized compiler output shows that this is a fairly efficient implementation strategy. On entry to poll_next the initial counter value is loaded, and the child poll call is then invoked. When the child returns Pending or an error, it is immediately returned without any extra work. Only in the case where the stream would loop is the counter decremented and tested. All combined, this makes for a fairly inexpensive yield point.

The construct described above is difficult to integrate with Stream implementations that use async function based implementations. As an approximation they instead use a wrapper YieldStream around their input streams. This YieldStream counts the number of times poll_next is called and returns Pending when the budget is depleted. When Pending is returned, or the wrapped stream returns Pending or an error itself, the budget is reset.
This implementation is a bit less precise and less efficient than the state-machine variant. Ideally the budget counter would be reset whenever the stream making use of the YieldStream returns any value, as is the case in the state-machine variant, but I have not figured out how to implement this yet.
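
A rough, simplified sketch of what such a wrapper might look like (not the exact code in this PR; for instance, this version only resets the budget on Pending or end-of-stream, whereas the PR also resets it on errors):

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::Stream;

/// Simplified wrapper that forces a Pending after a fixed number of items.
struct YieldStream<S> {
    inner: S,
    budget: u8,
    remaining: u8,
}

impl<S: Stream + Unpin> Stream for YieldStream<S> {
    type Item = S::Item;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.remaining == 0 {
            // Budget depleted: reset it, request a wake-up and yield so that
            // the consuming async fn is forced to return Pending as well.
            self.remaining = self.budget;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        match Pin::new(&mut self.inner).poll_next(cx) {
            Poll::Ready(Some(item)) => {
                self.remaining -= 1;
                Poll::Ready(Some(item))
            }
            // The inner stream yielded or finished on its own; reset the budget.
            other => {
                self.remaining = self.budget;
                other
            }
        }
    }
}
```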

The ad hoc implementation in RepartitionExec is quite similar to what's being done here, but suffers from the same problem. It yields every num_partition batches, but it does so regardless of whether any of the await points in the async function have yielded already. As a consequence it may yield unnecessarily often. My expertise on Rust async is still quite limited, and I have not been able to find a way to know whether an await point returned Pending or continued immediately due to Ready.

Are these changes tested?

Existing tests pass. Some additional tests for yielding have been added, but not all affected operators are covered yet.

Are there any user-facing changes?

There are no breaking API changes. This PR does introduce a new soft requirement for downstream users who implement custom operators: in order to support cooperative cancellation, operators that loop for an extended period of time when their input is always ready should make use of the cooperative yielding support. On the other hand, this is already the current guideline today.

@github-actions github-actions bot added the documentation, core, sqllogictest, common and physical-plan labels Jun 6, 2025
@pepijnve
Contributor Author

Superseded by #16398

@pepijnve pepijnve closed this Jun 14, 2025