forked from Diaoxiaozhang/Ximalaya-Downloader
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathmain.py
460 lines (437 loc) · 22 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
# -*- coding:utf-8 -*-
import asyncio
import json
import math
import os
import time
import logging
import traceback
from fake_useragent import UserAgent
from base64 import b64decode
import aiofiles
import aiohttp
import requests
from webdriver_manager.chrome import ChromeDriverManager
from webdriver_manager.microsoft import EdgeChromiumDriverManager
from selenium import webdriver
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
import selenium.common.exceptions
import colorama
from conf import BASE_DIR, RESULT_PATH
from utils.exceptions import XMLimitError
# Let colorama reset the terminal colour after each coloured print.
colorama.init(autoreset=True)

# Module-wide logger. Records at DEBUG level and above are written to
# app.log; the handler opens the file with mode='w', so the log is
# truncated every time this module is loaded.
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler = logging.FileHandler('app.log', mode='w', encoding='utf-8')
file_handler.setFormatter(formatter)
file_handler.setLevel(logging.DEBUG)
logger = logging.getLogger('logger')
logger.addHandler(file_handler)
logger.setLevel(logging.DEBUG)

# Shared generator of User-Agent strings for outgoing requests.
ua = UserAgent()
class Ximalaya:
    """Resolve and download Ximalaya sounds and albums.

    A resolved sound is described by a ``sound_info`` dict with the keys
    ``"name"`` (title) and ``0``, ``1``, ``2`` (download URLs for the
    ``MP3_32``, ``MP3_64`` and ``M4A_128`` variants; ``""`` when a variant
    is not offered).

    The resolver methods keep the historical tri-state result: a truthy
    value on success, ``0`` when the logged-in account is not authorised
    for the item, and ``False`` when the request or parsing failed. Callers
    must use ``is False`` to tell the last two apart.
    """

    # Byte-for-byte substitution table used by the first decryption pass.
    _SUBSTITUTION_TABLE = bytes([
        183, 174, 108, 16, 131, 159, 250, 5, 239, 110, 193, 202, 153, 137, 251, 176,
        119, 150, 47, 204, 97, 237, 1, 71, 177, 42, 88, 218, 166, 82, 87, 94,
        14, 195, 69, 127, 215, 240, 225, 197, 238, 142, 123, 44, 219, 50, 190, 29,
        181, 186, 169, 98, 139, 185, 152, 13, 141, 76, 6, 157, 200, 132, 182, 49,
        20, 116, 136, 43, 155, 194, 101, 231, 162, 242, 151, 213, 53, 60, 26, 134,
        211, 56, 28, 223, 107, 161, 199, 15, 229, 61, 96, 41, 66, 158, 254, 21,
        165, 253, 103, 89, 3, 168, 40, 246, 81, 95, 58, 31, 172, 78, 99, 45,
        148, 187, 222, 124, 55, 203, 235, 64, 68, 149, 180, 35, 113, 207, 118, 111,
        91, 38, 247, 214, 7, 212, 209, 189, 241, 18, 115, 173, 25, 236, 121, 249,
        75, 57, 216, 10, 175, 112, 234, 164, 70, 206, 198, 255, 140, 230, 12, 32,
        83, 46, 245, 0, 62, 227, 72, 191, 156, 138, 248, 114, 220, 90, 84, 170,
        128, 19, 24, 122, 146, 80, 39, 37, 8, 34, 22, 11, 93, 130, 63, 154,
        244, 160, 144, 79, 23, 133, 92, 54, 102, 210, 65, 67, 27, 196, 201, 106,
        143, 52, 74, 100, 217, 179, 48, 233, 126, 117, 184, 226, 85, 171, 167, 86,
        2, 147, 17, 135, 228, 252, 105, 30, 192, 129, 178, 120, 36, 145, 51, 163,
        77, 205, 73, 4, 188, 125, 232, 33, 243, 109, 224, 104, 208, 221, 59, 9,
    ])
    # 32-byte key XOR-ed over the data by the last decryption pass.
    _XOR_KEY = bytes([
        204, 53, 135, 197, 39, 73, 58, 160, 79, 24, 12, 83, 180, 250, 101, 60,
        206, 30, 10, 227, 36, 95, 161, 16, 135, 150, 235, 116, 242, 116, 165, 171,
    ])
    # Characters that may not appear in a file name; each becomes a space.
    _INVALID_CHAR_TABLE = str.maketrans({char: " " for char in '/\\:*?"<>|'})
    # Page size requested from the album track-list endpoint.
    _PAGE_SIZE = 100
    # Maps a playUrlList entry type to its slot in a sound_info dict.
    _QUALITY_BY_TYPE = {"MP3_32": 0, "MP3_64": 1, "M4A_128": 2}

    def __init__(self, account_name="vip"):
        """Set up default headers and the per-account config/download paths.

        Args:
            account_name: Stem of the config file that stores the cookie.
        """
        self.default_headers = {
            "user-agent": ua.random
        }
        self.conf_path = BASE_DIR / "config" / f"{account_name}.conf"
        self.download_path = RESULT_PATH / ".tmp" / "ximalaya"

    # ------------------------------------------------------------------
    # Sound resolution
    # ------------------------------------------------------------------
    def _build_sound_info(self, sound_id, track_info):
        """Build a ``sound_info`` dict from an authorised ``trackInfo``.

        Raises:
            KeyError, IndexError, TypeError, ValueError: when the payload is
                missing fields or a URL cannot be decrypted.
        """
        sound_name = track_info["title"]
        encrypted_url_list = track_info["playUrlList"]
        sound_info = {"name": sound_name, 0: "", 1: "", 2: ""}
        if encrypted_url_list[0]["type"][:2] == "AI":
            # "AI*" entries carry a single URL, used for both lower slots.
            sound_info[0] = sound_info[1] = self.decrypt_url(encrypted_url_list[0]["url"])
        else:
            for encrypted_url in encrypted_url_list:
                slot = self._QUALITY_BY_TYPE.get(encrypted_url["type"])
                if slot is not None:
                    sound_info[slot] = self.decrypt_url(encrypted_url["url"])
        logger.debug(f'ID为{sound_id}的声音解析成功!')
        return sound_info

    def _parse_track_payload(self, sound_id, payload):
        """Interpret a decoded ``baseInfo`` response.

        Shared by the sync and async resolvers so both report the same
        outcome for the same payload.

        Returns:
            A ``sound_info`` dict, ``0`` if the track is not authorised for
            this account, or ``False`` if the payload cannot be used.
        """
        try:
            track_info = payload["trackInfo"]
            authorized = track_info["isAuthorized"]
        except (KeyError, TypeError):
            print(colorama.Fore.RED + f'ID为{sound_id}的声音解析失败,可能因为达到每日付费音频下载上限!')
            return False
        if not authorized:
            return 0  # not purchased, or no VIP account logged in
        try:
            return self._build_sound_info(sound_id, track_info)
        except (KeyError, IndexError, TypeError, ValueError):
            print(colorama.Fore.RED + f'ID为{sound_id}的声音解析失败!')
            logger.debug(f'ID为{sound_id}的声音解析失败!')
            logger.debug(traceback.format_exc())
            return False

    def analyze_sound(self, sound_id, headers):
        """Resolve one sound synchronously.

        Args:
            sound_id: Track id.
            headers: Request headers (normally including the cookie). The
                dict is copied; the caller's object is not modified.

        Returns:
            A ``sound_info`` dict, ``0`` when unauthorised, ``False`` on
            failure.
        """
        logger.debug(f'开始解析ID为{sound_id}的声音')
        url = f"https://www.ximalaya.com/mobile-playpage/track/v3/baseInfo/{int(time.time() * 1000)}"
        params = {
            "device": "www2",
            "trackId": sound_id,
            "trackQualityLevel": 2
        }
        request_headers = {**headers, "referer": f"https://www.ximalaya.com/sound/{sound_id}"}
        try:
            response = requests.get(url, headers=request_headers, params=params, timeout=15)
            payload = response.json()  # decode once; a bad body raises ValueError
        except (requests.RequestException, ValueError):
            print(colorama.Fore.RED + f'ID为{sound_id}的声音解析失败!')
            logger.debug(f'ID为{sound_id}的声音解析失败!')
            logger.debug(traceback.format_exc())
            return False
        return self._parse_track_payload(sound_id, payload)

    async def async_analyze_sound(self, sound_id, session, headers):
        """Resolve one sound using an aiohttp session, with up to 3 attempts.

        Args:
            sound_id: Track id.
            session: An open ``aiohttp.ClientSession``.
            headers: Request headers; copied, never mutated, because the
                same dict is shared by many concurrent calls.

        Returns:
            A ``sound_info`` dict, ``0`` when unauthorised, ``False`` on
            failure.
        """
        url = f"https://www.ximalaya.com/mobile-playpage/track/v3/baseInfo/{int(time.time() * 1000)}"
        params = {
            "device": "www2",
            "trackId": sound_id,
            "trackQualityLevel": 2
        }
        request_headers = {**headers, "referer": f"https://www.ximalaya.com/sound/{sound_id}"}
        timeout = aiohttp.ClientTimeout(total=20)
        for _attempt in range(3):
            try:
                async with session.get(url, headers=request_headers, params=params, timeout=timeout) as response:
                    payload = json.loads(await response.text())
                break
            except (aiohttp.ClientError, asyncio.TimeoutError, ValueError):
                logger.debug(f'ID为{sound_id}的声音解析失败!')
                logger.debug(traceback.format_exc())
        else:
            # Every attempt failed: report it instead of falling through
            # with no payload to inspect.
            print(colorama.Fore.RED + f'ID为{sound_id}的声音解析失败!')
            return False
        return self._parse_track_payload(sound_id, payload)

    # ------------------------------------------------------------------
    # Album resolution
    # ------------------------------------------------------------------
    def _request_album_page(self, url, album_id, page, timeout):
        """Fetch one page of an album's track list.

        Returns:
            The response's ``data`` dict when it holds a non-empty
            ``tracks`` list, otherwise ``None`` (empty or malformed page).

        Raises:
            requests.RequestException: when the HTTP request itself fails.
        """
        params = {
            "albumId": album_id,
            "pageNum": page,
            "sort": 0,
            "pageSize": self._PAGE_SIZE
        }
        response = requests.get(url, headers=self.default_headers, params=params, timeout=timeout)
        try:
            data = response.json()["data"]
            tracks = data["tracks"]
        except (ValueError, KeyError, TypeError):
            logger.debug(traceback.format_exc())
            return None
        return data if tracks else None

    def analyze_album(self, album_id):
        """Resolve an album into its title and full track list.

        Each page is tried up to 5 times when it comes back empty.

        Returns:
            ``(album_name, sounds)`` on success, ``(False, False)`` when a
            page stays empty or a follow-up page request fails.

        Raises:
            XMLimitError: when the request for the first page fails.
        """
        logger.debug(f'开始解析ID为{album_id}的专辑')
        url = "https://www.ximalaya.com/revision/album/v1/getTracksList"
        # Kept on the shared headers: later downloads reuse default_headers.
        self.default_headers["referer"] = f"https://www.ximalaya.com/album/{album_id}"
        max_attempts = 5

        first_page = None
        for _attempt in range(max_attempts):
            try:
                first_page = self._request_album_page(url, album_id, 1, timeout=15)
            except requests.RequestException as e:
                print(colorama.Fore.RED + f'ID为{album_id}的专辑解析失败!')
                logger.debug(f'ID为{album_id}的专辑解析失败!')
                logger.debug(traceback.format_exc())
                # No response object exists here, so only the error is reported.
                raise XMLimitError(
                    f"xm analyze_album error(unknown reason), the reason I don't know : {e}") from e
            if first_page is not None:
                break
        if first_page is None:
            print(colorama.Fore.RED + f'ID为{album_id}的专辑解析失败!')
            logger.debug(f'ID为{album_id}的专辑解析失败!(getTracksList错误)')
            return False, False

        total = first_page.get("trackTotalCount") or len(first_page["tracks"])
        pages = math.ceil(total / self._PAGE_SIZE)
        # Page 1 is already in hand; do not request it a second time.
        sounds = list(first_page["tracks"])
        print(f"第1页解析成功,共{pages}页")

        for page in range(2, pages + 1):
            page_data = None
            for attempt in range(1, max_attempts + 1):
                try:
                    page_data = self._request_album_page(url, album_id, page, timeout=30)
                except requests.RequestException:
                    print(colorama.Fore.RED + f'ID为{album_id}的专辑解析失败!')
                    logger.debug(f'ID为{album_id}的专辑解析失败!')
                    logger.debug(traceback.format_exc())
                    return False, False
                if page_data is not None:
                    print(f"第{page}页解析成功,共{pages}页")
                    break
                print(f"第{page}页解析失败第{attempt}次,共{pages}页")
            else:
                print(colorama.Fore.RED + f'ID为{album_id}的专辑解析失败!')
                logger.debug(f'ID为{album_id}的专辑解析失败!(getTracksList错误)')
                return False, False
            sounds += page_data["tracks"]

        album_name = sounds[0]["albumTitle"]
        logger.debug(f'ID为{album_id}的专辑解析成功')
        return album_name, sounds

    # ------------------------------------------------------------------
    # Small pure helpers
    # ------------------------------------------------------------------
    def replace_invalid_chars(self, name):
        """Return *name* with characters illegal in file names turned into spaces."""
        return name.translate(self._INVALID_CHAR_TABLE)

    @staticmethod
    def _file_extension(sound_url):
        """Return the file extension of *sound_url*, ignoring any query string.

        Falls back to the last three characters (the historical behaviour)
        when the URL path has no dotted suffix.
        """
        path_part = sound_url.split('?', 1)[0]
        extension = os.path.splitext(path_part)[1].lstrip('.')
        return extension or path_part[-3:]

    @staticmethod
    def _pick_url(sound_info, quality):
        """Return the URL for *quality*, falling back to lower qualities.

        The choice is made per track, so one track lacking the requested
        quality does not downgrade the tracks after it. Returns ``""``
        when no usable URL exists.
        """
        for level in range(quality, -1, -1):
            candidate = sound_info.get(level)
            if candidate:
                return candidate
        return sound_info.get(quality, "")

    # ------------------------------------------------------------------
    # Downloading
    # ------------------------------------------------------------------
    def get_sound(self, sound_name, sound_url, path):
        """Download one sound into *path* (up to 3 attempts).

        Returns:
            ``None`` when the file already exists or was downloaded,
            ``False`` when every attempt failed.
        """
        sound_name = self.replace_invalid_chars(sound_name)
        extension = self._file_extension(sound_url)
        target = os.path.join(str(path), f"{sound_name}.{extension}")
        if os.path.exists(target):
            print(f'{sound_name}已存在!')
            return None
        for attempt in range(1, 4):
            try:
                logger.debug(f'开始下载声音{sound_name}')
                response = requests.get(sound_url, headers=self.default_headers, timeout=60)
                # Never store an HTTP error page as an audio file.
                response.raise_for_status()
                break
            except requests.RequestException:
                logger.debug(f'{sound_name}第{attempt}次下载失败!')
                logger.debug(traceback.format_exc())
        else:
            print(colorama.Fore.RED + f'{sound_name}下载失败!')
            logger.debug(f'{sound_name}经过三次重试后下载失败!')
            return False
        os.makedirs(path, exist_ok=True)
        with open(target, mode="wb") as f:
            f.write(response.content)
        print(f'{sound_name}下载完成!')
        logger.debug(f'{sound_name}下载完成!')
        return None

    async def async_get_sound(self, sound_name, sound_url, album_name, session, path, global_retries, num=None):
        """Download one sound into ``path/album_name`` (up to 3 attempts).

        Args:
            sound_name: Track title, without any number prefix.
            sound_url: Decrypted download URL.
            album_name: Album title, used as the sub-directory name.
            session: An open ``aiohttp.ClientSession``.
            path: Base download directory.
            global_retries: Index of the caller's retry round; only used to
                number the attempts in the log.
            num: Optional prefix put in front of the file name.

        Returns:
            ``None`` when the file exists or was downloaded. On failure, the
            argument list ``[sound_name, sound_url, album_name, session,
            path, global_retries, num]`` for a later retry. ``sound_name``
            is returned unprefixed so a retry does not prepend *num* twice.
        """
        logger.debug(f'开始下载声音{sound_name}')
        original_name = sound_name
        file_name = sound_name if num is None else f"{num}-{sound_name}"
        file_name = self.replace_invalid_chars(file_name)
        extension = self._file_extension(sound_url)
        album_name = self.replace_invalid_chars(album_name)
        album_dir = os.path.join(str(path), album_name)
        os.makedirs(album_dir, exist_ok=True)
        target = os.path.join(album_dir, f"{file_name}.{extension}")
        if os.path.exists(target):
            print(f'{file_name}已存在!')
            return None
        timeout = aiohttp.ClientTimeout(total=120)
        for attempt in range(1, 4):
            try:
                async with session.get(sound_url, headers=self.default_headers, timeout=timeout) as response:
                    # Never store an HTTP error page as an audio file.
                    response.raise_for_status()
                    audio = await response.content.read()
                async with aiofiles.open(target, mode="wb") as f:
                    await f.write(audio)
                print(f'{file_name}下载完成!')
                logger.debug(f'{file_name}下载完成!')
                return None
            except (aiohttp.ClientError, asyncio.TimeoutError, OSError, ValueError):
                logger.debug(f'{file_name}第{global_retries * 3 + attempt}次下载失败!')
                logger.debug(traceback.format_exc())
                # Drop a partially written file so the next run does not
                # mistake it for a finished download.
                if os.path.exists(target):
                    os.remove(target)
        return [original_name, sound_url, album_name, session, path, global_retries, num]

    async def get_selected_sounds(self, sounds, album_name, start, end, headers, quality, number, path):
        """Resolve and download tracks ``start``..``end`` (1-based, inclusive).

        Args:
            sounds: Track list as returned by :meth:`analyze_album`.
            album_name: Album title (download sub-directory).
            start: First track position to download.
            end: Last track position to download.
            headers: Request headers used to resolve the tracks.
            quality: Preferred quality (0, 1 or 2); lower qualities are
                used per track when the preferred one is missing.
            number: When truthy, prefix file names with the zero-padded
                position of the track in the album.
            path: Base download directory.

        Raises:
            XMLimitError: when any selected track fails to resolve or is
                not authorised.
        """
        max_global_retries = 2
        digits = len(str(len(sounds)))
        first = max(start, 1)
        selected = sounds[first - 1:end]
        # The context manager closes the session on every exit path.
        async with aiohttp.ClientSession() as session:
            sounds_info = await asyncio.gather(*(
                self.async_analyze_sound(sound["trackId"], session, headers)
                for sound in selected
            ))
            # NOTE(review): this aborts when ANY track fails, although the
            # original comment spoke of ALL tracks failing; behaviour kept.
            if not all(sounds_info):
                raise XMLimitError("也许触发了xm的日限制!")

            downloads = []
            for position, sound_info in enumerate(sounds_info, start=first):
                prefix = str(position).zfill(digits) if number else None
                downloads.append(self.async_get_sound(
                    sound_info["name"], self._pick_url(sound_info, quality),
                    album_name, session, path, 0, prefix))
            failed_downloads = [result for result in await asyncio.gather(*downloads) if result is not None]

            global_retries = 0
            while failed_downloads and global_retries < max_global_retries:
                global_retries += 1
                retry_jobs = []
                for failed_download in failed_downloads:
                    retry_args = list(failed_download)
                    retry_args[5] = global_retries  # keep attempt numbering in the log right
                    retry_jobs.append(self.async_get_sound(*retry_args))
                failed_downloads = [result for result in await asyncio.gather(*retry_jobs) if result is not None]

            print("专辑全部选定声音下载完成!")
            for failed_download in failed_downloads:
                print(colorama.Fore.RED + f'声音{failed_download[0]}下载失败!')

    # ------------------------------------------------------------------
    # URL decryption
    # ------------------------------------------------------------------
    def decrypt_url(self, encrypted_url):
        """Decrypt an encrypted play URL.

        The input is URL-safe base64. Its last 16 bytes are an IV; each
        remaining byte is substituted through the table, XOR-ed with the IV
        (repeating every 16 bytes) and XOR-ed with the key (repeating every
        32 bytes).

        Returns:
            The decrypted URL, or the input (with ``_``/``-`` mapped to
            ``/``/``+``) when it decodes to fewer than 16 bytes.
        """
        encrypted_url = encrypted_url.replace('_', '/').replace('-', '+')
        padding = '=' * (-len(encrypted_url) % 4)
        encrypted_data = b64decode(encrypted_url + padding)
        if len(encrypted_data) < 16:
            return encrypted_url
        data, iv = encrypted_data[:-16], encrypted_data[-16:]
        table = self._SUBSTITUTION_TABLE
        key = self._XOR_KEY
        decrypted_data = bytes(
            table[byte] ^ iv[index % 16] ^ key[index % 32]
            for index, byte in enumerate(data)
        )
        return decrypted_data.decode('utf-8')

    # ------------------------------------------------------------------
    # Account / album state
    # ------------------------------------------------------------------
    def judge_album(self, album_id, headers):
        """Classify an album.

        Returns:
            ``0`` for a free album, ``1`` for a paid album already bought,
            ``2`` for a paid album not bought, ``False`` when the request
            or the response parsing failed (check with ``is False``).
        """
        logger.debug(f'开始判断ID为{album_id}的专辑的类型')
        url = "https://www.ximalaya.com/revision/album/v1/simple"
        params = {
            "albumId": album_id
        }
        try:
            response = requests.get(url, headers=headers, params=params, timeout=15)
            main_info = response.json()["data"]["albumPageMainInfo"]
            is_paid = bool(main_info["isPaid"])
            has_bought = bool(main_info.get("hasBuy"))
        except (requests.RequestException, ValueError, KeyError, TypeError):
            print(colorama.Fore.RED + f'ID为{album_id}的专辑解析失败!')
            logger.debug(f'ID为{album_id}的专辑判断类型失败!')
            logger.debug(traceback.format_exc())
            return False
        logger.debug(f'ID为{album_id}的专辑判断类型成功!')
        if not is_paid:
            return 0  # free album
        if has_bought:
            return 1  # purchased album
        return 2  # paid album that has not been purchased

    def analyze_config(self) -> str:
        """Return the cookie stored in the config file, or ``""``.

        :meth:`login` stores the cookie as a JSON string, so a JSON-quoted
        value is decoded; a plain-text cookie is also accepted. Surrounding
        whitespace is removed in both cases.
        """
        try:
            with open(self.conf_path, "r", encoding="utf-8") as f:
                raw = f.read().strip()
        except (OSError, ValueError):
            return ""
        if raw.startswith('"'):
            try:
                decoded = json.loads(raw)
            except ValueError:
                return raw
            if isinstance(decoded, str):
                return decoded.strip()
        return raw

    def judge_cookie(self, cookie):
        """Check whether *cookie* belongs to a logged-in user.

        Returns:
            The user name when the cookie is valid, otherwise ``False``
            (also when the request fails).
        """
        url = "https://www.ximalaya.com/revision/my/getCurrentUserInfo"
        headers = {
            "user-agent": ua.random,
            "cookie": cookie
        }
        try:
            response = requests.get(url, headers=headers, timeout=15)
            payload = response.json()
        except (requests.RequestException, ValueError):
            print("无法获取喜马拉雅用户数据,请检查网络状况!")
            logger.debug("无法获取喜马拉雅用户数据!")
            logger.debug(traceback.format_exc())
            return False
        try:
            if payload["ret"] == 200:
                return payload["data"]["userName"]
        except (KeyError, TypeError):
            logger.debug(traceback.format_exc())
        return False

    @staticmethod
    def _log_browser_messages(driver):
        """Copy the browser console log of *driver* into the debug log."""
        logger.debug('以下是使用浏览器登录喜马拉雅账号时的浏览器日志:')
        for entry in driver.get_log('browser'):
            logger.debug(entry['message'])
        logger.debug('浏览器日志结束')

    def login(self):
        """Log in through a Chrome window and store the resulting cookie.

        Waits up to 300 seconds for the browser to reach the Ximalaya home
        page, then writes the cookie (as a JSON string) to the config file.

        Returns:
            The cookie string, or ``None`` when the login timed out.
        """
        print("在浏览器中登录并自动提取cookie")
        print("Google Chrome")
        option = webdriver.ChromeOptions()
        option.add_experimental_option("detach", True)
        option.add_experimental_option('excludeSwitches', ['enable-logging'])
        # NOTE(review): recent Selenium releases expect the driver path via
        # a Service object rather than positionally - confirm the pinned
        # Selenium version before changing this call.
        driver = webdriver.Chrome(ChromeDriverManager().install(), options=option)
        print("请在弹出的浏览器中登录喜马拉雅账号,登陆成功浏览器会自动关闭")
        try:
            driver.get("https://passport.ximalaya.com/page/web/login")
            try:
                WebDriverWait(driver, 300).until(EC.url_to_be("https://www.ximalaya.com/"))
            except selenium.common.exceptions.TimeoutException:
                print("登录超时,自动返回主菜单!")
                return None
            cookies = driver.get_cookies()
        finally:
            # Dump the browser log and close the browser on every path.
            try:
                self._log_browser_messages(driver)
            finally:
                driver.quit()
        cookie = "".join(f"{cookie_['name']}={cookie_['value']}; " for cookie_ in cookies)
        config_dir = os.path.dirname(self.conf_path)
        if config_dir:
            os.makedirs(config_dir, exist_ok=True)
        with open(self.conf_path, "w", encoding="utf-8") as f:
            json.dump(cookie, f)
        username = self.judge_cookie(cookie)
        print(f"成功登录账号{username}!")
        return cookie
