33import sys
44import time
55from platform import platform
6- from typing import TYPE_CHECKING , Callable , Dict , List , Optional , Tuple
6+ from typing import TYPE_CHECKING , Callable , Optional
77from uuid import UUID , uuid4
88
99import click
@@ -84,7 +84,7 @@ def scan_sca_commit_range(ctx: typer.Context, path: str, commit_range: str) -> N
8484 scan_commit_range_documents (ctx , from_commit_documents , to_commit_documents , scan_parameters = scan_parameters )
8585
8686
87- def scan_disk_files (ctx : typer .Context , paths : Tuple [str ]) -> None :
87+ def scan_disk_files (ctx : typer .Context , paths : tuple [str , ... ]) -> None :
8888 scan_type = ctx .obj ['scan_type' ]
8989 progress_bar = ctx .obj ['progress_bar' ]
9090
@@ -96,7 +96,7 @@ def scan_disk_files(ctx: typer.Context, paths: Tuple[str]) -> None:
9696 handle_scan_exception (ctx , e )
9797
9898
99- def set_issue_detected_by_scan_results (ctx : typer .Context , scan_results : List [LocalScanResult ]) -> None :
99+ def set_issue_detected_by_scan_results (ctx : typer .Context , scan_results : list [LocalScanResult ]) -> None :
100100 set_issue_detected (ctx , any (scan_result .issue_detected for scan_result in scan_results ))
101101
102102
@@ -110,6 +110,7 @@ def _should_use_sync_flow(command_scan_type: str, scan_type: str, sync_option: b
110110 - for IAC scan, sync flow is always used
111111 - for SAST scan, sync flow is not supported
112112 - for SCA and Secrets scan, sync flow is supported only for path/repository scan
113+
113114 """
114115 if not sync_option and scan_type != consts .IAC_SCAN_TYPE :
115116 return False
@@ -161,14 +162,14 @@ def _enrich_scan_result_with_data_from_detection_rules(
161162
162163def _get_scan_documents_thread_func (
163164 ctx : typer .Context , is_git_diff : bool , is_commit_range : bool , scan_parameters : dict
164- ) -> Callable [[List [Document ]], Tuple [str , CliError , LocalScanResult ]]:
165+ ) -> Callable [[list [Document ]], tuple [str , CliError , LocalScanResult ]]:
165166 cycode_client = ctx .obj ['client' ]
166167 scan_type = ctx .obj ['scan_type' ]
167168 severity_threshold = ctx .obj ['severity_threshold' ]
168169 sync_option = ctx .obj ['sync' ]
169170 command_scan_type = ctx .info_name
170171
171- def _scan_batch_thread_func (batch : List [Document ]) -> Tuple [str , CliError , LocalScanResult ]:
172+ def _scan_batch_thread_func (batch : list [Document ]) -> tuple [str , CliError , LocalScanResult ]:
172173 local_scan_result = error = error_message = None
173174 detections_count = relevant_detections_count = zip_file_size = 0
174175
@@ -297,7 +298,7 @@ def scan_commit_range(
297298
298299def scan_documents (
299300 ctx : typer .Context ,
300- documents_to_scan : List [Document ],
301+ documents_to_scan : list [Document ],
301302 scan_parameters : dict ,
302303 is_git_diff : bool = False ,
303304 is_commit_range : bool = False ,
@@ -335,13 +336,12 @@ def scan_documents(
335336
336337def scan_commit_range_documents (
337338 ctx : typer .Context ,
338- from_documents_to_scan : List [Document ],
339- to_documents_to_scan : List [Document ],
339+ from_documents_to_scan : list [Document ],
340+ to_documents_to_scan : list [Document ],
340341 scan_parameters : Optional [dict ] = None ,
341342 timeout : Optional [int ] = None ,
342343) -> None :
343- """Used by SCA only"""
344-
344+ """In use by SCA only."""
345345 cycode_client = ctx .obj ['client' ]
346346 scan_type = ctx .obj ['scan_type' ]
347347 severity_threshold = ctx .obj ['severity_threshold' ]
@@ -424,13 +424,13 @@ def scan_commit_range_documents(
424424 )
425425
426426
427- def should_scan_documents (from_documents_to_scan : List [Document ], to_documents_to_scan : List [Document ]) -> bool :
427+ def should_scan_documents (from_documents_to_scan : list [Document ], to_documents_to_scan : list [Document ]) -> bool :
428428 return len (from_documents_to_scan ) > 0 or len (to_documents_to_scan ) > 0
429429
430430
431431def create_local_scan_result (
432432 scan_result : ZippedFileScanResult ,
433- documents_to_scan : List [Document ],
433+ documents_to_scan : list [Document ],
434434 command_scan_type : str ,
435435 scan_type : str ,
436436 severity_threshold : str ,
@@ -568,15 +568,15 @@ def print_debug_scan_details(scan_details_response: 'ScanDetailsResponse') -> No
568568
569569
570570def print_results (
571- ctx : typer .Context , local_scan_results : List [LocalScanResult ], errors : Optional [Dict [str , 'CliError' ]] = None
571+ ctx : typer .Context , local_scan_results : list [LocalScanResult ], errors : Optional [dict [str , 'CliError' ]] = None
572572) -> None :
573573 printer = ctx .obj .get ('console_printer' )
574574 printer .print_scan_results (local_scan_results , errors )
575575
576576
577577def get_document_detections (
578- scan_result : ZippedFileScanResult , documents_to_scan : List [Document ]
579- ) -> List [DocumentDetections ]:
578+ scan_result : ZippedFileScanResult , documents_to_scan : list [Document ]
579+ ) -> list [DocumentDetections ]:
580580 logger .debug ('Getting document detections' )
581581
582582 document_detections = []
@@ -595,11 +595,11 @@ def get_document_detections(
595595
596596
597597def exclude_irrelevant_document_detections (
598- document_detections_list : List [DocumentDetections ],
598+ document_detections_list : list [DocumentDetections ],
599599 scan_type : str ,
600600 command_scan_type : str ,
601601 severity_threshold : str ,
602- ) -> List [DocumentDetections ]:
602+ ) -> list [DocumentDetections ]:
603603 relevant_document_detections_list = []
604604 for document_detections in document_detections_list :
605605 relevant_detections = exclude_irrelevant_detections (
@@ -614,8 +614,7 @@ def exclude_irrelevant_document_detections(
614614
615615
616616def parse_pre_receive_input () -> str :
617- """
618- Parsing input to pushed branch update details
617+ """Parse input to pushed branch update details.
619618
620619 Example input:
621620 old_value new_value refname
@@ -624,7 +623,7 @@ def parse_pre_receive_input() -> str:
624623 973a96d3e925b65941f7c47fa16129f1577d499f 0000000000000000000000000000000000000000 refs/heads/feature-branch
625624 59564ef68745bca38c42fc57a7822efd519a6bd9 3378e52dcfa47fb11ce3a4a520bea5f85d5d0bf3 refs/heads/develop
626625
627- :return: first branch update details (input's first line)
626+ :return: First branch update details (input's first line)
628627 """
629628 # FIXME(MarshalX): this blocks main thread forever if called outside of pre-receive hook
630629 pre_receive_input = sys .stdin .read ().strip ()
@@ -649,7 +648,7 @@ def _get_default_scan_parameters(ctx: typer.Context) -> dict:
649648 }
650649
651650
652- def get_scan_parameters (ctx : typer .Context , paths : Optional [Tuple [str ]] = None ) -> dict :
651+ def get_scan_parameters (ctx : typer .Context , paths : Optional [tuple [str , ... ]] = None ) -> dict :
653652 scan_parameters = _get_default_scan_parameters (ctx )
654653
655654 if not paths :
@@ -684,15 +683,14 @@ def try_get_git_remote_url(path: str) -> Optional[str]:
684683
685684
686685def _get_plastic_repository_name (path : str ) -> Optional [str ]:
687- """Gets the name of the Plastic repository from the current working directory.
686+ """Get the name of the Plastic repository from the current working directory.
688687
689688 The command to execute is:
690689 cm status --header --machinereadable --fieldseparator=":::"
691690
692691 Example of status header in machine-readable format:
693692 STATUS:::0:::Project/RepoName:::OrgName@ServerInfo
694693 """
695-
696694 try :
697695 command = [
698696 'cm' ,
@@ -718,8 +716,8 @@ def _get_plastic_repository_name(path: str) -> Optional[str]:
718716 return None
719717
720718
721- def _get_plastic_repository_list (working_dir : Optional [str ] = None ) -> Dict [str , str ]:
722- """Gets the list of Plastic repositories and their GUIDs.
719+ def _get_plastic_repository_list (working_dir : Optional [str ] = None ) -> dict [str , str ]:
720+ """Get the list of Plastic repositories and their GUIDs.
723721
724722 The command to execute is:
725723 cm repo list --format="{repname}:::{repguid}"
@@ -729,7 +727,6 @@ def _get_plastic_repository_list(working_dir: Optional[str] = None) -> Dict[str,
729727
730728 Each line represents an individual repository.
731729 """
732-
733730 repo_name_to_guid = {}
734731
735732 try :
@@ -771,14 +768,14 @@ def try_to_get_plastic_remote_url(path: str) -> Optional[str]:
771768
772769
773770def exclude_irrelevant_detections (
774- detections : List [Detection ], scan_type : str , command_scan_type : str , severity_threshold : str
775- ) -> List [Detection ]:
771+ detections : list [Detection ], scan_type : str , command_scan_type : str , severity_threshold : str
772+ ) -> list [Detection ]:
776773 relevant_detections = _exclude_detections_by_exclusions_configuration (detections , scan_type )
777774 relevant_detections = _exclude_detections_by_scan_type (relevant_detections , scan_type , command_scan_type )
778775 return _exclude_detections_by_severity (relevant_detections , severity_threshold )
779776
780777
781- def _exclude_detections_by_severity (detections : List [Detection ], severity_threshold : str ) -> List [Detection ]:
778+ def _exclude_detections_by_severity (detections : list [Detection ], severity_threshold : str ) -> list [Detection ]:
782779 relevant_detections = []
783780 for detection in detections :
784781 severity = detection .severity
@@ -795,8 +792,8 @@ def _exclude_detections_by_severity(detections: List[Detection], severity_thresh
795792
796793
797794def _exclude_detections_by_scan_type (
798- detections : List [Detection ], scan_type : str , command_scan_type : str
799- ) -> List [Detection ]:
795+ detections : list [Detection ], scan_type : str , command_scan_type : str
796+ ) -> list [Detection ]:
800797 if command_scan_type == consts .PRE_COMMIT_COMMAND_SCAN_TYPE :
801798 return exclude_detections_in_deleted_lines (detections )
802799
@@ -811,16 +808,16 @@ def _exclude_detections_by_scan_type(
811808 return detections
812809
813810
814- def exclude_detections_in_deleted_lines (detections : List [Detection ]) -> List [Detection ]:
811+ def exclude_detections_in_deleted_lines (detections : list [Detection ]) -> list [Detection ]:
815812 return [detection for detection in detections if detection .detection_details .get ('line_type' ) != 'Removed' ]
816813
817814
818- def _exclude_detections_by_exclusions_configuration (detections : List [Detection ], scan_type : str ) -> List [Detection ]:
815+ def _exclude_detections_by_exclusions_configuration (detections : list [Detection ], scan_type : str ) -> list [Detection ]:
819816 exclusions = configuration_manager .get_exclusions_by_scan_type (scan_type )
820817 return [detection for detection in detections if not _should_exclude_detection (detection , exclusions )]
821818
822819
823- def _should_exclude_detection (detection : Detection , exclusions : Dict ) -> bool :
820+ def _should_exclude_detection (detection : Detection , exclusions : dict ) -> bool :
824821 # FIXME(MarshalX): what the difference between by_value and by_sha?
825822 exclusions_by_value = exclusions .get (consts .EXCLUSIONS_BY_VALUE_SECTION_NAME , [])
826823 if _is_detection_sha_configured_in_exclusions (detection , exclusions_by_value ):
@@ -862,7 +859,7 @@ def _should_exclude_detection(detection: Detection, exclusions: Dict) -> bool:
862859 return False
863860
864861
865- def _is_detection_sha_configured_in_exclusions (detection : Detection , exclusions : List [str ]) -> bool :
862+ def _is_detection_sha_configured_in_exclusions (detection : Detection , exclusions : list [str ]) -> bool :
866863 detection_sha = detection .detection_details .get ('sha512' )
867864 return detection_sha in exclusions
868865
@@ -886,7 +883,7 @@ def _get_cve_identifier(detection: Detection) -> Optional[str]:
886883
887884
888885def _get_document_by_file_name (
889- documents : List [Document ], file_name : str , unique_id : Optional [str ] = None
886+ documents : list [Document ], file_name : str , unique_id : Optional [str ] = None
890887) -> Optional [Document ]:
891888 for document in documents :
892889 if _normalize_file_path (document .path ) == _normalize_file_path (file_name ) and document .unique_id == unique_id :
@@ -992,10 +989,11 @@ def _try_get_aggregation_report_url_if_needed(
992989 logger .debug ('Failed to get aggregation report url: %s' , str (e ))
993990
994991
995- def _map_detections_per_file_and_commit_id (scan_type : str , raw_detections : List [dict ]) -> List [DetectionsPerFile ]:
996- """Converts list of detections (async flow) to list of DetectionsPerFile objects (sync flow).
992+ def _map_detections_per_file_and_commit_id (scan_type : str , raw_detections : list [dict ]) -> list [DetectionsPerFile ]:
993+ """Convert a list of detections (async flow) to list of DetectionsPerFile objects (sync flow).
997994
998995 Args:
996+ scan_type: Type of the scan.
999997 raw_detections: List of detections as is returned from the server.
1000998
1001999 Note:
@@ -1004,6 +1002,7 @@ def _map_detections_per_file_and_commit_id(scan_type: str, raw_detections: List[
10041002
10051003 Note:
10061004 Aggregation is performed by file name and commit ID (if available)
1005+
10071006 """
10081007 detections_per_files = {}
10091008 for raw_detection in raw_detections :
@@ -1045,7 +1044,7 @@ def _get_secret_file_name_from_detection(raw_detection: dict) -> str:
10451044 return os .path .join (file_path , file_name )
10461045
10471046
1048- def _does_reach_to_max_commits_to_scan_limit (commit_ids : List [str ], max_commits_count : Optional [int ]) -> bool :
1047+ def _does_reach_to_max_commits_to_scan_limit (commit_ids : list [str ], max_commits_count : Optional [int ]) -> bool :
10491048 if max_commits_count is None :
10501049 return False
10511050
0 commit comments