Skip to content

Commit 95b3286

Browse files
committed
feat(heuristics): add whitespace, fake email, and similar project checks with tests
Implemented three new heuristics: Whitespace Check for suspicious spacing, Fake Emails for invalid maintainer addresses, and Similar Projects for duplicate structures. Added unit tests for all. Signed-off-by: Amine <[email protected]>
1 parent 80afd9e commit 95b3286

File tree

12 files changed

+832
-4
lines changed

12 files changed

+832
-4
lines changed

.DS_Store

-6 KB
Binary file not shown.

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,3 +181,4 @@ docs/_build
181181
bin/
182182
requirements.txt
183183
.macaron_env_file
184+
**/.DS_Store

src/macaron/config/defaults.ini

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -611,3 +611,5 @@ scaling = 0.15
611611
cost = 1.0
612612
# The path to the file that contains the list of popular packages.
613613
popular_packages_path =
614+
# The threshold for the number of repeated spaces in a single line.
615+
repeated_spaces_threshold = 30

src/macaron/malware_analyzer/pypi_heuristics/heuristics.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,15 @@ class Heuristics(str, Enum):
4040
#: Indicates that the package name is similar to a popular package.
4141
TYPOSQUATTING_PRESENCE = "typosquatting_presence"
4242

43+
#: Indicates that at least one maintainer has a suspicious email address.
44+
FAKE_EMAIL = "fake_email"
45+
46+
#: Indicates that the package contains an unusually large amount of whitespace or invisible characters.
47+
WHITE_SPACES = "white_spaces"
48+
49+
#: Indicates that the package and another package from the same maintainer have a similar folder structure.
50+
SIMILAR_PROJECTS = "similar_projects"
51+
4352

