diff --git a/changelog.d/20250604_143935_alina.tuholukova_add_source_id_parameter_for_multiscan.md b/changelog.d/20250604_143935_alina.tuholukova_add_source_id_parameter_for_multiscan.md
new file mode 100644
index 00000000..e51e9080
--- /dev/null
+++ b/changelog.d/20250604_143935_alina.tuholukova_add_source_id_parameter_for_multiscan.md
@@ -0,0 +1,3 @@
+### Added
+
+- New `GGClient.scan_and_create_incidents()` function that scans content for secrets and automatically creates incidents for any findings.
diff --git a/pygitguardian/client.py b/pygitguardian/client.py
index 7ccdabde..a9a6e322 100644
--- a/pygitguardian/client.py
+++ b/pygitguardian/client.py
@@ -8,6 +8,7 @@ from io import BytesIO
 from pathlib import Path
 from typing import Any, Dict, List, Optional, Union, cast
+from uuid import UUID
 
 import requests
 from requests import Response, Session, codes
 
@@ -30,6 +31,7 @@
     Detail,
     Document,
     DocumentSchema,
+    DocumentsForIncidentCreation,
     HealthCheckResponse,
     HoneytokenResponse,
     HoneytokenWithContextResponse,
@@ -510,6 +512,7 @@ def multi_content_scan(
         else:
             raise TypeError("each document must be a dict")
 
+        # Check every document against the maximum document size
         for document in request_obj:
             DocumentSchema.validate_size(
                 document, self.secret_scan_preferences.maximum_document_size
@@ -538,6 +541,71 @@ def multi_content_scan(
 
         return obj
 
+    def scan_and_create_incidents(
+        self,
+        documents: List[Dict[str, str]],
+        source_uuid: UUID,
+        extra_headers: Optional[Dict[str, str]] = None,
+        params: Optional[Dict[str, Any]] = None,
+    ) -> Union[Detail, MultiScanResult]:
+        """
+        scan_and_create_incidents handles the /scan/create-incidents endpoint of the API.
+
+        If documents contain `0` bytes, they will be replaced with the ASCII substitute
+        character.
+
+        :param documents: List of dictionaries containing the keys document
+            and, optionally, filename.
+            example: [{"document":"example content","filename":"intro.py"}]
+        :param source_uuid: the source UUID that will be used to identify the custom source, for which
+            incidents will be created
+        :param extra_headers: additional headers to add to the request
+        :param params: additional parameters to add to the request
+        :return: Detail or MultiScanResult response and status code
+        """
+        max_documents = self.secret_scan_preferences.maximum_documents_per_scan
+        if len(documents) > max_documents:
+            raise ValueError(
+                f"too many documents submitted for scan (max={max_documents})"
+            )
+
+        if all(isinstance(doc, dict) for doc in documents):
+            # Create Document objects directly from the input dictionaries
+            document_objects = [
+                Document(document=doc["document"], filename=doc.get("filename"))
+                for doc in documents
+            ]
+        else:
+            raise TypeError("each document must be a dict")
+
+        # Check every document against the maximum document size
+        for document in document_objects:
+            DocumentSchema.validate_size(
+                {"document": document.document, "filename": document.filename},
+                self.secret_scan_preferences.maximum_document_size,
+            )
+
+        payload = DocumentsForIncidentCreation(
+            documents=document_objects, source_uuid=source_uuid
+        )
+
+        resp = self.post(
+            endpoint="scan/create-incidents",
+            data=payload.SCHEMA.dump(payload),
+            extra_headers=extra_headers,
+            params=params,
+        )
+
+        obj: Union[Detail, MultiScanResult]
+        if is_ok(resp):
+            obj = MultiScanResult.from_dict({"scan_results": resp.json()})
+        else:
+            obj = load_detail(resp)
+
+        obj.status_code = resp.status_code
+
+        return obj
+
     def retrieve_secret_incident(
         self, incident_id: int, with_occurrences: int = 20
     ) -> Union[Detail, SecretIncident]:
diff --git a/pygitguardian/models.py b/pygitguardian/models.py
index 0babee2c..092574c6 100644
--- a/pygitguardian/models.py
+++ b/pygitguardian/models.py
@@ -96,6 +96,43 @@ def __repr__(self) -> str:
         return f"filename:{self.filename}, document:{self.document}"
 
 
+class DocumentsForIncidentCreationSchema(BaseSchema):
+    documents = fields.List(fields.Nested(DocumentSchema), required=True)
+    source_uuid = fields.UUID(required=True)
+
+    @post_dump
+    def transform_filename_to_document_identifier(
+        self, data: Dict[str, Any], **kwargs: Any
+    ) -> Dict[str, Any]:
+        """Transform filename field to document_identifier in the documents list"""
+        if "documents" in data:
+            for document in data["documents"]:
+                if "filename" in document:
+                    document["document_identifier"] = document.pop("filename")
+        return data
+
+
+class DocumentsForIncidentCreation(Base):
+    """
+    DocumentsForIncidentCreation is a request object for communicating a list of documents
+    along with a source UUID to the API for incident creation
+
+    Attributes:
+        documents (List[Document]): list of documents to scan
+        source_uuid (UUID): UUID identifying the source
+    """
+
+    SCHEMA = DocumentsForIncidentCreationSchema()
+
+    def __init__(self, documents: List[Document], source_uuid: UUID, **kwargs: Any):
+        super().__init__()
+        self.documents = documents
+        self.source_uuid = source_uuid
+
+    def __repr__(self) -> str:
+        return f"documents:{len(self.documents)}, source_uuid:{self.source_uuid}"
+
+
 class DetailSchema(BaseSchema):
     detail = fields.String(required=True)
 
@@ -757,6 +794,7 @@ class TokenScope(str, Enum):
     CUSTOM_TAGS_READ = "custom_tags:read"
     CUSTOM_TAGS_WRITE = "custom_tags:write"
     SECRET_READ = "secrets:read"
+    SCAN_CREATE_INCIDENTS = "scan:create-incidents"
 
 
 class APITokensResponseSchema(BaseSchema):
diff --git a/tests/test_client.py b/tests/test_client.py
index ca006a81..46b0e95e 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -606,6 +606,94 @@ def test_multiscan_parameters(client: GGClient, ignore_known_secrets, all_secret
     assert mock_response.call_count == 1
 
 
+@responses.activate
+@pytest.mark.parametrize("params", (None, {"param1": "value1"}))
+def test_scan_and_create_incidents_parameters(client: GGClient, params):
+    """
+    GIVEN a ggclient
+    WHEN calling scan_and_create_incidents with parameters
+    THEN the parameters are passed in the request
+    """
+
+    to_match = {}
+    if params is not None:
+        to_match.update(params)
+
+    mock_response = responses.post(
+        url=client._url_from_endpoint("scan/create-incidents", "v1"),
+        status=200,
+        match=[matchers.query_param_matcher(to_match)],
+        json=[
+            {
+                "policy_break_count": 1,
+                "policies": ["pol"],
+                "policy_breaks": [
+                    {
+                        "type": "break",
+                        "detector_name": "break",
+                        "detector_group_name": "break",
+                        "documentation_url": None,
+                        "policy": "mypol",
+                        "matches": [
+                            {
+                                "match": "hello",
+                                "type": "hello",
+                            }
+                        ],
+                    }
+                ],
+            }
+        ],
+    )
+
+    client.scan_and_create_incidents(
+        [{"filename": FILENAME, "document": DOCUMENT}],
+        source_uuid="123e4567-e89b-12d3-a456-426614174000",
+        params=params,
+    )
+
+    assert mock_response.call_count == 1
+
+
+@responses.activate
+def test_scan_and_create_incidents_payload_structure(client: GGClient):
+    """
+    GIVEN a ggclient
+    WHEN calling scan_and_create_incidents
+    THEN the payload is structured correctly with documents and source_uuid
+    """
+
+    documents = [{"filename": FILENAME, "document": DOCUMENT}]
+    source_uuid = "123e4567-e89b-12d3-a456-426614174000"
+
+    expected_payload = {
+        "documents": [
+            {
+                "document": DOCUMENT,
+                "document_identifier": FILENAME,
+            }
+        ],
+        "source_uuid": source_uuid,
+    }
+
+    mock_response = responses.post(
+        url=client._url_from_endpoint("scan/create-incidents", "v1"),
+        status=200,
+        match=[matchers.json_params_matcher(expected_payload)],
+        json=[
+            {
+                "policy_break_count": 0,
+                "policies": ["pol"],
+                "policy_breaks": [],
+            }
+        ],
+    )
+
+    client.scan_and_create_incidents(documents, source_uuid)
+
+    assert mock_response.call_count == 1
+
+
 @responses.activate
 def test_retrieve_secret_incident(client: GGClient):
     """