Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
413 changes: 405 additions & 8 deletions src/api/recommendations.py

Large diffs are not rendered by default.

28 changes: 28 additions & 0 deletions src/event_processors/pull_request/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from typing import Any

from src.agents import get_agent
from src.api.recommendations import get_suggested_rules_from_repo
from src.rules.ai_rules_scan import is_relevant_pr
from src.core.models import Violation
from src.event_processors.base import BaseEventProcessor, ProcessingResult
from src.event_processors.pull_request.enricher import PullRequestEnricher
Expand Down Expand Up @@ -60,6 +62,32 @@ async def process(self, task: Task) -> ProcessingResult:
raise ValueError("Failed to get installation access token")
github_token = github_token_optional

# Agentic: scan repo only when relevant (PR targets default branch)
# Use the PR head ref so we scan the branch being proposed, not main.
if is_relevant_pr(task.payload):
try:
pr_head_ref = pr_data.get("head", {}).get("ref") # branch name, e.g. feature-x
rules_yaml, rules_count, ambiguous, rule_sources = await get_suggested_rules_from_repo(
repo_full_name, installation_id, github_token, ref=pr_head_ref
)
logger.info("=" * 80)
logger.info("📋 Suggested rules (agentic scan + translation)")
logger.info(f" Repo: {repo_full_name} | PR #{pr_number} | Ref: {pr_head_ref or 'default'} | Translated rules: {rules_count}")
if rule_sources:
from_mapping = sum(1 for s in rule_sources if s == "mapping")
from_agent = sum(1 for s in rule_sources if s == "agent")
logger.info(" From deterministic mapping: %s | From AI agent: %s", from_mapping, from_agent)
logger.info(" Per-rule source: %s", rule_sources)
if rules_count > 0:
logger.info(" YAML:\n%s", rules_yaml)
if ambiguous:
logger.info(" Ambiguous (not translated): %s", [a.get("statement", "") for a in ambiguous])
logger.info("=" * 80)
except Exception as e:
logger.warning("Suggested rules scan failed: %s", e)
else:
logger.info("PR not relevant for agentic scan (skip): base ref=%s", task.payload.get("pull_request", {}).get("base", {}).get("ref"))

Comment thread
coderabbitai[bot] marked this conversation as resolved.
# 1. Enrich event data
event_data = await self.enricher.enrich_event_data(task, github_token)
api_calls += 1
Expand Down
30 changes: 30 additions & 0 deletions src/event_processors/push.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
from typing import Any

from src.agents import get_agent
from src.api.recommendations import get_suggested_rules_from_repo
from src.rules.ai_rules_scan import is_relevant_push
from src.core.models import Severity, Violation
from src.event_processors.base import BaseEventProcessor, ProcessingResult
from src.integrations.github.check_runs import CheckRunManager
from src.tasks.task_queue import Task


logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -62,6 +65,33 @@ async def process(self, task: Task) -> ProcessingResult:
error="No installation ID found",
)

# Agentic: scan repo only when relevant (default branch or touched rule files)
# Use the branch that was pushed so we scan that branch's file content, not main.
if is_relevant_push(task.payload):
try:
github_token = await self.github_client.get_installation_access_token(task.installation_id)
push_ref = payload.get("ref") # e.g. refs/heads/feature-x
rules_yaml, rules_count, ambiguous, rule_sources = await get_suggested_rules_from_repo(
task.repo_full_name, task.installation_id, github_token, ref=push_ref
)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
logger.info("=" * 80)
logger.info("📋 Suggested rules (agentic scan + translation)")
logger.info(f" Repo: {task.repo_full_name} | Ref: {push_ref or 'default'} | Translated rules: {rules_count}")
if rule_sources:
from_mapping = sum(1 for s in rule_sources if s == "mapping")
from_agent = sum(1 for s in rule_sources if s == "agent")
logger.info(" From deterministic mapping: %s | From AI agent: %s", from_mapping, from_agent)
logger.info(" Per-rule source: %s", rule_sources)
if rules_count > 0:
logger.info(" YAML:\n%s", rules_yaml)
if ambiguous:
logger.info(" Ambiguous (not translated): %s", [a.get("statement", "") for a in ambiguous])
logger.info("=" * 80)
except Exception as e:
logger.warning("Suggested rules scan failed: %s", e)
else:
logger.info("Push not relevant for agentic scan (skip): ref=%s", task.payload.get("ref"))

Comment thread
coderabbitai[bot] marked this conversation as resolved.
rules_optional = await self.rule_provider.get_rules(task.repo_full_name, task.installation_id)
rules = rules_optional if rules_optional is not None else []

