''' 并发和并行: 并发:当有多个线程在操作时,系统只有一个CPU,并不能真正实现多任务同时进行,它是把CPU划分成若干个时间段,任务顺序执行 并行:当系统有2个及以上CPU,则线程操作有可能并发,当一个CPU 执行一个线程,另一个CPU执行另一个线程,互不抢占资源 实现多任务的方式: 多进程模式 : 进程是程序的实体,对操作系统来说,一个任务就是一个进程。如打开2个记事本就启动2个进程 优点:稳定性高,一个进程崩溃了,不会影响其他进程 缺点:创建进程开销大,操作系统同时运行进程的数量有限 进程创建: linux下使用os模块fork函数来创建,windows下使用 multiprocessing模块中的process类来创建 from multiprocessing import Process process = Process(target=函数, name= 进程的名字,args=(给函数传递的参数)) process 对象 对象调用方法: process.start() 启动进程并执行任务 process.run() 只启动了进程,没有执行任务 terminate() 终止 多线程模式 协程 进程 --> 线程 --> 协程 ''' #进程创建 import os from multiprocessing import Process from time import sleep def task1(): while True: sleep(1) print('This is task1', os.getpid(),'----->',os.getppid()) def task2(): while True: sleep(1) print('This is task2', os.getpid(),'----->',os.getppid()) if __name__ == '__main__': print(os.getpid()) #子进程1 p = Process(target=task1,name='任务1') p.start() print(p.name) #子进程2 p1 = Process(target=task2,name='任务2') p1.start() print(p1.name)
带参数
#进程创建 import os from multiprocessing import Process from time import sleep def task1(s,name): while True: sleep(s) print('This is task1', os.getpid(),'----->',os.getppid(),name) def task2(s,name): while True: sleep(s) print('This is task2', os.getpid(),'----->',os.getppid(),name) number = 1 if __name__ == '__main__': print(os.getpid()) p = Process(target=task1,name='任务1',args=(1,'测试任务1')) p.start() print(p.name) p1 = Process(target=task2,name='任务2',args=(1,'测试任务2')) p1.start() print(p1.name) while True: number += 1 sleep(0.2) if number == 100: p.terminate() p1.terminate() break else: print(number) print('************')
''' 多进程对全局变量的访问:在每一个全局变量里都放一个m变量,保证每个进程访问变量互不干扰 每个进程都独立获得一份全局变量,互不影响各自的修改 ''' #进程创建 import os from multiprocessing import Process from time import sleep m= 1 #不可变类型 list1= [] #可变类型 def task1(s,name): global m while True: m+=1 list1.append(str(m)+'task1') sleep(s) print('This is task1---->',list1) #This is task1----> ['2task1', '3task1', '4task1', '5task1', '6task1', '7task1'] def task2(s,name): global m while True: m+=1 list1.append(str(m)+'task2') #This is task2----> ['2task2', '3task2', '4task2', '5task2', '6task2', '7task2'] sleep(s) print('This is task2---->',list1) number = 1 if __name__ == '__main__': print(os.getpid()) p = Process(target=task1,name='任务1',args=(1,'测试任务1')) p.start() p1 = Process(target=task2,name='任务2',args=(1,'测试任务2')) p1.start() while True: m += 1 list1.append((str(m)+'main')) sleep(0.5) print('This is main:---->', list1) #This is main:----> ['2main', '3main', '4main', '5main', '6main', '7main', '8main', '9main', '10main', '11main', '12main', '13main', '14main']
#进程:自定义 import os from multiprocessing import Process from time import sleep class MyProcess(Process): def __init__(self,name): super(MyProcess,self).__init__() self.name = name #重新run方法 def run(self): n = 1 while True: print(f'{self.name}---> 自定义进程,n:{n}') n += 1 if __name__ == '__main__': p = MyProcess('测试任务1') p.start() # 1, 先开一个新的进程,2,调用run()方法 p1 = MyProcess('测试任务2') p1.start()
非阻塞
''' 使用multiprocessing模块提供的pool方法: 初始化pool时可以指定一个最大进程数,当有新的请求提交到pool中时,如果池子还没满,就会创建一个新的进程来执行该请求 如果pool中进程数已达到指定的最大值,那么该请求就会等待,知道pool中有进程结束,才会创建新的进程来执行 非阻塞式: 全部添加到队列,立刻返回,并没有等待其它进程执行完毕才会结束。但是回调函数,等待任务完成后才去调用 阻塞式: ''' import os import time from multiprocessing import Pool from random import random from time import sleep #非阻塞 def task(task_name): print('开始做任务:',task_name) start = time.time() #使用sleep time.sleep(random()*2) #2s以内 end = time.time() # print('完成任务:',task_name,'用时:', (end - start),'进程ID:',os.getpid()) return '完成任务:',task_name,'用时:', (end - start),'进程ID:',os.getpid() container = [] def callback_func(n): container.append(n) if __name__ == '__main__': pool = Pool(5) tasks = ['唱歌','跳舞','听音乐','画画','练字','吹唢呐','听戏','游泳','跑步'] for k in tasks: pool.apply_async(task,args=(k,),callback=callback_func) pool.close() # 添加任务结束 pool.join() #任务结束前,阻止回到主进程 for c in container: print(c) print('over!!')
阻塞
''' 阻塞式:顺序执行,添加一个任务,执行一个任务,上一个任务不执行完成,下一个任务不会开始 进程池: pool = Pool(max) 创建进程池对象 pool.apply() 阻塞式 pool.apply_async() 非阻塞式 pool.close() 添加任务结束 pool.join() 让主进程让步,插队 ''' import os import time from multiprocessing import Pool from random import random #非阻塞 def task(task_name): print('开始做任务:',task_name) start = time.time() #使用sleep time.sleep(random()*2) #2s以内 end = time.time() print('完成任务:',task_name,'用时:', (end - start),'进程ID:',os.getpid()) # return '完成任务:',task_name,'用时:', (end - start),'进程ID:',os.getpid() # container = [] # def callback_func(n): # container.append(n) if __name__ == '__main__': pool = Pool(5) tasks = ['唱歌','跳舞','听音乐','画画','练字','吹唢呐','听戏','游泳','跑步'] for k in tasks: pool.apply(task,args=(k,)) #阻塞式 pool.close() # 添加任务结束 pool.join() #任务结束前,阻止回到主进程 print('over!!!!')
队列
from multiprocessing import Queue q = Queue(5) #设置队列长度为5 q.put('A') q.put('B') q.put('C') q.put('D',timeout=2) #加timeout,超时不等 q.put('E') #put()如果队列满了,就只能等待,除非有空地,则添加成功 print(q.qsize()) if not q.full(): #q.full()判断队列是否已满,q.empty()判断队列是否是空的 q.put('F') else: print('Queue is full') #获取队列的值 print(q.get(timeout=2)) print(q.get(timeout=2)) print(q.get(timeout=2)) print(q.get(timeout=2)) print(q.get(timeout=2)) # print(q.get(timeout=2)) #不等待,这样就不阻塞 q.put_nowait('G') print(q.get_nowait())
''' 进程间通讯: 通过使用公共对象q = Queue(5),保证任务在同一个队列里面 ''' from multiprocessing import Process,Queue from time import sleep #将队列q作为对象传递 def download(q): images = ['girl.jpg','boy.jpg','lady.jpg'] for image in images: print("Downloading:",image) sleep(0.5) q.put(image) def getfile(q): while True: try: file = q.get(timeout=2) #超时2秒无内容则不去取 print(f'{file} save success!!') except: print('All file downloaded successfully!!') break if __name__ == '__main__': q = Queue(5) #q最为参数 p1 = Process(target=download,args=(q,)) p2 = Process(target=getfile,args=(q,)) p1.start() p1.join() p2.start() p2.join() print('验证任务完成,回到主线程!')
线程状态:
''' 多线程: 进程 :是一个正在执行中的程序,每一个进程执行都有一个执行顺序,该顺序是一个执行路径,或者叫一个控制单元; 线程:就是进程中的一个独立控制单元,线程在控制着进程的执行。一个进程中至少有一个进程。 多线程:一个进程中不只有一个线程。 如何创建和使用线程: 调用threading函数来创建线程对象 线程的状态:新建 --> 就绪 --> 运行 --> 结束 | 阻塞 ''' import threading from time import sleep #进程 Process #线程 Thread def download(n): images = ['girl.jpg','boy.jpg','lady.jpg'] for image in images: print("Downloading:",image) sleep(n) print(f'{image}保存成功!!') def listenMusic(n): musics = ['My heart will go on','My love','吻别','梦回唐朝'] for music in musics: sleep(n) print(f'正在听{music}!!') if __name__ == '__main__': #创建线程对象 t = threading.Thread(target=download,name='aa',args=(1,)) t.start() t1 = threading.Thread(target=listenMusic,name='bb',args=(1,)) t1.start() # n = 1 # while True: # print(n) # n+=1 # sleep(1.5)
''' 线程可以共享全局变量 GIL 全局解释器锁 python底层只要用线程就默认加锁 GIL 线程异步 --> 共享数据会导致数据不安全 线程同步 --> 导致的是效率低,但数据安全 运算数据量大时,会自动释放锁GIL 线程:耗时操作时用,爬虫,IO 进程:计算密集型 ''' import threading from time import sleep #进程 Process #线程 Thread money = 1000 def run1(): global money for i in range(100): sleep(0.1) money -= 1 # def run2(): # global money # for i in range(100): # money -= 1 if __name__ == '__main__': #创建线程对象 4个线程 t = threading.Thread(target=run1,name='th1') t1 = threading.Thread(target=run1,name='th2') t2 = threading.Thread(target=run1,name='th3') t3 = threading.Thread(target=run1,name='th4') t.start() t1.start() t2.start() t3.start() #阻塞 t.join() t1.join() t2.join() t3.join() print('money:',money) #600, 共享同一个全局变量
当计算大数据量时,自动释放锁
import threading from time import sleep #进程 Process #线程 Thread n = 0 def task1(): global n for i in range(1000000): n += 1 print('------->task1:',n) print('------------') sleep(1) def task2(): global n for i in range(1000000): n += 1 print('------->task2:',n) print('------------') sleep(1) if __name__ == '__main__': #创建线程对象 4个线程 t = threading.Thread(target=task1,name='th1') t1 = threading.Thread(target=task2,name='th3') t.start() t1.start() #阻塞 t.join() t1.join() print('over') #运行结果 ------->task1: 1000000 ------------ ------->task2: 1647975 # 释放了锁 ------------ over
''' 共享数据存在不安全性: 如果多个线程同时对数据进行修改,则可能出现不可预料的结果,为了保证数据的正确性,需要的对多个线程进行同步 同步:一个接一个的来做任务,一个做完,另一个才能进来,效率降低 使用Thread对象的lock和Rlock可以实现简单的线程同步,这2个对象都有acquire和release 方法,对应那些需要每次只允许一个线程操作的数据,可以将其操作放到acquire和release之间。 多线程的优势在于可以同时运行多个任务,但是当线程需要共享数据时,可能存在数据不同步的问题,为了避免这种情况,引入了锁的概念。 lock = threading.Lock() lock.acquire() #请求得到锁 ........ lock.release() #释放锁 只要不释放锁,其他线程都无法进入运行状态 ''' import threading from time import sleep import random #加锁 lock = threading.Lock() list1 = [0] *10 #列表里有10个0 def task1(): #获取线程锁,如果已经上锁,则等待锁的释放 lock.acquire() #阻塞 for i in range(len(list1)): list1[i] = 1 sleep(0.5) lock.release() #释放 def task2(): #获取线程锁,如果已经上锁,则等待锁的释放 lock.acquire() #阻塞 for i in range(len(list1)): print('--->',list1[i]) sleep(0.5) lock.release() #释放 if __name__ == '__main__': t1 = threading.Thread(target=task1) t2 = threading.Thread(target=task2) t2.start() t1.start() t2.join() t1.join() print(list1)
''' 死锁: 开发过程中使用线程,在线程共享多个资源时,如果2个线程分别占有一部分资源,并且同时等待对方的资源,就会造成死锁 尽管死锁很少发生,但一旦发生就会造成应用的停止响应,程序不做任何事情 避免出现死锁的方法:资源分配不当导致死锁 1,重构代码 2,acquire() 加timeout,利用时间差来释放锁 ''' from threading import Thread,Lock from time import sleep lockA = Lock() lockB = Lock() class MyThread1(Thread): def run(self):#start if lockA.acquire():#如果可以获取到锁,则返回True print(self.name + '获取到了A锁') sleep(0.2) if lockB.acquire(timeout=4): #阻塞 print(self.name + '又试图获取到了B锁,原来还有A锁的存在') #因为有sleep,A锁已经被线程1占有 lockB.release() lockA.release() class MyThread2(Thread): def run(self):#start if lockB.acquire():#如果可以获取到锁,则返回True print(self.name + '获取到了B锁') sleep(0.2) if lockA.acquire(timeout=2):#阻塞 print(self.name + '又试图获取到了A锁,原来还有B锁的存在') #因为有sleep,A锁已经被线程1占有 lockA.release() lockB.release() if __name__ == '__main__': t1 = MyThread1() t2 = MyThread2() t1.start() t2.start() #运行结果 Thread-1获取到了A锁 Thread-2获取到了B锁 Thread-1又试图获取到了B锁,原来还有A锁的存在
''' 生产者与消费者:两个线程之间的通讯 python的queue模块中提供了同步的,线程安全的队列类,包括FIFO(先进先出)队列Queue,LIFO(后进先出)队列LifoQueue,和 优先级队列PriorityQueue,这些都实现了锁原理(可以理解为原子操作,要么不做,要么做完)。能够在多线程中直接使用。 可以使用队列来实现线程间的同步。 ''' import threading import queue import time import random def produce(q): i=0 while i<10: num = random.randint(1,100) q.put('生产者生产数据:%d'%num) print('生产者生产数据:%d'%num) time.sleep(1) i+=1 q.put(None) #完成任务 q.task_done() def consume(q): while True: item = q.get() arr.append(item) if item is None: break print(f'消费者获取到:{arr}') time.sleep(4) #完成任务 q.task_done() if __name__ == '__main__': q = queue.Queue(10) arr = [] #创建生产者 th = threading.Thread(target=produce,args=(q,)) th.start() #创建消费者 tc = threading.Thread(target=consume,args=(q,)) tc.start() th.join() tc.join() print('over!!')
使用生成器来完成协程任务
''' 协程:微线程 进程 --》线程 --》 协程 耗时操作--> 用协程 网络请求 网络下载(下载) IO 操作:文件读写 阻塞动作 生成器:yield ''' import time def task1(): for i in range(3): print('A'+ str(i)) yield time.sleep(0.1) def task2(): for i in range(3): print('B'+ str(i)) yield time.sleep(0.1) if __name__ == '__main__': g1 = task1() g2 = task1() while True: try: next(g1) next(g2) except: break
使用greenlet来完成协程任务
''' greenlet : 完成协程任务 ''' import time from greenlet import greenlet def task1(): for i in range(3): print('A'+ str(i)) g2.switch() time.sleep(0.1) def task2(): for i in range(3): print('B'+ str(i)) g3.switch() time.sleep(0.1) def task3(): for i in range(3): print('C'+ str(i)) g1.switch() time.sleep(0.1) if __name__ == '__main__': g1 = greenlet(task1) g2 = greenlet(task2) g3 = greenlet(task3) g1.switch()
使用gevent来完成协程任务
''' gevent: greenlet已经实现了协程任务,但是需要人工切换,不是很智能 gevent 完美解决greenlet的不足 原理: 当一个greenlet遇到IO(input output,比如网络,文件操作等)操作时,就自动切换到其他greenlet,等到IO完成,再切回来继续执行 由于IO 操作非常耗时,经常使程序处于等待状态,有了gevent就可以自动完成切换协程,保证了总有greenlet在运行,而不是等待IO ''' import time from greenlet import greenlet import gevent from gevent import monkey #猴子补丁,替换了sleep的动作,不然就是单个函数顺序执行 monkey.patch_all() def task1(): for i in range(3): print('A'+ str(i)) time.sleep(0.1) def task2(): for i in range(3): print('B'+ str(i)) time.sleep(0.1) def task3(): for i in range(3): print('C'+ str(i)) time.sleep(0.1) if __name__ == '__main__': g1 = gevent.spawn(task1) g2 = gevent.spawn(task2) g3 = gevent.spawn(task3) g1.join() g2.join() g2.join() print('---------')
案例
import time,requests import gevent from gevent import monkey import urllib.request #猴子补丁,检测到耗时操作则切换 monkey.patch_all() def download(url): # response = requests.get(url) # content = response.text response = urllib.request.urlopen(url) content = response.read() print(f'下载了{url}的数据,长度{len(content)}') if __name__ == '__main__': urls = ['http://www.baidu.com','http://mail.126.com','http://cn.bing.com/?mkt=zh-CN',] g1 = gevent.spawn(download,urls[0]) g2 = gevent.spawn(download,urls[1]) g3 = gevent.spawn(download,urls[2]) g1.join() g2.join() g3.join() print('---------')