昨日的主要学习内容是线程。并且线程的很多操作与进程类似。这其实是作者有意为之。目的就是为了我们方便学习。在昨日的学习中,我们体验了很多线程中的内置方法。最后还学习了GIL全局解释器锁。虽然GIL是Cpython解释器的特点,不是python语言的特点。但是Cpython解释器是最常用的,不刻意指定几乎默认都是它。所以我们需要了解。而今天就对并发编程的学习进行收尾了。
1.线程互斥锁是Python代码层面的锁,解决Python程序中多线程共享资源的问题(线程数据共共享,当各个线程访问数据资源时会出现竞争状态,造成数据混乱)。
GIL是Python解释层面的锁,解决解释器中多个线程的竞争资源问题(多个子线程在系统资源竞争是,都在等待对象某个部分资源解除占用状态,结果谁也不愿意先解锁,然后互相等着,程序无法执行下去)。
通过代码验证:
# 先验证GIL的存在 from threading import Thread, Lock import time money = 100 def task(): global money money -= 1 for i in range(100): # 创建一百个线程 t = Thread(target=task) t.start() print(money) # 0 # 验证不同数据加不同锁 from threading import Thread, Lock import time money = 100 mutex = Lock() def task(): global money mutex.acquire() tmp = money time.sleep(0.1) money = tmp - 1 mutex.release() t_list = [] for i in range(100): # 创建一百个线程 t = Thread(target=task) t.start() t_list.append(t) for t in t_list: t.join() # 为了确保结构正确 应该等待所有的线程运行完毕再打印money print(money) # 0
线程在程序中是独立的、并发的执行流。与分隔的进程相比,进程中线程之间的隔离程度要小,它们共享内存、文件句柄和其他进程应有的状态。
因为线程的划分尺度小于进程,使得多线程程序的并发性高。进程在执行过程中拥有独立的内存单元,而多个线程共享内存,从而极大地提高了程序的运行效率。
线程比进程具有更高的性能,这是由于同一个进程中的线程都有共性多个线程共享同一个进程的虚拟空间。线程共享的环境包括进程代码段、进程的公有数据等,利用这些共享的数据,线程之间很容易实现通信。
操作系统在创建进程时,必须为该进程分配独立的内存空间,并分配大量的相关资源,但创建线程则简单得多。因此,使用多线程来实现并发比使用多进程的性能要高得多。
在不同CPU数量下有以下体现:
# 单个CPU 多个IO密集型任务 多进程:浪费资源 无法利用多个CPU 多线程:节省资源 切换+保存状态 多个计算密集型任务 多进程:耗时更长 创建进程的消耗+切换消耗 多线程:耗时较短 切换消耗 # 多个CPU 多个IO密集型任务 多进程:浪费资源 多个CPU无用武之地 多线程:节省资源 切换+保存状态 多个计算密集型任务 多进程:利用多核 速度更快 多线程:速度较慢
总的来说,多进程和多线程都有具体的应用场景 尤其是多线程并不是没有用。
代码实例:
# 计算机密集型 from threading import Thread from multiprocessing import Process import os import time def work(): res = 1 for i in range(1, 100000): res *= i if __name__ == '__main__': print(os.cpu_count()) # 8 查看当前计算机CPU个数 start_time = time.time() p_list = [] for i in range(12): p = Process(target=work) p.start() p_list.append(p) for p in p_list: p.join() t_list = [] for i in range(12): t = Thread(target=work) t.start() t_list.append(t) for t in t_list: t.join() print('总耗时:%s' % (time.time() - start_time)) # 打印结果 ''' # 多进程 总耗时:8.479638814926147 # 多线程 总耗时:27.77951455116272 ''' # IO密集型 from threading import Thread from multiprocessing import Process import os import time def work(): time.sleep(2) # 模拟纯IO操作 if __name__ == '__main__': start_time = time.time() t_list = [] for i in range(100): t = Thread(target=work) t.start() for t in t_list: t.join() p_list = [] for i in range(100): p = Process(target=work) p.start() for p in p_list: p.join() print('总耗时:%s' % (time.time() - start_time)) # 打印结果 ''' # 多进程 总耗时:0.45645594596862793 # 多线程 总耗时:0.011969804763793945 '''
通过总耗时时间就能明白,在计算机密集型时,多进程更好。在IO密集型时,多线程更好。
死锁产生的原因:比如有AB(越多的锁越容易死锁,这里直说两个锁的情况)两个锁。当A锁内部需要使用B锁,同时B锁内部需要使用A锁的时候,就要可能会出现死锁。下面模拟一下死锁现象:
from threading import Thread, Lock import time # 产生AB两把锁 mutexA = Lock() mutexB = Lock() class MyThread(Thread): def run(self): self.f1() self.f2() def f1(self): mutexA.acquire() print(f'{self.name}抢到了A锁') mutexB.acquire() print(f'{self.name}抢到了B锁') mutexB.release() mutexA.release() def f2(self): mutexB.acquire() print(f'{self.name}抢到了B锁') time.sleep(2) mutexA.acquire() print(f'{self.name}抢到了A锁') mutexA.release() mutexB.release() for i in range(20): t = MyThread() t.start() # 打印结果 ''' Thread-1抢到了A锁 Thread-1抢到了B锁 Thread-1抢到了B锁 Thread-2抢到了A锁 # 死锁了,程序无法自己结束。就运行到这儿就没了 '''
信号量在不同的知识体系中,展示出来的功能是不一样的。在并发编程中信号量意思是多把互斥锁。在django框架中信号量意思是达到某个条件自动触发特定功能。
信号量与互斥锁的关系:
互斥锁:将自定义互斥锁比喻成是单个厕所(一个坑位) 信号量:信号量相当于是公共厕所(多个坑位)
实例:
from threading import Thread, Semaphore import time import random sp = Semaphore(5) # 创建一个有五个坑位(带门的)的公共厕所 def task(name): sp.acquire() # 抢锁 print('%s正在蹲坑' % name) time.sleep(random.randint(1, 5)) sp.release() # 放锁 for i in range(1, 6): t = Thread(target=task, args=('伞兵%s号' % i, )) t.start() # 打印结果 ''' 伞兵1号正在蹲坑 伞兵2号正在蹲坑 伞兵3号正在蹲坑 伞兵4号正在蹲坑 伞兵5号正在蹲坑 '''
子线程的运行可以由其他子线程决定。代码实例:
from threading import Thread, Event import time event = Event() # 类似于造了一个红绿灯 def light(): print('红灯亮着的 所有人都不能动') time.sleep(3) print('绿灯亮了 油门踩到底 给我冲!!!') event.set() def car(name): print('%s正在等红灯' % name) event.wait() print('%s加油门 飙车了' % name) t = Thread(target=light) t.start() for i in range(5): t = Thread(target=car, args=('熊猫PRO%s' % i,)) t.start() # 打印结果 ''' 红灯亮着的 所有人都不能动 熊猫PRO0正在等红灯 熊猫PRO1正在等红灯 熊猫PRO2正在等红灯 熊猫PRO3正在等红灯 熊猫PRO4正在等红灯 绿灯亮了 油门踩到底 给我冲!!! 熊猫PRO4加油门 飙车了 熊猫PRO1加油门 飙车了 熊猫PRO2加油门 飙车了 熊猫PRO0加油门 飙车了 熊猫PRO3加油门 飙车了 Process finished with exit code 0 '''
这种效果其实通过队列也能实现。但是没有event简单。
一个线程的运行时间可以分为3部分:线程的启动时间、线程体的运行时间和线程的销毁时间。在多线程处理的情景中,如果线程不能被重用,就意味着每次创建都需要经过启动、销毁和运行3个过程。这必然会增加系统相应的时间,降低了效率。
面对这种情况,线程池就派上用场了。其实线程池就是生产者消费者模式的最佳实践,当线程池初始化时,会自动创建指定数量的线程,有任务到达时直接从线程池中取一个空闲线程来用即可,当任务执行结束时线程不会消亡而是直接进入空闲状态,继续等待下一个任务。而随着任务的增加线程池中的可用线程必将逐渐减少,当减少至零时,任务就需要等待了。
代码实例:
from concurrent.futures import ThreadPoolExecutor import time # 参数times用来模拟网络请求的时间 def get_html(times): time.sleep(times) print("get page {}s finished".format(times)) return times executor = ThreadPoolExecutor(max_workers=2) # 通过submit函数提交执行的函数到线程池中,submit函数立即返回,不阻塞 task1 = executor.submit(get_html, (3)) task2 = executor.submit(get_html, (2)) # done方法用于判定某个任务是否完成 print(task1.done()) # cancel方法用于取消某个任务,该任务没有放入线程池中才能取消成功 print(task2.cancel()) time.sleep(4) print(task1.done()) # result方法可以获取task的执行结果 print(task1.result()) # 执行结果 ''' False False get page 2s finished get page 3s finished True 3 Process finished with exit code 0 '''
简单描述上述代码:
1. ThreadPoolExecutor构造实例的时候,传入max_workers参数来设置线程池中最多能同时运行的线程数目。 2. 使用submit函数来提交线程需要执行的任务(函数名和参数)到线程池中,并返回该任务的句柄(类似于文件、画图),注意submit()不是阻塞的,而是立即返回。 3. 通过submit函数返回的任务句柄,能够使用done()方法判断该任务是否结束。上面的例子可以看出,由于任务有2s的延时,在task1提交后立刻判断,task1还未完成,而在延时4s之后判断,task1就完成了。 4. 使用cancel()方法可以取消提交的任务,如果任务已经在线程池中运行了,就取消不了。这个例子中,线程池的大小设置为2,任务已经在运行了,所以取消失败。如果改变线程池的大小为1,那么先提交的是task1,task2还在排队等候,这是时候就可以成功取消。 5. 使用result()方法可以获取任务的返回值。查看内部代码,发现这个方法是阻塞的。
当需要创建的子进程数量不多时,可以直接利用multiprocessing中的Process动态成生多个进程,但如果是上百甚至上千个目标,手动的去创建进程的工作量巨大,此时就可以用到multiprocessing模块提供的Pool方法。
初始化Pool时,可以指定一个最大进程数,当有新的请求提交到Pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来执行。
代码实例:
from multiprocessing import freeze_support,Pool import time def Foo(i): time.sleep(2) print('___time---',time.ctime()) return i+100 def Bar(arg): print('----exec done:',arg,time.ctime()) if __name__ == '__main__': freeze_support() pool = Pool(3) #线程池中的同时执行的进程数为3 for i in range(4): pool.apply_async(func=Foo,args=(i,),callback=Bar) #线程池中的同时执行的进程数为3,当一个进程执行完毕后,如果还有新进程等待执行,则会将其添加进去 # pool.apply(func=Foo,args=(i,)) print('end') pool.close() pool.join()#调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束 # 打印结果 ''' end ___time--- Thu Apr 21 20:06:40 2022 ___time--- Thu Apr 21 20:06:40 2022 ___time--- Thu Apr 21 20:06:40 2022 ----exec done: 102 Thu Apr 21 20:06:40 2022 ----exec done: 100 Thu Apr 21 20:06:40 2022 ----exec done: 101 Thu Apr 21 20:06:40 2022 ___time--- Thu Apr 21 20:06:42 2022 ----exec done: 103 Thu Apr 21 20:06:42 2022 Process finished with exit code 0 '''
进程池和线程池其实降低了程序的运行效率,但是保证了硬件的安全。
协程 ,又称为微线程,它是实现多任务的另一种方式,只不过是比线程更小的执行单元。因为它自带CPU的上下文,这样只要在合适的时机,我们可以把一个协程切换到另一个协程。
通俗的理解:在一个线程中的某个函数中,我们可以在任何地方保存当前函数的一些临时变量等信息,然后切换到另外一个函数中执行,注意不是通过调用函数的方式做到的 ,并且切换的次数以及什么时候再切换到原来的函数都由开发者自己确定。
协程与线程的差异:
在实现多任务时, 线程切换__从系统层面__远不止保存和恢复CPU上下文这么简单。操作系统为了程序运行的高效性,每个线程都有自己缓存Cache等等数据,操作系统还会帮你做这些数据的恢复操作,所以线程的切换非常耗性能。但是__协程的切换只是单纯地操作CPU的上下文__,所以一秒钟切换个上百万次系统都抗的住。
基本使用:
# 在python中,yield(生成器)可以很容易的实现上述的功能,从一个函数切换到另外一个函数。但是无法做到检测IO切换 from gevent import monkey;monkey.patch_all() # 固定编写 用于检测所有的IO操作 from gevent import spawn import time def play(name): print('%s play 1' % name) time.sleep(5) print('%s play 2' % name) def eat(name): print('%s eat 1' % name) time.sleep(3) print('%s eat 2' % name) start_time = time.time() g1 = spawn(play, 'jason') g2 = spawn(eat, 'jason') g1.join() # 等待检测任务执行完毕 g2.join() # 等待检测任务执行完毕 print('总耗时:', time.time() - start_time) # 打印结果 ''' jason play 1 jason eat 1 jason eat 2 jason play 2 总耗时: 5.016383409500122 Process finished with exit code 0 '''
from gevent import monkey;monkey.patch_all() from gevent import spawn import socket def communication(sock): while True: data = sock.recv(1024) # IO操作 print(data.decode('utf8')) sock.send(data.upper()) def get_server(): server = socket.socket() server.bind(('127.0.0.1', 8080)) server.listen(5) while True: sock, addr = server.accept() # IO操作 spawn(communication, sock) g1 = spawn(get_server) g1.join()
python可以通过开设多进程,在多进程下开设多线程,在多线程使用协程。从而让程序执行的效率达到极致。但是实际业务中很少需要如此之高的效率(一直占着CPU不放),因为大部分程序都是IO密集型的。