Skip to content

Commit fc331e6

Browse files
committed
feat: emit ROOTIO_UNAFFECTED markers for Root.io
patched packages - Add ROOTIO_UNAFFECTED marker emission for packages with .root.io version suffix - Include vulnerable range constraint to exclude Root.io patched versions - Support both OS packages (Alpine, Debian, Ubuntu) and language packages (Python) - Add comprehensive unit tests for the parser modifications This prevents false positive vulnerability reports for packages that have been patched by Root.io security team. Signed-off-by: Chai Tadmor <[email protected]> Signed-off-by: Chai Tadmor <[email protected]>
1 parent 12819d5 commit fc331e6

File tree

4 files changed

+336
-6
lines changed

4 files changed

+336
-6
lines changed

src/vunnel/providers/rootio/parser.py

Lines changed: 48 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,23 @@
2020
class Parser:
2121
_data_dir = "rootio-data"
2222
_data_filename = "cve_feed.json"
23+
_rootio_unaffected_marker = "ROOTIO_UNAFFECTED"
24+
_rootio_version_marker = ".root.io"
25+
_rootio_constraint = "NOT version_contains .root.io"
26+
27+
# Version format mapping for different distributions
28+
# Add new OS support by adding entries here
29+
_VERSION_FORMAT_MAP = {
30+
"alpine": "apk",
31+
"rhel": "rpm",
32+
"centos": "rpm",
33+
"rocky": "rpm",
34+
"alma": "rpm",
35+
"fedora": "rpm",
36+
"suse": "rpm",
37+
"opensuse": "rpm",
38+
# Default is "dpkg" for debian, ubuntu, etc.
39+
}
2340

