鼠兔`从空队列中弹出`

如何解决鼠兔`从空队列中弹出`

我在 kubernetes 集群中使用 pika 并使用队列中的消息,这会触发在新线程中启动一个函数。然而 RabbitMQ 似乎崩溃了,这些是迄今为止我发现的最好的日志:

2020-12-23 10:39:10,906] WARNING - WRITE indicated on fd=9,but writer callback is None; events=0b100 {/usr/local/lib/python3.9/site-packages/pika/adapters/utils/selector_ioloop_adapter.py:393}
(repeats to a total of n=38 times)
2020-12-23 10:39:10,908] ERROR - _AsyncBaseTransport._produce() failed,aborting connection: error=IndexError('pop from an empty deque'); sock=<socket.socket fd=9,family=AddressFamily.AF_INET,type=SocketKind.SOCK_STREAM,proto=6,laddr=('192.168.100.200',44892),raddr=('192.168.101.201',5672)>; Caller's stack:                                                                                 
Traceback (most recent call last):                                                                                                 
File "/usr/local/lib/python3.9/site-packages/pika/adapters/utils/io_services_utils.py",line 1097,in _on_socket_writable        
    self._produce()                                                                                                                
File "/usr/local/lib/python3.9/site-packages/pika/adapters/utils/io_services_utils.py",line 822,in _produce                    
    chunk = self._tx_buffers.popleft()                                                                                             
IndexError: pop from an empty deque                                                                                                
{/usr/local/lib/python3.9/site-packages/pika/adapters/utils/io_services_utils.py:1103}                                            
Traceback (most recent call last):                                                                                                 
File "/usr/local/lib/python3.9/site-packages/pika/adapters/utils/io_services_utils.py",in _produce                    
    chunk = self._tx_buffers.popleft()                                                                                             
IndexError: pop from an empty deque                                                                                                
2020-12-23 10:39:10,908] INFO - _AsyncTransportBase._initate_abort(): Initiating abrupt asynchronous transport shutdown: state=1; error=IndexError('pop from an empty deque'); <socket.socket fd=9,5672)> {/usr/local/lib/python3.9/site-packages/pika/adapters/utils/io_services_utils.py:904}                                                                                                               
2020-12-23 10:39:10,908] INFO - Deactivating transport: state=1; <socket.socket fd=9,5672)> {/usr/local/lib/python3.9/site-packages/pika/adapters/utils/io_services_utils.py:869}
2020-12-23 10:39:10,909] ERROR - connection_lost: StreamLostError: ("Stream connection lost: IndexError('pop from an empty deque')",) {/usr/local/lib/python3.9/site-packages/pika/adapters/base_connection.py:428}                                                   
2020-12-23 10:39:10,909] INFO - AMQP stack terminated,failed to connect,or aborted: opened=True,error-arg=StreamLostError: ("Stream connection lost: IndexError('pop from an empty deque')",); pending-error=None {/usr/local/lib/python3.9/site-packages/pika/connection.py:1996}
2020-12-23 10:39:10,909] INFO - Stack terminated due to StreamLostError: ("Stream connection lost: IndexError('pop from an empty deque')",) {/usr/local/lib/python3.9/site-packages/pika/connection.py:2065}                                                          
2020-12-23 10:39:10,909] INFO - Closing transport socket and unlinking: state=2; <socket.socket fd=9,5672)> {/usr/local/lib/python3.9/site-packages/pika/adapters/utils/io_services_utils.py:882}      
2020-12-23 10:39:10,909] ERROR - Unexpected connection close detected: StreamLostError: ("Stream connection lost: IndexError('pop from an empty deque')",) {/usr/local/lib/python3.9/site-packages/pika/adapters/blocking_connection.py:520} 
2020-12-23 10:39:31,416] INFO - Pika version 1.1.0 connecting to ('192.168.101.201',5672) {/usr/local/lib/python3.9/site-packages/pika/adapters/utils/connection_workflow.py:179}                  
2020-12-23 10:39:31,417] INFO - Socket connected: <socket.socket fd=9,47142),5672)> {/usr/local/lib/python3.9/site-packages/pika/adapters/utils/io_services_utils.py:345}                                                                                                         
2020-12-23 10:39:31,418] INFO - Streaming transport linked up: (<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f81b3099a60>,_StreamingProtocolShim: <SelectConnection PROTOCOL transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f81b3099a60> params=<ConnectionParameters host=rabbitmq-0.rabbitmq.testing.svc.cluster.local port=5672 virtual_host=/ ssl=False>>). {/usr/local/lib/python3.9/site-packages/pika/adapters/utils/connection_workflow.py:428}
2020-12-23 10:39:31,421] INFO - AMQPConnector - reporting success: <SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f81b3099a60> params=<ConnectionParameters host=rabbitmq-0.rabbitmq.testing.svc.cluster.local port=5672 virtual_host=/ ssl=False>> {/usr/local/lib/python3.9/site-packages/pika/adapters/utils/connection_workflow.py:293}
2020-12-23 10:39:31,421] INFO - AMQPConnectionWorkflow - reporting success: <SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f81b3099a60> params=<ConnectionParameters host=rabbitmq-0.rabbitmq.testing.svc.cluster.local port=5672 virtual_host=/ ssl=False>> {/usr/local/lib/python3.9/site-packages/pika/adapters/utils/connection_workflow.py:725}                                                                                                                          
2020-12-23 10:39:31,421] INFO - Connection workflow succeeded: <SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f81b3099a60> params=<ConnectionParameters host=rabbitmq-0.rabbitmq.testing.svc.cluster.local port=5672 virtual_host=/ ssl=False>> {/usr/local/lib/python3.9/site-packages/pika/adapters/blocking_connection.py:452} 
2020-12-23 10:39:31,422] INFO - Created channel=1 {/usr/local/lib/python3.9/site-packages/pika/adapters/blocking_connection.py:1247
}

