Python--进程之间的通信IPC、进程之间的数据共享、进程池

一、进程间通信---队列和管道(multiprocess.Queue、multiprocess.Pipe)

进程间通信:IPC(inter-Process Communication)

1、队列

创建共享的进程队列,Queue是多进程的安全的队列,可以使用Queue实现多进程之间的数据传递。

# Queue([maxsize]) 
创建共享的进程队列。
参数 :maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。
底层队列使用管道和锁定实现。

创建共享的进程队列。maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。底层队列使用管道和锁定实现。另外,还需要运行支持线程以便队列中的数据传输到底层管道中。 
Queue的实例q具有以下方法:

q.get( [ block [,timeout ] ] ) 
返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。

q.get_nowait( ) 
同q.get(False)方法。

q.put(item [,block [,timeout ] ] ) 
将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。

q.qsize() 
返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。


q.empty() 
如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。

q.full() 
如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)。。
q.close() 
关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将自动调用此方法。关闭队列不会在队列使用者中生成任何类型的数据结束信号或异常。例如,如果某个使用者正被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。

q.cancel_join_thread() 
不会再进程退出时自动连接后台线程。这可以防止join_thread()方法阻塞。

q.join_thread() 
连接队列的后台线程。此方法用于在调用q.close()方法后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread()方法可以禁止这种行为。

代码实现:

'''
multiprocessing模块支持进程间通信的两种主要形式:管道和队列
都是基于消息传递实现的,但是队列接口
'''

from multiprocessing import Queue
q = Queue(3)     只能往这个队列放3个值

 put,get,put_nowait,get_nowait,full,empty
q.put(3)
q.put(3)
 q.put(3)  # 如果队列已经满了,程序就会停在这里,等待数据被别人取走,再将数据放入队列。
             如果队列中的数据一直不被取走,程序就会永远停在这里。
try:
    q.put_nowait(3)     可以使用put_nowait,如果队列满了不会阻塞,但是会因为队列满了而报错。
except:      因此我们可以用一个try语句来处理这个错误。这样程序不会一直阻塞下去,但是会丢掉这个消息。
    print('队列已经满了')

 因此,我们再放入数据之前,可以先看一下队列的状态,如果已经满了,就不继续put了。
print(q.full())     判断是否满了

print(q.get())
 print(q.get())   # 同put方法一样,如果队列已经空了,那么继续取就会出现阻塞。
:
    q.get_nowait(3)     可以使用get_nowait,如果队列满了不会阻塞,但是会因为没取到值而报错。
except:     因此我们可以用一个try语句来处理这个错误。这样程序不会一直阻塞下去。
    队列已经空了print(q.empty())    判断是否空了
 Process,Queue

