From ae4b18b2b85d4d62e0701b15e436342dcdc90371 Mon Sep 17 00:00:00 2001 From: "omer.roth" Date: Thu, 6 Nov 2025 14:54:46 +0200 Subject: [PATCH] initial added tests and ran ruff ran ruff format ruff fixes ruff format changed file path parameter to positional parameter ran ruff format updated test related to file positional parameter updated test per ruff check updated README Squashed commits for import sbom --- README.md | 25 +- cycode/cli/app.py | 3 +- cycode/cli/apps/report_import/__init__.py | 8 + .../report_import/report_import_command.py | 13 + .../cli/apps/report_import/sbom/__init__.py | 6 + .../apps/report_import/sbom/sbom_command.py | 76 ++++ cycode/cli/cli_types.py | 6 + cycode/cli/utils/get_api_client.py | 9 +- cycode/cyclient/client_creator.py | 6 + cycode/cyclient/import_sbom_client.py | 81 +++++ cycode/cyclient/models.py | 40 ++- tests/cli/commands/import/__init__.py | 0 tests/cli/commands/import/test_import_sbom.py | 329 ++++++++++++++++++ .../mocked_responses/import_sbom_client.py | 67 ++++ 14 files changed, 663 insertions(+), 6 deletions(-) create mode 100644 cycode/cli/apps/report_import/__init__.py create mode 100644 cycode/cli/apps/report_import/report_import_command.py create mode 100644 cycode/cli/apps/report_import/sbom/__init__.py create mode 100644 cycode/cli/apps/report_import/sbom/sbom_command.py create mode 100644 cycode/cyclient/import_sbom_client.py create mode 100644 tests/cli/commands/import/__init__.py create mode 100644 tests/cli/commands/import/test_import_sbom.py create mode 100644 tests/cyclient/mocked_responses/import_sbom_client.py diff --git a/README.md b/README.md index ffc02d22..0cf2dc76 100644 --- a/README.md +++ b/README.md @@ -56,8 +56,9 @@ This guide walks you through both installation and usage. 6. [Ignoring via a config file](#ignoring-via-a-config-file) 6. [Report command](#report-command) 1. [Generating SBOM Report](#generating-sbom-report) -7. [Scan logs](#scan-logs) -8. [Syntax Help](#syntax-help) +7. [Import command](#import-command) +8. [Scan logs](#scan-logs) +9. [Syntax Help](#syntax-help) # Prerequisites @@ -1295,6 +1296,26 @@ To create an SBOM report for a path:\ For example:\ `cycode report sbom --format spdx-2.3 --include-vulnerabilities --include-dev-dependencies path /path/to/local/project` +# Import Command + +## Importing SBOM + +A software bill of materials (SBOM) is an inventory of all constituent components and software dependencies involved in the development and delivery of an application. +Using this command, you can import an SBOM file from your file system into Cycode. + +The following options are available for use with this command: + +| Option | Description | Required | Default | +|----------------------------------------------------|--------------------------------------------|----------|-------------------------------------------------------| +| `-n, --name TEXT` | Display name of the SBOM | Yes | | +| `-v, --vendor TEXT` | Name of the entity that provided the SBOM | Yes | | +| `-l, --label TEXT` | Attach label to the SBOM | No | | +| `-o, --owner TEXT` | Email address of the Cycode user that serves as point of contact for this SBOM | No | | +| `-b, --business-impact [High \| Medium \| Low]` | Business Impact | No | Medium | + +For example:\ +`cycode import sbom --name example-sbom --vendor cycode -label tag1 -label tag2 --owner example@cycode.com /path/to/local/project` + # Scan Logs All CLI scans are logged in Cycode. The logs can be found under Settings > CLI Logs. diff --git a/cycode/cli/app.py b/cycode/cli/app.py index 1b13ebf2..04872b7d 100644 --- a/cycode/cli/app.py +++ b/cycode/cli/app.py @@ -9,7 +9,7 @@ from typer.completion import install_callback, show_callback from cycode import __version__ -from cycode.cli.apps import ai_remediation, auth, configure, ignore, report, scan, status +from cycode.cli.apps import ai_remediation, auth, configure, ignore, report, report_import, scan, status if sys.version_info >= (3, 10): from cycode.cli.apps import mcp @@ -50,6 +50,7 @@ app.add_typer(configure.app) app.add_typer(ignore.app) app.add_typer(report.app) +app.add_typer(report_import.app) app.add_typer(scan.app) app.add_typer(status.app) if sys.version_info >= (3, 10): diff --git a/cycode/cli/apps/report_import/__init__.py b/cycode/cli/apps/report_import/__init__.py new file mode 100644 index 00000000..1fd2475b --- /dev/null +++ b/cycode/cli/apps/report_import/__init__.py @@ -0,0 +1,8 @@ +import typer + +from cycode.cli.apps.report_import.report_import_command import report_import_command +from cycode.cli.apps.report_import.sbom import sbom_command + +app = typer.Typer(name='import', no_args_is_help=True) +app.callback(short_help='Import report. You`ll need to specify which report type to import.')(report_import_command) +app.command(name='sbom', short_help='Import SBOM report from a local path.')(sbom_command) diff --git a/cycode/cli/apps/report_import/report_import_command.py b/cycode/cli/apps/report_import/report_import_command.py new file mode 100644 index 00000000..7f4e8844 --- /dev/null +++ b/cycode/cli/apps/report_import/report_import_command.py @@ -0,0 +1,13 @@ +import typer + +from cycode.cli.utils.sentry import add_breadcrumb + + +def report_import_command(ctx: typer.Context) -> int: + """:bar_chart: [bold cyan]Import security reports.[/] + + Example usage: + * `cycode import sbom`: Import SBOM report + """ + add_breadcrumb('import') + return 1 diff --git a/cycode/cli/apps/report_import/sbom/__init__.py b/cycode/cli/apps/report_import/sbom/__init__.py new file mode 100644 index 00000000..4d667031 --- /dev/null +++ b/cycode/cli/apps/report_import/sbom/__init__.py @@ -0,0 +1,6 @@ +import typer + +from cycode.cli.apps.report_import.sbom.sbom_command import sbom_command + +app = typer.Typer(name='sbom') +app.command(name='path', short_help='Import SBOM report from a local path.')(sbom_command) diff --git a/cycode/cli/apps/report_import/sbom/sbom_command.py b/cycode/cli/apps/report_import/sbom/sbom_command.py new file mode 100644 index 00000000..de9e85d4 --- /dev/null +++ b/cycode/cli/apps/report_import/sbom/sbom_command.py @@ -0,0 +1,76 @@ +from pathlib import Path +from typing import Annotated, Optional + +import typer + +from cycode.cli.cli_types import BusinessImpactOption +from cycode.cli.exceptions.handle_report_sbom_errors import handle_report_exception +from cycode.cli.utils.get_api_client import get_import_sbom_cycode_client +from cycode.cli.utils.sentry import add_breadcrumb +from cycode.cyclient.import_sbom_client import ImportSbomParameters + + +def sbom_command( + ctx: typer.Context, + path: Annotated[ + Path, + typer.Argument( + exists=True, resolve_path=True, dir_okay=False, readable=True, help='Path to SBOM file.', show_default=False + ), + ], + sbom_name: Annotated[ + str, typer.Option('--name', '-n', help='SBOM Name.', case_sensitive=False, show_default=False) + ], + vendor: Annotated[ + str, typer.Option('--vendor', '-v', help='Vendor Name.', case_sensitive=False, show_default=False) + ], + labels: Annotated[ + Optional[list[str]], + typer.Option( + '--label', '-l', help='Label, can be specified multiple times.', case_sensitive=False, show_default=False + ), + ] = None, + owners: Annotated[ + Optional[list[str]], + typer.Option( + '--owner', + '-o', + help='Email address of a user in Cycode platform, can be specified multiple times.', + case_sensitive=True, + show_default=False, + ), + ] = None, + business_impact: Annotated[ + BusinessImpactOption, + typer.Option( + '--business-impact', + '-b', + help='Business Impact.', + case_sensitive=True, + show_default=True, + ), + ] = BusinessImpactOption.MEDIUM, +) -> None: + """Import SBOM.""" + add_breadcrumb('sbom') + + client = get_import_sbom_cycode_client(ctx) + + import_parameters = ImportSbomParameters( + Name=sbom_name, + Vendor=vendor, + BusinessImpact=business_impact, + Labels=labels, + Owners=owners, + ) + + try: + if not path.exists(): + from errno import ENOENT + from os import strerror + + raise FileNotFoundError(ENOENT, strerror(ENOENT), path.absolute()) + + client.request_sbom_import_execution(import_parameters, path) + except Exception as e: + handle_report_exception(ctx, e) diff --git a/cycode/cli/cli_types.py b/cycode/cli/cli_types.py index a5d7f9d9..63a1cb36 100644 --- a/cycode/cli/cli_types.py +++ b/cycode/cli/cli_types.py @@ -52,6 +52,12 @@ class SbomOutputFormatOption(StrEnum): JSON = 'json' +class BusinessImpactOption(StrEnum): + HIGH = 'High' + MEDIUM = 'Medium' + LOW = 'Low' + + class SeverityOption(StrEnum): INFO = 'info' LOW = 'low' diff --git a/cycode/cli/utils/get_api_client.py b/cycode/cli/utils/get_api_client.py index 110d528b..ba98d937 100644 --- a/cycode/cli/utils/get_api_client.py +++ b/cycode/cli/utils/get_api_client.py @@ -3,11 +3,12 @@ import click from cycode.cli.user_settings.credentials_manager import CredentialsManager -from cycode.cyclient.client_creator import create_report_client, create_scan_client +from cycode.cyclient.client_creator import create_import_sbom_client, create_report_client, create_scan_client if TYPE_CHECKING: import typer + from cycode.cyclient.import_sbom_client import ImportSbomClient from cycode.cyclient.report_client import ReportClient from cycode.cyclient.scan_client import ScanClient @@ -38,6 +39,12 @@ def get_report_cycode_client(ctx: 'typer.Context', hide_response_log: bool = Tru return _get_cycode_client(create_report_client, client_id, client_secret, hide_response_log) +def get_import_sbom_cycode_client(ctx: 'typer.Context', hide_response_log: bool = True) -> 'ImportSbomClient': + client_id = ctx.obj.get('client_id') + client_secret = ctx.obj.get('client_secret') + return _get_cycode_client(create_import_sbom_client, client_id, client_secret, hide_response_log) + + def _get_configured_credentials() -> tuple[str, str]: credentials_manager = CredentialsManager() return credentials_manager.get_credentials() diff --git a/cycode/cyclient/client_creator.py b/cycode/cyclient/client_creator.py index 45911589..68845646 100644 --- a/cycode/cyclient/client_creator.py +++ b/cycode/cyclient/client_creator.py @@ -2,6 +2,7 @@ from cycode.cyclient.config_dev import DEV_CYCODE_API_URL from cycode.cyclient.cycode_dev_based_client import CycodeDevBasedClient from cycode.cyclient.cycode_token_based_client import CycodeTokenBasedClient +from cycode.cyclient.import_sbom_client import ImportSbomClient from cycode.cyclient.report_client import ReportClient from cycode.cyclient.scan_client import ScanClient from cycode.cyclient.scan_config_base import DefaultScanConfig, DevScanConfig @@ -21,3 +22,8 @@ def create_scan_client(client_id: str, client_secret: str, hide_response_log: bo def create_report_client(client_id: str, client_secret: str, _: bool) -> ReportClient: client = CycodeDevBasedClient(DEV_CYCODE_API_URL) if dev_mode else CycodeTokenBasedClient(client_id, client_secret) return ReportClient(client) + + +def create_import_sbom_client(client_id: str, client_secret: str, _: bool) -> ImportSbomClient: + client = CycodeDevBasedClient(DEV_CYCODE_API_URL) if dev_mode else CycodeTokenBasedClient(client_id, client_secret) + return ImportSbomClient(client) diff --git a/cycode/cyclient/import_sbom_client.py b/cycode/cyclient/import_sbom_client.py new file mode 100644 index 00000000..d8107cf5 --- /dev/null +++ b/cycode/cyclient/import_sbom_client.py @@ -0,0 +1,81 @@ +import dataclasses +from pathlib import Path +from typing import Optional + +from requests import Response + +from cycode.cli.cli_types import BusinessImpactOption +from cycode.cli.exceptions.custom_exceptions import RequestHttpError +from cycode.cyclient import models +from cycode.cyclient.cycode_client_base import CycodeClientBase + + +@dataclasses.dataclass +class ImportSbomParameters: + Name: str + Vendor: str + BusinessImpact: BusinessImpactOption + Labels: Optional[list[str]] + Owners: Optional[list[str]] + + def _owners_to_ids(self) -> list[str]: + return [] + + def to_request_form(self) -> dict: + form_data = {} + for field in dataclasses.fields(self): + key = field.name + val = getattr(self, key) + if val is None or len(val) == 0: + continue + if isinstance(val, list): + form_data[f'{key}[]'] = val + else: + form_data[key] = val + return form_data + + +class ImportSbomClient: + IMPORT_SBOM_REQUEST_PATH: str = 'v4/sbom/import' + GET_USER_ID_REQUEST_PATH: str = 'v4/members' + + def __init__(self, client: CycodeClientBase) -> None: + self.client = client + + def request_sbom_import_execution(self, params: ImportSbomParameters, file_path: Path) -> None: + if params.Owners: + owners_ids = self.get_owners_user_ids(params.Owners) + params.Owners = owners_ids + + form_data = params.to_request_form() + + with open(file_path.absolute(), 'rb') as f: + request_args = { + 'url_path': self.IMPORT_SBOM_REQUEST_PATH, + 'data': form_data, + 'files': {'File': f}, + } + + response = self.client.post(**request_args) + + if response.status_code != 201: + raise RequestHttpError(response.status_code, response.text, response) + + def get_owners_user_ids(self, owners: list[str]) -> list[str]: + return [self._get_user_id_by_email(owner) for owner in owners] + + def _get_user_id_by_email(self, email: str) -> str: + request_args = {'url_path': self.GET_USER_ID_REQUEST_PATH, 'params': {'email': email}} + + response = self.client.get(**request_args) + member_details = self.parse_requested_member_details_response(response) + + if not member_details.items: + raise Exception( + f"Failed to find user with email '{email}'. Verify this email is registered to Cycode platform" + ) + return member_details.items.pop(0).external_id + + @staticmethod + def parse_requested_member_details_response(response: Response) -> models.MemberDetails: + return models.RequestedMemberDetailsResultSchema().load(response.json()) diff --git a/cycode/cyclient/models.py b/cycode/cyclient/models.py index f6419645..c3144a53 100644 --- a/cycode/cyclient/models.py +++ b/cycode/cyclient/models.py @@ -47,10 +47,10 @@ class DetectionSchema(Schema): class Meta: unknown = EXCLUDE - id = fields.String(missing=None) + id = fields.String(load_default=None) message = fields.String() type = fields.String() - severity = fields.String(missing=None) + severity = fields.String(load_default=None) detection_type_id = fields.String() detection_details = fields.Dict() detection_rule_id = fields.String() @@ -402,6 +402,42 @@ def build_dto(self, data: dict[str, Any], **_) -> SbomReport: return SbomReport(**data) +@dataclass +class Member: + external_id: str + + +class MemberSchema(Schema): + class Meta: + unknown = EXCLUDE + + external_id = fields.String() + + @post_load + def build_dto(self, data: dict[str, Any], **_) -> Member: + return Member(**data) + + +@dataclass +class MemberDetails: + items: list[Member] + page_size: int + next_page_token: Optional[str] + + +class RequestedMemberDetailsResultSchema(Schema): + class Meta: + unknown = EXCLUDE + + items = fields.List(fields.Nested(MemberSchema)) + page_size = fields.Integer() + next_page_token = fields.String(allow_none=True) + + @post_load + def build_dto(self, data: dict[str, Any], **_) -> MemberDetails: + return MemberDetails(**data) + + @dataclass class ClassificationData: severity: str diff --git a/tests/cli/commands/import/__init__.py b/tests/cli/commands/import/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/cli/commands/import/test_import_sbom.py b/tests/cli/commands/import/test_import_sbom.py new file mode 100644 index 00000000..ecea2c6d --- /dev/null +++ b/tests/cli/commands/import/test_import_sbom.py @@ -0,0 +1,329 @@ +import tempfile +from pathlib import Path +from typing import TYPE_CHECKING + +import pytest +import responses +from typer.testing import CliRunner + +from cycode.cli.app import app +from cycode.cli.cli_types import BusinessImpactOption +from cycode.cyclient.client_creator import create_import_sbom_client +from cycode.cyclient.import_sbom_client import ImportSbomClient, ImportSbomParameters +from tests.conftest import _CLIENT_ID, _CLIENT_SECRET, CLI_ENV_VARS +from tests.cyclient.mocked_responses import import_sbom_client as mocked_import_sbom + +if TYPE_CHECKING: + from pytest_mock import MockerFixture + + +@pytest.fixture(scope='session') +def import_sbom_client() -> ImportSbomClient: + return create_import_sbom_client(_CLIENT_ID, _CLIENT_SECRET, False) + + +def _validate_called_endpoint(calls: responses.CallList, path: str, expected_count: int = 1) -> None: + # Verify the import request was made + import_calls = [c for c in calls if path in c.request.url] + assert len(import_calls) == expected_count + + +class TestImportSbomParameters: + """Tests for ImportSbomParameters.to_request_form() method""" + + def test_to_request_form_with_all_fields(self) -> None: + data = { + 'Name': 'test-sbom', + 'Vendor': 'test-vendor', + 'BusinessImpact': BusinessImpactOption.HIGH, + 'Labels': ['label1', 'label2'], + 'Owners': ['owner1-id', 'owner2-id'], + } + + params = ImportSbomParameters(**data) + form_data = params.to_request_form() + + for key, val in data.items(): + if isinstance(val, list): + assert form_data[f'{key}[]'] == val + else: + assert form_data[key] == val + + def test_to_request_form_with_required_fields_only(self) -> None: + params = ImportSbomParameters( + Name='test-sbom', + Vendor='test-vendor', + BusinessImpact=BusinessImpactOption.MEDIUM, + Labels=[], + Owners=[], + ) + form_data = params.to_request_form() + + # Assert + assert form_data['Name'] == 'test-sbom' + assert form_data['Vendor'] == 'test-vendor' + assert form_data['BusinessImpact'] == BusinessImpactOption.MEDIUM + assert 'Labels[]' not in form_data + assert 'Owners[]' not in form_data + + +class TestSbomCommand: + """Tests for sbom_command with CLI integration""" + + @responses.activate + def test_sbom_command_happy_path( + self, + mocker: 'MockerFixture', + api_token_response: responses.Response, + import_sbom_client: ImportSbomClient, + ) -> None: + responses.add(api_token_response) + mocked_import_sbom.mock_import_sbom_responses(responses, import_sbom_client) + + with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as temp_file: + temp_file.write('{"sbom": "content"}') + temp_file_path = temp_file.name + + try: + args = [ + 'import', + 'sbom', + '--name', + 'test-sbom', + '--vendor', + 'test-vendor', + temp_file_path, + ] + result = CliRunner().invoke(app, args, env=CLI_ENV_VARS) + + assert result.exit_code == 0 + _validate_called_endpoint(responses.calls, ImportSbomClient.IMPORT_SBOM_REQUEST_PATH) + + finally: + Path(temp_file_path).unlink(missing_ok=True) + + @responses.activate + def test_sbom_command_with_all_options( + self, + mocker: 'MockerFixture', + api_token_response: responses.Response, + import_sbom_client: ImportSbomClient, + ) -> None: + responses.add(api_token_response) + mocked_import_sbom.mock_member_details_response(responses, import_sbom_client, 'user1@example.com', 'user-123') + mocked_import_sbom.mock_import_sbom_responses(responses, import_sbom_client) + + with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as temp_file: + temp_file.write('{"sbom": "content"}') + temp_file_path = temp_file.name + + try: + args = [ + 'import', + 'sbom', + '--name', + 'test-sbom', + '--vendor', + 'test-vendor', + '--label', + 'production', + '--label', + 'critical', + '--owner', + 'user1@example.com', + '--business-impact', + 'High', + temp_file_path, + ] + result = CliRunner().invoke(app, args, env=CLI_ENV_VARS) + + # Assert + assert result.exit_code == 0 + _validate_called_endpoint(responses.calls, ImportSbomClient.IMPORT_SBOM_REQUEST_PATH) + _validate_called_endpoint(responses.calls, ImportSbomClient.GET_USER_ID_REQUEST_PATH) + + finally: + Path(temp_file_path).unlink(missing_ok=True) + + def test_sbom_command_file_not_exists(self, mocker: 'MockerFixture') -> None: + from uuid import uuid4 + + non_existent_file = str(uuid4()) + + args = [ + 'import', + 'sbom', + '--name', + 'test-sbom', + '--vendor', + 'test-vendor', + non_existent_file, + ] + result = CliRunner().invoke(app, args, env=CLI_ENV_VARS) + + assert result.exit_code != 0 + assert "Invalid value for 'PATH': File " in result.output + assert 'not exist' in result.output + + def test_sbom_command_file_is_directory(self, mocker: 'MockerFixture') -> None: + with tempfile.TemporaryDirectory() as temp_dir: + args = [ + 'import', + 'sbom', + '--name', + 'test-sbom', + '--vendor', + 'test-vendor', + temp_dir, + ] + result = CliRunner().invoke(app, args, env=CLI_ENV_VARS) + + # Typer should reject this before the command runs + assert result.exit_code != 0 + # The error message contains "is a" and "directory" (may be across lines) + assert 'directory' in result.output.lower() + + @responses.activate + def test_sbom_command_invalid_owner_email( + self, + mocker: 'MockerFixture', + api_token_response: responses.Response, + import_sbom_client: ImportSbomClient, + ) -> None: + responses.add(api_token_response) + # Mock with no external_id to simulate user not found + mocked_import_sbom.mock_member_details_response(responses, import_sbom_client, 'invalid@example.com', None) + + with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as temp_file: + temp_file.write('{"sbom": "content"}') + temp_file_path = temp_file.name + + try: + args = [ + 'import', + 'sbom', + '--name', + 'test-sbom', + '--vendor', + 'test-vendor', + '--owner', + 'invalid@example.com', + temp_file_path, + ] + result = CliRunner().invoke(app, args, env=CLI_ENV_VARS) + + assert result.exit_code == 1 + assert 'invalid@example.com' in result.output + assert 'Failed to find user' in result.output or 'not found' in result.output.lower() + finally: + Path(temp_file_path).unlink(missing_ok=True) + + @responses.activate + def test_sbom_command_http_400_error( + self, + mocker: 'MockerFixture', + api_token_response: responses.Response, + import_sbom_client: ImportSbomClient, + ) -> None: + responses.add(api_token_response) + # Mock the SBOM import API endpoint to return 400 + mocked_import_sbom.mock_import_sbom_responses(responses, import_sbom_client, status=400) + + with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as temp_file: + temp_file.write('{"sbom": "content"}') + temp_file_path = temp_file.name + + try: + args = [ + 'import', + 'sbom', + '--name', + 'test-sbom', + '--vendor', + 'test-vendor', + temp_file_path, + ] + result = CliRunner().invoke(app, args, env=CLI_ENV_VARS) + + # HTTP 400 errors are also soft failures - exit with code 0 + assert result.exit_code == 0 + _validate_called_endpoint(responses.calls, ImportSbomClient.IMPORT_SBOM_REQUEST_PATH) + finally: + Path(temp_file_path).unlink(missing_ok=True) + + @responses.activate + def test_sbom_command_http_500_error( + self, + mocker: 'MockerFixture', + api_token_response: responses.Response, + import_sbom_client: ImportSbomClient, + ) -> None: + responses.add(api_token_response) + # Mock the SBOM import API endpoint to return 500 (soft failure) + mocked_import_sbom.mock_import_sbom_responses(responses, import_sbom_client, status=500) + + with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as temp_file: + temp_file.write('{"sbom": "content"}') + temp_file_path = temp_file.name + + try: + args = [ + 'import', + 'sbom', + '--name', + 'test-sbom', + '--vendor', + 'test-vendor', + temp_file_path, + ] + result = CliRunner().invoke(app, args, env=CLI_ENV_VARS) + + assert result.exit_code == 0 + _validate_called_endpoint(responses.calls, ImportSbomClient.IMPORT_SBOM_REQUEST_PATH) + finally: + Path(temp_file_path).unlink(missing_ok=True) + + @responses.activate + def test_sbom_command_multiple_owners( + self, + mocker: 'MockerFixture', + api_token_response: responses.Response, + import_sbom_client: ImportSbomClient, + ) -> None: + username1 = 'user1' + username2 = 'user2' + + responses.add(api_token_response) + mocked_import_sbom.mock_import_sbom_responses(responses, import_sbom_client) + mocked_import_sbom.mock_member_details_response( + responses, import_sbom_client, f'{username1}@example.com', 'user-123' + ) + mocked_import_sbom.mock_member_details_response( + responses, import_sbom_client, f'{username2}@example.com', 'user-456' + ) + + with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as temp_file: + temp_file.write('{"sbom": "content"}') + temp_file_path = temp_file.name + + try: + args = [ + 'import', + 'sbom', + '--name', + 'test-sbom', + '--vendor', + 'test-vendor', + '--owner', + f'{username1}@example.com', + '--owner', + f'{username2}@example.com', + temp_file_path, + ] + result = CliRunner().invoke(app, args, env=CLI_ENV_VARS) + + assert result.exit_code == 0 + _validate_called_endpoint(responses.calls, ImportSbomClient.IMPORT_SBOM_REQUEST_PATH) + _validate_called_endpoint(responses.calls, ImportSbomClient.GET_USER_ID_REQUEST_PATH, 2) + finally: + Path(temp_file_path).unlink(missing_ok=True) diff --git a/tests/cyclient/mocked_responses/import_sbom_client.py b/tests/cyclient/mocked_responses/import_sbom_client.py new file mode 100644 index 00000000..04398ace --- /dev/null +++ b/tests/cyclient/mocked_responses/import_sbom_client.py @@ -0,0 +1,67 @@ +from typing import Optional + +import responses +from responses import matchers + +from cycode.cyclient.import_sbom_client import ImportSbomClient + + +def get_import_sbom_url(import_sbom_client: ImportSbomClient) -> str: + api_url = import_sbom_client.client.api_url + service_url = ImportSbomClient.IMPORT_SBOM_REQUEST_PATH + return f'{api_url}/{service_url}' + + +def get_import_sbom_response(url: str, status: int = 201) -> responses.Response: + json_response = {'message': 'SBOM imported successfully'} + return responses.Response(method=responses.POST, url=url, json=json_response, status=status) + + +def get_member_details_url(import_sbom_client: ImportSbomClient) -> str: + api_url = import_sbom_client.client.api_url + service_url = ImportSbomClient.GET_USER_ID_REQUEST_PATH + return f'{api_url}/{service_url}' + + +def get_member_details_response( + url: str, email: str, external_id: Optional[str] = None, status: int = 200 +) -> responses.Response: + items = [] + if external_id: + items = [{'external_id': external_id, 'email': email}] + + json_response = { + 'items': items, + 'page_size': 10, + 'next_page_token': None, + } + + return responses.Response( + method=responses.GET, + url=url, + json=json_response, + status=status, + match=[matchers.query_param_matcher({'email': email})], + ) + + +def mock_import_sbom_responses( + responses_module: responses, + import_sbom_client: ImportSbomClient, + status: int = 201, +) -> None: + """Mock the basic SBOM import endpoint""" + responses_module.add(get_import_sbom_response(get_import_sbom_url(import_sbom_client), status)) + + +def mock_member_details_response( + responses_module: responses, + import_sbom_client: ImportSbomClient, + email: str, + external_id: Optional[str] = None, + status: int = 200, +) -> None: + """Mock the member details lookup endpoint""" + responses_module.add( + get_member_details_response(get_member_details_url(import_sbom_client), email, external_id, status) + )