Skip to content

Add utilities for parallelization #8320

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
107 changes: 107 additions & 0 deletions src/pip/_internal/utils/parallel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
"""Convenient parallelization of higher order functions.

This module provides two helper functions, with appropriate fallbacks on
Python 2 and on systems lacking support for synchronization mechanisms:

- map_multiprocess
- map_multithread

These helpers work like Python 3's map, with two differences:

- They don't guarantee the order of processing of
the elements of the iterable.
- The underlying process/thread pools chop the iterable into
a number of chunks, so that for very long iterables using
a large value for chunksize can make the job complete much faster
than using the default value of 1.
"""

__all__ = ['map_multiprocess', 'map_multithread']

from contextlib import contextmanager
from multiprocessing import Pool as ProcessPool
from multiprocessing.dummy import Pool as ThreadPool

from pip._vendor.requests.adapters import DEFAULT_POOLSIZE
from pip._vendor.six import PY2
from pip._vendor.six.moves import map

from pip._internal.utils.typing import MYPY_CHECK_RUNNING

if MYPY_CHECK_RUNNING:
from typing import Callable, Iterable, Iterator, Union, TypeVar
from multiprocessing import pool

Pool = Union[pool.Pool, pool.ThreadPool]
S = TypeVar('S')
T = TypeVar('T')

# On platforms without sem_open, multiprocessing[.dummy] Pool
# cannot be created.  Probe for it by importing the submodule that
# needs it; the flag drives the implementation selection below.
try:
    import multiprocessing.synchronize  # noqa
except ImportError:
    LACK_SEM_OPEN = True
else:
    LACK_SEM_OPEN = False

# Incredibly large timeout to work around bpo-8296 on Python 2.
TIMEOUT = 2000000


@contextmanager
def closing(pool):
    # type: (Pool) -> Iterator[Pool]
    """Yield *pool*, guaranteeing that it is cleaned up on exit.

    Pool.imap* only begins yielding results once the pool has been
    closed and joined, so both calls are made here even when the
    ``with`` body raised; ``terminate`` then releases the workers.
    """
    try:
        yield pool
    finally:
        pool.close()
        pool.join()
        pool.terminate()


def _map_fallback(func, iterable, chunksize=1):
# type: (Callable[[S], T], Iterable[S], int) -> Iterator[T]
"""Make an iterator applying func to each element in iterable.

This function is the sequential fallback either on Python 2
where Pool.imap* doesn't react to KeyboardInterrupt
or when sem_open is unavailable.
"""
return map(func, iterable)


def _map_multiprocess(func, iterable, chunksize=1):
    # type: (Callable[[S], T], Iterable[S], int) -> Iterator[T]
    """Chop iterable into chunks and submit them to a process pool.

    For very long iterables using a large value for chunksize can make
    the job complete much faster than using the default value of 1.

    Return an unordered iterator of the results.
    """
    # NOTE: the ``with`` body ends at the ``return`` expression, so
    # ``closing`` runs close/join/terminate on the pool right after
    # ``imap_unordered`` is called and before the iterator is handed
    # back -- per ``closing``, close and join are what make the imap*
    # iterator begin yielding.
    with closing(ProcessPool()) as pool:
        return pool.imap_unordered(func, iterable, chunksize)


def _map_multithread(func, iterable, chunksize=1):
    # type: (Callable[[S], T], Iterable[S], int) -> Iterator[T]
    """Chop iterable into chunks and submit them to a thread pool.

    For very long iterables using a large value for chunksize can make
    the job complete much faster than using the default value of 1.

    Return an unordered iterator of the results.
    """
    # Thread count matches requests' DEFAULT_POOLSIZE, since the pool
    # is sized for the same kind of network-bound work.  As in
    # _map_multiprocess, ``closing`` closes and joins the pool before
    # the iterator is returned, which lets imap* start yielding.
    with closing(ThreadPool(DEFAULT_POOLSIZE)) as pool:
        return pool.imap_unordered(func, iterable, chunksize)


# Select the public implementations at import time: use the sequential
# fallback on Python 2 (where Pool.imap* doesn't react to
# KeyboardInterrupt) or when sem_open is unavailable (pools cannot be
# created at all); otherwise expose the real parallel variants.
if LACK_SEM_OPEN or PY2:
    map_multiprocess = map_multithread = _map_fallback
else:
    map_multiprocess = _map_multiprocess
    map_multithread = _map_multithread
66 changes: 66 additions & 0 deletions tests/unit/test_utils_parallel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
"""Test multiprocessing/multithreading higher-order functions."""

from importlib import import_module
from math import factorial
from sys import modules

from pip._vendor.six import PY2
from pip._vendor.six.moves import map
from pytest import mark

# Dotted path of __import__ to monkeypatch; it lives in a different
# module on Python 2 and Python 3.
DUNDER_IMPORT = '__builtin__.__import__' if PY2 else 'builtins.__import__'
# A pure function and an iterable of inputs used to exercise the maps.
FUNC, ITERABLE = factorial, range(42)
# Names of the public helpers under test.
MAPS = 'map_multiprocess', 'map_multithread'
# Keep a reference to the real __import__ for the fake importers below.
_import = __import__


def reload_parallel():
    """Return a freshly imported pip._internal.utils.parallel.

    The module selects its implementations at import time, so it must
    be dropped from sys.modules and re-imported for a monkeypatched
    __import__ to take effect.
    """
    # dict.pop with a default replaces the try/del/except-KeyError dance.
    modules.pop('pip._internal.utils.parallel', None)
    return import_module('pip._internal.utils.parallel')


def lack_sem_open(name, *args, **kwargs):
    """Raise ImportError on import of multiprocessing.synchronize."""
    if not name.endswith('synchronize'):
        return _import(name, *args, **kwargs)
    raise ImportError


def have_sem_open(name, *args, **kwargs):
    """Make sure multiprocessing.synchronize import is successful."""
    if name.endswith('synchronize'):
        # The return value is irrelevant: the pool is never used under
        # this fake import; the probe only needs the import to succeed.
        return None
    return _import(name, *args, **kwargs)


@mark.parametrize('name', MAPS)
def test_lack_sem_open(name, monkeypatch):
    """Test fallback when sem_open is not available.

    If so, multiprocessing[.dummy].Pool will fail to be created and
    map_async should fall back to map.
    """
    # Make every import of multiprocessing.synchronize fail, then
    # re-import the module so its import-time selection runs again.
    monkeypatch.setattr(DUNDER_IMPORT, lack_sem_open)
    parallel = reload_parallel()
    assert getattr(parallel, name) is parallel._map_fallback


@mark.parametrize('name', MAPS)
def test_have_sem_open(name, monkeypatch):
    """Test fallback when sem_open is available."""
    monkeypatch.setattr(DUNDER_IMPORT, have_sem_open)
    parallel = reload_parallel()
    # Even with sem_open, Python 2 still gets the sequential fallback
    # (Pool.imap* ignores KeyboardInterrupt there).
    impl = '_map_fallback' if PY2 else '_{}'.format(name)
    assert getattr(parallel, name) is getattr(parallel, impl)


@mark.parametrize('name', MAPS)
def test_map(name):
    """Test correctness of result of asynchronous maps."""
    map_async = getattr(reload_parallel(), name)
    # The helpers don't guarantee ordering, so compare as sets.
    assert set(map_async(FUNC, ITERABLE)) == set(map(FUNC, ITERABLE))