Unix和Linux操作系统上提供了fork()
系统调用来创建进程,调用fork()
函数的是父进程,创建出的是子进程,子进程是父进程的一个拷贝,但是子进程拥有自己的PID。fork()
函数非常特殊它会返回两次,父进程中可以通过fork()
函数的返回值得到子进程的PID,而子进程中的返回值永远都是0。
Python的OS
模块提供了fork()
函数。由于Windows系统没有fork()
调用,因此要实现跨平台的多进程编程,可以使用multiprocessing
模块的Process
类来创建子进程,而且该模块还提供了更高级的封装,例如批量启动进程的进程池(Pool
)、用于进程间通信的队列(Queue
)和管道(Pipe
)等。
在Unix/Linux下,multiprocessing模块封装了fork()调用,使我们不需要关注fork()的细节。由于Windows没有fork调用,因此,multiprocessing需要“模拟”出fork的效果,父进程所有Python对象都必须通过pickle序列化再传到子进程去,所有,如果multiprocessing在Windows下调用失败了,要先考虑是不是pickle失败了。
应确保主模块可以被新启动的 Python 解释器安全导入而不会引发什么副作用(比如又启动了一个子进程)。例如,使用 spawn 或 forkserver 启动方式执行下面的模块,会引发 RuntimeError
异常而失败。
from multiprocessing import Process def foo(): print('hello') p = Process(target=foo) p.start()
应该通过下面的方法使用 if __name__ == '__main__':
,从而保护程序"入口点":
from multiprocessing import Process, freeze_support, set_start_method def foo(): print('hello') if __name__ == '__main__': freeze_support() set_start_method('spawn') p = Process(target=foo) p.start()
这允许新启动的 Python 解释器安全导入模块然后运行模块中的 foo() 函数。如果主模块中创建了进程池或者管理器,这个规则也适用。
如果程序将正常运行而不是冻结,则可以省略 freeze_support() 行。
更多信息请参考 multiprocessing 编程指导 -- 使用 multiprocessing 时,应遵循一些指导原则和习惯用法。
is_alive(): 判断进程是否存活
name 属性:进程的名字,自定义(Process 创建进程的时候以参数的形式给出即可);
pid 属性:每个进程的唯一 PID 编号;
daemon 属性:bool,父进程终止后自动终止,且自己不能产生新进程,必须在 start() 之前设置;
exitcode 属性:进程在运行时为 None、如果为–N,表示被信号N结束。
from multiprocessing import Process from os import getpid from time import time,sleep def download(filename, n): print(filename+'pid[%d]' % getpid()) sleep(n) print(filename+'finished') def main(): start = time() p1 = Process(target=download, args=('a', 1)) # 或者 target=download, args=('a', 1), name='process0' p1.start() # 创建进程; start 只是给操作系统发送信号, 先开启哪个子进程是随机的; 当然, 此时已经启动了; p2 = Process(target=download,args=('b', 3)) p2.start() # 创建进程 p1.join() # join 等待指定的子进程执行完后再继续执行主进程, 通常用于进程间的同步; p2.join() # p1.join() 等待 p1 执行完之后再执行 p2.join(); end = time() print('total:', end-start) if __name__ == '__main__': main()
输出:
bpid[1440] apid[14164] afinished bfinished total: 3.115839719772339
join() 等待指定的子进程执行完后再继续执行主进程,通常用于进程间的同步;此处,p1.join() 等待 p1 执行完之后再执行 p2.join();
需要注意的是:join() 是让主线程等,而 p1,p2 在 p1.join() 的时候,p2 等子进程仍然在并发执行的,可能等 p1.join() 结束,p2 等子进程早已经结束了,这样 p2.join() 等将直接通过检测,无需等待,所以多个 join 花费的总时间仍然是耗费时间最长的那个进程运行的时间。
如果是 p1.start(),p1.join(),p2.start(),p2.join() 这个顺序的话,将是串行的执行方式。 此时,如果注释掉 p2.join() 的话,将直接执行后面的语句,不再等待 p2 进程的结束。也就是说,如果没有 .join()方法的话,将直接执行主进程后面的程序,不会等待子进程执行完之后再去执行;
在 Process 没有指定 target 函数时,默认用 run() 函数运行程序;在线程中,继承 Thread 的类也是这样:
from multiprocessing import Process from os import getpid from time import time, sleep class DownloadProcess(Process): def __init__(self, filename, num): super().__init__() self.filename = filename self.num = num def run(self): # 重载 run 方法; 默认用 run() 运行程序 print(self.filename+'pid [%d]' % getpid()) sleep(self.num) print(self.filename+'finished') def main(): start = time() process1 = DownloadProcess('a', 3) process1.start() process2 = DownloadProcess('b', 1) process2.start() process1.join() process2.join() end = time() print('total:', end-start) if __name__=='__main__': main()
输出:
apid [21396] bpid [10652] bfinished afinished total: 3.101225147247314
Pool(processes=num):设置同时运行进程数,当一个进程运行完,才会添加新的进程进去;
apply_async(函数, (参数,)):启动进程池; 非阻塞:同时运行设置的进程数,等有进程结束,再添加新的进程
apply(函数, (参数,)):启动进程池; 阻塞:当一个进程结束后, 再进行下一个进程, 一般都使用非阻塞 apply_async()
close():关闭 Pool,不能再添加新的任务;
terminate():结束运行的进程,不再处理未完成的任务;
join():与 Process 的作用一样, 但要在 close 或 terminate 之后使用。
方法名 | 功能 |
---|---|
apply( func[, args[, kwds]] ) | 将 func 函数提交给进程池处理。其中 args 代表传给 func 的位置参数,kwds 代表传给 func 的关键字参数。该方法会被阻塞直到 func 函数执行完成。 |
apply_async( func[, args[, kwds[, callback[, error_callback]]]] ) | 这是 apply() 方法的异步版本,该方法不会被阻塞。其中 callback 指定 func 函数完成后的回调函数,error_callback 指定 func 函数出错后的回调函数。 |
map( func, iterable[, chunksize] ) | 类似于 Python 的 map() 全局函数,只不过此处使用新进程对 iterable 的每一个元素执行 func 函数。 |
map_async( func, iterable[, chunksize[, callback[, error_callback]]] ) | 这是 map() 方法的异步版本,该方法不会被阻塞。其中 callback 指定 func 函数完成后的回调函数,error_callback 指定 func 函数出错后的回调函数。 |
imap( func, iterable[, chunksize] ) | 这是 map() 方法的延迟版本。 |
imap_unordered( func, iterable[, chunksize] ) | 功能类似于 imap() 方法,但该方法不能保证所生成的结果(包含多个元素)与原 iterable 中的元素顺序一致。 |
starmap( func, iterable[,chunksize] ) | 功能类似于 map() 方法,但该方法要求 iterable 的元素也是 iterable 对象,程序会将每一个元素解包之后作为 func 函数的参数。 |
close() | 关闭进程池。在调用该方法之后,该进程池不能再接收新任务,它会把当前进程池中的所有任务执行完成后再关闭自己。 |
terminate() | 立即中止进程池。 |
join() | 等待所有进程完成。 |
""" Pool(processes=num):设置同时运行进程数,当一个进程运行完,才会添加新的进程进去; apply_async(函数, (参数,)):启动进程池; 非阻塞:同时运行设置的进程数,等有进程结束,再添加新的进程 apply(函数, (参数,)):启动进程池; 阻塞:当一个进程结束后, 再进行下一个进程, 一般都使用非阻塞 apply_async() close():关闭 Pool,不能再添加新的任务; terminate():结束运行的进程,不再处理未完成的任务; join():与 Process 的作用一样, 但要在 close 或 terminate 之后使用。 """
进程池能够将众多进程放在一起,设置一个运行进程上限,每次只同时运行设置的进程数,等有进程结束,再添加新的进程。如果要启动大量的子进程,可以用进程池的方式批量创建子进程:
from multiprocessing import Pool from time import time,sleep from os import getpid from random import random def task(name): print('Run task %s in (%s)...' % (name, getpid())) start = time() sleep(random()*3) # 这里用 random 看的清楚点 end = time() print('Task %s runs %0.2f seconds.' % (name, (end - start))) if __name__=='__main__': start = time() print('Parent process %s.' % getpid()) p = Pool(4) # 创建了四个子进程 for i in range(5): # 循环了五次 p.apply_async(task, (i,)) print('Waiting for all subprocesses done...') p.close() # 关闭进程池; 不能再添加新的任务 p.join() # Pool.join() 方法会等待所有子进程执行完毕,调用 join() 之前必须先调用 close() print('All subprocesses done.') end = time() print('total:', end-start)
输出结果:
Parent process 14368. Waiting for all subprocesses done... Run task 0 in (10880)... Run task 1 in (3576)... Run task 2 in (8668)... Run task 3 in (5456)... Task 2 runs 0.52 seconds. Run task 4 in (8668)... Task 3 runs 0.82 seconds. Task 1 runs 2.02 seconds. Task 4 runs 1.80 seconds. Task 0 runs 2.72 seconds. All subprocesses done. total: 2.929426431655884
可以看到,task 0,1,2,3 是立刻执行的,而 task 4 要等待前面某个 task 完成后才执行,这是因为 Pool 设置的是最多同时执行 4 个进程。这是Pool有意设计的限制,并不是操作系统的限制。如果改成:p = Pool(5)
就可以同时跑5个进程。
另外,还可以使用 with 语句来管理进程池,这意味着我们无需手动调用 close() 方法关闭进程池。例如:
with Pool(processes=4) as pool: # 创建包含 4 条进程的进程池 adds = pool.map(action, ('http://c.biancheng.net/python/', 'http://c.biancheng.net/java/', 'http://c.biancheng.net/shell/')) for arc in adds: print(arc)
map(func, iterable[, chunksize])
内置函数的并行版本 (但它只支持一个 iterable 参数,对于多个可迭代对象请参阅 starmap()
)。 它会保持阻塞直到获得结果。
这个方法会将可迭代对象分割为许多块,然后提交给进程池。可以将 chunksize 设置为一个正整数从而(近似)指定每个块的大小可以。
注意对于很长的迭代对象,可能消耗很多内存。可以考虑使用 imap()
或 imap_unordered()
并且显示指定 chunksize 以提升效率。
生产-消费者模式:一个生产数据,一个消费数据就行。
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。将来如果消费者的代码发生变化,可能会影响到生产者。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合也就相应降低了,生产者不需要关心消费者,消费者也不需要关心生产者。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯。阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
1.使用 Queue 实现
队列是线程和进程安全的。启动两个进程,一个输出 Ping,一个输出 Pong,两个进程输出的 Ping 和 Pong 加起来一共10个:
from multiprocessing import Process, Queue from os import getpid from time import time,sleep def producer(queue): print('pid [%d]' % getpid()) for i in range(10): # 这两个参数的处理应该还有更好的方法,暂时还不知道! if i%2 == 0: queue.put('Ping') else: queue.put('Pong') def consumer(queue): print('pid [%d]' % getpid()) for i in range(10): print(queue.get()) # 塞入什么,出来什么;塞入列表,出来列表 if __name__ == '__main__': # 多进程之类的,一定要这么写吗? queue = Queue() p3 = Process(target=producer, args=(queue,)) p4 = Process(target=consumer, args=(queue,)) p3.start() p4.start()
不过,需要注意的是:如果是进程池创建的进程之间通信, 就需要使⽤ multiprocessing.Manager()
中的 Queue()
:
if __name__ == "__main__": q = Manager().Queue() pool = Pool(2) for i in range(2): po.apply_async(producer, (q,)) po.apply_async(consumer, (q,)) pool.close() pool.join()
分布式进程 -- 待续
在 Thread 和 Process中,应当优选 Process,因为 Process 更稳定,而且,Process 可以分布到多台机器上,而 Thread 最多只能分布到同一机器的多个CPU上.
multiprocessing 模块不但支持多进程,其中 managers 子模块还支持把多进程分布到多台机器上。一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信。由于 managers 模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序。
Queue 对象的一些方法:
''' 1.get([block=True[, timeout=None]]): 获取队列中一条消息 然后将其从队列中移除; get_nowait() 相当于 get(False), 取不到值, 触发异常; 如果可选参数 block 是 True (默认值), 且 timeout 是 None (默认值)时, 将会阻塞当前进程,直到队列中出现可用的对象; 如果 timeout 是正数,将会在阻塞了最多 timeout 秒之后还是没有可用的对象时抛出 queue.Empty 异常; 反之 block 是 False 时,仅当有可用对象能够取出时返回,否则立即抛出 queue.Empty 异常 (在这种情形下 timeout 参数会被忽略); 2.put(arg1[, block=True[, timeout=None]]): 将一个值添加到数列; put_nowait() 相当于 put(arg1, False), 当队列满时, 报错; 如果可选参数 block 是 True (默认值), 且 timeout 是 None (默认值)时, 将会阻塞当前进程,直到有空的缓冲槽; 如果 timeout 是正数,将会在阻塞了最多 timeout 秒之后还是没有可用的缓冲槽时抛出 queue.Full异常; 反之 block 是 False 时,仅当有可用缓冲槽时才放入对象,否则立即抛出 queue.Full 异常 (在这种情形下 timeout 参数会被忽略); 3.qsize(): 返回当前队列包含的消息数量; 4.empty(): 队列为空时返回 True, 反之, False; 5.full():队列满了返回 True, 反之, False; '''
2.使用 Pipe 实现 -- 待续 Python进程间通信的2种实现方法(Queue和Pipe) (biancheng.net)
方法名 | 功能 |
---|---|
send(obj) | 发送一个 obj 给管道的另一端,另一端使用 recv() 方法接收。需要说明的是,该 obj 必须是可序列化的,如果该对象序列化之后超过 32MB,则很可能会引发 ValueError 异常。 |
recv() | 接收另一端通过 send() 方法发送过来的数据。 |
close() | 关闭连接。 |
poll([timeout]) | 返回连接中是否还有数据可以读取。 |
send_bytes(buffer[, offset[, size]]) | 发送字节数据。如果没有指定 offset、size 参数,则默认发送 buffer 字节串的全部数据;如果指定了 offset 和 size 参数,则只发送 buffer 字节串中从 offset 开始、长度为 size 的字节数据。通过该方法发送的数据,应该使用 recv_bytes() 或 recv_bytes_into 方法接收。 |
recv_bytes([maxlength]) | 接收通过 send_bytes() 方法发送的数据,maxlength 指定最多接收的字节数。该方法返回接收到的字节数据。 |
recv_bytes_into(buffer[, offset]) | 功能与 recv_bytes() 方法类似,只是该方法将接收到的数据放在 buffer 中。 |
很多时候,子进程并不是自身,而是一个外部进程。我们创建了子进程后,还需要控制子进程的输入和输出。subprocess模块可以让我们非常方便地启动一个子进程,然后控制其输入和输出。下面的例子演示了如何在 Python 代码中运行命令 nslookup www.python.org
,这和命令行直接运行的效果是一样的:
import subprocess print('$ nslookup www.python.org') r = subprocess.call(['nslookup', 'www.python.org']) print('Exit code:', r)
from threading import Thread from time import time, sleep def download(filename, n): print(filename+'pid [%d]' % getpid()) sleep(n) print(filename+'finished') def main(): start = time() thread1 = Thread(target=download, args=('a', 3)) thread1.start() thread2 = Thread(target=download, args=('b', 1)) thread2.start() thread1.join() thread2.join() end = time() print('total:', end-start) if __name__=='__main__': main()
输出:
apid [5272] bpid [5272] bfinished afinished total: 3.0152690410614014
from threading import Thread from time import time, sleep class DowloadThread(Thread): def __init__(self, filename, num): super().__init__() self.filename = filename self.num = num def run(self): print(self.filename+'pid [%d]' % getpid()) sleep(self.num) print(self.filename+'finished') def main(): start = time() thread1 = DowloadThread('a', 3) thread1.start() thread2 = DowloadThread('b', 1) thread2.start() thread1.join() thread2.join() end = time() print('total:', end-start) if __name__=='__main__': main()
输出:
apid [11148] bpid [11148] bfinished afinished total: 3.0107333660125732
系统启动一个新线程的成本是比较高的,因为它涉及与操作系统的交互。在这种情形下,使用线程池可以很好地提升性能,尤其是当程序中需要创建大量生存期很短暂的线程时,更应该考虑使用线程池。
线程池在系统启动时即创建大量空闲的线程,程序只要将一个函数提交给线程池,线程池就会启动一个空闲的线程来执行它。当该函数执行结束后,该线程并不会死亡,而是再次返回到线程池中变成空闲状态,等待执行下一个函数。
此外,使用线程池可以有效地控制系统中并发线程的数量。当系统中包含有大量的并发线程时,会导致系统性能急剧下降,甚至导致 Python 解释器崩溃,而线程池的最大线程数参数可以控制系统中并发线程的数量不超过此数。
多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响,而多线程中,所有变量都由所有线程共享,所以,任何一个变量都可以被任何一个线程修改,因此,线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容给改乱了。此时,我们需要对它的访问加上保护,否则资源会处于“混乱”的状态。例:100个线程向同一个银行账户转账(转入1元钱)的场景:
from threading import Thread, Lock from time import time, sleep class Account(object): def __init__(self): self._balance = 0 # 余额 self.lock = Lock() # 创建互斥锁 def deposit(self, money): # 存款 v. self.lock.acquire() # 获取互斥锁 try: sleep(0.01) # 0.01, total: 1.59s; 0.1, total: 11.18s self._balance = self._balance + money finally: self.lock.release() # 释放锁; 不管发生了什么 @property def balance(self): return self._balance class AddMoneyThread(Thread): def __init__(self, account, money): super().__init__() self.account = account # Account 实例对象 self.money = money def run(self): self.account.deposit(self.money) def main(): start = time() account = Account() threads = [] for _ in range(100): t = AddMoneyThread(account, 1) threads.append(t) t.start() for t in threads: t.join() print('账户余额为: ¥%d元' % account.balance) # 100 end = time() print('total:', end-start) # 0.01*100, total: 1.59s; 0.1*100, total: 11.18s if __name__=='__main__': main()
获得锁的线程用完后一定要释放锁,否则那些苦苦等待锁的线程将永远等待下去,成为死线程。所以我们用try...finally来确保锁一定会被释放。
锁的好处就是确保了某段关键代码只能由一个线程从头到尾完整地执行,坏处当然也很多,首先是阻止了多线程并发执行,包含锁的某段代码实际上只能以单线程模式执行,效率就大大地下降了。其次,由于可以存在多个锁,不同的线程持有不同的锁,并试图获取对方持有的锁时,可能会造成死锁,导致多个线程全部挂起,既不能执行,也无法结束,只能靠操作系统强制终止。
当多线程操作同一公有资源时,如果涉及到修改该资源的操作,为了避免数据不同步可能导致的错误,需要使用互斥锁机制。
其实,除非必须将多线程使用的资源设置为公共资源(各线程之间没有如4中操作时),threading 模块还提供了一种避免数据不同步问题的方法,即 local() 函数。
from threading import Thread, local, current_thread from time import time, sleep local_school = local() # 1.创建全局ThreadLocal对象(主线程中的全局变量); 由于其返回值位于全局作用域,无论在程序什么位置,都可以随时调用,很方便 def process_student(): std = local_school.student # 3.获取当前线程关联的student sleep(0.5) print('Hello, %s (in %s)' % (std, current_thread().name)) # threading.current_thread(): 当前线程 def process_thread(name): local_school.student = name # 2.绑定 ThreadLocal的student process_student() # 不需要传递 res了,可以通过全局变量local访问 start = time() t1 = Thread(target= process_thread, args=('Alice',), name='Thread-A') # name 参数:线程名 t2 = Thread(target= process_thread, args=('Bob',), name='Thread-B') t1.start() t2.start() t1.join() t2.join() end = time() print('total:', end-start)
输出:
Hello, Bob (in Thread-B) Hello, Alice (in Thread-A) total: 0.5076196193695068
首先,在全局作用域范围内,调用 local() 函数会生成一个 ThreadLocal
对象,local_school 变量即为公共资源,它可以在程序的任意线程中被调用。
随后,我们创建了 2 个子线程 t1 和 t2,它们都负责执行 process_thread() 函数,该函数中,我们在全局 local 变量的基础上,定义了一个 student 变量,该变量即为线程局部变量,即哪个线程调用该函数,都会将 student 变量拷贝一份并存储在自己的存储空间中。因此,t1 和 t2 线程会各自拥有一份 student 变量的副本.