Skip to content
Snippets Groups Projects
Commit 36697dab authored by muhammad-ammar's avatar muhammad-ammar
Browse files

If user belongs to `edx.org` and does not exist in `AllowedAuthEdxUser`

then user must login through `edx.org` Google account

ENT-2461
parent 1abad23d
No related branches found
No related tags found
No related merge requests found
"""
Waffle flags and switches for user authn.
"""
from __future__ import absolute_import
from openedx.core.djangoapps.waffle_utils import WaffleSwitchNamespace
WAFFLE_NAMESPACE = u'user_authn'
# If this switch is enabled then users must be sign in using their allowed domain SSO account
ENABLE_LOGIN_USING_THIRDPARTY_AUTH_ONLY = 'enable_login_using_thirdparty_auth_only'
waffle = WaffleSwitchNamespace(name=WAFFLE_NAMESPACE, log_prefix=u'UserAuthN: ')
......@@ -33,9 +33,13 @@ from openedx.core.djangoapps.user_authn.cookies import refresh_jwt_cookies, set_
from openedx.core.djangoapps.user_authn.exceptions import AuthFailedError
from openedx.core.djangoapps.util.user_messages import PageLevelMessages
from openedx.core.djangoapps.user_authn.views.password_reset import send_password_reset_email_for_user
from openedx.core.djangoapps.user_authn.config.waffle import (
ENABLE_LOGIN_USING_THIRDPARTY_AUTH_ONLY,
waffle as authn_waffle
)
from openedx.core.djangolib.markup import HTML, Text
from openedx.core.lib.api.view_utils import require_post_params
from student.models import LoginFailures
from student.models import LoginFailures, AllowedAuthUser
from student.views import send_reactivation_email_for_user
from third_party_auth import pipeline, provider
import third_party_auth
......@@ -186,6 +190,8 @@ def _authenticate_first_party(request, unauthenticated_user):
# to fail and we can take advantage of the ratelimited backend
username = unauthenticated_user.username if unauthenticated_user else ""
_check_user_auth_flow(request.site, unauthenticated_user)
try:
password = normalize_password(request.POST['password'])
return authenticate(
......@@ -272,6 +278,26 @@ def _track_user_login(user, request):
)
def _check_user_auth_flow(site, user):
"""
Check if user belongs to an allowed domain and not whitelisted
then ask user to login through allowed domain SSO provider.
"""
if user and authn_waffle.is_enabled(ENABLE_LOGIN_USING_THIRDPARTY_AUTH_ONLY):
allowed_domain = site.configuration.get_value('THIRD_PARTY_AUTH_ONLY_DOMAIN', '').lower()
user_domain = user.email.split('@')[1].strip().lower()
# If user belongs to allowed domain and not whitelisted then user must login through allowed domain SSO
if user_domain == allowed_domain and not AllowedAuthUser.objects.filter(site=site, email=user.email).exists():
msg = _(
u'As an {allowed_domain} user, You must login with your {allowed_domain} {provider} account.'
).format(
allowed_domain=allowed_domain,
provider=site.configuration.get_value('THIRD_PARTY_AUTH_ONLY_PROVIDER')
)
raise AuthFailedError(msg)
@login_required
@require_http_methods(['GET'])
def finish_auth(request): # pylint: disable=unused-argument
......
......@@ -29,16 +29,22 @@ from openedx.core.djangoapps.password_policy.compliance import (
from openedx.core.djangoapps.user_api.config.waffle import PREVENT_AUTH_USER_WRITES, waffle
from openedx.core.djangoapps.user_api.accounts import EMAIL_MIN_LENGTH, EMAIL_MAX_LENGTH
from openedx.core.djangoapps.user_authn.cookies import jwt_cookies
from openedx.core.djangoapps.user_authn.views.login import shim_student_view
from openedx.core.djangoapps.user_authn.views.login import (
shim_student_view,
AllowedAuthUser,
ENABLE_LOGIN_USING_THIRDPARTY_AUTH_ONLY,
authn_waffle
)
from openedx.core.djangoapps.user_authn.tests.utils import setup_login_oauth_client
from openedx.core.djangolib.testing.utils import CacheIsolationTestCase, skip_unless_lms
from openedx.core.djangoapps.site_configuration.tests.mixins import SiteMixin
from openedx.core.lib.api.test_utils import ApiTestCase
from student.tests.factories import RegistrationFactory, UserFactory, UserProfileFactory
from util.password_policy_validators import DEFAULT_MAX_PASSWORD_LENGTH
@ddt.ddt
class LoginTest(CacheIsolationTestCase):
class LoginTest(SiteMixin, CacheIsolationTestCase):
"""
Test login_user() view
"""
......@@ -53,9 +59,7 @@ class LoginTest(CacheIsolationTestCase):
def setUp(self):
"""Setup a test user along with its registration and profile"""
super(LoginTest, self).setUp()
self.user = UserFactory.build(username=self.username, email=self.user_email)
self.user.set_password(self.password)
self.user.save()
self.user = self._create_user(self.username, self.user_email)
RegistrationFactory(user=self.user)
UserProfileFactory(user=self.user)
......@@ -68,6 +72,12 @@ class LoginTest(CacheIsolationTestCase):
except NoReverseMatch:
self.url = reverse('login')
def _create_user(self, username, user_email):
user = UserFactory.build(username=username, email=user_email)
user.set_password(self.password)
user.save()
return user
def test_login_success(self):
response, mock_audit_log = self._login_response(
self.user_email, self.password, patched_audit_log='student.models.AUDIT_LOG'
......@@ -573,6 +583,83 @@ class LoginTest(CacheIsolationTestCase):
for log_string in log_strings:
self.assertNotIn(log_string, format_string)
@ddt.data(
{
'switch_enabled': False,
'whitelisted': False,
'allowed_domain': 'edx.org',
'user_domain': 'edx.org',
'success': True
},
{
'switch_enabled': False,
'whitelisted': True,
'allowed_domain': 'edx.org',
'user_domain': 'edx.org',
'success': True
},
{
'switch_enabled': True,
'whitelisted': False,
'allowed_domain': 'edx.org',
'user_domain': 'edx.org',
'success': False
},
{
'switch_enabled': True,
'whitelisted': False,
'allowed_domain': 'fake.org',
'user_domain': 'edx.org',
'success': True
},
{
'switch_enabled': True,
'whitelisted': True,
'allowed_domain': 'edx.org',
'user_domain': 'edx.org',
'success': True
},
{
'switch_enabled': True,
'whitelisted': False,
'allowed_domain': 'batman.gotham',
'user_domain': 'batman.gotham',
'success': False
},
)
@ddt.unpack
def test_login_for_user_auth_flow(self, switch_enabled, whitelisted, allowed_domain, user_domain, success):
"""
Verify that `login._check_user_auth_flow` works as expected.
"""
username = 'batman'
user_email = '{username}@{domain}'.format(username=username, domain=user_domain)
user = self._create_user(username, user_email)
provider = 'Google'
site = self.set_up_site(allowed_domain, {
'SITE_NAME': allowed_domain,
'THIRD_PARTY_AUTH_ONLY_DOMAIN': allowed_domain,
'THIRD_PARTY_AUTH_ONLY_PROVIDER': provider
})
if whitelisted:
AllowedAuthUser.objects.create(site=site, email=user.email)
else:
AllowedAuthUser.objects.filter(site=site, email=user.email).delete()
with authn_waffle.override(ENABLE_LOGIN_USING_THIRDPARTY_AUTH_ONLY, switch_enabled):
value = None if success else u'As an {0} user, You must login with your {0} {1} account.'.format(
allowed_domain,
provider
)
response, __ = self._login_response(user.email, self.password)
self._assert_response(
response,
success=success,
value=value,
)
@ddt.ddt
@skip_unless_lms
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment