From 3a45bab57c28084dfe46df1c4aad12a1623de51a Mon Sep 17 00:00:00 2001 From: Nimisha Asthagiri <nasthagiri@edx.org> Date: Thu, 28 Jun 2018 18:11:28 -0400 Subject: [PATCH] Enable OAuth Scopes for Certificates API --- .../certificates/apis/v0/tests/test_views.py | 311 ++++++++++++------ lms/djangoapps/certificates/apis/v0/views.py | 26 +- 2 files changed, 221 insertions(+), 116 deletions(-) diff --git a/lms/djangoapps/certificates/apis/v0/tests/test_views.py b/lms/djangoapps/certificates/apis/v0/tests/test_views.py index 38f82c51b8c..090dc48af39 100644 --- a/lms/djangoapps/certificates/apis/v0/tests/test_views.py +++ b/lms/djangoapps/certificates/apis/v0/tests/test_views.py @@ -1,7 +1,12 @@ """ Tests for the Certificate REST APIs. """ +# pylint: disable=missing-docstring from datetime import datetime, timedelta +from enum import Enum +from itertools import product +import ddt +from mock import patch from django.urls import reverse from django.utils import timezone @@ -10,9 +15,12 @@ from oauth2_provider import models as dot_models from rest_framework import status from rest_framework.test import APITestCase +from course_modes.models import CourseMode +from lms.djangoapps.certificates.apis.v0.views import CertificatesDetailView from lms.djangoapps.certificates.models import CertificateStatuses from lms.djangoapps.certificates.tests.factories import GeneratedCertificateFactory -from course_modes.models import CourseMode +from openedx.core.djangoapps.oauth_dispatch.toggles import ENFORCE_JWT_SCOPES +from openedx.core.lib.token_utils import JwtBuilder from student.tests.factories import UserFactory from xmodule.modulestore.tests.django_utils import SharedModuleStoreTestCase from xmodule.modulestore.tests.factories import CourseFactory @@ -20,6 +28,17 @@ from xmodule.modulestore.tests.factories import CourseFactory USER_PASSWORD = 'test' +class AuthType(Enum): + session = 1 + oauth = 2 + jwt = 3 + jwt_restricted = 4 + + +JWT_AUTH_TYPES = [AuthType.jwt, AuthType.jwt_restricted] + + +@ddt.ddt class CertificatesRestApiTest(SharedModuleStoreTestCase, APITestCase): """ Test for the Certificates REST APIs @@ -58,7 +77,34 @@ class CertificatesRestApiTest(SharedModuleStoreTestCase, APITestCase): self.namespaced_url = 'certificates_api:v0:certificates:detail' - # create a configuration for django-oauth-toolkit (DOT) + def _assert_certificate_response(self, response): + self.assertEqual( + response.data, + { + 'username': self.student.username, + 'status': CertificateStatuses.downloadable, + 'is_passing': True, + 'grade': '0.88', + 'download_url': 'www.google.com', + 'certificate_type': CourseMode.VERIFIED, + 'course_id': unicode(self.course.id), + 'created_date': self.now, + } + ) + + def _get_url(self, username): + """ + Helper function to create the url for certificates + """ + return reverse( + self.namespaced_url, + kwargs={ + 'course_id': self.course.id, + 'username': username + } + ) + + def _create_oauth_token(self, user): dot_app_user = UserFactory.create(password=USER_PASSWORD) dot_app = dot_models.Application.objects.create( name='test app', @@ -67,128 +113,183 @@ class CertificatesRestApiTest(SharedModuleStoreTestCase, APITestCase): authorization_grant_type='authorization-code', redirect_uris='http://localhost:8079/complete/edxorg/' ) - self.dot_access_token = dot_models.AccessToken.objects.create( - user=self.student, + return dot_models.AccessToken.objects.create( + user=user, application=dot_app, expires=datetime.utcnow() + timedelta(weeks=1), scope='read write', - token='16MGyP3OaQYHmpT1lK7Q6MMNAZsjwF' + token='test_token', ) - def get_url(self, username): - """ - Helper function to create the url for certificates - """ - return reverse( - self.namespaced_url, - kwargs={ - 'course_id': self.course.id, - 'username': username - } + def _create_jwt_token(self, user, auth_type, scopes=None, include_org_filter=True, include_me_filter=False): + filters = [] + if include_org_filter: + filters += ['content_org:{}'.format(self.course.id.org)] + if include_me_filter: + filters += ['user:me'] + + if scopes is None: + scopes = CertificatesDetailView.required_scopes + + return JwtBuilder(user).build_token( + scopes, + additional_claims=dict( + is_restricted=(auth_type == AuthType.jwt_restricted), + filters=filters, + ), ) - def assert_oauth_status(self, access_token, expected_status): - """ - Helper method for requests with OAUTH token - """ - self.client.logout() - auth_header = "Bearer {0}".format(access_token) - response = self.client.get(self.get_url(self.student.username), HTTP_AUTHORIZATION=auth_header) - self.assertEqual(response.status_code, expected_status) + def _get_response(self, requesting_user, auth_type, url=None, token=None): + auth_header = None + if auth_type == AuthType.session: + self.client.login(username=requesting_user.username, password=USER_PASSWORD) + elif auth_type == AuthType.oauth: + if not token: + token = self._create_oauth_token(requesting_user) + auth_header = "Bearer {0}".format(token) + else: + assert auth_type in JWT_AUTH_TYPES + if not token: + token = self._create_jwt_token(requesting_user, auth_type) + auth_header = "JWT {0}".format(token) - def test_permissions(self): - """ - Test that only the owner of the certificate can access the url - """ - # anonymous user - resp = self.client.get(self.get_url(self.student.username)) + extra = dict(HTTP_AUTHORIZATION=auth_header) if auth_header else {} + return self.client.get( + url if url else self._get_url(self.student.username), + **extra + ) + + def _assert_in_log(self, text, mock_log_method): + self.assertTrue(mock_log_method.called) + self.assertIn(text, mock_log_method.call_args_list[0][0][0]) + + def test_anonymous_user(self): + resp = self.client.get(self._get_url(self.student.username)) self.assertEqual(resp.status_code, status.HTTP_401_UNAUTHORIZED) - # another student - self.client.login(username=self.student_no_cert.username, password=USER_PASSWORD) - resp = self.client.get(self.get_url(self.student.username)) - # gets 404 instead of 403 for security reasons + @ddt.data(*list(AuthType)) + def test_no_certificate(self, auth_type): + resp = self._get_response( + self.student_no_cert, + auth_type, + url=self._get_url(self.student_no_cert.username), + ) self.assertEqual(resp.status_code, status.HTTP_404_NOT_FOUND) - self.assertEqual(resp.data, {u'detail': u'Not found.'}) - self.client.logout() + self.assertIn('error_code', resp.data) + self.assertEqual( + resp.data['error_code'], + 'no_certificate_for_user', + ) - # same student of the certificate - self.client.login(username=self.student.username, password=USER_PASSWORD) - resp = self.client.get(self.get_url(self.student.username)) - self.assertEqual(resp.status_code, status.HTTP_200_OK) - self.client.logout() + @ddt.data(*product(list(AuthType), (True, False))) + @ddt.unpack + def test_self_user(self, auth_type, scopes_enforced): + with ENFORCE_JWT_SCOPES.override(active=scopes_enforced): + resp = self._get_response(self.student, auth_type) + self.assertEqual(resp.status_code, status.HTTP_200_OK) + self._assert_certificate_response(resp) - # staff user - self.client.login(username=self.staff_user.username, password=USER_PASSWORD) - resp = self.client.get(self.get_url(self.student.username)) - self.assertEqual(resp.status_code, status.HTTP_200_OK) + @ddt.data(*product(list(AuthType), (True, False))) + @ddt.unpack + def test_inactive_user(self, auth_type, scopes_enforced): + with ENFORCE_JWT_SCOPES.override(active=scopes_enforced): + self.student.is_active = False + self.student.save() - def test_inactive_user_access(self): - """ - Verify inactive users - those who have not verified their email addresses - - are allowed to access the endpoint. - """ - self.client.login(username=self.student.username, password=USER_PASSWORD) + resp = self._get_response(self.student, auth_type) + self.assertEqual(resp.status_code, status.HTTP_200_OK) - self.student.is_active = False - self.student.save() + @ddt.data(*product(list(AuthType), (True, False))) + @ddt.unpack + def test_staff_user(self, auth_type, scopes_enforced): + with ENFORCE_JWT_SCOPES.override(active=scopes_enforced): + resp = self._get_response(self.staff_user, auth_type) + self.assertEqual(resp.status_code, status.HTTP_200_OK) - resp = self.client.get(self.get_url(self.student.username)) - self.assertEqual(resp.status_code, status.HTTP_200_OK) + @patch('edx_rest_framework_extensions.permissions.log') + @ddt.data(*product(list(AuthType), (True, False))) + @ddt.unpack + def test_another_user(self, auth_type, scopes_enforced, mock_log): + """ Returns 403 for OAuth and Session auth with IsUserInUrl. """ + with ENFORCE_JWT_SCOPES.override(active=scopes_enforced): + resp = self._get_response(self.student_no_cert, auth_type) - def test_dot_valid_accesstoken(self): - """ - Verify access with a valid Django Oauth Toolkit access token. - """ - self.assert_oauth_status(self.dot_access_token, status.HTTP_200_OK) + # Restricted JWT tokens without the user:me filter have access to other users + expected_jwt_access_granted = scopes_enforced and auth_type == AuthType.jwt_restricted - def test_dot_invalid_accesstoken(self): - """ - Verify the endpoint is inaccessible for authorization - attempts made with an invalid OAuth access token. - """ - self.assert_oauth_status("fooooooooooToken", status.HTTP_401_UNAUTHORIZED) + self.assertEqual( + resp.status_code, + status.HTTP_200_OK if expected_jwt_access_granted else status.HTTP_403_FORBIDDEN, + ) + if not expected_jwt_access_granted: + self._assert_in_log("IsUserInUrl", mock_log.info) - def test_dot_expired_accesstoken(self): - """ - Verify the endpoint is inaccessible for authorization - attempts made with an expired OAuth access token. - """ - # set the expiration date in the past - self.dot_access_token.expires = datetime.utcnow() - timedelta(weeks=1) - self.dot_access_token.save() - self.assert_oauth_status(self.dot_access_token, status.HTTP_401_UNAUTHORIZED) + @patch('edx_rest_framework_extensions.permissions.log') + @ddt.data(*product(JWT_AUTH_TYPES, (True, False))) + @ddt.unpack + def test_jwt_no_scopes(self, auth_type, scopes_enforced, mock_log): + """ Returns 403 when scopes are enforced with JwtHasScope. """ + with ENFORCE_JWT_SCOPES.override(active=scopes_enforced): + jwt_token = self._create_jwt_token(self.student, auth_type, scopes=[]) + resp = self._get_response(self.student, AuthType.jwt, token=jwt_token) - def test_no_certificate_for_user(self): - """ - Test for case with no certificate available - """ - self.client.login(username=self.student_no_cert.username, password=USER_PASSWORD) - resp = self.client.get(self.get_url(self.student_no_cert.username)) - self.assertEqual(resp.status_code, status.HTTP_404_NOT_FOUND) - self.assertIn('error_code', resp.data) - self.assertEqual( - resp.data['error_code'], - 'no_certificate_for_user' - ) + is_enforced = scopes_enforced and auth_type == AuthType.jwt_restricted + self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN if is_enforced else status.HTTP_200_OK) - def test_certificate_for_user(self): - """ - Tests case user that pulls her own certificate - """ - self.client.login(username=self.student.username, password=USER_PASSWORD) - resp = self.client.get(self.get_url(self.student.username)) + if is_enforced: + self._assert_in_log("JwtHasScope", mock_log.warning) + + @patch('edx_rest_framework_extensions.permissions.log') + @ddt.data(*product(JWT_AUTH_TYPES, (True, False))) + @ddt.unpack + def test_jwt_no_filter(self, auth_type, scopes_enforced, mock_log): + """ Returns 403 when scopes are enforced with JwtHasContentOrgFilterForRequestedCourse. """ + with ENFORCE_JWT_SCOPES.override(active=scopes_enforced): + jwt_token = self._create_jwt_token(self.student, auth_type, include_org_filter=False) + resp = self._get_response(self.student, AuthType.jwt, token=jwt_token) + + is_enforced = scopes_enforced and auth_type == AuthType.jwt_restricted + self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN if is_enforced else status.HTTP_200_OK) + + if is_enforced: + self._assert_in_log("JwtHasContentOrgFilterForRequestedCourse", mock_log.warning) + + @ddt.data(*product(JWT_AUTH_TYPES, (True, False))) + @ddt.unpack + def test_jwt_on_behalf_of_user(self, auth_type, scopes_enforced): + with ENFORCE_JWT_SCOPES.override(active=scopes_enforced): + jwt_token = self._create_jwt_token(self.student, auth_type, include_me_filter=True) + + resp = self._get_response(self.student, AuthType.jwt, token=jwt_token) + self.assertEqual(resp.status_code, status.HTTP_200_OK) + + @patch('edx_rest_framework_extensions.permissions.log') + @ddt.data(*product(JWT_AUTH_TYPES, (True, False))) + @ddt.unpack + def test_jwt_on_behalf_of_other_user(self, auth_type, scopes_enforced, mock_log): + """ Returns 403 when scopes are enforced with JwtHasUserFilterForRequestedUser. """ + with ENFORCE_JWT_SCOPES.override(active=scopes_enforced): + jwt_token = self._create_jwt_token(self.student_no_cert, auth_type, include_me_filter=True) + resp = self._get_response(self.student, AuthType.jwt, token=jwt_token) + + self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN) + + if scopes_enforced and auth_type == AuthType.jwt_restricted: + self._assert_in_log("JwtHasUserFilterForRequestedUser", mock_log.warning) + else: + self._assert_in_log("IsUserInUrl", mock_log.info) + + def test_valid_oauth_token(self): + resp = self._get_response(self.student, AuthType.oauth) self.assertEqual(resp.status_code, status.HTTP_200_OK) - self.assertEqual( - resp.data, - { - 'username': self.student.username, - 'status': CertificateStatuses.downloadable, - 'is_passing': True, - 'grade': '0.88', - 'download_url': 'www.google.com', - 'certificate_type': CourseMode.VERIFIED, - 'course_id': unicode(self.course.id), - 'created_date': self.now, - } - ) + + def test_invalid_oauth_token(self): + resp = self._get_response(self.student, AuthType.oauth, token="fooooooooooToken") + self.assertEqual(resp.status_code, status.HTTP_401_UNAUTHORIZED) + + def test_expired_oauth_token(self): + token = self._create_oauth_token(self.student) + token.expires = datetime.utcnow() - timedelta(weeks=1) + token.save() + resp = self._get_response(self.student, AuthType.oauth, token=token) + self.assertEqual(resp.status_code, status.HTTP_401_UNAUTHORIZED) diff --git a/lms/djangoapps/certificates/apis/v0/views.py b/lms/djangoapps/certificates/apis/v0/views.py index ad57f093d54..565f86244bc 100644 --- a/lms/djangoapps/certificates/apis/v0/views.py +++ b/lms/djangoapps/certificates/apis/v0/views.py @@ -1,15 +1,19 @@ """ API v0 views. """ import logging -from edx_rest_framework_extensions.authentication import JwtAuthentication -from opaque_keys import InvalidKeyError -from opaque_keys.edx.keys import CourseKey from rest_framework.generics import GenericAPIView -from rest_framework.permissions import IsAuthenticated from rest_framework.response import Response +from edx_rest_framework_extensions import permissions +from edx_rest_framework_extensions.authentication import JwtAuthentication from lms.djangoapps.certificates.api import get_certificate_for_user -from openedx.core.lib.api import authentication, permissions +from opaque_keys import InvalidKeyError +from opaque_keys.edx.keys import CourseKey +from openedx.core.lib.api.authentication import ( + OAuth2AuthenticationAllowInactiveUser, + SessionAuthenticationAllowInactiveUser +) + log = logging.getLogger(__name__) @@ -70,14 +74,14 @@ class CertificatesDetailView(GenericAPIView): """ authentication_classes = ( - authentication.OAuth2AuthenticationAllowInactiveUser, - authentication.SessionAuthenticationAllowInactiveUser, JwtAuthentication, + OAuth2AuthenticationAllowInactiveUser, + SessionAuthenticationAllowInactiveUser, ) - permission_classes = ( - IsAuthenticated, - permissions.IsUserInUrlOrStaff - ) + + permission_classes = (permissions.JWT_RESTRICTED_APPLICATION_OR_USER_ACCESS,) + + required_scopes = ['certificates:read'] def get(self, request, username, course_id): """ -- GitLab