Commit 8a24dd7

Merge pull request #1319 from rapidpro/update-backfill-stats
Rebackfill the poll stats counter and engagement counter after dedupl…
2 parents: 483da2b + 8665fea

1 file changed: 193 additions, 0 deletions
@@ -0,0 +1,193 @@
# Generated by Django 5.2.8 on 2025-12-01 13:54

import time
from datetime import timedelta

from django.core.cache import cache
from django.db import migrations
from django.db.models import Count
from django.utils import timezone

from ureport.utils import chunk_list


def noop(apps, schema_editor):  # pragma: no cover
    pass


def dedupe_poll_stats_by_questions(apps, schema_editor):  # pragma: no cover
    PollStats = apps.get_model("stats", "PollStats")
    FlowResult = apps.get_model("flows", "FlowResult")

    # FlowResults linked to two or more PollQuestions ended up with duplicated stats rows.
    results_with_duplicate_questions = (
        FlowResult.objects.all().annotate(qs_count=Count("pollquestion")).filter(qs_count__gte=2).order_by("id")
    )

    for flow_result in results_with_duplicate_questions:
        questions = flow_result.pollquestion_set.all()
        if not questions:
            continue
        first_question = questions[0]
        # Delete the stats rows attached to the duplicate questions, keeping only the first question's rows.
        stats = (
            PollStats.objects.exclude(question=None).filter(flow_result=flow_result).exclude(question=first_question)
        )
        if stats.exists():
            print(f"Found {stats.count()} PollStats duplicates for FlowResult ID {flow_result.id}")
            stats.delete()
        else:
            print(f"No duplicate PollStats for FlowResult ID {flow_result.id}")
        # Detach the surviving rows from the question so they are keyed by flow_result only.
        updated = (
            PollStats.objects.exclude(question=None)
            .filter(flow_result=flow_result)
            .filter(question=first_question)
            .update(question=None)
        )
        print(f"Deduped PollStats for FlowResult ID {flow_result.id}, updated {updated} entries")
        print("======================")


def backfill_poll_stats_counters(apps, schema_editor):  # pragma: no cover
    PollStats = apps.get_model("stats", "PollStats")

    GenderSegment = apps.get_model("stats", "GenderSegment")
    AgeSegment = apps.get_model("stats", "AgeSegment")
    SchemeSegment = apps.get_model("stats", "SchemeSegment")

    PollStatsCounter = apps.get_model("stats", "PollStatsCounter")
    PollEngagementDailyCount = apps.get_model("stats", "PollEngagementDailyCount")

    Poll = apps.get_model("polls", "Poll")
    PollQuestion = apps.get_model("polls", "PollQuestion")
    Boundary = apps.get_model("locations", "Boundary")
    STATE_LEVEL = 1
    DISTRICT_LEVEL = 2
    WARD_LEVEL = 3

    # Only rebuild counters for polls that are no longer syncing.
    stopped_polls = list(Poll.objects.filter(stopped_syncing=True).values_list("id", flat=True))
    flow_result_ids = list(
        PollQuestion.objects.filter(poll_id__in=stopped_polls).values_list("flow_result_id", flat=True)
    )

    # Progress is checkpointed in the cache so the backfill can resume where it left off.
    last_backfilled_poll_stats_id_key = "deduplicated_migrations_backfilled_poll_stats_last_id"
    last_id = cache.get(last_backfilled_poll_stats_id_key, 0)

    # On a fresh run, clear the previously generated counters before rebuilding them.
    if last_id == 0:
        poll_stats_counters_ids = PollStatsCounter.objects.filter(flow_result_id__in=flow_result_ids)
        poll_stats_counters_ids = list(poll_stats_counters_ids.values_list("pk", flat=True))

        poll_stats_counters_ids_count = len(poll_stats_counters_ids)

        for batch in chunk_list(poll_stats_counters_ids, 1000):
            batch_ids = list(batch)
            PollStatsCounter.objects.filter(pk__in=batch_ids).delete()

        poll_engagement_daily_count_ids = PollEngagementDailyCount.objects.filter(flow_result_id__in=flow_result_ids)
        poll_engagement_daily_count_ids = list(poll_engagement_daily_count_ids.values_list("pk", flat=True))

        poll_engagement_daily_count_ids_count = len(poll_engagement_daily_count_ids)

        for batch in chunk_list(poll_engagement_daily_count_ids, 1000):
            batch_ids = list(batch)
            PollEngagementDailyCount.objects.filter(pk__in=batch_ids).delete()
        print(
            "Backfill for the first time, Deleted %d PollStatsCounter and %d PollEngagementDailyCount entries"
            % (poll_stats_counters_ids_count, poll_engagement_daily_count_ids_count)
        )

    poll_stat_ids = list(
        PollStats.objects.filter(flow_result_id__in=flow_result_ids, id__gt=last_id)
        .order_by("id")
        .values_list("id", flat=True)
    )
    total = len(poll_stat_ids)
    print(f"Total PollStats to migrate: {total}")
    start_time = time.time()

    # Cache segment and boundary lookups to avoid per-row queries.
    gender_dict = {elt.id: elt.gender.lower() for elt in GenderSegment.objects.all()}
    age_dict = {elt.id: elt.min_age for elt in AgeSegment.objects.all()}
    scheme_dict = {elt.id: elt.scheme.lower() for elt in SchemeSegment.objects.all()}

    boundaries = Boundary.objects.all().select_related("parent__parent")
    location_dict = {elt.id: elt for elt in boundaries}

    processed = 0
    for batch in chunk_list(poll_stat_ids, 1000):
        batch_ids = list(batch)
        stats = PollStats.objects.filter(id__in=batch_ids)

        poll_stats_counter_obj_to_insert = []
        poll_engagement_daily_count_obj_to_insert = []

        for stat in stats:
            stat_counter_kwargs = dict(
                org_id=stat.org_id,
                flow_result_id=stat.flow_result_id,
                flow_result_category_id=stat.flow_result_category_id,
                count=stat.count,
            )
            engagement_counter_kwargs = dict()
            # Engagement daily counts are only kept for roughly the last 400 days.
            if stat.date is not None and stat.date >= (timezone.now() - timedelta(days=400)):
                engagement_counter_kwargs = dict(
                    org_id=stat.org_id,
                    flow_result_id=stat.flow_result_id,
                    is_responded=bool(stat.flow_result_category_id),
                    day=stat.date.date(),
                    count=stat.count,
                )

            # Every stat contributes to the "all" scope plus any demographic and location scopes.
            scopes = ["all"]
            if stat.age_segment_id and age_dict.get(stat.age_segment_id) is not None:
                scopes.append(f"age:{age_dict.get(stat.age_segment_id)}")
            if stat.gender_segment_id and gender_dict.get(stat.gender_segment_id) is not None:
                scopes.append(f"gender:{gender_dict.get(stat.gender_segment_id)}")
            if stat.scheme_segment_id and scheme_dict.get(stat.scheme_segment_id) is not None:
                scopes.append(f"scheme:{scheme_dict.get(stat.scheme_segment_id)}")
            if stat.location_id:
                location = location_dict.get(stat.location_id)
                if location:
                    if location.level == WARD_LEVEL:
                        scopes.append(f"ward:{location.osm_id.upper()}")
                        if location.parent:
                            scopes.append(f"district:{location.parent.osm_id.upper()}")
                        if location.parent and location.parent.parent:
                            scopes.append(f"state:{location.parent.parent.osm_id.upper()}")
                    if location.level == DISTRICT_LEVEL:
                        scopes.append(f"district:{location.osm_id.upper()}")
                        if location.parent:
                            scopes.append(f"state:{location.parent.osm_id.upper()}")
                    if location.level == STATE_LEVEL:
                        scopes.append(f"state:{location.osm_id.upper()}")

            for scope in scopes:
                poll_stats_counter_obj_to_insert.append(PollStatsCounter(**stat_counter_kwargs, scope=scope))
                # District and ward scopes are not tracked for engagement counts.
                if engagement_counter_kwargs and "district:" not in scope and "ward:" not in scope:
                    poll_engagement_daily_count_obj_to_insert.append(
                        PollEngagementDailyCount(**engagement_counter_kwargs, scope=scope)
                    )

        PollStatsCounter.objects.bulk_create(poll_stats_counter_obj_to_insert, batch_size=1000)
        PollEngagementDailyCount.objects.bulk_create(poll_engagement_daily_count_obj_to_insert, batch_size=1000)

        cache.set(last_backfilled_poll_stats_id_key, batch_ids[-1], None)

        processed += len(batch_ids)
        elapsed = time.time() - start_time
        print(f"Backfilled {processed} of {total} PollStats in {elapsed:.1f} seconds")


def apply_manual():  # pragma: no cover
    from django.apps import apps

    dedupe_poll_stats_by_questions(apps, None)
    backfill_poll_stats_counters(apps, None)


class Migration(migrations.Migration):

    dependencies = [
        ("stats", "0032_backfill_poll_stats_counters"),
    ]

    operations = [
        migrations.RunPython(dedupe_poll_stats_by_questions, noop),
        migrations.RunPython(backfill_poll_stats_counters, noop),
    ]
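The migration also exposes an apply_manual() helper so the two data functions can be re-run outside of the normal migrate command, for example to resume an interrupted backfill, since progress is checkpointed in the cache. Below is a minimal sketch of doing that from a Django shell; it assumes the stats app lives under the ureport package, and the migration module name used here is hypothetical because the filename is not shown in this diff.

# Minimal sketch: re-running the data backfill by hand from `python manage.py shell`.
# Assumption: the migration file sits in ureport/stats/migrations/; the module name
# "0033_update_backfill_stats" is hypothetical -- substitute the actual filename.
from importlib import import_module

migration = import_module("ureport.stats.migrations.0033_update_backfill_stats")

# Equivalent to what apply_manual() does: run the dedupe pass first, then the backfill.
# Because the backfill stores the last processed PollStats id in the cache, re-running
# this after an interruption continues from the checkpoint instead of starting over.
migration.apply_manual()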
