From 1c29fe43104f49fc967e3c82a4c0a994da126062 Mon Sep 17 00:00:00 2001 From: Krishnavamsi-codes Date: Sun, 31 May 2026 19:26:20 +0530 Subject: [PATCH 1/3] feat(api) add consistent error codes for API responses --- backend/app/exceptions.py | 130 +++++++++++++++ backend/app/main.py | 20 ++- backend/app/schemas.py | 6 + backend/tests/test_auth_endpoints.py | 230 ++++++++++++++------------- backend/tests/test_endpoints.py | 9 ++ backend/tests/test_file_upload.py | 17 +- backend/tests/test_history.py | 3 + backend/tests/test_share.py | 191 +++++++++++----------- backend/tests/test_zip_dos.py | 4 + 9 files changed, 393 insertions(+), 217 deletions(-) create mode 100644 backend/app/exceptions.py diff --git a/backend/app/exceptions.py b/backend/app/exceptions.py new file mode 100644 index 00000000..88613205 --- /dev/null +++ b/backend/app/exceptions.py @@ -0,0 +1,130 @@ +from enum import Enum +from fastapi import HTTPException, Request, status +from fastapi.exceptions import RequestValidationError +from fastapi.responses import JSONResponse +import logging + +logger = logging.getLogger("ai_assistant.exceptions") + +class ErrorCode(str, Enum): + INVALID_TOKEN = "invalid_token" + AUTHENTICATION_REQUIRED = "authentication_required" + INVALID_CREDENTIALS = "invalid_credentials" + EMAIL_ALREADY_EXISTS = "email_already_exists" + HISTORY_NOT_FOUND = "history_not_found" + FAVORITE_NOT_FOUND = "favorite_not_found" + SHARED_RESULT_NOT_FOUND = "shared_result_not_found" + UNSUPPORTED_FILE_TYPE = "unsupported_file_type" + PAYLOAD_TOO_LARGE = "payload_too_large" + RATE_LIMITED = "rate_limited" + VALIDATION_ERROR = "validation_error" + INTERNAL_SERVER_ERROR = "internal_server_error" + BAD_REQUEST = "bad_request" + FORBIDDEN = "forbidden" + ALREADY_SUBSCRIBED = "already_subscribed" + SUBSCRIPTION_NOT_FOUND = "subscription_not_found" + +class APIException(HTTPException): + def __init__(self, status_code: int, error_code: ErrorCode, detail: str, headers: dict | None = None): + super().__init__(status_code=status_code, detail=detail, headers=headers) + self.error_code = error_code + +def map_http_exception_to_code(status_code: int, detail: str) -> str: + detail_lower = detail.lower() + + # 401 Unauthorized + if status_code == 401: + if "authentication required" in detail_lower: + return ErrorCode.AUTHENTICATION_REQUIRED + if "invalid token" in detail_lower or "user not found" in detail_lower: + return ErrorCode.INVALID_TOKEN + if "invalid credentials" in detail_lower: + return ErrorCode.INVALID_CREDENTIALS + return "unauthorized" + + # 403 Forbidden + if status_code == 403: + if "invalid unsubscribe token" in detail_lower: + return ErrorCode.INVALID_TOKEN + return ErrorCode.FORBIDDEN + + # 404 Not Found + if status_code == 404: + if "history" in detail_lower: + return ErrorCode.HISTORY_NOT_FOUND + if "favorite" in detail_lower: + return ErrorCode.FAVORITE_NOT_FOUND + if "shared result" in detail_lower or "share" in detail_lower: + return ErrorCode.SHARED_RESULT_NOT_FOUND + if "subscription" in detail_lower: + return ErrorCode.SUBSCRIPTION_NOT_FOUND + return "not_found" + + # 409 Conflict + if status_code == 409: + if "already subscribed" in detail_lower: + return ErrorCode.ALREADY_SUBSCRIBED + if "email already exists" in detail_lower: + return ErrorCode.EMAIL_ALREADY_EXISTS + return "conflict" + + # 413 Content Too Large / Payload Too Large + if status_code == 413: + return ErrorCode.PAYLOAD_TOO_LARGE + + # 415 Unsupported Media Type + if status_code == 415: + return ErrorCode.UNSUPPORTED_FILE_TYPE + + # 429 Too Many Requests + if status_code == 429: + return ErrorCode.RATE_LIMITED + + # 400 Bad Request + if status_code == 400: + if "only .zip" in detail_lower: + return ErrorCode.UNSUPPORTED_FILE_TYPE + return ErrorCode.BAD_REQUEST + + # 500 Internal Server Error + if status_code == 500: + return ErrorCode.INTERNAL_SERVER_ERROR + + return ErrorCode.BAD_REQUEST if status_code < 500 else ErrorCode.INTERNAL_SERVER_ERROR + +async def api_exception_handler(request: Request, exc: APIException): + return JSONResponse( + status_code=exc.status_code, + content={ + "error": exc.error_code, + "detail": exc.detail, + }, + headers=exc.headers, + ) + +async def http_exception_handler(request: Request, exc: HTTPException): + error_code = map_http_exception_to_code(exc.status_code, exc.detail) + return JSONResponse( + status_code=exc.status_code, + content={ + "error": error_code, + "detail": exc.detail, + }, + headers=exc.headers, + ) + +async def validation_exception_handler(request: Request, exc: RequestValidationError): + errors = exc.errors() + details = [] + for err in errors: + loc = " -> ".join(str(l) for l in err["loc"]) + details.append(f"{loc}: {err['msg']}") + detail_str = "; ".join(details) + + return JSONResponse( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + content={ + "error": ErrorCode.VALIDATION_ERROR, + "detail": detail_str, + }, + ) diff --git a/backend/app/main.py b/backend/app/main.py index e1903cfd..115d1eec 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -3,7 +3,8 @@ FastAPI application with advanced middleware, rate limiting, and full analysis engine. """ -from fastapi import FastAPI, Request +from fastapi import FastAPI, Request, HTTPException +from fastapi.exceptions import RequestValidationError from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.gzip import GZipMiddleware from fastapi.responses import JSONResponse @@ -14,6 +15,13 @@ import logging from contextlib import asynccontextmanager +from .exceptions import ( + APIException, + api_exception_handler, + http_exception_handler, + validation_exception_handler, +) + from .routers import ( analyze, auth, @@ -88,6 +96,10 @@ async def lifespan(app: FastAPI): lifespan=lifespan, ) +app.add_exception_handler(APIException, api_exception_handler) +app.add_exception_handler(HTTPException, http_exception_handler) +app.add_exception_handler(RequestValidationError, validation_exception_handler) + # ── Middleware ──────────────────────────────────────────────────────────────── app.add_middleware(GZipMiddleware, minimum_size=1000) app.add_middleware( @@ -125,6 +137,7 @@ async def add_process_time_header(request: Request, call_next): return JSONResponse( status_code=429, content={ + "error": "rate_limited", "detail": f"Rate limit exceeded. Max {RATE_LIMIT} requests/minute." }, headers=headers, @@ -238,5 +251,8 @@ async def global_exception_handler(request: Request, exc: Exception): logging.exception("Unhandled error") return JSONResponse( status_code=500, - content={"detail": "Internal server error. Please try again."}, + content={ + "error": "internal_server_error", + "detail": "Internal server error. Please try again.", + }, ) diff --git a/backend/app/schemas.py b/backend/app/schemas.py index ac17529e..7e9b72ab 100644 --- a/backend/app/schemas.py +++ b/backend/app/schemas.py @@ -378,3 +378,9 @@ class AnalyzeResponse(BaseModel): debugging: DebuggingResponse suggestions: SuggestionsResponse analysis_time_ms: float | None = None + + +class ErrorResponse(BaseModel): + error: str + detail: str + diff --git a/backend/tests/test_auth_endpoints.py b/backend/tests/test_auth_endpoints.py index eeece4a4..278c766d 100644 --- a/backend/tests/test_auth_endpoints.py +++ b/backend/tests/test_auth_endpoints.py @@ -1,113 +1,117 @@ -"""Integration tests for auth routes""" - -import os -import sys - -import pytest -from fastapi.testclient import TestClient -from sqlalchemy import create_engine -from sqlalchemy.orm import sessionmaker -from sqlalchemy.pool import StaticPool - -sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) - -from app.database import Base, get_db -from app.main import app as fastapi_app - - -TEST_ENGINE = create_engine( - "sqlite:///:memory:", - connect_args={"check_same_thread": False}, - poolclass=StaticPool, -) -TEST_SESSION_LOCAL = sessionmaker(bind=TEST_ENGINE) - - -def _override_db(): - db = TEST_SESSION_LOCAL() - try: - yield db - finally: - db.close() - - -@pytest.fixture -def client(): - previous_override = fastapi_app.dependency_overrides.get(get_db) - fastapi_app.dependency_overrides[get_db] = _override_db - with TestClient(fastapi_app) as test_client: - yield test_client - if previous_override is None: - fastapi_app.dependency_overrides.pop(get_db, None) - else: - fastapi_app.dependency_overrides[get_db] = previous_override - - -@pytest.fixture(autouse=True) -def _recreate_tables(): - Base.metadata.create_all(bind=TEST_ENGINE) - yield - Base.metadata.drop_all(bind=TEST_ENGINE) - - -def test_auth_routes_are_exposed_in_openapi(client): - response = client.get("/openapi.json") - assert response.status_code == 200 - - paths = response.json()["paths"] - assert "/auth/signup" in paths - assert "/auth/login" in paths - assert "/auth/me" in paths - - -def test_signup_login_and_me_happy_path(client): - signup_response = client.post( - "/auth/signup", - json={"email": "new.user@example.com", "password": "StrongPass123!"}, - ) - assert signup_response.status_code == 200 - - signup_data = signup_response.json() - assert signup_data["email"] == "new.user@example.com" - assert signup_data["user_id"] > 0 - assert signup_data["access_token"] - - login_response = client.post( - "/auth/login", - json={"email": "new.user@example.com", "password": "StrongPass123!"}, - ) - assert login_response.status_code == 200 - - token = login_response.json()["access_token"] - me_response = client.get( - "/auth/me", - headers={"Authorization": f"Bearer {token}"}, - ) - assert me_response.status_code == 200 - assert me_response.json() == { - "user_id": signup_data["user_id"], - "email": "new.user@example.com", - } - - -def test_signup_duplicate_email_returns_409(client): - payload = {"email": "dup@example.com", "password": "StrongPass123!"} - first_response = client.post("/auth/signup", json=payload) - assert first_response.status_code == 200 - - duplicate_response = client.post("/auth/signup", json=payload) - assert duplicate_response.status_code == 409 - assert "already exists" in duplicate_response.json()["detail"].lower() - - -def test_me_rejects_missing_and_invalid_token(client): - missing_token_response = client.get("/auth/me") - assert missing_token_response.status_code == 401 - assert "authentication required" in missing_token_response.json()["detail"].lower() - - invalid_token_response = client.get( - "/auth/me", - headers={"Authorization": "Bearer not-a-real-token"}, - ) - assert invalid_token_response.status_code == 401 - assert "invalid token" in invalid_token_response.json()["detail"].lower() \ No newline at end of file +"""Integration tests for auth routes""" + +import os +import sys + +import pytest +from fastapi.testclient import TestClient +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker +from sqlalchemy.pool import StaticPool + +sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) + +from app.database import Base, get_db +from app.main import app as fastapi_app + + +TEST_ENGINE = create_engine( + "sqlite:///:memory:", + connect_args={"check_same_thread": False}, + poolclass=StaticPool, +) +TEST_SESSION_LOCAL = sessionmaker(bind=TEST_ENGINE) + + +def _override_db(): + db = TEST_SESSION_LOCAL() + try: + yield db + finally: + db.close() + + +@pytest.fixture +def client(): + previous_override = fastapi_app.dependency_overrides.get(get_db) + fastapi_app.dependency_overrides[get_db] = _override_db + with TestClient(fastapi_app) as test_client: + yield test_client + if previous_override is None: + fastapi_app.dependency_overrides.pop(get_db, None) + else: + fastapi_app.dependency_overrides[get_db] = previous_override + + +@pytest.fixture(autouse=True) +def _recreate_tables(): + Base.metadata.create_all(bind=TEST_ENGINE) + yield + Base.metadata.drop_all(bind=TEST_ENGINE) + + +def test_auth_routes_are_exposed_in_openapi(client): + response = client.get("/openapi.json") + assert response.status_code == 200 + + paths = response.json()["paths"] + assert "/auth/signup" in paths + assert "/auth/login" in paths + assert "/auth/me" in paths + + +def test_signup_login_and_me_happy_path(client): + signup_response = client.post( + "/auth/signup", + json={"email": "new.user@example.com", "password": "StrongPass123!"}, + ) + assert signup_response.status_code == 200 + + signup_data = signup_response.json() + assert signup_data["email"] == "new.user@example.com" + assert signup_data["user_id"] > 0 + assert signup_data["access_token"] + + login_response = client.post( + "/auth/login", + json={"email": "new.user@example.com", "password": "StrongPass123!"}, + ) + assert login_response.status_code == 200 + + token = login_response.json()["access_token"] + me_response = client.get( + "/auth/me", + headers={"Authorization": f"Bearer {token}"}, + ) + assert me_response.status_code == 200 + assert me_response.json() == { + "user_id": signup_data["user_id"], + "email": "new.user@example.com", + } + + +def test_signup_duplicate_email_returns_409(client): + payload = {"email": "dup@example.com", "password": "StrongPass123!"} + first_response = client.post("/auth/signup", json=payload) + assert first_response.status_code == 200 + + duplicate_response = client.post("/auth/signup", json=payload) + assert duplicate_response.status_code == 409 + assert "already exists" in duplicate_response.json()["detail"].lower() + assert duplicate_response.json()["error"] == "email_already_exists" + + + +def test_me_rejects_missing_and_invalid_token(client): + missing_token_response = client.get("/auth/me") + assert missing_token_response.status_code == 401 + assert "authentication required" in missing_token_response.json()["detail"].lower() + assert missing_token_response.json()["error"] == "authentication_required" + + invalid_token_response = client.get( + "/auth/me", + headers={"Authorization": "Bearer not-a-real-token"}, + ) + assert invalid_token_response.status_code == 401 + assert "invalid token" in invalid_token_response.json()["detail"].lower() + assert invalid_token_response.json()["error"] == "invalid_token" \ No newline at end of file diff --git a/backend/tests/test_endpoints.py b/backend/tests/test_endpoints.py index 818eedff..c903cf11 100644 --- a/backend/tests/test_endpoints.py +++ b/backend/tests/test_endpoints.py @@ -169,6 +169,8 @@ def test_rate_limit_returns_429_with_retry_after_header(): assert r.headers["Retry-After"] == str(app_main.RATE_LIMIT_WINDOW_SECONDS) assert r.headers["X-RateLimit-Limit"] == str(app_main.RATE_LIMIT) assert r.headers["X-RateLimit-Remaining"] == "0" + assert r.json()["error"] == "rate_limited" + assert "Rate limit exceeded" in r.json()["detail"] # ── Explanation ─────────────────────────────────────────────────────────────── @@ -214,11 +216,15 @@ def test_explanation_accepts_rust_hint_alias(): def test_explanation_empty_code(): r = client.post("/explanation/", json={"code": " "}) assert r.status_code == 422 + assert r.json()["error"] == "validation_error" + assert "code" in r.json()["detail"] def test_explanation_too_long(): r = client.post("/explanation/", json={"code": "x" * 60000}) assert r.status_code == 422 + assert r.json()["error"] == "validation_error" + assert "code" in r.json()["detail"] def test_explanation_typescript(): @@ -664,6 +670,7 @@ def test_full_analyze_all_languages(): def test_missing_code_field(): r = client.post("/analyze/", json={}) assert r.status_code == 422 + assert r.json()["error"] == "validation_error" def test_unicode_code(): @@ -734,3 +741,5 @@ def test_get_stream_with_language_hint(): def test_get_stream_empty_code_rejected(): r = client.get("/analyze/stream", params={"code": " "}) assert r.status_code in (400, 422) + assert r.json()["error"] in ("validation_error", "bad_request") + assert "code" in r.json()["detail"] diff --git a/backend/tests/test_file_upload.py b/backend/tests/test_file_upload.py index edba244a..9019b958 100644 --- a/backend/tests/test_file_upload.py +++ b/backend/tests/test_file_upload.py @@ -94,10 +94,10 @@ def test_upload_blocked_files( ) assert response.status_code == 415 - data = response.json() - assert "Executable files are not allowed" in data["detail"] + assert data["error"] == "unsupported_file_type" + # ========================================================= @@ -119,10 +119,10 @@ def test_invalid_mime_type(): print(response.json()) assert response.status_code == 415 - data = response.json() - assert "Invalid MIME type" in data["detail"] + assert data["error"] == "unsupported_file_type" + # ========================================================= @@ -143,10 +143,10 @@ def test_double_extension(): ) assert response.status_code == 415 - data = response.json() - assert "Executable files are not allowed" in data["detail"] + assert data["error"] == "unsupported_file_type" + # ========================================================= @@ -158,6 +158,8 @@ def test_no_file_uploaded(): response = client.post("/upload/validate") assert response.status_code in [400, 422] + assert response.json()["error"] in ("bad_request", "validation_error") + # ========================================================= @@ -179,4 +181,5 @@ def test_large_file(): } ) - assert response.status_code == 413 \ No newline at end of file + assert response.status_code == 413 + assert response.json()["error"] == "payload_too_large" \ No newline at end of file diff --git a/backend/tests/test_history.py b/backend/tests/test_history.py index 0b5f3718..21851f6a 100644 --- a/backend/tests/test_history.py +++ b/backend/tests/test_history.py @@ -67,6 +67,9 @@ def test_delete_history(): def test_delete_nonexistent(): r = client.delete("/history/999999") assert r.status_code == 404 + assert r.json()["error"] == "history_not_found" + assert "History entry not found" in r.json()["detail"] + def test_history_entry_fields(): diff --git a/backend/tests/test_share.py b/backend/tests/test_share.py index c7edef59..a5e06ba4 100644 --- a/backend/tests/test_share.py +++ b/backend/tests/test_share.py @@ -1,95 +1,96 @@ -from __future__ import annotations - -from datetime import UTC, datetime, timedelta - -from sqlalchemy import create_engine -from sqlalchemy.orm import sessionmaker - -from app import database -from app.database import Base -from app.main import app -from app.models import SharedSnippet - - -def _configure_test_db(monkeypatch, tmp_path): - db_path = tmp_path / "share-tests.db" - - engine = create_engine( - f"sqlite:///{db_path}", - connect_args={"check_same_thread": False}, - ) - - session_local = sessionmaker( - autocommit=False, - autoflush=False, - bind=engine, - ) - - monkeypatch.setattr(database, "engine", engine) - monkeypatch.setattr(database, "SessionLocal", session_local) - - Base.metadata.drop_all(bind=engine) - Base.metadata.create_all(bind=engine) - - return session_local - - -def test_create_and_fetch_share(monkeypatch, tmp_path): - _configure_test_db(monkeypatch, tmp_path) - - from fastapi.testclient import TestClient - - client = TestClient(app) - - payload = { - "code": "print('hello')", - "result": { - "provider": "rule-based", - "explanation": {"summary": "ok"}, - }, - } - - create_resp = client.post("/share/", json=payload) - - assert create_resp.status_code == 200 - - share_id = create_resp.json()["id"] - - assert share_id - - fetch_resp = client.get(f"/share/{share_id}") - - assert fetch_resp.status_code == 200 - - data = fetch_resp.json() - - assert data["id"] == share_id - assert data["code"] == payload["code"] - assert data["result"] == payload["result"] - assert "created_at" in data - - -def test_expired_share_returns_404(monkeypatch, tmp_path): - session_local = _configure_test_db(monkeypatch, tmp_path) - - from fastapi.testclient import TestClient - - client = TestClient(app) - - db = session_local() - - record = SharedSnippet( - token="expired123", - code="print('old')", - result_json='{"ok": true}', - created_at=datetime.now(UTC) - timedelta(days=8), - ) - - db.add(record) - db.commit() - db.close() - - resp = client.get("/share/expired123") - - assert resp.status_code == 404 - assert "expired" in resp.json()["detail"].lower() \ No newline at end of file +from __future__ import annotations + +from datetime import UTC, datetime, timedelta + +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker + +from app import database +from app.database import Base +from app.main import app +from app.models import SharedSnippet + + +def _configure_test_db(monkeypatch, tmp_path): + db_path = tmp_path / "share-tests.db" + + engine = create_engine( + f"sqlite:///{db_path}", + connect_args={"check_same_thread": False}, + ) + + session_local = sessionmaker( + autocommit=False, + autoflush=False, + bind=engine, + ) + + monkeypatch.setattr(database, "engine", engine) + monkeypatch.setattr(database, "SessionLocal", session_local) + + Base.metadata.drop_all(bind=engine) + Base.metadata.create_all(bind=engine) + + return session_local + + +def test_create_and_fetch_share(monkeypatch, tmp_path): + _configure_test_db(monkeypatch, tmp_path) + + from fastapi.testclient import TestClient + + client = TestClient(app) + + payload = { + "code": "print('hello')", + "result": { + "provider": "rule-based", + "explanation": {"summary": "ok"}, + }, + } + + create_resp = client.post("/share/", json=payload) + + assert create_resp.status_code == 200 + + share_id = create_resp.json()["id"] + + assert share_id + + fetch_resp = client.get(f"/share/{share_id}") + + assert fetch_resp.status_code == 200 + + data = fetch_resp.json() + + assert data["id"] == share_id + assert data["code"] == payload["code"] + assert data["result"] == payload["result"] + assert "created_at" in data + + +def test_expired_share_returns_404(monkeypatch, tmp_path): + session_local = _configure_test_db(monkeypatch, tmp_path) + + from fastapi.testclient import TestClient + + client = TestClient(app) + + db = session_local() + + record = SharedSnippet( + token="expired123", + code="print('old')", + result_json='{"ok": true}', + created_at=datetime.now(UTC) - timedelta(days=8), + ) + + db.add(record) + db.commit() + db.close() + + resp = client.get("/share/expired123") + + assert resp.status_code == 404 + assert "expired" in resp.json()["detail"].lower() + assert resp.json()["error"] == "shared_result_not_found" \ No newline at end of file diff --git a/backend/tests/test_zip_dos.py b/backend/tests/test_zip_dos.py index ec0fa08e..10635051 100644 --- a/backend/tests/test_zip_dos.py +++ b/backend/tests/test_zip_dos.py @@ -19,6 +19,8 @@ def test_analyze_zip_too_large_via_header(): response = client.post("/analyze/zip/", files=files, headers={"Content-Length": str(15 * 1024 * 1024)}) assert response.status_code == 413 assert "ZIP file too large" in response.json()["detail"] + assert response.json()["error"] == "payload_too_large" + def test_analyze_zip_too_large_via_stream(): # Simulate a stream that exceeds the limit @@ -30,6 +32,8 @@ def test_analyze_zip_too_large_via_stream(): response = client.post("/analyze/zip/", files=files, headers={"Content-Length": "100"}) assert response.status_code == 413 assert "ZIP file exceeds size limit during upload" in response.json()["detail"] + assert response.json()["error"] == "payload_too_large" + def test_analyze_zip_valid(): # Create a real small ZIP From 211382c43f436a21e717a16eeb687fec11ba0e7a Mon Sep 17 00:00:00 2001 From: Krishnavamsi-codes Date: Sun, 7 Jun 2026 14:36:00 +0530 Subject: [PATCH 2/3] fix : resolved ruff line lint issues --- backend/app/exceptions.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/backend/app/exceptions.py b/backend/app/exceptions.py index 88613205..ab5eb9cc 100644 --- a/backend/app/exceptions.py +++ b/backend/app/exceptions.py @@ -31,7 +31,7 @@ def __init__(self, status_code: int, error_code: ErrorCode, detail: str, headers def map_http_exception_to_code(status_code: int, detail: str) -> str: detail_lower = detail.lower() - + # 401 Unauthorized if status_code == 401: if "authentication required" in detail_lower: @@ -41,13 +41,13 @@ def map_http_exception_to_code(status_code: int, detail: str) -> str: if "invalid credentials" in detail_lower: return ErrorCode.INVALID_CREDENTIALS return "unauthorized" - + # 403 Forbidden if status_code == 403: if "invalid unsubscribe token" in detail_lower: return ErrorCode.INVALID_TOKEN return ErrorCode.FORBIDDEN - + # 404 Not Found if status_code == 404: if "history" in detail_lower: @@ -59,7 +59,7 @@ def map_http_exception_to_code(status_code: int, detail: str) -> str: if "subscription" in detail_lower: return ErrorCode.SUBSCRIPTION_NOT_FOUND return "not_found" - + # 409 Conflict if status_code == 409: if "already subscribed" in detail_lower: @@ -67,29 +67,29 @@ def map_http_exception_to_code(status_code: int, detail: str) -> str: if "email already exists" in detail_lower: return ErrorCode.EMAIL_ALREADY_EXISTS return "conflict" - + # 413 Content Too Large / Payload Too Large if status_code == 413: return ErrorCode.PAYLOAD_TOO_LARGE - + # 415 Unsupported Media Type if status_code == 415: return ErrorCode.UNSUPPORTED_FILE_TYPE - + # 429 Too Many Requests if status_code == 429: return ErrorCode.RATE_LIMITED - + # 400 Bad Request if status_code == 400: if "only .zip" in detail_lower: return ErrorCode.UNSUPPORTED_FILE_TYPE return ErrorCode.BAD_REQUEST - + # 500 Internal Server Error if status_code == 500: return ErrorCode.INTERNAL_SERVER_ERROR - + return ErrorCode.BAD_REQUEST if status_code < 500 else ErrorCode.INTERNAL_SERVER_ERROR async def api_exception_handler(request: Request, exc: APIException): @@ -117,10 +117,10 @@ async def validation_exception_handler(request: Request, exc: RequestValidationE errors = exc.errors() details = [] for err in errors: - loc = " -> ".join(str(l) for l in err["loc"]) + loc = " -> ".join(str(location_part) for location_part in err["loc"]) details.append(f"{loc}: {err['msg']}") detail_str = "; ".join(details) - + return JSONResponse( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, content={ From 11ec55ced62c6f96480c1700c3914c16cc442d28 Mon Sep 17 00:00:00 2001 From: Krishnavamsi-codes Date: Wed, 10 Jun 2026 22:29:55 +0530 Subject: [PATCH 3/3] style: format files with black --- backend/app/exceptions.py | 18 +- backend/app/main.py | 23 +- backend/app/schemas.py | 5 +- backend/tests/test_auth_endpoints.py | 233 +++++++++---------- backend/tests/test_endpoints.py | 17 +- backend/tests/test_file_upload.py | 336 ++++++++++++--------------- backend/tests/test_history.py | 37 ++- backend/tests/test_zip_dos.py | 17 +- 8 files changed, 353 insertions(+), 333 deletions(-) diff --git a/backend/app/exceptions.py b/backend/app/exceptions.py index ab5eb9cc..d716d122 100644 --- a/backend/app/exceptions.py +++ b/backend/app/exceptions.py @@ -6,6 +6,7 @@ logger = logging.getLogger("ai_assistant.exceptions") + class ErrorCode(str, Enum): INVALID_TOKEN = "invalid_token" AUTHENTICATION_REQUIRED = "authentication_required" @@ -24,11 +25,19 @@ class ErrorCode(str, Enum): ALREADY_SUBSCRIBED = "already_subscribed" SUBSCRIPTION_NOT_FOUND = "subscription_not_found" + class APIException(HTTPException): - def __init__(self, status_code: int, error_code: ErrorCode, detail: str, headers: dict | None = None): + def __init__( + self, + status_code: int, + error_code: ErrorCode, + detail: str, + headers: dict | None = None, + ): super().__init__(status_code=status_code, detail=detail, headers=headers) self.error_code = error_code + def map_http_exception_to_code(status_code: int, detail: str) -> str: detail_lower = detail.lower() @@ -90,7 +99,10 @@ def map_http_exception_to_code(status_code: int, detail: str) -> str: if status_code == 500: return ErrorCode.INTERNAL_SERVER_ERROR - return ErrorCode.BAD_REQUEST if status_code < 500 else ErrorCode.INTERNAL_SERVER_ERROR + return ( + ErrorCode.BAD_REQUEST if status_code < 500 else ErrorCode.INTERNAL_SERVER_ERROR + ) + async def api_exception_handler(request: Request, exc: APIException): return JSONResponse( @@ -102,6 +114,7 @@ async def api_exception_handler(request: Request, exc: APIException): headers=exc.headers, ) + async def http_exception_handler(request: Request, exc: HTTPException): error_code = map_http_exception_to_code(exc.status_code, exc.detail) return JSONResponse( @@ -113,6 +126,7 @@ async def http_exception_handler(request: Request, exc: HTTPException): headers=exc.headers, ) + async def validation_exception_handler(request: Request, exc: RequestValidationError): errors = exc.errors() details = [] diff --git a/backend/app/main.py b/backend/app/main.py index 115d1eec..ffcda8ec 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -79,7 +79,9 @@ async def lifespan(app: FastAPI): await database.init_db() print("🚀 QyverixAI backend starting…") # Static info gauge so dashboards can pin version / provider labels. - initialise_app_info(version="3.0.0", ai_provider=os.getenv("AI_PROVIDER", "rule-based")) + initialise_app_info( + version="3.0.0", ai_provider=os.getenv("AI_PROVIDER", "rule-based") + ) start_scheduler() yield stop_scheduler() @@ -126,7 +128,12 @@ async def add_process_time_header(request: Request, call_next): remaining = RATE_LIMIT # Apply rate limiting to analysis endpoints only - if request.url.path in ("/explanation/", "/debugging/", "/suggestions/", "/analyze/"): + if request.url.path in ( + "/explanation/", + "/debugging/", + "/suggestions/", + "/analyze/", + ): remaining = check_rate_limit(ip) if remaining < 0: elapsed = (time.perf_counter() - start) * 1000 @@ -138,7 +145,7 @@ async def add_process_time_header(request: Request, call_next): status_code=429, content={ "error": "rate_limited", - "detail": f"Rate limit exceeded. Max {RATE_LIMIT} requests/minute." + "detail": f"Rate limit exceeded. Max {RATE_LIMIT} requests/minute.", }, headers=headers, ) @@ -163,16 +170,16 @@ async def add_cache_header(request: Request, call_next): # ── Routers ─────────────────────────────────────────────────────────────────── app.include_router(explanation.router, prefix="/explanation", tags=["Explanation"]) -app.include_router(debugging.router, prefix="/debugging", tags=["Debugging"]) +app.include_router(debugging.router, prefix="/debugging", tags=["Debugging"]) app.include_router(suggestions.router, prefix="/suggestions", tags=["Suggestions"]) -app.include_router(analyze.router, prefix="/analyze", tags=["Full Analysis"]) -app.include_router(subscribe.router, prefix="/subscribe", tags=["Subscription"]) -app.include_router(history.router, prefix="/history", tags=["History"]) +app.include_router(analyze.router, prefix="/analyze", tags=["Full Analysis"]) +app.include_router(subscribe.router, prefix="/subscribe", tags=["Subscription"]) +app.include_router(history.router, prefix="/history", tags=["History"]) app.include_router(auth.router) app.include_router(chat.router) app.include_router(share.router) app.include_router(user_data.router) -app.include_router(upload_file.router, prefix="/upload", tags=['Upload File'] ) +app.include_router(upload_file.router, prefix="/upload", tags=["Upload File"]) # Operational endpoints: /healthz/live, /healthz/ready, /metrics diff --git a/backend/app/schemas.py b/backend/app/schemas.py index 7e9b72ab..799dc9cd 100644 --- a/backend/app/schemas.py +++ b/backend/app/schemas.py @@ -13,6 +13,7 @@ validate_stored_result_json, ) + class CodeRequest(BaseModel): code: str language: str | None = None @@ -240,6 +241,7 @@ class ReadinessResponse(BaseModel): status: str checks: dict[str, dict[str, Any]] + class ShareCreateRequest(BaseModel): action: str = Field("share", min_length=3, max_length=50) code: str = Field(..., min_length=1, max_length=settings.max_code_chars) @@ -365,12 +367,14 @@ class ExplanationResponse(BaseModel): cyclomatic_complexity: int complexity_risk: str + class SuggestionsResponse(BaseModel): suggestions: list[Suggestion] overall_score: int grade: str next_step: str + class AnalyzeResponse(BaseModel): provider: str model: str @@ -383,4 +387,3 @@ class AnalyzeResponse(BaseModel): class ErrorResponse(BaseModel): error: str detail: str - diff --git a/backend/tests/test_auth_endpoints.py b/backend/tests/test_auth_endpoints.py index 278c766d..ca69482f 100644 --- a/backend/tests/test_auth_endpoints.py +++ b/backend/tests/test_auth_endpoints.py @@ -1,117 +1,116 @@ -"""Integration tests for auth routes""" - -import os -import sys - -import pytest -from fastapi.testclient import TestClient -from sqlalchemy import create_engine -from sqlalchemy.orm import sessionmaker -from sqlalchemy.pool import StaticPool - -sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) - -from app.database import Base, get_db -from app.main import app as fastapi_app - - -TEST_ENGINE = create_engine( - "sqlite:///:memory:", - connect_args={"check_same_thread": False}, - poolclass=StaticPool, -) -TEST_SESSION_LOCAL = sessionmaker(bind=TEST_ENGINE) - - -def _override_db(): - db = TEST_SESSION_LOCAL() - try: - yield db - finally: - db.close() - - -@pytest.fixture -def client(): - previous_override = fastapi_app.dependency_overrides.get(get_db) - fastapi_app.dependency_overrides[get_db] = _override_db - with TestClient(fastapi_app) as test_client: - yield test_client - if previous_override is None: - fastapi_app.dependency_overrides.pop(get_db, None) - else: - fastapi_app.dependency_overrides[get_db] = previous_override - - -@pytest.fixture(autouse=True) -def _recreate_tables(): - Base.metadata.create_all(bind=TEST_ENGINE) - yield - Base.metadata.drop_all(bind=TEST_ENGINE) - - -def test_auth_routes_are_exposed_in_openapi(client): - response = client.get("/openapi.json") - assert response.status_code == 200 - - paths = response.json()["paths"] - assert "/auth/signup" in paths - assert "/auth/login" in paths - assert "/auth/me" in paths - - -def test_signup_login_and_me_happy_path(client): - signup_response = client.post( - "/auth/signup", - json={"email": "new.user@example.com", "password": "StrongPass123!"}, - ) - assert signup_response.status_code == 200 - - signup_data = signup_response.json() - assert signup_data["email"] == "new.user@example.com" - assert signup_data["user_id"] > 0 - assert signup_data["access_token"] - - login_response = client.post( - "/auth/login", - json={"email": "new.user@example.com", "password": "StrongPass123!"}, - ) - assert login_response.status_code == 200 - - token = login_response.json()["access_token"] - me_response = client.get( - "/auth/me", - headers={"Authorization": f"Bearer {token}"}, - ) - assert me_response.status_code == 200 - assert me_response.json() == { - "user_id": signup_data["user_id"], - "email": "new.user@example.com", - } - - -def test_signup_duplicate_email_returns_409(client): - payload = {"email": "dup@example.com", "password": "StrongPass123!"} - first_response = client.post("/auth/signup", json=payload) - assert first_response.status_code == 200 - - duplicate_response = client.post("/auth/signup", json=payload) - assert duplicate_response.status_code == 409 - assert "already exists" in duplicate_response.json()["detail"].lower() - assert duplicate_response.json()["error"] == "email_already_exists" - - - -def test_me_rejects_missing_and_invalid_token(client): - missing_token_response = client.get("/auth/me") - assert missing_token_response.status_code == 401 - assert "authentication required" in missing_token_response.json()["detail"].lower() - assert missing_token_response.json()["error"] == "authentication_required" - - invalid_token_response = client.get( - "/auth/me", - headers={"Authorization": "Bearer not-a-real-token"}, - ) - assert invalid_token_response.status_code == 401 - assert "invalid token" in invalid_token_response.json()["detail"].lower() - assert invalid_token_response.json()["error"] == "invalid_token" \ No newline at end of file +"""Integration tests for auth routes""" + +import os +import sys + +import pytest +from fastapi.testclient import TestClient +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker +from sqlalchemy.pool import StaticPool + +sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) + +from app.database import Base, get_db +from app.main import app as fastapi_app + + +TEST_ENGINE = create_engine( + "sqlite:///:memory:", + connect_args={"check_same_thread": False}, + poolclass=StaticPool, +) +TEST_SESSION_LOCAL = sessionmaker(bind=TEST_ENGINE) + + +def _override_db(): + db = TEST_SESSION_LOCAL() + try: + yield db + finally: + db.close() + + +@pytest.fixture +def client(): + previous_override = fastapi_app.dependency_overrides.get(get_db) + fastapi_app.dependency_overrides[get_db] = _override_db + with TestClient(fastapi_app) as test_client: + yield test_client + if previous_override is None: + fastapi_app.dependency_overrides.pop(get_db, None) + else: + fastapi_app.dependency_overrides[get_db] = previous_override + + +@pytest.fixture(autouse=True) +def _recreate_tables(): + Base.metadata.create_all(bind=TEST_ENGINE) + yield + Base.metadata.drop_all(bind=TEST_ENGINE) + + +def test_auth_routes_are_exposed_in_openapi(client): + response = client.get("/openapi.json") + assert response.status_code == 200 + + paths = response.json()["paths"] + assert "/auth/signup" in paths + assert "/auth/login" in paths + assert "/auth/me" in paths + + +def test_signup_login_and_me_happy_path(client): + signup_response = client.post( + "/auth/signup", + json={"email": "new.user@example.com", "password": "StrongPass123!"}, + ) + assert signup_response.status_code == 200 + + signup_data = signup_response.json() + assert signup_data["email"] == "new.user@example.com" + assert signup_data["user_id"] > 0 + assert signup_data["access_token"] + + login_response = client.post( + "/auth/login", + json={"email": "new.user@example.com", "password": "StrongPass123!"}, + ) + assert login_response.status_code == 200 + + token = login_response.json()["access_token"] + me_response = client.get( + "/auth/me", + headers={"Authorization": f"Bearer {token}"}, + ) + assert me_response.status_code == 200 + assert me_response.json() == { + "user_id": signup_data["user_id"], + "email": "new.user@example.com", + } + + +def test_signup_duplicate_email_returns_409(client): + payload = {"email": "dup@example.com", "password": "StrongPass123!"} + first_response = client.post("/auth/signup", json=payload) + assert first_response.status_code == 200 + + duplicate_response = client.post("/auth/signup", json=payload) + assert duplicate_response.status_code == 409 + assert "already exists" in duplicate_response.json()["detail"].lower() + assert duplicate_response.json()["error"] == "email_already_exists" + + +def test_me_rejects_missing_and_invalid_token(client): + missing_token_response = client.get("/auth/me") + assert missing_token_response.status_code == 401 + assert "authentication required" in missing_token_response.json()["detail"].lower() + assert missing_token_response.json()["error"] == "authentication_required" + + invalid_token_response = client.get( + "/auth/me", + headers={"Authorization": "Bearer not-a-real-token"}, + ) + assert invalid_token_response.status_code == 401 + assert "invalid token" in invalid_token_response.json()["detail"].lower() + assert invalid_token_response.json()["error"] == "invalid_token" diff --git a/backend/tests/test_endpoints.py b/backend/tests/test_endpoints.py index c903cf11..61b0c8bf 100644 --- a/backend/tests/test_endpoints.py +++ b/backend/tests/test_endpoints.py @@ -18,8 +18,11 @@ FIXTURES_DIR = Path(__file__).parent / "fixtures" + def load_fixture(filename: str) -> str: return (FIXTURES_DIR / filename).read_text(encoding="utf-8") + + @pytest.fixture(autouse=True) def reset_rate_limit_state(): app_main._request_counts.clear() @@ -487,7 +490,6 @@ def test_debug_kotlin(): assert d is not None - def test_debug_cpp_syntax_errors(): code = "void main() {\n cout << 'Hello World'\n}" r = client.post("/debugging/", json={"code": code, "language": "cpp"}) @@ -568,15 +570,20 @@ def test_add(): d = r.json() assert d["overall_score"] >= 60 # clean code should score reasonably + def test_suggestions_observability_print_only_python(): # Pasting code with print() in Java should NOT trigger the Observability suggestion - r_java = client.post("/suggestions/", json={"code": 'print("hello");', "language": "java"}) + r_java = client.post( + "/suggestions/", json={"code": 'print("hello");', "language": "java"} + ) assert r_java.status_code == 200 s_java = [s["category"] for s in r_java.json()["suggestions"]] assert "Observability" not in s_java # Pasting code with print() in Python SHOULD trigger the Observability suggestion - r_py = client.post("/suggestions/", json={"code": 'print("hello")', "language": "python"}) + r_py = client.post( + "/suggestions/", json={"code": 'print("hello")', "language": "python"} + ) assert r_py.status_code == 200 s_py = [s["category"] for s in r_py.json()["suggestions"]] assert "Observability" in s_py @@ -731,7 +738,9 @@ def test_get_stream_done_event_present(): def test_get_stream_with_language_hint(): - r = client.get("/analyze/stream", params={"code": JS_CODE, "language": "javascript"}) + r = client.get( + "/analyze/stream", params={"code": JS_CODE, "language": "javascript"} + ) assert r.status_code == 200 events = _parse_sse_events(r.text) exp = next(e["data"] for e in events if e["type"] == "explanation") diff --git a/backend/tests/test_file_upload.py b/backend/tests/test_file_upload.py index 9019b958..b3b33603 100644 --- a/backend/tests/test_file_upload.py +++ b/backend/tests/test_file_upload.py @@ -1,185 +1,151 @@ -from io import BytesIO - -import pytest -from fastapi.testclient import TestClient - -from app.main import app - -client = TestClient(app) - - -# ----------------------------- -# Allowed Files -# ----------------------------- - -ALLOWED_FILES = [ - ("test.py", b"print('hello')", "text/x-python"), - ("test.js", b"console.log('hello')", "application/javascript"), - ("test.ts", b"const x: number = 10;", "application/typescript"), - ("test.java", b"class Main {}", "text/x-java-source"), - ("test.cpp", b"#include ", "text/x-c++src"), - ("test.txt", b"hello world", "text/plain"), -] - - -# ----------------------------- -# Blocked Files -# ----------------------------- - -BLOCKED_FILES = [ - ("virus.exe", b"malware"), - ("script.bat", b"echo hacked"), - ("shell.sh", b"rm -rf /"), - ("powershell.ps1", b"Write-Host hacked"), - ("payload.dll", b"binarydata"), - ("installer.msi", b"installer"), -] - - -# ========================================================= -# TEST VALID FILES -# ========================================================= - -@pytest.mark.parametrize( - "filename,content,mime_type", - ALLOWED_FILES -) -def test_upload_allowed_files( - filename, - content, - mime_type -): - - response = client.post( - "/upload/validate", - files={ - "file": ( - filename, - BytesIO(content), - mime_type - ) - } - ) - - assert response.status_code == 200 - - data = response.json() - - assert data["success"] is True - assert data["filename"] == filename - - -# ========================================================= -# TEST BLOCKED EXTENSIONS -# ========================================================= - -@pytest.mark.parametrize( - "filename,content", - BLOCKED_FILES -) -def test_upload_blocked_files( - filename, - content -): - - response = client.post( - "/upload/validate", - files={ - "file": ( - filename, - BytesIO(content), - "application/octet-stream" - ) - } - ) - - assert response.status_code == 415 - data = response.json() - assert "Executable files are not allowed" in data["detail"] - assert data["error"] == "unsupported_file_type" - - - -# ========================================================= -# TEST INVALID MIME TYPE -# ========================================================= - -def test_invalid_mime_type(): - - response = client.post( - "/upload/validate", - files={ - "file": ( - "test.py", - BytesIO(b"%PDF-1.4 fake pdf content"), - "application/pdf" - ) - } - ) - print(response.json()) - - assert response.status_code == 415 - data = response.json() - assert "Invalid MIME type" in data["detail"] - assert data["error"] == "unsupported_file_type" - - - -# ========================================================= -# TEST DOUBLE EXTENSION -# ========================================================= - -def test_double_extension(): - - response = client.post( - "/upload/validate", - files={ - "file": ( - "virus.exe.py", - BytesIO(b"print('infected')"), - "text/x-python" - ) - } - ) - - assert response.status_code == 415 - data = response.json() - assert "Executable files are not allowed" in data["detail"] - assert data["error"] == "unsupported_file_type" - - - -# ========================================================= -# TEST NO FILE -# ========================================================= - -def test_no_file_uploaded(): - - response = client.post("/upload/validate") - - assert response.status_code in [400, 422] - assert response.json()["error"] in ("bad_request", "validation_error") - - - -# ========================================================= -# TEST LARGE FILE -# ========================================================= - -def test_large_file(): - - large_content = b"a" * (6 * 1024 * 1024) - - response = client.post( - "/upload/validate", - files={ - "file": ( - "large.txt", - BytesIO(large_content), - "text/plain" - ) - } - ) - - assert response.status_code == 413 - assert response.json()["error"] == "payload_too_large" \ No newline at end of file +from io import BytesIO + +import pytest +from fastapi.testclient import TestClient + +from app.main import app + +client = TestClient(app) + + +# ----------------------------- +# Allowed Files +# ----------------------------- + +ALLOWED_FILES = [ + ("test.py", b"print('hello')", "text/x-python"), + ("test.js", b"console.log('hello')", "application/javascript"), + ("test.ts", b"const x: number = 10;", "application/typescript"), + ("test.java", b"class Main {}", "text/x-java-source"), + ("test.cpp", b"#include ", "text/x-c++src"), + ("test.txt", b"hello world", "text/plain"), +] + + +# ----------------------------- +# Blocked Files +# ----------------------------- + +BLOCKED_FILES = [ + ("virus.exe", b"malware"), + ("script.bat", b"echo hacked"), + ("shell.sh", b"rm -rf /"), + ("powershell.ps1", b"Write-Host hacked"), + ("payload.dll", b"binarydata"), + ("installer.msi", b"installer"), +] + + +# ========================================================= +# TEST VALID FILES +# ========================================================= + + +@pytest.mark.parametrize("filename,content,mime_type", ALLOWED_FILES) +def test_upload_allowed_files(filename, content, mime_type): + + response = client.post( + "/upload/validate", files={"file": (filename, BytesIO(content), mime_type)} + ) + + assert response.status_code == 200 + + data = response.json() + + assert data["success"] is True + assert data["filename"] == filename + + +# ========================================================= +# TEST BLOCKED EXTENSIONS +# ========================================================= + + +@pytest.mark.parametrize("filename,content", BLOCKED_FILES) +def test_upload_blocked_files(filename, content): + + response = client.post( + "/upload/validate", + files={"file": (filename, BytesIO(content), "application/octet-stream")}, + ) + + assert response.status_code == 415 + data = response.json() + assert "Executable files are not allowed" in data["detail"] + assert data["error"] == "unsupported_file_type" + + +# ========================================================= +# TEST INVALID MIME TYPE +# ========================================================= + + +def test_invalid_mime_type(): + + response = client.post( + "/upload/validate", + files={ + "file": ( + "test.py", + BytesIO(b"%PDF-1.4 fake pdf content"), + "application/pdf", + ) + }, + ) + print(response.json()) + + assert response.status_code == 415 + data = response.json() + assert "Invalid MIME type" in data["detail"] + assert data["error"] == "unsupported_file_type" + + +# ========================================================= +# TEST DOUBLE EXTENSION +# ========================================================= + + +def test_double_extension(): + + response = client.post( + "/upload/validate", + files={ + "file": ("virus.exe.py", BytesIO(b"print('infected')"), "text/x-python") + }, + ) + + assert response.status_code == 415 + data = response.json() + assert "Executable files are not allowed" in data["detail"] + assert data["error"] == "unsupported_file_type" + + +# ========================================================= +# TEST NO FILE +# ========================================================= + + +def test_no_file_uploaded(): + + response = client.post("/upload/validate") + + assert response.status_code in [400, 422] + assert response.json()["error"] in ("bad_request", "validation_error") + + +# ========================================================= +# TEST LARGE FILE +# ========================================================= + + +def test_large_file(): + + large_content = b"a" * (6 * 1024 * 1024) + + response = client.post( + "/upload/validate", + files={"file": ("large.txt", BytesIO(large_content), "text/plain")}, + ) + + assert response.status_code == 413 + assert response.json()["error"] == "payload_too_large" diff --git a/backend/tests/test_history.py b/backend/tests/test_history.py index 21851f6a..4aab91bd 100644 --- a/backend/tests/test_history.py +++ b/backend/tests/test_history.py @@ -1,6 +1,7 @@ """ Tests for the /history/ endpoints. """ + import sys import os import tempfile @@ -22,12 +23,15 @@ def test_save_history(): - r = client.post("/history/", json={ - "code": "print('hello')", - "language": "Python", - "score": 85, - "issue_count": 1, - }) + r = client.post( + "/history/", + json={ + "code": "print('hello')", + "language": "Python", + "score": 85, + "issue_count": 1, + }, + ) assert r.status_code == 201 d = r.json() assert d["status"] == "saved" @@ -35,7 +39,10 @@ def test_save_history(): def test_get_history(): - client.post("/history/", json={"code": "x = 1", "language": "Python", "score": 90, "issue_count": 0}) + client.post( + "/history/", + json={"code": "x = 1", "language": "Python", "score": 90, "issue_count": 0}, + ) r = client.get("/history/") assert r.status_code == 200 assert isinstance(r.json(), list) @@ -49,7 +56,10 @@ def test_get_history_pagination(): def test_search_history(): - client.post("/history/", json={"code": "def my_unique_function(): pass", "language": "Python"}) + client.post( + "/history/", + json={"code": "def my_unique_function(): pass", "language": "Python"}, + ) r = client.get("/history/search?q=my_unique_function") assert r.status_code == 200 results = r.json() @@ -71,9 +81,16 @@ def test_delete_nonexistent(): assert "History entry not found" in r.json()["detail"] - def test_history_entry_fields(): - client.post("/history/", json={"code": "let x = 1;", "language": "JavaScript", "score": 70, "issue_count": 2}) + client.post( + "/history/", + json={ + "code": "let x = 1;", + "language": "JavaScript", + "score": 70, + "issue_count": 2, + }, + ) r = client.get("/history/") assert r.status_code == 200 entry = r.json()[0] diff --git a/backend/tests/test_zip_dos.py b/backend/tests/test_zip_dos.py index 10635051..dda37da3 100644 --- a/backend/tests/test_zip_dos.py +++ b/backend/tests/test_zip_dos.py @@ -11,12 +11,15 @@ client = TestClient(app) + def test_analyze_zip_too_large_via_header(): # Simulate a large file via Content-Length header data = b"fake zip content" files = {"file": ("test.zip", data, "application/zip")} - - response = client.post("/analyze/zip/", files=files, headers={"Content-Length": str(15 * 1024 * 1024)}) + + response = client.post( + "/analyze/zip/", files=files, headers={"Content-Length": str(15 * 1024 * 1024)} + ) assert response.status_code == 413 assert "ZIP file too large" in response.json()["detail"] assert response.json()["error"] == "payload_too_large" @@ -27,9 +30,11 @@ def test_analyze_zip_too_large_via_stream(): # We create a 11MB file to trigger the streaming limit large_data = b"0" * (11 * 1024 * 1024) files = {"file": ("test.zip", large_data, "application/zip")} - + # Provide a small Content-Length header to bypass the early check and enter the streaming check - response = client.post("/analyze/zip/", files=files, headers={"Content-Length": "100"}) + response = client.post( + "/analyze/zip/", files=files, headers={"Content-Length": "100"} + ) assert response.status_code == 413 assert "ZIP file exceeds size limit during upload" in response.json()["detail"] assert response.json()["error"] == "payload_too_large" @@ -40,11 +45,11 @@ def test_analyze_zip_valid(): zip_buffer = io.BytesIO() with zipfile.ZipFile(zip_buffer, "a", zipfile.ZIP_DEFLATED, False) as zip_file: zip_file.writestr("hello.py", "print('hello')") - + zip_buffer.seek(0) files = {"file": ("test.zip", zip_buffer, "application/zip")} response = client.post("/analyze/zip/", files=files) - + assert response.status_code == 200 assert response.json()["file_count"] == 1 assert response.json()["files"][0]["filename"] == "hello.py"