def main():
    """Download every sound of a hard-coded album with the stored account.

    Reads the saved cookie, falls back to a browser login when it is
    missing or rejected, resolves the album and downloads all of its tracks
    into the client's download directory.
    """
    album_id = "56066051"
    ximalaya = Ximalaya()
    cookie = ximalaya.analyze_config()
    if not cookie or not ximalaya.judge_cookie(cookie):
        print("登录信息过期重新登录!")
        # Use the cookie produced by the login; the stored one was rejected.
        cookie = ximalaya.login()
        if not cookie:
            print("似乎登录了也没卵用")
            return
    headers = {
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36 Edg/111.0.1660.14",
        "cookie": cookie
    }
    album_name, sounds = ximalaya.analyze_album(album_id)
    if not sounds:
        return
    album_type = ximalaya.judge_album(album_id, headers)
    # judge_album returns False on failure, and False == 0 in Python, so the
    # failure case is excluded before comparing against the "free" code 0.
    if album_type is not False:
        if album_type == 0:
            print(f"成功解析免费专辑{album_id},专辑名{album_name},共{len(sounds)}个声音")
        elif album_type == 1:
            print(f"成功解析已购付费专辑{album_id},专辑名{album_name},共{len(sounds)}个声音")
        else:
            print(f"成功解析付费专辑{album_id},专辑名{album_name},但是当前登陆账号未购买此专辑或未开通vip")
    start = 1
    end = len(sounds)
    quality = 0  # 0 = low, 1 = normal, 2 = high
    asyncio.run(ximalaya.get_selected_sounds(
        sounds, album_name, start, end, headers, quality, True, ximalaya.download_path))


if __name__ == "__main__":
    main()