Skip to content
Snippets Groups Projects
Unverified Commit db4c3b12 authored by Awais Qureshi's avatar Awais Qureshi Committed by GitHub
Browse files

Revert "Revert ""Update routing config" (#25536)" (#25549)" (#25553)

This reverts commit c1fe3c3a.
parent 0f6d94d1
Branches
Tags
No related merge requests found
Showing
with 146 additions and 139 deletions
......@@ -10,7 +10,7 @@ import os
from celery import Celery
from openedx.core.lib.celery.routers import AlternateEnvironmentRouter
from openedx.core.lib.celery.routers import route_task_queue
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'cms.envs.production')
......@@ -23,34 +23,12 @@ APP.conf.task_protocol = 1
APP.config_from_object('django.conf:settings')
APP.autodiscover_tasks()
# Import after autodiscovery has had a chance to connect to the import_module signal
# so celery doesn't miss any apps getting installed.
from django.conf import settings # pylint: disable=wrong-import-position,wrong-import-order
class Router(AlternateEnvironmentRouter):
def route_task(name, args, kwargs, options, task=None, **kw): # pylint: disable=unused-argument
"""
An implementation of AlternateEnvironmentRouter, for routing tasks to non-cms queues.
Celery-defined method allowing for custom routing logic.
If None is returned from this method, default routing logic is used.
"""
@property
def alternate_env_tasks(self):
"""
Defines alternate environment tasks, as a dict of form { task_name: alternate_queue }
"""
# The tasks below will be routed to the default lms queue.
return {
'completion_aggregator.tasks.update_aggregators': 'lms',
'openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache': 'lms',
'openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache_v2': 'lms',
}
@property
def explicit_queues(self):
"""
Defines specific queues for tasks to run in (typically outside of the cms environment),
as a dict of form { task_name: queue_name }.
"""
return {
'lms.djangoapps.grades.tasks.compute_all_grades_for_course': settings.POLICY_CHANGE_GRADES_ROUTING_KEY,
}
return route_task_queue(name)
......@@ -168,7 +168,7 @@ def _parse_time(time_isoformat):
).replace(tzinfo=UTC)
@task(routing_key=settings.UPDATE_SEARCH_INDEX_JOB_QUEUE)
@task
def update_search_index(course_id, triggered_time_isoformat):
""" Updates course search index. """
try:
......
......@@ -136,7 +136,7 @@ CELERY_QUEUES = {
DEFAULT_PRIORITY_QUEUE: {}
}
CELERY_ROUTES = "{}celery.Router".format(QUEUE_VARIANT)
CELERY_ROUTES = "{}celery.route_task".format(QUEUE_VARIANT)
# STATIC_URL_BASE specifies the base url to use for static files
STATIC_URL_BASE = ENV_TOKENS.get('STATIC_URL_BASE', None)
......@@ -567,3 +567,20 @@ if FEATURES.get('ENABLE_CORS_HEADERS'):
CORS_ALLOW_HEADERS = corsheaders_default_headers + (
'use-jwt-cookie',
)
######################## CELERY ROTUING ########################
# Defines alternate environment tasks, as a dict of form { task_name: alternate_queue }
ALTERNATE_ENV_TASKS = {
'completion_aggregator.tasks.update_aggregators': 'lms',
'openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache': 'lms',
'openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache_v2': 'lms',
}
# Defines the task -> alternate worker queue to be used when routing.
EXPLICIT_QUEUES = {
'lms.djangoapps.grades.tasks.compute_all_grades_for_course': {
'queue': POLICY_CHANGE_GRADES_ROUTING_KEY},
'cms.djangoapps.contentstore.tasks.update_search_index': {
'queue': UPDATE_SEARCH_INDEX_JOB_QUEUE},
}
......@@ -5,13 +5,11 @@ This file contains celery tasks for entitlements-related functionality.
from celery import task
from celery.utils.log import get_task_logger
from django.conf import settings
from common.djangoapps.entitlements.models import CourseEntitlement
LOGGER = get_task_logger(__name__)
# Under cms the following setting is not defined, leading to errors during tests.
ROUTING_KEY = getattr(settings, 'ENTITLEMENTS_EXPIRATION_ROUTING_KEY', None)
# Maximum number of retries before giving up on awarding credentials.
# For reference, 11 retries with exponential backoff yields a maximum waiting
# time of 2047 seconds (about 30 minutes). Setting this to None could yield
......@@ -19,7 +17,7 @@ ROUTING_KEY = getattr(settings, 'ENTITLEMENTS_EXPIRATION_ROUTING_KEY', None)
MAX_RETRIES = 11
@task(bind=True, ignore_result=True, routing_key=ROUTING_KEY)
@task(bind=True, ignore_result=True)
def expire_old_entitlements(self, start, end, logid='...'):
"""
This task is designed to be called to process a bundle of entitlements
......
......@@ -5,12 +5,11 @@ and auto discover tasks in all installed django apps.
Taken from: https://celery.readthedocs.org/en/latest/django/first-steps-with-django.html
"""
import os
from celery import Celery
from openedx.core.lib.celery.routers import AlternateEnvironmentRouter
from openedx.core.lib.celery.routers import route_task_queue
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'lms.envs.production')
......@@ -24,14 +23,11 @@ APP.config_from_object('django.conf:settings')
APP.autodiscover_tasks()
class Router(AlternateEnvironmentRouter):
def route_task(name, args, kwargs, options, task=None, **kw): # pylint: disable=unused-argument
"""
An implementation of AlternateEnvironmentRouter, for routing tasks to non-cms queues.
Celery-defined method allowing for custom routing logic.
If None is returned from this method, default routing logic is used.
"""
@property
def alternate_env_tasks(self):
"""
Defines alternate environment tasks, as a dict of form { task_name: alternate_queue }
"""
return {}
return route_task_queue(name)
......@@ -196,8 +196,6 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
total_recipients = combined_set.count()
routing_key = settings.BULK_EMAIL_ROUTING_KEY
# Weird things happen if we allow empty querysets as input to emailing subtasks
# The task appears to hang at "0 out of 0 completed" and never finishes.
if total_recipients == 0:
......@@ -217,7 +215,6 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
initial_subtask_status.to_dict(),
),
task_id=subtask_id,
routing_key=routing_key,
)
return new_subtask
......
......@@ -35,7 +35,6 @@ log = logging.getLogger(__name__)
DEFAULT_LANGUAGE = 'en'
ROUTING_KEY = getattr(settings, 'ACE_ROUTING_KEY', None)
@task(base=LoggedTask)
......@@ -60,7 +59,7 @@ class ResponseNotification(BaseMessageType):
pass
@task(base=LoggedTask, routing_key=ROUTING_KEY)
@task(base=LoggedTask)
def send_ace_message(context):
context['course_id'] = CourseKey.from_string(context['course_id'])
......
......@@ -18,10 +18,9 @@ from .models import EmailMarketingConfiguration
log = logging.getLogger(__name__)
SAILTHRU_LIST_CACHE_KEY = "email.marketing.cache"
ACE_ROUTING_KEY = getattr(settings, 'ACE_ROUTING_KEY', None)
@task(bind=True, routing_key=ACE_ROUTING_KEY)
@task(bind=True)
def get_email_cookies_via_sailthru(self, user_email, post_parms):
"""
Adds/updates Sailthru cookie information for a new user.
......@@ -61,7 +60,7 @@ def get_email_cookies_via_sailthru(self, user_email, post_parms):
return None
@task(bind=True, default_retry_delay=3600, max_retries=24, routing_key=ACE_ROUTING_KEY)
@task(bind=True, default_retry_delay=3600, max_retries=24)
def update_user(self, sailthru_vars, email, site=None, new_user=False, activation=False):
"""
Adds/updates Sailthru profile information for a user.
......@@ -143,7 +142,7 @@ def is_default_site(site):
return not site or site.get('id') == settings.SITE_ID
@task(bind=True, default_retry_delay=3600, max_retries=24, routing_key=ACE_ROUTING_KEY)
@task(bind=True, default_retry_delay=3600, max_retries=24)
def update_user_email(self, new_email, old_email):
"""
Adds/updates Sailthru when a user email address is changed
......@@ -303,7 +302,7 @@ def _retryable_sailthru_error(error):
return code == 9 or code == 43
@task(bind=True, routing_key=ACE_ROUTING_KEY)
@task(bind=True)
def update_course_enrollment(self, email, course_key, mode, site=None):
"""Adds/updates Sailthru when a user adds to cart/purchases/upgrades a course
Args:
......
......@@ -80,7 +80,7 @@ class Command(BaseCommand):
"""
Enqueue all tasks, in shuffled order.
"""
task_options = {'routing_key': options['routing_key']} if options.get('routing_key') else {}
task_options = {'queue': options['routing_key']} if options.get('routing_key') else {}
for seq_id, kwargs in enumerate(self._shuffled_task_kwargs(options)):
kwargs['seq_id'] = seq_id
result = tasks.compute_grades_for_course_v2.apply_async(kwargs=kwargs, **task_options)
......
......@@ -92,19 +92,19 @@ class TestComputeGrades(SharedModuleStoreTestCase):
# Order doesn't matter, but can't use a set because dicts aren't hashable
expected = [
({
'routing_key': 'key',
'queue': 'key',
'kwargs': _kwargs(self.course_keys[0], 0)
},),
({
'routing_key': 'key',
'queue': 'key',
'kwargs': _kwargs(self.course_keys[0], 2)
},),
({
'routing_key': 'key',
'queue': 'key',
'kwargs': _kwargs(self.course_keys[3], 0)
},),
({
'routing_key': 'key',
'queue': 'key',
'kwargs': _kwargs(self.course_keys[3], 2)
},),
]
......
......@@ -48,7 +48,7 @@ RETRY_DELAY_SECONDS = 40
SUBSECTION_GRADE_TIMEOUT_SECONDS = 300
@task(base=LoggedPersistOnFailureTask, routing_key=settings.POLICY_CHANGE_GRADES_ROUTING_KEY)
@task(base=LoggedPersistOnFailureTask)
def compute_all_grades_for_course(**kwargs):
"""
Compute grades for all students in the specified course.
......@@ -69,7 +69,7 @@ def compute_all_grades_for_course(**kwargs):
'batch_size': batch_size,
})
compute_grades_for_course_v2.apply_async(
kwargs=kwargs, routing_key=settings.POLICY_CHANGE_GRADES_ROUTING_KEY
kwargs=kwargs, queue=settings.POLICY_CHANGE_GRADES_ROUTING_KEY
)
......@@ -131,7 +131,6 @@ def compute_grades_for_course(course_key, offset, batch_size, **kwargs): # pyli
time_limit=SUBSECTION_GRADE_TIMEOUT_SECONDS,
max_retries=2,
default_retry_delay=RETRY_DELAY_SECONDS,
routing_key=settings.POLICY_CHANGE_GRADES_ROUTING_KEY
)
def recalculate_course_and_subsection_grades_for_user(self, **kwargs): # pylint: disable=unused-argument
"""
......@@ -171,8 +170,7 @@ def recalculate_course_and_subsection_grades_for_user(self, **kwargs): # pylint
base=LoggedPersistOnFailureTask,
time_limit=SUBSECTION_GRADE_TIMEOUT_SECONDS,
max_retries=2,
default_retry_delay=RETRY_DELAY_SECONDS,
routing_key=settings.RECALCULATE_GRADES_ROUTING_KEY
default_retry_delay=RETRY_DELAY_SECONDS
)
def recalculate_subsection_grade_v3(self, **kwargs):
"""
......
......@@ -162,7 +162,6 @@ def send_bulk_course_email(entry_id, _xmodule_instance_args):
@task(
name='lms.djangoapps.instructor_task.tasks.calculate_problem_responses_csv.v2',
base=BaseInstructorTask,
routing_key=settings.GRADES_DOWNLOAD_ROUTING_KEY,
)
def calculate_problem_responses_csv(entry_id, xmodule_instance_args):
"""
......@@ -175,7 +174,7 @@ def calculate_problem_responses_csv(entry_id, xmodule_instance_args):
return run_main_task(entry_id, task_fn, action_name)
@task(base=BaseInstructorTask, routing_key=settings.GRADES_DOWNLOAD_ROUTING_KEY)
@task(base=BaseInstructorTask)
def calculate_grades_csv(entry_id, xmodule_instance_args):
"""
Grade a course and push the results to an S3 bucket for download.
......@@ -191,7 +190,7 @@ def calculate_grades_csv(entry_id, xmodule_instance_args):
return run_main_task(entry_id, task_fn, action_name)
@task(base=BaseInstructorTask, routing_key=settings.GRADES_DOWNLOAD_ROUTING_KEY)
@task(base=BaseInstructorTask)
def calculate_problem_grade_report(entry_id, xmodule_instance_args):
"""
Generate a CSV for a course containing all students' problem
......@@ -256,7 +255,7 @@ def calculate_may_enroll_csv(entry_id, xmodule_instance_args):
return run_main_task(entry_id, task_fn, action_name)
@task(base=BaseInstructorTask, routing_key=settings.GRADES_DOWNLOAD_ROUTING_KEY)
@task(base=BaseInstructorTask)
def generate_certificates(entry_id, xmodule_instance_args):
"""
Grade students and generate certificates.
......
......@@ -15,8 +15,6 @@ from django.core.mail import EmailMessage
from common.djangoapps.edxmako.shortcuts import render_to_string
from openedx.core.djangoapps.site_configuration import helpers as configuration_helpers
ACE_ROUTING_KEY = getattr(settings, 'ACE_ROUTING_KEY', None)
SOFTWARE_SECURE_VERIFICATION_ROUTING_KEY = getattr(settings, 'SOFTWARE_SECURE_VERIFICATION_ROUTING_KEY', None)
log = logging.getLogger(__name__)
......@@ -73,7 +71,7 @@ class BaseSoftwareSecureTask(Task):
)
@task(routing_key=ACE_ROUTING_KEY)
@task
def send_verification_status_email(context):
"""
Spins a task to send verification status email to the learner
......@@ -99,7 +97,6 @@ def send_verification_status_email(context):
bind=True,
default_retry_delay=settings.SOFTWARE_SECURE_REQUEST_RETRY_DELAY,
max_retries=settings.SOFTWARE_SECURE_RETRY_MAX_ATTEMPTS,
routing_key=SOFTWARE_SECURE_VERIFICATION_ROUTING_KEY,
)
def send_request_to_ss_for_user(self, user_verification_id, copy_id_photo_from):
"""
......
......@@ -150,7 +150,7 @@ CELERY_QUEUES = {
HIGH_MEM_QUEUE: {},
}
CELERY_ROUTES = "{}celery.Router".format(QUEUE_VARIANT)
CELERY_ROUTES = "{}celery.route_task".format(QUEUE_VARIANT)
CELERYBEAT_SCHEDULE = {} # For scheduling tasks, entries can be added to this dict
# STATIC_ROOT specifies the directory where static files are
......@@ -974,3 +974,62 @@ COMPLETION_VIDEO_COMPLETE_PERCENTAGE = ENV_TOKENS.get('COMPLETION_VIDEO_COMPLETE
COMPLETION_VIDEO_COMPLETE_PERCENTAGE)
COMPLETION_VIDEO_COMPLETE_PERCENTAGE = ENV_TOKENS.get('COMPLETION_BY_VIEWING_DELAY_MS',
COMPLETION_BY_VIEWING_DELAY_MS)
######################## CELERY ROUTING ########################
# Defines alternate environment tasks, as a dict of form { task_name: alternate_queue }
ALTERNATE_ENV_TASKS = {}
# Defines the task -> alternate worker queue to be used when routing.
EXPLICIT_QUEUES = {
'openedx.core.djangoapps.content.course_overviews.tasks.async_course_overview_update': {
'queue': GRADES_DOWNLOAD_ROUTING_KEY},
'lms.djangoapps.bulk_email.tasks.send_course_email': {
'queue': BULK_EMAIL_ROUTING_KEY},
'openedx.core.djangoapps.heartbeat.tasks.sample_task': {
'queue': HEARTBEAT_CELERY_ROUTING_KEY},
'lms.djangoapps.instructor_task.tasks.calculate_grades_csv': {
'queue': GRADES_DOWNLOAD_ROUTING_KEY},
'lms.djangoapps.instructor_task.tasks.calculate_problem_grade_report': {
'queue': GRADES_DOWNLOAD_ROUTING_KEY},
'lms.djangoapps.instructor_task.tasks.generate_certificates': {
'queue': GRADES_DOWNLOAD_ROUTING_KEY},
'lms.djangoapps.email_marketing.tasks.get_email_cookies_via_sailthru': {
'queue': ACE_ROUTING_KEY},
'lms.djangoapps.email_marketing.tasks.update_user': {
'queue': ACE_ROUTING_KEY},
'lms.djangoapps.email_marketing.tasks.update_user_email': {
'queue': ACE_ROUTING_KEY},
'lms.djangoapps.email_marketing.tasks.update_course_enrollment': {
'queue': ACE_ROUTING_KEY},
'lms.djangoapps.verify_student.tasks.send_verification_status_email': {
'queue': ACE_ROUTING_KEY},
'lms.djangoapps.verify_student.tasks.send_ace_message': {
'queue': ACE_ROUTING_KEY},
'lms.djangoapps.verify_student.tasks.send_request_to_ss_for_user': {
'queue': SOFTWARE_SECURE_VERIFICATION_ROUTING_KEY},
'openedx.core.djangoapps.schedules.tasks._recurring_nudge_schedule_send': {
'queue': ACE_ROUTING_KEY},
'openedx.core.djangoapps.schedules.tasks._upgrade_reminder_schedule_send': {
'queue': ACE_ROUTING_KEY},
'openedx.core.djangoapps.schedules.tasks._course_update_schedule_send': {
'queue': ACE_ROUTING_KEY},
'openedx.core.djangoapps.schedules.tasks.v1.tasks.send_grade_to_credentials': {
'queue': CREDENTIALS_GENERATION_ROUTING_KEY},
'common.djangoapps.entitlements.tasks.expire_old_entitlements': {
'queue': ENTITLEMENTS_EXPIRATION_ROUTING_KEY},
'lms.djangoapps.grades.tasks.recalculate_course_and_subsection_grades_for_user': {
'queue': POLICY_CHANGE_GRADES_ROUTING_KEY},
'lms.djangoapps.grades.tasks.recalculate_subsection_grade_v3': {
'queue': RECALCULATE_GRADES_ROUTING_KEY},
'openedx.core.djangoapps.programs.tasks.v1.tasks.award_program_certificates': {
'queue': PROGRAM_CERTIFICATES_ROUTING_KEY},
'openedx.core.djangoapps.programs.tasks.v1.tasks.revoke_program_certificates': {
'queue': PROGRAM_CERTIFICATES_ROUTING_KEY},
'openedx.core.djangoapps.programs.tasks.v1.tasks.update_certificate_visible_date_on_course_update': {
'queue': PROGRAM_CERTIFICATES_ROUTING_KEY},
'openedx.core.djangoapps.programs.tasks.v1.tasks.award_course_certificate': {
'queue': PROGRAM_CERTIFICATES_ROUTING_KEY},
'openedx.core.djangoapps.coursegraph.dump_course_to_neo4j': {
'queue': COURSEGRAPH_JOB_QUEUE},
}
......@@ -246,7 +246,7 @@ def should_dump_course(course_key, graph):
return last_this_command_was_run < course_last_published_date
@task(routing_key=settings.COURSEGRAPH_JOB_QUEUE)
@task
def dump_course_to_neo4j(course_key_string, credentials):
"""
Serializes a course and writes it to neo4j.
......
......@@ -13,11 +13,6 @@ from openedx.core.djangoapps.credentials.utils import get_credentials_api_client
logger = get_task_logger(__name__)
# Under cms the following setting is not defined, leading to errors during tests.
# These tasks aren't strictly credentials generation, but are similar in the sense
# that they generate records on the credentials side. And have a similar SLA.
ROUTING_KEY = getattr(settings, 'CREDENTIALS_GENERATION_ROUTING_KEY', None)
# Maximum number of retries before giving up.
# For reference, 11 retries with exponential backoff yields a maximum waiting
# time of 2047 seconds (about 30 minutes). Setting this to None could yield
......@@ -25,7 +20,7 @@ ROUTING_KEY = getattr(settings, 'CREDENTIALS_GENERATION_ROUTING_KEY', None)
MAX_RETRIES = 11
@task(bind=True, ignore_result=True, routing_key=ROUTING_KEY)
@task(bind=True, ignore_result=True)
def send_grade_to_credentials(self, username, course_run_key, verified, letter_grade, percent_grade):
""" Celery task to notify the Credentials IDA of a grade change via POST. """
logger.info(u'Running task send_grade_to_credentials for username %s and course %s', username, course_run_key)
......
......@@ -4,9 +4,8 @@ A trivial task for health checks
from celery.task import task
from django.conf import settings
@task(routing_key=settings.HEARTBEAT_CELERY_ROUTING_KEY)
@task
def sample_task():
return True
......@@ -22,9 +22,6 @@ from openedx.core.djangoapps.programs.utils import ProgramProgressMeter
from openedx.core.djangoapps.site_configuration import helpers as configuration_helpers
LOGGER = get_task_logger(__name__)
# Under cms the following setting is not defined, leading to errors during tests.
ROUTING_KEY = getattr(settings, 'CREDENTIALS_GENERATION_ROUTING_KEY', None)
PROGRAM_CERTIFICATES_ROUTING_KEY = getattr(settings, 'PROGRAM_CERTIFICATES_ROUTING_KEY', None)
# Maximum number of retries before giving up on awarding credentials.
# For reference, 11 retries with exponential backoff yields a maximum waiting
# time of 2047 seconds (about 30 minutes). Setting this to None could yield
......@@ -123,7 +120,7 @@ def award_program_certificate(client, username, program_uuid, visible_date):
})
@task(bind=True, ignore_result=True, routing_key=PROGRAM_CERTIFICATES_ROUTING_KEY)
@task(bind=True, ignore_result=True)
def award_program_certificates(self, username):
"""
This task is designed to be called whenever a student's completion status
......@@ -285,7 +282,7 @@ def post_course_certificate(client, username, certificate, visible_date):
})
@task(bind=True, ignore_result=True, routing_key=ROUTING_KEY)
@task(bind=True, ignore_result=True)
def award_course_certificate(self, username, course_run_key):
"""
This task is designed to be called whenever a student GeneratedCertificate is updated.
......@@ -399,7 +396,7 @@ def revoke_program_certificate(client, username, program_uuid):
})
@task(bind=True, ignore_result=True, routing_key=PROGRAM_CERTIFICATES_ROUTING_KEY)
@task(bind=True, ignore_result=True)
def revoke_program_certificates(self, username, course_key):
"""
This task is designed to be called whenever a student's course certificate is
......@@ -522,7 +519,7 @@ def revoke_program_certificates(self, username, course_key):
LOGGER.info(u'Successfully completed the task revoke_program_certificates for username %s', username)
@task(bind=True, ignore_result=True, routing_key=PROGRAM_CERTIFICATES_ROUTING_KEY)
@task(bind=True, ignore_result=True)
def update_certificate_visible_date_on_course_update(self, course_key):
"""
This task is designed to be called whenever a course is updated with
......
......@@ -143,7 +143,7 @@ class BinnedScheduleMessageBaseTask(ScheduleMessageBaseTask):
raise NotImplementedError
@task(base=LoggedTask, ignore_result=True, routing_key=ROUTING_KEY)
@task(base=LoggedTask, ignore_result=True)
def _recurring_nudge_schedule_send(site_id, msg_str):
_schedule_send(
msg_str,
......@@ -153,7 +153,7 @@ def _recurring_nudge_schedule_send(site_id, msg_str):
)
@task(base=LoggedTask, ignore_result=True, routing_key=ROUTING_KEY)
@task(base=LoggedTask, ignore_result=True)
def _upgrade_reminder_schedule_send(site_id, msg_str):
_schedule_send(
msg_str,
......@@ -163,7 +163,7 @@ def _upgrade_reminder_schedule_send(site_id, msg_str):
)
@task(base=LoggedTask, ignore_result=True, routing_key=ROUTING_KEY)
@task(base=LoggedTask, ignore_result=True)
def _course_update_schedule_send(site_id, msg_str):
_schedule_send(
msg_str,
......
......@@ -5,62 +5,41 @@ For more, see https://celery.readthedocs.io/en/latest/userguide/routing.html#rou
"""
import logging
from abc import ABCMeta, abstractproperty
from django.conf import settings
import six
log = logging.getLogger(__name__)
class AlternateEnvironmentRouter(six.with_metaclass(ABCMeta, object)):
def route_task_queue(name):
"""
A custom Router class for use in routing celery tasks to non-default queues.
"""
@abstractproperty
def alternate_env_tasks(self):
"""
Defines the task -> alternate worker environment to be used when routing.
Subclasses must override this property with their own specific mappings.
"""
return {}
@property
def explicit_queues(self):
"""
Defines the task -> alternate worker queue to be used when routing.
"""
return {}
Helper method allowing for custom routing logic.
def route_for_task(self, task, args=None, kwargs=None): # pylint: disable=unused-argument
"""
Celery-defined method allowing for custom routing logic.
If None is returned from this method, default routing logic is used.
"""
from django.conf import settings # pylint: disable=import-outside-toplevel
If None is returned from this method, default routing logic is used.
"""
if task in self.explicit_queues:
return self.explicit_queues[task]
if name in settings.EXPLICIT_QUEUES:
return settings.EXPLICIT_QUEUES[name]
alternate_env = self.alternate_env_tasks.get(task, None)
if alternate_env:
return self.ensure_queue_env(alternate_env)
alternate_env = settings.ALTERNATE_ENV_TASKS.get(name, None)
if alternate_env:
return ensure_queue_env(alternate_env)
return None
def ensure_queue_env(self, desired_env):
"""
Helper method to get the desired type of queue.
def ensure_queue_env(desired_env):
"""
Helper method to get the desired type of queue.
If no such queue is defined, default routing logic is used.
"""
queues = getattr(settings, 'CELERY_QUEUES', None)
return next(
(
queue
for queue in queues
if '.{}.'.format(desired_env) in queue
),
None
)
If no such queue is defined, default routing logic is used.
"""
from django.conf import settings # pylint: disable=import-outside-toplevel
queues = getattr(settings, 'CELERY_QUEUES', None)
return next(
(
queue
for queue in queues
if '.{}.'.format(desired_env) in queue
),
None
)
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment