From bd601cf3a606c9b72120936b2f6752538ec4f3e8 Mon Sep 17 00:00:00 2001
From: Muhammad Soban Javed <58461728+iamsobanjaved@users.noreply.github.com>
Date: Wed, 16 Dec 2020 13:40:47 +0500
Subject: [PATCH] Update celery routing for celery 4+ (#25567)

* Update celery routing

- Used routing function instead of class
- Move task queues dictionary to Django settings
- Removed routing_key parameter
- Refactored routing for singleton celery instantiation

Co-authored-by: Awais Qureshi <awais.qureshi@arbisoft.com>
---
 cms/celery.py                                 | 33 ---------
 cms/djangoapps/contentstore/tasks.py          |  6 +-
 cms/envs/production.py                        | 19 ++++-
 common/djangoapps/entitlements/tasks.py       |  4 +-
 lms/celery.py                                 | 15 ----
 lms/djangoapps/bulk_email/tasks.py            |  3 -
 lms/djangoapps/discussion/tasks.py            |  3 +-
 lms/djangoapps/email_marketing/tasks.py       |  9 ++-
 lms/djangoapps/gating/tasks.py                |  2 +-
 .../management/commands/compute_grades.py     |  2 +-
 .../commands/tests/test_compute_grades.py     |  8 +--
 lms/djangoapps/grades/tasks.py                |  8 +--
 lms/djangoapps/instructor_task/tasks.py       |  7 +-
 lms/djangoapps/verify_student/tasks.py        |  5 +-
 lms/envs/production.py                        | 61 +++++++++++++++-
 openedx/core/djangoapps/coursegraph/tasks.py  |  2 +-
 .../djangoapps/credentials/tasks/v1/tasks.py  |  7 +-
 openedx/core/djangoapps/heartbeat/tasks.py    |  3 +-
 openedx/core/djangoapps/programs/tasks.py     | 11 ++-
 openedx/core/djangoapps/schedules/tasks.py    |  6 +-
 openedx/core/lib/celery/routers.py            | 71 +++++++------------
 21 files changed, 134 insertions(+), 151 deletions(-)

diff --git a/cms/celery.py b/cms/celery.py
index 7936d04fe3d..293121bcf22 100644
--- a/cms/celery.py
+++ b/cms/celery.py
@@ -8,41 +8,8 @@ Taken from: https://celery.readthedocs.org/en/latest/django/first-steps-with-dja
 
 import os
 
-from openedx.core.lib.celery.routers import AlternateEnvironmentRouter
 
 # Set the default Django settings module for the 'celery' program
 # and then instantiate the Celery singleton.
 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'cms.envs.production')
 from openedx.core.lib.celery import APP  # pylint: disable=wrong-import-position,unused-import
-
-# Import after autodiscovery has had a chance to connect to the import_module signal
-# so celery doesn't miss any apps getting installed.
-from django.conf import settings  # pylint: disable=wrong-import-position,wrong-import-order
-
-
-class Router(AlternateEnvironmentRouter):
-    """
-    An implementation of AlternateEnvironmentRouter, for routing tasks to non-cms queues.
-    """
-
-    @property
-    def alternate_env_tasks(self):
-        """
-        Defines alternate environment tasks, as a dict of form { task_name: alternate_queue }
-        """
-        # The tasks below will be routed to the default lms queue.
-        return {
-            'completion_aggregator.tasks.update_aggregators': 'lms',
-            'openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache': 'lms',
-            'openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache_v2': 'lms',
-        }
-
-    @property
-    def explicit_queues(self):
-        """
-        Defines specific queues for tasks to run in (typically outside of the cms environment),
-        as a dict of form { task_name: queue_name }.
-        """
-        return {
-            'lms.djangoapps.grades.tasks.compute_all_grades_for_course': settings.POLICY_CHANGE_GRADES_ROUTING_KEY,
-        }
diff --git a/cms/djangoapps/contentstore/tasks.py b/cms/djangoapps/contentstore/tasks.py
index 97e8c5bbb1f..ecc883a636a 100644
--- a/cms/djangoapps/contentstore/tasks.py
+++ b/cms/djangoapps/contentstore/tasks.py
@@ -83,7 +83,7 @@ def clone_instance(instance, field_values):
     return instance
 
 