Expand Down
115 changes: 96 additions & 19 deletions src/integrations/github/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import jwt
import structlog
from cachetools import TTLCache # type: ignore[import-untyped]
from tenacity import retry, stop_after_attempt, wait_exponential
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential

from src.core.config import config
from src.core.errors import GitHubGraphQLError
Expand Down Expand Up @@ -129,27 +129,51 @@ async def get_installation_access_token(self, installation_id: int) -> str | Non

async def get_repository(
    self, repo_full_name: str, installation_id: int | None = None, user_token: str | None = None
) -> tuple[dict[str, Any] | None, dict[str, Any] | None]:
    """
    Fetch repository metadata (default branch, language, etc.).

    Returns:
        (repo_data, None) on success, or
        (None, {"status": int, "message": str}) on failure so callers can
        surface a meaningful API response.
    """
    headers = await self._get_auth_headers(
        installation_id=installation_id, user_token=user_token
    )
    if not headers:
        # Anonymous access is no longer allowed here: auth is required.
        return (
            None,
            {"status": 401, "message": "Authentication required. Provide github_token or installation_id in the request."},
        )
    url = f"{config.github.api_base_url}/repos/{repo_full_name}"
    session = await self._get_session()
    async with session.get(url, headers=headers) as response:
        if response.status == 200:
            data = await response.json()
            return cast("dict[str, Any]", data), None
        # Best-effort extraction of GitHub's own error message from the JSON body.
        try:
            body = await response.json()
            gh_message = body.get("message", "") if isinstance(body, dict) else ""
        except Exception:
            gh_message = ""
        if response.status == 404:
            msg = gh_message or "Repository not found or access denied. Check repo name and token permissions."
            return None, {"status": 404, "message": msg}
        if response.status == 403:
            # 403 is most commonly rate limiting; prefer GitHub's wording when it confirms that.
            msg = "GitHub API rate limit exceeded. Try again later or provide github_token for higher limits."
            if gh_message and "rate limit" in gh_message.lower():
                msg = gh_message
            return None, {"status": 403, "message": msg}
        if response.status == 401:
            return (
                None,
                {"status": 401, "message": gh_message or "Invalid or expired token. Check github_token or installation_id."},
            )
        return None, {"status": response.status, "message": gh_message or f"GitHub API returned {response.status}."}
Comment on lines +133 to +174
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Use validated models instead of tuple-of-raw-dicts for get_repository.

The new (repo_data, error_info) contract is shape-unsafe (dict[str, Any]) and easy to misuse. Please return a Pydantic response model (typed success/error branches) so callers can rely on validated fields.

