diff --git a/README.md b/README.md index 483d2638..485640e6 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,7 @@ Supported data sources: - SLES (https://ftp.suse.com/pub/projects/security/oval) - Ubuntu (https://launchpad.net/ubuntu-cve-tracker) - Wolfi (https://packages.wolfi.dev) +- Root (https://api.root.io/external/cve_feed) ## Installation @@ -56,17 +57,23 @@ List the available vulnerability data providers: ``` $ vunnel list +alma alpine amazon +bitnami chainguard debian echo +epss github +kev mariner minimos nvd oracle rhel +rocky +rootio sles ubuntu wolfi diff --git a/src/vunnel/cli/config.py b/src/vunnel/cli/config.py index d3810e8d..49f37a19 100644 --- a/src/vunnel/cli/config.py +++ b/src/vunnel/cli/config.py @@ -58,6 +58,7 @@ class Providers: oracle: providers.oracle.Config = field(default_factory=providers.oracle.Config) rhel: providers.rhel.Config = field(default_factory=providers.rhel.Config) rocky: providers.rocky.Config = field(default_factory=providers.rocky.Config) + rootio: providers.rootio.Config = field(default_factory=providers.rootio.Config) sles: providers.sles.Config = field(default_factory=providers.sles.Config) ubuntu: providers.ubuntu.Config = field(default_factory=providers.ubuntu.Config) wolfi: providers.wolfi.Config = field(default_factory=providers.wolfi.Config) diff --git a/src/vunnel/providers/__init__.py b/src/vunnel/providers/__init__.py index 44e5cb2d..42804904 100644 --- a/src/vunnel/providers/__init__.py +++ b/src/vunnel/providers/__init__.py @@ -21,6 +21,7 @@ oracle, rhel, rocky, + rootio, sles, ubuntu, wolfi, @@ -43,6 +44,7 @@ oracle.Provider.name(): oracle.Provider, rhel.Provider.name(): rhel.Provider, rocky.Provider.name(): rocky.Provider, + rootio.Provider.name(): rootio.Provider, sles.Provider.name(): sles.Provider, ubuntu.Provider.name(): ubuntu.Provider, wolfi.Provider.name(): wolfi.Provider, diff --git a/src/vunnel/providers/rootio/__init__.py b/src/vunnel/providers/rootio/__init__.py new file mode 100644 index 00000000..20b2b89a --- /dev/null +++ b/src/vunnel/providers/rootio/__init__.py @@ -0,0 +1,60 @@ +from __future__ import annotations + +import os +from dataclasses import dataclass, field +from typing import TYPE_CHECKING + +from vunnel import provider, result, schema + +from .parser import Parser + +if TYPE_CHECKING: + import datetime + + +@dataclass +class Config: + runtime: provider.RuntimeConfig = field( + default_factory=lambda: provider.RuntimeConfig( + result_store=result.StoreStrategy.SQLITE, + existing_results=result.ResultStatePolicy.DELETE_BEFORE_WRITE, + ), + ) + request_timeout: int = 125 + + +class Provider(provider.Provider): + __schema__ = schema.OSSchema() + __distribution_version__ = int(__schema__.major_version) + + _url = "https://api.root.io/external/cve_feed" + + def __init__(self, root: str, config: Config | None = None): + if not config: + config = Config() + super().__init__(root, runtime_cfg=config.runtime) + self.config = config + + self.parser = Parser( + workspace=self.workspace, + url=self._url, + download_timeout=self.config.request_timeout, + logger=self.logger, + ) + + @classmethod + def name(cls) -> str: + return "rootio" + + def update(self, last_updated: datetime.datetime | None) -> tuple[list[str], int]: + count = 0 + with self.results_writer() as writer: + for namespace, vuln_id, record in self.parser.get(): + writer.write( + identifier=os.path.join(namespace, vuln_id.lower()), + schema=self.__schema__, + payload=record, + ) + count += 1 + + return [self._url], count diff --git a/src/vunnel/providers/rootio/parser.py b/src/vunnel/providers/rootio/parser.py new file mode 100644 index 00000000..ed861bae --- /dev/null +++ b/src/vunnel/providers/rootio/parser.py @@ -0,0 +1,151 @@ +from __future__ import annotations + +import copy +import logging +import os +from pathlib import Path +from typing import TYPE_CHECKING, Any + +import orjson + +from vunnel.utils import http_wrapper as http +from vunnel.utils import vulnerability + +if TYPE_CHECKING: + from collections.abc import Generator + + from vunnel import workspace + + +class Parser: + _data_dir = "rootio-data" + _data_filename = "cve_feed.json" + + # Version format mapping for different distributions + # Add new OS support by adding entries here + _VERSION_FORMAT_MAP = { + "alpine": "apk", + "rhel": "rpm", + "centos": "rpm", + "rocky": "rpm", + "alma": "rpm", + "fedora": "rpm", + "suse": "rpm", + "opensuse": "rpm", + # Default is "dpkg" for debian, ubuntu, etc. + } + + def __init__( + self, + workspace: workspace.Workspace, + url: str, + download_timeout: int = 125, + logger: logging.Logger | None = None, + ): + self.download_timeout = download_timeout + self.data_dir_path = Path(workspace.input_path) / self._data_dir + self.url = url + + if not logger: + logger = logging.getLogger(self.__class__.__name__) + self.logger = logger + + def _download(self) -> None: + if not os.path.exists(self.data_dir_path): + os.makedirs(self.data_dir_path, exist_ok=True) + + try: + self.logger.info(f"downloading Root.io CVE feed from {self.url}") + r = http.get(self.url, self.logger, stream=True, timeout=self.download_timeout) + file_path = self.data_dir_path / self._data_filename + with open(file_path, "wb") as fp: + for chunk in r.iter_content(): + fp.write(chunk) + except Exception: + self.logger.exception(f"Error downloading Root.io data from {self.url}") + raise + + def _normalize(self, distro_name: str, distro_data: dict[str, Any]) -> dict[str, dict[str, Any]]: + """Transform Root.io data into OS schema format with unaffected package indicators""" + vuln_dict = {} + + distro_version = distro_data.get("distroversion", "unknown") + namespace = f"rootio:distro:{distro_name}:{distro_version}" + + for package_data in distro_data.get("packages", []): + pkg_info = package_data.get("pkg", {}) + package_name = pkg_info.get("name", "") + + for cve_id, cve_info in pkg_info.get("cves", {}).items(): + if cve_id not in vuln_dict: + record = copy.deepcopy(vulnerability.vulnerability_element) + record["Vulnerability"]["Name"] = cve_id + record["Vulnerability"]["NamespaceName"] = namespace + + # Build reference links + reference_links = vulnerability.build_reference_links(cve_id) + record["Vulnerability"]["Link"] = reference_links[0] if reference_links else "" + + record["Vulnerability"]["Severity"] = "Unknown" + record["Vulnerability"]["Description"] = f"Vulnerability {cve_id} in {package_name}" + record["Vulnerability"]["FixedIn"] = [] + record["Vulnerability"]["Metadata"] = { + "CVE": [{"Name": cve_id, "Link": reference_links[0] if reference_links else ""}], + } + vuln_dict[cve_id] = record + + # Add fixed version info + cve_record = vuln_dict[cve_id] + fixed_versions = cve_info.get("fixed_versions", []) + + # Determine version format based on distro + version_format = self._get_version_format(distro_name) + + # Add the fixed versions + # Note: Root.io uses different suffixes for different distros: + # - Debian/Ubuntu: .root.io suffix (e.g., 1.5.2-6+deb12u1.root.io.4) + # - Alpine: -rXX007X suffix (e.g., -r00071, -r10074) + for fixed_version in fixed_versions: + cve_record["Vulnerability"]["FixedIn"].append({ + "Name": package_name, + "Version": fixed_version, + "VersionFormat": version_format, + "NamespaceName": namespace, + "VendorAdvisory": {"NoAdvisory": True}, + }) + + # If no fixed versions, add unfixed entry + if not fixed_versions: + cve_record["Vulnerability"]["FixedIn"].append({ + "Name": package_name, + "Version": "", # Empty version indicates no fix available + "VersionFormat": version_format, + "NamespaceName": namespace, + "VendorAdvisory": {"NoAdvisory": True}, + }) + + return vuln_dict + + def _get_version_format(self, distro_name: str) -> str: + """Map distro name to version format.""" + return self._VERSION_FORMAT_MAP.get(distro_name, "dpkg") + + def get(self) -> Generator[tuple[str, str, dict[str, Any]], None, None]: + """Download, parse and yield Root.io vulnerability records""" + # Download the data + self._download() + + # Load the JSON data + with open(self.data_dir_path / self._data_filename) as fh: + feed_data = orjson.loads(fh.read()) + + # Process each distribution + for distro_name, distro_list in feed_data.items(): + for distro_data in distro_list: + distro_version = distro_data.get("distroversion", "unknown") + namespace = f"rootio:distro:{distro_name}:{distro_version}" + + vuln_records = self._normalize(distro_name, distro_data) + + for vuln_id, record in vuln_records.items(): + yield namespace, vuln_id, record diff --git a/tests/quality/config.yaml b/tests/quality/config.yaml index 43687573..d607e775 100644 --- a/tests/quality/config.yaml +++ b/tests/quality/config.yaml @@ -345,3 +345,21 @@ tests: - wolfi:distro:wolfi:rolling validations: - *default-validations + + - provider: rootio + additional_providers: + - name: nvd + use_cache: true + images: + - docker.io/alpine:3.17@sha256:f271e74b17ced29b915d351685fd4644785c6d1559dd1f2d4189a5e851ef753a + - docker.io/debian:11@sha256:e538a2f0566efc44db21503277c7312a142f4d0dedc5d2886932b92626104bff + expected_namespaces: + - rootio:distro:alpine:3.17 + - rootio:distro:alpine:3.18 + - rootio:distro:alpine:3.19 + - rootio:distro:debian:11 + - rootio:distro:debian:12 + - rootio:distro:ubuntu:20.04 + - rootio:distro:ubuntu:22.04 + validations: + - *default-validations diff --git a/tests/unit/cli/test_cli.py b/tests/unit/cli/test_cli.py index c7035941..64cc6938 100644 --- a/tests/unit/cli/test_cli.py +++ b/tests/unit/cli/test_cli.py @@ -473,6 +473,23 @@ def test_config(monkeypatch) -> None: result_store: sqlite skip_download: false skip_newer_archive_check: false + rootio: + request_timeout: 125 + runtime: + existing_input: keep + existing_results: delete-before-write + import_results_enabled: false + import_results_host: '' + import_results_path: providers/{provider_name}/listing.json + on_error: + action: fail + input: keep + results: keep + retry_count: 3 + retry_delay: 5 + result_store: sqlite + skip_download: false + skip_newer_archive_check: false sles: allow_versions: - '11' diff --git a/tests/unit/providers/rootio/__init__.py b/tests/unit/providers/rootio/__init__.py new file mode 100644 index 00000000..b2476d3f --- /dev/null +++ b/tests/unit/providers/rootio/__init__.py @@ -0,0 +1 @@ +# Root.io provider tests \ No newline at end of file diff --git a/tests/unit/providers/rootio/test-fixtures/sample_feed.json b/tests/unit/providers/rootio/test-fixtures/sample_feed.json new file mode 100644 index 00000000..58bce3a4 --- /dev/null +++ b/tests/unit/providers/rootio/test-fixtures/sample_feed.json @@ -0,0 +1,66 @@ +{ + "alpine": [ + { + "distroversion": "3.17", + "packages": [ + { + "pkg": { + "name": "libssl3", + "cves": { + "CVE-2023-0464": { + "fixed_versions": ["3.0.8-r4"] + }, + "CVE-2023-0465": { + "fixed_versions": ["3.0.8-r4"] + } + } + } + }, + { + "pkg": { + "name": "openssl", + "cves": { + "CVE-2023-0464": { + "fixed_versions": ["3.0.8-r4"] + } + } + } + } + ] + } + ], + "debian": [ + { + "distroversion": "11", + "packages": [ + { + "pkg": { + "name": "libgcrypt20", + "cves": { + "CVE-2021-40528": { + "fixed_versions": ["1.8.7-6+deb11u1"] + } + } + } + } + ] + } + ], + "ubuntu": [ + { + "distroversion": "22.04", + "packages": [ + { + "pkg": { + "name": "python3-pip", + "cves": { + "CVE-2023-5752": { + "fixed_versions": ["22.0.2+dfsg-1ubuntu0.4"] + } + } + } + } + ] + } + ] +} \ No newline at end of file diff --git a/tests/unit/providers/rootio/test_rootio.py b/tests/unit/providers/rootio/test_rootio.py new file mode 100644 index 00000000..a79ff4a3 --- /dev/null +++ b/tests/unit/providers/rootio/test_rootio.py @@ -0,0 +1,302 @@ +from __future__ import annotations + +import os +from pathlib import Path +from unittest.mock import patch + +import pytest + +from vunnel import result, workspace +from vunnel.providers.rootio import Config, Provider, parser + + +class TestRootIoProvider: + @pytest.fixture + def mock_root_io_feed(self): + """Returns sample Root.io CVE feed data matching actual API format""" + return { + "alpine": [ + { + "distroversion": "3.17", + "packages": [ + { + "pkg": { + "name": "libssl3", + "cves": { + "CVE-2023-0464": { + "fixed_versions": ["3.0.8-r4"], + }, + "CVE-2023-0465": { + "fixed_versions": ["3.0.8-r4"], + }, + }, + }, + }, + { + "pkg": { + "name": "openssl", + "cves": { + "CVE-2023-0464": { + "fixed_versions": ["3.0.8-r4"], + }, + }, + }, + }, + ], + }, + ], + "debian": [ + { + "distroversion": "11", + "packages": [ + { + "pkg": { + "name": "libgcrypt20", + "cves": { + "CVE-2021-40528": { + "fixed_versions": ["1.8.7-6+deb11u1"], + }, + }, + }, + }, + { + "pkg": { + "name": "curl", + "cves": { + "CVE-2023-9999": { + "fixed_versions": [], # No fix available + }, + }, + }, + }, + ], + }, + ], + } + + def test_provider_name(self): + """Test that provider returns correct name""" + assert Provider.name() == "rootio" + + def test_config_defaults(self): + """Test that Config has proper defaults""" + config = Config() + assert config.request_timeout == 125 + assert config.runtime.result_store == result.StoreStrategy.SQLITE + + def test_parser_normalize_with_fixed_versions(self, tmpdir, mock_root_io_feed): + """Test that parser emits actual fixed versions from Root.io""" + ws = workspace.Workspace(tmpdir, "rootio", create=True) + p = parser.Parser( + workspace=ws, + url="https://api.root.io/external/cve_feed", + ) + + # Test Alpine data + alpine_data = mock_root_io_feed["alpine"][0] + vuln_records = p._normalize("alpine", alpine_data) + + # Should have 2 CVEs + assert "CVE-2023-0464" in vuln_records + assert "CVE-2023-0465" in vuln_records + + # Check CVE-2023-0464 structure + cve_2023_0464 = vuln_records["CVE-2023-0464"] + assert cve_2023_0464["Vulnerability"]["Name"] == "CVE-2023-0464" + assert cve_2023_0464["Vulnerability"]["NamespaceName"] == "rootio:distro:alpine:3.17" + + # Should have FixedIn entries with actual versions (NOT sentinel values) + fixed_in = cve_2023_0464["Vulnerability"]["FixedIn"] + assert len(fixed_in) == 2 # libssl3 and openssl + + # Verify actual versions are emitted + versions = [fi["Version"] for fi in fixed_in] + assert "3.0.8-r4" in versions + assert "ROOTIO_UNAFFECTED" not in versions # Should NOT use sentinel + + # Verify version format + for fi in fixed_in: + assert fi["VersionFormat"] == "apk" + assert fi["NamespaceName"] == "rootio:distro:alpine:3.17" + + def test_parser_normalize_with_empty_fixed_versions(self, tmpdir, mock_root_io_feed): + """Test that parser handles empty fixed_versions correctly""" + ws = workspace.Workspace(tmpdir, "rootio", create=True) + p = parser.Parser( + workspace=ws, + url="https://api.root.io/external/cve_feed", + ) + + # Test Debian data with empty fixed_versions + debian_data = mock_root_io_feed["debian"][0] + vuln_records = p._normalize("debian", debian_data) + + # Check CVE with no fix + cve_no_fix = vuln_records["CVE-2023-9999"] + fixed_in = cve_no_fix["Vulnerability"]["FixedIn"] + + # Should have entry with empty version + assert len(fixed_in) == 1 + assert fixed_in[0]["Name"] == "curl" + assert fixed_in[0]["Version"] == "" # Empty indicates no fix + assert fixed_in[0]["VersionFormat"] == "dpkg" + + def test_parser_version_format_mapping(self, tmpdir): + """Test that parser maps distros to correct version formats""" + ws = workspace.Workspace(tmpdir, "rootio", create=True) + p = parser.Parser( + workspace=ws, + url="https://api.root.io/external/cve_feed", + ) + + # Test version format mappings + assert p._get_version_format("alpine") == "apk" + assert p._get_version_format("debian") == "dpkg" + assert p._get_version_format("ubuntu") == "dpkg" + assert p._get_version_format("rhel") == "rpm" + assert p._get_version_format("centos") == "rpm" + assert p._get_version_format("rocky") == "rpm" + assert p._get_version_format("alma") == "rpm" + + def test_parser_namespace_format(self, tmpdir, mock_root_io_feed): + """Test that parser generates correct rootio namespace format""" + ws = workspace.Workspace(tmpdir, "rootio", create=True) + p = parser.Parser( + workspace=ws, + url="https://api.root.io/external/cve_feed", + ) + + alpine_data = mock_root_io_feed["alpine"][0] + vuln_records = p._normalize("alpine", alpine_data) + + # All records should have rootio namespace + for record in vuln_records.values(): + namespace = record["Vulnerability"]["NamespaceName"] + assert namespace == "rootio:distro:alpine:3.17" + + def test_parser_get_generator(self, tmpdir, mock_root_io_feed): + """Test the parser.get() generator yields correct data""" + ws = workspace.Workspace(tmpdir, "rootio", create=True) + p = parser.Parser( + workspace=ws, + url="https://api.root.io/external/cve_feed", + ) + + # Mock the download and file reading + import orjson + feed_path = Path(ws.input_path) / "rootio-data" / "cve_feed.json" + os.makedirs(feed_path.parent, exist_ok=True) + with open(feed_path, "wb") as f: + f.write(orjson.dumps(mock_root_io_feed)) + + # Mock download to avoid actual HTTP call + with patch.object(p, "_download"): + results = list(p.get()) + + # Should yield (namespace, vuln_id, record) tuples + assert len(results) > 0 + + # Check structure + for namespace, vuln_id, record in results: + assert namespace.startswith("rootio:distro:") + assert vuln_id.startswith("CVE-") + assert "Vulnerability" in record + assert record["Vulnerability"]["Name"] == vuln_id + assert record["Vulnerability"]["NamespaceName"] == namespace + + def test_provider_update(self, tmpdir, mock_root_io_feed): + """Test the provider update process""" + ws = workspace.Workspace(tmpdir, "rootio", create=True) + config = Config() + p = Provider(root=str(tmpdir), config=config) + + # Mock the download and feed data + import orjson + feed_path = Path(ws.input_path) / "rootio-data" / "cve_feed.json" + os.makedirs(feed_path.parent, exist_ok=True) + with open(feed_path, "wb") as f: + f.write(orjson.dumps(mock_root_io_feed)) + + # Mock download + with patch.object(p.parser, "_download"): + urls, count = p.update(last_updated=None) + + # Should return URL and count + assert isinstance(urls, list) + assert len(urls) == 1 + assert urls[0] == "https://api.root.io/external/cve_feed" + assert count > 0 + + def test_parser_metadata_structure(self, tmpdir, mock_root_io_feed): + """Test that vulnerability metadata is correctly structured""" + ws = workspace.Workspace(tmpdir, "rootio", create=True) + p = parser.Parser( + workspace=ws, + url="https://api.root.io/external/cve_feed", + ) + + alpine_data = mock_root_io_feed["alpine"][0] + vuln_records = p._normalize("alpine", alpine_data) + + for cve_id, record in vuln_records.items(): + vuln = record["Vulnerability"] + + # Check required fields + assert "Name" in vuln + assert "NamespaceName" in vuln + assert "FixedIn" in vuln + assert "Metadata" in vuln + + # Check metadata structure + assert "CVE" in vuln["Metadata"] + assert len(vuln["Metadata"]["CVE"]) > 0 + assert vuln["Metadata"]["CVE"][0]["Name"] == cve_id + + def test_parser_handles_multiple_packages_same_cve(self, tmpdir, mock_root_io_feed): + """Test that parser correctly handles multiple packages affected by same CVE""" + ws = workspace.Workspace(tmpdir, "rootio", create=True) + p = parser.Parser( + workspace=ws, + url="https://api.root.io/external/cve_feed", + ) + + alpine_data = mock_root_io_feed["alpine"][0] + vuln_records = p._normalize("alpine", alpine_data) + + # CVE-2023-0464 affects both libssl3 and openssl + cve_record = vuln_records["CVE-2023-0464"] + fixed_in = cve_record["Vulnerability"]["FixedIn"] + + # Should have 2 FixedIn entries + assert len(fixed_in) == 2 + + # Check both packages are present + package_names = [fi["Name"] for fi in fixed_in] + assert "libssl3" in package_names + assert "openssl" in package_names + + +class TestRootIoParser: + """Additional parser-specific tests""" + + def test_version_format_default(self, tmpdir): + """Test that unknown distros default to dpkg format""" + ws = workspace.Workspace(tmpdir, "rootio", create=True) + p = parser.Parser( + workspace=ws, + url="https://api.root.io/external/cve_feed", + ) + + # Unknown distro should default to dpkg + assert p._get_version_format("unknown-distro") == "dpkg" + + def test_parser_init_creates_logger(self, tmpdir): + """Test that parser creates logger if none provided""" + ws = workspace.Workspace(tmpdir, "rootio", create=True) + p = parser.Parser( + workspace=ws, + url="https://api.root.io/external/cve_feed", + ) + + assert p.logger is not None + assert p.logger.name == "Parser" diff --git a/tests/unit/test_provider.py b/tests/unit/test_provider.py index 005b8025..a8bcb1e1 100644 --- a/tests/unit/test_provider.py +++ b/tests/unit/test_provider.py @@ -980,6 +980,7 @@ def test_provider_versions(tmpdir): "oracle": 1, "rhel": 1, "rocky": 1, + "rootio": 1, "sles": 1, "ubuntu": 3, "wolfi": 1, @@ -1015,6 +1016,7 @@ def test_provider_distribution_versions(tmpdir): "oracle": 1, "rhel": 1, "rocky": 1, + "rootio": 1, "sles": 1, "ubuntu": 1, "wolfi": 1,