本文来自对 蚂蚁学Python:Python 并发编程实战 的学习
视频地址:https://www.bilibili.com/video/BV1bK411A7tV
视频配套代码:https://github.com/peiss/ant-learn-python-concurrent
常见程序提速方法
并行、串行、同步、异步
并发:concurrency
并行:parallelism
同步:synchronous
异步:asynchronous
并发是个宽泛的概念,单纯代表计算机能够同时执行多项任务
计算机做到并发,可以使用不同的形式。
比如对于单核处理器,使用分配时间片的方式。这个过程也称为 进程/线程的上下文切换。
对于多核处理器,可以在不同的核心上真正并行的执行任务。这种称为 并行。
同步只能任务A执行完成后,才执行任务B
所以同步中没有并发或者并行的概念
异步 是不同任务同时执行,不会相互等待;
典型的实现异步的方式是多线程编程。
适合(单线程)异步编程的场景:IO 密集的应用;比如 网络请求、数据库操作
适合 多线程编程的场景:计算密集的应用,如视频图像处理、科学计算;让CPU 发挥最大的功效
多协程:Coroutine
多线程:threading
利用CPU和IO可以同时执行的原理,让CPU不会干巴巴等待IO完成
多进程:multiprocessing
利用多核CPU的能力,真正的并行执行任务
异步IO:asyncio
在单线程利用CPU和IO同时执行的原理,实现函数异步执行
CPU密集型(CPU-bound):
CPU密集型也叫计算密集型,是指I/O在很短的时间就可以完成,CPU需要大量的计算和处理,特点是CPU占用率相当高
例如:压缩解压缩、加密解密、正则表达式搜索
IO密集型(I/O bound):
IO密集型指的是系统运作大部分的状况是CPU在等I/O (硬盘/内存) 的读/写操作,CPU占用率仍然较低。
例如:文件处理程序、网络爬虫程序、读写数据库程序
一个进程中可以启动N个线程
一个线程中可以启动N个协程
多进程 Process (multiprocessing)
优点:可以利用多核CPU并行运算
缺点:占用资源最多、可启动数目比线程少
适用于:CPU密集型计算
多线程 Thread (threading)
优点:相比进程,更轻量级、占用资源少
缺点:
适用于:IO密集型计算、同时运行的任务数目要求不多
多协程 Coroutine (asyncio)
优点:内存开销最少、启动协程数量最多
缺点:支持的库有限制(aiohttp vs requests)、代码实现复杂
适用于:IO密集型计算、需要超多任务运行、但有现成库支持的场景
GIL 是什么?
全局解释器锁(英语:Global Interpreter Lock,缩写GIL)
是计算机程序设计语言解释器用于同步线程的一种机制,它使得任何时刻仅有一个线程在执行。
即便在多核心处理器上,使用 GIL 的解释器也只允许同一时间执行一个线程。
由于GIL的存在
即使电脑有多核CPU,单个时刻也只能使用1个,相比并发加速的C++/JAVA所以慢
简而言之:Python设计初期,为了规避并发问题引入了GIL,现在想去除却去不掉了!
一开始是为了解决多线程之间数据完整性和状态同步问题,好处是 简化了Python对共享资源的管理。
原因:
Python中对象的管理,是使用引用计数器进行的,引用数为0则释放对象
开始:线程A和线程B都引用了对象obj,obj.ref_num = 2,线程A和B都想撤销对obj的引用
1、多线程 threading 机制依然是有用的,用于IO密集型计算
因为在 I/O (read,write,send,recv,etc.)期间,线程会释放GIL,实现CPU和IO的并行
因此多线程用于IO密集型计算依然可以大幅提升速度
但是多线程用于CPU密集型计算时,只会更加拖慢速度
2、使用multiprocessing 的多进程机制实现并行计算、利用多核CPU优势
为了应对GIL的问题,Python提供了multiprocessing
# 1、准备一个函数 def my_func(a, b): do_craw(a,b) # 2、怎样创建一个线程 import threading t = threading.Thread(target=my_func, args=(100, 200) # 3、启动线程 t.start() # 4、等待结束 t.join()
复杂的事情一般都不会一下子做完,而是会分很多中间步骤一步步完成
queue.Queue可以用于多线程之间的、线程安全的数据通信
# 1、导入类库 import queue # 2、创建Queue q = queue.Queue() # 3、添加元素 q.put(item) # 4、获取元素 item = q.get() # 5、查询状态 # 查看元素的多少 q.qsize() # 判断是否为空 q.empty() # 判断是否已满 q.full()
线程安全指某个函数、函数库在多线程环境中被调用时,能够正确地处理多个线程之间的共享变量,使程序功能正确完成。
由于线程的执行随时会发生切换,就造成了不可预料的结果,出现线程不安全
def draw(account, amount): if account.balance >= amount: account.balance -= amount
用法1:try-finally模式
import threading lock = threading.Lock() lock.acquire() try: # do something finally: lock.release()
用法2:用法2:with 模式
import threading lock = threading.Lock() with lock: # do something
新建线程系统需要分配资源、终止线程系统需要回收资源
如果可以重用线程,则可以减去新建/终止的开销
使用线程池的好处:
1、提升性能:因为减去了大量新建、终止线程的开销,重用了线程资源;
2、适用场景:适合处理突发性大量请求或需要大量线程完成任务、但实际任务处理时间较短
3、防御功能:能有效避免系统因为创建线程过多,而导致系统负荷过大相应变慢等问题
4、代码优势:使用线程池的语法比自己新建线程执行线程更加简洁
3、ThreadPoolExecutor的使用语法
from concurrent.futures import ThreadPoolExecutor, as_completed # 用法1:map函数,很简单 # 注意map的结果和入参是顺序对应的 with ThreadPoolExecutor() as pool: results = pool.map(craw, urls) for result in results: print(result) # 用法2:future模式,更强大 # 注意如果用as_completed顺序是不定的 with ThreadPoolExecutor() as pool: futures = [ pool.submit(craw, url) for url in urls ] for future in futures: print(future.result()) for future in as_completed(futures): print(future.result())
1、Web服务的架构以及特点
Web后台服务的特点:
1、Web服务对响应时间要求非常高,比如要求200MS返回
2、Web服务有大量的依赖IO操作的调用,比如磁盘文件、数据库、远程API
3、Web服务经常需要处理几万人、几百万人的同时请求
使用线程池ThreadPoolExecutor的好处:
1、方便的将磁盘文件、数据库、远程API的IO调用并发执行
2、线程池的线程数目不会无限创建(导致系统挂掉),具有防御功能
1、有了多线程threading,为什么还要用多进程multiprocessing
如果遇到了CPU密集型计算,多线程反而会降低执行速度
虽然有全局解释器锁GIL
但是因为有IO的存在
多线程依然可以加速运行
CPU密集型计算
线程的自动切换反而变成了负担
多线程甚至减慢了运行速度
2、多进程multiprocessing知识梳理
语法条目 | 多线程 | 多进程 |
---|---|---|
引入模块 | from threading import Thread | from multiprocessing import Process |
新建 启动 等待结束 | t=Thread(target=func, args=(100, )) t.start() t.join() | p = Process(target=f, args=(‘bob’,)) p.start() p.join() |
数据通信 | import queue q = queue.Queue() q.put(item) item = q.get() | from multiprocessing import Queue q = Queue() q.put([42, None, ‘hello’]) item = q.get() |
线程安全加锁 | from threading import Lock lock = Lock() with lock: # do something | from multiprocessing import Lock lock = Lock() with lock: # do something |
池化技术 | from concurrent.futures import ThreadPoolExecutor with ThreadPoolExecutor() as executor: # 方法1 results = executor.map(func, [1,2,3]) # 方法2 future = executor.submit(func, 1) result = future.result() | from concurrent.futures import ProcessPoolExecutor with ProcessPoolExecutor() as executor: # 方法1 results = executor.map(func, [1,2,3]) # 方法2 future = executor.submit(func, 1) result = future.result() |
3、代码实战:单线程、多线程、多进程对比CPU密集计算速度
CPU密集型计算:100次“判断大数字是否是素数”的计算
由于GIL的存在,多线程比单线程计算的还慢,而多进程可以明显加快执行速度
单线程爬虫的执行路径
协程:在单线程内实现并发
核心原理:用一个超级循环(其实就是while true)循环
核心原理:配合IO多路复用原理(IO时CPU可以干其他事情)
import asyncio # 获取事件循环 loop = asyncio.get_event_loop() # 定义协程 async def myfunc(url): await get_url(url) # 创建task列表 tasks = [loop.create_task(myfunc(url)) for url in urls] # 执行爬虫事件列表 loop.run_until_complete(asyncio.wait(tasks))
注意:
要用在异步IO编程中
依赖的库必须支持异步IO特性
爬虫引用中:
requests 不支持异步
需要用 aiohttp
subprocess 模块:
允许你生成新的进程
连接它们的输入、输出、错误管道
并且获取它们的返回码
几个应用场景:
每天定时8:00自动打开酷狗音乐播放歌曲
调用7z.exe自动解压缩.7z文件
通过Python远程提交一个torrent种子文件,用电脑启动下载
# 用默认的应用程序打开歌曲文件 # 注:windows下是start、mac下是open、Linux是see # windows 环境需要加 shell = True proc = subprocess.Popen(['start', '余生一个浪.mp3'], shell=True) proc.communicate() # 用7z.exe解压7z压缩文件 proc = subprocess.Popen([r"C:\Program Files\7-Zip\7z.exe", "x", "./datas/7z_test.7z", "-o./datas/extract_7z_test", "-aoa"], shell=True) proc.communicate()
英语:Semaphore
信号量(英语:Semaphore)又称为信号量、旗语
是一个同步对象,用于保持在0至指定最大值之间的一个计数值。
当线程完成一次对该semaphore对象的等待(wait)时,该计数值减一;
当线程完成一次对semaphore对象的释放(release)时,计数值加一。
当计数值为0,则线程等待该semaphore对象不再能成功直至该semaphore对象变成signaled状态
semaphore对象的计数值大于0,为signaled状态;计数值等于0,为nonsignaled状态.
使用方式1:
sem = asyncio.Semaphore(10) # ... later async with sem: # work with shared resource
使用方式2:
sem = asyncio.Semaphore(10) # ... later await sem.acquire() try: # work with shared resource finally: sem.release()
import asyncio # 获取事件循环 loop = asyncio.get_event_loop() # 定义协程 async def myfunc(url): await get_url(url) # 创建task列表 tasks = [loop.create_task(myfunc(url)) for url in urls] # 执行爬虫事件列表 loop.run_until_complete(asyncio.wait(tasks))
注意:
asyncio 很多库都不支持
比如不支持requests
需要用aiohttp
安装:pip install gevent
Gevent是一个基于微线程库Greenlet的并发框架
原理:
提供猴子补丁MonkeyPatch方法,通过该方法gevent能够 修改标准库里面大部分的阻塞式系统调用,包括socket、ssl、threading和 select等模块,而变为协作式运行
import gevent.monkey gevent.monkey.patch_all() import gevent import urllib2 import simplejson as json def fetch(pid): response = urllib2.urlopen('http://json-time.appspot.com/time.json') result = response.read() json_result = json.loads(result) datetime = json_result['datetime'] print('Process %s: %s' % (pid, datetime)) return json_result['datetime'] def asynchronous(): threads = [] for i in range(1, 10): threads.append(gevent.spawn(fetch, i)) gevent.joinall(threads) asynchronous()
Gevent:
优点:只需要monkey.patch_all(),就能自动修改阻塞为非阻塞,简单强大
缺点:不知道它具体patch了哪些库修改了哪些模块、类、函数。 创造了“隐式的副作用”,如果出现问题很多时候极难调试
Asyncio:
优点:明确使用asyncio、await等关键字编程,直观易读
缺点:只支持很少的异步库,比如aiohttp
2022-02-13(日)