Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ HEYBLOG_PUBLIC_BASE_URL=http://127.0.0.1:3000
HEYBLOG_EMAIL_PROVIDER=disabled
HEYBLOG_EMAIL_FROM=no-reply@example.com
HEYBLOG_EMAIL_DEV_EXPOSE_TOKENS=false
# Enable the persistence service's background scheduler that refreshes admin hourly statistics at each UTC hour boundary.
HEYBLOG_ADMIN_STATS_SCHEDULER_ENABLED=true
HEYBLOG_SMTP_HOST=smtp.example.com
HEYBLOG_SMTP_PORT=587
HEYBLOG_SMTP_USERNAME=
Expand Down
3 changes: 2 additions & 1 deletion doc/api-docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -695,7 +695,7 @@ Admin API 同样由 `backend` 暴露,但统一位于 `/api/admin/*` 下,并

#### `GET /api/admin/hourly-stats`

用途:返回后台统计小时快照,并在读取时刷新当前自然小时的数据。该接口位于 admin API 下,需要 `Authorization: Bearer <HEYBLOG_ADMIN_TOKEN>` 或已验证 admin 用户 session token。
用途:返回后台统计小时快照。快照由 `persistence-api` 后台整点任务刷新,不依赖打开 admin 页面;该接口位于 admin API 下,需要 `Authorization: Bearer <HEYBLOG_ADMIN_TOKEN>` 或已验证 admin 用户 session token。

查询参数:

Expand All @@ -704,6 +704,7 @@ Admin API 同样由 `backend` 暴露,但统一位于 `/api/admin/*` 下,并
统计语义:

- 数据写入 `admin_hourly_stats` 表,每条记录对应一个 UTC 自然小时窗口 `[hour_start, hour_start + 1h)`
- `persistence-api` 会在每个 UTC 整点刷新刚结束的小时和新的当前小时;服务启动和 admin 页面读取都不会触发实时刷新
- `user_count`: 当前 active 用户总数
- `random_request_count`: 该小时内 random blog 推荐请求数
- `random_impression_count`: 该小时内 random blog 推荐曝光数;随机页每次通常请求 9 个
Expand Down
2 changes: 2 additions & 0 deletions doc/config-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ Docker Compose 也会从仓库根目录的 `.env` 读取变量。
| `HEYBLOG_EMAIL_PROVIDER` | `disabled` | `persistence-api` | 用户生命周期邮件 provider。可选 `disabled`/`noop` 或 `smtp`;默认不连接邮件服务 |
| `HEYBLOG_EMAIL_FROM` | 空 | `persistence-api` | SMTP 邮件发件人地址;启用 `smtp` 时必须设置 |
| `HEYBLOG_EMAIL_DEV_EXPOSE_TOKENS` | `false` | `persistence-api` | 是否在验证/重置 API 响应中暴露 raw token/link。仅本地调试需要手动设置为 `true` |
| `HEYBLOG_ADMIN_STATS_SCHEDULER_ENABLED` | `true` | `persistence-api` | 是否启动后台整点任务刷新 admin hourly stats;开启后不依赖打开 admin 页面 |
| `HEYBLOG_SMTP_HOST` | 空 | `persistence-api` | SMTP 服务器主机名 |
| `HEYBLOG_SMTP_PORT` | `587` | `persistence-api` | SMTP 服务器端口 |
| `HEYBLOG_SMTP_USERNAME` | 未设置 | `persistence-api` | SMTP 用户名;为空时不执行登录 |
Expand Down Expand Up @@ -99,6 +100,7 @@ Docker Compose 也会从仓库根目录的 `.env` 读取变量。
| `persistence-api` | `HEYBLOG_DOCKER_DECISION_MODEL_ROOT` | 全库规则重扫读取的容器内运行时模型根目录 |
| `persistence-api` | `HEYBLOG_DECISION_MODEL_CONSENSUS_STRATEGY` / `HEYBLOG_DECISION_MODEL_CONSENSUS_THRESHOLD` | 全库规则重扫使用的模型共识策略与 weighted 阈值 |
| `persistence-api` | `HEYBLOG_EMAIL_PROVIDER` / `HEYBLOG_EMAIL_FROM` / `HEYBLOG_SMTP_*` | 发送邮箱验证与密码重置邮件 |
| `persistence-api` | `HEYBLOG_ADMIN_STATS_SCHEDULER_ENABLED` | 控制 admin hourly stats 后台整点刷新任务 |

## 3.1 运行时资源目录约定

