From 2c5e038f829f0a87ac8d3ee287c6fbbf5a7e0f0c Mon Sep 17 00:00:00 2001
From: Brian Wilson <brian@edx.org>
Date: Thu, 13 Jun 2013 03:27:22 -0400
Subject: [PATCH] initial test_tasks

---
 lms/djangoapps/instructor_task/api_helper.py  |   8 +-
 .../instructor_task/tasks_helper.py           |  19 +-
 .../instructor_task/tests/test_api.py         | 266 +++++++++++++-----
 .../instructor_task/tests/test_base.py        |  25 +-
 .../instructor_task/tests/test_integration.py |  48 ++--
 .../instructor_task/tests/test_tasks.py       | 258 +++++++++++++++++
 lms/djangoapps/instructor_task/views.py       |  55 ++--
 lms/envs/test.py                              |   1 -
 8 files changed, 559 insertions(+), 121 deletions(-)
 create mode 100644 lms/djangoapps/instructor_task/tests/test_tasks.py

diff --git a/lms/djangoapps/instructor_task/api_helper.py b/lms/djangoapps/instructor_task/api_helper.py
index 290166e3470..800c493cf6b 100644
--- a/lms/djangoapps/instructor_task/api_helper.py
+++ b/lms/djangoapps/instructor_task/api_helper.py
@@ -14,7 +14,6 @@ from xmodule.modulestore.django import modulestore
 from instructor_task.models import InstructorTask
 from instructor_task.tasks_helper import PROGRESS
 
-
 log = logging.getLogger(__name__)
 
 # define a "state" used in InstructorTask
@@ -49,6 +48,13 @@ def _reserve_task(course_id, task_type, task_key, task_input, requester):
     will cause any pending transaction to be committed by a successful
     save here.  Any future database operations will take place in a
     separate transaction.
+
+    Note that there is a chance of a race condition here, when two users
+    try to run the same task at almost exactly the same time.  One user
+    could be after the check and before the create when the second user
+    gets to the check.  At that point, both users are able to run their
+    tasks simultaneously.  This is deemed a small enough risk to not
+    put in further safeguards.
     """
 
     if _task_is_running(course_id, task_type, task_key):
diff --git a/lms/djangoapps/instructor_task/tasks_helper.py b/lms/djangoapps/instructor_task/tasks_helper.py
index faea9030225..9776c7336d5 100644
--- a/lms/djangoapps/instructor_task/tasks_helper.py
+++ b/lms/djangoapps/instructor_task/tasks_helper.py
@@ -49,6 +49,11 @@ class UpdateProblemModuleStateError(Exception):
     pass
 
 
+def _get_current_task():
+    """Stub to make it easier to test without actually running Celery"""
+    return current_task
+
+
 def _perform_module_state_update(course_id, module_state_key, student_identifier, update_fcn, action_name, filter_fcn,
                                  xmodule_instance_args):
     """
