Python celery异步框架

celery

功能描述

   它是一个简单、灵活、可靠的用于处理大量消息的分布式系统。

   功能主要有三个:执行异步任务,执行延迟任务,执行定时任务。

   举个例子,你现在有两个项目、一个项目用于爬取数据,一个项目用于分析数据,如何在数据爬取后将任务交给另一个项目进行分析呢?这种场景下就可以使用celery进行处理。

   官网

   英文文档

   一个噩耗消息:

Celery is a project with minimal funding,so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.

Celery是一个资金较少的项目,因此我们不支持Microsoft Windows。请不要提出与该平台有关的任何问题。

   尽管官方提示不支持windows,但是你仍然可以进行使用,这可能需要一些其他模块的辅助。

   celery是单独的服务,并不依赖于其他框架,就像Django一样你只要安装了它就可以通过自身命令启动服务。

架构介绍

   celery架构由三部分组成,分别是消息中间件message broker,任务执行单元worker与任务执行结果存储task result store,如下图所示:

  

image-20201129004930243

   celery是一个独立运行的服务,内置socket,如果想使用它你需要做这几件事情:

  1. 安装celery环境框架,配置broker与backend,启动celery服务
  2. 添加任务到borker,worker就会自动的在后台执行任务
  3. 任务执行完成后,通过backend获取结果

基本使用

安装使用

   安装模块,我装的旧版,新版5.x的有些摸不着头脑:

pip3 install celery==4.4.7

   新建一个python包,任意名字。

project
    ├── celery_task  	# celery包
    │   ├── __init__.py # 包文件
    │   ├── celery.py   # celery连接和配置相关文件,且名字必须叫celery.py
    │   └── tasks.py    # 所有任务函数
    ├── add_task.py  	# 添加任务
    └── get_result.py   # 获取结果

   在celery.py中配置borkerbackend

from celery import Celery

broker = "redis://127.0.0.1:6379/1"  # broker任务队列
backend = "redis://127.0.0.1:6379/2" # 结构存储,执行完的结果存在这
# 如果有密码:"redis//:password@127.0.0.1:6379/2"

app = Celery(
    __name__,# 取名,随便取
    broker=broker,backend=backend,include=[
        "celery_tasks.task",# 第一个任务,必须是包名.文件名
    ]
)

任务书写

   在tasks.py中开始书写任务:

from .celery import app
@app.task  # 必须添加该装饰器
def add(x,y):
    return x+y

@app.task
def sub(x,y):
    return x-y

@app.task
def multi(x,y):
    return x*y

任务执行

   在add_task.py中开始执行任务,三个任务分别指定三种不同的执行状态:

# 导入定义好的任务
from celery_task import tasks

# 添加异步任务,返回结果。任务号
t1_id = tasks.add.delay(10,20)

# 配置延迟、定时任务的时区为本地,如果延迟任务不生效,则取消本地时区的设置(windows下失效)
from celery_task.celery import app
# 时区
# app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
# app.conf.enable_utc = False

# 添加延迟任务,返回结果。任务号
from datetime import datetime,timedelta
time =  datetime.utcnow() + timedelta(seconds=10)  # 十秒后执行
t2_id = tasks.sub.apply_async(args=(100,50),eta=time)

# 添加定时任务,需要启动定时任务beat服务
from celery.schedules import crontab  # 如果要定义其他的周期日期,导入这个
app.conf.beat_schedule = {
    'multi-task': {
        'task': 'celery_task.tasks.multi','schedule': timedelta(seconds=3),# 'schedule': crontab(hour=8,day_of_week=1),# 每周一早八点
        'args': (20,10),}
}

获取结果

   在get_result.py中书写获取结果的代码:

from celery_task.celery import app
from celery.result import AsyncResult

id = 'a9ffd16c-dbe0-44d2-9317-b198b432273c'  # 任务号
if __name__ == '__main__':
    async = AsyncResult(id=id,app=app)
    if async.successful():
        result = async.get()
        print(result)
    elif async.failed():
        print('任务失败')
    elif async.status == 'PENDING':
        print('任务等待中被执行')
    elif async.status == 'RETRY':
        print('任务异常后正在重试')
    elif async.status == 'STARTED':
        print('任务已经开始被执行')

启动服务

   接下来启动服务,首先切换到该包的上级目录中:

