Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
7174278
chore: update FastMCP version
puchy22 Nov 24, 2025
2502f6b
feat(mcp_server): Add new Prowler App MCP Server implementation
puchy22 Nov 24, 2025
5c3f55e
refactor(mcp_server): Remove server generation logic
puchy22 Nov 24, 2025
7275f8b
chore: Update .gitignore to track server.py
puchy22 Nov 24, 2025
dfb233a
feat(mcp_server): Optimize findings tools for LLM consumption
puchy22 Nov 25, 2025
d39b9cd
fix(mcp_server): remove literal providers for supporting all new ones
puchy22 Nov 25, 2025
e626742
feat(mcp_server): enhance get_finding_details with temporal metadata …
puchy22 Nov 25, 2025
95f96c4
feat(mcp_server): enhance compliance tools using modeling
puchy22 Nov 26, 2025
f2d25e6
refactor(mcp-server): simplify tool architecture with auto-discovery …
puchy22 Nov 26, 2025
c55a559
refactor(mcp-server): make the serializers shared among classes
puchy22 Nov 26, 2025
03ca038
refactor(mcp-server): fix inconsistent use of None handling
puchy22 Nov 26, 2025
a4d7f98
chore: remove innecesary tools for this PR
puchy22 Nov 27, 2025
08054af
chore: add type hint not runtime needed import
puchy22 Nov 27, 2025
f72796a
refactor(mcp-server): improve parameter validation and documentation …
puchy22 Nov 27, 2025
ab7ee46
fix(mcp_server): put the schema as equal as docs
puchy22 Nov 27, 2025
ae2afa7
fix(mcp_server): put the schema as equal as docs
puchy22 Nov 27, 2025
a3a13e5
fix(mcp_server): ensure that right schema is being used
puchy22 Nov 27, 2025
bf86204
refactor(mcp-server): simplify response models and improve data parsi…
puchy22 Nov 27, 2025
97d88fe
chore: add default sorting when listing findings
puchy22 Nov 28, 2025
fa4ac8c
fix(mcp_server): change provider_type to accept everything
puchy22 Nov 28, 2025
eb65035
feat(mcp_server): add provider response models
puchy22 Nov 28, 2025
b168d0a
feat(mcp_server): add provider management tools
puchy22 Nov 28, 2025
ba7bb0b
feat(mcp_server): add task polling support to API client
puchy22 Nov 28, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,5 @@ _data/
# Claude
CLAUDE.md

# MCP Server
mcp_server/prowler_mcp_server/prowler_app/server.py
mcp_server/prowler_mcp_server/prowler_app/utils/schema.yaml

