带有Redis代理和多个队列的Celery:将所有任务注册到每个队列

如何解决带有Redis代理和多个队列的Celery:将所有任务注册到每个队列

我将celery与Django和redis用作代理。我正在尝试设置两个队列:defaultother。我的任务正在运行,但是我配置的设置无法正常运行。

我遇到两个相关问题:

  1. 芹菜任务不遵守task_routes设置(请参见下文)。
  2. 所有celery任务(无论它们如何定义)在启动时都已注册到两个队列中的每个队列

这是包含所有代码的存储库。您可以通过运行docker-compose up在docker和docker-compose本地运行示例:

https://gitlab.com/verbose-equals-true/digital-ocean-docker-swarm

这是我代码的相关部分:

  1. 芹菜应用定义文件
  2. 任务定义/声明
  3. 启动工作人员的命令

芹菜应用定义:

from celery import Celery
from django.conf import settings

from kombu import Exchange,Queue

CELERY_QUEUE_DEFAULT = 'default'
CELERY_QUEUE_OTHER = 'other'

app = Celery('backend')
app.conf["broker_url"] = f"redis://{settings.REDIS_SERVICE_HOST}:6379/1"
app.conf["result_backend"] = f"redis://{settings.REDIS_SERVICE_HOST}:6379/2"
app.conf["accpet_content"] = ['application/json']
app.conf["task_serializer"] = 'json'
app.conf["result_serializer"] = 'json'
app.conf["task_acks_late"] = True
app.conf["task_default_queue"] = CELERY_QUEUE_DEFAULT
app.conf["worker_send_task_events"] = True
app.conf["worker_prefetch_multiplier"] = 1
app.conf["task_queues"] = (
    Queue(
        CELERY_QUEUE_DEFAULT,Exchange(CELERY_QUEUE_DEFAULT),routing_key=CELERY_QUEUE_DEFAULT,),Queue(
        CELERY_QUEUE_OTHER,Exchange(CELERY_QUEUE_OTHER),routing_key=CELERY_QUEUE_OTHER,)
app.conf["task_routes"] = {
    'backend.core.tasks.debug_task': {
        'queue': 'default','routing_key': 'default','exchange': 'default',},'backend.core.tasks.debug_task_other': {
        'queue': 'other','routing_key': 'other','exchange': 'other',}
app.conf["task_default_exchange_type"] = 'direct'

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

任务定义(在名为tasks.py的应用程序中的名为core的文件中定义:

import time

import celery

# from backend import celery_app as app
# from celery import shared_task
from celery.task import task
from django.conf import settings

# @celery.task <-- I have seen these decorators in other example
# @app.task <-- neither of these result in the tasks being sent to the correct queue
# @shared_task
@task(queue="default",exchange="default")
def debug_task():
    time.sleep(10)
    return "Task is done."


@task(queue="other",exchange="other")
def debug_task_other():
    time.sleep(10)
    return "Task is done for other queue."

这是我在docker-compose本地开始工作的方式:

  celery:
    <<: *backend
    container_name: celery
    command:
      - "watchmedo"
      - "auto-restart"
      - "--directory=./"
      - "--pattern=*.py"
      - "--recursive"
      - "--"
      - "celery"
      - "worker"
      - "--app=backend.celery_app:app"
      - "-Q"
      - "default"
      - "--concurrency=1"
      - "--loglevel=INFO"
    ports: []

  celery_other:
    <<: *backend
    container_name: celery_other
    command:
      - "watchmedo"
      - "auto-restart"
      - "--directory=./"
      - "--pattern=*.py"
      - "--recursive"
      - "--"
      - "celery"
      - "worker"
      - "--app=backend.celery_app:app"
      - "-Q"
      - "other"
      - "--concurrency=1"
      - "--loglevel=INFO"
    ports: []

以下是docker-compose的日志,显示两个任务都已注册到每个工作人员:

celery             |  
celery             |  -------------- celery@c8f0ed3f97df v4.4.7 (cliffs)
celery             | --- ***** ----- 
celery             | -- ******* ---- Linux-4.15.0-112-generic-x86_64-with-glibc2.2.5 2020-08-21 20:35:47
celery             | - *** --- * --- 
celery             | - ** ---------- [config]
celery             | - ** ---------- .> app:         backend:0x7fb327c2e6a0
celery             | - ** ---------- .> transport:   redis://redis:6379/1
celery             | - ** ---------- .> results:     redis://redis:6379/2
celery             | - *** --- * --- .> concurrency: 1 (prefork)
celery             | -- ******* ---- .> task events: ON
celery             | --- ***** ----- 
celery             |  -------------- [queues]
celery             |                 .> default          exchange=default(direct) key=default
celery             |                 
celery             | 
celery             | [tasks]
celery             |   . core.tasks.debug_task
celery             |   . core.tasks.debug_task_other
celery             | 
celery_other       |  
celery_other       |  -------------- celery@3dd99b1ed32e v4.4.7 (cliffs)
celery_other       | --- ***** ----- 
celery_other       | -- ******* ---- Linux-4.15.0-112-generic-x86_64-with-glibc2.2.5 2020-08-21 20:35:47
celery_other       | - *** --- * --- 
celery_other       | - ** ---------- [config]
celery_other       | - ** ---------- .> app:         backend:0x7f54fa89e6a0
celery_other       | - ** ---------- .> transport:   redis://redis:6379/1
celery_other       | - ** ---------- .> results:     redis://redis:6379/2
celery_other       | - *** --- * --- .> concurrency: 1 (prefork)
celery_other       | -- ******* ---- .> task events: ON
celery_other       | --- ***** ----- 
celery_other       |  -------------- [queues]
celery_other       |                 .> other            exchange=other(direct) key=other
celery_other       |                 
celery_other       | 
celery_other       | [tasks]
celery_other       |   . core.tasks.debug_task
celery_other       |   . core.tasks.debug_task_other

我当时认为定义task_routes意味着我不必在任务装饰器中指定任务的队列。如果我未指定队列,则默认工作程序将接管所有任务。

如果有帮助,这是我的Django目录结构:

tree -L 3 backend
backend
├── backend
│   ├── asgi.py
│   ├── celery_app.py <- this is where I define my celery app
│   ├── __init__.py
│   ├── settings
│   │   ├── base.py
│   │   ├── development.py
│   │   ├── __init__.py
│   │   └── production.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
├── core
│   ├── admin.py
│   ├── apps.py
│   ├── __init__.py
│   ├── migrations
│   │   └── __init__.py
│   ├── models.py
│   ├── tasks.py <- this is where I define the tasks shown above
│   ├── tests.py
│   ├── urls.py
│   └── views.py
├── docker
│   ├── Dockerfile.dev
│   └── Dockerfile.prod
├── manage.py
└── requirements
    ├── base.txt
    ├── dev.txt
    └── test.txt

这是几年前另一个从未解决的相关SO问题:Celery tasks registering in multiple queues

以下是我在GitHub上celery / celery存储库中提出的同一问题的链接:https://github.com/celery/celery/issues/6309

我尝试按照celery文档中的Routing Tasks页进行正确设置:https://docs.celeryproject.org/en/stable/userguide/routing.html

编辑:

我对芹菜工人加工过程的输出感到误解。列出的[tasks]是指芹菜应用程序的所有任务,而不是该工作人员应处理的任务(这就是我的意思)。该SO帖子解释了:How to register Celery task to specific worker?,因此在[tasks]的两个工作人员的输出中列出所有任务是有道理的。这解决了我上面列出的第二个问题。

我还通过切换到在Django设置中使用命名空间的celery设置解决了第一个问题,我可能也配置有误。现在,芹菜任务已正确路由,并且如果未在CELERY_TASK_ROUTES中指定路由,则将其路由到默认队列。

解决方法

要解决您的第一个问题,就在这里。

要使用Celery提供的设置,您需要使用celery.update(__all__),或者,如果您只想更新设置,则应执行celery.update(settings)

如果需要更多帮助,请随时发表评论。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


依赖报错 idea导入项目后依赖报错,解决方案:https://blog.csdn.net/weixin_42420249/article/details/81191861 依赖版本报错:更换其他版本 无法下载依赖可参考:https://blog.csdn.net/weixin_42628809/a
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下 2021-12-03 13:33:33.927 ERROR 7228 [ main] o.s.b.d.LoggingFailureAnalysisReporter : *************************** APPL
错误1:gradle项目控制台输出为乱码 # 解决方案:https://blog.csdn.net/weixin_43501566/article/details/112482302 # 在gradle-wrapper.properties 添加以下内容 org.gradle.jvmargs=-Df
错误还原:在查询的过程中,传入的workType为0时,该条件不起作用 &lt;select id=&quot;xxx&quot;&gt; SELECT di.id, di.name, di.work_type, di.updated... &lt;where&gt; &lt;if test=&qu
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct redisServer’没有名为‘server_cpulist’的成员 redisSetCpuAffinity(server.server_cpulist); ^ server.c: 在函数‘hasActiveC
解决方案1 1、改项目中.idea/workspace.xml配置文件,增加dynamic.classpath参数 2、搜索PropertiesComponent,添加如下 &lt;property name=&quot;dynamic.classpath&quot; value=&quot;tru
删除根组件app.vue中的默认代码后报错:Module Error (from ./node_modules/eslint-loader/index.js): 解决方案:关闭ESlint代码检测,在项目根目录创建vue.config.js,在文件中添加 module.exports = { lin
查看spark默认的python版本 [root@master day27]# pyspark /home/software/spark-2.3.4-bin-hadoop2.7/conf/spark-env.sh: line 2: /usr/local/hadoop/bin/hadoop: No s
使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams[&#39;font.sans-serif&#39;] = [&#39;SimHei&#39;] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -&gt; systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping(&quot;/hires&quot;) public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate&lt;String
使用vite构建项目报错 C:\Users\ychen\work&gt;npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-