关于基本概念部分这里不再详述,可以参考之前的文章或者自行查阅相关文章。
由于python中线程的创建、属性和方法和进程很相似,这里也不再讲解。
这里重点讲解下多线程访问共享数据的相关问题。
先看下示例:预测并执行看下结果是否和预测一致
from threading import Thread sum = 0 def minus(): global sum for i in range(1000000): sum += 1 def plus(): global sum for i in range(1000000): sum -= 1 if __name__ == '__main__': t1 = Thread(target=minus) t2 = Thread(target=plus) t1.start() t2.start() t1.join() t2.join() print('sum=',sum)
如何解决多线程下共享数据的安全问题呢?
互斥锁:Lock
import time import threading R = threading.Lock() def sub(): global num R.acquire() # 加锁,保证同一时刻只有一个线程可以修改数据 num -= 1 R.release() # 修改完成就可以解锁 time.sleep(1) num = 100 # 定义一个全局变量 l = [] # 定义一个空列表,用来存放所有的列表 def main(): for i in range(100): # for循环100次 t = threading.Thread(target=sub) # 每次循环开启一个线程 t.start() # 开启线程 l.append(t) # 将线程加入列表l for i in l: i.join() # 这里加上join保证所有的线程结束后才运行下面的代码 print(num) if __name__ == '__main__': main()
递归锁(可重入):RLock
import time import threading lock = threading.RLock() def minus1(): global num2 lock.acquire() num2 -= 1 lock.release() def minus2(): global num1 # 在每个线程中都获取这个全局变量 lock.acquire() minus1() time.sleep(1) num1 -= 1 # 对此公共变量进行-1操作 print('%s--get num1:%s,num2:%s' % (threading.current_thread().name, num1, num2)) lock.release() num1, num2 = 5, 9 # 设定共享变量 thread_list = [] if __name__ == '__main__': for i in range(5): t = threading.Thread(target=minus2) t.start() thread_list.append(t) while threading.active_count() != 1: print(threading.active_count()) time.sleep(1) else: print('----all threads done---') print('final num:', num1, num2)
import time import threading semaphore = threading.BoundedSemaphore(3) def task(): semaphore.acquire() print('{}---running-{}'.format(threading.current_thread().name, time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))) time.sleep(1) semaphore.release() if __name__ == '__main__': l = [] for i in range(10): t = threading.Thread(target=task, name='task' + str(i)) t.start() l.append(t) for i in l: i.join()
import threading, time, random events = threading.Event() def lighter(): if not events.isSet(): events.set() # 初始化绿灯Event set counter = 0 while True: if counter < 5: print('\033[42;0mGreen is lighten...\033[0m') elif counter < 10: if events.isSet(): events.clear() print('\033[41;0mRed is lighten...\033[0m') else: counter = 0 print('\033[42;1m--green light on---\033[0m') events.set() time.sleep(1) counter += 1 def car(i): while True: if events.isSet(): print("car[%s] is running..." % i) time.sleep(random.randrange(10)) else: print('car is waiting green lighten...') events.wait() if __name__ == '__main__': lighter1 = threading.Thread(target=lighter) lighter1.start() for i in range(3): t = threading.Thread(target=car, args=(i,)) t.start()
所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程
在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源时,就会造成死锁。
尽管死锁很少发生,但一旦发生就会造成应用的停止响应。
示例代码:
import time from threading import Thread, Lock la = Lock() lb = Lock() def task1(): if la.acquire(): print('task1获取锁A') time.sleep(1) if lb.acquire(): print('task1获取锁B') lb.release() print('task1释放锁B') la.release() print('task1释放锁A') def task2(): if lb.acquire(): print('task2获取锁B') time.sleep(1) if la.acquire(): print('task2获取锁A') la.release() print('task2释放锁A') lb.release() print('task2释放锁B') if __name__ == '__main__': t1 = Thread(target=task1) t2 = Thread(target=task2) t1.start() t2.start() t1.join() t2.join()
解决死锁
添加超时时间
import time from threading import Thread, Lock la = Lock() lb = Lock() def task1(): if la.acquire(): print('task1获取锁A') time.sleep(1) if lb.acquire(timeout=5): print('task1获取锁B') lb.release() print('task1释放锁B') la.release() print('task1释放锁A') def task2(): if lb.acquire(): print('task2获取锁B') time.sleep(1) if la.acquire(timeout=4): print('task2获取锁A') la.release() print('task2释放锁A') lb.release() print('task2释放锁B') if __name__ == '__main__': t1 = Thread(target=task1) t2 = Thread(target=task2) t1.start() t2.start() t1.join() t2.join()
银行家算法:换没有学习,感兴趣的话自行查阅相关文档
线程间通信也是通过Queue来完成,这里不在讲解。
示例代码:
import queue import random import time from threading import Thread def produce(q): i = 0 while i < 10: num = random.randint(1, 100) q.put(num) print('生产者生产数据:%d' % num) time.sleep(0.5) i += 1 q.put(None) # 任务结束 q.task_done() def consume(q, n): while True: item = q.get() if item is None: break print('%s获取:%d' % (n, item)) time.sleep(1) # 任务结束 q.put(None) q.task_done() if __name__ == '__main__': q = queue.Queue(10) # 创建生产者 tp = Thread(target=produce, args=(q,)) tp.start() # 创建消费者 tc1 = Thread(target=consume, args=(q, '消费者1')) tc2 = Thread(target=consume, args=(q, '消费者2')) tc1.start() tc2.start() tp.join() tc1.join() tc2.join()
协程,又称微线程,纤程,是一种用户态的轻量级线程。
协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。
因此:协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态(进入上一次离开时所处逻辑流的位置)
协程与线程类似,每个协程表示一个执行单元,既有自己的本地数据,也与其他协程共享全局数据和其他资源。
协程存在于线程中,需要用户来编写调度逻辑,对CPU而言,不需要考虑协程如何调度,切换上下文。
"原子操作(atomic operation)是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何上下文切换 (切换到另一个线程)。原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序是不可以被打乱,或者切割掉只执行部分。视作整体是原子性的核心。
greenlet是一个用C实现的协程模块,相比与python自带的yield,它可以使你在任意函数之间随意切换,而不需把这个函数先声明为generator。
greenlet的工作流程:进行访问网络的IO操作时,出现阻塞,greenlet就显式切换到另一段没有被阻塞的代码执行,直到原来的阻塞状况消失以后,再切换回原来代码段继续处理。因此,greenlet是一种合理安排的串行方法。
greenlet.switch()可实现协程的切换,greenlet并不能实现自动切换。
示例:模拟2个耗时任务
import time from greenlet import greenlet def task1(): for i in range(5): print('A ' + str(i)) g2.switch() # 模拟耗时操作 time.sleep(0.1) def task2(): for i in range(5): print('B ' + str(i)) g1.switch() # 模拟耗时操作 time.sleep(0.1) if __name__ == '__main__': g1 = greenlet(task1) g2 = greenlet(task2) g1.switch()
gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程。gevent是对greenlet进行封装,实现协程的自动切换。
通过gevent.sleep模仿IO操作,实现协程的切换。
示例:
import time import gevent def task1(): for i in range(5): print('A ' + str(i)) # 模拟耗时操作 time.sleep(0.1) def task2(): for i in range(5): print('B ' + str(i)) # 模拟耗时操作 time.sleep(0.1) if __name__ == '__main__': g1 = gevent.spawn(task1) g2 = gevent.spawn(task2) g1.join() g2.join()
协程切换是在IO操作时自动完成,在启动时通过monkey.patch_all()实现将一些常见的阻塞,如socket,select,urllib等地方实现协程跳转,因为gevent并不能完全识别所有当前操作是否为IO操作,而未切换。
示例:
import time import gevent from gevent import monkey monkey.patch_all() def task1(): for i in range(5): print('A ' + str(i)) # 模拟耗时操作 time.sleep(0.1) def task2(): for i in range(5): print('B ' + str(i)) # 模拟耗时操作 time.sleep(0.1) if __name__ == '__main__': g1 = gevent.spawn(task1) g2 = gevent.spawn(task2) g1.join() g2.join()
示例:模拟爬取3个网页
from gevent import monkey monkey.patch_all() import requests import gevent def download(url): resp = requests.get(url) print('下载了{}的数据, 长度:{}'.format(url, len(resp.text))) if __name__ == '__main__': urls = ['https://www.baidu.com', 'https://www.163.com', 'https://www.qq.com'] arr = [] for url in urls: g = gevent.spawn(download, url) arr.append(g) gevent.joinall(arr)
gevent还提供对池的支持,当拥有动态数量的greenlet需要进行并发管理(限制并发数)时,就可以使用池,在处理大量的网络或IO操作时非常重要。
示例:注意python版本
from gevent import monkey monkey.patch_all() from gevent.pool import Pool import requests def run_task(url): print('Visit --> %s'% url) try: res = requests.get(url) data = res.text print('%s bytes received from %s' %(len(data),url)) except Exception as value: print(value) return 'url:%s --> finished' % url if __name__ == '__main__': urls = ['https://www.baidu.com/','https://github.com/','https://www.python.org/'] pool = Pool(2) result = pool.map(run_task, urls) print(result)
参考地址:
参考视频:https://www.bilibili.com/video/BV1R7411F7JV p274~281
源代码仓库地址:https://gitee.com/gaogzhen/python-study
QQ群:433529853