-@task()
+@task
 @set_code_owner_attribute
 def rerun_course(source_course_key_string, destination_course_key_string, user_id, fields=None):
     """
@@ -170,7 +170,7 @@ def _parse_time(time_isoformat):
     ).replace(tzinfo=UTC)
 
 
-@task(routing_key=settings.UPDATE_SEARCH_INDEX_JOB_QUEUE)
+@task
 @set_code_owner_attribute
 def update_search_index(course_id, triggered_time_isoformat):
     """ Updates course search index. """
@@ -195,7 +195,7 @@ def update_search_index(course_id, triggered_time_isoformat):
         LOGGER.debug(u'Search indexing successful for complete course %s', course_id)
 
 
-@task()
+@task
 @set_code_owner_attribute
 def update_library_index(library_id, triggered_time_isoformat):
     """ Updates course search index. """
diff --git a/cms/envs/production.py b/cms/envs/production.py
index 8cf5473e435..317c842db99 100644
--- a/cms/envs/production.py
+++ b/cms/envs/production.py
@@ -136,7 +136,7 @@ CELERY_QUEUES = {
     DEFAULT_PRIORITY_QUEUE: {}
 }
 
-CELERY_ROUTES = "{}celery.Router".format(QUEUE_VARIANT)
+CELERY_ROUTES = "openedx.core.lib.celery.routers.route_task"
 
 # STATIC_URL_BASE specifies the base url to use for static files
 STATIC_URL_BASE = ENV_TOKENS.get('STATIC_URL_BASE', None)
@@ -574,3 +574,20 @@ LOGO_URL = ENV_TOKENS.get('LOGO_URL', LOGO_URL)
 LOGO_URL_PNG = ENV_TOKENS.get('LOGO_URL_PNG', LOGO_URL_PNG)
 LOGO_TRADEMARK_URL = ENV_TOKENS.get('LOGO_TRADEMARK_URL', LOGO_TRADEMARK_URL)
 FAVICON_URL = ENV_TOKENS.get('FAVICON_URL', FAVICON_URL)
+
+######################## CELERY ROTUING ########################
+
+# Defines alternate environment tasks, as a dict of form { task_name: alternate_queue }
+ALTERNATE_ENV_TASKS = {
+    'completion_aggregator.tasks.update_aggregators': 'lms',
+    'openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache': 'lms',
+    'openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache_v2': 'lms',
+}
+
+# Defines the task -> alternate worker queue to be used when routing.
+EXPLICIT_QUEUES = {
+    'lms.djangoapps.grades.tasks.compute_all_grades_for_course': {
+        'queue': POLICY_CHANGE_GRADES_ROUTING_KEY},
+    'cms.djangoapps.contentstore.tasks.update_search_index': {
+        'queue': UPDATE_SEARCH_INDEX_JOB_QUEUE},
+}
diff --git a/common/djangoapps/entitlements/tasks.py b/common/djangoapps/entitlements/tasks.py
index ddc01ca572a..72b18ade253 100644
--- a/common/djangoapps/entitlements/tasks.py
+++ b/common/djangoapps/entitlements/tasks.py
@@ -11,8 +11,7 @@ from edx_django_utils.monitoring import set_code_owner_attribute
 from common.djangoapps.entitlements.models import CourseEntitlement
 
 LOGGER = get_task_logger(__name__)
-# Under cms the following setting is not defined, leading to errors during tests.
-ROUTING_KEY = getattr(settings, 'ENTITLEMENTS_EXPIRATION_ROUTING_KEY', None)
+
 # Maximum number of retries before giving up on awarding credentials.
 # For reference, 11 retries with exponential backoff yields a maximum waiting
 # time of 2047 seconds (about 30 minutes). Setting this to None could yield
@@ -23,7 +22,6 @@ MAX_RETRIES = 11
 @task(
     bind=True,
     ignore_result=True,
-    routing_key=ROUTING_KEY,
     name='entitlements.expire_old_entitlements',
 )
 @set_code_owner_attribute
diff --git a/lms/celery.py b/lms/celery.py
index 7f4c44c9b2e..808df030ef6 100644
--- a/lms/celery.py
+++ b/lms/celery.py
@@ -5,25 +5,10 @@ and auto discover tasks in all installed django apps.
 Taken from: https://celery.readthedocs.org/en/latest/django/first-steps-with-django.html
 """
 
-
 import os
 
-from openedx.core.lib.celery.routers import AlternateEnvironmentRouter
 
 # Set the default Django settings module for the 'celery' program
 # and then instantiate the Celery singleton.
 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'lms.envs.production')
 from openedx.core.lib.celery import APP  # pylint: disable=wrong-import-position,unused-import
-
-
-class Router(AlternateEnvironmentRouter):
-    """
-    An implementation of AlternateEnvironmentRouter, for routing tasks to non-cms queues.
-    """
-
-    @property
-    def alternate_env_tasks(self):
-        """
-        Defines alternate environment tasks, as a dict of form { task_name: alternate_queue }
-        """
-        return {}
diff --git a/lms/djangoapps/bulk_email/tasks.py b/lms/djangoapps/bulk_email/tasks.py
index 447482dbefb..65665f2289e 100644
--- a/lms/djangoapps/bulk_email/tasks.py
+++ b/lms/djangoapps/bulk_email/tasks.py
@@ -197,8 +197,6 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
 
     total_recipients = combined_set.count()
 
