
Commit

Merge pull request #8 from knewkarma-io/dev
Apply changes from Karma Kaze and fix the comments pagination process
rly0nheart authored Oct 20, 2024
2 parents bff0bc4 + 8dcfc7e commit ffc392e
Showing 6 changed files with 195 additions and 376 deletions.
10 changes: 5 additions & 5 deletions poetry.lock

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "kraw"
version = "0.2.4"
version = "0.2.5"
description = "Reddit API wrapper for Knew Karma"
authors = ["Richard Mwewa <[email protected]>"]
license = "GPL-3.0+"
@@ -10,7 +10,7 @@ homepage = "https://github.com/knewkarma-io/kraw"
[tool.poetry.dependencies]
python = "^3.10"
aiohttp = "^3.10.10"
karmakaze = "^2.0.0"
karmakaze = "^3.0.0"

[tool.poetry.group.dev.dependencies]
flake8 = "^7.1.1"
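Because the karmakaze requirement crosses a major version, Poetry's caret rule resolves `^3.0.0` to `>=3.0.0,<4.0.0`, deliberately excluding the karmakaze 2.x series, whose sanitiser-based calls the connection.py changes below migrate away from. A hypothetical runtime guard (not part of this commit) that checks the installed major version:

```python
# Hypothetical guard, not in this commit: confirm the installed karmakaze
# falls inside the new caret constraint "^3.0.0" (>= 3.0.0, < 4.0.0).
from importlib.metadata import version

karmakaze_major = int(version("karmakaze").split(".")[0])
assert karmakaze_major == 3, "kraw 0.2.5 expects the karmakaze 3.x parser API"
```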
96 changes: 76 additions & 20 deletions src/kraw/connection.py
@@ -1,18 +1,32 @@
import asyncio
import time
from random import randint
+from types import SimpleNamespace
from typing import Optional, Callable, List, Dict, Union

import karmakaze
from aiohttp import ClientSession

from . import dummies

__all__ = ["Connection"]


+class Endpoints:
+
+    base: str = "https://www.reddit.com"
+    user: str = f"{base}/u"
+    users: str = f"{base}/users"
+    subreddit: str = f"{base}/r"
+    subreddits: str = f"{base}/subreddits"
+    username_available: str = f"{base}/api/username_available.json"
+    infra_status: str = "https://www.redditstatus.com/api/v2/status.json"
+    infra_components: str = "https://www.redditstatus.com/api/v2/components.json"
+
+
class Connection:
    def __init__(self, headers: Dict):
        self._headers = headers
        self._sanitise = karmakaze.Sanitise()
+        self.endpoints = Endpoints()

    async def send_request(
        self,
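The new `Endpoints` class gathers every Reddit URL the wrapper touches into one namespace, exposed on each `Connection` as `self.endpoints`. A minimal sketch of how it might be used; the subreddit name and `.json` suffix are illustrative, not taken from this diff:

```python
# Illustrative only: build a request URL from the Endpoints container above.
endpoints = Endpoints()
url = f"{endpoints.subreddit}/python/about.json"  # hypothetical path suffix
print(url)  # -> https://www.reddit.com/r/python/about.json
```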
@@ -37,11 +51,12 @@ async def paginate_response(
        session: ClientSession,
        endpoint: str,
        limit: int,
-        sanitiser: Callable,
+        parser: Callable,
        message: Optional[dummies.Message] = None,
        status: Optional[dummies.Status] = None,
        params: Optional[Dict] = None,
        is_post_comments: Optional[bool] = False,
-    ) -> List[Dict]:
+    ) -> List[SimpleNamespace]:

        # Initialise an empty list to store all items across paginated requests.
        all_items: List = []
@@ -60,17 +75,21 @@ async def paginate_response(
                ),
                params=params,
            )
+
            if is_post_comments:
                items = await self._process_post_comments(
                    session=session,
                    endpoint=endpoint,
-                    response=response,
-                    sanitiser=sanitiser,
+                    response=parser(response[1]),
+                    parser=parser,
                    limit=limit,
+                    status=status,
+                    message=message,
                )
            else:
+
                # If not handling comments, simply extract the items from the response.
-                items = sanitiser(response)
+                items = parser(response=response).children

                # If no items are found, break the loop as there's nothing more to fetch.
                if not items:
@@ -83,10 +102,11 @@ async def paginate_response(
            all_items.extend(items[:items_to_limit])

            # Update the last_item_id to the ID of the last fetched item for pagination.
+
            last_item_id = (
-                self._sanitise.pagination_id(response=response[1])
+                parser(response=response[1]).after
                if is_post_comments
-                else self._sanitise.pagination_id(response=response)
+                else parser(response=response).after
            )

            # If we've reached the specified limit, break the loop.
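Pagination now leans entirely on the injected `parser` callable, which must return a namespace exposing `.children` (the page's items) and `.after` (the cursor for the next request). karmakaze 3.x itself is not shown in this diff, but a stand-in with the same shape, assuming a raw Reddit listing dict, might look like:

```python
from types import SimpleNamespace
from typing import Dict

def listing_parser(response: Dict) -> SimpleNamespace:
    # Stand-in for a karmakaze 3.x parser: expose a Reddit listing's
    # children and its "after" cursor as attributes, matching how
    # paginate_response consumes parser(response=...).children / .after.
    data = response.get("data", {}) if isinstance(response, dict) else {}
    return SimpleNamespace(
        children=data.get("children", []),
        after=data.get("after"),
    )
```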
@@ -116,44 +136,80 @@ async def _paginate_more_items(
        session: ClientSession,
        more_items_ids: List[str],
        endpoint: str,
+        parser: Callable,
        fetched_items: List[Dict],
        limit: int,
        status: Optional[dummies.Status] = None,
        message: Optional[dummies.Message] = None,
    ):
+        # Track how many more items are needed to meet the overall limit
+        remaining_items = limit - len(fetched_items)
+
+        if remaining_items <= 0:
+            return  # Stop if we've already hit the limit
+
+        message.ok(f"Found {len(more_items_ids)} comments on post")
        for more_id in more_items_ids:
            # Check if we still need more items, and stop if we've reached the limit.
            if len(fetched_items) >= limit:
                break

            # Construct the endpoint for each additional comment ID.
            more_endpoint = f"{endpoint}?comment={more_id}"
            # Make an asynchronous request to fetch the additional comments.
            more_response = await self.send_request(
                session=session, endpoint=more_endpoint
            )
            # Extract the items (comments) from the response.
-            more_items = self._sanitise.comments(response=more_response)
+            more_items = parser(response=more_response[1])
+
+            # Determine how many more items we can add without exceeding the limit.
+            items_to_add = min(remaining_items, len(more_items.children))
+
+            # Add the allowed number of items to the main items list.
+            fetched_items.extend(more_items.children[:items_to_add])
+
+            # Update the remaining items to be fetched.
+            remaining_items -= items_to_add

-            # Add the fetched items to the main items list.
-            fetched_items.extend(more_items)
+            # Stop if we've reached the limit.
+            if remaining_items <= 0:
+                break

            # Introduce a random sleep duration to avoid rate-limiting.
            sleep_duration = randint(1, 5)
            await self._pagination_countdown_timer(
                duration=sleep_duration,
                overall_count=limit,
                current_count=len(fetched_items),
                status=status,
            )

    async def _process_post_comments(self, **kwargs):
        # If the request is for post comments, handle the response accordingly.
        items = []  # Initialise a list to store fetched items.
        more_items_ids = []  # Initialise a list to store IDs from "more" items.

        # Iterate over the children in the response to extract comments or "more" items.
-        for item in self._sanitise.comments(response=kwargs.get("response")):
-            if self._sanitise.kind(item) == "t1":
-                sanitised_item = kwargs.get("sanitiser")(item)

+        for item in kwargs.get("response").children:
+            if item.kind == "t1":
                # If the item is a comment (kind == "t1"), add it to the items list.
-                items.append(sanitised_item)
-            elif self._sanitise.kind(item) == "more":
+                items.append(item)
+            elif item.kind == "more":
                # If the item is of kind "more", extract the IDs for additional comments.
-                more_items_ids.extend(item.get("data", {}).get("children", []))
+                more_items_ids.extend(item.data.children)

        # If there are more items to fetch (kind == "more"), make additional requests.
        if more_items_ids:
            await self._paginate_more_items(
                session=kwargs.get("session"),
                message=kwargs.get("message"),
                status=kwargs.get("status"),
                fetched_items=items,
                more_items_ids=more_items_ids,
                endpoint=kwargs.get("endpoint"),
                limit=kwargs.get("limit"),
+                parser=kwargs.get("parser"),
            )

        return items
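The `t1`/`more` split mirrors Reddit's comment listings: fully rendered comments arrive as `t1` nodes, while truncated threads arrive as `more` stubs whose `data.children` lists the outstanding comment IDs. A self-contained sketch of that partitioning, using hypothetical sample data shaped like the parsed namespaces above:

```python
from types import SimpleNamespace

# Hypothetical parsed children, shaped like the namespaces this loop expects.
children = [
    SimpleNamespace(kind="t1", data=SimpleNamespace(children=[])),
    SimpleNamespace(kind="more", data=SimpleNamespace(children=["abc123", "def456"])),
]

comments, more_ids = [], []
for child in children:
    if child.kind == "t1":  # a fully fetched comment
        comments.append(child)
    elif child.kind == "more":  # a stub pointing at unfetched comment IDs
        more_ids.extend(child.data.children)

print(len(comments), more_ids)  # 1 ['abc123', 'def456']
```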
@@ -175,7 +231,7 @@ async def _pagination_countdown_timer(
        )

        countdown_text: str = (
-            f"[cyan]{current_count}[/] (of [cyan]{overall_count}[/]) items fetched so far. "
+            f"Gotten [cyan]{current_count}[/] of [cyan]{overall_count}[/] items so far. "
            f"Resuming in [cyan]{remaining_seconds}.{remaining_milliseconds:02}[/] seconds"
        )

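The countdown timer sleeps in sub-second steps so the status line can keep refreshing while the pagination loop backs off. A standalone sketch of that pattern; the real method writes to a `dummies.Status` instead of printing:

```python
import asyncio
import time
from random import randint

async def countdown(duration: int) -> None:
    # Sleep in 0.1 s increments, reporting the time left, so a UI status
    # line can refresh while pagination waits out the rate-limit backoff.
    deadline = time.time() + duration
    while (remaining := deadline - time.time()) > 0:
        remaining_seconds = int(remaining)
        remaining_milliseconds = int((remaining - remaining_seconds) * 100)
        print(f"Resuming in {remaining_seconds}.{remaining_milliseconds:02} seconds", end="\r")
        await asyncio.sleep(0.1)

asyncio.run(countdown(randint(1, 5)))  # mirrors the randint(1, 5) backoff above
```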
11 changes: 7 additions & 4 deletions src/kraw/dummies.py
@@ -1,7 +1,4 @@
-class Status:
-    @staticmethod
-    def update(dummy_str: str):
-        pass
+__all__ = ["Message", "Status"]


class Message:
@@ -14,4 +11,10 @@ def warning(dummy_str: str):
        pass


+class Status:
+    @staticmethod
+    def update(dummy_str: str):
+        pass
+
+
# -------------------------------- END ----------------------------------------- #
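With `Status` moved below `Message` and both exported via `__all__`, the module stays a pair of no-op stand-ins: callers that have no progress UI attached can still pass something satisfying the `Optional[dummies.Message]` and `Optional[dummies.Status]` parameters. A usage sketch, assuming the package layout shown in this commit:

```python
from kraw import dummies

message = dummies.Message()
status = dummies.Status()

# Both calls are silent no-ops, so library code can report progress
# unconditionally without first checking whether a real UI was supplied.
message.ok("Found 42 comments on post")
status.update("Resuming in 3.07 seconds")
```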
