diff --git a/Makefile b/Makefile index 73563d5aa..cfb553e5e 100644 --- a/Makefile +++ b/Makefile @@ -108,6 +108,7 @@ bootstrap: \ build: ## build the project containers @$(MAKE) build-backend @$(MAKE) build-frontend + @$(MAKE) build-agents .PHONY: build build-backend: ## build the app-dev container @@ -119,6 +120,10 @@ build-frontend: ## build the frontend container @$(COMPOSE) build frontend .PHONY: build-frontend +build-agents: ## build the agents image(s) + @$(COMPOSE) build metadata-agent +.PHONY: build-agents + down: ## stop and remove containers, networks, images, and volumes @$(COMPOSE) down .PHONY: down @@ -138,10 +143,15 @@ run-summary: ## start only the summary application and all needed services @$(COMPOSE) up --force-recreate -d celery-summary-summarize .PHONY: run-summary +run-agents: ## start agents + @$(COMPOSE) up --force-recreate -d metadata-agent +.PHONY: run-agents + run: ## start the wsgi (production) and development server @$(MAKE) run-backend @$(MAKE) run-summary + @$(MAKE) run-agents @$(COMPOSE) up --force-recreate -d frontend .PHONY: run diff --git a/compose.yml b/compose.yml index c734bf68b..57750c90d 100644 --- a/compose.yml +++ b/compose.yml @@ -282,3 +282,39 @@ services: watch: - action: rebuild path: ./src/summary + + metadata-agent: + build: + context: ./src/agents + dockerfile: Dockerfile + working_dir: /app + command: ["python", "metadata_extractor.py", "start"] + environment: + LIVEKIT_URL: "ws://livekit:7880" + LIVEKIT_API_KEY: "devkey" + LIVEKIT_API_SECRET: "secret" + ROOM_METADATA_AGENT_NAME: "metadata-extractor" + AWS_S3_ENDPOINT_URL: "minio:9000" + AWS_S3_ACCESS_KEY_ID: "meet" + AWS_S3_SECRET_ACCESS_KEY: "password" + AWS_S3_SECURE_ACCESS: "false" + PYTHONUNBUFFERED: "1" + AWS_STORAGE_BUCKET_NAME: "meet-media-storage" + AWS_S3_REGION_NAME: "local" + depends_on: + - livekit + - minio + + transcriber-agent: + build: + context: ./src/agents + dockerfile: Dockerfile + working_dir: /app + command: ["python", "multi-user-transcriber.py", "start"] + environment: + LIVEKIT_URL: "ws://livekit:7880" + LIVEKIT_API_KEY: "devkey" + LIVEKIT_API_SECRET: "secret" + PYTHONUNBUFFERED: "1" + depends_on: + - livekit \ No newline at end of file diff --git a/docker/livekit/config/livekit-server.yaml b/docker/livekit/config/livekit-server.yaml index 3de4e707d..719fe409d 100644 --- a/docker/livekit/config/livekit-server.yaml +++ b/docker/livekit/config/livekit-server.yaml @@ -1,5 +1,11 @@ log_level: debug redis: address: redis:6379 + keys: devkey: secret + +webhook: + api_key: devkey + urls: + - "http://app-dev:8000/api/v1.0/rooms/webhooks-livekit/" \ No newline at end of file diff --git a/src/agents/Dockerfile b/src/agents/Dockerfile index dcd2ebf24..da29f8a9e 100644 --- a/src/agents/Dockerfile +++ b/src/agents/Dockerfile @@ -19,6 +19,4 @@ USER ${DOCKER_USER} # Un-privileged user running the application COPY --from=builder /install /usr/local -COPY . . - -CMD ["python", "multi-user-transcriber.py", "start"] +COPY . .
\ No newline at end of file diff --git a/src/agents/metadata_extractor.py b/src/agents/metadata_extractor.py new file mode 100644 index 000000000..688d67e08 --- /dev/null +++ b/src/agents/metadata_extractor.py @@ -0,0 +1,395 @@ +"""Metadata agent that extracts metadata from an active room.""" + +import asyncio +import json +import logging +import os +from dataclasses import asdict, dataclass +from datetime import datetime, timezone +from io import BytesIO +from typing import List, Optional + +from dotenv import find_dotenv, load_dotenv +from livekit import api, rtc +from livekit.agents import ( + Agent, + AgentSession, + AutoSubscribe, + JobContext, + JobProcess, + JobRequest, + RoomInputOptions, + RoomIO, + RoomOutputOptions, + WorkerOptions, + WorkerPermissions, + cli, + utils, +) +from livekit.plugins import silero +from minio import Minio +from minio.error import S3Error + +load_dotenv() + +logger = logging.getLogger("metadata-extractor") + +AGENT_NAME = os.getenv("ROOM_METADATA_AGENT_NAME", "metadata-extractor") + + +@dataclass +class MetadataEvent: + """Timestamped event captured for a participant (speech, chat or presence).""" + + participant_id: str + type: str + timestamp: datetime + data: Optional[str] = None + + def serialize(self) -> dict: + """Return a JSON-serializable dictionary representation of the event.""" + data = asdict(self) + data["timestamp"] = self.timestamp.isoformat() + return data + + +class VADAgent(Agent): + """Agent that monitors voice activity for a specific participant.""" + + def __init__(self, participant_identity: str, events: List): + """Keep the monitored participant identity and the shared events list.""" + super().__init__( + instructions="not-needed", + ) + self.participant_identity = participant_identity + self.events = events + + async def on_enter(self) -> None: + """Initialize VAD monitoring for this participant.""" + + @self.session.on("user_state_changed") + def on_user_state(event): + timestamp = datetime.now(timezone.utc) + + if event.new_state == "speaking": + event = MetadataEvent( + participant_id=self.participant_identity, + type="speech_start", + timestamp=timestamp, + ) + self.events.append(event) + + elif event.old_state == "speaking": + event = MetadataEvent( + participant_id=self.participant_identity, + type="speech_end", + timestamp=timestamp, + ) + self.events.append(event) + + +class MetadataAgent: + """Monitor and manage real-time metadata extraction from meeting rooms. + + Oversees VAD (Voice Activity Detection) and participant metadata streams + to track and analyze real-time events, coordinating data collection across + participants for insights like speaking activity and engagement.
+ """ + + def __init__(self, ctx: JobContext, recording_id: str): + """Initialize metadata agent.""" + self.minio_client = Minio( + endpoint=os.getenv("AWS_S3_ENDPOINT_URL"), + access_key=os.getenv("AWS_S3_ACCESS_KEY_ID"), + secret_key=os.getenv("AWS_S3_SECRET_ACCESS_KEY"), + secure=os.getenv("AWS_S3_SECURE_ACCESS", "False").lower() == "true", + ) + + # todo - raise error if none + self.bucket_name = os.getenv("AWS_STORAGE_BUCKET_NAME") + logger.info("Using S3 bucket: %s", self.bucket_name) + dotenv_path = find_dotenv() + logger.info("Found .env file: %s", dotenv_path) + self.ctx = ctx + self._sessions: dict[str, AgentSession] = {} + self._tasks: set[asyncio.Task] = set() + + self.output_filename = ( + f"{os.getenv('AWS_S3_OUTPUT_FOLDER', 'recordings').strip('/')}/" + f"{json.loads(recording_id).get('recording_id')}-metadata.json" + ) + + # Storage for events + self.events = [] + self.participants = {} + + logger.info("MetadataAgent initialized") + + def start(self): + """Start listening for participant connection events.""" + self.ctx.room.on("participant_connected", self.on_participant_connected) + self.ctx.room.on("participant_disconnected", self.on_participant_disconnected) + self.ctx.room.on("participant_name_changed", self.on_participant_name_changed) + + self.ctx.room.register_text_stream_handler("lk.chat", self.handle_chat_stream) + + logger.info("Started listening for participant events") + + async def on_chat_message_received( + self, reader: rtc.TextStreamReader, participant_identity: str + ): + """Read an incoming chat message and record it as a metadata event.""" + full_text = await reader.read_all() + logger.info( + "Received chat message from %s: '%s'", participant_identity, full_text + ) + + self.events.append( + MetadataEvent( + participant_id=participant_identity, + type="chat_received", + timestamp=datetime.now(timezone.utc), + data=full_text, + ) + ) + + def handle_chat_stream(self, reader, participant_identity): + """Schedule asynchronous handling of an incoming chat text stream.""" + task = asyncio.create_task( + self.on_chat_message_received(reader, participant_identity) + ) + self._tasks.add(task) + task.add_done_callback(lambda _: self._tasks.remove(task)) + + def save(self): + """Serialize collected events and upload them to object storage.""" + logger.info("Persisting processed metadata output to object storage…") + + participants = [] + for k, v in self.participants.items(): + participants.append({"participantId": k, "name": v}) + + sorted_event = sorted(self.events, key=lambda e: e.timestamp) + + payload = { + "events": [event.serialize() for event in sorted_event], + "participants": participants, + } + + data = json.dumps(payload, indent=2).encode("utf-8") + stream = BytesIO(data) + + try: + self.minio_client.put_object( + self.bucket_name, + self.output_filename, + stream, + length=len(data), + content_type="application/json", + ) + logger.info( + "Uploaded speaker meeting metadata", + ) + except S3Error: + logger.exception( + "Failed to upload meeting metadata", + ) + + async def aclose(self): + """Close all sessions and cleanup resources.""" + logger.info("Closing all VAD monitoring sessions…") + + await utils.aio.cancel_and_wait(*self._tasks) + + await asyncio.gather( + *[self._close_session(session) for session in self._sessions.values()], + return_exceptions=True, + ) + + self.ctx.room.off("participant_connected", self.on_participant_connected) + self.ctx.room.off("participant_disconnected", self.on_participant_disconnected) + self.ctx.room.off("participant_name_changed", self.on_participant_name_changed) + + logger.info("All VAD sessions closed") + self.save() + + def on_participant_connected(self, participant:
rtc.RemoteParticipant): + """Handle new participant connection by starting VAD monitoring.""" + if participant.identity in self._sessions: + logger.debug("Session already exists for %s", participant.identity) + return + + self.events.append( + MetadataEvent( + participant_id=participant.identity, + type="participant_connected", + timestamp=datetime.now(timezone.utc), + ) + ) + + self.participants[participant.identity] = participant.name + + logger.info("New participant connected: %s", participant.identity) + task = asyncio.create_task(self._start_session(participant)) + self._tasks.add(task) + + def on_task_done(task: asyncio.Task): + try: + self._sessions[participant.identity] = task.result() + except Exception: + logger.exception("Failed to start session for %s", participant.identity) + finally: + self._tasks.discard(task) + + task.add_done_callback(on_task_done) + + def on_participant_disconnected(self, participant: rtc.RemoteParticipant): + """Handle participant disconnection by closing VAD monitoring.""" + self.events.append( + MetadataEvent( + participant_id=participant.identity, + type="participant_disconnected", + timestamp=datetime.now(timezone.utc), + ) + ) + + session = self._sessions.pop(participant.identity, None) + if session is None: + logger.debug("No session found for %s", participant.identity) + return + + logger.info("Participant disconnected: %s", participant.identity) + task = asyncio.create_task(self._close_session(session)) + self._tasks.add(task) + + def on_close_done(_): + self._tasks.discard(task) + logger.info( + "VAD session closed for %s (remaining sessions: %d)", + participant.identity, + len(self._sessions), + ) + + task.add_done_callback(on_close_done) + + def on_participant_name_changed(self, participant: rtc.RemoteParticipant): + """Wip.""" + logger.info("Participant's name changed: %s", participant.identity) + self.participants[participant.identity] = participant.name + + async def _start_session(self, participant: rtc.RemoteParticipant) -> AgentSession: + """Create and start VAD monitoring session for participant.""" + if participant.identity in self._sessions: + return self._sessions[participant.identity] + + # Create session with VAD only - no STT, LLM, or TTS + session = AgentSession( + vad=self.ctx.proc.userdata["vad"], + turn_detection="vad", + user_away_timeout=30.0, + ) + + # Set up room IO to receive audio from this specific participant + room_io = RoomIO( + agent_session=session, + room=self.ctx.room, + participant=participant, + input_options=RoomInputOptions( + audio_enabled=True, + text_enabled=False, + ), + output_options=RoomOutputOptions( + audio_enabled=False, + transcription_enabled=False, + ), + ) + + await room_io.start() + await session.start( + agent=VADAgent( + participant_identity=participant.identity, events=self.events + ) + ) + + return session + + async def _close_session(self, session: AgentSession) -> None: + """Close and cleanup VAD monitoring session.""" + try: + await session.drain() + await session.aclose() + except Exception: + logger.exception("Error closing session") + + +async def entrypoint(ctx: JobContext): + """Initialize and run the multi-user VAD monitor.""" + logger.info("Starting metadata agent in room: %s", ctx.room.name) + recording_id = ctx.job.metadata + logger.info("Recording ID: %s", recording_id) + vad_monitor = MetadataAgent(ctx, recording_id) + vad_monitor.start() + + # Connect to room and subscribe to audio only + await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY) + + existing_participants = 
list(ctx.room.remote_participants.values()) + for participant in existing_participants: + vad_monitor.on_participant_connected(participant) + + async def cleanup(): + logger.info("Shutting down VAD monitor...") + await vad_monitor.aclose() + + ctx.add_shutdown_callback(cleanup) + + +async def handle_job_request(job_req: JobRequest) -> None: + """Accept or reject the job request based on agent presence in the room.""" + room_name = job_req.room.name + recording_id = job_req.job.metadata + agent_identity = f"{AGENT_NAME}-{room_name}" + + async with api.LiveKitAPI() as lk: + try: + resp = await lk.room.list_participants( + list=api.ListParticipantsRequest(room=room_name) + ) + already_present = any( + p.kind == rtc.ParticipantKind.PARTICIPANT_KIND_AGENT + and p.identity == agent_identity + for p in resp.participants + ) + if already_present: + logger.info("Agent already in the room '%s' — reject", room_name) + await job_req.reject() + else: + logger.info( + "Accept job for '%s' — identity=%s", room_name, agent_identity + ) + await job_req.accept(identity=agent_identity, metadata=recording_id) + except Exception: + logger.exception("Error treating the job for '%s'", room_name) + await job_req.reject() + + +def prewarm(proc: JobProcess): + """Preload voice activity detection model.""" + proc.userdata["vad"] = silero.VAD.load() + + +if __name__ == "__main__": + cli.run_app( + WorkerOptions( + entrypoint_fnc=entrypoint, + prewarm_fnc=prewarm, + request_fnc=handle_job_request, + agent_name=AGENT_NAME, + permissions=WorkerPermissions( + can_publish=False, + can_publish_data=False, + can_subscribe=True, + hidden=True, + ), + ) + ) diff --git a/src/agents/pyproject.toml b/src/agents/pyproject.toml index 81d5823e2..b53ea8441 100644 --- a/src/agents/pyproject.toml +++ b/src/agents/pyproject.toml @@ -7,7 +7,8 @@ dependencies = [ "livekit-agents==1.2.6", "livekit-plugins-deepgram==1.2.6", "livekit-plugins-silero==1.2.6", - "python-dotenv==1.1.1" + "python-dotenv==1.1.1", + "minio==7.2.15", ] [project.optional-dependencies] diff --git a/src/backend/core/api/feature_flag.py b/src/backend/core/api/feature_flag.py index b137a7ea0..df5f7eca7 100644 --- a/src/backend/core/api/feature_flag.py +++ b/src/backend/core/api/feature_flag.py @@ -1,30 +1,75 @@ """Feature flag handler for the Meet core app.""" +import logging +import os from functools import wraps +from logging import getLogger from django.conf import settings from django.http import Http404 +from posthog import Posthog + +logging.basicConfig(level=logging.DEBUG) + +logger = getLogger(__name__) + + +class FeatureFlagError(Exception): + """Feature flag management error.""" + class FeatureFlag: - """Check if features are enabled and return error responses.""" + """Feature flag management using Django settings and PostHog.""" FLAGS = { - "recording": "RECORDING_ENABLE", - "storage_event": "RECORDING_STORAGE_EVENT_ENABLE", - "subtitle": "ROOM_SUBTITLE_ENABLED", + "metadata_agent": {"posthog": "is_metadata_agent_enabled"}, + "recording": {"setting": "RECORDING_ENABLE"}, + "storage_event": {"setting": "RECORDING_STORAGE_EVENT_ENABLE"}, + "subtitle": {"setting": "ROOM_SUBTITLE_ENABLED"}, } - @classmethod - def flag_is_active(cls, flag_name): - """Check if a feature flag is active.""" + _ph_client = None - setting_name = cls.FLAGS.get(flag_name) + @classmethod + def _get_ph_client(cls): + """Initialize and return PostHog client if configured.""" + if cls._ph_client is not None: + return cls._ph_client + api_key = os.getenv("POSTHOG_API_KEY") + host = 
os.getenv("POSTHOG_API_HOST", "https://eu.i.posthog.com") + logging.info("PostHog config: api_key=%s, host=%s", api_key, host) + if Posthog and api_key and host: + logging.info("Initializing PostHog client") + cls._ph_client = Posthog(project_api_key=api_key, host=host) + logging.info("PostHog client initialized: %s", bool(cls._ph_client)) + return cls._ph_client - if setting_name is None: + @classmethod + def flag_is_active(cls, flag_name, *, distinct_id=None, default=False): + """Check if a feature flag is active.""" + cfg = cls.FLAGS.get(flag_name) + if not cfg: return False - return getattr(settings, setting_name, False) + setting_name = cfg.get("setting") + if setting_name is not None: + return bool(getattr(settings, setting_name, False)) + + posthog_flag = cfg.get("posthog") + if posthog_flag: + ph = cls._get_ph_client() + if ph and distinct_id: + try: + logger.info( + "Checking PostHog flag %s for id=%s", posthog_flag, distinct_id + ) + return bool(ph.feature_enabled(posthog_flag, distinct_id)) + except FeatureFlagError as e: + logging.error("Error checking feature flag %s: %s", flag_name, e) + return default + return default + return default @classmethod def require(cls, flag_name): diff --git a/src/backend/core/api/viewsets.py b/src/backend/core/api/viewsets.py index 55578e9e8..1457ddd53 100644 --- a/src/backend/core/api/viewsets.py +++ b/src/backend/core/api/viewsets.py @@ -301,7 +301,6 @@ def start_room_recording(self, request, pk=None): # pylint: disable=unused-argu """Start recording a room.""" serializer = serializers.StartRecordingSerializer(data=request.data) - if not serializer.is_valid(): return drf_response.Response( {"detail": "Invalid request."}, status=drf_status.HTTP_400_BAD_REQUEST diff --git a/src/backend/core/migrations/0015_recording_metadata_dispatch_id_alter_resource_users_and_more.py b/src/backend/core/migrations/0015_recording_metadata_dispatch_id_alter_resource_users_and_more.py new file mode 100644 index 000000000..5070bd07d --- /dev/null +++ b/src/backend/core/migrations/0015_recording_metadata_dispatch_id_alter_resource_users_and_more.py @@ -0,0 +1,29 @@ +# Generated by Django 5.2.6 on 2025-09-25 15:14 + +from django.conf import settings +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('core', '0014_room_pin_code'), + ] + + operations = [ + migrations.AddField( + model_name='recording', + name='metadata_dispatch_id', + field=models.CharField(blank=True, max_length=100, null=True), + ), + migrations.AlterField( + model_name='resource', + name='users', + field=models.ManyToManyField(related_name='resources', through='core.ResourceAccess', through_fields=('resource', 'user'), to=settings.AUTH_USER_MODEL), + ), + migrations.AlterField( + model_name='user', + name='language', + field=models.CharField(choices=[('en-us', 'English'), ('fr-fr', 'French'), ('nl-nl', 'Dutch'), ('de-de', 'German')], default='en-us', help_text='The language in which the user wants to see the interface.', max_length=10, verbose_name='language'), + ), + ] diff --git a/src/backend/core/models.py b/src/backend/core/models.py index b92b9291c..548499f47 100644 --- a/src/backend/core/models.py +++ b/src/backend/core/models.py @@ -573,6 +573,7 @@ class Recording(BaseModel): verbose_name=_("Recording mode"), help_text=_("Defines the mode of recording being called."), ) + metadata_dispatch_id = models.CharField(max_length=100, null=True, blank=True) class Meta: db_table = "meet_recording" diff --git 
a/src/backend/core/recording/event/notification.py b/src/backend/core/recording/event/notification.py index 49f0d02e1..240b47970 100644 --- a/src/backend/core/recording/event/notification.py +++ b/src/backend/core/recording/event/notification.py @@ -132,6 +132,8 @@ def _notify_summary_service(recording): logger.error("No owner found for recording %s", recording.id) return False + worker_id = recording.worker_id + payload = { "filename": recording.key, "email": owner_access.user.email, @@ -143,6 +145,7 @@ def _notify_summary_service(recording): "recording_time": recording.created_at.astimezone( owner_access.user.timezone ).strftime("%H:%M"), + "worker_id": worker_id, } headers = { diff --git a/src/backend/core/recording/services/recording_events.py b/src/backend/core/recording/services/recording_events.py index f24a7b029..997639692 100644 --- a/src/backend/core/recording/services/recording_events.py +++ b/src/backend/core/recording/services/recording_events.py @@ -1,8 +1,19 @@ """Recording-related LiveKit Events Service""" +import asyncio +import logging from logging import getLogger +from django.core.exceptions import ObjectDoesNotExist +from django.db import DatabaseError, transaction + +import aiohttp + from core import models, utils +from core.api.feature_flag import FeatureFlag +from core.services.metadata import MetadataService + +logging.basicConfig(level=logging.DEBUG) logger = getLogger(__name__) @@ -11,6 +22,21 @@ class RecordingEventsError(Exception): """Recording event handling fails.""" +def get_recording_creator_id(recording: models.Recording) -> str | None: + """Get the user ID of the recording creator (owner).""" + owner_id = ( + models.RecordingAccess.objects.filter( + role=models.RoleChoices.OWNER, + recording_id=recording.id, + user__isnull=False, + ) + .order_by("created_at") + .values_list("user_id", flat=True) + .first() + ) + return str(owner_id) if owner_id else None + + class RecordingEventsService: """Handles recording-related Livekit webhook events.""" @@ -47,3 +73,127 @@ def handle_limit_reached(recording): f"Failed to notify participants in room '{recording.room.id}' about " f"recording limit reached (recording_id={recording.id})" ) from e + + @staticmethod + def handle_egress_started(recording): + """Start metadata agent after transaction commit.""" + rec_id = recording.id + room_id = recording.room_id + creator_id = get_recording_creator_id(recording) + if not FeatureFlag.flag_is_active("metadata_agent", distinct_id=creator_id): + logger.info("Metadata agent disabled by PostHog flag for id=%s", creator_id) + return + + service = MetadataService() + + logger.info( + "Scheduling metadata start for recording=%s room_id=%s", rec_id, room_id + ) + + def _start(): + """Start metadata agent after transaction commit.""" + try: + rec = ( + models.Recording.objects.select_related("room") + .only("id", "status", "room_id", "room__id") + .get(id=rec_id) + ) + + if rec.status not in ( + models.RecordingStatusChoices.INITIATED, + models.RecordingStatusChoices.ACTIVE, + ): + logger.info( + "Skip metadata start: status=%s (rec=%s)", rec.status, rec.id + ) + return + + room = rec.room + + if room.pk != room_id: + logger.error( + "Room mismatch at start: rec.room_id=%s event.room_id=%s", + room.pk, + room_id, + ) + return + + dispatch_id = service.start_metadata(room, rec_id) + rec.metadata_dispatch_id = dispatch_id + rec.save(update_fields=["metadata_dispatch_id"]) + logger.info( + "Metadata start dispatched for rec=%s room_id=%s", rec.id, room.pk + ) + except 
(aiohttp.ClientError, asyncio.TimeoutError) as e: + logger.warning( + "Controller unreachable during start (rec=%s, room_id=%s): %s", + rec_id, + room_id, + e, + exc_info=True, + ) + + transaction.on_commit(_start, robust=True) + + @staticmethod + def handle_egress_ended(recording): + """Stop metadata agent after transaction commit.""" + service = MetadataService() + rec_id = recording.id + room_id = recording.room_id + creator_id = get_recording_creator_id(recording) + if not FeatureFlag.flag_is_active("metadata_agent", distinct_id=creator_id): + logger.info("Metadata agent disabled by PostHog flag for id=%s", creator_id) + return + + def _stop(): + try: + rec = ( + models.Recording.objects.select_related("room") + .only("id", "room_id", "room__id") + .get(id=rec_id) + ) + room = rec.room + + if room.pk != room_id: + logger.error( + "Room mismatch at stop: rec.room_id=%s event.room_id=%s", + room.pk, + room_id, + ) + return + + try: + service.stop_metadata(room, rec.metadata_dispatch_id) + logger.info( + "Metadata stop dispatched for rec=%s room_id=%s", + rec.id, + room.pk, + ) + except (aiohttp.ClientError, asyncio.TimeoutError) as e: + logger.warning( + "Controller unreachable during stop (rec=%s room_id=%s): %s", + rec_id, + room_id, + e, + exc_info=True, + ) + + except ObjectDoesNotExist as e: + logger.warning( + "Recording or Room not found while stopping (rec=%s room_id=%s): %s", + rec_id, + room_id, + e, + exc_info=True, + ) + except DatabaseError as e: + logger.warning( + "DB error while stopping metadata (rec=%s room_id=%s): %s", + rec_id, + room_id, + e, + exc_info=True, + ) + + transaction.on_commit(_stop, robust=True) diff --git a/src/backend/core/services/livekit_events.py b/src/backend/core/services/livekit_events.py index 980c49c9c..4b8cca167 100644 --- a/src/backend/core/services/livekit_events.py +++ b/src/backend/core/services/livekit_events.py @@ -2,6 +2,8 @@ # pylint: disable=no-member +import logging +import re import uuid from enum import Enum from logging import getLogger @@ -19,6 +21,8 @@ from .lobby import LobbyService from .telephony import TelephonyException, TelephonyService +logging.basicConfig(level=logging.DEBUG) + logger = getLogger(__name__) @@ -77,6 +81,30 @@ class LiveKitWebhookEventType(Enum): INGRESS_ENDED = "ingress_ended" +def _extract_recording_id_from_egress(data) -> str | None: + """Try to extract recording_id from egress data filenames. + On failure, return None. 
+ """ + ei = getattr(data, "egress_info", None) + if not ei: + return None + + fname = getattr(getattr(ei, "file", None), "filename", None) + if not fname: + try: + outs = getattr(getattr(ei, "room_composite", None), "file_outputs", []) + if outs: + fname = getattr(outs[0], "filepath", None) + except (AttributeError, IndexError, TypeError): + fname = None + + if not fname: + return None + + m = re.search(r"([0-9a-fA-F-]{36})\.ogg$", fname) + return m.group(1) if m else None + + class LiveKitEventsService: """Service for processing and handling LiveKit webhook events and notifications.""" @@ -94,7 +122,7 @@ def __init__(self): def receive(self, request): """Process webhook and route to appropriate handler.""" - + logger.debug("LiveKit webhook received") auth_token = request.headers.get("Authorization") if not auth_token: raise AuthenticationError("Authorization header missing") @@ -118,13 +146,17 @@ def receive(self, request): if not handler or not callable(handler): return - + logger.debug("Handling LiveKit webhook event: %s", data) # pylint: disable=not-callable handler(data) def _handle_egress_ended(self, data): """Handle 'egress_ended' event.""" - + logger.debug( + "Egress ended: id=%s status=%s", + getattr(data.egress_info, "egress_id", None), + getattr(data.egress_info, "status", None), + ) try: recording = models.Recording.objects.get( worker_id=data.egress_info.egress_id @@ -134,6 +166,13 @@ def _handle_egress_ended(self, data): f"Recording with worker ID {data.egress_info.egress_id} does not exist" ) from err + try: + self.recording_events.handle_egress_ended(recording) + except RecordingEventsError as e: + raise ActionFailedError( + f"Failed to process egress_ended for recording {recording.id}" + ) from e + if ( data.egress_info.status == api.EgressStatus.EGRESS_LIMIT_REACHED and recording.status == models.RecordingStatusChoices.ACTIVE @@ -145,6 +184,100 @@ def _handle_egress_ended(self, data): f"Failed to process limit reached event for recording {recording}" ) from e + def _handle_egress_started(self, data): + ei = getattr(data, "egress_info", None) or ( + data.get("egress_info") if isinstance(data, dict) else None + ) + egress_id = None + room_name = None + status = None + if ei: + egress_id = getattr(ei, "egress_id", None) or getattr(ei, "egressId", None) + room_name = getattr(ei, "room_name", None) or getattr(ei, "roomName", None) + status = getattr(ei, "status", None) + + logger.warning( + "egress_started: egress_id=%s status=%s room_name=%s", + egress_id, + status, + room_name, + ) + + if not egress_id: + logger.error("egress_started without egress_id") + return + + rec = models.Recording.objects.filter(worker_id=egress_id).first() + + if rec is None: + rec_id = _extract_recording_id_from_egress(data) + if rec_id: + rec = ( + models.Recording.objects.filter(id=rec_id) + .select_related("room") + .first() + ) + if rec and not rec.worker_id: + rec.worker_id = egress_id + rec.save(update_fields=["worker_id"]) + logger.info( + "Attached worker_id=%s to recording=%s via filename", + egress_id, + rec.id, + ) + + if rec is None and room_name: + rec = ( + models.Recording.objects.filter( + room__livekit_name=room_name, + status__in=[ + models.RecordingStatusChoices.INITIATED, + models.RecordingStatusChoices.ACTIVE, + ], + ) + .order_by("-created_at") + .select_related("room") + .first() + ) + if rec and not rec.worker_id: + rec.worker_id = egress_id + rec.save(update_fields=["worker_id"]) + logger.info( + "Heuristically attached worker_id=%s to recording=%s via livekit room", + 
egress_id, + rec.id, + ) + + if rec is None: + logger.warning( + "egress_started: unknown egress_id=%s (room_name=%s)", + egress_id, + room_name, + ) + return + + lk_name = getattr(rec.room, "livekit_name", None) + if room_name and lk_name and lk_name != room_name: + logger.error( + "Room mismatch: rec[%s].room=%s != event.room=%s", + rec.id, + lk_name, + room_name, + ) + return + + try: + logger.debug( + "Processing egress_started for recording %s (room=%s)", + rec.id, + lk_name or rec.room_id, + ) + self.recording_events.handle_egress_started(rec) + except RecordingEventsError as e: + raise ActionFailedError( + f"Failed to process egress_started for recording {rec.id}" + ) from e + def _handle_room_started(self, data): """Handle 'room_started' event.""" diff --git a/src/backend/core/services/metadata.py b/src/backend/core/services/metadata.py new file mode 100644 index 000000000..ad4741b2e --- /dev/null +++ b/src/backend/core/services/metadata.py @@ -0,0 +1,88 @@ +"""Service for managing metadata agents in LiveKit rooms.""" + +import json +import logging +from logging import getLogger + +from django.conf import settings + +from asgiref.sync import async_to_sync +from livekit.protocol.agent_dispatch import ( + CreateAgentDispatchRequest, +) + +from core import utils + +logging.basicConfig(level=logging.DEBUG) + +logger = getLogger(__name__) + + +class MetadataException(Exception): + """Exception raised when metadata operations fail.""" + + +class MetadataService: + """Service for managing metadata agents in LiveKit rooms.""" + + def __init__(self) -> None: + self.agent_name = settings.ROOM_METADATA_AGENT_NAME + + @async_to_sync + async def start_metadata(self, room, recording_id) -> str: + """Start metadata agent in the specified room and return the dispatch id.""" + + lkapi = utils.create_livekit_client() + try: + payload = ( + json.dumps({"recording_id": str(recording_id)}) + if recording_id + else None + ) + + resp = await lkapi.agent_dispatch.create_dispatch( + CreateAgentDispatchRequest( + agent_name=self.agent_name, + room=str(room.id), + metadata=payload, + ) + ) + dispatch_id = getattr(resp, "id", None) + if not dispatch_id: + raise MetadataException("LiveKit did not return a dispatch_id") + logger.info( + "Agent dispatch created: room=%s agent=%s dispatch_id=%s", + room.id, + self.agent_name, + dispatch_id, + ) + return dispatch_id + except Exception as e: + logger.exception("Failed to create agent dispatch for room %s", room.id) + raise MetadataException("Failed to create metadata agent") from e + finally: + await lkapi.aclose() + + @async_to_sync + async def stop_metadata(self, room, dispatch_id) -> None: + """Stop metadata agent in the specified room.""" + logger.info( + "Deleting agent dispatch: room=%s agent=%s dispatch_id=%s", + room.id, + self.agent_name, + dispatch_id, + ) + lkapi = utils.create_livekit_client() + try: + await lkapi.agent_dispatch.delete_dispatch( + dispatch_id=str(dispatch_id), room_name=str(room.id) + ) + logger.info( + "Agent dispatch deleted: room=%s agent=%s", room.id, self.agent_name + ) + except Exception as e: + logger.exception("Failed to delete agent dispatch for room %s", room.id) + raise MetadataException("Failed to stop metadata agent") from e + finally: + await lkapi.aclose() diff --git a/src/backend/core/tests/rooms/test_api_rooms_start_recording.py b/src/backend/core/tests/rooms/test_api_rooms_start_recording.py index 9d3b1a736..46076b33c 100644 --- a/src/backend/core/tests/rooms/test_api_rooms_start_recording.py +++
b/src/backend/core/tests/rooms/test_api_rooms_start_recording.py @@ -183,9 +183,10 @@ def test_start_recording_success( mock_worker_service_factory.assert_called_once_with(mode="screen_recording") assert response.status_code == 201 - assert response.json() == { - "message": f"Recording successfully started for room {room.slug}" - } + assert ( + response.json()["message"] + == f"Recording successfully started for room {room.slug}" + ) # Verify the mediator was called with the recording recording = Recording.objects.first() diff --git a/src/backend/meet/settings.py index 51d10e880..447e2df95 100755 --- a/src/backend/meet/settings.py +++ b/src/backend/meet/settings.py @@ -664,6 +664,12 @@ class Base(Configuration): environ_prefix=None, ) + ROOM_METADATA_AGENT_NAME = values.Value( + "metadata-extractor", + environ_name="ROOM_METADATA_AGENT_NAME", + environ_prefix=None, + ) + # pylint: disable=invalid-name @property def ENVIRONMENT(self): diff --git a/src/backend/pyproject.toml index 006058c76..2e1caed2d 100644 --- a/src/backend/pyproject.toml +++ b/src/backend/pyproject.toml @@ -48,6 +48,7 @@ dependencies = [ "jsonschema==4.24.0", "markdown==3.8.2", "nested-multipart-parser==1.5.0", + "posthog==6.0.3", "psycopg[binary]==3.2.9", "PyJWT==2.10.1", "python-frontmatter==1.1.0", diff --git a/src/summary/pyproject.toml index 4d1b934b1..8388e2b28 100644 --- a/src/summary/pyproject.toml +++ b/src/summary/pyproject.toml @@ -15,6 +15,8 @@ dependencies = [ "posthog==6.0.3", "requests==2.32.4", "sentry-sdk[fastapi, celery]==2.30.0", + "pandas>=2.2.0", + "numpy>=1.26.0" ] [project.optional-dependencies] diff --git a/src/summary/summary/api/route/tasks.py index 37895e9e2..de998f565 100644 --- a/src/summary/summary/api/route/tasks.py +++ b/src/summary/summary/api/route/tasks.py @@ -25,7 +25,7 @@ class TaskCreation(BaseModel): room: Optional[str] recording_date: Optional[str] recording_time: Optional[str] - + worker_id: Optional[str] router = APIRouter(prefix="/tasks") @@ -42,6 +42,7 @@ async def create_task(request: TaskCreation): request.room, request.recording_date, request.recording_time, + request.worker_id, ], queue=settings.transcribe_queue, ) diff --git a/src/summary/summary/core/celery_worker.py index d838eb30d..1f98177d6 100644 --- a/src/summary/summary/core/celery_worker.py +++ b/src/summary/summary/core/celery_worker.py @@ -6,10 +6,13 @@ import os import tempfile import time +from datetime import datetime from pathlib import Path -from typing import Optional +from typing import Any, Dict, List, Optional +import numpy as np import openai +import pandas as pd import sentry_sdk from celery import Celery, signals from celery.utils.log import get_task_logger @@ -30,6 +33,159 @@ PROMPT_USER_PART, ) + +def nanoseconds_to_seconds(nanoseconds: int) -> float: + """Convert nanoseconds timestamp to seconds since epoch.""" + return nanoseconds / 1_000_000_000 + + +def parse_iso_timestamp(iso_string: str) -> float: + """Convert ISO timestamp to seconds since epoch.""" + # Keep the UTC offset so .timestamp() is not interpreted in the local timezone. + return datetime.fromisoformat(iso_string).timestamp() + + +def calculate_overlap_vectorized(starts1, ends1, starts2, ends2): + """Calculate overlap duration between two sets of time intervals (vectorized).""" + overlap_starts = np.maximum(starts1[:, None], starts2) + overlap_ends = np.minimum(ends1[:, None], ends2) + return np.maximum(0,
overlap_ends - overlap_starts) + + +def build_speech_segments_df(events: List[Dict[str, Any]]) -> pd.DataFrame: + """Build speech segments DataFrame from events. + + Returns: DataFrame with columns [participant_id, start_time, end_time] + """ + df = pd.DataFrame(events) + df["timestamp"] = df["timestamp"].apply(parse_iso_timestamp) + + starts = df[df["type"] == "speech_start"][["participant_id", "timestamp"]] + ends = df[df["type"] == "speech_end"][["participant_id", "timestamp"]] + + starts = starts.sort_values("timestamp").reset_index(drop=True) + ends = ends.sort_values("timestamp").reset_index(drop=True) + + segments = pd.merge( + starts.rename(columns={"timestamp": "start_time"}), + ends.rename(columns={"timestamp": "end_time"}), + left_index=True, + right_index=True, + suffixes=("_start", "_end"), + ) + + segments = segments[ + segments["participant_id_start"] == segments["participant_id_end"] + ] + segments = segments.rename(columns={"participant_id_start": "participant_id"}) + segments = segments[["participant_id", "start_time", "end_time"]] + return segments + + +def assign_participant_ids( # noqa: PLR0912 + diarization_output: Dict[str, Any], + metadatas: List[Dict[str, Any]], + recording_metadata: Dict[str, Any], + overlap_threshold: float = 0.3, +) -> Dict[str, Any]: + """Assign participant IDs to WhisperX diarization speakers.""" + recording_start = nanoseconds_to_seconds(recording_metadata["started_at"]) + + participant_segments_df = build_speech_segments_df(metadatas.get("events")) + + if participant_segments_df.empty: + return {} + + words_df = pd.DataFrame(diarization_output) + + if words_df.empty: + return {} + words_df = words_df.dropna(subset=["start", "end"]) + + if words_df.empty: + return {} + + if "speaker" not in words_df.columns: + words_df["speaker"] = "UNKNOWN" + words_df["speaker"] = words_df["speaker"].fillna("UNKNOWN") + + words_df["abs_start"] = recording_start + words_df["start"] + words_df["abs_end"] = recording_start + words_df["end"] + speaker_segments_list = [] + + for speaker, group in words_df.groupby("speaker"): + grp = group.sort_values("abs_start").reset_index(drop=True) + segments = [] + current_start = grp.iloc[0]["abs_start"] + current_end = grp.iloc[0]["abs_end"] + + for idx in range(1, len(grp)): + if grp.iloc[idx]["abs_start"] - current_end < 1.0: + current_end = grp.iloc[idx]["abs_end"] + else: + segments.append( + { + "speaker": speaker, + "start_time": current_start, + "end_time": current_end, + } + ) + current_start = grp.iloc[idx]["abs_start"] + current_end = grp.iloc[idx]["abs_end"] + + segments.append( + {"speaker": speaker, "start_time": current_start, "end_time": current_end} + ) + speaker_segments_list.extend(segments) + + speaker_segments_df = pd.DataFrame(speaker_segments_list) + + speaker_to_participant = {} + + for speaker in speaker_segments_df["speaker"].unique(): + spk_segs = speaker_segments_df[speaker_segments_df["speaker"] == speaker] + + best_match = None + best_overlap = 0 + + for participant_id in participant_segments_df["participant_id"].unique(): + part_segs = participant_segments_df[ + participant_segments_df["participant_id"] == participant_id + ] + + overlaps = calculate_overlap_vectorized( + spk_segs["start_time"].values, + spk_segs["end_time"].values, + part_segs["start_time"].values, + part_segs["end_time"].values, + ) + + total_overlap = overlaps.sum() + total_speaker_duration = ( + spk_segs["end_time"] - spk_segs["start_time"] + ).sum() + + if total_speaker_duration > 0: + overlap_ratio = total_overlap / 
total_speaker_duration + + if overlap_ratio > best_overlap and overlap_ratio >= overlap_threshold: + best_overlap = overlap_ratio + best_match = participant_id + + speaker_to_participant[speaker] = { + "participant_id": best_match, + "confidence": best_overlap, + } + for speaker, mapping in speaker_to_participant.items(): + for participant in metadatas.get("participants", []): + if participant["participantId"] == mapping["participant_id"]: + speaker_to_participant[speaker] = participant["name"] + if mapping["confidence"] < overlap_threshold + 0.2: + speaker_to_participant[speaker] += "?" + break + return speaker_to_participant + + settings = get_settings() analytics = get_analytics() @@ -135,7 +291,7 @@ def call(self, system_prompt: str, user_prompt: str): raise LLMException("LLM call failed.") from e -def format_segments(transcription_data): +def format_segments(transcription_data, metadata=None, manifest=None): """Format transcription segments from WhisperX into a readable conversation format. Processes transcription data with segments containing speaker information and text, @@ -143,17 +299,36 @@ def format_segments(transcription_data): conversation with speaker labels. """ formatted_output = "" - if not transcription_data or not hasattr(transcription_data, "segments"): - if isinstance(transcription_data, dict) and "segments" in transcription_data: - segments = transcription_data["segments"] - else: - return "Error: Invalid transcription data format" - else: - segments = transcription_data.segments + logger.info("Formatting segments with metadata: %s", metadata) + logger.info("Transcription data: %s", transcription_data) + if metadata: + mapping = assign_participant_ids( + diarization_output=transcription_data, + metadatas=metadata, + recording_metadata=manifest, + overlap_threshold=0.3, + ) + logger.info("Speaker to participant mapping: %s", mapping) + previous_label = None + for segment in transcription_data: + spk = segment.get("speaker") or "UNKNOWN_SPEAKER" + text = segment.get("text") or "" + if not text: + continue + + label = mapping.get(spk) or spk + + if label != previous_label: + formatted_output += f"\n\n **{label}**: {text}" + else: + formatted_output += f" {text}" + previous_label = label - previous_speaker = None + return formatted_output + else: + previous_speaker = None - for segment in segments: + for segment in transcription_data: speaker = segment.get("speaker", "UNKNOWN_SPEAKER") text = segment.get("text", "") if text: @@ -202,7 +377,7 @@ def task_failure_handler(task_id, exception=None, **kwargs): max_retries=settings.celery_max_retries, queue=settings.transcribe_queue, ) -def process_audio_transcribe_summarize_v2( +def process_audio_transcribe_summarize_v2( # noqa: PLR0915 self, filename: str, email: str, @@ -211,6 +386,7 @@ def process_audio_transcribe_summarize_v2( room: Optional[str], recording_date: Optional[str], recording_time: Optional[str], + worker_id: Optional[str], ): """Process an audio file by transcribing it and generating a summary. 
@@ -286,12 +462,49 @@ def process_audio_transcribe_summarize_v2( os.remove(temp_file_path) logger.debug("Temporary file removed: %s", temp_file_path) - formatted_transcription = ( - DEFAULT_EMPTY_TRANSCRIPTION - if not transcription.segments - else format_segments(transcription) - ) + if ( + analytics.is_feature_enabled("is_metadata_agent_enabled", distinct_id=sub) + and settings.is_summary_enabled + ): + file = filename.split("/")[1].split(".")[0] + metadata_obj = minio_client.get_object( + settings.aws_storage_bucket_name, + object_name=settings.metadata_file.format(filename=file), + ) + file_manifest = "recordings/" + worker_id + ".json" + manifest_obj = minio_client.get_object( + settings.aws_storage_bucket_name, + object_name=file_manifest, + ) + logger.info("Manifest file downloaded: %s", file_manifest) + + logger.info("Downloading metadata file") + + try: + metadata_bytes = metadata_obj.read() + metadata_json = json.loads(metadata_bytes.decode("utf-8")) + manifest_bytes = manifest_obj.read() + manifest_json = json.loads(manifest_bytes.decode("utf-8")) + finally: + metadata_obj.close() + metadata_obj.release_conn() + manifest_obj.close() + manifest_obj.release_conn() + + logger.info("Metadata file successfully downloaded") + logger.debug("Manifest: %s", manifest_json) + formatted_transcription = ( + DEFAULT_EMPTY_TRANSCRIPTION + if not getattr(transcription, "segments", None) + else format_segments(transcription.segments, metadata_json, manifest_json) + ) + else: + formatted_transcription = ( + DEFAULT_EMPTY_TRANSCRIPTION + if not transcription.segments + else format_segments(transcription.segments, None) + ) metadata_manager.track_transcription_metadata(task_id, transcription) if not room or not recording_date or not recording_time: diff --git a/src/summary/summary/core/config.py b/src/summary/summary/core/config.py index 77dc7aa3f..5da27920b 100644 --- a/src/summary/summary/core/config.py +++ b/src/summary/summary/core/config.py @@ -59,6 +59,7 @@ class Settings(BaseSettings): # Summary related settings is_summary_enabled: bool = True + metadata_file: Optional[str] = "recordings/{filename}-metadata.json" # Sentry sentry_is_enabled: bool = False
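
Note for reviewers: the contract between the metadata agent and the summary worker is a JSON document that MetadataAgent.save() uploads to "<AWS_S3_OUTPUT_FOLDER or 'recordings'>/<recording_id>-metadata.json" and that the worker fetches via settings.metadata_file. The sketch below shows the shape implied by MetadataEvent.serialize() and the participants list built in save(); the concrete identifiers and values are made up for illustration.

# Illustrative payload written by MetadataAgent.save() (values are hypothetical).
example_metadata = {
    "events": [
        {
            # "type" is one of: speech_start, speech_end, chat_received,
            # participant_connected, participant_disconnected
            "participant_id": "user-123",
            "type": "speech_start",
            "timestamp": "2025-09-25T15:14:03+00:00",
            "data": None,  # carries the chat text for chat_received events
        },
    ],
    "participants": [
        {"participantId": "user-123", "name": "Alice"},
    ],
}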
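
Second note: a minimal, self-contained sketch of the overlap-ratio idea behind assign_participant_ids, matching a diarized speaker to the participant whose VAD windows cover the largest share of that speaker's talk time. Segment times and participant names are toy values, and the helper is simplified (no merging of word-level segments, no confidence "?" suffix); it is illustrative only, not the production code path.

import numpy as np

# Hypothetical absolute times (seconds) for one diarized speaker and two participants.
speaker_segments = np.array([[0.0, 4.0], [10.0, 12.0]])  # e.g. SPEAKER_00 from WhisperX
participant_segments = {
    "alice": np.array([[0.2, 4.1]]),  # VAD speech windows from the metadata agent
    "bob": np.array([[9.0, 9.5]]),
}

def overlap_ratio(spk: np.ndarray, part: np.ndarray) -> float:
    """Share of the speaker's talk time that falls inside the participant's VAD windows."""
    starts = np.maximum(spk[:, 0][:, None], part[:, 0])  # pairwise interval starts
    ends = np.minimum(spk[:, 1][:, None], part[:, 1])    # pairwise interval ends
    overlap = np.maximum(0.0, ends - starts).sum()
    return overlap / (spk[:, 1] - spk[:, 0]).sum()

ratios = {p: overlap_ratio(speaker_segments, segs) for p, segs in participant_segments.items()}
best = max(ratios, key=ratios.get)
# With the 0.3 threshold used in the PR, SPEAKER_00 maps to "alice" (ratio ~0.63);
# "bob" has zero overlap and would stay unmatched.
print(best, round(ratios[best], 2))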