|
| 1 | +# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved. |
| 2 | +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. |
| 3 | + |
| 4 | +"""The heuristic analyzer to check for an anomalous package version.""" |
| 5 | + |
| 6 | +import logging |
| 7 | +from enum import Enum |
| 8 | + |
| 9 | +from packaging.version import InvalidVersion, parse |
| 10 | + |
| 11 | +from macaron.config.defaults import defaults |
| 12 | +from macaron.errors import HeuristicAnalyzerValueError |
| 13 | +from macaron.json_tools import JsonType, json_extract |
| 14 | +from macaron.malware_analyzer.datetime_parser import parse_datetime |
| 15 | +from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer |
| 16 | +from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics |
| 17 | +from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset |
| 18 | + |
| 19 | +logger: logging.Logger = logging.getLogger(__name__) |
| 20 | + |
| 21 | + |
class AnomalousVersionAnalyzer(BaseHeuristicAnalyzer):
    """
    Analyze the version number (if there is only a single release) to detect if it is anomalous.

    A version number is anomalous if its epoch is greater than the epoch threshold, or if its leading release
    number is greater than the major threshold. Which number is treated as the leading release number depends on
    the detected versioning scheme (see below). If the version does not adhere to PyPI standards (PEP 440, as per
    the 'packaging' module), this heuristic cannot analyze it.

    Calendar versioning is detected as version numbers with the year, month and day present in the following combinations:
    (using the example 11th October 2016)
    - YYYY.MM.DD, e.g. 2016.10.11
    - YYYY.DD.MM, e.g. 2016.11.10
    - YY.DD.MM, e.g. 16.11.10
    - YY.MM.DD, e.g. 16.10.11
    - MM.DD.YYYY, e.g. 10.11.2016
    - DD.MM.YYYY, e.g. 11.10.2016
    - DD.MM.YY, e.g. 11.10.16
    - MM.DD.YY, e.g. 10.11.16
    - YYYYMMDD, e.g. 20161011
    - YYYYDDMM, e.g. 20161110
    - YYDDMM, e.g. 161110
    - YYMMDD, e.g. 161011
    - MMDDYYYY, e.g. 10112016
    - DDMMYYYY, e.g. 11102016
    - DDMMYY, e.g. 111016
    - MMDDYY, e.g. 101116
    This may be followed by further versioning (e.g. 2016.10.11.5.6.2). This type of versioning is detected based on the
    date of the upload time for the release within a threshold of a number of days (in the defaults file).
    Only the epoch is checked for calendar versions.

    Calendar-semantic versioning is detected as version numbers with the major value as the year (either yyyy or yy),
    and any other series of numbers following it:
    - 2016.7.1 would be version 7.1 of 2016
    - 16.1.4 would be version 1.4 of 2016
    This type of versioning is detected based on the exact year of the upload time for the release.
    The epoch is checked, and the minor value is checked against the major threshold.

    All other versionings are detected as semantic versioning, where the epoch and the major value are checked.
    """

    # Key under which the detected versioning scheme is reported in the detail information.
    DETAIL_INFO_KEY: str = "versioning"
    # strptime-style formats tried when a single integer may encode a whole date.
    DIGIT_DATE_FORMATS: list[str] = ["%Y%m%d", "%Y%d%m", "%d%m%Y", "%m%d%Y", "%y%m%d", "%y%d%m", "%d%m%y", "%m%d%y"]

    def __init__(self) -> None:
        super().__init__(
            name="anomalous_version_analyzer",
            heuristic=Heuristics.ANOMALOUS_VERSION,
            depends_on=[(Heuristics.ONE_RELEASE, HeuristicResult.FAIL)],
        )
        self.major_threshold, self.epoch_threshold, self.day_publish_error = self._load_defaults()

    def _load_defaults(self) -> tuple[int, int, int]:
        """Load default settings from defaults.ini.

        The built-in values (20, 3, 4) are used when the ``heuristic.pypi`` section is absent, and also
        individually for any option that is missing from that section.

        Returns
        -------
        tuple[int, int, int]:
            The Major threshold, Epoch threshold, and Day published error.
        """
        major_threshold, epoch_threshold, day_publish_error = 20, 3, 4

        section_name = "heuristic.pypi"
        if defaults.has_section(section_name):
            section = defaults[section_name]
            # Without an explicit fallback, a missing option comes back as None (configparser semantics),
            # which would only fail later when the thresholds are compared against integers.
            return (
                section.getint("major_threshold", fallback=major_threshold),
                section.getint("epoch_threshold", fallback=epoch_threshold),
                section.getint("day_publish_error", fallback=day_publish_error),
            )
        return major_threshold, epoch_threshold, day_publish_error

    def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]:
        """Analyze the package.

        Parameters
        ----------
        pypi_package_json: PyPIPackageJsonAsset
            The PyPI package JSON asset object.

        Returns
        -------
        tuple[HeuristicResult, dict[str, JsonType]]:
            The result and related information collected during the analysis. The result is SKIP when the
            version string cannot be parsed, FAIL when a threshold is exceeded, and PASS otherwise.

        Raises
        ------
        HeuristicAnalyzerValueError
            if there is no release information available, there is not exactly one release, the latest version
            is unknown or absent from the releases, the release has no distributions, or an upload time is
            missing or cannot be parsed.
        """
        releases = pypi_package_json.get_releases()
        if releases is None:  # no release information
            error_msg = "There is no information for any release of this package."
            logger.debug(error_msg)
            raise HeuristicAnalyzerValueError(error_msg)

        if len(releases) != 1:
            error_msg = (
                "This heuristic depends on a single release, but somehow there are multiple when the one release"
                + " heuristic failed."
            )
            logger.debug(error_msg)
            raise HeuristicAnalyzerValueError(error_msg)

        # Since there is only one release, the latest version should be that release
        release = pypi_package_json.get_latest_version()
        if release is None:
            error_msg = "No latest version information available"
            logger.debug(error_msg)
            raise HeuristicAnalyzerValueError(error_msg)

        try:
            release_metadata = releases[release]
        except KeyError as release_error:
            error_msg = "The latest release is not available in the list of releases"
            logger.debug(error_msg)
            raise HeuristicAnalyzerValueError(error_msg) from release_error

        try:
            version = parse(release)
        except InvalidVersion:
            return HeuristicResult.SKIP, {self.DETAIL_INFO_KEY: Versioning.INVALID.value}

        # Candidate date components, collected from the upload time of every distribution of the release.
        years: list[int] = []
        months: list[int] = []
        publish_days: list[int] = []

        for distribution in release_metadata:
            upload_time = json_extract(distribution, ["upload_time"], str)
            if upload_time is None:
                error_msg = "Missing upload time from release information"
                logger.debug(error_msg)
                raise HeuristicAnalyzerValueError(error_msg)

            parsed_time = parse_datetime(upload_time)
            if parsed_time is None:
                error_msg = "Upload time is not of the expected PyPI format"
                logger.debug(error_msg)
                raise HeuristicAnalyzerValueError(error_msg)

            years.append(parsed_time.year)
            years.append(parsed_time.year % 100)  # last 2 digits
            months.append(parsed_time.month)
            publish_days.append(parsed_time.day)

        if not publish_days:
            # A release without any distribution has no upload time to compare against; report it the same
            # way as other missing data instead of letting min()/max() fail on an empty list.
            error_msg = "There are no distributions with an upload time for this release"
            logger.debug(error_msg)
            raise HeuristicAnalyzerValueError(error_msg)

        # Accept any day within day_publish_error of the earliest/latest upload day.
        # NOTE: this window is built from day-of-month numbers only, so it does not wrap across month boundaries.
        days = list(range(min(publish_days) - self.day_publish_error, max(publish_days) + self.day_publish_error + 1))

        calendar = False
        calendar_semantic = False

        # check for year YY[YY]...
        if version.major in years:
            # calendar versioning: YY[YY].(M[M].D[D])(D[D].M[M])...
            if (version.minor in months and version.micro in days) or (
                version.minor in days and version.micro in months
            ):
                calendar = True
            else:
                calendar_semantic = True
        # check for calendar versioning: M[M].D[D].YY[YY]... or D[D].M[M].YY[YY]... or the whole digit representing a datetime
        elif (
            ((version.major in months and version.minor in days) or (version.major in days and version.minor in months))
            and version.micro in years
        ) or self._integer_date(version.major, years, months, days):
            # must include day and year for this to be calendar
            calendar = True

        if calendar:  # just check epoch
            detail_info: dict[str, JsonType] = {self.DETAIL_INFO_KEY: Versioning.CALENDAR.value}
            if version.epoch > self.epoch_threshold:
                return HeuristicResult.FAIL, detail_info

            return HeuristicResult.PASS, detail_info

        if calendar_semantic:  # check minor (as major) and epoch
            detail_info = {self.DETAIL_INFO_KEY: Versioning.CALENDAR_SEMANTIC.value}

            if version.epoch > self.epoch_threshold:
                return HeuristicResult.FAIL, detail_info
            if version.minor > self.major_threshold:
                return HeuristicResult.FAIL, detail_info

            return HeuristicResult.PASS, detail_info

        # semantic versioning
        detail_info = {self.DETAIL_INFO_KEY: Versioning.SEMANTIC.value}

        if version.epoch > self.epoch_threshold:
            return HeuristicResult.FAIL, detail_info
        if version.major > self.major_threshold:
            return HeuristicResult.FAIL, detail_info

        return HeuristicResult.PASS, detail_info

    def _integer_date(self, value: int, years: list[int], months: list[int], days: list[int]) -> bool:
        """Check whether the provided integer represents a date.

        Valid representations are:
        - YYYYMMDD
        - YYYYDDMM
        - YYDDMM
        - YYMMDD
        - MMDDYYYY
        - DDMMYYYY
        - DDMMYY
        - MMDDYY

        Parameters
        ----------
        value: int
            The integer to check.
        years: list[int]
            A list of integers representing valid years for components of value to represent.
        months: list[int]
            A list of integers representing valid months for components of value to represent.
        days: list[int]
            A list of integers representing valid days for components of value to represent.

        Returns
        -------
        bool:
            True if the integer may represent a date present in the list of valid years, months and days.
            False otherwise.
        """
        for date_format in self.DIGIT_DATE_FORMATS:
            # Formats that do not parse are skipped; the first format yielding an accepted date wins.
            if (date := parse_datetime(str(value), date_format)) is None:
                continue

            if date.year in years and date.month in months and date.day in days:
                return True

        return False
| 250 | + |
class Versioning(Enum):
    """Versioning schemes that the anomalous version heuristic reports for a release."""

    # The version string could not be parsed as a valid version.
    INVALID = "invalid"
    # The version encodes the year, month and day of the release.
    CALENDAR = "calendar"
    # The version starts with the release year, followed by further numbering.
    CALENDAR_SEMANTIC = "calendar_semantic"
    # Any other valid version.
    SEMANTIC = "semantic"
0 commit comments