@@ -113,6 +113,7 @@ FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask,
113
113
114
114
auto queueHead = fragment->waitQueue .load (std::memory_order_acquire);
115
115
bool contextInitialized = false ;
116
+ auto escalatedPriority = JobPriority::Unspecified;
116
117
while (true ) {
117
118
switch (queueHead.getStatus ()) {
118
119
case Status::Error:
@@ -145,6 +146,47 @@ FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask,
145
146
waitingTask->flagAsSuspended ();
146
147
}
147
148
149
+ // Escalate the blocking task to the priority of the waiting task.
150
+ // FIXME: Also record that the waiting task is now waiting on the
151
+ // blocking task so that escalators of the waiting task can propagate
152
+ // the escalation to the blocking task.
153
+ //
154
+ // Recording this dependency is tricky because we need escalators
155
+ // to be able to escalate without worrying about the blocking task
156
+ // concurrently finishing, resuming the escalated task, and being
157
+ // invalidated. So we're not doing that yet. In the meantime, we
158
+ // do the best-effort alternative of escalating the blocking task
159
+ // as a one-time deal to the current priority of the waiting task.
160
+ // If the waiting task is escalated after this point, the priority
161
+ // will not be escalated, but that's inevitable in the absence of
162
+ // propagation during escalation.
163
+ //
164
+ // We have to do the escalation before we successfully enqueue the
165
+ // waiting task on the blocking task's wait queue, because as soon as
166
+ // we do, this thread is no longer blocking the resumption of the
167
+ // waiting task, and so both the blocking task (which is retained
168
+ // during the wait only from the waiting task's perspective) and the
169
+ // waiting task (which can simply terminate) must be treat as
170
+ // invalidated from this thread's perspective.
171
+ //
172
+ // When we do fix this bug to record the dependency, we will have to
173
+ // do it before this escalation of the blocking task so that there
174
+ // isn't a race where an escalation of the waiting task can fail
175
+ // to propagate to the blocking task. The correct priority to
176
+ // escalate to is the priority we observe when we successfully record
177
+ // the dependency; any later escalations will automatically propagate.
178
+ //
179
+ // If the blocking task finishes while we're doing this escalation,
180
+ // the escalation will be innocuous. The wasted effort is acceptable;
181
+ // programmers should be encouraged to give tasks that will block
182
+ // other tasks the correct priority to begin with.
183
+ auto waitingStatus =
184
+ waitingTask->_private ()._status ().load (std::memory_order_relaxed);
185
+ if (waitingStatus.getStoredPriority () > escalatedPriority) {
186
+ swift_task_escalate (this , waitingStatus.getStoredPriority ());
187
+ escalatedPriority = waitingStatus.getStoredPriority ();
188
+ }
189
+
148
190
// Put the waiting task at the beginning of the wait queue.
149
191
waitingTask->getNextWaitingTask () = queueHead.getTask ();
150
192
auto newQueueHead = WaitQueueItem::get (Status::Executing, waitingTask);
@@ -153,11 +195,6 @@ FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask,
153
195
/* success*/ std::memory_order_release,
154
196
/* failure*/ std::memory_order_acquire)) {
155
197
156
- // Escalate the priority of this task based on the priority
157
- // of the waiting task.
158
- auto status = waitingTask->_private ()._status ().load (std::memory_order_relaxed);
159
- swift_task_escalate (this , status.getStoredPriority ());
160
-
161
198
_swift_task_clearCurrent ();
162
199
return FutureFragment::Status::Executing;
163
200
}
0 commit comments