From 13681eb49905468aec6d66c53f4e7e551aace4d7 Mon Sep 17 00:00:00 2001 From: Kyle McCormick <kmccormick@edx.org> Date: Mon, 22 Jul 2019 13:15:23 -0400 Subject: [PATCH] Add --update option to create_dot_access (#21172) --- .../commands/create_dot_application.py | 126 +++++++++++------- .../tests/test_create_dot_application.py | 56 ++++++++ 2 files changed, 136 insertions(+), 46 deletions(-) diff --git a/openedx/core/djangoapps/oauth_dispatch/management/commands/create_dot_application.py b/openedx/core/djangoapps/oauth_dispatch/management/commands/create_dot_application.py index cea2ebb59b3..35f10abc88a 100644 --- a/openedx/core/djangoapps/oauth_dispatch/management/commands/create_dot_application.py +++ b/openedx/core/djangoapps/oauth_dispatch/management/commands/create_dot_application.py @@ -68,72 +68,106 @@ class Command(BaseCommand): dest='scopes', default='', help='Comma-separated list of scopes that this application will be allowed to request.') + parser.add_argument('--update', + action='store_true', + dest='update', + help='If application and/or access already exist, update values.') + + def _create_application(self, user, app_name, application_kwargs): + """ + Create new application with given User, name, and option values. + """ + application = Application.objects.create( + user=user, name=app_name, **application_kwargs + ) + logger.info('Created {} application with id: {}, client_id: {}, and client_secret: {}'.format( + app_name, + application.id, + application.client_id, + application.client_secret, + )) + return application + + def _update_application(self, application, application_kwargs): + """ + Update given application with option values. + """ + for key, value in application_kwargs.items(): + setattr(application, key, value) + application.save() + logger.info('Updated {} application with id: {}, client_id: {}, and client_secret: {}'.format( + application.name, + application.id, + application.client_id, + application.client_secret, + )) - def _create_application_access(self, application, scopes): + def _create_or_update_access(self, application, scopes, update): """ - If scopes are supplied, creates an oauth_dispatch ApplicationAccess for the provided - scopes and DOT application. + Create application access with specified scopes. + + If application access already exists, then: + * Update with specified scopes if update=True, + * Otherwise do nothing. """ - if not scopes: - return + access = ApplicationAccess.objects.filter(application_id=application.id).first() - if ApplicationAccess.objects.filter(application_id=application.id).exists(): + if access and update: + access.scopes = scopes + access.save() + logger.info('Updated application access for {} with scopes: {}'.format( + application.name, + scopes, + )) + elif access: logger.info('Application access for application {} already exists.'.format( application.name, )) - return - - application_access = ApplicationAccess.objects.create( - application_id=application.id, - scopes=scopes, - ) - application_access.save() - logger.info('Created application access for {} with scopes: {}'.format( - application.name, - application_access.scopes, - )) + else: + application_access = ApplicationAccess.objects.create( + application_id=application.id, + scopes=scopes, + ) + logger.info('Created application access for {} with scopes: {}'.format( + application.name, + application_access.scopes, + )) def handle(self, *args, **options): - app_name = options['name'] username = options['username'] - grant_type = options['grant_type'] + user = User.objects.get(username=username) + app_name = options['name'] + update = options['update'] + redirect_uris = options['redirect_uris'] - skip_authorization = options['skip_authorization'] client_type = Application.CLIENT_PUBLIC if options['public'] else Application.CLIENT_CONFIDENTIAL + grant_type = options['grant_type'] + skip_authorization = options['skip_authorization'] client_id = options['client_id'] client_secret = options['client_secret'] - scopes = options['scopes'] - - user = User.objects.get(username=username) - if Application.objects.filter(user=user, name=app_name).exists(): - logger.info('Application with name {} and user {} already exists.'.format( - app_name, - username - )) - application = Application.objects.get(user=user, name=app_name) - self._create_application_access(application, scopes) - return - - create_kwargs = dict( - name=app_name, - user=user, + application_kwargs = dict( redirect_uris=redirect_uris, client_type=client_type, authorization_grant_type=grant_type, skip_authorization=skip_authorization ) if client_id: - create_kwargs['client_id'] = client_id + application_kwargs['client_id'] = client_id if client_secret: - create_kwargs['client_secret'] = client_secret + application_kwargs['client_secret'] = client_secret - application = Application.objects.create(**create_kwargs) - application.save() - logger.info('Created {} application with id: {}, client_id: {}, and client_secret: {}'.format( - app_name, - application.id, - application.client_id, - application.client_secret - )) - self._create_application_access(application, scopes) + application = Application.objects.filter(user=user, name=app_name).first() + if application and update: + self._update_application(application, application_kwargs) + elif application: + logger.info('Application with name {} and user {} already exists.'.format( + app_name, + username + )) + else: + application = self._create_application(user, app_name, application_kwargs) + + scopes = options['scopes'] + if scopes: + self._create_or_update_access(application, scopes, update) diff --git a/openedx/core/djangoapps/oauth_dispatch/management/commands/tests/test_create_dot_application.py b/openedx/core/djangoapps/oauth_dispatch/management/commands/tests/test_create_dot_application.py index 5e886d3c08b..296319dd4a2 100644 --- a/openedx/core/djangoapps/oauth_dispatch/management/commands/tests/test_create_dot_application.py +++ b/openedx/core/djangoapps/oauth_dispatch/management/commands/tests/test_create_dot_application.py @@ -29,6 +29,62 @@ class TestCreateDotApplication(TestCase): super(TestCreateDotApplication, self).tearDown() Application.objects.filter(user=self.user).delete() + def test_update_dot_application(self): + APP_NAME = "update_test_application" + URI_OLD = "https://example.com/old" + URI_NEW = "https://example.com/new" + SCOPES_X = ["email", "profile", "user_id"] + SCOPES_Y = ["email", "profile"] + base_call_args = [ + APP_NAME, + self.user.username, + "--update", + "--grant-type", + Application.GRANT_CLIENT_CREDENTIALS, + "--public", + "--redirect-uris", + ] + + # Make sure we can create Application with --update + call_args = base_call_args + [URI_OLD] + call_command(Command(), *call_args) + app = Application.objects.get(name=APP_NAME) + self.assertEqual(app.redirect_uris, URI_OLD) + with self.assertRaises(ApplicationAccess.DoesNotExist): + ApplicationAccess.objects.get(application_id=app.id) + + # Make sure we can call again with no changes + call_args = base_call_args + [URI_OLD] + call_command(Command(), *call_args) + app = Application.objects.get(name=APP_NAME) + self.assertEqual(app.redirect_uris, URI_OLD) + with self.assertRaises(ApplicationAccess.DoesNotExist): + ApplicationAccess.objects.get(application_id=app.id) + + # Make sure calling with new URI changes URI, but does not add access + call_args = base_call_args + [URI_NEW] + call_command(Command(), *call_args) + app = Application.objects.get(name=APP_NAME) + self.assertEqual(app.redirect_uris, URI_NEW) + with self.assertRaises(ApplicationAccess.DoesNotExist): + ApplicationAccess.objects.get(application_id=app.id) + + # Make sure calling with scopes adds access + call_args = base_call_args + [URI_NEW, "--scopes", ",".join(SCOPES_X)] + call_command(Command(), *call_args) + app = Application.objects.get(name=APP_NAME) + self.assertEqual(app.redirect_uris, URI_NEW) + access = ApplicationAccess.objects.get(application_id=app.id) + self.assertEqual(access.scopes, SCOPES_X) + + # Make sure calling with new scopes changes them + call_args = base_call_args + [URI_NEW, "--scopes", ",".join(SCOPES_Y)] + call_command(Command(), *call_args) + app = Application.objects.get(name=APP_NAME) + self.assertEqual(app.redirect_uris, URI_NEW) + access = ApplicationAccess.objects.get(application_id=app.id) + self.assertEqual(access.scopes, SCOPES_Y) + @ddt.data( (None, None, None, None, False, None), (None, None, 'client-abc', None, False, None), -- GitLab