Skip to content
Snippets Groups Projects
Unverified Commit 564ffcfb authored by Farhanah Sheets's avatar Farhanah Sheets Committed by GitHub
Browse files

Merge pull request #23528 from edx/revert-23267-saad/PROD-1287

Revert "[PROD-1287] - initial work to optimise problem_grade_report code."
parents 22c963a9 a458491f
No related merge requests found
......@@ -2,6 +2,7 @@
Functionality for generating grade reports.
"""
import logging
import re
from collections import OrderedDict, defaultdict
......@@ -68,95 +69,6 @@ def _flatten(iterable):
return list(chain.from_iterable(iterable))
class GradeReportBase(object):
"""
Base class for grade reports (ProblemGradeReport and CourseGradeReport).
"""
def _handle_empty_generator(self, generator, default):
"""
Handle empty generator.
Return default if the generator is emtpy, otherwise return all
its iterations (including the first which was used for validation).
"""
empty_generator_sentinel = object()
first_iteration_output = next(generator, empty_generator_sentinel)
generator_is_empty = first_iteration_output == empty_generator_sentinel
if generator_is_empty:
yield default
else:
yield first_iteration_output
for element in generator:
yield element
def _batch_users(self, context):
"""
Returns a generator of batches of users.
"""
def grouper(iterable, chunk_size=100, fillvalue=None):
args = [iter(iterable)] * chunk_size
return zip_longest(*args, fillvalue=fillvalue)
def get_enrolled_learners_for_course(course_id, verified_only=False):
"""
Get all the enrolled users in a course chunk by chunk.
This generator method fetches & loads the enrolled user objects on demand which in chunk
size defined. This method is a workaround to avoid out-of-memory errors.
"""
filter_kwargs = {
'courseenrollment__course_id': course_id,
}
if verified_only:
filter_kwargs['courseenrollment__mode'] = CourseMode.VERIFIED
user_ids_list = get_user_model().objects.filter(**filter_kwargs).values_list('id', flat=True).order_by('id')
user_chunks = grouper(user_ids_list)
for user_ids in user_chunks:
user_ids = [user_id for user_id in user_ids if user_id is not None]
min_id = min(user_ids)
max_id = max(user_ids)
users = get_user_model().objects.filter(
id__gte=min_id,
id__lte=max_id,
**filter_kwargs
).select_related('profile')
yield users
course_id = context.course_id
report_for_verified_only = generate_grade_report_for_verified_only()
return get_enrolled_learners_for_course(course_id=course_id, verified_only=report_for_verified_only)
def _compile(self, context, batched_rows):
"""
Compiles and returns the complete list of (success_rows, error_rows) for
the given batched_rows and context.
"""
# partition and chain successes and errors
success_rows, error_rows = zip(*batched_rows)
success_rows = list(chain(*success_rows))
error_rows = list(chain(*error_rows))
# update metrics on task status
context.task_progress.succeeded = len(success_rows)
context.task_progress.failed = len(error_rows)
context.task_progress.attempted = context.task_progress.succeeded + context.task_progress.failed
context.task_progress.total = context.task_progress.attempted
return success_rows, error_rows
def _upload(self, context, success_rows, error_rows):
"""
Creates and uploads a CSV for the given headers and rows.
"""
date = datetime.now(UTC)
upload_csv_to_report_store(success_rows, context.file_name, context.course_id, date)
if len(error_rows) > 1:
upload_csv_to_report_store(error_rows, context.file_name + '_err', context.course_id, date)
class _CourseGradeReportContext(object):
"""
Internal class that provides a common context to use for a single grade
......@@ -164,7 +76,6 @@ class _CourseGradeReportContext(object):
elements of this context are serialized and parsed across process
boundaries.
"""
def __init__(self, _xmodule_instance_args, _entry_id, course_id, _task_input, action_name):
self.task_info_string = (
u'Task: {task_id}, '
......@@ -245,41 +156,6 @@ class _CourseGradeReportContext(object):
return self.task_progress.update_task_state(extra_meta={'step': message})
class _ProblemGradeReportContext(object):
"""
Internal class that provides a common context to use for a single problem
grade report. When a report is parallelized across multiple processes,
elements of this context are serialized and parsed across process
boundaries.
"""
def __init__(self, _xmodule_instance_args, _entry_id, course_id, _task_input, action_name):
self.task_info_string = (
'Task: {task_id}, '
'InstructorTask ID: {entry_id}, '
'Course: {course_id}, '
'Input: {task_input}'
).format(
task_id=_xmodule_instance_args.get('task_id') if _xmodule_instance_args is not None else None,
entry_id=_entry_id,
course_id=course_id,
task_input=_task_input,
)
self.action_name = action_name
self.course_id = course_id
self.task_progress = TaskProgress(self.action_name, total=None, start_time=time())
self.course = get_course_by_id(self.course_id)
self.file_name = 'problem_grade_report'
def update_status(self, message):
"""
Updates the status on the celery task to the given message.
Also logs the update.
"""
TASK_LOG.info('%s, Task type: %s, %s', self.task_info_string, self.action_name, message)
return self.task_progress.update_task_state(extra_meta={'step': message})
class _CertificateBulkContext(object):
def __init__(self, context, users):
certificate_whitelist = CertificateWhitelist.objects.filter(course_id=context.course_id, whitelist=True)
......@@ -426,7 +302,6 @@ class CourseGradeReport(object):
"""
Returns a generator of batches of users.
"""
def grouper(iterable, chunk_size=self.USER_BATCH_SIZE, fillvalue=None):
args = [iter(iterable)] * chunk_size
return zip_longest(*args, fillvalue=fillvalue)
......@@ -644,139 +519,131 @@ class CourseGradeReport(object):
return success_rows, error_rows
class ProblemGradeReport(GradeReportBase):
"""
Class to encapsulate functionality related to generating Problem Grade Reports.
"""
class ProblemGradeReport(object):
@classmethod
def generate(cls, _xmodule_instance_args, _entry_id, course_id, _task_input, action_name):
"""
Public method to generate a grade report.
"""
with modulestore().bulk_operations(course_id):
context = _ProblemGradeReportContext(_xmodule_instance_args, _entry_id, course_id, _task_input, action_name)
# pylint: disable=protected-access
return ProblemGradeReport()._generate(context)
def _generate(self, context):
"""
Generate a CSV containing all students' problem grades within a given
`course_id`.
"""
context.update_status('ProblemGradeReport Step 1: Starting problem grades')
course = get_course_by_id(context.course_id)
header_row = OrderedDict([('id', 'Student ID'), ('email', 'Email'), ('username', 'Username')])
context.update_status('ProblemGradeReport Step 2: Retrieving scorable grade blocks for course')
graded_scorable_blocks = self._graded_scorable_blocks_to_header(course)
context.update_status('ProblemGradeReport Step 3: Setting up headers for problem grade report')
success_headers = self._success_headers(header_row, graded_scorable_blocks)
error_headers = self._error_headers(header_row)
context.update_status('ProblemGradeReport Step 4: Retrieving problem scores for course for enrolled users')
generated_rows = self._batched_rows(context, header_row, graded_scorable_blocks)
success_rows, error_rows = self._compile(context, generated_rows)
context.update_status('ProblemGradeReport Step 5: Uploading data to CSV report file')
self._upload(context, [success_headers] + success_rows, [error_headers] + error_rows)
return context.update_status('ProblemGradeReport Step 6: Completed problem grades report')
def log_task_info(message):
"""
Updates the status on the celery task to the given message.
Also logs the update.
"""
fmt = u'Task: {task_id}, InstructorTask ID: {entry_id}, Course: {course_id}, Input: {task_input}'
task_info_string = fmt.format(
task_id=task_id, entry_id=_entry_id, course_id=course_id, task_input=_task_input
)
TASK_LOG.info(u'%s, Task type: %s, %s, %s', task_info_string, action_name, message, task_progress.state)
def _success_headers(self, header_row, graded_scorable_blocks):
"""
Returns headers for all gradable blocks including fixed headers
for report.
start_time = time()
start_date = datetime.now(UTC)
status_interval = 100
task_id = _xmodule_instance_args.get('task_id') if _xmodule_instance_args is not None else None
Arguments:
header_row (dict): list of header values
graded_scorable_blocks (dict): dict of graded_scorable_blocks
report_for_verified_only = generate_grade_report_for_verified_only()
enrolled_students = CourseEnrollment.objects.users_enrolled_in(
course_id=course_id,
include_inactive=True,
verified_only=report_for_verified_only,
)
task_progress = TaskProgress(action_name, enrolled_students.count(), start_time)
Returns:
list: combined header and scorable blocks
"""
header_row = list(header_row.values()) + ['Enrollment Status', 'Grade']
return header_row + _flatten(list(graded_scorable_blocks.values()))
# This struct encapsulates both the display names of each static item in the
# header row as values as well as the django User field names of those items
# as the keys. It is structured in this way to keep the values related.
header_row = OrderedDict([('id', 'Student ID'), ('email', 'Email'), ('username', 'Username')])
def _error_headers(self, header_row):
"""
Returns error headers for error report.
course = get_course_by_id(course_id)
log_task_info(u'Retrieving graded scorable blocks')
graded_scorable_blocks = cls._graded_scorable_blocks_to_header(course)
Arguments:
header_row (dict): list of header values
# Just generate the static fields for now.
rows = [
list(header_row.values()) + ['Enrollment Status', 'Grade'] + _flatten(list(graded_scorable_blocks.values()))
]
error_rows = [list(header_row.values()) + ['error_msg']]
Returns:
list: error headers
"""
return list(header_row.values()) + ['error_msg']
# Bulk fetch and cache enrollment states so we can efficiently determine
# whether each user is currently enrolled in the course.
log_task_info(u'Fetching enrollment status')
CourseEnrollment.bulk_fetch_enrollment_states(enrolled_students, course_id)
def _graded_scorable_blocks_to_header(self, course):
"""
Returns an OrderedDict that maps a scorable block's id to its
headers in the final report.
"""
scorable_blocks_map = OrderedDict()
grading_context = grades_context.grading_context_for_course(course)
for assignment_type_name, subsection_infos in six.iteritems(grading_context['all_graded_subsections_by_type']):
for subsection_index, subsection_info in enumerate(subsection_infos, start=1):
for scorable_block in subsection_info['scored_descendants']:
header_name = (
"{assignment_type} {subsection_index}: "
"{subsection_name} - {scorable_block_name}"
).format(
scorable_block_name=scorable_block.display_name,
assignment_type=assignment_type_name,
subsection_index=subsection_index,
subsection_name=subsection_info['subsection_block'].display_name,
)
scorable_blocks_map[scorable_block.location] = [header_name + " (Earned)",
header_name + " (Possible)"]
return scorable_blocks_map
def _rows_for_users(self, context, users, header_row, graded_scorable_blocks):
"""
Returns a list of rows for the given users for this report.
"""
success_rows, error_rows = [], []
for student, course_grade, error in CourseGradeFactory().iter(users, context.course):
for student, course_grade, error in CourseGradeFactory().iter(enrolled_students, course):
student_fields = [getattr(student, field_name) for field_name in header_row]
task_progress.attempted += 1
if not course_grade:
err_msg = text_type(error)
# There was an error grading this student.
if not err_msg:
err_msg = 'Unknown error'
err_msg = u'Unknown error'
error_rows.append(student_fields + [err_msg])
task_progress.failed += 1
continue
enrollment_status = _user_enrollment_status(student, context.course_id)
enrollment_status = _user_enrollment_status(student, course_id)
earned_possible_values = []
for block_location in graded_scorable_blocks:
try:
problem_score = course_grade.problem_scores[block_location]
except KeyError:
earned_possible_values.append(['Not Available', 'Not Available'])
earned_possible_values.append([u'Not Available', u'Not Available'])
else:
if problem_score.first_attempted:
earned_possible_values.append([problem_score.earned, problem_score.possible])
else:
earned_possible_values.append(['Not Attempted', problem_score.possible])
earned_possible_values.append([u'Not Attempted', problem_score.possible])
rows.append(student_fields + [enrollment_status, course_grade.percent] + _flatten(earned_possible_values))
task_progress.succeeded += 1
if task_progress.attempted % status_interval == 0:
step = u'Calculating Grades'
task_progress.update_task_state(extra_meta={'step': step})
log_message = u'{0} {1}/{2}'.format(step, task_progress.attempted, task_progress.total)
log_task_info(log_message)
log_task_info('Uploading CSV to store')
# Perform the upload if any students have been successfully graded
if len(rows) > 1:
upload_csv_to_report_store(rows, 'problem_grade_report', course_id, start_date)
# If there are any error rows, write them out as well
if len(error_rows) > 1:
upload_csv_to_report_store(error_rows, 'problem_grade_report_err', course_id, start_date)
success_rows.append(
student_fields + [enrollment_status, course_grade.percent] + _flatten(earned_possible_values))
return task_progress.update_task_state(extra_meta={'step': 'Uploading CSV'})
return success_rows, error_rows
def _batched_rows(self, context, header_row, graded_scorable_blocks):
@classmethod
def _graded_scorable_blocks_to_header(cls, course):
"""
A generator of batches of (success_rows, error_rows) for this report.
Returns an OrderedDict that maps a scorable block's id to its
headers in the final report.
"""
for users in self._handle_empty_generator(self._batch_users(context), default=[]):
yield self._rows_for_users(context, users, header_row, graded_scorable_blocks)
scorable_blocks_map = OrderedDict()
grading_context = grades_context.grading_context_for_course(course)
for assignment_type_name, subsection_infos in six.iteritems(grading_context['all_graded_subsections_by_type']):
for subsection_index, subsection_info in enumerate(subsection_infos, start=1):
for scorable_block in subsection_info['scored_descendants']:
header_name = (
u"{assignment_type} {subsection_index}: "
u"{subsection_name} - {scorable_block_name}"
).format(
scorable_block_name=scorable_block.display_name,
assignment_type=assignment_type_name,
subsection_index=subsection_index,
subsection_name=subsection_info['subsection_block'].display_name,
)
scorable_blocks_map[scorable_block.location] = [header_name + " (Earned)",
header_name + " (Possible)"]
return scorable_blocks_map
class ProblemResponses(object):
"""
Class to encapsulate functionality related to generating Problem Responses Reports.
"""
@classmethod
def _build_problem_list(cls, course_blocks, root, path=None):
"""
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment