From d6073d51ff55ebe8fd68658bcb2457321cbdddd7 Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Sat, 16 May 2020 20:51:40 -0400 Subject: [PATCH 1/9] Implement asyncio.to_thread() Co-authored-by: Yury Selivanov --- Doc/library/asyncio-task.rst | 78 ++++++++++++++++++++++++++ Doc/whatsnew/3.9.rst | 6 ++ Lib/asyncio/__init__.py | 2 + Lib/asyncio/threads.py | 19 +++++++ Lib/test/test_asyncio/test_threads.py | 79 +++++++++++++++++++++++++++ 5 files changed, 184 insertions(+) create mode 100644 Lib/asyncio/threads.py create mode 100644 Lib/test/test_asyncio/test_threads.py diff --git a/Doc/library/asyncio-task.rst b/Doc/library/asyncio-task.rst index 2e963398d93001..330352f58a6faa 100644 --- a/Doc/library/asyncio-task.rst +++ b/Doc/library/asyncio-task.rst @@ -602,6 +602,84 @@ Waiting Primitives # ... +Running in Threads +================== + +.. coroutinefunction:: to_thread(func, /, \*args, \*\*kwargs) + + Asynchronously run function *func* in a separate thread. + + Return an :class:`asyncio.Future` which represents the eventual result of + *func*. + + This coroutine function is primarily intended to be used for executing + IO-bound functions/methods that would otherwise block the event loop if + they were ran in the main thread. For example, the following code would + block the event loop:: + + # "async def" is just used here so we can submit to asyncio.gather() + async def blocking_io(): + print(f"start blocking_io at {time.strftime('%X')}") + # If done in the same thread, blocking IO (such as file operations) will + # block the event loop. + time.sleep(1) + print(f"blocking_io complete at {time.strftime('%X')}") + + async def main(): + print(f"started main at {time.strftime('%X')}") + + await asyncio.gather( + blocking_io(), + asyncio.sleep(1)) + + print(f"finished main at {time.strftime('%X')}") + + + asyncio.run(main()) + + # Expected output: + # + # started main at 19:50:49 + # start blocking_io at 19:50:49 + # blocking_io complete at 19:50:50 + # finished main at 19:50:51 + + However, by using `asyncio.to_thread()` to run `blocking_io()` in a + separate thread, we can avoid blocking the event loop:: + + def blocking_io(): + print(f"start blocking_io at {time.strftime('%X')}") + time.sleep(1) + print(f"blocking_io complete at {time.strftime('%X')}") + + async def main(): + print(f"started main at {time.strftime('%X')}") + + await asyncio.gather( + asyncio.to_thread(blocking_io), + asyncio.sleep(1)) + + print(f"finished main at {time.strftime('%X')}") + + + asyncio.run(main()) + + # Expected output: + # + # started main at 19:50:53 + # start blocking_io at 19:50:53 + # blocking_io complete at 19:50:54 + # finished main at 19:50:54 + + + .. note :: + + Due to the :term:`GIL`, `asyncio.to_thread()` can typically only be used + to make IO-bound functions non-blocking. However, for extension modules + that release the GIL or alternative Python implementations that don't + have one, `asyncio.to_thread()` can also be used for CPU-bound functions. + + Scheduling From Other Threads ============================= diff --git a/Doc/whatsnew/3.9.rst b/Doc/whatsnew/3.9.rst index 479c33b4a7fa1c..46ab42c7c68b9d 100644 --- a/Doc/whatsnew/3.9.rst +++ b/Doc/whatsnew/3.9.rst @@ -282,6 +282,12 @@ that schedules a shutdown for the default executor that waits on the Added :class:`asyncio.PidfdChildWatcher`, a Linux-specific child watcher implementation that polls process file descriptors. (:issue:`38692`) +Added a new :term:`coroutine` :func:`asyncio.to_thread`. It is mainly used for +running IO-bound functions in a separate thread to avoid blocking the event +loop, and essentially works as a high-level version of +:meth:`~asyncio.loop.run_in_executor` that can directly take keyword arguments. +(Contributed by Kyle Stanley and Yury Selivanov in :issue:`32309`.) + compileall ---------- diff --git a/Lib/asyncio/__init__.py b/Lib/asyncio/__init__.py index 28c2e2c429f34a..eb84bfb189ccf3 100644 --- a/Lib/asyncio/__init__.py +++ b/Lib/asyncio/__init__.py @@ -17,6 +17,7 @@ from .streams import * from .subprocess import * from .tasks import * +from .threads import * from .transports import * # Exposed for _asynciomodule.c to implement now deprecated @@ -35,6 +36,7 @@ streams.__all__ + subprocess.__all__ + tasks.__all__ + + threads.__all__ + transports.__all__) if sys.platform == 'win32': # pragma: no cover diff --git a/Lib/asyncio/threads.py b/Lib/asyncio/threads.py new file mode 100644 index 00000000000000..78f125886a476e --- /dev/null +++ b/Lib/asyncio/threads.py @@ -0,0 +1,19 @@ +"""High-level support for working with threads in asyncio""" + +import functools + +from . import events + + +__all__ = "to_thread", + + +async def to_thread(func, /, *args, **kwargs): + """Asynchronously run function *func* in a separate thread. + + Return an asyncio.Future which represents the eventual result of *func*. + """ + loop = events.get_running_loop() + func_call = functools.partial(func, *args, **kwargs) + return await loop.run_in_executor(None, func_call) + diff --git a/Lib/test/test_asyncio/test_threads.py b/Lib/test/test_asyncio/test_threads.py new file mode 100644 index 00000000000000..99a00f21832f3e --- /dev/null +++ b/Lib/test/test_asyncio/test_threads.py @@ -0,0 +1,79 @@ +"""Tests for asyncio/threads.py""" + +import asyncio +import unittest + +from unittest import mock +from test.test_asyncio import utils as test_utils + + +def tearDownModule(): + asyncio.set_event_loop_policy(None) + + +class ToThreadTests(test_utils.TestCase): + def setUp(self): + super().setUp() + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + + def tearDown(self): + self.loop.run_until_complete( + self.loop.shutdown_default_executor()) + self.loop.close() + asyncio.set_event_loop(None) + self.loop = None + super().tearDown() + + def test_to_thread(self): + async def main(): + return await asyncio.to_thread(sum, [40, 2]) + + result = self.loop.run_until_complete(main()) + self.assertEqual(result, 42) + + def test_to_thread_exception(self): + def raise_runtime(): + raise RuntimeError("test") + + async def main(): + await asyncio.to_thread(raise_runtime) + + with self.assertRaisesRegex(RuntimeError, "test"): + self.loop.run_until_complete(main()) + + def test_to_thread_once(self): + func = mock.Mock() + + async def main(): + await asyncio.to_thread(func) + + self.loop.run_until_complete(main()) + func.assert_called_once() + + def test_to_thread_concurrent(self): + func = mock.Mock() + + async def main(): + futs = [] + for _ in range(10): + fut = asyncio.to_thread(func) + futs.append(fut) + await asyncio.gather(*futs) + + self.loop.run_until_complete(main()) + self.assertEqual(func.call_count, 10) + + def test_to_thread_args_kwargs(self): + # Unlike run_in_executor(), to_thread() should directly accept kwargs. + func = mock.Mock() + + async def main(): + await asyncio.to_thread(func, 'test', something=True) + + self.loop.run_until_complete(main()) + func.assert_called_once_with('test', something=True) + + +if __name__ == "__main__": + unittest.main() From 72da7bd5080b2eb7fbd4f2356cd55f2b1228d9ec Mon Sep 17 00:00:00 2001 From: "blurb-it[bot]" <43283697+blurb-it[bot]@users.noreply.github.com> Date: Sun, 17 May 2020 02:03:12 +0000 Subject: [PATCH 2/9] =?UTF-8?q?=F0=9F=93=9C=F0=9F=A4=96=20Added=20by=20blu?= =?UTF-8?q?rb=5Fit.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../next/Library/2020-05-17-02-03-09.bpo-32309.KM9psl.rst | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 Misc/NEWS.d/next/Library/2020-05-17-02-03-09.bpo-32309.KM9psl.rst diff --git a/Misc/NEWS.d/next/Library/2020-05-17-02-03-09.bpo-32309.KM9psl.rst b/Misc/NEWS.d/next/Library/2020-05-17-02-03-09.bpo-32309.KM9psl.rst new file mode 100644 index 00000000000000..6272c35edf4d57 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2020-05-17-02-03-09.bpo-32309.KM9psl.rst @@ -0,0 +1,4 @@ +Added a new :term:`coroutine` :func:`asyncio.to_thread`. It is mainly used for +running IO-bound functions in a separate thread to avoid blocking the event +loop, and essentially works as a high-level version of +:meth:`~asyncio.loop.run_in_executor` that can directly take keyword arguments. \ No newline at end of file From 520d029ebc10b63e3375646bf5bff2b6efdf6c94 Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Sat, 16 May 2020 22:10:45 -0400 Subject: [PATCH 3/9] Fix whitespace in threads.py --- Lib/asyncio/threads.py | 1 - 1 file changed, 1 deletion(-) diff --git a/Lib/asyncio/threads.py b/Lib/asyncio/threads.py index 78f125886a476e..966f4b03b484a8 100644 --- a/Lib/asyncio/threads.py +++ b/Lib/asyncio/threads.py @@ -16,4 +16,3 @@ async def to_thread(func, /, *args, **kwargs): loop = events.get_running_loop() func_call = functools.partial(func, *args, **kwargs) return await loop.run_in_executor(None, func_call) - From 542e9173c9e8d653adaccaea5087a6c76d8d5e3d Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Sat, 16 May 2020 22:22:42 -0400 Subject: [PATCH 4/9] Document *args and **kwargs behavior Co-authored-by: Chris Jerdonek --- Doc/library/asyncio-task.rst | 3 +++ Lib/asyncio/threads.py | 3 +++ 2 files changed, 6 insertions(+) diff --git a/Doc/library/asyncio-task.rst b/Doc/library/asyncio-task.rst index 330352f58a6faa..e3d7f622436964 100644 --- a/Doc/library/asyncio-task.rst +++ b/Doc/library/asyncio-task.rst @@ -609,6 +609,9 @@ Running in Threads Asynchronously run function *func* in a separate thread. + Any \*args and \*\*kwargs supplied for this function are directly passed + to *func*. + Return an :class:`asyncio.Future` which represents the eventual result of *func*. diff --git a/Lib/asyncio/threads.py b/Lib/asyncio/threads.py index 966f4b03b484a8..2f40467fe5bc7b 100644 --- a/Lib/asyncio/threads.py +++ b/Lib/asyncio/threads.py @@ -11,6 +11,9 @@ async def to_thread(func, /, *args, **kwargs): """Asynchronously run function *func* in a separate thread. + Any *args and **kwargs supplied for this function are directly passed + to *func*. + Return an asyncio.Future which represents the eventual result of *func*. """ loop = events.get_running_loop() From b5b62959eb07456f241224fb5a721240cbe1cb0d Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Sat, 16 May 2020 22:38:18 -0400 Subject: [PATCH 5/9] Fix formatting error --- Doc/library/asyncio-task.rst | 1 - 1 file changed, 1 deletion(-) diff --git a/Doc/library/asyncio-task.rst b/Doc/library/asyncio-task.rst index e3d7f622436964..cee96829f4b57e 100644 --- a/Doc/library/asyncio-task.rst +++ b/Doc/library/asyncio-task.rst @@ -674,7 +674,6 @@ Running in Threads # blocking_io complete at 19:50:54 # finished main at 19:50:54 - .. note :: Due to the :term:`GIL`, `asyncio.to_thread()` can typically only be used From c44451de2901d8500f4f3eafe822552b0b887288 Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Sun, 17 May 2020 00:20:44 -0400 Subject: [PATCH 6/9] Fix reST note --- Doc/library/asyncio-task.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Doc/library/asyncio-task.rst b/Doc/library/asyncio-task.rst index cee96829f4b57e..ed40cf29b8b824 100644 --- a/Doc/library/asyncio-task.rst +++ b/Doc/library/asyncio-task.rst @@ -674,7 +674,7 @@ Running in Threads # blocking_io complete at 19:50:54 # finished main at 19:50:54 - .. note :: + .. note:: Due to the :term:`GIL`, `asyncio.to_thread()` can typically only be used to make IO-bound functions non-blocking. However, for extension modules From 41af75f9689db990d1d277ecad8770c2aba32eba Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Mon, 18 May 2020 20:53:42 -0400 Subject: [PATCH 7/9] Add to_thread() to high-level API index --- Doc/library/asyncio-api-index.rst | 3 +++ 1 file changed, 3 insertions(+) diff --git a/Doc/library/asyncio-api-index.rst b/Doc/library/asyncio-api-index.rst index d5b5659abc65e2..047e5bbc58ccad 100644 --- a/Doc/library/asyncio-api-index.rst +++ b/Doc/library/asyncio-api-index.rst @@ -48,6 +48,9 @@ await on multiple things with timeouts. * - :class:`Task` - Task object. + * - :func:`to_thread` + - Asychronously run a function in a separate OS thread. + * - :func:`run_coroutine_threadsafe` - Schedule a coroutine from another OS thread. From 057870409da609392b4ea944939b38c839f2e27d Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Mon, 18 May 2020 22:27:15 -0400 Subject: [PATCH 8/9] Improve documentation examples for to_thread() Co-authored-by: Andrew Svetlov --- Doc/library/asyncio-task.rst | 40 ++++++++---------------------------- 1 file changed, 8 insertions(+), 32 deletions(-) diff --git a/Doc/library/asyncio-task.rst b/Doc/library/asyncio-task.rst index ed40cf29b8b824..a96c6dd4c52c1b 100644 --- a/Doc/library/asyncio-task.rst +++ b/Doc/library/asyncio-task.rst @@ -617,41 +617,12 @@ Running in Threads This coroutine function is primarily intended to be used for executing IO-bound functions/methods that would otherwise block the event loop if - they were ran in the main thread. For example, the following code would - block the event loop:: - - # "async def" is just used here so we can submit to asyncio.gather() - async def blocking_io(): - print(f"start blocking_io at {time.strftime('%X')}") - # If done in the same thread, blocking IO (such as file operations) will - # block the event loop. - time.sleep(1) - print(f"blocking_io complete at {time.strftime('%X')}") - - async def main(): - print(f"started main at {time.strftime('%X')}") - - await asyncio.gather( - blocking_io(), - asyncio.sleep(1)) - - print(f"finished main at {time.strftime('%X')}") - - - asyncio.run(main()) - - # Expected output: - # - # started main at 19:50:49 - # start blocking_io at 19:50:49 - # blocking_io complete at 19:50:50 - # finished main at 19:50:51 - - However, by using `asyncio.to_thread()` to run `blocking_io()` in a - separate thread, we can avoid blocking the event loop:: + they were ran in the main thread. For example:: def blocking_io(): print(f"start blocking_io at {time.strftime('%X')}") + # Note that `time.sleep()` can be replaced with any blocking + # IO-bound operation, such as file operations. time.sleep(1) print(f"blocking_io complete at {time.strftime('%X')}") @@ -674,6 +645,11 @@ Running in Threads # blocking_io complete at 19:50:54 # finished main at 19:50:54 + Directly calling `blocking_io()` in any coroutine would block the event loop + for its duration, resulting in an additional 1 second of run time. Instead, + by using `asyncio.to_thread()`, we can run it in a separate thread without + blocking the event loop. + .. note:: Due to the :term:`GIL`, `asyncio.to_thread()` can typically only be used From be502417af1d455144ace8d5041e5619305d60ca Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Mon, 18 May 2020 22:42:04 -0400 Subject: [PATCH 9/9] Fix doctest warning --- Doc/library/asyncio-task.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Doc/library/asyncio-task.rst b/Doc/library/asyncio-task.rst index a96c6dd4c52c1b..7c2704090551b6 100644 --- a/Doc/library/asyncio-task.rst +++ b/Doc/library/asyncio-task.rst @@ -621,7 +621,7 @@ Running in Threads def blocking_io(): print(f"start blocking_io at {time.strftime('%X')}") - # Note that `time.sleep()` can be replaced with any blocking + # Note that time.sleep() can be replaced with any blocking # IO-bound operation, such as file operations. time.sleep(1) print(f"blocking_io complete at {time.strftime('%X')}")