Expand Down
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ services:
HEYBLOG_EMAIL_PROVIDER: ${HEYBLOG_EMAIL_PROVIDER:-disabled}
HEYBLOG_EMAIL_FROM: ${HEYBLOG_EMAIL_FROM:-}
HEYBLOG_EMAIL_DEV_EXPOSE_TOKENS: ${HEYBLOG_EMAIL_DEV_EXPOSE_TOKENS:-false}
HEYBLOG_ADMIN_STATS_SCHEDULER_ENABLED: ${HEYBLOG_ADMIN_STATS_SCHEDULER_ENABLED:-true}
HEYBLOG_SMTP_HOST: ${HEYBLOG_SMTP_HOST:-}
HEYBLOG_SMTP_PORT: ${HEYBLOG_SMTP_PORT:-587}
HEYBLOG_SMTP_USERNAME: ${HEYBLOG_SMTP_USERNAME:-}
Expand Down
172 changes: 172 additions & 0 deletions persistence_api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@

from __future__ import annotations

from datetime import UTC
from datetime import datetime
from datetime import timedelta
from dataclasses import dataclass
from threading import Event
from threading import Lock
from threading import Thread
from typing import Callable
from typing import Any
from typing import TypeVar
Expand Down Expand Up @@ -36,13 +42,150 @@
LOGGER = get_logger(__name__)


def _utc_hour_start(value: datetime | None = None) -> datetime:
"""Return the UTC natural-hour boundary for one timestamp.

Args:
value: Optional timestamp. Current UTC time is used when omitted.

Returns:
Timezone-aware UTC datetime truncated to the hour.
"""

current = value or datetime.now(UTC)
if current.tzinfo is None:
current = current.replace(tzinfo=UTC)
return current.astimezone(UTC).replace(minute=0, second=0, microsecond=0)


def _seconds_until_next_hour(now: datetime | None = None) -> float:
"""Return seconds from one timestamp until the next UTC hour boundary.

Args:
now: Optional timestamp. Current UTC time is used when omitted.

Returns:
Positive number of seconds until the next natural-hour boundary.
"""

current = now or datetime.now(UTC)
if current.tzinfo is None:
current = current.replace(tzinfo=UTC)
current = current.astimezone(UTC)
next_hour = _utc_hour_start(current) + timedelta(hours=1)
return max((next_hour - current).total_seconds(), 0.001)


class AdminStatsScheduler:
    """Background refresher for admin hourly statistics snapshots.

    A daemon thread sleeps for the delay returned by ``tick_seconds_factory``
    and then refreshes the previous and current UTC hour snapshots through the
    repository, so snapshots are produced without any admin page read.
    """

    def __init__(
        self,
        repository: RepositoryProtocol,
        *,
        enabled: bool = True,
        tick_seconds_factory: Callable[[], float] = _seconds_until_next_hour,
    ) -> None:
        """Store collaborators and prepare thread bookkeeping.

        Args:
            repository: Persistence repository whose
                ``refresh_admin_hourly_stats`` is called on every tick.
            enabled: When false, ``start`` refuses to create the background
                thread.
            tick_seconds_factory: Callable returning the number of seconds
                to wait before the next tick.

        Returns:
            None.
        """

        self.repository = repository
        self.enabled = enabled
        self.tick_seconds_factory = tick_seconds_factory
        self._stop_event = Event()
        # Guards creation and inspection of the worker thread.
        self._lock = Lock()
        self._thread: Thread | None = None

    def start(self) -> dict[str, Any]:
        """Launch the background thread unless disabled or already alive.

        Args:
            None.

        Returns:
            ``{"accepted": True}`` when a new thread was started, otherwise
            ``{"accepted": False, "reason": ...}`` naming why it was not.
        """

        if not self.enabled:
            return {"accepted": False, "reason": "scheduler_disabled"}
        with self._lock:
            existing = self._thread
            if existing is not None and existing.is_alive():
                return {"accepted": False, "reason": "scheduler_already_running"}
            # Reset a stop request left over from an earlier stop() call.
            self._stop_event.clear()
            worker = Thread(target=self._run, daemon=True, name="admin-stats-scheduler")
            self._thread = worker
            worker.start()
        return {"accepted": True}

    def stop(self) -> dict[str, Any]:
        """Signal the background thread to exit without waiting for it.

        Args:
            None.

        Returns:
            Payload whose ``accepted`` flag tells whether a live thread
            existed when the stop request was made.
        """

        self._stop_event.set()
        with self._lock:
            worker = self._thread
            was_running = worker is not None and worker.is_alive()
        return {"accepted": was_running}

    def _run(self) -> None:
        """Loop: wait for the next tick, then refresh, until stopped.

        Args:
            None.

        Returns:
            None.
        """

        while True:
            delay = self.tick_seconds_factory()
            # wait() returns True when stop was requested during the delay.
            if self._stop_event.wait(delay):
                return
            self._refresh_due_hours()

    def _refresh_due_hours(self) -> None:
        """Refresh snapshots for the previous and the current UTC hour.

        A failure for one hour is logged and does not prevent the other hour
        from being refreshed.

        Args:
            None.

        Returns:
            None.
        """

        this_hour = _utc_hour_start()
        due_hours = (this_hour - timedelta(hours=1), this_hour)
        for hour_start in due_hours:
            try:
                self.repository.refresh_admin_hourly_stats(hour_start=hour_start)
            except Exception as exc:  # pragma: no cover - defensive logging path
                log_event(
                    LOGGER,
                    event="persistence.admin_stats.refresh_failed",
                    message="admin hourly stats refresh failed",
                    stage="admin_stats",
                    hour_start=hour_start.isoformat(),
                    error=str(exc),
                )