@@ -137,12 +142,12 @@ def _perform_module_state_update(course_id, module_state_key, student_identifier
         return progress
 
     task_progress = get_task_progress()
-    current_task.update_state(state=PROGRESS, meta=task_progress)
+    _get_current_task().update_state(state=PROGRESS, meta=task_progress)
     for module_to_update in modules_to_update:
         num_attempted += 1
         # There is no try here:  if there's an error, we let it throw, and the task will
         # be marked as FAILED, with a stack trace.
-        with dog_stats_api.timer('courseware.tasks.module.{0}.time'.format(action_name)):
+        with dog_stats_api.timer('instructor_tasks.module.{0}.time'.format(action_name)):
             if update_fcn(module_descriptor, module_to_update, xmodule_instance_args):
                 # If the update_fcn returns true, then it performed some kind of work.
                 # Logging of failures is left to the update_fcn itself.
@@ -150,7 +155,7 @@ def _perform_module_state_update(course_id, module_state_key, student_identifier
 
         # update task status:
         task_progress = get_task_progress()
-        current_task.update_state(state=PROGRESS, meta=task_progress)
+        _get_current_task().update_state(state=PROGRESS, meta=task_progress)
 
     return task_progress
 
@@ -162,7 +167,7 @@ def _save_course_task(course_task):
 
 
 def update_problem_module_state(entry_id, update_fcn, action_name, filter_fcn,
-                                 xmodule_instance_args):
+                                xmodule_instance_args):
     """
     Performs generic update by visiting StudentModule instances with the update_fcn provided.
 
@@ -219,7 +224,7 @@ def update_problem_module_state(entry_id, update_fcn, action_name, filter_fcn,
     try:
         # check that the task_id submitted in the InstructorTask matches the current task
         # that is running.
-        request_task_id = current_task.request.id
+        request_task_id = _get_current_task().request.id
         if task_id != request_task_id:
             fmt = 'Requested task "{task_id}" did not match actual task "{actual_id}"'
             message = fmt.format(task_id=task_id, course_id=course_id, state_key=module_state_key, actual_id=request_task_id)
@@ -227,7 +232,7 @@ def update_problem_module_state(entry_id, update_fcn, action_name, filter_fcn,
             raise UpdateProblemModuleStateError(message)
 
         # now do the work:
-        with dog_stats_api.timer('courseware.tasks.module.{0}.overall_time'.format(action_name)):
+        with dog_stats_api.timer('instructor_tasks.module.{0}.overall_time'.format(action_name)):
             task_progress = _perform_module_state_update(course_id, module_state_key, student_ident, update_fcn,
                                                          action_name, filter_fcn, xmodule_instance_args)
     except Exception:
@@ -351,7 +356,7 @@ def reset_attempts_module_state(_module_descriptor, student_module, xmodule_inst
 
     Always returns true, indicating success, if it doesn't raise an exception due to database error.
     """
-    problem_state = json.loads(student_module.state)
+    problem_state = json.loads(student_module.state) if student_module.state else {}
     if 'attempts' in problem_state:
         old_number_of_attempts = problem_state["attempts"]
         if old_number_of_attempts > 0:
diff --git a/lms/djangoapps/instructor_task/tests/test_api.py b/lms/djangoapps/instructor_task/tests/test_api.py
index 834802193f1..c1b87865b70 100644
--- a/lms/djangoapps/instructor_task/tests/test_api.py
+++ b/lms/djangoapps/instructor_task/tests/test_api.py
@@ -3,7 +3,7 @@ Test for LMS instructor background task queue management
 """
 import logging
 import json
-from celery.states import SUCCESS, FAILURE, REVOKED
+from celery.states import SUCCESS, FAILURE, REVOKED, PENDING
 
 from mock import Mock, patch
 from uuid import uuid4
@@ -14,19 +14,23 @@ from django.test.testcases import TestCase
 from xmodule.modulestore.exceptions import ItemNotFoundError
 
 from courseware.tests.factories import UserFactory
-from instructor_task.tests.factories import InstructorTaskFactory
-from instructor_task.tasks_helper import PROGRESS
-from instructor_task.views import instructor_task_status
+
 from instructor_task.api import (get_running_instructor_tasks,
+                                 get_instructor_task_history,
                                  submit_rescore_problem_for_all_students,
                                  submit_rescore_problem_for_student,
                                  submit_reset_problem_attempts_for_all_students,
                                  submit_delete_problem_state_for_all_students)
 
 from instructor_task.api_helper import (QUEUING,
-#                                        AlreadyRunningError,
+                                        AlreadyRunningError,
                                         encode_problem_and_student_input,
                                         )
+from instructor_task.models import InstructorTask
+from instructor_task.tasks_helper import PROGRESS
+from instructor_task.tests.test_base import InstructorTaskTestCase
+from instructor_task.tests.factories import InstructorTaskFactory
+from instructor_task.views import instructor_task_status, get_task_completion_info
 
 
 log = logging.getLogger(__name__)
@@ -34,16 +38,17 @@ log = logging.getLogger(__name__)
 
 TEST_COURSE_ID = 'edx/1.23x/test_course'
 TEST_FAILURE_MESSAGE = 'task failed horribly'
+TEST_FAILURE_EXCEPTION = 'RandomCauseError'
 
 
-class TaskSubmitTestCase(TestCase):
+class InstructorTaskReportTest(TestCase):
     """
-    Check that background tasks are properly queued and report status.
+    Tests API and view methods that involve the reporting of status for background tasks.
     """
     def setUp(self):
         self.student = UserFactory.create(username="student", email="student@edx.org")
         self.instructor = UserFactory.create(username="instructor", email="instructor@edx.org")
-        self.problem_url = TaskSubmitTestCase.problem_location("test_urlname")
+        self.problem_url = InstructorTaskReportTest.problem_location("test_urlname")
 
     @staticmethod
     def problem_location(problem_url_name):
@@ -57,7 +62,7 @@ class TaskSubmitTestCase(TestCase):
     def _create_entry(self, task_state=QUEUING, task_output=None, student=None):
         """Creates a InstructorTask entry for testing."""
         task_id = str(uuid4())
-        progress_json = json.dumps(task_output)
+        progress_json = json.dumps(task_output) if task_output is not None else None
         task_input, task_key = encode_problem_and_student_input(self.problem_url, student)
 
         instructor_task = InstructorTaskFactory.create(course_id=TEST_COURSE_ID,
@@ -73,7 +78,7 @@ class TaskSubmitTestCase(TestCase):
         """Creates a InstructorTask entry representing a failed task."""
         # view task entry for task failure
         progress = {'message': TEST_FAILURE_MESSAGE,
-                    'exception': 'RandomCauseError',
+                    'exception': TEST_FAILURE_EXCEPTION,
                     }
         return self._create_entry(task_state=FAILURE, task_output=progress)
 
@@ -85,9 +90,8 @@ class TaskSubmitTestCase(TestCase):
         """Creates a InstructorTask entry representing a task in progress."""
         progress = {'attempted': 3,
                     'updated': 2,
-                    'total': 10,
+                    'total': 5,
                     'action_name': 'rescored',
-                    'message': 'some random string that should summarize the other info',
                     }
         return self._create_entry(task_state=task_state, task_output=progress, student=student)
 
@@ -100,6 +104,17 @@ class TaskSubmitTestCase(TestCase):
         task_ids = [instructor_task.task_id for instructor_task in get_running_instructor_tasks(TEST_COURSE_ID)]
         self.assertEquals(set(task_ids), set(progress_task_ids))
 
+    def test_get_instructor_task_history(self):
+        # when fetching historical tasks, we get all tasks, including running tasks
+        expected_ids = []
+        for _ in range(1, 5):
+            expected_ids.append(self._create_failure_entry().task_id)
+            expected_ids.append(self._create_success_entry().task_id)
+            expected_ids.append(self._create_progress_entry().task_id)
+        task_ids = [instructor_task.task_id for instructor_task
+                    in get_instructor_task_history(TEST_COURSE_ID, self.problem_url)]
+        self.assertEquals(set(task_ids), set(expected_ids))
+
     def _get_instructor_task_status(self, task_id):
         """Returns status corresponding to task_id via api method."""
         request = Mock()
@@ -129,23 +144,60 @@ class TaskSubmitTestCase(TestCase):
             self.assertEquals(output[task_id]['task_id'], task_id)
 
     def test_get_status_from_failure(self):
+        # get status for a task that has already failed
         instructor_task = self._create_failure_entry()
         task_id = instructor_task.task_id
         response = self._get_instructor_task_status(task_id)
         output = json.loads(response.content)
+        self.assertEquals(output['message'], TEST_FAILURE_MESSAGE)
+        self.assertEquals(output['succeeded'], False)
         self.assertEquals(output['task_id'], task_id)
         self.assertEquals(output['task_state'], FAILURE)
         self.assertFalse(output['in_progress'])
-        self.assertEquals(output['message'], TEST_FAILURE_MESSAGE)
+        expected_progress = {'exception': TEST_FAILURE_EXCEPTION,
+                             'message': TEST_FAILURE_MESSAGE}
+        self.assertEquals(output['task_progress'], expected_progress)
 
     def test_get_status_from_success(self):
+        # get status for a task that has already succeeded
         instructor_task = self._create_success_entry()
         task_id = instructor_task.task_id
         response = self._get_instructor_task_status(task_id)
         output = json.loads(response.content)
+        self.assertEquals(output['message'], "Problem rescored for 2 of 3 students (out of 5)")
+        self.assertEquals(output['succeeded'], False)
         self.assertEquals(output['task_id'], task_id)
         self.assertEquals(output['task_state'], SUCCESS)
         self.assertFalse(output['in_progress'])
+        expected_progress = {'attempted': 3,
+                             'updated': 2,
+                             'total': 5,
+                             'action_name': 'rescored'}
+        self.assertEquals(output['task_progress'], expected_progress)
+
+    def _test_get_status_from_result(self, task_id, mock_result):
+        """
+        Provides mock result to caller of instructor_task_status, and returns resulting output.
+        """
+        with patch('celery.result.AsyncResult.__new__') as mock_result_ctor:
+            mock_result_ctor.return_value = mock_result
+            response = self._get_instructor_task_status(task_id)
+        output = json.loads(response.content)
+        self.assertEquals(output['task_id'], task_id)
+        return output
+
+    def test_get_status_to_pending(self):
+        # get status for a task that hasn't begun to run yet
+        instructor_task = self._create_entry()
+        task_id = instructor_task.task_id
+        mock_result = Mock()
+        mock_result.task_id = task_id
+        mock_result.state = PENDING
+        output = self._test_get_status_from_result(task_id, mock_result)
+        for key in ['message', 'succeeded', 'task_progress']:
+            self.assertTrue(key not in output)
+        self.assertEquals(output['task_state'], 'PENDING')
+        self.assertTrue(output['in_progress'])
 
     def test_update_progress_to_progress(self):
         # view task entry for task in progress
@@ -158,14 +210,12 @@ class TaskSubmitTestCase(TestCase):
                               'updated': 4,
                               'total': 10,
                               'action_name': 'rescored'}
-        with patch('celery.result.AsyncResult.__new__') as mock_result_ctor:
-            mock_result_ctor.return_value = mock_result
-            response = self._get_instructor_task_status(task_id)
-        output = json.loads(response.content)
-        self.assertEquals(output['task_id'], task_id)
+        output = self._test_get_status_from_result(task_id, mock_result)
         self.assertEquals(output['task_state'], PROGRESS)
         self.assertTrue(output['in_progress'])
-        # self.assertEquals(output['message'], )
+        self.assertEquals(output['task_progress'], mock_result.result)
+        for key in ['message', 'succeeded']:
+            self.assertTrue(key not in output)
 
     def test_update_progress_to_failure(self):
         # view task entry for task in progress that later fails
@@ -176,14 +226,15 @@ class TaskSubmitTestCase(TestCase):
         mock_result.state = FAILURE
         mock_result.result = NotImplementedError("This task later failed.")
         mock_result.traceback = "random traceback"
-        with patch('celery.result.AsyncResult.__new__') as mock_result_ctor:
-            mock_result_ctor.return_value = mock_result
-            response = self._get_instructor_task_status(task_id)
-        output = json.loads(response.content)
-        self.assertEquals(output['task_id'], task_id)
+        output = self._test_get_status_from_result(task_id, mock_result)
+        self.assertEquals(output['message'], "This task later failed.")
+        self.assertEquals(output['succeeded'], False)
         self.assertEquals(output['task_state'], FAILURE)
         self.assertFalse(output['in_progress'])
-        self.assertEquals(output['message'], "This task later failed.")
+        expected_progress = {'exception': 'NotImplementedError',
+                             'message': "This task later failed.",
+                             'traceback': "random traceback"}
+        self.assertEquals(output['task_progress'], expected_progress)
 
     def test_update_progress_to_revoked(self):
         # view task entry for task in progress that later fails
@@ -192,14 +243,13 @@ class TaskSubmitTestCase(TestCase):
         mock_result = Mock()
         mock_result.task_id = task_id
         mock_result.state = REVOKED
-        with patch('celery.result.AsyncResult.__new__') as mock_result_ctor:
-            mock_result_ctor.return_value = mock_result
-            response = self._get_instructor_task_status(task_id)
-        output = json.loads(response.content)
-        self.assertEquals(output['task_id'], task_id)
+        output = self._test_get_status_from_result(task_id, mock_result)
+        self.assertEquals(output['message'], "Task revoked before running")
+        self.assertEquals(output['succeeded'], False)
         self.assertEquals(output['task_state'], REVOKED)
         self.assertFalse(output['in_progress'])
-        self.assertEquals(output['message'], "Task revoked before running")
+        expected_progress = {'message': "Task revoked before running"}
+        self.assertEquals(output['task_progress'], expected_progress)
 
     def _get_output_for_task_success(self, attempted, updated, total, student=None):
         """returns the task_id and the result returned by instructor_task_status()."""
@@ -213,53 +263,108 @@ class TaskSubmitTestCase(TestCase):
                               'updated': updated,
                               'total': total,
                               'action_name': 'rescored'}
-        with patch('celery.result.AsyncResult.__new__') as mock_result_ctor:
-            mock_result_ctor.return_value = mock_result
-            response = self._get_instructor_task_status(task_id)
-        output = json.loads(response.content)
-        return task_id, output
+        output = self._test_get_status_from_result(task_id, mock_result)
+        return output
 
     def test_update_progress_to_success(self):
-        task_id, output = self._get_output_for_task_success(10, 8, 10)
-        self.assertEquals(output['task_id'], task_id)
+        output = self._get_output_for_task_success(10, 8, 10)
+        self.assertEquals(output['message'], "Problem rescored for 8 of 10 students")
+        self.assertEquals(output['succeeded'], False)
         self.assertEquals(output['task_state'], SUCCESS)
         self.assertFalse(output['in_progress'])
+        expected_progress = {'attempted': 10,
+                             'updated': 8,
+                             'total': 10,
+                             'action_name': 'rescored'}
+        self.assertEquals(output['task_progress'], expected_progress)
 
     def test_success_messages(self):
-        _, output = self._get_output_for_task_success(0, 0, 10)
-        self.assertTrue("Unable to find any students with submissions to be rescored" in output['message'])
+        output = self._get_output_for_task_success(0, 0, 10)
+        self.assertEqual(output['message'], "Unable to find any students with submissions to be rescored (out of 10)")
         self.assertFalse(output['succeeded'])
 
-        _, output = self._get_output_for_task_success(10, 0, 10)
-        self.assertTrue("Problem failed to be rescored for any of 10 students" in output['message'])
+        output = self._get_output_for_task_success(10, 0, 10)
+        self.assertEqual(output['message'], "Problem failed to be rescored for any of 10 students")
         self.assertFalse(output['succeeded'])
 
-        _, output = self._get_output_for_task_success(10, 8, 10)
-        self.assertTrue("Problem rescored for 8 of 10 students" in output['message'])
+        output = self._get_output_for_task_success(10, 8, 10)
+        self.assertEqual(output['message'], "Problem rescored for 8 of 10 students")
         self.assertFalse(output['succeeded'])
 
-        _, output = self._get_output_for_task_success(10, 10, 10)
-        self.assertTrue("Problem successfully rescored for 10 students" in output['message'])
+        output = self._get_output_for_task_success(9, 8, 10)
+        self.assertEqual(output['message'], "Problem rescored for 8 of 9 students (out of 10)")
+        self.assertFalse(output['succeeded'])
+
+        output = self._get_output_for_task_success(10, 10, 10)
+        self.assertEqual(output['message'], "Problem successfully rescored for 10 students")
         self.assertTrue(output['succeeded'])
 
-        _, output = self._get_output_for_task_success(0, 0, 1, student=self.student)
+        output = self._get_output_for_task_success(0, 0, 1, student=self.student)
         self.assertTrue("Unable to find submission to be rescored for student" in output['message'])
         self.assertFalse(output['succeeded'])
 
-        _, output = self._get_output_for_task_success(1, 0, 1, student=self.student)
+        output = self._get_output_for_task_success(1, 0, 1, student=self.student)
         self.assertTrue("Problem failed to be rescored for student" in output['message'])
         self.assertFalse(output['succeeded'])
 
-        _, output = self._get_output_for_task_success(1, 1, 1, student=self.student)
+        output = self._get_output_for_task_success(1, 1, 1, student=self.student)
         self.assertTrue("Problem successfully rescored for student" in output['message'])
         self.assertTrue(output['succeeded'])
 
+    def test_get_info_for_queuing_task(self):
+        # get status for a task that is still running:
+        instructor_task = self._create_entry()
+        succeeded, message = get_task_completion_info(instructor_task)
+        self.assertFalse(succeeded)
+        self.assertEquals(message, "No status information available")
+
+    def test_get_info_for_missing_output(self):
+        # check for missing task_output
+        instructor_task = self._create_success_entry()
+        instructor_task.task_output = None
+        succeeded, message = get_task_completion_info(instructor_task)
+        self.assertFalse(succeeded)
+        self.assertEquals(message, "No status information available")
+
+    def test_get_info_for_broken_output(self):
+        # check for non-JSON task_output
+        instructor_task = self._create_success_entry()
+        instructor_task.task_output = "{ bad"
+        succeeded, message = get_task_completion_info(instructor_task)
+        self.assertFalse(succeeded)
+        self.assertEquals(message, "No parsable status information available")
+
+    def test_get_info_for_empty_output(self):
+        # check for JSON task_output with missing keys
+        instructor_task = self._create_success_entry()
+        instructor_task.task_output = "{}"
+        succeeded, message = get_task_completion_info(instructor_task)
+        self.assertFalse(succeeded)
+        self.assertEquals(message, "No progress status information available")
+
+    def test_get_info_for_broken_input(self):
+        # check for non-JSON task_input, but then just ignore it
+        instructor_task = self._create_success_entry()
+        instructor_task.task_input = "{ bad"
+        succeeded, message = get_task_completion_info(instructor_task)
+        self.assertFalse(succeeded)
+        self.assertEquals(message, "Problem rescored for 2 of 3 students (out of 5)")
+
+
+class InstructorTaskSubmitTest(InstructorTaskTestCase):
+    """Tests API methods that involve the submission of background tasks."""
+
+    def setUp(self):
+        self.initialize_course()
+        self.student = UserFactory.create(username="student", email="student@edx.org")
+        self.instructor = UserFactory.create(username="instructor", email="instructor@edx.org")
+
     def test_submit_nonexistent_modules(self):
         # confirm that a rescore of a non-existent module returns an exception
         # (Note that it is easier to test a non-rescorable module in test_tasks,
         # where we are creating real modules.
-        problem_url = self.problem_url
-        course_id = "something else"
+        problem_url = InstructorTaskTestCase.problem_location("NonexistentProblem")
+        course_id = self.course.id
         request = None
         with self.assertRaises(ItemNotFoundError):
             submit_rescore_problem_for_student(request, course_id, problem_url, self.student)
@@ -270,16 +375,49 @@ class TaskSubmitTestCase(TestCase):
         with self.assertRaises(ItemNotFoundError):
             submit_delete_problem_state_for_all_students(request, course_id, problem_url)
 
-#    def test_submit_when_running(self):
-#        # get exception when trying to submit a task that is already running
-#        instructor_task = self._create_progress_entry()
-#        problem_url = json.loads(instructor_task.task_input).get('problem_url')
-#        course_id = instructor_task.course_id
-#        # requester doesn't have to be the same when determining if a task is already running
-#        request = Mock()
-#        request.user = self.instructor
-#        with self.assertRaises(AlreadyRunningError):
-#            # just skip making the argument check, so we don't have to fake it deeper down
-#            with patch('instructor_task.api_helper.check_arguments_for_rescoring') as mock_check:
-#                mock_check.return_value = None
-#                submit_rescore_problem_for_all_students(request, course_id, problem_url)
+    def test_submit_nonrescorable_modules(self):
+        # confirm that a rescore of a non-existent module returns an exception
+        # (Note that it is easier to test a non-rescorable module in test_tasks,
+        # where we are creating real modules.
+        problem_url = self.problem_section.location.url()
+        course_id = self.course.id
+        request = None
+        with self.assertRaises(NotImplementedError):
+            submit_rescore_problem_for_student(request, course_id, problem_url, self.student)
+        with self.assertRaises(NotImplementedError):
+            submit_rescore_problem_for_all_students(request, course_id, problem_url)
+
+    def _test_submit_task(self, task_class, student=None):
+        problem_url_name = 'H1P1'
+        self.define_option_problem(problem_url_name)
+        location = InstructorTaskTestCase.problem_location(problem_url_name)
+        if student is not None:
+            instructor_task = task_class(self.create_task_request(self.instructor),
+                                         self.course.id, location, student)
+        else:
+            instructor_task = task_class(self.create_task_request(self.instructor),
+                                         self.course.id, location)
+
+        # test resubmitting, by updating the existing record:
+        instructor_task = InstructorTask.objects.get(id=instructor_task.id)
+        instructor_task.task_state = PROGRESS
+        instructor_task.save()
+
+        with self.assertRaises(AlreadyRunningError):
+            if student is not None:
+                task_class(self.create_task_request(self.instructor), self.course.id, location, student)
+            else:
+                task_class(self.create_task_request(self.instructor), self.course.id, location)
+
+    def test_submit_rescore_all(self):
+        self._test_submit_task(submit_rescore_problem_for_all_students)
+
+    def test_submit_rescore_student(self):
+        self._test_submit_task(submit_rescore_problem_for_student, self.student)
+
+    def test_submit_reset_all(self):
+        self._test_submit_task(submit_reset_problem_attempts_for_all_students)
+
+    def test_submit_delete_all(self):
+        self._test_submit_task(submit_delete_problem_state_for_all_students)
+
diff --git a/lms/djangoapps/instructor_task/tests/test_base.py b/lms/djangoapps/instructor_task/tests/test_base.py
index 572f7a0a532..cd9584460d3 100644
--- a/lms/djangoapps/instructor_task/tests/test_base.py
+++ b/lms/djangoapps/instructor_task/tests/test_base.py
@@ -17,7 +17,7 @@ from xmodule.modulestore.django import modulestore
 from xmodule.modulestore.tests.factories import CourseFactory, ItemFactory
 from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
 
-from student.tests.factories import CourseEnrollmentFactory, UserFactory, AdminFactory
+from student.tests.factories import CourseEnrollmentFactory, UserFactory
 from courseware.model_data import StudentModule
 from courseware.tests.tests import LoginEnrollmentTestCase, TEST_DATA_MONGO_MODULESTORE
 
@@ -36,8 +36,8 @@ TEST_SECTION_NAME = "Problem"
 @override_settings(MODULESTORE=TEST_DATA_MONGO_MODULESTORE)
 class InstructorTaskTestCase(LoginEnrollmentTestCase, ModuleStoreTestCase):
     """
-    Test that all students' answers to a problem can be rescored after the
-    definition of the problem has been redefined.
+    Base test class for InstructorTask-related tests that require
+    the setup of a course and problem.
     """
     course = None
     current_user = None
@@ -67,16 +67,14 @@ class InstructorTaskTestCase(LoginEnrollmentTestCase, ModuleStoreTestCase):
 
     def login_username(self, username):
         """Login the user, given the `username`."""
-        self.login(InstructorTaskTestCase.get_user_email(username), "test")
-        self.current_user = username
+        if self.current_user != username:
+            self.login(InstructorTaskTestCase.get_user_email(username), "test")
+            self.current_user = username
 
     def _create_user(self, username, is_staff=False):
         """Creates a user and enrolls them in the test course."""
         email = InstructorTaskTestCase.get_user_email(username)
-        if (is_staff):
-            AdminFactory.create(username=username, email=email)
-        else:
-            UserFactory.create(username=username, email=email)
+        UserFactory.create(username=username, email=email, is_staff=is_staff)
         thisuser = User.objects.get(username=username)
         CourseEnrollmentFactory.create(user=thisuser, course_id=self.course.id)
         return thisuser
@@ -140,3 +138,12 @@ class InstructorTaskTestCase(LoginEnrollmentTestCase, ModuleStoreTestCase):
         response = instructor_task_status(mock_request)
         status = json.loads(response.content)
         return status
+
+    def create_task_request(self, requester_username):
+        """Generate request that can be used for submitting tasks"""
+        request = Mock()
+        request.user = User.objects.get(username=requester_username)
+        request.get_host = Mock(return_value="testhost")
+        request.META = {'REMOTE_ADDR': '0:0:0:0', 'SERVER_NAME': 'testhost'}
+        request.is_secure = Mock(return_value=False)
+        return request
diff --git a/lms/djangoapps/instructor_task/tests/test_integration.py b/lms/djangoapps/instructor_task/tests/test_integration.py
index 4132d305e26..4574a4c4abd 100644
--- a/lms/djangoapps/instructor_task/tests/test_integration.py
+++ b/lms/djangoapps/instructor_task/tests/test_integration.py
@@ -7,7 +7,7 @@ paths actually work.
 """
 import logging
 import json
-from mock import Mock, patch
+from mock import patch
 import textwrap
 
 from celery.states import SUCCESS, FAILURE
@@ -33,6 +33,9 @@ log = logging.getLogger(__name__)
 
 
 class TestIntegrationTask(InstructorTaskTestCase):
+    """
+    Base class to provide general methods used for "integration" testing of particular tasks. 
+    """
 
     def submit_student_answer(self, username, problem_url_name, responses):
         """
@@ -48,8 +51,7 @@ class TestIntegrationTask(InstructorTaskTestCase):
 
         # make sure that the requested user is logged in, so that the ajax call works
         # on the right problem:
-        if self.current_user != username:
-            self.login_username(username)
+        self.login_username(username)
         # make ajax call:
         modx_url = reverse('modx_dispatch',
                            kwargs={'course_id': self.course.id,
@@ -62,18 +64,13 @@ class TestIntegrationTask(InstructorTaskTestCase):
         })
         return resp
 
-    def create_task_request(self, requester_username):
-        """Generate request that can be used for submitting tasks"""
-        request = Mock()
-        request.user = User.objects.get(username=requester_username)
-        request.get_host = Mock(return_value="testhost")
-        request.META = {'REMOTE_ADDR': '0:0:0:0', 'SERVER_NAME': 'testhost'}
-        request.is_secure = Mock(return_value=False)
-        return request
-
 
 class TestRescoringTask(TestIntegrationTask):
-    """Test rescoring problems in a background task."""
+    """
+    Integration-style tests for rescoring problems in a background task.
+
+    Exercises real problems with a minimum of patching.
+    """
 
     def setUp(self):
         self.initialize_course()
@@ -90,8 +87,7 @@ class TestRescoringTask(TestIntegrationTask):
         """
         # make sure that the requested user is logged in, so that the ajax call works
         # on the right problem:
-        if self.current_user != username:
-            self.login_username(username)
+        self.login_username(username)
         # make ajax call:
         modx_url = reverse('modx_dispatch',
                            kwargs={'course_id': self.course.id,
@@ -109,11 +105,11 @@ class TestRescoringTask(TestIntegrationTask):
         Values checked include the number of attempts, the score, and the max score for a problem.
         """
         module = self.get_student_module(username, descriptor)
-        self.assertEqual(module.grade, expected_score, "Scores were not equal")
-        self.assertEqual(module.max_grade, expected_max_score, "Max scores were not equal")
+        self.assertEqual(module.grade, expected_score)
+        self.assertEqual(module.max_grade, expected_max_score)
         state = json.loads(module.state)
         attempts = state['attempts']
-        self.assertEqual(attempts, expected_attempts, "Attempts were not equal")
+        self.assertEqual(attempts, expected_attempts)
         if attempts > 0:
             self.assertTrue('correct_map' in state)
             self.assertTrue('student_answers' in state)
@@ -342,7 +338,11 @@ class TestRescoringTask(TestIntegrationTask):
 
 
 class TestResetAttemptsTask(TestIntegrationTask):
-    """Test resetting problem attempts in a background task."""
+    """
+    Integration-style tests for resetting problem attempts in a background task.
+
+    Exercises real problems with a minimum of patching.
+    """
     userlist = ['u1', 'u2', 'u3', 'u4']
 
     def setUp(self):
@@ -402,7 +402,7 @@ class TestResetAttemptsTask(TestIntegrationTask):
         self.assertEqual(instructor_task.task_type, 'reset_problem_attempts')
         task_input = json.loads(instructor_task.task_input)
         self.assertFalse('student' in task_input)
-        self.assertEqual(task_input['problem_url'], TestRescoringTask.problem_location(problem_url_name))
+        self.assertEqual(task_input['problem_url'], InstructorTaskTestCase.problem_location(problem_url_name))
         status = json.loads(instructor_task.task_output)
         self.assertEqual(status['exception'], 'ZeroDivisionError')
         self.assertEqual(status['message'], expected_message)
@@ -426,7 +426,11 @@ class TestResetAttemptsTask(TestIntegrationTask):
 
 
 class TestDeleteProblemTask(TestIntegrationTask):
-    """Test deleting problem state in a background task."""
+    """
+    Integration-style tests for deleting problem state in a background task.
+
+    Exercises real problems with a minimum of patching.
+    """
     userlist = ['u1', 'u2', 'u3', 'u4']
 
     def setUp(self):
@@ -479,7 +483,7 @@ class TestDeleteProblemTask(TestIntegrationTask):
         self.assertEqual(instructor_task.task_type, 'delete_problem_state')
         task_input = json.loads(instructor_task.task_input)
         self.assertFalse('student' in task_input)
-        self.assertEqual(task_input['problem_url'], TestRescoringTask.problem_location(problem_url_name))
+        self.assertEqual(task_input['problem_url'], InstructorTaskTestCase.problem_location(problem_url_name))
         status = json.loads(instructor_task.task_output)
         self.assertEqual(status['exception'], 'ZeroDivisionError')
         self.assertEqual(status['message'], expected_message)
diff --git a/lms/djangoapps/instructor_task/tests/test_tasks.py b/lms/djangoapps/instructor_task/tests/test_tasks.py
new file mode 100644
index 00000000000..7b90ace6dbd
--- /dev/null
+++ b/lms/djangoapps/instructor_task/tests/test_tasks.py
@@ -0,0 +1,258 @@
+"""
+Unit tests for LMS instructor-initiated background tasks, 
+
+Runs tasks on answers to course problems to validate that code
+paths actually work.
+
+"""
+import logging
+import json
+from uuid import uuid4
+
+from mock import Mock, patch
+
+from celery.states import SUCCESS, FAILURE
+
+from xmodule.modulestore.exceptions import ItemNotFoundError
+
+from courseware.model_data import StudentModule
+from courseware.tests.factories import StudentModuleFactory
+from student.tests.factories import UserFactory
+
+from instructor_task.models import InstructorTask
+from instructor_task.tests.test_base import InstructorTaskTestCase, TEST_COURSE_ORG, TEST_COURSE_NUMBER
+from instructor_task.tests.factories import InstructorTaskFactory
+from instructor_task.tasks import rescore_problem, reset_problem_attempts, delete_problem_state
+from instructor_task.tasks_helper import UpdateProblemModuleStateError
+
+log = logging.getLogger(__name__)
+PROBLEM_URL_NAME = "test_urlname"
+
+
+class TestTaskFailure(Exception):
+    pass
+
+
+class TestInstructorTasks(InstructorTaskTestCase):
+    def setUp(self):
+        super(InstructorTaskTestCase, self).setUp()
+        self.initialize_course()
+        self.instructor = self.create_instructor('instructor')
+        self.problem_url = InstructorTaskTestCase.problem_location(PROBLEM_URL_NAME)
+
+    def _create_input_entry(self, student_ident=None):
+        """Creates a InstructorTask entry for testing."""
+        task_id = str(uuid4())
+        task_input = {'problem_url': self.problem_url}
+        if student_ident is not None:
+            task_input['student'] = student_ident
+
+        instructor_task = InstructorTaskFactory.create(course_id=self.course.id,
+                                                       requester=self.instructor,
+                                                       task_input=json.dumps(task_input),
+                                                       task_key='dummy value',
+                                                       task_id=task_id)
+        return instructor_task
+
+    def _get_xmodule_instance_args(self):
+        """
+        Calculate dummy values for parameters needed for instantiating xmodule instances.
+        """
+        return {'xqueue_callback_url_prefix': 'dummy_value',
+                'request_info': {},
+                }
+
+    def _run_task_with_mock_celery(self, task_class, entry_id, task_id, expected_failure_message=None):
+        self.current_task = Mock()
+        self.current_task.request = Mock()
+        self.current_task.request.id = task_id
+        self.current_task.update_state = Mock()
+        if expected_failure_message is not None:
+            self.current_task.update_state.side_effect = TestTaskFailure(expected_failure_message)
+        with patch('instructor_task.tasks_helper._get_current_task') as mock_get_task:
+            mock_get_task.return_value = self.current_task
+            return task_class(entry_id, self._get_xmodule_instance_args())
+
+    def test_missing_current_task(self):
+        # run without (mock) Celery running
+        task_entry = self._create_input_entry()
+        with self.assertRaises(UpdateProblemModuleStateError):
+            reset_problem_attempts(task_entry.id, self._get_xmodule_instance_args())
+
+    def test_undefined_problem(self):
+        # run with celery, but no problem defined
+        task_entry = self._create_input_entry()
+        with self.assertRaises(ItemNotFoundError):
+            self._run_task_with_mock_celery(reset_problem_attempts, task_entry.id, task_entry.task_id)
+
+    def _assert_return_matches_entry(self, returned, entry_id):
+        entry = InstructorTask.objects.get(id=entry_id)
+        self.assertEquals(returned, json.loads(entry.task_output))
+
+    def _test_run_with_task(self, task_class, action_name, expected_num_updated):
+        # run with some StudentModules for the problem
+        task_entry = self._create_input_entry()
+        status = self._run_task_with_mock_celery(task_class, task_entry.id, task_entry.task_id)
+        # check return value
+        self.assertEquals(status.get('attempted'), expected_num_updated)
+        self.assertEquals(status.get('updated'), expected_num_updated)
+        self.assertEquals(status.get('total'), expected_num_updated)
+        self.assertEquals(status.get('action_name'), action_name)
+        self.assertTrue('duration_ms' in status)
+        # compare with entry in table:
+        entry = InstructorTask.objects.get(id=task_entry.id)
+        self.assertEquals(json.loads(entry.task_output), status)
+        self.assertEquals(entry.task_state, SUCCESS)
+
+    def _test_run_with_no_state(self, task_class, action_name):
+        # run with no StudentModules for the problem
+        self.define_option_problem(PROBLEM_URL_NAME)
+        self._test_run_with_task(task_class, action_name, 0)
+
+    def test_rescore_with_no_state(self):
+        self._test_run_with_no_state(rescore_problem, 'rescored')
+
+    def test_reset_with_no_state(self):
+        self._test_run_with_no_state(reset_problem_attempts, 'reset')
+
+    def test_delete_with_no_state(self):
+        self._test_run_with_no_state(delete_problem_state, 'deleted')
+
+    def _create_some_students(self, num_students, state=None):
+        self.define_option_problem(PROBLEM_URL_NAME)
+        students = [
+            UserFactory.create(username='robot%d' % i, email='robot+test+%d@edx.org' % i)
+            for i in xrange(num_students)
+        ]
+        for student in students:
+            StudentModuleFactory.create(course_id=self.course.id,
+                                        module_state_key=self.problem_url,
+                                        student=student,
+                                        state=state)
+        return students
+
+    def test_reset_with_some_state(self):
+        initial_attempts = 3
+        input_state = json.dumps({'attempts': initial_attempts})
+        num_students = 10
+        students = self._create_some_students(num_students, input_state)
+        # check that entries were set correctly
+        for student in students:
+            module = StudentModule.objects.get(course_id=self.course.id,
+                                               student=student,
+                                               module_state_key=self.problem_url)
+            state = json.loads(module.state)
+            self.assertEquals(state['attempts'], initial_attempts)
+        # run the task
+        self._test_run_with_task(reset_problem_attempts, 'reset', num_students)
+        # check that entries were reset
+        for student in students:
+            module = StudentModule.objects.get(course_id=self.course.id,
+                                               student=student,
+                                               module_state_key=self.problem_url)
+            state = json.loads(module.state)
+            self.assertEquals(state['attempts'], 0)
+
+    def test_delete_with_some_state(self):
+        # This will create StudentModule entries -- we don't have to worry about
+        # the state inside them.
+        num_students = 10
+        students = self._create_some_students(num_students)
+        # check that entries were created correctly
+        for student in students:
+            StudentModule.objects.get(course_id=self.course.id,
+                                               student=student,
+                                               module_state_key=self.problem_url)
+        self._test_run_with_task(delete_problem_state, 'deleted', num_students)
+        # confirm that no state can be found anymore:
+        for student in students:
+            with self.assertRaises(StudentModule.DoesNotExist):
+                StudentModule.objects.get(course_id=self.course.id,
+                                          student=student,
+                                          module_state_key=self.problem_url)
+
+    def _test_reset_with_student(self, use_email):
+        # run with some StudentModules for the problem
+        num_students = 10
+        initial_attempts = 3
+        input_state = json.dumps({'attempts': initial_attempts})
+        students = self._create_some_students(num_students, input_state)
+        # check that entries were set correctly
+        for student in students:
+            module = StudentModule.objects.get(course_id=self.course.id,
+                                               student=student,
+                                               module_state_key=self.problem_url)
+            state = json.loads(module.state)
+            self.assertEquals(state['attempts'], initial_attempts)
+
+        if use_email:
+            student_ident = students[3].email
+        else:
+            student_ident = students[3].username
+        task_entry = self._create_input_entry(student_ident)
+
+        status = self._run_task_with_mock_celery(reset_problem_attempts, task_entry.id, task_entry.task_id)
+        # check return value
+        self.assertEquals(status.get('attempted'), 1)
+        self.assertEquals(status.get('updated'), 1)
+        self.assertEquals(status.get('total'), 1)
+        self.assertEquals(status.get('action_name'), 'reset')
+        self.assertTrue('duration_ms' in status)
+        # compare with entry in table:
+        entry = InstructorTask.objects.get(id=task_entry.id)
+        self.assertEquals(json.loads(entry.task_output), status)
+        self.assertEquals(entry.task_state, SUCCESS)
+        # TODO: check that entries were reset
+
+    def test_reset_with_student_username(self):
+        self._test_reset_with_student(False)
+
+    def test_reset_with_student_email(self):
+        self._test_reset_with_student(True)
+
+    def _test_run_with_failure(self, task_class, expected_message):
+        # run with no StudentModules for the problem,
+        # because we will fail before entering the loop.
+        task_entry = self._create_input_entry()
+        self.define_option_problem(PROBLEM_URL_NAME)
+        try:
+            self._run_task_with_mock_celery(task_class, task_entry.id, task_entry.task_id, expected_message)
+        except TestTaskFailure:
+            pass
+        # compare with entry in table:
+        entry = InstructorTask.objects.get(id=task_entry.id)
+        self.assertEquals(entry.task_state, FAILURE)
+        output = json.loads(entry.task_output)
+        self.assertEquals(output['exception'], 'TestTaskFailure')
+        self.assertEquals(output['message'], expected_message)
+
+    def test_rescore_with_failure(self):
+        self._test_run_with_failure(rescore_problem, 'We expected this to fail')
+
+    def test_reset_with_failure(self):
+        self._test_run_with_failure(reset_problem_attempts, 'We expected this to fail')
+
+    def test_delete_with_failure(self):
+        self._test_run_with_failure(delete_problem_state, 'We expected this to fail')
+
+    def _test_run_with_long_error_msg(self, task_class):
+        # run with no StudentModules for the problem
+        task_entry = self._create_input_entry()
+        self.define_option_problem(PROBLEM_URL_NAME)
+        expected_message = "x" * 1500
+        try:
+            self._run_task_with_mock_celery(task_class, task_entry.id, task_entry.task_id, expected_message)
+        except TestTaskFailure:
+            pass
+        # compare with entry in table:
+        entry = InstructorTask.objects.get(id=task_entry.id)
+        self.assertEquals(entry.task_state, FAILURE)
+        # TODO: on MySQL this will actually fail, because it was truncated
+        # when it was persisted.  It does not fail on SqlLite3 at the moment,
+        # because it doesn't actually enforce length limits!
+        output = json.loads(entry.task_output)
+        self.assertEquals(output['exception'], 'TestTaskFailure')
+        self.assertEquals(output['message'], expected_message)
+
+    def test_rescore_with_long_error_msg(self):
+        self._test_run_with_long_error_msg(rescore_problem)
diff --git a/lms/djangoapps/instructor_task/views.py b/lms/djangoapps/instructor_task/views.py
index c5970645ff6..6e49dc1421d 100644
--- a/lms/djangoapps/instructor_task/views.py
+++ b/lms/djangoapps/instructor_task/views.py
@@ -32,7 +32,8 @@ def instructor_task_status(request):
       Task_id values that are unrecognized are skipped.
 
     The dict with status information for a task contains the following keys:
-      'message': status message reporting on progress, or providing exception message if failed.
+      'message': on complete tasks, status message reporting on final progress, 
+          or providing exception message if failed.
       'succeeded': on complete tasks, indicates if the task outcome was successful:
           did it achieve what it set out to do.
           This is in contrast with a successful task_state, which indicates that the
@@ -96,25 +97,44 @@ def get_task_completion_info(instructor_task):
     """
     succeeded = False
 
+    # if still in progress, then we assume there is no completion info to provide:
+    if instructor_task.task_state not in READY_STATES:
+        return (succeeded, "No status information available")
+
+    # we're more surprised if there is no output for a completed task, but just warn:
     if instructor_task.task_output is None:
         log.warning("No task_output information found for instructor_task {0}".format(instructor_task.task_id))
         return (succeeded, "No status information available")
 
-    task_output = json.loads(instructor_task.task_output)
-    if instructor_task.task_state in [FAILURE, REVOKED]:
-        return(succeeded, task_output['message'])
+    try:
+        task_output = json.loads(instructor_task.task_output)
+    except ValueError:
+        fmt = "No parsable task_output information found for instructor_task {0}: {1}"
+        log.warning(fmt.format(instructor_task.task_id, instructor_task.task_output))
+        return (succeeded, "No parsable status information available")
 
-    action_name = task_output['action_name']
-    num_attempted = task_output['attempted']
-    num_updated = task_output['updated']
-    num_total = task_output['total']
+    if instructor_task.task_state in [FAILURE, REVOKED]:
+        return (succeeded, task_output.get('message', 'No message provided'))
+
+    if any([key not in task_output for key in ['action_name', 'attempted', 'updated', 'total']]):
+        fmt = "Invalid task_output information found for instructor_task {0}: {1}"
+        log.warning(fmt.format(instructor_task.task_id, instructor_task.task_output))
+        return (succeeded, "No progress status information available")
+
+    action_name = task_output.get('action_name')
+    num_attempted = task_output.get('attempted')
+    num_updated = task_output.get('updated')
+    num_total = task_output.get('total')
+
+    student = None
+    try:
+        task_input = json.loads(instructor_task.task_input)
+    except ValueError:
+        fmt = "No parsable task_input information found for instructor_task {0}: {1}"
+        log.warning(fmt.format(instructor_task.task_id, instructor_task.task_input))
+    else:
+        student = task_input.get('student')
 
-    if instructor_task.task_input is None:
-        log.warning("No task_input information found for instructor_task {0}".format(instructor_task.task_id))
-        return (succeeded, "No status information available")
-    task_input = json.loads(instructor_task.task_input)
-    problem_url = task_input.get('problem_url')
-    student = task_input.get('student')
     if student is not None:
         if num_attempted == 0:
             msg_format = "Unable to find submission to be {action} for student '{student}'"
@@ -133,10 +153,11 @@ def get_task_completion_info(instructor_task):
     else:  # num_updated < num_attempted
         msg_format = "Problem {action} for {updated} of {attempted} students"
 
-    if student is not None and num_attempted != num_total:
+    if student is None and num_attempted != num_total:
         msg_format += " (out of {total})"
 
     # Update status in task result object itself:
-    message = msg_format.format(action=action_name, updated=num_updated, attempted=num_attempted, total=num_total,
-                                student=student, problem=problem_url)
+    message = msg_format.format(action=action_name, updated=num_updated,
+                                attempted=num_attempted, total=num_total,
+                                student=student)
     return (succeeded, message)
diff --git a/lms/envs/test.py b/lms/envs/test.py
index 5342d81a4ed..3ccfa240144 100644
--- a/lms/envs/test.py
+++ b/lms/envs/test.py
@@ -188,4 +188,3 @@ PASSWORD_HASHERS = (
     'django.contrib.auth.hashers.MD5PasswordHasher',
     # 'django.contrib.auth.hashers.CryptPasswordHasher',
 )
-
-- 
GitLab