From c98c8e2120631d0b8c71aad8eee743bc48eadea7 Mon Sep 17 00:00:00 2001
From: Tom Conner
Date: Wed, 22 Jan 2025 21:57:38 -0500
Subject: [PATCH 1/4] match new hash pattern, log when replacing URI paths

---
 zap/src/scan.py | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/zap/src/scan.py b/zap/src/scan.py
index 5874a8f3..d7edfea1 100755
--- a/zap/src/scan.py
+++ b/zap/src/scan.py
@@ -348,7 +348,8 @@ def slack_alert_without_report( # pylint: disable=too-many-arguments
     logging.info("Alert sent to Slack channel for DefectDojo upload report")
 
 # match a hash after a hyphen or dot, and only match 8 or 9 characters of hex
-URI_HASH_REGEX = re.compile(r"[-\.][a-zA-Z0-9]{8,9}(?![a-fA-F0-9])")
+URI_HASH_REGEX1 = re.compile(r"[-\.][a-zA-Z0-9]{8,9}(?![a-fA-F0-9])")
+URI_HASH_REGEX2 = re.compile(r"index-\w{7}-\.js")  # /assets/index-4au49BA-.js
 
 def clean_uri_path(xml_report):
     """
@@ -360,7 +361,11 @@ def clean_uri_path(xml_report):
     # this should remove the hash.
     for uri in root.iter('uri'):
         r=urlparse(uri.text)
-        r=r._replace(path=URI_HASH_REGEX.sub('', r.path))
+        r_prev = r
+        r=r._replace(path=URI_HASH_REGEX1.sub('', r.path))
+        r=r._replace(path=URI_HASH_REGEX2.sub('index-hash-.js', r.path))
+        if r != r_prev:
+            logging.info("URI %s -> %s", r_prev, r)
         uri.text = urlunparse(r)
 
     tree.write(xml_report)
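For reference, a standalone sketch of what the two patterns above match (illustrative only, not part of the patch; the sample paths are invented):

    import re

    # hash after a hyphen or dot, 8 or 9 alphanumeric characters, per patch 1
    URI_HASH_REGEX1 = re.compile(r"[-\.][a-zA-Z0-9]{8,9}(?![a-fA-F0-9])")
    # bundler output like /assets/index-4au49BA-.js
    URI_HASH_REGEX2 = re.compile(r"index-\w{7}-\.js")

    for path in ["/assets/app-4f9a1c2b.js", "/assets/index-4au49BA-.js"]:
        cleaned = URI_HASH_REGEX1.sub("", path)
        cleaned = URI_HASH_REGEX2.sub("index-hash-.js", cleaned)
        print(path, "->", cleaned)

    # expected output:
    # /assets/app-4f9a1c2b.js -> /assets/app.js
    # /assets/index-4au49BA-.js -> /assets/index-hash-.js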
From 71ad647f5e67228f61719860459b16709a557c4a Mon Sep 17 00:00:00 2001
From: Tom Conner
Date: Thu, 30 Jan 2025 14:54:00 -0500
Subject: [PATCH 2/4] ignore

---
 .gitignore | 1 +
 1 file changed, 1 insertion(+)

diff --git a/.gitignore b/.gitignore
index ceaa75d3..dfbac5b6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,3 +5,4 @@ zap/.python-version
 .vscode/
 .vscode/settings.json
 .env
+cis/.python-version

From 11af1580ce5fcc977be18407abd89d7d4b5896f1 Mon Sep 17 00:00:00 2001
From: Tom Conner
Date: Wed, 5 Feb 2025 16:26:49 -0500
Subject: [PATCH 3/4] remove hash

---
 zap/src/scan.py | 333 ++++++++++++++++++++++++++++--------------------
 1 file changed, 195 insertions(+), 138 deletions(-)

diff --git a/zap/src/scan.py b/zap/src/scan.py
index ec7ae580..f1706489 100755
--- a/zap/src/scan.py
+++ b/zap/src/scan.py
@@ -31,20 +31,23 @@ def fetch_dojo_product_name(defect_dojo, defect_dojo_user, defect_dojo_key, prod
     Includes retries as dojo sometimes does not respond.
     """
     dojo = defectdojo.DefectDojoAPIv2(
-        defect_dojo, defect_dojo_key, defect_dojo_user, debug=False, timeout=120)
-    max_retries = int(getenv("MAX_RETRIES", '6'))
+        defect_dojo, defect_dojo_key, defect_dojo_user, debug=False, timeout=120
+    )
+    max_retries = int(getenv("MAX_RETRIES", "6"))
     retry_delay = 30
     for attempt in range(max_retries):
         try:
             product = dojo.get_product(product_id=product_id)
             return product.data["name"]
-        except Exception: # pylint: disable=broad-except
+        except Exception:  # pylint: disable=broad-except
             logging.info(product.message)
             sleep(retry_delay)
     raise RuntimeError("Maximum retry attempts reached")
 
 
-def upload_gcs(bucket_name: str, scan_type: ScanType, filename: str, subfoldername=None):
+def upload_gcs(
+    bucket_name: str, scan_type: ScanType, filename: str, subfoldername=None
+):
     """
     Upload scans to a GCS bucket and return the path to the file in Cloud Console.
     """
@@ -65,7 +68,7 @@ def error_slack_alert(error: str, token: str, channel: str):
     Send error to slack or make a note in the logs.
     """
     if not channel:
-        error_log = f"{ error }. No Slack alert requested."
+        error_log = f"{error}. No Slack alert requested."
         logging.warning(error_log)
     else:
         slack = SlackClient(token)
@@ -81,56 +84,74 @@ def codedx_upload(cdx: CodeDx, project: str, filename: str):
     cdx.analyze(project, filename)
 
+
 def fetch_dojo_lead_id(dojo, defect_dojo_user):
-    #Doing these as individual retries to avoid uploading the same report twice.
-    max_retries = int(getenv("MAX_RETRIES", '5'))
+    # Doing these as individual retries to avoid uploading the same report twice.
+    max_retries = int(getenv("MAX_RETRIES", "5"))
     retry_delay = 30
     for attempt in range(max_retries):
         try:
             lead_id = dojo.list_users(defect_dojo_user).data["results"][0]["id"]
             return lead_id
-        except Exception: # pylint: disable=broad-except
+        except Exception:  # pylint: disable=broad-except
             sleep(retry_delay)
     logging.error("Did not retrieve dojo user ID, upload failed.")
     raise RuntimeError("Maximum retry attempts reached for requesting lead_id")
 
-def defectdojo_upload(product_id: int, zap_filename: str, defect_dojo_key: str, defect_dojo_user: str, defect_dojo: str): # pylint: disable=line-too-long
+
+def defectdojo_upload(
+    product_id: int,
+    zap_filename: str,
+    defect_dojo_key: str,
+    defect_dojo_user: str,
+    defect_dojo: str,
+):  # pylint: disable=line-too-long
     """
     Upload Zap results in DefectDojo product
     """
     dojo = defectdojo.DefectDojoAPIv2(
-        defect_dojo, defect_dojo_key, defect_dojo_user, debug=False, timeout=120)
+        defect_dojo, defect_dojo_key, defect_dojo_user, debug=False, timeout=120
+    )
     absolute_path = os.path.abspath(zap_filename)
     date = datetime.today().strftime("%Y%m%d%H:%M")
     lead_id = fetch_dojo_lead_id(dojo, defect_dojo_user)
-    engagement=dojo.create_engagement( name=date, product_id=product_id, lead_id=lead_id,
+    engagement = dojo.create_engagement(
+        name=date,
+        product_id=product_id,
+        lead_id=lead_id,
         target_start=datetime.today().strftime("%Y-%m-%d"),
-        target_end=datetime.today().strftime("%Y-%m-%d"), status="In Progress",
-        active='True',deduplication_on_engagement='False')
-    engagement_id=engagement.data["id"]
-
-    dojo_upload = dojo.upload_scan(engagement_id=engagement_id,
-                                   scan_type="ZAP Scan",
-                                   file=absolute_path,
-                                   active=True,
-                                   verified=False,
-                                   close_old_findings=True,
-                                   skip_duplicates=True,
-                                   scan_date=str(datetime.today().strftime('%Y-%m-%d')),
-                                   tags="Zap_scan")
+        target_end=datetime.today().strftime("%Y-%m-%d"),
+        status="In Progress",
+        active="True",
+        deduplication_on_engagement="False",
+    )
+    engagement_id = engagement.data["id"]
+
+    dojo_upload = dojo.upload_scan(
+        engagement_id=engagement_id,
+        scan_type="ZAP Scan",
+        file=absolute_path,
+        active=True,
+        verified=False,
+        close_old_findings=True,
+        skip_duplicates=True,
+        scan_date=str(datetime.today().strftime("%Y-%m-%d")),
+        tags="Zap_scan",
+    )
     logging.info("Dojo file upload: %s", dojo_upload)
 
-    max_retries = int(getenv("MAX_RETRIES", '3'))
+    max_retries = int(getenv("MAX_RETRIES", "3"))
    retry_delay = 20
     for attempt in range(max_retries):
         try:
-            dojo._request('POST','engagements/'+str(engagement_id)+'/close/')
+            dojo._request("POST", "engagements/" + str(engagement_id) + "/close/")
             return
-        except Exception: # pylint: disable=broad-except
+        except Exception:  # pylint: disable=broad-except
             sleep(retry_delay)
     raise RuntimeError("Maximum retry attempts reached for closing engagement")
 
+
 class Severity(str, Enum):
     """
     Provides possible values of finding severity in Code Dx.
@@ -144,9 +165,7 @@ class Severity(str, Enum):
 
 
 def get_codedx_alert_count_by_severity(
-    cdx: CodeDx,
-    project: str,
-    severities: List[Severity]
+    cdx: CodeDx, project: str, severities: List[Severity]
 ) -> int:
     """
     Get finding count, given the severity levels for a Code Dx project.
@@ -163,7 +182,7 @@ def get_codedx_alert_count_by_severity(
     pid = new_project["id"]
     res = cdx.get_finding_count(pid, filters)
     if "count" not in res:
-        raise RuntimeError(f"{ res }")
+        raise RuntimeError(f"{res}")
     return res["count"]
 
 
@@ -183,21 +202,19 @@ def get_alerts_string(cdx: CodeDx, project: str, severities: List[Severity]):
     for severity in severities:
         count = get_codedx_alert_count_by_severity(cdx, project, [severity])
         if count:
-            messages.append(
-                f"\t{ emojis[severity] } { count } { severity.value } findings\n"
-            )
+            messages.append(f"\t{emojis[severity]} {count} {severity.value} findings\n")
     return "".join(messages)
 
 
 def get_codedx_report_by_alert_severity(
-        cdx: CodeDx, project: str, severities: List[Severity]
+    cdx: CodeDx, project: str, severities: List[Severity]
 ):
     """
     Generate a PDF report, given the severity levels for a Code Dx project.
     """
     logging.info("Getting PDF report from Codedx project: %s", project)
     report_date = datetime.now()
-    report_file = f'{project.replace("-", "_")}_report_{report_date:%Y%m%d}.pdf'
+    report_file = f"{project.replace('-', '_')}_report_{report_date:%Y%m%d}.pdf"
     filters = {
         "severity": [s.value for s in severities],
         "status": ["new", "unresolved", "reopened", "escalated"],
@@ -217,18 +234,15 @@ def get_codedx_report_by_alert_severity(
     return report_file
 
 
-def get_codedx_initial_report(
-    cdx: CodeDx, project: str
-):
+def get_codedx_initial_report(cdx: CodeDx, project: str):
     """
     Generate a PDF report showing all findings that haven't been closed.
     """
     logging.info("Getting PDF report from Codedx project: %s", project)
     report_date = datetime.now()
-    report_file = f'{project.replace("-", "_")}_report_{report_date:%Y%m%d}.pdf'
-    filters = {
-        "status": [2, 3, 4, 5, 6, 10, 9, 1]
-    }
+    report_file = f"{project.replace('-', '_')}_report_{report_date:%Y%m%d}.pdf"
+    filters = {"status": [2, 3, 4, 5, 6, 10, 9, 1]}
     if not cdx.get_project_id(project):
         cdx.create_project(project)
     cdx.get_pdf(
@@ -260,14 +274,14 @@ def parse_severities(severities: str):
 
 
 def slack_alert_with_report(  # pylint: disable=too-many-arguments
-        cdx: CodeDx,
-        codedx_project: str,
-        severities: List[Severity],
-        token: str,
-        channel: str,
-        target_url: str,
-        xml_report_url: str,
-        scan_type: ScanType,
+    cdx: CodeDx,
+    codedx_project: str,
+    severities: List[Severity],
+    token: str,
+    channel: str,
+    target_url: str,
+    xml_report_url: str,
+    scan_type: ScanType,
 ):
     """
     Alert Slack on requested findings, if any.
@@ -292,9 +306,9 @@ def slack_alert_with_report(  # pylint: disable=too-many-arguments
     )
     alerts_string = get_alerts_string(cdx, codedx_project, severities)
     report_message = (
-        f"{ gcs_slack_text }"
-        f"Results from {scan_type.label()} scan of endpoint { target_url }:\n"
-        f"{ alerts_string }"
+        f"{gcs_slack_text}"
+        f"Results from {scan_type.label()} scan of endpoint {target_url}:\n"
+        f"{alerts_string}"
         f"Please see the attached report for details."
     )
     slack.files_upload(
@@ -315,12 +329,12 @@ def slack_alert_with_report(  # pylint: disable=too-many-arguments
 
 
 def slack_alert_without_report(  # pylint: disable=too-many-arguments
-        token: str,
-        channel: str,
-        xml_report_url: str,
-        product_id: str,
-        dd: str,
-        target_url: str
+    token: str,
+    channel: str,
+    xml_report_url: str,
+    product_id: str,
+    dd: str,
+    target_url: str,
 ):
     """
     Alert Slack on requested findings, if any.
@@ -334,22 +348,29 @@ def slack_alert_without_report(  # pylint: disable=too-many-arguments
 
     if xml_report_url:
         gcs_slack_text = (
-            "New vulnerability report uploaded to GCS bucket: " +
-            f"{xml_report_url}\n and DefectDojo product: {dd}product/{product_id}"
+            "New vulnerability report uploaded to GCS bucket: "
+            + f"{xml_report_url}\n and DefectDojo product: {dd}product/{product_id}"
         )
         slack.chat_postMessage(channel=channel, text=gcs_slack_text)
-        logging.info("Alert sent to Slack channel for GCS bucket and DefectDojo upload report")
+        logging.info(
+            "Alert sent to Slack channel for GCS bucket and DefectDojo upload report"
+        )
     else:
         gcs_slack_text = (
-            "New vulnerability report uploaded to DefectDojo for " +
-            f"{target_url}: {dd}product/{product_id}"
+            "New vulnerability report uploaded to DefectDojo for "
+            + f"{target_url}: {dd}product/{product_id}"
         )
         slack.chat_postMessage(channel=channel, text=gcs_slack_text)
         logging.info("Alert sent to Slack channel for DefectDojo upload report")
 
+
 # match a hash after a hyphen or dot, and only match 8 or 9 characters of hex
 URI_HASH_REGEX1 = re.compile(r"[-\.][a-zA-Z0-9]{8,9}(?![a-fA-F0-9])")
-URI_HASH_REGEX2 = re.compile(r"index-\w{7}-\.js")  # /assets/index-4au49BA-.js
+REPLACEMENT1 = ""
+# remove a hash e.g. /assets/index-4au49BA-.js -> /assets/index.js
+URI_HASH_REGEX2 = re.compile(r"index-\w{7}-\.js")
+REPLACEMENT2 = "index.js"
+
 
 def clean_uri_path(xml_report):
     """
@@ -357,20 +378,21 @@ def clean_uri_path(xml_report):
     """
     tree = ET.parse(xml_report)
     root = tree.getroot()
-    #There's a hash in bundled files that is causing flaws to not match
+    # There's a hash in bundled files that is causing flaws to not match
     # this should remove the hash.
-    for uri in root.iter('uri'):
-        r=urlparse(uri.text)
-        r_prev = r
-        r=r._replace(path=URI_HASH_REGEX1.sub('', r.path))
-        r=r._replace(path=URI_HASH_REGEX2.sub('index-hash-.js', r.path))
+    for uri in root.iter("uri"):
+        r = urlparse(uri.text)
+        print(type(r))
+        path_old = r.path
+        r = r._replace(path=URI_HASH_REGEX1.sub(REPLACEMENT1, r.path))
+        r = r._replace(path=URI_HASH_REGEX2.sub(REPLACEMENT2, r.path))
         if r != r_prev:
             logging.info("URI %s -> %s", r_prev, r)
-        uri.text = urlunparse(r)
+        uri.text = urlunparse(r)
 
     tree.write(xml_report)
 
 
-def main(): # pylint: disable=too-many-locals
+def main():  # pylint: disable=too-many-locals
     """
     - Run ZAP scan
     - Upload results to Code Dx
@@ -380,7 +402,7 @@ def main():  # pylint: disable=too-many-locals
     client = google.cloud.logging.Client()
     client.setup_logging()
 
-    max_retries = int(getenv("MAX_RETRIES", '1'))
+    max_retries = int(getenv("MAX_RETRIES", "1"))
     sleep_time = 10
     for attempt in range(max_retries):
         # run Zap scan
@@ -408,29 +430,26 @@ def main():  # pylint: disable=too-many-locals
             defect_dojo = getenv("DEFECT_DOJO_URL")
             dd = getenv("DEFECT_DOJO")
             # configure logging
-            logging.basicConfig(level=logging.INFO,
+            logging.basicConfig(
+                level=logging.INFO,
                 format=f"%(levelname)-8s [{codedx_project} {scan_type}-scan] %(message)s",
-                )
+            )
             # fetch dd poject name
-            dojo_product_name = fetch_dojo_product_name(defect_dojo,
-                                                        defect_dojo_user,
-                                                        defect_dojo_key,
-                                                        product_id)
+            dojo_product_name = fetch_dojo_product_name(
+                defect_dojo, defect_dojo_user, defect_dojo_key, product_id
+            )
 
-            logging.info("Severities: %s", ", ".join(
-                s.value for s in severities))
+            logging.info("Severities: %s", ", ".join(s.value for s in severities))
 
             (zap_filename, session_filename) = zap_compliance_scan(
-                dojo_product_name, target_url, scan_type)
+                dojo_product_name, target_url, scan_type
+            )
 
             # optionally, upload them to GCS
             xml_report_url = ""
             if scan_type is not ScanType.BASELINE:
                 xml_report_url = upload_gcs(
-                    bucket_name,
-                    scan_type,
-                    zap_filename,
-                    subfoldername='raw'
+                    bucket_name, scan_type, zap_filename, subfoldername="raw"
                 )
             upload_gcs(
                 session_bucket,
@@ -438,32 +457,29 @@ def main():  # pylint: disable=too-many-locals
                 session_filename,
             )
 
-            #removes hash from certain static files to improve flaw matching.
-            #done after upload of raw report to GCS to preserve raw report xml.
+            # removes hash from certain static files to improve flaw matching.
+            # done after upload of raw report to GCS to preserve raw report xml.
             clean_uri_path(zap_filename)
 
-            #upload scrubbed results in case we need to do a manual upload
+            # upload scrubbed results in case we need to do a manual upload
             if scan_type in (ScanType.UI, ScanType.LEOAPP, ScanType.BEEHIVE):
                 xml_report_url = upload_gcs(
-                    bucket_name,
-                    scan_type,
-                    zap_filename,
-                    subfoldername='clean'
+                    bucket_name, scan_type, zap_filename, subfoldername="clean"
                 )
 
             # upload its results in defectDojo
-            defectdojo_upload(product_id, zap_filename,
-                              defect_dojo_key, defect_dojo_user, defect_dojo)
-
+            defectdojo_upload(
+                product_id, zap_filename, defect_dojo_key, defect_dojo_user, defect_dojo
+            )
 
-            if codedx_api_key == '""' or codedx_project == '':
+            if codedx_api_key == '""' or codedx_project == "":
                 slack_alert_without_report(
                     slack_token,
                     slack_channel,
                     xml_report_url,
                     product_id,
                     dd,
-                    target_url
+                    target_url,
                 )
             else:
                 # upload its results to Code Dx
@@ -484,64 +500,106 @@ def main():  # pylint: disable=too-many-locals
                 )
 
             # Upload Terra scan XMLs and CodeDx reports to Google Drive.
-            if scan_type not in (ScanType.BASELINE, ScanType.HAILAPI, ScanType.HAILAUTH):
+            if scan_type not in (
+                ScanType.BASELINE,
+                ScanType.HAILAPI,
+                ScanType.HAILAUTH,
+            ):
                 try:
-                    logging.info('Setting up the google drive API service for uploading reports.')
+                    logging.info(
+                        "Setting up the google drive API service for uploading reports."
+                    )
                     drive_service = drivehelper.get_drive_service()
-                    root_id = os.getenv('DRIVE_ROOT_ID', None)
-                    drive_id = os.getenv('DRIVE_ID')
-                    folder_structure = drivehelper.get_folders_with_structure(root_id,
-                                                                              drive_id,
-                                                                              drive_service)
+                    root_id = os.getenv("DRIVE_ROOT_ID", None)
+                    drive_id = os.getenv("DRIVE_ID")
+                    folder_structure = drivehelper.get_folders_with_structure(
+                        root_id, drive_id, drive_service
+                    )
                     if not folder_structure:
-                        raise RuntimeError("The provided gdrive folder ID was not found.")
+                        raise RuntimeError(
+                            "The provided gdrive folder ID was not found."
+                        )
                     date = datetime.today()
                     if drivehelper.after_fourth_wednesday(date):
                         date = date + timedelta(days=10)
-                    logging.info("Finding the folders for this month's scans in Google Drive")
-                    year_folder_dict = drivehelper.find_subfolder(folder_structure, str(date.year))
+                    logging.info(
+                        "Finding the folders for this month's scans in Google Drive"
+                    )
+                    year_folder_dict = drivehelper.find_subfolder(
+                        folder_structure, str(date.year)
+                    )
                     if len(year_folder_dict) > 0:
-                        month_folder_dict = drivehelper.find_subfolder(year_folder_dict, date.strftime('%Y-%m')) if year_folder_dict is not None else None
-                        xml_folder_dict = drivehelper.find_subfolder(month_folder_dict, 'XML') if month_folder_dict is not None else None
-                        zap_raw_folder = drivehelper.find_subfolder(month_folder_dict, 'Raw Reports') if month_folder_dict is not None else None
+                        month_folder_dict = (
+                            drivehelper.find_subfolder(
+                                year_folder_dict, date.strftime("%Y-%m")
+                            )
+                            if year_folder_dict is not None
+                            else None
+                        )
+                        xml_folder_dict = (
+                            drivehelper.find_subfolder(month_folder_dict, "XML")
+                            if month_folder_dict is not None
+                            else None
+                        )
+                        zap_raw_folder = (
+                            drivehelper.find_subfolder(
+                                month_folder_dict, "Raw Reports"
+                            )
+                            if month_folder_dict is not None
+                            else None
+                        )
                     if month_folder_dict and xml_folder_dict and zap_raw_folder:
-                        logging.info(f"Uploading report and XML for this month's scans to {xml_folder_dict}")
+                        logging.info(
+                            f"Uploading report and XML for this month's scans to {xml_folder_dict}"
+                        )
                     else:
-                        raise RuntimeError("Unable to find the proper folders for uploading reports.")
-                    file = drivehelper.upload_file_to_drive(zap_filename,
-                                                            xml_folder_dict.get('id'),
-                                                            drive_id,
-                                                            drive_service)
-                    logging.info(f"The returned file id for {dojo_product_name} XML is {file}")
+                        raise RuntimeError(
+                            "Unable to find the proper folders for uploading reports."
+                        )
+                    file = drivehelper.upload_file_to_drive(
+                        zap_filename,
+                        xml_folder_dict.get("id"),
+                        drive_id,
+                        drive_service,
+                    )
+                    logging.info(
+                        f"The returned file id for {dojo_product_name} XML is {file}"
+                    )
                     if not file:
-                        raise RuntimeError(f"The XML file for {dojo_product_name} was not uploaded.")
+                        raise RuntimeError(
+                            f"The XML file for {dojo_product_name} was not uploaded."
+                        )
                     cdx = CodeDx(codedx_url, codedx_api_key)
                     report_file = get_codedx_initial_report(cdx, codedx_project)
-                    file = drivehelper.upload_file_to_drive(report_file,
-                                                            zap_raw_folder.get('id'),
-                                                            drive_id,
-                                                            drive_service)
+                    file = drivehelper.upload_file_to_drive(
+                        report_file,
+                        zap_raw_folder.get("id"),
+                        drive_id,
+                        drive_service,
+                    )
                     if not file:
                         raise RuntimeError(
-                            f"The CodeDx report for {dojo_product_name} was not uploaded.")
-                    logging.info(f'The report {report_file} has been uploaded.')
-                except Exception as e: # pylint: disable=broad-except
-                    error_message = f'Failed to complete uploading files to GDrive for {dojo_product_name}. Last error {e}'
+                            f"The CodeDx report for {dojo_product_name} was not uploaded."
+                        )
+                    logging.info(f"The report {report_file} has been uploaded.")
+                except Exception as e:  # pylint: disable=broad-except
+                    error_message = f"Failed to complete uploading files to GDrive for {dojo_product_name}. Last error {e}"
                     logging.info(error_message)
-                    error_slack_alert(
-                        error_message, slack_token, slack_channel)
+                    error_slack_alert(error_message, slack_token, slack_channel)
 
             zap = zap_connect()
             zap.core.shutdown()
-        except Exception as error: # pylint: disable=broad-except
-            error_message = f"[RETRY-{ attempt }] Exception running Zap Scans: { error }"
+        except Exception as error:  # pylint: disable=broad-except
+            error_message = f"[RETRY-{attempt}] Exception running Zap Scans: {error}"
             logging.warning(error_message)
             if attempt == max_retries - 1:
-                error_message = f"Error running Zap Scans for { target_url }. Last error: { error }"
+                error_message = (
+                    f"Error running Zap Scans for {target_url}. Last error: {error}"
+                )
                 try:
                     error_slack_alert(error_message, slack_token, slack_channel)
                 except:
@@ -549,10 +607,9 @@ def main():  # pylint: disable=too-many-locals
                 try:
                     zap = zap_connect()
                     zap.core.shutdown()
-                except Exception as zap_e: # pylint: disable=broad-except
-                    error_message = f"Error shutting down zap: { zap_e }"
-                    error_slack_alert(
-                        error_message, slack_token, slack_channel)
+                except Exception as zap_e:  # pylint: disable=broad-except
+                    error_message = f"Error shutting down zap: {zap_e}"
+                    error_slack_alert(error_message, slack_token, slack_channel)
                     logging.exception("Error shutting down zap.")
                 logging.exception("Max retries exceeded.")
                 exit(0)

From 3be434b85cef87e26bd033e3ab0df0a879b8b1f0 Mon Sep 17 00:00:00 2001
From: Tom Conner
Date: Wed, 5 Feb 2025 16:39:48 -0500
Subject: [PATCH 4/4] remove hash

---
 zap/src/scan.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/zap/src/scan.py b/zap/src/scan.py
index f1706489..a51ce720 100755
--- a/zap/src/scan.py
+++ b/zap/src/scan.py
@@ -386,8 +386,8 @@ def clean_uri_path(xml_report):
         path_old = r.path
         r = r._replace(path=URI_HASH_REGEX1.sub(REPLACEMENT1, r.path))
         r = r._replace(path=URI_HASH_REGEX2.sub(REPLACEMENT2, r.path))
-        if r != r_prev:
-            logging.info("URI %s -> %s", r_prev, r)
+        if r.path != path_old:
+            logging.info("URI %s -> %s", path_old, r.path)
         uri.text = urlunparse(r)
 
     tree.write(xml_report)
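For reference, a self-contained sketch of clean_uri_path() as it stands after patch 4 (illustrative only; the report file name and its contents are invented):

    import logging
    import re
    import xml.etree.ElementTree as ET
    from urllib.parse import urlparse, urlunparse

    URI_HASH_REGEX1 = re.compile(r"[-\.][a-zA-Z0-9]{8,9}(?![a-fA-F0-9])")
    REPLACEMENT1 = ""
    URI_HASH_REGEX2 = re.compile(r"index-\w{7}-\.js")
    REPLACEMENT2 = "index.js"

    logging.basicConfig(level=logging.INFO)

    # write a minimal ZAP-style report to clean
    xml_report = "report.xml"
    with open(xml_report, "w", encoding="utf-8") as f:
        f.write(
            "<report><uri>https://example.org/assets/index-4au49BA-.js</uri></report>"
        )

    tree = ET.parse(xml_report)
    for uri in tree.getroot().iter("uri"):
        r = urlparse(uri.text)
        path_old = r.path
        r = r._replace(path=URI_HASH_REGEX1.sub(REPLACEMENT1, r.path))
        r = r._replace(path=URI_HASH_REGEX2.sub(REPLACEMENT2, r.path))
        if r.path != path_old:
            # logs: URI /assets/index-4au49BA-.js -> /assets/index.js
            logging.info("URI %s -> %s", path_old, r.path)
        uri.text = urlunparse(r)
    tree.write(xml_report)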