diff --git a/news/f91d42b8-8277-4918-94eb-031bc7be1c3f.trivial b/news/f91d42b8-8277-4918-94eb-031bc7be1c3f.trivial new file mode 100644 index 00000000000..e69de29bb2d diff --git a/src/pip/_internal/utils/parallel.py b/src/pip/_internal/utils/parallel.py new file mode 100644 index 00000000000..9fe1fe8b9e4 --- /dev/null +++ b/src/pip/_internal/utils/parallel.py @@ -0,0 +1,107 @@ +"""Convenient parallelization of higher order functions. + +This module provides two helper functions, with appropriate fallbacks on +Python 2 and on systems lacking support for synchronization mechanisms: + +- map_multiprocess +- map_multithread + +These helpers work like Python 3's map, with two differences: + +- They don't guarantee the order of processing of + the elements of the iterable. +- The underlying process/thread pools chop the iterable into + a number of chunks, so that for very long iterables using + a large value for chunksize can make the job complete much faster + than using the default value of 1. +""" + +__all__ = ['map_multiprocess', 'map_multithread'] + +from contextlib import contextmanager +from multiprocessing import Pool as ProcessPool +from multiprocessing.dummy import Pool as ThreadPool + +from pip._vendor.requests.adapters import DEFAULT_POOLSIZE +from pip._vendor.six import PY2 +from pip._vendor.six.moves import map + +from pip._internal.utils.typing import MYPY_CHECK_RUNNING + +if MYPY_CHECK_RUNNING: + from typing import Callable, Iterable, Iterator, Union, TypeVar + from multiprocessing import pool + + Pool = Union[pool.Pool, pool.ThreadPool] + S = TypeVar('S') + T = TypeVar('T') + +# On platforms without sem_open, multiprocessing[.dummy] Pool +# cannot be created. +try: + import multiprocessing.synchronize # noqa +except ImportError: + LACK_SEM_OPEN = True +else: + LACK_SEM_OPEN = False + +# Incredibly large timeout to work around bpo-8296 on Python 2. +TIMEOUT = 2000000 + + +@contextmanager +def closing(pool): + # type: (Pool) -> Iterator[Pool] + """Return a context manager making sure the pool closes properly.""" + try: + yield pool + finally: + # For Pool.imap*, close and join are needed + # for the returned iterator to begin yielding. + pool.close() + pool.join() + pool.terminate() + + +def _map_fallback(func, iterable, chunksize=1): + # type: (Callable[[S], T], Iterable[S], int) -> Iterator[T] + """Make an iterator applying func to each element in iterable. + + This function is the sequential fallback either on Python 2 + where Pool.imap* doesn't react to KeyboardInterrupt + or when sem_open is unavailable. + """ + return map(func, iterable) + + +def _map_multiprocess(func, iterable, chunksize=1): + # type: (Callable[[S], T], Iterable[S], int) -> Iterator[T] + """Chop iterable into chunks and submit them to a process pool. + + For very long iterables using a large value for chunksize can make + the job complete much faster than using the default value of 1. + + Return an unordered iterator of the results. + """ + with closing(ProcessPool()) as pool: + return pool.imap_unordered(func, iterable, chunksize) + + +def _map_multithread(func, iterable, chunksize=1): + # type: (Callable[[S], T], Iterable[S], int) -> Iterator[T] + """Chop iterable into chunks and submit them to a thread pool. + + For very long iterables using a large value for chunksize can make + the job complete much faster than using the default value of 1. + + Return an unordered iterator of the results. + """ + with closing(ThreadPool(DEFAULT_POOLSIZE)) as pool: + return pool.imap_unordered(func, iterable, chunksize) + + +if LACK_SEM_OPEN or PY2: + map_multiprocess = map_multithread = _map_fallback +else: + map_multiprocess = _map_multiprocess + map_multithread = _map_multithread diff --git a/tests/unit/test_utils_parallel.py b/tests/unit/test_utils_parallel.py new file mode 100644 index 00000000000..bf42f6bd9e4 --- /dev/null +++ b/tests/unit/test_utils_parallel.py @@ -0,0 +1,66 @@ +"""Test multiprocessing/multithreading higher-order functions.""" + +from importlib import import_module +from math import factorial +from sys import modules + +from pip._vendor.six import PY2 +from pip._vendor.six.moves import map +from pytest import mark + +DUNDER_IMPORT = '__builtin__.__import__' if PY2 else 'builtins.__import__' +FUNC, ITERABLE = factorial, range(42) +MAPS = 'map_multiprocess', 'map_multithread' +_import = __import__ + + +def reload_parallel(): + try: + del modules['pip._internal.utils.parallel'] + except KeyError: + pass + return import_module('pip._internal.utils.parallel') + + +def lack_sem_open(name, *args, **kwargs): + """Raise ImportError on import of multiprocessing.synchronize.""" + if name.endswith('synchronize'): + raise ImportError + return _import(name, *args, **kwargs) + + +def have_sem_open(name, *args, **kwargs): + """Make sure multiprocessing.synchronize import is successful.""" + # We don't care about the return value + # since we don't use the pool with this import. + if name.endswith('synchronize'): + return + return _import(name, *args, **kwargs) + + +@mark.parametrize('name', MAPS) +def test_lack_sem_open(name, monkeypatch): + """Test fallback when sem_open is not available. + + If so, multiprocessing[.dummy].Pool will fail to be created and + map_async should fallback to map. + """ + monkeypatch.setattr(DUNDER_IMPORT, lack_sem_open) + parallel = reload_parallel() + assert getattr(parallel, name) is parallel._map_fallback + + +@mark.parametrize('name', MAPS) +def test_have_sem_open(name, monkeypatch): + """Test fallback when sem_open is available.""" + monkeypatch.setattr(DUNDER_IMPORT, have_sem_open) + parallel = reload_parallel() + impl = '_map_fallback' if PY2 else '_{}'.format(name) + assert getattr(parallel, name) is getattr(parallel, impl) + + +@mark.parametrize('name', MAPS) +def test_map(name): + """Test correctness of result of asynchronous maps.""" + map_async = getattr(reload_parallel(), name) + assert set(map_async(FUNC, ITERABLE)) == set(map(FUNC, ITERABLE))