Skip to content

Commit ef80a31

Browse files
committed
When waiting on a task, escalate it before enqueuing the waiting task.
As soon as the waiting task is successfully enqueued on the blocking task, both tasks have to be considered invalidated because the blocking task can concurrently complete and resume its waiters: - The waiting task ensures that the blocking task is valid while it's waiting. However, that's measured from the perspective of the waiting task, not from the perspective of the thread that was previously executing it. As soon as the waiting task is resumed, the wait call completes and the validity guarantee on the blocking task disappears, so the blocking task must be treated as invalidated. - The waiting task ensures that it is valid as long as it isn't complete. Since it's trying to wait, it must not be complete. However, as soon we resume it, it can complete, so the waiting task must also be treated as invalidated. This is one of those things that's not really easy to test, and the need for a fix is pretty urgent, so I'm submitting this patch without a test. I'll try to land a race test that demonstrates the bug in the next few days. @kavon deserves all the credit here for some truly heroic debugging and finally recognizing the flaw in the code; I'm just popping in at the last minute to sheepishly patch the bug. Fixes rdar://92666987
1 parent ddf9268 commit ef80a31

File tree

1 file changed

+42
-5
lines changed

1 file changed

+42
-5
lines changed

stdlib/public/Concurrency/Task.cpp

+42-5
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask,
113113

114114
auto queueHead = fragment->waitQueue.load(std::memory_order_acquire);
115115
bool contextInitialized = false;
116+
auto escalatedPriority = JobPriority::Unspecified;
116117
while (true) {
117118
switch (queueHead.getStatus()) {
118119
case Status::Error:
@@ -145,6 +146,47 @@ FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask,
145146
waitingTask->flagAsSuspended();
146147
}
147148

149+
// Escalate the blocking task to the priority of the waiting task.
150+
// FIXME: Also record that the waiting task is now waiting on the
151+
// blocking task so that escalators of the waiting task can propagate
152+
// the escalation to the blocking task.
153+
//
154+
// Recording this dependency is tricky because we need escalators
155+
// to be able to escalate without worrying about the blocking task
156+
// concurrently finishing, resuming the escalated task, and being
157+
// invalidated. So we're not doing that yet. In the meantime, we
158+
// do the best-effort alternative of escalating the blocking task
159+
// as a one-time deal to the current priority of the waiting task.
160+
// If the waiting task is escalated after this point, the priority
161+
// will not be escalated, but that's inevitable in the absence of
162+
// propagation during escalation.
163+
//
164+
// We have to do the escalation before we successfully enqueue the
165+
// waiting task on the blocking task's wait queue, because as soon as
166+
// we do, this thread is no longer blocking the resumption of the
167+
// waiting task, and so both the blocking task (which is retained
168+
// during the wait only from the waiting task's perspective) and the
169+
// waiting task (which can simply terminate) must be treat as
170+
// invalidated from this thread's perspective.
171+
//
172+
// When we do fix this bug to record the dependency, we will have to
173+
// do it before this escalation of the blocking task so that there
174+
// isn't a race where an escalation of the waiting task can fail
175+
// to propagate to the blocking task. The correct priority to
176+
// escalate to is the priority we observe when we successfully record
177+
// the dependency; any later escalations will automatically propagate.
178+
//
179+
// If the blocking task finishes while we're doing this escalation,
180+
// the escalation will be innocuous. The wasted effort is acceptable;
181+
// programmers should be encouraged to give tasks that will block
182+
// other tasks the correct priority to begin with.
183+
auto waitingStatus =
184+
waitingTask->_private()._status().load(std::memory_order_relaxed);
185+
if (waitingStatus.getStoredPriority() > escalatedPriority) {
186+
swift_task_escalate(this, waitingStatus.getStoredPriority());
187+
escalatedPriority = waitingStatus.getStoredPriority();
188+
}
189+
148190
// Put the waiting task at the beginning of the wait queue.
149191
waitingTask->getNextWaitingTask() = queueHead.getTask();
150192
auto newQueueHead = WaitQueueItem::get(Status::Executing, waitingTask);
@@ -153,11 +195,6 @@ FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask,
153195
/*success*/ std::memory_order_release,
154196
/*failure*/ std::memory_order_acquire)) {
155197

156-
// Escalate the priority of this task based on the priority
157-
// of the waiting task.
158-
auto status = waitingTask->_private()._status().load(std::memory_order_relaxed);
159-
swift_task_escalate(this, status.getStoredPriority());
160-
161198
_swift_task_clearCurrent();
162199
return FutureFragment::Status::Executing;
163200
}

0 commit comments

Comments
 (0)