Python教程

Python 并发

本文主要是介绍Python 并发,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

文章目录

    • 关于并发
      • Python 对并发编程的支持
      • CPU密集型计算、IO密集型计算
      • Python 中的 多线程、多进程、多协程的对比
        • 怎样根据任务选择对应技术?
    • GIL
      • 怎样规避GIL带来的限制?
      • 多线程 代码实现
      • 多组件的Pipeline技术架构
    • 多线程数据通信的queue.Queue
    • 线程安全
      • Lock 用于解决线程安全问题
    • 线程池
      • 原理
    • Web 中的并发
    • 多进程multiprocessing
    • 异步IO库:asyncio
    • subprocess
      • 实例
    • 信号量
    • 异步库
      • Asyncio
        • Gevent
      • 异步库 asyncio 对比 gevent


本文来自对 蚂蚁学Python:Python 并发编程实战 的学习
视频地址:https://www.bilibili.com/video/BV1bK411A7tV
视频配套代码:https://github.com/peiss/ant-learn-python-concurrent


关于并发


常见程序提速方法

在这里插入图片描述


并行、串行、同步、异步

并发:concurrency
并行:parallelism
同步:synchronous
异步:asynchronous

并发是个宽泛的概念,单纯代表计算机能够同时执行多项任务
计算机做到并发,可以使用不同的形式。
比如对于单核处理器,使用分配时间片的方式。这个过程也称为 进程/线程的上下文切换。
对于多核处理器,可以在不同的核心上真正并行的执行任务。这种称为 并行。


同步只能任务A执行完成后,才执行任务B
所以同步中没有并发或者并行的概念


异步 是不同任务同时执行,不会相互等待;
典型的实现异步的方式是多线程编程。

适合(单线程)异步编程的场景:IO 密集的应用;比如 网络请求、数据库操作

适合 多线程编程的场景:计算密集的应用,如视频图像处理、科学计算;让CPU 发挥最大的功效


Python 对并发编程的支持

多协程:Coroutine


多线程:threading
利用CPU和IO可以同时执行的原理,让CPU不会干巴巴等待IO完成


多进程:multiprocessing
利用多核CPU的能力,真正的并行执行任务


异步IO:asyncio
在单线程利用CPU和IO同时执行的原理,实现函数异步执行


CPU密集型计算、IO密集型计算

CPU密集型(CPU-bound):

CPU密集型也叫计算密集型,是指I/O在很短的时间就可以完成,CPU需要大量的计算和处理,特点是CPU占用率相当高

例如:压缩解压缩、加密解密、正则表达式搜索


IO密集型(I/O bound):

IO密集型指的是系统运作大部分的状况是CPU在等I/O (硬盘/内存) 的读/写操作,CPU占用率仍然较低。

例如:文件处理程序、网络爬虫程序、读写数据库程序


Python 中的 多线程、多进程、多协程的对比

一个进程中可以启动N个线程
一个线程中可以启动N个协程


多进程 Process (multiprocessing)

优点:可以利用多核CPU并行运算

缺点:占用资源最多、可启动数目比线程少

适用于:CPU密集型计算


多线程 Thread (threading)

优点:相比进程,更轻量级、占用资源少

缺点:

  • 相比进程:多线程只能并发执行,不能利用多CPU(GIL)
  • 相比协程:启动数目有限制,占用内存资源,有线程切换开销

适用于:IO密集型计算、同时运行的任务数目要求不多


多协程 Coroutine (asyncio)
优点:内存开销最少、启动协程数量最多
缺点:支持的库有限制(aiohttp vs requests)、代码实现复杂
适用于:IO密集型计算、需要超多任务运行、但有现成库支持的场景


怎样根据任务选择对应技术?

在这里插入图片描述


GIL

GIL 是什么?

全局解释器锁(英语:Global Interpreter Lock,缩写GIL)

是计算机程序设计语言解释器用于同步线程的一种机制,它使得任何时刻仅有一个线程在执行。

即便在多核心处理器上,使用 GIL 的解释器也只允许同一时间执行一个线程。

在这里插入图片描述


由于GIL的存在

即使电脑有多核CPU,单个时刻也只能使用1个,相比并发加速的C++/JAVA所以慢


简而言之:Python设计初期,为了规避并发问题引入了GIL,现在想去除却去不掉了!
一开始是为了解决多线程之间数据完整性和状态同步问题,好处是 简化了Python对共享资源的管理。

原因:
Python中对象的管理,是使用引用计数器进行的,引用数为0则释放对象
开始:线程A和线程B都引用了对象obj,obj.ref_num = 2,线程A和B都想撤销对obj的引用

在这里插入图片描述


怎样规避GIL带来的限制?