-    routing_key = settings.BULK_EMAIL_ROUTING_KEY
-
     # Weird things happen if we allow empty querysets as input to emailing subtasks
     # The task appears to hang at "0 out of 0 completed" and never finishes.
     if total_recipients == 0:
@@ -218,7 +216,6 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
                 initial_subtask_status.to_dict(),
             ),
             task_id=subtask_id,
-            routing_key=routing_key,
         )
         return new_subtask
 
diff --git a/lms/djangoapps/discussion/tasks.py b/lms/djangoapps/discussion/tasks.py
index 8c453e153cf..2a00c0c73ce 100644
--- a/lms/djangoapps/discussion/tasks.py
+++ b/lms/djangoapps/discussion/tasks.py
@@ -36,7 +36,6 @@ log = logging.getLogger(__name__)
 
 
 DEFAULT_LANGUAGE = 'en'
-ROUTING_KEY = getattr(settings, 'ACE_ROUTING_KEY', None)
 
 
 @task(base=LoggedTask)
@@ -62,7 +61,7 @@ class ResponseNotification(BaseMessageType):
     pass
 
 
-@task(base=LoggedTask, routing_key=ROUTING_KEY)
+@task(base=LoggedTask)
 @set_code_owner_attribute
 def send_ace_message(context):
     context['course_id'] = CourseKey.from_string(context['course_id'])
diff --git a/lms/djangoapps/email_marketing/tasks.py b/lms/djangoapps/email_marketing/tasks.py
index 21051f43452..ce68f95e3cd 100644
--- a/lms/djangoapps/email_marketing/tasks.py
+++ b/lms/djangoapps/email_marketing/tasks.py
@@ -19,10 +19,9 @@ from .models import EmailMarketingConfiguration
 
 log = logging.getLogger(__name__)
 SAILTHRU_LIST_CACHE_KEY = "email.marketing.cache"
-ACE_ROUTING_KEY = getattr(settings, 'ACE_ROUTING_KEY', None)
 
 
-@task(bind=True, routing_key=ACE_ROUTING_KEY)
+@task(bind=True)
 @set_code_owner_attribute
 def get_email_cookies_via_sailthru(self, user_email, post_parms):
     """
@@ -63,7 +62,7 @@ def get_email_cookies_via_sailthru(self, user_email, post_parms):
     return None
 
 
-@task(bind=True, default_retry_delay=3600, max_retries=24, routing_key=ACE_ROUTING_KEY)
+@task(bind=True, default_retry_delay=3600, max_retries=24)
 @set_code_owner_attribute
 def update_user(self, sailthru_vars, email, site=None, new_user=False, activation=False):
     """
@@ -146,7 +145,7 @@ def is_default_site(site):
     return not site or site.get('id') == settings.SITE_ID
 
 
-@task(bind=True, default_retry_delay=3600, max_retries=24, routing_key=ACE_ROUTING_KEY)
+@task(bind=True, default_retry_delay=3600, max_retries=24)
 @set_code_owner_attribute
 def update_user_email(self, new_email, old_email):
     """
@@ -307,7 +306,7 @@ def _retryable_sailthru_error(error):
     return code == 9 or code == 43
 
 
-@task(bind=True, routing_key=ACE_ROUTING_KEY)
+@task(bind=True)
 @set_code_owner_attribute
 def update_course_enrollment(self, email, course_key, mode, site=None):
     """Adds/updates Sailthru when a user adds to cart/purchases/upgrades a course
diff --git a/lms/djangoapps/gating/tasks.py b/lms/djangoapps/gating/tasks.py
index 5e43bcd2b31..35ce1ee5d93 100644
--- a/lms/djangoapps/gating/tasks.py
+++ b/lms/djangoapps/gating/tasks.py
@@ -18,7 +18,7 @@ from xmodule.modulestore.django import modulestore
 log = logging.getLogger(__name__)
 
 