# Compliance report
*.pdf
30 changes: 30 additions & 0 deletions mcp_server/prowler_mcp_server/prowler_app/models/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""Pydantic models for Prowler App MCP Server."""

from prowler_mcp_server.prowler_app.models.base import MinimalSerializerMixin
from prowler_mcp_server.prowler_app.models.compliance import (
ComplianceFramework,
ComplianceFrameworksListResponse,
)
from prowler_mcp_server.prowler_app.models.findings import (
CheckMetadata,
CheckRemediation,
DetailedFinding,
FindingsListResponse,
FindingsOverview,
SimplifiedFinding,
)

__all__ = [
# Base models
"MinimalSerializerMixin",
# Compliance models
"ComplianceFramework",
"ComplianceFrameworksListResponse",
# Findings models
"CheckMetadata",
"CheckRemediation",
"DetailedFinding",
"FindingsListResponse",
"FindingsOverview",
"SimplifiedFinding",
]
59 changes: 59 additions & 0 deletions mcp_server/prowler_mcp_server/prowler_app/models/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
"""Base models and mixins for Prowler MCP Server models."""

from typing import Any

from pydantic import BaseModel, SerializerFunctionWrapHandler, model_serializer


class MinimalSerializerMixin(BaseModel):
"""Mixin that excludes empty values from serialization.
This mixin optimizes model serialization for LLM consumption by removing noise
and reducing token usage. It excludes:
- None values
- Empty strings
- Empty lists
- Empty dicts
"""

@model_serializer(mode="wrap")
def _serialize(self, handler: SerializerFunctionWrapHandler) -> dict[str, Any]:
"""Serialize model excluding empty values.
Args:
handler: Pydantic serializer function wrapper
Returns:
Dictionary with non-empty values only
"""
data = handler(self)
return {k: v for k, v in data.items() if not self._should_exclude(v)}

def _should_exclude(self, value: Any) -> bool:
"""Determine if a value should be excluded from serialization.
Override this method in subclasses for custom exclusion logic.
Args:
value: Field value
Returns:
True if the value should be excluded, False otherwise
"""
# None values
if value is None:
return True

# Empty strings
if value == "":
return True

# Empty lists
if isinstance(value, list) and not value:
return True

# Empty dicts
if isinstance(value, dict) and not value:
return True

return False
74 changes: 74 additions & 0 deletions mcp_server/prowler_mcp_server/prowler_app/models/compliance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
"""Pydantic models for simplified compliance responses."""

from typing import Any

from prowler_mcp_server.prowler_app.models.base import MinimalSerializerMixin
from pydantic import BaseModel, SerializerFunctionWrapHandler, model_serializer


class ComplianceFramework(MinimalSerializerMixin, BaseModel):
"""Simplified compliance framework overview."""

id: str
compliance_id: str | None = None
framework: str | None = None
version: str | None = None
provider: str | None = None
region: str | None = None
total_requirements: int = 0
requirements_passed: int = 0
requirements_failed: int = 0
requirements_manual: int = 0

@property
def pass_percentage(self) -> float:
"""Calculate pass percentage based on passed requirements."""
if self.total_requirements == 0:
return 0.0
return round((self.requirements_passed / self.total_requirements) * 100, 2)

@model_serializer(mode="wrap")
def _serialize(self, handler: SerializerFunctionWrapHandler) -> dict[str, Any]:
"""Exclude None and empty string fields, and add calculated pass_percentage."""
# Use parent's exclusion logic
data = super()._serialize(handler)
# Add calculated pass_percentage
data["pass_percentage"] = self.pass_percentage
return data

@classmethod
def from_api_response(cls, data: dict) -> "ComplianceFramework":
"""Transform JSON:API compliance overview response to simplified format."""
attributes = data["attributes"]

return cls(
id=data["id"],
compliance_id=attributes.get("compliance_id"),
framework=attributes.get("framework"),
version=attributes.get("version"),
provider=attributes.get("provider"),
region=attributes.get("region"),
total_requirements=attributes["total_requirements"],
requirements_passed=attributes["requirements_passed"],
requirements_failed=attributes["requirements_failed"],
requirements_manual=attributes["requirements_manual"],
)


class ComplianceFrameworksListResponse(BaseModel):
"""Simplified response for compliance frameworks list queries."""

frameworks: list[ComplianceFramework]
total_count: int

@classmethod
def from_api_response(cls, response: dict) -> "ComplianceFrameworksListResponse":
"""Transform JSON:API response to simplified format."""
data = response["data"]

frameworks = [ComplianceFramework.from_api_response(item) for item in data]

return cls(
frameworks=frameworks,
total_count=len(frameworks),
)
205 changes: 205 additions & 0 deletions mcp_server/prowler_mcp_server/prowler_app/models/findings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
"""Pydantic models for simplified security findings responses."""

from typing import Literal

from prowler_mcp_server.prowler_app.models.base import MinimalSerializerMixin
from pydantic import BaseModel


class CheckRemediation(MinimalSerializerMixin, BaseModel):
"""Remediation information for a security check."""

cli: str | None = None
terraform: str | None = None
recommendation_text: str | None = None
recommendation_url: str | None = None


class CheckMetadata(MinimalSerializerMixin, BaseModel):
"""Essential metadata for a security check."""

check_id: str
title: str
description: str
provider: str
risk: str | None = None
service: str
resource_type: str
remediation: CheckRemediation | None = None
related_url: str | None = None
categories: list[str] | None = None

@classmethod
def from_api_response(cls, data: dict) -> "CheckMetadata":
"""Transform API check_metadata to simplified format."""
remediation_data = data.get("remediation")

remediation = None
if remediation_data:
code = remediation_data.get("code", {})
recommendation = remediation_data.get("recommendation", {})

remediation = CheckRemediation(
cli=code.get("cli"),
terraform=code.get("terraform"),
recommendation_text=recommendation.get("text"),
recommendation_url=recommendation.get("url"),
)

return cls(
check_id=data["checkid"],
title=data["checktitle"],
description=data["description"],
provider=data["provider"],
risk=data.get("risk"),
service=data["servicename"],
resource_type=data["resourcetype"],
remediation=remediation,
related_url=data.get("relatedurl"),
categories=data.get("categories"),
)


class SimplifiedFinding(MinimalSerializerMixin, BaseModel):
"""Simplified security finding with only LLM-relevant information."""

id: str
uid: str
status: Literal["FAIL", "PASS", "MANUAL"]
severity: Literal["critical", "high", "medium", "low", "informational"]
check_metadata: CheckMetadata
status_extended: str | None = None
delta: Literal["new", "changed"] | None = None
muted: bool | None = None
muted_reason: str | None = None

@classmethod
def from_api_response(cls, data: dict) -> "SimplifiedFinding":
"""Transform JSON:API finding response to simplified format."""
attributes = data["attributes"]
check_metadata = attributes["check_metadata"]

return cls(
id=data["id"],
uid=attributes["uid"],
status=attributes["status"],
severity=attributes["severity"],
check_metadata=CheckMetadata.from_api_response(check_metadata),
status_extended=attributes.get("status_extended"),
delta=attributes.get("delta"),
muted=attributes.get("muted"),
muted_reason=attributes.get("muted_reason"),
)


class DetailedFinding(SimplifiedFinding):
"""Detailed security finding with comprehensive information for deep analysis.

