Skip to content

Add parallel install with fall-back to serial install when no multiprocessing available #8215

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions news/8187.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Run the wheel install in a multiprocessing Pool; this yields a significant
performance gain when installing cached packages. Packages that could not be
installed (an exception was raised) will be installed serially once the Pool
is done. If multiprocessing.Pool is not supported by the platform,
fall back to serial installation.
2 changes: 1 addition & 1 deletion src/pip/_internal/cli/cmdoptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -965,7 +965,7 @@ def check_list_path_option(options):
metavar="feature",
action="append",
default=[],
choices=["2020-resolver", "fast-deps", "in-tree-build"],
choices=["2020-resolver", "fast-deps", "in-tree-build", "parallel-install"],
help="Enable new functionality, that may be backward incompatible.",
) # type: Callable[..., Option]

Expand Down
6 changes: 6 additions & 0 deletions src/pip/_internal/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@

def main(args=None):
# type: (Optional[List[str]]) -> int

# windows multiprocessing support
from multiprocessing import freeze_support

freeze_support()

if args is None:
args = sys.argv[1:]

Expand Down
1 change: 1 addition & 0 deletions src/pip/_internal/commands/install.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,7 @@ def run(self, options, args):
warn_script_location=warn_script_location,
use_user_site=options.use_user_site,
pycompile=options.compile,
parallel_install="parallel-install" in options.features_enabled
)

