如何解决我应该在队列中使用池还是进程?
我有一个函数 (A
) 以恒定速率创建数据,假设每秒 100 个。我想在 B
创建的数据上运行另一个函数 (A
)。函数 B
的运行时间可能比 0.01s
长,但我不希望它备份数据流。我应该创建一个 Pool
的 B
并将一个通用的 Queue
传递给 A
和 B
来使用吗(如下面的代码)?我还看到您应该使用 Pool
来处理数据列表。这是应该如何使用它们(关于我描述的方法)?我应该只使用两个 Process
并交替向它们发送数据吗?
def A(queue):
while True:
data = data_getter()
queue.put(data)
def B(queue):
while True:
data = queue.get(True):
do_something(data)
# main.py
q = Queue()
pool = Pool(initializer=B,initargs=[q])
A(q)
解决方法
这是我的简短回答:
进程池存在的目的是允许您以并行方式最大程度地处理 N 个“作业”,前提是您已为该任务分配了 M 个物理处理器。 >
创建一个 Process
实例写入 N 次的队列(相当于提交 N 个“作业”),并让 M Process
个实例读取和处理这些消息,即“作业”,并处理它们,实际上是一种进程池的实现。使用单独的进程池来创建队列的读取进程所需的进程似乎是不必要的复杂层。因此,我将创建 M Process
个实例,这些实例从写入器进程向其添加消息的公共队列中读取。
TL;DR(或长答案)
正如您正确推测的那样,您可以通过 (1) 创建单个 Process
实例或 (2) 使用进程池来实现。方法 1 直观上似乎是最合乎逻辑的方法,但它不一定是最直接的代码。我在下面使用模拟介绍了几种方法,其中队列写入器进程每 0.01 秒创建一次队列条目,但队列读取器进程需要 0.06 秒来处理队列条目,以便至少有 6 个这样的进程 (从一个公共队列)需要跟上:
方法 1 -- 显式过程
import multiprocessing as mp
import time
class Sentinel():
pass
def a(queue,n_readers):
for i in range(1000):
time.sleep(.01)
queue.put(i)
print('queue size is now approximately: ',queue.qsize()) # print queue size
# signal readers to terminate:
end_of_queue = Sentinel()
for _ in range(n_readers):
queue.put(end_of_queue)
def b(queue):
while True:
value = queue.get(True)
# signal to terminate?
if isinstance(value,Sentinel):
break
print(value,flush=True)
time.sleep(.06)
def main():
n_readers = mp.cpu_count() - 1
queue = mp.Queue()
# create queue readers:
readers = [mp.Process(target=b,args=(queue,)) for _ in range(n_readers)]
for p in readers:
p.start()
# now start queue writer:
writer = mp.Process(target=a,n_readers))
writer.start()
# wait for writer to terminate:
writer.join()
for p in readers:
p.join()
print('Done')
if __name__ == '__main__':
main()
方法 2 - 使用进程池
import multiprocessing as mp
import time
class Sentinel():
pass
def init_pool(q):
global queue
queue = q
def a(n_readers):
for i in range(1000):
time.sleep(.01)
queue.put(i)
print('queue size is now approximately: ',queue.qsize()) # print queue size
end_of_queue = Sentinel()
for _ in range(n_readers):
queue.put(end_of_queue)
def b():
while True:
value = queue.get(True)
# signal to terminate?
if isinstance(value,flush=True)
time.sleep(.06)
def main():
n_readers = mp.cpu_count() - 1
queue = mp.Queue()
pool = mp.Pool(n_readers + 1,initializer=init_pool,initargs=(queue,))
readers_results = [pool.apply_async(b) for _ in range(n_readers)]
# now submit writer:
pool.apply(a,args=(n_readers,))
# wait for readers to finish:
for r in readers_results:
r.get()
print('Done')
if __name__ == '__main__':
main()
第二种方法的唯一优点是,如果工作进程 a
和/或 b
需要将值返回给主进程,使用进程池时就变得简单了。
注意
通过使用 B
构造函数的 initializer
参数来实现您的队列读取器进程,函数 Pool
也是可行的(参见下面的方法池 2A),但是函数 {{1 }} 必须在主进程下运行。但是这些 Pool 进程是守护进程,并且会在所有非守护进程终止后立即终止。这就是为什么我在方法 2 中安排将特殊哨兵消息写入队列作为“作业”(但不是运行作业的进程)的信号,以便在读取哨兵消息时终止。因此,我知道当作业完成时,队列中不再有消息,队列中也不会再有消息了。类似的逻辑适用于方法 1,除了整个过程也会终止,我可以使用 A
来知道什么时候发生。但是在您的情况下,使用隐式守护线程来执行队列的读取,即使您添加额外的代码以在读取所有输入队列值和初始化函数 join
时将哨兵值添加到队列中,终止,主进程怎么知道?同样,您可以在池上调用 B
方法,这可以防止任何未来的工作提交到池中(我们实际上从未明确提交工作;所有工作都在池初始化函数中完成)。然后调用 Pool.join()
,等待每个工作进程退出。这将在每个流程实例的池初始化函数完成后立即发生,因为之前对 Pool.join()
的调用告诉池永远不会有任何额外的工作添加到池中。
方法 2A - 使用带有池初始化器的进程池
Pool.close
注意事项
所有三种方法都可以工作,并且所有三种方法都预先假设读取器进程不会无限期运行,因此我们对有序终止感兴趣(因此需要哨兵值向读取器进程发出终止信号)。但是如果writer进程被设计为无限期运行,直到进程被用户中断,那么例如方法2a可以修改为使用用户输入ctrl-C产生的键盘中断来终止执行:
修改后的方法 2A 仅由键盘中断终止
import multiprocessing as mp
import time
class Sentinel():
pass
def a(queue,n_readers):
for i in range(1000):
time.sleep(.01)
queue.put(i)
end_of_queue = Sentinel()
for _ in range(n_readers):
queue.put(end_of_queue)
def b(the_queue):
global queue
queue = the_queue
while True:
value = queue.get(True)
# signal to terminate?
if isinstance(value,flush=True)
time.sleep(.06)
def main():
n_readers = mp.cpu_count() - 1
queue = mp.Queue()
pool = mp.Pool(n_readers,initializer=b,))
a(queue,n_readers)
# wait for readers to finish:
pool.close() # must be called before pool.join()
pool.join()
print('Done')
if __name__ == '__main__':
main()
修改方法 1 仅由键盘输入终止
import multiprocessing as mp
import time
import itertools
def a(queue,n_readers):
try:
for i in itertools.count(0):
time.sleep(.01)
queue.put(i)
except KeyboardInterrupt:
pass
def b(the_queue):
global queue
queue = the_queue
try:
while True:
value = queue.get(True)
print(value,end=' ',flush=True)
time.sleep(.06)
except KeyboardInterrupt:
pass
def main():
n_readers = mp.cpu_count() - 1
queue = mp.Queue()
pool = mp.Pool(n_readers,n_readers)
# wait for readers to finish:
pool.close() # must be called before pool.join()
try:
pool.join()
except KeyboardInterrupt:
pool.terminate()
print('Done')
if __name__ == '__main__':
main()
结论
你显然有选择。如果程序不是无限期运行,并且您希望有序关闭以确保所有已排队的消息都已处理,我的首选方法是方法 1。方法 2 和 2a 似乎是让 N 个进程执行此操作的懒惰方法为您提供相同参数的相同工作。
另一方面,如果您的编写器进程任务无休止地运行并且您需要终止它,并且不要介意队列中可能会留下一两个未处理的消息(毕竟您是在某个时间终止程序)任意时间点,所以应该没什么大不了的),那么如果一个简单的 import multiprocessing as mp
import time
import itertools
def a(queue,n_readers):
for i in itertools.count(0):
time.sleep(.01)
queue.put(i)
def b(queue):
while True:
value = queue.get(True)
if value % 100 == 0:
print(value,),daemon=True) for _ in range(n_readers)]
for p in readers:
p.start()
# now start queue writer:
writer = mp.Process(target=a,n_readers),daemon=True)
writer.start()
input('Enter return to terminate...')
print()
print('Done')
if __name__ == '__main__':
main()
语句足以输入命令终止,修改方法 1 似乎是需要最少修改的方法。但是如果正在运行的程序不断地输出消息,那么input
语句显示的文本就会丢失,并且每个进程都需要依赖键盘中断处理程序,这就比较复杂了。如果有任何修改过的示例,您可以使用此技术;我在修改方法 2a 中使用它作为示例,因为该代码不适合使用 input
语句技术,因为终端输出太多。毫无疑问,当有任何终端输出时,最可靠的方法是使用键盘处理程序中断处理程序方法。只要不需要从任何进程获取返回值,我仍然倾向于使用方法 1 及其变体而不是进程池:
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。