当你知道锁的使用抢锁必须要释放锁,其实你在操作锁的时候也极其容易产生死锁现象(整个程序卡死 阻塞)
# 死锁现象:所谓死锁,是指多个进程在运行过程中因争夺资源而造成的一种僵局,当进程处于这种僵持状态时,若无外力作用,它们都将无法再向前推进。 因此我们举个例子来描述,如果此时有一个线程A,按照先锁a再获得锁b的的顺序获得锁,而在此同时又有另外一个线程B,按照先锁b再锁a的顺序获得锁 from threading import Thread, Lock import time mutexA = Lock() mutexB = Lock() # 类只要加括号多次 产生的肯定是不同的对象 # 如果你想要实现多次加括号等到的是相同的对象————单例模式 class MyThead(Thread): def run(self): self.func1() self.func2() def func1(self): mutexA.acquire() print('%s 抢到A锁'% self.name) # 获取当前线程名 mutexB.acquire() print('%s 抢到B锁'% self.name) mutexB.release() mutexA.release() def func2(self): mutexB.acquire() print('%s 抢到B锁'% self.name) time.sleep(2) mutexA.acquire() print('%s 抢到A锁'% self.name) # 获取当前线程名 mutexA.release() mutexB.release() if __name__ == '__main__': for i in range(10): t = MyThead() t.start() # 线程A获得锁之后,只有将锁释放之后线程B才能使用该锁,如果线程A持有锁A,线程B持有锁B,这样线程A会等待线程B释放锁B之后才能获得锁B,同样线程B只有等待线程A释放锁A之后才能获得锁A,这样这两个线程就都卡住了,程序不能继续运行下去了
""" 递归锁的特点 可以被连续的acquire和release 但是只能被第一个抢到这把锁执行上述操作 它的内部有一个计数器 每acquire一次计数加一 每realse一次计数减一 只要计数不为0 那么其他人都无法抢到该锁 """ # 将上述的 mutexA = Lock() mutexB = Lock() # 换成 mutexA = mutexB = RLock()
信号量在不同的阶段可能对应不同的技术点
在并发编程中信号量指的是锁
""" 如果我们将互斥锁比喻成一个厕所的话 那么信号量就相当于多个厕所 """ from threading import Thread, Semaphore import time import random # 利用random模块实现打印随机验证码(搜狗的一道笔试题) sm = Semaphore(5) # 括号内写数字 设置‘厕所’的数量,当里面写1的时候与互斥锁一样,实际上信号量能够限制访问一个任务的线程的数量 def task(name): sm.acquire() print('%s 正在蹲坑'% name) time.sleep(random.randint(1, 5)) sm.release() if __name__ == '__main__': for i in range(20): t = Thread(target=task, args=('伞兵%s号'%i, )) t.start()
一些进程/线程需要等待另外一些进程/线程运行完毕之后才能运行,类似于发射信号一样
from threading import Thread, Event import time event = Event() # 引入事件 def light(): print('红灯亮着的') time.sleep(3) print('绿灯亮了') # 告诉等待红灯的人可以走了 event.set() # 设置事件,wait到这个事件就可以继续向下执行了 def car(name): print('%s 车正在灯红灯'%name) event.wait() # 等待别人给你发信号 print('%s 车加油门飙车走了'%name) if __name__ == '__main__': t = Thread(target=light) t.start() for i in range(20): t = Thread(target=car, args=('%s'%i, )) t.start() # 需要等到上面light的线程执行完之后才能继续向下执行
""" 同一个进程下多个线程数据是共享的,为什么同一个进程下还会去使用队列呢,因为队列是: 管道 + 锁 使用队列还是为了保证数据的安全 """ import queue # 我们现在使用的队列都是只能在本地测试使用 # 1 队列q 先进先出 q = queue.Queue(3) q.put(1) q.get() q.get_nowait() q.get(timeout=3) q.full() q.empty() # 2 后进先出q q = queue.LifoQueue(3) # last in first out q.put(1) q.put(2) q.put(3) print(q.get()) # 3 # 3 优先级队列:你可以给放入队列中的数据设置进出的优先级 q = queue.PriorityQueue(4) q.put((10, '111')) q.put((100, '222')) q.put((0, '333')) q.put((-5, '444')) print(q.get()) # (-5, '444') # put括号内放一个元组,第一个放数字表示优先级 # 需要注意的是,数字越小优先级越高
先回顾之前TCP服务端实现并发的效果是怎么玩的
每来一个人就开设一个进程或者线程去处理
""" 无论是开设进程也好还是开设线程也好,都需要消耗资源 只不过开设线程的消耗比开设进程的稍微小一点而已 我们是不可能做到无限制的开设进程和线程的 因为计算机硬件的资源跟不上 硬件的开发速度是远远赶不上软件的 我们的宗旨应该是在保证计算机硬件能够正常工作的情况下最大限度的利用它 """ # 池的概念 """ 什么是池? 池是用来保证计算机硬件安全的情况下最大限度的利用计算机 它降低了程序的运行效率但是保证了计算机硬件的安全,让你写的程序能够正常运行 """
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor import time import os pool = ThreadPoolExecutor(5) # 池子里面固定只有五个线程 # 括号内可以传数字,不传的话默认会开设当前计算机cpu个数五倍的线程 pool = ProcessPoolExecutor(5) # 括号内可以传数字,不传的话默认会开设当前计算机cpu个数进程 def task(n): print(n,os.getpid()) time.sleep(2) return n**n def call_back(n): print('call_back>>>:',n.result()) """ 任务的提交方式 同步:提交任务之后原地等待任务的返回结果,期间不做任何事 异步:提交任务之后不等待任务的返回结果,执行继续往下执行 异步提交任务的返回结果 应该通过回调机制来获取 """ # 下面注释的代码是一种方法,将得到的future对象放到列表中,再从列表中将对象一个一个result()出来 if __name__ == '__main__': # pool.submit(task, 1) # 朝池子中提交任务,默认异步提交,1是task的参数 # t_list = [] for i in range(20): # 朝池子中提交20个任务 # res = pool.submit(task, i) # 得到的实际上是<Future at 0x100f97b38 state=running>对象 # print(res.result()) # 使用result方法可以获得上面的对象获得的值,但是实现的效果是同步提交,和join方法有些类似,所以不建议使用 res = pool.submit(task, i).add_done_callback(call_back) # 这就是传说中的回调机制,简单的说就是传入一个函数,当异步线程通过call_back函数得到返回值的时候,自动返回函数结果 # t_list.append(res) # 等待线程池中所有的任务执行完毕之后再继续往下执行 # pool.shutdown() # 关闭线程池 等待线程池中所有的任务运行完毕 # for t in t_list: # print('>>>:',t.result()) # 肯定是有序的,因为上面放入列表的时候就是有序的 """ 程序有并发变成了串行 任务的为什么打印的是None res.result() 拿到的就是异步提交的任务的返回结果 """
线程池代码总结
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor pool = ProcessPoolExecutor(5) pool.submit(task, i).add_done_callback(call_back) # 异步回调机制
""" 进程:资源单位 线程:执行单位 单线程下实现并发 协程:这个概念完全是我们程序员自己想出来 多道技术 切换+保存状态 我们想通过代码层面自己检测IO行为。一旦遇到IO代码层面实现切换 这样给操作系统的感觉好像我这个程序一直运行没有IO 欺骗操作系统从而最大化的利用CPU 一味的切换加保存状态也有可能会降低程序的效率 计算密集型的 切换会降低效率 IO密集型的 切换会提高效率 """
该模块能够帮助实现IO的检测,并在程序遇到IO的时候切换任务
# 安装 pip3 install gevent
from gevent import monkey;monkey.patch_all() import time from gevent import spawn """ gevent模块本身无法检测常见的一些io操作 在使用的时候需要你额外的导入一句话 from gevent import monkey monkey.patch_all() 又由于上面的两句话在使用gevent模块的时候是肯定要导入的 所以还支持简写 from gevent import monkey;monkey.patch_all() """ def heng(): print('哼') time.sleep(2) print('哼') def ha(): print('哈') time.sleep(3) print('哈') def heiheihei(): print('heiheihei') time.sleep(5) print('heiheihei') start_time = time.time() g1 = spawn(heng) # 检测IO,注意:spawn在检测的时候 是异步提交的 g2 = spawn(ha) g3 = spawn(heiheihei) g1.join() g2.join() # 等待被检测的任务执行完毕 再往后继续执行 g3.join() # heng() # ha() # print(time.time() - start_time) # 5.005702018737793 print(time.time() - start_time) # 3.004199981689453 5.005439043045044
# 服务端 from gevent import monkey;monkey.patch_all() import socket from gevent import spawn def communication(conn): while True: try: data = conn.recv(1024) if len(data) == 0: break conn.send(data.upper()) except ConnectionResetError as e: print(e) break conn.close() def server(ip, port): server = socket.socket() server.bind((ip, port)) server.listen(5) while True: conn, addr = server.accept() spawn(communication, conn) if __name__ == '__main__': g1 = spawn(server, '127.0.0.1', 8080) g1.join() # 客户端 from threading import Thread, current_thread import socket def x_client(): client = socket.socket() client.connect(('127.0.0.1',8080)) n = 0 while True: msg = '%s say hello %s'%(current_thread().name,n) n += 1 client.send(msg.encode('utf-8')) data = client.recv(1024) print(data.decode('utf-8')) if __name__ == '__main__': for i in range(500): t = Thread(target=x_client) t.start()
""" 我们这里研究的IO模型都是针对网络IO的 Stevens在文章中一共比较了五种IO Model(模型): * blocking IO 阻塞IO * nonblocking IO 非阻塞IO * IO multiplexing IO多路复用 * signal driven IO 信号驱动IO * asynchronous IO 异步IO 由于signal driven IO(信号驱动IO)在实际中并不常用,所以主要介绍其余四种IO Model。 """ # 经历阻塞的程序会经历下面两个过程: # 1)等待数据准备 (Waiting for the data to be ready) # 2)将数据从内核拷贝到进程中(Copying the data from the kernel to the process) ''' 常见的网络阻塞状态: accept recv recvfrom send虽然 也有io行为 但是不在我们的考虑范围 '''
""" 我们之前写的都是阻塞IO模型,但是协程除外 实际上应用程序在建立链接的时候是需要向操作系统申请,也就是系统调用,是操作系统调用网卡建立链接,也是操作系统在建立连接之后将接收到的数据放到内存中 阻塞主要存在于accept、recv、recvfrom,因为应用程序需要一直向内核(可以理解为操作系统)申请系统调用,在链接建立之后才能将数据拷贝到进程中,否则就只能一直等待 """ import socket server = socket.socket() server.bind(('127.0.0.1',8080)) server.listen(5) while True: conn, addr = server.accept() while True: try: data = conn.recv(1024) if len(data) == 0:break print(data) conn.send(data.upper()) except ConnectionResetError as e: break conn.close() # 在服务端开设多进程或者多线程 进程池线程池 其实还是没有解决IO问题 # 该等的地方还是得等 没有规避IO # 只不过多个人等待时彼此之间互不干扰
""" 要自己实现一个非阻塞IO模型 实际上解决的是建立链接之前,accept一直等待的问题,首先服务端申请系统调用,如果连接已经建立,则直接使用,如果未建立,就返回一个结果,服务端去询问其他的链接是否建立或者做其他的事 """ import socket import time server = socket.socket() server.bind(('127.0.0.1', 8081)) server.listen(5) server.setblocking(False) # 将所有的网络阻塞变为非阻塞 r_list = [] del_list = [] while True: try: conn, addr = server.accept() r_list.append(conn) except BlockingIOError: # time.sleep(0.1) # print('列表的长度:',len(r_list)) # print('做其他事') for conn in r_list: try: data = conn.recv(1024) # 没有消息 报错 if len(data) == 0: # 客户端断开链接 conn.close() # 关闭conn # 将无用的conn从r_list删除 del_list.append(conn) continue conn.send(data.upper()) except BlockingIOError: continue except ConnectionResetError: conn.close() del_list.append(conn) # 挥手无用的链接 for conn in del_list: r_list.remove(conn) del_list.clear() # 客户端 import socket client = socket.socket() client.connect(('127.0.0.1',8081)) while True: client.send(b'hello world') data = client.recv(1024) print(data) """ 虽然非阻塞IO给你的感觉非常的厉害 但是该模型会长时间占用着CPU并且不干活 让CPU不停的切换 我们实际应用中也不会考虑使用非阻塞IO模型 """
""" 当监管的对象只有一个的时候 其实IO多路复用连阻塞IO都比不上!!! 但是IO多路复用可以一次性监管很多个对象 监管机制是操作系统本身就有的 如果你想要用该监管机制(select) 需要你导入对应的select模块 当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。 这个图和blocking IO的图其实并没有太大的不同,事实上还更差一些。因为这里需要使用两个系统调用(select和recvfrom),而blocking IO只调用了一个系统调用(recvfrom)。但是,用select的优势在于它可以同时处理多个connection """ import socket import select server = socket.socket() server.bind(('127.0.0.1',8080)) server.listen(5) server.setblocking(False) # 默认是true,当设置为false时,程序不会在此处等待,而是直接向下走 read_list = [server] while True: r_list, w_list, x_list = select.select(read_list, [], []) """ 帮你监管 一旦有人来了 立刻给你返回对应的监管对象 """ # print(res) # ([<socket.socket fd=3, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8080)>], [], []) # print(server) # print(r_list) for i in r_list: # """针对不同的对象做不同的处理""" if i is server: conn, addr = i.accept() # 也应该添加到监管的队列中 read_list.append(conn) else: res = i.recv(1024) if len(res) == 0: i.close() # 将无效的监管对象 移除 read_list.remove(i) continue print(res) i.send(b'heiheiheiheihei') # 客户端 import socket client = socket.socket() client.connect(('127.0.0.1',8080)) while True: client.send(b'hello world') data = client.recv(1024) print(data) """ 监管机制其实有很多 select机制 windows linux都有 poll机制 只在linux有 poll和select都可以监管多个对象 但是poll监管的数量更多 上述select和poll机制其实都不是很完美 当监管的对象特别多的时候 可能会出现 极其大的延时响应 epoll机制 只在linux有 它给每一个监管对象都绑定一个回调机制 一旦有响应 回调机制立刻发起提醒 针对不同的操作系统还需要考虑不同检测机制 书写代码太多繁琐 有一个人能够根据你跑的平台的不同自动帮你选择对应的监管机制 selectors模块 """
""" 异步IO模型是所有模型中效率最高的 也是使用最广泛的 相关的模块和框架 模块:asyncio模块(async实际上就是异步的意思) 异步框架:sanic tronado twisted 速度快 用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了(简单的说就是先向操作系统申请,申请结束之后就可以去干别的事了,剩下的任务操作系统都会帮忙弄好,弄好之后将发信号给进程,进程在继续干剩下来的事) """ import threading import asyncio @asyncio.coroutine def hello(): print('hello world %s'%threading.current_thread()) yield from asyncio.sleep(1) # 这里模拟的IO操作 print('hello world %s' % threading.current_thread()) loop = asyncio.get_event_loop() tasks = [hello(),hello()] loop.run_until_complete(asyncio.wait(tasks)) loop.close()
到目前为止,已经将四个IO Model都介绍完了。现在回过头来回答最初的那几个问题:blocking和non-blocking的区别在哪,synchronous IO和asynchronous IO的区别在哪。
先回答最简单的这个:blocking vs non-blocking。前面的介绍中其实已经很明确的说明了这两者的区别。调用blocking IO会一直block住对应的进程直到操作完成,而non-blocking IO在kernel还准备数据的情况下会立刻返回。
再说明synchronous IO和asynchronous IO的区别之前,需要先给出两者的定义。Stevens给出的定义(其实是POSIX的定义)是这样子的:
A synchronous I/O operation causes the requesting process to be blocked until that I/O operationcompletes;
An asynchronous I/O operation does not cause the requesting process to be blocked;
两者的区别就在于synchronous IO做”IO operation”的时候会将process阻塞。按照这个定义,四个IO模型可以分为两大类,之前所述的blocking IO,non-blocking IO,IO multiplexing都属于synchronous IO这一类,而 asynchronous I/O后一类 。
有人可能会说,non-blocking IO并没有被block啊。这里有个非常“狡猾”的地方,定义中所指的”IO operation”是指真实的IO操作,就是例子中的recvfrom这个system call。non-blocking IO在执行recvfrom这个system call的时候,如果kernel的数据没有准备好,这时候不会block进程。但是,当kernel中数据准备好的时候,recvfrom会将数据从kernel拷贝到用户内存中,这个时候进程是被block了,在这段时间内,进程是被block的。而asynchronous IO则不一样,当进程发起IO 操作之后,就直接返回再也不理睬了,直到kernel发送一个信号,告诉进程说IO完成。在这整个过程中,进程完全没有被block。
各个IO Model的比较如图所示:
经过上面的介绍,会发现non-blocking IO和asynchronous IO的区别还是很明显的。在non-blocking IO中,虽然进程大部分时间都不会被block,但是它仍然要求进程去主动的check,并且当数据准备完成以后,也需要进程主动的再次调用recvfrom来将数据拷贝到用户内存。而asynchronous IO则完全不同。它就像是用户进程将整个IO操作交给了他人(kernel)完成,然后他人做完后发信号通知。在此期间,用户进程不需要去检查IO操作的状态,也不需要主动的去拷贝数据。