Skip to content

Commit 082033b

Browse files
authored
Merge pull request #1002 from planetlabs/download-failure-974
Add retries to file download to avoid throwing an exception in the case of normal retry errors
2 parents 1b387e0 + 71d3085 commit 082033b

File tree

9 files changed

+234
-294
lines changed

9 files changed

+234
-294
lines changed

planet/clients/data.py

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@
2020
from typing import Any, AsyncIterator, Callable, Dict, List, Optional
2121
import uuid
2222

23+
from tqdm.asyncio import tqdm
24+
2325
from ..data_filter import empty_filter
2426
from .. import exceptions
2527
from ..constants import PLANET_BASE_URL
2628
from ..http import Session
27-
from ..models import Paged, StreamingBody
29+
from ..models import Paged
2830
from ..specs import validate_data_item_type
2931

3032
BASE_URL = f'{PLANET_BASE_URL}/data/v1/'
@@ -595,13 +597,30 @@ async def download_asset(self,
595597
raise exceptions.ClientError(
596598
'asset missing ["location"] entry. Is asset active?')
597599

598-
async with self._session.stream(method='GET', url=location) as resp:
599-
body = StreamingBody(resp)
600-
dl_path = Path(directory, filename or body.name)
601-
dl_path.parent.mkdir(exist_ok=True, parents=True)
602-
await body.write(dl_path,
603-
overwrite=overwrite,
604-
progress_bar=progress_bar)
600+
response = await self._session.request(method='GET', url=location)
601+
filename = filename or response.filename
602+
if not filename:
603+
raise exceptions.ClientError(
604+
f'Could not determine filename at {location}')
605+
606+
dl_path = Path(directory, filename)
607+
dl_path.parent.mkdir(exist_ok=True, parents=True)
608+
LOGGER.info(f'Downloading {dl_path}')
609+
610+
try:
611+
mode = 'wb' if overwrite else 'xb'
612+
with open(dl_path, mode) as fp:
613+
with tqdm(total=response.length,
614+
unit_scale=True,
615+
unit_divisor=1024 * 1024,
616+
unit='B',
617+
desc=str(filename),
618+
disable=not progress_bar) as progress:
619+
update = progress.update if progress_bar else LOGGER.debug
620+
await self._session.write(location, fp, update)
621+
except FileExistsError:
622+
LOGGER.info(f'File {dl_path} exists, not overwriting')
623+
605624
return dl_path
606625

607626
@staticmethod

planet/clients/orders.py

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,19 @@
1515
"""Functionality for interacting with the orders api"""
1616
import asyncio
1717
import logging
18+
from pathlib import Path
1819
import time
1920
from typing import AsyncIterator, Callable, List, Optional
2021
import uuid
2122
import json
2223
import hashlib
2324

24-
from pathlib import Path
25+
from tqdm.asyncio import tqdm
26+
2527
from .. import exceptions
2628
from ..constants import PLANET_BASE_URL
2729
from ..http import Session
28-
from ..models import Paged, StreamingBody
30+
from ..models import Paged
2931

3032
BASE_URL = f'{PLANET_BASE_URL}/compute/ops'
3133
STATS_PATH = '/stats/orders/v2'
@@ -251,14 +253,34 @@ async def download_asset(self,
251253
252254
Raises:
253255
planet.exceptions.APIError: On API error.
256+
planet.exceptions.ClientError: If location is not valid or retry
257+
limit is exceeded.
258+
254259
"""
255-
async with self._session.stream(method='GET', url=location) as resp:
256-
body = StreamingBody(resp)
257-
dl_path = Path(directory, filename or body.name)
258-
dl_path.parent.mkdir(exist_ok=True, parents=True)
259-
await body.write(dl_path,
260-
overwrite=overwrite,
261-
progress_bar=progress_bar)
260+
response = await self._session.request(method='GET', url=location)
261+
filename = filename or response.filename
262+
length = response.length
263+
if not filename:
264+
raise exceptions.ClientError(
265+
f'Could not determine filename at {location}')
266+
267+
dl_path = Path(directory, filename)
268+
dl_path.parent.mkdir(exist_ok=True, parents=True)
269+
LOGGER.info(f'Downloading {dl_path}')
270+
271+
try:
272+
mode = 'wb' if overwrite else 'xb'
273+
with open(dl_path, mode) as fp:
274+
with tqdm(total=length,
275+
unit_scale=True,
276+
unit_divisor=1024 * 1024,
277+
unit='B',
278+
desc=str(filename),
279+
disable=not progress_bar) as progress:
280+
await self._session.write(location, fp, progress.update)
281+
except FileExistsError:
282+
LOGGER.info(f'File {dl_path} exists, not overwriting')
283+
262284
return dl_path
263285

264286
async def download_order(self,

planet/http.py

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,11 @@
1616
from __future__ import annotations # https://stackoverflow.com/a/33533514
1717
import asyncio
1818
from collections import Counter
19-
from contextlib import asynccontextmanager
2019
from http import HTTPStatus
2120
import logging
2221
import random
2322
import time
24-
from typing import AsyncGenerator, Optional
23+
from typing import Callable, Optional
2524

2625
import httpx
2726
from typing_extensions import Literal
@@ -42,7 +41,7 @@
4241
httpx.ReadTimeout,
4342
httpx.RemoteProtocolError,
4443
exceptions.BadGateway,
45-
exceptions.TooManyRequests
44+
exceptions.TooManyRequests,
4645
]
4746
MAX_RETRIES = 5
4847
MAX_RETRY_BACKOFF = 64 # seconds
@@ -327,6 +326,7 @@ async def _retry(self, func, *a, **kw):
327326
LOGGER.info(f'Retrying: sleeping {wait_time}s')
328327
await asyncio.sleep(wait_time)
329328
else:
329+
LOGGER.info('Retrying: failed')
330330
raise e
331331

332332
self.outcomes.update(['Successful'])
@@ -394,26 +394,32 @@ async def _send(self, request, stream=False) -> httpx.Response:
394394

395395
return http_resp
396396

397-
@asynccontextmanager
398-
async def stream(
399-
self, method: str,
400-
url: str) -> AsyncGenerator[models.StreamingResponse, None]:
401-
"""Submit a request and get the response as a stream context manager.
397+
async def write(self, url: str, fp, callback: Optional[Callable] = None):
398+
"""Write data to local file with limiting and retries.
402399
403400
Parameters:
404-
method: HTTP request method.
405-
url: Location of the API endpoint.
401+
url: Remote location url.
402+
fp: Open write file pointer.
403+
callback: Function that handles write progress updates.
404+
405+
Raises:
406+
planet.exceptions.APIError: On API error.
406407
407-
Returns:
408-
Context manager providing the streaming response.
409408
"""
410-
request = self._client.build_request(method=method, url=url)
411-
http_response = await self._retry(self._send, request, stream=True)
412-
response = models.StreamingResponse(http_response)
413-
try:
414-
yield response
415-
finally:
416-
await response.aclose()
409+
410+
async def _limited_write():
411+
async with self._limiter:
412+
async with self._client.stream('GET', url) as response:
413+
previous = response.num_bytes_downloaded
414+
415+
async for chunk in response.aiter_bytes():
416+
fp.write(chunk)
417+
current = response.num_bytes_downloaded
418+
if callback is not None:
419+
callback(current - previous)
420+
previous = current
421+
422+
await self._retry(_limited_write)
417423

418424
def client(self,
419425
name: Literal['data', 'orders', 'subscriptions'],

planet/models.py

Lines changed: 31 additions & 124 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,11 @@
1414
# limitations under the License.
1515
"""Manage data for requests and responses."""
1616
import logging
17-
import mimetypes
18-
from pathlib import Path
19-
import random
2017
import re
21-
import string
2218
from typing import AsyncGenerator, Callable, List, Optional
2319
from urllib.parse import urlparse
2420

2521
import httpx
26-
from tqdm.asyncio import tqdm
2722

2823
from .exceptions import PagingError
2924

@@ -49,134 +44,59 @@ def status_code(self) -> int:
4944
"""HTTP status code"""
5045
return self._http_response.status_code
5146

52-
def json(self) -> dict:
53-
"""Response json"""
54-
return self._http_response.json()
55-
56-
57-
class StreamingResponse(Response):
58-
59-
@property
60-
def headers(self) -> httpx.Headers:
61-
return self._http_response.headers
62-
6347
@property
64-
def url(self) -> str:
65-
return str(self._http_response.url)
48+
def filename(self) -> Optional[str]:
49+
"""Name of the download file.
6650
67-
@property
68-
def num_bytes_downloaded(self) -> int:
69-
return self._http_response.num_bytes_downloaded
51+
The filename is None if the response does not represent a download.
52+
"""
53+
filename = None
7054

71-
async def aiter_bytes(self):
72-
async for c in self._http_response.aiter_bytes():
73-
yield c
55+
if self.length is not None: # is a download file
56+
filename = _get_filename_from_response(self._http_response)
7457

75-
async def aclose(self):
76-
await self._http_response.aclose()
58+
return filename
7759

60+
@property
61+
def length(self) -> Optional[int]:
62+
"""Length of the download file.
7863
79-
class StreamingBody:
80-
"""A representation of a streaming resource from the API."""
64+
The length is None if the response does not represent a download.
65+
"""
66+
LOGGER.warning('here')
67+
try:
68+
length = int(self._http_response.headers["Content-Length"])
69+
except KeyError:
70+
length = None
71+
LOGGER.warning(length)
72+
return length
8173

82-
def __init__(self, response: StreamingResponse):
83-
"""Initialize the object.
74+
def json(self) -> dict:
75+
"""Response json"""
76+
return self._http_response.json()
8477

85-
Parameters:
86-
response: Response that was received from the server.
87-
"""
88-
self._response = response
8978

90-
@property
91-
def name(self) -> str:
92-
"""The name of this resource.
79+
def _get_filename_from_response(response) -> Optional[str]:
80+
"""The name of the response resource.
9381
9482
The default is to use the content-disposition header value from the
9583
response. If not found, falls back to resolving the name from the url
9684
and, failing that, returns None (no random name is generated).
9785
"""
98-
name = (_get_filename_from_headers(self._response.headers)
99-
or _get_filename_from_url(self._response.url)
100-
or _get_random_filename(
101-
self._response.headers.get('content-type')))
102-
return name
103-
104-
@property
105-
def size(self) -> int:
106-
"""The size of the body."""
107-
return int(self._response.headers['Content-Length'])
108-
109-
async def write(self,
110-
filename: Path,
111-
overwrite: bool = True,
112-
progress_bar: bool = True):
113-
"""Write the body to a file.
114-
Parameters:
115-
filename: Name to assign to downloaded file.
116-
overwrite: Overwrite any existing files.
117-
progress_bar: Show progress bar during download.
118-
"""
119-
120-
class _LOG:
121-
122-
def __init__(self, total, unit, filename, disable):
123-
self.total = total
124-
self.unit = unit
125-
self.disable = disable
126-
self.previous = 0
127-
self.filename = str(filename)
128-
129-
if not self.disable:
130-
LOGGER.debug(f'writing to {self.filename}')
131-
132-
def update(self, new):
133-
if new - self.previous > self.unit and not self.disable:
134-
# LOGGER.debug(f'{new-self.previous}')
135-
perc = int(100 * new / self.total)
136-
LOGGER.debug(f'{self.filename}: '
137-
f'wrote {perc}% of {self.total}')
138-
self.previous = new
86+
name = (_get_filename_from_headers(response.headers)
87+
or _get_filename_from_url(str(response.url)))
88+
return name
13989

140-
unit = 1024 * 1024
14190

142-
mode = 'wb' if overwrite else 'xb'
143-
try:
144-
with open(filename, mode) as fp:
145-
_log = _LOG(self.size,
146-
16 * unit,
147-
filename,
148-
disable=progress_bar)
149-
with tqdm(total=self.size,
150-
unit_scale=True,
151-
unit_divisor=unit,
152-
unit='B',
153-
desc=str(filename),
154-
disable=not progress_bar) as progress:
155-
previous = self._response.num_bytes_downloaded
156-
async for chunk in self._response.aiter_bytes():
157-
fp.write(chunk)
158-
new = self._response.num_bytes_downloaded
159-
_log.update(new)
160-
progress.update(new - previous)
161-
previous = new
162-
except FileExistsError:
163-
LOGGER.info(f'File {filename} exists, not overwriting')
164-
165-
166-
def _get_filename_from_headers(headers):
167-
"""Get a filename from the Content-Disposition header, if available.
168-
169-
:param headers dict: a ``dict`` of response headers
170-
:returns: a filename (i.e. ``basename``)
171-
:rtype: str or None
172-
"""
91+
def _get_filename_from_headers(headers: httpx.Headers) -> Optional[str]:
92+
"""Get a filename from the Content-Disposition header, if available."""
17393
cd = headers.get('content-disposition', '')
17494
match = re.search('filename="?([^"]+)"?', cd)
17595
return match.group(1) if match else None
17696

17797

17898
def _get_filename_from_url(url: str) -> Optional[str]:
179-
"""Get a filename from a url.
99+
"""Get a filename from the url.
180100
181101
Getting a name for Landsat imagery uses this function.
182102
"""
@@ -185,19 +105,6 @@ def _get_filename_from_url(url: str) -> Optional[str]:
185105
return name or None
186106

187107

188-
def _get_random_filename(content_type=None):
189-
"""Get a pseudo-random, Planet-looking filename.
190-
191-
:returns: a filename (i.e. ``basename``)
192-
:rtype: str
193-
"""
194-
extension = mimetypes.guess_extension(content_type or '') or ''
195-
characters = string.ascii_letters + '0123456789'
196-
letters = ''.join(random.sample(characters, 8))
197-
name = 'planet-{}{}'.format(letters, extension)
198-
return name
199-
200-
201108
class Paged:
202109
"""Asynchronous iterator over results in a paged resource.
203110

0 commit comments

Comments
 (0)