# Generated by Django 5.2.8 on 2025-12-01 13:54
import time
from datetime import timedelta

from django.core.cache import cache
from django.db import migrations
from django.db.models import Count
from django.utils import timezone

from ureport.utils import chunk_list


def noop(apps, schema_editor):  # pragma: no cover
    """Reverse no-op so the migration can be unapplied without undoing the backfill."""
    pass


def dedupe_poll_stats_by_questions(apps, schema_editor):  # pragma: no cover
    """
    Collapse PollStats rows for FlowResults linked to more than one PollQuestion.

    For each FlowResult with 2+ questions: delete the PollStats attached to every
    question except the first one, then detach the surviving stats from that first
    question (question=None) so they are keyed by flow_result alone.
    """
    PollStats = apps.get_model("stats", "PollStats")
    FlowResult = apps.get_model("flows", "FlowResult")

    results_with_duplicate_questions = (
        FlowResult.objects.all().annotate(qs_count=Count("pollquestion")).filter(qs_count__gte=2).order_by("id")
    )

    for flow_result in results_with_duplicate_questions:
        # Order explicitly so the question we keep is deterministic across runs;
        # an unordered queryset leaves the choice to the database.
        questions = flow_result.pollquestion_set.all().order_by("pk")
        if not questions:
            continue
        first_question = questions[0]
        stats = (
            PollStats.objects.exclude(question=None).filter(flow_result=flow_result).exclude(question=first_question)
        )
        # Single DELETE round-trip instead of exists() + count() + delete();
        # delete() already returns the number of rows removed.
        deleted_count, _ = stats.delete()
        if deleted_count:
            print(f"Found {deleted_count} PollStats duplicates for FlowResult ID {flow_result.id}")
        else:
            print(f"No duplicate PollStats for FlowResult ID {flow_result.id}")
        updated = (
            PollStats.objects.exclude(question=None)
            .filter(flow_result=flow_result)
            .filter(question=first_question)
            .update(question=None)
        )
        print(f"Deduped PollStats for FlowResult ID {flow_result.id}, updated {updated} entries")
        print("======================")


