diff --git a/.gitignore b/.gitignore index 7548cc20f1..5dee871277 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ .env .venv venv +cli/.mcp.env **/node_modules/ .vscode *.sqlite3 @@ -19,4 +20,4 @@ charts/custom-values.yaml **/charts/*/charts charts/ciso-assistant-next/custom.yaml .history/ -__pycache__ \ No newline at end of file +__pycache__ diff --git a/cli/.mcp.env b/cli/.mcp.env.example similarity index 100% rename from cli/.mcp.env rename to cli/.mcp.env.example diff --git a/cli/ca_mcp/client.py b/cli/ca_mcp/client.py index dbbe919161..9e4929cafe 100644 --- a/cli/ca_mcp/client.py +++ b/cli/ca_mcp/client.py @@ -84,6 +84,25 @@ def make_patch_request(endpoint, payload): ) +def make_delete_request(endpoint): + """ + Make a DELETE request to the API + + Args: + endpoint: API endpoint (e.g., "/task-templates/{id}/") + + Returns: + Response object + """ + url = f"{API_URL}{endpoint}" + return requests.delete( + url, + headers=get_headers(), + verify=VERIFY_CERTIFICATE, + timeout=HTTP_TIMEOUT, + ) + + def handle_response(res, error_message="Error"): """ Handle API response and check for errors diff --git a/cli/ca_mcp/resolvers.py b/cli/ca_mcp/resolvers.py index 5279b00b1f..896f1658f5 100644 --- a/cli/ca_mcp/resolvers.py +++ b/cli/ca_mcp/resolvers.py @@ -321,3 +321,33 @@ def resolve_library_id(library_urn_or_id: str) -> str: ) return str(libraries[0]["id"]) + + +def resolve_task_template_id(task_name_or_id: str) -> str: + """Helper function to resolve task template name to UUID + If already a UUID, returns it. If a name, looks it up via API. + """ + # Check if it's already a UUID + if "-" in task_name_or_id and len(task_name_or_id) == 36: + return task_name_or_id + + # Otherwise, look up by name + res = make_get_request("/task-templates/", params={"name": task_name_or_id}) + + if res.status_code != 200: + raise ValueError( + f"Task template '{task_name_or_id}' API error {res.status_code}" + ) + + data = res.json() + tasks = get_paginated_results(data) + + if not tasks: + raise ValueError(f"Task template '{task_name_or_id}' not found") + + if len(tasks) > 1: + raise ValueError( + f"Ambiguous task template name '{task_name_or_id}', found {len(tasks)}" + ) + + return tasks[0]["id"] diff --git a/cli/ca_mcp/server.py b/cli/ca_mcp/server.py index 1fd93ec73d..c4a850b900 100644 --- a/cli/ca_mcp/server.py +++ b/cli/ca_mcp/server.py @@ -25,6 +25,8 @@ get_quantitative_risk_studies, get_quantitative_risk_scenarios, get_quantitative_risk_hypotheses, + get_task_templates, + get_task_template_details, ) from .tools.analysis_tools import ( @@ -52,6 +54,7 @@ create_quantitative_risk_scenario, create_quantitative_risk_hypothesis, refresh_quantitative_risk_study_simulations, + create_task_template, ) from .tools.update_tools import ( @@ -62,6 +65,8 @@ update_quantitative_risk_study, update_quantitative_risk_scenario, update_quantitative_risk_hypothesis, + update_task_template, + delete_task_template, ) # Register all tools with MCP decorators @@ -83,6 +88,8 @@ mcp.tool()(get_quantitative_risk_studies) mcp.tool()(get_quantitative_risk_scenarios) mcp.tool()(get_quantitative_risk_hypotheses) +mcp.tool()(get_task_templates) +mcp.tool()(get_task_template_details) mcp.tool()(get_all_audits_with_metrics) mcp.tool()(get_audit_gap_analysis) @@ -104,6 +111,7 @@ mcp.tool()(create_quantitative_risk_scenario) mcp.tool()(create_quantitative_risk_hypothesis) mcp.tool()(refresh_quantitative_risk_study_simulations) +mcp.tool()(create_task_template) mcp.tool()(update_asset) mcp.tool()(update_risk_scenario) @@ -112,6 +120,8 @@ mcp.tool()(update_quantitative_risk_study) mcp.tool()(update_quantitative_risk_scenario) mcp.tool()(update_quantitative_risk_hypothesis) +mcp.tool()(update_task_template) +mcp.tool()(delete_task_template) def run_server(): diff --git a/cli/ca_mcp/tools/read_tools.py b/cli/ca_mcp/tools/read_tools.py index 83072e9b3d..607aeab11d 100644 --- a/cli/ca_mcp/tools/read_tools.py +++ b/cli/ca_mcp/tools/read_tools.py @@ -108,17 +108,24 @@ async def get_applied_controls(folder: str = None): if filters: result += f" ({', '.join(f'{k}={v}' for k, v in filters.items())})" result += "\n\n" - result += "|Ref|Name|Status|ETA|Domain|\n" - result += "|---|---|---|---|---|\n" + result += "|UUID|Ref|Name|Status|ETA|Domain|Category|CSF Function|Effort|Impact|Priority|Cost|\n" + result += "|---|---|---|---|---|---|---|---|---|---|---|---|\n" for item in controls: + uuid = item.get("id") ref_id = item.get("ref_id") or "N/A" name = item.get("name", "N/A") status = item.get("status", "N/A") eta = item.get("eta") or "N/A" domain = (item.get("folder") or {}).get("str", "N/A") + category = item.get("category", "N/A") + csf_function = item.get("csf_function", "N/A") + effort = item.get("effort", "N/A") + impact = item.get("control_impact", "N/A") + priority = item.get("priority", "N/A") + cost = item.get("cost", 0) - result += f"|{ref_id}|{name}|{status}|{eta}|{domain}|\n" + result += f"|{uuid}|{ref_id}|{name}|{status}|{eta}|{domain}|{category}|{csf_function}|{effort}|{impact}|{priority}|{cost}|\n" return success_response( result, @@ -896,20 +903,21 @@ async def get_requirement_assessments( return "No requirement assessments found" result = f"Found {len(req_assessments)} requirement assessments\n\n" - result += "|ID|Ref|Requirement|Assessment|Status|Result|\n" - result += "|---|---|---|---|---|---|\n" + result += "|ID|Ref|Description|Requirement|Assessment|Status|Result|\n" + result += "|---|---|---|---|---|---|---|\n" for req in req_assessments: req_id = req.get("id", "N/A") req_ref_id = req.get("ref_id", "N/A") requirement = req.get("name", "N/A")[:30] # Truncate + description = req.get("description", "N/A") comp_assessment = (req.get("compliance_assessment") or {}).get( "name", "N/A" )[:20] status = req.get("status", "N/A") result_val = req.get("result", "N/A") - result += f"|{req_id}|{req_ref_id}|{requirement}|{comp_assessment}|{status}|{result_val}|\n" + result += f"|{req_id}|{req_ref_id}|{description}|{requirement}|{comp_assessment}|{status}|{result_val}|\n" return result except Exception as e: @@ -1070,3 +1078,99 @@ async def get_quantitative_risk_hypotheses(scenario_id_or_name: str = None): return result except Exception as e: return f"Error in get_quantitative_risk_hypotheses: {str(e)}" + + +async def get_task_templates(limit: int = None, offset: int = None, ordering: str = None, search: str = None): + """List task templates with IDs, names, and details + + Args: + limit: Number of results to return per page + offset: The initial index from which to return the results + ordering: Which field to use when ordering the results + search: A search term + """ + try: + params = {} + + if limit is not None: + params["limit"] = limit + if offset is not None: + params["offset"] = offset + if ordering: + params["ordering"] = ordering + if search: + params["search"] = search + + res = make_get_request("/task-templates/", params=params) + + if res.status_code != 200: + return f"Error: HTTP {res.status_code} - {res.text}" + + data = res.json() + tasks = get_paginated_results(data) + + if not tasks: + return "No task found" + + result = f"Found {len(tasks)} task templates\n\n" + result += "|ID|Name|Description|Ref ID|Status|Recurrent|Enabled|Task Date|\n" + result += "|---|---|---|---|---|---|---|---|\n" + + for task in tasks: + task_id = task.get("id", "N/A") + name = task.get("name", "N/A") + description = (task.get("description", "N/A") or "N/A")[:40] # Truncate + ref_id = task.get("ref_id", "N/A") + status = task.get("status", "N/A") + is_recurrent = "Yes" if task.get("is_recurrent") else "No" + enabled = "Yes" if task.get("enabled") else "No" + task_date = task.get("task_date", "N/A") + + result += f"|{task_id}|{name}|{description}|{ref_id}|{status}|{is_recurrent}|{enabled}|{task_date}|\n" + + return result + except Exception as e: + return f"Error in get_task_templates: {str(e)}" + + +async def get_task_template_details(task_id: str): + """Get detailed information for a specific task template + + Args: + task_id: Task template ID + """ + try: + res = make_get_request(f"/task-templates/{task_id}/") + + if res.status_code != 200: + return f"Error: HTTP {res.status_code} - {res.text}" + + task = res.json() + + # Create result + result = f"|ID|Name|Description|Ref ID|Status|Task Date|Recurrent|Enabled|Published|Link|Folder|Path|Observation|Evidences|Created|Updated|Assets|Applied Controls|Compliance Assessment|Risk Assessment|Assign To|\n" + result += f"|{task.get('id', 'N/A')}|{task.get('name', 'N/A')}" + result += f"|{task.get('description', 'N/A')}" + result += f"|{task.get('ref_id', 'N/A')}" + result += f"|{task.get('status', 'N/A')}" + result += f"|{task.get('task_date', 'N/A')}" + result += f"|{'Yes' if task.get('is_recurrent') else 'No'}" + result += f"|{'Yes' if task.get('enabled') else 'No'}" + result += f"|{'Yes' if task.get('is_published') else 'No'}" + result += f"|{task.get('link', 'N/A')}" + result += f"|{task.get('folder', 'N/A')}" + result += f"|{task.get('path', 'N/A')}" + result += f"|{task.get('observation', 'N/A')}" + result += f"|{task.get('evidences', 'N/A')}" + result += f"|{task.get('created_at', 'N/A')}" + result += f"|{task.get('updated_at', 'N/A')}" + result += f"|{task.get('assets', [])}" + result += f"|{task.get('applied_controls', [])}" + result += f"|{task.get('compliance_assessments', [])}" + result += f"|{task.get('risk_assessments', [])}" + result += f"|{task.get('assigned_to', [])}" + result += "|\n" + + return result + except Exception as e: + return f"Error in get_task_template_details: {str(e)}" diff --git a/cli/ca_mcp/tools/update_tools.py b/cli/ca_mcp/tools/update_tools.py index e2977d317d..fe986ac34c 100644 --- a/cli/ca_mcp/tools/update_tools.py +++ b/cli/ca_mcp/tools/update_tools.py @@ -1,6 +1,6 @@ """Update MCP tools for CISO Assistant""" -from ..client import make_get_request, make_patch_request, get_paginated_results +from ..client import make_get_request, make_patch_request, make_delete_request, get_paginated_results from ..resolvers import ( resolve_asset_id, resolve_risk_scenario_id, @@ -8,6 +8,7 @@ resolve_folder_id, resolve_applied_control_id, resolve_requirement_assessment_id, + resolve_task_template_id, ) @@ -219,6 +220,7 @@ async def update_applied_control( category: str = None, csf_function: str = None, effort: str = None, + cost: dict = None, control_impact: int = None, eta: str = None, start_date: str = None, @@ -237,6 +239,15 @@ async def update_applied_control( category: policy | process | technical | physical csf_function: identify | protect | detect | respond | recover | govern effort: XS | S | M | L | XL + cost: is a JSON object composed by follwing keys : + currency: € (euros), + amortization_period (typically 1 year), + build: One-time implementation costs + fixed_cost: monetary amount, + people_days: person-days effort + run: Annual operational costs + fixed_cost: monetary amount + people_days: person-days effort control_impact: 1-5 (1=Very Low, 5=Very High) eta: ETA date YYYY-MM-DD start_date: Start date YYYY-MM-DD @@ -265,6 +276,8 @@ async def update_applied_control( payload["csf_function"] = csf_function if effort is not None: payload["effort"] = effort + if cost is not None: + payload["cost"] = cost if control_impact is not None: payload["control_impact"] = control_impact if eta is not None: @@ -302,6 +315,7 @@ async def update_requirement_assessment( eta: str = None, due_date: str = None, selected: bool = None, + applied_controls: list = None, ) -> str: """Update requirement assessment in audit. Use get_requirement_assessments() to find IDs @@ -315,6 +329,7 @@ async def update_requirement_assessment( eta: ETA date YYYY-MM-DD due_date: Due date YYYY-MM-DD selected: Applicability flag + applied_controls: List of applied control IDs/names to associate with this requirement assessment. Can be None to leave unchanged, or empty list to clear associations. Elements should be strings representing control identifiers. """ try: # Validate UUID @@ -339,6 +354,12 @@ async def update_requirement_assessment( payload["due_date"] = due_date if selected is not None: payload["selected"] = selected + if applied_controls is not None: + resolved_controls = [] + for control in applied_controls: + resolved_control_id = resolve_applied_control_id(control) + resolved_controls.append(resolved_control_id) + payload["applied_controls"] = resolved_controls if not payload: return "Error: No fields provided to update" @@ -703,3 +724,139 @@ async def update_quantitative_risk_hypothesis( return f"Error updating quantitative risk hypothesis: {res.status_code} - {res.text}" except Exception as e: return f"Error in update_quantitative_risk_hypothesis: {str(e)}" + + +async def update_task_template( + task_id: str, + name: str = None, + description: str = None, + status: str = None, + observation: str = None, + evidences: list = None, + is_published: bool = None, + task_date: str = None, + is_recurrent: bool = None, + ref_id: str = None, + schedule: str = None, + enabled: bool = None, + link: str = None, + folder_id: str = None, + assigned_to: list = None, + assets: list = None, + applied_controls: list = None, + compliance_assessments: list = None, + risk_assessments: list = None, + findings_assessment: list = None, +) -> str: + """Update task template properties + + Args: + task_id: Task template ID/name (required) + name: Task template name + description: Description + status: Status + observation: Observation text + evidences: Array of evidence UUIDs + is_published: Published flag + task_date: Task date (YYYY-MM-DD) + is_recurrent: Recurrent flag + ref_id: Reference ID + schedule: Schedule definition + enabled: Enabled flag + link: Link to evidence (e.g. Jira ticket) + folder_id: Folder ID/name + assigned_to: Array of user UUIDs + assets: Array of asset UUIDs + applied_controls: List of applied control IDs/names to associate with this task template. Can be None to leave unchanged, or empty list to clear associations. Elements should be strings representing control identifiers. + compliance_assessments: Array of compliance assessment UUIDs + risk_assessments: Array of risk assessment UUIDs + findings_assessment: Array of finding assessment UUIDs + """ + try: + # Build update payload with only provided fields + payload = {} + + if name is not None: + payload["name"] = name + if description is not None: + payload["description"] = description + if status is not None: + valid_statuses = ["pending", "in_progress", "cancelled", "completed"] + if status not in valid_statuses: + return f"Error: Invalid status '{status}'. Must be one of: {', '.join(valid_statuses)}" + payload["status"] = status + if observation is not None: + payload["observation"] = observation + if evidences is not None: + payload["evidences"] = evidences + if is_published is not None: + payload["is_published"] = is_published + if task_date is not None: + payload["task_date"] = task_date + if is_recurrent is not None: + payload["is_recurrent"] = is_recurrent + if ref_id is not None: + payload["ref_id"] = ref_id + if schedule is not None: + payload["schedule"] = schedule + if enabled is not None: + payload["enabled"] = enabled + if link is not None: + payload["link"] = link + if assigned_to is not None: + payload["assigned_to"] = assigned_to + if assets is not None: + payload["assets"] = assets + if applied_controls is not None: + resolved_controls = [] + for control in applied_controls: + resolved_control_id = resolve_applied_control_id(control) + resolved_controls.append(resolved_control_id) + payload["applied_controls"] = resolved_controls + if compliance_assessments is not None: + payload["compliance_assessments"] = compliance_assessments + if risk_assessments is not None: + payload["risk_assessments"] = risk_assessments + if findings_assessment is not None: + payload["findings_assessment"] = findings_assessment + + # Resolve folder name to ID if provided + if folder_id is not None: + resolved_folder_id = resolve_folder_id(folder_id) + payload["folder"] = resolved_folder_id + + if not payload: + return "Error: No fields provided to update" + + # Resolve task name to ID if needed + resolved_task_id = resolve_task_template_id(task_id) + + res = make_patch_request(f"/task-templates/{resolved_task_id}/", payload) + + if res.status_code == 200: + task = res.json() + return f"Updated task template: {task.get('name')} (ID: {task.get('id')})" + else: + return f"Error updating task template: {res.status_code} - {res.text}" + except Exception as e: + return f"Error in update_task_template: {str(e)}" + + +async def delete_task_template(task_id: str) -> str: + """Delete task template + + Args: + task_id: Task template ID/name + """ + try: + # Resolve task name to ID if needed + resolved_task_id = resolve_task_template_id(task_id) + + res = make_delete_request(f"/task-templates/{resolved_task_id}/") + + if res.status_code == 204: + return f"Deleted task template (ID: {resolved_task_id})" + else: + return f"Error deleting task template: {res.status_code} - {res.text}" + except Exception as e: + return f"Error in delete_task_template: {str(e)}" diff --git a/cli/ca_mcp/tools/write_tools.py b/cli/ca_mcp/tools/write_tools.py index ba3fe71a7a..528dbe27c4 100644 --- a/cli/ca_mcp/tools/write_tools.py +++ b/cli/ca_mcp/tools/write_tools.py @@ -7,6 +7,7 @@ resolve_risk_matrix_id, resolve_framework_id, resolve_risk_assessment_id, + resolve_applied_control_id, ) from ..config import GLOBAL_FOLDER_ID from ..utils.response_formatter import ( @@ -901,3 +902,108 @@ async def refresh_quantitative_risk_study_simulations(study_id: str) -> str: return f"Error refreshing simulations: {res.status_code} - {res.text}" except Exception as e: return f"Error in refresh_quantitative_risk_study_simulations: {str(e)}" + + +async def create_task_template( + name: str, + folder_id: str, + description: str = None, + status: str = None, + observation: str = None, + evidences: list = None, + is_published: bool = False, + task_date: str = None, + is_recurrent: bool = False, + ref_id: str = None, + schedule: str = None, + enabled: bool = True, + link: str = None, + assigned_to: list = None, + assets: list = None, + applied_controls: list = None, + compliance_assessments: list = None, + risk_assessments: list = None, + findings_assessment: list = None, +) -> str: + """Create task template + + Args: + name: Task template name (required) + folder_id: Folder ID/name (required) + description: Description + status: Status + observation: Observation text + evidences: Array of evidence UUIDs + is_published: Published flag + task_date: Task date (YYYY-MM-DD) + is_recurrent: Recurrent flag + ref_id: Reference ID + schedule: Schedule definition + enabled: Enabled flag + link: Link to evidence (e.g. Jira ticket) + assigned_to: Array of user UUIDs + assets: Array of asset UUIDs + applied_controls: Array of applied control UUIDs + compliance_assessments: Array of compliance assessment UUIDs + risk_assessments: Array of risk assessment UUIDs + findings_assessment: Array of finding assessment UUIDs + """ + try: + # Resolve folder name to ID if needed + folder_id = resolve_folder_id(folder_id) + + payload = { + "name": name, + "folder": folder_id, + "is_published": is_published, + "is_recurrent": is_recurrent, + "enabled": enabled, + } + + # Add optional fields if provided + if description is not None: + payload["description"] = description + if status is not None: + valid_statuses = ["pending", "in_progress", "cancelled", "completed"] + if status not in valid_statuses: + return f"Error: Invalid status '{status}'. Must be one of: {', '.join(valid_statuses)}" + payload["status"] = status + if observation is not None: + payload["observation"] = observation + if evidences is not None: + payload["evidences"] = evidences + if task_date is not None: + payload["task_date"] = task_date + if ref_id is not None: + payload["ref_id"] = ref_id + if schedule is not None: + payload["schedule"] = schedule + if link is not None: + payload["link"] = link + if assigned_to is not None: + payload["assigned_to"] = assigned_to + if assets is not None: + payload["assets"] = assets + if applied_controls is not None: + resolved_controls = [] + for control in applied_controls: + resolved_control_id = resolve_applied_control_id(control) + resolved_controls.append(resolved_control_id) + payload["applied_controls"] = resolved_controls + if compliance_assessments is not None: + payload["compliance_assessments"] = compliance_assessments + if risk_assessments is not None: + payload["risk_assessments"] = risk_assessments + if findings_assessment is not None: + payload["findings_assessment"] = findings_assessment + + res = make_post_request("/task-templates/", payload) + + if res.status_code == 201: + task = res.json() + return f"Created task template: {task.get('name')} (ID: {task.get('id')})" + else: + return f"Error creating task template: {res.status_code} - {res.text}" + except Exception as e: + return f"Error in create_task_template: {str(e)}" + diff --git a/cli/mcp.md b/cli/mcp.md index 5fa6fa1455..2a54bf90f2 100644 --- a/cli/mcp.md +++ b/cli/mcp.md @@ -12,8 +12,9 @@ Note: MCP technology is still maturing so the instructions might vary. ### instructions 1. Login to CISO Assistant and generate a PAT: click on the three dots next to the email -> my profile -> settings -2. Under the `cli` folder, update the `.mcp.env` with your token -3. Update the settings of mcpServers of your Claude Desktop app. The path will vary depending on your OS. On MacOS it's under `~/Library/Application\ Support/Claude/claude_desktop_config.json`. Make sure to put the **full absolute paths** for `uv` binary and the `cli` folder of your cloned repo +2. Under the `cli` folder, copy the `.mcp.env.example` as `.mcp.env` +3. Under the `cli` folder, update the copied `.mcp.env` with your credentials +4. Update the settings of mcpServers of your Claude Desktop app. The path will vary depending on your OS. On MacOS it's under `~/Library/Application\ Support/Claude/claude_desktop_config.json`. Make sure to put the **full absolute paths** for `uv` binary and the `cli` folder of your cloned repo Here is a sample: