|
| 1 | +import os |
| 2 | +from typing import List, Tuple |
1 | 3 | from uuid import uuid4 |
2 | 4 |
|
3 | 5 | import pytest |
| 6 | +import requests |
4 | 7 | import responses |
| 8 | +from requests.exceptions import ConnectionError as RequestsConnectionError |
5 | 9 |
|
6 | 10 | from cycode.cli.cli_types import ScanTypeOption |
| 11 | +from cycode.cli.exceptions.custom_exceptions import ( |
| 12 | + HttpUnauthorizedError, |
| 13 | + RequestConnectionError, |
| 14 | + RequestHttpError, |
| 15 | + RequestTimeout, |
| 16 | +) |
| 17 | +from cycode.cli.files_collector.models.in_memory_zip import InMemoryZip |
| 18 | +from cycode.cli.models import Document |
7 | 19 | from cycode.cyclient.scan_client import ScanClient |
| 20 | +from tests.conftest import ZIP_CONTENT_PATH |
8 | 21 | from tests.cyclient.mocked_responses.scan_client import ( |
9 | 22 | get_scan_aggregation_report_url, |
10 | 23 | get_scan_aggregation_report_url_response, |
| 24 | + get_scan_details_response, |
| 25 | + get_scan_details_url, |
| 26 | + get_zipped_file_scan_async_response, |
| 27 | + get_zipped_file_scan_async_url, |
11 | 28 | ) |
12 | 29 |
|
13 | 30 |
|
| 31 | +def zip_scan_resources(scan_type: str, scan_client: ScanClient) -> Tuple[str, InMemoryZip]: |
| 32 | + url = get_zipped_file_scan_async_url(scan_type, scan_client) |
| 33 | + zip_file = get_test_zip_file(scan_type) |
| 34 | + |
| 35 | + return url, zip_file |
| 36 | + |
| 37 | + |
| 38 | +def get_test_zip_file(scan_type: str) -> InMemoryZip: |
| 39 | + # TODO(MarshalX): refactor scan_disk_files in code_scanner.py to reuse method here instead of this |
| 40 | + test_documents: List[Document] = [] |
| 41 | + for root, _, files in os.walk(ZIP_CONTENT_PATH): |
| 42 | + for name in files: |
| 43 | + path = os.path.join(root, name) |
| 44 | + with open(path, 'r', encoding='UTF-8') as f: |
| 45 | + test_documents.append(Document(path, f.read(), is_git_diff_format=False)) |
| 46 | + |
| 47 | + from cycode.cli.files_collector.zip_documents import zip_documents |
| 48 | + |
| 49 | + return zip_documents(scan_type, test_documents) |
| 50 | + |
| 51 | + |
| 52 | +@pytest.mark.parametrize('scan_type', list(ScanTypeOption)) |
| 53 | +@responses.activate |
| 54 | +def test_zipped_file_scan_async( |
| 55 | + scan_type: str, scan_client: ScanClient, api_token_response: responses.Response |
| 56 | +) -> None: |
| 57 | + """Test the zipped_file_scan_async method for the async flow.""" |
| 58 | + url, zip_file = zip_scan_resources(scan_type, scan_client) |
| 59 | + expected_scan_id = uuid4() |
| 60 | + |
| 61 | + responses.add(api_token_response) # mock token based client |
| 62 | + responses.add(get_zipped_file_scan_async_response(url, expected_scan_id)) |
| 63 | + |
| 64 | + # Call the method with the correct parameter order |
| 65 | + scan_initialization_response = scan_client.zipped_file_scan_async( |
| 66 | + zip_file=zip_file, scan_type=scan_type, scan_parameters={} |
| 67 | + ) |
| 68 | + assert scan_initialization_response.scan_id == str(expected_scan_id) |
| 69 | + |
| 70 | + |
| 71 | +@pytest.mark.parametrize('scan_type', list(ScanTypeOption)) |
| 72 | +@responses.activate |
| 73 | +def test_get_scan_report_url(scan_type: str, scan_client: ScanClient, api_token_response: responses.Response) -> None: |
| 74 | + """Test getting the scan report URL for the async flow.""" |
| 75 | + scan_id = uuid4() |
| 76 | + url = get_scan_aggregation_report_url(scan_id, scan_client, scan_type) |
| 77 | + |
| 78 | + responses.add(api_token_response) # mock token based client |
| 79 | + responses.add(get_scan_aggregation_report_url_response(url, scan_id)) |
| 80 | + |
| 81 | + scan_report_url_response = scan_client.get_scan_aggregation_report_url(str(scan_id), scan_type) |
| 82 | + assert scan_report_url_response.report_url == f'https://app.domain/cli-logs-aggregation/{scan_id}' |
| 83 | + |
| 84 | + |
14 | 85 | @pytest.mark.parametrize('scan_type', list(ScanTypeOption)) |
15 | 86 | @responses.activate |
16 | | -def test_get_scan_report_url( |
17 | | - scan_type: ScanTypeOption, scan_client: ScanClient, api_token_response: responses.Response |
| 87 | +def test_zipped_file_scan_async_unauthorized_error( |
| 88 | + scan_type: str, scan_client: ScanClient, api_token_response: responses.Response |
18 | 89 | ) -> None: |
19 | | - aggregation_id = uuid4() |
20 | | - url = get_scan_aggregation_report_url(aggregation_id, scan_client, scan_type) |
| 90 | + """Test handling of unauthorized errors in the async flow.""" |
| 91 | + url, zip_file = zip_scan_resources(scan_type, scan_client) |
| 92 | + |
| 93 | + responses.add(api_token_response) # mock token based client |
| 94 | + responses.add(method=responses.POST, url=url, status=401, body='Unauthorized') |
| 95 | + |
| 96 | + with pytest.raises(HttpUnauthorizedError) as e_info: |
| 97 | + scan_client.zipped_file_scan_async(zip_file=zip_file, scan_type=scan_type, scan_parameters={}) |
| 98 | + |
| 99 | + assert e_info.value.status_code == 401 |
| 100 | + |
| 101 | + |
| 102 | +@pytest.mark.parametrize('scan_type', list(ScanTypeOption)) |
| 103 | +@responses.activate |
| 104 | +def test_zipped_file_scan_async_bad_request_error( |
| 105 | + scan_type: str, scan_client: ScanClient, api_token_response: responses.Response |
| 106 | +) -> None: |
| 107 | + """Test handling of bad request errors in the async flow.""" |
| 108 | + url, zip_file = zip_scan_resources(scan_type, scan_client) |
| 109 | + |
| 110 | + expected_status_code = 400 |
| 111 | + expected_response_text = 'Bad Request' |
| 112 | + |
| 113 | + responses.add(api_token_response) # mock token based client |
| 114 | + responses.add(method=responses.POST, url=url, status=expected_status_code, body=expected_response_text) |
| 115 | + |
| 116 | + with pytest.raises(RequestHttpError) as e_info: |
| 117 | + scan_client.zipped_file_scan_async(zip_file=zip_file, scan_type=scan_type, scan_parameters={}) |
| 118 | + |
| 119 | + assert e_info.value.status_code == expected_status_code |
| 120 | + assert e_info.value.error_message == expected_response_text |
| 121 | + |
| 122 | + |
| 123 | +@pytest.mark.parametrize('scan_type', list(ScanTypeOption)) |
| 124 | +@responses.activate |
| 125 | +def test_zipped_file_scan_async_timeout_error( |
| 126 | + scan_type: str, scan_client: ScanClient, api_token_response: responses.Response |
| 127 | +) -> None: |
| 128 | + """Test handling of timeout errors in the async flow.""" |
| 129 | + url, zip_file = zip_scan_resources(scan_type, scan_client) |
| 130 | + |
| 131 | + # Create a timeout response |
| 132 | + timeout_error = requests.exceptions.Timeout('Connection timed out') |
| 133 | + |
| 134 | + responses.add(api_token_response) # mock token based client |
| 135 | + responses.add(method=responses.POST, url=url, body=timeout_error) |
| 136 | + |
| 137 | + with pytest.raises(RequestTimeout): |
| 138 | + scan_client.zipped_file_scan_async(zip_file=zip_file, scan_type=scan_type, scan_parameters={}) |
| 139 | + |
| 140 | + |
| 141 | +@pytest.mark.parametrize('scan_type', list(ScanTypeOption)) |
| 142 | +@responses.activate |
| 143 | +def test_zipped_file_scan_async_connection_error( |
| 144 | + scan_type: str, scan_client: ScanClient, api_token_response: responses.Response |
| 145 | +) -> None: |
| 146 | + """Test handling of connection errors in the async flow.""" |
| 147 | + url, zip_file = zip_scan_resources(scan_type, scan_client) |
| 148 | + |
| 149 | + # Create a connection error response |
| 150 | + connection_error = RequestsConnectionError('Connection refused') |
| 151 | + |
| 152 | + responses.add(api_token_response) # mock token based client |
| 153 | + responses.add(method=responses.POST, url=url, body=connection_error) |
| 154 | + |
| 155 | + with pytest.raises(RequestConnectionError): |
| 156 | + scan_client.zipped_file_scan_async(zip_file=zip_file, scan_type=scan_type, scan_parameters={}) |
| 157 | + |
| 158 | + |
| 159 | +@pytest.mark.parametrize('scan_type', list(ScanTypeOption)) |
| 160 | +@responses.activate |
| 161 | +def test_get_scan_details(scan_type: str, scan_client: ScanClient, api_token_response: responses.Response) -> None: |
| 162 | + """Test getting scan details in the async flow.""" |
| 163 | + scan_id = uuid4() |
| 164 | + url = get_scan_details_url(scan_type, scan_id, scan_client) |
21 | 165 |
|
22 | 166 | responses.add(api_token_response) # mock token based client |
23 | | - responses.add(get_scan_aggregation_report_url_response(url, aggregation_id)) |
| 167 | + responses.add(get_scan_details_response(url, scan_id)) |
24 | 168 |
|
25 | | - scan_report_url_response = scan_client.get_scan_aggregation_report_url(str(aggregation_id), scan_type) |
26 | | - assert scan_report_url_response.report_url == f'https://app.domain/cli-logs-aggregation/{aggregation_id}' |
| 169 | + scan_details_response = scan_client.get_scan_details(scan_type, str(scan_id)) |
| 170 | + assert scan_details_response.id == str(scan_id) |
| 171 | + assert scan_details_response.scan_status == 'Completed' |
0 commit comments