Skip to content
Snippets Groups Projects
Commit 58bacb4e authored by Brian Wilson's avatar Brian Wilson
Browse files

Rename some constants, and refactor bulk email task flow.

parent 03b4330c
No related merge requests found
......@@ -132,10 +132,57 @@ def _get_course_email_context(course):
return email_context
def _generate_subtasks(create_subtask_fcn, recipient_qset):
"""
Generates a list of subtasks to send email to a given set of recipients.
Arguments:
`create_subtask_fcn` : a function whose inputs are a list of recipients and a subtask_id
to assign to the new subtask. Returns the subtask that will send email to that
list of recipients.
`recipient_qset` : a query set that defines the recipients who should receive emails.
Returns: a tuple, containing:
* A list of subtasks that will send emails to all recipients.
* A list of subtask_ids corresponding to those subtasks.
* A count of the total number of emails being sent.
"""
total_num_emails = recipient_qset.count()
num_queries = int(math.ceil(float(total_num_emails) / float(settings.BULK_EMAIL_EMAILS_PER_QUERY)))
last_pk = recipient_qset[0].pk - 1
num_emails_queued = 0
task_list = []
subtask_id_list = []
for _ in range(num_queries):
recipient_sublist = list(recipient_qset.order_by('pk').filter(pk__gt=last_pk).values('profile__name', 'email', 'pk')[:settings.BULK_EMAIL_EMAILS_PER_QUERY])
last_pk = recipient_sublist[-1]['pk']
num_emails_this_query = len(recipient_sublist)
num_tasks_this_query = int(math.ceil(float(num_emails_this_query) / float(settings.BULK_EMAIL_EMAILS_PER_TASK)))
chunk = int(math.ceil(float(num_emails_this_query) / float(num_tasks_this_query)))
for i in range(num_tasks_this_query):
to_list = recipient_sublist[i * chunk:i * chunk + chunk]
subtask_id = str(uuid4())
subtask_id_list.append(subtask_id)
new_subtask = create_subtask_fcn(to_list, subtask_id)
task_list.append(new_subtask)
num_emails_queued += num_emails_this_query
# Sanity check: we expect the chunking to be properly summing to the original count:
if num_emails_queued != total_num_emails:
error_msg = "Task {}: number of emails generated by chunking {} not equal to original total {}".format(num_emails_queued, total_num_emails)
log.error(error_msg)
raise ValueError(error_msg)
return task_list, subtask_id_list, total_num_emails
def perform_delegate_email_batches(entry_id, course_id, task_input, action_name):
"""
Delegates emails by querying for the list of recipients who should
get the mail, chopping up into batches of settings.EMAILS_PER_TASK size,
get the mail, chopping up into batches of settings.BULK_EMAIL_EMAILS_PER_TASK size,
and queueing up worker jobs.
Returns the number of batches (workers) kicked off.
......@@ -151,86 +198,62 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
format_msg = "Course id conflict: explicit value {} does not match task value {}"
raise ValueError(format_msg.format(course_id, entry.course_id))
# Fetch the CourseEmail.
email_id = task_input['email_id']
try:
email_obj = CourseEmail.objects.get(id=email_id)
except CourseEmail.DoesNotExist as exc:
except CourseEmail.DoesNotExist:
# The CourseEmail object should be committed in the view function before the task
# is submitted and reaches this point.
log.warning("Task %s: Failed to get CourseEmail with id %s", task_id, email_id)
raise
to_option = email_obj.to_option
# Sanity check that course for email_obj matches that of the task referencing it.
if course_id != email_obj.course_id:
format_msg = "Course id conflict: explicit value {} does not match email value {}"
raise ValueError(format_msg.format(course_id, email_obj.course_id))
# Fetch the course object.
try:
course = get_course(course_id)
except ValueError:
log.exception("Task %s: course not found: %s", task_id, course_id)
raise
global_email_context = _get_course_email_context(course)
to_option = email_obj.to_option
recipient_qset = _get_recipient_queryset(user_id, to_option, course_id, course.location)
total_num_emails = recipient_qset.count()
log.info("Task %s: Preparing to queue emails to %d recipient(s) for course %s, email %s, to_option %s",
task_id, total_num_emails, course_id, email_id, to_option)
num_queries = int(math.ceil(float(total_num_emails) / float(settings.EMAILS_PER_QUERY)))
last_pk = recipient_qset[0].pk - 1
num_emails_queued = 0
task_list = []
subtask_id_list = []
for _ in range(num_queries):
recipient_sublist = list(recipient_qset.order_by('pk').filter(pk__gt=last_pk)
.values('profile__name', 'email', 'pk')[:settings.EMAILS_PER_QUERY])
last_pk = recipient_sublist[-1]['pk']
num_emails_this_query = len(recipient_sublist)
num_tasks_this_query = int(math.ceil(float(num_emails_this_query) / float(settings.EMAILS_PER_TASK)))
chunk = int(math.ceil(float(num_emails_this_query) / float(num_tasks_this_query)))
for i in range(num_tasks_this_query):
to_list = recipient_sublist[i * chunk:i * chunk + chunk]
subtask_id = str(uuid4())
subtask_id_list.append(subtask_id)
subtask_status = create_subtask_status(subtask_id)
# Create subtask, passing args and kwargs.
# This includes specifying the task_id to use, so we can track it.
# Specify the routing key as part of it, which is used by
# Celery to route the task request to the right worker.
new_subtask = send_course_email.subtask(
(
entry_id,
email_id,
to_list,
global_email_context,
subtask_status,
),
task_id=subtask_id,
routing_key=settings.BULK_EMAIL_ROUTING_KEY,
)
task_list.append(new_subtask)
num_emails_queued += num_emails_this_query
global_email_context = _get_course_email_context(course)
# Sanity check: we expect the chunking to be properly summing to the original count:
if num_emails_queued != total_num_emails:
error_msg = "Task {}: number of emails generated by chunking {} not equal to original total {}".format(
task_id, num_emails_queued, total_num_emails
def _create_send_email_subtask(to_list, subtask_id):
"""Creates a subtask to send email to a given recipient list."""
subtask_status = create_subtask_status(subtask_id)
new_subtask = send_course_email.subtask(
(
entry_id,
email_id,
to_list,
global_email_context,
subtask_status,
),
task_id=subtask_id,
routing_key=settings.BULK_EMAIL_ROUTING_KEY,
)
log.error(error_msg)
raise Exception(error_msg)
return new_subtask
log.info("Task %s: Preparing to generate subtasks for course %s, email %s, to_option %s",
task_id, course_id, email_id, to_option)
task_list, subtask_id_list, total_num_emails = _generate_subtasks(_create_send_email_subtask, recipient_qset)
# Update the InstructorTask with information about the subtasks we've defined.
log.info("Task %s: Preparing to update task for sending %d emails for course %s, email %s, to_option %s",
task_id, total_num_emails, course_id, email_id, to_option)
progress = initialize_subtask_info(entry, action_name, total_num_emails, subtask_id_list)
num_subtasks = len(subtask_id_list)
log.info("Preparing to queue %d email tasks (%d emails) for course %s, email %s, to %s",
num_subtasks, total_num_emails, course_id, email_id, to_option)
# Now group the subtasks, and start them running. This allows all the subtasks
# in the list to be submitted at the same time.
log.info("Task %s: Preparing to queue %d email tasks (%d emails) for course %s, email %s, to %s",
task_id, num_subtasks, total_num_emails, course_id, email_id, to_option)
task_group = group(task_list)
task_group.apply_async(routing_key=settings.BULK_EMAIL_ROUTING_KEY)
......@@ -328,6 +351,49 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask
return new_subtask_status
def _filter_optouts_from_recipients(to_list, course_id):
"""
Filters a recipient list based on student opt-outs for a given course.
Returns the filtered recipient list, as well as the number of optouts
removed from the list.
"""
optouts = Optout.objects.filter(
course_id=course_id,
user__in=[i['pk'] for i in to_list]
).values_list('user__email', flat=True)
optouts = set(optouts)
# Only count the num_optout for the first time the optouts are calculated.
# We assume that the number will not change on retries, and so we don't need
# to calculate it each time.
num_optout = len(optouts)
to_list = [recipient for recipient in to_list if recipient['email'] not in optouts]
return to_list, num_optout
def _get_source_address(course_id, course_title):
"""
Calculates an email address to be used as the 'from-address' for sent emails.
Makes a unique from name and address for each course, e.g.
"COURSE_TITLE" Course Staff <coursenum-no-reply@courseupdates.edx.org>
"""
course_title_no_quotes = re.sub(r'"', '', course_title)
# The course_id is assumed to be in the form 'org/course_num/run',
# so pull out the course_num. Then make sure that it can be used
# in an email address, by substituting a '_' anywhere a non-(ascii, period, or dash)
# character appears.
course_num = course_id.split('/')[1]
INVALID_CHARS = re.compile(r"[^\w.-]")
course_num = INVALID_CHARS.sub('_', course_num)
from_addr = '"{0}" Course Staff <{1}-{2}>'.format(course_title_no_quotes, course_num, settings.BULK_EMAIL_DEFAULT_FROM_EMAIL)
return from_addr
def _send_course_email(entry_id, email_id, to_list, global_email_context, subtask_status):
"""
Performs the email sending task.
......@@ -371,9 +437,6 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
# Get information from current task's request:
task_id = subtask_status['task_id']
# If this is a second attempt due to rate-limits, then throttle the speed at which mail is sent:
throttle = subtask_status['retried_nomax'] > 0
# collect stats on progress:
num_optout = 0
num_sent = 0
......@@ -392,30 +455,11 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
# that existed at that time, and we don't need to keep checking for changes
# in the Optout list.
if (subtask_status['retried_nomax'] + subtask_status['retried_withmax']) == 0:
optouts = (Optout.objects.filter(course_id=course_email.course_id,
user__in=[i['pk'] for i in to_list])
.values_list('user__email', flat=True))
optouts = set(optouts)
# Only count the num_optout for the first time the optouts are calculated.
# We assume that the number will not change on retries, and so we don't need
# to calculate it each time.
num_optout = len(optouts)
to_list = [recipient for recipient in to_list if recipient['email'] not in optouts]
to_list, num_optout = _filter_optouts_from_recipients(to_list, course_email.course_id)
course_title = global_email_context['course_title']
subject = "[" + course_title + "] " + course_email.subject
course_title_no_quotes = re.sub(r'"', '', course_title)
course_num = course_email.course_id.split('/')[1] # course_id = 'org/course_num/run'
# Substitute a '_' anywhere a non-(ascii, period, or dash) character appears.
INVALID_CHARS = re.compile(r"[^\w.-]")
course_num = INVALID_CHARS.sub('_', course_num)
# Make a unique from name and address for each course, eg
# "COURSE_TITLE" Course Staff <coursenum-no-reply@courseupdates.edx.org>
from_addr = '"{0}" Course Staff <{1}-{2}>'.format(
course_title_no_quotes, course_num, settings.DEFAULT_BULK_FROM_EMAIL
)
from_addr = _get_source_address(course_email.course_id, course_title)
course_email_template = CourseEmailTemplate.get_template()
try:
......@@ -423,17 +467,19 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
connection.open()
# Define context values to use in all course emails:
email_context = {
'name': '',
'email': ''
}
email_context = {'name': '', 'email': ''}
email_context.update(global_email_context)
while to_list:
# Update context with user-specific values from the user at the end of the list:
email = to_list[-1]['email']
# Update context with user-specific values from the user at the end of the list.
# At the end of processing this user, they will be popped off of the to_list.
# That way, the to_list will always contain the recipients remaining to be emailed.
# This is convenient for retries, which will need to send to those who haven't
# yet been emailed, but not send to those who have already been sent to.
current_recipient = to_list[-1]
email = current_recipient['email']
email_context['email'] = email
email_context['name'] = to_list[-1]['profile__name']
email_context['name'] = current_recipient['profile__name']
# Construct message content using templates and context:
plaintext_msg = course_email_template.render_plaintext(course_email.text_message, email_context)
......@@ -454,7 +500,7 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
# for a period of time between all emails within this task. Choice of
# the value depends on the number of workers that might be sending email in
# parallel, and what the SES throttle rate is.
if throttle:
if subtask_status['retried_nomax'] > 0:
sleep(settings.BULK_EMAIL_RETRY_DELAY_BETWEEN_SENDS)
try:
......@@ -488,7 +534,9 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
log.debug('Email with id %s sent to %s', email_id, email)
num_sent += 1
# Pop the user that was emailed off the end of the list:
# Pop the user that was emailed off the end of the list only once they have
# successfully been processed. (That way, if there were a failure that
# needed to be retried, the user is still on the list.)
to_list.pop()
except INFINITE_RETRY_ERRORS as exc:
......
......@@ -243,7 +243,7 @@ class TestEmailSendFromDashboard(ModuleStoreTestCase):
[self.instructor.email] + [s.email for s in self.staff] + [s.email for s in self.students]
)
@override_settings(EMAILS_PER_TASK=3, EMAILS_PER_QUERY=7)
@override_settings(BULK_EMAIL_EMAILS_PER_TASK=3, BULK_EMAIL_EMAILS_PER_QUERY=7)
@patch('bulk_email.tasks.increment_subtask_status')
def test_chunked_queries_send_numerous_emails(self, email_mock):
"""
......
......@@ -76,7 +76,7 @@ class TestEmailErrors(ModuleStoreTestCase):
# have every fourth email fail due to blacklisting:
get_conn.return_value.send_messages.side_effect = cycle([SMTPDataError(554, "Email address is blacklisted"),
None, None, None])
students = [UserFactory() for _ in xrange(settings.EMAILS_PER_TASK)]
students = [UserFactory() for _ in xrange(settings.BULK_EMAIL_EMAILS_PER_TASK)]
for student in students:
CourseEnrollmentFactory.create(user=student, course_id=self.course.id)
......@@ -93,9 +93,9 @@ class TestEmailErrors(ModuleStoreTestCase):
# Test that after the rejected email, the rest still successfully send
((_initial_results), kwargs) = result.call_args
self.assertEquals(kwargs['skipped'], 0)
expected_fails = int((settings.EMAILS_PER_TASK + 3) / 4.0)
expected_fails = int((settings.BULK_EMAIL_EMAILS_PER_TASK + 3) / 4.0)
self.assertEquals(kwargs['failed'], expected_fails)
self.assertEquals(kwargs['succeeded'], settings.EMAILS_PER_TASK - expected_fails)
self.assertEquals(kwargs['succeeded'], settings.BULK_EMAIL_EMAILS_PER_TASK - expected_fails)
@patch('bulk_email.tasks.get_connection', autospec=True)
@patch('bulk_email.tasks.send_course_email.retry')
......
......@@ -189,7 +189,7 @@ class TestBulkEmailInstructorTask(InstructorTaskCourseTestCase):
def test_successful(self):
# Select number of emails to fit into a single subtask.
num_emails = settings.EMAILS_PER_TASK
num_emails = settings.BULK_EMAIL_EMAILS_PER_TASK
# We also send email to the instructor:
self._create_students(num_emails - 1)
with patch('bulk_email.tasks.get_connection', autospec=True) as get_conn:
......@@ -198,7 +198,7 @@ class TestBulkEmailInstructorTask(InstructorTaskCourseTestCase):
def test_unactivated_user(self):
# Select number of emails to fit into a single subtask.
num_emails = settings.EMAILS_PER_TASK
num_emails = settings.BULK_EMAIL_EMAILS_PER_TASK
# We also send email to the instructor:
students = self._create_students(num_emails - 1)
# mark a student as not yet having activated their email:
......@@ -211,7 +211,7 @@ class TestBulkEmailInstructorTask(InstructorTaskCourseTestCase):
def test_skipped(self):
# Select number of emails to fit into a single subtask.
num_emails = settings.EMAILS_PER_TASK
num_emails = settings.BULK_EMAIL_EMAILS_PER_TASK
# We also send email to the instructor:
students = self._create_students(num_emails - 1)
# have every fourth student optout:
......@@ -227,7 +227,7 @@ class TestBulkEmailInstructorTask(InstructorTaskCourseTestCase):
def _test_email_address_failures(self, exception):
"""Test that celery handles bad address errors by failing and not retrying."""
# Select number of emails to fit into a single subtask.
num_emails = settings.EMAILS_PER_TASK
num_emails = settings.BULK_EMAIL_EMAILS_PER_TASK
# We also send email to the instructor:
self._create_students(num_emails - 1)
expected_fails = int((num_emails + 3) / 4.0)
......
......@@ -141,9 +141,9 @@ PAID_COURSE_REGISTRATION_CURRENCY = ENV_TOKENS.get('PAID_COURSE_REGISTRATION_CUR
PAID_COURSE_REGISTRATION_CURRENCY)
# Bulk Email overrides
DEFAULT_BULK_FROM_EMAIL = ENV_TOKENS.get('DEFAULT_BULK_FROM_EMAIL', DEFAULT_BULK_FROM_EMAIL)
EMAILS_PER_TASK = ENV_TOKENS.get('EMAILS_PER_TASK', EMAILS_PER_TASK)
EMAILS_PER_QUERY = ENV_TOKENS.get('EMAILS_PER_QUERY', EMAILS_PER_QUERY)
BULK_EMAIL_DEFAULT_FROM_EMAIL = ENV_TOKENS.get('BULK_EMAIL_DEFAULT_FROM_EMAIL', BULK_EMAIL_DEFAULT_FROM_EMAIL)
BULK_EMAIL_EMAILS_PER_TASK = ENV_TOKENS.get('BULK_EMAIL_EMAILS_PER_TASK', BULK_EMAIL_EMAILS_PER_TASK)
BULK_EMAIL_EMAILS_PER_QUERY = ENV_TOKENS.get('BULK_EMAIL_EMAILS_PER_QUERY', BULK_EMAIL_EMAILS_PER_QUERY)
BULK_EMAIL_DEFAULT_RETRY_DELAY = ENV_TOKENS.get('BULK_EMAIL_DEFAULT_RETRY_DELAY', BULK_EMAIL_DEFAULT_RETRY_DELAY)
BULK_EMAIL_MAX_RETRIES = ENV_TOKENS.get('BULK_EMAIL_MAX_RETRIES', BULK_EMAIL_MAX_RETRIES)
BULK_EMAIL_INFINITE_RETRY_CAP = ENV_TOKENS.get('BULK_EMAIL_INFINITE_RETRY_CAP', BULK_EMAIL_INFINITE_RETRY_CAP)
......@@ -151,6 +151,7 @@ BULK_EMAIL_LOG_SENT_EMAILS = ENV_TOKENS.get('BULK_EMAIL_LOG_SENT_EMAILS', BULK_E
BULK_EMAIL_RETRY_DELAY_BETWEEN_SENDS = ENV_TOKENS.get('BULK_EMAIL_RETRY_DELAY_BETWEEN_SENDS', BULK_EMAIL_RETRY_DELAY_BETWEEN_SENDS)
# We want Bulk Email running on the high-priority queue, so we define the
# routing key that points to it. At the moment, the name is the same.
# We have to reset the value here, since we have changed the value of the queue name.
BULK_EMAIL_ROUTING_KEY = HIGH_PRIORITY_QUEUE
# Theme overrides
......
......@@ -815,9 +815,13 @@ CELERYD_HIJACK_ROOT_LOGGER = False
################################ Bulk Email ###################################
DEFAULT_BULK_FROM_EMAIL = 'no-reply@courseupdates.edx.org'
EMAILS_PER_TASK = 100
EMAILS_PER_QUERY = 1000
# Suffix used to construct 'from' email address for bulk emails.
# A course-specific identifier is prepended.
BULK_EMAIL_DEFAULT_FROM_EMAIL = 'no-reply@courseupdates.edx.org'
# Parameters for breaking down course enrollment into subtasks.
BULK_EMAIL_EMAILS_PER_TASK = 100
BULK_EMAIL_EMAILS_PER_QUERY = 1000
# Initial delay used for retrying tasks. Additional retries use
# longer delays. Value is in seconds.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment