Skip to content

Add pipeline for adding advisory ID and tests #1833

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions vulnerabilities/importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,24 @@ class Importer:
vcs_response: VCSResponse = None
# It needs to be unique and immutable
importer_name = ""
requires_reference_for_advisory_id = False

@classmethod
def get_advisory_id(cls, aliases: list[str]) -> str:
"""
Return the Advisory ID for the given aliases.
"""
raise NotImplementedError

@classmethod
def get_cve_id(cls, aliases: list[str]) -> str:
"""
Return the CVE ID for the given aliases.
"""
for alias in aliases:
if alias.startswith("CVE-"):
return alias
return None

def __init__(self):
if not self.spdx_license_expression:
Expand Down
7 changes: 7 additions & 0 deletions vulnerabilities/importers/apache_httpd.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ class ApacheHTTPDImporter(Importer):
license_url = "https://www.apache.org/licenses/LICENSE-2.0"
importer_name = "Apache HTTPD Importer"

@classmethod
def get_advisory_id(cls, aliases: list[str]) -> str:
"""
Return the Advisory ID for the given aliases.
"""
return cls.get_cve_id(aliases)

def advisory_data(self):
links = fetch_links(self.base_url)
for link in links:
Expand Down
7 changes: 7 additions & 0 deletions vulnerabilities/importers/apache_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@ def fetch_advisory_page(self):
page = requests.get(self.GH_PAGE_URL)
return page.content

@classmethod
def get_advisory_id(cls, aliases: list[str]) -> str:
"""
Return the Advisory ID for the given aliases.
"""
return cls.get_cve_id(aliases)

def advisory_data(self):
advisory_page = self.fetch_advisory_page(self)

Expand Down
7 changes: 7 additions & 0 deletions vulnerabilities/importers/apache_tomcat.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,13 @@ class ApacheTomcatImporter(Importer):
license_url = "https://www.apache.org/licenses/LICENSE-2.0"
importer_name = "Apache Tomcat Importer"

@classmethod
def get_advisory_id(cls, aliases: list[str]) -> str:
"""
Return the Advisory ID for the given aliases.
"""
return cls.get_cve_id(aliases)

def fetch_advisory_pages(self):
"""
Yield the content of each HTML page containing version-related security data.
Expand Down
11 changes: 11 additions & 0 deletions vulnerabilities/importers/archlinux.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,17 @@ class ArchlinuxImporter(Importer):
spdx_license_expression = "MIT"
license_url = "https://github.com/archlinux/arch-security-tracker/blob/master/LICENSE"
importer_name = "Arch Linux Importer"
requires_reference_for_advisory_id = True

@classmethod
def get_advisory_id(cls, aliases: list[str], references) -> str:
"""
Return the Advisory ID for the given aliases.
"""
for ref in references:
if ref.get("reference_id").startswith("AVG-"):
return ref.get("reference_id")
return cls.get_cve_id(aliases)

def fetch(self) -> Iterable[Mapping]:
response = fetch_response(self.url)
Expand Down
7 changes: 7 additions & 0 deletions vulnerabilities/importers/curl.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ class CurlImporter(Importer):
importer_name = "Curl Importer"
api_url = "https://curl.se/docs/vuln.json"

@classmethod
def get_advisory_id(cls, aliases: list[str]) -> str:
"""
Return the Advisory ID for the given aliases.
"""
return cls.get_cve_id(aliases)

def fetch(self) -> Iterable[Mapping]:
response = fetch_response(self.api_url)
return response.json()
Expand Down
7 changes: 7 additions & 0 deletions vulnerabilities/importers/debian.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ class DebianImporter(Importer):
api_url = "https://security-tracker.debian.org/tracker/data/json"
importer_name = "Debian Importer"

@classmethod
def get_advisory_id(cls, aliases: list[str]) -> str:
"""
Return the Advisory ID for the given aliases.
"""
return cls.get_cve_id(aliases)

def get_response(self):
response = requests.get(self.api_url)
if response.status_code == 200:
Expand Down
7 changes: 7 additions & 0 deletions vulnerabilities/importers/debian_oval.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ class DebianOvalImporter(OvalImporter):
"""
importer_name = "Debian Oval Importer"

@classmethod
def get_advisory_id(cls, aliases: list[str]) -> str:
"""
Return the Advisory ID for the given aliases.
"""
return cls.get_cve_id(aliases)

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# we could avoid setting translations, and have it
Expand Down
7 changes: 7 additions & 0 deletions vulnerabilities/importers/elixir_security.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ def advisory_data(self) -> Set[AdvisoryData]:
if self.vcs_response:
self.vcs_response.delete()

@classmethod
def get_advisory_id(cls, aliases: list[str]) -> str:
"""
Return the Advisory ID for the given aliases.
"""
return cls.get_cve_id(aliases)

