我应该在队列中使用池还是进程?

如何解决我应该在队列中使用池还是进程?

我有一个函数 (A) 以恒定速率创建数据,假设每秒 100 个。我想在 B 创建的数据上运行另一个函数 (A)。函数 B 的运行时间可能比 0.01s 长,但我不希望它备份数据流。我应该创建一个 PoolB 并将一个通用的 Queue 传递给 AB 来使用吗(如下面的代码)?我还看到您应该使用 Pool 来处理数据列表。这是应该如何使用它们(关于我描述的方法)?我应该只使用两个 Process 并交替向它们发送数据吗?

def A(queue):
  while True:
    data = data_getter()
    queue.put(data)
def B(queue):
  while True:
    data = queue.get(True):
    do_something(data)
# main.py
q = Queue()

pool = Pool(initializer=B,initargs=[q])

A(q)

解决方法

这是我的简短回答:

进程池存在的目的是允许您以并行方式最大程度地处理 N 个“作业”,前提是您已为该任务分配了 M 个物理处理器。 >

创建一个 Process 实例写入 N 次的队列(相当于提交 N 个“作业”),并让 M Process 个实例读取和处理这些消息,即“作业”,并处理它们,实际上是一种进程池的实现。使用单独的进程池来创建队列的读取进程所需的进程似乎是不必要的复杂层。因此,我将创建 M Process 个实例,这些实例从写入器进程向其添加消息的公共队列中读取。

TL;DR(或长答案)

正如您正确推测的那样,您可以通过 (1) 创建单个 Process 实例或 (2) 使用进程池来实现。方法 1 直观上似乎是最合乎逻辑的方法,但它不一定是最直接的代码。我在下面使用模拟介绍了几种方法,其中队列写入器进程每 0.01 秒创建一次队列条目,但队列读取器进程需要 0.06 秒来处理队列条目,以便至少有 6 个这样的进程 (从一个公共队列)需要跟上:

方法 1 -- 显式过程

import multiprocessing as mp
import time


class Sentinel():
    pass

def a(queue,n_readers):
    for i in range(1000):
        time.sleep(.01)
        queue.put(i)
    print('queue size is now approximately: ',queue.qsize()) # print queue size
    # signal readers to terminate:
    end_of_queue = Sentinel()
    for _ in range(n_readers):
        queue.put(end_of_queue)


def b(queue):
    while True:
        value = queue.get(True)
        # signal to terminate?
        if isinstance(value,Sentinel):
            break
        print(value,flush=True)
        time.sleep(.06)



def main():
    n_readers = mp.cpu_count() - 1
    queue = mp.Queue()
    # create queue readers:
    readers = [mp.Process(target=b,args=(queue,)) for _ in range(n_readers)]
    for p in readers:
        p.start()
    # now start queue writer:
    writer = mp.Process(target=a,n_readers))
    writer.start()
    # wait for writer to terminate:
    writer.join()
    for p in readers:
        p.join()
    print('Done')

if __name__ == '__main__':
    main()

方法 2 - 使用进程池

import multiprocessing as mp
import time


class Sentinel():
    pass

def init_pool(q):
    global queue
    queue = q

def a(n_readers):
    for i in range(1000):
        time.sleep(.01)
        queue.put(i)
    print('queue size is now approximately: ',queue.qsize()) # print queue size
    end_of_queue = Sentinel()
    for _ in range(n_readers):
        queue.put(end_of_queue)


def b():
    while True:
        value = queue.get(True)
        # signal to terminate?
        if isinstance(value,flush=True)
        time.sleep(.06)



def main():
    n_readers = mp.cpu_count() - 1
    queue = mp.Queue()
    pool = mp.Pool(n_readers + 1,initializer=init_pool,initargs=(queue,))
    readers_results = [pool.apply_async(b) for _ in range(n_readers)]
    # now submit writer:
    pool.apply(a,args=(n_readers,))
    # wait for readers to finish:
    for r in readers_results:
        r.get()
    print('Done')

if __name__ == '__main__':
    main()

第二种方法的唯一优点是,如果工作进程 a 和/或 b 需要将值返回给主进程,使用进程池时就变得简单了。

>

注意

通过使用 B 构造函数的 initializer 参数来实现您的队列读取器进程,函数 Pool 也是可行的(参见下面的方法池 2A),但是函数 {{1 }} 必须在主进程下运行。但是这些 Pool 进程是守护进程,并且会在所有非守护进程终止后立即终止。这就是为什么我在方法 2 中安排将特殊哨兵消息写入队列作为“作业”(但不是运行作业的进程)的信号,以便在读取哨兵消息时终止。因此,我知道当作业完成时,队列中不再有消息,队列中也不会再有消息了。类似的逻辑适用于方法 1,除了整个过程也会终止,我可以使用 A 来知道什么时候发生。但是在您的情况下,使用隐式守护线程来执行队列的读取,即使您添加额外的代码以在读取所有输入队列值和初始化函数 join 时将哨兵值添加到队列中,终止,主进程怎么知道?同样,您可以在池上调用 B 方法,这可以防止任何未来的工作提交到池中(我们实际上从未明确提交工作;所有工作都在池初始化函数中完成)。然后调用 Pool.join(),等待每个工作进程退出。这将在每个流程实例的池初始化函数完成后立即发生,因为之前对 Pool.join() 的调用告诉池永远不会有任何额外的工作添加到池中。

方法 2A - 使用带有池初始化器的进程池

Pool.close

