如何解决在消息未确认时暂停消费消息
我有一个 pika RabbitMQ 实现,其中消息被提交到队列,并由消费者处理。处理每条消息可能需要一些时间,而且我只想在任何给定时间为每个消费者处理一条消息。
为此,我在单独的工作线程中处理消息,并最终在任务完成时通过添加回调线程安全来确认消息。但是,主线程仍在消费消息。一种方法是在工作完成时发送 nack
条消息,但随着所有消费者变得忙碌,队列中会出现大量重新提交。
有什么办法可以“暂停”消费,同时仍然保持消息未确认?
busy = multiprocessing.Value('b',False)
def on_message(channel,method_frame,header_frame,body):
with busy.get_lock():
am_busy = busy.value
if not am_busy:
busy.value = True
if not am_busy:
thread = threading.Thread(target=do_work,args=(channel,method_frame.delivery_tag,body))
thread.start()
else:
nack_message(channel,method_frame.delivery_tag)
def do_work(channel,delivery_tag,body):
time.sleep(60) # do something with body content...
cb = functools.partial(ack_message,channel,delivery_tag)
connection.add_callback_threadsafe(cb)
with busy.get_lock():
busy.value = False
def ack_message(ch,delivery_tag):
if ch.is_open:
ch.basic_ack(delivery_tag)
else:
pass
def nack_message(ch,delivery_tag):
if ch.is_open:
ch.basic_nack(delivery_tag=delivery_tag,requeue=True)
else:
pass
def main():
connection = pika.BlockingConnection(parameters=parameters)
channel = connection.channel()
queue_name = "work"
channel.queue_declare(queue=queue_name)
on_message_callback = functools.partial(on_message,connection=connection)
channel.basic_consume(queue=queue_name,on_message_callback=on_message_callback)
channel.start_consuming()
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。