Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 35 additions & 20 deletions api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import urllib.parse
from collections import defaultdict

import pandas as pd
from django.core.cache import cache
from django.db.models import Count, Sum
from django.db.models.functions import TruncDate
Expand Down Expand Up @@ -122,21 +121,39 @@ def _per_day_counts(model, since: datetime.date | None = None) -> list[dict]:

def _resample_per_day(
rows: list[dict], since: datetime.date | None = None
) -> pd.DataFrame:
) -> list[dict]:
"""Fill gaps so every calendar day has a count.

The series always extends to today; when ``since`` is given it also starts
exactly at ``since`` (padding the leading days with zeros) so the window has
a fixed length regardless of when the first event happened.
Returns ``[{"day": date, "count": int}, ...]`` for every calendar day in the
window, in ascending order, padding missing days with ``0``. The series
always extends to today; when ``since`` is given it also starts exactly at
``since`` so the window has a fixed length regardless of when the first
event happened.

A plain loop replaces the former ``pandas.DataFrame.asfreq`` round-trip,
which pulled in the heavy pandas dependency only to forward-fill zero days.
"""
today = datetime.date.today()
if since is not None and (len(rows) == 0 or rows[0]["day"] != since):
rows = [{"day": since, "count": 0}] + rows
if len(rows) == 0 or rows[-1]["day"] != today:
rows = rows + [{"day": today, "count": 0}]
frame = pd.DataFrame(rows).set_index("day").asfreq("1D", fill_value=0)
frame["day"] = frame.index.map(lambda x: x.to_pydatetime().date())
return frame
counts = {row["day"]: int(row["count"]) for row in rows}
if since is not None:
start = since
elif rows:
start = rows[0]["day"]
else:
start = today
# 終端は常に今日まで伸ばす(最後のイベント日が今日より前でも 0 埋めで今日まで含める)。
end = max(today, start)
result: list[dict] = []
day = start
while day <= end:
result.append({"day": day, "count": counts.get(day, 0)})
day += datetime.timedelta(days=1)
return result


def _mean_per_day(rows: list[dict]) -> float:
"""Mean of the daily counts over the window (matches pandas ``Series.mean``)."""
return sum(row["count"] for row in rows) / len(rows) if rows else 0.0


class HealthCheckAPI(APIView):
Expand Down Expand Up @@ -191,8 +208,7 @@ def get(self, request, format=None) -> Response:
since = _since(days)

def produce():
frame = _resample_per_day(_per_day_counts(AnimeUser, since), since)
return frame.to_dict(orient="records")
return _resample_per_day(_per_day_counts(AnimeUser, since), since)

return Response({"data": _cached(f"stats:active-user-per-day:{days}", produce)})

Expand All @@ -210,8 +226,7 @@ def get(self, request, format=None) -> Response:
since = _since(days)

def produce():
frame = _resample_per_day(_per_day_counts(AnimeRoom, since), since)
return frame.to_dict(orient="records")
return _resample_per_day(_per_day_counts(AnimeRoom, since), since)

return Response({"data": _cached(f"stats:active-room-per-day:{days}", produce)})

Expand Down Expand Up @@ -338,8 +353,8 @@ def shields_data(self) -> dict:

