-
Notifications
You must be signed in to change notification settings - Fork 5.9k
/
Copy pathgitlab_service.py
192 lines (161 loc) · 6.04 KB
/
gitlab_service.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
import os
from typing import Any
from urllib.parse import quote_plus
import httpx
from pydantic import SecretStr
from openhands.integrations.service_types import (
AuthenticationError,
GitService,
ProviderType,
Repository,
UnknownException,
User,
)
from openhands.utils.import_utils import get_impl
class GitLabService(GitService):
    """GitService implementation that talks to the GitLab REST API (v4).

    Every request is authenticated with a Bearer token. When ``refresh`` is
    true, a 401 response triggers one token refresh followed by a single
    retry of the request.
    """

    BASE_URL = 'https://gitlab.com/api/v4'
    # Token sent in the Authorization header; empty until one is supplied.
    token: SecretStr = SecretStr('')
    # Whether a 401 response should trigger a token refresh and one retry.
    refresh = False

    def __init__(
        self,
        user_id: str | None = None,
        external_auth_id: str | None = None,
        external_auth_token: SecretStr | None = None,
        token: SecretStr | None = None,
        external_token_manager: bool = False,
    ):
        """Store the user id, the token-manager flag and, if given, the token.

        ``external_auth_id`` and ``external_auth_token`` are accepted but not
        used by this implementation.
        """
        self.user_id = user_id
        self.external_token_manager = external_token_manager

        if token:
            self.token = token

    async def _refresh_token(self) -> None:
        """Store the token returned by ``get_latest_token`` when there is one."""
        latest_token = await self.get_latest_token()
        # get_latest_token may return None; keep the current token in that
        # case so that building the headers never dereferences None.
        if latest_token:
            self.token = latest_token

    async def _get_gitlab_headers(self) -> dict:
        """Build the authorization headers for a GitLab API request.

        When a user id is set but no token is held yet, the token is first
        obtained through ``get_latest_token``.
        """
        if self.user_id and not self.token:
            await self._refresh_token()

        return {
            'Authorization': f'Bearer {self.token.get_secret_value()}',
        }

    def _has_token_expired(self, status_code: int) -> bool:
        """Return True when the HTTP status code is 401."""
        return status_code == 401

    async def get_latest_token(self) -> SecretStr | None:
        """Return the token to use for requests; here, the one already held."""
        return self.token

    async def _fetch_data(
        self, url: str, params: dict | None = None
    ) -> tuple[Any, dict]:
        """Issue an authenticated GET request and return its decoded body.

        Args:
            url: Absolute URL to request.
            params: Optional query parameters.

        Returns:
            A tuple of the decoded JSON body and a dict holding the response's
            ``Link`` header when the response has one (empty dict otherwise).

        Raises:
            AuthenticationError: The final response status was 401.
            UnknownException: Any other httpx HTTP error occurred.
        """
        try:
            async with httpx.AsyncClient() as client:
                gitlab_headers = await self._get_gitlab_headers()
                response = await client.get(url, headers=gitlab_headers, params=params)

                if self.refresh and self._has_token_expired(response.status_code):
                    # Store the refreshed token before rebuilding the headers;
                    # otherwise the retry would resend the rejected token.
                    await self._refresh_token()
                    gitlab_headers = await self._get_gitlab_headers()
                    response = await client.get(
                        url, headers=gitlab_headers, params=params
                    )

                response.raise_for_status()

                # Only the pagination header is passed on to callers.
                headers = {}
                if 'Link' in response.headers:
                    headers['Link'] = response.headers['Link']

                return response.json(), headers

        except httpx.HTTPStatusError as e:
            if e.response.status_code == 401:
                raise AuthenticationError('Invalid GitLab token') from e
            raise UnknownException('Unknown error') from e
        except httpx.HTTPError as e:
            raise UnknownException('Unknown error') from e

    @staticmethod
    def _to_repository(repo: dict) -> Repository:
        """Convert one GitLab project payload into a ``Repository``."""
        return Repository(
            id=repo.get('id'),
            full_name=repo.get('path_with_namespace'),
            stargazers_count=repo.get('star_count'),
            git_provider=ProviderType.GITLAB,
        )

    async def get_user(self) -> User:
        """Fetch the authenticated user from the ``/user`` endpoint.

        ``login`` is filled from the ``username`` field and ``company`` from
        the ``organization`` field of the response.
        """
        url = f'{self.BASE_URL}/user'
        response, _ = await self._fetch_data(url)

        return User(
            id=response.get('id'),
            username=response.get('username'),
            avatar_url=response.get('avatar_url'),
            name=response.get('name'),
            email=response.get('email'),
            company=response.get('organization'),
            login=response.get('username'),
        )

    async def search_repositories(
        self, query: str, per_page: int = 30, sort: str = 'updated', order: str = 'desc'
    ) -> list[Repository]:
        """Search public projects matching ``query``.

        Args:
            query: Search string sent as the ``search`` parameter.
            per_page: Number of results requested.
            sort: Accepted but currently unused; results are always ordered
                by ``last_activity_at``.
            order: Sort direction, sent as the ``sort`` request parameter.

        Returns:
            The matching projects as ``Repository`` objects.
        """
        url = f'{self.BASE_URL}/projects'
        params = {
            'search': query,
            'per_page': per_page,
            'order_by': 'last_activity_at',
            'sort': order,
            'visibility': 'public',
        }

        response, _ = await self._fetch_data(url, params)
        return [self._to_repository(repo) for repo in response]

    async def get_repositories(
        self, sort: str, installation_id: int | None
    ) -> list[Repository]:
        """List the projects returned for the ``owned`` and ``membership`` filters.

        Pages through ``/projects`` until a page is empty, the response has no
        ``rel="next"`` link, or 1000 projects have been collected.

        Args:
            sort: GitHub-style sort key; unknown values fall back to
                ``last_activity_at``.
            installation_id: Currently ignored (the installation-token case
                is not implemented).

        Returns:
            At most 1000 projects as ``Repository`` objects.
        """
        MAX_REPOS = 1000
        PER_PAGE = 100  # Maximum allowed by GitLab API
        all_repos: list[dict] = []
        page = 1

        url = f'{self.BASE_URL}/projects'
        # Map GitHub's sort values to GitLab's order_by values
        order_by = {
            'pushed': 'last_activity_at',
            'updated': 'last_activity_at',
            'created': 'created_at',
            'full_name': 'name',
        }.get(sort, 'last_activity_at')

        while len(all_repos) < MAX_REPOS:
            params = {
                'page': str(page),
                'per_page': str(PER_PAGE),
                'order_by': order_by,
                'sort': 'desc',  # GitLab uses sort for direction (asc/desc)
                'owned': 1,  # Use 1 instead of True
                'membership': 1,  # Use 1 instead of True
            }
            response, headers = await self._fetch_data(url, params)

            if not response:  # No more repositories
                break

            all_repos.extend(response)
            page += 1

            # Check if we've reached the last page
            link_header = headers.get('Link', '')
            if 'rel="next"' not in link_header:
                break

        # The last page may overshoot the cap, so trim before converting.
        return [self._to_repository(repo) for repo in all_repos[:MAX_REPOS]]

    async def does_repo_exist(self, repository: str) -> bool:
        """Return True when fetching the project identified by ``repository`` succeeds.

        The repository path is URL-encoded and used as the project identifier.
        Any failure, including an authentication error, is reported as False.
        """
        encoded_repo = quote_plus(repository)
        url = f'{self.BASE_URL}/projects/{encoded_repo}'
        try:
            await self._fetch_data(url)
            return True
        except Exception as e:
            # Deliberately best-effort: every failure means "does not exist".
            # NOTE(review): consider a logger instead of print here.
            print(e)
            return False
# Dotted path of the class to use as the GitLab service. It is read from the
# OPENHANDS_GITLAB_SERVICE_CLS environment variable and defaults to
# 'openhands.integrations.gitlab.gitlab_service.GitLabService'.
gitlab_service_cls = os.getenv(
    'OPENHANDS_GITLAB_SERVICE_CLS',
    'openhands.integrations.gitlab.gitlab_service.GitLabService',
)
# presumably get_impl resolves the dotted path to a GitLabService-compatible
# class; confirm against openhands.utils.import_utils
GitLabServiceImpl = get_impl(GitLabService, gitlab_service_cls)