diff --git a/Doc/library/asyncio-pools.rst b/Doc/library/asyncio-pools.rst new file mode 100644 index 00000000000000..e4f74b3c1682c5 --- /dev/null +++ b/Doc/library/asyncio-pools.rst @@ -0,0 +1,95 @@ +.. currentmodule:: asyncio + +.. versionadded:: 3.9 + +.. _asyncio-pools: + +===== +Pools +===== + +**Source code:** :source:`Lib/asyncio/pools.py` + +----------------------------------------------- + +.. note:: + This section of the documentation and all of its members have been + added *provisionally* to asyncio's API. For more details, see + :term:`provisional api`. + +Asyncio pools are high-level, asynchronous context managers that can be used +to concurrently run blocking functions and methods. + +There are many potential use cases, but a particularly useful one is for +combining libraries without asyncio support with existing asyncio programs. +Normally, calling a non-async function within the event loop would will result +in blocking the event loop until the function returns. However, by using a +pool to run the function, it can be executed in a separate worker (such as a +thread or process) without blocking the event loop. + +.. class:: ThreadPool(concurrency=None) + + An asynchronous thread pool that provides methods to concurrently + run IO-bound functions, without blocking the event loop. + + *concurrency* is an optional argument that limits the number of + threads to utilize in the thread pool. With the default value of + ``None``, the amount of threads used will scale based on the + number of processors. + + .. coroutinemethod:: run(func, /, *args, **kwargs) + + Asynchronously run *func* with its arguments and keyword-arguments + within the thread pool, and return a :class:`asyncio.Future` object + that represents the eventual result of its execution. :: + + async with asyncio.ThreadPool() as pool: + await pool.run(time.sleep, 1) + + Raises a :exc:`RuntimeError` if the thread pool is *not* running. + + .. coroutinemethod:: astart() + + Start the thread pool and spawn its threads. Note that + this function is called automatically when using ``asyncio.ThreadPool`` + as an asynchronous context manager, and does not need to be called + directly. + + Raises a :exc:`RuntimeError` if the thread pool is already running or + if it's been closed. + + .. coroutinemethod:: aclose() + + Close the thread pool. Note that this function is + called automatically when using ``asyncio.ThreadPool`` as an + asynchronous context manager, and does not need to be called directly. + + Raises a :exc:`RuntimeError` if the thread pool has already been closed. + +Examples +======== + +Here's an example of concurrently running two IO-bound functions using +:class:`asyncio.ThreadPool`:: + + import asyncio + + def blocking_io(): + print("start blocking_io") + with open('/dev/urandom', 'rb') as f: + f.read(100_000) + print("blocking_io complete") + + def other_blocking_io(): + print("start other_blocking_io") + with open('/dev/zero', 'rb') as f: + f.read(10) + print("other_blocking_io complete") + + async def main(): + async with asyncio.ThreadPool() as pool: + await asyncio.gather( + pool.run(blocking_io), + pool.run(other_blocking_io)) + + asyncio.run(main()) diff --git a/Doc/library/asyncio.rst b/Doc/library/asyncio.rst index 94a853259d3483..aad339a31defc4 100644 --- a/Doc/library/asyncio.rst +++ b/Doc/library/asyncio.rst @@ -43,6 +43,8 @@ asyncio provides a set of **high-level** APIs to: * :ref:`synchronize ` concurrent code; +* concurrently run blocking functions in a :ref:`pool `; + Additionally, there are **low-level** APIs for *library and framework developers* to: @@ -73,6 +75,7 @@ Additionally, there are **low-level** APIs for asyncio-subprocess.rst asyncio-queue.rst asyncio-exceptions.rst + asyncio-pools.rst .. toctree:: :caption: Low-level APIs diff --git a/Doc/whatsnew/3.9.rst b/Doc/whatsnew/3.9.rst index 4991e56759b1c2..c58e6fcb5b9d78 100644 --- a/Doc/whatsnew/3.9.rst +++ b/Doc/whatsnew/3.9.rst @@ -137,6 +137,13 @@ details, see the documentation for ``loop.create_datagram_endpoint()``. (Contributed by Kyle Stanley, Antoine Pitrou, and Yury Selivanov in :issue:`37228`.) +Added :class:`asyncio.ThreadPool`, an asynchronous context manager for +concurrently running IO-bound functions without blocking the event loop. +It essentially works as a higher-level version of +:meth:`asyncio.loop.run_in_executor` that can take keyword arguments and +be used as a context manager using ``async with``. +(Contributed by Kyle Stanley in :issue:`32309`.) + Added a new :term:`coroutine` :meth:`~asyncio.loop.shutdown_default_executor` that schedules a shutdown for the default executor that waits on the :class:`~concurrent.futures.ThreadPoolExecutor` to finish closing. Also, diff --git a/Lib/asyncio/__init__.py b/Lib/asyncio/__init__.py index 28c2e2c429f34a..b1930ffad1c649 100644 --- a/Lib/asyncio/__init__.py +++ b/Lib/asyncio/__init__.py @@ -11,6 +11,7 @@ from .exceptions import * from .futures import * from .locks import * +from .pools import * from .protocols import * from .runners import * from .queues import * @@ -29,6 +30,7 @@ exceptions.__all__ + futures.__all__ + locks.__all__ + + pools.__all__ + protocols.__all__ + runners.__all__ + queues.__all__ + diff --git a/Lib/asyncio/pools.py b/Lib/asyncio/pools.py new file mode 100644 index 00000000000000..c69ba2cb6368b8 --- /dev/null +++ b/Lib/asyncio/pools.py @@ -0,0 +1,152 @@ +"""Support for high-level asynchronous pools in asyncio.""" + +__all__ = 'ThreadPool', + + +import concurrent.futures +import functools +import threading +import os + +from abc import ABC, abstractmethod + +from . import events +from . import exceptions +from . import futures + + +class AbstractPool(ABC): + """Abstract base class for asynchronous pools.""" + + @abstractmethod + async def astart(self): + raise NotImplementedError + + @abstractmethod + async def __aenter__(self): + await self.astart() + return self + + @abstractmethod + async def aclose(self): + raise NotImplementedError + + @abstractmethod + async def __aexit__(self, exc_type, exc_value, exc_traceback): + await self.aclose() + + @abstractmethod + async def run(self, func, /, *args, **kwargs): + """Asynchronously run function *func* using the pool. + + Return a future, representing the eventual result of *func*. + """ + raise NotImplementedError + + +class ThreadPool(AbstractPool): + """Asynchronous thread pool for running IO-bound functions. + + Directly calling an IO-bound function within the main thread will block + other operations from occurring until it is completed. By using a + thread pool, several IO-bound functions can be ran concurrently within + their own threads, without blocking other operations. + + The optional argument *concurrency* sets the number of threads within the + thread pool. If *concurrency* is `None`, the maximum number of threads will + be used; based on the number of CPU cores. + + This thread pool is intended to be used as an asynchronous context manager, + using the `async with` syntax, which provides automatic initialization and + finalization of resources. For example: + + import asyncio + + def blocking_io(): + print("start blocking_io") + with open('/dev/urandom', 'rb') as f: + f.read(100_000) + print("blocking_io complete") + + def other_blocking_io(): + print("start other_blocking_io") + with open('/dev/zero', 'rb') as f: + f.read(10) + print("other_blocking_io complete") + + async def main(): + async with asyncio.ThreadPool() as pool: + await asyncio.gather( + pool.run(blocking_io), + pool.run(other_blocking_io)) + + asyncio.run(main()) + """ + + def __init__(self, concurrency=None): + if concurrency is None: + concurrency = min(32, (os.cpu_count() or 1) + 4) + + self._concurrency = concurrency + self._running = False + self._closed = False + self._loop = None + self._pool = None + + async def astart(self): + self._loop = events.get_running_loop() + await self._spawn_threadpool() + + async def __aenter__(self): + await self.astart() + return self + + async def aclose(self): + await self._shutdown_threadpool() + + async def __aexit__(self, exc_type, exc_value, exc_traceback): + await self.aclose() + + async def run(self, func, /, *args, **kwargs): + if not self._running: + raise RuntimeError(f"unable to run {func!r}, " + "thread pool is not running") + + func_call = functools.partial(func, *args, **kwargs) + executor = self._pool + return await futures.wrap_future( + executor.submit(func_call), loop=self._loop) + + async def _spawn_threadpool(self): + """Spawn the thread pool. + + Asynchronously spawns a thread pool with *concurrency* threads. + """ + if self._running: + raise RuntimeError("thread pool is already running") + + if self._closed: + raise RuntimeError("thread pool is closed") + + await self._loop.run_in_executor(None, self._do_spawn) + + def _do_spawn(self): + self._pool = concurrent.futures.ThreadPoolExecutor( + max_workers=self._concurrency) + self._running = True + + async def _shutdown_threadpool(self): + """Shutdown the thread pool. + + Asynchronously joins all of the threads in the thread pool. + """ + if self._closed: + raise RuntimeError("thread pool is already closed") + + # Set _running to False as early as possible + self._running = False + await self._loop.run_in_executor(None, self._do_shutdown) + + def _do_shutdown(self): + self._pool.shutdown() + self._closed = True diff --git a/Lib/test/test_asyncio/test_pools.py b/Lib/test/test_asyncio/test_pools.py new file mode 100644 index 00000000000000..adb251d192f3bf --- /dev/null +++ b/Lib/test/test_asyncio/test_pools.py @@ -0,0 +1,188 @@ +"""Tests for pools.py""" + +import asyncio +import unittest + +from unittest import mock + + +def tearDownModule(): + asyncio.set_event_loop_policy(None) + + +class AbstractPoolTests(unittest.TestCase): + + def setUp(self): + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + + def tearDown(self): + self.loop.run_until_complete( + self.loop.shutdown_default_executor()) + asyncio.set_event_loop(None) + self.loop.close() + + def test_methods_not_implemented(self): + with self.assertRaises(TypeError): + asyncio.pools.AbstractPool() + + def test_context_manager_not_implemented(self): + async def pool_cm(): + async with asyncio.pools.AbstractPool() as pool: + pass + + with self.assertRaises(TypeError): + self.loop.run_until_complete(pool_cm()) + + +class ThreadPoolTests(unittest.TestCase): + + def setUp(self): + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + + def tearDown(self): + self.loop.run_until_complete( + self.loop.shutdown_default_executor()) + self.loop.close() + asyncio.set_event_loop(None) + self.loop = None + + def test_running(self): + async def main(): + async with asyncio.ThreadPool() as pool: + self.assertTrue(pool._running) + self.assertFalse(pool._closed) + + self.loop.run_until_complete(main()) + + def test_closed(self): + async def main(): + async with asyncio.ThreadPool() as pool: + pass + + self.assertTrue(pool._closed) + self.assertFalse(pool._running) + + self.loop.run_until_complete(main()) + + def test_concurrency(self): + async def main(): + async with asyncio.ThreadPool() as pool: + # We can't guarantee the number of threads since it's system + # specific, but it should never be None after the thread pool + # is started. + self.assertIsNotNone(pool._concurrency) + + self.loop.run_until_complete(main()) + + def test_concurrency_explicit(self): + async def main(): + async with asyncio.ThreadPool(concurrency=5) as pool: + self.assertEqual(pool._concurrency, 5) + + self.loop.run_until_complete(main()) + + def test_run(self): + async def main(): + async with asyncio.ThreadPool(concurrency=5) as pool: + return await pool.run(sum, [40, 2]) + + result = self.loop.run_until_complete(main()) + self.assertEqual(result, 42) + + def test_run_exception(self): + def raise_runtime(): + raise RuntimeError("test") + + async def main(): + async with asyncio.ThreadPool(concurrency=5) as pool: + await pool.run(raise_runtime) + + with self.assertRaisesRegex(RuntimeError, "test"): + self.loop.run_until_complete(main()) + + def test_run_once(self): + func = mock.Mock() + + async def main(): + async with asyncio.ThreadPool(concurrency=5) as pool: + await pool.run(func) + + self.loop.run_until_complete(main()) + func.assert_called_once() + + def test_run_concurrent(self): + func = mock.Mock() + + async def main(): + futs = [] + async with asyncio.ThreadPool(concurrency=5) as pool: + for _ in range(10): + fut = pool.run(func) + futs.append(fut) + await asyncio.gather(*futs) + + self.loop.run_until_complete(main()) + self.assertEqual(func.call_count, 10) + + def test_run_after_exit(self): + func = mock.Mock() + + async def main(): + async with asyncio.ThreadPool() as pool: + pass + + with self.assertRaises(RuntimeError): + await pool.run(func) + + self.loop.run_until_complete(main()) + func.assert_not_called() + + def test_run_before_start(self): + func = mock.Mock() + + async def main(): + pool = asyncio.ThreadPool() + with self.assertRaises(RuntimeError): + await pool.run(func) + + self.loop.run_until_complete(main()) + func.assert_not_called() + + def test_start_twice(self): + async def main(): + pool = asyncio.ThreadPool() + async with pool: + with self.assertRaises(RuntimeError): + async with pool: + pass + + self.loop.run_until_complete(main()) + + def test_start_after_close(self): + async def main(): + pool = asyncio.ThreadPool() + async with pool: + pass + + with self.assertRaises(RuntimeError): + async with pool: + pass + + self.loop.run_until_complete(main()) + + def test_close_twice(self): + async def main(): + pool = asyncio.ThreadPool() + async with pool: + pass + + with self.assertRaises(RuntimeError): + await pool.aclose() + + self.loop.run_until_complete(main()) + + +if __name__ == '__main__': + unittest.main() diff --git a/Misc/NEWS.d/next/Library/2020-02-08-03-49-55.bpo-32309.B88xPb.rst b/Misc/NEWS.d/next/Library/2020-02-08-03-49-55.bpo-32309.B88xPb.rst new file mode 100644 index 00000000000000..c30546a820c3b8 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2020-02-08-03-49-55.bpo-32309.B88xPb.rst @@ -0,0 +1,5 @@ +Added :class:`asyncio.ThreadPool`, an asynchronous context manager for +concurrently running IO-bound functions without blocking the event loop. +It essentially works as a higher-level version of +:meth:`asyncio.loop.run_in_executor` that can take keyword arguments and +be used as a context manager using ``async with``. \ No newline at end of file