@dataclass(slots=True)
class PersistenceState:
"""State container for the persistence service."""

repository: RepositoryProtocol
graph_service: GraphService
stats_service: StatsService
admin_stats_scheduler: AdminStatsScheduler


class UpsertBlogRequest(BaseModel):
Expand Down Expand Up @@ -316,6 +459,10 @@ def build_persistence_state(settings: Settings | None = None) -> PersistenceStat
age_manager=age_manager,
),
stats_service=StatsService(repository),
admin_stats_scheduler=AdminStatsScheduler(
repository,
enabled=resolved.admin_stats_scheduler_enabled,
),
)


Expand All @@ -340,6 +487,31 @@ def get_state() -> PersistenceState:
app.state.persistence_state = build_persistence_state()
return app.state.persistence_state

@app.on_event("startup")
def start_admin_stats_scheduler() -> None:
    """Start the admin hourly stats scheduler on service startup and log the outcome.

    The same log event is emitted whether or not the scheduler accepted the
    start request; the ``accepted`` and ``reason`` fields carry the result.
    """
    # NOTE(review): if `app` is a FastAPI application, `on_event` is
    # deprecated in favor of lifespan handlers - consider migrating.
    scheduler = get_state().admin_stats_scheduler
    outcome = scheduler.start()
    log_event(
        LOGGER,
        event="persistence.admin_stats.scheduler.started",
        message="admin hourly stats scheduler started",
        stage="admin_stats",
        accepted=outcome.get("accepted"),
        reason=outcome.get("reason"),
    )

@app.on_event("shutdown")
def stop_admin_stats_scheduler() -> None:
    """Request admin hourly stats scheduler shutdown and log the outcome.

    The ``accepted`` log field reports whether a running scheduler thread
    was asked to stop.
    """
    scheduler = get_state().admin_stats_scheduler
    outcome = scheduler.stop()
    log_event(
        LOGGER,
        event="persistence.admin_stats.scheduler.stopped",
        message="admin hourly stats scheduler stopped",
        stage="admin_stats",
        accepted=outcome.get("accepted"),
    )

@app.get("/internal/health")
def health() -> dict[str, Any]:
return {"status": "ok"} | get_state().graph_service.graph_status()
Expand Down
44 changes: 33 additions & 11 deletions persistence_api/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -1906,6 +1906,8 @@ def get_blog_recommendation_stats(self, blog_id: int) -> dict[str, Any] | None:

def get_recommendation_strategy_stats(self) -> dict[str, Any]: ...

def refresh_admin_hourly_stats(self, *, hour_start: datetime | None = None) -> dict[str, Any]: ...

def get_admin_hourly_stats(self, *, limit: int = 24) -> dict[str, Any]: ...

