Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 38 additions & 4 deletions finbot/apps/ctf/routes/profile.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
"""Profile API Routes - Social features for authenticated users"""

import hashlib
import ipaddress
import logging
import socket
from urllib.parse import urlparse

from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, Field, HttpUrl
Expand All @@ -21,6 +24,30 @@
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v1/profile", tags=["profile"])

BLOCKED_NETWORKS = [
ipaddress.ip_network("127.0.0.0/8"), # loopback
ipaddress.ip_network("169.254.0.0/16"), # link-local (AWS metadata)
ipaddress.ip_network("10.0.0.0/8"), # RFC-1918 private
ipaddress.ip_network("172.16.0.0/12"), # RFC-1918 private
ipaddress.ip_network("192.168.0.0/16"), # RFC-1918 private
ipaddress.ip_network("::1/128"), # IPv6 loopback
ipaddress.ip_network("fc00::/7"), # IPv6 ULA
ipaddress.ip_network("fe80::/10"), # IPv6 link-local
]

def is_ssrf_safe(url: str) -> bool:
parsed = urlparse(url)
if parsed.scheme != "https":
return False
try:
host = parsed.hostname
if not host:
return False
ip = ipaddress.ip_address(socket.gethostbyname(host))
return not any(ip in net for net in BLOCKED_NETWORKS)
except Exception:
return False # fail closed


