Skip to content

feat: source_uuid to multiscan #147

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
### Added

- New `GGClient.scan_and_create_incidents()` function that scans content for secrets and automatically creates incidents for any findings.
68 changes: 68 additions & 0 deletions pygitguardian/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from io import BytesIO
from pathlib import Path
from typing import Any, Dict, List, Optional, Union, cast
from uuid import UUID

import requests
from requests import Response, Session, codes
Expand All @@ -30,6 +31,7 @@
Detail,
Document,
DocumentSchema,
DocumentsForIncidentCreation,
HealthCheckResponse,
HoneytokenResponse,
HoneytokenWithContextResponse,
Expand Down Expand Up @@ -510,6 +512,7 @@ def multi_content_scan(
else:
raise TypeError("each document must be a dict")

# Validate documents using DocumentSchema
for document in request_obj:
DocumentSchema.validate_size(
document, self.secret_scan_preferences.maximum_document_size
Expand Down Expand Up @@ -538,6 +541,71 @@ def multi_content_scan(

return obj

def scan_and_create_incidents(
self,
documents: List[Dict[str, str]],
source_uuid: UUID,
extra_headers: Optional[Dict[str, str]] = None,
params: Optional[Dict[str, Any]] = None,
) -> Union[Detail, MultiScanResult]:
Comment on lines +544 to +550
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need that params argument? I think it would be better to have properly typed optional arguments than a pass-through like this.

Also, can you mark optional arguments as keyword-only by adding a * before extra_headers so that we can safely reorder them if need be?

"""
scan_and_create_incidents handles the /scan/create-incidents endpoint of the API.

The documents are scanned for secrets and sent together with `source_uuid`
in a single POST request.

If documents contain `0` bytes, they will be replaced with the ASCII substitute
character.

:param documents: List of dictionaries containing the keys document
and, optionally, filename.
example: [{"document":"example content","filename":"intro.py"}]
:param source_uuid: the source UUID that will be used to identify the custom source, for which
incidents will be created
:param extra_headers: additional headers to add to the request
:param params: additional query parameters to add to the request
:return: Detail or MultiScanResult response and status code
:raises ValueError: if more documents are submitted than
`secret_scan_preferences.maximum_documents_per_scan` allows
:raises TypeError: if any element of `documents` is not a dict
:raises KeyError: if a document dict has no "document" key
"""
# Reject oversized batches locally before building any request object.
max_documents = self.secret_scan_preferences.maximum_documents_per_scan
if len(documents) > max_documents:
raise ValueError(
f"too many documents submitted for scan (max={max_documents})"
)

if all(isinstance(doc, dict) for doc in documents):
# Create Document objects directly from the input dictionaries
# NOTE(review): the documents are built with Document(...) rather than
# loaded through a schema here; confirm that the 0-byte replacement
# promised in the docstring still happens on this path.
document_objects = [
Document(document=doc["document"], filename=doc.get("filename"))
for doc in documents
]
else:
raise TypeError("each document must be a dict")

# Validate documents using DocumentSchema
# validate_size is handed a plain dict, so each Document is unpacked again.
for document in document_objects:
DocumentSchema.validate_size(
{"document": document.document, "filename": document.filename},
self.secret_scan_preferences.maximum_document_size,
)

payload = DocumentsForIncidentCreation(
documents=document_objects, source_uuid=source_uuid
)

# The payload is serialized through its own SCHEMA before being posted.
resp = self.post(
endpoint="scan/create-incidents",
data=payload.SCHEMA.dump(payload),
extra_headers=extra_headers,
params=params,
)

obj: Union[Detail, MultiScanResult]
if is_ok(resp):
# The response JSON is wrapped under "scan_results" for MultiScanResult.
obj = MultiScanResult.from_dict({"scan_results": resp.json()})
else:
obj = load_detail(resp)

# Both result types carry the HTTP status code of the response.
obj.status_code = resp.status_code

return obj

def retrieve_secret_incident(
self, incident_id: int, with_occurrences: int = 20
) -> Union[Detail, SecretIncident]:
Expand Down
38 changes: 38 additions & 0 deletions pygitguardian/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,43 @@ def __repr__(self) -> str:
return f"filename:{self.filename}, document:{self.document}"


class DocumentsForIncidentCreationSchema(BaseSchema):
    """
    Schema for a list of documents accompanied by a source UUID.

    On dump, the `filename` key of every document is renamed to
    `document_identifier`.
    """

    documents = fields.List(fields.Nested(DocumentSchema), required=True)
    source_uuid = fields.UUID(required=True)

    @post_dump
    def transform_filename_to_document_identifier(
        self, data: Dict[str, Any], **kwargs: Any
    ) -> Dict[str, Any]:
        """Rename `filename` to `document_identifier` in each dumped document."""
        # Nothing to rename when the dump produced no documents entry.
        if "documents" not in data:
            return data

        for entry in data["documents"]:
            if "filename" in entry:
                entry["document_identifier"] = entry.pop("filename")
        return data


class DocumentsForIncidentCreation(Base):
    """
    Request object that bundles the documents to scan with the UUID of the
    source for which incidents are created.

    Attributes:
        documents (List[Document]): list of documents to scan
        source_uuid (UUID): UUID identifying the source
    """

    SCHEMA = DocumentsForIncidentCreationSchema()

    def __init__(self, documents: List[Document], source_uuid: UUID, **kwargs: Any):
        # Extra keyword arguments are accepted but not stored.
        super().__init__()
        self.documents = documents
        self.source_uuid = source_uuid

    def __repr__(self) -> str:
        # Only the number of documents is shown, not their content.
        document_count = len(self.documents)
        return f"documents:{document_count}, source_uuid:{self.source_uuid}"
Comment on lines +99 to +133
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can avoid adding these classes: scan_and_create_incidents() can do something like this:

payload = {
    "source_uuid": source_uuid,
    "documents": [{
        "document_identifier": x["filename"],
        "document": x["document"],
    } for x in documents]
}



class DetailSchema(BaseSchema):
detail = fields.String(required=True)

Expand Down Expand Up @@ -757,6 +794,7 @@ class TokenScope(str, Enum):
CUSTOM_TAGS_READ = "custom_tags:read"
CUSTOM_TAGS_WRITE = "custom_tags:write"
SECRET_READ = "secrets:read"
SCAN_CREATE_INCIDENTS = "scan:create-incidents"


class APITokensResponseSchema(BaseSchema):
Expand Down
88 changes: 88 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,94 @@ def test_multiscan_parameters(client: GGClient, ignore_known_secrets, all_secret
assert mock_response.call_count == 1


@responses.activate
@pytest.mark.parametrize("params", (None, {"param1": "value1"}))
def test_scan_and_create_incidents_parameters(client: GGClient, params):
    """
    GIVEN a ggclient
    WHEN scan_and_create_incidents is called with optional query parameters
    THEN those parameters are sent in the request query string
    """

    # Without params the matcher must see an empty query string.
    expected_query = {} if params is None else dict(params)

    policy_break = {
        "type": "break",
        "detector_name": "break",
        "detector_group_name": "break",
        "documentation_url": None,
        "policy": "mypol",
        "matches": [
            {
                "match": "hello",
                "type": "hello",
            }
        ],
    }
    scan_result = {
        "policy_break_count": 1,
        "policies": ["pol"],
        "policy_breaks": [policy_break],
    }

    mock_response = responses.post(
        url=client._url_from_endpoint("scan/create-incidents", "v1"),
        status=200,
        match=[matchers.query_param_matcher(expected_query)],
        json=[scan_result],
    )

    client.scan_and_create_incidents(
        [{"filename": FILENAME, "document": DOCUMENT}],
        source_uuid="123e4567-e89b-12d3-a456-426614174000",
        params=params,
    )

    assert mock_response.call_count == 1


@responses.activate
def test_scan_and_create_incidents_payload_structure(client: GGClient):
    """
    GIVEN a ggclient
    WHEN scan_and_create_incidents is called with one document and a source UUID
    THEN the request body holds the documents list and the source_uuid,
    with the filename sent as document_identifier
    """

    source_uuid = "123e4567-e89b-12d3-a456-426614174000"
    documents = [{"filename": FILENAME, "document": DOCUMENT}]

    # The mocked endpoint only answers when the JSON body matches exactly.
    body_matcher = matchers.json_params_matcher(
        {
            "documents": [
                {
                    "document": DOCUMENT,
                    "document_identifier": FILENAME,
                }
            ],
            "source_uuid": source_uuid,
        }
    )
    empty_scan_result = {
        "policy_break_count": 0,
        "policies": ["pol"],
        "policy_breaks": [],
    }

    mock_response = responses.post(
        url=client._url_from_endpoint("scan/create-incidents", "v1"),
        status=200,
        match=[body_matcher],
        json=[empty_scan_result],
    )

    client.scan_and_create_incidents(documents, source_uuid)

    assert mock_response.call_count == 1


@responses.activate
def test_retrieve_secret_incident(client: GGClient):
"""
Expand Down
Loading