|
140 | 140 | Protocol,
|
141 | 141 | )
|
142 | 142 | from enum import Enum
|
| 143 | +from collections import defaultdict |
143 | 144 | from difflib import SequenceMatcher
|
144 | 145 | from http.client import HTTPSConnection, HTTPResponse
|
145 | 146 | from dataclasses import dataclass
|
@@ -757,62 +758,116 @@ def get_team_members(self, org: str, team: Team) -> Iterable[TeamMember]:
|
757 | 758 | team_name=team.name,
|
758 | 759 | )
|
759 | 760 |
|
760 |
| - def get_repository_teams( |
761 |
| - self, org: str, repo: str |
762 |
| - ) -> Iterable[TeamRepositoryAccess]: |
763 |
| - teams = self._http_get_json_paginated(f"/repos/{org}/{repo}/teams") |
764 |
| - for team in teams: |
765 |
| - permissions: Dict[str, bool] = team["permissions"] |
766 |
| - yield TeamRepositoryAccess( |
767 |
| - team_name=team["name"], |
768 |
| - role=RepositoryAccessRole.from_permissions_dict(permissions), |
769 |
| - ) |
| 761 | + def get_organization_repo_to_teams_map(self, org: str) -> dict[str, [TeamRepositoryaccess]]: |
| 762 | + query = """ |
| 763 | + query($org: String!) { |
| 764 | + organization(login: $org) { |
| 765 | + teams(first: 100) { |
| 766 | + nodes { |
| 767 | + name |
| 768 | + repositories { |
| 769 | + edges { |
| 770 | + permission |
| 771 | + node { |
| 772 | + databaseId |
| 773 | + } |
| 774 | + } |
| 775 | + pageInfo { |
| 776 | + hasNextPage |
| 777 | + } |
| 778 | + totalCount |
| 779 | + } |
| 780 | + } |
| 781 | + pageInfo { |
| 782 | + hasNextPage |
| 783 | + } |
| 784 | + } |
| 785 | + } |
| 786 | + } |
| 787 | + """ |
| 788 | + variables = { "org": org } |
| 789 | + response = self._http_graphql(query, variables) |
770 | 790 |
|
771 |
| - def get_repository_users( |
772 |
| - self, org: str, repo: str |
773 |
| - ) -> Iterable[UserRepositoryAccess]: |
774 |
| - # We query with affiliation=direct to get all users that have explicit |
775 |
| - # access to the repository (i.e. not those who have implicit access |
776 |
| - # through being a member of a group). The default is affiliation=all, |
777 |
| - # which also returns users with implicit access. |
778 |
| - users = self._http_get_json_paginated(f"/repos/{org}/{repo}/collaborators?affiliation=direct") |
779 |
| - for user in users: |
780 |
| - permissions: Dict[str, bool] = user["permissions"] |
781 |
| - yield UserRepositoryAccess( |
782 |
| - user_id=user["id"], |
783 |
| - user_name=user["login"], |
784 |
| - role=RepositoryAccessRole.from_permissions_dict(permissions), |
785 |
| - ) |
| 791 | + teams = response['organization']['teams'] |
| 792 | + # Assume we have less than 100 teams and skip pagination |
| 793 | + assert(teams['pageInfo']['hasNextPage'] == False) |
| 794 | + |
| 795 | + repo_to_teams: defaultdict[str, [TeamRepositoryaccess]] = defaultdict(list) |
| 796 | + |
| 797 | + #TODO pagination |
| 798 | + for team in teams['nodes']: |
| 799 | + for repo in team['repositories']['edges']: |
| 800 | + repo_to_teams[repo['node']['databaseId']].append(TeamRepositoryAccess( |
| 801 | + team_name=team['name'], |
| 802 | + role=RepositoryAccessRole(repo['permission'].lower()) |
| 803 | + )) |
| 804 | + |
| 805 | + return dict(repo_to_teams) |
786 | 806 |
|
787 | 807 | def get_organization_repositories(self, org: str) -> Iterable[Repository]:
|
788 |
| - # Listing repositories is a slow endpoint, and paginated as well, print |
789 |
| - # some progress. Technically from the pagination headers we could |
790 |
| - # extract more precise progress, but I am not going to bother. |
791 |
| - print_status_stderr("[1 / ??] Listing organization repositories") |
792 |
| - repos = [] |
793 |
| - for i, more_repos in enumerate( |
794 |
| - self._http_get_json_paginated(f"/orgs/{org}/repos?per_page=100") |
795 |
| - ): |
796 |
| - repos.append(more_repos) |
797 |
| - print_status_stderr( |
798 |
| - f"[{len(repos)} / ??] Listing organization repositories" |
799 |
| - ) |
800 |
| - # Materialize to a list so we know the total so we can show a progress |
801 |
| - # counter. |
802 |
| - n = len(repos) |
803 |
| - for i, repo in enumerate(repos): |
804 |
| - name = repo["name"] |
805 |
| - print_status_stderr(f"[{i + 1} / {n}] Getting access on {name}") |
806 |
| - user_access = tuple(sorted(self.get_repository_users(org, name))) |
807 |
| - team_access = tuple(sorted(self.get_repository_teams(org, name))) |
| 808 | + query = """ |
| 809 | + query($org: String!) { |
| 810 | + organization(login: $org) { |
| 811 | + repositories(first:100) { |
| 812 | + nodes { |
| 813 | + databaseId |
| 814 | + name |
| 815 | + visibility |
| 816 | + # We query with affiliation=direct to get all users that have explicit |
| 817 | + # access to the repository (i.e. not those who have implicit access |
| 818 | + # through being a member of a group). The default is affiliation=all, |
| 819 | + # which also returns users with implicit access. |
| 820 | + collaborators(affiliation: DIRECT, first: 100) { |
| 821 | + edges { |
| 822 | + node { |
| 823 | + databaseId |
| 824 | + login |
| 825 | + } |
| 826 | + permission |
| 827 | + } |
| 828 | + pageInfo { |
| 829 | + hasNextPage |
| 830 | + } |
| 831 | + } |
| 832 | + } |
| 833 | + pageInfo { |
| 834 | + hasNextPage |
| 835 | + endCursor |
| 836 | + } |
| 837 | + totalCount |
| 838 | + } |
| 839 | + } |
| 840 | + } |
| 841 | + """ |
| 842 | + variables = { "org": org } |
| 843 | + response = self._http_graphql(query, variables) |
| 844 | + |
| 845 | + repo_to_teams = self.get_organization_repo_to_teams_map(org) |
| 846 | + |
| 847 | + repos = response['organization']['repositories'] |
| 848 | + # TODO: handle pagination |
| 849 | + |
| 850 | + for repo in repos['nodes']: |
| 851 | + repo_id = repo['databaseId'] |
| 852 | + |
| 853 | + collaborators = repo['collaborators'] |
| 854 | + # Assume we have less than 100 directs collaborators to any repo and skip pagination |
| 855 | + assert(collaborators['pageInfo']['hasNextPage'] == False) |
| 856 | + user_access = tuple(sorted(UserRepositoryAccess( |
| 857 | + user_id=collaborator['node']['databaseId'], |
| 858 | + user_name=collaborator['node']['login'], |
| 859 | + role=RepositoryAccessRole(collaborator['permission'].lower()), |
| 860 | + ) for collaborator in collaborators['edges'])) |
| 861 | + |
| 862 | + team_access = tuple(sorted(repo_to_teams[repo_id])) |
| 863 | + |
808 | 864 | yield Repository(
|
809 |
| - repo_id=repo["id"], |
810 |
| - name=name, |
811 |
| - visibility=RepositoryVisibility(repo["visibility"]), |
| 865 | + repo_id=repo_id, |
| 866 | + name=repo['name'], |
| 867 | + visibility=RepositoryVisibility(repo["visibility"].lower()), |
812 | 868 | user_access=user_access,
|
813 | 869 | team_access=team_access,
|
814 | 870 | )
|
815 |
| - print_status_stderr("") |
816 | 871 |
|
817 | 872 |
|
818 | 873 | def print_indented(lines: str) -> None:
|
|
0 commit comments