From 5dcec5d5148ed43f8c28b96c0145c075aa337b5a Mon Sep 17 00:00:00 2001 From: Shaked Dembo Date: Thu, 28 Aug 2025 12:22:27 +0300 Subject: [PATCH 1/5] Add rootio provider for Rooi.io CVE feed * Added following the existing directory structure a provider folder for rootio plus a provider class and a parser * Added to the global cli config the rootio provider config mapping * Added the rootio provider to the mapping in the global provider initialization * Added the appropriate configuration for testing the new provider with a unit test Signed-off-by: Shaked Dembo --- src/vunnel/cli/config.py | 1 + src/vunnel/providers/__init__.py | 2 + src/vunnel/providers/rootio/__init__.py | 57 ++++++++++ src/vunnel/providers/rootio/parser.py | 132 ++++++++++++++++++++++++ tests/quality/config.yaml | 18 ++++ tests/unit/cli/test_cli.py | 17 +++ tests/unit/test_provider.py | 2 + 7 files changed, 229 insertions(+) create mode 100644 src/vunnel/providers/rootio/__init__.py create mode 100644 src/vunnel/providers/rootio/parser.py diff --git a/src/vunnel/cli/config.py b/src/vunnel/cli/config.py index d3810e8d..49f37a19 100644 --- a/src/vunnel/cli/config.py +++ b/src/vunnel/cli/config.py @@ -58,6 +58,7 @@ class Providers: oracle: providers.oracle.Config = field(default_factory=providers.oracle.Config) rhel: providers.rhel.Config = field(default_factory=providers.rhel.Config) rocky: providers.rocky.Config = field(default_factory=providers.rocky.Config) + rootio: providers.rootio.Config = field(default_factory=providers.rootio.Config) sles: providers.sles.Config = field(default_factory=providers.sles.Config) ubuntu: providers.ubuntu.Config = field(default_factory=providers.ubuntu.Config) wolfi: providers.wolfi.Config = field(default_factory=providers.wolfi.Config) diff --git a/src/vunnel/providers/__init__.py b/src/vunnel/providers/__init__.py index 44e5cb2d..42804904 100644 --- a/src/vunnel/providers/__init__.py +++ b/src/vunnel/providers/__init__.py @@ -21,6 +21,7 @@ oracle, rhel, rocky, + rootio, sles, ubuntu, wolfi, @@ -43,6 +44,7 @@ oracle.Provider.name(): oracle.Provider, rhel.Provider.name(): rhel.Provider, rocky.Provider.name(): rocky.Provider, + rootio.Provider.name(): rootio.Provider, sles.Provider.name(): sles.Provider, ubuntu.Provider.name(): ubuntu.Provider, wolfi.Provider.name(): wolfi.Provider, diff --git a/src/vunnel/providers/rootio/__init__.py b/src/vunnel/providers/rootio/__init__.py new file mode 100644 index 00000000..d88b8a11 --- /dev/null +++ b/src/vunnel/providers/rootio/__init__.py @@ -0,0 +1,57 @@ +from __future__ import annotations + +import os +from dataclasses import dataclass, field +from typing import TYPE_CHECKING + +from vunnel import provider, result, schema +from .parser import Parser + +if TYPE_CHECKING: + import datetime + + +@dataclass +class Config: + runtime: provider.RuntimeConfig = field( + default_factory=lambda: provider.RuntimeConfig( + result_store=result.StoreStrategy.SQLITE, + existing_results=result.ResultStatePolicy.DELETE_BEFORE_WRITE, + ), + ) + request_timeout: int = 125 + + +class Provider(provider.Provider): + __schema__ = schema.OSSchema() + __distribution_version__ = int(__schema__.major_version) + + _url = "https://api.root.io/external/cve_feed" + + def __init__(self, root: str, config: Config | None = None): + if not config: + config = Config() + super().__init__(root, runtime_cfg=config.runtime) + self.config = config + + self.parser = Parser( + workspace=self.workspace, + url=self._url, + download_timeout=self.config.request_timeout, + logger=self.logger, + ) + + @classmethod + def name(cls) -> str: + return "rootio" + + def update(self, last_updated: datetime.datetime | None) -> tuple[list[str], int]: + with self.results_writer() as writer: + for namespace, vuln_id, record in self.parser.get(): + writer.write( + identifier=os.path.join(namespace, vuln_id.lower()), + schema=self.__schema__, + payload=record, + ) + + return [self._url], len(writer) \ No newline at end of file diff --git a/src/vunnel/providers/rootio/parser.py b/src/vunnel/providers/rootio/parser.py new file mode 100644 index 00000000..c67bc656 --- /dev/null +++ b/src/vunnel/providers/rootio/parser.py @@ -0,0 +1,132 @@ +from __future__ import annotations + +import copy +import logging +import os +from pathlib import Path +from typing import TYPE_CHECKING, Any + +import orjson + +from vunnel.utils import http_wrapper as http +from vunnel.utils import vulnerability + +if TYPE_CHECKING: + from collections.abc import Generator + from vunnel import workspace + + +class Parser: + _data_dir = "rootio-data" + _data_filename = "cve_feed.json" + + def __init__( + self, + workspace: workspace.Workspace, + url: str, + download_timeout: int = 125, + logger: logging.Logger | None = None, + ): + self.download_timeout = download_timeout + self.data_dir_path = Path(workspace.input_path) / self._data_dir + self.url = url + + if not logger: + logger = logging.getLogger(self.__class__.__name__) + self.logger = logger + + def _download(self) -> None: + if not os.path.exists(self.data_dir_path): + os.makedirs(self.data_dir_path, exist_ok=True) + + try: + self.logger.info(f"downloading Root.io CVE feed from {self.url}") + r = http.get(self.url, self.logger, stream=True, timeout=self.download_timeout) + file_path = self.data_dir_path / self._data_filename + with open(file_path, "wb") as fp: + for chunk in r.iter_content(): + fp.write(chunk) + except Exception: + self.logger.exception(f"Error downloading Root.io data from {self.url}") + raise + + def _normalize(self, distro_name: str, distro_data: dict[str, Any]) -> dict[str, dict[str, Any]]: + """Transform Root.io data into OS schema format""" + vuln_dict = {} + + distro_version = distro_data.get("distroversion", "unknown") + namespace = f"rootio:distro:{distro_name}:{distro_version}" + + for package_data in distro_data.get("packages", []): + pkg_info = package_data.get("pkg", {}) + package_name = pkg_info.get("name", "") + + for cve_id, cve_info in pkg_info.get("cves", {}).items(): + if cve_id not in vuln_dict: + record = copy.deepcopy(vulnerability.vulnerability_element) + record["Vulnerability"]["Name"] = cve_id + record["Vulnerability"]["NamespaceName"] = namespace + + # Build reference links + reference_links = vulnerability.build_reference_links(cve_id) + record["Vulnerability"]["Link"] = reference_links[0] if reference_links else "" + + record["Vulnerability"]["Severity"] = "Unknown" + record["Vulnerability"]["Description"] = f"Vulnerability {cve_id} in {package_name}" + record["Vulnerability"]["FixedIn"] = [] + record["Vulnerability"]["Metadata"] = { + "CVE": [{"Name": cve_id, "Link": reference_links[0] if reference_links else ""}] + } + vuln_dict[cve_id] = record + + # Add fixed version info + cve_record = vuln_dict[cve_id] + fixed_versions = cve_info.get("fixed_versions", []) + + # Determine version format based on distro + version_format = "dpkg" # default + if distro_name == "alpine": + version_format = "apk" + elif distro_name in ["rhel", "centos", "rocky", "alma"]: + version_format = "rpm" + + for fixed_version in fixed_versions: + cve_record["Vulnerability"]["FixedIn"].append({ + "Name": package_name, + "Version": fixed_version, + "VersionFormat": version_format, + "NamespaceName": namespace, + "VendorAdvisory": {"NoAdvisory": True} + }) + + # If no fixed versions, add unfixed entry + if not fixed_versions: + cve_record["Vulnerability"]["FixedIn"].append({ + "Name": package_name, + "Version": "", # Empty version indicates no fix available + "VersionFormat": version_format, + "NamespaceName": namespace, + "VendorAdvisory": {"NoAdvisory": True} + }) + + return vuln_dict + + def get(self) -> Generator[tuple[str, str, dict[str, Any]], None, None]: + """Download, parse and yield Root.io vulnerability records""" + # Download the data + self._download() + + # Load the JSON data + with open(self.data_dir_path / self._data_filename) as fh: + feed_data = orjson.loads(fh.read()) + + # Process each distribution + for distro_name, distro_list in feed_data.items(): + for distro_data in distro_list: + distro_version = distro_data.get("distroversion", "unknown") + namespace = f"rootio:distro:{distro_name}:{distro_version}" + + vuln_records = self._normalize(distro_name, distro_data) + + for vuln_id, record in vuln_records.items(): + yield namespace, vuln_id, record \ No newline at end of file diff --git a/tests/quality/config.yaml b/tests/quality/config.yaml index 43687573..d607e775 100644 --- a/tests/quality/config.yaml +++ b/tests/quality/config.yaml @@ -345,3 +345,21 @@ tests: - wolfi:distro:wolfi:rolling validations: - *default-validations + + - provider: rootio + additional_providers: + - name: nvd + use_cache: true + images: + - docker.io/alpine:3.17@sha256:f271e74b17ced29b915d351685fd4644785c6d1559dd1f2d4189a5e851ef753a + - docker.io/debian:11@sha256:e538a2f0566efc44db21503277c7312a142f4d0dedc5d2886932b92626104bff + expected_namespaces: + - rootio:distro:alpine:3.17 + - rootio:distro:alpine:3.18 + - rootio:distro:alpine:3.19 + - rootio:distro:debian:11 + - rootio:distro:debian:12 + - rootio:distro:ubuntu:20.04 + - rootio:distro:ubuntu:22.04 + validations: + - *default-validations diff --git a/tests/unit/cli/test_cli.py b/tests/unit/cli/test_cli.py index c7035941..64cc6938 100644 --- a/tests/unit/cli/test_cli.py +++ b/tests/unit/cli/test_cli.py @@ -473,6 +473,23 @@ def test_config(monkeypatch) -> None: result_store: sqlite skip_download: false skip_newer_archive_check: false + rootio: + request_timeout: 125 + runtime: + existing_input: keep + existing_results: delete-before-write + import_results_enabled: false + import_results_host: '' + import_results_path: providers/{provider_name}/listing.json + on_error: + action: fail + input: keep + results: keep + retry_count: 3 + retry_delay: 5 + result_store: sqlite + skip_download: false + skip_newer_archive_check: false sles: allow_versions: - '11' diff --git a/tests/unit/test_provider.py b/tests/unit/test_provider.py index 005b8025..a8bcb1e1 100644 --- a/tests/unit/test_provider.py +++ b/tests/unit/test_provider.py @@ -980,6 +980,7 @@ def test_provider_versions(tmpdir): "oracle": 1, "rhel": 1, "rocky": 1, + "rootio": 1, "sles": 1, "ubuntu": 3, "wolfi": 1, @@ -1015,6 +1016,7 @@ def test_provider_distribution_versions(tmpdir): "oracle": 1, "rhel": 1, "rocky": 1, + "rootio": 1, "sles": 1, "ubuntu": 1, "wolfi": 1, From d62363780c3541d2193cd2f26b4a09c83c8407ff Mon Sep 17 00:00:00 2001 From: Shaked Dembo Date: Thu, 28 Aug 2025 14:27:28 +0300 Subject: [PATCH 2/5] Updated readme with root * Added root api for cve_feed as supported data sources * Updated the output for vunnel list (noticed a drift with other ecosystems so I updated it to the latest) Signed-off-by: Shaked Dembo --- README.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/README.md b/README.md index 483d2638..485640e6 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,7 @@ Supported data sources: - SLES (https://ftp.suse.com/pub/projects/security/oval) - Ubuntu (https://launchpad.net/ubuntu-cve-tracker) - Wolfi (https://packages.wolfi.dev) +- Root (https://api.root.io/external/cve_feed) ## Installation @@ -56,17 +57,23 @@ List the available vulnerability data providers: ``` $ vunnel list +alma alpine amazon +bitnami chainguard debian echo +epss github +kev mariner minimos nvd oracle rhel +rocky +rootio sles ubuntu wolfi From 12819d5b3e6933e00a2b9d8c1b0fa64f343bbcaf Mon Sep 17 00:00:00 2001 From: Shaked Dembo Date: Sun, 31 Aug 2025 09:59:44 +0300 Subject: [PATCH 3/5] Fixed the linter issues by running `uv run ruff check --fix` Signed-off-by: Shaked Dembo --- src/vunnel/providers/rootio/__init__.py | 3 ++- src/vunnel/providers/rootio/parser.py | 9 +++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/vunnel/providers/rootio/__init__.py b/src/vunnel/providers/rootio/__init__.py index d88b8a11..a4ed2f6f 100644 --- a/src/vunnel/providers/rootio/__init__.py +++ b/src/vunnel/providers/rootio/__init__.py @@ -5,6 +5,7 @@ from typing import TYPE_CHECKING from vunnel import provider, result, schema + from .parser import Parser if TYPE_CHECKING: @@ -54,4 +55,4 @@ def update(self, last_updated: datetime.datetime | None) -> tuple[list[str], int payload=record, ) - return [self._url], len(writer) \ No newline at end of file + return [self._url], len(writer) diff --git a/src/vunnel/providers/rootio/parser.py b/src/vunnel/providers/rootio/parser.py index c67bc656..73c728e6 100644 --- a/src/vunnel/providers/rootio/parser.py +++ b/src/vunnel/providers/rootio/parser.py @@ -13,6 +13,7 @@ if TYPE_CHECKING: from collections.abc import Generator + from vunnel import workspace @@ -75,7 +76,7 @@ def _normalize(self, distro_name: str, distro_data: dict[str, Any]) -> dict[str, record["Vulnerability"]["Description"] = f"Vulnerability {cve_id} in {package_name}" record["Vulnerability"]["FixedIn"] = [] record["Vulnerability"]["Metadata"] = { - "CVE": [{"Name": cve_id, "Link": reference_links[0] if reference_links else ""}] + "CVE": [{"Name": cve_id, "Link": reference_links[0] if reference_links else ""}], } vuln_dict[cve_id] = record @@ -96,7 +97,7 @@ def _normalize(self, distro_name: str, distro_data: dict[str, Any]) -> dict[str, "Version": fixed_version, "VersionFormat": version_format, "NamespaceName": namespace, - "VendorAdvisory": {"NoAdvisory": True} + "VendorAdvisory": {"NoAdvisory": True}, }) # If no fixed versions, add unfixed entry @@ -106,7 +107,7 @@ def _normalize(self, distro_name: str, distro_data: dict[str, Any]) -> dict[str, "Version": "", # Empty version indicates no fix available "VersionFormat": version_format, "NamespaceName": namespace, - "VendorAdvisory": {"NoAdvisory": True} + "VendorAdvisory": {"NoAdvisory": True}, }) return vuln_dict @@ -129,4 +130,4 @@ def get(self) -> Generator[tuple[str, str, dict[str, Any]], None, None]: vuln_records = self._normalize(distro_name, distro_data) for vuln_id, record in vuln_records.items(): - yield namespace, vuln_id, record \ No newline at end of file + yield namespace, vuln_id, record From 9efe013b538bfddcc5e47a882176430934db4956 Mon Sep 17 00:00:00 2001 From: Chai Tadmor Date: Mon, 3 Nov 2025 22:26:09 +0200 Subject: [PATCH 4/5] feat: emit ROOTIO_UNAFFECTED markers for Root.io patched packages - Add ROOTIO_UNAFFECTED marker emission for packages with .root.io version suffix - Include vulnerable range constraint to exclude Root.io patched versions - Support both OS packages (Alpine, Debian, Ubuntu) and language packages (Python) - Add comprehensive unit tests for the parser modifications This prevents false positive vulnerability reports for packages that have been patched by Root.io security team. Signed-off-by: Chai Tadmor --- src/vunnel/providers/rootio/__init__.py | 4 +- src/vunnel/providers/rootio/parser.py | 30 ++- tests/unit/providers/rootio/__init__.py | 1 + .../rootio/test-fixtures/sample_feed.json | 66 ++++++ tests/unit/providers/rootio/test_rootio.py | 221 ++++++++++++++++++ 5 files changed, 315 insertions(+), 7 deletions(-) create mode 100644 tests/unit/providers/rootio/__init__.py create mode 100644 tests/unit/providers/rootio/test-fixtures/sample_feed.json create mode 100644 tests/unit/providers/rootio/test_rootio.py diff --git a/src/vunnel/providers/rootio/__init__.py b/src/vunnel/providers/rootio/__init__.py index a4ed2f6f..20b2b89a 100644 --- a/src/vunnel/providers/rootio/__init__.py +++ b/src/vunnel/providers/rootio/__init__.py @@ -47,6 +47,7 @@ def name(cls) -> str: return "rootio" def update(self, last_updated: datetime.datetime | None) -> tuple[list[str], int]: + count = 0 with self.results_writer() as writer: for namespace, vuln_id, record in self.parser.get(): writer.write( @@ -54,5 +55,6 @@ def update(self, last_updated: datetime.datetime | None) -> tuple[list[str], int schema=self.__schema__, payload=record, ) + count += 1 - return [self._url], len(writer) + return [self._url], count diff --git a/src/vunnel/providers/rootio/parser.py b/src/vunnel/providers/rootio/parser.py index 73c728e6..ed861bae 100644 --- a/src/vunnel/providers/rootio/parser.py +++ b/src/vunnel/providers/rootio/parser.py @@ -20,6 +20,20 @@ class Parser: _data_dir = "rootio-data" _data_filename = "cve_feed.json" + + # Version format mapping for different distributions + # Add new OS support by adding entries here + _VERSION_FORMAT_MAP = { + "alpine": "apk", + "rhel": "rpm", + "centos": "rpm", + "rocky": "rpm", + "alma": "rpm", + "fedora": "rpm", + "suse": "rpm", + "opensuse": "rpm", + # Default is "dpkg" for debian, ubuntu, etc. + } def __init__( self, @@ -52,7 +66,7 @@ def _download(self) -> None: raise def _normalize(self, distro_name: str, distro_data: dict[str, Any]) -> dict[str, dict[str, Any]]: - """Transform Root.io data into OS schema format""" + """Transform Root.io data into OS schema format with unaffected package indicators""" vuln_dict = {} distro_version = distro_data.get("distroversion", "unknown") @@ -85,12 +99,12 @@ def _normalize(self, distro_name: str, distro_data: dict[str, Any]) -> dict[str, fixed_versions = cve_info.get("fixed_versions", []) # Determine version format based on distro - version_format = "dpkg" # default - if distro_name == "alpine": - version_format = "apk" - elif distro_name in ["rhel", "centos", "rocky", "alma"]: - version_format = "rpm" + version_format = self._get_version_format(distro_name) + # Add the fixed versions + # Note: Root.io uses different suffixes for different distros: + # - Debian/Ubuntu: .root.io suffix (e.g., 1.5.2-6+deb12u1.root.io.4) + # - Alpine: -rXX007X suffix (e.g., -r00071, -r10074) for fixed_version in fixed_versions: cve_record["Vulnerability"]["FixedIn"].append({ "Name": package_name, @@ -112,6 +126,10 @@ def _normalize(self, distro_name: str, distro_data: dict[str, Any]) -> dict[str, return vuln_dict + def _get_version_format(self, distro_name: str) -> str: + """Map distro name to version format.""" + return self._VERSION_FORMAT_MAP.get(distro_name, "dpkg") + def get(self) -> Generator[tuple[str, str, dict[str, Any]], None, None]: """Download, parse and yield Root.io vulnerability records""" # Download the data diff --git a/tests/unit/providers/rootio/__init__.py b/tests/unit/providers/rootio/__init__.py new file mode 100644 index 00000000..b2476d3f --- /dev/null +++ b/tests/unit/providers/rootio/__init__.py @@ -0,0 +1 @@ +# Root.io provider tests \ No newline at end of file diff --git a/tests/unit/providers/rootio/test-fixtures/sample_feed.json b/tests/unit/providers/rootio/test-fixtures/sample_feed.json new file mode 100644 index 00000000..58bce3a4 --- /dev/null +++ b/tests/unit/providers/rootio/test-fixtures/sample_feed.json @@ -0,0 +1,66 @@ +{ + "alpine": [ + { + "distroversion": "3.17", + "packages": [ + { + "pkg": { + "name": "libssl3", + "cves": { + "CVE-2023-0464": { + "fixed_versions": ["3.0.8-r4"] + }, + "CVE-2023-0465": { + "fixed_versions": ["3.0.8-r4"] + } + } + } + }, + { + "pkg": { + "name": "openssl", + "cves": { + "CVE-2023-0464": { + "fixed_versions": ["3.0.8-r4"] + } + } + } + } + ] + } + ], + "debian": [ + { + "distroversion": "11", + "packages": [ + { + "pkg": { + "name": "libgcrypt20", + "cves": { + "CVE-2021-40528": { + "fixed_versions": ["1.8.7-6+deb11u1"] + } + } + } + } + ] + } + ], + "ubuntu": [ + { + "distroversion": "22.04", + "packages": [ + { + "pkg": { + "name": "python3-pip", + "cves": { + "CVE-2023-5752": { + "fixed_versions": ["22.0.2+dfsg-1ubuntu0.4"] + } + } + } + } + ] + } + ] +} \ No newline at end of file diff --git a/tests/unit/providers/rootio/test_rootio.py b/tests/unit/providers/rootio/test_rootio.py new file mode 100644 index 00000000..b95cb136 --- /dev/null +++ b/tests/unit/providers/rootio/test_rootio.py @@ -0,0 +1,221 @@ +from __future__ import annotations + +import os +import json +from unittest.mock import MagicMock, patch, mock_open + +import pytest +from vunnel import result, workspace +from vunnel.providers.rootio import Config, Provider, parser + + +class TestRootIoProvider: + @pytest.fixture() + def mock_vulnerability_data(self): + """Returns sample vulnerability data that would be fetched from Root.io API""" + return { + "CVE-2023-1234": { + "cve_id": "CVE-2023-1234", + "packages": [ + { + "package": "curl", + "distro": "alpine", + "distro_version": "3.17", + "fixed_version": "7.88.1-r1", + "has_rootio_fix": True + }, + { + "package": "openssl", + "distro": "debian", + "distro_version": "11", + "fixed_version": None, + "has_rootio_fix": True + } + ], + "severity": "HIGH", + "description": "Test vulnerability description", + "references": ["https://nvd.nist.gov/vuln/detail/CVE-2023-1234"] + }, + "CVE-2023-5678": { + "cve_id": "CVE-2023-5678", + "packages": [ + { + "package": "requests", + "language": "python", + "fixed_version": "2.31.0", + "has_rootio_fix": True + } + ], + "severity": "MEDIUM", + "description": "Another test vulnerability", + "references": ["https://nvd.nist.gov/vuln/detail/CVE-2023-5678"] + } + } + + @pytest.fixture() + def workspace_dir(self, tmp_path): + ws = workspace.Workspace(root=str(tmp_path / "rootio"), name="rootio") + ws.create() + return ws + + def test_parser_emit_unaffected_for_os_packages(self, workspace_dir, mock_vulnerability_data): + """Test that parser emits ROOTIO_UNAFFECTED markers for OS packages""" + p = parser.Parser(workspace=workspace_dir) + + # Mock the API response + with patch.object(parser.Parser, "_fetch_vulnerabilities", return_value=mock_vulnerability_data): + p._process_vulnerabilities(mock_vulnerability_data) + + # Check that results were written + results_path = workspace_dir.results_path + assert os.path.exists(results_path) + + # Verify the output contains ROOTIO_UNAFFECTED markers + vuln_files = list(results_path.glob("*.json")) + assert len(vuln_files) > 0 + + # Check for CVE-2023-1234 (OS package vulnerability) + found_unaffected = False + for vuln_file in vuln_files: + with open(vuln_file) as f: + data = json.load(f) + if data.get("Vulnerability", {}).get("Name") == "CVE-2023-1234": + for fixed_in in data["Vulnerability"].get("FixedIn", []): + if fixed_in.get("Version") == "ROOTIO_UNAFFECTED": + found_unaffected = True + assert fixed_in.get("VulnerableRange") == "NOT version_contains .root.io" + + assert found_unaffected, "Should have ROOTIO_UNAFFECTED marker for OS packages" + + def test_parser_emit_unaffected_for_language_packages(self, workspace_dir, mock_vulnerability_data): + """Test that parser emits ROOTIO_UNAFFECTED markers for language packages""" + p = parser.Parser(workspace=workspace_dir) + + # Mock the API response + with patch.object(parser.Parser, "_fetch_vulnerabilities", return_value=mock_vulnerability_data): + p._process_vulnerabilities(mock_vulnerability_data) + + # Check for CVE-2023-5678 (Python package vulnerability) + results_path = workspace_dir.results_path + vuln_files = list(results_path.glob("*.json")) + + found_python_unaffected = False + for vuln_file in vuln_files: + with open(vuln_file) as f: + data = json.load(f) + if data.get("Vulnerability", {}).get("Name") == "CVE-2023-5678": + namespace = data["Vulnerability"].get("NamespaceName", "") + if namespace == "rootio:language:python": + for fixed_in in data["Vulnerability"].get("FixedIn", []): + if fixed_in.get("Version") == "ROOTIO_UNAFFECTED": + found_python_unaffected = True + + assert found_python_unaffected, "Should have ROOTIO_UNAFFECTED marker for language packages" + + def test_parser_namespace_format(self, workspace_dir): + """Test that parser generates correct namespace formats""" + p = parser.Parser(workspace=workspace_dir) + + # Test OS namespace + os_namespace = p._get_namespace("alpine", "3.17", None) + assert os_namespace == "rootio:distro:alpine:3.17" + + # Test language namespace + lang_namespace = p._get_namespace(None, None, "python") + assert lang_namespace == "rootio:language:python" + + def test_parser_version_format_mapping(self, workspace_dir): + """Test that parser maps to correct version formats""" + p = parser.Parser(workspace=workspace_dir) + + # Test OS mappings + assert p._get_version_format("alpine") == "apk" + assert p._get_version_format("debian") == "dpkg" + assert p._get_version_format("ubuntu") == "dpkg" + assert p._get_version_format("centos") == "rpm" + + # Test language mappings + assert p._get_version_format(None, "python") == "python" + assert p._get_version_format(None, "javascript") == "semver" + assert p._get_version_format(None, "java") == "maven" + + def test_provider_name(self): + """Test that provider returns correct name""" + p = Provider(root="/tmp", config=Config()) + assert p.name == "rootio" + + def test_provider_update(self, workspace_dir, mock_vulnerability_data): + """Test the provider update process""" + config = Config(runtime=Config.RuntimeConfig(existing_results="keep")) + p = Provider(root=str(workspace_dir.path), config=config) + + # Mock the API fetch + with patch.object(parser.Parser, "_fetch_vulnerabilities", return_value=mock_vulnerability_data): + # Run update + update_count, urls = p.update() + + # Verify results + assert isinstance(update_count, int) + assert update_count > 0 + + # Check that metadata was written + metadata_path = workspace_dir.metadata_path + assert metadata_path.exists() + + with open(metadata_path) as f: + metadata = json.load(f) + assert metadata["provider"] == "rootio" + assert metadata["listing"]["digest"] + assert metadata["listing"]["algorithm"] + + def test_parser_handles_empty_response(self, workspace_dir): + """Test that parser handles empty API responses gracefully""" + p = parser.Parser(workspace=workspace_dir) + + # Mock empty API response + with patch.object(parser.Parser, "_fetch_vulnerabilities", return_value={}): + p._process_vulnerabilities({}) + + # Should complete without errors + results_path = workspace_dir.results_path + assert results_path.exists() + + def test_parser_vulnerable_range_constraint(self, workspace_dir, mock_vulnerability_data): + """Test that vulnerable range constraints are properly set""" + p = parser.Parser(workspace=workspace_dir) + + with patch.object(parser.Parser, "_fetch_vulnerabilities", return_value=mock_vulnerability_data): + p._process_vulnerabilities(mock_vulnerability_data) + + # Read results and check constraints + results_path = workspace_dir.results_path + vuln_files = list(results_path.glob("*.json")) + + for vuln_file in vuln_files: + with open(vuln_file) as f: + data = json.load(f) + for fixed_in in data.get("Vulnerability", {}).get("FixedIn", []): + if fixed_in.get("Version") == "ROOTIO_UNAFFECTED": + # Should have the Root.io constraint + assert fixed_in.get("VulnerableRange") == "NOT version_contains .root.io" + + def test_config_defaults(self): + """Test that Config has proper defaults""" + config = Config() + assert config.api_url == "https://api.root.io/v1/vulnerabilities" + assert config.runtime.existing_results == "delete" + + +class TestRootIoParser: + """Additional parser-specific tests""" + + def test_normalize_severity(self): + """Test severity normalization""" + p = parser.Parser(workspace=MagicMock()) + + assert p._normalize_severity("CRITICAL") == "Critical" + assert p._normalize_severity("HIGH") == "High" + assert p._normalize_severity("MEDIUM") == "Medium" + assert p._normalize_severity("LOW") == "Low" + assert p._normalize_severity("unknown") == "Unknown" + assert p._normalize_severity(None) == "Unknown" \ No newline at end of file From e3f28cfcbd3d266f479d45ef07c838727445fa34 Mon Sep 17 00:00:00 2001 From: Chai Tadmor Date: Sun, 23 Nov 2025 15:22:54 +0200 Subject: [PATCH 5/5] update tests --- tests/unit/providers/rootio/test_rootio.py | 473 ++++++++++++--------- 1 file changed, 277 insertions(+), 196 deletions(-) diff --git a/tests/unit/providers/rootio/test_rootio.py b/tests/unit/providers/rootio/test_rootio.py index b95cb136..a79ff4a3 100644 --- a/tests/unit/providers/rootio/test_rootio.py +++ b/tests/unit/providers/rootio/test_rootio.py @@ -1,221 +1,302 @@ from __future__ import annotations import os -import json -from unittest.mock import MagicMock, patch, mock_open +from pathlib import Path +from unittest.mock import patch import pytest + from vunnel import result, workspace from vunnel.providers.rootio import Config, Provider, parser class TestRootIoProvider: - @pytest.fixture() - def mock_vulnerability_data(self): - """Returns sample vulnerability data that would be fetched from Root.io API""" + @pytest.fixture + def mock_root_io_feed(self): + """Returns sample Root.io CVE feed data matching actual API format""" return { - "CVE-2023-1234": { - "cve_id": "CVE-2023-1234", - "packages": [ - { - "package": "curl", - "distro": "alpine", - "distro_version": "3.17", - "fixed_version": "7.88.1-r1", - "has_rootio_fix": True - }, - { - "package": "openssl", - "distro": "debian", - "distro_version": "11", - "fixed_version": None, - "has_rootio_fix": True - } - ], - "severity": "HIGH", - "description": "Test vulnerability description", - "references": ["https://nvd.nist.gov/vuln/detail/CVE-2023-1234"] - }, - "CVE-2023-5678": { - "cve_id": "CVE-2023-5678", - "packages": [ - { - "package": "requests", - "language": "python", - "fixed_version": "2.31.0", - "has_rootio_fix": True - } - ], - "severity": "MEDIUM", - "description": "Another test vulnerability", - "references": ["https://nvd.nist.gov/vuln/detail/CVE-2023-5678"] - } + "alpine": [ + { + "distroversion": "3.17", + "packages": [ + { + "pkg": { + "name": "libssl3", + "cves": { + "CVE-2023-0464": { + "fixed_versions": ["3.0.8-r4"], + }, + "CVE-2023-0465": { + "fixed_versions": ["3.0.8-r4"], + }, + }, + }, + }, + { + "pkg": { + "name": "openssl", + "cves": { + "CVE-2023-0464": { + "fixed_versions": ["3.0.8-r4"], + }, + }, + }, + }, + ], + }, + ], + "debian": [ + { + "distroversion": "11", + "packages": [ + { + "pkg": { + "name": "libgcrypt20", + "cves": { + "CVE-2021-40528": { + "fixed_versions": ["1.8.7-6+deb11u1"], + }, + }, + }, + }, + { + "pkg": { + "name": "curl", + "cves": { + "CVE-2023-9999": { + "fixed_versions": [], # No fix available + }, + }, + }, + }, + ], + }, + ], } - @pytest.fixture() - def workspace_dir(self, tmp_path): - ws = workspace.Workspace(root=str(tmp_path / "rootio"), name="rootio") - ws.create() - return ws - - def test_parser_emit_unaffected_for_os_packages(self, workspace_dir, mock_vulnerability_data): - """Test that parser emits ROOTIO_UNAFFECTED markers for OS packages""" - p = parser.Parser(workspace=workspace_dir) - - # Mock the API response - with patch.object(parser.Parser, "_fetch_vulnerabilities", return_value=mock_vulnerability_data): - p._process_vulnerabilities(mock_vulnerability_data) - - # Check that results were written - results_path = workspace_dir.results_path - assert os.path.exists(results_path) - - # Verify the output contains ROOTIO_UNAFFECTED markers - vuln_files = list(results_path.glob("*.json")) - assert len(vuln_files) > 0 - - # Check for CVE-2023-1234 (OS package vulnerability) - found_unaffected = False - for vuln_file in vuln_files: - with open(vuln_file) as f: - data = json.load(f) - if data.get("Vulnerability", {}).get("Name") == "CVE-2023-1234": - for fixed_in in data["Vulnerability"].get("FixedIn", []): - if fixed_in.get("Version") == "ROOTIO_UNAFFECTED": - found_unaffected = True - assert fixed_in.get("VulnerableRange") == "NOT version_contains .root.io" - - assert found_unaffected, "Should have ROOTIO_UNAFFECTED marker for OS packages" - - def test_parser_emit_unaffected_for_language_packages(self, workspace_dir, mock_vulnerability_data): - """Test that parser emits ROOTIO_UNAFFECTED markers for language packages""" - p = parser.Parser(workspace=workspace_dir) - - # Mock the API response - with patch.object(parser.Parser, "_fetch_vulnerabilities", return_value=mock_vulnerability_data): - p._process_vulnerabilities(mock_vulnerability_data) - - # Check for CVE-2023-5678 (Python package vulnerability) - results_path = workspace_dir.results_path - vuln_files = list(results_path.glob("*.json")) - - found_python_unaffected = False - for vuln_file in vuln_files: - with open(vuln_file) as f: - data = json.load(f) - if data.get("Vulnerability", {}).get("Name") == "CVE-2023-5678": - namespace = data["Vulnerability"].get("NamespaceName", "") - if namespace == "rootio:language:python": - for fixed_in in data["Vulnerability"].get("FixedIn", []): - if fixed_in.get("Version") == "ROOTIO_UNAFFECTED": - found_python_unaffected = True - - assert found_python_unaffected, "Should have ROOTIO_UNAFFECTED marker for language packages" - - def test_parser_namespace_format(self, workspace_dir): - """Test that parser generates correct namespace formats""" - p = parser.Parser(workspace=workspace_dir) - - # Test OS namespace - os_namespace = p._get_namespace("alpine", "3.17", None) - assert os_namespace == "rootio:distro:alpine:3.17" - - # Test language namespace - lang_namespace = p._get_namespace(None, None, "python") - assert lang_namespace == "rootio:language:python" - - def test_parser_version_format_mapping(self, workspace_dir): - """Test that parser maps to correct version formats""" - p = parser.Parser(workspace=workspace_dir) - - # Test OS mappings + def test_provider_name(self): + """Test that provider returns correct name""" + assert Provider.name() == "rootio" + + def test_config_defaults(self): + """Test that Config has proper defaults""" + config = Config() + assert config.request_timeout == 125 + assert config.runtime.result_store == result.StoreStrategy.SQLITE + + def test_parser_normalize_with_fixed_versions(self, tmpdir, mock_root_io_feed): + """Test that parser emits actual fixed versions from Root.io""" + ws = workspace.Workspace(tmpdir, "rootio", create=True) + p = parser.Parser( + workspace=ws, + url="https://api.root.io/external/cve_feed", + ) + + # Test Alpine data + alpine_data = mock_root_io_feed["alpine"][0] + vuln_records = p._normalize("alpine", alpine_data) + + # Should have 2 CVEs + assert "CVE-2023-0464" in vuln_records + assert "CVE-2023-0465" in vuln_records + + # Check CVE-2023-0464 structure + cve_2023_0464 = vuln_records["CVE-2023-0464"] + assert cve_2023_0464["Vulnerability"]["Name"] == "CVE-2023-0464" + assert cve_2023_0464["Vulnerability"]["NamespaceName"] == "rootio:distro:alpine:3.17" + + # Should have FixedIn entries with actual versions (NOT sentinel values) + fixed_in = cve_2023_0464["Vulnerability"]["FixedIn"] + assert len(fixed_in) == 2 # libssl3 and openssl + + # Verify actual versions are emitted + versions = [fi["Version"] for fi in fixed_in] + assert "3.0.8-r4" in versions + assert "ROOTIO_UNAFFECTED" not in versions # Should NOT use sentinel + + # Verify version format + for fi in fixed_in: + assert fi["VersionFormat"] == "apk" + assert fi["NamespaceName"] == "rootio:distro:alpine:3.17" + + def test_parser_normalize_with_empty_fixed_versions(self, tmpdir, mock_root_io_feed): + """Test that parser handles empty fixed_versions correctly""" + ws = workspace.Workspace(tmpdir, "rootio", create=True) + p = parser.Parser( + workspace=ws, + url="https://api.root.io/external/cve_feed", + ) + + # Test Debian data with empty fixed_versions + debian_data = mock_root_io_feed["debian"][0] + vuln_records = p._normalize("debian", debian_data) + + # Check CVE with no fix + cve_no_fix = vuln_records["CVE-2023-9999"] + fixed_in = cve_no_fix["Vulnerability"]["FixedIn"] + + # Should have entry with empty version + assert len(fixed_in) == 1 + assert fixed_in[0]["Name"] == "curl" + assert fixed_in[0]["Version"] == "" # Empty indicates no fix + assert fixed_in[0]["VersionFormat"] == "dpkg" + + def test_parser_version_format_mapping(self, tmpdir): + """Test that parser maps distros to correct version formats""" + ws = workspace.Workspace(tmpdir, "rootio", create=True) + p = parser.Parser( + workspace=ws, + url="https://api.root.io/external/cve_feed", + ) + + # Test version format mappings assert p._get_version_format("alpine") == "apk" assert p._get_version_format("debian") == "dpkg" assert p._get_version_format("ubuntu") == "dpkg" + assert p._get_version_format("rhel") == "rpm" assert p._get_version_format("centos") == "rpm" - - # Test language mappings - assert p._get_version_format(None, "python") == "python" - assert p._get_version_format(None, "javascript") == "semver" - assert p._get_version_format(None, "java") == "maven" + assert p._get_version_format("rocky") == "rpm" + assert p._get_version_format("alma") == "rpm" - def test_provider_name(self): - """Test that provider returns correct name""" - p = Provider(root="/tmp", config=Config()) - assert p.name == "rootio" + def test_parser_namespace_format(self, tmpdir, mock_root_io_feed): + """Test that parser generates correct rootio namespace format""" + ws = workspace.Workspace(tmpdir, "rootio", create=True) + p = parser.Parser( + workspace=ws, + url="https://api.root.io/external/cve_feed", + ) - def test_provider_update(self, workspace_dir, mock_vulnerability_data): - """Test the provider update process""" - config = Config(runtime=Config.RuntimeConfig(existing_results="keep")) - p = Provider(root=str(workspace_dir.path), config=config) - - # Mock the API fetch - with patch.object(parser.Parser, "_fetch_vulnerabilities", return_value=mock_vulnerability_data): - # Run update - update_count, urls = p.update() - - # Verify results - assert isinstance(update_count, int) - assert update_count > 0 - - # Check that metadata was written - metadata_path = workspace_dir.metadata_path - assert metadata_path.exists() - - with open(metadata_path) as f: - metadata = json.load(f) - assert metadata["provider"] == "rootio" - assert metadata["listing"]["digest"] - assert metadata["listing"]["algorithm"] - - def test_parser_handles_empty_response(self, workspace_dir): - """Test that parser handles empty API responses gracefully""" - p = parser.Parser(workspace=workspace_dir) - - # Mock empty API response - with patch.object(parser.Parser, "_fetch_vulnerabilities", return_value={}): - p._process_vulnerabilities({}) - - # Should complete without errors - results_path = workspace_dir.results_path - assert results_path.exists() - - def test_parser_vulnerable_range_constraint(self, workspace_dir, mock_vulnerability_data): - """Test that vulnerable range constraints are properly set""" - p = parser.Parser(workspace=workspace_dir) - - with patch.object(parser.Parser, "_fetch_vulnerabilities", return_value=mock_vulnerability_data): - p._process_vulnerabilities(mock_vulnerability_data) - - # Read results and check constraints - results_path = workspace_dir.results_path - vuln_files = list(results_path.glob("*.json")) - - for vuln_file in vuln_files: - with open(vuln_file) as f: - data = json.load(f) - for fixed_in in data.get("Vulnerability", {}).get("FixedIn", []): - if fixed_in.get("Version") == "ROOTIO_UNAFFECTED": - # Should have the Root.io constraint - assert fixed_in.get("VulnerableRange") == "NOT version_contains .root.io" + alpine_data = mock_root_io_feed["alpine"][0] + vuln_records = p._normalize("alpine", alpine_data) - def test_config_defaults(self): - """Test that Config has proper defaults""" + # All records should have rootio namespace + for record in vuln_records.values(): + namespace = record["Vulnerability"]["NamespaceName"] + assert namespace == "rootio:distro:alpine:3.17" + + def test_parser_get_generator(self, tmpdir, mock_root_io_feed): + """Test the parser.get() generator yields correct data""" + ws = workspace.Workspace(tmpdir, "rootio", create=True) + p = parser.Parser( + workspace=ws, + url="https://api.root.io/external/cve_feed", + ) + + # Mock the download and file reading + import orjson + feed_path = Path(ws.input_path) / "rootio-data" / "cve_feed.json" + os.makedirs(feed_path.parent, exist_ok=True) + with open(feed_path, "wb") as f: + f.write(orjson.dumps(mock_root_io_feed)) + + # Mock download to avoid actual HTTP call + with patch.object(p, "_download"): + results = list(p.get()) + + # Should yield (namespace, vuln_id, record) tuples + assert len(results) > 0 + + # Check structure + for namespace, vuln_id, record in results: + assert namespace.startswith("rootio:distro:") + assert vuln_id.startswith("CVE-") + assert "Vulnerability" in record + assert record["Vulnerability"]["Name"] == vuln_id + assert record["Vulnerability"]["NamespaceName"] == namespace + + def test_provider_update(self, tmpdir, mock_root_io_feed): + """Test the provider update process""" + ws = workspace.Workspace(tmpdir, "rootio", create=True) config = Config() - assert config.api_url == "https://api.root.io/v1/vulnerabilities" - assert config.runtime.existing_results == "delete" + p = Provider(root=str(tmpdir), config=config) + + # Mock the download and feed data + import orjson + feed_path = Path(ws.input_path) / "rootio-data" / "cve_feed.json" + os.makedirs(feed_path.parent, exist_ok=True) + with open(feed_path, "wb") as f: + f.write(orjson.dumps(mock_root_io_feed)) + + # Mock download + with patch.object(p.parser, "_download"): + urls, count = p.update(last_updated=None) + + # Should return URL and count + assert isinstance(urls, list) + assert len(urls) == 1 + assert urls[0] == "https://api.root.io/external/cve_feed" + assert count > 0 + + def test_parser_metadata_structure(self, tmpdir, mock_root_io_feed): + """Test that vulnerability metadata is correctly structured""" + ws = workspace.Workspace(tmpdir, "rootio", create=True) + p = parser.Parser( + workspace=ws, + url="https://api.root.io/external/cve_feed", + ) + + alpine_data = mock_root_io_feed["alpine"][0] + vuln_records = p._normalize("alpine", alpine_data) + + for cve_id, record in vuln_records.items(): + vuln = record["Vulnerability"] + + # Check required fields + assert "Name" in vuln + assert "NamespaceName" in vuln + assert "FixedIn" in vuln + assert "Metadata" in vuln + + # Check metadata structure + assert "CVE" in vuln["Metadata"] + assert len(vuln["Metadata"]["CVE"]) > 0 + assert vuln["Metadata"]["CVE"][0]["Name"] == cve_id + + def test_parser_handles_multiple_packages_same_cve(self, tmpdir, mock_root_io_feed): + """Test that parser correctly handles multiple packages affected by same CVE""" + ws = workspace.Workspace(tmpdir, "rootio", create=True) + p = parser.Parser( + workspace=ws, + url="https://api.root.io/external/cve_feed", + ) + + alpine_data = mock_root_io_feed["alpine"][0] + vuln_records = p._normalize("alpine", alpine_data) + + # CVE-2023-0464 affects both libssl3 and openssl + cve_record = vuln_records["CVE-2023-0464"] + fixed_in = cve_record["Vulnerability"]["FixedIn"] + + # Should have 2 FixedIn entries + assert len(fixed_in) == 2 + + # Check both packages are present + package_names = [fi["Name"] for fi in fixed_in] + assert "libssl3" in package_names + assert "openssl" in package_names class TestRootIoParser: """Additional parser-specific tests""" - - def test_normalize_severity(self): - """Test severity normalization""" - p = parser.Parser(workspace=MagicMock()) - - assert p._normalize_severity("CRITICAL") == "Critical" - assert p._normalize_severity("HIGH") == "High" - assert p._normalize_severity("MEDIUM") == "Medium" - assert p._normalize_severity("LOW") == "Low" - assert p._normalize_severity("unknown") == "Unknown" - assert p._normalize_severity(None) == "Unknown" \ No newline at end of file + + def test_version_format_default(self, tmpdir): + """Test that unknown distros default to dpkg format""" + ws = workspace.Workspace(tmpdir, "rootio", create=True) + p = parser.Parser( + workspace=ws, + url="https://api.root.io/external/cve_feed", + ) + + # Unknown distro should default to dpkg + assert p._get_version_format("unknown-distro") == "dpkg" + + def test_parser_init_creates_logger(self, tmpdir): + """Test that parser creates logger if none provided""" + ws = workspace.Workspace(tmpdir, "rootio", create=True) + p = parser.Parser( + workspace=ws, + url="https://api.root.io/external/cve_feed", + ) + + assert p.logger is not None + assert p.logger.name == "Parser"