Python中最常用的同步有:锁/互斥,以及信号量。其中锁是最简单最低级的机制,信号量用于多线程竞争有限资源的情况。但是锁被释放,线程不一定被释放。
threading.Lock同步锁(原语锁)
通常使用获得锁(加锁)和释放锁(解锁)函数来控制锁的两种状态,在Python中,只需要在公共操作中加上加锁和解锁的操作即可。
如“加锁”和“解锁“操作
通过lock.acquire()获得锁后,线程将一直执行,直到该线程lock.release()的锁被释放,线程才有可能被释放(注意:锁被释放,线程不一定被释放)。
# 创建一个锁对象 lock = threading.Lock() def func(): # 全局变量 global num # 获得锁、加锁 # lock.acquire() num1 = num time.sleep(0.1) num = num1 - 1 # 释放锁、解锁 # lock.release() time.sleep(2) num = 100 l = [] # 开启100个线程 for i in range(100): t = threading.Thread(target=func, args=()) t.start() l.append(t) # 等待线程运行结束 for i in l: i.join() print(f'num={num}')
在上面的示例中,先将lock.acquire()和lock.release()代码注释掉,表示不使用锁,取消lock.acquire()和lock.release()代码注释表示使用加锁和解锁。在代码中增加了time.sleep(0.1),表示在不使用锁时,线程在这里将被释放出来,让给下一个线程执行,而num的值还未被修改,所以后面线程的num1取值都为100。
代码运行结果为:
加锁和解锁时:num=0
不加锁和解锁时:num = 99
Python中,Lock锁与GIL锁的区别
Lock锁的目的是为了保护共享数据,同一时刻只能有一个线程修改共享变量数据,而保护不同的数据需要使用不同的锁。而GIL(全局解释器锁)用于限制一个进程中同一时刻只有一个线程被CPU调度。GIL的级别比Lock高,是解释器级别。
死锁
多线程中最怕遇到的就是死锁,当两个或两个以上的线程在执行时,因争夺资源被相互锁住而互相等待。
示例:互锁造成的死锁
# 生成锁对象 lock1 = threading.Lock() lock2 = threading.Lock() class MyThread(threading.Thread): def __init__(self): threading.Thread.__init__(self) def run(self) -> None: self.funA() self.funB() def funA(self): lock1.acquire() print(f"A_1加锁", end='\t') lock2.acquire() print(f'A_2加锁', end='\t') time.sleep(0.2) lock2.release() print(f'A_2释放', end='\t') lock1.release() print(f'A_1释放') def funB(self): lock2.acquire() print(f'B_1加锁', end='\t') lock1.acquire() print(f'B_2加锁', end='\t') time.sleep(0.1) lock1.release() print(f'B_2释放', end='\t') lock2.release() print(f'B_1释放') if __name__ == '__main__': t1 = MyThread() t2 = MyThread() t1.start() t2.start()
运行死锁的结果:
未死锁的结果:
如果两个锁同时被多个线程运行,可能会出现死锁的情况,也可能未出现死锁情况,这是潜在的BUG,也是较难找的BUG。
threading.RLock重入锁(递归锁)
为了支持同一线程多次请求同一资源,python提供了可重入锁(RLock),RLock内部维护着一个锁(Lock)和一个计数器(counter)变量,计数器(counter)记录了acquire的次数,从而使得资源可以被多次acquire,直到一个线程所有acquire都被release,计数器counter为0,其它线程才能获得资源。
例如:
```python Rlock = threading.RLock() class MyThread(threading.Thread): def __init__(self): threading.Thread.__init__(self) def run(self) -> None: self.funA() self.funB() def funA(self): Rlock.acquire() print(f"A_1加锁", end='\t') Rlock.acquire() print(f'A_2加锁', end='\t') time.sleep(0.2) Rlock.release() print(f'A_2释放', end='\t') Rlock.release() print(f'A_1释放') def funB(self): Rlock.acquire() print(f'B_1加锁', end='\t') Rlock.acquire() print(f'B_2加锁', end='\t') time.sleep(0.1) Rlock.release() print(f'B_2释放', end='\t') Rlock.release() print(f'B_1释放') if __name__ == '__main__': t1 = MyThread() t2 = MyThread() t1.start() t2.start()
执行结果
当程序运行到time.sleep(0.2)时,也不会切换线程。使用重入锁时,计数器(counter)不为0(所有的acquire都还没有被释放掉),即使遇到长时间的IO操作也不会切换线程。
threading.Semaphore信号量
信号量是一个内部数据,它有一个内置计数器的计数器,它标明当前的共享资源有多少线程可以读取。
如,定义一个只能同时执行4个线程的信号量
# 创建信号量对象,信号量设置为4,需要有4个线程才启动,4个线程并发 semaphore = threading.Semaphore(4)
当线程需要读取关联信号的共享资源时,需要调用semaphore.acquire(),这时信号量的计数器为-1。
semaphore.acquire() # 获取信号量 -1
当线程不需要共享资源时,需要释放信号semaphore.release(),这时信号量的计数器(counter)会加1,在信号量等待队列中,排在最前面的线程会拿到共享资源的权限。
如,信号量为4的线程并行运行
# 创建信号量对象,信号量设置为4,需要有4个线程才启动 semaphore = threading.Semaphore(4) def funcSemaphore(): if semaphore.acquire(): # 获取信号量 -1 print(f'{threading.currentThread().getName()}获得信号量') time.sleep(random.randint(1, 10)) semaphore.release() # 释放信号量 +1 if __name__ == '__main__': for i in range(1, 8): t1 = threading.Thread(target=funcSemaphore) t1.start()
运行结果
开始时只有4个线程获得资源权限,后面当释放多少资源时,就有多少线程能够获得资源权限。
例如,运用信号量进行线程同步
# 同步两个不同的线程,信号量被初始化为0 semaphore0 = threading.Semaphore(0) def consumer(): print("---等待producer运行---") # 获取资源,信号量为0时被挂起,等待信号量释放 semaphore0.acquire() print("---consumer 结束---编号:%s" % item) def producer(): global item time.sleep(3) # 生成随机变量 item = random.randint(0, 100) print("procuder运行编号:%s" %item) semaphore0.release() if __name__ == '__main__': for i in range(0, 8): t1 = threading.Thread(target=producer) t2 = threading.Thread(target=consumer) t1.start() t2.start() t1.join() t2.join() print("程序终止!")
运行结果
信号量被初始化为0,目的是同步两个或多个线程。线程必须并行运行,所以需要信号量同步。这种运用场景有时会用到,理解起来较难。
信号量在多线程中运用较广,但它也有可能造成死锁的情况。如,有一个线程A1,先等待信号量S1,在等待信号量S2,而线程B1,先等待信号量S2,在等待信号量S1,这样就会发生死锁,导致A1等待S2,B1等待S1。
threading.Condition()条件变量
Condition条件变量通常与一个锁相关联。当需要在多个Condition条件中共享一个锁时,可以传递一个Lock/RLock实例给构造方法,否则它将产生一个RLock实例。
条件变量锁实例定义
# 条件变量对象 conn = threading.Condition()
在Condition下的方法:
acquire() 获得锁(线程锁)。
release() 释放锁。
wait(timeout=None) 挂起线程timeout秒(为None时,时间无限),直到收到notify通知或者超时才会被唤醒继续运行,必须在Lock锁下运行。
notify(n=1) 通知挂起的线程开始运行,默认通知正在等待该Condition的线程,可同时唤醒n个,必须在Lock锁下运行。
notifyAll() 通知所有被挂起的线程开始运行,必须在Lock锁下运行。
示例,生产者与消费者,线程produce生产产品并通知消费者(consume线程)进行购买,线程consume消费完后通知生产者进行生产。
import threading import time # 商品 product = None # 条件变量对象 conn = threading.Condition() # 生产方法 def produce(): # 全局变量产品 global product if conn.acquire(): while True: print("---执行produce---") if product is None: product = '电脑' print(f"---生产产品:{product}---") # 通知消费者,商品已经生产 conn.notify() # 唤醒消费线程,通知挂起线程继续运行,默认通知正在等待该condition的线程,可同时唤醒n个,且必须在获得Lock下运行 # 等待通知, 挂起线程毫秒(为None时,时间无限),直到收到notify或者超时才会继续运行 conn.wait() time.sleep(2) # 消费方法 def consume(): global product if conn.acquire(): while True: print("***执行consume***") if product is not None: print(f"***卖出产品:{product}***") product = None # 通知生产者,产品已卖出 conn.notify() # 等待通知 conn.wait() time.sleep(2) if __name__ == "__main__": t1 = threading.Thread(target=consume) t1.start() t2 = threading.Thread(target=produce) t2.start()
运行结果
示例2,生产一定产品数量后进行消费
# 生产数量达到一定条件后被消费 num = 0 condition = threading.Condition() class Producer(threading.Thread): '''生产者,生产商品,10个后等待消费''' def run(self) -> None: global num # 获得锁 condition.acquire() while True: num += 1 print("生产了1个,现在有{0}个".format(num)) time.sleep(1) if num >= 6: print("已到达到6个,停止生产") # 唤醒消费者线程 condition.notify() # 等待释放锁 或者 被唤醒获取锁 condition.wait() class Consumer(threading.Thread): '''消费者抢购商品,每人初始20元,商品单价2元''' def __init__(self, *args, **kwargs): super(Consumer, self).__init__(None, None) self.money = 12 def run(self) -> None: global num while self.money > 0: condition.acquire() if num <= 0: print("商品已卖完,{0}通知生产者".format(threading.current_thread().name)) condition.notify() condition.wait() self.money -= 2 num -= 1 print("{0}消费了1个,剩余{1}个".format(threading.current_thread().name, num)) condition.release() time.sleep(1) print("{0}钱已花完,停止消费".format(threading.current_thread().name)) if __name__ == "__main__": p = Producer(daemon=True) c1 = Consumer(name="Consumer--1") c2 = Consumer(name="Consumer--2") p.start() c1.start() c2.start()
运行结果
上述两个示例大致差不多,只是实现方式不同。
release()和wait()的区别
相同点:release()和wait()都有释放锁的作用。
**不同点:**在wait()后,线程会挂起等待,若要继续执行,就需要接收到notify()或者notifyAll()来唤醒线程;而release()后,该线程还能继续执行。
threading.Event() 事件锁对象
Event事件锁对象用于线程之间的通信,即程序中的其中一个线程通过判断某个线程的状态来确定自己的下一步操作。Event对象有状态值,其默认为False,即遇到Event对象就阻塞线程运行。
Event中的对象方法:
wait(timeout=None) 挂起线程timeout秒(None为时间无限),直到超时或者收到event()信号开关为True时,才唤醒程序。
set() event状态设置为True。
clear() event状态设置为False。
isSet() 返回event对象的状态值。
Event事件锁对象实例定义
# Event对象方法 event = threading.Event()
例如:
# Event对象方法 event = threading.Event() def func(): print('等待响应...') event.wait() print('连接到服务') def connect(): print('成功启动服务') event.set() if __name__ == "__main__": t1 = threading.Thread(target=func) t2 = threading.Thread(target=connect) t1.start() t2.start()
运行结果
从运行结果可以看出,func函数需要等待connect函数运行event.set()之后才继续执行操作。