一、问题引入
- 我们的代码是自上而下同步执行的。
- 发送短信是耗时的操作。如果短信被阻塞住,用户响应将会延迟。
- 响应延迟会造成用户界面的倒计时延迟。
二、解决方案
- 异步发送短信
- 发送短信和响应分开执行,将
发送短信
从主业务中解耦
出来
三、生产者消费者设计模式介绍
- 为了将
发送短信
从主业务中解耦
出来,可以引入生产者消费者设计模式
。- 它是最常用的解耦方式之一,寻找中间人(broker)搭桥,保证两个业务没有直接关联。
- 生产者生成消息,缓存到消息队列中,消费者读取消息队列中的消息并执行。
- 由用户在前端点击发送验证码后端生成发送短信消息,缓存到消息队列中,消费者读取消息队列中的发送短信消息并执行。
四、Celery介绍
1、介绍:
- 一个简单、灵活且可靠、处理大量消息的分布式系统,可以在一台或者多台机器上运行。
- 单个 Celery 进程每分钟可处理数以百万计的任务。
- 通过消息进行通信,使用
消息队列(broker)
在客户端
和消费者
之间进行协调。
2、celery的5个角色:
Task:消息,任务,需要进行异步的函数或其他操作
Broker:中间人,接收生产者发来的任务即Task,将任务存入队列。任务的消费者Worker。Celery本身不提供队列服务,推荐用Redis或RabbitMQ实现队列服务。
Worker:执行任务的单元,它实时监控消息队列,如果有任务就获取任务并执行它。
Beat:定时任务调度器,根据配置定时将任务发送给Broker。
Backend:用于存储任务的执行结果
五、定义发送短信任务
1、window下安装celery
pip install -U Celery
- celery默认是进程池方式,进程数以当前机器的CPU核数为参考,每个CPU开四个进程。
- 由于celery4.x后好像不支持win系统,所以这里我们使用线程的方式
2、window下安装eventlet
pip install eventlet
3、定义celery目录结构
4、定义配置文件config.py
# 指定消息队列为redis(中间人,中间容器)
broker_url = 'redis://mast:{}@xx.xx.xx.xx:6379/10'.format(123456)
# 指定消费者结果存储redis
backend = 'redis://mast:{}@xx.xx.xx.xx:6379/11'.format(123456)
5、定义主文件main.py
# celery启动类模块
from celery import Celery
# 创建celery实例
celery_app = Celery('TestDjango')
# 加载celery配置
celery_app.config_from_object('celery_tasks.config')
# 注册任务到celery实例中
celery_app.autodiscover_tasks(['celery_tasks.sms'])
"""
也可以通过以下方式配置中间人
celery_app = Celery(
'TestDjango', # 指定实例名,
# 指定消息队列为redis(中间人,中间容器)
broker_url='redis://mast:{}@xx.xx.xx.xx:6379/10'.format(123456),
# 指定消费者结果存储redis
backend='redis://mast:{}@xx.xx.xx.xx:6379/11'.format(123456),
)
celery_app.conf.update(
# 指定消息队列为redis(中间人,中间容器)
broker_url='redis://mast:{}@xx.xx.xx.xx:6379/10'.format(123456),
# 指定消费者结果存储redis
backend='redis://mast:{}@xx.xx.xx.xx:6379/11'.format(123456),
)
"""
6、定义sms包下的tasks.py发送短信验证码任务(注意:定义的任务py文件名必须为tasks.py)
不带参数配置
from celery_tasks.sms_code.ronglian_sms_sdk import SmsSDK
from celery_tasks import utils
import logging
from celery_tasks.main import celery_app
# 创建容通讯实例对象
sms_code = SmsSDK(accId=utils.ACCID, accToken=utils.ACCTOKEN, appId=utils.APPID)
@celery_app.task
def celery_send_sms_code(mobile, code):
"""
:param mobile: 电话号码
:param code: 短信验证码
:return: 成功0 或-1
"""
try:
send_result = sms_code.sendMessage(tid='1', mobile=mobile, datas=(code, 5))
print('函数正在测试')
except Exception as e:
logging.error(e)
else:
return send_result
带参数配置
from celery_tasks.sms_code.ronglian_sms_sdk import SmsSDK
from celery_tasks import utils
import logging
from celery_tasks.main import celery_app
# 创建容通讯实例对象
sms_code = SmsSDK(accId=utils.ACCID, accToken=utils.ACCTOKEN, appId=utils.APPID)
# bind:保证task对象会作为第一个参数自动传入
# name:异步任务别名
# retry_backoff:异常自动重试的时间间隔 第n次(retry_backoff×2^(n-1))s
# max_retries:异常自动重试次数的上限
@celery_app.task(bind=True, name='send_sms_code', retry_backoff=3)
def celery_send_sms_code(self,mobile, code):
"""
:param mobile: 电话号码
:param code: 短信验证码
"""
try:
send_result = sms_code.sendMessage(tid='1', mobile=mobile, datas=(code, 5))
print('函数正在测试')
except Exception as e:
logging.error(e)
# 有异常自动重试三次
raise self.retry(exc=e, max_retries=3)
if not send_result:
raise self.retry(exc=Exception('发送短信失败'), max_retries=3)
else:
return send_result
6、在终端下启动celery服务
# 进程方式启动
celery -A celery_tasks.main worker -l info
# 线程方式启动
celery -A celery_tasks.main worker -l info --concurrency=1000 -P eventlet -c 1000
-A:
指对应的应用程序, 其参数是项目中 Celery实例的位置。worker:
指这里要启动的worker。-l
:
指日志等级,比如info
等级- -P:运行的程序,可以是threads、gevent、eventlet等
- -c:任务的并行数
- --concurrency:指定每个CPU开的进程数
7、视图中调用发送短信任务
celery_send_sms_code.delay(mobile=mobile, code=sms_code)
class SmsCodeView(View):
"""
获取短信验证码
"""
def get(self, request, mobile):
# 获取redis连接对象,存储短信验证码
try:
redis_conn = get_redis_connection('verify_code')
except DatabaseError as e:
return http.JsonResponse({
'errmsg': '获取验证码失败'
})
# 获取发送标志
send_flag = redis_conn.get('send_flag_%s' % mobile)
# 如果发送标志存在,说明用户在60秒内多次获取验证码
if send_flag:
return http.JsonResponse({
'errmsg': '发送短信验证码过于频繁'
})
# 生成随机6为验证码
sms_code = '%06d' % random.randint(0, 999999)
# 存储随机验证码(五分钟的有效期)
redis_conn.setex('sms_code_%s' % mobile, 300, sms_code)
# 设置发送标志
redis_conn.setex('send_flag_%s' % mobile, 60, 1)
# 调用发送短信验证码,通过delay()方法
celery_send_sms_code.delay(mobile=mobile, code=sms_code)
# 返回响应
response = http.JsonResponse({
'errmsg': 'ok'
})
response.headers['Access-Contro1-Allow-origin'] = '*'
return response
def post(self, request, mobile):
"""
验证短信验证码
"""
# 接受参数
json_dict = json.loads(request.body.decode())
sms_code_client = json_dict.get('sms_code')
if not sms_code_client:
return http.JsonResponse({
'errmsg': '验证错误'
})
try:
redis_conn = get_redis_connection('verify_code')
sms_code_server = redis_conn.get('sms_code_%s' % mobile)
except DatabaseError as e:
print(e)
return http.JsonResponse({
'errmsg': '验证错误'
})
if sms_code_server is None or sms_code_server != sms_code_client:
return http.JsonResponse({
'errmsg': '验证错误或图形验证码失效'
})
# 验证成功后删除验证码
redis_conn.delete('sms_code_%s' % mobile)
return http.JsonResponse({
'errmsg': 'ok'
})
六、测试
上一篇: vue-django实现(云通讯短信验证码)(三)前后端设置60秒内发送短信验证码限制
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。