4453
class HeuristicResult(str, Enum):
4554
"""Result type indicating the outcome of a heuristic."""
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved.
2+
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.
3+
4+
"""The heuristic analyzer to check the email address of the package maintainers."""
5+
6+
import logging
7+
import re
8+
9+
import dns.resolver as dns_resolver
10+
11+
from macaron.errors import HeuristicAnalyzerValueError
12+
from macaron.json_tools import JsonType
13+
from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer
14+
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics
15+
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset
16+
17+
# Module-level logger, named after this module.
logger: logging.Logger = logging.getLogger(__name__)
18+
19+
20+
class FakeEmailAnalyzer(BaseHeuristicAnalyzer):
    """Analyze the email address of the package maintainers.

    An address is considered fake when it is syntactically malformed or when
    its domain does not exist or publishes no MX records.
    """

    #: Minimal address shape: exactly one "@" and at least one "." in the domain part.
    #: It is applied with ``fullmatch`` so that trailing garbage (for example a second
    #: "@") cannot slip through after a valid-looking prefix.
    _EMAIL_PATTERN = re.compile(r"[^@]+@[^@]+\.[^@]+")

    def __init__(self) -> None:
        super().__init__(
            name="fake_email_analyzer",
            heuristic=Heuristics.FAKE_EMAIL,
            depends_on=None,
        )
        # Domains already found to be unable to receive mail; consulted before
        # issuing another DNS query for the same domain.
        self.suspicious_domains: set[str] = set()

    def is_valid_email(self, email: str) -> bool:
        """Check if the email format is valid and the domain has MX records.

        Parameters
        ----------
        email: str
            The email address to check.

        Returns
        -------
        bool:
            True if the address is well formed and its domain publishes MX records.
            False if the address is malformed, or if the domain does not exist or
            has no MX records.

        Raises
        ------
        HeuristicAnalyzerValueError
            If the DNS lookup itself fails (for example a timeout or unreachable
            name servers), so that no conclusion can be drawn about the domain.
        """
        if not self._EMAIL_PATTERN.fullmatch(email):
            return False

        # The pattern guarantees exactly one "@", so everything after it is the domain.
        domain = email.rpartition("@")[2]
        if domain in self.suspicious_domains:
            return False

        try:
            records = dns_resolver.resolve(domain, "MX")
        except (dns_resolver.NXDOMAIN, dns_resolver.NoAnswer):
            # The resolver reports "domain does not exist" and "no MX records" by
            # raising rather than by returning an empty answer. Both mean the
            # address cannot receive mail, which is a verdict and not an analysis error.
            self.suspicious_domains.add(domain)
            return False
        except Exception as err:
            err_message = f"Failed to resolve domain {domain}: {err}"
            raise HeuristicAnalyzerValueError(err_message) from err

        if not records:
            self.suspicious_domains.add(domain)
            return False
        return True

    def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]:
        """Analyze the package.

        Parameters
        ----------
        pypi_package_json: PyPIPackageJsonAsset
            The PyPI package JSON asset object.

        Returns
        -------
        tuple[HeuristicResult, dict[str, JsonType]]:
            ``FAIL`` together with the first offending address under the ``"email"``
            key, or ``PASS`` with an empty dictionary when every address is valid.

        Raises
        ------
        HeuristicAnalyzerValueError
            If the maintainers cannot be retrieved or a DNS lookup fails.
        """
        package_name = pypi_package_json.component_name
        # NOTE(review): each returned entry is validated as an email address below.
        # Confirm that get_maintainers_of_package yields addresses rather than
        # PyPI usernames, otherwise every package would fail the format check.
        maintainers = pypi_package_json.pypi_registry.get_maintainers_of_package(package_name)
        if not maintainers:
            err_message = f"Failed to get maintainers for {package_name}"
            raise HeuristicAnalyzerValueError(err_message)

        for email in maintainers:
            if not self.is_valid_email(email):
                return HeuristicResult.FAIL, {"email": email}

        return HeuristicResult.PASS, {}
Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved.
2+
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.
3+
4+
"""This analyzer checks if the package has a similar structure to other packages maintained by the same user."""
5+
6+
import hashlib
7+
import logging
8+
import tarfile
9+
import typing
10+
11+
import requests
12+
from bs4 import BeautifulSoup
13+
14+
from macaron.errors import HeuristicAnalyzerValueError
15+
from macaron.json_tools import JsonType
16+
from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer
17+
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics
18+
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset
19+
20+
# Module-level logger, named after this module.
logger: logging.Logger = logging.getLogger(__name__)
21+
22+
23+
class SimilarProjectAnalyzer(BaseHeuristicAnalyzer):
    """Check whether the package has a similar structure to other packages maintained by the same user.

    The structure of a package is summarised as a SHA-256 hash over the sorted file
    paths of its source distribution, with the top-level directory removed and the
    package name replaced by a placeholder.
    """

    def __init__(self) -> None:
        super().__init__(
            name="similar_project_analyzer",
            heuristic=Heuristics.SIMILAR_PROJECTS,
            depends_on=None,
        )

    def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]:
        """Analyze the package.

        Parameters
        ----------
        pypi_package_json: PyPIPackageJsonAsset
            The PyPI package JSON asset object.

        Returns
        -------
        tuple[HeuristicResult, dict[str, JsonType]]:
            ``FAIL`` with the matching package under ``"similar_package"`` when another
            package by the same maintainers has an identical structure hash; ``SKIP``
            with a ``"message"`` when the comparison cannot be made (no source
            distribution for the target, or no other packages); ``PASS`` otherwise.

        Raises
        ------
        HeuristicAnalyzerValueError
            if the analysis fails.
        """
        package_name = pypi_package_json.component_name
        try:
            target_hash = self.get_structure_hash(package_name)
        except Exception as err:
            err_message = f"Failed to get structure hash for {package_name}: {err}"
            raise HeuristicAnalyzerValueError(err_message) from err

        # An empty hash means no sdist was available. Comparing it would make every
        # other sdist-less package look identical, so there is nothing to analyze.
        if not target_hash:
            return HeuristicResult.SKIP, {
                "message": f"No source distribution found for {package_name}",
            }

        try:
            similar_packages = self.get_packages(package_name)
        except requests.exceptions.RequestException as err:
            # The page scrapers do not translate network errors themselves.
            err_message = f"Failed to get packages by the maintainers of {package_name}: {err}"
            raise HeuristicAnalyzerValueError(err_message) from err

        if not similar_packages:
            return HeuristicResult.SKIP, {
                "message": f"No similar packages found for {package_name}",
            }

        for package in similar_packages:
            try:
                package_hash = self.get_structure_hash(package)
            except Exception as err:
                err_message = f"Failed to get structure hash for {package}: {err}"
                raise HeuristicAnalyzerValueError(err_message) from err
            # target_hash is non-empty here, so a sibling without an sdist
            # (empty hash) can never match.
            if package_hash == target_hash:
                return HeuristicResult.FAIL, {
                    "similar_package": package,
                }
        return HeuristicResult.PASS, {}

    def get_maintainers(self, package_name: str) -> list[str]:
        """Get all maintainers of a package.

        The names are scraped from the sidebar of the package's PyPI project page.

        Parameters
        ----------
        package_name: str
            The name of the package.

        Returns
        -------
        list[str]:
            A list of maintainers, or an empty list if the page does not return
            HTTP 200.
        """
        url = f"https://pypi.org/project/{package_name}/"
        response = requests.get(url, timeout=10)
        if response.status_code != 200:
            return []

        soup = BeautifulSoup(response.text, "html.parser")
        gravatar_spans = soup.find_all("span", class_="sidebar-section__user-gravatar-text")
        return [span.get_text().strip() for span in gravatar_spans]

    def get_packages_by_user(self, username: str) -> list[str]:
        """Get all packages by a user.

        The names are scraped from the user's PyPI profile page.

        Parameters
        ----------
        username: str
            The username of the user.

        Returns
        -------
        list[str]:
            A list of package names, or an empty list if the page does not return
            HTTP 200.
        """
        url = f"https://pypi.org/user/{username}/"
        response = requests.get(url, timeout=10)
        if response.status_code != 200:
            return []

        soup = BeautifulSoup(response.text, "html.parser")
        headers = soup.find_all("h3", class_="package-snippet__title")
        return [header.get_text().strip() for header in headers]

    def get_packages(self, package_name: str) -> list[str]:
        """Get packages that are maintained by this package's maintainers.

        Parameters
        ----------
        package_name: str
            The name of the package.

        Returns
        -------
        list[str]:
            The other packages of the maintainers, de-duplicated and sorted so that
            the analysis result is reproducible. The target package is excluded.
        """
        # Compare normalised names: the profile page may spell the project with a
        # different case or separator than the caller, and leaving the target in
        # the list would make it match itself.
        target = self._normalize_name(package_name)
        candidates = {
            candidate
            for user in self.get_maintainers(package_name)
            for candidate in self.get_packages_by_user(user)
            if self._normalize_name(candidate) != target
        }
        return sorted(candidates)

    def fetch_sdist_url(self, package_name: str, version: str | None = None) -> str:
        """Fetch the sdist URL for a package.

        Parameters
        ----------
        package_name: str
            The name of the package.
        version: str | None
            The version of the package. If None, the latest version will be used.

        Returns
        -------
        str:
            The sdist URL, or an empty string if not found.

        Raises
        ------
        HeuristicAnalyzerValueError
            If the PyPI JSON document cannot be fetched, decoded, or interpreted.
        """
        url = f"https://pypi.org/pypi/{package_name}/json"
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            data = response.json()
        except requests.exceptions.RequestException as err:
            err_message = f"Failed to fetch PyPI JSON for {package_name}: {err}"
            raise HeuristicAnalyzerValueError(err_message) from err
        except ValueError as err:
            err_message = f"Failed to decode PyPI JSON for {package_name}: {err}"
            raise HeuristicAnalyzerValueError(err_message) from err

        actual_version: str
        if version is None:
            try:
                actual_version = typing.cast(str, data["info"]["version"])
            except (KeyError, TypeError) as err:
                err_message = f"Failed to get version for {package_name}: {err}"
                raise HeuristicAnalyzerValueError(err_message) from err
        else:
            actual_version = version

        try:
            for release_file in data.get("releases", {}).get(actual_version, []):
                if isinstance(release_file, dict) and release_file.get("packagetype") == "sdist":
                    sdist_url = release_file.get("url")
                    if isinstance(sdist_url, str):
                        return sdist_url
        except Exception as err:
            err_message = f"Failed to parse releases for {package_name} version {actual_version}: {err}"
            raise HeuristicAnalyzerValueError(err_message) from err

        return ""

    def get_structure_hash(self, package_name: str) -> str | None:
        """Calculate a hash based on the project's file structure.

        Parameters
        ----------
        package_name: str
            The name of the package.

        Returns
        -------
        str:
            The hexadecimal SHA-256 structure hash, or an empty string if the package
            has no source distribution for its latest version.

        Raises
        ------
        HeuristicAnalyzerValueError
            If the sdist URL cannot be fetched or the package structure cannot be hashed.
        """
        sdist_url = self.fetch_sdist_url(package_name)
        if not sdist_url:
            return ""

        name_variants = self._name_variants(package_name)
        try:
            # Use the response as a context manager so that the streamed connection
            # is released even when reading the archive fails part-way.
            with requests.get(sdist_url, stream=True, timeout=10) as response:
                response.raise_for_status()
                raw_file_obj: typing.IO[bytes] = typing.cast(typing.IO[bytes], response.raw)

                with tarfile.open(fileobj=raw_file_obj, mode="r:gz") as file_archive:
                    paths = [
                        self._normalize_member_path(member.name, name_variants)
                        for member in file_archive
                        if not member.isdir()
                    ]

            # Sort so that the hash does not depend on the archive's member order.
            paths.sort()
            structure_hash_calculator = hashlib.sha256()
            for path in paths:
                structure_hash_calculator.update(path.encode("utf-8"))
                structure_hash_calculator.update(b"\n")
            return structure_hash_calculator.hexdigest()
        except requests.exceptions.RequestException as err:
            err_message = f"Failed to download sdist for {package_name} from {sdist_url}: {err}"
            raise HeuristicAnalyzerValueError(err_message) from err
        except tarfile.TarError as err:
            err_message = f"Failed to process tarfile for {package_name} from {sdist_url}: {err}"
            raise HeuristicAnalyzerValueError(err_message) from err
        except Exception as err:
            err_message = f"Failed to get structure hash for {package_name}: {err}"
            raise HeuristicAnalyzerValueError(err_message) from err

    @staticmethod
    def _normalize_name(name: str) -> str:
        """Return a project name lower-cased with runs of ``-``, ``_`` and ``.`` collapsed to one ``-``."""
        normalized = name.lower().replace("_", "-").replace(".", "-")
        while "--" in normalized:
            normalized = normalized.replace("--", "-")
        return normalized

    @staticmethod
    def _name_variants(package_name: str) -> list[str]:
        """Return the spellings of the package name to mask in archive paths, longest first."""
        # Import directories and egg-info folders commonly use the lower-case and/or
        # underscore spelling of a hyphenated project name.
        variants = {package_name, package_name.lower()}
        variants |= {variant.replace("-", "_") for variant in tuple(variants)}
        # Replacing the empty string would insert the placeholder between every character.
        variants.discard("")
        return sorted(variants, key=lambda variant: (-len(variant), variant))

    @staticmethod
    def _normalize_member_path(member_name: str, name_variants: list[str]) -> str:
        """Strip the top-level directory from an archive path and mask the package name."""
        _, separator, remainder = member_name.partition("/")
        normalized = remainder if separator else member_name
        for variant in name_variants:
            normalized = normalized.replace(variant, "<PKG>")
        return normalized

0 commit comments

Comments
 (0)