diff --git a/.gitignore b/.gitignore index 48b285d..3793398 100644 --- a/.gitignore +++ b/.gitignore @@ -38,6 +38,8 @@ content.db # adapter 跑过留下的 .auth/ .auth-xhs/ +.auth-wechat-channels/ +.auth-bilibili/ .debug/ # Editor / IDE diff --git a/adapters/perf-data/bilibili-stat/README.md b/adapters/perf-data/bilibili-stat/README.md index 5777540..940b792 100644 --- a/adapters/perf-data/bilibili-stat/README.md +++ b/adapters/perf-data/bilibili-stat/README.md @@ -1,26 +1,29 @@ # bilibili-stat — B站 perf-data adapter -回收**自己(或任意)B站视频**的播放数据 + 热门评论,供 `/cheat-retro` 复盘。 +回收**自己(或任意)B站视频**的播放数据、热门评论和弹幕文本,供 `/cheat-retro` 复盘。 被 `/cheat-retro` 调用:当 `state.data_collection=adapter` 且 `Platform: bilibili` 时。 ## 为什么它比抖音/小红书 adapter 简单 -B站的视频统计(`view`)与评论(`reply`)都是**公开接口**: +B站单条视频统计、评论和弹幕是公开数据: -- 不需要登录(没有 `crawler.py login` 步骤、不碰 `.auth/`) -- 不需要 wbi 签名(`view` 接口免签名;评论走 `x/v2/reply` 老接口,按热度 `sort=2`) -- 不需要浏览器(纯 `httpx`,无 playwright) +- 单条复盘不需要登录 +- 自动读取自己空间的作品列表需要扫码登录一次 +- 不需要第三方 Python 包,使用标准库 -所以 **clone 下来装个 `httpx` 就能用**,零配置。 +所以 clone 下来即可使用,零配置。 ## 安装 +单条视频复盘只使用 Python 标准库。自动读取空间投稿列表需要 Playwright: + ```bash pip install -r requirements.txt +playwright install chromium ``` -(若你的内容项目根有 `.venv`,run.sh 会优先用它;否则用系统 `python3`/`python`。) +`run.sh` 会优先使用项目 `.venv`,兼容 Windows 与 macOS/Linux。 ## 用法 @@ -37,8 +40,9 @@ bash run.sh [] 也可直接调底层: ```bash +python review.py login # 首次登录,状态保存到 .auth-bilibili/ +python review.py list # 自动列出最近 20 条及 BV 号 python review.py video BV1cUoUY9Ecr # 抓数据 → 写 videos//report.md -python review.py login # B站无需登录,仅打印说明(接口一致性) ``` ## 输出(report.md) @@ -49,6 +53,7 @@ python review.py login # B站无需登录,仅打印说 - **播放数据**:播放、点赞、投币、收藏、分享、评论、弹幕,并附派生比率(赞播比 / 投币率 / 收藏率 / 分播比)——B站「三连率」尤其能反映硬核认可度 - 原始稿子(若提供) - **热门评论**(按点赞降序,带 IP 属地) +- **弹幕文本**(带视频时间点,最多 100 条) ## 接口 @@ -56,12 +61,13 @@ python review.py login # B站无需登录,仅打印说 |---|---| | 视频统计 | `GET https://api.bilibili.com/x/web-interface/view?bvid=` | | 评论 | `GET https://api.bilibili.com/x/v2/reply?type=1&oid=&sort=2&pn=&ps=20` 
| +| 弹幕文本 | `GET https://api.bilibili.com/x/v1/dm/list.so?oid=` | `oid` 取自 view 接口返回的 `aid`。评论按热度(`sort=2`)翻页,默认取 top 50。 ## 退出码 -`0` 成功 · `2` 缺依赖(httpx 未装)· `3` 其他失败(网络 / 解析 / BV 号错误)。任何失败时 `/cheat-retro` 会优雅降级到 manual 模式。 +`0` 成功 · `2` 缺少 Python/Playwright · `3` 其他失败(网络 / 解析 / BV 号错误)。任何失败时 `/cheat-retro` 会优雅降级到 manual 模式。 ## 字段随接口改版的维护 diff --git a/adapters/perf-data/bilibili-stat/crawler.py b/adapters/perf-data/bilibili-stat/crawler.py index 633c516..6bd9d1c 100644 --- a/adapters/perf-data/bilibili-stat/crawler.py +++ b/adapters/perf-data/bilibili-stat/crawler.py @@ -1,55 +1,83 @@ -"""B站视频数据 + 评论抓取(公开接口,无需登录、无需签名)。 - -与 douyin-session / xhs-explore 不同:B站的视频统计与评论都是**公开数据**—— -`view` 接口不需要 wbi 签名,评论走 `x/v2/reply` 老接口(按热度)。因此本 adapter -是纯 httpx,零登录、零浏览器,clone 下来即可用。 - -接口: -- 视频数据:https://api.bilibili.com/x/web-interface/view?bvid= -- 评论: https://api.bilibili.com/x/v2/reply?type=1&oid=&sort=2 (sort=2 热度) -""" +"""Bilibili public video statistics, comments, and danmaku.""" from __future__ import annotations +import gzip +import json import re -import sys import time +import urllib.parse +import urllib.request +import xml.etree.ElementTree as ET +import zlib +from typing import Any -import httpx +from paths import auth_dir, debug_dir UA = ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 " "(KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36" ) -HEADERS = {"User-Agent": UA, "Referer": "https://www.bilibili.com"} +HEADERS = { + "User-Agent": UA, + "Referer": "https://www.bilibili.com/", + "Accept-Encoding": "identity", +} VIEW_API = "https://api.bilibili.com/x/web-interface/view" REPLY_API = "https://api.bilibili.com/x/v2/reply" +DANMAKU_API = "https://api.bilibili.com/x/v1/dm/list.so" +SPACE_URL = "https://space.bilibili.com/{mid}/upload/video" def normalize_bvid(raw: str) -> str: - """从 BV 号或 B站视频 URL(含 b23.tv 短链跳转后)里提取 BV 号。""" - m = re.search(r"(BV[0-9A-Za-z]{10})", raw or "") - return m.group(1) if m else (raw or "").strip() + match = 
re.search(r"(BV[0-9A-Za-z]{10})", raw or "") + value = match.group(1) if match else (raw or "").strip() + if not re.fullmatch(r"BV[0-9A-Za-z]{10}", value): + raise ValueError(f"无法识别 BV 号:{raw}") + return value + +def _get_bytes(url: str, params: dict[str, Any] | None = None) -> bytes: + if params: + url = f"{url}?{urllib.parse.urlencode(params)}" + request = urllib.request.Request(url, headers=HEADERS) + with urllib.request.urlopen(request, timeout=25) as response: + raw = response.read() + encoding = (response.headers.get("Content-Encoding") or "").lower() + if encoding == "gzip": + raw = gzip.decompress(raw) + elif encoding == "deflate": + try: + raw = zlib.decompress(raw) + except zlib.error: + raw = zlib.decompress(raw, -zlib.MAX_WBITS) + return raw -def _client() -> httpx.Client: - return httpx.Client(headers=HEADERS, timeout=20, follow_redirects=True) +def _get_json(url: str, params: dict[str, Any] | None = None) -> dict: + return json.loads(_get_bytes(url, params).decode("utf-8")) -def fetch_video(client: httpx.Client, bvid: str) -> dict: - """拉视频信息 + 统计。返回归一化后的 dict。""" - j = client.get(VIEW_API, params={"bvid": bvid}).json() - if j.get("code") != 0: - raise RuntimeError(f"view 接口失败 code={j.get('code')} msg={j.get('message')}") - d = j.get("data") or {} - stat = d.get("stat") or {} + +def fetch_video(bvid: str) -> dict: + data = _get_json(VIEW_API, {"bvid": bvid}) + if data.get("code") != 0: + raise RuntimeError( + f"view 接口失败 code={data.get('code')} msg={data.get('message')}" + ) + item = data.get("data") or {} + stat = item.get("stat") or {} + pages = item.get("pages") or [] + first_page = pages[0] if pages else {} + owner = item.get("owner") or {} return { - "bvid": d.get("bvid") or bvid, - "aid": d.get("aid"), - "title": d.get("title") or "", - "desc": d.get("desc") or "", - "owner": (d.get("owner") or {}).get("name") or "", - "pubdate": d.get("pubdate") or 0, - "duration_s": d.get("duration") or 0, + "bvid": item.get("bvid") or bvid, + "aid": 
item.get("aid"), + "cid": first_page.get("cid"), + "title": item.get("title") or "", + "desc": item.get("desc") or "", + "owner": owner.get("name") or "", + "owner_mid": owner.get("mid"), + "pubdate": item.get("pubdate") or 0, + "duration_s": item.get("duration") or 0, "play_count": stat.get("view") or 0, "like_count": stat.get("like") or 0, "coin_count": stat.get("coin") or 0, @@ -57,73 +85,199 @@ def fetch_video(client: httpx.Client, bvid: str) -> dict: "share_count": stat.get("share") or 0, "comment_count": stat.get("reply") or 0, "danmaku_count": stat.get("danmaku") or 0, - "raw": d, } -def fetch_comments(client: httpx.Client, aid: int, max_count: int = 50) -> list[dict]: - """按热度(sort=2)翻页抓评论,最多 max_count 条。""" +def fetch_comments(aid: int | None, max_count: int = 50) -> list[dict]: + if not aid: + print("[警告] view 接口未返回 aid,跳过评论抓取。") + return [] out: list[dict] = [] seen: set[str] = set() - pn = 1 - for _ in range(60): # 翻页上限保护 - if len(out) >= max_count: - break - try: - j = client.get( - REPLY_API, - params={"type": 1, "oid": aid, "sort": 2, "pn": pn, "ps": 20}, - ).json() - except Exception as exc: - print(f"[警告] 评论请求异常(停止):{exc}") + page = 1 + while len(out) < max_count and page <= 20: + payload = _get_json( + REPLY_API, + {"type": 1, "oid": aid, "sort": 2, "pn": page, "ps": 20}, + ) + if payload.get("code") != 0: + print( + f"[警告] 评论接口 code={payload.get('code')} " + f"msg={payload.get('message')}(停止)" + ) break - if j.get("code") != 0: - print(f"[警告] 评论接口 code={j.get('code')} msg={j.get('message')}(停止)") + replies = (payload.get("data") or {}).get("replies") or [] + if not replies: break - reps = (j.get("data") or {}).get("replies") or [] - if not reps: - break - for c in reps: - nc = _normalize_comment(c) - if nc["cid"] in seen: - continue - seen.add(nc["cid"]) - out.append(nc) - pn += 1 - time.sleep(0.4) - out.sort(key=lambda x: x["digg_count"], reverse=True) + for reply in replies: + for normalized in _flatten_comment(reply): + if normalized["cid"] 
and normalized["cid"] not in seen: + seen.add(normalized["cid"]) + out.append(normalized) + page += 1 + time.sleep(0.35) + out.sort(key=lambda item: (item["digg_count"], item["create_time"]), reverse=True) return out[:max_count] -def _normalize_comment(c: dict) -> dict: - member = c.get("member") or {} - content = c.get("content") or {} - loc = (c.get("reply_control") or {}).get("location") or "" - return { - "cid": str(c.get("rpid") or ""), +def _flatten_comment(comment: dict, is_reply: bool = False) -> list[dict]: + member = comment.get("member") or {} + content = comment.get("content") or {} + location = (comment.get("reply_control") or {}).get("location") or "" + current = { + "cid": str(comment.get("rpid") or ""), "text": content.get("message") or "", - "digg_count": c.get("like") or 0, - "reply_comment_total": c.get("rcount") or 0, - "create_time": c.get("ctime") or 0, + "digg_count": comment.get("like") or 0, + "reply_comment_total": comment.get("rcount") or 0, + "create_time": comment.get("ctime") or 0, "user_name": member.get("uname") or "", - "ip_label": loc.replace("IP属地:", "").replace("IP属地:", "").strip() if loc else "", + "ip_label": location.replace("IP属地:", "").replace("IP属地:", "").strip(), + "is_reply": is_reply, } + out = [current] + for child in comment.get("replies") or []: + out.extend(_flatten_comment(child, is_reply=True)) + return out + + +def fetch_danmaku(cid: int | None, max_count: int = 100) -> list[dict]: + if not cid: + return [] + try: + raw = _get_bytes(DANMAKU_API, {"oid": cid}) + root = ET.fromstring(raw) + except Exception as exc: + print(f"[警告] 弹幕抓取失败:{exc}") + return [] + out: list[dict] = [] + for node in root.findall("d"): + attrs = (node.attrib.get("p") or "").split(",") + out.append( + { + "text": (node.text or "").strip(), + "video_time_s": float(attrs[0]) if attrs and attrs[0] else 0, + "sent_at": int(attrs[4]) if len(attrs) > 4 and attrs[4] else 0, + } + ) + if len(out) >= max_count: + break + return out -def 
fetch_all(bvid: str, max_comments: int = 50) -> dict: - """一次拉完视频数据 + 热门评论。""" +def fetch_all(bvid: str, max_comments: int = 50, max_danmaku: int = 100) -> dict: bvid = normalize_bvid(bvid) - with _client() as client: - print(f" → 拉取视频数据 {bvid}") - video = fetch_video(client, bvid) - print(f" ✓ {video['title'][:40]}(播放 {video['play_count']})") + print(f" → 拉取视频数据 {bvid}") + video = fetch_video(bvid) + print(f" ✓ {video['title'][:40]}(播放 {video['play_count']})") + comments = [] + if video.get("aid"): print(" → 拉取热门评论") - comments = fetch_comments(client, video["aid"], max_count=max_comments) - print(f" ✓ {len(comments)} 条评论") - return {"video": video, "comments": comments} + comments = fetch_comments(video["aid"], max_count=max_comments) + else: + print(" → 跳过热门评论(view 接口未返回 aid)") + print(f" ✓ {len(comments)} 条评论") + print(" → 拉取弹幕文本") + danmaku = fetch_danmaku(video.get("cid"), max_count=max_danmaku) + print(f" ✓ {len(danmaku)} 条弹幕") + return {"video": video, "comments": comments, "danmaku": danmaku} + + +async def ensure_login(mid: str, timeout_s: int = 300) -> bool: + """Open a persistent browser and wait until the Bilibili account is logged in.""" + from playwright.async_api import async_playwright + + playwright = await async_playwright().start() + profile = auth_dir() + profile.mkdir(parents=True, exist_ok=True) + context = await playwright.chromium.launch_persistent_context( + user_data_dir=str(profile), + headless=False, + viewport={"width": 1440, "height": 900}, + args=["--disable-blink-features=AutomationControlled"], + ) + try: + page = context.pages[0] if context.pages else await context.new_page() + await page.goto(SPACE_URL.format(mid=mid), wait_until="domcontentloaded", timeout=60000) + print(f"[登录] 请在浏览器中登录 B 站。最多等待 {timeout_s} 秒……") + for elapsed in range(timeout_s): + cookies = {item["name"]: item.get("value", "") for item in await context.cookies()} + if cookies.get("SESSDATA") and cookies.get("DedeUserID"): + print(f"[登录] ✓ B 站登录态已确认(用时 
{elapsed}s)") + await page.reload(wait_until="domcontentloaded") + await page.wait_for_timeout(5000) + return True + await page.wait_for_timeout(1000) + print("[登录] 超时,未检测到 B 站登录态。") + return False + finally: + await context.close() + await playwright.stop() + + +async def fetch_space_videos(mid: str, limit: int = 20) -> list[dict]: + """Read recent video cards from a logged-in Bilibili space page.""" + from playwright.async_api import async_playwright + + playwright = await async_playwright().start() + profile = auth_dir() + profile.mkdir(parents=True, exist_ok=True) + context = await playwright.chromium.launch_persistent_context( + user_data_dir=str(profile), + headless=True, + viewport={"width": 1440, "height": 1000}, + args=["--disable-blink-features=AutomationControlled"], + ) + try: + page = context.pages[0] if context.pages else await context.new_page() + await page.goto(SPACE_URL.format(mid=mid), wait_until="domcontentloaded", timeout=60000) + await page.wait_for_timeout(8000) + for _ in range(4): + await page.mouse.wheel(0, 1200) + await page.wait_for_timeout(1200) + + raw_links = await page.locator('a[href*="/video/BV"]').evaluate_all( + """els => els.map(e => ({ + href: e.href || '', + text: (e.innerText || e.textContent || '').trim(), + title: e.getAttribute('title') || '' + }))""" + ) + by_bvid: dict[str, dict] = {} + for item in raw_links: + match = re.search(r"(BV[0-9A-Za-z]{10})", item.get("href") or "") + if not match: + continue + bvid = match.group(1) + title = (item.get("title") or item.get("text") or "").strip() + existing = by_bvid.get(bvid) + if not existing or len(title) > len(existing.get("title") or ""): + by_bvid[bvid] = { + "bvid": bvid, + "title": title, + "url": f"https://www.bilibili.com/video/{bvid}", + } + videos = list(by_bvid.values())[:limit] + if not videos: + debug = debug_dir() + debug.mkdir(parents=True, exist_ok=True) + await page.screenshot(path=str(debug / "space-video-list.png"), full_page=True) + (debug / 
"space-page.txt").write_text( + (await page.locator("body").inner_text())[:20000], + encoding="utf-8", + ) + raise RuntimeError("空间页未读取到视频卡片;登录可能过期或页面触发验证") -if __name__ == "__main__": - # 与 douyin-session / xhs-explore 的接口保持一致;B站公开数据无需登录。 - if len(sys.argv) > 1 and sys.argv[1] == "login": - print("[B站] 视频数据与评论均为公开接口,无需登录,直接 /cheat-retro 即可。") + # Public detail API gives canonical title, date, and play count. + normalized: list[dict] = [] + for item in videos: + try: + detail = fetch_video(item["bvid"]) + normalized.append(detail) + except Exception: + normalized.append(item) + normalized.sort(key=lambda item: item.get("pubdate") or 0, reverse=True) + return normalized[:limit] + finally: + await context.close() + await playwright.stop() diff --git a/adapters/perf-data/bilibili-stat/paths.py b/adapters/perf-data/bilibili-stat/paths.py index c3bddde..d7be624 100644 --- a/adapters/perf-data/bilibili-stat/paths.py +++ b/adapters/perf-data/bilibili-stat/paths.py @@ -23,6 +23,13 @@ def debug_dir( return runtime_project_root(env=env, cwd=cwd) / ".cheat-cache" / "bilibili-stat-debug" +def auth_dir( + env: Mapping[str, str] | None = None, + cwd: Path | None = None, +) -> Path: + return runtime_project_root(env=env, cwd=cwd) / ".auth-bilibili" + + def videos_dir( env: Mapping[str, str] | None = None, cwd: Path | None = None, diff --git a/adapters/perf-data/bilibili-stat/renderer.py b/adapters/perf-data/bilibili-stat/renderer.py index 5585c0b..3805f90 100644 --- a/adapters/perf-data/bilibili-stat/renderer.py +++ b/adapters/perf-data/bilibili-stat/renderer.py @@ -31,7 +31,12 @@ def _ratio(num: int | None, den: int | None) -> str: return f"{num / den * 100:.2f}%" -def render_report(video: dict, script: str, comments: list[dict]) -> str: +def render_report( + video: dict, + script: str, + comments: list[dict], + danmaku: list[dict], +) -> str: lines: list[str] = [] title = video.get("title") or "(无标题)" bvid = video["bvid"] @@ -75,7 +80,24 @@ def render_report(video: dict, 
script: str, comments: list[dict]) -> str: text = (c.get("text") or "").replace("\n", " ").strip() reply = f" 💬{c['reply_comment_total']}" if c.get("reply_comment_total") else "" loc = f" [{c['ip_label']}]" if c.get("ip_label") else "" - lines.append(f"- [👍{c['digg_count']}{reply}]{loc} {text}") + reply_label = "(回复)" if c.get("is_reply") else "" + user = c.get("user_name") or "匿名" + lines.append( + f"- [👍{c['digg_count']}{reply}]{loc} {user}{reply_label}:{text}" + ) + lines.append("") + + lines.append(f"## 弹幕文本(最多 100 条,共抓到 {len(danmaku)} 条)") + lines.append("") + if not danmaku: + lines.append("(未抓到弹幕文本,可能该视频暂无弹幕或接口受限)") + else: + for item in danmaku: + seconds = float(item.get("video_time_s") or 0) + minute, second = divmod(int(seconds), 60) + text = (item.get("text") or "").replace("\n", " ").strip() + if text: + lines.append(f"- [{minute:02d}:{second:02d}] {text}") lines.append("") return "\n".join(lines) diff --git a/adapters/perf-data/bilibili-stat/requirements.txt b/adapters/perf-data/bilibili-stat/requirements.txt index 6ecf620..86a8fae 100644 --- a/adapters/perf-data/bilibili-stat/requirements.txt +++ b/adapters/perf-data/bilibili-stat/requirements.txt @@ -1 +1 @@ -httpx>=0.27 +playwright>=1.44 diff --git a/adapters/perf-data/bilibili-stat/review.py b/adapters/perf-data/bilibili-stat/review.py index a18d54c..439dbec 100644 --- a/adapters/perf-data/bilibili-stat/review.py +++ b/adapters/perf-data/bilibili-stat/review.py @@ -6,6 +6,8 @@ """ from __future__ import annotations +import asyncio +import os import sys from pathlib import Path @@ -31,12 +33,18 @@ def run_with_id(bvid: str, script_path: str | None) -> None: result = crawler.fetch_all(bvid) video = result["video"] comments = result["comments"] + danmaku = result["danmaku"] - out_dir = renderer.output_dir_for(video, active_videos_dir) + output_override = os.environ.get("CHEAT_OUTPUT_DIR") + out_dir = ( + Path(output_override).expanduser().resolve() + if output_override + else 
renderer.output_dir_for(video, active_videos_dir) + ) out_dir.mkdir(parents=True, exist_ok=True) if script: (out_dir / "script.txt").write_text(script, encoding="utf-8") - md = renderer.render_report(video, script, comments) + md = renderer.render_report(video, script, comments, danmaku) report = out_dir / "report.md" report.write_text(md, encoding="utf-8") print(f"\n✓ {report}") @@ -44,7 +52,23 @@ def run_with_id(bvid: str, script_path: str | None) -> None: def main() -> None: if len(sys.argv) > 1 and sys.argv[1] == "login": - print("[B站] 视频数据与评论均为公开接口,无需登录,直接复盘即可。") + if len(sys.argv) < 3: + print("用法:python review.py login ") + sys.exit(3) + asyncio.run(crawler.ensure_login(sys.argv[2])) + return + if len(sys.argv) > 1 and sys.argv[1] == "list": + if len(sys.argv) < 3: + print("用法:python review.py list ") + sys.exit(3) + videos = asyncio.run(crawler.fetch_space_videos(sys.argv[2], limit=20)) + for index, video in enumerate(videos): + print( + f"[{index}] {video['bvid']} " + f"{renderer._fmt_time(video.get('pubdate', 0))} " + f"播放{renderer._fmt_num(video.get('play_count'))} " + f"{(video.get('title') or '')[:60]}" + ) return if len(sys.argv) > 1 and sys.argv[1] == "video": if len(sys.argv) < 3: @@ -54,7 +78,16 @@ def main() -> None: script_path = sys.argv[3] if len(sys.argv) > 3 else None run_with_id(bvid, script_path) return - print(__doc__) + if len(sys.argv) > 1 and sys.argv[1] == "test": + bvid = sys.argv[2] if len(sys.argv) > 2 else "BV1cUoUY9Ecr" + result = crawler.fetch_all(bvid, max_comments=5, max_danmaku=5) + video = result["video"] + print( + f"[测试] {video['bvid']} 播放={video['play_count']} " + f"评论文本={len(result['comments'])} 弹幕文本={len(result['danmaku'])}" + ) + return + print("用法:review.py login | list | video [script.txt]") if __name__ == "__main__": diff --git a/adapters/perf-data/bilibili-stat/run.sh b/adapters/perf-data/bilibili-stat/run.sh index ecddb8c..4c09cdc 100755 --- a/adapters/perf-data/bilibili-stat/run.sh +++ 
b/adapters/perf-data/bilibili-stat/run.sh @@ -10,8 +10,7 @@ # Example: # bash run.sh BV1cUoUY9Ecr ~/my-channel/videos/2026-05-04_BV1cUoUY9Ecr_AI接入MC # -# B站视频数据(view)与评论(reply)都是公开接口——无需登录、无需 wbi 签名、无需浏览器。 -# 纯 httpx,因此这个 adapter 没有 `crawler.py login` 步骤,clone 下来配好依赖即可用。 +# B站视频数据、评论与弹幕使用公开读取接口,无需登录和浏览器。 # # Output: writes report.md INTO the video_folder. # Exit codes: @@ -30,38 +29,54 @@ if [[ -z "$BVID" || -z "$VIDEO_FOLDER" ]]; then exit 3 fi +# Git Bash accepts Y:/ paths for some commands but not for executable checks. +if command -v cygpath >/dev/null 2>&1; then + VIDEO_FOLDER="$(cygpath -u "$VIDEO_FOLDER")" + if [[ -n "$SCRIPT_PATH" ]]; then + SCRIPT_PATH="$(cygpath -u "$SCRIPT_PATH")" + fi +fi + # Resolve adapter source dir (where this script lives) ADAPTER_DIR="$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )" +VIDEO_FOLDER_REAL="$( realpath -m "$VIDEO_FOLDER" )" + +find_project_root() { + local dir + dir="$( dirname "$VIDEO_FOLDER_REAL" )" + while [[ "$dir" != "/" && -n "$dir" ]]; do + if [[ -f "$dir/.cheat-state.json" ]]; then + printf '%s\n' "$dir" + return 0 + fi + dir="$( dirname "$dir" )" + done + return 1 +} # Find Python — prefer venv in user's project root if exists -PYTHON="" -PROJECT_ROOT="$( dirname "$( dirname "$( realpath "$VIDEO_FOLDER" )" )" )" +PYTHON=() +if ! 
PROJECT_ROOT="$(find_project_root)"; then + echo "❌ Could not find .cheat-state.json above $VIDEO_FOLDER" >&2 + exit 3 +fi if [[ -x "$PROJECT_ROOT/.venv/bin/python" ]]; then - PYTHON="$PROJECT_ROOT/.venv/bin/python" -elif command -v python3 >/dev/null 2>&1; then - PYTHON="python3" -elif command -v python >/dev/null 2>&1; then - PYTHON="python" + PYTHON=("$PROJECT_ROOT/.venv/bin/python") +elif [[ -x "$PROJECT_ROOT/.venv/Scripts/python.exe" ]]; then + PYTHON=("$PROJECT_ROOT/.venv/Scripts/python.exe") +elif command -v py.exe >/dev/null 2>&1; then + PYTHON=("py.exe" "-3") +elif command -v python3 >/dev/null 2>&1 && python3 -c "import sys; assert sys.version_info >= (3, 10)" 2>/dev/null; then + PYTHON=("python3") +elif command -v python >/dev/null 2>&1 && python -c "import sys; assert sys.version_info >= (3, 10)" 2>/dev/null; then + PYTHON=("python") else echo "❌ python not found — install Python 3.10+ first" >&2 exit 2 fi -# Verify httpx is installed -if ! "$PYTHON" -c "import httpx" 2>/dev/null; then - cat >&2 </dev/null 2>&1; then + PY_PROJECT_ROOT="$(cygpath -w "$PROJECT_ROOT")" + PY_VIDEOS_DIR="$(cygpath -w "$( dirname "$VIDEO_FOLDER_REAL" )")" + PY_OUTPUT_DIR="$(cygpath -w "$VIDEO_FOLDER_REAL")" +fi +export CHEAT_PROJECT_ROOT="$PY_PROJECT_ROOT" +export CHEAT_VIDEOS_DIR="$PY_VIDEOS_DIR" +export CHEAT_OUTPUT_DIR="$PY_OUTPUT_DIR" +export PYTHONUTF8=1 +export PYTHONIOENCODING=utf-8 -echo "[bilibili-stat] fetching $BVID into $VIDEO_FOLDER" +echo "[bilibili-stat] fetching $BVID into $VIDEO_FOLDER_REAL" if [[ -n "$SCRIPT_ARG" ]]; then - "$PYTHON" "$ADAPTER_DIR/review.py" video "$BVID" "$SCRIPT_ARG" + "${PYTHON[@]}" "$ADAPTER_DIR/review.py" video "$BVID" "$SCRIPT_ARG" else - "$PYTHON" "$ADAPTER_DIR/review.py" video "$BVID" -fi - -# review.py writes to CHEAT_VIDEOS_DIR//report.md (named by title). -# Move it into our canonical video_folder if names differ. 
-LATEST_REPORT=$(find "$( dirname "$VIDEO_FOLDER" )" -name "report.md" -newer "$VIDEO_FOLDER" -type f 2>/dev/null | head -1) -if [[ -n "$LATEST_REPORT" && "$( dirname "$LATEST_REPORT" )" != "$VIDEO_FOLDER" ]]; then - cp "$LATEST_REPORT" "$VIDEO_FOLDER/report.md" - AUTO_DIR=$( dirname "$LATEST_REPORT" ) - if [[ -f "$AUTO_DIR/script.txt" ]]; then - cp "$AUTO_DIR/script.txt" "$VIDEO_FOLDER/script.txt" - fi - echo "[bilibili-stat] moved auto-named output to $VIDEO_FOLDER/" + "${PYTHON[@]}" "$ADAPTER_DIR/review.py" video "$BVID" fi -if [[ ! -f "$VIDEO_FOLDER/report.md" ]]; then +if [[ ! -f "$VIDEO_FOLDER_REAL/report.md" ]]; then echo "❌ report.md not produced — see review.py output above for details" >&2 exit 3 fi -echo "✅ report.md written to $VIDEO_FOLDER/report.md" +echo "✅ report.md written to $VIDEO_FOLDER_REAL/report.md" exit 0 diff --git a/adapters/perf-data/wechat-channels/README.md b/adapters/perf-data/wechat-channels/README.md new file mode 100644 index 0000000..c07029b --- /dev/null +++ b/adapters/perf-data/wechat-channels/README.md @@ -0,0 +1,46 @@ +# Adapter: wechat-channels(视频号助手) + +用于抓取自己账号在视频号助手后台的作品运营数据。 + +## 原理 + +- Playwright 持久化 Chromium 登录态 +- 被动监听 `channels.weixin.qq.com/platform/` 页面自行发出的 JSON +- 不逆向签名,不读取微信客户端文件 +- 登录态保存在内容项目的 `.auth-wechat-channels/` + +## 安装与登录 + +```powershell +cd <内容项目> +.\.venv\Scripts\python.exe \review.py login +``` + +扫码登录后验证: + +```powershell +.\.venv\Scripts\python.exe \review.py list +``` + +## 抓取单条作品 + +```powershell +.\.venv\Scripts\python.exe \review.py post [script.txt] +``` + +首跑会在 `.cheat-cache/wechat-channels-debug/` 保存捕获的 URL、JSON 和页面截图。 +如果视频号后台改版,可依据这些诊断文件更新字段映射。 + +## 当前限制 + +- 第一版优先实现作品列表和播放/点赞/爱心/评论/分享数据。 +- 已支持从“互动管理 → 评论”自动抓评论文本、点赞数和作者回复。 +- 默认按点赞和时间排序,报告最多写入 Top 20;接口有分页时自动翻页。 +- 如果指定的 post_id 不在最近作品列表中,会以非 0 状态退出,避免生成 0 值 report.md 污染复盘。 + +## TOS / 风险边界 + +- 仅用于用户自己登录后可见的视频号助手后台数据。 +- 不抓取他人作品,不绕过登录,不逆向签名,不自动发布内容。 +- 不持久化 signed media URL、内部用户名、cookie 或 secret 到 report.md。 +- 请低频手动触发,用于个人内容复盘和 
NotebookLM/cheat-on-content 分析。 diff --git a/adapters/perf-data/wechat-channels/crawler.py b/adapters/perf-data/wechat-channels/crawler.py new file mode 100644 index 0000000..cb391cd --- /dev/null +++ b/adapters/perf-data/wechat-channels/crawler.py @@ -0,0 +1,566 @@ +"""视频号助手创作者后台抓取。 + +使用 Playwright 持久化登录态,被动监听后台页面自行发出的 JSON 请求。 +不伪造签名,不读取微信客户端数据。 +""" +from __future__ import annotations + +import asyncio +import json +import re +import sys +import time +from pathlib import Path +from typing import Any + +from playwright.async_api import BrowserContext, Page, Response, async_playwright + +from paths import auth_dir, debug_dir + +PLATFORM_HOME = "https://channels.weixin.qq.com/platform/" +CONTENT_URLS = ( + "https://channels.weixin.qq.com/platform/post/list", + "https://channels.weixin.qq.com/platform/content", + PLATFORM_HOME, +) + + +class Session: + def __init__(self, ctx: BrowserContext, pw: Any) -> None: + self.ctx = ctx + self.pw = pw + + @classmethod + async def open(cls, headless: bool = False) -> "Session": + pw = await async_playwright().start() + auth = auth_dir() + auth.mkdir(parents=True, exist_ok=True) + ctx = await pw.chromium.launch_persistent_context( + user_data_dir=str(auth), + headless=headless, + viewport={"width": 1440, "height": 900}, + args=["--disable-blink-features=AutomationControlled"], + ) + return cls(ctx, pw) + + async def close(self) -> None: + try: + await self.ctx.close() + finally: + await self.pw.stop() + + +async def _logged_in(page: Page) -> bool: + if "login" in page.url.lower(): + return False + try: + body = await page.locator("body").inner_text(timeout=3000) + except Exception: + return False + login_markers = ("扫码登录", "微信扫码", "登录视频号助手") + account_markers = ("内容管理", "互动管理", "收入与服务", "视频号ID") + return not any(x in body for x in login_markers) and any(x in body for x in account_markers) + + +async def ensure_login(timeout_s: int = 300) -> bool: + sess = await Session.open() + try: + page = sess.ctx.pages[0] if sess.ctx.pages 
else await sess.ctx.new_page() + await page.goto(PLATFORM_HOME, wait_until="domcontentloaded", timeout=60000) + print(f"[登录] 请在弹出的窗口扫码登录视频号助手,最多等待 {timeout_s} 秒。") + for i in range(timeout_s): + if await _logged_in(page): + print(f"[登录] ✓ 已检测到视频号助手登录态(用时 {i}s)") + await asyncio.sleep(2) + return True + await asyncio.sleep(1) + print("[登录] 超时,未检测到登录态。") + return False + finally: + await sess.close() + + +def _first(obj: dict, *keys: str, default: Any = None) -> Any: + for key in keys: + if key in obj and obj[key] not in (None, ""): + return obj[key] + return default + + +def _walk(value: Any): + if isinstance(value, dict): + yield value + for child in value.values(): + yield from _walk(child) + elif isinstance(value, list): + for child in value: + yield from _walk(child) + + +def _looks_like_post(obj: dict) -> bool: + keys = set(obj) + has_id = bool(keys & {"object_id", "objectId", "feed_id", "feedId", "post_id", "postId", "id"}) + has_metrics = bool( + keys + & { + "read_count", + "readCount", + "play_count", + "playCount", + "like_count", + "likeCount", + "comment_count", + "commentCount", + "forward_count", + "forwardCount", + } + ) + return has_id and has_metrics + + +def _text_from_desc(value: Any) -> str: + if isinstance(value, str): + return value.strip() + if not isinstance(value, dict): + return "" + for key in ("description", "title", "content"): + text = value.get(key) + if isinstance(text, str) and text.strip(): + return text.strip() + short_title = value.get("shortTitle") + if isinstance(short_title, list): + labels = [ + item.get("shortTitle", "").strip() + for item in short_title + if isinstance(item, dict) and isinstance(item.get("shortTitle"), str) + ] + if labels: + return " ".join(labels) + return "" + + +def _normalize_post(obj: dict) -> dict: + post_id = str( + _first(obj, "object_id", "objectId", "feed_id", "feedId", "post_id", "postId", "id", default="") + ) + title = "" + for key in ("title", "description", "content"): + value = obj.get(key) 
+ if isinstance(value, str) and value.strip(): + title = value.strip() + break + if not title: + title = _text_from_desc(obj.get("desc")) + create_time = _first( + obj, + "create_time", + "createTime", + "publish_time", + "publishTime", + "post_time", + "postTime", + default=0, + ) + return { + "post_id": post_id, + "title": title, + "create_time": create_time, + "view_count": _first(obj, "read_count", "readCount", "play_count", "playCount", "view_count", default=0), + "like_count": _first(obj, "like_count", "likeCount", "like_num", "likeNum", default=0), + "favorite_count": _first( + obj, "favorite_count", "favoriteCount", "fav_count", "favCount", "collect_count", default=0 + ), + "comment_count": _first(obj, "comment_count", "commentCount", "comment_num", default=0), + "share_count": _first( + obj, "forward_count", "forwardCount", "share_count", "shareCount", "export_count", default=0 + ), + "full_play_rate": _first(obj, "fullPlayRate", "full_play_rate", default=None), + "avg_play_time_sec": _first(obj, "avgPlayTimeSec", "avg_play_time_sec", default=None), + "follow_count": _first(obj, "followCount", "follow_count", default=0), + "fast_flip_rate": _first(obj, "fastFlipRate", "fast_flip_rate", default=None), + "yesterday_view_count": _first(obj, "yesterdayReadCount", "yesterday_read_count", default=0), + "comments": obj.get("commentList") if isinstance(obj.get("commentList"), list) else [], + "raw": obj, + } + + +def _dedupe_posts(items: list[dict]) -> list[dict]: + seen: set[str] = set() + result: list[dict] = [] + for item in items: + post_id = item.get("post_id") + if not post_id or post_id in seen: + continue + seen.add(post_id) + result.append(item) + return result + + +async def _capture_json(response: Response, captured: list[dict], urls: list[str]) -> None: + urls.append(response.url) + content_type = (response.headers.get("content-type") or "").lower() + if "json" not in content_type and not any(k in response.url.lower() for k in ("post", "feed", 
"finder", "data")): + return + try: + data = await response.json() + except Exception: + return + captured.append( + { + "url": response.url, + "method": response.request.method, + "post_data": response.request.post_data, + "data": data, + } + ) + + +async def _click_text(page: Page, label: str) -> bool: + for frame in page.frames: + try: + locator = frame.get_by_text(label, exact=True) + if await locator.count(): + await locator.first.click(timeout=5000) + return True + except Exception: + continue + return False + + +async def fetch_recent_posts(sess: Session, limit: int = 50) -> list[dict]: + captured: list[dict] = [] + urls: list[str] = [] + page = await sess.ctx.new_page() + + async def on_response(response: Response) -> None: + await _capture_json(response, captured, urls) + + page.on("response", on_response) + try: + await page.goto(PLATFORM_HOME, wait_until="domcontentloaded", timeout=60000) + await asyncio.sleep(5) + + # The content app is loaded dynamically after expanding the sidebar. + # Click across all frames because Tencent occasionally mounts it in an iframe. + if not await _click_text(page, "内容管理"): + print("[诊断] 未找到“内容管理”入口。") + await asyncio.sleep(4) + + # Content Management may open its own route before rendering the submenu. 
+ clicked_video = await _click_text(page, "视频") + if not clicked_video: + for url in CONTENT_URLS[:-1]: + try: + await page.goto(url, wait_until="domcontentloaded", timeout=60000) + await asyncio.sleep(4) + if await _click_text(page, "视频"): + clicked_video = True + break + except Exception: + continue + await asyncio.sleep(8) + + for _ in range(5): + await page.mouse.wheel(0, 1400) + await asyncio.sleep(2) + + posts: list[dict] = [] + for packet in captured: + for obj in _walk(packet["data"]): + if _looks_like_post(obj): + posts.append(_normalize_post(obj)) + posts = _dedupe_posts(posts) + + debug = debug_dir() + debug.mkdir(parents=True, exist_ok=True) + (debug / "captured_urls.txt").write_text("\n".join(urls), encoding="utf-8") + (debug / "captured_json.json").write_text( + json.dumps(captured, ensure_ascii=False, indent=2)[:5_000_000], + encoding="utf-8", + ) + await page.screenshot(path=str(debug / "content-page.png"), full_page=True) + if not posts: + frame_texts: list[str] = [] + for frame in page.frames: + try: + frame_texts.append(f"--- {frame.url} ---\n{await frame.locator('body').first.inner_text()}") + except Exception: + continue + (debug / "page.txt").write_text("\n\n".join(frame_texts), encoding="utf-8") + print(f"[诊断] 暂未解析到作品,已保存 {len(captured)} 个 JSON 响应到 {debug}") + return posts[:limit] + finally: + await page.close() + + +async def capture_comment_management(sess: Session, title_hint: str = "") -> dict: + captured: list[dict] = [] + urls: list[str] = [] + page = await sess.ctx.new_page() + + async def on_response(response: Response) -> None: + await _capture_json(response, captured, urls) + + page.on("response", on_response) + try: + await page.goto(PLATFORM_HOME, wait_until="domcontentloaded", timeout=60000) + await asyncio.sleep(5) + if not await _click_text(page, "互动管理"): + print("[诊断] 未找到“互动管理”入口。") + await asyncio.sleep(4) + if not await _click_text(page, "评论"): + print("[诊断] 未找到“评论”子菜单。") + await asyncio.sleep(10) + + clicked_post = False + 
if title_hint: + for frame in page.frames: + try: + candidates = frame.get_by_text(re.compile(re.escape(title_hint[:12]))) + if await candidates.count(): + await candidates.first.click(timeout=5000) + clicked_post = True + break + except Exception: + continue + if not clicked_post: + # Calibration fallback: click the second visible video row. The first + # row may have zero comments; selecting another row triggers the + # comment detail endpoint needed for field discovery. + await page.mouse.click(480, 435) + await asyncio.sleep(10) + + for _ in range(8): + for frame in page.frames: + try: + await frame.locator("body").first.evaluate("el => el.scrollBy(0, 1200)") + except Exception: + continue + await page.mouse.wheel(0, 1200) + await asyncio.sleep(2) + + debug = debug_dir() + debug.mkdir(parents=True, exist_ok=True) + (debug / "comment_urls.txt").write_text("\n".join(urls), encoding="utf-8") + (debug / "comment_json.json").write_text( + json.dumps(captured, ensure_ascii=False, indent=2)[:10_000_000], + encoding="utf-8", + ) + frame_texts: list[str] = [] + for frame in page.frames: + try: + frame_texts.append(f"--- {frame.url} ---\n{await frame.locator('body').first.inner_text()}") + except Exception: + continue + (debug / "comment_page.txt").write_text("\n\n".join(frame_texts), encoding="utf-8") + await page.screenshot(path=str(debug / "comment-page.png"), full_page=True) + print(f"[评论诊断] 捕获 {len(captured)} 个 JSON 响应,已保存到 {debug}") + return {"captured": captured, "urls": urls} + finally: + await page.close() + + +def _normalize_comment(obj: dict, parent_id: str = "") -> dict: + text = str(_first(obj, "commentContent", "content", "text", default="")).strip() + text = ( + text.replace("/:strong", "👍") + .replace("/:heart", "❤️") + .replace("/:strong", "👍") + ) + return { + "comment_id": str(_first(obj, "commentId", "comment_id", "id", default="")), + "nickname": str(_first(obj, "commentNickname", "nickname", "nickName", default="")).strip(), + "text": text, + 
"create_time": _first(obj, "commentCreatetime", "createTime", "create_time", default=0), + "like_count": _first(obj, "commentLikeCount", "likeCount", "like_count", default=0), + "parent_id": parent_id, + "is_reply": bool(parent_id or obj.get("replyCommentId")), + } + + +def _comments_from_payload(payload: dict) -> list[dict]: + data = payload.get("data") if isinstance(payload, dict) else None + if isinstance(data, dict) and isinstance(data.get("data"), dict): + data = data["data"] + if not isinstance(data, dict): + return [] + roots = data.get("comment") + if not isinstance(roots, list): + return [] + comments: list[dict] = [] + + def add(item: dict, parent_id: str = "") -> None: + normalized = _normalize_comment(item, parent_id=parent_id) + if normalized["comment_id"] and normalized["text"]: + comments.append(normalized) + replies = item.get("levelTwoComment") + if isinstance(replies, list): + for reply in replies: + if isinstance(reply, dict): + add(reply, normalized["comment_id"]) + + for root in roots: + if isinstance(root, dict): + add(root) + return comments + + +def _dedupe_comments(comments: list[dict]) -> list[dict]: + by_id: dict[str, dict] = {} + for comment in comments: + comment_id = comment.get("comment_id") + if comment_id: + by_id[comment_id] = comment + return sorted( + by_id.values(), + key=lambda item: (int(item.get("like_count") or 0), int(item.get("create_time") or 0)), + reverse=True, + ) + + +async def fetch_comments_creator( + sess: Session, + post_id: str, + title_hint: str = "", + max_pages: int = 20, +) -> list[dict]: + captured: list[dict] = [] + urls: list[str] = [] + page = await sess.ctx.new_page() + + async def on_response(response: Response) -> None: + await _capture_json(response, captured, urls) + + page.on("response", on_response) + try: + await page.goto(PLATFORM_HOME, wait_until="domcontentloaded", timeout=60000) + await asyncio.sleep(5) + await _click_text(page, "互动管理") + await asyncio.sleep(3) + await _click_text(page, "评论") 
+ await asyncio.sleep(8) + + clicked = False + hint = re.sub(r"\s+", " ", title_hint).strip()[:12] + for _ in range(30): + if hint: + for frame in page.frames: + try: + locator = frame.get_by_text(re.compile(re.escape(hint))) + if await locator.count(): + await locator.first.click(timeout=5000) + clicked = True + break + except Exception: + continue + if clicked: + break + # Scroll every likely list container to load older posts. + for frame in page.frames: + try: + await frame.locator("div").evaluate_all( + "els => els.forEach(el => { if (el.scrollHeight > el.clientHeight + 100) el.scrollTop += 900; })" + ) + except Exception: + continue + await asyncio.sleep(2) + + if not clicked: + print(f"[评论] 未在评论管理列表中定位作品:{title_hint[:30]}") + return [] + await asyncio.sleep(8) + + packet = next( + ( + item + for item in reversed(captured) + if "comment/comment_list" in item.get("url", "") + and isinstance(item.get("post_data"), str) + ), + None, + ) + if not packet: + print("[评论] 点选作品后未捕获 comment_list 接口。") + return [] + + try: + request_body = json.loads(packet["post_data"]) + except (TypeError, json.JSONDecodeError): + request_body = {} + # Refuse accidental cross-post data: the request must match the target. 
async def fetch_post(sess: Session, post_id: str) -> dict:
    """Return the normalized post whose ``post_id`` equals ``post_id``.

    Looks the post up among the results of ``fetch_recent_posts`` called
    with ``limit=200``.

    Raises:
        LookupError: The id is not among the fetched posts. Raising (instead
            of returning an empty post) keeps an unknown post from being
            reported with zero plays.
    """
    recent = await fetch_recent_posts(sess, limit=200)
    found = next((entry for entry in recent if entry["post_id"] == post_id), None)
    if found is None:
        raise LookupError(
            f"未在视频号助手最近 200 条作品中找到 post_id={post_id}。"
            "为避免把未知作品误记为 0 播放,本次不生成 report.md。"
        )
    return found


