Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding pydantic models for kafka event. #15

Draft
wants to merge 26 commits into
base: west-playground
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
f14ea08
Adding pydantic models for kafka event.
rhysrevans3 Dec 4, 2024
33a7d8a
updating poetry.lock.
rhysrevans3 Dec 5, 2024
f95a8d3
Updating esgf playground dep.
rhysrevans3 Dec 5, 2024
f2a68f0
Switch to branch name.
rhysrevans3 Dec 5, 2024
0555a05
Updating model version.
rhysrevans3 Jan 8, 2025
751e38b
Update settings import.
rhysrevans3 Jan 8, 2025
e518058
Adding dummy event and request ids.
rhysrevans3 Jan 14, 2025
583f8c5
Bug fixing.
rhysrevans3 Jan 14, 2025
4d4a1b6
Making topic configurable.
rhysrevans3 Jan 30, 2025
6a8c9e6
Switch to get.
rhysrevans3 Jan 30, 2025
da16d4d
Adding EGI authorization.
rhysrevans3 Mar 5, 2025
c5133d4
egi authorize.
rhysrevans3 Mar 5, 2025
d1bd8b5
Merge branch 'west-playground' of github.com:esgf2-us/stac-transactio…
rhysrevans3 Mar 5, 2025
0ae70ef
Switch to CMIP6Item.
rhysrevans3 Mar 6, 2025
a1b9f93
Updating client id ref.
rhysrevans3 Mar 6, 2025
65bdd0c
Renaming types.
rhysrevans3 Mar 6, 2025
6b62dd6
Moving access_control_policy to settings.
rhysrevans3 Mar 6, 2025
e16e460
pylint "raise exception from e" fix.
rhysrevans3 Mar 6, 2025
0f7b137
Checking only hostname for node permission.
rhysrevans3 Mar 10, 2025
0d847a2
Switch to json response.
rhysrevans3 Mar 10, 2025
609846b
Adding development logging.
rhysrevans3 Mar 10, 2025
760c115
Bug fixing.
rhysrevans3 Mar 11, 2025
191ed7c
Remove unused imports.
rhysrevans3 Mar 11, 2025
b16c995
Switch to CMIP6Item for models.
rhysrevans3 Mar 11, 2025
82d9f5a
Metadata bug fixing.
rhysrevans3 Mar 11, 2025
28e414c
Update update and delete.
rhysrevans3 Mar 12, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
472 changes: 233 additions & 239 deletions poetry.lock

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "stac-transaction-api"
version = "0.1.0"
description = "West ESGF STAC Transaction API"
authors = ["Lukasz Lacinski <[email protected]>", "Steve Turoscy <[email protected]>"]
authors = ["Lukasz Lacinski <[email protected]>", "Steve Turoscy <[email protected]>"]
readme = "README.md"
packages = [
{ include = "src" }
Expand All @@ -16,9 +16,11 @@ cffi = "1.16.0"
charset-normalizer = "3.3.2"
confluent-kafka = "2.6.0"
cryptography = "42.0.8"
esgf-playground-utils = "1.0.0"
esgf-playground-utils = "1.0.1"
fastapi = "0.114.0"
globus-sdk = "3.41.0"
httpx = "0.28.1"
httpx-auth = "0.23.1"
idna = "3.7"
mangum = "0.17.0"
pyjwt = "2.8.0"
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ charset-normalizer==3.3.2
click==8.1.7
confluent-kafka==2.6.0
cryptography==42.0.8
esgf-playground-utils==1.0.0
esgf-playground-utils==1.0.1
fastapi==0.114.0
geojson-pydantic==1.1.2
globus-sdk==3.41.0
Expand Down
16 changes: 10 additions & 6 deletions src/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@
from stac_fastapi.extensions.core.transaction import TransactionExtension
from stac_fastapi.types.config import ApiSettings

from authorizer import Authorizer
from authorizer import EGIAuthorizer, GlobusAuthorizer
from client import TransactionClient
from producer import KafkaProducer
from utils import load_access_control_policy

from settings.transaction import event_stream, stac_api

app = FastAPI(debug=True)
Expand All @@ -19,18 +17,24 @@ async def healthcheck():
return JSONResponse(
content={"healthcheck": True},
media_type="application/json",
status_code=200
status_code=200,
)

access_control_policy = load_access_control_policy(url=stac_api.get("access_control_policy"))

producer = KafkaProducer(config=event_stream.get("config"))
core_client = TransactionClient(producer=producer, acl=access_control_policy)
core_client = TransactionClient(producer=producer)

settings = ApiSettings(
api_title="STAC Transaction API",
api_version="0.1.0",
openapi_url="/openapi.json",
)

if stac_api.get("authorizer", "globus") == "globus":
Authorizer = GlobusAuthorizer
else:
Authorizer = EGIAuthorizer

app.add_middleware(Authorizer)
app.state.router_prefix = ""
transaction_extension = TransactionExtension(client=core_client, settings=settings)
Expand Down
94 changes: 82 additions & 12 deletions src/authorizer.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import json

import httpx
from esgf_playground_utils.models.kafka import RequesterData
from fastapi import Request
from globus_sdk import ConfidentialAppAuthClient, AccessTokenAuthorizer, GroupsClient
from globus_sdk import AccessTokenAuthorizer, ConfidentialAppAuthClient, GroupsClient
from globus_sdk.scopes import GroupsScopes
from starlette.middleware.base import BaseHTTPMiddleware

import settings.transaction as settings

from models import Authorizer

confidential_client = ConfidentialAppAuthClient(
client_id=settings.stac_api.get("client_id"),
Expand All @@ -24,7 +26,7 @@
"""


class Authorizer(BaseHTTPMiddleware):
class GlobusAuthorizer(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
# Health check endpoint for AWS ALB target group
# Need to bypass authorization for this endpoint
Expand All @@ -35,31 +37,49 @@ async def dispatch(self, request: Request, call_next):

# Set API Gateway token validation correctly to avoid IndexError exception
access_token = authorization_header[7:]
response = confidential_client.oauth2_token_introspect(access_token, include="identity_set_detail")
response = confidential_client.oauth2_token_introspect(
access_token, include="identity_set_detail"
)
token_info = response.data

# resource_arn = event["methodArn"].split("/", 1)[0] + "/*"
resource_arn = request.headers.get("resource-arn", "*")

# Verify the access token
if not token_info.get("active", False):
policy = self.generate_policy("unknown", "Deny", resource_arn, token_info=token_info)
policy = self.generate_policy(
"unknown", "Deny", resource_arn, token_info=token_info
)

if settings.stac_api.get("client_id") not in token_info.get("aud", []):
policy = self.generate_policy(token_info.get("sub"), "Deny", resource_arn, token_info=token_info)
policy = self.generate_policy(
token_info.get("sub"), "Deny", resource_arn, token_info=token_info
)

if settings.stac_api.get("scope_string") != token_info.get("scope", ""):
policy = self.generate_policy(token_info.get("sub"), "Deny", resource_arn, token_info=token_info)
policy = self.generate_policy(
token_info.get("sub"), "Deny", resource_arn, token_info=token_info
)

if settings.stac_api.get("issuer") != token_info.get("iss", ""):
policy = self.generate_policy(token_info.get("sub"), "Deny", resource_arn, token_info=token_info)
policy = self.generate_policy(
token_info.get("sub"), "Deny", resource_arn, token_info=token_info
)

# Get the user's groups
groups = self.get_groups(access_token)
if not groups:
policy = self.generate_policy(token_info.get("sub"), "Deny", resource_arn, token_info=token_info)

policy = self.generate_policy(token_info.get("sub"), "Allow", resource_arn, token_info=token_info, groups=groups)
policy = self.generate_policy(
token_info.get("sub"), "Deny", resource_arn, token_info=token_info
)

policy = self.generate_policy(
token_info.get("sub"),
"Allow",
resource_arn,
token_info=token_info,
groups=groups,
)
request.state.authorizer = policy # This is awesome by the way :)
return await call_next(request)

Expand All @@ -72,7 +92,9 @@ def get_groups(self, token):
and if the a new request with the same bearer token
"""

tokens = confidential_client.oauth2_get_dependent_tokens(token, scope=GroupsScopes.view_my_groups_and_memberships)
tokens = confidential_client.oauth2_get_dependent_tokens(
token, scope=GroupsScopes.view_my_groups_and_memberships
)
groups_token = tokens.by_resource_server[GroupsClient.resource_server]
authorizer = AccessTokenAuthorizer(groups_token["access_token"])
groups_client = GroupsClient(authorizer=authorizer)
Expand Down Expand Up @@ -117,3 +139,51 @@ def generate_policy(self, user, effect, resource, token_info=None, groups=None):
# print(auth_response)

return auth_response


class EGIAuthorizer(BaseHTTPMiddleware):
    """
    EGI Authorization middleware.

    Sends the request's bearer token to the configured OAuth 2.0 token
    introspection endpoint and, when the token is accepted, stores an
    ``Authorizer`` built from the returned claims on
    ``request.state.authorizer`` before passing the request on.
    """

    @staticmethod
    def _reject(status_code, detail):
        """Build the JSON error response for a request that is not authorized.

        Args:
            status_code: HTTP status to return (401 or 403).
            detail: Human-readable reason, returned under the ``detail`` key.

        Returns:
            A ``JSONResponse``; 401 responses carry a ``WWW-Authenticate``
            header advertising the Bearer scheme.
        """
        # Local import: only needed on the rejection path, and keeps this
        # class independent of the module's top-level import list.
        from starlette.responses import JSONResponse

        headers = {"WWW-Authenticate": "Bearer"} if status_code == 401 else None
        return JSONResponse(
            status_code=status_code,
            content={"detail": detail},
            headers=headers,
        )

    async def dispatch(self, request: Request, call_next):
        """Authorize the request via token introspection, then forward it.

        Args:
            request: Incoming request; its ``Authorization`` header must be
                of the form ``Bearer <token>``.
            call_next: Callable that forwards the request down the stack.

        Returns:
            The downstream response, or a 401/403 JSON response when the
            token is missing, malformed, inactive or lacks required claims.

        Raises:
            httpx.HTTPStatusError: If the introspection endpoint answers
                with an error status.
        """
        # Need to bypass authorization for this endpoint
        if request.url.path == "/healthcheck":
            return await call_next(request)

        # Never write the bearer token itself to the logs.
        loggable_headers = {
            name: "<redacted>" if name.lower() == "authorization" else value
            for name, value in request.headers.items()
        }
        settings.logger.info("Request Headers %s", loggable_headers)

        # A missing header used to crash with a TypeError on ``None[7:]``;
        # reject it (and any non-Bearer scheme) explicitly instead.
        authorization_header = request.headers.get("authorization", "")
        scheme, _, access_token = authorization_header.partition(" ")
        access_token = access_token.strip()
        if scheme.lower() != "bearer" or not access_token:
            return self._reject(401, "Missing or malformed bearer token")

        introspection_endpoint = settings.stac_api.get("introspection_endpoint")
        auth = httpx.BasicAuth(
            username=settings.stac_api.get("client_id"),
            password=settings.stac_api.get("client_secret"),
        )

        # NOTE(review): verify=False disables TLS certificate verification
        # while client credentials and the user's token are being sent.
        # Kept for backward compatibility - confirm whether this is still
        # required outside of development.
        async with httpx.AsyncClient(timeout=5.0, verify=False) as client:
            settings.logger.info("Post request to %s", introspection_endpoint)
            # Passing a dict lets httpx URL-encode the token and set the
            # form content type itself; the client-level timeout applies.
            response = await client.post(
                introspection_endpoint,
                data={"token": access_token},
                auth=auth,
            )
            response.raise_for_status()

        token_info = response.json()

        # The introspection response reports token validity in "active".
        if not token_info.get("active", False):
            return self._reject(401, "Token is not active")

        required_claims = ("sub", "iss", "eduperson_entitlement")
        missing_claims = [
            claim for claim in required_claims if claim not in token_info
        ]
        if missing_claims:
            return self._reject(
                403,
                "Token is missing required claims: " + ", ".join(missing_claims),
            )

        authorizer = Authorizer(
            requester_data=RequesterData(
                client_id=settings.event_stream["config"].get("client.id"),
                sub=token_info["sub"],
                iss=token_info["iss"],
            ),
        )
        authorizer.add(token_info["eduperson_entitlement"])

        request.state.authorizer = authorizer

        return await call_next(request)
Loading