进程是什么?
通俗地理解就是一个运行的程序或者软件,进程是操作系统资源分配的基本单位
一个程序至少有一个进程,一个进程至少有一个线程,多进程可以完成多任务。
工作中,任务数往往大于cpu的核数,即一定有一些任务正在执行,而另外一些任务在等待cpu进行执行,因此导致了有了不同的状态。
比如initial
,started
,stopped
等等。
创建子进程其实就是对主进程资源的拷贝。
主进程会等待所有的子进程执行完成程序再退出。
主进程会等待所有的子进程执行完成程序再退出
创建一个子进程对象并命名为Subprocess
Subprocess = multiprocessing.Process(group=None, target=None, name=None, arg=(), kwargs={}, *, daemon=None)
参数 | 含义 |
---|---|
group | ? |
target | 子进程会调用的函数 |
name | 子进程的命名 |
args | 传入子进程的以元组为形式的参数 |
kwargs | 传入子进程的以字典为形式的参数 |
daemon | 是否为为主进程的守护线程 |
信息 | 方法 |
---|---|
进程信息 | multiprocessing.current_process() 子进程: Subprocess |
进程id | 主进程:multiprocessing.parent_process().pid os.getppid() 子进程: multiprocessing.current_process().pid os.getpid() |
进程状态 | multiprocessing.current_process().is_alive() Subprocess.is_alive() |
方法 | 用途 |
---|---|
is_alive() | 判断进程是否还在alive状态: 如果是,返回 True ; 如果否,返回 False |
join() | 阻塞调用该方法的进程,直到其结束,主进程再结束。 |
进程如何使用?
此处举一个简单的例子: 创建2个分别名为A
和B
的子线程, 2个子线程在各自的时间限制内完成相应的工作。
import multiprocessing import time import datetime def func1(startTime): ''' :param startTime: func 1 start time within 10s, print info per 2s. :return: ''' i = 0 while time.time() - startTime < 10.0: print(datetime.datetime.now(), multiprocessing.current_process().name, 'is Alive:', multiprocessing.current_process().is_alive(), i+1) i += 1 time.sleep(2) def func2(startTime): ''' :param startTime: fun 2 start time within 30s, print info per 5s. :return: ''' i = 0 while time.time() - startTime < 30.0: print(datetime.datetime.now(), multiprocessing.current_process().name, 'is Alive:', multiprocessing.current_process().is_alive(), i+1) i += 1 time.sleep(5) if __name__ == "__main__": startTime = time.time() ProA = multiprocessing.Process(target=func1, name='A', args=(startTime, )) ProB = multiprocessing.Process(target=func2, name='B', args=(startTime, )) print(datetime.datetime.now(), ProA.name, 'is Alive:', ProA.is_alive()) print(datetime.datetime.now(), ProB.name, 'is Alive:', ProB.is_alive()) print(ProA) print(ProB) ProA.start() ProB.start() print('*'*10, 'Since Now Start 2 Sub Processes') print(datetime.datetime.now(), ProA.name, 'is Alive:', ProA.is_alive()) print(datetime.datetime.now(), ProB.name, 'is Alive:', ProB.is_alive()) print(ProA) print(ProB) ########################################################### 2022-01-01 11:15:03.043149 A is Alive: False 2022-01-01 11:15:03.043205 B is Alive: False <Process name='A' parent=71015 initial> <Process name='B' parent=71015 initial> ********** Since Now Start 2 Sub Processes 2022-01-01 11:15:03.052383 A is Alive: True 2022-01-01 11:15:03.052439 B is Alive: True <Process name='A' pid=71017 parent=71015 started> <Process name='B' pid=71018 parent=71015 started> 2022-01-01 11:15:03.074236 B is Alive: True 1 2022-01-01 11:15:03.074280 A is Alive: True 1 2022-01-01 11:15:05.079339 A is Alive: True 2 2022-01-01 11:15:07.079945 A is Alive: True 3 2022-01-01 11:15:08.079214 B is Alive: True 2 2022-01-01 11:15:09.083995 A is Alive: True 4 2022-01-01 11:15:11.087975 A is Alive: True 5 2022-01-01 11:15:13.084359 B is Alive: True 3 2022-01-01 11:15:18.089441 B is Alive: True 4 2022-01-01 11:15:23.090834 B is Alive: True 5 2022-01-01 11:15:28.094172 B is Alive: True 6 Process finished with exit code 0
进程间不共享全局变量。此处举一个简单的例子,全局变量gv
为一个列表,创建2个分别名为A
和B
的子线程, A
向列表gv
中添加元素,B
在A
结束后读取列表gv
import multiprocessing import time import datetime # global variance gv gv = [] # define target func for sub process def func1(): for i in range(10): gv.append(i) time.sleep(0.1) print(datetime.datetime.now(), multiprocessing.current_process().name, gv) def func2(): print(datetime.datetime.now(), multiprocessing.current_process().name, gv) if __name__ == "__main__": ProA = multiprocessing.Process(target=func1, name='A') ProB = multiprocessing.Process(target=func2, name='B') print(datetime.datetime.now(), ProA.name, 'is Alive:', ProA.is_alive()) print(datetime.datetime.now(), ProB.name, 'is Alive:', ProB.is_alive()) ProA.start() print('*'*10, 'Since Now Start Subprocess A') print(datetime.datetime.now(), ProA.name, 'is Alive:', ProA.is_alive()) print(datetime.datetime.now(), ProB.name, 'is Alive:', ProB.is_alive()) ProA.join() print(datetime.datetime.now(), ProA.name, 'is Alive:', ProA.is_alive()) print(datetime.datetime.now(), ProB.name, 'is Alive:', ProB.is_alive()) ProB.start() print('*'*10, 'Since Now Start Subprocess B') print(datetime.datetime.now(), ProA.name, 'is Alive:', ProA.is_alive()) print(datetime.datetime.now(), ProB.name, 'is Alive:', ProB.is_alive()) ########################################################### 2022-01-01 13:25:55.476295 A is Alive: False 2022-01-01 13:25:55.476317 B is Alive: False ********** Since Now Start Subprocess A 2022-01-01 13:25:55.481328 A is Alive: True 2022-01-01 13:25:55.481381 B is Alive: False 2022-01-01 13:25:56.547771 A [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] 2022-01-01 13:25:56.553970 A is Alive: False 2022-01-01 13:25:56.554019 B is Alive: False ********** Since Now Start Subprocess B 2022-01-01 13:25:56.555426 A is Alive: False 2022-01-01 13:25:56.555611 B is Alive: True 2022-01-01 13:25:56.588454 B [] Process finished with exit code 0
可以使用Queue
实现多进程之间的数据传递,Queue本身是一个消息队列程序。
from multiprocessing import Process, Queue, current_process import datetime def func1(q): for i in range(5): if not q.full(): print('Put %s to queue...' % i) q.put(i) else: print(current_process().name, 'is Full', current_process()) def func2(q): while True: if not q.empty(): i = q.get(True) print('Get %s from queue.' % i) else: break if __name__=='__main__': q = Queue(2) ProA = Process(target=func1, name='A', args=(q,)) ProB = Process(target=func2, name='B', args=(q,)) ProA.daemon = True ProB.daemon = True print(datetime.datetime.now(), ProA.name, 'is Alive:', ProA.is_alive()) print(datetime.datetime.now(), ProB.name, 'is Alive:', ProB.is_alive()) ProA.start() print('*' * 10, 'Since Now Start Subprocess A') print(datetime.datetime.now(), ProA.name, 'is Alive:', ProA.is_alive()) print(datetime.datetime.now(), ProB.name, 'is Alive:', ProB.is_alive()) ProA.join() print(datetime.datetime.now(), ProA.name, 'is Alive:', ProA.is_alive()) print(datetime.datetime.now(), ProB.name, 'is Alive:', ProB.is_alive()) ProB.start() print('*' * 10, 'Since Now Start Subprocess B') print(datetime.datetime.now(), ProA.name, 'is Alive:', ProA.is_alive()) print(datetime.datetime.now(), ProB.name, 'is Alive:', ProB.is_alive()) ProB.join() print(datetime.datetime.now(), ProA.name, 'is Alive:', ProA.is_alive()) print(datetime.datetime.now(), ProB.name, 'is Alive:', ProB.is_alive()) ########################################################### 2022-01-01 15:49:42.447990 A is Alive: False 2022-01-01 15:49:42.448040 B is Alive: False ********** Since Now Start Subprocess A 2022-01-01 15:49:42.449512 A is Alive: True 2022-01-01 15:49:42.449565 B is Alive: False Put 0 to queue... Put 1 to queue... A is Full <Process name='A' parent=74585 started daemon> A is Full <Process name='A' parent=74585 started daemon> A is Full <Process name='A' parent=74585 started daemon> 2022-01-01 15:49:42.487308 A is Alive: False 2022-01-01 15:49:42.487346 B is Alive: False ********** Since Now Start Subprocess B 2022-01-01 15:49:42.488337 A is Alive: False 2022-01-01 15:49:42.488456 B is Alive: True Get 0 from queue. Get 1 from queue. 2022-01-01 15:49:42.516255 A is Alive: False 2022-01-01 15:49:42.516273 B is Alive: False Process finished with exit code 0
如果此时把q = Queue(2)
改为q = Queue(10)
,则可以得到以下结果,
########################################################### 2022-01-01 15:53:25.808938 A is Alive: False 2022-01-01 15:53:25.809006 B is Alive: False ********** Since Now Start Subprocess A 2022-01-01 15:53:25.810559 A is Alive: True 2022-01-01 15:53:25.810614 B is Alive: False Put 0 to queue... Put 1 to queue... Put 2 to queue... Put 3 to queue... Put 4 to queue... 2022-01-01 15:53:25.852826 A is Alive: False 2022-01-01 15:53:25.852879 B is Alive: False ********** Since Now Start Subprocess B 2022-01-01 15:53:25.854026 A is Alive: False 2022-01-01 15:53:25.854077 B is Alive: True Get 0 from queue. Get 1 from queue. Get 2 from queue. Get 3 from queue. Get 4 from queue. 2022-01-01 15:53:25.903884 A is Alive: False 2022-01-01 15:53:25.903918 B is Alive: False Process finished with exit code 0
如果要塞进
q
的msg数量比初始化Queue
时填入的数量多,应该以q.full()
作为一个判断条件限制填入q
的数量。
池子里面放的是进程,进程池会根据任务执行情况自动创建进程,而且尽量少创建进程,合理利用进程池中的进程完成多任务。
当需要创建的子进程数量不多时,可以直接利用multiprocess中的Process动态生成多个进程,但如果是上百甚至上千个目标,手动的去创建进程的工作量巨大,此时就可以用到multiprocess模块提供的Pool方法。
初始化Pool时,可以指定一个最大进程数,当有新的请求提到Pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求,但如果池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程结束,才会用之前的进程来执行新的任务。
使用进程池创建的进程是守护主进程的状态,默认自己通过Process创建的进程是不守护主进程的状态
from multiprocessing import Manager, Pool, current_process, parent_process import os def func2(q): print(current_process().name, os.getpid(), 'start') for i in range(q.qsize()): print("get from q:%s" % q.get(True)) def func1(q): print(current_process().name, os.getpid(), 'start') msg = ['A', 'B', 'C', 'D', 'E'] for i in msg: print("put in p:%s" % i) q.put(i) if __name__ == "__main__": print(current_process().name, os.getpid(), 'start') q = Manager().Queue() po = Pool() po.apply(func1, (q,)) po.apply(func2, (q,)) po.close() po.join() print(current_process().name, os.getpid(), 'end') ########################################################### MainProcess 80712 start SpawnPoolWorker-2 80715 start put in p:A put in p:B put in p:C put in p:D put in p:E SpawnPoolWorker-6 80719 start get from q:A get from q:B get from q:C get from q:D get from q:E MainProcess 80712 end Process finished with exit code 0
进程池同步执行任务表示进程池中的进程在执行任务的时候一个执行完成另外一个才能执行,如果没有执行完会等待上一个进程执行。
在下面的示例中,进程池数量被限制在3,但总共需要调用func
5次。同一时间会开启3个进程,3个进程先后调用func
并且每个仅调用一次,不总是维持在3个进程,直到累计调用func
5次后同时结束3个进程。
from multiprocessing import Pool, current_process import os, time, datetime def func(): print(datetime.datetime.now(), current_process().name, current_process().pid, 'start;', 'current process count', len(psutil.pids())) time.sleep(2) print(datetime.datetime.now(), current_process().name, current_process().pid, 'end') if __name__ == '__main__': print(current_process().name, os.getpid(), 'start;', 'current process count', len(psutil.pids())) pool = Pool(3) for i in range(5): pool.apply(func) pool.close() pool.join() print(current_process().name, os.getpid(), 'end;', 'current process count', len(psutil.pids())) ########################################################### MainProcess 82913 start; current process count 427 2022-01-02 13:34:59.638308 SpawnPoolWorker-1 82915 start; current process count 431 2022-01-02 13:35:01.643672 SpawnPoolWorker-1 82915 end 2022-01-02 13:35:01.646509 SpawnPoolWorker-2 82916 start; current process count 431 2022-01-02 13:35:03.652383 SpawnPoolWorker-2 82916 end 2022-01-02 13:35:03.654136 SpawnPoolWorker-3 82917 start; current process count 431 2022-01-02 13:35:05.657574 SpawnPoolWorker-3 82917 end 2022-01-02 13:35:05.658831 SpawnPoolWorker-1 82915 start; current process count 428 2022-01-02 13:35:07.662861 SpawnPoolWorker-1 82915 end 2022-01-02 13:35:07.664195 SpawnPoolWorker-2 82916 start; current process count 427 2022-01-02 13:35:09.670667 SpawnPoolWorker-2 82916 end MainProcess 82913 end; current process count 424 Process finished with exit code 0
进程池异步执行任务表示进程池中的进程同时执行任务,进程之间不会等待。
在下面的示例中,进程池进程数量被限制在3,但总共需要调用func
5次。同一时间会开启3个进程,每个调用func
一次,每结束1个的同时并开启一个,总是维持在3个进程,直到累计调用func
5次后同时结束3个进程。
from multiprocessing import Pool, current_process import os, datetime, time, psutil def func(): print(datetime.datetime.now(), current_process().name, current_process().pid, 'start;', 'current process count', len(psutil.pids())) time.sleep(2) print(datetime.datetime.now(), current_process().name, current_process().pid, 'end') if __name__ == '__main__': print(current_process().name, os.getpid(), 'start;', 'current process count', len(psutil.pids())) pool = Pool(3) for i in range(5): pool.apply_async(func) pool.close() pool.join() print(current_process().name, os.getpid(), 'end;', 'current process count', len(psutil.pids())) ########################################################### MainProcess 82636 start; current process count 424 2022-01-02 13:18:42.792101 SpawnPoolWorker-1 82638 start; current process count 428 2022-01-02 13:18:42.802557 SpawnPoolWorker-2 82639 start; current process count 428 2022-01-02 13:18:42.803372 SpawnPoolWorker-3 82640 start; current process count 428 2022-01-02 13:18:44.797576 SpawnPoolWorker-1 82638 end 2022-01-02 13:18:44.798311 SpawnPoolWorker-1 82638 start; current process count 428 2022-01-02 13:18:44.807925 SpawnPoolWorker-2 82639 end 2022-01-02 13:18:44.808555 SpawnPoolWorker-3 82640 end 2022-01-02 13:18:44.808436 SpawnPoolWorker-2 82639 start; current process count 428 2022-01-02 13:18:46.802475 SpawnPoolWorker-1 82638 end 2022-01-02 13:18:46.813188 SpawnPoolWorker-2 82639 end MainProcess 82636 end; current process count 425 Process finished with exit code 0
写此文时有参考以下链接
#Python干货#Python网络编程——进程
queue — 一个同步的队列类
python之Queue实现进程间的通信
Python之psutil,查看CPU、内存、网络等使用情况