-
-
Notifications
You must be signed in to change notification settings - Fork 32k
bpo-32309: Implement asyncio.ThreadPool #18410
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
aeros
wants to merge
18
commits into
python:master
from
aeros:bpo39349-add-cancel_futures-to-Executor.shutdown
Closed
Changes from 13 commits
Commits
Show all changes
18 commits
Select commit
Hold shift + click to select a range
8cc155e
Implement asyncio.ThreadPool
aeros 323ef09
📜🤖 Added by blurb_it.
blurb-it[bot] 2dc325a
Add whatsnew for 3.9
aeros bf3e022
Fix asyncio.ThreadPool examples
aeros 8b715fe
Fix asyncio.ThreadPool example
aeros ccd425f
Improve tearDown() in test_pools
aeros 2dba11d
Co-authored-by: Antoine Pitrou <[email protected]>
aeros fb194ef
Fix whitespace
aeros 1a8a3da
Replace "threadpool" with "thread pool"
aeros 59eb5ce
Simplify astart() and aclose() docs
aeros 00cd86b
Allow *func* to be a kwarg for functions passed to run()
aeros e2df3e5
Apply suggestions from code review
aeros 70895b9
Add test_run_exception()
aeros 245d93e
Use run_in_executor() for startup/shutdown instead of seperate thread
aeros 79c6b5b
Add missing conversions from 'threadpool' to 'thread pool'
aeros baec428
Add shutdown_default_executor() to tearDown()
aeros 25963af
Fix ThreadPool.run() function signature
aeros bd29b94
Add shutdown_default_executor to ThreadPoolTests.tearDown()
aeros File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
.. currentmodule:: asyncio | ||
|
||
.. versionadded:: 3.9 | ||
|
||
.. _asyncio-pools: | ||
|
||
===== | ||
Pools | ||
===== | ||
|
||
**Source code:** :source:`Lib/asyncio/pools.py` | ||
|
||
----------------------------------------------- | ||
|
||
.. note:: | ||
This section of the documentation and all of its members have been | ||
added *provisionally* to asyncio's API. For more details, see | ||
:term:`provisional api`. | ||
|
||
Asyncio pools are high-level, asynchronous context managers that can be used | ||
to concurrently run blocking functions and methods. | ||
|
||
There are many potential use cases, but a particularly useful one is for | ||
combining libraries without asyncio support with existing asyncio programs. | ||
Normally, calling a non-async function within the event loop would will result | ||
in blocking the event loop until the function returns. However, by using a | ||
pool to run the function, it can be executed in a separate worker (such as a | ||
thread or process) without blocking the event loop. | ||
|
||
.. class:: ThreadPool(concurrency=None) | ||
|
||
An asynchronous thread pool that provides methods to concurrently | ||
run IO-bound functions, without blocking the event loop. | ||
|
||
*concurrency* is an optional argument that limits the number of | ||
threads to utilize in the thread pool. With the default value of | ||
``None``, the amount of threads used will scale based on the | ||
number of processors. | ||
|
||
.. coroutinemethod:: run(func, /, *args, **kwargs) | ||
|
||
Asynchronously run *func* with its arguments and keyword-arguments | ||
within the thread pool, and return a :class:`asyncio.Future` object | ||
that represents the eventual result of its execution. :: | ||
|
||
async with asyncio.ThreadPool() as pool: | ||
await pool.run(time.sleep, 1) | ||
|
||
Raises a :exc:`RuntimeError` if the thread pool is *not* running. | ||
|
||
.. coroutinemethod:: astart() | ||
|
||
Start the thread pool and spawn its threads. Note that | ||
this function is called automatically when using ``asyncio.ThreadPool`` | ||
as an asynchronous context manager, and does not need to be called | ||
directly. | ||
|
||
Raises a :exc:`RuntimeError` if the thread pool is already running or | ||
if it's been closed. | ||
|
||
.. coroutinemethod:: aclose() | ||
|
||
Close the thread pool. Note that this function is | ||
called automatically when using ``asyncio.ThreadPool`` as an | ||
asynchronous context manager, and does not need to be called directly. | ||
|
||
Raises a :exc:`RuntimeError` if the thread pool has already been closed. | ||
|
||
Examples | ||
======== | ||
|
||
Here's an example of concurrently running two IO-bound functions using | ||
:class:`asyncio.ThreadPool`:: | ||
|
||
import asyncio | ||
|
||
def blocking_io(): | ||
print("start blocking_io") | ||
with open('/dev/urandom', 'rb') as f: | ||
f.read(100_000) | ||
print("blocking_io complete") | ||
|
||
def other_blocking_io(): | ||
print("start other_blocking_io") | ||
with open('/dev/zero', 'rb') as f: | ||
f.read(10) | ||
print("other_blocking_io complete") | ||
|
||
async def main(): | ||
async with asyncio.ThreadPool() as pool: | ||
await asyncio.gather( | ||
pool.run(blocking_io), | ||
pool.run(other_blocking_io)) | ||
|
||
asyncio.run(main()) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,173 @@ | ||
"""Support for high-level asynchronous pools in asyncio.""" | ||
|
||
__all__ = 'ThreadPool', | ||
|
||
|
||
import concurrent.futures | ||
import functools | ||
import threading | ||
import os | ||
|
||
from abc import ABC, abstractmethod | ||
|
||
from . import events | ||
from . import exceptions | ||
from . import futures | ||
|
||
|
||
class AbstractPool(ABC): | ||
"""Abstract base class for asynchronous pools.""" | ||
|
||
@abstractmethod | ||
async def astart(self): | ||
raise NotImplementedError | ||
aeros marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
@abstractmethod | ||
async def __aenter__(self): | ||
await self.astart() | ||
return self | ||
|
||
@abstractmethod | ||
async def aclose(self): | ||
raise NotImplementedError | ||
|
||
@abstractmethod | ||
async def __aexit__(self, exc_type, exc_value, exc_traceback): | ||
await self.aclose() | ||
|
||
@abstractmethod | ||
async def run(self, func, /, *args, **kwargs): | ||
"""Asynchronously run function *func* using the pool. | ||
|
||
Return a future, representing the eventual result of *func*. | ||
""" | ||
raise NotImplementedError | ||
|
||
|
||
class ThreadPool(AbstractPool): | ||
"""Asynchronous threadpool for running IO-bound functions. | ||
|
||
Directly calling an IO-bound function within the main thread will block | ||
other operations from occurring until it is completed. By using a | ||
threadpool, several IO-bound functions can be ran concurrently within | ||
their own threads, without blocking other operations. | ||
|
||
The optional argument *concurrency* sets the number of threads within the | ||
threadpool. If *concurrency* is `None`, the maximum number of threads will | ||
be used; based on the number of CPU cores. | ||
|
||
This threadpool is intended to be used as an asynchronous context manager, | ||
using the `async with` syntax, which provides automatic initialization and | ||
finalization of resources. For example: | ||
|
||
import asyncio | ||
|
||
def blocking_io(): | ||
print("start blocking_io") | ||
with open('/dev/urandom', 'rb') as f: | ||
f.read(100_000) | ||
print("blocking_io complete") | ||
|
||
def other_blocking_io(): | ||
print("start other_blocking_io") | ||
with open('/dev/zero', 'rb') as f: | ||
f.read(10) | ||
print("other_blocking_io complete") | ||
|
||
async def main(): | ||
async with asyncio.ThreadPool() as pool: | ||
await asyncio.gather( | ||
pool.run(blocking_io), | ||
pool.run(other_blocking_io)) | ||
|
||
asyncio.run(main()) | ||
""" | ||
|
||
def __init__(self, concurrency=None): | ||
if concurrency is None: | ||
concurrency = min(32, (os.cpu_count() or 1) + 4) | ||
aeros marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
self._concurrency = concurrency | ||
self._running = False | ||
self._closed = False | ||
self._loop = None | ||
self._pool = None | ||
|
||
async def astart(self): | ||
self._loop = events.get_running_loop() | ||
await self._spawn_threadpool() | ||
|
||
async def __aenter__(self): | ||
await self.astart() | ||
return self | ||
|
||
async def aclose(self): | ||
await self._shutdown_threadpool() | ||
|
||
async def __aexit__(self, exc_type, exc_value, exc_traceback): | ||
await self.aclose() | ||
|
||
async def run(self, /, func, *args, **kwargs): | ||
aeros marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if not self._running: | ||
aeros marked this conversation as resolved.
Show resolved
Hide resolved
|
||
raise RuntimeError(f"unable to run {func!r}, " | ||
"threadpool is not running") | ||
|
||
func_call = functools.partial(func, *args, **kwargs) | ||
executor = self._pool | ||
return await futures.wrap_future( | ||
executor.submit(func_call), loop=self._loop) | ||
|
||
async def _spawn_threadpool(self): | ||
"""Schedule the spawning of the threadpool. | ||
|
||
Asynchronously spawns a threadpool with *concurrency* threads. | ||
""" | ||
if self._running: | ||
raise RuntimeError("threadpool is already running") | ||
|
||
if self._closed: | ||
raise RuntimeError("threadpool is closed") | ||
|
||
future = self._loop.create_future() | ||
thread = threading.Thread(target=self._do_spawn, args=(future,)) | ||
aeros marked this conversation as resolved.
Show resolved
Hide resolved
|
||
thread.start() | ||
try: | ||
await future | ||
finally: | ||
thread.join() | ||
|
||
def _do_spawn(self, future): | ||
try: | ||
self._pool = concurrent.futures.ThreadPoolExecutor( | ||
max_workers=self._concurrency) | ||
self._running = True | ||
self._loop.call_soon_threadsafe(future.set_result, None) | ||
except Exception as ex: | ||
self._loop.call_soon_threadsafe(future.set_exception, ex) | ||
|
||
async def _shutdown_threadpool(self): | ||
"""Schedule the shutdown of the threadpool. | ||
|
||
Asynchronously joins all of the threads in the threadpool. | ||
""" | ||
if self._closed: | ||
raise RuntimeError("threadpool is already closed") | ||
|
||
# Set _running to False as early as possible | ||
self._running = False | ||
|
||
future = self._loop.create_future() | ||
thread = threading.Thread(target=self._do_shutdown, args=(future,)) | ||
aeros marked this conversation as resolved.
Show resolved
Hide resolved
|
||
thread.start() | ||
try: | ||
await future | ||
finally: | ||
thread.join() | ||
|
||
def _do_shutdown(self, future): | ||
try: | ||
self._pool.shutdown() | ||
self._closed = True | ||
self._loop.call_soon_threadsafe(future.set_result, None) | ||
except Exception as ex: | ||
self._loop.call_soon_threadsafe(future.set_exception, ex) |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.