1、多线程 threading 机制依然是有用的,用于IO密集型计算

因为在 I/O (read,write,send,recv,etc.)期间,线程会释放GIL,实现CPU和IO的并行
因此多线程用于IO密集型计算依然可以大幅提升速度

但是多线程用于CPU密集型计算时,只会更加拖慢速度


2、使用multiprocessing 的多进程机制实现并行计算、利用多核CPU优势

为了应对GIL的问题,Python提供了multiprocessing


多线程 代码实现

# 1、准备一个函数
def my_func(a, b):
   do_craw(a,b)

# 2、怎样创建一个线程
import threading
t = threading.Thread(target=my_func, args=(100, 200)

# 3、启动线程
t.start()

# 4、等待结束
t.join()

多组件的Pipeline技术架构

复杂的事情一般都不会一下子做完,而是会分很多中间步骤一步步完成
在这里插入图片描述


多线程数据通信的queue.Queue

queue.Queue可以用于多线程之间的、线程安全的数据通信

# 1、导入类库
import queue

# 2、创建Queue
q = queue.Queue()

# 3、添加元素
q.put(item)

# 4、获取元素
item = q.get()

# 5、查询状态

# 查看元素的多少
q.qsize()
# 判断是否为空
q.empty()
# 判断是否已满
q.full()


线程安全

线程安全指某个函数、函数库在多线程环境中被调用时,能够正确地处理多个线程之间的共享变量,使程序功能正确完成。
由于线程的执行随时会发生切换,就造成了不可预料的结果,出现线程不安全

def draw(account, amount):
    if account.balance >= amount:
        account.balance -= amount

在这里插入图片描述


Lock 用于解决线程安全问题

用法1:try-finally模式

import threading

lock = threading.Lock()

lock.acquire()
try:
    # do something
finally:
    lock.release()

用法2:用法2:with 模式

import threading

lock = threading.Lock()

with lock:
    # do something

线程池

原理

新建线程系统需要分配资源、终止线程系统需要回收资源
如果可以重用线程,则可以减去新建/终止的开销

在这里插入图片描述


在这里插入图片描述


使用线程池的好处:

1、提升性能:因为减去了大量新建、终止线程的开销,重用了线程资源;

2、适用场景:适合处理突发性大量请求或需要大量线程完成任务、但实际任务处理时间较短

3、防御功能:能有效避免系统因为创建线程过多,而导致系统负荷过大相应变慢等问题

4、代码优势:使用线程池的语法比自己新建线程执行线程更加简洁


3、ThreadPoolExecutor的使用语法

from concurrent.futures import ThreadPoolExecutor, as_completed


# 用法1:map函数,很简单
# 注意map的结果和入参是顺序对应的
with ThreadPoolExecutor() as pool:

    results = pool.map(craw, urls)

    for result in results:
        print(result)


# 用法2:future模式,更强大
# 注意如果用as_completed顺序是不定的
with ThreadPoolExecutor() as pool:
    
   futures = [ pool.submit(craw, url)
                     for url in urls ]

    for future in futures:
        print(future.result())
    for future in as_completed(futures):
        print(future.result())

Web 中的并发

1、Web服务的架构以及特点

在这里插入图片描述


Web后台服务的特点:

1、Web服务对响应时间要求非常高,比如要求200MS返回
2、Web服务有大量的依赖IO操作的调用,比如磁盘文件、数据库、远程API
3、Web服务经常需要处理几万人、几百万人的同时请求


使用线程池ThreadPoolExecutor的好处:
1、方便的将磁盘文件、数据库、远程API的IO调用并发执行
2、线程池的线程数目不会无限创建(导致系统挂掉),具有防御功能


多进程multiprocessing


1、有了多线程threading,为什么还要用多进程multiprocessing
如果遇到了CPU密集型计算,多线程反而会降低执行速度

虽然有全局解释器锁GIL
但是因为有IO的存在
多线程依然可以加速运行
在这里插入图片描述


CPU密集型计算
线程的自动切换反而变成了负担
多线程甚至减慢了运行速度
在这里插入图片描述


2、多进程multiprocessing知识梳理

语法条目多线程多进程
引入模块from threading import Threadfrom multiprocessing import Process
新建 启动 等待结束t=Thread(target=func, args=(100, ))
t.start()
t.join()
p = Process(target=f, args=(‘bob’,))
p.start()
p.join()
数据通信import queue
q = queue.Queue()
q.put(item)
item = q.get()
from multiprocessing import Queue
q = Queue()
q.put([42, None, ‘hello’])
item = q.get()
线程安全加锁from threading import Lock lock = Lock()
with lock:
# do something
from multiprocessing
import Lock lock = Lock()
with lock:
# do something
池化技术from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor() as executor:
# 方法1
results = executor.map(func, [1,2,3])
# 方法2
future = executor.submit(func, 1)
result = future.result()
from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor() as executor:
# 方法1
results = executor.map(func, [1,2,3])
# 方法2
future = executor.submit(func, 1)
result = future.result()

3、代码实战:单线程、多线程、多进程对比CPU密集计算速度

CPU密集型计算:100次“判断大数字是否是素数”的计算

在这里插入图片描述
由于GIL的存在,多线程比单线程计算的还慢,而多进程可以明显加快执行速度


单线程爬虫的执行路径

在这里插入图片描述


协程:在单线程内实现并发

核心原理:用一个超级循环(其实就是while true)循环
核心原理:配合IO多路复用原理(IO时CPU可以干其他事情)

在这里插入图片描述


异步IO库:asyncio

import asyncio

# 获取事件循环
loop = asyncio.get_event_loop()

# 定义协程
async def myfunc(url):
    await get_url(url)

# 创建task列表
tasks = [loop.create_task(myfunc(url)) for url in urls]

# 执行爬虫事件列表
loop.run_until_complete(asyncio.wait(tasks))

注意:

要用在异步IO编程中
依赖的库必须支持异步IO特性

爬虫引用中:
requests 不支持异步
需要用 aiohttp


subprocess

subprocess 模块:
允许你生成新的进程
连接它们的输入、输出、错误管道
并且获取它们的返回码


几个应用场景:
每天定时8:00自动打开酷狗音乐播放歌曲
调用7z.exe自动解压缩.7z文件
通过Python远程提交一个torrent种子文件,用电脑启动下载


实例

# 用默认的应用程序打开歌曲文件
# 注:windows下是start、mac下是open、Linux是see
# windows 环境需要加 shell = True
proc = subprocess.Popen(['start', '余生一个浪.mp3'], shell=True)
proc.communicate()


# 用7z.exe解压7z压缩文件
proc = subprocess.Popen([r"C:\Program Files\7-Zip\7z.exe",
                         "x", "./datas/7z_test.7z", "-o./datas/extract_7z_test", "-aoa"], shell=True)
proc.communicate()

信号量

英语:Semaphore

信号量(英语:Semaphore)又称为信号量、旗语
是一个同步对象,用于保持在0至指定最大值之间的一个计数值。
当线程完成一次对该semaphore对象的等待(wait)时,该计数值减一;
当线程完成一次对semaphore对象的释放(release)时,计数值加一。
当计数值为0,则线程等待该semaphore对象不再能成功直至该semaphore对象变成signaled状态
semaphore对象的计数值大于0,为signaled状态;计数值等于0,为nonsignaled状态.


使用方式1:

sem = asyncio.Semaphore(10)

# ... later
async with sem:
    # work with shared resource	

使用方式2:

sem = asyncio.Semaphore(10)

# ... later
await sem.acquire()
try:
    # work with shared resource
finally:
    sem.release()

异步库

Asyncio

import asyncio

# 获取事件循环
loop = asyncio.get_event_loop()

# 定义协程
async def myfunc(url):
    await get_url(url)

# 创建task列表
tasks = [loop.create_task(myfunc(url)) for url in urls]

# 执行爬虫事件列表
loop.run_until_complete(asyncio.wait(tasks))

注意:

asyncio 很多库都不支持

比如不支持requests
需要用aiohttp


Gevent

安装:pip install gevent
Gevent是一个基于微线程库Greenlet的并发框架

原理:
提供猴子补丁MonkeyPatch方法,通过该方法gevent能够 修改标准库里面大部分的阻塞式系统调用,包括socket、ssl、threading和 select等模块,而变为协作式运行

import gevent.monkey

gevent.monkey.patch_all()

import gevent
import urllib2
import simplejson as json

def fetch(pid):
  response = urllib2.urlopen('http://json-time.appspot.com/time.json')
  result = response.read()
  json_result = json.loads(result)
  datetime = json_result['datetime']

  print('Process %s: %s' % (pid, datetime))
  return json_result['datetime']

def asynchronous():
  threads = []
  for i in range(1, 10):
    threads.append(gevent.spawn(fetch, i))
  gevent.joinall(threads)

asynchronous()

异步库 asyncio 对比 gevent

Gevent:
优点:只需要monkey.patch_all(),就能自动修改阻塞为非阻塞,简单强大
缺点:不知道它具体patch了哪些库修改了哪些模块、类、函数。 创造了“隐式的副作用”,如果出现问题很多时候极难调试

Asyncio:
优点:明确使用asyncio、await等关键字编程,直观易读
缺点:只支持很少的异步库,比如aiohttp


2022-02-13(日)

这篇关于Python 并发的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!