def process_file(self, file, base_path):
relative_path = str(file.relative_to(base_path)).strip("/")
advisory_url = (
Expand Down
7 changes: 7 additions & 0 deletions vulnerabilities/importers/epss.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ class EPSSImporter(Importer):
spdx_license_expression = "unknown"
importer_name = "EPSS Importer"

@classmethod
def get_advisory_id(cls, aliases: list[str]) -> str:
"""
Return the Advisory ID for the given aliases.
"""
return cls.get_cve_id(aliases)

def advisory_data(self) -> Iterable[AdvisoryData]:
response = urllib.request.urlopen(self.advisory_url)
with gzip.open(response, "rb") as f:
Expand Down
10 changes: 10 additions & 0 deletions vulnerabilities/importers/fireeye.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@ class FireyeImporter(Importer):
repo_url = "git+https://github.com/mandiant/Vulnerability-Disclosures"
importer_name = "FireEye Importer"

@classmethod
def get_advisory_id(cls, aliases: list[str]) -> str:
"""
Return the Advisory ID for the given aliases.
"""
for alias in aliases:
if alias.startswith("MNDT-"):
return alias
return cls.get_cve_id(aliases)

def advisory_data(self) -> Iterable[AdvisoryData]:
try:
self.vcs_response = self.clone(repo_url=self.repo_url)
Expand Down
12 changes: 12 additions & 0 deletions vulnerabilities/importers/gentoo.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,18 @@ class GentooImporter(Importer):
# under the [CC-BY-SA-4.0](https://creativecommons.org/licenses/by-sa/4.0/) license.
license_url = "https://creativecommons.org/licenses/by-sa/4.0/"
importer_name = "Gentoo Importer"
requires_reference_for_advisory_id = True

@classmethod
def get_advisory_id(cls, aliases: list[str], references) -> str:
"""
Return the Advisory ID for the given aliases.
"""
for ref in references:
ref_id = ref.get("reference_id")
if ref_id and ref_id.startswith("GLSA-"):
return ref_id
return cls.get_cve_id(aliases)

def advisory_data(self) -> Iterable[AdvisoryData]:
try:
Expand Down
10 changes: 10 additions & 0 deletions vulnerabilities/importers/github_osv.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,16 @@ class GithubOSVImporter(Importer):
repo_url = "git+https://github.com/github/advisory-database/"
importer_name = "GithubOSV Importer"

@classmethod
def get_advisory_id(cls, aliases: list[str]) -> str:
"""
Return the Advisory ID for the given aliases.
"""
for alias in aliases:
if alias.startswith("GHSA"):
return alias
return cls.get_cve_id(aliases)

def advisory_data(self) -> Iterable[AdvisoryData]:
supported_ecosystems = [
"pypi",
Expand Down
12 changes: 12 additions & 0 deletions vulnerabilities/importers/istio.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,18 @@ class IstioImporter(Importer):
license_url = "https://github.com/istio/istio.io/blob/master/LICENSE"
repo_url = "git+https://github.com/istio/istio.io/"
importer_name = "Istio Importer"
requires_reference_for_advisory_id = True

@classmethod
def get_advisory_id(cls, aliases: list[str], references) -> str:
"""
Return the Advisory ID for the given aliases.
"""
for ref in references:
ref_id = ref.get("reference_id")
if ref_id and ref_id.startswith("ISTIO-"):
return ref_id
return cls.get_cve_id(aliases)

def advisory_data(self) -> Set[AdvisoryData]:
try:
Expand Down
12 changes: 12 additions & 0 deletions vulnerabilities/importers/mozilla.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,18 @@ class MozillaImporter(Importer):
license_url = "https://github.com/mozilla/foundation-security-advisories/blob/master/LICENSE"
repo_url = "git+https://github.com/mozilla/foundation-security-advisories/"
importer_name = "Mozilla Importer"
requires_reference_for_advisory_id = True

@classmethod
def get_advisory_id(cls, aliases: list[str], references) -> str:
"""
Return the Advisory ID for the given aliases.
"""
for ref in references:
ref_id = ref.get("reference_id")
if ref_id and ref_id.lower().startswith("mfsa"):
return ref_id
return cls.get_cve_id(aliases)

def advisory_data(self) -> Iterable[AdvisoryData]:
try:
Expand Down
10 changes: 10 additions & 0 deletions vulnerabilities/importers/openssl.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@ class OpensslImporter(Importer):
url = "https://www.openssl.org/news/vulnerabilities.xml"
importer_name = "OpenSSL Importer"

@classmethod
def get_advisory_id(cls, aliases: list[str]) -> str:
"""
Return the Advisory ID for the given aliases.
"""
for alias in aliases:
if alias.startswith("VC-OPENSSL-"):
return alias
return cls.get_cve_id(aliases)

def fetch(self):
response = requests.get(url=self.url)
if not response.status_code == 200:
Expand Down
7 changes: 7 additions & 0 deletions vulnerabilities/importers/oss_fuzz.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ class OSSFuzzImporter(Importer):
url = "git+https://github.com/google/oss-fuzz-vulns"
importer_name = "OSS Fuzz Importer"

@classmethod
def get_advisory_id(cls, aliases: list[str]) -> str:
"""
Return the Advisory ID for the given aliases.
"""
return cls.get_cve_id(aliases)

def advisory_data(self) -> Iterable[AdvisoryData]:
try:
self.clone(repo_url=self.url)
Expand Down
7 changes: 7 additions & 0 deletions vulnerabilities/importers/postgresql.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ class PostgreSQLImporter(Importer):
spdx_license_expression = "PostgreSQL"
importer_name = "PostgreSQL Importer"

@classmethod
def get_advisory_id(cls, aliases: list[str]) -> str:
"""
Return the Advisory ID for the given aliases.
"""
return cls.get_cve_id(aliases)

def advisory_data(self):
known_urls = {self.root_url}
visited_urls = set()
Expand Down
7 changes: 7 additions & 0 deletions vulnerabilities/importers/project_kb_msr2019.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ class ProjectKBMSRImporter(Importer):
license_url = "https://github.com/SAP/project-kb/blob/main/LICENSE.txt"
importer_name = "ProjectKB MSRImporter"

@classmethod
def get_advisory_id(cls, aliases: list[str]) -> str:
"""
Return the Advisory ID for the given aliases.
"""
return cls.get_cve_id(aliases)

def advisory_data(self):
raw_data = fetch_and_read_from_csv(self.url)
yield from self.to_advisories(raw_data)
Expand Down
12 changes: 12 additions & 0 deletions vulnerabilities/importers/redhat.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,18 @@ class RedhatImporter(Importer):
spdx_license_expression = "CC-BY-4.0"
license_url = "https://access.redhat.com/documentation/en-us/red_hat_security_data_api/1.0/html/red_hat_security_data_api/legal-notice"
importer_name = "RedHat Importer"
requires_reference_for_advisory_id = True

@classmethod
def get_advisory_id(cls, aliases: list[str], references) -> str:
"""
Return the Advisory ID for the given aliases.
"""
for ref in references:
ref_id = ref.get("reference_id")
if ref_id and ref_id.lower().startswith("RHSA-"):
return ref_id
return cls.get_cve_id(aliases)

def advisory_data(self) -> Iterable[AdvisoryData]:
for redhat_cves in fetch_cves():
Expand Down
7 changes: 7 additions & 0 deletions vulnerabilities/importers/retiredotnet.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ def advisory_data(self) -> Iterable[AdvisoryData]:
if self.vcs_response:
self.vcs_response.delete()

@classmethod
def get_advisory_id(cls, aliases: list[str]) -> str:
"""
Return the Advisory ID for the given aliases.
"""
return cls.get_cve_id(aliases)

@staticmethod
def vuln_id_from_desc(desc):
cve_regex = re.compile(r"CVE-\d+-\d+")
Expand Down
10 changes: 10 additions & 0 deletions vulnerabilities/importers/ruby.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,16 @@ class RubyImporter(Importer):
SOFTWARE.
"""

@classmethod
def get_advisory_id(cls, aliases: list[str]) -> str:
"""
Return the Advisory ID for the given aliases.
"""
for alias in aliases:
if alias.startswith("GHSA-"):
return alias
return cls.get_cve_id(aliases)

def advisory_data(self) -> Iterable[AdvisoryData]:
try:
self.clone(self.repo_url)
Expand Down
7 changes: 7 additions & 0 deletions vulnerabilities/importers/suse_oval.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.translations = {"less than": "<", "equals": "=", "greater than or equal": ">="}

@classmethod
def get_advisory_id(cls, aliases: list[str]) -> str:
"""
Return the Advisory ID for the given aliases.
"""
return cls.get_cve_id(aliases)

def _fetch(self):
page = requests.get(self.base_url).text
soup = BeautifulSoup(page, "lxml")
Expand Down
7 changes: 7 additions & 0 deletions vulnerabilities/importers/suse_scores.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ class SUSESeverityScoreImporter(Importer):
license_url = "https://ftp.suse.com/pub/projects/security/yaml/LICENSE"
importer_name = "SUSE Severity Score Importer"

@classmethod
def get_advisory_id(cls, aliases: list[str]) -> str:
"""
Return the Advisory ID for the given aliases.
"""
return cls.get_cve_id(aliases)

def advisory_data(self) -> Iterable[AdvisoryData]:
score_data = fetch_yaml(URL)
yield from self.to_advisory(score_data)
Expand Down
7 changes: 7 additions & 0 deletions vulnerabilities/importers/ubuntu.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ class UbuntuImporter(OvalImporter):
"""
importer_name = "Ubuntu OVAL Importer"

@classmethod
def get_advisory_id(cls, aliases: list[str]) -> str:
"""
Return the Advisory ID for the given aliases.
"""
return cls.get_cve_id(aliases)

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# we could avoid setting translations, and have it
Expand Down
Loading
Loading