Skip to content
Snippets Groups Projects
Unverified Commit ef7c3c39 authored by Chris Pappas's avatar Chris Pappas Committed by GitHub
Browse files

Implementing support for multiple credit provider keys (#25036)

* Implementing support for multiple credit provider keys

* Plan on storing keys as list and not dict

* Adding tests for serializer and signature code

* adding more tests

* tweaking a comment

* Quality fixes

* Breaking out some login into a helper function
parent 2aadaefe
Branches
Tags release-2020-10-02-15.01
No related merge requests found
......@@ -112,6 +112,28 @@ def get_credit_provider_info(request, provider_id): # pylint: disable=unused-ar
return JsonResponse(credit_provider_data)
def check_keys_exist(shared_secret_key, provider_id):
"""
Verify that a key is available for single or multiple key support scenarios.
Raise CreditProviderNotConfigured if no key available.
"""
# Accounts for old way of storing provider key
if shared_secret_key is None:
msg = u'Credit provider with ID "{provider_id}" does not have a secret key configured.'.format(
provider_id=provider_id
)
log.error(msg)
raise CreditProviderNotConfigured(msg)
# Accounts for new way of storing provider key
elif isinstance(shared_secret_key, list) and not any(shared_secret_key):
msg = 'Could not retrieve secret key for credit provider [{}]. ' \
'Unable to validate requests from provider.'.format(provider_id)
log.error(msg)
raise CreditProviderNotConfigured(msg)
@transaction.atomic
def create_credit_request(course_key, provider_id, username):
"""
......@@ -212,12 +234,12 @@ def create_credit_request(course_key, provider_id, username):
# That way, if there's a misconfiguration, we won't have requests
# in our system that we know weren't sent to the provider.
shared_secret_key = get_shared_secret_key(credit_provider.provider_id)
if shared_secret_key is None:
msg = u'Credit provider with ID "{provider_id}" does not have a secret key configured.'.format(
provider_id=credit_provider.provider_id
)
log.error(msg)
raise CreditProviderNotConfigured(msg)
check_keys_exist(shared_secret_key, credit_provider.provider_id)
if isinstance(shared_secret_key, list):
# if keys exist, and keys are stored as a list
# then we know at least 1 is available for [0]
shared_secret_key = [key for key in shared_secret_key if key][0]
# Initiate a new request if one has not already been created
credit_request, created = CreditRequest.objects.get_or_create(
......
......@@ -87,20 +87,62 @@ class CreditProviderCallbackSerializer(serializers.Serializer): # pylint:disabl
return value
def validate_signature(self, value):
""" Validate the signature and ensure the provider is setup properly. """
provider_id = self.provider.provider_id
secret_key = get_shared_secret_key(provider_id)
def _check_keys_exist_for_provider(self, secret_key, provider_id):
"""
Verify there are keys available in the secret to
verify signature against.
Throw error if none are available.
"""
# Accounts for old way of storing provider key
if secret_key is None:
msg = 'Could not retrieve secret key for credit provider [{}]. ' \
'Unable to validate requests from provider.'.format(provider_id)
log.error(msg)
raise PermissionDenied(msg)
# Accounts for new way of storing provider key
# We need at least 1 key here that we can use to validate the signature
if isinstance(secret_key, list) and not any(secret_key):
msg = 'Could not retrieve secret key for credit provider [{}]. ' \
'Unable to validate requests from provider.'.format(provider_id)
log.error(msg)
raise PermissionDenied(msg)
def _compare_signatures(self, secret_key, provider_id):
"""
Compare signature we received with the signature we expect/have.
Throw an error if they don't match.
"""
data = self.initial_data
actual_signature = data["signature"]
if signature(data, secret_key) != actual_signature:
# Accounts for old way of storing provider key
if isinstance(secret_key, six.text_type) and signature(data, secret_key) != actual_signature:
msg = 'Request from credit provider [{}] had an invalid signature.'.format(provider_id)
raise PermissionDenied(msg)
# Accounts for new way of storing provider key
if isinstance(secret_key, list):
# Received value just needs to match one of the keys we have
key_match = False
for secretvalue in secret_key:
if signature(data, secretvalue) == actual_signature:
key_match = True
if not key_match:
msg = 'Request from credit provider [{}] had an invalid signature.'.format(provider_id)
raise PermissionDenied(msg)
def validate_signature(self, value):
""" Validate the signature and ensure the provider is setup properly. """
provider_id = self.provider.provider_id
secret_key = get_shared_secret_key(provider_id)
self._check_keys_exist_for_provider(secret_key, provider_id)
self._compare_signatures(secret_key, provider_id)
return value
......@@ -27,18 +27,41 @@ from django.conf import settings
log = logging.getLogger(__name__)
def _encode_secret(secret, provider_id):
"""
Helper function for encoding text_type secrets into ascii.
"""
try:
secret.encode('ascii')
except UnicodeEncodeError:
secret = None
log.error(u'Shared secret key for credit provider "%s" contains non-ASCII unicode.', provider_id)
return secret
def get_shared_secret_key(provider_id):
"""
Retrieve the shared secret key for a particular credit provider.
Retrieve the shared secret for a particular credit provider.
It is possible for the secret to be stored in 2 ways:
1 - a key/value pair of provider_id and secret string
{'cool_school': '123abc'}
2 - a key/value pair of provider_id and secret list
{'cool_school': ['987zyx', '123abc']}
"""
secret = getattr(settings, "CREDIT_PROVIDER_SECRET_KEYS", {}).get(provider_id)
# When secret is just characters
if isinstance(secret, six.text_type):
try:
secret.encode('ascii')
except UnicodeEncodeError:
secret = None
log.error(u'Shared secret key for credit provider "%s" contains non-ASCII unicode.', provider_id)
secret = _encode_secret(secret, provider_id)
# When secret is a list containing multiple keys, encode all of them
elif isinstance(secret, list):
for index, secretvalue in enumerate(secret):
if isinstance(secretvalue, six.text_type):
secret[index] = _encode_secret(secretvalue, provider_id)
return secret
......
......@@ -46,12 +46,13 @@ from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
from xmodule.modulestore.tests.factories import CourseFactory
TEST_CREDIT_PROVIDER_SECRET_KEY = "931433d583c84ca7ba41784bad3232e6"
TEST_CREDIT_PROVIDER_SECRET_KEY_TWO = "abcf433d583c8baebae1784bad3232e6"
TEST_ECOMMERCE_WORKER = 'test_worker'
@override_settings(CREDIT_PROVIDER_SECRET_KEYS={
"hogwarts": TEST_CREDIT_PROVIDER_SECRET_KEY,
"ASU": TEST_CREDIT_PROVIDER_SECRET_KEY,
"ASU": [TEST_CREDIT_PROVIDER_SECRET_KEY_TWO, TEST_CREDIT_PROVIDER_SECRET_KEY],
"MIT": TEST_CREDIT_PROVIDER_SECRET_KEY
})
class CreditApiTestBase(ModuleStoreTestCase):
......
......@@ -3,8 +3,10 @@
import six
from django.test import TestCase
from django.test.utils import override_settings
from rest_framework.exceptions import PermissionDenied
from openedx.core.djangoapps.credit import serializers
from openedx.core.djangoapps.credit import serializers, signature
from openedx.core.djangoapps.credit.tests.factories import CreditEligibilityFactory, CreditProviderFactory
from student.tests.factories import UserFactory
......@@ -43,3 +45,84 @@ class CreditEligibilitySerializerTests(TestCase):
'username': user.username,
}
self.assertDictEqual(serializer.data, expected)
class CreditProviderCallbackSerializerTests(TestCase):
""" CreditProviderCallbackSerializer tests. """
def test_check_keys_exist_for_provider_string(self):
""" Verify _check_keys_exist_for_provider errors if key is None """
secret_key = None
provider_id = 'asu'
serializer = serializers.CreditProviderCallbackSerializer()
with self.assertRaises(PermissionDenied):
serializer._check_keys_exist_for_provider(secret_key, provider_id)
def test_check_keys_exist_for_provider_list_no_keys(self):
"""
Verify _check_keys_exist_for_provider errors if no keys in list
are truthy. (This accounts for there being 2 keys present both
of which are None due to ascii encode issues)
"""
secret_key = [None, None]
provider_id = 'asu'
serializer = serializers.CreditProviderCallbackSerializer()
with self.assertRaises(PermissionDenied):
serializer._check_keys_exist_for_provider(secret_key, provider_id)
def test_check_keys_exist_for_provider_list_with_key_present(self):
"""
Verify _check_keys_exist_for_provider does not error when at least
1 key in config is a valid key
"""
secret_key = [None, 'abc1234', None]
provider_id = 'asu'
serializer = serializers.CreditProviderCallbackSerializer()
result = serializer._check_keys_exist_for_provider(secret_key, provider_id)
# No return value, so we expect successful execution to return None
assert result is None
def test_compare_signatures_string_key(self):
""" Verify compare_signature errors if string key does not match. """
provider = CreditProviderFactory(
provider_id='asu',
active=False,
)
# Create a serializer that has a signature which was created with a key
# that we do not have in our system.
sig = signature.signature({}, 'iamthewrongkey')
serializer = serializers.CreditProviderCallbackSerializer(
data={'signature': sig}
)
with self.assertRaises(PermissionDenied):
# The first arg here is key we have (that doesn't match the sig)
serializer._compare_signatures('abcd1234', provider.provider_id)
def test_compare_signatures_list_key(self):
"""
Verify compare_signature errors if no keys that are stored in the list
in config match the key handed in the signature.
"""
provider = CreditProviderFactory(
provider_id='asu',
active=False,
)
sig = signature.signature({}, 'iamthewrongkey')
serializer = serializers.CreditProviderCallbackSerializer(
data={'signature': sig}
)
with self.assertRaises(PermissionDenied):
# The first arg here is the list of keys he have (that dont matcht the sig)
serializer._compare_signatures(
['abcd1234', 'xyz789'],
provider.provider_id
)
......@@ -42,3 +42,37 @@ class SignatureTest(TestCase):
key = signature.get_shared_secret_key("asu")
sig = signature.signature({'name': u'Ed Xavíer'}, key)
self.assertEqual(sig, "76b6c9a657000829253d7c23977b35b34ad750c5681b524d7fdfb25cd5273cec")
@override_settings(CREDIT_PROVIDER_SECRET_KEYS={
"asu": 'abcd1234',
})
def test_get_shared_secret_key_string(self):
"""
get_shared_secret_key should return ascii encoded string if provider
secret is stored as a single key.
"""
key = signature.get_shared_secret_key("asu")
self.assertEqual(key, 'abcd1234')
@override_settings(CREDIT_PROVIDER_SECRET_KEYS={
"asu": ['abcd1234', 'zyxw9876']
})
def test_get_shared_secret_key_string_multiple_keys(self):
"""
get_shared_secret_key should return ascii encoded strings if provider
secret is stored as a list for multiple key support.
"""
key = signature.get_shared_secret_key("asu")
self.assertEqual(key, ['abcd1234', 'zyxw9876'])
@override_settings(CREDIT_PROVIDER_SECRET_KEYS={
"asu": [u'\u4567', u'zyxw9876']
})
def test_get_shared_secret_key_string_multiple_keys_with_none(self):
"""
get_shared_secret_key should return ascii encoded string if provider
secret is stored as a list for multiple key support, replacing None
for unencodable strings.
"""
key = signature.get_shared_secret_key("asu")
self.assertEqual(key, [None, 'zyxw9876'])
......@@ -367,36 +367,44 @@ class CreditProviderRequestCreateViewTests(ApiTestCaseMixin, UserMixin, TestCase
)
secret_key = 'secret'
with override_settings(CREDIT_PROVIDER_SECRET_KEYS={self.provider.provider_id: secret_key}):
response = self.post_credit_request(username, course_key)
self.assertEqual(response.status_code, 200)
# Provider keys can be stored as a string or list of strings
secret_key_with_key_as_string = {self.provider.provider_id: secret_key}
# The None represents a key that was not ascii encodable
secret_key_with_key_as_list = {
self.provider.provider_id: [secret_key, None]
}
# Check that the user's request status is pending
request = CreditRequest.objects.get(username=username, course__course_key=course_key)
self.assertEqual(request.status, 'pending')
# Check request parameters
content = json.loads(response.content.decode('utf-8'))
parameters = content['parameters']
self.assertEqual(content['url'], self.provider.provider_url)
self.assertEqual(content['method'], 'POST')
self.assertEqual(len(parameters['request_uuid']), 32)
self.assertEqual(parameters['course_org'], course_key.org)
self.assertEqual(parameters['course_num'], course_key.course)
self.assertEqual(parameters['course_run'], course_key.run)
self.assertEqual(parameters['final_grade'], six.text_type(final_grade))
self.assertEqual(parameters['user_username'], username)
self.assertEqual(parameters['user_full_name'], self.user.get_full_name())
self.assertEqual(parameters['user_mailing_address'], '')
self.assertEqual(parameters['user_country'], '')
# The signature is going to change each test run because the request
# is assigned a different UUID each time.
# For this reason, we use the signature function directly
# (the "signature" parameter will be ignored when calculating the signature).
# Other unit tests verify that the signature function is working correctly.
self.assertEqual(parameters['signature'], signature(parameters, secret_key))
for secret_key_dict in [secret_key_with_key_as_string, secret_key_with_key_as_list]:
with override_settings(CREDIT_PROVIDER_SECRET_KEYS=secret_key_dict):
response = self.post_credit_request(username, course_key)
self.assertEqual(response.status_code, 200)
# Check that the user's request status is pending
request = CreditRequest.objects.get(username=username, course__course_key=course_key)
self.assertEqual(request.status, 'pending')
# Check request parameters
content = json.loads(response.content.decode('utf-8'))
parameters = content['parameters']
self.assertEqual(content['url'], self.provider.provider_url)
self.assertEqual(content['method'], 'POST')
self.assertEqual(len(parameters['request_uuid']), 32)
self.assertEqual(parameters['course_org'], course_key.org)
self.assertEqual(parameters['course_num'], course_key.course)
self.assertEqual(parameters['course_run'], course_key.run)
self.assertEqual(parameters['final_grade'], six.text_type(final_grade))
self.assertEqual(parameters['user_username'], username)
self.assertEqual(parameters['user_full_name'], self.user.get_full_name())
self.assertEqual(parameters['user_mailing_address'], '')
self.assertEqual(parameters['user_country'], '')
# The signature is going to change each test run because the request
# is assigned a different UUID each time.
# For this reason, we use the signature function directly
# (the "signature" parameter will be ignored when calculating the signature).
# Other unit tests verify that the signature function is working correctly.
self.assertEqual(parameters['signature'], signature(parameters, secret_key))
def test_post_invalid_provider(self):
""" Verify the endpoint returns HTTP 404 if the credit provider is not valid. """
......@@ -465,6 +473,21 @@ class CreditProviderRequestCreateViewTests(ApiTestCaseMixin, UserMixin, TestCase
response = self.post_credit_request(self.user.username, self.eligibility.course.course_key)
self.assertEqual(response.status_code, 400)
def test_post_secret_key_not_set_key_as_list(self):
""" Verify the endpoint returns HTTP 400 if we attempt to create a
request for a provider with no secret key set with keys set as list. """
# Enable provider integration
self.provider.enable_integration = True
self.provider.save()
# Cannot initiate a request because we cannot sign it
secret_key_with_key_as_list = {
self.provider.provider_id: []
}
with override_settings(CREDIT_PROVIDER_SECRET_KEYS=secret_key_with_key_as_list):
response = self.post_credit_request(self.user.username, self.eligibility.course.course_key)
self.assertEqual(response.status_code, 400)
@ddt.ddt
@skip_unless_lms
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment