如何解决如何将Django Rest Framework API视图变成异步视图?
我正在尝试构建一个REST API,它将管理一些机器学习分类任务。我已经编写了一个API视图,点击该视图将触发分类任务的开始(例如:使用用户先前提供的数据训练SVM分类器)。但是,这是一项长期运行的任务,因此,理想情况下,我希望用户在请求此视图后就不必等待。相反,我想在后台启动此任务并立即给他们答复。他们以后可以在单独的视图中查看分类的结果(尚未实现)。
我正在ASGI_APPLICATION = 'mlxplorebackend.asgi.application'
中使用settings.py
。
这是我在views.py
中的API视图
import asyncio
from concurrent.futures import ProcessPoolExecutor
from django import setup as SetupDjango
# ... other imports
loop = asyncio.get_event_loop()
def DummyClassification():
result = sum(i * i for i in range(10 ** 7))
print(result)
return result
# ... other API views
class TaskExecuteView(APIView):
"""
Once an API call is made to this view,the classification algorithm will start being processed.
Depends on:
1. Parser for the classification algorithm type and parameters
2. Classification algorithm implementation
"""
def get(self,request,taskId,*args,**kwargs):
try:
task = TaskModel.objects.get(taskId = taskId)
except TaskModel.DoesNotExist:
raise Http404
else:
# this is basically the classification task for now
# need to turn this to an async view
with ProcessPoolExecutor(initializer = SetupDjango) as pool:
loop.run_in_executor(pool,DummyClassification)
return Response({ "message": "The task with id: {} has been started".format(task.taskId) },status = status.HTTP_200_OK)
我面临的问题如下:
-
当我不使用
with ProcessPoolExecutor(initializer = SetupDjango) as pool:
即没有初始化程序时,我得到django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet.
(在https://paste.ubuntu.com/p/ctjmFNYMXW/处的完整追溯) -
当我使用初始化程序时,视图不再保持异步,而是被阻塞。任务完成后返回响应,这在我的计算机上大约5秒钟。我确实意识到我并没有真正在
asyncio.sleep()
函数中使用DummyClassification()
,但是我不知道这样做的方法。
我猜这不是做到这一点的方法,因此任何建议将不胜感激。如果可以的话,我想避免芹菜,因为这对我来说有点太复杂了。
编辑:
如果我摆脱了ProcessPoolExecutor()
并只是做了loop.run_in_executor(None,DummyClassification)
,它就可以按预期工作,但是只有一个工作线程正在执行该任务,这似乎不太适合分类任务。>
解决方法
这是一次骑行。起初,我经历了设置 celery
的痛苦,结果发现使用一个 CPU 内核的分类任务的原始问题仍然存在。然后我用 django-rq
切换到 redis
,它目前按预期工作。
from .tasks import Pipeline
class TaskExecuteView(APIView):
"""
Once an API call is made to this view,the classification algorithm will start being processed.
Depends on:
1. Parser for the classification algorithm type
2. Classification algorithm implementation
"""
def get(self,request,taskId,*args,**kwargs):
try:
task = TaskModel.objects.get(taskId = taskId)
except TaskModel.DoesNotExist:
raise Http404
else:
Pipeline.delay(taskId) # this is async now ✔
# mark this as an in-progress task
TaskModel.objects.filter(taskId = taskId).update(inProgress = True)
return Response({ "message": "The task with id: {},title: {} has been started".format(task.taskId,task.taskTitle) },status = status.HTTP_200_OK)
tasks.py
from django_rq import job
@job('default',timeout=3600)
def Pipeline(taskId):
# classification task
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。