-@task()
+@task
 @set_code_owner_attribute
 def task_evaluate_subsection_completion_milestones(course_id, block_id, user_id):
     """
diff --git a/lms/djangoapps/grades/management/commands/compute_grades.py b/lms/djangoapps/grades/management/commands/compute_grades.py
index ed047dd5ac2..bcaecbffb2a 100644
--- a/lms/djangoapps/grades/management/commands/compute_grades.py
+++ b/lms/djangoapps/grades/management/commands/compute_grades.py
@@ -80,7 +80,7 @@ class Command(BaseCommand):
         """
         Enqueue all tasks, in shuffled order.
         """
-        task_options = {'routing_key': options['routing_key']} if options.get('routing_key') else {}
+        task_options = {'queue': options['routing_key']} if options.get('routing_key') else {}
         for seq_id, kwargs in enumerate(self._shuffled_task_kwargs(options)):
             kwargs['seq_id'] = seq_id
             result = tasks.compute_grades_for_course_v2.apply_async(kwargs=kwargs, **task_options)
diff --git a/lms/djangoapps/grades/management/commands/tests/test_compute_grades.py b/lms/djangoapps/grades/management/commands/tests/test_compute_grades.py
index 7ca5fb9dad5..adb55066a0b 100644
--- a/lms/djangoapps/grades/management/commands/tests/test_compute_grades.py
+++ b/lms/djangoapps/grades/management/commands/tests/test_compute_grades.py
@@ -92,19 +92,19 @@ class TestComputeGrades(SharedModuleStoreTestCase):
         # Order doesn't matter, but can't use a set because dicts aren't hashable
         expected = [
             ({
-                'routing_key': 'key',
+                'queue': 'key',
                 'kwargs': _kwargs(self.course_keys[0], 0)
             },),
             ({
-                'routing_key': 'key',
+                'queue': 'key',
                 'kwargs': _kwargs(self.course_keys[0], 2)
             },),
             ({
-                'routing_key': 'key',
+                'queue': 'key',
                 'kwargs': _kwargs(self.course_keys[3], 0)
             },),
             ({
-                'routing_key': 'key',
+                'queue': 'key',
                 'kwargs': _kwargs(self.course_keys[3], 2)
             },),
         ]
diff --git a/lms/djangoapps/grades/tasks.py b/lms/djangoapps/grades/tasks.py
index 2ccfdeb8af5..8f25b348422 100644
--- a/lms/djangoapps/grades/tasks.py
+++ b/lms/djangoapps/grades/tasks.py
@@ -52,7 +52,7 @@ RETRY_DELAY_SECONDS = 40
 SUBSECTION_GRADE_TIMEOUT_SECONDS = 300
 
 
-@task(base=LoggedPersistOnFailureTask, routing_key=settings.POLICY_CHANGE_GRADES_ROUTING_KEY)
+@task(base=LoggedPersistOnFailureTask)
 @set_code_owner_attribute
 def compute_all_grades_for_course(**kwargs):
     """
@@ -74,7 +74,7 @@ def compute_all_grades_for_course(**kwargs):
                 'batch_size': batch_size,
             })
             compute_grades_for_course_v2.apply_async(
-                kwargs=kwargs, routing_key=settings.POLICY_CHANGE_GRADES_ROUTING_KEY
+                kwargs=kwargs, queue=settings.POLICY_CHANGE_GRADES_ROUTING_KEY
             )
 
 
@@ -138,7 +138,6 @@ def compute_grades_for_course(course_key, offset, batch_size, **kwargs):  # pyli
     time_limit=SUBSECTION_GRADE_TIMEOUT_SECONDS,
     max_retries=2,
     default_retry_delay=RETRY_DELAY_SECONDS,
-    routing_key=settings.POLICY_CHANGE_GRADES_ROUTING_KEY
 )
 @set_code_owner_attribute
 def recalculate_course_and_subsection_grades_for_user(self, **kwargs):  # pylint: disable=unused-argument
@@ -179,8 +178,7 @@ def recalculate_course_and_subsection_grades_for_user(self, **kwargs):  # pylint
     base=LoggedPersistOnFailureTask,
     time_limit=SUBSECTION_GRADE_TIMEOUT_SECONDS,
     max_retries=2,
-    default_retry_delay=RETRY_DELAY_SECONDS,
-    routing_key=settings.RECALCULATE_GRADES_ROUTING_KEY
+    default_retry_delay=RETRY_DELAY_SECONDS
 )
 @set_code_owner_attribute
 def recalculate_subsection_grade_v3(self, **kwargs):
diff --git a/lms/djangoapps/instructor_task/tasks.py b/lms/djangoapps/instructor_task/tasks.py
index 34d49748268..8a05d213ff5 100644
--- a/lms/djangoapps/instructor_task/tasks.py
+++ b/lms/djangoapps/instructor_task/tasks.py
@@ -168,7 +168,6 @@ def send_bulk_course_email(entry_id, _xmodule_instance_args):
 @task(
     name='lms.djangoapps.instructor_task.tasks.calculate_problem_responses_csv.v2',
     base=BaseInstructorTask,
-    routing_key=settings.GRADES_DOWNLOAD_ROUTING_KEY,
 )
 @set_code_owner_attribute
 def calculate_problem_responses_csv(entry_id, xmodule_instance_args):
