如何解决芹菜,连锁任务和返回结果
我正在尝试学习使用芹菜,但在理解一些东西时遇到了一些麻烦。 首先,链接任务以及如何在项目的其他地方使用结果(例如,通过Web套接字发送结果)。
第二,我该如何定义应该是链式任务的函数,还是仅在任务内部调用它?
例如:
# worker
@celery_app.task(acks_late=True)
def get_alerts():
run_analysts_model()
@celery_app.task(acks_late=True)
def compute_stock_indicators(stocks: list):
stocks_with_indicators = {}
for stock in stocks:
current_task.update_state(state=Actions.STARTED,meta={f"starting to fetch {stock}'s indicators"})
stock_indicators = fetch_stock_indicators(stock) # Fetch the stock most recent indicators
current_task.update_state(state=Actions.FINISHED,meta={f"{stock}'s indicators fetched"})
stocks_with_indicators.update({stock: stock_indicators})
return stocks_with_indicators
@celery_app.task(acks_late=True)
def prediction_task(stocks: list):
predictions = {}
stocks_indicators = compute_stock_indicators(stocks)
stocks_symbols = stocks_indicators.keys()
for stock in stocks_indicators:
stock_symbol = stock.keys()
current_task.update_state(state=Actions.STARTED,meta={f"predicting {stock_symbol} ..."})
stock_prediction = predict(stock) # send stock to prediction model
current_task.update_state(state=Actions.FINISHED,meta={f"{stocks_symbols}'s prediction finished.."})
predictions.update({stock_symbol: stock_prediction})
current_task.update_state(state=Actions.FINISHED,meta={f"finished analyzing all stocks"})
return predictions
def chain_tasks():
pass # How do I properly chain the above tasks and use the result of this elsewhere?
#For exmaple in here:
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
while True:
data = await chain_tasks() # is the proper way to consume the result?
await websocket.send_text(f"{data}")
最后,如果我将其作为crontab的计划任务使用,是否应该在配置文件中调用chain_tasks函数?
celery_app = Celery(
"worker",backend="redis://:password123@redis:6379/0",broker="amqp://user:bitnami@rabbitmq:5672//"
)
celery_app.conf.task_routes = {
"app.app.worker.celery_worker.chain_tasks": "stocks-queue"
}
celery_app.conf.update(task_track_started=True)
celery_app.conf.beat_schedule = {
'run_analysts_model': {
'task': 'app.app.worker.celery_worker.chain_tasks',# WILL RUN EVERY DAY AT 23:00
'schedule': crontab(minute=0,hour=23),},}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。