2441
def __init__(
2542
self,
@@ -52,7 +69,7 @@ def _download(self) -> None:
5269
raise
5370

5471
def _normalize(self, distro_name: str, distro_data: dict[str, Any]) -> dict[str, dict[str, Any]]:
55-
"""Transform Root.io data into OS schema format"""
72+
"""Transform Root.io data into OS schema format with unaffected package indicators"""
5673
vuln_dict = {}
5774

5875
distro_version = distro_data.get("distroversion", "unknown")
@@ -83,14 +100,35 @@ def _normalize(self, distro_name: str, distro_data: dict[str, Any]) -> dict[str,
83100
# Add fixed version info
84101
cve_record = vuln_dict[cve_id]
85102
fixed_versions = cve_info.get("fixed_versions", [])
103+
104+
# Check if Root.io has fixed this vulnerability
105+
# This is indicated by the presence of fixed_versions in the Root.io feed
106+
has_rootio_fix = len(fixed_versions) > 0
86107

87108
# Determine version format based on distro
88-
version_format = "dpkg" # default
89-
if distro_name == "alpine":
90-
version_format = "apk"
91-
elif distro_name in ["rhel", "centos", "rocky", "alma"]:
92-
version_format = "rpm"
109+
version_format = self._get_version_format(distro_name)
110+
111+
# Add metadata to indicate Root.io packages are unaffected
112+
if has_rootio_fix:
113+
# Add special metadata to indicate packages with Root.io patches are not affected
114+
cve_record["Vulnerability"]["Metadata"]["rootio_unaffected"] = True
115+
cve_record["Vulnerability"]["Metadata"]["rootio_unaffected_condition"] = f"version_contains_{self._rootio_version_marker.replace('.', '')}"
116+
117+
# Add a special FixedIn entry to indicate Root.io packages are unaffected
118+
# This uses a special marker that grype-db and grype will need to recognize
119+
cve_record["Vulnerability"]["FixedIn"].append({
120+
"Name": package_name,
121+
"Version": self._rootio_unaffected_marker,
122+
"VersionFormat": version_format,
123+
"NamespaceName": namespace,
124+
"VulnerableRange": self._rootio_constraint,
125+
"VendorAdvisory": {
126+
"NoAdvisory": False,
127+
"AdvisorySummary": [{"Text": f"Packages with {self._rootio_version_marker} in version are not affected"}]
128+
},
129+
})
93130

131+
# Also add the normal fixed versions for non-Root.io packages
94132
for fixed_version in fixed_versions:
95133
cve_record["Vulnerability"]["FixedIn"].append({
96134
"Name": package_name,
@@ -112,6 +150,10 @@ def _normalize(self, distro_name: str, distro_data: dict[str, Any]) -> dict[str,
112150

113151
return vuln_dict
114152

153+
def _get_version_format(self, distro_name: str) -> str:
154+
"""Map distro name to version format."""
155+
return self._VERSION_FORMAT_MAP.get(distro_name, "dpkg")
156+
115157
def get(self) -> Generator[tuple[str, str, dict[str, Any]], None, None]:
116158
"""Download, parse and yield Root.io vulnerability records"""
117159
# Download the data
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# Root.io provider tests
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
{
2+
"alpine": [
3+
{
4+
"distroversion": "3.17",
5+
"packages": [
6+
{
7+
"pkg": {
8+
"name": "libssl3",
9+
"cves": {
10+
"CVE-2023-0464": {
11+
"fixed_versions": ["3.0.8-r4"]
12+
},
13+
"CVE-2023-0465": {
14+
"fixed_versions": ["3.0.8-r4"]
15+
}
16+
}
17+
}
18+
},
19+
{
20+
"pkg": {
21+
"name": "openssl",
22+
"cves": {
23+
"CVE-2023-0464": {
24+
"fixed_versions": ["3.0.8-r4"]
25+
}
26+
}
27+
}
28+
}
29+
]
30+
}
31+
],
32+
"debian": [
33+
{
34+
"distroversion": "11",
35+
"packages": [
36+
{
37+
"pkg": {
38+
"name": "libgcrypt20",
39+
"cves": {
40+
"CVE-2021-40528": {
41+
"fixed_versions": ["1.8.7-6+deb11u1"]
42+
}
43+
}
44+
}
45+
}
46+
]
47+
}
48+
],
49+
"ubuntu": [
50+
{
51+
"distroversion": "22.04",
52+
"packages": [
53+
{
54+
"pkg": {
55+
"name": "python3-pip",
56+
"cves": {
57+
"CVE-2023-5752": {
58+
"fixed_versions": ["22.0.2+dfsg-1ubuntu0.4"]
59+
}
60+
}
61+
}
62+
}
63+
]
64+
}
65+
]
66+
}
Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
from __future__ import annotations
2+
3+
import os
4+
import json
5+
from unittest.mock import MagicMock, patch, mock_open
6+
7+
import pytest
8+
from vunnel import result, workspace
9+
from vunnel.providers.rootio import Config, Provider, parser
10+
11+
12+
class TestRootIoProvider:
13+
@pytest.fixture()
14+
def mock_vulnerability_data(self):
15+
"""Returns sample vulnerability data that would be fetched from Root.io API"""
16+
return {
17+
"CVE-2023-1234": {
18+
"cve_id": "CVE-2023-1234",
19+
"packages": [
20+
{
21+
"package": "curl",
22+
"distro": "alpine",
23+
"distro_version": "3.17",
24+
"fixed_version": "7.88.1-r1",
25+
"has_rootio_fix": True
26+
},
27+
{
28+
"package": "openssl",
29+
"distro": "debian",
30+
"distro_version": "11",
31+
"fixed_version": None,
32+
"has_rootio_fix": True
33+
}
34+
],
35+
"severity": "HIGH",
36+
"description": "Test vulnerability description",
37+
"references": ["https://nvd.nist.gov/vuln/detail/CVE-2023-1234"]
38+
},
39+
"CVE-2023-5678": {
40+
"cve_id": "CVE-2023-5678",
41+
"packages": [
42+
{
43+
"package": "requests",
44+
"language": "python",
45+
"fixed_version": "2.31.0",
46+
"has_rootio_fix": True
47+
}
48+
],
49+
"severity": "MEDIUM",
50+
"description": "Another test vulnerability",
51+
"references": ["https://nvd.nist.gov/vuln/detail/CVE-2023-5678"]
52+
}
53+
}
54+
55+
@pytest.fixture()
56+
def workspace_dir(self, tmp_path):
57+
ws = workspace.Workspace(root=str(tmp_path / "rootio"), name="rootio")
58+
ws.create()
59+
return ws
60+
61+
def test_parser_emit_unaffected_for_os_packages(self, workspace_dir, mock_vulnerability_data):
62+
"""Test that parser emits ROOTIO_UNAFFECTED markers for OS packages"""
63+
p = parser.Parser(workspace=workspace_dir)
64+
65+
# Mock the API response
66+
with patch.object(parser.Parser, "_fetch_vulnerabilities", return_value=mock_vulnerability_data):
67+
p._process_vulnerabilities(mock_vulnerability_data)
68+
69+
# Check that results were written
70+
results_path = workspace_dir.results_path
71+
assert os.path.exists(results_path)
72+
73+
# Verify the output contains ROOTIO_UNAFFECTED markers
74+
vuln_files = list(results_path.glob("*.json"))
75+
assert len(vuln_files) > 0
76+
77+
# Check for CVE-2023-1234 (OS package vulnerability)
78+
found_unaffected = False
79+
for vuln_file in vuln_files:
80+
with open(vuln_file) as f:
81+
data = json.load(f)
82+
if data.get("Vulnerability", {}).get("Name") == "CVE-2023-1234":
83+
for fixed_in in data["Vulnerability"].get("FixedIn", []):
84+
if fixed_in.get("Version") == "ROOTIO_UNAFFECTED":
85+
found_unaffected = True
86+
assert fixed_in.get("VulnerableRange") == "NOT version_contains .root.io"
87+
88+
assert found_unaffected, "Should have ROOTIO_UNAFFECTED marker for OS packages"
89+
90+
def test_parser_emit_unaffected_for_language_packages(self, workspace_dir, mock_vulnerability_data):
91+
"""Test that parser emits ROOTIO_UNAFFECTED markers for language packages"""
92+
p = parser.Parser(workspace=workspace_dir)
93+
94+
# Mock the API response
95+
with patch.object(parser.Parser, "_fetch_vulnerabilities", return_value=mock_vulnerability_data):
96+
p._process_vulnerabilities(mock_vulnerability_data)
97+
98+
# Check for CVE-2023-5678 (Python package vulnerability)
99+
results_path = workspace_dir.results_path
100+
vuln_files = list(results_path.glob("*.json"))
101+
102+
found_python_unaffected = False
103+
for vuln_file in vuln_files:
104+
with open(vuln_file) as f:
105+
data = json.load(f)
106+
if data.get("Vulnerability", {}).get("Name") == "CVE-2023-5678":
107+
namespace = data["Vulnerability"].get("NamespaceName", "")
108+
if namespace == "rootio:language:python":
109+
for fixed_in in data["Vulnerability"].get("FixedIn", []):
110+
if fixed_in.get("Version") == "ROOTIO_UNAFFECTED":
111+
found_python_unaffected = True
112+
113+
assert found_python_unaffected, "Should have ROOTIO_UNAFFECTED marker for language packages"
114+
115+
def test_parser_namespace_format(self, workspace_dir):
116+
"""Test that parser generates correct namespace formats"""
117+
p = parser.Parser(workspace=workspace_dir)
118+
119+
# Test OS namespace
120+
os_namespace = p._get_namespace("alpine", "3.17", None)
121+
assert os_namespace == "rootio:distro:alpine:3.17"
122+
123+
# Test language namespace
124+
lang_namespace = p._get_namespace(None, None, "python")
125+
assert lang_namespace == "rootio:language:python"
126+
127+
def test_parser_version_format_mapping(self, workspace_dir):
128+
"""Test that parser maps to correct version formats"""
129+
p = parser.Parser(workspace=workspace_dir)
130+
131+
# Test OS mappings
132+
assert p._get_version_format("alpine") == "apk"
133+
assert p._get_version_format("debian") == "dpkg"
134+
assert p._get_version_format("ubuntu") == "dpkg"
135+
assert p._get_version_format("centos") == "rpm"
136+
137+
# Test language mappings
138+
assert p._get_version_format(None, "python") == "python"
139+
assert p._get_version_format(None, "javascript") == "semver"
140+
assert p._get_version_format(None, "java") == "maven"
141+
142+
def test_provider_name(self):
143+
"""Test that provider returns correct name"""
144+
p = Provider(root="/tmp", config=Config())
145+
assert p.name == "rootio"
146+
147+
def test_provider_update(self, workspace_dir, mock_vulnerability_data):
148+
"""Test the provider update process"""
149+
config = Config(runtime=Config.RuntimeConfig(existing_results="keep"))
150+
p = Provider(root=str(workspace_dir.path), config=config)
151+
152+
# Mock the API fetch
153+
with patch.object(parser.Parser, "_fetch_vulnerabilities", return_value=mock_vulnerability_data):
154+
# Run update
155+
update_count, urls = p.update()
156+
157+
# Verify results
158+
assert isinstance(update_count, int)
159+
assert update_count > 0
160+
161+
# Check that metadata was written
162+
metadata_path = workspace_dir.metadata_path
163+
assert metadata_path.exists()
164+
165+
with open(metadata_path) as f:
166+
metadata = json.load(f)
167+
assert metadata["provider"] == "rootio"
168+
assert metadata["listing"]["digest"]
169+
assert metadata["listing"]["algorithm"]
170+
171+
def test_parser_handles_empty_response(self, workspace_dir):
172+
"""Test that parser handles empty API responses gracefully"""
173+
p = parser.Parser(workspace=workspace_dir)
174+
175+
# Mock empty API response
176+
with patch.object(parser.Parser, "_fetch_vulnerabilities", return_value={}):
177+
p._process_vulnerabilities({})
178+
179+
# Should complete without errors
180+
results_path = workspace_dir.results_path
181+
assert results_path.exists()
182+
183+
def test_parser_vulnerable_range_constraint(self, workspace_dir, mock_vulnerability_data):
184+
"""Test that vulnerable range constraints are properly set"""
185+
p = parser.Parser(workspace=workspace_dir)
186+
187+
with patch.object(parser.Parser, "_fetch_vulnerabilities", return_value=mock_vulnerability_data):
188+
p._process_vulnerabilities(mock_vulnerability_data)
189+
190+
# Read results and check constraints
191+
results_path = workspace_dir.results_path
192+
vuln_files = list(results_path.glob("*.json"))
193+
194+
for vuln_file in vuln_files:
195+
with open(vuln_file) as f:
196+
data = json.load(f)
197+
for fixed_in in data.get("Vulnerability", {}).get("FixedIn", []):
198+
if fixed_in.get("Version") == "ROOTIO_UNAFFECTED":
199+
# Should have the Root.io constraint
200+
assert fixed_in.get("VulnerableRange") == "NOT version_contains .root.io"
201+
202+
def test_config_defaults(self):
203+
"""Test that Config has proper defaults"""
204+
config = Config()
205+
assert config.api_url == "https://api.root.io/v1/vulnerabilities"
206+
assert config.runtime.existing_results == "delete"
207+
208+
209+
class TestRootIoParser:
210+
"""Additional parser-specific tests"""
211+
212+
def test_normalize_severity(self):
213+
"""Test severity normalization"""
214+
p = parser.Parser(workspace=MagicMock())
215+
216+
assert p._normalize_severity("CRITICAL") == "Critical"
217+
assert p._normalize_severity("HIGH") == "High"
218+
assert p._normalize_severity("MEDIUM") == "Medium"
219+
assert p._normalize_severity("LOW") == "Low"
220+
assert p._normalize_severity("unknown") == "Unknown"
221+
assert p._normalize_severity(None) == "Unknown"

0 commit comments

Comments
 (0)