@@ -182,7 +181,7 @@ def calculate_problem_responses_csv(entry_id, xmodule_instance_args):
     return run_main_task(entry_id, task_fn, action_name)
 
 
-@task(base=BaseInstructorTask, routing_key=settings.GRADES_DOWNLOAD_ROUTING_KEY)
+@task(base=BaseInstructorTask)
 @set_code_owner_attribute
 def calculate_grades_csv(entry_id, xmodule_instance_args):
     """
@@ -199,7 +198,7 @@ def calculate_grades_csv(entry_id, xmodule_instance_args):
     return run_main_task(entry_id, task_fn, action_name)
 
 
-@task(base=BaseInstructorTask, routing_key=settings.GRADES_DOWNLOAD_ROUTING_KEY)
+@task(base=BaseInstructorTask)
 @set_code_owner_attribute
 def calculate_problem_grade_report(entry_id, xmodule_instance_args):
     """
@@ -269,7 +268,7 @@ def calculate_may_enroll_csv(entry_id, xmodule_instance_args):
     return run_main_task(entry_id, task_fn, action_name)
 
 
-@task(base=BaseInstructorTask, routing_key=settings.GRADES_DOWNLOAD_ROUTING_KEY)
+@task(base=BaseInstructorTask)
 @set_code_owner_attribute
 def generate_certificates(entry_id, xmodule_instance_args):
     """
diff --git a/lms/djangoapps/verify_student/tasks.py b/lms/djangoapps/verify_student/tasks.py
index aca08deac0a..f3acc5744d9 100644
--- a/lms/djangoapps/verify_student/tasks.py
+++ b/lms/djangoapps/verify_student/tasks.py
@@ -16,8 +16,6 @@ from edx_django_utils.monitoring import set_code_owner_attribute
 from common.djangoapps.edxmako.shortcuts import render_to_string
 from openedx.core.djangoapps.site_configuration import helpers as configuration_helpers
 
-ACE_ROUTING_KEY = getattr(settings, 'ACE_ROUTING_KEY', None)
-SOFTWARE_SECURE_VERIFICATION_ROUTING_KEY = getattr(settings, 'SOFTWARE_SECURE_VERIFICATION_ROUTING_KEY', None)
 log = logging.getLogger(__name__)
 
 
@@ -74,7 +72,7 @@ class BaseSoftwareSecureTask(Task):
             )
 
 
-@task(routing_key=ACE_ROUTING_KEY)
+@task
 @set_code_owner_attribute
 def send_verification_status_email(context):
     """
@@ -101,7 +99,6 @@ def send_verification_status_email(context):
     bind=True,
     default_retry_delay=settings.SOFTWARE_SECURE_REQUEST_RETRY_DELAY,
     max_retries=settings.SOFTWARE_SECURE_RETRY_MAX_ATTEMPTS,
-    routing_key=SOFTWARE_SECURE_VERIFICATION_ROUTING_KEY,
 )
 @set_code_owner_attribute
 def send_request_to_ss_for_user(self, user_verification_id, copy_id_photo_from):
diff --git a/lms/envs/production.py b/lms/envs/production.py
index 4aaf20a0989..7a60ef949bf 100644
--- a/lms/envs/production.py
+++ b/lms/envs/production.py
@@ -150,7 +150,7 @@ CELERY_QUEUES = {
     HIGH_MEM_QUEUE: {},
 }
 
-CELERY_ROUTES = "{}celery.Router".format(QUEUE_VARIANT)
+CELERY_ROUTES = "openedx.core.lib.celery.routers.route_task"
 CELERYBEAT_SCHEDULE = {}  # For scheduling tasks, entries can be added to this dict
 
 # STATIC_ROOT specifies the directory where static files are
@@ -980,3 +980,62 @@ LOGO_URL = ENV_TOKENS.get('LOGO_URL', LOGO_URL)
 LOGO_URL_PNG = ENV_TOKENS.get('LOGO_URL_PNG', LOGO_URL_PNG)
 LOGO_TRADEMARK_URL = ENV_TOKENS.get('LOGO_TRADEMARK_URL', LOGO_TRADEMARK_URL)
 FAVICON_URL = ENV_TOKENS.get('FAVICON_URL', FAVICON_URL)
