计算密集型任务的特点是要进行大量的计算,消耗CPU资源,比如计算圆周率、对视频进行高清解码等等,全靠CPU的运算能力。这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,
计算密集型任务同时进行的数量应当等于CPU的核心数。(多进程)
涉及到网络、磁盘IO的任务都是IO密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度)。对于IO密集型任务,任务越多,CPU效率越高,但也有一个限度。常见的大部分任务都是IO密集型任务,比如Web应用。
对于IO密集型任务,最合适的语言就是开发效率最高(代码量最少)的语言,脚本语言是首选,C语言最差。
进程就是一个程序在一个数据集上的一次动态执行过程。进程一般由程序、数据集、进程控制块三部分组成。我们编写的程序用来描述进程要完成哪些功能以及如何完成;数据集则是程序在执行过程中所需要使用的资源;进程控制块用来记录进程的外部特征,描述进程的执行变化过程,系统可以利用它来控制和管理进程,它是系统感知进程存在的唯一标志。
进程的生命周期:创建(New)、就绪(Runnable)、运行(Running)、阻塞(Block)、销毁(Destroy)
进程的状态(分类):(Actived)活动进程、可见进程(Visiable)、后台进程(Background)、服务进程(Service)、空进程
fork: os.fork() 函数在 Windows 系统上无效,只在 UNIX 及类 UNIX 系统上(包括UNIX、Linux 和 Mac OS X)效。
import os print('父进程 ID =', os.getpid()) # 创建一个子进程,下面代码会被两个进程执行 pid = os.fork() print('当前进程 ID =',os.getpid()," pid=",pid) #根据 pid 值,分别为子进程和父进程布置任务 if pid == 0: print('子进程, ID=',os.getpid()," 父进程 ID=",os.getppid()) else: print('父进程, ID=',os.getpid()," pid=",pid) #pid为0的代表子进程。 """ 其中,pid 作为函数的返回值,主进程和子进程都会执行该语句,但主进程执行 fork() 函数得到的 pid 值为非 0 值(其实是子进程的进程 ID),而子进程执行该语句得到的 pid 值为 0。因此,pid 常常作为区分父进程和子进程的标志。 在大多数操作系统中,都会为执行的进程配备唯一的 ID 号,os 模块提供了 getpid() 和 getppid() 函数,可分别用来获取当前进程的 ID 号和父进程的 ID 号。 """
缺点:
1.兼容性差,只能在类linux系统下使用,windows系统不可使用;
2.扩展性差,当需要多条进程的时候,进程管理变得很复杂;
3.会产生“孤儿”进程和“僵尸”进程,需要手动回收资源。
优点:
是系统自带的接近低层的创建方式,运行效率高。
Process进程
# -*- coding: utf-8 -*- import os from multiprocessing import Process import time def fun(name): print("2 子进程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid())) print("hello " + name) def test(): print('ssss') if __name__ == "__main__": print("1 主进程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid())) ps = Process(target=fun, args=('jingsanpang', )) print("111 ##### ps pid: " + str(ps.pid) + ", ident:" + str(ps.ident)) print("3 进程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid())) print(ps.is_alive()) # 启动之前 is_alive为False(系统未创建) ps.start() print(ps.is_alive()) # 启动之后,is_alive为True(系统已创建) print("222 #### ps pid: " + str(ps.pid) + ", ident:" + str(ps.ident)) print("4 进程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid())) ps.join() # 等待子进程完成任务 类似于os.wait() print(ps.is_alive()) print("5 进程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid())) ps.terminate() #终断进程 print("6 进程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
特点:
主进程执行完后会默认等待子进程结束后回收资源,不需要手动回收资源;join()函数用来控制子进程结束的顺序,其内部也有一个清除僵尸进程的函数,可以回收资源;
Process进程创建时,子进程会将主进程的Process对象完全复制一份,这样在主进程和子进程各有一个 Process对象,但是p.start()启动的是子进程,主进程中的Process对象作为一个静态对象存在,不执行。
当子进程执行完毕后,会产生一个僵尸进程,其会被join函数回收,或者再有一条进程开启,start函数也会回收僵尸进程,所以不一定需要写join函数。
windows系统在子进程结束后会立即自动清除子进程的Process对象,而linux系统子进程的Process对象如果没有join函数和start函数的话会在主进程结束后统一清除。
注意:Process对象可以创建进程,但Process对象不是进程,其删除与否与系统资源是否被回收没有直接的关系。
另外还可以通过继承Process对象来重写run方法创建进程
进程池POOL(多个进程)
apply 阻塞式
import time from multiprocessing import Pool def run(msg): print('msg:%s' %msg) 程序随眠3秒, time.sleep(3) print('end') if __name__ == "__main__": print("开始执行主程序") start_time=time.time() # 使用进程池创建子进程 size=3 pool=Pool(size) print("开始执行子进程") for i in range(size): pool.apply(run,(i,)) print("主进程结束耗时%s"%(time.time()-start_time)) """ 开始执行主程序 开始执行子进程 msg:0 end msg:1 end msg:2 end 主进程结束耗时9.223527431488037 """
进程开始运行,碰到子进程,操作系统切换到子进程,等待子进程运行结束后,再切换到另外一个子进程,直到所有子进程运行完毕。然后在切换到主进程,运行剩余的部分。
apply_async 异步非阻塞
import time from multiprocessing import Pool def run(msg): print('msg:%s' %msg) # 程序随眠3秒, time.sleep(3) print('end') if __name__ == "__main__": print("开始执行主程序") start_time=time.time() # 使用进程池创建子进程 size=3 pool=Pool(size) print("开始执行子进程") for i in range(size): pool.apply_async(run,(i,)) print("主进程结束耗时%s"%(time.time()-start_time)) """ 开始执行主程序 开始执行子进程 主进程结束耗时0.06100344657897949 """
主进程开始运行,碰到子进程后,主进程说:让我先运行个够,等到操作系统进行进程切换的时候,再交给子进程运行。因为我们的程序太短,还没等到操作系统进行进程切换,主进程就运行完毕了。
python主要是通过thread和threading这两个模块来实现多线程支持。python的thread模块是比较底层的模块,python的threading模块是对thread做了一些封装,可以更加方便的被使用。但是python(cpython)由于GIL的存在无法使用threading充分利用CPU资源,如果想充分发挥多核CPU的计算能力需要使用multiprocessing模块(Windows下使用会有诸多问题)。
GIL 限制了同一时刻只能有一个线程运行,无法发挥多核 CPU 的优势。首先需要明确的一点是 GIL 并不是 Python 的特性,它是在实现 Python 解析器(CPython)时所引入的一个概念。就好比 C++ 是一套语言(语法)标准,但是可以用不同的编译器来编译成可执行代码。有名的编译器例如GCC,INTEL C++ ,Visual C++等。Python 也一样,同样一段代码可以通过 CPython,PyPy,Psyco 等不同的 Python 执行环境来执行。像其中的 JPython 就没有GIL。然而因为 CPython 是大部分环境下默认的 Python 执行环境。所以在很多人的概念里 CPython 就是 Python,也就想当然的把 GIL 归结为 Python 语言的缺陷。所以这里要先明确一点:GIL 并不是 Python 的特性,Python 完全可以不依赖于 GIL。
GIL 本质就是一把互斥锁,既然是互斥锁,所有互斥锁的本质都一样,都是将并发运行变成串行,以此来控制同一时间内共享数据只能被一个任务所修改,进而保证数据安全.在一个 Python 的进程内,不仅有主线程或者由该主线程开启的其他线程,还有解释器开启的垃圾回收等解释器级别的线程,总之,所有线程都运行在这一个进程内,所有数据都是共享的,这其中,代码作为一种数据也是被所有线程共享的,多个线程先访问到解释器的代码,即拿到执行权限,然后将 target 的代码交给解释器的代码去执行,
解释器的代码是所有线程共享的,所以垃圾回收线程也可能访问到解释器的代码而去执行,这就导致了一个问题:对于同一个数据 100,可能线程 1 执行 x=100 的同时,而垃圾回收执行的是回收 100 的操作,解决这种问题没有什么高明的方法,就是加锁处理,即 GIL。
线程是指进程内的一个执行单元,也是进程内的可调度实体.
import time import threading def task_thread(counter): print(f'线程名称:{threading.current_thread().name} 参数:{counter} 开始时间:{time.strftime("%Y-%m-%d %H:%M:%S")}') num = counter while num: time.sleep(3) num -= 1 print(f'线程名称:{threading.current_thread().name} 参数:{counter} 结束时间:{time.strftime("%Y-%m-%d %H:%M:%S")}') if __name__ == '__main__': print(f'主线程开始时间:{time.strftime("%Y-%m-%d %H:%M:%S")}') #初始化3个线程,传递不同的参数 t1 = threading.Thread(target=task_thread, args=(3,)) t2 = threading.Thread(target=task_thread, args=(2,)) t3 = threading.Thread(target=task_thread, args=(1,)) #开启三个线程 t1.start() t2.start() t3.start() #等待运行结束 t1.join() t2.join() t3.join() print(f'主线程结束时间:{time.strftime("%Y-%m-%d %H:%M:%S")}') """ 主线程开始时间:2022-03-02 12:49:20 线程名称:Thread-1 参数:3 开始时间:2022-03-02 12:49:20 线程名称:Thread-2 参数:2 开始时间:2022-03-02 12:49:20 线程名称:Thread-3 参数:1 开始时间:2022-03-02 12:49:20 线程名称:Thread-3 参数:1 结束时间:2022-03-02 12:49:23 3 线程名称:Thread-2 参数:2 结束时间:2022-03-02 12:49:26 6 线程名称:Thread-1 参数:3 结束时间:2022-03-02 12:49:29 9 体现了GIL锁机制 主线程结束时间:2022-03-02 12:49:29 """
import time import threading class MyThread(threading.Thread): def __init__(self, counter): super().__init__() self.counter = counter def run(self): print( f'线程名称:{threading.current_thread().name} 参数:{self.counter} 开始时间:{time.strftime("%Y-%m-%d %H:%M:%S")}' ) counter = self.counter while counter: time.sleep(3) counter -= 1 print( f'线程名称:{threading.current_thread().name} 参数:{self.counter} 结束时间:{time.strftime("%Y-%m-%d %H:%M:%S")}' ) if __name__ == "__main__": print(f'主线程开始时间:{time.strftime("%Y-%m-%d %H:%M:%S")}') # 初始化3个线程,传递不同的参数 t1 = MyThread(3) t2 = MyThread(2) t3 = MyThread(1) # 开启三个线程 t1.start() t2.start() t3.start() # 等待运行结束 t1.join() t2.join() t3.join() print(f'主线程结束时间:{time.strftime("%Y-%m-%d %H:%M:%S")}')
# 传参 import time import threading def task_thread(counter): print(f'线程名称:{threading.current_thread().name} 参数:{counter} 开始时间:{time.strftime("%Y-%m-%d %H:%M:%S")}') num = counter while num: time.sleep(3) num -= 1 print(f'线程名称:{threading.current_thread().name} 参数:{counter} 结束时间:{time.strftime("%Y-%m-%d %H:%M:%S")}') class MyThread(threading.Thread): def __init__(self, target, args): super().__init__() self.target = target self.args = args def run(self): self.target(*self.args) if __name__ == "__main__": print(f'主线程开始时间:{time.strftime("%Y-%m-%d %H:%M:%S")}') # 初始化3个线程,传递不同的参数 t1 = MyThread(target=task_thread,args=(3,)) t2 = MyThread(target=task_thread,args=(2,)) t3 = MyThread(target=task_thread,args=(1,)) # 开启三个线程 t1.start() t2.start() t3.start() # 等待运行结束 t1.join() t2.join() t3.join() print(f'主线程结束时间:{time.strftime("%Y-%m-%d %H:%M:%S")}')
import time, threading num = 0 lock = threading.Lock() def task_thread(n): global num # 获取锁,用于线程同步 """ 为了保证数据的正确性,需要使用互斥锁对多个线程进行同步,限制当一个线程正在访问数据时,其他只能等待,直到前一线程释放锁。使用 threading.Thread 对象的 Lock 和 Rlock 可以实现简单的线程同步,这两个对象都有 acquire 方法和 release 方法,对于那些需要每次只允许一个线程操作的数据,可以将其操作放到 acquire 和 release 方法之间。 """ lock.acquire() for i in range(1000000): num = num + n num = num - n #释放锁,开启下一个线程 lock.release() t1 = threading.Thread(target=task_thread, args=(6,)) t2 = threading.Thread(target=task_thread, args=(17,)) t3 = threading.Thread(target=task_thread, args=(11,)) t1.start(); t2.start(); t3.start() t1.join(); t2.join(); t3.join() print(num) # 结果为0
import threading import time # 同时只有5个人办理业务 semaphore = threading.BoundedSemaphore(5) # 模拟银行业务办理 def yewubanli(name): semaphore.acquire() time.sleep(3) print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} {name} 正在办理业务") semaphore.release() thread_list = [] for i in range(12): t = threading.Thread(target=yewubanli, args=(i,)) thread_list.append(t) for thread in thread_list: thread.start() for thread in thread_list: thread.join() # while threading.active_count() != 1: # time.sleep(1) """ 2018-07-08 12:33:57 4 正在办理业务 2018-07-08 12:33:57 1 正在办理业务 2018-07-08 12:33:57 3 正在办理业务 2018-07-08 12:33:57 0 正在办理业务 2018-07-08 12:33:57 2 正在办理业务 -------------------------- 5 2018-07-08 12:34:00 7 正在办理业务 2018-07-08 12:34:00 5 正在办理业务 2018-07-08 12:34:00 6 正在办理业务 2018-07-08 12:34:00 9 正在办理业务 2018-07-08 12:34:00 8 正在办理业务 -------------------------- 5 2018-07-08 12:34:03 11 正在办理业务 2018-07-08 12:34:03 10 正在办理业务 """
条件对象能让一个线程 A 停下来,等待其他线程 B ,线程 B 满足了某个条件后通知(notify)线程 A 继续运行。线程首先获取一个条件变量锁,如果条件不足,则该线程等待(wait)并释放条件变量锁,如果满足就执行线程,也可以通知其他状态为 wait 的线程。其他处于 wait 状态的线程接到通知后会重新判断条件。
import threading class Boy(threading.Thread): def __init__(self, cond, name): super(Boy, self).__init__() self.cond = cond self.name = name def run(self): self.cond.acquire() print(self.name + ": 嫁给我吧!?") self.cond.notify() # 唤醒一个挂起的线程,让hanmeimei表态 self.cond.wait() # 释放内部所占用的琐,同时线程被挂起,直至接收到通知被唤醒或超时,等待hanmeimei回答 print(self.name + ": 我单下跪,送上戒指!") self.cond.notify() self.cond.wait() print(self.name + ": Li太太,你的选择太明治了。") self.cond.release() class Girl(threading.Thread): def __init__(self, cond, name): super(Girl, self).__init__() self.cond = cond self.name = name def run(self): self.cond.acquire() self.cond.wait() # 等待Lilei求婚 print(self.name + ": 没有情调,不够浪漫,不答应") self.cond.notify() self.cond.wait() print(self.name + ": 好吧,答应你了") self.cond.notify() self.cond.release() cond = threading.Condition() boy = Boy(cond, "LiLei") girl = Girl(cond, "HanMeiMei") girl.start() boy.start()
事件用于线程间通信。一个线程发出一个信号,其他一个或多个线程等待,调用 event 对象的 wait 方法,线程则会阻塞等待,直到别的线程 set 之后,才会被唤醒。上面求婚哥的例子使用 Event 代码如下:
import threading, time class Boy(threading.Thread): def __init__(self, cond, name): super(Boy, self).__init__() self.cond = cond self.name = name def run(self): print(self.name + ": 嫁给我吧!?") self.cond.set() # 唤醒一个挂起的线程,让hanmeimei表态 time.sleep(0.5) self.cond.wait() print(self.name + ": 我单下跪,送上戒指!") self.cond.set() time.sleep(0.5) self.cond.wait() self.cond.clear() print(self.name + ": Li太太,你的选择太明治了。") class Girl(threading.Thread): def __init__(self, cond, name): super(Girl, self).__init__() self.cond = cond self.name = name def run(self): self.cond.wait() # 等待Lilei求婚 self.cond.clear() print(self.name + ": 没有情调,不够浪漫,不答应") self.cond.set() time.sleep(0.5) self.cond.wait() print(self.name + ": 好吧,答应你了") self.cond.set() cond = threading.Event() boy = Boy(cond, "LiLei") girl = Girl(cond, "HanMeiMei") boy.start() girl.start()
Python中线程与进程使用的同一模块 multiprocessing。使用方法也基本相同,唯一不同的是,from multiprocessing import Pool 这样导入的 Pool 表示的是进程池,from multiprocessing.dummy import Pool这样导入的 Pool表示的是线程池。这样就可以实现线程里面的并发了。
线程池实例:
from multiprocessing.dummy import Pool as ThreadPool import time def fun(n): time.sleep(2) start = time.time() for i in range(5): fun(i) print("单线程顺序执行耗时:", time.time() - start) start2 = time.time() # 开8个 worker,没有参数时默认是 cpu 的核心数 pool = ThreadPool(processes=2) # 在线程中执行 urllib2.urlopen(url) 并返回执行结果 results2 = pool.map(fun, range(5)) pool.close() pool.join() print("线程池(5)并发执行耗时:", time.time() - start2)