diff --git a/label_studio/data_manager/managers.py b/label_studio/data_manager/managers.py index 70c9b81056a5..b72dd4360620 100644 --- a/label_studio/data_manager/managers.py +++ b/label_studio/data_manager/managers.py @@ -30,6 +30,7 @@ ) from django.db.models.fields.json import KeyTextTransform from django.db.models.functions import Cast, Coalesce, Concat +from fsm.queryset_mixins import FSMStateQuerySetMixin from pydantic import BaseModel from label_studio.core.utils.common import load_func @@ -488,7 +489,16 @@ def apply_filters(queryset, filters, project, request): return queryset -class TaskQuerySet(models.QuerySet): +class TaskQuerySet(FSMStateQuerySetMixin, models.QuerySet): + """ + QuerySet for Task model with FSM state annotation support. + + Extends Django's QuerySet with: + - FSM state annotation (via FSMStateQuerySetMixin) + - Data Manager filters and ordering + - Selected items handling + """ + def prepared(self, prepare_params=None): """Apply filters, ordering and selected items to queryset @@ -700,6 +710,29 @@ def dummy(queryset): return queryset +def annotate_state(queryset): + """ + Annotate queryset with FSM state as 'state' field. + + Uses FSMStateQuerySetMixin.annotate_fsm_state() to efficiently annotate + the current state without causing N+1 queries. Aliases 'current_state' to + 'state' to match the Data Manager column name. + + Note: Feature flag checks and user context validation are handled by + annotate_fsm_state() itself, so no additional checks are needed here. + """ + # Use the mixin's annotate_fsm_state() method which creates 'current_state' annotation + # (includes feature flag and user context checks) + queryset = queryset.annotate_fsm_state() + + # Alias 'current_state' to 'state' for Data Manager column compatibility + # Only add the alias if current_state was actually added (feature flags enabled) + if 'current_state' in queryset.query.annotations: + return queryset.annotate(state=F('current_state')) + + return queryset + + settings.DATA_MANAGER_ANNOTATIONS_MAP = { 'avg_lead_time': annotate_avg_lead_time, 'completed_at': annotate_completed_at, @@ -712,6 +745,7 @@ def dummy(queryset): 'file_upload': file_upload, 'draft_exists': annotate_draft_exists, 'storage_filename': annotate_storage_filename, + 'state': annotate_state, } @@ -724,6 +758,19 @@ def update_annotation_map(obj): class PreparedTaskManager(models.Manager): + """ + Manager for Task model with Data Manager annotations. + + Provides: + - Advanced query annotations for Data Manager + - Filter and ordering support + - FSM state annotation support (via TaskQuerySet) + + Note: Overrides the base get_queryset() to return TaskQuerySet. Also has + a custom get_queryset(fields_for_evaluation, prepare_params, ...) method + for Data Manager-specific functionality. + """ + @staticmethod def annotate_queryset( queryset, fields_for_evaluation=None, all_fields=False, excluded_fields_for_evaluation=None, request=None @@ -754,6 +801,11 @@ def get_queryset( self, fields_for_evaluation=None, prepare_params=None, all_fields=False, excluded_fields_for_evaluation=None ): """ + Get queryset with optional Data Manager annotations and filters. + + When called without parameters (Django internal use), returns TaskQuerySet. + When called with parameters (Data Manager use), returns annotated and filtered queryset. + :param fields_for_evaluation: list of annotated fields in task :param prepare_params: filters, ordering, selected items :param all_fields: evaluate all fields for task @@ -761,6 +813,11 @@ def get_queryset( :param request: request for user extraction :return: task queryset with annotated fields """ + # If called without parameters, return base TaskQuerySet (for Django internal use) + if prepare_params is None: + return TaskQuerySet(self.model, using=self._db) + + # Otherwise, use Data Manager filtering and annotation queryset = self.only_filtered(prepare_params=prepare_params) # Expose view data to annotation functions for column-specific configuration queryset.view_data = getattr(prepare_params, 'data', None) @@ -781,5 +838,24 @@ def only_filtered(self, prepare_params=None): class TaskManager(models.Manager): + """ + Default manager for Task model. + + Provides: + - User-scoped filtering + - Custom QuerySet with FSM state support + + Note: Overrides get_queryset() to return TaskQuerySet, which includes + FSMStateQuerySetMixin for state annotation support. + """ + + def get_queryset(self): + """Return TaskQuerySet which includes FSM state annotation support""" + return TaskQuerySet(self.model, using=self._db) + def for_user(self, user): - return self.filter(project__organization=user.active_organization) + return self.get_queryset().filter(project__organization=user.active_organization) + + def with_state(self): + """Return queryset with FSM state annotated.""" + return self.get_queryset().annotate_fsm_state() diff --git a/label_studio/data_manager/serializers.py b/label_studio/data_manager/serializers.py index 9679d37c80fe..acefe682a611 100644 --- a/label_studio/data_manager/serializers.py +++ b/label_studio/data_manager/serializers.py @@ -3,10 +3,13 @@ import os import ujson as json +from core.current_request import CurrentContext +from core.feature_flags import flag_set from data_manager.models import Filter, FilterGroup, View from django.conf import settings from django.db import transaction from drf_spectacular.utils import extend_schema_field +from fsm.serializer_fields import FSMStateField from projects.models import Project from rest_framework import serializers from tasks.models import Task @@ -434,6 +437,8 @@ class PredictionsDMFieldSerializer(serializers.SerializerMethodField): class DataManagerTaskSerializer(TaskSerializer): + """Data Manager Task Serializer with FSM state support.""" + predictions = PredictionsDMFieldSerializer(required=False, read_only=True) annotations = AnnotationsDMFieldSerializer(required=False, many=True, default=[], read_only=True) drafts = AnnotationDraftDMFieldSerializer(required=False, read_only=True) @@ -454,6 +459,7 @@ class DataManagerTaskSerializer(TaskSerializer): avg_lead_time = serializers.FloatField(required=False) draft_exists = serializers.BooleanField(required=False) updated_by = UpdatedByDMFieldSerializer(required=False, read_only=True) + state = FSMStateField(read_only=True) # FSM state - automatically uses annotation if present CHAR_LIMITS = 500 @@ -470,6 +476,13 @@ def to_representation(self, obj): ret.pop('annotations', None) if not self.context.get('predictions'): ret.pop('predictions', None) + # Remove state field if feature flags are disabled + user = CurrentContext.get_user() + if not ( + flag_set('fflag_feat_fit_568_finite_state_management', user=user) + and flag_set('fflag_feat_fit_710_fsm_state_fields', user=user) + ): + ret.pop('state', None) return ret def _pretty_results(self, task, field, unique=False): diff --git a/label_studio/fsm/queryset_mixins.py b/label_studio/fsm/queryset_mixins.py new file mode 100644 index 000000000000..308a72a19efa --- /dev/null +++ b/label_studio/fsm/queryset_mixins.py @@ -0,0 +1,113 @@ +""" +FSM QuerySet Mixins for annotating entities with their current state. + +Provides reusable Django QuerySet mixins that efficiently annotate entities +with their current FSM state using optimized subqueries to prevent N+1 queries. + +Usage: + class TaskQuerySet(FSMStateQuerySetMixin, models.QuerySet): + pass + + class TaskManager(models.Manager): + def get_queryset(self): + return TaskQuerySet(self.model, using=self._db).annotate_fsm_state() + +Note: + All state annotation functionality is guarded by TWO feature flags: + 1. 'fflag_feat_fit_568_finite_state_management' - Controls FSM background calculations + 2. 'fflag_feat_fit_710_fsm_state_fields' - Controls state field display in APIs + + When disabled, no annotation is performed and there is zero performance impact. +""" + +import logging + +from core.current_request import CurrentContext +from core.feature_flags import flag_set +from django.db.models import OuterRef, Subquery +from fsm.registry import get_state_model + +logger = logging.getLogger(__name__) + + +class FSMStateQuerySetMixin: + """ + Mixin for Django QuerySets to efficiently annotate FSM state. + + Provides the `annotate_fsm_state()` method that adds a `current_state` + annotation to the queryset using an optimized subquery. + + This approach: + - Prevents N+1 queries by using a single JOIN/subquery + - Handles missing states gracefully (returns None) + - Uses UUID7 natural ordering for optimal performance + - Works with any FSM entity that has a registered state model + + Example: + # In your model manager + class TaskManager(models.Manager): + def get_queryset(self): + return TaskQuerySet(self.model, using=self._db) + + def with_state(self): + return self.get_queryset().annotate_fsm_state() + + # Usage + tasks = Task.objects.with_state().filter(project=project) + for task in tasks: + print(f"Task {task.id}: {task.current_state}") # No additional queries! + """ + + def annotate_fsm_state(self): + """ + Annotate the queryset with the current FSM state. + + Adds a `current_state` field to each object containing the current + state string value. This is done using an efficient subquery that + leverages UUID7 natural ordering. + + Returns: + QuerySet: The annotated queryset with `current_state` field + + Note: + - If FSM feature flag is disabled, returns queryset unchanged (zero impact) + - If no state exists for an entity, `current_state` will be None + - The state is read-only and should not be modified directly + """ + # Check feature flag directly (works for both core and enterprise) + # Using flag_set directly instead of is_fsm_enabled to work in enterprise context + user = CurrentContext.get_user() + if not ( + flag_set('fflag_feat_fit_568_finite_state_management', user=user) + and flag_set('fflag_feat_fit_710_fsm_state_fields', user=user) + ): + logger.debug('FSM feature flag disabled, skipping state annotation') + return self + + # Get the entity name from the model + entity_name = self.model._meta.model_name + + # Get the state model for this entity + state_model = get_state_model(entity_name) + + if not state_model: + # No state model registered, return queryset as-is + logger.debug(f'No state model registered for {entity_name}, skipping annotation') + return self + + # Get the foreign key field name on the state model + # e.g., 'task_id' for TaskState + entity_field_name = state_model._get_entity_field_name() + fk_field = f'{entity_field_name}_id' + + # Create subquery to get current state using UUID7 natural ordering + # This is extremely efficient because: + # 1. UUID7 provides natural time ordering (latest = highest ID) + # 2. We only fetch the state column, not the entire record + # 3. Django optimizes this into a single JOIN or lateral subquery + current_state_subquery = Subquery( + state_model.objects.filter(**{fk_field: OuterRef('pk')}).order_by('-id').values('state')[:1] + ) + + # Annotate the queryset with the current state + return self.annotate(current_state=current_state_subquery) diff --git a/label_studio/fsm/serializer_fields.py b/label_studio/fsm/serializer_fields.py new file mode 100644 index 000000000000..2ad63585d20f --- /dev/null +++ b/label_studio/fsm/serializer_fields.py @@ -0,0 +1,122 @@ +""" +FSM Serializer Fields for Django Rest Framework. + +Provides reusable DRF serializer fields for exposing FSM state in API responses. +These fields work seamlessly with the FSMStateQuerySetMixin annotations to prevent +N+1 queries. + +Usage: + from fsm.serializer_fields import FSMStateField + + class TaskSerializer(serializers.ModelSerializer): + state = FSMStateField(read_only=True) + + class Meta: + model = Task + fields = ['id', 'data', 'state', ...] + +Note: + All state serialization functionality is guarded by TWO feature flags: + 1. 'fflag_feat_fit_568_finite_state_management' - Controls FSM background calculations + 2. 'fflag_feat_fit_710_fsm_state_fields' - Controls state field display in APIs + + When either flag is disabled, fields return None. This allows enabling FSM background + work while keeping state fields hidden during incremental rollout and testing. +""" + +from core.current_request import CurrentContext +from core.feature_flags import flag_set +from fsm.state_manager import StateManager +from rest_framework import serializers + + +class FSMStateField(serializers.ReadOnlyField): + """ + Read-only DRF field for exposing FSM state. + + This field automatically uses the `state` or `current_state` annotation if present + (preventing N+1 queries), or falls back to querying the state manager + if the annotation is missing. + + Key features: + - Works with annotated querysets (no N+1 queries) + - Falls back to StateManager for single object retrievals + - Always read-only (state changes through transitions only) + - Returns None if FSM is disabled or no state exists + + Example with annotations (optimal): + # In your viewset + def get_queryset(self): + return Task.objects.all().annotate_fsm_state() + + # In your serializer + class TaskSerializer(serializers.ModelSerializer): + state = FSMStateField() + + class Meta: + model = Task + fields = ['id', 'data', 'state'] + + # Result: No N+1 queries, state comes from annotation + + Example without annotations (fallback): + # Direct object retrieval + task = Task.objects.get(id=123) + serializer = TaskSerializer(task) + + # Result: Calls StateManager.get_current_state_value() + # Still efficient due to StateManager caching + """ + + def __init__(self, **kwargs): + # Set source='*' to pass the entire object instance to to_representation() + # instead of a specific attribute, since we check multiple possible attributes + kwargs.setdefault('source', '*') + super().__init__(**kwargs) + + def to_representation(self, instance): + """ + Serialize the FSM state to a string. + + Args: + instance: The model instance being serialized + + Returns: + str or None: The current state value (None if either feature flag disabled) + """ + # Check both feature flags (works for both core and enterprise) + # 1. General FSM functionality (background calculations) + # 2. State field display control (API exposure) + user = CurrentContext.get_user() + if not ( + flag_set('fflag_feat_fit_568_finite_state_management', user=user) + and flag_set('fflag_feat_fit_710_fsm_state_fields', user=user) + ): + return None + + if instance is None: + return None + + # Check if the instance has a state annotation + # This can come from Data Manager's annotate_state() or FSMStateQuerySetMixin.annotate_fsm_state() + if hasattr(instance, 'state'): + # Use the annotated value (no additional query) + return instance.state + elif hasattr(instance, 'current_state'): + # Fallback to current_state annotation from FSMStateQuerySetMixin + return instance.current_state + + # Fallback: Query the state manager + # This happens when the queryset wasn't annotated + # StateManager has its own caching, so this is still efficient + try: + return StateManager.get_current_state_value(instance) + except Exception: + # If FSM is disabled or state model not found, return None + return None + + def to_internal_value(self, data): + """ + This field is read-only, so this should never be called. + """ + raise NotImplementedError('FSMStateField is read-only. Use transitions to change state.') diff --git a/label_studio/projects/models.py b/label_studio/projects/models.py index 695e131f9e42..fc9d5790ad52 100644 --- a/label_studio/projects/models.py +++ b/label_studio/projects/models.py @@ -35,6 +35,7 @@ from django.utils.functional import cached_property from django.utils.translation import gettext_lazy as _ from fsm.models import FsmHistoryStateModel +from fsm.queryset_mixins import FSMStateQuerySetMixin from label_studio_sdk._extensions.label_studio_tools.core.label_config import parse_config from labels_manager.models import Label from projects.functions import ( @@ -62,7 +63,28 @@ logger = logging.getLogger(__name__) +class ProjectQuerySet(models.QuerySet): + pass + + +class ProjectQuerySetWithFSM(FSMStateQuerySetMixin, ProjectQuerySet): + """ + Custom QuerySet for Project model with FSM state annotation support. + """ + + pass + + class ProjectManager(models.Manager): + """ + Manager for Project model. + + Provides: + - User-scoped filtering + - Counter annotations for project statistics + - FSM state annotation support + """ + COUNTER_FIELDS = [ 'task_number', 'finished_task_number', @@ -85,11 +107,26 @@ class ProjectManager(models.Manager): 'skipped_annotations_number': annotate_skipped_annotations_number, } + def get_queryset(self): + """Return ProjectQuerySet with FSM state annotation support""" + return ProjectQuerySetWithFSM(self.model, using=self._db) + def for_user(self, user): - return self.filter(organization=user.active_organization) + return self.get_queryset().filter(organization=user.active_organization) + + def with_state(self): + """ + Return queryset with FSM state annotated. + + Example: + projects = Project.objects.with_state().filter(organization=org) + for project in projects: + print(project.current_state) # No N+1 queries! + """ + return self.get_queryset().annotate_fsm_state() def with_counts(self, fields=None): - return self.with_counts_annotate(self, fields=fields) + return self.with_counts_annotate(self.get_queryset(), fields=fields) @staticmethod def with_counts_annotate(queryset, fields=None, exclude=None): diff --git a/label_studio/tasks/models.py b/label_studio/tasks/models.py index 816b0a6b0bae..bc22b35b8071 100644 --- a/label_studio/tasks/models.py +++ b/label_studio/tasks/models.py @@ -38,6 +38,7 @@ from django.utils.timezone import now from django.utils.translation import gettext_lazy as _ from fsm.models import FsmHistoryStateModel +from fsm.queryset_mixins import FSMStateQuerySetMixin from label_studio_sdk.label_interface.objects import PredictionValue from rest_framework.exceptions import ValidationError from tasks.choices import ActionType @@ -565,9 +566,40 @@ def delete(self, *args, **kwargs): post_bulk_create = Signal() # providing args 'objs' and 'batch_size' +class AnnotationQuerySet(models.QuerySet): + pass + + +class AnnotationQuerySetWithFSM(FSMStateQuerySetMixin, AnnotationQuerySet): + """ + Custom QuerySet for Annotation model with FSM state annotation support. + """ + + pass + + class AnnotationManager(models.Manager): + """ + Manager for Annotation model with FSM state support. + + Provides: + - User-scoped filtering + - Bulk creation with signals + - FSM state annotation support + """ + + def get_queryset(self): + """Return AnnotationQuerySet with FSM state annotation support""" + # Create a dynamic class that mixes FSM support into the queryset + + return AnnotationQuerySetWithFSM(self.model, using=self._db) + def for_user(self, user): - return self.filter(project__organization=user.active_organization) + return self.get_queryset().filter(project__organization=user.active_organization) + + def with_state(self): + """Return queryset with FSM state annotated.""" + return self.get_queryset().annotate_fsm_state() def bulk_create(self, objs, batch_size=None): pre_bulk_create.send(sender=self.model, objs=objs, batch_size=batch_size) @@ -576,12 +608,6 @@ def bulk_create(self, objs, batch_size=None): return res -GET_UNIQUE_IDS = """ -with tt as ( - select jsonb_array_elements(tch.result) as item from task_completion_history tch - where task=%(t_id)s and task_annotation=%(tc_id)s -) select count( distinct tt.item -> 'id') from tt""" - AnnotationMixin = load_func(settings.ANNOTATION_MIXIN) @@ -814,7 +840,30 @@ def on_delete_update_counters(self): self.decrease_project_summary_counters() +class TaskLockQuerySet(models.QuerySet): + """Custom QuerySet for TaskLock model""" + + pass + + +class TaskLockManager(models.Manager): + """Manager for TaskLock with FSM state support""" + + def get_queryset(self): + """Return QuerySet with FSM state annotation support""" + # Create a dynamic class that mixes FSM support into the queryset + class TaskLockQuerySetWithFSM(FSMStateQuerySetMixin, TaskLockQuerySet): + pass + + return TaskLockQuerySetWithFSM(self.model, using=self._db) + + def with_state(self): + """Return queryset with FSM state annotated.""" + return self.get_queryset().annotate_fsm_state() + + class TaskLock(FsmHistoryStateModel): + objects = TaskLockManager() task = models.ForeignKey( 'tasks.Task', on_delete=models.CASCADE, @@ -832,7 +881,30 @@ class TaskLock(FsmHistoryStateModel): created_at = models.DateTimeField(_('created at'), auto_now_add=True, help_text='Creation time', null=True) +class AnnotationDraftQuerySet(models.QuerySet): + """Custom QuerySet for AnnotationDraft model""" + + pass + + +class AnnotationDraftManager(models.Manager): + """Manager for AnnotationDraft with FSM state support""" + + def get_queryset(self): + """Return QuerySet with FSM state annotation support""" + # Create a dynamic class that mixes FSM support into the queryset + class AnnotationDraftQuerySetWithFSM(FSMStateQuerySetMixin, AnnotationDraftQuerySet): + pass + + return AnnotationDraftQuerySetWithFSM(self.model, using=self._db) + + def with_state(self): + """Return queryset with FSM state annotated.""" + return self.get_queryset().annotate_fsm_state() + + class AnnotationDraft(FsmHistoryStateModel): + objects = AnnotationDraftManager() result = JSONField(_('result'), help_text='Draft result in JSON format') lead_time = models.FloatField( _('lead time'), diff --git a/label_studio/tasks/serializers.py b/label_studio/tasks/serializers.py index 0bdabe42b25e..e6c328f3aca4 100644 --- a/label_studio/tasks/serializers.py +++ b/label_studio/tasks/serializers.py @@ -3,7 +3,7 @@ import logging import ujson as json -from core.current_request import get_current_request +from core.current_request import CurrentContext, get_current_request from core.feature_flags import flag_set from core.label_config import replace_task_data_undefined_with_config_field from core.utils.common import load_func, retry_database_locked @@ -11,6 +11,7 @@ from django.conf import settings from django.db import IntegrityError, transaction from drf_spectacular.utils import extend_schema_field +from fsm.serializer_fields import FSMStateField from label_studio_sdk.label_interface import LabelInterface from projects.models import Project from rest_flex_fields import FlexFieldsModelSerializer @@ -121,7 +122,15 @@ class Meta: class AnnotationSerializer(FlexFieldsModelSerializer): - """ """ + """ + Annotation Serializer with FSM state support. + + Note: The 'state' field will be populated from the queryset annotation + if present, preventing N+1 queries. Use .with_state() on your queryset. + """ + + state = FSMStateField(read_only=True) # FSM state - automatically uses annotation if present + """""" result = AnnotationResultField(required=False) created_username = serializers.SerializerMethodField(default='', read_only=True, help_text='Username string') @@ -168,6 +177,17 @@ def get_created_username(self, annotation) -> str: name += f' {user.email}, {user.id}' return name + def to_representation(self, obj): + """Remove state field if feature flags are disabled""" + ret = super().to_representation(obj) + user = CurrentContext.get_user() + if not ( + flag_set('fflag_feat_fit_568_finite_state_management', user=user) + and flag_set('fflag_feat_fit_710_fsm_state_fields', user=user) + ): + ret.pop('state', None) + return ret + class Meta: model = Annotation exclude = ['prediction', 'result_count'] @@ -710,7 +730,14 @@ class Meta: class AnnotationDraftSerializer(ModelSerializer): + """ + AnnotationDraft Serializer with FSM state support. + + Note: The 'state' field will be populated from the queryset annotation + if present, preventing N+1 queries. Use .with_state() on your queryset. + """ + state = FSMStateField(read_only=True) # FSM state - automatically uses annotation if present user = serializers.CharField(default=serializers.CurrentUserDefault()) created_username = serializers.SerializerMethodField(default='', read_only=True, help_text='User name string') created_ago = serializers.CharField(default='', read_only=True, help_text='Delta time from creation time') @@ -727,6 +754,17 @@ def get_created_username(self, draft): name += (' ' if name else '') + f'{user.email}, {user.id}' return name + def to_representation(self, obj): + """Remove state field if feature flags are disabled""" + ret = super().to_representation(obj) + user = CurrentContext.get_user() + if not ( + flag_set('fflag_feat_fit_568_finite_state_management', user=user) + and flag_set('fflag_feat_fit_710_fsm_state_fields', user=user) + ): + ret.pop('state', None) + return ret + class Meta: model = AnnotationDraft fields = '__all__'