Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
345 changes: 339 additions & 6 deletions src/api/recommendations.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@
from src.core.models import User
from src.integrations.github.api import github_client

#
from src.rules.ai_rules_scan import (
scan_repo_for_ai_rule_files,
translate_ai_rule_files_to_yaml,
)
import yaml

logger = structlog.get_logger()

router = APIRouter(prefix="/rules", tags=["Recommendations"])
Expand Down Expand Up @@ -135,6 +142,62 @@ class MetricConfig(TypedDict):
thresholds: dict[str, float]
explanation: Callable[[float | int], str]

class ScanAIFilesRequest(BaseModel):
    """
    Payload for scanning a repo for AI assistant rule files (Cursor, Claude, Copilot, etc.).
    """

    # Required: the repository to scan.
    repo_url: HttpUrl = Field(
        default=...,
        description="Full URL of the GitHub repository (e.g., https://github.com/owner/repo)",
    )
    # Optional credentials — either a PAT or a GitHub App installation may be supplied.
    github_token: str | None = Field(
        default=None,
        description="Optional GitHub Personal Access Token (higher rate limits / private repos)",
    )
    installation_id: int | None = Field(
        default=None,
        description="GitHub App installation ID (optional; used to get installation token)",
    )
    # Controls whether file bodies are echoed back in the response.
    include_content: bool = Field(
        default=False,
        description="If True, include file content in response (for translation pipeline)",
    )


class ScanAIFilesCandidate(BaseModel):
    """A single candidate AI rule file."""

    # Always relative to the repository root (as reported by the Git tree API).
    path: str = Field(default=..., description="Repository-relative file path")
    has_keywords: bool = Field(
        default=..., description="True if content contains known AI-instruction keywords"
    )
    # Populated only when the scan request asked for content.
    content: str | None = Field(
        default=None, description="File content; only set when include_content was True"
    )


class ScanAIFilesResponse(BaseModel):
    """Response from the scan-ai-files endpoint."""

    repo_full_name: str = Field(default=..., description="Repository in owner/repo form")
    ref: str = Field(default=..., description="Branch or ref that was scanned (e.g. main)")
    candidate_files: list[ScanAIFilesCandidate] = Field(
        description="Candidate AI rule files matching path patterns",
        default_factory=list,
    )
    warnings: list[str] = Field(
        description="Warnings (e.g. rate limit, partial results)",
        default_factory=list,
    )

class TranslateAIFilesRequest(BaseModel):
    """Request for translating AI rule files into .watchflow rules YAML."""

    # Required repository URL; credentials below follow the same precedence
    # as the scan endpoint (user token > PAT > installation token).
    repo_url: HttpUrl = Field(default=..., description="Full URL of the GitHub repository")
    github_token: str | None = Field(default=None, description="Optional GitHub PAT")
    installation_id: int | None = Field(
        default=None, description="Optional GitHub App installation ID"
    )


class TranslateAIFilesResponse(BaseModel):
    """Response from translate-ai-files endpoint."""

    repo_full_name: str = Field(default=..., description="Repository in owner/repo form")
    ref: str = Field(default=..., description="Branch scanned (e.g. main)")
    rules_yaml: str = Field(default=..., description="Merged rules YAML (rules: [...])")
    rules_count: int = Field(default=..., description="Number of rules in rules_yaml")
    ambiguous: list[dict[str, Any]] = Field(
        description="Statements that could not be translated",
        default_factory=list,
    )
    warnings: list[str] = Field(default_factory=list)

Comment thread
coderabbitai[bot] marked this conversation as resolved.


