1.先代码验证GIL的存在
from threading import Thread import time money = 100 def task(): global money money -= 1 for i in range(100): # 创建100个线程 t = Thread(target=task) t.start() print(money) # 0 # 结果为0,说明各个线程抢到全局锁才回去执行,执行完再交接给下一位,最后都进行了数据修改
from threading import Thread, Lock import time money = 100 mutex = Lock() # 互斥锁 def task(): global money mutex.acquire() # 抢锁 tmp = money time.sleep(0.1) money = tmp - 1 mutex.release() # 放锁 t_list = [] # 存放线程的列表 for i in range(100): t = Thread(target=task) t.start() t_list.append(t) for t in t_list: t.join() # 给所有线程加join方法,确保所有的线程运行完毕 print(money) # 0 注意:如果这里没加互斥锁mutex的时候,结果为99,为什么? 分析:因为如果没有互斥锁保证它独立运行完再运行下一个的话,每个线程获取到的money都是100,tmp-1都是99,那么最终结果也就是99 """ GIL是一个纯理论知识 在实际工作中根本无需考虑它的存在 GIL作用面很窄 仅限于解释器级别 后期我们要想保证数据的安全应该自定义互斥锁(使用别人封装好的工具) """
mutex = lock() with mutex: 加锁的代码
#计算密集型和IO密集型的区别 IO 密集型:系统运作,大部分的状况是CPU 在等I/O (硬盘/内存)的读/写。 CPU 密集型(计算密集型):大部份时间用来做计算、逻辑判断等CPU 动作的程序称之CPU 密集型(计算密集型) #IO密集型任务的特点: 涉及到网络、磁盘IO的任务都是IO密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度)。对于IO密集型任务,任务越多,CPU效率越高,但也有一个限度。常见的大部分任务都是IO密集型任务,比如Web应用。 IO密集型任务执行期间,99%的时间都花在IO上,花在CPU上的时间很少,因此,用运行速度极快的C语言替换用Python这样运行速度极低的脚本语言,完全无法提升运行效率。对于IO密集型任务,最合适的语言就是开发效率最高(代码量最少)的语言,脚本语言是首选,C语言最差。 #单个CPU 多个IO密集型任务 多进程:浪费资源 无法利用多个CPU 多线程:节省资源 切换+保存状态 多个计算密集型任务 多进程:耗时更长 创建进程的消耗+切换消耗 多线程:耗时较短 切换消耗 #多个CPU 多个IO密集型任务 多进程:浪费资源 多个CPU无用武之地 多线程:节省资源 切换+保存状态 多个计算密集型任务 多进程:利用多核 速度更快 多线程:速度较慢
#如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁 解决: 1.重构代码 2.添加超时释放锁
信号量在不同知识体系中,展现出来的功能时不一样的, #eg: 在并发编程中:信号量是把互斥锁 在djando框架中:信号量时达到某条件自动触发特定功能
""" 如果将自定义互斥锁比喻成是单个厕所(一个坑位) 那么信号量相当于是公共厕所(多个坑位) """ from threading import Thread, Semaphore import time import random sp = Semaphore(5) # 创建一个有五个坑位(带门的)的公共厕所 def task(name): sp.acquire() # 抢锁 print('%s正在蹲坑' % name) time.sleep(random.randint(1, 5)) sp.release() # 放锁 for i in range(1, 31): t = Thread(target=task, args=('伞兵%s号' % i, )) t.start()
#分析:上面代码创建带有五个位置的信号量,然后创建30个线程,会发现,代码结果会直接先运行5个线程,然后陆续运行接下来的线程,说明,信号量是设置好的5个位置,前面的位置腾出来后后面的线程再进去,以此类推
""" 子线程的运行可以由其他子线程决定!!! """ Event几种方法: event.isSet():返回event的状态值; event.wait():如果 event.isSet()==False将阻塞线程; event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度; event.clear():恢复event的状态值为False。
进程池: 提前创建好固定数量的进程,后续反复使用这些进程(合同工) 线程池: 提前创建好固定数量的线程,后续反复使用这些线程(合同工)
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecytor # 线程池 pool = ThreadPoolExecutor(5) # 线程池线程数默认时CPU个数的5倍,也可以自定义,这个代码执行之后就会立刻创建5个等待工作的线程 def task(n): time.sleep(2) print(n) return '任务的执行结果:%s'%n**2 def func(*args, **kwargs): # print(args, kwargs) print(args[0].result()) for i in range(20): # res = pool.submit(task, i) # 朝线程池中提交任务(异步)及参数 # print(res.result()) # 同步提交(获取任务的返回值) '''这样效率太慢,不应该自己主动等待结果 应该让异步提交自动提醒>>>:异步回调机制''' # 异步回调机制:异步提交自动提醒add_done_callback(func) pool.submint(task, i).add_done_callback(func) """add_done_callback只要任务task有结果了,就会自动调用括号内的函数func处理,把task任务当作参数传入func""" # 进程池 pool = ProcessPoolExecutor(5) # 进程池进程数默认是CPU个数 也可以自定义 '''上面的代码执行之后就会立刻创建五个等待工作的进程''' pool.submit(task, i).add_done_callback(func)
进程:资源单位 线程:执行单位 协程:单线程下实现并发
from gevent import monkey;monkey.patch_all() # 固定编写 用于检测所有的IO操作 from gevent import spawn import time def play(name): print('%s play 1' % name) time.sleep(5) print('%s play 2' % name) def eat(name): print('%s eat 1' % name) time.sleep(3) print('%s eat 2' % name) start_time = time.time() g1 = spawn(play, 'jason') # 协程g1 g2 = spawn(eat, 'jason') # 协程g2 g1.join() # 等待检测任务执行完毕 g2.join() # 等待检测任务执行完毕 print('总耗时:', time.time() - start_time) # 5.00609827041626
from gevent import monkey;monkey.patch_all() from gevent import spawn import socket def communication(sock): while True: data = sock.recv(1024) # IO操作 print(data.decode('utf8')) sock.send(data.upper()) def get_server(): server = socket.socket() server.bind(('127.0.0.1', 8080)) server.listen(5) while True: sock, addr = server.accept() # IO操作 spawn(communication, sock) g1 = spawn(get_server) g1.join()