def list_blog_labeling_candidates(
Expand Down Expand Up @@ -4409,14 +4411,14 @@ def _admin_hourly_stats_payload(self, row: AdminHourlyStatsModel) -> dict[str, A
return {
"id": row.id,
"hour_start": _iso(row.hour_start),
"user_count": row.user_count,
"random_request_count": row.random_request_count,
"random_impression_count": row.random_impression_count,
"detail_open_count": row.detail_open_count,
"external_open_count": row.external_open_count,
"detail_ctr": row.detail_ctr,
"external_ctr": row.external_ctr,
"total_click_ctr": row.total_click_ctr,
"user_count": row.user_count or 0,
"random_request_count": row.random_request_count or 0,
"random_impression_count": row.random_impression_count or 0,
"detail_open_count": row.detail_open_count or 0,
"external_open_count": row.external_open_count or 0,
"detail_ctr": row.detail_ctr or 0.0,
"external_ctr": row.external_ctr or 0.0,
"total_click_ctr": row.total_click_ctr or 0.0,
"refreshed_at": _iso(row.refreshed_at),
"created_at": _iso(row.created_at),
}
Expand Down Expand Up @@ -4496,27 +4498,47 @@ def _refresh_admin_hourly_stats(self, session: Session, hour_start: datetime) ->
session.flush()
return row

def refresh_admin_hourly_stats(self, *, hour_start: datetime | None = None) -> dict[str, Any]:
    """Recompute (or create) the admin statistics snapshot for one hour.

    Args:
        hour_start: Optional UTC natural-hour boundary identifying the
            snapshot. The current UTC hour is used when omitted.

    Returns:
        JSON-ready payload describing the refreshed hourly snapshot.
    """

    target_hour = self._hour_start(hour_start)
    with session_scope(self.session_factory) as session:
        snapshot = self._refresh_admin_hourly_stats(session, target_hour)
        # Build the payload before leaving the session scope.
        payload = self._admin_hourly_stats_payload(snapshot)
    return payload

def get_admin_hourly_stats(self, *, limit: int = 24) -> dict[str, Any]:
"""Return hourly admin statistics and refresh the current hour.
"""Return persisted hourly admin statistics.

Args:
limit: Maximum number of hourly snapshots to return.

Returns:
Admin statistics payload with latest/current snapshots first.
Admin statistics payload with latest/current persisted snapshots
first. Snapshot refresh is owned by the service scheduler, not this
read path.
"""

normalized_limit = max(1, min(int(limit), 168))
with session_scope(self.session_factory) as session:
current_hour = self._hour_start()
current = self._refresh_admin_hourly_stats(session, current_hour)
rows = list(
session.scalars(
select(AdminHourlyStatsModel)
.order_by(AdminHourlyStatsModel.hour_start.desc())
.limit(normalized_limit)
)
)
current = next((row for row in rows if self._hour_start(row.hour_start) == current_hour), None)
if current is None:
current = AdminHourlyStatsModel(hour_start=current_hour)
latest = rows[0] if rows else current
return {
"current_hour": self._admin_hourly_stats_payload(current),
Expand Down
6 changes: 6 additions & 0 deletions shared/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
DEFAULT_CANDIDATE_PAGE_FETCH_CONCURRENCY = 4
DEFAULT_RUNTIME_WORKER_COUNT = 3
DEFAULT_RUNTIME_AUTO_START_INTERVAL_SECONDS = 3600.0
DEFAULT_ADMIN_STATS_SCHEDULER_ENABLED = True
DEFAULT_MAX_FETCHED_PAGE_BYTES = 2_000_000
DEFAULT_RAW_DISCOVERED_URL_LIMIT = 1_000_000
PROJECT_ROOT = Path(__file__).resolve().parent.parent
Expand Down Expand Up @@ -107,6 +108,7 @@ class Settings:
candidate_page_fetch_concurrency: int = DEFAULT_CANDIDATE_PAGE_FETCH_CONCURRENCY
runtime_worker_count: int = DEFAULT_RUNTIME_WORKER_COUNT
runtime_auto_start_interval_seconds: float = DEFAULT_RUNTIME_AUTO_START_INTERVAL_SECONDS
admin_stats_scheduler_enabled: bool = DEFAULT_ADMIN_STATS_SCHEDULER_ENABLED
max_fetched_page_bytes: int = DEFAULT_MAX_FETCHED_PAGE_BYTES
raw_discovered_url_limit: int = DEFAULT_RAW_DISCOVERED_URL_LIMIT
friend_link_domain_blocklist: tuple[str, ...] = ()
Expand Down Expand Up @@ -222,6 +224,10 @@ def from_env(cls) -> "Settings":
)
),
),
admin_stats_scheduler_enabled=_parse_bool_env(
"HEYBLOG_ADMIN_STATS_SCHEDULER_ENABLED",
default=DEFAULT_ADMIN_STATS_SCHEDULER_ENABLED,
),
max_fetched_page_bytes=max(
1,
int(
Expand Down
Loading
Loading