Skip to content

Commit cc2bbc2

Browse files
authored
bpo-32309: Implement asyncio.to_thread() (GH-20143)
Implements `asyncio.to_thread`, a coroutine for asynchronously running IO-bound functions in a separate thread without blocking the event loop. See the discussion starting from [here](#18410 (comment)) in GH-18410 for context. Automerge-Triggered-By: @aeros
1 parent d4fe098 commit cc2bbc2

File tree

7 files changed

+171
-0
lines changed

7 files changed

+171
-0
lines changed

Doc/library/asyncio-api-index.rst

+3
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ await on multiple things with timeouts.
4848
* - :class:`Task`
4949
- Task object.
5050

51+
* - :func:`to_thread`
52+
- Asychronously run a function in a separate OS thread.
53+
5154
* - :func:`run_coroutine_threadsafe`
5255
- Schedule a coroutine from another OS thread.
5356

Doc/library/asyncio-task.rst

+56
Original file line numberDiff line numberDiff line change
@@ -602,6 +602,62 @@ Waiting Primitives
602602
# ...
603603

604604

605+
Running in Threads
606+
==================
607+
608+
.. coroutinefunction:: to_thread(func, /, \*args, \*\*kwargs)
609+
610+
Asynchronously run function *func* in a separate thread.
611+
612+
Any \*args and \*\*kwargs supplied for this function are directly passed
613+
to *func*.
614+
615+
Return an :class:`asyncio.Future` which represents the eventual result of
616+
*func*.
617+
618+
This coroutine function is primarily intended to be used for executing
619+
IO-bound functions/methods that would otherwise block the event loop if
620+
they were ran in the main thread. For example::
621+
622+
def blocking_io():
623+
print(f"start blocking_io at {time.strftime('%X')}")
624+
# Note that time.sleep() can be replaced with any blocking
625+
# IO-bound operation, such as file operations.
626+
time.sleep(1)
627+
print(f"blocking_io complete at {time.strftime('%X')}")
628+
629+
async def main():
630+
print(f"started main at {time.strftime('%X')}")
631+
632+
await asyncio.gather(
633+
asyncio.to_thread(blocking_io),
634+
asyncio.sleep(1))
635+
636+
print(f"finished main at {time.strftime('%X')}")
637+
638+
639+
asyncio.run(main())
640+
641+
# Expected output:
642+
#
643+
# started main at 19:50:53
644+
# start blocking_io at 19:50:53
645+
# blocking_io complete at 19:50:54
646+
# finished main at 19:50:54
647+
648+
Directly calling `blocking_io()` in any coroutine would block the event loop
649+
for its duration, resulting in an additional 1 second of run time. Instead,
650+
by using `asyncio.to_thread()`, we can run it in a separate thread without
651+
blocking the event loop.
652+
653+
.. note::
654+
655+
Due to the :term:`GIL`, `asyncio.to_thread()` can typically only be used
656+
to make IO-bound functions non-blocking. However, for extension modules
657+
that release the GIL or alternative Python implementations that don't
658+
have one, `asyncio.to_thread()` can also be used for CPU-bound functions.
659+
660+
605661
Scheduling From Other Threads
606662
=============================
607663

Doc/whatsnew/3.9.rst

+6
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,12 @@ that schedules a shutdown for the default executor that waits on the
282282
Added :class:`asyncio.PidfdChildWatcher`, a Linux-specific child watcher
283283
implementation that polls process file descriptors. (:issue:`38692`)
284284

285+
Added a new :term:`coroutine` :func:`asyncio.to_thread`. It is mainly used for
286+
running IO-bound functions in a separate thread to avoid blocking the event
287+
loop, and essentially works as a high-level version of
288+
:meth:`~asyncio.loop.run_in_executor` that can directly take keyword arguments.
289+
(Contributed by Kyle Stanley and Yury Selivanov in :issue:`32309`.)
290+
285291
compileall
286292
----------
287293

Lib/asyncio/__init__.py

+2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from .streams import *
1818
from .subprocess import *
1919
from .tasks import *
20+
from .threads import *
2021
from .transports import *
2122

2223
# Exposed for _asynciomodule.c to implement now deprecated
@@ -35,6 +36,7 @@
3536
streams.__all__ +
3637
subprocess.__all__ +
3738
tasks.__all__ +
39+
threads.__all__ +
3840
transports.__all__)
3941