# =============================================================================
# Level System
Expand Down Expand Up @@ -339,10 +366,17 @@ async def update_profile(
if error:
raise HTTPException(status_code=400, detail=error)

# Validate avatar_url if switching to url type
if request.avatar_type == "url" and request.avatar_url:
if not request.avatar_url.startswith("https://"):
raise HTTPException(status_code=400, detail="Avatar URL must use HTTPS")
# Fetch current profile to validate the state transition securely
profile = profile_repo.get_by_user_id(session_context.user_id)
if not profile:
raise HTTPException(status_code=404, detail="Profile not found")

# Validate avatar_url if switching to/retaining url type
effective_avatar_type = request.avatar_type if request.avatar_type is not None else profile.avatar_type
effective_avatar_url = request.avatar_url if request.avatar_url is not None else profile.avatar_url
if effective_avatar_type == "url" and effective_avatar_url:
if not is_ssrf_safe(effective_avatar_url):
raise HTTPException(status_code=400, detail="Invalid Avatar URL: Only public HTTPS URLs are allowed")

# Update other fields
profile = profile_repo.update_profile(
Expand Down
2 changes: 1 addition & 1 deletion finbot/apps/ctf/routes/share.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ async def _fetch_avatar_b64(url: str) -> str:
Returns empty string on any failure (timeout, bad content type, too large).
"""
try:
async with httpx.AsyncClient(timeout=5.0, follow_redirects=True) as client:
async with httpx.AsyncClient(timeout=5.0, follow_redirects=False) as client:
resp = await client.get(url)
if resp.status_code != 200:
return ""
Expand Down
212 changes: 212 additions & 0 deletions tests/unit/apps/ctf/test_profile_security.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
"""Unit tests for CTF profile security and validation checks"""

import pytest
from fastapi import HTTPException
from unittest.mock import MagicMock

from finbot.apps.ctf.routes.profile import update_profile, ProfileUpdateRequest
from finbot.core.auth.session import SessionContext


@pytest.fixture
def mock_session_context():
"""Mock session context for an authenticated user"""
context = MagicMock(spec=SessionContext)
context.user_id = "test_user_id"
return context


@pytest.fixture
def mock_db():
"""Mock database session"""
return MagicMock()


@pytest.fixture
def mock_profile_repo():
"""Mock profile repository"""
return MagicMock()


@pytest.fixture
def patch_dependencies(monkeypatch, mock_profile_repo):
"""Patch the UserProfileRepository, validate_username, and _update_social_links"""
monkeypatch.setattr(
"finbot.apps.ctf.routes.profile.UserProfileRepository",
lambda db, context: mock_profile_repo,
)
monkeypatch.setattr(
"finbot.apps.ctf.routes.profile.validate_username",
lambda username: (True, None),
)
monkeypatch.setattr(
"finbot.apps.ctf.routes.profile._update_social_links",
lambda profile, request, db: None,
)
monkeypatch.setattr(
"finbot.apps.ctf.routes.profile._build_profile_response",
lambda profile, user: MagicMock(),
)


@pytest.mark.asyncio
async def test_update_profile_avatar_url_valid_https(
mock_session_context, mock_db, mock_profile_repo, patch_dependencies
):
"""Test that a valid HTTPS URL is accepted during profile update"""
# Arrange
existing_profile = MagicMock()
existing_profile.avatar_type = "url"
existing_profile.avatar_url = "https://example.com/old.png"
existing_profile.user_id = "test_user_id"
mock_profile_repo.get_by_user_id.return_value = existing_profile

updated_profile = MagicMock()
mock_profile_repo.update_profile.return_value = updated_profile

request = ProfileUpdateRequest(
avatar_type="url",
avatar_url="https://example.com/new.png"
)

# Act
response = await update_profile(
request=request,
session_context=mock_session_context,
db=mock_db
)

# Assert
assert response is not None
mock_profile_repo.update_profile.assert_called_once()


@pytest.mark.asyncio
async def test_update_profile_avatar_url_insecure_blocked(
mock_session_context, mock_db, mock_profile_repo, patch_dependencies
):
"""Test that an insecure HTTP URL is blocked when avatar_type is explicitly 'url'"""
# Arrange
existing_profile = MagicMock()
existing_profile.avatar_type = "url"
existing_profile.avatar_url = "https://example.com/old.png"
existing_profile.user_id = "test_user_id"
mock_profile_repo.get_by_user_id.return_value = existing_profile

request = ProfileUpdateRequest(
avatar_type="url",
avatar_url="http://127.0.0.1/malicious.png"
)

# Act & Assert
with pytest.raises(HTTPException) as exc_info:
await update_profile(
request=request,
session_context=mock_session_context,
db=mock_db
)

assert exc_info.value.status_code == 400
assert "Invalid Avatar URL: Only public HTTPS URLs are allowed" in exc_info.value.detail
mock_profile_repo.update_profile.assert_not_called()


@pytest.mark.asyncio
async def test_update_profile_avatar_url_bypass_attempt_blocked(
mock_session_context, mock_db, mock_profile_repo, patch_dependencies
):
"""Test that omitting avatar_type still blocks insecure URLs if the database type is 'url'"""
# Arrange
existing_profile = MagicMock()
existing_profile.avatar_type = "url"
existing_profile.avatar_url = "https://example.com/old.png"
existing_profile.user_id = "test_user_id"
mock_profile_repo.get_by_user_id.return_value = existing_profile

# request omits 'avatar_type', but tries to inject insecure 'avatar_url'
request = ProfileUpdateRequest(
avatar_url="http://169.254.169.254/latest/meta-data"
)

# Act & Assert
with pytest.raises(HTTPException) as exc_info:
await update_profile(
request=request,
session_context=mock_session_context,
db=mock_db
)

assert exc_info.value.status_code == 400
assert "Invalid Avatar URL: Only public HTTPS URLs are allowed" in exc_info.value.detail
mock_profile_repo.update_profile.assert_not_called()


@pytest.mark.asyncio
async def test_update_profile_avatar_url_emoji_retains_insecure_url_ignored(
mock_session_context, mock_db, mock_profile_repo, patch_dependencies
):
"""Test that changing avatar_type to emoji clears/ignores insecure URL checks"""
# Arrange
existing_profile = MagicMock()
existing_profile.avatar_type = "url"
existing_profile.avatar_url = "https://example.com/old.png"
existing_profile.user_id = "test_user_id"
mock_profile_repo.get_by_user_id.return_value = existing_profile

updated_profile = MagicMock()
mock_profile_repo.update_profile.return_value = updated_profile

# User changes type to emoji, URL should no longer trigger validation error even if old url was insecure
request = ProfileUpdateRequest(
avatar_type="emoji",
avatar_emoji="🦊"
)

# Act
response = await update_profile(
request=request,
session_context=mock_session_context,
db=mock_db
)

# Assert
assert response is not None
mock_profile_repo.update_profile.assert_called_once_with(
user_id="test_user_id",
bio=None,
avatar_emoji="🦊",
avatar_type="emoji",
avatar_url=None,
is_public=None,
show_activity=None
)


@pytest.mark.asyncio
async def test_update_profile_avatar_url_ssrf_blocked(
mock_session_context, mock_db, mock_profile_repo, patch_dependencies
):
"""Test that a private IP is blocked even if it uses HTTPS"""
# Arrange
existing_profile = MagicMock()
existing_profile.avatar_type = "url"
existing_profile.avatar_url = "https://example.com/old.png"
existing_profile.user_id = "test_user_id"
mock_profile_repo.get_by_user_id.return_value = existing_profile

request = ProfileUpdateRequest(
avatar_type="url",
avatar_url="https://127.0.0.1/malicious.png"
)

# Act & Assert
with pytest.raises(HTTPException) as exc_info:
await update_profile(
request=request,
session_context=mock_session_context,
db=mock_db
)

assert exc_info.value.status_code == 400
assert "Invalid Avatar URL: Only public HTTPS URLs are allowed" in exc_info.value.detail
mock_profile_repo.update_profile.assert_not_called()