Newer
Older
"""Tests for util.db module."""
import threading
import time
import unittest
from django.contrib.auth.models import User
from django.core.management import call_command
from django.db import IntegrityError, connection
from django.db.transaction import TransactionManagementError, atomic
from django.test import TestCase, TransactionTestCase
from django.test.utils import override_settings
from six.moves import range
from util.db import enable_named_outer_atomic, generate_int_id, outer_atomic
def do_nothing():
"""Just return."""
return
@ddt.ddt
class TransactionManagersTestCase(TransactionTestCase):
Tests outer_atomic.
Note: This TestCase only works with MySQL.
To test do: "./manage.py lms --settings=test_with_mysql test util.tests.test_db"
DECORATORS = {
'outer_atomic': outer_atomic(),
'outer_atomic_read_committed': outer_atomic(read_committed=True),
}
@ddt.data(
('outer_atomic', IntegrityError, None, True),
('outer_atomic_read_committed', type(None), False, True),
)
@ddt.unpack
def test_concurrent_requests(self, transaction_decorator_name, exception_class, created_in_1, created_in_2):
"""
Test that when isolation level is set to READ COMMITTED get_or_create()
for the same row in concurrent requests does not raise an IntegrityError.
"""
transaction_decorator = self.DECORATORS[transaction_decorator_name]
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
if connection.vendor != 'mysql':
raise unittest.SkipTest('Only works on MySQL.')
class RequestThread(threading.Thread):
""" A thread which runs a dummy view."""
def __init__(self, delay, **kwargs):
super(RequestThread, self).__init__(**kwargs)
self.delay = delay
self.status = {}
@transaction_decorator
def run(self):
"""A dummy view."""
try:
try:
User.objects.get(username='student', email='student@edx.org')
except User.DoesNotExist:
pass
else:
raise AssertionError('Did not raise User.DoesNotExist.')
if self.delay > 0:
time.sleep(self.delay)
__, created = User.objects.get_or_create(username='student', email='student@edx.org')
except Exception as exception: # pylint: disable=broad-except
self.status['exception'] = exception
else:
self.status['created'] = created
thread1 = RequestThread(delay=1)
thread2 = RequestThread(delay=0)
thread1.start()
thread2.start()
thread2.join()
thread1.join()
self.assertIsInstance(thread1.status.get('exception'), exception_class)
self.assertEqual(thread1.status.get('created'), created_in_1)
self.assertIsNone(thread2.status.get('exception'))
self.assertEqual(thread2.status.get('created'), created_in_2)
def test_outer_atomic_nesting(self):
"""
Test that outer_atomic raises an error if it is nested inside
another atomic.
"""
if connection.vendor != 'mysql':
raise unittest.SkipTest('Only works on MySQL.')
outer_atomic()(do_nothing)()
with atomic():
atomic()(do_nothing)()
with outer_atomic():
atomic()(do_nothing)()
with self.assertRaisesRegex(TransactionManagementError, 'Cannot be inside an atomic block.'):
with atomic():
outer_atomic()(do_nothing)()
with self.assertRaisesRegex(TransactionManagementError, 'Cannot be inside an atomic block.'):
with outer_atomic():
outer_atomic()(do_nothing)()
def test_named_outer_atomic_nesting(self):
"""
Test that a named outer_atomic raises an error only if nested in
enable_named_outer_atomic and inside another atomic.
"""
if connection.vendor != 'mysql':
raise unittest.SkipTest('Only works on MySQL.')
outer_atomic(name='abc')(do_nothing)()
with atomic():
outer_atomic(name='abc')(do_nothing)()
with enable_named_outer_atomic('abc'):
outer_atomic(name='abc')(do_nothing)() # Not nested.
with atomic():
outer_atomic(name='pqr')(do_nothing)() # Not enabled.
with self.assertRaisesRegex(TransactionManagementError, 'Cannot be inside an atomic block.'):
with atomic():
outer_atomic(name='abc')(do_nothing)()
with enable_named_outer_atomic('abc', 'def'):
outer_atomic(name='def')(do_nothing)() # Not nested.
with atomic():
outer_atomic(name='pqr')(do_nothing)() # Not enabled.
with self.assertRaisesRegex(TransactionManagementError, 'Cannot be inside an atomic block.'):
with atomic():
outer_atomic(name='def')(do_nothing)()
with self.assertRaisesRegex(TransactionManagementError, 'Cannot be inside an atomic block.'):
with outer_atomic():
outer_atomic(name='def')(do_nothing)()
with self.assertRaisesRegex(TransactionManagementError, 'Cannot be inside an atomic block.'):
with atomic():
outer_atomic(name='abc')(do_nothing)()
with self.assertRaisesRegex(TransactionManagementError, 'Cannot be inside an atomic block.'):
with outer_atomic():
outer_atomic(name='abc')(do_nothing)()
@ddt.ddt
class GenerateIntIdTestCase(TestCase):
"""Tests for `generate_int_id`"""
@ddt.data(10)
def test_no_used_ids(self, times):
"""
Verify that we get a random integer within the specified range
when there are no used ids.
"""
minimum = 1
maximum = times
self.assertIn(generate_int_id(minimum, maximum), list(range(minimum, maximum + 1)))
@ddt.data(10)
def test_used_ids(self, times):
"""
Verify that we get a random integer within the specified range
but not in a list of used ids.
"""
minimum = 1
maximum = times
used_ids = {2, 4, 6, 8}
int_id = generate_int_id(minimum, maximum, used_ids)
self.assertIn(int_id, list(set(range(minimum, maximum + 1)) - used_ids))
class MigrationTests(TestCase):
"""
Tests for migrations.
"""
@override_settings(MIGRATION_MODULES={})
def test_migrations_are_in_sync(self):
"""
Tests that the migration files are in sync with the models.
If this fails, you needs to run the Django command makemigrations.
The test is set up to override MIGRATION_MODULES to ensure migrations are
enabled for purposes of this test regardless of the overall test settings.
TODO: Find a general way of handling the case where if we're trying to
make a migrationless release that'll require a separate migration
release afterwards, this test doesn't fail.
call_command("makemigrations", dry_run=True, verbosity=3, stdout=out)
self.assertIn("No changes detected", output)