From c1fe3c3a93eecaca1fcd70af2c4c769cf6d43eb5 Mon Sep 17 00:00:00 2001 From: Muhammad Soban Javed <58461728+iamsobanjaved@users.noreply.github.com> Date: Mon, 9 Nov 2020 23:43:47 +0500 Subject: [PATCH] Revert ""Update routing config" (#25536)" (#25549) This reverts commit 39a22734c1228fade76bf7766e61a2a6e7ca0f68. --- cms/celery.py | 34 +++++++-- cms/djangoapps/contentstore/tasks.py | 2 +- cms/envs/production.py | 19 +---- common/djangoapps/entitlements/tasks.py | 6 +- lms/celery.py | 16 ++-- lms/djangoapps/bulk_email/tasks.py | 3 + lms/djangoapps/discussion/tasks.py | 3 +- lms/djangoapps/email_marketing/tasks.py | 9 ++- .../management/commands/compute_grades.py | 2 +- .../commands/tests/test_compute_grades.py | 8 +- lms/djangoapps/grades/tasks.py | 8 +- lms/djangoapps/instructor_task/tasks.py | 7 +- lms/djangoapps/verify_student/tasks.py | 5 +- lms/envs/production.py | 61 +--------------- openedx/core/djangoapps/coursegraph/tasks.py | 2 +- .../djangoapps/credentials/tasks/v1/tasks.py | 7 +- openedx/core/djangoapps/heartbeat/tasks.py | 3 +- openedx/core/djangoapps/programs/tasks.py | 11 ++- openedx/core/djangoapps/schedules/tasks.py | 6 +- openedx/core/lib/celery/routers.py | 73 ++++++++++++------- 20 files changed, 139 insertions(+), 146 deletions(-) diff --git a/cms/celery.py b/cms/celery.py index 5e32a82c017..046760436bf 100644 --- a/cms/celery.py +++ b/cms/celery.py @@ -10,7 +10,7 @@ import os from celery import Celery -from openedx.core.lib.celery.routers import route_task_queue +from openedx.core.lib.celery.routers import AlternateEnvironmentRouter # set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'cms.envs.production') @@ -23,12 +23,34 @@ APP.conf.task_protocol = 1 APP.config_from_object('django.conf:settings') APP.autodiscover_tasks() +# Import after autodiscovery has had a chance to connect to the import_module signal +# so celery doesn't miss any apps getting installed. +from django.conf import settings # pylint: disable=wrong-import-position,wrong-import-order -def route_task(name, args, kwargs, options, task=None, **kw): # pylint: disable=unused-argument - """ - Celery-defined method allowing for custom routing logic. - If None is returned from this method, default routing logic is used. +class Router(AlternateEnvironmentRouter): + """ + An implementation of AlternateEnvironmentRouter, for routing tasks to non-cms queues. """ - return route_task_queue(name) + @property + def alternate_env_tasks(self): + """ + Defines alternate environment tasks, as a dict of form { task_name: alternate_queue } + """ + # The tasks below will be routed to the default lms queue. + return { + 'completion_aggregator.tasks.update_aggregators': 'lms', + 'openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache': 'lms', + 'openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache_v2': 'lms', + } + + @property + def explicit_queues(self): + """ + Defines specific queues for tasks to run in (typically outside of the cms environment), + as a dict of form { task_name: queue_name }. + """ + return { + 'lms.djangoapps.grades.tasks.compute_all_grades_for_course': settings.POLICY_CHANGE_GRADES_ROUTING_KEY, + } diff --git a/cms/djangoapps/contentstore/tasks.py b/cms/djangoapps/contentstore/tasks.py index 8f5d047b784..651412db91a 100644 --- a/cms/djangoapps/contentstore/tasks.py +++ b/cms/djangoapps/contentstore/tasks.py @@ -168,7 +168,7 @@ def _parse_time(time_isoformat): ).replace(tzinfo=UTC) -@task +@task(routing_key=settings.UPDATE_SEARCH_INDEX_JOB_QUEUE) def update_search_index(course_id, triggered_time_isoformat): """ Updates course search index. """ try: diff --git a/cms/envs/production.py b/cms/envs/production.py index 86d9c76bea2..ac11b296627 100644 --- a/cms/envs/production.py +++ b/cms/envs/production.py @@ -136,7 +136,7 @@ CELERY_QUEUES = { DEFAULT_PRIORITY_QUEUE: {} } -CELERY_ROUTES = "{}celery.route_task".format(QUEUE_VARIANT) +CELERY_ROUTES = "{}celery.Router".format(QUEUE_VARIANT) # STATIC_URL_BASE specifies the base url to use for static files STATIC_URL_BASE = ENV_TOKENS.get('STATIC_URL_BASE', None) @@ -567,20 +567,3 @@ if FEATURES.get('ENABLE_CORS_HEADERS'): CORS_ALLOW_HEADERS = corsheaders_default_headers + ( 'use-jwt-cookie', ) - -######################## CELERY ROTUING ######################## - -# Defines alternate environment tasks, as a dict of form { task_name: alternate_queue } -ALTERNATE_ENV_TASKS = { - 'completion_aggregator.tasks.update_aggregators': 'lms', - 'openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache': 'lms', - 'openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache_v2': 'lms', -} - -# Defines the task -> alternate worker queue to be used when routing. -EXPLICIT_QUEUES = { - 'lms.djangoapps.grades.tasks.compute_all_grades_for_course': { - 'queue': POLICY_CHANGE_GRADES_ROUTING_KEY}, - 'cms.djangoapps.contentstore.tasks.update_search_index': { - 'queue': UPDATE_SEARCH_INDEX_JOB_QUEUE}, -} diff --git a/common/djangoapps/entitlements/tasks.py b/common/djangoapps/entitlements/tasks.py index 9732c3acc97..2d3724ce2fc 100644 --- a/common/djangoapps/entitlements/tasks.py +++ b/common/djangoapps/entitlements/tasks.py @@ -5,11 +5,13 @@ This file contains celery tasks for entitlements-related functionality. from celery import task from celery.utils.log import get_task_logger +from django.conf import settings from entitlements.models import CourseEntitlement LOGGER = get_task_logger(__name__) - +# Under cms the following setting is not defined, leading to errors during tests. +ROUTING_KEY = getattr(settings, 'ENTITLEMENTS_EXPIRATION_ROUTING_KEY', None) # Maximum number of retries before giving up on awarding credentials. # For reference, 11 retries with exponential backoff yields a maximum waiting # time of 2047 seconds (about 30 minutes). Setting this to None could yield @@ -17,7 +19,7 @@ LOGGER = get_task_logger(__name__) MAX_RETRIES = 11 -@task(bind=True, ignore_result=True) +@task(bind=True, ignore_result=True, routing_key=ROUTING_KEY) def expire_old_entitlements(self, start, end, logid='...'): """ This task is designed to be called to process a bundle of entitlements diff --git a/lms/celery.py b/lms/celery.py index 67bf6379982..dd9a23a8cf8 100644 --- a/lms/celery.py +++ b/lms/celery.py @@ -5,11 +5,12 @@ and auto discover tasks in all installed django apps. Taken from: https://celery.readthedocs.org/en/latest/django/first-steps-with-django.html """ + import os from celery import Celery -from openedx.core.lib.celery.routers import route_task_queue +from openedx.core.lib.celery.routers import AlternateEnvironmentRouter # set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'lms.envs.production') @@ -23,11 +24,14 @@ APP.config_from_object('django.conf:settings') APP.autodiscover_tasks() -def route_task(name, args, kwargs, options, task=None, **kw): # pylint: disable=unused-argument +class Router(AlternateEnvironmentRouter): """ - Celery-defined method allowing for custom routing logic. - - If None is returned from this method, default routing logic is used. + An implementation of AlternateEnvironmentRouter, for routing tasks to non-cms queues. """ - return route_task_queue(name) + @property + def alternate_env_tasks(self): + """ + Defines alternate environment tasks, as a dict of form { task_name: alternate_queue } + """ + return {} diff --git a/lms/djangoapps/bulk_email/tasks.py b/lms/djangoapps/bulk_email/tasks.py index c99127ba58e..f85c15387d6 100644 --- a/lms/djangoapps/bulk_email/tasks.py +++ b/lms/djangoapps/bulk_email/tasks.py @@ -196,6 +196,8 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name) total_recipients = combined_set.count() + routing_key = settings.BULK_EMAIL_ROUTING_KEY + # Weird things happen if we allow empty querysets as input to emailing subtasks # The task appears to hang at "0 out of 0 completed" and never finishes. if total_recipients == 0: @@ -215,6 +217,7 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name) initial_subtask_status.to_dict(), ), task_id=subtask_id, + routing_key=routing_key, ) return new_subtask diff --git a/lms/djangoapps/discussion/tasks.py b/lms/djangoapps/discussion/tasks.py index 15b26d58903..a0101e85b46 100644 --- a/lms/djangoapps/discussion/tasks.py +++ b/lms/djangoapps/discussion/tasks.py @@ -35,6 +35,7 @@ log = logging.getLogger(__name__) DEFAULT_LANGUAGE = 'en' +ROUTING_KEY = getattr(settings, 'ACE_ROUTING_KEY', None) @task(base=LoggedTask) @@ -59,7 +60,7 @@ class ResponseNotification(BaseMessageType): pass -@task(base=LoggedTask) +@task(base=LoggedTask, routing_key=ROUTING_KEY) def send_ace_message(context): context['course_id'] = CourseKey.from_string(context['course_id']) diff --git a/lms/djangoapps/email_marketing/tasks.py b/lms/djangoapps/email_marketing/tasks.py index fa5139ecabf..724b88e4f91 100644 --- a/lms/djangoapps/email_marketing/tasks.py +++ b/lms/djangoapps/email_marketing/tasks.py @@ -18,9 +18,10 @@ from .models import EmailMarketingConfiguration log = logging.getLogger(__name__) SAILTHRU_LIST_CACHE_KEY = "email.marketing.cache" +ACE_ROUTING_KEY = getattr(settings, 'ACE_ROUTING_KEY', None) -@task(bind=True) +@task(bind=True, routing_key=ACE_ROUTING_KEY) def get_email_cookies_via_sailthru(self, user_email, post_parms): """ Adds/updates Sailthru cookie information for a new user. @@ -60,7 +61,7 @@ def get_email_cookies_via_sailthru(self, user_email, post_parms): return None -@task(bind=True, default_retry_delay=3600, max_retries=24) +@task(bind=True, default_retry_delay=3600, max_retries=24, routing_key=ACE_ROUTING_KEY) def update_user(self, sailthru_vars, email, site=None, new_user=False, activation=False): """ Adds/updates Sailthru profile information for a user. @@ -142,7 +143,7 @@ def is_default_site(site): return not site or site.get('id') == settings.SITE_ID -@task(bind=True, default_retry_delay=3600, max_retries=24) +@task(bind=True, default_retry_delay=3600, max_retries=24, routing_key=ACE_ROUTING_KEY) def update_user_email(self, new_email, old_email): """ Adds/updates Sailthru when a user email address is changed @@ -302,7 +303,7 @@ def _retryable_sailthru_error(error): return code == 9 or code == 43 -@task(bind=True) +@task(bind=True, routing_key=ACE_ROUTING_KEY) def update_course_enrollment(self, email, course_key, mode, site=None): """Adds/updates Sailthru when a user adds to cart/purchases/upgrades a course Args: diff --git a/lms/djangoapps/grades/management/commands/compute_grades.py b/lms/djangoapps/grades/management/commands/compute_grades.py index bcaecbffb2a..ed047dd5ac2 100644 --- a/lms/djangoapps/grades/management/commands/compute_grades.py +++ b/lms/djangoapps/grades/management/commands/compute_grades.py @@ -80,7 +80,7 @@ class Command(BaseCommand): """ Enqueue all tasks, in shuffled order. """ - task_options = {'queue': options['routing_key']} if options.get('routing_key') else {} + task_options = {'routing_key': options['routing_key']} if options.get('routing_key') else {} for seq_id, kwargs in enumerate(self._shuffled_task_kwargs(options)): kwargs['seq_id'] = seq_id result = tasks.compute_grades_for_course_v2.apply_async(kwargs=kwargs, **task_options) diff --git a/lms/djangoapps/grades/management/commands/tests/test_compute_grades.py b/lms/djangoapps/grades/management/commands/tests/test_compute_grades.py index accd302bb06..12e86c09d1f 100644 --- a/lms/djangoapps/grades/management/commands/tests/test_compute_grades.py +++ b/lms/djangoapps/grades/management/commands/tests/test_compute_grades.py @@ -92,19 +92,19 @@ class TestComputeGrades(SharedModuleStoreTestCase): # Order doesn't matter, but can't use a set because dicts aren't hashable expected = [ ({ - 'queue': 'key', + 'routing_key': 'key', 'kwargs': _kwargs(self.course_keys[0], 0) },), ({ - 'queue': 'key', + 'routing_key': 'key', 'kwargs': _kwargs(self.course_keys[0], 2) },), ({ - 'queue': 'key', + 'routing_key': 'key', 'kwargs': _kwargs(self.course_keys[3], 0) },), ({ - 'queue': 'key', + 'routing_key': 'key', 'kwargs': _kwargs(self.course_keys[3], 2) },), ] diff --git a/lms/djangoapps/grades/tasks.py b/lms/djangoapps/grades/tasks.py index 5878fd7ebcc..cbd517f9c19 100644 --- a/lms/djangoapps/grades/tasks.py +++ b/lms/djangoapps/grades/tasks.py @@ -48,7 +48,7 @@ RETRY_DELAY_SECONDS = 40 SUBSECTION_GRADE_TIMEOUT_SECONDS = 300 -@task(base=LoggedPersistOnFailureTask) +@task(base=LoggedPersistOnFailureTask, routing_key=settings.POLICY_CHANGE_GRADES_ROUTING_KEY) def compute_all_grades_for_course(**kwargs): """ Compute grades for all students in the specified course. @@ -69,7 +69,7 @@ def compute_all_grades_for_course(**kwargs): 'batch_size': batch_size, }) compute_grades_for_course_v2.apply_async( - kwargs=kwargs, queue=settings.POLICY_CHANGE_GRADES_ROUTING_KEY + kwargs=kwargs, routing_key=settings.POLICY_CHANGE_GRADES_ROUTING_KEY ) @@ -131,6 +131,7 @@ def compute_grades_for_course(course_key, offset, batch_size, **kwargs): # pyli time_limit=SUBSECTION_GRADE_TIMEOUT_SECONDS, max_retries=2, default_retry_delay=RETRY_DELAY_SECONDS, + routing_key=settings.POLICY_CHANGE_GRADES_ROUTING_KEY ) def recalculate_course_and_subsection_grades_for_user(self, **kwargs): # pylint: disable=unused-argument """ @@ -170,7 +171,8 @@ def recalculate_course_and_subsection_grades_for_user(self, **kwargs): # pylint base=LoggedPersistOnFailureTask, time_limit=SUBSECTION_GRADE_TIMEOUT_SECONDS, max_retries=2, - default_retry_delay=RETRY_DELAY_SECONDS + default_retry_delay=RETRY_DELAY_SECONDS, + routing_key=settings.RECALCULATE_GRADES_ROUTING_KEY ) def recalculate_subsection_grade_v3(self, **kwargs): """ diff --git a/lms/djangoapps/instructor_task/tasks.py b/lms/djangoapps/instructor_task/tasks.py index f899157db1d..bf39d88e7d2 100644 --- a/lms/djangoapps/instructor_task/tasks.py +++ b/lms/djangoapps/instructor_task/tasks.py @@ -162,6 +162,7 @@ def send_bulk_course_email(entry_id, _xmodule_instance_args): @task( name='lms.djangoapps.instructor_task.tasks.calculate_problem_responses_csv.v2', base=BaseInstructorTask, + routing_key=settings.GRADES_DOWNLOAD_ROUTING_KEY, ) def calculate_problem_responses_csv(entry_id, xmodule_instance_args): """ @@ -174,7 +175,7 @@ def calculate_problem_responses_csv(entry_id, xmodule_instance_args): return run_main_task(entry_id, task_fn, action_name) -@task(base=BaseInstructorTask) +@task(base=BaseInstructorTask, routing_key=settings.GRADES_DOWNLOAD_ROUTING_KEY) def calculate_grades_csv(entry_id, xmodule_instance_args): """ Grade a course and push the results to an S3 bucket for download. @@ -190,7 +191,7 @@ def calculate_grades_csv(entry_id, xmodule_instance_args): return run_main_task(entry_id, task_fn, action_name) -@task(base=BaseInstructorTask) +@task(base=BaseInstructorTask, routing_key=settings.GRADES_DOWNLOAD_ROUTING_KEY) def calculate_problem_grade_report(entry_id, xmodule_instance_args): """ Generate a CSV for a course containing all students' problem @@ -255,7 +256,7 @@ def calculate_may_enroll_csv(entry_id, xmodule_instance_args): return run_main_task(entry_id, task_fn, action_name) -@task(base=BaseInstructorTask) +@task(base=BaseInstructorTask, routing_key=settings.GRADES_DOWNLOAD_ROUTING_KEY) def generate_certificates(entry_id, xmodule_instance_args): """ Grade students and generate certificates. diff --git a/lms/djangoapps/verify_student/tasks.py b/lms/djangoapps/verify_student/tasks.py index 93190ff41be..081e241f22a 100644 --- a/lms/djangoapps/verify_student/tasks.py +++ b/lms/djangoapps/verify_student/tasks.py @@ -15,6 +15,8 @@ from django.core.mail import EmailMessage from edxmako.shortcuts import render_to_string from openedx.core.djangoapps.site_configuration import helpers as configuration_helpers +ACE_ROUTING_KEY = getattr(settings, 'ACE_ROUTING_KEY', None) +SOFTWARE_SECURE_VERIFICATION_ROUTING_KEY = getattr(settings, 'SOFTWARE_SECURE_VERIFICATION_ROUTING_KEY', None) log = logging.getLogger(__name__) @@ -71,7 +73,7 @@ class BaseSoftwareSecureTask(Task): ) -@task +@task(routing_key=ACE_ROUTING_KEY) def send_verification_status_email(context): """ Spins a task to send verification status email to the learner @@ -97,6 +99,7 @@ def send_verification_status_email(context): bind=True, default_retry_delay=settings.SOFTWARE_SECURE_REQUEST_RETRY_DELAY, max_retries=settings.SOFTWARE_SECURE_RETRY_MAX_ATTEMPTS, + routing_key=SOFTWARE_SECURE_VERIFICATION_ROUTING_KEY, ) def send_request_to_ss_for_user(self, user_verification_id, copy_id_photo_from): """ diff --git a/lms/envs/production.py b/lms/envs/production.py index 07d501e52a7..f6641039322 100644 --- a/lms/envs/production.py +++ b/lms/envs/production.py @@ -150,7 +150,7 @@ CELERY_QUEUES = { HIGH_MEM_QUEUE: {}, } -CELERY_ROUTES = "{}celery.route_task".format(QUEUE_VARIANT) +CELERY_ROUTES = "{}celery.Router".format(QUEUE_VARIANT) CELERYBEAT_SCHEDULE = {} # For scheduling tasks, entries can be added to this dict # STATIC_ROOT specifies the directory where static files are @@ -974,62 +974,3 @@ COMPLETION_VIDEO_COMPLETE_PERCENTAGE = ENV_TOKENS.get('COMPLETION_VIDEO_COMPLETE COMPLETION_VIDEO_COMPLETE_PERCENTAGE) COMPLETION_VIDEO_COMPLETE_PERCENTAGE = ENV_TOKENS.get('COMPLETION_BY_VIEWING_DELAY_MS', COMPLETION_BY_VIEWING_DELAY_MS) - -######################## CELERY ROUTING ######################## - -# Defines alternate environment tasks, as a dict of form { task_name: alternate_queue } -ALTERNATE_ENV_TASKS = {} - -# Defines the task -> alternate worker queue to be used when routing. -EXPLICIT_QUEUES = { - 'openedx.core.djangoapps.content.course_overviews.tasks.async_course_overview_update': { - 'queue': GRADES_DOWNLOAD_ROUTING_KEY}, - 'lms.djangoapps.bulk_email.tasks.send_course_email': { - 'queue': BULK_EMAIL_ROUTING_KEY}, - 'openedx.core.djangoapps.heartbeat.tasks.sample_task': { - 'queue': HEARTBEAT_CELERY_ROUTING_KEY}, - 'lms.djangoapps.instructor_task.tasks.calculate_grades_csv': { - 'queue': GRADES_DOWNLOAD_ROUTING_KEY}, - 'lms.djangoapps.instructor_task.tasks.calculate_problem_grade_report': { - 'queue': GRADES_DOWNLOAD_ROUTING_KEY}, - 'lms.djangoapps.instructor_task.tasks.generate_certificates': { - 'queue': GRADES_DOWNLOAD_ROUTING_KEY}, - 'lms.djangoapps.email_marketing.tasks.get_email_cookies_via_sailthru': { - 'queue': ACE_ROUTING_KEY}, - 'lms.djangoapps.email_marketing.tasks.update_user': { - 'queue': ACE_ROUTING_KEY}, - 'lms.djangoapps.email_marketing.tasks.update_user_email': { - 'queue': ACE_ROUTING_KEY}, - 'lms.djangoapps.email_marketing.tasks.update_course_enrollment': { - 'queue': ACE_ROUTING_KEY}, - 'lms.djangoapps.verify_student.tasks.send_verification_status_email': { - 'queue': ACE_ROUTING_KEY}, - 'lms.djangoapps.verify_student.tasks.send_ace_message': { - 'queue': ACE_ROUTING_KEY}, - 'lms.djangoapps.verify_student.tasks.send_request_to_ss_for_user': { - 'queue': SOFTWARE_SECURE_VERIFICATION_ROUTING_KEY}, - 'openedx.core.djangoapps.schedules.tasks._recurring_nudge_schedule_send': { - 'queue': ACE_ROUTING_KEY}, - 'openedx.core.djangoapps.schedules.tasks._upgrade_reminder_schedule_send': { - 'queue': ACE_ROUTING_KEY}, - 'openedx.core.djangoapps.schedules.tasks._course_update_schedule_send': { - 'queue': ACE_ROUTING_KEY}, - 'openedx.core.djangoapps.schedules.tasks.v1.tasks.send_grade_to_credentials': { - 'queue': CREDENTIALS_GENERATION_ROUTING_KEY}, - 'common.djangoapps.entitlements.tasks.expire_old_entitlements': { - 'queue': ENTITLEMENTS_EXPIRATION_ROUTING_KEY}, - 'lms.djangoapps.grades.tasks.recalculate_course_and_subsection_grades_for_user': { - 'queue': POLICY_CHANGE_GRADES_ROUTING_KEY}, - 'lms.djangoapps.grades.tasks.recalculate_subsection_grade_v3': { - 'queue': RECALCULATE_GRADES_ROUTING_KEY}, - 'openedx.core.djangoapps.programs.tasks.v1.tasks.award_program_certificates': { - 'queue': PROGRAM_CERTIFICATES_ROUTING_KEY}, - 'openedx.core.djangoapps.programs.tasks.v1.tasks.revoke_program_certificates': { - 'queue': PROGRAM_CERTIFICATES_ROUTING_KEY}, - 'openedx.core.djangoapps.programs.tasks.v1.tasks.update_certificate_visible_date_on_course_update': { - 'queue': PROGRAM_CERTIFICATES_ROUTING_KEY}, - 'openedx.core.djangoapps.programs.tasks.v1.tasks.award_course_certificate': { - 'queue': PROGRAM_CERTIFICATES_ROUTING_KEY}, - 'openedx.core.djangoapps.coursegraph.dump_course_to_neo4j': { - 'queue': COURSEGRAPH_JOB_QUEUE}, -} diff --git a/openedx/core/djangoapps/coursegraph/tasks.py b/openedx/core/djangoapps/coursegraph/tasks.py index d6480399868..4e32a2d53ad 100644 --- a/openedx/core/djangoapps/coursegraph/tasks.py +++ b/openedx/core/djangoapps/coursegraph/tasks.py @@ -246,7 +246,7 @@ def should_dump_course(course_key, graph): return last_this_command_was_run < course_last_published_date -@task +@task(routing_key=settings.COURSEGRAPH_JOB_QUEUE) def dump_course_to_neo4j(course_key_string, credentials): """ Serializes a course and writes it to neo4j. diff --git a/openedx/core/djangoapps/credentials/tasks/v1/tasks.py b/openedx/core/djangoapps/credentials/tasks/v1/tasks.py index cd2cb1aa387..5b3e72fc89f 100644 --- a/openedx/core/djangoapps/credentials/tasks/v1/tasks.py +++ b/openedx/core/djangoapps/credentials/tasks/v1/tasks.py @@ -13,6 +13,11 @@ from openedx.core.djangoapps.credentials.utils import get_credentials_api_client logger = get_task_logger(__name__) +# Under cms the following setting is not defined, leading to errors during tests. +# These tasks aren't strictly credentials generation, but are similar in the sense +# that they generate records on the credentials side. And have a similar SLA. +ROUTING_KEY = getattr(settings, 'CREDENTIALS_GENERATION_ROUTING_KEY', None) + # Maximum number of retries before giving up. # For reference, 11 retries with exponential backoff yields a maximum waiting # time of 2047 seconds (about 30 minutes). Setting this to None could yield @@ -20,7 +25,7 @@ logger = get_task_logger(__name__) MAX_RETRIES = 11 -@task(bind=True, ignore_result=True) +@task(bind=True, ignore_result=True, routing_key=ROUTING_KEY) def send_grade_to_credentials(self, username, course_run_key, verified, letter_grade, percent_grade): """ Celery task to notify the Credentials IDA of a grade change via POST. """ logger.info(u'Running task send_grade_to_credentials for username %s and course %s', username, course_run_key) diff --git a/openedx/core/djangoapps/heartbeat/tasks.py b/openedx/core/djangoapps/heartbeat/tasks.py index 6e6b5e32966..dca2849b51b 100644 --- a/openedx/core/djangoapps/heartbeat/tasks.py +++ b/openedx/core/djangoapps/heartbeat/tasks.py @@ -4,8 +4,9 @@ A trivial task for health checks from celery.task import task +from django.conf import settings -@task +@task(routing_key=settings.HEARTBEAT_CELERY_ROUTING_KEY) def sample_task(): return True diff --git a/openedx/core/djangoapps/programs/tasks.py b/openedx/core/djangoapps/programs/tasks.py index 99ff63ef533..913c5d41a89 100644 --- a/openedx/core/djangoapps/programs/tasks.py +++ b/openedx/core/djangoapps/programs/tasks.py @@ -22,6 +22,9 @@ from openedx.core.djangoapps.programs.utils import ProgramProgressMeter from openedx.core.djangoapps.site_configuration import helpers as configuration_helpers LOGGER = get_task_logger(__name__) +# Under cms the following setting is not defined, leading to errors during tests. +ROUTING_KEY = getattr(settings, 'CREDENTIALS_GENERATION_ROUTING_KEY', None) +PROGRAM_CERTIFICATES_ROUTING_KEY = getattr(settings, 'PROGRAM_CERTIFICATES_ROUTING_KEY', None) # Maximum number of retries before giving up on awarding credentials. # For reference, 11 retries with exponential backoff yields a maximum waiting # time of 2047 seconds (about 30 minutes). Setting this to None could yield @@ -120,7 +123,7 @@ def award_program_certificate(client, username, program_uuid, visible_date): }) -@task(bind=True, ignore_result=True) +@task(bind=True, ignore_result=True, routing_key=PROGRAM_CERTIFICATES_ROUTING_KEY) def award_program_certificates(self, username): """ This task is designed to be called whenever a student's completion status @@ -282,7 +285,7 @@ def post_course_certificate(client, username, certificate, visible_date): }) -@task(bind=True, ignore_result=True) +@task(bind=True, ignore_result=True, routing_key=ROUTING_KEY) def award_course_certificate(self, username, course_run_key): """ This task is designed to be called whenever a student GeneratedCertificate is updated. @@ -396,7 +399,7 @@ def revoke_program_certificate(client, username, program_uuid): }) -@task(bind=True, ignore_result=True) +@task(bind=True, ignore_result=True, routing_key=PROGRAM_CERTIFICATES_ROUTING_KEY) def revoke_program_certificates(self, username, course_key): """ This task is designed to be called whenever a student's course certificate is @@ -519,7 +522,7 @@ def revoke_program_certificates(self, username, course_key): LOGGER.info(u'Successfully completed the task revoke_program_certificates for username %s', username) -@task(bind=True, ignore_result=True) +@task(bind=True, ignore_result=True, routing_key=PROGRAM_CERTIFICATES_ROUTING_KEY) def update_certificate_visible_date_on_course_update(self, course_key): """ This task is designed to be called whenever a course is updated with diff --git a/openedx/core/djangoapps/schedules/tasks.py b/openedx/core/djangoapps/schedules/tasks.py index 74ed820a74a..fb30694b537 100644 --- a/openedx/core/djangoapps/schedules/tasks.py +++ b/openedx/core/djangoapps/schedules/tasks.py @@ -143,7 +143,7 @@ class BinnedScheduleMessageBaseTask(ScheduleMessageBaseTask): raise NotImplementedError -@task(base=LoggedTask, ignore_result=True) +@task(base=LoggedTask, ignore_result=True, routing_key=ROUTING_KEY) def _recurring_nudge_schedule_send(site_id, msg_str): _schedule_send( msg_str, @@ -153,7 +153,7 @@ def _recurring_nudge_schedule_send(site_id, msg_str): ) -@task(base=LoggedTask, ignore_result=True) +@task(base=LoggedTask, ignore_result=True, routing_key=ROUTING_KEY) def _upgrade_reminder_schedule_send(site_id, msg_str): _schedule_send( msg_str, @@ -163,7 +163,7 @@ def _upgrade_reminder_schedule_send(site_id, msg_str): ) -@task(base=LoggedTask, ignore_result=True) +@task(base=LoggedTask, ignore_result=True, routing_key=ROUTING_KEY) def _course_update_schedule_send(site_id, msg_str): _schedule_send( msg_str, diff --git a/openedx/core/lib/celery/routers.py b/openedx/core/lib/celery/routers.py index 72fc5c8370a..50fef96f522 100644 --- a/openedx/core/lib/celery/routers.py +++ b/openedx/core/lib/celery/routers.py @@ -5,41 +5,62 @@ For more, see https://celery.readthedocs.io/en/latest/userguide/routing.html#rou """ import logging +from abc import ABCMeta, abstractproperty +from django.conf import settings +import six log = logging.getLogger(__name__) -def route_task_queue(name): +class AlternateEnvironmentRouter(six.with_metaclass(ABCMeta, object)): """ - Helper method allowing for custom routing logic. - - If None is returned from this method, default routing logic is used. + A custom Router class for use in routing celery tasks to non-default queues. """ - from django.conf import settings # pylint: disable=import-outside-toplevel - if name in settings.EXPLICIT_QUEUES: - return settings.EXPLICIT_QUEUES[name] + @abstractproperty + def alternate_env_tasks(self): + """ + Defines the task -> alternate worker environment to be used when routing. - alternate_env = settings.ALTERNATE_ENV_TASKS.get(name, None) - if alternate_env: - return ensure_queue_env(alternate_env) + Subclasses must override this property with their own specific mappings. + """ + return {} + @property + def explicit_queues(self): + """ + Defines the task -> alternate worker queue to be used when routing. + """ + return {} -def ensure_queue_env(desired_env): - """ - Helper method to get the desired type of queue. + def route_for_task(self, task, args=None, kwargs=None): # pylint: disable=unused-argument + """ + Celery-defined method allowing for custom routing logic. - If no such queue is defined, default routing logic is used. - """ - from django.conf import settings # pylint: disable=import-outside-toplevel - - queues = getattr(settings, 'CELERY_QUEUES', None) - return next( - ( - queue - for queue in queues - if '.{}.'.format(desired_env) in queue - ), - None - ) + If None is returned from this method, default routing logic is used. + """ + if task in self.explicit_queues: + return self.explicit_queues[task] + + alternate_env = self.alternate_env_tasks.get(task, None) + if alternate_env: + return self.ensure_queue_env(alternate_env) + + return None + + def ensure_queue_env(self, desired_env): + """ + Helper method to get the desired type of queue. + + If no such queue is defined, default routing logic is used. + """ + queues = getattr(settings, 'CELERY_QUEUES', None) + return next( + ( + queue + for queue in queues + if '.{}.'.format(desired_env) in queue + ), + None + ) -- GitLab