[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-vcijuIAg-1644766841076)(./img/串行并行和并发.png)]
注意
:协程只是一种做事的方式
进程、线程和协程的关系
线程是程序执行的最小单位,而进程是操作系统分配资源的最小单位;
一个进程由一个或多个线程组成,线程是一个进程中代码的不同执行路线;
进程之间相互独立,但同一进程下的各个线程之间共享程序的内存空间(包括代码段、数据集、堆等)及一些进程级的资源(如打开文件和信号),某进程内的线程在其它进程不可见;
调度和切换:线程上下文切换比进程上下文切换要快得多。
同步和异步强调的是消息通信机制,所以异步编程异步只出现在网络通信时
Python的标准库提供了两个模块: _thread
和 threading
, _thread
是低级模块, threading
是高级模块,对 _thread
进行了封装。绝大多数情况下,我们只需要使用 threading
这个高级模块。
线程的创建可以通过两种方式:
from threading import Thread from time import sleep def func1(name): print(f'线程{name} start') for i in range(3): print(f'线程: {name}. {i}') sleep(1) print(f'线程{name} end') if __name__ == '__main__': print("主线程: strat") # 创建线程 t1 = Thread(target=func1,args=("1",)) t2 = Thread(target=func1,args=("2",)) # 启动线程 t1.start() t2.start() print('主线程: end')
from threading import Thread from time import sleep class MyThread(Thread): def __init__(self,name): Thread.__init__(self) self.name = name # 方法重写,这个run函数名不能改 def run(self): print(f'线程{self.name} start') for i in range(3): print(f'线程: {self.name}. {i}') sleep(1) print(f'线程{self.name} end') if __name__ == '__main__': print("主线程: strat") # 创建线程 t1 = MyThread('1') t2 = MyThread('2') # 启动线程 t1.start() t2.start() print('主线程: end')
线程的执行统一通过 start()
方法
之前的代码中,我们会发现: 主线程不会等待子线程的结束;我们可以通过join方法,让主线程等待子线程的结束;
from threading import Thread from time import sleep def func1(name): print(f'线程{name} start') for i in range(3): print(f'线程: {name}. {i}') sleep(1) print(f'线程{name} end') if __name__ == '__main__': print("主线程: strat") # 创建线程 t1 = Thread(target=func1,args=("1",)) t2 = Thread(target=func1,args=("2",)) # 启动线程 t1.start() t2.start() # 主线程等待子线程的结束 t1.join() t2.join() print('主线程: end')
守护线程,主要的特征是它的生命周期。主线程死亡,它也就随之死亡。在python中,线程通过 setDaemon(True|False)
来设置是否为守护线程。
守护线程的作用: 守护线程作用是为其他线程提供便利服务,守护线程最典型的应用就是 GC (垃圾收集器)。
观察如下代码:
from threading import Thread from time import sleep class MyThread(Thread): def __init__(self,name): Thread.__init__(self) self.name = name # 方法重写,这个run函数名不能改 def run(self): print(f'线程{self.name} start') for i in range(3): print(f'线程: {self.name}. {i}') sleep(1) print(f'线程{self.name} end') if __name__ == '__main__': print("主线程: strat") # 创建线程 t1 = MyThread('1') t2 = MyThread('2') # 设置守护线程 t1.daemon = True # 主线程消亡,t1线程也会消亡 # 启动线程 t1.start() t2.start() print('主线程: end')
结果:
这里将t1
线程设置为守护线程,按道理,应该是主线程end之后,t1
线程应该不再执行了,但实际上: 由于主线程下有两个线程,主线程虽然执行完毕但是没有真正消亡,主线程在等待t2
线程执行完毕并且终止t1
(守护线程)的运行后才终止
那如果我们将两个线程都设置为守护线程,结果就能如我们所愿了呢?
from threading import Thread from time import sleep class MyThread(Thread): def __init__(self,name): Thread.__init__(self) self.name = name # 方法重写,这个run函数名不能改 def run(self): print(f'线程{self.name} start') for i in range(3): print(f'线程: {self.name}. {i}') sleep(1) print(f'线程{self.name} end') if __name__ == '__main__': print("主线程: strat") # 创建线程 t1 = MyThread('1') t2 = MyThread('2') # 设置守护线程 t1.daemon = True # 主线程消亡,t1线程也会消亡 t2.daemon = True # 主线程消亡,t2线程也会消亡 # 启动线程 t1.start() t2.start() print('主线程: end')
结果如我们所愿
Python代码的执行由Python 虚拟机(也叫解释器主循环,CPython版本)来控制,Python 在设计之初就考虑到要在解释器的主循环中,同时只有一个线程在执行,即在任意时刻,只有一个线程在解释器中运行。对Python 虚拟机的访问由全局解释器锁(GIL)来控制,正是这个锁能保证同一时刻只有一个线程在运行。
处理多线程问题时,多个线程访问同一个对象,并且某些线程还想修改这个对象。 这时候,我们就需要用到“线程同步”。 线程同步其实就是一种等待机制,多个需要同时访问此对象的线程进入这个对象的等待池形成队列,等待前面的线程使用完毕后,下一个线程再使用。
模拟场景: 老王和他老婆同时到了ATM机前面取钱(不同地点),账户中只有100元,而两人都想取80元,如果线程不同步,那会出现什么情况? 写个模拟程序看看
from threading import Thread from time import sleep class Account: def __init__(self, money, name): self.money = money self.name = name # 模拟提款动作 class Drawing(Thread): def __init__(self,drawingNum,account): Thread.__init__(self) self.drawingNum = drawingNum self.account = account self.expenseTotal = 0 def run(self): # 如果想屁吃 if self.account.money < self.drawingNum: return sleep(1) # 判断万可以取钱,则阻塞,就是为了测试发生冲突问题 self.account.money -= self.drawingNum self.expenseTotal += self.drawingNum print(f'账户:{self.account.name}, 余额是:{self.account.money}') print(f'账户:{self.account.name}, 总共取了:{self.expenseTotal}') if __name__ == '__main__': a1 = Account(100,'老王') draw1 = Drawing(80,a1) # 定义一个取钱的线程 draw2 = Drawing(80,a1) # 再定义一个取钱的线程 draw1.start() draw2.start()
可以发现账户余额成为了负数,这就是在未使用线程同步时操作的结果…
我们可以通过“锁机制”来实现线程同步问题,锁机制有如下几个要点:
互斥锁
注意
: 互斥锁是多个线程一起去抢,抢到锁的线程先执行,没有抢到锁的线程需要等待,等互斥锁使用完释放后,其它等待的线程再去抢这个锁。threading
模块中定义了 Lock
变量,这个变量本质上是一个函数,通过调用这个函数可以获取一把互斥锁。
from threading import Thread, Lock from time import sleep class Account: def __init__(self, money, name): self.money = money self.name = name # 模拟提款动作 class Drawing(Thread): def __init__(self,drawingNum,account): Thread.__init__(self) self.drawingNum = drawingNum self.account = account self.expenseTotal = 0 def run(self): lock1.acquire() # 拿锁 # 如果想屁吃 if self.account.money < self.drawingNum: print('账户余额不足') return sleep(1) # 判断万可以取钱,则阻塞,就是为了测试发生冲突问题 self.account.money -= self.drawingNum self.expenseTotal += self.drawingNum lock1.release() print(f'账户:{self.account.name}, 余额是:{self.account.money}') print(f'账户:{self.account.name}, 总共取了:{self.expenseTotal}') if __name__ == '__main__': a1 = Account(100,'老王') lock1 = Lock() draw1 = Drawing(80,a1) # 定义一个取钱的线程 draw2 = Drawing(80,a1) # 再定义一个取钱的线程 draw1.start() draw2.start()
在多线程程序中,死锁问题很大一部分是由于一个线程同时获取多个锁造成的。
举例:有两个人都要做饭,都需要“锅”和“菜刀”才能炒菜。
from threading import Thread, Lock from time import sleep def fun1(): lock1.acquire() print('fun1拿到菜刀') sleep(2) lock2.acquire() print('fun1拿到锅') lock2.release() print('fun1释放锅') lock1.release() print('fun1释放菜刀') def fun2(): lock2.acquire() print('fun2拿到锅') lock1.acquire() print('fun2拿到菜刀') lock1.release() print('fun2释放菜刀') lock2.release() print('fun2释放锅') if __name__ == '__main__': lock1 = Lock() lock2 = Lock() t1 = Thread(target=fun1) t2 = Thread(target=fun2) t1.start() t2.start()
死锁是由于“同步块需要同时持有多个锁造成”的,要解决这个问题,思路很简单,就是:同一个代码块,不要同时持有两个对象锁。
互斥锁使用后,一个资源同时只有一个线程访问。如果某个资源,我们同时想让N个(指定数值)线程访问?这时候,可以使用信号量。信号量控制同时访问资源的数量。信号量和锁相似,锁同一时间只允许一个对象(进程)通过,信号量同一时间允许多个对象(进程)通过。
# 一个房间,只允许两个人进入 from threading import Semaphore,Thread from time import sleep def home(name,se): se.acquire() print(f'{name}进入房间') sleep(2) print(f'***{name}走出房间') se.release() if __name__ == '__main__': se = Semaphore(2) # 信号量对象 for i in range(7): t = Thread(target = home,args=(f'tom {i}', se)) t.start()
原理: Event 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在初始情况下,event 对象中的信号标志被设置假。如果有线程等待一个 event 对象,而这个 event 对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个 event 对象的信号标志设置为真,它将唤醒所有等待个 event 对象的线程。如果一个线程等待一个已经被设置为真的 event 对象,那么它将忽略这个事件,继续执行
Event()
可以创建一个事件管理标志,该标志(event)默认为False, event对象主要有四种方法可以调用:
方法名 | 说明 |
---|---|
event.wait(timeout=None) | 调用该方法的线程会被阻塞,如果设置了timeout参数,超时后,线程会停止阻塞继续执行; |
event.set() | 将event的标志设置为True,调用wait方法的所有线程将被唤醒 |
event.clear() | 将event的标志设置为False,调用wait方法的所有线程将被阻塞 |
event.is_set() | 判断event的标志是否为True |
下面我们用程序模拟下面这幅图:
import threading import time def chihuoguo(name): print(f'{name}已经启动') print(f'小伙伴{name}已经进入就餐状态') time.sleep(1) event.wait() print(f'{name}收到通知了') print(f'小伙伴{name}开始吃咯!') if __name__ == '__main__': event = threading.Event() thread1 = threading.Thread(target=chihuoguo,agrs=('tom',)) thread2 = threading.Thread(target=chihuoguo,agrs=('cherry',)) # 开启线程 thread1.start() thread2.start() # 等待event对象解开 for i in range(10): time.sleep(1) print(">"*(i+1) + '-' * (9-i)) print('--->>> 主线程通知小伙伴开吃咯') event.set()
多线程环境下,我们经常需要多个线程的并发和协作。这个时候,就需要了解一个重要的多线程并发协作模型“生产者/消费者模式”。
缓冲区是实现并发的核心,缓冲区的设置有3个好处
从一个线程向另一个线程发送数据最安全的方式可能就是使用 queue
库中的队列了。创建一个被多个线程共享的 Queue 对象,这些线程通过使用 put() 和 get() 操作来向队列中添加或者删除元素。Queue 对象已经包含了必要的锁,所以你可以通过它在多个线程间多安全地共享数据。
from queue import Queue from time import sleep import random from threading import Thread def producer(): num = 1 while True: if queue.qsize() < 5: print(f'生产{num}号,大馒头') queue.put(f"大馒头:{num}号") num += 1 sleep(random.randint(1,4)) else: print('馒头筐慢了,等待人来取') sleep(1) def consumer(): while True: if queue.qsize() > 0: print(f'获取馒头:{queue.get()}') sleep(random.randint(1,5)) else: print('快点生产我要饿死啦...') sleep(1) if __name__ == '__main__': queue = Queue() t1 = Thread(target=producer) t2 = Thread(target=consumer) t1.start() t2.start()
进程的优点:
创建进程后,使用start()启动进程
from multiprocessing import Process import os from time import sleep def fun(name): print(f'当前进程ID:{os.getpid()}') print(f'父进程ID:{os.getppid()}') print(f'Process: {name}, start') sleep(3) print(f'Process:{name} end') # 类方法创建 class MyProcess(Process): def __init__(self,name): Process.__init__(self) self.name = name def run(self): print(f'当前进程ID:{os.getpid()}') print(f'父进程ID:{os.getppid()}') print(f'Process: {self.name}, start') sleep(3) print(f'Process:{self.name} end') # windows上多进程实现的bug,如果不加main的限制,就会无限递归创建进程, if __name__ == '__main__': print("当前进程ID:",os.getpid()) p1 = Process(target=fun,args=('p1',)) p2 = Process(target=fun,args=('p2',)) p1.start() p2.start() # p1 = MyProcess('p1') # p2 = MyProcess('p2') # p1.start() # p2.start()
值得注意的是: 进程间通信要给每个进程传入数据,虽然明面上看来这个数据是一个全局变量,但是每个进程运行起来的时候是相互独立,不共享数据的…
from multiprocessing import Process,Queue from time import sleep class MyProcess(Process): def __init__(self,name,mq): Process.__init__(self) self.name = name self.mq = mq def run(self): print(f'Process: {self.name}, start') temp = self.mq.get() print(f'get Date:{temp}') sleep(2) print(f'put Data:{temp}' + '1') self.mq.put(temp+'1') print(f'Process:{self.name} end') if __name__ == '__main__': mq = Queue() mq.put('1') mq.put('2') mq.put('3') # 进程列表 p_list = [] for i in range(3): p1 = MyProcess(f'p{i}',mq) p_list.append(p1) p1.start() p1.join() # 让主进程等着 for i in range(3): print(mq.get())
Pipe方法返回(conn1, conn2)代表一个管道的两个端。
import multiprocessing from time import sleep def func1(conn1): sub_info = "Hello!" print(f'进程1--{multiprocessing.current_process().pid}发送数据: {sub_info}') sleep(1) conn1.send(sub_info) print(f'来自进程2:{conn1.recv()}') sleep(1) def func2(conn2): sub_info = "你好!" print(f'进程2--{multiprocessing.current_process().pid}发送数据: {sub_info}') sleep(1) conn2.send(sub_info) print(f'来自进程1:{conn2.recv()}') sleep(1) if __name__ == '__main__': conn1,conn2 = multiprocessing.Pipe() process1 = multiprocessing.Process(target=func1, args=(conn1,)) process2 = multiprocessing.Process(target=func2, args=(conn2,)) # 启动子进程 process1.start() process2.start()
from multiprocessing import Process,Manager def func(name,m_list,m_dict): m_dict['age'] = 19 m_list.append('我是大帅哥!!') if __name__ == '__main__': # Manager与multiprocessing.Queue类似,也类似全局变量一样的通信方式 # 这里虽然只写了一个进程,写两个也是一样的,能通信即可... with Manager() as mgr: m_list = mgr.list() m_dict = mgr.dict() m_list.append('我是PD!!!') p1 = Process(target=func, args=('p1',m_list,m_dict)) p1.start() p1.join() print(m_dict) print(m_dict)
进程池可以提供指定数量的进程给用户使用,即当有新的请求提交到进程池中时,如果池未满,则会创建一个新的进程用来执行该请求;反之,如果池中的进程数已经达到规定最大值,那么该请求就会等待,只要池中有进程空闲下来,该请求就能得到执行。
使用进程池的优点
类/方法 | 功能 | 参数 |
---|---|---|
Pool(processes) | 创建进程池对象 | processes表示进程池中有多少进程 |
pool.apply_async(func,args,kwds) | 异步执行;将事件放入到进程池队列 | func 事件函数 args 以元组形式给func传参 kwds 以字典形式给func传参 返回值:返回一个代表进程池事件的对象,通过返回值的get方法可以得到事件函数的返回值 |
pool.apply(func,args,kwds) | 同步执行;将事件放入到进程池队列 | func 事件函数 args 以元组形式给func传参kwds 以字典形式给func传参 |
pool.close() | 关闭进程池 | |
pool.join() | 回收进程池 | |
pool.map(func,iter) | 类似于python的map函数,将要做的事件放入进程池 | func 要执行的函数 iter 迭代对象 |
from multiprocessing import Pool import os from time import sleep def func(name): print(f'当前进程ID:{os.getpid()},{name}') sleep(2) return name def func2(args): print(f'callback:{args}') if __name__ == '__main__': pool = Pool(5) pool.apply_async(func=func,args=('pd',),callback=func2) pool.apply_async(func=func,args=('pdd',),callback=func2) pool.apply_async(func=func,args=('cpdd',),callback=func2) pool.apply_async(func=func,args=('你好pd',)) pool.apply_async(func=func,args=('你好pdd',)) pool.apply_async(func=func,args=('你好cpdd',)) pool.apply_async(func=func,args=('再见pd',)) pool.apply_async(func=func,args=('再见pdd',)) pool.apply_async(func=func,args=('再见cpdd',)) pool.close() # 如果用with就不需要关闭了 pool.join()
函数式编程写法:
from multiprocessing import Pool import os from time import sleep def func1(name): print(f'当前进程的ID: {os.getpid()},{name}') sleep(2) return name if __name__ == '__main__': with Pool(5) as pool: args = pool.map(func1,('pd','pdd','cpdd','你好pd','你好pdd', '你好cpdd','再见pd','再见pdd','再见cpdd')) for a in args: print(a)
协程,全称是“协同程序”,用来实现任务协作。是一种在线程中,比线程更加轻量级的存在,由程序员自己写程序来管理。
当出现IO阻塞时,CPU一直等待IO返回,处于空转状态。这时候用协程,可以执行其他任务。当IO返回结果后,再回来处理数据。充分利用了IO等待的时间,提高了效率。
注:
asyncio协程是写爬虫比较好的方式。比多线程和多进程都好.开辟新的线程和进程是非常耗时的。
协程的缺点
async
async
用来声明一个函数为异步函数,异步函数的特点是能在函数执行过程中挂起,去执行其他异步函数,等到挂起条件(假设挂起条件是 sleep(5) )消失后,也就是5秒到了再回来执行await
用来用来声明程序挂起,比如异步程序执行到某一步时需要等待的时间很长,就将此挂起,去执行其他的异步程序。import time import asyncio async def func1(): for i in range(1,4): print(f'pd:第{i}次呼叫指挥部!!') await asyncio.sleep(1) return 'pd呼叫完毕...请回答' async def func2(): for k in range(1,4): print(f'指挥部:第{k}次呼叫PD!') await asyncio.sleep(1) return '指挥部呼叫完毕...请回答' async def main(): res = await asyncio.gather(func1(), func2()) # await异步执行func1方法 # gather会交替执行func1() 和 func2() # 返回值为函数的返回值列表 print(res) if __name__ == '__main__': start = time.time() asyncio.run(main()) end = time.time() print(f'运行时间:{end-start}')
回来执行
await
用来用来声明程序挂起,比如异步程序执行到某一步时需要等待的时间很长,就将此挂起,去执行其他的异步程序。import time import asyncio async def func1(): for i in range(1,4): print(f'pd:第{i}次呼叫指挥部!!') await asyncio.sleep(1) return 'pd呼叫完毕...请回答' async def func2(): for k in range(1,4): print(f'指挥部:第{k}次呼叫PD!') await asyncio.sleep(1) return '指挥部呼叫完毕...请回答' async def main(): res = await asyncio.gather(func1(), func2()) # await异步执行func1方法 # gather会交替执行func1() 和 func2() # 返回值为函数的返回值列表 print(res) if __name__ == '__main__': start = time.time() asyncio.run(main()) end = time.time() print(f'运行时间:{end-start}')