diff --git a/cms/celery.py b/cms/celery.py index 7936d04fe3d693303868db0ec7a69374138548b4..293121bcf22aea760e27aa2481ecdcf2a200e3fc 100644 --- a/cms/celery.py +++ b/cms/celery.py @@ -8,41 +8,8 @@ Taken from: https://celery.readthedocs.org/en/latest/django/first-steps-with-dja import os -from openedx.core.lib.celery.routers import AlternateEnvironmentRouter # Set the default Django settings module for the 'celery' program # and then instantiate the Celery singleton. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'cms.envs.production') from openedx.core.lib.celery import APP # pylint: disable=wrong-import-position,unused-import - -# Import after autodiscovery has had a chance to connect to the import_module signal -# so celery doesn't miss any apps getting installed. -from django.conf import settings # pylint: disable=wrong-import-position,wrong-import-order - - -class Router(AlternateEnvironmentRouter): - """ - An implementation of AlternateEnvironmentRouter, for routing tasks to non-cms queues. - """ - - @property - def alternate_env_tasks(self): - """ - Defines alternate environment tasks, as a dict of form { task_name: alternate_queue } - """ - # The tasks below will be routed to the default lms queue. - return { - 'completion_aggregator.tasks.update_aggregators': 'lms', - 'openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache': 'lms', - 'openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache_v2': 'lms', - } - - @property - def explicit_queues(self): - """ - Defines specific queues for tasks to run in (typically outside of the cms environment), - as a dict of form { task_name: queue_name }. - """ - return { - 'lms.djangoapps.grades.tasks.compute_all_grades_for_course': settings.POLICY_CHANGE_GRADES_ROUTING_KEY, - } diff --git a/cms/djangoapps/contentstore/tasks.py b/cms/djangoapps/contentstore/tasks.py index 97e8c5bbb1f9e4bd5a9a8e0bc7750050942789b4..ecc883a636a068b2c8324e3aafecff00b587278b 100644 --- a/cms/djangoapps/contentstore/tasks.py +++ b/cms/djangoapps/contentstore/tasks.py @@ -83,7 +83,7 @@ def clone_instance(instance, field_values): return instance -@task() +@task @set_code_owner_attribute def rerun_course(source_course_key_string, destination_course_key_string, user_id, fields=None): """ @@ -170,7 +170,7 @@ def _parse_time(time_isoformat): ).replace(tzinfo=UTC) -@task(routing_key=settings.UPDATE_SEARCH_INDEX_JOB_QUEUE) +@task @set_code_owner_attribute def update_search_index(course_id, triggered_time_isoformat): """ Updates course search index. """ @@ -195,7 +195,7 @@ def update_search_index(course_id, triggered_time_isoformat): LOGGER.debug(u'Search indexing successful for complete course %s', course_id) -@task() +@task @set_code_owner_attribute def update_library_index(library_id, triggered_time_isoformat): """ Updates course search index. """ diff --git a/cms/envs/production.py b/cms/envs/production.py index 8cf5473e43516ed3be38e9d1f9c6569b13e7b85c..317c842db999e059896783a509801f00397691d9 100644 --- a/cms/envs/production.py +++ b/cms/envs/production.py @@ -136,7 +136,7 @@ CELERY_QUEUES = { DEFAULT_PRIORITY_QUEUE: {} } -CELERY_ROUTES = "{}celery.Router".format(QUEUE_VARIANT) +CELERY_ROUTES = "openedx.core.lib.celery.routers.route_task" # STATIC_URL_BASE specifies the base url to use for static files STATIC_URL_BASE = ENV_TOKENS.get('STATIC_URL_BASE', None) @@ -574,3 +574,20 @@ LOGO_URL = ENV_TOKENS.get('LOGO_URL', LOGO_URL) LOGO_URL_PNG = ENV_TOKENS.get('LOGO_URL_PNG', LOGO_URL_PNG) LOGO_TRADEMARK_URL = ENV_TOKENS.get('LOGO_TRADEMARK_URL', LOGO_TRADEMARK_URL) FAVICON_URL = ENV_TOKENS.get('FAVICON_URL', FAVICON_URL) + +######################## CELERY ROTUING ######################## + +# Defines alternate environment tasks, as a dict of form { task_name: alternate_queue } +ALTERNATE_ENV_TASKS = { + 'completion_aggregator.tasks.update_aggregators': 'lms', + 'openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache': 'lms', + 'openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache_v2': 'lms', +} + +# Defines the task -> alternate worker queue to be used when routing. +EXPLICIT_QUEUES = { + 'lms.djangoapps.grades.tasks.compute_all_grades_for_course': { + 'queue': POLICY_CHANGE_GRADES_ROUTING_KEY}, + 'cms.djangoapps.contentstore.tasks.update_search_index': { + 'queue': UPDATE_SEARCH_INDEX_JOB_QUEUE}, +} diff --git a/common/djangoapps/entitlements/tasks.py b/common/djangoapps/entitlements/tasks.py index ddc01ca572a15871ee380c90f4d3b9bf04608a56..72b18ade253bd18bb79dccafef2fa6d7f83500af 100644 --- a/common/djangoapps/entitlements/tasks.py +++ b/common/djangoapps/entitlements/tasks.py @@ -11,8 +11,7 @@ from edx_django_utils.monitoring import set_code_owner_attribute from common.djangoapps.entitlements.models import CourseEntitlement LOGGER = get_task_logger(__name__) -# Under cms the following setting is not defined, leading to errors during tests. -ROUTING_KEY = getattr(settings, 'ENTITLEMENTS_EXPIRATION_ROUTING_KEY', None) + # Maximum number of retries before giving up on awarding credentials. # For reference, 11 retries with exponential backoff yields a maximum waiting # time of 2047 seconds (about 30 minutes). Setting this to None could yield @@ -23,7 +22,6 @@ MAX_RETRIES = 11 @task( bind=True, ignore_result=True, - routing_key=ROUTING_KEY, name='entitlements.expire_old_entitlements', ) @set_code_owner_attribute diff --git a/lms/celery.py b/lms/celery.py index 7f4c44c9b2ebb15e53a62293feae997ac017172b..808df030ef60364297e89ab31898840949664a6b 100644 --- a/lms/celery.py +++ b/lms/celery.py @@ -5,25 +5,10 @@ and auto discover tasks in all installed django apps. Taken from: https://celery.readthedocs.org/en/latest/django/first-steps-with-django.html """ - import os -from openedx.core.lib.celery.routers import AlternateEnvironmentRouter # Set the default Django settings module for the 'celery' program # and then instantiate the Celery singleton. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'lms.envs.production') from openedx.core.lib.celery import APP # pylint: disable=wrong-import-position,unused-import - - -class Router(AlternateEnvironmentRouter): - """ - An implementation of AlternateEnvironmentRouter, for routing tasks to non-cms queues. - """ - - @property - def alternate_env_tasks(self): - """ - Defines alternate environment tasks, as a dict of form { task_name: alternate_queue } - """ - return {} diff --git a/lms/djangoapps/bulk_email/tasks.py b/lms/djangoapps/bulk_email/tasks.py index 447482dbefb14e29bc62a7e0a5dd0898c14755bb..65665f2289e09902f4440ccdee4419a59675fa01 100644 --- a/lms/djangoapps/bulk_email/tasks.py +++ b/lms/djangoapps/bulk_email/tasks.py @@ -197,8 +197,6 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name) total_recipients = combined_set.count() - routing_key = settings.BULK_EMAIL_ROUTING_KEY - # Weird things happen if we allow empty querysets as input to emailing subtasks # The task appears to hang at "0 out of 0 completed" and never finishes. if total_recipients == 0: @@ -218,7 +216,6 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name) initial_subtask_status.to_dict(), ), task_id=subtask_id, - routing_key=routing_key, ) return new_subtask diff --git a/lms/djangoapps/discussion/tasks.py b/lms/djangoapps/discussion/tasks.py index 8c453e153cf6152306898d2b1dffd7654491a9b8..2a00c0c73ce84fdede83b0184ef8cd1ae2ffc972 100644 --- a/lms/djangoapps/discussion/tasks.py +++ b/lms/djangoapps/discussion/tasks.py @@ -36,7 +36,6 @@ log = logging.getLogger(__name__) DEFAULT_LANGUAGE = 'en' -ROUTING_KEY = getattr(settings, 'ACE_ROUTING_KEY', None) @task(base=LoggedTask) @@ -62,7 +61,7 @@ class ResponseNotification(BaseMessageType): pass -@task(base=LoggedTask, routing_key=ROUTING_KEY) +@task(base=LoggedTask) @set_code_owner_attribute def send_ace_message(context): context['course_id'] = CourseKey.from_string(context['course_id']) diff --git a/lms/djangoapps/email_marketing/tasks.py b/lms/djangoapps/email_marketing/tasks.py index 21051f43452bcdd2b9f1b956c115a0e731bf25e2..ce68f95e3cdde00f64719405e0bdf2c2b86cd586 100644 --- a/lms/djangoapps/email_marketing/tasks.py +++ b/lms/djangoapps/email_marketing/tasks.py @@ -19,10 +19,9 @@ from .models import EmailMarketingConfiguration log = logging.getLogger(__name__) SAILTHRU_LIST_CACHE_KEY = "email.marketing.cache" -ACE_ROUTING_KEY = getattr(settings, 'ACE_ROUTING_KEY', None) -@task(bind=True, routing_key=ACE_ROUTING_KEY) +@task(bind=True) @set_code_owner_attribute def get_email_cookies_via_sailthru(self, user_email, post_parms): """ @@ -63,7 +62,7 @@ def get_email_cookies_via_sailthru(self, user_email, post_parms): return None -@task(bind=True, default_retry_delay=3600, max_retries=24, routing_key=ACE_ROUTING_KEY) +@task(bind=True, default_retry_delay=3600, max_retries=24) @set_code_owner_attribute def update_user(self, sailthru_vars, email, site=None, new_user=False, activation=False): """ @@ -146,7 +145,7 @@ def is_default_site(site): return not site or site.get('id') == settings.SITE_ID -@task(bind=True, default_retry_delay=3600, max_retries=24, routing_key=ACE_ROUTING_KEY) +@task(bind=True, default_retry_delay=3600, max_retries=24) @set_code_owner_attribute def update_user_email(self, new_email, old_email): """ @@ -307,7 +306,7 @@ def _retryable_sailthru_error(error): return code == 9 or code == 43 -@task(bind=True, routing_key=ACE_ROUTING_KEY) +@task(bind=True) @set_code_owner_attribute def update_course_enrollment(self, email, course_key, mode, site=None): """Adds/updates Sailthru when a user adds to cart/purchases/upgrades a course diff --git a/lms/djangoapps/gating/tasks.py b/lms/djangoapps/gating/tasks.py index 5e43bcd2b31430c0bde90ac1e2dfe8dc8f568cbd..35ce1ee5d932ea4087081781faf39c485157da57 100644 --- a/lms/djangoapps/gating/tasks.py +++ b/lms/djangoapps/gating/tasks.py @@ -18,7 +18,7 @@ from xmodule.modulestore.django import modulestore log = logging.getLogger(__name__) -@task() +@task @set_code_owner_attribute def task_evaluate_subsection_completion_milestones(course_id, block_id, user_id): """ diff --git a/lms/djangoapps/grades/management/commands/compute_grades.py b/lms/djangoapps/grades/management/commands/compute_grades.py index ed047dd5ac22a4335efdf6b9a9cc8ebda5803cff..bcaecbffb2a6c6efa10c8777270c50952a0a7b60 100644 --- a/lms/djangoapps/grades/management/commands/compute_grades.py +++ b/lms/djangoapps/grades/management/commands/compute_grades.py @@ -80,7 +80,7 @@ class Command(BaseCommand): """ Enqueue all tasks, in shuffled order. """ - task_options = {'routing_key': options['routing_key']} if options.get('routing_key') else {} + task_options = {'queue': options['routing_key']} if options.get('routing_key') else {} for seq_id, kwargs in enumerate(self._shuffled_task_kwargs(options)): kwargs['seq_id'] = seq_id result = tasks.compute_grades_for_course_v2.apply_async(kwargs=kwargs, **task_options) diff --git a/lms/djangoapps/grades/management/commands/tests/test_compute_grades.py b/lms/djangoapps/grades/management/commands/tests/test_compute_grades.py index 7ca5fb9dad54fc67c6e3ba72a8761fc8cb5f6f0f..adb55066a0b3cc5fbca43f1f081e2db9d1e08f06 100644 --- a/lms/djangoapps/grades/management/commands/tests/test_compute_grades.py +++ b/lms/djangoapps/grades/management/commands/tests/test_compute_grades.py @@ -92,19 +92,19 @@ class TestComputeGrades(SharedModuleStoreTestCase): # Order doesn't matter, but can't use a set because dicts aren't hashable expected = [ ({ - 'routing_key': 'key', + 'queue': 'key', 'kwargs': _kwargs(self.course_keys[0], 0) },), ({ - 'routing_key': 'key', + 'queue': 'key', 'kwargs': _kwargs(self.course_keys[0], 2) },), ({ - 'routing_key': 'key', + 'queue': 'key', 'kwargs': _kwargs(self.course_keys[3], 0) },), ({ - 'routing_key': 'key', + 'queue': 'key', 'kwargs': _kwargs(self.course_keys[3], 2) },), ] diff --git a/lms/djangoapps/grades/tasks.py b/lms/djangoapps/grades/tasks.py index 2ccfdeb8af5b10b971b07c63f9ca9386f0abd192..8f25b348422fece526d71a2a70d0992a2bcb342b 100644 --- a/lms/djangoapps/grades/tasks.py +++ b/lms/djangoapps/grades/tasks.py @@ -52,7 +52,7 @@ RETRY_DELAY_SECONDS = 40 SUBSECTION_GRADE_TIMEOUT_SECONDS = 300 -@task(base=LoggedPersistOnFailureTask, routing_key=settings.POLICY_CHANGE_GRADES_ROUTING_KEY) +@task(base=LoggedPersistOnFailureTask) @set_code_owner_attribute def compute_all_grades_for_course(**kwargs): """ @@ -74,7 +74,7 @@ def compute_all_grades_for_course(**kwargs): 'batch_size': batch_size, }) compute_grades_for_course_v2.apply_async( - kwargs=kwargs, routing_key=settings.POLICY_CHANGE_GRADES_ROUTING_KEY + kwargs=kwargs, queue=settings.POLICY_CHANGE_GRADES_ROUTING_KEY ) @@ -138,7 +138,6 @@ def compute_grades_for_course(course_key, offset, batch_size, **kwargs): # pyli time_limit=SUBSECTION_GRADE_TIMEOUT_SECONDS, max_retries=2, default_retry_delay=RETRY_DELAY_SECONDS, - routing_key=settings.POLICY_CHANGE_GRADES_ROUTING_KEY ) @set_code_owner_attribute def recalculate_course_and_subsection_grades_for_user(self, **kwargs): # pylint: disable=unused-argument @@ -179,8 +178,7 @@ def recalculate_course_and_subsection_grades_for_user(self, **kwargs): # pylint base=LoggedPersistOnFailureTask, time_limit=SUBSECTION_GRADE_TIMEOUT_SECONDS, max_retries=2, - default_retry_delay=RETRY_DELAY_SECONDS, - routing_key=settings.RECALCULATE_GRADES_ROUTING_KEY + default_retry_delay=RETRY_DELAY_SECONDS ) @set_code_owner_attribute def recalculate_subsection_grade_v3(self, **kwargs): diff --git a/lms/djangoapps/instructor_task/tasks.py b/lms/djangoapps/instructor_task/tasks.py index 34d497482685aa0664c4c5b8179ed501188a6cf9..8a05d213ff585d093f8935a2c1cba971d99ffcf6 100644 --- a/lms/djangoapps/instructor_task/tasks.py +++ b/lms/djangoapps/instructor_task/tasks.py @@ -168,7 +168,6 @@ def send_bulk_course_email(entry_id, _xmodule_instance_args): @task( name='lms.djangoapps.instructor_task.tasks.calculate_problem_responses_csv.v2', base=BaseInstructorTask, - routing_key=settings.GRADES_DOWNLOAD_ROUTING_KEY, ) @set_code_owner_attribute def calculate_problem_responses_csv(entry_id, xmodule_instance_args): @@ -182,7 +181,7 @@ def calculate_problem_responses_csv(entry_id, xmodule_instance_args): return run_main_task(entry_id, task_fn, action_name) -@task(base=BaseInstructorTask, routing_key=settings.GRADES_DOWNLOAD_ROUTING_KEY) +@task(base=BaseInstructorTask) @set_code_owner_attribute def calculate_grades_csv(entry_id, xmodule_instance_args): """ @@ -199,7 +198,7 @@ def calculate_grades_csv(entry_id, xmodule_instance_args): return run_main_task(entry_id, task_fn, action_name) -@task(base=BaseInstructorTask, routing_key=settings.GRADES_DOWNLOAD_ROUTING_KEY) +@task(base=BaseInstructorTask) @set_code_owner_attribute def calculate_problem_grade_report(entry_id, xmodule_instance_args): """ @@ -269,7 +268,7 @@ def calculate_may_enroll_csv(entry_id, xmodule_instance_args): return run_main_task(entry_id, task_fn, action_name) -@task(base=BaseInstructorTask, routing_key=settings.GRADES_DOWNLOAD_ROUTING_KEY) +@task(base=BaseInstructorTask) @set_code_owner_attribute def generate_certificates(entry_id, xmodule_instance_args): """ diff --git a/lms/djangoapps/verify_student/tasks.py b/lms/djangoapps/verify_student/tasks.py index aca08deac0a862d3100326bf27c5c1a9e8d16c91..f3acc5744d9a87d495344b52b301fa4fbcfaa73a 100644 --- a/lms/djangoapps/verify_student/tasks.py +++ b/lms/djangoapps/verify_student/tasks.py @@ -16,8 +16,6 @@ from edx_django_utils.monitoring import set_code_owner_attribute from common.djangoapps.edxmako.shortcuts import render_to_string from openedx.core.djangoapps.site_configuration import helpers as configuration_helpers -ACE_ROUTING_KEY = getattr(settings, 'ACE_ROUTING_KEY', None) -SOFTWARE_SECURE_VERIFICATION_ROUTING_KEY = getattr(settings, 'SOFTWARE_SECURE_VERIFICATION_ROUTING_KEY', None) log = logging.getLogger(__name__) @@ -74,7 +72,7 @@ class BaseSoftwareSecureTask(Task): ) -@task(routing_key=ACE_ROUTING_KEY) +@task @set_code_owner_attribute def send_verification_status_email(context): """ @@ -101,7 +99,6 @@ def send_verification_status_email(context): bind=True, default_retry_delay=settings.SOFTWARE_SECURE_REQUEST_RETRY_DELAY, max_retries=settings.SOFTWARE_SECURE_RETRY_MAX_ATTEMPTS, - routing_key=SOFTWARE_SECURE_VERIFICATION_ROUTING_KEY, ) @set_code_owner_attribute def send_request_to_ss_for_user(self, user_verification_id, copy_id_photo_from): diff --git a/lms/envs/production.py b/lms/envs/production.py index 4aaf20a0989cd833f62b7695e6f4915462f1e57c..7a60ef949bf33f73805168f028a3a1910f64dbf7 100644 --- a/lms/envs/production.py +++ b/lms/envs/production.py @@ -150,7 +150,7 @@ CELERY_QUEUES = { HIGH_MEM_QUEUE: {}, } -CELERY_ROUTES = "{}celery.Router".format(QUEUE_VARIANT) +CELERY_ROUTES = "openedx.core.lib.celery.routers.route_task" CELERYBEAT_SCHEDULE = {} # For scheduling tasks, entries can be added to this dict # STATIC_ROOT specifies the directory where static files are @@ -980,3 +980,62 @@ LOGO_URL = ENV_TOKENS.get('LOGO_URL', LOGO_URL) LOGO_URL_PNG = ENV_TOKENS.get('LOGO_URL_PNG', LOGO_URL_PNG) LOGO_TRADEMARK_URL = ENV_TOKENS.get('LOGO_TRADEMARK_URL', LOGO_TRADEMARK_URL) FAVICON_URL = ENV_TOKENS.get('FAVICON_URL', FAVICON_URL) + +######################## CELERY ROUTING ######################## + +# Defines alternate environment tasks, as a dict of form { task_name: alternate_queue } +ALTERNATE_ENV_TASKS = {} + +# Defines the task -> alternate worker queue to be used when routing. +EXPLICIT_QUEUES = { + 'openedx.core.djangoapps.content.course_overviews.tasks.async_course_overview_update': { + 'queue': GRADES_DOWNLOAD_ROUTING_KEY}, + 'lms.djangoapps.bulk_email.tasks.send_course_email': { + 'queue': BULK_EMAIL_ROUTING_KEY}, + 'openedx.core.djangoapps.heartbeat.tasks.sample_task': { + 'queue': HEARTBEAT_CELERY_ROUTING_KEY}, + 'lms.djangoapps.instructor_task.tasks.calculate_grades_csv': { + 'queue': GRADES_DOWNLOAD_ROUTING_KEY}, + 'lms.djangoapps.instructor_task.tasks.calculate_problem_grade_report': { + 'queue': GRADES_DOWNLOAD_ROUTING_KEY}, + 'lms.djangoapps.instructor_task.tasks.generate_certificates': { + 'queue': GRADES_DOWNLOAD_ROUTING_KEY}, + 'lms.djangoapps.email_marketing.tasks.get_email_cookies_via_sailthru': { + 'queue': ACE_ROUTING_KEY}, + 'lms.djangoapps.email_marketing.tasks.update_user': { + 'queue': ACE_ROUTING_KEY}, + 'lms.djangoapps.email_marketing.tasks.update_user_email': { + 'queue': ACE_ROUTING_KEY}, + 'lms.djangoapps.email_marketing.tasks.update_course_enrollment': { + 'queue': ACE_ROUTING_KEY}, + 'lms.djangoapps.verify_student.tasks.send_verification_status_email': { + 'queue': ACE_ROUTING_KEY}, + 'lms.djangoapps.verify_student.tasks.send_ace_message': { + 'queue': ACE_ROUTING_KEY}, + 'lms.djangoapps.verify_student.tasks.send_request_to_ss_for_user': { + 'queue': SOFTWARE_SECURE_VERIFICATION_ROUTING_KEY}, + 'openedx.core.djangoapps.schedules.tasks._recurring_nudge_schedule_send': { + 'queue': ACE_ROUTING_KEY}, + 'openedx.core.djangoapps.schedules.tasks._upgrade_reminder_schedule_send': { + 'queue': ACE_ROUTING_KEY}, + 'openedx.core.djangoapps.schedules.tasks._course_update_schedule_send': { + 'queue': ACE_ROUTING_KEY}, + 'openedx.core.djangoapps.schedules.tasks.v1.tasks.send_grade_to_credentials': { + 'queue': CREDENTIALS_GENERATION_ROUTING_KEY}, + 'common.djangoapps.entitlements.tasks.expire_old_entitlements': { + 'queue': ENTITLEMENTS_EXPIRATION_ROUTING_KEY}, + 'lms.djangoapps.grades.tasks.recalculate_course_and_subsection_grades_for_user': { + 'queue': POLICY_CHANGE_GRADES_ROUTING_KEY}, + 'lms.djangoapps.grades.tasks.recalculate_subsection_grade_v3': { + 'queue': RECALCULATE_GRADES_ROUTING_KEY}, + 'openedx.core.djangoapps.programs.tasks.v1.tasks.award_program_certificates': { + 'queue': PROGRAM_CERTIFICATES_ROUTING_KEY}, + 'openedx.core.djangoapps.programs.tasks.v1.tasks.revoke_program_certificates': { + 'queue': PROGRAM_CERTIFICATES_ROUTING_KEY}, + 'openedx.core.djangoapps.programs.tasks.v1.tasks.update_certificate_visible_date_on_course_update': { + 'queue': PROGRAM_CERTIFICATES_ROUTING_KEY}, + 'openedx.core.djangoapps.programs.tasks.v1.tasks.award_course_certificate': { + 'queue': PROGRAM_CERTIFICATES_ROUTING_KEY}, + 'openedx.core.djangoapps.coursegraph.dump_course_to_neo4j': { + 'queue': COURSEGRAPH_JOB_QUEUE}, +} diff --git a/openedx/core/djangoapps/coursegraph/tasks.py b/openedx/core/djangoapps/coursegraph/tasks.py index b9c2ec0894bcccc314707020682941dc78302559..3eec402497eb5f3c768eda19dd97a56af9d425ae 100644 --- a/openedx/core/djangoapps/coursegraph/tasks.py +++ b/openedx/core/djangoapps/coursegraph/tasks.py @@ -247,7 +247,7 @@ def should_dump_course(course_key, graph): return last_this_command_was_run < course_last_published_date -@task(routing_key=settings.COURSEGRAPH_JOB_QUEUE) +@task @set_code_owner_attribute def dump_course_to_neo4j(course_key_string, credentials): """ diff --git a/openedx/core/djangoapps/credentials/tasks/v1/tasks.py b/openedx/core/djangoapps/credentials/tasks/v1/tasks.py index 040aaba75bde4c6a32cc92c9420dcf681a76261f..8492d5cc8938b5ad38491a00512e8758a01c69a2 100644 --- a/openedx/core/djangoapps/credentials/tasks/v1/tasks.py +++ b/openedx/core/djangoapps/credentials/tasks/v1/tasks.py @@ -14,11 +14,6 @@ from openedx.core.djangoapps.credentials.utils import get_credentials_api_client logger = get_task_logger(__name__) -# Under cms the following setting is not defined, leading to errors during tests. -# These tasks aren't strictly credentials generation, but are similar in the sense -# that they generate records on the credentials side. And have a similar SLA. -ROUTING_KEY = getattr(settings, 'CREDENTIALS_GENERATION_ROUTING_KEY', None) - # Maximum number of retries before giving up. # For reference, 11 retries with exponential backoff yields a maximum waiting # time of 2047 seconds (about 30 minutes). Setting this to None could yield @@ -26,7 +21,7 @@ ROUTING_KEY = getattr(settings, 'CREDENTIALS_GENERATION_ROUTING_KEY', None) MAX_RETRIES = 11 -@task(bind=True, ignore_result=True, routing_key=ROUTING_KEY) +@task(bind=True, ignore_result=True) @set_code_owner_attribute def send_grade_to_credentials(self, username, course_run_key, verified, letter_grade, percent_grade): """ Celery task to notify the Credentials IDA of a grade change via POST. """ diff --git a/openedx/core/djangoapps/heartbeat/tasks.py b/openedx/core/djangoapps/heartbeat/tasks.py index 7dd184b760f8377421283e366ced67d6b60f7a6b..962d249d12f5bee96ef2f24f75f294bff8bfa09e 100644 --- a/openedx/core/djangoapps/heartbeat/tasks.py +++ b/openedx/core/djangoapps/heartbeat/tasks.py @@ -4,11 +4,10 @@ A trivial task for health checks from celery.task import task -from django.conf import settings from edx_django_utils.monitoring import set_code_owner_attribute -@task(routing_key=settings.HEARTBEAT_CELERY_ROUTING_KEY) +@task @set_code_owner_attribute def sample_task(): return True diff --git a/openedx/core/djangoapps/programs/tasks.py b/openedx/core/djangoapps/programs/tasks.py index 8d0853f1e654f63ef11f715ab11e1fc10552cec1..a25f7e3eceb073f647f52e87b3bcfb8a18951402 100644 --- a/openedx/core/djangoapps/programs/tasks.py +++ b/openedx/core/djangoapps/programs/tasks.py @@ -23,9 +23,6 @@ from openedx.core.djangoapps.programs.utils import ProgramProgressMeter from openedx.core.djangoapps.site_configuration import helpers as configuration_helpers LOGGER = get_task_logger(__name__) -# Under cms the following setting is not defined, leading to errors during tests. -ROUTING_KEY = getattr(settings, 'CREDENTIALS_GENERATION_ROUTING_KEY', None) -PROGRAM_CERTIFICATES_ROUTING_KEY = getattr(settings, 'PROGRAM_CERTIFICATES_ROUTING_KEY', None) # Maximum number of retries before giving up on awarding credentials. # For reference, 11 retries with exponential backoff yields a maximum waiting # time of 2047 seconds (about 30 minutes). Setting this to None could yield @@ -124,7 +121,7 @@ def award_program_certificate(client, username, program_uuid, visible_date): }) -@task(bind=True, ignore_result=True, routing_key=PROGRAM_CERTIFICATES_ROUTING_KEY) +@task(bind=True, ignore_result=True) @set_code_owner_attribute def award_program_certificates(self, username): """ @@ -287,7 +284,7 @@ def post_course_certificate(client, username, certificate, visible_date): }) -@task(bind=True, ignore_result=True, routing_key=ROUTING_KEY) +@task(bind=True, ignore_result=True) @set_code_owner_attribute def award_course_certificate(self, username, course_run_key): """ @@ -402,7 +399,7 @@ def revoke_program_certificate(client, username, program_uuid): }) -@task(bind=True, ignore_result=True, routing_key=PROGRAM_CERTIFICATES_ROUTING_KEY) +@task(bind=True, ignore_result=True) @set_code_owner_attribute def revoke_program_certificates(self, username, course_key): """ @@ -526,7 +523,7 @@ def revoke_program_certificates(self, username, course_key): LOGGER.info(u'Successfully completed the task revoke_program_certificates for username %s', username) -@task(bind=True, ignore_result=True, routing_key=PROGRAM_CERTIFICATES_ROUTING_KEY) +@task(bind=True, ignore_result=True) @set_code_owner_attribute def update_certificate_visible_date_on_course_update(self, course_key): """ diff --git a/openedx/core/djangoapps/schedules/tasks.py b/openedx/core/djangoapps/schedules/tasks.py index 3693ea75ca3624501677d5d5654c4ec7c542e49e..81d6092f78fa5dc1c3d9e169853bef83fd714014 100644 --- a/openedx/core/djangoapps/schedules/tasks.py +++ b/openedx/core/djangoapps/schedules/tasks.py @@ -150,7 +150,7 @@ class BinnedScheduleMessageBaseTask(ScheduleMessageBaseTask): raise NotImplementedError -@task(base=LoggedTask, ignore_result=True, routing_key=ROUTING_KEY) +@task(base=LoggedTask, ignore_result=True) @set_code_owner_attribute def _recurring_nudge_schedule_send(site_id, msg_str): _schedule_send( @@ -161,7 +161,7 @@ def _recurring_nudge_schedule_send(site_id, msg_str): ) -@task(base=LoggedTask, ignore_result=True, routing_key=ROUTING_KEY) +@task(base=LoggedTask, ignore_result=True) @set_code_owner_attribute def _upgrade_reminder_schedule_send(site_id, msg_str): _schedule_send( @@ -172,7 +172,7 @@ def _upgrade_reminder_schedule_send(site_id, msg_str): ) -@task(base=LoggedTask, ignore_result=True, routing_key=ROUTING_KEY) +@task(base=LoggedTask, ignore_result=True) @set_code_owner_attribute def _course_update_schedule_send(site_id, msg_str): _schedule_send( diff --git a/openedx/core/lib/celery/routers.py b/openedx/core/lib/celery/routers.py index 50fef96f52261c9021909155fe4539a02ac92a75..acc3434ebb6dcc8b0203b3bd094a64cbc6c0fbae 100644 --- a/openedx/core/lib/celery/routers.py +++ b/openedx/core/lib/celery/routers.py @@ -5,62 +5,39 @@ For more, see https://celery.readthedocs.io/en/latest/userguide/routing.html#rou """ import logging -from abc import ABCMeta, abstractproperty from django.conf import settings -import six + log = logging.getLogger(__name__) -class AlternateEnvironmentRouter(six.with_metaclass(ABCMeta, object)): - """ - A custom Router class for use in routing celery tasks to non-default queues. +def route_task(name, args, kwargs, options, task=None, **kw): # pylint: disable=unused-argument """ + Celery-defined method allowing for custom routing logic. - @abstractproperty - def alternate_env_tasks(self): - """ - Defines the task -> alternate worker environment to be used when routing. - - Subclasses must override this property with their own specific mappings. - """ - return {} - - @property - def explicit_queues(self): - """ - Defines the task -> alternate worker queue to be used when routing. - """ - return {} - - def route_for_task(self, task, args=None, kwargs=None): # pylint: disable=unused-argument - """ - Celery-defined method allowing for custom routing logic. - - If None is returned from this method, default routing logic is used. - """ - if task in self.explicit_queues: - return self.explicit_queues[task] + If None is returned from this method, default routing logic is used. + """ + if name in settings.EXPLICIT_QUEUES: + return settings.EXPLICIT_QUEUES[name] - alternate_env = self.alternate_env_tasks.get(task, None) - if alternate_env: - return self.ensure_queue_env(alternate_env) + alternate_env = settings.ALTERNATE_ENV_TASKS.get(name, None) + if alternate_env: + return ensure_queue_env(alternate_env) - return None - def ensure_queue_env(self, desired_env): - """ - Helper method to get the desired type of queue. +def ensure_queue_env(desired_env): + """ + Helper method to get the desired type of queue. - If no such queue is defined, default routing logic is used. - """ - queues = getattr(settings, 'CELERY_QUEUES', None) - return next( - ( - queue - for queue in queues - if '.{}.'.format(desired_env) in queue - ), - None - ) + If no such queue is defined, default routing logic is used. + """ + queues = getattr(settings, 'CELERY_QUEUES', None) + return next( + ( + queue + for queue in queues + if '.{}.'.format(desired_env) in queue + ), + None + )