Skip to content

Commit

Permalink
added a lot more practical functionality, oriented towards security a…
Browse files Browse the repository at this point in the history
…nd not having things break
  • Loading branch information
AmeerArsala committed Jun 7, 2024
1 parent 17a78d4 commit 0d0479a
Show file tree
Hide file tree
Showing 8 changed files with 553 additions and 77 deletions.
265 changes: 207 additions & 58 deletions app/core/routes/apotheosis.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,200 @@
from typing import Union, Optional, List, Dict
from typing import Union, Optional, List, Dict, Tuple
from fastapi import APIRouter, Depends
from pydantic import BaseModel, Field

from app.core import api_auth

from app.core.schemas.entities import Chunk
from app.core.schemas.users import TierPlan

from app.lib import society, states

import sqlalchemy
import app.core.db.database as db

import re

import numpy as np
import pandas as pd


# prefix: /apotheosis
router = APIRouter(tags=["apotheosis"], dependencies=[Depends(api_auth.get_api_key)])


class CreateCommunityParams(BaseModel):
name: str
private: bool

ai_generate_base: bool = Field(default=False)
# these fields don't matter unless ai_generate_base is True
# these are the default values of the corresponding arguments in society.generate_chunks, so don't touch them
community_desc: str = Field(default="")
num_chunks: int = Field(default=-1)

invited_owners_emails: List[str] = Field(default=[])

# this doesn't matter unless private is True
invited_whitelisted_emails: List[str] = Field(default=[])


class CreateCommunityResponse(BaseModel):
success: bool
name_allowed: bool
privacy_allowed: bool

# these return "OK" if they are allowed and an error message otherwise
name_status_message: str
privacy_status_message: str

community_id: int = Field(default=-1)


@router.post("/community")
async def create_community(params: CreateCommunityParams) -> int:
async def create_community(
params: CreateCommunityParams, api_key: str = Depends(api_auth.get_api_key)
) -> CreateCommunityResponse:
# This is done so the user doesn't go around fucking with other users' accounts
print("Reading API Key...")
email: str = api_auth.read_email_from_api_key(api_key)

response_dict: Dict = {}
success: bool = True

print("Validating Community parameters...")
# Check if the privacy option they chose is allowed
if (
params.private
and society.get_user_tier_plan_from_email(email) < TierPlan.PRO_TIER
):
response_dict["privacy_allowed"] = False
response_dict["privacy_status_message"] = (
"You must be on a paid plan (Pro or beyond) to create private societies"
)
success = False
else:
response_dict["privacy_allowed"] = True
response_dict["privacy_status_message"] = "OK"

# Now, check if the name is valid
def is_valid_name(name: str) -> bool:
# Define a regular expression that matches lowercase letters, numbers, and hyphens
pattern = re.compile(r"^[a-z0-9-]+$")

# Check if the name matches the pattern
return bool(pattern.match(name))

# Assumes `name` has valid chars
def complete_name(name: str) -> str:
return f"{email}/{name}" if params.private else name

def is_name_available(name: str) -> bool:
full_name: str = complete_name(name)

return society.is_community_name_available(full_name)

if not is_valid_name(params.name):
response_dict["name_allowed"] = False
response_dict["name_status_message"] = (
"Names can only consist of lowercase letters, digits, and hyphens (-)"
)
success = False
elif success and not is_name_available(params.name):
response_dict["name_allowed"] = False
response_dict["name_status_message"] = (
f"{complete_name(params.name)} is unavailable or already taken"
)
success = False

if not success:
print("Creation of Community failed. Parameters invalid")
return CreateCommunityResponse(success=False, **response_dict)

# Otherwise, success is True and create the community!
print("Parameters valid. Creating Community...")
community_id: int = society.create_community(params.name)

return community_id
# Generate and add chunks if specified
if params.ai_generate_base:
print("Generating chunks...")
generated_chunks: List[Chunk] = society.generate_chunks(
community_id, desc=params.community_desc, count=params.num_chunks
)

print("Uploading generated chunks...")
society.create_chunks(generated_chunks)

# Invite people

