 from pathlib import Path
 from queue import Queue
 from threading import Event, Semaphore, Thread
-from typing import Iterable, Optional, Tuple, Union
+from typing import Iterable, List, Optional, Tuple, Union

 from pip._vendor.requests.models import Response

@@ -127,6 +127,15 @@ def _http_get_download(session: PipSession, link: Link) -> Response:
     return resp


+def _http_content_length(session: PipSession, link: Link) -> Optional[int]:
+    target_url = link.url.split("#", 1)[0]
+    resp = session.head(target_url)
+    raise_for_status(resp)
+    if content_length := resp.headers.get("content-length", None):
+        return int(content_length)
+    return None
+
+
 class Downloader:
     def __init__(
         self,
@@ -164,8 +173,9 @@ def _copy_chunks(
     semaphore: Semaphore,
     session: PipSession,
     location: Path,
-    link: Link,
+    download_info: Tuple[Link, Optional[int]],
 ) -> None:
+    link, total_length = download_info
     with semaphore:
         try:
             try:
@@ -188,11 +198,19 @@ def _copy_chunks(

             with filepath.open("wb") as output_file:
                 chunk_index = 0
+                current_bytes = 0
                 for chunk in chunks:
                     # Check if another thread exited with an exception between chunks.
                     if event.is_set():
                         return
-                    logger.debug("reading chunk %d for link %s", chunk_index, link)
+                    current_bytes += len(chunk)
+                    logger.debug(
+                        "reading chunk %d for file %s [%d/%s bytes]",
+                        chunk_index,
+                        filename,
+                        current_bytes,
+                        str(total_length or 0),
+                    )
                     chunk_index += 1
                     # Copy chunk directly to output file, without any
                     # additional buffering.
@@ -214,13 +232,24 @@ def __init__(
         logger.info("Ignoring progress bar %s for parallel downloads", progress_bar)

     def __call__(
-        self, input_links: Iterable[Link], location: Path
+        self, links: Iterable[Link], location: Path
     ) -> Iterable[Tuple[Link, Tuple[Path, Optional[str]]]]:
         """Download the files given by links into location."""
-        links = list(input_links)
+        # Calculate the byte length for each file, if available.
+        links_with_lengths: List[Tuple[Link, Optional[int]]] = [
+            (link, _http_content_length(self._session, link)) for link in links
+        ]
+        # Sum up the total length we'll be downloading.
+        total_length: Optional[int] = 0
+        for _link, maybe_len in links_with_lengths:
+            if maybe_len is None:
+                total_length = None
+                break
+            assert total_length is not None
+            total_length += maybe_len

         # Set up state to track thread progress, including inner exceptions.
-        total_downloads: int = len(links)
+        total_downloads: int = len(links_with_lengths)
         completed_downloads: int = 0
         q: "Queue[Union[Tuple[Link, Path, Optional[str]], BaseException]]" = Queue()
         event = Event()
@@ -239,9 +268,9 @@ def __call__(
         workers = [
             Thread(
                 target=_copy_chunks,
-                args=(q, event, semaphore, self._session, location, link),
+                args=(q, event, semaphore, self._session, location, download_info),
             )
-            for link in links
+            for download_info in links_with_lengths
         ]
         for w in workers:
             w.start()
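The new _http_content_length helper probes each link with a HEAD request and reads the Content-Length header before any worker thread starts, so the per-chunk debug log can report bytes read against a known total. The following is a minimal standalone sketch of that probe using plain requests rather than pip's internal PipSession and raise_for_status helper; the function name and URL are illustrative only and are not part of the diff.

from typing import Optional

import requests


def probe_content_length(url: str) -> Optional[int]:
    """Return the advertised size of url in bytes, or None if unknown."""
    # Strip any URL fragment, mirroring link.url.split("#", 1)[0] in the diff.
    target_url = url.split("#", 1)[0]
    resp = requests.head(target_url, allow_redirects=True)
    resp.raise_for_status()
    content_length = resp.headers.get("content-length")
    # Servers are not required to send Content-Length (e.g. for chunked
    # responses), so callers must tolerate a None result, just as the
    # total-length aggregation in __call__ does.
    return int(content_length) if content_length else None


if __name__ == "__main__":
    # Hypothetical URL for demonstration purposes only.
    print(probe_content_length("https://example.com/some-package.whl"))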