class RoomCountParDayShieldsAPI(_ShieldsView):
def shields_data(self) -> dict:
frame = _resample_per_day(_per_day_counts(AnimeRoom))
mean = "{:.2f}".format(frame["count"].mean()) + "/day"
rows = _resample_per_day(_per_day_counts(AnimeRoom))
mean = f"{_mean_per_day(rows):.2f}/day"
return {
"label": "Room",
"message": mean,
Expand All @@ -355,8 +370,8 @@ def shields_data(self) -> dict:

class UserCountParDayShieldsAPI(_ShieldsView):
def shields_data(self) -> dict:
frame = _resample_per_day(_per_day_counts(AnimeUser))
mean = "{:.2f}".format(frame["count"].mean()) + "/day"
rows = _resample_per_day(_per_day_counts(AnimeUser))
mean = f"{_mean_per_day(rows):.2f}/day"
return {
"label": "User",
"message": mean,
Expand Down
4 changes: 3 additions & 1 deletion d_party/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@

ALLOWED_HOSTS = [os.environ["MY_DOMAIN"], "www." + os.environ["MY_DOMAIN"], "django"]
if DEBUG:
ALLOWED_HOSTS += ["*"]
# 開発・テスト用の明示的な許可ホスト。``["*"]`` だと誤って DEBUG=1 のまま本番に
# 出たとき Host ヘッダインジェクションを許してしまうため、ワイルドカードは使わない。
ALLOWED_HOSTS += ["localhost", "127.0.0.1", "[::1]", "testserver"]
CSRF_TRUSTED_ORIGINS = [
"https://*." + os.environ["MY_DOMAIN"],
"https://*.127.0.0.1",
Expand Down
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ dependencies = [
"django-extensions>=3.2.3",
"pydantic>=2.9",
"psycopg[binary]>=3.2",
"pandas>=2.2",
"gunicorn>=23.0",
"uvicorn[standard]>=0.34",
"uvicorn-worker>=0.3",
Expand Down
94 changes: 49 additions & 45 deletions streamer/consumers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import uuid

from channels.db import database_sync_to_async
from django.db import transaction

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [mypy] reported by reviewdog 🐶
Cannot find implementation or library stub for module named "django.db" [import-not-found]

from djangochannelsrestframework.decorators import action
from djangochannelsrestframework.generics import GenericAsyncAPIConsumer

Expand All @@ -26,7 +27,7 @@
VideoOperation,
)
from .models import AnimeReaction, AnimeRoom, AnimeUser, ReactionType
from .util import is_valid_uuid, uuid_json_encoder
from .util import is_valid_uuid

# ホストの WS が一瞬落ちた / タブをリロードしただけでルームが即消え
# すると、ゲストが共有リンクを踏んだときにもう failed_join になる。
Expand Down Expand Up @@ -84,7 +85,6 @@ class AnimePartyConsumer(GenericAsyncAPIConsumer):

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
json.JSONEncoder.default = uuid_json_encoder
# 入室したAnimeRoomのオブジェクト
self.anime_room = None
# ユーザー情報
Expand Down Expand Up @@ -120,7 +120,7 @@ async def create(
)
user = User(**self.anime_user.__dict__)
create = Create(room_id=self.anime_room.room_id, user=user)
await self.send(text_data=json.dumps(create.model_dump()))
await self.send(text_data=json.dumps(create.model_dump(mode="json")))
user_list = await self.database_user_list()
user_list_data = UserList(user_list=user_list)
response_data = RoomSend(
Expand All @@ -129,7 +129,7 @@ async def create(
)
await self.channel_layer.group_send(
str(self.anime_room.room_id),
json.loads(json.dumps(response_data.model_dump())),
response_data.model_dump(mode="json"),
)

@action()
Expand All @@ -152,7 +152,7 @@ async def join(
# 早期 return するだけで WS が開いたままになり、後続の sync_request 等が
# self.anime_user.__dict__ で AttributeError を起こして 1011 close を招く。
failed = ServerMessage(message_type="failed_join")
await self.send(text_data=json.dumps(failed.model_dump()))
await self.send(text_data=json.dumps(failed.model_dump(mode="json")))
await self.close()
return
# このルームに猶予期間の削除予約があれば取り消す
Expand All @@ -166,7 +166,7 @@ async def join(
)
user = User(**self.anime_user.__dict__)
join = Join(room_id=self.anime_room.room_id, user=user)
await self.send(text_data=json.dumps(join.model_dump()))
await self.send(text_data=json.dumps(join.model_dump(mode="json")))
user_add = UserAdd(user=user)
response_data = GroupSend(
response=user_add,
Expand All @@ -175,7 +175,7 @@ async def join(
await self.database_increase_num_people()
await self.channel_layer.group_send(
str(self.anime_room.room_id),
json.loads(json.dumps(response_data.model_dump())),
response_data.model_dump(mode="json"),
)
user_list = await self.database_user_list()
user_list_data = UserList(user_list=user_list)
Expand All @@ -185,7 +185,7 @@ async def join(
)
await self.channel_layer.group_send(
str(self.anime_room.room_id),
json.loads(json.dumps(response_data.model_dump())),
response_data.model_dump(mode="json"),
)

@action()
Expand Down Expand Up @@ -226,7 +226,7 @@ async def video_operation(self, operation: str, option: dict, **kwargs):
await self.database_update_room_part_id(video_operation.option.part_id)
await self.channel_layer.group_send(
str(self.anime_room.room_id),
json.loads(json.dumps(response_data.model_dump())),
response_data.model_dump(mode="json"),
)

@action()
Expand All @@ -244,7 +244,7 @@ async def sync_request(self, **kwargs):
)
await self.channel_layer.group_send(
str(self.anime_room.room_id),
json.loads(json.dumps(response_data.model_dump())),
response_data.model_dump(mode="json"),
)

@action()
Expand All @@ -267,7 +267,7 @@ async def sync_response(self, to_user: uuid, option: dict, **kwargs):
)
await self.channel_layer.group_send(
str(self.anime_room.room_id),
json.loads(json.dumps(response_data.model_dump())),
response_data.model_dump(mode="json"),
)

@action()
Expand All @@ -290,7 +290,7 @@ async def operation_notification(self, operation: str, **kwargs):
)
await self.channel_layer.group_send(
str(self.anime_room.room_id),
json.loads(json.dumps(response_data.model_dump())),
response_data.model_dump(mode="json"),
)

@action()
Expand All @@ -316,7 +316,7 @@ async def reaction(self, reaction_type: str, **kwargs):
)
await self.channel_layer.group_send(
str(self.anime_room.room_id),
json.loads(json.dumps(response_data.model_dump())),
response_data.model_dump(mode="json"),
)
if reaction_type in ReactionType.__members__:
await self.database_create_reaction(reaction_type=reaction_type)
Expand All @@ -328,7 +328,7 @@ async def user_list(self, **kwargs):
return
user_list = await self.database_user_list()
response_data = UserList(user_list=user_list)
await self.send(text_data=json.dumps(response_data.model_dump()))
await self.send(text_data=json.dumps(response_data.model_dump(mode="json")))

@action()
async def delete_room(self, **kwargs):
Expand All @@ -352,7 +352,7 @@ async def delete_room(self, **kwargs):
)
await self.channel_layer.group_send(
room_id_str,
json.loads(json.dumps(response_data.model_dump())),
response_data.model_dump(mode="json"),
)
await self.database_delete_room_and_users()

Expand Down Expand Up @@ -418,19 +418,19 @@ async def leave_party(self):
# cancel される。
_schedule_room_delete(str(self.anime_room.room_id))
if user_count >= 1 and self.anime_user.is_host:
next_host = await self.database_get_next_host_or_none()
await self.database_host_change_user(next_host.user_id)
server_message = ServerMessage(message_type="host_change")
send_data = HostSend(
response=server_message, sender_channel_name=self.channel_name
)
await self.channel_layer.group_send(
str(self.anime_room.room_id),
json.loads(json.dumps(send_data.model_dump())),
)
next_host = await self.database_promote_next_host()
if next_host is not None:
server_message = ServerMessage(message_type="host_change")
send_data = HostSend(
response=server_message, sender_channel_name=self.channel_name
)
await self.channel_layer.group_send(
str(self.anime_room.room_id),
send_data.model_dump(mode="json"),
)
await self.channel_layer.group_send(
str(self.anime_room.room_id),
json.loads(json.dumps(response_data.model_dump())),
response_data.model_dump(mode="json"),
)
user_list = await self.database_user_list()
user_list_data = UserList(user_list=user_list)
Expand All @@ -440,7 +440,7 @@ async def leave_party(self):
)
await self.channel_layer.group_send(
str(self.anime_room.room_id),
json.loads(json.dumps(response_data.model_dump())),
response_data.model_dump(mode="json"),
)
await self.channel_layer.group_discard(
str(self.anime_room.room_id), self.channel_name
Expand Down Expand Up @@ -472,9 +472,8 @@ def database_create_user(

@database_sync_to_async
def database_delete_user(self):
"""データベースからユーザーを削除する"""
"""データベースからユーザーを削除する(論理削除)"""
self.anime_user.delete()
self.anime_user.save()

@database_sync_to_async
def database_create_room(self, part_id: str, title: str = ""):
Expand Down Expand Up @@ -502,12 +501,6 @@ def database_update_room_part_id(self, part_id: str):
self.anime_room.part_id = part_id
self.anime_room.save()

@database_sync_to_async
def database_delete_room(self):
"""ルームの論理削除を行う"""
self.anime_room.delete()
self.anime_room.save()

@database_sync_to_async
def database_delete_room_and_users(self):
"""ルームと、その中の生存ユーザーをまとめて論理削除する。
Expand Down Expand Up @@ -538,9 +531,27 @@ def database_decrease_num_people(self):
self.anime_room.save()

@database_sync_to_async
def database_get_next_host_or_none(self):
ar = AnimeRoom.objects.get(room_id=self.anime_room.room_id)
return ar.inroom.alive().earliest("created_at")
def database_promote_next_host(self):
"""残った生存ユーザーのうち最古の 1 人をホストへアトミックに昇格させる。

次ホストの選定と昇格を 1 トランザクション(``select_for_update``)で行う。
「選定 → 昇格」の await 境界で当人が離脱し、論理削除済みの行をホストに
昇格させてしまう競合を防ぐ(旧実装は ``.earliest()`` で空のとき
``DoesNotExist`` を投げて 1011 close を招いていた)。ルームが空なら ``None``。
"""
with transaction.atomic():
next_user = (
AnimeUser.objects.select_for_update()
.alive()
.filter(room_id=self.anime_room.room_id)
.order_by("created_at")
.first()
)
if next_user is None:
return None
next_user.is_host = True
next_user.save(update_fields=["is_host"])
return next_user

@database_sync_to_async
def database_get_user_count(self):
Expand All @@ -564,13 +575,6 @@ def database_get_or_none_room(self, room_id):
# 無視していたため、削除直後のルームに join できてしまうバグがあった)。
return AnimeRoom.objects.alive().filter(room_id=room_id).first()

@database_sync_to_async
def database_host_change_user(self, user_id):
au = AnimeUser.objects.get(user_id=user_id)
au.is_host = True
au.save()
return au

@database_sync_to_async
def database_renew_state(self):
"""インスタンス化しているユーザー情報とルーム情報をデータベースに合わせる"""
Expand Down
3 changes: 2 additions & 1 deletion streamer/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ def delete(self, using=None, keep_parents=False, hard=False):
if hard:
return super().delete(using=using, keep_parents=keep_parents)
self.deleted_at = self.get_deleted_value()
return self.save()
self.save(using=using, update_fields=[DELETE_FLAG_FIELD])
return (1, {self._meta.label: 1})

def revive(self, force_update=False, using=None):
self.deleted_at = None
Expand Down
Loading
Loading