如何解决带有Redis代理和多个队列的Celery:将所有任务注册到每个队列
我将celery与Django和redis用作代理。我正在尝试设置两个队列:default
和other
。我的任务正在运行,但是我配置的设置无法正常运行。
我遇到两个相关问题:
- 芹菜任务不遵守
task_routes
设置(请参见下文)。 - 所有celery任务(无论它们如何定义)在启动时都已注册到两个队列中的每个队列
这是包含所有代码的存储库。您可以通过运行docker-compose up
在docker和docker-compose本地运行示例:
https://gitlab.com/verbose-equals-true/digital-ocean-docker-swarm
这是我代码的相关部分:
- 芹菜应用定义文件
- 任务定义/声明
- 启动工作人员的命令
芹菜应用定义:
from celery import Celery
from django.conf import settings
from kombu import Exchange,Queue
CELERY_QUEUE_DEFAULT = 'default'
CELERY_QUEUE_OTHER = 'other'
app = Celery('backend')
app.conf["broker_url"] = f"redis://{settings.REDIS_SERVICE_HOST}:6379/1"
app.conf["result_backend"] = f"redis://{settings.REDIS_SERVICE_HOST}:6379/2"
app.conf["accpet_content"] = ['application/json']
app.conf["task_serializer"] = 'json'
app.conf["result_serializer"] = 'json'
app.conf["task_acks_late"] = True
app.conf["task_default_queue"] = CELERY_QUEUE_DEFAULT
app.conf["worker_send_task_events"] = True
app.conf["worker_prefetch_multiplier"] = 1
app.conf["task_queues"] = (
Queue(
CELERY_QUEUE_DEFAULT,Exchange(CELERY_QUEUE_DEFAULT),routing_key=CELERY_QUEUE_DEFAULT,),Queue(
CELERY_QUEUE_OTHER,Exchange(CELERY_QUEUE_OTHER),routing_key=CELERY_QUEUE_OTHER,)
app.conf["task_routes"] = {
'backend.core.tasks.debug_task': {
'queue': 'default','routing_key': 'default','exchange': 'default',},'backend.core.tasks.debug_task_other': {
'queue': 'other','routing_key': 'other','exchange': 'other',}
app.conf["task_default_exchange_type"] = 'direct'
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
任务定义(在名为tasks.py
的应用程序中的名为core
的文件中定义:
import time
import celery
# from backend import celery_app as app
# from celery import shared_task
from celery.task import task
from django.conf import settings
# @celery.task <-- I have seen these decorators in other example
# @app.task <-- neither of these result in the tasks being sent to the correct queue
# @shared_task
@task(queue="default",exchange="default")
def debug_task():
time.sleep(10)
return "Task is done."
@task(queue="other",exchange="other")
def debug_task_other():
time.sleep(10)
return "Task is done for other queue."
这是我在docker-compose
本地开始工作的方式:
celery:
<<: *backend
container_name: celery
command:
- "watchmedo"
- "auto-restart"
- "--directory=./"
- "--pattern=*.py"
- "--recursive"
- "--"
- "celery"
- "worker"
- "--app=backend.celery_app:app"
- "-Q"
- "default"
- "--concurrency=1"
- "--loglevel=INFO"
ports: []
celery_other:
<<: *backend
container_name: celery_other
command:
- "watchmedo"
- "auto-restart"
- "--directory=./"
- "--pattern=*.py"
- "--recursive"
- "--"
- "celery"
- "worker"
- "--app=backend.celery_app:app"
- "-Q"
- "other"
- "--concurrency=1"
- "--loglevel=INFO"
ports: []
以下是docker-compose
的日志,显示两个任务都已注册到每个工作人员:
celery |
celery | -------------- celery@c8f0ed3f97df v4.4.7 (cliffs)
celery | --- ***** -----
celery | -- ******* ---- Linux-4.15.0-112-generic-x86_64-with-glibc2.2.5 2020-08-21 20:35:47
celery | - *** --- * ---
celery | - ** ---------- [config]
celery | - ** ---------- .> app: backend:0x7fb327c2e6a0
celery | - ** ---------- .> transport: redis://redis:6379/1
celery | - ** ---------- .> results: redis://redis:6379/2
celery | - *** --- * --- .> concurrency: 1 (prefork)
celery | -- ******* ---- .> task events: ON
celery | --- ***** -----
celery | -------------- [queues]
celery | .> default exchange=default(direct) key=default
celery |
celery |
celery | [tasks]
celery | . core.tasks.debug_task
celery | . core.tasks.debug_task_other
celery |
celery_other |
celery_other | -------------- celery@3dd99b1ed32e v4.4.7 (cliffs)
celery_other | --- ***** -----
celery_other | -- ******* ---- Linux-4.15.0-112-generic-x86_64-with-glibc2.2.5 2020-08-21 20:35:47
celery_other | - *** --- * ---
celery_other | - ** ---------- [config]
celery_other | - ** ---------- .> app: backend:0x7f54fa89e6a0
celery_other | - ** ---------- .> transport: redis://redis:6379/1
celery_other | - ** ---------- .> results: redis://redis:6379/2
celery_other | - *** --- * --- .> concurrency: 1 (prefork)
celery_other | -- ******* ---- .> task events: ON
celery_other | --- ***** -----
celery_other | -------------- [queues]
celery_other | .> other exchange=other(direct) key=other
celery_other |
celery_other |
celery_other | [tasks]
celery_other | . core.tasks.debug_task
celery_other | . core.tasks.debug_task_other
我当时认为定义task_routes
意味着我不必在任务装饰器中指定任务的队列。如果我未指定队列,则默认工作程序将接管所有任务。
如果有帮助,这是我的Django目录结构:
tree -L 3 backend
backend
├── backend
│ ├── asgi.py
│ ├── celery_app.py <- this is where I define my celery app
│ ├── __init__.py
│ ├── settings
│ │ ├── base.py
│ │ ├── development.py
│ │ ├── __init__.py
│ │ └── production.py
│ ├── settings.py
│ ├── urls.py
│ └── wsgi.py
├── core
│ ├── admin.py
│ ├── apps.py
│ ├── __init__.py
│ ├── migrations
│ │ └── __init__.py
│ ├── models.py
│ ├── tasks.py <- this is where I define the tasks shown above
│ ├── tests.py
│ ├── urls.py
│ └── views.py
├── docker
│ ├── Dockerfile.dev
│ └── Dockerfile.prod
├── manage.py
└── requirements
├── base.txt
├── dev.txt
└── test.txt
这是几年前另一个从未解决的相关SO问题:Celery tasks registering in multiple queues
以下是我在GitHub上celery / celery存储库中提出的同一问题的链接:https://github.com/celery/celery/issues/6309
我尝试按照celery文档中的Routing Tasks
页进行正确设置:https://docs.celeryproject.org/en/stable/userguide/routing.html
编辑:
我对芹菜工人加工过程的输出感到误解。列出的[tasks]
是指芹菜应用程序的所有任务,而不是该工作人员应处理的任务(这就是我的意思)。该SO帖子解释了:How to register Celery task to specific worker?,因此在[tasks]
的两个工作人员的输出中列出所有任务是有道理的。这解决了我上面列出的第二个问题。
我还通过切换到在Django设置中使用命名空间的celery设置解决了第一个问题,我可能也配置有误。现在,芹菜任务已正确路由,并且如果未在CELERY_TASK_ROUTES
中指定路由,则将其路由到默认队列。
解决方法
要解决您的第一个问题,就在这里。
要使用Celery提供的设置,您需要使用celery.update(__all__)
,或者,如果您只想更新设置,则应执行celery.update(settings)
。
如果需要更多帮助,请随时发表评论。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。