diff --git a/docs/source/history.rst b/docs/source/history.rst
index 1bcef84365..83292dd2c1 100644
--- a/docs/source/history.rst
+++ b/docs/source/history.rst
@@ -655,7 +655,7 @@ CPython, or PyPy3 5.9+.
 Other changes
 ~~~~~~~~~~~~~
 
-* :func:`run_sync_in_thread` now has a :ref:`robust mechanism
+* ``run_sync_in_worker_thread`` now has a :ref:`robust mechanism
   for applying capacity limits to the number of concurrent threads
   ` (`#10
   `__, `#57
diff --git a/docs/source/reference-core.rst b/docs/source/reference-core.rst
index e6729a1a4c..f29f5ac8b5 100644
--- a/docs/source/reference-core.rst
+++ b/docs/source/reference-core.rst
@@ -1471,7 +1471,7 @@ In acknowledgment of this reality, Trio provides two useful utilities for
 working with real, operating-system level,
 :mod:`threading`\-module-style threads. First, if you're in Trio but
 need to push some blocking I/O into a thread, there's
-:func:`run_sync_in_thread`. And if you're in a thread and need
+`trio.to_thread.run_sync`. And if you're in a thread and need
 to communicate back with Trio, you can use
 :func:`trio.from_thread.run` and :func:`trio.from_thread.run_sync`.
@@ -1494,7 +1494,7 @@ are spawned and the system gets overloaded and crashes. Instead, the N
 threads start executing the first N jobs, while the other
 (100,000 - N) jobs sit in a queue and wait their turn. Which is
 generally what you want, and this is how
-:func:`trio.run_sync_in_thread` works by default.
+:func:`trio.to_thread.run_sync` works by default.
 
 The downside of this kind of thread pool is that sometimes, you need
 more sophisticated logic for controlling how many threads are run at
@@ -1541,16 +1541,16 @@ re-using threads, but has no admission control policy: if you give it
 responsible for providing the policy to make sure that this doesn't
 happen – but since it *only* has to worry about policy, it can be
 much simpler. In fact, all there is to it is the ``limiter=`` argument
-passed to :func:`run_sync_in_thread`. This defaults to a global
+passed to :func:`trio.to_thread.run_sync`. This defaults to a global
 :class:`CapacityLimiter` object, which gives us the classic fixed-size
 thread pool behavior. (See
-:func:`current_default_thread_limiter`.) But if you want to use
-"separate pools" for type A jobs and type B jobs, then it's just a
-matter of creating two separate :class:`CapacityLimiter` objects and
-passing them in when running these jobs. Or here's an example of
-defining a custom policy that respects the global thread limit, while
-making sure that no individual user can use more than 3 threads at a
-time::
+:func:`trio.to_thread.current_default_thread_limiter`.) But if you
+want to use "separate pools" for type A jobs and type B jobs, then
+it's just a matter of creating two separate :class:`CapacityLimiter`
+objects and passing them in when running these jobs. Or here's an
+example of defining a custom policy that respects the global thread
+limit, while making sure that no individual user can use more than 3
+threads at a time::
 
     class CombinedLimiter:
         def __init__(self, first, second):
@@ -1594,19 +1594,24 @@ time::
 
         return combined_limiter
 
-    async def run_in_worker_thread_for_user(user_id, async_fn, *args, **kwargs):
-        # *args belong to async_fn; **kwargs belong to run_sync_in_thread
-        kwargs["limiter"] = get_user_limiter(user_id)
-        return await trio.run_sync_in_thread(asycn_fn, *args, **kwargs)
+    async def run_sync_in_thread_for_user(user_id, sync_fn, *args):
+        kwargs = {"limiter": get_user_limiter(user_id)}
+        return await trio.to_thread.run_sync(sync_fn, *args, **kwargs)
 
+.. module:: trio.to_thread
+.. 
currentmodule:: trio + Putting blocking I/O into worker threads ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -.. autofunction:: run_sync_in_thread +.. autofunction:: trio.to_thread.run_sync + +.. autofunction:: trio.to_thread.current_default_thread_limiter -.. autofunction:: current_default_thread_limiter +.. module:: trio.from_thread +.. currentmodule:: trio Getting back into the Trio thread from another thread ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -1615,6 +1620,7 @@ Getting back into the Trio thread from another thread .. autofunction:: trio.from_thread.run_sync + This will probably be clearer with an example. Here we demonstrate how to spawn a child thread, and then use a :ref:`memory channel ` to send messages between the thread and a Trio task: diff --git a/docs/source/reference-core/from-thread-example.py b/docs/source/reference-core/from-thread-example.py index b8ea8f574a..71a75d67bf 100644 --- a/docs/source/reference-core/from-thread-example.py +++ b/docs/source/reference-core/from-thread-example.py @@ -23,7 +23,7 @@ async def main(): # In a background thread, run: # thread_fn(portal, receive_from_trio, send_to_trio) nursery.start_soon( - trio.run_sync_in_thread, thread_fn, receive_from_trio, send_to_trio + trio.to_thread.run_sync, thread_fn, receive_from_trio, send_to_trio ) # prints "1" diff --git a/docs/source/reference-hazmat.rst b/docs/source/reference-hazmat.rst index e12d360121..75e7d88d37 100644 --- a/docs/source/reference-hazmat.rst +++ b/docs/source/reference-hazmat.rst @@ -403,8 +403,8 @@ This logic is a bit convoluted, but accomplishes all of the following: loop outside of the ``except BlockingIOError:`` block. These functions can also be useful in other situations. For example, -when :func:`trio.run_sync_in_thread` schedules some work to run -in a worker thread, it blocks until the work is finished (so it's a +when :func:`trio.to_thread.run_sync` schedules some work to run in a +worker thread, it blocks until the work is finished (so it's a schedule point), but by default it doesn't allow cancellation. So to make sure that the call always acts as a checkpoint, it calls :func:`checkpoint_if_cancelled` before starting the thread. diff --git a/docs/source/reference-io.rst b/docs/source/reference-io.rst index 723ecd78e9..e7dc9f77f2 100644 --- a/docs/source/reference-io.rst +++ b/docs/source/reference-io.rst @@ -492,7 +492,7 @@ To understand why, you need to know two things. First, right now no mainstream operating system offers a generic, reliable, native API for async file or filesystem operations, so we have to fake it by using threads (specifically, -:func:`run_sync_in_thread`). This is cheap but isn't free: on a +:func:`trio.to_thread.run_sync`). This is cheap but isn't free: on a typical PC, dispatching to a worker thread adds something like ~100 µs of overhead to each operation. ("µs" is pronounced "microseconds", and there are 1,000,000 µs in a second. Note that all the numbers here are diff --git a/newsfragments/810.removal.rst b/newsfragments/810.removal.rst index 9ca960bcbc..7c3ff0a8b5 100644 --- a/newsfragments/810.removal.rst +++ b/newsfragments/810.removal.rst @@ -1,5 +1,5 @@ -``run_sync_in_worker_thread`` was too much of a mouthful – now it's -just called `run_sync_in_thread` (though the old name still works with -a deprecation warning, for now). Similarly, -``current_default_worker_thread_limiter`` is becoming -`current_default_thread_limiter`. 
+``run_sync_in_worker_thread`` has become `trio.to_thread.run_sync`, in +order to make it shorter, and more consistent with the new +``trio.from_thread``. And ``current_default_worker_thread_limiter`` is +now `trio.to_thread.current_default_thread_limiter`. (Of course the +old names still work with a deprecation warning, for now.) diff --git a/notes-to-self/blocking-read-hack.py b/notes-to-self/blocking-read-hack.py index 83dbf379a9..b301058e85 100644 --- a/notes-to-self/blocking-read-hack.py +++ b/notes-to-self/blocking-read-hack.py @@ -29,7 +29,7 @@ async def kill_it_after_timeout(new_fd): async with trio.open_nursery() as nursery: nursery.start_soon(kill_it_after_timeout, new_fd) try: - data = await trio.run_sync_in_thread(os.read, new_fd, count) + data = await trio.to_thread.run_sync(os.read, new_fd, count) except OSError as exc: if cancel_requested and exc.errno == errno.ENOTCONN: # Call was successfully cancelled. In a real version we'd diff --git a/notes-to-self/thread-dispatch-bench.py b/notes-to-self/thread-dispatch-bench.py index 02c8c0750f..1625efae17 100644 --- a/notes-to-self/thread-dispatch-bench.py +++ b/notes-to-self/thread-dispatch-bench.py @@ -2,7 +2,7 @@ # minimal a fashion as possible. # # This is useful to get a sense of the *lower-bound* cost of -# run_sync_in_thread +# trio.to_thread.run_sync import threading from queue import Queue diff --git a/trio/__init__.py b/trio/__init__.py index 9d9409bb97..ab75cd38b4 100644 --- a/trio/__init__.py +++ b/trio/__init__.py @@ -32,7 +32,6 @@ Event, CapacityLimiter, Semaphore, Lock, StrictFIFOLock, Condition ) -from ._threads import (run_sync_in_thread, current_default_thread_limiter) from ._threads import BlockingTrioPortal as _BlockingTrioPortal from ._highlevel_generic import aclose_forcefully, StapledStream @@ -67,12 +66,14 @@ from ._deprecate import TrioDeprecationWarning -# Imported by default +# Submodules imported by default from . import hazmat from . import socket from . import abc from . import from_thread -# Not imported by default: testing +from . import to_thread +# Not imported by default, but mentioned here so static analysis tools like +# pylint will know that it exists. if False: from . import testing @@ -104,13 +105,13 @@ ), "run_sync_in_worker_thread": _deprecate.DeprecatedAttribute( - run_sync_in_thread, + to_thread.run_sync, "0.12.0", issue=810, ), "current_default_worker_thread_limiter": _deprecate.DeprecatedAttribute( - current_default_thread_limiter, + to_thread.current_default_thread_limiter, "0.12.0", issue=810, ), @@ -163,6 +164,7 @@ fixup_module_metadata(socket.__name__, socket.__dict__) fixup_module_metadata(abc.__name__, abc.__dict__) fixup_module_metadata(from_thread.__name__, from_thread.__dict__) +fixup_module_metadata(to_thread.__name__, to_thread.__dict__) fixup_module_metadata(__name__ + ".ssl", _deprecated_ssl_reexports.__dict__) fixup_module_metadata( __name__ + ".subprocess", _deprecated_subprocess_reexports.__dict__ diff --git a/trio/_core/_entry_queue.py b/trio/_core/_entry_queue.py index 5af07fb7a5..97b1c56fa4 100644 --- a/trio/_core/_entry_queue.py +++ b/trio/_core/_entry_queue.py @@ -131,10 +131,10 @@ class TrioToken: This object has two uses: 1. It lets you re-enter the Trio run loop from external threads or signal - handlers. This is the low-level primitive that - :func:`trio.run_sync_in_thread` uses to receive results from - worker threads, that :func:`trio.open_signal_receiver` uses to receive - notifications about signals, and so forth. + handlers. 
This is the low-level primitive that :func:`trio.to_thread` + and `trio.from_thread` use to communicate with worker threads, that + `trio.open_signal_receiver` uses to receive notifications about + signals, and so forth. 2. Each call to :func:`trio.run` has exactly one associated :class:`TrioToken` object, so you can use it to identify a particular diff --git a/trio/_core/_traps.py b/trio/_core/_traps.py index 68451ed176..a24a2f1742 100644 --- a/trio/_core/_traps.py +++ b/trio/_core/_traps.py @@ -111,7 +111,7 @@ def abort_func(raise_cancel): At that point there are again two possibilities. You can simply ignore the cancellation altogether: wait for the operation to complete and then reschedule and continue as normal. (For example, this is what - :func:`trio.run_sync_in_thread` does if cancellation is disabled.) + :func:`trio.to_thread.run_sync` does if cancellation is disabled.) The other possibility is that the ``abort_func`` does succeed in cancelling the operation, but for some reason isn't able to report that right away. (Example: on Windows, it's possible to request that an diff --git a/trio/_core/tests/test_run.py b/trio/_core/tests/test_run.py index 020e4e4586..7d8d6ae4a9 100644 --- a/trio/_core/tests/test_run.py +++ b/trio/_core/tests/test_run.py @@ -17,7 +17,7 @@ from .tutil import check_sequence_matches, gc_collect_harder from ... import _core -from ..._threads import run_sync_in_thread +from ..._threads import to_thread_run_sync from ..._timeouts import sleep, fail_after from ..._util import aiter_compat from ...testing import ( @@ -552,7 +552,7 @@ async def test_cancel_scope_repr(mock_clock): scope.deadline = _core.current_time() + 10 assert "deadline is 10.00 seconds from now" in repr(scope) # when not in async context, can't get the current time - assert "deadline" not in await run_sync_in_thread(repr, scope) + assert "deadline" not in await to_thread_run_sync(repr, scope) scope.cancel() assert "cancelled" in repr(scope) assert "exited" in repr(scope) diff --git a/trio/_file_io.py b/trio/_file_io.py index 14f83397c9..3c3b6bf1f5 100644 --- a/trio/_file_io.py +++ b/trio/_file_io.py @@ -53,7 +53,7 @@ class AsyncIOWrapper(AsyncResource): """A generic :class:`~io.IOBase` wrapper that implements the :term:`asynchronous file object` interface. Wrapped methods that could block are executed in - :meth:`trio.run_sync_in_thread`. + :meth:`trio.to_thread.run_sync`. All properties and methods defined in in :mod:`~io` are exposed by this wrapper, if they exist in the wrapped file object. 
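The ``trio/_file_io.py`` hunks here all follow one pattern: bind any keyword arguments with :func:`functools.partial`, then hand the resulting positional-only callable to the renamed `trio.to_thread.run_sync`. For illustration, the same pattern from user code looks roughly like this (a sketch, not part of this patch; the file name is just a placeholder)::

    import functools

    import trio

    def blocking_read(path, mode="r"):
        # Ordinary synchronous, potentially blocking I/O.
        with open(path, mode) as f:
            return f.read()

    async def main():
        # run_sync only forwards positional arguments, so keyword arguments
        # are bound with functools.partial first -- the same trick the
        # AsyncIOWrapper code uses.
        func = functools.partial(blocking_read, "some-file.txt", mode="r")
        data = await trio.to_thread.run_sync(func)
        print(len(data), "bytes read without blocking the Trio loop")

    trio.run(main)
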
@@ -80,7 +80,7 @@ def __getattr__(self, name): @async_wraps(self.__class__, self._wrapped.__class__, name) async def wrapper(*args, **kwargs): func = partial(meth, *args, **kwargs) - return await trio.run_sync_in_thread(func) + return await trio.to_thread.run_sync(func) # cache the generated method setattr(self, name, wrapper) @@ -115,7 +115,7 @@ async def detach(self): """ - raw = await trio.run_sync_in_thread(self._wrapped.detach) + raw = await trio.to_thread.run_sync(self._wrapped.detach) return wrap_file(raw) async def aclose(self): @@ -128,7 +128,7 @@ async def aclose(self): # ensure the underling file is closed during cancellation with trio.CancelScope(shield=True): - await trio.run_sync_in_thread(self._wrapped.close) + await trio.to_thread.run_sync(self._wrapped.close) await trio.hazmat.checkpoint_if_cancelled() @@ -165,7 +165,7 @@ async def open_file( file = fspath(file) _file = wrap_file( - await trio.run_sync_in_thread( + await trio.to_thread.run_sync( io.open, file, mode, buffering, encoding, errors, newline, closefd, opener ) diff --git a/trio/_path.py b/trio/_path.py index 369da73d25..5419a0bce2 100644 --- a/trio/_path.py +++ b/trio/_path.py @@ -58,7 +58,7 @@ def iter_wrapper_factory(cls, meth_name): async def wrapper(self, *args, **kwargs): meth = getattr(self._wrapped, meth_name) func = partial(meth, *args, **kwargs) - items = await trio.run_sync_in_thread(func) + items = await trio.to_thread.run_sync(func) return (rewrap_path(item) for item in items) return wrapper @@ -70,7 +70,7 @@ async def wrapper(self, *args, **kwargs): args = unwrap_paths(args) meth = getattr(self._wrapped, meth_name) func = partial(meth, *args, **kwargs) - value = await trio.run_sync_in_thread(func) + value = await trio.to_thread.run_sync(func) return rewrap_path(value) return wrapper @@ -83,7 +83,7 @@ async def wrapper(cls, *args, **kwargs): args = unwrap_paths(args) meth = getattr(cls._wraps, meth_name) func = partial(meth, *args, **kwargs) - value = await trio.run_sync_in_thread(func) + value = await trio.to_thread.run_sync(func) return rewrap_path(value) return wrapper @@ -145,7 +145,7 @@ def generate_iter(cls, attrs): class Path(metaclass=AsyncAutoWrapperType): """A :class:`pathlib.Path` wrapper that executes blocking methods in - :meth:`trio.run_sync_in_thread`. + :meth:`trio.to_thread.run_sync`. """ @@ -185,7 +185,7 @@ async def open(self, *args, **kwargs): """ func = partial(self._wrapped.open, *args, **kwargs) - value = await trio.run_sync_in_thread(func) + value = await trio.to_thread.run_sync(func) return trio.wrap_file(value) diff --git a/trio/_socket.py b/trio/_socket.py index 0e257849b0..805bfd4440 100644 --- a/trio/_socket.py +++ b/trio/_socket.py @@ -7,7 +7,6 @@ import idna as _idna import trio -from ._threads import run_sync_in_thread from ._util import fspath from . import _core @@ -179,7 +178,7 @@ def numeric_only_failure(exc): if hr is not None: return await hr.getaddrinfo(host, port, family, type, proto, flags) else: - return await run_sync_in_thread( + return await trio.to_thread.run_sync( _stdlib_socket.getaddrinfo, host, port, @@ -205,7 +204,7 @@ async def getnameinfo(sockaddr, flags): if hr is not None: return await hr.getnameinfo(sockaddr, flags) else: - return await run_sync_in_thread( + return await trio.to_thread.run_sync( _stdlib_socket.getnameinfo, sockaddr, flags, cancellable=True ) @@ -216,7 +215,7 @@ async def getprotobyname(name): Like :func:`socket.getprotobyname`, but async. 
""" - return await run_sync_in_thread( + return await trio.to_thread.run_sync( _stdlib_socket.getprotobyname, name, cancellable=True ) @@ -464,7 +463,7 @@ async def bind(self, address): ): # Use a thread for the filesystem traversal (unless it's an # abstract domain socket) - return await run_sync_in_thread(self._sock.bind, address) + return await trio.to_thread.run_sync(self._sock.bind, address) else: # POSIX actually says that bind can return EWOULDBLOCK and # complete asynchronously, like connect. But in practice AFAICT diff --git a/trio/_subprocess_platform/waitid.py b/trio/_subprocess_platform/waitid.py index d411265427..030c546f88 100644 --- a/trio/_subprocess_platform/waitid.py +++ b/trio/_subprocess_platform/waitid.py @@ -5,7 +5,7 @@ from .. import _core, _subprocess from .._sync import CapacityLimiter, Event -from .._threads import run_sync_in_thread +from .._threads import to_thread_run_sync try: from os import waitid @@ -74,7 +74,7 @@ async def _waitid_system_task(pid: int, event: Event) -> None: # call to trio.run is shutting down. try: - await run_sync_in_thread( + await to_thread_run_sync( sync_wait_reapable, pid, cancellable=True, diff --git a/trio/_sync.py b/trio/_sync.py index 95b4cb67d5..cd6c419d27 100644 --- a/trio/_sync.py +++ b/trio/_sync.py @@ -153,9 +153,9 @@ class CapacityLimiter: fixed number of seats, and if they're all taken then you have to wait for someone to get up before you can sit down. - By default, :func:`run_sync_in_thread` uses a + By default, :func:`trio.to_thread.run_sync` uses a :class:`CapacityLimiter` to limit the number of threads running at once; - see :func:`current_default_thread_limiter` for details. + see `trio.to_thread.current_default_thread_limiter` for details. If you're familiar with semaphores, then you can think of this as a restricted semaphore that's specialized for one common use case, with @@ -256,7 +256,7 @@ def acquire_on_behalf_of_nowait(self, borrower): Args: borrower: A :class:`trio.hazmat.Task` or arbitrary opaque object used to record who is borrowing this token. This is used by - :func:`run_sync_in_thread` to allow threads to "hold + :func:`trio.to_thread.run_sync` to allow threads to "hold tokens", with the intention in the future of using it to `allow deadlock detection and other useful things `__ diff --git a/trio/_threads.py b/trio/_threads.py index d4a064fb32..e49403be9b 100644 --- a/trio/_threads.py +++ b/trio/_threads.py @@ -10,12 +10,6 @@ from ._sync import CapacityLimiter from ._core import enable_ki_protection, disable_ki_protection, RunVar, TrioToken -__all__ = [ - "run_sync_in_thread", - "current_default_thread_limiter", - "BlockingTrioPortal", -] - # Global due to Threading API, thread local storage for trio token TOKEN_LOCAL = threading.local() @@ -27,10 +21,10 @@ def __init__(self, trio_token=None): self._trio_token = trio_token def run(self, afn, *args): - return run(afn, *args, trio_token=self._trio_token) + return from_thread_run(afn, *args, trio_token=self._trio_token) def run_sync(self, fn, *args): - return run_sync(fn, *args, trio_token=self._trio_token) + return from_thread_run_sync(fn, *args, trio_token=self._trio_token) ################################################################ @@ -141,11 +135,11 @@ def run_sync(self, fn, *args): def current_default_thread_limiter(): - """Get the default :class:`CapacityLimiter` used by - :func:`run_sync_in_thread`. + """Get the default `~trio.CapacityLimiter` used by + `trio.to_thread.run_sync`. 
The most common reason to call this would be if you want to modify its - :attr:`~CapacityLimiter.total_tokens` attribute. + :attr:`~trio.CapacityLimiter.total_tokens` attribute. """ try: @@ -166,24 +160,21 @@ class ThreadPlaceholder: @enable_ki_protection -async def run_sync_in_thread(sync_fn, *args, cancellable=False, limiter=None): +async def to_thread_run_sync(sync_fn, *args, cancellable=False, limiter=None): """Convert a blocking operation into an async operation using a thread. These two lines are equivalent:: sync_fn(*args) - await run_sync_in_thread(sync_fn, *args) + await trio.to_thread.run_sync(sync_fn, *args) except that if ``sync_fn`` takes a long time, then the first line will block the Trio loop while it runs, while the second line allows other Trio tasks to continue working while ``sync_fn`` runs. This is accomplished by pushing the call to ``sync_fn(*args)`` off into a worker thread. - ``run_sync_in_thread`` also injects the current ``TrioToken`` into the - spawned thread's local storage so that these threads can re-enter the Trio - loop by calling either `trio.from_thread.run` or - `trio.from_thread.run_sync` for async or synchronous functions, - respectively. + From inside the worker thread, you can get back into Trio using the + functions in `trio.from_thread`. Args: sync_fn: An arbitrary synchronous callable. @@ -191,61 +182,54 @@ async def run_sync_in_thread(sync_fn, *args, cancellable=False, limiter=None): arguments, use :func:`functools.partial`. cancellable (bool): Whether to allow cancellation of this operation. See discussion below. - limiter (None, CapacityLimiter, or CapacityLimiter-like object): + limiter (None, or CapacityLimiter-like object): An object used to limit the number of simultaneous threads. Most - commonly this will be a :class:`CapacityLimiter`, but it could be + commonly this will be a `~trio.CapacityLimiter`, but it could be anything providing compatible :meth:`~trio.CapacityLimiter.acquire_on_behalf_of` and - :meth:`~trio.CapacityLimiter.release_on_behalf_of` - methods. :func:`run_sync_in_thread` will call - ``acquire_on_behalf_of`` before starting the thread, and - ``release_on_behalf_of`` after the thread has finished. + :meth:`~trio.CapacityLimiter.release_on_behalf_of` methods. This + function will call ``acquire_on_behalf_of`` before starting the + thread, and ``release_on_behalf_of`` after the thread has finished. - If None (the default), uses the default :class:`CapacityLimiter`, as + If None (the default), uses the default `~trio.CapacityLimiter`, as returned by :func:`current_default_thread_limiter`. **Cancellation handling**: Cancellation is a tricky issue here, because neither Python nor the operating systems it runs on provide any general mechanism for cancelling an arbitrary synchronous function running in a - thread. :func:`run_sync_in_thread` will always check for - cancellation on entry, before starting the thread. But once the thread is - running, there are two ways it can handle being cancelled: + thread. This function will always check for cancellation on entry, before + starting the thread. But once the thread is running, there are two ways it + can handle being cancelled: * If ``cancellable=False``, the function ignores the cancellation and keeps going, just like if we had called ``sync_fn`` synchronously. This is the default behavior. - * If ``cancellable=True``, then ``run_sync_in_thread`` immediately - raises :exc:`Cancelled`. 
In this case **the thread keeps running in + * If ``cancellable=True``, then this function immediately raises + `~trio.Cancelled`. In this case **the thread keeps running in background** – we just abandon it to do whatever it's going to do, and silently discard any return value or errors that it raises. Only use this if you know that the operation is safe and side-effect free. (For - example: :func:`trio.socket.getaddrinfo` is implemented using - :func:`run_sync_in_thread`, and it sets ``cancellable=True`` - because it doesn't really affect anything if a stray hostname lookup - keeps running in the background.) + example: :func:`trio.socket.getaddrinfo` is uses a thread with + ``cancellable=True``, because it doesn't really affect anything if a + stray hostname lookup keeps running in the background.) The ``limiter`` is only released after the thread has *actually* - finished – which in the case of cancellation may be some time after - :func:`run_sync_in_thread` has returned. (This is why it's - crucial that :func:`run_sync_in_thread` takes care of acquiring - and releasing the limiter.) If :func:`trio.run` finishes before the - thread does, then the limiter release method will never be called at - all. + finished – which in the case of cancellation may be some time after this + function has returned. If :func:`trio.run` finishes before the thread + does, then the limiter release method will never be called at all. .. warning:: - You should not use :func:`run_sync_in_thread` to call - long-running CPU-bound functions! In addition to the usual GIL-related - reasons why using threads for CPU-bound work is not very effective in - Python, there is an additional problem: on CPython, `CPU-bound threads - tend to "starve out" IO-bound threads - `__, so using - :func:`run_sync_in_thread` for CPU-bound work is likely to - adversely affect the main thread running Trio. If you need to do this, - you're better off using a worker process, or perhaps PyPy (which still - has a GIL, but may do a better job of fairly allocating CPU time - between threads). + You should not use this function to call long-running CPU-bound + functions! In addition to the usual GIL-related reasons why using + threads for CPU-bound work is not very effective in Python, there is an + additional problem: on CPython, `CPU-bound threads tend to "starve out" + IO-bound threads `__, so using + threads for CPU-bound work is likely to adversely affect the main + thread running Trio. If you need to do this, you're better off using a + worker process, or perhaps PyPy (which still has a GIL, but may do a + better job of fairly allocating CPU time between threads). Returns: Whatever ``sync_fn(*args)`` returns. @@ -362,7 +346,7 @@ def _run_fn_as_system_task(cb, fn, *args, trio_token=None): return q.get().unwrap() -def run(afn, *args, trio_token=None): +def from_thread_run(afn, *args, trio_token=None): """Run the given async function in the parent Trio thread, blocking until it is complete. @@ -380,19 +364,19 @@ def run(afn, *args, trio_token=None): :exc:`trio.Cancelled`, and this will propagate out into RuntimeError: if you try calling this from inside the Trio thread, which would otherwise cause a deadlock. - AttributeError: if run()'s thread local storage does not have a token. - This happens when it was not spawned from trio.run_sync_in_thread. + AttributeError: if no ``trio_token`` was provided, and we can't infer + one from context. 
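The ``trio_token`` argument is easiest to see with a small sketch (illustrative only, not part of this patch; it assumes ``trio.hazmat.current_trio_token()`` for capturing the token, the same function the tests below reach via ``_core.current_trio_token``). A thread that Trio did not spawn – the "foreign" thread case described just below – passes the token explicitly in order to re-enter the run loop::

    import threading

    import trio

    def foreign_thread_fn(token, results):
        # This thread was not started by trio.to_thread.run_sync, so Trio
        # cannot infer the token from thread-local storage; pass it in.
        results.append(
            trio.from_thread.run_sync(trio.current_time, trio_token=token)
        )

    async def main():
        token = trio.hazmat.current_trio_token()
        results = []
        thread = threading.Thread(target=foreign_thread_fn, args=(token, results))
        thread.start()
        # Poll instead of join() so the Trio loop stays responsive and can
        # service the re-entrant call.
        while thread.is_alive():
            await trio.sleep(0.01)
        print("trio.current_time() seen from a foreign thread:", results[0])

    trio.run(main)
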
**Locating a Trio Token**: There are two ways to specify which `trio.run` loop to reenter: - - Spawn this thread from `trio.run_sync_in_thread`. This will - "inject" the current Trio Token into thread local storage and allow - this function to re-enter the same `trio.run` loop. + - Spawn this thread from `trio.to_thread.run_sync`. Trio will + automatically capture the relevant Trio token and use it when you + want to re-enter Trio. - Pass a keyword argument, ``trio_token`` specifiying a specific - `trio.run` loop to re-enter. This is the "legacy" way of - re-entering a trio thread and is similar to the old - ``BlockingTrioPortal``. + `trio.run` loop to re-enter. This is useful in case you have a + "foreign" thread, spawned using some other framework, and still want + to enter Trio. """ def callback(q, afn, args): @@ -408,7 +392,7 @@ async def await_in_trio_thread_task(): return _run_fn_as_system_task(callback, afn, *args, trio_token=trio_token) -def run_sync(fn, *args, trio_token=None): +def from_thread_run_sync(fn, *args, trio_token=None): """Run the given sync function in the parent Trio thread, blocking until it is complete. @@ -426,19 +410,19 @@ def run_sync(fn, *args, trio_token=None): :exc:`trio.Cancelled`, and this will propagate out into RuntimeError: if you try calling this from inside the Trio thread, which would otherwise cause a deadlock. - AttributeError: if run()'s thread local storage does not have a token. - This happens when it was not spawned from trio.run_sync_in_thread. + AttributeError: if no ``trio_token`` was provided, and we can't infer + one from context. **Locating a Trio Token**: There are two ways to specify which `trio.run` loop to reenter: - - Spawn this thread from `trio.run_sync_in_thread`. This will - "inject" the current Trio Token into thread local storage and allow - this function to re-enter the same `trio.run` loop. + - Spawn this thread from `trio.to_thread.run_sync`. Trio will + automatically capture the relevant Trio token and use it when you + want to re-enter Trio. - Pass a keyword argument, ``trio_token`` specifiying a specific - `trio.run` loop to re-enter. This is the "legacy" way of - re-entering a trio thread and is similar to the old - ``BlockingTrioPortal``. + `trio.run` loop to re-enter. This is useful in case you have a + "foreign" thread, spawned using some other framework, and still want + to enter Trio. """ def callback(q, fn, args): diff --git a/trio/_wait_for_object.py b/trio/_wait_for_object.py index 9feab9e03a..dfbf47d8a3 100644 --- a/trio/_wait_for_object.py +++ b/trio/_wait_for_object.py @@ -32,7 +32,7 @@ async def WaitForSingleObject(obj): # that we can use to cancel the thread. 
cancel_handle = kernel32.CreateEventA(ffi.NULL, True, False, ffi.NULL) try: - await trio.run_sync_in_thread( + await trio.to_thread.run_sync( WaitForMultipleObjects_sync, handle, cancel_handle, diff --git a/trio/from_thread.py b/trio/from_thread.py index 745bcbeafd..296a5a89ea 100644 --- a/trio/from_thread.py +++ b/trio/from_thread.py @@ -3,4 +3,5 @@ an external thread by means of a Trio Token present in Thread Local Storage """ -from ._threads import (run_sync, run) +from ._threads import from_thread_run as run +from ._threads import from_thread_run_sync as run_sync diff --git a/trio/tests/test_signals.py b/trio/tests/test_signals.py index 8121bc861f..7ae930403c 100644 --- a/trio/tests/test_signals.py +++ b/trio/tests/test_signals.py @@ -55,7 +55,7 @@ async def naughty(): pass # pragma: no cover with pytest.raises(RuntimeError): - await trio.run_sync_in_thread(trio.run, naughty) + await trio.to_thread.run_sync(trio.run, naughty) async def test_open_signal_receiver_conflict(): diff --git a/trio/tests/test_ssl.py b/trio/tests/test_ssl.py index 8178f396fd..b80d89625b 100644 --- a/trio/tests/test_ssl.py +++ b/trio/tests/test_ssl.py @@ -114,7 +114,7 @@ async def ssl_echo_server_raw(**kwargs): # nursery context manager to exit too. with a, b: nursery.start_soon( - trio.run_sync_in_thread, + trio.to_thread.run_sync, partial(ssl_echo_serve_sync, b, **kwargs) ) diff --git a/trio/tests/test_threads.py b/trio/tests/test_threads.py index 306af92c27..b9991a9dcf 100644 --- a/trio/tests/test_threads.py +++ b/trio/tests/test_threads.py @@ -7,8 +7,10 @@ from .. import _core from .. import Event, CapacityLimiter, sleep from ..testing import wait_all_tasks_blocked -from .._threads import * -from .._threads import run, run_sync # Not in __all__, must import explicitly +from .._threads import ( + to_thread_run_sync, current_default_thread_limiter, from_thread_run, + from_thread_run_sync, BlockingTrioPortal +) from .._core.tests.test_ki import ki_self from .._core.tests.tutil import slow @@ -45,14 +47,16 @@ def f(record): record.append(("f", threading.current_thread())) return 2 - await check_case(run_sync, f, ("got", 2), trio_token=token) + await check_case(from_thread_run_sync, f, ("got", 2), trio_token=token) def f(record): assert not _core.currently_ki_protected() record.append(("f", threading.current_thread())) raise ValueError - await check_case(run_sync, f, ("error", ValueError), trio_token=token) + await check_case( + from_thread_run_sync, f, ("error", ValueError), trio_token=token + ) async def f(record): assert not _core.currently_ki_protected() @@ -60,7 +64,7 @@ async def f(record): record.append(("f", threading.current_thread())) return 3 - await check_case(run, f, ("got", 3), trio_token=token) + await check_case(from_thread_run, f, ("got", 3), trio_token=token) async def f(record): assert not _core.currently_ki_protected() @@ -68,18 +72,18 @@ async def f(record): record.append(("f", threading.current_thread())) raise KeyError - await check_case(run, f, ("error", KeyError), trio_token=token) + await check_case(from_thread_run, f, ("error", KeyError), trio_token=token) async def test_do_in_trio_thread_from_trio_thread(): with pytest.raises(RuntimeError): - run_sync(lambda: None) # pragma: no branch + from_thread_run_sync(lambda: None) # pragma: no branch async def foo(): # pragma: no cover pass with pytest.raises(RuntimeError): - run(foo) + from_thread_run(foo) def test_run_in_trio_thread_ki(): @@ -106,12 +110,12 @@ async def trio_thread_afn(): def external_thread_fn(): try: print("running") - 
run_sync(trio_thread_fn, trio_token=token) + from_thread_run_sync(trio_thread_fn, trio_token=token) except KeyboardInterrupt: print("ok1") record.add("ok1") try: - run(trio_thread_afn, trio_token=token) + from_thread_run(trio_thread_afn, trio_token=token) except KeyboardInterrupt: print("ok2") record.add("ok2") @@ -140,7 +144,7 @@ async def trio_fn(): def thread_fn(token): try: - run(trio_fn, trio_token=token) + from_thread_run(trio_fn, trio_token=token) except _core.Cancelled: record.append("cancelled") @@ -163,7 +167,7 @@ async def test_run_in_worker_thread(): def f(x): return (x, threading.current_thread()) - x, child_thread = await run_sync_in_thread(f, 1) + x, child_thread = await to_thread_run_sync(f, 1) assert x == 1 assert child_thread != trio_thread @@ -171,7 +175,7 @@ def g(): raise ValueError(threading.current_thread()) with pytest.raises(ValueError) as excinfo: - await run_sync_in_thread(g) + await to_thread_run_sync(g) print(excinfo.value.args) assert excinfo.value.args[0] != trio_thread @@ -188,7 +192,7 @@ def f(q): async def child(q, cancellable): record.append("start") try: - return await run_sync_in_thread(f, q, cancellable=cancellable) + return await to_thread_run_sync(f, q, cancellable=cancellable) finally: record.append("exit") @@ -197,7 +201,7 @@ async def child(q, cancellable): async with _core.open_nursery() as nursery: nursery.start_soon(child, q, True) # Give it a chance to get started. (This is important because - # run_sync_in_thread does a checkpoint_if_cancelled before + # to_thread_run_sync does a checkpoint_if_cancelled before # blocking on the thread, and we don't want to trigger this.) await wait_all_tasks_blocked() assert record == ["start"] @@ -246,7 +250,7 @@ def thread_fn(): async def main(): async def child(): - await run_sync_in_thread(thread_fn, cancellable=True) + await to_thread_run_sync(thread_fn, cancellable=True) async with _core.open_nursery() as nursery: nursery.start_soon(child) @@ -275,7 +279,7 @@ async def test_run_in_worker_thread_limiter(MAX, cancel, use_default_limiter): # This test is a bit tricky. The goal is to make sure that if we set # limiter=CapacityLimiter(MAX), then in fact only MAX threads are ever # running at a time, even if there are more concurrent calls to - # run_sync_in_thread, and even if some of those are cancelled. And + # to_thread_run_sync, and even if some of those are cancelled. And # also to make sure that the default limiter actually limits. 
COUNT = 2 * MAX gate = threading.Event() @@ -312,7 +316,7 @@ class state: def thread_fn(cancel_scope): print("thread_fn start") - run_sync(cancel_scope.cancel, trio_token=token) + from_thread_run_sync(cancel_scope.cancel, trio_token=token) with lock: state.ran += 1 state.running += 1 @@ -328,7 +332,7 @@ def thread_fn(cancel_scope): async def run_thread(event): with _core.CancelScope() as cancel_scope: - await run_sync_in_thread( + await to_thread_run_sync( thread_fn, cancel_scope, limiter=limiter_arg, @@ -395,7 +399,7 @@ def release_on_behalf_of(self, borrower): record.append("release") assert borrower == self._borrower - await run_sync_in_thread(lambda: None, limiter=CustomLimiter()) + await to_thread_run_sync(lambda: None, limiter=CustomLimiter()) assert record == ["acquire", "release"] @@ -413,7 +417,7 @@ def release_on_behalf_of(self, borrower): bs = BadCapacityLimiter() with pytest.raises(ValueError) as excinfo: - await run_sync_in_thread(lambda: None, limiter=bs) + await to_thread_run_sync(lambda: None, limiter=bs) assert excinfo.value.__context__ is None assert record == ["acquire", "release"] record = [] @@ -422,7 +426,7 @@ def release_on_behalf_of(self, borrower): # chains with it d = {} with pytest.raises(ValueError) as excinfo: - await run_sync_in_thread(lambda: d["x"], limiter=bs) + await to_thread_run_sync(lambda: d["x"], limiter=bs) assert isinstance(excinfo.value.__context__, KeyError) assert record == ["acquire", "release"] @@ -439,37 +443,37 @@ def bad_start(self): # We get an appropriate error, and the limiter is cleanly released with pytest.raises(RuntimeError) as excinfo: - await run_sync_in_thread(lambda: None) # pragma: no cover + await to_thread_run_sync(lambda: None) # pragma: no cover assert "engines" in str(excinfo.value) assert limiter.borrowed_tokens == 0 -async def test_trio_run_sync_in_thread_token(): - # Test that run_sync_in_thread automatically injects the current trio token +async def test_trio_to_thread_run_sync_token(): + # Test that to_thread_run_sync automatically injects the current trio token # into a spawned thread def thread_fn(): - callee_token = run_sync(_core.current_trio_token) + callee_token = from_thread_run_sync(_core.current_trio_token) return callee_token caller_token = _core.current_trio_token() - callee_token = await run_sync_in_thread(thread_fn) + callee_token = await to_thread_run_sync(thread_fn) assert callee_token == caller_token async def test_trio_from_thread_run_sync(): - # Test that run_sync_in_thread correctly "hands off" the trio token to + # Test that to_thread_run_sync correctly "hands off" the trio token to # trio.from_thread.run_sync() def thread_fn(): - trio_time = run_sync(_core.current_time) + trio_time = from_thread_run_sync(_core.current_time) return trio_time - trio_time = await run_sync_in_thread(thread_fn) + trio_time = await to_thread_run_sync(thread_fn) assert isinstance(trio_time, float) async def test_trio_from_thread_run(): - # Test that run_sync_in_thread correctly "hands off" the trio token to + # Test that to_thread_run_sync correctly "hands off" the trio token to # trio.from_thread.run() record = [] @@ -479,33 +483,35 @@ async def back_in_trio_fn(): def thread_fn(): record.append("in thread") - run(back_in_trio_fn) + from_thread_run(back_in_trio_fn) - await run_sync_in_thread(thread_fn) + await to_thread_run_sync(thread_fn) assert record == ["in thread", "back in trio"] async def test_trio_from_thread_token(): - # Test that run_sync_in_thread and spawned trio.from_thread.run_sync() + # Test that 
to_thread_run_sync and spawned trio.from_thread.run_sync() # share the same Trio token def thread_fn(): - callee_token = run_sync(_core.current_trio_token) + callee_token = from_thread_run_sync(_core.current_trio_token) return callee_token caller_token = _core.current_trio_token() - callee_token = await run_sync_in_thread(thread_fn) + callee_token = await to_thread_run_sync(thread_fn) assert callee_token == caller_token async def test_trio_from_thread_token_kwarg(): - # Test that run_sync_in_thread and spawned trio.from_thread.run_sync() can + # Test that to_thread_run_sync and spawned trio.from_thread.run_sync() can # use an explicitly defined token def thread_fn(token): - callee_token = run_sync(_core.current_trio_token, trio_token=token) + callee_token = from_thread_run_sync( + _core.current_trio_token, trio_token=token + ) return callee_token caller_token = _core.current_trio_token() - callee_token = await run_sync_in_thread(thread_fn, caller_token) + callee_token = await to_thread_run_sync(thread_fn, caller_token) assert callee_token == caller_token @@ -514,12 +520,14 @@ async def test_from_thread_no_token(): # has been provided with pytest.raises(RuntimeError): - run_sync(_core.current_time) + from_thread_run_sync(_core.current_time) def test_run_fn_as_system_task_catched_badly_typed_token(): with pytest.raises(RuntimeError): - run_sync(_core.current_time, trio_token="Not TrioTokentype") + from_thread_run_sync( + _core.current_time, trio_token="Not TrioTokentype" + ) async def test_do_in_trio_thread_from_trio_thread_legacy(): @@ -550,5 +558,5 @@ def worker_thread(token): portal = BlockingTrioPortal(token) return portal.run_sync(threading.current_thread) - t = await run_sync_in_thread(worker_thread, token) + t = await to_thread_run_sync(worker_thread, token) assert t == threading.current_thread() diff --git a/trio/tests/test_util.py b/trio/tests/test_util.py index 1b8af8beff..942e662e1a 100644 --- a/trio/tests/test_util.py +++ b/trio/tests/test_util.py @@ -5,8 +5,8 @@ import pytest +import trio from .. import _core -from trio import run_sync_in_thread from .._util import ( signal_raise, ConflictDetector, fspath, is_main_thread, generic_function, Final, NoPublicConstructor @@ -160,7 +160,7 @@ async def test_is_main_thread(): def not_main_thread(): assert not is_main_thread() - await run_sync_in_thread(not_main_thread) + await trio.to_thread.run_sync(not_main_thread) def test_generic_function(): diff --git a/trio/tests/test_wait_for_object.py b/trio/tests/test_wait_for_object.py index f98730a0ed..3c3830ea39 100644 --- a/trio/tests/test_wait_for_object.py +++ b/trio/tests/test_wait_for_object.py @@ -7,12 +7,12 @@ pytestmark = pytest.mark.skipif(not on_windows, reason="windows only") from .._core.tests.tutil import slow +import trio from .. import _core from .. 
import _timeouts if on_windows: from .._core._windows_cffi import ffi, kernel32 from .._wait_for_object import WaitForSingleObject, WaitForMultipleObjects_sync - from trio import run_sync_in_thread async def test_WaitForMultipleObjects_sync(): @@ -80,7 +80,7 @@ async def test_WaitForMultipleObjects_sync_slow(): t0 = _core.current_time() async with _core.open_nursery() as nursery: nursery.start_soon( - run_sync_in_thread, WaitForMultipleObjects_sync, handle1 + trio.to_thread.run_sync, WaitForMultipleObjects_sync, handle1 ) await _timeouts.sleep(TIMEOUT) # If we would comment the line below, the above thread will be stuck, @@ -97,7 +97,8 @@ async def test_WaitForMultipleObjects_sync_slow(): t0 = _core.current_time() async with _core.open_nursery() as nursery: nursery.start_soon( - run_sync_in_thread, WaitForMultipleObjects_sync, handle1, handle2 + trio.to_thread.run_sync, WaitForMultipleObjects_sync, handle1, + handle2 ) await _timeouts.sleep(TIMEOUT) kernel32.SetEvent(handle1) @@ -113,7 +114,8 @@ async def test_WaitForMultipleObjects_sync_slow(): t0 = _core.current_time() async with _core.open_nursery() as nursery: nursery.start_soon( - run_sync_in_thread, WaitForMultipleObjects_sync, handle1, handle2 + trio.to_thread.run_sync, WaitForMultipleObjects_sync, handle1, + handle2 ) await _timeouts.sleep(TIMEOUT) kernel32.SetEvent(handle2) diff --git a/trio/to_thread.py b/trio/to_thread.py new file mode 100644 index 0000000000..6eec7b36c7 --- /dev/null +++ b/trio/to_thread.py @@ -0,0 +1,2 @@ +from ._threads import to_thread_run_sync as run_sync +from ._threads import current_default_thread_limiter
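Taken together, the renamed pieces compose like this (a rough end-to-end sketch, not part of the patch): the default pool is resized through `trio.to_thread.current_default_thread_limiter`, and one batch of jobs gets its own smaller pool by passing ``limiter=``::

    import time

    import trio

    async def main():
        # Resize the shared default thread pool for this run.
        trio.to_thread.current_default_thread_limiter().total_tokens = 10

        # Give this particular batch of jobs its own, smaller pool.
        small_pool = trio.CapacityLimiter(2)

        async def job(i):
            # time.sleep stands in for some blocking call; at most two of
            # these run concurrently because of small_pool.
            await trio.to_thread.run_sync(time.sleep, 0.1, limiter=small_pool)
            print("job", i, "done")

        async with trio.open_nursery() as nursery:
            for i in range(6):
                nursery.start_soon(job, i)

    trio.run(main)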