[并发编程 - 多线程:信号量、死锁与递归锁、时间Event、定时器Timer、线程队列、GIL锁]
信号量
信号量Semaphore:管理一个内置的计数器
每当调用acquire()时内置计数器-1;
调用release() 时内置计数器+1;
计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。
实例:(同时只有5个线程可以获得semaphore,即可以限制最大连接数为5):
from threading import Thread,Semaphore,current_thread
import time
import random
def func():
sm.acquire() # 运行func都会来抢这把锁
print('%s is wcing' %current_thread().getName())
time.sleep(random.randint(1,5)) # 模拟上厕所的时间
sm.release()
if name == 'main':
sm = Semaphore(5) # 创建信号量,自定义为5,相当于5把钥匙得到信号量对象
for i in range(23): # for循环了23次,开了23个线程
t = Thread(target=func)
t.start()
"""
ps:互斥锁只能acquire一次,再有人来执行acquire,如果没有释放,下一个来拿的人就只能阻在原地无法拿到acquire。而信号量一把锁可以acquire指定5次(Semaphore(5)),如果第6个来在
acquire的时候就没有了,相当于没有钥匙了,就只能在原地等着,只要5个人里面有人释放后面的人就
可以拿到钥匙
"""
与进程池是完全不同的概念,进程池Pool(4),最大只能产生4个进程,而且从头到尾都只是这四个进程,不会产生新的,而信号量是产生一堆线程/进程
死锁与递归锁
所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁
from threading import Thread,Lock
import time
mutexA = Lock()
mutexB = Lock()
class Mythread(Thread):
def init(self,name):
super().init()
self.name = name
def run(self) -> None: self.f1() self.f2() def f1(self): mutexA.acquire() print("%s 抢到了A锁" %self.name) mutexB.acquire() print("%s 抢到了B锁" %self.name) mutexB.release() mutexA.release() def f2(self): mutexB.acquire() print("%s 抢到了B锁" % self.name) time.sleep(0.1) mutexA.acquire() print("%s 抢到了A锁" % self.name) mutexA.release() mutexB.release()
if name == 'main':
t1 = Mythread("线程1")
t2 = Mythread("线程2")
t3 = Mythread("线程3")
t4 = Mythread("线程4")
t1.start() t2.start() t3.start() t4.start() print("主线程")
解决方法,递归锁,在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。
这个RLock内部维护着一个Lock和一个计数(counter)变量,计数记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁:
from threading import Thread,RLock,Lock
import time
mutexA = mutexB = RLock()
class Mythread(Thread):
def init(self,name):
super().init()
self.name = name
def run(self) -> None: self.f1() self.f2() def f1(self): mutexA.acquire() print("%s 抢到了A锁" %self.name) mutexB.acquire() print("%s 抢到了B锁" %self.name) mutexB.release() mutexA.release() def f2(self): mutexB.acquire() print("%s 抢到了B锁" % self.name) time.sleep(0.1) mutexA.acquire() print("%s 抢到了A锁" % self.name) mutexA.release() mutexB.release()
if name == 'main':
t1 = Mythread("线程1")
t2 = Mythread("线程2")
t3 = Mythread("线程3")
t4 = Mythread("线程4")
t1.start() t2.start() t3.start() t4.start() print("主线程")
事件Event
线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行
Event对象的方法
event.isSet():返回event的状态值;
event.wait():如果 event.isSet()==False将阻塞线程;
event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入
就绪状态, 等待操作系统 调度;
event.clear():恢复event的状态值为False。
Event对象的案例
from threading import Event,Thread,current_thread
import time
e = Event() # 全局变量 = False
def f1():
print('%s 运行' %current_thread().name)
time.sleep(3)
e.set() # 全局变量 = True
def f2():
e.wait() # 等到全局变量 变为 True
print('%s 运行' % current_thread().name)
if name == 'main':
t1 = Thread(target=f1)
t2 = Thread(target=f2)
t1.start() t2.start()
"""
运行结果:
Thread-1 运行
Thread-2 运行
运行过程:
线程t1运行起来之后打印‘Thread-1 运行’,接着睡了3秒,这个时间足够线程t2
运行起来,线程t2起来后运行f2,接着开始等,e.wait()等到全局变量变为 True,
但是默认全局变量是False,线程t2上来后就开始等了,等3秒多一点的时间,等的这
个时间线程t1将全局变量e.set()为True,这个时候线程t2立马感觉到了全局变量为
True接着继续运行起来了打印‘Thread-2 运行’
"""
from threading import Event,Thread,current_thread
import time
import random
e = Event() # 全局变量 = False
def task1(): # 任务1负责控制红绿灯
while True:
e.clear() # 全局变量 = False
print("红灯亮--->禁止通行")
time.sleep(2) # 红灯亮的时间
e.set() # 全局变量 = True print('绿灯亮--->行人通行') time.sleep(3) # 绿灯亮的时间
def task2(): # 任务2负责控制行人
while True:
if e.is_set(): # 如果ste过了代表路灯亮了
print('%s 走你' %current_thread().name)
break
else: # 如果没有set代表红灯亮了
print("%s 等灯" %current_thread().name)
e.wait() # 原地等,只要等灯亮了就等过去了,这个时间刚刚好
if name == 'main':
Thread(target=task1).start() # 开启红绿灯
while True: time.sleep(random.randint(1,5)) # 1-5秒产生一个行人 Thread(target=task2).start()
"""
运行结果:
红灯亮--->禁止通行
绿灯亮--->行人通行
Thread-2 走你
红灯亮--->禁止通行
绿灯亮--->行人通行
Thread-3 走你
红灯亮--->禁止通行
Thread-4 等灯
Thread-5 等灯
"""
定时器timer
定时器Timer类是Thread的派生类,用于在指定时间后调用一个方法。
构造方法:
Timer(interval, function, args=[], kwargs={})
interval: 指定的时间
function: 要执行的方法
args/kwargs: 方法的参数
实例方法:
指定n秒后执行某操作
from threading import Timer
def hello(n): # n秒后执行
print("hello, world",n)
t = Timer(3, hello,args=(1111,))
t.start() # 3秒后,将打印“hello, world 1111”
线程队列
queue队列 :使用import queue,用法与进程Queue一样
当信息必须在多个线程之间安全交换时,队列在线程编程中特别有用。
线程queue基本方法:
put 往线程队列里防止,超过队列长度,直接阻塞
get 从队列中取值,如果获取不到,直接阻塞
put_nowait: 如果放入的值超过队列长度,直接报错(linux)
get_nowait: 如果获取的值已经没有了,直接报错
线程queue的用法
import queue
q = queue.Queue(3) # 指定队列的大小
q.put(111) # 整型
q.put("aaa") # 字符串
q.put((1,2,3)) # 元组
print(q.get())
print(q.get())
print(q.get())
'''
111
aaa
(1, 2, 3)
'''
q = queue.LifoQueue(3)
q.put(111)
q.put("aaa")
q.put((1,2,3))
print(q.get())
print(q.get())
print(q.get())
'''
(1, 2, 3)
aaa
111
'''
q = queue.PriorityQueue(3)
q.put((10,111)) # 第一个值是优先级,第二值才是要放的元素
q.put((11,"aaa"))
q.put((-1,(1,2,3)))
print(q.get())
print(q.get())
print(q.get())
'''
(-1, (1, 2, 3)) # 数越小优先级越高
(10, 111)
(11, 'aaa')
'''
GIL全局解释器锁
GIL的全称是:Global Interpreter Lock,意思就是全局解释器锁
在CPython中,全局解释器锁(GIL)是一个互斥锁,用于防止多个本机线程同时执行Python字节码。这个锁是必需的,主要是因为CPython的内存管理不是线程安全的。(然而,自从GIL存在以来,其他特性已经发展到依赖于它所执行的保证。)这个GIL并不是python的特性,他是只在Cpython解释器里引入的一个概念,而在其他的语言编写的解释器里就没有这个GIL例如:Jython,Pypy
为什么会有GIL?
随着电脑多核cpu的出现核cpu频率的提升,为了充分利用多核处理器,进行多线程的编程方式更为普及,随之而来的困难是线程之间数据的一致性和状态同步,而python也利用了多核,所以也逃不开这个困难,为了解决这个数据不能同步的问题,设计了GIL全局解释器锁。
说到GIL解释器锁,我们容易想到在多线程中共享全局变量的时候会有线程对全局变量进行的资源竞争,会对全局变量的修改产生不是我们想要的结果,而那个时候我们用到的是python中线程模块里面的互斥锁,哪样的话每次对全局变量进行操作的时候,只有一个线程能够拿到这个全局变量
GIL既然是互斥锁,所有互斥锁的本质都一样,都是将并发运行变成串行,以此来控制同一时间内共享数据只能被一个任务所修改,进而保证数据安全。
只要在一个进程里就一定有GIL锁的存在,GIL锁不能保证python数据的安全,它保证的是解释器级别(内存管理)的安全,也可以说是背后存在的一种机制。可以肯定的一点是:保护不同的数据的安全,就应该加不同的锁。
GIL保护的是解释器级的数据,保护用户自己的数据则需要自己加锁处理,如下图:
GIL与Lock代码演示:
from threading import Thread,Lock
import time
mutex = Lock()
n = 100
def task():
global n
mutex.acquire() temp = n time.sleep(0.1) n = temp - 1 mutex.release()
if name == 'main':
l = []
for i in range(100):
t = Thread(target=task)
l.append(t)
t.start()
for obj in l: obj.join() print(n) # 结果肯定为0,由原来的并发执行变成串行,牺牲了执行效率保证了数据安全
'''
分析:
'''
GIL与多线程
有了GIL的存在,同一时刻同一进程中只有一个线程被执行
产生质问:进程可以利用多核,但是开销大,而python的多线程开销小,但却无法利用多核优势,也就是说python没用了,php才是最牛逼的语言?
要解决这个问题,我们需要在几个点上达成一致:
cpu到底是用来做计算的,还是用来做I/O的?
多cpu,意味着可以有多个核并行完成计算,所以多核提升的是计算性能
每个cpu一旦遇到I/O阻塞,仍然需要等待,所以多核对I/O操作没什么用处
结论:
对计算来说,cpu越多越好,但是对于I/O来说,再多的cpu也没用
当然对运行一个程序来说,随着cpu的增多执行效率肯定会有所提高(不管提高幅度多大,总会有所提高),这是因为一个程序基本上不会是纯计算或者纯I/O,所以我们只能相对的去看一个程序到底是计算密集型还是I/O密集型,从而进一步分析python的多线程到底有无用武之地
分析:
我们有四个任务需要处理,处理方式肯定是要玩出并发的效果,解决方案可以是:
方案一:开启四个进程
方案二:一个进程下,开启四个线程
单核情况下,分析结果:
如果四个任务是计算密集型,没有多核来并行计算,方案一徒增了创建进程的开销,方案二胜
如果四个任务是I/O密集型,方案一创建进程的开销大,且进程的切换速度远不如线程,方案二胜
多核情况下,分析结果:
如果四个任务是计算密集型,多核意味着并行计算,在python中一个进程中同一时刻只有一个线
程执行用不上多核,方案一胜
如果四个任务是I/O密集型,再多的核也解决不了I/O问题,方案二胜
结论:现在的计算机基本上都是多核,python对于计算密集型的任务开多线程的效率并不能带来多
大性能上的提升,甚至不如串行(没有大量切换),但是,对于IO密集型的任务效率还是有显著提升的。
多线程性能测试
计算密集型:多进程效率高
from multiprocessing import Process
from threading import Thread
import os,time
def work():
res=0
for i in range(100000000):
res*=i
time.sleep(5)
if name == 'main':
l=[]
# print(os.cpu_count()) #本机为4核
start=time.time()
for i in range(4):
# p=Process(target=work) #多进程:耗时20s多
p=Thread(target=work) # 多线程:耗时31s多
l.append(p)
p.start()
for p in l:
p.join()
stop=time.time()
print('run time is %s' %(stop-start))
I/O密集型:多线程效率高
from multiprocessing import Process
from threading import Thread
import threading
import os,time
def work():
time.sleep(2)
if name == 'main':
l=[]
# print(os.cpu_count()) #本机为4核
start=time.time()
for i in range(100):
# p=Process(target=work) # 耗时14s多,大部分时间耗费在创建进程上
p=Thread(target=work) # 耗时2s多
l.append(p)
p.start()
for p in l:
p.join()
stop=time.time()
print('run time is %s' %(stop-start))
应用:
多线程用于IO密集型,如socket,爬虫,web
多进程用于计算密集型,如金融分析