-
Notifications
You must be signed in to change notification settings - Fork 44
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Full Python 3 compatibility #61
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,12 +4,15 @@ | |
""" | ||
|
||
from __future__ import absolute_import | ||
|
||
import logging | ||
|
||
import oauth2 | ||
import oauth.oauth as oauth | ||
from xml.etree import ElementTree as etree | ||
|
||
from oauth2 import STRING_TYPES | ||
from six.moves.urllib.parse import urlparse, urlencode | ||
|
||
log = logging.getLogger('pylti.common') # pylint: disable=invalid-name | ||
|
||
LTI_PROPERTY_LIST = [ | ||
|
@@ -49,18 +52,17 @@ | |
LTI_REQUEST_TYPE = [u'any', u'initial', u'session'] | ||
|
||
|
||
class LTIOAuthDataStore(oauth.OAuthDataStore): | ||
# pylint: disable=abstract-method | ||
class LTIOAuthServer(oauth2.Server): | ||
""" | ||
Largely taken from reference implementation | ||
for app engine at https://code.google.com/p/ims-dev/ | ||
""" | ||
|
||
def __init__(self, consumers): | ||
def __init__(self, consumers, signature_methods=None): | ||
""" | ||
Create OAuth store | ||
Create OAuth server | ||
""" | ||
oauth.OAuthDataStore.__init__(self) | ||
super(LTIOAuthServer, self).__init__(signature_methods) | ||
self.consumers = consumers | ||
|
||
def lookup_consumer(self, key): | ||
|
@@ -82,7 +84,7 @@ def lookup_consumer(self, key): | |
log.critical(('Consumer %s, is missing secret' | ||
'in settings file, and needs correction.'), key) | ||
return None | ||
return oauth.OAuthConsumer(key, secret) | ||
return oauth2.Consumer(key, secret) | ||
|
||
def lookup_cert(self, key): | ||
""" | ||
|
@@ -101,15 +103,6 @@ def lookup_cert(self, key): | |
cert = consumer.get('cert', None) | ||
return cert | ||
|
||
def lookup_nonce(self, oauth_consumer, oauth_token, nonce): | ||
""" | ||
Lookup nonce should check if nonce was already used | ||
by this consumer in the past. | ||
Reusing nonce is bad: http://cwe.mitre.org/data/definitions/323.html | ||
Not implemented. | ||
""" | ||
return None | ||
|
||
|
||
class LTIException(Exception): | ||
""" | ||
|
@@ -155,11 +148,10 @@ def _post_patched_request(consumers, lti_key, body, | |
:return: response | ||
""" | ||
# pylint: disable=too-many-locals, too-many-arguments | ||
oauth_store = LTIOAuthDataStore(consumers) | ||
oauth_server = oauth.OAuthServer(oauth_store) | ||
oauth_server.add_signature_method(oauth.OAuthSignatureMethod_HMAC_SHA1()) | ||
lti_consumer = oauth_store.lookup_consumer(lti_key) | ||
lti_cert = oauth_store.lookup_cert(lti_key) | ||
oauth_server = LTIOAuthServer(consumers) | ||
oauth_server.add_signature_method(SignatureMethod_HMAC_SHA1_Unicode()) | ||
lti_consumer = oauth_server.lookup_consumer(lti_key) | ||
lti_cert = oauth_server.lookup_cert(lti_key) | ||
|
||
secret = lti_consumer.secret | ||
|
||
|
@@ -190,7 +182,7 @@ def my_normalize(self, headers): | |
response, content = client.request( | ||
url, | ||
method, | ||
body=body, | ||
body=body.encode('utf-8'), | ||
headers={'Content-Type': content_type}) | ||
|
||
http = httplib2.Http | ||
|
@@ -227,7 +219,7 @@ def post_message(consumers, lti_key, url, body): | |
content_type, | ||
) | ||
|
||
is_success = "<imsx_codeMajor>success</imsx_codeMajor>" in content | ||
is_success = b"<imsx_codeMajor>success</imsx_codeMajor>" in content | ||
log.debug("is success %s", is_success) | ||
return is_success | ||
|
||
|
@@ -277,18 +269,17 @@ def verify_request_common(consumers, url, method, headers, params): | |
log.debug("headers %s", headers) | ||
log.debug("params %s", params) | ||
|
||
oauth_store = LTIOAuthDataStore(consumers) | ||
oauth_server = oauth.OAuthServer(oauth_store) | ||
oauth_server = LTIOAuthServer(consumers) | ||
oauth_server.add_signature_method( | ||
oauth.OAuthSignatureMethod_PLAINTEXT()) | ||
SignatureMethod_PLAINTEXT_Unicode()) | ||
oauth_server.add_signature_method( | ||
oauth.OAuthSignatureMethod_HMAC_SHA1()) | ||
SignatureMethod_HMAC_SHA1_Unicode()) | ||
|
||
# Check header for SSL before selecting the url | ||
if headers.get('X-Forwarded-Proto', 'http') == 'https': | ||
url = url.replace('http:', 'https:', 1) | ||
|
||
oauth_request = oauth.OAuthRequest.from_request( | ||
oauth_request = Request_Fix_Duplicate.from_request( | ||
method, | ||
url, | ||
headers=dict(headers), | ||
|
@@ -301,9 +292,12 @@ def verify_request_common(consumers, url, method, headers, params): | |
'or request') | ||
try: | ||
# pylint: disable=protected-access | ||
consumer = oauth_server._get_consumer(oauth_request) | ||
oauth_server._check_signature(oauth_request, consumer, None) | ||
except oauth.OAuthError: | ||
oauth_consumer_key = oauth_request.get_parameter('oauth_consumer_key') | ||
consumer = oauth_server.lookup_consumer(oauth_consumer_key) | ||
if not consumer: | ||
raise oauth2.Error('Invalid consumer.') | ||
oauth_server.verify_request(oauth_request, consumer, None) | ||
except oauth2.Error: | ||
# Rethrow our own for nice error handling (don't print | ||
# error message as it will contain the key | ||
raise LTIException("OAuth error: Please check your key and secret") | ||
|
@@ -350,7 +344,110 @@ def generate_request_xml(message_identifier_id, operation, | |
text_string = etree.SubElement(result_score, 'textString') | ||
text_string.text = score.__str__() | ||
ret = "<?xml version='1.0' encoding='utf-8'?>\n{}".format( | ||
etree.tostring(root, encoding='utf-8')) | ||
etree.tostring(root, encoding='utf-8').decode('utf-8')) | ||
|
||
log.debug("XML Response: \n%s", ret) | ||
return ret | ||
|
||
|
||
class SignatureMethod_HMAC_SHA1_Unicode(oauth2.SignatureMethod_HMAC_SHA1): | ||
""" | ||
Temporary workaround for | ||
https://github.com/joestump/python-oauth2/issues/207 | ||
|
||
Original code is Copyright (c) 2007 Leah Culver, MIT license. | ||
""" | ||
|
||
def check(self, request, consumer, token, signature): | ||
""" | ||
Returns whether the given signature is the correct signature for | ||
the given consumer and token signing the given request. | ||
""" | ||
built = self.sign(request, consumer, token) | ||
if isinstance(signature, STRING_TYPES): | ||
signature = signature.encode("utf8") | ||
return built == signature | ||
|
||
|
||
class SignatureMethod_PLAINTEXT_Unicode(oauth2.SignatureMethod_PLAINTEXT): | ||
""" | ||
Temporary workaround for | ||
https://github.com/joestump/python-oauth2/issues/207 | ||
|
||
Original code is Copyright (c) 2007 Leah Culver, MIT license. | ||
""" | ||
|
||
def check(self, request, consumer, token, signature): | ||
""" | ||
Returns whether the given signature is the correct signature for | ||
the given consumer and token signing the given request. | ||
""" | ||
built = self.sign(request, consumer, token) | ||
if isinstance(signature, STRING_TYPES): | ||
signature = signature.encode("utf8") | ||
return built == signature | ||
|
||
|
||
class Request_Fix_Duplicate(oauth2.Request): | ||
""" | ||
Temporary workaround for | ||
https://github.com/joestump/python-oauth2/pull/197 | ||
|
||
Original code is Copyright (c) 2007 Leah Culver, MIT license. | ||
""" | ||
|
||
def get_normalized_parameters(self): | ||
""" | ||
Return a string that contains the parameters that must be signed. | ||
""" | ||
items = [] | ||
for key, value in self.items(): | ||
if key == 'oauth_signature': | ||
continue | ||
# 1.0a/9.1.1 states that kvp must be sorted by key, then by value, | ||
# so we unpack sequence values into multiple items for sorting. | ||
if isinstance(value, STRING_TYPES): | ||
items.append( | ||
(oauth2.to_utf8_if_string(key), oauth2.to_utf8(value)) | ||
) | ||
else: | ||
try: | ||
value = list(value) | ||
except TypeError as e: | ||
assert 'is not iterable' in str(e) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do you need this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This code is a slight adaptation of the method with the same name in python-oauth2. It is in fact the function patched with joestump/python-oauth2#197. The assert is part of the original code. I forgot to add the specific copyright notice; I will do it with the remaining corrections :-) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, but I still think that line is unnecessary here. |
||
items.append( | ||
(oauth2.to_utf8_if_string(key), | ||
oauth2.to_utf8_if_string(value)) | ||
) | ||
else: | ||
items.extend( | ||
(oauth2.to_utf8_if_string(key), | ||
oauth2.to_utf8_if_string(item)) | ||
for item in value | ||
) | ||
|
||
# Include any query string parameters from the provided URL | ||
query = urlparse(self.url)[4] | ||
url_items = self._split_url_string(query).items() | ||
url_items = [ | ||
(oauth2.to_utf8(k), oauth2.to_utf8_optional_iterator(v)) | ||
for k, v in url_items if k != 'oauth_signature' | ||
] | ||
|
||
# Merge together URL and POST parameters. | ||
# Eliminates parameters duplicated between URL and POST. | ||
items_dict = {} | ||
for k, v in items: | ||
items_dict.setdefault(k, []).append(v) | ||
for k, v in url_items: | ||
if not (k in items_dict and v in items_dict[k]): | ||
items.append((k, v)) | ||
|
||
items.sort() | ||
|
||
encoded_str = urlencode(items, True) | ||
# Encode signature parameters per Oauth Core 1.0 protocol | ||
# spec draft 7, section 3.6 | ||
# (http://tools.ietf.org/html/draft-hammer-oauth-07#section-3.6) | ||
# Spaces must be encoded with "%20" instead of "+" | ||
return encoded_str.replace('+', '%20').replace('%7E', '~') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please modify the docstring accordingly without removing it