Skip to content

Refactor HTTP download logic #13383

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions src/pip/_internal/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from pip._vendor.requests.models import Request, Response

from pip._internal.metadata import BaseDistribution
from pip._internal.models.link import Link
from pip._internal.network.download import _FileDownload
from pip._internal.req.req_install import InstallRequirement

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -819,17 +819,19 @@ class IncompleteDownloadError(DiagnosticPipError):

reference = "incomplete-download"

def __init__(
self, link: Link, received: int, expected: int, *, retries: int
) -> None:
def __init__(self, download: _FileDownload) -> None:
# Dodge circular import.
from pip._internal.utils.misc import format_size

download_status = f"{format_size(received)}/{format_size(expected)}"
if retries:
retry_status = f"after {retries} attempts "
assert download.size is not None
download_status = (
f"{format_size(download.bytes_received)}/{format_size(download.size)}"
)
if download.reattempts:
retry_status = f"after {download.reattempts + 1} attempts "
hint = "Use --resume-retries to configure resume attempt limit."
else:
# Download retrying is not enabled.
retry_status = ""
hint = "Consider using --resume-retries to enable download resumption."
message = Text(
Expand All @@ -839,7 +841,7 @@ def __init__(

super().__init__(
message=message,
context=f"URL: {link.redacted_url}",
context=f"URL: {download.link.redacted_url}",
hint_stmt=hint,
note_stmt="This is an issue with network connectivity, not pip.",
)
Expand Down
231 changes: 96 additions & 135 deletions src/pip/_internal/network/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
import logging
import mimetypes
import os
from collections.abc import Iterable
from collections.abc import Iterable, Mapping
from dataclasses import dataclass
from http import HTTPStatus
from typing import BinaryIO

Expand Down Expand Up @@ -40,7 +41,7 @@ def _get_http_response_etag_or_last_modified(resp: Response) -> str | None:
return resp.headers.get("etag", resp.headers.get("last-modified"))


def _prepare_download(
def _log_download(
resp: Response,
link: Link,
progress_bar: str,
Expand Down Expand Up @@ -134,28 +135,28 @@ def _get_http_response_filename(resp: Response, link: Link) -> str:
return filename


def _http_get_download(
    session: PipSession,
    link: Link,
    range_start: int | None = 0,
    if_range: str | None = None,
) -> Response:
    """Send a streaming GET for *link*, optionally as a HTTP range request.

    A truthy ``range_start`` requests only the bytes from that offset on;
    ``if_range`` carries a validator (ETag/Last-Modified) so the server
    resends the whole file if it changed in the meantime.
    """
    url = link.url.split("#", 1)[0]
    request_headers = dict(HEADERS)
    if range_start:
        # Ask only for the part we have not received yet.
        request_headers["Range"] = f"bytes={range_start}-"
    if if_range:
        # Guard the range request against the remote file having changed.
        request_headers["If-Range"] = if_range
    try:
        response = session.get(url, headers=request_headers, stream=True)
        raise_for_status(response)
    except NetworkConnectionError as exc:
        assert exc.response is not None
        logger.critical(
            "HTTP error %s while getting %s", exc.response.status_code, link
        )
        raise
    return response
@dataclass
class _FileDownload:
    """Stores the state of a single link download."""

    # The link being downloaded.
    link: Link
    # Open binary file object the payload is written to.
    output_file: BinaryIO
    # Expected total size in bytes, or None when the server did not say.
    size: int | None
    # Bytes written to output_file so far.
    bytes_received: int = 0
    # Number of resume/restart attempts made after dropped connections.
    reattempts: int = 0

    def is_incomplete(self) -> bool:
        """Return True when the expected size is known and not yet reached."""
        # The expression is already boolean; no bool() wrapper needed.
        return self.size is not None and self.bytes_received < self.size

    def write_chunk(self, data: bytes) -> None:
        """Append a received chunk to the file and update the byte count."""
        self.bytes_received += len(data)
        self.output_file.write(data)

    def reset_file(self) -> None:
        """Delete any saved data and reset progress to zero."""
        self.output_file.seek(0)
        self.output_file.truncate()
        self.bytes_received = 0


class Downloader:
Expand All @@ -172,146 +173,106 @@ def __init__(
self._progress_bar = progress_bar
self._resume_retries = resume_retries

def __call__(self, link: Link, location: str) -> tuple[str, str]:
"""Download the file given by link into location."""
resp = _http_get_download(self._session, link)
# NOTE: The original download size needs to be passed down everywhere
# so if the download is resumed (with a HTTP Range request) the progress
# bar will report the right size.
total_length = _get_http_response_size(resp)
content_type = resp.headers.get("Content-Type", "")
def batch(
self, links: Iterable[Link], location: str
) -> Iterable[tuple[Link, tuple[str, str]]]:
"""Convenience method to download multiple links."""
for link in links:
filepath, content_type = self(link, location)
yield link, (filepath, content_type)

filename = _get_http_response_filename(resp, link)
filepath = os.path.join(location, filename)
def __call__(self, link: Link, location: str) -> tuple[str, str]:
"""Download a link and save it under location."""
resp = self._http_get(link)
download_size = _get_http_response_size(resp)

filepath = os.path.join(location, _get_http_response_filename(resp, link))
with open(filepath, "wb") as content_file:
bytes_received = self._process_response(
resp, link, content_file, 0, total_length
)
# If possible, check for an incomplete download and attempt resuming.
if total_length and bytes_received < total_length:
self._attempt_resume(
resp, link, content_file, total_length, bytes_received
)
download = _FileDownload(link, content_file, download_size)
self._process_response(download, resp)
if download.is_incomplete():
self._attempt_resumes_or_redownloads(download, resp)

content_type = resp.headers.get("Content-Type", "")
return filepath, content_type

def _process_response(
self,
resp: Response,
link: Link,
content_file: BinaryIO,
bytes_received: int,
total_length: int | None,
) -> int:
"""Process the response and write the chunks to the file."""
chunks = _prepare_download(
resp, link, self._progress_bar, total_length, range_start=bytes_received
)
return self._write_chunks_to_file(
chunks, content_file, allow_partial=bool(total_length)
def _process_response(self, download: _FileDownload, resp: Response) -> None:
"""Download and save chunks from a response."""
chunks = _log_download(
resp,
download.link,
self._progress_bar,
download.size,
range_start=download.bytes_received,
)

def _write_chunks_to_file(
self, chunks: Iterable[bytes], content_file: BinaryIO, *, allow_partial: bool
) -> int:
"""Write the chunks to the file and return the number of bytes received."""
bytes_received = 0
try:
for chunk in chunks:
bytes_received += len(chunk)
content_file.write(chunk)
download.write_chunk(chunk)
except ReadTimeoutError as e:
# If partial downloads are OK (the download will be retried), don't bail.
if not allow_partial:
# If the download size is not known, then give up downloading the file.
if download.size is None:
raise e

# Ensuring bytes_received is returned to attempt resume
logger.warning("Connection timed out while downloading.")

return bytes_received

def _attempt_resume(
self,
resp: Response,
link: Link,
content_file: BinaryIO,
total_length: int | None,
bytes_received: int,
def _attempt_resumes_or_redownloads(
self, download: _FileDownload, first_resp: Response
) -> None:
"""Attempt to resume the download if connection was dropped."""
etag_or_last_modified = _get_http_response_etag_or_last_modified(resp)

attempts_left = self._resume_retries
while total_length and attempts_left and bytes_received < total_length:
attempts_left -= 1
"""Attempt to resume/restart the download if connection was dropped."""

while download.reattempts < self._resume_retries and download.is_incomplete():
assert download.size is not None
download.reattempts += 1
logger.warning(
"Attempting to resume incomplete download (%s/%s, attempt %d)",
format_size(bytes_received),
format_size(total_length),
(self._resume_retries - attempts_left),
format_size(download.bytes_received),
format_size(download.size),
download.reattempts,
)

try:
# Try to resume the download using a HTTP range request.
resume_resp = _http_get_download(
self._session,
link,
range_start=bytes_received,
if_range=etag_or_last_modified,
)

resume_resp = self._http_get_resume(download, should_match=first_resp)
# Fallback: if the server responded with 200 (i.e., the file has
# since been modified or range requests are unsupported) or any
# other unexpected status, restart the download from the beginning.
must_restart = resume_resp.status_code != HTTPStatus.PARTIAL_CONTENT
if must_restart:
bytes_received, total_length, etag_or_last_modified = (
self._reset_download_state(resume_resp, content_file)
)
download.reset_file()
download.size = _get_http_response_size(resume_resp)
first_resp = resume_resp

bytes_received += self._process_response(
resume_resp, link, content_file, bytes_received, total_length
)
self._process_response(download, resume_resp)
except (ConnectionError, ReadTimeoutError, OSError):
continue

# No more resume attempts. Raise an error if the download is still incomplete.
if total_length and bytes_received < total_length:
os.remove(content_file.name)
raise IncompleteDownloadError(
link, bytes_received, total_length, retries=self._resume_retries
if download.is_incomplete():
os.remove(download.output_file.name)
raise IncompleteDownloadError(download)

def _http_get_resume(
self, download: _FileDownload, should_match: Response
) -> Response:
"""Issue a HTTP range request to resume the download."""
# To better understand the download resumption logic, see the mdn web docs:
# https://developer.mozilla.org/en-US/docs/Web/HTTP/Guides/Range_requests
headers = HEADERS.copy()
headers["Range"] = f"bytes={download.bytes_received}-"
# If possible, use a conditional range request to avoid corrupted
# downloads caused by the remote file changing in-between.
if identifier := _get_http_response_etag_or_last_modified(should_match):
headers["If-Range"] = identifier
return self._http_get(download.link, headers)

def _http_get(self, link: Link, headers: Mapping[str, str] = HEADERS) -> Response:
target_url = link.url_without_fragment
try:
resp = self._session.get(target_url, headers=headers, stream=True)
raise_for_status(resp)
except NetworkConnectionError as e:
assert e.response is not None
logger.critical(
"HTTP error %s while getting %s", e.response.status_code, link
)

def _reset_download_state(
self,
resp: Response,
content_file: BinaryIO,
) -> tuple[int, int | None, str | None]:
"""Reset the download state to restart downloading from the beginning."""
content_file.seek(0)
content_file.truncate()
bytes_received = 0
total_length = _get_http_response_size(resp)
etag_or_last_modified = _get_http_response_etag_or_last_modified(resp)

return bytes_received, total_length, etag_or_last_modified


class BatchDownloader:
    """Download many links into one directory via a shared Downloader."""

    def __init__(
        self,
        session: PipSession,
        progress_bar: str,
        resume_retries: int,
    ) -> None:
        # All per-link work is delegated to a single Downloader instance.
        self._downloader = Downloader(session, progress_bar, resume_retries)

    def __call__(
        self, links: Iterable[Link], location: str
    ) -> Iterable[tuple[Link, tuple[str, str]]]:
        """Download the files given by links into location."""
        download = self._downloader
        for link in links:
            result = download(link, location)
            yield link, result
raise
return resp
8 changes: 2 additions & 6 deletions src/pip/_internal/operations/prepare.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from pip._internal.models.direct_url import ArchiveInfo
from pip._internal.models.link import Link
from pip._internal.models.wheel import Wheel
from pip._internal.network.download import BatchDownloader, Downloader
from pip._internal.network.download import Downloader
from pip._internal.network.lazy_wheel import (
HTTPRangeRequestUnsupported,
dist_from_wheel_url,
Expand Down Expand Up @@ -245,7 +245,6 @@ def __init__(
self.build_tracker = build_tracker
self._session = session
self._download = Downloader(session, progress_bar, resume_retries)
self._batch_download = BatchDownloader(session, progress_bar, resume_retries)
self.finder = finder

# Where still-packed archives should be written to. If None, they are
Expand Down Expand Up @@ -468,10 +467,7 @@ def _complete_partial_requirements(
assert req.link
links_to_fully_download[req.link] = req

batch_download = self._batch_download(
links_to_fully_download.keys(),
temp_dir,
)
batch_download = self._download.batch(links_to_fully_download.keys(), temp_dir)
for link, (filepath, _) in batch_download:
logger.debug("Downloading link %s to %s", link, filepath)
req = links_to_fully_download[link]
Expand Down
Loading