+
+######################## CELERY ROUTING ########################
+
+# Defines alternate environment tasks, as a dict of form { task_name: alternate_queue }
+ALTERNATE_ENV_TASKS = {}
+
+# Defines the task -> alternate worker queue to be used when routing.
+EXPLICIT_QUEUES = {
+    'openedx.core.djangoapps.content.course_overviews.tasks.async_course_overview_update': {
+        'queue': GRADES_DOWNLOAD_ROUTING_KEY},
+    'lms.djangoapps.bulk_email.tasks.send_course_email': {
+        'queue': BULK_EMAIL_ROUTING_KEY},
+    'openedx.core.djangoapps.heartbeat.tasks.sample_task': {
+        'queue': HEARTBEAT_CELERY_ROUTING_KEY},
+    'lms.djangoapps.instructor_task.tasks.calculate_grades_csv': {
+        'queue': GRADES_DOWNLOAD_ROUTING_KEY},
+    'lms.djangoapps.instructor_task.tasks.calculate_problem_grade_report': {
+        'queue': GRADES_DOWNLOAD_ROUTING_KEY},
+    'lms.djangoapps.instructor_task.tasks.generate_certificates': {
+        'queue': GRADES_DOWNLOAD_ROUTING_KEY},
+    'lms.djangoapps.email_marketing.tasks.get_email_cookies_via_sailthru': {
+        'queue': ACE_ROUTING_KEY},
+    'lms.djangoapps.email_marketing.tasks.update_user': {
+        'queue': ACE_ROUTING_KEY},
+    'lms.djangoapps.email_marketing.tasks.update_user_email': {
+        'queue': ACE_ROUTING_KEY},
+    'lms.djangoapps.email_marketing.tasks.update_course_enrollment': {
+        'queue': ACE_ROUTING_KEY},
+    'lms.djangoapps.verify_student.tasks.send_verification_status_email': {
+        'queue': ACE_ROUTING_KEY},
+    'lms.djangoapps.verify_student.tasks.send_ace_message': {
+        'queue': ACE_ROUTING_KEY},
+    'lms.djangoapps.verify_student.tasks.send_request_to_ss_for_user': {
+        'queue': SOFTWARE_SECURE_VERIFICATION_ROUTING_KEY},
+    'openedx.core.djangoapps.schedules.tasks._recurring_nudge_schedule_send': {
+        'queue': ACE_ROUTING_KEY},
+    'openedx.core.djangoapps.schedules.tasks._upgrade_reminder_schedule_send': {
+        'queue': ACE_ROUTING_KEY},
+    'openedx.core.djangoapps.schedules.tasks._course_update_schedule_send': {
+        'queue': ACE_ROUTING_KEY},
+    'openedx.core.djangoapps.schedules.tasks.v1.tasks.send_grade_to_credentials': {
+        'queue': CREDENTIALS_GENERATION_ROUTING_KEY},
+    'common.djangoapps.entitlements.tasks.expire_old_entitlements': {
+        'queue': ENTITLEMENTS_EXPIRATION_ROUTING_KEY},
+    'lms.djangoapps.grades.tasks.recalculate_course_and_subsection_grades_for_user': {
+        'queue': POLICY_CHANGE_GRADES_ROUTING_KEY},
+    'lms.djangoapps.grades.tasks.recalculate_subsection_grade_v3': {
+        'queue': RECALCULATE_GRADES_ROUTING_KEY},
+    'openedx.core.djangoapps.programs.tasks.v1.tasks.award_program_certificates': {
+        'queue': PROGRAM_CERTIFICATES_ROUTING_KEY},
+    'openedx.core.djangoapps.programs.tasks.v1.tasks.revoke_program_certificates': {
+        'queue': PROGRAM_CERTIFICATES_ROUTING_KEY},
+    'openedx.core.djangoapps.programs.tasks.v1.tasks.update_certificate_visible_date_on_course_update': {
+        'queue': PROGRAM_CERTIFICATES_ROUTING_KEY},
+    'openedx.core.djangoapps.programs.tasks.v1.tasks.award_course_certificate': {
+        'queue': PROGRAM_CERTIFICATES_ROUTING_KEY},
+    'openedx.core.djangoapps.coursegraph.dump_course_to_neo4j': {
+        'queue': COURSEGRAPH_JOB_QUEUE},
+}
diff --git a/openedx/core/djangoapps/coursegraph/tasks.py b/openedx/core/djangoapps/coursegraph/tasks.py
index b9c2ec0894b..3eec402497e 100644
--- a/openedx/core/djangoapps/coursegraph/tasks.py
+++ b/openedx/core/djangoapps/coursegraph/tasks.py
@@ -247,7 +247,7 @@ def should_dump_course(course_key, graph):
     return last_this_command_was_run < course_last_published_date
 
 