def _get_severity_label(value: float, thresholds: dict[str, float]) -> tuple[str, str]:
"""
Expand Down Expand Up @@ -420,6 +483,75 @@ def parse_repo_from_url(url: str) -> str:
return f"{p.owner}/{p.repo}"


def _ref_to_branch(ref: str | None) -> str | None:
"""Convert a full ref (e.g. refs/heads/feature-x) to branch name for use with GitHub API."""
if not ref or not ref.strip():
return None
ref = ref.strip()
if ref.startswith("refs/heads/"):
return ref[len("refs/heads/") :].strip() or None
return ref


async def get_suggested_rules_from_repo(
    repo_full_name: str,
    installation_id: int | None,
    github_token: str | None,
    *,
    ref: str | None = None,
) -> tuple[str, int, list[dict[str, Any]], list[str]]:
    """
    Run agentic scan+translate for a repo (rules.md, etc. -> Watchflow YAML).
    Safe to call from event processors; returns empty result on any failure.
    Returns (rules_yaml, rules_count, ambiguous_list, rule_sources).
    When ref is provided (e.g. from push or PR head), scans that branch; otherwise uses default branch.
    """
    # Canonical "nothing found / failed" result shared by every bail-out path.
    empty_result: tuple[str, int, list[dict[str, Any]], list[str]] = ("rules: []\n", 0, [], [])
    try:
        repo_data, repo_error = await github_client.get_repository(
            repo_full_name, installation_id=installation_id, user_token=github_token
        )
        if repo_error or not repo_data:
            return empty_result

        # Prefer the caller-supplied ref (normalized to a branch name); fall
        # back to the repository default branch when absent or unusable.
        fallback_branch = repo_data.get("default_branch") or "main"
        scan_ref = (_ref_to_branch(ref) if ref else None) or fallback_branch

        tree_entries = await github_client.get_repository_tree(
            repo_full_name,
            ref=scan_ref,
            installation_id=installation_id,
            user_token=github_token,
            recursive=True,
        )
        if not tree_entries:
            return empty_result

        async def fetch_file(path: str):
            # Pin reads to the scanned ref so listing and content stay consistent.
            return await github_client.get_file_content(
                repo_full_name, path, installation_id, github_token, ref=scan_ref
            )

        found = await scan_repo_for_ai_rule_files(
            tree_entries, fetch_content=True, get_file_content=fetch_file
        )
        usable = [candidate for candidate in found if candidate.get("content")]
        if not usable:
            return empty_result

        rules_yaml, ambiguous, rule_sources = await translate_ai_rule_files_to_yaml(usable)
        # Best-effort count from the generated YAML; an unparseable document
        # simply yields 0 rather than failing the whole call.
        try:
            parsed = yaml.safe_load(rules_yaml)
        except Exception:
            parsed = None
        rules_count = len(parsed.get("rules", [])) if isinstance(parsed, dict) else 0
        return (rules_yaml, rules_count, ambiguous, rule_sources)
    except Exception as e:
        logger.warning("get_suggested_rules_from_repo_failed", repo=repo_full_name, error=str(e))
        return ("rules: []\n", 0, [], [])
Comment thread
coderabbitai[bot] marked this conversation as resolved.


# --- Endpoints --- # Main API surface—keep stable for clients.


Expand Down Expand Up @@ -680,17 +812,18 @@ async def proceed_with_pr(

try:
# Step 1: Get repository metadata to find default branch
repo_data = await github_client.get_repository(
repo_data, repo_error = await github_client.get_repository(
repo_full_name=repo_full_name,
installation_id=installation_id,
user_token=user_token,
)

if not repo_data:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Repository '{repo_full_name}' not found or access denied.",
)
if repo_error:
err_status = repo_error["status"]
status_code = status.HTTP_429_TOO_MANY_REQUESTS if err_status == 403 else err_status
if status_code not in (401, 403, 404, 429):
status_code = status.HTTP_502_BAD_GATEWAY
raise HTTPException(status_code=status_code, detail=repo_error["message"])
Comment thread
coderabbitai[bot] marked this conversation as resolved.

base_branch = payload.base_branch or repo_data.get("default_branch", "main")

Expand Down Expand Up @@ -795,3 +928,203 @@ async def proceed_with_pr(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to create pull request. Please try again.",
) from e

@router.post(
    "/scan-ai-files",
    response_model=ScanAIFilesResponse,
    status_code=status.HTTP_200_OK,
    summary="Scan repository for AI rule files",
    description=(
        "Lists files matching *rules*.md, *guidelines*.md, *prompt*.md, .cursor/rules/*.mdc. "
        "Optionally fetches content and flags files that contain AI-instruction keywords."
    ),
    dependencies=[Depends(rate_limiter)],
)
async def scan_ai_rule_files(
    request: Request,
    payload: ScanAIFilesRequest,
    user: User | None = Depends(get_current_user_optional),
) -> ScanAIFilesResponse:
    """
    Scan a repository for AI assistant rule files (Cursor, Claude, Copilot, etc.).

    Resolves a GitHub token (authenticated user > explicit PAT > installation
    token), reads the default branch, walks the full repo tree, and returns
    candidate files. Raises 422 for an unparseable URL, 404 when the repo is
    missing or inaccessible, and maps GitHub API errors to 401/403/404/429/502.
    """
    repo_url_str = str(payload.repo_url)
    client_ip = request.client.host if request.client else "unknown"
    logger.info("scan_ai_files_requested", repo_url=repo_url_str, ip=client_ip)

    try:
        repo_full_name = parse_repo_from_url(repo_url_str)
    except ValueError as e:
        logger.warning("invalid_url_provided", url=repo_url_str, error=str(e))
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=str(e)
        ) from e

    # Resolve token (same precedence as recommend_rules): user token > payload PAT > installation token.
    github_token = None
    if user and user.github_token:
        try:
            github_token = user.github_token.get_secret_value()
        except (AttributeError, TypeError):
            github_token = str(user.github_token) if user.github_token else None
    elif payload.github_token:
        github_token = payload.github_token
    elif payload.installation_id:
        installation_token = await github_client.get_installation_access_token(payload.installation_id)
        if installation_token:
            github_token = installation_token

    installation_id = payload.installation_id

    # Repository metadata (for default branch); surface GitHub errors as HTTP errors.
    repo_data, repo_error = await github_client.get_repository(
        repo_full_name, installation_id=installation_id, user_token=github_token
    )
    if repo_error:
        err_status = repo_error["status"]
        # NOTE(review): GitHub reports rate limiting as 403, so every 403 is
        # surfaced as 429 here — permission-denied 403s are folded in too.
        status_code = status.HTTP_429_TOO_MANY_REQUESTS if err_status == 403 else err_status
        if status_code not in (401, 403, 404, 429):
            status_code = status.HTTP_502_BAD_GATEWAY
        raise HTTPException(status_code=status_code, detail=repo_error["message"])
    if not repo_data:
        # Defensive: no explicit error but no repository payload either.
        # Without this guard, repo_data.get() below would raise and become a 500.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Repository '{repo_full_name}' not found or access denied.",
        )
    default_branch = repo_data.get("default_branch") or "main"
    ref = default_branch

    # Full recursive tree of the scanned ref.
    tree_entries = await github_client.get_repository_tree(
        repo_full_name,
        ref=ref,
        installation_id=installation_id,
        user_token=github_token,
        recursive=True,
    )
    if not tree_entries:
        return ScanAIFilesResponse(
            repo_full_name=repo_full_name,
            ref=ref,
            candidate_files=[],
            warnings=["Could not load repository tree; check access and ref."],
        )

    # Optional content fetcher for keyword scan (and optionally include in response)
    async def get_content(path: str):
        # Pin reads to the scanned ref so the tree listing and file contents
        # stay consistent (matches get_suggested_rules_from_repo).
        return await github_client.get_file_content(
            repo_full_name, path, installation_id, github_token, ref=ref
        )

    # Always fetch content so has_keywords is set; strip content in response unless include_content
    raw_candidates = await scan_repo_for_ai_rule_files(
        tree_entries,
        fetch_content=True,
        get_file_content=get_content,
    )

    candidates = [
        ScanAIFilesCandidate(
            path=c["path"],
            has_keywords=c["has_keywords"],
            content=c["content"] if payload.include_content else None,
        )
        for c in raw_candidates
    ]

    return ScanAIFilesResponse(
        repo_full_name=repo_full_name,
        ref=ref,
        candidate_files=candidates,
        warnings=[],
    )

@router.post(
    "/translate-ai-files",
    response_model=TranslateAIFilesResponse,
    status_code=status.HTTP_200_OK,
    summary="Translate AI rule files to Watchflow YAML",
    description="Scans repo for AI rule files, extracts statements, maps or translates to .watchflow rules YAML.",
    dependencies=[Depends(rate_limiter)],
)
async def translate_ai_rule_files(
    request: Request,
    payload: TranslateAIFilesRequest,
    user: User | None = Depends(get_current_user_optional),
) -> TranslateAIFilesResponse:
    """
    Scan a repo for AI rule files and translate them into .watchflow rules YAML.

    Token resolution, repository lookup, and error mapping mirror
    scan_ai_rule_files. Returns an empty ruleset (with a warning) rather than
    failing when the tree or file contents cannot be loaded.
    """
    repo_url_str = str(payload.repo_url)
    logger.info("translate_ai_files_requested", repo_url=repo_url_str)

    try:
        repo_full_name = parse_repo_from_url(repo_url_str)
    except ValueError as e:
        logger.warning("invalid_url_provided", url=repo_url_str, error=str(e))
        raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=str(e)) from e

    # Token precedence: authenticated user > explicit PAT > installation token.
    github_token = None
    if user and user.github_token:
        try:
            github_token = user.github_token.get_secret_value()
        except (AttributeError, TypeError):
            github_token = str(user.github_token) if user.github_token else None
    elif payload.github_token:
        github_token = payload.github_token
    elif payload.installation_id:
        installation_token = await github_client.get_installation_access_token(payload.installation_id)
        if installation_token:
            github_token = installation_token
    installation_id = payload.installation_id

    repo_data, repo_error = await github_client.get_repository(
        repo_full_name, installation_id=installation_id, user_token=github_token
    )
    if repo_error:
        err_status = repo_error["status"]
        # NOTE(review): GitHub signals rate limiting with 403; all 403s are
        # reported as 429 here, including permission-denied ones.
        status_code = status.HTTP_429_TOO_MANY_REQUESTS if err_status == 403 else err_status
        if status_code not in (401, 403, 404, 429):
            status_code = status.HTTP_502_BAD_GATEWAY
        raise HTTPException(status_code=status_code, detail=repo_error["message"])
    if not repo_data:
        # Defensive: no error reported but no repository payload either.
        # Without this guard, repo_data.get() below would raise and become a 500.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Repository '{repo_full_name}' not found or access denied.",
        )
    default_branch = repo_data.get("default_branch") or "main"
    ref = default_branch

    tree_entries = await github_client.get_repository_tree(
        repo_full_name, ref=ref, installation_id=installation_id, user_token=github_token, recursive=True
    )
    if not tree_entries:
        return TranslateAIFilesResponse(
            repo_full_name=repo_full_name,
            ref=ref,
            rules_yaml="rules: []\n",
            rules_count=0,
            ambiguous=[],
            warnings=["Could not load repository tree."],
        )

    async def get_content(path: str):
        # Read file content at the scanned ref for consistency with the tree listing.
        return await github_client.get_file_content(
            repo_full_name, path, installation_id, github_token, ref=ref
        )

    raw_candidates = await scan_repo_for_ai_rule_files(
        tree_entries, fetch_content=True, get_file_content=get_content
    )
    candidates_with_content = [c for c in raw_candidates if c.get("content")]
    if not candidates_with_content:
        return TranslateAIFilesResponse(
            repo_full_name=repo_full_name,
            ref=ref,
            rules_yaml="rules: []\n",
            rules_count=0,
            ambiguous=[],
            warnings=["No AI rule file content could be loaded."],
        )

    # rule_sources is intentionally unused by this endpoint's response model.
    rules_yaml, ambiguous, _rule_sources = await translate_ai_rule_files_to_yaml(candidates_with_content)

    # Count rules by parsing the YAML itself (the authoritative source);
    # the previous string-counting heuristic was dead code, immediately
    # overwritten by this parse. Surface a parse failure instead of
    # swallowing it silently.
    warnings: list[str] = []
    rules_count = 0
    try:
        parsed = yaml.safe_load(rules_yaml)
        rules_count = len(parsed.get("rules", [])) if isinstance(parsed, dict) else 0
    except Exception:
        warnings.append("Generated rules YAML could not be parsed; rules_count set to 0.")

    return TranslateAIFilesResponse(
        repo_full_name=repo_full_name,
        ref=ref,
        rules_yaml=rules_yaml,
        rules_count=rules_count,
        ambiguous=ambiguous,
        warnings=warnings,
    )
Loading
Loading