-
Notifications
You must be signed in to change notification settings - Fork 5.9k
/
Copy pathhttp_session.py
63 lines (48 loc) · 2.14 KB
/
http_session.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
from dataclasses import dataclass, field
from typing import Any, ContextManager, MutableMapping

import httpx

from openhands.core.logger import openhands_logger as logger
# Single module-level httpx client; HttpSession.request and HttpSession.stream
# below send every call through it.
CLIENT = httpx.Client()
@dataclass
class HttpSession:
    """Per-session wrapper around the shared module-level httpx client.

    Each session carries its own default ``headers``, which are merged into
    every outgoing call, and remembers whether :meth:`close` has been called.

    Closing does not hard-disable the session. The first call made after
    :meth:`close` logs an error (including the caller's stack), clears the
    closed flag, and then proceeds normally; later calls do not log again
    until the session is closed again.
    """

    # Set by close(); cleared again by the first call made after closing.
    _is_closed: bool = False
    # Default headers for every call; per-call headers override same-named keys.
    headers: MutableMapping[str, str] = field(default_factory=dict)

    def _prepare_call(self, kwargs: MutableMapping[str, Any]) -> None:
        """Report use-after-close and merge session headers into ``kwargs``.

        Args:
            kwargs: Keyword arguments of the outgoing call; its ``'headers'``
                entry is replaced in place with the merged mapping.
        """
        if self._is_closed:
            logger.error(
                'Session is being used after close!', stack_info=True, exc_info=True
            )
            # Re-arm the session so the call below still goes through.
            self._is_closed = False
        # A missing, None or empty per-call value means session headers only.
        call_headers = kwargs.get('headers') or {}
        # Later unpacking wins, so per-call headers take precedence.
        kwargs['headers'] = {**self.headers, **call_headers}

    def request(self, *args: Any, **kwargs: Any) -> httpx.Response:
        """Send a request through the shared client with session headers merged.

        Args:
            *args: Positional arguments forwarded to ``CLIENT.request``.
            **kwargs: Keyword arguments forwarded to ``CLIENT.request``;
                ``headers`` is merged with the session headers first.

        Returns:
            Whatever ``CLIENT.request`` returns for the call.
        """
        self._prepare_call(kwargs)
        return CLIENT.request(*args, **kwargs)

    def stream(self, *args: Any, **kwargs: Any) -> ContextManager[httpx.Response]:
        """Open a streaming request through the shared client.

        Args:
            *args: Positional arguments forwarded to ``CLIENT.stream``.
            **kwargs: Keyword arguments forwarded to ``CLIENT.stream``;
                ``headers`` is merged with the session headers first.

        Returns:
            The object returned by ``CLIENT.stream``; httpx documents it as a
            context manager that yields the response, so use it in ``with``.
        """
        self._prepare_call(kwargs)
        return CLIENT.stream(*args, **kwargs)

    def get(self, *args: Any, **kwargs: Any) -> httpx.Response:
        """Shorthand for ``request('GET', ...)``."""
        return self.request('GET', *args, **kwargs)

    def post(self, *args: Any, **kwargs: Any) -> httpx.Response:
        """Shorthand for ``request('POST', ...)``."""
        return self.request('POST', *args, **kwargs)

    def patch(self, *args: Any, **kwargs: Any) -> httpx.Response:
        """Shorthand for ``request('PATCH', ...)``."""
        return self.request('PATCH', *args, **kwargs)

    def put(self, *args: Any, **kwargs: Any) -> httpx.Response:
        """Shorthand for ``request('PUT', ...)``."""
        return self.request('PUT', *args, **kwargs)

    def delete(self, *args: Any, **kwargs: Any) -> httpx.Response:
        """Shorthand for ``request('DELETE', ...)``."""
        return self.request('DELETE', *args, **kwargs)

    def options(self, *args: Any, **kwargs: Any) -> httpx.Response:
        """Shorthand for ``request('OPTIONS', ...)``."""
        return self.request('OPTIONS', *args, **kwargs)

    def close(self) -> None:
        """Mark the session as closed; the shared client itself is not touched."""
        self._is_closed = True