Skip to content

Commit

Permalink
Merge pull request #33 from dermasmid/sort-by
Browse files Browse the repository at this point in the history
fix sort by
  • Loading branch information
dermasmid authored Jul 14, 2023
2 parents 2f8c2ac + 8dc8967 commit 76ee440
Showing 1 changed file with 61 additions and 20 deletions.
81 changes: 61 additions & 20 deletions scrapetube/scrapetube.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ def get_channel(
``"streams"``: Streams
"""

sort_by_map = {"newest": "dd", "oldest": "da", "popular": "p"}

base_url = ""
if channel_url:
base_url = channel_url
Expand All @@ -68,13 +66,12 @@ def get_channel(
elif channel_username:
base_url = f"https://www.youtube.com/@{channel_username}"

url = "{base_url}/{content_type}?view=0&sort={sort_by}&flow=grid".format(
url = "{base_url}/{content_type}?view=0&flow=grid".format(
base_url=base_url,
content_type=content_type,
sort_by=sort_by_map[sort_by],
)
api_endpoint = "https://www.youtube.com/youtubei/v1/browse"
videos = get_videos(url, api_endpoint, type_property_map[content_type], limit, sleep)
videos = get_videos(url, api_endpoint, type_property_map[content_type], limit, sleep, sort_by)
for video in videos:
yield video

Expand Down Expand Up @@ -162,15 +159,39 @@ def get_search(
yield video



def get_video(
id: str,
) -> dict:

"""Get a single video.
Parameters:
id (``str``):
The video id from the video you want to get.
"""

session = get_session()
url = f"https://www.youtube.com/watch?v={id}"
html = get_initial_data(session, url)
client = json.loads(
get_json_from_html(html, "INNERTUBE_CONTEXT", 2, '"}},') + '"}}'
)["client"]
session.headers["X-YouTube-Client-Name"] = "1"
session.headers["X-YouTube-Client-Version"] = client["clientVersion"]
data = json.loads(
get_json_from_html(html, "var ytInitialData = ", 0, "};") + "}"
)
return next(search_dict(data, "videoPrimaryInfoRenderer"))



def get_videos(
url: str, api_endpoint: str, selector: str, limit: int, sleep: int
url: str, api_endpoint: str, selector: str, limit: int, sleep: int, sort_by: str = None
) -> Generator[dict, None, None]:
session = requests.Session()
session.headers[
"User-Agent"
] = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.101 Safari/537.36"
session = get_session()
is_first = True
quit = False
quit_it = False
count = 0
while True:
if is_first:
Expand All @@ -184,8 +205,10 @@ def get_videos(
data = json.loads(
get_json_from_html(html, "var ytInitialData = ", 0, "};") + "}"
)
next_data = get_next_data(data)
next_data = get_next_data(data, sort_by)
is_first = False
if sort_by and sort_by != "newest":
continue
else:
data = get_ajax_data(session, api_endpoint, api_key, next_data, client)
next_data = get_next_data(data)
Expand All @@ -194,20 +217,28 @@ def get_videos(
count += 1
yield result
if count == limit:
quit = True
quit_it = True
break
except GeneratorExit:
quit = True
quit_it = True
break

if not next_data or quit:
if not next_data or quit_it:
break

time.sleep(sleep)

session.close()


def get_session() -> requests.Session:
session = requests.Session()
session.headers[
"User-Agent"
] = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36"
session.headers["Accept-Language"] = "en"
return session

def get_initial_data(session: requests.Session, url: str) -> str:
session.cookies.set("CONSENT", "YES+cb", domain=".youtube.com")
response = session.get(url)
Expand Down Expand Up @@ -237,13 +268,23 @@ def get_json_from_html(html: str, key: str, num_chars: int = 2, stop: str = '"')
return html[pos_begin:pos_end]


def get_next_data(data: dict) -> dict:
raw_next_data = next(search_dict(data, "continuationEndpoint"), None)
if not raw_next_data:
def get_next_data(data: dict, sort_by: str = None) -> dict:
# Youtube, please don't change the order of these
sort_by_map = {
"newest": 0,
"popular": 1,
"oldest": 2,
}
if sort_by and sort_by != "newest":
endpoint = next(
search_dict(data, "feedFilterChipBarRenderer"), None)["contents"][sort_by_map[sort_by]]["chipCloudChipRenderer"]["navigationEndpoint"]
else:
endpoint = next(search_dict(data, "continuationEndpoint"), None)
if not endpoint:
return None
next_data = {
"token": raw_next_data["continuationCommand"]["token"],
"click_params": {"clickTrackingParams": raw_next_data["clickTrackingParams"]},
"token": endpoint["continuationCommand"]["token"],
"click_params": {"clickTrackingParams": endpoint["clickTrackingParams"]},
}

return next_data
Expand Down

0 comments on commit 76ee440

Please sign in to comment.