Skip to content
19 changes: 19 additions & 0 deletions cli/ca_mcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,25 @@ def make_patch_request(endpoint, payload):
)


def make_delete_request(endpoint):
"""
Make a DELETE request to the API
Args:
endpoint: API endpoint (e.g., "/task-templates/{id}/")
Returns:
Response object
"""
url = f"{API_URL}{endpoint}"
return requests.delete(
url,
headers=get_headers(),
verify=VERIFY_CERTIFICATE,
timeout=HTTP_TIMEOUT,
)


def handle_response(res, error_message="Error"):
"""
Handle API response and check for errors
Expand Down
30 changes: 30 additions & 0 deletions cli/ca_mcp/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,3 +321,33 @@ def resolve_library_id(library_urn_or_id: str) -> str:
)

return str(libraries[0]["id"])


def resolve_task_template_id(task_name_or_id: str) -> str:
"""Helper function to resolve task template name to UUID
If already a UUID, returns it. If a name, looks it up via API.
"""
# Check if it's already a UUID
if "-" in task_name_or_id and len(task_name_or_id) == 36:
return task_name_or_id

# Otherwise, look up by name
res = make_get_request("/task-templates/", params={"name": task_name_or_id})

if res.status_code != 200:
raise ValueError(
f"Task template '{task_name_or_id}' API error {res.status_code}"
)

data = res.json()
tasks = get_paginated_results(data)

if not tasks:
raise ValueError(f"Task template '{task_name_or_id}' not found")

if len(tasks) > 1:
raise ValueError(
f"Ambiguous task template name '{task_name_or_id}', found {len(tasks)}"
)

return tasks[0]["id"]
10 changes: 10 additions & 0 deletions cli/ca_mcp/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
get_quantitative_risk_studies,
get_quantitative_risk_scenarios,
get_quantitative_risk_hypotheses,
get_task_templates,
get_task_template_details,
)

from .tools.analysis_tools import (
Expand Down Expand Up @@ -52,6 +54,7 @@
create_quantitative_risk_scenario,
create_quantitative_risk_hypothesis,
refresh_quantitative_risk_study_simulations,
create_task_template,
)

from .tools.update_tools import (
Expand All @@ -62,6 +65,8 @@
update_quantitative_risk_study,
update_quantitative_risk_scenario,
update_quantitative_risk_hypothesis,
update_task_template,
delete_task_template,
)

# Register all tools with MCP decorators
Expand All @@ -83,6 +88,8 @@
mcp.tool()(get_quantitative_risk_studies)
mcp.tool()(get_quantitative_risk_scenarios)
mcp.tool()(get_quantitative_risk_hypotheses)
mcp.tool()(get_task_templates)
mcp.tool()(get_task_template_details)

mcp.tool()(get_all_audits_with_metrics)
mcp.tool()(get_audit_gap_analysis)
Expand All @@ -104,6 +111,7 @@
mcp.tool()(create_quantitative_risk_scenario)
mcp.tool()(create_quantitative_risk_hypothesis)
mcp.tool()(refresh_quantitative_risk_study_simulations)
mcp.tool()(create_task_template)

mcp.tool()(update_asset)
mcp.tool()(update_risk_scenario)
Expand All @@ -112,6 +120,8 @@
mcp.tool()(update_quantitative_risk_study)
mcp.tool()(update_quantitative_risk_scenario)
mcp.tool()(update_quantitative_risk_hypothesis)
mcp.tool()(update_task_template)
mcp.tool()(delete_task_template)


def run_server():
Expand Down
116 changes: 110 additions & 6 deletions cli/ca_mcp/tools/read_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,17 +108,24 @@ async def get_applied_controls(folder: str = None):
if filters:
result += f" ({', '.join(f'{k}={v}' for k, v in filters.items())})"
result += "\n\n"
result += "|Ref|Name|Status|ETA|Domain|\n"
result += "|---|---|---|---|---|\n"
result += "|UUID|Ref|Name|Status|ETA|Domain|Category|CSF Function|Effort|Impact|Priority|Cost|\n"
result += "|---|---|---|---|---|---|---|---|---|---|---|---|\n"
Comment on lines +111 to +112
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Improve get_applied_controls UUID and cost display defaults.

The new columns are useful, but two small details could improve correctness/readability:

  • uuid = item.get("id") will render None if id is ever missing; other tables typically default to "N/A".
  • cost = item.get("cost", 0) will show 0 when the cost field is absent, which can be misleading given that cost is modeled as a complex JSON object elsewhere.

Consider:

-        for item in controls:
-            uuid = item.get("id")
+        for item in controls:
+            uuid = item.get("id", "N/A")
             ...
-            priority =  item.get("priority", "N/A")
-            cost = item.get("cost", 0)
+            priority = item.get("priority", "N/A")
+            cost = item.get("cost", "N/A")

This keeps the table consistent with other read tools and avoids treating missing structured cost as a numeric zero.

Also applies to: 115-128


for item in controls:
uuid = item.get("id")
ref_id = item.get("ref_id") or "N/A"
name = item.get("name", "N/A")
status = item.get("status", "N/A")
eta = item.get("eta") or "N/A"
domain = (item.get("folder") or {}).get("str", "N/A")
category = item.get("category", "N/A")
csf_function = item.get("csf_function", "N/A")
effort = item.get("effort", "N/A")
impact = item.get("control_impact", "N/A")
priority = item.get("priority", "N/A")
cost = item.get("cost", 0)

result += f"|{ref_id}|{name}|{status}|{eta}|{domain}|\n"
result += f"|{uuid}|{ref_id}|{name}|{status}|{eta}|{domain}|{category}|{csf_function}|{effort}|{impact}|{priority}|{cost}|\n"

return success_response(
result,
Expand Down Expand Up @@ -896,20 +903,21 @@ async def get_requirement_assessments(
return "No requirement assessments found"

result = f"Found {len(req_assessments)} requirement assessments\n\n"
result += "|ID|Ref|Requirement|Assessment|Status|Result|\n"
result += "|---|---|---|---|---|---|\n"
result += "|ID|Ref|Description|Requirement|Assessment|Status|Result|\n"
result += "|---|---|---|---|---|---|---|\n"

for req in req_assessments:
req_id = req.get("id", "N/A")
req_ref_id = req.get("ref_id", "N/A")
requirement = req.get("name", "N/A")[:30] # Truncate
description = req.get("description", "N/A")
comp_assessment = (req.get("compliance_assessment") or {}).get(
"name", "N/A"
)[:20]
status = req.get("status", "N/A")
result_val = req.get("result", "N/A")

result += f"|{req_id}|{req_ref_id}|{requirement}|{comp_assessment}|{status}|{result_val}|\n"
result += f"|{req_id}|{req_ref_id}|{description}|{requirement}|{comp_assessment}|{status}|{result_val}|\n"

return result
except Exception as e:
Expand Down Expand Up @@ -1070,3 +1078,99 @@ async def get_quantitative_risk_hypotheses(scenario_id_or_name: str = None):
return result
except Exception as e:
return f"Error in get_quantitative_risk_hypotheses: {str(e)}"


async def get_task_templates(limit: int = None, offset: int = None, ordering: str = None, search: str = None):
"""List task templates with IDs, names, and details
Args:
limit: Number of results to return per page
offset: The initial index from which to return the results
ordering: Which field to use when ordering the results
search: A search term
"""
try:
params = {}

if limit is not None:
params["limit"] = limit
if offset is not None:
params["offset"] = offset
if ordering:
params["ordering"] = ordering
if search:
params["search"] = search

res = make_get_request("/task-templates/", params=params)

if res.status_code != 200:
return f"Error: HTTP {res.status_code} - {res.text}"

data = res.json()
tasks = get_paginated_results(data)

if not tasks:
return "No task tasks found"

result = f"Found {len(tasks)} task templates\n\n"
result += "|ID|Name|Description|Ref ID|Status|Recurrent|Enabled|Task Date|\n"
result += "|---|---|---|---|---|---|---|---|\n"

for task in tasks:
task_id = task.get("id", "N/A")
name = task.get("name", "N/A")
description = (task.get("description", "N/A") or "N/A")[:40] # Truncate
ref_id = task.get("ref_id", "N/A")
status = task.get("status", "N/A")
is_recurrent = "Yes" if task.get("is_recurrent") else "No"
enabled = "Yes" if task.get("enabled") else "No"
task_date = task.get("task_date", "N/A")

result += f"|{task_id}|{name}|{description}|{ref_id}|{status}|{is_recurrent}|{enabled}|{task_date}|\n"

return result
except Exception as e:
return f"Error in get_task_templates: {str(e)}"
Comment on lines 1083 to 1133
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Fix typo in empty‑result message for get_task_templates

The function works correctly, but the empty‑result message has a small typo: "No task tasks found".

Recommend updating it to "No task templates found":

-        if not tasks:
-            return "No task tasks found"
+        if not tasks:
+            return "No task templates found"
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async def get_task_templates(limit: int = None, offset: int = None, ordering: str = None, search: str = None):
"""List task templates with IDs, names, and details
Args:
limit: Number of results to return per page
offset: The initial index from which to return the results
ordering: Which field to use when ordering the results
search: A search term
"""
try:
params = {}
if limit is not None:
params["limit"] = limit
if offset is not None:
params["offset"] = offset
if ordering:
params["ordering"] = ordering
if search:
params["search"] = search
res = make_get_request("/task-templates/", params=params)
if res.status_code != 200:
return f"Error: HTTP {res.status_code} - {res.text}"
data = res.json()
tasks = get_paginated_results(data)
if not tasks:
return "No task tasks found"
result = f"Found {len(tasks)} task templates\n\n"
result += "|ID|Name|Description|Ref ID|Status|Recurrent|Enabled|Task Date|\n"
result += "|---|---|---|---|---|---|---|---|\n"
for task in tasks:
task_id = task.get("id", "N/A")
name = task.get("name", "N/A")
description = (task.get("description", "N/A") or "N/A")[:40] # Truncate
ref_id = task.get("ref_id", "N/A")
status = task.get("status", "N/A")
is_recurrent = "Yes" if task.get("is_recurrent") else "No"
enabled = "Yes" if task.get("enabled") else "No"
task_date = task.get("task_date", "N/A")
result += f"|{task_id}|{name}|{description}|{ref_id}|{status}|{is_recurrent}|{enabled}|{task_date}|\n"
return result
except Exception as e:
return f"Error in get_task_templates: {str(e)}"
async def get_task_templates(limit: int = None, offset: int = None, ordering: str = None, search: str = None):
"""List task templates with IDs, names, and details
Args:
limit: Number of results to return per page
offset: The initial index from which to return the results
ordering: Which field to use when ordering the results
search: A search term
"""
try:
params = {}
if limit is not None:
params["limit"] = limit
if offset is not None:
params["offset"] = offset
if ordering:
params["ordering"] = ordering
if search:
params["search"] = search
res = make_get_request("/task-templates/", params=params)
if res.status_code != 200:
return f"Error: HTTP {res.status_code} - {res.text}"
data = res.json()
tasks = get_paginated_results(data)
if not tasks:
return "No task templates found"
result = f"Found {len(tasks)} task templates\n\n"
result += "|ID|Name|Description|Ref ID|Status|Recurrent|Enabled|Task Date|\n"
result += "|---|---|---|---|---|---|---|---|\n"
for task in tasks:
task_id = task.get("id", "N/A")
name = task.get("name", "N/A")
description = (task.get("description", "N/A") or "N/A")[:40] # Truncate
ref_id = task.get("ref_id", "N/A")
status = task.get("status", "N/A")
is_recurrent = "Yes" if task.get("is_recurrent") else "No"
enabled = "Yes" if task.get("enabled") else "No"
task_date = task.get("task_date", "N/A")
result += f"|{task_id}|{name}|{description}|{ref_id}|{status}|{is_recurrent}|{enabled}|{task_date}|\n"
return result
except Exception as e:
return f"Error in get_task_templates: {str(e)}"
🤖 Prompt for AI Agents
In cli/ca_mcp/tools/read_tools.py around lines 1083 to 1133, the empty-result
message incorrectly reads "No task tasks found"; update that string to "No task
templates found" so the output is accurate; make the single-line change where
the function returns the empty message.



async def get_task_template_details(task_id: str):
"""Get detailed information for a specific task template
Args:
task_id: Task template ID
"""
try:
res = make_get_request(f"/task-templates/{task_id}/")

if res.status_code != 200:
return f"Error: HTTP {res.status_code} - {res.text}"

task = res.json()

# Create result
result = f"|ID|Name|Description|Ref ID|Status|Task Date|Recurrent|Enabled|Published|Link|Folder|Path|Observation|Evidences|Created|Updated|Assets|Applied Controls|Compliance Assessment|Risk Assessment|Assign To|\n"
result += f"|{task.get('id', 'N/A')}|{task.get('name', 'N/A')}"
result += f"|{task.get('description', 'N/A')}"
result += f"|{task.get('ref_id', 'N/A')}"
result += f"|{task.get('status', 'N/A')}"
result += f"|{task.get('task_date', 'N/A')}"
result += f"|{'Yes' if task.get('is_recurrent') else 'No'}"
result += f"|{'Yes' if task.get('enabled') else 'No'}"
result += f"|{'Yes' if task.get('is_published') else 'No'}"
result += f"|{task.get('link', 'N/A')}"
result += f"|{task.get('folder', 'N/A')}"
result += f"|{task.get('path', 'N/A')}"
result += f"|{task.get('observation', 'N/A')}"
result += f"|{task.get('evidences', 'N/A')}"
result += f"|{task.get('created_at', 'N/A')}"
result += f"|{task.get('updated_at', 'N/A')}"
result += f"|{task.get('assets', [])}"
result += f"|{task.get('applied_controls', [])}"
result += f"|{task.get('compliance_assessments', [])}"
result += f"|{task.get('risk_assessments', [])}"
result += f"|{task.get('assigned_to', [])}"
result += "|\n"

return result
except Exception as e:
return f"Error in get_task_template_details: {str(e)}"
Loading