如何解决队列为空时抛出测试异常
当Queue
为空时,我尝试用特殊消息测试捕获的异常时,测试失败。
这里是Queue类:
import time
from multiprocessing import Lock,Process,Queue,Pool
class QueueFun():
def writing_queue(self,work_tasks,name):
while True:
print("Writing to queue")
work_tasks.put('1')
time.sleep(3)
def read_queue(self,name):
while True:
print("Reading from queue",work_tasks.empty())
if work_tasks.empty():
raise ValueError('Queue should not be empty')
item = work_tasks.get()
time.sleep(1)
if __name__ == '__main__':
work_tasks = Queue()
queue_fun = QueueFun()
print('setup done')
write_queue_process = Process(target=queue_fun.writing_queue,args=(work_tasks,"write_to_queue_process"))
print('write_queue_process')
write_queue_process.start()
read_queue_process_1 = Process(target=queue_fun.read_queue,"read_from_queue_process_1"))
read_queue_process_1.start()
read_queue_process_2 = Process(target=queue_fun.read_queue,"read_from_queue_process_2"))
read_queue_process_2.start()
write_queue_process.join()
read_queue_process_1.join()
read_queue_process_2.join()
这是测试:
import unittest
from queue import Queue
from playground.QueueFun import QueueFun
class QueueFunTests(unittest.TestCase):
def test_empty_queue(self):
q = QueueFun()
work_tasks = Queue()
self.assertRaises(ValueError('Queue should not be empty'),q.read_queue(work_tasks,"1"))
测试失败:
raise ValueError('Queue should not be empty')
ValueError: Queue should not be empty
我没有正确执行测试吗?
解决方法
这可以按预期工作:
class QueueFunTests(unittest.TestCase):
def test_empty_queue(self):
q = QueueFun()
work_tasks = Queue()
with self.assertRaises(ValueError) as ctx:
q.read_queue(work_tasks,"1")
self.assertEqual("Queue should not be empty",str(ctx.exception))
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。