from threading import Thread money = 100 def task(): global money money -= 1 t_list = [] for i in range(100): t = Thread(target=task) t.start() t_list.append(t) for t in t_list: t.join() print(money) --------执行结果----------------- 0
上述例子中,money值等于100,代码块是money减一; 过程 我们假设线程A先抢到了GIL,执行money -1 后,释放GIL。 此时生成的其它线程也会上去抢线程A释放的GIL 线程B,线程C....一直到生成的线程全部执行完。 ------------------------------------- 生成100个线程去执行代码块,所得结果为0, 如果生成10个线程,结果为10. 有此结果,是因为GIL的存在。
-----------------例子2【并发100,执行100 -1】 ---------------------- from threading import Thread import time money = 100 def task(): global money tmp = money time.sleep(0.1) #IO操作 money = tmp - 1 t_list = [] for i in range(100): #生成100个线程 t = Thread(target=task) t.start() t_list.append(t) for t in t_list: t.join() print(money) ----执行结果-------------------- 99
总结
当GIL遇到IO操作时,会释放GIL。 上述例子中,结合图型,生成的线程会去抢GIL,假设线程A抢到GIL,执行过程中有IO操作,此时只能释放GIL。 当所有线程都抢完一遍时。 继续向下执行money -1 ,此时所有线程(线程A,线程B,线程C)此刻都是从100-1 开始 所以上述例子结果为 99.
"""GIL不会影响程序层面的数据也不会保证它的修改是安全的要想保证得自己加锁"""
上述例子中,我们并发100,去执行代码逻辑,得到结果是99. 如果还是想得到结果0,需要在代码逻辑中加锁,不能只依靠GIL.
from threading import Thread, Lock import time money = 100 mutex = Lock() def task(): mutex.acquire() # 加锁 global money tmp = money time.sleep(0.1) money = tmp - 1 mutex.release() # 释放 t_list = [] for i in range(10): t = Thread(target=task) t.start() t_list.append(t) for t in t_list: t.join() print(money) # ----执行结果---------- # 0
小结
上述例子中,线程抢到GIL后,并且又加了锁,这样的话,其它线程需要等待它执行结束。 此时并发变串行,得到结果为0
多线程有优势
IO 密集型 | |
---|---|
多进程 | 申请额外的空间,小号更多的资源 |
多线程 | 消耗资源较少,通过管道技术 |
cpu密集型(计算密集型) | |
---|---|
多进程 | 申请额外的空间 消耗更多的资源(总耗时+申请空间+拷贝代码+切换) |
多线程 | 消耗资源相对较少 通过多道技术(总耗时+切换) |
多线程有优势!
IO密集型 | |
---|---|
多进程 | 总耗时(单个进程的耗时 + IO + 申请空间 + 拷贝代码) |
多线程 | 总耗时(单个进程的耗时 + IO) |
多进程有优势
cpu密集型(计算密集型) | |
---|---|
多进程 | 总耗时(单个进程的耗时) |
多线程 | 总耗时(多个进程的综合) |
import os from threading import Thread from multiprocessing import Process import time def work(): # 计算密集型 res = 1 for i in range(1, 100000): res *= i if __name__ == '__main__': start_time = time.time() p_list = [] for i in range(12): # 一次性创建12个进程 p = Process(target=work) p.start() p_list.append(p) for p in p_list: # 确保所有的进程全部运行完毕 p.join() print('总耗时:%s' % (time.time() - start_time)) # 获取总的耗时 #------执行结果---------- #总耗时:5.261997938156128
import os from threading import Thread from multiprocessing import Process import time def work(): # 计算密集型 res = 1 for i in range(1, 10000): res *= i if __name__ == '__main__': start_time = time.time() import os from threading import Thread from multiprocessing import Process import time def work(): # 计算密集型 res = 1 for i in range(1, 100000): res *= i if __name__ == '__main__': start_time = time.time() t_list = [] for i in range(12): # 创建12个线程 t = Thread(target=work) t.start() t_list.append(t) for t in t_list: t.join() print('总耗时:%s' % (time.time() - start_time)) # 获取总的耗时 #----------执行结果------ #总耗时:26.957412481307983
import os from threading import Thread from multiprocessing import Process import time def work(): time.sleep(2) # 模拟纯IO操作 if __name__ == '__main__': start_time = time.time() p_list = [] for i in range(100): p = Process(target=work) p.start() for p in p_list: p.join() print('总耗时:%s' % (time.time() - start_time)) #-----执行结果---- #总耗时:1.1923396587371826
import os from threading import Thread from multiprocessing import Process import time def work(): time.sleep(2) # 模拟纯IO操作 if __name__ == '__main__': start_time = time.time() t_list = [] for i in range(100): t = Thread(target=work) t.start() for t in t_list: t.join() print('总耗时:%s' % (time.time() - start_time)) ---执行结果----- 总耗时:0.009972572326660156
死锁是指两个或两个以上的进程在执行过程中,由于竞争资源或者由于彼此通信而造成的一种阻塞的现象,若无外力作用,它们都将无法推进下去。 此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。 ---参考百科
例子
from threading import Thread, Lock import time mutexA = Lock() # 类名加括号每执行一次就会产生一个新的对象 mutexB = Lock() # 类名加括号每执行一次就会产生一个新的对象 class MyThread(Thread): def run(self): self.func1() self.func2() def func1(self): mutexA.acquire() print(f'{self.name}抢到了A锁') mutexB.acquire() print(f'{self.name}抢到了B锁') mutexB.release() print(f'{self.name}释放了B锁') mutexA.release() print(f'{self.name}释放了A锁') def func2(self): mutexB.acquire() print(f'{self.name}抢到了B锁') time.sleep(1) mutexA.acquire() print(f'{self.name}抢到了A锁') mutexA.release() print(f'{self.name}释放了A锁') mutexB.release() print(f'{self.name}释放了B锁') for i in range(10): t = MyThread() t.start() -----执行结果--------- Thread-1抢到了A锁 Thread-1抢到了B锁 Thread-1释放了B锁 Thread-1释放了A锁 Thread-2抢到了A锁 Thread-1抢到了B锁
信号量. 信号量 (Semaphore),有时被称为信号灯,是在 多线程 环境下使用的一种设施,是可以用来保证两个或多个关键代码段不被 并发 调用。. 在进入一个关键代码段之前,线程必须获取一个信号量;一旦该关键代码段完成了,那么该线程必须释放信号量。. 其它想进入该关键代码段的线程必须等待直到第一个线程释放信号量。. 为了完成这个过程,需要创建一个信号量VI,然后将Acquire Semaphore VI以及Release Semaphore VI分别放置在每个关键代码段的首末端。.
以停车场的运作为例。 假设停车场只有三个车位,开始三个车位都是空的。 这时同时来了五辆车,看门人开闸允许其中三辆直接进入, 剩下的车则必须在入口等待,后续来的车也在入口处等待。 这时一辆车想离开停车场,告知看门人,打开闸门放他出去, 看门人看了看空车位数量,然后看门人才让外面的一辆车进去。 如果又离开两辆,则又可以放入两辆,如此往复。 在这个停车场系统中,车位是公共资源,每辆车好比一个线程, 看门人起的就是信号量的作用。
信号量本质也是互斥锁 只不过它是多把锁
信号量在不同的知识体系中 意思可能有区别 在并发编程中 信号量就是多把互斥锁 在django中 信号量指的是达到某个条件自动触发(中间件)
from threading import Thread, Lock, Semaphore import time import random sp = Semaphore(5) # 一次性产生五把锁 class MyThread(Thread): def run(self): sp.acquire() print(self.name) time.sleep(random.randint(1, 3)) sp.release() for i in range(20): t = MyThread() t.start()
子进程\子线程之间可以彼此等待彼此 eg: 子A运行到某一个代码位置后发信号告诉子B开始运行 from threading import Thread, Event import time event = Event() # 类似于造了一个红绿灯 def light(): print('红灯亮着的 所有人都不能动') time.sleep(3) print('绿灯亮了 油门踩到底 给我冲!!!') event.set() def car(name): print('%s正在等红灯' % name) event.wait() print('%s加油门 飙车了' % name) t = Thread(target=light) t.start() for i in range(20): t = Thread(target=car, args=('熊猫PRO%s' % i,)) t.start()
进程池和线程池
多进程 多线程 在实际应用中是不是可以无限制的开进程和线程 肯定不可以!!! 会造成内存溢出受限于硬件水平 我们在开设多进程或者多线程的时候 还需要考虑硬件的承受范围 池 降低程序的执行效率 保证计算机硬件的安全 -------------------------------------------------------------------------------- 进程池 提前创建好固定个数的进程供程序使用 后续不会再创建 线程池 提前创建好固定个数的线程供程序使用 后续不会再创建
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor from threading import current_thread import os import time # pool = ThreadPoolExecutor(5) # 固定产生五个线程 pool = ProcessPoolExecutor(5) # 固定产生五个线程 def task(n): # print(current_thread().name) print(os.getpid()) # print(n) time.sleep(1) return '返回的结果' def func(*args, **kwargs): print('func', args, kwargs) print(args[0].result()) if __name__ == '__main__': for i in range(20): # res = pool.submit(task,123) # 朝池子中提交任务(异步) # print(res.result()) # 同步 # pool.submit(task, 123).add_done_callback(func) """异步回调:异步任务执行完成后有结果就会自动触发该机制""" pool.submit(task, 123).add_done_callback(func)
import socket from gevent import monkey;monkey.patch_all() # 固定编写 用于检测所有的IO操作(猴子补丁) from gevent import spawn def communication(sock): while True: data = sock.recv(1024) print(data.decode('utf8')) sock.send(data.upper()) def get_server(): server = socket.socket() server.bind(('127.0.0.1', 8080)) server.listen(5) while True: sock, addr = server.accept() # IO操作 spawn(communication, sock) s1 = spawn(get_server) s1.join()
如何不断的提升程序的运行效率 多进程下开多线程 多线程下开协程