Extends SimplifiedFinding with temporal metadata and relationships to scans and resources.
Use this when you need complete context about a specific finding.
"""

inserted_at: str | None = None
updated_at: str | None = None
first_seen_at: str | None = None
scan_id: str | None = None
resource_ids: list[str] | None = None

@classmethod
def from_api_response(cls, data: dict) -> "DetailedFinding":
"""Transform JSON:API finding response to detailed format."""
attributes = data["attributes"]
check_metadata = attributes["check_metadata"]
relationships = data.get("relationships", {})

# Parse scan relationship
scan_id = None
scan_data = relationships.get("scan", {}).get("data")
if scan_data:
scan_id = scan_data["id"]

# Parse resources relationship
resource_ids = None
resources_data = relationships.get("resources", {}).get("data", [])
if resources_data:
resource_ids = [r["id"] for r in resources_data]

return cls(
id=data["id"],
uid=attributes["uid"],
status=attributes["status"],
severity=attributes["severity"],
check_metadata=CheckMetadata.from_api_response(check_metadata),
status_extended=attributes.get("status_extended"),
delta=attributes.get("delta"),
muted=attributes.get("muted"),
muted_reason=attributes.get("muted_reason"),
inserted_at=attributes.get("inserted_at"),
updated_at=attributes.get("updated_at"),
first_seen_at=attributes.get("first_seen_at"),
scan_id=scan_id,
resource_ids=resource_ids,
)


class FindingsListResponse(BaseModel):
"""Simplified response for findings list queries."""

findings: list[SimplifiedFinding]
total_num_finding: int
total_num_pages: int
current_page: int

@classmethod
def from_api_response(cls, response: dict) -> "FindingsListResponse":
"""Transform JSON:API response to simplified format."""
data = response["data"]
meta = response["meta"]
pagination = meta["pagination"]

findings = [SimplifiedFinding.from_api_response(item) for item in data]

return cls(
findings=findings,
total_num_finding=pagination["count"],
total_num_pages=pagination["pages"],
current_page=pagination["page"],
)


class FindingsOverview(BaseModel):
"""Simplified findings overview with aggregate statistics."""

total: int = 0
fail: int = 0
passed: int = 0 # Using 'passed' instead of 'pass' since 'pass' is a Python keyword
muted: int = 0
new: int = 0
changed: int = 0
fail_new: int = 0
fail_changed: int = 0
pass_new: int = 0
pass_changed: int = 0
muted_new: int = 0
muted_changed: int = 0

@classmethod
def from_api_response(cls, response: dict) -> "FindingsOverview":
"""Transform JSON:API overview response to simplified format."""
data = response["data"]
attributes = data["attributes"]

return cls(
total=attributes["total"],
fail=attributes["fail"],
passed=attributes["pass"],
muted=attributes["muted"],
new=attributes["new"],
changed=attributes["changed"],
fail_new=attributes["fail_new"],
fail_changed=attributes["fail_changed"],
pass_new=attributes["pass_new"],
pass_changed=attributes["pass_changed"],
muted_new=attributes["muted_new"],
muted_changed=attributes["muted_changed"],
)
Loading