我的消费者有以下定义:

def publish_message(channel,message):
    channel.basic_publish(exchange='',routing_key='my_queue',body=message)


def connect_to_mq():
    credentials = pika.PlainCredentials(rabbit_user,rabbit_password)
    parameters = pika.ConnectionParameters(rabbit_host,rabbit_port,'/',credentials)
    connection = pika.BlockingConnection(parameters=parameters)
    channel = connection.channel()
    channel.queue_declare(queue='my_queue')
    return connection,channel
    
    
def on_message(channel,method_frame,header_frame,body):
    message = body.decode('utf-8')
    if message == 'do_work':
        thread = threading.Thread(target=start_processing,args=(channel,))
        thread.start()
        publish_message(channel,'initiated thread')
    
    
def start_processing(channel):
    publish_message(channel,'starting...')
    time.sleep(240)
    publish_message(channel,'processing complete!')


def main():
    connection,channel = connect_to_mq()
    channel.basic_consume(queue='my_queue',auto_ack=True,on_message_callback=on_message)

    channel.start_consuming()

我在单独的线程中处理消息和工作负载的实现和策略是否有任何固有的错误导致这种情况发生?

解决方法

Pika 默认不是线程安全的。理想情况下,您应该为每个线程保留一个连接。

有一堆示例实现 here,我有一个线程安全的 rpc 示例 here,您也可以查看,但我建议使用他们的参考实现之一进行线程处理。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


依赖报错 idea导入项目后依赖报错,解决方案:https://blog.csdn.net/weixin_42420249/article/details/81191861 依赖版本报错:更换其他版本 无法下载依赖可参考:https://blog.csdn.net/weixin_42628809/a
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下 2021-12-03 13:33:33.927 ERROR 7228 [ main] o.s.b.d.LoggingFailureAnalysisReporter : *************************** APPL
错误1:gradle项目控制台输出为乱码 # 解决方案:https://blog.csdn.net/weixin_43501566/article/details/112482302 # 在gradle-wrapper.properties 添加以下内容 org.gradle.jvmargs=-Df
错误还原:在查询的过程中,传入的workType为0时,该条件不起作用 &lt;select id=&quot;xxx&quot;&gt; SELECT di.id, di.name, di.work_type, di.updated... &lt;where&gt; &lt;if test=&qu
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct redisServer’没有名为‘server_cpulist’的成员 redisSetCpuAffinity(server.server_cpulist); ^ server.c: 在函数‘hasActiveC
解决方案1 1、改项目中.idea/workspace.xml配置文件,增加dynamic.classpath参数 2、搜索PropertiesComponent,添加如下 &lt;property name=&quot;dynamic.classpath&quot; value=&quot;tru
删除根组件app.vue中的默认代码后报错:Module Error (from ./node_modules/eslint-loader/index.js): 解决方案:关闭ESlint代码检测,在项目根目录创建vue.config.js,在文件中添加 module.exports = { lin
查看spark默认的python版本 [root@master day27]# pyspark /home/software/spark-2.3.4-bin-hadoop2.7/conf/spark-env.sh: line 2: /usr/local/hadoop/bin/hadoop: No s
使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams[&#39;font.sans-serif&#39;] = [&#39;SimHei&#39;] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -&gt; systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping(&quot;/hires&quot;) public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate&lt;String
使用vite构建项目报错 C:\Users\ychen\work&gt;npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-