lib_locations = get_lib_location_guesses(
Expand Down
152 changes: 120 additions & 32 deletions src/pip/_internal/req/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import collections
import logging
from typing import Iterator, List, Optional, Sequence, Tuple
from functools import partial
from typing import Iterator, List, Optional, Sequence, Tuple, Union

from pip._internal.utils.logging import indent_log
from pip._internal.utils.parallel import map_multiprocess_ordered

from .req_file import parse_requirements
from .req_install import InstallRequirement
Expand All @@ -26,6 +28,13 @@ def __repr__(self):
return f"InstallationResult(name={self.name!r})"


_InstallArgs = collections.namedtuple(
'_InstallArgs',
['install_options', 'global_options', 'root', 'home', 'prefix',
'warn_script_location', 'use_user_site', 'pycompile']
)


def _validate_requirements(
requirements, # type: List[InstallRequirement]
):
Expand All @@ -45,6 +54,7 @@ def install_given_reqs(
warn_script_location, # type: bool
use_user_site, # type: bool
pycompile, # type: bool
parallel_install=False, # type: bool
):
# type: (...) -> List[InstallationResult]
"""
Expand All @@ -60,39 +70,117 @@ def install_given_reqs(
', '.join(to_install.keys()),
)

installed = []
# pre allocate installed package names
installed = [] # type: List[InstallationResult]

# store install arguments
install_args = _InstallArgs(
install_options=install_options,
global_options=global_options,
root=root,
home=home,
prefix=prefix,
warn_script_location=warn_script_location,
use_user_site=use_user_site,
pycompile=pycompile,
)

with indent_log():
for req_name, requirement in to_install.items():
if requirement.should_reinstall:
logger.info('Attempting uninstall: %s', req_name)
with indent_log():
uninstalled_pathset = requirement.uninstall(
auto_confirm=True
)
else:
uninstalled_pathset = None

# get a list of packages we can install in parallel.
# we will only select packaged wheels that do not require uninstalling
# previous version. This ensures that no console outputs is printed,
# making the stdout consistent in parallel execution. It also decreases
# the chance of package install failure, as we are only unzipping a
# wheel file (no compilation or script execution involved)

if parallel_install:
should_parallel_reqs = [
(i, req) for i, req in enumerate(requirements)
if not req.should_reinstall and req.is_wheel
]
else:
should_parallel_reqs = []

if should_parallel_reqs:
# prepare for parallel execution
should_parallel_indexes, should_parallel_values = zip(
*should_parallel_reqs)
# install packages in parallel
executed_parallel_reqs = map_multiprocess_ordered(
partial(_single_install, install_args, suppress_exception=True),
should_parallel_values
)
# collect back results
parallel_install_results = dict(
zip(should_parallel_indexes, executed_parallel_reqs))
else:
parallel_install_results = {}

# check the results from the parallel installation,
# and fill-in missing installations or raise exception
for i, req in enumerate(requirements):

# select the install result from the parallel installation
# or install serially now
try:
requirement.install(
install_options,
global_options,
root=root,
home=home,
prefix=prefix,
warn_script_location=warn_script_location,
use_user_site=use_user_site,
pycompile=pycompile,
)
except Exception:
# if install did not succeed, rollback previous uninstall
if uninstalled_pathset and not requirement.install_succeeded:
uninstalled_pathset.rollback()
raise
else:
if uninstalled_pathset and requirement.install_succeeded:
uninstalled_pathset.commit()

installed.append(InstallationResult(req_name))
install_result = parallel_install_results[i]
if isinstance(install_result, InstallationResult):
logger.debug(
'Successfully installed %s in parallel', req.name)
except KeyError:
install_result = _single_install(
install_args, req, suppress_exception=False)
logger.debug('Successfully installed %s serially', req.name)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logs a successful install, but just afterwards it may still raise an exception — should this check isinstance(install_result, BaseException)?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sbidoul when calling _single_install with suppress_exception=False it will not suppress the exception but raise it. This means you will not get isinstance(install_result, BaseException) in this section, hence if we passed the _single_install call, the installation was successful.


# Now processes the installation result,
# throw exception or add into installed packages
if isinstance(install_result, BaseException):
# Raise an exception if we caught one
# during the parallel installation
raise install_result
elif isinstance(install_result, InstallationResult):
installed.append(install_result)

return installed


def _single_install(
    install_args,  # type: _InstallArgs
    requirement,  # type: InstallRequirement
    suppress_exception=False,  # type: bool
):
    # type: (...) -> Union[None, InstallationResult, BaseException]
    """Install one requirement and report the outcome.

    Designed to be invoked once per requirement, either serially or from
    a worker process. If the requirement needs reinstalling, the old
    version is uninstalled first and that uninstall is rolled back on a
    failed install, or committed on success.

    With ``suppress_exception=True`` an install failure is returned as
    the exception object instead of being raised, so a parallel caller
    can collect results without tearing down the pool.
    """
    undo = None
    if requirement.should_reinstall:
        logger.info('Attempting uninstall: %s', requirement.name)
        with indent_log():
            undo = requirement.uninstall(auto_confirm=True)

    try:
        requirement.install(**install_args._asdict())
    except Exception as exc:
        # Notice we might need to catch BaseException as this function
        # can be executed from a subprocess.
        # For the time being we keep the original catch Exception

        # The install failed: roll back the uninstall we performed above.
        if undo and not requirement.install_succeeded:
            undo.rollback()
        if not suppress_exception:
            raise
        return exc

    # Install succeeded: make any prior uninstall permanent.
    if undo and requirement.install_succeeded:
        undo.commit()

    return InstallationResult(requirement.name or '')
39 changes: 36 additions & 3 deletions src/pip/_internal/utils/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@
than using the default value of 1.
"""

__all__ = ["map_multiprocess", "map_multithread"]
__all__ = [
"map_multiprocess",
"map_multithread",
"map_multiprocess_ordered",
]

from contextlib import contextmanager
from multiprocessing import Pool as ProcessPool
from multiprocessing import pool
from multiprocessing.dummy import Pool as ThreadPool
from typing import Callable, Iterable, Iterator, TypeVar, Union
from typing import Callable, Iterable, Iterator, List, TypeVar, Union

from pip._vendor.requests.adapters import DEFAULT_POOLSIZE

Expand All @@ -49,6 +53,9 @@ def closing(pool):
"""Return a context manager making sure the pool closes properly."""
try:
yield pool
except (KeyboardInterrupt, SystemExit):
pool.terminate()
raise
finally:
# For Pool.imap*, close and join are needed
# for the returned iterator to begin yielding.
Expand All @@ -61,13 +68,24 @@ def _map_fallback(func, iterable, chunksize=1):
# type: (Callable[[S], T], Iterable[S], int) -> Iterator[T]
"""Make an iterator applying func to each element in iterable.

This function is the sequential fallback either on Python 2
This function is the sequential fallback
where Pool.imap* doesn't react to KeyboardInterrupt
or when sem_open is unavailable.
"""
return map(func, iterable)


def _map_ordered_fallback(func, iterable, chunksize=1):
# type: (Callable[[S], T], Iterable[S], int) -> List[T]
"""Make a list applying func to each element in iterable.

This function is the sequential fallback
where Pool.imap* doesn't react to KeyboardInterrupt
or when sem_open is unavailable.
"""
return list(map(func, iterable))


def _map_multiprocess(func, iterable, chunksize=1):
# type: (Callable[[S], T], Iterable[S], int) -> Iterator[T]
"""Chop iterable into chunks and submit them to a process pool.
Expand All @@ -81,6 +99,19 @@ def _map_multiprocess(func, iterable, chunksize=1):
return pool.imap_unordered(func, iterable, chunksize)


def _map_multiprocess_ordered(func, iterable, chunksize=1):
    # type: (Callable[[S], T], Iterable[S], int) -> List[T]
    """Submit chunks of iterable to a process pool and collect the results.

    Unlike the imap_unordered variants, this blocks until every chunk is
    done and returns a list in the original input order. For very long
    iterables, a large chunksize can make the job complete much faster
    than the default value of 1.
    """
    with closing(ProcessPool()) as worker_pool:
        return worker_pool.map(func, iterable, chunksize)


def _map_multithread(func, iterable, chunksize=1):
# type: (Callable[[S], T], Iterable[S], int) -> Iterator[T]
"""Chop iterable into chunks and submit them to a thread pool.
Expand All @@ -96,6 +127,8 @@ def _map_multithread(func, iterable, chunksize=1):

# Choose the concrete implementations at import time: platforms without
# sem_open cannot use multiprocessing primitives, so every public map_*
# name is bound to a serial fallback there.
if LACK_SEM_OPEN:
    map_multiprocess = map_multithread = _map_fallback
    map_multiprocess_ordered = _map_ordered_fallback
else:
    map_multiprocess = _map_multiprocess
    map_multithread = _map_multithread
    map_multiprocess_ordered = _map_multiprocess_ordered
25 changes: 25 additions & 0 deletions tests/functional/test_install_wheel.py
Original file line number Diff line number Diff line change
Expand Up @@ -707,3 +707,28 @@ def test_wheel_with_unknown_subdir_in_data_dir_has_reasonable_error(
"install", "--no-index", str(wheel_path), expect_error=True
)
assert "simple-0.1.0.data/unknown/hello.txt" in result.stderr


def test_install_from_wheel_in_parallel_installs_deps(script, data, tmpdir):
    """
    Test can install dependencies of wheels in parallel
    """
    # 'requires_source' depends on the 'source' project; stage the sdist
    # dependency in tmpdir so --find-links can resolve it.
    wheel_path = data.packages.joinpath(
        "requires_source-1.0-py2.py3-none-any.whl"
    )
    shutil.copy(data.packages / "source-1.0.tar.gz", tmpdir)

    result = script.pip(
        '-vvv',
        'install',
        '--use-feature=parallel-install',
        '--no-index', '--find-links', tmpdir, wheel_path,
        expect_stderr=True,
    )

    # Collect the per-package success lines: the sdist dependency must be
    # installed serially, the wheel itself in parallel, plus the summary.
    success_lines = []
    for line in result.stdout.split('\n'):
        if line.lstrip().startswith('Successfully installed '):
            success_lines.append(line)
    assert len(success_lines) == 3
    assert 'serially' in success_lines[0]
    assert 'parallel' in success_lines[1]
    result.assert_installed('source', editable=False)