def backfill_poll_stats_counters(apps, schema_editor):  # pragma: no cover
    """
    Rebuild PollStatsCounter and PollEngagementDailyCount rows from PollStats
    for polls that have stopped syncing.

    Resumable: the id of the last processed PollStats row is checkpointed in the
    cache after every batch, so a crashed or restarted run continues where it
    left off. On a fresh run (last_id == 0) the existing counters for the
    affected flow results are deleted first, then recreated batch by batch.
    """
    PollStats = apps.get_model("stats", "PollStats")

    GenderSegment = apps.get_model("stats", "GenderSegment")
    AgeSegment = apps.get_model("stats", "AgeSegment")
    SchemeSegment = apps.get_model("stats", "SchemeSegment")

    PollStatsCounter = apps.get_model("stats", "PollStatsCounter")
    PollEngagementDailyCount = apps.get_model("stats", "PollEngagementDailyCount")

    Poll = apps.get_model("polls", "Poll")
    PollQuestion = apps.get_model("polls", "PollQuestion")
    Boundary = apps.get_model("locations", "Boundary")
    # Boundary.level values for the admin hierarchy (state > district > ward).
    STATE_LEVEL = 1
    DISTRICT_LEVEL = 2
    WARD_LEVEL = 3

    stopped_polls = list(Poll.objects.filter(stopped_syncing=True).values_list("id", flat=True))
    flow_result_ids = list(
        PollQuestion.objects.filter(poll_id__in=stopped_polls).values_list("flow_result_id", flat=True)
    )

    last_backfilled_poll_stats_id_key = "deduplicated_migrations_backfilled_poll_stats_last_id"
    last_id = cache.get(last_backfilled_poll_stats_id_key, 0)

    if last_id == 0:
        # Fresh run: clear any counters previously derived for these flow results
        # so the rebuild below does not double-count.
        poll_stats_counters_ids = PollStatsCounter.objects.filter(flow_result_id__in=flow_result_ids)
        poll_stats_counters_ids = list(poll_stats_counters_ids.values_list("pk", flat=True))

        poll_stats_counters_ids_count = len(poll_stats_counters_ids)

        for batch in chunk_list(poll_stats_counters_ids, 1000):
            batch_ids = list(batch)
            PollStatsCounter.objects.filter(pk__in=batch_ids).delete()

        poll_engagement_daily_count_ids = PollEngagementDailyCount.objects.filter(flow_result_id__in=flow_result_ids)
        poll_engagement_daily_count_ids = list(poll_engagement_daily_count_ids.values_list("pk", flat=True))

        poll_engagement_daily_count_ids_count = len(poll_engagement_daily_count_ids)

        for batch in chunk_list(poll_engagement_daily_count_ids, 1000):
            batch_ids = list(batch)
            PollEngagementDailyCount.objects.filter(pk__in=batch_ids).delete()
        print(
            "Backfill for the first time, Deleted %d PollStatsCounter and %d PollEngagementDailyCount entries"
            % (poll_stats_counters_ids_count, poll_engagement_daily_count_ids_count)
        )

    poll_stat_ids = list(
        PollStats.objects.filter(flow_result_id__in=flow_result_ids, id__gt=last_id)
        .order_by("id")
        .values_list("id", flat=True)
    )
    total = len(poll_stat_ids)
    print(f"Total PollStats to migrate: {total}")
    start_time = time.time()

    # Lookup tables built once up front so the per-stat loop never hits the DB
    # for segment or boundary data.
    gender_dict = {elt.id: elt.gender.lower() for elt in GenderSegment.objects.all()}
    age_dict = {elt.id: elt.min_age for elt in AgeSegment.objects.all()}
    scheme_dict = {elt.id: elt.scheme.lower() for elt in SchemeSegment.objects.all()}

    boundaries = Boundary.objects.all().select_related("parent__parent")
    location_dict = {elt.id: elt for elt in boundaries}

    # Loop-invariant: compute the 400-day engagement cutoff once instead of
    # calling timezone.now() per PollStats row; this also keeps the window
    # consistent for the whole run instead of drifting as the migration runs.
    engagement_cutoff = timezone.now() - timedelta(days=400)

    processed = 0
    for batch in chunk_list(poll_stat_ids, 1000):
        batch_ids = list(batch)
        stats = PollStats.objects.filter(id__in=batch_ids)

        poll_stats_counter_obj_to_insert = []
        poll_engagement_daily_count_obj_to_insert = []

        for stat in stats:
            stat_counter_kwargs = dict(
                org_id=stat.org_id,
                flow_result_id=stat.flow_result_id,
                flow_result_category_id=stat.flow_result_category_id,
                count=stat.count,
            )
            engagement_counter_kwargs = dict()
            # Only stats from the last ~400 days feed the daily engagement counts.
            if stat.date is not None and stat.date >= engagement_cutoff:
                engagement_counter_kwargs = dict(
                    org_id=stat.org_id,
                    flow_result_id=stat.flow_result_id,
                    is_responded=bool(stat.flow_result_category_id),
                    day=stat.date.date(),
                    count=stat.count,
                )

            # Every stat counts toward "all", plus one scope per known segment,
            # plus its location level and every ancestor level.
            scopes = ["all"]
            if stat.age_segment_id and age_dict.get(stat.age_segment_id) is not None:
                scopes.append(f"age:{age_dict.get(stat.age_segment_id)}")
            if stat.gender_segment_id and gender_dict.get(stat.gender_segment_id) is not None:
                scopes.append(f"gender:{gender_dict.get(stat.gender_segment_id)}")
            if stat.scheme_segment_id and scheme_dict.get(stat.scheme_segment_id) is not None:
                scopes.append(f"scheme:{scheme_dict.get(stat.scheme_segment_id)}")
            if stat.location_id:
                location = location_dict.get(stat.location_id)
                if location:
                    if location.level == WARD_LEVEL:
                        scopes.append(f"ward:{location.osm_id.upper()}")
                        if location.parent:
                            scopes.append(f"district:{location.parent.osm_id.upper()}")
                        if location.parent and location.parent.parent:
                            scopes.append(f"state:{location.parent.parent.osm_id.upper()}")
                    if location.level == DISTRICT_LEVEL:
                        scopes.append(f"district:{location.osm_id.upper()}")
                        if location.parent:
                            scopes.append(f"state:{location.parent.osm_id.upper()}")
                    if location.level == STATE_LEVEL:
                        scopes.append(f"state:{location.osm_id.upper()}")

            for scope in scopes:
                poll_stats_counter_obj_to_insert.append(PollStatsCounter(**stat_counter_kwargs, scope=scope))
                # Engagement is not tracked at district/ward granularity.
                if engagement_counter_kwargs and "district:" not in scope and "ward:" not in scope:
                    poll_engagement_daily_count_obj_to_insert.append(
                        PollEngagementDailyCount(**engagement_counter_kwargs, scope=scope)
                    )

        PollStatsCounter.objects.bulk_create(poll_stats_counter_obj_to_insert, batch_size=1000)
        PollEngagementDailyCount.objects.bulk_create(poll_engagement_daily_count_obj_to_insert, batch_size=1000)

        # Checkpoint (no timeout) so an interrupted run resumes after the last
        # fully inserted batch.
        cache.set(last_backfilled_poll_stats_id_key, batch_ids[-1], None)

        processed += len(batch_ids)
        elapsed = time.time() - start_time
        print(f"Backfilled {processed} of {total} PollStats in {elapsed:.1f} seconds")


def apply_manual():  # pragma: no cover
    """Run both data migrations by hand (e.g. from a shell), outside the migration runner."""
    from django.apps import apps

    dedupe_poll_stats_by_questions(apps, None)
    backfill_poll_stats_counters(apps, None)


class Migration(migrations.Migration):

    dependencies = [
        ("stats", "0032_backfill_poll_stats_counters"),
    ]

    operations = [
        migrations.RunPython(dedupe_poll_stats_by_questions, noop),
        migrations.RunPython(backfill_poll_stats_counters, noop),
    ]