diff --git a/vulnerabilities/importer.py b/vulnerabilities/importer.py index 933c19edc..c64dfccaa 100644 --- a/vulnerabilities/importer.py +++ b/vulnerabilities/importer.py @@ -376,6 +376,24 @@ class Importer: vcs_response: VCSResponse = None # It needs to be unique and immutable importer_name = "" + requires_reference_for_advisory_id = False + + @classmethod + def get_advisory_id(cls, aliases: list[str]) -> str: + """ + Return the Advisory ID for the given aliases. + """ + raise NotImplementedError + + @classmethod + def get_cve_id(cls, aliases: list[str]) -> str: + """ + Return the CVE ID for the given aliases. + """ + for alias in aliases: + if alias.startswith("CVE-"): + return alias + return None def __init__(self): if not self.spdx_license_expression: diff --git a/vulnerabilities/importers/apache_httpd.py b/vulnerabilities/importers/apache_httpd.py index 75099ab8f..9dee86041 100644 --- a/vulnerabilities/importers/apache_httpd.py +++ b/vulnerabilities/importers/apache_httpd.py @@ -38,6 +38,13 @@ class ApacheHTTPDImporter(Importer): license_url = "https://www.apache.org/licenses/LICENSE-2.0" importer_name = "Apache HTTPD Importer" + @classmethod + def get_advisory_id(cls, aliases: list[str]) -> str: + """ + Return the Advisory ID for the given aliases. + """ + return cls.get_cve_id(aliases) + def advisory_data(self): links = fetch_links(self.base_url) for link in links: diff --git a/vulnerabilities/importers/apache_kafka.py b/vulnerabilities/importers/apache_kafka.py index 27c244b2a..f14f383b5 100644 --- a/vulnerabilities/importers/apache_kafka.py +++ b/vulnerabilities/importers/apache_kafka.py @@ -102,6 +102,13 @@ def fetch_advisory_page(self): page = requests.get(self.GH_PAGE_URL) return page.content + @classmethod + def get_advisory_id(cls, aliases: list[str]) -> str: + """ + Return the Advisory ID for the given aliases. 
+ """ + return cls.get_cve_id(aliases) + def advisory_data(self): advisory_page = self.fetch_advisory_page(self) diff --git a/vulnerabilities/importers/apache_tomcat.py b/vulnerabilities/importers/apache_tomcat.py index 9d371ee7d..4e5498c15 100644 --- a/vulnerabilities/importers/apache_tomcat.py +++ b/vulnerabilities/importers/apache_tomcat.py @@ -120,6 +120,13 @@ class ApacheTomcatImporter(Importer): license_url = "https://www.apache.org/licenses/LICENSE-2.0" importer_name = "Apache Tomcat Importer" + @classmethod + def get_advisory_id(cls, aliases: list[str]) -> str: + """ + Return the Advisory ID for the given aliases. + """ + return cls.get_cve_id(aliases) + def fetch_advisory_pages(self): """ Yield the content of each HTML page containing version-related security data. diff --git a/vulnerabilities/importers/archlinux.py b/vulnerabilities/importers/archlinux.py index 640fb24dc..9207ebe5a 100644 --- a/vulnerabilities/importers/archlinux.py +++ b/vulnerabilities/importers/archlinux.py @@ -29,6 +29,17 @@ class ArchlinuxImporter(Importer): spdx_license_expression = "MIT" license_url = "https://github.com/archlinux/arch-security-tracker/blob/master/LICENSE" importer_name = "Arch Linux Importer" + requires_reference_for_advisory_id = True + + @classmethod + def get_advisory_id(cls, aliases: list[str], references) -> str: + """ + Return the Advisory ID for the given aliases. 
+ """ + for ref in references: + if (ref.get("reference_id") or "").startswith("AVG-"): + return ref.get("reference_id") + return cls.get_cve_id(aliases) def fetch(self) -> Iterable[Mapping]: response = fetch_response(self.url) diff --git a/vulnerabilities/importers/curl.py b/vulnerabilities/importers/curl.py index a7f5e86fa..ef321dbb6 100644 --- a/vulnerabilities/importers/curl.py +++ b/vulnerabilities/importers/curl.py @@ -39,6 +39,13 @@ class CurlImporter(Importer): importer_name = "Curl Importer" api_url = "https://curl.se/docs/vuln.json" + @classmethod + def get_advisory_id(cls, aliases: list[str]) -> str: + """ + Return the Advisory ID for the given aliases. + """ + return cls.get_cve_id(aliases) + def fetch(self) -> Iterable[Mapping]: response = fetch_response(self.api_url) return response.json() diff --git a/vulnerabilities/importers/debian.py b/vulnerabilities/importers/debian.py index 7d1ae2071..583f0c602 100644 --- a/vulnerabilities/importers/debian.py +++ b/vulnerabilities/importers/debian.py @@ -82,6 +82,13 @@ class DebianImporter(Importer): api_url = "https://security-tracker.debian.org/tracker/data/json" importer_name = "Debian Importer" + @classmethod + def get_advisory_id(cls, aliases: list[str]) -> str: + """ + Return the Advisory ID for the given aliases. + """ + return cls.get_cve_id(aliases) + def get_response(self): response = requests.get(self.api_url) if response.status_code == 200: diff --git a/vulnerabilities/importers/debian_oval.py b/vulnerabilities/importers/debian_oval.py index f5a747a11..20dc1aff7 100644 --- a/vulnerabilities/importers/debian_oval.py +++ b/vulnerabilities/importers/debian_oval.py @@ -56,6 +56,13 @@ class DebianOvalImporter(OvalImporter): """ importer_name = "Debian Oval Importer" + @classmethod + def get_advisory_id(cls, aliases: list[str]) -> str: + """ + Return the Advisory ID for the given aliases. 
+ """ + return cls.get_cve_id(aliases) + def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # we could avoid setting translations, and have it diff --git a/vulnerabilities/importers/elixir_security.py b/vulnerabilities/importers/elixir_security.py index 3fe0ec15b..70183c3e6 100644 --- a/vulnerabilities/importers/elixir_security.py +++ b/vulnerabilities/importers/elixir_security.py @@ -41,6 +41,13 @@ def advisory_data(self) -> Set[AdvisoryData]: if self.vcs_response: self.vcs_response.delete() + @classmethod + def get_advisory_id(cls, aliases: list[str]) -> str: + """ + Return the Advisory ID for the given aliases. + """ + return cls.get_cve_id(aliases) + def process_file(self, file, base_path): relative_path = str(file.relative_to(base_path)).strip("/") advisory_url = ( diff --git a/vulnerabilities/importers/epss.py b/vulnerabilities/importers/epss.py index 982229e09..1c3d5b22f 100644 --- a/vulnerabilities/importers/epss.py +++ b/vulnerabilities/importers/epss.py @@ -29,6 +29,13 @@ class EPSSImporter(Importer): spdx_license_expression = "unknown" importer_name = "EPSS Importer" + @classmethod + def get_advisory_id(cls, aliases: list[str]) -> str: + """ + Return the Advisory ID for the given aliases. + """ + return cls.get_cve_id(aliases) + def advisory_data(self) -> Iterable[AdvisoryData]: response = urllib.request.urlopen(self.advisory_url) with gzip.open(response, "rb") as f: diff --git a/vulnerabilities/importers/fireeye.py b/vulnerabilities/importers/fireeye.py index 03fb3a8d5..3b4acd9f7 100644 --- a/vulnerabilities/importers/fireeye.py +++ b/vulnerabilities/importers/fireeye.py @@ -35,6 +35,16 @@ class FireyeImporter(Importer): repo_url = "git+https://github.com/mandiant/Vulnerability-Disclosures" importer_name = "FireEye Importer" + @classmethod + def get_advisory_id(cls, aliases: list[str]) -> str: + """ + Return the Advisory ID for the given aliases. 
+ """ + for alias in aliases: + if alias.startswith("MNDT-"): + return alias + return cls.get_cve_id(aliases) + def advisory_data(self) -> Iterable[AdvisoryData]: try: self.vcs_response = self.clone(repo_url=self.repo_url) diff --git a/vulnerabilities/importers/gentoo.py b/vulnerabilities/importers/gentoo.py index 2f569cdf1..6aa8a4b81 100644 --- a/vulnerabilities/importers/gentoo.py +++ b/vulnerabilities/importers/gentoo.py @@ -32,6 +32,18 @@ class GentooImporter(Importer): # under the [CC-BY-SA-4.0](https://creativecommons.org/licenses/by-sa/4.0/) license. license_url = "https://creativecommons.org/licenses/by-sa/4.0/" importer_name = "Gentoo Importer" + requires_reference_for_advisory_id = True + + @classmethod + def get_advisory_id(cls, aliases: list[str], references) -> str: + """ + Return the Advisory ID for the given aliases. + """ + for ref in references: + ref_id = ref.get("reference_id") + if ref_id and ref_id.startswith("GLSA-"): + return ref_id + return cls.get_cve_id(aliases) def advisory_data(self) -> Iterable[AdvisoryData]: try: diff --git a/vulnerabilities/importers/github_osv.py b/vulnerabilities/importers/github_osv.py index f0490044e..1b6c0ebbe 100644 --- a/vulnerabilities/importers/github_osv.py +++ b/vulnerabilities/importers/github_osv.py @@ -25,6 +25,16 @@ class GithubOSVImporter(Importer): repo_url = "git+https://github.com/github/advisory-database/" importer_name = "GithubOSV Importer" + @classmethod + def get_advisory_id(cls, aliases: list[str]) -> str: + """ + Return the Advisory ID for the given aliases. 
+ """ + for alias in aliases: + if alias.startswith("GHSA"): + return alias + return cls.get_cve_id(aliases) + def advisory_data(self) -> Iterable[AdvisoryData]: supported_ecosystems = [ "pypi", diff --git a/vulnerabilities/importers/istio.py b/vulnerabilities/importers/istio.py index 8f9f6334a..17e5529bd 100644 --- a/vulnerabilities/importers/istio.py +++ b/vulnerabilities/importers/istio.py @@ -43,6 +43,18 @@ class IstioImporter(Importer): license_url = "https://github.com/istio/istio.io/blob/master/LICENSE" repo_url = "git+https://github.com/istio/istio.io/" importer_name = "Istio Importer" + requires_reference_for_advisory_id = True + + @classmethod + def get_advisory_id(cls, aliases: list[str], references) -> str: + """ + Return the Advisory ID for the given aliases. + """ + for ref in references: + ref_id = ref.get("reference_id") + if ref_id and ref_id.startswith("ISTIO-"): + return ref_id + return cls.get_cve_id(aliases) def advisory_data(self) -> Set[AdvisoryData]: try: diff --git a/vulnerabilities/importers/mozilla.py b/vulnerabilities/importers/mozilla.py index 8eea10370..93776c4f7 100644 --- a/vulnerabilities/importers/mozilla.py +++ b/vulnerabilities/importers/mozilla.py @@ -38,6 +38,18 @@ class MozillaImporter(Importer): license_url = "https://github.com/mozilla/foundation-security-advisories/blob/master/LICENSE" repo_url = "git+https://github.com/mozilla/foundation-security-advisories/" importer_name = "Mozilla Importer" + requires_reference_for_advisory_id = True + + @classmethod + def get_advisory_id(cls, aliases: list[str], references) -> str: + """ + Return the Advisory ID for the given aliases. 
+ """ + for ref in references: + ref_id = ref.get("reference_id") + if ref_id and ref_id.lower().startswith("mfsa"): + return ref_id + return cls.get_cve_id(aliases) def advisory_data(self) -> Iterable[AdvisoryData]: try: diff --git a/vulnerabilities/importers/openssl.py b/vulnerabilities/importers/openssl.py index b71206418..86f2c9b36 100644 --- a/vulnerabilities/importers/openssl.py +++ b/vulnerabilities/importers/openssl.py @@ -35,6 +35,16 @@ class OpensslImporter(Importer): url = "https://www.openssl.org/news/vulnerabilities.xml" importer_name = "OpenSSL Importer" + @classmethod + def get_advisory_id(cls, aliases: list[str]) -> str: + """ + Return the Advisory ID for the given aliases. + """ + for alias in aliases: + if alias.startswith("VC-OPENSSL-"): + return alias + return cls.get_cve_id(aliases) + def fetch(self): response = requests.get(url=self.url) if not response.status_code == 200: diff --git a/vulnerabilities/importers/oss_fuzz.py b/vulnerabilities/importers/oss_fuzz.py index 63b879990..39eba36ba 100644 --- a/vulnerabilities/importers/oss_fuzz.py +++ b/vulnerabilities/importers/oss_fuzz.py @@ -26,6 +26,13 @@ class OSSFuzzImporter(Importer): url = "git+https://github.com/google/oss-fuzz-vulns" importer_name = "OSS Fuzz Importer" + @classmethod + def get_advisory_id(cls, aliases: list[str]) -> str: + """ + Return the Advisory ID for the given aliases. + """ + return cls.get_cve_id(aliases) + def advisory_data(self) -> Iterable[AdvisoryData]: try: self.clone(repo_url=self.url) diff --git a/vulnerabilities/importers/postgresql.py b/vulnerabilities/importers/postgresql.py index 70ab1bfe9..fed2bb68d 100644 --- a/vulnerabilities/importers/postgresql.py +++ b/vulnerabilities/importers/postgresql.py @@ -30,6 +30,13 @@ class PostgreSQLImporter(Importer): spdx_license_expression = "PostgreSQL" importer_name = "PostgreSQL Importer" + @classmethod + def get_advisory_id(cls, aliases: list[str]) -> str: + """ + Return the Advisory ID for the given aliases. 
+ """ + return cls.get_cve_id(aliases) + def advisory_data(self): known_urls = {self.root_url} visited_urls = set() diff --git a/vulnerabilities/importers/project_kb_msr2019.py b/vulnerabilities/importers/project_kb_msr2019.py index a006b1353..ebf6f0e4a 100644 --- a/vulnerabilities/importers/project_kb_msr2019.py +++ b/vulnerabilities/importers/project_kb_msr2019.py @@ -24,6 +24,13 @@ class ProjectKBMSRImporter(Importer): license_url = "https://github.com/SAP/project-kb/blob/main/LICENSE.txt" importer_name = "ProjectKB MSRImporter" + @classmethod + def get_advisory_id(cls, aliases: list[str]) -> str: + """ + Return the Advisory ID for the given aliases. + """ + return cls.get_cve_id(aliases) + def advisory_data(self): raw_data = fetch_and_read_from_csv(self.url) yield from self.to_advisories(raw_data) diff --git a/vulnerabilities/importers/redhat.py b/vulnerabilities/importers/redhat.py index 68e3d5062..b5dac67ed 100644 --- a/vulnerabilities/importers/redhat.py +++ b/vulnerabilities/importers/redhat.py @@ -66,6 +66,18 @@ class RedhatImporter(Importer): spdx_license_expression = "CC-BY-4.0" license_url = "https://access.redhat.com/documentation/en-us/red_hat_security_data_api/1.0/html/red_hat_security_data_api/legal-notice" importer_name = "RedHat Importer" + requires_reference_for_advisory_id = True + + @classmethod + def get_advisory_id(cls, aliases: list[str], references) -> str: + """ + Return the Advisory ID for the given aliases. 
+ """ + for ref in references: + ref_id = ref.get("reference_id") + if ref_id and ref_id.upper().startswith("RHSA-"): + return ref_id + return cls.get_cve_id(aliases) def advisory_data(self) -> Iterable[AdvisoryData]: for redhat_cves in fetch_cves(): diff --git a/vulnerabilities/importers/retiredotnet.py b/vulnerabilities/importers/retiredotnet.py index 139ecd1af..1469c1eff 100644 --- a/vulnerabilities/importers/retiredotnet.py +++ b/vulnerabilities/importers/retiredotnet.py @@ -44,6 +44,13 @@ def advisory_data(self) -> Iterable[AdvisoryData]: if self.vcs_response: self.vcs_response.delete() + @classmethod + def get_advisory_id(cls, aliases: list[str]) -> str: + """ + Return the Advisory ID for the given aliases. + """ + return cls.get_cve_id(aliases) + @staticmethod def vuln_id_from_desc(desc): cve_regex = re.compile(r"CVE-\d+-\d+") diff --git a/vulnerabilities/importers/ruby.py b/vulnerabilities/importers/ruby.py index 268419587..537d557a4 100644 --- a/vulnerabilities/importers/ruby.py +++ b/vulnerabilities/importers/ruby.py @@ -52,6 +52,16 @@ class RubyImporter(Importer): SOFTWARE. """ + @classmethod + def get_advisory_id(cls, aliases: list[str]) -> str: + """ + Return the Advisory ID for the given aliases. + """ + for alias in aliases: + if alias.startswith("GHSA-"): + return alias + return cls.get_cve_id(aliases) + def advisory_data(self) -> Iterable[AdvisoryData]: try: self.clone(self.repo_url) diff --git a/vulnerabilities/importers/suse_oval.py b/vulnerabilities/importers/suse_oval.py index 0722682f7..8c10d4e88 100644 --- a/vulnerabilities/importers/suse_oval.py +++ b/vulnerabilities/importers/suse_oval.py @@ -26,6 +26,13 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.translations = {"less than": "<", "equals": "=", "greater than or equal": ">="} + @classmethod + def get_advisory_id(cls, aliases: list[str]) -> str: + """ + Return the Advisory ID for the given aliases. 
+ """ + return cls.get_cve_id(aliases) + def _fetch(self): page = requests.get(self.base_url).text soup = BeautifulSoup(page, "lxml") diff --git a/vulnerabilities/importers/suse_scores.py b/vulnerabilities/importers/suse_scores.py index b7f2089ac..9dc891160 100644 --- a/vulnerabilities/importers/suse_scores.py +++ b/vulnerabilities/importers/suse_scores.py @@ -26,6 +26,13 @@ class SUSESeverityScoreImporter(Importer): license_url = "https://ftp.suse.com/pub/projects/security/yaml/LICENSE" importer_name = "SUSE Severity Score Importer" + @classmethod + def get_advisory_id(cls, aliases: list[str]) -> str: + """ + Return the Advisory ID for the given aliases. + """ + return cls.get_cve_id(aliases) + def advisory_data(self) -> Iterable[AdvisoryData]: score_data = fetch_yaml(URL) yield from self.to_advisory(score_data) diff --git a/vulnerabilities/importers/ubuntu.py b/vulnerabilities/importers/ubuntu.py index e47515b93..02f01a85e 100644 --- a/vulnerabilities/importers/ubuntu.py +++ b/vulnerabilities/importers/ubuntu.py @@ -63,6 +63,13 @@ class UbuntuImporter(OvalImporter): """ importer_name = "Ubuntu OVAL Importer" + @classmethod + def get_advisory_id(cls, aliases: list[str]) -> str: + """ + Return the Advisory ID for the given aliases. + """ + return cls.get_cve_id(aliases) + def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # we could avoid setting translations, and have it diff --git a/vulnerabilities/importers/ubuntu_usn.py b/vulnerabilities/importers/ubuntu_usn.py index 1aa247ec6..f32da51e0 100644 --- a/vulnerabilities/importers/ubuntu_usn.py +++ b/vulnerabilities/importers/ubuntu_usn.py @@ -63,6 +63,18 @@ class UbuntuUSNImporter(Importer): Thanks """ importer_name = "Ubuntu USN Importer" + requires_reference_for_advisory_id = True + + @classmethod + def get_advisory_id(cls, aliases: list[str], references) -> str: + """ + Return the Advisory ID for the given aliases. 
+ """ + for ref in references: + reference_id = ref.get("reference_id") + if reference_id and reference_id.startswith("USN-"): + return reference_id + return cls.get_cve_id(aliases) def advisory_data(self): usn_db = fetch(self.db_url) diff --git a/vulnerabilities/importers/vulnrichment.py b/vulnerabilities/importers/vulnrichment.py index 9eb4d3bcb..32fd8155d 100644 --- a/vulnerabilities/importers/vulnrichment.py +++ b/vulnerabilities/importers/vulnrichment.py @@ -25,6 +25,13 @@ class VulnrichImporter(Importer): repo_url = "git+https://github.com/cisagov/vulnrichment.git" importer_name = "Vulnrichment" + @classmethod + def get_advisory_id(cls, aliases: list[str]) -> str: + """ + Return the Advisory ID for the given aliases. + """ + return cls.get_cve_id(aliases) + def advisory_data(self) -> Iterable[AdvisoryData]: try: vcs_response = self.clone(repo_url=self.repo_url) diff --git a/vulnerabilities/importers/xen.py b/vulnerabilities/importers/xen.py index a0cafa324..18c336bf3 100644 --- a/vulnerabilities/importers/xen.py +++ b/vulnerabilities/importers/xen.py @@ -45,6 +45,17 @@ class XenImporter(Importer): -George """ importer_name = "Xen Importer" + requires_reference_for_advisory_id = True + + @classmethod + def get_advisory_id(cls, aliases: list[str], references: list[dict]) -> str: + """ + Return the Advisory ID for the given aliases. 
+ """ + for ref in references: + if (ref.get("reference_id") or "").startswith("XSA-"): + return ref.get("reference_id") + return cls.get_cve_id(aliases) def advisory_data(self): data = fetch_response(self.url).json() diff --git a/vulnerabilities/migrations/0091_advisory_advisory_id.py b/vulnerabilities/migrations/0091_advisory_advisory_id.py new file mode 100644 index 000000000..ee457986c --- /dev/null +++ b/vulnerabilities/migrations/0091_advisory_advisory_id.py @@ -0,0 +1,22 @@ +# Generated by Django 4.2.17 on 2025-03-28 06:22 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("vulnerabilities", "0090_migrate_advisory_aliases"), + ] + + operations = [ + migrations.AddField( + model_name="advisory", + name="advisory_id", + field=models.CharField( + blank=True, + help_text="An advisory id, such as CVE-123-345 when available", + max_length=100, + ), + ), + ] diff --git a/vulnerabilities/models.py b/vulnerabilities/models.py index dba205500..2af1bf55b 100644 --- a/vulnerabilities/models.py +++ b/vulnerabilities/models.py @@ -1323,6 +1323,16 @@ class Advisory(models.Model): null=False, help_text="A 64 character unique identifier for the content of the advisory since we use sha256 as hex", ) + + advisory_id = models.CharField( + max_length=100, + blank=True, + help_text="An advisory id, such as CVE-123-345 when available", + ) + url = models.URLField( + blank=True, + help_text="Link to the advisory on the upstream website", + ) aliases = models.ManyToManyField( Alias, through="AdvisoryRelatedAlias", @@ -1354,10 +1364,6 @@ class Advisory(models.Model): "module name importing the advisory. 
Eg:" "vulnerabilities.pipeline.nginx_importer.NginxImporterPipeline", ) - url = models.URLField( - blank=True, - help_text="Link to the advisory on the upstream website", - ) objects = AdvisoryQuerySet.as_manager() diff --git a/vulnerabilities/pipelines/__init__.py b/vulnerabilities/pipelines/__init__.py index d74db9f35..b1b9860f5 100644 --- a/vulnerabilities/pipelines/__init__.py +++ b/vulnerabilities/pipelines/__init__.py @@ -13,6 +13,7 @@ from timeit import default_timer as timer from traceback import format_exc as traceback_format_exc from typing import Iterable +from typing import Optional from aboutcode.pipeline import BasePipeline from aboutcode.pipeline import LoopProgress @@ -114,6 +115,7 @@ class VulnerableCodeBaseImporterPipeline(VulnerableCodePipeline): repo_url = None importer_name = None advisory_confidence = MAX_CONFIDENCE + requires_reference_for_advisory_id = False @classmethod def steps(cls): @@ -132,6 +134,22 @@ def collect_advisories(self) -> Iterable[AdvisoryData]: """ raise NotImplementedError + @classmethod + def get_advisory_id(cls, aliases: list[str], references: Optional[list[dict]] = None) -> str: + """ + Return the Advisory ID for the given aliases and, when the importer needs them, references; subclasses must override. + """ + raise NotImplementedError + + @classmethod + def get_cve_id(cls, aliases: list[str]) -> str: + """ + Return the CVE ID for the given aliases. + """ + for alias in aliases: + if alias.startswith("CVE-"): + return alias + def advisories_count(self) -> int: """ Return the estimated AdvisoryData to be yielded by ``collect_advisories``. diff --git a/vulnerabilities/pipelines/add_advisory_id.py b/vulnerabilities/pipelines/add_advisory_id.py new file mode 100644 index 000000000..92405e66b --- /dev/null +++ b/vulnerabilities/pipelines/add_advisory_id.py @@ -0,0 +1,63 @@ +# Copyright (c) nexB Inc. and others. All rights reserved. 
+# VulnerableCode is a trademark of nexB Inc. +# SPDX-License-Identifier: Apache-2.0 +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. +# See https://github.com/aboutcode-org/vulnerablecode for support or download. +# See https://aboutcode.org for more information about nexB OSS projects. +# + +from aboutcode.pipeline import LoopProgress +from django.db import transaction + +from vulnerabilities.importers import IMPORTERS_REGISTRY +from vulnerabilities.models import Advisory +from vulnerabilities.models import Alias +from vulnerabilities.pipelines import VulnerableCodePipeline + + +class AddAdvisoryID(VulnerableCodePipeline): + """ + Pipeline to set ``advisory_id`` on every existing Advisory using its importer's ``get_advisory_id`` and to remove that identifier from the advisory's aliases. + """ + + pipeline_id = "add_advisory_id" + + @classmethod + def steps(cls): + return (cls.add_advisory_id,) + + def add_advisory_id(self): + + advisories = Advisory.objects.all() + + advisories_to_update = [] + + batch_size = 500 + + progress = LoopProgress(total_iterations=advisories.count(), logger=self.log) + + for advisory in progress.iter(advisories.iterator(chunk_size=batch_size)): + importer_name = advisory.created_by + aliases = Alias.objects.filter(advisories=advisory).values_list("alias", flat=True) + references = advisory.references + importer = IMPORTERS_REGISTRY[importer_name] + if not importer.requires_reference_for_advisory_id: + advisory_id = importer.get_advisory_id(aliases=aliases) + else: + advisory_id = importer.get_advisory_id(aliases=aliases, references=references) + if not advisory_id: + self.log(f"Advisory {advisory.id} does not have an advisory ID. 
Skipping.") + continue + advisory.advisory_id = advisory_id + aliases = Alias.objects.filter(advisories=advisory).exclude(alias=advisory_id) + advisory.aliases.set(aliases) + advisories_to_update.append(advisory) + if len(advisories_to_update) >= batch_size: + self.do_bulk_update(advisories_to_update) + advisories_to_update = [] + self.do_bulk_update(advisories_to_update) + self.log(f"Pipeline [{self.pipeline_name}] completed.") + + def do_bulk_update(self, advisories_to_update): + Advisory.objects.bulk_update(advisories_to_update, ["advisory_id"]) + self.log(f"Updated {len(advisories_to_update)} advisories with advisory_id.") diff --git a/vulnerabilities/pipelines/alpine_linux_importer.py b/vulnerabilities/pipelines/alpine_linux_importer.py index 5657ee4d2..23392558f 100644 --- a/vulnerabilities/pipelines/alpine_linux_importer.py +++ b/vulnerabilities/pipelines/alpine_linux_importer.py @@ -45,6 +45,13 @@ def steps(cls): cls.import_new_advisories, ) + @classmethod + def get_advisory_id(cls, aliases: list[str]) -> str: + """ + Return the Advisory ID for the given aliases. + """ + return cls.get_cve_id(aliases) + def advisories_count(self) -> int: return 0 diff --git a/vulnerabilities/pipelines/github_importer.py b/vulnerabilities/pipelines/github_importer.py index 66c457824..b26076a89 100644 --- a/vulnerabilities/pipelines/github_importer.py +++ b/vulnerabilities/pipelines/github_importer.py @@ -59,6 +59,17 @@ def steps(cls): # "GO": "golang", } + @classmethod + def get_advisory_id(cls, aliases: list[str]) -> str: + """ + Return the Advisory ID for the given aliases. 
+ """ + # return GHSA ID from the aliases + for alias in aliases: + if alias.startswith("GHSA-"): + return alias + return cls.get_cve_id(aliases) + def advisories_count(self): advisory_query = """ query{ diff --git a/vulnerabilities/pipelines/gitlab_importer.py b/vulnerabilities/pipelines/gitlab_importer.py index 4f25c4d94..b9507e4a0 100644 --- a/vulnerabilities/pipelines/gitlab_importer.py +++ b/vulnerabilities/pipelines/gitlab_importer.py @@ -66,6 +66,19 @@ def steps(cls): gitlab_scheme_by_purl_type = {v: k for k, v in purl_type_by_gitlab_scheme.items()} + @classmethod + def get_advisory_id(cls, aliases: list[str]) -> str: + """ + Return the Advisory ID for the given aliases. + """ + for alias in aliases: + if alias.startswith("GMS-"): + return alias + for alias in aliases: + if alias.startswith("CVE-"): + return alias + return None + def clone(self): self.log(f"Cloning `{self.repo_url}`") self.vcs_response = fetch_via_vcs(self.repo_url) diff --git a/vulnerabilities/pipelines/nginx_importer.py b/vulnerabilities/pipelines/nginx_importer.py index c5e017033..e1aae0c61 100644 --- a/vulnerabilities/pipelines/nginx_importer.py +++ b/vulnerabilities/pipelines/nginx_importer.py @@ -46,6 +46,13 @@ def fetch(self): self.log(f"Fetch `{self.url}`") self.advisory_data = requests.get(self.url).text + @classmethod + def get_advisory_id(cls, aliases: list[str]) -> str: + """ + Return the Advisory ID for the given aliases. + """ + return cls.get_cve_id(aliases) + def advisories_count(self): return self.advisory_data.count("
") diff --git a/vulnerabilities/pipelines/npm_importer.py b/vulnerabilities/pipelines/npm_importer.py index 7b6d3aba2..d4d26c6ee 100644 --- a/vulnerabilities/pipelines/npm_importer.py +++ b/vulnerabilities/pipelines/npm_importer.py @@ -48,6 +48,13 @@ def steps(cls): cls.clean_downloads, ) + @classmethod + def get_advisory_id(cls, aliases: list[str]) -> str: + """ + Return the Advisory ID for the given aliases. + """ + return cls.get_cve_id(aliases) + def clone(self): self.log(f"Cloning `{self.repo_url}`") self.vcs_response = fetch_via_vcs(self.repo_url) diff --git a/vulnerabilities/pipelines/nvd_importer.py b/vulnerabilities/pipelines/nvd_importer.py index 645b9f442..41302b3f5 100644 --- a/vulnerabilities/pipelines/nvd_importer.py +++ b/vulnerabilities/pipelines/nvd_importer.py @@ -68,6 +68,13 @@ class NVDImporterPipeline(VulnerableCodeBaseImporterPipeline): """ importer_name = "NVD Importer" + @classmethod + def get_advisory_id(cls, aliases: list[str]) -> str: + """ + Return the Advisory ID for the given aliases. + """ + return cls.get_cve_id(aliases) + @classmethod def steps(cls): return ( diff --git a/vulnerabilities/pipelines/pypa_importer.py b/vulnerabilities/pipelines/pypa_importer.py index aebafacf4..520cb8655 100644 --- a/vulnerabilities/pipelines/pypa_importer.py +++ b/vulnerabilities/pipelines/pypa_importer.py @@ -37,6 +37,16 @@ def steps(cls): cls.clean_downloads, ) + @classmethod + def get_advisory_id(cls, aliases: list[str]) -> str: + """ + Return the Advisory ID for the given aliases. 
+ """ + for alias in aliases: + if alias.lower().startswith("pysec-"): + return alias + return cls.get_cve_id(aliases) + def clone(self): self.log(f"Cloning `{self.repo_url}`") self.vcs_response = fetch_via_vcs(self.repo_url) diff --git a/vulnerabilities/pipelines/pysec_importer.py b/vulnerabilities/pipelines/pysec_importer.py index 32a9fd896..d9611c621 100644 --- a/vulnerabilities/pipelines/pysec_importer.py +++ b/vulnerabilities/pipelines/pysec_importer.py @@ -36,6 +36,16 @@ def steps(cls): cls.import_new_advisories, ) + @classmethod + def get_advisory_id(cls, aliases: list[str]) -> str: + """ + Return the Advisory ID for the given aliases. + """ + for alias in aliases: + if alias.startswith("PYSEC-"): + return alias + return cls.get_cve_id(aliases) + def fetch_zip(self): self.log(f"Fetching `{self.url}`") self.advisory_zip = requests.get(self.url).content diff --git a/vulnerabilities/tests/test_add_advisory_pipeline.py b/vulnerabilities/tests/test_add_advisory_pipeline.py new file mode 100644 index 000000000..1c0fb4220 --- /dev/null +++ b/vulnerabilities/tests/test_add_advisory_pipeline.py @@ -0,0 +1,34 @@ +from datetime import datetime + +import pytest + +from vulnerabilities.importers import IMPORTERS_REGISTRY +from vulnerabilities.importers import nvd_importer +from vulnerabilities.models import Advisory +from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipeline +from vulnerabilities.pipelines import add_advisory_id +from vulnerabilities.pipes.advisory import get_or_create_aliases + + +@pytest.mark.django_db +class TestAddAdvisoryPipeline: + def test_add_advisory_id(self): + for importer in IMPORTERS_REGISTRY.values(): + if issubclass(importer, VulnerableCodeBaseImporterPipeline): + created_by = importer.pipeline_id + else: + created_by = importer.qualified_name + aliases = get_or_create_aliases(["CVE-2021-1234"]) + advisory = Advisory.objects.create( + unique_content_id="test-unique-content-id1", + created_by=created_by, + summary="TEST", + 
date_collected=datetime.now(), + url="https://test.com/source", + advisory_id="TEST", + ) + advisory.aliases.add(*aliases) + add_advisory_id.AddAdvisoryID().add_advisory_id() + advisory.refresh_from_db() + assert advisory.advisory_id == "CVE-2021-1234" + assert advisory.aliases.count() == 0