Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
475 changes: 475 additions & 0 deletions docs/openapi.yaml

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions env.d/development/common.dist
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,9 @@ ROOM_TELEPHONY_ENABLED=True

FRONTEND_USE_FRENCH_GOV_FOOTER=False
FRONTEND_USE_PROCONNECT_BUTTON=False

# External Applications
EXTERNAL_API_ENABLED=True
APPLICATION_JWT_AUDIENCE=http://localhost:8071/external-api/v1.0/
APPLICATION_JWT_SECRET_KEY=devKey
APPLICATION_BASE_URL=http://localhost:3000
64 changes: 64 additions & 0 deletions src/backend/core/admin.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Admin classes and registrations for core app."""

from django import forms
from django.contrib import admin
from django.contrib.auth import admin as auth_admin
from django.utils.translation import gettext_lazy as _
Expand Down Expand Up @@ -150,3 +151,66 @@ def get_owner(self, obj):
return _("Multiple owners")

return str(owners[0].user)


class ApplicationDomainInline(admin.TabularInline):
"""Inline admin for managing allowed domains per application."""

model = models.ApplicationDomain
extra = 0


class ApplicationAdminForm(forms.ModelForm):
"""Custom form for Application admin with multi-select scopes."""

scopes = forms.MultipleChoiceField(
choices=models.ApplicationScope.choices,
widget=forms.CheckboxSelectMultiple,
required=False,
)

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if self.instance.pk and self.instance.scopes:
self.fields["scopes"].initial = self.instance.scopes


@admin.register(models.Application)
class ApplicationAdmin(admin.ModelAdmin):
"""Admin interface for managing applications and their permissions."""

form = ApplicationAdminForm

list_display = ("id", "name", "client_id", "get_scopes_display")
fields = [
"name",
"id",
"created_at",
"updated_at",
"scopes",
"client_id",
"client_secret",
]
readonly_fields = ["id", "created_at", "updated_at"]
inlines = [ApplicationDomainInline]

def get_readonly_fields(self, request, obj=None):
"""Make client_id and client_secret readonly after creation."""
if obj: # Editing existing object
return self.readonly_fields + ["client_id", "client_secret"]
return self.readonly_fields

def get_fields(self, request, obj=None):
"""Hide client_secret after creation."""
fields = super().get_fields(request, obj)
if obj:
return [f for f in fields if f != "client_secret"]
return fields

def get_scopes_display(self, obj):
"""Display scopes in list view."""
if obj.scopes:
return ", ".join(obj.scopes)
return _("No scopes")

get_scopes_display.short_description = _("Scopes")
1 change: 1 addition & 0 deletions src/backend/core/external_api/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Meet core external API endpoints"""
109 changes: 109 additions & 0 deletions src/backend/core/external_api/authentication.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
"""Authentication Backends for external application to the Meet core app."""

import logging

from django.conf import settings
from django.contrib.auth import get_user_model

import jwt
from rest_framework import authentication, exceptions

User = get_user_model()
logger = logging.getLogger(__name__)


class ApplicationJWTAuthentication(authentication.BaseAuthentication):
"""JWT authentication for application-delegated API access.

Validates JWT tokens issued to applications that are acting on behalf
of users. Tokens must include user_id, client_id, and delegation flag.
"""

def authenticate(self, request):
"""Extract and validate JWT from Authorization header.

Returns:
Tuple of (user, payload) if authentication successful, None otherwise
"""
auth_header = authentication.get_authorization_header(request).split()

if not auth_header or auth_header[0].lower() != b"bearer":
return None

if len(auth_header) != 2:
logger.warning("Invalid token header format")
raise exceptions.AuthenticationFailed("Invalid token header.")

try:
token = auth_header[1].decode("utf-8")
except UnicodeError as e:
logger.warning("Token decode error: %s", e)
raise exceptions.AuthenticationFailed("Invalid token encoding.") from e

return self.authenticate_credentials(token)

