From 8cc155e005cc321b0ce0c755a93ef5b965ccf1fa Mon Sep 17 00:00:00 2001 From: aeros Date: Sat, 16 Nov 2019 17:19:59 -0500 Subject: [PATCH 01/18] Implement asyncio.ThreadPool --- Doc/library/asyncio-pools.rst | 95 +++++++++++++++ Doc/library/asyncio.rst | 3 + Lib/asyncio/__init__.py | 2 + Lib/asyncio/pools.py | 166 +++++++++++++++++++++++++ Lib/test/test_asyncio/test_pools.py | 182 ++++++++++++++++++++++++++++ 5 files changed, 448 insertions(+) create mode 100644 Doc/library/asyncio-pools.rst create mode 100644 Lib/asyncio/pools.py create mode 100644 Lib/test/test_asyncio/test_pools.py diff --git a/Doc/library/asyncio-pools.rst b/Doc/library/asyncio-pools.rst new file mode 100644 index 00000000000000..1703a055bb8182 --- /dev/null +++ b/Doc/library/asyncio-pools.rst @@ -0,0 +1,95 @@ +.. currentmodule:: asyncio + +.. versionadded:: 3.9 + +.. _asyncio-pools: + +===== +Pools +===== + +**Source code:** :source:`Lib/asyncio/pools.py` + +----------------------------------------------- + +.. note:: + This section of the documentation and all of its members have been + added *provisionally* to asyncio's API. For more details, see + :term:`provisional api`. + +Asyncio pools are high-level, asynchronous context managers that can be used +to concurrently run blocking functions and methods. + +There are many potential use cases, but a particularly useful one is for +combining libraries without asyncio support with existing asyncio programs. +Normally, calling a non-async function within the event loop would will result +in blocking the event loop until the function returns. However, by using a +pool to run the function, it can be executed in a separate worker (such as a +thread or process) without blocking the event loop. + +.. class:: ThreadPool(concurrency=None) + + An asynchronous threadpool that provides methods to concurrently + run IO-bound functions, without blocking the event loop. + + *concurrency* is an optional argument that limits the number of + threads to utilize in the threadpool. With the default value of + ``None``, the amount of threads used will scale based on the + number of processors. + + .. coroutinemethod:: run(/, func, *args, **kwargs) + + Asynchronously run *func* with its arguments and keyword-arguments + within the threadpool, and return a :class:`asyncio.Future` object + that represents the eventual result of its execution. :: + + with asyncio.ThreadPool() as pool: + await pool.run(time.sleep, 1) + + Raises a :exc:`RuntimeError` if the threadpool is *not* running. + + .. coroutinemethod:: astart() + + Schedule the start of the threadpool and spawn its threads. Note that + this function is called automatically when using ``asyncio.ThreadPool`` + as an asynchronous context manager, and does not need to be called + directly. + + Raises a :exc:`RuntimeError` if the threadpool is already running or + if it's been closed. + + .. coroutinemethod:: aclose() + + Schedule the closing of the threadpool. Note that this function is + called automatically when using ``asyncio.ThreadPool`` as an + asynchronous context manager, and does not need to be called directly. + + Raises a :exc:`RuntimeError` if the threadpool has already been closed. + +Examples +======== + +Here's an example of concurrently running two IO-bound functions using +:class:`asyncio.ThreadPool`:: + + import asyncio + + def blocking_io(): + print("start blocking_io") + with open('/dev/urandom', 'rb') as f: + f.read(100_000) + print("blocking_io complete") + + def other_blocking_io(): + print("start other_blocking_io") + with open('/dev/zero', 'rb') as f: + f.read(10) + print("other_blocking_io complete") + + async def main(): + with asyncio.ThreadPool() as pool: + await asyncio.gather( + pool.run(blocking_io), + pool.run(other_io)) + + asyncio.run(main()) diff --git a/Doc/library/asyncio.rst b/Doc/library/asyncio.rst index 94a853259d3483..aad339a31defc4 100644 --- a/Doc/library/asyncio.rst +++ b/Doc/library/asyncio.rst @@ -43,6 +43,8 @@ asyncio provides a set of **high-level** APIs to: * :ref:`synchronize ` concurrent code; +* concurrently run blocking functions in a :ref:`pool `; + Additionally, there are **low-level** APIs for *library and framework developers* to: @@ -73,6 +75,7 @@ Additionally, there are **low-level** APIs for asyncio-subprocess.rst asyncio-queue.rst asyncio-exceptions.rst + asyncio-pools.rst .. toctree:: :caption: Low-level APIs diff --git a/Lib/asyncio/__init__.py b/Lib/asyncio/__init__.py index 28c2e2c429f34a..b1930ffad1c649 100644 --- a/Lib/asyncio/__init__.py +++ b/Lib/asyncio/__init__.py @@ -11,6 +11,7 @@ from .exceptions import * from .futures import * from .locks import * +from .pools import * from .protocols import * from .runners import * from .queues import * @@ -29,6 +30,7 @@ exceptions.__all__ + futures.__all__ + locks.__all__ + + pools.__all__ + protocols.__all__ + runners.__all__ + queues.__all__ + diff --git a/Lib/asyncio/pools.py b/Lib/asyncio/pools.py new file mode 100644 index 00000000000000..0d1828a79db128 --- /dev/null +++ b/Lib/asyncio/pools.py @@ -0,0 +1,166 @@ +"""Support for high-level asynchronous pools in asyncio.""" + +__all__ = 'ThreadPool', + + +import concurrent.futures +import functools +import threading +import os + +from . import events +from . import exceptions +from . import futures + + +class AbstractPool: + """Abstract base class for asynchronous pools.""" + + async def astart(self): + raise NotImplementedError + + async def __aenter__(self): + await self.astart() + return self + + async def aclose(self): + raise NotImplementedError + + async def __aexit__(self, exc_type, exc_value, exc_traceback): + await self.aclose() + + async def run(self, /, func, *args, **kwargs): + """Asynchronously run function *func* using the pool. + + Return a future, representing the eventual result of *func*. + """ + raise NotImplementedError + + +class ThreadPool(AbstractPool): + """Asynchronous threadpool for running IO-bound functions. + + Directly calling an IO-bound function within the main thread will block + other operations from occurring until it is completed. By using a + threadpool, several IO-bound functions can be ran concurrently within + their own threads, without blocking other operations. + + The optional argument *concurrency* sets the number of threads within the + threadpool. If *concurrency* is `None`, the maximum number of threads will + be used; based on the number of CPU cores. + + This threadpool is intended to be used as an asynchronous context manager, + using the `async with` syntax, which provides automatic initialization and + finalization of resources. For example: + + import asyncio + + def blocking_io(): + print("start blocking_io") + with open('/dev/urandom', 'rb') as f: + f.read(100_000) + print("blocking_io complete") + + def other_blocking_io(): + print("start other_blocking_io") + with open('/dev/zero', 'rb') as f: + f.read(10) + print("other_blocking_io complete") + + async def main(): + with asyncio.ThreadPool() as pool: + await asyncio.gather( + pool.run(blocking_io), + pool.run(other_io)) + + asyncio.run(main()) + """ + + def __init__(self, concurrency=None): + if concurrency is None: + concurrency = min(32, (os.cpu_count() or 1) + 4) + + self._concurrency = concurrency + self._running = False + self._closed = False + self._loop = None + self._pool = None + + async def astart(self): + self._loop = events.get_running_loop() + await self._spawn_threadpool() + + async def __aenter__(self): + await self.astart() + return self + + async def aclose(self): + await self._shutdown_threadpool() + + async def __aexit__(self, exc_type, exc_value, exc_traceback): + await self.aclose() + + async def run(self, /, func, *args, **kwargs): + if not self._running: + raise RuntimeError(f"unable to run {func!r}, " + "threadpool is not running") + + func_call = functools.partial(func, *args, **kwargs) + executor = self._pool + return await futures.wrap_future( + executor.submit(func_call), loop=self._loop) + + async def _spawn_threadpool(self): + """Schedule the spawning of the threadpool. + + Asynchronously spawns a threadpool with *concurrency* threads. + """ + if self._running: + raise RuntimeError("threadpool is already running") + + if self._closed: + raise RuntimeError("threadpool is closed") + + future = self._loop.create_future() + thread = threading.Thread(target=self._do_spawn, args=(future,)) + thread.start() + try: + await future + finally: + thread.join() + + def _do_spawn(self, future): + try: + self._pool = concurrent.futures.ThreadPoolExecutor( + max_workers=self._concurrency) + self._running = True + self._loop.call_soon_threadsafe(future.set_result, None) + except Exception as ex: + self._loop.call_soon_threadsafe(future.exception, ex) + + async def _shutdown_threadpool(self): + """Schedule the shutdown of the threadpool. + + Asynchronously joins all of the threads in the threadpool. + """ + if self._closed: + raise RuntimeError("threadpool is already closed") + + # Set _running to False as early as possible + self._running = False + + future = self._loop.create_future() + thread = threading.Thread(target=self._do_shutdown, args=(future,)) + thread.start() + try: + await future + finally: + thread.join() + + def _do_shutdown(self, future): + try: + self._pool.shutdown() + self._closed = True + self._loop.call_soon_threadsafe(future.set_result, None) + except Exception as ex: + self._loop.call_soon_threadsafe(future.exception, ex) diff --git a/Lib/test/test_asyncio/test_pools.py b/Lib/test/test_asyncio/test_pools.py new file mode 100644 index 00000000000000..c9a27d92ceaa5d --- /dev/null +++ b/Lib/test/test_asyncio/test_pools.py @@ -0,0 +1,182 @@ +"""Tests for pools.py""" + +import asyncio +import unittest + +from unittest import mock + + +def tearDownModule(): + asyncio.set_event_loop_policy(None) + + +class AbstractPoolTests(unittest.TestCase): + + def setUp(self): + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + + def tearDown(self): + self.loop.close() + self.loop = None + + def test_methods_not_implemented(self): + pool = asyncio.pools.AbstractPool() + with self.assertRaises(NotImplementedError): + self.loop.run_until_complete(pool.astart()) + + with self.assertRaises(NotImplementedError): + self.loop.run_until_complete(pool.aclose()) + + func = mock.Mock() + with self.assertRaises(NotImplementedError): + self.loop.run_until_complete(pool.run(func)) + func.assert_not_called() + + def test_context_manager_not_implemented(self): + async def pool_cm(): + async with asyncio.pools.AbstractPool() as pool: + pass + + with self.assertRaises(NotImplementedError): + self.loop.run_until_complete(pool_cm()) + + +class ThreadPoolTests(unittest.TestCase): + + def setUp(self): + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + + def tearDown(self): + self.loop.close() + asyncio.set_event_loop(None) + self.loop = None + + def test_running(self): + async def main(): + async with asyncio.ThreadPool() as pool: + self.assertTrue(pool._running) + self.assertFalse(pool._closed) + + self.loop.run_until_complete(main()) + + def test_closed(self): + async def main(): + async with asyncio.ThreadPool() as pool: + pass + + self.assertTrue(pool._closed) + self.assertFalse(pool._running) + + self.loop.run_until_complete(main()) + + def test_concurrency(self): + async def main(): + async with asyncio.ThreadPool() as pool: + # We can't guarantee the number of threads since it's system + # specific, but it should never be None after the thread pool + # is started. + self.assertIsNotNone(pool._concurrency) + + self.loop.run_until_complete(main()) + + def test_concurrency_explicit(self): + async def main(): + async with asyncio.ThreadPool(concurrency=5) as pool: + self.assertEqual(pool._concurrency, 5) + + self.loop.run_until_complete(main()) + + def test_run(self): + async def main(): + async with asyncio.ThreadPool(concurrency=5) as pool: + return await pool.run(sum, [40, 2]) + + result = self.loop.run_until_complete(main()) + self.assertEqual(result, 42) + + def test_run_once(self): + func = mock.Mock() + + async def main(): + async with asyncio.ThreadPool(concurrency=5) as pool: + await pool.run(func) + + self.loop.run_until_complete(main()) + func.assert_called_once() + + def test_run_concurrent(self): + func = mock.Mock() + + async def main(): + futs = [] + async with asyncio.ThreadPool(concurrency=5) as pool: + for _ in range(10): + fut = pool.run(func) + futs.append(fut) + await asyncio.gather(*futs) + + self.loop.run_until_complete(main()) + self.assertEqual(func.call_count, 10) + + def test_run_after_exit(self): + func = mock.Mock() + + async def main(): + async with asyncio.ThreadPool() as pool: + pass + + with self.assertRaises(RuntimeError): + await pool.run(func) + + self.loop.run_until_complete(main()) + func.assert_not_called() + + def test_run_before_start(self): + func = mock.Mock() + + async def main(): + pool = asyncio.ThreadPool() + with self.assertRaises(RuntimeError): + await pool.run(func) + + self.loop.run_until_complete(main()) + func.assert_not_called() + + def test_start_twice(self): + async def main(): + pool = asyncio.ThreadPool() + async with pool: + with self.assertRaises(RuntimeError): + async with pool: + pass + + self.loop.run_until_complete(main()) + + def test_start_after_close(self): + async def main(): + pool = asyncio.ThreadPool() + async with pool: + pass + + with self.assertRaises(RuntimeError): + async with pool: + pass + + self.loop.run_until_complete(main()) + + def test_close_twice(self): + async def main(): + pool = asyncio.ThreadPool() + async with pool: + pass + + with self.assertRaises(RuntimeError): + await pool.aclose() + + self.loop.run_until_complete(main()) + + +if __name__ == '__main__': + unittest.main() From 323ef09bfd4b01ede50fcf946521a9853551a463 Mon Sep 17 00:00:00 2001 From: "blurb-it[bot]" <43283697+blurb-it[bot]@users.noreply.github.com> Date: Sat, 8 Feb 2020 03:49:57 +0000 Subject: [PATCH 02/18] =?UTF-8?q?=F0=9F=93=9C=F0=9F=A4=96=20Added=20by=20b?= =?UTF-8?q?lurb=5Fit.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../next/Library/2020-02-08-03-49-55.bpo-32309.B88xPb.rst | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 Misc/NEWS.d/next/Library/2020-02-08-03-49-55.bpo-32309.B88xPb.rst diff --git a/Misc/NEWS.d/next/Library/2020-02-08-03-49-55.bpo-32309.B88xPb.rst b/Misc/NEWS.d/next/Library/2020-02-08-03-49-55.bpo-32309.B88xPb.rst new file mode 100644 index 00000000000000..c30546a820c3b8 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2020-02-08-03-49-55.bpo-32309.B88xPb.rst @@ -0,0 +1,5 @@ +Added :class:`asyncio.ThreadPool`, an asynchronous context manager for +concurrently running IO-bound functions without blocking the event loop. +It essentially works as a higher-level version of +:meth:`asyncio.loop.run_in_executor` that can take keyword arguments and +be used as a context manager using ``async with``. \ No newline at end of file From 2dc325a03f3c172b6d6ad2e80a4cbc7104588cd3 Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Fri, 7 Feb 2020 22:58:09 -0500 Subject: [PATCH 03/18] Add whatsnew for 3.9 --- Doc/whatsnew/3.9.rst | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/Doc/whatsnew/3.9.rst b/Doc/whatsnew/3.9.rst index 4991e56759b1c2..c58e6fcb5b9d78 100644 --- a/Doc/whatsnew/3.9.rst +++ b/Doc/whatsnew/3.9.rst @@ -137,6 +137,13 @@ details, see the documentation for ``loop.create_datagram_endpoint()``. (Contributed by Kyle Stanley, Antoine Pitrou, and Yury Selivanov in :issue:`37228`.) +Added :class:`asyncio.ThreadPool`, an asynchronous context manager for +concurrently running IO-bound functions without blocking the event loop. +It essentially works as a higher-level version of +:meth:`asyncio.loop.run_in_executor` that can take keyword arguments and +be used as a context manager using ``async with``. +(Contributed by Kyle Stanley in :issue:`32309`.) + Added a new :term:`coroutine` :meth:`~asyncio.loop.shutdown_default_executor` that schedules a shutdown for the default executor that waits on the :class:`~concurrent.futures.ThreadPoolExecutor` to finish closing. Also, From bf3e022d5334b127b394b6a30070f79758b34641 Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Fri, 7 Feb 2020 23:09:11 -0500 Subject: [PATCH 04/18] Fix asyncio.ThreadPool examples --- Doc/library/asyncio-pools.rst | 4 ++-- Lib/asyncio/pools.py | 24 ++++++++++++------------ 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/Doc/library/asyncio-pools.rst b/Doc/library/asyncio-pools.rst index 1703a055bb8182..25cd41cd626638 100644 --- a/Doc/library/asyncio-pools.rst +++ b/Doc/library/asyncio-pools.rst @@ -43,7 +43,7 @@ thread or process) without blocking the event loop. within the threadpool, and return a :class:`asyncio.Future` object that represents the eventual result of its execution. :: - with asyncio.ThreadPool() as pool: + async with asyncio.ThreadPool() as pool: await pool.run(time.sleep, 1) Raises a :exc:`RuntimeError` if the threadpool is *not* running. @@ -87,7 +87,7 @@ Here's an example of concurrently running two IO-bound functions using print("other_blocking_io complete") async def main(): - with asyncio.ThreadPool() as pool: + async with asyncio.ThreadPool() as pool: await asyncio.gather( pool.run(blocking_io), pool.run(other_io)) diff --git a/Lib/asyncio/pools.py b/Lib/asyncio/pools.py index 0d1828a79db128..7fe4dcf86de8b2 100644 --- a/Lib/asyncio/pools.py +++ b/Lib/asyncio/pools.py @@ -56,22 +56,22 @@ class ThreadPool(AbstractPool): import asyncio def blocking_io(): - print("start blocking_io") - with open('/dev/urandom', 'rb') as f: - f.read(100_000) - print("blocking_io complete") + print("start blocking_io") + with open('/dev/urandom', 'rb') as f: + f.read(100_000) + print("blocking_io complete") def other_blocking_io(): - print("start other_blocking_io") - with open('/dev/zero', 'rb') as f: - f.read(10) - print("other_blocking_io complete") + print("start other_blocking_io") + with open('/dev/zero', 'rb') as f: + f.read(10) + print("other_blocking_io complete") async def main(): - with asyncio.ThreadPool() as pool: - await asyncio.gather( - pool.run(blocking_io), - pool.run(other_io)) + async with asyncio.ThreadPool() as pool: + await asyncio.gather( + pool.run(blocking_io), + pool.run(other_io)) asyncio.run(main()) """ From 8b715febce83688b82366cba6244d63a568e5f2c Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Tue, 11 Feb 2020 17:17:27 -0500 Subject: [PATCH 05/18] Fix asyncio.ThreadPool example --- Doc/library/asyncio-pools.rst | 2 +- Lib/asyncio/pools.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Doc/library/asyncio-pools.rst b/Doc/library/asyncio-pools.rst index 25cd41cd626638..173b9b657904eb 100644 --- a/Doc/library/asyncio-pools.rst +++ b/Doc/library/asyncio-pools.rst @@ -90,6 +90,6 @@ Here's an example of concurrently running two IO-bound functions using async with asyncio.ThreadPool() as pool: await asyncio.gather( pool.run(blocking_io), - pool.run(other_io)) + pool.run(other_blocking_io)) asyncio.run(main()) diff --git a/Lib/asyncio/pools.py b/Lib/asyncio/pools.py index 7fe4dcf86de8b2..e6db274a7acfdf 100644 --- a/Lib/asyncio/pools.py +++ b/Lib/asyncio/pools.py @@ -71,7 +71,7 @@ async def main(): async with asyncio.ThreadPool() as pool: await asyncio.gather( pool.run(blocking_io), - pool.run(other_io)) + pool.run(other_blocking_io)) asyncio.run(main()) """ From ccd425ff81c346dae4e01618933b7514e1b4520c Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Wed, 12 Feb 2020 23:25:46 -0500 Subject: [PATCH 06/18] Improve tearDown() in test_pools --- Lib/test/test_asyncio/test_pools.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/test/test_asyncio/test_pools.py b/Lib/test/test_asyncio/test_pools.py index c9a27d92ceaa5d..3724d48e58fe39 100644 --- a/Lib/test/test_asyncio/test_pools.py +++ b/Lib/test/test_asyncio/test_pools.py @@ -17,8 +17,8 @@ def setUp(self): asyncio.set_event_loop(self.loop) def tearDown(self): + asyncio.set_event_loop(None) self.loop.close() - self.loop = None def test_methods_not_implemented(self): pool = asyncio.pools.AbstractPool() From 2dba11d3221c81acf00c8cec93ddf12c5a1ba869 Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Sun, 1 Mar 2020 23:39:20 -0500 Subject: [PATCH 07/18] Co-authored-by: Antoine Pitrou Use abc.ABC and abc.abstractmethod to prevent direct instantiation of AbstractPool. --- Lib/asyncio/pools.py | 11 +++++++++-- Lib/test/test_asyncio/test_pools.py | 15 +++------------ 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/Lib/asyncio/pools.py b/Lib/asyncio/pools.py index e6db274a7acfdf..0672bc70c51d26 100644 --- a/Lib/asyncio/pools.py +++ b/Lib/asyncio/pools.py @@ -8,27 +8,34 @@ import threading import os +from abc import ABC, abstractmethod + from . import events from . import exceptions from . import futures -class AbstractPool: +class AbstractPool(ABC): """Abstract base class for asynchronous pools.""" - + + @abstractmethod async def astart(self): raise NotImplementedError + @abstractmethod async def __aenter__(self): await self.astart() return self + @abstractmethod async def aclose(self): raise NotImplementedError + @abstractmethod async def __aexit__(self, exc_type, exc_value, exc_traceback): await self.aclose() + @abstractmethod async def run(self, /, func, *args, **kwargs): """Asynchronously run function *func* using the pool. diff --git a/Lib/test/test_asyncio/test_pools.py b/Lib/test/test_asyncio/test_pools.py index 3724d48e58fe39..184b37541b6d91 100644 --- a/Lib/test/test_asyncio/test_pools.py +++ b/Lib/test/test_asyncio/test_pools.py @@ -21,24 +21,15 @@ def tearDown(self): self.loop.close() def test_methods_not_implemented(self): - pool = asyncio.pools.AbstractPool() - with self.assertRaises(NotImplementedError): - self.loop.run_until_complete(pool.astart()) - - with self.assertRaises(NotImplementedError): - self.loop.run_until_complete(pool.aclose()) - - func = mock.Mock() - with self.assertRaises(NotImplementedError): - self.loop.run_until_complete(pool.run(func)) - func.assert_not_called() + with self.assertRaises(TypeError): + asyncio.pools.AbstractPool() def test_context_manager_not_implemented(self): async def pool_cm(): async with asyncio.pools.AbstractPool() as pool: pass - with self.assertRaises(NotImplementedError): + with self.assertRaises(TypeError): self.loop.run_until_complete(pool_cm()) From fb194efe37b2183a12a30c8e9dc751571b85e742 Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Mon, 2 Mar 2020 00:11:30 -0500 Subject: [PATCH 08/18] Fix whitespace --- Lib/asyncio/pools.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/asyncio/pools.py b/Lib/asyncio/pools.py index 0672bc70c51d26..765afad98a5192 100644 --- a/Lib/asyncio/pools.py +++ b/Lib/asyncio/pools.py @@ -17,7 +17,7 @@ class AbstractPool(ABC): """Abstract base class for asynchronous pools.""" - + @abstractmethod async def astart(self): raise NotImplementedError From 1a8a3daecc11cddfb29a94814cb30d415f1019dc Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Wed, 13 May 2020 20:05:33 -0400 Subject: [PATCH 09/18] Replace "threadpool" with "thread pool" Co-authored-by: Tom M --- Doc/library/asyncio-pools.rst | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/Doc/library/asyncio-pools.rst b/Doc/library/asyncio-pools.rst index 173b9b657904eb..93ba729e94a8ae 100644 --- a/Doc/library/asyncio-pools.rst +++ b/Doc/library/asyncio-pools.rst @@ -29,42 +29,42 @@ thread or process) without blocking the event loop. .. class:: ThreadPool(concurrency=None) - An asynchronous threadpool that provides methods to concurrently + An asynchronous thread pool that provides methods to concurrently run IO-bound functions, without blocking the event loop. *concurrency* is an optional argument that limits the number of - threads to utilize in the threadpool. With the default value of + threads to utilize in the thread pool. With the default value of ``None``, the amount of threads used will scale based on the number of processors. .. coroutinemethod:: run(/, func, *args, **kwargs) Asynchronously run *func* with its arguments and keyword-arguments - within the threadpool, and return a :class:`asyncio.Future` object + within the thread pool, and return a :class:`asyncio.Future` object that represents the eventual result of its execution. :: async with asyncio.ThreadPool() as pool: await pool.run(time.sleep, 1) - Raises a :exc:`RuntimeError` if the threadpool is *not* running. + Raises a :exc:`RuntimeError` if the thread pool is *not* running. .. coroutinemethod:: astart() - Schedule the start of the threadpool and spawn its threads. Note that + Schedule the start of the thread pool and spawn its threads. Note that this function is called automatically when using ``asyncio.ThreadPool`` as an asynchronous context manager, and does not need to be called directly. - Raises a :exc:`RuntimeError` if the threadpool is already running or + Raises a :exc:`RuntimeError` if the thread pool is already running or if it's been closed. .. coroutinemethod:: aclose() - Schedule the closing of the threadpool. Note that this function is + Schedule the closing of the thread pool. Note that this function is called automatically when using ``asyncio.ThreadPool`` as an asynchronous context manager, and does not need to be called directly. - Raises a :exc:`RuntimeError` if the threadpool has already been closed. + Raises a :exc:`RuntimeError` if the thread pool has already been closed. Examples ======== From 59eb5ce553e47e5b82e0d679793c95ae43409e2d Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Thu, 14 May 2020 00:53:34 -0400 Subject: [PATCH 10/18] Simplify astart() and aclose() docs Co-authored-by: Yury Selivanov --- Doc/library/asyncio-pools.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Doc/library/asyncio-pools.rst b/Doc/library/asyncio-pools.rst index 93ba729e94a8ae..1879055b7ea846 100644 --- a/Doc/library/asyncio-pools.rst +++ b/Doc/library/asyncio-pools.rst @@ -50,7 +50,7 @@ thread or process) without blocking the event loop. .. coroutinemethod:: astart() - Schedule the start of the thread pool and spawn its threads. Note that + Start the thread pool and spawn its threads. Note that this function is called automatically when using ``asyncio.ThreadPool`` as an asynchronous context manager, and does not need to be called directly. @@ -60,7 +60,7 @@ thread or process) without blocking the event loop. .. coroutinemethod:: aclose() - Schedule the closing of the thread pool. Note that this function is + Close the thread pool. Note that this function is called automatically when using ``asyncio.ThreadPool`` as an asynchronous context manager, and does not need to be called directly. From 00cd86be3c47dabafcfcd65fc90ccaabe98cee9a Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Thu, 14 May 2020 00:56:42 -0400 Subject: [PATCH 11/18] Allow *func* to be a kwarg for functions passed to run() Co-authored-by: Yury Selivanov --- Lib/asyncio/pools.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/asyncio/pools.py b/Lib/asyncio/pools.py index 765afad98a5192..d1eb4c2eca2949 100644 --- a/Lib/asyncio/pools.py +++ b/Lib/asyncio/pools.py @@ -36,7 +36,7 @@ async def __aexit__(self, exc_type, exc_value, exc_traceback): await self.aclose() @abstractmethod - async def run(self, /, func, *args, **kwargs): + async def run(self, func, /, *args, **kwargs): """Asynchronously run function *func* using the pool. Return a future, representing the eventual result of *func*. From e2df3e58aeb0f6a6fda4cc1b02f4d699fee3bb0f Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Thu, 14 May 2020 01:47:10 -0400 Subject: [PATCH 12/18] Apply suggestions from code review Update run() docs function signature and use future.set_exception() instead of future.exception() Co-authored-by: Yury Selivanov --- Doc/library/asyncio-pools.rst | 2 +- Lib/asyncio/pools.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Doc/library/asyncio-pools.rst b/Doc/library/asyncio-pools.rst index 1879055b7ea846..e4f74b3c1682c5 100644 --- a/Doc/library/asyncio-pools.rst +++ b/Doc/library/asyncio-pools.rst @@ -37,7 +37,7 @@ thread or process) without blocking the event loop. ``None``, the amount of threads used will scale based on the number of processors. - .. coroutinemethod:: run(/, func, *args, **kwargs) + .. coroutinemethod:: run(func, /, *args, **kwargs) Asynchronously run *func* with its arguments and keyword-arguments within the thread pool, and return a :class:`asyncio.Future` object diff --git a/Lib/asyncio/pools.py b/Lib/asyncio/pools.py index d1eb4c2eca2949..ec27ad7d1427f3 100644 --- a/Lib/asyncio/pools.py +++ b/Lib/asyncio/pools.py @@ -143,7 +143,7 @@ def _do_spawn(self, future): self._running = True self._loop.call_soon_threadsafe(future.set_result, None) except Exception as ex: - self._loop.call_soon_threadsafe(future.exception, ex) + self._loop.call_soon_threadsafe(future.set_exception, ex) async def _shutdown_threadpool(self): """Schedule the shutdown of the threadpool. @@ -170,4 +170,4 @@ def _do_shutdown(self, future): self._closed = True self._loop.call_soon_threadsafe(future.set_result, None) except Exception as ex: - self._loop.call_soon_threadsafe(future.exception, ex) + self._loop.call_soon_threadsafe(future.set_exception, ex) From 70895b93e51f39c2986af0c1a9d681cc26ab3b2f Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Thu, 14 May 2020 02:32:53 -0400 Subject: [PATCH 13/18] Add test_run_exception() --- Lib/test/test_asyncio/test_pools.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/Lib/test/test_asyncio/test_pools.py b/Lib/test/test_asyncio/test_pools.py index 184b37541b6d91..57adab1701ceb6 100644 --- a/Lib/test/test_asyncio/test_pools.py +++ b/Lib/test/test_asyncio/test_pools.py @@ -87,6 +87,17 @@ async def main(): result = self.loop.run_until_complete(main()) self.assertEqual(result, 42) + def test_run_exception(self): + def raise_runtime(): + raise RuntimeError("test") + + async def main(): + async with asyncio.ThreadPool(concurrency=5) as pool: + await pool.run(raise_runtime) + + with self.assertRaisesRegex(RuntimeError, "test"): + self.loop.run_until_complete(main()) + def test_run_once(self): func = mock.Mock() From 245d93ef8fce0bb92ef54824b0f4a20dcfdcd859 Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Thu, 14 May 2020 04:02:34 -0400 Subject: [PATCH 14/18] Use run_in_executor() for startup/shutdown instead of seperate thread Co-authored-by: Inada Naoki --- Lib/asyncio/pools.py | 41 ++++++++++------------------------------- 1 file changed, 10 insertions(+), 31 deletions(-) diff --git a/Lib/asyncio/pools.py b/Lib/asyncio/pools.py index ec27ad7d1427f3..7780ef292e60cd 100644 --- a/Lib/asyncio/pools.py +++ b/Lib/asyncio/pools.py @@ -128,22 +128,12 @@ async def _spawn_threadpool(self): if self._closed: raise RuntimeError("threadpool is closed") - future = self._loop.create_future() - thread = threading.Thread(target=self._do_spawn, args=(future,)) - thread.start() - try: - await future - finally: - thread.join() - - def _do_spawn(self, future): - try: - self._pool = concurrent.futures.ThreadPoolExecutor( - max_workers=self._concurrency) - self._running = True - self._loop.call_soon_threadsafe(future.set_result, None) - except Exception as ex: - self._loop.call_soon_threadsafe(future.set_exception, ex) + await self._loop.run_in_executor(None, self._do_spawn) + + def _do_spawn(self): + self._pool = concurrent.futures.ThreadPoolExecutor( + max_workers=self._concurrency) + self._running = True async def _shutdown_threadpool(self): """Schedule the shutdown of the threadpool. @@ -155,19 +145,8 @@ async def _shutdown_threadpool(self): # Set _running to False as early as possible self._running = False + await self._loop.run_in_executor(None, self._do_shutdown) - future = self._loop.create_future() - thread = threading.Thread(target=self._do_shutdown, args=(future,)) - thread.start() - try: - await future - finally: - thread.join() - - def _do_shutdown(self, future): - try: - self._pool.shutdown() - self._closed = True - self._loop.call_soon_threadsafe(future.set_result, None) - except Exception as ex: - self._loop.call_soon_threadsafe(future.set_exception, ex) + def _do_shutdown(self): + self._pool.shutdown() + self._closed = True From 79c6b5b54ae3a4eec9fcd77147c729c22910dda5 Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Thu, 14 May 2020 04:20:51 -0400 Subject: [PATCH 15/18] Add missing conversions from 'threadpool' to 'thread pool' --- Lib/asyncio/pools.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/Lib/asyncio/pools.py b/Lib/asyncio/pools.py index 7780ef292e60cd..26ff5cd6ffd240 100644 --- a/Lib/asyncio/pools.py +++ b/Lib/asyncio/pools.py @@ -45,18 +45,18 @@ async def run(self, func, /, *args, **kwargs): class ThreadPool(AbstractPool): - """Asynchronous threadpool for running IO-bound functions. + """Asynchronous thread pool for running IO-bound functions. Directly calling an IO-bound function within the main thread will block other operations from occurring until it is completed. By using a - threadpool, several IO-bound functions can be ran concurrently within + thread pool, several IO-bound functions can be ran concurrently within their own threads, without blocking other operations. The optional argument *concurrency* sets the number of threads within the - threadpool. If *concurrency* is `None`, the maximum number of threads will + thread pool. If *concurrency* is `None`, the maximum number of threads will be used; based on the number of CPU cores. - This threadpool is intended to be used as an asynchronous context manager, + This thread pool is intended to be used as an asynchronous context manager, using the `async with` syntax, which provides automatic initialization and finalization of resources. For example: @@ -110,7 +110,7 @@ async def __aexit__(self, exc_type, exc_value, exc_traceback): async def run(self, /, func, *args, **kwargs): if not self._running: raise RuntimeError(f"unable to run {func!r}, " - "threadpool is not running") + "thread pool is not running") func_call = functools.partial(func, *args, **kwargs) executor = self._pool @@ -118,15 +118,15 @@ async def run(self, /, func, *args, **kwargs): executor.submit(func_call), loop=self._loop) async def _spawn_threadpool(self): - """Schedule the spawning of the threadpool. + """Spawn the thread pool. - Asynchronously spawns a threadpool with *concurrency* threads. + Asynchronously spawns a thread pool with *concurrency* threads. """ if self._running: - raise RuntimeError("threadpool is already running") + raise RuntimeError("thread pool is already running") if self._closed: - raise RuntimeError("threadpool is closed") + raise RuntimeError("thread pool is closed") await self._loop.run_in_executor(None, self._do_spawn) @@ -136,12 +136,12 @@ def _do_spawn(self): self._running = True async def _shutdown_threadpool(self): - """Schedule the shutdown of the threadpool. + """Shutdown the thread pool. - Asynchronously joins all of the threads in the threadpool. + Asynchronously joins all of the threads in the thread pool. """ if self._closed: - raise RuntimeError("threadpool is already closed") + raise RuntimeError("thread pool is already closed") # Set _running to False as early as possible self._running = False From baec4280d935e1fb238e247523f69370ad34b114 Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Thu, 14 May 2020 04:44:55 -0400 Subject: [PATCH 16/18] Add shutdown_default_executor() to tearDown() --- Lib/test/test_asyncio/test_pools.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Lib/test/test_asyncio/test_pools.py b/Lib/test/test_asyncio/test_pools.py index 57adab1701ceb6..955bc216f3c74e 100644 --- a/Lib/test/test_asyncio/test_pools.py +++ b/Lib/test/test_asyncio/test_pools.py @@ -17,6 +17,8 @@ def setUp(self): asyncio.set_event_loop(self.loop) def tearDown(self): + self.loop.run_until_complete( + self.loop.shutdown_default_executor()) asyncio.set_event_loop(None) self.loop.close() From 25963afdadb118b0722b84df635899044b321222 Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Thu, 14 May 2020 21:11:05 -0400 Subject: [PATCH 17/18] Fix ThreadPool.run() function signature Co-authored-by: Yury Selivanov --- Lib/asyncio/pools.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/asyncio/pools.py b/Lib/asyncio/pools.py index 26ff5cd6ffd240..c69ba2cb6368b8 100644 --- a/Lib/asyncio/pools.py +++ b/Lib/asyncio/pools.py @@ -107,7 +107,7 @@ async def aclose(self): async def __aexit__(self, exc_type, exc_value, exc_traceback): await self.aclose() - async def run(self, /, func, *args, **kwargs): + async def run(self, func, /, *args, **kwargs): if not self._running: raise RuntimeError(f"unable to run {func!r}, " "thread pool is not running") From bd29b9427aab637fc167e735f127f74341f585d5 Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Thu, 14 May 2020 22:22:51 -0400 Subject: [PATCH 18/18] Add shutdown_default_executor to ThreadPoolTests.tearDown() --- Lib/test/test_asyncio/test_pools.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Lib/test/test_asyncio/test_pools.py b/Lib/test/test_asyncio/test_pools.py index 955bc216f3c74e..adb251d192f3bf 100644 --- a/Lib/test/test_asyncio/test_pools.py +++ b/Lib/test/test_asyncio/test_pools.py @@ -42,6 +42,8 @@ def setUp(self): asyncio.set_event_loop(self.loop) def tearDown(self): + self.loop.run_until_complete( + self.loop.shutdown_default_executor()) self.loop.close() asyncio.set_event_loop(None) self.loop = None