def consume(q):
    son-->',q.get())     取走队列一条数据
    q.put(abc')     给队列增加一条数据

if __name__ == __main__:
    q = Queue()
    p = Process(target=consume,args=(q,))
    p.start()
    q.put({haha': 123})     给队列增加一条数据
    p.join()     等待子进程执行完毕
    Foo--> 取走队列一条数据
# son--> {'haha': 123} Foo--> abc

2、生产者消费者模型:

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

为什么要使用生产者和消费者模式

  在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

什么是生产者消费者模式

  生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

 time
 random
 consumer(q,name):
     处理数据
    while 1:
        food = q.get()
        if food is None: break   如果不结束的话程序就会一直不结束
        time.sleep(random.uniform(0.5,1))
        %s吃了一个%s' % (name,food))

 producer(q,name,food):
     获取数据
    for i in range(10):
        time.sleep(random.uniform(0.3,0.8%s生产了%s%s' % (name,food,i + 1))
        q.put(food + str(i))

 Queue()
    Process(target=consumer,args=(q,alex)).start()
    Process(target=consumer,1)">wusir)).start()
    p1 = Process(target=producer,1)">yang包子))
    p2 = Process(target=producer,1)">sihao馒头))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    q.put(None)     有几个consumer就需要放几个None
    q.put(None)     结束信号

3、JoinableQueue([maxsize])

创建可连接的共享进程队列。这就像是一个Queue对象,但队列允许项目的使用者通知生产者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。

JoinableQueue的实例p除了与Queue对象相同的方法之外,还具有以下方法:

q.task_done() 
使用者使用此方法发出信号,表示q.get()返回的项目已经被处理。如果调用此方法的次数大于从队列中删除的项目数量,将引发ValueError异常。

q.join() 
生产者将使用此方法进行阻塞,直到队列中所有项目均被处理。阻塞将持续到为队列中的每个项目均调用q.task_done()方法为止。 
下面的例子说明如何建立永远运行的进程,使用和处理队列上的项目。生产者将项目放入队列,并等待它们被处理。

方法介绍
 q.get()
        time.sleep(random.uniform(0.5,food))
        q.task_done()    通知队列已经有一个数据被处理了
        
 str(i))
        
 JoinableQueue()
    c1 = Process(target=consumer,1)">))
    c2 = Process(target=consumer,1)">))
    c1.daemon = True     设置守护进程
    c2.daemon = True     设置守护进程
    c1.start()
    c2.start()
    p1 = Process(target=producer,1)">))
    p1.start()
    p2.start()
    p1.join()    生产者要先把所有的数据都放到队列中
    p2.join()    生产者要先把所有的数据都放到队列中
    q.join()     阻塞直到放入队列中所有的数据都被处理掉(有多少个数据就接收到了多少task_done)

 二、进程之间的数据共享 

展望未来,基于消息传递的并发编程是大势所趋

  即便是使用线程,推荐做法也是将程序设计为大量独立的线程集合,通过消息队列交换数据。

  这样极大地减少了对使用锁定和其他同步手段的需求,还可以扩展到分布式系统中。

  但进程间应该尽量避免通信,即便需要通信,也应该选择进程安全的工具来避免加锁带来的问题。

  以后我们会尝试使用数据库来解决现在进程之间的数据共享问题。

进程间数据是独立的,可以借助于队列或管道实现通信,二者都是基于消息传递的
虽然进程间数据独立,但可以通过Manager实现数据共享,事实上Manager的功能远不止于此

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Barrier,Queue,Value  Array.

Manager模块介绍

三、数据池和multiprocess.Pool模块

进程池:

  为什么要有进程池?进程池的概念。

  在程序实际处理问题过程中,忙时会有成千上万的任务需要被执行,闲时可能只有零星任务。那么在成千上万个任务需要被执行的时候,我们就需要去创建成千上万个进程么?首先,创建进程需要消耗时间,销毁进程也需要消耗时间。第二即便开启了成千上万的进程,操作系统也不能让他们同时执行,这样反而会影响程序的效率。因此我们不能无限制的根据任务开启或者结束进程。那么我们要怎么做呢?

  在这里,要给大家介绍一个进程池的概念,定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,等到处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务。如果有很多任务需要执行,池中的进程数量不够,任务就要等待之前的进程执行任务完毕归来,拿到空闲进程才能继续执行。也就是说,池中进程的数量是固定的,那么同一时间最多有固定数量的进程在运行。这样不会增加操作系统的调度难度,还节省了开闭进程的时间,也一定程度上能够实现并发效果。

multiprocess.Pool 模块

 概念介绍:

Pool([numprocess  [,initializer [,initargs]]]):创建进程池

 参数介绍:
1 numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值
2 initializer:是每个工作进程启动时要执行的可调用对象,默认为None
3 initargs:是要传给initializer的参数组

 主要方法:
1 p.apply(func [,args [,kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。
2 需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,
必须从不同线程调用p.apply()函数或者使用p.apply_async()'''
3 p.apply_async(func [,然后返回结果。
4 此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,
将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。'''   
5 p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
6 P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用

 其他方法(了解)
1 方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具有以下方法
2 obj.get():返回结果,如果有必要则等待结果到达。timeout是可选的。如果在指定时间内还没有到达,将引发一场。如果远程操作中引发了异常,它将在调用此方法时再次被引发。
3 obj.ready():如果调用完成,返回True
4 obj.successful():如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发异常
5 obj.wait([timeout]):等待结果变为可用。
6 obj.terminate():立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数

 代码实例:

同步:

 进程池的 同步调用  apply

 os
 time
from multiprocessing import Pool

 task(num):
    time.sleep(1)
    %s : %s (num,os.getpid()))
    return num**2

:
    p = Pool()
    in range(20):
        res = p.apply(task,args=(i,))   提交任务的方法,同步提交
        -->异步:
 进程池的 异步调用  apply_async

 Pool
 没有取返回值
):
        p.apply_async(task,args=(i,1)"> 提交任务的方法,异步提交
    p.close()
    p.join()

 通过队列取返回值
 Pool()
    res_lst = []
    ):
        res = p.apply_async(task,))            res_lst.append(res)
    for res in res_lst:
        print(res.get())

map() 方法:

 Pool

 Pool()
    p.map(task,range(20))

 

原文地址:https://www.cnblogs.com/tracydzf

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


本文从多个角度分析了vi编辑器保存退出命令。我们介绍了保存和退出vi编辑器的命令,以及如何撤销更改、移动光标、查找和替换文本等实用命令。希望这些技巧能帮助你更好地使用vi编辑器。
Python中的回车和换行是计算机中文本处理中的两个重要概念,它们在代码编写中扮演着非常重要的角色。本文从多个角度分析了Python中的回车和换行,包括回车和换行的概念、使用方法、使用场景和注意事项。通过本文的介绍,读者可以更好地理解和掌握Python中的回车和换行,从而编写出更加高效和规范的Python代码。
SQL Server启动不了错误1067是一种比较常见的故障,主要原因是数据库服务启动失败、权限不足和数据库文件损坏等。要解决这个问题,我们需要检查服务日志、重启服务器、检查文件权限和恢复数据库文件等。在日常的数据库运维工作中,我们应该时刻关注数据库的运行状况,及时发现并解决问题,以确保数据库的正常运行。
信息模块是一种可重复使用的、可编程的、可扩展的、可维护的、可测试的、可重构的软件组件。信息模块的端接需要从接口设计、数据格式、消息传递、函数调用等方面进行考虑。信息模块的端接需要满足高内聚、低耦合的原则,以保证系统的可扩展性和可维护性。
本文从电脑配置、PyCharm版本、Java版本、配置文件以及程序冲突等多个角度分析了Win10启动不了PyCharm的可能原因,并提供了解决方法。
本文主要从多个角度分析了安装SQL Server 2012时可能出现的错误,并提供了解决方法。
Pycharm是一款非常优秀的Python集成开发环境,它可以让Python开发者更加高效地进行代码编写、调试和测试。在Pycharm中设置解释器非常简单,我们可以通过创建新项目、修改项目解释器、设置全局解释器等多种方式进行设置。
Python中有多种方法可以将字符串转换为整数,包括使用int()函数、try-except语句、正则表达式、map()函数、ord()函数和reduce()函数。在实际应用中,应根据具体情况选择最合适的方法。
本文介绍了导入CSV文件的多种方法,包括使用Excel、Python和R等工具。同时,还介绍了导入CSV文件时需要注意的一些细节和问题。CSV文件是数据处理和分析中不可或缺的一部分,希望本文能够对读者有所帮助。
mongodb是一种新型的数据库,它采用了面向文档的数据模型,具有灵活性、高性能和高可用性等优势。但是,mongodb也存在数据结构混乱、安全性和学习成本高等问题。
当Python运行不了时,我们应该从代码、Python环境、操作系统和硬件设备等多个角度来排查问题,并采取相应的解决措施。
Python列表是一种常见的数据类型,排序是列表操作中的一个重要部分。本文介绍了Python列表降序排序的方法,包括使用sort()函数、sorted()函数以及自定义函数进行排序。使用sort()函数可以简单方便地实现降序排序,但会改变原始列表的顺序;使用sorted()函数可以保留原始列表的顺序,但需要创建一个新的列表;使用自定义函数可以灵活地控制排序的方式,但需要编写额外的代码。
本文介绍了如何使用Python输入一段英文并统计其中的单词个数,从去除标点符号、忽略单词大小写、排除常用词汇等多个角度进行了分析。此外,还介绍了使用NLTK库进行单词统计的方法。
虚拟环境可以帮助我们在同一台机器上运行不同版本的Python、安装不同的Python包,并且不会相互影响。创建虚拟环境的命令是python3 -m venv myenv,进入虚拟环境的命令是source myenv/bin/activate,退出虚拟环境的命令是deactivate。在虚拟环境中可以使用pip安装包,也可以使用Python运行程序。
本文从XHR对象、fetch API和jQuery三个方面分析了JS获取响应状态的方法及其应用。以上三种方法都可以轻松地发送HTTP请求,并处理响应数据。
桌面的命令包括常见的操作命令、系统命令、批处理命令以及第三方应用程序提供的命令。我们可以通过鼠标右键点击桌面、创建快捷方式、创建批处理文件等方式来运用这些命令,从而更好地管理计算机,提高工作效率。
本文分析了应用程序闪退的多个原因,包括应用程序本身存在问题、手机或平板电脑系统问题、硬件问题、网络问题和其他原因。同时,本文提供了解决闪退问题的多种方式,包括更新或卸载重新下载应用程序、升级系统或进行修复、清理手机缓存、清理不必要的文件或者是更换电池等方式来解决、确保网络信号的稳定性、注意用户隐私和安全问题。
本文介绍了使用Python下载图片的多种方法,包括使用Python标准库urllib.request、第三方库requests、多线程和异步IO。这些方法在不同情况下都有它们的优缺点。使用这些方法,我们可以轻松地将网络上的图片下载到本地,方便我们在离线状态下查看或处理这些图片。
MySQL数据文件是指存储MySQL数据库中数据的文件,存储位置的选择对数据库的性能、可靠性和安全性都有着重要的影响。本文从存储位置的选择、存储设备的选择、存储空间的管理和存储位置的安全性等多个角度对MySQL数据文件的存储位置进行分析,最后得出需要根据实际情况综合考虑多个因素,选择合适的存储位置和存储设备,并进行有效的存储空间管理和安全措施的结论。
AS400是一种主机操作系统,每个库都包含多个表。查询库表总数是一项基本任务。可以使用命令行、系统管理界面以及数据库管理工具来查询库表总数。查询库表总数可以帮助用户更好地管理和优化数据,包括规划数据存储、优化查询性能以及管理空间资源。