def authenticate_credentials(self, token):
"""Validate JWT token and return authenticated user.

Args:
token: JWT token string

Returns:
Tuple of (user, payload)

Raises:
AuthenticationFailed: If token is invalid, expired, or user not found
"""
# Decode and validate JWT
try:
payload = jwt.decode(
token,
settings.APPLICATION_JWT_SECRET_KEY,
algorithms=[settings.APPLICATION_JWT_ALG],
issuer=settings.APPLICATION_JWT_ISSUER,
audience=settings.APPLICATION_JWT_AUDIENCE,
)
Comment on lines +59 to +65
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Require standard JWT claims to harden validation.

Enforce presence of exp/iat to improve security posture.

             payload = jwt.decode(
                 token,
                 settings.APPLICATION_JWT_SECRET_KEY,
                 algorithms=[settings.APPLICATION_JWT_ALG],
                 issuer=settings.APPLICATION_JWT_ISSUER,
                 audience=settings.APPLICATION_JWT_AUDIENCE,
+                options={"require": ["exp", "iat"]},
             )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
payload = jwt.decode(
token,
settings.APPLICATION_JWT_SECRET_KEY,
algorithms=[settings.APPLICATION_JWT_ALG],
issuer=settings.APPLICATION_JWT_ISSUER,
audience=settings.APPLICATION_JWT_AUDIENCE,
)
payload = jwt.decode(
token,
settings.APPLICATION_JWT_SECRET_KEY,
algorithms=[settings.APPLICATION_JWT_ALG],
issuer=settings.APPLICATION_JWT_ISSUER,
audience=settings.APPLICATION_JWT_AUDIENCE,
options={"require": ["exp", "iat"]},
)
🤖 Prompt for AI Agents
In src/backend/core/external_api/authentication.py around lines 59 to 65, the
JWT decode call currently validates signature, issuer and audience but does not
enforce required standard claims; update the jwt.decode invocation to require
the presence of exp and iat (e.g., pass the appropriate options/require
parameter) so tokens missing those claims are rejected, and ensure any decode
exceptions are handled as before.

except jwt.ExpiredSignatureError as e:
logger.warning("Token expired")
raise exceptions.AuthenticationFailed("Token expired.") from e
except jwt.InvalidIssuerError as e:
logger.warning("Invalid JWT issuer: %s", e)
raise exceptions.AuthenticationFailed("Invalid token.") from e
except jwt.InvalidAudienceError as e:
logger.warning("Invalid JWT audience: %s", e)
raise exceptions.AuthenticationFailed("Invalid token.") from e
except jwt.InvalidTokenError as e:
logger.warning("Invalid JWT token: %s", e)
raise exceptions.AuthenticationFailed("Invalid token.") from e

user_id = payload.get("user_id")
client_id = payload.get("client_id")
is_delegated = payload.get("delegated", False)

if not user_id:
logger.warning("Missing 'user_id' in JWT payload")
raise exceptions.AuthenticationFailed("Invalid token claims.")

if not client_id:
logger.warning("Missing 'client_id' in JWT payload")
raise exceptions.AuthenticationFailed("Invalid token claims.")

if not is_delegated:
logger.warning("Token is not marked as delegated")
raise exceptions.AuthenticationFailed("Invalid token type.")

try:
user = User.objects.get(id=user_id)
except User.DoesNotExist as e:
logger.warning("User not found: %s", user_id)
raise exceptions.AuthenticationFailed("User not found.") from e

if not user.is_active:
logger.warning("Inactive user attempted authentication: %s", user_id)
raise exceptions.AuthenticationFailed("User account is disabled.")

return (user, payload)

def authenticate_header(self, request):
"""Return authentication scheme for WWW-Authenticate header."""
return "Bearer"
76 changes: 76 additions & 0 deletions src/backend/core/external_api/permissions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
"""Permission handlers for application-delegated API access."""

import logging
from typing import Dict

from rest_framework import exceptions, permissions

from .. import models

logger = logging.getLogger(__name__)


class BaseScopePermission(permissions.BasePermission):
"""Base class for scope-based permission checking.

Subclasses must define `scope_map` attribute mapping actions to required scopes.
"""

scope_map: Dict[str, str] = {}

def has_permission(self, request, view):
"""Check if the JWT token contains the required scope for this action.

Args:
request: DRF request object with authenticated user
view: ViewSet instance

Returns:
bool: True if permission granted

Raises:
PermissionDenied: If required scope is missing from token
"""
# Get the current action (e.g., 'list', 'create')
action = getattr(view, "action", None)
if not action:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you just add a comment to say why we need to manage this case ? (automatic scheme documentation if I remember correctly?)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have updated this check to return false, if I cannot determine the action being made.

