# Skip to content
# Snippets Groups Projects
# base.py 35.79 KiB
"""Base integration test for provider implementations."""

import re
import unittest

import json
import mock

from django import test
from django.contrib import auth
from django.contrib.auth import models as auth_models
from django.contrib.messages.storage import fallback
from django.contrib.sessions.backends import cache
from django.test import utils as django_utils
from django.conf import settings as django_settings
from social import actions, exceptions
from social.apps.django_app import utils as social_utils
from social.apps.django_app import views as social_views
from student import models as student_models
from student import views as student_views

from third_party_auth import middleware, pipeline
from third_party_auth import settings as auth_settings
from third_party_auth.tests import testutil


@unittest.skipUnless(
    testutil.AUTH_FEATURES_KEY in django_settings.FEATURES, testutil.AUTH_FEATURES_KEY + ' not in settings.FEATURES')
@django_utils.override_settings()  # For settings reversion on a method-by-method basis.
class IntegrationTest(testutil.TestCase, test.TestCase):
    """Abstract base class for provider integration tests."""

    # Configuration. You will need to override these values in your test cases.

    # Class. The third_party_auth.provider.BaseProvider child we are testing.
    PROVIDER_CLASS = None

    # Dict of string -> object. Settings that will be merged onto Django's
    # settings object before test execution. In most cases, this is
    # PROVIDER_CLASS.SETTINGS with test values.
    PROVIDER_SETTINGS = {}

    # Methods you must override in your children.

    def get_response_data(self):
        """Returns a dict of response data shaped like the provider's payload.

        Child classes must override this; the base implementation only raises.
        The simplest way to learn what a provider sends back is to break into a
        debugger inside that provider's do_auth implementation. A provider may
        fold several kinds of data into one payload (for instance, details
        about the user alongside details about the user's credentials).

        Raises:
            NotImplementedError: always, until overridden in a child class.
        """
        raise NotImplementedError()

    def get_username(self):
        """Returns the username derived from the provider's response data.

        Child classes must override this; the base implementation only raises.
        Username generation differs per provider and python-social-auth does
        not expose it as a standalone method, so tests supply their own getter.

        The value returned is only the *initial* username the framework will
        try. On a collision the pipeline generates a different one; exposing
        the initial value here lets tests force such collisions regardless of
        the provider under test.

        Raises:
            NotImplementedError: always, until overridden in a child class.
        """
        raise NotImplementedError()

    # Asserts you can optionally override and make more specific.

    def assert_redirect_to_provider_looks_correct(self, response):
        """Checks that the response redirects somewhere (nominally the provider).

        Hitting /auth/login/<provider> should bounce the user to the provider's
        site. This base check only verifies that a redirect with a Location
        header was issued, since nothing provider-specific is known here. Child
        tests may override this to make stronger claims, for example about the
        format of the Location header.
        """
        redirect_status = 302
        self.assertEqual(redirect_status, response.status_code)
        location_present = response.has_header('Location')
        self.assertTrue(location_present)

    def assert_register_response_in_pipeline_looks_correct(self, response, pipeline_kwargs):
        """Spot-checks the rendered registration page while in the pipeline.

        After a user signs in with a third party, the new-account form is
        prepopulated with values from the provider. Which values are present
        differs per provider; they come from
        PROVIDER_CLASS.get_register_form_data(pipeline_kwargs). This stock
        check verifies the status code, the "signed in with" banner for the
        provider, and the presence of every truthy prepopulated value.
        Override this method for more specific assertions.
        """
        self.assertEqual(200, response.status_code)

        # The banner must name the provider that was used to sign in.
        signed_in_banner = 'successfully signed in with <strong>%s</strong>' % self.PROVIDER_CLASS.NAME
        self.assertIn(signed_in_banner, response.content)

        # Every truthy prepopulated form value must show up in the page; falsy
        # values (such as empty strings) are skipped.
        form_data = self.PROVIDER_CLASS.get_register_form_data(pipeline_kwargs)
        for expected_value in filter(None, form_data.values()):
            self.assertIn(expected_value, response.content)

    # Implementation details and actual tests past this point -- no more
    # configuration needed.

    def setUp(self):
        """Applies provider settings and builds the per-test client objects."""
        super(IntegrationTest, self).setUp()
        self.configure_runtime()
        backend_class = self.PROVIDER_CLASS.BACKEND_CLASS
        self.backend_name = backend_class.name
        # The factory builds bare requests for direct view/pipeline calls; the
        # client drives full request/response cycles.
        self.request_factory = test.RequestFactory()
        self.client = test.Client()

    def assert_dashboard_response_looks_correct(self, response, user, duplicate=False, linked=None):
        """Asserts the user's dashboard is in the expected state.

        We check unconditionally that the dashboard 200s and contains the
        user's info. If duplicate is True, we expect the duplicate account
        association error to be present; otherwise we expect it to be absent.
        If linked is passed (i.e. is not None), we additionally check the
        content and controls in the Account Links section of the sidebar.

        Args:
            response: the dashboard response; .status_code and .content are
                read.
            user: the user whose .email and .username must appear in the page.
            duplicate: bool. Whether the duplicate-association banner should
                be present.
            linked: None or bool. None skips the link-state checks; otherwise
                the expected link state of the account.
        """
        duplicate_account_error_needle = '<section class="dashboard-banner third-party-auth">'
        assert_duplicate_presence_fn = self.assertIn if duplicate else self.assertNotIn

        self.assertEqual(200, response.status_code)
        self.assertIn(user.email, response.content)
        self.assertIn(user.username, response.content)
        assert_duplicate_presence_fn(duplicate_account_error_needle, response.content)

        if linked is None:
            return

        if linked:
            expected_control_text = pipeline.ProviderUserState(
                self.PROVIDER_CLASS, user, False).get_unlink_form_name()
        else:
            expected_control_text = pipeline.get_login_url(self.PROVIDER_CLASS.NAME, pipeline.AUTH_ENTRY_DASHBOARD)

        icon_match = re.search(r'third-party-auth.+icon icon-(\w+)', response.content, re.DOTALL)
        provider_match = re.search(r'<span class="provider">([^<]+)', response.content, re.DOTALL)
        # Report missing markup as an assertion failure rather than letting a
        # None match blow up with an AttributeError.
        self.assertIsNotNone(icon_match, 'Account link icon not found in dashboard response')
        self.assertIsNotNone(provider_match, 'Provider name not found in dashboard response')

        self.assertIn(expected_control_text, response.content)
        self.assertEqual('link' if linked else 'unlink', icon_match.group(1))
        self.assertEqual(self.PROVIDER_CLASS.NAME, provider_match.group(1))

    def assert_exception_redirect_looks_correct(self, auth_entry=None):
        """Checks where ExceptionMiddleware redirects after a canceled auth.

        Feeds an AuthCanceled exception to middleware.ExceptionMiddleware and
        verifies the resulting 302: the Location must mention 'canceled' and
        the backend name, and must start with '/<auth_entry>' when auth_entry
        is given, or with '/?' (the root) when it is not.
        """
        handler = middleware.ExceptionMiddleware()
        request, _ = self.get_request_and_strategy(auth_entry=auth_entry)
        canceled = exceptions.AuthCanceled(request.social_strategy.backend)
        response = handler.process_exception(request, canceled)
        location = response.get('Location')

        self.assertEqual(302, response.status_code)
        for expected_fragment in ('canceled', self.backend_name):
            self.assertIn(expected_fragment, location)

        # With an auth entry we expect the custom redirect back to that form;
        # without one, the stock framework redirect to the root.
        expected_prefix = '/' + auth_entry if auth_entry else '/?'
        self.assertTrue(location.startswith(expected_prefix))

    def assert_first_party_auth_trumps_third_party_auth(self, email=None, password=None, success=None):
        """Asserts first party auth was used in place of third party auth.

        Builds a request that is inside a running auth pipeline, optionally
        adds first-party credentials to its POST data, and checks the JSON
        payload returned by student_views.login_user.

        Args:
            email: string. The user's email. If truthy, will be set on POST.
            password: string. The user's password. If truthy, will be set on
                POST. When success is False, the value is prefixed with 'bad_'
                so that it differs from the password the account was created
                with.
            success: None or bool. Whether we expect auth to be successful. Set
                to None to indicate we expect the request to be invalid (meaning
                one of email or password will be missing).
        """
        _, strategy = self.get_request_and_strategy(
            auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri='social:complete')
        strategy.backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy))
        # The account is created without a social auth, so only first-party
        # credentials can identify it.
        self.create_user_models_for_existing_account(
            strategy, email, password, self.get_username(), skip_social_auth=True)

        # Replace POST with a plain dict copy so keys can be assigned below
        # (presumably the original POST object is immutable -- confirm).
        strategy.request.POST = dict(strategy.request.POST)

        if email:
            strategy.request.POST['email'] = email
        if password:
            strategy.request.POST['password'] = 'bad_' + password if success is False else password

        self.assert_pipeline_running(strategy.request)
        payload = json.loads(student_views.login_user(strategy.request).content)

        if success is None:
            # Request malformed -- just one of email/password given.
            self.assertFalse(payload.get('success'))
            self.assertIn('There was an error receiving your login information', payload.get('value'))
        elif success:
            # Request well-formed and credentials good.
            self.assertTrue(payload.get('success'))
        else:
            # Request well-formed but credentials bad.
            self.assertFalse(payload.get('success'))
            self.assertIn('incorrect', payload.get('value'))

    def assert_javascript_would_submit_login_form(self, boolean, response):
        """Asserts we pass form submit JS the right boolean string.

        Finds the lowercase argument of the last parenthesized call following
        'function post_form_if_pipeline_running' in the page, requires it to be
        the literal 'true' or 'false', and compares it against the expectation.

        Args:
            boolean: bool. True if the JS is expected to receive 'true'.
            response: the rendered page; only .content is read.
        """
        match = re.search(
            r'function\ post_form_if_pipeline_running.*\(([a-z]+)\)', response.content, re.DOTALL)
        # Report a missing script as an assertion failure rather than letting
        # a None match blow up with an AttributeError.
        self.assertIsNotNone(match, 'post_form_if_pipeline_running invocation not found in response')
        argument_string = match.group(1)
        self.assertIn(argument_string, ['true', 'false'])
        self.assertEqual(boolean, argument_string == 'true')

    def assert_json_failure_response_is_missing_social_auth(self, response):
        """Checks the /login JSON failure for an account with no social auth.

        The response must be a 200 whose JSON payload has a falsy 'success'
        and a 'value' message mentioning the provider by name.
        """
        # The view reports this failure in the payload, not the status code.
        self.assertEqual(200, response.status_code)
        decoded = json.loads(response.content)
        self.assertFalse(decoded.get('success'))
        expected_fragment = 'associated with your %s account' % self.PROVIDER_CLASS.NAME
        self.assertIn(expected_fragment, decoded.get('value'))

    def assert_json_failure_response_is_username_collision(self, response):
        """Checks that the JSON response reports a username collision.

        The response must be a 400 whose JSON payload has a falsy 'success'
        and a 'value' message containing 'already exists'.
        """
        self.assertEqual(400, response.status_code)
        decoded = json.loads(response.content)
        succeeded = decoded.get('success')
        message = decoded.get('value')
        self.assertFalse(succeeded)
        self.assertIn('already exists', message)

    def assert_json_success_response_looks_correct(self, response):
        """Checks the JSON response signals success and the right redirect.

        The response must be a 200 whose JSON payload has a truthy 'success'
        and a 'redirect_url' equal to the provider backend's complete URL.
        """
        self.assertEqual(200, response.status_code)
        decoded = json.loads(response.content)
        self.assertTrue(decoded.get('success'))
        expected_redirect = pipeline.get_complete_url(self.PROVIDER_CLASS.BACKEND_CLASS.name)
        self.assertEqual(expected_redirect, decoded.get('redirect_url'))

    def assert_login_response_before_pipeline_looks_correct(self, response):
        """Checks a GET of /login made outside the pipeline.

        The page must 200, offer a 'Sign in with <provider>' control, tell the
        form-submit JS not to submit, and carry a functional signin button for
        the login auth entry.
        """
        page = response.content
        self.assertEqual(200, response.status_code)
        signin_label = 'Sign in with ' + self.PROVIDER_CLASS.NAME
        self.assertIn(signin_label, page)
        self.assert_javascript_would_submit_login_form(False, response)
        self.assert_signin_button_looks_functional(page, pipeline.AUTH_ENTRY_LOGIN)

    def assert_login_response_in_pipeline_looks_correct(self, response):
        """Checks a GET of /login made while the pipeline is running.

        The page must 200 and must tell the form-submit JS to submit the form.
        """
        expected_status = 200
        self.assertEqual(expected_status, response.status_code)
        should_submit = True
        self.assert_javascript_would_submit_login_form(should_submit, response)

    def assert_password_overridden_by_pipeline(self, username, password):
        """Checks that the given credentials do not authenticate.

        The pipeline overrides POST['password'], if any, with random data, so
        authenticating with the originally supplied password must yield None.
        """
        authenticated_user = auth.authenticate(password=password, username=username)
        self.assertIsNone(authenticated_user)

    def assert_pipeline_running(self, request):
        """Checks that pipeline.running reports a pipeline for the request."""
        is_running = pipeline.running(request)
        self.assertTrue(is_running)

    def assert_redirect_to_dashboard_looks_correct(self, response):
        """Checks the response is a 302 to the configured login redirect URL."""
        self.assertEqual(302, response.status_code)
        # pylint: disable-msg=protected-access
        expected_location = auth_settings._SOCIAL_AUTH_LOGIN_REDIRECT_URL
        self.assertEqual(expected_location, response.get('Location'))

    def assert_redirect_to_login_looks_correct(self, response):
        """Checks the response is a 302 to '/' + the login auth entry."""
        self.assertEqual(302, response.status_code)
        login_path = '/' + pipeline.AUTH_ENTRY_LOGIN
        self.assertEqual(login_path, response.get('Location'))

    def assert_redirect_to_register_looks_correct(self, response):
        """Checks the response is a 302 to '/' + the register auth entry."""
        self.assertEqual(302, response.status_code)
        register_path = '/' + pipeline.AUTH_ENTRY_REGISTER
        self.assertEqual(register_path, response.get('Location'))

    def assert_register_response_before_pipeline_looks_correct(self, response):
        """Checks a GET of /register made outside the pipeline.

        The page must 200, offer a 'Sign in with <provider>' control, and carry
        a functional signin button for the register auth entry.
        """
        page = response.content
        self.assertEqual(200, response.status_code)
        signin_label = 'Sign in with ' + self.PROVIDER_CLASS.NAME
        self.assertIn(signin_label, page)
        self.assert_signin_button_looks_functional(page, pipeline.AUTH_ENTRY_REGISTER)

    def assert_signin_button_looks_functional(self, content, auth_entry):
        """Asserts JS is available to signin buttons and has the right args.

        Args:
            content: string. The rendered page to inspect.
            auth_entry: string. The auth entry the signin URL should carry; it
                is passed to pipeline.get_login_url to build the expected URL.
        """
        self.assertTrue(re.search(r'function thirdPartySignin', content))
        signin_call = re.search(r"thirdPartySignin\(event, '([^']+)", content)
        # Report a missing call as an assertion failure rather than letting a
        # None match blow up with an AttributeError.
        self.assertIsNotNone(signin_call, 'thirdPartySignin(event, ...) call not found in content')
        self.assertEqual(
            pipeline.get_login_url(self.PROVIDER_CLASS.NAME, auth_entry),
            signin_call.group(1))

    def assert_social_auth_does_not_exist_for_user(self, user, strategy):
        """Checks that storage holds no social auth for user and our provider."""
        provider_name = self.PROVIDER_CLASS.BACKEND_CLASS.name
        user_storage = strategy.storage.user
        matches = user_storage.get_social_auth_for_user(user, provider=provider_name)
        self.assertEqual(0, len(matches))

    def assert_social_auth_exists_for_user(self, user, strategy):
        """Checks that storage holds exactly one social auth for our provider.

        The single entry's provider must also equal self.backend_name.
        """
        provider_name = self.PROVIDER_CLASS.BACKEND_CLASS.name
        user_storage = strategy.storage.user
        matches = user_storage.get_social_auth_for_user(user, provider=provider_name)
        self.assertEqual(1, len(matches))
        only_match = matches[0]
        self.assertEqual(self.backend_name, only_match.provider)

    def configure_runtime(self):
        """Applies this test's provider settings onto Django's settings."""
        provider_settings = {self.PROVIDER_CLASS.NAME: self.PROVIDER_SETTINGS}
        auth_settings.apply_settings(provider_settings, django_settings)
        # social.apps.django_app.utils caches members derived from settings;
        # reloading the module forces the new values to propagate.
        reload(social_utils)

    def create_user_models_for_existing_account(self, strategy, email, password, username, skip_social_auth=False):
        """Builds the user, profile, registration and (optionally) social auth.

        This stands in for what happens during /register; see
        student.views.register and student.views._do_create_account.

        Args:
            strategy: the strategy whose backend derives the provider uid.
            email: string. Email for the new user.
            password: string. Password for the new user.
            username: string. Username for the new user.
            skip_social_auth: bool. When True, no social auth entry is created.

        Returns:
            The newly created user.
        """
        provider_response = self.get_response_data()
        provider_uid = strategy.backend.get_user_id(provider_response, provider_response)

        user = social_utils.Storage.user.create_user(email=email, password=password, username=username)
        student_models.UserProfile(user=user).save()

        registration = student_models.Registration()
        registration.register(user)
        registration.save()

        if skip_social_auth:
            return user

        social_utils.Storage.user.create_social_auth(user, provider_uid, self.PROVIDER_CLASS.BACKEND_CLASS.name)
        return user

    def fake_auth_complete(self, strategy):
        """Stand-in for social.backends.BaseAuth.auth_complete.

        Contrary to the docs, the result need not be a user instance. In some
        flows (for example when the user is sent to the /register form) it is
        instead a response that 302s to /register.

        Returns:
            Whatever strategy.authenticate returns for an anonymous
            (user=None) call carrying this test's provider response data.
        """
        return strategy.authenticate(
            request=strategy.request,
            backend=strategy.backend,
            user=None,
            response=self.get_response_data(),
        )

    def get_registration_post_vars(self, overrides=None):
        """Returns the POST vars the registration form would submit.

        Args:
            overrides: optional dict. Entries replace or extend the stock
                values; a falsy value leaves the stock values untouched.

        Returns:
            A fresh dict of form field name -> value.
        """
        post_vars = dict(
            username='username',
            name='First Last',
            gender='',
            year_of_birth='',
            level_of_education='',
            goals='',
            honor_code='true',
            terms_of_service='true',
            password='password',
            mailing_address='',
            email='user@email.com',
        )
        post_vars.update(overrides or {})
        return post_vars

    def get_request_and_strategy(self, auth_entry=None, redirect_uri=None):
        """Builds a request/strategy pair wired up to each other.

        The two objects reference one another, so they are constructed
        together. Those references come from a mix of ordinary __init__ logic
        and monkey-patching by python-social-auth; see, for example,
        social.apps.django_apps.utils.strategy().

        Args:
            auth_entry: optional string. When truthy, stored in the session
                under pipeline.AUTH_ENTRY_KEY.
            redirect_uri: optional string. Passed through to load_strategy.

        Returns:
            A (request, strategy) tuple.
        """
        complete_url = pipeline.get_complete_url(self.backend_name)
        query_string = '?redirect_state=redirect_state_value&code=code_value&state=state_value'
        request = self.request_factory.get(complete_url + query_string)

        # The session's state value matches the 'state' query parameter above.
        session = cache.SessionStore()
        session[self.backend_name + '_state'] = 'state_value'
        if auth_entry:
            session[pipeline.AUTH_ENTRY_KEY] = auth_entry

        request.user = auth_models.AnonymousUser()
        request.session = session

        strategy = social_utils.load_strategy(backend=self.backend_name, redirect_uri=redirect_uri, request=request)
        request.social_strategy = strategy
        return request, strategy

    def get_user_by_email(self, strategy, email):
        """Looks up a user by email via the strategy's storage user model."""
        user_model = strategy.storage.user.user_model()
        return user_model.objects.get(email=email)

    # Actual tests, executed once per child.

    def test_canceling_authentication_redirects_to_login_when_auth_entry_login(self):
        # Canceling with the login auth entry set must redirect to a location
        # under that entry.
        login_entry = pipeline.AUTH_ENTRY_LOGIN
        self.assert_exception_redirect_looks_correct(auth_entry=login_entry)

    def test_canceling_authentication_redirects_to_register_when_auth_entry_register(self):
        # Canceling with the register auth entry set must redirect to a
        # location under that entry.
        register_entry = pipeline.AUTH_ENTRY_REGISTER
        self.assert_exception_redirect_looks_correct(auth_entry=register_entry)

    def test_canceling_authentication_redirects_to_root_when_auth_entry_not_set(self):
        # With no auth entry the helper expects the redirect to start at '/?'.
        self.assert_exception_redirect_looks_correct(auth_entry=None)

    def test_full_pipeline_succeeds_for_linking_account(self):
        # Scenario: an existing account without a social auth is linked to the
        # provider; the dashboard must show the unlinked state before and the
        # linked state after.
        #
        # First, create the request and strategy that store pipeline state,
        # configure the backend, and mock out wire traffic.
        request, strategy = self.get_request_and_strategy(
            auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri='social:complete')
        strategy.backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy))
        # The account is deliberately created without a social auth entry.
        request.user = self.create_user_models_for_existing_account(
            strategy, 'user@example.com', 'password', self.get_username(), skip_social_auth=True)

        # Instrument the pipeline to get to the dashboard with the full
        # expected state.
        self.client.get(
            pipeline.get_login_url(self.PROVIDER_CLASS.NAME, pipeline.AUTH_ENTRY_LOGIN))
        actions.do_complete(strategy, social_views._do_login)  # pylint: disable-msg=protected-access
        student_views.signin_user(strategy.request)
        student_views.login_user(strategy.request)
        actions.do_complete(strategy, social_views._do_login)  # pylint: disable-msg=protected-access

        # First we expect that we're in the unlinked state, and that there
        # really is no association in the backend.
        self.assert_dashboard_response_looks_correct(student_views.dashboard(request), request.user, linked=False)
        self.assert_social_auth_does_not_exist_for_user(request.user, strategy)

        # Fire off the auth pipeline to link. Unlike the calls above, this one
        # passes the existing user to do_complete.
        self.assert_redirect_to_dashboard_looks_correct(actions.do_complete(
            request.social_strategy, social_views._do_login, request.user, None,  # pylint: disable-msg=protected-access
            redirect_field_name=auth.REDIRECT_FIELD_NAME))

        # Now we expect to be in the linked state, with a backend entry.
        self.assert_social_auth_exists_for_user(request.user, strategy)
        self.assert_dashboard_response_looks_correct(student_views.dashboard(request), request.user, linked=True)

    def test_full_pipeline_succeeds_for_unlinking_account(self):
        # Scenario: an existing account that already has a social auth is
        # unlinked from the provider; the dashboard must show the linked state
        # before and the unlinked state after.
        #
        # First, create the request and strategy that store pipeline state,
        # configure the backend, and mock out wire traffic.
        request, strategy = self.get_request_and_strategy(
            auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri='social:complete')
        strategy.backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy))
        # skip_social_auth is left at its default, so a social auth is created.
        user = self.create_user_models_for_existing_account(
            strategy, 'user@example.com', 'password', self.get_username())
        self.assert_social_auth_exists_for_user(user, strategy)

        # Instrument the pipeline to get to the dashboard with the full
        # expected state.
        self.client.get(
            pipeline.get_login_url(self.PROVIDER_CLASS.NAME, pipeline.AUTH_ENTRY_LOGIN))
        actions.do_complete(strategy, social_views._do_login)  # pylint: disable-msg=protected-access
        student_views.signin_user(strategy.request)
        student_views.login_user(strategy.request)
        actions.do_complete(strategy, social_views._do_login, user=user)  # pylint: disable-msg=protected-access

        # First we expect that we're in the linked state, with a backend entry.
        # NOTE(review): request.user is never assigned in this test; the
        # checks below rely on it referring to the same account as `user`
        # by this point -- confirm.
        self.assert_dashboard_response_looks_correct(student_views.dashboard(request), user, linked=True)
        self.assert_social_auth_exists_for_user(request.user, strategy)

        # Fire off the disconnect pipeline to unlink.
        self.assert_redirect_to_dashboard_looks_correct(actions.do_disconnect(
            request.social_strategy, request.user, None, redirect_field_name=auth.REDIRECT_FIELD_NAME))

        # Now we expect to be in the unlinked state, with no backend entry.
        self.assert_dashboard_response_looks_correct(student_views.dashboard(request), user, linked=False)
        self.assert_social_auth_does_not_exist_for_user(user, strategy)

    def test_linking_already_associated_account_raises_auth_already_associated(self):
        # This is of a piece with
        # test_already_associated_exception_populates_dashboard_with_error. It
        # verifies the exception gets raised when we expect; the latter test
        # covers exception handling.
        email = 'user@example.com'
        password = 'password'
        username = self.get_username()
        _, strategy = self.get_request_and_strategy(
            auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri='social:complete')
        strategy.backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy))
        # Two accounts: one already carrying the social auth, and a second
        # with distinct email/username and no social auth.
        linked_user = self.create_user_models_for_existing_account(strategy, email, password, username)
        unlinked_user = social_utils.Storage.user.create_user(
            email='other_' + email, password=password, username='other_' + username)

        self.assert_social_auth_exists_for_user(linked_user, strategy)
        self.assert_social_auth_does_not_exist_for_user(unlinked_user, strategy)

        # Completing the pipeline on behalf of the unlinked user must fail,
        # since the social auth is already associated with linked_user.
        with self.assertRaises(exceptions.AuthAlreadyAssociated):
            actions.do_complete(strategy, social_views._do_login, user=unlinked_user)  # pylint: disable-msg=protected-access

    def test_already_associated_exception_populates_dashboard_with_error(self):
        # Instrument the pipeline with an exception. We test that the
        # exception is raised correctly separately, so it's ok that we're
        # raising it artificially here. This makes the linked=True artificial
        # in the final assert because in practice the account would be
        # unlinked, but getting that behavior is cumbersome here and already
        # covered in other tests. Using linked=True does, however, let us test
        # that the duplicate error has no effect on the state of the controls.
        request, strategy = self.get_request_and_strategy(
            auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri='social:complete')
        strategy.backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy))
        user = self.create_user_models_for_existing_account(
            strategy, 'user@example.com', 'password', self.get_username())
        self.assert_social_auth_exists_for_user(user, strategy)

        # Walk through the sign-in flow so the dashboard can be rendered for
        # the user.
        self.client.get('/login')
        self.client.get(pipeline.get_login_url(self.PROVIDER_CLASS.NAME, pipeline.AUTH_ENTRY_LOGIN))
        actions.do_complete(strategy, social_views._do_login)  # pylint: disable-msg=protected-access
        student_views.signin_user(strategy.request)
        student_views.login_user(strategy.request)
        actions.do_complete(strategy, social_views._do_login, user=user)  # pylint: disable-msg=protected-access

        # Monkey-patch storage for messaging; pylint: disable-msg=protected-access
        request._messages = fallback.FallbackStorage(request)
        # Hand the middleware an artificially constructed exception; the final
        # assertion expects the dashboard to then show the duplicate banner.
        middleware.ExceptionMiddleware().process_exception(
            request, exceptions.AuthAlreadyAssociated(self.PROVIDER_CLASS.BACKEND_CLASS.name, 'duplicate'))

        self.assert_dashboard_response_looks_correct(
            student_views.dashboard(request), user, duplicate=True, linked=True)

    def test_full_pipeline_succeeds_for_signing_in_to_existing_account(self):
        # Scenario: a user whose account already has a social auth signs in via
        # the provider and lands on the dashboard.
        #
        # First, create the request and strategy that store pipeline state,
        # configure the backend, and mock out wire traffic.
        request, strategy = self.get_request_and_strategy(
            auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri='social:complete')
        strategy.backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy))
        user = self.create_user_models_for_existing_account(
            strategy, 'user@example.com', 'password', self.get_username())
        self.assert_social_auth_exists_for_user(user, strategy)

        # Begin! Ensure that the login form contains expected controls before
        # the user starts the pipeline.
        self.assert_login_response_before_pipeline_looks_correct(self.client.get('/login'))

        # The pipeline starts by a user GETting /auth/login/<provider>.
        # Synthesize that request and check that it redirects to the correct
        # provider page.
        self.assert_redirect_to_provider_looks_correct(self.client.get(
            pipeline.get_login_url(self.PROVIDER_CLASS.NAME, pipeline.AUTH_ENTRY_LOGIN)))

        # Next, the provider makes a request against /auth/complete/<provider>
        # to resume the pipeline.
        # pylint: disable-msg=protected-access
        self.assert_redirect_to_login_looks_correct(actions.do_complete(strategy, social_views._do_login))

        # At this point we know the pipeline has resumed correctly. Next we
        # fire off the view that displays the login form and posts it via JS.
        self.assert_login_response_in_pipeline_looks_correct(student_views.signin_user(strategy.request))

        # Next, we invoke the view that handles the POST, and expect it
        # redirects to /auth/complete. In the browser ajax handlers will
        # redirect the user to the dashboard; we invoke it manually here.
        self.assert_json_success_response_looks_correct(student_views.login_user(strategy.request))
        self.assert_redirect_to_dashboard_looks_correct(
            actions.do_complete(strategy, social_views._do_login, user=user))
        # linked is not passed, so only the user's info and the absence of the
        # duplicate banner are checked on the dashboard.
        self.assert_dashboard_response_looks_correct(student_views.dashboard(request), user)

    def test_signin_fails_if_no_account_associated(self):
        # The account exists but carries no social auth for the provider, so
        # the login view must report the missing association.
        _, strategy = self.get_request_and_strategy(
            auth_entry=pipeline.AUTH_ENTRY_LOGIN, redirect_uri='social:complete')
        auth_result = self.fake_auth_complete(strategy)
        strategy.backend.auth_complete = mock.MagicMock(return_value=auth_result)
        self.create_user_models_for_existing_account(
            strategy, 'user@example.com', 'password', self.get_username(), skip_social_auth=True)

        login_response = student_views.login_user(strategy.request)
        self.assert_json_failure_response_is_missing_social_auth(login_response)

    def test_first_party_auth_trumps_third_party_auth_but_is_invalid_when_only_email_in_request(self):
        # Only an email is posted, so the request is expected to be rejected
        # as malformed (success=None).
        self.assert_first_party_auth_trumps_third_party_auth(
            email='user@example.com', password=None, success=None)

    def test_first_party_auth_trumps_third_party_auth_but_is_invalid_when_only_password_in_request(self):
        # Only a password is posted, so the request is expected to be rejected
        # as malformed (success=None).
        self.assert_first_party_auth_trumps_third_party_auth(
            email=None, password='password', success=None)

    def test_first_party_auth_trumps_third_party_auth_and_fails_when_credentials_bad(self):
        # success=False makes the helper post a deliberately wrong password.
        credentials = {'email': 'user@example.com', 'password': 'password'}
        self.assert_first_party_auth_trumps_third_party_auth(success=False, **credentials)

    def test_first_party_auth_trumps_third_party_auth_and_succeeds_when_credentials_good(self):
        # Both credentials are posted unchanged, so login should succeed.
        credentials = {'email': 'user@example.com', 'password': 'password'}
        self.assert_first_party_auth_trumps_third_party_auth(success=True, **credentials)

    def test_full_pipeline_succeeds_registering_new_account(self):
        # Scenario: a user with no account signs in via the provider, is sent
        # to the registration form, creates an account, and lands on the
        # dashboard with a social auth attached.
        #
        # First, create the request and strategy that store pipeline state.
        # Mock out wire traffic.
        request, strategy = self.get_request_and_strategy(
            auth_entry=pipeline.AUTH_ENTRY_REGISTER, redirect_uri='social:complete')
        strategy.backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy))

        # Begin! Grab the registration page and check the login control on it.
        self.assert_register_response_before_pipeline_looks_correct(self.client.get('/register'))

        # The pipeline starts by a user GETting /auth/login/<provider>.
        # Synthesize that request and check that it redirects to the correct
        # provider page.
        # NOTE(review): this uses AUTH_ENTRY_LOGIN although the rest of the
        # test exercises the register entry; only the redirect itself is
        # checked here -- confirm this is intended.
        self.assert_redirect_to_provider_looks_correct(self.client.get(
            pipeline.get_login_url(self.PROVIDER_CLASS.NAME, pipeline.AUTH_ENTRY_LOGIN)))

        # Next, the provider makes a request against /auth/complete/<provider>.
        # pylint:disable-msg=protected-access
        self.assert_redirect_to_register_looks_correct(actions.do_complete(strategy, social_views._do_login))

        # At this point we know the pipeline has resumed correctly. Next we
        # fire off the view that displays the registration form.
        self.assert_register_response_in_pipeline_looks_correct(
            student_views.register_user(strategy.request), pipeline.get(request)['kwargs'])

        # Next, we invoke the view that handles the POST. Not all providers
        # supply email. Manually add it as the user would have to; this
        # also serves as a test of overriding provider values. Always provide a
        # password for us to check that we override it properly.
        overridden_password = strategy.request.POST.get('password')
        email = 'new@example.com'

        if not strategy.request.POST.get('email'):
            strategy.request.POST = self.get_registration_post_vars({'email': email})

        # The user must not exist yet...
        with self.assertRaises(auth_models.User.DoesNotExist):
            self.get_user_by_email(strategy, email)

        # ...but when we invoke create_account the existing edX view will make
        # it, but not social auths. The pipeline creates those later.
        self.assert_json_success_response_looks_correct(student_views.create_account(strategy.request))
        # We've overridden the user's password, so authenticate() with the old
        # value won't work. Keyword arguments are used because the helper's
        # signature is (username, password); passing them positionally in the
        # opposite order made this check vacuous.
        created_user = self.get_user_by_email(strategy, email)
        self.assert_password_overridden_by_pipeline(
            username=created_user.username, password=overridden_password)

        # Last step in the pipeline: we re-invoke the pipeline and expect to
        # end up on /dashboard, with the correct social auth object now in the
        # backend and the correct user's data on display.
        self.assert_redirect_to_dashboard_looks_correct(
            actions.do_complete(strategy, social_views._do_login, user=created_user))
        self.assert_social_auth_exists_for_user(created_user, strategy)
        self.assert_dashboard_response_looks_correct(student_views.dashboard(request), created_user)

    def test_new_account_registration_assigns_distinct_username_on_collision(self):
        colliding_username = self.get_username()
        request, strategy = self.get_request_and_strategy(
            auth_entry=pipeline.AUTH_ENTRY_REGISTER, redirect_uri='social:complete')

        # Seed the backend with a user that already holds the username the
        # provider would yield, then run the pipeline and make sure it picks a
        # different one.
        strategy.storage.user.create_user(username=self.get_username(), email='user@email.com', password='password')
        auth_result = self.fake_auth_complete(strategy)
        strategy.backend.auth_complete = mock.MagicMock(return_value=auth_result)
        # pylint:disable-msg=protected-access
        register_redirect = actions.do_complete(strategy, social_views._do_login)
        self.assert_redirect_to_register_looks_correct(register_redirect)

        pipeline_kwargs = pipeline.get(request)['kwargs']
        self.assertNotEqual(colliding_username, pipeline_kwargs['username'])

    def test_new_account_registration_fails_if_email_exists(self):
        # Submits the same registration POST twice and expects the second
        # attempt to be rejected.
        request, strategy = self.get_request_and_strategy(
            auth_entry=pipeline.AUTH_ENTRY_REGISTER, redirect_uri='social:complete')
        strategy.backend.auth_complete = mock.MagicMock(return_value=self.fake_auth_complete(strategy))
        # pylint:disable-msg=protected-access
        self.assert_redirect_to_register_looks_correct(actions.do_complete(strategy, social_views._do_login))
        self.assert_register_response_in_pipeline_looks_correct(
            student_views.register_user(strategy.request), pipeline.get(request)['kwargs'])
        strategy.request.POST = self.get_registration_post_vars()
        # Create twice: once successfully, and once causing a collision.
        # NOTE(review): the test name mentions email, but the helper used
        # below checks for the username-collision payload. Both the username
        # and the email repeat on the second POST -- confirm which of the two
        # the view is meant to report.
        student_views.create_account(strategy.request)
        self.assert_json_failure_response_is_username_collision(student_views.create_account(strategy.request))

    def test_pipeline_raises_auth_entry_error_if_auth_entry_invalid(self):
        bogus_entry = 'invalid'
        # Guard against the placeholder ever becoming a legitimate choice.
        self.assertNotIn(bogus_entry, pipeline._AUTH_ENTRY_CHOICES)  # pylint: disable-msg=protected-access

        _, strategy = self.get_request_and_strategy(redirect_uri='social:complete', auth_entry=bogus_entry)

        with self.assertRaises(pipeline.AuthEntryError):
            auth_result = self.fake_auth_complete(strategy)
            strategy.backend.auth_complete = mock.MagicMock(return_value=auth_result)

    def test_pipeline_raises_auth_entry_error_if_auth_entry_missing(self):
        # No auth entry is stored in the session at all.
        _, strategy = self.get_request_and_strategy(redirect_uri='social:complete', auth_entry=None)

        with self.assertRaises(pipeline.AuthEntryError):
            auth_result = self.fake_auth_complete(strategy)
            strategy.backend.auth_complete = mock.MagicMock(return_value=auth_result)


class Oauth2IntegrationTest(IntegrationTest):  # pylint: disable-msg=abstract-method
    """Base test case for integration tests of Oauth2 providers.

    Subclasses supply the token and user payloads; get_response_data merges
    the two into the single dict the pipeline consumes.
    """

    # Dict of string -> object. Details of the token granted to the user.
    # Override with test values in a subclass; leaving it as None forces a
    # throw when the response data is built.
    TOKEN_RESPONSE_DATA = None

    # Dict of string -> object. Details about the user themself. Override
    # with test values in a subclass; leaving it as None forces a throw when
    # the response data is built.
    USER_RESPONSE_DATA = None

    def get_response_data(self):
        """Returns a new dict merging token data with user data.

        Entries from USER_RESPONSE_DATA win over TOKEN_RESPONSE_DATA entries
        with the same key. Neither class-level dict is modified.
        """
        merged = {}
        merged.update(self.TOKEN_RESPONSE_DATA)
        merged.update(self.USER_RESPONSE_DATA)
        return merged