-@task(routing_key=settings.COURSEGRAPH_JOB_QUEUE)
+@task
 @set_code_owner_attribute
 def dump_course_to_neo4j(course_key_string, credentials):
     """
diff --git a/openedx/core/djangoapps/credentials/tasks/v1/tasks.py b/openedx/core/djangoapps/credentials/tasks/v1/tasks.py
index 040aaba75bd..8492d5cc893 100644
--- a/openedx/core/djangoapps/credentials/tasks/v1/tasks.py
+++ b/openedx/core/djangoapps/credentials/tasks/v1/tasks.py
@@ -14,11 +14,6 @@ from openedx.core.djangoapps.credentials.utils import get_credentials_api_client
 
 logger = get_task_logger(__name__)
 
-# Under cms the following setting is not defined, leading to errors during tests.
-# These tasks aren't strictly credentials generation, but are similar in the sense
-# that they generate records on the credentials side. And have a similar SLA.
-ROUTING_KEY = getattr(settings, 'CREDENTIALS_GENERATION_ROUTING_KEY', None)
-
 # Maximum number of retries before giving up.
 # For reference, 11 retries with exponential backoff yields a maximum waiting
 # time of 2047 seconds (about 30 minutes). Setting this to None could yield
@@ -26,7 +21,7 @@ ROUTING_KEY = getattr(settings, 'CREDENTIALS_GENERATION_ROUTING_KEY', None)
 MAX_RETRIES = 11
 
 
-@task(bind=True, ignore_result=True, routing_key=ROUTING_KEY)
+@task(bind=True, ignore_result=True)
 @set_code_owner_attribute
 def send_grade_to_credentials(self, username, course_run_key, verified, letter_grade, percent_grade):
     """ Celery task to notify the Credentials IDA of a grade change via POST. """
diff --git a/openedx/core/djangoapps/heartbeat/tasks.py b/openedx/core/djangoapps/heartbeat/tasks.py
index 7dd184b760f..962d249d12f 100644
--- a/openedx/core/djangoapps/heartbeat/tasks.py
+++ b/openedx/core/djangoapps/heartbeat/tasks.py
@@ -4,11 +4,10 @@ A trivial task for health checks
 
 
 from celery.task import task
-from django.conf import settings
 from edx_django_utils.monitoring import set_code_owner_attribute
 
 
-@task(routing_key=settings.HEARTBEAT_CELERY_ROUTING_KEY)
+@task
 @set_code_owner_attribute
 def sample_task():
     return True
diff --git a/openedx/core/djangoapps/programs/tasks.py b/openedx/core/djangoapps/programs/tasks.py
index 8d0853f1e65..a25f7e3eceb 100644
--- a/openedx/core/djangoapps/programs/tasks.py
+++ b/openedx/core/djangoapps/programs/tasks.py
@@ -23,9 +23,6 @@ from openedx.core.djangoapps.programs.utils import ProgramProgressMeter
 from openedx.core.djangoapps.site_configuration import helpers as configuration_helpers
 
 LOGGER = get_task_logger(__name__)
-# Under cms the following setting is not defined, leading to errors during tests.
-ROUTING_KEY = getattr(settings, 'CREDENTIALS_GENERATION_ROUTING_KEY', None)
-PROGRAM_CERTIFICATES_ROUTING_KEY = getattr(settings, 'PROGRAM_CERTIFICATES_ROUTING_KEY', None)
 # Maximum number of retries before giving up on awarding credentials.
 # For reference, 11 retries with exponential backoff yields a maximum waiting
 # time of 2047 seconds (about 30 minutes). Setting this to None could yield
@@ -124,7 +121,7 @@ def award_program_certificate(client, username, program_uuid, visible_date):
     })
 
 
-@task(bind=True, ignore_result=True, routing_key=PROGRAM_CERTIFICATES_ROUTING_KEY)
+@task(bind=True, ignore_result=True)
 @set_code_owner_attribute
 def award_program_certificates(self, username):
     """
@@ -287,7 +284,7 @@ def post_course_certificate(client, username, certificate, visible_date):
     })
 
 
-@task(bind=True, ignore_result=True, routing_key=ROUTING_KEY)
+@task(bind=True, ignore_result=True)
 @set_code_owner_attribute
 def award_course_certificate(self, username, course_run_key):
     """
@@ -402,7 +399,7 @@ def revoke_program_certificate(client, username, program_uuid):
     })
 
 
-@task(bind=True, ignore_result=True, routing_key=PROGRAM_CERTIFICATES_ROUTING_KEY)
+@task(bind=True, ignore_result=True)
 @set_code_owner_attribute
 def revoke_program_certificates(self, username, course_key):
     """
@@ -526,7 +523,7 @@ def revoke_program_certificates(self, username, course_key):
     LOGGER.info(u'Successfully completed the task revoke_program_certificates for username %s', username)
 
 
