在了解进程之前,先了解多任务。多任务就是指操作系统能够执行多个任务。例如,使用Window或Linux操作系统可以同时看电影、聊天、查看网页信息等,此时操作系统就是在执行多任务,而每一个任务就是一个进程。
进程(process)是计算机已经运行程序的实体。进程与程序不同,程序本身只是指令、数据及其组织形式的描述,而进程才是(指令和数据)的真正运行实例。
multiprocessing模块提供了一个Process类来代表一个进程对象
语法格式:
Process(group,target,name,args,kwargs)
from multiprocessing import Process #执行子进程代码 def test(): print("当前子进程") #执行主程序 def main(): print("主进程开始") p=Process(target=test) #实例化Process进程类 p.start() #启动子进程 #p.join() #这个是等子进程全部运行完再运行主进程 print("主进程结束") if __name__=="__main__": main()
发现会优先执行主线程,在执行子线程
开启p.join()后
先实例化Process类,然后使用p.start()方法启动子进程 | 开始执行test()函数Process的实例p常用的方法除start()外,还有如下常用方法: |
---|---|
run() | 如果没有给定target参数,对这个对象调用start()方法时,就将执行对象中的run()方法. |
is_alive() | 判断进程实例是否还在执行。 |
join([timeout]) | 是否等待进程实例执行结束,或等待多少秒。 |
start() | 启动进程实例(创建子进程)。 |
terminate() | 不管任务是否完成,立即终止。 |
Precess类还有如下常用属性: |
---|
name :当前进程实例别名,默认为Process-N,N为从1开始递增的整数。 |
pid :当前进程实例的PID值。 |
示例:创建2个子进程,分别使用os模块和time模块输出父进程和子进程的ID以及子进程的时间,并调用Process类的name和 pid属性,代码如下:
from multiprocessing import Process import time import os # os.getpid()可获取当前进程id,返回值为int # os.getppid()可获取父进程id,返回值为int #两个子进程将会调用的俩个方法 def child_1(delay): print("子进程(%s)开始执行,父进程为(%s)" % (os.getpid(),os.getppid())) t_start = time.time() #计时开始 time.sleep(delay) #程序将会被挂起delay秒 t_end = time.time() #即使结束 print("子进程(%s)执行时间为%0.2f秒"%(os.getpid(),t_end-t_start)) def child_2(delay): print("子进程(%s)开始执行,父进程为(%s)" % (os.getpid(),os.getppid())) t_start = time.time() #计时开始 time.sleep(delay) #程序将会被挂起delay秒 t_end = time.time() #即使结束 print("子进程(%s)执行时间为%0.2f秒"%(os.getpid(),t_end-t_start)) if __name__ == "__main__": print("-------父进程开始执行-------") print("父进程PID:%s"%os.getppid) #输出当前程序的PID p1=Process(target=child_1,args=(1,)) #实例化进程p1 p2=Process(target=child_2,name="two",args=(2,)) #实例化进程p2 p1.start() #启动进程p1 p2.start() #启动进程p2 #同时父进程仍在往下执行,如果p1、p2进程还在执行,将会返回True print("p1.is_alive=%s"%p1.is_alive()) print("p2.is_alive=%s"%p2.is_alive()) #输入p1和p2进程的别名和PID print("p1.name=%s"%p1.name) print("p1.pid=%s"%p1.pid) print("p2.name=%s"%p2.name) print("p2.pid=%s"%p2.pid) print("----------等待子进程--------") p1.join() #等待p1进程结束 p2.join() #等待p2进程结束 print("----------父进程执行结束---------")
对于一些简单的小任务,通常使Process(target=test)方式实现多进程。但是如果要处理复杂任务的进程,通常定义一个类,使其继承Process类,每次实例化这个类的时候,就等同于实例化一个进程对象。
from multiprocessing import Process import time import os #继承Process类 class SubProcess(Process): #由于Process类本身也有__init__初始化方法,这个子类相当于重写了父类的这个方法 def __init__(self,delay,name=""): Process.__init__(self) #调用Process父类的初始化方法 #self.delay相当于全局变量 self.delay = delay #接收参数delay if name: #判断传递的参数是否存在 self.name = name #如果传递参数name,则为子进程创建的name属性,否自使用默认属性 def run(self): print("子进程(%s)开始执行,父进程为(%s)" % (os.getpid(),os.getppid())) t_start = time.time() #计时开始 time.sleep(self.delay) #程序将会被挂起delay秒 t_end = time.time() #即使结束 print("子进程(%s)执行时间为%0.2f秒"%(os.getpid(),t_end-t_start)) if __name__ == "__main__": print("-------父进程开始执行-------") print("父进程PID:%s"%os.getppid) #输出当前程序的PID p1 = SubProcess(delay=1) p2 = SubProcess(delay=2,name = "two") #对一个不包含target属性的Process类执行start()方法,就会运行这个类中的run()方法 p1.start() #启动进程p1 p2.start() #启动进程p2 #输出p1和p2进程的执行状态,如果真正执行,返回True,否则返回False print("p1.is_alive=%s"%p1.is_alive()) print("p2.is_alive=%s"%p2.is_alive()) #输入p1和p2进程的别名和PID print("p1.name=%s"%p1.name) print("p1.pid=%s"%p1.pid) print("p2.name=%s"%p2.name) print("p2.pid=%s"%p2.pid) print("----------等待子进程--------") p1.join() #等待p1进程结束 p2.join() #等待p2进程结束 print("----------父进程执行结束---------")
上述代码中,定义了一个SubProcess子类,继承multiprocess.Process 父类。SubProcess子类中定义了2个方法:init()初始化方法和 run()方法。在_init()初识化方法中,调用multiprocess.Process父类的_init()初始化方法,否则父类初始化方法会被覆盖,无法开启进程。此外,在 SubProcess子类中并没有定义start()方法,但在主进程中却调用了start()方法,此时就会自动执行SubPorcess类的run()方法。运行结果如图所示。
Pool进程池。为了更好的理解进程池,可以将进程池比作水池。我们需要完成放满10个水盆的水的任务,而在这个水池中,最多可以安放3个水盆接水,也就是同时可以执行3个任务,即开启3个进程。为更快完成任务,现在打开3个水龙头开始放水,当有一个水盆的水接满时,即该进程完成1个任务,我们就将这个水盆的水倒入水桶中,然后继续接水,即执行下一个任务。如果3个水盆每次同时装满水,那么在放满第9盆水后,系统会随机分配1个水盆接水,另外2个水盆空闲。
接下来,先来了解一下Pool类的常用方法。常用方法及说明如下:
关于什么是阻塞调用和非阻塞调用个人理解如下:
如果使用阻塞方式,必须等待上一个进程退出才能执行下一个进程,而使用非阻塞方式,则可以并行执行3个进程。
下面通过一个示例演示一下如何使用进程池创建多进程。这里模拟水池放水的场景,定义一个进程池,设置最大进程数为3。然后使用非阻塞方式执行10个任务,查看每个进程执行的任务。具体代码:
from multiprocessing import Pool import os,time def task(name): print("子进程:%s 执行task:%s " % (os.getpid(),name)) time.sleep(1) if __name__ == "__main__": print("父进程:%s"%os.getpid()) p = Pool(3) #定义一个进程池,最大进程为3 for i in range(10): p.apply_async(task,args=(i,)) #使用非阻塞方式调用task()函数 print("等待所有子进程结束....") p.close() #关闭进程池,关闭后p不在接收新的请求 p.join() #等待子进程结束 print("所有子进程结束")
发现pid:3972执行了四次,而pid:15464和8704只执行了三次
阻塞方式:
from multiprocessing import Pool import os,time def task(name): print("子进程:%s 执行task:%s " % (os.getpid(),name)) time.sleep(1) if __name__ == "__main__": print("父进程:%s"%os.getpid()) p = Pool(3) #定义一个进程池,最大进程为3 for i in range(10): # p.apply_async(task,args=(i,)) #使用非阻塞方式调用task()函数 p.apply(task,args=(i,)) #使用阻塞方式调用task()函数 print("等待所有子进程结束....") p.close() #关闭进程池,关闭后p不在接收新的请求 p.join() #等待子进程结束 print("所有子进程结束")
from multiprocessing import Process def add(): print("......子进程1开始......") global num num += 50 print("num:%d"%num) print("......子进程1结束......") def sub(): print("......子进程2开始......") global num num -= 50 print("num:%d"%num) print("......子进程2结束......") num = 100 #定义一个全局变量 if __name__=="__main__": print(".....主进程开始.....") print("num:%d"%num) #实例化进程p1,p2 p1 = Process(target=add) p2 = Process(target=sub) #开启进程p1,p2 p1.start() p2.start() #阻塞主进程,等待进程结束 p1.join() p2.join() print(".....主进程结束.....")
上述代码中,分别创建了2个子进程,一个子进程中令num 加上50,另一个子进程令num减去50。但是从运行结果可以看出,num在父进程和2个子进程中的初始值都是100。也就是全局变量num在一个进程中的结果,没有传递到下一个进程中,即进程之间没有共享信息。进程间示意图如图所示。
要如何才能实现进程间的通信呢?Python的multiprocessing模块包装了底层的机制,提供了Queue(队列)、Pipes(管道)等多种方式来交换数据。以下通过队列(Queue)来实现进程间
的通信。
进程之间有时需要通信,操作系统提供了很多机制来实现进程间的通信。可以使用multiprocessing模块的Queue实现多进程之间的数据传递。Queue本身是一个消息列队程序,下面介绍一下Queue的使用。
初始化Queue()对象时(例如:q=Queue(num)),若括号中没有指定最大可接收的消息数量,或数量为负值,那么就代表可接受的消息数量没有上限(直到内存的尽头)。
Queue的常用方法如下;
Queue.qsize() :返回当前队列包含的消息数量。
Queue.empty():如果队列为空,返回 True;反之返回False 。
Queue.full():如果队列满了,返回 True;反之返回False。
Queue.get([block[, timeout]):获取队列中的一条消息,然后将其从列队中移除,block默认值为True。
如果block使用默认值,且没有设置timeout(单位秒),消息列队为空,此时程序将被阻塞(停在读取状态),直到从消息列队读到消息为止。如果设置了timeout,则会等待timeout秒,若还没读取到任何消息,则抛出"Queue.Empty"异常。
如果block值为False,消息列队为空,则会立刻抛出“Queue.Empty”异常
Queue.get_nowait(:相当Queue.get(False)。
Oueue.put(item,[block[, timeout]]):将item消息写入队列,block 默认值为True。
如果block使用默认值,且没有设置timeout(单位秒),消息列队如果已经没有空间可写此时程序将被阻塞(停在写入状态),直到从消息列队腾出空间为止,如果设置了timeout则会等待timeout秒,若还没空间,则抛出“Queue.Full”异常。
如果block值为False,消息列队没有空间可写入,则会立刻抛出“Queue.Full”异常
Oueue.put_nowait(item):相当Queue.put(item, False)。
下面,通过一个例子学习一下如何使用processing.Queue。代码如下:
from multiprocessing import Queue if __name__=="__main__": q = Queue(3) #初始化一个queue对象,最多可接收三条put消息 q.put("消息1") q.put("消息2") print(q.full()) #q.full() 判断当前队列是否满了 q.put("消息3") print(q.full()) #因为消息队列已满,下面的try都会抛出异常 #第一个try会等待2秒后在抛出异常,第二个try会立刻抛出异常 try: q.put("消息4",True,2) except: print("消息队列已满,现有消息数量:%s"%q.qsize()) try: q.put_nowait("消息4") except: print("消息队列已满,现有消息数量:%s"%q.qsize()) #读取消息时,先判断消息队列是否为空,在读取 if not q.empty(): print("----从队列中获取消息----") for i in range(q.qsize()): print(q.get_nowait()) #先判断消息队列是否已将满,在写入 if not q.full(): q.put_nowait("消息4") if not q.empty(): print("----从队列中获取消息----") print(q.get_nowait())
我们知道使用multiprocessing.Process 可以创建多进程,使用multiprocessing.Queue可以实现队列的操作。接下来,通过一个示例结合Process和 Queue实现进程间的通信。创建2个子进程,一个子进程负责向队列中写入数据,另一个子进程负责从队列中读取数据。为保证能够正确从队列中读取数据,设置读取数据的进程等待时间为2秒。如果2秒后仍然无法读取数据,则抛出异常。代码如下:
from multiprocessing import Process,Queue import time #向队列写入数据 def write_task(q): if not q.full(): for i in range(5): message = "消息" + str(i) q.put(message) print("写入:%s"%message) #向队列读取数据 def read_task(q): time.sleep(1) while not q.empty(): #等待2秒,如果还没有读取到任何消息,则抛出"Queue.Empty" print("读取:%s"%q.get(True,2)) if __name__=="__main__": print("----父进程开始----") #父进程创建 q = Queue() pw = Process(target=write_task,args=(q,)) #实例化写入队列的子进程,并且传递队列 pr = Process(target=read_task,args=(q,)) #实例化读取队列的子进程,并且传递队列 pw.start() #启动子进程pw,写入 pr.start() #启动子进程pr,读取 pw.join() #等待pw结束 pr.join() #等待pr结束 print("----父进程结束----")