As per coding guidelines, "All agent outputs and external payloads must use validated BaseModel from Pydantic".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/integrations/github/api.py` around lines 132 - 169, get_repository
currently returns a tuple of raw dicts which is shape-unsafe; change it to
return validated Pydantic models (e.g., RepositoryResponse for success and
ApiErrorResponse for failures) instead of tuple[dict[str, Any] | None, dict[str,
Any] | None]. Update the function signature and all return sites inside
get_repository (including the authentication error branch using
_get_auth_headers and all response.status branches after _get_session().get) to
instantiate and return the appropriate BaseModel instances with the same fields
(status, message, and repo data mapped into RepositoryResponse fields), and
adjust callers to consume the typed models rather than raw dicts. Ensure models
are defined as Pydantic BaseModel subclasses and include parsing/validation of
the JSON body (use the model.parse_obj or model(...) constructors) before
returning.


async def list_directory_any_auth(
self, repo_full_name: str, path: str, installation_id: int | None = None, user_token: str | None = None
) -> list[dict[str, Any]]:
"""List directory contents using either installation or user token."""
"""List directory contents using installation or user token (auth required)."""
headers = await self._get_auth_headers(
installation_id=installation_id, user_token=user_token, allow_anonymous=True
installation_id=installation_id, user_token=user_token
)
if not headers:
return []
Expand All @@ -164,24 +188,75 @@ async def list_directory_any_auth(
response.raise_for_status()
return []


async def get_repository_tree(
    self,
    repo_full_name: str,
    ref: str | None = None,
    installation_id: int | None = None,
    user_token: str | None = None,
    recursive: bool = True,
) -> list[dict[str, Any]]:
    """
    Return the git tree entries of a repository.

    Requires authentication (github_token or installation_id). Returns an
    empty list when auth, ref resolution, or the tree request fails.

    Args:
        repo_full_name: "owner/repo".
        ref: Branch name, tag, or commit SHA; falls back to "main" when omitted.
        recursive: Whether to list the full tree or only the top level.
    """
    headers = await self._get_auth_headers(
        installation_id=installation_id,
        user_token=user_token,
    )
    if not headers:
        return []
    # NOTE(review): falls back to "main", not the repository's actual default
    # branch — confirm this is acceptable for repos with a different default.
    ref = ref or "main"
    tree_sha = await self._resolve_tree_sha(repo_full_name, ref, headers)
    if not tree_sha:
        return []

    url = f"{config.github.api_base_url}/repos/{repo_full_name}/git/trees/{tree_sha}"
    # GitHub's trees API treats ANY value of the `recursive` query parameter as
    # true, so the parameter must be omitted entirely for a non-recursive
    # listing (previously `?recursive=False` still returned a recursive tree).
    params = {"recursive": "1"} if recursive else None

    session = await self._get_session()
    async with session.get(url, headers=headers, params=params) as response:
        if response.status != 200:
            return []
        data = await response.json()
        return cast("list[dict[str, Any]]", data.get("tree", []))

Comment thread
coderabbitai[bot] marked this conversation as resolved.

async def _resolve_tree_sha(self, repo_full_name: str, ref: str, headers: dict[str, str]) -> str | None:
    """Look up the tree SHA behind *ref* (branch, tag, or commit SHA) via the commits endpoint."""
    commits_url = f"{config.github.api_base_url}/repos/{repo_full_name}/commits/{ref}"
    http = await self._get_session()
    async with http.get(commits_url, headers=headers) as resp:
        if resp.status != 200:
            return None
        payload = await resp.json()
    # A non-dict body (or missing commit/tree keys) yields None rather than raising.
    if not isinstance(payload, dict):
        return None
    return payload.get("commit", {}).get("tree", {}).get("sha")

async def get_file_content(
self, repo_full_name: str, file_path: str, installation_id: int | None, user_token: str | None = None
self,
repo_full_name: str,
file_path: str,
installation_id: int | None,
user_token: str | None = None,
ref: str | None = None,
) -> str | None:
"""
Fetches the content of a file from a repository. Supports anonymous access for public analysis.
Fetches the content of a file from a repository. Requires authentication (github_token or installation_id).
When ref is provided (branch name, tag, or commit SHA), returns content at that ref; otherwise uses default branch.
"""
headers = await self._get_auth_headers(
installation_id=installation_id,
user_token=user_token,
accept="application/vnd.github.raw",
allow_anonymous=True,
)
if not headers:
return None
url = f"{config.github.api_base_url}/repos/{repo_full_name}/contents/{file_path}"
params = {"ref": ref} if ref else None

session = await self._get_session()
async with session.get(url, headers=headers) as response:
async with session.get(url, headers=headers, params=params) as response:
if response.status == 200:
logger.info(f"Successfully fetched file '{file_path}' from '{repo_full_name}'.")
return await response.text()
Expand Down Expand Up @@ -1030,7 +1105,6 @@ async def fetch_recent_pull_requests(
headers = await self._get_auth_headers(
installation_id=installation_id,
user_token=user_token,
allow_anonymous=True, # Support public repos
)
if not headers:
logger.error("pr_fetch_auth_failed", repo=repo_full_name, error_type="auth_error")
Expand Down Expand Up @@ -1115,7 +1189,11 @@ async def fetch_recent_pull_requests(
logger.error("pr_fetch_unexpected_error", repo=repo_full_name, error_type="unknown_error", error=str(e))
return []

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
@retry(
retry=retry_if_exception_type(aiohttp.ClientError),
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10),
)
async def execute_graphql(
self, query: str, variables: dict[str, Any], user_token: str | None = None, installation_id: int | None = None
) -> dict[str, Any]:
Expand All @@ -1139,18 +1217,17 @@ async def execute_graphql(
url = f"{config.github.api_base_url}/graphql"
payload = {"query": query, "variables": variables}

# Get appropriate headers (can be anonymous for public data or authenticated)
# Priority: user_token > installation_id > anonymous (if allowed)
# Get appropriate headers (auth required: user_token or installation_id)
headers = await self._get_auth_headers(
user_token=user_token, installation_id=installation_id, allow_anonymous=True
user_token=user_token, installation_id=installation_id
)
if not headers:
# Fallback or error? GraphQL usually demands auth.
# If we have no headers, we likely can't query GraphQL successfully for many fields.
# We'll try with empty headers if that's what _get_auth_headers returns (it returns None on failure).
# If None, we can't proceed.
logger.error("GraphQL execution failed: No authentication headers available.")
raise Exception("Authentication required for GraphQL query.")
raise PermissionError("Authentication required for GraphQL query.")

start_time = time.time()

Expand Down
Loading