-@task(bind=True, ignore_result=True, routing_key=PROGRAM_CERTIFICATES_ROUTING_KEY)
+@task(bind=True, ignore_result=True)
 @set_code_owner_attribute
 def update_certificate_visible_date_on_course_update(self, course_key):
     """
diff --git a/openedx/core/djangoapps/schedules/tasks.py b/openedx/core/djangoapps/schedules/tasks.py
index 3693ea75ca3..81d6092f78f 100644
--- a/openedx/core/djangoapps/schedules/tasks.py
+++ b/openedx/core/djangoapps/schedules/tasks.py
@@ -150,7 +150,7 @@ class BinnedScheduleMessageBaseTask(ScheduleMessageBaseTask):
         raise NotImplementedError
 
 
-@task(base=LoggedTask, ignore_result=True, routing_key=ROUTING_KEY)
+@task(base=LoggedTask, ignore_result=True)
 @set_code_owner_attribute
 def _recurring_nudge_schedule_send(site_id, msg_str):
     _schedule_send(
@@ -161,7 +161,7 @@ def _recurring_nudge_schedule_send(site_id, msg_str):
     )
 
 
-@task(base=LoggedTask, ignore_result=True, routing_key=ROUTING_KEY)
+@task(base=LoggedTask, ignore_result=True)
 @set_code_owner_attribute
 def _upgrade_reminder_schedule_send(site_id, msg_str):
     _schedule_send(
@@ -172,7 +172,7 @@ def _upgrade_reminder_schedule_send(site_id, msg_str):
     )
 
 
-@task(base=LoggedTask, ignore_result=True, routing_key=ROUTING_KEY)
+@task(base=LoggedTask, ignore_result=True)
 @set_code_owner_attribute
 def _course_update_schedule_send(site_id, msg_str):
     _schedule_send(
diff --git a/openedx/core/lib/celery/routers.py b/openedx/core/lib/celery/routers.py
index 50fef96f522..acc3434ebb6 100644
--- a/openedx/core/lib/celery/routers.py
+++ b/openedx/core/lib/celery/routers.py
@@ -5,62 +5,39 @@ For more, see https://celery.readthedocs.io/en/latest/userguide/routing.html#rou
 """
 
 import logging
-from abc import ABCMeta, abstractproperty
 
 from django.conf import settings
-import six
+
 
 log = logging.getLogger(__name__)
 
 
-class AlternateEnvironmentRouter(six.with_metaclass(ABCMeta, object)):
-    """
-    A custom Router class for use in routing celery tasks to non-default queues.
+def route_task(name, args, kwargs, options, task=None, **kw):  # pylint: disable=unused-argument
     """
+    Celery-defined method allowing for custom routing logic.
 
-    @abstractproperty
-    def alternate_env_tasks(self):
-        """
-        Defines the task -> alternate worker environment to be used when routing.
-
-        Subclasses must override this property with their own specific mappings.
-        """
-        return {}
-
-    @property
-    def explicit_queues(self):
-        """
-        Defines the task -> alternate worker queue to be used when routing.
-        """
-        return {}
-
-    def route_for_task(self, task, args=None, kwargs=None):  # pylint: disable=unused-argument
-        """
-        Celery-defined method allowing for custom routing logic.
-
-        If None is returned from this method, default routing logic is used.
-        """
-        if task in self.explicit_queues:
-            return self.explicit_queues[task]
+    If None is returned from this method, default routing logic is used.
+    """
+    if name in settings.EXPLICIT_QUEUES:
+        return settings.EXPLICIT_QUEUES[name]
 
-        alternate_env = self.alternate_env_tasks.get(task, None)
-        if alternate_env:
-            return self.ensure_queue_env(alternate_env)
+    alternate_env = settings.ALTERNATE_ENV_TASKS.get(name, None)
+    if alternate_env:
+        return ensure_queue_env(alternate_env)
 
-        return None
 
-    def ensure_queue_env(self, desired_env):
-        """
-        Helper method to get the desired type of queue.
+def ensure_queue_env(desired_env):
+    """
+    Helper method to get the desired type of queue.
 
-        If no such queue is defined, default routing logic is used.
-        """
-        queues = getattr(settings, 'CELERY_QUEUES', None)
-        return next(
-            (
-                queue
-                for queue in queues
-                if '.{}.'.format(desired_env) in queue
-            ),
-            None
-        )
+    If no such queue is defined, default routing logic is used.
+    """
+    queues = getattr(settings, 'CELERY_QUEUES', None)
+    return next(
+        (
+            queue
+            for queue in queues
+            if '.{}.'.format(desired_env) in queue
+        ),
+        None
+    )
-- 
GitLab