11
11
import zipfile
12
12
from dataclasses import dataclass
13
13
from datetime import datetime
14
+ from typing import Any
14
15
15
16
import requests
16
17
from bs4 import BeautifulSoup , Tag
21
22
from macaron .json_tools import json_extract
22
23
from macaron .malware_analyzer .datetime_parser import parse_datetime
23
24
from macaron .slsa_analyzer .package_registry .package_registry import PackageRegistry
25
+ from macaron .slsa_analyzer .specs .package_registry_spec import PackageRegistryInfo
24
26
from macaron .util import send_get_http_raw
25
27
26
28
# Module-level logger, named after this module via __name__.
logger : logging .Logger = logging .getLogger (__name__ )
@@ -231,6 +233,45 @@ def fetch_sourcecode(self, src_url: str) -> dict[str, str] | None:
231
233
logger .debug ("Successfully fetch the source code from PyPI" )
232
234
return py_files_content
233
235
236
def get_artifact_hash(self, artifact_url: str, hash_algorithm: Any) -> str | None:
    """Return the hash of the artifact found at the passed URL.

    The artifact is downloaded as a stream and fed to ``hash_algorithm`` chunk by chunk,
    so the full artifact is never held in memory at once. Note that ``hash_algorithm`` is
    updated in place; if streaming fails part-way it has already consumed some data and
    should not be reused by the caller.

    Parameters
    ----------
    artifact_url
        The URL of the artifact.
    hash_algorithm: Any
        The hash algorithm to use. Any object exposing ``update(bytes)`` and ``hexdigest()``.

    Returns
    -------
    str | None
        The hash of the artifact, or None if not found.
    """
    try:
        response = requests.get(artifact_url, stream=True, timeout=40)
    except requests.exceptions.RequestException as error:
        # Connection failures and timeouts are not HTTPError instances, so the base
        # class is caught here to keep the "return None on failure" contract.
        logger.debug("Error while requesting artifact: %s", error)
        return None

    # With stream=True the connection stays open until the body is consumed or the
    # response is closed; the context manager closes it on every exit path.
    with response:
        try:
            response.raise_for_status()
        except requests.exceptions.HTTPError as http_err:
            logger.debug("HTTP error occurred: %s", http_err)
            return None

        # raise_for_status only rejects 4xx/5xx; anything other than 200 is still unusable here.
        if response.status_code != 200:
            logger.debug("Invalid response: %s", response.status_code)
            return None

        try:
            # An explicit chunk size avoids iter_content's default of one byte per iteration.
            for chunk in response.iter_content(chunk_size=8192):
                hash_algorithm.update(chunk)
        except requests.exceptions.RequestException as error:
            # Something went wrong with the request, abort.
            logger.debug("Error while streaming source file: %s", error)
            return None

    artifact_hash: str = hash_algorithm.hexdigest()
    logger.debug("Computed artifact hash: %s", artifact_hash)
    return artifact_hash
274
+
234
275
def get_package_page (self , package_name : str ) -> str | None :
235
276
"""Implement custom API to get package main page.
236
277
@@ -430,15 +471,19 @@ def get_latest_version(self) -> str | None:
430
471
"""
431
472
return json_extract (self .package_json , ["info" , "version" ], str )
432
473
433
- def get_sourcecode_url (self ) -> str | None :
474
+ def get_sourcecode_url (self , package_type : str = "sdist" ) -> str | None :
434
475
"""Get the url of the source distribution.
435
476
477
+ Parameters
478
+ ----------
479
+ package_type: str
480
+ The package type to retrieve the URL of.
481
+
436
482
Returns
437
483
-------
438
484
str | None
439
485
The URL of the source distribution.
440
486
"""
441
- urls : list | None = None
442
487
if self .component_version :
443
488
urls = json_extract (self .package_json , ["releases" , self .component_version ], list )
444
489
else :
@@ -447,7 +492,7 @@ def get_sourcecode_url(self) -> str | None:
447
492
if not urls :
448
493
return None
449
494
for distribution in urls :
450
- if distribution .get ("packagetype" ) != "sdist" :
495
+ if distribution .get ("packagetype" ) != package_type :
451
496
continue
452
497
# We intentionally check if the url is None and use empty string if that's the case.
453
498
source_url : str = distribution .get ("url" ) or ""
@@ -497,3 +542,39 @@ def get_sourcecode(self) -> dict[str, str] | None:
497
542
source_code : dict [str , str ] | None = self .pypi_registry .fetch_sourcecode (url )
498
543
return source_code
499
544
return None
545
+
546
+
547
def find_or_create_pypi_asset(
    asset_name: str, asset_version: str | None, pypi_registry_info: PackageRegistryInfo
) -> PyPIPackageJsonAsset | None:
    """Return the PyPI package JSON asset held by the registry information, creating it if absent.

    The metadata of ``pypi_registry_info`` is searched for the first ``PyPIPackageJsonAsset``.
    When none is present, a new asset is built from the given name and version, appended to
    that metadata, and returned. Creation requires the registry to be a ``PyPIRegistry``.

    Parameters
    ----------
    asset_name: str
        The name of the asset.
    asset_version: str | None
        The version of the asset.
    pypi_registry_info:
        The package registry information.

    Returns
    -------
    PyPIPackageJsonAsset | None
        The existing or newly created asset, or None if it could not be created.
    """
    # Look for the first already-recorded asset of the right type.
    existing = None
    for candidate in pypi_registry_info.metadata:
        if isinstance(candidate, PyPIPackageJsonAsset):
            existing = candidate
            break
    if existing:
        return existing

    registry = pypi_registry_info.package_registry
    if isinstance(registry, PyPIRegistry):
        # Record the new asset on the registry info so later lookups find it.
        created = PyPIPackageJsonAsset(asset_name, asset_version, False, registry, {})
        pypi_registry_info.metadata.append(created)
        return created

    logger.debug("Failed to create PyPIPackageJson asset.")
    return None
0 commit comments