diff --git a/news/8187.feature.rst b/news/8187.feature.rst
new file mode 100644
index 00000000000..94cb8def243
--- /dev/null
+++ b/news/8187.feature.rst
@@ -0,0 +1,5 @@
+Run the wheel install in a multiprocessing ``Pool``, which yields a
+significant performance gain when installing cached packages. Packages that
+could not be installed in parallel (an exception was raised) are installed
+serially once the pool is done. If ``multiprocessing.Pool`` is not supported
+by the platform, fall back to serial installation.
diff --git a/src/pip/_internal/cli/cmdoptions.py b/src/pip/_internal/cli/cmdoptions.py
index f71c0b02011..a44adf52624 100644
--- a/src/pip/_internal/cli/cmdoptions.py
+++ b/src/pip/_internal/cli/cmdoptions.py
@@ -965,7 +965,7 @@ def check_list_path_option(options):
     metavar="feature",
     action="append",
     default=[],
-    choices=["2020-resolver", "fast-deps", "in-tree-build"],
+    choices=["2020-resolver", "fast-deps", "in-tree-build", "parallel-install"],
     help="Enable new functionality, that may be backward incompatible.",
 )  # type: Callable[..., Option]
diff --git a/src/pip/_internal/cli/main.py b/src/pip/_internal/cli/main.py
index 7ae074b59d5..6a4da54c6fb 100644
--- a/src/pip/_internal/cli/main.py
+++ b/src/pip/_internal/cli/main.py
@@ -44,6 +44,12 @@
 def main(args=None):
     # type: (Optional[List[str]]) -> int
+
+    # Windows multiprocessing support
+    from multiprocessing import freeze_support
+
+    freeze_support()
+
     if args is None:
         args = sys.argv[1:]
diff --git a/src/pip/_internal/commands/install.py b/src/pip/_internal/commands/install.py
index dc637d87635..bf152f600b9 100644
--- a/src/pip/_internal/commands/install.py
+++ b/src/pip/_internal/commands/install.py
@@ -397,6 +397,7 @@ def run(self, options, args):
                 warn_script_location=warn_script_location,
                 use_user_site=options.use_user_site,
                 pycompile=options.compile,
+                parallel_install="parallel-install" in options.features_enabled,
             )
 
             lib_locations = get_lib_location_guesses(
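The `freeze_support()` call added to `main()` above is easy to gloss over. Here is a minimal standalone sketch of the entry-point pattern it enables (illustrative code, not part of the diff):

```python
import sys
from multiprocessing import Pool, freeze_support


def square(n):
    # type: (int) -> int
    return n * n


def main():
    # type: () -> int
    with Pool(2) as pool:
        print(pool.map(square, range(4)))  # [0, 1, 4, 9]
    return 0


if __name__ == "__main__":
    # On Windows, multiprocessing spawns children by re-importing the main
    # module; in a frozen executable, freeze_support() must run first so the
    # children do not re-execute the whole program. Everywhere else the call
    # is a no-op, which is why it is safe at the very top of pip's main().
    freeze_support()
    sys.exit(main())
```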
diff --git a/src/pip/_internal/req/__init__.py b/src/pip/_internal/req/__init__.py
index 06f0a0823f1..34bcdf2bfba 100644
--- a/src/pip/_internal/req/__init__.py
+++ b/src/pip/_internal/req/__init__.py
@@ -1,8 +1,10 @@
 import collections
 import logging
-from typing import Iterator, List, Optional, Sequence, Tuple
+from functools import partial
+from typing import Iterator, List, Optional, Sequence, Tuple, Union
 
 from pip._internal.utils.logging import indent_log
+from pip._internal.utils.parallel import map_multiprocess_ordered
 
 from .req_file import parse_requirements
 from .req_install import InstallRequirement
@@ -26,6 +28,13 @@ def __repr__(self):
         return f"InstallationResult(name={self.name!r})"
 
 
+_InstallArgs = collections.namedtuple(
+    '_InstallArgs',
+    ['install_options', 'global_options', 'root', 'home', 'prefix',
+     'warn_script_location', 'use_user_site', 'pycompile']
+)
+
+
 def _validate_requirements(
     requirements,  # type: List[InstallRequirement]
 ):
@@ -45,6 +54,7 @@ def install_given_reqs(
     warn_script_location,  # type: bool
     use_user_site,  # type: bool
     pycompile,  # type: bool
+    parallel_install=False,  # type: bool
 ):
     # type: (...) -> List[InstallationResult]
     """
@@ -60,39 +70,117 @@ def install_given_reqs(
             ', '.join(to_install.keys()),
         )
 
-    installed = []
+    # collected installation results
+    installed = []  # type: List[InstallationResult]
+
+    # store install arguments
+    install_args = _InstallArgs(
+        install_options=install_options,
+        global_options=global_options,
+        root=root,
+        home=home,
+        prefix=prefix,
+        warn_script_location=warn_script_location,
+        use_user_site=use_user_site,
+        pycompile=pycompile,
+    )
 
     with indent_log():
-        for req_name, requirement in to_install.items():
-            if requirement.should_reinstall:
-                logger.info('Attempting uninstall: %s', req_name)
-                with indent_log():
-                    uninstalled_pathset = requirement.uninstall(
-                        auto_confirm=True
-                    )
-            else:
-                uninstalled_pathset = None
-
+        # Get the list of packages we can install in parallel.
+        # We only select packaged wheels that do not require uninstalling a
+        # previous version. This ensures that no console output is printed,
+        # keeping stdout consistent during parallel execution. It also
+        # decreases the chance of an install failure, as we are only
+        # unzipping a wheel file (no compilation or script execution involved).
+        if parallel_install:
+            should_parallel_reqs = [
+                (i, req) for i, req in enumerate(requirements)
+                if not req.should_reinstall and req.is_wheel
+            ]
+        else:
+            should_parallel_reqs = []
+
+        if should_parallel_reqs:
+            # prepare for parallel execution
+            should_parallel_indexes, should_parallel_values = zip(
+                *should_parallel_reqs)
+            # install packages in parallel
+            executed_parallel_reqs = map_multiprocess_ordered(
+                partial(_single_install, install_args, suppress_exception=True),
+                should_parallel_values
+            )
+            # collect back results
+            parallel_install_results = dict(
+                zip(should_parallel_indexes, executed_parallel_reqs))
+        else:
+            parallel_install_results = {}
+
+        # Check the results of the parallel installation,
+        # and fill in missing installations or raise an exception.
+        for i, req in enumerate(requirements):
+
+            # select the install result from the parallel installation
+            # or install serially now
             try:
-                requirement.install(
-                    install_options,
-                    global_options,
-                    root=root,
-                    home=home,
-                    prefix=prefix,
-                    warn_script_location=warn_script_location,
-                    use_user_site=use_user_site,
-                    pycompile=pycompile,
-                )
-            except Exception:
-                # if install did not succeed, rollback previous uninstall
-                if uninstalled_pathset and not requirement.install_succeeded:
-                    uninstalled_pathset.rollback()
-                raise
-            else:
-                if uninstalled_pathset and requirement.install_succeeded:
-                    uninstalled_pathset.commit()
-
-            installed.append(InstallationResult(req_name))
+                install_result = parallel_install_results[i]
+                if isinstance(install_result, InstallationResult):
+                    logger.debug(
+                        'Successfully installed %s in parallel', req.name)
+            except KeyError:
+                install_result = _single_install(
+                    install_args, req, suppress_exception=False)
+                logger.debug('Successfully installed %s serially', req.name)
+
+            # Now process the installation result:
+            # raise an exception or add it to the installed packages.
+            if isinstance(install_result, BaseException):
+                # Raise an exception if we caught one
+                # during the parallel installation
+                raise install_result
+            elif isinstance(install_result, InstallationResult):
+                installed.append(install_result)
 
     return installed
+
+
+def _single_install(
+    install_args,  # type: _InstallArgs
+    requirement,  # type: InstallRequirement
+    suppress_exception=False,  # type: bool
+):
+    # type: (...) -> Union[InstallationResult, BaseException]
+    """
+    Install a single requirement and return an InstallationResult
+    (to be called per requirement, either in parallel or serially).
+    """
+
+    if requirement.should_reinstall:
+        logger.info('Attempting uninstall: %s', requirement.name)
+        with indent_log():
+            uninstalled_pathset = requirement.uninstall(
+                auto_confirm=True
+            )
+    else:
+        uninstalled_pathset = None
+
+    try:
+        requirement.install(
+            **install_args._asdict()
+        )
+    except Exception as ex:
+        # Note that we might need to catch BaseException, as this function
+        # can be executed from a subprocess.
+        # For the time being we keep the original `except Exception`.
+
+        # if install did not succeed, rollback previous uninstall
+        if uninstalled_pathset and not requirement.install_succeeded:
+            uninstalled_pathset.rollback()
+        if suppress_exception:
+            return ex
+        raise
+
+    if uninstalled_pathset and requirement.install_succeeded:
+        uninstalled_pathset.commit()
+
+    return InstallationResult(requirement.name or '')
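To make the control flow in `install_given_reqs` easier to review, here is a self-contained sketch of the same scheme with illustrative names (not pip's API): run the "safe" subset in parallel with exceptions suppressed, key the results by original index, then walk the full sequence, filling gaps serially and re-raising any captured exception:

```python
from functools import partial
from multiprocessing import Pool
from typing import List, Union


def _work(prefix, item):
    # type: (str, int) -> Union[str, BaseException]
    """Return a result, or return (not raise) the caught exception."""
    try:
        if item == 2:
            raise RuntimeError("cannot be processed in parallel")
        return prefix + str(item)
    except Exception as ex:
        return ex  # mirrors suppress_exception=True in the diff


def run_all(items):
    # type: (List[int]) -> List[str]
    # Only even items are "safe" for the parallel pass in this toy example,
    # like wheels that need no uninstall in the diff above.
    parallel = [(i, item) for i, item in enumerate(items) if item % 2 == 0]
    results = {}
    if parallel:
        indexes, values = zip(*parallel)
        with Pool() as pool:
            # Ordered map: outcome k belongs to values[k], hence indexes[k].
            outcomes = pool.map(partial(_work, "item-"), values)
        results = dict(zip(indexes, outcomes))
    finished = []
    for i, item in enumerate(items):
        try:
            outcome = results[i]
        except KeyError:
            outcome = "item-" + str(item) + " (serial)"  # serial fallback
        if isinstance(outcome, BaseException):
            raise outcome  # surface a failure captured in a worker
        finished.append(outcome)
    return finished


if __name__ == "__main__":
    print(run_all([0, 1, 4]))  # ['item-0', 'item-1 (serial)', 'item-4']
```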
diff --git a/src/pip/_internal/utils/parallel.py b/src/pip/_internal/utils/parallel.py
index de91dc8abc8..eaa571b6753 100644
--- a/src/pip/_internal/utils/parallel.py
+++ b/src/pip/_internal/utils/parallel.py
@@ -16,13 +16,17 @@
 than using the default value of 1.
 """
 
-__all__ = ["map_multiprocess", "map_multithread"]
+__all__ = [
+    "map_multiprocess",
+    "map_multithread",
+    "map_multiprocess_ordered",
+]
 
 from contextlib import contextmanager
 from multiprocessing import Pool as ProcessPool
 from multiprocessing import pool
 from multiprocessing.dummy import Pool as ThreadPool
-from typing import Callable, Iterable, Iterator, TypeVar, Union
+from typing import Callable, Iterable, Iterator, List, TypeVar, Union
 
 from pip._vendor.requests.adapters import DEFAULT_POOLSIZE
 
@@ -49,6 +53,9 @@ def closing(pool):
     """Return a context manager making sure the pool closes properly."""
     try:
         yield pool
+    except (KeyboardInterrupt, SystemExit):
+        pool.terminate()
+        raise
     finally:
         # For Pool.imap*, close and join are needed
         # for the returned iterator to begin yielding.
@@ -61,13 +68,24 @@ def _map_fallback(func, iterable, chunksize=1):
     # type: (Callable[[S], T], Iterable[S], int) -> Iterator[T]
     """Make an iterator applying func to each element in iterable.
 
-    This function is the sequential fallback either on Python 2
+    This function is the sequential fallback
     where Pool.imap* doesn't react to KeyboardInterrupt
     or when sem_open is unavailable.
     """
     return map(func, iterable)
 
 
+def _map_ordered_fallback(func, iterable, chunksize=1):
+    # type: (Callable[[S], T], Iterable[S], int) -> List[T]
+    """Make a list applying func to each element in iterable.
+
+    This function is the sequential fallback
+    where Pool.imap* doesn't react to KeyboardInterrupt
+    or when sem_open is unavailable.
+    """
+    return list(map(func, iterable))
+
+
 def _map_multiprocess(func, iterable, chunksize=1):
     # type: (Callable[[S], T], Iterable[S], int) -> Iterator[T]
     """Chop iterable into chunks and submit them to a process pool.
@@ -81,6 +99,19 @@ def _map_multiprocess(func, iterable, chunksize=1):
     return pool.imap_unordered(func, iterable, chunksize)
 
 
+def _map_multiprocess_ordered(func, iterable, chunksize=1):
+    # type: (Callable[[S], T], Iterable[S], int) -> List[T]
+    """Chop iterable into chunks and submit them to a process pool.
+
+    For very long iterables using a large value for chunksize can make
+    the job complete much faster than using the default value of 1.
+
+    Return an ordered list of the results.
+    """
+    with closing(ProcessPool()) as pool:
+        return pool.map(func, iterable, chunksize)
+
+
 def _map_multithread(func, iterable, chunksize=1):
     # type: (Callable[[S], T], Iterable[S], int) -> Iterator[T]
     """Chop iterable into chunks and submit them to a thread pool.
@@ -96,6 +127,8 @@ def _map_multithread(func, iterable, chunksize=1):
 
 if LACK_SEM_OPEN:
     map_multiprocess = map_multithread = _map_fallback
+    map_multiprocess_ordered = _map_ordered_fallback
 else:
     map_multiprocess = _map_multiprocess
     map_multithread = _map_multithread
+    map_multiprocess_ordered = _map_multiprocess_ordered
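A quick standalone check (not pip code) of the property the new `map_multiprocess_ordered` helper relies on: `Pool.map`, unlike `Pool.imap_unordered`, returns results in input order, so callers can zip them back to their original indexes:

```python
from multiprocessing import Pool


def negate(n):
    # type: (int) -> int
    return -n


if __name__ == "__main__":
    with Pool(4) as pool:
        results = pool.map(negate, range(8))
    # Unlike imap_unordered, Pool.map matches result order to input order,
    # which is what lets install_given_reqs zip results back to indexes.
    assert results == [-n for n in range(8)]
    print(results)
```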
+ """ + with closing(ProcessPool()) as pool: + return pool.map(func, iterable, chunksize) + + def _map_multithread(func, iterable, chunksize=1): # type: (Callable[[S], T], Iterable[S], int) -> Iterator[T] """Chop iterable into chunks and submit them to a thread pool. @@ -96,6 +127,8 @@ def _map_multithread(func, iterable, chunksize=1): if LACK_SEM_OPEN: map_multiprocess = map_multithread = _map_fallback + map_multiprocess_ordered = _map_ordered_fallback else: map_multiprocess = _map_multiprocess map_multithread = _map_multithread + map_multiprocess_ordered = _map_multiprocess_ordered diff --git a/tests/functional/test_install_wheel.py b/tests/functional/test_install_wheel.py index 8df208bb7da..e5c98ad3b34 100644 --- a/tests/functional/test_install_wheel.py +++ b/tests/functional/test_install_wheel.py @@ -707,3 +707,28 @@ def test_wheel_with_unknown_subdir_in_data_dir_has_reasonable_error( "install", "--no-index", str(wheel_path), expect_error=True ) assert "simple-0.1.0.data/unknown/hello.txt" in result.stderr + + +def test_install_from_wheel_in_parallel_installs_deps(script, data, tmpdir): + """ + Test can install dependencies of wheels in parallel + """ + # 'requires_source' depends on the 'source' project + package = data.packages.joinpath( + "requires_source-1.0-py2.py3-none-any.whl" + ) + shutil.copy(data.packages / "source-1.0.tar.gz", tmpdir) + result = script.pip( + '-vvv', + 'install', + '--use-feature=parallel-install', + '--no-index', '--find-links', tmpdir, package, + expect_stderr=True, + ) + log_messages = [ + msg for msg in result.stdout.split('\n') + if msg.lstrip().startswith('Successfully installed ')] + assert len(log_messages) == 3 + assert 'serially' in log_messages[0] + assert 'parallel' in log_messages[1] + result.assert_installed('source', editable=False)