async def fetch_all(post_id: str) -> dict:
    """Fetch one post plus its comments inside a single session.

    The comment-management page is only visited when the post reports a
    positive ``comment_count``. If that fetch returns nothing, the comments
    embedded in the post (if any) are kept.

    Returns:
        ``{"post": <post dict>, "comments": <list of comment dicts>}``.

    Raises:
        LookupError: Propagated from ``fetch_post``.
    """
    sess = await Session.open()
    try:
        post = await fetch_post(sess, post_id)
        comments = post.get("comments") or []
        expected = int(post.get("comment_count") or 0)
        if expected > 0:
            title = post.get("title") or ""
            from_creator = await fetch_comments_creator(sess, post_id, title_hint=title)
            comments = from_creator or comments
        return {"post": post, "comments": comments}
    finally:
        # Always release the browser session, including on LookupError.
        await sess.close()
exc: + print(f"[错误] {exc}", file=sys.stderr) + raise SystemExit(3) from exc diff --git a/adapters/perf-data/wechat-channels/paths.py b/adapters/perf-data/wechat-channels/paths.py new file mode 100644 index 0000000..e972142 --- /dev/null +++ b/adapters/perf-data/wechat-channels/paths.py @@ -0,0 +1,28 @@ +from __future__ import annotations + +import os +from pathlib import Path +from typing import Mapping + + +def runtime_project_root( + env: Mapping[str, str] | None = None, + cwd: Path | None = None, +) -> Path: + active_env = env if env is not None else os.environ + if active_env.get("CHEAT_PROJECT_ROOT"): + return Path(active_env["CHEAT_PROJECT_ROOT"]).expanduser().resolve() + return (cwd or Path.cwd()).expanduser().resolve() + + +def auth_dir() -> Path: + return runtime_project_root() / ".auth-wechat-channels" + + +def debug_dir() -> Path: + return runtime_project_root() / ".cheat-cache" / "wechat-channels-debug" + + +def videos_dir() -> Path: + override = os.environ.get("CHEAT_VIDEOS_DIR") + return Path(override).expanduser().resolve() if override else runtime_project_root() / "videos" diff --git a/adapters/perf-data/wechat-channels/renderer.py b/adapters/perf-data/wechat-channels/renderer.py new file mode 100644 index 0000000..819faf0 --- /dev/null +++ b/adapters/perf-data/wechat-channels/renderer.py @@ -0,0 +1,101 @@ +from __future__ import annotations + +import datetime as dt +import json +from pathlib import Path + + +def _fmt_time(value) -> str: + if not value: + return "未知" + try: + ts = int(value) + if ts > 1_000_000_000_000: + ts //= 1000 + return dt.datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M") + except (TypeError, ValueError, OSError): + return str(value) + + +def _fmt_num(value) -> str: + try: + n = int(value or 0) + except (TypeError, ValueError): + return str(value) + return f"{n / 10000:.1f}w" if n >= 10000 else str(n) + + +def _ratio(num, denom) -> str: + try: + a, b = int(num or 0), int(denom or 0) + return "-" if b <= 0 else f"{a / b * 
100:.2f}%" + except (TypeError, ValueError): + return "-" + + +def _percent(value) -> str: + try: + n = float(value) + except (TypeError, ValueError): + return "-" + if n <= 1: + n *= 100 + return f"{n:.2f}%" + + +def slugify(text: str, max_len: int = 30) -> str: + bad = '<>:"/\\|?*\n\r\t' + return ("".join("_" if ch in bad else ch for ch in text).strip()[:max_len] or "untitled") + + +def output_dir_for(post: dict, root: Path) -> Path: + date = _fmt_time(post.get("create_time"))[:10].replace("未知", "nodate") + return root / f"{date}_{slugify(post.get('title') or post['post_id'])}" + + +def render_report(post: dict, script: str, comments: list[dict]) -> str: + title = post.get("title") or "(无标题)" + views = post.get("view_count") or 0 + lines = [ + f"# {title}", + "", + f"- 视频号作品 ID:`{post['post_id']}`", + f"- 发布时间:{_fmt_time(post.get('create_time'))}", + f"- 抓取时间:{dt.datetime.now().strftime('%Y-%m-%d %H:%M')}", + "", + "## 数据快照", + "", + f"- 播放/浏览:{_fmt_num(views)}", + f"- 点赞:{_fmt_num(post.get('like_count'))}(赞播比 {_ratio(post.get('like_count'), views)})", + f"- 爱心/收藏:{_fmt_num(post.get('favorite_count'))}(藏播比 {_ratio(post.get('favorite_count'), views)})", + f"- 评论:{_fmt_num(post.get('comment_count'))}(评播比 {_ratio(post.get('comment_count'), views)})", + f"- 分享/转发:{_fmt_num(post.get('share_count'))}(分播比 {_ratio(post.get('share_count'), views)})", + ] + if post.get("full_play_rate") is not None: + lines.append(f"- 完播率:{_percent(post.get('full_play_rate'))}") + if post.get("avg_play_time_sec") is not None: + lines.append(f"- 平均播放时长:{float(post.get('avg_play_time_sec')):.1f} 秒") + if post.get("fast_flip_rate") is not None: + lines.append(f"- 快速划走率:{_percent(post.get('fast_flip_rate'))}") + if post.get("follow_count"): + lines.append(f"- 带来关注:{_fmt_num(post.get('follow_count'))}") + if post.get("yesterday_view_count"): + lines.append(f"- 昨日新增播放:{_fmt_num(post.get('yesterday_view_count'))}") + lines.append("") + lines.extend(["## 原始稿子", "", script.strip() or "(未提供)", "", 
"## 评论", ""]) + if comments: + for comment in comments[:20]: + if not isinstance(comment, dict): + continue + text = comment.get("text") or comment.get("content") or comment.get("comment") or "" + likes = comment.get("like_count") or comment.get("likeCount") or 0 + nickname = comment.get("nickname") or "匿名" + reply = "(回复)" if comment.get("is_reply") else "" + if text: + lines.append( + f"- [👍{likes}] {nickname}{reply}:{str(text).replace(chr(10), ' ').strip()}" + ) + else: + lines.append("(作品列表接口未返回评论文本;需要在评论管理页进一步捕获。)") + lines.append("") + return "\n".join(lines) diff --git a/adapters/perf-data/wechat-channels/requirements.txt b/adapters/perf-data/wechat-channels/requirements.txt new file mode 100644 index 0000000..86a8fae --- /dev/null +++ b/adapters/perf-data/wechat-channels/requirements.txt @@ -0,0 +1 @@ +playwright>=1.44 diff --git a/adapters/perf-data/wechat-channels/review.py b/adapters/perf-data/wechat-channels/review.py new file mode 100644 index 0000000..7c0da51 --- /dev/null +++ b/adapters/perf-data/wechat-channels/review.py @@ -0,0 +1,76 @@ +from __future__ import annotations + +import asyncio +import os +import sys +from pathlib import Path + +import crawler +import renderer +from paths import videos_dir + + +async def list_posts() -> list[dict]: + sess = await crawler.Session.open() + try: + return await crawler.fetch_recent_posts(sess, limit=50) + finally: + await sess.close() + + +async def run_with_id(post_id: str, script_path: str | None = None) -> None: + script = "" + if script_path: + path = Path(script_path).expanduser() + if path.is_file(): + script = path.read_text(encoding="utf-8", errors="ignore") + + result = await crawler.fetch_all(post_id) + post = result["post"] + root = videos_dir() + root.mkdir(parents=True, exist_ok=True) + output_override = os.environ.get("CHEAT_OUTPUT_DIR") + out_dir = Path(output_override).expanduser().resolve() if output_override else renderer.output_dir_for(post, root) + out_dir.mkdir(parents=True, 
#!/usr/bin/env bash
# Entry point for running the wechat-channels perf-data adapter on one post.
#
# Usage: bash run.sh POST_ID VIDEO_FOLDER [script_path]
#
# Exit codes used below:
#   1  .auth-wechat-channels/ is missing (not logged in yet)
#   2  python or playwright is not available
#   3  bad usage, project root not found, review.py failed, or no report.md
set -uo pipefail

POST_ID="${1:-}"
VIDEO_FOLDER="${2:-}"
SCRIPT_PATH="${3:-}"

if [[ -z "$POST_ID" || -z "$VIDEO_FOLDER" ]]; then
  echo "Usage: bash run.sh POST_ID VIDEO_FOLDER [script_path]" >&2
  exit 3
fi

ADAPTER_DIR="$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )"

# Print an absolute form of $1, which may not exist yet.
# `realpath -m` is GNU-only; BSD/macOS realpath rejects -m, so fall back to a
# plain $PWD-relative join instead of ending up with an empty path.
abs_path() {
  local target="$1" resolved
  if resolved="$( realpath -m -- "$target" 2>/dev/null )" && [[ -n "$resolved" ]]; then
    printf '%s\n' "$resolved"
    return 0
  fi
  [[ "$target" == /* ]] || target="$PWD/$target"
  while [[ "$target" == */ && "$target" != "/" ]]; do
    target="${target%/}"
  done
  printf '%s\n' "$target"
}

VIDEO_FOLDER_REAL="$( abs_path "$VIDEO_FOLDER" )"

# Walk upwards from the video folder's parent until a directory containing
# .cheat-state.json is found; print it, or return 1 if there is none.
find_project_root() {
  local dir parent
  dir="$( dirname "$VIDEO_FOLDER_REAL" )"
  while [[ -n "$dir" && "$dir" != "/" ]]; do
    if [[ -f "$dir/.cheat-state.json" ]]; then
      printf '%s\n' "$dir"
      return 0
    fi
    parent="$( dirname "$dir" )"
    # dirname is a fixed point for "." (and similar); stop instead of looping forever.
    [[ "$parent" == "$dir" ]] && break
    dir="$parent"
  done
  return 1
}

if ! PROJECT_ROOT="$(find_project_root)"; then
  echo "Could not find .cheat-state.json above $VIDEO_FOLDER" >&2
  exit 3
fi

# Prefer the project's virtualenv (Windows layout first, then POSIX),
# then whatever python is on PATH.
if [[ -x "$PROJECT_ROOT/.venv/Scripts/python.exe" ]]; then
  PYTHON="$PROJECT_ROOT/.venv/Scripts/python.exe"
elif [[ -x "$PROJECT_ROOT/.venv/bin/python" ]]; then
  PYTHON="$PROJECT_ROOT/.venv/bin/python"
elif command -v python3 >/dev/null 2>&1; then
  PYTHON="python3"
elif command -v python >/dev/null 2>&1; then
  PYTHON="python"
else
  echo "python not found" >&2
  exit 2
fi

if ! "$PYTHON" -c "import playwright" 2>/dev/null; then
  echo "playwright not installed in project venv" >&2
  exit 2
fi
if [[ ! -d "$PROJECT_ROOT/.auth-wechat-channels" ]]; then
  echo "视频号助手未登录,请先运行: $PYTHON \"$ADAPTER_DIR/review.py\" login" >&2
  exit 1
fi

# Pin a caller-relative script path before changing directory; otherwise it
# would be looked up relative to the project root and silently dropped.
if [[ -n "$SCRIPT_PATH" && -f "$SCRIPT_PATH" ]]; then
  SCRIPT_PATH="$( abs_path "$SCRIPT_PATH" )"
fi

mkdir -p "$VIDEO_FOLDER_REAL"
cd "$PROJECT_ROOT"
export CHEAT_PROJECT_ROOT="$PROJECT_ROOT"
export CHEAT_VIDEOS_DIR="$( dirname "$VIDEO_FOLDER_REAL" )"
export CHEAT_OUTPUT_DIR="$VIDEO_FOLDER_REAL"
export PYTHONUTF8=1

REVIEW_ARGS=(post "$POST_ID")
# Still accept a path that only resolves relative to the project root.
if [[ -n "$SCRIPT_PATH" && -f "$SCRIPT_PATH" ]]; then
  REVIEW_ARGS+=("$SCRIPT_PATH")
fi

"$PYTHON" "$ADAPTER_DIR/review.py" "${REVIEW_ARGS[@]}"
STATUS=$?
if (( STATUS != 0 )); then
  # Without this check a report.md left over from an earlier run would make
  # a failed fetch look successful.
  echo "review.py exited with status $STATUS" >&2
  exit 3
fi

[[ -f "$VIDEO_FOLDER_REAL/report.md" ]] || exit 3
echo "report.md written to $VIDEO_FOLDER_REAL/report.md"
"""Confirm that creator pages work, not just that an interim SSO cookie exists.""" + verification_markers = ( + "手机号验证", + "手机验证", + "安全验证", + "短信验证", + "验证码", + "验证身份", + ) + try: + if "note-manager" not in page.url: + await page.goto( + CREATOR_NOTE_MANAGER, + wait_until="domcontentloaded", + timeout=30000, + ) + await asyncio.sleep(2) + body = (await page.locator("body").inner_text(timeout=5000)).strip() + if any(marker in body for marker in verification_markers): + return False + blocked_url_parts = ("login", "verify", "security", "captcha") + return ( + "creator.xiaohongshu.com" in page.url + and "note-manager" in page.url + and not any(part in page.url.lower() for part in blocked_url_parts) + ) + except Exception: + return False + + async def _has_web_session(ctx: BrowserContext) -> bool: for host in ("https://www.xiaohongshu.com", "https://creator.xiaohongshu.com"): if (await _cookie_map(ctx, host)).get(WEB_LOGIN_COOKIE): @@ -109,15 +140,22 @@ async def _acquire_web_session(page: Page) -> None: async def ensure_login(timeout_s: int = 300) -> bool: - """扫码登录创作者中心;检测到创作者登录态后顺便换取 web_session,然后自动关闭。""" + """扫码登录;只有创作者后台真正可用后才自动关闭。""" sess = await Session.open() try: page = await sess.ctx.new_page() await page.goto(CREATOR_HOME) print(f"[登录] 在弹出的 Chromium 窗口里扫码登录小红书创作者中心。最多等 {timeout_s} 秒……") + cookie_seen = False for i in range(timeout_s): try: - if await _creator_logged_in(sess.ctx) and "login" not in page.url: + if await _creator_logged_in(sess.ctx): + if not cookie_seen: + print("[登录] 已收到扫码凭证,正在确认是否还需要手机号安全验证……") + cookie_seen = True + if not await _creator_backend_ready(page): + await asyncio.sleep(1) + continue print(f"[登录] ✓ 创作者中心登录态已确认(用时 {i}s)") await _acquire_web_session(page) await asyncio.sleep(1) diff --git a/skills/cheat-init/SKILL.md b/skills/cheat-init/SKILL.md index 575d3b2..34d8298 100644 --- a/skills/cheat-init/SKILL.md +++ b/skills/cheat-init/SKILL.md @@ -141,11 +141,12 @@ allowed-tools: Bash(*), Read, Write, Edit, Glob, WebFetch, Skill > b) 
小红书 — 装 xhs-explore adapter(Playwright + 扫码登录小红书创作者中心) > c) YouTube — 装 youtube-data-api adapter(需 API key) > d) B 站 — bilibili-stat adapter -> e) 其他 / 多平台 — 走 manual paste 模式" +> e) 微信视频号 — 装 wechat-channels adapter(Playwright + 扫码登录视频号助手) +> f) 其他 / 多平台 — 走 manual paste 模式" -如选 a/b/c/d → 询问 Q2.2;如选 e → 跳到 Q2.3 manual。 +如选 a/b/c/d/e → 询问 Q2.2;如选 f → 跳到 Q2.3 manual。 -**Q2.2: adapter 安装时机**(仅 Q2.1=a/b/c/d) +**Q2.2: adapter 安装时机**(仅 Q2.1=a/b/c/d/e) > "现在装 adapter 自动抓取,还是先手动告诉我? > - 现在装 — 引导你装 Playwright + 扫码 → 抓回最近 N 条数据 @@ -288,7 +289,7 @@ c) 不找 → state 标 `benchmark_status: none`,用通用 v0 起步 "data_layer": "markdown", "hooks_installed": <查 Q5 映射表,写 bool true/false>, "enabled_trend_sources": ["manual-paste"], - "enabled_perf_adapters": , + "enabled_perf_adapters": , "last_bump_at": null, "last_bump_self_audited": false, "last_published_at": null, diff --git a/skills/cheat-retro/SKILL.md b/skills/cheat-retro/SKILL.md index 382ac80..3492dff 100644 --- a/skills/cheat-retro/SKILL.md +++ b/skills/cheat-retro/SKILL.md @@ -95,6 +95,7 @@ allowed-tools: Bash(*), Read, Edit, Write, Glob, Grep, Skill |---|---|---| | `douyin` | `adapters/perf-data/douyin-session/` | `bash /douyin-session/run.sh ` | | `xhs` | `adapters/perf-data/xhs-explore/` | `bash /xhs-explore/run.sh ` | +| `wechat_channels` | `adapters/perf-data/wechat-channels/` | `bash /wechat-channels/run.sh ` | | `youtube` | `adapters/perf-data/youtube-data-api/`(待) | 调 YouTube Data API(需 API key) | | `bilibili` | `adapters/perf-data/bilibili-stat/` | `bash /bilibili-stat/run.sh ` | | 其他 | 无 adapter | 优雅降级到 Path A | @@ -113,11 +114,23 @@ allowed-tools: Bash(*), Read, Edit, Write, Glob, Grep, Skill - 字段已校准(观看 `view_count` 等已写死);万一接口改版导致某项为 0,看 report.md 末尾 galaxy 原始 JSON,把新 key 加进 `crawler.py` 的 `_normalize_note` - **评论可能抓不到**(xsec_token 缺失 / 评论关闭)→ report.md 标"未抓到评论" → 此时**降级要求用户 manual 粘 top 20 评论**(评论是真信号,不能省) +**wechat-channels 的特殊处理**: +- 作品 ID 从视频号助手的作品列表取得(通常是 `export/...`) +- 调用前确认 `.auth-wechat-channels/` 存在;不存在则提示运行 
+ `python /review.py login` 并使用微信扫码 +- adapter 从自己的视频号助手后台抓播放、点赞、爱心、评论数、分享、完播率、 + 平均播放时长、快速划走率和带来关注 +- adapter 会自动进入“互动管理 → 评论”,按作品 ID 抓评论文本、点赞数和作者回复, + 自动分页并写入 Top 20;如果后台列表未找到目标作品或评论权限受限,再降级 manual + **bilibili-stat 的特殊处理**: - 视频 URL(`https://www.bilibili.com/video/` 或 b23.tv 短链)或直接给 BV 号 → adapter 自动提取 BV 号 -- **无需登录**:B站视频数据(view)与评论(reply)都是公开接口、免 wbi 签名,adapter 是纯 httpx,没有 `crawler.py login` 步骤、不碰 `.auth/` -- 依赖 httpx:首次用 `pip install -r /requirements.txt` -- 评论按热度(sort=2)抓取;B站老接口主楼评论可能偏少,不足时降级 manual 粘 +- 单条视频的公开统计、热门评论和弹幕文本无需登录 +- 自动读取自己空间最近投稿时,首次运行 + `python /review.py login ` 扫码登录;状态保存到 `.auth-bilibili/` +- `python /review.py list ` 自动列出最近 20 条及 BV 号 +- 评论按热度抓取并展开接口返回的回复;弹幕文本带视频时间点,默认最多 100 条 +- 公开数据路径只用 Python 标准库;空间列表模式额外依赖 Playwright **任何 adapter 失败**(cookie 过期 / 接口变化 / 网络)→ **优雅降级到 manual**,提示用户:"adapter 调用失败,原因 [X]。改用 manual 模式——粘下面的数据"。**不阻塞流程**。