Skip to content

Commit 5d93ed9

Browse files
revise and document cancellation semantics
in short, cancellable threads always use system tasks. normal threads use the host task, unless passed a token
1 parent eab30c4 commit 5d93ed9

File tree

3 files changed

+109
-101
lines changed

3 files changed

+109
-101
lines changed

docs/source/reference-core.rst

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1823,9 +1823,20 @@ to spawn a child thread, and then use a :ref:`memory channel
18231823

18241824
.. literalinclude:: reference-core/from-thread-example.py
18251825

1826+
.. note::
1827+
1828+
The ``from_thread.run*`` functions reuse the host task that called
1829+
:func:`trio.to_thread.run_sync` to run your provided function in the typical case,
1830+
namely when ``cancellable=False`` so Trio can be sure that the task will always be
1831+
around to perform the work. If you pass ``cancellable=True`` at the outset, or if
1832+
you provide a :class:`~trio.lowlevel.TrioToken` when calling back in to Trio, your
1833+
functions will be executed in a new system task. Therefore, the
1834+
:func:`~trio.lowlevel.current_task`, :func:`current_effective_deadline`, or other
1835+
task-tree specific values may differ depending on keyword argument values.
1836+
18261837
You can also use :func:`trio.from_thread.check_cancelled` to check for cancellation from
18271838
a thread that was spawned by :func:`trio.to_thread.run_sync`. If the call to
1828-
:func:`~trio.to_thread.run_sync` was cancelled, then
1839+
:func:`~trio.to_thread.run_sync` was cancelled (even if ``cancellable=False``!), then
18291840
:func:`~trio.from_thread.check_cancelled` will raise :func:`trio.Cancelled`.
18301841
It's like ``trio.from_thread.run(trio.sleep, 0)``, but much faster.
18311842

trio/_tests/test_threads.py

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -933,14 +933,16 @@ async def async_time_bomb():
933933
async def test_from_thread_check_cancelled():
934934
q = stdlib_queue.Queue()
935935

936-
async def child(cancellable):
937-
record.append("start")
938-
try:
939-
return await to_thread_run_sync(f, cancellable=cancellable)
940-
except _core.Cancelled:
941-
record.append("cancel")
942-
finally:
943-
record.append("exit")
936+
async def child(cancellable, scope):
937+
with scope:
938+
record.append("start")
939+
try:
940+
return await to_thread_run_sync(f, cancellable=cancellable)
941+
except _core.Cancelled:
942+
record.append("cancel")
943+
raise
944+
finally:
945+
record.append("exit")
944946

945947
def f():
946948
try:
@@ -956,7 +958,7 @@ def f():
956958
record = []
957959
ev = threading.Event()
958960
async with _core.open_nursery() as nursery:
959-
nursery.start_soon(child, False)
961+
nursery.start_soon(child, False, _core.CancelScope())
960962
await wait_all_tasks_blocked()
961963
assert record[0] == "start"
962964
assert q.get(timeout=1) == "Not Cancelled"
@@ -968,14 +970,15 @@ def f():
968970
# the appropriate cancel scope
969971
record = []
970972
ev = threading.Event()
973+
scope = _core.CancelScope() # Nursery cancel scope gives false positives
971974
async with _core.open_nursery() as nursery:
972-
nursery.start_soon(child, False)
975+
nursery.start_soon(child, False, scope)
973976
await wait_all_tasks_blocked()
974977
assert record[0] == "start"
975978
assert q.get(timeout=1) == "Not Cancelled"
976-
nursery.cancel_scope.cancel()
979+
scope.cancel()
977980
ev.set()
978-
assert nursery.cancel_scope.cancelled_caught
981+
assert scope.cancelled_caught
979982
assert "cancel" in record
980983
assert record[-1] == "exit"
981984

@@ -992,13 +995,14 @@ def f(): # noqa: F811
992995

993996
record = []
994997
ev = threading.Event()
998+
scope = _core.CancelScope()
995999
async with _core.open_nursery() as nursery:
996-
nursery.start_soon(child, True)
1000+
nursery.start_soon(child, True, scope)
9971001
await wait_all_tasks_blocked()
9981002
assert record[0] == "start"
999-
nursery.cancel_scope.cancel()
1003+
scope.cancel()
10001004
ev.set()
1001-
assert nursery.cancel_scope.cancelled_caught
1005+
assert scope.cancelled_caught
10021006
assert "cancel" in record
10031007
assert record[-1] == "exit"
10041008
assert q.get(timeout=1) == "Cancelled"

trio/_threads.py

Lines changed: 78 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
from trio._core._traps import RaiseCancelT
1818

1919
from ._core import (
20-
CancelScope,
2120
RunVar,
2221
TrioToken,
2322
disable_ki_protection,
@@ -35,6 +34,7 @@ class _ParentTaskData(threading.local):
3534
parent task of native Trio threads."""
3635

3736
token: TrioToken
37+
abandon_on_cancel: bool
3838
cancel_register: list[RaiseCancelT | None]
3939
task_register: list[trio.lowlevel.Task | None]
4040

@@ -74,11 +74,6 @@ class ThreadPlaceholder:
7474

7575

7676
# Types for the to_thread_run_sync message loop
77-
@attr.s(frozen=True, eq=False)
78-
class ThreadDone(Generic[RetT]):
79-
result: outcome.Outcome[RetT] = attr.ib()
80-
81-
8277
@attr.s(frozen=True, eq=False)
8378
class Run(Generic[RetT]):
8479
afn: Callable[..., Awaitable[RetT]] = attr.ib()
@@ -87,7 +82,6 @@ class Run(Generic[RetT]):
8782
queue: stdlib_queue.SimpleQueue[outcome.Outcome[RetT]] = attr.ib(
8883
init=False, factory=stdlib_queue.SimpleQueue
8984
)
90-
scope: CancelScope = attr.ib(init=False, factory=CancelScope)
9185

9286
@disable_ki_protection
9387
async def unprotected_afn(self) -> RetT:
@@ -108,14 +102,32 @@ async def run(self) -> None:
108102
await trio.lowlevel.cancel_shielded_checkpoint()
109103

110104
async def run_system(self) -> None:
111-
# NOTE: There is potential here to only conditionally enter a CancelScope
112-
# when we need it, sparing some computation. But doing so adds substantial
113-
# complexity, so we'll leave it until real need is demonstrated.
114-
with self.scope:
115-
result = await outcome.acapture(self.unprotected_afn)
116-
assert not self.scope.cancelled_caught, "any Cancelled should go to our parent"
105+
result = await outcome.acapture(self.unprotected_afn)
117106
self.queue.put_nowait(result)
118107

108+
def run_in_host_task(self, token: TrioToken) -> None:
109+
task_register = PARENT_TASK_DATA.task_register
110+
111+
def in_trio_thread() -> None:
112+
task = task_register[0]
113+
assert task is not None, "guaranteed by abandon_on_cancel semantics"
114+
trio.lowlevel.reschedule(task, outcome.Value(self))
115+
116+
token.run_sync_soon(in_trio_thread)
117+
118+
def run_in_system_nursery(self, token: TrioToken) -> None:
119+
def in_trio_thread() -> None:
120+
try:
121+
trio.lowlevel.spawn_system_task(
122+
self.run, name=self.afn, context=self.context
123+
)
124+
except RuntimeError: # system nursery is closed
125+
self.queue.put_nowait(
126+
outcome.Error(trio.RunFinishedError("system nursery is closed"))
127+
)
128+
129+
token.run_sync_soon(in_trio_thread)
130+
119131

120132
@attr.s(frozen=True, eq=False)
121133
class RunSync(Generic[RetT]):
@@ -144,6 +156,19 @@ def run_sync(self) -> None:
144156
result = outcome.capture(self.context.run, self.unprotected_fn)
145157
self.queue.put_nowait(result)
146158

159+
def run_in_host_task(self, token: TrioToken) -> None:
160+
task_register = PARENT_TASK_DATA.task_register
161+
162+
def in_trio_thread() -> None:
163+
task = task_register[0]
164+
assert task is not None, "guaranteed by abandon_on_cancel semantics"
165+
trio.lowlevel.reschedule(task, outcome.Value(self))
166+
167+
token.run_sync_soon(in_trio_thread)
168+
169+
def run_in_system_nursery(self, token: TrioToken) -> None:
170+
token.run_sync_soon(self.run_sync)
171+
147172

148173
@enable_ki_protection # Decorator used on function with Coroutine[Any, Any, RetT]
149174
async def to_thread_run_sync( # type: ignore[misc]
@@ -237,7 +262,7 @@ async def to_thread_run_sync( # type: ignore[misc]
237262
238263
"""
239264
await trio.lowlevel.checkpoint_if_cancelled()
240-
cancellable = bool(cancellable) # raise early if cancellable.__bool__ raises
265+
abandon_on_cancel = bool(cancellable) # raise early if cancellable.__bool__ raises
241266
if limiter is None:
242267
limiter = current_default_thread_limiter()
243268

@@ -266,9 +291,7 @@ def do_release_then_return_result() -> RetT:
266291

267292
result = outcome.capture(do_release_then_return_result)
268293
if task_register[0] is not None:
269-
trio.lowlevel.reschedule(
270-
task_register[0], outcome.Value(ThreadDone(result))
271-
)
294+
trio.lowlevel.reschedule(task_register[0], outcome.Value(result))
272295

273296
current_trio_token = trio.lowlevel.current_trio_token()
274297

@@ -283,6 +306,7 @@ def worker_fn() -> RetT:
283306
current_async_library_cvar.set(None)
284307

285308
PARENT_TASK_DATA.token = current_trio_token
309+
PARENT_TASK_DATA.abandon_on_cancel = abandon_on_cancel
286310
PARENT_TASK_DATA.cancel_register = cancel_register
287311
PARENT_TASK_DATA.task_register = task_register
288312
try:
@@ -299,6 +323,7 @@ def worker_fn() -> RetT:
299323
return ret
300324
finally:
301325
del PARENT_TASK_DATA.token
326+
del PARENT_TASK_DATA.abandon_on_cancel
302327
del PARENT_TASK_DATA.cancel_register
303328
del PARENT_TASK_DATA.task_register
304329

@@ -336,11 +361,11 @@ def abort(raise_cancel: RaiseCancelT) -> trio.lowlevel.Abort:
336361

337362
while True:
338363
# wait_task_rescheduled return value cannot be typed
339-
msg_from_thread: ThreadDone[RetT] | Run[object] | RunSync[
364+
msg_from_thread: outcome.Outcome[RetT] | Run[object] | RunSync[
340365
object
341366
] = await trio.lowlevel.wait_task_rescheduled(abort)
342-
if isinstance(msg_from_thread, ThreadDone):
343-
return msg_from_thread.result.unwrap() # type: ignore[no-any-return]
367+
if isinstance(msg_from_thread, outcome.Outcome):
368+
return msg_from_thread.unwrap() # type: ignore[no-any-return]
344369
elif isinstance(msg_from_thread, Run):
345370
await msg_from_thread.run()
346371
elif isinstance(msg_from_thread, RunSync):
@@ -354,10 +379,10 @@ def abort(raise_cancel: RaiseCancelT) -> trio.lowlevel.Abort:
354379

355380

356381
def from_thread_check_cancelled() -> None:
357-
"""Raise trio.Cancelled if the associated Trio task entered a cancelled status.
382+
"""Raise `trio.Cancelled` if the associated Trio task entered a cancelled status.
358383
359384
Only applicable to threads spawned by `trio.to_thread.run_sync`. Poll to allow
360-
``cancellable=False`` threads to raise :exc:`trio.Cancelled` at a suitable
385+
``cancellable=False`` threads to raise :exc:`~trio.Cancelled` at a suitable
361386
place, or to end abandoned ``cancellable=True`` threads sooner than they may
362387
otherwise.
363388
@@ -366,6 +391,13 @@ def from_thread_check_cancelled() -> None:
366391
delivery of cancellation attempted against it, regardless of the value of
367392
``cancellable`` supplied as an argument to it.
368393
RuntimeError: If this thread is not spawned from `trio.to_thread.run_sync`.
394+
395+
.. note::
396+
397+
The check for cancellation attempts of ``cancellable=False`` threads is
398+
interrupted while executing ``from_thread.run*`` functions, which can lead to
399+
edge cases where this function may raise or not depending on the timing of
400+
:class:`~trio.CancelScope` shields being raised or lowered in the Trio threads.
369401
"""
370402
try:
371403
raise_cancel = PARENT_TASK_DATA.cancel_register[0]
@@ -406,49 +438,6 @@ def _check_token(trio_token: TrioToken | None) -> TrioToken:
406438
return trio_token
407439

408440

409-
def _send_message_to_host_task(
410-
message: Run[RetT] | RunSync[RetT], trio_token: TrioToken
411-
) -> None:
412-
task_register = PARENT_TASK_DATA.task_register
413-
414-
def in_trio_thread() -> None:
415-
task = task_register[0]
416-
if task is None:
417-
# Our parent task is gone! Punt to a system task.
418-
if isinstance(message, Run):
419-
message.scope.cancel()
420-
_send_message_to_system_task(message, trio_token)
421-
else:
422-
trio.lowlevel.reschedule(task, outcome.Value(message))
423-
424-
trio_token.run_sync_soon(in_trio_thread)
425-
426-
427-
def _send_message_to_system_task(
428-
message: Run[RetT] | RunSync[RetT], trio_token: TrioToken
429-
) -> None:
430-
if type(message) is RunSync:
431-
run_sync = message.run_sync
432-
elif type(message) is Run:
433-
434-
def run_sync() -> None:
435-
try:
436-
trio.lowlevel.spawn_system_task(
437-
message.run_system, name=message.afn, context=message.context
438-
)
439-
except RuntimeError: # system nursery is closed
440-
message.queue.put_nowait(
441-
outcome.Error(trio.RunFinishedError("system nursery is closed"))
442-
)
443-
444-
else: # pragma: no cover, internal debugging guard TODO: use assert_never
445-
raise TypeError(
446-
"trio.to_thread.run_sync received unrecognized thread message {!r}."
447-
"".format(message)
448-
)
449-
trio_token.run_sync_soon(run_sync)
450-
451-
452441
def from_thread_run(
453442
afn: Callable[..., Awaitable[RetT]],
454443
*args: object,
@@ -467,17 +456,15 @@ def from_thread_run(
467456
RunFinishedError: if the corresponding call to :func:`trio.run` has
468457
already completed, or if the run has started its final cleanup phase
469458
and can no longer spawn new system tasks.
470-
Cancelled: if the corresponding `trio.to_thread.run_sync` task is
471-
cancellable and exits before this function is called, or
472-
if the task enters cancelled status or call to :func:`trio.run` completes
473-
while ``afn(*args)`` is running, then ``afn`` is likely to raise
459+
Cancelled: if the task enters cancelled status or call to :func:`trio.run`
460+
completes while ``afn(*args)`` is running, then ``afn`` is likely to raise
474461
:exc:`trio.Cancelled`.
475462
RuntimeError: if you try calling this from inside the Trio thread,
476463
which would otherwise cause a deadlock, or if no ``trio_token`` was
477464
provided, and we can't infer one from context.
478465
TypeError: if ``afn`` is not an asynchronous function.
479466
480-
**Locating a Trio Token**: There are two ways to specify which
467+
**Locating a TrioToken**: There are two ways to specify which
481468
`trio.run` loop to reenter:
482469
483470
- Spawn this thread from `trio.to_thread.run_sync`. Trio will
@@ -486,17 +473,20 @@ def from_thread_run(
486473
- Pass a keyword argument, ``trio_token`` specifying a specific
487474
`trio.run` loop to re-enter. This is useful in case you have a
488475
"foreign" thread, spawned using some other framework, and still want
489-
to enter Trio, or if you want to avoid the cancellation context of
490-
`trio.to_thread.run_sync`.
476+
to enter Trio, or if you want to use a new system task to call ``afn``,
477+
maybe to avoid the cancellation context of a corresponding
478+
`trio.to_thread.run_sync` task.
491479
"""
492-
if trio_token is None:
493-
send_message = _send_message_to_host_task
494-
else:
495-
send_message = _send_message_to_system_task
480+
token_provided = trio_token is not None
481+
trio_token = _check_token(trio_token)
496482

497483
message_to_trio = Run(afn, args, contextvars.copy_context())
498484

499-
send_message(message_to_trio, _check_token(trio_token))
485+
if token_provided or PARENT_TASK_DATA.abandon_on_cancel:
486+
message_to_trio.run_in_system_nursery(trio_token)
487+
else:
488+
message_to_trio.run_in_host_task(trio_token)
489+
500490
return message_to_trio.queue.get().unwrap() # type: ignore[no-any-return]
501491

502492

@@ -522,7 +512,7 @@ def from_thread_run_sync(
522512
provided, and we can't infer one from context.
523513
TypeError: if ``fn`` is an async function.
524514
525-
**Locating a Trio Token**: There are two ways to specify which
515+
**Locating a TrioToken**: There are two ways to specify which
526516
`trio.run` loop to reenter:
527517
528518
- Spawn this thread from `trio.to_thread.run_sync`. Trio will
@@ -531,15 +521,18 @@ def from_thread_run_sync(
531521
- Pass a keyword argument, ``trio_token`` specifying a specific
532522
`trio.run` loop to re-enter. This is useful in case you have a
533523
"foreign" thread, spawned using some other framework, and still want
534-
to enter Trio, or if you want to avoid the cancellation context of
535-
`trio.to_thread.run_sync`.
524+
to enter Trio, or if you want to use a new system task to call ``fn``,
525+
maybe to avoid the cancellation context of a corresponding
526+
`trio.to_thread.run_sync` task.
536527
"""
537-
if trio_token is None:
538-
send_message = _send_message_to_host_task
539-
else:
540-
send_message = _send_message_to_system_task
528+
token_provided = trio_token is not None
529+
trio_token = _check_token(trio_token)
541530

542531
message_to_trio = RunSync(fn, args, contextvars.copy_context())
543532

544-
send_message(message_to_trio, _check_token(trio_token))
533+
if token_provided or PARENT_TASK_DATA.abandon_on_cancel:
534+
message_to_trio.run_in_system_nursery(trio_token)
535+
else:
536+
message_to_trio.run_in_host_task(trio_token)
537+
545538
return message_to_trio.queue.get().unwrap() # type: ignore[no-any-return]

0 commit comments

Comments
 (0)