# cd project
# Linux
	celery worker -A 模块名 -l info
# Windows
	需要先安装eventlet模块
	pip install eventlet
	celery worker -A 包名 -l info -P eventlet
	
# 如果是定时任务,还需要启动beat服务
	celery beat -A 包名 -l info

Django使用

基本使用

   如果在Django中要使用celery,则需要将celery项目建立在Django项目的根目录下:

- DjangoProject01
	- celery_project
		- __init__.py
		- celery.py
		- django_app_name_task.py
	- app01
	- djangoproject01

   同时,在任务中还需要导入Django环境,一般书写在celery.py文件中即可:

import os
import django

from celery import Celery

# 由于celery是独立的项目,所以必须导入django环境
os.environ.setdefault("DJANGO_SETTINGS_MODULE","DjangoProject.settings")
django.setup()

broker = 'redis://127.0.0.1:6379/1'  # broker任务队列
backend = 'redis://127.0.0.1:6379/2'  # 结构存储,执行完的结果存在这

app=Celery(__name__,broker=broker,include=['celery_project.app01_task',])

app.conf.timezone = "Asia/Shanghai"
app.conf.enable_utc = False

from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
    'add-task': {
        'task': 'celery_project.app01_task.task01','schedule': timedelta(hours=4),}
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


注:所有源代码均实测运行过。所有源代码均已上传CSDN,请有需要的朋友自行下载。
继承APIView和ViewSetMixin;作用也与APIView基本类似,提供了身份认证、权限校验、流量管理等。ViewSet在开发接口中不经常用。
一、Django介绍Python下有许多款不同的 Web 框架。Django是重量级选手中最有代表性的一位。许多成功的网站和APP都基于Django。Django 是一个开放源代码的 Web 应用框架,由 Python 写成。Django 遵守 BSD 版权,初次发布于 2005 年 7 月, 并于 2008 年 9 月发布了第一个正式版本 1.0 。Django学习线路Django 采用了 MVT 的软件设计模式,即模型(Model),视图(View)和模板(Template)。这个MVT模式并
本文从nginx快速掌握到使用,gunicorn快速掌握到使用,实现小白快速搭建django项目,并对可能出现的报错进行了分析
uniapp微信小程序订阅消息发送服务通知
Django终端打印SQL语句 1 Setting配置: 2 默认python 使用的MysqlDB连接,Python3 支持支持pymysql 所有需要在app里面的__init__加上下面配置:
url: re_path('authors/$', views.AuthorView.as_view()), re_path('book/(?P\d+)/$', vie
前提 关于html寻找路线: template 如果在各个APP中存在, Django 会优先找全局template 文件下的html文件,如果全局下的template文件没有相关的html Djan
// GET请求request.GET // POST请求request.POST // 处理文件上传请求request.FILES // 处理如checkbox等多选 接受列表request.get
from bs4 import BeautifulSoup#kindeditordef kindeditor(request): s = ''' <li><s
view.py 配置 html 配置
from django.http import JsonResponse JsonResponse 里面代码会加这一个响应头 kwargs.setdefault('content_type&#
#下面两种是基于QuerySet查询 也就是说SQL中用的jion连表的方式查询books = models.UserInfo.objects.all() print(type(books)) &gt
return HttpResponse("OK") 返回一个字符串 return redirect("/index/") 返回URL return render
from django.http import JsonResponse JsonResponse 里面代码会加这一个响应头 kwargs.setdefault('content_type&#
浏览器有一个很重要的概念——同源策略(Same-Origin Policy)。所谓同源是指,域名,协议,端口相同。不同源的客户端脚本(javascript、ActionScript)在没明确授权的情况
自动发送 > 依赖jQuery文件 实例-->GET请求: 手动发送 > 依赖浏览器XML对象(也叫原生ajax) Ajax主要就是使用 【XmlHttpRequest】对象来完成请
#下面两种是基于QuerySet查询 也就是说SQL中用的jion连表的方式查询books = models.UserInfo.objects.all() print(type(books)) &gt
// GET请求request.GET // POST请求request.POST // 处理文件上传请求request.FILES // 处理如checkbox等多选 接受列表request.get
return HttpResponse("OK") 返回一个字符串 return redirect("/index/") 返回URL return render