Skip to content
Snippets Groups Projects
test_db.py 7.42 KiB
Newer Older
"""Tests for util.db module."""

import threading
import time
import unittest

from django.contrib.auth.models import User
from django.core.management import call_command
from django.db import IntegrityError, connection
from django.db.transaction import TransactionManagementError, atomic
from django.test import TestCase, TransactionTestCase
from django.test.utils import override_settings
from django.utils.six import StringIO
from util.db import enable_named_outer_atomic, generate_int_id, outer_atomic


def do_nothing():
    """Just return."""
    return
class TransactionManagersTestCase(TransactionTestCase):

    Note: This TestCase only works with MySQL.

    To test do: "./manage.py lms --settings=test_with_mysql test util.tests.test_db"
Jeremy Bowman's avatar
Jeremy Bowman committed
    DECORATORS = {
        'outer_atomic': outer_atomic(),
        'outer_atomic_read_committed': outer_atomic(read_committed=True),
    }
Jeremy Bowman's avatar
Jeremy Bowman committed
        ('outer_atomic', IntegrityError, None, True),
        ('outer_atomic_read_committed', type(None), False, True),
Jeremy Bowman's avatar
Jeremy Bowman committed
    def test_concurrent_requests(self, transaction_decorator_name, exception_class, created_in_1, created_in_2):
        """
        Test that when isolation level is set to READ COMMITTED get_or_create()
        for the same row in concurrent requests does not raise an IntegrityError.
        """
Jeremy Bowman's avatar
Jeremy Bowman committed
        transaction_decorator = self.DECORATORS[transaction_decorator_name]
        if connection.vendor != 'mysql':
            raise unittest.SkipTest('Only works on MySQL.')

        class RequestThread(threading.Thread):
            """ A thread which runs a dummy view."""
            def __init__(self, delay, **kwargs):
                super(RequestThread, self).__init__(**kwargs)
                self.delay = delay
                self.status = {}

            @transaction_decorator
            def run(self):
                """A dummy view."""
                try:
                    try:
                        User.objects.get(username='student', email='student@edx.org')
                    except User.DoesNotExist:
                        pass
                    else:
                        raise AssertionError('Did not raise User.DoesNotExist.')

                    if self.delay > 0:
                        time.sleep(self.delay)

                    __, created = User.objects.get_or_create(username='student', email='student@edx.org')
                except Exception as exception:  # pylint: disable=broad-except
                    self.status['exception'] = exception
                else:
                    self.status['created'] = created

        thread1 = RequestThread(delay=1)
        thread2 = RequestThread(delay=0)

        thread1.start()
        thread2.start()
        thread2.join()
        thread1.join()

        self.assertIsInstance(thread1.status.get('exception'), exception_class)
        self.assertEqual(thread1.status.get('created'), created_in_1)

        self.assertIsNone(thread2.status.get('exception'))
        self.assertEqual(thread2.status.get('created'), created_in_2)
    def test_outer_atomic_nesting(self):
        """
        Test that outer_atomic raises an error if it is nested inside
        another atomic.
        """
        if connection.vendor != 'mysql':
            raise unittest.SkipTest('Only works on MySQL.')

        outer_atomic()(do_nothing)()

        with atomic():
            atomic()(do_nothing)()

        with outer_atomic():
            atomic()(do_nothing)()

        with self.assertRaisesRegex(TransactionManagementError, 'Cannot be inside an atomic block.'):
            with atomic():
                outer_atomic()(do_nothing)()
        with self.assertRaisesRegex(TransactionManagementError, 'Cannot be inside an atomic block.'):
            with outer_atomic():
                outer_atomic()(do_nothing)()
    def test_named_outer_atomic_nesting(self):
        """
        Test that a named outer_atomic raises an error only if nested in
        enable_named_outer_atomic and inside another atomic.
        """
        if connection.vendor != 'mysql':
            raise unittest.SkipTest('Only works on MySQL.')

        outer_atomic(name='abc')(do_nothing)()

        with atomic():
            outer_atomic(name='abc')(do_nothing)()

        with enable_named_outer_atomic('abc'):

            outer_atomic(name='abc')(do_nothing)()  # Not nested.

            with atomic():
                outer_atomic(name='pqr')(do_nothing)()  # Not enabled.

            with self.assertRaisesRegex(TransactionManagementError, 'Cannot be inside an atomic block.'):
                with atomic():
                    outer_atomic(name='abc')(do_nothing)()

        with enable_named_outer_atomic('abc', 'def'):

            outer_atomic(name='def')(do_nothing)()  # Not nested.

            with atomic():
                outer_atomic(name='pqr')(do_nothing)()  # Not enabled.

            with self.assertRaisesRegex(TransactionManagementError, 'Cannot be inside an atomic block.'):
                with atomic():
                    outer_atomic(name='def')(do_nothing)()

            with self.assertRaisesRegex(TransactionManagementError, 'Cannot be inside an atomic block.'):
                with outer_atomic():
                    outer_atomic(name='def')(do_nothing)()

            with self.assertRaisesRegex(TransactionManagementError, 'Cannot be inside an atomic block.'):
                with atomic():
                    outer_atomic(name='abc')(do_nothing)()

            with self.assertRaisesRegex(TransactionManagementError, 'Cannot be inside an atomic block.'):
                with outer_atomic():
                    outer_atomic(name='abc')(do_nothing)()


@ddt.ddt
class GenerateIntIdTestCase(TestCase):
    """Tests for `generate_int_id`"""
    @ddt.data(10)
    def test_no_used_ids(self, times):
        """
        Verify that we get a random integer within the specified range
        when there are no used ids.
        """
        minimum = 1
        maximum = times
        for __ in range(times):
            self.assertIn(generate_int_id(minimum, maximum), list(range(minimum, maximum + 1)))

    @ddt.data(10)
    def test_used_ids(self, times):
        """
        Verify that we get a random integer within the specified range
        but not in a list of used ids.
        """
        minimum = 1
        maximum = times
        used_ids = {2, 4, 6, 8}
        for __ in range(times):
            int_id = generate_int_id(minimum, maximum, used_ids)
            self.assertIn(int_id, list(set(range(minimum, maximum + 1)) - used_ids))


class MigrationTests(TestCase):
    """
    Tests for migrations.
    """
    @override_settings(MIGRATION_MODULES={})
    def test_migrations_are_in_sync(self):
        """
        Tests that the migration files are in sync with the models.
        If this fails, you needs to run the Django command makemigrations.

        The test is set up to override MIGRATION_MODULES to ensure migrations are
        enabled for purposes of this test regardless of the overall test settings.

        TODO: Find a general way of handling the case where if we're trying to
        make a migrationless release that'll require a separate migration
        release afterwards, this test doesn't fail.
        out = StringIO()
        call_command("makemigrations", dry_run=True, verbosity=3, stdout=out)
        output = out.getvalue()
        self.assertIn("No changes detected", output)