# Normalize
print("Adding owner(s)...")
added_owners_emails: List[str] = params.invited_owners_emails + email
added_owners_emails = pd.unique(np.array(added_owners_emails)).tolist()

society.add_community_owners(community_id, added_owners_emails)

if params.private:
print("Adding user(s) to whitelist...")

# Normalize
added_whitelist_emails: List[str] = pd.unique(
# add owners emails because all owners must be whitelisted too in order to even enter the society
np.array(params.invited_whitelisted_emails + added_owners_emails)
).tolist()

society.add_users_to_community_whitelist(
community_id, added_whitelist_emails, bypass_privacy_check=True
)

# Have OG user join the community
print("Joining Community...")
society.join_community_by_id(email, community_id)

return CreateCommunityResponse(
success=True, community_id=community_id, **response_dict
)


class CreateChunksParams(BaseModel):
chunks: List[Chunk]


@router.post("/chunks")
async def create_chunks(params: CreateChunksParams):
society.create_chunks(params.chunks)
async def create_chunks(
params: CreateChunksParams, api_key: str = Depends(api_auth.get_api_key)
):
"""Constraint: users can only create chunks in communities they have joined."""
print("Reading API Key...")
email: str = api_auth.read_email_from_api_key(api_key)

# Find out whether user is in the communities to be able to create chunks in them
print("Filtering chunks to add to only the ones the user is allowed to add...")
desired_community_ids: List[int] = [chunk.community_id for chunk in params.chunks]
with db.engine.begin() as conn:
query = """
SELECT users_communities.community_id
FROM users INNER JOIN users_communities ON users.id = users_communities.user_id
WHERE users.email = :email AND users_communities.community_id IN :desired_community_ids
"""

results: List[Tuple[int]] = conn.execute(
sqlalchemy.text(query),
[{"email": email, "desired_community_ids": tuple(desired_community_ids)}],
).fetchall()

available_community_ids: List[int] = [result[0] for result in results]

# THESE are the chunks to create
filtered_chunks: List[Chunk] = list(
filter(
lambda chunk: (chunk.community_id in set(available_community_ids)),
params.chunks,
)
)

if len(filtered_chunks) > 0:
print("Only adding the chunks that the user is allowed to add...")
society.create_chunks(filtered_chunks)

return "OK"

Expand All @@ -51,46 +212,22 @@ async def join_community(
params: JoinCommunityParams, api_key: str = Depends(api_auth.get_api_key)
):
# It is already assumed that a valid api key exists by virtue of this route
print("Reading API Key...")
email: str = api_auth.read_email_from_api_key(api_key)

# 1
if email != params.user_email:
return "Bruh use your own email stop tryna use other ppls accounts"

# Action
def join():
society.join_community(
user_email=params.user_email, community_name=params.community_name
)

# 2
is_public: bool = "/" not in params.community_name
if is_public:
join()
return "OK"

can_access_community: bool = False

with db.engine.begin() as conn:
# Get community id first
(community_id,) = conn.execute(
sqlalchemy.text("SELECT id FROM communities WHERE name = :name"),
{"name": params.community_name},
).first()
return "BRUH MOMENT EXCEPTION: use your own damn email stop tryna use other ppls accounts"

query: str = """
SELECT COUNT(eligible_users_for_communities.user_id)
FROM users INNER JOIN eligible_users_for_communities ON users.id = eligible_users_for_communities.user_id
WHERE users.email = :email AND eligible_users_for_communities.community_id = :community_id
"""

(num,) = conn.execute(
sqlalchemy.text(query), {"email": email, "community_id": community_id}
).first()
can_access_community = num > 0
# 2 - join if can access
print("Seeing if user can access community...")
can_access_community: bool = society.user_can_access_community(
params.community_name, params.user_email
)

if can_access_community:
join()
print("Joining community...")
society.join_community(params.user_email, params.community_name)
return "OK"
else:
return "Access Denied"
Expand All @@ -109,6 +246,8 @@ class WhitelistUsersParams(BaseModel):
async def whitelist_users(
params: WhitelistUsersParams, api_key: str = Depends(api_auth.get_api_key)
):
# Check if requesting user has owner perms
user_email: str = api_auth.read_email_from_api_key(api_key)

