如何解决将paho MQTT与Python中的另一个异步过程结合起来
我在Python中有一个基本的MQTTListener类,它侦听有关某些主题的消息,并且应该启动或停止从另一个脚本导入的异步过程。除非手动停止,否则此过程将永远运行。假设监听器看起来像这样:
import paho.mqtt.client as mqtt
import json
from python_file.py import async_forever_function
class MqttListener:
def __init__(self,host,port,client_id):
self.host = host
self.port = port
self.client_id = client_id
self.client = mqtt.Client(client_id=self.client_id)
self.client.connect(host=self.host,port=self.port)
def on_connect(self,client,userdata,flags,rc):
self.client.subscribe(topic=[("start",1),])
self.client.subscribe(topic=[("stop",])
logging.info(msg="MQTT - connected!")
def on_disconnect(client,rc):
logging.info(msg="MQTT - disconnected!")
def on_message(self,message,):
print('PROCESSING MESSAGE',message.topic,message.payload.decode('utf-8'),)
if message.topic == 'start':
async_forever_function(param='start')
print('process started')
else:
async_forever_function(param='stop')
print('process removed')
def start(self):
self.client.on_connect = lambda client,rc: self.on_connect(client,rc)
self.client.on_message = lambda client,message: self.on_message(client,message)
self.client.on_disconnect = lambda client,rc: self.on_disconnect(client,rc)
self.client.loop_start()
def stop(self):
self.client.loop_stop()
现在,这适用于启动一个新的异步过程。即,在启动MQTT主题上发布消息时,正确触发了async_function。但是,一旦启动了该异步过程,侦听器将不再能够接收/处理来自Stop MQTT主题的消息,并且异步过程将永远继续运行,而实际上它应该已经停止了。
我的问题:当后台运行活动的异步进程时,如何修改此类的代码,使其也可以处理消息?
解决方法
您无法在on_message()
回调中阻止任务。
此回调在MQTT客户端线程(由loop_start()
函数启动的线程)上运行。该线程处理所有网络流量和消息处理,如果您将其阻止,则将无法执行任何操作。
如果要从on_message()
回调中调用长时间运行的任务,则需要为该长时间运行的任务启动新线程,以免阻塞MQTT客户端循环。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。