Skip to content

Commit

Permalink
BB2-3227: Add access grant expiry to access token responses (#1207)
Browse files Browse the repository at this point in the history
* Adds access grant expiry to access token responses.

* Changed field name for US english consistency.

* Added missing timestamp call

* Fixed dag.expiration_date cases.

* Added tests and made more readable. Fix bug RE: dag ABOUT to exist.

* Fix blank lines linting error

* Removed unused variable

* Loosen time test

* Collect created dag reliably

* More flexible test

* Fixed one time flake

* Use expires in instead of hardcoding
  • Loading branch information
loganbertram authored Jul 3, 2024
1 parent b2832e8 commit 6b80829
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 5 deletions.
58 changes: 58 additions & 0 deletions apps/dot_ext/tests/test_authorization.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import json
import base64
from time import strftime

import pytz
from datetime import datetime
from dateutil.relativedelta import relativedelta
Expand Down Expand Up @@ -520,6 +522,62 @@ def test_refresh_with_one_time_access_retrieve_app_from_auth_header(self):
)
self.assertEqual(response.status_code, 400)

@override_flag('limit_data_access', active=True)
def test_dag_expiration_exists(self):
assert flag_is_active('limit_data_access')
redirect_uri = 'http://localhost'

# create a user
user = self._create_user('anna', '123456')

# create an application and add capabilities
application = self._create_application(
'an app',
grant_type=Application.GRANT_AUTHORIZATION_CODE,
client_type=Application.CLIENT_CONFIDENTIAL,
redirect_uris=redirect_uri,
data_access_type="THIRTEEN_MONTH",
)
capability_a = self._create_capability('Capability A', [])
application.scope.add(capability_a)

# create a data access grant
expiration_date = datetime.now() + relativedelta(months=+13)
dag = DataAccessGrant(beneficiary=user, application=application, expiration_date=expiration_date)
dag.save()

# user logs in
request = HttpRequest()
self.client.login(request=request, username='anna', password='123456')

# post the authorization form with only one scope selected
payload = {
'client_id': application.client_id,
'response_type': 'code',
'redirect_uri': redirect_uri,
'scope': ['capability-a'],
'expires_in': 86400,
'allow': True,
}
response = self.client.post(reverse('oauth2_provider:authorize'), data=payload)
self.client.logout()

# now extract the authorization code and use it to request an access_token
query_dict = parse_qs(urlparse(response['Location']).query)
authorization_code = query_dict.pop('code')
token_request_data = {
'grant_type': 'authorization_code',
'code': authorization_code,
'redirect_uri': redirect_uri,
'client_id': application.client_id,
'client_secret': application.client_secret_plain,
}
c = Client()
response = c.post('/v1/o/token/', data=token_request_data)
tkn = response.json()
expiration_date_string = strftime('%Y-%m-%d %H:%M:%SZ', expiration_date.timetuple())
self.assertEqual(tkn["access_grant_expiration"][:-4], expiration_date_string[:-4])

def test_refresh_with_revoked_token(self):
redirect_uri = 'http://localhost'
# create a user
Expand Down
3 changes: 2 additions & 1 deletion apps/dot_ext/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ def validate_app_is_active(request):
Utility function to check that an application
is an active, valid application.
This method will pull the application from the
request and then check the active flag.
request and then check the active flag and the
data access grant (dag) validity.
RETURN:
application or None
"""
Expand Down
50 changes: 46 additions & 4 deletions apps/dot_ext/views/authorization.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
import json
import logging
from datetime import datetime, timedelta
from time import strftime

import waffle
from waffle import get_waffle_flag_model

from django.http.response import HttpResponseBadRequest
from django.http.response import HttpResponse, HttpResponseBadRequest
from django.template.response import TemplateResponse
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.debug import sensitive_post_parameters
from oauth2_provider.exceptions import OAuthToolkitError
from oauth2_provider.views.base import app_authorized, get_access_token_model
from oauth2_provider.views.base import AuthorizationView as DotAuthorizationView
from oauth2_provider.views.base import TokenView as DotTokenView
from oauth2_provider.views.base import RevokeTokenView as DotRevokeTokenView
Expand Down Expand Up @@ -37,7 +42,7 @@
validate_app_is_active,
json_response_from_oauth2_error,
)

from ...authorization.models import DataAccessGrant

log = logging.getLogger(bb2logging.HHS_SERVER_LOGNAME_FMT.format(__name__))

Expand Down Expand Up @@ -292,11 +297,48 @@ class TokenView(DotTokenView):
@method_decorator(sensitive_post_parameters("password"))
def post(self, request, *args, **kwargs):
try:
validate_app_is_active(request)
app = validate_app_is_active(request)
except (InvalidClientError, InvalidGrantError) as error:
return json_response_from_oauth2_error(error)

return super().post(request, args, kwargs)
url, headers, body, status = self.create_token_response(request)

if status == 200:
body = json.loads(body)
access_token = body.get("access_token")

dag_expiry = ""
if access_token is not None:
token = get_access_token_model().objects.get(
token=access_token)
app_authorized.send(
sender=self, request=request,
token=token)

if app.data_access_type == "THIRTEEN_MONTH":
try:
dag = DataAccessGrant.objects.get(
beneficiary=token.user,
application=app
)
if dag.expiration_date is not None:
dag_expiry = strftime('%Y-%m-%d %H:%M:%SZ', dag.expiration_date.timetuple())
except DataAccessGrant.DoesNotExist:
dag_expiry = ""

elif app.data_access_type == "ONE_TIME":
expires_at = datetime.utcnow() + timedelta(seconds=body['expires_in'])
dag_expiry = expires_at.strftime('%Y-%m-%d %H:%M:%SZ')
elif app.data_access_type == "RESEARCH_STUDY":
dag_expiry = ""

body['access_grant_expiration'] = dag_expiry
body = json.dumps(body)

response = HttpResponse(content=body, status=status)
for k, v in headers.items():
response[k] = v
return response


@method_decorator(csrf_exempt, name="dispatch")
Expand Down

0 comments on commit 6b80829

Please sign in to comment.