4042
if sys.platform == 'win32': # pragma: no cover

Lib/asyncio/threads.py

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
"""High-level support for working with threads in asyncio"""
2+
3+
import functools
4+
5+
from . import events
6+
7+
8+
__all__ = "to_thread",
9+
10+
11+
async def to_thread(func, /, *args, **kwargs):
12+
"""Asynchronously run function *func* in a separate thread.
13+
14+
Any *args and **kwargs supplied for this function are directly passed
15+
to *func*.
16+
17+
Return an asyncio.Future which represents the eventual result of *func*.
18+
"""
19+
loop = events.get_running_loop()
20+
func_call = functools.partial(func, *args, **kwargs)
21+
return await loop.run_in_executor(None, func_call)

Lib/test/test_asyncio/test_threads.py

+79
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
"""Tests for asyncio/threads.py"""
2+
3+
import asyncio
4+
import unittest
5+
6+
from unittest import mock
7+
from test.test_asyncio import utils as test_utils
8+
9+
10+
def tearDownModule():
11+
asyncio.set_event_loop_policy(None)
12+
13+
14+
class ToThreadTests(test_utils.TestCase):
15+
def setUp(self):
16+
super().setUp()
17+
self.loop = asyncio.new_event_loop()
18+
asyncio.set_event_loop(self.loop)
19+
20+
def tearDown(self):
21+
self.loop.run_until_complete(
22+
self.loop.shutdown_default_executor())
23+
self.loop.close()
24+
asyncio.set_event_loop(None)
25+
self.loop = None
26+
super().tearDown()
27+
28+
def test_to_thread(self):
29+
async def main():
30+
return await asyncio.to_thread(sum, [40, 2])
31+
32+
result = self.loop.run_until_complete(main())
33+
self.assertEqual(result, 42)
34+
35+
def test_to_thread_exception(self):
36+
def raise_runtime():
37+
raise RuntimeError("test")
38+
39+
async def main():
40+
await asyncio.to_thread(raise_runtime)
41+
42+
with self.assertRaisesRegex(RuntimeError, "test"):
43+
self.loop.run_until_complete(main())
44+
45+
def test_to_thread_once(self):
46+
func = mock.Mock()
47+
48+
async def main():
49+
await asyncio.to_thread(func)
50+
51+
self.loop.run_until_complete(main())
52+
func.assert_called_once()
53+
54+
def test_to_thread_concurrent(self):
55+
func = mock.Mock()
56+
57+
async def main():
58+
futs = []
59+
for _ in range(10):
60+
fut = asyncio.to_thread(func)
61+
futs.append(fut)
62+
await asyncio.gather(*futs)
63+
64+
self.loop.run_until_complete(main())
65+
self.assertEqual(func.call_count, 10)
66+
67+
def test_to_thread_args_kwargs(self):
68+
# Unlike run_in_executor(), to_thread() should directly accept kwargs.
69+
func = mock.Mock()
70+
71+
async def main():
72+
await asyncio.to_thread(func, 'test', something=True)
73+
74+
self.loop.run_until_complete(main())
75+
func.assert_called_once_with('test', something=True)
76+
77+
78+
if __name__ == "__main__":
79+
unittest.main()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
Added a new :term:`coroutine` :func:`asyncio.to_thread`. It is mainly used for
2+
running IO-bound functions in a separate thread to avoid blocking the event
3+
loop, and essentially works as a high-level version of
4+
:meth:`~asyncio.loop.run_in_executor` that can directly take keyword arguments.

0 commit comments

Comments
 (0)