with db.engine.begin() as conn:
# Get community id first
Expand All @@ -117,18 +256,21 @@ async def whitelist_users(
{"name": params.community_name},
).first()

query: str = """
INSERT INTO eligible_users_for_communities(user_id, community_id)
SELECT DISTINCT users.id, :community_id
FROM users
WHERE users.email IN :emails
# Check if user is an owner
query = """
SELECT COUNT(*)
FROM users INNER JOIN communities_owners ON users.id = communities_owners.user_id
WHERE users.email = :email AND community_owners.community_id = :community_id
"""

# Execute the insertion
conn.execute(
(occurrences_as_owner,) = conn.execute(
sqlalchemy.text(query),
{"community_id": community_id, "emails": tuple(params.whitelisted_emails)},
)
[{"email": user_email, "community_id": community_id}],
).first()

if occurrences_as_owner == 0:
return "Access Denied: You must be an owner to add users to the whitelist"

society.add_users_to_community_whitelist(community_id, params.whitelisted_emails)

return "OK"

Expand All @@ -143,24 +285,31 @@ async def promote_owners(
params: PromoteOwnersParams, api_key: str = Depends(api_auth.get_api_key)
):
# NOTE: same thing as the function above, just with adding to `communities_owners` rather than `eligible_users_for_communities`

# Check if requesting user has owner perms
user_email: str = api_auth.read_email_from_api_key(api_key)

with db.engine.begin() as conn:
# Get community id first
(community_id,) = conn.execute(
sqlalchemy.text("SELECT id FROM communities WHERE name = :name"),
{"name": params.community_name},
).first()

query: str = """
INSERT INTO communities_owners(user_id, community_id)
SELECT DISTINCT users.id, :community_id
FROM users
WHERE users.email IN :emails
# Check if user is an owner
query = """
SELECT COUNT(*)
FROM users INNER JOIN communities_owners ON users.id = communities_owners.user_id
WHERE users.email = :email AND community_owners.community_id = :community_id
"""

# Execute the insertion
conn.execute(
(occurrences_as_owner,) = conn.execute(
sqlalchemy.text(query),
{"community_id": community_id, "emails": tuple(params.new_owners_emails)},
)
[{"email": user_email, "community_id": community_id}],
).first()

if occurrences_as_owner == 0:
return "Access Denied: You must be an owner to add more owners"

society.add_community_owners(community_id, params.new_owners_emails)

return "OK"
20 changes: 20 additions & 0 deletions app/core/routes/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,26 @@ async def view_user_details(request: Request) -> UserDetails:
return UserDetails(**user_details_dict)


@router.get("/view_details/email")
async def view_user_email(request: Request) -> str:
user_client: KindeApiClient = user_kinde_client(request.url)

user_details_dict: Dict[str, str] = user_client.get_user_details()
return UserDetails(**user_details_dict).email


@router.get("/view_details/user_core_id")
async def view_user_id(request: Request) -> int:
user_client: KindeApiClient = user_kinde_client(request.url)

user_details_dict: Dict[str, str] = user_client.get_user_details()
user_details: UserDetails = UserDetails(**user_details_dict)

user_id: int = society.get_user_id(user_details.email)

return user_id


@router.post("/manifest")
async def manifest_user(request: Request) -> int:
"""Returns the user_id"""
Expand Down
9 changes: 9 additions & 0 deletions app/core/schemas/users.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import List, Dict
from pydantic import BaseModel, Field
from enum import IntEnum


class UserDetails(BaseModel):
Expand All @@ -8,3 +9,11 @@ class UserDetails(BaseModel):
family_name: str
email: str
picture: str # a link to the image


class TierPlan(IntEnum):
FREE_TIER = 1
PRO_TIER = 2
SCALE_TIER = 3
ADMIN_TIER = 4
ALMIGHTY_TIER = 9001
Loading

0 comments on commit 0d0479a

Please sign in to comment.