raise exceptions.PermissionDenied(
"Insufficient permissions. Unknown action."
)

required_scope = self.scope_map.get(action)
if not required_scope:
# Action not in scope_map, deny by default
raise exceptions.PermissionDenied(
f"Insufficient permissions. Required scope: {required_scope}"
)

token_payload = request.auth
token_scopes = token_payload.get("scope")

if not token_scopes:
raise exceptions.PermissionDenied("Insufficient permissions.")

# Ensure scopes is a list (handle both list and space-separated string)
if isinstance(token_scopes, str):
token_scopes = token_scopes.split()

if required_scope not in token_scopes:
raise exceptions.PermissionDenied(
f"Insufficient permissions. Required scope: {required_scope}"
)

return True


class HasRequiredRoomScope(BaseScopePermission):
"""Permission class for Room-related operations."""

scope_map = {
"list": models.ApplicationScope.ROOMS_LIST,
"retrieve": models.ApplicationScope.ROOMS_RETRIEVE,
"create": models.ApplicationScope.ROOMS_CREATE,
"update": models.ApplicationScope.ROOMS_UPDATE,
"partial_update": models.ApplicationScope.ROOMS_UPDATE,
"destroy": models.ApplicationScope.ROOMS_DELETE,
}
73 changes: 73 additions & 0 deletions src/backend/core/external_api/serializers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
"""Serializers for the external API of the Meet core app."""

# pylint: disable=abstract-method

from django.conf import settings

from rest_framework import serializers

from core import models, utils
from core.api.serializers import BaseValidationOnlySerializer

OAUTH2_GRANT_TYPE_CLIENT_CREDENTIALS = "client_credentials"


class ApplicationJwtSerializer(BaseValidationOnlySerializer):
"""Validate OAuth2 JWT token request data."""

client_id = serializers.CharField(write_only=True)
client_secret = serializers.CharField(write_only=True)
grant_type = serializers.ChoiceField(choices=[OAUTH2_GRANT_TYPE_CLIENT_CREDENTIALS])
scope = serializers.CharField(write_only=True)


class RoomSerializer(serializers.ModelSerializer):
"""External API serializer for room data exposed to applications.

Provides limited, safe room information for third-party integrations:
- Secure defaults for room creation (trusted access level)
- Computed fields (url, telephony) for external consumption
- Filtered data appropriate for delegation scenarios
- Tracks creation source for auditing

Intentionally exposes minimal information to external applications,
following the principle of least privilege.
"""

class Meta:
model = models.Room
fields = ["id", "name", "slug", "pin_code", "access_level"]
read_only_fields = ["id", "name", "slug", "pin_code", "access_level"]

def to_representation(self, instance):
"""Enrich response with application-specific computed fields."""
output = super().to_representation(instance)
request = self.context.get("request")
pin_code = output.pop("pin_code", None)

if not request:
return output

# Add room URL for direct access
if settings.APPLICATION_BASE_URL:
output["url"] = f"{settings.APPLICATION_BASE_URL}/{instance.slug}"

# Add telephony information if enabled
if settings.ROOM_TELEPHONY_ENABLED:
output["telephony"] = {
"enabled": True,
"phone_number": settings.ROOM_TELEPHONY_PHONE_NUMBER,
"pin_code": pin_code,
"default_country": settings.ROOM_TELEPHONY_DEFAULT_COUNTRY,
}

return output

def create(self, validated_data):
"""Create room with secure defaults for application delegation."""

# Set secure defaults
validated_data["name"] = utils.generate_room_slug()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the slug is already existing?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have set this case aside for now. Why? With the current slug pattern, there are approximately 10^14 possible combinations.

To put this in perspective: you would need to generate roughly 1.7 million slugs to have just a 1% chance of encountering at least one collision. For a 50% chance, the number rises to around 14 million slugs.

It's relevant, however I think it can wait for later.

validated_data["access_level"] = models.RoomAccessLevel.TRUSTED

return super().create(validated_data)
Loading
Loading