注意事项

所有三种方法都可以工作,并且所有三种方法都预先假设读取器进程不会无限期运行,因此我们对有序终止感兴趣(因此需要哨兵值向读取器进程发出终止信号)。但是如果writer进程被设计为无限期运行,直到进程被用户中断,那么例如方法2a可以修改为使用用户输入ctrl-C产生的键盘中断来终止执行:

修改后的方法 2A 仅由键盘中断终止

import multiprocessing as mp
import time


class Sentinel():
    pass

def a(queue,n_readers):
    for i in range(1000):
        time.sleep(.01)
        queue.put(i)
    end_of_queue = Sentinel()
    for _ in range(n_readers):
        queue.put(end_of_queue)


def b(the_queue):
    global queue
    queue = the_queue
    while True:
        value = queue.get(True)
        # signal to terminate?
        if isinstance(value,flush=True)
        time.sleep(.06)



def main():
    n_readers = mp.cpu_count() - 1
    queue = mp.Queue()
    pool = mp.Pool(n_readers,initializer=b,))
    a(queue,n_readers)
    # wait for readers to finish:
    pool.close() # must be called before pool.join()
    pool.join()
    print('Done')

if __name__ == '__main__':
    main()

修改方法 1 仅由键盘输入终止

import multiprocessing as mp
import time
import itertools


def a(queue,n_readers):
    try:
        for i in itertools.count(0):
            time.sleep(.01)
            queue.put(i)
    except KeyboardInterrupt:
        pass



def b(the_queue):
    global queue
    queue = the_queue
    try:
        while True:
            value = queue.get(True)
            print(value,end=' ',flush=True)
            time.sleep(.06)
    except KeyboardInterrupt:
        pass



def main():
    n_readers = mp.cpu_count() - 1
    queue = mp.Queue()
    pool = mp.Pool(n_readers,n_readers)
    # wait for readers to finish:
    pool.close() # must be called before pool.join()
    try:
        pool.join()
    except KeyboardInterrupt:
        pool.terminate()
    print('Done')

if __name__ == '__main__':
    main()

结论

你显然有选择。如果程序不是无限期运行,并且您希望有序关闭以确保所有已排队的消息都已处理,我的首选方法是方法 1。方法 2 和 2a 似乎是让 N 个进程执行此操作的懒惰方法为您提供相同参数的相同工作。

另一方面,如果您的编写器进程任务无休止地运行并且您需要终止它,并且不要介意队列中可能会留下一两个未处理的消息(毕竟您是在某个时间终止程序)任意时间点,所以应该没什么大不了的),那么如果一个简单的 import multiprocessing as mp import time import itertools def a(queue,n_readers): for i in itertools.count(0): time.sleep(.01) queue.put(i) def b(queue): while True: value = queue.get(True) if value % 100 == 0: print(value,),daemon=True) for _ in range(n_readers)] for p in readers: p.start() # now start queue writer: writer = mp.Process(target=a,n_readers),daemon=True) writer.start() input('Enter return to terminate...') print() print('Done') if __name__ == '__main__': main() 语句足以输入命令终止,修改方法 1 似乎是需要最少修改的方法。但是如果正在运行的程序不断地输出消息,那么input语句显示的文本就会丢失,并且每个进程都需要依赖键盘中断处理程序,这就比较复杂了。如果有任何修改过的示例,您可以使用此技术;我在修改方法 2a 中使用它作为示例,因为该代码不适合使用 input 语句技术,因为终端输出太多。毫无疑问,当有任何终端输出时,最可靠的方法是使用键盘处理程序中断处理程序方法。只要不需要从任何进程获取返回值,我仍然倾向于使用方法 1 及其变体而不是进程池:

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


依赖报错 idea导入项目后依赖报错,解决方案:https://blog.csdn.net/weixin_42420249/article/details/81191861 依赖版本报错:更换其他版本 无法下载依赖可参考:https://blog.csdn.net/weixin_42628809/a
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下 2021-12-03 13:33:33.927 ERROR 7228 [ main] o.s.b.d.LoggingFailureAnalysisReporter : *************************** APPL
错误1:gradle项目控制台输出为乱码 # 解决方案:https://blog.csdn.net/weixin_43501566/article/details/112482302 # 在gradle-wrapper.properties 添加以下内容 org.gradle.jvmargs=-Df
错误还原:在查询的过程中,传入的workType为0时,该条件不起作用 <select id="xxx"> SELECT di.id, di.name, di.work_type, di.updated... <where> <if test=&qu
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct redisServer’没有名为‘server_cpulist’的成员 redisSetCpuAffinity(server.server_cpulist); ^ server.c: 在函数‘hasActiveC
解决方案1 1、改项目中.idea/workspace.xml配置文件,增加dynamic.classpath参数 2、搜索PropertiesComponent,添加如下 <property name="dynamic.classpath" value="tru
删除根组件app.vue中的默认代码后报错:Module Error (from ./node_modules/eslint-loader/index.js): 解决方案:关闭ESlint代码检测,在项目根目录创建vue.config.js,在文件中添加 module.exports = { lin
查看spark默认的python版本 [root@master day27]# pyspark /home/software/spark-2.3.4-bin-hadoop2.7/conf/spark-env.sh: line 2: /usr/local/hadoop/bin/hadoop: No s
使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams['font.sans-serif'] = ['SimHei'] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -> systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping("/hires") public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate<String
使用vite构建项目报错 C:\Users\ychen\work>npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-