Commit 54042e6

McSinyx and pradyunsg committed
Drop parallel map for Python 2 and the non-lazy variant
Co-authored-by: Pradyun Gedam <[email protected]>
1 parent e1bac23 commit 54042e6

File tree

2 files changed: +34 -151 lines changed


src/pip/_internal/utils/parallel.py (+31 -139)
@@ -1,12 +1,22 @@
 """Convenient parallelization of higher order functions.
 
-This module provides proper fallback functions for multiprocess
-and multithread map, both the non-lazy, ordered variant
-and the lazy, unordered variant.
+This module provides two helper functions, with appropriate fallbacks on
+Python 2 and on systems lacking support for synchronization mechanisms:
+
+- map_multiprocess
+- map_multithread
+
+These helpers work like Python 3's map, with two differences:
+
+- They don't guarantee the order of processing of
+  the elements of the iterable.
+- The underlying process/thread pools chop the iterable into
+  a number of chunks, so that for very long iterables using
+  a large value for chunksize can make the job complete much faster
+  than using the default value of 1.
 """
 
-__all__ = ['map_multiprocess', 'imap_multiprocess',
-           'map_multithread', 'imap_multithread']
+__all__ = ['map_multiprocess', 'map_multithread']
 
 from contextlib import contextmanager
 from multiprocessing import Pool as ProcessPool
@@ -19,8 +29,7 @@
 from pip._internal.utils.typing import MYPY_CHECK_RUNNING
 
 if MYPY_CHECK_RUNNING:
-    from typing import (
-        Callable, Iterable, Iterator, List, Optional, Union, TypeVar)
+    from typing import Callable, Iterable, Iterator, Union, TypeVar
     from multiprocessing import pool
 
     Pool = Union[pool.Pool, pool.ThreadPool]
@@ -43,87 +52,29 @@
 @contextmanager
 def closing(pool):
     # type: (Pool) -> Iterator[Pool]
-    """Return a context manager that closes and joins pool.
-
-    This is needed for Pool.imap* to make the result iterator iterate.
-    """
+    """Return a context manager making sure the pool closes properly."""
     try:
         yield pool
     finally:
+        # For Pool.imap*, close and join are needed
+        # for the returned iterator to begin yielding.
         pool.close()
         pool.join()
+        pool.terminate()
 
 
-def _map_fallback(func, iterable, chunksize=None):
-    # type: (Callable[[S], T], Iterable[S], Optional[int]) -> List[T]
-    """Return a list of func applied to each element in iterable.
-
-    This function is the sequential fallback when sem_open is unavailable.
-    """
-    return list(map(func, iterable))
-
-
-def _imap_fallback(func, iterable, chunksize=1):
+def _map_fallback(func, iterable, chunksize=1):
     # type: (Callable[[S], T], Iterable[S], int) -> Iterator[T]
     """Make an iterator applying func to each element in iterable.
 
-    This function is the sequential fallback when sem_open is unavailable.
+    This function is the sequential fallback either on Python 2
+    where Pool.imap* doesn't react to KeyboardInterrupt
+    or when sem_open is unavailable.
     """
     return map(func, iterable)
 
 
-def _map_multiprocess_py2(func, iterable, chunksize=None):
-    # type: (Callable[[S], T], Iterable[S], Optional[int]) -> List[T]
-    """Chop iterable into chunks and submit them to a process pool.
-
-    The (approximate) size of these chunks can be specified
-    by setting chunksize to a positive integer.
-
-    Note that this function may cause high memory usage
-    for long iterables.
-
-    Return a list of results in order.
-    """
-    pool = ProcessPool()
-    try:
-        return pool.map_async(func, iterable, chunksize).get(TIMEOUT)
-    finally:
-        pool.terminate()
-
-
-def _map_multiprocess_py3(func, iterable, chunksize=None):
-    # type: (Callable[[S], T], Iterable[S], Optional[int]) -> List[T]
-    """Chop iterable into chunks and submit them to a process pool.
-
-    The (approximate) size of these chunks can be specified
-    by setting chunksize to a positive integer.
-
-    Note that this function may cause high memory usage
-    for long iterables.
-
-    Return a list of results in order.
-    """
-    with ProcessPool() as pool:
-        return pool.map(func, iterable, chunksize)
-
-
-def _imap_multiprocess_py2(func, iterable, chunksize=1):
-    # type: (Callable[[S], T], Iterable[S], int) -> Iterator[T]
-    """Chop iterable into chunks and submit them to a process pool.
-
-    For very long iterables using a large value for chunksize can make
-    the job complete much faster than using the default value of 1.
-
-    Return an unordered iterator of the results.
-    """
-    pool = ProcessPool()
-    try:
-        return iter(pool.map_async(func, iterable, chunksize).get(TIMEOUT))
-    finally:
-        pool.terminate()
-
-
-def _imap_multiprocess_py3(func, iterable, chunksize=1):
+def _map_multiprocess(func, iterable, chunksize=1):
     # type: (Callable[[S], T], Iterable[S], int) -> Iterator[T]
     """Chop iterable into chunks and submit them to a process pool.
 
@@ -132,62 +83,11 @@ def _imap_multiprocess_py3(func, iterable, chunksize=1):
 
     Return an unordered iterator of the results.
     """
-    with ProcessPool() as pool, closing(pool):
+    with closing(ProcessPool()) as pool:
         return pool.imap_unordered(func, iterable, chunksize)
 
 
-def _map_multithread_py2(func, iterable, chunksize=None):
-    # type: (Callable[[S], T], Iterable[S], Optional[int]) -> List[T]
-    """Chop iterable into chunks and submit them to a thread pool.
-
-    The (approximate) size of these chunks can be specified
-    by setting chunksize to a positive integer.
-
-    Note that this function may cause high memory usage
-    for long iterables.
-
-    Return a list of results in order.
-    """
-    pool = ThreadPool(DEFAULT_POOLSIZE)
-    try:
-        return pool.map_async(func, iterable, chunksize).get(TIMEOUT)
-    finally:
-        pool.terminate()
-
-
-def _map_multithread_py3(func, iterable, chunksize=None):
-    # type: (Callable[[S], T], Iterable[S], Optional[int]) -> List[T]
-    """Chop iterable into chunks and submit them to a thread pool.
-
-    The (approximate) size of these chunks can be specified
-    by setting chunksize to a positive integer.
-
-    Note that this function may cause high memory usage
-    for long iterables.
-
-    Return a list of results in order.
-    """
-    with ThreadPool(DEFAULT_POOLSIZE) as pool:
-        return pool.map(func, iterable, chunksize)
-
-
-def _imap_multithread_py2(func, iterable, chunksize=1):
-    # type: (Callable[[S], T], Iterable[S], int) -> Iterator[T]
-    """Chop iterable into chunks and submit them to a thread pool.
-
-    For very long iterables using a large value for chunksize can make
-    the job complete much faster than using the default value of 1.
-
-    Return an unordered iterator of the results.
-    """
-    pool = ThreadPool(DEFAULT_POOLSIZE)
-    try:
-        return pool.map_async(func, iterable, chunksize).get(TIMEOUT)
-    finally:
-        pool.terminate()
-
-
-def _imap_multithread_py3(func, iterable, chunksize=1):
+def _map_multithread(func, iterable, chunksize=1):
     # type: (Callable[[S], T], Iterable[S], int) -> Iterator[T]
     """Chop iterable into chunks and submit them to a thread pool.
 
@@ -196,20 +96,12 @@ def _imap_multithread_py3(func, iterable, chunksize=1):
 
     Return an unordered iterator of the results.
     """
-    with ThreadPool(DEFAULT_POOLSIZE) as pool, closing(pool):
+    with closing(ThreadPool(DEFAULT_POOLSIZE)) as pool:
         return pool.imap_unordered(func, iterable, chunksize)
 
 
-if LACK_SEM_OPEN:
+if LACK_SEM_OPEN or PY2:
     map_multiprocess = map_multithread = _map_fallback
-    imap_multiprocess = imap_multithread = _imap_fallback
-elif PY2:
-    map_multiprocess = _map_multiprocess_py2
-    imap_multiprocess = _imap_multiprocess_py2
-    map_multithread = _map_multithread_py2
-    imap_multithread = _imap_multithread_py2
 else:
-    map_multiprocess = _map_multiprocess_py3
-    imap_multiprocess = _imap_multiprocess_py3
-    map_multithread = _map_multithread_py3
-    imap_multithread = _imap_multithread_py3
+    map_multiprocess = _map_multiprocess
+    map_multithread = _map_multithread
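
For context on how the surviving helpers are meant to be called: both keep the signature (func, iterable, chunksize=1) and yield results out of order, per the new module docstring. A minimal usage sketch follows; the factorial/range(42) pairing mirrors the test constants, while the chunksize value and the set normalization are illustrative choices, not part of this commit.

from math import factorial

from pip._internal.utils.parallel import map_multithread

# Results are yielded as workers finish, so their order is not guaranteed;
# normalize (e.g. into a set) before comparing against a sequential map.
results = set(map_multithread(factorial, range(42), chunksize=4))
assert results == set(map(factorial, range(42)))

A larger chunksize trades scheduling overhead for coarser work units, which the docstring notes can speed up very long iterables.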

tests/unit/test_utils_parallel.py (+3 -12)
@@ -10,8 +10,7 @@
 
 DUNDER_IMPORT = '__builtin__.__import__' if PY2 else 'builtins.__import__'
 FUNC, ITERABLE = factorial, range(42)
-MAPS = ('map_multiprocess', 'imap_multiprocess',
-        'map_multithread', 'imap_multithread')
+MAPS = 'map_multiprocess', 'map_multithread'
 _import = __import__
 
 
@@ -45,16 +44,15 @@ def test_lack_sem_open(name, monkeypatch):
     """
     monkeypatch.setattr(DUNDER_IMPORT, lack_sem_open)
     parallel = reload_parallel()
-    fallback = '_{}_fallback'.format(name.split('_')[0])
-    assert getattr(parallel, name) is getattr(parallel, fallback)
+    assert getattr(parallel, name) is getattr(parallel, '_map_fallback')
 
 
 @mark.parametrize('name', MAPS)
 def test_have_sem_open(name, monkeypatch):
     """Test fallback when sem_open is available."""
     monkeypatch.setattr(DUNDER_IMPORT, have_sem_open)
     parallel = reload_parallel()
-    impl = ('_{}_py2' if PY2 else '_{}_py3').format(name)
+    impl = '_map_fallback' if PY2 else '_{}'.format(name)
     assert getattr(parallel, name) is getattr(parallel, impl)
 
 
@@ -63,10 +61,3 @@ def test_map(name):
     """Test correctness of result of asynchronous maps."""
     map_async = getattr(reload_parallel(), name)
     assert set(map_async(FUNC, ITERABLE)) == set(map(FUNC, ITERABLE))
-
-
-@mark.parametrize('name', ('map_multiprocess', 'map_multithread'))
-def test_map_order(name):
-    """Test result ordering of asynchronous maps."""
-    map_async = getattr(reload_parallel(), name)
-    assert tuple(map_async(FUNC, ITERABLE)) == tuple(map(FUNC, ITERABLE))
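
The fallback tests above force and restore the sem_open-backed code path by monkeypatching the builtin import. The helpers they call (lack_sem_open, have_sem_open, reload_parallel) are defined elsewhere in the test module and are not part of this hunk; the following is a hypothetical sketch of what such an import shim could look like, assuming the probe target is multiprocessing.synchronize, for illustration only.

_import = __import__  # saved original import, as in the hunk above


def lack_sem_open(name, *args, **kwargs):
    """Pretend the platform lacks sem_open by failing the semaphore import."""
    # Hypothetical probe target; the real helper may check differently.
    if name == 'multiprocessing.synchronize':
        raise ImportError(name)
    return _import(name, *args, **kwargs)

With builtins.__import__ patched to this shim, reloading the parallel module makes its semaphore probe raise ImportError, so LACK_SEM_OPEN becomes true and both public names resolve to _map_fallback.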

0 commit comments
